rebulk.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. /*
  2. Before you execute the program, Launch `cqlsh` and execute:
  3. create keyspace example with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
  4. create table example.tweet(timeline text, id UUID, text text, PRIMARY KEY(id));
  5. create index on example.tweet(timeline);
  6. */
  7. package main
  8. import (
  9. "fmt"
  10. "os"
  11. "strconv"
  12. "strings"
  13. "time"
  14. . "git.wecise.com/wecise/odbserver/odb"
  15. "git.wecise.com/wecise/odbserver/odb/test"
  16. "gitee.com/wecisecode/util/logger"
  17. )
  18. func main() {
  19. // connect to the cluster
  20. //cluster := gocql.NewCluster("192.168.40.14")
  21. keyspace := "icbctest"
  22. domain := "m3event"
  23. if len(os.Args) > 1 {
  24. keyspace = os.Args[1]
  25. }
  26. option := &Option{Cache: CacheAll, Keyspace: keyspace, DisableInitialHostLookup: true}
  27. g, err := test.NewG(option)
  28. if err != nil {
  29. logger.Error(err.Error())
  30. } else {
  31. defer g.Close()
  32. }
  33. //logger.SetRollingDaily("C:/test/zkcron/src/test", "test.log")
  34. logger.SetConsole(true)
  35. var table_name string
  36. if domain != keyspace {
  37. table_name = "object_" + domain
  38. } else {
  39. table_name = "object"
  40. }
  41. colcap := map[string]int{}
  42. if rows, err := g.RawQuery(`select column_name from system_schema.columns where keyspace_name=? and table_name=?`, keyspace, table_name); err != nil {
  43. logger.Error(err)
  44. return
  45. } else {
  46. for _, row := range rows {
  47. col := row["column_name"].(string)
  48. if strings.HasPrefix(col, "_") {
  49. continue
  50. }
  51. if idx := strings.LastIndex(col, "_"); idx == -1 {
  52. continue
  53. } else {
  54. prefix := col[:idx]
  55. n, _ := strconv.Atoi(col[idx+1:])
  56. if max, ok := colcap[prefix]; ok {
  57. if n > max {
  58. colcap[prefix] = n
  59. }
  60. } else {
  61. colcap[prefix] = n
  62. }
  63. }
  64. }
  65. }
  66. var collucene = map[string]string{
  67. "varchar": `{"type" : "string"}`,
  68. "text": `{"type" : "text"}`,
  69. "smallint": `{"type" : "integer"}`,
  70. "int": `{"type" : "integer"}`,
  71. "bigint": `{"type" : "bigint"}`,
  72. "double": `{"type" : "double"}`,
  73. "float": `{"type" : "float"}`,
  74. "boolean": `{"type" : "boolean"}`,
  75. "blob": `{"type" : "bytes"}`,
  76. "date": `{"type" : "date", "pattern" :"yyyy-MM-dd"}`,
  77. "timestamp": `{"type" : "date", "pattern" :"yyyy-MM-dd HH:mm:ss.SSS"}`,
  78. "set_varchar": `{"type" : "string"}`,
  79. "set_text": `{"type" : "text"}`,
  80. "set_double": `{"type" : "double"}`,
  81. "set_float": `{"type" : "float"}`,
  82. "set_int": `{"type" : "integer"}`,
  83. "list_varchar": `{"type" : "string"}`,
  84. "list_text": `{"type" : "text"}`,
  85. "list_double": `{"type" : "double"}`,
  86. "list_float": `{"type" : "float"}`,
  87. "list_int": `{"type" : "integer"}`,
  88. "map_varchar_text": `{"type" : "text"}`,
  89. "map_varchar_varchar": `{"type" : "string"}`,
  90. "map_varchar_float": `{"type" : "float"}`,
  91. "map_varchar_set": `{"type" : "string"}`,
  92. }
  93. var colstamp = map[string]time.Time{}
  94. now := time.Now()
  95. for colprefix := range colcap {
  96. colstamp[colprefix] = now
  97. }
  98. fmt.Println("update colbulk ...")
  99. logger.Warn(colcap)
  100. if _, err := g.RawQuery(`INSERT INTO colbulk (domain, cap, collucene, colstamp, version) VALUES(?, ?, ?, ?, ?)`, domain, colcap, collucene, colstamp, 1); err != nil {
  101. logger.Errorf(" error: %v", err)
  102. }
  103. }