libf 1 год назад
Родитель
Сommit
05c744adf7
4 измененных файлов с 53 добавлено и 22 удалено
  1. 1 1
      go.mod
  2. 6 0
      go.sum
  3. 24 1
      odbclient/client.go
  4. 22 20
      odbclient/client_test.go

+ 1 - 1
go.mod

@@ -35,7 +35,7 @@ require (
 
 require (
 	git.wecise.com/wecise/common v0.0.0-20230614163235-0dc2eb7c9aad
-	git.wecise.com/wecise/odb-go v0.0.0-20230626070254-ebbbd95408b4
+	git.wecise.com/wecise/odb-go v0.0.0-20230627132242-9e5f6810a6be
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
 	github.com/gomodule/redigo v1.8.5 // indirect

+ 6 - 0
go.sum

@@ -4,6 +4,12 @@ git.wecise.com/wecise/common v0.0.0-20230614163235-0dc2eb7c9aad h1:fdnnu3PO6v7VQ
 git.wecise.com/wecise/common v0.0.0-20230614163235-0dc2eb7c9aad/go.mod h1:VDERBLdm5+B3J/y4HpSLMobmFzRbL1f/2m1+pm68jWI=
 git.wecise.com/wecise/odb-go v0.0.0-20230626070254-ebbbd95408b4 h1:OROAvnnhaup259G0aravPV7MEWVCUugApKkM3VLMjwI=
 git.wecise.com/wecise/odb-go v0.0.0-20230626070254-ebbbd95408b4/go.mod h1:IwKuJPFQl4C3TK3Qyoo+qppM82f/5xJFwMHW2hKV4uA=
+git.wecise.com/wecise/odb-go v0.0.0-20230627124907-407af65fba96 h1:fpOvvZeKKQgOU7EyMYeJr9JS9f94cGcy1c8a96n8OcA=
+git.wecise.com/wecise/odb-go v0.0.0-20230627124907-407af65fba96/go.mod h1:IwKuJPFQl4C3TK3Qyoo+qppM82f/5xJFwMHW2hKV4uA=
+git.wecise.com/wecise/odb-go v0.0.0-20230627130330-91aa59d544cc h1:V4OUvQUs0bcwGKWKbilFCgMmx6uuTNokzdDc2KCiXD0=
+git.wecise.com/wecise/odb-go v0.0.0-20230627130330-91aa59d544cc/go.mod h1:IwKuJPFQl4C3TK3Qyoo+qppM82f/5xJFwMHW2hKV4uA=
+git.wecise.com/wecise/odb-go v0.0.0-20230627132242-9e5f6810a6be h1:oFRxKE5/V7qaWrnuLpa3HJ+ZiEF7JQLlVj9vYiBwrAo=
+git.wecise.com/wecise/odb-go v0.0.0-20230627132242-9e5f6810a6be/go.mod h1:IwKuJPFQl4C3TK3Qyoo+qppM82f/5xJFwMHW2hKV4uA=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
 github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=

+ 24 - 1
odbclient/client.go

@@ -1,7 +1,9 @@
 package odbclient
 
 import (
+	"context"
 	"fmt"
+	"runtime"
 	"strings"
 
 	ccfg "git.wecise.com/wecise/common/matrix/cfg"
@@ -13,12 +15,17 @@ import (
 var cfg = ccfg.MConfig()
 var log = clog.New().WithConfig(cfg, "log")
 
+func init() {
+	log.Info("cpu limit", 1)
+	runtime.GOMAXPROCS(1)
+}
+
 var odbcfg *odb.Config
 
 func init() {
 	odbpath := cfg.GetStrings("ODBPATH", "127.0.0.1:11001")
 	keyspace := cfg.GetString("KEYSPACE", "oktest")
-	poolsize := cfg.GetInt("POOLSIZE", 10)
+	poolsize := cfg.GetInt("POOLSIZE", 0)
 
 	odbpaths := strset.New()
 	for _, p := range odbpath {
@@ -31,6 +38,7 @@ func init() {
 		User:     fmt.Sprint("测试客户端", cfg.GetString("N")),
 		Pass:     "********",
 		PoolSize: poolsize,
+		Debug:    true,
 	}
 }
 
@@ -46,3 +54,18 @@ func NewClient() (c odb.Client, err error) {
 	}
 	return client, nil
 }
+
+func Subscribe(client odb.Client, topic string) {
+	sub := client.Subscribe(context.Background(), topic)
+	go func() {
+		defer func() {
+			if err := sub.Unsubscribe(context.Background()); err != nil {
+				log.Error(err)
+				return
+			}
+		}()
+		for m := range sub.Channel() {
+			log.Info(topic + " received " + m.Payload)
+		}
+	}()
+}

+ 22 - 20
odbclient/client_test.go

@@ -20,16 +20,16 @@ var log = clog.New().WithConfig(cfg, "log")
 func TestConcurrent(t *testing.T) {
 	var n, reqcount, okcount, confirmcount, errcount int32
 	odb.QueryDoAsync = true
-	fmt.Println("QueryDoAsync =", odb.QueryDoAsync)
-	fmt.Println("开始测试连接到", odbclient.Config().Hosts)
+	log.Info("QueryDoAsync =", odb.QueryDoAsync)
+	log.Info("开始测试连接到", odbclient.Config().Hosts)
 	client, err := odbclient.NewClient()
 	if err != nil {
-		println(fmt.Sprint("connection error:", err, atomic.AddInt32(&errcount, 1)))
+		log.Error(fmt.Sprint("connection error:", err, atomic.AddInt32(&errcount, 1)))
 		return
 	}
 	defer func() {
 		_ = client.Close()
-		fmt.Println("connection closed")
+		log.Info("connection closed")
 	}()
 
 	classname := "/oktest/alert_status"
@@ -38,7 +38,11 @@ func TestConcurrent(t *testing.T) {
 		classname += "_" + strconv.Itoa(N)
 	}
 
-	fmt.Println("开始创建相关类", classname)
+	log.Info("开启事件订阅")
+	odbclient.Subscribe(client, "CLASS_DATA_CHANGE:/")
+	odbclient.Subscribe(client, "META_DATA_CHANGE:/")
+
+	log.Info("开始创建相关类", classname)
 	cns := regexp.MustCompile("^/([^/]+)/([^/]+)").FindAllStringSubmatch(classname, -1)
 	rootclass := "/" + cns[0][1]
 	aliasname := cns[0][2] + "_" + cns[0][1]
@@ -166,25 +170,25 @@ func TestConcurrent(t *testing.T) {
 	)with ttl=366 day , autosearch=true , version=true , key=manu, alias='` + aliasname + `' , nickname='` + aliasname + `' ;
 	`).Do()
 	if err != nil {
-		println(fmt.Sprint("query count error:", err, atomic.AddInt32(&errcount, 1)))
+		log.Error(fmt.Sprint("query count error:", err, atomic.AddInt32(&errcount, 1)))
 	}
 
-	fmt.Println("检查历史数据")
+	log.Info("检查历史数据")
 
 	result, err := client.Query("select count(id) as count FROM " + classname + " LIMIT -1").Do()
 	if err != nil {
-		println(fmt.Sprint("query count error:", err, atomic.AddInt32(&errcount, 1)))
+		log.Error(fmt.Sprint("query count error:", err, atomic.AddInt32(&errcount, 1)))
 	} else {
-		fmt.Println(fmt.Sprintf("query count return: %#v", result.Data[0]["count"]))
+		log.Info(fmt.Sprintf("query count return: %#v", result.Data[0]["count"]))
 	}
 
-	fmt.Println("清除历史数据")
+	log.Info("清除历史数据")
 
 	_, err = client.Query("delete FROM " + classname + "").Do()
 	if err != nil {
-		println(fmt.Sprint("delete error:", err, atomic.AddInt32(&errcount, 1)))
+		log.Error(fmt.Sprint("delete error:", err, atomic.AddInt32(&errcount, 1)))
 	} else {
-		fmt.Println(fmt.Sprintf("delete ok"))
+		log.Info(fmt.Sprintf("delete ok"))
 	}
 	st := time.Now()
 	go func() {
@@ -192,11 +196,11 @@ func TestConcurrent(t *testing.T) {
 		for {
 			select {
 			case <-t.C:
-				fmt.Println(time.Since(st), "插入数据", "req:", atomic.AddInt32(&reqcount, 0), "ok:", atomic.AddInt32(&okcount, 0), "confirm:", atomic.AddInt32(&confirmcount, 0), "err:", atomic.AddInt32(&errcount, 0))
+				log.Info(time.Since(st), "插入数据", "req:", atomic.AddInt32(&reqcount, 0), "ok:", atomic.AddInt32(&okcount, 0), "confirm:", atomic.AddInt32(&confirmcount, 0), "err:", atomic.AddInt32(&errcount, 0))
 			}
 		}
 	}()
-	for i := 0; i < odbclient.Config().PoolSize*1000; i++ {
+	for i := 0; i < client.Config().PoolSize*1000; i++ {
 		go func(i int) {
 			for {
 				identifier := fmt.Sprint("222.129.134.178.1.3.6.1.4.1.2011.5.25.219.2.5.665.25.219.2.5.6.", i, ".", atomic.AddInt32(&n, 1))
@@ -205,19 +209,17 @@ func TestConcurrent(t *testing.T) {
 				atomic.AddInt32(&reqcount, 1)
 				result, err := client.Query(mql, params...).Do()
 				if err != nil {
-					println(fmt.Sprint("connection", i, err, atomic.AddInt32(&errcount, 1)))
+					log.Error(fmt.Sprint("connection", i, err, atomic.AddInt32(&errcount, 1)))
 					time.Sleep(1 * time.Second)
 				} else {
 					atomic.AddInt32(&okcount, 1)
 					result, err = client.Query("select id,tally FROM "+classname+" where identifier=?", identifier).Do()
 					if err != nil {
-						println(fmt.Sprint("connection", i, err, atomic.AddInt32(&errcount, 1)))
+						log.Error(fmt.Sprint("connection", i, err, atomic.AddInt32(&errcount, 1)))
 						time.Sleep(1 * time.Second)
 					} else {
 						atomic.AddInt32(&confirmcount, 1)
-						if "log" == "info" {
-							fmt.Println(fmt.Sprint("connection", i, result))
-						}
+						log.Debug(fmt.Sprint("connection", i, result))
 						// time.Sleep(1 * time.Second)
 					}
 				}
@@ -226,5 +228,5 @@ func TestConcurrent(t *testing.T) {
 		}(i)
 	}
 	time.Sleep(3600 * time.Second)
-	fmt.Println("测试结束")
+	log.Info("测试结束")
 }