|
@@ -19,11 +19,28 @@ import (
|
|
var mcfg = odbc.Config
|
|
var mcfg = odbc.Config
|
|
var logger = odbc.Logger
|
|
var logger = odbc.Logger
|
|
|
|
|
|
|
|
+type Importer struct {
|
|
|
|
+ datapath string
|
|
|
|
+ parallel int
|
|
|
|
+ importrc *rc.RoutinesController
|
|
|
|
+ odbcimporter *ODBCImporter
|
|
|
|
+}
|
|
|
|
+
|
|
func ImportDir(datapath string, parallel int) (filescount, recordscount int64, err error) {
|
|
func ImportDir(datapath string, parallel int) (filescount, recordscount int64, err error) {
|
|
|
|
+ importer := &Importer{
|
|
|
|
+ datapath: datapath,
|
|
|
|
+ parallel: parallel,
|
|
|
|
+ importrc: rc.NewRoutinesController("", 1000),
|
|
|
|
+ odbcimporter: NewODBCImporter(),
|
|
|
|
+ }
|
|
|
|
+ return importer.Import()
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (importer *Importer) Import() (filescount, recordscount int64, err error) {
|
|
// 遍历文件目录
|
|
// 遍历文件目录
|
|
- var cgirc = rc.NewRoutinesController("", parallel)
|
|
|
|
|
|
+ var cgirc = rc.NewRoutinesController("", importer.parallel)
|
|
var wg sync.WaitGroup
|
|
var wg sync.WaitGroup
|
|
- fw, e := filewalker.NewFileWalker([]string{datapath}, ".*")
|
|
|
|
|
|
+ fw, e := filewalker.NewFileWalker([]string{importer.datapath}, ".*")
|
|
if e != nil {
|
|
if e != nil {
|
|
err = e
|
|
err = e
|
|
return
|
|
return
|
|
@@ -37,7 +54,7 @@ func ImportDir(datapath string, parallel int) (filescount, recordscount int64, e
|
|
cgirc.ConcurCall(1,
|
|
cgirc.ConcurCall(1,
|
|
func() {
|
|
func() {
|
|
defer wg.Done()
|
|
defer wg.Done()
|
|
- records, e := ImportFile(filename)
|
|
|
|
|
|
+ records, e := importer.ImportFile(filename)
|
|
if e != nil {
|
|
if e != nil {
|
|
err = e
|
|
err = e
|
|
return
|
|
return
|
|
@@ -56,18 +73,16 @@ func ImportDir(datapath string, parallel int) (filescount, recordscount int64, e
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
-func ImportFile(filepath string) (blockcount int, err error) {
|
|
|
|
|
|
+func (importer *Importer) ImportFile(filepath string) (blockcount int, err error) {
|
|
f, e := os.Open(filepath)
|
|
f, e := os.Open(filepath)
|
|
if e != nil {
|
|
if e != nil {
|
|
return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}})
|
|
return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filepath}})
|
|
}
|
|
}
|
|
defer f.Close()
|
|
defer f.Close()
|
|
- return importReader(filepath, f)
|
|
|
|
|
|
+ return importer.importReader(filepath, f)
|
|
}
|
|
}
|
|
|
|
|
|
-var parserc = rc.NewRoutinesController("", 1000)
|
|
|
|
-
|
|
|
|
-func importReader(filename string, buf io.Reader) (blockcount int, err error) {
|
|
|
|
|
|
+func (importer *Importer) importReader(filename string, buf io.Reader) (blockcount int, err error) {
|
|
br, e := reader.NewBlockReader(filename, buf)
|
|
br, e := reader.NewBlockReader(filename, buf)
|
|
if e != nil {
|
|
if e != nil {
|
|
return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
|
|
return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
|
|
@@ -86,9 +101,9 @@ func importReader(filename string, buf io.Reader) (blockcount int, err error) {
|
|
return
|
|
return
|
|
}
|
|
}
|
|
wg.Add(1)
|
|
wg.Add(1)
|
|
- e = parserc.ConcurCall(1, func() {
|
|
|
|
|
|
+ e = importer.importrc.ConcurCall(1, func() {
|
|
defer wg.Done()
|
|
defer wg.Done()
|
|
- e = importBlock(block, filename, linecount)
|
|
|
|
|
|
+ e = importer.importRecord(block, filename, linecount)
|
|
if e != nil {
|
|
if e != nil {
|
|
err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}})
|
|
err = merrs.NewError(e, merrs.SSMaps{{"filename": filename}, {"line": fmt.Sprint(linecount)}})
|
|
return
|
|
return
|
|
@@ -102,11 +117,19 @@ func importReader(filename string, buf io.Reader) (blockcount int, err error) {
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
-func importBlock(block map[string]any, filename string, linecount int) (err error) {
|
|
|
|
- bs, e := json.MarshalIndent(block, "", " ")
|
|
|
|
|
|
+func (importer *Importer) importRecord(record map[string]any, filename string, linecount int) (err error) {
|
|
|
|
+ bs, e := json.MarshalIndent(record, "", " ")
|
|
if e != nil {
|
|
if e != nil {
|
|
return e
|
|
return e
|
|
}
|
|
}
|
|
logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
|
|
logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
|
|
|
|
+ e = importer.odbcimporter.ReviseClassStruct(record)
|
|
|
|
+ if e != nil {
|
|
|
|
+ return e
|
|
|
|
+ }
|
|
|
|
+ e = importer.odbcimporter.InsertData(record)
|
|
|
|
+ if e != nil {
|
|
|
|
+ return e
|
|
|
|
+ }
|
|
return
|
|
return
|
|
}
|
|
}
|