|
@@ -9,12 +9,16 @@ import (
|
|
|
"sync"
|
|
|
"sync/atomic"
|
|
|
|
|
|
+ "git.wecise.com/wecise/cgimport/cfg"
|
|
|
"git.wecise.com/wecise/cgimport/cgf/reader"
|
|
|
"git.wecise.com/wecise/util/filewalker"
|
|
|
"git.wecise.com/wecise/util/merrs"
|
|
|
"git.wecise.com/wecise/util/rc"
|
|
|
)
|
|
|
|
|
|
+var mcfg = cfg.Config
|
|
|
+var logger = cfg.Logger
|
|
|
+
|
|
|
func ImportDir(datapath string, parallel int) (filescount, recordscount int64, err error) {
|
|
|
// 遍历文件目录
|
|
|
var cgirc = rc.NewRoutinesController("", parallel)
|
|
@@ -61,12 +65,19 @@ func ImportFile(filepath string) (blockcount int, err error) {
|
|
|
return importReader(filepath, f)
|
|
|
}
|
|
|
|
|
|
+var parserc = rc.NewRoutinesController("", 1000)
|
|
|
+
|
|
|
func importReader(filename string, buf io.Reader) (blockcount int, err error) {
|
|
|
br, e := reader.NewBlockReader(filename, buf)
|
|
|
if e != nil {
|
|
|
return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
|
|
|
}
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ defer wg.Wait()
|
|
|
for {
|
|
|
+ if err != nil {
|
|
|
+ break
|
|
|
+ }
|
|
|
block, linecount, e := br.ReadBlock()
|
|
|
if e != nil {
|
|
|
return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}})
|
|
@@ -74,12 +85,21 @@ func importReader(filename string, buf io.Reader) (blockcount int, err error) {
|
|
|
if block == nil {
|
|
|
return
|
|
|
}
|
|
|
- e = importBlock(block, filename, linecount)
|
|
|
+ wg.Add(1)
|
|
|
+ e = parserc.ConcurCall(1, func() {
|
|
|
+ defer wg.Done()
|
|
|
+ e = importBlock(block, filename, linecount)
|
|
|
+ if e != nil {
|
|
|
+ err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}})
|
|
|
+ return
|
|
|
+ }
|
|
|
+ blockcount++
|
|
|
+ })
|
|
|
if e != nil {
|
|
|
return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}})
|
|
|
}
|
|
|
- blockcount++
|
|
|
}
|
|
|
+ return
|
|
|
}
|
|
|
|
|
|
func importBlock(block map[string]any, filename string, linecount int) (err error) {
|
|
@@ -87,6 +107,6 @@ func importBlock(block map[string]any, filename string, linecount int) (err erro
|
|
|
if e != nil {
|
|
|
return e
|
|
|
}
|
|
|
- fmt.Println(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
|
|
|
+ logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
|
|
|
return
|
|
|
}
|