odbcimporter.go 15 KB


  1. package importer
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "regexp"
  6. "strings"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "git.wecise.com/wecise/cgimport/graph"
  11. "git.wecise.com/wecise/cgimport/odbc"
  12. "git.wecise.com/wecise/odb-go/odb"
  13. "github.com/dgryski/go-farm"
  14. "github.com/scylladb/go-set/strset"
  15. "github.com/wecisecode/util/cast"
  16. "github.com/wecisecode/util/cmap"
  17. "github.com/wecisecode/util/merrs"
  18. )
  19. type ODBCImporter struct {
  20. client odb.Client
  21. // localdb *sqlite.SQLDB
  22. }
  23. func NewODBCImporter() *ODBCImporter {
  24. odbci := &ODBCImporter{}
  25. if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 {
  26. odbci.client = odbc.ODBC()
  27. // var e error
  28. // odbci.localdb, e = sqlite.NewSQLDB(odbc.Keyspace, "localdb", false)
  29. // if e != nil {
  30. // panic(e)
  31. // }
  32. }
  33. return odbci
  34. }
  35. func (odbci *ODBCImporter) InitLocalDB(force bool) error {
  36. // return odbci.localdb.InitTable(&schema.TableDefine{
  37. // TableName: "localcache",
  38. // Fields: schema.Fields{
  39. // {Name: "key", Type: "TEXT"},
  40. // {Name: "value", Type: "TEXT"},
  41. // },
  42. // Indexes: map[string][]string{
  43. // "key": {"key"},
  44. // },
  45. // Ttl: 0,
  46. // }, force)
  47. return nil
  48. }
  49. // func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) {
  50. // ei, e := graph.ParseEdgeInfo(data)
  51. // if e != nil {
  52. // return e
  53. // }
  54. // if odbci.client != nil {
  55. // // foid := get_object_id_from_cache("level1", fromuid)
  56. // // toid := to_object_id("level1", touid)
  57. // // eabs, _ := json.Marshal(extraattr)
  58. // // quadmql := `quad "` + foid + `" ` + edgetype + ` + "` + toid + `" ` + string(eabs)
  59. // // _, err = odbci.client.Query(quadmql).Do()
  60. // // if err != nil {
  61. // // err = merrs.NewError(err, merrs.SSMaps{{"mql": quadmql}})
  62. // // logger.Error(err)
  63. // // return
  64. // // }
  65. // updatemql := "update " + "level1" + " set " + " contain=contain+?" + " where id='" + foid + "'"
  66. // _, err = odbci.client.Query(updatemql, map[string][]string{
  67. // "_all": {toid},
  68. // toid: {string(eabs)},
  69. // }).Do()
  70. // if err != nil {
  71. // err = merrs.NewError(err, merrs.SSMaps{{"mql": updatemql}})
  72. // return
  73. // }
  74. // logger.Info("relation immport " + foid + "->" + toid)
  75. // }
  76. // return
  77. // }
  78. // var cm_object_id_cache = cmap.New[string, chan string]()
  79. // func object_id_cache(classaliasname, suid string) chan string {
  80. // choid, _ := cm_object_id_cache.GetWithNew(classaliasname+":"+suid,
  81. // func() (chan string, error) {
  82. // ch := make(chan string, 2)
  83. // return ch, nil
  84. // })
  85. // return choid
  86. // }
  87. // func get_object_id_from_cache(classaliasname, suid string) string {
  88. // choid := object_id_cache(classaliasname, suid)
  89. // oid := <-choid
  90. // push_object_id_into_cache(choid, oid)
  91. // return oid
  92. // }
  93. // func push_object_id_into_cache(choid chan string, oid string) {
  94. // choid <- oid
  95. // if len(choid) == 2 {
  96. // // 最多保留 1 个
  97. // // chan cap = 2,第三个元素进不来
  98. // // 进第二个元素的协程,清除第一个元素,允许其它协程后续进入新元素
  99. // <-choid
  100. // }
  101. // }
  102. var masterlevel1mutex = make([]sync.Mutex, 256)
  103. var masterdatas = cmap.New[string, map[string]any]()
  104. var level1datas = cmap.New[string, map[string]any]()
  105. func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string, data map[string]any) (retrycount int, err error) {
  106. hidx := int(farm.Hash32([]byte(suid)) % uint32(len(masterlevel1mutex)))
  107. masterlevel1mutex[hidx].Lock()
  108. defer masterlevel1mutex[hidx].Unlock()
  109. switch classaliasname {
  110. case "master":
  111. level1data := level1datas.GetIFPresent(suid)
  112. if level1data == nil {
  113. // 先插入 master
  114. masterdatas.Set(suid, data)
  115. // 用 master 数据生成不完整的 level1 数据
  116. level1data = map[string]any{}
  117. for k, v := range data {
  118. if k == "id" {
  119. // master oid -> 重新生成 level1 oid
  120. oid, _, e := graph.GetNodeId("level1", data)
  121. if e != nil {
  122. return retrycount, e
  123. }
  124. v = oid
  125. }
  126. level1data[k] = v
  127. }
  128. } else {
  129. // 后插入 master
  130. level1datas.Remove(suid)
  131. // 用 level1 补齐 master 数据
  132. // data 数据不能变,需要后续插入 master
  133. entiredata := map[string]any{}
  134. for k, v := range data {
  135. entiredata[k] = v
  136. }
  137. for k, v := range level1data {
  138. entiredata[k] = v
  139. }
  140. level1data = entiredata
  141. }
  142. // 重新插入完整的 level1
  143. retrycount, _, e := odbci.insertData("level1", "", "", level1data)
  144. if e != nil {
  145. return retrycount, e
  146. }
  147. case "level1":
  148. masterdata := masterdatas.GetIFPresent(suid)
  149. if masterdata == nil {
  150. // 先插入 level 1
  151. level1datas.Set(suid, data)
  152. } else {
  153. // 后插入 level1
  154. masterdatas.Remove(suid)
  155. // 用 level1 补齐 master 数据
  156. entiredata := map[string]any{}
  157. for k, v := range masterdata {
  158. entiredata[k] = v
  159. }
  160. for k, v := range data {
  161. entiredata[k] = v
  162. }
  163. // 完整 level1 数据
  164. data = entiredata
  165. }
  166. // 插入 level1 数据
  167. retrycount, _, e := odbci.insertData("level1", "", "", data)
  168. if e != nil {
  169. return retrycount, e
  170. }
  171. }
  172. return retrycount, nil
  173. }
  174. // func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string, data map[string]any) error {
  175. // key := strings.ReplaceAll("suid", "'", "''")
  176. // hidx := int(farm.Hash32([]byte(suid)) % uint32(len(masterlevel1mutex)))
  177. // masterlevel1mutex[hidx].Lock()
  178. // defer masterlevel1mutex[hidx].Unlock()
  179. // switch classaliasname {
  180. // case "master":
  181. // iter, e := odbci.localdb.Select(nil, "select * from localcache where key='"+strings.ReplaceAll("suid", "'", "''")+"'")
  182. // if e != nil {
  183. // return e
  184. // }
  185. // maps, e := iter.AllMaps()
  186. // if e != nil {
  187. // return e
  188. // }
  189. // var level1data = map[string]any{}
  190. // if len(maps) == 0 {
  191. // bs_data, e := msgpack.Encode(data)
  192. // if e != nil {
  193. // return e
  194. // }
  195. // _, e = odbci.localdb.Insert(map[string]any{"key": key, "value": string(bs_data)}, false)
  196. // if e != nil {
  197. // return e
  198. // }
  199. // level1data = map[string]any{}
  200. // for k, v := range data {
  201. // if k == "id" {
  202. // oid, _, e := graph.GetNodeId("level1", data)
  203. // if e != nil {
  204. // return e
  205. // }
  206. // v = oid
  207. // }
  208. // level1data[k] = v
  209. // }
  210. // } else {
  211. // // 后插入 master
  212. // s_level1data := maps[0]["value"].(string)
  213. // e = msgpack.Decode([]byte(s_level1data), &level1data)
  214. // if e != nil {
  215. // return e
  216. // }
  217. // _, e = odbci.localdb.Delete(map[string]any{"key": key})
  218. // if e != nil {
  219. // return e
  220. // }
  221. // // 用 master 补齐 level1 数据
  222. // // data 数据不能变,需要后续插入 master
  223. // for k, v := range data {
  224. // if _, has := level1data[k]; !has {
  225. // level1data[k] = v
  226. // }
  227. // }
  228. // }
  229. // // 重新插入完整的 level1
  230. // e = odbci.insertData("level1", level1data)
  231. // if e != nil {
  232. // return e
  233. // }
  234. // case "level1":
  235. // iter, e := odbci.localdb.Select(nil, "select * from localcache where key='"+strings.ReplaceAll("suid", "'", "''")+"'")
  236. // if e != nil {
  237. // return e
  238. // }
  239. // maps, e := iter.AllMaps()
  240. // if e != nil {
  241. // return e
  242. // }
  243. // var masterdata = map[string]any{}
  244. // if len(maps) == 0 {
  245. // // 先插入 level 1
  246. // bs_data, e := msgpack.Encode(data)
  247. // if e != nil {
  248. // return e
  249. // }
  250. // _, e = odbci.localdb.Insert(map[string]any{"key": key, "value": string(bs_data)}, false)
  251. // if e != nil {
  252. // return e
  253. // }
  254. // } else {
  255. // // 后插入 level1
  256. // s_masterdata := maps[0]["value"].(string)
  257. // e = msgpack.Decode([]byte(s_masterdata), &masterdata)
  258. // if e != nil {
  259. // return e
  260. // }
  261. // _, e = odbci.localdb.Delete(map[string]any{"key": key})
  262. // if e != nil {
  263. // return e
  264. // }
  265. // // 用 level1 补齐 master 数据
  266. // for k, v := range data {
  267. // masterdata[k] = v
  268. // }
  269. // // 完整 level1 数据
  270. // data = masterdata
  271. // }
  272. // // 插入 level1 数据
  273. // e = odbci.insertData("level1", data)
  274. // if e != nil {
  275. // return e
  276. // }
  277. // }
  278. // return nil
  279. // }
  280. // 插入数据
  281. func (odbci *ODBCImporter) InsertData(classaliasname string, data map[string]any) (retrycount int, err error) {
  282. oid, suid, e := graph.GetNodeId(classaliasname, data)
  283. if e != nil {
  284. return 0, e
  285. }
  286. data["id"] = oid
  287. if classaliasname == "master" {
  288. rc, e := odbci.masterlevel1data(classaliasname, suid, data)
  289. retrycount += rc
  290. if e != nil {
  291. return retrycount, e
  292. }
  293. } else if classaliasname == "level1" {
  294. ei := graph.GetEdgeInfo(oid)
  295. if ei != nil {
  296. data["contain"] = ei["contain"]
  297. data["depend"] = ei["depend"]
  298. data["topology"] = ei["topology"]
  299. }
  300. rc, e := odbci.masterlevel1data(classaliasname, suid, data)
  301. retrycount += rc
  302. if e != nil {
  303. return retrycount, e
  304. }
  305. // 数据已经在 masterlevel1data 中插入完成
  306. return
  307. } else {
  308. data["depend"] = referencedata(classaliasname, data)
  309. }
  310. rc, _, e := odbci.insertData(classaliasname, oid, suid, data)
  311. retrycount += rc
  312. if e != nil {
  313. return retrycount, e
  314. }
  315. return retrycount, nil
  316. }
  317. type InnerData struct {
  318. oid string
  319. suid string
  320. contain map[string][]string
  321. depend map[string][]string
  322. topology map[string][]string
  323. }
  324. func referencedata(classaliasname string, data map[string]any) (depend map[string][]string) {
  325. refer := data["_references"]
  326. switch vv := refer.(type) {
  327. case []interface{}:
  328. for _, v := range vv {
  329. switch vv := v.(type) {
  330. case map[string]interface{}:
  331. for k, v := range vv {
  332. switch k {
  333. case "_edgeType":
  334. case "_toUniqueId":
  335. suid := cast.ToString(v)
  336. toclassname := "master"
  337. switch classaliasname {
  338. case "level1":
  339. toclassname = "level1"
  340. case "level2":
  341. toclassname = "level1"
  342. case "level3":
  343. toclassname = "level2"
  344. case "level4":
  345. toclassname = "level3"
  346. case "level5":
  347. toclassname = "level4"
  348. case "level6":
  349. toclassname = "level5"
  350. case "level7":
  351. toclassname = "level6"
  352. case "level8":
  353. toclassname = "level7"
  354. }
  355. toid := graph.ToNodeId(toclassname, suid)
  356. m := map[string]string{"_direction": "out"}
  357. mbs, _ := json.Marshal(m)
  358. depend = map[string][]string{
  359. "_all": {toid},
  360. toid: {string(mbs)},
  361. }
  362. }
  363. }
  364. }
  365. }
  366. }
  367. return
  368. }
  369. func (odbci *ODBCImporter) insertData(classaliasname string, oid, suid string, data map[string]any) (retrycount int, responsetime time.Duration, err error) {
  370. cdi := classdatainfos.GetIFPresent(classaliasname)
  371. if cdi == nil {
  372. return retrycount, 0, merrs.NewError("class not defined " + classaliasname)
  373. }
  374. if cdi.Insertmql == "" {
  375. return retrycount, 0, merrs.NewError("class no fields to insert " + classaliasname)
  376. }
  377. values := []any{}
  378. for _, fn := range cdi.Fieldslist {
  379. fi := cdi.Fieldinfos[fn]
  380. if fi == nil {
  381. values = append(values, data[fn])
  382. continue
  383. }
  384. // 合并扩展字段
  385. if strset.New(fi.Datakey...).Has("*") {
  386. if fi.Fieldtype != "map<varchar,varchar>" {
  387. return retrycount, 0, merrs.NewError("fi.Fieldtype=" + fi.Fieldtype + " != map<varchar,varchar>")
  388. }
  389. td := map[string]any{}
  390. for k, v := range data {
  391. if cdi.DatakeyFieldinfos[k] == nil {
  392. td[k] = cast.ToString(v)
  393. }
  394. }
  395. values = append(values, td)
  396. continue
  397. }
  398. // 字段类型修正
  399. var v any
  400. for _, dk := range fi.Datakey {
  401. v = data[dk]
  402. if v != nil {
  403. switch fi.Fieldtype {
  404. case "set<varchar>":
  405. v = cast.ToStringSlice(v)
  406. case "timestamp":
  407. tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
  408. if e != nil {
  409. return retrycount, 0, merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
  410. }
  411. v = tv.Format("2006-01-02 15:04:05.000000")
  412. }
  413. break
  414. }
  415. }
  416. if fn == "tags" {
  417. v = append(cast.ToStringSlice(v), classaliasname)
  418. }
  419. values = append(values, v)
  420. }
  421. if odbci.client != nil {
  422. if odbc.LogDebug && oid != "" {
  423. mql := "select id,uniqueid from " + classaliasname + " where id=?"
  424. r, e := odbci.client.Query(mql, oid).Do()
  425. if e != nil {
  426. return retrycount, 0, e
  427. }
  428. if r != nil && len(r.Data) != 0 {
  429. logger.Debug(classaliasname, "exists id:", oid, ", uniqueid:", r.Data[0]["uniqueid"], ", new uniqueid:", suid)
  430. }
  431. }
  432. // logger.Info(values...)
  433. retrycount, responsetime, err = odbci.insertDo(cdi.Insertmql, values...)
  434. if err != nil {
  435. databs, _ := json.MarshalIndent(data, "", " ")
  436. err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.Insertmql}, {"values": fmt.Sprint(values)}, {"data": string(databs)}})
  437. logger.Error(err)
  438. return
  439. }
  440. }
  441. atomic.AddInt64(&cdi.insertcount, 1)
  442. cdi.mutex.Lock()
  443. if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
  444. cdi.lastlogtime = time.Now()
  445. cdi.lastlogicount = cdi.insertcount
  446. logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  447. }
  448. cdi.mutex.Unlock()
  449. return
  450. }
  451. type ODBCRetryConfig struct {
  452. retry int
  453. contains string
  454. }
  455. var reodbcretry = regexp.MustCompile(`(?s)^\s*(-?[0-9]+)\s*(?:,\s*(.*)\s*)?$`)
  456. var odbcretry = ""
  457. var odbcretryconfig []*ODBCRetryConfig
  458. func init() {
  459. mcfg.OnChange(func() {
  460. _odbcretry := mcfg.GetStrings("odbc.retry", "-1, timed out")
  461. if strings.Join(_odbcretry, "|") != odbcretry {
  462. odbcretryconfig = RetryConfig(_odbcretry...)
  463. odbcretry = strings.Join(_odbcretry, "|")
  464. }
  465. })
  466. }
  467. func RetryConfig(retryconfig ...string) (orcs []*ODBCRetryConfig) {
  468. defaultorc := &ODBCRetryConfig{
  469. retry: 0,
  470. contains: "",
  471. }
  472. for _, retrycfg := range retryconfig {
  473. ss := reodbcretry.FindStringSubmatch(retrycfg)
  474. if len(ss) > 1 {
  475. orc := &ODBCRetryConfig{
  476. retry: cast.ToInt(ss[0]),
  477. contains: ss[1],
  478. }
  479. if orc.contains == "" {
  480. defaultorc = orc
  481. } else {
  482. orcs = append(orcs, orc)
  483. }
  484. } else {
  485. panic("odbc.retry config format error")
  486. }
  487. }
  488. orcs = append(orcs, defaultorc)
  489. return
  490. }
  491. func (odbci *ODBCImporter) insertDo(insertmql string, values ...any) (trycount int, responsetime time.Duration, err error) {
  492. for {
  493. st := time.Now()
  494. _, e := odbci.client.Query(insertmql, values...).Do()
  495. if e != nil {
  496. maxtrycount := 0
  497. for _, orc := range odbcretryconfig {
  498. if orc.contains != "" {
  499. if strings.Contains(e.Error(), orc.contains) {
  500. maxtrycount = orc.retry
  501. break
  502. }
  503. } else {
  504. maxtrycount = orc.retry
  505. break
  506. }
  507. }
  508. trycount++
  509. e = merrs.New(e, merrs.Map{"trycount": trycount})
  510. if maxtrycount < 0 || trycount <= maxtrycount {
  511. logger.Debug(merrs.New(e, merrs.Map{"retrycount": trycount}))
  512. time.Sleep(time.Duration(trycount) * time.Second)
  513. continue
  514. }
  515. return trycount, 0, e
  516. }
  517. responsetime = time.Since(st)
  518. return trycount, responsetime, nil
  519. }
  520. }
  521. func (odbci *ODBCImporter) done() {
  522. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  523. cdi.mutex.Lock()
  524. if cdi.lastlogicount != cdi.insertcount {
  525. cdi.lastlogtime = time.Now()
  526. cdi.lastlogicount = cdi.insertcount
  527. logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  528. }
  529. cdi.mutex.Unlock()
  530. return true
  531. })
  532. }
  533. func (odbci *ODBCImporter) alldone() {
  534. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  535. cdi.mutex.Lock()
  536. if cdi.insertcount != 0 {
  537. cdi.lastlogtime = time.Now()
  538. cdi.lastlogicount = cdi.insertcount
  539. logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  540. }
  541. cdi.mutex.Unlock()
  542. return true
  543. })
  544. }