mql_run_mql.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. package odbcmql
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "math/rand"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "sync/atomic"
  11. "testing"
  12. "time"
  13. "gitee.com/wecisecode/util/cast"
  14. "gitee.com/wecisecode/util/pqc"
  15. "github.com/stretchr/testify/assert"
  16. )
  17. func (mt *MQLTest) RunMQR(t *testing.T, ctx context.Context,
  18. global *GlobalVars,
  19. topvars *CurrentVars,
  20. dirvars *CurrentVars,
  21. filevars *CurrentVars,
  22. basedir, ffpath string, mqr *MQLRequest) bool {
  23. mqlkey := mqr.Key
  24. staticactionexprs := mqr.StaticActionExprs
  25. staticactions := mqr.StaticActions
  26. return t.Run(mqlkey, func(t *testing.T) {
  27. var wg sync.WaitGroup
  28. st := time.Now()
  29. loop_i := 0
  30. parallel_queue := pqc.NewQueue[any](0)
  31. mqlcount := int32(0)
  32. for {
  33. mt.scopevars.Lock()
  34. ok := loop_i < mt.scopevars.mql[mqlkey].loop_count
  35. mt.scopevars.Unlock()
  36. if !ok {
  37. break
  38. }
  39. loop_i++
  40. //
  41. ch_parallel_count := make(chan int)
  42. mqlvars := &CurrentVars{
  43. loop_i: loop_i,
  44. ch_parallel_count: ch_parallel_count,
  45. }
  46. // 运行实例
  47. mqlsn := fmt.Sprint(mqlkey, "/", topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i, ".", mqlvars.loop_i)
  48. mt.scopevars.Lock()
  49. if mt.scopevars.mql[mqlsn] == nil {
  50. mt.scopevars.mql[mqlsn] = &Variables{
  51. vars: map[string]interface{}{},
  52. loop_count: 1,
  53. loop_from: 1,
  54. loop_step: 1}
  55. }
  56. mt.scopevars.Unlock()
  57. //
  58. mt.scopevars.Lock()
  59. top_loopi := mt.scopevars.top.loop_from + (topvars.loop_i-1)*mt.scopevars.top.loop_step
  60. mt.scopevars.top.vars["topi"] = top_loopi
  61. dir_loopi := mt.scopevars.dir[basedir].loop_from + (dirvars.loop_i-1)*mt.scopevars.dir[basedir].loop_step
  62. mt.scopevars.dir[basedir].vars["diri"] = dir_loopi
  63. mt.scopevars.dir[basedir].vars["topi"] = top_loopi
  64. file_loopi := mt.scopevars.file[ffpath].loop_from + (filevars.loop_i-1)*mt.scopevars.file[ffpath].loop_step
  65. mt.scopevars.file[ffpath].vars["filei"] = file_loopi
  66. mt.scopevars.file[ffpath].vars["diri"] = dir_loopi
  67. mt.scopevars.file[ffpath].vars["topi"] = top_loopi
  68. mql_loopi := mt.scopevars.mql[mqlkey].loop_from + (mqlvars.loop_i-1)*mt.scopevars.mql[mqlkey].loop_step
  69. mt.scopevars.mql[mqlkey].vars["mqli"] = mql_loopi
  70. mt.scopevars.mql[mqlkey].vars["filei"] = file_loopi
  71. mt.scopevars.mql[mqlkey].vars["diri"] = dir_loopi
  72. mt.scopevars.mql[mqlkey].vars["topi"] = top_loopi
  73. //
  74. mt.scopevars.mql[mqlsn].vars["mqli"] = mql_loopi
  75. mt.scopevars.mql[mqlsn].vars["filei"] = file_loopi
  76. mt.scopevars.mql[mqlsn].vars["diri"] = dir_loopi
  77. mt.scopevars.mql[mqlsn].vars["topi"] = top_loopi
  78. mt.scopevars.Unlock()
  79. mt.BeforeRunAction(basedir, ffpath, mqlkey, mqlsn, staticactions)
  80. formatedmqlstr := mqr.FormatedQueryString
  81. mql := mt.ReplaceLoopSN(formatedmqlstr,
  82. global, topvars, dirvars, filevars, mqlvars,
  83. top_loopi, dir_loopi, file_loopi, mql_loopi,
  84. basedir, ffpath, mqlkey, mqlsn,
  85. )
  86. mqlreplace := rereplace_nbytes.FindAllStringSubmatch(mql, -1)
  87. for _, mqr := range mqlreplace {
  88. if len(mqr) == 2 {
  89. bs := make([]byte, cast.ToInt(mqr[1]))
  90. for i := 0; i < len(bs); i++ {
  91. bs[i] = byte(32 + rand.Intn(91))
  92. }
  93. s := string(bs)
  94. s = strings.ReplaceAll(s, ")", string([]byte{123}))
  95. s = strings.ReplaceAll(s, "'", string([]byte{124}))
  96. s = strings.ReplaceAll(s, "\"", string([]byte{125}))
  97. s = strings.ReplaceAll(s, "\\", string([]byte{126}))
  98. mql = strings.Replace(mql, mqr[0], s, 1)
  99. }
  100. }
  101. for i, sat := range staticactionexprs {
  102. mql = strings.Replace(mql, "["+strconv.Itoa(i)+"]", sat.SourceText, 1)
  103. }
  104. mql = strings.ReplaceAll(mql, "[[]", "[")
  105. mqri := &MQLRequestInstance{
  106. MQLRequest: mqr,
  107. PreparedQueryString: mql,
  108. }
  109. //
  110. ok_chan := make(chan error, 1)
  111. parallel_chan := make(chan bool, 1)
  112. parallel := false
  113. parallelcount := 0
  114. done := false
  115. wg.Add(1)
  116. go func(mqlvars *CurrentVars) {
  117. defer func() {
  118. atomic.AddInt32(&mqlcount, mqlvars.mqlcount)
  119. wg.Done()
  120. }()
  121. ch_ok := make(chan error)
  122. go func() {
  123. for {
  124. select {
  125. case parallelcount = <-ch_parallel_count:
  126. if !done && !parallel {
  127. parallel = true
  128. // 加入并发控制队列
  129. if parallelcount > 0 {
  130. if parallelcount > parallel_queue.Size() {
  131. parallel_queue.Growth(parallelcount)
  132. }
  133. parallel_queue.Push(1)
  134. }
  135. parallel_chan <- true
  136. }
  137. case ok := <-ch_ok:
  138. ok_chan <- ok
  139. if parallel {
  140. if parallelcount > 0 {
  141. // 从并发控制队列中移除
  142. parallel_queue.Pop()
  143. }
  144. } else {
  145. done = true
  146. }
  147. return
  148. }
  149. }
  150. }()
  151. nstaticactionexprs := map[int]*Action{}
  152. for i, sat := range staticactionexprs {
  153. nstaticactionexprs[i] = sat
  154. }
  155. // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次执行开始")
  156. ch_ok <- mt.RunMQL(t, ctx,
  157. global,
  158. topvars,
  159. dirvars,
  160. filevars,
  161. mqlvars,
  162. basedir, ffpath, mqlkey, mqlsn, mqri, staticactions, nstaticactionexprs)
  163. // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次执行结束")
  164. }(mqlvars)
  165. var err error
  166. select {
  167. case err = <-ok_chan: // 非并发,等待完成
  168. // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次顺序执行完成")
  169. case <-parallel_chan: // 并发,执行继续下一次
  170. // logger.Info(mqlkey, "第", fmt.Sprint(all_loop_i, ".", dir_loop_i, ".", file_loop_i, ".", curval.mql_i), "次并发执行继续")
  171. }
  172. if err != nil {
  173. // logger.Error(err)
  174. break
  175. }
  176. }
  177. wg.Wait()
  178. mt.scopevars.RLock()
  179. loop_count := mt.scopevars.mql[mqlkey].loop_count
  180. mt.scopevars.RUnlock()
  181. if loop_count > 1 {
  182. ut := time.Since(st)
  183. sn := fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i)
  184. as := ""
  185. if topvars.sleeptime > 0 {
  186. as = fmt.Sprint(", sleep ", topvars.sleeptime)
  187. }
  188. logger.Info(fmt.Sprint("mql ", mqlkey+"/"+sn, " loop ", loop_i, " times, usetime ", ut, as))
  189. }
  190. })
  191. }
  192. // 执行一条命令
  193. func (mt *MQLTest) RunMQL(t *testing.T, ctx context.Context,
  194. global *GlobalVars,
  195. topvars *CurrentVars,
  196. dirvars *CurrentVars,
  197. filevars *CurrentVars,
  198. mqlvars *CurrentVars,
  199. basedir, ffpath, mqlkey, mqlsn string, mqri *MQLRequestInstance, staticactions *StaticActions, staticactionexprs map[int]*Action) error {
  200. mqlstr := mqri.PreparedQueryString
  201. if staticactions.WaitName != nil {
  202. global.Lock()
  203. wgs := []*sync.WaitGroup{}
  204. if *staticactions.WaitName == "" {
  205. for _, wg := range global.wg_wait_fork_routine {
  206. wgs = append(wgs, wg)
  207. }
  208. } else {
  209. wg := global.wg_wait_fork_routine[*staticactions.WaitName]
  210. if wg != nil {
  211. wgs = append(wgs, wg)
  212. }
  213. }
  214. global.Unlock()
  215. for _, wg := range wgs {
  216. wg.Wait()
  217. }
  218. }
  219. if staticactions.ParallelCount != nil {
  220. mt.scopevars.Lock()
  221. switch staticactions.Scope {
  222. case "top":
  223. topvars.ch_parallel_count <- *staticactions.ParallelCount
  224. case "dir":
  225. dirvars.ch_parallel_count <- *staticactions.ParallelCount
  226. case "file":
  227. filevars.ch_parallel_count <- *staticactions.ParallelCount
  228. default:
  229. mqlvars.ch_parallel_count <- *staticactions.ParallelCount
  230. }
  231. mt.scopevars.Unlock()
  232. }
  233. // 重新获取修正后的动作
  234. actionexprs, e := getActionExprs(mqlstr, staticactionexprs)
  235. if !assert.Nil(t, e, e) {
  236. return e
  237. }
  238. dynamicactions := actionexprs.DynamicActions()
  239. if len(dynamicactions.SubscribeArgs) > 0 {
  240. subscribe(dynamicactions.SubscribeArgs...)
  241. }
  242. if len(dynamicactions.UnsubscribeArgs) > 0 {
  243. unsubscribe(dynamicactions.UnsubscribeArgs...)
  244. }
  245. values := []interface{}{}
  246. if len(dynamicactions.Params) > 0 {
  247. err := json.Unmarshal([]byte(dynamicactions.Params), &values)
  248. if err != nil {
  249. assert.Nil(t, fmt.Sprint("params参数只支持JSON Array,", dynamicactions.Params, mqlstr), err)
  250. return err
  251. }
  252. }
  253. x := atomic.AddInt32(&global.mqlcount, 1)
  254. atomic.AddInt32(&topvars.mqlcount, 1)
  255. atomic.AddInt32(&dirvars.mqlcount, 1)
  256. atomic.AddInt32(&filevars.mqlcount, 1)
  257. atomic.AddInt32(&mqlvars.mqlcount, 1)
  258. err := mt.RunMQLTryDo(t, ctx,
  259. global,
  260. topvars,
  261. dirvars,
  262. filevars,
  263. mqlvars,
  264. basedir, ffpath, mqlkey, mqlsn+"("+strconv.Itoa(int(x))+")", mqri, values, staticactions, actionexprs)
  265. if err != nil {
  266. // 执行过程有错,停止继续执行
  267. return err
  268. }
  269. if staticactions.SleepTime != nil {
  270. global.sleeptime += *staticactions.SleepTime
  271. topvars.sleeptime += *staticactions.SleepTime
  272. dirvars.sleeptime += *staticactions.SleepTime
  273. filevars.sleeptime += *staticactions.SleepTime
  274. mqlvars.sleeptime += *staticactions.SleepTime
  275. time.Sleep(*staticactions.SleepTime)
  276. }
  277. return nil
  278. }