package importer

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"regexp"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.wecise.com/wecise/cgimport/odbc"
	"git.wecise.com/wecise/odb-go/odb"
	"git.wecise.com/wecise/util/cast"
	"git.wecise.com/wecise/util/cmap"
	"git.wecise.com/wecise/util/merrs"
)

// classdatainfo couples a class definition with the progress counters used
// for import logging.
type classdatainfo struct {
	*classinfo
	insertcount   int64      // incremented atomically by InsertData
	lastlogtime   time.Time  // time of the last progress log line
	lastlogicount int64      // value of insertcount at the last progress log line
	mutex         sync.Mutex // serializes progress logging (lastlogtime, lastlogicount)
}

// classdatainfos maps a class name to its classdatainfo; it is populated by
// ReviseClassStruct.
var classdatainfos = cmap.New[string, *classdatainfo]()

// referByClassPattern extracts the name of a referencing class from a
// "drop class" error message. Compiled once instead of on every failed drop.
var referByClassPattern = regexp.MustCompile(`refer by ([^,]+)`)

// ODBCImporter writes classes, objects and edges through an odb client.
// The client is nil when the current development phase does not touch the
// database; every method then skips its database calls.
type ODBCImporter struct {
	client odb.Client
}

// NewODBCImporter creates an importer. The client is only obtained when
// odbc.DevPhase has the DP_CREATECLASS or DP_INSERTDATA bit set.
func NewODBCImporter() *ODBCImporter {
	odbci := &ODBCImporter{}
	if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 {
		odbci.client = odbc.ODBC()
	}
	return odbci
}

// ReviseClassStruct revises the class definitions according to the data: it
// registers a classdatainfo for every name in classnames (running the class
// creation statement when a client is available) and then runs every edge
// creation statement.
//
// It returns an error when a classinfo is missing, when registering a class
// fails, or when an edge creation statement fails with anything other than an
// "already exist" error.
func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
	for _, classname := range classnames {
		ci := classinfos.GetIFPresent(classname)
		if ci == nil {
			return merrs.NewError("classinfo not found " + classname)
		}
		_, err = classdatainfos.GetWithNew(classname, func() (*classdatainfo, error) {
			logger.Info("create class " + ci.classname)
			if odbci.client != nil {
				if _, e := odbci.client.Query(ci.createmql).Do(); e != nil {
					return nil, e
				}
			}
			return &classdatainfo{classinfo: ci}, nil
		})
		if err != nil {
			// Stop at the first failure; previously the error was overwritten
			// by the next iteration and silently lost.
			return err
		}
	}
	if odbci.client == nil {
		return nil
	}
	for _, createedgemql := range createedgemqls {
		_, e := odbci.client.Query(createedgemql).Do()
		if e != nil && !strings.Contains(e.Error(), "already exist") {
			return e
		}
	}
	return nil
}

// InsertEdge extracts FROMUNIQUEID, TOUNIQUEID and EDGETYPE from data, treats
// every other key as an extra edge attribute (converted to string), and
// inserts the edge. It returns an error when any of the three required values
// is missing or empty.
func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) {
	extraattr := map[string]string{}
	var fromuid, touid, edgetype string
	for k, v := range data {
		switch k {
		case "FROMUNIQUEID":
			fromuid = cast.ToString(v)
		case "TOUNIQUEID":
			touid = cast.ToString(v)
		case "EDGETYPE":
			edgetype = cast.ToString(v)
		default:
			extraattr[k] = cast.ToString(v)
		}
	}
	if fromuid == "" {
		// The marshal error is ignored: the JSON is only diagnostic context.
		databs, _ := json.MarshalIndent(data, "", " ")
		return merrs.NewError("not found valid fromuniqueid in data ", merrs.SSMap{"data": string(databs)})
	}
	if touid == "" {
		databs, _ := json.MarshalIndent(data, "", " ")
		return merrs.NewError("not found valid touniqueid in data ", merrs.SSMap{"data": string(databs)})
	}
	if edgetype == "" {
		databs, _ := json.MarshalIndent(data, "", " ")
		return merrs.NewError("not found valid edgetype in data ", merrs.SSMap{"data": string(databs)})
	}
	return odbci.insertEdge(edgetype, fromuid, touid, extraattr, data)
}

// insertEdge maps edgetype through relations and, when a client is
// available, issues a quad statement linking the source object to the target
// object with extraattr serialized as JSON. data is only used to build the
// error context when the edge type has no mapping.
func (odbci *ODBCImporter) insertEdge(edgetype, fromuid, touid string, extraattr map[string]string, data map[string]any) (err error) {
	edgetype = relations[edgetype]
	if edgetype == "" {
		databs, _ := json.MarshalIndent(data, "", " ")
		return merrs.NewError("not found valid edgetype in data ", merrs.SSMap{"data": string(databs)})
	}
	if odbci.client == nil {
		return nil
	}
	// Blocks until InsertData has published the id of the "master" object.
	foid := get_object_id_from_cache("master:" + fromuid)
	toid := to_object_id("level1", touid)
	// Marshalling a map[string]string cannot fail, so the error is ignored.
	eabs, _ := json.Marshal(extraattr)
	// NOTE(review): the literal "+" between the edge type and the target id is
	// part of the emitted MQL text; confirm this is the intended quad syntax.
	quadmql := `quad "` + foid + `" ` + edgetype + ` + "` + toid + `" ` + string(eabs)
	_, err = odbci.client.Query(quadmql).Do()
	if err != nil {
		err = merrs.NewError(err, merrs.SSMaps{{"mql": quadmql}})
		logger.Error(err)
		return err
	}
	logger.Info("relation immport " + foid + "->" + toid)
	return nil
}

// cm_object_id_cache maps "<classname>:<uniqueid>" to a channel holding the
// object id once it has been inserted.
var cm_object_id_cache = cmap.New[string, chan string]()

// object_id_cache returns the channel registered for suid, creating a channel
// with capacity 2 on first use.
func object_id_cache(suid string) chan string {
	choid, _ := cm_object_id_cache.GetWithNew(suid, func() (chan string, error) {
		return make(chan string, 2), nil
	})
	return choid
}

// get_object_id_from_cache receives the object id for suid, blocking until
// one has been pushed, and pushes it back so later callers can read it too.
func get_object_id_from_cache(suid string) string {
	choid := object_id_cache(suid)
	oid := <-choid
	push_object_id_into_cache(choid, oid)
	return oid
}

// push_object_id_into_cache stores oid in choid.
func push_object_id_into_cache(choid chan string, oid string) {
	choid <- oid
	if len(choid) == 2 {
		// Keep at most one element.
		// The channel capacity is 2, so a third element cannot get in.
		// The goroutine that put in the second element removes the first one,
		// allowing other goroutines to push new elements afterwards.
		<-choid
	}
}

// object_id reads the unique id from data ("uniqueId", falling back to
// "UNIQUEID") and returns the derived object id together with the unique id
// as a string. It returns an error when the unique id is missing or empty.
func object_id(classaliasname string, data map[string]any) (oid, suid string, err error) {
	uid := data["uniqueId"]
	if uid == nil {
		uid = data["UNIQUEID"]
	}
	if uid == nil {
		databs, _ := json.MarshalIndent(data, "", " ")
		return "", "", merrs.NewError("not found uniqueid in data ", merrs.SSMap{"data": string(databs)})
	}
	suid = cast.ToString(uid)
	if suid == "" {
		databs, _ := json.MarshalIndent(data, "", " ")
		return "", "", merrs.NewError("not found valid uniqueid in data ", merrs.SSMap{"data": string(databs)})
	}
	return to_object_id(classaliasname, suid), suid, nil
}

// to_object_id builds an object id of the form
// "<classaliasname>:<unpadded URL-safe base64 of suid>".
func to_object_id(classaliasname string, suid string) string {
	suid64 := base64.RawURLEncoding.EncodeToString([]byte(suid))
	return classaliasname + ":" + suid64
}

// InsertData inserts one record of class classname.
//
// The values are assembled in the order of the class field list: "id" gets
// the derived object id, "contain"/"depend"/"topology" get the relation maps,
// a field without a data key gets the JSON of all data entries that do not
// map to a declared field, and every other field gets the converted data
// value. After a successful insert the object id is published to the object
// id cache and the progress counter is advanced.
func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (err error) {
	cdi := classdatainfos.GetIFPresent(classname)
	if cdi == nil {
		return merrs.NewError("class not defined " + classname)
	}
	if cdi.insertmql == "" {
		return merrs.NewError("class no fields to insert " + classname)
	}
	oid, suid, e := object_id(cdi.aliasname, data)
	if e != nil {
		return e
	}

	// contain and topology are never populated here and are passed as nil maps.
	var contain, depend, topology map[string][]string
	if refs, ok := data["_references"].([]any); ok {
		for _, ref := range refs {
			refmap, ok := ref.(map[string]any)
			if !ok {
				continue
			}
			tov, found := refmap["_toUniqueId"]
			if !found {
				continue
			}
			// A reference points at the class one level up; anything that is
			// not level2..level8 points at "master".
			toclassname := "master"
			switch classname {
			case "level2":
				toclassname = "level1"
			case "level3":
				toclassname = "level2"
			case "level4":
				toclassname = "level3"
			case "level5":
				toclassname = "level4"
			case "level6":
				toclassname = "level5"
			case "level7":
				toclassname = "level6"
			case "level8":
				toclassname = "level7"
			}
			toid := to_object_id(toclassname, cast.ToString(tov))
			mbs, _ := json.Marshal(map[string]string{"_direction": "out"})
			// NOTE(review): every reference overwrites depend, so only the last
			// one processed is kept; confirm records carry a single reference.
			depend = map[string][]string{
				"_all": {toid},
				toid:   {string(mbs)},
			}
		}
	}

	values := make([]any, 0, len(cdi.fieldslist))
	for _, fn := range cdi.fieldslist {
		switch fn {
		case "id":
			values = append(values, oid)
			continue
		case "contain":
			values = append(values, contain)
			continue
		case "depend":
			values = append(values, depend)
			continue
		case "topology":
			values = append(values, topology)
			continue
		}
		fi := cdi.fieldinfos[fn]
		if fi.datakey == "" {
			// Catch-all field: collect every data entry without a declared field.
			td := map[string]any{}
			for k, v := range data {
				if cdi.datakey_fieldinfos[k] == nil {
					td[k] = v
				}
			}
			tdbs, e := json.Marshal(td)
			if e != nil {
				return merrs.NewError(e)
			}
			values = append(values, string(tdbs))
			continue
		}
		v := data[fi.datakey]
		switch fi.fieldtype {
		case "set":
			v = cast.ToStringSlice(v)
		case "timestamp":
			tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
			if e != nil {
				return merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
			}
			v = tv.Format("2006-01-02 15:04:05.000000")
		}
		if fn == "tags" {
			// The class name is always added as a tag.
			v = append(cast.ToStringSlice(v), classname)
		}
		values = append(values, v)
	}

	if odbci.client != nil {
		_, err = odbci.client.Query(cdi.insertmql, values...).Do()
		if err != nil {
			err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.insertmql}, {"values": fmt.Sprint(values)}})
			logger.Error(err)
			return err
		}
		// Publish the id so that insertEdge can resolve it.
		push_object_id_into_cache(object_id_cache(classname+":"+suid), oid)
	}

	atomic.AddInt64(&cdi.insertcount, 1)
	cdi.mutex.Lock()
	// insertcount is modified atomically by concurrent callers, so it is read
	// atomically as well.
	count := atomic.LoadInt64(&cdi.insertcount)
	if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != count {
		cdi.lastlogtime = time.Now()
		cdi.lastlogicount = count
		logger.Info("class", cdi.classname, "import", count, "records")
	}
	cdi.mutex.Unlock()
	return nil
}

// done logs the record count of every class whose count changed since it was
// last logged.
func (odbci *ODBCImporter) done() {
	classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
		cdi.mutex.Lock()
		count := atomic.LoadInt64(&cdi.insertcount)
		if cdi.lastlogicount != count {
			cdi.lastlogtime = time.Now()
			cdi.lastlogicount = count
			logger.Info("class", cdi.classname, "import", count, "records")
		}
		cdi.mutex.Unlock()
		return true
	})
}

// alldone logs the record count of every class that has at least one
// inserted record.
func (odbci *ODBCImporter) alldone() {
	classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
		cdi.mutex.Lock()
		count := atomic.LoadInt64(&cdi.insertcount)
		if count != 0 {
			cdi.lastlogtime = time.Now()
			cdi.lastlogicount = count
			logger.Info("class", cdi.classname, "import", count, "records")
		}
		cdi.mutex.Unlock()
		return true
	})
}

// reload drops every known class, walking classnames in reverse order.
// Classes without a classinfo are skipped. It does nothing without a client.
func (odbci *ODBCImporter) reload() error {
	if odbci.client == nil {
		return nil
	}
	for i := len(classnames) - 1; i >= 0; i-- {
		ci := classinfos.GetIFPresent(classnames[i])
		if ci == nil {
			continue
		}
		if e := odbci.dropclass(ci.classfullname); e != nil {
			return e
		}
	}
	return nil
}

// dropclass deletes the data of classname and drops the class, making up to
// three attempts. When the drop error names a referencing class
// ("refer by <class>"), that class is dropped first and the drop is retried.
// It returns the error of the last failed drop, or the error of dropping a
// referencing class.
func (odbci *ODBCImporter) dropclass(classname string) error {
	var droperr error
	for retry := 2; retry >= 0; retry-- {
		// Best effort: the delete error is deliberately ignored, the drop
		// below decides whether the attempt succeeded.
		_, _ = odbci.client.Query(`delete from "` + classname + `" with version`).Do()
		_, droperr = odbci.client.Query(`drop class if exists "` + classname + `"`).Do()
		if droperr == nil {
			// Stop as soon as the class is gone instead of repeating the
			// delete and drop statements for the remaining attempts.
			logger.Info("drop class " + classname)
			return nil
		}
		if m := referByClassPattern.FindStringSubmatch(droperr.Error()); len(m) >= 2 {
			if e := odbci.dropclass(m[1]); e != nil {
				return e
			}
		}
	}
	return droperr
}