testbucket.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. package main
  2. import (
  3. "encoding/json"
  4. "flag"
  5. "fmt"
  6. "os"
  7. "runtime/pprof"
  8. "strconv"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "git.wecise.com/wecise/common/mnats"
  13. "git.wecise.com/wecise/odbserver/odb"
  14. )
  15. var (
  16. isCreateObject = flag.Bool("o", false, "Create object data.")
  17. isSendParser = flag.Bool("p", false, "Send mql to cassandra service of parser.")
  18. ds = flag.Int("d", 100000, "Data size.")
  19. ws = flag.Int("w", 40, "Worker size.")
  20. firstErr string
  21. )
  22. func main() {
  23. flag.Parse()
  24. g, err := odb.New(&odb.Option{Cache: odb.CacheAll})
  25. if err != nil {
  26. fmt.Println("New odb error:", err)
  27. os.Exit(1)
  28. }
  29. dataSize := *ds
  30. workerSize := *ws
  31. if *isCreateObject {
  32. fmt.Println("Create bucket object.")
  33. if err = createObject(dataSize, g); err != nil {
  34. fmt.Println("Create object error:", err)
  35. os.Exit(2)
  36. }
  37. } else {
  38. fmt.Println("Ignore creating data.")
  39. }
  40. fmt.Println("Prepare data size:", dataSize)
  41. data := prepareMQL(dataSize)
  42. var wg sync.WaitGroup
  43. wg.Add(dataSize)
  44. ch := make(chan []interface{}, workerSize)
  45. statCh := make(chan bool, workerSize)
  46. stopCh := make(chan bool, 1)
  47. conn, err := mnats.Get()
  48. if err != nil {
  49. fmt.Println(err)
  50. os.Exit(5)
  51. }
  52. // stats
  53. go func() {
  54. var (
  55. total, success, fail int64
  56. speed float64
  57. )
  58. ticker := time.NewTicker(time.Second)
  59. start := time.Now()
  60. L:
  61. for {
  62. select {
  63. case <-stopCh:
  64. break L
  65. case ok := <-statCh:
  66. total++
  67. if ok {
  68. success++
  69. } else {
  70. fail++
  71. }
  72. case <-ticker.C:
  73. speed = float64(total) / time.Now().Sub(start).Seconds()
  74. fmt.Printf("\rtotal:%d, success:%d, fail:%d, spped:%.2f/s", total, success, fail, speed)
  75. }
  76. }
  77. }()
  78. // cpu pprof
  79. pf, err := os.Create("testbucket_cpu.pprof")
  80. if err != nil {
  81. fmt.Println(err)
  82. os.Exit(3)
  83. }
  84. defer pf.Close()
  85. if err = pprof.StartCPUProfile(pf); err != nil {
  86. fmt.Println(err)
  87. os.Exit(3)
  88. }
  89. defer pprof.StopCPUProfile()
  90. // mem pprof
  91. go func() {
  92. ticker := time.NewTicker(time.Second * 5)
  93. L:
  94. for {
  95. select {
  96. case <-stopCh:
  97. break L
  98. case <-ticker.C:
  99. func() {
  100. pmf, err := os.OpenFile("testbucket_mem.pprof", os.O_CREATE|os.O_TRUNC|os.O_RDWR, os.ModePerm)
  101. if err != nil {
  102. fmt.Println(err)
  103. os.Exit(4)
  104. }
  105. defer pmf.Close()
  106. if err = pprof.WriteHeapProfile(pmf); err != nil {
  107. fmt.Println(err)
  108. os.Exit(4)
  109. }
  110. }()
  111. }
  112. }
  113. }()
  114. // mql worker
  115. for i := 0; i < workerSize; i++ {
  116. go func() {
  117. ps, err := g.Prepare(`insert into /test/bucket2 (id, values) values (?, ?) at ?`)
  118. if err != nil {
  119. fmt.Println(err)
  120. os.Exit(5)
  121. }
  122. m := map[string]interface{}{
  123. "mql": `insert into /test/bucket2 (id, values) values (?, ?) at ?`,
  124. }
  125. for d := range ch {
  126. if *isSendParser {
  127. m["values"] = d
  128. b, _ := json.Marshal(m)
  129. if err = conn.Publish("queue.cassandra.query.matrix", b); err != nil {
  130. statCh <- false
  131. } else {
  132. statCh <- true
  133. }
  134. } else {
  135. if _, _, err = ps.Exec(d...); err != nil {
  136. statCh <- false
  137. } else {
  138. statCh <- true
  139. }
  140. }
  141. if err != nil && firstErr == "" {
  142. firstErr = err.Error()
  143. }
  144. wg.Done()
  145. }
  146. }()
  147. }
  148. // mql sender
  149. for i := range data {
  150. ch <- data[i]
  151. }
  152. wg.Wait()
  153. time.Sleep(time.Second)
  154. close(stopCh)
  155. fmt.Println()
  156. if firstErr != "" {
  157. fmt.Println("Error info:", firstErr)
  158. }
  159. }
  160. func createObject(num int, g *odb.Gutil) error {
  161. ps, err := g.Prepare("insert into /test/bucket2 (id, devicename, deviceoid, ip, perfstructure, desc) values (?, ?, ?, ?, ?, ?)")
  162. if err != nil {
  163. return err
  164. }
  165. defer fmt.Println()
  166. var wg sync.WaitGroup
  167. ch := make(chan []interface{}, 20)
  168. var count int64
  169. go func() {
  170. for count <= int64(num) {
  171. fmt.Printf("\r%d/%d", count, num)
  172. time.Sleep(time.Millisecond * 500)
  173. }
  174. }()
  175. for i := 0; i < 20; i++ {
  176. go func() {
  177. for v := range ch {
  178. if _, _, err = ps.Exec(v...); err != nil {
  179. if firstErr == "" {
  180. firstErr = err.Error()
  181. }
  182. } else {
  183. atomic.AddInt64(&count, 1)
  184. }
  185. wg.Done()
  186. }
  187. }()
  188. }
  189. wg.Add(num)
  190. for i := 0; i < num; i++ {
  191. no := strconv.FormatInt(int64(i+1), 10)
  192. ch <- []interface{}{"ID" + no, "DN" + no, "OID" + no, "10.0.0.1", "PERFSTRUCTURE" + no, "DESC" + no}
  193. }
  194. wg.Wait()
  195. close(ch)
  196. time.Sleep(time.Second)
  197. return nil
  198. }
  199. func prepareMQL(num int) [][]interface{} {
  200. t := time.Now().Format("2006-01-02 15:04:05")
  201. data := make([][]interface{}, num)
  202. paramsSize := 20
  203. params := make([]interface{}, paramsSize)
  204. for i := 0; i < paramsSize; i++ {
  205. params[i] = i + 1
  206. }
  207. for i := 0; i < num; i++ {
  208. no := strconv.FormatInt(int64(i+1), 10)
  209. d := []interface{}{"ID" + no, params, t}
  210. data[i] = d
  211. }
  212. return data
  213. }
  214. /*
  215. create class if not exists /test/bucket1 (
  216. host varchar,
  217. real map<varchar,varchar>,
  218. bk1 bucket {
  219. "type" : "tsdb",
  220. "size" : 10
  221. } 'simple',
  222. bk2 bucket {
  223. "type" : "tsdb",
  224. "param" : ["iparam1","iparam2","fparam1","fparam2","sparam1"],
  225. "ptype" : ["i","i","f","f","s"],
  226. "unit" : ["%","m","ms","ns",""],
  227. "precision":[0,0,3,4,0],
  228. "ttl" : 365
  229. } 'full',
  230. keys(host),
  231. indexes(host)
  232. );
  233. create class if not exists /test/bucket2 (
  234. devicename varchar '设备名称',
  235. deviceoid varchar 'OID',
  236. ip varchar 'ip',
  237. perfstructure varchar '性能结构描述',
  238. desc varchar '设备描述',
  239. values bucket {
  240. "type": "tsdb",
  241. "params": [
  242. ["${1-5}","int","","${desc[0-4]}"],
  243. ["${6-9,11-12,15-16,19-20}","int","MB","${desc[5-8,10-11,14-15,18-19]}"],
  244. ["10","int","%","${desc[9]}"],
  245. ["${13-14,17-18}","int","KB/s","${desc[12-13,16-17]}"]
  246. ],
  247. "desc": ["在线点","及早期火警","及早期预警","DO1","DO2","系统分区总容量","系统分区剩余量","数据分区总容量","数据分区剩余量","CUP使用率","内存总容量","内存剩余量","网卡1上行速度","网卡1下行速度","网卡1上行总量","网卡1下行总量","网卡2上行速度","网卡2下行速度","网卡2上行总量","网卡2下行总量"],
  248. "ttl": 365
  249. } 'full',
  250. keys(devicename,deviceoid,ip),
  251. indexes(devicename,deviceoid,ip,perfstructure,desc)
  252. );
  253. */