123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- package importer
- import (
- "encoding/json"
- "fmt"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "git.wecise.com/wecise/cgimport/odbc"
- "git.wecise.com/wecise/odb-go/odb"
- "git.wecise.com/wecise/util/cast"
- "git.wecise.com/wecise/util/cmap"
- "git.wecise.com/wecise/util/merrs"
- )
- type classdatainfo struct {
- *classinfo
- insertcount int64
- lastlogtime time.Time
- lastlogicount int64
- mutex sync.Mutex
- }
- var classdatainfos = cmap.New[string, *classdatainfo]()
- type ODBCImporter struct {
- client odb.Client
- }
- func NewODBCImporter() *ODBCImporter {
- odbci := &ODBCImporter{}
- if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 {
- odbci.client = odbc.ODBC()
- }
- return odbci
- }
- // 根据数据修正类定义
- func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
- for _, classname := range classnames {
- ci := classinfos.GetIFPresent(classname)
- if ci == nil {
- return merrs.NewError("classinfo not found " + classname)
- }
- _, err = classdatainfos.GetWithNew(classname, func() (cdi *classdatainfo, err error) {
- logger.Info("create class " + ci.classname)
- if odbci.client != nil {
- _, err = odbci.client.Query(ci.createmql).Do()
- if err != nil {
- return
- }
- }
- cdi = &classdatainfo{classinfo: ci}
- return
- })
- }
- if odbci.client != nil {
- for _, createedgemql := range createedgemqls {
- _, e := odbci.client.Query(createedgemql).Do()
- if e != nil && !strings.Contains(e.Error(), "already exist") {
- err = e
- return
- }
- }
- }
- return
- }
- func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) {
- return
- }
- // 插入数据
- func (odbci *ODBCImporter) InsertData(classname string, data map[string]any) (err error) {
- cdi := classdatainfos.GetIFPresent(classname)
- if cdi == nil {
- return merrs.NewError("class not defined " + classname)
- }
- if cdi.insertmql == "" {
- return merrs.NewError("class no fields to insert " + classname)
- }
- values := []any{}
- for _, fn := range cdi.fieldslist {
- fi := cdi.fieldinfos[fn]
- if fi.datakey == "" {
- td := map[string]any{}
- for k, v := range data {
- if cdi.datakey_fieldinfos[k] == nil {
- td[k] = v
- }
- }
- tdbs, e := json.Marshal(td)
- if e != nil {
- return merrs.NewError(e)
- }
- values = append(values, string(tdbs))
- continue
- }
- v := data[fi.datakey]
- switch fi.fieldtype {
- case "set<varchar>":
- v = cast.ToStringSlice(v)
- case "timestamp":
- tv, e := cast.ToDateTimeE(v, "2006-01-02-15.04.05.000000")
- if e != nil {
- return merrs.NewError(fmt.Sprint("can't parse datetime value '", v, "'"))
- }
- v = tv.Format("2006-01-02 15:04:05.000000")
- }
- values = append(values, v)
- }
- if odbci.client != nil {
- _, err = odbci.client.Query(cdi.insertmql, values...).Do()
- if err != nil {
- err = merrs.NewError(err, merrs.SSMaps{{"mql": cdi.insertmql}, {"values": fmt.Sprint(values)}})
- logger.Error(err)
- return
- }
- }
- atomic.AddInt64(&cdi.insertcount, 1)
- cdi.mutex.Lock()
- if time.Since(cdi.lastlogtime) > 5*time.Second && cdi.lastlogicount != cdi.insertcount {
- cdi.lastlogtime = time.Now()
- cdi.lastlogicount = cdi.insertcount
- logger.Info("class", cdi.classname, "import", cdi.insertcount, "records")
- }
- cdi.mutex.Unlock()
- return
- }
- func (odbci *ODBCImporter) done() {
- classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
- cdi.mutex.Lock()
- if cdi.lastlogicount != cdi.insertcount {
- cdi.lastlogtime = time.Now()
- cdi.lastlogicount = cdi.insertcount
- logger.Info("class", cdi.classname, "import", cdi.insertcount, "records")
- }
- cdi.mutex.Unlock()
- return true
- })
- }
- func (odbci *ODBCImporter) alldone() {
- classdatainfos.Fetch(func(cn string, cdi *classdatainfo) bool {
- cdi.mutex.Lock()
- if cdi.insertcount != 0 {
- cdi.lastlogtime = time.Now()
- cdi.lastlogicount = cdi.insertcount
- logger.Info("class", cdi.classname, "import", cdi.insertcount, "records")
- }
- cdi.mutex.Unlock()
- return true
- })
- }
- func (odbci *ODBCImporter) reload() error {
- if odbci.client != nil {
- for i := len(classnames) - 1; i >= 0; i-- {
- classname := classnames[i]
- ci := classinfos.GetIFPresent(classname)
- if ci == nil {
- continue
- }
- for retry := 2; retry >= 0; retry-- {
- _, e := odbci.client.Query(`delete from "` + ci.classfullname + `" with version`).Do()
- _ = e
- _, e = odbci.client.Query(`drop class if exists "` + ci.classfullname + `"`).Do()
- if e != nil {
- if retry > 0 {
- continue
- }
- return e
- }
- }
- }
- }
- return nil
- }
|