package importer import ( "encoding/json" "fmt" "regexp" "strings" "sync" "sync/atomic" "time" "git.wecise.com/wecise/cgimport/graph" "git.wecise.com/wecise/cgimport/odbc" "git.wecise.com/wecise/cgimport/schema" "git.wecise.com/wecise/odb-go/odb" "git.wecise.com/wecise/util/cast" "git.wecise.com/wecise/util/cmap" "git.wecise.com/wecise/util/merrs" "github.com/dgryski/go-farm" "github.com/scylladb/go-set/strset" ) type classdatainfo struct { *schema.ClassInfo insertcount int64 lastlogtime time.Time lastlogicount int64 mutex sync.Mutex } var classdatainfos = cmap.NewSingle[string, *classdatainfo]() type ODBCImporter struct { client odb.Client } func NewODBCImporter() *ODBCImporter { odbci := &ODBCImporter{} if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 { odbci.client = odbc.ODBC() } return odbci } // 根据数据修正类定义 func (odbci *ODBCImporter) ReviseClassStruct() (err error) { for _, classname := range schema.ClassNames { ci := schema.ClassInfos.GetIFPresent(classname) if ci == nil { return merrs.NewError("classinfo not found " + classname) } cdi, e := classdatainfos.GetWithNew(ci.Classaliasname, func() (cdi *classdatainfo, err error) { if odbci.client != nil { _, e := odbci.client.Query("select class,id from " + ci.Classfullname + " limit 1").Do() if e != nil { if !strings.Contains(e.Error(), "not find") { return nil, e } logger.Info("create class " + ci.Classfullname) _, e = odbci.client.Query(ci.Createmql).Do() if e != nil { return nil, e } } } cdi = &classdatainfo{ClassInfo: ci} return }) if e != nil { return e } classdatainfos.Set(ci.Classfullname, cdi) } if odbci.client != nil { for _, createedgemql := range schema.CreateEdgeMqls { _, e := odbci.client.Query(createedgemql).Do() if e != nil && !strings.Contains(e.Error(), "already exist") { return e } } } return } // func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) { // ei, e := graph.ParseEdgeInfo(data) // if e != nil { // return e // } // if odbci.client != nil { // // foid := get_object_id_from_cache("level1", fromuid) // // toid := to_object_id("level1", touid) // // eabs, _ := json.Marshal(extraattr) // // quadmql := `quad "` + foid + `" ` + edgetype + ` + "` + toid + `" ` + string(eabs) // // _, err = odbci.client.Query(quadmql).Do() // // if err != nil { // // err = merrs.NewError(err, merrs.SSMaps{{"mql": quadmql}}) // // logger.Error(err) // // return // // } // updatemql := "update " + "level1" + " set " + " contain=contain+?" + " where id='" + foid + "'" // _, err = odbci.client.Query(updatemql, map[string][]string{ // "_all": {toid}, // toid: {string(eabs)}, // }).Do() // if err != nil { // err = merrs.NewError(err, merrs.SSMaps{{"mql": updatemql}}) // return // } // logger.Info("relation immport " + foid + "->" + toid) // } // return // } // var cm_object_id_cache = cmap.New[string, chan string]() // func object_id_cache(classaliasname, suid string) chan string { // choid, _ := cm_object_id_cache.GetWithNew(classaliasname+":"+suid, // func() (chan string, error) { // ch := make(chan string, 2) // return ch, nil // }) // return choid // } // func get_object_id_from_cache(classaliasname, suid string) string { // choid := object_id_cache(classaliasname, suid) // oid := <-choid // push_object_id_into_cache(choid, oid) // return oid // } // func push_object_id_into_cache(choid chan string, oid string) { // choid <- oid // if len(choid) == 2 { // // 最多保留 1 个 // // chan cap = 2,第三个元素进不来 // // 进第二个元素的协程,清除第一个元素,允许其它协程后续进入新元素 // <-choid // } // } var masterlevel1mutex = make([]sync.Mutex, 256) var masterdatas = cmap.New[string, map[string]any]() var level1datas = cmap.New[string, map[string]any]() func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string, data map[string]any) error { hidx := int(farm.Hash32([]byte(suid)) % uint32(len(masterlevel1mutex))) masterlevel1mutex[hidx].Lock() defer masterlevel1mutex[hidx].Unlock() switch classaliasname { case "master": level1data := level1datas.GetIFPresent(suid) if level1data == nil { // 先插入 master masterdatas.Set(suid, data) } else { // 后插入 master level1datas.Remove(suid) // 用 master 补齐 level1 数据 // data 数据不能变,需要后续插入 master for k, v := range data { if _, has := level1data[k]; !has { level1data[k] = v } } // 重新插入完整的 level1 e := odbci.insertData("level1", level1data) if e != nil { return e } } case "level1": masterdata := masterdatas.GetIFPresent(suid) if masterdata == nil { // 先插入 level 1 level1datas.Set(suid, data) } else { // 后插入 level1 masterdatas.Remove(suid) // 用 level1 补齐 master 数据 for k, v := range data { masterdata[k] = v } // 完整 level1 数据 data = masterdata } // 插入 level1 数据 e := odbci.insertData("level1", data) if e != nil { return e } } return nil } // 插入数据 func (odbci *ODBCImporter) InsertData(classaliasname string, data map[string]any) (err error) { oid, suid, e := graph.GetNodeId(classaliasname, data) if e != nil { return e } data["id"] = oid if classaliasname == "master" { e := odbci.masterlevel1data(classaliasname, suid, data) if e != nil { return e } } else if classaliasname == "level1" { ei := graph.GetEdgeInfo(oid) if ei != nil { data["contain"] = ei["contain"] data["depend"] = ei["depend"] data["topology"] = ei["topology"] } e := odbci.masterlevel1data(classaliasname, suid, data) if e != nil { return e } // 数据已经在 masterlevel1data 中插入完成 return } else { data["depend"] = referencedata(classaliasname, data) } return odbci.insertData(classaliasname, data) } type InnerData struct { oid string suid string contain map[string][]string depend map[string][]string topology map[string][]string } func referencedata(classaliasname string, data map[string]any) (depend map[string][]string) { refer := data["_references"] switch vv := refer.(type) { case []interface{}: for _, v := range vv { switch vv := v.(type) { case map[string]interface{}: for k, v := range vv { switch k { case "_edgeType": case "_toUniqueId": suid := cast.ToString(v) toclassname := "master" switch classaliasname { case "level1": toclassname = "level1" case "level2": toclassname = "level1" case "level3": toclassname = "level2" case "level4": toclassname = "level3" case "level5": toclassname = "level4" case "level6": toclassname = "level5" case "level7": toclassname = "level6" case "level8": toclassname = "level7" } toid := graph.ToNodeId(toclassname, suid) m := map[string]string{"_direction": "out"} mbs, _ := json.Marshal(m) depend = map[string][]string{ "_all": {toid}, toid: {string(mbs)}, } } } } } } return } func (odbci *ODBCImporter) insertData(classaliasname string, data map[string]any) (err error) { cdi := classdatainfos.GetIFPresent(classaliasname) if cdi == nil { return merrs.NewError("class not defined " + classaliasname) } if cdi.Insertmql == "" { return merrs.NewError("class no fields to insert " + classaliasname) } values := []any{} for _, fn := range cdi.Fieldslist { fi := cdi.Fieldinfos[fn] if fi == nil { values = append(values, data[fn]) continue } // 合并扩展字段 if strset.New(fi.Datakey...).Has("*") { if fi.Fieldtype != "map" { return merrs.NewError("fi.Fieldtype=" + fi.Fieldtype + " != map") } td := map[string]any{} for k, v := range data { if cdi.DatakeyFieldinfos[k] == nil { td[k] = cast.ToString(v) } } values = append(values, td) continue } // 字段类型修正 var v any for _, dk := range fi.Datakey { v = data[dk] if v != nil { switch fi.Fieldtype { case "set": v = cast.ToStringSlice(v) case "timestamp": tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000") if e != nil { return merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'")) } v = tv.Format("2006-01-02 15:04:05.000000") } break } } if fn == "tags" { v = append(cast.ToStringSlice(v), classaliasname) } values = append(values, v) } if odbci.client != nil { // logger.Info(values...) _, err = odbci.client.Query(cdi.Insertmql, values...).Do() if err != nil { databs, _ := json.MarshalIndent(data, "", " ") err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.Insertmql}, {"values": fmt.Sprint(values)}, {"data": string(databs)}}) logger.Error(err) return } } atomic.AddInt64(&cdi.insertcount, 1) cdi.mutex.Lock() if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount { cdi.lastlogtime = time.Now() cdi.lastlogicount = cdi.insertcount logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records") } cdi.mutex.Unlock() return } func (odbci *ODBCImporter) done() { classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool { cdi.mutex.Lock() if cdi.lastlogicount != cdi.insertcount { cdi.lastlogtime = time.Now() cdi.lastlogicount = cdi.insertcount logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records") } cdi.mutex.Unlock() return true }) } func (odbci *ODBCImporter) alldone() { classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool { cdi.mutex.Lock() if cdi.insertcount != 0 { cdi.lastlogtime = time.Now() cdi.lastlogicount = cdi.insertcount logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records") } cdi.mutex.Unlock() return true }) } func (odbci *ODBCImporter) reload() error { if odbci.client != nil { for i := len(schema.ClassNames) - 1; i >= 0; i-- { classname := schema.ClassNames[i] ci := schema.ClassInfos.GetIFPresent(classname) if ci == nil { continue } e := odbci.dropclass(ci.Classfullname) if e != nil { return e } } } return nil } func (odbci *ODBCImporter) dropclass(classnames ...string) error { for _, classname := range classnames { for retry := 2; retry >= 0; retry-- { _, e := odbci.client.Query(`delete from "` + classname + `" with version`).Do() _ = e _, e = odbci.client.Query(`drop class if exists "` + classname + `"`).Do() if e != nil { matchstr := regexp.MustCompile(`refer by ([^,]+)`).FindStringSubmatch(e.Error()) if len(matchstr) >= 2 { e = odbci.dropclass(matchstr[1]) if e != nil { return e } } else { matchstr := regexp.MustCompile(`has children \[([^\]]+)\]`).FindStringSubmatch(e.Error()) if len(matchstr) >= 2 { e = odbci.dropclass(strings.Split(matchstr[1], ",")...) if e != nil { return e } } } if retry > 0 { continue } return e } } logger.Info("drop class " + classname) } return nil }