libf 1 ano atrás
pai
commit
3774703db2
6 arquivos alterados com 42 adições e 34 exclusões
  1. 1 1
      go.mod
  2. 2 0
      go.sum
  3. 5 6
      odbclient/client_test.go
  4. 0 0
      odbc/make.sh
  5. 1 27
      odbclient/client.go
  6. 33 0
      odbc/subscribe.go

+ 1 - 1
go.mod

@@ -35,7 +35,7 @@ require (
 
 require (
 	git.wecise.com/wecise/common v0.0.0-20230614163235-0dc2eb7c9aad
-	git.wecise.com/wecise/odb-go v0.0.0-20230628074400-771e6bcd95fd
+	git.wecise.com/wecise/odb-go v0.0.0-20230628123346-dccc08180677
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
 	github.com/gomodule/redigo v1.8.5 // indirect

+ 2 - 0
go.sum

@@ -38,6 +38,8 @@ git.wecise.com/wecise/odb-go v0.0.0-20230628070448-ce59fdc458c1 h1:XVJ7TApbUpVCA
 git.wecise.com/wecise/odb-go v0.0.0-20230628070448-ce59fdc458c1/go.mod h1:Ug7qgUkXxI4SoVqR4ASMCvzLWzqHVQDFk5vUE023Eyg=
 git.wecise.com/wecise/odb-go v0.0.0-20230628074400-771e6bcd95fd h1:yJSzDkDUOhKUuCIRxpsim4X+MfSfwAo2RyM9LMzW6z0=
 git.wecise.com/wecise/odb-go v0.0.0-20230628074400-771e6bcd95fd/go.mod h1:Ug7qgUkXxI4SoVqR4ASMCvzLWzqHVQDFk5vUE023Eyg=
+git.wecise.com/wecise/odb-go v0.0.0-20230628123346-dccc08180677 h1:5jXJ5Hv+hpwl+cxuxF33F1YNFFWp496+BfdQG8rNVxQ=
+git.wecise.com/wecise/odb-go v0.0.0-20230628123346-dccc08180677/go.mod h1:Ug7qgUkXxI4SoVqR4ASMCvzLWzqHVQDFk5vUE023Eyg=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
 github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=

+ 5 - 6
odbclient/client_test.go

@@ -1,4 +1,4 @@
-package odbclient_test
+package odbc_test
 
 import (
 	"fmt"
@@ -11,7 +11,7 @@ import (
 	ccfg "git.wecise.com/wecise/common/matrix/cfg"
 	clog "git.wecise.com/wecise/common/matrix/logger"
 	"git.wecise.com/wecise/odb-go/odb"
-	"git.wecise.com/wecise/odbtest/odbclient"
+	"git.wecise.com/wecise/odbtest/odbc"
 )
 
 var cfg = ccfg.MConfig()
@@ -21,8 +21,8 @@ func TestConcurrent(t *testing.T) {
 	var n, reqcount, okcount, confirmcount, errcount int32
 	odb.QueryDoAsync = true
 	log.Info("QueryDoAsync =", odb.QueryDoAsync)
-	log.Info("开始测试连接到", odbclient.Config().Hosts)
-	client, err := odbclient.NewClient()
+	log.Info("开始测试连接到", odbc.Config().Hosts)
+	client, err := odbc.NewClient()
 	if err != nil {
 		log.Error(fmt.Sprint("connection error:", err, atomic.AddInt32(&errcount, 1)))
 		return
@@ -39,8 +39,7 @@ func TestConcurrent(t *testing.T) {
 	}
 
 	log.Info("开启事件订阅")
-	odbclient.Subscribe(client, "CLASS_DATA_CHANGE:/")
-	odbclient.Subscribe(client, "META_DATA_CHANGE:/")
+	odbc.SubscribeTest(client)
 
 	log.Info("开始创建相关类", classname)
 	cns := regexp.MustCompile("^/([^/]+)/([^/]+)").FindAllStringSubmatch(classname, -1)

odbclient/make.sh → odbc/make.sh


+ 1 - 27
odbclient/client.go

@@ -1,8 +1,6 @@
-package odbclient
+package odbc
 
 import (
-	"context"
-	"encoding/json"
 	"fmt"
 	"runtime"
 	"strings"
@@ -10,9 +8,7 @@ import (
 	ccfg "git.wecise.com/wecise/common/matrix/cfg"
 	clog "git.wecise.com/wecise/common/matrix/logger"
 	"git.wecise.com/wecise/odb-go/odb"
-	"git.wecise.com/wecise/odb-go/odb/eventmsg"
 	"github.com/scylladb/go-set/strset"
-	"gopkg.in/yaml.v3"
 )
 
 var cfg = ccfg.MConfig()
@@ -57,25 +53,3 @@ func NewClient() (c odb.Client, err error) {
 	}
 	return client, nil
 }
-
-func Subscribe(client odb.Client, topic string) {
-	sub := client.Subscribe(context.Background(), topic)
-	go func() {
-		defer func() {
-			if err := sub.Unsubscribe(context.Background()); err != nil {
-				log.Error(err)
-				return
-			}
-		}()
-		for m := range sub.Channel() {
-			oem, err := eventmsg.FromMsgpack([]byte(m.Payload))
-			if err != nil {
-				log.Error(err)
-			}
-			mm := map[string]interface{}{}
-			json.Unmarshal([]byte(oem.JsonString()), &mm)
-			bs, _ := yaml.Marshal(map[string]interface{}{"oem": mm})
-			log.Info(topic + " received " + m.Payload + "\n" + string(bs))
-		}
-	}()
-}

+ 33 - 0
odbc/subscribe.go

@@ -0,0 +1,33 @@
+package odbc
+
+import (
+	"context"
+
+	"git.wecise.com/wecise/odb-go/odb"
+	"git.wecise.com/wecise/odb-go/odb/eventmsg"
+)
+
+func SubscribeTest(client odb.Client) {
+	Subscribe(client, "CLASS_DATA_CHANGE:/")
+	Subscribe(client, "META_DATA_CHANGE:/")
+}
+
+func Subscribe(client odb.Client, topic string) {
+	sub := client.Subscribe(context.Background(), topic)
+	go func() {
+		defer func() {
+			if err := sub.Unsubscribe(context.Background()); err != nil {
+				log.Error(err)
+				return
+			}
+		}()
+		for m := range sub.Channel() {
+			oem, err := eventmsg.FromMsgpack([]byte(m.Payload))
+			if err != nil {
+				log.Error(err)
+			}
+			string_message := oem.Data.String()
+			log.Info(topic + " received " + topic + " message " + "\n" + string_message)
+		}
+	}()
+}