odbcimporter.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. package importer
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "fmt"
  6. "regexp"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "git.wecise.com/wecise/cgimport/odbc"
  12. "git.wecise.com/wecise/cgimport/schema"
  13. "git.wecise.com/wecise/odb-go/odb"
  14. "git.wecise.com/wecise/util/cast"
  15. "git.wecise.com/wecise/util/cmap"
  16. "git.wecise.com/wecise/util/merrs"
  17. "github.com/scylladb/go-set/strset"
  18. )
  19. type classdatainfo struct {
  20. *schema.ClassInfo
  21. insertcount int64
  22. lastlogtime time.Time
  23. lastlogicount int64
  24. mutex sync.Mutex
  25. }
  26. var classdatainfos = cmap.NewSingle[string, *classdatainfo]()
  27. type ODBCImporter struct {
  28. client odb.Client
  29. }
  30. func NewODBCImporter() *ODBCImporter {
  31. odbci := &ODBCImporter{}
  32. if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 {
  33. odbci.client = odbc.ODBC()
  34. }
  35. return odbci
  36. }
  37. // 根据数据修正类定义
  38. func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
  39. for _, classname := range schema.ClassNames {
  40. ci := schema.ClassInfos.GetIFPresent(classname)
  41. if ci == nil {
  42. return merrs.NewError("classinfo not found " + classname)
  43. }
  44. cdi, e := classdatainfos.GetWithNew(ci.Classaliasname, func() (cdi *classdatainfo, err error) {
  45. if odbci.client != nil {
  46. _, e := odbci.client.Query("select class,id from " + ci.Classfullname + " limit 1").Do()
  47. if e != nil {
  48. if !strings.Contains(e.Error(), "not find") {
  49. return nil, e
  50. }
  51. logger.Info("create class " + ci.Classfullname)
  52. _, e = odbci.client.Query(ci.Createmql).Do()
  53. if e != nil {
  54. return nil, e
  55. }
  56. }
  57. }
  58. cdi = &classdatainfo{ClassInfo: ci}
  59. return
  60. })
  61. if e != nil {
  62. return e
  63. }
  64. classdatainfos.Set(ci.Classfullname, cdi)
  65. }
  66. if odbci.client != nil {
  67. for _, createedgemql := range schema.CreateEdgeMqls {
  68. _, e := odbci.client.Query(createedgemql).Do()
  69. if e != nil && !strings.Contains(e.Error(), "already exist") {
  70. return e
  71. }
  72. }
  73. }
  74. return
  75. }
  76. func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) {
  77. extraattr := map[string]string{}
  78. fromuid := ""
  79. touid := ""
  80. edgetype := ""
  81. for k, v := range data {
  82. switch k {
  83. case "FROMUNIQUEID":
  84. fromuid = cast.ToString(v)
  85. case "TOUNIQUEID":
  86. touid = cast.ToString(v)
  87. case "EDGETYPE":
  88. edgetype = cast.ToString(v)
  89. default:
  90. extraattr[k] = cast.ToString(v)
  91. }
  92. }
  93. if fromuid == "" {
  94. databs, _ := json.MarshalIndent(data, "", " ")
  95. return merrs.NewError("not found valid fromuniqueid in data ", merrs.SSMap{"data": string(databs)})
  96. }
  97. if touid == "" {
  98. databs, _ := json.MarshalIndent(data, "", " ")
  99. return merrs.NewError("not found valid touniqueid in data ", merrs.SSMap{"data": string(databs)})
  100. }
  101. if edgetype == "" {
  102. databs, _ := json.MarshalIndent(data, "", " ")
  103. return merrs.NewError("not found valid edgetype in data ", merrs.SSMap{"data": string(databs)})
  104. }
  105. return odbci.insertEdge(edgetype, fromuid, touid, extraattr, data)
  106. }
  107. func (odbci *ODBCImporter) insertEdge(edgetype, fromuid, touid string, extraattr map[string]string, data map[string]any) (err error) {
  108. edgetype = schema.Relations[edgetype]
  109. if edgetype == "" {
  110. databs, _ := json.MarshalIndent(data, "", " ")
  111. return merrs.NewError("not found valid edgetype in data ", merrs.SSMap{"data": string(databs)})
  112. }
  113. if odbci.client != nil {
  114. foid := get_object_id_from_cache("level1", fromuid)
  115. toid := to_object_id("level1", touid)
  116. eabs, _ := json.Marshal(extraattr)
  117. // quadmql := `quad "` + foid + `" ` + edgetype + ` + "` + toid + `" ` + string(eabs)
  118. // _, err = odbci.client.Query(quadmql).Do()
  119. // if err != nil {
  120. // err = merrs.NewError(err, merrs.SSMaps{{"mql": quadmql}})
  121. // logger.Error(err)
  122. // return
  123. // }
  124. updatemql := "update " + "level1" + " set " + " contain=?" + " where id='" + foid + "'"
  125. _, err = odbci.client.Query(updatemql, map[string][]string{
  126. "_all": {toid},
  127. toid: {string(eabs)},
  128. }).Do()
  129. if err != nil {
  130. err = merrs.NewError(err, merrs.SSMaps{{"mql": updatemql}})
  131. return
  132. }
  133. logger.Info("relation immport " + foid + "->" + toid)
  134. }
  135. return
  136. }
  137. var cm_object_id_cache = cmap.New[string, chan string]()
  138. func object_id_cache(classaliasname, suid string) chan string {
  139. choid, _ := cm_object_id_cache.GetWithNew(classaliasname+":"+suid,
  140. func() (chan string, error) {
  141. ch := make(chan string, 2)
  142. return ch, nil
  143. })
  144. return choid
  145. }
  146. func get_object_id_from_cache(classaliasname, suid string) string {
  147. choid := object_id_cache(classaliasname, suid)
  148. oid := <-choid
  149. push_object_id_into_cache(choid, oid)
  150. return oid
  151. }
  // push_object_id_into_cache stores oid in the cache channel choid, replacing
  // a previously stored id so that normally a single (the newest) id is kept.
  //
  // The former implementation sent first and then drained when len(choid) == 2.
  // Two goroutines pushing at the same time could both observe a length of 2
  // and both drain, leaving the channel empty and blocking every later reader
  // forever. This version discards at most one old id and then sends; every
  // call therefore ends with a successful send, so the channel is never left
  // empty once all concurrent pushes have returned. Under contention the
  // channel may briefly hold more than one id (up to its capacity).
  func push_object_id_into_cache(choid chan string, oid string) {
  	if cap(choid) == 0 {
  		// Unbuffered channel: nothing can be retained, hand over directly.
  		choid <- oid
  		return
  	}
  	for {
  		// Drop one previously cached id, if any, to make room.
  		select {
  		case <-choid:
  		default:
  		}
  		// Store the new id; if concurrent pushers filled the channel in the
  		// meantime, go around again instead of blocking.
  		select {
  		case choid <- oid:
  			return
  		default:
  		}
  	}
  }
  161. func object_id(classaliasname string, data map[string]any) (oid, suid string, err error) {
  162. uid := data["uniqueId"]
  163. if uid == nil {
  164. uid = data["UNIQUEID"]
  165. if uid == nil {
  166. databs, _ := json.MarshalIndent(data, "", " ")
  167. return "", "", merrs.NewError("not found uniqueid in data ", merrs.SSMap{"data": string(databs)})
  168. }
  169. }
  170. suid = cast.ToString(uid)
  171. if suid == "" {
  172. databs, _ := json.MarshalIndent(data, "", " ")
  173. return "", "", merrs.NewError("not found valid uniqueid in data ", merrs.SSMap{"data": string(databs)})
  174. }
  175. return to_object_id(classaliasname, suid), suid, nil
  176. }
  // to_object_id builds an object id of the form "<class alias>:<encoded uid>",
  // where the uid is encoded with unpadded URL-safe base64.
  func to_object_id(classaliasname string, suid string) string {
  	return classaliasname + ":" + base64.RawURLEncoding.EncodeToString([]byte(suid))
  }
  181. // 插入数据
  182. func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (err error) {
  183. cdi := classdatainfos.GetIFPresent(classname)
  184. if cdi == nil {
  185. return merrs.NewError("class not defined " + classname)
  186. }
  187. if cdi.Insertmql == "" {
  188. return merrs.NewError("class no fields to insert " + classname)
  189. }
  190. innerdata := &InnerData{}
  191. innerdata.oid, innerdata.suid, err = object_id(cdi.Classaliasname, data)
  192. if err != nil {
  193. return
  194. }
  195. innerdata.depend = referencedata(classname, data)
  196. return odbci.insertData(classname, cdi, innerdata, data)
  197. }
  // InnerData carries the importer-computed values of a record, i.e. the
  // values insertData supplies for the internal fields "id", "contain",
  // "depend" and "topology".
  type InnerData struct {
  	// oid is the object id, suid the string form of the record's unique id.
  	oid, suid string
  	// Values for the internal fields of the same names. In this file only
  	// depend is populated (by InsertData).
  	contain, depend, topology map[string][]string
  }
  205. func referencedata(classname string, data map[string]any) (depend map[string][]string) {
  206. refer := data["_references"]
  207. switch vv := refer.(type) {
  208. case []interface{}:
  209. for _, v := range vv {
  210. switch vv := v.(type) {
  211. case map[string]interface{}:
  212. for k, v := range vv {
  213. switch k {
  214. case "_edgeType":
  215. case "_toUniqueId":
  216. suid := cast.ToString(v)
  217. toclassname := "master"
  218. switch classname {
  219. case "level1":
  220. toclassname = "level1"
  221. case "level2":
  222. toclassname = "level1"
  223. case "level3":
  224. toclassname = "level2"
  225. case "level4":
  226. toclassname = "level3"
  227. case "level5":
  228. toclassname = "level4"
  229. case "level6":
  230. toclassname = "level5"
  231. case "level7":
  232. toclassname = "level6"
  233. case "level8":
  234. toclassname = "level7"
  235. }
  236. toid := to_object_id(toclassname, suid)
  237. m := map[string]string{"_direction": "out"}
  238. mbs, _ := json.Marshal(m)
  239. depend = map[string][]string{
  240. "_all": {toid},
  241. toid: {string(mbs)},
  242. }
  243. }
  244. }
  245. }
  246. }
  247. }
  248. return
  249. }
  250. func (odbci *ODBCImporter) insertData(classname string, cdi *classdatainfo, innerdata *InnerData, data map[string]any) (err error) {
  251. values := []any{}
  252. for _, fn := range cdi.Fieldslist {
  253. // 内部字段
  254. switch fn {
  255. case "id":
  256. values = append(values, innerdata.oid)
  257. continue
  258. case "contain":
  259. values = append(values, innerdata.contain)
  260. continue
  261. case "depend":
  262. values = append(values, innerdata.depend)
  263. continue
  264. case "topology":
  265. values = append(values, innerdata.topology)
  266. continue
  267. }
  268. fi := cdi.Fieldinfos[fn]
  269. // 合并扩展字段
  270. if strset.New(fi.Datakey...).Has("*") {
  271. td := map[string]any{}
  272. for k, v := range data {
  273. if cdi.DatakeyFieldinfos[k] == nil {
  274. td[k] = v
  275. }
  276. }
  277. tdbs, e := json.Marshal(td)
  278. if e != nil {
  279. return merrs.NewError(e)
  280. }
  281. values = append(values, string(tdbs))
  282. continue
  283. }
  284. // 字段类型修正
  285. var v any
  286. for _, dk := range fi.Datakey {
  287. v = data[dk]
  288. if v != nil {
  289. switch fi.Fieldtype {
  290. case "set<varchar>":
  291. v = cast.ToStringSlice(v)
  292. case "timestamp":
  293. tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
  294. if e != nil {
  295. return merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
  296. }
  297. v = tv.Format("2006-01-02 15:04:05.000000")
  298. }
  299. break
  300. }
  301. }
  302. if fn == "tags" {
  303. v = append(cast.ToStringSlice(v), classname)
  304. }
  305. values = append(values, v)
  306. }
  307. if odbci.client != nil {
  308. _, err = odbci.client.Query(cdi.Insertmql, values...).Do()
  309. if err != nil {
  310. databs, _ := json.MarshalIndent(data, "", " ")
  311. err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.Insertmql}, {"values": fmt.Sprint(values)}, {"data": string(databs)}})
  312. logger.Error(err)
  313. return
  314. }
  315. push_object_id_into_cache(object_id_cache(cdi.Classaliasname, innerdata.suid), innerdata.oid)
  316. }
  317. atomic.AddInt64(&cdi.insertcount, 1)
  318. cdi.mutex.Lock()
  319. if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
  320. cdi.lastlogtime = time.Now()
  321. cdi.lastlogicount = cdi.insertcount
  322. logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  323. }
  324. cdi.mutex.Unlock()
  325. return
  326. }
  327. func (odbci *ODBCImporter) done() {
  328. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  329. cdi.mutex.Lock()
  330. if cdi.lastlogicount != cdi.insertcount {
  331. cdi.lastlogtime = time.Now()
  332. cdi.lastlogicount = cdi.insertcount
  333. logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  334. }
  335. cdi.mutex.Unlock()
  336. return true
  337. })
  338. }
  339. func (odbci *ODBCImporter) alldone() {
  340. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  341. cdi.mutex.Lock()
  342. if cdi.insertcount != 0 {
  343. cdi.lastlogtime = time.Now()
  344. cdi.lastlogicount = cdi.insertcount
  345. logger.Info("class", cdi.Classfullname, "import", cdi.insertcount, "records")
  346. }
  347. cdi.mutex.Unlock()
  348. return true
  349. })
  350. }
  351. func (odbci *ODBCImporter) reload() error {
  352. if odbci.client != nil {
  353. for i := len(schema.ClassNames) - 1; i >= 0; i-- {
  354. classname := schema.ClassNames[i]
  355. ci := schema.ClassInfos.GetIFPresent(classname)
  356. if ci == nil {
  357. continue
  358. }
  359. e := odbci.dropclass(ci.Classfullname)
  360. if e != nil {
  361. return e
  362. }
  363. }
  364. }
  365. return nil
  366. }
  367. func (odbci *ODBCImporter) dropclass(classnames ...string) error {
  368. for _, classname := range classnames {
  369. for retry := 2; retry >= 0; retry-- {
  370. _, e := odbci.client.Query(`delete from "` + classname + `" with version`).Do()
  371. _ = e
  372. _, e = odbci.client.Query(`drop class if exists "` + classname + `"`).Do()
  373. if e != nil {
  374. matchstr := regexp.MustCompile(`refer by ([^,]+)`).FindStringSubmatch(e.Error())
  375. if len(matchstr) >= 2 {
  376. e = odbci.dropclass(matchstr[1])
  377. if e != nil {
  378. return e
  379. }
  380. } else {
  381. matchstr := regexp.MustCompile(`has children \[([^\]]+)\]`).FindStringSubmatch(e.Error())
  382. if len(matchstr) >= 2 {
  383. e = odbci.dropclass(strings.Split(matchstr[1], ",")...)
  384. if e != nil {
  385. return e
  386. }
  387. }
  388. }
  389. if retry > 0 {
  390. continue
  391. }
  392. return e
  393. }
  394. }
  395. logger.Info("drop class " + classname)
  396. }
  397. return nil
  398. }