|
@@ -6,11 +6,13 @@ import (
|
|
|
"io"
|
|
|
"os"
|
|
|
"path/filepath"
|
|
|
+ "regexp"
|
|
|
"strings"
|
|
|
"sync"
|
|
|
"sync/atomic"
|
|
|
"time"
|
|
|
|
|
|
+ "git.wecise.com/wecise/cgimport/graph"
|
|
|
"git.wecise.com/wecise/cgimport/odbc"
|
|
|
"git.wecise.com/wecise/cgimport/reader"
|
|
|
"git.wecise.com/wecise/cgimport/schema"
|
|
@@ -23,11 +25,15 @@ var mcfg = odbc.Config
|
|
|
var logger = odbc.Logger
|
|
|
|
|
|
type Importer struct {
|
|
|
- datapath string
|
|
|
- parallel int
|
|
|
- reload bool
|
|
|
- importrc *rc.RoutinesController
|
|
|
- odbcimporter *ODBCImporter
|
|
|
+ datapath string
|
|
|
+ parallel int
|
|
|
+ reload bool
|
|
|
+ importstatus *CGIStatus
|
|
|
+ fileimportrc *rc.RoutinesController
|
|
|
+ odbcqueryrc *rc.RoutinesController
|
|
|
+ odbcimporter *ODBCImporter
|
|
|
+ starttime time.Time
|
|
|
+ currentstarttime time.Time
|
|
|
}
|
|
|
|
|
|
func ImportDir(datapath string, parallel int, reload bool) (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
|
|
@@ -35,23 +41,17 @@ func ImportDir(datapath string, parallel int, reload bool) (totalfilescount, tot
|
|
|
datapath: datapath,
|
|
|
parallel: parallel,
|
|
|
reload: reload,
|
|
|
- importrc: rc.NewRoutinesController("", 100),
|
|
|
+ importstatus: NewCGIStatus(),
|
|
|
+ fileimportrc: rc.NewRoutinesController("", parallel),
|
|
|
+ odbcqueryrc: rc.NewRoutinesController("", parallel*10),
|
|
|
odbcimporter: NewODBCImporter(),
|
|
|
}
|
|
|
return importer.Import()
|
|
|
}
|
|
|
|
|
|
func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
|
|
|
- var cgirc = rc.NewRoutinesController("", importer.parallel)
|
|
|
- var wg sync.WaitGroup
|
|
|
- fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
|
|
|
- if e != nil {
|
|
|
- err = e
|
|
|
- return
|
|
|
- }
|
|
|
- cgistatus := NewCGIStatus()
|
|
|
if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && !importer.reload {
|
|
|
- err = cgistatus.Load()
|
|
|
+ err = importer.importstatus.Load()
|
|
|
if err != nil {
|
|
|
return
|
|
|
}
|
|
@@ -68,45 +68,126 @@ func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, to
|
|
|
if err != nil {
|
|
|
return
|
|
|
}
|
|
|
- totalfilescount = int64(len(cgistatus.ImportStatus))
|
|
|
- for _, v := range cgistatus.ImportStatus {
|
|
|
+ totalfilescount = int64(len(importer.importstatus.ImportStatus))
|
|
|
+ for _, v := range importer.importstatus.ImportStatus {
|
|
|
totalrecordscount += v.RecordsCount
|
|
|
}
|
|
|
- totalusetime = cgistatus.TotalUseTime
|
|
|
- st := time.Now().Add(-totalusetime)
|
|
|
- cst := time.Now()
|
|
|
+ totalusetime = importer.importstatus.TotalUseTime
|
|
|
+ importer.starttime = time.Now().Add(-totalusetime)
|
|
|
+ importer.currentstarttime = time.Now()
|
|
|
+
|
|
|
+ reedgefile := regexp.MustCompile("(?i).*edge.*\\.csv")
|
|
|
+ fc, rc, ut, e := importer.ImportEdgeFiles(reedgefile)
|
|
|
+ if e != nil {
|
|
|
+ err = e
|
|
|
+ return
|
|
|
+ }
|
|
|
+ totalfilescount += fc
|
|
|
+ totalrecordscount += rc
|
|
|
+ filescount += fc
|
|
|
+ recordscount += rc
|
|
|
+ usetime += ut
|
|
|
+ totalusetime = importer.importstatus.TotalUseTime
|
|
|
+ fc, rc, ut, e = importer.ImportNonEdgeFiles(reedgefile)
|
|
|
+ if e != nil {
|
|
|
+ err = e
|
|
|
+ return
|
|
|
+ }
|
|
|
+ totalfilescount += fc
|
|
|
+ totalrecordscount += rc
|
|
|
+ filescount += fc
|
|
|
+ recordscount += rc
|
|
|
+ usetime += ut
|
|
|
+ totalusetime = importer.importstatus.TotalUseTime
|
|
|
+
|
|
|
+ importer.importstatus.WaitSaveDone()
|
|
|
+ importer.alldone()
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+func (importer *Importer) ImportEdgeFiles(reedgefile *regexp.Regexp) (filescount, recordscount int64, usetime time.Duration, err error) {
|
|
|
+ return importer.ImportFiles(func(basedir string, fpath string) FWOP {
|
|
|
+ if !reedgefile.MatchString(filepath.Base(fpath)) {
|
|
|
+ // 忽略非EDGE文件
|
|
|
+ return FWOP_IGNORE
|
|
|
+ }
|
|
|
+ return FWOP_CONTINUE
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+func (importer *Importer) ImportNonEdgeFiles(reedgefile *regexp.Regexp) (filescount, recordscount int64, usetime time.Duration, err error) {
|
|
|
+ return importer.ImportFiles(func(basedir string, fpath string) FWOP {
|
|
|
+ if reedgefile.MatchString(filepath.Base(fpath)) {
|
|
|
+ // 忽略EDGE文件
|
|
|
+ return FWOP_IGNORE
|
|
|
+ }
|
|
|
+ return FWOP_CONTINUE
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+type FWOP int
|
|
|
+
|
|
|
+const (
|
|
|
+ FWOP_IGNORE FWOP = iota + 1
|
|
|
+ FWOP_BREAK
|
|
|
+ FWOP_CONTINUE
|
|
|
+)
|
|
|
+
|
|
|
+func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FWOP) (filescount, recordscount int64, usetime time.Duration, err error) {
|
|
|
// 遍历文件目录
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
|
|
|
+ if e != nil {
|
|
|
+ err = e
|
|
|
+ return
|
|
|
+ }
|
|
|
e = fw.List(func(basedir string, fpath string) bool {
|
|
|
if err != nil {
|
|
|
+ // 前方发生错误,结束遍历
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ if strings.Contains(fpath, string(filepath.Separator)) {
|
|
|
+ // 忽略子目录,fw.List有序,目录排在文件后面,遇到子目录即可结束遍历
|
|
|
return false
|
|
|
}
|
|
|
+ switch fwop(basedir, fpath) {
|
|
|
+ case FWOP_IGNORE:
|
|
|
+ // 忽略当前文件,继续处理下一文件
|
|
|
+ return true
|
|
|
+ case FWOP_BREAK:
|
|
|
+ // 结束遍历
|
|
|
+ return false
|
|
|
+ case FWOP_CONTINUE:
|
|
|
+ default:
|
|
|
+ }
|
|
|
+ // 继续处理当前文件
|
|
|
filename := filepath.Join(basedir, fpath)
|
|
|
wg.Add(1)
|
|
|
- cgirc.ConcurCall(1,
|
|
|
+ // 并发处理
|
|
|
+ importer.fileimportrc.ConcurCall(1,
|
|
|
func() {
|
|
|
defer wg.Done()
|
|
|
- cgistatus.mutex.RLock()
|
|
|
- importstatus := cgistatus.ImportStatus[filename]
|
|
|
- cgistatus.mutex.RUnlock()
|
|
|
+ importer.importstatus.mutex.RLock()
|
|
|
+ importstatus := importer.importstatus.ImportStatus[filename]
|
|
|
+ importer.importstatus.mutex.RUnlock()
|
|
|
+ importedrecordscount := int64(0)
|
|
|
if importstatus != nil {
|
|
|
+ importedrecordscount = importstatus.RecordsCount
|
|
|
return
|
|
|
}
|
|
|
- records, e := importer.ImportFile(filename)
|
|
|
+ records, e := importer.ImportFile(filename, importedrecordscount)
|
|
|
if e != nil {
|
|
|
err = e
|
|
|
return
|
|
|
}
|
|
|
atomic.AddInt64(&filescount, 1)
|
|
|
atomic.AddInt64(&recordscount, records)
|
|
|
- atomic.AddInt64(&totalfilescount, 1)
|
|
|
- atomic.AddInt64(&totalrecordscount, records)
|
|
|
- usetime = time.Since(cst)
|
|
|
- totalusetime = time.Since(st)
|
|
|
- cgistatus.mutex.Lock()
|
|
|
- cgistatus.ImportStatus[filename] = &ImportStatus{RecordsCount: records}
|
|
|
- cgistatus.TotalUseTime = totalusetime
|
|
|
- cgistatus.mutex.Unlock()
|
|
|
- cgistatus.Save()
|
|
|
+ usetime = time.Since(importer.currentstarttime)
|
|
|
+ importer.importstatus.mutex.Lock()
|
|
|
+ importer.importstatus.ImportStatus[filename] = &ImportStatus{RecordsCount: importedrecordscount + records}
|
|
|
+ importer.importstatus.TotalUseTime = time.Since(importer.starttime)
|
|
|
+ importer.importstatus.mutex.Unlock()
|
|
|
+ importer.importstatus.Save()
|
|
|
},
|
|
|
)
|
|
|
return true
|
|
@@ -120,21 +201,19 @@ func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, to
|
|
|
}
|
|
|
return
|
|
|
}
|
|
|
- cgistatus.WaitSaveDone()
|
|
|
- importer.alldone()
|
|
|
return
|
|
|
}
|
|
|
|
|
|
-func (importer *Importer) ImportFile(filepath string) (blockcount int64, err error) {
|
|
|
+func (importer *Importer) ImportFile(filepath string, skiprecordscount int64) (blockcount int64, err error) {
|
|
|
f, e := os.Open(filepath)
|
|
|
if e != nil {
|
|
|
return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}})
|
|
|
}
|
|
|
defer f.Close()
|
|
|
- return importer.importReader(filepath, f)
|
|
|
+ return importer.importReader(filepath, f, skiprecordscount)
|
|
|
}
|
|
|
|
|
|
-func (importer *Importer) importReader(filename string, buf io.Reader) (blockcount int64, err error) {
|
|
|
+func (importer *Importer) importReader(filename string, buf io.Reader, skiprecordscount int64) (blockcount int64, err error) {
|
|
|
var filetype schema.FileType
|
|
|
switch {
|
|
|
case strings.Contains(filename, "_L1_"):
|
|
@@ -168,6 +247,7 @@ func (importer *Importer) importReader(filename string, buf io.Reader) (blockcou
|
|
|
var wg sync.WaitGroup
|
|
|
defer importer.done()
|
|
|
defer wg.Wait()
|
|
|
+ n := int64(0)
|
|
|
for {
|
|
|
if err != nil {
|
|
|
break
|
|
@@ -179,8 +259,12 @@ func (importer *Importer) importReader(filename string, buf io.Reader) (blockcou
|
|
|
if block == nil {
|
|
|
return
|
|
|
}
|
|
|
+ n++
|
|
|
+ if n <= skiprecordscount {
|
|
|
+ continue
|
|
|
+ }
|
|
|
wg.Add(1)
|
|
|
- e = importer.importrc.ConcurCall(1, func() {
|
|
|
+ e = importer.odbcqueryrc.ConcurCall(1, func() {
|
|
|
defer wg.Done()
|
|
|
e = importer.importRecord(block, line, filename, filetype, linecount)
|
|
|
if e != nil {
|
|
@@ -207,11 +291,12 @@ func (importer *Importer) importRecord(record map[string]any, line string, filen
|
|
|
var classname string
|
|
|
switch filetype {
|
|
|
case schema.FT_EDGE:
|
|
|
- err = importer.odbcimporter.InsertEdge(record)
|
|
|
- if err != nil {
|
|
|
- err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
|
|
|
- return
|
|
|
- }
|
|
|
+ // err = importer.odbcimporter.InsertEdge(record)
|
|
|
+ // if err != nil {
|
|
|
+ // err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
|
|
|
+ // return
|
|
|
+ // }
|
|
|
+ graph.CacheEdgeInfo(record)
|
|
|
default:
|
|
|
classname = string(filetype)
|
|
|
err = importer.odbcimporter.InsertData(classname, record)
|