nativeinsert.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "os"
  6. "runtime/pprof"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "encoding/json"
  12. "git.wecise.com/wecise/common/etcd/env"
  13. "git.wecise.com/wecise/common/lib/gocql/gocql"
  14. "git.wecise.com/wecise/odb-go/odb"
  15. "gitee.com/wecisecode/util/logger"
  16. "github.com/cheggaaa/pb"
  17. )
  // Command-line flags and shared state of the insert benchmark.
  var (
  	// hosts is the comma-separated Cassandra contact point list used by getSession.
  	hosts = flag.String("h", "", "Cluster hosts.")
  	// replicationFactor is only referenced by the disabled keyspace-creation
  	// code in prepareTest.
  	replicationFactor = flag.String("r", "dc1:3", "Replication factor, format => dc1:3,dc2:3")
  	keyspace          = flag.String("k", "matrix", "Test keyspace.")
  	// columnNums is the number of value columns written by every insert.
  	columnNums = flag.Int("n", 10, "Column nums.")
  	// tableColumnNums is the number of value columns created in the table;
  	// main replaces 0 with *columnNums and expects it to be >= *columnNums.
  	tableColumnNums = flag.Int("tn", 0, "Table column nums, 0 means same as n, must be >= n.")
  	hasIndex        = flag.Bool("i", false, "Create index.")
  	// poolsize is the number of concurrent insert workers started by main.
  	poolsize = flag.Int("p", 100, "Exec pool size.")
  	// statInterval is the reporting period of stat, in seconds.
  	statInterval = flag.Int("s", 1, "Stat interval.")
  	dataSize     = flag.Int("d", 1000000, "Test data size.")
  	colname      = flag.String("c", "column", "Column base name.")
  	ispprof      = flag.Bool("f", false, "Output pprof.")
  	debug        = flag.Bool("debug", false, "Log debug.")
  	//isOrigin = flag.Bool("o", false, "Origin api insert.")
  	isMemory = flag.Bool("m", false, "Memory cache mode.")
  	class    = flag.String("a", "/tobject", "Test class name.")
  	// beginNumber is added to every generated row id.
  	beginNumber    = flag.Int("b", 0, "Number of begin value.")
  	auth           = flag.Bool("auth", false, "Use auth.")
  	user           = flag.String("user", "cassandra", "Username.")
  	pass           = flag.String("pass", "cassandra", "Password.")
  	isLargePartion = flag.Bool("lp", false, "Large-partion data.")
  	// ttl is appended to every insert as "using ttl ?" unless it is -1.
  	ttl    = flag.Int("t", -1, "Test data ttl.")
  	isJson = flag.Bool("j", false, "Json mode.")

  	// dataWokerPool is the number of goroutines used to pre-generate the data.
  	dataWokerPool = 500
  	// wgchan tracks the insert workers and the stat goroutine.
  	wgchan = new(sync.WaitGroup)
  	// wgdata tracks statements handed to the workers but not yet executed.
  	wgdata = new(sync.WaitGroup)
  )
  // object is one pre-built insert: the statement text and the values bound
  // to its "?" placeholders. dataWorker creates them and worker executes them.
  type object struct {
  	mql    string        // statement text passed to db.Query
  	values []interface{} // positional arguments for the statement
  }
  49. func main() {
  50. flag.Parse()
  51. logger.SetConsole(*debug)
  52. etcdenv := os.Getenv("ETCDPATH")
  53. if etcdenv == "" {
  54. fmt.Println("No ETCDPATH.")
  55. os.Exit(1)
  56. }
  57. if *tableColumnNums == 0 {
  58. *tableColumnNums = *columnNums
  59. }
  60. if *tableColumnNums < *columnNums {
  61. fmt.Println("tn must be greater than or equal to n")
  62. }
  63. session := getSession()
  64. defer session.Close()
  65. fmt.Println("Cassandra connected.")
  66. db := getDB()
  67. defer func() { _ = db.Close() }()
  68. fmt.Println("Odbserver connected.")
  69. prepareTest(session)
  70. fmt.Println("Prepare the test class data to complete.")
  71. // start stat
  72. sch := make(chan int, *poolsize)
  73. // start worker
  74. wch := make(chan *object, *poolsize)
  75. for i := 0; i < *poolsize; i++ {
  76. wgchan.Add(1)
  77. go worker(db, wch, sch)
  78. }
  79. // create data
  80. fmt.Println("Creating data...")
  81. datas := make([]*object, 0)
  82. remainingNums := *dataSize % dataWokerPool
  83. perWorkNums := (*dataSize - (*dataSize % dataWokerPool)) / dataWokerPool
  84. dwkChan := make(chan *[]*object, 1)
  85. if perWorkNums != 0 {
  86. for i := 0; i < dataWokerPool; i++ {
  87. n := i * perWorkNums
  88. go dataWorker(n+*beginNumber, perWorkNums, dwkChan)
  89. }
  90. }
  91. if remainingNums != 0 {
  92. var n int
  93. if perWorkNums != 0 {
  94. n = dataWokerPool*perWorkNums + 1
  95. }
  96. go dataWorker(n+*beginNumber, remainingNums, dwkChan)
  97. }
  98. barCount := dataWokerPool
  99. if remainingNums != 0 {
  100. barCount += 1
  101. }
  102. bar := pb.New(barCount).SetWidth(50)
  103. bar.ShowCounters = false
  104. bar.Start()
  105. for d := range dwkChan {
  106. datas = append(datas, *d...)
  107. bar.Increment()
  108. //time.Sleep(10*time.Millisecond)
  109. if len(datas) == *dataSize {
  110. break
  111. }
  112. }
  113. close(dwkChan)
  114. bar.Finish()
  115. fmt.Printf("Create %d data to complete.\n", len(datas))
  116. wgchan.Add(1)
  117. go stat(sch)
  118. // pprof
  119. if *ispprof {
  120. pf, err := os.Create("odbtest_cpu.pprof")
  121. if err != nil {
  122. fmt.Println(err)
  123. os.Exit(1)
  124. }
  125. defer func() {
  126. _ = pf.Close()
  127. }()
  128. _ = pprof.StartCPUProfile(pf)
  129. defer pprof.StopCPUProfile()
  130. pfMem, err := os.Create("odbtest_mem.pprof")
  131. if err != nil {
  132. fmt.Println(err)
  133. os.Exit(1)
  134. }
  135. defer func() {
  136. _ = pfMem.Close()
  137. }()
  138. _ = pprof.WriteHeapProfile(pfMem)
  139. //tf, err := os.Create("odbtest.trace")
  140. //if err != nil {
  141. // fmt.Println(err)
  142. // os.Exit(1)
  143. //}
  144. //trace.Start(tf)
  145. //defer trace.Stop()
  146. }
  147. fmt.Println("Start to insert data.")
  148. // send data
  149. for _, d := range datas {
  150. wgdata.Add(1)
  151. wch <- d
  152. }
  153. wgdata.Wait()
  154. close(wch)
  155. close(sch)
  156. wgchan.Wait()
  157. fmt.Println("Test finished.")
  158. }
  159. func getDB() odb.Client {
  160. var (
  161. hosts []string
  162. err error
  163. )
  164. s := os.Getenv("ODBPATH")
  165. if s == "" {
  166. hosts, err = env.GetVars("ODBPATH")
  167. if err != nil {
  168. fmt.Println("New db error:", err.Error())
  169. os.Exit(1)
  170. }
  171. } else {
  172. hosts = strings.Split(s, ",")
  173. for i := range hosts {
  174. hosts[i] = strings.TrimSpace(hosts[i])
  175. }
  176. }
  177. root_keyspace := *keyspace
  178. if idx := strings.Index(root_keyspace, "_"); idx != -1 {
  179. root_keyspace = root_keyspace[:idx]
  180. }
  181. db, err := odb.NewClient(&odb.Config{
  182. Keyspace: root_keyspace,
  183. Hosts: hosts,
  184. PoolSize: 1000,
  185. Debug: true,
  186. })
  187. if err != nil {
  188. fmt.Println("New db error:", err.Error())
  189. os.Exit(1)
  190. }
  191. //db, err := modb.New(*keyspace)
  192. //if err != nil {
  193. // fmt.Println("New db error:", err.Error())
  194. // os.Exit(1)
  195. //}
  196. return db
  197. }
  // prepareTest recreates the benchmark table <keyspace>.tobject through the
  // given Cassandra session and, when the -i flag is set, its custom Lucene
  // index. The table has the fixed columns id, day and time plus
  // *tableColumnNums varchar columns named <colname>0, <colname>1, ...
  // Every statement error is passed to checkError, which prints it and exits.
  func prepareTest(session *gocql.Session) {
  	var err error
  	// Disabled: drop and recreate the keyspace from the -r replication setting.
  	/*err := session.Query("DROP KEYSPACE IF EXISTS " + *keyspace).Exec()
  	checkError(err)
  	fmt.Println("Drop keyspace wait 5s.")
  	time.Sleep(5*time.Second)
  	dcs := strings.Split(*replicationFactor, ",")
  	strr := []string{}
  	for _, dc := range dcs {
  	dc_r := strings.Split(dc, ":")
  	if len(dc_r) != 2 {
  	fmt.Println("dc replicationFactor format err", *replicationFactor)
  	os.Exit(1)
  	}
  	strr = append(strr, fmt.Sprintf("'%s':%s", dc_r[0], dc_r[1]) )
  	}
  	cql := fmt.Sprintf(`CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = {'class':'NetworkTopologyStrategy', %s}`, *keyspace, strings.Join(strr,",") )
  	fmt.Printf("keyspace: %s.\n", cql)
  	err = session.Query(cql).Exec()
  	checkError(err)
  	fmt.Println("Create keyspace wait 5s.")
  	time.Sleep(5*time.Second)
  	*/
  	// Start every run from an empty table.
  	err = session.Query(fmt.Sprintf(`DROP TABLE IF EXISTS %s.tobject`, *keyspace)).Exec()
  	checkError(err)
  	// Table template: the first %s is the keyspace, the second the generated
  	// value-column definitions.
  	table := `CREATE TABLE %s.tobject (
  id bigint,
  day date,
  %s,
  time timestamp,
  PRIMARY key (id, time)
  ) WITH CLUSTERING ORDER BY (time DESC)`
  	// Memory cache mode adds a caching option to the table definition.
  	if *isMemory {
  		table += ` AND caching = {'keys':'NONE', 'rows_per_partition':'ALL'}`
  	}
  	// One varchar column per table column, named <colname><i>.
  	cols := make([]string, *tableColumnNums)
  	for i := range cols {
  		cols[i] = fmt.Sprintf("%s%d varchar", *colname, i)
  	}
  	table = fmt.Sprintf(table, *keyspace, strings.Join(cols, ",\n"))
  	err = session.Query(table).Exec()
  	checkError(err)
  	if *hasIndex {
  		err = session.Query(fmt.Sprintf(`DROP INDEX IF EXISTS %s.tobject_index`, *keyspace)).Exec()
  		checkError(err)
  		// One string field mapping per value column for the index schema.
  		idxs := make([]string, *tableColumnNums)
  		for i := range idxs {
  			idxs[i] = fmt.Sprintf(`%s%d: {type : "string"}`, *colname, i)
  		}
  		/*
  			'refresh_seconds': '1',
  			'ram_buffer_mb': '256',
  			'max_merge_mb': '20',
  			'max_cached_mb': '120',
  			'indexing_threads': '16',
  			'indexing_queues_size': '200',
  		*/
  		// Index template: the first %s is the keyspace, the second the
  		// generated per-column field mappings.
  		index := `CREATE CUSTOM INDEX tobject_index ON %s.tobject ()
  USING 'com.stratio.cassandra.lucene.Index'
  WITH OPTIONS = {
  'refresh_seconds': '1',
  'indexing_threads': '16',
  'indexing_queues_size': '200',
  'schema' : '{
  fields : {
  id : {type: "integer"},
  day : {type: "date", pattern: "yyyy-MM-dd"},
  time : {type: "date", pattern: "yyyy-MM-dd HH:mm:ss.SSS"},
  %s
  }
  }'
  }`
  		index = fmt.Sprintf(index, *keyspace, strings.Join(idxs, ",\n"))
  		//fmt.Println(index)
  		err = session.Query(index).Exec()
  		checkError(err)
  	}
  }
  276. func worker(db odb.Client, wch chan *object, sch chan int) {
  277. var (
  278. o *object
  279. err error
  280. )
  281. for o = range wch {
  282. if _, err = db.Query(o.mql, o.values...).Do(); err != nil {
  283. fmt.Println("Exec error:", err.Error())
  284. sch <- 2
  285. } else {
  286. sch <- 1
  287. }
  288. wgdata.Done()
  289. }
  290. wgchan.Done()
  291. }
  292. func dataWorker(base, num int, ch chan *[]*object) {
  293. datas := make([]*object, num)
  294. lpNums := 3 // Large-partition nums
  295. if *isJson {
  296. for i := 0; i < num; i++ {
  297. o := &object{}
  298. cql := "insert into /native/tobject json ? default unset"
  299. cols := make([]string, *columnNums)
  300. for k := range cols {
  301. cols[k] = *colname + strconv.Itoa(k)
  302. }
  303. m := make(map[string]interface{})
  304. now := time.Now()
  305. if *isLargePartion {
  306. m["id"] = (base + i) % lpNums
  307. } else {
  308. m["id"] = base + i
  309. }
  310. m["day"] = now.Format("2006-01-02")
  311. m["time"] = now.UnixNano() / int64(time.Millisecond)
  312. for j, c := range cols {
  313. if *isLargePartion {
  314. m[c] = fmt.Sprintf("value_%d_%d", (base+i)%lpNums, j-3)
  315. } else {
  316. m[c] = fmt.Sprintf("value_%d_%d", base+i, j-3)
  317. }
  318. }
  319. b, _ := json.Marshal(m)
  320. o.values = []interface{}{string(b)}
  321. if *ttl != -1 {
  322. cql += " using ttl ?"
  323. o.values = append(o.values, *ttl)
  324. }
  325. o.mql = cql
  326. //fmt.Println(o.mql)
  327. //fmt.Println(o.values)
  328. datas[i] = o
  329. }
  330. } else {
  331. for i := 0; i < num; i++ {
  332. o := &object{}
  333. cql := "insert into /native/tobject (id, day, time, %s) values (?, ?, ?, %s)"
  334. cols := make([]string, *columnNums)
  335. vals := make([]string, *columnNums)
  336. for k := range cols {
  337. cols[k] = *colname + strconv.Itoa(k)
  338. vals[k] = "?"
  339. }
  340. cql = fmt.Sprintf(cql, strings.Join(cols, ", "), strings.Join(vals, ", "))
  341. values := make([]interface{}, *columnNums+3)
  342. if *isLargePartion {
  343. values[0] = (base + i) % lpNums
  344. } else {
  345. values[0] = base + i
  346. }
  347. now := time.Now()
  348. values[1] = now.Format("2006-01-02")
  349. values[2] = now.UnixNano() / int64(time.Millisecond)
  350. for j := range values {
  351. if j > 2 {
  352. if *isLargePartion {
  353. values[j] = fmt.Sprintf("value_%d_%d", (base+i)%3, j-3)
  354. } else {
  355. values[j] = fmt.Sprintf("value_%d_%d", base+i, j-3)
  356. }
  357. }
  358. }
  359. if *ttl != -1 {
  360. cql += " using ttl ?"
  361. values = append(values, *ttl)
  362. }
  363. o.mql = cql
  364. o.values = values
  365. datas[i] = o
  366. }
  367. }
  368. ch <- &datas
  369. }
  370. func stat(ch chan int) {
  371. ticker := time.NewTicker(time.Duration(*statInterval) * time.Second)
  372. var (
  373. success int
  374. fail int
  375. )
  376. start := time.Now().Unix()
  377. L:
  378. for {
  379. select {
  380. case <-ticker.C:
  381. now := time.Now().Unix()
  382. speed := int64(success) / (now - start)
  383. if speed == 0 {
  384. continue
  385. }
  386. //countdown := getCountdown(int64(*dataSize - success - fail)/speed)
  387. fmt.Printf("\rStat info: success %d, fail %d, speed %d/s.", success, fail, speed)
  388. case n := <-ch:
  389. switch n {
  390. case 1:
  391. success++
  392. case 2:
  393. fail++
  394. default:
  395. now := time.Now().Unix()
  396. speed := int64(success) / (now - start)
  397. //countdown := getCountdown(int64(*dataSize - success - fail)/speed)
  398. fmt.Printf("\rStat info: success %d, fail %d, speed %d/s.", success, fail, speed)
  399. break L
  400. }
  401. }
  402. }
  403. fmt.Println()
  404. wgchan.Done()
  405. }
  // checkError does nothing for a nil error; otherwise it prints the error
  // and terminates the process with exit code 1.
  func checkError(err error) {
  	if err == nil {
  		return
  	}
  	fmt.Println(err)
  	os.Exit(1)
  }
  412. func getSession() *gocql.Session {
  413. hostList := strings.Split(*hosts, ",")
  414. for i, h := range hostList {
  415. hostList[i] = strings.TrimSpace(h)
  416. }
  417. cluster := gocql.NewCluster(hostList...)
  418. cluster.Keyspace = "system"
  419. cluster.Timeout = 120 * time.Second
  420. cluster.Consistency = gocql.LocalOne
  421. cluster.ProtoVersion = 4 // default 2
  422. cluster.NumConns = 2
  423. cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
  424. if *auth {
  425. cluster.Authenticator = gocql.PasswordAuthenticator{
  426. Username: *user,
  427. Password: *pass,
  428. }
  429. }
  430. var session *gocql.Session
  431. session, err := cluster.CreateSession()
  432. if err != nil {
  433. fmt.Println("Create session error:", err.Error())
  434. os.Exit(1)
  435. }
  436. return session
  437. }
  438. //func getCountdown(seconds int64) string {
  439. // var (
  440. // d, h, m, s int64
  441. // )
  442. // if seconds >= 60 {
  443. // m = seconds/60
  444. // s = seconds%60
  445. // if m >= 60 {
  446. // h = m/60
  447. // m = m%60
  448. // if h >= 24 {
  449. // d = h/24
  450. // h = h%24
  451. // }
  452. // }
  453. // }
  454. // if d != 0 {
  455. // return fmt.Sprintf("%dd%dh%dm%ds", d, h, m, s)
  456. // } else {
  457. // if h != 0 {
  458. // return fmt.Sprintf("%dh%dm%ds", h, m, s)
  459. // } else {
  460. // if m != 0 {
  461. // return fmt.Sprintf("%dm%ds", m, s)
  462. // } else {
  463. // return fmt.Sprintf("%ds", s)
  464. // }
  465. // }
  466. // }
  467. //}