|
@@ -0,0 +1,618 @@
|
|
|
+package datasync
|
|
|
+
|
|
|
+import (
|
|
|
+ "context"
|
|
|
+ "fmt"
|
|
|
+ "os"
|
|
|
+ "regexp"
|
|
|
+ "runtime"
|
|
|
+ "strings"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ "git.wecise.com/wecise/odb-go/dbo"
|
|
|
+ "git.wecise.com/wecise/odb-go/odb"
|
|
|
+ "git.wecise.com/wecise/odb-go/odbc"
|
|
|
+ "github.com/scylladb/go-set/strset"
|
|
|
+ "github.com/spf13/cast"
|
|
|
+ "github.com/wecisecode/util/merrs"
|
|
|
+ "github.com/wecisecode/util/mfmt"
|
|
|
+ "github.com/wecisecode/util/rc"
|
|
|
+)
|
|
|
+
|
|
|
+var mcfg = odbc.Config
|
|
|
+var logger = odbc.Logger
|
|
|
+
|
|
|
+type DataSync struct {
|
|
|
+ odbcFrom odb.Client
|
|
|
+ odbcTo odb.Client
|
|
|
+ schemaFrom *dbo.Schema
|
|
|
+ schemaTo *dbo.Schema
|
|
|
+ fromodbserver string
|
|
|
+ fromkeyspace string
|
|
|
+ fromdc string
|
|
|
+ toodbserver string
|
|
|
+ tokeyspace string
|
|
|
+ todc string
|
|
|
+ fromdata []string
|
|
|
+ classmapping map[string]string
|
|
|
+ datatimesince time.Duration
|
|
|
+ buckettimesince time.Duration
|
|
|
+ ctx context.Context
|
|
|
+ cancel context.CancelFunc
|
|
|
+ wg *sync.WaitGroup
|
|
|
+ mutex sync.Mutex
|
|
|
+ errs []error
|
|
|
+ ctrlrc *rc.RoutinesController
|
|
|
+ objectrc *rc.RoutinesController
|
|
|
+ bucketrc *rc.RoutinesController
|
|
|
+ syncstatus *SyncStatus
|
|
|
+}
|
|
|
+
|
|
|
+func NewDataSync() *DataSync {
|
|
|
+ return &DataSync{}
|
|
|
+}
|
|
|
+
|
|
|
+func (ds *DataSync) Init() (err error) {
|
|
|
+ ds.fromodbserver = mcfg.GetString("datasync.from.odbserver")
|
|
|
+ ds.fromkeyspace = mcfg.GetString("datasync.from.keyspace")
|
|
|
+ ds.toodbserver = mcfg.GetString("datasync.to.odbserver")
|
|
|
+ ds.tokeyspace = mcfg.GetString("datasync.to.keyspace")
|
|
|
+ if ds.fromodbserver == "" ||
|
|
|
+ ds.fromkeyspace == "" ||
|
|
|
+ ds.toodbserver == "" ||
|
|
|
+ ds.tokeyspace == "" {
|
|
|
+ return odbc.NoConfError.New("need configure settings: datasync.from.odbserver, datasync.from.keyspace, datasync.to.odbserver, datasync.to.keyspace")
|
|
|
+ }
|
|
|
+ ds.fromdc = mcfg.GetString("datasync.from.dc")
|
|
|
+ ds.todc = mcfg.GetString("datasync.to.dc")
|
|
|
+ ds.fromdata = mcfg.GetStrings("datasync.from.data")
|
|
|
+ ds.classmapping = mcfg.GetMapping("datasync.mapping.class")
|
|
|
+ ds.odbcFrom, err = odb.NewClient(&odb.Config{
|
|
|
+ Keyspace: ds.fromkeyspace,
|
|
|
+ Hosts: strings.Split(ds.fromodbserver, ","),
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ if strings.Contains(err.Error(), "error: EOF") {
|
|
|
+ println("\n!!!should add your ip to odbserver(" + ds.fromodbserver + ") whitelist!!!\n")
|
|
|
+ os.Exit(1)
|
|
|
+ }
|
|
|
+ return merrs.New(err)
|
|
|
+ }
|
|
|
+ ds.odbcTo, err = odb.NewClient(&odb.Config{
|
|
|
+ Keyspace: ds.tokeyspace,
|
|
|
+ Hosts: strings.Split(ds.toodbserver, ","),
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ if strings.Contains(err.Error(), "error: EOF") {
|
|
|
+ println("\n!!!should add your ip to odbserver(" + ds.toodbserver + ") whitelist!!!\n")
|
|
|
+ os.Exit(1)
|
|
|
+ }
|
|
|
+ return merrs.New(err)
|
|
|
+ }
|
|
|
+ ds.schemaFrom = dbo.NewSchema(ds.odbcFrom)
|
|
|
+ ds.schemaTo = dbo.NewSchema(ds.odbcTo)
|
|
|
+ ctrlthreads := mcfg.GetInt("datasync.ctrl.threads", runtime.GOMAXPROCS(0))
|
|
|
+ ds.ctrlrc = rc.NewRoutinesControllerLimit("", ctrlthreads, ctrlthreads*2)
|
|
|
+ concurthreads := mcfg.GetInt("datasync.concur.threads", runtime.GOMAXPROCS(0))
|
|
|
+ ds.objectrc = rc.NewRoutinesControllerLimit("", concurthreads, concurthreads*2)
|
|
|
+ bucketthreads := mcfg.GetInt("datasync.bucket.threads", runtime.GOMAXPROCS(0))
|
|
|
+ ds.bucketrc = rc.NewRoutinesControllerLimit("", bucketthreads, bucketthreads*2)
|
|
|
+ ds.datatimesince = mcfg.GetDuration("datasync.data.time.since", "365d")
|
|
|
+ ds.buckettimesince = mcfg.GetDuration("datasync.bucket.time.since", "30d")
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (ds *DataSync) Run() (done <-chan error) {
|
|
|
+ ret := make(chan error, 1)
|
|
|
+ key := regexp.MustCompile(`\W`).ReplaceAllString(strings.Split(ds.fromodbserver, ",")[0]+"_"+ds.fromkeyspace+"_"+strings.Split(ds.toodbserver, ",")[0]+"_"+ds.tokeyspace, "_")
|
|
|
+ ds.syncstatus = NewSyncStatus(key)
|
|
|
+ if !mcfg.GetBool("reload") {
|
|
|
+ ds.syncstatus.Load()
|
|
|
+ }
|
|
|
+ go ds.run(ret)
|
|
|
+ return ret
|
|
|
+}
|
|
|
+
|
|
|
+func (ds *DataSync) run(ret chan error) {
|
|
|
+ fromdatas := []string{}
|
|
|
+ for _, fromdata := range ds.fromdata {
|
|
|
+ fromdata = strings.TrimSpace(fromdata)
|
|
|
+ if len(fromdata) > 0 {
|
|
|
+ fromdatas = append(fromdatas, fromdata)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if len(fromdatas) == 0 {
|
|
|
+ cis, e := ds.odbcFrom.ClassInfo("/", true)
|
|
|
+ if e != nil {
|
|
|
+ ret <- e
|
|
|
+ return
|
|
|
+ }
|
|
|
+ for _, ci := range cis {
|
|
|
+ fromdatas = append(fromdatas, ci.Fullname)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ctx, cancel := context.WithCancel(context.Background())
|
|
|
+ defer cancel()
|
|
|
+ ds.ctx = ctx
|
|
|
+ ds.cancel = cancel
|
|
|
+ for {
|
|
|
+ ds.wg = &sync.WaitGroup{}
|
|
|
+ ds.syncstatus.Resume()
|
|
|
+ for _, fromdata := range fromdatas {
|
|
|
+ mqlfrom := fromdata
|
|
|
+ ds.startsyncproc(ds.wg, ds.ctrlrc, func() error {
|
|
|
+ return ds.syncdata(mqlfrom)
|
|
|
+ })
|
|
|
+ }
|
|
|
+ ds.wg.Wait()
|
|
|
+ ds.syncstatus.WaitSaveDone()
|
|
|
+ if len(ds.errs) > 0 {
|
|
|
+ ret <- merrs.New(ds.errs)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ logger.Info("total sync data", ds.syncstatus.TotalChunks(), "chunks", ds.syncstatus.TotalRecords(), "records,", "use time:", mfmt.FormatDuration(ds.syncstatus.TotalUseTime()))
|
|
|
+ interval := mcfg.GetDuration("datasync.run.interval", 0)
|
|
|
+ if interval > 0 {
|
|
|
+ time.Sleep(interval)
|
|
|
+ } else {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ret <- nil
|
|
|
+}
|
|
|
+
|
|
|
+func (ds *DataSync) startsyncproc(wg *sync.WaitGroup, rc *rc.RoutinesController, proc func() error) {
|
|
|
+ wg.Add(1)
|
|
|
+ e := rc.ConcurCall(1, func() {
|
|
|
+ defer wg.Done()
|
|
|
+ if ds.ctx.Err() == nil {
|
|
|
+ e := proc()
|
|
|
+ if e != nil {
|
|
|
+ if !merrs.ContextError.Contains(e) {
|
|
|
+ ds.mutex.Lock()
|
|
|
+ ds.errs = append(ds.errs, e)
|
|
|
+ ds.mutex.Unlock()
|
|
|
+ }
|
|
|
+ ds.cancel()
|
|
|
+ }
|
|
|
+ }
|
|
|
+ })
|
|
|
+ if e != nil {
|
|
|
+ ds.mutex.Lock()
|
|
|
+ ds.errs = append(ds.errs, e)
|
|
|
+ ds.mutex.Unlock()
|
|
|
+ wg.Done()
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// 同步一块数据
|
|
|
+// mqlfrom 可以是类名 或 查询语句
|
|
|
+func (ds *DataSync) syncdata(mqlfrom string) error {
|
|
|
+ // 同一格式化为查询语句
|
|
|
+ mqlfrom = FormatMQL(mqlfrom)
|
|
|
+ // 已完成同步进度
|
|
|
+ fromclass, fields, condition, e := ds.LastSyncProgress(mqlfrom)
|
|
|
+ if e != nil {
|
|
|
+ return e
|
|
|
+ }
|
|
|
+ cifroms, e := ds.schemaFrom.LoadClassinfos(fromclass)
|
|
|
+ if e != nil {
|
|
|
+ return e
|
|
|
+ }
|
|
|
+ for _, v := range cifroms {
|
|
|
+ cifrom := v
|
|
|
+ ds.startsyncproc(ds.wg, ds.objectrc, func() error {
|
|
|
+ return ds.syncclassdata(cifrom, fields, condition)
|
|
|
+ })
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
// reselect reports whether a statement contains a "select" keyword followed
// by whitespace.
var reselect = regexp.MustCompile(`(?is)select\s.*`)

// reselectfromclass splits "select <fields> from <class> [<rest>]" into three
// groups. The field group is non-greedy so the split happens at the first
// " from ", which keeps a later " from " inside the condition (for example in
// a string literal) from being mistaken for the class separator.
var reselectfromclass = regexp.MustCompile(`(?is)select\s+(.*?)\s+from\s+(\S+)(?:\s+(.*)\s*)?`)

// commentexprs matches /* ... */ block comments.
var commentexprs = regexp.MustCompile(`(?s)\/\*(?:[^\*]|\*+[^\*\/])*\*+\/`)

// commentexprs_2 matches whole-line comments starting with "--".
var commentexprs_2 = regexp.MustCompile(`(?ms)(?:^|\n)\-\-[^\n]*(?:\n|$)`)

// commentexprs_3 matches whole-line comments starting with "//".
var commentexprs_3 = regexp.MustCompile(`(?ms)(?:^|\n)//[^\n]*(?:\n|$)`)

// FormatMQL strips comments from mql and normalizes it into a select
// statement: input that is not already a select statement is treated as a
// class name and becomes "select * from <class>".
//
// Comments are replaced by whitespace rather than by nothing. The line-comment
// patterns consume the newlines on both sides of the comment, so deleting the
// match outright would fuse the neighbouring lines
// ("select *\n-- c\nfrom /x" -> "select *from /x").
func FormatMQL(mql string) string {
	mql = commentexprs.ReplaceAllString(mql, " ")
	mql = commentexprs_2.ReplaceAllString(mql, "\n")
	mql = commentexprs_3.ReplaceAllString(mql, "\n")
	mql = strings.TrimSpace(mql)
	if !reselect.MatchString(mql) {
		mql = "select * from " + mql
	}
	return mql
}
|
|
|
+
|
|
|
+func (ds *DataSync) LastSyncProgress(mql string) (class string, fields, condition string, err error) {
|
|
|
+ selstmt := reselectfromclass.FindStringSubmatch(mql)
|
|
|
+ if len(selstmt) < 3 {
|
|
|
+ return "", "", "", merrs.New("from.data select statement error", []string{"mql", mql})
|
|
|
+ }
|
|
|
+ fields = selstmt[1]
|
|
|
+ class = selstmt[2]
|
|
|
+ condition = selstmt[3]
|
|
|
+ if class == "" {
|
|
|
+ return "", "", "", merrs.New("from.data select statement error", []string{"mql", mql})
|
|
|
+ }
|
|
|
+ return class, fields, condition, nil
|
|
|
+}
|
|
|
+
|
|
|
// rewhere matches the first whitespace-delimited "where" keyword and captures
// everything that follows it.
var rewhere = regexp.MustCompile(`(?is)\swhere\s(.*)`)

// mqlAddVtimeRange restricts mql to vtime in [sbeginvtime, sendvtime).
// When mql already has a where clause, the existing clause text is wrapped in
// parentheses and and-ed after the vtime range; otherwise a where clause is
// appended.
func mqlAddVtimeRange(mql, sbeginvtime, sendvtime string) string {
	vtimeClause := fmt.Sprintf(" where vtime>='%s' and vtime<'%s'", sbeginvtime, sendvtime)
	if !rewhere.MatchString(mql) {
		return mql + vtimeClause
	}
	return rewhere.ReplaceAllString(mql, vtimeClause+" and ($1)")
}
|
|
|
+
|
|
|
+func (ds *DataSync) syncclassdata(cifrom *dbo.ClassInfoHelper, fields, condition string) error {
|
|
|
+ denyclass := strset.New(mcfg.GetStrings("datasync.deny.class")...)
|
|
|
+ if denyclass.Has(cifrom.Fullname) || denyclass.Has(cifrom.Fullname+"/") {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ checkdenyclass := cifrom
|
|
|
+ for checkdenyclass != nil {
|
|
|
+ if denyclass.Has(checkdenyclass.BaseClassFullname() + "/") {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ checkdenyclass = ds.schemaFrom.GetClassInfo(checkdenyclass.BaseClassFullname())
|
|
|
+ }
|
|
|
+ // 确定目标类已创建
|
|
|
+ toclass := ds.classmapping[cifrom.Fullname]
|
|
|
+ if toclass == "" {
|
|
|
+ toclass = cifrom.Fullname
|
|
|
+ }
|
|
|
+ cito, e := ds.assureToClass(toclass, cifrom)
|
|
|
+ if e != nil {
|
|
|
+ return e
|
|
|
+ }
|
|
|
+ //
|
|
|
+ mqlfrom := "select " + fields + " from " + cifrom.Fullname
|
|
|
+ if condition != "" {
|
|
|
+ mqlfrom += " " + condition
|
|
|
+ }
|
|
|
+ dc := ds.syncstatus.DoneCount(mqlfrom)
|
|
|
+ isrunning := <-dc.isrunning
|
|
|
+ if isrunning {
|
|
|
+ dc.isrunning <- isrunning
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ dc.isrunning <- true
|
|
|
+ defer func() {
|
|
|
+ <-dc.isrunning
|
|
|
+ dc.isrunning <- false
|
|
|
+ }()
|
|
|
+ recordscount := dc.RecordsCount
|
|
|
+ sfromvtime := dc.FromVtime
|
|
|
+ slastdatavtime := dc.LastDataVtime
|
|
|
+ slastsyncvtime := dc.LastSyncVtime
|
|
|
+ // 分段获取数据
|
|
|
+ fromvtime, _ := time.Parse("2006-01-02 15:04:05", sfromvtime)
|
|
|
+ lastdatavtime, _ := time.Parse("2006-01-02 15:04:05.000000", slastdatavtime)
|
|
|
+ lastsyncvtime, _ := time.Parse("2006-01-02 15:04:05", slastsyncvtime)
|
|
|
+ sincevtime := time.Now().Add(-ds.datatimesince)
|
|
|
+ if fromvtime.Before(sincevtime) || lastsyncvtime.Before(fromvtime) || lastdatavtime.Before(fromvtime) {
|
|
|
+ ssincevtime := sincevtime.Format("2006-01-02 00:00:00")
|
|
|
+ firstdatavtime := time.Now()
|
|
|
+ sfirstdatavtime := firstdatavtime.Format("2006-01-02 15:04:05.000000")
|
|
|
+ for i := 0; ; i++ {
|
|
|
+ mqlseg := mqlAddVtimeRange(mqlfrom, ssincevtime, sfirstdatavtime)
|
|
|
+ mqlchunk := mqlseg + fmt.Sprint(" order by vtime limit 1")
|
|
|
+ logger.Debug("check first data vtime:", ds.odbcFrom.Config().Keyspace, mqlchunk)
|
|
|
+ // 读取源数据
|
|
|
+ r, e := ds.odbcFrom.Query(mqlchunk).WithContext(ds.ctx).Do()
|
|
|
+ if e != nil {
|
|
|
+ return e
|
|
|
+ }
|
|
|
+ if len(r.Data) == 0 {
|
|
|
+ // 没有更多数据
|
|
|
+ if i == 0 {
|
|
|
+ logger.Info("check first data vtime:", ds.odbcFrom.Config().Keyspace, cifrom.Fullname, "no data")
|
|
|
+ ds.syncstatus.RemoveDoneCount(mqlfrom)
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ logger.Info("check first data vtime:", ds.odbcFrom.Config().Keyspace, cifrom.Fullname, sfirstdatavtime, mqlfrom)
|
|
|
+ break
|
|
|
+ }
|
|
|
+ firstdata := r.Data[0]
|
|
|
+ firstdatavtime = firstdata["vtime"].(time.Time)
|
|
|
+ sfirstdatavtime = firstdatavtime.Format("2006-01-02 15:04:05.000000")
|
|
|
+ logger.Debug("check first data vtime:", ds.odbcFrom.Config().Keyspace, firstdata["class"], firstdata["id"], sfirstdatavtime)
|
|
|
+ }
|
|
|
+ fromvtime = firstdatavtime
|
|
|
+ lastdatavtime = firstdatavtime
|
|
|
+ lastsyncvtime = firstdatavtime
|
|
|
+ sfromvtime = fromvtime.Format("2006-01-02 15:04:05")
|
|
|
+ slastdatavtime = lastdatavtime.Format("2006-01-02 15:04:05.000000")
|
|
|
+ slastsyncvtime = lastsyncvtime.Format("2006-01-02 15:04:05")
|
|
|
+ recordscount = 0
|
|
|
+ // 初始化DataCount进度信息
|
|
|
+ dc.RecordsCount = recordscount
|
|
|
+ dc.FromVtime = sfromvtime
|
|
|
+ dc.LastDataVtime = slastdatavtime
|
|
|
+ dc.LastSyncVtime = slastsyncvtime
|
|
|
+ }
|
|
|
+ // 继续执行相关bucket数据同步
|
|
|
+ e = ds.syncbucketdatacontinue(cifrom, cito, mqlfrom)
|
|
|
+ if e != nil {
|
|
|
+ return e
|
|
|
+ }
|
|
|
+ // 继续执行对象数据同步
|
|
|
+ nextvtime := lastsyncvtime
|
|
|
+ for {
|
|
|
+ nextvtime = lastsyncvtime.Add(mcfg.GetDuration("datasync.pagetime", "1d"))
|
|
|
+ if time.Now().Before(nextvtime) {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ snextvtime := nextvtime.Format("2006-01-02 15:04:05")
|
|
|
+ mqlseg := mqlAddVtimeRange(mqlfrom, slastsyncvtime, snextvtime)
|
|
|
+ offset := 0
|
|
|
+ for {
|
|
|
+ mqlchunk := mqlseg + fmt.Sprint(" limit ", offset, ",", mcfg.GetInt("datasync.pagesize", 50))
|
|
|
+ logger.Debug(mqlchunk)
|
|
|
+ // 读取源数据
|
|
|
+ r, e := ds.odbcFrom.Query(mqlchunk).WithContext(ds.ctx).Do()
|
|
|
+ if e != nil {
|
|
|
+ return e
|
|
|
+ }
|
|
|
+ if len(r.Data) == 0 {
|
|
|
+ // 没有更多数据
|
|
|
+ break
|
|
|
+ }
|
|
|
+ // 写入目标数据
|
|
|
+ for _, data := range r.Data {
|
|
|
+ e = ds.insertData(mqlfrom, cifrom, cito, data)
|
|
|
+ if e != nil {
|
|
|
+ return e
|
|
|
+ }
|
|
|
+ datavtime := data["vtime"].(time.Time)
|
|
|
+ if lastdatavtime.Before(datavtime) {
|
|
|
+ lastdatavtime = datavtime
|
|
|
+ }
|
|
|
+ }
|
|
|
+ offset += len(r.Data)
|
|
|
+ }
|
|
|
+ lastsyncvtime = nextvtime
|
|
|
+ slastsyncvtime = lastsyncvtime.Format("2006-01-02 15:04:05")
|
|
|
+ recordscount += int64(offset)
|
|
|
+ //
|
|
|
+ dc.RecordsCount = recordscount
|
|
|
+ dc.LastDataVtime = lastdatavtime.Format("2006-01-02 15:04:05.000000")
|
|
|
+ dc.LastSyncVtime = slastsyncvtime
|
|
|
+ ds.syncstatus.Save(mqlfrom, dc)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (ds *DataSync) insertData(mqlfrom string, cifrom, cito *dbo.ClassInfoHelper, data map[string]any) error {
|
|
|
+ logger.Debug(data["class"], data["id"], data["vtime"])
|
|
|
+ vals := []any{}
|
|
|
+ for _, fn := range cito.Fieldslist {
|
|
|
+ if cito.Fieldinfos[fn].Fieldtype == "bucket" {
|
|
|
+ vals = append(vals, nil)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ v := data[fn]
|
|
|
+ if v == nil {
|
|
|
+ i := strings.Index(fn, ":")
|
|
|
+ if i >= 0 {
|
|
|
+ fn = fn[i+1:]
|
|
|
+ }
|
|
|
+ v = data[fn]
|
|
|
+ }
|
|
|
+ vals = append(vals, v)
|
|
|
+ }
|
|
|
+ _, e := ds.odbcTo.Query(cito.Insertmql, vals...).WithContext(ds.ctx).Do()
|
|
|
+ if e != nil {
|
|
|
+ return merrs.New(e, []string{"mql", cito.Insertmql, "vals", fmt.Sprint(data)})
|
|
|
+ }
|
|
|
+ e = ds.syncbucketdatanew(cifrom, cito, mqlfrom, data)
|
|
|
+ if e != nil {
|
|
|
+ return e
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (ds *DataSync) syncbucketdatanew(cifrom, cito *dbo.ClassInfoHelper, mqlfrom string, data map[string]any) error {
|
|
|
+ if ds.buckettimesince == 0 {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ for _, bf := range cifrom.BucketFields {
|
|
|
+ if _, has := data[bf]; has {
|
|
|
+ ds.startsyncproc(ds.wg, ds.bucketrc, func() error {
|
|
|
+ oid := data["id"].(string)
|
|
|
+ buckettype := cast.ToString(cifrom.Fieldinfos[bf].Fieldoption["type"])
|
|
|
+ key := buckettype + ":" + cifrom.Fullname + ":" + bf + "[" + oid + "]"
|
|
|
+ dc := ds.syncstatus.DoneCount(key)
|
|
|
+ dc.BucketClass = cifrom.Fullname
|
|
|
+ dc.BucketField = bf
|
|
|
+ dc.BucketObjID = oid
|
|
|
+ return ds.syncbucketdata(mqlfrom, cifrom, cito, key, dc)
|
|
|
+ })
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (ds *DataSync) syncbucketdatacontinue(cifrom, cito *dbo.ClassInfoHelper, mqlfrom string) error {
|
|
|
+ if ds.buckettimesince == 0 {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ mqlchunk := mqlfrom + fmt.Sprint(" limit 1")
|
|
|
+ logger.Debug("check data fields:", ds.odbcFrom.Config().Keyspace, mqlchunk)
|
|
|
+ // 读取源数据
|
|
|
+ r, e := ds.odbcFrom.Query(mqlchunk).WithContext(ds.ctx).Do()
|
|
|
+ if e != nil {
|
|
|
+ return e
|
|
|
+ }
|
|
|
+ if len(r.Data) == 0 {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ data := r.Data[0]
|
|
|
+ for _, bf := range cifrom.BucketFields {
|
|
|
+ if _, has := data[bf]; has {
|
|
|
+ e := func() error {
|
|
|
+ bucketdonecount := ds.syncstatus.DoneCountCopy(func(k string, v *DoneCount) bool {
|
|
|
+ return v.BucketClass == cifrom.Fullname && v.BucketField == bf
|
|
|
+ })
|
|
|
+ for k, v := range bucketdonecount {
|
|
|
+ key := k
|
|
|
+ dc := v
|
|
|
+ ds.startsyncproc(ds.wg, ds.bucketrc, func() error {
|
|
|
+ return ds.syncbucketdata(mqlfrom, cifrom, cito, key, dc)
|
|
|
+ })
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ }()
|
|
|
+ if e != nil {
|
|
|
+ return e
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (ds *DataSync) syncbucketdata(mqlfrom string, cifrom, cito *dbo.ClassInfoHelper, dckey string, dc *DoneCount) error {
|
|
|
+ isrunning := <-dc.isrunning
|
|
|
+ if isrunning {
|
|
|
+ dc.isrunning <- isrunning
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ dc.isrunning <- true
|
|
|
+ defer func() {
|
|
|
+ <-dc.isrunning
|
|
|
+ dc.isrunning <- false
|
|
|
+ }()
|
|
|
+
|
|
|
+ bucketType := cast.ToString(cifrom.Fieldinfos[dc.BucketField].Fieldoption["type"])
|
|
|
+
|
|
|
+ logger.Debug("to sync", bucketType, "data", dc.BucketClass, dc.BucketField, "id:", dc.BucketObjID, mqlfrom)
|
|
|
+
|
|
|
+ recordscount := dc.RecordsCount
|
|
|
+ sfromvtime := dc.FromVtime
|
|
|
+ slastdatavtime := dc.LastDataVtime
|
|
|
+ slastsyncvtime := dc.LastSyncVtime
|
|
|
+ // 分段获取数据
|
|
|
+ fromvtime, _ := time.Parse("2006-01-02 15:04:05", sfromvtime)
|
|
|
+ lastdatavtime, _ := time.Parse("2006-01-02 15:04:05.000000", slastdatavtime)
|
|
|
+ lastsyncvtime, _ := time.Parse("2006-01-02 15:04:05", slastsyncvtime)
|
|
|
+ sincevtime := time.Now().Add(-ds.buckettimesince)
|
|
|
+ if fromvtime.Before(sincevtime) || lastsyncvtime.Before(fromvtime) || lastdatavtime.Before(fromvtime) {
|
|
|
+ fromvtime = sincevtime
|
|
|
+ lastdatavtime = sincevtime
|
|
|
+ lastsyncvtime = sincevtime
|
|
|
+ sfromvtime = fromvtime.Format("2006-01-02 15:04:05")
|
|
|
+ slastdatavtime = lastdatavtime.Format("2006-01-02 15:04:05.000000")
|
|
|
+ slastsyncvtime = lastsyncvtime.Format("2006-01-02 15:04:05")
|
|
|
+ recordscount = 0
|
|
|
+ // 初始化DataCount进度信息
|
|
|
+ dc.RecordsCount = recordscount
|
|
|
+ dc.FromVtime = sfromvtime
|
|
|
+ dc.LastDataVtime = slastdatavtime
|
|
|
+ dc.LastSyncVtime = slastsyncvtime
|
|
|
+ }
|
|
|
+ // 继续执行数据同步
|
|
|
+ count := 0
|
|
|
+ nextvtime := lastsyncvtime
|
|
|
+ for {
|
|
|
+ nextvtime = lastsyncvtime.Add(mcfg.GetDuration("datasync.bucket.pagetime", "1h"))
|
|
|
+ if time.Now().Before(nextvtime) {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ snextvtime := nextvtime.Format("2006-01-02 15:04:05")
|
|
|
+ offset := 0
|
|
|
+ {
|
|
|
+ mqlchunk := "select " + dc.BucketField + ".time('" + slastsyncvtime + "','" + snextvtime + "')" + " from " + dc.BucketClass + " where id=?"
|
|
|
+ logger.Debug(mqlchunk, dc.BucketObjID)
|
|
|
+ // 读取源数据
|
|
|
+ r, e := ds.odbcFrom.Query(mqlchunk, dc.BucketObjID).WithContext(ds.ctx).Do()
|
|
|
+ if e != nil {
|
|
|
+ return e
|
|
|
+ }
|
|
|
+ if len(r.Data) == 0 {
|
|
|
+ return merrs.New("bucket host data not found id="+dc.BucketObjID, merrs.Map{"mql": mqlchunk})
|
|
|
+ }
|
|
|
+ idata := r.Data[0][dc.BucketField]
|
|
|
+ if idata != nil {
|
|
|
+ data := cast.ToSlice(idata)
|
|
|
+ ms := []map[string]any{}
|
|
|
+ for i := 0; i < len(data); i++ {
|
|
|
+ dat := cast.ToSlice(data[i])
|
|
|
+ if len(dat) >= 3 {
|
|
|
+ m := map[string]any{}
|
|
|
+ m["timestamp"] = dat[0]
|
|
|
+ m["name"] = dat[1]
|
|
|
+ m["value"] = dat[2]
|
|
|
+ if len(dat) >= 4 {
|
|
|
+ tm := cast.ToStringMap(dat[3])
|
|
|
+ for k, v := range tm {
|
|
|
+ m[k] = v
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ms = append(ms, m)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 写入目标数据
|
|
|
+ mqlinsert := "insert into " + dc.BucketClass + "(id," + dc.BucketField + ") values(?,?)"
|
|
|
+ _, e = ds.odbcTo.Query(mqlinsert, dc.BucketObjID, ms).WithContext(ds.ctx).Do()
|
|
|
+ if e != nil {
|
|
|
+ return e
|
|
|
+ }
|
|
|
+ lastdatavtime = nextvtime
|
|
|
+ offset += len(data)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ lastsyncvtime = nextvtime
|
|
|
+ slastsyncvtime = lastsyncvtime.Format("2006-01-02 15:04:05")
|
|
|
+ recordscount += int64(offset)
|
|
|
+ count += offset
|
|
|
+ //
|
|
|
+ dc.RecordsCount = recordscount
|
|
|
+ dc.LastDataVtime = lastdatavtime.Format("2006-01-02 15:04:05.000000")
|
|
|
+ dc.LastSyncVtime = slastsyncvtime
|
|
|
+ ds.syncstatus.Save(dckey, dc)
|
|
|
+ }
|
|
|
+ logger.Debug("end sync", bucketType, "data", dc.BucketClass, dc.BucketField, "id:", dc.BucketObjID)
|
|
|
+
|
|
|
+ if count > 0 {
|
|
|
+ logger.Info("sync", bucketType, "data", dc.BucketClass, dc.BucketField, "id:", dc.BucketObjID, count, "records", "to time", dc.LastSyncVtime)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (ds *DataSync) assureToClass(toclass string, cifrom *dbo.ClassInfoHelper) (cito *dbo.ClassInfoHelper, err error) {
|
|
|
+ if toclass != cifrom.Classfullname {
|
|
|
+ return nil, merrs.New("not support class mapping", []string{"toclass", toclass, "fromclass", cifrom.Classfullname})
|
|
|
+ }
|
|
|
+ cis, e := ds.odbcTo.ClassInfo(toclass, false)
|
|
|
+ if e != nil && !merrs.NotExistError.Contains(e) && !strings.Contains(e.Error(), "not exists") {
|
|
|
+ return nil, merrs.New(e)
|
|
|
+ }
|
|
|
+ if len(cis) == 0 {
|
|
|
+ _, e = ds.odbcTo.Query(cifrom.DDL).WithContext(ds.ctx).Do()
|
|
|
+ if e != nil {
|
|
|
+ return nil, merrs.New(e)
|
|
|
+ }
|
|
|
+ cis, e = ds.odbcTo.ClassInfo(toclass, false)
|
|
|
+ if e != nil {
|
|
|
+ return nil, merrs.New(e)
|
|
|
+ }
|
|
|
+ if len(cis) == 0 {
|
|
|
+ return nil, merrs.New("len(cis) == 0")
|
|
|
+ }
|
|
|
+ }
|
|
|
+ cito, e = ds.schemaTo.NewClassinfo(cis[0])
|
|
|
+ if e != nil {
|
|
|
+ return nil, merrs.New(e)
|
|
|
+ }
|
|
|
+ return cito, nil
|
|
|
+}
|