package importer

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"regexp"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.wecise.com/wecise/cgimport/graph"
	"git.wecise.com/wecise/cgimport/odbc"
	"git.wecise.com/wecise/cgimport/reader"
	"git.wecise.com/wecise/cgimport/schema"
	"github.com/wecisecode/util/cmap"
	"github.com/wecisecode/util/filewalker"
	"github.com/wecisecode/util/merrs"
	"github.com/wecisecode/util/pattern"
	"github.com/wecisecode/util/rc"
)
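// mcfg and logger are the shared configuration and logging handles provided
// by the odbc package.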
var mcfg = odbc.Config
var logger = odbc.Logger
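// Importer drives a bulk import of graph data files from a directory into the
// target store. It tracks per-file progress in a CGIStatus so that an
// interrupted run can be resumed, and throttles file-level and query-level
// concurrency through two separate routines controllers.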
type Importer struct {
	datapath string // root directory of the data files
	parallel int    // number of files imported concurrently
	rebuild  bool   // re-initialize the target store before importing
	reload   bool   // ignore saved progress and import everything from scratch

	importstatus *CGIStatus             // per-file progress, persisted so runs can resume
	fileimportrc *rc.RoutinesController // limits concurrent file imports
	odbcqueryrc  *rc.RoutinesController // limits concurrent insert queries
	odbcimporter *ODBCImporter          // writes records to the target store

	starttime        time.Time // start time, shifted back by time spent in prior runs
	currentstarttime time.Time // start time of the current run

	lastlogtime cmap.ConcurrentMap[string, time.Time] // last progress-log time per file
}
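// ImportDir imports all data files under datapath with the given parallelism.
// rebuild re-initializes the target store before importing; reload skips the
// saved import status so everything is imported from scratch. The ODBC query
// concurrency limit is taken from the "odbc.concurrent.limit" configuration
// key and defaults to GOMAXPROCS.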
func ImportDir(datapath string, parallel int, rebuild, reload bool) (totalfilescount, totallinecount, totalrecordscount, totalretrycount int64, totalusetime time.Duration, filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
	concurlimit := mcfg.GetInt("odbc.concurrent.limit", runtime.GOMAXPROCS(0))
	importer := &Importer{
		datapath:     datapath,
		parallel:     parallel,
		rebuild:      rebuild,
		reload:       reload,
		importstatus: NewCGIStatus(),
		fileimportrc: rc.NewRoutinesController("", parallel),
		odbcqueryrc:  rc.NewRoutinesControllerLimit("", concurlimit, concurlimit*5),
		odbcimporter: NewODBCImporter(),
		lastlogtime:  cmap.NewSingle[string, time.Time](),
	}
	return importer.Import()
}
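// Import runs the full import. Edge files are imported first (their records
// are cached via graph.CacheEdgeInfo), then all other files are imported with
// progress logging. The return values report cumulative totals across resumed
// runs as well as the counts and elapsed time for this run alone.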
func (importer *Importer) Import() (totalfilescount, totallinecount, totalrecordscount, totalretrycount int64, totalusetime time.Duration, filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
	if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && !importer.reload && !importer.rebuild {
		// resume from the previously saved import status
		err = importer.importstatus.Load()
		if err != nil {
			return
		}
	}
	err = importer.odbcimporter.init(importer.rebuild)
	if err != nil {
		return
	}
	logger.Info("graph data import start")
	totalusetime = importer.importstatus.TotalUseTime
	importer.starttime = time.Now().Add(-totalusetime)
	importer.currentstarttime = time.Now()
	reedgefile := regexp.MustCompile(`(?i).*edge.*\.csv`)
	efc, elc, erc, etc, ut, e := importer.ImportEdgeFiles(reedgefile, false)
	if e != nil {
		err = e
		return
	}
	afc, alc, arc, atc, ut, e := importer.ImportNonEdgeFiles(reedgefile, true)
	if e != nil {
		err = e
		return
	}
	// Non-edge totals come from the persisted status map; edge files are
	// imported without status logging, so their counts are added separately.
	totalfilescount = int64(len(importer.importstatus.ImportStatus)) + efc
	for _, v := range importer.importstatus.ImportStatus {
		totallinecount += v.LinesCount
		totalrecordscount += v.RecordsCount
		totalretrycount += v.RetryCount
	}
	totallinecount += elc
	totalrecordscount += erc
	totalretrycount += etc
	filescount = afc + efc
	linescount = alc + elc
	recordscount = arc + erc
	retrycount = atc + etc
	usetime = ut
	totalusetime = importer.importstatus.TotalUseTime
	importer.importstatus.WaitSaveDone()
	importer.alldone()
	return
}
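// ImportEdgeFiles imports only the files whose names match the edge-file
// pattern reedgefile; all other files are ignored.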
func (importer *Importer) ImportEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount, retrycount int64, usetime time.Duration, err error) {
	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
		if !reedgefile.MatchString(filepath.Base(fpath)) {
			// skip non-edge files
			return FWOP_IGNORE
		}
		return FWOP_CONTINUE
	}, logstatus)
}
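// ImportNonEdgeFiles imports every file except those matching the edge-file
// pattern reedgefile.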
func (importer *Importer) ImportNonEdgeFiles(reedgefile *regexp.Regexp, logstatus bool) (filescount, linecount, recordscount, retrycount int64, usetime time.Duration, err error) {
	return importer.ImportFiles(func(basedir string, fpath string) FWOP {
		if reedgefile.MatchString(filepath.Base(fpath)) {
			// skip edge files
			return FWOP_IGNORE
		}
		return FWOP_CONTINUE
	}, logstatus)
}
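// FWOP is the file-walk operation returned by the filter callback passed to
// ImportFiles: it decides whether a file is skipped, imported, or ends the
// walk.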
type FWOP int

const (
	FWOP_IGNORE FWOP = iota + 1 // skip this file and continue with the next
	FWOP_BREAK                  // stop walking altogether
	FWOP_CONTINUE               // import this file
)
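// ImportFiles walks the top level of the data directory and imports, through
// the file-level routines controller, every file the fwop callback lets
// through. When logstatus is true, the per-file line/record/retry counts are
// recorded in the import status so a later run can resume where this one
// stopped.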
func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FWOP, logstatus bool) (filescount, linescount, recordscount, retrycount int64, usetime time.Duration, err error) {
	// walk the data directory
	var wg sync.WaitGroup
	fw, e := filewalker.NewFileWalker([]string{importer.datapath}, `^[^\.].*`)
	if e != nil {
		err = e
		return
	}
	e = fw.List(func(basedir string, fpath string) bool {
		if err != nil {
			// an earlier file failed; stop walking
			return false
		}
		if strings.Contains(fpath, string(filepath.Separator)) {
			// skip subdirectories; fw.List is ordered with directories after
			// files, so the walk can stop at the first subdirectory entry
			return false
		}
		switch fwop(basedir, fpath) {
		case FWOP_IGNORE:
			// skip this file and continue with the next
			return true
		case FWOP_BREAK:
			// stop walking
			return false
		case FWOP_CONTINUE:
		default:
		}
		// import this file
		filename := filepath.Join(basedir, fpath)
		filescount++
		wg.Add(1)
		// process files concurrently
		importer.fileimportrc.ConcurCall(1,
			func() {
				defer wg.Done()
				importer.importstatus.mutex.RLock()
				importstatus := importer.importstatus.ImportStatus[filename]
				importer.importstatus.mutex.RUnlock()
				linefrom, blockfrom := int64(0), int64(0)
				totalretrycount := int64(0)
				if importstatus != nil {
					linefrom, blockfrom, totalretrycount = importstatus.LinesCount, importstatus.RecordsCount, importstatus.RetryCount
				}
				if linefrom == 0 {
					logger.Info("import", "file", filename)
				} else {
					logger.Info("import", "file", filename, "from line", linefrom)
				}
				lines, records, retries, e := importer.ImportFile(filename, linefrom, blockfrom, totalretrycount, logstatus)
				if e != nil {
					err = e
					return
				}
				atomic.AddInt64(&linescount, lines-linefrom)
				atomic.AddInt64(&recordscount, records-blockfrom)
				atomic.AddInt64(&retrycount, retries-totalretrycount)
				usetime = time.Since(importer.currentstarttime)
				importer.importstatus.mutex.Lock()
				if logstatus {
					importer.importstatus.ImportStatus[filename] = &ImportStatus{
						LinesCount:   lines,
						RecordsCount: records,
						RetryCount:   retries,
					}
				}
				importer.importstatus.TotalUseTime = time.Since(importer.starttime)
				importer.importstatus.mutex.Unlock()
				importer.importstatus.Save()
				if retries > 0 {
					logger.Info("file", filename, "total imported", lines, "lines", records, "records", retries, "retry times")
				} else {
					logger.Info("file", filename, "total imported", lines, "lines", records, "records")
				}
			},
		)
		return true
	})
	wg.Wait()
	if e != nil {
		if os.IsNotExist(e) {
			err = merrs.NewError(`directory "`+importer.datapath+`" specified by "datapath" does not exist`, e)
		} else {
			err = merrs.NewError(e)
		}
		return
	}
	return
}
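// ImportFile opens a single data file and imports its contents, resuming from
// linefrom/blockfrom when a previous run already processed part of the file.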
func (importer *Importer) ImportFile(fpath string, linefrom, blockfrom, totalretrycount int64, logstatus bool) (linecount, blockcount, retrycount int64, err error) {
	f, e := os.Open(fpath)
	if e != nil {
		return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": fpath}})
	}
	defer f.Close()
	return importer.importReader(fpath, f, linefrom, blockfrom, totalretrycount, logstatus)
}
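// fileclassmapping maps class names to the filename patterns that select
// them. The built-in patterns below are registered at startup and can be
// extended through "cgi.mapping.class.<classname>" configuration keys, whose
// values are wildcard patterns matched against file names.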
var fileclassmapping = cmap.NewSingle[string, *regexp.Regexp]()

func init() {
	fileclassmapping.Set("EDGE", regexp.MustCompile(pattern.Contain2RegexpString(`EDGE`)))
	fileclassmapping.Set("master", regexp.MustCompile(pattern.Contain2RegexpString(`MASTER`)))
	fileclassmapping.Set("level1", regexp.MustCompile(pattern.Contain2RegexpString(`_L1_`)))
	fileclassmapping.Set("level2", regexp.MustCompile(pattern.Contain2RegexpString(`_L2_`)))
	fileclassmapping.Set("level3", regexp.MustCompile(pattern.Contain2RegexpString(`_L3_`)))
	fileclassmapping.Set("level4", regexp.MustCompile(pattern.Contain2RegexpString(`_L4_`)))
	fileclassmapping.Set("level5", regexp.MustCompile(pattern.Contain2RegexpString(`_L5_`)))
	fileclassmapping.Set("level6", regexp.MustCompile(pattern.Contain2RegexpString(`_L6_`)))
	fileclassmapping.Set("level7", regexp.MustCompile(pattern.Contain2RegexpString(`_L7_`)))
	fileclassmapping.Set("level8", regexp.MustCompile(pattern.Contain2RegexpString(`_L8_`)))
	keys := mcfg.Keys()
	for _, key := range keys {
		if strings.HasPrefix(key, "cgi.mapping.class.") {
			classname := key[len("cgi.mapping.class."):]
			filepatterns := mcfg.GetStrings(key)
			for _, fp := range filepatterns {
				fp = pattern.Wildcard2RegexpString(fp)
				fileclassmapping.Set(classname, regexp.MustCompile(fp))
			}
		}
	}
}
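// importReader maps the file name to a class, then reads the input block by
// block and imports each block concurrently through the query-level routines
// controller. When logstatus is true, progress is persisted only once the
// oldest outstanding block for the file has completed, so the saved position
// always covers a fully imported prefix of the file and a resumed run never
// skips unfinished work.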
func (importer *Importer) importReader(filename string, buf io.Reader, linefrom, blockfrom, totalretrycount int64, logstatus bool) (linecount, blockcount, retrycount int64, err error) {
	classname := ""
	fileclassmapping.Fetch(func(key string, v *regexp.Regexp) bool {
		if v.MatchString(filename) {
			classname = key
			return false
		}
		return true
	})
	if classname == "" {
		err = merrs.NewError("cannot map filename to a class, filename: " + filename)
		return
	}
	br, e := reader.NewBlockReader(filename, classname, buf, importer.odbcimporter.schema)
	if e != nil {
		return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
	}
	skiplines := int(linefrom)
	blockcount = blockfrom
	doinglines := []int64{}
	donelines := linefrom
	doneblocks := blockfrom
	savedlines := linefrom
	savedblocks := blockfrom
	retrycount = totalretrycount
	linecount = linefrom
	// maxresponsetime := time.Duration(0)
	var wg sync.WaitGroup
	defer importer.done()
	defer wg.Wait()
	for {
		if err != nil {
			return
		}
		lastlinecount := linecount
		block, line, linenumber, e := br.ReadBlock(skiplines)
		linecount = int64(linenumber)
		if e != nil {
			return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
		if block == nil {
			// end of input
			return
		}
		blockcount++
		wg.Add(1)
		doingline := linecount
		doingblock := blockcount
		if logstatus {
			doinglines = append(doinglines, doingline)
		}
		e = importer.odbcqueryrc.ConcurCall(1, func() {
			defer wg.Done()
			if err != nil {
				return
			}
			// st := time.Now()
			// defer func() {
			// 	ut := time.Since(st)
			// 	if ut > maxresponsetime {
			// 		maxresponsetime = ut
			// 	}
			// }()
			// logger.Info("G:", runtime.NumGoroutine(),
			// 	"RC:", importer.fileimportrc.ConcurCount(),
			// 	"WC:", importer.odbcqueryrc.ConcurCount(),
			// 	"RQ:", importer.fileimportrc.QueueCount(),
			// 	"WQ:", importer.odbcqueryrc.QueueCount(),
			// 	"maxresponsetime:", maxresponsetime)
			rtc, e := importer.importRecord(block, line, filename, classname, int(doingline))
			atomic.AddInt64(&retrycount, int64(rtc))
			if e != nil {
				err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(doingline)}, {"line": line}})
				return
			}
			atomic.AddInt64(&donelines, doingline-lastlinecount)
			atomic.AddInt64(&doneblocks, 1)
			if logstatus {
				readinglines := doinglines[len(doinglines)-1]
				if doingline == doinglines[0] {
					// the oldest outstanding block finished: persist progress
					importer.importstatus.mutex.Lock()
					importer.importstatus.ImportStatus[filename] = &ImportStatus{
						LinesCount:   doingline,
						RecordsCount: doingblock,
						RetryCount:   retrycount,
					}
					importer.importstatus.TotalUseTime = time.Since(importer.starttime)
					importer.importstatus.Save()
					savedlines = doingline
					savedblocks = doingblock
					doinglines = doinglines[1:]
					importer.importstatus.mutex.Unlock()
				} else {
					// a later block finished out of order: just drop it from
					// the outstanding list
					for i, l := range doinglines {
						if l == doingline {
							doinglines = append(doinglines[:i], doinglines[i+1:]...)
							break
						}
					}
				}
				importer.logInfo(filename, readinglines, doinglines, donelines, doneblocks, savedlines, savedblocks, retrycount)
			}
		})
		if e != nil {
			return linecount, blockcount, retrycount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
	}
}
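// logInfo logs per-file import progress, rate-limited to at most one entry
// every five seconds per file.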
func (importer *Importer) logInfo(filename string, readinglines int64, doinglines []int64, donelines, doneblocks, savedlines, savedblocks, retrycount int64) {
	if time.Since(importer.lastlogtime.GetIFPresent(filename)) > 5*time.Second {
		if odbc.LogDebug {
			if retrycount > 0 {
				logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records,", "saved", savedlines, "lines", savedblocks, "records", retrycount, "retry times")
			} else {
				logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records,", "saved", savedlines, "lines", savedblocks, "records")
			}
		} else {
			if retrycount > 0 {
				logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records", retrycount, "retry times")
			} else {
				logger.Info("file", filename, "read", readinglines, "lines,", "importing", len(doinglines), "chunks,", "imported", donelines, "lines", doneblocks, "records")
			}
		}
		importer.lastlogtime.Set(filename, time.Now())
	}
}
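// importRecord imports one parsed record. Edge records are cached in the
// graph package rather than inserted directly; records of any other class are
// written through the ODBC importer, which reports how many retries the
// insert needed.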
func (importer *Importer) importRecord(record map[string]any, line string, filename string, classaliasname string, linecount int) (retrycount int, err error) {
	if odbc.LogDebug {
		bs, e := json.MarshalIndent(record, "", " ")
		if e != nil {
			return 0, merrs.NewError(e)
		}
		logger.Trace(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
	}
	switch classaliasname {
	case schema.EDGE:
		graph.CacheEdgeInfo(record)
	default:
		retrycount, err = importer.odbcimporter.InsertData(classaliasname, record)
		if err != nil {
			err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
			return
		}
	}
	return
}
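// alldone signals the ODBC importer that the entire import has finished.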
func (importer *Importer) alldone() {
	importer.odbcimporter.alldone()
}
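// done signals the ODBC importer that one input file has been fully read.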
func (importer *Importer) done() {
	importer.odbcimporter.done()
}
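// Check runs two spot-check queries against the store: a point lookup of a
// known level1 record by uniqueid, and a bidirectional relationship traversal
// from the same record. Results are printed as indented JSON; any query error
// panics. It is intended as a quick manual sanity check after an import.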
func Check() {
	client := odbc.ODBClient
	if client == nil {
		return
	}
	{
		// mql := "select id,uniqueid,tags,contain,day,vtime from level1 where uniqueid='E2E:OTR0002L'"
		mql := "select * from level1 where uniqueid='E2E:OTR0002L'"
		r, e := client.Query(mql).Do()
		if e != nil {
			panic(merrs.NewError(e))
		}
		bs, _ := json.MarshalIndent(r.Data, "", " ")
		fmt.Println(string(bs))
	}
	{
		mql := `match ("level1:E2E:OTR0002L")-[*]->(),("level1:E2E:OTR0002L")<-[*]-() with namespace="m3cnet", fields="uniqueid,distname" in "level1","level2"`
		r, e := client.Query(mql).Do()
		if e != nil {
			panic(merrs.NewError(e))
		}
		bs, _ := json.MarshalIndent(r.Data, "", " ")
		fmt.Println(string(bs))
	}
}