odbcimporter.go 9.8 KB


  1. package importer
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "fmt"
  6. "regexp"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "git.wecise.com/wecise/cgimport/odbc"
  12. "git.wecise.com/wecise/odb-go/odb"
  13. "git.wecise.com/wecise/util/cast"
  14. "git.wecise.com/wecise/util/cmap"
  15. "git.wecise.com/wecise/util/merrs"
  16. )
  17. type classdatainfo struct {
  18. *classinfo
  19. insertcount int64
  20. lastlogtime time.Time
  21. lastlogicount int64
  22. mutex sync.Mutex
  23. }
  24. var classdatainfos = cmap.New[string, *classdatainfo]()
  25. type ODBCImporter struct {
  26. client odb.Client
  27. }
  28. func NewODBCImporter() *ODBCImporter {
  29. odbci := &ODBCImporter{}
  30. if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 {
  31. odbci.client = odbc.ODBC()
  32. }
  33. return odbci
  34. }
  35. // 根据数据修正类定义
  36. func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
  37. for _, classname := range classnames {
  38. ci := classinfos.GetIFPresent(classname)
  39. if ci == nil {
  40. return merrs.NewError("classinfo not found " + classname)
  41. }
  42. _, err = classdatainfos.GetWithNew(classname, func() (cdi *classdatainfo, err error) {
  43. if odbci.client != nil {
  44. _, err = odbci.client.Query("select class,id from " + ci.classname + " limit 1").Do()
  45. if err != nil {
  46. if !strings.Contains(err.Error(), "not find") {
  47. return nil, err
  48. }
  49. logger.Info("create class " + ci.classname)
  50. _, err = odbci.client.Query(ci.createmql).Do()
  51. if err != nil {
  52. return
  53. }
  54. }
  55. }
  56. cdi = &classdatainfo{classinfo: ci}
  57. return
  58. })
  59. }
  60. if odbci.client != nil {
  61. for _, createedgemql := range createedgemqls {
  62. _, e := odbci.client.Query(createedgemql).Do()
  63. if e != nil && !strings.Contains(e.Error(), "already exist") {
  64. err = e
  65. return
  66. }
  67. }
  68. }
  69. return
  70. }
  71. func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) {
  72. extraattr := map[string]string{}
  73. fromuid := ""
  74. touid := ""
  75. edgetype := ""
  76. for k, v := range data {
  77. switch k {
  78. case "FROMUNIQUEID":
  79. fromuid = cast.ToString(v)
  80. case "TOUNIQUEID":
  81. touid = cast.ToString(v)
  82. case "EDGETYPE":
  83. edgetype = cast.ToString(v)
  84. default:
  85. extraattr[k] = cast.ToString(v)
  86. }
  87. }
  88. if fromuid == "" {
  89. databs, _ := json.MarshalIndent(data, "", " ")
  90. return merrs.NewError("not found valid fromuniqueid in data ", merrs.SSMap{"data": string(databs)})
  91. }
  92. if touid == "" {
  93. databs, _ := json.MarshalIndent(data, "", " ")
  94. return merrs.NewError("not found valid touniqueid in data ", merrs.SSMap{"data": string(databs)})
  95. }
  96. if edgetype == "" {
  97. databs, _ := json.MarshalIndent(data, "", " ")
  98. return merrs.NewError("not found valid edgetype in data ", merrs.SSMap{"data": string(databs)})
  99. }
  100. return odbci.insertEdge(edgetype, fromuid, touid, extraattr, data)
  101. }
  102. func (odbci *ODBCImporter) insertEdge(edgetype, fromuid, touid string, extraattr map[string]string, data map[string]any) (err error) {
  103. edgetype = relations[edgetype]
  104. if edgetype == "" {
  105. databs, _ := json.MarshalIndent(data, "", " ")
  106. return merrs.NewError("not found valid edgetype in data ", merrs.SSMap{"data": string(databs)})
  107. }
  108. if odbci.client != nil {
  109. foid := get_object_id_from_cache("master:" + fromuid)
  110. toid := to_object_id("level1", touid)
  111. eabs, _ := json.Marshal(extraattr)
  112. // quadmql := `quad "` + foid + `" ` + edgetype + ` + "` + toid + `" ` + string(eabs)
  113. // _, err = odbci.client.Query(quadmql).Do()
  114. // if err != nil {
  115. // err = merrs.NewError(err, merrs.SSMaps{{"mql": quadmql}})
  116. // logger.Error(err)
  117. // return
  118. // }
  119. updatemql := "update " + "/m3cnet/master" + " set " + " contain=?" + " where id='" + foid + "'"
  120. _, err = odbci.client.Query(updatemql, map[string][]string{
  121. "_all": {toid},
  122. toid: {string(eabs)},
  123. }).Do()
  124. if err != nil {
  125. err = merrs.NewError(err, merrs.SSMaps{{"mql": updatemql}})
  126. return
  127. }
  128. logger.Info("relation immport " + foid + "->" + toid)
  129. }
  130. return
  131. }
  132. var cm_object_id_cache = cmap.New[string, chan string]()
  133. func object_id_cache(suid string) chan string {
  134. choid, _ := cm_object_id_cache.GetWithNew(suid,
  135. func() (chan string, error) {
  136. ch := make(chan string, 2)
  137. return ch, nil
  138. })
  139. return choid
  140. }
  141. func get_object_id_from_cache(suid string) string {
  142. choid := object_id_cache(suid)
  143. oid := <-choid
  144. push_object_id_into_cache(choid, oid)
  145. return oid
  146. }
  147. func push_object_id_into_cache(choid chan string, oid string) {
  148. choid <- oid
  149. if len(choid) == 2 {
  150. // 最多保留 1 个
  151. // chan cap = 2,第三个元素进不来
  152. // 进第二个元素的协程,清除第一个元素,允许其它协程后续进入新元素
  153. <-choid
  154. }
  155. }
  156. func object_id(classaliasname string, data map[string]any) (oid, suid string, err error) {
  157. uid := data["uniqueId"]
  158. if uid == nil {
  159. uid = data["UNIQUEID"]
  160. if uid == nil {
  161. databs, _ := json.MarshalIndent(data, "", " ")
  162. return "", "", merrs.NewError("not found uniqueid in data ", merrs.SSMap{"data": string(databs)})
  163. }
  164. }
  165. suid = cast.ToString(uid)
  166. if suid == "" {
  167. databs, _ := json.MarshalIndent(data, "", " ")
  168. return "", "", merrs.NewError("not found valid uniqueid in data ", merrs.SSMap{"data": string(databs)})
  169. }
  170. return to_object_id(classaliasname, suid), suid, nil
  171. }
  172. func to_object_id(classaliasname string, suid string) string {
  173. suid64 := base64.RawURLEncoding.EncodeToString([]byte(suid))
  174. return classaliasname + ":" + suid64
  175. }
  176. // 插入数据
  177. func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (err error) {
  178. cdi := classdatainfos.GetIFPresent(classname)
  179. if cdi == nil {
  180. return merrs.NewError("class not defined " + classname)
  181. }
  182. if cdi.insertmql == "" {
  183. return merrs.NewError("class no fields to insert " + classname)
  184. }
  185. oid, suid, e := object_id(cdi.aliasname, data)
  186. if e != nil {
  187. return e
  188. }
  189. var contain, depend, topology map[string][]string
  190. refer := data["_references"]
  191. switch vv := refer.(type) {
  192. case []interface{}:
  193. for _, v := range vv {
  194. switch vv := v.(type) {
  195. case map[string]interface{}:
  196. for k, v := range vv {
  197. switch k {
  198. case "_edgeType":
  199. case "_toUniqueId":
  200. suid := cast.ToString(v)
  201. toclassname := "master"
  202. switch classname {
  203. case "level2":
  204. toclassname = "level1"
  205. case "level3":
  206. toclassname = "level2"
  207. case "level4":
  208. toclassname = "level3"
  209. case "level5":
  210. toclassname = "level4"
  211. case "level6":
  212. toclassname = "level5"
  213. case "level7":
  214. toclassname = "level6"
  215. case "level8":
  216. toclassname = "level7"
  217. }
  218. toid := to_object_id(toclassname, suid)
  219. m := map[string]string{"_direction": "out"}
  220. mbs, _ := json.Marshal(m)
  221. depend = map[string][]string{
  222. "_all": {toid},
  223. toid: {string(mbs)},
  224. }
  225. }
  226. }
  227. }
  228. }
  229. }
  230. values := []any{}
  231. for _, fn := range cdi.fieldslist {
  232. if fn == "id" {
  233. values = append(values, oid)
  234. continue
  235. }
  236. if fn == "contain" {
  237. values = append(values, contain)
  238. continue
  239. }
  240. if fn == "depend" {
  241. values = append(values, depend)
  242. continue
  243. }
  244. if fn == "topology" {
  245. values = append(values, topology)
  246. continue
  247. }
  248. fi := cdi.fieldinfos[fn]
  249. if fi.datakey == "" {
  250. td := map[string]any{}
  251. for k, v := range data {
  252. if cdi.datakey_fieldinfos[k] == nil {
  253. td[k] = v
  254. }
  255. }
  256. tdbs, e := json.Marshal(td)
  257. if e != nil {
  258. return merrs.NewError(e)
  259. }
  260. values = append(values, string(tdbs))
  261. continue
  262. }
  263. v := data[fi.datakey]
  264. switch fi.fieldtype {
  265. case "set<varchar>":
  266. v = cast.ToStringSlice(v)
  267. case "timestamp":
  268. tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
  269. if e != nil {
  270. return merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
  271. }
  272. v = tv.Format("2006-01-02 15:04:05.000000")
  273. }
  274. if fn == "tags" {
  275. v = append(cast.ToStringSlice(v), classname)
  276. }
  277. values = append(values, v)
  278. }
  279. if odbci.client != nil {
  280. _, err = odbci.client.Query(cdi.insertmql, values...).Do()
  281. if err != nil {
  282. err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.insertmql}, {"values": fmt.Sprint(values)}})
  283. logger.Error(err)
  284. return
  285. }
  286. push_object_id_into_cache(object_id_cache(classname+":"+suid), oid)
  287. }
  288. atomic.AddInt64(&cdi.insertcount, 1)
  289. cdi.mutex.Lock()
  290. if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
  291. cdi.lastlogtime = time.Now()
  292. cdi.lastlogicount = cdi.insertcount
  293. logger.Info("class", cdi.classname, "import", cdi.insertcount, "records")
  294. }
  295. cdi.mutex.Unlock()
  296. return
  297. }
  298. func (odbci *ODBCImporter) done() {
  299. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  300. cdi.mutex.Lock()
  301. if cdi.lastlogicount != cdi.insertcount {
  302. cdi.lastlogtime = time.Now()
  303. cdi.lastlogicount = cdi.insertcount
  304. logger.Info("class", cdi.classname, "import", cdi.insertcount, "records")
  305. }
  306. cdi.mutex.Unlock()
  307. return true
  308. })
  309. }
  310. func (odbci *ODBCImporter) alldone() {
  311. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  312. cdi.mutex.Lock()
  313. if cdi.insertcount != 0 {
  314. cdi.lastlogtime = time.Now()
  315. cdi.lastlogicount = cdi.insertcount
  316. logger.Info("class", cdi.classname, "import", cdi.insertcount, "records")
  317. }
  318. cdi.mutex.Unlock()
  319. return true
  320. })
  321. }
  322. func (odbci *ODBCImporter) reload() error {
  323. if odbci.client != nil {
  324. for i := len(classnames) - 1; i >= 0; i-- {
  325. classname := classnames[i]
  326. ci := classinfos.GetIFPresent(classname)
  327. if ci == nil {
  328. continue
  329. }
  330. e := odbci.dropclass(ci.classfullname)
  331. if e != nil {
  332. return e
  333. }
  334. }
  335. }
  336. return nil
  337. }
  338. func (odbci *ODBCImporter) dropclass(classname string) error {
  339. for retry := 2; retry >= 0; retry-- {
  340. _, e := odbci.client.Query(`delete from "` + classname + `" with version`).Do()
  341. _ = e
  342. _, e = odbci.client.Query(`drop class if exists "` + classname + `"`).Do()
  343. if e != nil {
  344. matchstr := regexp.MustCompile(`refer by ([^,]+)`).FindStringSubmatch(e.Error())
  345. if len(matchstr) >= 2 {
  346. e = odbci.dropclass(matchstr[1])
  347. if e != nil {
  348. return e
  349. }
  350. }
  351. if retry > 0 {
  352. continue
  353. }
  354. return e
  355. }
  356. }
  357. logger.Info("drop class " + classname)
  358. return nil
  359. }