odbcimporter.go 10 KB


  1. package importer
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "regexp"
  6. "strings"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "git.wecise.com/wecise/cgimport/graph"
  11. "git.wecise.com/wecise/cgimport/odbc"
  12. "git.wecise.com/wecise/odb-go/dbo"
  13. "git.wecise.com/wecise/odb-go/odb"
  14. "github.com/dgryski/go-farm"
  15. "github.com/scylladb/go-set/strset"
  16. "github.com/wecisecode/util/cast"
  17. "github.com/wecisecode/util/cmap"
  18. "github.com/wecisecode/util/merrs"
  19. "github.com/wecisecode/util/mfmt"
  20. )
  21. type ODBCImporter struct {
  22. client odb.Client
  23. schema *dbo.Schema
  24. }
  25. func NewODBCImporter() *ODBCImporter {
  26. odbci := &ODBCImporter{}
  27. if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 {
  28. odbci.client = odbc.ODBC()
  29. odbci.schema = dbo.NewSchema(odbci.client)
  30. }
  31. return odbci
  32. }
  33. var masterlevel1mutex = make([]sync.Mutex, 256)
  34. var masterdatas = cmap.New[string, map[string]any]()
  35. var level1datas = cmap.New[string, map[string]any]()
  36. func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string, data map[string]any) (retrycount int, err error) {
  37. hidx := int(farm.Hash32([]byte(suid)) % uint32(len(masterlevel1mutex)))
  38. masterlevel1mutex[hidx].Lock()
  39. defer masterlevel1mutex[hidx].Unlock()
  40. switch classaliasname {
  41. case "master":
  42. level1data := level1datas.GetIFPresent(suid)
  43. if level1data == nil {
  44. // 先插入 master
  45. masterdatas.Set(suid, data)
  46. // 用 master 数据生成不完整的 level1 数据
  47. level1data = map[string]any{}
  48. for k, v := range data {
  49. if k == "id" {
  50. // master oid -> 重新生成 level1 oid
  51. oid, _, e := graph.GetNodeId("level1", data)
  52. if e != nil {
  53. return retrycount, e
  54. }
  55. v = oid
  56. }
  57. level1data[k] = v
  58. }
  59. } else {
  60. // 后插入 master
  61. level1datas.Remove(suid)
  62. // 用 level1 补齐 master 数据
  63. // data 数据不能变,需要后续插入 master
  64. entiredata := map[string]any{}
  65. for k, v := range data {
  66. entiredata[k] = v
  67. }
  68. for k, v := range level1data {
  69. entiredata[k] = v
  70. }
  71. level1data = entiredata
  72. }
  73. // 重新插入完整的 level1
  74. retrycount, _, e := odbci.insertData("level1", "", "", level1data)
  75. if e != nil {
  76. return retrycount, e
  77. }
  78. case "level1":
  79. masterdata := masterdatas.GetIFPresent(suid)
  80. if masterdata == nil {
  81. // 先插入 level 1
  82. level1datas.Set(suid, data)
  83. } else {
  84. // 后插入 level1
  85. masterdatas.Remove(suid)
  86. // 用 level1 补齐 master 数据
  87. entiredata := map[string]any{}
  88. for k, v := range masterdata {
  89. entiredata[k] = v
  90. }
  91. for k, v := range data {
  92. entiredata[k] = v
  93. }
  94. // 完整 level1 数据
  95. data = entiredata
  96. }
  97. // 插入 level1 数据
  98. retrycount, _, e := odbci.insertData("level1", "", "", data)
  99. if e != nil {
  100. return retrycount, e
  101. }
  102. }
  103. return retrycount, nil
  104. }
  105. // 插入数据
  106. func (odbci *ODBCImporter) InsertData(classaliasname string, data map[string]any) (retrycount int, err error) {
  107. oid, suid, e := graph.GetNodeId(classaliasname, data)
  108. if e != nil {
  109. return 0, e
  110. }
  111. data["id"] = oid
  112. if classaliasname == "master" {
  113. rc, e := odbci.masterlevel1data(classaliasname, suid, data)
  114. retrycount += rc
  115. if e != nil {
  116. return retrycount, e
  117. }
  118. } else if classaliasname == "level1" {
  119. ei := graph.GetEdgeInfo(oid)
  120. if ei != nil {
  121. data["contain"] = ei["contain"]
  122. data["depend"] = ei["depend"]
  123. data["topology"] = ei["topology"]
  124. }
  125. rc, e := odbci.masterlevel1data(classaliasname, suid, data)
  126. retrycount += rc
  127. if e != nil {
  128. return retrycount, e
  129. }
  130. // 数据已经在 masterlevel1data 中插入完成
  131. return
  132. } else {
  133. data["depend"] = referencedata(classaliasname, data)
  134. }
  135. rc, _, e := odbci.insertData(classaliasname, oid, suid, data)
  136. retrycount += rc
  137. if e != nil {
  138. return retrycount, e
  139. }
  140. return retrycount, nil
  141. }
  // InnerData groups a node's identifiers with its three relation maps.
  // NOTE(review): nothing in the visible part of this file uses the type;
  // confirm it is still needed.
  type InnerData struct {
  	// oid is the node id; suid is presumably the source unique id it was
  	// derived from (verify against graph.GetNodeId).
  	oid  string
  	suid string
  	// Relation maps, one per edge kind.
  	contain  map[string][]string
  	depend   map[string][]string
  	topology map[string][]string
  }
  149. func referencedata(classaliasname string, data map[string]any) (depend map[string][]string) {
  150. refer := data["_references"]
  151. switch vv := refer.(type) {
  152. case []interface{}:
  153. for _, v := range vv {
  154. switch vv := v.(type) {
  155. case map[string]interface{}:
  156. for k, v := range vv {
  157. switch k {
  158. case "_edgeType":
  159. case "_toUniqueId":
  160. suid := cast.ToString(v)
  161. toclassname := "master"
  162. switch classaliasname {
  163. case "level1":
  164. toclassname = "level1"
  165. case "level2":
  166. toclassname = "level1"
  167. case "level3":
  168. toclassname = "level2"
  169. case "level4":
  170. toclassname = "level3"
  171. case "level5":
  172. toclassname = "level4"
  173. case "level6":
  174. toclassname = "level5"
  175. case "level7":
  176. toclassname = "level6"
  177. case "level8":
  178. toclassname = "level7"
  179. }
  180. toid := graph.ToNodeId(toclassname, suid)
  181. m := map[string]string{"_direction": "out"}
  182. mbs, _ := json.Marshal(m)
  183. depend = map[string][]string{
  184. "_all": {toid},
  185. toid: {string(mbs)},
  186. }
  187. }
  188. }
  189. }
  190. }
  191. }
  192. return
  193. }
  194. func (odbci *ODBCImporter) insertData(classaliasname string, oid, suid string, data map[string]any) (retrycount int, responsetime time.Duration, err error) {
  195. cdi := classdatainfos.GetIFPresent(classaliasname)
  196. if cdi == nil {
  197. return retrycount, 0, merrs.NewError("class not defined " + classaliasname)
  198. }
  199. if cdi.Insertmql == "" {
  200. return retrycount, 0, merrs.NewError("class no fields to insert " + classaliasname)
  201. }
  202. values := []any{}
  203. for _, fn := range cdi.Fieldslist {
  204. fi := cdi.Fieldinfos[fn]
  205. if fi == nil {
  206. values = append(values, data[fn])
  207. continue
  208. }
  209. // 合并扩展字段
  210. if strset.New(fi.Datakey...).Has("*") {
  211. if fi.Fieldtype != "map<varchar,varchar>" {
  212. return retrycount, 0, merrs.NewError("fi.Fieldtype=" + fi.Fieldtype + " != map<varchar,varchar>")
  213. }
  214. td := map[string]any{}
  215. for k, v := range data {
  216. if cdi.DatakeyFieldinfos[k] == nil {
  217. td[k] = cast.ToString(v)
  218. }
  219. }
  220. values = append(values, td)
  221. continue
  222. }
  223. // 字段类型修正
  224. var v any
  225. for _, dk := range fi.Datakey {
  226. v = data[dk]
  227. if v != nil {
  228. switch fi.Fieldtype {
  229. case "set<varchar>":
  230. v = cast.ToStringSlice(v)
  231. case "timestamp":
  232. tv, e := cast.ToDatetimeE(v, "2006-01-02-15.04.05.000000")
  233. if e != nil {
  234. return retrycount, 0, merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
  235. }
  236. v = tv.Format("2006-01-02 15:04:05.000000")
  237. }
  238. break
  239. }
  240. }
  241. if fn == "tags" {
  242. v = append(cast.ToStringSlice(v), classaliasname)
  243. }
  244. values = append(values, v)
  245. }
  246. if odbci.client != nil {
  247. if odbc.LogDebug && oid != "" {
  248. mql := "select id,uniqueid from " + classaliasname + " where id=?"
  249. r, e := odbci.client.Query(mql, oid).Do()
  250. if e != nil {
  251. return retrycount, 0, e
  252. }
  253. if r != nil && len(r.Data) != 0 {
  254. logger.Debug(classaliasname, "exists id:", oid, ", uniqueid:", r.Data[0]["uniqueid"], ", new uniqueid:", suid)
  255. }
  256. }
  257. // logger.Info(values...)
  258. retrycount, responsetime, err = odbci.insertDo(cdi.Insertmql, values...)
  259. if err != nil {
  260. databs, _ := json.MarshalIndent(data, "", " ")
  261. err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.Insertmql}, {"values": fmt.Sprint(values)}, {"data": string(databs)}})
  262. logger.Error(err)
  263. return
  264. }
  265. }
  266. atomic.AddInt64(&cdi.insertcount, 1)
  267. cdi.mutex.Lock()
  268. if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
  269. cdi.lastlogtime = time.Now()
  270. cdi.lastlogicount = cdi.insertcount
  271. logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  272. }
  273. cdi.mutex.Unlock()
  274. return
  275. }
  // ODBCRetryConfig is one parsed "odbc.retry" rule of the form
  // "<retry>, <interval>, <contains>".
  type ODBCRetryConfig struct {
  	// retry is the maximum number of retries; a negative value retries
  	// without limit.
  	retry int
  	// interval is the duration parsed from the rule's second field.
  	interval time.Duration
  	// contains is the substring an error message must contain for the rule to
  	// apply; the empty string marks the fallback rule.
  	contains string
  }
  281. var renumber = regexp.MustCompile(`^\s*(-?[0-9]+)\s*$`)
  282. var reduration = regexp.MustCompile(`^\s*(-?[0-9]+)[smhd]\s*$`)
  283. var odbcretry = ""
  284. var odbcretryconfig []*ODBCRetryConfig
  285. func init() {
  286. mcfg.OnChange(func() {
  287. _odbcretry := mcfg.GetStrings("odbc.retry",
  288. "-1, 1s, timed out",
  289. "-1, 1s, proc timeout",
  290. "-1, 1m, no response received from cassandra")
  291. if strings.Join(_odbcretry, "|") != odbcretry {
  292. odbcretryconfig = RetryConfig(_odbcretry...)
  293. odbcretry = strings.Join(_odbcretry, "|")
  294. }
  295. })
  296. }
  297. func RetryConfig(retryconfig ...string) (orcs []*ODBCRetryConfig) {
  298. defaultorc := &ODBCRetryConfig{
  299. retry: 0,
  300. interval: time.Second,
  301. contains: "",
  302. }
  303. for _, retrycfg := range retryconfig {
  304. sss := strings.SplitN(retrycfg, ",", 3)
  305. if len(sss) == 3 && renumber.MatchString(sss[0]) && reduration.MatchString(sss[1]) {
  306. retry := cast.ToInt(strings.TrimSpace(sss[0]))
  307. interval := mfmt.ParseDuration(strings.TrimSpace(sss[1]))
  308. contains := strings.TrimSpace(sss[2])
  309. orc := &ODBCRetryConfig{
  310. retry: retry,
  311. interval: interval,
  312. contains: contains,
  313. }
  314. if orc.contains == "" {
  315. defaultorc = orc
  316. } else {
  317. orcs = append(orcs, orc)
  318. }
  319. } else {
  320. panic("odbc.retry config format error")
  321. }
  322. }
  323. orcs = append(orcs, defaultorc)
  324. return
  325. }
  326. func (odbci *ODBCImporter) insertDo(insertmql string, values ...any) (trycount int, responsetime time.Duration, err error) {
  327. for {
  328. st := time.Now()
  329. _, e := odbci.client.Query(insertmql, values...).Do()
  330. if e != nil {
  331. maxtrycount := 0
  332. for _, orc := range odbcretryconfig {
  333. if orc.contains != "" {
  334. if strings.Contains(e.Error(), orc.contains) {
  335. maxtrycount = orc.retry
  336. break
  337. }
  338. } else {
  339. maxtrycount = orc.retry
  340. break
  341. }
  342. }
  343. trycount++
  344. e = merrs.New(e, merrs.Map{"trycount": trycount})
  345. if maxtrycount < 0 || trycount <= maxtrycount {
  346. logger.Debug(merrs.New(e, merrs.Map{"retrycount": trycount}))
  347. time.Sleep(time.Duration(trycount) * time.Second)
  348. continue
  349. }
  350. return trycount, 0, e
  351. }
  352. responsetime = time.Since(st)
  353. return trycount, responsetime, nil
  354. }
  355. }
  356. func (odbci *ODBCImporter) done() {
  357. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  358. cdi.mutex.Lock()
  359. if cdi.lastlogicount != cdi.insertcount {
  360. cdi.lastlogtime = time.Now()
  361. cdi.lastlogicount = cdi.insertcount
  362. logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  363. }
  364. cdi.mutex.Unlock()
  365. return true
  366. })
  367. }
  368. func (odbci *ODBCImporter) alldone() {
  369. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  370. cdi.mutex.Lock()
  371. if cdi.insertcount != 0 {
  372. cdi.lastlogtime = time.Now()
  373. cdi.lastlogicount = cdi.insertcount
  374. logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  375. }
  376. cdi.mutex.Unlock()
  377. return true
  378. })
  379. }