package importer

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"regexp"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.wecise.com/wecise/cgimport/graph"
	"git.wecise.com/wecise/cgimport/odbc"
	"git.wecise.com/wecise/cgimport/reader"
	"git.wecise.com/wecise/cgimport/schema"
	"git.wecise.com/wecise/util/filewalker"
	"git.wecise.com/wecise/util/rc"
	"github.com/wecisecode/util/merrs"
)

var mcfg = odbc.Config
var logger = odbc.Logger

// Importer drives a parallel import of CSV data files from a directory into
// the ODBC backend, keeping a resumable progress status (CGIStatus) so an
// interrupted run can continue where it left off.
type Importer struct {
	datapath         string                 // root directory containing the data files
	parallel         int                    // number of concurrent file-import workers
	reload           bool                   // true: drop existing classes and re-import everything
	importstatus     *CGIStatus             // persisted per-file progress for resume support
	fileimportrc     *rc.RoutinesController // limits concurrent file imports
	odbcqueryrc      *rc.RoutinesController // limits concurrent ODBC insert calls
	odbcimporter     *ODBCImporter          // backend writer
	starttime        time.Time              // virtual start, shifted back by previously accumulated use time
	currentstarttime time.Time              // start of the current run
}

// ImportDir imports every data file under datapath with the given parallelism.
// It returns cumulative totals (including previous resumed runs) and the
// counts/time of the current run only.
func ImportDir(datapath string, parallel int, reload bool) (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
	concurlimt := mcfg.GetInt("odbc.concurrent.limit", parallel*5)
	importer := &Importer{
		datapath:     datapath,
		parallel:     parallel,
		reload:       reload,
		importstatus: NewCGIStatus(),
		fileimportrc: rc.NewRoutinesController("", parallel),
		odbcqueryrc:  rc.NewRoutinesControllerLimit("", concurlimt, concurlimt*2),
		odbcimporter: NewODBCImporter(),
	}
	return importer.Import()
}

// Import runs the whole import: restores or resets progress, revises the class
// structure, then imports edge files first and all remaining files second
// (edge info must be cached before vertex data references it — TODO confirm
// ordering rationale with graph.CacheEdgeInfo).
func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
	if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && !importer.reload {
		// Continue mode: restore previous progress and init the backend.
		err = importer.importstatus.Load()
		if err != nil {
			return
		}
		err = importer.odbcimporter.init()
		if err != nil {
			return
		}
	} else {
		// Reload mode: drop existing classes before re-importing.
		err = importer.odbcimporter.reload()
		if err != nil {
			return
		}
	}
	// Create / revise classes.
	err = importer.odbcimporter.ReviseClassStruct()
	if err != nil {
		return
	}
	totalfilescount = int64(len(importer.importstatus.ImportStatus))
	for _, v := range importer.importstatus.ImportStatus {
		totalrecordscount += v.RecordsCount
	}
	totalusetime = importer.importstatus.TotalUseTime
	// Shift starttime back so time.Since(starttime) includes previous runs.
	importer.starttime = time.Now().Add(-totalusetime)
	importer.currentstarttime = time.Now()
	// BUGFIX: the dot before "csv" was unescaped and unanchored ("(?i).*edge.*.csv"),
	// which also matched names like "edgeXcsvZ"; escape it and anchor at the end.
	reedgefile := regexp.MustCompile(`(?i).*edge.*\.csv$`)
	// NOTE: renamed from "rc" — the original shadowed the imported rc package.
	fc, rcnt, ut, e := importer.ImportEdgeFiles(reedgefile)
	if e != nil {
		err = e
		return
	}
	totalfilescount += fc
	totalrecordscount += rcnt
	filescount += fc
	recordscount += rcnt
	usetime += ut
	totalusetime = importer.importstatus.TotalUseTime
	fc, rcnt, ut, e = importer.ImportNonEdgeFiles(reedgefile)
	if e != nil {
		err = e
		return
	}
	totalfilescount += fc
	totalrecordscount += rcnt
	filescount += fc
	recordscount += rcnt
	usetime += ut
	totalusetime = importer.importstatus.TotalUseTime
	importer.importstatus.WaitSaveDone()
	importer.alldone()
	return
}

// ImportEdgeFiles imports only the files whose base name matches reedgefile.
func (importer *Importer) ImportEdgeFiles(reedgefile *regexp.Regexp) (filescount, recordscount int64, usetime time.Duration, err error) {
	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
		if !reedgefile.MatchString(filepath.Base(fpath)) {
			// Skip non-EDGE files.
			return FWOP_IGNORE
		}
		return FWOP_CONTINUE
	})
}

// ImportNonEdgeFiles imports only the files whose base name does NOT match reedgefile.
func (importer *Importer) ImportNonEdgeFiles(reedgefile *regexp.Regexp) (filescount, recordscount int64, usetime time.Duration, err error) {
	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
		if reedgefile.MatchString(filepath.Base(fpath)) {
			// Skip EDGE files.
			return FWOP_IGNORE
		}
		return FWOP_CONTINUE
	})
}

// FWOP tells ImportFiles what to do with a listed file.
type FWOP int

const (
	FWOP_IGNORE   FWOP = iota + 1 // skip this file, keep walking
	FWOP_BREAK                    // stop the walk entirely
	FWOP_CONTINUE                 // import this file
)

// ImportFiles walks importer.datapath (top level only) and imports every file
// fwop selects, using the fileimportrc controller for bounded concurrency.
// Files already present in importstatus are skipped (resume support).
func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FWOP) (filescount, recordscount int64, usetime time.Duration, err error) {
	var wg sync.WaitGroup
	// BUGFIX: err was read by the walker callback and written by worker
	// goroutines without synchronization (data race); guard it with a mutex
	// and keep only the first error.
	var errmutex sync.Mutex
	geterr := func() error {
		errmutex.Lock()
		defer errmutex.Unlock()
		return err
	}
	seterr := func(e error) {
		errmutex.Lock()
		defer errmutex.Unlock()
		if err == nil {
			err = e
		}
	}
	fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
	if e != nil {
		err = e
		return
	}
	e = fw.List(func(basedir string, fpath string) bool {
		if geterr() != nil {
			// An earlier error occurred; stop walking.
			return false
		}
		if strings.Contains(fpath, string(filepath.Separator)) {
			// Skip subdirectories. fw.List is ordered with directories after
			// files, so the first subdirectory entry ends the walk.
			return false
		}
		switch fwop(basedir, fpath) {
		case FWOP_IGNORE:
			// Skip this file, continue with the next one.
			return true
		case FWOP_BREAK:
			// Stop the walk.
			return false
		case FWOP_CONTINUE:
		default:
		}
		// Import the current file concurrently.
		filename := filepath.Join(basedir, fpath)
		wg.Add(1)
		importer.fileimportrc.ConcurCall(1,
			func() {
				defer wg.Done()
				importer.importstatus.mutex.RLock()
				importstatus := importer.importstatus.ImportStatus[filename]
				importer.importstatus.mutex.RUnlock()
				if importstatus != nil {
					// Already imported in a previous run; nothing to do.
					return
				}
				records, e := importer.ImportFile(filename, 0)
				if e != nil {
					seterr(e)
					return
				}
				atomic.AddInt64(&filescount, 1)
				atomic.AddInt64(&recordscount, records)
				importer.importstatus.mutex.Lock()
				importer.importstatus.ImportStatus[filename] = &ImportStatus{RecordsCount: records}
				importer.importstatus.TotalUseTime = time.Since(importer.starttime)
				importer.importstatus.mutex.Unlock()
				importer.importstatus.Save()
			},
		)
		return true
	})
	wg.Wait()
	// BUGFIX: usetime was written concurrently by every worker; measure it
	// once after all workers are done instead.
	usetime = time.Since(importer.currentstarttime)
	if e != nil {
		if os.IsNotExist(e) {
			err = merrs.NewError(`directory "`+importer.datapath+`" not exist specified by "datapath"`, e)
		} else {
			err = merrs.NewError(e)
		}
		return
	}
	return
}

// ImportFile opens the file and streams its records into the backend,
// skipping the first skiprecordscount records (already imported).
func (importer *Importer) ImportFile(filepath string, skiprecordscount int64) (blockcount int64, err error) {
	f, e := os.Open(filepath)
	if e != nil {
		return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}})
	}
	defer f.Close()
	return importer.importReader(filepath, f, skiprecordscount)
}

// importReader determines the schema file type from the file name, then reads
// blocks and imports them concurrently through the odbcqueryrc controller.
// blockcount is the number of records imported in this call (skipped records
// are not counted).
func (importer *Importer) importReader(filename string, buf io.Reader, skiprecordscount int64) (blockcount int64, err error) {
	var filetype schema.FileType
	switch {
	case strings.Contains(filename, "_L1_"):
		filetype = schema.FT_LEVEL1
	case strings.Contains(filename, "_L2_"):
		filetype = schema.FT_LEVEL2
	case strings.Contains(filename, "_L3_"):
		filetype = schema.FT_LEVEL3
	case strings.Contains(filename, "_L4_"):
		filetype = schema.FT_LEVEL4
	case strings.Contains(filename, "_L5_"):
		filetype = schema.FT_LEVEL5
	case strings.Contains(filename, "_L6_"):
		filetype = schema.FT_LEVEL6
	case strings.Contains(filename, "_L7_"):
		filetype = schema.FT_LEVEL7
	case strings.Contains(filename, "_L8_"):
		filetype = schema.FT_LEVEL8
	case strings.Contains(filename, "MASTER"):
		filetype = schema.FT_MASTER
	case strings.Contains(filename, "EDGE"):
		filetype = schema.FT_EDGE
	default:
		err = merrs.NewError("filename does not conform to the agreed format " + filename)
		return
	}
	br, e := reader.NewBlockReader(filename, filetype, buf)
	if e != nil {
		return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
	}
	// BUGFIX: err was read by the loop and written by worker goroutines
	// without synchronization; guard it with a mutex, first error wins.
	var errmutex sync.Mutex
	geterr := func() error {
		errmutex.Lock()
		defer errmutex.Unlock()
		return err
	}
	seterr := func(e error) {
		errmutex.Lock()
		defer errmutex.Unlock()
		if err == nil {
			err = e
		}
	}
	var wg sync.WaitGroup
	// Defers run LIFO: wait for all in-flight record imports, then signal done.
	defer importer.done()
	defer wg.Wait()
	n := int64(0)
	for {
		if geterr() != nil {
			break
		}
		block, line, linecount, e := br.ReadBlock()
		if e != nil {
			return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
		if block == nil {
			// End of input; deferred wg.Wait still collects worker errors
			// into the named return err before the function actually returns.
			return
		}
		n++
		if n <= skiprecordscount {
			continue
		}
		wg.Add(1)
		// BUGFIX: the closure previously assigned the outer loop variable e,
		// racing with ConcurCall's own return value; use closure-local vars.
		ce := importer.odbcqueryrc.ConcurCall(1, func() {
			defer wg.Done()
			ie := importer.importRecord(block, line, filename, filetype, linecount)
			if ie != nil {
				seterr(merrs.NewError(ie, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}}))
				return
			}
			atomic.AddInt64(&blockcount, 1)
		})
		if ce != nil {
			return blockcount, merrs.NewError(ce, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
	}
	return
}

// importRecord writes one parsed record to the backend: EDGE records are
// cached as graph edge info, every other file type is inserted into the class
// whose alias is the file type name.
func (importer *Importer) importRecord(record map[string]any, line string, filename string, filetype schema.FileType, linecount int) (err error) {
	if odbc.LogDebug {
		bs, e := json.MarshalIndent(record, "", "  ")
		if e != nil {
			return merrs.NewError(e)
		}
		logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
	}
	var classaliasname string
	switch filetype {
	case schema.FT_EDGE:
		graph.CacheEdgeInfo(record)
	default:
		classaliasname = string(filetype)
		err = importer.odbcimporter.InsertData(classaliasname, record)
		if err != nil {
			err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
			return
		}
	}
	return
}

// alldone signals the backend that the whole import finished.
func (importer *Importer) alldone() {
	importer.odbcimporter.alldone()
}

// done signals the backend that one file finished.
func (importer *Importer) done() {
	importer.odbcimporter.done()
}

// Check runs two sanity queries against the ODBC backend and prints the
// results; it is a no-op when no client is configured. Panics on query error
// (diagnostic tool behavior).
func Check() {
	client := odbc.ODBClient
	if client == nil {
		return
	}
	{
		// mql := "select id,uniqueid,tags,contain,day,vtime from level1 where uniqueid='E2E:OTR0002L'"
		mql := "select * from level1 where uniqueid='E2E:OTR0002L'"
		r, e := client.Query(mql).Do()
		if e != nil {
			panic(merrs.NewError(e))
		}
		bs, _ := json.MarshalIndent(r.Data, "", "  ")
		fmt.Println(string(bs))
	}
	{
		mql := `match ("level1:E2E:OTR0002L")-[*]->(),("level1:E2E:OTR0002L")<-[*]-() in "level1","level2"`
		r, e := client.Query(mql).Do()
		if e != nil {
			panic(merrs.NewError(e))
		}
		bs, _ := json.MarshalIndent(r.Data, "", "  ")
		fmt.Println(string(bs))
	}
}