libf il y a 8 mois
Parent
commit
34b2da5f0a
3 fichiers modifiés avec 66 ajouts et 37 suppressions
  1. 13 18
      importer/importer.go
  2. 52 19
      importer/odbcimporter.go
  3. 1 0
      odbc/odbclient.go

+ 13 - 18
importer/importer.go

@@ -188,6 +188,7 @@ func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, to
 		return
 	}
 	cgistatus.WaitSaveDone()
+	importer.alldone()
 	return
 }
 
@@ -206,9 +207,7 @@ func (importer *Importer) importReader(filename string, buf io.Reader) (blockcou
 		return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
 	}
 	var wg sync.WaitGroup
-	defer func() {
-		importer.done()
-	}()
+	defer importer.done()
 	defer wg.Wait()
 	for {
 		if err != nil {
@@ -239,23 +238,15 @@ func (importer *Importer) importReader(filename string, buf io.Reader) (blockcou
 }
 
 func (importer *Importer) importRecord(record map[string]any, filename string, linecount int) (err error) {
-	bs, e := json.MarshalIndent(record, "", "  ")
-	if e != nil {
-		return e
+	if odbc.LogDebug {
+		bs, e := json.MarshalIndent(record, "", "  ")
+		if e != nil {
+			return e
+		}
+		logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
 	}
-	logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
 	var classname string
-	switch {
-	case record["uniqueId"] != nil:
-		classname = "/cgitest/x10/x1002"
-	case record["UNIQUEID"] != nil:
-		classname = "/cgitest/x10/x1001"
-	case record["FROMUNIQUEID"] != nil:
-		classname = "/cgitest/x10/x1003"
-	default:
-		return merrs.NewError("no mapping classname", merrs.SSMaps{{"data": string(bs)}})
-	}
-	e = importer.odbcimporter.ReviseClassStruct(record)
+	classname, e := importer.odbcimporter.ReviseClassStruct(record)
 	if e != nil {
 		return e
 	}
@@ -266,6 +257,10 @@ func (importer *Importer) importRecord(record map[string]any, filename string, l
 	return
 }
 
+func (importer *Importer) alldone() {
+	importer.odbcimporter.alldone()
+}
+
 func (importer *Importer) done() {
 	importer.odbcimporter.done()
 }

+ 52 - 19
importer/odbcimporter.go

@@ -1,6 +1,7 @@
 package importer
 
 import (
+	"encoding/json"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -45,36 +46,53 @@ func NewODBCImporter() *ODBCImporter {
 }
 
 // 根据数据修正类定义
-func (odbci *ODBCImporter) ReviseClassStruct(data map[string]any) (err error) {
-	e := odbci.createClass("cgitest", "/cgitest", nil)
-	if e != nil {
-		return e
+func (odbci *ODBCImporter) ReviseClassStruct(data map[string]any) (classname string, err error) {
+	switch {
+	case data["uniqueId"] != nil:
+		classname = "/cgitest/x10/x1002"
+	case data["UNIQUEID"] != nil:
+		classname = "/cgitest/x10/x1001"
+	case data["FROMUNIQUEID"] != nil:
+		classname = "/cgitest/x10/x1003"
+	default:
+		bs, e := json.MarshalIndent(data, "", "  ")
+		if e != nil {
+			err = e
+			return
+		}
+		err = merrs.NewError("no mapping classname", merrs.SSMaps{{"data": string(bs)}})
 	}
-	e = odbci.createClass("x10", "/cgitest/x10", nil)
-	if e != nil {
-		return e
+
+	err = odbci.createClass("cgitest", "/cgitest", nil)
+	if err != nil {
+		return
 	}
-	e = odbci.createClass("x1001", "/cgitest/x10/x1001", []*fieldinfo{
+	err = odbci.createClass("x10", "/cgitest/x10", nil)
+	if err != nil {
+		return
+	}
+	err = odbci.createClass("x1001", "/cgitest/x10/x1001", []*fieldinfo{
 		{fieldname: "uniqueid", datakey: "UNIQUEID", fieldtype: "varchar", keyidx: 1},
 		{fieldname: "distname", datakey: "BASENAME", fieldtype: "varchar"},
 	})
-	if e != nil {
-		return e
+	if err != nil {
+		return
 	}
-	e = odbci.createClass("x1002", "/cgitest/x10/x1002", []*fieldinfo{
+	err = odbci.createClass("x1002", "/cgitest/x10/x1002", []*fieldinfo{
 		{fieldname: "uniqueid", datakey: "uniqueId", fieldtype: "varchar", keyidx: 1},
 		{fieldname: "distname", datakey: "distName", fieldtype: "varchar"},
 	})
-	if e != nil {
-		return e
+	if err != nil {
+		return
 	}
-	e = odbci.createClass("x1003", "/cgitest/x10/x1003", []*fieldinfo{
+	err = odbci.createClass("x1003", "/cgitest/x10/x1003", []*fieldinfo{
 		{fieldname: "fromuniqueid", datakey: "FROMUNIQUEID", fieldtype: "varchar", keyidx: 1},
 		{fieldname: "touniqueid", datakey: "TOUNIQUEID", fieldtype: "varchar", keyidx: 2},
 	})
-	if e != nil {
-		return e
+	if err != nil {
+		return
 	}
+
 	return
 }
 
@@ -112,9 +130,24 @@ func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (er
 func (odbci *ODBCImporter) done() {
 	odbci.classinfos.Fetch(func(cn string, ci *classinfo) bool {
 		ci.mutex.Lock()
-		ci.lastlogtime = time.Now()
-		ci.lastlogicount = ci.insertcount
-		logger.Info("class", ci.classname, "import", ci.insertcount, "records")
+		if ci.lastlogicount != ci.insertcount {
+			ci.lastlogtime = time.Now()
+			ci.lastlogicount = ci.insertcount
+			logger.Info("class", ci.classname, "import", ci.insertcount, "records")
+		}
+		ci.mutex.Unlock()
+		return true
+	})
+}
+
+func (odbci *ODBCImporter) alldone() {
+	odbci.classinfos.Fetch(func(cn string, ci *classinfo) bool {
+		ci.mutex.Lock()
+		if ci.insertcount != 0 {
+			ci.lastlogtime = time.Now()
+			ci.lastlogicount = ci.insertcount
+			logger.Info("class", ci.classname, "import", ci.insertcount, "records")
+		}
 		ci.mutex.Unlock()
 		return true
 	})

+ 1 - 0
odbc/odbclient.go

@@ -37,6 +37,7 @@ expire=14d
 
 var Logger = ulog.New().WithConfig(Config, "log")
 var LogFile = Logger.FileOutPath()
+var LogDebug = Logger.FileOutLevel() <= ulog.DEBUG
 
 var ODBClient odb.Client
 var ODBError error