mql_run.go 44 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422
  1. package odbcmql
  2. import (
  3. "bufio"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "log"
  8. "math/rand"
  9. "os"
  10. "path/filepath"
  11. "regexp"
  12. "runtime"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "sync/atomic"
  17. "testing"
  18. "time"
  19. odb "git.wecise.com/wecise/odb-go/odb"
  20. "git.wecise.com/wecise/odb-go/odb/mql/qstru"
  21. "gitee.com/wecisecode/util/cast"
  22. mcfg "gitee.com/wecisecode/util/cfg"
  23. "gitee.com/wecisecode/util/filewalker"
  24. "gitee.com/wecisecode/util/mfmt"
  25. "gitee.com/wecisecode/util/pqc"
  26. "gitee.com/wecisecode/util/set/strset"
  27. "gitee.com/wecisecode/util/spliter"
  28. "github.com/gofrs/flock"
  29. "github.com/stretchr/testify/assert"
  30. )
  31. var ODBC odb.Client
  32. var ODBError error
  33. var debug bool
  34. // var checkCassSchema bool
  35. var default_keyspace = `oktest`
  36. var default_odbpaths = `127.0.0.1:11001`
  37. var default_timeout = 1 * time.Minute
  38. var ksnative = default_keyspace + "_native"
  39. func init() {
  40. logprefix := config.GetString("log.prefix", "")
  41. if logprefix != "" {
  42. logprefix += " "
  43. }
  44. logger.SetFormat(logprefix+"yyyy-MM-dd HH:mm:ss.SSSSSS [pid] [level] msg", "\n")
  45. odbpaths := strset.New(strings.Split(mcfg.CommandArgs.GetString("odb",
  46. strings.Join(mcfg.Environs.GetStrings("ODBPATH",
  47. config.GetStrings("odbc.odbpath", default_odbpaths)...), ",")), ",")...).List()
  48. keyspace := mcfg.CommandArgs.GetString("keyspace",
  49. mcfg.Environs.GetString("KEYSPACE", config.GetString("odbc.keyspace", default_keyspace)))
  50. cassdc := mcfg.CommandArgs.GetString("cassandra.dc",
  51. mcfg.Environs.GetString("CASSANDRADC", config.GetString("cassandra.dc", "")))
  52. if cassdc == "" || cassdc == "dc1" {
  53. ksnative = keyspace + "_native"
  54. } else {
  55. ksnative = keyspace + "_" + cassdc + "_native"
  56. }
  57. ksnative = mcfg.CommandArgs.GetString("ksnative",
  58. mcfg.Environs.GetString("KSNATIVE", config.GetString("odbc.ksnative", ksnative)))
  59. debug = mcfg.CommandArgs.GetBool("debug", false)
  60. // checkCassSchema = mcfg.CommandArgs.GetBool("ccs", true)
  61. ODBC, ODBError = odb.NewClient(&odb.Config{
  62. Keyspace: keyspace,
  63. Hosts: odbpaths,
  64. Debug: debug,
  65. })
  66. }
  67. func Run(t *testing.T) {
  68. if mcfg.CommandArgs.GetString("keyspace?") == "keyspace?" {
  69. if ODBC != nil {
  70. fmt.Println(ODBC.Config().Keyspace)
  71. os.Exit(0)
  72. }
  73. os.Exit(0)
  74. }
  75. if mcfg.CommandArgs.GetString("odbpath?") == "odbpath?" {
  76. if ODBC != nil {
  77. fmt.Println(ODBC.Config().Hosts)
  78. os.Exit(0)
  79. }
  80. os.Exit(0)
  81. }
  82. if mcfg.CommandArgs.GetString("odbver?") == "odbver?" {
  83. if ODBC != nil {
  84. fmt.Println(ODBC.Versline())
  85. os.Exit(0)
  86. }
  87. os.Exit(0)
  88. }
  89. spath := strings.Split(config.GetString("test.mql.path"), " ")
  90. if len(os.Args) > 1 {
  91. if len(os.Args) > 2 && os.Args[1] == "-" {
  92. s := strings.Join(os.Args[2:], " ")
  93. spath = []string{"/tmp/1.mql"}
  94. os.WriteFile(spath[0], []byte(s+"\n/**output()**/"), os.ModePerm)
  95. } else {
  96. for i := 1; i < len(os.Args); i++ {
  97. ap := os.Args[i]
  98. if strings.HasPrefix(ap, "-test.") {
  99. break
  100. }
  101. if strings.Contains(ap, "=") {
  102. continue
  103. }
  104. if ap == "files?" {
  105. continue
  106. }
  107. if (ap == "-" || ap == "--") && i+1 < len(os.Args) {
  108. s := strings.Join(os.Args[i+1:], " ")
  109. spath = []string{"/tmp/1.mql"}
  110. os.WriteFile(spath[0], []byte(s+"\n/**output()**/"), os.ModePerm)
  111. break
  112. }
  113. spath = append(spath, ap)
  114. }
  115. }
  116. }
  117. if len(spath) == 0 || len(spath) == 1 && len(spath[0]) == 0 {
  118. if mcfg.CommandArgs.GetString("files?") == "files?" {
  119. os.Exit(0)
  120. }
  121. fmt.Print(`循环遍历执行指定路径下的 mql 文件
  122. usage: mql path [options] -- mqlstatement
  123. path 指定mql文件所在路径,通配符 ** 表示任意字符,* 表示除分隔符以外的任意字符,. 表示递归当前目录下的所有子目录
  124. 为避免 shell 自动将 * 转换为文件名列表,可以将指定的 path 用引号包含
  125. mqlstatement 指定要执行的 mql 语句,支持多条语句,语句之间用分号 ; 分隔
  126. options:
  127. match=^\\d+.* 文件名匹配正则表达式,默认为所有以数字开头命名的文件
  128. odb=` + default_odbpaths + ` 指定odbserver路径,默认通过环境变量ODBPATH或通过ETCD相关配置获取
  129. keyspace=` + default_keyspace + ` 指定keyspace,默认通过环境变量KEYSPACE获取
  130. ksnative=` + default_keyspace + `_native 指定native keyspace,默认通过环境变量KSNATIVE获取
  131. debug=true 开启调试模式,输出更多信息
  132. 环境变量需求:
  133. KEYSPACE=` + default_keyspace + `
  134. KSNATIVE=` + default_keyspace + `_native
  135. ODBPATH=` + default_odbpaths + `
  136. ETCDPATH=127.0.0.1:2379
  137. ETCDUSER=
  138. ETCDPASS=
  139. CASSANDRAPATH=127.0.0.1
  140. CASSANDRALOOKUP=false
  141. NATSPATH=nats://user:N5JbKeT1C3uOUh317OVXjg==@127.0.0.1:4222
  142. LANG=zh_CN.utf8
  143. LC_ALL=zh_CN.utf8
  144. 可通过mql.conf配置运行参数:
  145. [odbc]
  146. ;指定odbserver路径
  147. odbpath=` + default_odbpaths + `
  148. ;指定keyspace
  149. keyspace=` + default_keyspace + `
  150. ;指定native keyspace
  151. ksnative=` + default_keyspace + `_native
  152. ;指定客户端默认超时时间
  153. client.timeout=` + mfmt.FormatDuration(default_timeout) + `
  154. mql语句扩展说明:
  155. mql语句中的内容替换:
  156. 随机字符替换:
  157. mql语句中的 '...N Bytes...' 会被替换为 N 个随机可见字符,N 为自然数
  158. 变量替换:
  159. mql语句中 '{%<f>,<varname>}' 形式的内容会被替换为,以 %<f> 作为格式化标记的 <varname> 变量的内容
  160. 内容中的相似内容 '{%...,...}' 可以转义为 '{%%...,...}' 来避免被替换
  161. 变量不存在时,不进行替换
  162. 目前支持的 <varname> 包括:
  163. keyspace 当前指定的 Keyspace
  164. ksnative Native Keyspace
  165. mqli 当前mql语句的循环值,参考 loop(N)
  166. filei 当前文件的循环值,参考 scope(file) loop(N)
  167. diri 当前目录的循环值,参考 scope(dir) loop(N)
  168. topi 整个执行进程的循环值,参考 scope(top) loop(N)
  169. mqlcount 当前mql语句的总计数
  170. filecount 当前文件的总计数
  171. dircount 当前目录的总计数
  172. topcount 整个执行进程的总计数
  173. rand 0 ~ 1000000 之间的随机整数
  174. now 当前时间,格式标记 t 相当于 2006-01-02 15:04:05
  175. mql语句中第一个 /** 到 最后一个 **/ 之间的内容可以标注执行若干预定义动作,多个动作标注用空格分隔
  176. 无法解析为预定义动作的内容作为普通注释直接跳过,不会报错
  177. 如 /** output() sleep(1) **/ 表示忽略当前语句执行过程中的任何报错,执行完成后等待一秒后再继续执行下一语句
  178. 目前支持的预定义动作如下:
  179. skip() 跳过当前语句,以及其它的所有预定义动作
  180. 可用于暂时屏蔽一条语句的执行
  181. params(V) 用于提供 prepare 语句中 ? 的对应值,V 为 JSON 格式编码的数组
  182. 对应值中不能有包含逗号和右括号,如果需要逗号用 \u002c 代替,右括号用 \u0029 代替
  183. subscribe(S) 订阅指定主题的消息通知 S,输出收到的通知信息,S 为要订阅主题的的名称字符串,同一主题只会被订阅一次
  184. unsubscribe(S) 取消订阅指定主题的消息通知 S,S 为要订阅主题的名称字符串
  185. fork(G) 新建并行线程分支,执行当前语句的同时,继续执行后续语句,G 为线程组名,配合 wait 使用
  186. wait(G) 执行当前语句前,等待之前创建的分支结束,G 为线程组名,对应由 fork 创建的所有同名线程分支,G 为空表示所有
  187. beforerun(F,...) 执行当前语句前,执行一系列预定义函数,以通过一些简单逻辑处理产生模拟数据,目前支持的函数包括
  188. set(S,V) 设置变量,S 为变量名,V为变量值,目前仅支持时间值和整数值
  189. add(S,V,u) 累加变量,S 为变量名,V为增量值,u为可选的变量单位,如时间段单位 second,minute,hour
  190. mod(S,V) 变量取模,相当于 S = S % V
  191. case(C,F) 根据条件 C 执行函数 F,C为比较判断表达式,目前仅支持等于“=”判断
  192. ------------- 针对错误处理的相关动作
  193. retry(N) 出错重试 N 次,N为自然数,默认为 0,
  194. onerror(O,E,...) retry后仍然有错时的处理方法,E为特定的错误信息关键字,不指定则为任意错误,O为处理方法,包括:
  195. break 中断当前语句的循环,继续顺序执行其它语句,没有循环时与 continue 无区别
  196. continue 忽略报错,继续执行,在循环中时,继续下一轮循环
  197. must 报错才正常,否则中断执行,报告期待错误信息不符,退出当前进程
  198. exit 停止执行,退出当前进程,此为默认处理方式
  199. noerrinfo() 存在 retry 或 onerror 处理时,不输出错误信息
  200. ------------- 以上的预定义动作只对当前语句起作用,下面的预定义动作可以指定其作用范围
  201. scope(S) 相关动作的作用范围 S 包括 top,dir,file,mql,默认作用范围为 mql 当前语句
  202. 如:scope(file) parallel(3) loop(5) 并发循环执行本文件五次,最大并行数限制为3
  203. qmeta(O) mql执行时设置特定的选项,O为json对象
  204. timeout(D) mql执行超时设置,D 为时间段,默认为一分钟(可通过配置 odbc.client.timeout 修改)
  205. 时间段 D 支持单位 d(天),h(时),m(分),s(秒),ms(毫秒),us(微秒),ns(纳秒),默认为毫秒
  206. loop(N) 循环执行 N 次,N 为正整数,默认执行一次
  207. 执行前会替换 mql 语句及 params 参数中的循环次数标记
  208. loopfrom(N) 设置循环执行计数起始值 N,默认为 1,该值仅影响替换 mql 语句及 params 参数中的循环次数标记
  209. loopstep(N) 设置循环执行计数步长值 N,默认为 1,该值仅影响替换 mql 语句及 params 参数中的循环次数标记
  210. parallel(N) 并发执行,N 为最大并行数,不指定 N 表示不限制并行数
  211. set(S,V) 设置变量,S 为变量名,V为变量值
  212. ------------- 以上为执行语句前的预定义动作,下面是执行完成后的预定义动作
  213. sleep(D) 执行完成后等待时间段 D 后再继续执行下一语句
  214. output() 输出执行结果
  215. outputcount() 输出执行结果记录条数
  216. metainfo() 输出执行结果相关的元信息
  217. schema(C) 执行 schema 命令检查指定类 C 是否存在
  218. count(N) 检查返回结果中的数据记录数是否为 N,N 为自然数
  219. equal(N,F,V) 判断返回结果中第 N 条数据的字段 F 的值是否为 V
  220. match(Kn,Mn) 检查返回结果中字段 Kn 值为 Mn 的记录是否存在,参数 Kn,Mn 可以有多个,需要成对出现
  221. matchcount(Kn,Mn,N) 检查返回结果中字段 Kn 值为 Mn 的记录数是否为 N,参数 Kn,Mn 可以有多个,需要成对出现,N为自然数
  222. 需要一次顺序执行多条语句时,可将多条语句包含在 multilines begin; 和 multilines end; 之间,多条语句之间用分号 ; 分隔
  223. 例:循环遍历执行当前路径下的所有 mql 文件
  224. ./mql .
  225. `)
  226. os.Exit(0)
  227. return
  228. }
  229. // 文件名以数字开头
  230. fnmatch := mcfg.CommandArgs.GetString("match", `^\d+.*`)
  231. fw, err := filewalker.NewFileWalker(spath, fnmatch) // orderby: dirfirst, filefirst, fullpath
  232. if !assert.Nil(t, err, err) {
  233. return
  234. }
  235. fns := []string{}
  236. fw.List(func(basedir, fpath string) bool { fns = append(fns, filepath.Join(basedir, fpath)); return true })
  237. if mcfg.CommandArgs.GetString("files?") == "files?" {
  238. fmt.Println(strings.Join(fns, "\n"))
  239. os.Exit(0)
  240. }
  241. // 确定MODB连接
  242. if !assert.Nil(t, ODBError) {
  243. return
  244. }
  245. logger.Info("odbpath :", ODBC.Config().Hosts, ODBC.Config().Port)
  246. logger.Info("keyspace :", ODBC.Config().Keyspace)
  247. logger.Info("version :", ODBC.Versline())
  248. logger.Info("ksnative :", ksnative)
  249. logger.Info("debug :", debug)
  250. logger.Info("spath :", spath)
  251. // if checkCassSchema && !cass.CheckCassandraSchema(t) {
  252. // assert.True(t, false, "cassandra schemda is different")
  253. // return
  254. // }
  255. logger.Info("walkdir: ", fw.WalkDir)
  256. logger.Info("pathmatch: ", fw.RePath)
  257. logger.Info("filematch: ", fw.ReFile)
  258. // test
  259. if len(fns) > 1 {
  260. appfile, err := filepath.Abs(os.Args[0])
  261. if !assert.Nil(t, err) {
  262. logger.Info("加锁时发生系统错误:", err)
  263. return
  264. }
  265. appdir := filepath.Dir(appfile)
  266. // 1. 创建一个锁对象(注意:这里传入的是锁文件的路径)
  267. fl := flock.New(filepath.Join(appdir, "mql.lock"))
  268. // 2. 尝试获取独占锁(写锁),不阻塞
  269. // 如果加锁失败(被其他进程占用),acquired 为 false,不会报错
  270. acquired, err := fl.TryLock()
  271. if !assert.Nil(t, err) {
  272. logger.Info("加锁时发生系统错误:", err)
  273. return
  274. }
  275. if !acquired {
  276. log.Println("无法获取测试锁,可能另一个测试实例正在运行")
  277. log.Println("can't lock test, another test is running")
  278. t.Fail()
  279. return
  280. }
  281. // 3. 务必在操作完成后解锁(使用 defer 确保执行)
  282. defer fl.Unlock()
  283. }
  284. logger.Info(fmt.Sprint("fns:", "\n", strings.Join(fns, "\n")))
  285. if len(fns) == 0 {
  286. logger.Info("没有找到 MQL 文件")
  287. return
  288. }
  289. logger.Info("共找到", len(fns), "个文件")
  290. // 顺序读取当前目录下文件名为数字开头的文件,执行其中的mql
  291. (&MQLTest{}).Run(t, fw)
  292. // 等待延迟显示信息输出
  293. // time.Sleep(1 * time.Second)
  294. logger.Info("关闭与服务器的连接")
  295. ODBC.Close()
  296. // 等待正常结束,只是为了发现隐含的问题
  297. // for n := 0; runtime.NumGoroutine() > 60 && n < 10; n++ {
  298. // fmt.Println("剩余协程数:", runtime.NumGoroutine())
  299. // time.Sleep(1000 * time.Millisecond)
  300. // }
  301. logger.Info("剩余协程数:", runtime.NumGoroutine())
  302. }
  303. // action(a1,a2...)
  304. // --注释后的大括号之间的内容 -- {}
  305. // mql query之后执行的动作
  306. var reactions = regexp.MustCompile(`(\w+)\s*\(([^\)]*)\)`)
  307. // 替换为随机可见字符
  308. var rereplace_nbytes = regexp.MustCompile(`\.\.\.\s*(\d+)\s*[Bb][Yy][Tt][Ee]s?\s*\.\.\.`)
  309. var reactioneexprs = regexp.MustCompile(`(?s)\/\*\*(?:\s*(\w+\s*\(\s*.*\)\s*)*)*\*\*\/`)
  310. var reactioneexprs_2 = regexp.MustCompile(`(?s)(?:(?:^|\n)\-\-\s*\{(.*)\}\s*)+$`)
  311. var reactioneexprs_3 = regexp.MustCompile(`(?s)(?:^\-\-\s*\{(.*)\}\s*)+\n`)
  312. var commentexprs = regexp.MustCompile(`(?s)\/\*(?:[^\*]|\*+[^\*\/])*\*+\/`)
  313. var commentexprs_2 = regexp.MustCompile(`(?ms)(?:^|\n)\-\-[^\n]*(?:\n|$)`)
  314. var commentexprs_3 = regexp.MustCompile(`(?ms)(?:^|\n)//[^\n]*(?:\n|$)`)
  315. type MQLTest struct {
  316. t *testing.T
  317. fw *filewalker.FileWalker
  318. scopevars *ScopeVars
  319. }
  320. type ScopeVars struct {
  321. sync.RWMutex
  322. top *Variables
  323. dir map[string]*Variables
  324. file map[string]*Variables
  325. mql map[string]*Variables
  326. }
  327. type Variables struct {
  328. vars map[string]interface{}
  329. loop_count int
  330. loop_from int
  331. loop_step int
  332. timeout time.Duration
  333. qmeta odb.QueryMeta
  334. parallel_max int
  335. }
  336. type CurrentVars struct {
  337. loop_i int
  338. ch_parallel_count chan<- int
  339. mqlcount int32
  340. sleeptime time.Duration
  341. totalusetime time.Duration
  342. maxusetime time.Duration
  343. minusetime time.Duration
  344. }
  345. type GlobalVars struct {
  346. sync.RWMutex
  347. *CurrentVars
  348. wg_wait_fork_routine map[string]*sync.WaitGroup
  349. ch_wait_mql_done map[string]chan bool
  350. }
  351. func (mt *MQLTest) Run(t *testing.T, fw *filewalker.FileWalker) (retok bool) {
  352. mt.t = t
  353. mt.fw = fw
  354. global := &GlobalVars{
  355. CurrentVars: &CurrentVars{},
  356. wg_wait_fork_routine: make(map[string]*sync.WaitGroup),
  357. ch_wait_mql_done: make(map[string]chan bool),
  358. }
  359. mt.scopevars = &ScopeVars{
  360. top: &Variables{
  361. vars: map[string]interface{}{},
  362. loop_count: 1,
  363. loop_from: 1,
  364. loop_step: 1},
  365. dir: map[string]*Variables{},
  366. file: map[string]*Variables{},
  367. mql: map[string]*Variables{}}
  368. ctx, cancel := context.WithCancel(context.Background())
  369. defer cancel()
  370. st := time.Now()
  371. loop_i := 0
  372. parallel_queue := pqc.NewQueue[any](0)
  373. mqlcount := int32(0)
  374. defer func() {
  375. mt.scopevars.RLock()
  376. loop_count := mt.scopevars.top.loop_count
  377. mt.scopevars.RUnlock()
  378. ut := time.Since(st)
  379. aut := time.Duration(0)
  380. if mqlcount > 0 {
  381. aut = global.totalusetime / time.Duration(mqlcount)
  382. }
  383. logger.Info("完成 ", loop_count, " 次执行,共", mqlcount, "次 MQL 请求,耗时", ut, "单条响应时间", global.minusetime, "~", global.maxusetime, "/", aut, "平均每秒吞吐量", (int64(mqlcount)*int64(time.Second))/int64(ut))
  384. }()
  385. var wg sync.WaitGroup
  386. for {
  387. mt.scopevars.Lock()
  388. ok := loop_i < mt.scopevars.top.loop_count
  389. mt.scopevars.Unlock()
  390. if !ok {
  391. break
  392. }
  393. loop_i++
  394. //
  395. ch_parallel_count := make(chan int)
  396. topvars := &CurrentVars{
  397. loop_i: loop_i,
  398. ch_parallel_count: ch_parallel_count,
  399. }
  400. //
  401. ok_chan := make(chan bool, 1)
  402. parallel_chan := make(chan bool, 1)
  403. parallel := false
  404. parallelcount := 0
  405. done := false
  406. wg.Add(1)
  407. go func() {
  408. defer func() {
  409. atomic.AddInt32(&mqlcount, topvars.mqlcount)
  410. wg.Done()
  411. }()
  412. ch_ok := make(chan bool)
  413. go func() {
  414. for {
  415. select {
  416. case <-ch_parallel_count:
  417. if !done && !parallel {
  418. parallel = true
  419. // 加入并发控制队列
  420. if parallelcount > 0 {
  421. if parallelcount > parallel_queue.Size() {
  422. parallel_queue.Growth(parallelcount)
  423. }
  424. parallel_queue.Push(1)
  425. }
  426. parallel_chan <- true
  427. }
  428. case ok := <-ch_ok:
  429. ok_chan <- ok
  430. if parallel {
  431. if parallelcount > 0 {
  432. // 从并发控制队列中移除
  433. parallel_queue.Pop()
  434. }
  435. } else {
  436. done = true
  437. }
  438. return
  439. }
  440. }
  441. }()
  442. ok := mt.RunAll(t, ctx,
  443. global,
  444. topvars)
  445. ch_ok <- ok
  446. if !ok {
  447. cancel() // 并发测试,有一个线程出错,就全停
  448. return
  449. }
  450. }()
  451. success := true
  452. select {
  453. case success = <-ok_chan: // 非并发,等待完成
  454. case <-parallel_chan: // 并发,执行继续下一次
  455. logger.Info("第", topvars.loop_i, "次并发执行继续")
  456. }
  457. if !success {
  458. return false // 失败,不等,直接返回
  459. }
  460. }
  461. wg.Wait()
  462. return
  463. }
  464. func (mt *MQLTest) RunAll(t *testing.T, ctx context.Context, global *GlobalVars, topvars *CurrentVars) bool {
  465. logger.Info("开始第", topvars.loop_i, "次执行")
  466. st := time.Now()
  467. // 读取文件列表
  468. listdirs := []string{}
  469. dirfiles := map[string][]string{}
  470. err := mt.fw.List(func(basedir, filename string) bool {
  471. if dirfiles[basedir] == nil {
  472. listdirs = append(listdirs, basedir)
  473. }
  474. dirfiles[basedir] = append(dirfiles[basedir], filename)
  475. return true
  476. })
  477. if !assert.Nil(t, err, err) {
  478. return false
  479. }
  480. // 执行所有目录
  481. for _, basedir := range listdirs {
  482. // 循环执行目录下所有文件
  483. mt.scopevars.Lock()
  484. if mt.scopevars.dir[basedir] == nil {
  485. mt.scopevars.dir[basedir] = &Variables{
  486. vars: map[string]interface{}{},
  487. loop_count: 1,
  488. loop_from: 1,
  489. loop_step: 1}
  490. }
  491. mt.scopevars.Unlock()
  492. var wg sync.WaitGroup
  493. st := time.Now()
  494. loop_i := 0
  495. parallel_queue := pqc.NewQueue[any](0)
  496. mqlcount := int32(0)
  497. for {
  498. mt.scopevars.Lock()
  499. ok := loop_i < mt.scopevars.dir[basedir].loop_count
  500. mt.scopevars.Unlock()
  501. if !ok {
  502. break
  503. }
  504. loop_i++
  505. //
  506. ch_parallel_count := make(chan int)
  507. dirvars := &CurrentVars{
  508. loop_i: loop_i,
  509. ch_parallel_count: ch_parallel_count,
  510. }
  511. //
  512. ok_chan := make(chan bool, 1)
  513. parallel_chan := make(chan bool, 1)
  514. parallel := false
  515. parallelcount := 0
  516. done := false
  517. wg.Add(1)
  518. go func(basedir string) {
  519. defer func() {
  520. atomic.AddInt32(&mqlcount, dirvars.mqlcount)
  521. wg.Done()
  522. }()
  523. ch_ok := make(chan bool)
  524. go func() {
  525. for {
  526. select {
  527. case <-ch_parallel_count:
  528. if !done && !parallel {
  529. parallel = true
  530. // 加入并发控制队列
  531. if parallelcount > 0 {
  532. if parallelcount > parallel_queue.Size() {
  533. parallel_queue.Growth(parallelcount)
  534. }
  535. parallel_queue.Push(1)
  536. }
  537. parallel_chan <- true
  538. }
  539. case ok := <-ch_ok:
  540. ok_chan <- ok
  541. if parallel {
  542. if parallelcount > 0 {
  543. // 从并发控制队列中移除
  544. parallel_queue.Pop()
  545. }
  546. } else {
  547. done = true
  548. }
  549. return
  550. }
  551. }
  552. }()
  553. // logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次执行开始")
  554. files := dirfiles[basedir]
  555. ch_ok <- mt.RunDir(t, ctx,
  556. global,
  557. topvars,
  558. dirvars,
  559. basedir, files)
  560. // logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次执行结束")
  561. }(basedir)
  562. success := true
  563. select {
  564. case success = <-ok_chan: // 非并发,等待完成
  565. logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次顺序执行完成")
  566. case <-parallel_chan: // 并发,执行继续下一次
  567. logger.Info("dir", basedir, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i), "次并发执行继续")
  568. }
  569. if !success {
  570. return false
  571. }
  572. }
  573. wg.Wait()
  574. mt.scopevars.RLock()
  575. loop_count := mt.scopevars.dir[basedir].loop_count
  576. mt.scopevars.RUnlock()
  577. if loop_count > 1 {
  578. ut := time.Since(st)
  579. logger.Info(fmt.Sprint("dir ", basedir, " loop ", loop_count, " times, run ", mqlcount, " mqls, usetime ", ut))
  580. }
  581. }
  582. logger.Info("完成第", topvars.loop_i, "次执行,共", topvars.mqlcount, "次 MQL 请求,耗时", time.Since(st))
  583. return true
  584. }
  585. func (mt *MQLTest) RunDir(t *testing.T, ctx context.Context,
  586. global *GlobalVars,
  587. topvars *CurrentVars,
  588. dirvars *CurrentVars,
  589. basedir string, files []string) bool {
  590. for _, filename := range files {
  591. ok := mt.RunFile(t, ctx, global, topvars, dirvars, basedir, filename)
  592. if !ok {
  593. return false
  594. }
  595. }
  596. return true
  597. }
  598. func (mt *MQLTest) RunFile(t *testing.T, ctx context.Context,
  599. global *GlobalVars,
  600. topvars *CurrentVars,
  601. dirvars *CurrentVars,
  602. basedir, filename string) bool {
  603. // 读取文件内容
  604. ffpath := filepath.Join(basedir, filename)
  605. bs, err := os.ReadFile(ffpath)
  606. if !assert.Nil(t, err, err) {
  607. return false
  608. }
  609. // mql语句切分
  610. mqgr := NewMQLGroupRequest()
  611. var multilines *MQLRequest
  612. mqs := spliter.NewMQLSpliter(bufio.NewReader(strings.NewReader(string(bs))))
  613. for {
  614. mql, fromline, toline, fromchar, tochar, hasnext, _ := mqs.NextMQL()
  615. if !hasnext {
  616. break
  617. }
  618. // 去掉注释
  619. clean_mql := strings.Join(spliter.MQLSplitClean(mql), ";")
  620. if multilines != nil {
  621. // 多条语句处理
  622. if strings.TrimSpace(clean_mql) == "multilines end" {
  623. // 保留语句中的注释
  624. tm := strings.TrimSpace(strings.Replace(mql, "multilines end", "", 1))
  625. if len(tm) > 0 {
  626. if multilines.OriginQueryString != "" {
  627. multilines.OriginQueryString += ";"
  628. }
  629. multilines.OriginQueryString += tm
  630. }
  631. multilines.Toline = toline
  632. multilines.Tochar = tochar
  633. // 加入请求组,并初始化,提取动作信息
  634. e := mqgr.Append(multilines)
  635. if !assert.Nil(t, e, e) {
  636. return false
  637. }
  638. multilines = nil
  639. } else {
  640. if multilines.OriginQueryString != "" {
  641. multilines.OriginQueryString += ";"
  642. }
  643. multilines.OriginQueryString += mql
  644. multilines.Toline = toline
  645. multilines.Tochar = tochar
  646. }
  647. } else if strings.TrimSpace(clean_mql) == "multilines begin" {
  648. multilines = &MQLRequest{FilePath: ffpath}
  649. // 保留语句中的注释
  650. multilines.OriginQueryString = strings.TrimSpace(strings.Replace(mql, "multilines begin", "", 1))
  651. multilines.Fromline = fromline
  652. multilines.Fromchar = fromchar
  653. multilines.Toline = toline
  654. multilines.Tochar = tochar
  655. } else {
  656. // 单条语句
  657. mqr := &MQLRequest{OriginQueryString: mql, FilePath: ffpath, Fromline: fromline, Toline: toline, Fromchar: fromchar, Tochar: tochar}
  658. // 加入请求组,并初始化,提取动作信息
  659. e := mqgr.Append(mqr)
  660. if !assert.Nil(t, e, e) {
  661. return false
  662. }
  663. }
  664. }
  665. // mqls := spliter.MQLSplit(string(bs))
  666. mt.scopevars.Lock()
  667. if mt.scopevars.file[ffpath] == nil {
  668. mt.scopevars.file[ffpath] = &Variables{
  669. vars: map[string]interface{}{},
  670. loop_count: 1,
  671. loop_from: 1,
  672. loop_step: 1}
  673. }
  674. mt.scopevars.Unlock()
  675. var wg sync.WaitGroup
  676. st := time.Now()
  677. loop_i := 0
  678. parallel_queue := pqc.NewQueue[any](0)
  679. mqlcount := int32(0)
  680. for {
  681. mt.scopevars.Lock()
  682. ok := loop_i < mt.scopevars.file[ffpath].loop_count
  683. mt.scopevars.Unlock()
  684. if !ok {
  685. break
  686. }
  687. loop_i++
  688. //
  689. ch_parallel_count := make(chan int)
  690. filevars := &CurrentVars{
  691. loop_i: loop_i,
  692. ch_parallel_count: ch_parallel_count,
  693. }
  694. //
  695. ok_chan := make(chan bool, 1)
  696. parallel_chan := make(chan bool, 1)
  697. parallel := false
  698. parallelcount := 0
  699. done := false
  700. wg.Add(1)
  701. go func() {
  702. defer func() {
  703. atomic.AddInt32(&mqlcount, filevars.mqlcount)
  704. wg.Done()
  705. }()
  706. ch_ok := make(chan bool)
  707. go func() {
  708. for {
  709. select {
  710. case <-ch_parallel_count:
  711. if !done && !parallel {
  712. parallel = true
  713. // 加入并发控制队列
  714. if parallelcount > 0 {
  715. if parallelcount > parallel_queue.Size() {
  716. parallel_queue.Growth(parallelcount)
  717. }
  718. parallel_queue.Push(1)
  719. }
  720. parallel_chan <- true
  721. }
  722. case ok := <-ch_ok:
  723. ok_chan <- ok
  724. if parallel {
  725. if parallelcount > 0 {
  726. // 从并发控制队列中移除
  727. parallel_queue.Pop()
  728. }
  729. } else {
  730. done = true
  731. }
  732. return
  733. }
  734. }
  735. }()
  736. // logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次执行开始")
  737. ch_ok <- mt.RunMQLGroup(t, ctx,
  738. global,
  739. topvars,
  740. dirvars,
  741. filevars,
  742. basedir, ffpath, mqgr)
  743. // logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次执行结束")
  744. }()
  745. success := true
  746. select {
  747. case success = <-ok_chan: // 非并发,等待完成
  748. logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次顺序执行完成")
  749. case <-parallel_chan: // 并发,执行继续下一次
  750. logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次并发执行继续")
  751. }
  752. if !success {
  753. return false
  754. }
  755. }
  756. wg.Wait()
  757. mt.scopevars.RLock()
  758. loop_count := mt.scopevars.file[ffpath].loop_count
  759. mt.scopevars.RUnlock()
  760. if loop_count > 1 {
  761. ut := time.Since(st)
  762. sn := fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i)
  763. logger.Info(fmt.Sprint("file ", ffpath+"/"+sn, " loop ", loop_count, " times, run ", mqlcount, " mqls, usetime ", ut))
  764. }
  765. return true
  766. }
  767. func (mt *MQLTest) RunMQLGroup(t *testing.T, ctx context.Context,
  768. global *GlobalVars,
  769. topvars *CurrentVars,
  770. dirvars *CurrentVars,
  771. filevars *CurrentVars,
  772. basedir, ffpath string, mqgr *MQLGroupRequest) (pass bool) {
  773. pass = true
  774. var wg sync.WaitGroup
  775. for _, mqs := range mqgr.mqrs {
  776. wg.Add(1)
  777. go func(mqs []*MQLRequest) {
  778. defer wg.Done()
  779. if len(mqs) == 0 {
  780. return
  781. }
  782. if mqs[0].StaticActions.ForkName != nil {
  783. forkname := *mqs[0].StaticActions.ForkName
  784. global.Lock()
  785. wg := global.wg_wait_fork_routine[forkname]
  786. if wg == nil {
  787. wg = &sync.WaitGroup{}
  788. global.wg_wait_fork_routine[forkname] = wg
  789. }
  790. global.Unlock()
  791. wg.Add(1)
  792. defer wg.Done()
  793. }
  794. ok := mt.RunMQLs(t, ctx, global, topvars, dirvars, filevars, basedir, ffpath, mqs)
  795. if !ok {
  796. pass = false
  797. }
  798. }(mqs)
  799. }
  800. wg.Wait()
  801. return
  802. }
  803. func (mt *MQLTest) RunMQLs(t *testing.T, ctx context.Context,
  804. global *GlobalVars,
  805. topvars *CurrentVars,
  806. dirvars *CurrentVars,
  807. filevars *CurrentVars,
  808. basedir, ffpath string, mqrs []*MQLRequest) bool {
  809. for _, mqr := range mqrs {
  810. mqlkey := mqr.Key
  811. mqlstr := mqr.OriginQueryString
  812. if mqlstr == "" {
  813. continue
  814. }
  815. staticactions := mqr.StaticActions
  816. // 设置执行过程中的控制参数
  817. mt.InitScopeVars(basedir, ffpath, mqlkey, staticactions)
  818. ch_test_run_one_mql_result := make(chan bool)
  819. go func() {
  820. mqrinst := fmt.Sprint(mqlkey, "/", topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i)
  821. global.Lock()
  822. mqrdone := global.ch_wait_mql_done[mqrinst]
  823. if mqrdone == nil {
  824. mqrdone = make(chan bool)
  825. global.ch_wait_mql_done[mqrinst] = mqrdone
  826. }
  827. var waitmqrdone chan bool
  828. if mqr.WaitMQLRequest != nil {
  829. waitmqrkey := mqr.WaitMQLRequest.Key
  830. waitmqrinst := fmt.Sprint(waitmqrkey, "/", topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i)
  831. waitmqrdone = global.ch_wait_mql_done[waitmqrinst]
  832. if waitmqrdone == nil {
  833. waitmqrdone = make(chan bool)
  834. global.ch_wait_mql_done[waitmqrinst] = waitmqrdone
  835. }
  836. }
  837. global.Unlock()
  838. var ret bool
  839. defer func() { mqrdone <- ret }()
  840. if waitmqrdone != nil {
  841. v := <-waitmqrdone
  842. waitmqrdone <- v
  843. if !v {
  844. // 依赖MQR失败
  845. ch_test_run_one_mql_result <- false
  846. return
  847. }
  848. }
  849. ret = mt.RunMQR(t, ctx, global, topvars, dirvars, filevars, basedir, ffpath, mqr)
  850. ch_test_run_one_mql_result <- ret
  851. }()
  852. ret := <-ch_test_run_one_mql_result
  853. if !ret {
  854. return ret
  855. }
  856. // continue
  857. }
  858. return true
  859. }
  860. func (mt *MQLTest) RunMQR(t *testing.T, ctx context.Context,
  861. global *GlobalVars,
  862. topvars *CurrentVars,
  863. dirvars *CurrentVars,
  864. filevars *CurrentVars,
  865. basedir, ffpath string, mqr *MQLRequest) bool {
  866. mqlkey := mqr.Key
  867. staticactionexprs := mqr.StaticActionExprs
  868. staticactions := mqr.StaticActions
  869. return t.Run(mqlkey, func(t *testing.T) {
  870. var wg sync.WaitGroup
  871. st := time.Now()
  872. loop_i := 0
  873. parallel_queue := pqc.NewQueue[any](0)
  874. mqlcount := int32(0)
  875. for {
  876. mt.scopevars.Lock()
  877. ok := loop_i < mt.scopevars.mql[mqlkey].loop_count
  878. mt.scopevars.Unlock()
  879. if !ok {
  880. break
  881. }
  882. loop_i++
  883. //
  884. ch_parallel_count := make(chan int)
  885. mqlvars := &CurrentVars{
  886. loop_i: loop_i,
  887. ch_parallel_count: ch_parallel_count,
  888. }
  889. // 运行实例
  890. mqlsn := fmt.Sprint(mqlkey, "/", topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i, ".", mqlvars.loop_i)
  891. mt.scopevars.Lock()
  892. if mt.scopevars.mql[mqlsn] == nil {
  893. mt.scopevars.mql[mqlsn] = &Variables{
  894. vars: map[string]interface{}{},
  895. loop_count: 1,
  896. loop_from: 1,
  897. loop_step: 1}
  898. }
  899. mt.scopevars.Unlock()
  900. //
  901. mt.scopevars.Lock()
  902. top_loopi := mt.scopevars.top.loop_from + (topvars.loop_i-1)*mt.scopevars.top.loop_step
  903. mt.scopevars.top.vars["topi"] = top_loopi
  904. dir_loopi := mt.scopevars.dir[basedir].loop_from + (dirvars.loop_i-1)*mt.scopevars.dir[basedir].loop_step
  905. mt.scopevars.dir[basedir].vars["diri"] = dir_loopi
  906. mt.scopevars.dir[basedir].vars["topi"] = top_loopi
  907. file_loopi := mt.scopevars.file[ffpath].loop_from + (filevars.loop_i-1)*mt.scopevars.file[ffpath].loop_step
  908. mt.scopevars.file[ffpath].vars["filei"] = file_loopi
  909. mt.scopevars.file[ffpath].vars["diri"] = dir_loopi
  910. mt.scopevars.file[ffpath].vars["topi"] = top_loopi
  911. mql_loopi := mt.scopevars.mql[mqlkey].loop_from + (mqlvars.loop_i-1)*mt.scopevars.mql[mqlkey].loop_step
  912. mt.scopevars.mql[mqlkey].vars["mqli"] = mql_loopi
  913. mt.scopevars.mql[mqlkey].vars["filei"] = file_loopi
  914. mt.scopevars.mql[mqlkey].vars["diri"] = dir_loopi
  915. mt.scopevars.mql[mqlkey].vars["topi"] = top_loopi
  916. //
  917. mt.scopevars.mql[mqlsn].vars["mqli"] = mql_loopi
  918. mt.scopevars.mql[mqlsn].vars["filei"] = file_loopi
  919. mt.scopevars.mql[mqlsn].vars["diri"] = dir_loopi
  920. mt.scopevars.mql[mqlsn].vars["topi"] = top_loopi
  921. mt.scopevars.Unlock()
  922. mt.BeforeRunAction(basedir, ffpath, mqlkey, mqlsn, staticactions)
  923. formatedmqlstr := mqr.FormatedQueryString
  924. mql := mt.ReplaceLoopSN(formatedmqlstr,
  925. global, topvars, dirvars, filevars, mqlvars,
  926. top_loopi, dir_loopi, file_loopi, mql_loopi,
  927. basedir, ffpath, mqlkey, mqlsn,
  928. )
  929. mqlreplace := rereplace_nbytes.FindAllStringSubmatch(mql, -1)
  930. for _, mqr := range mqlreplace {
  931. if len(mqr) == 2 {
  932. bs := make([]byte, cast.ToInt(mqr[1]))
  933. for i := 0; i < len(bs); i++ {
  934. bs[i] = byte(32 + rand.Intn(91))
  935. }
  936. s := string(bs)
  937. s = strings.ReplaceAll(s, ")", string([]byte{123}))
  938. s = strings.ReplaceAll(s, "'", string([]byte{124}))
  939. s = strings.ReplaceAll(s, "\"", string([]byte{125}))
  940. s = strings.ReplaceAll(s, "\\", string([]byte{126}))
  941. mql = strings.Replace(mql, mqr[0], s, 1)
  942. }
  943. }
  944. for i, sat := range staticactionexprs {
  945. mql = strings.Replace(mql, "["+strconv.Itoa(i)+"]", sat.SourceText, 1)
  946. }
  947. mql = strings.ReplaceAll(mql, "[[]", "[")
  948. mqri := &MQLRequestInstance{
  949. MQLRequest: mqr,
  950. PreparedQueryString: mql,
  951. }
  952. //
  953. ok_chan := make(chan error, 1)
  954. parallel_chan := make(chan bool, 1)
  955. parallel := false
  956. parallelcount := 0
  957. done := false
  958. wg.Add(1)
  959. go func(mqlvars *CurrentVars) {
  960. defer func() {
  961. atomic.AddInt32(&mqlcount, mqlvars.mqlcount)
  962. wg.Done()
  963. }()
  964. ch_ok := make(chan error)
  965. go func() {
  966. for {
  967. select {
  968. case parallelcount = <-ch_parallel_count:
  969. if !done && !parallel {
  970. parallel = true
  971. // 加入并发控制队列
  972. if parallelcount > 0 {
  973. if parallelcount > parallel_queue.Size() {
  974. parallel_queue.Growth(parallelcount)
  975. }
  976. parallel_queue.Push(1)
  977. }
  978. parallel_chan <- true
  979. }
  980. case ok := <-ch_ok:
  981. ok_chan <- ok
  982. if parallel {
  983. if parallelcount > 0 {
  984. // 从并发控制队列中移除
  985. parallel_queue.Pop()
  986. }
  987. } else {
  988. done = true
  989. }
  990. return
  991. }
  992. }
  993. }()
  994. nstaticactionexprs := map[int]*Action{}
  995. for i, sat := range staticactionexprs {
  996. nstaticactionexprs[i] = sat
  997. }
  998. // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次执行开始")
  999. ch_ok <- mt.RunMQL(t, ctx,
  1000. global,
  1001. topvars,
  1002. dirvars,
  1003. filevars,
  1004. mqlvars,
  1005. basedir, ffpath, mqlkey, mqlsn, mqri, staticactions, nstaticactionexprs)
  1006. // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次执行结束")
  1007. }(mqlvars)
  1008. var err error
  1009. select {
  1010. case err = <-ok_chan: // 非并发,等待完成
  1011. // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次顺序执行完成")
  1012. case <-parallel_chan: // 并发,执行继续下一次
  1013. // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次并发执行继续")
  1014. }
  1015. if err != nil {
  1016. // logger.Error(err)
  1017. break
  1018. }
  1019. }
  1020. wg.Wait()
  1021. mt.scopevars.RLock()
  1022. loop_count := mt.scopevars.mql[mqlkey].loop_count
  1023. mt.scopevars.RUnlock()
  1024. if loop_count > 1 {
  1025. ut := time.Since(st)
  1026. sn := fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i)
  1027. as := ""
  1028. if topvars.sleeptime > 0 {
  1029. as = fmt.Sprint(", sleep ", topvars.sleeptime)
  1030. }
  1031. logger.Info(fmt.Sprint("mql ", mqlkey+"/"+sn, " loop ", loop_i, " times, usetime ", ut, as))
  1032. }
  1033. })
  1034. }
  1035. func (mt *MQLTest) getValue(basedir, ffpath, mqlkey, mqlsn string,
  1036. staticactions *StaticActions, a any) (ak string, va any) {
  1037. switch av := a.(type) {
  1038. case *Action:
  1039. switch av.Name {
  1040. case "<EQ>":
  1041. if len(av.Args) >= 2 {
  1042. _, x := mt.getValue(basedir, ffpath, mqlkey, mqlsn, staticactions, av.Args[0])
  1043. _, y := mt.getValue(basedir, ffpath, mqlkey, mqlsn, staticactions, av.Args[1])
  1044. if cast.ToString(x) == cast.ToString(y) {
  1045. return "", true
  1046. }
  1047. }
  1048. return "", false
  1049. }
  1050. case *qstru.Expr:
  1051. ak = av.VarName
  1052. if ak == "now" {
  1053. return "", time.Now()
  1054. }
  1055. mt.scopevars.Lock()
  1056. switch staticactions.Scope {
  1057. case "top":
  1058. va = mt.scopevars.top.vars[ak]
  1059. case "dir":
  1060. va = mt.scopevars.dir[basedir].vars[ak]
  1061. case "file":
  1062. va = mt.scopevars.file[ffpath].vars[ak]
  1063. default:
  1064. var has bool
  1065. va, has = mt.scopevars.mql[mqlsn].vars[ak]
  1066. if !has {
  1067. va = mt.scopevars.mql[mqlkey].vars[ak]
  1068. }
  1069. }
  1070. mt.scopevars.Unlock()
  1071. case string:
  1072. va = av
  1073. for _, tf := range time_layouts {
  1074. t, e := time.Parse(tf, av)
  1075. if e == nil {
  1076. va = t
  1077. break
  1078. }
  1079. }
  1080. default:
  1081. va = a
  1082. }
  1083. return
  1084. }
  1085. func (mt *MQLTest) setValue(basedir, ffpath, mqlkey, mqlsn string,
  1086. staticactions *StaticActions, ak string, va any) {
  1087. mt.scopevars.Lock()
  1088. switch staticactions.Scope {
  1089. case "top":
  1090. mt.scopevars.top.vars[ak] = va
  1091. case "dir":
  1092. mt.scopevars.dir[basedir].vars[ak] = va
  1093. case "file":
  1094. mt.scopevars.file[ffpath].vars[ak] = va
  1095. default:
  1096. _, has := mt.scopevars.mql[mqlsn].vars[ak]
  1097. if has {
  1098. mt.scopevars.mql[mqlsn].vars[ak] = va
  1099. } else {
  1100. mt.scopevars.mql[mqlkey].vars[ak] = va
  1101. }
  1102. }
  1103. mt.scopevars.Unlock()
  1104. }
  1105. func (mt *MQLTest) BeforeRunAction(basedir, ffpath, mqlkey, mqlsn string,
  1106. staticactions *StaticActions,
  1107. ) {
  1108. mt.runAction(basedir, ffpath, mqlkey, mqlsn, staticactions, staticactions.BeforeRunActions...)
  1109. }
  1110. func (mt *MQLTest) runAction(basedir, ffpath, mqlkey, mqlsn string,
  1111. staticactions *StaticActions, actions ...*Action,
  1112. ) {
  1113. for _, act := range actions {
  1114. switch act.Name {
  1115. case "set":
  1116. var a, b any
  1117. var ak string
  1118. if len(act.Args) > 1 {
  1119. a = act.Args[0]
  1120. b = act.Args[1]
  1121. }
  1122. ak, _ = mt.getValue(basedir, ffpath, mqlkey, mqlsn, staticactions, a)
  1123. _, vb := mt.getValue(basedir, ffpath, mqlkey, mqlsn, staticactions, b)
  1124. mt.setValue(basedir, ffpath, mqlkey, mqlsn, staticactions, ak, vb)
  1125. case "add":
  1126. var a, b, va, vb any
  1127. var ak string
  1128. if len(act.Args) > 1 {
  1129. a = act.Args[0]
  1130. b = act.Args[1]
  1131. }
  1132. ak, va = mt.getValue(basedir, ffpath, mqlkey, mqlsn, staticactions, a)
  1133. _, vb = mt.getValue(basedir, ffpath, mqlkey, mqlsn, staticactions, b)
  1134. switch v := va.(type) {
  1135. case time.Time:
  1136. unit := "second"
  1137. if len(act.Args) > 2 {
  1138. unit = cast.ToString(act.Args[2])
  1139. }
  1140. vb := cast.ToInt(vb)
  1141. d := time.Duration(0)
  1142. switch unit {
  1143. case "second", "seconds", "s", "秒":
  1144. d = time.Duration(vb) * time.Second
  1145. case "minute", "minutes", "m", "分":
  1146. d = time.Duration(vb) * time.Minute
  1147. case "hour", "hours", "h", "时":
  1148. d = time.Duration(vb) * time.Hour
  1149. }
  1150. v = v.Add(d)
  1151. mt.setValue(basedir, ffpath, mqlkey, mqlsn, staticactions, ak, v)
  1152. default:
  1153. av := cast.ToInt(va)
  1154. bv := cast.ToInt(vb)
  1155. nv := av + bv
  1156. mt.setValue(basedir, ffpath, mqlkey, mqlsn, staticactions, ak, nv)
  1157. }
  1158. case "mod":
  1159. var a, b, va, vb any
  1160. var ak string
  1161. if len(act.Args) > 1 {
  1162. a = act.Args[0]
  1163. b = act.Args[1]
  1164. }
  1165. ak, va = mt.getValue(basedir, ffpath, mqlkey, mqlsn, staticactions, a)
  1166. _, vb = mt.getValue(basedir, ffpath, mqlkey, mqlsn, staticactions, b)
  1167. av := cast.ToInt(va)
  1168. bv := cast.ToInt(vb)
  1169. v := av % bv
  1170. mt.setValue(basedir, ffpath, mqlkey, mqlsn, staticactions, ak, v)
  1171. case "case":
  1172. var a, b, va any
  1173. if len(act.Args) > 1 {
  1174. a = act.Args[0]
  1175. b = act.Args[1]
  1176. }
  1177. _, va = mt.getValue(basedir, ffpath, mqlkey, mqlsn, staticactions, a)
  1178. if cast.ToBool(va) {
  1179. switch vb := b.(type) {
  1180. case *Action:
  1181. mt.runAction(basedir, ffpath, mqlkey, mqlsn, staticactions, vb)
  1182. }
  1183. }
  1184. }
  1185. }
  1186. }
  1187. func (mt *MQLTest) InitScopeVars(basedir, ffpath, mqlkey string,
  1188. staticactions *StaticActions,
  1189. ) {
  1190. mt.scopevars.Lock()
  1191. if mt.scopevars.mql[mqlkey] == nil {
  1192. mt.scopevars.mql[mqlkey] = &Variables{
  1193. vars: map[string]interface{}{},
  1194. loop_count: 1,
  1195. loop_from: 1,
  1196. loop_step: 1}
  1197. }
  1198. mt.scopevars.Unlock()
  1199. for k, v := range staticactions.Variables {
  1200. mt.scopevars.Lock()
  1201. switch staticactions.Scope {
  1202. case "top":
  1203. mt.scopevars.top.vars[k] = v
  1204. case "dir":
  1205. mt.scopevars.dir[basedir].vars[k] = v
  1206. case "file":
  1207. mt.scopevars.file[ffpath].vars[k] = v
  1208. default:
  1209. mt.scopevars.mql[mqlkey].vars[k] = v
  1210. }
  1211. mt.scopevars.Unlock()
  1212. }
  1213. if staticactions.Timeout != nil {
  1214. mt.scopevars.Lock()
  1215. switch staticactions.Scope {
  1216. case "top":
  1217. mt.scopevars.top.timeout = *staticactions.Timeout
  1218. case "dir":
  1219. mt.scopevars.dir[basedir].timeout = *staticactions.Timeout
  1220. case "file":
  1221. mt.scopevars.file[ffpath].timeout = *staticactions.Timeout
  1222. default:
  1223. mt.scopevars.mql[mqlkey].timeout = *staticactions.Timeout
  1224. }
  1225. mt.scopevars.Unlock()
  1226. }
  1227. if staticactions.Qmeta != nil {
  1228. mt.scopevars.Lock()
  1229. switch staticactions.Scope {
  1230. case "top":
  1231. mt.scopevars.top.qmeta = staticactions.Qmeta
  1232. case "dir":
  1233. mt.scopevars.dir[basedir].qmeta = staticactions.Qmeta
  1234. case "file":
  1235. mt.scopevars.file[ffpath].qmeta = staticactions.Qmeta
  1236. default:
  1237. mt.scopevars.mql[mqlkey].qmeta = staticactions.Qmeta
  1238. }
  1239. mt.scopevars.Unlock()
  1240. }
  1241. if staticactions.LoopCount != nil {
  1242. mt.scopevars.Lock()
  1243. switch staticactions.Scope {
  1244. case "top":
  1245. mt.scopevars.top.loop_count = *staticactions.LoopCount
  1246. case "dir":
  1247. mt.scopevars.dir[basedir].loop_count = *staticactions.LoopCount
  1248. case "file":
  1249. mt.scopevars.file[ffpath].loop_count = *staticactions.LoopCount
  1250. default:
  1251. mt.scopevars.mql[mqlkey].loop_count = *staticactions.LoopCount
  1252. }
  1253. mt.scopevars.Unlock()
  1254. }
  1255. if staticactions.LoopFrom != nil {
  1256. mt.scopevars.Lock()
  1257. switch staticactions.Scope {
  1258. case "top":
  1259. mt.scopevars.top.loop_from = *staticactions.LoopFrom
  1260. case "dir":
  1261. mt.scopevars.dir[basedir].loop_from = *staticactions.LoopFrom
  1262. case "file":
  1263. mt.scopevars.file[ffpath].loop_from = *staticactions.LoopFrom
  1264. default:
  1265. mt.scopevars.mql[mqlkey].loop_from = *staticactions.LoopFrom
  1266. }
  1267. mt.scopevars.Unlock()
  1268. }
  1269. if staticactions.LoopStep != nil {
  1270. mt.scopevars.Lock()
  1271. switch staticactions.Scope {
  1272. case "top":
  1273. mt.scopevars.top.loop_step = *staticactions.LoopStep
  1274. case "dir":
  1275. mt.scopevars.dir[basedir].loop_step = *staticactions.LoopStep
  1276. case "file":
  1277. mt.scopevars.file[ffpath].loop_step = *staticactions.LoopStep
  1278. default:
  1279. mt.scopevars.mql[mqlkey].loop_step = *staticactions.LoopStep
  1280. }
  1281. mt.scopevars.Unlock()
  1282. }
  1283. }
  1284. func (mt *MQLTest) RunMQL(t *testing.T, ctx context.Context,
  1285. global *GlobalVars,
  1286. topvars *CurrentVars,
  1287. dirvars *CurrentVars,
  1288. filevars *CurrentVars,
  1289. mqlvars *CurrentVars,
  1290. basedir, ffpath, mqlkey, mqlsn string, mqri *MQLRequestInstance, staticactions *StaticActions, staticactionexprs map[int]*Action) error {
  1291. mqlstr := mqri.PreparedQueryString
  1292. if staticactions.WaitName != nil {
  1293. global.Lock()
  1294. wgs := []*sync.WaitGroup{}
  1295. if *staticactions.WaitName == "" {
  1296. for _, wg := range global.wg_wait_fork_routine {
  1297. wgs = append(wgs, wg)
  1298. }
  1299. } else {
  1300. wg := global.wg_wait_fork_routine[*staticactions.WaitName]
  1301. if wg != nil {
  1302. wgs = append(wgs, wg)
  1303. }
  1304. }
  1305. global.Unlock()
  1306. for _, wg := range wgs {
  1307. wg.Wait()
  1308. }
  1309. }
  1310. if staticactions.ParallelCount != nil {
  1311. mt.scopevars.Lock()
  1312. switch staticactions.Scope {
  1313. case "top":
  1314. topvars.ch_parallel_count <- *staticactions.ParallelCount
  1315. case "dir":
  1316. dirvars.ch_parallel_count <- *staticactions.ParallelCount
  1317. case "file":
  1318. filevars.ch_parallel_count <- *staticactions.ParallelCount
  1319. default:
  1320. mqlvars.ch_parallel_count <- *staticactions.ParallelCount
  1321. }
  1322. mt.scopevars.Unlock()
  1323. }
  1324. // 重新获取修正后的动作
  1325. actionexprs, e := getActionExprs(mqlstr, staticactionexprs)
  1326. if !assert.Nil(t, e, e) {
  1327. return e
  1328. }
  1329. dynamicactions := actionexprs.DynamicActions()
  1330. if len(dynamicactions.SubscribeArgs) > 0 {
  1331. subscribe(dynamicactions.SubscribeArgs...)
  1332. }
  1333. if len(dynamicactions.UnsubscribeArgs) > 0 {
  1334. unsubscribe(dynamicactions.UnsubscribeArgs...)
  1335. }
  1336. values := []interface{}{}
  1337. if len(dynamicactions.Params) > 0 {
  1338. err := json.Unmarshal([]byte(dynamicactions.Params), &values)
  1339. if err != nil {
  1340. assert.Nil(t, fmt.Sprint("params参数只支持JSON Array,", dynamicactions.Params, mqlstr), err)
  1341. return err
  1342. }
  1343. }
  1344. x := atomic.AddInt32(&global.mqlcount, 1)
  1345. atomic.AddInt32(&topvars.mqlcount, 1)
  1346. atomic.AddInt32(&dirvars.mqlcount, 1)
  1347. atomic.AddInt32(&filevars.mqlcount, 1)
  1348. atomic.AddInt32(&mqlvars.mqlcount, 1)
  1349. err := mt.RunMQLTryDo(t, ctx,
  1350. global,
  1351. topvars,
  1352. dirvars,
  1353. filevars,
  1354. mqlvars,
  1355. basedir, ffpath, mqlkey, mqlsn+"("+strconv.Itoa(int(x))+")", mqri, values, staticactions, actionexprs)
  1356. if err != nil {
  1357. // 执行过程有错,停止继续执行
  1358. return err
  1359. }
  1360. if staticactions.SleepTime != nil {
  1361. global.sleeptime += *staticactions.SleepTime
  1362. topvars.sleeptime += *staticactions.SleepTime
  1363. dirvars.sleeptime += *staticactions.SleepTime
  1364. filevars.sleeptime += *staticactions.SleepTime
  1365. mqlvars.sleeptime += *staticactions.SleepTime
  1366. time.Sleep(*staticactions.SleepTime)
  1367. }
  1368. return nil
  1369. }