odbinsert.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "os"
  6. "runtime"
  7. "runtime/pprof"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. "git.wecise.com/wecise/common/etcd/env"
  13. "git.wecise.com/wecise/odb-go/odb"
  14. "gitee.com/wecisecode/util/logger"
  15. "github.com/cheggaaa/pb"
  16. )
  17. var (
  18. keyspace = flag.String("k", "matrix", "Test keyspace.")
  19. columnNums = flag.Int("n", 10, "Column nums.")
  20. hasIndex = flag.Bool("i", false, "Create index.")
  21. poolsize = flag.Int("p", 100, "Exec pool size.")
  22. statInterval = flag.Int("s", 1, "Stat interval.")
  23. dataSize = flag.Int("d", 1000000, "Test data size.")
  24. colname = flag.String("c", "column", "Column base name.")
  25. ispprof = flag.Bool("f", false, "Output pprof.")
  26. debug = flag.Bool("debug", false, "Log debug.")
  27. //isOrigin = flag.Bool("o", false, "Origin api insert.")
  28. class = flag.String("a", "/test_odbinsert", "Test class name.")
  29. beginNumber = flag.Int("b", 0, "Number of begin value.")
  30. dataWokerPool = 500
  31. wgchan = new(sync.WaitGroup)
  32. wgdata = new(sync.WaitGroup)
  33. )
  34. type object struct {
  35. mql string
  36. values []interface{}
  37. }
  38. func main() {
  39. flag.Parse()
  40. logger.SetConsole(*debug)
  41. etcdenv := os.Getenv("ETCDPATH")
  42. if etcdenv == "" {
  43. fmt.Println("No ETCDPATH.")
  44. os.Exit(1)
  45. }
  46. db := getDB()
  47. defer func() { _ = db.Close() }()
  48. fmt.Println("Odbserver connected.")
  49. prepareTest(db)
  50. fmt.Println("Prepare the test class data to complete.")
  51. // start stat
  52. sch := make(chan int, *poolsize)
  53. // start worker
  54. wch := make(chan *object, *poolsize)
  55. for i := 0; i < *poolsize; i++ {
  56. wgchan.Add(1)
  57. go worker(db, wch, sch)
  58. }
  59. // create data
  60. fmt.Println("Creating data...")
  61. datas := make([]*object, 0)
  62. remainingNums := *dataSize % dataWokerPool
  63. perWorkNums := (*dataSize - (*dataSize % dataWokerPool)) / dataWokerPool
  64. dwkChan := make(chan *[]*object, 1)
  65. if perWorkNums != 0 {
  66. for i := 0; i < dataWokerPool; i++ {
  67. n := i * perWorkNums
  68. go dataWorker(n+*beginNumber, perWorkNums, dwkChan)
  69. }
  70. }
  71. if remainingNums != 0 {
  72. var n int
  73. if perWorkNums != 0 {
  74. n = dataWokerPool*perWorkNums + 1
  75. }
  76. go dataWorker(n+*beginNumber, remainingNums, dwkChan)
  77. }
  78. barCount := dataWokerPool
  79. if remainingNums != 0 {
  80. barCount += 1
  81. }
  82. bar := pb.New(barCount).SetWidth(50)
  83. bar.ShowCounters = false
  84. bar.Start()
  85. for d := range dwkChan {
  86. datas = append(datas, *d...)
  87. bar.Increment()
  88. //time.Sleep(10*time.Millisecond)
  89. if len(datas) == *dataSize {
  90. break
  91. }
  92. }
  93. close(dwkChan)
  94. bar.Finish()
  95. fmt.Printf("Create %d data to complete.\n", len(datas))
  96. wgchan.Add(1)
  97. go stat(sch)
  98. // pprof
  99. if *ispprof {
  100. pf, err := os.Create("odbtest_cpu.pprof")
  101. if err != nil {
  102. fmt.Println(err)
  103. os.Exit(1)
  104. }
  105. defer func() {
  106. _ = pf.Close()
  107. }()
  108. _ = pprof.StartCPUProfile(pf)
  109. defer pprof.StopCPUProfile()
  110. pfMem, err := os.Create("odbtest_mem.pprof")
  111. if err != nil {
  112. fmt.Println(err)
  113. os.Exit(1)
  114. }
  115. defer func() {
  116. _ = pfMem.Close()
  117. }()
  118. _ = pprof.WriteHeapProfile(pfMem)
  119. //tf, err := os.Create("odbtest.trace")
  120. //if err != nil {
  121. // fmt.Println(err)
  122. // os.Exit(1)
  123. //}
  124. //trace.Start(tf)
  125. //defer trace.Stop()
  126. }
  127. fmt.Println("Start to insert data.")
  128. // send data
  129. for _, d := range datas {
  130. wgdata.Add(1)
  131. wch <- d
  132. }
  133. wgdata.Wait()
  134. close(wch)
  135. close(sch)
  136. wgchan.Wait()
  137. fmt.Println("Test finished.")
  138. }
  139. func getDB() odb.Client {
  140. var (
  141. hosts []string
  142. err error
  143. )
  144. s := os.Getenv("ODBPATH")
  145. if s == "" {
  146. hosts, err = env.GetVars("ODBPATH")
  147. if err != nil {
  148. fmt.Println("New db error:", err.Error())
  149. os.Exit(1)
  150. }
  151. } else {
  152. hosts = strings.Split(s, ",")
  153. for i := range hosts {
  154. hosts[i] = strings.TrimSpace(hosts[i])
  155. }
  156. }
  157. fmt.Println("connect to odbserver ", strings.Join(hosts, ","))
  158. db, err := odb.NewClient(&odb.Config{
  159. Keyspace: *keyspace,
  160. Hosts: hosts,
  161. PoolSize: 1000,
  162. Debug: true,
  163. })
  164. if err != nil {
  165. fmt.Println("New db error:", err.Error())
  166. os.Exit(1)
  167. }
  168. //db, err := modb.New(*keyspace)
  169. //if err != nil {
  170. // fmt.Println("New db error:", err.Error())
  171. // os.Exit(1)
  172. //}
  173. return db
  174. }
  175. func prepareTest(db odb.Client) {
  176. var err error
  177. db.Query(`truncate namespace 'odbinsert'`).Do()
  178. table := `create class if not exists %s (
  179. %s,
  180. keys(%s0)%s
  181. )`
  182. cols := make([]string, *columnNums)
  183. var inames []string
  184. for i := range cols {
  185. cols[i] = fmt.Sprintf("%s%d varchar", *colname, i)
  186. inames = append(inames, fmt.Sprintf("%s%d", *colname, i))
  187. }
  188. var idx string
  189. if *hasIndex {
  190. idx = fmt.Sprintf(",\nindexes(%s)", strings.Join(inames, ","))
  191. }
  192. table = fmt.Sprintf(table, *class, strings.Join(cols, ",\n"), *colname, idx) + " with namespace='odbinsert', version = false"
  193. fmt.Println(table)
  194. _, err = db.Query(table).Do()
  195. checkError(err)
  196. }
  197. func worker(db odb.Client, wch chan *object, sch chan int) {
  198. var (
  199. o *object
  200. err error
  201. )
  202. for o = range wch {
  203. if _, err = db.Query(o.mql, o.values...).Do(); err != nil {
  204. fmt.Println("Exec error:", err.Error())
  205. sch <- 2
  206. } else {
  207. sch <- 1
  208. }
  209. wgdata.Done()
  210. }
  211. wgchan.Done()
  212. }
  213. func dataWorker(base, num int, ch chan *[]*object) {
  214. //datas := make([]*object, num)
  215. //for i := 0; i < num; i++ {
  216. // o := &object{}
  217. // cql := "insert into %s %s"
  218. // cols := make([]string, *columnNums)
  219. // values := make([]interface{}, *columnNums)
  220. // for j := range cols {
  221. // cols[j] = *colname + strconv.Itoa(j) + "=?"
  222. // values[j] = fmt.Sprintf("value_%d_%d", base + i, j)
  223. // }
  224. // cql = fmt.Sprintf(cql, *class, strings.Join(cols, ", "))
  225. // o.mql = cql
  226. // o.values = values
  227. // datas[i] = o
  228. //}
  229. //ch <- &datas
  230. datas := make([]*object, num)
  231. for i := 0; i < num; i++ {
  232. o := &object{}
  233. mql := "insert into %s (%s) values (%s)"
  234. cols := make([]string, *columnNums)
  235. qms := make([]string, *columnNums)
  236. values := make([]interface{}, *columnNums)
  237. for j := range cols {
  238. cols[j] = *colname + strconv.Itoa(j)
  239. qms[j] = "?"
  240. values[j] = fmt.Sprintf("value_%d_%d", base+i, j)
  241. }
  242. mql = fmt.Sprintf(mql, *class, strings.Join(cols, ", "), strings.Join(qms, ", "))
  243. o.mql = mql
  244. o.values = values
  245. datas[i] = o
  246. }
  247. ch <- &datas
  248. }
  249. func stat(ch chan int) {
  250. ticker := time.NewTicker(time.Duration(*statInterval) * time.Second)
  251. var (
  252. success int
  253. fail int
  254. )
  255. start := time.Now().Unix()
  256. L:
  257. for {
  258. select {
  259. case <-ticker.C:
  260. now := time.Now().Unix()
  261. speed := int64(success) / (now - start)
  262. if speed == 0 {
  263. continue
  264. }
  265. //countdown := getCountdown(int64(*dataSize - success - fail)/speed)
  266. fmt.Printf("\rStat info: success %d, fail %d, speed %d/s, routines %d.", success, fail, speed, runtime.NumGoroutine())
  267. case n := <-ch:
  268. switch n {
  269. case 1:
  270. success++
  271. case 2:
  272. fail++
  273. default:
  274. now := time.Now().Unix()
  275. speed := int64(success) / (now - start)
  276. //countdown := getCountdown(int64(*dataSize - success - fail)/speed)
  277. fmt.Printf("\rStat info: success %d, fail %d, speed %d/s.", success, fail, speed)
  278. break L
  279. }
  280. }
  281. }
  282. fmt.Println()
  283. wgchan.Done()
  284. }
  285. func checkError(err error) {
  286. if err != nil {
  287. fmt.Println(err)
  288. os.Exit(1)
  289. }
  290. }
  291. //func getCountdown(seconds int64) string {
  292. // var (
  293. // d, h, m, s int64
  294. // )
  295. // if seconds >= 60 {
  296. // m = seconds/60
  297. // s = seconds%60
  298. // if m >= 60 {
  299. // h = m/60
  300. // m = m%60
  301. // if h >= 24 {
  302. // d = h/24
  303. // h = h%24
  304. // }
  305. // }
  306. // }
  307. // if d != 0 {
  308. // return fmt.Sprintf("%dd%dh%dm%ds", d, h, m, s)
  309. // } else {
  310. // if h != 0 {
  311. // return fmt.Sprintf("%dh%dm%ds", h, m, s)
  312. // } else {
  313. // if m != 0 {
  314. // return fmt.Sprintf("%dm%ds", m, s)
  315. // } else {
  316. // return fmt.Sprintf("%ds", s)
  317. // }
  318. // }
  319. // }
  320. //}