| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- package odbcmql
- import (
- "context"
- "fmt"
- "strings"
- "sync"
- "time"
- odb "git.wecise.com/wecise/odb-go/odb"
- )
- var subscriptionsmutex sync.Mutex
- var subscriptions = map[string]odb.Subscription{}
- var receivedcount = map[string]int{}
- func subscribe(topics ...string) {
- subscriptionsmutex.Lock()
- defer subscriptionsmutex.Unlock()
- for _, topic := range topics {
- topic = strings.TrimSpace(topic)
- if subscriptions[topic] == nil {
- sub := ODBC.Subscribe(context.Background(), topic)
- go func(sub odb.Subscription) {
- st := time.Now()
- for m := range sub.Channel() {
- eds := m.EventData().String()
- subscriptionsmutex.Lock()
- receivedcount[eds]++
- n := receivedcount[eds]
- subscriptionsmutex.Unlock()
- logger.Info(fmt.Sprint("received ", topic, " notify message: ", eds, " [", n, "] in ", time.Since(st)))
- oem, err := m.EventMessage()
- if err != nil {
- logger.Error("event message error: " + err.Error())
- } else {
- logger.Infof("event message value: %v", oem)
- }
- }
- }(sub)
- subscriptions[topic] = sub
- logger.Info("subscribe topic " + topic + " done")
- }
- }
- }
- func unsubscribe(topics ...string) {
- subscriptionsmutex.Lock()
- defer subscriptionsmutex.Unlock()
- for _, topic := range topics {
- topic = strings.TrimSpace(topic)
- if sub := subscriptions[topic]; sub != nil {
- if err := sub.Unsubscribe(context.Background()); err != nil {
- logger.Error(err)
- } else {
- delete(subscriptions, topic)
- logger.Info("unsubscribe topic " + topic + " done")
- }
- }
- }
- }
|