// Package datasync copies class objects and their bucket data from one ODB
// server/keyspace to another, keeping resumable progress in a SyncStatus.
package datasync

import (
	"context"
	"fmt"
	"os"
	"regexp"
	"runtime"
	"strings"
	"sync"
	"time"

	"git.wecise.com/wecise/odb-go/dbo"
	"git.wecise.com/wecise/odb-go/odb"
	"git.wecise.com/wecise/odb-go/odbc"
	"gitee.com/wecisecode/util/merrs"
	"gitee.com/wecisecode/util/mfmt"
	"gitee.com/wecisecode/util/rc"
	"github.com/scylladb/go-set/strset"
	"github.com/spf13/cast"
)

var mcfg = odbc.Config
var logger = odbc.Logger

// DataSync holds the connections, configuration and runtime state of one
// source-to-target synchronization.
type DataSync struct {
	odbcFrom        odb.Client  // client connected to the source keyspace
	odbcTo          odb.Client  // client connected to the target keyspace
	schemaFrom      *dbo.Schema // schema helper bound to odbcFrom
	schemaTo        *dbo.Schema // schema helper bound to odbcTo
	fromodbserver   string      // datasync.from.odbserver (comma separated hosts)
	fromkeyspace    string      // datasync.from.keyspace
	fromdc          string      // datasync.from.dc
	toodbserver     string      // datasync.to.odbserver (comma separated hosts)
	tokeyspace      string      // datasync.to.keyspace
	todc            string      // datasync.to.dc
	fromdata        []string    // datasync.from.data: class names or select statements
	classmapping    map[string][]string
	datatimesince   time.Duration // datasync.data.time.since
	buckettimesince time.Duration // datasync.bucket.time.since; 0 disables bucket sync
	ctx             context.Context
	cancel          context.CancelFunc
	wg              *sync.WaitGroup
	mutex           sync.Mutex // guards errs
	errs            []error
	ctrlrc          *rc.RoutinesController // runs the per-"from data" jobs
	objectrc        *rc.RoutinesController // runs the per-class object jobs
	bucketrc        *rc.RoutinesController // runs the per-object bucket jobs
	syncstatus      *SyncStatus
}

// NewDataSync returns an empty DataSync; call Init before Run.
func NewDataSync() *DataSync {
	return &DataSync{}
}

// Init reads the datasync.* settings, connects to the source and target
// servers and creates the routine controllers.
//
// It returns an odbc.NoConfError when any of the four mandatory settings
// (from/to odbserver and keyspace) is empty. When a connection error contains
// "error: EOF" it prints a whitelist hint and terminates the process.
func (ds *DataSync) Init() (err error) {
	ds.fromodbserver = mcfg.GetString("datasync.from.odbserver")
	ds.fromkeyspace = mcfg.GetString("datasync.from.keyspace")
	ds.toodbserver = mcfg.GetString("datasync.to.odbserver")
	ds.tokeyspace = mcfg.GetString("datasync.to.keyspace")
	if ds.fromodbserver == "" ||
		ds.fromkeyspace == "" ||
		ds.toodbserver == "" ||
		ds.tokeyspace == "" {
		return odbc.NoConfError.New("need configure settings: datasync.from.odbserver, datasync.from.keyspace, datasync.to.odbserver, datasync.to.keyspace")
	}
	ds.fromdc = mcfg.GetString("datasync.from.dc")
	ds.todc = mcfg.GetString("datasync.to.dc")
	ds.fromdata = mcfg.GetStrings("datasync.from.data")
	ds.classmapping = mcfg.GetMapping("datasync.mapping.class")

	ds.odbcFrom, err = odb.NewClient(&odb.Config{
		Keyspace: ds.fromkeyspace,
		Hosts:    strings.Split(ds.fromodbserver, ","),
	})
	if err != nil {
		if strings.Contains(err.Error(), "error: EOF") {
			println("\n!!!should add your ip to odbserver(" + ds.fromodbserver + ") whitelist!!!\n")
			os.Exit(1)
		}
		return merrs.New(err)
	}
	ds.odbcTo, err = odb.NewClient(&odb.Config{
		Keyspace: ds.tokeyspace,
		Hosts:    strings.Split(ds.toodbserver, ","),
	})
	if err != nil {
		if strings.Contains(err.Error(), "error: EOF") {
			println("\n!!!should add your ip to odbserver(" + ds.toodbserver + ") whitelist!!!\n")
			os.Exit(1)
		}
		return merrs.New(err)
	}
	ds.schemaFrom = dbo.NewSchema(ds.odbcFrom)
	ds.schemaTo = dbo.NewSchema(ds.odbcTo)

	// Each controller defaults to GOMAXPROCS routines.
	ctrlthreads := mcfg.GetInt("datasync.ctrl.threads", runtime.GOMAXPROCS(0))
	ds.ctrlrc = rc.NewRoutinesControllerLimit("", ctrlthreads, ctrlthreads*2)
	concurthreads := mcfg.GetInt("datasync.concur.threads", runtime.GOMAXPROCS(0))
	ds.objectrc = rc.NewRoutinesControllerLimit("", concurthreads, concurthreads*2)
	bucketthreads := mcfg.GetInt("datasync.bucket.threads", runtime.GOMAXPROCS(0))
	ds.bucketrc = rc.NewRoutinesControllerLimit("", bucketthreads, bucketthreads*2)

	ds.datatimesince = mcfg.GetDuration("datasync.data.time.since", "365d")
	ds.buckettimesince = mcfg.GetDuration("datasync.bucket.time.since", "30d")
	return nil
}

// reNonWordChars matches every character that is not allowed in the sync
// status key built by Run.
var reNonWordChars = regexp.MustCompile(`\W`)

// Run starts the synchronization in the background and returns a channel that
// receives exactly one value: nil on success or the error that stopped it.
//
// Unless the "reload" setting is true, previously saved progress is loaded
// first; a load failure is reported on the channel without starting the sync.
func (ds *DataSync) Run() (done <-chan error) {
	logger.Info(mcfg.Info())
	ret := make(chan error, 1)
	// The status key identifies the (first source host, source keyspace,
	// first target host, target keyspace) combination.
	key := reNonWordChars.ReplaceAllString(
		strings.Split(ds.fromodbserver, ",")[0]+"_"+ds.fromkeyspace+"_"+
			strings.Split(ds.toodbserver, ",")[0]+"_"+ds.tokeyspace, "_")
	ds.syncstatus = NewSyncStatus(key)
	if !mcfg.GetBool("reload") {
		if e := ds.syncstatus.Load(); e != nil {
			ret <- e
			return ret
		}
	}
	go ds.run(ret)
	return ret
}

// run is the body of the background synchronization started by Run.
//
// When datasync.from.data is empty, every class reported by the source for
// "/" is synchronized. The whole pass is repeated every datasync.run.interval
// while that setting is positive; otherwise a single pass is executed.
func (ds *DataSync) run(ret chan error) {
	fromdatas := []string{}
	for _, fromdata := range ds.fromdata {
		fromdata = strings.TrimSpace(fromdata)
		if len(fromdata) > 0 {
			fromdatas = append(fromdatas, fromdata)
		}
	}
	if len(fromdatas) == 0 {
		cis, e := ds.odbcFrom.ClassInfo("/", true)
		if e != nil {
			ret <- e
			return
		}
		for _, ci := range cis {
			fromdatas = append(fromdatas, ci.Fullname)
		}
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ds.ctx = ctx
	ds.cancel = cancel
	logger.Info("resume sync data, from", len(fromdatas), "configure")
	for {
		ds.wg = &sync.WaitGroup{}
		ds.syncstatus.Resume()
		for _, fromdata := range fromdatas {
			mqlfrom := fromdata // per-iteration copy for the closure below
			ds.startsyncproc(ds.wg, ds.ctrlrc, func() error {
				logger.Info("sync data, from", mqlfrom)
				return ds.syncdata(mqlfrom)
			})
		}
		ds.wg.Wait()
		ds.syncstatus.WaitSaveDone()
		if len(ds.errs) > 0 {
			e := merrs.New(ds.errs)
			logger.Error(e)
			ret <- e
			return
		}
		logger.Info("total sync data", ds.syncstatus.TotalChunks(), "chunks", ds.syncstatus.TotalRecords(), "records,", "use time:", mfmt.FormatDuration(ds.syncstatus.TotalUseTime()))
		interval := mcfg.GetDuration("datasync.run.interval", 0)
		if interval <= 0 {
			break
		}
		time.Sleep(interval)
	}
	ret <- nil
}

// startsyncproc schedules proc on the given routine controller and tracks it
// in wg.
//
// proc is skipped when the shared context is already cancelled. An error
// returned by proc cancels the shared context; it is also recorded in ds.errs
// unless it is a context error. A scheduling failure is recorded as well.
func (ds *DataSync) startsyncproc(wg *sync.WaitGroup, ctrl *rc.RoutinesController, proc func() error) {
	wg.Add(1)
	e := ctrl.ConcurCall(1, func() {
		defer wg.Done()
		e := ds.ctx.Err()
		if e != nil {
			logger.Error(e)
			return
		}
		e = proc()
		if e != nil {
			logger.Error(e)
			if !merrs.ContextError.Contains(e) {
				ds.mutex.Lock()
				ds.errs = append(ds.errs, e)
				ds.mutex.Unlock()
			}
			ds.cancel()
		}
	})
	if e != nil {
		ds.mutex.Lock()
		ds.errs = append(ds.errs, merrs.New(e))
		ds.mutex.Unlock()
		// The callback never ran, so release the WaitGroup slot here.
		wg.Done()
	}
}

// syncdata synchronizes one configured data item.
//
// mqlfrom may be a class name or a select statement. It is normalized to a
// select statement, parsed, and one object job per class info loaded for the
// selected class is scheduled.
func (ds *DataSync) syncdata(mqlfrom string) error {
	logger.Debug("syncdata", mqlfrom)
	// Normalize to a select statement.
	mqlfrom = FormatMQL(mqlfrom)
	fromclass, fields, condition, e := ds.parseMQL(mqlfrom)
	if e != nil {
		return merrs.New(e)
	}
	cifroms, e := ds.schemaFrom.LoadClassinfos(fromclass)
	if e != nil {
		return merrs.New(e)
	}
	logger.Debug("loadClassinfos", fromclass, len(cifroms))
	for _, v := range cifroms {
		cifrom := v // per-iteration copy for the closure below
		ds.startsyncproc(ds.wg, ds.objectrc, func() error {
			return ds.syncclassdata(cifrom, fields, condition)
		})
	}
	return nil
}

var reselect = regexp.MustCompile(`(?is)select\s.*`)
var reselectfromclass = regexp.MustCompile(`(?is)select\s+(.*)\s+from\s+(\S+)(?:\s+(.*)\s*)?`)
var commentexprs = regexp.MustCompile(`(?s)\/\*(?:[^\*]|\*+[^\*\/])*\*+\/`)
var commentexprs_2 = regexp.MustCompile(`(?ms)(?:^|\n)\-\-[^\n]*(?:\n|$)`)
var commentexprs_3 = regexp.MustCompile(`(?ms)(?:^|\n)//[^\n]*(?:\n|$)`)

// FormatMQL strips /* */ block comments and whole-line "--" and "//" comments
// from mql, trims it, and prefixes "select * from " when the remaining text
// contains no select statement (i.e. it is a bare class name).
func FormatMQL(mql string) string {
	mql = commentexprs.ReplaceAllString(mql, "")
	mql = commentexprs_2.ReplaceAllString(mql, "")
	mql = commentexprs_3.ReplaceAllString(mql, "")
	mql = strings.TrimSpace(mql)
	if !reselect.MatchString(mql) {
		mql = "select * from " + mql
	}
	return mql
}

// parseMQL splits a select statement into its class name, field list and the
// text that follows the class name (condition, possibly empty).
//
// It returns an error when mql does not match "select <fields> from <class>".
func (ds *DataSync) parseMQL(mql string) (class string, fields, condition string, err error) {
	logger.Debug("parseMQL", mql)
	defer func() {
		logger.Debug("parseMQL return", mql, class, fields, condition)
	}()
	selstmt := reselectfromclass.FindStringSubmatch(mql)
	// A match always yields the full text plus the three capture groups.
	if len(selstmt) < 4 {
		return "", "", "", merrs.New("from.data select statement error", []string{"mql", mql})
	}
	fields = selstmt[1]
	class = selstmt[2]
	condition = selstmt[3]
	if class == "" {
		return "", "", "", merrs.New("from.data select statement error", []string{"mql", mql})
	}
	return class, fields, condition, nil
}

var rewhere = regexp.MustCompile(`(?is)\swhere\s(.*)`)

// mqlAddVtimeRange returns mql restricted to sbeginvtime <= vtime < sendvtime.
//
// When mql already has a where clause, the range is placed first and the
// existing clause text is and-ed to it in parentheses; otherwise a where
// clause is appended.
func mqlAddVtimeRange(mql, sbeginvtime, sendvtime string) string {
	segwhere := fmt.Sprint(" where vtime>='", sbeginvtime, "' and vtime<'", sendvtime, "'")
	if rewhere.MatchString(mql) {
		return rewhere.ReplaceAllString(mql, segwhere+" and ($1)")
	}
	return mql + segwhere
}

// syncclassdata synchronizes the objects of one source class to the target.
//
// The class is skipped when it is listed in datasync.deny.class (with or
// without a trailing "/"), or when any of its base classes is listed with a
// trailing "/". The target class comes from datasync.mapping.class (default:
// same name) and is created when missing. Progress is kept in the DoneCount
// stored under the generated select statement; a job that finds the same
// DoneCount already running returns immediately.
func (ds *DataSync) syncclassdata(cifrom *dbo.ClassInfoHelper, fields, condition string) error {
	logger.Debug("syncclassdata", cifrom.Fullname, fields, condition)
	denyclass := strset.New(mcfg.GetStrings("datasync.deny.class")...)
	logger.Debug("denyclass", denyclass)
	if denyclass.Has(cifrom.Fullname) || denyclass.Has(cifrom.Fullname+"/") {
		logger.Debug(cifrom.Fullname, "in denyclass")
		return nil
	}
	// Walk up the class hierarchy: "<base>/" in the deny list denies subclasses.
	checkdenyclass := cifrom
	for checkdenyclass != nil {
		if denyclass.Has(checkdenyclass.BaseClassFullname() + "/") {
			logger.Debug(checkdenyclass.BaseClassFullname(), "in denyclass")
			return nil
		}
		checkdenyclass = ds.schemaFrom.GetClassInfo(checkdenyclass.BaseClassFullname())
	}
	// Make sure the target class exists.
	toclass := ds.classmapping[cifrom.Fullname]
	if len(toclass) > 1 {
		return merrs.New("datasync.mapping.class config error, should be map to one class only", []string{"fromclass", cifrom.Fullname, "toclass", fmt.Sprint(toclass)})
	}
	if len(toclass) == 0 || toclass[0] == "" {
		toclass = []string{cifrom.Fullname}
	}
	cito, e := ds.assureToClass(toclass[0], cifrom)
	if e != nil {
		return merrs.New(e)
	}
	// The per-class statement doubles as the progress key.
	mqlfrom := "select " + fields + " from " + cifrom.Fullname
	if condition != "" {
		mqlfrom += " " + condition
	}
	dc := ds.syncstatus.DoneCount(mqlfrom)
	// dc.isrunning is used as a guarded flag: take the value, put one back.
	isrunning := <-dc.isrunning
	if isrunning {
		dc.isrunning <- isrunning
		return nil
	}
	dc.isrunning <- true
	defer func() {
		<-dc.isrunning
		dc.isrunning <- false
	}()
	recordscount := dc.RecordsCount
	sfromvtime := dc.FromVtime
	slastdatavtime := dc.LastDataVtime
	slastsyncvtime := dc.LastSyncVtime
	// Fetch the data in time segments.
	// Parse errors are ignored on purpose: an unparsable (e.g. empty) value
	// yields the zero time, which triggers the re-initialization below.
	// NOTE(review): time.Parse yields UTC while these strings are produced by
	// Format on values that may carry another location — confirm time zones.
	fromvtime, _ := time.Parse("2006-01-02 15:04:05", sfromvtime)
	lastdatavtime, _ := time.Parse("2006-01-02 15:04:05.000000", slastdatavtime)
	lastsyncvtime, _ := time.Parse("2006-01-02 15:04:05", slastsyncvtime)
	sincevtime := time.Now().Add(-ds.datatimesince)
	if fromvtime.Before(sincevtime) || lastsyncvtime.Before(fromvtime) || lastdatavtime.Before(fromvtime) {
		ssincevtime := sincevtime.Format("2006-01-02 00:00:00")
		firstdatavtime := time.Now()
		sfirstdatavtime := firstdatavtime.Format("2006-01-02 15:04:05.000000")
		// Repeatedly query for a record older than the best candidate so far
		// until none is returned.
		for i := 0; ; i++ {
			mqlseg := mqlAddVtimeRange(mqlfrom, ssincevtime, sfirstdatavtime)
			mqlchunk := mqlseg + " order by vtime limit 1"
			logger.Debug("check first data vtime:", ds.odbcFrom.Config().Keyspace, mqlchunk)
			// Read source data.
			r, e := ds.odbcFrom.Query(mqlchunk).WithContext(ds.ctx).Do()
			if e != nil {
				return merrs.New(e)
			}
			if len(r.Data) == 0 {
				// No more data.
				if i == 0 {
					logger.Info("check first data vtime:", ds.odbcFrom.Config().Keyspace, cifrom.Fullname, "no data")
					ds.syncstatus.RemoveDoneCount(mqlfrom)
					return nil
				}
				logger.Info("check first data vtime:", ds.odbcFrom.Config().Keyspace, cifrom.Fullname, sfirstdatavtime, mqlfrom)
				break
			}
			firstdata := r.Data[0]
			vtime, ok := firstdata["vtime"].(time.Time)
			if !ok {
				// Report instead of panicking on an unexpected column type.
				return merrs.New("vtime is not a time value", []string{"mql", mqlchunk, "vtime", fmt.Sprintf("%T(%v)", firstdata["vtime"], firstdata["vtime"])})
			}
			firstdatavtime = vtime
			sfirstdatavtime = firstdatavtime.Format("2006-01-02 15:04:05.000000")
			logger.Debug("check first data vtime:", ds.odbcFrom.Config().Keyspace, firstdata["class"], firstdata["id"], sfirstdatavtime)
		}
		fromvtime = firstdatavtime
		lastdatavtime = firstdatavtime
		lastsyncvtime = firstdatavtime
		sfromvtime = fromvtime.Format("2006-01-02 15:04:05")
		slastdatavtime = lastdatavtime.Format("2006-01-02 15:04:05.000000")
		slastsyncvtime = lastsyncvtime.Format("2006-01-02 15:04:05")
		recordscount = 0
		// Initialize the DoneCount progress information.
		dc.RecordsCount = recordscount
		dc.FromVtime = sfromvtime
		dc.LastDataVtime = slastdatavtime
		dc.LastSyncVtime = slastsyncvtime
	}
	// Resume the bucket jobs already known for this class.
	e = ds.syncbucketdatacontinue(cifrom, cito, mqlfrom)
	if e != nil {
		return merrs.New(e)
	}
	// Resume the object synchronization.
	interval := mcfg.GetDuration("datasync.run.interval", "1m")
	lastsyncvtime = lastsyncvtime.Add(-interval)
	if lastdatavtime.After(lastsyncvtime) {
		lastsyncvtime = lastdatavtime
	}
	lastsyncvtime = lastsyncvtime.Add(-mcfg.GetDuration("datasync.run.overtime", "30s"))
	// NOTE(review): the first window's lower bound below is slastsyncvtime,
	// which is not refreshed after the rewind above, so the rewind only
	// shortens the first window — confirm whether the overlap should be re-read.
	nextvtime := lastsyncvtime
	run := true
	for run {
		nextvtime = lastsyncvtime.Add(mcfg.GetDuration("datasync.pagetime", "1d"))
		if time.Now().Before(nextvtime) {
			// Reached the present: this is the last window of this pass.
			nextvtime = time.Now()
			run = false
		}
		snextvtime := nextvtime.Format("2006-01-02 15:04:05")
		mqlseg := mqlAddVtimeRange(mqlfrom, slastsyncvtime, snextvtime)
		offset := 0
		for {
			mqlchunk := mqlseg + fmt.Sprint(" limit ", offset, ",", mcfg.GetInt("datasync.pagesize", 50))
			logger.Debug(mqlchunk)
			// Read source data.
			r, e := ds.odbcFrom.Query(mqlchunk).WithContext(ds.ctx).Do()
			if e != nil {
				return merrs.New(e)
			}
			if len(r.Data) == 0 {
				// No more data.
				break
			}
			// Write target data.
			for _, data := range r.Data {
				e = ds.insertData(mqlfrom, cifrom, cito, data)
				if e != nil {
					return e
				}
				datavtime, ok := data["vtime"].(time.Time)
				if !ok {
					// Report instead of panicking on an unexpected column type.
					return merrs.New("vtime is not a time value", []string{"mql", mqlchunk, "vtime", fmt.Sprintf("%T(%v)", data["vtime"], data["vtime"])})
				}
				if lastdatavtime.Before(datavtime) {
					lastdatavtime = datavtime
				}
			}
			offset += len(r.Data)
		}
		lastsyncvtime = nextvtime
		slastsyncvtime = lastsyncvtime.Format("2006-01-02 15:04:05")
		recordscount += int64(offset)
		// Persist the progress after every window.
		dc.RecordsCount = recordscount
		dc.LastDataVtime = lastdatavtime.Format("2006-01-02 15:04:05.000000")
		dc.LastSyncVtime = slastsyncvtime
		ds.syncstatus.Save(mqlfrom, dc)
	}
	logger.Info("total sync data:", mqlfrom, "vtime:", dc.FromVtime, "~", dc.LastSyncVtime, "records:", dc.RecordsCount, "lastvtime:", dc.LastDataVtime)
	return nil
}

// insertData writes one source record into the target class and schedules the
// bucket synchronization for it.
//
// data["class"] is overwritten with the target class name. Values are bound
// in the order of cito.Fieldslist: bucket fields are bound as nil, and a field
// missing under its full name is looked up again by the part after the first
// ":".
func (ds *DataSync) insertData(mqlfrom string, cifrom, cito *dbo.ClassInfoHelper, data map[string]any) error {
	if data["class"] != cito.Fullname {
		data["class"] = cito.Fullname
	}
	logger.Debug("insertData", data["class"], data["id"], data["vtime"])
	vals := []any{}
	for _, fn := range cito.Fieldslist {
		if cito.Fieldinfos[fn].Fieldtype == "bucket" {
			// Bucket content is copied separately by syncbucketdata.
			vals = append(vals, nil)
			continue
		}
		v := data[fn]
		if v == nil {
			if i := strings.Index(fn, ":"); i >= 0 {
				fn = fn[i+1:]
			}
			v = data[fn]
		}
		vals = append(vals, v)
	}
	_, e := ds.odbcTo.Query(cito.Insertmql, vals...).WithContext(ds.ctx).Do()
	if e != nil {
		return merrs.New(e, []string{"mql", cito.Insertmql, "vals", fmt.Sprint(data)})
	}
	return ds.syncbucketdatanew(cifrom, cito, mqlfrom, data)
}

// syncbucketdatanew schedules one bucket job for every bucket field of cifrom
// that is present in data. It does nothing when datasync.bucket.time.since
// is 0.
//
// It returns an error when a bucket field is present but data["id"] is not a
// string.
func (ds *DataSync) syncbucketdatanew(cifrom, cito *dbo.ClassInfoHelper, mqlfrom string, data map[string]any) error {
	if ds.buckettimesince == 0 {
		return nil
	}
	for _, bf := range cifrom.BucketFields {
		if _, has := data[bf]; !has {
			continue
		}
		// Resolve the id here rather than in the worker, so a bad value is
		// reported as an error instead of panicking inside a goroutine.
		oid, ok := data["id"].(string)
		if !ok {
			return merrs.New("object id is not a string", []string{"class", cifrom.Fullname, "id", fmt.Sprintf("%T(%v)", data["id"], data["id"])})
		}
		// The job runs asynchronously: give it its own copy of the loop
		// variable (required before Go 1.22, harmless afterwards).
		bf := bf
		ds.startsyncproc(ds.wg, ds.bucketrc, func() error {
			buckettype := cast.ToString(cifrom.Fieldinfos[bf].Fieldoption["type"])
			key := buckettype + ":" + cifrom.Fullname + ":" + bf + "[" + oid + "]"
			dc := ds.syncstatus.DoneCount(key)
			dc.BucketClass = cifrom.Fullname
			dc.BucketField = bf
			dc.BucketObjID = oid
			return ds.syncbucketdata(mqlfrom, cifrom, cito, key, dc)
		})
	}
	return nil
}

// syncbucketdatacontinue re-schedules the bucket jobs already recorded in the
// sync status for the bucket fields of cifrom. It does nothing when
// datasync.bucket.time.since is 0 or the source statement returns no record.
func (ds *DataSync) syncbucketdatacontinue(cifrom, cito *dbo.ClassInfoHelper, mqlfrom string) error {
	if ds.buckettimesince == 0 {
		return nil
	}
	// One record is enough to learn which bucket fields are selected.
	mqlchunk := mqlfrom + " limit 1"
	logger.Debug("check data fields:", ds.odbcFrom.Config().Keyspace, mqlchunk)
	// Read source data.
	r, e := ds.odbcFrom.Query(mqlchunk).WithContext(ds.ctx).Do()
	if e != nil {
		return e
	}
	if len(r.Data) == 0 {
		return nil
	}
	data := r.Data[0]
	for _, bf := range cifrom.BucketFields {
		if _, has := data[bf]; !has {
			continue
		}
		bf := bf // per-iteration copy for the filter closure
		bucketdonecount := ds.syncstatus.DoneCountCopy(func(k string, v *DoneCount) bool {
			return v.BucketClass == cifrom.Fullname && v.BucketField == bf
		})
		for k, v := range bucketdonecount {
			key := k
			dc := v
			ds.startsyncproc(ds.wg, ds.bucketrc, func() error {
				return ds.syncbucketdata(mqlfrom, cifrom, cito, key, dc)
			})
		}
	}
	return nil
}

// syncbucketdata copies the bucket data of one object field, identified by
// dc.BucketClass, dc.BucketField and dc.BucketObjID, from source to target in
// time windows of datasync.bucket.pagetime, saving progress under dckey after
// every window. A job that finds dc already running returns immediately.
//
// It returns an error when the owning object cannot be found in the source.
func (ds *DataSync) syncbucketdata(mqlfrom string, cifrom, cito *dbo.ClassInfoHelper, dckey string, dc *DoneCount) error {
	// dc.isrunning is used as a guarded flag: take the value, put one back.
	isrunning := <-dc.isrunning
	if isrunning {
		dc.isrunning <- isrunning
		return nil
	}
	dc.isrunning <- true
	defer func() {
		<-dc.isrunning
		dc.isrunning <- false
	}()
	bucketType := cast.ToString(cifrom.Fieldinfos[dc.BucketField].Fieldoption["type"])
	logger.Debug("to sync", bucketType, "data", dc.BucketClass, dc.BucketField, "id:", dc.BucketObjID, mqlfrom)
	recordscount := dc.RecordsCount
	sfromvtime := dc.FromVtime
	slastdatavtime := dc.LastDataVtime
	slastsyncvtime := dc.LastSyncVtime
	// Fetch the data in time segments.
	// Parse errors are ignored on purpose: an unparsable (e.g. empty) value
	// yields the zero time, which triggers the re-initialization below.
	fromvtime, _ := time.Parse("2006-01-02 15:04:05", sfromvtime)
	lastdatavtime, _ := time.Parse("2006-01-02 15:04:05.000000", slastdatavtime)
	lastsyncvtime, _ := time.Parse("2006-01-02 15:04:05", slastsyncvtime)
	sincevtime := time.Now().Add(-ds.buckettimesince)
	if fromvtime.Before(sincevtime) || lastsyncvtime.Before(fromvtime) || lastdatavtime.Before(fromvtime) {
		fromvtime = sincevtime
		lastdatavtime = sincevtime
		lastsyncvtime = sincevtime
		sfromvtime = fromvtime.Format("2006-01-02 15:04:05")
		slastdatavtime = lastdatavtime.Format("2006-01-02 15:04:05.000000")
		slastsyncvtime = lastsyncvtime.Format("2006-01-02 15:04:05")
		recordscount = 0
		// Initialize the DoneCount progress information.
		dc.RecordsCount = recordscount
		dc.FromVtime = sfromvtime
		dc.LastDataVtime = slastdatavtime
		dc.LastSyncVtime = slastsyncvtime
	}
	// Resume the data synchronization.
	interval := mcfg.GetDuration("datasync.run.interval", "1m")
	lastsyncvtime = lastsyncvtime.Add(-interval)
	if lastdatavtime.After(lastsyncvtime) {
		lastsyncvtime = lastdatavtime
	}
	lastsyncvtime = lastsyncvtime.Add(-mcfg.GetDuration("datasync.run.overtime", "30s"))
	// NOTE(review): the first window's lower bound below is slastsyncvtime,
	// which is not refreshed after the rewind above — confirm the intent.
	nextvtime := lastsyncvtime
	run := true
	for run {
		nextvtime = lastsyncvtime.Add(mcfg.GetDuration("datasync.bucket.pagetime", "1h"))
		if time.Now().Before(nextvtime) {
			// Reached the present: this is the last window of this pass.
			nextvtime = time.Now()
			run = false
		}
		snextvtime := nextvtime.Format("2006-01-02 15:04:05")
		offset := 0
		{
			mqlchunk := "select " + dc.BucketField + ".time('" + slastsyncvtime + "','" + snextvtime + "')" + " from " + dc.BucketClass + " where id=?"
			logger.Debug(mqlchunk, dc.BucketObjID)
			// Read source data.
			r, e := ds.odbcFrom.Query(mqlchunk, dc.BucketObjID).WithContext(ds.ctx).Do()
			if e != nil {
				return merrs.New(e)
			}
			if len(r.Data) == 0 {
				return merrs.New("bucket host data not found id="+dc.BucketObjID, merrs.Map{"mql": mqlchunk})
			}
			idata := r.Data[0][dc.BucketField]
			if idata != nil {
				data := cast.ToSlice(idata)
				ms := []map[string]any{}
				for i := 0; i < len(data); i++ {
					// Each entry is [timestamp, name, value(, extra map)];
					// shorter entries are dropped.
					dat := cast.ToSlice(data[i])
					if len(dat) < 3 {
						continue
					}
					m := map[string]any{}
					m["__timestamp__"] = dat[0]
					m["__name__"] = dat[1]
					m["__value__"] = dat[2]
					if len(dat) >= 4 {
						for k, v := range cast.ToStringMap(dat[3]) {
							m[k] = v
						}
					}
					ms = append(ms, m)
				}
				// Write target data.
				mqlinsert := "insert into " + dc.BucketClass + "(id," + dc.BucketField + ") values(?,?)"
				_, e = ds.odbcTo.Query(mqlinsert, dc.BucketObjID, ms).WithContext(ds.ctx).Do()
				if e != nil {
					return merrs.New(e)
				}
				lastdatavtime = nextvtime
				offset += len(data)
			}
		}
		lastsyncvtime = nextvtime
		slastsyncvtime = lastsyncvtime.Format("2006-01-02 15:04:05")
		recordscount += int64(offset)
		// Persist the progress after every window.
		dc.RecordsCount = recordscount
		dc.LastDataVtime = lastdatavtime.Format("2006-01-02 15:04:05.000000")
		dc.LastSyncVtime = slastsyncvtime
		ds.syncstatus.Save(dckey, dc)
	}
	logger.Info("total sync", bucketType, "data", dc.BucketClass, dc.BucketField, "id:", dc.BucketObjID, recordscount, "records", "to time", dc.LastSyncVtime)
	return nil
}

// reCreateClassDDL splits a "create class [if not exists] <name> (..." DDL
// statement into prefix, class name and remainder so the name can be replaced.
var reCreateClassDDL = regexp.MustCompile(`(?i)(create\s+class(?:\s+if\s+not\s+exists)*\s+)([^\()]+)(\s*\(.*)`)

// assureToClass returns the class info helper of the target class toclass,
// creating the class from the source DDL (with the class name replaced when it
// differs) if the target server does not have it yet.
func (ds *DataSync) assureToClass(toclass string, cifrom *dbo.ClassInfoHelper) (cito *dbo.ClassInfoHelper, err error) {
	ddl := cifrom.DDL
	logger.Info("assureToClass", cifrom.Classfullname, toclass)
	if toclass != cifrom.Classfullname {
		ddl = reCreateClassDDL.ReplaceAllString(ddl, "$1"+toclass+"$3")
	}
	cis, e := ds.odbcTo.ClassInfo(toclass, false)
	// A "not exists" failure just means the class has to be created.
	if e != nil && !merrs.NotExistError.Contains(e) && !strings.Contains(e.Error(), "not exists") {
		return nil, merrs.New(e)
	}
	if len(cis) == 0 {
		logger.Debug("auto create class", ddl)
		_, e = ds.odbcTo.Query(ddl).WithContext(ds.ctx).Do()
		if e != nil {
			return nil, merrs.New(e)
		}
		cis, e = ds.odbcTo.ClassInfo(toclass, false)
		if e != nil {
			return nil, merrs.New(e)
		}
		if len(cis) == 0 {
			return nil, merrs.New("len(cis) == 0")
		}
	}
	cito, e = ds.schemaTo.NewClassinfo(cis[0])
	if e != nil {
		return nil, merrs.New(e)
	}
	logger.Debug("sync to class", cito.Classfullname)
	return cito, nil
}