odbcimporter.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. package importer
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "fmt"
  6. "regexp"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "git.wecise.com/wecise/cgimport/odbc"
  12. "git.wecise.com/wecise/cgimport/schema"
  13. "git.wecise.com/wecise/odb-go/odb"
  14. "git.wecise.com/wecise/util/cast"
  15. "git.wecise.com/wecise/util/cmap"
  16. "git.wecise.com/wecise/util/merrs"
  17. )
  18. type classdatainfo struct {
  19. *schema.ClassInfo
  20. insertcount int64
  21. lastlogtime time.Time
  22. lastlogicount int64
  23. mutex sync.Mutex
  24. }
  25. var classdatainfos = cmap.New[string, *classdatainfo]()
  26. type ODBCImporter struct {
  27. client odb.Client
  28. }
  29. func NewODBCImporter() *ODBCImporter {
  30. odbci := &ODBCImporter{}
  31. if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 {
  32. odbci.client = odbc.ODBC()
  33. }
  34. return odbci
  35. }
  36. // 根据数据修正类定义
  37. func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
  38. for _, classname := range schema.ClassNames {
  39. ci := schema.ClassInfos.GetIFPresent(classname)
  40. if ci == nil {
  41. return merrs.NewError("classinfo not found " + classname)
  42. }
  43. _, err = classdatainfos.GetWithNew(classname, func() (cdi *classdatainfo, err error) {
  44. if odbci.client != nil {
  45. _, err = odbci.client.Query("select class,id from " + ci.Classname + " limit 1").Do()
  46. if err != nil {
  47. if !strings.Contains(err.Error(), "not find") {
  48. return nil, err
  49. }
  50. logger.Info("create class " + ci.Classfullname)
  51. _, err = odbci.client.Query(ci.Createmql).Do()
  52. if err != nil {
  53. return
  54. }
  55. }
  56. }
  57. cdi = &classdatainfo{ClassInfo: ci}
  58. return
  59. })
  60. }
  61. if odbci.client != nil {
  62. for _, createedgemql := range schema.CreateEdgeMqls {
  63. _, e := odbci.client.Query(createedgemql).Do()
  64. if e != nil && !strings.Contains(e.Error(), "already exist") {
  65. err = e
  66. return
  67. }
  68. }
  69. }
  70. return
  71. }
  72. func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) {
  73. extraattr := map[string]string{}
  74. fromuid := ""
  75. touid := ""
  76. edgetype := ""
  77. for k, v := range data {
  78. switch k {
  79. case "FROMUNIQUEID":
  80. fromuid = cast.ToString(v)
  81. case "TOUNIQUEID":
  82. touid = cast.ToString(v)
  83. case "EDGETYPE":
  84. edgetype = cast.ToString(v)
  85. default:
  86. extraattr[k] = cast.ToString(v)
  87. }
  88. }
  89. if fromuid == "" {
  90. databs, _ := json.MarshalIndent(data, "", " ")
  91. return merrs.NewError("not found valid fromuniqueid in data ", merrs.SSMap{"data": string(databs)})
  92. }
  93. if touid == "" {
  94. databs, _ := json.MarshalIndent(data, "", " ")
  95. return merrs.NewError("not found valid touniqueid in data ", merrs.SSMap{"data": string(databs)})
  96. }
  97. if edgetype == "" {
  98. databs, _ := json.MarshalIndent(data, "", " ")
  99. return merrs.NewError("not found valid edgetype in data ", merrs.SSMap{"data": string(databs)})
  100. }
  101. return odbci.insertEdge(edgetype, fromuid, touid, extraattr, data)
  102. }
  103. func (odbci *ODBCImporter) insertEdge(edgetype, fromuid, touid string, extraattr map[string]string, data map[string]any) (err error) {
  104. edgetype = schema.Relations[edgetype]
  105. if edgetype == "" {
  106. databs, _ := json.MarshalIndent(data, "", " ")
  107. return merrs.NewError("not found valid edgetype in data ", merrs.SSMap{"data": string(databs)})
  108. }
  109. if odbci.client != nil {
  110. foid := get_object_id_from_cache("master:" + fromuid)
  111. toid := to_object_id("level1", touid)
  112. eabs, _ := json.Marshal(extraattr)
  113. // quadmql := `quad "` + foid + `" ` + edgetype + ` + "` + toid + `" ` + string(eabs)
  114. // _, err = odbci.client.Query(quadmql).Do()
  115. // if err != nil {
  116. // err = merrs.NewError(err, merrs.SSMaps{{"mql": quadmql}})
  117. // logger.Error(err)
  118. // return
  119. // }
  120. updatemql := "update " + "/m3cnet/master" + " set " + " contain=?" + " where id='" + foid + "'"
  121. _, err = odbci.client.Query(updatemql, map[string][]string{
  122. "_all": {toid},
  123. toid: {string(eabs)},
  124. }).Do()
  125. if err != nil {
  126. err = merrs.NewError(err, merrs.SSMaps{{"mql": updatemql}})
  127. return
  128. }
  129. logger.Info("relation immport " + foid + "->" + toid)
  130. }
  131. return
  132. }
  133. var cm_object_id_cache = cmap.New[string, chan string]()
  134. func object_id_cache(suid string) chan string {
  135. choid, _ := cm_object_id_cache.GetWithNew(suid,
  136. func() (chan string, error) {
  137. ch := make(chan string, 2)
  138. return ch, nil
  139. })
  140. return choid
  141. }
  142. func get_object_id_from_cache(suid string) string {
  143. choid := object_id_cache(suid)
  144. oid := <-choid
  145. push_object_id_into_cache(choid, oid)
  146. return oid
  147. }
  148. func push_object_id_into_cache(choid chan string, oid string) {
  149. choid <- oid
  150. if len(choid) == 2 {
  151. // 最多保留 1 个
  152. // chan cap = 2,第三个元素进不来
  153. // 进第二个元素的协程,清除第一个元素,允许其它协程后续进入新元素
  154. <-choid
  155. }
  156. }
  157. func object_id(classaliasname string, data map[string]any) (oid, suid string, err error) {
  158. uid := data["uniqueId"]
  159. if uid == nil {
  160. uid = data["UNIQUEID"]
  161. if uid == nil {
  162. databs, _ := json.MarshalIndent(data, "", " ")
  163. return "", "", merrs.NewError("not found uniqueid in data ", merrs.SSMap{"data": string(databs)})
  164. }
  165. }
  166. suid = cast.ToString(uid)
  167. if suid == "" {
  168. databs, _ := json.MarshalIndent(data, "", " ")
  169. return "", "", merrs.NewError("not found valid uniqueid in data ", merrs.SSMap{"data": string(databs)})
  170. }
  171. return to_object_id(classaliasname, suid), suid, nil
  172. }
  173. func to_object_id(classaliasname string, suid string) string {
  174. suid64 := base64.RawURLEncoding.EncodeToString([]byte(suid))
  175. return classaliasname + ":" + suid64
  176. }
  177. // 插入数据
  178. func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (err error) {
  179. cdi := classdatainfos.GetIFPresent(classname)
  180. if cdi == nil {
  181. return merrs.NewError("class not defined " + classname)
  182. }
  183. if cdi.Insertmql == "" {
  184. return merrs.NewError("class no fields to insert " + classname)
  185. }
  186. oid, suid, e := object_id(cdi.Aliasname, data)
  187. if e != nil {
  188. return e
  189. }
  190. var contain, depend, topology map[string][]string
  191. refer := data["_references"]
  192. switch vv := refer.(type) {
  193. case []interface{}:
  194. for _, v := range vv {
  195. switch vv := v.(type) {
  196. case map[string]interface{}:
  197. for k, v := range vv {
  198. switch k {
  199. case "_edgeType":
  200. case "_toUniqueId":
  201. suid := cast.ToString(v)
  202. toclassname := "master"
  203. switch classname {
  204. case "level2":
  205. toclassname = "level1"
  206. case "level3":
  207. toclassname = "level2"
  208. case "level4":
  209. toclassname = "level3"
  210. case "level5":
  211. toclassname = "level4"
  212. case "level6":
  213. toclassname = "level5"
  214. case "level7":
  215. toclassname = "level6"
  216. case "level8":
  217. toclassname = "level7"
  218. }
  219. toid := to_object_id(toclassname, suid)
  220. m := map[string]string{"_direction": "out"}
  221. mbs, _ := json.Marshal(m)
  222. depend = map[string][]string{
  223. "_all": {toid},
  224. toid: {string(mbs)},
  225. }
  226. }
  227. }
  228. }
  229. }
  230. }
  231. values := []any{}
  232. for _, fn := range cdi.Fieldslist {
  233. if fn == "id" {
  234. values = append(values, oid)
  235. continue
  236. }
  237. if fn == "contain" {
  238. values = append(values, contain)
  239. continue
  240. }
  241. if fn == "depend" {
  242. values = append(values, depend)
  243. continue
  244. }
  245. if fn == "topology" {
  246. values = append(values, topology)
  247. continue
  248. }
  249. fi := cdi.Fieldinfos[fn]
  250. if fi.Datakey == "" {
  251. td := map[string]any{}
  252. for k, v := range data {
  253. if cdi.DatakeyFieldinfos[k] == nil {
  254. td[k] = v
  255. }
  256. }
  257. tdbs, e := json.Marshal(td)
  258. if e != nil {
  259. return merrs.NewError(e)
  260. }
  261. values = append(values, string(tdbs))
  262. continue
  263. }
  264. v := data[fi.Datakey]
  265. switch fi.Fieldtype {
  266. case "set<varchar>":
  267. v = cast.ToStringSlice(v)
  268. case "timestamp":
  269. tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
  270. if e != nil {
  271. return merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
  272. }
  273. v = tv.Format("2006-01-02 15:04:05.000000")
  274. }
  275. if fn == "tags" {
  276. v = append(cast.ToStringSlice(v), classname)
  277. }
  278. values = append(values, v)
  279. }
  280. if odbci.client != nil {
  281. _, err = odbci.client.Query(cdi.Insertmql, values...).Do()
  282. if err != nil {
  283. err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.Insertmql}, {"values": fmt.Sprint(values)}})
  284. logger.Error(err)
  285. return
  286. }
  287. push_object_id_into_cache(object_id_cache(classname+":"+suid), oid)
  288. }
  289. atomic.AddInt64(&cdi.insertcount, 1)
  290. cdi.mutex.Lock()
  291. if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
  292. cdi.lastlogtime = time.Now()
  293. cdi.lastlogicount = cdi.insertcount
  294. logger.Info("class", cdi.Classname, "import", cdi.insertcount, "records")
  295. }
  296. cdi.mutex.Unlock()
  297. return
  298. }
  299. func (odbci *ODBCImporter) done() {
  300. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  301. cdi.mutex.Lock()
  302. if cdi.lastlogicount != cdi.insertcount {
  303. cdi.lastlogtime = time.Now()
  304. cdi.lastlogicount = cdi.insertcount
  305. logger.Info("class", cdi.Classname, "import", cdi.insertcount, "records")
  306. }
  307. cdi.mutex.Unlock()
  308. return true
  309. })
  310. }
  311. func (odbci *ODBCImporter) alldone() {
  312. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  313. cdi.mutex.Lock()
  314. if cdi.insertcount != 0 {
  315. cdi.lastlogtime = time.Now()
  316. cdi.lastlogicount = cdi.insertcount
  317. logger.Info("class", cdi.Classname, "import", cdi.insertcount, "records")
  318. }
  319. cdi.mutex.Unlock()
  320. return true
  321. })
  322. }
  323. func (odbci *ODBCImporter) reload() error {
  324. if odbci.client != nil {
  325. for i := len(schema.ClassNames) - 1; i >= 0; i-- {
  326. classname := schema.ClassNames[i]
  327. ci := schema.ClassInfos.GetIFPresent(classname)
  328. if ci == nil {
  329. continue
  330. }
  331. e := odbci.dropclass(ci.Classfullname)
  332. if e != nil {
  333. return e
  334. }
  335. }
  336. }
  337. return nil
  338. }
  339. func (odbci *ODBCImporter) dropclass(classname string) error {
  340. for retry := 2; retry >= 0; retry-- {
  341. _, e := odbci.client.Query(`delete from "` + classname + `" with version`).Do()
  342. _ = e
  343. _, e = odbci.client.Query(`drop class if exists "` + classname + `"`).Do()
  344. if e != nil {
  345. matchstr := regexp.MustCompile(`refer by ([^,]+)`).FindStringSubmatch(e.Error())
  346. if len(matchstr) >= 2 {
  347. e = odbci.dropclass(matchstr[1])
  348. if e != nil {
  349. return e
  350. }
  351. }
  352. if retry > 0 {
  353. continue
  354. }
  355. return e
  356. }
  357. }
  358. logger.Info("drop class " + classname)
  359. return nil
  360. }