package main

import (
	"encoding/json"
	"flag"
	"fmt"
	"os"
	"runtime/pprof"
	"strconv"
	"strings"
	"sync"
	"time"

	"git.wecise.com/wecise/common/lib/gocql/gocql"
	"github.com/cheggaaa/pb"
)

// Command-line flags and shared state for the Cassandra insert benchmark.
var (
	hosts             = flag.String("h", "", "Cluster hosts.")
	replicationFactor = flag.String("r", "dc1:3", "Replication factor, format => dc1:3,dc2:3")
	columnNums        = flag.Int("n", 10, "Column nums.")
	tableColumnNums   = flag.Int("tn", 0, "Column nums.")
	hasIndex          = flag.Bool("i", false, "Create index.")
	poolsize          = flag.Int("p", 100, "Exec pool size.")
	//statInterval = flag.Int("s", 10, "Stat interval.")
	dataSize       = flag.Int("d", 1000000, "Test data size.")
	colname        = flag.String("c", "column", "Column base name.")
	ispprof        = flag.Bool("f", false, "Output pprof.")
	isJson         = flag.Bool("j", false, "Json mode.")
	isMemory       = flag.Bool("m", false, "Memory cache mode.")
	keyspace       = flag.String("k", "test", "Test keyspace.")
	ttl            = flag.Int("t", -1, "Test data ttl.")
	notPrepare     = flag.Bool("np", false, "Not prepare.")
	isLargePartion = flag.Bool("lp", false, "Large-partion data.")
	auth           = flag.Bool("auth", false, "Use auth.")
	user           = flag.String("user", "cassandra", "Username.")
	pass           = flag.String("pass", "cassandra", "Password.")

	// dataWokerPool is the number of goroutines used to generate test rows.
	dataWokerPool = 500
	wgchan        = new(sync.WaitGroup) // tracks insert workers + the stat goroutine
	wgdata        = new(sync.WaitGroup) // tracks in-flight insert requests
)

// object is one prepared insert: a CQL statement plus its bind values.
type object struct {
	cql    string
	values []interface{}
}

func main() {
	flag.Parse()
	if *hosts == "" {
		fmt.Println("No hosts.")
		os.Exit(1)
	}
	if *tableColumnNums == 0 {
		*tableColumnNums = *columnNums
	}
	if *tableColumnNums < *columnNums {
		// The insert statements bind *columnNums columns, so the table must
		// have at least that many. Previously this only printed the message
		// and kept going, which made every subsequent insert fail.
		fmt.Println("tn must be greater than or equal to n")
		os.Exit(1)
	}

	session := getSession()
	defer session.Close()
	fmt.Println("Cassandra connected.")

	if *notPrepare {
		fmt.Println("Not Prepare.")
	} else {
		prepareTest(session)
		fmt.Println("Prepare the test table space to complete.")
	}

	// Stat channel: workers report 1 (success) or 2 (failure) per insert.
	sch := make(chan int, *poolsize)

	// Start the insert worker pool.
	wch := make(chan *object, *poolsize)
	for i := 0; i < *poolsize; i++ {
		wgchan.Add(1)
		go worker(session, wch, sch)
	}

	// Generate the test data concurrently: dataWokerPool full batches of
	// perWorkNums rows each, plus one remainder batch.
	fmt.Println("Creating data...")
	datas := make([]*object, 0, *dataSize)
	perWorkNums := *dataSize / dataWokerPool
	remainingNums := *dataSize % dataWokerPool
	dwkChan := make(chan *[]*object, 1)
	if perWorkNums != 0 {
		for i := 0; i < dataWokerPool; i++ {
			go dataWorker(i*perWorkNums, perWorkNums, dwkChan)
		}
	}
	if remainingNums != 0 {
		// The remainder batch starts right after the ids covered by the full
		// batches. (The old base of dataWokerPool*perWorkNums+1 was off by
		// one: it skipped one id and emitted an id past the requested range.)
		go dataWorker(dataWokerPool*perWorkNums, remainingNums, dwkChan)
	}

	barCount := dataWokerPool
	if remainingNums != 0 {
		barCount++
	}
	bar := pb.New(barCount).SetWidth(50)
	bar.ShowCounters = false
	bar.Start()
	for d := range dwkChan {
		datas = append(datas, *d...)
		bar.Increment()
		if len(datas) == *dataSize {
			break
		}
	}
	close(dwkChan)
	bar.Finish()
	fmt.Printf("Create %d data to complete.\n", len(datas))

	wgchan.Add(1)
	go stat(sch)

	// Optionally dump CPU and heap profiles.
	if *ispprof {
		pf, err := os.Create("casstest_cpu.pprof")
		if err != nil {
			fmt.Println(err)
			os.Exit(1)
		}
		defer func() { _ = pf.Close() }()
		_ = pprof.StartCPUProfile(pf)
		defer pprof.StopCPUProfile()
		pfMem, err := os.Create("casstest_mem.pprof")
		if err != nil {
			fmt.Println(err)
			os.Exit(1)
		}
		defer func() { _ = pfMem.Close() }()
		_ = pprof.WriteHeapProfile(pfMem)
	}

	fmt.Println("Start to insert data.")
	for _, d := range datas {
		wgdata.Add(1)
		wch <- d
	}
	wgdata.Wait()
	close(wch)
	close(sch)
	wgchan.Wait()
	fmt.Println("Test finished.")
}

// getSession connects to the cluster given by -h and returns a ready session.
// It exits the process on connection failure.
func getSession() *gocql.Session {
	hostList := strings.Split(*hosts, ",")
	for i, h := range hostList {
		hostList[i] = strings.TrimSpace(h)
	}
	cluster := gocql.NewCluster(hostList...)
	cluster.Keyspace = "system"
	cluster.Timeout = 120 * time.Second
	cluster.Consistency = gocql.LocalOne
	cluster.ProtoVersion = 4 // default 2
	cluster.NumConns = 2
	cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
	if *auth {
		cluster.Authenticator = gocql.PasswordAuthenticator{
			Username: *user,
			Password: *pass,
		}
	}
	session, err := cluster.CreateSession()
	if err != nil {
		fmt.Println("Create session error:", err.Error())
		os.Exit(1)
	}
	return session
}

// prepareTest drops and recreates the test keyspace and the tobject table,
// and (with -i) a Stratio Lucene index over all columns.
func prepareTest(session *gocql.Session) {
	err := session.Query("DROP KEYSPACE IF EXISTS " + *keyspace).Exec()
	checkError(err)
	fmt.Println("Drop keyspace wait 5s.")
	time.Sleep(5 * time.Second)

	// Build the NetworkTopologyStrategy replication map from -r (dc1:3,dc2:3).
	dcs := strings.Split(*replicationFactor, ",")
	strr := make([]string, 0, len(dcs))
	for _, dc := range dcs {
		dcr := strings.Split(dc, ":")
		if len(dcr) != 2 {
			fmt.Println("dc replicationFactor format err", *replicationFactor)
			os.Exit(1)
		}
		strr = append(strr, fmt.Sprintf("'%s':%s", dcr[0], dcr[1]))
	}
	cql := fmt.Sprintf(`CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = {'class':'NetworkTopologyStrategy', %s}`, *keyspace, strings.Join(strr, ","))
	fmt.Printf("keyspace: %s.\n", cql)
	err = session.Query(cql).Exec()
	checkError(err)
	fmt.Println("Create keyspace wait 5s.")
	time.Sleep(5 * time.Second)

	err = session.Query(fmt.Sprintf(`DROP TABLE IF EXISTS %s.tobject`, *keyspace)).Exec()
	checkError(err)
	table := `CREATE TABLE %s.tobject (
	id bigint,
	day date,
	%s,
	time timestamp,
	PRIMARY key (id, time)
) WITH CLUSTERING ORDER BY (time DESC)`
	if *isMemory {
		table += ` AND caching = {'keys':'NONE', 'rows_per_partition':'ALL'}`
	}
	cols := make([]string, *tableColumnNums)
	for i := range cols {
		cols[i] = fmt.Sprintf("%s%d varchar", *colname, i)
	}
	table = fmt.Sprintf(table, *keyspace, strings.Join(cols, ",\n"))
	err = session.Query(table).Exec()
	checkError(err)

	if *hasIndex {
		err = session.Query(fmt.Sprintf(`DROP INDEX IF EXISTS %s.tobject_index`, *keyspace)).Exec()
		checkError(err)
		idxs := make([]string, *tableColumnNums)
		for i := range idxs {
			idxs[i] = fmt.Sprintf(`%s%d: {type : "string"}`, *colname, i)
		}
		// Other tunables available for the Lucene index:
		/*
			'refresh_seconds': '1',
			'ram_buffer_mb': '256',
			'max_merge_mb': '20',
			'max_cached_mb': '120',
			'indexing_threads': '16',
			'indexing_queues_size': '200',
		*/
		index := `CREATE CUSTOM INDEX tobject_index ON %s.tobject ()
USING 'com.stratio.cassandra.lucene.Index'
WITH OPTIONS = {
	'refresh_seconds': '1',
	'indexing_threads': '16',
	'indexing_queues_size': '200',
	'schema' : '{
		fields : {
			id : {type: "integer"},
			day : {type: "date", pattern: "yyyy-MM-dd"},
			time : {type: "date", pattern: "yyyy-MM-dd HH:mm:ss.SSS"},
			%s
		}
	}'
}`
		index = fmt.Sprintf(index, *keyspace, strings.Join(idxs, ",\n"))
		err = session.Query(index).Exec()
		checkError(err)
	}
}

// worker consumes prepared inserts from wch, executes them against the
// session and reports each outcome on sch (1 = success, 2 = failure).
func worker(session *gocql.Session, wch chan *object, sch chan int) {
	for o := range wch {
		if err := session.Query(o.cql, o.values...).Exec(); err != nil {
			fmt.Println("Insert error:", err.Error())
			sch <- 2
		} else {
			sch <- 1
		}
		wgdata.Done()
	}
	wgchan.Done()
}

// dataWorker builds num insert objects covering ids base..base+num-1 and
// sends them to ch as one batch. With -lp the ids collapse onto lpNums
// partitions to simulate large partitions; with -j the rows are encoded as
// CQL JSON inserts instead of positional binds.
func dataWorker(base, num int, ch chan *[]*object) {
	datas := make([]*object, num)
	lpNums := 3 // Large-partition nums
	if *isJson {
		for i := 0; i < num; i++ {
			o := &object{}
			cql := fmt.Sprintf("insert into %s.tobject json ? default unset", *keyspace)
			cols := make([]string, *columnNums)
			for k := range cols {
				cols[k] = *colname + strconv.Itoa(k)
			}
			id := base + i
			if *isLargePartion {
				id = (base + i) % lpNums
			}
			m := make(map[string]interface{}, *columnNums+3)
			m["id"] = id
			now := time.Now()
			m["day"] = now.Format("2006-01-02")
			m["time"] = now.UnixNano() / int64(time.Millisecond)
			for j, c := range cols {
				// j is the 0-based column index; the old code used j-3 here,
				// which produced negative suffixes (value_X_-3, ...) and
				// disagreed with the non-JSON branch.
				m[c] = fmt.Sprintf("value_%d_%d", id, j)
			}
			b, _ := json.Marshal(m)
			o.values = []interface{}{string(b)}
			if *ttl != -1 {
				cql += " using ttl ?"
				o.values = append(o.values, *ttl)
			}
			o.cql = cql
			datas[i] = o
		}
	} else {
		for i := 0; i < num; i++ {
			o := &object{}
			cql := "insert into %s.tobject (id, day, time, %s) values (?, ?, ?, %s)"
			cols := make([]string, *columnNums)
			vals := make([]string, *columnNums)
			for k := range cols {
				cols[k] = *colname + strconv.Itoa(k)
				vals[k] = "?"
			}
			cql = fmt.Sprintf(cql, *keyspace, strings.Join(cols, ", "), strings.Join(vals, ", "))
			id := base + i
			if *isLargePartion {
				// Was hard-coded %3 in the value strings; use lpNums so the
				// id and the value suffix always agree.
				id = (base + i) % lpNums
			}
			values := make([]interface{}, *columnNums+3)
			values[0] = id
			now := time.Now()
			values[1] = now.Format("2006-01-02")
			values[2] = now.UnixNano() / int64(time.Millisecond)
			for j := 3; j < len(values); j++ {
				values[j] = fmt.Sprintf("value_%d_%d", id, j-3)
			}
			if *ttl != -1 {
				cql += " using ttl ?"
				values = append(values, *ttl)
			}
			o.cql = cql
			o.values = values
			datas[i] = o
		}
	}
	ch <- &datas
}

// stat aggregates per-insert results from ch and renders a progress bar with
// running success/fail counts and the current insert rate.
func stat(ch chan int) {
	var (
		success int
		fail    int
		speed   int64
	)
	start := time.Now().Unix()
	bar := pb.New(*dataSize).SetWidth(50)
	bar.ShowCounters = false
	bar.RefreshRate = 1 * time.Second
	bar.Callback = func(out string) {
		if speed == 0 {
			return
		}
		fmt.Printf("\r%s, %s", out, fmt.Sprintf("success %d, fail %d, speed %d/s.", success, fail, speed))
	}
	bar.Start()
	for n := range ch {
		switch n {
		case 1:
			success++
			// Guard against division by zero within the first second.
			if elapsed := time.Now().Unix() - start; elapsed > 0 {
				speed = int64(success) / elapsed
			}
			bar.Increment()
		case 2:
			fail++
			bar.Increment()
		}
	}
	bar.Finish()
	wgchan.Done()
}

// checkError prints err and aborts the process when err is non-nil.
func checkError(err error) {
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}