odbcimporter.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package importer
  2. import (
  3. "strings"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "git.wecise.com/wecise/cgimport/odbc"
  8. "git.wecise.com/wecise/odb-go/odb"
  9. "git.wecise.com/wecise/util/cmap"
  10. "git.wecise.com/wecise/util/merrs"
  11. )
  // fieldinfo describes one column of a class and where its value comes from
  // in an incoming data record.
  type fieldinfo struct {
  	// fieldname is the column name and fieldtype its declared type; both are
  	// used verbatim when the class definition statement is assembled.
  	fieldname, fieldtype string
  	// keyidx is the 1-based position of the field within the primary key;
  	// 0 means the field is not part of the key.
  	keyidx int
  	// datakey is the key under which the value is looked up in the data map.
  	datakey string
  }
  18. type classinfo struct {
  19. classname string
  20. nickname string
  21. fieldinfos map[string]*fieldinfo
  22. keyfields []string
  23. fieldslist []string
  24. insertmql string
  25. insertcount int64
  26. lastlogtime time.Time
  27. mutex sync.Mutex
  28. }
  29. type ODBCImporter struct {
  30. client odb.Client
  31. classinfos cmap.ConcurrentMap[string, *classinfo]
  32. }
  33. func NewODBCImporter() *ODBCImporter {
  34. return &ODBCImporter{
  35. client: odbc.ODBC(),
  36. classinfos: cmap.New[string, *classinfo](),
  37. }
  38. }
  39. // 根据数据修正类定义
  40. func (odbci *ODBCImporter) ReviseClassStruct(data map[string]any) (err error) {
  41. e := odbci.createClass("cgitest", "/cgitest", nil)
  42. if e != nil {
  43. return e
  44. }
  45. e = odbci.createClass("x10", "/cgitest/x10", []*fieldinfo{
  46. {fieldname: "uniqueid", datakey: "uniqueId", fieldtype: "varchar", keyidx: 1},
  47. })
  48. if e != nil {
  49. return e
  50. }
  51. e = odbci.createClass("x1001", "/cgitest/x10/x1001", []*fieldinfo{
  52. {fieldname: "uniqueid", datakey: "uniqueId", fieldtype: "varchar", keyidx: 1},
  53. {fieldname: "distname", datakey: "distName", fieldtype: "varchar"},
  54. })
  55. if e != nil {
  56. return e
  57. }
  58. e = odbci.createClass("x1002", "/cgitest/x10/x1002", []*fieldinfo{
  59. {fieldname: "uniqueid", datakey: "uniqueId", fieldtype: "varchar", keyidx: 1},
  60. {fieldname: "distname", datakey: "distName", fieldtype: "varchar"},
  61. })
  62. if e != nil {
  63. return e
  64. }
  65. return
  66. }
  67. // 插入数据
  68. func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (err error) {
  69. if data["uniqueId"] == nil {
  70. // TODO 开发期只导入部分数据
  71. return
  72. }
  73. ci := odbci.classinfos.GetIFPresent(classname)
  74. if ci == nil {
  75. return merrs.NewError("class not defined " + classname)
  76. }
  77. if ci.insertmql == "" {
  78. return merrs.NewError("class no fields to insert " + classname)
  79. }
  80. values := []any{}
  81. for _, fn := range ci.fieldslist {
  82. fi := ci.fieldinfos[fn]
  83. v := data[fi.datakey]
  84. values = append(values, v)
  85. }
  86. _, err = odbci.client.Query(ci.insertmql, values...).Do()
  87. if err != nil {
  88. return
  89. }
  90. atomic.AddInt64(&ci.insertcount, 1)
  91. ci.mutex.Lock()
  92. if time.Since(ci.lastlogtime) > 5*time.Second {
  93. ci.lastlogtime = time.Now()
  94. logger.Info("class", ci.classname, "import", ci.insertcount, "records")
  95. }
  96. ci.mutex.Unlock()
  97. return
  98. }
  99. // 新建类
  100. func (odbci *ODBCImporter) createClass(nickname, classname string, fieldinfoslist []*fieldinfo) (err error) {
  101. _, err = odbci.classinfos.GetWithNew(classname, func() (ci *classinfo, err error) {
  102. logger.Info("create class " + classname)
  103. fieldinfos := map[string]*fieldinfo{}
  104. fieldslist := []string{}
  105. keyfields := []string{}
  106. mql := `create class if not exists ` + classname + `(`
  107. if len(fieldinfoslist) > 0 {
  108. field_defines := []string{}
  109. for _, fi := range fieldinfoslist {
  110. field_defines = append(field_defines, fi.fieldname+" "+fi.fieldtype)
  111. fieldslist = append(fieldslist, fi.fieldname)
  112. if fi.keyidx > 0 {
  113. for len(keyfields) < fi.keyidx {
  114. keyfields = append(keyfields, "")
  115. }
  116. keyfields[fi.keyidx-1] = fi.fieldname
  117. }
  118. fieldinfos[fi.fieldname] = fi
  119. }
  120. mql += strings.Join(field_defines, ",")
  121. }
  122. if len(keyfields) > 0 {
  123. mql += ", keys(" + strings.Join(keyfields, ",") + ")"
  124. }
  125. mql += `)with namespace="cgitest" and key=manu and nickname='` + nickname + `'`
  126. _, err = odbci.client.Query(mql).Do()
  127. if err != nil {
  128. return
  129. }
  130. var insertmql string
  131. if len(fieldslist) > 0 {
  132. insertmql = `insert into ` + classname + "(" + strings.Join(fieldslist, ",") + ")values(" + strings.Repeat(",?", len(fieldslist))[1:] + ")"
  133. }
  134. ci = &classinfo{
  135. classname: classname,
  136. nickname: nickname,
  137. fieldinfos: fieldinfos,
  138. keyfields: keyfields,
  139. fieldslist: fieldslist,
  140. insertmql: insertmql,
  141. }
  142. return
  143. })
  144. return
  145. }
  146. // 修改类定义
  147. func (odbci *ODBCImporter) alterClass() (err error) {
  148. return
  149. }