libf 8 months ago
parent
commit
fe8304f8c7
2 changed files with 30 additions and 19 deletions
  1. 1 1
      cgimport.conf
  2. 29 18
      importer/importer.go

+ 1 - 1
cgimport.conf

@@ -13,7 +13,7 @@
 ; count=20           ; 保留数量,默认 100
 ; count=20           ; 保留数量,默认 100
 ; scroll=1天         ; 时间滚动 scroll,覆盖 dialy 设置,默认 1 天,支持时间单位 d 天 h 小时 m 分钟 s 秒 ms 毫秒 us 微秒 ns 纳秒,默认毫秒
 ; scroll=1天         ; 时间滚动 scroll,覆盖 dialy 设置,默认 1 天,支持时间单位 d 天 h 小时 m 分钟 s 秒 ms 毫秒 us 微秒 ns 纳秒,默认毫秒
 ; expire=14d         ; 保留时间,默认 14 天
 ; expire=14d         ; 保留时间,默认 14 天
-level=debug
+level=info
 
 
 [cgi]
 [cgi]
 # 文件解析并发数
 # 文件解析并发数

+ 29 - 18
importer/importer.go

@@ -38,6 +38,7 @@ type Importer struct {
 	odbcimporter     *ODBCImporter
 	odbcimporter     *ODBCImporter
 	starttime        time.Time
 	starttime        time.Time
 	currentstarttime time.Time
 	currentstarttime time.Time
+	lastlogtime      cmap.ConcurrentMap[string, time.Time]
 }
 }
 
 
 func ImportDir(datapath string, parallel int, rebuild, reload bool) (totalfilescount, totallinecount, totalrecordscount, totalretrycount int64, totalusetime time.Duration, filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
 func ImportDir(datapath string, parallel int, rebuild, reload bool) (totalfilescount, totallinecount, totalrecordscount, totalretrycount int64, totalusetime time.Duration, filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
@@ -51,6 +52,7 @@ func ImportDir(datapath string, parallel int, rebuild, reload bool) (totalfilesc
 		fileimportrc: rc.NewRoutinesController("", parallel),
 		fileimportrc: rc.NewRoutinesController("", parallel),
 		odbcqueryrc:  rc.NewRoutinesControllerLimit("", concurlimt, concurlimt*5),
 		odbcqueryrc:  rc.NewRoutinesControllerLimit("", concurlimt, concurlimt*5),
 		odbcimporter: NewODBCImporter(),
 		odbcimporter: NewODBCImporter(),
+		lastlogtime:  cmap.NewSingle[string, time.Time](),
 	}
 	}
 	return importer.Import()
 	return importer.Import()
 }
 }
@@ -275,13 +277,15 @@ func (importer *Importer) importReader(filename string, buf io.Reader, linefrom,
 	if e != nil {
 	if e != nil {
 		return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
 		return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
 	}
 	}
-	lastlogtime := time.Now()
 	skiplines := int(linefrom)
 	skiplines := int(linefrom)
 	blockcount = blockfrom
 	blockcount = blockfrom
 	doinglines := []int64{}
 	doinglines := []int64{}
 	donelines := linefrom
 	donelines := linefrom
 	doneblocks := blockfrom
 	doneblocks := blockfrom
+	savedlines := linefrom
+	savedblocks := blockfrom
 	retrycount = totalretrycount
 	retrycount = totalretrycount
+	linecount = linefrom
 	// maxresponsetime := time.Duration(0)
 	// maxresponsetime := time.Duration(0)
 	var wg sync.WaitGroup
 	var wg sync.WaitGroup
 	defer importer.done()
 	defer importer.done()
@@ -290,6 +294,7 @@ func (importer *Importer) importReader(filename string, buf io.Reader, linefrom,
 		if err != nil {
 		if err != nil {
 			return
 			return
 		}
 		}
+		lastlinecount := linecount
 		block, line, linenumber, e := br.ReadBlock(skiplines)
 		block, line, linenumber, e := br.ReadBlock(skiplines)
 		linecount = int64(linenumber)
 		linecount = int64(linenumber)
 		if e != nil {
 		if e != nil {
@@ -329,6 +334,7 @@ func (importer *Importer) importReader(filename string, buf io.Reader, linefrom,
 				err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(doingline)}, {"line": line}})
 				err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(doingline)}, {"line": line}})
 				return
 				return
 			}
 			}
+			atomic.AddInt64(&donelines, doingline-lastlinecount)
 			atomic.AddInt64(&doneblocks, 1)
 			atomic.AddInt64(&doneblocks, 1)
 			if logstatus {
 			if logstatus {
 				readinglines := doinglines[len(doinglines)-1]
 				readinglines := doinglines[len(doinglines)-1]
@@ -339,18 +345,11 @@ func (importer *Importer) importReader(filename string, buf io.Reader, linefrom,
 						RecordsCount: doingblock,
 						RecordsCount: doingblock,
 						RetryCount:   retrycount,
 						RetryCount:   retrycount,
 					}
 					}
-					donelines = doingline
 					importer.importstatus.TotalUseTime = time.Since(importer.starttime)
 					importer.importstatus.TotalUseTime = time.Since(importer.starttime)
 					importer.importstatus.Save()
 					importer.importstatus.Save()
+					savedlines = doingline
+					savedblocks = doingblock
 					doinglines = doinglines[1:]
 					doinglines = doinglines[1:]
-					if time.Since(lastlogtime) > 5*time.Second {
-						if retrycount > 0 {
-							logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records", retrycount, "retry times")
-						} else {
-							logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records")
-						}
-						lastlogtime = time.Now()
-					}
 					importer.importstatus.mutex.Unlock()
 					importer.importstatus.mutex.Unlock()
 				} else {
 				} else {
 					for i, l := range doinglines {
 					for i, l := range doinglines {
@@ -359,15 +358,8 @@ func (importer *Importer) importReader(filename string, buf io.Reader, linefrom,
 							break
 							break
 						}
 						}
 					}
 					}
-					if time.Since(lastlogtime) > 5*time.Second {
-						if retrycount > 0 {
-							logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records", retrycount, "retry times")
-						} else {
-							logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records")
-						}
-						lastlogtime = time.Now()
-					}
 				}
 				}
+				importer.logInfo(filename, readinglines, doinglines, donelines, doneblocks, savedlines, savedblocks, retrycount)
 			}
 			}
 		})
 		})
 		if e != nil {
 		if e != nil {
@@ -376,6 +368,25 @@ func (importer *Importer) importReader(filename string, buf io.Reader, linefrom,
 	}
 	}
 }
 }
 
 
+func (importer *Importer) logInfo(filename string, readinglines int64, doinglines []int64, donelines, doneblocks, savedlines, savedblocks, retrycount int64) {
+	if time.Since(importer.lastlogtime.GetIFPresent(filename)) > 5*time.Second {
+		if odbc.LogDebug {
+			if retrycount > 0 {
+				logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records,", "saved", savedlines, "lines", savedblocks, "records", retrycount, "retry times")
+			} else {
+				logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records,", "saved", savedlines, "lines", savedblocks, "records")
+			}
+		} else {
+			if retrycount > 0 {
+				logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records", retrycount, "retry times")
+			} else {
+				logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records")
+			}
+		}
+		importer.lastlogtime.Set(filename, time.Now())
+	}
+}
+
 func (importer *Importer) importRecord(record map[string]any, line string, filename string, classaliasname string, linecount int) (retrycount int, err error) {
 func (importer *Importer) importRecord(record map[string]any, line string, filename string, classaliasname string, linecount int) (retrycount int, err error) {
 	if odbc.LogDebug {
 	if odbc.LogDebug {
 		bs, e := json.MarshalIndent(record, "", "  ")
 		bs, e := json.MarshalIndent(record, "", "  ")