mqls_sub.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. package mql
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "time"
  8. odb "git.wecise.com/wecise/odb-go/odb"
  9. )
  10. var subscriptionsmutex sync.Mutex
  11. var subscriptions = map[string]odb.Subscription{}
  12. var receivedcount = map[string]int{}
  13. func subscribe(topics ...string) {
  14. subscriptionsmutex.Lock()
  15. defer subscriptionsmutex.Unlock()
  16. for _, topic := range topics {
  17. topic = strings.TrimSpace(topic)
  18. if subscriptions[topic] == nil {
  19. sub := ODBC.Subscribe(context.Background(), topic)
  20. go func(sub odb.Subscription) {
  21. st := time.Now()
  22. for m := range sub.Channel() {
  23. eds := m.EventData().String()
  24. subscriptionsmutex.Lock()
  25. receivedcount[eds]++
  26. n := receivedcount[eds]
  27. subscriptionsmutex.Unlock()
  28. logger.Info(fmt.Sprint("received ", topic, " notify message: ", eds, " [", n, "] in ", time.Since(st)))
  29. oem, err := m.EventMessage()
  30. if err != nil {
  31. logger.Error("event message error: " + err.Error())
  32. } else {
  33. logger.Infof("event message value: %v", oem)
  34. }
  35. }
  36. }(sub)
  37. subscriptions[topic] = sub
  38. logger.Info("subscribe topic " + topic + " done")
  39. }
  40. }
  41. }
  42. func unsubscribe(topics ...string) {
  43. subscriptionsmutex.Lock()
  44. defer subscriptionsmutex.Unlock()
  45. for _, topic := range topics {
  46. topic = strings.TrimSpace(topic)
  47. if sub := subscriptions[topic]; sub != nil {
  48. if err := sub.Unsubscribe(context.Background()); err != nil {
  49. logger.Error(err)
  50. } else {
  51. delete(subscriptions, topic)
  52. logger.Info("unsubscribe topic " + topic + " done")
  53. }
  54. }
  55. }
  56. }