package importer

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"regexp"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.wecise.com/wecise/cgimport/graph"
	"git.wecise.com/wecise/cgimport/odbc"
	"git.wecise.com/wecise/cgimport/reader"
	"git.wecise.com/wecise/cgimport/schema"
	"github.com/wecisecode/util/cmap"
	"github.com/wecisecode/util/filewalker"
	"github.com/wecisecode/util/merrs"
	"github.com/wecisecode/util/pattern"
	"github.com/wecisecode/util/rc"
)

var mcfg = odbc.Config
var logger = odbc.Logger

type Importer struct {
	datapath         string
	parallel         int
	rebuild          bool
	reload           bool
	importstatus     *CGIStatus
	fileimportrc     *rc.RoutinesController
	odbcqueryrc      *rc.RoutinesController
	odbcimporter     *ODBCImporter
	starttime        time.Time
	currentstarttime time.Time
	lastlogtime      cmap.ConcurrentMap[string, time.Time]
}

func ImportDir(datapath string, parallel int, rebuild, reload bool) (totalfilescount, totallinecount, totalrecordscount, totalretrycount int64, totalusetime time.Duration,
	filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
	concurlimit := mcfg.GetInt("odbc.concurrent.limit", runtime.GOMAXPROCS(0))
	importer := &Importer{
		datapath:     datapath,
		parallel:     parallel,
		rebuild:      rebuild,
		reload:       reload,
		importstatus: NewCGIStatus(),
		fileimportrc: rc.NewRoutinesController("", parallel),
		odbcqueryrc:  rc.NewRoutinesControllerLimit("", concurlimit, concurlimit*5),
		odbcimporter: NewODBCImporter(),
		lastlogtime:  cmap.NewSingle[string, time.Time](),
	}
	return importer.Import()
}
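// Usage sketch (illustrative only, not part of the original source): a caller
// in another package might drive a full import like this; the data path and
// parallelism are hypothetical values:
//
//	tf, tl, tr, trt, tt, f, l, r, rt, ut, err := importer.ImportDir("/data/cgi", 8, false, false)
//	if err != nil {
//		panic(err)
//	}
//	fmt.Println("this run:", f, "files,", l, "lines,", r, "records,", rt, "retries in", ut)
//	fmt.Println("cumulative:", tf, "files,", tl, "lines,", tr, "records,", trt, "retries in", tt)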
func (importer *Importer) Import() (totalfilescount, totallinecount, totalrecordscount, totalretrycount int64, totalusetime time.Duration,
	filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
	if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && !importer.reload && !importer.rebuild {
		// resume: restore the status saved by a previous run
		err = importer.importstatus.Load()
		if err != nil {
			return
		}
	}
	err = importer.odbcimporter.init(importer.rebuild)
	if err != nil {
		return
	}
	// logger.Info("graph data import start")
	totalusetime = importer.importstatus.TotalUseTime
	importer.starttime = time.Now().Add(-totalusetime)
	importer.currentstarttime = time.Now()
	// EDGE files: name contains "edge" (case-insensitive) followed by ".csv"
	reedgefile := regexp.MustCompile(`(?i).*edge.*\.csv`)
	efc, elc, erc, etc, ut, e := importer.ImportEdgeFiles(reedgefile, false)
	if e != nil {
		err = e
		return
	}
	afc, alc, arc, atc, ut, e := importer.ImportNonEdgeFiles(reedgefile, true)
	if e != nil {
		err = e
		return
	}
	totalfilescount = int64(len(importer.importstatus.ImportStatus)) + efc
	for _, v := range importer.importstatus.ImportStatus {
		totallinecount += v.LinesCount
		totalrecordscount += v.RecordsCount
		totalretrycount += v.RetryCount
	}
	totallinecount += elc
	totalrecordscount += erc
	totalretrycount += etc
	filescount = afc + efc
	linescount = alc + elc
	recordscount = arc + erc
	retrycount = atc + etc
	usetime = ut
	totalusetime = importer.importstatus.TotalUseTime
	importer.importstatus.WaitSaveDone()
	importer.alldone()
	return
}

func (importer *Importer) ImportEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount, retrycount int64, usetime time.Duration, err error) {
	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
		if !reedgefile.MatchString(filepath.Base(fpath)) {
			// skip non-EDGE files
			return FWOP_IGNORE
		}
		return FWOP_CONTINUE
	}, logstatus)
}

func (importer *Importer) ImportNonEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount, retrycount int64, usetime time.Duration, err error) {
	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
		if reedgefile.MatchString(filepath.Base(fpath)) {
			// skip EDGE files
			return FWOP_IGNORE
		}
		return FWOP_CONTINUE
	}, logstatus)
}

type FWOP int

const (
	FWOP_IGNORE FWOP = iota + 1
	FWOP_BREAK
	FWOP_CONTINUE
)

func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FWOP, logstatus bool) (filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
	// walk the data directory
	var wg sync.WaitGroup
	fw, e := filewalker.NewFileWalker([]string{importer.datapath}, `^[^\.].*`)
	if e != nil {
		err = e
		return
	}
	e = fw.List(func(basedir string, fpath string) bool {
		if err != nil {
			// an earlier error occurred, stop walking
			return false
		}
		if strings.Contains(fpath, string(filepath.Separator)) {
			// skip subdirectories; fw.List is ordered with directories after files,
			// so the walk can stop at the first subdirectory entry
			return false
		}
		switch fwop(basedir, fpath) {
		case FWOP_IGNORE:
			// skip this file, continue with the next one
			return true
		case FWOP_BREAK:
			// stop walking
			return false
		case FWOP_CONTINUE:
		default:
		}
		// process the current file
		filename := filepath.Join(basedir, fpath)
		filescount++
		wg.Add(1)
		// import files concurrently
		importer.fileimportrc.ConcurCall(1, func() {
			defer wg.Done()
			importer.importstatus.mutex.RLock()
			importstatus := importer.importstatus.ImportStatus[filename]
			importer.importstatus.mutex.RUnlock()
			linefrom, blockfrom := int64(0), int64(0)
			totalretrycount := int64(0)
			if importstatus != nil {
				linefrom, blockfrom, totalretrycount = importstatus.LinesCount, importstatus.RecordsCount, importstatus.RetryCount
			}
			if linefrom == 0 {
				logger.Info("import", "file", filename)
			} else {
				logger.Info("import", "file", filename, "from line", linefrom)
			}
			lines, records, retries, e := importer.ImportFile(filename, linefrom, blockfrom, totalretrycount, logstatus)
			if e != nil {
				err = e
				return
			}
			atomic.AddInt64(&linescount, lines-linefrom)
			atomic.AddInt64(&recordscount, records-blockfrom)
			atomic.AddInt64(&retrycount, retries-totalretrycount)
			usetime = time.Since(importer.currentstarttime)
			importer.importstatus.mutex.Lock()
			if logstatus {
				importer.importstatus.ImportStatus[filename] = &ImportStatus{
					LinesCount:   lines,
					RecordsCount: records,
					RetryCount:   retries,
				}
			}
			importer.importstatus.TotalUseTime = time.Since(importer.starttime)
			importer.importstatus.mutex.Unlock()
			importer.importstatus.Save()
			if retries > 0 {
				logger.Info("file", filename, "total imported", lines, "lines", records, "records", retries, "retry times")
			} else {
				logger.Info("file", filename, "total imported", lines, "lines", records, "records")
			}
		})
		return true
	})
	wg.Wait()
	if e != nil {
		if os.IsNotExist(e) {
			err = merrs.NewError(`directory "`+importer.datapath+`" specified by "datapath" does not exist`, e)
		} else {
			err = merrs.NewError(e)
		}
		return
	}
	return
}

func (importer *Importer) ImportFile(fpath string, linefrom, blockfrom, totalretrycount int64, logstatus bool) (linecount, blockcount, retrycount int64, err error) {
	f, e := os.Open(fpath)
	if e != nil {
		return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": fpath}})
	}
	defer f.Close()
	return importer.importReader(fpath, f, linefrom, blockfrom, totalretrycount, logstatus)
}

var fileclassmapping = cmap.NewSingle[string, *regexp.Regexp]()

func init() {
	fileclassmapping.Set("EDGE", regexp.MustCompile(pattern.Contain2RegexpString(`EDGE`)))
	fileclassmapping.Set("master", regexp.MustCompile(pattern.Contain2RegexpString(`MASTER`)))
	fileclassmapping.Set("level1", regexp.MustCompile(pattern.Contain2RegexpString(`_L1_`)))
	fileclassmapping.Set("level2", regexp.MustCompile(pattern.Contain2RegexpString(`_L2_`)))
	fileclassmapping.Set("level3", regexp.MustCompile(pattern.Contain2RegexpString(`_L3_`)))
	fileclassmapping.Set("level4", regexp.MustCompile(pattern.Contain2RegexpString(`_L4_`)))
	fileclassmapping.Set("level5", regexp.MustCompile(pattern.Contain2RegexpString(`_L5_`)))
	fileclassmapping.Set("level6", regexp.MustCompile(pattern.Contain2RegexpString(`_L6_`)))
	fileclassmapping.Set("level7", regexp.MustCompile(pattern.Contain2RegexpString(`_L7_`)))
	fileclassmapping.Set("level8", regexp.MustCompile(pattern.Contain2RegexpString(`_L8_`)))
	keys := mcfg.Keys()
	for _, key := range keys {
		if strings.HasPrefix(key, "cgi.mapping.class.") {
			classname := key[len("cgi.mapping.class."):]
			filepatterns := mcfg.GetStrings(key)
			for _, fp := range filepatterns {
				fp = pattern.Wildcard2RegexpString(fp)
				fileclassmapping.Set(classname, regexp.MustCompile(fp))
			}
		}
	}
}
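// Configuration sketch (the key and pattern below are hypothetical): beyond
// the built-in mappings above, "cgi.mapping.class.<classname>" config keys add
// extra filename-to-class rules, each value holding one or more wildcard
// patterns that init compiles to regexps. For example, a config entry such as
//
//	cgi.mapping.class.switch = *_SW_*.csv
//
// would route any file whose name matches *_SW_*.csv to the "switch" class.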
func (importer *Importer) importReader(filename string, buf io.Reader, linefrom, blockfrom, totalretrycount int64, logstatus bool) (linecount, blockcount, retrycount int64, err error) {
	// resolve the class name from the file name via the registered patterns
	classname := ""
	fileclassmapping.Fetch(func(key string, v *regexp.Regexp) bool {
		if v.MatchString(filename) {
			classname = key
			return false
		}
		return true
	})
	if classname == "" {
		err = merrs.NewError("cannot map filename to a class, filename: " + filename)
		return
	}
	br, e := reader.NewBlockReader(filename, classname, buf, importer.odbcimporter.schema)
	if e != nil {
		return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
	}
	skiplines := int(linefrom)
	blockcount = blockfrom
	doinglines := []int64{}
	donelines := linefrom
	doneblocks := blockfrom
	savedlines := linefrom
	savedblocks := blockfrom
	retrycount = totalretrycount
	linecount = linefrom
	// maxresponsetime := time.Duration(0)
	var wg sync.WaitGroup
	// deferred calls run LIFO: wait for in-flight blocks first, then signal done
	defer importer.done()
	defer wg.Wait()
	for {
		if err != nil {
			return
		}
		lastlinecount := linecount
		block, line, linenumber, e := br.ReadBlock(skiplines)
		linecount = int64(linenumber)
		if e != nil {
			return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
		if block == nil {
			// end of file
			return
		}
		blockcount++
		wg.Add(1)
		doingline := linecount
		doingblock := blockcount
		if logstatus {
			doinglines = append(doinglines, doingline)
		}
		e = importer.odbcqueryrc.ConcurCall(1, func() {
			defer wg.Done()
			if err != nil {
				return
			}
			// st := time.Now()
			// defer func() {
			// 	ut := time.Since(st)
			// 	if ut > maxresponsetime {
			// 		maxresponsetime = ut
			// 	}
			// }()
			// logger.Info("G:", runtime.NumGoroutine(),
			// 	"RC:", importer.fileimportrc.ConcurCount(),
			// 	"WC:", importer.odbcqueryrc.ConcurCount(),
			// 	"RQ:", importer.fileimportrc.QueueCount(),
			// 	"WQ:", importer.odbcqueryrc.QueueCount(),
			// 	"maxresponsetime:", maxresponsetime)
			rtc, e := importer.importRecord(block, line, filename, classname, int(doingline))
			atomic.AddInt64(&retrycount, int64(rtc))
			if e != nil {
				err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(doingline)}, {"line": line}})
				return
			}
			atomic.AddInt64(&donelines, doingline-lastlinecount)
			atomic.AddInt64(&doneblocks, 1)
			if logstatus {
				// highest line number handed out so far
				readinglines := doinglines[len(doinglines)-1]
				if doingline == doinglines[0] {
					// the oldest outstanding block finished; persist progress up to here
					importer.importstatus.mutex.Lock()
					importer.importstatus.ImportStatus[filename] = &ImportStatus{
						LinesCount:   doingline,
						RecordsCount: doingblock,
						RetryCount:   retrycount,
					}
					importer.importstatus.TotalUseTime = time.Since(importer.starttime)
					importer.importstatus.Save()
					savedlines = doingline
					savedblocks = doingblock
					doinglines = doinglines[1:]
					importer.importstatus.mutex.Unlock()
				} else {
					// a later block finished out of order; just drop it from the pending list
					for i, l := range doinglines {
						if l == doingline {
							doinglines = append(doinglines[:i], doinglines[i+1:]...)
							break
						}
					}
				}
				importer.logInfo(filename, readinglines, doinglines, donelines, doneblocks, savedlines, savedblocks, retrycount)
			}
		})
		if e != nil {
			return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
	}
}
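// Checkpointing note: blocks complete out of order, so importReader only
// persists progress when the finished block is the oldest outstanding one
// (doinglines[0]). For example, with pending lines [100, 200, 300], finishing
// line 200 merely shrinks the list to [100, 300], while finishing line 100
// saves status up to line 100 and leaves [200, 300]. The saved LinesCount is
// therefore a safe resume point: every line before it has been imported.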
func (importer *Importer) logInfo(filename string, readinglines int64, doinglines []int64, donelines, doneblocks, savedlines, savedblocks, retrycount int64) {
	if time.Since(importer.lastlogtime.GetIFPresent(filename)) > 5*time.Second {
		if odbc.LogDebug {
			if retrycount > 0 {
				logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,",
					"imported", donelines, "lines", doneblocks, "records,",
					"saved", savedlines, "lines", savedblocks, "records", retrycount, "retry times")
			} else {
				logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,",
					"imported", donelines, "lines", doneblocks, "records,",
					"saved", savedlines, "lines", savedblocks, "records")
			}
		} else {
			if retrycount > 0 {
				logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,",
					"imported", donelines, "lines", doneblocks, "records", retrycount, "retry times")
			} else {
				logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,",
					"imported", donelines, "lines", doneblocks, "records")
			}
		}
		importer.lastlogtime.Set(filename, time.Now())
	}
}

func (importer *Importer) importRecord(record map[string]any, line string, filename string, classaliasname string, linecount int) (retrycount int, err error) {
	if odbc.LogDebug {
		bs, e := json.MarshalIndent(record, "", " ")
		if e != nil {
			return 0, merrs.NewError(e)
		}
		logger.Trace(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
	}
	switch classaliasname {
	case schema.EDGE:
		graph.CacheEdgeInfo(record)
	default:
		retrycount, err = importer.odbcimporter.InsertData(classaliasname, record)
		if err != nil {
			err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
			return
		}
	}
	return
}

func (importer *Importer) alldone() {
	importer.odbcimporter.alldone()
}

func (importer *Importer) done() {
	importer.odbcimporter.done()
}

func Check() {
	client := odbc.ODBClient
	if client == nil {
		return
	}
	{
		// mql := "select id,uniqueid,tags,contain,day,vtime from level1 where uniqueid='E2E:OTR0002L'"
		mql := "select * from level1 where uniqueid='E2E:OTR0002L'"
		r, e := client.Query(mql).Do()
		if e != nil {
			panic(merrs.NewError(e))
		}
		bs, _ := json.MarshalIndent(r.Data, "", " ")
		fmt.Println(string(bs))
	}
	{
		mql := `match ("level1:E2E:OTR0002L")-[*]->(),("level1:E2E:OTR0002L")<-[*]-() with namespace="m3cnet", fields="uniqueid,distname" in "level1","level2"`
		r, e := client.Query(mql).Do()
		if e != nil {
			panic(merrs.NewError(e))
		}
		bs, _ := json.MarshalIndent(r.Data, "", " ")
		fmt.Println(string(bs))
	}
}
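// Verification sketch (illustrative): after an import completes, Check can
// spot-check a known node and its edges, assuming the surrounding application
// has initialized odbc.ODBClient:
//
//	if _, _, _, _, _, _, _, _, _, _, err := importer.ImportDir(datapath, parallel, rebuild, reload); err == nil {
//		importer.Check()
//	}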