package main import ( "encoding/csv" "encoding/json" "flag" "fmt" "log" "os" "runtime" "strconv" "strings" "sync" "time" "git.wecise.com/wecise/util/cast" "github.com/nats-io/nats.go" ) func main() { stream := flag.String("stream", "TEST", "Nats Jetstream name") subject := flag.String("subject", "", "Send to subject") request := flag.Bool("request", false, "Send to subject by request mode") server := flag.String("server", "nats://user:user@127.0.0.1:4222", "Nats server") dataSize := flag.Int("size", 20000, "Data size") msgSize := flag.Int("msgsize", 0, "Message size") isJson := flag.Bool("json", false, "Json data") isStream := flag.Bool("stream", false, "Nats streaming mode to send") flag.Parse() log.SetFlags(log.LstdFlags | log.Lshortfile) var ( file *os.File err error writer *csv.Writer ) if *subject == "" { file, err = os.OpenFile("test_perf_tsdb.csv", os.O_CREATE|os.O_TRUNC|os.O_RDWR, os.ModePerm) if err != nil { log.Fatal(err) } defer file.Close() writer = csv.NewWriter(file) } size := *dataSize var supplementStr = "" if *msgSize != 0 { for i := 0; i < *msgSize; i++ { supplementStr += "A" } } // Create data // testhost: test, id: 12864814057330053911 now := time.Now().Add(time.Second * time.Duration(size+1) * -1) data := make([][]string, size) for i := 0; i < size; i++ { n := i + 1 a := []string{"12864814057330053911", cast.ToString(now.Unix() * 1000), strconv.Itoa(n), strconv.Itoa(n), cast.ToString(float64(n) + 0.123), cast.ToString(float64(n) + 0.123), cast.ToString(n) + "ABCD", cast.ToString(n) + "ABCD" + supplementStr} data[i] = a now = now.Add(time.Second) } msgsize := 0 for _, s := range data[0] { msgsize += len(s) } fmt.Printf("Create data finished. msgsize=%d\n", msgsize) // Start stat ch := make(chan int, runtime.GOMAXPROCS(0)) defer close(ch) go func() { var ( n int bs int64 ) ticker := time.NewTicker(time.Second) start := time.Now() L: for { select { case v := <-ch: n++ bs += int64(v) case <-ticker.C: dur := time.Now().Sub(start).Seconds() fmt.Printf("\r%d/%d %4.2f msgs/sec ~ %4.2f MB/sec", n, size, float64(n)/dur, float64(bs)/1048576/dur) if n == size { break L } } } fmt.Println() }() // Start sender var wg sync.WaitGroup wg.Add(size) sendPool := runtime.GOMAXPROCS(0) fmt.Println("Sender size:", sendPool) sendCh := make(chan []string, sendPool) var writerMu sync.Mutex for i := 0; i < sendPool; i++ { conn, err := newConn(*server) if err != nil { log.Fatal(err) } var js nats.JetStreamContext if *isStream { if !strings.HasPrefix(*subject, *stream+".") { log.Fatalf("stream '%s' not match subject '%s'", *stream, *subject) } js, err = newStream(*stream, conn) if err != nil { log.Fatal(err) } } go func() { for a := range sendCh { var bs int if *subject == "" { writerMu.Lock() if *isJson { // json b, _ := json.Marshal(map[string]interface{}{"ID": a[0], "TIME": a[1], "P1": a[2], "P2": a[3], "P3": a[4], "P4": a[5], "P5": a[6], "P6": a[7]}) if _, err := file.WriteString(string(b) + "\n"); err != nil { log.Fatal(err) } } else { // csv if err := writer.Write(a); err != nil { log.Fatal(err) } } writerMu.Unlock() } else { if *request { if *isJson { b, _ := json.Marshal(map[string]interface{}{"ID": a[0], "TIME": a[1], "P1": a[2], "P2": a[3], "P3": a[4], "P4": a[5], "P5": a[6], "P6": a[7]}) _, err = conn.Request(*subject, b, time.Second) if err != nil { log.Fatal(err) } bs = len(b) } else { b := []byte(strings.Join(a, ",")) _, err = conn.Request(*subject, b, time.Second) if err != nil { log.Fatal(err) } bs = len(b) } } else { if *isJson { b, _ := json.Marshal(map[string]interface{}{"ID": a[0], "TIME": a[1], "P1": a[2], "P2": a[3], "P3": a[4], "P4": a[5], "P5": a[6], "P6": a[7]}) if *isStream { _, err = js.PublishAsync(*subject, b) } else { err = conn.Publish(*subject, b) } if err != nil { log.Fatal(err) } bs = len(b) } else { b := []byte(strings.Join(a, ",")) if *isStream { _, err = js.PublishAsync(*subject, b) } else { err = conn.Publish(*subject, b) } if err != nil { log.Fatal(err) } bs = len(b) } } } ch <- bs wg.Done() } }() } //if *subject != "" { // if *isStream { // _, err = sc.PublishAsync(*subject, []byte("start"), func(_ string, _ error) {}) // } else { // err = conn.Publish(*subject, []byte("start")) // } // if err != nil { // log.Fatal(err) // } //} for _, a := range data { sendCh <- a } wg.Wait() if writer != nil { writer.Flush() } //if *subject != "" { // if *isStream { // _, err = sc.PublishAsync(*subject, []byte("over"), func(_ string, _ error) {}) // } else { // err = conn.Publish(*subject, []byte("over")) // } // if err != nil { // log.Fatal(err) // } //} time.Sleep(time.Second) }