package cgf import ( "encoding/json" "fmt" "io" "os" "path/filepath" "sync" "sync/atomic" "git.wecise.com/wecise/cgimport/cgf/reader" "git.wecise.com/wecise/cgimport/odbc" "git.wecise.com/wecise/util/filewalker" "git.wecise.com/wecise/util/merrs" "git.wecise.com/wecise/util/rc" ) var mcfg = odbc.Config var logger = odbc.Logger func ImportDir(datapath string, parallel int) (filescount, recordscount int64, err error) { // 遍历文件目录 var cgirc = rc.NewRoutinesController("", parallel) var wg sync.WaitGroup fw, e := filewalker.NewFileWalker([]string{datapath}, ".*") if e != nil { err = e return } e = fw.List(func(basedir string, fpath string) bool { if err != nil { return false } filename := filepath.Join(basedir, fpath) wg.Add(1) cgirc.ConcurCall(1, func() { defer wg.Done() records, e := ImportFile(filename) if e != nil { err = e return } atomic.AddInt64(&filescount, 1) atomic.AddInt64(&recordscount, int64(records)) }, ) return true }) wg.Wait() if e != nil { err = e return } return } func ImportFile(filepath string) (blockcount int, err error) { f, e := os.Open(filepath) if e != nil { return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}}) } defer f.Close() return importReader(filepath, f) } var parserc = rc.NewRoutinesController("", 1000) func importReader(filename string, buf io.Reader) (blockcount int, err error) { br, e := reader.NewBlockReader(filename, buf) if e != nil { return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}}) } var wg sync.WaitGroup defer wg.Wait() for { if err != nil { break } block, linecount, e := br.ReadBlock() if e != nil { return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}}) } if block == nil { return } wg.Add(1) e = parserc.ConcurCall(1, func() { defer wg.Done() e = importBlock(block, filename, linecount) if e != nil { err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}}) return } blockcount++ }) if e != nil { return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}}) } } return } func importBlock(block map[string]any, filename string, linecount int) (err error) { bs, e := json.MarshalIndent(block, "", " ") if e != nil { return e } logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs))) return }