// Command testbucket is a load-test driver for the /test/bucket2 class:
// it optionally creates object rows, then pushes `insert` MQL statements
// either directly through a prepared ODB statement or via the NATS-based
// cassandra parser queue, reporting throughput once a second and writing
// CPU/heap pprof profiles.
package main

import (
	"encoding/json"
	"flag"
	"fmt"
	"os"
	"runtime/pprof"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"git.wecise.com/wecise/common/mnats"
	"git.wecise.com/wecise/odbserver/odb"
)

var (
	isCreateObject = flag.Bool("o", false, "Create object data.")
	isSendParser   = flag.Bool("p", false, "Send mql to cassandra service of parser.")
	ds             = flag.Int("d", 100000, "Data size.")
	ws             = flag.Int("w", 40, "Worker size.")

	// firstErr holds the first error observed by any worker goroutine.
	// It is guarded by firstErrMu: the original code wrote it from many
	// goroutines without synchronization, which is a data race.
	firstErrMu sync.Mutex
	firstErr   string
)

// recordFirstErr stores err.Error() as the first observed error if none
// has been recorded yet. Safe for concurrent use; nil errors are ignored.
func recordFirstErr(err error) {
	if err == nil {
		return
	}
	firstErrMu.Lock()
	if firstErr == "" {
		firstErr = err.Error()
	}
	firstErrMu.Unlock()
}

func main() {
	flag.Parse()

	g, err := odb.New(&odb.Option{Cache: odb.CacheAll})
	if err != nil {
		fmt.Println("New odb error:", err)
		os.Exit(1)
	}

	dataSize := *ds
	workerSize := *ws

	if *isCreateObject {
		fmt.Println("Create bucket object.")
		if err = createObject(dataSize, g); err != nil {
			fmt.Println("Create object error:", err)
			os.Exit(2)
		}
	} else {
		fmt.Println("Ignore creating data.")
	}

	fmt.Println("Prepare data size:", dataSize)
	data := prepareMQL(dataSize)

	var wg sync.WaitGroup
	wg.Add(dataSize)
	ch := make(chan []interface{}, workerSize)
	statCh := make(chan bool, workerSize)
	stopCh := make(chan bool, 1)

	conn, err := mnats.Get()
	if err != nil {
		fmt.Println(err)
		os.Exit(5)
	}

	// Stats reporter: tallies results arriving on statCh and prints a
	// throughput line once a second until stopCh is closed.
	go func() {
		var total, success, fail int64
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		start := time.Now()
		for {
			select {
			case <-stopCh:
				return
			case ok := <-statCh:
				total++
				if ok {
					success++
				} else {
					fail++
				}
			case <-ticker.C:
				speed := float64(total) / time.Since(start).Seconds()
				// "speed" was misspelled "spped" in the original output.
				fmt.Printf("\rtotal:%d, success:%d, fail:%d, speed:%.2f/s", total, success, fail, speed)
			}
		}
	}()

	// CPU profile covering the whole run.
	pf, err := os.Create("testbucket_cpu.pprof")
	if err != nil {
		fmt.Println(err)
		os.Exit(3)
	}
	defer pf.Close()
	if err = pprof.StartCPUProfile(pf); err != nil {
		fmt.Println(err)
		os.Exit(3)
	}
	defer pprof.StopCPUProfile()

	// Heap profile snapshot every five seconds, overwriting the same file,
	// until stopCh is closed.
	go func() {
		ticker := time.NewTicker(time.Second * 5)
		defer ticker.Stop()
		for {
			select {
			case <-stopCh:
				return
			case <-ticker.C:
				// Closure so the file is closed promptly each tick
				// rather than piling up defers in the loop.
				func() {
					pmf, err := os.OpenFile("testbucket_mem.pprof", os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0o644)
					if err != nil {
						fmt.Println(err)
						os.Exit(4)
					}
					defer pmf.Close()
					if err = pprof.WriteHeapProfile(pmf); err != nil {
						fmt.Println(err)
						os.Exit(4)
					}
				}()
			}
		}
	}()

	// MQL workers: each drains ch and either publishes the statement to the
	// parser queue (-p) or executes it through a prepared ODB statement.
	for i := 0; i < workerSize; i++ {
		go func() {
			ps, err := g.Prepare(`insert into /test/bucket2 (id, values) values (?, ?) at ?`)
			if err != nil {
				fmt.Println(err)
				os.Exit(5)
			}
			m := map[string]interface{}{
				"mql": `insert into /test/bucket2 (id, values) values (?, ?) at ?`,
			}
			for d := range ch {
				if *isSendParser {
					m["values"] = d
					b, _ := json.Marshal(m)
					err = conn.Publish("queue.cassandra.query.matrix", b)
				} else {
					_, _, err = ps.Exec(d...)
				}
				statCh <- err == nil
				recordFirstErr(err)
				wg.Done()
			}
		}()
	}

	// Feed the workers, then close ch so they exit once the queue drains
	// (the original never closed ch, leaking all worker goroutines).
	for i := range data {
		ch <- data[i]
	}
	close(ch)

	wg.Wait()
	// Give the stats goroutine a moment to drain statCh and print the
	// final counters before shutting it down.
	time.Sleep(time.Second)
	close(stopCh)

	fmt.Println()
	firstErrMu.Lock()
	if firstErr != "" {
		fmt.Println("Error info:", firstErr)
	}
	firstErrMu.Unlock()
}

// createObject inserts num synthetic device rows into /test/bucket2 using
// 20 concurrent workers, printing a progress counter twice a second.
// The first insert error (if any) is recorded in firstErr; the function
// itself only returns an error when preparing the statement fails.
func createObject(num int, g *odb.Gutil) error {
	ps, err := g.Prepare("insert into /test/bucket2 (id, devicename, deviceoid, ip, perfstructure, desc) values (?, ?, ?, ?, ?, ?)")
	if err != nil {
		return err
	}
	defer fmt.Println() // terminate the \r progress line

	var wg sync.WaitGroup
	ch := make(chan []interface{}, 20)
	done := make(chan struct{})
	var count int64 // successful inserts; updated atomically by workers

	// Progress printer. The original read count non-atomically (data race)
	// and its loop condition could never become false, leaking the
	// goroutine; it now stops when done is closed.
	go func() {
		for {
			select {
			case <-done:
				return
			default:
			}
			fmt.Printf("\r%d/%d", atomic.LoadInt64(&count), num)
			time.Sleep(time.Millisecond * 500)
		}
	}()

	for i := 0; i < 20; i++ {
		go func() {
			for v := range ch {
				// Use a goroutine-local error: the original assigned the
				// shared function-scoped err from 20 goroutines at once.
				if _, _, execErr := ps.Exec(v...); execErr != nil {
					recordFirstErr(execErr)
				} else {
					atomic.AddInt64(&count, 1)
				}
				wg.Done()
			}
		}()
	}

	wg.Add(num)
	for i := 0; i < num; i++ {
		no := strconv.FormatInt(int64(i+1), 10)
		ch <- []interface{}{"ID" + no, "DN" + no, "OID" + no, "10.0.0.1", "PERFSTRUCTURE" + no, "DESC" + no}
	}
	close(ch) // workers exit after draining
	wg.Wait()
	close(done)
	time.Sleep(time.Second) // let the final progress line render
	return nil
}

// prepareMQL builds num parameter tuples for the bucket insert statement:
// {"ID<i>", params, timestamp}. All rows share one params slice of the
// ints 1..20 and one formatted timestamp; only the ID varies per row.
func prepareMQL(num int) [][]interface{} {
	t := time.Now().Format("2006-01-02 15:04:05")
	data := make([][]interface{}, num)

	const paramsSize = 20
	params := make([]interface{}, paramsSize)
	for i := 0; i < paramsSize; i++ {
		params[i] = i + 1
	}

	for i := 0; i < num; i++ {
		no := strconv.FormatInt(int64(i+1), 10)
		data[i] = []interface{}{"ID" + no, params, t}
	}
	return data
}

/*
create class if not exists /test/bucket1 ( host varchar, real map, bk1 bucket { "type" : "tsdb", "size" : 10 } 'simple', bk2 bucket { "type" : "tsdb", "param" : ["iparam1","iparam2","fparam1","fparam2","sparam1"], "ptype" : ["i","i","f","f","s"], "unit" : ["%","m","ms","ns",""], "precision":[0,0,3,4,0], "ttl" : 365 } 'full', keys(host), indexes(host) );

create class if not exists /test/bucket2 ( devicename varchar '设备名称', deviceoid varchar 'OID', ip varchar 'ip', perfstructure varchar '性能结构描述', desc varchar '设备描述', values bucket { "type": "tsdb", "params": [ ["${1-5}","int","","${desc[0-4]}"], ["${6-9,11-12,15-16,19-20}","int","MB","${desc[5-8,10-11,14-15,18-19]}"], ["10","int","%","${desc[9]}"], ["${13-14,17-18}","int","KB/s","${desc[12-13,16-17]}"] ], "desc": ["在线点","及早期火警","及早期预警","DO1","DO2","系统分区总容量","系统分区剩余量","数据分区总容量","数据分区剩余量","CUP使用率","内存总容量","内存剩余量","网卡1上行速度","网卡1下行速度","网卡1上行总量","网卡1下行总量","网卡2上行速度","网卡2下行速度","网卡2上行总量","网卡2下行总量"], "ttl": 365 } 'full', keys(devicename,deviceoid,ip), indexes(devicename,deviceoid,ip,perfstructure,desc) );
*/