testdw.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package main
  2. import (
  3. //"fmt"
  4. //"time"
  5. "gitee.com/wecisecode/util/logger"
  6. . "git.wecise.com/wecise/odbserver/odb"
  7. )
  8. // CGO_ENABLED=1 go run --ldflags '-linkmode external -extldflags "-static"' testdw.go
  9. // #cgo LDFLAGS: -L/opt/odbserver/sqlite -lsqlite -ldl
  10. // #include "/opt/odbserver/sqlite/sqlite.h"
  11. // #include <stdint.h>
  12. // #include <stdlib.h>
  13. // extern int64_t uhaha_seed;
  14. // extern int64_t uhaha_ts;
  15. // void uhaha_begin_reader();
  16. // void uhaha_end_reader();
  17. func main() {
  18. //options := map[string]interface{} {"cache":odb.CacheAll}
  19. //g,err:= odb.New( options )
  20. //option := &StoreOption{Cache:CacheAll}
  21. option := &Option{ Cache:CacheAll, Keyspace:"matrix", DisableInitialHostLookup:true, DisableTask: true, DisableNotify: true }
  22. g,err:= test.NewG(option)
  23. if err != nil {
  24. logger.Error(err.Error())
  25. }else{
  26. defer g.Close()
  27. }
  28. //logger.SetRollingDaily("C:/test/zkcron/src/test", "test.log")
  29. logger.SetConsole(true)
  30. //logger.SetLevel(logger.DEBUG)
  31. //------------------------------------------
  32. //insert into datawindow(log_insert, '/matrix/test/bucketlog', 'bklog', 'file1.log') values( '{"id": "linux:wecise", "msg": "1-1--DEBUG [CompactionExecutor:584] 2020-07-25 00:49:59,666 TimeWindowCompactionStrategy.java:124 - TWCS skipping check for fully expired SSTables"}' )
  33. datawindow(log_insert).insert( "/matrix/test/bucketlog", "bklog", "file1.log", "linux:wecise", "msg": "1-1--DEBUG [CompactionExecutor:584] 2020-07-25 00:49:59,666 TimeWindowCompactionStrategy.java:124 - TWCS skipping check for fully expired SSTables" )
  34. // put(key, value) return => last_timestamp, total_size
  35. // get(key) return nano_timestamp, value
  36. // delete(key , nano_timestamp)
  37. // clear( size ) => 前几条, 或全部
  38. // https://github.com/hoisie/mustache
  39. // $size ,$timestamp
  40. //------------------------------------------
  41. rst,meta,err := g.Query(`
  42. CREATE DataWindow log_insert <$class, $bucket, $file>
  43. INPUT class, bucket, file, id, msg
  44. SCHEDULE AFTER INSERT
  45. COMMENT 'batch insert log'
  46. TIMEOUT 15 SECOND
  47. SIZE 1000
  48. WHEN fullsize or timeout THEN
  49. MQL `insert into {{$class}} (id, {{$bucket}}[ {{$file}} ]) values (?, ?)` , $id , join( msg, "\n")
  50. clear $size
  51. END
  52. `)
  53. rst,meta,err := g.Query(`
  54. CREATE DataWindow auto_closed <$class>
  55. INPUT class, status, eventid
  56. COMMENT 'auto close event'
  57. SCHEDULE BEFORE INSERT
  58. SLIDING 5 MINUTE
  59. WHEN status = 'open' and eventid = $eventid AS $serial THEN
  60. MQL `update {{class}} SET status='closed' WHERE id = ?` $eventid
  61. CANCEL
  62. DELETE $serial
  63. END
  64. `)
  65. rst,meta,err := g.Query(`
  66. CREATE DataWindow dyn_thred <$EventClass, $Field>
  67. INPUT EventClass, Field, host, param, value
  68. SCHEDULE BEFORE INSERT
  69. SLIDING 1 HOUR
  70. WHEN max(value) < $value THEN
  71. MQL `INSERT INTO {{ $EventClass}} (msg) VALUES (?) ` `in $Host, current $param value is {{ $value }} , too high .`
  72. END
  73. `)
  74. rst,meta,err := g.Query(`
  75. CREATE DataWindow chain <$transid>
  76. INPUT transid
  77. COMMENT 'chain log analysis'
  78. TIMEOUT 1 MINUTE
  79. SIZE 5
  80. WHEN fullsize THEN
  81. FORWARD "queue name" data
  82. clear
  83. END
  84. WHEN timeout THEN
  85. log info 'tmplate'
  86. clear
  87. END
  88. `)
  89. rst,meta,err := g.Query(`
  90. CREATE DataWindow chain <$transid>
  91. INPUT transid
  92. COMMENT 'chain log analysis'
  93. TIMEOUT 1 MINUTE
  94. SIZE 5
  95. WHEN fullsize THEN
  96. FORWARD "queue name" json(data)
  97. clear
  98. END
  99. WHEN timeout THEN
  100. foreach( data, $v => {
  101. $ts = gjson($v, "timestamp")
  102. log info `trans $transid isn’t complete . timestamp $ts`
  103. } )
  104. $timestamp = timestamp
  105. MQL `INSERT INTO /matrix/devops/alerts_stats (msg) VALUES (?)` `trans $transid isn’t complete .`
  106. clear
  107. END
  108. `)
  109. //rst,meta,err := g.Query(`update /matrix/test set name="test8" , test="test" WHERE test2 = "Test22"`)
  110. if err != nil {
  111. logger.Info("Query error:"+err.Error())
  112. logger.Warn(meta["type"])
  113. }else{
  114. logger.Info("rst ===>", rst)
  115. //logger.Error("meta ===>", meta)
  116. if meta["trace"] != nil && len(meta["trace"].([]string)) > 0 {
  117. for _, msg := range meta["trace"].([]string) {
  118. logger.Error(msg)
  119. }
  120. }
  121. logger.Warn(meta["columns"])
  122. }
  123. }