odbcimporter.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517
  1. package importer
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "git.wecise.com/wecise/cgimport/graph"
  9. "git.wecise.com/wecise/cgimport/odbc"
  10. "git.wecise.com/wecise/odb-go/odb"
  11. "github.com/dgryski/go-farm"
  12. "github.com/scylladb/go-set/strset"
  13. "github.com/wecisecode/util/cast"
  14. "github.com/wecisecode/util/cmap"
  15. "github.com/wecisecode/util/merrs"
  16. )
  17. type ODBCImporter struct {
  18. client odb.Client
  19. // localdb *sqlite.SQLDB
  20. }
  21. func NewODBCImporter() *ODBCImporter {
  22. odbci := &ODBCImporter{}
  23. if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 {
  24. odbci.client = odbc.ODBC()
  25. // var e error
  26. // odbci.localdb, e = sqlite.NewSQLDB(odbc.Keyspace, "localdb", false)
  27. // if e != nil {
  28. // panic(e)
  29. // }
  30. }
  31. return odbci
  32. }
  33. func (odbci *ODBCImporter) InitLocalDB(force bool) error {
  34. // return odbci.localdb.InitTable(&schema.TableDefine{
  35. // TableName: "localcache",
  36. // Fields: schema.Fields{
  37. // {Name: "key", Type: "TEXT"},
  38. // {Name: "value", Type: "TEXT"},
  39. // },
  40. // Indexes: map[string][]string{
  41. // "key": {"key"},
  42. // },
  43. // Ttl: 0,
  44. // }, force)
  45. return nil
  46. }
  47. // func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) {
  48. // ei, e := graph.ParseEdgeInfo(data)
  49. // if e != nil {
  50. // return e
  51. // }
  52. // if odbci.client != nil {
  53. // // foid := get_object_id_from_cache("level1", fromuid)
  54. // // toid := to_object_id("level1", touid)
  55. // // eabs, _ := json.Marshal(extraattr)
  56. // // quadmql := `quad "` + foid + `" ` + edgetype + ` + "` + toid + `" ` + string(eabs)
  57. // // _, err = odbci.client.Query(quadmql).Do()
  58. // // if err != nil {
  59. // // err = merrs.NewError(err, merrs.SSMaps{{"mql": quadmql}})
  60. // // logger.Error(err)
  61. // // return
  62. // // }
  63. // updatemql := "update " + "level1" + " set " + " contain=contain+?" + " where id='" + foid + "'"
  64. // _, err = odbci.client.Query(updatemql, map[string][]string{
  65. // "_all": {toid},
  66. // toid: {string(eabs)},
  67. // }).Do()
  68. // if err != nil {
  69. // err = merrs.NewError(err, merrs.SSMaps{{"mql": updatemql}})
  70. // return
  71. // }
  72. // logger.Info("relation immport " + foid + "->" + toid)
  73. // }
  74. // return
  75. // }
  76. // var cm_object_id_cache = cmap.New[string, chan string]()
  77. // func object_id_cache(classaliasname, suid string) chan string {
  78. // choid, _ := cm_object_id_cache.GetWithNew(classaliasname+":"+suid,
  79. // func() (chan string, error) {
  80. // ch := make(chan string, 2)
  81. // return ch, nil
  82. // })
  83. // return choid
  84. // }
  85. // func get_object_id_from_cache(classaliasname, suid string) string {
  86. // choid := object_id_cache(classaliasname, suid)
  87. // oid := <-choid
  88. // push_object_id_into_cache(choid, oid)
  89. // return oid
  90. // }
  91. // func push_object_id_into_cache(choid chan string, oid string) {
  92. // choid <- oid
  93. // if len(choid) == 2 {
  94. // // 最多保留 1 个
  95. // // chan cap = 2,第三个元素进不来
  96. // // 进第二个元素的协程,清除第一个元素,允许其它协程后续进入新元素
  97. // <-choid
  98. // }
  99. // }
  100. var masterlevel1mutex = make([]sync.Mutex, 256)
  101. var masterdatas = cmap.New[string, map[string]any]()
  102. var level1datas = cmap.New[string, map[string]any]()
  103. func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string, data map[string]any) (retrycount int, err error) {
  104. hidx := int(farm.Hash32([]byte(suid)) % uint32(len(masterlevel1mutex)))
  105. masterlevel1mutex[hidx].Lock()
  106. defer masterlevel1mutex[hidx].Unlock()
  107. switch classaliasname {
  108. case "master":
  109. level1data := level1datas.GetIFPresent(suid)
  110. if level1data == nil {
  111. // 先插入 master
  112. masterdatas.Set(suid, data)
  113. // 用 master 数据生成不完整的 level1 数据
  114. level1data = map[string]any{}
  115. for k, v := range data {
  116. if k == "id" {
  117. // master oid -> 重新生成 level1 oid
  118. oid, _, e := graph.GetNodeId("level1", data)
  119. if e != nil {
  120. return retrycount, e
  121. }
  122. v = oid
  123. }
  124. level1data[k] = v
  125. }
  126. } else {
  127. // 后插入 master
  128. level1datas.Remove(suid)
  129. // 用 level1 补齐 master 数据
  130. // data 数据不能变,需要后续插入 master
  131. entiredata := map[string]any{}
  132. for k, v := range data {
  133. entiredata[k] = v
  134. }
  135. for k, v := range level1data {
  136. entiredata[k] = v
  137. }
  138. level1data = entiredata
  139. }
  140. // 重新插入完整的 level1
  141. retrycount, e := odbci.insertData("level1", "", "", level1data)
  142. if e != nil {
  143. return retrycount, e
  144. }
  145. case "level1":
  146. masterdata := masterdatas.GetIFPresent(suid)
  147. if masterdata == nil {
  148. // 先插入 level 1
  149. level1datas.Set(suid, data)
  150. } else {
  151. // 后插入 level1
  152. masterdatas.Remove(suid)
  153. // 用 level1 补齐 master 数据
  154. entiredata := map[string]any{}
  155. for k, v := range masterdata {
  156. entiredata[k] = v
  157. }
  158. for k, v := range data {
  159. entiredata[k] = v
  160. }
  161. // 完整 level1 数据
  162. data = entiredata
  163. }
  164. // 插入 level1 数据
  165. retrycount, e := odbci.insertData("level1", "", "", data)
  166. if e != nil {
  167. return retrycount, e
  168. }
  169. }
  170. return retrycount, nil
  171. }
  172. // func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string, data map[string]any) error {
  173. // key := strings.ReplaceAll("suid", "'", "''")
  174. // hidx := int(farm.Hash32([]byte(suid)) % uint32(len(masterlevel1mutex)))
  175. // masterlevel1mutex[hidx].Lock()
  176. // defer masterlevel1mutex[hidx].Unlock()
  177. // switch classaliasname {
  178. // case "master":
  179. // iter, e := odbci.localdb.Select(nil, "select * from localcache where key='"+strings.ReplaceAll("suid", "'", "''")+"'")
  180. // if e != nil {
  181. // return e
  182. // }
  183. // maps, e := iter.AllMaps()
  184. // if e != nil {
  185. // return e
  186. // }
  187. // var level1data = map[string]any{}
  188. // if len(maps) == 0 {
  189. // bs_data, e := msgpack.Encode(data)
  190. // if e != nil {
  191. // return e
  192. // }
  193. // _, e = odbci.localdb.Insert(map[string]any{"key": key, "value": string(bs_data)}, false)
  194. // if e != nil {
  195. // return e
  196. // }
  197. // level1data = map[string]any{}
  198. // for k, v := range data {
  199. // if k == "id" {
  200. // oid, _, e := graph.GetNodeId("level1", data)
  201. // if e != nil {
  202. // return e
  203. // }
  204. // v = oid
  205. // }
  206. // level1data[k] = v
  207. // }
  208. // } else {
  209. // // 后插入 master
  210. // s_level1data := maps[0]["value"].(string)
  211. // e = msgpack.Decode([]byte(s_level1data), &level1data)
  212. // if e != nil {
  213. // return e
  214. // }
  215. // _, e = odbci.localdb.Delete(map[string]any{"key": key})
  216. // if e != nil {
  217. // return e
  218. // }
  219. // // 用 master 补齐 level1 数据
  220. // // data 数据不能变,需要后续插入 master
  221. // for k, v := range data {
  222. // if _, has := level1data[k]; !has {
  223. // level1data[k] = v
  224. // }
  225. // }
  226. // }
  227. // // 重新插入完整的 level1
  228. // e = odbci.insertData("level1", level1data)
  229. // if e != nil {
  230. // return e
  231. // }
  232. // case "level1":
  233. // iter, e := odbci.localdb.Select(nil, "select * from localcache where key='"+strings.ReplaceAll("suid", "'", "''")+"'")
  234. // if e != nil {
  235. // return e
  236. // }
  237. // maps, e := iter.AllMaps()
  238. // if e != nil {
  239. // return e
  240. // }
  241. // var masterdata = map[string]any{}
  242. // if len(maps) == 0 {
  243. // // 先插入 level 1
  244. // bs_data, e := msgpack.Encode(data)
  245. // if e != nil {
  246. // return e
  247. // }
  248. // _, e = odbci.localdb.Insert(map[string]any{"key": key, "value": string(bs_data)}, false)
  249. // if e != nil {
  250. // return e
  251. // }
  252. // } else {
  253. // // 后插入 level1
  254. // s_masterdata := maps[0]["value"].(string)
  255. // e = msgpack.Decode([]byte(s_masterdata), &masterdata)
  256. // if e != nil {
  257. // return e
  258. // }
  259. // _, e = odbci.localdb.Delete(map[string]any{"key": key})
  260. // if e != nil {
  261. // return e
  262. // }
  263. // // 用 level1 补齐 master 数据
  264. // for k, v := range data {
  265. // masterdata[k] = v
  266. // }
  267. // // 完整 level1 数据
  268. // data = masterdata
  269. // }
  270. // // 插入 level1 数据
  271. // e = odbci.insertData("level1", data)
  272. // if e != nil {
  273. // return e
  274. // }
  275. // }
  276. // return nil
  277. // }
  278. // 插入数据
  279. func (odbci *ODBCImporter) InsertData(classaliasname string, data map[string]any) (retrycount int, err error) {
  280. oid, suid, e := graph.GetNodeId(classaliasname, data)
  281. if e != nil {
  282. return 0, e
  283. }
  284. data["id"] = oid
  285. if classaliasname == "master" {
  286. rc, e := odbci.masterlevel1data(classaliasname, suid, data)
  287. retrycount += rc
  288. if e != nil {
  289. return retrycount, e
  290. }
  291. } else if classaliasname == "level1" {
  292. ei := graph.GetEdgeInfo(oid)
  293. if ei != nil {
  294. data["contain"] = ei["contain"]
  295. data["depend"] = ei["depend"]
  296. data["topology"] = ei["topology"]
  297. }
  298. rc, e := odbci.masterlevel1data(classaliasname, suid, data)
  299. retrycount += rc
  300. if e != nil {
  301. return retrycount, e
  302. }
  303. // 数据已经在 masterlevel1data 中插入完成
  304. return
  305. } else {
  306. data["depend"] = referencedata(classaliasname, data)
  307. }
  308. rc, e := odbci.insertData(classaliasname, oid, suid, data)
  309. retrycount += rc
  310. if e != nil {
  311. return retrycount, e
  312. }
  313. return retrycount, nil
  314. }
  315. type InnerData struct {
  316. oid string
  317. suid string
  318. contain map[string][]string
  319. depend map[string][]string
  320. topology map[string][]string
  321. }
  322. func referencedata(classaliasname string, data map[string]any) (depend map[string][]string) {
  323. refer := data["_references"]
  324. switch vv := refer.(type) {
  325. case []interface{}:
  326. for _, v := range vv {
  327. switch vv := v.(type) {
  328. case map[string]interface{}:
  329. for k, v := range vv {
  330. switch k {
  331. case "_edgeType":
  332. case "_toUniqueId":
  333. suid := cast.ToString(v)
  334. toclassname := "master"
  335. switch classaliasname {
  336. case "level1":
  337. toclassname = "level1"
  338. case "level2":
  339. toclassname = "level1"
  340. case "level3":
  341. toclassname = "level2"
  342. case "level4":
  343. toclassname = "level3"
  344. case "level5":
  345. toclassname = "level4"
  346. case "level6":
  347. toclassname = "level5"
  348. case "level7":
  349. toclassname = "level6"
  350. case "level8":
  351. toclassname = "level7"
  352. }
  353. toid := graph.ToNodeId(toclassname, suid)
  354. m := map[string]string{"_direction": "out"}
  355. mbs, _ := json.Marshal(m)
  356. depend = map[string][]string{
  357. "_all": {toid},
  358. toid: {string(mbs)},
  359. }
  360. }
  361. }
  362. }
  363. }
  364. }
  365. return
  366. }
  367. func (odbci *ODBCImporter) insertData(classaliasname string, oid, suid string, data map[string]any) (retrycount int, err error) {
  368. cdi := classdatainfos.GetIFPresent(classaliasname)
  369. if cdi == nil {
  370. return retrycount, merrs.NewError("class not defined " + classaliasname)
  371. }
  372. if cdi.Insertmql == "" {
  373. return retrycount, merrs.NewError("class no fields to insert " + classaliasname)
  374. }
  375. values := []any{}
  376. for _, fn := range cdi.Fieldslist {
  377. fi := cdi.Fieldinfos[fn]
  378. if fi == nil {
  379. values = append(values, data[fn])
  380. continue
  381. }
  382. // 合并扩展字段
  383. if strset.New(fi.Datakey...).Has("*") {
  384. if fi.Fieldtype != "map<varchar,varchar>" {
  385. return retrycount, merrs.NewError("fi.Fieldtype=" + fi.Fieldtype + " != map<varchar,varchar>")
  386. }
  387. td := map[string]any{}
  388. for k, v := range data {
  389. if cdi.DatakeyFieldinfos[k] == nil {
  390. td[k] = cast.ToString(v)
  391. }
  392. }
  393. values = append(values, td)
  394. continue
  395. }
  396. // 字段类型修正
  397. var v any
  398. for _, dk := range fi.Datakey {
  399. v = data[dk]
  400. if v != nil {
  401. switch fi.Fieldtype {
  402. case "set<varchar>":
  403. v = cast.ToStringSlice(v)
  404. case "timestamp":
  405. tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
  406. if e != nil {
  407. return retrycount, merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
  408. }
  409. v = tv.Format("2006-01-02 15:04:05.000000")
  410. }
  411. break
  412. }
  413. }
  414. if fn == "tags" {
  415. v = append(cast.ToStringSlice(v), classaliasname)
  416. }
  417. values = append(values, v)
  418. }
  419. if odbci.client != nil {
  420. if odbc.LogDebug && oid != "" {
  421. mql := "select id,uniqueid from " + classaliasname + " where id=?"
  422. r, e := odbci.client.Query(mql, oid).Do()
  423. if e != nil {
  424. return retrycount, e
  425. }
  426. if r != nil && len(r.Data) != 0 {
  427. logger.Debug(classaliasname, "exists id:", oid, ", uniqueid:", r.Data[0]["uniqueid"], ", new uniqueid:", suid)
  428. }
  429. }
  430. // logger.Info(values...)
  431. retrycount, err = odbci.insertDo(cdi.Insertmql, values...)
  432. if err != nil {
  433. databs, _ := json.MarshalIndent(data, "", " ")
  434. err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.Insertmql}, {"values": fmt.Sprint(values)}, {"data": string(databs)}})
  435. logger.Error(err)
  436. return
  437. }
  438. }
  439. atomic.AddInt64(&cdi.insertcount, 1)
  440. cdi.mutex.Lock()
  441. if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
  442. cdi.lastlogtime = time.Now()
  443. cdi.lastlogicount = cdi.insertcount
  444. logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  445. }
  446. cdi.mutex.Unlock()
  447. return
  448. }
  449. var insertretrylimitcount = 0
  450. func init() {
  451. mcfg.OnChange(func() {
  452. insertretrylimitcount = mcfg.GetInt("odbc.insert.retry", 3)
  453. })
  454. }
  455. func (odbci *ODBCImporter) insertDo(insertmql string, values ...any) (trycount int, err error) {
  456. for {
  457. _, e := odbci.client.Query(insertmql, values...).Do()
  458. if e != nil {
  459. trycount++
  460. e = merrs.New(e, merrs.Map{"trycount": trycount})
  461. if trycount <= insertretrylimitcount {
  462. logger.Debug(e)
  463. time.Sleep(time.Duration(trycount) * time.Second)
  464. continue
  465. }
  466. return trycount, e
  467. }
  468. return trycount, nil
  469. }
  470. }
  471. func (odbci *ODBCImporter) done() {
  472. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  473. cdi.mutex.Lock()
  474. if cdi.lastlogicount != cdi.insertcount {
  475. cdi.lastlogtime = time.Now()
  476. cdi.lastlogicount = cdi.insertcount
  477. logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  478. }
  479. cdi.mutex.Unlock()
  480. return true
  481. })
  482. }
  483. func (odbci *ODBCImporter) alldone() {
  484. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  485. cdi.mutex.Lock()
  486. if cdi.insertcount != 0 {
  487. cdi.lastlogtime = time.Now()
  488. cdi.lastlogicount = cdi.insertcount
  489. logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  490. }
  491. cdi.mutex.Unlock()
  492. return true
  493. })
  494. }