| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
- package main
- import (
- "encoding/json"
- "flag"
- "fmt"
- "os"
- "runtime/pprof"
- "strconv"
- "sync"
- "sync/atomic"
- "time"
- "git.wecise.com/wecise/common/mnats"
- "git.wecise.com/wecise/odbserver/odb"
- )
- var (
- isCreateObject = flag.Bool("o", false, "Create object data.")
- isSendParser = flag.Bool("p", false, "Send mql to cassandra service of parser.")
- ds = flag.Int("d", 100000, "Data size.")
- ws = flag.Int("w", 40, "Worker size.")
- firstErr string
- )
- func main() {
- flag.Parse()
- g, err := odb.New(&odb.Option{Cache: odb.CacheAll})
- if err != nil {
- fmt.Println("New odb error:", err)
- os.Exit(1)
- }
- dataSize := *ds
- workerSize := *ws
- if *isCreateObject {
- fmt.Println("Create bucket object.")
- if err = createObject(dataSize, g); err != nil {
- fmt.Println("Create object error:", err)
- os.Exit(2)
- }
- } else {
- fmt.Println("Ignore creating data.")
- }
- fmt.Println("Prepare data size:", dataSize)
- data := prepareMQL(dataSize)
- var wg sync.WaitGroup
- wg.Add(dataSize)
- ch := make(chan []interface{}, workerSize)
- statCh := make(chan bool, workerSize)
- stopCh := make(chan bool, 1)
- conn, err := mnats.Get()
- if err != nil {
- fmt.Println(err)
- os.Exit(5)
- }
- // stats
- go func() {
- var (
- total, success, fail int64
- speed float64
- )
- ticker := time.NewTicker(time.Second)
- start := time.Now()
- L:
- for {
- select {
- case <-stopCh:
- break L
- case ok := <-statCh:
- total++
- if ok {
- success++
- } else {
- fail++
- }
- case <-ticker.C:
- speed = float64(total) / time.Now().Sub(start).Seconds()
- fmt.Printf("\rtotal:%d, success:%d, fail:%d, spped:%.2f/s", total, success, fail, speed)
- }
- }
- }()
- // cpu pprof
- pf, err := os.Create("testbucket_cpu.pprof")
- if err != nil {
- fmt.Println(err)
- os.Exit(3)
- }
- defer pf.Close()
- if err = pprof.StartCPUProfile(pf); err != nil {
- fmt.Println(err)
- os.Exit(3)
- }
- defer pprof.StopCPUProfile()
- // mem pprof
- go func() {
- ticker := time.NewTicker(time.Second * 5)
- L:
- for {
- select {
- case <-stopCh:
- break L
- case <-ticker.C:
- func() {
- pmf, err := os.OpenFile("testbucket_mem.pprof", os.O_CREATE|os.O_TRUNC|os.O_RDWR, os.ModePerm)
- if err != nil {
- fmt.Println(err)
- os.Exit(4)
- }
- defer pmf.Close()
- if err = pprof.WriteHeapProfile(pmf); err != nil {
- fmt.Println(err)
- os.Exit(4)
- }
- }()
- }
- }
- }()
- // mql worker
- for i := 0; i < workerSize; i++ {
- go func() {
- ps, err := g.Prepare(`insert into /test/bucket2 (id, values) values (?, ?) at ?`)
- if err != nil {
- fmt.Println(err)
- os.Exit(5)
- }
- m := map[string]interface{}{
- "mql": `insert into /test/bucket2 (id, values) values (?, ?) at ?`,
- }
- for d := range ch {
- if *isSendParser {
- m["values"] = d
- b, _ := json.Marshal(m)
- if err = conn.Publish("queue.cassandra.query.matrix", b); err != nil {
- statCh <- false
- } else {
- statCh <- true
- }
- } else {
- if _, _, err = ps.Exec(d...); err != nil {
- statCh <- false
- } else {
- statCh <- true
- }
- }
- if err != nil && firstErr == "" {
- firstErr = err.Error()
- }
- wg.Done()
- }
- }()
- }
- // mql sender
- for i := range data {
- ch <- data[i]
- }
- wg.Wait()
- time.Sleep(time.Second)
- close(stopCh)
- fmt.Println()
- if firstErr != "" {
- fmt.Println("Error info:", firstErr)
- }
- }
- func createObject(num int, g *odb.Gutil) error {
- ps, err := g.Prepare("insert into /test/bucket2 (id, devicename, deviceoid, ip, perfstructure, desc) values (?, ?, ?, ?, ?, ?)")
- if err != nil {
- return err
- }
- defer fmt.Println()
- var wg sync.WaitGroup
- ch := make(chan []interface{}, 20)
- var count int64
- go func() {
- for count <= int64(num) {
- fmt.Printf("\r%d/%d", count, num)
- time.Sleep(time.Millisecond * 500)
- }
- }()
- for i := 0; i < 20; i++ {
- go func() {
- for v := range ch {
- if _, _, err = ps.Exec(v...); err != nil {
- if firstErr == "" {
- firstErr = err.Error()
- }
- } else {
- atomic.AddInt64(&count, 1)
- }
- wg.Done()
- }
- }()
- }
- wg.Add(num)
- for i := 0; i < num; i++ {
- no := strconv.FormatInt(int64(i+1), 10)
- ch <- []interface{}{"ID" + no, "DN" + no, "OID" + no, "10.0.0.1", "PERFSTRUCTURE" + no, "DESC" + no}
- }
- wg.Wait()
- close(ch)
- time.Sleep(time.Second)
- return nil
- }
- func prepareMQL(num int) [][]interface{} {
- t := time.Now().Format("2006-01-02 15:04:05")
- data := make([][]interface{}, num)
- paramsSize := 20
- params := make([]interface{}, paramsSize)
- for i := 0; i < paramsSize; i++ {
- params[i] = i + 1
- }
- for i := 0; i < num; i++ {
- no := strconv.FormatInt(int64(i+1), 10)
- d := []interface{}{"ID" + no, params, t}
- data[i] = d
- }
- return data
- }
- /*
- create class if not exists /test/bucket1 (
- host varchar,
- real map<varchar,varchar>,
- bk1 bucket {
- "type" : "tsdb",
- "size" : 10
- } 'simple',
- bk2 bucket {
- "type" : "tsdb",
- "param" : ["iparam1","iparam2","fparam1","fparam2","sparam1"],
- "ptype" : ["i","i","f","f","s"],
- "unit" : ["%","m","ms","ns",""],
- "precision":[0,0,3,4,0],
- "ttl" : 365
- } 'full',
- keys(host),
- indexes(host)
- );
- create class if not exists /test/bucket2 (
- devicename varchar '设备名称',
- deviceoid varchar 'OID',
- ip varchar 'ip',
- perfstructure varchar '性能结构描述',
- desc varchar '设备描述',
- values bucket {
- "type": "tsdb",
- "params": [
- ["${1-5}","int","","${desc[0-4]}"],
- ["${6-9,11-12,15-16,19-20}","int","MB","${desc[5-8,10-11,14-15,18-19]}"],
- ["10","int","%","${desc[9]}"],
- ["${13-14,17-18}","int","KB/s","${desc[12-13,16-17]}"]
- ],
- "desc": ["在线点","及早期火警","及早期预警","DO1","DO2","系统分区总容量","系统分区剩余量","数据分区总容量","数据分区剩余量","CUP使用率","内存总容量","内存剩余量","网卡1上行速度","网卡1下行速度","网卡1上行总量","网卡1下行总量","网卡2上行速度","网卡2下行速度","网卡2上行总量","网卡2下行总量"],
- "ttl": 365
- } 'full',
- keys(devicename,deviceoid,ip),
- indexes(devicename,deviceoid,ip,perfstructure,desc)
- );
- */
|