promdb_test.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package promdb_test
  2. import (
  3. "bytes"
  4. "fmt"
  5. "math/rand"
  6. "sync/atomic"
  7. "testing"
  8. "time"
  9. "gitee.com/wecisecode/util/rc"
  10. "github.com/stretchr/testify/assert"
  11. )
  12. // 可通过 build.sh 编译成执行文件
  13. // 测试 Promdb 数据持续插入对 odbserver 的影响
  14. func TestPromdbInsert(ti *testing.T) {
  15. t = ti
  16. defer func() {
  17. x := recover()
  18. if x != nil && x != "" {
  19. print(fmt.Sprint(x))
  20. }
  21. }()
  22. InitODBC()
  23. PromdbInitClass()
  24. PromdbInitID()
  25. PromdbInsertData()
  26. }
  27. // 初始化类
  28. func PromdbInitClass() {
  29. logger.Info("PromdbInitClass")
  30. _, err := ODBC.Query(`
  31. create class if not exists /test ()
  32. ;
  33. create class if not exists /test/bucketpromdb (
  34. host varchar,
  35. prom bucket {
  36. "type" : "promdb",
  37. "ttl" : 2,
  38. "dict" : true,
  39. "collect": 55,
  40. "version": 0,
  41. "slot" : 5
  42. } 'full',
  43. keys(host),
  44. indexes(host)
  45. ) with key=manu, nickname='bucketpromdb'
  46. ;
  47. `).Do()
  48. if !assert.Nil(t, err, err) {
  49. panic("")
  50. }
  51. }
  52. // 初始化 ID
  53. func PromdbInitID() {
  54. logger.Info("PromdbClearData")
  55. buf := new(bytes.Buffer)
  56. buf.WriteString(` delete prom from /test/bucketpromdb ;
  57. delete from /test/bucketpromdb ; `)
  58. _, err := ODBC.Query(string(buf.Bytes())).Do()
  59. if !assert.Nil(t, err, err) {
  60. panic("")
  61. }
  62. logger.Info("PromdbInitID")
  63. for i := 0; i < 1000; i++ {
  64. buf = new(bytes.Buffer)
  65. buf.WriteString(fmt.Sprintf("insert into /test/bucketpromdb (id) values ('bucketpromdb:wecise%d');", i))
  66. _, err = ODBC.Query(string(buf.Bytes())).Do()
  67. if !assert.Nil(t, err, err) {
  68. panic("")
  69. }
  70. }
  71. }
  72. // 插入数据
  73. func PromdbInsertData() {
  74. logger.Info("PromdbInsertData")
  75. // 并发控制
  76. cc := rc.NewRoutinesController("", 1000)
  77. var count int32
  78. var errorcount int32
  79. var waitConcurQueue = false
  80. // 持续循环直至出错退出
  81. for errorcount == 0 {
  82. if cc.QueueCount() > 100000 || waitConcurQueue {
  83. // 等待并发执行队列减少后继续
  84. time.Sleep(1 * time.Second)
  85. waitConcurQueue = cc.QueueCount() > 10000
  86. } else {
  87. // 并发执行
  88. cc.ConcurCall(1, func() {
  89. // 随机选一个ID
  90. idn := rand.Int() % 1000
  91. id := fmt.Sprint("bucketpromdb:wecise", idn)
  92. // 生成随机数据
  93. keyn := rand.Int() % 1000
  94. // 生成随机数据
  95. labeln := rand.Int() % 10000
  96. key := fmt.Sprint("key", keyn) // 随机key
  97. value := fmt.Sprintf("%0.2f", rand.Float32()) // 随机设置一个value
  98. timestamp := time.Now().Format("2006-01-02 15:04:05") // 当前时间,精确到秒
  99. label := fmt.Sprint("label", labeln) // 随机key对应的label
  100. data := `{"name": "` + key + `", "value": ` + value + `, "timestamp": "` + timestamp + `", "label": "` + label + `"}`
  101. // 插入数据
  102. p := `["` + id + `", ` + data + `]`
  103. _, err := ODBC.Query(`insert into /test/bucketpromdb (id, prom) values (?, ? )`, Params(p)...).Do()
  104. if !assert.Nil(t, err, err) {
  105. atomic.AddInt32(&errorcount, 1) // 出错退出标记
  106. return
  107. }
  108. n := atomic.AddInt32(&count, 1) // 取整输出计数
  109. if n%1000 == 0 {
  110. logger.Info("PromdbInsertData ", n, "/", cc.QueueCount())
  111. }
  112. })
  113. }
  114. }
  115. }