|
@@ -4,6 +4,7 @@ import (
|
|
"encoding/base64"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"encoding/json"
|
|
"fmt"
|
|
"fmt"
|
|
|
|
+ "regexp"
|
|
"strings"
|
|
"strings"
|
|
"sync"
|
|
"sync"
|
|
"sync/atomic"
|
|
"sync/atomic"
|
|
@@ -98,14 +99,19 @@ func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) {
|
|
databs, _ := json.MarshalIndent(data, "", " ")
|
|
databs, _ := json.MarshalIndent(data, "", " ")
|
|
return merrs.NewError("not found valid edgetype in data ", merrs.SSMap{"data": string(databs)})
|
|
return merrs.NewError("not found valid edgetype in data ", merrs.SSMap{"data": string(databs)})
|
|
}
|
|
}
|
|
|
|
+ return odbci.insertEdge(edgetype, fromuid, touid, extraattr, data)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (odbci *ODBCImporter) insertEdge(edgetype, fromuid, touid string, extraattr map[string]string, data map[string]any) (err error) {
|
|
|
|
+
|
|
edgetype = relations[edgetype]
|
|
edgetype = relations[edgetype]
|
|
if edgetype == "" {
|
|
if edgetype == "" {
|
|
databs, _ := json.MarshalIndent(data, "", " ")
|
|
databs, _ := json.MarshalIndent(data, "", " ")
|
|
return merrs.NewError("not found valid edgetype in data ", merrs.SSMap{"data": string(databs)})
|
|
return merrs.NewError("not found valid edgetype in data ", merrs.SSMap{"data": string(databs)})
|
|
}
|
|
}
|
|
if odbci.client != nil {
|
|
if odbci.client != nil {
|
|
- foid := get_object_id_from_cache(fromuid)
|
|
|
|
- toid := get_object_id_from_cache(touid)
|
|
|
|
|
|
+ foid := get_object_id_from_cache("master:" + fromuid)
|
|
|
|
+ toid := to_object_id("level1", touid)
|
|
eabs, _ := json.Marshal(extraattr)
|
|
eabs, _ := json.Marshal(extraattr)
|
|
quadmql := `quad "` + foid + `" ` + edgetype + ` + "` + toid + `" ` + string(eabs)
|
|
quadmql := `quad "` + foid + `" ` + edgetype + ` + "` + toid + `" ` + string(eabs)
|
|
_, err = odbci.client.Query(quadmql).Do()
|
|
_, err = odbci.client.Query(quadmql).Do()
|
|
@@ -114,6 +120,7 @@ func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) {
|
|
logger.Error(err)
|
|
logger.Error(err)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
+ logger.Info("relation immport " + foid + "->" + toid)
|
|
}
|
|
}
|
|
return
|
|
return
|
|
}
|
|
}
|
|
@@ -160,8 +167,11 @@ func object_id(classaliasname string, data map[string]any) (oid, suid string, er
|
|
databs, _ := json.MarshalIndent(data, "", " ")
|
|
databs, _ := json.MarshalIndent(data, "", " ")
|
|
return "", "", merrs.NewError("not found valid uniqueid in data ", merrs.SSMap{"data": string(databs)})
|
|
return "", "", merrs.NewError("not found valid uniqueid in data ", merrs.SSMap{"data": string(databs)})
|
|
}
|
|
}
|
|
|
|
+ return to_object_id(classaliasname, suid), suid, nil
|
|
|
|
+}
|
|
|
|
+func to_object_id(classaliasname string, suid string) string {
|
|
suid64 := base64.RawURLEncoding.EncodeToString([]byte(suid))
|
|
suid64 := base64.RawURLEncoding.EncodeToString([]byte(suid))
|
|
- return classaliasname + ":" + suid64, suid, nil
|
|
|
|
|
|
+ return classaliasname + ":" + suid64
|
|
}
|
|
}
|
|
|
|
|
|
// 插入数据
|
|
// 插入数据
|
|
@@ -177,12 +187,66 @@ func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (er
|
|
if e != nil {
|
|
if e != nil {
|
|
return e
|
|
return e
|
|
}
|
|
}
|
|
|
|
+ var contain, depend, topology map[string][]string
|
|
|
|
+ refer := data["_references"]
|
|
|
|
+ switch vv := refer.(type) {
|
|
|
|
+ case []interface{}:
|
|
|
|
+ for _, v := range vv {
|
|
|
|
+ switch vv := v.(type) {
|
|
|
|
+ case map[string]interface{}:
|
|
|
|
+ for k, v := range vv {
|
|
|
|
+ switch k {
|
|
|
|
+ case "_edgeType":
|
|
|
|
+ case "_toUniqueId":
|
|
|
|
+ suid := cast.ToString(v)
|
|
|
|
+ toclassname := "master"
|
|
|
|
+ switch classname {
|
|
|
|
+ case "level2":
|
|
|
|
+ toclassname = "level1"
|
|
|
|
+ case "level3":
|
|
|
|
+ toclassname = "level2"
|
|
|
|
+ case "level4":
|
|
|
|
+ toclassname = "level3"
|
|
|
|
+ case "level5":
|
|
|
|
+ toclassname = "level4"
|
|
|
|
+ case "level6":
|
|
|
|
+ toclassname = "level5"
|
|
|
|
+ case "level7":
|
|
|
|
+ toclassname = "level6"
|
|
|
|
+ case "level8":
|
|
|
|
+ toclassname = "level7"
|
|
|
|
+ }
|
|
|
|
+ toid := to_object_id(toclassname, suid)
|
|
|
|
+ m := map[string]string{"_direction": "out"}
|
|
|
|
+ mbs, _ := json.Marshal(m)
|
|
|
|
+ depend = map[string][]string{
|
|
|
|
+ "_all": {toid},
|
|
|
|
+ toid: {string(mbs)},
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
values := []any{}
|
|
values := []any{}
|
|
for _, fn := range cdi.fieldslist {
|
|
for _, fn := range cdi.fieldslist {
|
|
if fn == "id" {
|
|
if fn == "id" {
|
|
values = append(values, oid)
|
|
values = append(values, oid)
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
|
|
+ if fn == "contain" {
|
|
|
|
+ values = append(values, contain)
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ if fn == "depend" {
|
|
|
|
+ values = append(values, depend)
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ if fn == "topology" {
|
|
|
|
+ values = append(values, topology)
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
fi := cdi.fieldinfos[fn]
|
|
fi := cdi.fieldinfos[fn]
|
|
if fi.datakey == "" {
|
|
if fi.datakey == "" {
|
|
td := map[string]any{}
|
|
td := map[string]any{}
|
|
@@ -221,7 +285,7 @@ func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (er
|
|
logger.Error(err)
|
|
logger.Error(err)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
- push_object_id_into_cache(object_id_cache(suid), oid)
|
|
|
|
|
|
+ push_object_id_into_cache(object_id_cache(classname+":"+suid), oid)
|
|
}
|
|
}
|
|
atomic.AddInt64(&cdi.insertcount, 1)
|
|
atomic.AddInt64(&cdi.insertcount, 1)
|
|
cdi.mutex.Lock()
|
|
cdi.mutex.Lock()
|
|
@@ -268,18 +332,34 @@ func (odbci *ODBCImporter) reload() error {
|
|
if ci == nil {
|
|
if ci == nil {
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
- for retry := 2; retry >= 0; retry-- {
|
|
|
|
- _, e := odbci.client.Query(`delete from "` + ci.classfullname + `" with version`).Do()
|
|
|
|
- _ = e
|
|
|
|
- _, e = odbci.client.Query(`drop class if exists "` + ci.classfullname + `"`).Do()
|
|
|
|
|
|
+ e := odbci.dropclass(ci.classfullname)
|
|
|
|
+ if e != nil {
|
|
|
|
+ return e
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (odbci *ODBCImporter) dropclass(classname string) error {
|
|
|
|
+ for retry := 2; retry >= 0; retry-- {
|
|
|
|
+ _, e := odbci.client.Query(`delete from "` + classname + `" with version`).Do()
|
|
|
|
+ _ = e
|
|
|
|
+ _, e = odbci.client.Query(`drop class if exists "` + classname + `"`).Do()
|
|
|
|
+ if e != nil {
|
|
|
|
+ matchstr := regexp.MustCompile(`refer by ([^,]+)`).FindStringSubmatch(e.Error())
|
|
|
|
+ if len(matchstr) >= 2 {
|
|
|
|
+ e = odbci.dropclass(matchstr[1])
|
|
if e != nil {
|
|
if e != nil {
|
|
- if retry > 0 {
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
return e
|
|
return e
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ if retry > 0 {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ return e
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ logger.Info("drop class " + classname)
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|