123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375 |
- package importer
- import (
- "encoding/json"
- "fmt"
- "regexp"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "git.wecise.com/wecise/cgimport/graph"
- "git.wecise.com/wecise/cgimport/odbc"
- "git.wecise.com/wecise/cgimport/schema"
- "git.wecise.com/wecise/odb-go/odb"
- "git.wecise.com/wecise/util/cast"
- "git.wecise.com/wecise/util/cmap"
- "git.wecise.com/wecise/util/merrs"
- "github.com/scylladb/go-set/strset"
- )
- type classdatainfo struct {
- *schema.ClassInfo
- insertcount int64
- lastlogtime time.Time
- lastlogicount int64
- mutex sync.Mutex
- }
- var classdatainfos = cmap.NewSingle[string, *classdatainfo]()
- type ODBCImporter struct {
- client odb.Client
- }
- func NewODBCImporter() *ODBCImporter {
- odbci := &ODBCImporter{}
- if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 {
- odbci.client = odbc.ODBC()
- }
- return odbci
- }
- // 根据数据修正类定义
- func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
- for _, classname := range schema.ClassNames {
- ci := schema.ClassInfos.GetIFPresent(classname)
- if ci == nil {
- return merrs.NewError("classinfo not found " + classname)
- }
- cdi, e := classdatainfos.GetWithNew(ci.Classaliasname, func() (cdi *classdatainfo, err error) {
- if odbci.client != nil {
- _, e := odbci.client.Query("select class,id from " + ci.Classfullname + " limit 1").Do()
- if e != nil {
- if !strings.Contains(e.Error(), "not find") {
- return nil, e
- }
- logger.Info("create class " + ci.Classfullname)
- _, e = odbci.client.Query(ci.Createmql).Do()
- if e != nil {
- return nil, e
- }
- }
- }
- cdi = &classdatainfo{ClassInfo: ci}
- return
- })
- if e != nil {
- return e
- }
- classdatainfos.Set(ci.Classfullname, cdi)
- }
- if odbci.client != nil {
- for _, createedgemql := range schema.CreateEdgeMqls {
- _, e := odbci.client.Query(createedgemql).Do()
- if e != nil && !strings.Contains(e.Error(), "already exist") {
- return e
- }
- }
- }
- return
- }
- // func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) {
- // ei, e := graph.ParseEdgeInfo(data)
- // if e != nil {
- // return e
- // }
- // if odbci.client != nil {
- // // foid := get_object_id_from_cache("level1", fromuid)
- // // toid := to_object_id("level1", touid)
- // // eabs, _ := json.Marshal(extraattr)
- // // quadmql := `quad "` + foid + `" ` + edgetype + ` + "` + toid + `" ` + string(eabs)
- // // _, err = odbci.client.Query(quadmql).Do()
- // // if err != nil {
- // // err = merrs.NewError(err, merrs.SSMaps{{"mql": quadmql}})
- // // logger.Error(err)
- // // return
- // // }
- // updatemql := "update " + "level1" + " set " + " contain=contain+?" + " where id='" + foid + "'"
- // _, err = odbci.client.Query(updatemql, map[string][]string{
- // "_all": {toid},
- // toid: {string(eabs)},
- // }).Do()
- // if err != nil {
- // err = merrs.NewError(err, merrs.SSMaps{{"mql": updatemql}})
- // return
- // }
- // logger.Info("relation immport " + foid + "->" + toid)
- // }
- // return
- // }
- var cm_object_id_cache = cmap.New[string, chan string]()
- func object_id_cache(classaliasname, suid string) chan string {
- choid, _ := cm_object_id_cache.GetWithNew(classaliasname+":"+suid,
- func() (chan string, error) {
- ch := make(chan string, 2)
- return ch, nil
- })
- return choid
- }
- func get_object_id_from_cache(classaliasname, suid string) string {
- choid := object_id_cache(classaliasname, suid)
- oid := <-choid
- push_object_id_into_cache(choid, oid)
- return oid
- }
- func push_object_id_into_cache(choid chan string, oid string) {
- choid <- oid
- if len(choid) == 2 {
- // 最多保留 1 个
- // chan cap = 2,第三个元素进不来
- // 进第二个元素的协程,清除第一个元素,允许其它协程后续进入新元素
- <-choid
- }
- }
- // 插入数据
- func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (err error) {
- cdi := classdatainfos.GetIFPresent(classname)
- if cdi == nil {
- return merrs.NewError("class not defined " + classname)
- }
- if cdi.Insertmql == "" {
- return merrs.NewError("class no fields to insert " + classname)
- }
- innerdata := &InnerData{}
- innerdata.oid, innerdata.suid, err = graph.GetNodeId(cdi.Classaliasname, data)
- if err != nil {
- return
- }
- if cdi.Classaliasname == "level1" {
- ei := graph.GetEdgeInfo(innerdata.oid)
- if ei != nil {
- innerdata.contain = ei["contain"]
- innerdata.depend = ei["depend"]
- innerdata.topology = ei["topology"]
- }
- } else {
- innerdata.depend = referencedata(classname, data)
- }
- return odbci.insertData(classname, cdi, innerdata, data)
- }
- type InnerData struct {
- oid string
- suid string
- contain map[string][]string
- depend map[string][]string
- topology map[string][]string
- }
- func referencedata(classname string, data map[string]any) (depend map[string][]string) {
- refer := data["_references"]
- switch vv := refer.(type) {
- case []interface{}:
- for _, v := range vv {
- switch vv := v.(type) {
- case map[string]interface{}:
- for k, v := range vv {
- switch k {
- case "_edgeType":
- case "_toUniqueId":
- suid := cast.ToString(v)
- toclassname := "master"
- switch classname {
- case "level1":
- toclassname = "level1"
- case "level2":
- toclassname = "level1"
- case "level3":
- toclassname = "level2"
- case "level4":
- toclassname = "level3"
- case "level5":
- toclassname = "level4"
- case "level6":
- toclassname = "level5"
- case "level7":
- toclassname = "level6"
- case "level8":
- toclassname = "level7"
- }
- toid := graph.ToNodeId(toclassname, suid)
- m := map[string]string{"_direction": "out"}
- mbs, _ := json.Marshal(m)
- depend = map[string][]string{
- "_all": {toid},
- toid: {string(mbs)},
- }
- }
- }
- }
- }
- }
- return
- }
- func (odbci *ODBCImporter) insertData(classname string, cdi *classdatainfo, innerdata *InnerData, data map[string]any) (err error) {
- values := []any{}
- for _, fn := range cdi.Fieldslist {
- // 内部字段
- switch fn {
- case "id":
- values = append(values, innerdata.oid)
- continue
- case "contain":
- values = append(values, innerdata.contain)
- continue
- case "depend":
- values = append(values, innerdata.depend)
- continue
- case "topology":
- values = append(values, innerdata.topology)
- continue
- }
- fi := cdi.Fieldinfos[fn]
- // 合并扩展字段
- if strset.New(fi.Datakey...).Has("*") {
- td := map[string]any{}
- for k, v := range data {
- if cdi.DatakeyFieldinfos[k] == nil {
- td[k] = v
- }
- }
- tdbs, e := json.Marshal(td)
- if e != nil {
- return merrs.NewError(e)
- }
- values = append(values, string(tdbs))
- continue
- }
- // 字段类型修正
- var v any
- for _, dk := range fi.Datakey {
- v = data[dk]
- if v != nil {
- switch fi.Fieldtype {
- case "set<varchar>":
- v = cast.ToStringSlice(v)
- case "timestamp":
- tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
- if e != nil {
- return merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
- }
- v = tv.Format("2006-01-02 15:04:05.000000")
- }
- break
- }
- }
- if fn == "tags" {
- v = append(cast.ToStringSlice(v), classname)
- }
- values = append(values, v)
- }
- if odbci.client != nil {
- _, err = odbci.client.Query(cdi.Insertmql, values...).Do()
- if err != nil {
- databs, _ := json.MarshalIndent(data, "", " ")
- err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.Insertmql}, {"values": fmt.Sprint(values)}, {"data": string(databs)}})
- logger.Error(err)
- return
- }
- push_object_id_into_cache(object_id_cache(cdi.Classaliasname, innerdata.suid), innerdata.oid)
- }
- atomic.AddInt64(&cdi.insertcount, 1)
- cdi.mutex.Lock()
- if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
- cdi.lastlogtime = time.Now()
- cdi.lastlogicount = cdi.insertcount
- logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records")
- }
- cdi.mutex.Unlock()
- return
- }
- func (odbci *ODBCImporter) done() {
- classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
- cdi.mutex.Lock()
- if cdi.lastlogicount != cdi.insertcount {
- cdi.lastlogtime = time.Now()
- cdi.lastlogicount = cdi.insertcount
- logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records")
- }
- cdi.mutex.Unlock()
- return true
- })
- }
- func (odbci *ODBCImporter) alldone() {
- classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
- cdi.mutex.Lock()
- if cdi.insertcount != 0 {
- cdi.lastlogtime = time.Now()
- cdi.lastlogicount = cdi.insertcount
- logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records")
- }
- cdi.mutex.Unlock()
- return true
- })
- }
- func (odbci *ODBCImporter) reload() error {
- if odbci.client != nil {
- for i := len(schema.ClassNames) - 1; i >= 0; i-- {
- classname := schema.ClassNames[i]
- ci := schema.ClassInfos.GetIFPresent(classname)
- if ci == nil {
- continue
- }
- e := odbci.dropclass(ci.Classfullname)
- if e != nil {
- return e
- }
- }
- }
- return nil
- }
- func (odbci *ODBCImporter) dropclass(classnames ...string) error {
- for _, classname := range classnames {
- for retry := 2; retry >= 0; retry-- {
- _, e := odbci.client.Query(`delete from "` + classname + `" with version`).Do()
- _ = e
- _, e = odbci.client.Query(`drop class if exists "` + classname + `"`).Do()
- if e != nil {
- matchstr := regexp.MustCompile(`refer by ([^,]+)`).FindStringSubmatch(e.Error())
- if len(matchstr) >= 2 {
- e = odbci.dropclass(matchstr[1])
- if e != nil {
- return e
- }
- } else {
- matchstr := regexp.MustCompile(`has children \[([^\]]+)\]`).FindStringSubmatch(e.Error())
- if len(matchstr) >= 2 {
- e = odbci.dropclass(strings.Split(matchstr[1], ",")...)
- if e != nil {
- return e
- }
- }
- }
- if retry > 0 {
- continue
- }
- return e
- }
- }
- logger.Info("drop class " + classname)
- }
- return nil
- }
|