mqls_action.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520
  1. package odbcmql
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "math"
  6. "regexp"
  7. "strings"
  8. "time"
  9. "git.wecise.com/wecise/odb-go/odb"
  10. "git.wecise.com/wecise/odb-go/odb/mql/parser"
  11. "git.wecise.com/wecise/odb-go/odb/mql/qstru"
  12. "gitee.com/wecisecode/util/cast"
  13. "gitee.com/wecisecode/util/mfmt"
  14. "gitee.com/wecisecode/util/set/strset"
  15. "gitee.com/wecisecode/util/sortedmap"
  16. )
  // Action is one parsed action call taken from the action expression that
  // accompanies an MQL statement.
  type Action struct {
  	// ID is the value of the builder's action counter at the time the
  	// action was constructed.
  	ID int
  	// Name is the function name of the action, e.g. "retry" or "onerror".
  	Name string
  	// Args holds the call arguments in order. An element is a constant
  	// value, a *qstru.Expr for a variable reference, a nested *Action for a
  	// nested call, or nil when the argument could not be classified.
  	Args []any
  	// SourceFromIndex and SourceToIndex are the parser-reported begin/end
  	// positions of the action, shifted by the number of runes that precede
  	// the action expression in the full MQL text.
  	SourceFromIndex, SourceToIndex int
  	// SourceText is the source text of the action as reported by the parser
  	// (empty for nested actions).
  	SourceText string
  }
  // OnErrorProc identifies how an error raised by a statement is handled.
  type OnErrorProc int

  // Error-handling procedures selectable through the "onerror" action.
  const (
  	// OnErrorHalt is the default; keywords: halt, fail, exit.
  	OnErrorHalt OnErrorProc = iota
  	// OnErrorIgnore; keywords: continue, ignore.
  	OnErrorIgnore
  	// OnErrorBreak interrupts the enclosing loop; keyword: break.
  	OnErrorBreak
  	// OnErrorPass; keywords: expect, pass, need, must, sure.
  	OnErrorPass
  )
  32. type StaticActions struct {
  33. Skip bool
  34. RetryLimit int
  35. Scope string
  36. Qmeta odb.QueryMeta
  37. Timeout *time.Duration
  38. LoopCount *int
  39. LoopFrom *int
  40. LoopStep *int
  41. ParallelCount *int
  42. ForkName *string
  43. WaitName *string
  44. SleepTime *time.Duration
  45. Variables map[string]interface{}
  46. BeforeRunActions []*Action
  47. }
  // DynamicActions is the decoded form of the "subscribe", "unsubscribe" and
  // "params" actions of one statement.
  type DynamicActions struct {
  	// SubscribeArgs and UnsubscribeArgs are the stringified arguments of
  	// the "subscribe" and "unsubscribe" actions; both are empty when the
  	// action is absent.
  	SubscribeArgs, UnsubscribeArgs []string
  	// Params is the JSON encoding of the "params" arguments, or "" when the
  	// action is absent.
  	Params string
  }
  53. type OnErrorOption struct {
  54. regex map[OnErrorProc]*regexp.Regexp
  55. noerrorinfo bool
  56. }
  57. type ActionExprs struct {
  58. *sortedmap.LinkedMap
  59. staticActions *StaticActions
  60. dynamicActions *DynamicActions
  61. followThroughActions []*Action
  62. onerrorOption *OnErrorOption
  63. }
  64. func getActionExprs(mqlstr string, staticactions map[int]*Action) (actionexprs *ActionExprs, err error) {
  65. actionexprs = &ActionExprs{
  66. LinkedMap: sortedmap.NewLinkedMap(),
  67. onerrorOption: &OnErrorOption{
  68. regex: map[OnErrorProc]*regexp.Regexp{},
  69. },
  70. }
  71. {
  72. // /** ... **/ 之间的内容
  73. qlsindex := reactioneexprs.FindStringSubmatchIndex(mqlstr)
  74. if len(qlsindex) < 4 {
  75. // --{ ... } 之间的内容
  76. qlsindex = reactioneexprs_2.FindStringSubmatchIndex(mqlstr)
  77. if len(qlsindex) < 4 {
  78. // --{ ... } 之间的内容
  79. qlsindex = reactioneexprs_3.FindStringSubmatchIndex(mqlstr)
  80. if len(qlsindex) < 4 {
  81. return
  82. }
  83. }
  84. }
  85. isactionexpr := qlsindex[2]
  86. ieactionexpr := qlsindex[3]
  87. if isactionexpr < 0 || ieactionexpr < 0 {
  88. return
  89. }
  90. sactionexpr := mqlstr[isactionexpr:ieactionexpr]
  91. runestart := len([]rune(mqlstr[:isactionexpr]))
  92. // for i, q := range qls {
  93. // if i == len(qls)-1 {
  94. // break
  95. // }
  96. // if i > 0 {
  97. // isactionexpr += 2
  98. // }
  99. // isactionexpr += len(q)
  100. // }
  101. // ckfs := strings.TrimSpace(qls[len(qls)-1])
  102. // if len(ckfs) > 2 && ckfs[0] == '{' && ckfs[len(ckfs)-1] == '}' {
  103. // sactionexpr = ckfs[1 : len(ckfs)-1]
  104. // }
  105. if sactionexpr != "" {
  106. // 解析动作表达式,构建动作对象
  107. // 对应 odo-go/odb/mql/parser/grammar/mql.peg.txt 语法中的 Functions 部分
  108. stmts, mp, e := parser.ParseMQL(sactionexpr)
  109. if e != nil {
  110. // logger.Error(e)
  111. return nil, e
  112. }
  113. ab := &ActionBuilder{
  114. runestart: runestart,
  115. actionid: 0,
  116. staticactions: staticactions,
  117. actionexprs: actionexprs,
  118. }
  119. for _, stmt := range stmts {
  120. fields := stmt.(*qstru.SelectStatement).Fields
  121. for _, sf := range fields {
  122. sp := mp.SourcePosition()[sf]
  123. if len(sp) != 1 {
  124. panic("len(sp) != 1,一个字段有且只有一个确定位置")
  125. }
  126. source := mp.Source(sp[0])
  127. ab.NewAction(sf.Expr, sp[0], source)
  128. }
  129. }
  130. }
  131. }
  132. return
  133. }
  134. type ActionBuilder struct {
  135. runestart int
  136. actionid int
  137. staticactions map[int]*Action
  138. actionexprs *ActionExprs
  139. }
  140. func (ab *ActionBuilder) NewAction(sfe *qstru.Expr, sp parser.SourcePosition, source string) *Action {
  141. ab.actionid++
  142. if act := ab.staticactions[ab.actionid]; act != nil {
  143. ab.actionexprs.Put(act.Name, act)
  144. return act
  145. }
  146. action := fmt.Sprint(sfe.FuncName)
  147. fargs := sfe.FuncArgs
  148. sargs := []interface{}{}
  149. for _, farg := range fargs {
  150. if farg == nil {
  151. sargs = append(sargs, nil)
  152. } else if farg.Const != nil {
  153. sargs = append(sargs, farg.Const)
  154. } else if len(farg.VarName) != 0 && farg.VarName[0] != "" {
  155. sargs = append(sargs, farg)
  156. } else if farg.FuncName != nil && farg.FuncArgs != nil {
  157. sargs = append(sargs, ab.NewAction(farg, parser.SourcePosition{}, ""))
  158. } else {
  159. sargs = append(sargs, nil)
  160. }
  161. }
  162. act := &Action{
  163. ID: ab.actionid,
  164. Name: action,
  165. Args: sargs,
  166. SourceFromIndex: sp.Begin + ab.runestart,
  167. SourceToIndex: sp.End + ab.runestart,
  168. SourceText: source,
  169. }
  170. ab.actionexprs.Put(act.Name, act)
  171. // 记录在mql中的位置,避免固定数值设定被动态改变
  172. if STATIC_ACTION_NAMES.Has(action) {
  173. ab.staticactions[ab.actionid] = act
  174. }
  175. if FOLLOWTHROUGH_ACTION_NAMES.Has(act.Name) {
  176. ab.actionexprs.followThroughActions = append(ab.actionexprs.followThroughActions, act)
  177. }
  178. if ONERROR_ACTION_NAMES.Has(action) {
  179. switch action {
  180. case "noerrinfo":
  181. ab.actionexprs.onerrorOption.noerrorinfo = true
  182. case "onerror":
  183. if len(act.Args) > 0 {
  184. oep := OnErrorHalt
  185. switch cast.ToString(act.Args[0]) {
  186. case "break":
  187. oep = OnErrorBreak
  188. case "continue", "ignore":
  189. oep = OnErrorIgnore
  190. case "pass", "expect", "need", "must", "sure":
  191. oep = OnErrorPass
  192. case "fail", "halt", "exit":
  193. default:
  194. }
  195. msgs := []string{}
  196. for _, arg := range act.Args[1:] {
  197. sarg := cast.ToString(arg)
  198. if sarg != "" {
  199. msgs = append(msgs, sarg)
  200. }
  201. }
  202. msg := strings.Join(msgs, "|")
  203. msg = regexp.MustCompile("([^\\|\\*0-9A-Z_a-z\u0100-\uffff])").ReplaceAllString(msg, "\\$1")
  204. msg = strings.ReplaceAll(msg, "*", ".*")
  205. var regx *regexp.Regexp
  206. if msg == "" {
  207. regx = matchall
  208. } else {
  209. msg = "(?:" + strings.ReplaceAll(msg, "|", ")|(?:") + ")"
  210. regx = ab.actionexprs.onerrorOption.regex[oep]
  211. if regx != nil && regx != matchall {
  212. msg += regx.String() + "|" + msg
  213. }
  214. regx = regexp.MustCompile(msg)
  215. }
  216. ab.actionexprs.onerrorOption.regex[oep] = regx
  217. }
  218. }
  219. }
  220. return act
  221. }
  222. var ONERROR_ACTION_NAMES = strset.New("onerror", "noerrinfo")
  223. var STATIC_ACTION_NAMES = strset.New("skip", "retry", "scope", "beforerun", "set", "timeout", "qmeta", "loop", "loopfrom", "loopstep", "parallel", "fork", "wait", "sleep")
  224. var DYNAMIC_ACTION_NAMES = strset.New("subscribe", "unsubscribe", "params")
  225. var FOLLOWTHROUGH_ACTION_NAMES = strset.New("schema", "metainfo", "output", "outputcount", "count", "match", "matchcount", "equal")
  226. var time_layouts = []string{"2006-01-02 15:04:05", "2006-01-02", "2006-01-02 15:04:05.000", "2006-01-02 15:04:05.000000", "2006-01-02 15:04:05.000000000", "2006-01-02 15:04"}
  227. func ToStaticActions(actionexprs *ActionExprs) (staticActions *StaticActions, err error) {
  228. staticActions = actionexprs.staticActions
  229. if staticActions == nil {
  230. staticActions = &StaticActions{}
  231. actionexprs.staticActions = staticActions
  232. // skip 动作与位置顺序无关
  233. skip := actionexprs.Has("skip")
  234. actionexprs.Delete("skip")
  235. staticActions.Skip = skip
  236. if skip {
  237. // 跳过本条语句,不需要后续处理了
  238. return
  239. }
  240. // retry 动作与位置顺序无关
  241. retry_v, has_retry := actionexprs.Get("retry")
  242. actionexprs.Delete("retry")
  243. if has_retry {
  244. retry_args := retry_v.(*Action).Args
  245. retry_limit := 0
  246. if len(retry_args) > 0 {
  247. retry_limit = cast.ToInt(retry_args[0])
  248. if retry_limit < 0 {
  249. retry_limit = 0
  250. }
  251. }
  252. staticActions.RetryLimit = retry_limit
  253. }
  254. // scope 动作与位置顺序无关
  255. op_scope_v, has_scope := actionexprs.Get("scope")
  256. actionexprs.Delete("scope")
  257. if has_scope {
  258. op_scope_args := op_scope_v.(*Action).Args
  259. op_scope := "mql"
  260. if len(op_scope_args) > 0 {
  261. op_scope = cast.ToString(op_scope_args[0])
  262. if op_scope != "file" && op_scope != "dir" && op_scope != "top" {
  263. op_scope = "mql"
  264. }
  265. }
  266. staticActions.Scope = op_scope
  267. }
  268. // beforerun 动作与位置顺序无关
  269. op_beforerun_v, has_beforerun := actionexprs.Get("beforerun")
  270. actionexprs.Delete("beforerun")
  271. if has_beforerun {
  272. op_beforerun_args := op_beforerun_v.(*Action).Args
  273. for _, f := range op_beforerun_args {
  274. switch sf := f.(type) {
  275. case *Action:
  276. staticActions.BeforeRunActions = append(staticActions.BeforeRunActions, sf)
  277. }
  278. }
  279. }
  280. // set 动作与位置顺序无关
  281. op_set_v, has_set := actionexprs.Get("set")
  282. actionexprs.Delete("set")
  283. if has_set {
  284. op_set_args := op_set_v.(*Action).Args
  285. vars := map[string]interface{}{}
  286. for i := 0; i < len(op_set_args)-1; i += 2 {
  287. k := cast.ToString(op_set_args[i])
  288. v := op_set_args[i+1]
  289. switch av := v.(type) {
  290. case *qstru.Expr:
  291. ak := ""
  292. if len(av.VarName) > 0 {
  293. ak = av.VarName[0]
  294. }
  295. if ak == "now" {
  296. v = time.Now()
  297. }
  298. case string:
  299. for _, tf := range time_layouts {
  300. t, e := time.Parse(tf, av)
  301. if e == nil {
  302. v = t
  303. break
  304. }
  305. }
  306. }
  307. vars[k] = v
  308. }
  309. staticActions.Variables = vars
  310. }
  311. // timeout 动作与位置顺序无关
  312. op_timeout_v, has_timeout := actionexprs.Get("timeout")
  313. actionexprs.Delete("timeout")
  314. if has_timeout {
  315. op_timeout_args := op_timeout_v.(*Action).Args
  316. var op_timeout time.Duration
  317. if len(op_timeout_args) > 0 {
  318. op_timeout = mfmt.ParseDuration(strings.TrimSpace(cast.ToString(op_timeout_args[0])))
  319. }
  320. staticActions.Timeout = &op_timeout
  321. }
  322. // qmeta 动作与位置顺序无关
  323. op_qmeta_v, has_qmeta := actionexprs.Get("qmeta")
  324. actionexprs.Delete("qmeta")
  325. if has_qmeta {
  326. op_qmeta_args := op_qmeta_v.(*Action).Args
  327. var op_qmeta odb.QueryMeta
  328. if len(op_qmeta_args) > 0 {
  329. qm := odb.QueryMeta{}
  330. switch qmeta_arg := op_qmeta_args[0].(type) {
  331. case map[string]any:
  332. bs, e := json.Marshal(qmeta_arg)
  333. if e != nil {
  334. return nil, e
  335. }
  336. e = json.Unmarshal(bs, &qm)
  337. if e != nil {
  338. return nil, e
  339. }
  340. default:
  341. s := strings.TrimSpace(cast.ToString(op_qmeta_args[0]))
  342. e := json.Unmarshal([]byte(s), &qm)
  343. if e != nil {
  344. return nil, e
  345. }
  346. }
  347. op_qmeta = qm
  348. }
  349. staticActions.Qmeta = op_qmeta
  350. }
  351. // loop 动作与位置顺序无关
  352. op_loop_v, has_loop := actionexprs.Get("loop")
  353. actionexprs.Delete("loop")
  354. if has_loop {
  355. op_loop_args := op_loop_v.(*Action).Args
  356. if len(op_loop_args) > 0 {
  357. loopcount := cast.ToInt(op_loop_args[0])
  358. if loopcount == 0 {
  359. loopcount = 1
  360. } else if loopcount < 0 {
  361. loopcount = math.MaxInt
  362. }
  363. staticActions.LoopCount = &loopcount
  364. }
  365. }
  366. // loopfrom 动作与位置顺序无关
  367. op_loopfrom_v, has_loopfrom := actionexprs.Get("loopfrom")
  368. actionexprs.Delete("loopfrom")
  369. if has_loopfrom {
  370. op_loopfrom_args := op_loopfrom_v.(*Action).Args
  371. if len(op_loopfrom_args) > 0 {
  372. loopfrom := cast.ToInt(op_loopfrom_args[0])
  373. staticActions.LoopFrom = &loopfrom
  374. }
  375. }
  376. // loopstep 动作与位置顺序无关
  377. op_loopstep_v, has_loopstep := actionexprs.Get("loopstep")
  378. actionexprs.Delete("loopstep")
  379. if has_loopstep {
  380. op_loopstep_args := op_loopstep_v.(*Action).Args
  381. if len(op_loopstep_args) > 0 {
  382. loopstep := cast.ToInt(op_loopstep_args[0])
  383. staticActions.LoopStep = &loopstep
  384. }
  385. }
  386. // parallel 动作与位置顺序无关
  387. op_parallel_v, has_parallel := actionexprs.Get("parallel")
  388. actionexprs.Delete("parallel")
  389. if has_parallel {
  390. op_parallel_args := op_parallel_v.(*Action).Args
  391. parallelcount := 0
  392. if len(op_parallel_args) > 0 {
  393. parallelcount = cast.ToInt(op_parallel_args[0])
  394. }
  395. staticActions.ParallelCount = &parallelcount
  396. }
  397. // fork 动作与位置顺序无关
  398. op_fork_v, has_fork := actionexprs.Get("fork")
  399. actionexprs.Delete("fork")
  400. if has_fork {
  401. op_fork_args := op_fork_v.(*Action).Args
  402. fork_name := ""
  403. if len(op_fork_args) > 0 {
  404. fork_name = cast.ToString(op_fork_args[0])
  405. }
  406. staticActions.ForkName = &fork_name
  407. }
  408. // wait 动作与位置顺序无关
  409. op_wait_v, has_wait := actionexprs.Get("wait")
  410. actionexprs.Delete("wait")
  411. if has_wait {
  412. op_wait_args := op_wait_v.(*Action).Args
  413. wait_name := ""
  414. if len(op_wait_args) > 0 {
  415. wait_name = cast.ToString(op_wait_args[0])
  416. }
  417. staticActions.WaitName = &wait_name
  418. }
  419. // sleep 动作与位置顺序无关
  420. op_sleep_v, has_sleep := actionexprs.Get("sleep")
  421. actionexprs.Delete("sleep")
  422. if has_sleep {
  423. op_sleep_args := op_sleep_v.(*Action).Args
  424. var op_sleep_time time.Duration
  425. if len(op_sleep_args) > 0 {
  426. op_sleep_time = mfmt.ParseDuration(strings.TrimSpace(cast.ToString(op_sleep_args[0])))
  427. }
  428. staticActions.SleepTime = &op_sleep_time
  429. }
  430. }
  431. return
  432. }
  433. func (actionexprs *ActionExprs) DynamicActions() (dynamicActions *DynamicActions) {
  434. dynamicActions = actionexprs.dynamicActions
  435. if actionexprs.dynamicActions == nil {
  436. dynamicActions = &DynamicActions{}
  437. actionexprs.dynamicActions = dynamicActions
  438. // subscribe 动作与位置顺序无关
  439. op_subscribe_args := []string{}
  440. op_subscribe_v, has_subscribe := actionexprs.Get("subscribe")
  441. actionexprs.Delete("subscribe")
  442. if has_subscribe {
  443. args := op_subscribe_v.(*Action).Args
  444. for _, argv := range args {
  445. switch arg := argv.(type) {
  446. case map[string]interface{}:
  447. bs, e := json.Marshal(arg)
  448. if e == nil {
  449. op_subscribe_args = append(op_subscribe_args, string(bs))
  450. }
  451. default:
  452. op_subscribe_args = append(op_subscribe_args, cast.ToString(arg))
  453. }
  454. }
  455. if len(op_subscribe_args) > 0 {
  456. subscribe(op_subscribe_args...)
  457. }
  458. }
  459. dynamicActions.SubscribeArgs = op_subscribe_args
  460. // unsubscribe 动作与位置顺序无关
  461. op_unsubscribe_args := []string{}
  462. op_unsubscribe_v, has_unsubscribe := actionexprs.Get("unsubscribe")
  463. actionexprs.Delete("unsubscribe")
  464. if has_unsubscribe {
  465. op_unsubscribe_args = cast.ToStringSlice(op_unsubscribe_v.(*Action).Args)
  466. if len(op_unsubscribe_args) > 0 {
  467. unsubscribe(op_unsubscribe_args...)
  468. }
  469. }
  470. dynamicActions.UnsubscribeArgs = op_unsubscribe_args
  471. // params 动作与位置顺序无关
  472. op_params_str := ""
  473. op_params_v, has_params := actionexprs.Get("params")
  474. actionexprs.Delete("params")
  475. if has_params {
  476. op_params_args := op_params_v.(*Action).Args
  477. bs, e := json.Marshal(op_params_args)
  478. if e == nil {
  479. op_params_str = string(bs)
  480. } else {
  481. op_params_str = strings.TrimSpace(strings.Join(cast.ToStringSlice(op_params_args), ","))
  482. if len(op_params_str) == 0 || op_params_str[0] != '[' || op_params_str[len(op_params_str)-1] != ']' {
  483. op_params_str = "[" + op_params_str + "]"
  484. }
  485. }
  486. }
  487. dynamicActions.Params = op_params_str
  488. }
  489. return
  490. }
  491. // 其它后续动作
  492. func (actionexprs *ActionExprs) FollowThroughActions() []*Action {
  493. if actionexprs.followThroughActions == nil {
  494. actionexprs.Fetch(func(key, value interface{}) bool {
  495. name := key.(string)
  496. action := value.(*Action)
  497. if FOLLOWTHROUGH_ACTION_NAMES.Has(name) {
  498. actionexprs.followThroughActions = append(actionexprs.followThroughActions, action)
  499. }
  500. return true
  501. })
  502. }
  503. return actionexprs.followThroughActions
  504. }