package importer

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.wecise.com/wecise/cgimport/odbc"
	"git.wecise.com/wecise/cgimport/reader"
	"git.wecise.com/wecise/util/filewalker"
	"git.wecise.com/wecise/util/merrs"
	"git.wecise.com/wecise/util/rc"
)

var mcfg = odbc.Config
var logger = odbc.Logger

// Importer drives the import of a data directory into ODBC storage.
type Importer struct {
	datapath     string                 // root directory scanned for data files
	parallel     int                    // max number of files imported concurrently
	importrc     *rc.RoutinesController // limits concurrent per-record imports
	odbcimporter *ODBCImporter
}

// ImportStatus records per-file import progress.
type ImportStatus struct {
	RecordsCount int64
}

// CGIStatus is the persisted checkpoint of an import run; it lets an
// interrupted import resume by skipping files already recorded here.
type CGIStatus struct {
	mutex        sync.RWMutex
	TotalUseTime time.Duration
	ImportStatus map[string]*ImportStatus
	rc           *rc.RoutinesController
	lasterror    error
	lastsavetime time.Time
	waitdone     chan any
}

var cgistatusfile = mcfg.GetString("cgi.statusfile", "/opt/matrix/var/cgimport.status.txt")

// NewCGIStatus returns an empty, save-ready status checkpoint.
func NewCGIStatus() *CGIStatus {
	return &CGIStatus{
		ImportStatus: map[string]*ImportStatus{},
		rc:           rc.NewRoutinesController("", 1),
		waitdone:     make(chan any, 1),
	}
}

// Load reads the checkpoint file if it exists. A missing file is not an
// error; a corrupt file is logged and otherwise ignored.
func (cgistatus *CGIStatus) Load() error {
	cgistatusbs, e := os.ReadFile(cgistatusfile)
	if e != nil && !os.IsNotExist(e) {
		return e
	}
	if len(cgistatusbs) > 0 {
		e = json.Unmarshal(cgistatusbs, &cgistatus)
		if e != nil {
			// best effort: a damaged checkpoint only costs a re-import
			logger.Warn(e)
		}
	}
	return nil
}

// WaitSaveDone signals any throttled Save to flush immediately, then
// blocks until the save routine has drained.
func (cgistatus *CGIStatus) WaitSaveDone() {
	cgistatus.waitdone <- 1
	cgistatus.rc.WaitDone()
}

// Save persists the checkpoint to cgistatusfile, throttled to at most
// one write per second; CallLast2Only collapses bursts so only the most
// recent state is written. The last error is returned (and retained).
func (cgistatus *CGIStatus) Save() (err error) {
	cgistatus.rc.CallLast2Only(func() {
		if !cgistatus.lastsavetime.IsZero() {
			interval := time.Since(cgistatus.lastsavetime)
			if interval < 1*time.Second {
				// throttle, but wake early if shutdown is requested
				t := time.NewTimer(1*time.Second - interval)
				select {
				case <-t.C:
				case v := <-cgistatus.waitdone:
					// put the shutdown token back for WaitSaveDone
					cgistatus.waitdone <- v
					t.Stop()
				}
			}
		}
		cgistatus.mutex.RLock()
		cgistatusbs, e := json.MarshalIndent(cgistatus, "", " ")
		cgistatus.mutex.RUnlock()
		if e != nil {
			cgistatus.lasterror = e
			return
		}
		e = os.MkdirAll(filepath.Dir(cgistatusfile), os.ModePerm)
		if e != nil {
			cgistatus.lasterror = e
			return
		}
		e = os.WriteFile(cgistatusfile, cgistatusbs, os.ModePerm)
		if e != nil {
			cgistatus.lasterror = e
			return
		}
		cgistatus.lastsavetime = time.Now()
		// fmt.Println(cgistatus.lastsavetime)
	})
	return cgistatus.lasterror
}

// ImportDir imports every file under datapath with the given file-level
// parallelism and returns cumulative (total*) and this-run counters.
func ImportDir(datapath string, parallel int) (totalfilescount, totalrecordscount int64, totalusetime time.Duration,
	filescount, recordscount int64, usetime time.Duration, err error) {
	importer := &Importer{
		datapath:     datapath,
		parallel:     parallel,
		importrc:     rc.NewRoutinesController("", 100),
		odbcimporter: NewODBCImporter(),
	}
	return importer.Import()
}

// Import walks the data directory and imports each not-yet-imported file
// concurrently, checkpointing progress after every file.
func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, totalusetime time.Duration,
	filescount, recordscount int64, usetime time.Duration, err error) {
	var cgirc = rc.NewRoutinesController("", importer.parallel)
	var wg sync.WaitGroup
	fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
	if e != nil {
		err = e
		return
	}
	cgistatus := NewCGIStatus()
	reload := mcfg.GetString("reload")
	if odbc.DevPhase&odbc.DP_PROCESSCONTINUE != 0 && reload == "" {
		// resume: pick up the previous run's checkpoint
		err = cgistatus.Load()
		if err != nil {
			return
		}
	} else {
		// reload: drop existing classes before re-importing from scratch
		err = importer.odbcimporter.reload()
		if err != nil {
			return
		}
	}
	// make sure the class structure exists before inserting data
	err = importer.odbcimporter.ReviseClassStruct()
	if err != nil {
		return
	}
	totalfilescount = int64(len(cgistatus.ImportStatus))
	for _, v := range cgistatus.ImportStatus {
		totalrecordscount += v.RecordsCount
	}
	totalusetime = cgistatus.TotalUseTime
	st := time.Now().Add(-totalusetime) // virtual start including prior runs
	cst := time.Now()                   // start of this run
	// statemutex guards err/usetime/totalusetime, which worker goroutines
	// write while the walker callback reads err.
	var statemutex sync.Mutex
	failed := func() bool {
		statemutex.Lock()
		defer statemutex.Unlock()
		return err != nil
	}
	// walk the data directory; each file is imported on a bounded worker
	e = fw.List(func(basedir string, fpath string) bool {
		if failed() {
			// a worker already failed: stop listing further files
			return false
		}
		filename := filepath.Join(basedir, fpath)
		wg.Add(1)
		// NOTE(review): ConcurCall's error is ignored here (it is checked
		// in importReader) — if it can fail without running the closure,
		// wg.Wait would deadlock; confirm rc.RoutinesController semantics.
		cgirc.ConcurCall(1, func() {
			defer wg.Done()
			cgistatus.mutex.RLock()
			importstatus := cgistatus.ImportStatus[filename]
			cgistatus.mutex.RUnlock()
			if importstatus != nil {
				// already imported in a previous run
				return
			}
			records, e := importer.ImportFile(filename)
			if e != nil {
				statemutex.Lock()
				err = e
				statemutex.Unlock()
				return
			}
			atomic.AddInt64(&filescount, 1)
			atomic.AddInt64(&recordscount, records)
			atomic.AddInt64(&totalfilescount, 1)
			atomic.AddInt64(&totalrecordscount, records)
			statemutex.Lock()
			usetime = time.Since(cst)
			totalusetime = time.Since(st)
			ut := totalusetime
			statemutex.Unlock()
			cgistatus.mutex.Lock()
			cgistatus.ImportStatus[filename] = &ImportStatus{RecordsCount: records}
			cgistatus.TotalUseTime = ut
			cgistatus.mutex.Unlock()
			cgistatus.Save() // best effort; Save retains its own lasterror
		})
		return true
	})
	wg.Wait()
	if e != nil {
		err = e
		return
	}
	cgistatus.WaitSaveDone()
	importer.alldone()
	return
}

// ImportFile opens one data file and imports its contents.
func (importer *Importer) ImportFile(fpath string) (blockcount int64, err error) {
	f, e := os.Open(fpath)
	if e != nil {
		return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": fpath}})
	}
	defer f.Close()
	return importer.importReader(fpath, f)
}

// filetypeFromName derives the record type from markers embedded in the
// file name (_L1_.._L8_, MASTER, EDGE).
func filetypeFromName(filename string) (FileType, error) {
	switch {
	case strings.Contains(filename, "_L1_"):
		return FT_LEVEL1, nil
	case strings.Contains(filename, "_L2_"):
		return FT_LEVEL2, nil
	case strings.Contains(filename, "_L3_"):
		return FT_LEVEL3, nil
	case strings.Contains(filename, "_L4_"):
		return FT_LEVEL4, nil
	case strings.Contains(filename, "_L5_"):
		return FT_LEVEL5, nil
	case strings.Contains(filename, "_L6_"):
		return FT_LEVEL6, nil
	case strings.Contains(filename, "_L7_"):
		return FT_LEVEL7, nil
	case strings.Contains(filename, "_L8_"):
		return FT_LEVEL8, nil
	case strings.Contains(filename, "MASTER"):
		return FT_MASTER, nil
	case strings.Contains(filename, "EDGE"):
		return FT_EDGE, nil
	}
	return "", merrs.NewError("filename does not conform to the agreed format " + filename)
}

// importReader reads blocks from buf and imports each one on a bounded
// worker; it stops at EOF (nil block) or on the first error.
func (importer *Importer) importReader(filename string, buf io.Reader) (blockcount int64, err error) {
	br, e := reader.NewBlockReader(filename, buf)
	if e != nil {
		return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
	}
	filetype, e := filetypeFromName(filename)
	if e != nil {
		err = e
		return
	}
	// errmutex guards the named result err, which worker goroutines write
	// and the read loop polls.
	var errmutex sync.Mutex
	seterr := func(e error) {
		errmutex.Lock()
		err = e
		errmutex.Unlock()
	}
	geterr := func() error {
		errmutex.Lock()
		defer errmutex.Unlock()
		return err
	}
	var wg sync.WaitGroup
	defer importer.done()
	defer wg.Wait()
	for geterr() == nil {
		block, line, linecount, e := br.ReadBlock()
		if e != nil {
			return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
		if block == nil {
			// end of input
			return
		}
		wg.Add(1)
		// ie is local to the closure so the worker never races on the
		// loop's own e variable
		e = importer.importrc.ConcurCall(1, func() {
			defer wg.Done()
			if ie := importer.importRecord(block, line, filename, filetype, linecount); ie != nil {
				seterr(merrs.NewError(ie, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}}))
				return
			}
			atomic.AddInt64(&blockcount, 1)
		})
		if e != nil {
			return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
		}
	}
	return
}

// importRecord inserts a single parsed record: edges go through
// InsertEdge, everything else is inserted as data of the class named by
// its file type.
func (importer *Importer) importRecord(record map[string]any, line string, filename string, filetype FileType, linecount int) (err error) {
	if odbc.LogDebug {
		bs, e := json.MarshalIndent(record, "", "  ")
		if e != nil {
			return merrs.NewError(e)
		}
		logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
	}
	var classname string
	switch filetype {
	case FT_EDGE:
		err = importer.odbcimporter.InsertEdge(record)
		if err != nil {
			err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
			return
		}
	default:
		classname = string(filetype)
		err = importer.odbcimporter.InsertData(classname, record)
		if err != nil {
			err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
			return
		}
	}
	return
}

// alldone flushes and finalizes the underlying ODBC importer.
func (importer *Importer) alldone() {
	importer.odbcimporter.alldone()
}

// done signals the underlying ODBC importer that one file finished.
func (importer *Importer) done() {
	importer.odbcimporter.done()
}

// Check is a manual smoke test: it queries /m3cnet/master and prints the
// result, panicking on query failure.
func Check() {
	client := odbc.ODBClient
	if client == nil {
		return
	}
	{
		// give the backend a moment to settle before querying
		time.Sleep(3 * time.Second)
		mql := "select * from /m3cnet/master"
		r, e := client.Query(mql).Do()
		if e != nil {
			panic(merrs.NewError(e))
		}
		bs, _ := json.MarshalIndent(r.Data, "", " ")
		fmt.Println(string(bs))
	}
	// {
	// 	mql := "select * from /m3cnet/minfo/level1"
	// 	r, e := client.Query(mql).Do()
	// 	if e != nil {
	// 		panic(merrs.NewError(e))
	// 	}
	// 	bs, _ := json.MarshalIndent(r.Data, "", " ")
	// 	fmt.Println(string(bs))
	// }
}