| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512 |
- package main
- import (
- "flag"
- "fmt"
- "os"
- "runtime/pprof"
- "strconv"
- "strings"
- "sync"
- "time"
- "encoding/json"
- "git.wecise.com/wecise/common/etcd/env"
- "git.wecise.com/wecise/common/lib/gocql/gocql"
- "git.wecise.com/wecise/odb-go/odb"
- "gitee.com/wecisecode/util/logger"
- "github.com/cheggaaa/pb"
- )
// Command-line flags and process-wide state shared by the benchmark goroutines.
var (
	// Cluster, schema target and credentials. In this file replicationFactor
	// is referenced only by the disabled keyspace bootstrap in prepareTest.
	hosts             = flag.String("h", "", "Cluster hosts.")
	replicationFactor = flag.String("r", "dc1:3", "Replication factor, format => dc1:3,dc2:3")
	keyspace          = flag.String("k", "matrix", "Test keyspace.")
	class             = flag.String("a", "/tobject", "Test class name.")
	auth              = flag.Bool("auth", false, "Use auth.")
	user              = flag.String("user", "cassandra", "Username.")
	pass              = flag.String("pass", "cassandra", "Password.")

	// Table shape. n is the number of columns written per insert, tn the
	// number of columns the table is created with (main copies n into tn when
	// tn is left at 0).
	columnNums      = flag.Int("n", 10, "Column nums.")
	tableColumnNums = flag.Int("tn", 0, "Table column nums, 0 means same as n; must be >= n.")
	colname         = flag.String("c", "column", "Column base name.")
	hasIndex        = flag.Bool("i", false, "Create index.")
	isMemory        = flag.Bool("m", false, "Memory cache mode.")

	// Workload.
	poolsize       = flag.Int("p", 100, "Exec pool size.")
	statInterval   = flag.Int("s", 1, "Stat interval.")
	dataSize       = flag.Int("d", 1000000, "Test data size.")
	beginNumber    = flag.Int("b", 0, "Number of begin value.")
	isLargePartion = flag.Bool("lp", false, "Large-partion data.")
	ttl            = flag.Int("t", -1, "Test data ttl.")
	isJson         = flag.Bool("j", false, "Json mode.")

	// Diagnostics.
	ispprof = flag.Bool("f", false, "Output pprof.")
	debug   = flag.Bool("debug", false, "Log debug.")
	//isOrigin = flag.Bool("o", false, "Origin api insert.")

	// dataWokerPool is the number of goroutines used to pre-generate the
	// inserts. wgchan counts the running worker and stat goroutines; wgdata
	// counts inserts that were queued but not yet executed.
	dataWokerPool = 500
	wgchan        = new(sync.WaitGroup)
	wgdata        = new(sync.WaitGroup)
)
// object is one prepared insert: the statement text together with the values
// that are bound to its "?" placeholders when a worker executes it.
type object struct {
	mql    string        // statement passed to db.Query
	values []interface{} // positional arguments passed along with mql
}
- func main() {
- flag.Parse()
- logger.SetConsole(*debug)
- etcdenv := os.Getenv("ETCDPATH")
- if etcdenv == "" {
- fmt.Println("No ETCDPATH.")
- os.Exit(1)
- }
- if *tableColumnNums == 0 {
- *tableColumnNums = *columnNums
- }
- if *tableColumnNums < *columnNums {
- fmt.Println("tn must be greater than or equal to n")
- }
- session := getSession()
- defer session.Close()
- fmt.Println("Cassandra connected.")
- db := getDB()
- defer func() { _ = db.Close() }()
- fmt.Println("Odbserver connected.")
- prepareTest(session)
- fmt.Println("Prepare the test class data to complete.")
- // start stat
- sch := make(chan int, *poolsize)
- // start worker
- wch := make(chan *object, *poolsize)
- for i := 0; i < *poolsize; i++ {
- wgchan.Add(1)
- go worker(db, wch, sch)
- }
- // create data
- fmt.Println("Creating data...")
- datas := make([]*object, 0)
- remainingNums := *dataSize % dataWokerPool
- perWorkNums := (*dataSize - (*dataSize % dataWokerPool)) / dataWokerPool
- dwkChan := make(chan *[]*object, 1)
- if perWorkNums != 0 {
- for i := 0; i < dataWokerPool; i++ {
- n := i * perWorkNums
- go dataWorker(n+*beginNumber, perWorkNums, dwkChan)
- }
- }
- if remainingNums != 0 {
- var n int
- if perWorkNums != 0 {
- n = dataWokerPool*perWorkNums + 1
- }
- go dataWorker(n+*beginNumber, remainingNums, dwkChan)
- }
- barCount := dataWokerPool
- if remainingNums != 0 {
- barCount += 1
- }
- bar := pb.New(barCount).SetWidth(50)
- bar.ShowCounters = false
- bar.Start()
- for d := range dwkChan {
- datas = append(datas, *d...)
- bar.Increment()
- //time.Sleep(10*time.Millisecond)
- if len(datas) == *dataSize {
- break
- }
- }
- close(dwkChan)
- bar.Finish()
- fmt.Printf("Create %d data to complete.\n", len(datas))
- wgchan.Add(1)
- go stat(sch)
- // pprof
- if *ispprof {
- pf, err := os.Create("odbtest_cpu.pprof")
- if err != nil {
- fmt.Println(err)
- os.Exit(1)
- }
- defer func() {
- _ = pf.Close()
- }()
- _ = pprof.StartCPUProfile(pf)
- defer pprof.StopCPUProfile()
- pfMem, err := os.Create("odbtest_mem.pprof")
- if err != nil {
- fmt.Println(err)
- os.Exit(1)
- }
- defer func() {
- _ = pfMem.Close()
- }()
- _ = pprof.WriteHeapProfile(pfMem)
- //tf, err := os.Create("odbtest.trace")
- //if err != nil {
- // fmt.Println(err)
- // os.Exit(1)
- //}
- //trace.Start(tf)
- //defer trace.Stop()
- }
- fmt.Println("Start to insert data.")
- // send data
- for _, d := range datas {
- wgdata.Add(1)
- wch <- d
- }
- wgdata.Wait()
- close(wch)
- close(sch)
- wgchan.Wait()
- fmt.Println("Test finished.")
- }
- func getDB() odb.Client {
- var (
- hosts []string
- err error
- )
- s := os.Getenv("ODBPATH")
- if s == "" {
- hosts, err = env.GetVars("ODBPATH")
- if err != nil {
- fmt.Println("New db error:", err.Error())
- os.Exit(1)
- }
- } else {
- hosts = strings.Split(s, ",")
- for i := range hosts {
- hosts[i] = strings.TrimSpace(hosts[i])
- }
- }
- root_keyspace := *keyspace
- if idx := strings.Index(root_keyspace, "_"); idx != -1 {
- root_keyspace = root_keyspace[:idx]
- }
- db, err := odb.NewClient(&odb.Config{
- Keyspace: root_keyspace,
- Hosts: hosts,
- PoolSize: 1000,
- Debug: true,
- })
- if err != nil {
- fmt.Println("New db error:", err.Error())
- os.Exit(1)
- }
- //db, err := modb.New(*keyspace)
- //if err != nil {
- // fmt.Println("New db error:", err.Error())
- // os.Exit(1)
- //}
- return db
- }
- func prepareTest(session *gocql.Session) {
- var err error
- /*err := session.Query("DROP KEYSPACE IF EXISTS " + *keyspace).Exec()
- checkError(err)
- fmt.Println("Drop keyspace wait 5s.")
- time.Sleep(5*time.Second)
- dcs := strings.Split(*replicationFactor, ",")
- strr := []string{}
- for _, dc := range dcs {
- dc_r := strings.Split(dc, ":")
- if len(dc_r) != 2 {
- fmt.Println("dc replicationFactor format err", *replicationFactor)
- os.Exit(1)
- }
- strr = append(strr, fmt.Sprintf("'%s':%s", dc_r[0], dc_r[1]) )
- }
- cql := fmt.Sprintf(`CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = {'class':'NetworkTopologyStrategy', %s}`, *keyspace, strings.Join(strr,",") )
- fmt.Printf("keyspace: %s.\n", cql)
- err = session.Query(cql).Exec()
- checkError(err)
- fmt.Println("Create keyspace wait 5s.")
- time.Sleep(5*time.Second)
- */
- err = session.Query(fmt.Sprintf(`DROP TABLE IF EXISTS %s.tobject`, *keyspace)).Exec()
- checkError(err)
- table := `CREATE TABLE %s.tobject (
- id bigint,
- day date,
- %s,
- time timestamp,
- PRIMARY key (id, time)
- ) WITH CLUSTERING ORDER BY (time DESC)`
- if *isMemory {
- table += ` AND caching = {'keys':'NONE', 'rows_per_partition':'ALL'}`
- }
- cols := make([]string, *tableColumnNums)
- for i := range cols {
- cols[i] = fmt.Sprintf("%s%d varchar", *colname, i)
- }
- table = fmt.Sprintf(table, *keyspace, strings.Join(cols, ",\n"))
- err = session.Query(table).Exec()
- checkError(err)
- if *hasIndex {
- err = session.Query(fmt.Sprintf(`DROP INDEX IF EXISTS %s.tobject_index`, *keyspace)).Exec()
- checkError(err)
- idxs := make([]string, *tableColumnNums)
- for i := range idxs {
- idxs[i] = fmt.Sprintf(`%s%d: {type : "string"}`, *colname, i)
- }
- /*
- 'refresh_seconds': '1',
- 'ram_buffer_mb': '256',
- 'max_merge_mb': '20',
- 'max_cached_mb': '120',
- 'indexing_threads': '16',
- 'indexing_queues_size': '200',
- */
- index := `CREATE CUSTOM INDEX tobject_index ON %s.tobject ()
- USING 'com.stratio.cassandra.lucene.Index'
- WITH OPTIONS = {
- 'refresh_seconds': '1',
- 'indexing_threads': '16',
- 'indexing_queues_size': '200',
- 'schema' : '{
- fields : {
- id : {type: "integer"},
- day : {type: "date", pattern: "yyyy-MM-dd"},
- time : {type: "date", pattern: "yyyy-MM-dd HH:mm:ss.SSS"},
- %s
- }
- }'
- }`
- index = fmt.Sprintf(index, *keyspace, strings.Join(idxs, ",\n"))
- //fmt.Println(index)
- err = session.Query(index).Exec()
- checkError(err)
- }
- }
- func worker(db odb.Client, wch chan *object, sch chan int) {
- var (
- o *object
- err error
- )
- for o = range wch {
- if _, err = db.Query(o.mql, o.values...).Do(); err != nil {
- fmt.Println("Exec error:", err.Error())
- sch <- 2
- } else {
- sch <- 1
- }
- wgdata.Done()
- }
- wgchan.Done()
- }
- func dataWorker(base, num int, ch chan *[]*object) {
- datas := make([]*object, num)
- lpNums := 3 // Large-partition nums
- if *isJson {
- for i := 0; i < num; i++ {
- o := &object{}
- cql := "insert into /native/tobject json ? default unset"
- cols := make([]string, *columnNums)
- for k := range cols {
- cols[k] = *colname + strconv.Itoa(k)
- }
- m := make(map[string]interface{})
- now := time.Now()
- if *isLargePartion {
- m["id"] = (base + i) % lpNums
- } else {
- m["id"] = base + i
- }
- m["day"] = now.Format("2006-01-02")
- m["time"] = now.UnixNano() / int64(time.Millisecond)
- for j, c := range cols {
- if *isLargePartion {
- m[c] = fmt.Sprintf("value_%d_%d", (base+i)%lpNums, j-3)
- } else {
- m[c] = fmt.Sprintf("value_%d_%d", base+i, j-3)
- }
- }
- b, _ := json.Marshal(m)
- o.values = []interface{}{string(b)}
- if *ttl != -1 {
- cql += " using ttl ?"
- o.values = append(o.values, *ttl)
- }
- o.mql = cql
- //fmt.Println(o.mql)
- //fmt.Println(o.values)
- datas[i] = o
- }
- } else {
- for i := 0; i < num; i++ {
- o := &object{}
- cql := "insert into /native/tobject (id, day, time, %s) values (?, ?, ?, %s)"
- cols := make([]string, *columnNums)
- vals := make([]string, *columnNums)
- for k := range cols {
- cols[k] = *colname + strconv.Itoa(k)
- vals[k] = "?"
- }
- cql = fmt.Sprintf(cql, strings.Join(cols, ", "), strings.Join(vals, ", "))
- values := make([]interface{}, *columnNums+3)
- if *isLargePartion {
- values[0] = (base + i) % lpNums
- } else {
- values[0] = base + i
- }
- now := time.Now()
- values[1] = now.Format("2006-01-02")
- values[2] = now.UnixNano() / int64(time.Millisecond)
- for j := range values {
- if j > 2 {
- if *isLargePartion {
- values[j] = fmt.Sprintf("value_%d_%d", (base+i)%3, j-3)
- } else {
- values[j] = fmt.Sprintf("value_%d_%d", base+i, j-3)
- }
- }
- }
- if *ttl != -1 {
- cql += " using ttl ?"
- values = append(values, *ttl)
- }
- o.mql = cql
- o.values = values
- datas[i] = o
- }
- }
- ch <- &datas
- }
- func stat(ch chan int) {
- ticker := time.NewTicker(time.Duration(*statInterval) * time.Second)
- var (
- success int
- fail int
- )
- start := time.Now().Unix()
- L:
- for {
- select {
- case <-ticker.C:
- now := time.Now().Unix()
- speed := int64(success) / (now - start)
- if speed == 0 {
- continue
- }
- //countdown := getCountdown(int64(*dataSize - success - fail)/speed)
- fmt.Printf("\rStat info: success %d, fail %d, speed %d/s.", success, fail, speed)
- case n := <-ch:
- switch n {
- case 1:
- success++
- case 2:
- fail++
- default:
- now := time.Now().Unix()
- speed := int64(success) / (now - start)
- //countdown := getCountdown(int64(*dataSize - success - fail)/speed)
- fmt.Printf("\rStat info: success %d, fail %d, speed %d/s.", success, fail, speed)
- break L
- }
- }
- }
- fmt.Println()
- wgchan.Done()
- }
// checkError is a no-op for a nil error; otherwise it prints the error and
// terminates the process with exit status 1.
func checkError(err error) {
	if err == nil {
		return
	}
	fmt.Println(err)
	os.Exit(1)
}
- func getSession() *gocql.Session {
- hostList := strings.Split(*hosts, ",")
- for i, h := range hostList {
- hostList[i] = strings.TrimSpace(h)
- }
- cluster := gocql.NewCluster(hostList...)
- cluster.Keyspace = "system"
- cluster.Timeout = 120 * time.Second
- cluster.Consistency = gocql.LocalOne
- cluster.ProtoVersion = 4 // default 2
- cluster.NumConns = 2
- cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
- if *auth {
- cluster.Authenticator = gocql.PasswordAuthenticator{
- Username: *user,
- Password: *pass,
- }
- }
- var session *gocql.Session
- session, err := cluster.CreateSession()
- if err != nil {
- fmt.Println("Create session error:", err.Error())
- os.Exit(1)
- }
- return session
- }
- //func getCountdown(seconds int64) string {
- // var (
- // d, h, m, s int64
- // )
- // if seconds >= 60 {
- // m = seconds/60
- // s = seconds%60
- // if m >= 60 {
- // h = m/60
- // m = m%60
- // if h >= 24 {
- // d = h/24
- // h = h%24
- // }
- // }
- // }
- // if d != 0 {
- // return fmt.Sprintf("%dd%dh%dm%ds", d, h, m, s)
- // } else {
- // if h != 0 {
- // return fmt.Sprintf("%dh%dm%ds", h, m, s)
- // } else {
- // if m != 0 {
- // return fmt.Sprintf("%dm%ds", m, s)
- // } else {
- // return fmt.Sprintf("%ds", s)
- // }
- // }
- // }
- //}
|