odbcimporter.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. package importer
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "fmt"
  6. "regexp"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "git.wecise.com/wecise/cgimport/odbc"
  12. "git.wecise.com/wecise/odb-go/odb"
  13. "git.wecise.com/wecise/util/cast"
  14. "git.wecise.com/wecise/util/cmap"
  15. "git.wecise.com/wecise/util/merrs"
  16. )
  17. type classdatainfo struct {
  18. *classinfo
  19. insertcount int64
  20. lastlogtime time.Time
  21. lastlogicount int64
  22. mutex sync.Mutex
  23. }
  24. var classdatainfos = cmap.New[string, *classdatainfo]()
  25. type ODBCImporter struct {
  26. client odb.Client
  27. }
  28. func NewODBCImporter() *ODBCImporter {
  29. odbci := &ODBCImporter{}
  30. if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 {
  31. odbci.client = odbc.ODBC()
  32. }
  33. return odbci
  34. }
  35. // 根据数据修正类定义
  36. func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
  37. for _, classname := range classnames {
  38. ci := classinfos.GetIFPresent(classname)
  39. if ci == nil {
  40. return merrs.NewError("classinfo not found " + classname)
  41. }
  42. _, err = classdatainfos.GetWithNew(classname, func() (cdi *classdatainfo, err error) {
  43. logger.Info("create class " + ci.classname)
  44. if odbci.client != nil {
  45. _, err = odbci.client.Query(ci.createmql).Do()
  46. if err != nil {
  47. return
  48. }
  49. }
  50. cdi = &classdatainfo{classinfo: ci}
  51. return
  52. })
  53. }
  54. if odbci.client != nil {
  55. for _, createedgemql := range createedgemqls {
  56. _, e := odbci.client.Query(createedgemql).Do()
  57. if e != nil && !strings.Contains(e.Error(), "already exist") {
  58. err = e
  59. return
  60. }
  61. }
  62. }
  63. return
  64. }
  65. func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) {
  66. extraattr := map[string]string{}
  67. fromuid := ""
  68. touid := ""
  69. edgetype := ""
  70. for k, v := range data {
  71. switch k {
  72. case "FROMUNIQUEID":
  73. fromuid = cast.ToString(v)
  74. case "TOUNIQUEID":
  75. touid = cast.ToString(v)
  76. case "EDGETYPE":
  77. edgetype = cast.ToString(v)
  78. default:
  79. extraattr[k] = cast.ToString(v)
  80. }
  81. }
  82. if fromuid == "" {
  83. databs, _ := json.MarshalIndent(data, "", " ")
  84. return merrs.NewError("not found valid fromuniqueid in data ", merrs.SSMap{"data": string(databs)})
  85. }
  86. if touid == "" {
  87. databs, _ := json.MarshalIndent(data, "", " ")
  88. return merrs.NewError("not found valid touniqueid in data ", merrs.SSMap{"data": string(databs)})
  89. }
  90. if edgetype == "" {
  91. databs, _ := json.MarshalIndent(data, "", " ")
  92. return merrs.NewError("not found valid edgetype in data ", merrs.SSMap{"data": string(databs)})
  93. }
  94. return odbci.insertEdge(edgetype, fromuid, touid, extraattr, data)
  95. }
  96. func (odbci *ODBCImporter) insertEdge(edgetype, fromuid, touid string, extraattr map[string]string, data map[string]any) (err error) {
  97. edgetype = relations[edgetype]
  98. if edgetype == "" {
  99. databs, _ := json.MarshalIndent(data, "", " ")
  100. return merrs.NewError("not found valid edgetype in data ", merrs.SSMap{"data": string(databs)})
  101. }
  102. if odbci.client != nil {
  103. foid := get_object_id_from_cache("master:" + fromuid)
  104. toid := to_object_id("level1", touid)
  105. eabs, _ := json.Marshal(extraattr)
  106. quadmql := `quad "` + foid + `" ` + edgetype + ` + "` + toid + `" ` + string(eabs)
  107. _, err = odbci.client.Query(quadmql).Do()
  108. if err != nil {
  109. err = merrs.NewError(err, merrs.SSMaps{{"mql": quadmql}})
  110. logger.Error(err)
  111. return
  112. }
  113. logger.Info("relation immport " + foid + "->" + toid)
  114. }
  115. return
  116. }
  117. var cm_object_id_cache = cmap.New[string, chan string]()
  118. func object_id_cache(suid string) chan string {
  119. choid, _ := cm_object_id_cache.GetWithNew(suid,
  120. func() (chan string, error) {
  121. ch := make(chan string, 2)
  122. return ch, nil
  123. })
  124. return choid
  125. }
  126. func get_object_id_from_cache(suid string) string {
  127. choid := object_id_cache(suid)
  128. oid := <-choid
  129. push_object_id_into_cache(choid, oid)
  130. return oid
  131. }
  // push_object_id_into_cache sends oid on choid and, if the channel then holds
  // two elements, receives one so that a single element remains.
  func push_object_id_into_cache(choid chan string, oid string) {
  	choid <- oid
  	if len(choid) != 2 {
  		return
  	}
  	// Keep at most one element.
  	// The channel capacity is 2, so a third element cannot enter.
  	// The goroutine that added the second element removes the first one,
  	// which lets other goroutines add new elements later.
  	<-choid
  }
  141. func object_id(classaliasname string, data map[string]any) (oid, suid string, err error) {
  142. uid := data["uniqueId"]
  143. if uid == nil {
  144. uid = data["UNIQUEID"]
  145. if uid == nil {
  146. databs, _ := json.MarshalIndent(data, "", " ")
  147. return "", "", merrs.NewError("not found uniqueid in data ", merrs.SSMap{"data": string(databs)})
  148. }
  149. }
  150. suid = cast.ToString(uid)
  151. if suid == "" {
  152. databs, _ := json.MarshalIndent(data, "", " ")
  153. return "", "", merrs.NewError("not found valid uniqueid in data ", merrs.SSMap{"data": string(databs)})
  154. }
  155. return to_object_id(classaliasname, suid), suid, nil
  156. }
  // to_object_id returns "<classaliasname>:<encoded>", where encoded is suid in
  // unpadded URL-safe base64.
  func to_object_id(classaliasname string, suid string) string {
  	return classaliasname + ":" + base64.RawURLEncoding.EncodeToString([]byte(suid))
  }
  161. // 插入数据
  162. func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (err error) {
  163. cdi := classdatainfos.GetIFPresent(classname)
  164. if cdi == nil {
  165. return merrs.NewError("class not defined " + classname)
  166. }
  167. if cdi.insertmql == "" {
  168. return merrs.NewError("class no fields to insert " + classname)
  169. }
  170. oid, suid, e := object_id(cdi.aliasname, data)
  171. if e != nil {
  172. return e
  173. }
  174. var contain, depend, topology map[string][]string
  175. refer := data["_references"]
  176. switch vv := refer.(type) {
  177. case []interface{}:
  178. for _, v := range vv {
  179. switch vv := v.(type) {
  180. case map[string]interface{}:
  181. for k, v := range vv {
  182. switch k {
  183. case "_edgeType":
  184. case "_toUniqueId":
  185. suid := cast.ToString(v)
  186. toclassname := "master"
  187. switch classname {
  188. case "level2":
  189. toclassname = "level1"
  190. case "level3":
  191. toclassname = "level2"
  192. case "level4":
  193. toclassname = "level3"
  194. case "level5":
  195. toclassname = "level4"
  196. case "level6":
  197. toclassname = "level5"
  198. case "level7":
  199. toclassname = "level6"
  200. case "level8":
  201. toclassname = "level7"
  202. }
  203. toid := to_object_id(toclassname, suid)
  204. m := map[string]string{"_direction": "out"}
  205. mbs, _ := json.Marshal(m)
  206. depend = map[string][]string{
  207. "_all": {toid},
  208. toid: {string(mbs)},
  209. }
  210. }
  211. }
  212. }
  213. }
  214. }
  215. values := []any{}
  216. for _, fn := range cdi.fieldslist {
  217. if fn == "id" {
  218. values = append(values, oid)
  219. continue
  220. }
  221. if fn == "contain" {
  222. values = append(values, contain)
  223. continue
  224. }
  225. if fn == "depend" {
  226. values = append(values, depend)
  227. continue
  228. }
  229. if fn == "topology" {
  230. values = append(values, topology)
  231. continue
  232. }
  233. fi := cdi.fieldinfos[fn]
  234. if fi.datakey == "" {
  235. td := map[string]any{}
  236. for k, v := range data {
  237. if cdi.datakey_fieldinfos[k] == nil {
  238. td[k] = v
  239. }
  240. }
  241. tdbs, e := json.Marshal(td)
  242. if e != nil {
  243. return merrs.NewError(e)
  244. }
  245. values = append(values, string(tdbs))
  246. continue
  247. }
  248. v := data[fi.datakey]
  249. switch fi.fieldtype {
  250. case "set<varchar>":
  251. v = cast.ToStringSlice(v)
  252. case "timestamp":
  253. tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
  254. if e != nil {
  255. return merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
  256. }
  257. v = tv.Format("2006-01-02 15:04:05.000000")
  258. }
  259. if fn == "tags" {
  260. v = append(cast.ToStringSlice(v), classname)
  261. }
  262. values = append(values, v)
  263. }
  264. if odbci.client != nil {
  265. _, err = odbci.client.Query(cdi.insertmql, values...).Do()
  266. if err != nil {
  267. err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.insertmql}, {"values": fmt.Sprint(values)}})
  268. logger.Error(err)
  269. return
  270. }
  271. push_object_id_into_cache(object_id_cache(classname+":"+suid), oid)
  272. }
  273. atomic.AddInt64(&cdi.insertcount, 1)
  274. cdi.mutex.Lock()
  275. if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
  276. cdi.lastlogtime = time.Now()
  277. cdi.lastlogicount = cdi.insertcount
  278. logger.Info("class", cdi.classname, "import", cdi.insertcount, "records")
  279. }
  280. cdi.mutex.Unlock()
  281. return
  282. }
  283. func (odbci *ODBCImporter) done() {
  284. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  285. cdi.mutex.Lock()
  286. if cdi.lastlogicount != cdi.insertcount {
  287. cdi.lastlogtime = time.Now()
  288. cdi.lastlogicount = cdi.insertcount
  289. logger.Info("class", cdi.classname, "import", cdi.insertcount, "records")
  290. }
  291. cdi.mutex.Unlock()
  292. return true
  293. })
  294. }
  295. func (odbci *ODBCImporter) alldone() {
  296. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  297. cdi.mutex.Lock()
  298. if cdi.insertcount != 0 {
  299. cdi.lastlogtime = time.Now()
  300. cdi.lastlogicount = cdi.insertcount
  301. logger.Info("class", cdi.classname, "import", cdi.insertcount, "records")
  302. }
  303. cdi.mutex.Unlock()
  304. return true
  305. })
  306. }
  307. func (odbci *ODBCImporter) reload() error {
  308. if odbci.client != nil {
  309. for i := len(classnames) - 1; i >= 0; i-- {
  310. classname := classnames[i]
  311. ci := classinfos.GetIFPresent(classname)
  312. if ci == nil {
  313. continue
  314. }
  315. e := odbci.dropclass(ci.classfullname)
  316. if e != nil {
  317. return e
  318. }
  319. }
  320. }
  321. return nil
  322. }
  323. func (odbci *ODBCImporter) dropclass(classname string) error {
  324. for retry := 2; retry >= 0; retry-- {
  325. _, e := odbci.client.Query(`delete from "` + classname + `" with version`).Do()
  326. _ = e
  327. _, e = odbci.client.Query(`drop class if exists "` + classname + `"`).Do()
  328. if e != nil {
  329. matchstr := regexp.MustCompile(`refer by ([^,]+)`).FindStringSubmatch(e.Error())
  330. if len(matchstr) >= 2 {
  331. e = odbci.dropclass(matchstr[1])
  332. if e != nil {
  333. return e
  334. }
  335. }
  336. if retry > 0 {
  337. continue
  338. }
  339. return e
  340. }
  341. }
  342. logger.Info("drop class " + classname)
  343. return nil
  344. }