package main

import (
	"bufio"
	"encoding/json"
	"flag"
	"fmt"
	"os"
	"runtime/pprof"
	"strconv"
	"strings"
	"sync"
	"time"

	"git.wecise.com/wecise/common/mnats"
	"git.wecise.com/wecise/odbserver/odb"
	"gitee.com/wecisecode/util/logger"
)

const (
	// logFileName is the test log file read (and optionally generated) by this tool.
	logFileName = "testbucketlog.log"
)

// Command-line flags controlling which phases run and the workload shape.
var (
	isCreateLogObject = flag.Bool("o", false, "Create log object data.")
	isSendLogParser   = flag.Bool("p", false, "Send mql to cassandra service of parser.")
	isInsert          = flag.Bool("i", false, "Insert mql to odb.")
	dsize             = flag.Int("d", 5000, "Data size.")
	bsize             = flag.Int("b", 200, "Bucket size.")
	wsize             = flag.Int("w", 40, "Worker size.")
	isCreateLog       = flag.Bool("c", false, "Create test log file.")
)

// main drives a bucket-log load test: it optionally generates a synthetic log
// file and the target /test_bitlog object, slices the log into fixed-size
// buckets, and pushes the resulting insert statements through a pool of
// workers (to NATS, to odb, or to a no-op counter depending on flags) while
// a stats goroutine reports throughput and CPU/heap profiles are written.
func main() {
	logger.SetConsole(false)
	flag.Parse()

	g, err := odb.New(&odb.Option{Cache: odb.CacheAll})
	if err != nil {
		fmt.Println("New odb error:", err)
		os.Exit(1)
	}

	if *isCreateLog {
		fmt.Println("Create log file", logFileName)
		if err = createLog(); err != nil {
			fmt.Println("Create log file error:", err)
			os.Exit(1)
		}
	}

	file, err := os.Open(logFileName)
	if err != nil {
		fmt.Printf("Open log file %s error: %v\n", logFileName, err)
		os.Exit(1)
	}
	// FIX: the file handle was never closed in the original code.
	defer file.Close()

	dataSize := *dsize
	bucketSize := *bsize
	workerSize := *wsize

	if *isCreateLogObject {
		fmt.Println("Create log bucket object.")
		if err = createLogObject(1, g); err != nil {
			fmt.Println("Create object error:", err)
			os.Exit(2)
		}
	} else {
		fmt.Println("Ignore creating object.")
	}

	fmt.Println("Prepare data size:", dataSize)
	data := prepareLogMQL(dataSize, bucketSize, file)

	var wg sync.WaitGroup
	wg.Add(len(data))
	ch := make(chan []interface{}, workerSize)
	statCh := make(chan int, workerSize)
	stopCh := make(chan bool, 1)

	conn, err := mnats.Get()
	if err != nil {
		fmt.Println(err)
		os.Exit(5)
	}

	// stats: aggregate per-request results from statCh (payload byte count on
	// success, -1 on failure) and print a rolling throughput line every second.
	go func() {
		var (
			total, success, fail, bts int64
			speed, bspeed             float64
		)
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop() // FIX: ticker was never stopped
		start := time.Now()
	L:
		for {
			select {
			case <-stopCh:
				break L
			case n := <-statCh:
				total++
				if n > -1 {
					success++
					if n > 0 {
						bts += int64(n)
					}
				} else {
					fail++
				}
			case <-ticker.C:
				seconds := time.Since(start).Seconds()
				speed = float64(total) / seconds
				bspeed = float64(bts) / seconds
				fmt.Printf("\rtotal:%d, success:%d, fail:%d, speed:%.2f/s %.2fB/s, total time: %.2fs",
					total, success, fail, speed, bspeed, seconds)
			}
		}
		fmt.Println()
	}()

	// cpu pprof
	pf, err := os.Create("testbucketlog_cpu.pprof")
	if err != nil {
		fmt.Println(err)
		os.Exit(3)
	}
	defer pf.Close()
	if err = pprof.StartCPUProfile(pf); err != nil {
		fmt.Println(err)
		os.Exit(3)
	}
	defer pprof.StopCPUProfile()

	// mem pprof: periodically overwrite a heap profile snapshot.
	go func() {
		ticker := time.NewTicker(time.Second * 5)
		defer ticker.Stop() // FIX: ticker was never stopped
	L:
		for {
			select {
			case <-stopCh:
				break L
			case <-ticker.C:
				// Closure so the deferred Close fires every iteration,
				// not at goroutine exit.
				func() {
					pmf, err := os.OpenFile("testbucketlog_mem.pprof", os.O_CREATE|os.O_TRUNC|os.O_RDWR, os.ModePerm)
					if err != nil {
						fmt.Println(err)
						os.Exit(4)
					}
					defer pmf.Close()
					if err = pprof.WriteHeapProfile(pmf); err != nil {
						fmt.Println(err)
						os.Exit(4)
					}
				}()
			}
		}
	}()

	// mql workers: each data entry is [id, logs, byteSize]; the trailing
	// byteSize element is stripped before sending/inserting and forwarded to
	// the stats goroutine on success (-1 signals failure).
	insertMql := `insert into /test_bitlog (id, logs["/opt/matrix/var/test/testbucketlog.log"]) values (?, ?)`
	for i := 0; i < workerSize; i++ {
		go func() {
			ps, err := g.Prepare(insertMql)
			if err != nil {
				fmt.Println(err)
				os.Exit(5)
			}
			m := map[string]interface{}{
				"mql": insertMql,
			}
			for d := range ch {
				switch {
				case *isSendLogParser:
					m["values"] = d[:len(d)-1]
					b, merr := json.Marshal(m)
					// FIX: a marshal failure was silently ignored and a nil
					// payload published; count it as a failure instead.
					if merr != nil {
						statCh <- -1
					} else if err = conn.Publish("queue.cassandra.query.matrix", b); err != nil {
						statCh <- -1
					} else {
						statCh <- d[len(d)-1].(int)
					}
				case *isInsert:
					if _, _, err := ps.Exec(d[:len(d)-1]...); err != nil {
						statCh <- -1
					} else {
						statCh <- d[len(d)-1].(int)
					}
				default:
					// Dry run: count the payload without sending it anywhere.
					statCh <- d[len(d)-1].(int)
				}
				wg.Done()
			}
		}()
	}

	// mql sender
	fmt.Println("Send data to worker.")
	for i := range data {
		ch <- data[i]
	}
	wg.Wait()
	// Give the stats goroutine a moment to drain statCh before stopping.
	time.Sleep(time.Second)
	close(stopCh)
}

// createLogObject inserts num placeholder rows ("ID<n>", "HOST<n>") into
// /test_bitlog so that subsequent bucket inserts have a parent object,
// printing progress as it goes. Returns the first Prepare/Exec error.
func createLogObject(num int, g *odb.Gutil) error {
	ps, err := g.Prepare("insert into /test_bitlog (id, host) values (?, ?)")
	if err != nil {
		return err
	}
	defer fmt.Println() // terminate the \r progress line
	for i := 0; i < num; i++ {
		no := strconv.Itoa(i + 1)
		if _, _, err = ps.Exec("ID"+no, "HOST"+no); err != nil {
			return err
		}
		fmt.Printf("\r%d/%d", i+1, num)
	}
	return nil
}

// prepareLogMQL reads up to num*logSize lines from file and packs them into
// buckets of logSize lines each. Each returned entry has the shape
// [id, []string{joinedLines}, byteSize], where byteSize (the sum of raw line
// lengths) is used by the workers for throughput accounting.
//
// FIX: only completely filled buckets are returned. The original returned a
// fixed-size slice of length num whose unfilled tail entries were nil when
// the log file was shorter than num*logSize lines, which made the workers
// panic (d[:len(d)-1] on a nil slice). Callers only use len() and iterate,
// so this is backward compatible.
func prepareLogMQL(num, logSize int, file *os.File) [][]interface{} {
	data := make([][]interface{}, 0, num)
	r := bufio.NewReader(file)
	var lines strings.Builder // avoids quadratic += concatenation per bucket
	linesCount := 0
	dBytesSize := 0
	for rowNum := 0; rowNum < num*logSize; rowNum++ {
		line, _, err := r.ReadLine()
		if err != nil {
			break // EOF or read error: stop with whatever buckets are complete
		}
		if linesCount > 0 {
			lines.WriteByte('\n')
		}
		lines.Write(line)
		linesCount++
		dBytesSize += len(line)
		if linesCount == logSize {
			data = append(data, []interface{}{"ID1", []string{lines.String()}, dBytesSize})
			lines.Reset()
			linesCount = 0
			dBytesSize = 0
		}
	}
	return data
}

// createLog writes dsize*bsize synthetic Cassandra-style WARN lines to
// logFileName, timestamps advancing one second per line starting 30 days in
// the past, so the bucket's time pattern has real data to parse.
func createLog() error {
	file, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.ModePerm)
	if err != nil {
		return err
	}
	defer file.Close()
	// FIX: buffer the writes — the original issued one syscall per line for
	// up to dsize*bsize (default 1,000,000) lines.
	w := bufio.NewWriter(file)
	ts := time.Now().Add(-30 * 24 * time.Hour)
	for i := 0; i < (*dsize)*(*bsize); i++ {
		ts = ts.Add(time.Second)
		if _, err = fmt.Fprintf(w, `WARN [ReadStage-%d] %s BooleanCondition.java:78 - Performing resource-intensive pure negation query BooleanCondition{boost=null, must=[], should=[], not=[RangeCondition{boost=null, field=map_varchar_varchar_2$group, lower=null, upper=null, includeLower=false, includeUpper=false, docValues=false}]}
`, i, ts.Format("2006-01-02 15:04:05,000")); err != nil {
			return err
		}
	}
	return w.Flush()
}

/*
create class if not exists /test_bitlog (
	host varchar,
	logs bucket {
		"type" : "bitlog",
		"files" : ["*.log"],
		"pattern" : ["YYYY-MM-DD HH:mm:ss,SSS"],
		"ttl": 90
	}
) 'simple', keys(host), indexes(host);
*/