
combin master level1

libf, 4 months ago
parent commit 89b878db3c
3 changed files with 127 additions and 77 deletions
  1. +5 -9    importer/importer.go
  2. +120 -66 importer/odbcimporter.go
  3. +2 -2    schema/classinfo.go

+ 5 - 9
importer/importer.go

@@ -288,18 +288,13 @@ func (importer *Importer) importRecord(record map[string]any, line string, filen
 		}
 		logger.Debug(fmt.Sprint("import ", filename, "[", linecount, "]:", string(bs)))
 	}
-	var classname string
+	var classaliasname string
 	switch filetype {
 	case schema.FT_EDGE:
-		// err = importer.odbcimporter.InsertEdge(record)
-		// if err != nil {
-		// 	err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
-		// 	return
-		// }
 		graph.CacheEdgeInfo(record)
 	default:
-		classname = string(filetype)
-		err = importer.odbcimporter.InsertData(classname, record)
+		classaliasname = string(filetype)
+		err = importer.odbcimporter.InsertData(classaliasname, record)
 		if err != nil {
 			err = merrs.NewError(err, merrs.SSMaps{{"filename": filename}, {"linecount": fmt.Sprint(linecount)}, {"line": line}})
 			return
@@ -322,7 +317,8 @@ func Check() {
 		return
 	}
 	{
-		mql := "select id,uniqueid,tags,contain,day,vtime from level1 where uniqueid='E2E:OTR0002L'"
+		// mql := "select id,uniqueid,tags,contain,day,vtime from level1 where uniqueid='E2E:OTR0002L'"
+		mql := "select * from level1 where uniqueid='E2E:OTR0002L'"
 		r, e := client.Query(mql).Do()
 		if e != nil {
 			panic(merrs.NewError(e))
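
Note on the FT_EDGE branch in the first hunk above: edge records are no longer inserted directly; they are only cached (graph.CacheEdgeInfo) so the later level1 insert can attach contain/depend/topology via graph.GetEdgeInfo. A rough, self-contained sketch of such an edge cache follows; the names (edgecache, CacheEdge, EdgeInfo) are illustrative and are not the repo's actual graph package API.

package edgecache

import "sync"

var (
	mu    sync.Mutex
	edges = map[string]map[string][]string{} // from-object id -> relation -> target ids
)

// CacheEdge parks one edge; relation would be "contain", "depend" or "topology".
func CacheEdge(fromOID, relation, toOID string) {
	mu.Lock()
	defer mu.Unlock()
	rel := edges[fromOID]
	if rel == nil {
		rel = map[string][]string{}
		edges[fromOID] = rel
	}
	rel[relation] = append(rel[relation], toOID)
}

// EdgeInfo returns everything cached for one object id, or nil if nothing was.
func EdgeInfo(oid string) map[string][]string {
	mu.Lock()
	defer mu.Unlock()
	return edges[oid]
}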

+ 120 - 66
importer/odbcimporter.go

@@ -16,6 +16,7 @@ import (
 	"git.wecise.com/wecise/util/cast"
 	"git.wecise.com/wecise/util/cmap"
 	"git.wecise.com/wecise/util/merrs"
+	"github.com/dgryski/go-farm"
 	"github.com/scylladb/go-set/strset"
 )
 
@@ -111,59 +112,117 @@ func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
 // 	return
 // }
 
-var cm_object_id_cache = cmap.New[string, chan string]()
+// var cm_object_id_cache = cmap.New[string, chan string]()
 
-func object_id_cache(classaliasname, suid string) chan string {
-	choid, _ := cm_object_id_cache.GetWithNew(classaliasname+":"+suid,
-		func() (chan string, error) {
-			ch := make(chan string, 2)
-			return ch, nil
-		})
-	return choid
-}
+// func object_id_cache(classaliasname, suid string) chan string {
+// 	choid, _ := cm_object_id_cache.GetWithNew(classaliasname+":"+suid,
+// 		func() (chan string, error) {
+// 			ch := make(chan string, 2)
+// 			return ch, nil
+// 		})
+// 	return choid
+// }
 
-func get_object_id_from_cache(classaliasname, suid string) string {
-	choid := object_id_cache(classaliasname, suid)
-	oid := <-choid
-	push_object_id_into_cache(choid, oid)
-	return oid
-}
+// func get_object_id_from_cache(classaliasname, suid string) string {
+// 	choid := object_id_cache(classaliasname, suid)
+// 	oid := <-choid
+// 	push_object_id_into_cache(choid, oid)
+// 	return oid
+// }
+
+// func push_object_id_into_cache(choid chan string, oid string) {
+// 	choid <- oid
+// 	if len(choid) == 2 {
+// 		// keep at most 1 element in the channel
+// 		// chan cap = 2, so a third element can never get in
+// 		// the goroutine that put in the second element drains the first, so other goroutines can add new ones later
+// 		<-choid
+// 	}
+// }
+
+var masterlevel1mutex = make([]sync.Mutex, 256)
+var masterdatas = cmap.New[string, map[string]any]()
+var level1datas = cmap.New[string, map[string]any]()
 
-func push_object_id_into_cache(choid chan string, oid string) {
-	choid <- oid
-	if len(choid) == 2 {
-		// keep at most 1 element in the channel
-		// chan cap = 2, so a third element can never get in
-		// the goroutine that put in the second element drains the first, so other goroutines can add new ones later
-		<-choid
+func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string, data map[string]any) error {
+	hidx := int(farm.Hash32([]byte(suid)) % uint32(len(masterlevel1mutex)))
+	masterlevel1mutex[hidx].Lock()
+	defer masterlevel1mutex[hidx].Unlock()
+	switch classaliasname {
+	case "master":
+		level1data := level1datas.GetIFPresent(suid)
+		if level1data == nil {
+			// master record arrives first
+			masterdatas.Set(suid, data)
+		} else {
+			// master arrives after level1
+			level1datas.Remove(suid)
+			// fill in the missing level1 fields from master
+			// data must not be modified here; it is still inserted into master afterwards
+			for k, v := range data {
+				if _, has := level1data[k]; !has {
+					level1data[k] = v
+				}
+			}
+			// re-insert the now complete level1 record
+			e := odbci.insertData("level1", level1data)
+			if e != nil {
+				return e
+			}
+		}
+	case "level1":
+		masterdata := masterdatas.GetIFPresent(suid)
+		if masterdata == nil {
+			// level1 record arrives first
+			level1datas.Set(suid, data)
+		} else {
+			// level1 arrives after master
+			masterdatas.Remove(suid)
+			// overlay the level1 fields onto the master data
+			for k, v := range data {
+				masterdata[k] = v
+			}
+			// data is now the complete level1 record
+			data = masterdata
+		}
+		// insert the level1 record
+		e := odbci.insertData("level1", data)
+		if e != nil {
+			return e
+		}
 	}
+	return nil
 }
 
 // insert data
-func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (err error) {
-	cdi := classdatainfos.GetIFPresent(classname)
-	if cdi == nil {
-		return merrs.NewError("class not defined " + classname)
+func (odbci *ODBCImporter) InsertData(classaliasname string, data map[string]any) (err error) {
+	oid, suid, e := graph.GetNodeId(classaliasname, data)
+	if e != nil {
+		return e
 	}
-	if cdi.Insertmql == "" {
-		return merrs.NewError("class no fields to insert " + classname)
-	}
-	innerdata := &InnerData{}
-	innerdata.oid, innerdata.suid, err = graph.GetNodeId(cdi.Classaliasname, data)
-	if err != nil {
-		return
-	}
-	if cdi.Classaliasname == "level1" {
-		ei := graph.GetEdgeInfo(innerdata.oid)
+	data["id"] = oid
+	if classaliasname == "master" {
+		e := odbci.masterlevel1data(classaliasname, suid, data)
+		if e != nil {
+			return e
+		}
+	} else if classaliasname == "level1" {
+		ei := graph.GetEdgeInfo(oid)
 		if ei != nil {
-			innerdata.contain = ei["contain"]
-			innerdata.depend = ei["depend"]
-			innerdata.topology = ei["topology"]
+			data["contain"] = ei["contain"]
+			data["depend"] = ei["depend"]
+			data["topology"] = ei["topology"]
+		}
+		e := odbci.masterlevel1data(classaliasname, suid, data)
+		if e != nil {
+			return e
 		}
+		// the data has already been inserted inside masterlevel1data
+		return
 	} else {
-		innerdata.depend = referencedata(classname, data)
+		data["depend"] = referencedata(classaliasname, data)
 	}
-	return odbci.insertData(classname, cdi, innerdata, data)
+	return odbci.insertData(classaliasname, data)
 }
 
 type InnerData struct {
@@ -174,7 +233,7 @@ type InnerData struct {
 	topology map[string][]string
 }
 
-func referencedata(classname string, data map[string]any) (depend map[string][]string) {
+func referencedata(classaliasname string, data map[string]any) (depend map[string][]string) {
 	refer := data["_references"]
 	switch vv := refer.(type) {
 	case []interface{}:
@@ -187,7 +246,7 @@ func referencedata(classname string, data map[string]any) (depend map[string][]s
 					case "_toUniqueId":
 						suid := cast.ToString(v)
 						toclassname := "master"
-						switch classname {
+						switch classaliasname {
 						case "level1":
 							toclassname = "level1"
 						case "level2":
@@ -220,38 +279,33 @@ func referencedata(classname string, data map[string]any) (depend map[string][]s
 	return
 }
 
-func (odbci *ODBCImporter) insertData(classname string, cdi *classdatainfo, innerdata *InnerData, data map[string]any) (err error) {
+func (odbci *ODBCImporter) insertData(classaliasname string, data map[string]any) (err error) {
+	cdi := classdatainfos.GetIFPresent(classaliasname)
+	if cdi == nil {
+		return merrs.NewError("class not defined " + classaliasname)
+	}
+	if cdi.Insertmql == "" {
+		return merrs.NewError("class no fields to insert " + classaliasname)
+	}
 	values := []any{}
 	for _, fn := range cdi.Fieldslist {
-		// internal fields
-		switch fn {
-		case "id":
-			values = append(values, innerdata.oid)
-			continue
-		case "contain":
-			values = append(values, innerdata.contain)
-			continue
-		case "depend":
-			values = append(values, innerdata.depend)
-			continue
-		case "topology":
-			values = append(values, innerdata.topology)
+		fi := cdi.Fieldinfos[fn]
+		if fi == nil {
+			values = append(values, data[fn])
 			continue
 		}
-		fi := cdi.Fieldinfos[fn]
 		// merge the extended fields
 		if strset.New(fi.Datakey...).Has("*") {
+			if fi.Fieldtype != "map<varchar,varchar>" {
+				return merrs.NewError("fi.Fieldtype=" + fi.Fieldtype + " != map<varchar,varchar>")
+			}
 			td := map[string]any{}
 			for k, v := range data {
 				if cdi.DatakeyFieldinfos[k] == nil {
-					td[k] = v
+					td[k] = cast.ToString(v)
 				}
 			}
-			tdbs, e := json.Marshal(td)
-			if e != nil {
-				return merrs.NewError(e)
-			}
-			values = append(values, string(tdbs))
+			values = append(values, td)
 			continue
 		}
 		// field type correction
@@ -273,11 +327,12 @@ func (odbci *ODBCImporter) insertData(classname string, cdi *classdatainfo, inne
 			}
 		}
 		if fn == "tags" {
-			v = append(cast.ToStringSlice(v), classname)
+			v = append(cast.ToStringSlice(v), classaliasname)
 		}
 		values = append(values, v)
 	}
 	if odbci.client != nil {
+		// logger.Info(values...)
 		_, err = odbci.client.Query(cdi.Insertmql, values...).Do()
 		if err != nil {
 			databs, _ := json.MarshalIndent(data, "", "  ")
@@ -285,7 +340,6 @@ func (odbci *ODBCImporter) insertData(classname string, cdi *classdatainfo, inne
 			logger.Error(err)
 			return
 		}
-		push_object_id_into_cache(object_id_cache(cdi.Classaliasname, innerdata.suid), innerdata.oid)
 	}
 	atomic.AddInt64(&cdi.insertcount, 1)
 	cdi.mutex.Lock()
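
The new masterlevel1data above pairs each "master" record with its "level1" counterpart by uniqueid: whichever side arrives first is parked, and when the other side shows up the two are merged and the level1 row is (re-)inserted, all under a mutex chosen by hashing the uniqueid. A minimal, stdlib-only sketch of that pattern follows; pairStore, add and insert are illustrative names, with hash/fnv and plain maps standing in for the repo's farm.Hash32 and cmap, and insert standing in only for the level1 insertData call.

package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

type pairStore struct {
	mu      [256]sync.Mutex
	masters map[string]map[string]any
	level1s map[string]map[string]any
}

func newPairStore() *pairStore {
	return &pairStore{
		masters: map[string]map[string]any{},
		level1s: map[string]map[string]any{},
	}
}

// lock picks one of 256 mutexes by hashing the uniqueid, so unrelated ids
// do not contend on a single global lock.
func (p *pairStore) lock(suid string) *sync.Mutex {
	h := fnv.New32a()
	h.Write([]byte(suid))
	return &p.mu[h.Sum32()%uint32(len(p.mu))]
}

// add parks the first record of a pair and hands the merged record to insert
// once both sides have been seen; a lone level1 record is still inserted
// immediately, mirroring the branches in the diff above.
func (p *pairStore) add(class, suid string, data map[string]any, insert func(map[string]any) error) error {
	m := p.lock(suid)
	m.Lock()
	defer m.Unlock()
	switch class {
	case "master":
		level1, seen := p.level1s[suid]
		if !seen {
			p.masters[suid] = data // master first: park it, nothing to insert yet
			return nil
		}
		delete(p.level1s, suid)
		for k, v := range data { // fill missing level1 fields from master
			if _, has := level1[k]; !has {
				level1[k] = v
			}
		}
		return insert(level1) // re-insert the completed level1 row
	case "level1":
		if master, seen := p.masters[suid]; seen {
			delete(p.masters, suid)
			for k, v := range data { // level1 fields take precedence
				master[k] = v
			}
			data = master
		} else {
			p.level1s[suid] = data // level1 first: park it, but insert it as-is too
		}
		return insert(data)
	}
	return fmt.Errorf("unexpected class %q", class)
}

func main() {
	insert := func(row map[string]any) error { fmt.Println("insert level1:", row); return nil }
	p := newPairStore()
	_ = p.add("master", "E2E:X", map[string]any{"name": "x", "site": "s1"}, insert)
	_ = p.add("level1", "E2E:X", map[string]any{"name": "x", "tags": []string{"t"}}, insert)
}

Running the sketch with master arriving before level1 prints a single merged level1 insert; the reverse order prints the partial level1 row first and the completed one when master arrives, which matches the intent of the two branches above.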

+ 2 - 2
schema/classinfo.go

@@ -70,7 +70,7 @@ func init() {
 			{Fieldname: "distname", Fieldtype: "varchar", Datakey: []string{"distName"}},
 			{Fieldname: "name", Fieldtype: "varchar", Datakey: []string{"name"}},
 			{Fieldname: "entitytypes", Fieldtype: "set<varchar>", Datakey: []string{"entityTypes"}},
-			{Fieldname: "extraattr", Fieldtype: "varchar", Datakey: []string{"*"}},
+			{Fieldname: "extends", Fieldtype: "map<varchar,varchar>", Datakey: []string{"*"}},
 			{Fieldname: "tags", Fieldtype: "set<varchar>", Datakey: []string{"tags"}},
 		},
 		[2]string{"key", "manu"},
@@ -81,7 +81,7 @@ func init() {
 			{Fieldname: "distname", Fieldtype: "varchar", Datakey: []string{"distName"}},
 			{Fieldname: "name", Fieldtype: "varchar", Datakey: []string{"name"}},
 			{Fieldname: "entitytypes", Fieldtype: "set<varchar>", Datakey: []string{"entityTypes"}},
-			{Fieldname: "extraattr", Fieldtype: "varchar", Datakey: []string{"*"}},
+			{Fieldname: "extends", Fieldtype: "map<varchar,varchar>", Datakey: []string{"*"}},
 			{Fieldname: "tags", Fieldtype: "set<varchar>", Datakey: []string{"tags"}},
 		},
 		[2]string{"partition", "name"},