|
@@ -104,6 +104,7 @@ func (ds *DataSync) Init() (err error) {
|
|
|
}
|
|
|
|
|
|
func (ds *DataSync) Run() (done <-chan error) {
|
|
|
+ logger.Info(mcfg.Info())
|
|
|
ret := make(chan error, 1)
|
|
|
key := regexp.MustCompile(`\W`).ReplaceAllString(strings.Split(ds.fromodbserver, ",")[0]+"_"+ds.fromkeyspace+"_"+strings.Split(ds.toodbserver, ",")[0]+"_"+ds.tokeyspace, "_")
|
|
|
ds.syncstatus = NewSyncStatus(key)
|
|
@@ -143,17 +144,23 @@ func (ds *DataSync) run(ret chan error) {
|
|
|
logger.Info("resume sync data, from", len(fromdatas), "configure")
|
|
|
for {
|
|
|
ds.wg = &sync.WaitGroup{}
|
|
|
+ // logger.Trace(1)
|
|
|
ds.syncstatus.Resume()
|
|
|
+ // logger.Trace(2)
|
|
|
for _, fromdata := range fromdatas {
|
|
|
mqlfrom := fromdata
|
|
|
+ // logger.Trace(3)
|
|
|
ds.startsyncproc(ds.wg, ds.ctrlrc, func() error {
|
|
|
+ logger.Info("sync data, from", mqlfrom)
|
|
|
return ds.syncdata(mqlfrom)
|
|
|
})
|
|
|
}
|
|
|
ds.wg.Wait()
|
|
|
ds.syncstatus.WaitSaveDone()
|
|
|
if len(ds.errs) > 0 {
|
|
|
- ret <- merrs.New(ds.errs)
|
|
|
+ e := merrs.New(ds.errs)
|
|
|
+ logger.Error(e)
|
|
|
+ ret <- e
|
|
|
return
|
|
|
}
|
|
|
logger.Info("total sync data", ds.syncstatus.TotalChunks(), "chunks", ds.syncstatus.TotalRecords(), "records,", "use time:", mfmt.FormatDuration(ds.syncstatus.TotalUseTime()))
|
|
@@ -169,35 +176,44 @@ func (ds *DataSync) run(ret chan error) {
|
|
|
|
|
|
func (ds *DataSync) startsyncproc(wg *sync.WaitGroup, rc *rc.RoutinesController, proc func() error) {
|
|
|
wg.Add(1)
|
|
|
+ // logger.Trace(4)
|
|
|
e := rc.ConcurCall(1, func() {
|
|
|
+ // logger.Trace(5)
|
|
|
defer wg.Done()
|
|
|
- if ds.ctx.Err() == nil {
|
|
|
- e := proc()
|
|
|
- if e != nil {
|
|
|
- if !merrs.ContextError.Contains(e) {
|
|
|
- ds.mutex.Lock()
|
|
|
- ds.errs = append(ds.errs, e)
|
|
|
- ds.mutex.Unlock()
|
|
|
- }
|
|
|
- ds.cancel()
|
|
|
+ e := ds.ctx.Err()
|
|
|
+ if e != nil {
|
|
|
+ logger.Error(e)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ e = proc()
|
|
|
+ if e != nil {
|
|
|
+ logger.Error(e)
|
|
|
+ if !merrs.ContextError.Contains(e) {
|
|
|
+ ds.mutex.Lock()
|
|
|
+ ds.errs = append(ds.errs, e)
|
|
|
+ ds.mutex.Unlock()
|
|
|
}
|
|
|
+ ds.cancel()
|
|
|
}
|
|
|
})
|
|
|
if e != nil {
|
|
|
+ // logger.Trace(6)
|
|
|
ds.mutex.Lock()
|
|
|
ds.errs = append(ds.errs, e)
|
|
|
ds.mutex.Unlock()
|
|
|
wg.Done()
|
|
|
}
|
|
|
+ // logger.Trace(7)
|
|
|
}
|
|
|
|
|
|
// 同步一块数据
|
|
|
// mqlfrom 可以是类名 或 查询语句
|
|
|
func (ds *DataSync) syncdata(mqlfrom string) error {
|
|
|
// 统一格式化为查询语句
|
|
|
+ logger.Debug("syncdata", mqlfrom)
|
|
|
mqlfrom = FormatMQL(mqlfrom)
|
|
|
// 已完成同步进度
|
|
|
- fromclass, fields, condition, e := ds.LastSyncProgress(mqlfrom)
|
|
|
+ fromclass, fields, condition, e := ds.parseMQL(mqlfrom)
|
|
|
if e != nil {
|
|
|
return e
|
|
|
}
|
|
@@ -205,6 +221,7 @@ func (ds *DataSync) syncdata(mqlfrom string) error {
|
|
|
if e != nil {
|
|
|
return e
|
|
|
}
|
|
|
+ logger.Debug("loadClassinfos", fromclass, len(cifroms))
|
|
|
for _, v := range cifroms {
|
|
|
cifrom := v
|
|
|
ds.startsyncproc(ds.wg, ds.objectrc, func() error {
|
|
@@ -231,7 +248,11 @@ func FormatMQL(mql string) string {
|
|
|
return mql
|
|
|
}
|
|
|
|
|
|
-func (ds *DataSync) LastSyncProgress(mql string) (class string, fields, condition string, err error) {
|
|
|
+func (ds *DataSync) parseMQL(mql string) (class string, fields, condition string, err error) {
|
|
|
+ logger.Debug("parseMQL", mql)
|
|
|
+ defer func() {
|
|
|
+ logger.Debug("parseMQL return", mql, class, fields, condition)
|
|
|
+ }()
|
|
|
selstmt := reselectfromclass.FindStringSubmatch(mql)
|
|
|
if len(selstmt) < 3 {
|
|
|
return "", "", "", merrs.New("from.data select statement error", []string{"mql", mql})
|
|
@@ -259,13 +280,17 @@ func mqlAddVtimeRange(mql, sbeginvtime, sendvtime string) string {
|
|
|
}
|
|
|
|
|
|
func (ds *DataSync) syncclassdata(cifrom *dbo.ClassInfoHelper, fields, condition string) error {
|
|
|
+ logger.Debug("syncclassdata", cifrom.Fullname, fields, condition)
|
|
|
denyclass := strset.New(mcfg.GetStrings("datasync.deny.class")...)
|
|
|
+ logger.Debug("denyclass", denyclass)
|
|
|
if denyclass.Has(cifrom.Fullname) || denyclass.Has(cifrom.Fullname+"/") {
|
|
|
+ logger.Debug(cifrom.Fullname, "in denyclass")
|
|
|
return nil
|
|
|
}
|
|
|
checkdenyclass := cifrom
|
|
|
for checkdenyclass != nil {
|
|
|
if denyclass.Has(checkdenyclass.BaseClassFullname() + "/") {
|
|
|
+ logger.Debug(checkdenyclass.BaseClassFullname(), "in denyclass")
|
|
|
return nil
|
|
|
}
|
|
|
checkdenyclass = ds.schemaFrom.GetClassInfo(checkdenyclass.BaseClassFullname())
|
|
@@ -407,7 +432,7 @@ func (ds *DataSync) syncclassdata(cifrom *dbo.ClassInfoHelper, fields, condition
|
|
|
}
|
|
|
|
|
|
func (ds *DataSync) insertData(mqlfrom string, cifrom, cito *dbo.ClassInfoHelper, data map[string]any) error {
|
|
|
- logger.Debug(data["class"], data["id"], data["vtime"])
|
|
|
+ logger.Debug("insertData", data["class"], data["id"], data["vtime"])
|
|
|
vals := []any{}
|
|
|
for _, fn := range cito.Fieldslist {
|
|
|
if cito.Fieldinfos[fn].Fieldtype == "bucket" {
|
|
@@ -606,6 +631,7 @@ func (ds *DataSync) syncbucketdata(mqlfrom string, cifrom, cito *dbo.ClassInfoHe
|
|
|
}
|
|
|
|
|
|
func (ds *DataSync) assureToClass(toclass string, cifrom *dbo.ClassInfoHelper) (cito *dbo.ClassInfoHelper, err error) {
|
|
|
+ logger.Info("assureToClass", cifrom.Classfullname, toclass)
|
|
|
if toclass != cifrom.Classfullname {
|
|
|
return nil, merrs.New("not support class mapping", []string{"toclass", toclass, "fromclass", cifrom.Classfullname})
|
|
|
}
|
|
@@ -614,6 +640,7 @@ func (ds *DataSync) assureToClass(toclass string, cifrom *dbo.ClassInfoHelper) (
|
|
|
return nil, merrs.New(e)
|
|
|
}
|
|
|
if len(cis) == 0 {
|
|
|
+ logger.Debug("auto create class", cifrom.DDL)
|
|
|
_, e = ds.odbcTo.Query(cifrom.DDL).WithContext(ds.ctx).Do()
|
|
|
if e != nil {
|
|
|
return nil, merrs.New(e)
|
|
@@ -630,5 +657,6 @@ func (ds *DataSync) assureToClass(toclass string, cifrom *dbo.ClassInfoHelper) (
|
|
|
if e != nil {
|
|
|
return nil, merrs.New(e)
|
|
|
}
|
|
|
+ logger.Debug("sync to class", cito.Classfullname)
|
|
|
return cito, nil
|
|
|
}
|