123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- package importer
- import (
- "os"
- "path/filepath"
- "regexp"
- "strings"
- "sync"
- "time"
- "git.wecise.com/wecise/cgimport/schema"
- "github.com/wecisecode/util/cmap"
- "github.com/wecisecode/util/merrs"
- "github.com/wecisecode/util/mio"
- "github.com/wecisecode/util/spliter"
- )
- type classdatainfo struct {
- *schema.ClassInfo
- insertcount int64
- lastlogtime time.Time
- lastlogicount int64
- mutex sync.Mutex
- }
- var classdatainfos = cmap.NewSingle[string, *classdatainfo]()
// Patterns used to interpret the statements of the schema MQL script.
var (
	// recreatestatement recognises "create class" and "create edge type"
	// statements (case-insensitive, optional "if not exists").
	// Submatch 1 is the statement kind ("class" or "edge type" as written),
	// submatch 2 is the class or edge type name.
	recreatestatement = regexp.MustCompile(`(?is)create\s+(class|edge\s+type)\s+(?:if\s+not\s+exists\s+)?([\/@\.\w]+)`)

	// respace matches a run of whitespace. It is used to collapse
	// "edge type" to "edgetype". The pattern requires at least one
	// whitespace character so that it never matches the empty string.
	respace = regexp.MustCompile(`\s+`)
)
- // 根据数据修正类定义
- func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
- if odbci.client == nil {
- return
- }
- mqlfile := filepath.Join(filepath.Dir(os.Args[0]), "cgimport.mql")
- bs, e := mio.ReadFile(mqlfile)
- if e != nil {
- return e
- }
- schema_mqls := schema.MQLs
- if len(bs) > 0 {
- logger.Info("read schema mql from file", mqlfile)
- schema_mqls = string(bs)
- }
- mqls := spliter.MQLSplit(schema_mqls)
- for _, mql := range mqls {
- sstn := recreatestatement.FindStringSubmatch(mql)
- if len(sstn) > 2 {
- switch strings.ToLower(respace.ReplaceAllString(sstn[1], "")) {
- case "class":
- e := odbci.createclass(sstn[2], mql)
- if e != nil {
- return e
- }
- case "edgetype":
- e := odbci.createedgetype(sstn[2], mql)
- if e != nil {
- return e
- }
- }
- }
- }
- return
- }
- func (odbci *ODBCImporter) createclass(classname, mql string) (err error) {
- logger.Debug("create class " + classname)
- _, e := odbci.client.Query(mql).Do()
- if e != nil {
- return e
- }
- cis, e := odbci.client.ClassInfo(classname, false)
- if e != nil {
- return e
- }
- if len(cis) != 1 {
- return merrs.New("len(cis) != 1")
- }
- oci := cis[0]
- ci := schema.ClassInfoHelper(oci)
- // add graph tags
- _, e = odbci.client.Query(ci.Addtagmql, ci.Classaliasname, ci.Classaliasname, []string{ci.Classaliasname}).Do()
- if e != nil {
- return e
- }
- cdi := &classdatainfo{ClassInfo: ci}
- classdatainfos.Set(ci.Classaliasname, cdi)
- classdatainfos.Set(ci.Classfullname, cdi)
- logger.Info("created class", ci.Classfullname)
- return
- }
- func (odbci *ODBCImporter) createedgetype(edgetypename, mql string) (err error) {
- _, e := odbci.client.Query(mql).Do()
- if e != nil {
- if strings.Contains(e.Error(), "already exist") {
- return nil
- }
- return e
- }
- logger.Info("created edge type", edgetypename)
- return nil
- }
- func (odbci *ODBCImporter) getClassinfos() (err error) {
- schema.ClassInfos.Clear()
- schema.ClassAliasNames = schema.ClassAliasNames[:0]
- classinfos, e := odbci.client.ClassInfo("/m3cnet", true)
- if e != nil {
- return e
- }
- for _, oci := range classinfos {
- ci := schema.ClassInfoHelper(oci)
- cdi := &classdatainfo{ClassInfo: ci}
- classdatainfos.Set(ci.Classaliasname, cdi)
- classdatainfos.Set(ci.Classfullname, cdi)
- }
- return nil
- }
- func (odbci *ODBCImporter) init(rebuild bool) (err error) {
- err = odbci.getClassinfos()
- if err != nil {
- return
- }
- // bs, _ := json.MarshalIndent(schema.ClassInfos, "", " ")
- // fmt.Println(string(bs))
- if rebuild {
- // 清除已有类
- err = odbci.rebuild()
- if err != nil {
- return
- }
- schema.ClassInfos.Clear()
- schema.ClassAliasNames = schema.ClassAliasNames[:0]
- }
- if schema.ClassInfos.Count() == 0 {
- // 建类
- err = odbci.ReviseClassStruct()
- if err != nil {
- return
- }
- }
- return nil
- }
- func (odbci *ODBCImporter) rebuild() error {
- if odbci.client != nil {
- for i := len(schema.ClassAliasNames) - 1; i >= 0; i-- {
- classaliasname := schema.ClassAliasNames[i]
- ci := schema.ClassInfos.GetIFPresent(classaliasname)
- if ci == nil {
- continue
- }
- e := odbci.dropclass(ci.Classfullname)
- if e != nil {
- return e
- }
- }
- }
- return nil
- }
- func (odbci *ODBCImporter) dropclass(classnames ...string) error {
- for _, classname := range classnames {
- for retry := 2; retry >= 0; retry-- {
- // 强制多次删除,避免drop失败导致后续错误
- _, e := odbci.client.Query(`delete from /matrix/tagdir where tags='` + classname + `'`).Do()
- _ = e
- _, e = odbci.client.Query(`delete from "` + classname + `" with version`).Do()
- _ = e
- _, e = odbci.client.Query(`drop class if exists "` + classname + `"`).Do()
- if e != nil {
- matchstr := regexp.MustCompile(`refer by ([^,]+)`).FindStringSubmatch(e.Error())
- if len(matchstr) >= 2 {
- e = odbci.dropclass(matchstr[1])
- if e != nil {
- return e
- }
- } else {
- matchstr := regexp.MustCompile(`has children \[([^\]]+)\]`).FindStringSubmatch(e.Error())
- if len(matchstr) >= 2 {
- e = odbci.dropclass(strings.Split(matchstr[1], " ")...)
- if e != nil {
- return e
- }
- }
- }
- if retry > 0 {
- continue
- }
- return e
- }
- }
- logger.Info("drop class " + classname)
- ci := schema.ClassInfos.GetIFPresent(classname)
- if ci == nil {
- continue
- }
- schema.ClassInfos.Remove(ci.Classaliasname)
- schema.ClassInfos.Remove(ci.Classfullname)
- }
- return nil
- }
|