client.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package odbclient
  2. import (
  3. "context"
  4. "fmt"
  5. "runtime"
  6. "strings"
  7. ccfg "git.wecise.com/wecise/common/matrix/cfg"
  8. clog "git.wecise.com/wecise/common/matrix/logger"
  9. "git.wecise.com/wecise/odb-go/odb"
  10. "github.com/scylladb/go-set/strset"
  11. )
  12. var cfg = ccfg.MConfig()
  13. var log = clog.New().WithConfig(cfg, "log")
  14. func init() {
  15. log.Info("cpu limit", 1)
  16. runtime.GOMAXPROCS(1)
  17. }
  18. var odbcfg *odb.Config
  19. func init() {
  20. odbpath := cfg.GetStrings("ODBPATH", "127.0.0.1:11001")
  21. keyspace := cfg.GetString("KEYSPACE", "oktest")
  22. poolsize := cfg.GetInt("POOLSIZE", 0)
  23. odbpaths := strset.New()
  24. for _, p := range odbpath {
  25. odbpaths.Add(strings.Split(p, ",")...)
  26. }
  27. odbcfg = &odb.Config{
  28. Hosts: odbpaths.List(),
  29. Keyspace: keyspace,
  30. User: fmt.Sprint("测试客户端", cfg.GetString("N")),
  31. Pass: "********",
  32. PoolSize: poolsize,
  33. Debug: true,
  34. }
  35. }
  36. func Config() *odb.Config {
  37. return odbcfg
  38. }
  39. func NewClient() (c odb.Client, err error) {
  40. client, err := odb.NewClient(odbcfg)
  41. if err != nil {
  42. return nil, err
  43. }
  44. return client, nil
  45. }
  46. func Subscribe(client odb.Client, topic string) {
  47. sub := client.Subscribe(context.Background(), topic)
  48. go func() {
  49. defer func() {
  50. if err := sub.Unsubscribe(context.Background()); err != nil {
  51. log.Error(err)
  52. return
  53. }
  54. }()
  55. for m := range sub.Channel() {
  56. log.Info(topic + " received " + m.Payload)
  57. }
  58. }()
  59. }