| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- package promdb_test
- import (
- "bytes"
- "fmt"
- "math/rand"
- "sync/atomic"
- "testing"
- "time"
- "gitee.com/wecisecode/util/rc"
- "github.com/stretchr/testify/assert"
- )
- // 可通过 build.sh 编译成执行文件
- // 测试 Promdb 数据持续插入对 odbserver 的影响
- func TestPromdbInsert(ti *testing.T) {
- t = ti
- defer func() {
- x := recover()
- if x != nil && x != "" {
- print(fmt.Sprint(x))
- }
- }()
- InitODBC()
- PromdbInitClass()
- PromdbInitID()
- PromdbInsertData()
- }
- // 初始化类
- func PromdbInitClass() {
- logger.Info("PromdbInitClass")
- _, err := ODBC.Query(`
- create class if not exists /test ()
- ;
-
- create class if not exists /test/bucketpromdb (
-
- host varchar,
-
- prom bucket {
- "type" : "promdb",
- "ttl" : 2,
- "dict" : true,
- "collect": 55,
- "version": 0,
- "slot" : 5
- } 'full',
- keys(host),
- indexes(host)
-
- ) with key=manu, nickname='bucketpromdb'
- ;
- `).Do()
- if !assert.Nil(t, err, err) {
- panic("")
- }
- }
- // 初始化 ID
- func PromdbInitID() {
- logger.Info("PromdbClearData")
- buf := new(bytes.Buffer)
- buf.WriteString(` delete prom from /test/bucketpromdb ;
- delete from /test/bucketpromdb ; `)
- _, err := ODBC.Query(string(buf.Bytes())).Do()
- if !assert.Nil(t, err, err) {
- panic("")
- }
- logger.Info("PromdbInitID")
- for i := 0; i < 1000; i++ {
- buf = new(bytes.Buffer)
- buf.WriteString(fmt.Sprintf("insert into /test/bucketpromdb (id) values ('bucketpromdb:wecise%d');", i))
- _, err = ODBC.Query(string(buf.Bytes())).Do()
- if !assert.Nil(t, err, err) {
- panic("")
- }
- }
- }
- // 插入数据
- func PromdbInsertData() {
- logger.Info("PromdbInsertData")
- // 并发控制
- cc := rc.NewRoutinesController("", 1000)
- var count int32
- var errorcount int32
- var waitConcurQueue = false
- // 持续循环直至出错退出
- for errorcount == 0 {
- if cc.QueueCount() > 100000 || waitConcurQueue {
- // 等待并发执行队列减少后继续
- time.Sleep(1 * time.Second)
- waitConcurQueue = cc.QueueCount() > 10000
- } else {
- // 并发执行
- cc.ConcurCall(1, func() {
- // 随机选一个ID
- idn := rand.Int() % 1000
- id := fmt.Sprint("bucketpromdb:wecise", idn)
- // 生成随机数据
- keyn := rand.Int() % 1000
- // 生成随机数据
- labeln := rand.Int() % 10000
- key := fmt.Sprint("key", keyn) // 随机key
- value := fmt.Sprintf("%0.2f", rand.Float32()) // 随机设置一个value
- timestamp := time.Now().Format("2006-01-02 15:04:05") // 当前时间,精确到秒
- label := fmt.Sprint("label", labeln) // 随机key对应的label
- data := `{"name": "` + key + `", "value": ` + value + `, "timestamp": "` + timestamp + `", "label": "` + label + `"}`
- // 插入数据
- p := `["` + id + `", ` + data + `]`
- _, err := ODBC.Query(`insert into /test/bucketpromdb (id, prom) values (?, ? )`, Params(p)...).Do()
- if !assert.Nil(t, err, err) {
- atomic.AddInt32(&errorcount, 1) // 出错退出标记
- return
- }
- n := atomic.AddInt32(&count, 1) // 取整输出计数
- if n%1000 == 0 {
- logger.Info("PromdbInsertData ", n, "/", cc.QueueCount())
- }
- })
- }
- }
- }
|