|
@@ -135,7 +135,7 @@ const (
|
|
func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FWOP, logstatus bool) (filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
|
|
func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FWOP, logstatus bool) (filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
|
|
// 遍历文件目录
|
|
// 遍历文件目录
|
|
var wg sync.WaitGroup
|
|
var wg sync.WaitGroup
|
|
- fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
|
|
|
|
|
|
+ fw, e := filewalker.NewFileWalker([]string{importer.datapath}, `^[^\.].*`)
|
|
if e != nil {
|
|
if e != nil {
|
|
err = e
|
|
err = e
|
|
return
|
|
return
|
|
@@ -265,6 +265,8 @@ func (importer *Importer) importReader(filename string, buf io.Reader, linefrom,
|
|
skiplines := int(linefrom)
|
|
skiplines := int(linefrom)
|
|
blockcount = blockfrom
|
|
blockcount = blockfrom
|
|
doinglines := []int64{}
|
|
doinglines := []int64{}
|
|
|
|
+ donelines := linefrom
|
|
|
|
+ doneblocks := blockfrom
|
|
retrycount = totalretrycount
|
|
retrycount = totalretrycount
|
|
// maxresponsetime := time.Duration(0)
|
|
// maxresponsetime := time.Duration(0)
|
|
var wg sync.WaitGroup
|
|
var wg sync.WaitGroup
|
|
@@ -313,7 +315,9 @@ func (importer *Importer) importReader(filename string, buf io.Reader, linefrom,
|
|
err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(doingline)}, {"line": line}})
|
|
err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(doingline)}, {"line": line}})
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
+ atomic.AddInt64(&doneblocks, 1)
|
|
if logstatus {
|
|
if logstatus {
|
|
|
|
+ readinglines := doinglines[len(doinglines)-1]
|
|
if doingline == doinglines[0] {
|
|
if doingline == doinglines[0] {
|
|
importer.importstatus.mutex.Lock()
|
|
importer.importstatus.mutex.Lock()
|
|
importer.importstatus.ImportStatus[filename] = &ImportStatus{
|
|
importer.importstatus.ImportStatus[filename] = &ImportStatus{
|
|
@@ -321,14 +325,15 @@ func (importer *Importer) importReader(filename string, buf io.Reader, linefrom,
|
|
RecordsCount: doingblock,
|
|
RecordsCount: doingblock,
|
|
RetryCount: retrycount,
|
|
RetryCount: retrycount,
|
|
}
|
|
}
|
|
|
|
+ donelines = doingline
|
|
importer.importstatus.TotalUseTime = time.Since(importer.starttime)
|
|
importer.importstatus.TotalUseTime = time.Since(importer.starttime)
|
|
importer.importstatus.Save()
|
|
importer.importstatus.Save()
|
|
doinglines = doinglines[1:]
|
|
doinglines = doinglines[1:]
|
|
if time.Since(lastlogtime) > 5*time.Second {
|
|
if time.Since(lastlogtime) > 5*time.Second {
|
|
if retrycount > 0 {
|
|
if retrycount > 0 {
|
|
- logger.Info("file", filename, "imported", doingline, "lines", doingblock, "records", retrycount, "retry times")
|
|
|
|
|
|
+ logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records", retrycount, "retry times")
|
|
} else {
|
|
} else {
|
|
- logger.Info("file", filename, "imported", doingline, "lines", doingblock, "records")
|
|
|
|
|
|
+ logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records")
|
|
}
|
|
}
|
|
lastlogtime = time.Now()
|
|
lastlogtime = time.Now()
|
|
}
|
|
}
|
|
@@ -337,8 +342,17 @@ func (importer *Importer) importReader(filename string, buf io.Reader, linefrom,
|
|
for i, l := range doinglines {
|
|
for i, l := range doinglines {
|
|
if l == doingline {
|
|
if l == doingline {
|
|
doinglines = append(doinglines[:i], doinglines[i+1:]...)
|
|
doinglines = append(doinglines[:i], doinglines[i+1:]...)
|
|
|
|
+ break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ if time.Since(lastlogtime) > 5*time.Second {
|
|
|
|
+ if retrycount > 0 {
|
|
|
|
+ logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records", retrycount, "retry times")
|
|
|
|
+ } else {
|
|
|
|
+ logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records")
|
|
|
|
+ }
|
|
|
|
+ lastlogtime = time.Now()
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
})
|
|
})
|