| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- package main
- import (
- "flag"
- "fmt"
- "github.com/nats-io/nats.go"
- lua "github.com/yuin/gopher-lua"
- "log"
- "os"
- "runtime/debug"
- "strings"
- "sync"
- "time"
- )
- func main() {
- isStream := flag.Bool("stream", true, "Nats streaming mode")
- stream := flag.String("stream", "TEST", "Nats Jetstream name")
- subject := flag.String("subject", "", "Send to subject")
- poolsize := flag.Int("poolsize", 1, "Consumer pool size")
- worksize := flag.Int("worksize", 1, "Worker pool size")
- server := flag.String("server", "nats://user:user@127.0.0.1:4222", "Nats server")
- nolua := flag.Bool("nolua", false, "Do not run lua")
- flag.Parse()
- log.SetFlags(log.LstdFlags | log.Lshortfile)
- fmt.Printf("Consumer pool size: %d\n", *poolsize)
- fmt.Printf("Worker pool size: %d\n", *worksize)
- conn, err := newConn(*server)
- if err != nil {
- log.Fatal(err)
- }
- var js nats.JetStreamContext
- sch := make(chan int, *worksize)
- wch := make(chan string, *worksize)
- var mu sync.RWMutex
- testTask := time.NewTicker(time.Minute)
- testTask2 := time.NewTicker(time.Minute)
- // workers
- for i := 0; i < *worksize; i++ {
- go func() {
- ls := lua.NewState()
- for {
- select {
- case s := <-wch:
- func() {
- defer func() {
- if r := recover();r != nil {
- fmt.Println(string(debug.Stack()))
- }
- }()
- mu.RLock()
- defer mu.RUnlock()
- if !*nolua {
- luaDo(ls)
- }
- switch s {
- case "start":
- sch <- -1
- case "over":
- sch <- -2
- default:
- sch <- len(s)
- }
- }()
- case <-testTask.C:
- // Don't do anything
- case <-testTask2.C:
- // Don't do anything
- }
- }
- }()
- }
- // cosumers
- for i := 0; i < *poolsize; i++ {
- if *isStream {
- if !strings.HasPrefix(*subject, *stream + ".") {
- log.Fatalf("stream '%s' not match subject '%s'", *stream, *subject)
- }
- if js == nil {
- js, err = newStream(*stream, conn)
- if err != nil {
- log.Fatal(err)
- }
- }
- _, err = js.QueueSubscribe(*subject, "queue1", func(msg *nats.Msg) {
- wch <- string(msg.Data)
- //if msg.Reply != "" {
- // _, _ = sc.PublishAsync(msg.Reply, []byte("ok"), func(_ string, _ error) {})
- //}
- }, nats.Durable(*subject))
- if err != nil {
- log.Fatal(err)
- }
- } else {
- _, err = conn.QueueSubscribe(*subject, "queue1", func(msg *nats.Msg) {
- wch <- string(msg.Data)
- if msg.Reply != "" {
- _ = conn.Publish(msg.Reply, []byte("ok"))
- }
- })
- if err != nil {
- log.Fatal(err)
- }
- }
- }
- var (
- n int
- bs int64
- last = -1
- )
- ticker := time.NewTicker(time.Second)
- var start time.Time
- for {
- select {
- case v := <-sch:
- if start.IsZero() {
- start = time.Now()
- n = 0
- bs = 0
- }
- n++
- bs += int64(v)
- case <-ticker.C:
- if last != -1 && last == n {
- start = time.Time{}
- fmt.Println()
- os.Exit(0)
- }
- if !start.IsZero() {
- dur := time.Now().Sub(start).Seconds()
- fmt.Printf("\rStarted with %s: %4.2f msgs/sec ~ %4.2f MB/sec (%d msgs) ", start.Format("15:04:05"), float64(n)/dur, float64(bs)/1048576/dur, n)
- last = n
- }
- }
- }
- }
- func luaDo(ls *lua.LState) {
- luascript := `
- function increase(a, b)
- return a + b
- end
- local tb1 = {}
- table.insert(tb1, "delta")
- table.insert(tb1, "delta2")
- table.insert(tb1, "delta3")
- table.insert(tb1, "delta4")
- if #tb1 >= 4 then
- --print("table size:", #tb1)
- end
- local sum = increase(1, 2)
- --print("increase:", "1 + 2 = " .. increase(1, 2))
- `
- if err := ls.DoString(luascript); err != nil {
- log.Fatal(err)
- }
- }
|