|
@@ -199,7 +199,7 @@ func (ds *DataSync) startsyncproc(wg *sync.WaitGroup, rc *rc.RoutinesController,
|
|
|
if e != nil {
|
|
|
// logger.Trace(6)
|
|
|
ds.mutex.Lock()
|
|
|
- ds.errs = append(ds.errs, e)
|
|
|
+ ds.errs = append(ds.errs, merrs.New(e))
|
|
|
ds.mutex.Unlock()
|
|
|
wg.Done()
|
|
|
}
|
|
@@ -215,11 +215,11 @@ func (ds *DataSync) syncdata(mqlfrom string) error {
|
|
|
// 已完成同步进度
|
|
|
fromclass, fields, condition, e := ds.parseMQL(mqlfrom)
|
|
|
if e != nil {
|
|
|
- return e
|
|
|
+ return merrs.New(e)
|
|
|
}
|
|
|
cifroms, e := ds.schemaFrom.LoadClassinfos(fromclass)
|
|
|
if e != nil {
|
|
|
- return e
|
|
|
+ return merrs.New(e)
|
|
|
}
|
|
|
logger.Debug("loadClassinfos", fromclass, len(cifroms))
|
|
|
for _, v := range cifroms {
|
|
@@ -302,7 +302,7 @@ func (ds *DataSync) syncclassdata(cifrom *dbo.ClassInfoHelper, fields, condition
|
|
|
}
|
|
|
cito, e := ds.assureToClass(toclass, cifrom)
|
|
|
if e != nil {
|
|
|
- return e
|
|
|
+ return merrs.New(e)
|
|
|
}
|
|
|
//
|
|
|
mqlfrom := "select " + fields + " from " + cifrom.Fullname
|
|
@@ -340,7 +340,7 @@ func (ds *DataSync) syncclassdata(cifrom *dbo.ClassInfoHelper, fields, condition
|
|
|
// 读取源数据
|
|
|
r, e := ds.odbcFrom.Query(mqlchunk).WithContext(ds.ctx).Do()
|
|
|
if e != nil {
|
|
|
- return e
|
|
|
+ return merrs.New(e)
|
|
|
}
|
|
|
if len(r.Data) == 0 {
|
|
|
// 没有更多数据
|
|
@@ -373,7 +373,7 @@ func (ds *DataSync) syncclassdata(cifrom *dbo.ClassInfoHelper, fields, condition
|
|
|
// 继续执行相关bucket数据同步
|
|
|
e = ds.syncbucketdatacontinue(cifrom, cito, mqlfrom)
|
|
|
if e != nil {
|
|
|
- return e
|
|
|
+ return merrs.New(e)
|
|
|
}
|
|
|
// 继续执行对象数据同步
|
|
|
interval := mcfg.GetDuration("datasync.run.interval", "1m")
|
|
@@ -399,7 +399,7 @@ func (ds *DataSync) syncclassdata(cifrom *dbo.ClassInfoHelper, fields, condition
|
|
|
// 读取源数据
|
|
|
r, e := ds.odbcFrom.Query(mqlchunk).WithContext(ds.ctx).Do()
|
|
|
if e != nil {
|
|
|
- return e
|
|
|
+ return merrs.New(e)
|
|
|
}
|
|
|
if len(r.Data) == 0 {
|
|
|
// 没有更多数据
|
|
@@ -582,7 +582,7 @@ func (ds *DataSync) syncbucketdata(mqlfrom string, cifrom, cito *dbo.ClassInfoHe
|
|
|
// 读取源数据
|
|
|
r, e := ds.odbcFrom.Query(mqlchunk, dc.BucketObjID).WithContext(ds.ctx).Do()
|
|
|
if e != nil {
|
|
|
- return e
|
|
|
+ return merrs.New(e)
|
|
|
}
|
|
|
if len(r.Data) == 0 {
|
|
|
return merrs.New("bucket host data not found id="+dc.BucketObjID, merrs.Map{"mql": mqlchunk})
|
|
@@ -611,7 +611,7 @@ func (ds *DataSync) syncbucketdata(mqlfrom string, cifrom, cito *dbo.ClassInfoHe
|
|
|
mqlinsert := "insert into " + dc.BucketClass + "(id," + dc.BucketField + ") values(?,?)"
|
|
|
_, e = ds.odbcTo.Query(mqlinsert, dc.BucketObjID, ms).WithContext(ds.ctx).Do()
|
|
|
if e != nil {
|
|
|
- return e
|
|
|
+ return merrs.New(e)
|
|
|
}
|
|
|
lastdatavtime = nextvtime
|
|
|
offset += len(data)
|