libf 3 maanden geleden
bovenliggende
commit
3a52527acc
3 gewijzigde bestanden met toevoegingen van 117 en 0 verwijderingen
  1. 89 0
      main.go
  2. 22 0
      odbtools.conf
  3. 6 0
      omtool/omtool.go

+ 89 - 0
main.go

@@ -0,0 +1,89 @@
+package main
+
+import (
+	"fmt"
+	"os"
+	"time"
+
+	"git.wecise.com/wecise/odb-go/odbc"
+	"git.wecise.com/wecise/odbtools/omtool"
+	"github.com/wecisecode/util/cmap"
+)
+
+var mcfg, logger = odbc.SetDefaultAppName("odbtools")
+
+func main() {
+	odbtools := NewODBTools()
+	e := odbtools.Init()
+	if e != nil {
+		println(fmt.Sprint(e))
+		os.Exit(1)
+		return
+	}
+	e = odbtools.Run()
+	if e != nil {
+		println(fmt.Sprint(e))
+		os.Exit(1)
+		return
+	}
+	time.Sleep(1 * time.Second)
+}
+
+type ODBTools struct {
+	tools cmap.ConcurrentMap[omtool.OMTool, bool]
+}
+
+func NewODBTools() *ODBTools {
+	tools := cmap.NewSingle(map[omtool.OMTool]bool{
+		// datasync.NewDataSync(): false,
+	})
+	return &ODBTools{
+		tools: tools,
+	}
+}
+
+func (ots *ODBTools) Init() error {
+	for i := range ots.tools.IterBuffered() {
+		e := i.Key.Init()
+		if e != nil {
+			if !odbc.NoConfError.Contains(e) {
+				return e
+			}
+			ots.tools.Set(i.Key, false)
+		} else {
+			ots.tools.Set(i.Key, true)
+			logger.Info(fmt.Sprintf("%T inited", i.Key))
+		}
+	}
+	return nil
+}
+
+func (ots *ODBTools) Run() error {
+	dones := map[omtool.OMTool]<-chan error{}
+	for i := range ots.tools.IterBuffered() {
+		configured := i.Val
+		if configured {
+			done := i.Key.Run()
+			if done != nil {
+				dones[i.Key] = done
+			} else {
+				logger.Info(fmt.Sprintf("%T not run", i.Key))
+			}
+		}
+	}
+	for len(dones) > 0 {
+		for kt, done := range dones {
+			select {
+			case e := <-done:
+				logger.Info(fmt.Sprintf("%T done", kt))
+				if e != nil {
+					logger.Error(e)
+				}
+				delete(dones, kt)
+			default:
+			}
+		}
+		time.Sleep(1 * time.Second)
+	}
+	return nil
+}

+ 22 - 0
odbtools.conf

@@ -0,0 +1,22 @@
+
+[datasync]
+from.odbserver=127.0.0.1:11001
+from.keyspace=oktest
+
+to.odbserver=127.0.0.1:11001
+to.keyspace=oktest 
+
+# 可以配置多个 类名 或 查询语句,查询结果中必须包含 class id 字段
+# 查询语句中不要使用 limit 子句
+# 不配置将读取所有类数据
+from.data= /test/basic
+from.data= select * from /test/basic2 where vtime>'2025-03-10 16:53:43'
+
+# 指定建类语句等初始化mql语句,可以是mql文件或mql语句
+to.init.mql= 
+
+# overwrite / ignore / error
+exists=overwrite
+
+# 每秒写入数据量,同步上限
+limit.rate=50

+ 6 - 0
omtool/omtool.go

@@ -0,0 +1,6 @@
+package omtool
+
+type OMTool interface {
+	Init() (err error)
+	Run() (done <-chan error)
+}