natsreceive.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "github.com/nats-io/nats.go"
  6. lua "github.com/yuin/gopher-lua"
  7. "log"
  8. "os"
  9. "runtime/debug"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  14. func main() {
  15. isStream := flag.Bool("stream", true, "Nats streaming mode")
  16. stream := flag.String("stream", "TEST", "Nats Jetstream name")
  17. subject := flag.String("subject", "", "Send to subject")
  18. poolsize := flag.Int("poolsize", 1, "Consumer pool size")
  19. worksize := flag.Int("worksize", 1, "Worker pool size")
  20. server := flag.String("server", "nats://user:user@127.0.0.1:4222", "Nats server")
  21. nolua := flag.Bool("nolua", false, "Do not run lua")
  22. flag.Parse()
  23. log.SetFlags(log.LstdFlags | log.Lshortfile)
  24. fmt.Printf("Consumer pool size: %d\n", *poolsize)
  25. fmt.Printf("Worker pool size: %d\n", *worksize)
  26. conn, err := newConn(*server)
  27. if err != nil {
  28. log.Fatal(err)
  29. }
  30. var js nats.JetStreamContext
  31. sch := make(chan int, *worksize)
  32. wch := make(chan string, *worksize)
  33. var mu sync.RWMutex
  34. testTask := time.NewTicker(time.Minute)
  35. testTask2 := time.NewTicker(time.Minute)
  36. // workers
  37. for i := 0; i < *worksize; i++ {
  38. go func() {
  39. ls := lua.NewState()
  40. for {
  41. select {
  42. case s := <-wch:
  43. func() {
  44. defer func() {
  45. if r := recover();r != nil {
  46. fmt.Println(string(debug.Stack()))
  47. }
  48. }()
  49. mu.RLock()
  50. defer mu.RUnlock()
  51. if !*nolua {
  52. luaDo(ls)
  53. }
  54. switch s {
  55. case "start":
  56. sch <- -1
  57. case "over":
  58. sch <- -2
  59. default:
  60. sch <- len(s)
  61. }
  62. }()
  63. case <-testTask.C:
  64. // Don't do anything
  65. case <-testTask2.C:
  66. // Don't do anything
  67. }
  68. }
  69. }()
  70. }
  71. // cosumers
  72. for i := 0; i < *poolsize; i++ {
  73. if *isStream {
  74. if !strings.HasPrefix(*subject, *stream + ".") {
  75. log.Fatalf("stream '%s' not match subject '%s'", *stream, *subject)
  76. }
  77. if js == nil {
  78. js, err = newStream(*stream, conn)
  79. if err != nil {
  80. log.Fatal(err)
  81. }
  82. }
  83. _, err = js.QueueSubscribe(*subject, "queue1", func(msg *nats.Msg) {
  84. wch <- string(msg.Data)
  85. //if msg.Reply != "" {
  86. // _, _ = sc.PublishAsync(msg.Reply, []byte("ok"), func(_ string, _ error) {})
  87. //}
  88. }, nats.Durable(*subject))
  89. if err != nil {
  90. log.Fatal(err)
  91. }
  92. } else {
  93. _, err = conn.QueueSubscribe(*subject, "queue1", func(msg *nats.Msg) {
  94. wch <- string(msg.Data)
  95. if msg.Reply != "" {
  96. _ = conn.Publish(msg.Reply, []byte("ok"))
  97. }
  98. })
  99. if err != nil {
  100. log.Fatal(err)
  101. }
  102. }
  103. }
  104. var (
  105. n int
  106. bs int64
  107. last = -1
  108. )
  109. ticker := time.NewTicker(time.Second)
  110. var start time.Time
  111. for {
  112. select {
  113. case v := <-sch:
  114. if start.IsZero() {
  115. start = time.Now()
  116. n = 0
  117. bs = 0
  118. }
  119. n++
  120. bs += int64(v)
  121. case <-ticker.C:
  122. if last != -1 && last == n {
  123. start = time.Time{}
  124. fmt.Println()
  125. os.Exit(0)
  126. }
  127. if !start.IsZero() {
  128. dur := time.Now().Sub(start).Seconds()
  129. fmt.Printf("\rStarted with %s: %4.2f msgs/sec ~ %4.2f MB/sec (%d msgs) ", start.Format("15:04:05"), float64(n)/dur, float64(bs)/1048576/dur, n)
  130. last = n
  131. }
  132. }
  133. }
  134. }
  135. func luaDo(ls *lua.LState) {
  136. luascript := `
  137. function increase(a, b)
  138. return a + b
  139. end
  140. local tb1 = {}
  141. table.insert(tb1, "delta")
  142. table.insert(tb1, "delta2")
  143. table.insert(tb1, "delta3")
  144. table.insert(tb1, "delta4")
  145. if #tb1 >= 4 then
  146. --print("table size:", #tb1)
  147. end
  148. local sum = increase(1, 2)
  149. --print("increase:", "1 + 2 = " .. increase(1, 2))
  150. `
  151. if err := ls.DoString(luascript); err != nil {
  152. log.Fatal(err)
  153. }
  154. }