cassinsert.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478
  1. package main
  2. import (
  3. "encoding/json"
  4. "flag"
  5. "fmt"
  6. "os"
  7. "runtime/pprof"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. "git.wecise.com/wecise/common/lib/gocql/gocql"
  13. "github.com/cheggaaa/pb"
  14. )
  15. var (
  16. hosts = flag.String("h", "", "Cluster hosts.")
  17. replicationFactor = flag.String("r", "dc1:3", "Replication factor, format => dc1:3,dc2:3")
  18. columnNums = flag.Int("n", 10, "Column nums.")
  19. tableColumnNums = flag.Int("tn", 0, "Column nums.")
  20. hasIndex = flag.Bool("i", false, "Create index.")
  21. poolsize = flag.Int("p", 100, "Exec pool size.")
  22. //statInterval = flag.Int("s", 10, "Stat interval.")
  23. dataSize = flag.Int("d", 1000000, "Test data size.")
  24. colname = flag.String("c", "column", "Column base name.")
  25. ispprof = flag.Bool("f", false, "Output pprof.")
  26. isJson = flag.Bool("j", false, "Json mode.")
  27. isMemory = flag.Bool("m", false, "Memory cache mode.")
  28. keyspace = flag.String("k", "test", "Test keyspace.")
  29. ttl = flag.Int("t", -1, "Test data ttl.")
  30. notPrepare = flag.Bool("np", false, "Not prepare.")
  31. isLargePartion = flag.Bool("lp", false, "Large-partion data.")
  32. auth = flag.Bool("auth", false, "Use auth.")
  33. user = flag.String("user", "cassandra", "Username.")
  34. pass = flag.String("pass", "cassandra", "Password.")
  35. dataWokerPool = 500
  36. wgchan = new(sync.WaitGroup)
  37. wgdata = new(sync.WaitGroup)
  38. )
  39. type object struct {
  40. cql string
  41. values []interface{}
  42. }
  43. func main() {
  44. flag.Parse()
  45. if *hosts == "" {
  46. fmt.Println("No hosts.")
  47. os.Exit(1)
  48. }
  49. if *tableColumnNums == 0 {
  50. *tableColumnNums = *columnNums
  51. }
  52. if *tableColumnNums < *columnNums {
  53. fmt.Println("tn must be greater than or equal to n")
  54. }
  55. session := getSession()
  56. defer session.Close()
  57. fmt.Println("Cassandra connected.")
  58. if *notPrepare {
  59. fmt.Println("Not Prepare.")
  60. } else {
  61. prepareTest(session)
  62. fmt.Println("Prepare the test table space to complete.")
  63. }
  64. // start stat
  65. sch := make(chan int, *poolsize)
  66. // start worker
  67. wch := make(chan *object, *poolsize)
  68. for i := 0; i < *poolsize; i++ {
  69. wgchan.Add(1)
  70. go worker(session, wch, sch)
  71. }
  72. // create data
  73. fmt.Println("Creating data...")
  74. datas := make([]*object, 0)
  75. remainingNums := *dataSize % dataWokerPool
  76. perWorkNums := (*dataSize - (*dataSize % dataWokerPool)) / dataWokerPool
  77. dwkChan := make(chan *[]*object, 1)
  78. if perWorkNums != 0 {
  79. for i := 0; i < dataWokerPool; i++ {
  80. n := i * perWorkNums
  81. go dataWorker(n, perWorkNums, dwkChan)
  82. }
  83. }
  84. if remainingNums != 0 {
  85. var n int
  86. if perWorkNums != 0 {
  87. n = dataWokerPool*perWorkNums + 1
  88. }
  89. go dataWorker(n, remainingNums, dwkChan)
  90. }
  91. barCount := dataWokerPool
  92. if remainingNums != 0 {
  93. barCount += 1
  94. }
  95. bar := pb.New(barCount).SetWidth(50)
  96. bar.ShowCounters = false
  97. bar.Start()
  98. for d := range dwkChan {
  99. datas = append(datas, *d...)
  100. bar.Increment()
  101. //time.Sleep(10*time.Millisecond)
  102. if len(datas) == *dataSize {
  103. break
  104. }
  105. }
  106. close(dwkChan)
  107. bar.Finish()
  108. //datas := make([]*object, *dataSize)
  109. //for i := 0; i < *dataSize; i++ {
  110. // o := &object{}
  111. // cql := "insert into test.tobject (id, day, time, %s) values (?, ?, ?, %s)"
  112. // cols := make([]string, *columnNums)
  113. // vals := make([]string, *columnNums)
  114. // for k := range cols {
  115. // cols[k] = *colname + strconv.Itoa(k)
  116. // vals[k] = "?"
  117. // }
  118. // cql = fmt.Sprintf(cql, strings.Join(cols, ", "), strings.Join(vals, ", "))
  119. // values := make([]interface{}, *columnNums + 3)
  120. // values[0] = i
  121. // now := time.Now()
  122. // values[1] = now.Format("2006-01-02")
  123. // values[2] = now.UnixNano()/int64(time.Millisecond)
  124. // for j := range values {
  125. // if j > 2 {
  126. // values[j] = fmt.Sprintf("value_%d_%d", i, j - 3)
  127. // }
  128. // }
  129. // o.cql = cql
  130. // o.values = values
  131. // datas[i] = o
  132. //}
  133. fmt.Printf("Create %d data to complete.\n", len(datas))
  134. wgchan.Add(1)
  135. go stat(sch)
  136. // pprof
  137. if *ispprof {
  138. pf, err := os.Create("casstest_cpu.pprof")
  139. if err != nil {
  140. fmt.Println(err)
  141. os.Exit(1)
  142. }
  143. defer func() {
  144. _ = pf.Close()
  145. }()
  146. _ = pprof.StartCPUProfile(pf)
  147. defer pprof.StopCPUProfile()
  148. pfMem, err := os.Create("casstest_mem.pprof")
  149. if err != nil {
  150. fmt.Println(err)
  151. os.Exit(1)
  152. }
  153. defer func() {
  154. _ = pfMem.Close()
  155. }()
  156. _ = pprof.WriteHeapProfile(pfMem)
  157. //tf, err := os.Create("casstest.trace")
  158. //if err != nil {
  159. // fmt.Println(err)
  160. // os.Exit(1)
  161. //}
  162. //trace.Start(tf)
  163. //defer trace.Stop()
  164. }
  165. fmt.Println("Start to insert data.")
  166. // send data
  167. for _, d := range datas {
  168. wgdata.Add(1)
  169. wch <- d
  170. }
  171. wgdata.Wait()
  172. close(wch)
  173. close(sch)
  174. wgchan.Wait()
  175. fmt.Println("Test finished.")
  176. }
  177. func getSession() *gocql.Session {
  178. hostList := strings.Split(*hosts, ",")
  179. for i, h := range hostList {
  180. hostList[i] = strings.TrimSpace(h)
  181. }
  182. cluster := gocql.NewCluster(hostList...)
  183. cluster.Keyspace = "system"
  184. cluster.Timeout = 120 * time.Second
  185. cluster.Consistency = gocql.LocalOne
  186. cluster.ProtoVersion = 4 // default 2
  187. cluster.NumConns = 2
  188. cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
  189. if *auth {
  190. cluster.Authenticator = gocql.PasswordAuthenticator{
  191. Username: *user,
  192. Password: *pass,
  193. }
  194. }
  195. var session *gocql.Session
  196. session, err := cluster.CreateSession()
  197. if err != nil {
  198. fmt.Println("Create session error:", err.Error())
  199. os.Exit(1)
  200. }
  201. return session
  202. }
  203. func prepareTest(session *gocql.Session) {
  204. err := session.Query("DROP KEYSPACE IF EXISTS " + *keyspace).Exec()
  205. checkError(err)
  206. fmt.Println("Drop keyspace wait 5s.")
  207. time.Sleep(5 * time.Second)
  208. dcs := strings.Split(*replicationFactor, ",")
  209. strr := []string{}
  210. for _, dc := range dcs {
  211. dc_r := strings.Split(dc, ":")
  212. if len(dc_r) != 2 {
  213. fmt.Println("dc replicationFactor format err", *replicationFactor)
  214. os.Exit(1)
  215. }
  216. strr = append(strr, fmt.Sprintf("'%s':%s", dc_r[0], dc_r[1]))
  217. }
  218. cql := fmt.Sprintf(`CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = {'class':'NetworkTopologyStrategy', %s}`, *keyspace, strings.Join(strr, ","))
  219. fmt.Printf("keyspace: %s.\n", cql)
  220. err = session.Query(cql).Exec()
  221. checkError(err)
  222. fmt.Println("Create keyspace wait 5s.")
  223. time.Sleep(5 * time.Second)
  224. err = session.Query(fmt.Sprintf(`DROP TABLE IF EXISTS %s.tobject`, *keyspace)).Exec()
  225. checkError(err)
  226. table := `CREATE TABLE %s.tobject (
  227. id bigint,
  228. day date,
  229. %s,
  230. time timestamp,
  231. PRIMARY key (id, time)
  232. ) WITH CLUSTERING ORDER BY (time DESC)`
  233. if *isMemory {
  234. table += ` AND caching = {'keys':'NONE', 'rows_per_partition':'ALL'}`
  235. }
  236. cols := make([]string, *tableColumnNums)
  237. for i := range cols {
  238. cols[i] = fmt.Sprintf("%s%d varchar", *colname, i)
  239. }
  240. table = fmt.Sprintf(table, *keyspace, strings.Join(cols, ",\n"))
  241. err = session.Query(table).Exec()
  242. checkError(err)
  243. if *hasIndex {
  244. err = session.Query(fmt.Sprintf(`DROP INDEX IF EXISTS %s.tobject_index`, *keyspace)).Exec()
  245. checkError(err)
  246. idxs := make([]string, *tableColumnNums)
  247. for i := range idxs {
  248. idxs[i] = fmt.Sprintf(`%s%d: {type : "string"}`, *colname, i)
  249. }
  250. /*
  251. 'refresh_seconds': '1',
  252. 'ram_buffer_mb': '256',
  253. 'max_merge_mb': '20',
  254. 'max_cached_mb': '120',
  255. 'indexing_threads': '16',
  256. 'indexing_queues_size': '200',
  257. */
  258. index := `CREATE CUSTOM INDEX tobject_index ON %s.tobject ()
  259. USING 'com.stratio.cassandra.lucene.Index'
  260. WITH OPTIONS = {
  261. 'refresh_seconds': '1',
  262. 'indexing_threads': '16',
  263. 'indexing_queues_size': '200',
  264. 'schema' : '{
  265. fields : {
  266. id : {type: "integer"},
  267. day : {type: "date", pattern: "yyyy-MM-dd"},
  268. time : {type: "date", pattern: "yyyy-MM-dd HH:mm:ss.SSS"},
  269. %s
  270. }
  271. }'
  272. }`
  273. index = fmt.Sprintf(index, *keyspace, strings.Join(idxs, ",\n"))
  274. err = session.Query(index).Exec()
  275. checkError(err)
  276. }
  277. }
  278. func worker(session *gocql.Session, wch chan *object, sch chan int) {
  279. var (
  280. o *object
  281. err error
  282. )
  283. for o = range wch {
  284. if err = session.Query(o.cql, o.values...).Exec(); err != nil {
  285. fmt.Println("Insert error:", err.Error())
  286. sch <- 2
  287. } else {
  288. sch <- 1
  289. }
  290. wgdata.Done()
  291. }
  292. wgchan.Done()
  293. }
  294. func dataWorker(base, num int, ch chan *[]*object) {
  295. datas := make([]*object, num)
  296. lpNums := 3 // Large-partition nums
  297. if *isJson {
  298. for i := 0; i < num; i++ {
  299. o := &object{}
  300. cql := fmt.Sprintf("insert into %s.tobject json ? default unset", *keyspace)
  301. cols := make([]string, *columnNums)
  302. for k := range cols {
  303. cols[k] = *colname + strconv.Itoa(k)
  304. }
  305. m := make(map[string]interface{})
  306. now := time.Now()
  307. if *isLargePartion {
  308. m["id"] = (base + i) % lpNums
  309. } else {
  310. m["id"] = base + i
  311. }
  312. m["day"] = now.Format("2006-01-02")
  313. m["time"] = now.UnixNano() / int64(time.Millisecond)
  314. for j, c := range cols {
  315. if *isLargePartion {
  316. m[c] = fmt.Sprintf("value_%d_%d", (base+i)%lpNums, j-3)
  317. } else {
  318. m[c] = fmt.Sprintf("value_%d_%d", base+i, j-3)
  319. }
  320. }
  321. b, _ := json.Marshal(m)
  322. o.values = []interface{}{string(b)}
  323. if *ttl != -1 {
  324. cql += " using ttl ?"
  325. o.values = append(o.values, *ttl)
  326. }
  327. o.cql = cql
  328. datas[i] = o
  329. }
  330. } else {
  331. for i := 0; i < num; i++ {
  332. o := &object{}
  333. cql := "insert into %s.tobject (id, day, time, %s) values (?, ?, ?, %s)"
  334. cols := make([]string, *columnNums)
  335. vals := make([]string, *columnNums)
  336. for k := range cols {
  337. cols[k] = *colname + strconv.Itoa(k)
  338. vals[k] = "?"
  339. }
  340. cql = fmt.Sprintf(cql, *keyspace, strings.Join(cols, ", "), strings.Join(vals, ", "))
  341. values := make([]interface{}, *columnNums+3)
  342. if *isLargePartion {
  343. values[0] = (base + i) % lpNums
  344. } else {
  345. values[0] = base + i
  346. }
  347. now := time.Now()
  348. values[1] = now.Format("2006-01-02")
  349. values[2] = now.UnixNano() / int64(time.Millisecond)
  350. for j := range values {
  351. if j > 2 {
  352. if *isLargePartion {
  353. values[j] = fmt.Sprintf("value_%d_%d", (base+i)%3, j-3)
  354. } else {
  355. values[j] = fmt.Sprintf("value_%d_%d", base+i, j-3)
  356. }
  357. }
  358. }
  359. if *ttl != -1 {
  360. cql += " using ttl ?"
  361. values = append(values, *ttl)
  362. }
  363. o.cql = cql
  364. o.values = values
  365. datas[i] = o
  366. }
  367. }
  368. ch <- &datas
  369. }
  370. func stat(ch chan int) {
  371. var (
  372. success int
  373. fail int
  374. speed int64
  375. )
  376. start := time.Now().Unix()
  377. bar := pb.New(*dataSize).SetWidth(50)
  378. bar.ShowCounters = false
  379. bar.RefreshRate = 1 * time.Second
  380. bar.Callback = func(out string) {
  381. if speed == 0 {
  382. return
  383. }
  384. //countdown := getCountdown(int64(*dataSize - success - fail)/speed)
  385. fmt.Printf("\r%s, %s", out, fmt.Sprintf("success %d, fail %d, speed %d/s.", success, fail, speed))
  386. }
  387. bar.Start()
  388. for n := range ch {
  389. switch n {
  390. case 1:
  391. success++
  392. now := time.Now().Unix()
  393. if now-start > 0 {
  394. speed = int64(success) / (now - start)
  395. }
  396. bar.Increment()
  397. case 2:
  398. fail++
  399. bar.Increment()
  400. }
  401. }
  402. bar.Finish()
  403. wgchan.Done()
  404. }
  405. func checkError(err error) {
  406. if err != nil {
  407. fmt.Println(err)
  408. os.Exit(1)
  409. }
  410. }
  411. //func getCountdown(seconds int64) string {
  412. // var (
  413. // d, h, m, s int64
  414. // )
  415. // if seconds >= 60 {
  416. // m = seconds/60
  417. // s = seconds%60
  418. // if m >= 60 {
  419. // h = m/60
  420. // m = m%60
  421. // if h >= 24 {
  422. // d = h/24
  423. // h = h%24
  424. // }
  425. // }
  426. // }
  427. // if d != 0 {
  428. // return fmt.Sprintf("%dd%dh%dm%ds", d, h, m, s)
  429. // } else {
  430. // if h != 0 {
  431. // return fmt.Sprintf("%dh%dm%ds", h, m, s)
  432. // } else {
  433. // if m != 0 {
  434. // return fmt.Sprintf("%dm%ds", m, s)
  435. // } else {
  436. // return fmt.Sprintf("%ds", s)
  437. // }
  438. // }
  439. // }
  440. //}