mql_run_file.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. package odbcmql
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "os"
  7. "path/filepath"
  8. "strings"
  9. "sync"
  10. "sync/atomic"
  11. "testing"
  12. "time"
  13. "gitee.com/wecisecode/util/pqc"
  14. "gitee.com/wecisecode/util/spliter"
  15. "github.com/stretchr/testify/assert"
  16. )
  17. func (mt *MQLTest) RunFile(t *testing.T, ctx context.Context,
  18. global *GlobalVars,
  19. topvars *CurrentVars,
  20. dirvars *CurrentVars,
  21. basedir, filename string) bool {
  22. // 读取文件内容
  23. ffpath := filepath.Join(basedir, filename)
  24. bs, err := os.ReadFile(ffpath)
  25. if !assert.Nil(t, err, err) {
  26. return false
  27. }
  28. // mql语句切分
  29. mqgr := NewMQLGroupRequest()
  30. var multilines *MQLRequest
  31. mqs := spliter.NewMQLSpliter(bufio.NewReader(strings.NewReader(string(bs))))
  32. for {
  33. mql, fromline, toline, fromchar, tochar, hasnext, _ := mqs.NextMQL()
  34. if !hasnext {
  35. break
  36. }
  37. // 去掉注释
  38. clean_mql := strings.Join(spliter.MQLSplitClean(mql), ";")
  39. if multilines != nil {
  40. // 多条语句处理
  41. if strings.TrimSpace(clean_mql) == "multilines end" {
  42. // 保留语句中的注释
  43. tm := strings.TrimSpace(strings.Replace(mql, "multilines end", "", 1))
  44. if len(tm) > 0 {
  45. if multilines.OriginQueryString != "" {
  46. multilines.OriginQueryString += ";"
  47. }
  48. multilines.OriginQueryString += tm
  49. }
  50. multilines.Toline = toline
  51. multilines.Tochar = tochar
  52. // 加入请求组,并初始化,提取动作信息
  53. e := mqgr.Append(multilines)
  54. if !assert.Nil(t, e, e) {
  55. return false
  56. }
  57. multilines = nil
  58. } else {
  59. if multilines.OriginQueryString != "" {
  60. multilines.OriginQueryString += ";"
  61. }
  62. multilines.OriginQueryString += mql
  63. multilines.Toline = toline
  64. multilines.Tochar = tochar
  65. }
  66. } else if strings.TrimSpace(clean_mql) == "multilines begin" {
  67. multilines = &MQLRequest{FilePath: ffpath}
  68. // 保留语句中的注释
  69. multilines.OriginQueryString = strings.TrimSpace(strings.Replace(mql, "multilines begin", "", 1))
  70. multilines.Fromline = fromline
  71. multilines.Fromchar = fromchar
  72. multilines.Toline = toline
  73. multilines.Tochar = tochar
  74. } else {
  75. // 单条语句
  76. mqr := &MQLRequest{OriginQueryString: mql, FilePath: ffpath, Fromline: fromline, Toline: toline, Fromchar: fromchar, Tochar: tochar}
  77. // 加入请求组,并初始化,提取动作信息
  78. e := mqgr.Append(mqr)
  79. if !assert.Nil(t, e, e) {
  80. return false
  81. }
  82. }
  83. }
  84. // mqls := spliter.MQLSplit(string(bs))
  85. mt.scopevars.Lock()
  86. if mt.scopevars.file[ffpath] == nil {
  87. mt.scopevars.file[ffpath] = &Variables{
  88. vars: map[string]interface{}{},
  89. loop_count: 1,
  90. loop_from: 1,
  91. loop_step: 1}
  92. }
  93. mt.scopevars.Unlock()
  94. var wg sync.WaitGroup
  95. st := time.Now()
  96. loop_i := 0
  97. parallel_queue := pqc.NewQueue[any](0)
  98. mqlcount := int32(0)
  99. for {
  100. mt.scopevars.Lock()
  101. ok := loop_i < mt.scopevars.file[ffpath].loop_count
  102. mt.scopevars.Unlock()
  103. if !ok {
  104. break
  105. }
  106. loop_i++
  107. //
  108. ch_parallel_count := make(chan int)
  109. filevars := &CurrentVars{
  110. loop_i: loop_i,
  111. ch_parallel_count: ch_parallel_count,
  112. }
  113. //
  114. ok_chan := make(chan bool, 1)
  115. parallel_chan := make(chan bool, 1)
  116. parallel := false
  117. parallelcount := 0
  118. done := false
  119. wg.Add(1)
  120. go func() {
  121. defer func() {
  122. atomic.AddInt32(&mqlcount, filevars.mqlcount)
  123. wg.Done()
  124. }()
  125. ch_ok := make(chan bool)
  126. go func() {
  127. for {
  128. select {
  129. case <-ch_parallel_count:
  130. if !done && !parallel {
  131. parallel = true
  132. // 加入并发控制队列
  133. if parallelcount > 0 {
  134. if parallelcount > parallel_queue.Size() {
  135. parallel_queue.Growth(parallelcount)
  136. }
  137. parallel_queue.Push(1)
  138. }
  139. parallel_chan <- true
  140. }
  141. case ok := <-ch_ok:
  142. ok_chan <- ok
  143. if parallel {
  144. if parallelcount > 0 {
  145. // 从并发控制队列中移除
  146. parallel_queue.Pop()
  147. }
  148. } else {
  149. done = true
  150. }
  151. return
  152. }
  153. }
  154. }()
  155. // logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次执行开始")
  156. ch_ok <- mt.RunMQLGroup(t, ctx,
  157. global,
  158. topvars,
  159. dirvars,
  160. filevars,
  161. basedir, ffpath, mqgr)
  162. // logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次执行结束")
  163. }()
  164. success := true
  165. select {
  166. case success = <-ok_chan: // 非并发,等待完成
  167. logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次顺序执行完成")
  168. case <-parallel_chan: // 并发,执行继续下一次
  169. logger.Info("file", ffpath, "第", fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i), "次并发执行继续")
  170. }
  171. if !success {
  172. return false
  173. }
  174. }
  175. wg.Wait()
  176. mt.scopevars.RLock()
  177. loop_count := mt.scopevars.file[ffpath].loop_count
  178. mt.scopevars.RUnlock()
  179. if loop_count > 1 {
  180. ut := time.Since(st)
  181. sn := fmt.Sprint(topvars.loop_i, ".", dirvars.loop_i)
  182. logger.Info(fmt.Sprint("file ", ffpath+"/"+sn, " loop ", loop_count, " times, run ", mqlcount, " mqls, usetime ", ut))
  183. }
  184. return true
  185. }
  186. func (mt *MQLTest) RunMQLGroup(t *testing.T, ctx context.Context,
  187. global *GlobalVars,
  188. topvars *CurrentVars,
  189. dirvars *CurrentVars,
  190. filevars *CurrentVars,
  191. basedir, ffpath string, mqgr *MQLGroupRequest) (pass bool) {
  192. pass = true
  193. var wg sync.WaitGroup
  194. for _, mqs := range mqgr.mqrs {
  195. wg.Add(1)
  196. go func(mqs []*MQLRequest) {
  197. defer wg.Done()
  198. if len(mqs) == 0 {
  199. return
  200. }
  201. if mqs[0].StaticActions.ForkName != nil {
  202. forkname := *mqs[0].StaticActions.ForkName
  203. global.Lock()
  204. wg := global.wg_wait_fork_routine[forkname]
  205. if wg == nil {
  206. wg = &sync.WaitGroup{}
  207. global.wg_wait_fork_routine[forkname] = wg
  208. }
  209. global.Unlock()
  210. wg.Add(1)
  211. defer wg.Done()
  212. }
  213. ok := mt.RunMQLs(t, ctx, global, topvars, dirvars, filevars, basedir, ffpath, mqs)
  214. if !ok {
  215. pass = false
  216. }
  217. }(mqs)
  218. }
  219. wg.Wait()
  220. return
  221. }
  222. func (mt *MQLTest) RunMQLs(t *testing.T, ctx context.Context,
  223. global *GlobalVars,
  224. topvars *CurrentVars,
  225. dirvars *CurrentVars,
  226. filevars *CurrentVars,
  227. basedir, ffpath string, mqrs []*MQLRequest) bool {
  228. for _, mqr := range mqrs {
  229. mqlkey := mqr.Key
  230. mqlstr := mqr.OriginQueryString
  231. if mqlstr == "" {
  232. continue
  233. }
  234. staticactions := mqr.StaticActions
  235. // 设置执行过程中的控制参数
  236. mt.InitScopeVars(basedir, ffpath, mqlkey, staticactions)
  237. ch_test_run_one_mql_result := make(chan bool)
  238. go func() {
  239. mqrinst := fmt.Sprint(mqlkey, "/", topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i)
  240. global.Lock()
  241. mqrdone := global.ch_wait_mql_done[mqrinst]
  242. if mqrdone == nil {
  243. mqrdone = make(chan bool)
  244. global.ch_wait_mql_done[mqrinst] = mqrdone
  245. }
  246. var waitmqrdone chan bool
  247. if mqr.WaitMQLRequest != nil {
  248. waitmqrkey := mqr.WaitMQLRequest.Key
  249. waitmqrinst := fmt.Sprint(waitmqrkey, "/", topvars.loop_i, ".", dirvars.loop_i, ".", filevars.loop_i)
  250. waitmqrdone = global.ch_wait_mql_done[waitmqrinst]
  251. if waitmqrdone == nil {
  252. waitmqrdone = make(chan bool)
  253. global.ch_wait_mql_done[waitmqrinst] = waitmqrdone
  254. }
  255. }
  256. global.Unlock()
  257. var ret bool
  258. defer func() { mqrdone <- ret }()
  259. if waitmqrdone != nil {
  260. v := <-waitmqrdone
  261. waitmqrdone <- v
  262. if !v {
  263. // 依赖MQR失败
  264. ch_test_run_one_mql_result <- false
  265. return
  266. }
  267. }
  268. ret = mt.RunMQR(t, ctx, global, topvars, dirvars, filevars, basedir, ffpath, mqr)
  269. ch_test_run_one_mql_result <- ret
  270. }()
  271. ret := <-ch_test_run_one_mql_result
  272. if !ret {
  273. return ret
  274. }
  275. // continue
  276. }
  277. return true
  278. }