subscribe.go 746 B

123456789101112131415161718192021222324252627282930313233
  1. package odbc
  2. import (
  3. "context"
  4. "git.wecise.com/wecise/odb-go/odb"
  5. "git.wecise.com/wecise/odb-go/odb/eventmsg"
  6. )
  7. func SubscribeTest(client odb.Client) {
  8. Subscribe(client, "CLASS_DATA_CHANGE:/")
  9. Subscribe(client, "META_DATA_CHANGE:/")
  10. }
  11. func Subscribe(client odb.Client, topic string) {
  12. sub := client.Subscribe(context.Background(), topic)
  13. go func() {
  14. defer func() {
  15. if err := sub.Unsubscribe(context.Background()); err != nil {
  16. log.Error(err)
  17. return
  18. }
  19. }()
  20. for m := range sub.Channel() {
  21. oem, err := eventmsg.FromMsgpack([]byte(m.Payload))
  22. if err != nil {
  23. log.Error(err)
  24. }
  25. string_message := oem.Data.String()
  26. log.Info(topic + " received " + topic + " message " + "\n" + string_message)
  27. }
  28. }()
  29. }