client_test.go 11 KB


  1. package odbc_test
  2. import (
  3. "fmt"
  4. "regexp"
  5. "strconv"
  6. "sync/atomic"
  7. "testing"
  8. "time"
  9. ccfg "git.wecise.com/wecise/common/matrix/cfg"
  10. clog "git.wecise.com/wecise/common/matrix/logger"
  11. "git.wecise.com/wecise/odb-go/odb"
  12. "git.wecise.com/wecise/odbtest/odbc"
  13. )
  14. var cfg = ccfg.MConfig()
  15. var log = clog.New().WithConfig(cfg, "log")
  16. func TestConcurrent(t *testing.T) {
  17. var n, reqcount, okcount, confirmcount, errcount int32
  18. odb.QueryDoAsync = true
  19. log.Info("QueryDoAsync =", odb.QueryDoAsync)
  20. log.Info("开始测试连接到", odbc.Config().Hosts)
  21. client, err := odbc.NewClient()
  22. if err != nil {
  23. log.Error(fmt.Sprint("connection error:", err, atomic.AddInt32(&errcount, 1)))
  24. return
  25. }
  26. defer func() {
  27. _ = client.Close()
  28. log.Info("connection closed")
  29. }()
  30. classname := "/oktest/alert_status"
  31. N := cfg.GetInt("N", 0)
  32. if N != 0 {
  33. classname += "_" + strconv.Itoa(N)
  34. }
  35. log.Info("开启事件订阅")
  36. odbc.SubscribeDataChangeTest(client)
  37. log.Info("开始创建相关类", classname)
  38. cns := regexp.MustCompile("^/([^/]+)/([^/]+)").FindAllStringSubmatch(classname, -1)
  39. rootclass := "/" + cns[0][1]
  40. aliasname := cns[0][2] + "_" + cns[0][1]
  41. _, err = client.Query(`
  42. create class if not exists ` + rootclass + `();
  43. create class if not exists ` + classname + ` (
  44. acknowledged int "acknowledged",
  45. msg varchar "msg",
  46. agent varchar "AAgent",
  47. alertgroup varchar "alertgroup",
  48. alertkey varchar "alertkey",
  49. bsm_classname varchar "bsm_classname",
  50. bsm_identity varchar "bsm_identity",
  51. bsm_subidentity varchar "bsm_subidentity",
  52. businessimpact int "businessimpact",
  53. causetype int "causetype",
  54. ciid varchar "ciid",
  55. cmdbenrichstatus int "cmdbenrichstatus",
  56. corrscore int "corrscore",
  57. customer varchar "customer",
  58. domaingroup varchar "domaingroup",
  59. domainsubgroup varchar "domainsubgroup",
  60. duration int "duration",
  61. emailstatus int "emailstatus",
  62. eventid varchar "eventid",
  63. eventtype enum {
  64. "10":["主机告警"],
  65. "11":["数据库告警"],
  66. "12":["中间件类告警"],
  67. "13":["应用类告警"],
  68. "14":["硬件类告警"],
  69. "99":["其他类告警"]} "事件类型",
  70. expiretime int "expiretime",
  71. extendedattr varchar "extendedattr",
  72. firstoccurrence timestamp "firstoccurrence",
  73. flash int "flash",
  74. grade int "grade",
  75. identifier varchar "identifier",
  76. internallast timestamp "internallast",
  77. lastoccurrence timestamp "lastoccurrence",
  78. localnodealias varchar "localnodealias",
  79. localobjrelate int "localobjrelate",
  80. localpriobj varchar "localpriobj",
  81. localrootobj varchar "localrootobj",
  82. localsecobj varchar "localsecobj",
  83. localtertobj varchar "localtertobj",
  84. location varchar "location",
  85. logticket int "logticket",
  86. maintenance enum {
  87. "0":["Unknown", "未知"],
  88. "1":["true", "维护期内"],
  89. "2":["false", "维护期外"]} "级别",
  90. manager varchar "manager",
  91. market varchar "market",
  92. netype varchar "netype",
  93. nmoscausetype int "nmoscausetype",
  94. nmosdomainname varchar "nmosdomainname",
  95. nmosentityid int "nmosentityid",
  96. nmoseventmap varchar "nmoseventmap",
  97. nmosmanagedstatus int "nmosmanagedstatus",
  98. nmosobjinst int "nmosobjinst",
  99. nmosserial varchar "nmosserial",
  100. node varchar "node",
  101. nodealias varchar "nodealias",
  102. nodename varchar "nodename",
  103. objectfullname varchar "objectfullname",
  104. oldrow int "oldrow",
  105. omni_class varchar "class",
  106. omni_serial int "serial",
  107. originalseverity int "originalseverity",
  108. ownergid int "ownergid",
  109. owneruid int "owneruid",
  110. physicalcard varchar "physicalcard",
  111. physicalport int "physicalport",
  112. physicalslot int "physicalslot",
  113. platform varchar "platform",
  114. poll int "poll",
  115. probesubsecondid int "probesubsecondid",
  116. processreq int "processreq",
  117. region varchar "region",
  118. remotenodealias varchar "remotenodealias",
  119. remoteobjrelate int "remoteobjrelate",
  120. remotepriobj varchar "remotepriobj",
  121. remoterootobj varchar "remoterootobj",
  122. remotesecobj varchar "remotesecobj",
  123. remotetertobj varchar "remotetertobj",
  124. servername varchar "servername",
  125. serverserial int "serverserial",
  126. service varchar "service",
  127. servicenowerrorcode int "servicenowerrorcode",
  128. servicenowstate int "servicenowstate",
  129. servicenowsysid varchar "servicenowsysid",
  130. severity enum {
  131. "0":["Unknown", "未知"],
  132. "1":["Trace", "追踪"],
  133. "2":["Debug", "调试"],
  134. "3":["Info", "消息"],
  135. "4":["Warn", "警告"],
  136. "5":["Error", "错误"],
  137. "6":["Fatal", "致命错误"]} "级别",
  138. siteid varchar "siteid",
  139. statechange timestamp "statechange",
  140. summary varchar "summary",
  141. suppressescl int "suppressescl",
  142. tally int "tally",
  143. tasklist int "tasklist",
  144. ticketstatus varchar "ticketstatus",
  145. ttnumber varchar "ttnumber",
  146. ttstate int "ttstate",
  147. ttuser varchar "ttuser",
  148. type enum {
  149. "0":["Type not set"],
  150. "1":["problem", "问题"],
  151. "2":["resolution", "恢复"],
  152. "7":["ISM new alarm", "问题"],
  153. "8":["ISM old alarm", "恢复"]} "类型",
  154. url varchar "url",
  155. x733corrnotif varchar "x733corrnotif",
  156. x733eventtype int "x733eventtype",
  157. x733probablecause int "x733probablecause",
  158. x733specificprob varchar "x733specificprob",
  159. keys( identifier ) ,
  160. index( acknowledged,alertgroup,alertkey,bsm_classname,bsm_identity,bsm_subidentity,businessimpact,causetype,ciid,cmdbenrichstatus,corrscore,customer,domaingroup,domainsubgroup,emailstatus,eventid,eventtype,expiretime,extendedattr,firstoccurrence,flash,grade,identifier,internallast,lastoccurrence,localnodealias,localobjrelate,localpriobj,localrootobj,localsecobj,localtertobj,location,logticket,manager,market,netype,nmoscausetype,nmosdomainname,nmosentityid,nmoseventmap,nmosmanagedstatus,nmosobjinst,nmosserial,node,nodealias,nodename,objectfullname,oldrow,omni_serial,originalseverity,ownergid,owneruid,physicalcard,physicalport,physicalslot,platform,poll,probesubsecondid,processreq,region,remotenodealias,remoteobjrelate,remotepriobj,remoterootobj,remotesecobj,remotetertobj,servername,serverserial,service,servicenowerrorcode,servicenowstate,servicenowsysid,severity,siteid,statechange,summary,suppressescl,tally,tasklist,ticketstatus,ttnumber,ttstate,ttuser,type,url,x733corrnotif,x733eventtype,x733probablecause,x733specificprob)
  161. )with ttl=366 day , autosearch=true , version=true , key=manu, alias='` + aliasname + `' , nickname='` + aliasname + `' ;
  162. `).Do()
  163. if err != nil {
  164. log.Error(fmt.Sprint("query count error:", err, atomic.AddInt32(&errcount, 1)))
  165. }
  166. log.Info("检查历史数据")
  167. result, err := client.Query("select count(id) as count FROM " + classname + " LIMIT -1").Do()
  168. if err != nil {
  169. log.Error(fmt.Sprint("query count error:", err, atomic.AddInt32(&errcount, 1)))
  170. } else {
  171. log.Info(fmt.Sprintf("query count return: %#v", result.Data[0]["count"]))
  172. }
  173. log.Info("清除历史数据")
  174. _, err = client.Query("delete FROM " + classname + "").Do()
  175. if err != nil {
  176. log.Error(fmt.Sprint("delete error:", err, atomic.AddInt32(&errcount, 1)))
  177. } else {
  178. log.Info(fmt.Sprintf("delete ok"))
  179. }
  180. st := time.Now()
  181. go func() {
  182. t := time.NewTicker(1 * time.Second)
  183. for {
  184. select {
  185. case <-t.C:
  186. log.Info(time.Since(st), "插入数据", "req:", atomic.AddInt32(&reqcount, 0), "ok:", atomic.AddInt32(&okcount, 0), "confirm:", atomic.AddInt32(&confirmcount, 0), "err:", atomic.AddInt32(&errcount, 0))
  187. }
  188. }
  189. }()
  190. for i := 0; i < client.Config().PoolSize*1000; i++ {
  191. go func(i int) {
  192. for {
  193. identifier := fmt.Sprint("222.129.134.178.1.3.6.1.4.1.2011.5.25.219.2.5.665.25.219.2.5.6.", i, ".", atomic.AddInt32(&n, 1))
  194. mql := `insert into ` + classname + ` (identifier, severity, serverserial, domaingroup, lastoccurrence, maintenance, type, agent, firstoccurrence, omni_class, nodealias, alertgroup, node, manager, _tokens, originalseverity, summary) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict update firstoccurrence=firstoccurrence, servicenowstate=servicenowstate, emailstatus=emailstatus, tally=tally+1`
  195. params := []interface{}{identifier, 5, 111, "domaingroup", 1622249715000, 1, 2, "huawei mttrapd", 1622211715023, "300", "222.129.134.178", "huawei_power_func", "222.129.134.178", "MTTrapd Probe", map[string]interface{}{"_raw": "{\"1\":\"SNMP Trap Test.see more:http://www.micmiu.com\",\"1_hex\":\"53 4e 4d 50 20 54 72 61 70 20 54 65 73 74 2e 73 65 65 20 6d 6f 72 65 3a 68 74 74 70 3a 2f 2f 77 77 77 2e 6d 69 63 6d 69 75 2e 63 6f 6d\",\"1_raw\":\"SNMP Trap Test.see more:http://www.micmiu.com\",\"1_text\":\"SNMP Trap Test.see more:http://www.micmiu.com\",\"2\":\"1000\",\"2_hex\":\"1000\",\"2_raw\":\"1000\",\"2_text\":\"1000\",\"3\":\".1.3.6.1.4.1.2011.5.25.219.2.5.6\",\"3_hex\":\".1.3.6.1.4.1.2011.5.25.219.2.5.6\",\"3_raw\":\".1.3.6.1.4.1.2011.5.25.219.2.5.6\",\"3_text\":\".1.3.6.1.4.1.2011.5.25.219.2.5.6\",\"4\":\"SNMPTEST\",\"4_hex\":\"53 4e 4d 50 54 45 53 54\",\"4_raw\":\"SNMPTEST\",\"4_text\":\"SNMPTEST\",\"5\":10,\"5_hex\":10,\"5_raw\":10,\"5_text\":10,\"6\":\"Sat May 29 08:55:14 CST 2021\",\"6_hex\":\"53 61 74 20 4d 61 79 20 32 39 20 30 38 3a 35 35 3a 31 34 20 43 53 54 20 32 30 32 31\",\"6_raw\":\"Sat May 29 08:55:14 CST 2021\",\"6_text\":\"Sat May 29 08:55:14 CST 2021\",\"IPaddress\":\"222.129.134.178\",\"Node\":\"222.129.134.178\",\"OID1\":\".1.3.6.1.2.1.1.1.1\",\"OID2\":\".1.3.6.1.2.1.1.3.0\",\"OID3\":\".1.3.6.1.6.3.1.1.4.1.0\",\"OID4\":\".1.3.6.1.4.1.2011.2.15.2.1.2.1.1.1.1\",\"OID5\":\".1.3.6.1.4.1.2011.2.15.2.1.2.1.1.1.2\",\"OID6\":\".1.3.6.1.4.1.2011.2.15.2.1.2.1.1.1.3\",\"PeerAddress\":\"222.129.134.178\",\"PeerIPaddress\":\"222.129.134.178\",\"Protocol\":\"udp\",\"ReceivedPort\":9162,\"ReceivedTime\":0,\"ReqId\":176301626,\"SNMP_Version\":\"2c\",\"UpTime\":\"1000\",\"Uptime\":\"8:16:40.00\",\"community\":\"public\",\"contextEngineID\":\"\",\"enterprise\":\"\",\"generic-trap\":0,\"notify\":\".1.3.6.1.4.1.2011.5.25.219.2.5.6\",\"securityEngineID\":\"1234\",\"securityLevel\":\"authPriv\",\"securityName\":\"user\",\"specific-trap\":0}"}, 5, "设备:222.129.134.178电源整体功能恢复!"}
  196. atomic.AddInt32(&reqcount, 1)
  197. result, err := client.Query(mql, params...).Do()
  198. if err != nil {
  199. log.Error(fmt.Sprint("connection", i, err, atomic.AddInt32(&errcount, 1)))
  200. time.Sleep(1 * time.Second)
  201. } else {
  202. atomic.AddInt32(&okcount, 1)
  203. result, err = client.Query("select id,tally FROM "+classname+" where identifier=?", identifier).Do()
  204. if err != nil {
  205. log.Error(fmt.Sprint("connection", i, err, atomic.AddInt32(&errcount, 1)))
  206. time.Sleep(1 * time.Second)
  207. } else {
  208. atomic.AddInt32(&confirmcount, 1)
  209. log.Debug(fmt.Sprint("connection", i, result))
  210. // time.Sleep(1 * time.Second)
  211. }
  212. }
  213. // time.Sleep(3 * time.Second)
  214. }
  215. }(i)
  216. }
  217. time.Sleep(3600 * time.Second)
  218. log.Info("测试结束")
  219. }