| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291 |
- package main
- import (
- "bufio"
- "encoding/json"
- "flag"
- "fmt"
- "os"
- "runtime/pprof"
- "strconv"
- "sync"
- "time"
- "git.wecise.com/wecise/common/mnats"
- "git.wecise.com/wecise/odbserver/odb"
- "gitee.com/wecisecode/util/logger"
- )
// logFileName is the path of the sample log file: createLog writes it and
// main reads it back to build the bucket payloads.
const logFileName = "testbucketlog.log"
// Command-line flags. They are registered at package initialisation and
// hold their defaults until main calls flag.Parse.
var (
	// Workload sizing.
	dsize = flag.Int("d", 5000, "Data size.")  // number of buckets to prepare
	bsize = flag.Int("b", 200, "Bucket size.") // log lines per bucket
	wsize = flag.Int("w", 40, "Worker size.")  // number of worker goroutines

	// Optional setup steps performed before the run.
	isCreateLog       = flag.Bool("c", false, "Create test log file.")
	isCreateLogObject = flag.Bool("o", false, "Create log object data.")

	// Delivery mode. main checks -p before -i; with neither set the
	// payloads are only counted.
	isSendLogParser = flag.Bool("p", false, "Send mql to cassandra service of parser.")
	isInsert        = flag.Bool("i", false, "Insert mql to odb.")
)
- func main() {
- logger.SetConsole(false)
- flag.Parse()
- g, err := odb.New(&odb.Option{Cache: odb.CacheAll})
- if err != nil {
- fmt.Println("New odb error:", err)
- os.Exit(1)
- }
- if *isCreateLog {
- fmt.Println("Create log file", logFileName)
- if err = createLog(); err != nil {
- fmt.Println("Create log file error:", err)
- os.Exit(1)
- }
- }
- file, err := os.Open(logFileName)
- if err != nil {
- fmt.Printf("Open log file %s error: %v\n", logFileName, err)
- os.Exit(1)
- }
- dataSize := *dsize
- bucketSize := *bsize
- workerSize := *wsize
- if *isCreateLogObject {
- fmt.Println("Create log bucket object.")
- if err = createLogObject(1, g); err != nil {
- fmt.Println("Create object error:", err)
- os.Exit(2)
- }
- } else {
- fmt.Println("Ignore creating object.")
- }
- fmt.Println("Prepare data size:", dataSize)
- data := prepareLogMQL(dataSize, bucketSize, file)
- var wg sync.WaitGroup
- wg.Add(len(data))
- ch := make(chan []interface{}, workerSize)
- statCh := make(chan int, workerSize)
- stopCh := make(chan bool, 1)
- conn, err := mnats.Get()
- if err != nil {
- fmt.Println(err)
- os.Exit(5)
- }
- // stats
- go func() {
- var (
- total, success, fail, bts int64
- speed, bspeed float64
- )
- ticker := time.NewTicker(time.Second)
- start := time.Now()
- L:
- for {
- select {
- case <-stopCh:
- break L
- case n := <-statCh:
- total++
- if n > -1 {
- success++
- if n > 0 {
- bts += int64(n)
- }
- } else {
- fail++
- }
- case <-ticker.C:
- seconds := time.Now().Sub(start).Seconds()
- speed = float64(total) / seconds
- bspeed = float64(bts) / seconds
- fmt.Printf("\rtotal:%d, success:%d, fail:%d, speed:%.2f/s %.2fB/s, total time: %.2fs", total, success, fail, speed, bspeed, time.Now().Sub(start).Seconds())
- }
- }
- fmt.Println()
- }()
- // cpu pprof
- pf, err := os.Create("testbucketlog_cpu.pprof")
- if err != nil {
- fmt.Println(err)
- os.Exit(3)
- }
- defer pf.Close()
- if err = pprof.StartCPUProfile(pf); err != nil {
- fmt.Println(err)
- os.Exit(3)
- }
- defer pprof.StopCPUProfile()
- // mem pprof
- go func() {
- ticker := time.NewTicker(time.Second * 5)
- L:
- for {
- select {
- case <-stopCh:
- break L
- case <-ticker.C:
- func() {
- pmf, err := os.OpenFile("testbucketlog_mem.pprof", os.O_CREATE|os.O_TRUNC|os.O_RDWR, os.ModePerm)
- if err != nil {
- fmt.Println(err)
- os.Exit(4)
- }
- defer pmf.Close()
- if err = pprof.WriteHeapProfile(pmf); err != nil {
- fmt.Println(err)
- os.Exit(4)
- }
- }()
- }
- }
- }()
- // mql worker
- insertMql := `insert into /test_bitlog (id, logs["/opt/matrix/var/test/testbucketlog.log"]) values (?, ?)`
- for i := 0; i < workerSize; i++ {
- go func() {
- ps, err := g.Prepare(insertMql)
- if err != nil {
- fmt.Println(err)
- os.Exit(5)
- }
- m := map[string]interface{}{
- "mql": insertMql,
- }
- for d := range ch {
- if *isSendLogParser {
- m["values"] = d[:len(d)-1]
- b, _ := json.Marshal(m)
- if err = conn.Publish("queue.cassandra.query.matrix", b); err != nil {
- statCh <- -1
- } else {
- statCh <- d[len(d)-1].(int)
- }
- } else {
- if *isInsert {
- if _, _, err := ps.Exec(d[:len(d)-1]...); err != nil {
- statCh <- -1
- } else {
- statCh <- d[len(d)-1].(int)
- }
- } else {
- statCh <- d[len(d)-1].(int)
- }
- }
- wg.Done()
- }
- }()
- }
- // mql sender
- fmt.Println("Send data to worker.")
- for i := range data {
- ch <- data[i]
- }
- wg.Wait()
- time.Sleep(time.Second)
- close(stopCh)
- }
- func createLogObject(num int, g *odb.Gutil) error {
- ps, err := g.Prepare("insert into /test_bitlog (id, host) values (?, ?)")
- if err != nil {
- return err
- }
- defer fmt.Println()
- for i := 0; i < num; i++ {
- no := strconv.FormatInt(int64(i+1), 10)
- if _, _, err = ps.Exec("ID"+no, "HOST"+no); err != nil {
- return err
- }
- fmt.Printf("\r%d/%d", i+1, num)
- }
- return nil
- }
// prepareLogMQL reads file line by line and groups every logSize consecutive
// lines into one bucket payload, producing at most num payloads.
//
// Each returned element has the layout {"ID1", []string{text}, size}: text is
// the bucket's lines joined with "\n", and size is the number of line bytes
// in the bucket (line terminators excluded).
//
// Only completely filled buckets are returned, so the result is shorter than
// num when the file holds fewer than num*logSize lines; a trailing partial
// bucket is discarded. Reading stops silently at the first read error,
// including end of file. A non-positive num or logSize yields no payloads.
func prepareLogMQL(num, logSize int, file *os.File) [][]interface{} {
	if num <= 0 || logSize <= 0 {
		return nil
	}

	data := make([][]interface{}, 0, num)
	r := bufio.NewReader(file)

	var (
		buf       []byte // text of the bucket being assembled
		lineCount int    // complete lines currently in buf
		byteCount int    // line bytes currently in buf, separators excluded
		midLine   bool   // previous fragment did not finish its line
	)
	for len(data) < num {
		frag, isPrefix, err := r.ReadLine()
		if err != nil {
			break
		}
		// A separator goes in front of every line after the first one,
		// but never between two fragments of the same over-long line.
		if lineCount > 0 && !midLine {
			buf = append(buf, '\n')
		}
		// frag is only valid until the next read; append copies it.
		buf = append(buf, frag...)
		byteCount += len(frag)
		midLine = isPrefix
		if isPrefix {
			// The line exceeds the reader's buffer; keep collecting.
			continue
		}

		lineCount++
		if lineCount == logSize {
			data = append(data, []interface{}{"ID1", []string{string(buf)}, byteCount})
			buf = buf[:0]
			lineCount, byteCount = 0, 0
		}
	}
	return data
}
- func createLog() error {
- file, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.ModePerm)
- if err != nil {
- return err
- }
- defer file.Close()
- start := time.Now().Add(-30 * 24 * time.Hour)
- for i := 0; i < (*dsize)*(*bsize); i++ {
- t := start.Add(time.Second)
- _, err = file.WriteString(fmt.Sprintf(`WARN [ReadStage-%d] %s BooleanCondition.java:78 - Performing resource-intensive pure negation query BooleanCondition{boost=null, must=[], should=[], not=[RangeCondition{boost=null, field=map_varchar_varchar_2$group, lower=null, upper=null, includeLower=false, includeUpper=false, docValues=false}]}
- `, i, t.Format("2006-01-02 15:04:05,000")))
- if err != nil {
- return err
- }
- start = t
- }
- return nil
- }
- /*
- create class if not exists /test_bitlog (
- host varchar,
- logs bucket {
- "type" : "bitlog",
- "files" : ["*.log"],
- "pattern" : ["YYYY-MM-DD HH:mm:ss,SSS"],
- "ttl": 90
- } 'simple',
- keys(host),
- indexes(host)
- );
- */
|