package main import ( "flag" "fmt" "os" "runtime" "runtime/pprof" "strconv" "strings" "sync" "time" "git.wecise.com/wecise/common/etcd/env" "git.wecise.com/wecise/odb-go/odb" "gitee.com/wecisecode/util/logger" "github.com/cheggaaa/pb" ) var ( keyspace = flag.String("k", "matrix", "Test keyspace.") columnNums = flag.Int("n", 10, "Column nums.") hasIndex = flag.Bool("i", false, "Create index.") poolsize = flag.Int("p", 100, "Exec pool size.") statInterval = flag.Int("s", 1, "Stat interval.") dataSize = flag.Int("d", 1000000, "Test data size.") colname = flag.String("c", "column", "Column base name.") ispprof = flag.Bool("f", false, "Output pprof.") debug = flag.Bool("debug", false, "Log debug.") //isOrigin = flag.Bool("o", false, "Origin api insert.") class = flag.String("a", "/test_odbinsert", "Test class name.") beginNumber = flag.Int("b", 0, "Number of begin value.") dataWokerPool = 500 wgchan = new(sync.WaitGroup) wgdata = new(sync.WaitGroup) ) type object struct { mql string values []interface{} } func main() { flag.Parse() logger.SetConsole(*debug) etcdenv := os.Getenv("ETCDPATH") if etcdenv == "" { fmt.Println("No ETCDPATH.") os.Exit(1) } db := getDB() defer func() { _ = db.Close() }() fmt.Println("Odbserver connected.") prepareTest(db) fmt.Println("Prepare the test class data to complete.") // start stat sch := make(chan int, *poolsize) // start worker wch := make(chan *object, *poolsize) for i := 0; i < *poolsize; i++ { wgchan.Add(1) go worker(db, wch, sch) } // create data fmt.Println("Creating data...") datas := make([]*object, 0) remainingNums := *dataSize % dataWokerPool perWorkNums := (*dataSize - (*dataSize % dataWokerPool)) / dataWokerPool dwkChan := make(chan *[]*object, 1) if perWorkNums != 0 { for i := 0; i < dataWokerPool; i++ { n := i * perWorkNums go dataWorker(n+*beginNumber, perWorkNums, dwkChan) } } if remainingNums != 0 { var n int if perWorkNums != 0 { n = dataWokerPool*perWorkNums + 1 } go dataWorker(n+*beginNumber, remainingNums, dwkChan) } barCount := dataWokerPool if remainingNums != 0 { barCount += 1 } bar := pb.New(barCount).SetWidth(50) bar.ShowCounters = false bar.Start() for d := range dwkChan { datas = append(datas, *d...) bar.Increment() //time.Sleep(10*time.Millisecond) if len(datas) == *dataSize { break } } close(dwkChan) bar.Finish() fmt.Printf("Create %d data to complete.\n", len(datas)) wgchan.Add(1) go stat(sch) // pprof if *ispprof { pf, err := os.Create("odbtest_cpu.pprof") if err != nil { fmt.Println(err) os.Exit(1) } defer func() { _ = pf.Close() }() _ = pprof.StartCPUProfile(pf) defer pprof.StopCPUProfile() pfMem, err := os.Create("odbtest_mem.pprof") if err != nil { fmt.Println(err) os.Exit(1) } defer func() { _ = pfMem.Close() }() _ = pprof.WriteHeapProfile(pfMem) //tf, err := os.Create("odbtest.trace") //if err != nil { // fmt.Println(err) // os.Exit(1) //} //trace.Start(tf) //defer trace.Stop() } fmt.Println("Start to insert data.") // send data for _, d := range datas { wgdata.Add(1) wch <- d } wgdata.Wait() close(wch) close(sch) wgchan.Wait() fmt.Println("Test finished.") } func getDB() odb.Client { var ( hosts []string err error ) s := os.Getenv("ODBPATH") if s == "" { hosts, err = env.GetVars("ODBPATH") if err != nil { fmt.Println("New db error:", err.Error()) os.Exit(1) } } else { hosts = strings.Split(s, ",") for i := range hosts { hosts[i] = strings.TrimSpace(hosts[i]) } } fmt.Println("connect to odbserver ", strings.Join(hosts, ",")) db, err := odb.NewClient(&odb.Config{ Keyspace: *keyspace, Hosts: hosts, PoolSize: 1000, Debug: true, }) if err != nil { fmt.Println("New db error:", err.Error()) os.Exit(1) } //db, err := modb.New(*keyspace) //if err != nil { // fmt.Println("New db error:", err.Error()) // os.Exit(1) //} return db } func prepareTest(db odb.Client) { var err error db.Query(`truncate namespace 'odbinsert'`).Do() table := `create class if not exists %s ( %s, keys(%s0)%s )` cols := make([]string, *columnNums) var inames []string for i := range cols { cols[i] = fmt.Sprintf("%s%d varchar", *colname, i) inames = append(inames, fmt.Sprintf("%s%d", *colname, i)) } var idx string if *hasIndex { idx = fmt.Sprintf(",\nindexes(%s)", strings.Join(inames, ",")) } table = fmt.Sprintf(table, *class, strings.Join(cols, ",\n"), *colname, idx) + " with namespace='odbinsert', version = false" fmt.Println(table) _, err = db.Query(table).Do() checkError(err) } func worker(db odb.Client, wch chan *object, sch chan int) { var ( o *object err error ) for o = range wch { if _, err = db.Query(o.mql, o.values...).Do(); err != nil { fmt.Println("Exec error:", err.Error()) sch <- 2 } else { sch <- 1 } wgdata.Done() } wgchan.Done() } func dataWorker(base, num int, ch chan *[]*object) { //datas := make([]*object, num) //for i := 0; i < num; i++ { // o := &object{} // cql := "insert into %s %s" // cols := make([]string, *columnNums) // values := make([]interface{}, *columnNums) // for j := range cols { // cols[j] = *colname + strconv.Itoa(j) + "=?" // values[j] = fmt.Sprintf("value_%d_%d", base + i, j) // } // cql = fmt.Sprintf(cql, *class, strings.Join(cols, ", ")) // o.mql = cql // o.values = values // datas[i] = o //} //ch <- &datas datas := make([]*object, num) for i := 0; i < num; i++ { o := &object{} mql := "insert into %s (%s) values (%s)" cols := make([]string, *columnNums) qms := make([]string, *columnNums) values := make([]interface{}, *columnNums) for j := range cols { cols[j] = *colname + strconv.Itoa(j) qms[j] = "?" values[j] = fmt.Sprintf("value_%d_%d", base+i, j) } mql = fmt.Sprintf(mql, *class, strings.Join(cols, ", "), strings.Join(qms, ", ")) o.mql = mql o.values = values datas[i] = o } ch <- &datas } func stat(ch chan int) { ticker := time.NewTicker(time.Duration(*statInterval) * time.Second) var ( success int fail int ) start := time.Now().Unix() L: for { select { case <-ticker.C: now := time.Now().Unix() speed := int64(success) / (now - start) if speed == 0 { continue } //countdown := getCountdown(int64(*dataSize - success - fail)/speed) fmt.Printf("\rStat info: success %d, fail %d, speed %d/s, routines %d.", success, fail, speed, runtime.NumGoroutine()) case n := <-ch: switch n { case 1: success++ case 2: fail++ default: now := time.Now().Unix() speed := int64(success) / (now - start) //countdown := getCountdown(int64(*dataSize - success - fail)/speed) fmt.Printf("\rStat info: success %d, fail %d, speed %d/s.", success, fail, speed) break L } } } fmt.Println() wgchan.Done() } func checkError(err error) { if err != nil { fmt.Println(err) os.Exit(1) } } //func getCountdown(seconds int64) string { // var ( // d, h, m, s int64 // ) // if seconds >= 60 { // m = seconds/60 // s = seconds%60 // if m >= 60 { // h = m/60 // m = m%60 // if h >= 24 { // d = h/24 // h = h%24 // } // } // } // if d != 0 { // return fmt.Sprintf("%dd%dh%dm%ds", d, h, m, s) // } else { // if h != 0 { // return fmt.Sprintf("%dh%dm%ds", h, m, s) // } else { // if m != 0 { // return fmt.Sprintf("%dm%ds", m, s) // } else { // return fmt.Sprintf("%ds", s) // } // } // } //}