| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548 |
- package ovo
- import (
- "bytes"
- "encoding/binary"
- "fmt"
- "io"
- "net"
- "os"
- "os/signal"
- "reflect"
- "regexp"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "syscall"
- "time"
- "unsafe"
- )
- // 版本信息
- const version = "2021.6.2.13"
- // 已存在的最大ID值
- var maxIDNumber uint32
- // 获取新的ID
- func newID() int {
- return (int)(atomic.AddUint32(&maxIDNumber, 1))
- }
- // 共享内存,临时转换类型,更快速
- func str2bytes(s string) []byte {
- x := (*[2]uintptr)(unsafe.Pointer(&s))
- h := [3]uintptr{x[0], x[1], x[1]}
- return *(*[]byte)(unsafe.Pointer(&h))
- }
- // 共享内存,临时转换类型,更快速
- func bytes2str(b []byte) string {
- return *(*string)(unsafe.Pointer(&b))
- }
- // 字节数格式化输出
- func fmt_bytes_size(b int) string {
- if b >= 0 {
- if b < 1024 {
- return fmt.Sprintf("%dB", b)
- }
- if b < 1024*1024 {
- return fmt.Sprintf("%dKB", b/1024)
- }
- if b < 1024*1024*1024 {
- return fmt.Sprintf("%dMB", b/1024/1024)
- }
- return fmt.Sprintf("%dGB", b/1024/1024/1024)
- }
- return fmt.Sprintf("%d", b)
- }
- /**
- * find b in a 从内存a中查找匹配内存b的位置
- * return:
- * matched index 匹配到的开始位置或-1表示没匹配到或-2表示目前没匹配到,但等待更多信息也许能够确定
- * last matching index 匹配到的结束位置或没匹配到的检查点位置,为了下次从这个位置继续
- *
- */
- func FindBytes(a []byte, b []byte) (int, int) {
- ai := 0
- for ; ai <= len(a)-len(b); ai++ {
- bi := 0
- for ; bi < len(b) && a[ai+bi] == b[bi]; bi++ {
- }
- if bi == len(b) {
- //print("a中匹配到b的全部内容")
- return ai, ai + bi
- }
- }
- for ; ai < len(a); ai++ {
- bi := 0
- for ; ai+bi < len(a) && a[ai+bi] == b[bi]; bi++ {
- }
- if ai+bi == len(a) {
- //print("a结尾部分能够匹配b的开头部分,即a后续内容增加后有可能能够匹配b")
- return -2, ai
- }
- }
- return -1, ai
- }
- const DefaultTraceCatalog = "FEWIDX"
- type Tracer struct {
- TRACE string
- module string
- }
- func NewTracer(module string) *Tracer {
- return &Tracer{
- TRACE: Config.TRACE,
- module: module,
- }
- }
- var mutexTracerBuffer sync.Mutex
- var tracerBuffer bytes.Buffer
- func (tracer *Tracer) Trace(catalog byte, sfmt string, args ...interface{}) {
- if strings.Index(tracer.TRACE, string(catalog)) >= 0 {
- sinfo := fmt.Sprintf(sfmt, args...)
- sinfo = fmt.Sprintf("%s [%s] [%s] %s\r\n", time.Now().Format("2006-01-02 15:04:05.000"), tracer.module, string(catalog), sinfo)
- mutexTracerBuffer.Lock()
- defer mutexTracerBuffer.Unlock()
- tracerBuffer.WriteString(sinfo)
- go func() {
- mutexTracerBuffer.Lock()
- defer mutexTracerBuffer.Unlock()
- sinfo := tracerBuffer.String()
- if sinfo != "" {
- tracerBuffer.Reset()
- fmt.Print(sinfo)
- }
- }()
- }
- }
- var mutexTraceInfo sync.Mutex
- var nextOutputTime map[string]time.Time
- func (tracer *Tracer) CalmTrace(class string, delayms int, catalog byte, sfmt string, args ...interface{}) {
- if strings.Index(tracer.TRACE, string(catalog)) >= 0 {
- mutexTraceInfo.Lock()
- defer mutexTraceInfo.Unlock()
- if nextOutputTime == nil {
- nextOutputTime = make(map[string]time.Time)
- }
- t, o := nextOutputTime[class]
- if o && time.Now().Before(t) {
- return
- }
- sinfo := fmt.Sprintf(sfmt, args...)
- sinfo = fmt.Sprintf("%s [%s] [%s] %s\r\n", time.Now().Format("2006-01-02 15:04:05.000"), tracer.module, string(catalog), sinfo)
- fmt.Print(sinfo)
- nextOutputTime[class] = time.Now().Add(time.Duration(delayms) * time.Millisecond)
- }
- }
- /**
- * 消息分类匹配器,用于每个连接器的数据接收、分类、解析
- */
- type MessageMatcher interface {
- Initialize(connector *TCPConnector)
- MatchBegin(msgbuf []byte) (int, int)
- MatchEnd(msgbuf []byte) (int, int)
- }
- /**
- * 消息分类处理器,用于每类消息的业务处理
- */
- type MessageProcessor interface {
- MatchType(mm MessageMatcher) bool
- MessageProc(mm MessageMatcher, buf []byte)
- }
- var messageMatcherNameRegistry = make(map[string]reflect.Type)
- var messageMatcherTagRegistry = make(map[string]reflect.Type)
- var messageProcessorTagRegistry = make(map[string]reflect.Type)
- func RegisterMessageClass(tag string, message_matcher MessageMatcher, message_processor MessageProcessor) {
- tmm := reflect.TypeOf(message_matcher).Elem()
- messageMatcherNameRegistry[tmm.Name()] = tmm
- messageMatcherTagRegistry[tag] = tmm
- if message_processor != nil {
- tmp := reflect.TypeOf(message_processor).Elem()
- messageProcessorTagRegistry[tag] = tmp
- }
- }
- func NewMessageMatcher(mptype reflect.Type) MessageMatcher {
- if fmt.Sprintf("%s", mptype)[0] == '*' {
- //fmt.Println("mptype:", mptype) // *AutoDetectMessageMatcher
- mptype = mptype.Elem()
- //fmt.Println("mptype:", mptype) // AutoDetectMessageMatcher
- }
- mpe := reflect.New(mptype).Interface() //
- //fmt.Println("mpe type:", reflect.TypeOf(mpe)) // *AutoDetectMessageMatcher
- //fmt.Println("mpe:", mpe) // &{<nil> map[] 0 <nil> <nil> <nil>}
- return mpe.(MessageMatcher)
- }
- func NewMessageProcessor(mptype reflect.Type) MessageProcessor {
- if fmt.Sprintf("%s", mptype)[0] == '*' {
- mptype = mptype.Elem()
- }
- mpe := reflect.New(mptype).Interface() //
- return mpe.(MessageProcessor)
- }
- /**
- * 默认消息匹配器
- */
- type DefaultMessageMatcher struct {
- connector *TCPConnector
- }
- func (matcher *DefaultMessageMatcher) Initialize(connector *TCPConnector) {
- matcher.connector = connector
- }
- func (matcher *DefaultMessageMatcher) MatchBegin(buf []byte) (int, int) {
- return 0, 1
- }
- func (matcher *DefaultMessageMatcher) MatchEnd(buf []byte) (int, int) {
- return len(buf), len(buf)
- }
- type MessageMatcherDetector struct {
- *Tracer
- default_message_matcher MessageMatcher
- connector *TCPConnector
- message_matcher MessageMatcher
- message_processor MessageProcessor
- }
- func NewMessageMatcherDetector(connector *TCPConnector) *MessageMatcherDetector {
- return &MessageMatcherDetector{
- Tracer: NewTracer(connector.module),
- default_message_matcher: &DefaultMessageMatcher{},
- connector: connector,
- message_matcher: nil,
- message_processor: nil,
- }
- }
- func (detector *MessageMatcherDetector) Detect(buf []byte) (MessageMatcher, MessageProcessor) {
- if detector.message_matcher == nil {
- maxklen := 0
- for k := range messageMatcherTagRegistry {
- if len(k) > maxklen {
- maxklen = len(k)
- }
- }
- for k := range messageMatcherTagRegistry {
- n, _ := FindBytes(buf, str2bytes(k))
- if n == 0 {
- mmtype := messageMatcherTagRegistry[k]
- message_matcher := NewMessageMatcher(mmtype)
- message_matcher.Initialize(detector.connector)
- mptype := messageProcessorTagRegistry[k]
- if mptype != nil {
- detector.message_processor = NewMessageProcessor(mptype)
- }
- detector.message_matcher = message_matcher
- }
- if n == -2 && len(k) > maxklen {
- maxklen = len(k)
- }
- }
- if detector.message_matcher == nil {
- if len(buf) > maxklen {
- message_matcher := detector.default_message_matcher
- message_matcher.Initialize(detector.connector)
- detector.message_matcher = message_matcher
- } else {
- return nil, nil
- }
- }
- detector.Trace('D', "消息匹配: %s", reflect.TypeOf(detector.message_matcher).Elem())
- }
- return detector.message_matcher, detector.message_processor
- }
- type MessageClassifier struct {
- id string
- connector *TCPConnector
- message_matcher MessageMatcher
- message_processor MessageProcessor
- detector *MessageMatcherDetector
- mutex_readbuf_off sync.Mutex // 保护对message_readbuf_off的操作
- message_readbuf_off int
- message_begin_index int
- record_count int
- bytes_count int
- dropped_bytes int
- buffer_bytes int
- break_processing bool
- time_start time.Time
- }
- func (msgclassifier *MessageClassifier) Initialize(id string, connector *TCPConnector, message_matcher MessageMatcher, msgproc MessageProcessor) {
- msgclassifier.id = id
- msgclassifier.connector = connector
- msgclassifier.message_readbuf_off = 0
- msgclassifier.message_begin_index = -1
- msgclassifier.record_count = 0
- msgclassifier.bytes_count = 0
- msgclassifier.dropped_bytes = 0
- msgclassifier.buffer_bytes = 0
- msgclassifier.break_processing = false
- msgclassifier.message_processor = msgproc
- if message_matcher == nil {
- msgclassifier.detector = NewMessageMatcherDetector(connector)
- } else {
- msgclassifier.message_matcher = message_matcher
- msgclassifier.message_matcher.Initialize(msgclassifier.connector)
- msgclassifier.id = fmt.Sprintf("%s#%s", reflect.TypeOf(message_matcher).Elem(), msgclassifier.id)
- if msgclassifier.message_processor != nil && !msgclassifier.message_processor.MatchType(msgclassifier.message_matcher) {
- msgclassifier.message_processor = nil
- }
- }
- msgclassifier.time_start = time.Now()
- }
- /**
- * 返回已经处理完成的数据指针位置和最后匹配的数据块
- */
- func (msgclassifier *MessageClassifier) Match(buf []byte) (int, []byte) {
- msgclassifier.mutex_readbuf_off.Lock()
- defer msgclassifier.mutex_readbuf_off.Unlock()
- // 从上次未处理完的位置继续
- procbuf := buf[msgclassifier.message_readbuf_off:]
- if Config.TOVOT_DEBUG > 0 {
- msgclassifier.connector.Trace('X', "[%s] buf %d %d", msgclassifier.id, procbuf, msgclassifier.message_begin_index)
- }
- if msgclassifier.message_begin_index < 0 {
- // 匹配消息开始标记
- if msgclassifier.message_matcher == nil {
- message_matcher, message_processor := msgclassifier.detector.Detect(buf)
- if message_matcher == nil {
- // detector会有DefaultMessageMatcher保底
- // 返回空只有一种可能:数据太少暂时无法确定消息匹配器,需要等待接收更多数据
- // 此时 msgclassifier.message_readbuf_off==0
- return msgclassifier.message_readbuf_off, nil
- }
- msgclassifier.message_matcher = message_matcher
- if msgclassifier.message_processor == nil || !msgclassifier.message_processor.MatchType(message_matcher) {
- // 运行时参数中没有指定消息处理器,则由detector根据消息类型确定
- msgclassifier.message_processor = message_processor
- }
- msgclassifier.id = fmt.Sprintf("%s#%s", reflect.TypeOf(msgclassifier.message_matcher).Elem(), msgclassifier.id)
- }
- begin_flag_index, next_index := msgclassifier.message_matcher.MatchBegin(procbuf)
- if begin_flag_index >= 0 {
- if Config.TOVOT_DEBUG > 0 {
- msgclassifier.connector.Trace('D', "[%s] match begin %d %d", msgclassifier.id, begin_flag_index, procbuf)
- }
- // 从匹配消息开始标记位置开始处理,丢弃之前不匹配(不能识别的格式)的数据
- msgclassifier.message_begin_index = begin_flag_index
- procbuf = procbuf[begin_flag_index:]
- msgclassifier.message_readbuf_off += begin_flag_index
- msgclassifier.dropped_bytes += begin_flag_index
- } else {
- // 忽略不匹配(不能识别的格式)的数据
- if Config.TOVOT_DEBUG > 0 {
- msgclassifier.connector.Trace('X', "[%s] not match begin %d %d", msgclassifier.id, next_index, procbuf)
- }
- msgclassifier.message_readbuf_off += next_index
- msgclassifier.dropped_bytes += next_index
- return msgclassifier.message_readbuf_off, nil
- }
- }
- // 此时procbuf内容已匹配消息开始标记,相对起始位置为0
- // 如果不能匹配消息结束标记,对所有变量
- if Config.TOVOT_DEBUG > 0 {
- msgclassifier.connector.Trace('X', "[%s] parse bytes %d", msgclassifier.id, len(procbuf))
- }
- // 匹配消息结束标记
- end_flag_index, msg_end_index := msgclassifier.message_matcher.MatchEnd(procbuf)
- if end_flag_index >= 0 {
- if Config.TOVOT_DEBUG > 0 {
- msgclassifier.connector.Trace('D', "[%s] match end %d %d %d",
- msgclassifier.id, end_flag_index, msg_end_index, procbuf[end_flag_index:msg_end_index])
- }
- // 消息匹配完整,返回处理数据
- msgbuf := procbuf[:msg_end_index]
- msgclassifier.message_begin_index = -1
- msgclassifier.message_readbuf_off += msg_end_index
- return msgclassifier.message_readbuf_off, msgbuf
- }
- if Config.TOVOT_DEBUG > 0 {
- msgclassifier.connector.Trace('D', "[%s] not match end %d", msgclassifier.id, procbuf)
- }
- // 没有匹配到消息结束标记,继续等待完整的数据
- return msgclassifier.message_readbuf_off, nil
- }
- /**
- * 返回已经处理完成的数据指针位置,-1表示数据处理过程终止(数据缓冲区扩容失败,丢弃部分数据,数据指针位置被重置)
- */
- func (msgclassifier *MessageClassifier) Proc(buf []byte) int {
- done_buf_off, msgbuf := msgclassifier.Match(buf)
- if msgbuf != nil {
- if msgclassifier.message_processor != nil {
- msgclassifier.message_processor.MessageProc(msgclassifier.message_matcher, msgbuf)
- }
- msgclassifier.bytes_count += len(msgbuf)
- msgclassifier.record_count++
- msgclassifier.buffer_bytes = len(buf)
- conn_time := time.Now().Sub(msgclassifier.time_start)
- msgclassifier.connector.CalmTrace(msgclassifier.id+".records", 1000, 'D',
- "[%s] 连接持续 %v, 已处理 %d records, %d bytes, dropped %d bytes, buffer %d bytes",
- msgclassifier.id, conn_time,
- msgclassifier.record_count, msgclassifier.bytes_count, msgclassifier.dropped_bytes, msgclassifier.buffer_bytes)
- }
- if msgclassifier.break_processing {
- return -1
- }
- return done_buf_off
- }
- // 数据缓冲区前滚
- func (msgclassifier *MessageClassifier) ReadBufferRollForward(readbuf_off int, dropped bool) {
- if Config.TOVOT_DEBUG > 0 {
- msgclassifier.connector.Trace('X', "[%s] ReadBufferRollForward message_readbuf_off=%d, readbuf_off=%d, dropped=%v",
- msgclassifier.id, msgclassifier.message_readbuf_off, readbuf_off, dropped)
- }
- msgclassifier.mutex_readbuf_off.Lock()
- defer msgclassifier.mutex_readbuf_off.Unlock()
- msgclassifier.message_begin_index = -1
- msgclassifier.message_readbuf_off -= readbuf_off
- if msgclassifier.message_readbuf_off < 0 {
- msgclassifier.message_readbuf_off = 0
- }
- if dropped {
- msgclassifier.dropped_bytes += readbuf_off
- msgclassifier.break_processing = true
- }
- }
- // 销毁数据分类器
- func (msgclassifier *MessageClassifier) Destory() {
- if msgclassifier.connector != nil {
- msgclassifier.connector.Trace('D', "[%s] 销毁数据分类器", msgclassifier.id)
- }
- msgclassifier.message_begin_index = -1
- msgclassifier.message_readbuf_off = 0
- }
- type MultiMessageClassifier struct {
- message_classifier []*MessageClassifier
- }
- func (mmc *MultiMessageClassifier) Initialize(connector *TCPConnector, message_matcher []MessageMatcher, msgproc []MessageProcessor) {
- if len(message_matcher) != len(msgproc) {
- panic("MultiMessageClassifier.Initialize Error: len(message_matcher) != len(msgproc)")
- }
- message_classifier_count := len(message_matcher)
- message_classifier := make([]*MessageClassifier, message_classifier_count)
- if message_classifier_count > 0 {
- for i := 0; i < message_classifier_count; i++ {
- message_classifier[i] = &MessageClassifier{}
- message_classifier[i].Initialize(fmt.Sprintf("_%d", newID()), connector, message_matcher[i], msgproc[i])
- }
- } else {
- message_classifier = make([]*MessageClassifier, 1)
- message_classifier[0] = &MessageClassifier{}
- message_classifier[0].Initialize(fmt.Sprintf("_%d", newID()), connector, nil, nil)
- }
- mmc.message_classifier = message_classifier
- }
- func (mmc *MultiMessageClassifier) receiverProcess(buf []byte) int {
- done_buffer_off := len(buf) //已经处理完成的数据偏移量
- message_classifier := mmc.message_classifier
- message_classifier_count := len(message_classifier)
- for i := 0; i < message_classifier_count; i++ {
- last_donebufoff, curdonebufoff := -1, 0
- for last_donebufoff != curdonebufoff && curdonebufoff >= 0 && curdonebufoff < len(buf) {
- last_donebufoff = curdonebufoff
- curdonebufoff = message_classifier[i].Proc(buf)
- }
- // 取多个分类器中最小的已经处理完成的数据偏移量
- if curdonebufoff < done_buffer_off {
- done_buffer_off = curdonebufoff
- }
- }
- return done_buffer_off
- }
- func (mmc *MultiMessageClassifier) shiftBuffer(shif_buffer_off int, dropped bool) {
- message_classifier := mmc.message_classifier
- message_classifier_count := len(message_classifier)
- for i := 0; i < message_classifier_count; i++ {
- message_classifier[i].ReadBufferRollForward(shif_buffer_off, dropped)
- }
- }
- func (mmc *MultiMessageClassifier) Destory() {
- for i := 0; i < len(mmc.message_classifier); i++ {
- mmc.message_classifier[i].Destory()
- }
- }
- type TCPReceiverProcessor struct {
- MultiMessageClassifier
- }
- func (recvproc *TCPReceiverProcessor) Initialize(connector *TCPConnector, message_matcher MessageMatcher, msgproc MessageProcessor) {
- recvproc.MultiMessageClassifier.Initialize(connector, []MessageMatcher{message_matcher}, []MessageProcessor{msgproc})
- }
- /*
- * TCPConnector 接收器,用于接收 TCP 连接请求
- */
- type TCPConnector struct {
- *Tracer
- id string
- conn net.Conn
- closed bool
- init_buf_size int
- max_buf_size int
- SendPromptMessage string
- mu_conn sync.Mutex
- remote_addr string
- local_addr string
- receiver_processor *TCPReceiverProcessor
- }
- /**
- * 连接器
- */
- func NewTCPConnector(id string, conn net.Conn, in_out string, init_buf_size, max_buf_size int) *TCPConnector {
- remote_addr := conn.RemoteAddr().String()
- local_addr := conn.LocalAddr().String()
- trace_module := fmt.Sprintf("%s@%s%s%s", id, local_addr, in_out, remote_addr)
- if init_buf_size < 64 {
- init_buf_size = 64
- }
- connector := &TCPConnector{
- Tracer: NewTracer(trace_module),
- id: id,
- conn: conn,
- closed: false,
- init_buf_size: init_buf_size,
- max_buf_size: max_buf_size,
- remote_addr: remote_addr,
- local_addr: local_addr,
- receiver_processor: nil, // 单工发送器不需要信息接收处理器
- }
- return connector
- }
- func (connector *TCPConnector) IsClosed() bool {
- return connector == nil || connector.conn == nil || connector.closed
- }
- func (connector *TCPConnector) Close() {
- connector.Trace('D', "%s 准备关闭连接", connector.id)
- // 立即关闭,以备重连
- func() {
- connector.mu_conn.Lock()
- defer connector.mu_conn.Unlock()
- if connector.receiver_processor != nil {
- connector.receiver_processor.Destory()
- }
- if connector.conn != nil {
- connector.closed = true
- connector.conn.Close()
- connector.conn = nil
- connector.Trace('D', "%s 连接器已关闭", connector.id)
- }
- }()
- }
- func (connector *TCPConnector) Send(data []byte) (int, error) {
- if connector.conn != nil {
- n, err := connector.conn.Write(data)
- if err != nil {
- connector.Trace('D', "Error send: %s", err.Error())
- return 0, err
- }
- return n, err
- }
- return 0, nil
- }
- func (connector *TCPConnector) sendStartMessage() (int, error) {
- if len(connector.SendPromptMessage) > 0 {
- n, err := connector.Send([]byte(connector.SendPromptMessage))
- if err != nil {
- connector.Trace('D', "Error send message: %s", err.Error())
- } else {
- connector.Trace('D', "连接开始提示信息发送成功")
- }
- return n, err
- }
- return 0, nil
- }
- type TCPReceiveBuffer struct {
- connector *TCPConnector
- recvbuf []byte
- recvlimit int // 接收数据的buffer偏移量的最大值,一般为len(recvbuf),当数据处理未完成且进行了缓冲区重组后,该限值可能为last_readoff
- recvoff int // 接收数据的buffer偏移量
- readoff int // 读取数据的buffer偏移量
- last_recvoff int // 上一次接收数据的buffer偏移量
- last_readoff int // 上一次读取数据的buffer偏移量
- break_processing bool // recvbuf扩容失败,丢弃部分数据,会导致数据处理过程被截断
- last_proc_done bool // 上次的处理过程是否结束
- mutex_readoff sync.Mutex
- buf_size_ok_time time.Time
- }
- func (trb *TCPReceiveBuffer) Initialize(connector *TCPConnector) {
- trb.connector = connector
- trb.recvbuf = make([]byte, connector.init_buf_size)
- trb.recvlimit = len(trb.recvbuf)
- trb.recvoff = 0 // 接收数据的buffer偏移量
- trb.readoff = 0 // 读取数据的buffer偏移量
- trb.last_recvoff = -1 // 上一次接收数据的buffer偏移量
- trb.last_readoff = -1 // 上一次读取数据的buffer偏移量
- trb.break_processing = false // recvbuf扩容失败,丢弃部分数据,会导致数据处理过程被截断
- trb.last_proc_done = true // 上次的处理过程是否结束
- trb.buf_size_ok_time = time.Now()
- }
- func (trb *TCPReceiveBuffer) getRecvBuffer() []byte {
- // 缓冲区数据重组后,原来buf中的数据可能还在处理中,为避免新接收数据覆盖到未处理完的数据,需要限制接收数据的范围
- ei := trb.recvoff + trb.connector.init_buf_size
- if ei > trb.recvlimit {
- ei = trb.recvlimit
- }
- return trb.recvbuf[trb.recvoff:ei]
- }
- func (trb *TCPReceiveBuffer) BufferReceive() (int, error) {
- // 接收数据仅改变recvoff以后的recvbuf内容
- n, err := trb.connector.conn.Read(trb.getRecvBuffer())
- trb.recvoff += n
- if n > 0 {
- if Config.TOVOT_DEBUG > 0 {
- trb.connector.Trace('X', "read bytes %d", n)
- }
- }
- return n, err
- }
- // 获取新的待处理buffer,只需要处理最后更新的procbuf
- func (trb *TCPReceiveBuffer) getProcBuffer() []byte {
- trb.mutex_readoff.Lock()
- defer trb.mutex_readoff.Unlock()
- if Config.TOVOT_DEBUG > 0 {
- trb.connector.Trace('X', "准备处理 readoff=%d, last_readoff=%d, recvoff=%d, last_recvoff=%d",
- trb.readoff, trb.last_readoff, trb.recvoff, trb.last_recvoff)
- }
- if trb.last_readoff != trb.readoff || trb.last_recvoff != trb.recvoff {
- trb.last_readoff = trb.readoff
- trb.last_recvoff = trb.recvoff
- buf := trb.recvbuf[trb.last_readoff:trb.last_recvoff]
- if Config.TOVOT_DEBUG > 0 {
- trb.connector.Trace('X', "处理开始 last_readoff=%d, last_recvoff=%d, len(buf)=%d", trb.last_readoff, trb.last_recvoff, len(buf))
- }
- trb.break_processing = false
- return buf
- }
- return nil
- }
- func (trb *TCPReceiveBuffer) BufferProcess(buf []byte) {
- // 数据处理内部不会改变buf的内容,返回已处理完成的数据相对于buf的指针位置
- // 数据接收只会追加recvbuf后续内容,不影响buf内容,即使recvbuf扩容,也不会改变,因此可以在独立线程中并行数据处理
- // recvbuf扩容失败,丢弃部分数据,需要对数据处理起始位置指针进行操作,需要避免冲突,msgclassifier.mutex_readbuf_off
- // recvbuf扩容失败,丢弃部分数据,会导致数据处理过程被截断,返回 doneoff==-1
- doneoff := trb.connector.receiver_processor.receiverProcess(buf)
- // 更新数据指针位置
- func() {
- trb.mutex_readoff.Lock()
- defer trb.mutex_readoff.Unlock()
- if doneoff >= 0 && !trb.break_processing {
- trb.connector.receiver_processor.shiftBuffer(doneoff, false)
- trb.readoff += doneoff
- if Config.TOVOT_DEBUG > 0 {
- trb.connector.Trace('X', "处理完成 readoff=%d, doneoff=%d, len(buf)=%d", trb.readoff, doneoff, len(buf))
- }
- }
- trb.last_readoff = -1
- trb.recvlimit = len(trb.recvbuf)
- }()
- }
- //over the Reorg level
- func (trb *TCPReceiveBuffer) OverReorgLevel() bool {
- if trb.recvoff+trb.connector.init_buf_size < trb.recvlimit/16 {
- if time.Now().Sub(trb.buf_size_ok_time) > time.Duration(1*time.Second) {
- return true
- }
- return false
- } else {
- trb.buf_size_ok_time = time.Now()
- }
- return trb.recvoff > len(trb.recvbuf)/2 && trb.recvoff+trb.connector.init_buf_size >= trb.recvlimit
- }
- func (trb *TCPReceiveBuffer) BufferReorg() {
- new_recvbuf := trb.recvbuf
- // 根据需要扩容一倍,达到最大值后丢弃一半数据
- new_buf_size := trb.BufferExpansion()
- if new_buf_size > 0 {
- new_recvbuf = make([]byte, new_buf_size)
- }
- func() {
- trb.mutex_readoff.Lock()
- defer trb.mutex_readoff.Unlock()
- if Config.TOVOT_DEBUG > 0 {
- trb.connector.Trace('X', "重组未处理完成数据前 readoff=%d, recvoff=%d, recvlimit=%d", trb.readoff, trb.recvoff, trb.recvlimit)
- }
- keepbuf := trb.recvbuf[trb.readoff:trb.recvoff]
- // 调整数据指针
- trb.recvoff -= trb.readoff
- trb.readoff = 0
- // 确定接收buffer上限
- if new_buf_size > 0 {
- // 扩容、重新分配内存后,正在处理中的数据不会被覆盖
- trb.recvlimit = new_buf_size
- } else if trb.last_readoff > trb.recvoff {
- // 防止正在处理中的数据被覆盖
- trb.recvlimit = trb.last_readoff
- } else if trb.last_readoff >= 0 {
- panic(fmt.Sprintf("扩容或丢数操作没起作用? last_readoff=%d, recvoff=%d, recvlimit=%d, len(recvbuf)=%d, new_buf_size=%d", trb.last_readoff, trb.recvoff, trb.recvlimit, len(trb.recvbuf), new_buf_size))
- }
- // 移动少量未处理完的数据
- copy(new_recvbuf, keepbuf)
- trb.recvbuf = new_recvbuf
- // buf == recvbuf[readoff:recvoff]
- if Config.TOVOT_DEBUG > 0 {
- trb.connector.Trace('X', "重组未处理完成数据后 readoff=%d, recvoff=%d, recvlimit=%d", trb.readoff, trb.recvoff, trb.recvlimit)
- }
- }()
- }
- /**
- * 返回新的接收缓冲区和最终需要保留的缓冲区
- */
- func (trb *TCPReceiveBuffer) BufferExpansion() (new_buf_size int) {
- trb.mutex_readoff.Lock()
- defer trb.mutex_readoff.Unlock()
- if trb.recvoff+trb.connector.init_buf_size < trb.recvlimit/16 {
- // 数据接收空间过剩
- new_buf_size = (len(trb.recvbuf) - trb.connector.init_buf_size) * 8 / 10 / trb.connector.init_buf_size * trb.connector.init_buf_size
- trb.connector.Trace('I', "数据接收空间过剩,收缩至 %s", fmt_bytes_size(new_buf_size))
- return new_buf_size
- }
- // 数据接收空间不足 trb.recvoff > len(trb.recvbuf)/2 && trb.recvoff+trb.connector.init_buf_size >= trb.recvlimit
- // 准备数据重组
- if trb.recvoff-trb.readoff > len(trb.recvbuf)/4 {
- // 缓存内容过多 trb.recvoff-trb.readoff > len(trb.recvbuf)/4
- // 或缓存数据无法移动 trb.recvoff-trb.readoff+trb.connector.init_buf_size >= trb.readoff
- // 需要扩容
- if len(trb.recvbuf) >= trb.connector.max_buf_size {
- // 已经达到缓冲区最大尺寸
- if trb.recvoff-trb.readoff+trb.connector.init_buf_size < trb.readoff {
- // 可以通过重组腾出一定的接收空间
- return -1
- }
- // 缓存数据无法有效移动
- if trb.recvoff-trb.readoff+trb.connector.init_buf_size > len(trb.recvbuf) {
- truncate_size := trb.recvoff - trb.readoff - trb.connector.init_buf_size
- // 剩余接收空间不足
- trb.connector.Trace('E', "达到最大缓冲区尺寸 %s>=%s,数据缓存过半,数据前滚,丢弃旧数据 %dbytes", fmt_bytes_size(len(trb.recvbuf)), fmt_bytes_size(trb.connector.max_buf_size), truncate_size)
- // trb.connector.Trace('F', "每个连接 %s Buffer 还不够,需要考虑调整一下处理方法了", fmt_bytes_size(len(trb.recvbuf)))
- // 直接移动数据起始位置指针,丢弃部分数据
- // 直接移动数据起始位置指针,需要避免与数据处理过程对数据起始位置指针的操作产生冲突,msgclassifier.mutex_readbuf_off
- // truncate
- trb.connector.receiver_processor.shiftBuffer(truncate_size, true)
- trb.readoff += truncate_size
- trb.last_readoff = trb.readoff
- trb.break_processing = true
- return -1
- }
- // 还有足够空间,可以分配新内存块,重组后继续使用
- return len(trb.recvbuf)
- }
- // 扩容
- new_buf_size = (len(trb.recvbuf) + trb.connector.init_buf_size) * 14 / 10 / trb.connector.init_buf_size * trb.connector.init_buf_size
- if new_buf_size > trb.connector.max_buf_size {
- new_buf_size = trb.connector.max_buf_size
- }
- trb.connector.Trace('I', "数据堆积过多,扩容至 %s", fmt_bytes_size(new_buf_size))
- return new_buf_size
- }
- // 使用现有空间重组
- return -1
- }
- func (connector *TCPConnector) Receive(message_matcher MessageMatcher, msgproc MessageProcessor) error {
- n, err := connector.sendStartMessage()
- connector.receiver_processor = &TCPReceiverProcessor{}
- connector.receiver_processor.Initialize(connector, message_matcher, msgproc)
- // 数据接收buffer
- trb := &TCPReceiveBuffer{}
- trb.Initialize(connector)
- connector.Trace('D', "数据接收缓冲区就绪")
- var wg_proc sync.WaitGroup
- ch_active_proc := make(chan bool)
- // 缓冲区后台并行处理
- go func() {
- for <-ch_active_proc {
- func() {
- defer wg_proc.Done()
- buf := trb.getProcBuffer()
- if buf != nil {
- trb.BufferProcess(buf)
- }
- }()
- }
- }()
- connector.Trace('D', "数据接收缓冲区并行处理就绪")
- for connector.conn != nil && err == nil {
- n, err = trb.BufferReceive()
- if n > 0 {
- wg_proc.Add(1)
- go func() { ch_active_proc <- true }()
- }
- if trb.OverReorgLevel() {
- trb.BufferReorg()
- }
- }
- connector.Trace('D', "等待数据接收缓冲区并行处理结束")
- wg_proc.Wait()
- ch_active_proc <- false
- connector.Trace('D', "数据接收结束")
- if !connector.IsClosed() && err != nil && err != io.EOF {
- connector.Trace('D', "%s", err.Error())
- return err
- }
- return nil
- }
- type TCPSender struct {
- *Tracer
- //----配置信息
- id string
- remote_host string
- remote_port int
- timeout int
- //----基本信息
- connector *TCPConnector
- }
- func NewTCPSender(id string, remote_host string, remote_port int) *TCPSender {
- sender := &TCPSender{
- Tracer: NewTracer(id),
- id: id,
- remote_host: remote_host,
- remote_port: remote_port,
- timeout: 3,
- connector: nil,
- }
- sender.reconnect()
- return sender
- }
- func (sender *TCPSender) reconnect() error {
- sender.Close()
- remote_addr := fmt.Sprintf("%s:%d", sender.remote_host, sender.remote_port)
- conn, err := net.DialTimeout("tcp", remote_addr, time.Duration(sender.timeout)*time.Second)
- if err != nil {
- return err
- }
- sender.connector = NewTCPConnector(sender.id, conn, "->", 0, 0)
- sender.module = sender.connector.module
- sender.Trace('D', "开启连接器")
- go func() {
- connector := sender.connector
- if connector != nil {
- connector.Receive(nil, nil)
- connector.Close()
- }
- }()
- return nil
- }
- func (sender *TCPSender) SendRetry(data []byte, retry int) (int, error) {
- if sender.connector.IsClosed() {
- err := sender.reconnect()
- if err != nil {
- return 0, err
- }
- }
- n, err := sender.connector.Send(data)
- if err != nil && retry > 0 {
- sender.connector.Close()
- n, err = sender.SendRetry(data, retry-1)
- }
- return n, err
- }
- func (sender *TCPSender) Send(data []byte) (int, error) {
- return sender.SendRetry(data, 1)
- }
- func (sender *TCPSender) Close() {
- if sender.connector != nil {
- sender.connector.Close()
- }
- }
- type Configuration struct {
- ID string
- ServerLocation string
- ServerPort int
- InitRecvBufSize int
- MaxRecvBufSize int
- ConnectorGrowth int
- SendPromptMessage string
- TOVOT_DEBUG int
- TRACE string
- }
- func (config *Configuration) get(cfg map[string]string, key string, default_value string) string {
- v, o := cfg[key]
- if o {
- return v
- }
- return default_value
- }
- func (config *Configuration) getInt(cfg map[string]string, key string, default_value int) int {
- sv, so := cfg[key]
- if so {
- iv, err := strconv.Atoi(sv)
- if err == nil {
- return iv
- }
- }
- return default_value
- }
- func (config *Configuration) Initialize(cfg map[string]string) {
- config.ID = config.get(cfg, "ID", fmt.Sprintf("ID_%d", newID()))
- config.ServerLocation = config.get(cfg, "ServerLocation", "")
- config.ServerPort = config.getInt(cfg, "ServerPort", 9999)
- config.InitRecvBufSize = config.getInt(cfg, "InitRecvBufSize", 1024)
- config.MaxRecvBufSize = config.getInt(cfg, "MaxRecvBufSize", 1024*1024*128)
- config.ConnectorGrowth = config.getInt(cfg, "ConnectorGrowth", 2)
- config.SendPromptMessage = config.get(cfg, "SendPromptMessage", "` enter interactive mode\n")
- config.TOVOT_DEBUG = config.getInt(cfg, "TOVOT_DEBUG", 0)
- config.TRACE = config.get(cfg, "TRACE", DefaultTraceCatalog)
- }
- func (config *Configuration) InitWithDefaultConfig(defaultConfig *Configuration) {
- if config.ID == "" {
- config.ID = defaultConfig.ID
- }
- if config.ServerLocation == "" {
- config.ServerLocation = defaultConfig.ServerLocation
- }
- if config.ServerPort == 0 {
- config.ServerPort = defaultConfig.ServerPort
- }
- if config.InitRecvBufSize == 0 {
- config.InitRecvBufSize = defaultConfig.InitRecvBufSize
- }
- if config.MaxRecvBufSize == 0 {
- config.MaxRecvBufSize = defaultConfig.MaxRecvBufSize
- }
- if config.ConnectorGrowth == 0 {
- config.ConnectorGrowth = defaultConfig.ConnectorGrowth
- }
- if config.SendPromptMessage == "" {
- config.SendPromptMessage = defaultConfig.SendPromptMessage
- }
- if config.TOVOT_DEBUG == 0 {
- config.TOVOT_DEBUG = defaultConfig.TOVOT_DEBUG
- }
- if config.TRACE == "" {
- config.TRACE = defaultConfig.TRACE
- }
- }
- /*
- * TPCReceiver 接收器,用于接收 TCP 连接请求
- */
- type TCPReceiver struct {
- *Tracer
- //----配置信息
- host string //接收器的地址
- port int //接收器监听端口
- message_matcher MessageMatcher //消息匹配器,用于每个连接器的数据接收、分类、解析,设为nil时由Detector根据首次收到的信息自动识别确定
- msgproc MessageProcessor //消息处理函数,用于调用者对收到的消息进行业务处理
- connector_growth int //每次预设备用连接器增长数量
- init_buf_size int //初始接收数据缓冲区尺寸
- max_buf_size int //最大接收数据缓冲区尺寸
- SendPromptMessage string //连接开始时主动发送给客户端的提示信息
- //----基本信息
- addr string
- listener net.Listener
- //----监控信息
- max_conn_id int //单向增长的连接ID
- cur_conn_count int32 //当前备用连接器数量
- prepare_conn_count int32 //预设备用连接器数量
- conn_chan chan net.Conn //用于启动连接器的TCP连接管道
- connectors sync.Map //用于缓存运行中的连接器
- wg_conn_close sync.WaitGroup //用于控制关闭顺序
- }
- func NewTCPReceiver(config *Configuration, message_matcher MessageMatcher, msgproc MessageProcessor) *TCPReceiver {
- hostname := config.ServerLocation
- if hostname == "" {
- hostname, _ = os.Hostname()
- }
- addr := fmt.Sprintf("%s@%s:%d", config.ID, hostname, config.ServerPort)
- receiver := &TCPReceiver{
- Tracer: NewTracer(addr),
- host: config.ServerLocation,
- port: config.ServerPort,
- message_matcher: message_matcher,
- msgproc: msgproc,
- connector_growth: config.ConnectorGrowth, //每次增加备用连接器数量
- init_buf_size: config.InitRecvBufSize,
- max_buf_size: config.MaxRecvBufSize,
- SendPromptMessage: "` enter interactive mode\n",
- addr: addr,
- listener: nil,
- max_conn_id: 0,
- cur_conn_count: 0,
- prepare_conn_count: 0,
- conn_chan: make(chan net.Conn),
- }
- return receiver
- }
- func (receiver *TCPReceiver) accept(connID string, conn net.Conn) {
- connector := NewTCPConnector(connID, conn, "<-", receiver.init_buf_size, receiver.max_buf_size)
- connector.SendPromptMessage = receiver.SendPromptMessage
- receiver.connectors.Store(connector.id, connector)
- defer func() {
- connector.Close()
- receiver.connectors.Delete(connector.id)
- atomic.AddInt32(&receiver.cur_conn_count, -1)
- }()
- connector.Receive(receiver.message_matcher, receiver.msgproc)
- }
- func (receiver *TCPReceiver) ConnectorPrepare(count int) {
- receiver.Trace('D', "增加 %d 备用连接器", count)
- var wg_connector_prepare sync.WaitGroup
- for i := 0; i < count; i++ {
- wg_connector_prepare.Add(1)
- go func(connid int) {
- receiver.wg_conn_close.Add(1)
- atomic.AddInt32(&receiver.prepare_conn_count, +1)
- connID := fmt.Sprintf("connector_%d", connid)
- receiver.Trace('D', "备用连接器 %s 准备就绪", connID)
- wg_connector_prepare.Done()
- for conn := range receiver.conn_chan {
- if conn != nil {
- receiver.Trace('D', "备用连接器 %s 开启", connID)
- atomic.AddInt32(&receiver.cur_conn_count, +1)
- receiver.Trace('D', "当前连接数: %d", int(atomic.LoadInt32(&receiver.cur_conn_count)))
- go func() {
- if receiver.listener != nil && int(atomic.LoadInt32(&receiver.cur_conn_count)) > int(atomic.LoadInt32(&receiver.prepare_conn_count))-receiver.connector_growth {
- receiver.ConnectorPrepare(receiver.connector_growth)
- }
- }()
- receiver.accept(connID, conn)
- receiver.Trace('D', "当前连接数: %d", int(atomic.LoadInt32(&receiver.cur_conn_count)))
- receiver.Trace('D', "备用连接器 %s 关闭", connID)
- }
- if receiver.listener == nil || int(atomic.LoadInt32(&receiver.cur_conn_count)) < int(atomic.LoadInt32(&receiver.prepare_conn_count))-2*receiver.connector_growth {
- break
- }
- }
- receiver.Trace('D', "备用连接器 %s 销毁", connID)
- atomic.AddInt32(&receiver.prepare_conn_count, -1)
- receiver.wg_conn_close.Done()
- }(receiver.max_conn_id)
- receiver.max_conn_id++
- }
- wg_connector_prepare.Wait()
- }
- func (receiver *TCPReceiver) Start() error {
- receiver.Trace('D', "开始[ver:%s]", version)
- receiver.Trace('D', "预设备用接收器")
- receiver.ConnectorPrepare(receiver.connector_growth)
- receiver.Trace('D', "开启监听端口")
- listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", receiver.host, receiver.port))
- if err != nil {
- receiver.Trace('D', "监听端口开启错误: %s", err.Error())
- return err
- }
- receiver.Trace('D', "监听端口已开启")
- receiver.listener = listener
- go func() {
- receiver.Trace('D', "准备接收连接")
- for {
- listener = receiver.listener
- if listener == nil {
- receiver.Trace('D', "端口已关闭")
- break
- }
- conn, err := listener.Accept()
- if err != nil {
- receiver.Trace('D', "连接接收错误: %s", err.Error())
- } else {
- receiver.conn_chan <- conn
- }
- }
- receiver.Trace('D', "关闭所有连接器")
- receiver.connectors.Range(func(connID interface{}, connector interface{}) bool {
- connector.(*TCPConnector).Close()
- return true
- })
- receiver.Trace('D', "销毁备用连接器")
- for n := int(atomic.LoadInt32(&receiver.prepare_conn_count)); n > 0; n-- {
- receiver.conn_chan <- nil
- }
- }()
- return nil
- }
- func (receiver *TCPReceiver) IsAlive() bool {
- return receiver.listener != nil
- }
- func (receiver *TCPReceiver) Stop() {
- listener := receiver.listener
- receiver.listener = nil
- receiver.Trace('D', "关闭监听端口")
- listener.Close()
- receiver.wg_conn_close.Wait()
- receiver.Trace('D', "结束")
- }
- /**
- * 交互命令匹配器
- */
- type CommandMatcher struct {
- *Tracer
- connector *TCPConnector
- mode int
- welcome int
- }
- func (matcher *CommandMatcher) Initialize(connector *TCPConnector) {
- matcher.Tracer = NewTracer(connector.module + "_cmdproc")
- matcher.connector = connector
- matcher.mode = 0
- matcher.welcome = 0
- }
- func (matcher *CommandMatcher) MatchBegin(buf []byte) (int, int) {
- if matcher.mode != 0 {
- return 0, 1
- }
- gi, fi := FindBytes(buf, []byte("`"))
- if gi >= 0 {
- matcher.mode = 1
- }
- return gi, fi
- }
- func (matcher *CommandMatcher) MatchEnd(buf []byte) (int, int) {
- //matcher.Trace('X', "MatchEnd: %d", buf)
- return FindBytes(buf, []byte("\n"))
- }
- /**
- * EIF格式编码器
- */
- type EIFFormator struct {
- }
- /**
- * EIF格式消息头
- */
- type EIFHeader struct {
- start [8]byte
- reserved_1 int32
- reserved_2 int32
- reserved_3 int32
- reserved_4 int32
- reserved_5 int32
- body_size int32
- bytes_321 int32
- }
- /**
- * EIF格式消息体
- */
- type EIFBody struct {
- three byte
- class string
- semicolon string
- kv_pairs string
- end string
- newline string
- one byte
- }
- /**
- * EIF格式错误
- */
- type EIFFormatError string
- func (eiffe EIFFormatError) Error() string { return "EIFFormat Error: " + string(eiffe) }
- func (*EIFFormator) EncodeSimply(class string, key_equal_value_semicolon_pairs string) []byte {
- str := fmt.Sprintf("%s;%sEND\n", class, key_equal_value_semicolon_pairs)
- bs := str2bytes(str)
- size := len(bs) + 2
- var bb bytes.Buffer
- bb.WriteString("<START>>")
- bb.Write(make([]byte, 20))
- binary.Write(&bb, binary.BigEndian, int32(size))
- binary.Write(&bb, binary.BigEndian, int32(size))
- bb.WriteByte(3)
- bb.Write(bs)
- bb.WriteByte(1)
- return bb.Bytes()
- }
- func (formater *EIFFormator) Encode(class string, data map[string]interface{}) []byte {
- kvs := ""
- for k := range data {
- v := fmt.Sprintf("%v", data[k])
- v = strings.Replace(v, "'", "''", -1)
- kvs = fmt.Sprintf("%s%s='%s';", kvs, k, v)
- }
- return formater.EncodeSimply(class, kvs)
- }
- func (formater *EIFFormator) ParseBody(buf []byte) (string, map[string]string) {
- class := ""
- in_str := byte(0)
- var bs bytes.Buffer
- n := len(buf)
- k := ""
- v := ""
- data := make(map[string]string)
- semicolon_key_value_pairs := func() {
- v = string(bs.Bytes()) //分号前为value
- bs.Reset() //清除缓存,准备接收下一个键值对
- if class == "" {
- class = v //第一个分号前的值,class中不能包含=';,不检查格式错误
- } else {
- data[k] = v //键值数据
- }
- }
- for i := 0; i < n; {
- c := buf[i]
- i++
- if in_str != 0 {
- if c == in_str {
- if i < n && buf[i] == c {
- bs.WriteByte(c) // 两个引号合并为一个引号
- i++
- } else {
- in_str = 0 //结束引号
- }
- } else {
- bs.WriteByte(c) //引号内字符
- }
- } else if c == '\'' || c == '"' {
- in_str = c //开始引号
- } else if c == '=' {
- k = string(bs.Bytes()) //等号前为key
- bs.Reset()
- } else if c == ';' {
- semicolon_key_value_pairs()
- } else {
- bs.WriteByte(c) //非引号内字符
- }
- }
- if bs.Len() != 0 {
- //格式错误,最后不是以分号结束,将错就错将所有剩余内存作为最后一个value处理
- semicolon_key_value_pairs()
- }
- return class, data
- }
- func (formater *EIFFormator) ParseHeader(buf []byte) (int, []byte, error) {
- if len(buf) < 41 {
- return -1, nil, nil //数据加载中
- }
- var eifheader EIFHeader
- binary.Read(bytes.NewReader(buf[28:32]), binary.BigEndian, &eifheader.body_size)
- binary.Read(bytes.NewReader(buf[32:36]), binary.BigEndian, &eifheader.bytes_321)
- //formater.Trace('X', "eifheader.bytes_321=%v", eifheader.bytes_321)
- if eifheader.bytes_321 != eifheader.body_size && eifheader.bytes_321 != 4 {
- return -1, nil, EIFFormatError(fmt.Sprintf("Data bytes_321 %d is not match body_size %d or 4", eifheader.bytes_321, eifheader.body_size))
- }
- if eifheader.body_size < 5 {
- return -1, nil, EIFFormatError(fmt.Sprintf("Data body size %d<5", eifheader.body_size))
- }
- if eifheader.body_size > 100*1024*1024 {
- return -1, nil, EIFFormatError(fmt.Sprintf("Data body size %d>100M", eifheader.body_size))
- }
- if len(buf) < int(36+eifheader.body_size) {
- return -1, nil, nil //数据加载中
- }
- if buf[36] != 3 || buf[36+eifheader.body_size-1] != 1 {
- return -1, nil, EIFFormatError(fmt.Sprintf("end point is not match, (%d!=3 or %d!=1)", buf[36], buf[36+eifheader.body_size-1]))
- }
- end_flag := bytes2str(buf[36+eifheader.body_size-6 : 36+eifheader.body_size-1])
- if end_flag != ";END\n" {
- return -1, nil, EIFFormatError(fmt.Sprintf("end flag is not match, %s", end_flag))
- }
- buf = buf[37 : 36+eifheader.body_size-6+1] //保留最后一个分号,方便解析
- return int(36 + eifheader.body_size), buf, nil
- }
- func (formater *EIFFormator) Decode(buf []byte) (int, string, map[string]string, error) {
- end_index, buf, err := formater.ParseHeader(buf)
- //formater.Trace('D', "decode message %s", bytes2str(buf))
- //内容解析
- class, data := formater.ParseBody(buf)
- return end_index, class, data, err
- }
- /**
- * EIF信息匹配器
- */
- type EIFMessageMatcher struct {
- *Tracer
- connector *TCPConnector
- formator *EIFFormator
- msgbody []byte
- }
- func (matcher *EIFMessageMatcher) Initialize(connector *TCPConnector) {
- matcher.Tracer = NewTracer(connector.module)
- matcher.connector = connector
- matcher.formator = &EIFFormator{}
- }
- func (matcher *EIFMessageMatcher) MatchBegin(buf []byte) (int, int) {
- return FindBytes(buf, []byte("<START>>"))
- }
- func (matcher *EIFMessageMatcher) MatchEnd(buf []byte) (int, int) {
- buf_size, buf, err := matcher.formator.ParseHeader(buf)
- if buf != nil {
- matcher.msgbody = buf
- return buf_size, buf_size
- }
- if err != nil {
- matcher.Trace('D', "EIFFormat MatchEnd Error: %s", err.Error())
- }
- return -1, 0
- }
- type EIFMessageProc func(remote_addr string, class string, data map[string]string)
- type EIFMessageProcessor struct {
- msgproc EIFMessageProc
- }
- func (eifmsg *EIFMessageProcessor) MatchType(matcher MessageMatcher) bool {
- _, ok := matcher.(*EIFMessageMatcher)
- return ok
- }
- func (eifmsg *EIFMessageProcessor) MessageProc(matcher MessageMatcher, buf []byte) {
- mm := matcher.(*EIFMessageMatcher)
- buf = make([]byte, len(mm.msgbody))
- copy(buf, mm.msgbody)
- go func(remote_addr string, buf []byte) {
- class, data := mm.formator.ParseBody(buf)
- eifmsg.msgproc(remote_addr, class, data)
- }(mm.connector.remote_addr, buf)
- }
- func KVsParse(kvs ...string) map[string]string {
- retmap := make(map[string]string)
- for i := range kvs {
- kv := kvs[i]
- n := strings.Index(kv, "=")
- if n >= 0 {
- k := kv[:n]
- v := kv[n+1:]
- retmap[k] = v
- } else {
- retmap[kv] = kv
- }
- }
- return retmap
- }
- var Config Configuration
- func init() {
- RegisterMessageClass("<START>>\x00", &EIFMessageMatcher{}, &EIFMessageProcessor{})
- RegisterMessageClass("`\r\n", &CommandMatcher{}, &CommandProcessor{})
- RegisterMessageClass("\r\n", &CommandMatcher{}, &CommandProcessor{})
- config := KVsParse(os.Environ()...)
- for k, v := range KVsParse(os.Args...) {
- config[k] = v
- }
- Config.Initialize(config)
- }
- type EIFSender struct {
- *Tracer
- *Configuration
- tcpSender *TCPSender
- }
- func (sender *EIFSender) Send(class string, data map[string]string) (int, error) {
- if sender.tcpSender == nil {
- sender.Configuration.InitWithDefaultConfig(&Config)
- if sender.ID == "" {
- sender.ID = "eifsender"
- }
- sender.tcpSender = NewTCPSender(sender.ID, sender.ServerLocation, sender.ServerPort)
- }
- var eifformator EIFFormator
- dat := make(map[string]interface{})
- for k, v := range data {
- dat[k] = v
- }
- bs := eifformator.Encode(class, dat)
- for i := 0; i < len(bs); {
- ns, err := sender.tcpSender.Send(bs)
- i += ns
- if err != nil {
- return i, err
- }
- }
- return len(bs), nil
- }
- func (sender *EIFSender) Close() {
- tcpSender := sender.tcpSender
- if tcpSender != nil {
- tcpSender.Close()
- sender.tcpSender = nil
- }
- }
- func EIFSend(server string, port int, class string, kvs string) {
- eifsender := EIFSender{
- Configuration: &Configuration{
- ID: "estemp",
- ServerLocation: server,
- ServerPort: port,
- },
- }
- defer eifsender.Close()
- eifformator := &EIFFormator{}
- _, data := eifformator.ParseBody(str2bytes(class + ";" + kvs))
- eifsender.Send(class, data)
- }
- type EIFReceiver struct {
- *Tracer
- *Configuration
- tcpReceiver *TCPReceiver
- }
- /**
- * 启动EIFReceiver,开启监听端口,处理数据接收、分类、解析,直至进程被终止
- */
- func (receiver *EIFReceiver) Run(eifmsgproc EIFMessageProc) {
- receiver.Configuration.InitWithDefaultConfig(&Config)
- break_signal := make(chan os.Signal, 1)
- signal.Notify(break_signal, os.Interrupt, os.Kill, syscall.SIGTERM)
- // 将通用消息处理函数转换为EIF消息处理函数
- eifmsgprocessor := &EIFMessageProcessor{
- msgproc: eifmsgproc,
- }
- receiver.tcpReceiver = NewTCPReceiver(receiver.Configuration, nil, eifmsgprocessor)
- receiver.tcpReceiver.Start()
- bs := <-break_signal
- receiver.tcpReceiver.Trace('D', "%v", bs)
- receiver.tcpReceiver.Stop()
- }
- /**
- * 判断EIFReceiver是否处于监听状态
- */
- func (receiver *EIFReceiver) IsRunning() bool {
- return receiver.tcpReceiver != nil && receiver.tcpReceiver.IsAlive()
- }
- /**
- * 交互命令处理,通过 telnet 127.0.0.1 9999 进行命令交互,可用于测试、监控、集成
- */
- type CommandProcessor struct {
- }
- func (cmd *CommandProcessor) MatchType(mmatcher MessageMatcher) bool {
- _, ok := mmatcher.(*CommandMatcher)
- return ok
- }
- func (cmd *CommandProcessor) outputHelp(mmatcher MessageMatcher) {
- matcher := mmatcher.(*CommandMatcher)
- _, err := matcher.connector.Send([]byte("----------------------\n" +
- "` 进入或退出命令交互模式\n" +
- "c|x|q|close|exit|quit 断开连接\n" +
- "h|help 打印帮助信息\n" +
- "s|send ip:port class key=value;... 发送一条EIF格式的消息,如:s 127.0.0.1:9999 test number=101;sleep=100;msg=hello world.\n" +
- "----------------------\n"))
- if err != nil {
- matcher.Trace('D', "Error send response: %s", err.Error())
- }
- }
- func (cmd *CommandProcessor) MessageProc(mmatcher MessageMatcher, buf []byte) {
- matcher := mmatcher.(*CommandMatcher)
- if matcher.mode == 1 {
- buf = buf[1:]
- matcher.mode = 2
- }
- m, _ := regexp.Match(`^\s*(?:help|h)\s*$`, buf)
- if m || matcher.welcome == 0 {
- cmd.outputHelp(matcher)
- matcher.welcome = 1
- }
- str := bytes2str(buf[:len(buf)-2])
- re_exit, err := regexp.Compile(`^\s*(?:close|c|exit|x|quit|q)\s*$`)
- if err != nil {
- matcher.Trace('D', "Regexp error: %s", err.Error())
- }
- matcher.Trace('D', "received: %s", str)
- if re_exit.MatchString(str) {
- matcher.Trace('D', "match command exit")
- matcher.connector.Close()
- }
- re_send, err := regexp.Compile(`^\s*(?:send|s)\s+([^:]+):(\d+)\s+(\S*)\s+(.*)$`)
- if err != nil {
- matcher.Trace('D', "Regexp error: %s", err.Error())
- }
- eif_send_args := re_send.FindStringSubmatch(str)
- if len(eif_send_args) > 4 {
- port, _ := strconv.Atoi(eif_send_args[2])
- EIFSend(eif_send_args[1], port, eif_send_args[3], eif_send_args[4])
- }
- m, _ = regexp.Match("^\\s*`\\s*$", buf)
- if m {
- matcher.mode = 0
- }
- if matcher.mode != 0 {
- matcher.connector.Send([]byte("> "))
- }
- }
|