odbcimporter.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package importer
  2. import (
  3. "encoding/json"
  4. "strings"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "git.wecise.com/wecise/cgimport/odbc"
  9. "git.wecise.com/wecise/odb-go/odb"
  10. "git.wecise.com/wecise/util/cmap"
  11. "git.wecise.com/wecise/util/merrs"
  12. )
  13. type fieldinfo struct {
  14. fieldname string
  15. fieldtype string
  16. keyidx int // 主键顺序值,0为非主键
  17. datakey string // 对应数据中的键名
  18. }
  19. type classinfo struct {
  20. classname string
  21. nickname string
  22. fieldinfos map[string]*fieldinfo
  23. keyfields []string
  24. fieldslist []string
  25. insertmql string
  26. insertcount int64
  27. lastlogtime time.Time
  28. lastlogicount int64
  29. mutex sync.Mutex
  30. }
// ODBCImporter runs MQL statements through an odb.Client and keeps a cache of
// the classes it has created.
type ODBCImporter struct {
	// client executes the MQL statements.
	client odb.Client
	// classinfos caches class definitions keyed by class path.
	classinfos cmap.ConcurrentMap[string, *classinfo]
}
  35. func NewODBCImporter() *ODBCImporter {
  36. return &ODBCImporter{
  37. client: odbc.ODBC(),
  38. classinfos: cmap.New[string, *classinfo](),
  39. }
  40. }
  41. // 根据数据修正类定义
  42. func (odbci *ODBCImporter) ReviseClassStruct(data map[string]any) (classname string, err error) {
  43. switch {
  44. case data["uniqueId"] != nil:
  45. classname = "/cgitest/x10/x1002"
  46. case data["UNIQUEID"] != nil:
  47. classname = "/cgitest/x10/x1001"
  48. case data["FROMUNIQUEID"] != nil:
  49. classname = "/cgitest/x10/x1003"
  50. default:
  51. bs, e := json.MarshalIndent(data, "", " ")
  52. if e != nil {
  53. err = e
  54. return
  55. }
  56. err = merrs.NewError("no mapping classname", merrs.SSMaps{{"data": string(bs)}})
  57. }
  58. err = odbci.createClass("cgitest", "/cgitest", nil)
  59. if err != nil {
  60. return
  61. }
  62. err = odbci.createClass("x10", "/cgitest/x10", nil)
  63. if err != nil {
  64. return
  65. }
  66. err = odbci.createClass("x1001", "/cgitest/x10/x1001", []*fieldinfo{
  67. {fieldname: "uniqueid", datakey: "UNIQUEID", fieldtype: "varchar", keyidx: 1},
  68. {fieldname: "distname", datakey: "BASENAME", fieldtype: "varchar"},
  69. })
  70. if err != nil {
  71. return
  72. }
  73. err = odbci.createClass("x1002", "/cgitest/x10/x1002", []*fieldinfo{
  74. {fieldname: "uniqueid", datakey: "uniqueId", fieldtype: "varchar", keyidx: 1},
  75. {fieldname: "distname", datakey: "distName", fieldtype: "varchar"},
  76. })
  77. if err != nil {
  78. return
  79. }
  80. err = odbci.createClass("x1003", "/cgitest/x10/x1003", []*fieldinfo{
  81. {fieldname: "fromuniqueid", datakey: "FROMUNIQUEID", fieldtype: "varchar", keyidx: 1},
  82. {fieldname: "touniqueid", datakey: "TOUNIQUEID", fieldtype: "varchar", keyidx: 2},
  83. })
  84. if err != nil {
  85. return
  86. }
  87. return
  88. }
  89. // 插入数据
  90. func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (err error) {
  91. ci := odbci.classinfos.GetIFPresent(classname)
  92. if ci == nil {
  93. return merrs.NewError("class not defined " + classname)
  94. }
  95. if ci.insertmql == "" {
  96. return merrs.NewError("class no fields to insert " + classname)
  97. }
  98. values := []any{}
  99. for _, fn := range ci.fieldslist {
  100. fi := ci.fieldinfos[fn]
  101. v := data[fi.datakey]
  102. values = append(values, v)
  103. }
  104. _, err = odbci.client.Query(ci.insertmql, values...).Do()
  105. if err != nil {
  106. return
  107. }
  108. atomic.AddInt64(&ci.insertcount, 1)
  109. ci.mutex.Lock()
  110. if time.Since(ci.lastlogtime) > 5*time.Second && ci.lastlogicount != ci.insertcount {
  111. ci.lastlogtime = time.Now()
  112. ci.lastlogicount = ci.insertcount
  113. logger.Info("class", ci.classname, "import", ci.insertcount, "records")
  114. }
  115. ci.mutex.Unlock()
  116. return
  117. }
  118. func (odbci *ODBCImporter) done() {
  119. odbci.classinfos.Fetch(func(cn string, ci *classinfo) bool {
  120. ci.mutex.Lock()
  121. if ci.lastlogicount != ci.insertcount {
  122. ci.lastlogtime = time.Now()
  123. ci.lastlogicount = ci.insertcount
  124. logger.Info("class", ci.classname, "import", ci.insertcount, "records")
  125. }
  126. ci.mutex.Unlock()
  127. return true
  128. })
  129. }
  130. func (odbci *ODBCImporter) alldone() {
  131. odbci.classinfos.Fetch(func(cn string, ci *classinfo) bool {
  132. ci.mutex.Lock()
  133. if ci.insertcount != 0 {
  134. ci.lastlogtime = time.Now()
  135. ci.lastlogicount = ci.insertcount
  136. logger.Info("class", ci.classname, "import", ci.insertcount, "records")
  137. }
  138. ci.mutex.Unlock()
  139. return true
  140. })
  141. }
  142. func (odbci *ODBCImporter) reload() error {
  143. _, e := odbci.client.Query(`delete from "/cgitest/x10/x1001" with version`).Do()
  144. _ = e
  145. _, e = odbci.client.Query(`delete from "/cgitest/x10/x1002" with version`).Do()
  146. _ = e
  147. _, e = odbci.client.Query(`delete from "/cgitest/x10/x1003" with version`).Do()
  148. _ = e
  149. _, e = odbci.client.Query(`drop class if exists "/cgitest/x10/x1001"`).Do()
  150. if e != nil {
  151. return e
  152. }
  153. _, e = odbci.client.Query(`drop class if exists "/cgitest/x10/x1002"`).Do()
  154. if e != nil {
  155. return e
  156. }
  157. _, e = odbci.client.Query(`drop class if exists "/cgitest/x10/x1003"`).Do()
  158. if e != nil {
  159. return e
  160. }
  161. _, e = odbci.client.Query(`drop class if exists "/cgitest/x10"`).Do()
  162. if e != nil {
  163. return e
  164. }
  165. return nil
  166. }
  167. // 新建类
  168. func (odbci *ODBCImporter) createClass(nickname, classname string, fieldinfoslist []*fieldinfo) (err error) {
  169. _, err = odbci.classinfos.GetWithNew(classname, func() (ci *classinfo, err error) {
  170. logger.Info("create class " + classname)
  171. fieldinfos := map[string]*fieldinfo{}
  172. fieldslist := []string{}
  173. keyfields := []string{}
  174. mql := `create class if not exists ` + classname + `(`
  175. if len(fieldinfoslist) > 0 {
  176. field_defines := []string{}
  177. for _, fi := range fieldinfoslist {
  178. field_defines = append(field_defines, fi.fieldname+" "+fi.fieldtype)
  179. fieldslist = append(fieldslist, fi.fieldname)
  180. if fi.keyidx > 0 {
  181. for len(keyfields) < fi.keyidx {
  182. keyfields = append(keyfields, "")
  183. }
  184. keyfields[fi.keyidx-1] = fi.fieldname
  185. }
  186. fieldinfos[fi.fieldname] = fi
  187. }
  188. mql += strings.Join(field_defines, ",")
  189. }
  190. if len(keyfields) > 0 {
  191. mql += ", keys(" + strings.Join(keyfields, ",") + ")"
  192. }
  193. mql += `)with namespace="cgitest" and key=manu and nickname='` + nickname + `'`
  194. _, err = odbci.client.Query(mql).Do()
  195. if err != nil {
  196. return
  197. }
  198. var insertmql string
  199. if len(fieldslist) > 0 {
  200. insertmql = `insert into ` + classname + "(" + strings.Join(fieldslist, ",") + ")values(" + strings.Repeat(",?", len(fieldslist))[1:] + ")"
  201. }
  202. ci = &classinfo{
  203. classname: classname,
  204. nickname: nickname,
  205. fieldinfos: fieldinfos,
  206. keyfields: keyfields,
  207. fieldslist: fieldslist,
  208. insertmql: insertmql,
  209. }
  210. return
  211. })
  212. return
  213. }
  214. // 修改类定义
  215. func (odbci *ODBCImporter) alterClass() (err error) {
  216. return
  217. }