odbcimporter.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. package importer
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "git.wecise.com/wecise/cgimport/odbc"
  11. "git.wecise.com/wecise/odb-go/odb"
  12. "git.wecise.com/wecise/util/cast"
  13. "git.wecise.com/wecise/util/cmap"
  14. "git.wecise.com/wecise/util/merrs"
  15. )
  16. type classdatainfo struct {
  17. *classinfo
  18. insertcount int64
  19. lastlogtime time.Time
  20. lastlogicount int64
  21. mutex sync.Mutex
  22. }
  23. var classdatainfos = cmap.New[string, *classdatainfo]()
  24. type ODBCImporter struct {
  25. client odb.Client
  26. }
  27. func NewODBCImporter() *ODBCImporter {
  28. odbci := &ODBCImporter{}
  29. if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 {
  30. odbci.client = odbc.ODBC()
  31. }
  32. return odbci
  33. }
  34. // 根据数据修正类定义
  35. func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
  36. for _, classname := range classnames {
  37. ci := classinfos.GetIFPresent(classname)
  38. if ci == nil {
  39. return merrs.NewError("classinfo not found " + classname)
  40. }
  41. _, err = classdatainfos.GetWithNew(classname, func() (cdi *classdatainfo, err error) {
  42. logger.Info("create class " + ci.classname)
  43. if odbci.client != nil {
  44. _, err = odbci.client.Query(ci.createmql).Do()
  45. if err != nil {
  46. return
  47. }
  48. }
  49. cdi = &classdatainfo{classinfo: ci}
  50. return
  51. })
  52. }
  53. if odbci.client != nil {
  54. for _, createedgemql := range createedgemqls {
  55. _, e := odbci.client.Query(createedgemql).Do()
  56. if e != nil && !strings.Contains(e.Error(), "already exist") {
  57. err = e
  58. return
  59. }
  60. }
  61. }
  62. return
  63. }
  64. func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) {
  65. extraattr := map[string]string{}
  66. fromuid := ""
  67. touid := ""
  68. edgetype := ""
  69. for k, v := range data {
  70. switch k {
  71. case "FROMUNIQUEID":
  72. fromuid = cast.ToString(v)
  73. case "TOUNIQUEID":
  74. touid = cast.ToString(v)
  75. case "EDGETYPE":
  76. edgetype = cast.ToString(v)
  77. default:
  78. extraattr[k] = cast.ToString(v)
  79. }
  80. }
  81. if fromuid == "" {
  82. databs, _ := json.MarshalIndent(data, "", " ")
  83. return merrs.NewError("not found valid fromuniqueid in data ", merrs.SSMap{"data": string(databs)})
  84. }
  85. if touid == "" {
  86. databs, _ := json.MarshalIndent(data, "", " ")
  87. return merrs.NewError("not found valid touniqueid in data ", merrs.SSMap{"data": string(databs)})
  88. }
  89. if edgetype == "" {
  90. databs, _ := json.MarshalIndent(data, "", " ")
  91. return merrs.NewError("not found valid edgetype in data ", merrs.SSMap{"data": string(databs)})
  92. }
  93. edgetype = relations[edgetype]
  94. if edgetype == "" {
  95. databs, _ := json.MarshalIndent(data, "", " ")
  96. return merrs.NewError("not found valid edgetype in data ", merrs.SSMap{"data": string(databs)})
  97. }
  98. if odbci.client != nil {
  99. foid := get_object_id_from_cache(fromuid)
  100. toid := get_object_id_from_cache(touid)
  101. eabs, _ := json.Marshal(extraattr)
  102. quadmql := `quad "` + foid + `" ` + edgetype + ` + "` + toid + `" ` + string(eabs)
  103. _, err = odbci.client.Query(quadmql).Do()
  104. if err != nil {
  105. err = merrs.NewError(err, merrs.SSMaps{{"mql": quadmql}})
  106. logger.Error(err)
  107. return
  108. }
  109. }
  110. return
  111. }
  112. var cm_object_id_cache = cmap.New[string, chan string]()
  113. func object_id_cache(suid string) chan string {
  114. choid, _ := cm_object_id_cache.GetWithNew(suid,
  115. func() (chan string, error) {
  116. ch := make(chan string, 2)
  117. return ch, nil
  118. })
  119. return choid
  120. }
  121. func get_object_id_from_cache(suid string) string {
  122. choid := object_id_cache(suid)
  123. oid := <-choid
  124. push_object_id_into_cache(choid, oid)
  125. return oid
  126. }
  // mu_object_id_cache makes the drain-then-send sequence of
  // push_object_id_into_cache atomic with respect to other pushes.
  var mu_object_id_cache sync.Mutex

  // push_object_id_into_cache stores oid in choid so that exactly one value,
  // the most recently pushed one, is left in the channel.
  //
  // choid is expected to be buffered (object_id_cache creates it with
  // capacity 2). Any value still in the channel is discarded before oid is
  // sent. Both steps run under mu_object_id_cache: sending first and draining
  // afterwards based on len(choid) lets two concurrent pushers both observe a
  // full channel and both drain it, which leaves the channel empty and blocks
  // every later reader forever.
  func push_object_id_into_cache(choid chan string, oid string) {
  	mu_object_id_cache.Lock()
  	defer mu_object_id_cache.Unlock()
  	// Discard stale values without blocking; a concurrent reader may take
  	// them first, which is equally fine.
  	for drained := false; !drained; {
  		select {
  		case <-choid:
  		default:
  			drained = true
  		}
  	}
  	// No other pusher can run while the lock is held and the channel was
  	// just emptied, so this send does not block on a buffered channel.
  	choid <- oid
  }
  136. func object_id(classaliasname string, data map[string]any) (oid, suid string, err error) {
  137. uid := data["uniqueId"]
  138. if uid == nil {
  139. uid = data["UNIQUEID"]
  140. if uid == nil {
  141. databs, _ := json.MarshalIndent(data, "", " ")
  142. return "", "", merrs.NewError("not found uniqueid in data ", merrs.SSMap{"data": string(databs)})
  143. }
  144. }
  145. suid = cast.ToString(uid)
  146. if suid == "" {
  147. databs, _ := json.MarshalIndent(data, "", " ")
  148. return "", "", merrs.NewError("not found valid uniqueid in data ", merrs.SSMap{"data": string(databs)})
  149. }
  150. suid64 := base64.RawURLEncoding.EncodeToString([]byte(suid))
  151. return classaliasname + ":" + suid64, suid, nil
  152. }
  153. // 插入数据
  154. func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (err error) {
  155. cdi := classdatainfos.GetIFPresent(classname)
  156. if cdi == nil {
  157. return merrs.NewError("class not defined " + classname)
  158. }
  159. if cdi.insertmql == "" {
  160. return merrs.NewError("class no fields to insert " + classname)
  161. }
  162. oid, suid, e := object_id(cdi.aliasname, data)
  163. if e != nil {
  164. return e
  165. }
  166. values := []any{}
  167. for _, fn := range cdi.fieldslist {
  168. if fn == "id" {
  169. values = append(values, oid)
  170. continue
  171. }
  172. fi := cdi.fieldinfos[fn]
  173. if fi.datakey == "" {
  174. td := map[string]any{}
  175. for k, v := range data {
  176. if cdi.datakey_fieldinfos[k] == nil {
  177. td[k] = v
  178. }
  179. }
  180. tdbs, e := json.Marshal(td)
  181. if e != nil {
  182. return merrs.NewError(e)
  183. }
  184. values = append(values, string(tdbs))
  185. continue
  186. }
  187. v := data[fi.datakey]
  188. switch fi.fieldtype {
  189. case "set<varchar>":
  190. v = cast.ToStringSlice(v)
  191. case "timestamp":
  192. tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
  193. if e != nil {
  194. return merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
  195. }
  196. v = tv.Format("2006-01-02 15:04:05.000000")
  197. }
  198. if fn == "tags" {
  199. v = append(cast.ToStringSlice(v), classname)
  200. }
  201. values = append(values, v)
  202. }
  203. if odbci.client != nil {
  204. _, err = odbci.client.Query(cdi.insertmql, values...).Do()
  205. if err != nil {
  206. err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.insertmql}, {"values": fmt.Sprint(values)}})
  207. logger.Error(err)
  208. return
  209. }
  210. push_object_id_into_cache(object_id_cache(suid), oid)
  211. }
  212. atomic.AddInt64(&cdi.insertcount, 1)
  213. cdi.mutex.Lock()
  214. if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
  215. cdi.lastlogtime = time.Now()
  216. cdi.lastlogicount = cdi.insertcount
  217. logger.Info("class", cdi.classname, "import", cdi.insertcount, "records")
  218. }
  219. cdi.mutex.Unlock()
  220. return
  221. }
  222. func (odbci *ODBCImporter) done() {
  223. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  224. cdi.mutex.Lock()
  225. if cdi.lastlogicount != cdi.insertcount {
  226. cdi.lastlogtime = time.Now()
  227. cdi.lastlogicount = cdi.insertcount
  228. logger.Info("class", cdi.classname, "import", cdi.insertcount, "records")
  229. }
  230. cdi.mutex.Unlock()
  231. return true
  232. })
  233. }
  234. func (odbci *ODBCImporter) alldone() {
  235. classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
  236. cdi.mutex.Lock()
  237. if cdi.insertcount != 0 {
  238. cdi.lastlogtime = time.Now()
  239. cdi.lastlogicount = cdi.insertcount
  240. logger.Info("class", cdi.classname, "import", cdi.insertcount, "records")
  241. }
  242. cdi.mutex.Unlock()
  243. return true
  244. })
  245. }
  246. func (odbci *ODBCImporter) reload() error {
  247. if odbci.client != nil {
  248. for i := len(classnames) - 1; i >= 0; i-- {
  249. classname := classnames[i]
  250. ci := classinfos.GetIFPresent(classname)
  251. if ci == nil {
  252. continue
  253. }
  254. for retry := 2; retry >= 0; retry-- {
  255. _, e := odbci.client.Query(`delete from "` + ci.classfullname + `" with version`).Do()
  256. _ = e
  257. _, e = odbci.client.Query(`drop class if exists "` + ci.classfullname + `"`).Do()
  258. if e != nil {
  259. if retry > 0 {
  260. continue
  261. }
  262. return e
  263. }
  264. }
  265. }
  266. }
  267. return nil
  268. }