odbcimporter.go 12 KB


  1. package importer
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "regexp"
  6. "strings"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "git.wecise.com/wecise/cgimport/graph"
  11. "git.wecise.com/wecise/cgimport/odbc"
  12. "git.wecise.com/wecise/cgimport/schema"
  13. "git.wecise.com/wecise/odb-go/odb"
  14. "git.wecise.com/wecise/util/cast"
  15. "git.wecise.com/wecise/util/cmap"
  16. "git.wecise.com/wecise/util/merrs"
  17. "github.com/dgryski/go-farm"
  18. "github.com/scylladb/go-set/strset"
  19. )
  20. type classdatainfo struct {
  21. *schema.ClassInfo
  22. insertcount int64
  23. lastlogtime time.Time
  24. lastlogicount int64
  25. mutex sync.Mutex
  26. }
  27. var classdatainfos = cmap.NewSingle[string, *classdatainfo]()
  28. type ODBCImporter struct {
  29. client odb.Client
  30. }
  31. func NewODBCImporter() *ODBCImporter {
  32. odbci := &ODBCImporter{}
  33. if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 {
  34. odbci.client = odbc.ODBC()
  35. }
  36. return odbci
  37. }
  38. // 根据数据修正类定义
  39. func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
  40. for _, classname := range schema.ClassNames {
  41. ci := schema.ClassInfos.GetIFPresent(classname)
  42. if ci == nil {
  43. return merrs.NewError("classinfo not found " + classname)
  44. }
  45. cdi, e := classdatainfos.GetWithNew(ci.Classaliasname, func() (cdi *classdatainfo, err error) {
  46. if odbci.client != nil {
  47. _, e := odbci.client.Query("select class,id from " + ci.Classfullname + " limit 1").Do()
  48. if e != nil {
  49. if !strings.Contains(e.Error(), "not find") {
  50. return nil, e
  51. }
  52. logger.Info("create class " + ci.Classfullname)
  53. _, e = odbci.client.Query(ci.Createmql).Do()
  54. if e != nil {
  55. return nil, e
  56. }
  57. // add graph tags
  58. _, e = odbci.client.Query(ci.Addtagmql, ci.Classaliasname, ci.Classaliasname, []string{ci.Classaliasname}).Do()
  59. if e != nil {
  60. return nil, e
  61. }
  62. }
  63. }
  64. cdi = &classdatainfo{ClassInfo: ci}
  65. return
  66. })
  67. if e != nil {
  68. return e
  69. }
  70. classdatainfos.Set(ci.Classfullname, cdi)
  71. }
  72. if odbci.client != nil {
  73. for _, createedgemql := range schema.CreateEdgeMqls {
  74. _, e := odbci.client.Query(createedgemql).Do()
  75. if e != nil && !strings.Contains(e.Error(), "already exist") {
  76. return e
  77. }
  78. }
  79. }
  80. return
  81. }
  82. // func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) {
  83. // ei, e := graph.ParseEdgeInfo(data)
  84. // if e != nil {
  85. // return e
  86. // }
  87. // if odbci.client != nil {
  88. // // foid := get_object_id_from_cache("level1", fromuid)
  89. // // toid := to_object_id("level1", touid)
  90. // // eabs, _ := json.Marshal(extraattr)
  91. // // quadmql := `quad "` + foid + `" ` + edgetype + ` + "` + toid + `" ` + string(eabs)
  92. // // _, err = odbci.client.Query(quadmql).Do()
  93. // // if err != nil {
  94. // // err = merrs.NewError(err, merrs.SSMaps{{"mql": quadmql}})
  95. // // logger.Error(err)
  96. // // return
  97. // // }
  98. // updatemql := "update " + "level1" + " set " + " contain=contain+?" + " where id='" + foid + "'"
  99. // _, err = odbci.client.Query(updatemql, map[string][]string{
  100. // "_all": {toid},
  101. // toid: {string(eabs)},
  102. // }).Do()
  103. // if err != nil {
  104. // err = merrs.NewError(err, merrs.SSMaps{{"mql": updatemql}})
  105. // return
  106. // }
  107. // logger.Info("relation immport " + foid + "->" + toid)
  108. // }
  109. // return
  110. // }
  111. // var cm_object_id_cache = cmap.New[string, chan string]()
  112. // func object_id_cache(classaliasname, suid string) chan string {
  113. // choid, _ := cm_object_id_cache.GetWithNew(classaliasname+":"+suid,
  114. // func() (chan string, error) {
  115. // ch := make(chan string, 2)
  116. // return ch, nil
  117. // })
  118. // return choid
  119. // }
  120. // func get_object_id_from_cache(classaliasname, suid string) string {
  121. // choid := object_id_cache(classaliasname, suid)
  122. // oid := <-choid
  123. // push_object_id_into_cache(choid, oid)
  124. // return oid
  125. // }
  126. // func push_object_id_into_cache(choid chan string, oid string) {
  127. // choid <- oid
  128. // if len(choid) == 2 {
  129. // // 最多保留 1 个
  130. // // chan cap = 2,第三个元素进不来
  131. // // 进第二个元素的协程,清除第一个元素,允许其它协程后续进入新元素
  132. // <-choid
  133. // }
  134. // }
  135. var masterlevel1mutex = make([]sync.Mutex, 256)
  136. var masterdatas = cmap.New[string, map[string]any]()
  137. var level1datas = cmap.New[string, map[string]any]()
  138. func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string, data map[string]any) error {
  139. hidx := int(farm.Hash32([]byte(suid)) % uint32(len(masterlevel1mutex)))
  140. masterlevel1mutex[hidx].Lock()
  141. defer masterlevel1mutex[hidx].Unlock()
  142. switch classaliasname {
  143. case "master":
  144. level1data := level1datas.GetIFPresent(suid)
  145. if level1data == nil {
  146. // 先插入 master
  147. masterdatas.Set(suid, data)
  148. level1data = map[string]any{}
  149. for k, v := range data {
  150. if k == "id" {
  151. oid, _, e := graph.GetNodeId("level1", data)
  152. if e != nil {
  153. return e
  154. }
  155. v = oid
  156. }
  157. level1data[k] = v
  158. }
  159. } else {
  160. // 后插入 master
  161. level1datas.Remove(suid)
  162. // 用 master 补齐 level1 数据
  163. // data 数据不能变,需要后续插入 master
  164. for k, v := range data {
  165. if _, has := level1data[k]; !has {
  166. level1data[k] = v
  167. }
  168. }
  169. }
  170. // 重新插入完整的 level1
  171. e := odbci.insertData("level1", level1data)
  172. if e != nil {
  173. return e
  174. }
  175. case "level1":
  176. masterdata := masterdatas.GetIFPresent(suid)
  177. if masterdata == nil {
  178. // 先插入 level 1
  179. level1datas.Set(suid, data)
  180. } else {
  181. // 后插入 level1
  182. masterdatas.Remove(suid)
  183. // 用 level1 补齐 master 数据
  184. for k, v := range data {
  185. masterdata[k] = v
  186. }
  187. // 完整 level1 数据
  188. data = masterdata
  189. }
  190. // 插入 level1 数据
  191. e := odbci.insertData("level1", data)
  192. if e != nil {
  193. return e
  194. }
  195. }
  196. return nil
  197. }
  198. // 插入数据
  199. func (odbci *ODBCImporter) InsertData(classaliasname string, data map[string]any) (err error) {
  200. oid, suid, e := graph.GetNodeId(classaliasname, data)
  201. if e != nil {
  202. return e
  203. }
  204. data["id"] = oid
  205. if classaliasname == "master" {
  206. e := odbci.masterlevel1data(classaliasname, suid, data)
  207. if e != nil {
  208. return e
  209. }
  210. } else if classaliasname == "level1" {
  211. ei := graph.GetEdgeInfo(oid)
  212. if ei != nil {
  213. data["contain"] = ei["contain"]
  214. data["depend"] = ei["depend"]
  215. data["topology"] = ei["topology"]
  216. }
  217. e := odbci.masterlevel1data(classaliasname, suid, data)
  218. if e != nil {
  219. return e
  220. }
  221. // 数据已经在 masterlevel1data 中插入完成
  222. return
  223. } else {
  224. data["depend"] = referencedata(classaliasname, data)
  225. }
  226. return odbci.insertData(classaliasname, data)
  227. }
  228. type InnerData struct {
  229. oid string
  230. suid string
  231. contain map[string][]string
  232. depend map[string][]string
  233. topology map[string][]string
  234. }
  235. func referencedata(classaliasname string, data map[string]any) (depend map[string][]string) {
  236. refer := data["_references"]
  237. switch vv := refer.(type) {
  238. case []interface{}:
  239. for _, v := range vv {
  240. switch vv := v.(type) {
  241. case map[string]interface{}:
  242. for k, v := range vv {
  243. switch k {
  244. case "_edgeType":
  245. case "_toUniqueId":
  246. suid := cast.ToString(v)
  247. toclassname := "master"
  248. switch classaliasname {
  249. case "level1":
  250. toclassname = "level1"
  251. case "level2":
  252. toclassname = "level1"
  253. case "level3":
  254. toclassname = "level2"
  255. case "level4":
  256. toclassname = "level3"
  257. case "level5":
  258. toclassname = "level4"
  259. case "level6":
  260. toclassname = "level5"
  261. case "level7":
  262. toclassname = "level6"
  263. case "level8":
  264. toclassname = "level7"
  265. }
  266. toid := graph.ToNodeId(toclassname, suid)
  267. m := map[string]string{"_direction": "out"}
  268. mbs, _ := json.Marshal(m)
  269. depend = map[string][]string{
  270. "_all": {toid},
  271. toid: {string(mbs)},
  272. }
  273. }
  274. }
  275. }
  276. }
  277. }
  278. return
  279. }
  280. func (odbci *ODBCImporter) insertData(classaliasname string, data map[string]any) (err error) {
  281. cdi := classdatainfos.GetIFPresent(classaliasname)
  282. if cdi == nil {
  283. return merrs.NewError("class not defined " + classaliasname)
  284. }
  285. if cdi.Insertmql == "" {
  286. return merrs.NewError("class no fields to insert " + classaliasname)
  287. }
  288. values := []any{}
  289. for _, fn := range cdi.Fieldslist {
  290. fi := cdi.Fieldinfos[fn]
  291. if fi == nil {
  292. values = append(values, data[fn])
  293. continue
  294. }
  295. // 合并扩展字段
  296. if strset.New(fi.Datakey...).Has("*") {
  297. if fi.Fieldtype != "map<varchar,varchar>" {
  298. return merrs.NewError("fi.Fieldtype=" + fi.Fieldtype + " != map<varchar,varchar>")
  299. }
  300. td := map[string]any{}
  301. for k, v := range data {
  302. if cdi.DatakeyFieldinfos[k] == nil {
  303. td[k] = cast.ToString(v)
  304. }
  305. }
  306. values = append(values, td)
  307. continue
  308. }
  309. // 字段类型修正
  310. var v any
  311. for _, dk := range fi.Datakey {
  312. v = data[dk]
  313. if v != nil {
  314. switch fi.Fieldtype {
  315. case "set<varchar>":
  316. v = cast.ToStringSlice(v)
  317. case "timestamp":
  318. tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
  319. if e != nil {
  320. return merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
  321. }
  322. v = tv.Format("2006-01-02 15:04:05.000000")
  323. }
  324. break
  325. }
  326. }
  327. if fn == "tags" {
  328. v = append(cast.ToStringSlice(v), classaliasname)
  329. }
  330. values = append(values, v)
  331. }
  332. if odbci.client != nil {
  333. // logger.Info(values...)
  334. _, err = odbci.client.Query(cdi.Insertmql, values...).Do()
  335. if err != nil {
  336. databs, _ := json.MarshalIndent(data, "", " ")
  337. err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.Insertmql}, {"values": fmt.Sprint(values)}, {"data": string(databs)}})
  338. logger.Error(err)
  339. return
  340. }
  341. }
  342. atomic.AddInt64(&cdi.insertcount, 1)
  343. cdi.mutex.Lock()
  344. if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
  345. cdi.lastlogtime = time.Now()
  346. cdi.lastlogicount = cdi.insertcount
  347. logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  348. }
  349. cdi.mutex.Unlock()
  350. return
  351. }
  352. func (odbci *ODBCImporter) done() {
  353. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  354. cdi.mutex.Lock()
  355. if cdi.lastlogicount != cdi.insertcount {
  356. cdi.lastlogtime = time.Now()
  357. cdi.lastlogicount = cdi.insertcount
  358. logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  359. }
  360. cdi.mutex.Unlock()
  361. return true
  362. })
  363. }
  364. func (odbci *ODBCImporter) alldone() {
  365. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  366. cdi.mutex.Lock()
  367. if cdi.insertcount != 0 {
  368. cdi.lastlogtime = time.Now()
  369. cdi.lastlogicount = cdi.insertcount
  370. logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  371. }
  372. cdi.mutex.Unlock()
  373. return true
  374. })
  375. }
  376. func (odbci *ODBCImporter) reload() error {
  377. if odbci.client != nil {
  378. for i := len(schema.ClassNames) - 1; i >= 0; i-- {
  379. classname := schema.ClassNames[i]
  380. ci := schema.ClassInfos.GetIFPresent(classname)
  381. if ci == nil {
  382. continue
  383. }
  384. e := odbci.dropclass(ci.Classfullname)
  385. if e != nil {
  386. return e
  387. }
  388. }
  389. }
  390. return nil
  391. }
  392. func (odbci *ODBCImporter) dropclass(classnames ...string) error {
  393. for _, classname := range classnames {
  394. for retry := 2; retry >= 0; retry-- {
  395. _, e := odbci.client.Query(`delete from /matrix/tagdir where tags='` + classname + `'`).Do()
  396. _ = e
  397. _, e = odbci.client.Query(`delete from "` + classname + `" with version`).Do()
  398. _ = e
  399. _, e = odbci.client.Query(`drop class if exists "` + classname + `"`).Do()
  400. if e != nil {
  401. matchstr := regexp.MustCompile(`refer by ([^,]+)`).FindStringSubmatch(e.Error())
  402. if len(matchstr) >= 2 {
  403. e = odbci.dropclass(matchstr[1])
  404. if e != nil {
  405. return e
  406. }
  407. } else {
  408. matchstr := regexp.MustCompile(`has children \[([^\]]+)\]`).FindStringSubmatch(e.Error())
  409. if len(matchstr) >= 2 {
  410. e = odbci.dropclass(strings.Split(matchstr[1], ",")...)
  411. if e != nil {
  412. return e
  413. }
  414. }
  415. }
  416. if retry > 0 {
  417. continue
  418. }
  419. return e
  420. }
  421. }
  422. logger.Info("drop class " + classname)
  423. }
  424. return nil
  425. }