ping.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "math"
  6. "math/rand"
  7. "os"
  8. "runtime"
  9. "sort"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. "trial/ping/probing"
  15. "trial/ping/probing/icmp"
  16. "trial/ping/utils"
  17. "git.wecise.com/wecise/common/matrix/util"
  18. "github.com/scylladb/go-set/strset"
  19. "github.com/wecisecode/util/cfg"
  20. "github.com/wecisecode/util/logger"
  21. )
  // task describes one ping job; main builds it and hands it to the workers
// through Input.workChan.
type task struct {
	Server         string `json:"server"`         // target host or address
	Timeout        int    `json:"timeout"`        // seconds; <= 0 lets send derive one from the ping count
	NumberOfPings  int    `json:"numberofpings"`  // pings per round; <= 0 means 1
	PacketInterval int    `json:"packetinterval"` // milliseconds between pings; <= 0 keeps the pinger default
	PacketSize     int    `json:"packetsize"`     // bytes per ping; <= 0 keeps the pinger default

	TypeOfService int `json:"typeofservice"` // Not used
	Retries       int `json:"retries"`       // Not used

	Poll int `json:"poll"` // reported as pollInterval

	FailureRetests           int    `json:"failureretests"`           // Not used
	RetestInterval           int    `json:"retestinterval"`           // Not used
	HostNameLookupPreference string `json:"hostnamelookuppreference"` // Not used
	Description              string `json:"description"`              // Not used

	Rule      string `json:"rule"`
	PerfRule  string `json:"perfrule"`
	TaskTime  int64  `json:"tasktime"`  // reported as taskTime
	Indicator string `json:"indicator"` // reported as indicator
}
  // InputConfig holds the tunables applied by Input.Init / Input.Run.
type InputConfig struct {
	// Poolsize is both the number of worker goroutines and the buffer size
	// of the work channel.
	Poolsize int `toml:"poolsize"`
	// Domain prefixes the "previous_key" attribute of every emitted record.
	Domain string `toml:"domain"`
	// StatInterval is passed to utils.NewDataStat (unit defined there).
	StatInterval int `toml:"stat_interval"`
}
  // StatInfo aggregates ping results for one target across rounds.
type StatInfo struct {
	MinRtt time.Duration // smallest positive per-round minimum seen so far
	MaxRtt time.Duration // largest per-round maximum seen so far
	AvgRtt time.Duration // average of the most recent round

	LossCount int // cumulative packets sent but not answered
	Count     int // cumulative packets sent
}
  52. type Input struct {
  53. *InputConfig
  54. ips []string
  55. statinfomutex sync.Mutex
  56. statinfo map[string]*StatInfo
  57. ipaddrs map[string]string
  58. subSize int
  59. workChan chan *task
  60. stopChan chan bool
  61. ds *utils.DataStat
  62. }
  63. var mcfg = cfg.MConfig()
  64. var protocol = mcfg.GetString("protocol", "udp")
  65. var allipsmutex sync.Mutex
  66. var allips = strset.New()
  67. var keepipset = strset.New()
  68. /*
  69. poolsize=1000 同时ping的不同目标地址个数
  70. count=5 每轮ping的次数,每轮次产生一次统计结果,并切换目标地址
  71. interval=1000 两次ping之间的间隔,单位毫秒
  72. size=64 每次ping发送字节数,48~8192
  73. timeout=2 ping超时,单位秒
  74. detect=1ms 自动发现可ping通地址,并追加到 hosts.txt 文件,设置发现间隔时间需指定单位,如 ms,毫秒,us,微妙等,默认 0,不进行发现处理
  75. */
  76. func main() {
  77. input := &Input{}
  78. input.statinfo = map[string]*StatInfo{}
  79. input.ipaddrs = map[string]string{}
  80. inputcfg := &InputConfig{
  81. Poolsize: 1000,
  82. StatInterval: 600,
  83. }
  84. for _, aips := range mcfg.GetStrings("keep") {
  85. keepipset.Add(strings.Split(aips, ",")...)
  86. }
  87. keepips := keepipset.List()
  88. inputcfg.Poolsize = mcfg.GetInt("poolsize", inputcfg.Poolsize)
  89. if detect_interval := mcfg.GetDuration("detect", 0); detect_interval != 0 {
  90. go detect(detect_interval)
  91. }
  92. fips := func() []string {
  93. xips := mcfg.GetStrings("ip|ping.ip", "")
  94. allipsmutex.Lock()
  95. for _, aips := range xips {
  96. allips.Add(strings.Split(aips, ",")...)
  97. }
  98. bs, _ := util.ReadFile("./hosts.txt")
  99. if len(bs) > 0 {
  100. xips := strings.Split(string(bs), "\n")
  101. for _, aips := range xips {
  102. allips.Add(strings.Split(aips, ",")...)
  103. }
  104. }
  105. if len(detectips) > 0 {
  106. for _, aips := range detectips {
  107. allips.Add(strings.Split(aips, ",")...)
  108. }
  109. util.WriteFile("./hosts.txt", []byte(string(bs)+"\n"+strings.Join(detectips, ",")), true)
  110. detectips = detectips[:0]
  111. }
  112. allips.Remove(keepips...)
  113. sips := allips.List()
  114. // sort.Sort(sort.Reverse(sort.StringSlice(sips)))
  115. rand.Shuffle(len(sips), func(i, j int) { sips[i], sips[j] = sips[j], sips[i] })
  116. allipsmutex.Unlock()
  117. return sips
  118. }
  119. input.ips = append(keepips, fips()...)
  120. if protocol != "udp" && protocol != "icmp" {
  121. protocol = "udp"
  122. }
  123. input.Init(inputcfg)
  124. go input.Run()
  125. go func() {
  126. last_count := 0
  127. t := time.NewTicker(1 * time.Second)
  128. for {
  129. select {
  130. case <-t.C:
  131. s := ""
  132. input.statinfomutex.Lock()
  133. ks := []string{}
  134. for k := range input.statinfo {
  135. ks = append(ks, k)
  136. }
  137. sort.Strings(ks)
  138. for i, k := range ks {
  139. if i > len(ks)-200 {
  140. v := input.statinfo[k]
  141. ip := input.ipaddrs[k]
  142. s += fmt.Sprintf("%-3d %-20s %15s : %-12s [%12s ~ %-12s] loss %d/%d\n", i, k, ip, v.AvgRtt, v.MinRtt, v.MaxRtt, v.LossCount, v.Count)
  143. }
  144. }
  145. input.statinfomutex.Unlock()
  146. logger.Info("统计信息更新:", fmt.Sprint("\n", s))
  147. logger.Info(util.FormatDuration(time.Since(starttime)), "已经完成", pingcount, "次Ping操作,",
  148. "平均每秒", (int64(pingcount+1) * int64(time.Second) / int64(time.Since(starttime))), "次",
  149. "最近一秒", (pingcount - int32(last_count)), "次",
  150. "最大缓冲", icmp.MaxReceiveBufferUsed(),
  151. "并发失控", problem,
  152. )
  153. last_count = int(pingcount)
  154. }
  155. }
  156. }()
  157. n := 0
  158. for {
  159. if n >= len(input.ips) {
  160. n = 0
  161. input.ips = fips()
  162. }
  163. ip := input.ips[n]
  164. n++
  165. //
  166. ip = strings.TrimSpace(ip)
  167. if ip != "" {
  168. atask := &task{
  169. Server: ip,
  170. Timeout: mcfg.GetInt("timeout", 2),
  171. NumberOfPings: mcfg.GetInt("count", 5), // 至少为2,小于2会导致不能正常结束
  172. PacketInterval: mcfg.GetInt("interval", 1000),
  173. PacketSize: mcfg.GetInt("size", 64),
  174. }
  175. input.workChan <- atask
  176. }
  177. // time.Sleep(1 * time.Millisecond)
  178. }
  179. }
  180. func (input *Input) Init(config interface{}) error {
  181. input.InputConfig = config.(*InputConfig)
  182. input.workChan = make(chan *task, input.Poolsize)
  183. input.stopChan = make(chan bool)
  184. input.subSize = runtime.GOMAXPROCS(0)
  185. return nil
  186. }
  187. func (input *Input) Run() (err error) {
  188. input.ds = utils.NewDataStat("service", "", input.StatInterval, input.Poolsize*2)
  189. defer input.ds.Close()
  190. go input.ds.Start()
  191. defer close(input.workChan)
  192. var wg sync.WaitGroup
  193. // Start worker
  194. // count := int32(0)
  195. // lpcmutex := sync.Mutex{}
  196. // lastprintcount := time.Now()
  197. for i := 0; i < input.Poolsize; i++ {
  198. go func(n int) {
  199. for t := range input.workChan {
  200. wg.Add(1)
  201. // Run task
  202. n := 1
  203. if keepipset.Has(t.Server) {
  204. n = math.MaxInt
  205. }
  206. var e error
  207. for ; e == nil && n > 0; n-- {
  208. e = input.send(t, n)
  209. }
  210. if e != nil {
  211. logger.Error("Send error:", e)
  212. input.ds.Send(1)
  213. } else {
  214. input.ds.Send(0)
  215. }
  216. wg.Done()
  217. // x := atomic.AddInt32(&count, 1)
  218. // lpcmutex.Lock()
  219. // printmsg := time.Since(lastprintcount) >= 1*time.Second
  220. // if printmsg {
  221. // lastprintcount = time.Now()
  222. // }
  223. // lpcmutex.Unlock()
  224. // if printmsg {
  225. // logger.Info("已经完成", x, "个目标的Ping操作")
  226. // }
  227. }
  228. }(i)
  229. }
  230. <-input.stopChan
  231. wg.Wait()
  232. return nil
  233. }
  // Progress counters and timestamps shared by the workers and the stats logger.
var (
	// pingcount counts received echo replies; updated with atomic.AddInt32.
	pingcount int32
	// problem counts send callbacks whose sequence slot was already touched.
	problem int32
	// starttime is the reference point for elapsed-time and rate output.
	starttime = time.Now()
	// NOTE(review): the two below are not referenced in this file; possibly
	// leftovers - confirm before removing.
	printpingcountmutex sync.Mutex
	printpingcounttime  = time.Now()
)
  239. func (input *Input) send(t *task, workerNum int) error {
  240. hostname, _ := os.Hostname()
  241. var (
  242. status = "success"
  243. message = "Pings Complete"
  244. m = map[string]interface{}{
  245. "service": "icmp",
  246. "monitorHost": hostname,
  247. "host": t.Server,
  248. "pollInterval": t.Poll,
  249. "taskTime": t.TaskTime,
  250. "indicator": t.Indicator,
  251. }
  252. )
  253. pinger, err := probing.NewPinger(t.Server)
  254. if err != nil {
  255. status = "failure"
  256. message = err.Error()
  257. sips := strset.New(input.ips...)
  258. sips.Remove(t.Server)
  259. input.ips = sips.List()
  260. logger.Error(err)
  261. } else {
  262. /*
  263. https://stackoverflow.com/questions/41423637/go-ping-library-for-unprivileged-icmp-ping-in-golang/41425527#41425527
  264. This library attempts to send an "unprivileged" ping via UDP. On linux, this must be enabled by setting
  265. sudo sysctl -w net.ipv4.ping_group_range="0 2147483647"
  266. If you do not wish to do this, you can set pinger.SetPrivileged(true) and use setcap to allow your binary using go-ping to bind to raw sockets (or just run as super-user):
  267. setcap cap_net_raw=+ep /bin/goping-binary
  268. getcap /bin/goping-binary to validate
  269. */
  270. pinger.SetPrivileged(protocol == "icmp")
  271. if t.NumberOfPings > 0 {
  272. pinger.Count = t.NumberOfPings
  273. } else {
  274. pinger.Count = 1
  275. }
  276. if t.PacketInterval > 0 {
  277. pinger.Interval = time.Millisecond * time.Duration(t.PacketInterval)
  278. }
  279. if t.PacketSize > 0 {
  280. pinger.Size = t.PacketSize
  281. }
  282. if t.Timeout > 0 {
  283. pinger.Timeout = time.Second * time.Duration(t.Timeout)
  284. } else {
  285. pinger.Timeout = time.Second * time.Duration(pinger.Count)
  286. }
  287. var (
  288. consecutiveFailures int
  289. )
  290. var recvList = make([]int32, pinger.Count)
  291. pinger.OnSend = func(pkt *probing.Packet) {
  292. if atomic.AddInt32(&recvList[pkt.Seq], 1) != 1 {
  293. atomic.AddInt32(&problem, 1)
  294. }
  295. input.statinfomutex.Lock()
  296. si := input.statinfo[t.Server]
  297. if si == nil {
  298. si = &StatInfo{}
  299. input.statinfo[t.Server] = si
  300. }
  301. input.ipaddrs[t.Server] = pkt.IPAddr.String()
  302. si.Count++
  303. input.statinfomutex.Unlock()
  304. }
  305. pingMsg := fmt.Sprintf("\nWoker %d PING %s (%s):\n", workerNum, pinger.Addr(), pinger.IPAddr())
  306. pinger.OnRecv = func(pkt *probing.Packet) {
  307. atomic.AddInt32(&recvList[pkt.Seq], 1)
  308. s := fmt.Sprintf("%s %d bytes from %s: icmp_seq=%d time=%v\n",
  309. time.Now().Format("15:04:05.000"), pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt)
  310. pingMsg += s
  311. // fmt.Print(s)
  312. // printpingcount
  313. atomic.AddInt32(&pingcount, 1)
  314. }
  315. pinger.OnDuplicateRecv = func(pkt *probing.Packet) {
  316. atomic.AddInt32(&recvList[pkt.Seq], 1)
  317. s := fmt.Sprintf("%s %d bytes from %s: icmp_seq=%d time=%v (DUP!)\n",
  318. time.Now().Format("15:04:05.000"), pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt)
  319. pingMsg += s
  320. // fmt.Print(s)
  321. }
  322. pinger.OnFinish = func(stats *probing.Statistics) {
  323. m["numberPackets"] = stats.PacketsSent
  324. m["averageRTT"] = stats.AvgRtt.Milliseconds()
  325. if stats.PacketsSent == 0 {
  326. m["respondPercent"] = 1
  327. } else {
  328. m["respondPercent"] = stats.PacketsRecv / stats.PacketsSent * 100
  329. }
  330. //m["failureRetests"] = ?
  331. s := fmt.Sprintf("--- %s ping statistics ---\n", stats.Addr)
  332. s += fmt.Sprintf("%d packets transmitted, %d packets received, %v%% packet loss\n",
  333. stats.PacketsSent, stats.PacketsRecv, stats.PacketLoss)
  334. s += fmt.Sprintf("round-trip min/avg/max/stddev = %v/%v/%v/%v\n",
  335. stats.MinRtt, stats.AvgRtt, stats.MaxRtt, stats.StdDevRtt)
  336. pingMsg += s
  337. // fmt.Print(s)
  338. input.statinfomutex.Lock()
  339. si := input.statinfo[t.Server]
  340. if si == nil {
  341. si = &StatInfo{
  342. MinRtt: stats.MinRtt,
  343. MaxRtt: stats.MaxRtt,
  344. AvgRtt: stats.AvgRtt,
  345. LossCount: 0,
  346. Count: 0,
  347. }
  348. input.statinfo[t.Server] = si
  349. input.ipaddrs[t.Server] = stats.IPAddr.String()
  350. }
  351. if stats.MinRtt > 0 && (si.MinRtt == 0 || stats.MinRtt < si.MinRtt) {
  352. si.MinRtt = stats.MinRtt
  353. }
  354. if stats.MaxRtt > si.MaxRtt {
  355. si.MaxRtt = stats.MaxRtt
  356. }
  357. si.AvgRtt = stats.AvgRtt
  358. si.LossCount += stats.PacketsSent - stats.PacketsRecv
  359. input.statinfomutex.Unlock()
  360. }
  361. m["requestTime"] = time.Now().UnixNano() / int64(time.Millisecond)
  362. if err = pinger.Run(); err != nil {
  363. status = "failure"
  364. message = err.Error()
  365. logger.Errorf("Ping error: %v", err)
  366. } else {
  367. }
  368. m["responseTime"] = time.Now().UnixNano() / int64(time.Millisecond)
  369. var failCount, totalFailCount int
  370. for i := range recvList {
  371. if recvList[i] < 2 {
  372. failCount++
  373. message = "Packet loss"
  374. totalFailCount++
  375. } else {
  376. failCount = 0
  377. }
  378. if failCount > consecutiveFailures {
  379. consecutiveFailures = failCount
  380. }
  381. pingMsg += fmt.Sprintf("icmp_seq:%d %t\n", i, (recvList[i] >= 2))
  382. }
  383. // logger.Debug(pingMsg)
  384. m["consecutiveFailures"] = consecutiveFailures
  385. if totalFailCount == len(recvList) {
  386. message = "ICMP echo failed"
  387. }
  388. }
  389. // Special
  390. m["returnStatus"] = status
  391. m["message"] = message
  392. b, err := json.Marshal(m)
  393. if err != nil {
  394. logger.Fatal(err)
  395. }
  396. protocolMsg := map[string]interface{}{
  397. "protocol": true,
  398. "output": b,
  399. "attr": map[string]string{"previous_key": input.Domain + "_" + t.Server},
  400. }
  401. b, err = json.Marshal(protocolMsg)
  402. if err != nil {
  403. logger.Fatal(err)
  404. }
  405. return nil
  406. }
  // detectips collects addresses found reachable by detect until main's next
// refresh moves them into allips and hosts.txt. Guarded by allipsmutex.
var detectips = make([]string, 0)
  408. func detect(detect_interval time.Duration) {
  409. t := time.NewTicker(detect_interval)
  410. for {
  411. <-t.C
  412. randip := fmt.Sprintf("%d.%d.%d.%d", 1+rand.Intn(254), 1+rand.Intn(254), 1+rand.Intn(254), 1+rand.Intn(254))
  413. allipsmutex.Lock()
  414. has := allips.Has(randip)
  415. allipsmutex.Unlock()
  416. if !has {
  417. if detectip(randip) {
  418. allipsmutex.Lock()
  419. detectips = append(detectips, randip)
  420. allipsmutex.Unlock()
  421. }
  422. }
  423. }
  424. }
  425. func detectip(addr string) (ok bool) {
  426. pinger, err := probing.NewPinger(addr)
  427. if err != nil {
  428. return false
  429. }
  430. pinger.SetPrivileged(protocol == "icmp")
  431. pinger.Count = 1
  432. pinger.Interval = 1 * time.Nanosecond
  433. pinger.Size = 32
  434. pinger.Timeout = 1 * time.Second
  435. pinger.OnRecv = func(pkt *probing.Packet) {
  436. ok = true
  437. }
  438. pinger.Run()
  439. return
  440. }