package importer import ( "strings" "sync" "sync/atomic" "time" "git.wecise.com/wecise/cgimport/odbc" "git.wecise.com/wecise/odb-go/odb" "git.wecise.com/wecise/util/cmap" "git.wecise.com/wecise/util/merrs" ) type fieldinfo struct { fieldname string fieldtype string keyidx int // 主键顺序值,0为非主键 datakey string // 对应数据中的键名 } type classinfo struct { classname string nickname string fieldinfos map[string]*fieldinfo keyfields []string fieldslist []string insertmql string insertcount int64 lastlogtime time.Time mutex sync.Mutex } type ODBCImporter struct { client odb.Client classinfos cmap.ConcurrentMap[string, *classinfo] } func NewODBCImporter() *ODBCImporter { return &ODBCImporter{ client: odbc.ODBC(), classinfos: cmap.New[string, *classinfo](), } } // 根据数据修正类定义 func (odbci *ODBCImporter) ReviseClassStruct(data map[string]any) (err error) { e := odbci.createClass("cgitest", "/cgitest", nil) if e != nil { return e } e = odbci.createClass("x10", "/cgitest/x10", []*fieldinfo{ {fieldname: "uniqueid", datakey: "uniqueId", fieldtype: "varchar", keyidx: 1}, }) if e != nil { return e } e = odbci.createClass("x1001", "/cgitest/x10/x1001", []*fieldinfo{ {fieldname: "uniqueid", datakey: "uniqueId", fieldtype: "varchar", keyidx: 1}, {fieldname: "distname", datakey: "distName", fieldtype: "varchar"}, }) if e != nil { return e } e = odbci.createClass("x1002", "/cgitest/x10/x1002", []*fieldinfo{ {fieldname: "uniqueid", datakey: "uniqueId", fieldtype: "varchar", keyidx: 1}, {fieldname: "distname", datakey: "distName", fieldtype: "varchar"}, }) if e != nil { return e } return } // 插入数据 func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (err error) { if data["uniqueId"] == nil { // TODO 开发期只导入部分数据 return } ci := odbci.classinfos.GetIFPresent(classname) if ci == nil { return merrs.NewError("class not defined " + classname) } if ci.insertmql == "" { return merrs.NewError("class no fields to insert " + classname) } values := []any{} for _, fn := range ci.fieldslist { fi := ci.fieldinfos[fn] v := data[fi.datakey] values = append(values, v) } _, err = odbci.client.Query(ci.insertmql, values...).Do() if err != nil { return } atomic.AddInt64(&ci.insertcount, 1) ci.mutex.Lock() if time.Since(ci.lastlogtime) > 5*time.Second { ci.lastlogtime = time.Now() logger.Info("class", ci.classname, "import", ci.insertcount, "records") } ci.mutex.Unlock() return } // 新建类 func (odbci *ODBCImporter) createClass(nickname, classname string, fieldinfoslist []*fieldinfo) (err error) { _, err = odbci.classinfos.GetWithNew(classname, func() (ci *classinfo, err error) { logger.Info("create class " + classname) fieldinfos := map[string]*fieldinfo{} fieldslist := []string{} keyfields := []string{} mql := `create class if not exists ` + classname + `(` if len(fieldinfoslist) > 0 { field_defines := []string{} for _, fi := range fieldinfoslist { field_defines = append(field_defines, fi.fieldname+" "+fi.fieldtype) fieldslist = append(fieldslist, fi.fieldname) if fi.keyidx > 0 { for len(keyfields) < fi.keyidx { keyfields = append(keyfields, "") } keyfields[fi.keyidx-1] = fi.fieldname } fieldinfos[fi.fieldname] = fi } mql += strings.Join(field_defines, ",") } if len(keyfields) > 0 { mql += ", keys(" + strings.Join(keyfields, ",") + ")" } mql += `)with namespace="cgitest" and key=manu and nickname='` + nickname + `'` _, err = odbci.client.Query(mql).Do() if err != nil { return } var insertmql string if len(fieldslist) > 0 { insertmql = `insert into ` + classname + "(" + strings.Join(fieldslist, ",") + ")values(" + strings.Repeat(",?", len(fieldslist))[1:] + ")" } ci = &classinfo{ classname: classname, nickname: nickname, fieldinfos: fieldinfos, keyfields: keyfields, fieldslist: fieldslist, insertmql: insertmql, } return }) return } // 修改类定义 func (odbci *ODBCImporter) alterClass() (err error) { return }