util.go 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/nats-io/nats.go"
  5. "runtime"
  6. "strings"
  7. )
  8. func newConn(svr string) (*nats.Conn, error) {
  9. options := nats.GetDefaultOptions()
  10. //options.Url = nats.DefaultURL
  11. _, file, _, _ := runtime.Caller(0)
  12. fileDirs := strings.Split(file, "/")
  13. name := strings.Split(fileDirs[len(fileDirs) - 1], ".")[0]
  14. options.Name = name
  15. servers := strings.Split(svr, ",")
  16. options.Servers = servers
  17. options.MaxReconnect = nats.DefaultMaxReconnect
  18. options.ReconnectWait = nats.DefaultReconnectWait
  19. options.Timeout = nats.DefaultTimeout
  20. if conn, err := options.Connect(); err != nil {
  21. return nil, err
  22. } else {
  23. return conn, nil
  24. }
  25. }
  26. func newStream(streamName string, conn *nats.Conn) (js nats.JetStreamContext, err error) {
  27. js, err = conn.JetStream()
  28. if err != nil {
  29. return nil, fmt.Errorf("new jetstream error: %v", err)
  30. }
  31. stream, _ := js.StreamInfo(streamName)
  32. if stream == nil {
  33. _, err := js.AddStream(&nats.StreamConfig{
  34. Name: streamName,
  35. Subjects: []string{streamName + ".>"},
  36. })
  37. if err != nil {
  38. return nil, fmt.Errorf("new stream '%s' error: %v", streamName, err)
  39. }
  40. }
  41. return js, nil
  42. }