datainfo.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package importer
  2. import (
  3. "os"
  4. "path/filepath"
  5. "regexp"
  6. "strings"
  7. "sync"
  8. "time"
  9. "git.wecise.com/wecise/cgimport/schema"
  10. "github.com/wecisecode/util/cmap"
  11. "github.com/wecisecode/util/merrs"
  12. "github.com/wecisecode/util/mio"
  13. "github.com/wecisecode/util/spliter"
  14. )
  15. type classdatainfo struct {
  16. *schema.ClassInfo
  17. insertcount int64
  18. lastlogtime time.Time
  19. lastlogicount int64
  20. mutex sync.Mutex
  21. }
  22. var classdatainfos = cmap.NewSingle[string, *classdatainfo]()
  23. var recreatestatement = regexp.MustCompile(`(?is)create\s+(class|edge\s+type)\s+(?:if\s+not\s+exists\s+)?([\/@\.\w]+)`)
  24. var respace = regexp.MustCompile(`\s*`)
  25. // 根据数据修正类定义
  26. func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
  27. if odbci.client == nil {
  28. return
  29. }
  30. mqlfile := filepath.Join(filepath.Dir(os.Args[0]), "cgimport.mql")
  31. bs, e := mio.ReadFile(mqlfile)
  32. if e != nil {
  33. return e
  34. }
  35. schema_mqls := schema.MQLs
  36. if len(bs) > 0 {
  37. logger.Info("read schema mql from file", mqlfile)
  38. schema_mqls = string(bs)
  39. }
  40. mqls := spliter.MQLSplit(schema_mqls)
  41. for _, mql := range mqls {
  42. sstn := recreatestatement.FindStringSubmatch(mql)
  43. if len(sstn) > 2 {
  44. switch strings.ToLower(respace.ReplaceAllString(sstn[1], "")) {
  45. case "class":
  46. e := odbci.createclass(sstn[2], mql)
  47. if e != nil {
  48. return e
  49. }
  50. case "edgetype":
  51. e := odbci.createedgetype(sstn[2], mql)
  52. if e != nil {
  53. return e
  54. }
  55. }
  56. }
  57. }
  58. return
  59. }
  60. func (odbci *ODBCImporter) createclass(classname, mql string) (err error) {
  61. logger.Debug("create class " + classname)
  62. _, e := odbci.client.Query(mql).Do()
  63. if e != nil {
  64. return e
  65. }
  66. cis, e := odbci.client.ClassInfo(classname, false)
  67. if e != nil {
  68. return e
  69. }
  70. if len(cis) != 1 {
  71. return merrs.New("len(cis) != 1")
  72. }
  73. oci := cis[0]
  74. fis := []*schema.FieldInfo{}
  75. for _, fi := range oci.Fieldinfos {
  76. fis = append(fis, &schema.FieldInfo{
  77. Fieldname: fi.Fieldname,
  78. Fieldtype: fi.Fieldtype,
  79. Keyidx: fi.Keyidx,
  80. Datakey: fi.NameMapping("dispname"),
  81. })
  82. }
  83. ci := schema.NewClassinfo(oci.Aliasname, oci.Shortname, oci.Basealias, fis, oci.WithOptions...)
  84. // add graph tags
  85. _, e = odbci.client.Query(ci.Addtagmql, ci.Classaliasname, ci.Classaliasname, []string{ci.Classaliasname}).Do()
  86. if e != nil {
  87. return e
  88. }
  89. cdi := &classdatainfo{ClassInfo: ci}
  90. classdatainfos.Set(ci.Classaliasname, cdi)
  91. classdatainfos.Set(ci.Classfullname, cdi)
  92. logger.Info("created class", ci.Classfullname)
  93. return
  94. }
  95. func (odbci *ODBCImporter) createedgetype(edgetypename, mql string) (err error) {
  96. _, e := odbci.client.Query(mql).Do()
  97. if e != nil {
  98. if strings.Contains(e.Error(), "already exist") {
  99. return nil
  100. }
  101. return e
  102. }
  103. logger.Info("created edge type", edgetypename)
  104. return nil
  105. }
  106. func (odbci *ODBCImporter) getClassinfos() (err error) {
  107. schema.ClassInfos.Clear()
  108. schema.ClassAliasNames = schema.ClassAliasNames[:0]
  109. classinfos, e := odbci.client.ClassInfo("/m3cnet", true)
  110. if e != nil {
  111. return e
  112. }
  113. for _, oci := range classinfos {
  114. fis := []*schema.FieldInfo{}
  115. for _, fi := range oci.Fieldinfos {
  116. fis = append(fis, &schema.FieldInfo{
  117. Fieldname: fi.Fieldname,
  118. Fieldtype: fi.Fieldtype,
  119. Keyidx: fi.Keyidx,
  120. Datakey: fi.NameMapping("dispname"),
  121. })
  122. }
  123. ci := schema.NewClassinfo(oci.Aliasname, oci.Shortname, oci.Basealias, fis, oci.WithOptions...)
  124. cdi := &classdatainfo{ClassInfo: ci}
  125. classdatainfos.Set(ci.Classaliasname, cdi)
  126. classdatainfos.Set(ci.Classfullname, cdi)
  127. }
  128. return nil
  129. }
  130. func (odbci *ODBCImporter) init(rebuild bool) (err error) {
  131. err = odbci.getClassinfos()
  132. if err != nil {
  133. return
  134. }
  135. // bs, _ := json.MarshalIndent(schema.ClassInfos, "", " ")
  136. // fmt.Println(string(bs))
  137. if rebuild {
  138. // 清除已有类
  139. err = odbci.rebuild()
  140. if err != nil {
  141. return
  142. }
  143. schema.ClassInfos.Clear()
  144. schema.ClassAliasNames = schema.ClassAliasNames[:0]
  145. }
  146. if schema.ClassInfos.Count() == 0 {
  147. // 建类
  148. err = odbci.ReviseClassStruct()
  149. if err != nil {
  150. return
  151. }
  152. }
  153. return nil
  154. }
  155. func (odbci *ODBCImporter) rebuild() error {
  156. if odbci.client != nil {
  157. for i := len(schema.ClassAliasNames) - 1; i >= 0; i-- {
  158. classaliasname := schema.ClassAliasNames[i]
  159. ci := schema.ClassInfos.GetIFPresent(classaliasname)
  160. if ci == nil {
  161. continue
  162. }
  163. e := odbci.dropclass(ci.Classfullname)
  164. if e != nil {
  165. return e
  166. }
  167. }
  168. }
  169. return nil
  170. }
  171. func (odbci *ODBCImporter) dropclass(classnames ...string) error {
  172. for _, classname := range classnames {
  173. for retry := 2; retry >= 0; retry-- {
  174. // 强制多次删除,避免drop失败导致后续错误
  175. _, e := odbci.client.Query(`delete from /matrix/tagdir where tags='` + classname + `'`).Do()
  176. _ = e
  177. _, e = odbci.client.Query(`delete from "` + classname + `" with version`).Do()
  178. _ = e
  179. _, e = odbci.client.Query(`drop class if exists "` + classname + `"`).Do()
  180. if e != nil {
  181. matchstr := regexp.MustCompile(`refer by ([^,]+)`).FindStringSubmatch(e.Error())
  182. if len(matchstr) >= 2 {
  183. e = odbci.dropclass(matchstr[1])
  184. if e != nil {
  185. return e
  186. }
  187. } else {
  188. matchstr := regexp.MustCompile(`has children \[([^\]]+)\]`).FindStringSubmatch(e.Error())
  189. if len(matchstr) >= 2 {
  190. e = odbci.dropclass(strings.Split(matchstr[1], " ")...)
  191. if e != nil {
  192. return e
  193. }
  194. }
  195. }
  196. if retry > 0 {
  197. continue
  198. }
  199. return e
  200. }
  201. }
  202. logger.Info("drop class " + classname)
  203. ci := schema.ClassInfos.GetIFPresent(classname)
  204. if ci == nil {
  205. continue
  206. }
  207. schema.ClassInfos.Remove(ci.Classaliasname)
  208. schema.ClassInfos.Remove(ci.Classfullname)
  209. }
  210. return nil
  211. }