|
@@ -37,7 +37,7 @@ type Importer struct {
|
|
|
currentstarttime time.Time
|
|
|
}
|
|
|
|
|
|
-func ImportDir(datapath string, parallel int, rebuild, reload bool) (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
|
|
|
+func ImportDir(datapath string, parallel int, rebuild, reload bool) (totalfilescount, totallinecount, totalrecordscount int64, totalusetime time.Duration, filescount, linescount, recordscount int64, usetime time.Duration, err error) {
|
|
|
concurlimt := mcfg.GetInt("odbc.concurrent.limit", 100)
|
|
|
importer := &Importer{
|
|
|
datapath: datapath,
|
|
@@ -52,8 +52,9 @@ func ImportDir(datapath string, parallel int, rebuild, reload bool) (totalfilesc
|
|
|
return importer.Import()
|
|
|
}
|
|
|
|
|
|
-func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
|
|
|
+func (importer *Importer) Import() (totalfilescount, totallinecount, totalrecordscount int64, totalusetime time.Duration, filescount, linescount, recordscount int64, usetime time.Duration, err error) {
|
|
|
if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && !importer.reload {
|
|
|
+ // reload
|
|
|
err = importer.importstatus.Load()
|
|
|
if err != nil {
|
|
|
return
|
|
@@ -64,9 +65,9 @@ func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, to
|
|
|
}
|
|
|
}
|
|
|
if importer.rebuild {
|
|
|
- // reload
|
|
|
+ // rebuild
|
|
|
// 清除已有类
|
|
|
- err = importer.odbcimporter.reload()
|
|
|
+ err = importer.odbcimporter.rebuild()
|
|
|
if err != nil {
|
|
|
return
|
|
|
}
|
|
@@ -76,36 +77,32 @@ func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, to
|
|
|
if err != nil {
|
|
|
return
|
|
|
}
|
|
|
- totalfilescount = int64(len(importer.importstatus.ImportStatus))
|
|
|
- for _, v := range importer.importstatus.ImportStatus {
|
|
|
- totalrecordscount += v.RecordsCount
|
|
|
- }
|
|
|
totalusetime = importer.importstatus.TotalUseTime
|
|
|
importer.starttime = time.Now().Add(-totalusetime)
|
|
|
importer.currentstarttime = time.Now()
|
|
|
|
|
|
reedgefile := regexp.MustCompile("(?i).*edge.*.csv")
|
|
|
- fc, rc, ut, e := importer.ImportEdgeFiles(reedgefile)
|
|
|
+ efc, elc, erc, ut, e := importer.ImportEdgeFiles(reedgefile, false)
|
|
|
if e != nil {
|
|
|
err = e
|
|
|
return
|
|
|
}
|
|
|
- totalfilescount += fc
|
|
|
- totalrecordscount += rc
|
|
|
- filescount += fc
|
|
|
- recordscount += rc
|
|
|
- usetime += ut
|
|
|
- totalusetime = importer.importstatus.TotalUseTime
|
|
|
- fc, rc, ut, e = importer.ImportNonEdgeFiles(reedgefile)
|
|
|
+ afc, alc, arc, ut, e := importer.ImportNonEdgeFiles(reedgefile, true)
|
|
|
if e != nil {
|
|
|
err = e
|
|
|
return
|
|
|
}
|
|
|
- totalfilescount += fc
|
|
|
- totalrecordscount += rc
|
|
|
- filescount += fc
|
|
|
- recordscount += rc
|
|
|
- usetime += ut
|
|
|
+ totalfilescount = int64(len(importer.importstatus.ImportStatus)) + efc
|
|
|
+ for _, v := range importer.importstatus.ImportStatus {
|
|
|
+ totallinecount += v.LinesCount
|
|
|
+ totalrecordscount += v.RecordsCount
|
|
|
+ }
|
|
|
+ totallinecount += elc
|
|
|
+ totalrecordscount += erc
|
|
|
+ filescount = afc + efc
|
|
|
+ linescount = alc + elc
|
|
|
+ recordscount = arc + erc
|
|
|
+ usetime = ut
|
|
|
totalusetime = importer.importstatus.TotalUseTime
|
|
|
|
|
|
importer.importstatus.WaitSaveDone()
|
|
@@ -113,24 +110,24 @@ func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, to
|
|
|
return
|
|
|
}
|
|
|
|
|
|
-func (importer *Importer) ImportEdgeFiles(reedgefile *regexp.Regexp) (filescount, recordscount int64, usetime time.Duration, err error) {
|
|
|
+func (importer *Importer) ImportEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount int64, usetime time.Duration, err error) {
|
|
|
return importer.ImportFiles(func(basedir string, fpath string) FWOP {
|
|
|
if !reedgefile.MatchString(filepath.Base(fpath)) {
|
|
|
// 忽略非EDGE文件
|
|
|
return FWOP_IGNORE
|
|
|
}
|
|
|
return FWOP_CONTINUE
|
|
|
- })
|
|
|
+ }, logstatus)
|
|
|
}
|
|
|
|
|
|
-func (importer *Importer) ImportNonEdgeFiles(reedgefile *regexp.Regexp) (filescount, recordscount int64, usetime time.Duration, err error) {
|
|
|
+func (importer *Importer) ImportNonEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount int64, usetime time.Duration, err error) {
|
|
|
return importer.ImportFiles(func(basedir string, fpath string) FWOP {
|
|
|
if reedgefile.MatchString(filepath.Base(fpath)) {
|
|
|
// 忽略EDGE文件
|
|
|
return FWOP_IGNORE
|
|
|
}
|
|
|
return FWOP_CONTINUE
|
|
|
- })
|
|
|
+ }, logstatus)
|
|
|
}
|
|
|
|
|
|
type FWOP int
|
|
@@ -141,7 +138,7 @@ const (
|
|
|
FWOP_CONTINUE
|
|
|
)
|
|
|
|
|
|
-func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FWOP) (filescount, recordscount int64, usetime time.Duration, err error) {
|
|
|
+func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FWOP, logstatus bool) (filescount, linescount, recordscount int64, usetime time.Duration, err error) {
|
|
|
// 遍历文件目录
|
|
|
var wg sync.WaitGroup
|
|
|
fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
|
|
@@ -170,30 +167,38 @@ func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FW
|
|
|
}
|
|
|
// 继续处理当前文件
|
|
|
filename := filepath.Join(basedir, fpath)
|
|
|
+ filescount++
|
|
|
wg.Add(1)
|
|
|
// 并发处理
|
|
|
importer.fileimportrc.ConcurCall(1,
|
|
|
func() {
|
|
|
defer wg.Done()
|
|
|
- logger.Info("import", "file", filename)
|
|
|
importer.importstatus.mutex.RLock()
|
|
|
importstatus := importer.importstatus.ImportStatus[filename]
|
|
|
importer.importstatus.mutex.RUnlock()
|
|
|
- importedrecordscount := int64(0)
|
|
|
+ linefrom, blockfrom := int64(0), int64(0)
|
|
|
if importstatus != nil {
|
|
|
- importedrecordscount = importstatus.RecordsCount
|
|
|
- return
|
|
|
+ linefrom, blockfrom = importstatus.LinesCount, importstatus.RecordsCount
|
|
|
}
|
|
|
- records, e := importer.ImportFile(filename, importedrecordscount)
|
|
|
+ if linefrom == 0 {
|
|
|
+ logger.Info("import", "file", filename)
|
|
|
+ } else {
|
|
|
+ logger.Info("import", "file", filename, "from line", linefrom)
|
|
|
+ }
|
|
|
+ lines, records, e := importer.ImportFile(filename, linefrom, blockfrom, logstatus)
|
|
|
if e != nil {
|
|
|
err = e
|
|
|
return
|
|
|
}
|
|
|
- atomic.AddInt64(&filescount, 1)
|
|
|
- atomic.AddInt64(&recordscount, records)
|
|
|
+ atomic.AddInt64(&linescount, lines-linefrom)
|
|
|
+ atomic.AddInt64(&recordscount, records-blockfrom)
|
|
|
usetime = time.Since(importer.currentstarttime)
|
|
|
importer.importstatus.mutex.Lock()
|
|
|
- importer.importstatus.ImportStatus[filename] = &ImportStatus{RecordsCount: importedrecordscount + records}
|
|
|
+ if logstatus {
|
|
|
+ importer.importstatus.ImportStatus[filename] = &ImportStatus{
|
|
|
+ LinesCount: lines,
|
|
|
+ RecordsCount: records}
|
|
|
+ }
|
|
|
importer.importstatus.TotalUseTime = time.Since(importer.starttime)
|
|
|
importer.importstatus.mutex.Unlock()
|
|
|
importer.importstatus.Save()
|
|
@@ -214,16 +219,16 @@ func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FW
|
|
|
return
|
|
|
}
|
|
|
|
|
|
-func (importer *Importer) ImportFile(filepath string, skiprecordscount int64) (blockcount int64, err error) {
|
|
|
+func (importer *Importer) ImportFile(filepath string, linefrom, blockfrom int64, logstatus bool) (linecount, blockcount int64, err error) {
|
|
|
f, e := os.Open(filepath)
|
|
|
if e != nil {
|
|
|
- return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}})
|
|
|
+ return linecount, blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}})
|
|
|
}
|
|
|
defer f.Close()
|
|
|
- return importer.importReader(filepath, f, skiprecordscount)
|
|
|
+ return importer.importReader(filepath, f, linefrom, blockfrom, logstatus)
|
|
|
}
|
|
|
|
|
|
-func (importer *Importer) importReader(filename string, buf io.Reader, skiprecordscount int64) (blockcount int64, err error) {
|
|
|
+func (importer *Importer) importReader(filename string, buf io.Reader, linefrom, blockfrom int64, logstatus bool) (linecount, blockcount int64, err error) {
|
|
|
var filetype schema.FileType
|
|
|
switch {
|
|
|
case strings.Contains(filename, "_L1_"):
|
|
@@ -252,39 +257,67 @@ func (importer *Importer) importReader(filename string, buf io.Reader, skiprecor
|
|
|
}
|
|
|
br, e := reader.NewBlockReader(filename, filetype, buf)
|
|
|
if e != nil {
|
|
|
- return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
|
|
|
+ return linecount, blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
|
|
|
}
|
|
|
+ lastlogtime := time.Now()
|
|
|
+ skiplines := int(linefrom)
|
|
|
+ blockcount = blockfrom
|
|
|
+ doinglines := []int64{}
|
|
|
var wg sync.WaitGroup
|
|
|
defer importer.done()
|
|
|
defer wg.Wait()
|
|
|
- n := int64(0)
|
|
|
for {
|
|
|
if err != nil {
|
|
|
break
|
|
|
}
|
|
|
- block, line, linecount, e := br.ReadBlock()
|
|
|
+ block, line, linenumber, e := br.ReadBlock(skiplines)
|
|
|
+ linecount = int64(linenumber)
|
|
|
if e != nil {
|
|
|
- return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
|
|
|
+ return linecount, blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
|
|
|
}
|
|
|
if block == nil {
|
|
|
return
|
|
|
}
|
|
|
- n++
|
|
|
- if n <= skiprecordscount {
|
|
|
- continue
|
|
|
- }
|
|
|
+ blockcount++
|
|
|
wg.Add(1)
|
|
|
+ doingline := linecount
|
|
|
+ doingblock := blockcount
|
|
|
+ if logstatus {
|
|
|
+ doinglines = append(doinglines, doingline)
|
|
|
+ }
|
|
|
e = importer.odbcqueryrc.ConcurCall(1, func() {
|
|
|
defer wg.Done()
|
|
|
- e = importer.importRecord(block, line, filename, filetype, linecount)
|
|
|
+ e = importer.importRecord(block, line, filename, filetype, int(doingline))
|
|
|
if e != nil {
|
|
|
- err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
|
|
|
+ err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(doingline)}, {"line": line}})
|
|
|
return
|
|
|
}
|
|
|
- atomic.AddInt64(&blockcount, 1)
|
|
|
+ if logstatus {
|
|
|
+ if doingline == doinglines[0] {
|
|
|
+ importer.importstatus.mutex.Lock()
|
|
|
+ importer.importstatus.ImportStatus[filename] = &ImportStatus{
|
|
|
+ LinesCount: doingline,
|
|
|
+ RecordsCount: doingblock,
|
|
|
+ }
|
|
|
+ importer.importstatus.TotalUseTime = time.Since(importer.starttime)
|
|
|
+ importer.importstatus.Save()
|
|
|
+ doinglines = doinglines[1:]
|
|
|
+ if time.Since(lastlogtime) > 5*time.Second {
|
|
|
+ logger.Info("file", filename, "imported", doingblock, "records")
|
|
|
+ lastlogtime = time.Now()
|
|
|
+ }
|
|
|
+ importer.importstatus.mutex.Unlock()
|
|
|
+ } else {
|
|
|
+ for i, l := range doinglines {
|
|
|
+ if l == doingline {
|
|
|
+ doinglines = append(doinglines[:i], doinglines[i+1:]...)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
})
|
|
|
if e != nil {
|
|
|
- return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
|
|
|
+ return linecount, blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
|
|
|
}
|
|
|
}
|
|
|
return
|