exportcql.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package main
  2. import (
  3. "fmt"
  4. "os"
  5. "strings"
  6. "time"
  7. . "git.wecise.com/wecise/odbserver/odb"
  8. "git.wecise.com/wecise/odbserver/odb/cass"
  9. "gitee.com/wecisecode/util/logger"
  10. )
  11. func main() {
  12. // connect to the cluster
  13. //cluster := gocql.NewCluster("192.168.40.14")
  14. date := time.Now().AddDate(0, -3, 0).Format("2006-01-02")
  15. if len(os.Args) > 1 {
  16. date = os.Args[1]
  17. }
  18. keyspace := "matrix"
  19. if len(os.Args) > 2 {
  20. keyspace = os.Args[2]
  21. }
  22. table := "vobject"
  23. if len(os.Args) > 3 {
  24. table = os.Args[3]
  25. }
  26. fmt.Printf("keyspace: %s , table: %s, date: %s\n", keyspace, table, date)
  27. var pp int
  28. var err error
  29. if pp, err = DateToPartition(date); err != nil {
  30. fmt.Printf("date format error :%v .", err)
  31. os.Exit(1)
  32. }
  33. if table == "vobject" || strings.HasPrefix(table, "vobject_") {
  34. var cassOption cass.Option
  35. cassOption.Keyspace = keyspace
  36. cassOption.DisableInitialHostLookup = true
  37. if session, err := cass.NewCassSession(cassOption); err != nil {
  38. logger.Error(err)
  39. } else {
  40. fmt.Printf("create session successfully .\n")
  41. sql := fmt.Sprintf(`select distinct id, day from %s.%s limit 50000`, keyspace, table)
  42. iter := session.Query(sql).PageSize(200).Iter()
  43. defer iter.Close()
  44. var id, day string
  45. del_set := map[string][]string{}
  46. record := 0
  47. for iter.Scan(&id, &day) {
  48. if partition, err := DateToPartition(day); err != nil {
  49. fmt.Printf("date format error :%v .", err)
  50. os.Exit(1)
  51. } else {
  52. record++
  53. if record%200 == 0 {
  54. fmt.Printf("current record:%d .\n", record)
  55. }
  56. if partition <= pp {
  57. if ss, ok := del_set[id]; ok {
  58. del_set[id] = append(ss, day)
  59. } else {
  60. del_set[id] = []string{day}
  61. }
  62. }
  63. }
  64. }
  65. fmt.Printf("select total %d.\n", len(del_set))
  66. if f, err := os.Create(fmt.Sprintf("%s_del.cql", table)); err != nil {
  67. fmt.Println("create file fail")
  68. os.Exit(1)
  69. } else {
  70. defer f.Close()
  71. counter := 0
  72. for id, days := range del_set {
  73. counter++
  74. cql := fmt.Sprintf("delete from %s.%s where id='%s' and day in (%s) ;\n", keyspace, table, id, `'`+strings.Join(days, `','`)+`'`)
  75. f.WriteString(cql)
  76. f.Sync()
  77. if counter%500 == 0 {
  78. fmt.Printf("write total :%d .\n", counter)
  79. }
  80. }
  81. }
  82. }
  83. } else {
  84. fmt.Printf("unsupport table %s .", table)
  85. os.Exit(1)
  86. }
  87. }