odbcimporter.go 9.7 KB


  1. package importer
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "regexp"
  6. "strings"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "git.wecise.com/wecise/cgimport/graph"
  11. "git.wecise.com/wecise/cgimport/odbc"
  12. "git.wecise.com/wecise/cgimport/schema"
  13. "git.wecise.com/wecise/odb-go/odb"
  14. "git.wecise.com/wecise/util/cast"
  15. "git.wecise.com/wecise/util/cmap"
  16. "git.wecise.com/wecise/util/merrs"
  17. "github.com/scylladb/go-set/strset"
  18. )
  19. type classdatainfo struct {
  20. *schema.ClassInfo
  21. insertcount int64
  22. lastlogtime time.Time
  23. lastlogicount int64
  24. mutex sync.Mutex
  25. }
  26. var classdatainfos = cmap.NewSingle[string, *classdatainfo]()
  27. type ODBCImporter struct {
  28. client odb.Client
  29. }
  30. func NewODBCImporter() *ODBCImporter {
  31. odbci := &ODBCImporter{}
  32. if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 {
  33. odbci.client = odbc.ODBC()
  34. }
  35. return odbci
  36. }
  37. // 根据数据修正类定义
  38. func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
  39. for _, classname := range schema.ClassNames {
  40. ci := schema.ClassInfos.GetIFPresent(classname)
  41. if ci == nil {
  42. return merrs.NewError("classinfo not found " + classname)
  43. }
  44. cdi, e := classdatainfos.GetWithNew(ci.Classaliasname, func() (cdi *classdatainfo, err error) {
  45. if odbci.client != nil {
  46. _, e := odbci.client.Query("select class,id from " + ci.Classfullname + " limit 1").Do()
  47. if e != nil {
  48. if !strings.Contains(e.Error(), "not find") {
  49. return nil, e
  50. }
  51. logger.Info("create class " + ci.Classfullname)
  52. _, e = odbci.client.Query(ci.Createmql).Do()
  53. if e != nil {
  54. return nil, e
  55. }
  56. }
  57. }
  58. cdi = &classdatainfo{ClassInfo: ci}
  59. return
  60. })
  61. if e != nil {
  62. return e
  63. }
  64. classdatainfos.Set(ci.Classfullname, cdi)
  65. }
  66. if odbci.client != nil {
  67. for _, createedgemql := range schema.CreateEdgeMqls {
  68. _, e := odbci.client.Query(createedgemql).Do()
  69. if e != nil && !strings.Contains(e.Error(), "already exist") {
  70. return e
  71. }
  72. }
  73. }
  74. return
  75. }
  76. // func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) {
  77. // ei, e := graph.ParseEdgeInfo(data)
  78. // if e != nil {
  79. // return e
  80. // }
  81. // if odbci.client != nil {
  82. // // foid := get_object_id_from_cache("level1", fromuid)
  83. // // toid := to_object_id("level1", touid)
  84. // // eabs, _ := json.Marshal(extraattr)
  85. // // quadmql := `quad "` + foid + `" ` + edgetype + ` + "` + toid + `" ` + string(eabs)
  86. // // _, err = odbci.client.Query(quadmql).Do()
  87. // // if err != nil {
  88. // // err = merrs.NewError(err, merrs.SSMaps{{"mql": quadmql}})
  89. // // logger.Error(err)
  90. // // return
  91. // // }
  92. // updatemql := "update " + "level1" + " set " + " contain=contain+?" + " where id='" + foid + "'"
  93. // _, err = odbci.client.Query(updatemql, map[string][]string{
  94. // "_all": {toid},
  95. // toid: {string(eabs)},
  96. // }).Do()
  97. // if err != nil {
  98. // err = merrs.NewError(err, merrs.SSMaps{{"mql": updatemql}})
  99. // return
  100. // }
  101. // logger.Info("relation immport " + foid + "->" + toid)
  102. // }
  103. // return
  104. // }
  105. var cm_object_id_cache = cmap.New[string, chan string]()
  106. func object_id_cache(classaliasname, suid string) chan string {
  107. choid, _ := cm_object_id_cache.GetWithNew(classaliasname+":"+suid,
  108. func() (chan string, error) {
  109. ch := make(chan string, 2)
  110. return ch, nil
  111. })
  112. return choid
  113. }
  114. func get_object_id_from_cache(classaliasname, suid string) string {
  115. choid := object_id_cache(classaliasname, suid)
  116. oid := <-choid
  117. push_object_id_into_cache(choid, oid)
  118. return oid
  119. }
  120. func push_object_id_into_cache(choid chan string, oid string) {
  121. choid <- oid
  122. if len(choid) == 2 {
  123. // 最多保留 1 个
  124. // chan cap = 2,第三个元素进不来
  125. // 进第二个元素的协程,清除第一个元素,允许其它协程后续进入新元素
  126. <-choid
  127. }
  128. }
  129. // 插入数据
  130. func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (err error) {
  131. cdi := classdatainfos.GetIFPresent(classname)
  132. if cdi == nil {
  133. return merrs.NewError("class not defined " + classname)
  134. }
  135. if cdi.Insertmql == "" {
  136. return merrs.NewError("class no fields to insert " + classname)
  137. }
  138. innerdata := &InnerData{}
  139. innerdata.oid, innerdata.suid, err = graph.GetNodeId(cdi.Classaliasname, data)
  140. if err != nil {
  141. return
  142. }
  143. if cdi.Classaliasname == "level1" {
  144. ei := graph.GetEdgeInfo(innerdata.oid)
  145. if ei != nil {
  146. innerdata.contain = ei["contain"]
  147. innerdata.depend = ei["depend"]
  148. innerdata.topology = ei["topology"]
  149. }
  150. } else {
  151. innerdata.depend = referencedata(classname, data)
  152. }
  153. return odbci.insertData(classname, cdi, innerdata, data)
  154. }
  155. type InnerData struct {
  156. oid string
  157. suid string
  158. contain map[string][]string
  159. depend map[string][]string
  160. topology map[string][]string
  161. }
  162. func referencedata(classname string, data map[string]any) (depend map[string][]string) {
  163. refer := data["_references"]
  164. switch vv := refer.(type) {
  165. case []interface{}:
  166. for _, v := range vv {
  167. switch vv := v.(type) {
  168. case map[string]interface{}:
  169. for k, v := range vv {
  170. switch k {
  171. case "_edgeType":
  172. case "_toUniqueId":
  173. suid := cast.ToString(v)
  174. toclassname := "master"
  175. switch classname {
  176. case "level1":
  177. toclassname = "level1"
  178. case "level2":
  179. toclassname = "level1"
  180. case "level3":
  181. toclassname = "level2"
  182. case "level4":
  183. toclassname = "level3"
  184. case "level5":
  185. toclassname = "level4"
  186. case "level6":
  187. toclassname = "level5"
  188. case "level7":
  189. toclassname = "level6"
  190. case "level8":
  191. toclassname = "level7"
  192. }
  193. toid := graph.ToNodeId(toclassname, suid)
  194. m := map[string]string{"_direction": "out"}
  195. mbs, _ := json.Marshal(m)
  196. depend = map[string][]string{
  197. "_all": {toid},
  198. toid: {string(mbs)},
  199. }
  200. }
  201. }
  202. }
  203. }
  204. }
  205. return
  206. }
  207. func (odbci *ODBCImporter) insertData(classname string, cdi *classdatainfo, innerdata *InnerData, data map[string]any) (err error) {
  208. values := []any{}
  209. for _, fn := range cdi.Fieldslist {
  210. // 内部字段
  211. switch fn {
  212. case "id":
  213. values = append(values, innerdata.oid)
  214. continue
  215. case "contain":
  216. values = append(values, innerdata.contain)
  217. continue
  218. case "depend":
  219. values = append(values, innerdata.depend)
  220. continue
  221. case "topology":
  222. values = append(values, innerdata.topology)
  223. continue
  224. }
  225. fi := cdi.Fieldinfos[fn]
  226. // 合并扩展字段
  227. if strset.New(fi.Datakey...).Has("*") {
  228. td := map[string]any{}
  229. for k, v := range data {
  230. if cdi.DatakeyFieldinfos[k] == nil {
  231. td[k] = v
  232. }
  233. }
  234. tdbs, e := json.Marshal(td)
  235. if e != nil {
  236. return merrs.NewError(e)
  237. }
  238. values = append(values, string(tdbs))
  239. continue
  240. }
  241. // 字段类型修正
  242. var v any
  243. for _, dk := range fi.Datakey {
  244. v = data[dk]
  245. if v != nil {
  246. switch fi.Fieldtype {
  247. case "set<varchar>":
  248. v = cast.ToStringSlice(v)
  249. case "timestamp":
  250. tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
  251. if e != nil {
  252. return merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
  253. }
  254. v = tv.Format("2006-01-02 15:04:05.000000")
  255. }
  256. break
  257. }
  258. }
  259. if fn == "tags" {
  260. v = append(cast.ToStringSlice(v), classname)
  261. }
  262. values = append(values, v)
  263. }
  264. if odbci.client != nil {
  265. _, err = odbci.client.Query(cdi.Insertmql, values...).Do()
  266. if err != nil {
  267. databs, _ := json.MarshalIndent(data, "", " ")
  268. err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.Insertmql}, {"values": fmt.Sprint(values)}, {"data": string(databs)}})
  269. logger.Error(err)
  270. return
  271. }
  272. push_object_id_into_cache(object_id_cache(cdi.Classaliasname, innerdata.suid), innerdata.oid)
  273. }
  274. atomic.AddInt64(&cdi.insertcount, 1)
  275. cdi.mutex.Lock()
  276. if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
  277. cdi.lastlogtime = time.Now()
  278. cdi.lastlogicount = cdi.insertcount
  279. logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  280. }
  281. cdi.mutex.Unlock()
  282. return
  283. }
  284. func (odbci *ODBCImporter) done() {
  285. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  286. cdi.mutex.Lock()
  287. if cdi.lastlogicount != cdi.insertcount {
  288. cdi.lastlogtime = time.Now()
  289. cdi.lastlogicount = cdi.insertcount
  290. logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  291. }
  292. cdi.mutex.Unlock()
  293. return true
  294. })
  295. }
  296. func (odbci *ODBCImporter) alldone() {
  297. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  298. cdi.mutex.Lock()
  299. if cdi.insertcount != 0 {
  300. cdi.lastlogtime = time.Now()
  301. cdi.lastlogicount = cdi.insertcount
  302. logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  303. }
  304. cdi.mutex.Unlock()
  305. return true
  306. })
  307. }
  308. func (odbci *ODBCImporter) reload() error {
  309. if odbci.client != nil {
  310. for i := len(schema.ClassNames) - 1; i >= 0; i-- {
  311. classname := schema.ClassNames[i]
  312. ci := schema.ClassInfos.GetIFPresent(classname)
  313. if ci == nil {
  314. continue
  315. }
  316. e := odbci.dropclass(ci.Classfullname)
  317. if e != nil {
  318. return e
  319. }
  320. }
  321. }
  322. return nil
  323. }
  324. func (odbci *ODBCImporter) dropclass(classnames ...string) error {
  325. for _, classname := range classnames {
  326. for retry := 2; retry >= 0; retry-- {
  327. _, e := odbci.client.Query(`delete from "` + classname + `" with version`).Do()
  328. _ = e
  329. _, e = odbci.client.Query(`drop class if exists "` + classname + `"`).Do()
  330. if e != nil {
  331. matchstr := regexp.MustCompile(`refer by ([^,]+)`).FindStringSubmatch(e.Error())
  332. if len(matchstr) >= 2 {
  333. e = odbci.dropclass(matchstr[1])
  334. if e != nil {
  335. return e
  336. }
  337. } else {
  338. matchstr := regexp.MustCompile(`has children \[([^\]]+)\]`).FindStringSubmatch(e.Error())
  339. if len(matchstr) >= 2 {
  340. e = odbci.dropclass(strings.Split(matchstr[1], ",")...)
  341. if e != nil {
  342. return e
  343. }
  344. }
  345. }
  346. if retry > 0 {
  347. continue
  348. }
  349. return e
  350. }
  351. }
  352. logger.Info("drop class " + classname)
  353. }
  354. return nil
  355. }