| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- package main
- import (
- "fmt"
- "os"
- "time"
- . "git.wecise.com/wecise/odbserver/odb"
- "git.wecise.com/wecise/odbserver/odb/cass"
- "git.wecise.com/wecise/odbserver/odb/cass/gocql"
- "git.wecise.com/wecise/odbserver/odb/gerrs"
- "gitee.com/wecisecode/util/logger"
- )
- func main() {
- field := os.Args[1]
- date := os.Args[2]
- keyspace := "matrix"
- if len(os.Args) > 3 {
- keyspace = os.Args[3]
- }
- logger.SetConsole(true)
- //option := &StoreOption{Cache:CacheAll}
- session, err := cass.NewCassSession(cass.Option{Keyspace: keyspace, DisableInitialHostLookup: true})
- if err != nil {
- logger.Error(err.Error())
- } else {
- defer session.Close()
- }
- for i := 0; i < 1000; i++ {
- //fmt.Print(field, date, keyspace)
- if has_data, err := delete_bucket(session, field, date, keyspace); err != nil {
- fmt.Printf("err ==> %v \n", err)
- return
- } else if !has_data {
- fmt.Printf("run %d delete ok, finish .\n", i)
- return
- }
- time.Sleep(10 * time.Second)
- fmt.Printf("run %d delete ok .\n", i+1)
- }
- fmt.Println("run 1000 delete ok, finish .")
- }
- func sessionQuery(session *gocql.Session, cql string, values ...interface{}) ([]map[string]interface{}, error) {
- cqlQuery := session.Query(cql, values...)
- if cqlQuery == nil {
- return nil, gerrs.ErrExec.New(" can't connect to cassandra ")
- }
- iter := cqlQuery.Iter()
- defer iter.Close()
- if data, err := iter.SliceMap(); err != nil {
- return nil, err
- } else {
- return data, nil
- }
- }
- func delete_bucket(session *gocql.Session, field, date, keyspace string) (has_data bool, err error) {
- has_data = false
- var pp int
- if pp, err = DateToPartition(date); err != nil {
- return has_data, gerrs.ExecErr("gdelete_bucket_index", fmt.Sprintf("date format error :%v .", err))
- }
- var rows []map[string]interface{}
- cql := `select tid from promdb where field=? limit 1 ALLOW FILTERING`
- rows, err = sessionQuery(session, cql, field)
- if err == nil && len(rows) > 0 {
- tid := rows[0]["tid"].(int64)
- sql := fmt.Sprintf(`select distinct id, partition from %s.promdb_%d limit 50000`, keyspace+"_tsdb", uint64(tid))
- iter := session.Query(sql).PageSize(TSDB_PAGE_SIZE).Iter()
- defer iter.Close()
- var id int64
- var partition int
- del_set := map[int64][]int{}
- cql := fmt.Sprintf(`delete from %s.promdb_%d where id=? and partition in ?`, keyspace+"_tsdb", uint64(tid))
- for iter.Scan(&id, &partition) {
- if partition <= pp {
- if ss, ok := del_set[id]; ok {
- del_set[id] = append(ss, partition)
- } else {
- del_set[id] = []int{partition}
- }
- }
- }
- //logger.Warn(cql, del_set)
- for id, partitions := range del_set {
- if err := session.Exec(cql, id, partitions); err != nil {
- return has_data, gerrs.ExecErr("gdelete_bucket_index", fmt.Sprintf("delete id:%v, partitions: %v error :%v .", id, partitions, err))
- }
- }
- if len(del_set) > 0 {
- has_data = true
- }
- }
- return has_data, nil
- }
|