libf 2 years ago
parent
commit
4847438a10

+ 7 - 8
ping/ping.go

@@ -103,7 +103,7 @@ func main() {
 	input.Init(inputcfg)
 	input.Init(inputcfg)
 	go input.Run()
 	go input.Run()
 	go func() {
 	go func() {
-		// last_count := 0
+		last_count := 0
 		t := time.NewTicker(1 * time.Second)
 		t := time.NewTicker(1 * time.Second)
 		for {
 		for {
 			select {
 			select {
@@ -120,13 +120,12 @@ func main() {
 					s += fmt.Sprintf("%-3d %-20s: %-12s [%12s ~ %-12s] loss %d/%d\n", i, k, v.AvgRtt, v.MinRtt, v.MaxRtt, v.LossCount, v.Count)
 					s += fmt.Sprintf("%-3d %-20s: %-12s [%12s ~ %-12s] loss %d/%d\n", i, k, v.AvgRtt, v.MinRtt, v.MaxRtt, v.LossCount, v.Count)
 				}
 				}
 				input.statinfomutex.Unlock()
 				input.statinfomutex.Unlock()
-				// logger.Info("统计信息更新:", fmt.Sprint("\n", s))
-				// logger.Info(util.FormatDuration(time.Since(starttime)), "已经完成", pingcount, "次Ping操作,",
-				// 	"平均每秒", (int64(pingcount+1) * int64(time.Second) / int64(time.Since(starttime))), "次",
-				// 	"最近一秒", (pingcount - int32(last_count)), "次",
-				// 	"缓存", probing.BufferUsedCount(),
-				// )
-				// last_count = int(pingcount)
+				logger.Info("统计信息更新:", fmt.Sprint("\n", s))
+				logger.Info(util.FormatDuration(time.Since(starttime)), "已经完成", pingcount, "次Ping操作,",
+					"平均每秒", (int64(pingcount+1) * int64(time.Second) / int64(time.Since(starttime))), "次",
+					"最近一秒", (pingcount - int32(last_count)), "次",
+				)
+				last_count = int(pingcount)
 			}
 			}
 		}
 		}
 	}()
 	}()

+ 26 - 28
ping/probing/icmp/packetconn.go

@@ -37,7 +37,7 @@ var (
 	ipv6Proto = map[string]string{"icmp": "ip6:ipv6-icmp", "udp": "udp6"}
 	ipv6Proto = map[string]string{"icmp": "ip6:ipv6-icmp", "udp": "udp6"}
 )
 )
 
 
-func Listen(ipv4 bool, protocol string, source string) (conn PacketConn, err error) {
+func listen(ipv4 bool, protocol string, source string) (conn PacketConn, err error) {
 	if ipv4 {
 	if ipv4 {
 		var c icmpv4Conn
 		var c icmpv4Conn
 		c.c, err = icmp.ListenPacket(ipv4Proto[protocol], source)
 		c.c, err = icmp.ListenPacket(ipv4Proto[protocol], source)
@@ -79,7 +79,8 @@ type MPacketConn struct {
 	Protocol     string
 	Protocol     string
 	Source       string
 	Source       string
 	Backlog      int
 	Backlog      int
-	OnRecvPacket func(addr net.Addr, pkt *Packet)
+	TTL          int
+	OnRecvPacket func(pkt *Packet)
 	OnRecvError  func(error)
 	OnRecvError  func(error)
 	mutex        sync.Mutex
 	mutex        sync.Mutex
 	conn         PacketConn
 	conn         PacketConn
@@ -88,12 +89,19 @@ type MPacketConn struct {
 }
 }
 
 
 func (mp *MPacketConn) Listen() error {
 func (mp *MPacketConn) Listen() error {
-	conn, err := Listen(mp.IPV4, mp.Protocol, mp.Source)
+	mp.mutex.Lock()
+	defer mp.mutex.Unlock()
+
+	if mp.conn != nil {
+		return nil
+	}
+
+	conn, err := listen(mp.IPV4, mp.Protocol, mp.Source)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
 
 
-	conn.SetTTL(64)
+	conn.SetTTL(mp.TTL)
 	if err := conn.SetFlagTTL(); err != nil {
 	if err := conn.SetFlagTTL(); err != nil {
 		return err
 		return err
 	}
 	}
@@ -158,7 +166,7 @@ func (mp *MPacketConn) recvICMP() {
 	}
 	}
 }
 }
 
 
-func (mp *MPacketConn) SendPacket(pkt *Packet, addr *net.IPAddr) error {
+func (mp *MPacketConn) SendPacket(pkt *Packet) error {
 	if mp.conn == nil {
 	if mp.conn == nil {
 		return ENOLISTENER
 		return ENOLISTENER
 	}
 	}
@@ -166,9 +174,9 @@ func (mp *MPacketConn) SendPacket(pkt *Packet, addr *net.IPAddr) error {
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	var dst net.Addr = addr
+	var dst net.Addr = pkt.IPAddr
 	if mp.Protocol == "udp" {
 	if mp.Protocol == "udp" {
-		dst = &net.UDPAddr{IP: addr.IP, Zone: addr.Zone}
+		dst = &net.UDPAddr{IP: pkt.IPAddr.IP, Zone: pkt.IPAddr.Zone}
 	}
 	}
 	for {
 	for {
 		select {
 		select {
@@ -252,28 +260,18 @@ func (mp *MPacketConn) processEchoReply(pkt *icmp.Echo, recv *recvPkt) error {
 		return nil
 		return nil
 	}
 	}
 
 
-	inPkt := &Packet{
-		Addr:   recv.addr.String(),
-		ID:     fullid,
-		Seq:    fullseq,
-		Nbytes: recv.nbytes,
-		TTL:    recv.ttl,
-		Rtt:    recv.recvtime.Sub(time.Unix(0, sendtime)),
-	}
-
 	// fmt.Printf("%s %d bytes from %s: icmp_seq=%d time=%v\n",
 	// fmt.Printf("%s %d bytes from %s: icmp_seq=%d time=%v\n",
-	// 	time.Now().Format("15:04:05.000"), inPkt.Nbytes, inPkt.IPAddr, inPkt.Seq, inPkt.Rtt)
-
-	p.mutex.Lock()
-	chpkt, inflight := pinfo.seqpkt[fullseq]
-	if inflight {
-		// remove it from the list of sequences we're waiting for so we don't get duplicates.
-		delete(pinfo.seqpkt, fullseq)
-	}
-	p.mutex.Unlock()
-
-	if chpkt != nil {
-		chpkt <- inPkt
+	// 	time.Now().Format("15:04:05.000"), recv.nbytes, recv.addr, fullseq, recv.recvtime.Sub(time.Unix(0, sendtime)))
+
+	if mp.OnRecvPacket != nil {
+		mp.OnRecvPacket(&Packet{
+			IPAddr: netAddrToIPAddr(recv.addr),
+			ID:     fullid,
+			Seq:    fullseq,
+			Nbytes: recv.nbytes,
+			TTL:    recv.ttl,
+			Rtt:    recv.recvtime.Sub(time.Unix(0, sendtime)),
+		})
 	}
 	}
 	return nil
 	return nil
 }
 }

+ 2 - 6
ping/probing/icmp/utils.go

@@ -12,13 +12,9 @@ import (
 func netAddrToIPAddr(a net.Addr) *net.IPAddr {
 func netAddrToIPAddr(a net.Addr) *net.IPAddr {
 	switch v := a.(type) {
 	switch v := a.(type) {
 	case *net.UDPAddr:
 	case *net.UDPAddr:
-		if ip := v.IP.To4(); ip != nil {
-			return ip
-		}
+		return &net.IPAddr{IP: v.IP, Zone: v.Zone}
 	case *net.IPAddr:
 	case *net.IPAddr:
-		if ip := v.IP.To4(); ip != nil {
-			return ip
-		}
+		return v
 	}
 	}
 	return nil
 	return nil
 }
 }

+ 0 - 53
ping/probing/logger.go

@@ -1,53 +0,0 @@
-package probing
-
-import "log"
-
-type Logger interface {
-	Fatalf(format string, v ...interface{})
-	Errorf(format string, v ...interface{})
-	Warnf(format string, v ...interface{})
-	Infof(format string, v ...interface{})
-	Debugf(format string, v ...interface{})
-}
-
-type StdLogger struct {
-	Logger *log.Logger
-}
-
-func (l StdLogger) Fatalf(format string, v ...interface{}) {
-	l.Logger.Printf("FATAL: "+format, v...)
-}
-
-func (l StdLogger) Errorf(format string, v ...interface{}) {
-	l.Logger.Printf("ERROR: "+format, v...)
-}
-
-func (l StdLogger) Warnf(format string, v ...interface{}) {
-	l.Logger.Printf("WARN: "+format, v...)
-}
-
-func (l StdLogger) Infof(format string, v ...interface{}) {
-	l.Logger.Printf("INFO: "+format, v...)
-}
-
-func (l StdLogger) Debugf(format string, v ...interface{}) {
-	l.Logger.Printf("DEBUG: "+format, v...)
-}
-
-type NoopLogger struct {
-}
-
-func (l NoopLogger) Fatalf(format string, v ...interface{}) {
-}
-
-func (l NoopLogger) Errorf(format string, v ...interface{}) {
-}
-
-func (l NoopLogger) Warnf(format string, v ...interface{}) {
-}
-
-func (l NoopLogger) Infof(format string, v ...interface{}) {
-}
-
-func (l NoopLogger) Debugf(format string, v ...interface{}) {
-}

+ 0 - 22
ping/probing/message.go

@@ -1,22 +0,0 @@
-package probing
-
-import (
-	"net"
-	"time"
-)
-
-// Packet represents a received and processed ICMP echo packet.
-type xPacket struct {
-	// IPAddr is the address of the host being pinged.
-	IPAddr *net.IPAddr
-	// ID is the ICMP identifier.
-	ID int
-	// Seq is the ICMP sequence number.
-	Seq int
-	// TTL is the Time To Live on the packet.
-	TTL int
-	// NBytes is the number of bytes in the message.
-	Nbytes int
-	// Rtt is the round-trip time it took to ping.
-	Rtt time.Duration
-}

+ 85 - 279
ping/probing/mpconn.go

@@ -1,38 +1,40 @@
 package probing
 package probing
 
 
 import (
 import (
-	"encoding/binary"
 	"fmt"
 	"fmt"
 	"net"
 	"net"
 	"sync"
 	"sync"
 	"sync/atomic"
 	"sync/atomic"
-	"syscall"
 	"time"
 	"time"
 	"trial/ping/probing/icmp"
 	"trial/ping/probing/icmp"
 
 
-	"golang.org/x/net/ipv4"
-	"golang.org/x/net/ipv6"
+	"git.wecise.com/wecise/common/logger"
 )
 )
 
 
-type recvPkt struct {
-	recvtime time.Time
-	addr     net.Addr
-	bytes    []byte
-	nbytes   int
-	ttl      int
+var connm sync.Mutex
+var conns = map[string]*mpingconn{}
+
+func MPConn(ipv4 bool, protocol string) *mpingconn {
+	key := protocol
+	if ipv4 {
+		key += "4"
+	} else {
+		key += "6"
+	}
+	connm.Lock()
+	mp := conns[key]
+	if mp == nil {
+		mp = newMPConn(ipv4, protocol)
+		conns[key] = mp
+	}
+	connm.Unlock()
+	return mp
 }
 }
 
 
 type mpingconn struct {
 type mpingconn struct {
-	mutex        sync.Mutex
-	ipv4         bool
-	protocol     string
-	Source       string
-	Conn         icmp.PacketConn
-	done         chan interface{}
-	recvchan     chan *recvPkt
-	pinghostinfo map[string]*mpinfo
-	pingidinfo   map[int]*mpinfo
-	OnError      func(error)
+	*icmp.MPacketConn
+	mutex      sync.Mutex
+	pingidinfo map[int]*mpinfo
 }
 }
 
 
 type mpinfo struct {
 type mpinfo struct {
@@ -42,93 +44,13 @@ type mpinfo struct {
 	lastseq     int
 	lastseq     int
 	size        int
 	size        int
 	timeout     time.Duration
 	timeout     time.Duration
-	seqpkt      map[int]chan *Packet
-	OnSend      func(*Packet)
-	OnRecv      func(*Packet)
-	OnRecvDup   func(*Packet)
+	seqpkt      map[int]*icmp.Packet
+	OnSend      func(*icmp.Packet)
+	OnRecv      func(*icmp.Packet)
+	OnRecvDup   func(*icmp.Packet)
 	OnRecvError func(error)
 	OnRecvError func(error)
 }
 }
 
 
-var mpconn = newMPConn()
-
-func newMPConn() *mpingconn {
-	mpconn := &mpingconn{
-		ipv4:         true,
-		protocol:     "udp",
-		Source:       "",
-		done:         make(chan interface{}),
-		recvchan:     make(chan *recvPkt, receive_buffer_count),
-		pinghostinfo: make(map[string]*mpinfo),
-		pingidinfo:   make(map[int]*mpinfo),
-	}
-	return mpconn
-}
-
-func (p *mpingconn) listen() (icmp.PacketConn, error) {
-	p.mutex.Lock()
-	defer p.mutex.Unlock()
-
-	if p.Conn != nil {
-		return p.Conn, nil
-	}
-
-	conn, err := icmp.Listen(p.ipv4, p.protocol, p.Source)
-	if err != nil {
-		return nil, err
-	}
-	p.Conn = conn
-
-	conn.SetTTL(64)
-	if err := conn.SetFlagTTL(); err != nil {
-		return nil, err
-	}
-
-	go p.recvICMP()
-	go p.processRecvPacket()
-
-	return p, nil
-}
-
-func (p *mpingconn) Close() error {
-	p.mutex.Lock()
-	defer p.mutex.Unlock()
-
-	open := true
-	select {
-	case _, open = <-p.done:
-	default:
-	}
-
-	if open {
-		close(p.done)
-	}
-	return p.Conn.Close()
-}
-
-func (p *mpingconn) ICMPRequestType() icmp.Type {
-	return p.Conn.ICMPRequestType()
-}
-
-func (p *mpingconn) ReadFrom(b []byte) (n int, ttl int, src net.Addr, err error) {
-	return p.Conn.ReadFrom(b)
-}
-
-func (p *mpingconn) WriteTo(b []byte, dst net.Addr) (int, error) {
-	return p.Conn.WriteTo(b, dst)
-}
-
-func (p *mpingconn) SetReadDeadline(t time.Time) error {
-	return p.Conn.SetReadDeadline(t)
-}
-
-func (p *mpingconn) SetFlagTTL() error {
-	return p.Conn.SetFlagTTL()
-}
-
-func (p *mpingconn) SetTTL(ttl int) {
-	p.Conn.SetTTL(ttl)
-}
-
 var pingid int32
 var pingid int32
 var ETIMEDOUT error = fmt.Errorf("timeout")
 var ETIMEDOUT error = fmt.Errorf("timeout")
 
 
@@ -137,213 +59,97 @@ func newPingInfo(host string, ipaddr *net.IPAddr, size int, timeout time.Duratio
 		host:    host,
 		host:    host,
 		ipaddr:  ipaddr,
 		ipaddr:  ipaddr,
 		id:      int(atomic.AddInt32(&pingid, 1)),
 		id:      int(atomic.AddInt32(&pingid, 1)),
-		seqpkt:  make(map[int]chan *Packet),
+		seqpkt:  make(map[int]*icmp.Packet),
 		size:    size,
 		size:    size,
 		timeout: timeout,
 		timeout: timeout,
 	}
 	}
 }
 }
 
 
-func (p *mpingconn) ping(host string, ipaddr *net.IPAddr, size int, timeout time.Duration, onSend func(*Packet), onRecv func(*Packet)) error {
-	p.mutex.Lock()
-	var pinfo *mpinfo
-	var has bool
-	if pinfo, has = p.pinghostinfo[ipaddr.String()]; !has {
-		pinfo = newPingInfo(host, ipaddr, size, timeout)
-		p.pinghostinfo[ipaddr.String()] = pinfo
-		p.pingidinfo[pinfo.id] = pinfo
-	}
-	seq := pinfo.lastseq
-	pinfo.lastseq++
-	recvpkt := make(chan *Packet, 1)
-	pinfo.seqpkt[seq] = recvpkt
-	p.mutex.Unlock()
+func newMPConn(ipv4 bool, protocol string) *mpingconn {
+	mpconn := &mpingconn{
+		MPacketConn: &icmp.MPacketConn{
+			IPV4:     ipv4,
+			Protocol: protocol,
+			Source:   "",
+			Backlog:  10,
+			TTL:      64,
+		},
+		pingidinfo: make(map[int]*mpinfo),
+	}
+	mpconn.MPacketConn.OnRecvPacket = mpconn.OnRecvPacket
+	mpconn.MPacketConn.OnRecvError = mpconn.OnRecvError
+	return mpconn
+}
 
 
-	msgBytes, err := icmp.BuildEchoRequestMessage(pinfo.id, seq, size, p.ICMPRequestType())
+func (p *mpingconn) Listen() error {
+	err := p.MPacketConn.Listen()
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
+	return nil
+}
 
 
-	err = p.sendICMP(msgBytes, ipaddr)
+func (p *mpingconn) Close() error {
+	err := p.MPacketConn.Close()
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	outpkt := &Packet{
-		Nbytes: len(msgBytes),
-		Host:   host,
-		IPAddr: ipaddr,
-		Seq:    seq,
-		ID:     pinfo.id,
-	}
-	if onSend != nil {
-		onSend(outpkt)
-	}
-	go func(onRecv func(*Packet), recvpkt chan *Packet) {
-		t := time.NewTimer(timeout)
-		select {
-		case <-t.C:
-		case inpkt := <-recvpkt:
-			if onRecv != nil {
-				onRecv(inpkt)
-			}
-		case <-p.done:
-		}
-		// clear();
-	}(onRecv, recvpkt)
 	return nil
 	return nil
 }
 }
 
 
-func (p *mpingconn) sendICMP(msgBytes []byte, addr *net.IPAddr) error {
-	var dst net.Addr = addr
-	if p.protocol == "udp" {
-		dst = &net.UDPAddr{IP: addr.IP, Zone: addr.Zone}
-	}
-	for {
-		select {
-		case <-p.done:
-			return nil
-		default:
-		}
-		if _, err := p.Conn.WriteTo(msgBytes, dst); err != nil {
-			if neterr, ok := err.(*net.OpError); ok {
-				if neterr.Err == syscall.ENOBUFS {
-					if p.OnError != nil {
-						p.OnError(neterr.Err)
-					} else {
-						fmt.Println("缓存不够,发送失败,重发")
-					}
-					continue
-				}
-			}
-			return err
-		} else {
-			return nil
-		}
-	}
-}
+func (p *mpingconn) OnRecvPacket(recvpkt *icmp.Packet) {
 
 
-func (p *mpingconn) recvICMP() error {
-	bytes := make([]byte, 65536)
-	for {
-		select {
-		case <-p.done:
-			return nil
-		default:
-			var n, ttl int
-			var addr net.Addr
-			var err error
-			n, ttl, addr, err = p.Conn.ReadFrom(bytes)
-			if err != nil {
-				if neterr, ok := err.(*net.OpError); ok {
-					if neterr.Timeout() {
-						// Read timeout
-						continue
-					}
-				}
-				return err
-			}
-			bs := make([]byte, n)
-			copy(bs, bytes[:n])
+	// fmt.Println("recv", recvpkt)
 
 
-			select {
-			case <-p.done:
-				return nil
-			case p.recvchan <- &recvPkt{recvtime: time.Now(), addr: addr, bytes: bs, nbytes: n, ttl: ttl}:
-			}
-		}
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+	pinfo := p.pingidinfo[recvpkt.ID]
+	if pinfo == nil {
+		return
 	}
 	}
-}
-
-func (p *mpingconn) processRecvPacket() {
-	for pkt := range p.recvchan {
-		if len(p.recvchan) > cap(p.recvchan)*9/10 {
-			fmt.Printf("receive buffer full")
-		}
-		err := p.processPacket(pkt)
-		if err != nil {
-			if p.OnError != nil {
-				p.OnError(err)
-			} else {
-				fmt.Println(err)
-			}
+	_, inflight := pinfo.seqpkt[recvpkt.Seq]
+	if inflight {
+		// remove it from the list of sequences we're waiting for so we don't get duplicates.
+		delete(pinfo.seqpkt, recvpkt.Seq)
+		if pinfo.OnRecv != nil {
+			go pinfo.OnRecv(recvpkt)
 		}
 		}
+		return
 	}
 	}
-}
-
-var count = 0
-
-func (p *mpingconn) processPacket(recv *recvPkt) error {
-	var proto int
-	if p.ipv4 {
-		proto = protocolICMP
-	} else {
-		proto = protocolIPv6ICMP
-	}
-
-	// fmt.Println(count, "from", recv.addr.String(), "bytes", recv.bytes)
-
-	var m *icmp.Message
-	var err error
-	if m, err = icmp.ParseMessage(proto, recv.bytes); err != nil {
-		return fmt.Errorf("error parsing icmp message: %w", err)
-	}
-
-	if m.Type != ipv4.ICMPTypeEchoReply && m.Type != ipv6.ICMPTypeEchoReply {
-		// Not an echo reply, ignore it
-		return nil
-	}
-
-	switch pkt := m.Body.(type) {
-	case *icmp.Echo:
-		return p.processEchoReply(pkt, recv)
-	default:
-		// Very bad, not sure how this can happen
-		return fmt.Errorf("invalid ICMP echo reply; type: '%T', '%v'", pkt, pkt)
+	if pinfo.OnRecvDup != nil {
+		go pinfo.OnRecvDup(recvpkt)
 	}
 	}
 }
 }
 
 
-func (p *mpingconn) processEchoReply(pkt *icmp.Echo, recv *recvPkt) error {
-	if len(pkt.Data) < 24 {
-		return nil
-	}
-
-	sendtime := int64(binary.BigEndian.Uint64(pkt.Data[:8]))
-	fullseq := int(binary.BigEndian.Uint64(pkt.Data[8:16]))
-	fullid := int(binary.BigEndian.Uint64(pkt.Data[16:24]))
-
-	if fullid%65536 != pkt.ID || fullseq%65536 != pkt.Seq {
-		return nil
-	}
+func (p *mpingconn) OnRecvError(err error) {
+	logger.Error(err)
+}
 
 
+func (p *mpingconn) Ping(pinfo *mpinfo) error {
 	p.mutex.Lock()
 	p.mutex.Lock()
-	pinfo := p.pingidinfo[fullid]
-	p.mutex.Unlock()
-	if pinfo == nil {
-		return nil
+	if _, has := p.pingidinfo[pinfo.id]; !has {
+		p.pingidinfo[pinfo.id] = pinfo
 	}
 	}
-
-	inPkt := &Packet{
-		Host:   pinfo.host,
+	seq := pinfo.lastseq
+	pinfo.lastseq++
+	outpkt := &icmp.Packet{
 		IPAddr: pinfo.ipaddr,
 		IPAddr: pinfo.ipaddr,
 		ID:     pinfo.id,
 		ID:     pinfo.id,
-		Seq:    fullseq,
-		Nbytes: recv.nbytes,
-		TTL:    recv.ttl,
-		Rtt:    recv.recvtime.Sub(time.Unix(0, sendtime)),
-	}
-
-	// fmt.Printf("%s %d bytes from %s: icmp_seq=%d time=%v\n",
-	// 	time.Now().Format("15:04:05.000"), inPkt.Nbytes, inPkt.IPAddr, inPkt.Seq, inPkt.Rtt)
-
-	p.mutex.Lock()
-	chpkt, inflight := pinfo.seqpkt[fullseq]
-	if inflight {
-		// remove it from the list of sequences we're waiting for so we don't get duplicates.
-		delete(pinfo.seqpkt, fullseq)
+		Seq:    seq,
+		Nbytes: pinfo.size,
 	}
 	}
+	pinfo.seqpkt[seq] = outpkt
 	p.mutex.Unlock()
 	p.mutex.Unlock()
 
 
-	if chpkt != nil {
-		chpkt <- inPkt
+	err := p.SendPacket(outpkt)
+	if err != nil {
+		return err
 	}
 	}
+	if pinfo.OnSend != nil {
+		pinfo.OnSend(outpkt)
+	}
+
+	// fmt.Println("sent", outpkt)
+
 	return nil
 	return nil
 }
 }

+ 48 - 34
ping/probing/ping.go

@@ -53,14 +53,15 @@ package probing
 
 
 import (
 import (
 	"errors"
 	"errors"
-	"log"
 	"math"
 	"math"
 	"math/rand"
 	"math/rand"
 	"net"
 	"net"
 	"sync"
 	"sync"
 	"sync/atomic"
 	"sync/atomic"
 	"time"
 	"time"
+	"trial/ping/probing/icmp"
 
 
+	"git.wecise.com/wecise/common/logger"
 	"git.wecise.com/wecise/common/matrix/cfg"
 	"git.wecise.com/wecise/common/matrix/cfg"
 	"github.com/google/uuid"
 	"github.com/google/uuid"
 	"golang.org/x/sync/errgroup"
 	"golang.org/x/sync/errgroup"
@@ -69,8 +70,7 @@ import (
 var mcfg = cfg.MConfig()
 var mcfg = cfg.MConfig()
 var receive_buffer_count = mcfg.GetInt("ping.recv.buf.count", 100)
 var receive_buffer_count = mcfg.GetInt("ping.recv.buf.count", 100)
 var ttl = mcfg.GetInt("ping.ttl", 64)
 var ttl = mcfg.GetInt("ping.ttl", 64)
-var ping_interval_one_host = mcfg.GetDuration("ping.interval.one.host", 200*time.Millisecond)
-var ping_interval_one_conn = mcfg.GetDuration("ping.interval.one.conn", 200*time.Millisecond)
+var ping_interval = mcfg.GetDuration("ping.interval", 1000*time.Millisecond)
 var concurlimit_ping = mcfg.GetInt("concurlimit.ping", 100)
 var concurlimit_ping = mcfg.GetInt("concurlimit.ping", 100)
 var concurlimit_send = mcfg.GetInt("concurlimit.send", 10)
 var concurlimit_send = mcfg.GetInt("concurlimit.send", 10)
 var concurchan_ping = make(chan struct{}, concurlimit_ping)
 var concurchan_ping = make(chan struct{}, concurlimit_ping)
@@ -81,10 +81,8 @@ var lastsendtimemutex sync.Mutex
 var lastsendtime = map[string]time.Time{}
 var lastsendtime = map[string]time.Time{}
 
 
 const (
 const (
-	timeSliceLength  = 8
-	trackerLength    = len(uuid.UUID{})
-	protocolICMP     = 1
-	protocolIPv6ICMP = 58
+	timeSliceLength = 8
+	trackerLength   = len(uuid.UUID{})
 )
 )
 
 
 // New returns a new Pinger struct pointer.
 // New returns a new Pinger struct pointer.
@@ -110,7 +108,6 @@ func New(addr string) *Pinger {
 		protocol:          "udp",
 		protocol:          "udp",
 		awaitingSequences: firstSequence,
 		awaitingSequences: firstSequence,
 		TTL:               64,
 		TTL:               64,
-		logger:            StdLogger{Logger: log.New(log.Writer(), log.Prefix(), log.Flags())},
 	}
 	}
 }
 }
 
 
@@ -209,8 +206,6 @@ type Pinger struct {
 	// protocol is "icmp" or "udp".
 	// protocol is "icmp" or "udp".
 	protocol string
 	protocol string
 
 
-	logger Logger
-
 	TTL int
 	TTL int
 }
 }
 
 
@@ -384,11 +379,6 @@ func (p *Pinger) Privileged() bool {
 	return p.protocol == "icmp"
 	return p.protocol == "icmp"
 }
 }
 
 
-// SetLogger sets the logger to be used to log events from the pinger.
-func (p *Pinger) SetLogger(logger Logger) {
-	p.logger = logger
-}
-
 // SetID sets the ICMP identifier.
 // SetID sets the ICMP identifier.
 func (p *Pinger) SetID(id int) {
 func (p *Pinger) SetID(id int) {
 	p.id = id
 	p.id = id
@@ -431,7 +421,7 @@ func (p *Pinger) Run() error {
 func (p *Pinger) run() error {
 func (p *Pinger) run() error {
 	defer p.finish()
 	defer p.finish()
 
 
-	_, err := mpconn.listen()
+	err := MPConn(p.ipv4, p.protocol).Listen()
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
@@ -452,11 +442,6 @@ func (p *Pinger) run() error {
 
 
 func (p *Pinger) runLoop() error {
 func (p *Pinger) runLoop() error {
 
 
-	logger := p.logger
-	if logger == nil {
-		logger = NoopLogger{}
-	}
-
 	timeout := time.NewTimer(p.Timeout)
 	timeout := time.NewTimer(p.Timeout)
 	interval := time.NewTimer(0)
 	interval := time.NewTimer(0)
 	timeout.Stop()
 	timeout.Stop()
@@ -498,7 +483,7 @@ func (p *Pinger) ping(received chan interface{}) error {
 		time.Sleep(sleeptime)
 		time.Sleep(sleeptime)
 		lastpingtimemutex.Lock()
 		lastpingtimemutex.Lock()
 		alastpingtime := lastpingtime[p.ipaddr.String()]
 		alastpingtime := lastpingtime[p.ipaddr.String()]
-		sleeptime = ping_interval_one_host - time.Since(alastpingtime)
+		sleeptime = ping_interval - time.Since(alastpingtime)
 		if sleeptime <= 0 {
 		if sleeptime <= 0 {
 			lastpingtime[p.ipaddr.String()] = time.Now()
 			lastpingtime[p.ipaddr.String()] = time.Now()
 		} else {
 		} else {
@@ -512,23 +497,56 @@ func (p *Pinger) ping(received chan interface{}) error {
 		lastpingtimemutex.Unlock()
 		lastpingtimemutex.Unlock()
 	}()
 	}()
 
 
-	return mpconn.ping(p.addr, p.ipaddr, p.Size, p.Timeout, func(pkt *Packet) {
+	pinfo := newPingInfo(p.addr, p.ipaddr, p.Size, p.Timeout)
+	pinfo.OnSend = func(pkt *icmp.Packet) {
 		if p.PacketsSent == 0 {
 		if p.PacketsSent == 0 {
 			p.sequence_base = pkt.Seq
 			p.sequence_base = pkt.Seq
 		}
 		}
 		p.PacketsSent++
 		p.PacketsSent++
-		pkt.Seq -= p.sequence_base
 		if p.OnSend != nil {
 		if p.OnSend != nil {
-			p.OnSend(pkt)
+			p.OnSend(&Packet{
+				Rtt:    pkt.Rtt,
+				IPAddr: pkt.IPAddr,
+				Host:   pinfo.host,
+				Nbytes: pkt.Nbytes,
+				Seq:    pkt.Seq - p.sequence_base,
+				TTL:    pkt.TTL,
+				ID:     pkt.ID,
+			})
+		}
+	}
+	pinfo.OnRecv = func(pkt *icmp.Packet) {
+		inpkt := &Packet{
+			Rtt:    pkt.Rtt,
+			IPAddr: pkt.IPAddr,
+			Host:   pinfo.host,
+			Nbytes: pkt.Nbytes,
+			Seq:    pkt.Seq - p.sequence_base,
+			TTL:    pkt.TTL,
+			ID:     pkt.ID,
 		}
 		}
-	}, func(pkt *Packet) {
-		pkt.Seq -= p.sequence_base
 		if p.OnRecv != nil {
 		if p.OnRecv != nil {
-			p.OnRecv(pkt)
+			p.OnRecv(inpkt)
 		}
 		}
-		p.updateStatistics(pkt)
+		p.updateStatistics(inpkt)
 		received <- nil
 		received <- nil
-	})
+	}
+	pinfo.OnRecvDup = func(pkt *icmp.Packet) {
+		inpkt := &Packet{
+			Rtt:    pkt.Rtt,
+			IPAddr: pkt.IPAddr,
+			Host:   pinfo.host,
+			Nbytes: pkt.Nbytes,
+			Seq:    pkt.Seq - p.sequence_base,
+			TTL:    pkt.TTL,
+			ID:     pkt.ID,
+		}
+		if p.OnDuplicateRecv != nil {
+			p.OnDuplicateRecv(inpkt)
+		}
+		p.updateStatistics(inpkt)
+	}
+	return MPConn(p.ipv4, p.protocol).Ping(pinfo)
 }
 }
 
 
 func (p *Pinger) Stop() {
 func (p *Pinger) Stop() {
@@ -605,7 +623,3 @@ var seed = time.Now().UnixNano()
 func getSeed() int64 {
 func getSeed() int64 {
 	return atomic.AddInt64(&seed, 1)
 	return atomic.AddInt64(&seed, 1)
 }
 }
-
-func BufferUsedCount() int {
-	return len(mpconn.recvchan)
-}