package main import ( "flag" "fmt" "os" "runtime/pprof" "strconv" "strings" "sync" "time" "encoding/json" "git.wecise.com/wecise/common/etcd/env" "git.wecise.com/wecise/common/lib/gocql/gocql" "git.wecise.com/wecise/odb-go/odb" "gitee.com/wecisecode/util/logger" "github.com/cheggaaa/pb" ) var ( hosts = flag.String("h", "", "Cluster hosts.") replicationFactor = flag.String("r", "dc1:3", "Replication factor, format => dc1:3,dc2:3") keyspace = flag.String("k", "matrix", "Test keyspace.") columnNums = flag.Int("n", 10, "Column nums.") tableColumnNums = flag.Int("tn", 0, "Column nums.") hasIndex = flag.Bool("i", false, "Create index.") poolsize = flag.Int("p", 100, "Exec pool size.") statInterval = flag.Int("s", 1, "Stat interval.") dataSize = flag.Int("d", 1000000, "Test data size.") colname = flag.String("c", "column", "Column base name.") ispprof = flag.Bool("f", false, "Output pprof.") debug = flag.Bool("debug", false, "Log debug.") //isOrigin = flag.Bool("o", false, "Origin api insert.") isMemory = flag.Bool("m", false, "Memory cache mode.") class = flag.String("a", "/tobject", "Test class name.") beginNumber = flag.Int("b", 0, "Number of begin value.") auth = flag.Bool("auth", false, "Use auth.") user = flag.String("user", "cassandra", "Username.") pass = flag.String("pass", "cassandra", "Password.") isLargePartion = flag.Bool("lp", false, "Large-partion data.") ttl = flag.Int("t", -1, "Test data ttl.") isJson = flag.Bool("j", false, "Json mode.") dataWokerPool = 500 wgchan = new(sync.WaitGroup) wgdata = new(sync.WaitGroup) ) type object struct { mql string values []interface{} } func main() { flag.Parse() logger.SetConsole(*debug) etcdenv := os.Getenv("ETCDPATH") if etcdenv == "" { fmt.Println("No ETCDPATH.") os.Exit(1) } if *tableColumnNums == 0 { *tableColumnNums = *columnNums } if *tableColumnNums < *columnNums { fmt.Println("tn must be greater than or equal to n") } session := getSession() defer session.Close() fmt.Println("Cassandra connected.") db := getDB() defer func() { _ = db.Close() }() fmt.Println("Odbserver connected.") prepareTest(session) fmt.Println("Prepare the test class data to complete.") // start stat sch := make(chan int, *poolsize) // start worker wch := make(chan *object, *poolsize) for i := 0; i < *poolsize; i++ { wgchan.Add(1) go worker(db, wch, sch) } // create data fmt.Println("Creating data...") datas := make([]*object, 0) remainingNums := *dataSize % dataWokerPool perWorkNums := (*dataSize - (*dataSize % dataWokerPool)) / dataWokerPool dwkChan := make(chan *[]*object, 1) if perWorkNums != 0 { for i := 0; i < dataWokerPool; i++ { n := i * perWorkNums go dataWorker(n+*beginNumber, perWorkNums, dwkChan) } } if remainingNums != 0 { var n int if perWorkNums != 0 { n = dataWokerPool*perWorkNums + 1 } go dataWorker(n+*beginNumber, remainingNums, dwkChan) } barCount := dataWokerPool if remainingNums != 0 { barCount += 1 } bar := pb.New(barCount).SetWidth(50) bar.ShowCounters = false bar.Start() for d := range dwkChan { datas = append(datas, *d...) bar.Increment() //time.Sleep(10*time.Millisecond) if len(datas) == *dataSize { break } } close(dwkChan) bar.Finish() fmt.Printf("Create %d data to complete.\n", len(datas)) wgchan.Add(1) go stat(sch) // pprof if *ispprof { pf, err := os.Create("odbtest_cpu.pprof") if err != nil { fmt.Println(err) os.Exit(1) } defer func() { _ = pf.Close() }() _ = pprof.StartCPUProfile(pf) defer pprof.StopCPUProfile() pfMem, err := os.Create("odbtest_mem.pprof") if err != nil { fmt.Println(err) os.Exit(1) } defer func() { _ = pfMem.Close() }() _ = pprof.WriteHeapProfile(pfMem) //tf, err := os.Create("odbtest.trace") //if err != nil { // fmt.Println(err) // os.Exit(1) //} //trace.Start(tf) //defer trace.Stop() } fmt.Println("Start to insert data.") // send data for _, d := range datas { wgdata.Add(1) wch <- d } wgdata.Wait() close(wch) close(sch) wgchan.Wait() fmt.Println("Test finished.") } func getDB() odb.Client { var ( hosts []string err error ) s := os.Getenv("ODBPATH") if s == "" { hosts, err = env.GetVars("ODBPATH") if err != nil { fmt.Println("New db error:", err.Error()) os.Exit(1) } } else { hosts = strings.Split(s, ",") for i := range hosts { hosts[i] = strings.TrimSpace(hosts[i]) } } root_keyspace := *keyspace if idx := strings.Index(root_keyspace, "_"); idx != -1 { root_keyspace = root_keyspace[:idx] } db, err := odb.NewClient(&odb.Config{ Keyspace: root_keyspace, Hosts: hosts, PoolSize: 1000, Debug: true, }) if err != nil { fmt.Println("New db error:", err.Error()) os.Exit(1) } //db, err := modb.New(*keyspace) //if err != nil { // fmt.Println("New db error:", err.Error()) // os.Exit(1) //} return db } func prepareTest(session *gocql.Session) { var err error /*err := session.Query("DROP KEYSPACE IF EXISTS " + *keyspace).Exec() checkError(err) fmt.Println("Drop keyspace wait 5s.") time.Sleep(5*time.Second) dcs := strings.Split(*replicationFactor, ",") strr := []string{} for _, dc := range dcs { dc_r := strings.Split(dc, ":") if len(dc_r) != 2 { fmt.Println("dc replicationFactor format err", *replicationFactor) os.Exit(1) } strr = append(strr, fmt.Sprintf("'%s':%s", dc_r[0], dc_r[1]) ) } cql := fmt.Sprintf(`CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = {'class':'NetworkTopologyStrategy', %s}`, *keyspace, strings.Join(strr,",") ) fmt.Printf("keyspace: %s.\n", cql) err = session.Query(cql).Exec() checkError(err) fmt.Println("Create keyspace wait 5s.") time.Sleep(5*time.Second) */ err = session.Query(fmt.Sprintf(`DROP TABLE IF EXISTS %s.tobject`, *keyspace)).Exec() checkError(err) table := `CREATE TABLE %s.tobject ( id bigint, day date, %s, time timestamp, PRIMARY key (id, time) ) WITH CLUSTERING ORDER BY (time DESC)` if *isMemory { table += ` AND caching = {'keys':'NONE', 'rows_per_partition':'ALL'}` } cols := make([]string, *tableColumnNums) for i := range cols { cols[i] = fmt.Sprintf("%s%d varchar", *colname, i) } table = fmt.Sprintf(table, *keyspace, strings.Join(cols, ",\n")) err = session.Query(table).Exec() checkError(err) if *hasIndex { err = session.Query(fmt.Sprintf(`DROP INDEX IF EXISTS %s.tobject_index`, *keyspace)).Exec() checkError(err) idxs := make([]string, *tableColumnNums) for i := range idxs { idxs[i] = fmt.Sprintf(`%s%d: {type : "string"}`, *colname, i) } /* 'refresh_seconds': '1', 'ram_buffer_mb': '256', 'max_merge_mb': '20', 'max_cached_mb': '120', 'indexing_threads': '16', 'indexing_queues_size': '200', */ index := `CREATE CUSTOM INDEX tobject_index ON %s.tobject () USING 'com.stratio.cassandra.lucene.Index' WITH OPTIONS = { 'refresh_seconds': '1', 'indexing_threads': '16', 'indexing_queues_size': '200', 'schema' : '{ fields : { id : {type: "integer"}, day : {type: "date", pattern: "yyyy-MM-dd"}, time : {type: "date", pattern: "yyyy-MM-dd HH:mm:ss.SSS"}, %s } }' }` index = fmt.Sprintf(index, *keyspace, strings.Join(idxs, ",\n")) //fmt.Println(index) err = session.Query(index).Exec() checkError(err) } } func worker(db odb.Client, wch chan *object, sch chan int) { var ( o *object err error ) for o = range wch { if _, err = db.Query(o.mql, o.values...).Do(); err != nil { fmt.Println("Exec error:", err.Error()) sch <- 2 } else { sch <- 1 } wgdata.Done() } wgchan.Done() } func dataWorker(base, num int, ch chan *[]*object) { datas := make([]*object, num) lpNums := 3 // Large-partition nums if *isJson { for i := 0; i < num; i++ { o := &object{} cql := "insert into /native/tobject json ? default unset" cols := make([]string, *columnNums) for k := range cols { cols[k] = *colname + strconv.Itoa(k) } m := make(map[string]interface{}) now := time.Now() if *isLargePartion { m["id"] = (base + i) % lpNums } else { m["id"] = base + i } m["day"] = now.Format("2006-01-02") m["time"] = now.UnixNano() / int64(time.Millisecond) for j, c := range cols { if *isLargePartion { m[c] = fmt.Sprintf("value_%d_%d", (base+i)%lpNums, j-3) } else { m[c] = fmt.Sprintf("value_%d_%d", base+i, j-3) } } b, _ := json.Marshal(m) o.values = []interface{}{string(b)} if *ttl != -1 { cql += " using ttl ?" o.values = append(o.values, *ttl) } o.mql = cql //fmt.Println(o.mql) //fmt.Println(o.values) datas[i] = o } } else { for i := 0; i < num; i++ { o := &object{} cql := "insert into /native/tobject (id, day, time, %s) values (?, ?, ?, %s)" cols := make([]string, *columnNums) vals := make([]string, *columnNums) for k := range cols { cols[k] = *colname + strconv.Itoa(k) vals[k] = "?" } cql = fmt.Sprintf(cql, strings.Join(cols, ", "), strings.Join(vals, ", ")) values := make([]interface{}, *columnNums+3) if *isLargePartion { values[0] = (base + i) % lpNums } else { values[0] = base + i } now := time.Now() values[1] = now.Format("2006-01-02") values[2] = now.UnixNano() / int64(time.Millisecond) for j := range values { if j > 2 { if *isLargePartion { values[j] = fmt.Sprintf("value_%d_%d", (base+i)%3, j-3) } else { values[j] = fmt.Sprintf("value_%d_%d", base+i, j-3) } } } if *ttl != -1 { cql += " using ttl ?" values = append(values, *ttl) } o.mql = cql o.values = values datas[i] = o } } ch <- &datas } func stat(ch chan int) { ticker := time.NewTicker(time.Duration(*statInterval) * time.Second) var ( success int fail int ) start := time.Now().Unix() L: for { select { case <-ticker.C: now := time.Now().Unix() speed := int64(success) / (now - start) if speed == 0 { continue } //countdown := getCountdown(int64(*dataSize - success - fail)/speed) fmt.Printf("\rStat info: success %d, fail %d, speed %d/s.", success, fail, speed) case n := <-ch: switch n { case 1: success++ case 2: fail++ default: now := time.Now().Unix() speed := int64(success) / (now - start) //countdown := getCountdown(int64(*dataSize - success - fail)/speed) fmt.Printf("\rStat info: success %d, fail %d, speed %d/s.", success, fail, speed) break L } } } fmt.Println() wgchan.Done() } func checkError(err error) { if err != nil { fmt.Println(err) os.Exit(1) } } func getSession() *gocql.Session { hostList := strings.Split(*hosts, ",") for i, h := range hostList { hostList[i] = strings.TrimSpace(h) } cluster := gocql.NewCluster(hostList...) cluster.Keyspace = "system" cluster.Timeout = 120 * time.Second cluster.Consistency = gocql.LocalOne cluster.ProtoVersion = 4 // default 2 cluster.NumConns = 2 cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy()) if *auth { cluster.Authenticator = gocql.PasswordAuthenticator{ Username: *user, Password: *pass, } } var session *gocql.Session session, err := cluster.CreateSession() if err != nil { fmt.Println("Create session error:", err.Error()) os.Exit(1) } return session } //func getCountdown(seconds int64) string { // var ( // d, h, m, s int64 // ) // if seconds >= 60 { // m = seconds/60 // s = seconds%60 // if m >= 60 { // h = m/60 // m = m%60 // if h >= 24 { // d = h/24 // h = h%24 // } // } // } // if d != 0 { // return fmt.Sprintf("%dd%dh%dm%ds", d, h, m, s) // } else { // if h != 0 { // return fmt.Sprintf("%dh%dm%ds", h, m, s) // } else { // if m != 0 { // return fmt.Sprintf("%dm%ds", m, s) // } else { // return fmt.Sprintf("%ds", s) // } // } // } //}