Ver Fonte

data insert

libf há 5 meses atrás
pai
commit
a55c390da9
2 ficheiros alterados com 84 adições e 20 exclusões
  1. 25 1
      importer/importer.go
  2. 59 19
      importer/odbcimporter.go

+ 25 - 1
importer/importer.go

@@ -132,6 +132,12 @@ func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, to
 			err = e
 			return
 		}
+	} else {
+		e = importer.odbcimporter.reload()
+		if e != nil {
+			err = e
+			return
+		}
 	}
 	totalfilescount = int64(len(cgistatus.ImportStatus))
 	for _, v := range cgistatus.ImportStatus {
@@ -200,6 +206,9 @@ func (importer *Importer) importReader(filename string, buf io.Reader) (blockcou
 		return blockcount, merrs.NewError(e, merrs.SSMaps{{"filename": filename}})
 	}
 	var wg sync.WaitGroup
+	defer func() {
+		importer.done()
+	}()
 	defer wg.Wait()
 	for {
 		if err != nil {
@@ -235,13 +244,28 @@ func (importer *Importer) importRecord(record map[string]any, filename string, l
 		return e
 	}
 	logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
+	var classname string
+	switch {
+	case record["uniqueId"] != nil:
+		classname = "/cgitest/x10/x1002"
+	case record["UNIQUEID"] != nil:
+		classname = "/cgitest/x10/x1001"
+	case record["FROMUNIQUEID"] != nil:
+		classname = "/cgitest/x10/x1003"
+	default:
+		return merrs.NewError("no mapping classname", merrs.SSMaps{{"data": string(bs)}})
+	}
 	e = importer.odbcimporter.ReviseClassStruct(record)
 	if e != nil {
 		return e
 	}
-	e = importer.odbcimporter.InsertData("/cgitest/x10/x1002", record)
+	e = importer.odbcimporter.InsertData(classname, record)
 	if e != nil {
 		return e
 	}
 	return
 }
+
+func (importer *Importer) done() {
+	importer.odbcimporter.done()
+}

+ 59 - 19
importer/odbcimporter.go

@@ -20,15 +20,16 @@ type fieldinfo struct {
 }
 
 type classinfo struct {
-	classname   string
-	nickname    string
-	fieldinfos  map[string]*fieldinfo
-	keyfields   []string
-	fieldslist  []string
-	insertmql   string
-	insertcount int64
-	lastlogtime time.Time
-	mutex       sync.Mutex
+	classname     string
+	nickname      string
+	fieldinfos    map[string]*fieldinfo
+	keyfields     []string
+	fieldslist    []string
+	insertmql     string
+	insertcount   int64
+	lastlogtime   time.Time
+	lastlogicount int64
+	mutex         sync.Mutex
 }
 
 type ODBCImporter struct {
@@ -49,15 +50,13 @@ func (odbci *ODBCImporter) ReviseClassStruct(data map[string]any) (err error) {
 	if e != nil {
 		return e
 	}
-	e = odbci.createClass("x10", "/cgitest/x10", []*fieldinfo{
-		{fieldname: "uniqueid", datakey: "uniqueId", fieldtype: "varchar", keyidx: 1},
-	})
+	e = odbci.createClass("x10", "/cgitest/x10", nil)
 	if e != nil {
 		return e
 	}
 	e = odbci.createClass("x1001", "/cgitest/x10/x1001", []*fieldinfo{
-		{fieldname: "uniqueid", datakey: "uniqueId", fieldtype: "varchar", keyidx: 1},
-		{fieldname: "distname", datakey: "distName", fieldtype: "varchar"},
+		{fieldname: "uniqueid", datakey: "UNIQUEID", fieldtype: "varchar", keyidx: 1},
+		{fieldname: "distname", datakey: "BASENAME", fieldtype: "varchar"},
 	})
 	if e != nil {
 		return e
@@ -69,15 +68,18 @@ func (odbci *ODBCImporter) ReviseClassStruct(data map[string]any) (err error) {
 	if e != nil {
 		return e
 	}
+	e = odbci.createClass("x1003", "/cgitest/x10/x1003", []*fieldinfo{
+		{fieldname: "fromuniqueid", datakey: "FROMUNIQUEID", fieldtype: "varchar", keyidx: 1},
+		{fieldname: "touniqueid", datakey: "TOUNIQUEID", fieldtype: "varchar", keyidx: 2},
+	})
+	if e != nil {
+		return e
+	}
 	return
 }
 
 // 插入数据
 func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (err error) {
-	if data["uniqueId"] == nil {
-		// TODO 开发期只导入部分数据
-		return
-	}
 	ci := odbci.classinfos.GetIFPresent(classname)
 	if ci == nil {
 		return merrs.NewError("class not defined " + classname)
@@ -98,14 +100,52 @@ func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (er
 	}
 	atomic.AddInt64(&ci.insertcount, 1)
 	ci.mutex.Lock()
-	if time.Since(ci.lastlogtime) > 5*time.Second {
+	if time.Since(ci.lastlogtime) > 5*time.Second && ci.lastlogicount != ci.insertcount {
 		ci.lastlogtime = time.Now()
+		ci.lastlogicount = ci.insertcount
 		logger.Info("class", ci.classname, "import", ci.insertcount, "records")
 	}
 	ci.mutex.Unlock()
 	return
 }
 
+func (odbci *ODBCImporter) done() {
+	odbci.classinfos.Fetch(func(cn string, ci *classinfo) bool {
+		ci.mutex.Lock()
+		ci.lastlogtime = time.Now()
+		ci.lastlogicount = ci.insertcount
+		logger.Info("class", ci.classname, "import", ci.insertcount, "records")
+		ci.mutex.Unlock()
+		return true
+	})
+}
+
+func (odbci *ODBCImporter) reload() error {
+	_, e := odbci.client.Query(`delete from "/cgitest/x10/x1001" with version`).Do()
+	_ = e
+	_, e = odbci.client.Query(`delete from "/cgitest/x10/x1002" with version`).Do()
+	_ = e
+	_, e = odbci.client.Query(`delete from "/cgitest/x10/x1003" with version`).Do()
+	_ = e
+	_, e = odbci.client.Query(`drop class if exists "/cgitest/x10/x1001"`).Do()
+	if e != nil {
+		return e
+	}
+	_, e = odbci.client.Query(`drop class if exists "/cgitest/x10/x1002"`).Do()
+	if e != nil {
+		return e
+	}
+	_, e = odbci.client.Query(`drop class if exists "/cgitest/x10/x1003"`).Do()
+	if e != nil {
+		return e
+	}
+	_, e = odbci.client.Query(`drop class if exists "/cgitest/x10"`).Do()
+	if e != nil {
+		return e
+	}
+	return nil
+}
+
 // 新建类
 func (odbci *ODBCImporter) createClass(nickname, classname string, fieldinfoslist []*fieldinfo) (err error) {
 	_, err = odbci.classinfos.GetWithNew(classname, func() (ci *classinfo, err error) {