createtsdb.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package main
  2. import (
  3. "encoding/csv"
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "log"
  8. "os"
  9. "runtime"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. "git.wecise.com/wecise/util/cast"
  15. "github.com/nats-io/nats.go"
  16. )
  17. func main() {
  18. stream := flag.String("stream", "TEST", "Nats Jetstream name")
  19. subject := flag.String("subject", "", "Send to subject")
  20. request := flag.Bool("request", false, "Send to subject by request mode")
  21. server := flag.String("server", "nats://user:user@127.0.0.1:4222", "Nats server")
  22. dataSize := flag.Int("size", 20000, "Data size")
  23. msgSize := flag.Int("msgsize", 0, "Message size")
  24. isJson := flag.Bool("json", false, "Json data")
  25. isStream := flag.Bool("stream", false, "Nats streaming mode to send")
  26. flag.Parse()
  27. log.SetFlags(log.LstdFlags | log.Lshortfile)
  28. var (
  29. file *os.File
  30. err error
  31. writer *csv.Writer
  32. )
  33. if *subject == "" {
  34. file, err = os.OpenFile("test_perf_tsdb.csv", os.O_CREATE|os.O_TRUNC|os.O_RDWR, os.ModePerm)
  35. if err != nil {
  36. log.Fatal(err)
  37. }
  38. defer file.Close()
  39. writer = csv.NewWriter(file)
  40. }
  41. size := *dataSize
  42. var supplementStr = ""
  43. if *msgSize != 0 {
  44. for i := 0; i < *msgSize; i++ {
  45. supplementStr += "A"
  46. }
  47. }
  48. // Create data
  49. // testhost: test, id: 12864814057330053911
  50. now := time.Now().Add(time.Second * time.Duration(size+1) * -1)
  51. data := make([][]string, size)
  52. for i := 0; i < size; i++ {
  53. n := i + 1
  54. a := []string{"12864814057330053911", cast.ToString(now.Unix() * 1000), strconv.Itoa(n), strconv.Itoa(n), cast.ToString(float64(n) + 0.123), cast.ToString(float64(n) + 0.123), cast.ToString(n) + "ABCD", cast.ToString(n) + "ABCD" + supplementStr}
  55. data[i] = a
  56. now = now.Add(time.Second)
  57. }
  58. msgsize := 0
  59. for _, s := range data[0] {
  60. msgsize += len(s)
  61. }
  62. fmt.Printf("Create data finished. msgsize=%d\n", msgsize)
  63. // Start stat
  64. ch := make(chan int, runtime.GOMAXPROCS(0))
  65. defer close(ch)
  66. go func() {
  67. var (
  68. n int
  69. bs int64
  70. )
  71. ticker := time.NewTicker(time.Second)
  72. start := time.Now()
  73. L:
  74. for {
  75. select {
  76. case v := <-ch:
  77. n++
  78. bs += int64(v)
  79. case <-ticker.C:
  80. dur := time.Now().Sub(start).Seconds()
  81. fmt.Printf("\r%d/%d %4.2f msgs/sec ~ %4.2f MB/sec", n, size, float64(n)/dur, float64(bs)/1048576/dur)
  82. if n == size {
  83. break L
  84. }
  85. }
  86. }
  87. fmt.Println()
  88. }()
  89. // Start sender
  90. var wg sync.WaitGroup
  91. wg.Add(size)
  92. sendPool := runtime.GOMAXPROCS(0)
  93. fmt.Println("Sender size:", sendPool)
  94. sendCh := make(chan []string, sendPool)
  95. var writerMu sync.Mutex
  96. for i := 0; i < sendPool; i++ {
  97. conn, err := newConn(*server)
  98. if err != nil {
  99. log.Fatal(err)
  100. }
  101. var js nats.JetStreamContext
  102. if *isStream {
  103. if !strings.HasPrefix(*subject, *stream+".") {
  104. log.Fatalf("stream '%s' not match subject '%s'", *stream, *subject)
  105. }
  106. js, err = newStream(*stream, conn)
  107. if err != nil {
  108. log.Fatal(err)
  109. }
  110. }
  111. go func() {
  112. for a := range sendCh {
  113. var bs int
  114. if *subject == "" {
  115. writerMu.Lock()
  116. if *isJson { // json
  117. b, _ := json.Marshal(map[string]interface{}{"ID": a[0], "TIME": a[1], "P1": a[2], "P2": a[3], "P3": a[4], "P4": a[5], "P5": a[6], "P6": a[7]})
  118. if _, err := file.WriteString(string(b) + "\n"); err != nil {
  119. log.Fatal(err)
  120. }
  121. } else { // csv
  122. if err := writer.Write(a); err != nil {
  123. log.Fatal(err)
  124. }
  125. }
  126. writerMu.Unlock()
  127. } else {
  128. if *request {
  129. if *isJson {
  130. b, _ := json.Marshal(map[string]interface{}{"ID": a[0], "TIME": a[1], "P1": a[2], "P2": a[3], "P3": a[4], "P4": a[5], "P5": a[6], "P6": a[7]})
  131. _, err = conn.Request(*subject, b, time.Second)
  132. if err != nil {
  133. log.Fatal(err)
  134. }
  135. bs = len(b)
  136. } else {
  137. b := []byte(strings.Join(a, ","))
  138. _, err = conn.Request(*subject, b, time.Second)
  139. if err != nil {
  140. log.Fatal(err)
  141. }
  142. bs = len(b)
  143. }
  144. } else {
  145. if *isJson {
  146. b, _ := json.Marshal(map[string]interface{}{"ID": a[0], "TIME": a[1], "P1": a[2], "P2": a[3], "P3": a[4], "P4": a[5], "P5": a[6], "P6": a[7]})
  147. if *isStream {
  148. _, err = js.PublishAsync(*subject, b)
  149. } else {
  150. err = conn.Publish(*subject, b)
  151. }
  152. if err != nil {
  153. log.Fatal(err)
  154. }
  155. bs = len(b)
  156. } else {
  157. b := []byte(strings.Join(a, ","))
  158. if *isStream {
  159. _, err = js.PublishAsync(*subject, b)
  160. } else {
  161. err = conn.Publish(*subject, b)
  162. }
  163. if err != nil {
  164. log.Fatal(err)
  165. }
  166. bs = len(b)
  167. }
  168. }
  169. }
  170. ch <- bs
  171. wg.Done()
  172. }
  173. }()
  174. }
  175. //if *subject != "" {
  176. // if *isStream {
  177. // _, err = sc.PublishAsync(*subject, []byte("start"), func(_ string, _ error) {})
  178. // } else {
  179. // err = conn.Publish(*subject, []byte("start"))
  180. // }
  181. // if err != nil {
  182. // log.Fatal(err)
  183. // }
  184. //}
  185. for _, a := range data {
  186. sendCh <- a
  187. }
  188. wg.Wait()
  189. if writer != nil {
  190. writer.Flush()
  191. }
  192. //if *subject != "" {
  193. // if *isStream {
  194. // _, err = sc.PublishAsync(*subject, []byte("over"), func(_ string, _ error) {})
  195. // } else {
  196. // err = conn.Publish(*subject, []byte("over"))
  197. // }
  198. // if err != nil {
  199. // log.Fatal(err)
  200. // }
  201. //}
  202. time.Sleep(time.Second)
  203. }