mpconn.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package probing
  2. import (
  3. "bytes"
  4. "net"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "trial/ping/probing/icmp"
  9. "git.wecise.com/wecise/common/logger"
  10. "github.com/google/uuid"
  11. )
  12. var connm sync.Mutex
  13. var conns = map[string]*mpingconn{}
  14. func MPConn(ipv4 bool, protocol string) *mpingconn {
  15. key := protocol
  16. if ipv4 {
  17. key += "4"
  18. } else {
  19. key += "6"
  20. }
  21. connm.Lock()
  22. mp := conns[key]
  23. if mp == nil {
  24. mp = newMPConn(ipv4, protocol)
  25. conns[key] = mp
  26. }
  27. connm.Unlock()
  28. return mp
  29. }
  30. type mpingconn struct {
  31. *icmp.MPacketConn
  32. mutex sync.Mutex
  33. uuid uuid.UUID
  34. pingidinfo map[int]*mpinfo
  35. }
  36. type mpinfo struct {
  37. host string
  38. ipaddr *net.IPAddr
  39. id int
  40. lastseq int
  41. size int
  42. timeout time.Duration
  43. seqpkt map[int]*icmp.Packet
  44. OnSend func(*icmp.Packet)
  45. OnRecv func(*icmp.Packet)
  46. OnRecvDup func(*icmp.Packet)
  47. OnRecvError func(error)
  48. }
  49. var pingid int32
  50. var pinginfomutex sync.Mutex
  51. var pinginfo = map[string]*mpinfo{}
  52. func getPingInfo(ipaddr *net.IPAddr) *mpinfo {
  53. pinginfomutex.Lock()
  54. defer pinginfomutex.Unlock()
  55. pinfo := pinginfo[ipaddr.String()]
  56. if pinfo == nil {
  57. pinfo = &mpinfo{
  58. ipaddr: ipaddr,
  59. id: int(atomic.AddInt32(&pingid, 1)),
  60. seqpkt: make(map[int]*icmp.Packet),
  61. }
  62. pinginfo[ipaddr.String()] = pinfo
  63. }
  64. return pinfo
  65. }
  66. func newMPConn(ipv4 bool, protocol string) *mpingconn {
  67. mpconn := &mpingconn{
  68. MPacketConn: &icmp.MPacketConn{
  69. IPV4: ipv4,
  70. Protocol: protocol,
  71. Source: "",
  72. Backlog: receive_buffer_count,
  73. TTL: ping_ttl,
  74. },
  75. uuid: uuid.Must(uuid.NewUUID()),
  76. pingidinfo: make(map[int]*mpinfo),
  77. }
  78. mpconn.MPacketConn.OnRecvPacket = mpconn.OnRecvPacket
  79. mpconn.MPacketConn.OnRecvError = mpconn.OnRecvError
  80. return mpconn
  81. }
  82. func (p *mpingconn) Listen() error {
  83. err := p.MPacketConn.Listen()
  84. if err != nil {
  85. return err
  86. }
  87. return nil
  88. }
  89. func (p *mpingconn) Close() error {
  90. err := p.MPacketConn.Close()
  91. if err != nil {
  92. return err
  93. }
  94. return nil
  95. }
  96. func (p *mpingconn) OnRecvPacket(recvpkt *icmp.Packet) {
  97. // fmt.Println("recv", recvpkt)
  98. p.mutex.Lock()
  99. defer p.mutex.Unlock()
  100. pinfo := p.pingidinfo[recvpkt.ID]
  101. if pinfo == nil {
  102. return
  103. }
  104. if !bytes.Equal(p.uuid[:], recvpkt.UUID[:]) {
  105. return
  106. }
  107. _, inflight := pinfo.seqpkt[recvpkt.Seq]
  108. if inflight {
  109. // remove it from the list of sequences we're waiting for so we don't get duplicates.
  110. delete(pinfo.seqpkt, recvpkt.Seq)
  111. if pinfo.OnRecv != nil {
  112. go pinfo.OnRecv(recvpkt)
  113. }
  114. return
  115. }
  116. if pinfo.OnRecvDup != nil {
  117. go pinfo.OnRecvDup(recvpkt)
  118. }
  119. }
  120. func (p *mpingconn) OnRecvError(err error) {
  121. logger.Error(err)
  122. }
  123. func (p *mpingconn) Ping(pinfo *mpinfo) error {
  124. p.mutex.Lock()
  125. if _, has := p.pingidinfo[pinfo.id]; !has {
  126. p.pingidinfo[pinfo.id] = pinfo
  127. }
  128. seq := pinfo.lastseq
  129. pinfo.lastseq++
  130. outpkt := &icmp.Packet{
  131. IPAddr: pinfo.ipaddr,
  132. ID: pinfo.id,
  133. Seq: seq,
  134. UUID: p.uuid,
  135. Nbytes: pinfo.size,
  136. }
  137. pinfo.seqpkt[seq] = outpkt
  138. p.mutex.Unlock()
  139. err := p.SendPacket(outpkt)
  140. if err != nil {
  141. return err
  142. }
  143. if pinfo.OnSend != nil {
  144. pinfo.OnSend(outpkt)
  145. }
  146. // fmt.Println("sent", outpkt)
  147. return nil
  148. }