delbucket.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package main
  2. import (
  3. "fmt"
  4. "os"
  5. "time"
  6. . "git.wecise.com/wecise/odbserver/odb"
  7. "git.wecise.com/wecise/odbserver/odb/cass"
  8. "git.wecise.com/wecise/odbserver/odb/cass/gocql"
  9. "git.wecise.com/wecise/odbserver/odb/gerrs"
  10. "gitee.com/wecisecode/util/logger"
  11. )
  12. func main() {
  13. field := os.Args[1]
  14. date := os.Args[2]
  15. keyspace := "matrix"
  16. if len(os.Args) > 3 {
  17. keyspace = os.Args[3]
  18. }
  19. logger.SetConsole(true)
  20. //option := &StoreOption{Cache:CacheAll}
  21. session, err := cass.NewCassSession(cass.Option{Keyspace: keyspace, DisableInitialHostLookup: true})
  22. if err != nil {
  23. logger.Error(err.Error())
  24. } else {
  25. defer session.Close()
  26. }
  27. for i := 0; i < 1000; i++ {
  28. //fmt.Print(field, date, keyspace)
  29. if has_data, err := delete_bucket(session, field, date, keyspace); err != nil {
  30. fmt.Printf("err ==> %v \n", err)
  31. return
  32. } else if !has_data {
  33. fmt.Printf("run %d delete ok, finish .\n", i)
  34. return
  35. }
  36. time.Sleep(10 * time.Second)
  37. fmt.Printf("run %d delete ok .\n", i+1)
  38. }
  39. fmt.Println("run 1000 delete ok, finish .")
  40. }
  41. func sessionQuery(session *gocql.Session, cql string, values ...interface{}) ([]map[string]interface{}, error) {
  42. cqlQuery := session.Query(cql, values...)
  43. if cqlQuery == nil {
  44. return nil, gerrs.ErrExec.New(" can't connect to cassandra ")
  45. }
  46. iter := cqlQuery.Iter()
  47. defer iter.Close()
  48. if data, err := iter.SliceMap(); err != nil {
  49. return nil, err
  50. } else {
  51. return data, nil
  52. }
  53. }
  54. func delete_bucket(session *gocql.Session, field, date, keyspace string) (has_data bool, err error) {
  55. has_data = false
  56. var pp int
  57. if pp, err = DateToPartition(date); err != nil {
  58. return has_data, gerrs.ExecErr("gdelete_bucket_index", fmt.Sprintf("date format error :%v .", err))
  59. }
  60. var rows []map[string]interface{}
  61. cql := `select tid from promdb where field=? limit 1 ALLOW FILTERING`
  62. rows, err = sessionQuery(session, cql, field)
  63. if err == nil && len(rows) > 0 {
  64. tid := rows[0]["tid"].(int64)
  65. sql := fmt.Sprintf(`select distinct id, partition from %s.promdb_%d limit 50000`, keyspace+"_tsdb", uint64(tid))
  66. iter := session.Query(sql).PageSize(TSDB_PAGE_SIZE).Iter()
  67. defer iter.Close()
  68. var id int64
  69. var partition int
  70. del_set := map[int64][]int{}
  71. cql := fmt.Sprintf(`delete from %s.promdb_%d where id=? and partition in ?`, keyspace+"_tsdb", uint64(tid))
  72. for iter.Scan(&id, &partition) {
  73. if partition <= pp {
  74. if ss, ok := del_set[id]; ok {
  75. del_set[id] = append(ss, partition)
  76. } else {
  77. del_set[id] = []int{partition}
  78. }
  79. }
  80. }
  81. //logger.Warn(cql, del_set)
  82. for id, partitions := range del_set {
  83. if err := session.Exec(cql, id, partitions); err != nil {
  84. return has_data, gerrs.ExecErr("gdelete_bucket_index", fmt.Sprintf("delete id:%v, partitions: %v error :%v .", id, partitions, err))
  85. }
  86. }
  87. if len(del_set) > 0 {
  88. has_data = true
  89. }
  90. }
  91. return has_data, nil
  92. }