// Package importer: ODBC-backed data import for cgimport.
package importer

import (
	"encoding/json"
	"fmt"
	"regexp"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.wecise.com/wecise/cgimport/graph"
	"git.wecise.com/wecise/cgimport/odbc"
	"git.wecise.com/wecise/odb-go/odb"
	"github.com/dgryski/go-farm"
	"github.com/scylladb/go-set/strset"
	"github.com/wecisecode/util/cast"
	"github.com/wecisecode/util/cmap"
	"github.com/wecisecode/util/merrs"
	"github.com/wecisecode/util/mfmt"
)

// ODBCImporter writes imported records into the ODB backend through an
// odb.Client. The client may be nil when the current dev phase disables
// class creation and data insertion (see NewODBCImporter).
type ODBCImporter struct {
	client odb.Client
}

// NewODBCImporter builds an ODBCImporter. The underlying client is only
// connected when odbc.DevPhase enables class creation or data insertion;
// otherwise client stays nil and insertData skips the actual write.
func NewODBCImporter() *ODBCImporter {
	odbci := &ODBCImporter{}
	if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 {
		odbci.client = odbc.ODBC()
	}
	return odbci
}

// masterlevel1mutex shards the master/level1 pairing critical section by
// suid hash (256 shards) to reduce lock contention.
var masterlevel1mutex = make([]sync.Mutex, 256)

// masterdatas / level1datas hold the half of a master+level1 pair that
// arrived first, keyed by suid, until its counterpart shows up.
var masterdatas = cmap.New[string, map[string]any]()
var level1datas = cmap.New[string, map[string]any]()

// masterlevel1data pairs up "master" and "level1" records that share the
// same suid: whichever arrives first is parked in the corresponding cmap;
// when the counterpart arrives, the two maps are merged and the complete
// level1 record is inserted. The section is serialized per-suid via the
// sharded mutexes above.
func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string, data map[string]any) (retrycount int, err error) {
	hidx := int(farm.Hash32([]byte(suid)) % uint32(len(masterlevel1mutex)))
	masterlevel1mutex[hidx].Lock()
	defer masterlevel1mutex[hidx].Unlock()
	switch classaliasname {
	case "master":
		level1data := level1datas.GetIFPresent(suid)
		if level1data == nil {
			// master arrived first: park it
			masterdatas.Set(suid, data)
			// build an (incomplete) level1 record from the master data
			level1data = map[string]any{}
			for k, v := range data {
				if k == "id" {
					// master oid -> regenerate the oid under the level1 class
					oid, _, e := graph.GetNodeId("level1", data)
					if e != nil {
						return retrycount, e
					}
					v = oid
				}
				level1data[k] = v
			}
		} else {
			// master arrived second
			level1datas.Remove(suid)
			// complete the record with the parked level1 fields;
			// data itself must not be mutated — the caller still inserts
			// it as the master record afterwards
			entiredata := map[string]any{}
			for k, v := range data {
				entiredata[k] = v
			}
			for k, v := range level1data {
				entiredata[k] = v
			}
			level1data = entiredata
		}
		// re-insert the (now as complete as possible) level1 record.
		// NOTE(review): ":=" declares a new retrycount scoped to this case,
		// shadowing the named return — on success the outer retrycount (0)
		// is returned and this insert's retry count is lost; confirm intent.
		retrycount, _, e := odbci.insertData("level1", "", "", level1data)
		if e != nil {
			return retrycount, e
		}
	case "level1":
		masterdata := masterdatas.GetIFPresent(suid)
		if masterdata == nil {
			// level1 arrived first: park it and insert what we have
			level1datas.Set(suid, data)
		} else {
			// level1 arrived second
			masterdatas.Remove(suid)
			// complete the level1 record with the parked master fields;
			// level1 values win on key collisions
			entiredata := map[string]any{}
			for k, v := range masterdata {
				entiredata[k] = v
			}
			for k, v := range data {
				entiredata[k] = v
			}
			// complete level1 data
			data = entiredata
		}
		// insert the level1 record
		// NOTE(review): same named-return shadowing as the "master" case.
		retrycount, _, e := odbci.insertData("level1", "", "", data)
		if e != nil {
			return retrycount, e
		}
	}
	return retrycount, nil
}

// InsertData resolves the node id for data, enriches it per class
// ("master"/"level1" go through the pairing logic above, other classes get
// a depend map derived from _references) and inserts it. It returns the
// accumulated retry count and the first error encountered.
func (odbci *ODBCImporter) InsertData(classaliasname string, data map[string]any) (retrycount int, err error) {
	oid, suid, e := graph.GetNodeId(classaliasname, data)
	if e != nil {
		return 0, e
	}
	data["id"] = oid
	if classaliasname == "master" {
		rc, e := odbci.masterlevel1data(classaliasname, suid, data)
		retrycount += rc
		if e != nil {
			return retrycount, e
		}
		// fall through: the master record itself is inserted below
	} else if classaliasname == "level1" {
		// attach pre-computed edge info (contain/depend/topology) if present
		ei := graph.GetEdgeInfo(oid)
		if ei != nil {
			data["contain"] = ei["contain"]
			data["depend"] = ei["depend"]
			data["topology"] = ei["topology"]
		}
		rc, e := odbci.masterlevel1data(classaliasname, suid, data)
		retrycount += rc
		if e != nil {
			return retrycount, e
		}
		// the record was already inserted inside masterlevel1data
		return
	} else {
		data["depend"] = referencedata(classaliasname, data)
	}
	rc, _, e := odbci.insertData(classaliasname, oid, suid, data)
	retrycount += rc
	if e != nil {
		return retrycount, e
	}
	return retrycount, nil
}

// InnerData groups a node's ids with its edge maps.
// NOTE(review): not referenced anywhere in this file — presumably used
// elsewhere in the package; verify before removing.
type InnerData struct {
	oid      string
	suid     string
	contain  map[string][]string
	depend   map[string][]string
	topology map[string][]string
}

// referencedata converts the _references payload of a record into a depend
// edge map: each _toUniqueId entry is mapped to a node id in the parent
// class (levelN references levelN-1, level1 references itself, anything
// else references master). The returned map has an "_all" key listing
// target ids plus one key per target holding the marshalled edge props.
// Only the last reference survives — each _toUniqueId overwrites depend;
// NOTE(review): confirm that single-reference behavior is intended.
func referencedata(classaliasname string, data map[string]any) (depend map[string][]string) {
	refer := data["_references"]
	switch vv := refer.(type) {
	case []interface{}:
		for _, v := range vv {
			switch vv := v.(type) {
			case map[string]interface{}:
				for k, v := range vv {
					switch k {
					case "_edgeType":
						// currently ignored
					case "_toUniqueId":
						suid := cast.ToString(v)
						// map this class to the class its references point at
						toclassname := "master"
						switch classaliasname {
						case "level1":
							toclassname = "level1"
						case "level2":
							toclassname = "level1"
						case "level3":
							toclassname = "level2"
						case "level4":
							toclassname = "level3"
						case "level5":
							toclassname = "level4"
						case "level6":
							toclassname = "level5"
						case "level7":
							toclassname = "level6"
						case "level8":
							toclassname = "level7"
						}
						toid := graph.ToNodeId(toclassname, suid)
						m := map[string]string{"_direction": "out"}
						mbs, _ := json.Marshal(m)
						depend = map[string][]string{
							"_all": {toid},
							toid:  {string(mbs)},
						}
					}
				}
			}
		}
	}
	return
}

// insertData maps data onto the class's field list (handling "*" catch-all
// map fields, set/timestamp type coercion and the tags field), then runs
// the class's insert MQL via insertDo. oid/suid are only used for the
// duplicate-id debug probe. Counters on the classdatainfo drive periodic
// progress logging (at most every 5s).
func (odbci *ODBCImporter) insertData(classaliasname string, oid, suid string, data map[string]any) (retrycount int, responsetime time.Duration, err error) {
	cdi := classdatainfos.GetIFPresent(classaliasname)
	if cdi == nil {
		return retrycount, 0, merrs.NewError("class not defined " + classaliasname)
	}
	if cdi.Insertmql == "" {
		return retrycount, 0, merrs.NewError("class no fields to insert " + classaliasname)
	}
	values := []any{}
	for _, fn := range cdi.Fieldslist {
		fi := cdi.Fieldinfos[fn]
		if fi == nil {
			// field has no metadata: pass the raw value through
			values = append(values, data[fn])
			continue
		}
		// merge extension fields: a "*" datakey collects every key not
		// claimed by a dedicated field into one map value
		if strset.New(fi.Datakey...).Has("*") {
			if fi.Fieldtype != "map" {
				return retrycount, 0, merrs.NewError("fi.Fieldtype=" + fi.Fieldtype + " != map")
			}
			td := map[string]any{}
			for k, v := range data {
				if cdi.DatakeyFieldinfos[k] == nil {
					td[k] = cast.ToString(v)
				}
			}
			values = append(values, td)
			continue
		}
		// field type fix-ups: first non-nil datakey wins
		var v any
		for _, dk := range fi.Datakey {
			v = data[dk]
			if v != nil {
				switch fi.Fieldtype {
				case "set":
					v = cast.ToStringSlice(v)
				case "timestamp":
					// input format "2006-01-02-15.04.05.000000" -> DB format
					tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
					if e != nil {
						return retrycount, 0, merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
					}
					v = tv.Format("2006-01-02 15:04:05.000000")
				}
				break
			}
		}
		if fn == "tags" {
			// always tag the record with its class alias
			v = append(cast.ToStringSlice(v), classaliasname)
		}
		values = append(values, v)
	}
	if odbci.client != nil {
		if odbc.LogDebug && oid != "" {
			// debug probe: report when an id already exists with another uniqueid
			mql := "select id,uniqueid from " + classaliasname + " where id=?"
			r, e := odbci.client.Query(mql, oid).Do()
			if e != nil {
				return retrycount, 0, e
			}
			if r != nil && len(r.Data) != 0 {
				logger.Debug(classaliasname, "exists id:", oid, ", uniqueid:", r.Data[0]["uniqueid"], ", new uniqueid:", suid)
			}
		}
		// logger.Info(values...)
		retrycount, responsetime, err = odbci.insertDo(cdi.Insertmql, values...)
		if err != nil {
			databs, _ := json.MarshalIndent(data, "", " ")
			err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.Insertmql}, {"values": fmt.Sprint(values)}, {"data": string(databs)}})
			logger.Error(err)
			return
		}
	}
	atomic.AddInt64(&cdi.insertcount, 1)
	cdi.mutex.Lock()
	// rate-limited progress log: at most once per 5s and only on change
	if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
		cdi.lastlogtime = time.Now()
		cdi.lastlogicount = cdi.insertcount
		logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
	}
	cdi.mutex.Unlock()
	return
}

// ODBCRetryConfig is one parsed "odbc.retry" rule: retry up to retry times
// (negative = unlimited), waiting interval between attempts, applied when
// the error text contains contains (empty = default rule).
// NOTE(review): interval is parsed but insertDo sleeps trycount seconds
// instead of using it — confirm whether that is intentional.
type ODBCRetryConfig struct {
	retry    int
	interval time.Duration
	contains string
}

// renumber matches an optionally-negative integer, reduration an integer
// with an s/m/h/d unit suffix.
var renumber = regexp.MustCompile(`^\s*(-?[0-9]+)\s*$`)
var reduration = regexp.MustCompile(`^\s*(-?[0-9]+)[smhd]\s*$`)

// odbcretry caches the raw joined config so re-parsing only happens on change.
var odbcretry = ""
var odbcretryconfig []*ODBCRetryConfig

func init() {
	// re-parse retry rules whenever the configuration changes
	mcfg.OnChange(func() {
		_odbcretry := mcfg.GetStrings("odbc.retry", "-1, 1s, timed out", "-1, 1s, proc timeout")
		if strings.Join(_odbcretry, "|") != odbcretry {
			odbcretryconfig = RetryConfig(_odbcretry...)
			odbcretry = strings.Join(_odbcretry, "|")
		}
	})
}

// RetryConfig parses "retry, interval, contains" rule strings into
// ODBCRetryConfigs. A rule with an empty contains replaces the default
// rule (retry 0, 1s); the default is always appended last so lookups fall
// through to it. Panics on malformed input (config error, fail fast).
func RetryConfig(retryconfig ...string) (orcs []*ODBCRetryConfig) {
	defaultorc := &ODBCRetryConfig{
		retry:    0,
		interval: time.Second,
		contains: "",
	}
	for _, retrycfg := range retryconfig {
		sss := strings.SplitN(retrycfg, ",", 3)
		if len(sss) == 3 && renumber.MatchString(sss[0]) && reduration.MatchString(sss[1]) {
			retry := cast.ToInt(strings.TrimSpace(sss[0]))
			interval := mfmt.ParseDuration(strings.TrimSpace(sss[1]))
			contains := strings.TrimSpace(sss[2])
			orc := &ODBCRetryConfig{
				retry:    retry,
				interval: interval,
				contains: contains,
			}
			if orc.contains == "" {
				defaultorc = orc
			} else {
				orcs = append(orcs, orc)
			}
		} else {
			panic("odbc.retry config format error")
		}
	}
	orcs = append(orcs, defaultorc)
	return
}

// insertDo executes insertmql, retrying per odbcretryconfig: the first rule
// whose contains matches the error text (or the default rule) supplies the
// max retry count; negative means retry forever. Backoff grows linearly
// (trycount seconds). Returns the number of attempts that failed, the
// response time of the successful attempt, and the final error if retries
// were exhausted.
func (odbci *ODBCImporter) insertDo(insertmql string, values ...any) (trycount int, responsetime time.Duration, err error) {
	for {
		st := time.Now()
		_, e := odbci.client.Query(insertmql, values...).Do()
		if e != nil {
			// pick the retry budget from the first matching rule
			maxtrycount := 0
			for _, orc := range odbcretryconfig {
				if orc.contains != "" {
					if strings.Contains(e.Error(), orc.contains) {
						maxtrycount = orc.retry
						break
					}
				} else {
					maxtrycount = orc.retry
					break
				}
			}
			trycount++
			e = merrs.New(e, merrs.Map{"trycount": trycount})
			if maxtrycount < 0 || trycount <= maxtrycount {
				logger.Debug(merrs.New(e, merrs.Map{"retrycount": trycount}))
				// NOTE(review): sleeps trycount seconds; the configured
				// rule interval is never consulted here
				time.Sleep(time.Duration(trycount) * time.Second)
				continue
			}
			return trycount, 0, e
		}
		responsetime = time.Since(st)
		return trycount, responsetime, nil
	}
}

// done flushes a final progress log line for every class whose count
// changed since the last log.
func (odbci *ODBCImporter) done() {
	classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
		cdi.mutex.Lock()
		if cdi.lastlogicount != cdi.insertcount {
			cdi.lastlogtime = time.Now()
			cdi.lastlogicount = cdi.insertcount
			logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
		}
		cdi.mutex.Unlock()
		return true
	})
}

// alldone logs the final record count for every class that inserted
// anything at all.
func (odbci *ODBCImporter) alldone() {
	classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
		cdi.mutex.Lock()
		if cdi.insertcount != 0 {
			cdi.lastlogtime = time.Now()
			cdi.lastlogicount = cdi.insertcount
			logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
		}
		cdi.mutex.Unlock()
		return true
	})
}