| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
- package main
- import (
- "encoding/csv"
- "encoding/json"
- "flag"
- "fmt"
- "log"
- "os"
- "runtime"
- "strconv"
- "strings"
- "sync"
- "time"
- "git.wecise.com/wecise/util/cast"
- "github.com/nats-io/nats.go"
- )
- func main() {
- stream := flag.String("stream", "TEST", "Nats Jetstream name")
- subject := flag.String("subject", "", "Send to subject")
- request := flag.Bool("request", false, "Send to subject by request mode")
- server := flag.String("server", "nats://user:user@127.0.0.1:4222", "Nats server")
- dataSize := flag.Int("size", 20000, "Data size")
- msgSize := flag.Int("msgsize", 0, "Message size")
- isJson := flag.Bool("json", false, "Json data")
- isStream := flag.Bool("stream", false, "Nats streaming mode to send")
- flag.Parse()
- log.SetFlags(log.LstdFlags | log.Lshortfile)
- var (
- file *os.File
- err error
- writer *csv.Writer
- )
- if *subject == "" {
- file, err = os.OpenFile("test_perf_tsdb.csv", os.O_CREATE|os.O_TRUNC|os.O_RDWR, os.ModePerm)
- if err != nil {
- log.Fatal(err)
- }
- defer file.Close()
- writer = csv.NewWriter(file)
- }
- size := *dataSize
- var supplementStr = ""
- if *msgSize != 0 {
- for i := 0; i < *msgSize; i++ {
- supplementStr += "A"
- }
- }
- // Create data
- // testhost: test, id: 12864814057330053911
- now := time.Now().Add(time.Second * time.Duration(size+1) * -1)
- data := make([][]string, size)
- for i := 0; i < size; i++ {
- n := i + 1
- a := []string{"12864814057330053911", cast.ToString(now.Unix() * 1000), strconv.Itoa(n), strconv.Itoa(n), cast.ToString(float64(n) + 0.123), cast.ToString(float64(n) + 0.123), cast.ToString(n) + "ABCD", cast.ToString(n) + "ABCD" + supplementStr}
- data[i] = a
- now = now.Add(time.Second)
- }
- msgsize := 0
- for _, s := range data[0] {
- msgsize += len(s)
- }
- fmt.Printf("Create data finished. msgsize=%d\n", msgsize)
- // Start stat
- ch := make(chan int, runtime.GOMAXPROCS(0))
- defer close(ch)
- go func() {
- var (
- n int
- bs int64
- )
- ticker := time.NewTicker(time.Second)
- start := time.Now()
- L:
- for {
- select {
- case v := <-ch:
- n++
- bs += int64(v)
- case <-ticker.C:
- dur := time.Now().Sub(start).Seconds()
- fmt.Printf("\r%d/%d %4.2f msgs/sec ~ %4.2f MB/sec", n, size, float64(n)/dur, float64(bs)/1048576/dur)
- if n == size {
- break L
- }
- }
- }
- fmt.Println()
- }()
- // Start sender
- var wg sync.WaitGroup
- wg.Add(size)
- sendPool := runtime.GOMAXPROCS(0)
- fmt.Println("Sender size:", sendPool)
- sendCh := make(chan []string, sendPool)
- var writerMu sync.Mutex
- for i := 0; i < sendPool; i++ {
- conn, err := newConn(*server)
- if err != nil {
- log.Fatal(err)
- }
- var js nats.JetStreamContext
- if *isStream {
- if !strings.HasPrefix(*subject, *stream+".") {
- log.Fatalf("stream '%s' not match subject '%s'", *stream, *subject)
- }
- js, err = newStream(*stream, conn)
- if err != nil {
- log.Fatal(err)
- }
- }
- go func() {
- for a := range sendCh {
- var bs int
- if *subject == "" {
- writerMu.Lock()
- if *isJson { // json
- b, _ := json.Marshal(map[string]interface{}{"ID": a[0], "TIME": a[1], "P1": a[2], "P2": a[3], "P3": a[4], "P4": a[5], "P5": a[6], "P6": a[7]})
- if _, err := file.WriteString(string(b) + "\n"); err != nil {
- log.Fatal(err)
- }
- } else { // csv
- if err := writer.Write(a); err != nil {
- log.Fatal(err)
- }
- }
- writerMu.Unlock()
- } else {
- if *request {
- if *isJson {
- b, _ := json.Marshal(map[string]interface{}{"ID": a[0], "TIME": a[1], "P1": a[2], "P2": a[3], "P3": a[4], "P4": a[5], "P5": a[6], "P6": a[7]})
- _, err = conn.Request(*subject, b, time.Second)
- if err != nil {
- log.Fatal(err)
- }
- bs = len(b)
- } else {
- b := []byte(strings.Join(a, ","))
- _, err = conn.Request(*subject, b, time.Second)
- if err != nil {
- log.Fatal(err)
- }
- bs = len(b)
- }
- } else {
- if *isJson {
- b, _ := json.Marshal(map[string]interface{}{"ID": a[0], "TIME": a[1], "P1": a[2], "P2": a[3], "P3": a[4], "P4": a[5], "P5": a[6], "P6": a[7]})
- if *isStream {
- _, err = js.PublishAsync(*subject, b)
- } else {
- err = conn.Publish(*subject, b)
- }
- if err != nil {
- log.Fatal(err)
- }
- bs = len(b)
- } else {
- b := []byte(strings.Join(a, ","))
- if *isStream {
- _, err = js.PublishAsync(*subject, b)
- } else {
- err = conn.Publish(*subject, b)
- }
- if err != nil {
- log.Fatal(err)
- }
- bs = len(b)
- }
- }
- }
- ch <- bs
- wg.Done()
- }
- }()
- }
- //if *subject != "" {
- // if *isStream {
- // _, err = sc.PublishAsync(*subject, []byte("start"), func(_ string, _ error) {})
- // } else {
- // err = conn.Publish(*subject, []byte("start"))
- // }
- // if err != nil {
- // log.Fatal(err)
- // }
- //}
- for _, a := range data {
- sendCh <- a
- }
- wg.Wait()
- if writer != nil {
- writer.Flush()
- }
- //if *subject != "" {
- // if *isStream {
- // _, err = sc.PublishAsync(*subject, []byte("over"), func(_ string, _ error) {})
- // } else {
- // err = conn.Publish(*subject, []byte("over"))
- // }
- // if err != nil {
- // log.Fatal(err)
- // }
- //}
- time.Sleep(time.Second)
- }
|