|
@@ -1,20 +1,14 @@
|
|
|
package odbcmql
|
|
package odbcmql
|
|
|
|
|
|
|
|
import (
|
|
import (
|
|
|
- "bufio"
|
|
|
|
|
- "context"
|
|
|
|
|
- "encoding/json"
|
|
|
|
|
"fmt"
|
|
"fmt"
|
|
|
"log"
|
|
"log"
|
|
|
- "math/rand"
|
|
|
|
|
"os"
|
|
"os"
|
|
|
"path/filepath"
|
|
"path/filepath"
|
|
|
"regexp"
|
|
"regexp"
|
|
|
"runtime"
|
|
"runtime"
|
|
|
- "strconv"
|
|
|
|
|
"strings"
|
|
"strings"
|
|
|
"sync"
|
|
"sync"
|
|
|
- "sync/atomic"
|
|
|
|
|
"testing"
|
|
"testing"
|
|
|
"time"
|
|
"time"
|
|
|
|
|
|
|
@@ -24,9 +18,7 @@ import (
|
|
|
mcfg "gitee.com/wecisecode/util/cfg"
|
|
mcfg "gitee.com/wecisecode/util/cfg"
|
|
|
"gitee.com/wecisecode/util/filewalker"
|
|
"gitee.com/wecisecode/util/filewalker"
|
|
|
"gitee.com/wecisecode/util/mfmt"
|
|
"gitee.com/wecisecode/util/mfmt"
|
|
|
- "gitee.com/wecisecode/util/pqc"
|
|
|
|
|
"gitee.com/wecisecode/util/set/strset"
|
|
"gitee.com/wecisecode/util/set/strset"
|
|
|
- "gitee.com/wecisecode/util/spliter"
|
|
|
|
|
"github.com/gofrs/flock"
|
|
"github.com/gofrs/flock"
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/assert"
|
|
|
)
|
|
)
|
|
@@ -136,7 +128,9 @@ options:
|
|
|
odb=` + default_odbpaths + ` 指定odbserver路径,默认通过环境变量ODBPATH或通过ETCD相关配置获取
|
|
odb=` + default_odbpaths + ` 指定odbserver路径,默认通过环境变量ODBPATH或通过ETCD相关配置获取
|
|
|
keyspace=` + default_keyspace + ` 指定keyspace,默认通过环境变量KEYSPACE获取
|
|
keyspace=` + default_keyspace + ` 指定keyspace,默认通过环境变量KEYSPACE获取
|
|
|
ksnative=` + default_keyspace + `_native 指定native keyspace,默认通过环境变量KSNATIVE获取
|
|
ksnative=` + default_keyspace + `_native 指定native keyspace,默认通过环境变量KSNATIVE获取
|
|
|
|
|
+ loop=1 循环执行测试次数
|
|
|
debug=true 开启调试模式,输出更多信息
|
|
debug=true 开启调试模式,输出更多信息
|
|
|
|
|
+ pprof=. 设置pprof文件输出目录,开启自监控
|
|
|
|
|
|
|
|
环境变量需求:
|
|
环境变量需求:
|
|
|
KEYSPACE=` + default_keyspace + `
|
|
KEYSPACE=` + default_keyspace + `
|
|
@@ -379,698 +373,6 @@ type GlobalVars struct {
|
|
|
ch_wait_mql_done map[string]chan bool
|
|
ch_wait_mql_done map[string]chan bool
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (mt *MQLTest) Run(t *testing.T, fw *filewalker.FileWalker) (retok bool) {
|
|
|
|
|
- mt.t = t
|
|
|
|
|
- mt.fw = fw
|
|
|
|
|
- global := &GlobalVars{
|
|
|
|
|
- CurrentVars: &CurrentVars{},
|
|
|
|
|
- wg_wait_fork_routine: make(map[string]*sync.WaitGroup),
|
|
|
|
|
- ch_wait_mql_done: make(map[string]chan bool),
|
|
|
|
|
- }
|
|
|
|
|
- mt.scopevars = &ScopeVars{
|
|
|
|
|
- top: &Variables{
|
|
|
|
|
- vars: map[string]interface{}{},
|
|
|
|
|
- loop_count: 1,
|
|
|
|
|
- loop_from: 1,
|
|
|
|
|
- loop_step: 1},
|
|
|
|
|
- dir: map[string]*Variables{},
|
|
|
|
|
- file: map[string]*Variables{},
|
|
|
|
|
- mql: map[string]*Variables{}}
|
|
|
|
|
- ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
|
- defer cancel()
|
|
|
|
|
- st := time.Now()
|
|
|
|
|
- loop_i := 0
|
|
|
|
|
- parallel_queue := pqc.NewQueue[any](0)
|
|
|
|
|
- mqlcount := int32(0)
|
|
|
|
|
- defer func() {
|
|
|
|
|
- mt.scopevars.RLock()
|
|
|
|
|
- loop_count := mt.scopevars.top.loop_count
|
|
|
|
|
- mt.scopevars.RUnlock()
|
|
|
|
|
- ut := time.Since(st)
|
|
|
|
|
- aut := time.Duration(0)
|
|
|
|
|
- if mqlcount > 0 {
|
|
|
|
|
- aut = global.totalusetime / time.Duration(mqlcount)
|
|
|
|
|
- }
|
|
|
|
|
- logger.Info("完成 ", loop_count, " 次执行,共", mqlcount, "次 MQL 请求,耗时", ut, "单条响应时间", global.minusetime, "~", global.maxusetime, "/", aut, "平均每秒吞吐量", (int64(mqlcount)*int64(time.Second))/int64(ut))
|
|
|
|
|
- }()
|
|
|
|
|
- var wg sync.WaitGroup
|
|
|
|
|
- for {
|
|
|
|
|
- mt.scopevars.Lock()
|
|
|
|
|
- ok := loop_i < mt.scopevars.top.loop_count
|
|
|
|
|
- mt.scopevars.Unlock()
|
|
|
|
|
- if !ok {
|
|
|
|
|
- break
|
|
|
|
|
- }
|
|
|
|
|
- loop_i++
|
|
|
|
|
- //
|
|
|
|
|
- ch_parallel_count := make(chan int)
|
|
|
|
|
- topvars := &CurrentVars{
|
|
|
|
|
- loop_i: loop_i,
|
|
|
|
|
- ch_parallel_count: ch_parallel_count,
|
|
|
|
|
- }
|
|
|
|
|
- //
|
|
|
|
|
- ok_chan := make(chan bool, 1)
|
|
|
|
|
- parallel_chan := make(chan bool, 1)
|
|
|
|
|
- parallel := false
|
|
|
|
|
- parallelcount := 0
|
|
|
|
|
- done := false
|
|
|
|
|
- wg.Add(1)
|
|
|
|
|
- go func() {
|
|
|
|
|
- defer func() {
|
|
|
|
|
- atomic.AddInt32(&mqlcount, topvars.mqlcount)
|
|
|
|
|
- wg.Done()
|
|
|
|
|
- }()
|
|
|
|
|
- ch_ok := make(chan bool)
|
|
|
|
|
- go func() {
|
|
|
|
|
- for {
|
|
|
|
|
- select {
|
|
|
|
|
- case <-ch_parallel_count:
|
|
|
|
|
- if !done && !parallel {
|
|
|
|
|
- parallel = true
|
|
|
|
|
- // 加入并发控制队列
|
|
|
|
|
- if parallelcount > 0 {
|
|
|
|
|
- if parallelcount > parallel_queue.Size() {
|
|
|
|
|
- parallel_queue.Growth(parallelcount)
|
|
|
|
|
- }
|
|
|
|
|
- parallel_queue.Push(1)
|
|
|
|
|
- }
|
|
|
|
|
- parallel_chan <- true
|
|
|
|
|
- }
|
|
|
|
|
- case ok := <-ch_ok:
|
|
|
|
|
- ok_chan <- ok
|
|
|
|
|
- if parallel {
|
|
|
|
|
- if parallelcount > 0 {
|
|
|
|
|
- // 从并发控制队列中移除
|
|
|
|
|
- parallel_queue.Pop()
|
|
|
|
|
- }
|
|
|
|
|
- } else {
|
|
|
|
|
- done = true
|
|
|
|
|
- }
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }()
|
|
|
|
|
- ok := mt.RunAll(t, ctx,
|
|
|
|
|
- global,
|
|
|
|
|
- topvars)
|
|
|
|
|
- ch_ok <- ok
|
|
|
|
|
- if !ok {
|
|
|
|
|
- cancel() // 并发测试,有一个线程出错,就全停
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
- }()
|
|
|
|
|
- success := true
|
|
|
|
|
- select {
|
|
|
|
|
- case success = <-ok_chan: // 非并发,等待完成
|
|
|
|
|
- case <-parallel_chan: // 并发,执行继续下一次
|
|
|
|
|
- logger.Info("第", topvars.loop_i, "次并发执行继续")
|
|
|
|
|
- }
|
|
|
|
|
- if !success {
|
|
|
|
|
- return false // 失败,不等,直接返回
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- wg.Wait()
|
|
|
|
|
- return
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (mt *MQLTest) RunAll(t *testing.T, ctx context.Context, global *GlobalVars, topvars *CurrentVars) bool {
|
|
|
|
|
- logger.Info("开始第", topvars.loop_i, "次执行")
|
|
|
|
|
- st := time.Now()
|
|
|
|
|
- // 读取文件列表
|
|
|
|
|
- listdirs := []string{}
|
|
|
|
|
- dirfiles := map[string][]string{}
|
|
|
|
|
- err := mt.fw.List(func(basedir, filename string) bool {
|
|
|
|
|
- if dirfiles[basedir] == nil {
|
|
|
|
|
- listdirs = append(listdirs, basedir)
|
|
|
|
|
- }
|
|
|
|
|
- dirfiles[basedir] = append(dirfiles[basedir], filename)
|
|
|
|
|
- return true
|
|
|
|
|
- })
|
|
|
|
|
- if !assert.Nil(t, err, err) {
|
|
|
|
|
- return false
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // 执行所有目录
|
|
|
|
|
- for _, basedir := range listdirs {
|
|
|
|
|
- // 循环执行目录下所有文件
|
|
|
|
|
- mt.scopevars.Lock()
|
|
|
|
|
- if mt.scopevars.dir[basedir] == nil {
|
|
|
|
|
- mt.scopevars.dir[basedir] = &Variables{
|
|
|
|
|
- vars: map[string]interface{}{},
|
|
|
|
|
- loop_count: 1,
|
|
|
|
|
- loop_from: 1,
|
|
|
|
|
- loop_step: 1}
|
|
|
|
|
- }
|
|
|
|
|
- mt.scopevars.Unlock()
|
|
|
|
|
- var wg sync.WaitGroup
|
|
|
|
|
- st := time.Now()
|
|
|
|
|
- loop_i := 0
|
|
|
|
|
- parallel_queue := pqc.NewQueue[any](0)
|
|
|
|
|
- mqlcount := int32(0)
|
|
|
|
|
- for {
|
|
|
|
|
- mt.scopevars.Lock()
|
|
|
|
|
- ok := loop_i < mt.scopevars.dir[basedir].loop_count
|
|
|
|
|
- mt.scopevars.Unlock()
|
|
|
|
|
- if !ok {
|
|
|
|
|
- break
|
|
|
|
|
- }
|
|
|
|
|
- loop_i++
|
|
|
|
|
- //
|
|
|
|
|
- ch_parallel_count := make(chan int)
|
|
|
|
|
- dirvars := &CurrentVars{
|
|
|
|
|
- loop_i: loop_i,
|
|
|
|
|
- ch_parallel_count: ch_parallel_count,
|
|
|
|
|
- }
|
|
|
|
|
- //
|
|
|
|
|
- ok_chan := make(chan bool, 1)
|
|
|
|
|
- parallel_chan := make(chan bool, 1)
|
|
|
|
|
- parallel := false
|
|
|
|
|
- parallelcount := 0
|
|
|
|
|
- done := false
|
|
|
|
|
- wg.Add(1)
|
|
|
|
|
- go func(basedir string) {
|
|
|
|
|
- defer func() {
|
|
|
|
|
- atomic.AddInt32(&mqlcount, dirvars.mqlcount)
|
|
|
|
|
- wg.Done()
|
|
|
|
|
- }()
|
|
|
|
|
- ch_ok := make(chan bool)
|
|
|
|
|
- go func() {
|
|
|
|
|
- for {
|
|
|
|
|
- select {
|
|
|
|
|
- case <-ch_parallel_count:
|
|
|
|
|
- if !done && !parallel {
|
|
|
|
|
- parallel = true
|
|
|
|
|
- // 加入并发控制队列
|
|
|
|
|
- if parallelcount > 0 {
|
|
|
|
|
- if parallelcount > parallel_queue.Size() {
|
|
|
|
|
- parallel_queue.Growth(parallelcount)
|
|
|
|
|
- }
|
|
|
|
|
- parallel_queue.Push(1)
|
|
|
|
|
- }
|
|
|
|
|
- parallel_chan <- true
|
|
|
|
|
- }
|
|
|
|
|
- case ok := <-ch_ok:
|
|
|
|
|
- ok_chan <- ok
|
|
|
|
|
- if parallel {
|
|
|
|
|
- if parallelcount > 0 {
|
|
|
|
|
- // 从并发控制队列中移除
|
|
|
|
|
- parallel_queue.Pop()
|
|
|
|
|
- }
|
|
|
|
|
- } else {
|
|
|
|
|
- done = true
|
|
|
|
|
- }
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }()
|
|
|
|
|
- // logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次执行开始")
|
|
|
|
|
- files := dirfiles[basedir]
|
|
|
|
|
- ch_ok <- mt.RunDir(t, ctx,
|
|
|
|
|
- global,
|
|
|
|
|
- topvars,
|
|
|
|
|
- dirvars,
|
|
|
|
|
- basedir, files)
|
|
|
|
|
- // logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次执行结束")
|
|
|
|
|
- }(basedir)
|
|
|
|
|
- success := true
|
|
|
|
|
- select {
|
|
|
|
|
- case success = <-ok_chan: // 非并发,等待完成
|
|
|
|
|
- logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次顺序执行完成")
|
|
|
|
|
- case <-parallel_chan: // 并发,执行继续下一次
|
|
|
|
|
- logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次并发执行继续")
|
|
|
|
|
- }
|
|
|
|
|
- if !success {
|
|
|
|
|
- return false
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- wg.Wait()
|
|
|
|
|
- mt.scopevars.RLock()
|
|
|
|
|
- loop_count := mt.scopevars.dir[basedir].loop_count
|
|
|
|
|
- mt.scopevars.RUnlock()
|
|
|
|
|
- if loop_count > 1 {
|
|
|
|
|
- ut := time.Since(st)
|
|
|
|
|
- logger.Info(fmt.Sprint("dir ", basedir, " loop ", loop_count, " times, run ", mqlcount, " mqls, usetime ", ut))
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- logger.Info("完成第", topvars.loop_i, "次执行,共", topvars.mqlcount, "次 MQL 请求,耗时", time.Since(st))
|
|
|
|
|
- return true
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (mt *MQLTest) RunDir(t *testing.T, ctx context.Context,
|
|
|
|
|
- global *GlobalVars,
|
|
|
|
|
- topvars *CurrentVars,
|
|
|
|
|
- dirvars *CurrentVars,
|
|
|
|
|
- basedir string, files []string) bool {
|
|
|
|
|
- for _, filename := range files {
|
|
|
|
|
- ok := mt.RunFile(t, ctx, global, topvars, dirvars, basedir, filename)
|
|
|
|
|
- if !ok {
|
|
|
|
|
- return false
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- return true
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (mt *MQLTest) RunFile(t *testing.T, ctx context.Context,
|
|
|
|
|
- global *GlobalVars,
|
|
|
|
|
- topvars *CurrentVars,
|
|
|
|
|
- dirvars *CurrentVars,
|
|
|
|
|
- basedir, filename string) bool {
|
|
|
|
|
- // 读取文件内容
|
|
|
|
|
- ffpath := filepath.Join(basedir, filename)
|
|
|
|
|
- bs, err := os.ReadFile(ffpath)
|
|
|
|
|
- if !assert.Nil(t, err, err) {
|
|
|
|
|
- return false
|
|
|
|
|
- }
|
|
|
|
|
- // mql语句切分
|
|
|
|
|
- mqgr := NewMQLGroupRequest()
|
|
|
|
|
- var multilines *MQLRequest
|
|
|
|
|
- mqs := spliter.NewMQLSpliter(bufio.NewReader(strings.NewReader(string(bs))))
|
|
|
|
|
- for {
|
|
|
|
|
- mql, fromline, toline, fromchar, tochar, hasnext, _ := mqs.NextMQL()
|
|
|
|
|
- if !hasnext {
|
|
|
|
|
- break
|
|
|
|
|
- }
|
|
|
|
|
- // 去掉注释
|
|
|
|
|
- clean_mql := strings.Join(spliter.MQLSplitClean(mql), ";")
|
|
|
|
|
- if multilines != nil {
|
|
|
|
|
- // 多条语句处理
|
|
|
|
|
- if strings.TrimSpace(clean_mql) == "multilines end" {
|
|
|
|
|
- // 保留语句中的注释
|
|
|
|
|
- tm := strings.TrimSpace(strings.Replace(mql, "multilines end", "", 1))
|
|
|
|
|
- if len(tm) > 0 {
|
|
|
|
|
- if multilines.OriginQueryString != "" {
|
|
|
|
|
- multilines.OriginQueryString += ";"
|
|
|
|
|
- }
|
|
|
|
|
- multilines.OriginQueryString += tm
|
|
|
|
|
- }
|
|
|
|
|
- multilines.Toline = toline
|
|
|
|
|
- multilines.Tochar = tochar
|
|
|
|
|
- // 加入请求组,并初始化,提取动作信息
|
|
|
|
|
- e := mqgr.Append(multilines)
|
|
|
|
|
- if !assert.Nil(t, e, e) {
|
|
|
|
|
- return false
|
|
|
|
|
- }
|
|
|
|
|
- multilines = nil
|
|
|
|
|
- } else {
|
|
|
|
|
- if multilines.OriginQueryString != "" {
|
|
|
|
|
- multilines.OriginQueryString += ";"
|
|
|
|
|
- }
|
|
|
|
|
- multilines.OriginQueryString += mql
|
|
|
|
|
- multilines.Toline = toline
|
|
|
|
|
- multilines.Tochar = tochar
|
|
|
|
|
- }
|
|
|
|
|
- } else if strings.TrimSpace(clean_mql) == "multilines begin" {
|
|
|
|
|
- multilines = &MQLRequest{FilePath: ffpath}
|
|
|
|
|
- // 保留语句中的注释
|
|
|
|
|
- multilines.OriginQueryString = strings.TrimSpace(strings.Replace(mql, "multilines begin", "", 1))
|
|
|
|
|
- multilines.Fromline = fromline
|
|
|
|
|
- multilines.Fromchar = fromchar
|
|
|
|
|
- multilines.Toline = toline
|
|
|
|
|
- multilines.Tochar = tochar
|
|
|
|
|
- } else {
|
|
|
|
|
- // 单条语句
|
|
|
|
|
- mqr := &MQLRequest{OriginQueryString: mql, FilePath: ffpath, Fromline: fromline, Toline: toline, Fromchar: fromchar, Tochar: tochar}
|
|
|
|
|
- // 加入请求组,并初始化,提取动作信息
|
|
|
|
|
- e := mqgr.Append(mqr)
|
|
|
|
|
- if !assert.Nil(t, e, e) {
|
|
|
|
|
- return false
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- // mqls := spliter.MQLSplit(string(bs))
|
|
|
|
|
- mt.scopevars.Lock()
|
|
|
|
|
- if mt.scopevars.file[ffpath] == nil {
|
|
|
|
|
- mt.scopevars.file[ffpath] = &Variables{
|
|
|
|
|
- vars: map[string]interface{}{},
|
|
|
|
|
- loop_count: 1,
|
|
|
|
|
- loop_from: 1,
|
|
|
|
|
- loop_step: 1}
|
|
|
|
|
- }
|
|
|
|
|
- mt.scopevars.Unlock()
|
|
|
|
|
- var wg sync.WaitGroup
|
|
|
|
|
- st := time.Now()
|
|
|
|
|
- loop_i := 0
|
|
|
|
|
- parallel_queue := pqc.NewQueue[any](0)
|
|
|
|
|
- mqlcount := int32(0)
|
|
|
|
|
- for {
|
|
|
|
|
- mt.scopevars.Lock()
|
|
|
|
|
- ok := loop_i < mt.scopevars.file[ffpath].loop_count
|
|
|
|
|
- mt.scopevars.Unlock()
|
|
|
|
|
- if !ok {
|
|
|
|
|
- break
|
|
|
|
|
- }
|
|
|
|
|
- loop_i++
|
|
|
|
|
- //
|
|
|
|
|
- ch_parallel_count := make(chan int)
|
|
|
|
|
- filevars := &CurrentVars{
|
|
|
|
|
- loop_i: loop_i,
|
|
|
|
|
- ch_parallel_count: ch_parallel_count,
|
|
|
|
|
- }
|
|
|
|
|
- //
|
|
|
|
|
- ok_chan := make(chan bool, 1)
|
|
|
|
|
- parallel_chan := make(chan bool, 1)
|
|
|
|
|
- parallel := false
|
|
|
|
|
- parallelcount := 0
|
|
|
|
|
- done := false
|
|
|
|
|
- wg.Add(1)
|
|
|
|
|
- go func() {
|
|
|
|
|
- defer func() {
|
|
|
|
|
- atomic.AddInt32(&mqlcount, filevars.mqlcount)
|
|
|
|
|
- wg.Done()
|
|
|
|
|
- }()
|
|
|
|
|
- ch_ok := make(chan bool)
|
|
|
|
|
- go func() {
|
|
|
|
|
- for {
|
|
|
|
|
- select {
|
|
|
|
|
- case <-ch_parallel_count:
|
|
|
|
|
- if !done && !parallel {
|
|
|
|
|
- parallel = true
|
|
|
|
|
- // 加入并发控制队列
|
|
|
|
|
- if parallelcount > 0 {
|
|
|
|
|
- if parallelcount > parallel_queue.Size() {
|
|
|
|
|
- parallel_queue.Growth(parallelcount)
|
|
|
|
|
- }
|
|
|
|
|
- parallel_queue.Push(1)
|
|
|
|
|
- }
|
|
|
|
|
- parallel_chan <- true
|
|
|
|
|
- }
|
|
|
|
|
- case ok := <-ch_ok:
|
|
|
|
|
- ok_chan <- ok
|
|
|
|
|
- if parallel {
|
|
|
|
|
- if parallelcount > 0 {
|
|
|
|
|
- // 从并发控制队列中移除
|
|
|
|
|
- parallel_queue.Pop()
|
|
|
|
|
- }
|
|
|
|
|
- } else {
|
|
|
|
|
- done = true
|
|
|
|
|
- }
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }()
|
|
|
|
|
- // logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次执行开始")
|
|
|
|
|
- ch_ok <- mt.RunMQLGroup(t, ctx,
|
|
|
|
|
- global,
|
|
|
|
|
- topvars,
|
|
|
|
|
- dirvars,
|
|
|
|
|
- filevars,
|
|
|
|
|
- basedir, ffpath, mqgr)
|
|
|
|
|
- // logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次执行结束")
|
|
|
|
|
- }()
|
|
|
|
|
- success := true
|
|
|
|
|
- select {
|
|
|
|
|
- case success = <-ok_chan: // 非并发,等待完成
|
|
|
|
|
- logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次顺序执行完成")
|
|
|
|
|
- case <-parallel_chan: // 并发,执行继续下一次
|
|
|
|
|
- logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次并发执行继续")
|
|
|
|
|
- }
|
|
|
|
|
- if !success {
|
|
|
|
|
- return false
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- wg.Wait()
|
|
|
|
|
- mt.scopevars.RLock()
|
|
|
|
|
- loop_count := mt.scopevars.file[ffpath].loop_count
|
|
|
|
|
- mt.scopevars.RUnlock()
|
|
|
|
|
- if loop_count > 1 {
|
|
|
|
|
- ut := time.Since(st)
|
|
|
|
|
- sn := fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i)
|
|
|
|
|
- logger.Info(fmt.Sprint("file ", ffpath+"/"+sn, " loop ", loop_count, " times, run ", mqlcount, " mqls, usetime ", ut))
|
|
|
|
|
- }
|
|
|
|
|
- return true
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (mt *MQLTest) RunMQLGroup(t *testing.T, ctx context.Context,
|
|
|
|
|
- global *GlobalVars,
|
|
|
|
|
- topvars *CurrentVars,
|
|
|
|
|
- dirvars *CurrentVars,
|
|
|
|
|
- filevars *CurrentVars,
|
|
|
|
|
- basedir, ffpath string, mqgr *MQLGroupRequest) (pass bool) {
|
|
|
|
|
- pass = true
|
|
|
|
|
- var wg sync.WaitGroup
|
|
|
|
|
- for _, mqs := range mqgr.mqrs {
|
|
|
|
|
- wg.Add(1)
|
|
|
|
|
- go func(mqs []*MQLRequest) {
|
|
|
|
|
- defer wg.Done()
|
|
|
|
|
- if len(mqs) == 0 {
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
- if mqs[0].StaticActions.ForkName != nil {
|
|
|
|
|
- forkname := *mqs[0].StaticActions.ForkName
|
|
|
|
|
- global.Lock()
|
|
|
|
|
- wg := global.wg_wait_fork_routine[forkname]
|
|
|
|
|
- if wg == nil {
|
|
|
|
|
- wg = &sync.WaitGroup{}
|
|
|
|
|
- global.wg_wait_fork_routine[forkname] = wg
|
|
|
|
|
- }
|
|
|
|
|
- global.Unlock()
|
|
|
|
|
- wg.Add(1)
|
|
|
|
|
- defer wg.Done()
|
|
|
|
|
- }
|
|
|
|
|
- ok := mt.RunMQLs(t, ctx, global, topvars, dirvars, filevars, basedir, ffpath, mqs)
|
|
|
|
|
- if !ok {
|
|
|
|
|
- pass = false
|
|
|
|
|
- }
|
|
|
|
|
- }(mqs)
|
|
|
|
|
- }
|
|
|
|
|
- wg.Wait()
|
|
|
|
|
- return
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (mt *MQLTest) RunMQLs(t *testing.T, ctx context.Context,
|
|
|
|
|
- global *GlobalVars,
|
|
|
|
|
- topvars *CurrentVars,
|
|
|
|
|
- dirvars *CurrentVars,
|
|
|
|
|
- filevars *CurrentVars,
|
|
|
|
|
- basedir, ffpath string, mqrs []*MQLRequest) bool {
|
|
|
|
|
- for _, mqr := range mqrs {
|
|
|
|
|
- mqlkey := mqr.Key
|
|
|
|
|
- mqlstr := mqr.OriginQueryString
|
|
|
|
|
- if mqlstr == "" {
|
|
|
|
|
- continue
|
|
|
|
|
- }
|
|
|
|
|
- staticactions := mqr.StaticActions
|
|
|
|
|
- // 设置执行过程中的控制参数
|
|
|
|
|
- mt.InitScopeVars(basedir, ffpath, mqlkey, staticactions)
|
|
|
|
|
- ch_test_run_one_mql_result := make(chan bool)
|
|
|
|
|
- go func() {
|
|
|
|
|
- mqrinst := fmt.Sprint(mqlkey, "/", topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i)
|
|
|
|
|
- global.Lock()
|
|
|
|
|
- mqrdone := global.ch_wait_mql_done[mqrinst]
|
|
|
|
|
- if mqrdone == nil {
|
|
|
|
|
- mqrdone = make(chan bool)
|
|
|
|
|
- global.ch_wait_mql_done[mqrinst] = mqrdone
|
|
|
|
|
- }
|
|
|
|
|
- var waitmqrdone chan bool
|
|
|
|
|
- if mqr.WaitMQLRequest != nil {
|
|
|
|
|
- waitmqrkey := mqr.WaitMQLRequest.Key
|
|
|
|
|
- waitmqrinst := fmt.Sprint(waitmqrkey, "/", topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i)
|
|
|
|
|
- waitmqrdone = global.ch_wait_mql_done[waitmqrinst]
|
|
|
|
|
- if waitmqrdone == nil {
|
|
|
|
|
- waitmqrdone = make(chan bool)
|
|
|
|
|
- global.ch_wait_mql_done[waitmqrinst] = waitmqrdone
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- global.Unlock()
|
|
|
|
|
- var ret bool
|
|
|
|
|
- defer func() { mqrdone <- ret }()
|
|
|
|
|
- if waitmqrdone != nil {
|
|
|
|
|
- v := <-waitmqrdone
|
|
|
|
|
- waitmqrdone <- v
|
|
|
|
|
- if !v {
|
|
|
|
|
- // 依赖MQR失败
|
|
|
|
|
- ch_test_run_one_mql_result <- false
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- ret = mt.RunMQR(t, ctx, global, topvars, dirvars, filevars, basedir, ffpath, mqr)
|
|
|
|
|
- ch_test_run_one_mql_result <- ret
|
|
|
|
|
- }()
|
|
|
|
|
- ret := <-ch_test_run_one_mql_result
|
|
|
|
|
- if !ret {
|
|
|
|
|
- return ret
|
|
|
|
|
- }
|
|
|
|
|
- // continue
|
|
|
|
|
- }
|
|
|
|
|
- return true
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (mt *MQLTest) RunMQR(t *testing.T, ctx context.Context,
|
|
|
|
|
- global *GlobalVars,
|
|
|
|
|
- topvars *CurrentVars,
|
|
|
|
|
- dirvars *CurrentVars,
|
|
|
|
|
- filevars *CurrentVars,
|
|
|
|
|
- basedir, ffpath string, mqr *MQLRequest) bool {
|
|
|
|
|
- mqlkey := mqr.Key
|
|
|
|
|
- staticactionexprs := mqr.StaticActionExprs
|
|
|
|
|
- staticactions := mqr.StaticActions
|
|
|
|
|
- return t.Run(mqlkey, func(t *testing.T) {
|
|
|
|
|
- var wg sync.WaitGroup
|
|
|
|
|
- st := time.Now()
|
|
|
|
|
- loop_i := 0
|
|
|
|
|
- parallel_queue := pqc.NewQueue[any](0)
|
|
|
|
|
- mqlcount := int32(0)
|
|
|
|
|
- for {
|
|
|
|
|
- mt.scopevars.Lock()
|
|
|
|
|
- ok := loop_i < mt.scopevars.mql[mqlkey].loop_count
|
|
|
|
|
- mt.scopevars.Unlock()
|
|
|
|
|
- if !ok {
|
|
|
|
|
- break
|
|
|
|
|
- }
|
|
|
|
|
- loop_i++
|
|
|
|
|
- //
|
|
|
|
|
- ch_parallel_count := make(chan int)
|
|
|
|
|
- mqlvars := &CurrentVars{
|
|
|
|
|
- loop_i: loop_i,
|
|
|
|
|
- ch_parallel_count: ch_parallel_count,
|
|
|
|
|
- }
|
|
|
|
|
- // 运行实例
|
|
|
|
|
- mqlsn := fmt.Sprint(mqlkey, "/", topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i, ".", mqlvars.loop_i)
|
|
|
|
|
- mt.scopevars.Lock()
|
|
|
|
|
- if mt.scopevars.mql[mqlsn] == nil {
|
|
|
|
|
- mt.scopevars.mql[mqlsn] = &Variables{
|
|
|
|
|
- vars: map[string]interface{}{},
|
|
|
|
|
- loop_count: 1,
|
|
|
|
|
- loop_from: 1,
|
|
|
|
|
- loop_step: 1}
|
|
|
|
|
- }
|
|
|
|
|
- mt.scopevars.Unlock()
|
|
|
|
|
- //
|
|
|
|
|
- mt.scopevars.Lock()
|
|
|
|
|
- top_loopi := mt.scopevars.top.loop_from + (topvars.loop_i-1)*mt.scopevars.top.loop_step
|
|
|
|
|
- mt.scopevars.top.vars["topi"] = top_loopi
|
|
|
|
|
- dir_loopi := mt.scopevars.dir[basedir].loop_from + (dirvars.loop_i-1)*mt.scopevars.dir[basedir].loop_step
|
|
|
|
|
- mt.scopevars.dir[basedir].vars["diri"] = dir_loopi
|
|
|
|
|
- mt.scopevars.dir[basedir].vars["topi"] = top_loopi
|
|
|
|
|
- file_loopi := mt.scopevars.file[ffpath].loop_from + (filevars.loop_i-1)*mt.scopevars.file[ffpath].loop_step
|
|
|
|
|
- mt.scopevars.file[ffpath].vars["filei"] = file_loopi
|
|
|
|
|
- mt.scopevars.file[ffpath].vars["diri"] = dir_loopi
|
|
|
|
|
- mt.scopevars.file[ffpath].vars["topi"] = top_loopi
|
|
|
|
|
- mql_loopi := mt.scopevars.mql[mqlkey].loop_from + (mqlvars.loop_i-1)*mt.scopevars.mql[mqlkey].loop_step
|
|
|
|
|
- mt.scopevars.mql[mqlkey].vars["mqli"] = mql_loopi
|
|
|
|
|
- mt.scopevars.mql[mqlkey].vars["filei"] = file_loopi
|
|
|
|
|
- mt.scopevars.mql[mqlkey].vars["diri"] = dir_loopi
|
|
|
|
|
- mt.scopevars.mql[mqlkey].vars["topi"] = top_loopi
|
|
|
|
|
- //
|
|
|
|
|
- mt.scopevars.mql[mqlsn].vars["mqli"] = mql_loopi
|
|
|
|
|
- mt.scopevars.mql[mqlsn].vars["filei"] = file_loopi
|
|
|
|
|
- mt.scopevars.mql[mqlsn].vars["diri"] = dir_loopi
|
|
|
|
|
- mt.scopevars.mql[mqlsn].vars["topi"] = top_loopi
|
|
|
|
|
- mt.scopevars.Unlock()
|
|
|
|
|
- mt.BeforeRunAction(basedir, ffpath, mqlkey, mqlsn, staticactions)
|
|
|
|
|
- formatedmqlstr := mqr.FormatedQueryString
|
|
|
|
|
- mql := mt.ReplaceLoopSN(formatedmqlstr,
|
|
|
|
|
- global, topvars, dirvars, filevars, mqlvars,
|
|
|
|
|
- top_loopi, dir_loopi, file_loopi, mql_loopi,
|
|
|
|
|
- basedir, ffpath, mqlkey, mqlsn,
|
|
|
|
|
- )
|
|
|
|
|
- mqlreplace := rereplace_nbytes.FindAllStringSubmatch(mql, -1)
|
|
|
|
|
- for _, mqr := range mqlreplace {
|
|
|
|
|
- if len(mqr) == 2 {
|
|
|
|
|
- bs := make([]byte, cast.ToInt(mqr[1]))
|
|
|
|
|
- for i := 0; i < len(bs); i++ {
|
|
|
|
|
- bs[i] = byte(32 + rand.Intn(91))
|
|
|
|
|
- }
|
|
|
|
|
- s := string(bs)
|
|
|
|
|
- s = strings.ReplaceAll(s, ")", string([]byte{123}))
|
|
|
|
|
- s = strings.ReplaceAll(s, "'", string([]byte{124}))
|
|
|
|
|
- s = strings.ReplaceAll(s, "\"", string([]byte{125}))
|
|
|
|
|
- s = strings.ReplaceAll(s, "\\", string([]byte{126}))
|
|
|
|
|
- mql = strings.Replace(mql, mqr[0], s, 1)
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- for i, sat := range staticactionexprs {
|
|
|
|
|
- mql = strings.Replace(mql, "["+strconv.Itoa(i)+"]", sat.SourceText, 1)
|
|
|
|
|
- }
|
|
|
|
|
- mql = strings.ReplaceAll(mql, "[[]", "[")
|
|
|
|
|
- mqri := &MQLRequestInstance{
|
|
|
|
|
- MQLRequest: mqr,
|
|
|
|
|
- PreparedQueryString: mql,
|
|
|
|
|
- }
|
|
|
|
|
- //
|
|
|
|
|
- ok_chan := make(chan error, 1)
|
|
|
|
|
- parallel_chan := make(chan bool, 1)
|
|
|
|
|
- parallel := false
|
|
|
|
|
- parallelcount := 0
|
|
|
|
|
- done := false
|
|
|
|
|
- wg.Add(1)
|
|
|
|
|
- go func(mqlvars *CurrentVars) {
|
|
|
|
|
- defer func() {
|
|
|
|
|
- atomic.AddInt32(&mqlcount, mqlvars.mqlcount)
|
|
|
|
|
- wg.Done()
|
|
|
|
|
- }()
|
|
|
|
|
- ch_ok := make(chan error)
|
|
|
|
|
- go func() {
|
|
|
|
|
- for {
|
|
|
|
|
- select {
|
|
|
|
|
- case parallelcount = <-ch_parallel_count:
|
|
|
|
|
- if !done && !parallel {
|
|
|
|
|
- parallel = true
|
|
|
|
|
- // 加入并发控制队列
|
|
|
|
|
- if parallelcount > 0 {
|
|
|
|
|
- if parallelcount > parallel_queue.Size() {
|
|
|
|
|
- parallel_queue.Growth(parallelcount)
|
|
|
|
|
- }
|
|
|
|
|
- parallel_queue.Push(1)
|
|
|
|
|
- }
|
|
|
|
|
- parallel_chan <- true
|
|
|
|
|
- }
|
|
|
|
|
- case ok := <-ch_ok:
|
|
|
|
|
- ok_chan <- ok
|
|
|
|
|
- if parallel {
|
|
|
|
|
- if parallelcount > 0 {
|
|
|
|
|
- // 从并发控制队列中移除
|
|
|
|
|
- parallel_queue.Pop()
|
|
|
|
|
- }
|
|
|
|
|
- } else {
|
|
|
|
|
- done = true
|
|
|
|
|
- }
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }()
|
|
|
|
|
- nstaticactionexprs := map[int]*Action{}
|
|
|
|
|
- for i, sat := range staticactionexprs {
|
|
|
|
|
- nstaticactionexprs[i] = sat
|
|
|
|
|
- }
|
|
|
|
|
- // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次执行开始")
|
|
|
|
|
- ch_ok <- mt.RunMQL(t, ctx,
|
|
|
|
|
- global,
|
|
|
|
|
- topvars,
|
|
|
|
|
- dirvars,
|
|
|
|
|
- filevars,
|
|
|
|
|
- mqlvars,
|
|
|
|
|
- basedir, ffpath, mqlkey, mqlsn, mqri, staticactions, nstaticactionexprs)
|
|
|
|
|
- // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次执行结束")
|
|
|
|
|
- }(mqlvars)
|
|
|
|
|
- var err error
|
|
|
|
|
- select {
|
|
|
|
|
- case err = <-ok_chan: // 非并发,等待完成
|
|
|
|
|
- // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次顺序执行完成")
|
|
|
|
|
- case <-parallel_chan: // 并发,执行继续下一次
|
|
|
|
|
- // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次并发执行继续")
|
|
|
|
|
- }
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- // logger.Error(err)
|
|
|
|
|
- break
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- wg.Wait()
|
|
|
|
|
- mt.scopevars.RLock()
|
|
|
|
|
- loop_count := mt.scopevars.mql[mqlkey].loop_count
|
|
|
|
|
- mt.scopevars.RUnlock()
|
|
|
|
|
- if loop_count > 1 {
|
|
|
|
|
- ut := time.Since(st)
|
|
|
|
|
- sn := fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i)
|
|
|
|
|
- as := ""
|
|
|
|
|
- if topvars.sleeptime > 0 {
|
|
|
|
|
- as = fmt.Sprint(", sleep ", topvars.sleeptime)
|
|
|
|
|
- }
|
|
|
|
|
- logger.Info(fmt.Sprint("mql ", mqlkey+"/"+sn, " loop ", loop_i, " times, usetime ", ut, as))
|
|
|
|
|
- }
|
|
|
|
|
- })
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
func (mt *MQLTest) getValue(basedir, ffpath, mqlkey, mqlsn string,
|
|
func (mt *MQLTest) getValue(basedir, ffpath, mqlkey, mqlsn string,
|
|
|
staticactions *StaticActions, a any) (ak string, va any) {
|
|
staticactions *StaticActions, a any) (ak string, va any) {
|
|
|
switch av := a.(type) {
|
|
switch av := a.(type) {
|
|
@@ -1327,96 +629,3 @@ func (mt *MQLTest) InitScopeVars(basedir, ffpath, mqlkey string,
|
|
|
mt.scopevars.Unlock()
|
|
mt.scopevars.Unlock()
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
-func (mt *MQLTest) RunMQL(t *testing.T, ctx context.Context,
|
|
|
|
|
- global *GlobalVars,
|
|
|
|
|
- topvars *CurrentVars,
|
|
|
|
|
- dirvars *CurrentVars,
|
|
|
|
|
- filevars *CurrentVars,
|
|
|
|
|
- mqlvars *CurrentVars,
|
|
|
|
|
- basedir, ffpath, mqlkey, mqlsn string, mqri *MQLRequestInstance, staticactions *StaticActions, staticactionexprs map[int]*Action) error {
|
|
|
|
|
-
|
|
|
|
|
- mqlstr := mqri.PreparedQueryString
|
|
|
|
|
-
|
|
|
|
|
- if staticactions.WaitName != nil {
|
|
|
|
|
- global.Lock()
|
|
|
|
|
- wgs := []*sync.WaitGroup{}
|
|
|
|
|
- if *staticactions.WaitName == "" {
|
|
|
|
|
- for _, wg := range global.wg_wait_fork_routine {
|
|
|
|
|
- wgs = append(wgs, wg)
|
|
|
|
|
- }
|
|
|
|
|
- } else {
|
|
|
|
|
- wg := global.wg_wait_fork_routine[*staticactions.WaitName]
|
|
|
|
|
- if wg != nil {
|
|
|
|
|
- wgs = append(wgs, wg)
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- global.Unlock()
|
|
|
|
|
- for _, wg := range wgs {
|
|
|
|
|
- wg.Wait()
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- if staticactions.ParallelCount != nil {
|
|
|
|
|
- mt.scopevars.Lock()
|
|
|
|
|
- switch staticactions.Scope {
|
|
|
|
|
- case "top":
|
|
|
|
|
- topvars.ch_parallel_count <- *staticactions.ParallelCount
|
|
|
|
|
- case "dir":
|
|
|
|
|
- dirvars.ch_parallel_count <- *staticactions.ParallelCount
|
|
|
|
|
- case "file":
|
|
|
|
|
- filevars.ch_parallel_count <- *staticactions.ParallelCount
|
|
|
|
|
- default:
|
|
|
|
|
- mqlvars.ch_parallel_count <- *staticactions.ParallelCount
|
|
|
|
|
- }
|
|
|
|
|
- mt.scopevars.Unlock()
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // 重新获取修正后的动作
|
|
|
|
|
- actionexprs, e := getActionExprs(mqlstr, staticactionexprs)
|
|
|
|
|
- if !assert.Nil(t, e, e) {
|
|
|
|
|
- return e
|
|
|
|
|
- }
|
|
|
|
|
- dynamicactions := actionexprs.DynamicActions()
|
|
|
|
|
- if len(dynamicactions.SubscribeArgs) > 0 {
|
|
|
|
|
- subscribe(dynamicactions.SubscribeArgs...)
|
|
|
|
|
- }
|
|
|
|
|
- if len(dynamicactions.UnsubscribeArgs) > 0 {
|
|
|
|
|
- unsubscribe(dynamicactions.UnsubscribeArgs...)
|
|
|
|
|
- }
|
|
|
|
|
- values := []interface{}{}
|
|
|
|
|
- if len(dynamicactions.Params) > 0 {
|
|
|
|
|
- err := json.Unmarshal([]byte(dynamicactions.Params), &values)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- assert.Nil(t, fmt.Sprint("params参数只支持JSON Array,", dynamicactions.Params, mqlstr), err)
|
|
|
|
|
- return err
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- x := atomic.AddInt32(&global.mqlcount, 1)
|
|
|
|
|
- atomic.AddInt32(&topvars.mqlcount, 1)
|
|
|
|
|
- atomic.AddInt32(&dirvars.mqlcount, 1)
|
|
|
|
|
- atomic.AddInt32(&filevars.mqlcount, 1)
|
|
|
|
|
- atomic.AddInt32(&mqlvars.mqlcount, 1)
|
|
|
|
|
-
|
|
|
|
|
- err := mt.RunMQLTryDo(t, ctx,
|
|
|
|
|
- global,
|
|
|
|
|
- topvars,
|
|
|
|
|
- dirvars,
|
|
|
|
|
- filevars,
|
|
|
|
|
- mqlvars,
|
|
|
|
|
- basedir, ffpath, mqlkey, mqlsn+"("+strconv.Itoa(int(x))+")", mqri, values, staticactions, actionexprs)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- // 执行过程有错,停止继续执行
|
|
|
|
|
- return err
|
|
|
|
|
- }
|
|
|
|
|
- if staticactions.SleepTime != nil {
|
|
|
|
|
- global.sleeptime += *staticactions.SleepTime
|
|
|
|
|
- topvars.sleeptime += *staticactions.SleepTime
|
|
|
|
|
- dirvars.sleeptime += *staticactions.SleepTime
|
|
|
|
|
- filevars.sleeptime += *staticactions.SleepTime
|
|
|
|
|
- mqlvars.sleeptime += *staticactions.SleepTime
|
|
|
|
|
- time.Sleep(*staticactions.SleepTime)
|
|
|
|
|
- }
|
|
|
|
|
- return nil
|
|
|
|
|
-}
|
|
|