Explorar o código

重复数据检查

libf hai 3 meses
pai
achega
7a3429e7d9
Modificáronse 6 ficheiros con 42 adicións e 20 borrados
  1. 2 2
      go.mod
  2. 4 4
      go.sum
  3. 1 0
      importer/datainfo.go
  4. 3 1
      importer/importer.go
  5. 29 10
      importer/odbcimporter.go
  6. 3 3
      main.go

+ 2 - 2
go.mod

@@ -4,11 +4,11 @@ go 1.20
 
 require (
 	git.wecise.com/wecise/odb-go v0.0.0-20250208123107-b502a8626316
-	git.wecise.com/wecise/util v0.0.0-20250211091151-e5a2f521412f
+	git.wecise.com/wecise/util v0.0.0-20250211130446-7940676f0a44
 	github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13
 	github.com/scylladb/go-set v1.0.3-0.20200225121959-cc7b2070d91e
 	github.com/spf13/cast v1.7.0
-	github.com/wecisecode/util v0.0.0-20250211090829-552742b3dfbb
+	github.com/wecisecode/util v0.0.0-20250211095752-7b0229971c4a
 	modernc.org/sqlite v1.30.1
 )
 

+ 4 - 4
go.sum

@@ -1,7 +1,7 @@
 git.wecise.com/wecise/odb-go v0.0.0-20250208123107-b502a8626316 h1:4ljPCr2MUA9w3HGRiS4dwYuqKJAbuLBoaho70kO8D8I=
 git.wecise.com/wecise/odb-go v0.0.0-20250208123107-b502a8626316/go.mod h1:0/+7FWRDsMND6k6fgW836IXTYEd1vzOdBuIDEA2FnX8=
-git.wecise.com/wecise/util v0.0.0-20250211091151-e5a2f521412f h1:NPco4igK5uBw7lTwFP4H4Yg06uEn6RPMh3xEbkgb1g4=
-git.wecise.com/wecise/util v0.0.0-20250211091151-e5a2f521412f/go.mod h1:9349QCkjPB2gs9a+kCzwHgMtOuf+KovY/2uRkLT4w3A=
+git.wecise.com/wecise/util v0.0.0-20250211130446-7940676f0a44 h1:kfzUFxCxLdlhOoBaP0SFYCwvTl6qFOReFnNXZJ5sel4=
+git.wecise.com/wecise/util v0.0.0-20250211130446-7940676f0a44/go.mod h1:9349QCkjPB2gs9a+kCzwHgMtOuf+KovY/2uRkLT4w3A=
 github.com/bluele/gcache v0.0.2 h1:WcbfdXICg7G/DGBh1PFfcirkWOQV+v077yF1pSy3DGw=
 github.com/bluele/gcache v0.0.2/go.mod h1:m15KV+ECjptwSPxKhOhQoAFQVtUFjTVkc3H8o0t/fp0=
 github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
@@ -77,8 +77,8 @@ github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IU
 github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
 github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
 github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
-github.com/wecisecode/util v0.0.0-20250211090829-552742b3dfbb h1:JNKgZsGGuGzbGG4i0ejVrmdu2mSRbIIfKOM9P6V66eI=
-github.com/wecisecode/util v0.0.0-20250211090829-552742b3dfbb/go.mod h1:JuKKMe2bEPXEDz3oFuCidZnW+0jdXWhUGFNPmkgsz78=
+github.com/wecisecode/util v0.0.0-20250211095752-7b0229971c4a h1:vA4zLRyhLT8ggWQ9z0GKXwOdflvlUj4srN2rJTEShu4=
+github.com/wecisecode/util v0.0.0-20250211095752-7b0229971c4a/go.mod h1:JuKKMe2bEPXEDz3oFuCidZnW+0jdXWhUGFNPmkgsz78=
 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 go.etcd.io/etcd/api/v3 v3.5.8 h1:Zf44zJszoU7zRV0X/nStPenegNXoFDWcB/MwrJbA+L4=

+ 1 - 0
importer/datainfo.go

@@ -61,6 +61,7 @@ func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
 			if e != nil && !strings.Contains(e.Error(), "already exist") {
 				return e
 			}
+			logger.Info(createedgemql)
 		}
 	}
 	return

+ 3 - 1
importer/importer.go

@@ -37,7 +37,7 @@ type Importer struct {
 }
 
 func ImportDir(datapath string, parallel int, reload bool) (totalfilescount, totalrecordscount int64, totalusetime time.Duration, filescount, recordscount int64, usetime time.Duration, err error) {
-	concurlimt := mcfg.GetInt("odbc.concurrent.limit", parallel*5)
+	concurlimt := mcfg.GetInt("odbc.concurrent.limit", 50)
 	importer := &Importer{
 		datapath:     datapath,
 		parallel:     parallel,
@@ -172,6 +172,7 @@ func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FW
 		importer.fileimportrc.ConcurCall(1,
 			func() {
 				defer wg.Done()
+				logger.Info("import", "file", filename)
 				importer.importstatus.mutex.RLock()
 				importstatus := importer.importstatus.ImportStatus[filename]
 				importer.importstatus.mutex.RUnlock()
@@ -193,6 +194,7 @@ func (importer *Importer) ImportFiles(fwop func(basedir string, fpath string) FW
 				importer.importstatus.TotalUseTime = time.Since(importer.starttime)
 				importer.importstatus.mutex.Unlock()
 				importer.importstatus.Save()
+				logger.Info("file", filename, "imported", records, "records")
 			},
 		)
 		return true

+ 29 - 10
importer/odbcimporter.go

@@ -123,9 +123,11 @@ func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string,
 		if level1data == nil {
 			// 先插入 master
 			masterdatas.Set(suid, data)
+			// 用 master 数据生成不完整的 level1 数据
 			level1data = map[string]any{}
 			for k, v := range data {
 				if k == "id" {
+					// master oid -> 重新生成 level1 oid
 					oid, _, e := graph.GetNodeId("level1", data)
 					if e != nil {
 						return e
@@ -137,16 +139,19 @@ func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string,
 		} else {
 			// 后插入 master
 			level1datas.Remove(suid)
-			// 用 master 补齐 level1 数据
+			// 用 level1 补齐 master 数据
 			// data 数据不能变,需要后续插入 master
+			entiredata := map[string]any{}
 			for k, v := range data {
-				if _, has := level1data[k]; !has {
-					level1data[k] = v
-				}
+				entiredata[k] = v
 			}
+			for k, v := range level1data {
+				entiredata[k] = v
+			}
+			level1data = entiredata
 		}
 		// 重新插入完整的 level1
-		e := odbci.insertData("level1", level1data)
+		e := odbci.insertData("level1", "", "", level1data)
 		if e != nil {
 			return e
 		}
@@ -159,14 +164,18 @@ func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string,
 			// 后插入 level1
 			masterdatas.Remove(suid)
 			// 用 level1 补齐 master 数据
+			entiredata := map[string]any{}
+			for k, v := range masterdata {
+				entiredata[k] = v
+			}
 			for k, v := range data {
-				masterdata[k] = v
+				entiredata[k] = v
 			}
 			// 完整 level1 数据
-			data = masterdata
+			data = entiredata
 		}
 		// 插入 level1 数据
-		e := odbci.insertData("level1", data)
+		e := odbci.insertData("level1", "", "", data)
 		if e != nil {
 			return e
 		}
@@ -309,7 +318,7 @@ func (odbci *ODBCImporter) InsertData(classaliasname string, data map[string]any
 	} else {
 		data["depend"] = referencedata(classaliasname, data)
 	}
-	return odbci.insertData(classaliasname, data)
+	return odbci.insertData(classaliasname, oid, suid, data)
 }
 
 type InnerData struct {
@@ -366,7 +375,7 @@ func referencedata(classaliasname string, data map[string]any) (depend map[strin
 	return
 }
 
-func (odbci *ODBCImporter) insertData(classaliasname string, data map[string]any) (err error) {
+func (odbci *ODBCImporter) insertData(classaliasname string, oid, suid string, data map[string]any) (err error) {
 	cdi := classdatainfos.GetIFPresent(classaliasname)
 	if cdi == nil {
 		return merrs.NewError("class not defined " + classaliasname)
@@ -419,6 +428,16 @@ func (odbci *ODBCImporter) insertData(classaliasname string, data map[string]any
 		values = append(values, v)
 	}
 	if odbci.client != nil {
+		if odbc.LogDebug && oid != "" {
+			mql := "select id,uniqueid from " + classaliasname + " where id=?"
+			r, e := odbci.client.Query(mql, oid).Do()
+			if e != nil {
+				return e
+			}
+			if r != nil && len(r.Data) != 0 {
+				logger.Info(classaliasname, "exists id:", oid, ", uniqueid:", r.Data[0]["uniqueid"], ", new uniqueid:", suid)
+			}
+		}
 		// logger.Info(values...)
 		_, err = odbci.client.Query(cdi.Insertmql, values...).Do()
 		if err != nil {

+ 3 - 3
main.go

@@ -47,12 +47,12 @@ func main() {
 		panic(e)
 	}
 	if totalfilescount == 0 {
-		fmt.Println(`not found data files in "` + datapath + `"`)
+		logger.Info(`not found data files in "` + datapath + `"`)
 		return
 	}
 	// 输出统计信息
-	fmt.Println("import", filescount, "files", recordscount, "records", "in", usetime)
-	fmt.Println("total import", totalfilescount, "files", totalrecordscount, "records", "in", totalusetime)
+	logger.Info("import", filescount, "files", recordscount, "records", "in", usetime)
+	logger.Info("total import", totalfilescount, "files", totalrecordscount, "records", "in", totalusetime)
 	fmt.Println("access", odbc.LogFile, "for detail information")
 
 	// 验证