| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373 |
- package odbcmql
- import (
- "bufio"
- "context"
- "encoding/json"
- "fmt"
- "log"
- "math/rand"
- "os"
- "path/filepath"
- "regexp"
- "runtime"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- odb "git.wecise.com/wecise/odb-go/odb"
- "git.wecise.com/wecise/odb-go/odb/mql/qstru"
- "gitee.com/wecisecode/util/cast"
- mcfg "gitee.com/wecisecode/util/cfg"
- "gitee.com/wecisecode/util/filewalker"
- "gitee.com/wecisecode/util/pqc"
- "gitee.com/wecisecode/util/set/strset"
- "gitee.com/wecisecode/util/spliter"
- "github.com/gofrs/flock"
- "github.com/stretchr/testify/assert"
- )
- var ODBC odb.Client
- var ODBError error
- var debug bool
- // var checkCassSchema bool
- var default_keyspace = `oktest`
- var default_odbpaths = `127.0.0.1:11001`
- var ksnative = default_keyspace + "_native"
- func init() {
- logprefix := config.GetString("log.prefix", "")
- if logprefix != "" {
- logprefix += " "
- }
- logger.SetFormat(logprefix+"yyyy-MM-dd HH:mm:ss.SSSSSS [pid] [level] msg", "\n")
- odbpaths := strset.New(strings.Split(mcfg.CommandArgs.GetString("odb",
- strings.Join(mcfg.Environs.GetStrings("ODBPATH",
- config.GetStrings("odbc.odbpath", default_odbpaths)...), ",")), ",")...).List()
- keyspace := mcfg.CommandArgs.GetString("keyspace",
- mcfg.Environs.GetString("KEYSPACE", config.GetString("odbc.keyspace", default_keyspace)))
- cassdc := mcfg.CommandArgs.GetString("cassandra.dc",
- mcfg.Environs.GetString("CASSANDRADC", config.GetString("cassandra.dc", "")))
- if cassdc == "" || cassdc == "dc1" {
- ksnative = keyspace + "_native"
- } else {
- ksnative = keyspace + "_" + cassdc + "_native"
- }
- ksnative = mcfg.CommandArgs.GetString("ksnative",
- mcfg.Environs.GetString("KSNATIVE", config.GetString("odbc.ksnative", ksnative)))
- debug = mcfg.CommandArgs.GetBool("debug", false)
- // checkCassSchema = mcfg.CommandArgs.GetBool("ccs", true)
- ODBC, ODBError = odb.NewClient(&odb.Config{
- Keyspace: keyspace,
- Hosts: odbpaths,
- Debug: debug,
- })
- }
// Run is the entry point of the MQL test runner.
//
// It first answers the "keyspace?", "odbpath?" and "odbver?" command-line
// queries by printing the requested value and exiting the process. It then
// builds the list of mql paths from the "test.mql.path" setting and os.Args,
// prints the usage text and exits when no path is given, and otherwise lists
// the matching files, takes a file lock when more than one file is found,
// executes the files through MQLTest.Run and closes ODBC.
- func Run(t *testing.T) {
- if mcfg.CommandArgs.GetString("keyspace?") == "keyspace?" {
- if ODBC != nil {
- fmt.Println(ODBC.Config().Keyspace)
- os.Exit(0)
- }
- os.Exit(0)
- }
- if mcfg.CommandArgs.GetString("odbpath?") == "odbpath?" {
- if ODBC != nil {
- fmt.Println(ODBC.Config().Hosts)
- os.Exit(0)
- }
- os.Exit(0)
- }
- if mcfg.CommandArgs.GetString("odbver?") == "odbver?" {
- if ODBC != nil {
- fmt.Println(ODBC.Versline())
- os.Exit(0)
- }
- os.Exit(0)
- }
- spath := strings.Split(config.GetString("test.mql.path"), " ")
- if len(os.Args) > 1 {
- if len(os.Args) > 2 && os.Args[1] == "-" { // everything after a leading "-" is one inline mql statement
- s := strings.Join(os.Args[2:], " ")
- spath = []string{"/tmp/1.mql"}
- os.WriteFile(spath[0], []byte(s+"/**output()**/"), os.ModePerm) // NOTE(review): the write error is ignored
- } else {
- for i := 1; i < len(os.Args); i++ {
- ap := os.Args[i]
- if strings.HasPrefix(ap, "-test.") { // stop at the first go test flag
- break
- }
- if strings.Contains(ap, "=") { // key=value options are not paths
- continue
- }
- if (ap == "-" || ap == "--") && i+1 < len(os.Args) { // the remaining arguments form one inline mql statement
- s := strings.Join(os.Args[i+1:], " ")
- spath = []string{"/tmp/1.mql"}
- os.WriteFile(spath[0], []byte(s+"\n/**output()**/"), os.ModePerm) // NOTE(review): the write error is ignored
- break
- }
- spath = append(spath, ap)
- }
- }
- }
- if len(spath) == 0 || len(spath) == 1 && len(spath[0]) == 0 { // no path given: print the usage text and exit
- fmt.Print(`循环遍历执行指定路径下的 mql 文件
- usage: mql path [options] -- mqlstatement
- path 指定mql文件所在路径,通配符 ** 表示任意字符,* 表示除分隔符以外的任意字符, . 表示递归当前目录下的所有子目录
- 为避免 shell 自动将 * 转换为文件名列表,可以将指定的 path 用引号包含
- mqlstatement 指定要执行的 mql 语句,支持多条语句,语句之间用分号 ; 分隔
- options:
- match=^\\d+.* 文件名匹配正则表达式,默认为所有以数字开头命名的文件
- odb=` + default_odbpaths + ` 指定odbserver路径,默认通过环境变量ODBPATH或通过ETCD相关配置获取
- keyspace=` + default_keyspace + ` 指定keyspace,默认通过环境变量KEYSPACE获取
- ksnative=` + default_keyspace + `_native 指定native keyspace,默认通过环境变量KSNATIVE获取
- debug=true 开启调试模式,输出更多信息
- 环境变量需求:
- KEYSPACE=` + default_keyspace + `
- KSNATIVE=` + default_keyspace + `_native
- ODBPATH=` + default_odbpaths + `
- ETCDPATH=127.0.0.1:2379
- ETCDUSER=
- ETCDPASS=
- CASSANDRAPATH=127.0.0.1
- CASSANDRALOOKUP=false
- NATSPATH=nats://user:N5JbKeT1C3uOUh317OVXjg==@127.0.0.1:4222
- LANG=zh_CN.utf8
- LC_ALL=zh_CN.utf8
- 可通过mql.conf配置运行参数:
- [odbc]
- ;指定odbserver路径
- odbpath=` + default_odbpaths + `
- ;指定keyspace
- keyspace=` + default_keyspace + `
- ;指定native keyspace
- ksnative=` + default_keyspace + `_native
- mql语句扩展说明:
- mql语句中的内容替换:
- 随机字符替换:
- mql语句中的 '...N Bytes...' 会被替换为 N 个随机可见字符,N 为自然数
- 变量替换:
- mql语句中 '{%<f>,<varname>}' 形式的内容会被替换为,以 %<f> 作为格式化标记的 <varname> 变量的内容
- 目前支持的 <varname> 包括:
- keyspace 当前指定的 Keyspace
- ksnative Native Keyspace
- mqli 当前mql语句的循环值,参考 loop(N)
- filei 当前文件的循环值,参考 scope(file) loop(N)
- diri 当前目录的循环值,参考 scope(dir) loop(N)
- topi 整个执行进程的循环值,参考 scope(top) loop(N)
- mqlcount 当前mql语句的总计数
- filecount 当前文件的总计数
- dircount 当前目录的总计数
- topcount 整个执行进程的总计数
- mql语句中第一个 /** 到 最后一个 **/ 之间的内容可以标注执行若干预定义动作,多个动作标注用空格分隔
- 如 /** output() sleep(1) **/ 表示忽略当前语句执行过程中的任何报错,执行完成后等待一秒后再继续执行下一语句
- 目前支持的预定义动作如下:
- skip() 跳过当前语句,以及其它的所有预定义动作
- 可用于暂时屏蔽一条语句的执行
- params(V) 用于提供 prepare 语句中 ? 的对应值,V 为 JSON 格式编码的数组
- 对应值中不能有包含逗号和右括号,如果需要逗号用 \u002c 代替,右括号用 \u0029 代替
- subscribe(S) 订阅指定主题的消息通知 S,输出收到的通知信息,S 为要订阅主题的的名称字符串,同一主题只会被订阅一次
- unsubscribe(S) 取消订阅指定主题的消息通知 S,S 为要订阅主题的名称字符串
- fork(G) 新建并行线程分支,执行当前语句的同时,继续执行后续语句,G 为线程组名,配合 wait 使用
- wait(G) 执行当前语句前,等待之前创建的分支结束,G 为线程组名,对应由 fork 创建的所有同名线程分支,G 为空表示所有
- beforerun(F,...) 执行当前语句前,执行一系列预定义函数,以通过一些简单逻辑处理产生模拟数据,目前支持的函数包括
- set(S,V) 设置变量,S 为变量名,V为变量值
- add(S,V,u) 累加变量,S 为变量名,V为增量值,u为可选的变量单位,如时间段单位 second,minute,hour
- mod(S,V) 变量取模,相当于 S = S % V
- case(C,F) 根据条件 C 执行函数 F
- ------------- 针对错误处理的相关动作
- retry(N) 出错重试 N 次,N为自然数,默认为 0,
- onerror(O,E,...) retry后仍然有错时的处理方法,E为特定的错误信息关键字,不指定则为任意错误,O为处理方法,包括:
- break 中断当前语句的循环,继续顺序执行其它语句,没有循环时与pass无区别
- continue 忽略报错,继续执行,在循环中时,继续下一轮循环
- must 报错才正常,否则中断执行,报告期待错误信息不符,退出当前进程
- exit 停止执行,退出当前进程,此为默认处理方式
- noerrinfo() 存在 retry 或 onerror 处理时,不输出错误信息
- ------------- 以上的预定义动作只对当前语句起作用,下面的预定义动作可以指定其作用范围
- scope(S) 相关动作的作用范围 S 包括 top,dir,file,mql,默认作用范围为 mql 当前语句
- 如:scope(file) parallel(3) loop(5) 并发循环执行本文件五次,最大并行数限制为3
- qmeta(O) mql执行时设置特定的选项,O为json对象
- timeout(D) mql执行超时设置,D 为时间段,默认为一分钟
- 时间段 D 支持单位 d(天),h(时),m(分),s(秒),ms(毫秒),us(微秒),ns(纳秒),默认为毫秒
- loop(N) 循环执行 N 次,N 为正整数,默认执行一次
- 执行前会替换 mql 语句及 params 参数中的循环次数标记
- loopfrom(N) 设置循环执行计数起始值 N,默认为 1,该值仅影响替换 mql 语句及 params 参数中的循环次数标记
- loopstep(N) 设置循环执行计数步长值 N,默认为 1,该值仅影响替换 mql 语句及 params 参数中的循环次数标记
- parallel(N) 并发执行,N 为最大并行数,不指定 N 表示不限制并行数
- set(S,V) 设置变量,S 为变量名,V为变量值
- ------------- 以上为执行语句前的预定义动作,下面是执行完成后的预定义动作
- sleep(D) 执行完成后等待时间段 D 后再继续执行下一语句
- output() 输出执行结果
- outputcount() 输出执行结果记录条数
- metainfo() 输出执行结果相关的元信息
- schema(C) 执行 schema 命令检查指定类 C 是否存在
- count(N) 检查返回结果中的数据记录数是否为 N,N 为自然数
- equal(N,F,V) 判断返回结果中第 N 条数据的字段 F 的值是否为 V
- match(Kn,Mn) 检查返回结果中字段 Kn 值为 Mn 的记录是否存在,参数 Kn,Mn 可以有多个,需要成对出现
- matchcount(Kn,Mn,N) 检查返回结果中字段 Kn 值为 Mn 的记录数是否为 N,参数 Kn,Mn 可以有多个,需要成对出现,N为自然数
- 例:循环遍历执行当前路径下的所有 mql 文件
- ./mql .
- `)
- os.Exit(0)
- return // unreachable after os.Exit
- }
- // Make sure the MODB connection was established
- if !assert.Nil(t, ODBError) {
- return
- }
- logger.Info("odbpath :", ODBC.Config().Hosts, ODBC.Config().Port)
- logger.Info("keyspace :", ODBC.Config().Keyspace)
- logger.Info("version :", ODBC.Versline())
- logger.Info("ksnative :", ksnative)
- logger.Info("debug :", debug)
- logger.Info("spath :", spath)
- // if checkCassSchema && !cass.CheckCassandraSchema(t) {
- // assert.True(t, false, "cassandra schemda is different")
- // return
- // }
- // File names start with a digit
- fnmatch := mcfg.CommandArgs.GetString("match", `^\d+.*`)
- fw, err := filewalker.NewFileWalker(spath, fnmatch) // orderby: dirfirst, filefirst, fullpath
- if !assert.Nil(t, err, err) {
- return
- }
- logger.Info("walkdir: ", fw.WalkDir)
- logger.Info("pathmatch: ", fw.RePath)
- logger.Info("filematch: ", fw.ReFile)
- // test
- fns := []string{}
- fw.List(func(basedir, fpath string) bool { fns = append(fns, filepath.Join(basedir, fpath)); return true })
- if len(fns) > 1 { // the lock file is only used when more than one file was found
- appfile, err := filepath.Abs(os.Args[0])
- if !assert.Nil(t, err) {
- logger.Info("加锁时发生系统错误:", err)
- return
- }
- appdir := filepath.Dir(appfile)
- // 1. Create a lock object (note: the argument is the path of the lock file)
- fl := flock.New(filepath.Join(appdir, "mql.lock"))
- // 2. Try to take the exclusive (write) lock without blocking
- // When locking fails (held by another process), acquired is false and no error is reported
- acquired, err := fl.TryLock()
- if !assert.Nil(t, err) {
- logger.Info("加锁时发生系统错误:", err)
- return
- }
- if !acquired {
- log.Println("无法获取测试锁,可能另一个测试实例正在运行")
- t.Fail()
- return
- }
- // 3. Always unlock once the work is done (defer makes sure it runs)
- defer fl.Unlock()
- }
- logger.Info(fmt.Sprint("fns:", "\n", strings.Join(fns, "\n")))
- if len(fns) == 0 {
- logger.Info("没有找到 MQL 文件")
- return
- }
- logger.Info("共找到", len(fns), "个文件")
- // Read, in order, the files whose names start with a digit and execute the mql they contain
- (&MQLTest{}).Run(t, fw)
- // Wait for delayed messages to be printed
- // time.Sleep(1 * time.Second)
- logger.Info("关闭与服务器的连接")
- ODBC.Close()
- // Wait for a normal shutdown, only to surface hidden problems
- // for n := 0; runtime.NumGoroutine() > 60 && n < 10; n++ {
- // fmt.Println("剩余协程数:", runtime.NumGoroutine())
- // time.Sleep(1000 * time.Millisecond)
- // }
- logger.Info("剩余协程数:", runtime.NumGoroutine())
- }
// Regular expressions used to pick action annotations, random-byte
// placeholders and comments out of mql text.
var (
	// reactions matches one action call of the form action(a1,a2...).
	// The annotations may also be written between braces after a "--" comment: -- {}
	// These are the actions executed after the mql query.
	reactions = regexp.MustCompile(`(\w+)\s*\(([^\)]*)\)`)

	// rereplace_nbytes matches a "...N Bytes..." placeholder, which is replaced
	// by random visible characters.
	rereplace_nbytes = regexp.MustCompile(`\.\.\.\s*(\d+)\s*[Bb][Yy][Tt][Ee]s?\s*\.\.\.`)

	// reactioneexprs matches a /** ... **/ annotation holding action calls.
	reactioneexprs = regexp.MustCompile(`(?s)\/\*\*(?:\s*(\w+\s*\(\s*.*\)\s*)*)*\*\*\/`)
	// reactioneexprs_2 matches "-- { ... }" annotation lines at the end of the text.
	reactioneexprs_2 = regexp.MustCompile(`(?s)(?:(?:^|\n)\-\-\s*\{(.*)\}\s*)+$`)
	// reactioneexprs_3 matches "-- { ... }" annotation lines at the start of the text.
	reactioneexprs_3 = regexp.MustCompile(`(?s)(?:^\-\-\s*\{(.*)\}\s*)+\n`)

	// commentexprs matches a block comment.
	commentexprs = regexp.MustCompile(`(?s)\/\*(?:[^\*]|\*+[^\*\/])*\*+\/`)
	// commentexprs_2 matches a whole-line "--" comment.
	commentexprs_2 = regexp.MustCompile(`(?ms)(?:^|\n)\-\-[^\n]*(?:\n|$)`)
	// commentexprs_3 matches a whole-line "//" comment.
	commentexprs_3 = regexp.MustCompile(`(?ms)(?:^|\n)//[^\n]*(?:\n|$)`)
)
- type MQLTest struct {
- t *testing.T
- fw *filewalker.FileWalker
- scopevars *ScopeVars
- }
- type ScopeVars struct {
- sync.RWMutex
- top *Variables
- dir map[string]*Variables
- file map[string]*Variables
- mql map[string]*Variables
- }
- type Variables struct {
- vars map[string]interface{}
- loop_count int
- loop_from int
- loop_step int
- timeout time.Duration
- qmeta odb.QueryMeta
- parallel_max int
- }
// CurrentVars is the state of one running iteration of a scope.
type CurrentVars struct {
	// loop_i is the 1-based index of the iteration.
	loop_i int
	// ch_parallel_count is the send side of the channel on which the iteration
	// announces that it runs in parallel; the loop that started it receives.
	ch_parallel_count chan<- int
	// mqlcount is added to the counter of the enclosing loop when the iteration ends.
	mqlcount int32
	// sleeptime is reported by RunMQR when it is greater than zero.
	sleeptime time.Duration
	// totalusetime, maxusetime and minusetime feed the response-time summary
	// that Run logs.
	totalusetime time.Duration
	maxusetime   time.Duration
	minusetime   time.Duration
}
- type GlobalVars struct {
- sync.RWMutex
- *CurrentVars
- wg_wait_fork_routine map[string]*sync.WaitGroup
- ch_wait_mql_done map[string]chan bool
- }
- func (mt *MQLTest) Run(t *testing.T, fw *filewalker.FileWalker) (ok bool) {
- mt.t = t
- mt.fw = fw
- global := &GlobalVars{
- CurrentVars: &CurrentVars{},
- wg_wait_fork_routine: make(map[string]*sync.WaitGroup),
- ch_wait_mql_done: make(map[string]chan bool),
- }
- mt.scopevars = &ScopeVars{
- top: &Variables{
- vars: map[string]interface{}{},
- loop_count: 1,
- loop_from: 1,
- loop_step: 1},
- dir: map[string]*Variables{},
- file: map[string]*Variables{},
- mql: map[string]*Variables{}}
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- var wg sync.WaitGroup
- st := time.Now()
- loop_i := 0
- parallel_queue := pqc.NewQueue[any](0)
- mqlcount := int32(0)
- for {
- mt.scopevars.Lock()
- ok := loop_i < mt.scopevars.top.loop_count
- mt.scopevars.Unlock()
- if !ok {
- break
- }
- loop_i++
- //
- ch_parallel_count := make(chan int)
- topvars := &CurrentVars{
- loop_i: loop_i,
- ch_parallel_count: ch_parallel_count,
- }
- //
- ok_chan := make(chan bool, 1)
- parallel_chan := make(chan bool, 1)
- parallel := false
- parallelcount := 0
- done := false
- wg.Add(1)
- go func() {
- defer func() {
- atomic.AddInt32(&mqlcount, topvars.mqlcount)
- wg.Done()
- }()
- ch_ok := make(chan bool)
- go func() {
- for {
- select {
- case <-ch_parallel_count:
- if !done && !parallel {
- parallel = true
- // 加入并发控制队列
- if parallelcount > 0 {
- if parallelcount > parallel_queue.Size() {
- parallel_queue.Growth(parallelcount)
- }
- parallel_queue.Push(1)
- }
- parallel_chan <- true
- }
- case ok := <-ch_ok:
- ok_chan <- ok
- if parallel {
- if parallelcount > 0 {
- // 从并发控制队列中移除
- parallel_queue.Pop()
- }
- } else {
- done = true
- }
- return
- }
- }
- }()
- ok := mt.RunAll(t, ctx,
- global,
- topvars)
- ch_ok <- ok
- if !ok {
- cancel() // 并发测试,有一个线程出错,就全停
- return
- }
- }()
- success := true
- select {
- case success = <-ok_chan: // 非并发,等待完成
- case <-parallel_chan: // 并发,执行继续下一次
- logger.Info("第", topvars.loop_i, "次并发执行继续")
- }
- if !success {
- return false // 失败,不等,直接返回
- }
- }
- wg.Wait()
- mt.scopevars.RLock()
- loop_count := mt.scopevars.top.loop_count
- mt.scopevars.RUnlock()
- ut := time.Since(st)
- aut := time.Duration(0)
- if mqlcount > 0 {
- aut = global.totalusetime / time.Duration(mqlcount)
- }
- logger.Info("完成 ", loop_count, " 次执行,共", mqlcount, "次 MQL 请求,耗时", ut, "单条响应时间", global.minusetime, "~", global.maxusetime, "/", aut, "平均每秒吞吐量", (int64(mqlcount)*int64(time.Second))/int64(ut))
- return
- }
// RunAll performs one top-level iteration: it lists the files through mt.fw,
// groups them by base directory and executes the directories one after the
// other, each as many times as its dir scope loop_count asks for.
//
// Each directory iteration runs RunDir in its own goroutine. The loop waits
// for it unless the iteration announces on its ch_parallel_count channel that
// it runs in parallel. It returns false when listing fails or when a
// sequential directory iteration fails, and true otherwise.
- func (mt *MQLTest) RunAll(t *testing.T, ctx context.Context, global *GlobalVars, topvars *CurrentVars) bool {
- logger.Info("开始第", topvars.loop_i, "次执行")
- st := time.Now()
- // Read the file list
- listdirs := []string{}
- dirfiles := map[string][]string{}
- err := mt.fw.List(func(basedir, filename string) bool {
- if dirfiles[basedir] == nil { // first file of this directory: remember the directory order
- listdirs = append(listdirs, basedir)
- }
- dirfiles[basedir] = append(dirfiles[basedir], filename)
- return true
- })
- if !assert.Nil(t, err, err) {
- return false
- }
- // Execute every directory
- for _, basedir := range listdirs {
- // Loop over the execution of all files in the directory
- mt.scopevars.Lock()
- if mt.scopevars.dir[basedir] == nil {
- mt.scopevars.dir[basedir] = &Variables{
- vars: map[string]interface{}{},
- loop_count: 1,
- loop_from: 1,
- loop_step: 1}
- }
- mt.scopevars.Unlock()
- var wg sync.WaitGroup
- st := time.Now()
- loop_i := 0
- parallel_queue := pqc.NewQueue[any](0)
- mqlcount := int32(0)
- for {
- mt.scopevars.Lock()
- ok := loop_i < mt.scopevars.dir[basedir].loop_count // read again before every iteration
- mt.scopevars.Unlock()
- if !ok {
- break
- }
- loop_i++
- //
- ch_parallel_count := make(chan int)
- dirvars := &CurrentVars{
- loop_i: loop_i,
- ch_parallel_count: ch_parallel_count,
- }
- //
- ok_chan := make(chan bool, 1)
- parallel_chan := make(chan bool, 1)
- parallel := false
- parallelcount := 0
- done := false
- wg.Add(1)
- go func(basedir string) {
- defer func() {
- atomic.AddInt32(&mqlcount, dirvars.mqlcount)
- wg.Done()
- }()
- ch_ok := make(chan bool)
- go func() {
- for {
- select {
- case <-ch_parallel_count: // NOTE(review): the received count is discarded, so parallelcount stays 0 here; RunMQR stores it
- if !done && !parallel {
- parallel = true
- // Join the concurrency-control queue
- if parallelcount > 0 {
- if parallelcount > parallel_queue.Size() {
- parallel_queue.Growth(parallelcount)
- }
- parallel_queue.Push(1)
- }
- parallel_chan <- true
- }
- case ok := <-ch_ok:
- ok_chan <- ok
- if parallel {
- if parallelcount > 0 {
- // Remove from the concurrency-control queue
- parallel_queue.Pop()
- }
- } else {
- done = true
- }
- return
- }
- }
- }()
- // logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次执行开始")
- files := dirfiles[basedir]
- ch_ok <- mt.RunDir(t, ctx,
- global,
- topvars,
- dirvars,
- basedir, files)
- // logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次执行结束")
- }(basedir)
- success := true
- select {
- case success = <-ok_chan: // not parallel: wait for completion
- logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次顺序执行完成")
- case <-parallel_chan: // parallel: carry on with the next iteration
- logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次并发执行继续")
- }
- if !success {
- return false // returns without waiting for the iterations still running
- }
- }
- wg.Wait()
- mt.scopevars.RLock()
- loop_count := mt.scopevars.dir[basedir].loop_count
- mt.scopevars.RUnlock()
- if loop_count > 1 {
- ut := time.Since(st)
- logger.Info(fmt.Sprint("dir ", basedir, " loop ", loop_count, " times, run ", mqlcount, " mqls, usetime ", ut))
- }
- }
- logger.Info("完成第", topvars.loop_i, "次执行,共", topvars.mqlcount, "次 MQL 请求,耗时", time.Since(st))
- return true
- }
- func (mt *MQLTest) RunDir(t *testing.T, ctx context.Context,
- global *GlobalVars,
- topvars *CurrentVars,
- dirvars *CurrentVars,
- basedir string, files []string) bool {
- for _, filename := range files {
- ok := mt.RunFile(t, ctx, global, topvars, dirvars, basedir, filename)
- if !ok {
- return false
- }
- }
- return true
- }
// RunFile reads one mql file, splits it into statements, collects them into
// an MQLGroupRequest and executes that group as many times as the file scope
// loop_count asks for.
//
// Statements found between a "multilines begin" statement and a
// "multilines end" statement are joined with ";" into a single request.
// Each file iteration runs RunMQLGroup in its own goroutine. The loop waits
// for it unless the iteration announces on its ch_parallel_count channel that
// it runs in parallel. It returns false when the file cannot be read, when a
// request cannot be appended to the group, or when a sequential iteration
// fails, and true otherwise.
- func (mt *MQLTest) RunFile(t *testing.T, ctx context.Context,
- global *GlobalVars,
- topvars *CurrentVars,
- dirvars *CurrentVars,
- basedir, filename string) bool {
- // Read the file content
- ffpath := filepath.Join(basedir, filename)
- bs, err := os.ReadFile(ffpath)
- if !assert.Nil(t, err, err) {
- return false
- }
- // Split into mql statements
- mqgr := NewMQLGroupRequest()
- var multilines *MQLRequest
- mqs := spliter.NewMQLSpliter(bufio.NewReader(strings.NewReader(string(bs))))
- for {
- mql, fromline, toline, fromchar, tochar, hasnext, _ := mqs.NextMQL() // NOTE(review): the last result is ignored
- if !hasnext {
- break
- }
- if multilines != nil { // inside a multilines section
- if strings.TrimSpace(mql) == "multilines end" {
- e := mqgr.Append(multilines)
- if !assert.Nil(t, e, e) {
- return false
- }
- multilines = nil
- } else if multilines.OriginQueryString == "" { // first statement of the section
- multilines.OriginQueryString = mql
- multilines.Fromline = fromline
- multilines.Toline = toline
- multilines.Fromchar = fromchar
- multilines.Tochar = tochar
- } else { // later statements extend the request
- multilines.OriginQueryString += ";" + mql
- multilines.Toline = toline
- multilines.Tochar = tochar
- }
- } else if strings.TrimSpace(mql) == "multilines begin" {
- multilines = &MQLRequest{FilePath: ffpath}
- } else {
- mqr := &MQLRequest{OriginQueryString: mql, FilePath: ffpath, Fromline: fromline, Toline: toline, Fromchar: fromchar, Tochar: tochar}
- e := mqgr.Append(mqr)
- if !assert.Nil(t, e, e) {
- return false
- }
- }
- }
// NOTE(review): a section opened by "multilines begin" and never closed is
// not appended to the group.
- // mqls := spliter.MQLSplit(string(bs))
- mt.scopevars.Lock()
- if mt.scopevars.file[ffpath] == nil {
- mt.scopevars.file[ffpath] = &Variables{
- vars: map[string]interface{}{},
- loop_count: 1,
- loop_from: 1,
- loop_step: 1}
- }
- mt.scopevars.Unlock()
- var wg sync.WaitGroup
- st := time.Now()
- loop_i := 0
- parallel_queue := pqc.NewQueue[any](0)
- mqlcount := int32(0)
- for {
- mt.scopevars.Lock()
- ok := loop_i < mt.scopevars.file[ffpath].loop_count // read again before every iteration
- mt.scopevars.Unlock()
- if !ok {
- break
- }
- loop_i++
- //
- ch_parallel_count := make(chan int)
- filevars := &CurrentVars{
- loop_i: loop_i,
- ch_parallel_count: ch_parallel_count,
- }
- //
- ok_chan := make(chan bool, 1)
- parallel_chan := make(chan bool, 1)
- parallel := false
- parallelcount := 0
- done := false
- wg.Add(1)
- go func() {
- defer func() {
- atomic.AddInt32(&mqlcount, filevars.mqlcount)
- wg.Done()
- }()
- ch_ok := make(chan bool)
- go func() {
- for {
- select {
- case <-ch_parallel_count: // NOTE(review): the received count is discarded, so parallelcount stays 0 here; RunMQR stores it
- if !done && !parallel {
- parallel = true
- // Join the concurrency-control queue
- if parallelcount > 0 {
- if parallelcount > parallel_queue.Size() {
- parallel_queue.Growth(parallelcount)
- }
- parallel_queue.Push(1)
- }
- parallel_chan <- true
- }
- case ok := <-ch_ok:
- ok_chan <- ok
- if parallel {
- if parallelcount > 0 {
- // Remove from the concurrency-control queue
- parallel_queue.Pop()
- }
- } else {
- done = true
- }
- return
- }
- }
- }()
- // logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次执行开始")
- ch_ok <- mt.RunMQLGroup(t, ctx,
- global,
- topvars,
- dirvars,
- filevars,
- basedir, ffpath, mqgr)
- // logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次执行结束")
- }()
- success := true
- select {
- case success = <-ok_chan: // not parallel: wait for completion
- logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次顺序执行完成")
- case <-parallel_chan: // parallel: carry on with the next iteration
- logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次并发执行继续")
- }
- if !success {
- return false // returns without waiting for the iterations still running
- }
- }
- wg.Wait()
- mt.scopevars.RLock()
- loop_count := mt.scopevars.file[ffpath].loop_count
- mt.scopevars.RUnlock()
- if loop_count > 1 {
- ut := time.Since(st)
- sn := fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i)
- logger.Info(fmt.Sprint("file ", ffpath+"/"+sn, " loop ", loop_count, " times, run ", mqlcount, " mqls, usetime ", ut))
- }
- return true
- }
- func (mt *MQLTest) RunMQLGroup(t *testing.T, ctx context.Context,
- global *GlobalVars,
- topvars *CurrentVars,
- dirvars *CurrentVars,
- filevars *CurrentVars,
- basedir, ffpath string, mqgr *MQLGroupRequest) (pass bool) {
- pass = true
- var wg sync.WaitGroup
- for _, mqs := range mqgr.mqrs {
- wg.Add(1)
- go func(mqs []*MQLRequest) {
- defer wg.Done()
- if len(mqs) == 0 {
- return
- }
- if mqs[0].StaticActions.ForkName != nil {
- forkname := *mqs[0].StaticActions.ForkName
- global.Lock()
- wg := global.wg_wait_fork_routine[forkname]
- if wg == nil {
- wg = &sync.WaitGroup{}
- global.wg_wait_fork_routine[forkname] = wg
- }
- global.Unlock()
- wg.Add(1)
- defer wg.Done()
- }
- ok := mt.RunMQLs(t, ctx, global, topvars, dirvars, filevars, basedir, ffpath, mqs)
- if !ok {
- pass = false
- }
- }(mqs)
- }
- wg.Wait()
- return
- }
- func (mt *MQLTest) RunMQLs(t *testing.T, ctx context.Context,
- global *GlobalVars,
- topvars *CurrentVars,
- dirvars *CurrentVars,
- filevars *CurrentVars,
- basedir, ffpath string, mqrs []*MQLRequest) bool {
- for _, mqr := range mqrs {
- mqlkey := mqr.Key
- mqlstr := mqr.OriginQueryString
- if mqlstr == "" {
- continue
- }
- staticactions := mqr.StaticActions
- // 设置执行过程中的控制参数
- mt.InitScopeVars(basedir, ffpath, mqlkey, staticactions)
- ch_test_run_one_mql_result := make(chan bool)
- go func() {
- mqrinst := fmt.Sprint(mqlkey, "/", topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i)
- global.Lock()
- mqrdone := global.ch_wait_mql_done[mqrinst]
- if mqrdone == nil {
- mqrdone = make(chan bool)
- global.ch_wait_mql_done[mqrinst] = mqrdone
- }
- var waitmqrdone chan bool
- if mqr.WaitMQLRequest != nil {
- waitmqrkey := mqr.WaitMQLRequest.Key
- waitmqrinst := fmt.Sprint(waitmqrkey, "/", topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i)
- waitmqrdone = global.ch_wait_mql_done[waitmqrinst]
- if waitmqrdone == nil {
- waitmqrdone = make(chan bool)
- global.ch_wait_mql_done[waitmqrinst] = waitmqrdone
- }
- }
- global.Unlock()
- var ret bool
- defer func() { mqrdone <- ret }()
- if waitmqrdone != nil {
- v := <-waitmqrdone
- waitmqrdone <- v
- if !v {
- // 依赖MQR失败
- ch_test_run_one_mql_result <- false
- return
- }
- }
- ret = mt.RunMQR(t, ctx, global, topvars, dirvars, filevars, basedir, ffpath, mqr)
- ch_test_run_one_mql_result <- ret
- }()
- ret := <-ch_test_run_one_mql_result
- if !ret {
- return ret
- }
- // continue
- }
- return true
- }
// RunMQR executes one mql request as a subtest named after the request key
// and returns the result of t.Run.
//
// The subtest repeats the request as many times as the mql scope loop_count
// asks for. Before every iteration it publishes the loop values (topi, diri,
// filei, mqli) in the scope variables, runs the before-run actions and
// prepares the query text: loop markers, "...N Bytes..." placeholders and
// the "[i]" action placeholders are replaced. Each iteration runs RunMQL in
// its own goroutine; the loop waits for it unless the iteration announces on
// its ch_parallel_count channel that it runs in parallel. The loop stops at
// the first sequential iteration that returns an error.
- func (mt *MQLTest) RunMQR(t *testing.T, ctx context.Context,
- global *GlobalVars,
- topvars *CurrentVars,
- dirvars *CurrentVars,
- filevars *CurrentVars,
- basedir, ffpath string, mqr *MQLRequest) bool {
- mqlkey := mqr.Key
- staticactionexprs := mqr.StaticActionExprs
- staticactions := mqr.StaticActions
- return t.Run(mqlkey, func(t *testing.T) {
- var wg sync.WaitGroup
- st := time.Now()
- loop_i := 0
- parallel_queue := pqc.NewQueue[any](0)
- mqlcount := int32(0)
- for {
- mt.scopevars.Lock()
- ok := loop_i < mt.scopevars.mql[mqlkey].loop_count // read again before every iteration
- mt.scopevars.Unlock()
- if !ok {
- break
- }
- loop_i++
- //
- ch_parallel_count := make(chan int)
- mqlvars := &CurrentVars{
- loop_i: loop_i,
- ch_parallel_count: ch_parallel_count,
- }
- // Running instance
- mqlsn := fmt.Sprint(mqlkey, "/", topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i, ".", mqlvars.loop_i)
- mt.scopevars.Lock()
- if mt.scopevars.mql[mqlsn] == nil {
- mt.scopevars.mql[mqlsn] = &Variables{
- vars: map[string]interface{}{},
- loop_count: 1,
- loop_from: 1,
- loop_step: 1}
- }
- mt.scopevars.Unlock()
- //
- mt.scopevars.Lock()
- top_loopi := mt.scopevars.top.loop_from + (topvars.loop_i-1)*mt.scopevars.top.loop_step // reported value = from + (i-1)*step
- mt.scopevars.top.vars["topi"] = top_loopi
- dir_loopi := mt.scopevars.dir[basedir].loop_from + (dirvars.loop_i-1)*mt.scopevars.dir[basedir].loop_step
- mt.scopevars.dir[basedir].vars["diri"] = dir_loopi
- mt.scopevars.dir[basedir].vars["topi"] = top_loopi
- file_loopi := mt.scopevars.file[ffpath].loop_from + (filevars.loop_i-1)*mt.scopevars.file[ffpath].loop_step
- mt.scopevars.file[ffpath].vars["filei"] = file_loopi
- mt.scopevars.file[ffpath].vars["diri"] = dir_loopi
- mt.scopevars.file[ffpath].vars["topi"] = top_loopi
- mql_loopi := mt.scopevars.mql[mqlkey].loop_from + (mqlvars.loop_i-1)*mt.scopevars.mql[mqlkey].loop_step
- mt.scopevars.mql[mqlkey].vars["mqli"] = mql_loopi
- mt.scopevars.mql[mqlkey].vars["filei"] = file_loopi
- mt.scopevars.mql[mqlkey].vars["diri"] = dir_loopi
- mt.scopevars.mql[mqlkey].vars["topi"] = top_loopi
- //
- mt.scopevars.mql[mqlsn].vars["mqli"] = mql_loopi
- mt.scopevars.mql[mqlsn].vars["filei"] = file_loopi
- mt.scopevars.mql[mqlsn].vars["diri"] = dir_loopi
- mt.scopevars.mql[mqlsn].vars["topi"] = top_loopi
- mt.scopevars.Unlock()
- mt.BeforeRunAction(basedir, ffpath, mqlkey, mqlsn, staticactions)
- formatedmqlstr := mqr.FormatedQueryString
- mql := mt.ReplaceLoopSN(formatedmqlstr,
- global, topvars, dirvars, filevars, mqlvars,
- top_loopi, dir_loopi, file_loopi, mql_loopi,
- basedir, ffpath, mqlkey, mqlsn,
- )
- mqlreplace := rereplace_nbytes.FindAllStringSubmatch(mql, -1)
- for _, mqr := range mqlreplace { // this mqr shadows the request inside the loop only
- if len(mqr) == 2 {
- bs := make([]byte, cast.ToInt(mqr[1]))
- for i := 0; i < len(bs); i++ {
- bs[i] = byte(32 + rand.Intn(91)) // random byte in the range 32..122
- }
- s := string(bs)
- s = strings.ReplaceAll(s, ")", string([]byte{123})) // the four characters replaced here are swapped for bytes 123..126
- s = strings.ReplaceAll(s, "'", string([]byte{124}))
- s = strings.ReplaceAll(s, "\"", string([]byte{125}))
- s = strings.ReplaceAll(s, "\\", string([]byte{126}))
- mql = strings.Replace(mql, mqr[0], s, 1)
- }
- }
- for i, sat := range staticactionexprs { // put the source text of each action back in place of its "[i]" placeholder
- mql = strings.Replace(mql, "["+strconv.Itoa(i)+"]", sat.SourceText, 1)
- }
- mql = strings.ReplaceAll(mql, "[[]", "[")
- mqri := &MQLRequestInstance{
- MQLRequest: mqr,
- PreparedQueryString: mql,
- }
- //
- ok_chan := make(chan error, 1)
- parallel_chan := make(chan bool, 1)
- parallel := false
- parallelcount := 0
- done := false
- wg.Add(1)
- go func(mqlvars *CurrentVars) {
- defer func() {
- atomic.AddInt32(&mqlcount, mqlvars.mqlcount)
- wg.Done()
- }()
- ch_ok := make(chan error)
- go func() {
- for {
- select {
- case parallelcount = <-ch_parallel_count: // the received value is the parallelism limit used below
- if !done && !parallel {
- parallel = true
- // Join the concurrency-control queue
- if parallelcount > 0 {
- if parallelcount > parallel_queue.Size() {
- parallel_queue.Growth(parallelcount)
- }
- parallel_queue.Push(1)
- }
- parallel_chan <- true
- }
- case ok := <-ch_ok:
- ok_chan <- ok
- if parallel {
- if parallelcount > 0 {
- // Remove from the concurrency-control queue
- parallel_queue.Pop()
- }
- } else {
- done = true
- }
- return
- }
- }
- }()
- nstaticactionexprs := map[int]*Action{} // per-iteration copy of the action expressions, keyed by index
- for i, sat := range staticactionexprs {
- nstaticactionexprs[i] = sat
- }
- // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次执行开始")
- ch_ok <- mt.RunMQL(t, ctx,
- global,
- topvars,
- dirvars,
- filevars,
- mqlvars,
- basedir, ffpath, mqlkey, mqlsn, mqri, staticactions, nstaticactionexprs)
- // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次执行结束")
- }(mqlvars)
- var err error
- select {
- case err = <-ok_chan: // not parallel: wait for completion
- // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次顺序执行完成")
- case <-parallel_chan: // parallel: carry on with the next iteration
- // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次并发执行继续")
- }
- if err != nil {
- // logger.Error(err)
- break
- }
- }
- wg.Wait()
- mt.scopevars.RLock()
- loop_count := mt.scopevars.mql[mqlkey].loop_count
- mt.scopevars.RUnlock()
- if loop_count > 1 {
- ut := time.Since(st)
- sn := fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i)
- as := ""
- if topvars.sleeptime > 0 {
- as = fmt.Sprint(", sleep ", topvars.sleeptime)
- }
- logger.Info(fmt.Sprint("mql ", mqlkey+"/"+sn, " loop ", loop_i, " times, usetime ", ut, as))
- }
- })
- }
- func (mt *MQLTest) getValue(basedir, ffpath, mqlkey, mqlsn string,
- staticactions *StaticActions, a any) (ak string, va any) {
- switch av := a.(type) {
- case *Action:
- switch av.Name {
- case "<EQ>":
- if len(av.Args) >= 2 {
- _, x := mt.getValue(basedir, ffpath, mqlkey, mqlsn, staticactions, av.Args[0])
- _, y := mt.getValue(basedir, ffpath, mqlkey, mqlsn, staticactions, av.Args[1])
- if cast.ToString(x) == cast.ToString(y) {
- return "", true
- }
- }
- return "", false
- }
- case *qstru.Expr:
- ak = av.VarName
- if ak == "now" {
- return "", time.Now()
- }
- mt.scopevars.Lock()
- switch staticactions.Scope {
- case "top":
- va = mt.scopevars.top.vars[ak]
- case "dir":
- va = mt.scopevars.dir[basedir].vars[ak]
- case "file":
- va = mt.scopevars.file[ffpath].vars[ak]
- default:
- var has bool
- va, has = mt.scopevars.mql[mqlsn].vars[ak]
- if !has {
- va = mt.scopevars.mql[mqlkey].vars[ak]
- }
- }
- mt.scopevars.Unlock()
- case string:
- va = av
- for _, tf := range time_layouts {
- t, e := time.Parse(tf, av)
- if e == nil {
- va = t
- break
- }
- }
- default:
- va = a
- }
- return
- }
- func (mt *MQLTest) setValue(basedir, ffpath, mqlkey, mqlsn string,
- staticactions *StaticActions, ak string, va any) {
- mt.scopevars.Lock()
- switch staticactions.Scope {
- case "top":
- mt.scopevars.top.vars[ak] = va
- case "dir":
- mt.scopevars.dir[basedir].vars[ak] = va
- case "file":
- mt.scopevars.file[ffpath].vars[ak] = va
- default:
- _, has := mt.scopevars.mql[mqlsn].vars[ak]
- if has {
- mt.scopevars.mql[mqlsn].vars[ak] = va
- } else {
- mt.scopevars.mql[mqlkey].vars[ak] = va
- }
- }
- mt.scopevars.Unlock()
- }
- func (mt *MQLTest) BeforeRunAction(basedir, ffpath, mqlkey, mqlsn string,
- staticactions *StaticActions,
- ) {
- mt.runAction(basedir, ffpath, mqlkey, mqlsn, staticactions, staticactions.BeforeRunActions...)
- }
- func (mt *MQLTest) runAction(basedir, ffpath, mqlkey, mqlsn string,
- staticactions *StaticActions, actions ...*Action,
- ) {
- for _, act := range actions {
- switch act.Name {
- case "set":
- var a, b any
- var ak string
- if len(act.Args) > 1 {
- a = act.Args[0]
- b = act.Args[1]
- }
- ak, _ = mt.getValue(basedir, ffpath, mqlkey, mqlsn, staticactions, a)
- _, vb := mt.getValue(basedir, ffpath, mqlkey, mqlsn, staticactions, b)
- mt.setValue(basedir, ffpath, mqlkey, mqlsn, staticactions, ak, vb)
- case "add":
- var a, b, va, vb any
- var ak string
- if len(act.Args) > 1 {
- a = act.Args[0]
- b = act.Args[1]
- }
- ak, va = mt.getValue(basedir, ffpath, mqlkey, mqlsn, staticactions, a)
- _, vb = mt.getValue(basedir, ffpath, mqlkey, mqlsn, staticactions, b)
- switch v := va.(type) {
- case time.Time:
- unit := "second"
- if len(act.Args) > 2 {
- unit = cast.ToString(act.Args[2])
- }
- vb := cast.ToInt(vb)
- d := time.Duration(0)
- switch unit {
- case "second", "seconds", "s", "秒":
- d = time.Duration(vb) * time.Second
- case "minute", "minutes", "m", "分":
- d = time.Duration(vb) * time.Minute
- case "hour", "hours", "h", "时":
- d = time.Duration(vb) * time.Hour
- }
- v = v.Add(d)
- mt.setValue(basedir, ffpath, mqlkey, mqlsn, staticactions, ak, v)
- }
- case "mod":
- var a, b, va, vb any
- var ak string
- if len(act.Args) > 1 {
- a = act.Args[0]
- b = act.Args[1]
- }
- ak, va = mt.getValue(basedir, ffpath, mqlkey, mqlsn, staticactions, a)
- _, vb = mt.getValue(basedir, ffpath, mqlkey, mqlsn, staticactions, b)
- av := cast.ToInt(va)
- bv := cast.ToInt(vb)
- v := av % bv
- mt.setValue(basedir, ffpath, mqlkey, mqlsn, staticactions, ak, v)
- case "case":
- var a, b, va any
- if len(act.Args) > 1 {
- a = act.Args[0]
- b = act.Args[1]
- }
- _, va = mt.getValue(basedir, ffpath, mqlkey, mqlsn, staticactions, a)
- if cast.ToBool(va) {
- switch vb := b.(type) {
- case *Action:
- mt.runAction(basedir, ffpath, mqlkey, mqlsn, staticactions, vb)
- }
- }
- }
- }
- }
- func (mt *MQLTest) InitScopeVars(basedir, ffpath, mqlkey string,
- staticactions *StaticActions,
- ) {
- mt.scopevars.Lock()
- if mt.scopevars.mql[mqlkey] == nil {
- mt.scopevars.mql[mqlkey] = &Variables{
- vars: map[string]interface{}{},
- loop_count: 1,
- loop_from: 1,
- loop_step: 1}
- }
- mt.scopevars.Unlock()
- for k, v := range staticactions.Variables {
- mt.scopevars.Lock()
- switch staticactions.Scope {
- case "top":
- mt.scopevars.top.vars[k] = v
- case "dir":
- mt.scopevars.dir[basedir].vars[k] = v
- case "file":
- mt.scopevars.file[ffpath].vars[k] = v
- default:
- mt.scopevars.mql[mqlkey].vars[k] = v
- }
- mt.scopevars.Unlock()
- }
- if staticactions.Timeout != nil {
- mt.scopevars.Lock()
- switch staticactions.Scope {
- case "top":
- mt.scopevars.top.timeout = *staticactions.Timeout
- case "dir":
- mt.scopevars.dir[basedir].timeout = *staticactions.Timeout
- case "file":
- mt.scopevars.file[ffpath].timeout = *staticactions.Timeout
- default:
- mt.scopevars.mql[mqlkey].timeout = *staticactions.Timeout
- }
- mt.scopevars.Unlock()
- }
- if staticactions.Qmeta != nil {
- mt.scopevars.Lock()
- switch staticactions.Scope {
- case "top":
- mt.scopevars.top.qmeta = staticactions.Qmeta
- case "dir":
- mt.scopevars.dir[basedir].qmeta = staticactions.Qmeta
- case "file":
- mt.scopevars.file[ffpath].qmeta = staticactions.Qmeta
- default:
- mt.scopevars.mql[mqlkey].qmeta = staticactions.Qmeta
- }
- mt.scopevars.Unlock()
- }
- if staticactions.LoopCount != nil {
- mt.scopevars.Lock()
- switch staticactions.Scope {
- case "top":
- mt.scopevars.top.loop_count = *staticactions.LoopCount
- case "dir":
- mt.scopevars.dir[basedir].loop_count = *staticactions.LoopCount
- case "file":
- mt.scopevars.file[ffpath].loop_count = *staticactions.LoopCount
- default:
- mt.scopevars.mql[mqlkey].loop_count = *staticactions.LoopCount
- }
- mt.scopevars.Unlock()
- }
- if staticactions.LoopFrom != nil {
- mt.scopevars.Lock()
- switch staticactions.Scope {
- case "top":
- mt.scopevars.top.loop_from = *staticactions.LoopFrom
- case "dir":
- mt.scopevars.dir[basedir].loop_from = *staticactions.LoopFrom
- case "file":
- mt.scopevars.file[ffpath].loop_from = *staticactions.LoopFrom
- default:
- mt.scopevars.mql[mqlkey].loop_from = *staticactions.LoopFrom
- }
- mt.scopevars.Unlock()
- }
- if staticactions.LoopStep != nil {
- mt.scopevars.Lock()
- switch staticactions.Scope {
- case "top":
- mt.scopevars.top.loop_step = *staticactions.LoopStep
- case "dir":
- mt.scopevars.dir[basedir].loop_step = *staticactions.LoopStep
- case "file":
- mt.scopevars.file[ffpath].loop_step = *staticactions.LoopStep
- default:
- mt.scopevars.mql[mqlkey].loop_step = *staticactions.LoopStep
- }
- mt.scopevars.Unlock()
- }
- }
- func (mt *MQLTest) RunMQL(t *testing.T, ctx context.Context,
- global *GlobalVars,
- topvars *CurrentVars,
- dirvars *CurrentVars,
- filevars *CurrentVars,
- mqlvars *CurrentVars,
- basedir, ffpath, mqlkey, mqlsn string, mqri *MQLRequestInstance, staticactions *StaticActions, staticactionexprs map[int]*Action) error {
- mqlstr := mqri.PreparedQueryString
- if staticactions.WaitName != nil {
- global.Lock()
- wgs := []*sync.WaitGroup{}
- if *staticactions.WaitName == "" {
- for _, wg := range global.wg_wait_fork_routine {
- wgs = append(wgs, wg)
- }
- } else {
- wg := global.wg_wait_fork_routine[*staticactions.WaitName]
- if wg != nil {
- wgs = append(wgs, wg)
- }
- }
- global.Unlock()
- for _, wg := range wgs {
- wg.Wait()
- }
- }
- if staticactions.ParallelCount != nil {
- mt.scopevars.Lock()
- switch staticactions.Scope {
- case "top":
- topvars.ch_parallel_count <- *staticactions.ParallelCount
- case "dir":
- dirvars.ch_parallel_count <- *staticactions.ParallelCount
- case "file":
- filevars.ch_parallel_count <- *staticactions.ParallelCount
- default:
- mqlvars.ch_parallel_count <- *staticactions.ParallelCount
- }
- mt.scopevars.Unlock()
- }
- // 重新获取修正后的动作
- actionexprs, e := getActionExprs(mqlstr, staticactionexprs)
- if !assert.Nil(t, e, e) {
- return e
- }
- dynamicactions := actionexprs.DynamicActions()
- if len(dynamicactions.SubscribeArgs) > 0 {
- subscribe(dynamicactions.SubscribeArgs...)
- }
- if len(dynamicactions.UnsubscribeArgs) > 0 {
- unsubscribe(dynamicactions.UnsubscribeArgs...)
- }
- values := []interface{}{}
- if len(dynamicactions.Params) > 0 {
- err := json.Unmarshal([]byte(dynamicactions.Params), &values)
- if err != nil {
- assert.Nil(t, fmt.Sprint("params参数只支持JSON Array,", dynamicactions.Params, mqlstr), err)
- return err
- }
- }
- x := atomic.AddInt32(&global.mqlcount, 1)
- atomic.AddInt32(&topvars.mqlcount, 1)
- atomic.AddInt32(&dirvars.mqlcount, 1)
- atomic.AddInt32(&filevars.mqlcount, 1)
- atomic.AddInt32(&mqlvars.mqlcount, 1)
- err := mt.RunMQLTryDo(t, ctx,
- global,
- topvars,
- dirvars,
- filevars,
- mqlvars,
- basedir, ffpath, mqlkey, mqlsn+"("+strconv.Itoa(int(x))+")", mqri, values, staticactions, actionexprs)
- if err != nil {
- // 执行过程有错,停止继续执行
- return err
- }
- if staticactions.SleepTime != nil {
- global.sleeptime += *staticactions.SleepTime
- topvars.sleeptime += *staticactions.SleepTime
- dirvars.sleeptime += *staticactions.SleepTime
- filevars.sleeptime += *staticactions.SleepTime
- mqlvars.sleeptime += *staticactions.SleepTime
- time.Sleep(*staticactions.SleepTime)
- }
- return nil
- }
|