package datasync

import (
	"context"
	"fmt"
	"os"
	"regexp"
	"runtime"
	"strings"
	"sync"
	"time"

	"git.wecise.com/wecise/odb-go/dbo"
	"git.wecise.com/wecise/odb-go/odb"
	"git.wecise.com/wecise/odb-go/odbc"
	"gitee.com/wecisecode/util/merrs"
	"gitee.com/wecisecode/util/mfmt"
	"gitee.com/wecisecode/util/rc"
	"github.com/scylladb/go-set/strset"
	"github.com/spf13/cast"
)

var mcfg = odbc.Config
var logger = odbc.Logger

type DataSync struct {
	odbcFrom odb.Client
	odbcTo   odb.Client

	schemaFrom *dbo.Schema
	schemaTo   *dbo.Schema

	fromodbserver string
	fromkeyspace  string
	fromdc        string
	toodbserver   string
	tokeyspace    string
	todc          string
	fromdata      []string
	classmapping  map[string][]string

	datatimesince   time.Duration
	buckettimesince time.Duration

	ctx    context.Context
	cancel context.CancelFunc
	wg     *sync.WaitGroup
	mutex  sync.Mutex
	errs   []error

	ctrlrc     *rc.RoutinesController
	objectrc   *rc.RoutinesController
	bucketrc   *rc.RoutinesController
	syncstatus *SyncStatus
}

func NewDataSync() *DataSync {
	return &DataSync{}
}

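// Configuration keys read by Init (all under the "datasync." prefix):
//
//	datasync.from.odbserver, datasync.from.keyspace, datasync.from.dc    source connection
//	datasync.to.odbserver, datasync.to.keyspace, datasync.to.dc          target connection
//	datasync.from.data                                                   classes or MQL statements to sync
//	datasync.mapping.class                                               source-to-target class mapping
//	datasync.ctrl.threads, datasync.concur.threads, datasync.bucket.threads   concurrency limits
//	datasync.data.time.since (default 365d), datasync.bucket.time.since (default 30d)   how far back to sync
//
// The first four keys are mandatory; Init returns NoConfError when any of them is empty.
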
func (ds *DataSync) Init() (err error) {
	ds.fromodbserver = mcfg.GetString("datasync.from.odbserver")
	ds.fromkeyspace = mcfg.GetString("datasync.from.keyspace")
	ds.toodbserver = mcfg.GetString("datasync.to.odbserver")
	ds.tokeyspace = mcfg.GetString("datasync.to.keyspace")
	if ds.fromodbserver == "" ||
		ds.fromkeyspace == "" ||
		ds.toodbserver == "" ||
		ds.tokeyspace == "" {
		return odbc.NoConfError.New("need configure settings: datasync.from.odbserver, datasync.from.keyspace, datasync.to.odbserver, datasync.to.keyspace")
	}
	ds.fromdc = mcfg.GetString("datasync.from.dc")
	ds.todc = mcfg.GetString("datasync.to.dc")
	ds.fromdata = mcfg.GetStrings("datasync.from.data")
	ds.classmapping = mcfg.GetMapping("datasync.mapping.class")
	ds.odbcFrom, err = odb.NewClient(&odb.Config{
		Keyspace: ds.fromkeyspace,
		Hosts:    strings.Split(ds.fromodbserver, ","),
	})
	if err != nil {
		if strings.Contains(err.Error(), "error: EOF") {
			println("\n!!!should add your ip to odbserver(" + ds.fromodbserver + ") whitelist!!!\n")
			os.Exit(1)
		}
		return merrs.New(err)
	}
	ds.odbcTo, err = odb.NewClient(&odb.Config{
		Keyspace: ds.tokeyspace,
		Hosts:    strings.Split(ds.toodbserver, ","),
	})
	if err != nil {
		if strings.Contains(err.Error(), "error: EOF") {
			println("\n!!!should add your ip to odbserver(" + ds.toodbserver + ") whitelist!!!\n")
			os.Exit(1)
		}
		return merrs.New(err)
	}
	ds.schemaFrom = dbo.NewSchema(ds.odbcFrom)
	ds.schemaTo = dbo.NewSchema(ds.odbcTo)
	ctrlthreads := mcfg.GetInt("datasync.ctrl.threads", runtime.GOMAXPROCS(0))
	ds.ctrlrc = rc.NewRoutinesControllerLimit("", ctrlthreads, ctrlthreads*2)
	concurthreads := mcfg.GetInt("datasync.concur.threads", runtime.GOMAXPROCS(0))
	ds.objectrc = rc.NewRoutinesControllerLimit("", concurthreads, concurthreads*2)
	bucketthreads := mcfg.GetInt("datasync.bucket.threads", runtime.GOMAXPROCS(0))
	ds.bucketrc = rc.NewRoutinesControllerLimit("", bucketthreads, bucketthreads*2)
	ds.datatimesince = mcfg.GetDuration("datasync.data.time.since", "365d")
	ds.buckettimesince = mcfg.GetDuration("datasync.bucket.time.since", "30d")
	return nil
}

func (ds *DataSync) Run() (done <-chan error) {
	logger.Info(mcfg.Info())
	ret := make(chan error, 1)
	key := regexp.MustCompile(`\W`).ReplaceAllString(strings.Split(ds.fromodbserver, ",")[0]+"_"+ds.fromkeyspace+"_"+strings.Split(ds.toodbserver, ",")[0]+"_"+ds.tokeyspace, "_")
	ds.syncstatus = NewSyncStatus(key)
	if !mcfg.GetBool("reload") {
		e := ds.syncstatus.Load()
		if e != nil {
			ret <- e
			return ret
		}
	}
	go ds.run(ret)
	return ret
}

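// Usage sketch: how a caller might wire the pieces together. The wrapper function
// name runDataSyncExample is illustrative and not part of the original package API.
func runDataSyncExample() error {
	ds := NewDataSync()
	if err := ds.Init(); err != nil {
		return err
	}
	// Run starts the sync loop in a goroutine and returns a channel that
	// delivers a single error (nil on success) when the loop finishes.
	return <-ds.Run()
}
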
func (ds *DataSync) run(ret chan error) {
	fromdatas := []string{}
	for _, fromdata := range ds.fromdata {
		fromdata = strings.TrimSpace(fromdata)
		if len(fromdata) > 0 {
			fromdatas = append(fromdatas, fromdata)
		}
	}
	if len(fromdatas) == 0 {
		cis, e := ds.odbcFrom.ClassInfo("/", true)
		if e != nil {
			ret <- e
			return
		}
		for _, ci := range cis {
			fromdatas = append(fromdatas, ci.Fullname)
		}
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ds.ctx = ctx
	ds.cancel = cancel
	logger.Info("resume sync data, from", len(fromdatas), "configure")
	for {
		ds.wg = &sync.WaitGroup{}
		// logger.Trace(1)
		ds.syncstatus.Resume()
		// logger.Trace(2)
		for _, fromdata := range fromdatas {
			mqlfrom := fromdata
			// logger.Trace(3)
			ds.startsyncproc(ds.wg, ds.ctrlrc, func() error {
				logger.Info("sync data, from", mqlfrom)
				return ds.syncdata(mqlfrom)
			})
		}
		ds.wg.Wait()
		ds.syncstatus.WaitSaveDone()
		if len(ds.errs) > 0 {
			e := merrs.New(ds.errs)
			logger.Error(e)
			ret <- e
			return
		}
		logger.Info("total sync data", ds.syncstatus.TotalChunks(), "chunks", ds.syncstatus.TotalRecords(), "records,", "use time:", mfmt.FormatDuration(ds.syncstatus.TotalUseTime()))
		interval := mcfg.GetDuration("datasync.run.interval", 0)
		if interval > 0 {
			time.Sleep(interval)
		} else {
			break
		}
	}
	ret <- nil
}

func (ds *DataSync) startsyncproc(wg *sync.WaitGroup, rc *rc.RoutinesController, proc func() error) {
	wg.Add(1)
	// logger.Trace(4)
	e := rc.ConcurCall(1, func() {
		// logger.Trace(5)
		defer wg.Done()
		e := ds.ctx.Err()
		if e != nil {
			logger.Error(e)
			return
		}
		e = proc()
		if e != nil {
			logger.Error(e)
			if !merrs.ContextError.Contains(e) {
				ds.mutex.Lock()
				ds.errs = append(ds.errs, e)
				ds.mutex.Unlock()
			}
			ds.cancel()
		}
	})
	if e != nil {
		// logger.Trace(6)
		ds.mutex.Lock()
		ds.errs = append(ds.errs, merrs.New(e))
		ds.mutex.Unlock()
		wg.Done()
	}
	// logger.Trace(7)
}

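// Illustrative sketch only: startsyncproc relies on rc.RoutinesController to bound
// concurrency. The limiting idea can be pictured with a plain buffered channel used
// as a semaphore; limitedGo below is a hypothetical stand-in, not the rc package API.
func limitedGo(sem chan struct{}, wg *sync.WaitGroup, proc func()) {
	wg.Add(1)
	sem <- struct{}{} // acquire a slot; blocks once the limit is reached
	go func() {
		defer wg.Done()
		defer func() { <-sem }() // release the slot
		proc()
	}()
}
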
// syncdata synchronizes one block of data.
// mqlfrom can be a class name or a query statement.
func (ds *DataSync) syncdata(mqlfrom string) error {
	// Normalize the input into a query statement.
	logger.Debug("syncdata", mqlfrom)
	mqlfrom = FormatMQL(mqlfrom)
	// Already-completed sync progress.
	fromclass, fields, condition, e := ds.parseMQL(mqlfrom)
	if e != nil {
		return merrs.New(e)
	}
	cifroms, e := ds.schemaFrom.LoadClassinfos(fromclass)
	if e != nil {
		return merrs.New(e)
	}
	logger.Debug("loadClassinfos", fromclass, len(cifroms))
	for _, v := range cifroms {
		cifrom := v
		ds.startsyncproc(ds.wg, ds.objectrc, func() error {
			return ds.syncclassdata(cifrom, fields, condition)
		})
	}
	return nil
}

var reselect = regexp.MustCompile(`(?is)select\s.*`)
var reselectfromclass = regexp.MustCompile(`(?is)select\s+(.*)\s+from\s+(\S+)(?:\s+(.*)\s*)?`)
var commentexprs = regexp.MustCompile(`(?s)\/\*(?:[^\*]|\*+[^\*\/])*\*+\/`)
var commentexprs_2 = regexp.MustCompile(`(?ms)(?:^|\n)\-\-[^\n]*(?:\n|$)`)
var commentexprs_3 = regexp.MustCompile(`(?ms)(?:^|\n)//[^\n]*(?:\n|$)`)

func FormatMQL(mql string) string {
	mql = commentexprs.ReplaceAllString(mql, "")
	mql = commentexprs_2.ReplaceAllString(mql, "")
	mql = commentexprs_3.ReplaceAllString(mql, "")
	mql = strings.TrimSpace(mql)
	if !reselect.MatchString(mql) {
		mql = "select * from " + mql
	}
	return mql
}

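// Example sketch (class names are illustrative): FormatMQL strips /* */, -- and //
// comments and wraps a bare class name in a full select statement.
func exampleFormatMQL() {
	fmt.Println(FormatMQL("/example/host"))                         // select * from /example/host
	fmt.Println(FormatMQL("-- note\nselect id from /example/host")) // select id from /example/host
}
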
func (ds *DataSync) parseMQL(mql string) (class string, fields, condition string, err error) {
	logger.Debug("parseMQL", mql)
	defer func() {
		logger.Debug("parseMQL return", mql, class, fields, condition)
	}()
	selstmt := reselectfromclass.FindStringSubmatch(mql)
	if len(selstmt) < 3 {
		return "", "", "", merrs.New("from.data select statement error", []string{"mql", mql})
	}
	fields = selstmt[1]
	class = selstmt[2]
	condition = selstmt[3]
	if class == "" {
		return "", "", "", merrs.New("from.data select statement error", []string{"mql", mql})
	}
	return class, fields, condition, nil
}

var rewhere = regexp.MustCompile(`(?is)\swhere\s(.*)`)

func mqlAddVtimeRange(mql, sbeginvtime, sendvtime string) string {
	mqlseg := mql
	segwhere := fmt.Sprint(" where vtime>='", sbeginvtime, "' and vtime<'", sendvtime, "'")
	if rewhere.MatchString(mql) {
		mqlseg = rewhere.ReplaceAllString(mqlseg, segwhere+" and ($1)")
	} else {
		mqlseg = mqlseg + segwhere
	}
	return mqlseg
}

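// Example sketch (values are illustrative): mqlAddVtimeRange injects a vtime range
// right after "where", and-ing it with any existing condition.
func exampleMqlAddVtimeRange() {
	fmt.Println(mqlAddVtimeRange("select * from /example/host",
		"2024-01-01 00:00:00", "2024-01-02 00:00:00"))
	// select * from /example/host where vtime>='2024-01-01 00:00:00' and vtime<'2024-01-02 00:00:00'
	fmt.Println(mqlAddVtimeRange("select * from /example/host where name='a'",
		"2024-01-01 00:00:00", "2024-01-02 00:00:00"))
	// select * from /example/host where vtime>='2024-01-01 00:00:00' and vtime<'2024-01-02 00:00:00' and (name='a')
}
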
func (ds *DataSync) syncclassdata(cifrom *dbo.ClassInfoHelper, fields, condition string) error {
	logger.Debug("syncclassdata", cifrom.Fullname, fields, condition)
	denyclass := strset.New(mcfg.GetStrings("datasync.deny.class")...)
	logger.Debug("denyclass", denyclass)
	if denyclass.Has(cifrom.Fullname) || denyclass.Has(cifrom.Fullname+"/") {
		logger.Debug(cifrom.Fullname, "in denyclass")
		return nil
	}
	checkdenyclass := cifrom
	for checkdenyclass != nil {
		if denyclass.Has(checkdenyclass.BaseClassFullname() + "/") {
			logger.Debug(checkdenyclass.BaseClassFullname(), "in denyclass")
			return nil
		}
		checkdenyclass = ds.schemaFrom.GetClassInfo(checkdenyclass.BaseClassFullname())
	}
	// Make sure the target class has been created.
	toclass := ds.classmapping[cifrom.Fullname]
	if len(toclass) > 1 {
		return merrs.New("datasync.mapping.class config error, should map to one class only", []string{"fromclass", cifrom.Fullname, "toclass", fmt.Sprint(toclass)})
	}
	if len(toclass) == 0 || toclass[0] == "" {
		toclass = []string{cifrom.Fullname}
	}
	cito, e := ds.assureToClass(toclass[0], cifrom)
	if e != nil {
		return merrs.New(e)
	}
	//
	mqlfrom := "select " + fields + " from " + cifrom.Fullname
	if condition != "" {
		mqlfrom += " " + condition
	}
	dc := ds.syncstatus.DoneCount(mqlfrom)
	isrunning := <-dc.isrunning
	if isrunning {
		dc.isrunning <- isrunning
		return nil
	}
	dc.isrunning <- true
	defer func() {
		<-dc.isrunning
		dc.isrunning <- false
	}()
	recordscount := dc.RecordsCount
	sfromvtime := dc.FromVtime
	slastdatavtime := dc.LastDataVtime
	slastsyncvtime := dc.LastSyncVtime
	// Fetch data in time segments.
	fromvtime, _ := time.Parse("2006-01-02 15:04:05", sfromvtime)
	lastdatavtime, _ := time.Parse("2006-01-02 15:04:05.000000", slastdatavtime)
	lastsyncvtime, _ := time.Parse("2006-01-02 15:04:05", slastsyncvtime)
	sincevtime := time.Now().Add(-ds.datatimesince)
	if fromvtime.Before(sincevtime) || lastsyncvtime.Before(fromvtime) || lastdatavtime.Before(fromvtime) {
		ssincevtime := sincevtime.Format("2006-01-02 00:00:00")
		firstdatavtime := time.Now()
		sfirstdatavtime := firstdatavtime.Format("2006-01-02 15:04:05.000000")
		for i := 0; ; i++ {
			mqlseg := mqlAddVtimeRange(mqlfrom, ssincevtime, sfirstdatavtime)
			mqlchunk := mqlseg + " order by vtime limit 1"
			logger.Debug("check first data vtime:", ds.odbcFrom.Config().Keyspace, mqlchunk)
			// Read source data.
			r, e := ds.odbcFrom.Query(mqlchunk).WithContext(ds.ctx).Do()
			if e != nil {
				return merrs.New(e)
			}
			if len(r.Data) == 0 {
				// No more data.
				if i == 0 {
					logger.Info("check first data vtime:", ds.odbcFrom.Config().Keyspace, cifrom.Fullname, "no data")
					ds.syncstatus.RemoveDoneCount(mqlfrom)
					return nil
				}
				logger.Info("check first data vtime:", ds.odbcFrom.Config().Keyspace, cifrom.Fullname, sfirstdatavtime, mqlfrom)
				break
			}
			firstdata := r.Data[0]
			firstdatavtime = firstdata["vtime"].(time.Time)
			sfirstdatavtime = firstdatavtime.Format("2006-01-02 15:04:05.000000")
			logger.Debug("check first data vtime:", ds.odbcFrom.Config().Keyspace, firstdata["class"], firstdata["id"], sfirstdatavtime)
		}
		fromvtime = firstdatavtime
		lastdatavtime = firstdatavtime
		lastsyncvtime = firstdatavtime
		sfromvtime = fromvtime.Format("2006-01-02 15:04:05")
		slastdatavtime = lastdatavtime.Format("2006-01-02 15:04:05.000000")
		slastsyncvtime = lastsyncvtime.Format("2006-01-02 15:04:05")
		recordscount = 0
		// Initialize the DoneCount progress info.
		dc.RecordsCount = recordscount
		dc.FromVtime = sfromvtime
		dc.LastDataVtime = slastdatavtime
		dc.LastSyncVtime = slastsyncvtime
	}
	// Continue syncing the related bucket data.
	e = ds.syncbucketdatacontinue(cifrom, cito, mqlfrom)
	if e != nil {
		return merrs.New(e)
	}
	// Continue syncing the object data.
	interval := mcfg.GetDuration("datasync.run.interval", "1m")
	lastsyncvtime = lastsyncvtime.Add(-interval)
	if lastdatavtime.After(lastsyncvtime) {
		lastsyncvtime = lastdatavtime
	}
	lastsyncvtime = lastsyncvtime.Add(-mcfg.GetDuration("datasync.run.overtime", "30s"))
	nextvtime := lastsyncvtime
	run := true
	for run {
		nextvtime = lastsyncvtime.Add(mcfg.GetDuration("datasync.pagetime", "1d"))
		if time.Now().Before(nextvtime) {
			nextvtime = time.Now()
			run = false
		}
		snextvtime := nextvtime.Format("2006-01-02 15:04:05")
		mqlseg := mqlAddVtimeRange(mqlfrom, slastsyncvtime, snextvtime)
		offset := 0
		for {
			mqlchunk := mqlseg + fmt.Sprint(" limit ", offset, ",", mcfg.GetInt("datasync.pagesize", 50))
			logger.Debug(mqlchunk)
			// Read source data.
			r, e := ds.odbcFrom.Query(mqlchunk).WithContext(ds.ctx).Do()
			if e != nil {
				return merrs.New(e)
			}
			if len(r.Data) == 0 {
				// No more data.
				break
			}
			// Write target data.
			for _, data := range r.Data {
				e = ds.insertData(mqlfrom, cifrom, cito, data)
				if e != nil {
					return e
				}
				datavtime := data["vtime"].(time.Time)
				if lastdatavtime.Before(datavtime) {
					lastdatavtime = datavtime
				}
			}
			offset += len(r.Data)
		}
		lastsyncvtime = nextvtime
		slastsyncvtime = lastsyncvtime.Format("2006-01-02 15:04:05")
		recordscount += int64(offset)
		//
		dc.RecordsCount = recordscount
		dc.LastDataVtime = lastdatavtime.Format("2006-01-02 15:04:05.000000")
		dc.LastSyncVtime = slastsyncvtime
		ds.syncstatus.Save(mqlfrom, dc)
	}
	logger.Info("total sync data:", mqlfrom, "vtime:", dc.FromVtime, "~", dc.LastSyncVtime, "records:", dc.RecordsCount, "lastvtime:", dc.LastDataVtime)
	return nil
}

func (ds *DataSync) insertData(mqlfrom string, cifrom, cito *dbo.ClassInfoHelper, data map[string]any) error {
	if data["class"] != cito.Fullname {
		data["class"] = cito.Fullname
	}
	logger.Debug("insertData", data["class"], data["id"], data["vtime"])
	vals := []any{}
	for _, fn := range cito.Fieldslist {
		if cito.Fieldinfos[fn].Fieldtype == "bucket" {
			vals = append(vals, nil)
			continue
		}
		v := data[fn]
		if v == nil {
			// If the prefixed field name has no value, retry with the part after ":".
			i := strings.Index(fn, ":")
			if i >= 0 {
				fn = fn[i+1:]
			}
			v = data[fn]
		}
		vals = append(vals, v)
	}
	_, e := ds.odbcTo.Query(cito.Insertmql, vals...).WithContext(ds.ctx).Do()
	if e != nil {
		return merrs.New(e, []string{"mql", cito.Insertmql, "vals", fmt.Sprint(data)})
	}
	e = ds.syncbucketdatanew(cifrom, cito, mqlfrom, data)
	if e != nil {
		return e
	}
	return nil
}

func (ds *DataSync) syncbucketdatanew(cifrom, cito *dbo.ClassInfoHelper, mqlfrom string, data map[string]any) error {
	if ds.buckettimesince == 0 {
		return nil
	}
	for _, bf := range cifrom.BucketFields {
		bf := bf // pin the loop variable for the asynchronous closure (matters before Go 1.22)
		if _, has := data[bf]; has {
			ds.startsyncproc(ds.wg, ds.bucketrc, func() error {
				oid := data["id"].(string)
				buckettype := cast.ToString(cifrom.Fieldinfos[bf].Fieldoption["type"])
				key := buckettype + ":" + cifrom.Fullname + ":" + bf + "[" + oid + "]"
				dc := ds.syncstatus.DoneCount(key)
				dc.BucketClass = cifrom.Fullname
				dc.BucketField = bf
				dc.BucketObjID = oid
				return ds.syncbucketdata(mqlfrom, cifrom, cito, key, dc)
			})
		}
	}
	return nil
}

func (ds *DataSync) syncbucketdatacontinue(cifrom, cito *dbo.ClassInfoHelper, mqlfrom string) error {
	if ds.buckettimesince == 0 {
		return nil
	}
	mqlchunk := mqlfrom + " limit 1"
	logger.Debug("check data fields:", ds.odbcFrom.Config().Keyspace, mqlchunk)
	// Read source data.
	r, e := ds.odbcFrom.Query(mqlchunk).WithContext(ds.ctx).Do()
	if e != nil {
		return e
	}
	if len(r.Data) == 0 {
		return nil
	}
	data := r.Data[0]
	for _, bf := range cifrom.BucketFields {
		if _, has := data[bf]; has {
			e := func() error {
				bucketdonecount := ds.syncstatus.DoneCountCopy(func(k string, v *DoneCount) bool {
					return v.BucketClass == cifrom.Fullname && v.BucketField == bf
				})
				for k, v := range bucketdonecount {
					key := k
					dc := v
					ds.startsyncproc(ds.wg, ds.bucketrc, func() error {
						return ds.syncbucketdata(mqlfrom, cifrom, cito, key, dc)
					})
				}
				return nil
			}()
			if e != nil {
				return e
			}
		}
	}
	return nil
}

func (ds *DataSync) syncbucketdata(mqlfrom string, cifrom, cito *dbo.ClassInfoHelper, dckey string, dc *DoneCount) error {
	isrunning := <-dc.isrunning
	if isrunning {
		dc.isrunning <- isrunning
		return nil
	}
	dc.isrunning <- true
	defer func() {
		<-dc.isrunning
		dc.isrunning <- false
	}()
	bucketType := cast.ToString(cifrom.Fieldinfos[dc.BucketField].Fieldoption["type"])
	logger.Debug("to sync", bucketType, "data", dc.BucketClass, dc.BucketField, "id:", dc.BucketObjID, mqlfrom)
	recordscount := dc.RecordsCount
	sfromvtime := dc.FromVtime
	slastdatavtime := dc.LastDataVtime
	slastsyncvtime := dc.LastSyncVtime
	// Fetch data in time segments.
	fromvtime, _ := time.Parse("2006-01-02 15:04:05", sfromvtime)
	lastdatavtime, _ := time.Parse("2006-01-02 15:04:05.000000", slastdatavtime)
	lastsyncvtime, _ := time.Parse("2006-01-02 15:04:05", slastsyncvtime)
	sincevtime := time.Now().Add(-ds.buckettimesince)
	if fromvtime.Before(sincevtime) || lastsyncvtime.Before(fromvtime) || lastdatavtime.Before(fromvtime) {
		fromvtime = sincevtime
		lastdatavtime = sincevtime
		lastsyncvtime = sincevtime
		sfromvtime = fromvtime.Format("2006-01-02 15:04:05")
		slastdatavtime = lastdatavtime.Format("2006-01-02 15:04:05.000000")
		slastsyncvtime = lastsyncvtime.Format("2006-01-02 15:04:05")
		recordscount = 0
		// Initialize the DoneCount progress info.
		dc.RecordsCount = recordscount
		dc.FromVtime = sfromvtime
		dc.LastDataVtime = slastdatavtime
		dc.LastSyncVtime = slastsyncvtime
	}
	// Continue syncing the data.
	interval := mcfg.GetDuration("datasync.run.interval", "1m")
	lastsyncvtime = lastsyncvtime.Add(-interval)
	if lastdatavtime.After(lastsyncvtime) {
		lastsyncvtime = lastdatavtime
	}
	lastsyncvtime = lastsyncvtime.Add(-mcfg.GetDuration("datasync.run.overtime", "30s"))
	nextvtime := lastsyncvtime
	run := true
	for run {
		nextvtime = lastsyncvtime.Add(mcfg.GetDuration("datasync.bucket.pagetime", "1h"))
		if time.Now().Before(nextvtime) {
			nextvtime = time.Now()
			run = false
		}
		snextvtime := nextvtime.Format("2006-01-02 15:04:05")
		offset := 0
		{
			mqlchunk := "select " + dc.BucketField + ".time('" + slastsyncvtime + "','" + snextvtime + "')" + " from " + dc.BucketClass + " where id=?"
			logger.Debug(mqlchunk, dc.BucketObjID)
			// Read source data.
			r, e := ds.odbcFrom.Query(mqlchunk, dc.BucketObjID).WithContext(ds.ctx).Do()
			if e != nil {
				return merrs.New(e)
			}
			if len(r.Data) == 0 {
				return merrs.New("bucket host data not found id="+dc.BucketObjID, merrs.Map{"mql": mqlchunk})
			}
			idata := r.Data[0][dc.BucketField]
			if idata != nil {
				data := cast.ToSlice(idata)
				ms := []map[string]any{}
				for i := 0; i < len(data); i++ {
					dat := cast.ToSlice(data[i])
					if len(dat) >= 3 {
						m := map[string]any{}
						m["__timestamp__"] = dat[0]
						m["__name__"] = dat[1]
						m["__value__"] = dat[2]
						if len(dat) >= 4 {
							tm := cast.ToStringMap(dat[3])
							for k, v := range tm {
								m[k] = v
							}
						}
						ms = append(ms, m)
					}
				}
				// Write target data.
				mqlinsert := "insert into " + dc.BucketClass + "(id," + dc.BucketField + ") values(?,?)"
				_, e = ds.odbcTo.Query(mqlinsert, dc.BucketObjID, ms).WithContext(ds.ctx).Do()
				if e != nil {
					return merrs.New(e)
				}
				lastdatavtime = nextvtime
				offset += len(data)
			}
		}
		lastsyncvtime = nextvtime
		slastsyncvtime = lastsyncvtime.Format("2006-01-02 15:04:05")
		recordscount += int64(offset)
		//
		dc.RecordsCount = recordscount
		dc.LastDataVtime = lastdatavtime.Format("2006-01-02 15:04:05.000000")
		dc.LastSyncVtime = slastsyncvtime
		ds.syncstatus.Save(dckey, dc)
	}
	logger.Info("total sync", bucketType, "data", dc.BucketClass, dc.BucketField, "id:", dc.BucketObjID, recordscount, "records", "to time", dc.LastSyncVtime)
	return nil
}

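// Example sketch (the sample row is hypothetical): each bucket entry read from the
// source arrives as a slice of [timestamp, name, value, optional tag map] and is
// rewritten into a map with __timestamp__/__name__/__value__ keys before insertion.
func exampleBucketRow() map[string]any {
	dat := []any{int64(1700000000000), "cpu.usage", 12.5, map[string]any{"host": "node-1"}}
	m := map[string]any{
		"__timestamp__": dat[0],
		"__name__":      dat[1],
		"__value__":     dat[2],
	}
	for k, v := range cast.ToStringMap(dat[3]) {
		m[k] = v
	}
	return m
}
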
func (ds *DataSync) assureToClass(toclass string, cifrom *dbo.ClassInfoHelper) (cito *dbo.ClassInfoHelper, err error) {
	ddl := cifrom.DDL
	logger.Info("assureToClass", cifrom.Classfullname, toclass)
	if toclass != cifrom.Classfullname {
		re := regexp.MustCompile(`(?i)(create\s+class(?:\s+if\s+not\s+exists)*\s+)([^\()]+)(\s*\(.*)`)
		ddl = re.ReplaceAllString(ddl, "$1"+toclass+"$3")
	}
	cis, e := ds.odbcTo.ClassInfo(toclass, false)
	if e != nil && !merrs.NotExistError.Contains(e) && !strings.Contains(e.Error(), "not exists") {
		return nil, merrs.New(e)
	}
	if len(cis) == 0 {
		logger.Debug("auto create class", ddl)
		_, e = ds.odbcTo.Query(ddl).WithContext(ds.ctx).Do()
		if e != nil {
			return nil, merrs.New(e)
		}
		cis, e = ds.odbcTo.ClassInfo(toclass, false)
		if e != nil {
			return nil, merrs.New(e)
		}
		if len(cis) == 0 {
			return nil, merrs.New("len(cis) == 0")
		}
	}
	cito, e = ds.schemaTo.NewClassinfo(cis[0])
	if e != nil {
		return nil, merrs.New(e)
	}
	logger.Debug("sync to class", cito.Classfullname)
	return cito, nil
}

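// Example sketch (DDL and class names are illustrative): the regexp in assureToClass
// swaps the class name in the source "create class" statement when mapping to a
// different target class.
func exampleRewriteDDL() {
	re := regexp.MustCompile(`(?i)(create\s+class(?:\s+if\s+not\s+exists)*\s+)([^\()]+)(\s*\(.*)`)
	ddl := "create class if not exists /example/host (id string, name string)"
	fmt.Println(re.ReplaceAllString(ddl, "$1"+"/backup/host"+"$3"))
	// create class if not exists /backup/host(id string, name string)
}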