123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- package importer
- import (
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "git.wecise.com/wecise/cgimport/odbc"
- "git.wecise.com/wecise/odb-go/odb"
- "git.wecise.com/wecise/util/cmap"
- "git.wecise.com/wecise/util/merrs"
- )
// fieldinfo describes a single field of a class and where its value comes
// from in an incoming data record.
type fieldinfo struct {
	// fieldname is the field name used in the class definition and in the
	// generated insert statement.
	fieldname string
	// fieldtype is the field type used in the class definition.
	fieldtype string
	// keyidx is the 1-based position of the field within the primary key;
	// 0 means the field is not part of the key.
	keyidx int
	// datakey is the key under which the field value is looked up in the
	// data map.
	datakey string
}
- type classinfo struct {
- classname string
- nickname string
- fieldinfos map[string]*fieldinfo
- keyfields []string
- fieldslist []string
- insertmql string
- insertcount int64
- lastlogtime time.Time
- lastlogicount int64
- mutex sync.Mutex
- }
- type ODBCImporter struct {
- client odb.Client
- classinfos cmap.ConcurrentMap[string, *classinfo]
- }
- func NewODBCImporter() *ODBCImporter {
- return &ODBCImporter{
- client: odbc.ODBC(),
- classinfos: cmap.New[string, *classinfo](),
- }
- }
- // 根据数据修正类定义
- func (odbci *ODBCImporter) ReviseClassStruct(data map[string]any) (err error) {
- e := odbci.createClass("cgitest", "/cgitest", nil)
- if e != nil {
- return e
- }
- e = odbci.createClass("x10", "/cgitest/x10", nil)
- if e != nil {
- return e
- }
- e = odbci.createClass("x1001", "/cgitest/x10/x1001", []*fieldinfo{
- {fieldname: "uniqueid", datakey: "UNIQUEID", fieldtype: "varchar", keyidx: 1},
- {fieldname: "distname", datakey: "BASENAME", fieldtype: "varchar"},
- })
- if e != nil {
- return e
- }
- e = odbci.createClass("x1002", "/cgitest/x10/x1002", []*fieldinfo{
- {fieldname: "uniqueid", datakey: "uniqueId", fieldtype: "varchar", keyidx: 1},
- {fieldname: "distname", datakey: "distName", fieldtype: "varchar"},
- })
- if e != nil {
- return e
- }
- e = odbci.createClass("x1003", "/cgitest/x10/x1003", []*fieldinfo{
- {fieldname: "fromuniqueid", datakey: "FROMUNIQUEID", fieldtype: "varchar", keyidx: 1},
- {fieldname: "touniqueid", datakey: "TOUNIQUEID", fieldtype: "varchar", keyidx: 2},
- })
- if e != nil {
- return e
- }
- return
- }
- // 插入数据
- func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (err error) {
- ci := odbci.classinfos.GetIFPresent(classname)
- if ci == nil {
- return merrs.NewError("class not defined " + classname)
- }
- if ci.insertmql == "" {
- return merrs.NewError("class no fields to insert " + classname)
- }
- values := []any{}
- for _, fn := range ci.fieldslist {
- fi := ci.fieldinfos[fn]
- v := data[fi.datakey]
- values = append(values, v)
- }
- _, err = odbci.client.Query(ci.insertmql, values...).Do()
- if err != nil {
- return
- }
- atomic.AddInt64(&ci.insertcount, 1)
- ci.mutex.Lock()
- if time.Since(ci.lastlogtime) > 5*time.Second && ci.lastlogicount != ci.insertcount {
- ci.lastlogtime = time.Now()
- ci.lastlogicount = ci.insertcount
- logger.Info("class", ci.classname, "import", ci.insertcount, "records")
- }
- ci.mutex.Unlock()
- return
- }
- func (odbci *ODBCImporter) done() {
- odbci.classinfos.Fetch(func(cn string, ci *classinfo) bool {
- ci.mutex.Lock()
- ci.lastlogtime = time.Now()
- ci.lastlogicount = ci.insertcount
- logger.Info("class", ci.classname, "import", ci.insertcount, "records")
- ci.mutex.Unlock()
- return true
- })
- }
- func (odbci *ODBCImporter) reload() error {
- _, e := odbci.client.Query(`delete from "/cgitest/x10/x1001" with version`).Do()
- _ = e
- _, e = odbci.client.Query(`delete from "/cgitest/x10/x1002" with version`).Do()
- _ = e
- _, e = odbci.client.Query(`delete from "/cgitest/x10/x1003" with version`).Do()
- _ = e
- _, e = odbci.client.Query(`drop class if exists "/cgitest/x10/x1001"`).Do()
- if e != nil {
- return e
- }
- _, e = odbci.client.Query(`drop class if exists "/cgitest/x10/x1002"`).Do()
- if e != nil {
- return e
- }
- _, e = odbci.client.Query(`drop class if exists "/cgitest/x10/x1003"`).Do()
- if e != nil {
- return e
- }
- _, e = odbci.client.Query(`drop class if exists "/cgitest/x10"`).Do()
- if e != nil {
- return e
- }
- return nil
- }
- // 新建类
- func (odbci *ODBCImporter) createClass(nickname, classname string, fieldinfoslist []*fieldinfo) (err error) {
- _, err = odbci.classinfos.GetWithNew(classname, func() (ci *classinfo, err error) {
- logger.Info("create class " + classname)
- fieldinfos := map[string]*fieldinfo{}
- fieldslist := []string{}
- keyfields := []string{}
- mql := `create class if not exists ` + classname + `(`
- if len(fieldinfoslist) > 0 {
- field_defines := []string{}
- for _, fi := range fieldinfoslist {
- field_defines = append(field_defines, fi.fieldname+" "+fi.fieldtype)
- fieldslist = append(fieldslist, fi.fieldname)
- if fi.keyidx > 0 {
- for len(keyfields) < fi.keyidx {
- keyfields = append(keyfields, "")
- }
- keyfields[fi.keyidx-1] = fi.fieldname
- }
- fieldinfos[fi.fieldname] = fi
- }
- mql += strings.Join(field_defines, ",")
- }
- if len(keyfields) > 0 {
- mql += ", keys(" + strings.Join(keyfields, ",") + ")"
- }
- mql += `)with namespace="cgitest" and key=manu and nickname='` + nickname + `'`
- _, err = odbci.client.Query(mql).Do()
- if err != nil {
- return
- }
- var insertmql string
- if len(fieldslist) > 0 {
- insertmql = `insert into ` + classname + "(" + strings.Join(fieldslist, ",") + ")values(" + strings.Repeat(",?", len(fieldslist))[1:] + ")"
- }
- ci = &classinfo{
- classname: classname,
- nickname: nickname,
- fieldinfos: fieldinfos,
- keyfields: keyfields,
- fieldslist: fieldslist,
- insertmql: insertmql,
- }
- return
- })
- return
- }
- // 修改类定义
- func (odbci *ODBCImporter) alterClass() (err error) {
- return
- }
|