testbucketlog.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. package main
  2. import (
  3. "bufio"
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "os"
  8. "runtime/pprof"
  9. "strconv"
  10. "sync"
  11. "time"
  12. "git.wecise.com/wecise/common/mnats"
  13. "git.wecise.com/wecise/odbserver/odb"
  14. "gitee.com/wecisecode/util/logger"
  15. )
  16. const (
  17. logFileName = "testbucketlog.log"
  18. )
  19. var (
  20. isCreateLogObject = flag.Bool("o", false, "Create log object data.")
  21. isSendLogParser = flag.Bool("p", false, "Send mql to cassandra service of parser.")
  22. isInsert = flag.Bool("i", false, "Insert mql to odb.")
  23. dsize = flag.Int("d", 5000, "Data size.")
  24. bsize = flag.Int("b", 200, "Bucket size.")
  25. wsize = flag.Int("w", 40, "Worker size.")
  26. isCreateLog = flag.Bool("c", false, "Create test log file.")
  27. )
  28. func main() {
  29. logger.SetConsole(false)
  30. flag.Parse()
  31. g, err := odb.New(&odb.Option{Cache: odb.CacheAll})
  32. if err != nil {
  33. fmt.Println("New odb error:", err)
  34. os.Exit(1)
  35. }
  36. if *isCreateLog {
  37. fmt.Println("Create log file", logFileName)
  38. if err = createLog(); err != nil {
  39. fmt.Println("Create log file error:", err)
  40. os.Exit(1)
  41. }
  42. }
  43. file, err := os.Open(logFileName)
  44. if err != nil {
  45. fmt.Printf("Open log file %s error: %v\n", logFileName, err)
  46. os.Exit(1)
  47. }
  48. dataSize := *dsize
  49. bucketSize := *bsize
  50. workerSize := *wsize
  51. if *isCreateLogObject {
  52. fmt.Println("Create log bucket object.")
  53. if err = createLogObject(1, g); err != nil {
  54. fmt.Println("Create object error:", err)
  55. os.Exit(2)
  56. }
  57. } else {
  58. fmt.Println("Ignore creating object.")
  59. }
  60. fmt.Println("Prepare data size:", dataSize)
  61. data := prepareLogMQL(dataSize, bucketSize, file)
  62. var wg sync.WaitGroup
  63. wg.Add(len(data))
  64. ch := make(chan []interface{}, workerSize)
  65. statCh := make(chan int, workerSize)
  66. stopCh := make(chan bool, 1)
  67. conn, err := mnats.Get()
  68. if err != nil {
  69. fmt.Println(err)
  70. os.Exit(5)
  71. }
  72. // stats
  73. go func() {
  74. var (
  75. total, success, fail, bts int64
  76. speed, bspeed float64
  77. )
  78. ticker := time.NewTicker(time.Second)
  79. start := time.Now()
  80. L:
  81. for {
  82. select {
  83. case <-stopCh:
  84. break L
  85. case n := <-statCh:
  86. total++
  87. if n > -1 {
  88. success++
  89. if n > 0 {
  90. bts += int64(n)
  91. }
  92. } else {
  93. fail++
  94. }
  95. case <-ticker.C:
  96. seconds := time.Now().Sub(start).Seconds()
  97. speed = float64(total) / seconds
  98. bspeed = float64(bts) / seconds
  99. fmt.Printf("\rtotal:%d, success:%d, fail:%d, speed:%.2f/s %.2fB/s, total time: %.2fs", total, success, fail, speed, bspeed, time.Now().Sub(start).Seconds())
  100. }
  101. }
  102. fmt.Println()
  103. }()
  104. // cpu pprof
  105. pf, err := os.Create("testbucketlog_cpu.pprof")
  106. if err != nil {
  107. fmt.Println(err)
  108. os.Exit(3)
  109. }
  110. defer pf.Close()
  111. if err = pprof.StartCPUProfile(pf); err != nil {
  112. fmt.Println(err)
  113. os.Exit(3)
  114. }
  115. defer pprof.StopCPUProfile()
  116. // mem pprof
  117. go func() {
  118. ticker := time.NewTicker(time.Second * 5)
  119. L:
  120. for {
  121. select {
  122. case <-stopCh:
  123. break L
  124. case <-ticker.C:
  125. func() {
  126. pmf, err := os.OpenFile("testbucketlog_mem.pprof", os.O_CREATE|os.O_TRUNC|os.O_RDWR, os.ModePerm)
  127. if err != nil {
  128. fmt.Println(err)
  129. os.Exit(4)
  130. }
  131. defer pmf.Close()
  132. if err = pprof.WriteHeapProfile(pmf); err != nil {
  133. fmt.Println(err)
  134. os.Exit(4)
  135. }
  136. }()
  137. }
  138. }
  139. }()
  140. // mql worker
  141. insertMql := `insert into /test_bitlog (id, logs["/opt/matrix/var/test/testbucketlog.log"]) values (?, ?)`
  142. for i := 0; i < workerSize; i++ {
  143. go func() {
  144. ps, err := g.Prepare(insertMql)
  145. if err != nil {
  146. fmt.Println(err)
  147. os.Exit(5)
  148. }
  149. m := map[string]interface{}{
  150. "mql": insertMql,
  151. }
  152. for d := range ch {
  153. if *isSendLogParser {
  154. m["values"] = d[:len(d)-1]
  155. b, _ := json.Marshal(m)
  156. if err = conn.Publish("queue.cassandra.query.matrix", b); err != nil {
  157. statCh <- -1
  158. } else {
  159. statCh <- d[len(d)-1].(int)
  160. }
  161. } else {
  162. if *isInsert {
  163. if _, _, err := ps.Exec(d[:len(d)-1]...); err != nil {
  164. statCh <- -1
  165. } else {
  166. statCh <- d[len(d)-1].(int)
  167. }
  168. } else {
  169. statCh <- d[len(d)-1].(int)
  170. }
  171. }
  172. wg.Done()
  173. }
  174. }()
  175. }
  176. // mql sender
  177. fmt.Println("Send data to worker.")
  178. for i := range data {
  179. ch <- data[i]
  180. }
  181. wg.Wait()
  182. time.Sleep(time.Second)
  183. close(stopCh)
  184. }
  185. func createLogObject(num int, g *odb.Gutil) error {
  186. ps, err := g.Prepare("insert into /test_bitlog (id, host) values (?, ?)")
  187. if err != nil {
  188. return err
  189. }
  190. defer fmt.Println()
  191. for i := 0; i < num; i++ {
  192. no := strconv.FormatInt(int64(i+1), 10)
  193. if _, _, err = ps.Exec("ID"+no, "HOST"+no); err != nil {
  194. return err
  195. }
  196. fmt.Printf("\r%d/%d", i+1, num)
  197. }
  198. return nil
  199. }
  200. func prepareLogMQL(num, logSize int, file *os.File) [][]interface{} {
  201. data := make([][]interface{}, num)
  202. r := bufio.NewReader(file)
  203. var lines string
  204. var linesCount int
  205. rowNum := 0
  206. d := []interface{}{"ID1", nil, 0}
  207. dataCount := 0
  208. dBytesSize := 0
  209. for rowNum < num*logSize {
  210. line, _, err := r.ReadLine()
  211. if err != nil {
  212. break
  213. }
  214. if lines == "" {
  215. lines = string(line)
  216. } else {
  217. lines += "\n" + string(line)
  218. }
  219. linesCount++
  220. dBytesSize += len(line)
  221. if linesCount == logSize {
  222. d[1] = []string{lines}
  223. d[2] = dBytesSize
  224. v := make([]interface{}, 3)
  225. copy(v, d)
  226. data[dataCount] = v
  227. lines = ""
  228. linesCount = 0
  229. d[2] = 0
  230. dBytesSize = 0
  231. dataCount++
  232. }
  233. rowNum++
  234. }
  235. return data
  236. }
  237. func createLog() error {
  238. file, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.ModePerm)
  239. if err != nil {
  240. return err
  241. }
  242. defer file.Close()
  243. start := time.Now().Add(-30 * 24 * time.Hour)
  244. for i := 0; i < (*dsize)*(*bsize); i++ {
  245. t := start.Add(time.Second)
  246. _, err = file.WriteString(fmt.Sprintf(`WARN [ReadStage-%d] %s BooleanCondition.java:78 - Performing resource-intensive pure negation query BooleanCondition{boost=null, must=[], should=[], not=[RangeCondition{boost=null, field=map_varchar_varchar_2$group, lower=null, upper=null, includeLower=false, includeUpper=false, docValues=false}]}
  247. `, i, t.Format("2006-01-02 15:04:05,000")))
  248. if err != nil {
  249. return err
  250. }
  251. start = t
  252. }
  253. return nil
  254. }
  255. /*
  256. create class if not exists /test_bitlog (
  257. host varchar,
  258. logs bucket {
  259. "type" : "bitlog",
  260. "files" : ["*.log"],
  261. "pattern" : ["YYYY-MM-DD HH:mm:ss,SSS"],
  262. "ttl": 90
  263. } 'simple',
  264. keys(host),
  265. indexes(host)
  266. );
  267. */