|
|
@@ -338,7 +338,7 @@ func (ds *DataSync) syncclassdata(cifrom *dbo.ClassInfoHelper, fields, condition
|
|
|
sfirstdatavtime := firstdatavtime.Format("2006-01-02 15:04:05.000000")
|
|
|
for i := 0; ; i++ {
|
|
|
mqlseg := mqlAddVtimeRange(mqlfrom, ssincevtime, sfirstdatavtime)
|
|
|
- mqlchunk := mqlseg + fmt.Sprint(" order by vtime limit 1")
|
|
|
+ mqlchunk := mqlseg + " order by vtime limit 1"
|
|
|
logger.Debug("check first data vtime:", ds.odbcFrom.Config().Keyspace, mqlchunk)
|
|
|
// 读取源数据
|
|
|
r, e := ds.odbcFrom.Query(mqlchunk).WithContext(ds.ctx).Do()
|
|
|
@@ -435,6 +435,9 @@ func (ds *DataSync) syncclassdata(cifrom *dbo.ClassInfoHelper, fields, condition
|
|
|
}
|
|
|
|
|
|
func (ds *DataSync) insertData(mqlfrom string, cifrom, cito *dbo.ClassInfoHelper, data map[string]any) error {
|
|
|
+ if data["class"] != cito.Fullname {
|
|
|
+ data["class"] = cito.Fullname
|
|
|
+ }
|
|
|
logger.Debug("insertData", data["class"], data["id"], data["vtime"])
|
|
|
vals := []any{}
|
|
|
for _, fn := range cito.Fieldslist {
|
|
|
@@ -489,7 +492,7 @@ func (ds *DataSync) syncbucketdatacontinue(cifrom, cito *dbo.ClassInfoHelper, mq
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
- mqlchunk := mqlfrom + fmt.Sprint(" limit 1")
|
|
|
+ mqlchunk := mqlfrom + " limit 1"
|
|
|
logger.Debug("check data fields:", ds.odbcFrom.Config().Keyspace, mqlchunk)
|
|
|
// 读取源数据
|
|
|
r, e := ds.odbcFrom.Query(mqlchunk).WithContext(ds.ctx).Do()
|
|
|
@@ -634,17 +637,19 @@ func (ds *DataSync) syncbucketdata(mqlfrom string, cifrom, cito *dbo.ClassInfoHe
|
|
|
}
|
|
|
|
|
|
func (ds *DataSync) assureToClass(toclass string, cifrom *dbo.ClassInfoHelper) (cito *dbo.ClassInfoHelper, err error) {
|
|
|
+ ddl := cifrom.DDL
|
|
|
logger.Info("assureToClass", cifrom.Classfullname, toclass)
|
|
|
if toclass != cifrom.Classfullname {
|
|
|
- return nil, merrs.New("not support class mapping", []string{"toclass", toclass, "fromclass", cifrom.Classfullname})
|
|
|
+ re := regexp.MustCompile(`(?i)(create\s+class(?:\s+if\s+not\s+exists)*\s+)([^\()]+)(\s*\(.*)`)
|
|
|
+ ddl = re.ReplaceAllString(ddl, "$1"+toclass+"$3")
|
|
|
}
|
|
|
cis, e := ds.odbcTo.ClassInfo(toclass, false)
|
|
|
if e != nil && !merrs.NotExistError.Contains(e) && !strings.Contains(e.Error(), "not exists") {
|
|
|
return nil, merrs.New(e)
|
|
|
}
|
|
|
if len(cis) == 0 {
|
|
|
- logger.Debug("auto create class", cifrom.DDL)
|
|
|
- _, e = ds.odbcTo.Query(cifrom.DDL).WithContext(ds.ctx).Do()
|
|
|
+ logger.Debug("auto create class", ddl)
|
|
|
+ _, e = ds.odbcTo.Query(ddl).WithContext(ds.ctx).Do()
|
|
|
if e != nil {
|
|
|
return nil, merrs.New(e)
|
|
|
}
|