123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394 |
- package importer
- import (
- "encoding/json"
- "fmt"
- "regexp"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "git.wecise.com/wecise/cgimport/graph"
- "git.wecise.com/wecise/cgimport/odbc"
- "git.wecise.com/wecise/odb-go/odb"
- "github.com/dgryski/go-farm"
- "github.com/scylladb/go-set/strset"
- "github.com/wecisecode/util/cast"
- "github.com/wecisecode/util/cmap"
- "github.com/wecisecode/util/merrs"
- "github.com/wecisecode/util/mfmt"
- )
- type ODBCImporter struct {
- client odb.Client
- }
- func NewODBCImporter() *ODBCImporter {
- odbci := &ODBCImporter{}
- if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 {
- odbci.client = odbc.ODBC()
- }
- return odbci
- }
- var masterlevel1mutex = make([]sync.Mutex, 256)
- var masterdatas = cmap.New[string, map[string]any]()
- var level1datas = cmap.New[string, map[string]any]()
- func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string, data map[string]any) (retrycount int, err error) {
- hidx := int(farm.Hash32([]byte(suid)) % uint32(len(masterlevel1mutex)))
- masterlevel1mutex[hidx].Lock()
- defer masterlevel1mutex[hidx].Unlock()
- switch classaliasname {
- case "master":
- level1data := level1datas.GetIFPresent(suid)
- if level1data == nil {
- // 先插入 master
- masterdatas.Set(suid, data)
- // 用 master 数据生成不完整的 level1 数据
- level1data = map[string]any{}
- for k, v := range data {
- if k == "id" {
- // master oid -> 重新生成 level1 oid
- oid, _, e := graph.GetNodeId("level1", data)
- if e != nil {
- return retrycount, e
- }
- v = oid
- }
- level1data[k] = v
- }
- } else {
- // 后插入 master
- level1datas.Remove(suid)
- // 用 level1 补齐 master 数据
- // data 数据不能变,需要后续插入 master
- entiredata := map[string]any{}
- for k, v := range data {
- entiredata[k] = v
- }
- for k, v := range level1data {
- entiredata[k] = v
- }
- level1data = entiredata
- }
- // 重新插入完整的 level1
- retrycount, _, e := odbci.insertData("level1", "", "", level1data)
- if e != nil {
- return retrycount, e
- }
- case "level1":
- masterdata := masterdatas.GetIFPresent(suid)
- if masterdata == nil {
- // 先插入 level 1
- level1datas.Set(suid, data)
- } else {
- // 后插入 level1
- masterdatas.Remove(suid)
- // 用 level1 补齐 master 数据
- entiredata := map[string]any{}
- for k, v := range masterdata {
- entiredata[k] = v
- }
- for k, v := range data {
- entiredata[k] = v
- }
- // 完整 level1 数据
- data = entiredata
- }
- // 插入 level1 数据
- retrycount, _, e := odbci.insertData("level1", "", "", data)
- if e != nil {
- return retrycount, e
- }
- }
- return retrycount, nil
- }
- // 插入数据
- func (odbci *ODBCImporter) InsertData(classaliasname string, data map[string]any) (retrycount int, err error) {
- oid, suid, e := graph.GetNodeId(classaliasname, data)
- if e != nil {
- return 0, e
- }
- data["id"] = oid
- if classaliasname == "master" {
- rc, e := odbci.masterlevel1data(classaliasname, suid, data)
- retrycount += rc
- if e != nil {
- return retrycount, e
- }
- } else if classaliasname == "level1" {
- ei := graph.GetEdgeInfo(oid)
- if ei != nil {
- data["contain"] = ei["contain"]
- data["depend"] = ei["depend"]
- data["topology"] = ei["topology"]
- }
- rc, e := odbci.masterlevel1data(classaliasname, suid, data)
- retrycount += rc
- if e != nil {
- return retrycount, e
- }
- // 数据已经在 masterlevel1data 中插入完成
- return
- } else {
- data["depend"] = referencedata(classaliasname, data)
- }
- rc, _, e := odbci.insertData(classaliasname, oid, suid, data)
- retrycount += rc
- if e != nil {
- return retrycount, e
- }
- return retrycount, nil
- }
// InnerData bundles a node's identifiers with its three edge maps.
// NOTE(review): not referenced in the visible part of this file; confirm
// whether it is still in use.
type InnerData struct {
	// oid is the node's object id; suid is its unique id.
	oid  string
	suid string
	// Edge maps named after the record fields of the same name.
	contain, depend, topology map[string][]string
}
- func referencedata(classaliasname string, data map[string]any) (depend map[string][]string) {
- refer := data["_references"]
- switch vv := refer.(type) {
- case []interface{}:
- for _, v := range vv {
- switch vv := v.(type) {
- case map[string]interface{}:
- for k, v := range vv {
- switch k {
- case "_edgeType":
- case "_toUniqueId":
- suid := cast.ToString(v)
- toclassname := "master"
- switch classaliasname {
- case "level1":
- toclassname = "level1"
- case "level2":
- toclassname = "level1"
- case "level3":
- toclassname = "level2"
- case "level4":
- toclassname = "level3"
- case "level5":
- toclassname = "level4"
- case "level6":
- toclassname = "level5"
- case "level7":
- toclassname = "level6"
- case "level8":
- toclassname = "level7"
- }
- toid := graph.ToNodeId(toclassname, suid)
- m := map[string]string{"_direction": "out"}
- mbs, _ := json.Marshal(m)
- depend = map[string][]string{
- "_all": {toid},
- toid: {string(mbs)},
- }
- }
- }
- }
- }
- }
- return
- }
- func (odbci *ODBCImporter) insertData(classaliasname string, oid, suid string, data map[string]any) (retrycount int, responsetime time.Duration, err error) {
- cdi := classdatainfos.GetIFPresent(classaliasname)
- if cdi == nil {
- return retrycount, 0, merrs.NewError("class not defined " + classaliasname)
- }
- if cdi.Insertmql == "" {
- return retrycount, 0, merrs.NewError("class no fields to insert " + classaliasname)
- }
- values := []any{}
- for _, fn := range cdi.Fieldslist {
- fi := cdi.Fieldinfos[fn]
- if fi == nil {
- values = append(values, data[fn])
- continue
- }
- // 合并扩展字段
- if strset.New(fi.Datakey...).Has("*") {
- if fi.Fieldtype != "map<varchar,varchar>" {
- return retrycount, 0, merrs.NewError("fi.Fieldtype=" + fi.Fieldtype + " != map<varchar,varchar>")
- }
- td := map[string]any{}
- for k, v := range data {
- if cdi.DatakeyFieldinfos[k] == nil {
- td[k] = cast.ToString(v)
- }
- }
- values = append(values, td)
- continue
- }
- // 字段类型修正
- var v any
- for _, dk := range fi.Datakey {
- v = data[dk]
- if v != nil {
- switch fi.Fieldtype {
- case "set<varchar>":
- v = cast.ToStringSlice(v)
- case "timestamp":
- tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
- if e != nil {
- return retrycount, 0, merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
- }
- v = tv.Format("2006-01-02 15:04:05.000000")
- }
- break
- }
- }
- if fn == "tags" {
- v = append(cast.ToStringSlice(v), classaliasname)
- }
- values = append(values, v)
- }
- if odbci.client != nil {
- if odbc.LogDebug && oid != "" {
- mql := "select id,uniqueid from " + classaliasname + " where id=?"
- r, e := odbci.client.Query(mql, oid).Do()
- if e != nil {
- return retrycount, 0, e
- }
- if r != nil && len(r.Data) != 0 {
- logger.Debug(classaliasname, "exists id:", oid, ", uniqueid:", r.Data[0]["uniqueid"], ", new uniqueid:", suid)
- }
- }
- // logger.Info(values...)
- retrycount, responsetime, err = odbci.insertDo(cdi.Insertmql, values...)
- if err != nil {
- databs, _ := json.MarshalIndent(data, "", " ")
- err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.Insertmql}, {"values": fmt.Sprint(values)}, {"data": string(databs)}})
- logger.Error(err)
- return
- }
- }
- atomic.AddInt64(&cdi.insertcount, 1)
- cdi.mutex.Lock()
- if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
- cdi.lastlogtime = time.Now()
- cdi.lastlogicount = cdi.insertcount
- logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
- }
- cdi.mutex.Unlock()
- return
- }
// ODBCRetryConfig is one retry rule parsed from an "odbc.retry" entry of the
// form "retry, interval, contains".
type ODBCRetryConfig struct {
	// retry is the maximum number of attempts; a negative value means retry
	// without limit.
	retry int
	// interval is the duration parsed from the second part of the entry.
	interval time.Duration
	// contains is the substring an error message must contain for the rule to
	// apply; the empty string marks the default rule.
	contains string
}
- var renumber = regexp.MustCompile(`^\s*(-?[0-9]+)\s*$`)
- var reduration = regexp.MustCompile(`^\s*(-?[0-9]+)[smhd]\s*$`)
- var odbcretry = ""
- var odbcretryconfig []*ODBCRetryConfig
- func init() {
- mcfg.OnChange(func() {
- _odbcretry := mcfg.GetStrings("odbc.retry",
- "-1, 1s, timed out",
- "-1, 1s, proc timeout",
- "-1, 1m, no response received from cassandra")
- if strings.Join(_odbcretry, "|") != odbcretry {
- odbcretryconfig = RetryConfig(_odbcretry...)
- odbcretry = strings.Join(_odbcretry, "|")
- }
- })
- }
- func RetryConfig(retryconfig ...string) (orcs []*ODBCRetryConfig) {
- defaultorc := &ODBCRetryConfig{
- retry: 0,
- interval: time.Second,
- contains: "",
- }
- for _, retrycfg := range retryconfig {
- sss := strings.SplitN(retrycfg, ",", 3)
- if len(sss) == 3 && renumber.MatchString(sss[0]) && reduration.MatchString(sss[1]) {
- retry := cast.ToInt(strings.TrimSpace(sss[0]))
- interval := mfmt.ParseDuration(strings.TrimSpace(sss[1]))
- contains := strings.TrimSpace(sss[2])
- orc := &ODBCRetryConfig{
- retry: retry,
- interval: interval,
- contains: contains,
- }
- if orc.contains == "" {
- defaultorc = orc
- } else {
- orcs = append(orcs, orc)
- }
- } else {
- panic("odbc.retry config format error")
- }
- }
- orcs = append(orcs, defaultorc)
- return
- }
- func (odbci *ODBCImporter) insertDo(insertmql string, values ...any) (trycount int, responsetime time.Duration, err error) {
- for {
- st := time.Now()
- _, e := odbci.client.Query(insertmql, values...).Do()
- if e != nil {
- maxtrycount := 0
- for _, orc := range odbcretryconfig {
- if orc.contains != "" {
- if strings.Contains(e.Error(), orc.contains) {
- maxtrycount = orc.retry
- break
- }
- } else {
- maxtrycount = orc.retry
- break
- }
- }
- trycount++
- e = merrs.New(e, merrs.Map{"trycount": trycount})
- if maxtrycount < 0 || trycount <= maxtrycount {
- logger.Debug(merrs.New(e, merrs.Map{"retrycount": trycount}))
- time.Sleep(time.Duration(trycount) * time.Second)
- continue
- }
- return trycount, 0, e
- }
- responsetime = time.Since(st)
- return trycount, responsetime, nil
- }
- }
- func (odbci *ODBCImporter) done() {
- classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
- cdi.mutex.Lock()
- if cdi.lastlogicount != cdi.insertcount {
- cdi.lastlogtime = time.Now()
- cdi.lastlogicount = cdi.insertcount
- logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
- }
- cdi.mutex.Unlock()
- return true
- })
- }
- func (odbci *ODBCImporter) alldone() {
- classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
- cdi.mutex.Lock()
- if cdi.insertcount != 0 {
- cdi.lastlogtime = time.Now()
- cdi.lastlogicount = cdi.insertcount
- logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
- }
- cdi.mutex.Unlock()
- return true
- })
- }
|