package importer import ( "encoding/json" "fmt" "sync" "sync/atomic" "time" "git.wecise.com/wecise/cgimport/graph" "git.wecise.com/wecise/cgimport/odbc" "git.wecise.com/wecise/odb-go/odb" "git.wecise.com/wecise/util/cast" "git.wecise.com/wecise/util/cmap" "github.com/dgryski/go-farm" "github.com/scylladb/go-set/strset" "github.com/wecisecode/util/merrs" ) type ODBCImporter struct { client odb.Client // localdb *sqlite.SQLDB } func NewODBCImporter() *ODBCImporter { odbci := &ODBCImporter{} if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 { odbci.client = odbc.ODBC() // var e error // odbci.localdb, e = sqlite.NewSQLDB(odbc.Keyspace, "localdb", false) // if e != nil { // panic(e) // } } return odbci } func (odbci *ODBCImporter) InitLocalDB(force bool) error { // return odbci.localdb.InitTable(&schema.TableDefine{ // TableName: "localcache", // Fields: schema.Fields{ // {Name: "key", Type: "TEXT"}, // {Name: "value", Type: "TEXT"}, // }, // Indexes: map[string][]string{ // "key": {"key"}, // }, // Ttl: 0, // }, force) return nil } // func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) { // ei, e := graph.ParseEdgeInfo(data) // if e != nil { // return e // } // if odbci.client != nil { // // foid := get_object_id_from_cache("level1", fromuid) // // toid := to_object_id("level1", touid) // // eabs, _ := json.Marshal(extraattr) // // quadmql := `quad "` + foid + `" ` + edgetype + ` + "` + toid + `" ` + string(eabs) // // _, err = odbci.client.Query(quadmql).Do() // // if err != nil { // // err = merrs.NewError(err, merrs.SSMaps{{"mql": quadmql}}) // // logger.Error(err) // // return // // } // updatemql := "update " + "level1" + " set " + " contain=contain+?" + " where id='" + foid + "'" // _, err = odbci.client.Query(updatemql, map[string][]string{ // "_all": {toid}, // toid: {string(eabs)}, // }).Do() // if err != nil { // err = merrs.NewError(err, merrs.SSMaps{{"mql": updatemql}}) // return // } // logger.Info("relation immport " + foid + "->" + toid) // } // return // } // var cm_object_id_cache = cmap.New[string, chan string]() // func object_id_cache(classaliasname, suid string) chan string { // choid, _ := cm_object_id_cache.GetWithNew(classaliasname+":"+suid, // func() (chan string, error) { // ch := make(chan string, 2) // return ch, nil // }) // return choid // } // func get_object_id_from_cache(classaliasname, suid string) string { // choid := object_id_cache(classaliasname, suid) // oid := <-choid // push_object_id_into_cache(choid, oid) // return oid // } // func push_object_id_into_cache(choid chan string, oid string) { // choid <- oid // if len(choid) == 2 { // // 最多保留 1 个 // // chan cap = 2,第三个元素进不来 // // 进第二个元素的协程,清除第一个元素,允许其它协程后续进入新元素 // <-choid // } // } var masterlevel1mutex = make([]sync.Mutex, 256) var masterdatas = cmap.New[string, map[string]any]() var level1datas = cmap.New[string, map[string]any]() func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string, data map[string]any) error { hidx := int(farm.Hash32([]byte(suid)) % uint32(len(masterlevel1mutex))) masterlevel1mutex[hidx].Lock() defer masterlevel1mutex[hidx].Unlock() switch classaliasname { case "master": level1data := level1datas.GetIFPresent(suid) if level1data == nil { // 先插入 master masterdatas.Set(suid, data) level1data = map[string]any{} for k, v := range data { if k == "id" { oid, _, e := graph.GetNodeId("level1", data) if e != nil { return e } v = oid } level1data[k] = v } } else { // 后插入 master level1datas.Remove(suid) // 用 master 补齐 level1 数据 // data 数据不能变,需要后续插入 master for k, v := range data { if _, has := level1data[k]; !has { level1data[k] = v } } } // 重新插入完整的 level1 e := odbci.insertData("level1", level1data) if e != nil { return e } case "level1": masterdata := masterdatas.GetIFPresent(suid) if masterdata == nil { // 先插入 level 1 level1datas.Set(suid, data) } else { // 后插入 level1 masterdatas.Remove(suid) // 用 level1 补齐 master 数据 for k, v := range data { masterdata[k] = v } // 完整 level1 数据 data = masterdata } // 插入 level1 数据 e := odbci.insertData("level1", data) if e != nil { return e } } return nil } // func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string, data map[string]any) error { // key := strings.ReplaceAll("suid", "'", "''") // hidx := int(farm.Hash32([]byte(suid)) % uint32(len(masterlevel1mutex))) // masterlevel1mutex[hidx].Lock() // defer masterlevel1mutex[hidx].Unlock() // switch classaliasname { // case "master": // iter, e := odbci.localdb.Select(nil, "select * from localcache where key='"+strings.ReplaceAll("suid", "'", "''")+"'") // if e != nil { // return e // } // maps, e := iter.AllMaps() // if e != nil { // return e // } // var level1data = map[string]any{} // if len(maps) == 0 { // bs_data, e := msgpack.Encode(data) // if e != nil { // return e // } // _, e = odbci.localdb.Insert(map[string]any{"key": key, "value": string(bs_data)}, false) // if e != nil { // return e // } // level1data = map[string]any{} // for k, v := range data { // if k == "id" { // oid, _, e := graph.GetNodeId("level1", data) // if e != nil { // return e // } // v = oid // } // level1data[k] = v // } // } else { // // 后插入 master // s_level1data := maps[0]["value"].(string) // e = msgpack.Decode([]byte(s_level1data), &level1data) // if e != nil { // return e // } // _, e = odbci.localdb.Delete(map[string]any{"key": key}) // if e != nil { // return e // } // // 用 master 补齐 level1 数据 // // data 数据不能变,需要后续插入 master // for k, v := range data { // if _, has := level1data[k]; !has { // level1data[k] = v // } // } // } // // 重新插入完整的 level1 // e = odbci.insertData("level1", level1data) // if e != nil { // return e // } // case "level1": // iter, e := odbci.localdb.Select(nil, "select * from localcache where key='"+strings.ReplaceAll("suid", "'", "''")+"'") // if e != nil { // return e // } // maps, e := iter.AllMaps() // if e != nil { // return e // } // var masterdata = map[string]any{} // if len(maps) == 0 { // // 先插入 level 1 // bs_data, e := msgpack.Encode(data) // if e != nil { // return e // } // _, e = odbci.localdb.Insert(map[string]any{"key": key, "value": string(bs_data)}, false) // if e != nil { // return e // } // } else { // // 后插入 level1 // s_masterdata := maps[0]["value"].(string) // e = msgpack.Decode([]byte(s_masterdata), &masterdata) // if e != nil { // return e // } // _, e = odbci.localdb.Delete(map[string]any{"key": key}) // if e != nil { // return e // } // // 用 level1 补齐 master 数据 // for k, v := range data { // masterdata[k] = v // } // // 完整 level1 数据 // data = masterdata // } // // 插入 level1 数据 // e = odbci.insertData("level1", data) // if e != nil { // return e // } // } // return nil // } // 插入数据 func (odbci *ODBCImporter) InsertData(classaliasname string, data map[string]any) (err error) { oid, suid, e := graph.GetNodeId(classaliasname, data) if e != nil { return e } data["id"] = oid if classaliasname == "master" { e := odbci.masterlevel1data(classaliasname, suid, data) if e != nil { return e } } else if classaliasname == "level1" { ei := graph.GetEdgeInfo(oid) if ei != nil { data["contain"] = ei["contain"] data["depend"] = ei["depend"] data["topology"] = ei["topology"] } e := odbci.masterlevel1data(classaliasname, suid, data) if e != nil { return e } // 数据已经在 masterlevel1data 中插入完成 return } else { data["depend"] = referencedata(classaliasname, data) } return odbci.insertData(classaliasname, data) } type InnerData struct { oid string suid string contain map[string][]string depend map[string][]string topology map[string][]string } func referencedata(classaliasname string, data map[string]any) (depend map[string][]string) { refer := data["_references"] switch vv := refer.(type) { case []interface{}: for _, v := range vv { switch vv := v.(type) { case map[string]interface{}: for k, v := range vv { switch k { case "_edgeType": case "_toUniqueId": suid := cast.ToString(v) toclassname := "master" switch classaliasname { case "level1": toclassname = "level1" case "level2": toclassname = "level1" case "level3": toclassname = "level2" case "level4": toclassname = "level3" case "level5": toclassname = "level4" case "level6": toclassname = "level5" case "level7": toclassname = "level6" case "level8": toclassname = "level7" } toid := graph.ToNodeId(toclassname, suid) m := map[string]string{"_direction": "out"} mbs, _ := json.Marshal(m) depend = map[string][]string{ "_all": {toid}, toid: {string(mbs)}, } } } } } } return } func (odbci *ODBCImporter) insertData(classaliasname string, data map[string]any) (err error) { cdi := classdatainfos.GetIFPresent(classaliasname) if cdi == nil { return merrs.NewError("class not defined " + classaliasname) } if cdi.Insertmql == "" { return merrs.NewError("class no fields to insert " + classaliasname) } values := []any{} for _, fn := range cdi.Fieldslist { fi := cdi.Fieldinfos[fn] if fi == nil { values = append(values, data[fn]) continue } // 合并扩展字段 if strset.New(fi.Datakey...).Has("*") { if fi.Fieldtype != "map" { return merrs.NewError("fi.Fieldtype=" + fi.Fieldtype + " != map") } td := map[string]any{} for k, v := range data { if cdi.DatakeyFieldinfos[k] == nil { td[k] = cast.ToString(v) } } values = append(values, td) continue } // 字段类型修正 var v any for _, dk := range fi.Datakey { v = data[dk] if v != nil { switch fi.Fieldtype { case "set": v = cast.ToStringSlice(v) case "timestamp": tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000") if e != nil { return merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'")) } v = tv.Format("2006-01-02 15:04:05.000000") } break } } if fn == "tags" { v = append(cast.ToStringSlice(v), classaliasname) } values = append(values, v) } if odbci.client != nil { // logger.Info(values...) _, err = odbci.client.Query(cdi.Insertmql, values...).Do() if err != nil { databs, _ := json.MarshalIndent(data, "", " ") err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.Insertmql}, {"values": fmt.Sprint(values)}, {"data": string(databs)}}) logger.Error(err) return } } atomic.AddInt64(&cdi.insertcount, 1) cdi.mutex.Lock() if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount { cdi.lastlogtime = time.Now() cdi.lastlogicount = cdi.insertcount logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records") } cdi.mutex.Unlock() return } func (odbci *ODBCImporter) done() { classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool { cdi.mutex.Lock() if cdi.lastlogicount != cdi.insertcount { cdi.lastlogtime = time.Now() cdi.lastlogicount = cdi.insertcount logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records") } cdi.mutex.Unlock() return true }) } func (odbci *ODBCImporter) alldone() { classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool { cdi.mutex.Lock() if cdi.insertcount != 0 { cdi.lastlogtime = time.Now() cdi.lastlogicount = cdi.insertcount logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records") } cdi.mutex.Unlock() return true }) }