// Package odbclient builds an odb client configuration from the process
// configuration and offers small helpers to create a client and to log the
// messages received on a topic.
package odbclient

import (
	"context"
	"encoding/json"
	"fmt"
	"runtime"
	"strings"

	ccfg "git.wecise.com/wecise/common/matrix/cfg"
	clog "git.wecise.com/wecise/common/matrix/logger"
	"git.wecise.com/wecise/odb-go/odb"
	"git.wecise.com/wecise/odb-go/odb/eventmsg"
	"github.com/scylladb/go-set/strset"
	"gopkg.in/yaml.v3"
)

// cfg is the shared configuration handle returned by ccfg.MConfig.
var cfg = ccfg.MConfig()

// log is the package logger, configured from the "log" key of cfg.
var log = clog.New().WithConfig(cfg, "log")

// init restricts the Go scheduler to a single OS thread for running Go code
// and logs that limit. Note that this is a process-wide side effect of
// importing the package.
func init() {
	log.Info("cpu limit", 1)
	runtime.GOMAXPROCS(1)
}

// odbcfg is the client configuration assembled once at package init time and
// shared by Config and NewClient.
var odbcfg *odb.Config

// init assembles odbcfg from the ODBPATH, KEYSPACE, POOLSIZE and N settings.
func init() {
	odbpath := cfg.GetStrings("ODBPATH", "127.0.0.1:11001,47.92.151.165:11001")
	keyspace := cfg.GetString("KEYSPACE", "oktest")
	poolsize := cfg.GetInt("POOLSIZE", 0)

	// Every ODBPATH entry may itself be a comma-separated list of hosts;
	// collecting the pieces in a set drops duplicates.
	odbpaths := strset.New()
	for _, p := range odbpath {
		odbpaths.Add(strings.Split(p, ",")...)
	}

	odbcfg = &odb.Config{
		Hosts:    odbpaths.List(),
		Keyspace: keyspace,
		User:     fmt.Sprint("测试客户端", cfg.GetString("N")),
		Pass:     "********",
		PoolSize: poolsize,
		Debug:    true,
	}
}

// Config returns the package-level odb configuration built during init.
// The returned pointer is the shared instance, not a copy.
func Config() *odb.Config {
	return odbcfg
}

// NewClient creates an odb client from the package-level configuration.
// On failure it returns a nil client together with the error reported by
// odb.NewClient.
func NewClient() (c odb.Client, err error) {
	client, err := odb.NewClient(odbcfg)
	if err != nil {
		return nil, err
	}
	return client, nil
}

// Subscribe subscribes client to topic and starts a goroutine that logs every
// received message together with a YAML rendering of its decoded event.
//
// The goroutine runs until the subscription channel is closed, at which point
// it unsubscribes. The subscription is created with context.Background(), so
// this function offers no way for the caller to cancel it.
func Subscribe(client odb.Client, topic string) {
	sub := client.Subscribe(context.Background(), topic)
	go func() {
		defer func() {
			if err := sub.Unsubscribe(context.Background()); err != nil {
				log.Error(err)
			}
		}()
		for m := range sub.Channel() {
			oem, err := eventmsg.FromMsgpack([]byte(m.Payload))
			if err != nil {
				// The event could not be decoded, so oem must not be used;
				// skip this message instead of dereferencing an invalid value.
				log.Error(err)
				continue
			}
			mm := map[string]interface{}{}
			if err := json.Unmarshal([]byte(oem.JsonString()), &mm); err != nil {
				// Best effort: report the problem and still log the payload
				// with whatever fields were decoded.
				log.Error(err)
			}
			bs, err := yaml.Marshal(map[string]interface{}{"oem": mm})
			if err != nil {
				// Best effort: the payload is still logged, without the YAML dump.
				log.Error(err)
			}
			log.Info(topic + " received " + m.Payload + "\n" + string(bs))
		}
	}()
}