// Command-line NATS consumer benchmark: it subscribes to a subject (through
// JetStream or plain core NATS), hands every message payload to a pool of
// worker goroutines that optionally execute a small Lua script, and prints
// the observed throughput once per second.
package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"runtime/debug"
	"strings"
	"sync"
	"time"

	"github.com/nats-io/nats.go"
	lua "github.com/yuin/gopher-lua"
)

// main parses the command-line flags, starts the worker and consumer pools
// and then reports throughput until no new message has been counted between
// two consecutive one-second ticks, at which point the process exits.
func main() {
	// The boolean mode switch and the stream name were both registered as
	// "stream", which makes the flag package panic ("flag redefined") at
	// startup. The mode switch is therefore exposed as -isstream.
	isStream := flag.Bool("isstream", true, "Nats streaming mode")
	stream := flag.String("stream", "TEST", "Nats Jetstream name")
	subject := flag.String("subject", "", "Send to subject")
	poolsize := flag.Int("poolsize", 1, "Consumer pool size")
	worksize := flag.Int("worksize", 1, "Worker pool size")
	server := flag.String("server", "nats://user:user@127.0.0.1:4222", "Nats server")
	nolua := flag.Bool("nolua", false, "Do not run lua")
	flag.Parse()

	log.SetFlags(log.LstdFlags | log.Lshortfile)

	// Without at least one consumer and one worker nothing is ever counted,
	// and a negative size would make the channel allocation below panic.
	if *poolsize < 1 || *worksize < 1 {
		log.Fatalf("poolsize (%d) and worksize (%d) must both be at least 1", *poolsize, *worksize)
	}

	fmt.Printf("Consumer pool size: %d\n", *poolsize)
	fmt.Printf("Worker pool size: %d\n", *worksize)

	conn, err := newConn(*server)
	if err != nil {
		log.Fatal(err)
	}

	var js nats.JetStreamContext
	sch := make(chan int, *worksize)    // per-message results: payload length, or -1/-2 markers
	wch := make(chan string, *worksize) // raw payloads waiting for a worker
	var mu sync.RWMutex
	testTask := time.NewTicker(time.Minute)
	testTask2 := time.NewTicker(time.Minute)

	// workers
	for i := 0; i < *worksize; i++ {
		go func() {
			// Each worker owns its own Lua state.
			ls := lua.NewState()
			for {
				select {
				case s := <-wch:
					func() {
						// Keep the worker alive if handling one message panics.
						defer func() {
							if r := recover(); r != nil {
								fmt.Printf("worker panic: %v\n%s\n", r, debug.Stack())
							}
						}()
						mu.RLock()
						defer mu.RUnlock()

						if !*nolua {
							luaDo(ls)
						}
						switch s {
						case "start":
							sch <- -1
						case "over":
							sch <- -2
						default:
							sch <- len(s)
						}
					}()
				case <-testTask.C:
					// Don't do anything
				case <-testTask2.C:
					// Don't do anything
				}
			}
		}()
	}

	// consumers
	if *isStream {
		// Validate once instead of on every loop iteration.
		if !strings.HasPrefix(*subject, *stream+".") {
			log.Fatalf("stream '%s' not match subject '%s'", *stream, *subject)
		}
		js, err = newStream(*stream, conn)
		if err != nil {
			log.Fatal(err)
		}
		// The subject always contains a '.', which is not permitted in a
		// durable consumer name (nor are the wildcards '*' and '>'), so
		// derive a valid name from the subject.
		durable := strings.NewReplacer(".", "_", "*", "_", ">", "_").Replace(*subject)
		for i := 0; i < *poolsize; i++ {
			_, err = js.QueueSubscribe(*subject, "queue1", func(msg *nats.Msg) {
				// No reply is published in JetStream mode.
				wch <- string(msg.Data)
			}, nats.Durable(durable))
			if err != nil {
				log.Fatal(err)
			}
		}
	} else {
		for i := 0; i < *poolsize; i++ {
			_, err = conn.QueueSubscribe(*subject, "queue1", func(msg *nats.Msg) {
				wch <- string(msg.Data)
				if msg.Reply != "" {
					// Best effort: a failed acknowledgement reply is ignored.
					_ = conn.Publish(msg.Reply, []byte("ok"))
				}
			})
			if err != nil {
				log.Fatal(err)
			}
		}
	}

	var (
		n     int       // messages counted since the first one arrived
		bs    int64     // payload bytes counted since the first one arrived
		last  = -1      // value of n at the previous report, -1 before any report
		start time.Time // arrival time of the first message, zero until then
	)
	ticker := time.NewTicker(time.Second)
	for {
		select {
		case v := <-sch:
			if start.IsZero() {
				start = time.Now()
				n = 0
				bs = 0
			}
			n++
			// Negative values are the "start"/"over" markers, not sizes.
			if v > 0 {
				bs += int64(v)
			}
		case <-ticker.C:
			// No progress since the previous report: the run is over.
			if last != -1 && last == n {
				fmt.Println()
				os.Exit(0)
			}
			if !start.IsZero() {
				dur := time.Since(start).Seconds()
				fmt.Printf("\rStarted with %s: %4.2f msgs/sec ~ %4.2f MB/sec (%d msgs) ",
					start.Format("15:04:05"), float64(n)/dur, float64(bs)/1048576/dur, n)
				last = n
			}
		}
	}
}

// luaDo runs a fixed Lua snippet (a function definition, a few table inserts
// and one function call) on the given state. Any Lua error terminates the
// process through log.Fatal.
func luaDo(ls *lua.LState) {
	// The statements must stay on separate lines: the "--" comments run to
	// the end of the line and would otherwise swallow the following "end".
	luascript := `
function increase(a, b)
	return a + b
end
local tb1 = {}
table.insert(tb1, "delta")
table.insert(tb1, "delta2")
table.insert(tb1, "delta3")
table.insert(tb1, "delta4")
if #tb1 >= 4 then
	--print("table size:", #tb1)
end
local sum = increase(1, 2)
--print("increase:", "1 + 2 = " .. increase(1, 2))
`
	if err := ls.DoString(luascript); err != nil {
		log.Fatal(err)
	}
}