package importer

import (
	"encoding/json"
	"fmt"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.wecise.com/wecise/cgimport/odbc"
	"git.wecise.com/wecise/odb-go/odb"
	"git.wecise.com/wecise/util/cast"
	"git.wecise.com/wecise/util/cmap"
	"git.wecise.com/wecise/util/merrs"
)

// classdatainfo pairs a class definition with the progress counters used for
// periodic import logging.
type classdatainfo struct {
	*classinfo
	// insertcount is incremented with atomic.AddInt64 outside the mutex, so
	// every read must go through atomic.LoadInt64.
	insertcount int64
	// lastlogtime and lastlogicount are only accessed while holding mutex.
	lastlogtime   time.Time
	lastlogicount int64
	mutex         sync.Mutex
}

// classdatainfos maps a class name to its import bookkeeping entry.
var classdatainfos = cmap.New[string, *classdatainfo]()

// ODBCImporter creates classes and inserts records through an ODB client.
// The client is nil when the current DevPhase enables neither class creation
// nor data insertion; in that case all database calls are skipped.
type ODBCImporter struct {
	client odb.Client
}

// NewODBCImporter returns an importer. The ODB client is attached only when
// odbc.DevPhase includes DP_CREATECLASS or DP_INSERTDATA.
func NewODBCImporter() *ODBCImporter {
	odbci := &ODBCImporter{}
	if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 {
		odbci.client = odbc.ODBC()
	}
	return odbci
}

// ReviseClassStruct revises the class definitions according to the data: it
// registers a classdatainfo for every name in classnames (running the class's
// create statement when a client is attached) and then runs every statement in
// createedgemqls.
//
// It returns an error when a classinfo is missing, when registering a class
// fails, or when an edge statement fails with anything other than an
// "already exist" error.
func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
	for _, classname := range classnames {
		ci := classinfos.GetIFPresent(classname)
		if ci == nil {
			return merrs.NewError("classinfo not found " + classname)
		}
		// NOTE(review): GetWithNew presumably invokes the constructor only when
		// the key is absent - confirm against the cmap package.
		_, err = classdatainfos.GetWithNew(classname, func() (cdi *classdatainfo, err error) {
			logger.Info("create class " + ci.classname)
			if odbci.client != nil {
				_, err = odbci.client.Query(ci.createmql).Do()
				if err != nil {
					return
				}
			}
			cdi = &classdatainfo{classinfo: ci}
			return
		})
		// Stop at the first failure; previously the error was overwritten by the
		// next iteration and only the last class's error could be reported.
		if err != nil {
			return err
		}
	}
	if odbci.client == nil {
		return nil
	}
	for _, createedgemql := range createedgemqls {
		_, e := odbci.client.Query(createedgemql).Do()
		// An edge that already exists is not treated as a failure.
		if e != nil && !strings.Contains(e.Error(), "already exist") {
			return e
		}
	}
	return nil
}

// InsertEdge is currently a no-op and always returns nil.
func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) {
	return
}

// InsertData inserts one record of class classname.
//
// Values are assembled in the order of the class's fieldslist:
//   - a field without a datakey receives a JSON object holding every entry of
//     data whose key is not mapped in datakey_fieldinfos;
//   - a "set" field is converted with cast.ToStringSlice;
//   - a "timestamp" field is parsed with layout "2006-01-02-15.04.05.000000"
//     and re-formatted as "2006-01-02 15:04:05.000000";
//   - any other field is passed through unchanged.
//
// It returns an error when the class is not registered, has no insert
// statement, a value cannot be converted, or the insert query fails. After a
// successful insert the per-class counter is incremented and progress is
// logged at most once every five seconds.
func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (err error) {
	cdi := classdatainfos.GetIFPresent(classname)
	if cdi == nil {
		return merrs.NewError("class not defined " + classname)
	}
	if cdi.insertmql == "" {
		return merrs.NewError("class no fields to insert " + classname)
	}
	values := make([]any, 0, len(cdi.fieldslist))
	for _, fn := range cdi.fieldslist {
		fi := cdi.fieldinfos[fn]
		if fi.datakey == "" {
			// Collect all entries that have no dedicated field into one JSON value.
			td := map[string]any{}
			for k, v := range data {
				if cdi.datakey_fieldinfos[k] == nil {
					td[k] = v
				}
			}
			tdbs, e := json.Marshal(td)
			if e != nil {
				return merrs.NewError(e)
			}
			values = append(values, string(tdbs))
			continue
		}
		v := data[fi.datakey]
		switch fi.fieldtype {
		case "set":
			v = cast.ToStringSlice(v)
		case "timestamp":
			tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
			if e != nil {
				return merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
			}
			v = tv.Format("2006-01-02 15:04:05.000000")
		}
		values = append(values, v)
	}
	if odbci.client != nil {
		_, err = odbci.client.Query(cdi.insertmql, values...).Do()
		if err != nil {
			err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.insertmql}, {"values": fmt.Sprint(values)}})
			logger.Error(err)
			return
		}
	}
	atomic.AddInt64(&cdi.insertcount, 1)

	cdi.mutex.Lock()
	// The counter is bumped by other goroutines without holding the mutex, so
	// take one atomic snapshot and use it for both the comparison and the log.
	count := atomic.LoadInt64(&cdi.insertcount)
	if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != count {
		cdi.lastlogtime = time.Now()
		cdi.lastlogicount = count
		logger.Info("class", cdi.classname, "import", count, "records")
	}
	cdi.mutex.Unlock()
	return
}

// done logs the record count of every class whose count changed since it was
// last logged.
func (odbci *ODBCImporter) done() {
	classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
		cdi.mutex.Lock()
		count := atomic.LoadInt64(&cdi.insertcount)
		if cdi.lastlogicount != count {
			cdi.lastlogtime = time.Now()
			cdi.lastlogicount = count
			logger.Info("class", cdi.classname, "import", count, "records")
		}
		cdi.mutex.Unlock()
		return true
	})
}

// alldone logs the record count of every class that has at least one inserted
// record, regardless of whether that count was already logged.
func (odbci *ODBCImporter) alldone() {
	classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
		cdi.mutex.Lock()
		count := atomic.LoadInt64(&cdi.insertcount)
		if count != 0 {
			cdi.lastlogtime = time.Now()
			cdi.lastlogicount = count
			logger.Info("class", cdi.classname, "import", count, "records")
		}
		cdi.mutex.Unlock()
		return true
	})
}

// reload deletes the data of, and then drops, every known class, walking
// classnames in reverse order. Each class gets up to three attempts; the error
// of the last failed drop is returned. It does nothing when no client is
// attached.
func (odbci *ODBCImporter) reload() error {
	if odbci.client == nil {
		return nil
	}
	for i := len(classnames) - 1; i >= 0; i-- {
		ci := classinfos.GetIFPresent(classnames[i])
		if ci == nil {
			continue
		}
		for retry := 2; retry >= 0; retry-- {
			// Best effort: the delete result is deliberately ignored, only the
			// outcome of the drop decides success.
			_, _ = odbci.client.Query(`delete from "` + ci.classfullname + `" with version`).Do()
			_, e := odbci.client.Query(`drop class if exists "` + ci.classfullname + `"`).Do()
			if e == nil {
				// Success: do not repeat the delete/drop for this class.
				break
			}
			if retry == 0 {
				return e
			}
		}
	}
	return nil
}