// Command-line tool that repeatedly deletes old partitions from the promdb
// time-series table belonging to a given field.
package main

import (
	"fmt"
	"os"
	"time"

	. "git.wecise.com/wecise/odbserver/odb"
	"git.wecise.com/wecise/odbserver/odb/cass"
	"git.wecise.com/wecise/odbserver/odb/cass/gocql"
	"git.wecise.com/wecise/odbserver/odb/gerrs"
	"gitee.com/wecisecode/util/logger"
)

// main parses the command line and runs up to 1000 delete rounds.
//
// Usage: <program> <field> <date> [keyspace]
//
// keyspace defaults to "matrix". The loop stops as soon as a round reports an
// error or finds nothing left to delete; otherwise it sleeps 10 seconds
// between rounds.
func main() {
	// Both positional arguments are mandatory; indexing os.Args without this
	// check would panic with an index-out-of-range error.
	if len(os.Args) < 3 {
		fmt.Fprintln(os.Stderr, "usage: <program> <field> <date> [keyspace]")
		os.Exit(1)
	}
	field := os.Args[1]
	date := os.Args[2]
	keyspace := "matrix"
	if len(os.Args) > 3 {
		keyspace = os.Args[3]
	}

	logger.SetConsole(true)

	session, err := cass.NewCassSession(cass.Option{Keyspace: keyspace, DisableInitialHostLookup: true})
	if err != nil {
		// Without a session there is nothing to run the queries on, so stop
		// here instead of entering the loop.
		logger.Error(err.Error())
		return
	}
	defer session.Close()

	for i := 0; i < 1000; i++ {
		has_data, err := delete_bucket(session, field, date, keyspace)
		if err != nil {
			fmt.Printf("err ==> %v \n", err)
			return
		}
		if !has_data {
			fmt.Printf("run %d delete ok, finish .\n", i)
			return
		}
		time.Sleep(10 * time.Second)
		fmt.Printf("run %d delete ok .\n", i+1)
	}
	fmt.Println("run 1000 delete ok, finish .")
}

// sessionQuery runs cql with the given bind values on session and returns
// every result row as a column-name -> value map.
//
// It returns an error if the session yields no query object or if reading
// the rows fails.
func sessionQuery(session *gocql.Session, cql string, values ...interface{}) ([]map[string]interface{}, error) {
	cqlQuery := session.Query(cql, values...)
	if cqlQuery == nil {
		return nil, gerrs.ErrExec.New(" can't connect to cassandra ")
	}
	iter := cqlQuery.Iter()
	defer iter.Close()
	return iter.SliceMap()
}

// delete_bucket performs one delete round for field.
//
// It converts date to a partition number with DateToPartition, looks up the
// tid of field in promdb, reads up to 50000 distinct (id, partition) pairs
// from <keyspace>_tsdb.promdb_<tid>, and deletes every pair whose partition
// is less than or equal to the computed partition number.
//
// has_data is true when at least one id had partitions deleted in this
// round, which tells the caller that another round may be needed. It is
// false when field has no tid row or when no scanned partition qualified.
// On any error has_data is false.
func delete_bucket(session *gocql.Session, field, date, keyspace string) (has_data bool, err error) {
	var pp int
	if pp, err = DateToPartition(date); err != nil {
		return false, gerrs.ExecErr("gdelete_bucket_index", fmt.Sprintf("date format error :%v .", err))
	}

	// Written as an interpreted string so the embedded line break (and the
	// space before it) stays exactly as in the statement sent so far.
	tidCQL := "select tid from promdb where field=? \nlimit 1 ALLOW FILTERING"
	rows, err := sessionQuery(session, tidCQL, field)
	if err != nil {
		// A failed lookup must not be reported as "nothing left to delete".
		return false, fmt.Errorf("query tid of field %q: %w", field, err)
	}
	if len(rows) == 0 {
		return false, nil
	}
	tid, ok := rows[0]["tid"].(int64)
	if !ok {
		return false, fmt.Errorf("unexpected tid type %T for field %q", rows[0]["tid"], field)
	}

	table := keyspace + "_tsdb"
	selectCQL := fmt.Sprintf(`select distinct id, partition from %s.promdb_%d limit 50000`, table, uint64(tid))
	deleteCQL := fmt.Sprintf(`delete from %s.promdb_%d where id=? and partition in ?`, table, uint64(tid))

	// Collect the partitions to delete, grouped by id, so that each id needs
	// a single delete statement.
	del_set := map[int64][]int{}
	iter := session.Query(selectCQL).PageSize(TSDB_PAGE_SIZE).Iter()
	var (
		id        int64
		partition int
	)
	for iter.Scan(&id, &partition) {
		if partition <= pp {
			del_set[id] = append(del_set[id], partition)
		}
	}
	// Scan only reports false on failure; the underlying error surfaces on
	// Close, so check it before acting on a possibly incomplete result.
	if err := iter.Close(); err != nil {
		return false, fmt.Errorf("scan %s.promdb_%d: %w", table, uint64(tid), err)
	}

	for id, partitions := range del_set {
		if err := session.Exec(deleteCQL, id, partitions); err != nil {
			return false, gerrs.ExecErr("gdelete_bucket_index", fmt.Sprintf("delete id:%v, partitions: %v error :%v .", id, partitions, err))
		}
	}
	return len(del_set) > 0, nil
}