odbcimporter.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. package importer
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "regexp"
  6. "strings"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "git.wecise.com/wecise/cgimport/graph"
  11. "git.wecise.com/wecise/cgimport/odbc"
  12. "git.wecise.com/wecise/odb-go/odb"
  13. "github.com/dgryski/go-farm"
  14. "github.com/scylladb/go-set/strset"
  15. "github.com/wecisecode/util/cast"
  16. "github.com/wecisecode/util/cmap"
  17. "github.com/wecisecode/util/merrs"
  18. "github.com/wecisecode/util/mfmt"
  19. )
  20. type ODBCImporter struct {
  21. client odb.Client
  22. }
  23. func NewODBCImporter() *ODBCImporter {
  24. odbci := &ODBCImporter{}
  25. if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 {
  26. odbci.client = odbc.ODBC()
  27. }
  28. return odbci
  29. }
  30. var masterlevel1mutex = make([]sync.Mutex, 256)
  31. var masterdatas = cmap.New[string, map[string]any]()
  32. var level1datas = cmap.New[string, map[string]any]()
  33. func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string, data map[string]any) (retrycount int, err error) {
  34. hidx := int(farm.Hash32([]byte(suid)) % uint32(len(masterlevel1mutex)))
  35. masterlevel1mutex[hidx].Lock()
  36. defer masterlevel1mutex[hidx].Unlock()
  37. switch classaliasname {
  38. case "master":
  39. level1data := level1datas.GetIFPresent(suid)
  40. if level1data == nil {
  41. // 先插入 master
  42. masterdatas.Set(suid, data)
  43. // 用 master 数据生成不完整的 level1 数据
  44. level1data = map[string]any{}
  45. for k, v := range data {
  46. if k == "id" {
  47. // master oid -> 重新生成 level1 oid
  48. oid, _, e := graph.GetNodeId("level1", data)
  49. if e != nil {
  50. return retrycount, e
  51. }
  52. v = oid
  53. }
  54. level1data[k] = v
  55. }
  56. } else {
  57. // 后插入 master
  58. level1datas.Remove(suid)
  59. // 用 level1 补齐 master 数据
  60. // data 数据不能变,需要后续插入 master
  61. entiredata := map[string]any{}
  62. for k, v := range data {
  63. entiredata[k] = v
  64. }
  65. for k, v := range level1data {
  66. entiredata[k] = v
  67. }
  68. level1data = entiredata
  69. }
  70. // 重新插入完整的 level1
  71. retrycount, _, e := odbci.insertData("level1", "", "", level1data)
  72. if e != nil {
  73. return retrycount, e
  74. }
  75. case "level1":
  76. masterdata := masterdatas.GetIFPresent(suid)
  77. if masterdata == nil {
  78. // 先插入 level 1
  79. level1datas.Set(suid, data)
  80. } else {
  81. // 后插入 level1
  82. masterdatas.Remove(suid)
  83. // 用 level1 补齐 master 数据
  84. entiredata := map[string]any{}
  85. for k, v := range masterdata {
  86. entiredata[k] = v
  87. }
  88. for k, v := range data {
  89. entiredata[k] = v
  90. }
  91. // 完整 level1 数据
  92. data = entiredata
  93. }
  94. // 插入 level1 数据
  95. retrycount, _, e := odbci.insertData("level1", "", "", data)
  96. if e != nil {
  97. return retrycount, e
  98. }
  99. }
  100. return retrycount, nil
  101. }
  102. // 插入数据
  103. func (odbci *ODBCImporter) InsertData(classaliasname string, data map[string]any) (retrycount int, err error) {
  104. oid, suid, e := graph.GetNodeId(classaliasname, data)
  105. if e != nil {
  106. return 0, e
  107. }
  108. data["id"] = oid
  109. if classaliasname == "master" {
  110. rc, e := odbci.masterlevel1data(classaliasname, suid, data)
  111. retrycount += rc
  112. if e != nil {
  113. return retrycount, e
  114. }
  115. } else if classaliasname == "level1" {
  116. ei := graph.GetEdgeInfo(oid)
  117. if ei != nil {
  118. data["contain"] = ei["contain"]
  119. data["depend"] = ei["depend"]
  120. data["topology"] = ei["topology"]
  121. }
  122. rc, e := odbci.masterlevel1data(classaliasname, suid, data)
  123. retrycount += rc
  124. if e != nil {
  125. return retrycount, e
  126. }
  127. // 数据已经在 masterlevel1data 中插入完成
  128. return
  129. } else {
  130. data["depend"] = referencedata(classaliasname, data)
  131. }
  132. rc, _, e := odbci.insertData(classaliasname, oid, suid, data)
  133. retrycount += rc
  134. if e != nil {
  135. return retrycount, e
  136. }
  137. return retrycount, nil
  138. }
  139. type InnerData struct {
  140. oid string
  141. suid string
  142. contain map[string][]string
  143. depend map[string][]string
  144. topology map[string][]string
  145. }
  146. func referencedata(classaliasname string, data map[string]any) (depend map[string][]string) {
  147. refer := data["_references"]
  148. switch vv := refer.(type) {
  149. case []interface{}:
  150. for _, v := range vv {
  151. switch vv := v.(type) {
  152. case map[string]interface{}:
  153. for k, v := range vv {
  154. switch k {
  155. case "_edgeType":
  156. case "_toUniqueId":
  157. suid := cast.ToString(v)
  158. toclassname := "master"
  159. switch classaliasname {
  160. case "level1":
  161. toclassname = "level1"
  162. case "level2":
  163. toclassname = "level1"
  164. case "level3":
  165. toclassname = "level2"
  166. case "level4":
  167. toclassname = "level3"
  168. case "level5":
  169. toclassname = "level4"
  170. case "level6":
  171. toclassname = "level5"
  172. case "level7":
  173. toclassname = "level6"
  174. case "level8":
  175. toclassname = "level7"
  176. }
  177. toid := graph.ToNodeId(toclassname, suid)
  178. m := map[string]string{"_direction": "out"}
  179. mbs, _ := json.Marshal(m)
  180. depend = map[string][]string{
  181. "_all": {toid},
  182. toid: {string(mbs)},
  183. }
  184. }
  185. }
  186. }
  187. }
  188. }
  189. return
  190. }
  191. func (odbci *ODBCImporter) insertData(classaliasname string, oid, suid string, data map[string]any) (retrycount int, responsetime time.Duration, err error) {
  192. cdi := classdatainfos.GetIFPresent(classaliasname)
  193. if cdi == nil {
  194. return retrycount, 0, merrs.NewError("class not defined " + classaliasname)
  195. }
  196. if cdi.Insertmql == "" {
  197. return retrycount, 0, merrs.NewError("class no fields to insert " + classaliasname)
  198. }
  199. values := []any{}
  200. for _, fn := range cdi.Fieldslist {
  201. fi := cdi.Fieldinfos[fn]
  202. if fi == nil {
  203. values = append(values, data[fn])
  204. continue
  205. }
  206. // 合并扩展字段
  207. if strset.New(fi.Datakey...).Has("*") {
  208. if fi.Fieldtype != "map<varchar,varchar>" {
  209. return retrycount, 0, merrs.NewError("fi.Fieldtype=" + fi.Fieldtype + " != map<varchar,varchar>")
  210. }
  211. td := map[string]any{}
  212. for k, v := range data {
  213. if cdi.DatakeyFieldinfos[k] == nil {
  214. td[k] = cast.ToString(v)
  215. }
  216. }
  217. values = append(values, td)
  218. continue
  219. }
  220. // 字段类型修正
  221. var v any
  222. for _, dk := range fi.Datakey {
  223. v = data[dk]
  224. if v != nil {
  225. switch fi.Fieldtype {
  226. case "set<varchar>":
  227. v = cast.ToStringSlice(v)
  228. case "timestamp":
  229. tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
  230. if e != nil {
  231. return retrycount, 0, merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
  232. }
  233. v = tv.Format("2006-01-02 15:04:05.000000")
  234. }
  235. break
  236. }
  237. }
  238. if fn == "tags" {
  239. v = append(cast.ToStringSlice(v), classaliasname)
  240. }
  241. values = append(values, v)
  242. }
  243. if odbci.client != nil {
  244. if odbc.LogDebug && oid != "" {
  245. mql := "select id,uniqueid from " + classaliasname + " where id=?"
  246. r, e := odbci.client.Query(mql, oid).Do()
  247. if e != nil {
  248. return retrycount, 0, e
  249. }
  250. if r != nil && len(r.Data) != 0 {
  251. logger.Debug(classaliasname, "exists id:", oid, ", uniqueid:", r.Data[0]["uniqueid"], ", new uniqueid:", suid)
  252. }
  253. }
  254. // logger.Info(values...)
  255. retrycount, responsetime, err = odbci.insertDo(cdi.Insertmql, values...)
  256. if err != nil {
  257. databs, _ := json.MarshalIndent(data, "", " ")
  258. err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.Insertmql}, {"values": fmt.Sprint(values)}, {"data": string(databs)}})
  259. logger.Error(err)
  260. return
  261. }
  262. }
  263. atomic.AddInt64(&cdi.insertcount, 1)
  264. cdi.mutex.Lock()
  265. if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
  266. cdi.lastlogtime = time.Now()
  267. cdi.lastlogicount = cdi.insertcount
  268. logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  269. }
  270. cdi.mutex.Unlock()
  271. return
  272. }
  273. type ODBCRetryConfig struct {
  274. retry int
  275. interval time.Duration
  276. contains string
  277. }
  278. var renumber = regexp.MustCompile(`^\s*(-?[0-9]+)\s*$`)
  279. var reduration = regexp.MustCompile(`^\s*(-?[0-9]+)[smhd]\s*$`)
  280. var odbcretry = ""
  281. var odbcretryconfig []*ODBCRetryConfig
  282. func init() {
  283. mcfg.OnChange(func() {
  284. _odbcretry := mcfg.GetStrings("odbc.retry",
  285. "-1, 1s, timed out",
  286. "-1, 1s, proc timeout",
  287. "-1, 1m, no response received from cassandra")
  288. if strings.Join(_odbcretry, "|") != odbcretry {
  289. odbcretryconfig = RetryConfig(_odbcretry...)
  290. odbcretry = strings.Join(_odbcretry, "|")
  291. }
  292. })
  293. }
  294. func RetryConfig(retryconfig ...string) (orcs []*ODBCRetryConfig) {
  295. defaultorc := &ODBCRetryConfig{
  296. retry: 0,
  297. interval: time.Second,
  298. contains: "",
  299. }
  300. for _, retrycfg := range retryconfig {
  301. sss := strings.SplitN(retrycfg, ",", 3)
  302. if len(sss) == 3 && renumber.MatchString(sss[0]) && reduration.MatchString(sss[1]) {
  303. retry := cast.ToInt(strings.TrimSpace(sss[0]))
  304. interval := mfmt.ParseDuration(strings.TrimSpace(sss[1]))
  305. contains := strings.TrimSpace(sss[2])
  306. orc := &ODBCRetryConfig{
  307. retry: retry,
  308. interval: interval,
  309. contains: contains,
  310. }
  311. if orc.contains == "" {
  312. defaultorc = orc
  313. } else {
  314. orcs = append(orcs, orc)
  315. }
  316. } else {
  317. panic("odbc.retry config format error")
  318. }
  319. }
  320. orcs = append(orcs, defaultorc)
  321. return
  322. }
  323. func (odbci *ODBCImporter) insertDo(insertmql string, values ...any) (trycount int, responsetime time.Duration, err error) {
  324. for {
  325. st := time.Now()
  326. _, e := odbci.client.Query(insertmql, values...).Do()
  327. if e != nil {
  328. maxtrycount := 0
  329. for _, orc := range odbcretryconfig {
  330. if orc.contains != "" {
  331. if strings.Contains(e.Error(), orc.contains) {
  332. maxtrycount = orc.retry
  333. break
  334. }
  335. } else {
  336. maxtrycount = orc.retry
  337. break
  338. }
  339. }
  340. trycount++
  341. e = merrs.New(e, merrs.Map{"trycount": trycount})
  342. if maxtrycount < 0 || trycount <= maxtrycount {
  343. logger.Debug(merrs.New(e, merrs.Map{"retrycount": trycount}))
  344. time.Sleep(time.Duration(trycount) * time.Second)
  345. continue
  346. }
  347. return trycount, 0, e
  348. }
  349. responsetime = time.Since(st)
  350. return trycount, responsetime, nil
  351. }
  352. }
  353. func (odbci *ODBCImporter) done() {
  354. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  355. cdi.mutex.Lock()
  356. if cdi.lastlogicount != cdi.insertcount {
  357. cdi.lastlogtime = time.Now()
  358. cdi.lastlogicount = cdi.insertcount
  359. logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  360. }
  361. cdi.mutex.Unlock()
  362. return true
  363. })
  364. }
  365. func (odbci *ODBCImporter) alldone() {
  366. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  367. cdi.mutex.Lock()
  368. if cdi.insertcount != 0 {
  369. cdi.lastlogtime = time.Now()
  370. cdi.lastlogicount = cdi.insertcount
  371. logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  372. }
  373. cdi.mutex.Unlock()
  374. return true
  375. })
  376. }