mqls_action.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515
  1. package odbcmql
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "math"
  6. "regexp"
  7. "strings"
  8. "time"
  9. "git.wecise.com/wecise/odb-go/odb"
  10. "git.wecise.com/wecise/odb-go/odb/mql/parser"
  11. "git.wecise.com/wecise/odb-go/odb/mql/qstru"
  12. "gitee.com/wecisecode/util/cast"
  13. "gitee.com/wecisecode/util/mfmt"
  14. "gitee.com/wecisecode/util/set/strset"
  15. "gitee.com/wecisecode/util/sortedmap"
  16. )
  // Action is a single action expression parsed out of the action section of
  // an MQL statement.
  type Action struct {
  	// ID is the sequence number handed out by ActionBuilder.NewAction.
  	ID int
  	// Name is the function name of the action expression.
  	Name string
  	// Args holds the call arguments as produced by NewAction: constant
  	// values, variable expressions, nested *Action values for nested calls,
  	// or nil for anything else.
  	Args []any
  	// SourceFromIndex and SourceToIndex are the begin/end positions reported
  	// by the parser, shifted by the offset of the action section.
  	SourceFromIndex, SourceToIndex int
  	// SourceText is the source text of the expression as reported by the parser.
  	SourceText string
  }
  // OnErrorProc selects how an error raised by a statement is handled.
  type OnErrorProc int

  // Error handling modes. The keywords listed are the spellings recognised
  // by the "onerror" action.
  const (
  	// OnErrorHalt: halt, fail, exit.
  	OnErrorHalt OnErrorProc = 0
  	// OnErrorIgnore: continue, ignore.
  	OnErrorIgnore OnErrorProc = 1
  	// OnErrorBreak: break (interrupts the loop).
  	OnErrorBreak OnErrorProc = 2
  	// OnErrorPass: expect, pass, need, sure.
  	OnErrorPass OnErrorProc = 3
  )
  32. type StaticActions struct {
  33. Skip bool
  34. RetryLimit int
  35. Scope string
  36. Qmeta odb.QueryMeta
  37. Timeout *time.Duration
  38. LoopCount *int
  39. LoopFrom *int
  40. LoopStep *int
  41. ParallelCount *int
  42. ForkName *string
  43. WaitName *string
  44. SleepTime *time.Duration
  45. Variables map[string]interface{}
  46. BeforeRunActions []*Action
  47. }
  // DynamicActions is the decoded form of the dynamic actions of a statement,
  // as filled in by (*ActionExprs).DynamicActions.
  type DynamicActions struct {
  	// SubscribeArgs and UnsubscribeArgs are the string forms of the
  	// arguments of the "subscribe" and "unsubscribe" actions.
  	SubscribeArgs, UnsubscribeArgs []string
  	// Params is the "params" argument list rendered as a JSON array string;
  	// empty when no "params" action is present.
  	Params string
  }
  53. type OnErrorOption struct {
  54. regex map[OnErrorProc]*regexp.Regexp
  55. noerrorinfo bool
  56. }
  57. type ActionExprs struct {
  58. *sortedmap.LinkedMap
  59. staticActions *StaticActions
  60. dynamicActions *DynamicActions
  61. followThroughActions []*Action
  62. onerrorOption *OnErrorOption
  63. }
  64. func getActionExprs(mqlstr string, staticactions map[int]*Action) (actionexprs *ActionExprs, err error) {
  65. actionexprs = &ActionExprs{
  66. LinkedMap: sortedmap.NewLinkedMap(),
  67. onerrorOption: &OnErrorOption{
  68. regex: map[OnErrorProc]*regexp.Regexp{},
  69. },
  70. }
  71. {
  72. // /** ... **/ 之间的内容
  73. qlsindex := reactioneexprs.FindStringSubmatchIndex(mqlstr)
  74. if len(qlsindex) < 4 {
  75. // --{ ... } 之间的内容
  76. qlsindex = reactioneexprs_2.FindStringSubmatchIndex(mqlstr)
  77. if len(qlsindex) < 4 {
  78. // --{ ... } 之间的内容
  79. qlsindex = reactioneexprs_3.FindStringSubmatchIndex(mqlstr)
  80. if len(qlsindex) < 4 {
  81. return
  82. }
  83. }
  84. }
  85. isactionexpr := qlsindex[2]
  86. ieactionexpr := qlsindex[3]
  87. if isactionexpr < 0 || ieactionexpr < 0 {
  88. return
  89. }
  90. sactionexpr := mqlstr[isactionexpr:ieactionexpr]
  91. runestart := len([]rune(mqlstr[:isactionexpr]))
  92. // for i, q := range qls {
  93. // if i == len(qls)-1 {
  94. // break
  95. // }
  96. // if i > 0 {
  97. // isactionexpr += 2
  98. // }
  99. // isactionexpr += len(q)
  100. // }
  101. // ckfs := strings.TrimSpace(qls[len(qls)-1])
  102. // if len(ckfs) > 2 && ckfs[0] == '{' && ckfs[len(ckfs)-1] == '}' {
  103. // sactionexpr = ckfs[1 : len(ckfs)-1]
  104. // }
  105. if sactionexpr != "" {
  106. stmts, mp, e := parser.ParseMQL(sactionexpr)
  107. if e != nil {
  108. // logger.Error(e)
  109. return nil, e
  110. }
  111. ab := &ActionBuilder{
  112. runestart: runestart,
  113. actionid: 0,
  114. staticactions: staticactions,
  115. actionexprs: actionexprs,
  116. }
  117. for _, stmt := range stmts {
  118. fields := stmt.(*qstru.SelectStatement).Fields
  119. for _, sf := range fields {
  120. sp := mp.SourcePosition()[sf]
  121. if len(sp) != 1 {
  122. panic("len(sp) != 1,一个字段有且只有一个确定位置")
  123. }
  124. source := mp.Source(sp[0])
  125. ab.NewAction(sf.Expr, sp[0], source)
  126. }
  127. }
  128. }
  129. }
  130. return
  131. }
  132. type ActionBuilder struct {
  133. runestart int
  134. actionid int
  135. staticactions map[int]*Action
  136. actionexprs *ActionExprs
  137. }
  138. func (ab *ActionBuilder) NewAction(sfe *qstru.Expr, sp parser.SourcePosition, source string) *Action {
  139. ab.actionid++
  140. if act := ab.staticactions[ab.actionid]; act != nil {
  141. ab.actionexprs.Put(act.Name, act)
  142. return act
  143. }
  144. action := fmt.Sprint(sfe.FuncName)
  145. fargs := sfe.FuncArgs
  146. sargs := []interface{}{}
  147. for _, farg := range fargs {
  148. if farg == nil {
  149. sargs = append(sargs, nil)
  150. } else if farg.Const != nil {
  151. sargs = append(sargs, farg.Const)
  152. } else if farg.VarName != "" {
  153. sargs = append(sargs, farg)
  154. } else if farg.FuncName != nil && farg.FuncArgs != nil {
  155. sargs = append(sargs, ab.NewAction(farg, parser.SourcePosition{}, ""))
  156. } else {
  157. sargs = append(sargs, nil)
  158. }
  159. }
  160. act := &Action{
  161. ID: ab.actionid,
  162. Name: action,
  163. Args: sargs,
  164. SourceFromIndex: sp.Begin + ab.runestart,
  165. SourceToIndex: sp.End + ab.runestart,
  166. SourceText: source,
  167. }
  168. ab.actionexprs.Put(act.Name, act)
  169. // 记录在mql中的位置,避免固定数值设定被动态改变
  170. if STATIC_ACTION_NAMES.Has(action) {
  171. ab.staticactions[ab.actionid] = act
  172. }
  173. if FOLLOWTHROUGH_ACTION_NAMES.Has(act.Name) {
  174. ab.actionexprs.followThroughActions = append(ab.actionexprs.followThroughActions, act)
  175. }
  176. if ONERROR_ACTION_NAMES.Has(action) {
  177. switch action {
  178. case "noerrinfo":
  179. ab.actionexprs.onerrorOption.noerrorinfo = true
  180. case "onerror":
  181. if len(act.Args) > 0 {
  182. oep := OnErrorHalt
  183. switch cast.ToString(act.Args[0]) {
  184. case "break":
  185. oep = OnErrorBreak
  186. case "continue", "ignore":
  187. oep = OnErrorIgnore
  188. case "pass", "expect", "need", "must", "sure":
  189. oep = OnErrorPass
  190. case "fail", "halt", "exit":
  191. default:
  192. }
  193. msgs := []string{}
  194. for _, arg := range act.Args[1:] {
  195. sarg := cast.ToString(arg)
  196. if sarg != "" {
  197. msgs = append(msgs, sarg)
  198. }
  199. }
  200. msg := strings.Join(msgs, "|")
  201. msg = regexp.MustCompile("([^\\|\\*0-9A-Z_a-z\u0100-\uffff])").ReplaceAllString(msg, "\\$1")
  202. msg = strings.ReplaceAll(msg, "*", ".*")
  203. var regx *regexp.Regexp
  204. if msg == "" {
  205. regx = matchall
  206. } else {
  207. msg = "(?:" + strings.ReplaceAll(msg, "|", ")|(?:") + ")"
  208. regx = ab.actionexprs.onerrorOption.regex[oep]
  209. if regx != nil && regx != matchall {
  210. msg += regx.String() + "|" + msg
  211. }
  212. regx = regexp.MustCompile(msg)
  213. }
  214. ab.actionexprs.onerrorOption.regex[oep] = regx
  215. }
  216. }
  217. }
  218. return act
  219. }
  220. var ONERROR_ACTION_NAMES = strset.New("onerror", "noerrinfo")
  221. var STATIC_ACTION_NAMES = strset.New("skip", "retry", "scope", "beforerun", "set", "timeout", "qmeta", "loop", "loopfrom", "loopstep", "parallel", "fork", "wait", "sleep")
  222. var DYNAMIC_ACTION_NAMES = strset.New("subscribe", "unsubscribe", "params")
  223. var FOLLOWTHROUGH_ACTION_NAMES = strset.New("schema", "metainfo", "output", "outputcount", "count", "match", "matchcount", "equal")
  224. var time_layouts = []string{"2006-01-02 15:04:05", "2006-01-02", "2006-01-02 15:04:05.000", "2006-01-02 15:04:05.000000", "2006-01-02 15:04:05.000000000", "2006-01-02 15:04"}
  225. func ToStaticActions(actionexprs *ActionExprs) (staticActions *StaticActions, err error) {
  226. staticActions = actionexprs.staticActions
  227. if staticActions == nil {
  228. staticActions = &StaticActions{}
  229. actionexprs.staticActions = staticActions
  230. // skip 动作与位置顺序无关
  231. skip := actionexprs.Has("skip")
  232. actionexprs.Delete("skip")
  233. staticActions.Skip = skip
  234. if skip {
  235. // 跳过本条语句,不需要后续处理了
  236. return
  237. }
  238. // retry 动作与位置顺序无关
  239. retry_v, has_retry := actionexprs.Get("retry")
  240. actionexprs.Delete("retry")
  241. if has_retry {
  242. retry_args := retry_v.(*Action).Args
  243. retry_limit := 0
  244. if len(retry_args) > 0 {
  245. retry_limit = cast.ToInt(retry_args[0])
  246. if retry_limit < 0 {
  247. retry_limit = 0
  248. }
  249. }
  250. staticActions.RetryLimit = retry_limit
  251. }
  252. // scope 动作与位置顺序无关
  253. op_scope_v, has_scope := actionexprs.Get("scope")
  254. actionexprs.Delete("scope")
  255. if has_scope {
  256. op_scope_args := op_scope_v.(*Action).Args
  257. op_scope := "mql"
  258. if len(op_scope_args) > 0 {
  259. op_scope = cast.ToString(op_scope_args[0])
  260. if op_scope != "file" && op_scope != "dir" && op_scope != "top" {
  261. op_scope = "mql"
  262. }
  263. }
  264. staticActions.Scope = op_scope
  265. }
  266. // beforerun 动作与位置顺序无关
  267. op_beforerun_v, has_beforerun := actionexprs.Get("beforerun")
  268. actionexprs.Delete("beforerun")
  269. if has_beforerun {
  270. op_beforerun_args := op_beforerun_v.(*Action).Args
  271. for _, f := range op_beforerun_args {
  272. switch sf := f.(type) {
  273. case *Action:
  274. staticActions.BeforeRunActions = append(staticActions.BeforeRunActions, sf)
  275. }
  276. }
  277. }
  278. // set 动作与位置顺序无关
  279. op_set_v, has_set := actionexprs.Get("set")
  280. actionexprs.Delete("set")
  281. if has_set {
  282. op_set_args := op_set_v.(*Action).Args
  283. vars := map[string]interface{}{}
  284. for i := 0; i < len(op_set_args)-1; i += 2 {
  285. k := cast.ToString(op_set_args[i])
  286. v := op_set_args[i+1]
  287. switch av := v.(type) {
  288. case *qstru.Expr:
  289. ak := av.VarName
  290. if ak == "now" {
  291. v = time.Now()
  292. }
  293. case string:
  294. for _, tf := range time_layouts {
  295. t, e := time.Parse(tf, av)
  296. if e == nil {
  297. v = t
  298. break
  299. }
  300. }
  301. }
  302. vars[k] = v
  303. }
  304. staticActions.Variables = vars
  305. }
  306. // timeout 动作与位置顺序无关
  307. op_timeout_v, has_timeout := actionexprs.Get("timeout")
  308. actionexprs.Delete("timeout")
  309. if has_timeout {
  310. op_timeout_args := op_timeout_v.(*Action).Args
  311. var op_timeout time.Duration
  312. if len(op_timeout_args) > 0 {
  313. op_timeout = mfmt.ParseDuration(strings.TrimSpace(cast.ToString(op_timeout_args[0])))
  314. }
  315. staticActions.Timeout = &op_timeout
  316. }
  317. // qmeta 动作与位置顺序无关
  318. op_qmeta_v, has_qmeta := actionexprs.Get("qmeta")
  319. actionexprs.Delete("qmeta")
  320. if has_qmeta {
  321. op_qmeta_args := op_qmeta_v.(*Action).Args
  322. var op_qmeta odb.QueryMeta
  323. if len(op_qmeta_args) > 0 {
  324. qm := odb.QueryMeta{}
  325. switch qmeta_arg := op_qmeta_args[0].(type) {
  326. case map[string]any:
  327. bs, e := json.Marshal(qmeta_arg)
  328. if e != nil {
  329. return nil, e
  330. }
  331. e = json.Unmarshal(bs, &qm)
  332. if e != nil {
  333. return nil, e
  334. }
  335. default:
  336. s := strings.TrimSpace(cast.ToString(op_qmeta_args[0]))
  337. e := json.Unmarshal([]byte(s), &qm)
  338. if e != nil {
  339. return nil, e
  340. }
  341. }
  342. op_qmeta = qm
  343. }
  344. staticActions.Qmeta = op_qmeta
  345. }
  346. // loop 动作与位置顺序无关
  347. op_loop_v, has_loop := actionexprs.Get("loop")
  348. actionexprs.Delete("loop")
  349. if has_loop {
  350. op_loop_args := op_loop_v.(*Action).Args
  351. if len(op_loop_args) > 0 {
  352. loopcount := cast.ToInt(op_loop_args[0])
  353. if loopcount == 0 {
  354. loopcount = 1
  355. } else if loopcount < 0 {
  356. loopcount = math.MaxInt
  357. }
  358. staticActions.LoopCount = &loopcount
  359. }
  360. }
  361. // loopfrom 动作与位置顺序无关
  362. op_loopfrom_v, has_loopfrom := actionexprs.Get("loopfrom")
  363. actionexprs.Delete("loopfrom")
  364. if has_loopfrom {
  365. op_loopfrom_args := op_loopfrom_v.(*Action).Args
  366. if len(op_loopfrom_args) > 0 {
  367. loopfrom := cast.ToInt(op_loopfrom_args[0])
  368. staticActions.LoopFrom = &loopfrom
  369. }
  370. }
  371. // loopstep 动作与位置顺序无关
  372. op_loopstep_v, has_loopstep := actionexprs.Get("loopstep")
  373. actionexprs.Delete("loopstep")
  374. if has_loopstep {
  375. op_loopstep_args := op_loopstep_v.(*Action).Args
  376. if len(op_loopstep_args) > 0 {
  377. loopstep := cast.ToInt(op_loopstep_args[0])
  378. staticActions.LoopStep = &loopstep
  379. }
  380. }
  381. // parallel 动作与位置顺序无关
  382. op_parallel_v, has_parallel := actionexprs.Get("parallel")
  383. actionexprs.Delete("parallel")
  384. if has_parallel {
  385. op_parallel_args := op_parallel_v.(*Action).Args
  386. parallelcount := 0
  387. if len(op_parallel_args) > 0 {
  388. parallelcount = cast.ToInt(op_parallel_args[0])
  389. }
  390. staticActions.ParallelCount = &parallelcount
  391. }
  392. // fork 动作与位置顺序无关
  393. op_fork_v, has_fork := actionexprs.Get("fork")
  394. actionexprs.Delete("fork")
  395. if has_fork {
  396. op_fork_args := op_fork_v.(*Action).Args
  397. fork_name := ""
  398. if len(op_fork_args) > 0 {
  399. fork_name = cast.ToString(op_fork_args[0])
  400. }
  401. staticActions.ForkName = &fork_name
  402. }
  403. // wait 动作与位置顺序无关
  404. op_wait_v, has_wait := actionexprs.Get("wait")
  405. actionexprs.Delete("wait")
  406. if has_wait {
  407. op_wait_args := op_wait_v.(*Action).Args
  408. wait_name := ""
  409. if len(op_wait_args) > 0 {
  410. wait_name = cast.ToString(op_wait_args[0])
  411. }
  412. staticActions.WaitName = &wait_name
  413. }
  414. // sleep 动作与位置顺序无关
  415. op_sleep_v, has_sleep := actionexprs.Get("sleep")
  416. actionexprs.Delete("sleep")
  417. if has_sleep {
  418. op_sleep_args := op_sleep_v.(*Action).Args
  419. var op_sleep_time time.Duration
  420. if len(op_sleep_args) > 0 {
  421. op_sleep_time = mfmt.ParseDuration(strings.TrimSpace(cast.ToString(op_sleep_args[0])))
  422. }
  423. staticActions.SleepTime = &op_sleep_time
  424. }
  425. }
  426. return
  427. }
  428. func (actionexprs *ActionExprs) DynamicActions() (dynamicActions *DynamicActions) {
  429. dynamicActions = actionexprs.dynamicActions
  430. if actionexprs.dynamicActions == nil {
  431. dynamicActions = &DynamicActions{}
  432. actionexprs.dynamicActions = dynamicActions
  433. // subscribe 动作与位置顺序无关
  434. op_subscribe_args := []string{}
  435. op_subscribe_v, has_subscribe := actionexprs.Get("subscribe")
  436. actionexprs.Delete("subscribe")
  437. if has_subscribe {
  438. args := op_subscribe_v.(*Action).Args
  439. for _, argv := range args {
  440. switch arg := argv.(type) {
  441. case map[string]interface{}:
  442. bs, e := json.Marshal(arg)
  443. if e == nil {
  444. op_subscribe_args = append(op_subscribe_args, string(bs))
  445. }
  446. default:
  447. op_subscribe_args = append(op_subscribe_args, cast.ToString(arg))
  448. }
  449. }
  450. if len(op_subscribe_args) > 0 {
  451. subscribe(op_subscribe_args...)
  452. }
  453. }
  454. dynamicActions.SubscribeArgs = op_subscribe_args
  455. // unsubscribe 动作与位置顺序无关
  456. op_unsubscribe_args := []string{}
  457. op_unsubscribe_v, has_unsubscribe := actionexprs.Get("unsubscribe")
  458. actionexprs.Delete("unsubscribe")
  459. if has_unsubscribe {
  460. op_unsubscribe_args = cast.ToStringSlice(op_unsubscribe_v.(*Action).Args)
  461. if len(op_unsubscribe_args) > 0 {
  462. unsubscribe(op_unsubscribe_args...)
  463. }
  464. }
  465. dynamicActions.UnsubscribeArgs = op_unsubscribe_args
  466. // params 动作与位置顺序无关
  467. op_params_str := ""
  468. op_params_v, has_params := actionexprs.Get("params")
  469. actionexprs.Delete("params")
  470. if has_params {
  471. op_params_args := op_params_v.(*Action).Args
  472. bs, e := json.Marshal(op_params_args)
  473. if e == nil {
  474. op_params_str = string(bs)
  475. } else {
  476. op_params_str = strings.TrimSpace(strings.Join(cast.ToStringSlice(op_params_args), ","))
  477. if len(op_params_str) == 0 || op_params_str[0] != '[' || op_params_str[len(op_params_str)-1] != ']' {
  478. op_params_str = "[" + op_params_str + "]"
  479. }
  480. }
  481. }
  482. dynamicActions.Params = op_params_str
  483. }
  484. return
  485. }
  486. // 其它后续动作
  487. func (actionexprs *ActionExprs) FollowThroughActions() []*Action {
  488. if actionexprs.followThroughActions == nil {
  489. actionexprs.Fetch(func(key, value interface{}) bool {
  490. name := key.(string)
  491. action := value.(*Action)
  492. if FOLLOWTHROUGH_ACTION_NAMES.Has(name) {
  493. actionexprs.followThroughActions = append(actionexprs.followThroughActions, action)
  494. }
  495. return true
  496. })
  497. }
  498. return actionexprs.followThroughActions
  499. }