package importer import ( "encoding/json" "strings" "sync" "sync/atomic" "time" "git.wecise.com/wecise/cgimport/odbc" "git.wecise.com/wecise/odb-go/odb" "git.wecise.com/wecise/util/cmap" "git.wecise.com/wecise/util/merrs" ) type fieldinfo struct { fieldname string fieldtype string keyidx int // 主键顺序值,0为非主键 datakey string // 对应数据中的键名 } type classinfo struct { classname string nickname string fieldinfos map[string]*fieldinfo keyfields []string fieldslist []string insertmql string insertcount int64 lastlogtime time.Time lastlogicount int64 mutex sync.Mutex } type ODBCImporter struct { client odb.Client classinfos cmap.ConcurrentMap[string, *classinfo] } func NewODBCImporter() *ODBCImporter { return &ODBCImporter{ client: odbc.ODBC(), classinfos: cmap.New[string, *classinfo](), } } // 根据数据修正类定义 func (odbci *ODBCImporter) ReviseClassStruct(data map[string]any) (classname string, err error) { switch { case data["uniqueId"] != nil: classname = "/cgitest/x10/x1002" case data["UNIQUEID"] != nil: classname = "/cgitest/x10/x1001" case data["FROMUNIQUEID"] != nil: classname = "/cgitest/x10/x1003" default: bs, e := json.MarshalIndent(data, "", " ") if e != nil { err = e return } err = merrs.NewError("no mapping classname", merrs.SSMaps{{"data": string(bs)}}) } err = odbci.createClass("cgitest", "/cgitest", nil) if err != nil { return } err = odbci.createClass("x10", "/cgitest/x10", nil) if err != nil { return } err = odbci.createClass("x1001", "/cgitest/x10/x1001", []*fieldinfo{ {fieldname: "uniqueid", datakey: "UNIQUEID", fieldtype: "varchar", keyidx: 1}, {fieldname: "distname", datakey: "BASENAME", fieldtype: "varchar"}, }) if err != nil { return } err = odbci.createClass("x1002", "/cgitest/x10/x1002", []*fieldinfo{ {fieldname: "uniqueid", datakey: "uniqueId", fieldtype: "varchar", keyidx: 1}, {fieldname: "distname", datakey: "distName", fieldtype: "varchar"}, }) if err != nil { return } err = odbci.createClass("x1003", "/cgitest/x10/x1003", []*fieldinfo{ {fieldname: "fromuniqueid", datakey: "FROMUNIQUEID", fieldtype: "varchar", keyidx: 1}, {fieldname: "touniqueid", datakey: "TOUNIQUEID", fieldtype: "varchar", keyidx: 2}, }) if err != nil { return } return } // 插入数据 func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (err error) { ci := odbci.classinfos.GetIFPresent(classname) if ci == nil { return merrs.NewError("class not defined " + classname) } if ci.insertmql == "" { return merrs.NewError("class no fields to insert " + classname) } values := []any{} for _, fn := range ci.fieldslist { fi := ci.fieldinfos[fn] v := data[fi.datakey] values = append(values, v) } _, err = odbci.client.Query(ci.insertmql, values...).Do() if err != nil { return } atomic.AddInt64(&ci.insertcount, 1) ci.mutex.Lock() if time.Since(ci.lastlogtime) > 5*time.Second && ci.lastlogicount != ci.insertcount { ci.lastlogtime = time.Now() ci.lastlogicount = ci.insertcount logger.Info("class", ci.classname, "import", ci.insertcount, "records") } ci.mutex.Unlock() return } func (odbci *ODBCImporter) done() { odbci.classinfos.Fetch(func(cn string, ci *classinfo) bool { ci.mutex.Lock() if ci.lastlogicount != ci.insertcount { ci.lastlogtime = time.Now() ci.lastlogicount = ci.insertcount logger.Info("class", ci.classname, "import", ci.insertcount, "records") } ci.mutex.Unlock() return true }) } func (odbci *ODBCImporter) alldone() { odbci.classinfos.Fetch(func(cn string, ci *classinfo) bool { ci.mutex.Lock() if ci.insertcount != 0 { ci.lastlogtime = time.Now() ci.lastlogicount = ci.insertcount logger.Info("class", ci.classname, "import", ci.insertcount, "records") } ci.mutex.Unlock() return true }) } func (odbci *ODBCImporter) reload() error { _, e := odbci.client.Query(`delete from "/cgitest/x10/x1001" with version`).Do() _ = e _, e = odbci.client.Query(`delete from "/cgitest/x10/x1002" with version`).Do() _ = e _, e = odbci.client.Query(`delete from "/cgitest/x10/x1003" with version`).Do() _ = e _, e = odbci.client.Query(`drop class if exists "/cgitest/x10/x1001"`).Do() if e != nil { return e } _, e = odbci.client.Query(`drop class if exists "/cgitest/x10/x1002"`).Do() if e != nil { return e } _, e = odbci.client.Query(`drop class if exists "/cgitest/x10/x1003"`).Do() if e != nil { return e } _, e = odbci.client.Query(`drop class if exists "/cgitest/x10"`).Do() if e != nil { return e } return nil } // 新建类 func (odbci *ODBCImporter) createClass(nickname, classname string, fieldinfoslist []*fieldinfo) (err error) { _, err = odbci.classinfos.GetWithNew(classname, func() (ci *classinfo, err error) { logger.Info("create class " + classname) fieldinfos := map[string]*fieldinfo{} fieldslist := []string{} keyfields := []string{} mql := `create class if not exists ` + classname + `(` if len(fieldinfoslist) > 0 { field_defines := []string{} for _, fi := range fieldinfoslist { field_defines = append(field_defines, fi.fieldname+" "+fi.fieldtype) fieldslist = append(fieldslist, fi.fieldname) if fi.keyidx > 0 { for len(keyfields) < fi.keyidx { keyfields = append(keyfields, "") } keyfields[fi.keyidx-1] = fi.fieldname } fieldinfos[fi.fieldname] = fi } mql += strings.Join(field_defines, ",") } if len(keyfields) > 0 { mql += ", keys(" + strings.Join(keyfields, ",") + ")" } mql += `)with namespace="cgitest" and key=manu and nickname='` + nickname + `'` _, err = odbci.client.Query(mql).Do() if err != nil { return } var insertmql string if len(fieldslist) > 0 { insertmql = `insert into ` + classname + "(" + strings.Join(fieldslist, ",") + ")values(" + strings.Repeat(",?", len(fieldslist))[1:] + ")" } ci = &classinfo{ classname: classname, nickname: nickname, fieldinfos: fieldinfos, keyfields: keyfields, fieldslist: fieldslist, insertmql: insertmql, } return }) return } // 修改类定义 func (odbci *ODBCImporter) alterClass() (err error) { return }