// This program copies promdb time-series rows, and optionally the promdb
// dictionary, from one Cassandra field/cluster to another. What to copy is
// described by a JSON configuration file (see SyncConfig).
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"os"
	"strings"
	"time"

	//"reflect"
	"git.wecise.com/wecise/common/cassandra"
	"git.wecise.com/wecise/odbserver/odb/cass/gocql"
	"git.wecise.com/wecise/odbserver/odb/promdb/pdstru"
	"git.wecise.com/wecise/odbserver/odb/util"
	"gitee.com/wecisecode/util/logger"
	"gitee.com/wecisecode/util/set/iset"
	"gitee.com/wecisecode/util/set/strset"
	"gitee.com/wecisecode/util/set/u64set"
	"github.com/dgryski/go-farm"
	/*. "git.wecise.com/wecise/odbserver/odb"
	"git.wecise.com/wecise/odbserver/odb/gerrs" */)

// Bit flags tested against Info.Option (the "option" column of the promdb
// table).
const (
	PROMDB_SINGLE  = 0x0001
	PROMDB_SHARE   = 0x0002
	PROMDB_SYNCDC  = 0x0004
	PROMDB_NOCACHE = 0x0008
	// feature
	PROMDB_EIGEN = 0x0100
	//PROMDB_PARTITION = 0x0200
)

// Credentials used for every Cassandra session opened by this tool.
//
// NOTE(review): these are hard-coded in source. They are kept unchanged so the
// tool behaves as before, but they should be moved to the environment or the
// configuration file (see GetEnv).
const (
	cassUser = "cassandra"
	cassPass = "9CxUjyNZ1QrztwigBeFjxA=="
)

// infoCQL looks up the storage description of a promdb field.
const infoCQL = `select tid, option, slot, dict from promdb where field= ? limit 1 ALLOW FILTERING`

// copyBatchSize is the number of object ids handed to copyPromdb at a time.
const copyBatchSize = 100

// errIncompleteConfig is returned when the configuration lacks an endpoint.
var errIncompleteConfig = errors.New(`config must define both "from" and "to"`)

// PromdbInfo identifies one side (source or target) of a sync.
type PromdbInfo struct {
	Cluster  []string `json:"cluster"`
	Keyspace string   `json:"keyspace"`
	DC       string   `json:"dc"`
	Field    string   `json:"field"`
}

// SyncConfig is the JSON configuration of a sync run, for example:
//
//	{
//	  "from": {"cluster": ["10.0.0.1"], "keyspace": "matrix", "dc": "dc1", "field": "prom"},
//	  "to":   {"cluster": ["10.0.0.2"], "keyspace": "matrix", "dc": "dc1", "field": "prom2"},
//	  "class": ["/some/class", "/some/tree/"],
//	  "date": ["2024-09-14"],
//	  "syncdict": true
//	}
//
// A class name ending in "/" also selects every class whose name starts with
// it. Dates use the layout 2006-01-02. An empty Class copies every id; an
// empty Date copies every partition found in the source.
type SyncConfig struct {
	From     *PromdbInfo `json:"from"`
	To       *PromdbInfo `json:"to"`
	Class    []string    `json:"class"`
	Date     []string    `json:"date"`
	SyncDict bool        `json:"syncdict,omitempty"`
}

// PromdbKeyspaceWithOption returns the keyspace that holds the promdb data
// tables. With PROMDB_SYNCDC set it is the keyspace itself; otherwise it is
// "<keyspace>_tsdb" for dc "dc1" and "<keyspace>_<dc>_tsdb" for any other dc.
func PromdbKeyspaceWithOption(option int, keyspace, dc string) string {
	if option&PROMDB_SYNCDC > 0 {
		return keyspace
	}
	if dc == "dc1" {
		return keyspace + "_tsdb"
	}
	return keyspace + "_" + dc + "_tsdb"
}

// DateToPartition converts a date in 2006-01-02 layout into a partition
// number (see MSToPartition).
func DateToPartition(dateString string) (int, error) {
	t, err := time.Parse("2006-01-02", dateString)
	if err != nil {
		return 0, err
	}
	return MSToPartition(t.UnixNano() / 1000000), nil
}

// MSToPartition converts a timestamp in milliseconds into a day-granular
// partition number.
func MSToPartition(msseconds int64) int {
	//----------------------
	// align with local date
	//----------------------
	// NOTE(review): presumably util.G_TIME_ZONE is the local UTC offset in
	// seconds - confirm against the util package.
	return int((msseconds + util.G_TIME_ZONE*1000) / (86400000))
}

// iddate resolves the id hashes and partitions to copy.
//
// Explicit ids are trimmed and hashed with farm.Hash64; explicit dates are
// converted with DateToPartition. Whichever of the two lists is empty is
// discovered instead by scanning the distinct partition keys of every source
// table (the base table plus one table per additional slot). When both lists
// are given no scan is performed.
//
// NOTE(review): for shared fields this scans "promdb_<tid>[_<slot>]" while
// copyIDPartition reads from "promdbs[_<slot>]"; the table names are kept
// exactly as they were - confirm which one is intended.
func iddate(from_info *Info, from_promdb_session *gocql.Session, ids []string, dates []string) ([]uint64, []int, error) {
	nids := u64set.New()
	for _, pid := range ids {
		nids.Add(farm.Hash64([]byte(strings.TrimSpace(pid))))
	}

	// Validate the dates before running any (potentially expensive) scan.
	partitions := iset.New()
	for _, sdate := range dates {
		partition, err := DateToPartition(strings.TrimSpace(sdate))
		if err != nil {
			return nil, nil, fmt.Errorf("date format error :%w .", err)
		}
		partitions.Add(partition)
	}

	forid, fordate := len(ids) == 0, len(dates) == 0
	if !forid && !fordate {
		return nids.List(), partitions.List(), nil
	}

	columns := "id, partition"
	if from_info.Option&PROMDB_SHARE > 0 {
		columns = "id, partition, tid"
	}
	base := fmt.Sprintf("%s.promdb_%d", from_info.Keyspace, uint64(from_info.Tid))
	tables := []string{base}
	for i := 1; i < from_info.Slot; i++ {
		tables = append(tables, fmt.Sprintf("%s_%d", base, i))
	}

	for _, table := range tables {
		cql := fmt.Sprintf(`select distinct(%s) as i_p from %s`, columns, table)
		rows, err := from_promdb_session.SliceMap(cql)
		if err != nil {
			return nil, nil, err
		}
		for _, row := range rows {
			if forid {
				if v, ok := row["i_p[0]"]; ok {
					id, ok := v.(int64)
					if !ok {
						return nil, nil, fmt.Errorf("%s: unexpected id type %T", table, v)
					}
					nids.Add(uint64(id))
				}
			}
			if fordate {
				if v, ok := row["i_p[1]"]; ok {
					partition, ok := v.(int)
					if !ok {
						return nil, nil, fmt.Errorf("%s: unexpected partition type %T", table, v)
					}
					partitions.Add(partition)
				}
			}
		}
	}
	return nids.List(), partitions.List(), nil
}

// promdbRow holds the columns of one source row that are copied to the target.
type promdbRow struct {
	data    []byte
	sdata   map[int64]string
	endtime float64
	start   float64 // the "time" column
	hour    int
	num     int
	eigen   int64
}

// decodeRow extracts the copied columns from a SliceMap row. The data, sdata,
// hour and num columns may be null (zero value is used); endtime and time are
// required. The eigen column is read only when withEigen is set. A value of an
// unexpected type yields an error instead of a panic.
func decodeRow(row map[string]interface{}, withEigen bool) (*promdbRow, error) {
	r := &promdbRow{}
	var ok bool
	if v := row["data"]; v != nil {
		if r.data, ok = v.([]byte); !ok {
			return nil, fmt.Errorf("column data: unexpected type %T", v)
		}
	}
	if v := row["sdata"]; v != nil {
		if r.sdata, ok = v.(map[int64]string); !ok {
			return nil, fmt.Errorf("column sdata: unexpected type %T", v)
		}
	}
	if r.endtime, ok = row["endtime"].(float64); !ok {
		return nil, fmt.Errorf("column endtime: unexpected type %T", row["endtime"])
	}
	if r.start, ok = row["time"].(float64); !ok {
		return nil, fmt.Errorf("column time: unexpected type %T", row["time"])
	}
	if v := row["hour"]; v != nil {
		if r.hour, ok = v.(int); !ok {
			return nil, fmt.Errorf("column hour: unexpected type %T", v)
		}
	}
	if v := row["num"]; v != nil {
		if r.num, ok = v.(int); !ok {
			return nil, fmt.Errorf("column num: unexpected type %T", v)
		}
	}
	if withEigen {
		if v := row["eigen"]; v != nil {
			if r.eigen, ok = v.(int64); !ok {
				return nil, fmt.Errorf("column eigen: unexpected type %T", v)
			}
		}
	}
	return r, nil
}

// slotIndex returns the slot an id is stored in: id modulo slot when the field
// uses more than one slot, 0 otherwise.
func slotIndex(id uint64, slot int) int {
	if slot > 1 {
		return int(id % uint64(slot))
	}
	return 0
}

// dataTable returns the qualified data table for a field and slot index:
// "<ks>.promdbs" for shared fields, "<ks>.promdb_<tid>" otherwise, with
// "_<tmod>" appended for slot indexes above 0.
func dataTable(info *Info, tmod int) string {
	name := fmt.Sprintf("%s.promdb_%d", info.Keyspace, uint64(info.Tid))
	if info.Option&PROMDB_SHARE > 0 {
		name = info.Keyspace + ".promdbs"
	}
	if tmod > 0 {
		name = fmt.Sprintf("%s_%d", name, tmod)
	}
	return name
}

// copyIDPartition copies every row of one (id, partition) pair from the source
// field to the target field.
//
// Shared tables are additionally keyed by tid: rows are read with the source
// tid and written with the target tid. When the target stores an eigen value
// and the source row has none (or a zero one), it is computed with
// pdstru.PromdbInsertStru.Eigen. The first failing read, decode or insert
// aborts the copy and is returned.
func copyIDPartition(from_info, to_info *Info, from_promdb_session, to_promdb_session *gocql.Session, id uint64, partition int) error {
	fromShare := from_info.Option&PROMDB_SHARE > 0
	toShare := to_info.Option&PROMDB_SHARE > 0
	fromEigen := from_info.Option&PROMDB_EIGEN > 0
	toEigen := to_info.Option&PROMDB_EIGEN > 0

	fromTable := dataTable(from_info, slotIndex(id, from_info.Slot))
	var (
		rows []map[string]interface{}
		err  error
	)
	if fromShare {
		cql := fmt.Sprintf(`select * from %s where tid=? and id=? and partition=?`, fromTable)
		rows, err = from_promdb_session.SliceMap(cql, from_info.Tid, id, partition)
	} else {
		cql := fmt.Sprintf(`select * from %s where id=? and partition=?`, fromTable)
		rows, err = from_promdb_session.SliceMap(cql, id, partition)
	}
	if err != nil {
		return err
	}

	columns := []string{"id", "partition", "data", "sdata", "endtime", "time", "hour", "num"}
	if toShare {
		columns = append([]string{"tid"}, columns...)
	}
	if toEigen {
		columns = append(columns, "eigen")
	}
	marks := strings.TrimSuffix(strings.Repeat("?, ", len(columns)), ", ")
	insert_cql := fmt.Sprintf(`insert into %s (%s) values (%s)`,
		dataTable(to_info, slotIndex(id, to_info.Slot)), strings.Join(columns, ", "), marks)

	for _, row := range rows {
		r, err := decodeRow(row, fromEigen)
		if err != nil {
			return fmt.Errorf("%s id %d partition %d: %w", fromTable, id, partition, err)
		}

		args := make([]interface{}, 0, len(columns))
		if toShare {
			// The row must be stored under the target field's tid, otherwise
			// the target field cannot see it.
			args = append(args, to_info.Tid)
		}
		args = append(args, id, partition, r.data, r.sdata, r.endtime, r.start, r.hour, r.num)
		if toEigen {
			eigen := r.eigen
			if eigen == 0 {
				pdistru := &pdstru.PromdbInsertStru{Sdata: r.sdata, Data: r.data}
				eigen = pdistru.Eigen()
			}
			args = append(args, eigen)
		}
		if err = to_promdb_session.Exec(insert_cql, args...); err != nil {
			return err
		}
	}
	return nil
}

// copyPromdb copies the given ids and dates from the source field to the
// target field. When ids or dates is empty the missing dimension is resolved
// by iddate and the full cross product of ids and partitions is copied.
func copyPromdb(from_info, to_info *Info, from_promdb_session, to_promdb_session *gocql.Session, ids, dates []string) (err error) {
	logger.Infof("sync from(tid): %v, to(tid): %v, id: %v, date: %v .", from_info.Tid, to_info.Tid, ids, dates)
	logger.Infof("from info: %v , to_info: %v", from_info, to_info)

	if len(ids) == 0 || len(dates) == 0 {
		nids, partitions, err := iddate(from_info, from_promdb_session, ids, dates)
		if err != nil {
			return err
		}
		for _, id := range nids {
			for _, partition := range partitions {
				logger.Infof("sync id:%d , partition:%d .", id, partition)
				if err := copyIDPartition(from_info, to_info, from_promdb_session, to_promdb_session, id, partition); err != nil {
					return err
				}
			}
		}
		return nil
	}

	for _, pid := range ids {
		id := farm.Hash64([]byte(strings.TrimSpace(pid)))
		for _, sdate := range dates {
			logger.Infof("sync id:%s , date:%s .", pid, sdate)
			partition, err := DateToPartition(strings.TrimSpace(sdate))
			if err != nil {
				return fmt.Errorf("date format error :%w .", err)
			}
			if err := copyIDPartition(from_info, to_info, from_promdb_session, to_promdb_session, id, partition); err != nil {
				return err
			}
		}
	}
	return nil
}

// Option holds the connection parameters of a Cassandra session.
type Option struct {
	Keyspace                 string
	Addrs                    []string
	User                     string
	Pass                     string
	DisableInitialHostLookup bool
}

// Client wraps an open Cassandra session.
type Client struct {
	Session *gocql.Session
}

// Info describes how a promdb field is stored, as read from the promdb table.
type Info struct {
	Keyspace string // keyspace holding the data tables (see PromdbKeyspaceWithOption)
	Option   int    // PROMDB_* bit flags
	Tid      int64
	Slot     int  // number of slot tables; values up to 1 mean a single table
	Dict     bool // "dict" column; selects the per-field dictionary table
}

// GetEnv returns the Cassandra addresses and credentials reported by
// cassandra.GetEnv.
func GetEnv() (addrs []string, user string, pass string, err error) {
	return cassandra.GetEnv()
}

// NewCassClient opens a Cassandra client for the given options.
func NewCassClient(opt *Option) (*Client, error) {
	c, e := cassandra.NewClientByOption(cassandra.Option{
		Keyspace:                 opt.Keyspace,
		Addrs:                    opt.Addrs, // defaults to the value from GetEnv()
		User:                     opt.User,  // defaults to the value from GetEnv()
		Pass:                     opt.Pass,  // defaults to the value from GetEnv()
		DCPolicy:                 "",        // defaults to the value from ETCD
		Consistency:              "",        // defaults to the value from ETCD
		DisableInitialHostLookup: opt.DisableInitialHostLookup})
	if e != nil {
		return nil, e
	}
	return &Client{
		Session: &gocql.Session{
			Session:       c.Session,
			ClusterConfig: c.Config,
			Keyspace:      opt.Keyspace,
		}}, nil
}

// NewCassSession opens a Cassandra client and returns its session.
func NewCassSession(cassOption *Option) (*gocql.Session, error) {
	client, err := NewCassClient(cassOption)
	if err != nil {
		return nil, err
	}
	return client.Session, nil
}

// classInfo returns the subclass list and namespace of a class. Null columns
// yield an empty list / empty string. It fails when the class does not exist.
func classInfo(from_session *gocql.Session, class string) ([]string, string, error) {
	cql := `select subclass, namespace from class where name= ?`
	rows, err := from_session.SliceMap(cql, class)
	if err != nil {
		return nil, "", err
	}
	if len(rows) == 0 {
		return nil, "", fmt.Errorf("class %s not exists .", class)
	}
	subclass := []string{}
	if sub, ok := rows[0]["subclass"].([]string); ok {
		subclass = sub
	}
	namespace, _ := rows[0]["namespace"].(string)
	return subclass, namespace, nil
}

// findSubClass returns the class named by the prefix (its trailing "/"
// removed) followed by every class whose name starts with the prefix. The
// prefix is expected to end in "/".
func findSubClass(from_session *gocql.Session, class string) ([]string, error) {
	cql := `select name from class`
	rows, err := from_session.SliceMap(cql)
	if err != nil {
		return nil, err
	}
	clazz := []string{strings.TrimSuffix(class, "/")}
	for _, row := range rows {
		cls, ok := row["name"].(string)
		if !ok {
			continue
		}
		if strings.HasPrefix(cls, class) {
			clazz = append(clazz, cls)
		}
	}
	return clazz, nil
}

// loadConfig reads and validates the JSON configuration file.
func loadConfig(path string) (*SyncConfig, error) {
	content, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("read config file : %s, error: %w", path, err)
	}
	conf := &SyncConfig{}
	if err := json.Unmarshal(content, conf); err != nil {
		return nil, fmt.Errorf("config error: %w", err)
	}
	if conf.From == nil || conf.To == nil {
		return nil, errIncompleteConfig
	}
	logger.Infof("From: %v", conf.From)
	logger.Infof("To: %v", conf.To)
	logger.Infof("Class: %v", conf.Class)
	logger.Infof("Date: %v", conf.Date)
	return conf, nil
}

// openSession opens a session on the given cluster and keyspace using the
// tool's built-in credentials.
func openSession(addrs []string, keyspace string) (*gocql.Session, error) {
	return NewCassSession(&Option{
		Addrs:                    addrs,
		Keyspace:                 keyspace,
		User:                     cassUser,
		Pass:                     cassPass,
		DisableInitialHostLookup: true,
	})
}

// loadInfo reads the storage description of endpoint.Field from the promdb
// table reachable through session.
func loadInfo(session *gocql.Session, endpoint *PromdbInfo) (*Info, error) {
	rows, err := session.SliceMap(infoCQL, endpoint.Field)
	if err != nil {
		return nil, err
	}
	if len(rows) == 0 {
		return nil, fmt.Errorf("field %s not exist .", endpoint.Field)
	}
	row := rows[0]
	option, ok := row["option"].(int)
	if !ok {
		return nil, fmt.Errorf("field %s: unexpected option type %T", endpoint.Field, row["option"])
	}
	tid, ok := row["tid"].(int64)
	if !ok {
		return nil, fmt.Errorf("field %s: unexpected tid type %T", endpoint.Field, row["tid"])
	}
	slot, ok := row["slot"].(int)
	if !ok {
		return nil, fmt.Errorf("field %s: unexpected slot type %T", endpoint.Field, row["slot"])
	}
	// A missing or null dict column means the field has no dictionary of its own.
	dict, _ := row["dict"].(bool)
	return &Info{
		Keyspace: PromdbKeyspaceWithOption(option, endpoint.Keyspace, endpoint.DC),
		Option:   option,
		Tid:      tid,
		Slot:     slot,
		Dict:     dict,
	}, nil
}

// syncClass copies the promdb data of every object of one configured class.
//
// A class ending in "/" is expanded with findSubClass; every class name that
// is selected is reported through remember. Classes that cannot be resolved
// are logged and skipped (nil is returned); a failing copy is returned.
func syncClass(conf *SyncConfig, class string, fromInfo, toInfo *Info, fromSession, fromPromdb, toPromdb *gocql.Session, remember func(string)) error {
	normal := true
	if strings.HasSuffix(class, "/") {
		normal = false
		names, err := findSubClass(fromSession, class)
		if err != nil {
			logger.Warnf("%v", err)
			return nil
		}
		for _, name := range names {
			remember(name)
		}
		class = strings.TrimSuffix(class, "/")
	} else {
		remember(class)
	}

	// class exists ?
	subclass, namespace, err := classInfo(fromSession, class)
	if err != nil {
		logger.Warnf("%v", err)
		return nil
	}
	if len(subclass) > 0 {
		normal = false
	}

	table := "object"
	if namespace != "" {
		table = "object_" + namespace
	}
	var cql, param string
	if normal {
		cql = fmt.Sprintf("SELECT id FROM %s WHERE class = ?", table)
		param = class
	} else {
		// Match the class itself and everything below it through the index
		// named "<table>_lucene".
		cql = fmt.Sprintf("SELECT id FROM %s WHERE expr(%s, ?)", table, table+"_lucene")
		param = fmt.Sprintf(`{ filter: { type: "boolean", should: [ {type: "match", field: "class", value: "%s"}, {type: "prefix", field: "class", value: "%s/"} ] } }`, class, class)
	}

	iter := fromSession.Query(cql, param).PageSize(1000).Iter()
	// Closed when this class is done rather than when the program exits.
	// NOTE(review): if Close reports iteration errors in this wrapper, its
	// result should be checked so a failed scan is not mistaken for the end of
	// the data.
	defer iter.Close()

	var id string
	ids := make([]string, 0, copyBatchSize)
	for iter.Scan(&id) {
		ids = append(ids, id)
		if len(ids) >= copyBatchSize {
			if err := copyPromdb(fromInfo, toInfo, fromPromdb, toPromdb, ids, conf.Date); err != nil {
				return err
			}
			ids = ids[:0]
		}
	}
	// Never call copyPromdb with an empty id list here: that means "all ids".
	if len(ids) > 0 {
		return copyPromdb(fromInfo, toInfo, fromPromdb, toPromdb, ids, conf.Date)
	}
	return nil
}

// syncDict copies the dictionary entries of the source field to the target
// field, restricted to the classes accepted by wanted.
//
// Each side uses its per-field table "promdb_dict_<field>" when its Info.Dict
// is set and the common "promdb_dict" table otherwise. Failed inserts are
// logged and skipped.
func syncDict(conf *SyncConfig, fromInfo, toInfo *Info, fromSession, toSession *gocql.Session, wanted func(string) bool) {
	logger.Info("sync promdb dict .")

	fromTable := "promdb_dict"
	if fromInfo.Dict {
		fromTable = "promdb_dict_" + conf.From.Field
	}
	toTable := "promdb_dict"
	if toInfo.Dict {
		toTable = "promdb_dict_" + conf.To.Field
	}
	// The field value is bound as a parameter; only the table name has to be
	// formatted into the statement.
	selectCQL := fmt.Sprintf(`select name, class, ikey, labels from %s where field=? allow filtering`, fromTable)
	insertCQL := fmt.Sprintf(`insert into %s (field, name, class, ikey, labels) values (?, ?, ?, ?, ?)`, toTable)

	iter := fromSession.Query(selectCQL, conf.From.Field).PageSize(1000).Iter()
	defer iter.Close()

	var (
		name, class string
		ikey        int64
		labels      map[string]string
	)
	count := 0
	for iter.Scan(&name, &class, &ikey, &labels) {
		if !wanted(class) {
			continue
		}
		count++
		if count%1000 == 0 {
			logger.Infof("%d", count)
		}
		if err := toSession.Exec(insertCQL, conf.To.Field, name, class, ikey, labels); err != nil {
			logger.Warn(err)
		}
	}
	logger.Infof("total : %d", count)
}

// run performs one sync as described by the configuration file.
func run(configFile string) error {
	conf, err := loadConfig(configFile)
	if err != nil {
		return err
	}

	fromSession, err := openSession(conf.From.Cluster, conf.From.Keyspace)
	if err != nil {
		return err
	}
	defer fromSession.Close()

	toSession, err := openSession(conf.To.Cluster, conf.To.Keyspace)
	if err != nil {
		return err
	}
	defer toSession.Close()

	fromInfo, err := loadInfo(fromSession, conf.From)
	if err != nil {
		return err
	}
	toInfo, err := loadInfo(toSession, conf.To)
	if err != nil {
		return err
	}

	// The data tables may live in a different keyspace than the metadata, so
	// they get sessions of their own.
	fromPromdb, err := openSession(conf.From.Cluster, fromInfo.Keyspace)
	if err != nil {
		return err
	}
	defer fromPromdb.Close()

	toPromdb, err := openSession(conf.To.Cluster, toInfo.Keyspace)
	if err != nil {
		return err
	}
	defer toPromdb.Close()

	clazz := strset.New()
	if len(conf.Class) > 0 {
		remember := func(name string) { clazz.Add(name) }
		for _, class := range conf.Class {
			if err := syncClass(conf, class, fromInfo, toInfo, fromSession, fromPromdb, toPromdb, remember); err != nil {
				return err
			}
		}
	} else if err := copyPromdb(fromInfo, toInfo, fromPromdb, toPromdb, []string{}, conf.Date); err != nil {
		return err
	}

	if conf.SyncDict {
		wanted := func(class string) bool { return clazz.Size() == 0 || clazz.Has(class) }
		syncDict(conf, fromInfo, toInfo, fromSession, toSession, wanted)
	}
	return nil
}

// main runs a sync using the configuration file named by the first command
// line argument ("config.json" by default) and exits with status 1 on failure.
func main() {
	// Enable console output first so configuration errors are visible too.
	logger.SetConsole(true)

	configFile := "config.json"
	if len(os.Args) >= 2 {
		configFile = os.Args[1]
	}
	if err := run(configFile); err != nil {
		logger.Error(err.Error())
		os.Exit(1)
	}
}