odbcimporter.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. package importer
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "regexp"
  6. "strings"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "git.wecise.com/wecise/cgimport/graph"
  11. "git.wecise.com/wecise/cgimport/odbc"
  12. "git.wecise.com/wecise/cgimport/schema"
  13. "git.wecise.com/wecise/odb-go/odb"
  14. "git.wecise.com/wecise/util/cast"
  15. "git.wecise.com/wecise/util/cmap"
  16. "git.wecise.com/wecise/util/merrs"
  17. "github.com/dgryski/go-farm"
  18. "github.com/scylladb/go-set/strset"
  19. )
  20. type classdatainfo struct {
  21. *schema.ClassInfo
  22. insertcount int64
  23. lastlogtime time.Time
  24. lastlogicount int64
  25. mutex sync.Mutex
  26. }
  27. var classdatainfos = cmap.NewSingle[string, *classdatainfo]()
  28. type ODBCImporter struct {
  29. client odb.Client
  30. }
  31. func NewODBCImporter() *ODBCImporter {
  32. odbci := &ODBCImporter{}
  33. if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 {
  34. odbci.client = odbc.ODBC()
  35. }
  36. return odbci
  37. }
  38. // 根据数据修正类定义
  39. func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
  40. for _, classname := range schema.ClassNames {
  41. ci := schema.ClassInfos.GetIFPresent(classname)
  42. if ci == nil {
  43. return merrs.NewError("classinfo not found " + classname)
  44. }
  45. cdi, e := classdatainfos.GetWithNew(ci.Classaliasname, func() (cdi *classdatainfo, err error) {
  46. if odbci.client != nil {
  47. _, e := odbci.client.Query("select class,id from " + ci.Classfullname + " limit 1").Do()
  48. if e != nil {
  49. if !strings.Contains(e.Error(), "not find") {
  50. return nil, e
  51. }
  52. logger.Info("create class " + ci.Classfullname)
  53. _, e = odbci.client.Query(ci.Createmql).Do()
  54. if e != nil {
  55. return nil, e
  56. }
  57. }
  58. }
  59. cdi = &classdatainfo{ClassInfo: ci}
  60. return
  61. })
  62. if e != nil {
  63. return e
  64. }
  65. classdatainfos.Set(ci.Classfullname, cdi)
  66. }
  67. if odbci.client != nil {
  68. for _, createedgemql := range schema.CreateEdgeMqls {
  69. _, e := odbci.client.Query(createedgemql).Do()
  70. if e != nil && !strings.Contains(e.Error(), "already exist") {
  71. return e
  72. }
  73. }
  74. }
  75. return
  76. }
  77. // func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) {
  78. // ei, e := graph.ParseEdgeInfo(data)
  79. // if e != nil {
  80. // return e
  81. // }
  82. // if odbci.client != nil {
  83. // // foid := get_object_id_from_cache("level1", fromuid)
  84. // // toid := to_object_id("level1", touid)
  85. // // eabs, _ := json.Marshal(extraattr)
  86. // // quadmql := `quad "` + foid + `" ` + edgetype + ` + "` + toid + `" ` + string(eabs)
  87. // // _, err = odbci.client.Query(quadmql).Do()
  88. // // if err != nil {
  89. // // err = merrs.NewError(err, merrs.SSMaps{{"mql": quadmql}})
  90. // // logger.Error(err)
  91. // // return
  92. // // }
  93. // updatemql := "update " + "level1" + " set " + " contain=contain+?" + " where id='" + foid + "'"
  94. // _, err = odbci.client.Query(updatemql, map[string][]string{
  95. // "_all": {toid},
  96. // toid: {string(eabs)},
  97. // }).Do()
  98. // if err != nil {
  99. // err = merrs.NewError(err, merrs.SSMaps{{"mql": updatemql}})
  100. // return
  101. // }
  102. // logger.Info("relation immport " + foid + "->" + toid)
  103. // }
  104. // return
  105. // }
  106. // var cm_object_id_cache = cmap.New[string, chan string]()
  107. // func object_id_cache(classaliasname, suid string) chan string {
  108. // choid, _ := cm_object_id_cache.GetWithNew(classaliasname+":"+suid,
  109. // func() (chan string, error) {
  110. // ch := make(chan string, 2)
  111. // return ch, nil
  112. // })
  113. // return choid
  114. // }
  115. // func get_object_id_from_cache(classaliasname, suid string) string {
  116. // choid := object_id_cache(classaliasname, suid)
  117. // oid := <-choid
  118. // push_object_id_into_cache(choid, oid)
  119. // return oid
  120. // }
  121. // func push_object_id_into_cache(choid chan string, oid string) {
  122. // choid <- oid
  123. // if len(choid) == 2 {
  124. // // 最多保留 1 个
  125. // // chan cap = 2,第三个元素进不来
  126. // // 进第二个元素的协程,清除第一个元素,允许其它协程后续进入新元素
  127. // <-choid
  128. // }
  129. // }
  130. var masterlevel1mutex = make([]sync.Mutex, 256)
  131. var masterdatas = cmap.New[string, map[string]any]()
  132. var level1datas = cmap.New[string, map[string]any]()
  133. func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string, data map[string]any) error {
  134. hidx := int(farm.Hash32([]byte(suid)) % uint32(len(masterlevel1mutex)))
  135. masterlevel1mutex[hidx].Lock()
  136. defer masterlevel1mutex[hidx].Unlock()
  137. switch classaliasname {
  138. case "master":
  139. level1data := level1datas.GetIFPresent(suid)
  140. if level1data == nil {
  141. // 先插入 master
  142. masterdatas.Set(suid, data)
  143. level1data = map[string]any{}
  144. for k, v := range data {
  145. if k == "id" {
  146. oid, _, e := graph.GetNodeId("level1", data)
  147. if e != nil {
  148. return e
  149. }
  150. v = oid
  151. }
  152. level1data[k] = v
  153. }
  154. } else {
  155. // 后插入 master
  156. level1datas.Remove(suid)
  157. // 用 master 补齐 level1 数据
  158. // data 数据不能变,需要后续插入 master
  159. for k, v := range data {
  160. if _, has := level1data[k]; !has {
  161. level1data[k] = v
  162. }
  163. }
  164. }
  165. // 重新插入完整的 level1
  166. e := odbci.insertData("level1", level1data)
  167. if e != nil {
  168. return e
  169. }
  170. case "level1":
  171. masterdata := masterdatas.GetIFPresent(suid)
  172. if masterdata == nil {
  173. // 先插入 level 1
  174. level1datas.Set(suid, data)
  175. } else {
  176. // 后插入 level1
  177. masterdatas.Remove(suid)
  178. // 用 level1 补齐 master 数据
  179. for k, v := range data {
  180. masterdata[k] = v
  181. }
  182. // 完整 level1 数据
  183. data = masterdata
  184. }
  185. // 插入 level1 数据
  186. e := odbci.insertData("level1", data)
  187. if e != nil {
  188. return e
  189. }
  190. }
  191. return nil
  192. }
  193. // 插入数据
  194. func (odbci *ODBCImporter) InsertData(classaliasname string, data map[string]any) (err error) {
  195. oid, suid, e := graph.GetNodeId(classaliasname, data)
  196. if e != nil {
  197. return e
  198. }
  199. data["id"] = oid
  200. if classaliasname == "master" {
  201. e := odbci.masterlevel1data(classaliasname, suid, data)
  202. if e != nil {
  203. return e
  204. }
  205. } else if classaliasname == "level1" {
  206. ei := graph.GetEdgeInfo(oid)
  207. if ei != nil {
  208. data["contain"] = ei["contain"]
  209. data["depend"] = ei["depend"]
  210. data["topology"] = ei["topology"]
  211. }
  212. e := odbci.masterlevel1data(classaliasname, suid, data)
  213. if e != nil {
  214. return e
  215. }
  216. // 数据已经在 masterlevel1data 中插入完成
  217. return
  218. } else {
  219. data["depend"] = referencedata(classaliasname, data)
  220. }
  221. return odbci.insertData(classaliasname, data)
  222. }
  223. type InnerData struct {
  224. oid string
  225. suid string
  226. contain map[string][]string
  227. depend map[string][]string
  228. topology map[string][]string
  229. }
  230. func referencedata(classaliasname string, data map[string]any) (depend map[string][]string) {
  231. refer := data["_references"]
  232. switch vv := refer.(type) {
  233. case []interface{}:
  234. for _, v := range vv {
  235. switch vv := v.(type) {
  236. case map[string]interface{}:
  237. for k, v := range vv {
  238. switch k {
  239. case "_edgeType":
  240. case "_toUniqueId":
  241. suid := cast.ToString(v)
  242. toclassname := "master"
  243. switch classaliasname {
  244. case "level1":
  245. toclassname = "level1"
  246. case "level2":
  247. toclassname = "level1"
  248. case "level3":
  249. toclassname = "level2"
  250. case "level4":
  251. toclassname = "level3"
  252. case "level5":
  253. toclassname = "level4"
  254. case "level6":
  255. toclassname = "level5"
  256. case "level7":
  257. toclassname = "level6"
  258. case "level8":
  259. toclassname = "level7"
  260. }
  261. toid := graph.ToNodeId(toclassname, suid)
  262. m := map[string]string{"_direction": "out"}
  263. mbs, _ := json.Marshal(m)
  264. depend = map[string][]string{
  265. "_all": {toid},
  266. toid: {string(mbs)},
  267. }
  268. }
  269. }
  270. }
  271. }
  272. }
  273. return
  274. }
  275. func (odbci *ODBCImporter) insertData(classaliasname string, data map[string]any) (err error) {
  276. cdi := classdatainfos.GetIFPresent(classaliasname)
  277. if cdi == nil {
  278. return merrs.NewError("class not defined " + classaliasname)
  279. }
  280. if cdi.Insertmql == "" {
  281. return merrs.NewError("class no fields to insert " + classaliasname)
  282. }
  283. values := []any{}
  284. for _, fn := range cdi.Fieldslist {
  285. fi := cdi.Fieldinfos[fn]
  286. if fi == nil {
  287. values = append(values, data[fn])
  288. continue
  289. }
  290. // 合并扩展字段
  291. if strset.New(fi.Datakey...).Has("*") {
  292. if fi.Fieldtype != "map<varchar,varchar>" {
  293. return merrs.NewError("fi.Fieldtype=" + fi.Fieldtype + " != map<varchar,varchar>")
  294. }
  295. td := map[string]any{}
  296. for k, v := range data {
  297. if cdi.DatakeyFieldinfos[k] == nil {
  298. td[k] = cast.ToString(v)
  299. }
  300. }
  301. values = append(values, td)
  302. continue
  303. }
  304. // 字段类型修正
  305. var v any
  306. for _, dk := range fi.Datakey {
  307. v = data[dk]
  308. if v != nil {
  309. switch fi.Fieldtype {
  310. case "set<varchar>":
  311. v = cast.ToStringSlice(v)
  312. case "timestamp":
  313. tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
  314. if e != nil {
  315. return merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
  316. }
  317. v = tv.Format("2006-01-02 15:04:05.000000")
  318. }
  319. break
  320. }
  321. }
  322. if fn == "tags" {
  323. v = append(cast.ToStringSlice(v), classaliasname)
  324. }
  325. values = append(values, v)
  326. }
  327. if odbci.client != nil {
  328. // logger.Info(values...)
  329. _, err = odbci.client.Query(cdi.Insertmql, values...).Do()
  330. if err != nil {
  331. databs, _ := json.MarshalIndent(data, "", " ")
  332. err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.Insertmql}, {"values": fmt.Sprint(values)}, {"data": string(databs)}})
  333. logger.Error(err)
  334. return
  335. }
  336. }
  337. atomic.AddInt64(&cdi.insertcount, 1)
  338. cdi.mutex.Lock()
  339. if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
  340. cdi.lastlogtime = time.Now()
  341. cdi.lastlogicount = cdi.insertcount
  342. logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  343. }
  344. cdi.mutex.Unlock()
  345. return
  346. }
  347. func (odbci *ODBCImporter) done() {
  348. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  349. cdi.mutex.Lock()
  350. if cdi.lastlogicount != cdi.insertcount {
  351. cdi.lastlogtime = time.Now()
  352. cdi.lastlogicount = cdi.insertcount
  353. logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  354. }
  355. cdi.mutex.Unlock()
  356. return true
  357. })
  358. }
  359. func (odbci *ODBCImporter) alldone() {
  360. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  361. cdi.mutex.Lock()
  362. if cdi.insertcount != 0 {
  363. cdi.lastlogtime = time.Now()
  364. cdi.lastlogicount = cdi.insertcount
  365. logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  366. }
  367. cdi.mutex.Unlock()
  368. return true
  369. })
  370. }
  371. func (odbci *ODBCImporter) reload() error {
  372. if odbci.client != nil {
  373. for i := len(schema.ClassNames) - 1; i >= 0; i-- {
  374. classname := schema.ClassNames[i]
  375. ci := schema.ClassInfos.GetIFPresent(classname)
  376. if ci == nil {
  377. continue
  378. }
  379. e := odbci.dropclass(ci.Classfullname)
  380. if e != nil {
  381. return e
  382. }
  383. }
  384. }
  385. return nil
  386. }
  387. func (odbci *ODBCImporter) dropclass(classnames ...string) error {
  388. for _, classname := range classnames {
  389. for retry := 2; retry >= 0; retry-- {
  390. _, e := odbci.client.Query(`delete from "` + classname + `" with version`).Do()
  391. _ = e
  392. _, e = odbci.client.Query(`drop class if exists "` + classname + `"`).Do()
  393. if e != nil {
  394. matchstr := regexp.MustCompile(`refer by ([^,]+)`).FindStringSubmatch(e.Error())
  395. if len(matchstr) >= 2 {
  396. e = odbci.dropclass(matchstr[1])
  397. if e != nil {
  398. return e
  399. }
  400. } else {
  401. matchstr := regexp.MustCompile(`has children \[([^\]]+)\]`).FindStringSubmatch(e.Error())
  402. if len(matchstr) >= 2 {
  403. e = odbci.dropclass(strings.Split(matchstr[1], ",")...)
  404. if e != nil {
  405. return e
  406. }
  407. }
  408. }
  409. if retry > 0 {
  410. continue
  411. }
  412. return e
  413. }
  414. }
  415. logger.Info("drop class " + classname)
  416. }
  417. return nil
  418. }