libf 4 tháng trước cách đây
mục cha
commit
8e1b977466
4 tập tin đã thay đổi với 142 bổ sung1 xóa
  1. 21 0
      importer/classinfo.go
  2. 14 0
      importer/importer.go
  3. 104 1
      importer/odbcimporter.go
  4. 3 0
      main.go

+ 21 - 0
importer/classinfo.go

@@ -63,6 +63,7 @@ func init() {
 			{fieldname: "lastupdated", fieldtype: "timestamp", datakey: "LASTUPDATED"},
 		},
 		[2]string{"partition", "name"},
+		[2]string{"key", "manu"},
 	)
 	newclassinfo("minfo", "minfo", "m3cnet",
 		[]*fieldinfo{
@@ -73,30 +74,39 @@ func init() {
 			{fieldname: "extraattr", fieldtype: "varchar", datakey: ""},
 			{fieldname: "tags", fieldtype: "set<varchar>", datakey: "tags"},
 		},
+		[2]string{"key", "manu"},
 	)
 	newclassinfo("level1", "level1", "minfo", nil,
 		[2]string{"partition", "name"},
+		[2]string{"key", "manu"},
 	)
 	newclassinfo("level2", "level2", "minfo", nil,
 		[2]string{"partition", "name"},
+		[2]string{"key", "manu"},
 	)
 	newclassinfo("level3", "level3", "minfo", nil,
 		[2]string{"partition", "name"},
+		[2]string{"key", "manu"},
 	)
 	newclassinfo("level4", "level4", "minfo", nil,
 		[2]string{"partition", "name"},
+		[2]string{"key", "manu"},
 	)
 	newclassinfo("level5", "level5", "minfo", nil,
 		[2]string{"partition", "name"},
+		[2]string{"key", "manu"},
 	)
 	newclassinfo("level6", "level6", "minfo", nil,
 		[2]string{"partition", "name"},
+		[2]string{"key", "manu"},
 	)
 	newclassinfo("level7", "level7", "minfo", nil,
 		[2]string{"partition", "name"},
+		[2]string{"key", "manu"},
 	)
 	newclassinfo("level8", "level8", "minfo", nil,
 		[2]string{"partition", "name"},
+		[2]string{"key", "manu"},
 	)
 }
 
@@ -150,6 +160,9 @@ func newclassinfo(aliasname, classname, baseclassname string, fieldinfoslist []*
 	createmql += `)with alias='` + aliasname + `'`
 	for _, withoption := range withoptions {
 		createmql += " and " + withoption[0] + "=" + withoption[1]
+		if withoption[0] == "key" && withoption[1] == "manu" {
+			fieldslist = append([]string{"id"}, fieldslist...)
+		}
 	}
 
 	var insertmql string
@@ -223,6 +236,14 @@ create class if not exists level7 : minfo () with partition=entitytypes , alias=
 create class if not exists level8 : minfo () with partition=entitytypes , alias='level8' ;
 `
 
+var relations = map[string]string{
+	"contains": "contain",
+	"contain":  "contain",
+	"dependon": "depend",
+	"depend":   "depend",
+	"topology": "topology",
+}
+
 var createedgemqls = []string{
 	`create edge type m3cnet.contain`,
 	`create edge type m3cnet.depend`,

+ 14 - 0
importer/importer.go

@@ -303,3 +303,17 @@ func (importer *Importer) alldone() {
 func (importer *Importer) done() {
 	importer.odbcimporter.done()
 }
+
+func Check() {
+	client := odbc.ODBClient
+	if client == nil {
+		return
+	}
+	mql := "select * from /m3cnet/master"
+	r, e := client.Query(mql).Do()
+	if e != nil {
+		panic(merrs.NewError(e))
+	}
+	bs, _ := json.MarshalIndent(r.Data, "", "  ")
+	fmt.Println(string(bs))
+}

+ 104 - 1
importer/odbcimporter.go

@@ -1,6 +1,7 @@
 package importer
 
 import (
+	"encoding/base64"
 	"encoding/json"
 	"fmt"
 	"strings"
@@ -69,10 +70,100 @@ func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
 }
 
 func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) {
-
+	extraattr := map[string]string{}
+	fromuid := ""
+	touid := ""
+	edgetype := ""
+	for k, v := range data {
+		switch k {
+		case "FROMUNIQUEID":
+			fromuid = cast.ToString(v)
+		case "TOUNIQUEID":
+			touid = cast.ToString(v)
+		case "EDGETYPE":
+			edgetype = cast.ToString(v)
+		default:
+			extraattr[k] = cast.ToString(v)
+		}
+	}
+	if fromuid == "" {
+		databs, _ := json.MarshalIndent(data, "", "  ")
+		return merrs.NewError("not found valid fromuniqueid in data ", merrs.SSMap{"data": string(databs)})
+	}
+	if touid == "" {
+		databs, _ := json.MarshalIndent(data, "", "  ")
+		return merrs.NewError("not found valid touniqueid in data ", merrs.SSMap{"data": string(databs)})
+	}
+	if edgetype == "" {
+		databs, _ := json.MarshalIndent(data, "", "  ")
+		return merrs.NewError("not found valid edgetype in data ", merrs.SSMap{"data": string(databs)})
+	}
+	edgetype = relations[edgetype]
+	if edgetype == "" {
+		databs, _ := json.MarshalIndent(data, "", "  ")
+		return merrs.NewError("not found valid edgetype in data ", merrs.SSMap{"data": string(databs)})
+	}
+	if odbci.client != nil {
+		foid := get_object_id_from_cache(fromuid)
+		toid := get_object_id_from_cache(touid)
+		eabs, _ := json.Marshal(extraattr)
+		quadmql := `quad "` + foid + `" ` + edgetype + ` + "` + toid + `" ` + string(eabs)
+		_, err = odbci.client.Query(quadmql).Do()
+		if err != nil {
+			err = merrs.NewError(err, merrs.SSMaps{{"mql": quadmql}})
+			logger.Error(err)
+			return
+		}
+	}
 	return
 }
 
+var cm_object_id_cache = cmap.New[string, chan string]()
+
+func object_id_cache(suid string) chan string {
+	choid, _ := cm_object_id_cache.GetWithNew(suid,
+		func() (chan string, error) {
+			ch := make(chan string, 2)
+			return ch, nil
+		})
+	return choid
+}
+
+func get_object_id_from_cache(suid string) string {
+	choid := object_id_cache(suid)
+	oid := <-choid
+	push_object_id_into_cache(choid, oid)
+	return oid
+}
+
+func push_object_id_into_cache(choid chan string, oid string) {
+	choid <- oid
+	if len(choid) == 2 {
+		// 最多保留 1 个
+		// chan cap = 2,第三个元素进不来
+		// 进第二个元素的协程,清除第一个元素,允许其它协程后续进入新元素
+		<-choid
+	}
+}
+
+func object_id(classaliasname string, data map[string]any) (oid, suid string, err error) {
+	uid := data["uniqueId"]
+	if uid == nil {
+		uid = data["UNIQUEID"]
+		if uid == nil {
+			databs, _ := json.MarshalIndent(data, "", "  ")
+			return "", "", merrs.NewError("not found uniqueid in data ", merrs.SSMap{"data": string(databs)})
+		}
+	}
+	suid = cast.ToString(uid)
+	if suid == "" {
+		databs, _ := json.MarshalIndent(data, "", "  ")
+		return "", "", merrs.NewError("not found valid uniqueid in data ", merrs.SSMap{"data": string(databs)})
+	}
+	suid64 := base64.RawURLEncoding.EncodeToString([]byte(suid))
+	return classaliasname + ":" + suid64, suid, nil
+}
+
 // 插入数据
 func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (err error) {
 	cdi := classdatainfos.GetIFPresent(classname)
@@ -82,8 +173,16 @@ func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (er
 	if cdi.insertmql == "" {
 		return merrs.NewError("class no fields to insert " + classname)
 	}
+	oid, suid, e := object_id(cdi.aliasname, data)
+	if e != nil {
+		return e
+	}
 	values := []any{}
 	for _, fn := range cdi.fieldslist {
+		if fn == "id" {
+			values = append(values, oid)
+			continue
+		}
 		fi := cdi.fieldinfos[fn]
 		if fi.datakey == "" {
 			td := map[string]any{}
@@ -110,6 +209,9 @@ func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (er
 			}
 			v = tv.Format("2006-01-02 15:04:05.000000")
 		}
+		if fn == "tags" {
+			v = append(cast.ToStringSlice(v), classname)
+		}
 		values = append(values, v)
 	}
 	if odbci.client != nil {
@@ -119,6 +221,7 @@ func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (er
 			logger.Error(err)
 			return
 		}
+		push_object_id_into_cache(object_id_cache(suid), oid)
 	}
 	atomic.AddInt64(&cdi.insertcount, 1)
 	cdi.mutex.Lock()

+ 3 - 0
main.go

@@ -35,4 +35,7 @@ func main() {
 	fmt.Println("import", filescount, "files", recordscount, "records", "in", usetime)
 	fmt.Println("total import", totalfilescount, "files", totalrecordscount, "records", "in", totalusetime)
 	fmt.Println("access", odbc.LogFile, "for detail information")
+
+	// 验证
+	importer.Check()
 }