package odbcmql import ( "context" "fmt" "strings" "sync" "time" odb "git.wecise.com/wecise/odb-go/odb" ) var subscriptionsmutex sync.Mutex var subscriptions = map[string]odb.Subscription{} var receivedcount = map[string]int{} func subscribe(topics ...string) { subscriptionsmutex.Lock() defer subscriptionsmutex.Unlock() for _, topic := range topics { topic = strings.TrimSpace(topic) if subscriptions[topic] == nil { sub := ODBC.Subscribe(context.Background(), topic) go func(sub odb.Subscription) { st := time.Now() for m := range sub.Channel() { eds := m.EventData().String() subscriptionsmutex.Lock() receivedcount[eds]++ n := receivedcount[eds] subscriptionsmutex.Unlock() logger.Info(fmt.Sprint("received ", topic, " notify message: ", eds, " [", n, "] in ", time.Since(st))) oem, err := m.EventMessage() if err != nil { logger.Error("event message error: " + err.Error()) } else { logger.Infof("event message value: %v", oem) } } }(sub) subscriptions[topic] = sub logger.Info("subscribe topic " + topic + " done") } } } func unsubscribe(topics ...string) { subscriptionsmutex.Lock() defer subscriptionsmutex.Unlock() for _, topic := range topics { topic = strings.TrimSpace(topic) if sub := subscriptions[topic]; sub != nil { if err := sub.Unsubscribe(context.Background()); err != nil { logger.Error(err) } else { delete(subscriptions, topic) logger.Info("unsubscribe topic " + topic + " done") } } } }