client.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package odbclient
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "runtime"
  7. "strings"
  8. ccfg "git.wecise.com/wecise/common/matrix/cfg"
  9. clog "git.wecise.com/wecise/common/matrix/logger"
  10. "git.wecise.com/wecise/odb-go/odb"
  11. "git.wecise.com/wecise/odb-go/odb/eventmsg"
  12. "github.com/scylladb/go-set/strset"
  13. "gopkg.in/yaml.v3"
  14. )
  15. var cfg = ccfg.MConfig()
  16. var log = clog.New().WithConfig(cfg, "log")
  17. func init() {
  18. log.Info("cpu limit", 1)
  19. runtime.GOMAXPROCS(1)
  20. }
  21. var odbcfg *odb.Config
  22. func init() {
  23. odbpath := cfg.GetStrings("ODBPATH", "127.0.0.1:11001,47.92.151.165:11001")
  24. keyspace := cfg.GetString("KEYSPACE", "oktest")
  25. poolsize := cfg.GetInt("POOLSIZE", 0)
  26. odbpaths := strset.New()
  27. for _, p := range odbpath {
  28. odbpaths.Add(strings.Split(p, ",")...)
  29. }
  30. odbcfg = &odb.Config{
  31. Hosts: odbpaths.List(),
  32. Keyspace: keyspace,
  33. User: fmt.Sprint("测试客户端", cfg.GetString("N")),
  34. Pass: "********",
  35. PoolSize: poolsize,
  36. Debug: true,
  37. }
  38. }
  39. func Config() *odb.Config {
  40. return odbcfg
  41. }
  42. func NewClient() (c odb.Client, err error) {
  43. client, err := odb.NewClient(odbcfg)
  44. if err != nil {
  45. return nil, err
  46. }
  47. return client, nil
  48. }
  49. func Subscribe(client odb.Client, topic string) {
  50. sub := client.Subscribe(context.Background(), topic)
  51. go func() {
  52. defer func() {
  53. if err := sub.Unsubscribe(context.Background()); err != nil {
  54. log.Error(err)
  55. return
  56. }
  57. }()
  58. for m := range sub.Channel() {
  59. oem, err := eventmsg.FromMsgpack([]byte(m.Payload))
  60. if err != nil {
  61. log.Error(err)
  62. }
  63. mm := map[string]interface{}{}
  64. json.Unmarshal([]byte(oem.JsonString()), &mm)
  65. bs, _ := yaml.Marshal(map[string]interface{}{"oem": mm})
  66. log.Info(topic + " received " + m.Payload + "\n" + string(bs))
  67. }
  68. }()
  69. }