wecisecode 6 dienas atpakaļ
vecāks
revīzija
ffaab3d7e5
2 mainītis faili ar 44 papildinājumiem un 23 dzēšanām
  1. 35 19
      datasync/datasync.conf
  2. 9 4
      datasync/datasync/datasync.go

+ 35 - 19
datasync/datasync.conf

@@ -1,4 +1,9 @@
 
+[log]
+# 日志输出级别,info / debug,默认 info
+level=debug
+
+
 [datasync]
 from.odbserver=127.0.0.1:11001
 from.keyspace=matrix
@@ -9,9 +14,9 @@ to.keyspace=oktest
 # 可以配置多个 类名 或 查询语句,查询结果中必须包含 class id 字段
 # 查询语句中可以指定 where 条件,不支持聚合、排序、limit 等子句
 # 不配置将读取所有类数据
-from.data=/test/
+# from.data=/test/
 # from.data=/test/bucketpromdb
-from.data=select * from /test/alert_status where vtime>'2022-03-10 16:53:43'
+# from.data=select * from /test/alert_status where vtime>'2022-03-10 16:53:43'
 from.data=/matrix/ldap
 from.data=/matrix/group
 from.data=/matrix/perms/
@@ -32,39 +37,50 @@ deny.class=/testdc2/
 deny.class=/testruledata
 deny.class=/aywl/
 
+# 类映射
+# mapping.class.fromclass=toclass
+
+# 指定建类语句等初始化mql语句,可以是mql文件或mql语句,默认根据原始元数据信息自动建类
+# 尚未实现定制化
+# to.init.mql= 
+
+# 目标数据存在时的选项 overwrite / ignore / error,默认 overwrite
+# 尚未实现定制化
+# exists=overwrite
 
 # 只同步指定的最近一段时间的对象数据,默认 365d 
 data.time.since=3000d
 # 只同步指定的最近一段时间的Bucket数据,默认 30d 
 bucket.time.since=30d
 
-# 指定建类语句等初始化mql语句,可以是mql文件或mql语句,默认根据原始元数据信息自动建类
-#to.init.mql= 
-
-# 类映射
-# mapping.class.fromclass=toclass
-
-# overwrite / ignore / error
-# exists=overwrite
-
-# 每秒写入数据量,同步上限
+# 每秒写入数据量,同步上限,默认 0 无限制
+# 尚未实现定制化
 # limit.rate=50
 
-# 对象数据同步并发数,默认cpu数
-concur.threads=2
+# 对象数据同步控制并发数,对应from.data配置,默认cpu数
+# ctrl.threads=2
+# 对象数据同步并发数,对应源数据查询语句,默认cpu数
+# concur.threads=2
 # Bucket数据同步并发数,默认cpu数
-bucket.threads=2
+# bucket.threads=2
 # 每次读取数据的最大数量,默认50,立即生效
-pagesize=10
+pagesize=50
 # 每次读取数据的时间分段,默认一天,立即生效
+# 每次读取一个完整时间分段的数据,因此数据同步存在一个完整时间分段的延迟
 pagetime=1d
 # 每次读取bucket数据的时间分段,默认1h,立即生效
+# 每次读取一个完整时间分段的数据,因此数据同步存在一个完整时间分段的延迟
+# 设为 0 不同步bucket数据
 bucket.pagetime=1d
-# 用于存放同步进度记录
+# 用于存放同步进度记录,新目录会自动创建
 data.dir=data
 # 持续运行间隔,数据同步到当前时间后,间隔指定时间轮询执行
-# 默认 0 运行一次,数据同步到当前时间后退出
-# 立即生效
+# 设为 0 只运行一次,数据同步到当前时间后退出
+# 默认 1m,立即生效
 run.interval=5s
 # 每次轮询读取数据的重叠时间,默认30s,立即生效
+# 从最后同步完成时间回退一段时间,防止时钟不同步等原因导致运行间隔期间新产生的数据丢失
 run.overtime=30s
+
+# 运行参数
+# reload 重新开始同步

+ 9 - 4
datasync/datasync/datasync.go

@@ -13,11 +13,11 @@ import (
 	"git.wecise.com/wecise/odb-go/dbo"
 	"git.wecise.com/wecise/odb-go/odb"
 	"git.wecise.com/wecise/odb-go/odbc"
-	"github.com/scylladb/go-set/strset"
-	"github.com/spf13/cast"
 	"gitee.com/wecisecode/util/merrs"
 	"gitee.com/wecisecode/util/mfmt"
 	"gitee.com/wecisecode/util/rc"
+	"github.com/scylladb/go-set/strset"
+	"github.com/spf13/cast"
 )
 
 var mcfg = odbc.Config
@@ -108,7 +108,11 @@ func (ds *DataSync) Run() (done <-chan error) {
 	key := regexp.MustCompile(`\W`).ReplaceAllString(strings.Split(ds.fromodbserver, ",")[0]+"_"+ds.fromkeyspace+"_"+strings.Split(ds.toodbserver, ",")[0]+"_"+ds.tokeyspace, "_")
 	ds.syncstatus = NewSyncStatus(key)
 	if !mcfg.GetBool("reload") {
-		ds.syncstatus.Load()
+		e := ds.syncstatus.Load()
+		if e != nil {
+			ret <- e
+			return ret
+		}
 	}
 	go ds.run(ret)
 	return ret
@@ -136,6 +140,7 @@ func (ds *DataSync) run(ret chan error) {
 	defer cancel()
 	ds.ctx = ctx
 	ds.cancel = cancel
+	logger.Info("resume sync data, from", len(fromdatas), "configure")
 	for {
 		ds.wg = &sync.WaitGroup{}
 		ds.syncstatus.Resume()
@@ -189,7 +194,7 @@ func (ds *DataSync) startsyncproc(wg *sync.WaitGroup, rc *rc.RoutinesController,
 // 同步一块数据
 // mqlfrom 可以是类名 或 查询语句
 func (ds *DataSync) syncdata(mqlfrom string) error {
-	// 一格式化为查询语句
+	// 一格式化为查询语句
 	mqlfrom = FormatMQL(mqlfrom)
 	// 已完成同步进度
 	fromclass, fields, condition, e := ds.LastSyncProgress(mqlfrom)