client.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. package main
  2. import (
  3. "fmt"
  4. "os"
  5. "runtime"
  6. "strconv"
  7. "strings"
  8. "time"
  9. "git.wecise.com/wecise/odb-go/odb"
  10. "git.wecise.com/wecise/odbserver/lib/wgwaiter"
  11. "gitee.com/wecisecode/util/logger"
  12. "gitee.com/wecisecode/util/set/strset"
  13. //"git.wecise.com/wecise/odbserver/odb/upsertupsert"
  14. )
  15. var odbcfg *odb.Config
  16. func init() {
  17. odbpath := os.Getenv("ODBPATH")
  18. if odbpath == "" {
  19. odbpath = "47.92.151.165:11001"
  20. }
  21. odbpaths := strset.New(strings.Split(odbpath, ",")...).List()
  22. keyspace := os.Getenv("KEYSPACE")
  23. if keyspace == "" {
  24. keyspace = "ooootest"
  25. }
  26. odbcfg = &odb.Config{
  27. Hosts: odbpaths,
  28. Keyspace: keyspace,
  29. User: fmt.Sprint("测试客户端"),
  30. Pass: "********",
  31. PoolSize: 20,
  32. Debug: true,
  33. }
  34. }
  35. func getClient() (c odb.Client, err error) {
  36. client, err := odb.NewClient(odbcfg)
  37. if err != nil {
  38. return nil, err
  39. }
  40. return client, nil
  41. }
  42. func main() {
  43. time.Sleep(2 * time.Second)
  44. client, err := getClient()
  45. if err != nil {
  46. logger.Error(err)
  47. }
  48. defer func() {
  49. if err := client.Close(); err != nil {
  50. logger.Error(err)
  51. }
  52. }()
  53. wgg := wgwaiter.NewWaiter()
  54. DEFAULT_MQL_CONCURRENT_NUM := runtime.NumCPU()
  55. for kk := 0; kk < 10; kk++ {
  56. for i := kk * DEFAULT_MQL_CONCURRENT_NUM; i < (kk+1)*DEFAULT_MQL_CONCURRENT_NUM; i++ {
  57. wgg.AddOne()
  58. go func(ii int) {
  59. mtype := ii % 10
  60. tag := "mytag"
  61. switch mtype {
  62. case 1:
  63. tag = "mytag1"
  64. case 2:
  65. tag = "mytag2"
  66. case 3:
  67. tag = "mytag3"
  68. case 4:
  69. tag = "mytag4"
  70. case 5:
  71. tag = "mytag5"
  72. }
  73. _, err := client.Query(`insert into /test/malert_status_memdb (identifier, severity, lastoccurrence, maintenance, msg, type, agent, firstoccurrence, omni_class, nodealias, alertgroup, node, manager, originalseverity, summary, tags) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict update firstoccurrence=firstoccurrence, servicenowstate=servicenowstate, emailstatus=emailstatus, tally=tally+1`, "222.129.134.178.1.3.6.1.4.1.2011.5.25.219.2.5.665.25.219.2.5.6"+strconv.Itoa(ii), 5, 1622249715000, 1, "设备:222.129.134.178电源整体功能恢复!", mtype, "huawei mttrapd #"+strconv.Itoa(ii%100), 1622211715023, "300", "222.129.134.178", "huawei_power_func", "222.129.134.178", "MTTrapd Probe", 5, "设备:222.129.134.178电源整体功能恢复!", tag).Do()
  74. if err != nil {
  75. wgg.Fail(err)
  76. } else {
  77. wgg.Done()
  78. }
  79. }(i)
  80. }
  81. if err := wgg.Wait(30 * time.Second); err != nil {
  82. logger.Error(err)
  83. }
  84. logger.Debug("concurrent insert malert_status_memdb test, loop ", kk)
  85. }
  86. for kk := 0; kk < 10; kk++ {
  87. for i := kk * DEFAULT_MQL_CONCURRENT_NUM; i < (kk+1)*DEFAULT_MQL_CONCURRENT_NUM; i++ {
  88. wgg.AddOne()
  89. go func(ii int) {
  90. mtype := ii % 10
  91. tag := "mytag"
  92. switch mtype {
  93. case 1:
  94. tag = "mytag1"
  95. case 2:
  96. tag = "mytag2"
  97. case 3:
  98. tag = "mytag3"
  99. case 4:
  100. tag = "mytag4"
  101. case 5:
  102. tag = "mytag5"
  103. }
  104. _, err := client.Query(`insert into /test/malert_status_cache (identifier, severity, lastoccurrence, maintenance, msg, type, agent, firstoccurrence, omni_class, nodealias, alertgroup, node, manager, originalseverity, summary, tags) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict update firstoccurrence=firstoccurrence, servicenowstate=servicenowstate, emailstatus=emailstatus, tally=tally+1`, "222.129.134.178.1.3.6.1.4.1.2011.5.25.219.2.5.665.25.219.2.5.6"+strconv.Itoa(ii), 5, 1622249715000, nil, "设备:222.129.134.178电源整体功能恢复!", mtype, "huawei mttrapd #"+strconv.Itoa(ii%100), 1622211715023, "300", "222.129.134.178", "huawei_power_func", "222.129.134.178", "MTTrapd Probe", 5, "设备:222.129.134.178电源整体功能恢复!", tag).Do()
  105. if err != nil {
  106. wgg.Fail(err)
  107. } else {
  108. wgg.Done()
  109. }
  110. }(i)
  111. }
  112. if err := wgg.Wait(30 * time.Second); err != nil {
  113. logger.Error(err)
  114. }
  115. logger.Debug("concurrent insert malert_status_cache test, loop ", kk)
  116. }
  117. for kk := 0; kk < 10; kk++ {
  118. for i := kk * DEFAULT_MQL_CONCURRENT_NUM; i < (kk+1)*DEFAULT_MQL_CONCURRENT_NUM; i++ {
  119. wgg.AddOne()
  120. go func(ii int) {
  121. mtype := (ii + 5) % 10
  122. tag := "mytag"
  123. switch mtype {
  124. case 1:
  125. tag = "mytag1"
  126. case 2:
  127. tag = "mytag2"
  128. case 3:
  129. tag = "mytag3"
  130. case 4:
  131. tag = "mytag4"
  132. case 5:
  133. tag = "mytag5"
  134. }
  135. if mtype == 0 {
  136. _, err = client.Query(`insert into /test/malert_status_cache (identifier, severity, lastoccurrence, maintenance, msg, type, agent, firstoccurrence, omni_class, nodealias, alertgroup, node, manager, originalseverity, summary, tags) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict update firstoccurrence=firstoccurrence, servicenowstate=servicenowstate, emailstatus=emailstatus, tally=tally+1`, "222.129.134.178.1.3.6.1.4.1.2011.5.25.219.2.5.665.25.219.2.5.6"+strconv.Itoa(ii+100000), 5, 1622249715000, nil, "设备:222.129.134.178电源整体功能恢复!", nil, "huawei mttrapd #"+strconv.Itoa(ii%100), 1622211715023, "300", "222.129.134.178", "huawei_power_func", "222.129.134.178", "MTTrapd Probe", 5, "设备:222.129.134.178电源整体功能恢复!", tag).Do()
  137. } else {
  138. _, err = client.Query(`insert into /test/malert_status_cache (identifier, severity, lastoccurrence, maintenance, msg, type, agent, firstoccurrence, omni_class, nodealias, alertgroup, node, manager, originalseverity, summary, tags) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict update firstoccurrence=firstoccurrence, servicenowstate=servicenowstate, emailstatus=emailstatus, tally=tally+1`, "222.129.134.178.1.3.6.1.4.1.2011.5.25.219.2.5.665.25.219.2.5.6"+strconv.Itoa(ii+100000), 5, 1622249715000, nil, "设备:222.129.134.178电源整体功能恢复!", mtype, "huawei mttrapd #"+strconv.Itoa(ii%100), 1622211715023, "300", "222.129.134.178", "huawei_power_func", "222.129.134.178", "MTTrapd Probe", 5, "设备:222.129.134.178电源整体功能恢复!", tag).Do()
  139. }
  140. if err != nil {
  141. wgg.Fail(err)
  142. } else {
  143. wgg.Done()
  144. }
  145. }(i)
  146. }
  147. if err := wgg.Wait(30 * time.Second); err != nil {
  148. logger.Error(err)
  149. }
  150. logger.Debug("concurrent insert malert_status_cache test, loop ", kk)
  151. }
  152. for kk := 0; kk < 10; kk++ {
  153. for i := kk * DEFAULT_MQL_CONCURRENT_NUM; i < (kk+1)*DEFAULT_MQL_CONCURRENT_NUM; i++ {
  154. wgg.AddOne()
  155. go func(ii int) {
  156. mtype := (ii + 7) % 10
  157. tag := "mytag"
  158. switch mtype {
  159. case 1:
  160. tag = "mytag1"
  161. case 2:
  162. tag = "mytag2"
  163. case 3:
  164. tag = "mytag3"
  165. case 4:
  166. tag = "mytag4"
  167. case 5:
  168. tag = "mytag5"
  169. }
  170. if mtype == 0 {
  171. _, err = client.Query(`insert into /test/malert_status_cache (identifier, severity, lastoccurrence, maintenance, msg, type, agent, firstoccurrence, omni_class, nodealias, alertgroup, node, manager, originalseverity, summary, tags) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict update firstoccurrence=firstoccurrence, servicenowstate=servicenowstate, emailstatus=emailstatus, tally=tally+1`, "222.129.134.178.1.3.6.1.4.1.2011.5.25.219.2.5.665.25.219.2.5.6"+strconv.Itoa(ii+100000), 5, 1622249715000, nil, "设备:222.129.134.178电源整体功能恢复!", nil, "huawei mttrapd #"+strconv.Itoa(ii%100), 1622211715023, "300", "222.129.134.178", "huawei_power_func", "222.129.134.178", "MTTrapd Probe", 5, "设备:222.129.134.178电源整体功能恢复!", tag).Do()
  172. } else {
  173. _, err = client.Query(`insert into /test/malert_status_cache (identifier, severity, lastoccurrence, maintenance, msg, type, agent, firstoccurrence, omni_class, nodealias, alertgroup, node, manager, originalseverity, summary, tags) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict update firstoccurrence=firstoccurrence, servicenowstate=servicenowstate, emailstatus=emailstatus, tally=tally+1`, "222.129.134.178.1.3.6.1.4.1.2011.5.25.219.2.5.665.25.219.2.5.6"+strconv.Itoa(ii+100000), 5, 1622249715000, nil, "设备:222.129.134.178电源整体功能恢复!", mtype, "huawei mttrapd #"+strconv.Itoa(ii%100), 1622211715023, "300", "222.129.134.178", "huawei_power_func", "222.129.134.178", "MTTrapd Probe", 5, "设备:222.129.134.178电源整体功能恢复!", tag).Do()
  174. }
  175. if err != nil {
  176. wgg.Fail(err)
  177. } else {
  178. wgg.Done()
  179. }
  180. }(i)
  181. }
  182. if err := wgg.Wait(30 * time.Second); err != nil {
  183. logger.Error(err)
  184. }
  185. logger.Debug("concurrent insert malert_status_cache test, loop ", kk)
  186. }
  187. }