package cgf import ( "encoding/json" "fmt" "io" "os" "path/filepath" "sync" "sync/atomic" "git.wecise.com/wecise/cgimport/cgf/reader" "git.wecise.com/wecise/util/filewalker" "git.wecise.com/wecise/util/rc" ) func ImportDir(datapath string, parallel int) (filescount, recordscount int64, err error) { // 遍历文件目录 var cgirc = rc.NewRoutinesController("", parallel) var wg sync.WaitGroup fw, e := filewalker.NewFileWalker([]string{datapath}, ".*") if e != nil { err = e return } e = fw.List(func(basedir string, fpath string) bool { if err != nil { return false } filename := filepath.Join(basedir, fpath) wg.Add(1) cgirc.ConcurCall(1, func() { defer wg.Done() records, e := ImportFile(filename) if e != nil { err = e return } atomic.AddInt64(&filescount, 1) atomic.AddInt64(&recordscount, int64(records)) }, ) return true }) wg.Wait() if e != nil { err = e return } return } func ImportFile(filepath string) (blockcount int, err error) { f, e := os.Open(filepath) if e != nil { return blockcount, e } defer f.Close() return importReader(f) } func importReader(buf io.Reader) (blockcount int, err error) { br := reader.NewBlockReader(buf) for { block, e := br.ReadBlock() if e != nil { return blockcount, e } if block == nil { return } e = importBlock(block) if e != nil { return blockcount, e } blockcount++ } } func importBlock(block map[string]any) (err error) { bs, e := json.MarshalIndent(block, "", " ") if e != nil { return e } fmt.Println("import:", string(bs)) return }