libf 2 years ago
parent
commit
af8734478d
13 changed files with 869 additions and 636 deletions
  1. 184 73
      abc/ab_code.c
  2. 7 1
      abc/ab_code.h
  3. 0 2
      abc/log.c
  4. 1 1
      abc/log_test.c
  5. 1 0
      abc/make.sh
  6. 7 0
      ping/host.txt
  7. 34 18
      ping/ping.go
  8. 279 0
      ping/probing/icmp/packetconn.go
  9. 52 0
      ping/probing/icmp/utils.go
  10. 3 13
      ping/probing/packetconn.go
  11. 22 0
      ping/probing/message.go
  12. 185 149
      ping/probing/mpconn.go
  13. 94 379
      ping/probing/ping.go

+ 184 - 73
abc/ab_code.c

@@ -28,6 +28,8 @@
 #include <sys/dir.h>
 #include <sys/stat.h>
 #include <pthread.h>
+#include <sys/types.h>
+#include <regex.h>
 #include "ab_map.h"
 #include "ab_code.h"
 
@@ -39,7 +41,7 @@ static int ablog_keep_hours = 120;
 static FILE *_ablog_file = NULL;
 
 static struct pattern_instance ablog_filename =
-    {"", 0, 0, 0, MAX_FILEPATH_LENGTH, ""};
+    {"", "", "", {0}, 0, 0, 0, MAX_FILEPATH_LENGTH, ""};
 
 static int _ablog_print = 0;
 
@@ -136,6 +138,7 @@ char *code_string(int n)
 
 void clear_coding()
 {
+    clear_pattern_instance(&ablog_filename);
     clear_mapping();
 }
 
@@ -219,6 +222,15 @@ time_t _replace_time_pattern(char *buf, int buf_len, time_t st, char CoS)
     return expiration_t;
 }
 
+void clear_pattern_instance(struct pattern_instance *pi)
+{
+    if (pi->regex[0] != 0)
+    {
+        regfree(&pi->reg);
+        pi->regex[0] = 0;
+    }
+}
+
 int instantiated_pattern(struct pattern_instance *pi)
 {
     time_t vt = vtime();
@@ -232,6 +244,72 @@ int instantiated_pattern(struct pattern_instance *pi)
         replace_string(pi->instance, "[pid]", format_int("%d", getpid()), pi->instance,
                        pi->instance_buf_len);
 
+        if (strcmp(pi->last_pattern, pi->pattern) != 0)
+        {
+            char regex[MAX_FILEPATH_LENGTH];
+            strncpy(regex, pi->pattern, MAX_FILEPATH_LENGTH);
+            // 正则表达式通配符
+            replace_string(regex, "\\", "\\\\", regex, MAX_FILEPATH_LENGTH);
+            replace_string(regex, ".", "\\.", regex, MAX_FILEPATH_LENGTH);
+            replace_string(regex, "|", "\\|", regex, MAX_FILEPATH_LENGTH);
+            replace_string(regex, "(", "\\(", regex, MAX_FILEPATH_LENGTH);
+            replace_string(regex, ")", "\\)", regex, MAX_FILEPATH_LENGTH);
+            replace_string(regex, "{", "\\{", regex, MAX_FILEPATH_LENGTH);
+            replace_string(regex, "}", "\\}", regex, MAX_FILEPATH_LENGTH);
+            replace_string(regex, "*", "\\*", regex, MAX_FILEPATH_LENGTH);
+            replace_string(regex, "+", "\\+", regex, MAX_FILEPATH_LENGTH);
+            replace_string(regex, "?", "\\?", regex, MAX_FILEPATH_LENGTH);
+            replace_string(regex, "^", "\\^", regex, MAX_FILEPATH_LENGTH);
+            replace_string(regex, "$", "\\$", regex, MAX_FILEPATH_LENGTH);
+            // 临时转义[]
+            replace_string(regex, "&", "&a", regex, MAX_FILEPATH_LENGTH);
+            replace_string(regex, "[", "&[", regex, MAX_FILEPATH_LENGTH);
+            replace_string(regex, "]", "&]", regex, MAX_FILEPATH_LENGTH);
+            // 文件名支持定义内容转义
+            replace_string(regex, "&[PID&]", "[0-9]*", regex, MAX_FILEPATH_LENGTH);
+            replace_string(regex, "&[pid&]", "[0-9]*", regex,  MAX_FILEPATH_LENGTH);
+            replace_string(regex, "&[YYYY&]", "[0-9]{4}", regex,   MAX_FILEPATH_LENGTH);
+            replace_string(regex, "&[yyyy&]", "[0-9]{4}", regex,   MAX_FILEPATH_LENGTH);
+            replace_string(regex, "&[MM&]", "[0-9]{2}", regex,   MAX_FILEPATH_LENGTH);
+            replace_string(regex, "&[mm&]", "[0-9]{2}", regex,  MAX_FILEPATH_LENGTH);
+            replace_string(regex, "&[DD&]", "[0-9]{2}", regex,  MAX_FILEPATH_LENGTH);
+            replace_string(regex, "&[dd&]", "[0-9]{2}", regex,
+                           MAX_FILEPATH_LENGTH);
+            replace_string(regex, "&[HH&]", "[0-9]{2}", regex,
+                           MAX_FILEPATH_LENGTH);
+            replace_string(regex, "&[hh&]", "[0-9]{2}", regex,
+                           MAX_FILEPATH_LENGTH);
+            replace_string(regex, "&[MI&]", "[0-9]{2}", regex,
+                           MAX_FILEPATH_LENGTH);
+            replace_string(regex, "&[mi&]", "[0-9]{2}", regex,
+                           MAX_FILEPATH_LENGTH);
+            // []转义
+            replace_string(regex, "&[", "\\[", regex, MAX_FILEPATH_LENGTH);
+            replace_string(regex, "&]", "\\]", regex, MAX_FILEPATH_LENGTH);
+            // 恢复临时转义
+            replace_string(regex, "&a", "&", regex, MAX_FILEPATH_LENGTH);
+            if (pi->regex[0] != 0)
+            {
+                regfree(&pi->reg);
+                pi->regex[0] = 0;
+            }
+            if (regex[0] != 0)
+            {
+                int reti = regcomp(&pi->reg, regex, REG_EXTENDED);
+                if (reti != 0)
+                {
+                    char errbuf[256];
+                    regerror(reti, &pi->reg, errbuf, sizeof(errbuf));
+                    log_println("ABLOG_INIT", __FILE__, __LINE__, "ERROR:%s", errbuf);
+                }
+                else
+                {
+                    strncpy(pi->regex, regex, MAX_FILEPATH_LENGTH);
+                }
+            }
+            strncpy(pi->last_pattern, pi->pattern, MAX_FILEPATH_LENGTH);
+        }
+
         pi->expiration_ct = _replace_time_pattern(pi->instance, pi->instance_buf_len, ct, 'C');
         pi->expiration_vt = _replace_time_pattern(pi->instance, pi->instance_buf_len, vt, 'V');
 
@@ -242,6 +320,28 @@ int instantiated_pattern(struct pattern_instance *pi)
     return 0;
 }
 
+int matched_logfile_pattern(struct pattern_instance *pi, char *filepath)
+{
+    if (pi->regex[0] != 0)
+    {
+        int reti = regexec(&pi->reg, filepath, 0, NULL, 0);
+        if (reti == 0)
+        {
+            // log_println("I", __FILE__, __LINE__, "%s match %s", filepath, pi->regex);
+            return 1; // 匹配成功
+        }
+        if (reti == REG_NOMATCH)
+        {
+            // log_println("I", __FILE__, __LINE__, "%s not match %s", filepath, pi->regex);
+            return 0; // 不匹配
+        }
+        char errbuf[256];
+        regerror(reti, &pi->reg, errbuf, sizeof(errbuf));
+        log_println("E", __FILE__, __LINE__, "%s", errbuf);
+    }
+    return 0;
+}
+
 int lock_file(char *filename, int wait)
 {
     int lockfd = open(filename, O_RDWR | O_CREAT, 0666);
@@ -312,16 +412,18 @@ int _ablog_lock(char *dir)
     return ret;
 }
 
-int _ablog_rm_old_files(char *dir, time_t time_before)
+int _ablog_rm_old_files(struct pattern_instance *pi, char *dir, time_t time_before)
 {
-    if (!_ablog_lock(dir))
-    {
-        return -1;
-    }
+    // if (!_ablog_lock(dir))
+    // {
+    //     // log_println("ABLOG_FILE", __FILE__, __LINE__, "lock dir failed: %s", dir);
+    //     return -1;
+    // }
 
     DIR *od = opendir(dir);
     if (od == NULL)
     {
+        // log_println("ABLOG_FILE", __FILE__, __LINE__, "open dir failed: %s", dir);
         return -1;
     }
 
@@ -335,15 +437,18 @@ int _ablog_rm_old_files(char *dir, time_t time_before)
         if (ent->d_name[0] != '.')
         {
             sprintf(file, "%s%s", dir, ent->d_name);
-            stat(file, &file_stat);
-            // printf("%32s  %s  ", namelist[i]->d_name, time_string(file_stat.st_mtime));
-            if (S_ISREG(file_stat.st_mode) && file_stat.st_mtime < time_before)
+            if (matched_logfile_pattern(pi, file))
             {
-                remove(file);
-                n++;
-                log_println("ABLOG_FILE", __FILE__, __LINE__, "remove old log file: %s", file);
+                stat(file, &file_stat);
+                // printf("%32s  %s  ", namelist[i]->d_name, time_string(file_stat.st_mtime));
+                if (S_ISREG(file_stat.st_mode) && file_stat.st_mtime < time_before)
+                {
+                    remove(file);
+                    n++;
+                    log_println("ABLOG_FILE", __FILE__, __LINE__, "remove old log file: %s", file);
+                }
+                // printf("\n");
             }
-            // printf("\n");
         }
     }
 
@@ -359,7 +464,8 @@ int _ablog_rm_old_files(char *dir, time_t time_before)
 
 int remove_old_log_files(time_t time_before)
 {
-    if (strlen(ablog_filename.instance) == 0) {
+    if (strlen(ablog_filename.instance) == 0)
+    {
         return 0;
     }
     char buf[MAX_FILEPATH_LENGTH];
@@ -374,7 +480,7 @@ int remove_old_log_files(time_t time_before)
     {
         strcpy(buf, "./");
     }
-    return _ablog_rm_old_files(buf, time_before);
+    return _ablog_rm_old_files(&ablog_filename, buf, time_before);
 }
 
 void _ablog_set_log_file()
@@ -387,7 +493,8 @@ void _ablog_set_log_file()
         }
         if (strlen(ablog_filename.instance) > 0)
         {
-            if (mk_parent_dir(ablog_filename.instance) != 0) {
+            if (mk_parent_dir(ablog_filename.instance) != 0)
+            {
                 _ablog_stdout = 1;
                 log_println("ABLOG_FILE", __FILE__, __LINE__, "make parent dir error, '%s'",
                             ablog_filename.instance);
@@ -478,14 +585,16 @@ int get_log_print_level()
  */
 void log_println(const char *func, const char *file, int line, const char *format, ...)
 {
-    if (ablog_file()) {
+    if (ablog_file())
+    {
         va_list vl;
         va_start(vl, format);
         log_println_vl(1, func, file, line, format, vl);
         va_end(vl);
     }
 
-    if (ablog_stdout()) {
+    if (ablog_stdout())
+    {
         va_list vl;
         va_start(vl, format);
         log_println_vl(0, func, file, line, format, vl);
@@ -499,17 +608,18 @@ void log_println_vl(int fileout, const char *func, const char *file, int line, c
     gettimeofday(&tv, NULL);
 
     FILE *fout = stdout;
-    if (fileout) {
+    if (fileout)
+    {
         _ablog_set_log_file();
         fout = _ablog_file;
-        if (!fout) {
+        if (!fout)
+        {
             return;
         }
     }
 
     pthread_mutex_lock(&thread_mut);
 
-
     fprintf(fout, "%s.%03ld[%s]:", time_string(tv.tv_sec), (tv.tv_usec / 1000l), func);
     vfprintf(fout, format, vl);
     fprintf(fout, " (%s:%d)\n", file, line);
@@ -997,57 +1107,57 @@ char *trimLeft(const char *str, char *rbuf)
     return rbuf;
 }
 
-char *trim(const char *str, char *rbuf)
-{
-    if (rbuf == 0)
-    {
-        rbuf = (char *)str;
-    }
-    trimLeft(str, rbuf);
-    trimRight(rbuf, rbuf);
-    return rbuf;
-}
-
-long read_mapping_file(int fd)
-{
-    ABLOG_Entry(Public, read_mapping_file);
-
-    long ret_map_id = new_mapping();
-
-    int flen = lseek(fd, 0, SEEK_END);
-    char *buf = MALLOC(flen + 1);
-    memset(buf, 0, flen + 1);
-    lseek(fd, 0, SEEK_SET);
-    read(fd, buf, flen);
-
-    char *line_buffer = buf;
-    while (line_buffer[0])
-    {
-        char *newline = strstr(line_buffer, "\n");
-        if (newline)
-        {
-            newline[0] = 0;
-        }
-        trimLeft(line_buffer, 0);
-        if (line_buffer[0] != '#' && strlen(line_buffer) > 0)
-        {
-            char *kv[2];
-            split_string(line_buffer, '=', line_buffer, flen + 1, kv, 2);
-
-            char *key = trim(kv[0], 0);
-            char *value = trim(kv[1], 0);
-
-            ABLOG_Printf(Detail, (ABLOG, "[D] %s=%s", key, value));
-
-            put_string_mapping(ret_map_id, key, value);
-        }
-        line_buffer = newline ? newline + 1 : 0;
-    }
-
-    FREE(buf);
-
-    ABLOG_Return_Long(ret_map_id);
-}
+// char *trim(const char *str, char *rbuf)
+// {
+//     if (rbuf == 0)
+//     {
+//         rbuf = (char *)str;
+//     }
+//     trimLeft(str, rbuf);
+//     trimRight(rbuf, rbuf);
+//     return rbuf;
+// }
+
+// long read_mapping_file(int fd)
+// {
+//     ABLOG_Entry(Public, read_mapping_file);
+
+//     long ret_map_id = new_mapping();
+
+//     int flen = lseek(fd, 0, SEEK_END);
+//     char *buf = MALLOC(flen + 1);
+//     memset(buf, 0, flen + 1);
+//     lseek(fd, 0, SEEK_SET);
+//     read(fd, buf, flen);
+
+//     char *line_buffer = buf;
+//     while (line_buffer[0])
+//     {
+//         char *newline = strstr(line_buffer, "\n");
+//         if (newline)
+//         {
+//             newline[0] = 0;
+//         }
+//         trimLeft(line_buffer, 0);
+//         if (line_buffer[0] != '#' && strlen(line_buffer) > 0)
+//         {
+//             char *kv[2];
+//             split_string(line_buffer, '=', line_buffer, flen + 1, kv, 2);
+
+//             char *key = trim(kv[0], 0);
+//             char *value = trim(kv[1], 0);
+
+//             ABLOG_Printf(Detail, (ABLOG, "[D] %s=%s", key, value));
+
+//             put_string_mapping(ret_map_id, key, value);
+//         }
+//         line_buffer = newline ? newline + 1 : 0;
+//     }
+
+//     FREE(buf);
+
+//     ABLOG_Return_Long(ret_map_id);
+// }
 
 int write_mapping_file(int fd, long mid)
 {
@@ -1075,7 +1185,8 @@ int mk_dir(char *dir)
     if ((mydir = opendir(dir)) == NULL) // 判断目录
     {
         int ret = mk_parent_dir(dir);
-        if (ret != 0) {
+        if (ret != 0)
+        {
             return ret;
         }
         ret = mkdir(dir, DIR_MODE); // 创建目录

+ 7 - 1
abc/ab_code.h

@@ -18,6 +18,7 @@
 #ifndef _AB_CODE_H_
 #define _AB_CODE_H_
 
+#include <regex.h>
 
 #ifndef TRUE
 #define TRUE (1==1)
@@ -162,6 +163,9 @@ time_t vtime();
 struct pattern_instance
 {
 	char pattern[MAX_FILEPATH_LENGTH];
+	char last_pattern[MAX_FILEPATH_LENGTH];
+	char regex[MAX_FILEPATH_LENGTH];
+	regex_t reg; //pattern转换为正则表达式
 	time_t instantiated_time; //实例化时间
 	time_t expiration_ct; //文件实例名过期时间,以系统时间表示
 	time_t expiration_vt; //文件实例名过期时间,以虚拟时间表示
@@ -175,6 +179,8 @@ void set_log_stdout(int yn);
 int ablog_stdout();
 int ablog_file();
 
+void clear_pattern_instance(struct pattern_instance *pi);
+
 /**
  * 实例化一个模式字符串
  * 模式中可以替换内容包括
@@ -262,7 +268,7 @@ char* trimRight(const char *str, char *rbuf);
  * 去掉字符串 str 两边的空白字符并存入 rbuf, 返回 rbuf
  * rbuf的可用内存大小必须大于 strlen(str)
  */
-char* trim(const char *str, char *rbuf);
+// char* trim(const char *str, char *rbuf);
 /**
  * 读mapping文件
  */

+ 0 - 2
abc/log.c

@@ -128,8 +128,6 @@ static void init_event(log_Event *ev, void *udata)
     ev->udata = udata;
 }
 
-static int ablog_level = -1;
-
 void log_log(int level, const char *file, int line, const char *fmt, ...)
 {
     if (level < LOG_TRACE || level > LOG_FATAL)

+ 1 - 1
abc/log_test.c

@@ -25,7 +25,7 @@ int main(int argc, char *argv[])
     log_error("https://github.com/rxi/log.c 开源日志输出库原始输出样式");
     log_fatal("https://github.com/rxi/log.c 开源日志输出库原始输出样式");
 
-    set_log_filename("./_logs/test[yyyy][mm][dd].txt"); // 设置日志输出文件名
+    set_log_filename("./_logs/test[{(&*+|?^$)}}][PID][yyyy][mm][dd][hh][mi].txt"); // 设置日志输出文件名
     set_log_keephours(1); // 设置日志文件保留小时数
     set_log_stdout(1); // 打开标准输出信息打印
     log_set_quiet(1); // 关闭标准错误信息打印

+ 1 - 0
abc/make.sh

@@ -1,3 +1,4 @@
 
 echo "build ..."
 gcc -o log_test -I../include log_test.c log.c ab_map.c ab_code.c
+

+ 7 - 0
ping/host.txt

@@ -34,3 +34,10 @@ www.qyer.com,www.lvmama.com,www.csair.com,www.chinahr.com,www.lagou.com,www.liep
 www.eastmoney.com,www.stockstar.com,www.sina.com.cn,www.cnfol.com,www.xueqiu.com,www.fang.com,www.anjuke.com
 www.lianjia.com,www.ganji.com,www.xiachufang.com,www.baixing.com,www.dianping.com,www.meituan.com,www.tianyancha.com
 www.huanqiu.com,www.guancha.cn,www.lottery.gov.cn,www.cwl.gov.cn,www.icbc.com.cn,www.boc.com,www.ccb.com
+live.com,pan.baidu.com,hexun.com,
+kugou.com
+taihe.com
+baixing.com
+115.com
+le.com,wo.cn,yeah.net
+weiyun.com,weibo.com,189.cn,jianguoyun.com

+ 34 - 18
ping/ping.go

@@ -48,7 +48,8 @@ type InputConfig struct {
 type StatInfo struct {
 	MinRtt    time.Duration
 	MaxRtt    time.Duration
-	LossTimes int
+	AvgRtt    time.Duration
+	LossCount int
 	Count     int
 }
 
@@ -74,7 +75,7 @@ func main() {
 	}
 	inputcfg.Poolsize = mcfg.GetInt("poolsize", inputcfg.Poolsize)
 	fips := func() []string {
-		xips := mcfg.GetStrings("ip|ping.ip", "127.0.0.1")
+		xips := mcfg.GetStrings("ip|ping.ip", "")
 		sips := strset.New()
 		for _, aips := range xips {
 			sips.Add(strings.Split(aips, ",")...)
@@ -102,6 +103,7 @@ func main() {
 	input.Init(inputcfg)
 	go input.Run()
 	go func() {
+		// last_count := 0
 		t := time.NewTicker(1 * time.Second)
 		for {
 			select {
@@ -115,11 +117,16 @@ func main() {
 				sort.Strings(ks)
 				for i, k := range ks {
 					v := input.statinfo[k]
-					s += fmt.Sprintf("%-3d %-20s: %s ~ %s loss %d/%d\n", i, k, v.MinRtt, v.MaxRtt, v.LossTimes, v.Count)
+					s += fmt.Sprintf("%-3d %-20s: %-12s [%12s ~ %-12s] loss %d/%d\n", i, k, v.AvgRtt, v.MinRtt, v.MaxRtt, v.LossCount, v.Count)
 				}
 				input.statinfomutex.Unlock()
-				logger.Info("统计信息更新:", fmt.Sprint("\n", s))
-				logger.Info(util.FormatDuration(time.Since(starttime)), "已经完成", pingcount, "次Ping操作,平均每秒", (int64(pingcount+1) * int64(time.Second) / int64(time.Since(starttime))), "次")
+				// logger.Info("统计信息更新:", fmt.Sprint("\n", s))
+				// logger.Info(util.FormatDuration(time.Since(starttime)), "已经完成", pingcount, "次Ping操作,",
+				// 	"平均每秒", (int64(pingcount+1) * int64(time.Second) / int64(time.Since(starttime))), "次",
+				// 	"最近一秒", (pingcount - int32(last_count)), "次",
+				// 	"缓存", probing.BufferUsedCount(),
+				// )
+				// last_count = int(pingcount)
 			}
 		}
 	}()
@@ -135,9 +142,9 @@ func main() {
 		if ip != "" {
 			input.workChan <- &task{
 				Server:         ip,
-				Timeout:        mcfg.GetInt("timeout", 3),
-				NumberOfPings:  mcfg.GetInt("count", 50), // 至少为2,小于2会导致不能正常结束
-				PacketInterval: mcfg.GetInt("interval", 200),
+				Timeout:        mcfg.GetInt("timeout", 1),
+				NumberOfPings:  mcfg.GetInt("count", 5), // 至少为2,小于2会导致不能正常结束
+				PacketInterval: mcfg.GetInt("interval", 1000),
 				PacketSize:     mcfg.GetInt("size", 32),
 			}
 		}
@@ -255,11 +262,14 @@ func (input *Input) send(t *task, workerNum int) error {
 		}
 		var (
 			consecutiveFailures int
-			recvList            []map[int]bool
 		)
+		var recvMutex sync.Mutex
+		var recvList = map[int]int{}
 
 		pinger.OnSend = func(pkt *probing.Packet) {
-			recvList = append(recvList, map[int]bool{pkt.Seq: false})
+			recvMutex.Lock()
+			recvList[pkt.Seq]++
+			recvMutex.Unlock()
 			input.statinfomutex.Lock()
 			si := input.statinfo[t.Server]
 			if si == nil {
@@ -271,7 +281,9 @@ func (input *Input) send(t *task, workerNum int) error {
 		}
 		pingMsg := fmt.Sprintf("\nWoker %d PING %s (%s):\n", workerNum, pinger.Addr(), pinger.IPAddr())
 		pinger.OnRecv = func(pkt *probing.Packet) {
-			recvList[pkt.Seq][pkt.Seq] = true
+			recvMutex.Lock()
+			recvList[pkt.Seq]++
+			recvMutex.Unlock()
 			s := fmt.Sprintf("%s %d bytes from %s: icmp_seq=%d time=%v\n",
 				time.Now().Format("15:04:05.000"), pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt)
 			pingMsg += s
@@ -280,7 +292,9 @@ func (input *Input) send(t *task, workerNum int) error {
 			atomic.AddInt32(&pingcount, 1)
 		}
 		pinger.OnDuplicateRecv = func(pkt *probing.Packet) {
-			recvList[pkt.Seq][pkt.Seq] = true
+			recvMutex.Lock()
+			recvList[pkt.Seq]++
+			recvMutex.Unlock()
 			s := fmt.Sprintf("%s %d bytes from %s: icmp_seq=%d time=%v (DUP!)\n",
 				time.Now().Format("15:04:05.000"), pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt)
 			pingMsg += s
@@ -309,7 +323,8 @@ func (input *Input) send(t *task, workerNum int) error {
 				si = &StatInfo{
 					MinRtt:    stats.MinRtt,
 					MaxRtt:    stats.MaxRtt,
-					LossTimes: 0,
+					AvgRtt:    stats.AvgRtt,
+					LossCount: 0,
 					Count:     0,
 				}
 				input.statinfo[t.Server] = si
@@ -320,9 +335,8 @@ func (input *Input) send(t *task, workerNum int) error {
 			if stats.MaxRtt > si.MaxRtt {
 				si.MaxRtt = stats.MaxRtt
 			}
-			if stats.PacketLoss > 0 {
-				si.LossTimes++
-			}
+			si.AvgRtt = stats.AvgRtt
+			si.LossCount += stats.PacketsSent - stats.PacketsRecv
 			input.statinfomutex.Unlock()
 		}
 		m["requestTime"] = time.Now().UnixNano() / int64(time.Millisecond)
@@ -336,8 +350,9 @@ func (input *Input) send(t *task, workerNum int) error {
 		m["responseTime"] = time.Now().UnixNano() / int64(time.Millisecond)
 
 		var failCount, totalFailCount int
+		recvMutex.Lock()
 		for i := range recvList {
-			if !recvList[i][i] {
+			if recvList[i] < 2 {
 				failCount++
 				message = "Packet loss"
 				totalFailCount++
@@ -347,13 +362,14 @@ func (input *Input) send(t *task, workerNum int) error {
 			if failCount > consecutiveFailures {
 				consecutiveFailures = failCount
 			}
-			pingMsg += fmt.Sprintf("icmp_seq:%d %t\n", i, recvList[i][i])
+			pingMsg += fmt.Sprintf("icmp_seq:%d %t\n", i, (recvList[i] >= 2))
 		}
 		// logger.Debug(pingMsg)
 		m["consecutiveFailures"] = consecutiveFailures
 		if totalFailCount == len(recvList) {
 			message = "ICMP echo failed"
 		}
+		recvMutex.Unlock()
 	}
 
 	// Special

+ 279 - 0
ping/probing/icmp/packetconn.go

@@ -0,0 +1,279 @@
+package icmp
+
+import (
+	"encoding/binary"
+	"fmt"
+	"net"
+	"sync"
+	"syscall"
+	"time"
+
+	"golang.org/x/net/icmp"
+	"golang.org/x/net/ipv4"
+	"golang.org/x/net/ipv6"
+)
+
+const (
+	protocolICMP     = 1
+	protocolIPv6ICMP = 58
+)
+
+var ENOLISTENER = fmt.Errorf("no listener")
+
+type Type icmp.Type
+
+type PacketConn interface {
+	Close() error
+	ICMPRequestType() Type
+	ReadFrom(b []byte) (n int, ttl int, src net.Addr, err error)
+	SetFlagTTL() error
+	SetReadDeadline(t time.Time) error
+	WriteTo(b []byte, dst net.Addr) (int, error)
+	SetTTL(ttl int)
+}
+
+var (
+	ipv4Proto = map[string]string{"icmp": "ip4:icmp", "udp": "udp4"}
+	ipv6Proto = map[string]string{"icmp": "ip6:ipv6-icmp", "udp": "udp6"}
+)
+
+func Listen(ipv4 bool, protocol string, source string) (conn PacketConn, err error) {
+	if ipv4 {
+		var c icmpv4Conn
+		c.c, err = icmp.ListenPacket(ipv4Proto[protocol], source)
+		conn = &c
+	} else {
+		var c icmpV6Conn
+		c.c, err = icmp.ListenPacket(ipv6Proto[protocol], source)
+		conn = &c
+	}
+	return
+}
+
+// Packet represents a received and processed ICMP echo packet.
+type Packet struct {
+	// IPAddr is the address of the host being pinged.
+	IPAddr *net.IPAddr
+	// ID is the ICMP identifier.
+	ID int
+	// Seq is the ICMP sequence number.
+	Seq int
+	// TTL is the Time To Live on the packet.
+	TTL int
+	// NBytes is the number of bytes in the message.
+	Nbytes int
+	// Rtt is the round-trip time it took to ping.
+	Rtt time.Duration
+}
+
+type recvPkt struct {
+	recvtime time.Time
+	addr     net.Addr
+	bytes    []byte
+	nbytes   int
+	ttl      int
+}
+
+type MPacketConn struct {
+	IPV4         bool
+	Protocol     string
+	Source       string
+	Backlog      int
+	OnRecvPacket func(addr net.Addr, pkt *Packet)
+	OnRecvError  func(error)
+	mutex        sync.Mutex
+	conn         PacketConn
+	done         chan interface{}
+	recvbuf      chan *recvPkt
+}
+
+func (mp *MPacketConn) Listen() error {
+	conn, err := Listen(mp.IPV4, mp.Protocol, mp.Source)
+	if err != nil {
+		return err
+	}
+
+	conn.SetTTL(64)
+	if err := conn.SetFlagTTL(); err != nil {
+		return err
+	}
+
+	mp.done = make(chan interface{})
+	mp.recvbuf = make(chan *recvPkt, mp.Backlog)
+	mp.conn = conn
+	go mp.recvICMP()
+	go mp.processRecvPacket()
+
+	return nil
+}
+
+func (mp *MPacketConn) Close() error {
+	mp.mutex.Lock()
+	defer mp.mutex.Unlock()
+
+	open := true
+	select {
+	case _, open = <-mp.done:
+	default:
+	}
+
+	if open {
+		close(mp.done)
+	}
+	return mp.conn.Close()
+}
+
+func (mp *MPacketConn) recvICMP() {
+	bytes := make([]byte, 65536)
+	for {
+		select {
+		case <-mp.done:
+			return
+		default:
+			var n, ttl int
+			var addr net.Addr
+			var err error
+			n, ttl, addr, err = mp.conn.ReadFrom(bytes)
+			if err != nil {
+				if neterr, ok := err.(*net.OpError); ok {
+					if neterr.Timeout() {
+						// Read timeout
+						continue
+					}
+				}
+				if mp.OnRecvError != nil {
+					mp.OnRecvError(err)
+				} else {
+					fmt.Println(err)
+				}
+			}
+			bs := make([]byte, n)
+			copy(bs, bytes[:n])
+			select {
+			case <-mp.done:
+				return
+			case mp.recvbuf <- &recvPkt{recvtime: time.Now(), addr: addr, bytes: bs, nbytes: n, ttl: ttl}:
+			}
+		}
+	}
+}
+
+func (mp *MPacketConn) SendPacket(pkt *Packet, addr *net.IPAddr) error {
+	if mp.conn == nil {
+		return ENOLISTENER
+	}
+	msgBytes, err := BuildEchoRequestMessage(pkt.ID, pkt.Seq, pkt.Nbytes, mp.conn.ICMPRequestType())
+	if err != nil {
+		return err
+	}
+	var dst net.Addr = addr
+	if mp.Protocol == "udp" {
+		dst = &net.UDPAddr{IP: addr.IP, Zone: addr.Zone}
+	}
+	for {
+		select {
+		case <-mp.done:
+			return nil
+		default:
+		}
+		if _, err := mp.conn.WriteTo(msgBytes, dst); err != nil {
+			if neterr, ok := err.(*net.OpError); ok {
+				if neterr.Err == syscall.ENOBUFS {
+					if mp.OnRecvError != nil {
+						mp.OnRecvError(neterr.Err)
+					} else {
+						fmt.Println("缓存不够,发送失败,重发")
+					}
+					continue
+				}
+			}
+			return err
+		} else {
+			return nil
+		}
+	}
+}
+
+func (mp *MPacketConn) processRecvPacket() {
+	for pkt := range mp.recvbuf {
+		err := mp.processPacket(pkt)
+		if err != nil {
+			if mp.OnRecvError != nil {
+				mp.OnRecvError(err)
+			} else {
+				fmt.Println(err)
+			}
+		}
+	}
+}
+
+var count = 0
+
+func (mp *MPacketConn) processPacket(recv *recvPkt) error {
+	var proto int
+	if mp.IPV4 {
+		proto = protocolICMP
+	} else {
+		proto = protocolIPv6ICMP
+	}
+
+	// fmt.Println(count, "from", recv.addr.String(), "bytes", recv.bytes)
+
+	var m *icmp.Message
+	var err error
+	if m, err = icmp.ParseMessage(proto, recv.bytes); err != nil {
+		return fmt.Errorf("error parsing icmp message: %w", err)
+	}
+
+	if m.Type != ipv4.ICMPTypeEchoReply && m.Type != ipv6.ICMPTypeEchoReply {
+		// Not an echo reply, ignore it
+		return nil
+	}
+
+	switch pkt := m.Body.(type) {
+	case *icmp.Echo:
+		return mp.processEchoReply(pkt, recv)
+	default:
+		// Very bad, not sure how this can happen
+		return fmt.Errorf("invalid ICMP echo reply; type: '%T', '%v'", pkt, pkt)
+	}
+}
+
+func (mp *MPacketConn) processEchoReply(pkt *icmp.Echo, recv *recvPkt) error {
+	if len(pkt.Data) < 24 {
+		return nil
+	}
+
+	sendtime := int64(binary.BigEndian.Uint64(pkt.Data[:8]))
+	fullseq := int(binary.BigEndian.Uint64(pkt.Data[8:16]))
+	fullid := int(binary.BigEndian.Uint64(pkt.Data[16:24]))
+
+	if fullid%65536 != pkt.ID || fullseq%65536 != pkt.Seq {
+		return nil
+	}
+
+	inPkt := &Packet{
+		Addr:   recv.addr.String(),
+		ID:     fullid,
+		Seq:    fullseq,
+		Nbytes: recv.nbytes,
+		TTL:    recv.ttl,
+		Rtt:    recv.recvtime.Sub(time.Unix(0, sendtime)),
+	}
+
+	// fmt.Printf("%s %d bytes from %s: icmp_seq=%d time=%v\n",
+	// 	time.Now().Format("15:04:05.000"), inPkt.Nbytes, inPkt.IPAddr, inPkt.Seq, inPkt.Rtt)
+
+	p.mutex.Lock()
+	chpkt, inflight := pinfo.seqpkt[fullseq]
+	if inflight {
+		// remove it from the list of sequences we're waiting for so we don't get duplicates.
+		delete(pinfo.seqpkt, fullseq)
+	}
+	p.mutex.Unlock()
+
+	if chpkt != nil {
+		chpkt <- inPkt
+	}
+	return nil
+}

+ 52 - 0
ping/probing/icmp/utils.go

@@ -0,0 +1,52 @@
+package icmp
+
+import (
+	"bytes"
+	"encoding/binary"
+	"net"
+	"time"
+
+	"golang.org/x/net/icmp"
+)
+
+func netAddrToIPAddr(a net.Addr) *net.IPAddr {
+	switch v := a.(type) {
+	case *net.UDPAddr:
+		if ip := v.IP.To4(); ip != nil {
+			return ip
+		}
+	case *net.IPAddr:
+		if ip := v.IP.To4(); ip != nil {
+			return ip
+		}
+	}
+	return nil
+}
+
+func BuildEchoRequestMessage(id, seq, size int, icmptype Type) ([]byte, error) {
+	if size < 24 {
+		size = 24
+	}
+	bs := bytes.Repeat([]byte{1}, size)
+	binary.BigEndian.PutUint64(bs, uint64(time.Now().UnixNano()))
+	binary.BigEndian.PutUint64(bs[8:], uint64(seq))
+	binary.BigEndian.PutUint64(bs[16:], uint64(id))
+
+	body := &icmp.Echo{
+		ID:   id % 65535,
+		Seq:  seq % 65536,
+		Data: bs,
+	}
+
+	msg := &icmp.Message{
+		Type: icmptype,
+		Code: 0,
+		Body: body,
+	}
+
+	msgBytes, err := msg.Marshal(nil)
+	if err != nil {
+		return nil, err
+	}
+	return msgBytes, nil
+}

+ 3 - 13
ping/probing/packetconn.go

@@ -1,4 +1,4 @@
-package probing
+package icmp
 
 import (
 	"net"
@@ -10,16 +10,6 @@ import (
 	"golang.org/x/net/ipv6"
 )
 
-type packetConn interface {
-	Close() error
-	ICMPRequestType() icmp.Type
-	ReadFrom(b []byte) (n int, ttl int, src net.Addr, err error)
-	SetFlagTTL() error
-	SetReadDeadline(t time.Time) error
-	WriteTo(b []byte, dst net.Addr) (int, error)
-	SetTTL(ttl int)
-}
-
 type icmpConn struct {
 	c   *icmp.PacketConn
 	ttl int
@@ -73,7 +63,7 @@ func (c *icmpv4Conn) ReadFrom(b []byte) (int, int, net.Addr, error) {
 	return n, ttl, src, err
 }
 
-func (c icmpv4Conn) ICMPRequestType() icmp.Type {
+func (c icmpv4Conn) ICMPRequestType() Type {
 	return ipv4.ICMPTypeEcho
 }
 
@@ -98,6 +88,6 @@ func (c *icmpV6Conn) ReadFrom(b []byte) (int, int, net.Addr, error) {
 	return n, ttl, src, err
 }
 
-func (c icmpV6Conn) ICMPRequestType() icmp.Type {
+func (c icmpV6Conn) ICMPRequestType() Type {
 	return ipv6.ICMPTypeEchoRequest
 }

+ 22 - 0
ping/probing/message.go

@@ -0,0 +1,22 @@
+package probing
+
+import (
+	"net"
+	"time"
+)
+
+// Packet represents a received and processed ICMP echo packet.
+type xPacket struct {
+	// IPAddr is the address of the host being pinged.
+	IPAddr *net.IPAddr
+	// ID is the ICMP identifier.
+	ID int
+	// Seq is the ICMP sequence number.
+	Seq int
+	// TTL is the Time To Live on the packet.
+	TTL int
+	// NBytes is the number of bytes in the message.
+	Nbytes int
+	// Rtt is the round-trip time it took to ping.
+	Rtt time.Duration
+}

+ 185 - 149
ping/probing/mpconn.go

@@ -1,43 +1,70 @@
 package probing
 
 import (
-	"bytes"
+	"encoding/binary"
 	"fmt"
 	"net"
 	"sync"
 	"sync/atomic"
 	"syscall"
 	"time"
+	"trial/ping/probing/icmp"
 
-	"github.com/google/uuid"
-	"golang.org/x/net/icmp"
 	"golang.org/x/net/ipv4"
 	"golang.org/x/net/ipv6"
 )
 
+type recvPkt struct {
+	recvtime time.Time
+	addr     net.Addr
+	bytes    []byte
+	nbytes   int
+	ttl      int
+}
+
 type mpingconn struct {
-	mutex    sync.Mutex
-	ipv4     bool
-	protocol string
-	Source   string
-	Conn     packetConn
-	done     chan interface{}
-	pingid   map[string]int
-	pingseq  map[int]int
-	pingfcb  map[int]func(*Packet)
+	mutex        sync.Mutex
+	ipv4         bool
+	protocol     string
+	Source       string
+	Conn         icmp.PacketConn
+	done         chan interface{}
+	recvchan     chan *recvPkt
+	pinghostinfo map[string]*mpinfo
+	pingidinfo   map[int]*mpinfo
+	OnError      func(error)
+}
+
+type mpinfo struct {
+	host        string
+	ipaddr      *net.IPAddr
+	id          int
+	lastseq     int
+	size        int
+	timeout     time.Duration
+	seqpkt      map[int]chan *Packet
+	OnSend      func(*Packet)
+	OnRecv      func(*Packet)
+	OnRecvDup   func(*Packet)
+	OnRecvError func(error)
 }
 
-var mpconn = &mpingconn{
-	ipv4:     true,
-	protocol: "udp",
-	Source:   "",
-	done:     make(chan interface{}),
-	pingid:   make(map[string]int),
-	pingseq:  make(map[int]int),
-	pingfcb:  make(map[int]func(*Packet)),
+var mpconn = newMPConn()
+
+func newMPConn() *mpingconn {
+	mpconn := &mpingconn{
+		ipv4:         true,
+		protocol:     "udp",
+		Source:       "",
+		done:         make(chan interface{}),
+		recvchan:     make(chan *recvPkt, receive_buffer_count),
+		pinghostinfo: make(map[string]*mpinfo),
+		pingidinfo:   make(map[int]*mpinfo),
+	}
+	return mpconn
 }
 
-func (p *mpingconn) listen() (packetConn, error) {
+func (p *mpingconn) listen() (icmp.PacketConn, error) {
 	p.mutex.Lock()
 	defer p.mutex.Unlock()
 
@@ -45,29 +72,19 @@ func (p *mpingconn) listen() (packetConn, error) {
 		return p.Conn, nil
 	}
 
-	var (
-		conn packetConn
-		err  error
-	)
-
-	if p.ipv4 {
-		var c icmpv4Conn
-		c.c, err = icmp.ListenPacket(ipv4Proto[p.protocol], p.Source)
-		conn = &c
-	} else {
-		var c icmpV6Conn
-		c.c, err = icmp.ListenPacket(ipv6Proto[p.protocol], p.Source)
-		conn = &c
-	}
-
+	conn, err := icmp.Listen(p.ipv4, p.protocol, p.Source)
 	if err != nil {
 		return nil, err
 	}
 	p.Conn = conn
 
-	go func() {
-		p.recvICMP()
-	}()
+	conn.SetTTL(64)
+	if err := conn.SetFlagTTL(); err != nil {
+		return nil, err
+	}
+
+	go p.recvICMP()
+	go p.processRecvPacket()
 
 	return p, nil
 }
@@ -113,72 +130,98 @@ func (p *mpingconn) SetTTL(ttl int) {
 }
 
 var pingid int32
-
-func newPingID() int {
-	return int(atomic.AddInt32(&pingid, 1))
-}
-
-func (p *mpingconn) ping(addr *net.IPAddr, size int, timeout time.Duration, onSend func(*Packet), onRecv func(*Packet)) (time.Duration, error) {
-	currentUUID, err := uuid.NewUUID()
-	if err != nil {
-		return -1, fmt.Errorf("NewUUID: %w", err)
-	}
-	uuidEncoded, err := currentUUID.MarshalBinary()
-	if err != nil {
-		return -1, fmt.Errorf("unable to marshal UUID binary: %w", err)
+var ETIMEDOUT error = fmt.Errorf("timeout")
+
+func newPingInfo(host string, ipaddr *net.IPAddr, size int, timeout time.Duration) *mpinfo {
+	return &mpinfo{
+		host:    host,
+		ipaddr:  ipaddr,
+		id:      int(atomic.AddInt32(&pingid, 1)),
+		seqpkt:  make(map[int]chan *Packet),
+		size:    size,
+		timeout: timeout,
 	}
+}
 
+func (p *mpingconn) ping(host string, ipaddr *net.IPAddr, size int, timeout time.Duration, onSend func(*Packet), onRecv func(*Packet)) error {
 	p.mutex.Lock()
-	var pid int
+	var pinfo *mpinfo
 	var has bool
-	if pid, has = p.pingid[addr.String()]; !has {
-		pid = newPingID()
-		p.pingid[addr.String()] = pid
+	if pinfo, has = p.pinghostinfo[ipaddr.String()]; !has {
+		pinfo = newPingInfo(host, ipaddr, size, timeout)
+		p.pinghostinfo[ipaddr.String()] = pinfo
+		p.pingidinfo[pinfo.id] = pinfo
 	}
-	var psq = p.pingseq[pid]
-	p.pingseq[pid]++
-	p.pingfcb[pid] = onRecv
+	seq := pinfo.lastseq
+	pinfo.lastseq++
+	recvpkt := make(chan *Packet, 1)
+	pinfo.seqpkt[seq] = recvpkt
 	p.mutex.Unlock()
 
-	t := append(timeToBytes(time.Now()), uuidEncoded...)
-	if remainSize := size - len(t); remainSize > 0 {
-		t = append(t, bytes.Repeat([]byte{1}, remainSize)...)
-	}
-
-	body := &icmp.Echo{
-		ID:   pid,
-		Seq:  psq,
-		Data: t,
-	}
-
-	msg := &icmp.Message{
-		Type: p.ICMPRequestType(),
-		Code: 0,
-		Body: body,
-	}
-
-	msgBytes, err := msg.Marshal(nil)
+	msgBytes, err := icmp.BuildEchoRequestMessage(pinfo.id, seq, size, p.ICMPRequestType())
 	if err != nil {
-		return -1, err
+		return err
 	}
 
-	err = p.sendICMP(msgBytes, addr)
+	err = p.sendICMP(msgBytes, ipaddr)
 	if err != nil {
-		return -1, err
+		return err
 	}
-	onSend(&Packet{
+	outpkt := &Packet{
 		Nbytes: len(msgBytes),
-		IPAddr: addr,
-		Seq:    psq,
-		ID:     pid,
-	})
-
-	time.NewTimer(timeout)
-
+		Host:   host,
+		IPAddr: ipaddr,
+		Seq:    seq,
+		ID:     pinfo.id,
+	}
+	if onSend != nil {
+		onSend(outpkt)
+	}
+	go func(onRecv func(*Packet), recvpkt chan *Packet) {
+		t := time.NewTimer(timeout)
+		select {
+		case <-t.C:
+		case inpkt := <-recvpkt:
+			if onRecv != nil {
+				onRecv(inpkt)
+			}
+		case <-p.done:
+		}
+		// clear();
+	}(onRecv, recvpkt)
 	return nil
 }
 
-func (p *mpingconn) recvICMP(recv chan<- *packet) error {
+func (p *mpingconn) sendICMP(msgBytes []byte, addr *net.IPAddr) error {
+	var dst net.Addr = addr
+	if p.protocol == "udp" {
+		dst = &net.UDPAddr{IP: addr.IP, Zone: addr.Zone}
+	}
+	for {
+		select {
+		case <-p.done:
+			return nil
+		default:
+		}
+		if _, err := p.Conn.WriteTo(msgBytes, dst); err != nil {
+			if neterr, ok := err.(*net.OpError); ok {
+				if neterr.Err == syscall.ENOBUFS {
+					if p.OnError != nil {
+						p.OnError(neterr.Err)
+					} else {
+						fmt.Println("缓存不够,发送失败,重发")
+					}
+					continue
+				}
+			}
+			return err
+		} else {
+			return nil
+		}
+	}
+}
+
+func (p *mpingconn) recvICMP() error {
 	bytes := make([]byte, 65536)
 	for {
 		select {
@@ -186,8 +229,9 @@ func (p *mpingconn) recvICMP(recv chan<- *packet) error {
 			return nil
 		default:
 			var n, ttl int
+			var addr net.Addr
 			var err error
-			n, ttl, _, err = p.Conn.ReadFrom(bytes)
+			n, ttl, addr, err = p.Conn.ReadFrom(bytes)
 			if err != nil {
 				if neterr, ok := err.(*net.OpError); ok {
 					if neterr.Timeout() {
@@ -203,39 +247,31 @@ func (p *mpingconn) recvICMP(recv chan<- *packet) error {
 			select {
 			case <-p.done:
 				return nil
-			case recv <- &packet{bytes: bs, nbytes: n, ttl: ttl}:
+			case p.recvchan <- &recvPkt{recvtime: time.Now(), addr: addr, bytes: bs, nbytes: n, ttl: ttl}:
 			}
 		}
 	}
 }
 
-func (p *mpingconn) sendICMP(msgBytes []byte, addr *net.IPAddr) error {
-	var dst net.Addr = addr
-	if p.protocol == "udp" {
-		dst = &net.UDPAddr{IP: addr.IP, Zone: addr.Zone}
-	}
-	for {
-		select {
-		case <-p.done:
-			return nil
-		default:
+func (p *mpingconn) processRecvPacket() {
+	for pkt := range p.recvchan {
+		if len(p.recvchan) > cap(p.recvchan)*9/10 {
+			fmt.Printf("receive buffer full")
 		}
-		for {
-			if _, err := p.Conn.WriteTo(msgBytes, dst); err != nil {
-				if neterr, ok := err.(*net.OpError); ok {
-					if neterr.Err == syscall.ENOBUFS {
-						fmt.Println("缓存不够,发送失败,重发")
-						continue
-					}
-				}
-				return err
+		err := p.processPacket(pkt)
+		if err != nil {
+			if p.OnError != nil {
+				p.OnError(err)
+			} else {
+				fmt.Println(err)
 			}
 		}
 	}
 }
 
-func (p *mpingconn) processPacket(recv *packet) error {
-	receivedAt := time.Now()
+var count = 0
+
+func (p *mpingconn) processPacket(recv *recvPkt) error {
 	var proto int
 	if p.ipv4 {
 		proto = protocolICMP
@@ -243,6 +279,8 @@ func (p *mpingconn) processPacket(recv *packet) error {
 		proto = protocolIPv6ICMP
 	}
 
+	// fmt.Println(count, "from", recv.addr.String(), "bytes", recv.bytes)
+
 	var m *icmp.Message
 	var err error
 	if m, err = icmp.ParseMessage(proto, recv.bytes); err != nil {
@@ -254,60 +292,58 @@ func (p *mpingconn) processPacket(recv *packet) error {
 		return nil
 	}
 
-	inPkt := &Packet{
-		Nbytes: recv.nbytes,
-		IPAddr: p.ipaddr,
-		Addr:   p.addr,
-		TTL:    recv.ttl,
-		ID:     p.id,
-	}
-
 	switch pkt := m.Body.(type) {
 	case *icmp.Echo:
-		return p.processEchoReply(pkt, receivedAt, inPkt)
+		return p.processEchoReply(pkt, recv)
 	default:
 		// Very bad, not sure how this can happen
 		return fmt.Errorf("invalid ICMP echo reply; type: '%T', '%v'", pkt, pkt)
 	}
 }
 
-func (p *mpingconn) processEchoReply(pkt *icmp.Echo, receivedAt time.Time, inPkt *Packet) error {
-	if !p.matchID(pkt.ID) {
-		// lastpingtimemutex.Lock()
-		// ap := idping[pkt.ID]
-		// lastpingtimemutex.Unlock()
-		// println(fmt.Sprintf("%#v%s%#v", ap, "\n", p))
+func (p *mpingconn) processEchoReply(pkt *icmp.Echo, recv *recvPkt) error {
+	if len(pkt.Data) < 24 {
 		return nil
 	}
 
-	if len(pkt.Data) < timeSliceLength+trackerLength {
-		return fmt.Errorf("insufficient data received; got: %d %v",
-			len(pkt.Data), pkt.Data)
-	}
+	sendtime := int64(binary.BigEndian.Uint64(pkt.Data[:8]))
+	fullseq := int(binary.BigEndian.Uint64(pkt.Data[8:16]))
+	fullid := int(binary.BigEndian.Uint64(pkt.Data[16:24]))
 
-	pktUUID, err := p.getPacketUUID(pkt.Data)
-	if err != nil || pktUUID == nil {
-		return err
+	if fullid%65536 != pkt.ID || fullseq%65536 != pkt.Seq {
+		return nil
 	}
 
-	timestamp := bytesToTime(pkt.Data[:timeSliceLength])
-	inPkt.Rtt = receivedAt.Sub(timestamp)
-	inPkt.Seq = pkt.Seq
-	// If we've already received this sequence, ignore it.
-	if _, inflight := p.awaitingSequences[*pktUUID][pkt.Seq]; !inflight {
-		p.PacketsRecvDuplicates++
-		if p.OnDuplicateRecv != nil {
-			p.OnDuplicateRecv(inPkt)
-		}
+	p.mutex.Lock()
+	pinfo := p.pingidinfo[fullid]
+	p.mutex.Unlock()
+	if pinfo == nil {
 		return nil
 	}
-	// remove it from the list of sequences we're waiting for so we don't get duplicates.
-	delete(p.awaitingSequences[*pktUUID], pkt.Seq)
-	p.updateStatistics(inPkt)
 
-	handler := p.OnRecv
-	if handler != nil {
-		handler(inPkt)
+	inPkt := &Packet{
+		Host:   pinfo.host,
+		IPAddr: pinfo.ipaddr,
+		ID:     pinfo.id,
+		Seq:    fullseq,
+		Nbytes: recv.nbytes,
+		TTL:    recv.ttl,
+		Rtt:    recv.recvtime.Sub(time.Unix(0, sendtime)),
+	}
+
+	// fmt.Printf("%s %d bytes from %s: icmp_seq=%d time=%v\n",
+	// 	time.Now().Format("15:04:05.000"), inPkt.Nbytes, inPkt.IPAddr, inPkt.Seq, inPkt.Rtt)
+
+	p.mutex.Lock()
+	chpkt, inflight := pinfo.seqpkt[fullseq]
+	if inflight {
+		// remove it from the list of sequences we're waiting for so we don't get duplicates.
+		delete(pinfo.seqpkt, fullseq)
+	}
+	p.mutex.Unlock()
+
+	if chpkt != nil {
+		chpkt <- inPkt
 	}
 	return nil
 }

+ 94 - 379
ping/probing/ping.go

@@ -52,28 +52,24 @@
 package probing
 
 import (
-	"bytes"
 	"errors"
-	"fmt"
 	"log"
 	"math"
 	"math/rand"
 	"net"
 	"sync"
 	"sync/atomic"
-	"syscall"
 	"time"
 
 	"git.wecise.com/wecise/common/matrix/cfg"
 	"github.com/google/uuid"
-	"golang.org/x/net/icmp"
-	"golang.org/x/net/ipv4"
-	"golang.org/x/net/ipv6"
 	"golang.org/x/sync/errgroup"
 )
 
 var mcfg = cfg.MConfig()
-var ping_interval_one_host = mcfg.GetDuration("ping.interval.one.host", 10000*time.Millisecond)
+var receive_buffer_count = mcfg.GetInt("ping.recv.buf.count", 100)
+var ttl = mcfg.GetInt("ping.ttl", 64)
+var ping_interval_one_host = mcfg.GetDuration("ping.interval.one.host", 200*time.Millisecond)
 var ping_interval_one_conn = mcfg.GetDuration("ping.interval.one.conn", 200*time.Millisecond)
 var concurlimit_ping = mcfg.GetInt("concurlimit.ping", 100)
 var concurlimit_send = mcfg.GetInt("concurlimit.send", 10)
@@ -91,11 +87,6 @@ const (
 	protocolIPv6ICMP = 58
 )
 
-var (
-	ipv4Proto = map[string]string{"icmp": "ip4:icmp", "udp": "udp4"}
-	ipv6Proto = map[string]string{"icmp": "ip6:ipv6-icmp", "udp": "udp6"}
-)
-
 // New returns a new Pinger struct pointer.
 func New(addr string) *Pinger {
 	r := rand.New(rand.NewSource(getSeed()))
@@ -179,6 +170,9 @@ type Pinger struct {
 	// OnRecv is called when Pinger receives and processes a packet
 	OnRecv func(*Packet)
 
+	// OnRecv is called when Pinger receives and processes a packet
+	OnTimeout func(*Packet)
+
 	// OnFinish is called when Pinger exits
 	OnFinish func(*Statistics)
 
@@ -204,9 +198,10 @@ type Pinger struct {
 	// trackerUUIDs is the list of UUIDs being used for sending packets.
 	trackerUUIDs []uuid.UUID
 
-	ipv4     bool
-	id       int
-	sequence int
+	ipv4          bool
+	id            int
+	sequence_base int
+	sequence      int
 	// awaitingSequences are in-flight sequence numbers we keep track of to help remove duplicate receipts
 	awaitingSequences map[uuid.UUID]map[int]struct{}
 	// network is one of "ip", "ip4", or "ip6".
@@ -219,12 +214,6 @@ type Pinger struct {
 	TTL int
 }
 
-type packet struct {
-	bytes  []byte
-	nbytes int
-	ttl    int
-}
-
 // Packet represents a received and processed ICMP echo packet.
 type Packet struct {
 	// Rtt is the round-trip time it took to ping.
@@ -233,8 +222,8 @@ type Packet struct {
 	// IPAddr is the address of the host being pinged.
 	IPAddr *net.IPAddr
 
-	// Addr is the string address of the host being pinged.
-	Addr string
+	// Host is the string address of the host being pinged.
+	Host string
 
 	// NBytes is the number of bytes in the message.
 	Nbytes int
@@ -414,64 +403,38 @@ func (p *Pinger) ID() int {
 // done. If Count or Interval are not specified, it will run continuously until
 // it is interrupted.
 func (p *Pinger) Run() error {
-	sleeptime := time.Duration(0)
-	for sleeptime >= 0 {
-		time.Sleep(sleeptime)
-		lastpingtimemutex.Lock()
-		alastpingtime := lastpingtime[p.ipaddr.String()]
-		sleeptime = ping_interval_one_host - time.Since(alastpingtime)
-		if sleeptime <= 0 {
-			lastpingtime[p.ipaddr.String()] = time.Now()
-		} else {
-			// logger.Error(fmt.Sprint("ping", p.addr, "[", p.ipaddr.String(), "]", "同一地址至少间隔一秒"))
-		}
-		lastpingtimemutex.Unlock()
-	}
-	lastpingtimemutex.Lock()
-	for idping[p.id] != nil && idping[p.id] != p {
-		p.id = (p.id + 1) % 65536
-	}
-	idping[p.id] = p
-	lastpingtimemutex.Unlock()
-	defer func() {
-		lastpingtimemutex.Lock()
-		lastpingtime[p.ipaddr.String()] = time.Now()
-		delete(idping, p.id)
-		lastpingtimemutex.Unlock()
-	}()
+	// sleeptime := time.Duration(0)
+	// for sleeptime >= 0 {
+	// 	time.Sleep(sleeptime)
+	// 	lastpingtimemutex.Lock()
+	// 	alastpingtime := lastpingtime[p.ipaddr.String()]
+	// 	sleeptime = ping_interval_one_host - time.Since(alastpingtime)
+	// 	if sleeptime <= 0 {
+	// 		lastpingtime[p.ipaddr.String()] = time.Now()
+	// 	} else {
+	// 		// logger.Error(fmt.Sprint("ping", p.addr, "[", p.ipaddr.String(), "]", "同一地址至少间隔一秒"))
+	// 	}
+	// 	lastpingtimemutex.Unlock()
+	// }
+	// defer func() {
+	// 	lastpingtimemutex.Lock()
+	// 	lastpingtime[p.ipaddr.String()] = time.Now()
+	// 	lastpingtimemutex.Unlock()
+	// }()
 	// concurchan_ping <- struct{}{}
 	// defer func() {
 	// 	<-concurchan_ping
 	// }()
-
-	var conn packetConn
-	var err error
-	if p.Size < timeSliceLength+trackerLength {
-		return fmt.Errorf("size %d is less than minimum required size %d", p.Size, timeSliceLength+trackerLength)
-	}
-	if p.ipaddr == nil {
-		err = p.Resolve()
-	}
-	if err != nil {
-		return err
-	}
-	if conn, err = p.listen(); err != nil {
-		return err
-	}
-	defer conn.Close()
-
-	conn.SetTTL(p.TTL)
-	return p.run(conn)
+	return p.run()
 }
 
-func (p *Pinger) run(conn packetConn) error {
-	if err := conn.SetFlagTTL(); err != nil {
-		return err
-	}
+func (p *Pinger) run() error {
 	defer p.finish()
 
-	recv := make(chan *packet, 5)
-	defer close(recv)
+	_, err := mpconn.listen()
+	if err != nil {
+		return err
+	}
 
 	if handler := p.OnSetup; handler != nil {
 		handler()
@@ -481,37 +444,27 @@ func (p *Pinger) run(conn packetConn) error {
 
 	g.Go(func() error {
 		defer p.Stop()
-		return p.recvICMP(conn, recv)
-	})
-
-	g.Go(func() error {
-		defer p.Stop()
-		return p.runLoop(conn, recv)
+		return p.runLoop()
 	})
 
 	return g.Wait()
 }
 
-func (p *Pinger) runLoop(
-	conn packetConn,
-	recvCh <-chan *packet,
-) error {
+func (p *Pinger) runLoop() error {
 
 	logger := p.logger
 	if logger == nil {
 		logger = NoopLogger{}
 	}
 
-	timeout := time.NewTicker(p.Timeout)
-	interval := time.NewTicker(p.Interval)
+	timeout := time.NewTimer(p.Timeout)
+	interval := time.NewTimer(0)
+	timeout.Stop()
 	defer func() {
 		interval.Stop()
 		timeout.Stop()
 	}()
-
-	if err := p.sendICMP(conn); err != nil {
-		return err
-	}
+	received := make(chan interface{}, 1)
 
 	for {
 		select {
@@ -521,28 +474,61 @@ func (p *Pinger) runLoop(
 		case <-timeout.C:
 			return nil
 
-		case r := <-recvCh:
-			err := p.processPacket(r)
-			if err != nil {
-				// FIXME: this logs as FATAL but continues
-				logger.Errorf("processing received packet: %s", err)
-			}
-
 		case <-interval.C:
+			if err := p.ping(received); err != nil {
+				logger.Errorf("sending packet: %s", err)
+			}
 			if p.Count > 0 && p.PacketsSent >= p.Count {
-				interval.Stop()
-				continue
+				timeout.Reset(p.Timeout)
+			} else {
+				interval.Reset(p.Interval)
 			}
-			err := p.sendICMP(conn)
-			if err != nil {
-				// FIXME: this logs as FATAL but continues
-				logger.Errorf("sending packet: %s", err)
+		case <-received:
+			if p.Count > 0 && p.PacketsRecv >= p.Count {
+				return nil
 			}
 		}
-		if p.Count > 0 && p.PacketsRecv >= p.Count {
-			return nil
+	}
+}
+
+func (p *Pinger) ping(received chan interface{}) error {
+
+	sleeptime := time.Duration(0)
+	for sleeptime >= 0 {
+		time.Sleep(sleeptime)
+		lastpingtimemutex.Lock()
+		alastpingtime := lastpingtime[p.ipaddr.String()]
+		sleeptime = ping_interval_one_host - time.Since(alastpingtime)
+		if sleeptime <= 0 {
+			lastpingtime[p.ipaddr.String()] = time.Now()
+		} else {
+			// logger.Error(fmt.Sprint("ping", p.addr, "[", p.ipaddr.String(), "]", "同一地址至少间隔一秒"))
 		}
+		lastpingtimemutex.Unlock()
 	}
+	defer func() {
+		lastpingtimemutex.Lock()
+		lastpingtime[p.ipaddr.String()] = time.Now()
+		lastpingtimemutex.Unlock()
+	}()
+
+	return mpconn.ping(p.addr, p.ipaddr, p.Size, p.Timeout, func(pkt *Packet) {
+		if p.PacketsSent == 0 {
+			p.sequence_base = pkt.Seq
+		}
+		p.PacketsSent++
+		pkt.Seq -= p.sequence_base
+		if p.OnSend != nil {
+			p.OnSend(pkt)
+		}
+	}, func(pkt *Packet) {
+		pkt.Seq -= p.sequence_base
+		if p.OnRecv != nil {
+			p.OnRecv(pkt)
+		}
+		p.updateStatistics(pkt)
+		received <- nil
+	})
 }
 
 func (p *Pinger) Stop() {
@@ -592,281 +578,6 @@ func (p *Pinger) Statistics() *Statistics {
 	return &s
 }
 
-type expBackoff struct {
-	baseDelay time.Duration
-	maxExp    int64
-	c         int64
-}
-
-func (b *expBackoff) Get() time.Duration {
-	if b.c < b.maxExp {
-		b.c++
-	}
-
-	return b.baseDelay * time.Duration(rand.Int63n(1<<b.c))
-}
-
-func newExpBackoff(baseDelay time.Duration, maxExp int64) expBackoff {
-	return expBackoff{baseDelay: baseDelay, maxExp: maxExp}
-}
-
-func (p *Pinger) recvICMP(
-	conn packetConn,
-	recv chan<- *packet,
-) error {
-	// Start by waiting for 50 µs and increase to a possible maximum of ~ 100 ms.
-	expBackoff := newExpBackoff(50*time.Microsecond, 100)
-	delay := expBackoff.Get()
-
-	for {
-		select {
-		case <-p.done:
-			return nil
-		default:
-			bytes := make([]byte, p.getMessageLength())
-			if err := conn.SetReadDeadline(time.Now().Add(delay)); err != nil {
-				return err
-			}
-			var n, ttl int
-			var err error
-			n, ttl, _, err = conn.ReadFrom(bytes)
-			if err != nil {
-				if neterr, ok := err.(*net.OpError); ok {
-					if neterr.Timeout() {
-						// Read timeout
-						delay = expBackoff.Get()
-						continue
-					}
-				}
-				return err
-			}
-
-			select {
-			case <-p.done:
-				return nil
-			case recv <- &packet{bytes: bytes, nbytes: n, ttl: ttl}:
-			}
-		}
-	}
-}
-
-// getPacketUUID scans the tracking slice for matches.
-func (p *Pinger) getPacketUUID(pkt []byte) (*uuid.UUID, error) {
-	var packetUUID uuid.UUID
-	err := packetUUID.UnmarshalBinary(pkt[timeSliceLength : timeSliceLength+trackerLength])
-	if err != nil {
-		return nil, fmt.Errorf("error decoding tracking UUID: %w", err)
-	}
-
-	for _, item := range p.trackerUUIDs {
-		if item == packetUUID {
-			return &packetUUID, nil
-		}
-	}
-	return nil, nil
-}
-
-// getCurrentTrackerUUID grabs the latest tracker UUID.
-func (p *Pinger) getCurrentTrackerUUID() uuid.UUID {
-	return p.trackerUUIDs[len(p.trackerUUIDs)-1]
-}
-
-func (p *Pinger) processPacket(recv *packet) error {
-	receivedAt := time.Now()
-	var proto int
-	if p.ipv4 {
-		proto = protocolICMP
-	} else {
-		proto = protocolIPv6ICMP
-	}
-
-	var m *icmp.Message
-	var err error
-	if m, err = icmp.ParseMessage(proto, recv.bytes); err != nil {
-		return fmt.Errorf("error parsing icmp message: %w", err)
-	}
-
-	if m.Type != ipv4.ICMPTypeEchoReply && m.Type != ipv6.ICMPTypeEchoReply {
-		// Not an echo reply, ignore it
-		return nil
-	}
-
-	inPkt := &Packet{
-		Nbytes: recv.nbytes,
-		IPAddr: p.ipaddr,
-		Addr:   p.addr,
-		TTL:    recv.ttl,
-		ID:     p.id,
-	}
-
-	switch pkt := m.Body.(type) {
-	case *icmp.Echo:
-		return p.processEchoReply(pkt, receivedAt, inPkt)
-	default:
-		// Very bad, not sure how this can happen
-		return fmt.Errorf("invalid ICMP echo reply; type: '%T', '%v'", pkt, pkt)
-	}
-}
-
-var idping = map[int]*Pinger{}
-
-func (p *Pinger) processEchoReply(pkt *icmp.Echo, receivedAt time.Time, inPkt *Packet) error {
-	if !p.matchID(pkt.ID) {
-		// lastpingtimemutex.Lock()
-		// ap := idping[pkt.ID]
-		// lastpingtimemutex.Unlock()
-		// println(fmt.Sprintf("%#v%s%#v", ap, "\n", p))
-		return nil
-	}
-
-	if len(pkt.Data) < timeSliceLength+trackerLength {
-		return fmt.Errorf("insufficient data received; got: %d %v",
-			len(pkt.Data), pkt.Data)
-	}
-
-	pktUUID, err := p.getPacketUUID(pkt.Data)
-	if err != nil || pktUUID == nil {
-		return err
-	}
-
-	timestamp := bytesToTime(pkt.Data[:timeSliceLength])
-	inPkt.Rtt = receivedAt.Sub(timestamp)
-	inPkt.Seq = pkt.Seq
-	// If we've already received this sequence, ignore it.
-	if _, inflight := p.awaitingSequences[*pktUUID][pkt.Seq]; !inflight {
-		p.PacketsRecvDuplicates++
-		if p.OnDuplicateRecv != nil {
-			p.OnDuplicateRecv(inPkt)
-		}
-		return nil
-	}
-	// remove it from the list of sequences we're waiting for so we don't get duplicates.
-	delete(p.awaitingSequences[*pktUUID], pkt.Seq)
-	p.updateStatistics(inPkt)
-
-	handler := p.OnRecv
-	if handler != nil {
-		handler(inPkt)
-	}
-	return nil
-}
-
-func (p *Pinger) sendICMP(conn packetConn) error {
-	concurchan_send <- struct{}{}
-	defer func() {
-		<-concurchan_send
-		lastpingtimemutex.Lock()
-		lastpingtime[p.ipaddr.String()] = time.Now()
-		lastpingtimemutex.Unlock()
-	}()
-	// lastsendtimemutex.Lock()
-	// alastsendtime := lastsendtime[p.addr]
-	// sleeptime := 200*time.Millisecond - time.Since(alastsendtime)
-	// lastsendtimemutex.Unlock()
-	// if sleeptime > 0 {
-	// 	time.Sleep(sleeptime)
-	// }
-	// lastsendtimemutex.Lock()
-	// lastsendtime[p.addr] = time.Now()
-	// lastsendtimemutex.Unlock()
-	// defer func() {
-	// 	lastsendtimemutex.Lock()
-	// 	lastsendtime[p.addr] = time.Now()
-	// 	lastsendtimemutex.Unlock()
-	// }()
-	return p.sendICMPx(conn)
-}
-
-func (p *Pinger) sendICMPx(conn packetConn) error {
-	var dst net.Addr = p.ipaddr
-	if p.protocol == "udp" {
-		dst = &net.UDPAddr{IP: p.ipaddr.IP, Zone: p.ipaddr.Zone}
-	}
-
-	currentUUID := p.getCurrentTrackerUUID()
-	uuidEncoded, err := currentUUID.MarshalBinary()
-	if err != nil {
-		return fmt.Errorf("unable to marshal UUID binary: %w", err)
-	}
-	t := append(timeToBytes(time.Now()), uuidEncoded...)
-	t = append(t, []byte(p.addr)...)
-	if remainSize := p.Size - len(t); remainSize > 0 {
-		t = append(t, bytes.Repeat([]byte{1}, remainSize)...)
-	}
-
-	body := &icmp.Echo{
-		ID:   p.id,
-		Seq:  p.sequence,
-		Data: t,
-	}
-
-	msg := &icmp.Message{
-		Type: conn.ICMPRequestType(),
-		Code: 0,
-		Body: body,
-	}
-
-	msgBytes, err := msg.Marshal(nil)
-	if err != nil {
-		return err
-	}
-
-	// fmt.Println("len(body.Data)=", len(body.Data))
-	// fmt.Println("len(msgBytes)=", len(msgBytes))
-
-	for {
-
-		if _, err := conn.WriteTo(msgBytes, dst); err != nil {
-			if neterr, ok := err.(*net.OpError); ok {
-				if neterr.Err == syscall.ENOBUFS {
-					fmt.Println("发送失败,重发")
-					continue
-				}
-			}
-			return err
-		}
-		handler := p.OnSend
-		if handler != nil {
-			outPkt := &Packet{
-				Nbytes: len(msgBytes),
-				IPAddr: p.ipaddr,
-				Addr:   p.addr,
-				Seq:    p.sequence,
-				ID:     p.id,
-			}
-			handler(outPkt)
-		}
-		// mark this sequence as in-flight
-		p.awaitingSequences[currentUUID][p.sequence] = struct{}{}
-		p.PacketsSent++
-		p.sequence++
-		if p.sequence > 65535 {
-			newUUID := uuid.New()
-			p.trackerUUIDs = append(p.trackerUUIDs, newUUID)
-			p.awaitingSequences[newUUID] = make(map[int]struct{})
-			p.sequence = 0
-		}
-		break
-	}
-
-	return nil
-}
-
-func (p *Pinger) listen() (packetConn, error) {
-	var (
-		conn packetConn
-		err  error
-	)
-
-	conn, err = mpconn.listen()
-
-	if err != nil {
-		p.Stop()
-		return nil, err
-	}
-	return conn, nil
-}
-
 func bytesToTime(b []byte) time.Time {
 	var nsec int64
 	for i := uint8(0); i < 8; i++ {
@@ -894,3 +605,7 @@ var seed = time.Now().UnixNano()
 func getSeed() int64 {
 	return atomic.AddInt64(&seed, 1)
 }
+
+func BufferUsedCount() int {
+	return len(mpconn.recvchan)
+}