odbcimporter.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. package importer
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "regexp"
  6. "strings"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "git.wecise.com/wecise/cgimport/graph"
  11. "git.wecise.com/wecise/cgimport/odbc"
  12. "git.wecise.com/wecise/odb-go/odb"
  13. "github.com/dgryski/go-farm"
  14. "github.com/scylladb/go-set/strset"
  15. "github.com/wecisecode/util/cast"
  16. "github.com/wecisecode/util/cmap"
  17. "github.com/wecisecode/util/merrs"
  18. "github.com/wecisecode/util/mfmt"
  19. )
  20. type ODBCImporter struct {
  21. client odb.Client
  22. }
  23. func NewODBCImporter() *ODBCImporter {
  24. odbci := &ODBCImporter{}
  25. if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 {
  26. odbci.client = odbc.ODBC()
  27. }
  28. return odbci
  29. }
  30. var masterlevel1mutex = make([]sync.Mutex, 256)
  31. var masterdatas = cmap.New[string, map[string]any]()
  32. var level1datas = cmap.New[string, map[string]any]()
  33. func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string, data map[string]any) (retrycount int, err error) {
  34. hidx := int(farm.Hash32([]byte(suid)) % uint32(len(masterlevel1mutex)))
  35. masterlevel1mutex[hidx].Lock()
  36. defer masterlevel1mutex[hidx].Unlock()
  37. switch classaliasname {
  38. case "master":
  39. level1data := level1datas.GetIFPresent(suid)
  40. if level1data == nil {
  41. // 先插入 master
  42. masterdatas.Set(suid, data)
  43. // 用 master 数据生成不完整的 level1 数据
  44. level1data = map[string]any{}
  45. for k, v := range data {
  46. if k == "id" {
  47. // master oid -> 重新生成 level1 oid
  48. oid, _, e := graph.GetNodeId("level1", data)
  49. if e != nil {
  50. return retrycount, e
  51. }
  52. v = oid
  53. }
  54. level1data[k] = v
  55. }
  56. } else {
  57. // 后插入 master
  58. level1datas.Remove(suid)
  59. // 用 level1 补齐 master 数据
  60. // data 数据不能变,需要后续插入 master
  61. entiredata := map[string]any{}
  62. for k, v := range data {
  63. entiredata[k] = v
  64. }
  65. for k, v := range level1data {
  66. entiredata[k] = v
  67. }
  68. level1data = entiredata
  69. }
  70. // 重新插入完整的 level1
  71. retrycount, _, e := odbci.insertData("level1", "", "", level1data)
  72. if e != nil {
  73. return retrycount, e
  74. }
  75. case "level1":
  76. masterdata := masterdatas.GetIFPresent(suid)
  77. if masterdata == nil {
  78. // 先插入 level 1
  79. level1datas.Set(suid, data)
  80. } else {
  81. // 后插入 level1
  82. masterdatas.Remove(suid)
  83. // 用 level1 补齐 master 数据
  84. entiredata := map[string]any{}
  85. for k, v := range masterdata {
  86. entiredata[k] = v
  87. }
  88. for k, v := range data {
  89. entiredata[k] = v
  90. }
  91. // 完整 level1 数据
  92. data = entiredata
  93. }
  94. // 插入 level1 数据
  95. retrycount, _, e := odbci.insertData("level1", "", "", data)
  96. if e != nil {
  97. return retrycount, e
  98. }
  99. }
  100. return retrycount, nil
  101. }
  102. // 插入数据
  103. func (odbci *ODBCImporter) InsertData(classaliasname string, data map[string]any) (retrycount int, err error) {
  104. oid, suid, e := graph.GetNodeId(classaliasname, data)
  105. if e != nil {
  106. return 0, e
  107. }
  108. data["id"] = oid
  109. if classaliasname == "master" {
  110. rc, e := odbci.masterlevel1data(classaliasname, suid, data)
  111. retrycount += rc
  112. if e != nil {
  113. return retrycount, e
  114. }
  115. } else if classaliasname == "level1" {
  116. ei := graph.GetEdgeInfo(oid)
  117. if ei != nil {
  118. data["contain"] = ei["contain"]
  119. data["depend"] = ei["depend"]
  120. data["topology"] = ei["topology"]
  121. }
  122. rc, e := odbci.masterlevel1data(classaliasname, suid, data)
  123. retrycount += rc
  124. if e != nil {
  125. return retrycount, e
  126. }
  127. // 数据已经在 masterlevel1data 中插入完成
  128. return
  129. } else {
  130. data["depend"] = referencedata(classaliasname, data)
  131. }
  132. rc, _, e := odbci.insertData(classaliasname, oid, suid, data)
  133. retrycount += rc
  134. if e != nil {
  135. return retrycount, e
  136. }
  137. return retrycount, nil
  138. }
  139. type InnerData struct {
  140. oid string
  141. suid string
  142. contain map[string][]string
  143. depend map[string][]string
  144. topology map[string][]string
  145. }
  146. func referencedata(classaliasname string, data map[string]any) (depend map[string][]string) {
  147. refer := data["_references"]
  148. switch vv := refer.(type) {
  149. case []interface{}:
  150. for _, v := range vv {
  151. switch vv := v.(type) {
  152. case map[string]interface{}:
  153. for k, v := range vv {
  154. switch k {
  155. case "_edgeType":
  156. case "_toUniqueId":
  157. suid := cast.ToString(v)
  158. toclassname := "master"
  159. switch classaliasname {
  160. case "level1":
  161. toclassname = "level1"
  162. case "level2":
  163. toclassname = "level1"
  164. case "level3":
  165. toclassname = "level2"
  166. case "level4":
  167. toclassname = "level3"
  168. case "level5":
  169. toclassname = "level4"
  170. case "level6":
  171. toclassname = "level5"
  172. case "level7":
  173. toclassname = "level6"
  174. case "level8":
  175. toclassname = "level7"
  176. }
  177. toid := graph.ToNodeId(toclassname, suid)
  178. m := map[string]string{"_direction": "out"}
  179. mbs, _ := json.Marshal(m)
  180. depend = map[string][]string{
  181. "_all": {toid},
  182. toid: {string(mbs)},
  183. }
  184. }
  185. }
  186. }
  187. }
  188. }
  189. return
  190. }
  191. func (odbci *ODBCImporter) insertData(classaliasname string, oid, suid string, data map[string]any) (retrycount int, responsetime time.Duration, err error) {
  192. cdi := classdatainfos.GetIFPresent(classaliasname)
  193. if cdi == nil {
  194. return retrycount, 0, merrs.NewError("class not defined " + classaliasname)
  195. }
  196. if cdi.Insertmql == "" {
  197. return retrycount, 0, merrs.NewError("class no fields to insert " + classaliasname)
  198. }
  199. values := []any{}
  200. for _, fn := range cdi.Fieldslist {
  201. fi := cdi.Fieldinfos[fn]
  202. if fi == nil {
  203. values = append(values, data[fn])
  204. continue
  205. }
  206. // 合并扩展字段
  207. if strset.New(fi.Datakey...).Has("*") {
  208. if fi.Fieldtype != "map<varchar,varchar>" {
  209. return retrycount, 0, merrs.NewError("fi.Fieldtype=" + fi.Fieldtype + " != map<varchar,varchar>")
  210. }
  211. td := map[string]any{}
  212. for k, v := range data {
  213. if cdi.DatakeyFieldinfos[k] == nil {
  214. td[k] = cast.ToString(v)
  215. }
  216. }
  217. values = append(values, td)
  218. continue
  219. }
  220. // 字段类型修正
  221. var v any
  222. for _, dk := range fi.Datakey {
  223. v = data[dk]
  224. if v != nil {
  225. switch fi.Fieldtype {
  226. case "set<varchar>":
  227. v = cast.ToStringSlice(v)
  228. case "timestamp":
  229. tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
  230. if e != nil {
  231. return retrycount, 0, merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
  232. }
  233. v = tv.Format("2006-01-02 15:04:05.000000")
  234. }
  235. break
  236. }
  237. }
  238. if fn == "tags" {
  239. v = append(cast.ToStringSlice(v), classaliasname)
  240. }
  241. values = append(values, v)
  242. }
  243. if odbci.client != nil {
  244. if odbc.LogDebug && oid != "" {
  245. mql := "select id,uniqueid from " + classaliasname + " where id=?"
  246. r, e := odbci.client.Query(mql, oid).Do()
  247. if e != nil {
  248. return retrycount, 0, e
  249. }
  250. if r != nil && len(r.Data) != 0 {
  251. logger.Debug(classaliasname, "exists id:", oid, ", uniqueid:", r.Data[0]["uniqueid"], ", new uniqueid:", suid)
  252. }
  253. }
  254. // logger.Info(values...)
  255. retrycount, responsetime, err = odbci.insertDo(cdi.Insertmql, values...)
  256. if err != nil {
  257. databs, _ := json.MarshalIndent(data, "", " ")
  258. err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.Insertmql}, {"values": fmt.Sprint(values)}, {"data": string(databs)}})
  259. logger.Error(err)
  260. return
  261. }
  262. }
  263. atomic.AddInt64(&cdi.insertcount, 1)
  264. cdi.mutex.Lock()
  265. if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
  266. cdi.lastlogtime = time.Now()
  267. cdi.lastlogicount = cdi.insertcount
  268. logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  269. }
  270. cdi.mutex.Unlock()
  271. return
  272. }
  273. type ODBCRetryConfig struct {
  274. retry int
  275. interval time.Duration
  276. contains string
  277. }
  278. var renumber = regexp.MustCompile(`^\s*(-?[0-9]+)\s*$`)
  279. var reduration = regexp.MustCompile(`^\s*(-?[0-9]+)[smhd]\s*$`)
  280. var odbcretry = ""
  281. var odbcretryconfig []*ODBCRetryConfig
  282. func init() {
  283. mcfg.OnChange(func() {
  284. _odbcretry := mcfg.GetStrings("odbc.retry", "-1, 1s, timed out", "-1, 1s, proc timeout")
  285. if strings.Join(_odbcretry, "|") != odbcretry {
  286. odbcretryconfig = RetryConfig(_odbcretry...)
  287. odbcretry = strings.Join(_odbcretry, "|")
  288. }
  289. })
  290. }
  291. func RetryConfig(retryconfig ...string) (orcs []*ODBCRetryConfig) {
  292. defaultorc := &ODBCRetryConfig{
  293. retry: 0,
  294. interval: time.Second,
  295. contains: "",
  296. }
  297. for _, retrycfg := range retryconfig {
  298. sss := strings.SplitN(retrycfg, ",", 3)
  299. if len(sss) == 3 && renumber.MatchString(sss[0]) && reduration.MatchString(sss[1]) {
  300. retry := cast.ToInt(strings.TrimSpace(sss[0]))
  301. interval := mfmt.ParseDuration(strings.TrimSpace(sss[1]))
  302. contains := strings.TrimSpace(sss[2])
  303. orc := &ODBCRetryConfig{
  304. retry: retry,
  305. interval: interval,
  306. contains: contains,
  307. }
  308. if orc.contains == "" {
  309. defaultorc = orc
  310. } else {
  311. orcs = append(orcs, orc)
  312. }
  313. } else {
  314. panic("odbc.retry config format error")
  315. }
  316. }
  317. orcs = append(orcs, defaultorc)
  318. return
  319. }
  320. func (odbci *ODBCImporter) insertDo(insertmql string, values ...any) (trycount int, responsetime time.Duration, err error) {
  321. for {
  322. st := time.Now()
  323. _, e := odbci.client.Query(insertmql, values...).Do()
  324. if e != nil {
  325. maxtrycount := 0
  326. for _, orc := range odbcretryconfig {
  327. if orc.contains != "" {
  328. if strings.Contains(e.Error(), orc.contains) {
  329. maxtrycount = orc.retry
  330. break
  331. }
  332. } else {
  333. maxtrycount = orc.retry
  334. break
  335. }
  336. }
  337. trycount++
  338. e = merrs.New(e, merrs.Map{"trycount": trycount})
  339. if maxtrycount < 0 || trycount <= maxtrycount {
  340. logger.Debug(merrs.New(e, merrs.Map{"retrycount": trycount}))
  341. time.Sleep(time.Duration(trycount) * time.Second)
  342. continue
  343. }
  344. return trycount, 0, e
  345. }
  346. responsetime = time.Since(st)
  347. return trycount, responsetime, nil
  348. }
  349. }
  350. func (odbci *ODBCImporter) done() {
  351. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  352. cdi.mutex.Lock()
  353. if cdi.lastlogicount != cdi.insertcount {
  354. cdi.lastlogtime = time.Now()
  355. cdi.lastlogicount = cdi.insertcount
  356. logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  357. }
  358. cdi.mutex.Unlock()
  359. return true
  360. })
  361. }
  362. func (odbci *ODBCImporter) alldone() {
  363. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  364. cdi.mutex.Lock()
  365. if cdi.insertcount != 0 {
  366. cdi.lastlogtime = time.Now()
  367. cdi.lastlogicount = cdi.insertcount
  368. logger.Debug("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  369. }
  370. cdi.mutex.Unlock()
  371. return true
  372. })
  373. }