libf il y a 2 mois
Parent
commit
6150928214
4 fichiers modifiés avec 33 ajouts et 16 suppressions
  1. 2 0
      datasync/datasync.conf
  2. 22 12
      datasync/datasync/datasync.go
  3. 5 2
      go.mod
  4. 4 2
      go.sum

+ 2 - 0
datasync/datasync.conf

@@ -66,3 +66,5 @@ data.dir=data
 # 默认 0 运行一次,数据同步到当前时间后退出
 # 立即生效
 run.interval=5s
+# 每次轮询读取数据的重叠时间,默认30s,立即生效
+run.overtime=30s

+ 22 - 12
datasync/datasync/datasync.go

@@ -346,11 +346,19 @@ func (ds *DataSync) syncclassdata(cifrom *dbo.ClassInfoHelper, fields, condition
 		return e
 	}
 	// 继续执行对象数据同步
+	interval := mcfg.GetDuration("datasync.run.interval", "1m")
+	lastsyncvtime = lastsyncvtime.Add(-interval)
+	if lastdatavtime.After(lastsyncvtime) {
+		lastsyncvtime = lastdatavtime
+	}
+	lastsyncvtime = lastsyncvtime.Add(-mcfg.GetDuration("datasync.run.overtime", "30s"))
 	nextvtime := lastsyncvtime
-	for {
+	run := true
+	for run {
 		nextvtime = lastsyncvtime.Add(mcfg.GetDuration("datasync.pagetime", "1d"))
 		if time.Now().Before(nextvtime) {
-			break
+			nextvtime = time.Now()
+			run = false
 		}
 		snextvtime := nextvtime.Format("2006-01-02 15:04:05")
 		mqlseg := mqlAddVtimeRange(mqlfrom, slastsyncvtime, snextvtime)
@@ -389,7 +397,7 @@ func (ds *DataSync) syncclassdata(cifrom *dbo.ClassInfoHelper, fields, condition
 		dc.LastSyncVtime = slastsyncvtime
 		ds.syncstatus.Save(mqlfrom, dc)
 	}
-	logger.Info("sync data:", mqlfrom, "done", "vtime:", dc.FromVtime, "~", dc.LastSyncVtime, "records:", dc.RecordsCount, "lastvtime:", dc.LastDataVtime)
+	logger.Info("total sync data:", mqlfrom, "vtime:", dc.FromVtime, "~", dc.LastSyncVtime, "records:", dc.RecordsCount, "lastvtime:", dc.LastDataVtime)
 	return nil
 }
 
@@ -522,12 +530,19 @@ func (ds *DataSync) syncbucketdata(mqlfrom string, cifrom, cito *dbo.ClassInfoHe
 		dc.LastSyncVtime = slastsyncvtime
 	}
 	// 继续执行数据同步
-	count := 0
+	interval := mcfg.GetDuration("datasync.run.interval", "1m")
+	lastsyncvtime = lastsyncvtime.Add(-interval)
+	if lastdatavtime.After(lastsyncvtime) {
+		lastsyncvtime = lastdatavtime
+	}
+	lastsyncvtime = lastsyncvtime.Add(-mcfg.GetDuration("datasync.run.overtime", "30s"))
 	nextvtime := lastsyncvtime
-	for {
+	run := true
+	for run {
 		nextvtime = lastsyncvtime.Add(mcfg.GetDuration("datasync.bucket.pagetime", "1h"))
 		if time.Now().Before(nextvtime) {
-			break
+			nextvtime = time.Now()
+			run = false
 		}
 		snextvtime := nextvtime.Format("2006-01-02 15:04:05")
 		offset := 0
@@ -575,18 +590,13 @@ func (ds *DataSync) syncbucketdata(mqlfrom string, cifrom, cito *dbo.ClassInfoHe
 		lastsyncvtime = nextvtime
 		slastsyncvtime = lastsyncvtime.Format("2006-01-02 15:04:05")
 		recordscount += int64(offset)
-		count += offset
 		//
 		dc.RecordsCount = recordscount
 		dc.LastDataVtime = lastdatavtime.Format("2006-01-02 15:04:05.000000")
 		dc.LastSyncVtime = slastsyncvtime
 		ds.syncstatus.Save(dckey, dc)
 	}
-	logger.Debug("end sync", bucketType, "data", dc.BucketClass, dc.BucketField, "id:", dc.BucketObjID)
-
-	if count > 0 {
-		logger.Info("sync", bucketType, "data", dc.BucketClass, dc.BucketField, "id:", dc.BucketObjID, count, "records", "to time", dc.LastSyncVtime)
-	}
+	logger.Info("total sync", bucketType, "data", dc.BucketClass, dc.BucketField, "id:", dc.BucketObjID, recordscount, "records", "to time", dc.LastSyncVtime)
 	return nil
 }
 

+ 5 - 2
go.mod

@@ -1,15 +1,18 @@
 module git.wecise.com/wecise/odbtools
 
-go 1.22.0
+go 1.23
+
+toolchain go1.24.0
 
 require (
-	git.wecise.com/wecise/odb-go v0.0.0-20250328085256-ba8b2fb06fe1
+	git.wecise.com/wecise/odb-go v0.0.0-20250408102550-b3e4738bc87c
 	github.com/scylladb/go-set v1.0.3-0.20200225121959-cc7b2070d91e
 	github.com/spf13/cast v1.7.1
 	github.com/wecisecode/util v0.3.2
 )
 
 require (
+	git.wecise.com/wecise/util v0.0.0-20250407104014-5fbb6fc18909 // indirect
 	github.com/bluele/gcache v0.0.2 // indirect
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/coreos/go-semver v0.3.0 // indirect

+ 4 - 2
go.sum

@@ -1,5 +1,7 @@
-git.wecise.com/wecise/odb-go v0.0.0-20250328085256-ba8b2fb06fe1 h1:HiUKPb6BWR4EqAx5LZGOHiWdUzQfUj8tJvey+wpr0BA=
-git.wecise.com/wecise/odb-go v0.0.0-20250328085256-ba8b2fb06fe1/go.mod h1:7DJYChz1HA8QZ2lslxCa0507/G47FdzmjTqpdHnKxSo=
+git.wecise.com/wecise/odb-go v0.0.0-20250408102550-b3e4738bc87c h1:ZRcDFdZdzVYz5sXqx+SJK8iHHpGTctgywpFYn0o9ZlM=
+git.wecise.com/wecise/odb-go v0.0.0-20250408102550-b3e4738bc87c/go.mod h1:bK9z48Ybh5sxBURydT9xBKxH3uUbky8GAJ2AXrMRI/w=
+git.wecise.com/wecise/util v0.0.0-20250407104014-5fbb6fc18909 h1:nfZGKsHrwYo0iSw9tuYAgXZ5Ud88ZntafCZ17Yhig8A=
+git.wecise.com/wecise/util v0.0.0-20250407104014-5fbb6fc18909/go.mod h1:9Nisd3hYU7Mqe7vRggfbzCuF326mvvLnOw1AqK05AGY=
 github.com/bluele/gcache v0.0.2 h1:WcbfdXICg7G/DGBh1PFfcirkWOQV+v077yF1pSy3DGw=
 github.com/bluele/gcache v0.0.2/go.mod h1:m15KV+ECjptwSPxKhOhQoAFQVtUFjTVkc3H8o0t/fp0=
 github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=