package promdb_test import ( "bytes" "fmt" "math/rand" "sync/atomic" "testing" "time" "gitee.com/wecisecode/util/rc" "github.com/stretchr/testify/assert" ) // 可通过 build.sh 编译成执行文件 // 测试 Promdb 数据持续插入对 odbserver 的影响 func TestPromdbInsert(ti *testing.T) { t = ti defer func() { x := recover() if x != nil && x != "" { print(fmt.Sprint(x)) } }() InitODBC() PromdbInitClass() PromdbInitID() PromdbInsertData() } // 初始化类 func PromdbInitClass() { logger.Info("PromdbInitClass") _, err := ODBC.Query(` create class if not exists /test () ; create class if not exists /test/bucketpromdb ( host varchar, prom bucket { "type" : "promdb", "ttl" : 2, "dict" : true, "collect": 55, "version": 0, "slot" : 5 } 'full', keys(host), indexes(host) ) with key=manu, nickname='bucketpromdb' ; `).Do() if !assert.Nil(t, err, err) { panic("") } } // 初始化 ID func PromdbInitID() { logger.Info("PromdbClearData") buf := new(bytes.Buffer) buf.WriteString(` delete prom from /test/bucketpromdb ; delete from /test/bucketpromdb ; `) _, err := ODBC.Query(string(buf.Bytes())).Do() if !assert.Nil(t, err, err) { panic("") } logger.Info("PromdbInitID") for i := 0; i < 1000; i++ { buf = new(bytes.Buffer) buf.WriteString(fmt.Sprintf("insert into /test/bucketpromdb (id) values ('bucketpromdb:wecise%d');", i)) _, err = ODBC.Query(string(buf.Bytes())).Do() if !assert.Nil(t, err, err) { panic("") } } } // 插入数据 func PromdbInsertData() { logger.Info("PromdbInsertData") // 并发控制 cc := rc.NewRoutinesController("", 1000) var count int32 var errorcount int32 var waitConcurQueue = false // 持续循环直至出错退出 for errorcount == 0 { if cc.QueueCount() > 100000 || waitConcurQueue { // 等待并发执行队列减少后继续 time.Sleep(1 * time.Second) waitConcurQueue = cc.QueueCount() > 10000 } else { // 并发执行 cc.ConcurCall(1, func() { // 随机选一个ID idn := rand.Int() % 1000 id := fmt.Sprint("bucketpromdb:wecise", idn) // 生成随机数据 keyn := rand.Int() % 1000 // 生成随机数据 labeln := rand.Int() % 10000 key := fmt.Sprint("key", keyn) // 随机key value := fmt.Sprintf("%0.2f", rand.Float32()) // 随机设置一个value timestamp := time.Now().Format("2006-01-02 15:04:05") // 当前时间,精确到秒 label := fmt.Sprint("label", labeln) // 随机key对应的label data := `{"name": "` + key + `", "value": ` + value + `, "timestamp": "` + timestamp + `", "label": "` + label + `"}` // 插入数据 p := `["` + id + `", ` + data + `]` _, err := ODBC.Query(`insert into /test/bucketpromdb (id, prom) values (?, ? )`, Params(p)...).Do() if !assert.Nil(t, err, err) { atomic.AddInt32(&errorcount, 1) // 出错退出标记 return } n := atomic.AddInt32(&count, 1) // 取整输出计数 if n%1000 == 0 { logger.Info("PromdbInsertData ", n, "/", cc.QueueCount()) } }) } } }