ping.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "os"
  6. "runtime"
  7. "sort"
  8. "strings"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "trial/ping/probing"
  13. "trial/ping/utils"
  14. "git.wecise.com/wecise/common/logger"
  15. "git.wecise.com/wecise/common/matrix/cfg"
  16. "git.wecise.com/wecise/common/matrix/util"
  17. "github.com/scylladb/go-set/strset"
  18. )
// task describes one ping job handed to a worker through Input.workChan.
// The JSON tags define the wire names of the fields.
type task struct {
	// Server is the target passed to probing.NewPinger and reported as "host".
	Server string `json:"server"`
	// Timeout is the overall ping timeout in seconds; when it is <= 0 send
	// falls back to one second per requested ping.
	Timeout int `json:"timeout"`
	// NumberOfPings is the number of echo requests; values <= 0 are treated as 1.
	NumberOfPings int `json:"numberofpings"`
	// PacketInterval is the delay between requests in milliseconds; it is
	// applied only when > 0.
	PacketInterval int `json:"packetinterval"`
	// PacketSize is the payload size handed to the pinger; applied only when > 0.
	PacketSize    int `json:"packetsize"`
	TypeOfService int `json:"typeofservice"` // Not used
	Retries       int `json:"retries"`       // Not used
	// Poll is copied into the result message as "pollInterval".
	Poll                     int    `json:"poll"`
	FailureRetests           int    `json:"failureretests"`           // Not used
	RetestInterval           int    `json:"retestinterval"`           // Not used
	HostNameLookupPreference string `json:"hostnamelookuppreference"` // Not used
	Description              string `json:"description"`              // Not used
	// Rule and PerfRule are not read anywhere in the code visible in this file.
	Rule     string `json:"rule"`
	PerfRule string `json:"perfrule"`
	// TaskTime is copied into the result message as "taskTime".
	TaskTime int64 `json:"tasktime"`
	// Indicator is copied into the result message as "indicator".
	Indicator string `json:"indicator"`
}
// InputConfig holds the tunable settings of an Input.
type InputConfig struct {
	// Poolsize is the number of worker goroutines started by Run and the
	// buffer size of the task channel created by Init.
	Poolsize int `toml:"poolsize"`
	// Domain is the prefix of the "previous_key" attribute built in send.
	Domain string `toml:"domain"`
	// StatInterval is passed to utils.NewDataStat in Run.
	// NOTE(review): the unit is not visible here (main defaults it to 600) — confirm.
	StatInterval int `toml:"stat_interval"`
}
  42. type StatInfo struct {
  43. MinRtt time.Duration
  44. MaxRtt time.Duration
  45. AvgRtt time.Duration
  46. LossCount int
  47. Count int
  48. }
// Input is the ping driver: it owns the target list, the per-target
// statistics and the worker pool plumbing.
type Input struct {
	// Embedded settings; assigned by Init.
	*InputConfig
	// ips is the current target list. It is replaced wholesale by main and
	// by send, without any locking.
	ips []string
	// statinfomutex guards statinfo.
	statinfomutex sync.Mutex
	// statinfo maps a target (task.Server) to its accumulated statistics.
	statinfo map[string]*StatInfo
	// subSize is set to runtime.GOMAXPROCS(0) by Init; it is not read
	// anywhere else in the code visible in this file.
	subSize int
	// workChan carries tasks from main to the workers started by Run.
	workChan chan *task
	// stopChan unblocks Run when a value is received on it.
	stopChan chan bool
	// ds is the data statistics collector created in Run.
	ds *utils.DataStat
}
// mcfg is the configuration handle returned by cfg.MConfig(); main reads
// its settings from it and registers a change callback on it.
var mcfg = cfg.MConfig()
  60. func main() {
  61. input := &Input{}
  62. input.statinfo = map[string]*StatInfo{}
  63. inputcfg := &InputConfig{
  64. Poolsize: 1000,
  65. StatInterval: 600,
  66. }
  67. inputcfg.Poolsize = mcfg.GetInt("poolsize", inputcfg.Poolsize)
  68. fips := func() []string {
  69. xips := mcfg.GetStrings("ip|ping.ip", "")
  70. sips := strset.New()
  71. for _, aips := range xips {
  72. sips.Add(strings.Split(aips, ",")...)
  73. }
  74. bs, _ := util.ReadFile("./hosts.txt")
  75. if len(bs) > 0 {
  76. xips := strings.Split(string(bs), "\n")
  77. for _, aips := range xips {
  78. sips.Add(strings.Split(aips, ",")...)
  79. }
  80. }
  81. return sips.List()
  82. }
  83. go func() {
  84. t := time.NewTicker(5 * time.Second)
  85. for {
  86. <-t.C
  87. input.ips = fips()
  88. }
  89. }()
  90. input.ips = fips()
  91. mcfg.OnChanged(func(cfg cfg.Configure) {
  92. input.ips = fips()
  93. })
  94. input.Init(inputcfg)
  95. go input.Run()
  96. go func() {
  97. // last_count := 0
  98. t := time.NewTicker(1 * time.Second)
  99. for {
  100. select {
  101. case <-t.C:
  102. s := ""
  103. input.statinfomutex.Lock()
  104. ks := []string{}
  105. for k := range input.statinfo {
  106. ks = append(ks, k)
  107. }
  108. sort.Strings(ks)
  109. for i, k := range ks {
  110. v := input.statinfo[k]
  111. s += fmt.Sprintf("%-3d %-20s: %-12s [%12s ~ %-12s] loss %d/%d\n", i, k, v.AvgRtt, v.MinRtt, v.MaxRtt, v.LossCount, v.Count)
  112. }
  113. input.statinfomutex.Unlock()
  114. // logger.Info("统计信息更新:", fmt.Sprint("\n", s))
  115. // logger.Info(util.FormatDuration(time.Since(starttime)), "已经完成", pingcount, "次Ping操作,",
  116. // "平均每秒", (int64(pingcount+1) * int64(time.Second) / int64(time.Since(starttime))), "次",
  117. // "最近一秒", (pingcount - int32(last_count)), "次",
  118. // "缓存", probing.BufferUsedCount(),
  119. // )
  120. // last_count = int(pingcount)
  121. }
  122. }
  123. }()
  124. n := 0
  125. for {
  126. if n >= len(input.ips) {
  127. n = 0
  128. }
  129. ip := input.ips[n]
  130. n++
  131. //
  132. ip = strings.TrimSpace(ip)
  133. if ip != "" {
  134. input.workChan <- &task{
  135. Server: ip,
  136. Timeout: mcfg.GetInt("timeout", 1),
  137. NumberOfPings: mcfg.GetInt("count", 5), // 至少为2,小于2会导致不能正常结束
  138. PacketInterval: mcfg.GetInt("interval", 1000),
  139. PacketSize: mcfg.GetInt("size", 32),
  140. }
  141. }
  142. // time.Sleep(1 * time.Millisecond)
  143. }
  144. }
  145. func (input *Input) Init(config interface{}) error {
  146. input.InputConfig = config.(*InputConfig)
  147. input.workChan = make(chan *task, input.Poolsize)
  148. input.stopChan = make(chan bool)
  149. input.subSize = runtime.GOMAXPROCS(0)
  150. return nil
  151. }
  152. func (input *Input) Run() (err error) {
  153. input.ds = utils.NewDataStat("service", "", input.StatInterval, input.Poolsize*2)
  154. defer input.ds.Close()
  155. go input.ds.Start()
  156. defer close(input.workChan)
  157. var wg sync.WaitGroup
  158. // Start worker
  159. // count := int32(0)
  160. // lpcmutex := sync.Mutex{}
  161. // lastprintcount := time.Now()
  162. for i := 0; i < input.Poolsize; i++ {
  163. go func(n int) {
  164. for t := range input.workChan {
  165. wg.Add(1)
  166. // Run task
  167. if e := input.send(t, n); e != nil {
  168. logger.Error("Send error:", e)
  169. input.ds.Send(1)
  170. } else {
  171. input.ds.Send(0)
  172. }
  173. wg.Done()
  174. // x := atomic.AddInt32(&count, 1)
  175. // lpcmutex.Lock()
  176. // printmsg := time.Since(lastprintcount) >= 1*time.Second
  177. // if printmsg {
  178. // lastprintcount = time.Now()
  179. // }
  180. // lpcmutex.Unlock()
  181. // if printmsg {
  182. // logger.Info("已经完成", x, "个目标的Ping操作")
  183. // }
  184. }
  185. }(i)
  186. }
  187. <-input.stopChan
  188. wg.Wait()
  189. return nil
  190. }
  191. var pingcount int32
  192. var starttime = time.Now()
  193. var printpingcountmutex sync.Mutex
  194. var printpingcounttime = time.Now()
  195. func (input *Input) send(t *task, workerNum int) error {
  196. hostname, _ := os.Hostname()
  197. var (
  198. status = "success"
  199. message = "Pings Complete"
  200. m = map[string]interface{}{
  201. "service": "icmp",
  202. "monitorHost": hostname,
  203. "host": t.Server,
  204. "pollInterval": t.Poll,
  205. "taskTime": t.TaskTime,
  206. "indicator": t.Indicator,
  207. }
  208. )
  209. pinger, err := probing.NewPinger(t.Server)
  210. if err != nil {
  211. status = "failure"
  212. message = err.Error()
  213. sips := strset.New(input.ips...)
  214. sips.Remove(t.Server)
  215. input.ips = sips.List()
  216. logger.Error(err)
  217. } else {
  218. /*
  219. https://stackoverflow.com/questions/41423637/go-ping-library-for-unprivileged-icmp-ping-in-golang/41425527#41425527
  220. This library attempts to send an "unprivileged" ping via UDP. On linux, this must be enabled by setting
  221. sudo sysctl -w net.ipv4.ping_group_range="0 2147483647"
  222. If you do not wish to do this, you can set pinger.SetPrivileged(true) and use setcap to allow your binary using go-ping to bind to raw sockets (or just run as super-user):
  223. setcap cap_net_raw=+ep /bin/goping-binary
  224. getcap /bin/goping-binary to validate
  225. */
  226. pinger.SetPrivileged(false)
  227. pinger.TTL = 64
  228. if t.NumberOfPings > 0 {
  229. pinger.Count = t.NumberOfPings
  230. } else {
  231. pinger.Count = 1
  232. }
  233. if t.PacketInterval > 0 {
  234. pinger.Interval = time.Millisecond * time.Duration(t.PacketInterval)
  235. }
  236. if t.PacketSize > 0 {
  237. pinger.Size = t.PacketSize
  238. }
  239. if t.Timeout > 0 {
  240. pinger.Timeout = time.Second * time.Duration(t.Timeout)
  241. } else {
  242. pinger.Timeout = time.Second * time.Duration(pinger.Count)
  243. }
  244. var (
  245. consecutiveFailures int
  246. )
  247. var recvMutex sync.Mutex
  248. var recvList = map[int]int{}
  249. pinger.OnSend = func(pkt *probing.Packet) {
  250. recvMutex.Lock()
  251. recvList[pkt.Seq]++
  252. recvMutex.Unlock()
  253. input.statinfomutex.Lock()
  254. si := input.statinfo[t.Server]
  255. if si == nil {
  256. si = &StatInfo{}
  257. input.statinfo[t.Server] = si
  258. }
  259. si.Count++
  260. input.statinfomutex.Unlock()
  261. }
  262. pingMsg := fmt.Sprintf("\nWoker %d PING %s (%s):\n", workerNum, pinger.Addr(), pinger.IPAddr())
  263. pinger.OnRecv = func(pkt *probing.Packet) {
  264. recvMutex.Lock()
  265. recvList[pkt.Seq]++
  266. recvMutex.Unlock()
  267. s := fmt.Sprintf("%s %d bytes from %s: icmp_seq=%d time=%v\n",
  268. time.Now().Format("15:04:05.000"), pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt)
  269. pingMsg += s
  270. // fmt.Print(s)
  271. // printpingcount
  272. atomic.AddInt32(&pingcount, 1)
  273. }
  274. pinger.OnDuplicateRecv = func(pkt *probing.Packet) {
  275. recvMutex.Lock()
  276. recvList[pkt.Seq]++
  277. recvMutex.Unlock()
  278. s := fmt.Sprintf("%s %d bytes from %s: icmp_seq=%d time=%v (DUP!)\n",
  279. time.Now().Format("15:04:05.000"), pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt)
  280. pingMsg += s
  281. // fmt.Print(s)
  282. }
  283. pinger.OnFinish = func(stats *probing.Statistics) {
  284. m["numberPackets"] = stats.PacketsSent
  285. m["averageRTT"] = stats.AvgRtt.Milliseconds()
  286. if stats.PacketsSent == 0 {
  287. m["respondPercent"] = 1
  288. } else {
  289. m["respondPercent"] = stats.PacketsRecv / stats.PacketsSent * 100
  290. }
  291. //m["failureRetests"] = ?
  292. s := fmt.Sprintf("--- %s ping statistics ---\n", stats.Addr)
  293. s += fmt.Sprintf("%d packets transmitted, %d packets received, %v%% packet loss\n",
  294. stats.PacketsSent, stats.PacketsRecv, stats.PacketLoss)
  295. s += fmt.Sprintf("round-trip min/avg/max/stddev = %v/%v/%v/%v\n",
  296. stats.MinRtt, stats.AvgRtt, stats.MaxRtt, stats.StdDevRtt)
  297. pingMsg += s
  298. // fmt.Print(s)
  299. input.statinfomutex.Lock()
  300. si := input.statinfo[t.Server]
  301. if si == nil {
  302. si = &StatInfo{
  303. MinRtt: stats.MinRtt,
  304. MaxRtt: stats.MaxRtt,
  305. AvgRtt: stats.AvgRtt,
  306. LossCount: 0,
  307. Count: 0,
  308. }
  309. input.statinfo[t.Server] = si
  310. }
  311. if stats.MinRtt > 0 && (si.MinRtt == 0 || stats.MinRtt < si.MinRtt) {
  312. si.MinRtt = stats.MinRtt
  313. }
  314. if stats.MaxRtt > si.MaxRtt {
  315. si.MaxRtt = stats.MaxRtt
  316. }
  317. si.AvgRtt = stats.AvgRtt
  318. si.LossCount += stats.PacketsSent - stats.PacketsRecv
  319. input.statinfomutex.Unlock()
  320. }
  321. m["requestTime"] = time.Now().UnixNano() / int64(time.Millisecond)
  322. if err = pinger.Run(); err != nil {
  323. status = "failure"
  324. message = err.Error()
  325. logger.Errorf("Ping error: %v", err)
  326. } else {
  327. }
  328. m["responseTime"] = time.Now().UnixNano() / int64(time.Millisecond)
  329. var failCount, totalFailCount int
  330. recvMutex.Lock()
  331. for i := range recvList {
  332. if recvList[i] < 2 {
  333. failCount++
  334. message = "Packet loss"
  335. totalFailCount++
  336. } else {
  337. failCount = 0
  338. }
  339. if failCount > consecutiveFailures {
  340. consecutiveFailures = failCount
  341. }
  342. pingMsg += fmt.Sprintf("icmp_seq:%d %t\n", i, (recvList[i] >= 2))
  343. }
  344. // logger.Debug(pingMsg)
  345. m["consecutiveFailures"] = consecutiveFailures
  346. if totalFailCount == len(recvList) {
  347. message = "ICMP echo failed"
  348. }
  349. recvMutex.Unlock()
  350. }
  351. // Special
  352. m["returnStatus"] = status
  353. m["message"] = message
  354. b, err := json.Marshal(m)
  355. if err != nil {
  356. logger.Fatal(err)
  357. }
  358. protocolMsg := map[string]interface{}{
  359. "protocol": true,
  360. "output": b,
  361. "attr": map[string]string{"previous_key": input.Domain + "_" + t.Server},
  362. }
  363. b, err = json.Marshal(protocolMsg)
  364. if err != nil {
  365. logger.Fatal(err)
  366. }
  367. return nil
  368. }