2 Commits e0c323d105 ... e0b1f0441f

Auteur SHA1 Bericht Datum
  wecisecode e0b1f0441f syslog 1 maand geleden
  wecisecode ffaab3d7e5 datasync 1 maand geleden
4 gewijzigde bestanden met toevoegingen van 176 en 55 verwijderingen
  1. 15 32
      .gitignore
  2. 35 19
      datasync/datasync.conf
  3. 9 4
      datasync/datasync/datasync.go
  4. 117 0
      syslog/syslog.go

+ 15 - 32
.gitignore

@@ -1,40 +1,23 @@
-# Compiled Object files, Static and Dynamic libs (Shared Objects)
-*.o
-*.a
-*.so
+*
+!*.*
+!*/
+.*
+.*/
+*.code-workspace
 
 # Folders
+~*
 _obj
 _test
-.idea
-.vscode
-.github
-data
-
-# Architecture specific extensions/prefixes
-*.[568vq]
-[568vq].out
-
-*.cgo1.go
-*.cgo2.c
-_cgo_defun.c
-_cgo_gotypes.go
-_cgo_export.*
-
-_testmain.go
-
-test/*
-!test/*.*
-!test/*/
-
-test/*/*
-!test/*/*.*
-!test/*/*/
+data/
+bak/
 
+*.log
+*.bak
 *.exe
-*.test
-*.prof
-*.iml
 *.class
 *.tmp
-*.zip
+*.a
+*.o
+*.so
+*.zip

+ 35 - 19
datasync/datasync.conf

@@ -1,4 +1,9 @@
 
+[log]
+# 日志输出级别,info / debug,默认 info
+level=debug
+
+
 [datasync]
 from.odbserver=127.0.0.1:11001
 from.keyspace=matrix
@@ -9,9 +14,9 @@ to.keyspace=oktest
 # 可以配置多个 类名 或 查询语句,查询结果中必须包含 class id 字段
 # 查询语句中可以指定 where 条件,不支持聚合、排序、limit 等子句
 # 不配置将读取所有类数据
-from.data=/test/
+# from.data=/test/
 # from.data=/test/bucketpromdb
-from.data=select * from /test/alert_status where vtime>'2022-03-10 16:53:43'
+# from.data=select * from /test/alert_status where vtime>'2022-03-10 16:53:43'
 from.data=/matrix/ldap
 from.data=/matrix/group
 from.data=/matrix/perms/
@@ -32,39 +37,50 @@ deny.class=/testdc2/
 deny.class=/testruledata
 deny.class=/aywl/
 
+# 类映射
+# mapping.class.fromclass=toclass
+
+# 指定建类语句等初始化mql语句,可以是mql文件或mql语句,默认根据原始元数据信息自动建类
+# 尚未实现定制化
+# to.init.mql= 
+
+# 目标数据存在时的选项 overwrite / ignore / error,默认 overwrite
+# 尚未实现定制化
+# exists=overwrite
 
 # 只同步指定的最近一段时间的对象数据,默认 365d 
 data.time.since=3000d
 # 只同步指定的最近一段时间的Bucket数据,默认 30d 
 bucket.time.since=30d
 
-# 指定建类语句等初始化mql语句,可以是mql文件或mql语句,默认根据原始元数据信息自动建类
-#to.init.mql= 
-
-# 类映射
-# mapping.class.fromclass=toclass
-
-# overwrite / ignore / error
-# exists=overwrite
-
-# 每秒写入数据量,同步上限
+# 每秒写入数据量,同步上限,默认 0 无限制
+# 尚未实现定制化
 # limit.rate=50
 
-# 对象数据同步并发数,默认cpu数
-concur.threads=2
+# 对象数据同步控制并发数,对应from.data配置,默认cpu数
+# ctrl.threads=2
+# 对象数据同步并发数,对应源数据查询语句,默认cpu数
+# concur.threads=2
 # Bucket数据同步并发数,默认cpu数
-bucket.threads=2
+# bucket.threads=2
 # 每次读取数据的最大数量,默认50,立即生效
-pagesize=10
+pagesize=50
 # 每次读取数据的时间分段,默认一天,立即生效
+# 每次读取一个完整时间分段的数据,因此数据同步存在一个完整时间分段的延迟
 pagetime=1d
 # 每次读取bucket数据的时间分段,默认1h,立即生效
+# 每次读取一个完整时间分段的数据,因此数据同步存在一个完整时间分段的延迟
+# 设为 0 不同步bucket数据
 bucket.pagetime=1d
-# 用于存放同步进度记录
+# 用于存放同步进度记录,新目录会自动创建
 data.dir=data
 # 持续运行间隔,数据同步到当前时间后,间隔指定时间轮询执行
-# 默认 0 运行一次,数据同步到当前时间后退出
-# 立即生效
+# 设为 0 只运行一次,数据同步到当前时间后退出
+# 默认 1m,立即生效
 run.interval=5s
 # 每次轮询读取数据的重叠时间,默认30s,立即生效
+# 从最后同步完成时间回退一段时间,防止时钟不同步等原因导致运行间隔期间新产生的数据丢失
 run.overtime=30s
+
+# 运行参数
+# reload 重新开始同步

+ 9 - 4
datasync/datasync/datasync.go

@@ -13,11 +13,11 @@ import (
 	"git.wecise.com/wecise/odb-go/dbo"
 	"git.wecise.com/wecise/odb-go/odb"
 	"git.wecise.com/wecise/odb-go/odbc"
-	"github.com/scylladb/go-set/strset"
-	"github.com/spf13/cast"
 	"gitee.com/wecisecode/util/merrs"
 	"gitee.com/wecisecode/util/mfmt"
 	"gitee.com/wecisecode/util/rc"
+	"github.com/scylladb/go-set/strset"
+	"github.com/spf13/cast"
 )
 
 var mcfg = odbc.Config
@@ -108,7 +108,11 @@ func (ds *DataSync) Run() (done <-chan error) {
 	key := regexp.MustCompile(`\W`).ReplaceAllString(strings.Split(ds.fromodbserver, ",")[0]+"_"+ds.fromkeyspace+"_"+strings.Split(ds.toodbserver, ",")[0]+"_"+ds.tokeyspace, "_")
 	ds.syncstatus = NewSyncStatus(key)
 	if !mcfg.GetBool("reload") {
-		ds.syncstatus.Load()
+		e := ds.syncstatus.Load()
+		if e != nil {
+			ret <- e
+			return ret
+		}
 	}
 	go ds.run(ret)
 	return ret
@@ -136,6 +140,7 @@ func (ds *DataSync) run(ret chan error) {
 	defer cancel()
 	ds.ctx = ctx
 	ds.cancel = cancel
+	logger.Info("resume sync data, from", len(fromdatas), "configure")
 	for {
 		ds.wg = &sync.WaitGroup{}
 		ds.syncstatus.Resume()
@@ -189,7 +194,7 @@ func (ds *DataSync) startsyncproc(wg *sync.WaitGroup, rc *rc.RoutinesController,
 // 同步一块数据
 // mqlfrom 可以是类名 或 查询语句
 func (ds *DataSync) syncdata(mqlfrom string) error {
-	// 一格式化为查询语句
+	// 一格式化为查询语句
 	mqlfrom = FormatMQL(mqlfrom)
 	// 已完成同步进度
 	fromclass, fields, condition, e := ds.LastSyncProgress(mqlfrom)

+ 117 - 0
syslog/syslog.go

@@ -0,0 +1,117 @@
+//go:build !windows && !plan9
+
+package main
+
+import (
+	"bufio"
+	"flag"
+	"fmt"
+	"log"
+	"log/syslog"
+	"os"
+	"strings"
+	"time"
+)
+
+func main() {
+	var ip = flag.String("ip", "localhost:514", "ip:port")
+	var cycle = flag.Int("cycle", 1000, "repeat number")
+	var cycleSpan = flag.Int("sleep", 1, "sleep millisecond after one cycle")
+	var span = flag.Int("span", 1, "sleep 1 millisecond every span row")
+	var protocol = flag.String("protocol", "udp", "udp/tcp")
+	var incTime = flag.Int("inctime", 1, "Increase seconds")
+	var file = flag.String("file", "", "filenamesave for save")
+
+	flag.Parse()
+
+	var writer *bufio.Writer
+
+	if *file != "" {
+		// 创建输出文件
+		outputFile, err := os.Create(*file)
+		if err != nil {
+			log.Fatal("无法创建输出文件:", err)
+		}
+		defer outputFile.Close()
+
+		writer = bufio.NewWriter(outputFile)
+		defer writer.Flush() // 确保所有缓冲数据写入磁盘
+	}
+
+	filePath := os.Args[len(os.Args)-1]
+	readFile, err := os.Open(filePath)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	fileScanner := bufio.NewScanner(readFile)
+	fileScanner.Split(bufio.ScanLines)
+	var fileLines []string
+	for fileScanner.Scan() {
+		fileLines = append(fileLines, fileScanner.Text())
+	}
+	_ = readFile.Close()
+
+	/*for _, line := range fileLines {
+	      fmt.Println(line)
+	  }
+
+	  fmt.Println(fileLines)
+	*/
+
+	sysLog, err := syslog.Dial(*protocol, *ip, syslog.LOG_WARNING|syslog.LOG_DAEMON, "demo")
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// 11-Jul-2023 16:00:01 DNS systemd: Started Session 1370 of user root.
+	format := "02-Jan-2006 15:04:05"
+	total := 0
+	loop := 0
+	for i := 0; i < *cycle; i++ {
+		incDuration := time.Second * time.Duration(i*(*incTime))
+		for k, line := range fileLines {
+			if *incTime != 0 && line != "" {
+				fields := strings.Fields(line)
+				if len(fields) > 2 {
+					timeStr := fields[0] + " " + fields[1]
+					line = strings.Join(fields[2:], " ")
+					t, err := time.Parse(format, timeStr)
+					if err != nil {
+						log.Fatal(err)
+					}
+					t = t.Add(incDuration)
+					line = t.Format(format) + " " + line
+				}
+			}
+
+			if *file == "" {
+				if err = sysLog.Emerg(line); err != nil {
+					log.Fatal(err)
+				}
+			} else {
+				// 将行内容写入输出文件(包括换行符)
+				_, err := writer.WriteString(line + "\n")
+				if err != nil {
+					log.Fatal("写入文件失败:", err)
+				}
+
+				// 每处理100行刷新一次缓冲区(可选)
+				if total%10 == 0 {
+					writer.Flush()
+				}
+
+			}
+
+			if *span > 0 && (k%*span == 0) {
+				time.Sleep(time.Duration(*span) * time.Millisecond)
+			}
+			total++
+		}
+		loop++
+		fmt.Printf("send %d , total %d row .\n", loop, total)
+		if *cycleSpan > 0 {
+			time.Sleep(time.Duration(*cycleSpan) * time.Millisecond)
+		}
+	}
+}