| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- package main
- import (
- "flag"
- "fmt"
- "os"
- "runtime"
- "runtime/pprof"
- "strconv"
- "strings"
- "sync"
- "time"
- "git.wecise.com/wecise/common/etcd/env"
- "git.wecise.com/wecise/odb-go/odb"
- "gitee.com/wecisecode/util/logger"
- "github.com/cheggaaa/pb"
- )
- var (
- keyspace = flag.String("k", "matrix", "Test keyspace.")
- columnNums = flag.Int("n", 10, "Column nums.")
- hasIndex = flag.Bool("i", false, "Create index.")
- poolsize = flag.Int("p", 100, "Exec pool size.")
- statInterval = flag.Int("s", 1, "Stat interval.")
- dataSize = flag.Int("d", 1000000, "Test data size.")
- colname = flag.String("c", "column", "Column base name.")
- ispprof = flag.Bool("f", false, "Output pprof.")
- debug = flag.Bool("debug", false, "Log debug.")
- //isOrigin = flag.Bool("o", false, "Origin api insert.")
- class = flag.String("a", "/test_odbinsert", "Test class name.")
- beginNumber = flag.Int("b", 0, "Number of begin value.")
- dataWokerPool = 500
- wgchan = new(sync.WaitGroup)
- wgdata = new(sync.WaitGroup)
- )
- type object struct {
- mql string
- values []interface{}
- }
- func main() {
- flag.Parse()
- logger.SetConsole(*debug)
- etcdenv := os.Getenv("ETCDPATH")
- if etcdenv == "" {
- fmt.Println("No ETCDPATH.")
- os.Exit(1)
- }
- db := getDB()
- defer func() { _ = db.Close() }()
- fmt.Println("Odbserver connected.")
- prepareTest(db)
- fmt.Println("Prepare the test class data to complete.")
- // start stat
- sch := make(chan int, *poolsize)
- // start worker
- wch := make(chan *object, *poolsize)
- for i := 0; i < *poolsize; i++ {
- wgchan.Add(1)
- go worker(db, wch, sch)
- }
- // create data
- fmt.Println("Creating data...")
- datas := make([]*object, 0)
- remainingNums := *dataSize % dataWokerPool
- perWorkNums := (*dataSize - (*dataSize % dataWokerPool)) / dataWokerPool
- dwkChan := make(chan *[]*object, 1)
- if perWorkNums != 0 {
- for i := 0; i < dataWokerPool; i++ {
- n := i * perWorkNums
- go dataWorker(n+*beginNumber, perWorkNums, dwkChan)
- }
- }
- if remainingNums != 0 {
- var n int
- if perWorkNums != 0 {
- n = dataWokerPool*perWorkNums + 1
- }
- go dataWorker(n+*beginNumber, remainingNums, dwkChan)
- }
- barCount := dataWokerPool
- if remainingNums != 0 {
- barCount += 1
- }
- bar := pb.New(barCount).SetWidth(50)
- bar.ShowCounters = false
- bar.Start()
- for d := range dwkChan {
- datas = append(datas, *d...)
- bar.Increment()
- //time.Sleep(10*time.Millisecond)
- if len(datas) == *dataSize {
- break
- }
- }
- close(dwkChan)
- bar.Finish()
- fmt.Printf("Create %d data to complete.\n", len(datas))
- wgchan.Add(1)
- go stat(sch)
- // pprof
- if *ispprof {
- pf, err := os.Create("odbtest_cpu.pprof")
- if err != nil {
- fmt.Println(err)
- os.Exit(1)
- }
- defer func() {
- _ = pf.Close()
- }()
- _ = pprof.StartCPUProfile(pf)
- defer pprof.StopCPUProfile()
- pfMem, err := os.Create("odbtest_mem.pprof")
- if err != nil {
- fmt.Println(err)
- os.Exit(1)
- }
- defer func() {
- _ = pfMem.Close()
- }()
- _ = pprof.WriteHeapProfile(pfMem)
- //tf, err := os.Create("odbtest.trace")
- //if err != nil {
- // fmt.Println(err)
- // os.Exit(1)
- //}
- //trace.Start(tf)
- //defer trace.Stop()
- }
- fmt.Println("Start to insert data.")
- // send data
- for _, d := range datas {
- wgdata.Add(1)
- wch <- d
- }
- wgdata.Wait()
- close(wch)
- close(sch)
- wgchan.Wait()
- fmt.Println("Test finished.")
- }
- func getDB() odb.Client {
- var (
- hosts []string
- err error
- )
- s := os.Getenv("ODBPATH")
- if s == "" {
- hosts, err = env.GetVars("ODBPATH")
- if err != nil {
- fmt.Println("New db error:", err.Error())
- os.Exit(1)
- }
- } else {
- hosts = strings.Split(s, ",")
- for i := range hosts {
- hosts[i] = strings.TrimSpace(hosts[i])
- }
- }
- fmt.Println("connect to odbserver ", strings.Join(hosts, ","))
- db, err := odb.NewClient(&odb.Config{
- Keyspace: *keyspace,
- Hosts: hosts,
- PoolSize: 1000,
- Debug: true,
- })
- if err != nil {
- fmt.Println("New db error:", err.Error())
- os.Exit(1)
- }
- //db, err := modb.New(*keyspace)
- //if err != nil {
- // fmt.Println("New db error:", err.Error())
- // os.Exit(1)
- //}
- return db
- }
- func prepareTest(db odb.Client) {
- var err error
- db.Query(`truncate namespace 'odbinsert'`).Do()
- table := `create class if not exists %s (
- %s,
- keys(%s0)%s
- )`
- cols := make([]string, *columnNums)
- var inames []string
- for i := range cols {
- cols[i] = fmt.Sprintf("%s%d varchar", *colname, i)
- inames = append(inames, fmt.Sprintf("%s%d", *colname, i))
- }
- var idx string
- if *hasIndex {
- idx = fmt.Sprintf(",\nindexes(%s)", strings.Join(inames, ","))
- }
- table = fmt.Sprintf(table, *class, strings.Join(cols, ",\n"), *colname, idx) + " with namespace='odbinsert', version = false"
- fmt.Println(table)
- _, err = db.Query(table).Do()
- checkError(err)
- }
- func worker(db odb.Client, wch chan *object, sch chan int) {
- var (
- o *object
- err error
- )
- for o = range wch {
- if _, err = db.Query(o.mql, o.values...).Do(); err != nil {
- fmt.Println("Exec error:", err.Error())
- sch <- 2
- } else {
- sch <- 1
- }
- wgdata.Done()
- }
- wgchan.Done()
- }
- func dataWorker(base, num int, ch chan *[]*object) {
- //datas := make([]*object, num)
- //for i := 0; i < num; i++ {
- // o := &object{}
- // cql := "insert into %s %s"
- // cols := make([]string, *columnNums)
- // values := make([]interface{}, *columnNums)
- // for j := range cols {
- // cols[j] = *colname + strconv.Itoa(j) + "=?"
- // values[j] = fmt.Sprintf("value_%d_%d", base + i, j)
- // }
- // cql = fmt.Sprintf(cql, *class, strings.Join(cols, ", "))
- // o.mql = cql
- // o.values = values
- // datas[i] = o
- //}
- //ch <- &datas
- datas := make([]*object, num)
- for i := 0; i < num; i++ {
- o := &object{}
- mql := "insert into %s (%s) values (%s)"
- cols := make([]string, *columnNums)
- qms := make([]string, *columnNums)
- values := make([]interface{}, *columnNums)
- for j := range cols {
- cols[j] = *colname + strconv.Itoa(j)
- qms[j] = "?"
- values[j] = fmt.Sprintf("value_%d_%d", base+i, j)
- }
- mql = fmt.Sprintf(mql, *class, strings.Join(cols, ", "), strings.Join(qms, ", "))
- o.mql = mql
- o.values = values
- datas[i] = o
- }
- ch <- &datas
- }
- func stat(ch chan int) {
- ticker := time.NewTicker(time.Duration(*statInterval) * time.Second)
- var (
- success int
- fail int
- )
- start := time.Now().Unix()
- L:
- for {
- select {
- case <-ticker.C:
- now := time.Now().Unix()
- speed := int64(success) / (now - start)
- if speed == 0 {
- continue
- }
- //countdown := getCountdown(int64(*dataSize - success - fail)/speed)
- fmt.Printf("\rStat info: success %d, fail %d, speed %d/s, routines %d.", success, fail, speed, runtime.NumGoroutine())
- case n := <-ch:
- switch n {
- case 1:
- success++
- case 2:
- fail++
- default:
- now := time.Now().Unix()
- speed := int64(success) / (now - start)
- //countdown := getCountdown(int64(*dataSize - success - fail)/speed)
- fmt.Printf("\rStat info: success %d, fail %d, speed %d/s.", success, fail, speed)
- break L
- }
- }
- }
- fmt.Println()
- wgchan.Done()
- }
- func checkError(err error) {
- if err != nil {
- fmt.Println(err)
- os.Exit(1)
- }
- }
- //func getCountdown(seconds int64) string {
- // var (
- // d, h, m, s int64
- // )
- // if seconds >= 60 {
- // m = seconds/60
- // s = seconds%60
- // if m >= 60 {
- // h = m/60
- // m = m%60
- // if h >= 24 {
- // d = h/24
- // h = h%24
- // }
- // }
- // }
- // if d != 0 {
- // return fmt.Sprintf("%dd%dh%dm%ds", d, h, m, s)
- // } else {
- // if h != 0 {
- // return fmt.Sprintf("%dh%dm%ds", h, m, s)
- // } else {
- // if m != 0 {
- // return fmt.Sprintf("%dm%ds", m, s)
- // } else {
- // return fmt.Sprintf("%ds", s)
- // }
- // }
- // }
- //}
|