package ovo

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"net"
	"os"
	"os/signal"
	"reflect"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"syscall"
	"time"
	"unsafe"
)

// version is the release identifier logged when a receiver starts.
const version = "2021.6.2.13"

// maxIDNumber is the largest ID handed out so far by newID.
var maxIDNumber uint32

// newID atomically increments the ID counter and returns the new value.
func newID() int {
	return (int)(atomic.AddUint32(&maxIDNumber, 1))
}

// str2bytes reinterprets s as a []byte without copying. The result shares
// memory with s and must be treated as read-only and short-lived.
func str2bytes(s string) []byte {
	x := (*[2]uintptr)(unsafe.Pointer(&s))
	h := [3]uintptr{x[0], x[1], x[1]}
	return *(*[]byte)(unsafe.Pointer(&h))
}

// bytes2str reinterprets b as a string without copying. The result shares
// memory with b, so b must not be modified while the string is in use.
func bytes2str(b []byte) string {
	return *(*string)(unsafe.Pointer(&b))
}

// fmt_bytes_size formats a byte count with a B/KB/MB/GB suffix (integer
// division, 1024-based). Negative values are printed as a plain number.
func fmt_bytes_size(b int) string {
	if b >= 0 {
		if b < 1024 {
			return fmt.Sprintf("%dB", b)
		}
		if b < 1024*1024 {
			return fmt.Sprintf("%dKB", b/1024)
		}
		if b < 1024*1024*1024 {
			return fmt.Sprintf("%dMB", b/1024/1024)
		}
		return fmt.Sprintf("%dGB", b/1024/1024/1024)
	}
	return fmt.Sprintf("%d", b)
}

// FindBytes searches a for the first occurrence of b.
//
// It returns (index, checkpoint):
//   - index >= 0: b was found at a[index:checkpoint].
//   - index == -2: no full match, but the tail of a starting at checkpoint is
//     a prefix of b, so more data appended to a might complete a match.
//   - index == -1: no match; checkpoint is len(a), the position from which a
//     later search can resume.
func FindBytes(a []byte, b []byte) (int, int) {
	ai := 0
	// Full matches: positions where all of b fits inside a.
	for ; ai <= len(a)-len(b); ai++ {
		bi := 0
		for ; bi < len(b) && a[ai+bi] == b[bi]; bi++ {
		}
		if bi == len(b) {
			return ai, ai + bi
		}
	}
	// Partial matches: the end of a matches the beginning of b.
	for ; ai < len(a); ai++ {
		bi := 0
		for ; ai+bi < len(a) && a[ai+bi] == b[bi]; bi++ {
		}
		if ai+bi == len(a) {
			return -2, ai
		}
	}
	return -1, ai
}

// DefaultTraceCatalog lists the trace categories enabled by default; each
// byte is one category (presumably Fatal/Error/Warning/Info/Debug/eXtra —
// TODO confirm).
const DefaultTraceCatalog = "FEWIDX"

// Tracer writes categorized trace lines to standard output.
type Tracer struct {
	TRACE  string // enabled category bytes
	module string // module label printed on every line
}

// NewTracer returns a Tracer for module using the categories in Config.TRACE.
func NewTracer(module string) *Tracer {
	return &Tracer{
		TRACE:  Config.TRACE,
		module: module,
	}
}

// mutexTracerBuffer guards tracerBuffer.
var mutexTracerBuffer sync.Mutex

// tracerBuffer accumulates trace lines until a flush goroutine prints them.
var tracerBuffer bytes.Buffer

// Trace formats a message and queues it for output if catalog is one of the
// tracer's enabled categories. Output is flushed asynchronously so callers do
// not wait on stdout.
func (tracer *Tracer) Trace(catalog byte, sfmt string, args ...interface{}) {
	if !strings.Contains(tracer.TRACE, string(catalog)) {
		return
	}
	sinfo := fmt.Sprintf(sfmt, args...)
	sinfo = fmt.Sprintf("%s [%s] [%s] %s\r\n", time.Now().Format("2006-01-02 15:04:05.000"), tracer.module, string(catalog), sinfo)
	mutexTracerBuffer.Lock()
	defer mutexTracerBuffer.Unlock()
	tracerBuffer.WriteString(sinfo)
	// The flusher acquires the same mutex, so it runs only after this call
	// has returned and released the lock.
	go func() {
		mutexTracerBuffer.Lock()
		defer mutexTracerBuffer.Unlock()
		sinfo := tracerBuffer.String()
		if sinfo != "" {
			tracerBuffer.Reset()
			fmt.Print(sinfo)
		}
	}()
}

// mutexTraceInfo guards nextOutputTime.
var mutexTraceInfo sync.Mutex

// nextOutputTime maps a CalmTrace class to the earliest time it may print again.
var nextOutputTime map[string]time.Time

// CalmTrace is a rate-limited trace: messages of the same class are printed
// at most once every delayms milliseconds; messages arriving earlier are
// discarded.
func (tracer *Tracer) CalmTrace(class string, delayms int, catalog byte, sfmt string, args ...interface{}) {
	if !strings.Contains(tracer.TRACE, string(catalog)) {
		return
	}
	mutexTraceInfo.Lock()
	defer mutexTraceInfo.Unlock()
	if nextOutputTime == nil {
		nextOutputTime = make(map[string]time.Time)
	}
	t, o := nextOutputTime[class]
	if o && time.Now().Before(t) {
		return
	}
	sinfo := fmt.Sprintf(sfmt, args...)
	sinfo = fmt.Sprintf("%s [%s] [%s] %s\r\n", time.Now().Format("2006-01-02 15:04:05.000"), tracer.module, string(catalog), sinfo)
	fmt.Print(sinfo)
	nextOutputTime[class] = time.Now().Add(time.Duration(delayms) * time.Millisecond)
}

// MessageMatcher frames messages for one connector: it locates the begin and
// end of a message inside the received bytes.
type MessageMatcher interface {
	Initialize(connector *TCPConnector)
	MatchBegin(msgbuf []byte) (int, int)
	MatchEnd(msgbuf []byte) (int, int)
}

// MessageProcessor performs the business handling for one class of message.
type MessageProcessor interface {
	MatchType(mm MessageMatcher) bool
	MessageProc(mm MessageMatcher, buf []byte)
}

// Registries of message classes, keyed by matcher type name or by the tag
// (leading bytes) that identifies the class on the wire.
var messageMatcherNameRegistry = make(map[string]reflect.Type)
var messageMatcherTagRegistry = make(map[string]reflect.Type)
var messageProcessorTagRegistry = make(map[string]reflect.Type)

// RegisterMessageClass registers a matcher type (and optionally a processor
// type) under tag. Both arguments must be pointers; only their element types
// are recorded and new instances are created per connection.
func RegisterMessageClass(tag string, message_matcher MessageMatcher, message_processor MessageProcessor) {
	tmm := reflect.TypeOf(message_matcher).Elem()
	messageMatcherNameRegistry[tmm.Name()] = tmm
	messageMatcherTagRegistry[tag] = tmm
	if message_processor != nil {
		tmp := reflect.TypeOf(message_processor).Elem()
		messageProcessorTagRegistry[tag] = tmp
	}
}

// NewMessageMatcher creates a fresh MessageMatcher of the given type.
// mptype may be the struct type or a pointer to it.
func NewMessageMatcher(mptype reflect.Type) MessageMatcher {
	if mptype.Kind() == reflect.Ptr {
		mptype = mptype.Elem()
	}
	mpe := reflect.New(mptype).Interface()
	return mpe.(MessageMatcher)
}

// NewMessageProcessor creates a fresh MessageProcessor of the given type.
// mptype may be the struct type or a pointer to it.
func NewMessageProcessor(mptype reflect.Type) MessageProcessor {
	if mptype.Kind() == reflect.Ptr {
		mptype = mptype.Elem()
	}
	mpe := reflect.New(mptype).Interface()
	return mpe.(MessageProcessor)
}

// DefaultMessageMatcher is the fallback matcher: every buffer is treated as
// one complete message.
type DefaultMessageMatcher struct {
	connector *TCPConnector
}

// Initialize binds the matcher to its connector.
func (matcher *DefaultMessageMatcher) Initialize(connector *TCPConnector) {
	matcher.connector = connector
}

// MatchBegin always reports a message beginning at offset 0.
func (matcher *DefaultMessageMatcher) MatchBegin(buf []byte) (int, int) {
	return 0, 1
}

// MatchEnd always reports that the message ends at the end of buf.
func (matcher *DefaultMessageMatcher) MatchEnd(buf []byte) (int, int) {
	return len(buf), len(buf)
}

// MessageMatcherDetector picks a matcher/processor for a connection by
// comparing the first received bytes with the registered tags.
type MessageMatcherDetector struct {
	*Tracer
	default_message_matcher MessageMatcher
	connector               *TCPConnector
	message_matcher         MessageMatcher
	message_processor       MessageProcessor
}

// NewMessageMatcherDetector returns a detector bound to connector.
func NewMessageMatcherDetector(connector *TCPConnector) *MessageMatcherDetector {
	return &MessageMatcherDetector{
		Tracer:                  NewTracer(connector.module),
		default_message_matcher: &DefaultMessageMatcher{},
		connector:               connector,
		message_matcher:         nil,
		message_processor:       nil,
	}
}

// Detect returns the matcher and processor for this connection, deciding on
// first use. It returns (nil, nil) while buf is still too short to rule out
// every registered tag; once buf is longer than the longest tag and nothing
// matched, the default matcher is used. The decision is cached.
func (detector *MessageMatcherDetector) Detect(buf []byte) (MessageMatcher, MessageProcessor) {
	if detector.message_matcher == nil {
		maxklen := 0
		for k := range messageMatcherTagRegistry {
			if len(k) > maxklen {
				maxklen = len(k)
			}
		}
		for k := range messageMatcherTagRegistry {
			n, _ := FindBytes(buf, str2bytes(k))
			if n == 0 {
				// buf starts with this tag.
				mmtype := messageMatcherTagRegistry[k]
				message_matcher := NewMessageMatcher(mmtype)
				message_matcher.Initialize(detector.connector)
				mptype := messageProcessorTagRegistry[k]
				if mptype != nil {
					detector.message_processor = NewMessageProcessor(mptype)
				}
				detector.message_matcher = message_matcher
			}
		}
		if detector.message_matcher == nil {
			if len(buf) > maxklen {
				message_matcher := detector.default_message_matcher
				message_matcher.Initialize(detector.connector)
				detector.message_matcher = message_matcher
			} else {
				return nil, nil
			}
		}
		detector.Trace('D', "消息匹配: %s", reflect.TypeOf(detector.message_matcher).Elem())
	}
	return detector.message_matcher, detector.message_processor
}

// MessageClassifier extracts messages of one class from a connection's
// receive buffer and hands them to a processor, keeping simple statistics.
type MessageClassifier struct {
	id                  string
	connector           *TCPConnector
	message_matcher     MessageMatcher
	message_processor   MessageProcessor
	detector            *MessageMatcherDetector
	mutex_readbuf_off   sync.Mutex // guards message_readbuf_off
	message_readbuf_off int
	message_begin_index int
	record_count        int
	bytes_count         int
	dropped_bytes       int
	buffer_bytes        int
	break_processing    bool
	time_start          time.Time
}

// Initialize resets the classifier. When message_matcher is nil a detector is
// created so the matcher can be chosen from the first received data;
// otherwise the given matcher is used and msgproc is kept only if it accepts
// that matcher type.
func (msgclassifier *MessageClassifier) Initialize(id string, connector *TCPConnector, message_matcher MessageMatcher, msgproc MessageProcessor) {
	msgclassifier.id = id
	msgclassifier.connector = connector
	msgclassifier.message_readbuf_off = 0
	msgclassifier.message_begin_index = -1
	msgclassifier.record_count = 0
	msgclassifier.bytes_count = 0
	msgclassifier.dropped_bytes = 0
	msgclassifier.buffer_bytes = 0
	msgclassifier.break_processing = false
	msgclassifier.message_processor = msgproc
	if message_matcher == nil {
		msgclassifier.detector = NewMessageMatcherDetector(connector)
	} else {
		msgclassifier.message_matcher = message_matcher
		msgclassifier.message_matcher.Initialize(msgclassifier.connector)
		msgclassifier.id = fmt.Sprintf("%s#%s", reflect.TypeOf(message_matcher).Elem(), msgclassifier.id)
		if msgclassifier.message_processor != nil && !msgclassifier.message_processor.MatchType(msgclassifier.message_matcher) {
			msgclassifier.message_processor = nil
		}
	}
	msgclassifier.time_start = time.Now()
}

// Match tries to extract the next complete message from buf.
//
// It returns the offset in buf up to which data has been consumed and the
// matched message, or nil when no complete message is available yet.
func (msgclassifier *MessageClassifier) Match(buf []byte) (int, []byte) {
	msgclassifier.mutex_readbuf_off.Lock()
	defer msgclassifier.mutex_readbuf_off.Unlock()
	// Continue from where the previous call stopped.
	procbuf := buf[msgclassifier.message_readbuf_off:]
	if Config.TOVOT_DEBUG > 0 {
		msgclassifier.connector.Trace('X', "[%s] buf %d %d", msgclassifier.id, procbuf, msgclassifier.message_begin_index)
	}
	if msgclassifier.message_begin_index < 0 {
		// Look for the begin marker.
		if msgclassifier.message_matcher == nil {
			message_matcher, message_processor := msgclassifier.detector.Detect(buf)
			if message_matcher == nil {
				// The detector falls back to DefaultMessageMatcher, so nil
				// only means there is not enough data yet to decide; wait
				// for more. message_readbuf_off is still 0 here.
				return msgclassifier.message_readbuf_off, nil
			}
			msgclassifier.message_matcher = message_matcher
			if msgclassifier.message_processor == nil || !msgclassifier.message_processor.MatchType(message_matcher) {
				// No usable processor was supplied at start-up, so use the
				// one the detector chose for this message type.
				msgclassifier.message_processor = message_processor
			}
			msgclassifier.id = fmt.Sprintf("%s#%s", reflect.TypeOf(msgclassifier.message_matcher).Elem(), msgclassifier.id)
		}
		begin_flag_index, next_index := msgclassifier.message_matcher.MatchBegin(procbuf)
		if begin_flag_index >= 0 {
			if Config.TOVOT_DEBUG > 0 {
				msgclassifier.connector.Trace('D', "[%s] match begin %d %d", msgclassifier.id, begin_flag_index, procbuf)
			}
			// Start at the begin marker; bytes before it are unrecognized
			// and are dropped.
			msgclassifier.message_begin_index = begin_flag_index
			procbuf = procbuf[begin_flag_index:]
			msgclassifier.message_readbuf_off += begin_flag_index
			msgclassifier.dropped_bytes += begin_flag_index
		} else {
			// Skip unrecognized data.
			if Config.TOVOT_DEBUG > 0 {
				msgclassifier.connector.Trace('X', "[%s] not match begin %d %d", msgclassifier.id, next_index, procbuf)
			}
			msgclassifier.message_readbuf_off += next_index
			msgclassifier.dropped_bytes += next_index
			return msgclassifier.message_readbuf_off, nil
		}
	}
	// procbuf now starts at the begin marker (relative offset 0).
	if Config.TOVOT_DEBUG > 0 {
		msgclassifier.connector.Trace('X', "[%s] parse bytes %d", msgclassifier.id, len(procbuf))
	}
	// Look for the end marker.
	end_flag_index, msg_end_index := msgclassifier.message_matcher.MatchEnd(procbuf)
	if end_flag_index >= 0 {
		if Config.TOVOT_DEBUG > 0 {
			msgclassifier.connector.Trace('D', "[%s] match end %d %d %d", msgclassifier.id, end_flag_index, msg_end_index, procbuf[end_flag_index:msg_end_index])
		}
		// The message is complete; hand it back.
		msgbuf := procbuf[:msg_end_index]
		msgclassifier.message_begin_index = -1
		msgclassifier.message_readbuf_off += msg_end_index
		return msgclassifier.message_readbuf_off, msgbuf
	}
	if Config.TOVOT_DEBUG > 0 {
		msgclassifier.connector.Trace('D', "[%s] not match end %d", msgclassifier.id, procbuf)
	}
	// No end marker yet; keep waiting for the rest of the message.
	return msgclassifier.message_readbuf_off, nil
}

// Proc matches and processes at most one message from buf. It returns the
// consumed offset, or -1 when processing was interrupted because buffered
// data was dropped (buffer could not grow and offsets were reset).
func (msgclassifier *MessageClassifier) Proc(buf []byte) int {
	done_buf_off, msgbuf := msgclassifier.Match(buf)
	if msgbuf != nil {
		if msgclassifier.message_processor != nil {
			msgclassifier.message_processor.MessageProc(msgclassifier.message_matcher, msgbuf)
		}
		msgclassifier.bytes_count += len(msgbuf)
		msgclassifier.record_count++
		msgclassifier.buffer_bytes = len(buf)
		conn_time := time.Since(msgclassifier.time_start)
		msgclassifier.connector.CalmTrace(msgclassifier.id+".records", 1000, 'D', "[%s] 连接持续 %v, 已处理 %d records, %d bytes, dropped %d bytes, buffer %d bytes", msgclassifier.id, conn_time, msgclassifier.record_count, msgclassifier.bytes_count, msgclassifier.dropped_bytes, msgclassifier.buffer_bytes)
	}
	if msgclassifier.break_processing {
		return -1
	}
	return done_buf_off
}

// ReadBufferRollForward rebases the classifier's read offset after the
// receive buffer moved forward by readbuf_off bytes. When dropped is true the
// bytes were discarded, which is counted and interrupts the running Proc loop.
func (msgclassifier *MessageClassifier) ReadBufferRollForward(readbuf_off int, dropped bool) {
	if Config.TOVOT_DEBUG > 0 {
		msgclassifier.connector.Trace('X', "[%s] ReadBufferRollForward message_readbuf_off=%d, readbuf_off=%d, dropped=%v", msgclassifier.id, msgclassifier.message_readbuf_off, readbuf_off, dropped)
	}
	msgclassifier.mutex_readbuf_off.Lock()
	defer msgclassifier.mutex_readbuf_off.Unlock()
	msgclassifier.message_begin_index = -1
	msgclassifier.message_readbuf_off -= readbuf_off
	if msgclassifier.message_readbuf_off < 0 {
		msgclassifier.message_readbuf_off = 0
	}
	if dropped {
		msgclassifier.dropped_bytes += readbuf_off
		msgclassifier.break_processing = true
	}
}

// Destory resets the classifier's matching state when its connection closes.
func (msgclassifier *MessageClassifier) Destory() {
	if msgclassifier.connector != nil {
		msgclassifier.connector.Trace('D', "[%s] 销毁数据分类器", msgclassifier.id)
	}
	msgclassifier.message_begin_index = -1
	msgclassifier.message_readbuf_off = 0
}

// MultiMessageClassifier runs several classifiers over the same buffer.
type MultiMessageClassifier struct {
	message_classifier []*MessageClassifier
}

// Initialize creates one classifier per (matcher, processor) pair, or a
// single auto-detecting classifier when the slices are empty. It panics when
// the two slices differ in length.
func (mmc *MultiMessageClassifier) Initialize(connector *TCPConnector, message_matcher []MessageMatcher, msgproc []MessageProcessor) {
	if len(message_matcher) != len(msgproc) {
		panic("MultiMessageClassifier.Initialize Error: len(message_matcher) != len(msgproc)")
	}
	message_classifier_count := len(message_matcher)
	message_classifier := make([]*MessageClassifier, message_classifier_count)
	if message_classifier_count > 0 {
		for i := 0; i < message_classifier_count; i++ {
			message_classifier[i] = &MessageClassifier{}
			message_classifier[i].Initialize(fmt.Sprintf("_%d", newID()), connector, message_matcher[i], msgproc[i])
		}
	} else {
		message_classifier = make([]*MessageClassifier, 1)
		message_classifier[0] = &MessageClassifier{}
		message_classifier[0].Initialize(fmt.Sprintf("_%d", newID()), connector, nil, nil)
	}
	mmc.message_classifier = message_classifier
}

// receiverProcess lets every classifier consume as many messages from buf as
// it can and returns the smallest consumed offset across classifiers (-1 if
// any classifier was interrupted).
func (mmc *MultiMessageClassifier) receiverProcess(buf []byte) int {
	done_buffer_off := len(buf) // offset up to which all classifiers are done
	message_classifier := mmc.message_classifier
	message_classifier_count := len(message_classifier)
	for i := 0; i < message_classifier_count; i++ {
		last_donebufoff, curdonebufoff := -1, 0
		// Repeat until no progress, an interruption, or the buffer is used up.
		for last_donebufoff != curdonebufoff && curdonebufoff >= 0 && curdonebufoff < len(buf) {
			last_donebufoff = curdonebufoff
			curdonebufoff = message_classifier[i].Proc(buf)
		}
		// Keep the minimum across classifiers.
		if curdonebufoff < done_buffer_off {
			done_buffer_off = curdonebufoff
		}
	}
	return done_buffer_off
}

// shiftBuffer rebases every classifier after the buffer moved forward.
func (mmc *MultiMessageClassifier) shiftBuffer(shif_buffer_off int, dropped bool) {
	message_classifier := mmc.message_classifier
	message_classifier_count := len(message_classifier)
	for i := 0; i < message_classifier_count; i++ {
		message_classifier[i].ReadBufferRollForward(shif_buffer_off, dropped)
	}
}

// Destory resets all classifiers.
func (mmc *MultiMessageClassifier) Destory() {
	for i := 0; i < len(mmc.message_classifier); i++ {
		mmc.message_classifier[i].Destory()
	}
}

// TCPReceiverProcessor is the per-connection processing pipeline.
type TCPReceiverProcessor struct {
	MultiMessageClassifier
}

// Initialize sets up a pipeline with a single matcher/processor pair.
func (recvproc *TCPReceiverProcessor) Initialize(connector *TCPConnector, message_matcher MessageMatcher, msgproc MessageProcessor) {
	recvproc.MultiMessageClassifier.Initialize(connector, []MessageMatcher{message_matcher}, []MessageProcessor{msgproc})
}

// TCPConnector wraps one TCP connection together with its receive settings.
type TCPConnector struct {
	*Tracer
	id                 string
	conn               net.Conn
	closed             bool
	init_buf_size      int
	max_buf_size       int
	SendPromptMessage  string
	mu_conn            sync.Mutex
	remote_addr        string
	local_addr         string
	receiver_processor *TCPReceiverProcessor
}

// NewTCPConnector wraps conn. in_out is the direction marker used in the
// trace module name ("<-" or "->"). init_buf_size is raised to at least 64.
func NewTCPConnector(id string, conn net.Conn, in_out string, init_buf_size, max_buf_size int) *TCPConnector {
	remote_addr := conn.RemoteAddr().String()
	local_addr := conn.LocalAddr().String()
	trace_module := fmt.Sprintf("%s@%s%s%s", id, local_addr, in_out, remote_addr)
	if init_buf_size < 64 {
		init_buf_size = 64
	}
	connector := &TCPConnector{
		Tracer:             NewTracer(trace_module),
		id:                 id,
		conn:               conn,
		closed:             false,
		init_buf_size:      init_buf_size,
		max_buf_size:       max_buf_size,
		remote_addr:        remote_addr,
		local_addr:         local_addr,
		receiver_processor: nil, // a send-only connector needs no receive processor
	}
	return connector
}

// IsClosed reports whether the connector is nil or its connection is closed.
func (connector *TCPConnector) IsClosed() bool {
	return connector == nil || connector.conn == nil || connector.closed
}

// Close resets the receive pipeline and closes the connection. It is safe to
// call more than once.
func (connector *TCPConnector) Close() {
	connector.Trace('D', "%s 准备关闭连接", connector.id)
	// Close immediately so the peer can reconnect.
	func() {
		connector.mu_conn.Lock()
		defer connector.mu_conn.Unlock()
		if connector.receiver_processor != nil {
			connector.receiver_processor.Destory()
		}
		if connector.conn != nil {
			connector.closed = true
			connector.conn.Close()
			connector.conn = nil
			connector.Trace('D', "%s 连接器已关闭", connector.id)
		}
	}()
}

// Send writes data to the connection and returns the number of bytes written.
// It returns (0, nil) when the connector is already closed.
func (connector *TCPConnector) Send(data []byte) (int, error) {
	// Snapshot conn under the lock: Close may set it to nil concurrently,
	// and checking then dereferencing the field could panic.
	connector.mu_conn.Lock()
	conn := connector.conn
	connector.mu_conn.Unlock()
	if conn == nil {
		return 0, nil
	}
	n, err := conn.Write(data)
	if err != nil {
		connector.Trace('D', "Error send: %s", err.Error())
		return 0, err
	}
	return n, err
}

// sendStartMessage sends SendPromptMessage to the peer, if one is configured.
func (connector *TCPConnector) sendStartMessage() (int, error) {
	if len(connector.SendPromptMessage) > 0 {
		n, err := connector.Send([]byte(connector.SendPromptMessage))
		if err != nil {
			connector.Trace('D', "Error send message: %s", err.Error())
		} else {
			connector.Trace('D', "连接开始提示信息发送成功")
		}
		return n, err
	}
	return 0, nil
}

// TCPReceiveBuffer is the growable receive buffer of one connection. Data is
// appended at recvoff by the receiving goroutine and consumed from readoff by
// the processing goroutine.
type TCPReceiveBuffer struct {
	connector        *TCPConnector
	recvbuf          []byte
	recvlimit        int  // upper bound for recvoff; normally len(recvbuf), may be last_readoff after a reorg while processing is still running
	recvoff          int  // offset where received data is appended
	readoff          int  // offset from which data is read for processing
	last_recvoff     int  // recvoff at the time of the last processing run
	last_readoff     int  // readoff at the time of the last processing run
	break_processing bool // buffer could not grow and data was dropped, which truncates the running processing
	last_proc_done   bool // whether the previous processing run finished
	mutex_readoff    sync.Mutex
	buf_size_ok_time time.Time
}

// Initialize allocates the buffer with the connector's initial size.
func (trb *TCPReceiveBuffer) Initialize(connector *TCPConnector) {
	trb.connector = connector
	trb.recvbuf = make([]byte, connector.init_buf_size)
	trb.recvlimit = len(trb.recvbuf)
	trb.recvoff = 0
	trb.readoff = 0
	trb.last_recvoff = -1
	trb.last_readoff = -1
	trb.break_processing = false
	trb.last_proc_done = true
	trb.buf_size_ok_time = time.Now()
}

// getRecvBuffer returns the slice the next Read may fill: at most
// init_buf_size bytes and never beyond recvlimit, so data still being
// processed after a reorg is not overwritten.
func (trb *TCPReceiveBuffer) getRecvBuffer() []byte {
	ei := trb.recvoff + trb.connector.init_buf_size
	if ei > trb.recvlimit {
		ei = trb.recvlimit
	}
	return trb.recvbuf[trb.recvoff:ei]
}

// BufferReceive reads once from the connection into the buffer and advances
// recvoff. It returns io.EOF when the connector has already been closed.
func (trb *TCPReceiveBuffer) BufferReceive() (int, error) {
	// Snapshot conn: Close may set it to nil from another goroutine between
	// the caller's loop check and this read.
	connector := trb.connector
	connector.mu_conn.Lock()
	conn := connector.conn
	connector.mu_conn.Unlock()
	if conn == nil {
		return 0, io.EOF
	}
	// Receiving only changes recvbuf content at and after recvoff.
	n, err := conn.Read(trb.getRecvBuffer())
	trb.recvoff += n
	if n > 0 {
		if Config.TOVOT_DEBUG > 0 {
			trb.connector.Trace('X', "read bytes %d", n)
		}
	}
	return n, err
}

// getProcBuffer returns the unprocessed part of the buffer if anything changed
// since the last processing run, or nil when there is nothing new.
func (trb *TCPReceiveBuffer) getProcBuffer() []byte {
	trb.mutex_readoff.Lock()
	defer trb.mutex_readoff.Unlock()
	if Config.TOVOT_DEBUG > 0 {
		trb.connector.Trace('X', "准备处理 readoff=%d, last_readoff=%d, recvoff=%d, last_recvoff=%d", trb.readoff, trb.last_readoff, trb.recvoff, trb.last_recvoff)
	}
	if trb.last_readoff != trb.readoff || trb.last_recvoff != trb.recvoff {
		trb.last_readoff = trb.readoff
		trb.last_recvoff = trb.recvoff
		buf := trb.recvbuf[trb.last_readoff:trb.last_recvoff]
		if Config.TOVOT_DEBUG > 0 {
			trb.connector.Trace('X', "处理开始 last_readoff=%d, last_recvoff=%d, len(buf)=%d", trb.last_readoff, trb.last_recvoff, len(buf))
		}
		trb.break_processing = false
		return buf
	}
	return nil
}

// BufferProcess runs the message pipeline over buf and then advances readoff
// by the consumed amount.
//
// Processing does not modify buf, and receiving only appends after it (even
// when recvbuf is reallocated), so this can run in parallel with receiving.
// When data was dropped because the buffer could not grow, the pipeline
// returns -1 and offsets are left as set by the drop.
func (trb *TCPReceiveBuffer) BufferProcess(buf []byte) {
	doneoff := trb.connector.receiver_processor.receiverProcess(buf)
	// Update the data offsets.
	func() {
		trb.mutex_readoff.Lock()
		defer trb.mutex_readoff.Unlock()
		if doneoff >= 0 && !trb.break_processing {
			trb.connector.receiver_processor.shiftBuffer(doneoff, false)
			trb.readoff += doneoff
			if Config.TOVOT_DEBUG > 0 {
				trb.connector.Trace('X', "处理完成 readoff=%d, doneoff=%d, len(buf)=%d", trb.readoff, doneoff, len(buf))
			}
		}
		trb.last_readoff = -1
		trb.recvlimit = len(trb.recvbuf)
	}()
}

// OverReorgLevel reports whether the buffer should be reorganized: either it
// has been far too large for more than one second (shrink), or the receive
// position is past the middle and about to hit recvlimit (compact/grow).
func (trb *TCPReceiveBuffer) OverReorgLevel() bool {
	if trb.recvoff+trb.connector.init_buf_size < trb.recvlimit/16 {
		return time.Since(trb.buf_size_ok_time) > time.Second
	}
	trb.buf_size_ok_time = time.Now()
	return trb.recvoff > len(trb.recvbuf)/2 && trb.recvoff+trb.connector.init_buf_size >= trb.recvlimit
}

// BufferReorg moves the unprocessed data to the start of the buffer,
// reallocating it first when BufferExpansion asks for a new size.
func (trb *TCPReceiveBuffer) BufferReorg() {
	new_recvbuf := trb.recvbuf
	// Resize if needed; at the maximum size old data is dropped instead.
	new_buf_size := trb.BufferExpansion()
	if new_buf_size > 0 {
		new_recvbuf = make([]byte, new_buf_size)
	}
	func() {
		trb.mutex_readoff.Lock()
		defer trb.mutex_readoff.Unlock()
		if Config.TOVOT_DEBUG > 0 {
			trb.connector.Trace('X', "重组未处理完成数据前 readoff=%d, recvoff=%d, recvlimit=%d", trb.readoff, trb.recvoff, trb.recvlimit)
		}
		keepbuf := trb.recvbuf[trb.readoff:trb.recvoff]
		// Rebase the offsets.
		trb.recvoff -= trb.readoff
		trb.readoff = 0
		// Decide the new receive limit.
		if new_buf_size > 0 {
			// Fresh memory: data still being processed cannot be overwritten.
			trb.recvlimit = new_buf_size
		} else if trb.last_readoff > trb.recvoff {
			// Same memory: protect the data still being processed.
			trb.recvlimit = trb.last_readoff
		} else if trb.last_readoff >= 0 {
			panic(fmt.Sprintf("扩容或丢数操作没起作用? last_readoff=%d, recvoff=%d, recvlimit=%d, len(recvbuf)=%d, new_buf_size=%d", trb.last_readoff, trb.recvoff, trb.recvlimit, len(trb.recvbuf), new_buf_size))
		}
		// Move the (small amount of) unprocessed data to the front.
		copy(new_recvbuf, keepbuf)
		trb.recvbuf = new_recvbuf
		if Config.TOVOT_DEBUG > 0 {
			trb.connector.Trace('X', "重组未处理完成数据后 readoff=%d, recvoff=%d, recvlimit=%d", trb.readoff, trb.recvoff, trb.recvlimit)
		}
	}()
}

// BufferExpansion decides how the buffer should be reorganized. It returns
// the size of a new buffer to allocate, or -1 to reuse the existing memory.
// When the buffer is at its maximum size and cannot hold the backlog, the
// oldest unprocessed data is dropped.
func (trb *TCPReceiveBuffer) BufferExpansion() (new_buf_size int) {
	trb.mutex_readoff.Lock()
	defer trb.mutex_readoff.Unlock()
	if trb.recvoff+trb.connector.init_buf_size < trb.recvlimit/16 {
		// Far more space than needed: shrink.
		new_buf_size = (len(trb.recvbuf) - trb.connector.init_buf_size) * 8 / 10 / trb.connector.init_buf_size * trb.connector.init_buf_size
		trb.connector.Trace('I', "数据接收空间过剩,收缩至 %s", fmt_bytes_size(new_buf_size))
		return new_buf_size
	}
	// Otherwise receive space is short (see OverReorgLevel); prepare a reorg.
	if trb.recvoff-trb.readoff > len(trb.recvbuf)/4 {
		// Too much unprocessed data to simply move: the buffer must grow.
		if len(trb.recvbuf) >= trb.connector.max_buf_size {
			// Already at the maximum buffer size.
			if trb.recvoff-trb.readoff+trb.connector.init_buf_size < trb.readoff {
				// Moving the data frees enough receive space.
				return -1
			}
			// The data cannot be moved effectively.
			if trb.recvoff-trb.readoff+trb.connector.init_buf_size > len(trb.recvbuf) {
				// Not enough room left: drop the oldest data.
				truncate_size := trb.recvoff - trb.readoff - trb.connector.init_buf_size
				trb.connector.Trace('E', "达到最大缓冲区尺寸 %s>=%s,数据缓存过半,数据前滚,丢弃旧数据 %dbytes", fmt_bytes_size(len(trb.recvbuf)), fmt_bytes_size(trb.connector.max_buf_size), truncate_size)
				// Advance the read offset directly. The classifiers guard
				// their own offsets with mutex_readbuf_off, which avoids a
				// conflict with the processing goroutine.
				trb.connector.receiver_processor.shiftBuffer(truncate_size, true)
				trb.readoff += truncate_size
				trb.last_readoff = trb.readoff
				trb.break_processing = true
				return -1
			}
			// Enough room: allocate a new block of the same size and reorg.
			return len(trb.recvbuf)
		}
		// Grow, capped at max_buf_size.
		new_buf_size = (len(trb.recvbuf) + trb.connector.init_buf_size) * 14 / 10 / trb.connector.init_buf_size * trb.connector.init_buf_size
		if new_buf_size > trb.connector.max_buf_size {
			new_buf_size = trb.connector.max_buf_size
		}
		trb.connector.Trace('I', "数据堆积过多,扩容至 %s", fmt_bytes_size(new_buf_size))
		return new_buf_size
	}
	// Reorganize within the existing memory.
	return -1
}

// Receive runs the receive loop of the connector until the connection is
// closed or a read fails. Received data is processed by a background
// goroutine. It returns the read error unless the connector was closed or the
// error is io.EOF.
func (connector *TCPConnector) Receive(message_matcher MessageMatcher, msgproc MessageProcessor) error {
	n, err := connector.sendStartMessage()
	connector.receiver_processor = &TCPReceiverProcessor{}
	connector.receiver_processor.Initialize(connector, message_matcher, msgproc)
	// Receive buffer.
	trb := &TCPReceiveBuffer{}
	trb.Initialize(connector)
	connector.Trace('D', "数据接收缓冲区就绪")
	var wg_proc sync.WaitGroup
	ch_active_proc := make(chan bool)
	// Background processing: one run per "true" signal, stop on "false".
	go func() {
		for <-ch_active_proc {
			func() {
				defer wg_proc.Done()
				buf := trb.getProcBuffer()
				if buf != nil {
					trb.BufferProcess(buf)
				}
			}()
		}
	}()
	connector.Trace('D', "数据接收缓冲区并行处理就绪")
	for connector.conn != nil && err == nil {
		n, err = trb.BufferReceive()
		if n > 0 {
			wg_proc.Add(1)
			// Signal asynchronously so receiving never waits for processing.
			go func() { ch_active_proc <- true }()
		}
		if trb.OverReorgLevel() {
			trb.BufferReorg()
		}
	}
	connector.Trace('D', "等待数据接收缓冲区并行处理结束")
	wg_proc.Wait()
	ch_active_proc <- false
	connector.Trace('D', "数据接收结束")
	if !connector.IsClosed() && err != nil && err != io.EOF {
		connector.Trace('D', "%s", err.Error())
		return err
	}
	return nil
}

// TCPSender is a client that sends data to a remote host and reconnects on
// demand.
type TCPSender struct {
	*Tracer
	// configuration
	id          string
	remote_host string
	remote_port int
	timeout     int // dial timeout in seconds
	// state
	connector *TCPConnector
}

// NewTCPSender creates a sender and tries to connect once. A failed first
// connection is not reported; SendRetry reconnects when needed.
func NewTCPSender(id string, remote_host string, remote_port int) *TCPSender {
	sender := &TCPSender{
		Tracer:      NewTracer(id),
		id:          id,
		remote_host: remote_host,
		remote_port: remote_port,
		timeout:     3,
		connector:   nil,
	}
	// Best effort: the error is intentionally ignored here.
	sender.reconnect()
	return sender
}

// reconnect closes the current connection, dials the remote host and starts a
// goroutine that drains incoming data until the connection ends.
func (sender *TCPSender) reconnect() error {
	sender.Close()
	remote_addr := fmt.Sprintf("%s:%d", sender.remote_host, sender.remote_port)
	conn, err := net.DialTimeout("tcp", remote_addr, time.Duration(sender.timeout)*time.Second)
	if err != nil {
		return err
	}
	sender.connector = NewTCPConnector(sender.id, conn, "->", 0, 0)
	sender.module = sender.connector.module
	sender.Trace('D', "开启连接器")
	// Capture the connector now so the goroutine always serves this
	// connection, even if a later reconnect replaces sender.connector.
	connector := sender.connector
	go func() {
		if connector != nil {
			connector.Receive(nil, nil)
			connector.Close()
		}
	}()
	return nil
}

// SendRetry sends data, reconnecting first if the connection is closed and
// retrying up to retry more times after a failed write.
func (sender *TCPSender) SendRetry(data []byte, retry int) (int, error) {
	if sender.connector.IsClosed() {
		err := sender.reconnect()
		if err != nil {
			return 0, err
		}
	}
	n, err := sender.connector.Send(data)
	if err != nil && retry > 0 {
		sender.connector.Close()
		n, err = sender.SendRetry(data, retry-1)
	}
	return n, err
}

// Send sends data with a single retry.
func (sender *TCPSender) Send(data []byte) (int, error) {
	return sender.SendRetry(data, 1)
}

// Close closes the sender's current connection, if any.
func (sender *TCPSender) Close() {
	if sender.connector != nil {
		sender.connector.Close()
	}
}

// Configuration holds the runtime settings of a receiver.
type Configuration struct {
	ID                string
	ServerLocation    string
	ServerPort        int
	InitRecvBufSize   int
	MaxRecvBufSize    int
	ConnectorGrowth   int
	SendPromptMessage string
	TOVOT_DEBUG       int
	TRACE             string
}

// get returns cfg[key], or default_value when the key is absent.
func (config *Configuration) get(cfg map[string]string, key string, default_value string) string {
	v, o := cfg[key]
	if o {
		return v
	}
	return default_value
}

// getInt returns cfg[key] parsed as an int, or default_value when the key is
// absent or not a valid integer.
func (config *Configuration) getInt(cfg map[string]string, key string, default_value int) int {
	sv, so := cfg[key]
	if so {
		iv, err := strconv.Atoi(sv)
		if err == nil {
			return iv
		}
	}
	return default_value
}

// Initialize fills the configuration from cfg, applying built-in defaults for
// missing keys.
func (config *Configuration) Initialize(cfg map[string]string) {
	config.ID = config.get(cfg, "ID", fmt.Sprintf("ID_%d", newID()))
	config.ServerLocation = config.get(cfg, "ServerLocation", "")
	config.ServerPort = config.getInt(cfg, "ServerPort", 9999)
	config.InitRecvBufSize = config.getInt(cfg, "InitRecvBufSize", 1024)
	config.MaxRecvBufSize = config.getInt(cfg, "MaxRecvBufSize", 1024*1024*128)
	config.ConnectorGrowth = config.getInt(cfg, "ConnectorGrowth", 2)
	config.SendPromptMessage = config.get(cfg, "SendPromptMessage", "` enter interactive mode\n")
	config.TOVOT_DEBUG = config.getInt(cfg, "TOVOT_DEBUG", 0)
	config.TRACE = config.get(cfg, "TRACE", DefaultTraceCatalog)
}

// InitWithDefaultConfig copies every zero-valued field from defaultConfig.
func (config *Configuration) InitWithDefaultConfig(defaultConfig *Configuration) {
	if config.ID == "" {
		config.ID = defaultConfig.ID
	}
	if config.ServerLocation == "" {
		config.ServerLocation = defaultConfig.ServerLocation
	}
	if config.ServerPort == 0 {
		config.ServerPort = defaultConfig.ServerPort
	}
	if config.InitRecvBufSize == 0 {
		config.InitRecvBufSize = defaultConfig.InitRecvBufSize
	}
	if config.MaxRecvBufSize == 0 {
		config.MaxRecvBufSize = defaultConfig.MaxRecvBufSize
	}
	if config.ConnectorGrowth == 0 {
		config.ConnectorGrowth = defaultConfig.ConnectorGrowth
	}
	if config.SendPromptMessage == "" {
		config.SendPromptMessage = defaultConfig.SendPromptMessage
	}
	if config.TOVOT_DEBUG == 0 {
		config.TOVOT_DEBUG = defaultConfig.TOVOT_DEBUG
	}
	if config.TRACE == "" {
		config.TRACE = defaultConfig.TRACE
	}
}

// TCPReceiver listens on a TCP port and serves each accepted connection with
// a pooled, pre-started connector goroutine.
type TCPReceiver struct {
	*Tracer
	// configuration
	host              string           // listen address
	port              int              // listen port
	message_matcher   MessageMatcher   // frames messages per connection; nil lets the detector choose from the first data received
	msgproc           MessageProcessor // business handler for received messages
	connector_growth  int              // number of standby connectors added at a time
	init_buf_size     int              // initial receive buffer size
	max_buf_size      int              // maximum receive buffer size
	SendPromptMessage string           // prompt sent to the client when a connection starts
	// basic information
	addr     string
	listener net.Listener
	// monitoring
	max_conn_id        int            // monotonically increasing connector ID
	cur_conn_count     int32          // number of connectors currently serving a connection
	prepare_conn_count int32          // number of connector goroutines that exist
	conn_chan          chan net.Conn  // hands accepted connections to connectors
	connectors         sync.Map       // running connectors by ID
	wg_conn_close      sync.WaitGroup // orders shutdown
}

// NewTCPReceiver builds a receiver from config. The connection prompt is
// taken from config.SendPromptMessage; when that is empty the built-in
// default prompt is used (set the SendPromptMessage field on the returned
// receiver to override it).
func NewTCPReceiver(config *Configuration, message_matcher MessageMatcher, msgproc MessageProcessor) *TCPReceiver {
	hostname := config.ServerLocation
	if hostname == "" {
		hostname, _ = os.Hostname()
	}
	addr := fmt.Sprintf("%s@%s:%d", config.ID, hostname, config.ServerPort)
	prompt := config.SendPromptMessage
	if prompt == "" {
		prompt = "` enter interactive mode\n"
	}
	receiver := &TCPReceiver{
		Tracer:             NewTracer(addr),
		host:               config.ServerLocation,
		port:               config.ServerPort,
		message_matcher:    message_matcher,
		msgproc:            msgproc,
		connector_growth:   config.ConnectorGrowth,
		init_buf_size:      config.InitRecvBufSize,
		max_buf_size:       config.MaxRecvBufSize,
		SendPromptMessage:  prompt,
		addr:               addr,
		listener:           nil,
		max_conn_id:        0,
		cur_conn_count:     0,
		prepare_conn_count: 0,
		conn_chan:          make(chan net.Conn),
	}
	return receiver
}

// accept serves one connection until it ends, then unregisters the connector.
func (receiver *TCPReceiver) accept(connID string, conn net.Conn) {
	connector := NewTCPConnector(connID, conn, "<-", receiver.init_buf_size, receiver.max_buf_size)
	connector.SendPromptMessage = receiver.SendPromptMessage
	receiver.connectors.Store(connector.id, connector)
	defer func() {
		connector.Close()
		receiver.connectors.Delete(connector.id)
		atomic.AddInt32(&receiver.cur_conn_count, -1)
	}()
	connector.Receive(receiver.message_matcher, receiver.msgproc)
}

// ConnectorPrepare starts count standby connector goroutines and returns once
// all of them are ready. Each goroutine serves connections from conn_chan
// and exits when the listener is gone or too many connectors are idle.
func (receiver *TCPReceiver) ConnectorPrepare(count int) {
	receiver.Trace('D', "增加 %d 备用连接器", count)
	var wg_connector_prepare sync.WaitGroup
	for i := 0; i < count; i++ {
		wg_connector_prepare.Add(1)
		go func(connid int) {
			receiver.wg_conn_close.Add(1)
			atomic.AddInt32(&receiver.prepare_conn_count, +1)
			connID := fmt.Sprintf("connector_%d", connid)
			receiver.Trace('D', "备用连接器 %s 准备就绪", connID)
			wg_connector_prepare.Done()
			for conn := range receiver.conn_chan {
				if conn != nil {
					receiver.Trace('D', "备用连接器 %s 开启", connID)
					atomic.AddInt32(&receiver.cur_conn_count, +1)
					receiver.Trace('D', "当前连接数: %d", int(atomic.LoadInt32(&receiver.cur_conn_count)))
					// Top up the pool when few standby connectors remain.
					go func() {
						if receiver.listener != nil && int(atomic.LoadInt32(&receiver.cur_conn_count)) > int(atomic.LoadInt32(&receiver.prepare_conn_count))-receiver.connector_growth {
							receiver.ConnectorPrepare(receiver.connector_growth)
						}
					}()
					receiver.accept(connID, conn)
					receiver.Trace('D', "当前连接数: %d", int(atomic.LoadInt32(&receiver.cur_conn_count)))
					receiver.Trace('D', "备用连接器 %s 关闭", connID)
				}
				// A nil conn is the shutdown signal; also retire surplus
				// standby connectors.
				if receiver.listener == nil || int(atomic.LoadInt32(&receiver.cur_conn_count)) < int(atomic.LoadInt32(&receiver.prepare_conn_count))-2*receiver.connector_growth {
					break
				}
			}
			receiver.Trace('D', "备用连接器 %s 销毁", connID)
			atomic.AddInt32(&receiver.prepare_conn_count, -1)
			receiver.wg_conn_close.Done()
		}(receiver.max_conn_id)
		receiver.max_conn_id++
	}
	wg_connector_prepare.Wait()
}

// Start prepares standby connectors, opens the listening port and starts the
// accept loop in the background. It returns the listen error, if any.
func (receiver *TCPReceiver) Start() error {
	receiver.Trace('D', "开始[ver:%s]", version)
	receiver.Trace('D', "预设备用接收器")
	receiver.ConnectorPrepare(receiver.connector_growth)
	receiver.Trace('D', "开启监听端口")
	listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", receiver.host, receiver.port))
	if err != nil {
		receiver.Trace('D', "监听端口开启错误: %s", err.Error())
		return err
	}
	receiver.Trace('D', "监听端口已开启")
	receiver.listener = listener
	go func() {
		receiver.Trace('D', "准备接收连接")
		for {
			// Stop sets receiver.listener to nil to end this loop.
			listener = receiver.listener
			if listener == nil {
				receiver.Trace('D', "端口已关闭")
				break
			}
			conn, err := listener.Accept()
			if err != nil {
				receiver.Trace('D', "连接接收错误: %s", err.Error())
			} else {
				receiver.conn_chan <- conn
			}
		}
		receiver.Trace('D', "关闭所有连接器")
		receiver.connectors.Range(func(connID interface{}, connector interface{}) bool {
			connector.(*TCPConnector).Close()
			return true
		})
		receiver.Trace('D', "销毁备用连接器")
		// Send one nil per connector goroutine as a shutdown signal.
		for n := int(atomic.LoadInt32(&receiver.prepare_conn_count)); n > 0; n-- {
			receiver.conn_chan <- nil
		}
	}()
	return nil
}

// IsAlive reports whether the receiver is currently listening.
func (receiver *TCPReceiver) IsAlive() bool {
	return receiver.listener != nil
}

// Stop closes the listening port and waits until all connector goroutines
// have finished. Calling Stop on a receiver that is not listening (never
// started, failed to start, or already stopped) is a no-op.
func (receiver *TCPReceiver) Stop() {
	listener := receiver.listener
	if listener == nil {
		return
	}
	receiver.listener = nil
	receiver.Trace('D', "关闭监听端口")
	listener.Close()
	receiver.wg_conn_close.Wait()
	receiver.Trace('D', "结束")
}

// CommandMatcher frames interactive commands: the session starts at the first
// backquote and every command ends with a newline.
type CommandMatcher struct {
	*Tracer
	connector *TCPConnector
	mode      int // 0 until the backquote has been seen, then 1
	welcome   int
}

// Initialize binds the matcher to its connector and resets its state.
func (matcher *CommandMatcher) Initialize(connector *TCPConnector) {
	matcher.Tracer = NewTracer(connector.module + "_cmdproc")
	matcher.connector = connector
	matcher.mode = 0
	matcher.welcome = 0
}

// MatchBegin looks for the backquote that enters interactive mode; once in
// interactive mode every command begins at offset 0.
func (matcher *CommandMatcher) MatchBegin(buf []byte) (int, int) {
	if matcher.mode != 0 {
		return 0, 1
	}
	gi, fi := FindBytes(buf, []byte("`"))
	if gi >= 0 {
		matcher.mode = 1
	}
	return gi, fi
}

// MatchEnd looks for the newline that terminates a command.
func (matcher *CommandMatcher) MatchEnd(buf []byte) (int, int) {
	return FindBytes(buf, []byte("\n"))
}

// EIFFormator encodes and decodes EIF-format messages.
type EIFFormator struct {
}

// EIFHeader describes the 36-byte EIF message header.
type EIFHeader struct {
	start      [8]byte
	reserved_1 int32
	reserved_2 int32
	reserved_3 int32
	reserved_4 int32
	reserved_5 int32
	body_size  int32
	bytes_321  int32
}

// EIFBody describes the layout of an EIF message body.
type EIFBody struct {
	three     byte
	class     string
	semicolon string
	kv_pairs  string
	end       string
	newline   string
	one       byte
}

// EIFFormatError reports a malformed EIF message.
type EIFFormatError string

// Error implements the error interface.
func (eiffe EIFFormatError) Error() string {
	return "EIFFormat Error: " + string(eiffe)
}

// EncodeSimply builds an EIF message from a class name and an already
// formatted "key='value';" list.
//
// NOTE(review): this writes ">" plus 20 zero bytes before the two size
// fields (sizes at offsets 21 and 25), whereas ParseHeader reads the sizes at
// offsets 28 and 32 — confirm which layout the peer expects.
func (*EIFFormator) EncodeSimply(class string, key_equal_value_semicolon_pairs string) []byte {
	str := fmt.Sprintf("%s;%sEND\n", class, key_equal_value_semicolon_pairs)
	bs := str2bytes(str)
	size := len(bs) + 2 // body text plus the leading 3 and trailing 1 marker bytes
	var bb bytes.Buffer
	bb.WriteString(">")
	bb.Write(make([]byte, 20))
	binary.Write(&bb, binary.BigEndian, int32(size))
	binary.Write(&bb, binary.BigEndian, int32(size))
	bb.WriteByte(3)
	bb.Write(bs)
	bb.WriteByte(1)
	return bb.Bytes()
}

// Encode builds an EIF message from a class name and a key/value map. Values
// are formatted with %v and single quotes are doubled. Pair order follows map
// iteration order and is therefore unspecified.
func (formater *EIFFormator) Encode(class string, data map[string]interface{}) []byte {
	var kvs strings.Builder
	for k, raw := range data {
		v := strings.Replace(fmt.Sprintf("%v", raw), "'", "''", -1)
		kvs.WriteString(k)
		kvs.WriteString("='")
		kvs.WriteString(v)
		kvs.WriteString("';")
	}
	return formater.EncodeSimply(class, kvs.String())
}

// ParseBody parses "class;key='value';..." text into the class name and a
// key/value map. Quoted sections may use ' or " and a doubled quote inside a
// quoted section stands for one quote character.
func (formater *EIFFormator) ParseBody(buf []byte) (string, map[string]string) {
	class := ""
	in_str := byte(0) // the active quote character, or 0 outside quotes
	var bs bytes.Buffer
	n := len(buf)
	k := ""
	v := ""
	data := make(map[string]string)
	// Called at each semicolon: the buffered text is a value (or the class).
	semicolon_key_value_pairs := func() {
		v = bs.String()
		bs.Reset()
		if class == "" {
			// Text before the first semicolon is the class; it must not
			// contain = ' ; and is not validated.
			class = v
		} else {
			data[k] = v
		}
	}
	for i := 0; i < n; {
		c := buf[i]
		i++
		if in_str != 0 {
			if c == in_str {
				if i < n && buf[i] == c {
					bs.WriteByte(c) // doubled quote collapses to one
					i++
				} else {
					in_str = 0 // closing quote
				}
			} else {
				bs.WriteByte(c) // character inside quotes
			}
		} else if c == '\'' || c == '"' {
			in_str = c // opening quote
		} else if c == '=' {
			k = bs.String() // text before '=' is the key
			bs.Reset()
		} else if c == ';' {
			semicolon_key_value_pairs()
		} else {
			bs.WriteByte(c) // character outside quotes
		}
	}
	if bs.Len() != 0 {
		// Malformed input without a final semicolon: treat the remainder as
		// the last value.
		semicolon_key_value_pairs()
	}
	return class, data
}

// ParseHeader validates the EIF header at the start of buf.
//
// It returns (end, body, nil) when buf holds a complete message, where end is
// the total message length and body is the text between the leading marker
// byte and the final ";" (inclusive). It returns (-1, nil, nil) when more
// data is needed and (-1, nil, err) when the data is not a valid message.
func (formater *EIFFormator) ParseHeader(buf []byte) (int, []byte, error) {
	if len(buf) < 41 {
		return -1, nil, nil // still loading
	}
	var eifheader EIFHeader
	eifheader.body_size = int32(binary.BigEndian.Uint32(buf[28:32]))
	eifheader.bytes_321 = int32(binary.BigEndian.Uint32(buf[32:36]))
	if eifheader.bytes_321 != eifheader.body_size && eifheader.bytes_321 != 4 {
		return -1, nil, EIFFormatError(fmt.Sprintf("Data bytes_321 %d is not match body_size %d or 4", eifheader.bytes_321, eifheader.body_size))
	}
	if eifheader.body_size < 5 {
		return -1, nil, EIFFormatError(fmt.Sprintf("Data body size %d<5", eifheader.body_size))
	}
	if eifheader.body_size > 100*1024*1024 {
		return -1, nil, EIFFormatError(fmt.Sprintf("Data body size %d>100M", eifheader.body_size))
	}
	if len(buf) < int(36+eifheader.body_size) {
		return -1, nil, nil // still loading
	}
	if buf[36] != 3 || buf[36+eifheader.body_size-1] != 1 {
		return -1, nil, EIFFormatError(fmt.Sprintf("end point is not match, (%d!=3 or %d!=1)", buf[36], buf[36+eifheader.body_size-1]))
	}
	end_flag := bytes2str(buf[36+eifheader.body_size-6 : 36+eifheader.body_size-1])
	if end_flag != ";END\n" {
		return -1, nil, EIFFormatError(fmt.Sprintf("end flag is not match, %s", end_flag))
	}
	buf = buf[37 : 36+eifheader.body_size-6+1] // keep the final semicolon to simplify parsing
	return int(36 + eifheader.body_size), buf, nil
}

// Decode parses one EIF message from buf and returns its total length, class,
// key/value data and any format error.
func (formater *EIFFormator) Decode(buf []byte) (int, string, map[string]string, error) {
	end_index, buf, err := formater.ParseHeader(buf)
	class, data := formater.ParseBody(buf)
	return end_index, class, data, err
}

// EIFMessageMatcher frames EIF messages on a connection.
type EIFMessageMatcher struct {
	*Tracer
	connector *TCPConnector
	formator  *EIFFormator
	msgbody   []byte // body text of the most recently matched message
}

// Initialize binds the matcher to its connector.
func (matcher *EIFMessageMatcher) Initialize(connector *TCPConnector) {
	matcher.Tracer = NewTracer(connector.module)
	matcher.connector = connector
	matcher.formator = &EIFFormator{}
}

// MatchBegin looks for the ">" that starts an EIF message.
func (matcher *EIFMessageMatcher) MatchBegin(buf []byte) (int, int) {
	return FindBytes(buf, []byte(">"))
}

// MatchEnd reports the end of the EIF message starting at buf[0], or (-1, 0)
// when the message is incomplete or malformed. On success the body text is
// stored in msgbody.
func (matcher *EIFMessageMatcher) MatchEnd(buf []byte) (int, int) {
	buf_size, buf, err := matcher.formator.ParseHeader(buf)
	if buf != nil {
		matcher.msgbody = buf
		return buf_size, buf_size
	}
	if err != nil {
		matcher.Trace('D', "EIFFormat MatchEnd Error: %s", err.Error())
	}
	return -1, 0
}

// EIFMessageProc is the callback invoked for each decoded EIF message.
type EIFMessageProc func(remote_addr string, class string, data map[string]string)

// EIFMessageProcessor adapts an EIFMessageProc to the MessageProcessor
// interface.
type EIFMessageProcessor struct {
	msgproc EIFMessageProc
}

// MatchType reports whether matcher is an *EIFMessageMatcher.
func (eifmsg *EIFMessageProcessor) MatchType(matcher MessageMatcher) bool {
	_, ok := matcher.(*EIFMessageMatcher)
	return ok
}

// MessageProc handles one complete message framed by matcher.
func (eifmsg *EIFMessageProcessor) MessageProc(matcher MessageMatcher, buf []byte) {
	mm :=
matcher.(*EIFMessageMatcher) buf = make([]byte, len(mm.msgbody)) copy(buf, mm.msgbody) go func(remote_addr string, buf []byte) { class, data := mm.formator.ParseBody(buf) eifmsg.msgproc(remote_addr, class, data) }(mm.connector.remote_addr, buf) } func KVsParse(kvs ...string) map[string]string { retmap := make(map[string]string) for i := range kvs { kv := kvs[i] n := strings.Index(kv, "=") if n >= 0 { k := kv[:n] v := kv[n+1:] retmap[k] = v } else { retmap[kv] = kv } } return retmap } var Config Configuration func init() { RegisterMessageClass(">\x00", &EIFMessageMatcher{}, &EIFMessageProcessor{}) RegisterMessageClass("`\r\n", &CommandMatcher{}, &CommandProcessor{}) RegisterMessageClass("\r\n", &CommandMatcher{}, &CommandProcessor{}) config := KVsParse(os.Environ()...) for k, v := range KVsParse(os.Args...) { config[k] = v } Config.Initialize(config) } type EIFSender struct { *Tracer *Configuration tcpSender *TCPSender } func (sender *EIFSender) Send(class string, data map[string]string) (int, error) { if sender.tcpSender == nil { sender.Configuration.InitWithDefaultConfig(&Config) if sender.ID == "" { sender.ID = "eifsender" } sender.tcpSender = NewTCPSender(sender.ID, sender.ServerLocation, sender.ServerPort) } var eifformator EIFFormator dat := make(map[string]interface{}) for k, v := range data { dat[k] = v } bs := eifformator.Encode(class, dat) for i := 0; i < len(bs); { ns, err := sender.tcpSender.Send(bs) i += ns if err != nil { return i, err } } return len(bs), nil } func (sender *EIFSender) Close() { tcpSender := sender.tcpSender if tcpSender != nil { tcpSender.Close() sender.tcpSender = nil } } func EIFSend(server string, port int, class string, kvs string) { eifsender := EIFSender{ Configuration: &Configuration{ ID: "estemp", ServerLocation: server, ServerPort: port, }, } defer eifsender.Close() eifformator := &EIFFormator{} _, data := eifformator.ParseBody(str2bytes(class + ";" + kvs)) eifsender.Send(class, data) } type EIFReceiver struct { *Tracer 
*Configuration tcpReceiver *TCPReceiver } /** * 启动EIFReceiver,开启监听端口,处理数据接收、分类、解析,直至进程被终止 */ func (receiver *EIFReceiver) Run(eifmsgproc EIFMessageProc) { receiver.Configuration.InitWithDefaultConfig(&Config) break_signal := make(chan os.Signal, 1) signal.Notify(break_signal, os.Interrupt, os.Kill, syscall.SIGTERM) // 将通用消息处理函数转换为EIF消息处理函数 eifmsgprocessor := &EIFMessageProcessor{ msgproc: eifmsgproc, } receiver.tcpReceiver = NewTCPReceiver(receiver.Configuration, nil, eifmsgprocessor) receiver.tcpReceiver.Start() bs := <-break_signal receiver.tcpReceiver.Trace('D', "%v", bs) receiver.tcpReceiver.Stop() } /** * 判断EIFReceiver是否处于监听状态 */ func (receiver *EIFReceiver) IsRunning() bool { return receiver.tcpReceiver != nil && receiver.tcpReceiver.IsAlive() } /** * 交互命令处理,通过 telnet 127.0.0.1 9999 进行命令交互,可用于测试、监控、集成 */ type CommandProcessor struct { } func (cmd *CommandProcessor) MatchType(mmatcher MessageMatcher) bool { _, ok := mmatcher.(*CommandMatcher) return ok } func (cmd *CommandProcessor) outputHelp(mmatcher MessageMatcher) { matcher := mmatcher.(*CommandMatcher) _, err := matcher.connector.Send([]byte("----------------------\n" + "` 进入或退出命令交互模式\n" + "c|x|q|close|exit|quit 断开连接\n" + "h|help 打印帮助信息\n" + "s|send ip:port class key=value;... 
发送一条EIF格式的消息,如:s 127.0.0.1:9999 test number=101;sleep=100;msg=hello world.\n" + "----------------------\n")) if err != nil { matcher.Trace('D', "Error send response: %s", err.Error()) } } func (cmd *CommandProcessor) MessageProc(mmatcher MessageMatcher, buf []byte) { matcher := mmatcher.(*CommandMatcher) if matcher.mode == 1 { buf = buf[1:] matcher.mode = 2 } m, _ := regexp.Match(`^\s*(?:help|h)\s*$`, buf) if m || matcher.welcome == 0 { cmd.outputHelp(matcher) matcher.welcome = 1 } str := bytes2str(buf[:len(buf)-2]) re_exit, err := regexp.Compile(`^\s*(?:close|c|exit|x|quit|q)\s*$`) if err != nil { matcher.Trace('D', "Regexp error: %s", err.Error()) } matcher.Trace('D', "received: %s", str) if re_exit.MatchString(str) { matcher.Trace('D', "match command exit") matcher.connector.Close() } re_send, err := regexp.Compile(`^\s*(?:send|s)\s+([^:]+):(\d+)\s+(\S*)\s+(.*)$`) if err != nil { matcher.Trace('D', "Regexp error: %s", err.Error()) } eif_send_args := re_send.FindStringSubmatch(str) if len(eif_send_args) > 4 { port, _ := strconv.Atoi(eif_send_args[2]) EIFSend(eif_send_args[1], port, eif_send_args[3], eif_send_args[4]) } m, _ = regexp.Match("^\\s*`\\s*$", buf) if m { matcher.mode = 0 } if matcher.mode != 0 { matcher.connector.Send([]byte("> ")) } }