odbcimporter.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package importer
  2. import (
  3. "strings"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "git.wecise.com/wecise/cgimport/odbc"
  8. "git.wecise.com/wecise/odb-go/odb"
  9. "git.wecise.com/wecise/util/cmap"
  10. "git.wecise.com/wecise/util/merrs"
  11. )
  12. type fieldinfo struct {
  13. fieldname string
  14. fieldtype string
  15. keyidx int // 主键顺序值,0为非主键
  16. datakey string // 对应数据中的键名
  17. }
  18. type classinfo struct {
  19. classname string
  20. nickname string
  21. fieldinfos map[string]*fieldinfo
  22. keyfields []string
  23. fieldslist []string
  24. insertmql string
  25. insertcount int64
  26. lastlogtime time.Time
  27. lastlogicount int64
  28. mutex sync.Mutex
  29. }
  30. type ODBCImporter struct {
  31. client odb.Client
  32. classinfos cmap.ConcurrentMap[string, *classinfo]
  33. }
  34. func NewODBCImporter() *ODBCImporter {
  35. return &ODBCImporter{
  36. client: odbc.ODBC(),
  37. classinfos: cmap.New[string, *classinfo](),
  38. }
  39. }
  40. // 根据数据修正类定义
  41. func (odbci *ODBCImporter) ReviseClassStruct(data map[string]any) (err error) {
  42. e := odbci.createClass("cgitest", "/cgitest", nil)
  43. if e != nil {
  44. return e
  45. }
  46. e = odbci.createClass("x10", "/cgitest/x10", nil)
  47. if e != nil {
  48. return e
  49. }
  50. e = odbci.createClass("x1001", "/cgitest/x10/x1001", []*fieldinfo{
  51. {fieldname: "uniqueid", datakey: "UNIQUEID", fieldtype: "varchar", keyidx: 1},
  52. {fieldname: "distname", datakey: "BASENAME", fieldtype: "varchar"},
  53. })
  54. if e != nil {
  55. return e
  56. }
  57. e = odbci.createClass("x1002", "/cgitest/x10/x1002", []*fieldinfo{
  58. {fieldname: "uniqueid", datakey: "uniqueId", fieldtype: "varchar", keyidx: 1},
  59. {fieldname: "distname", datakey: "distName", fieldtype: "varchar"},
  60. })
  61. if e != nil {
  62. return e
  63. }
  64. e = odbci.createClass("x1003", "/cgitest/x10/x1003", []*fieldinfo{
  65. {fieldname: "fromuniqueid", datakey: "FROMUNIQUEID", fieldtype: "varchar", keyidx: 1},
  66. {fieldname: "touniqueid", datakey: "TOUNIQUEID", fieldtype: "varchar", keyidx: 2},
  67. })
  68. if e != nil {
  69. return e
  70. }
  71. return
  72. }
  73. // 插入数据
  74. func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (err error) {
  75. ci := odbci.classinfos.GetIFPresent(classname)
  76. if ci == nil {
  77. return merrs.NewError("class not defined " + classname)
  78. }
  79. if ci.insertmql == "" {
  80. return merrs.NewError("class no fields to insert " + classname)
  81. }
  82. values := []any{}
  83. for _, fn := range ci.fieldslist {
  84. fi := ci.fieldinfos[fn]
  85. v := data[fi.datakey]
  86. values = append(values, v)
  87. }
  88. _, err = odbci.client.Query(ci.insertmql, values...).Do()
  89. if err != nil {
  90. return
  91. }
  92. atomic.AddInt64(&ci.insertcount, 1)
  93. ci.mutex.Lock()
  94. if time.Since(ci.lastlogtime) > 5*time.Second && ci.lastlogicount != ci.insertcount {
  95. ci.lastlogtime = time.Now()
  96. ci.lastlogicount = ci.insertcount
  97. logger.Info("class", ci.classname, "import", ci.insertcount, "records")
  98. }
  99. ci.mutex.Unlock()
  100. return
  101. }
  102. func (odbci *ODBCImporter) done() {
  103. odbci.classinfos.Fetch(func(cn string, ci *classinfo) bool {
  104. ci.mutex.Lock()
  105. ci.lastlogtime = time.Now()
  106. ci.lastlogicount = ci.insertcount
  107. logger.Info("class", ci.classname, "import", ci.insertcount, "records")
  108. ci.mutex.Unlock()
  109. return true
  110. })
  111. }
  112. func (odbci *ODBCImporter) reload() error {
  113. _, e := odbci.client.Query(`delete from "/cgitest/x10/x1001" with version`).Do()
  114. _ = e
  115. _, e = odbci.client.Query(`delete from "/cgitest/x10/x1002" with version`).Do()
  116. _ = e
  117. _, e = odbci.client.Query(`delete from "/cgitest/x10/x1003" with version`).Do()
  118. _ = e
  119. _, e = odbci.client.Query(`drop class if exists "/cgitest/x10/x1001"`).Do()
  120. if e != nil {
  121. return e
  122. }
  123. _, e = odbci.client.Query(`drop class if exists "/cgitest/x10/x1002"`).Do()
  124. if e != nil {
  125. return e
  126. }
  127. _, e = odbci.client.Query(`drop class if exists "/cgitest/x10/x1003"`).Do()
  128. if e != nil {
  129. return e
  130. }
  131. _, e = odbci.client.Query(`drop class if exists "/cgitest/x10"`).Do()
  132. if e != nil {
  133. return e
  134. }
  135. return nil
  136. }
  137. // 新建类
  138. func (odbci *ODBCImporter) createClass(nickname, classname string, fieldinfoslist []*fieldinfo) (err error) {
  139. _, err = odbci.classinfos.GetWithNew(classname, func() (ci *classinfo, err error) {
  140. logger.Info("create class " + classname)
  141. fieldinfos := map[string]*fieldinfo{}
  142. fieldslist := []string{}
  143. keyfields := []string{}
  144. mql := `create class if not exists ` + classname + `(`
  145. if len(fieldinfoslist) > 0 {
  146. field_defines := []string{}
  147. for _, fi := range fieldinfoslist {
  148. field_defines = append(field_defines, fi.fieldname+" "+fi.fieldtype)
  149. fieldslist = append(fieldslist, fi.fieldname)
  150. if fi.keyidx > 0 {
  151. for len(keyfields) < fi.keyidx {
  152. keyfields = append(keyfields, "")
  153. }
  154. keyfields[fi.keyidx-1] = fi.fieldname
  155. }
  156. fieldinfos[fi.fieldname] = fi
  157. }
  158. mql += strings.Join(field_defines, ",")
  159. }
  160. if len(keyfields) > 0 {
  161. mql += ", keys(" + strings.Join(keyfields, ",") + ")"
  162. }
  163. mql += `)with namespace="cgitest" and key=manu and nickname='` + nickname + `'`
  164. _, err = odbci.client.Query(mql).Do()
  165. if err != nil {
  166. return
  167. }
  168. var insertmql string
  169. if len(fieldslist) > 0 {
  170. insertmql = `insert into ` + classname + "(" + strings.Join(fieldslist, ",") + ")values(" + strings.Repeat(",?", len(fieldslist))[1:] + ")"
  171. }
  172. ci = &classinfo{
  173. classname: classname,
  174. nickname: nickname,
  175. fieldinfos: fieldinfos,
  176. keyfields: keyfields,
  177. fieldslist: fieldslist,
  178. insertmql: insertmql,
  179. }
  180. return
  181. })
  182. return
  183. }
  184. // 修改类定义
  185. func (odbci *ODBCImporter) alterClass() (err error) {
  186. return
  187. }