tcptrans.go 48 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548
  1. package ovo
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "net"
  8. "os"
  9. "os/signal"
  10. "reflect"
  11. "regexp"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "sync/atomic"
  16. "syscall"
  17. "time"
  18. "unsafe"
  19. )
  20. // 版本信息
  21. const version = "2021.6.2.13"
  22. // 已存在的最大ID值
  23. var maxIDNumber uint32
  24. // 获取新的ID
  25. func newID() int {
  26. return (int)(atomic.AddUint32(&maxIDNumber, 1))
  27. }
  28. // 共享内存,临时转换类型,更快速
  29. func str2bytes(s string) []byte {
  30. x := (*[2]uintptr)(unsafe.Pointer(&s))
  31. h := [3]uintptr{x[0], x[1], x[1]}
  32. return *(*[]byte)(unsafe.Pointer(&h))
  33. }
  34. // 共享内存,临时转换类型,更快速
  35. func bytes2str(b []byte) string {
  36. return *(*string)(unsafe.Pointer(&b))
  37. }
  38. // 字节数格式化输出
  39. func fmt_bytes_size(b int) string {
  40. if b >= 0 {
  41. if b < 1024 {
  42. return fmt.Sprintf("%dB", b)
  43. }
  44. if b < 1024*1024 {
  45. return fmt.Sprintf("%dKB", b/1024)
  46. }
  47. if b < 1024*1024*1024 {
  48. return fmt.Sprintf("%dMB", b/1024/1024)
  49. }
  50. return fmt.Sprintf("%dGB", b/1024/1024/1024)
  51. }
  52. return fmt.Sprintf("%d", b)
  53. }
  54. /**
  55. * find b in a 从内存a中查找匹配内存b的位置
  56. * return:
  57. * matched index 匹配到的开始位置或-1表示没匹配到或-2表示目前没匹配到,但等待更多信息也许能够确定
  58. * last matching index 匹配到的结束位置或没匹配到的检查点位置,为了下次从这个位置继续
  59. *
  60. */
  61. func FindBytes(a []byte, b []byte) (int, int) {
  62. ai := 0
  63. for ; ai <= len(a)-len(b); ai++ {
  64. bi := 0
  65. for ; bi < len(b) && a[ai+bi] == b[bi]; bi++ {
  66. }
  67. if bi == len(b) {
  68. //print("a中匹配到b的全部内容")
  69. return ai, ai + bi
  70. }
  71. }
  72. for ; ai < len(a); ai++ {
  73. bi := 0
  74. for ; ai+bi < len(a) && a[ai+bi] == b[bi]; bi++ {
  75. }
  76. if ai+bi == len(a) {
  77. //print("a结尾部分能够匹配b的开头部分,即a后续内容增加后有可能能够匹配b")
  78. return -2, ai
  79. }
  80. }
  81. return -1, ai
  82. }
  83. const DefaultTraceCatalog = "FEWIDX"
  84. type Tracer struct {
  85. TRACE string
  86. module string
  87. }
  88. func NewTracer(module string) *Tracer {
  89. return &Tracer{
  90. TRACE: Config.TRACE,
  91. module: module,
  92. }
  93. }
  94. var mutexTracerBuffer sync.Mutex
  95. var tracerBuffer bytes.Buffer
  96. func (tracer *Tracer) Trace(catalog byte, sfmt string, args ...interface{}) {
  97. if strings.Index(tracer.TRACE, string(catalog)) >= 0 {
  98. sinfo := fmt.Sprintf(sfmt, args...)
  99. sinfo = fmt.Sprintf("%s [%s] [%s] %s\r\n", time.Now().Format("2006-01-02 15:04:05.000"), tracer.module, string(catalog), sinfo)
  100. mutexTracerBuffer.Lock()
  101. defer mutexTracerBuffer.Unlock()
  102. tracerBuffer.WriteString(sinfo)
  103. go func() {
  104. mutexTracerBuffer.Lock()
  105. defer mutexTracerBuffer.Unlock()
  106. sinfo := tracerBuffer.String()
  107. if sinfo != "" {
  108. tracerBuffer.Reset()
  109. fmt.Print(sinfo)
  110. }
  111. }()
  112. }
  113. }
  114. var mutexTraceInfo sync.Mutex
  115. var nextOutputTime map[string]time.Time
  116. func (tracer *Tracer) CalmTrace(class string, delayms int, catalog byte, sfmt string, args ...interface{}) {
  117. if strings.Index(tracer.TRACE, string(catalog)) >= 0 {
  118. mutexTraceInfo.Lock()
  119. defer mutexTraceInfo.Unlock()
  120. if nextOutputTime == nil {
  121. nextOutputTime = make(map[string]time.Time)
  122. }
  123. t, o := nextOutputTime[class]
  124. if o && time.Now().Before(t) {
  125. return
  126. }
  127. sinfo := fmt.Sprintf(sfmt, args...)
  128. sinfo = fmt.Sprintf("%s [%s] [%s] %s\r\n", time.Now().Format("2006-01-02 15:04:05.000"), tracer.module, string(catalog), sinfo)
  129. fmt.Print(sinfo)
  130. nextOutputTime[class] = time.Now().Add(time.Duration(delayms) * time.Millisecond)
  131. }
  132. }
  133. /**
  134. * 消息分类匹配器,用于每个连接器的数据接收、分类、解析
  135. */
  136. type MessageMatcher interface {
  137. Initialize(connector *TCPConnector)
  138. MatchBegin(msgbuf []byte) (int, int)
  139. MatchEnd(msgbuf []byte) (int, int)
  140. }
  141. /**
  142. * 消息分类处理器,用于每类消息的业务处理
  143. */
  144. type MessageProcessor interface {
  145. MatchType(mm MessageMatcher) bool
  146. MessageProc(mm MessageMatcher, buf []byte)
  147. }
  148. var messageMatcherNameRegistry = make(map[string]reflect.Type)
  149. var messageMatcherTagRegistry = make(map[string]reflect.Type)
  150. var messageProcessorTagRegistry = make(map[string]reflect.Type)
  151. func RegisterMessageClass(tag string, message_matcher MessageMatcher, message_processor MessageProcessor) {
  152. tmm := reflect.TypeOf(message_matcher).Elem()
  153. messageMatcherNameRegistry[tmm.Name()] = tmm
  154. messageMatcherTagRegistry[tag] = tmm
  155. if message_processor != nil {
  156. tmp := reflect.TypeOf(message_processor).Elem()
  157. messageProcessorTagRegistry[tag] = tmp
  158. }
  159. }
  160. func NewMessageMatcher(mptype reflect.Type) MessageMatcher {
  161. if fmt.Sprintf("%s", mptype)[0] == '*' {
  162. //fmt.Println("mptype:", mptype) // *AutoDetectMessageMatcher
  163. mptype = mptype.Elem()
  164. //fmt.Println("mptype:", mptype) // AutoDetectMessageMatcher
  165. }
  166. mpe := reflect.New(mptype).Interface() //
  167. //fmt.Println("mpe type:", reflect.TypeOf(mpe)) // *AutoDetectMessageMatcher
  168. //fmt.Println("mpe:", mpe) // &{<nil> map[] 0 <nil> <nil> <nil>}
  169. return mpe.(MessageMatcher)
  170. }
  171. func NewMessageProcessor(mptype reflect.Type) MessageProcessor {
  172. if fmt.Sprintf("%s", mptype)[0] == '*' {
  173. mptype = mptype.Elem()
  174. }
  175. mpe := reflect.New(mptype).Interface() //
  176. return mpe.(MessageProcessor)
  177. }
  178. /**
  179. * 默认消息匹配器
  180. */
  181. type DefaultMessageMatcher struct {
  182. connector *TCPConnector
  183. }
  184. func (matcher *DefaultMessageMatcher) Initialize(connector *TCPConnector) {
  185. matcher.connector = connector
  186. }
  187. func (matcher *DefaultMessageMatcher) MatchBegin(buf []byte) (int, int) {
  188. return 0, 1
  189. }
  190. func (matcher *DefaultMessageMatcher) MatchEnd(buf []byte) (int, int) {
  191. return len(buf), len(buf)
  192. }
  193. type MessageMatcherDetector struct {
  194. *Tracer
  195. default_message_matcher MessageMatcher
  196. connector *TCPConnector
  197. message_matcher MessageMatcher
  198. message_processor MessageProcessor
  199. }
  200. func NewMessageMatcherDetector(connector *TCPConnector) *MessageMatcherDetector {
  201. return &MessageMatcherDetector{
  202. Tracer: NewTracer(connector.module),
  203. default_message_matcher: &DefaultMessageMatcher{},
  204. connector: connector,
  205. message_matcher: nil,
  206. message_processor: nil,
  207. }
  208. }
  209. func (detector *MessageMatcherDetector) Detect(buf []byte) (MessageMatcher, MessageProcessor) {
  210. if detector.message_matcher == nil {
  211. maxklen := 0
  212. for k := range messageMatcherTagRegistry {
  213. if len(k) > maxklen {
  214. maxklen = len(k)
  215. }
  216. }
  217. for k := range messageMatcherTagRegistry {
  218. n, _ := FindBytes(buf, str2bytes(k))
  219. if n == 0 {
  220. mmtype := messageMatcherTagRegistry[k]
  221. message_matcher := NewMessageMatcher(mmtype)
  222. message_matcher.Initialize(detector.connector)
  223. mptype := messageProcessorTagRegistry[k]
  224. if mptype != nil {
  225. detector.message_processor = NewMessageProcessor(mptype)
  226. }
  227. detector.message_matcher = message_matcher
  228. }
  229. if n == -2 && len(k) > maxklen {
  230. maxklen = len(k)
  231. }
  232. }
  233. if detector.message_matcher == nil {
  234. if len(buf) > maxklen {
  235. message_matcher := detector.default_message_matcher
  236. message_matcher.Initialize(detector.connector)
  237. detector.message_matcher = message_matcher
  238. } else {
  239. return nil, nil
  240. }
  241. }
  242. detector.Trace('D', "消息匹配: %s", reflect.TypeOf(detector.message_matcher).Elem())
  243. }
  244. return detector.message_matcher, detector.message_processor
  245. }
  246. type MessageClassifier struct {
  247. id string
  248. connector *TCPConnector
  249. message_matcher MessageMatcher
  250. message_processor MessageProcessor
  251. detector *MessageMatcherDetector
  252. mutex_readbuf_off sync.Mutex // 保护对message_readbuf_off的操作
  253. message_readbuf_off int
  254. message_begin_index int
  255. record_count int
  256. bytes_count int
  257. dropped_bytes int
  258. buffer_bytes int
  259. break_processing bool
  260. time_start time.Time
  261. }
  262. func (msgclassifier *MessageClassifier) Initialize(id string, connector *TCPConnector, message_matcher MessageMatcher, msgproc MessageProcessor) {
  263. msgclassifier.id = id
  264. msgclassifier.connector = connector
  265. msgclassifier.message_readbuf_off = 0
  266. msgclassifier.message_begin_index = -1
  267. msgclassifier.record_count = 0
  268. msgclassifier.bytes_count = 0
  269. msgclassifier.dropped_bytes = 0
  270. msgclassifier.buffer_bytes = 0
  271. msgclassifier.break_processing = false
  272. msgclassifier.message_processor = msgproc
  273. if message_matcher == nil {
  274. msgclassifier.detector = NewMessageMatcherDetector(connector)
  275. } else {
  276. msgclassifier.message_matcher = message_matcher
  277. msgclassifier.message_matcher.Initialize(msgclassifier.connector)
  278. msgclassifier.id = fmt.Sprintf("%s#%s", reflect.TypeOf(message_matcher).Elem(), msgclassifier.id)
  279. if msgclassifier.message_processor != nil && !msgclassifier.message_processor.MatchType(msgclassifier.message_matcher) {
  280. msgclassifier.message_processor = nil
  281. }
  282. }
  283. msgclassifier.time_start = time.Now()
  284. }
  285. /**
  286. * 返回已经处理完成的数据指针位置和最后匹配的数据块
  287. */
  288. func (msgclassifier *MessageClassifier) Match(buf []byte) (int, []byte) {
  289. msgclassifier.mutex_readbuf_off.Lock()
  290. defer msgclassifier.mutex_readbuf_off.Unlock()
  291. // 从上次未处理完的位置继续
  292. procbuf := buf[msgclassifier.message_readbuf_off:]
  293. if Config.TOVOT_DEBUG > 0 {
  294. msgclassifier.connector.Trace('X', "[%s] buf %d %d", msgclassifier.id, procbuf, msgclassifier.message_begin_index)
  295. }
  296. if msgclassifier.message_begin_index < 0 {
  297. // 匹配消息开始标记
  298. if msgclassifier.message_matcher == nil {
  299. message_matcher, message_processor := msgclassifier.detector.Detect(buf)
  300. if message_matcher == nil {
  301. // detector会有DefaultMessageMatcher保底
  302. // 返回空只有一种可能:数据太少暂时无法确定消息匹配器,需要等待接收更多数据
  303. // 此时 msgclassifier.message_readbuf_off==0
  304. return msgclassifier.message_readbuf_off, nil
  305. }
  306. msgclassifier.message_matcher = message_matcher
  307. if msgclassifier.message_processor == nil || !msgclassifier.message_processor.MatchType(message_matcher) {
  308. // 运行时参数中没有指定消息处理器,则由detector根据消息类型确定
  309. msgclassifier.message_processor = message_processor
  310. }
  311. msgclassifier.id = fmt.Sprintf("%s#%s", reflect.TypeOf(msgclassifier.message_matcher).Elem(), msgclassifier.id)
  312. }
  313. begin_flag_index, next_index := msgclassifier.message_matcher.MatchBegin(procbuf)
  314. if begin_flag_index >= 0 {
  315. if Config.TOVOT_DEBUG > 0 {
  316. msgclassifier.connector.Trace('D', "[%s] match begin %d %d", msgclassifier.id, begin_flag_index, procbuf)
  317. }
  318. // 从匹配消息开始标记位置开始处理,丢弃之前不匹配(不能识别的格式)的数据
  319. msgclassifier.message_begin_index = begin_flag_index
  320. procbuf = procbuf[begin_flag_index:]
  321. msgclassifier.message_readbuf_off += begin_flag_index
  322. msgclassifier.dropped_bytes += begin_flag_index
  323. } else {
  324. // 忽略不匹配(不能识别的格式)的数据
  325. if Config.TOVOT_DEBUG > 0 {
  326. msgclassifier.connector.Trace('X', "[%s] not match begin %d %d", msgclassifier.id, next_index, procbuf)
  327. }
  328. msgclassifier.message_readbuf_off += next_index
  329. msgclassifier.dropped_bytes += next_index
  330. return msgclassifier.message_readbuf_off, nil
  331. }
  332. }
  333. // 此时procbuf内容已匹配消息开始标记,相对起始位置为0
  334. // 如果不能匹配消息结束标记,对所有变量
  335. if Config.TOVOT_DEBUG > 0 {
  336. msgclassifier.connector.Trace('X', "[%s] parse bytes %d", msgclassifier.id, len(procbuf))
  337. }
  338. // 匹配消息结束标记
  339. end_flag_index, msg_end_index := msgclassifier.message_matcher.MatchEnd(procbuf)
  340. if end_flag_index >= 0 {
  341. if Config.TOVOT_DEBUG > 0 {
  342. msgclassifier.connector.Trace('D', "[%s] match end %d %d %d",
  343. msgclassifier.id, end_flag_index, msg_end_index, procbuf[end_flag_index:msg_end_index])
  344. }
  345. // 消息匹配完整,返回处理数据
  346. msgbuf := procbuf[:msg_end_index]
  347. msgclassifier.message_begin_index = -1
  348. msgclassifier.message_readbuf_off += msg_end_index
  349. return msgclassifier.message_readbuf_off, msgbuf
  350. }
  351. if Config.TOVOT_DEBUG > 0 {
  352. msgclassifier.connector.Trace('D', "[%s] not match end %d", msgclassifier.id, procbuf)
  353. }
  354. // 没有匹配到消息结束标记,继续等待完整的数据
  355. return msgclassifier.message_readbuf_off, nil
  356. }
  357. /**
  358. * 返回已经处理完成的数据指针位置,-1表示数据处理过程终止(数据缓冲区扩容失败,丢弃部分数据,数据指针位置被重置)
  359. */
  360. func (msgclassifier *MessageClassifier) Proc(buf []byte) int {
  361. done_buf_off, msgbuf := msgclassifier.Match(buf)
  362. if msgbuf != nil {
  363. if msgclassifier.message_processor != nil {
  364. msgclassifier.message_processor.MessageProc(msgclassifier.message_matcher, msgbuf)
  365. }
  366. msgclassifier.bytes_count += len(msgbuf)
  367. msgclassifier.record_count++
  368. msgclassifier.buffer_bytes = len(buf)
  369. conn_time := time.Now().Sub(msgclassifier.time_start)
  370. msgclassifier.connector.CalmTrace(msgclassifier.id+".records", 1000, 'D',
  371. "[%s] 连接持续 %v, 已处理 %d records, %d bytes, dropped %d bytes, buffer %d bytes",
  372. msgclassifier.id, conn_time,
  373. msgclassifier.record_count, msgclassifier.bytes_count, msgclassifier.dropped_bytes, msgclassifier.buffer_bytes)
  374. }
  375. if msgclassifier.break_processing {
  376. return -1
  377. }
  378. return done_buf_off
  379. }
  380. // 数据缓冲区前滚
  381. func (msgclassifier *MessageClassifier) ReadBufferRollForward(readbuf_off int, dropped bool) {
  382. if Config.TOVOT_DEBUG > 0 {
  383. msgclassifier.connector.Trace('X', "[%s] ReadBufferRollForward message_readbuf_off=%d, readbuf_off=%d, dropped=%v",
  384. msgclassifier.id, msgclassifier.message_readbuf_off, readbuf_off, dropped)
  385. }
  386. msgclassifier.mutex_readbuf_off.Lock()
  387. defer msgclassifier.mutex_readbuf_off.Unlock()
  388. msgclassifier.message_begin_index = -1
  389. msgclassifier.message_readbuf_off -= readbuf_off
  390. if msgclassifier.message_readbuf_off < 0 {
  391. msgclassifier.message_readbuf_off = 0
  392. }
  393. if dropped {
  394. msgclassifier.dropped_bytes += readbuf_off
  395. msgclassifier.break_processing = true
  396. }
  397. }
  398. // 销毁数据分类器
  399. func (msgclassifier *MessageClassifier) Destory() {
  400. if msgclassifier.connector != nil {
  401. msgclassifier.connector.Trace('D', "[%s] 销毁数据分类器", msgclassifier.id)
  402. }
  403. msgclassifier.message_begin_index = -1
  404. msgclassifier.message_readbuf_off = 0
  405. }
  406. type MultiMessageClassifier struct {
  407. message_classifier []*MessageClassifier
  408. }
  409. func (mmc *MultiMessageClassifier) Initialize(connector *TCPConnector, message_matcher []MessageMatcher, msgproc []MessageProcessor) {
  410. if len(message_matcher) != len(msgproc) {
  411. panic("MultiMessageClassifier.Initialize Error: len(message_matcher) != len(msgproc)")
  412. }
  413. message_classifier_count := len(message_matcher)
  414. message_classifier := make([]*MessageClassifier, message_classifier_count)
  415. if message_classifier_count > 0 {
  416. for i := 0; i < message_classifier_count; i++ {
  417. message_classifier[i] = &MessageClassifier{}
  418. message_classifier[i].Initialize(fmt.Sprintf("_%d", newID()), connector, message_matcher[i], msgproc[i])
  419. }
  420. } else {
  421. message_classifier = make([]*MessageClassifier, 1)
  422. message_classifier[0] = &MessageClassifier{}
  423. message_classifier[0].Initialize(fmt.Sprintf("_%d", newID()), connector, nil, nil)
  424. }
  425. mmc.message_classifier = message_classifier
  426. }
  427. func (mmc *MultiMessageClassifier) receiverProcess(buf []byte) int {
  428. done_buffer_off := len(buf) //已经处理完成的数据偏移量
  429. message_classifier := mmc.message_classifier
  430. message_classifier_count := len(message_classifier)
  431. for i := 0; i < message_classifier_count; i++ {
  432. last_donebufoff, curdonebufoff := -1, 0
  433. for last_donebufoff != curdonebufoff && curdonebufoff >= 0 && curdonebufoff < len(buf) {
  434. last_donebufoff = curdonebufoff
  435. curdonebufoff = message_classifier[i].Proc(buf)
  436. }
  437. // 取多个分类器中最小的已经处理完成的数据偏移量
  438. if curdonebufoff < done_buffer_off {
  439. done_buffer_off = curdonebufoff
  440. }
  441. }
  442. return done_buffer_off
  443. }
  444. func (mmc *MultiMessageClassifier) shiftBuffer(shif_buffer_off int, dropped bool) {
  445. message_classifier := mmc.message_classifier
  446. message_classifier_count := len(message_classifier)
  447. for i := 0; i < message_classifier_count; i++ {
  448. message_classifier[i].ReadBufferRollForward(shif_buffer_off, dropped)
  449. }
  450. }
  451. func (mmc *MultiMessageClassifier) Destory() {
  452. for i := 0; i < len(mmc.message_classifier); i++ {
  453. mmc.message_classifier[i].Destory()
  454. }
  455. }
  456. type TCPReceiverProcessor struct {
  457. MultiMessageClassifier
  458. }
  459. func (recvproc *TCPReceiverProcessor) Initialize(connector *TCPConnector, message_matcher MessageMatcher, msgproc MessageProcessor) {
  460. recvproc.MultiMessageClassifier.Initialize(connector, []MessageMatcher{message_matcher}, []MessageProcessor{msgproc})
  461. }
  462. /*
  463. * TCPConnector 接收器,用于接收 TCP 连接请求
  464. */
  465. type TCPConnector struct {
  466. *Tracer
  467. id string
  468. conn net.Conn
  469. closed bool
  470. init_buf_size int
  471. max_buf_size int
  472. SendPromptMessage string
  473. mu_conn sync.Mutex
  474. remote_addr string
  475. local_addr string
  476. receiver_processor *TCPReceiverProcessor
  477. }
  478. /**
  479. * 连接器
  480. */
  481. func NewTCPConnector(id string, conn net.Conn, in_out string, init_buf_size, max_buf_size int) *TCPConnector {
  482. remote_addr := conn.RemoteAddr().String()
  483. local_addr := conn.LocalAddr().String()
  484. trace_module := fmt.Sprintf("%s@%s%s%s", id, local_addr, in_out, remote_addr)
  485. if init_buf_size < 64 {
  486. init_buf_size = 64
  487. }
  488. connector := &TCPConnector{
  489. Tracer: NewTracer(trace_module),
  490. id: id,
  491. conn: conn,
  492. closed: false,
  493. init_buf_size: init_buf_size,
  494. max_buf_size: max_buf_size,
  495. remote_addr: remote_addr,
  496. local_addr: local_addr,
  497. receiver_processor: nil, // 单工发送器不需要信息接收处理器
  498. }
  499. return connector
  500. }
  501. func (connector *TCPConnector) IsClosed() bool {
  502. return connector == nil || connector.conn == nil || connector.closed
  503. }
  504. func (connector *TCPConnector) Close() {
  505. connector.Trace('D', "%s 准备关闭连接", connector.id)
  506. // 立即关闭,以备重连
  507. func() {
  508. connector.mu_conn.Lock()
  509. defer connector.mu_conn.Unlock()
  510. if connector.receiver_processor != nil {
  511. connector.receiver_processor.Destory()
  512. }
  513. if connector.conn != nil {
  514. connector.closed = true
  515. connector.conn.Close()
  516. connector.conn = nil
  517. connector.Trace('D', "%s 连接器已关闭", connector.id)
  518. }
  519. }()
  520. }
  521. func (connector *TCPConnector) Send(data []byte) (int, error) {
  522. if connector.conn != nil {
  523. n, err := connector.conn.Write(data)
  524. if err != nil {
  525. connector.Trace('D', "Error send: %s", err.Error())
  526. return 0, err
  527. }
  528. return n, err
  529. }
  530. return 0, nil
  531. }
  532. func (connector *TCPConnector) sendStartMessage() (int, error) {
  533. if len(connector.SendPromptMessage) > 0 {
  534. n, err := connector.Send([]byte(connector.SendPromptMessage))
  535. if err != nil {
  536. connector.Trace('D', "Error send message: %s", err.Error())
  537. } else {
  538. connector.Trace('D', "连接开始提示信息发送成功")
  539. }
  540. return n, err
  541. }
  542. return 0, nil
  543. }
  544. type TCPReceiveBuffer struct {
  545. connector *TCPConnector
  546. recvbuf []byte
  547. recvlimit int // 接收数据的buffer偏移量的最大值,一般为len(recvbuf),当数据处理未完成且进行了缓冲区重组后,该限值可能为last_readoff
  548. recvoff int // 接收数据的buffer偏移量
  549. readoff int // 读取数据的buffer偏移量
  550. last_recvoff int // 上一次接收数据的buffer偏移量
  551. last_readoff int // 上一次读取数据的buffer偏移量
  552. break_processing bool // recvbuf扩容失败,丢弃部分数据,会导致数据处理过程被截断
  553. last_proc_done bool // 上次的处理过程是否结束
  554. mutex_readoff sync.Mutex
  555. buf_size_ok_time time.Time
  556. }
  557. func (trb *TCPReceiveBuffer) Initialize(connector *TCPConnector) {
  558. trb.connector = connector
  559. trb.recvbuf = make([]byte, connector.init_buf_size)
  560. trb.recvlimit = len(trb.recvbuf)
  561. trb.recvoff = 0 // 接收数据的buffer偏移量
  562. trb.readoff = 0 // 读取数据的buffer偏移量
  563. trb.last_recvoff = -1 // 上一次接收数据的buffer偏移量
  564. trb.last_readoff = -1 // 上一次读取数据的buffer偏移量
  565. trb.break_processing = false // recvbuf扩容失败,丢弃部分数据,会导致数据处理过程被截断
  566. trb.last_proc_done = true // 上次的处理过程是否结束
  567. trb.buf_size_ok_time = time.Now()
  568. }
  569. func (trb *TCPReceiveBuffer) getRecvBuffer() []byte {
  570. // 缓冲区数据重组后,原来buf中的数据可能还在处理中,为避免新接收数据覆盖到未处理完的数据,需要限制接收数据的范围
  571. ei := trb.recvoff + trb.connector.init_buf_size
  572. if ei > trb.recvlimit {
  573. ei = trb.recvlimit
  574. }
  575. return trb.recvbuf[trb.recvoff:ei]
  576. }
  577. func (trb *TCPReceiveBuffer) BufferReceive() (int, error) {
  578. // 接收数据仅改变recvoff以后的recvbuf内容
  579. n, err := trb.connector.conn.Read(trb.getRecvBuffer())
  580. trb.recvoff += n
  581. if n > 0 {
  582. if Config.TOVOT_DEBUG > 0 {
  583. trb.connector.Trace('X', "read bytes %d", n)
  584. }
  585. }
  586. return n, err
  587. }
  588. // 获取新的待处理buffer,只需要处理最后更新的procbuf
  589. func (trb *TCPReceiveBuffer) getProcBuffer() []byte {
  590. trb.mutex_readoff.Lock()
  591. defer trb.mutex_readoff.Unlock()
  592. if Config.TOVOT_DEBUG > 0 {
  593. trb.connector.Trace('X', "准备处理 readoff=%d, last_readoff=%d, recvoff=%d, last_recvoff=%d",
  594. trb.readoff, trb.last_readoff, trb.recvoff, trb.last_recvoff)
  595. }
  596. if trb.last_readoff != trb.readoff || trb.last_recvoff != trb.recvoff {
  597. trb.last_readoff = trb.readoff
  598. trb.last_recvoff = trb.recvoff
  599. buf := trb.recvbuf[trb.last_readoff:trb.last_recvoff]
  600. if Config.TOVOT_DEBUG > 0 {
  601. trb.connector.Trace('X', "处理开始 last_readoff=%d, last_recvoff=%d, len(buf)=%d", trb.last_readoff, trb.last_recvoff, len(buf))
  602. }
  603. trb.break_processing = false
  604. return buf
  605. }
  606. return nil
  607. }
  608. func (trb *TCPReceiveBuffer) BufferProcess(buf []byte) {
  609. // 数据处理内部不会改变buf的内容,返回已处理完成的数据相对于buf的指针位置
  610. // 数据接收只会追加recvbuf后续内容,不影响buf内容,即使recvbuf扩容,也不会改变,因此可以在独立线程中并行数据处理
  611. // recvbuf扩容失败,丢弃部分数据,需要对数据处理起始位置指针进行操作,需要避免冲突,msgclassifier.mutex_readbuf_off
  612. // recvbuf扩容失败,丢弃部分数据,会导致数据处理过程被截断,返回 doneoff==-1
  613. doneoff := trb.connector.receiver_processor.receiverProcess(buf)
  614. // 更新数据指针位置
  615. func() {
  616. trb.mutex_readoff.Lock()
  617. defer trb.mutex_readoff.Unlock()
  618. if doneoff >= 0 && !trb.break_processing {
  619. trb.connector.receiver_processor.shiftBuffer(doneoff, false)
  620. trb.readoff += doneoff
  621. if Config.TOVOT_DEBUG > 0 {
  622. trb.connector.Trace('X', "处理完成 readoff=%d, doneoff=%d, len(buf)=%d", trb.readoff, doneoff, len(buf))
  623. }
  624. }
  625. trb.last_readoff = -1
  626. trb.recvlimit = len(trb.recvbuf)
  627. }()
  628. }
  629. //over the Reorg level
  630. func (trb *TCPReceiveBuffer) OverReorgLevel() bool {
  631. if trb.recvoff+trb.connector.init_buf_size < trb.recvlimit/16 {
  632. if time.Now().Sub(trb.buf_size_ok_time) > time.Duration(1*time.Second) {
  633. return true
  634. }
  635. return false
  636. } else {
  637. trb.buf_size_ok_time = time.Now()
  638. }
  639. return trb.recvoff > len(trb.recvbuf)/2 && trb.recvoff+trb.connector.init_buf_size >= trb.recvlimit
  640. }
  641. func (trb *TCPReceiveBuffer) BufferReorg() {
  642. new_recvbuf := trb.recvbuf
  643. // 根据需要扩容一倍,达到最大值后丢弃一半数据
  644. new_buf_size := trb.BufferExpansion()
  645. if new_buf_size > 0 {
  646. new_recvbuf = make([]byte, new_buf_size)
  647. }
  648. func() {
  649. trb.mutex_readoff.Lock()
  650. defer trb.mutex_readoff.Unlock()
  651. if Config.TOVOT_DEBUG > 0 {
  652. trb.connector.Trace('X', "重组未处理完成数据前 readoff=%d, recvoff=%d, recvlimit=%d", trb.readoff, trb.recvoff, trb.recvlimit)
  653. }
  654. keepbuf := trb.recvbuf[trb.readoff:trb.recvoff]
  655. // 调整数据指针
  656. trb.recvoff -= trb.readoff
  657. trb.readoff = 0
  658. // 确定接收buffer上限
  659. if new_buf_size > 0 {
  660. // 扩容、重新分配内存后,正在处理中的数据不会被覆盖
  661. trb.recvlimit = new_buf_size
  662. } else if trb.last_readoff > trb.recvoff {
  663. // 防止正在处理中的数据被覆盖
  664. trb.recvlimit = trb.last_readoff
  665. } else if trb.last_readoff >= 0 {
  666. panic(fmt.Sprintf("扩容或丢数操作没起作用? last_readoff=%d, recvoff=%d, recvlimit=%d, len(recvbuf)=%d, new_buf_size=%d", trb.last_readoff, trb.recvoff, trb.recvlimit, len(trb.recvbuf), new_buf_size))
  667. }
  668. // 移动少量未处理完的数据
  669. copy(new_recvbuf, keepbuf)
  670. trb.recvbuf = new_recvbuf
  671. // buf == recvbuf[readoff:recvoff]
  672. if Config.TOVOT_DEBUG > 0 {
  673. trb.connector.Trace('X', "重组未处理完成数据后 readoff=%d, recvoff=%d, recvlimit=%d", trb.readoff, trb.recvoff, trb.recvlimit)
  674. }
  675. }()
  676. }
  677. /**
  678. * 返回新的接收缓冲区和最终需要保留的缓冲区
  679. */
  680. func (trb *TCPReceiveBuffer) BufferExpansion() (new_buf_size int) {
  681. trb.mutex_readoff.Lock()
  682. defer trb.mutex_readoff.Unlock()
  683. if trb.recvoff+trb.connector.init_buf_size < trb.recvlimit/16 {
  684. // 数据接收空间过剩
  685. new_buf_size = (len(trb.recvbuf) - trb.connector.init_buf_size) * 8 / 10 / trb.connector.init_buf_size * trb.connector.init_buf_size
  686. trb.connector.Trace('I', "数据接收空间过剩,收缩至 %s", fmt_bytes_size(new_buf_size))
  687. return new_buf_size
  688. }
  689. // 数据接收空间不足 trb.recvoff > len(trb.recvbuf)/2 && trb.recvoff+trb.connector.init_buf_size >= trb.recvlimit
  690. // 准备数据重组
  691. if trb.recvoff-trb.readoff > len(trb.recvbuf)/4 {
  692. // 缓存内容过多 trb.recvoff-trb.readoff > len(trb.recvbuf)/4
  693. // 或缓存数据无法移动 trb.recvoff-trb.readoff+trb.connector.init_buf_size >= trb.readoff
  694. // 需要扩容
  695. if len(trb.recvbuf) >= trb.connector.max_buf_size {
  696. // 已经达到缓冲区最大尺寸
  697. if trb.recvoff-trb.readoff+trb.connector.init_buf_size < trb.readoff {
  698. // 可以通过重组腾出一定的接收空间
  699. return -1
  700. }
  701. // 缓存数据无法有效移动
  702. if trb.recvoff-trb.readoff+trb.connector.init_buf_size > len(trb.recvbuf) {
  703. truncate_size := trb.recvoff - trb.readoff - trb.connector.init_buf_size
  704. // 剩余接收空间不足
  705. trb.connector.Trace('E', "达到最大缓冲区尺寸 %s>=%s,数据缓存过半,数据前滚,丢弃旧数据 %dbytes", fmt_bytes_size(len(trb.recvbuf)), fmt_bytes_size(trb.connector.max_buf_size), truncate_size)
  706. // trb.connector.Trace('F', "每个连接 %s Buffer 还不够,需要考虑调整一下处理方法了", fmt_bytes_size(len(trb.recvbuf)))
  707. // 直接移动数据起始位置指针,丢弃部分数据
  708. // 直接移动数据起始位置指针,需要避免与数据处理过程对数据起始位置指针的操作产生冲突,msgclassifier.mutex_readbuf_off
  709. // truncate
  710. trb.connector.receiver_processor.shiftBuffer(truncate_size, true)
  711. trb.readoff += truncate_size
  712. trb.last_readoff = trb.readoff
  713. trb.break_processing = true
  714. return -1
  715. }
  716. // 还有足够空间,可以分配新内存块,重组后继续使用
  717. return len(trb.recvbuf)
  718. }
  719. // 扩容
  720. new_buf_size = (len(trb.recvbuf) + trb.connector.init_buf_size) * 14 / 10 / trb.connector.init_buf_size * trb.connector.init_buf_size
  721. if new_buf_size > trb.connector.max_buf_size {
  722. new_buf_size = trb.connector.max_buf_size
  723. }
  724. trb.connector.Trace('I', "数据堆积过多,扩容至 %s", fmt_bytes_size(new_buf_size))
  725. return new_buf_size
  726. }
  727. // 使用现有空间重组
  728. return -1
  729. }
  730. func (connector *TCPConnector) Receive(message_matcher MessageMatcher, msgproc MessageProcessor) error {
  731. n, err := connector.sendStartMessage()
  732. connector.receiver_processor = &TCPReceiverProcessor{}
  733. connector.receiver_processor.Initialize(connector, message_matcher, msgproc)
  734. // 数据接收buffer
  735. trb := &TCPReceiveBuffer{}
  736. trb.Initialize(connector)
  737. connector.Trace('D', "数据接收缓冲区就绪")
  738. var wg_proc sync.WaitGroup
  739. ch_active_proc := make(chan bool)
  740. // 缓冲区后台并行处理
  741. go func() {
  742. for <-ch_active_proc {
  743. func() {
  744. defer wg_proc.Done()
  745. buf := trb.getProcBuffer()
  746. if buf != nil {
  747. trb.BufferProcess(buf)
  748. }
  749. }()
  750. }
  751. }()
  752. connector.Trace('D', "数据接收缓冲区并行处理就绪")
  753. for connector.conn != nil && err == nil {
  754. n, err = trb.BufferReceive()
  755. if n > 0 {
  756. wg_proc.Add(1)
  757. go func() { ch_active_proc <- true }()
  758. }
  759. if trb.OverReorgLevel() {
  760. trb.BufferReorg()
  761. }
  762. }
  763. connector.Trace('D', "等待数据接收缓冲区并行处理结束")
  764. wg_proc.Wait()
  765. ch_active_proc <- false
  766. connector.Trace('D', "数据接收结束")
  767. if !connector.IsClosed() && err != nil && err != io.EOF {
  768. connector.Trace('D', "%s", err.Error())
  769. return err
  770. }
  771. return nil
  772. }
  773. type TCPSender struct {
  774. *Tracer
  775. //----配置信息
  776. id string
  777. remote_host string
  778. remote_port int
  779. timeout int
  780. //----基本信息
  781. connector *TCPConnector
  782. }
  783. func NewTCPSender(id string, remote_host string, remote_port int) *TCPSender {
  784. sender := &TCPSender{
  785. Tracer: NewTracer(id),
  786. id: id,
  787. remote_host: remote_host,
  788. remote_port: remote_port,
  789. timeout: 3,
  790. connector: nil,
  791. }
  792. sender.reconnect()
  793. return sender
  794. }
  795. func (sender *TCPSender) reconnect() error {
  796. sender.Close()
  797. remote_addr := fmt.Sprintf("%s:%d", sender.remote_host, sender.remote_port)
  798. conn, err := net.DialTimeout("tcp", remote_addr, time.Duration(sender.timeout)*time.Second)
  799. if err != nil {
  800. return err
  801. }
  802. sender.connector = NewTCPConnector(sender.id, conn, "->", 0, 0)
  803. sender.module = sender.connector.module
  804. sender.Trace('D', "开启连接器")
  805. go func() {
  806. connector := sender.connector
  807. if connector != nil {
  808. connector.Receive(nil, nil)
  809. connector.Close()
  810. }
  811. }()
  812. return nil
  813. }
  814. func (sender *TCPSender) SendRetry(data []byte, retry int) (int, error) {
  815. if sender.connector.IsClosed() {
  816. err := sender.reconnect()
  817. if err != nil {
  818. return 0, err
  819. }
  820. }
  821. n, err := sender.connector.Send(data)
  822. if err != nil && retry > 0 {
  823. sender.connector.Close()
  824. n, err = sender.SendRetry(data, retry-1)
  825. }
  826. return n, err
  827. }
  828. func (sender *TCPSender) Send(data []byte) (int, error) {
  829. return sender.SendRetry(data, 1)
  830. }
  831. func (sender *TCPSender) Close() {
  832. if sender.connector != nil {
  833. sender.connector.Close()
  834. }
  835. }
  836. type Configuration struct {
  837. ID string
  838. ServerLocation string
  839. ServerPort int
  840. InitRecvBufSize int
  841. MaxRecvBufSize int
  842. ConnectorGrowth int
  843. SendPromptMessage string
  844. TOVOT_DEBUG int
  845. TRACE string
  846. }
  847. func (config *Configuration) get(cfg map[string]string, key string, default_value string) string {
  848. v, o := cfg[key]
  849. if o {
  850. return v
  851. }
  852. return default_value
  853. }
  854. func (config *Configuration) getInt(cfg map[string]string, key string, default_value int) int {
  855. sv, so := cfg[key]
  856. if so {
  857. iv, err := strconv.Atoi(sv)
  858. if err == nil {
  859. return iv
  860. }
  861. }
  862. return default_value
  863. }
  864. func (config *Configuration) Initialize(cfg map[string]string) {
  865. config.ID = config.get(cfg, "ID", fmt.Sprintf("ID_%d", newID()))
  866. config.ServerLocation = config.get(cfg, "ServerLocation", "")
  867. config.ServerPort = config.getInt(cfg, "ServerPort", 9999)
  868. config.InitRecvBufSize = config.getInt(cfg, "InitRecvBufSize", 1024)
  869. config.MaxRecvBufSize = config.getInt(cfg, "MaxRecvBufSize", 1024*1024*128)
  870. config.ConnectorGrowth = config.getInt(cfg, "ConnectorGrowth", 2)
  871. config.SendPromptMessage = config.get(cfg, "SendPromptMessage", "` enter interactive mode\n")
  872. config.TOVOT_DEBUG = config.getInt(cfg, "TOVOT_DEBUG", 0)
  873. config.TRACE = config.get(cfg, "TRACE", DefaultTraceCatalog)
  874. }
  875. func (config *Configuration) InitWithDefaultConfig(defaultConfig *Configuration) {
  876. if config.ID == "" {
  877. config.ID = defaultConfig.ID
  878. }
  879. if config.ServerLocation == "" {
  880. config.ServerLocation = defaultConfig.ServerLocation
  881. }
  882. if config.ServerPort == 0 {
  883. config.ServerPort = defaultConfig.ServerPort
  884. }
  885. if config.InitRecvBufSize == 0 {
  886. config.InitRecvBufSize = defaultConfig.InitRecvBufSize
  887. }
  888. if config.MaxRecvBufSize == 0 {
  889. config.MaxRecvBufSize = defaultConfig.MaxRecvBufSize
  890. }
  891. if config.ConnectorGrowth == 0 {
  892. config.ConnectorGrowth = defaultConfig.ConnectorGrowth
  893. }
  894. if config.SendPromptMessage == "" {
  895. config.SendPromptMessage = defaultConfig.SendPromptMessage
  896. }
  897. if config.TOVOT_DEBUG == 0 {
  898. config.TOVOT_DEBUG = defaultConfig.TOVOT_DEBUG
  899. }
  900. if config.TRACE == "" {
  901. config.TRACE = defaultConfig.TRACE
  902. }
  903. }
  904. /*
  905. * TPCReceiver 接收器,用于接收 TCP 连接请求
  906. */
  907. type TCPReceiver struct {
  908. *Tracer
  909. //----配置信息
  910. host string //接收器的地址
  911. port int //接收器监听端口
  912. message_matcher MessageMatcher //消息匹配器,用于每个连接器的数据接收、分类、解析,设为nil时由Detector根据首次收到的信息自动识别确定
  913. msgproc MessageProcessor //消息处理函数,用于调用者对收到的消息进行业务处理
  914. connector_growth int //每次预设备用连接器增长数量
  915. init_buf_size int //初始接收数据缓冲区尺寸
  916. max_buf_size int //最大接收数据缓冲区尺寸
  917. SendPromptMessage string //连接开始时主动发送给客户端的提示信息
  918. //----基本信息
  919. addr string
  920. listener net.Listener
  921. //----监控信息
  922. max_conn_id int //单向增长的连接ID
  923. cur_conn_count int32 //当前备用连接器数量
  924. prepare_conn_count int32 //预设备用连接器数量
  925. conn_chan chan net.Conn //用于启动连接器的TCP连接管道
  926. connectors sync.Map //用于缓存运行中的连接器
  927. wg_conn_close sync.WaitGroup //用于控制关闭顺序
  928. }
  929. func NewTCPReceiver(config *Configuration, message_matcher MessageMatcher, msgproc MessageProcessor) *TCPReceiver {
  930. hostname := config.ServerLocation
  931. if hostname == "" {
  932. hostname, _ = os.Hostname()
  933. }
  934. addr := fmt.Sprintf("%s@%s:%d", config.ID, hostname, config.ServerPort)
  935. receiver := &TCPReceiver{
  936. Tracer: NewTracer(addr),
  937. host: config.ServerLocation,
  938. port: config.ServerPort,
  939. message_matcher: message_matcher,
  940. msgproc: msgproc,
  941. connector_growth: config.ConnectorGrowth, //每次增加备用连接器数量
  942. init_buf_size: config.InitRecvBufSize,
  943. max_buf_size: config.MaxRecvBufSize,
  944. SendPromptMessage: "` enter interactive mode\n",
  945. addr: addr,
  946. listener: nil,
  947. max_conn_id: 0,
  948. cur_conn_count: 0,
  949. prepare_conn_count: 0,
  950. conn_chan: make(chan net.Conn),
  951. }
  952. return receiver
  953. }
  954. func (receiver *TCPReceiver) accept(connID string, conn net.Conn) {
  955. connector := NewTCPConnector(connID, conn, "<-", receiver.init_buf_size, receiver.max_buf_size)
  956. connector.SendPromptMessage = receiver.SendPromptMessage
  957. receiver.connectors.Store(connector.id, connector)
  958. defer func() {
  959. connector.Close()
  960. receiver.connectors.Delete(connector.id)
  961. atomic.AddInt32(&receiver.cur_conn_count, -1)
  962. }()
  963. connector.Receive(receiver.message_matcher, receiver.msgproc)
  964. }
  965. func (receiver *TCPReceiver) ConnectorPrepare(count int) {
  966. receiver.Trace('D', "增加 %d 备用连接器", count)
  967. var wg_connector_prepare sync.WaitGroup
  968. for i := 0; i < count; i++ {
  969. wg_connector_prepare.Add(1)
  970. go func(connid int) {
  971. receiver.wg_conn_close.Add(1)
  972. atomic.AddInt32(&receiver.prepare_conn_count, +1)
  973. connID := fmt.Sprintf("connector_%d", connid)
  974. receiver.Trace('D', "备用连接器 %s 准备就绪", connID)
  975. wg_connector_prepare.Done()
  976. for conn := range receiver.conn_chan {
  977. if conn != nil {
  978. receiver.Trace('D', "备用连接器 %s 开启", connID)
  979. atomic.AddInt32(&receiver.cur_conn_count, +1)
  980. receiver.Trace('D', "当前连接数: %d", int(atomic.LoadInt32(&receiver.cur_conn_count)))
  981. go func() {
  982. if receiver.listener != nil && int(atomic.LoadInt32(&receiver.cur_conn_count)) > int(atomic.LoadInt32(&receiver.prepare_conn_count))-receiver.connector_growth {
  983. receiver.ConnectorPrepare(receiver.connector_growth)
  984. }
  985. }()
  986. receiver.accept(connID, conn)
  987. receiver.Trace('D', "当前连接数: %d", int(atomic.LoadInt32(&receiver.cur_conn_count)))
  988. receiver.Trace('D', "备用连接器 %s 关闭", connID)
  989. }
  990. if receiver.listener == nil || int(atomic.LoadInt32(&receiver.cur_conn_count)) < int(atomic.LoadInt32(&receiver.prepare_conn_count))-2*receiver.connector_growth {
  991. break
  992. }
  993. }
  994. receiver.Trace('D', "备用连接器 %s 销毁", connID)
  995. atomic.AddInt32(&receiver.prepare_conn_count, -1)
  996. receiver.wg_conn_close.Done()
  997. }(receiver.max_conn_id)
  998. receiver.max_conn_id++
  999. }
  1000. wg_connector_prepare.Wait()
  1001. }
  1002. func (receiver *TCPReceiver) Start() error {
  1003. receiver.Trace('D', "开始[ver:%s]", version)
  1004. receiver.Trace('D', "预设备用接收器")
  1005. receiver.ConnectorPrepare(receiver.connector_growth)
  1006. receiver.Trace('D', "开启监听端口")
  1007. listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", receiver.host, receiver.port))
  1008. if err != nil {
  1009. receiver.Trace('D', "监听端口开启错误: %s", err.Error())
  1010. return err
  1011. }
  1012. receiver.Trace('D', "监听端口已开启")
  1013. receiver.listener = listener
  1014. go func() {
  1015. receiver.Trace('D', "准备接收连接")
  1016. for {
  1017. listener = receiver.listener
  1018. if listener == nil {
  1019. receiver.Trace('D', "端口已关闭")
  1020. break
  1021. }
  1022. conn, err := listener.Accept()
  1023. if err != nil {
  1024. receiver.Trace('D', "连接接收错误: %s", err.Error())
  1025. } else {
  1026. receiver.conn_chan <- conn
  1027. }
  1028. }
  1029. receiver.Trace('D', "关闭所有连接器")
  1030. receiver.connectors.Range(func(connID interface{}, connector interface{}) bool {
  1031. connector.(*TCPConnector).Close()
  1032. return true
  1033. })
  1034. receiver.Trace('D', "销毁备用连接器")
  1035. for n := int(atomic.LoadInt32(&receiver.prepare_conn_count)); n > 0; n-- {
  1036. receiver.conn_chan <- nil
  1037. }
  1038. }()
  1039. return nil
  1040. }
  1041. func (receiver *TCPReceiver) IsAlive() bool {
  1042. return receiver.listener != nil
  1043. }
  1044. func (receiver *TCPReceiver) Stop() {
  1045. listener := receiver.listener
  1046. receiver.listener = nil
  1047. receiver.Trace('D', "关闭监听端口")
  1048. listener.Close()
  1049. receiver.wg_conn_close.Wait()
  1050. receiver.Trace('D', "结束")
  1051. }
  1052. /**
  1053. * 交互命令匹配器
  1054. */
  1055. type CommandMatcher struct {
  1056. *Tracer
  1057. connector *TCPConnector
  1058. mode int
  1059. welcome int
  1060. }
  1061. func (matcher *CommandMatcher) Initialize(connector *TCPConnector) {
  1062. matcher.Tracer = NewTracer(connector.module + "_cmdproc")
  1063. matcher.connector = connector
  1064. matcher.mode = 0
  1065. matcher.welcome = 0
  1066. }
  1067. func (matcher *CommandMatcher) MatchBegin(buf []byte) (int, int) {
  1068. if matcher.mode != 0 {
  1069. return 0, 1
  1070. }
  1071. gi, fi := FindBytes(buf, []byte("`"))
  1072. if gi >= 0 {
  1073. matcher.mode = 1
  1074. }
  1075. return gi, fi
  1076. }
  1077. func (matcher *CommandMatcher) MatchEnd(buf []byte) (int, int) {
  1078. //matcher.Trace('X', "MatchEnd: %d", buf)
  1079. return FindBytes(buf, []byte("\n"))
  1080. }
  1081. /**
  1082. * EIF格式编码器
  1083. */
  1084. type EIFFormator struct {
  1085. }
  1086. /**
  1087. * EIF格式消息头
  1088. */
  1089. type EIFHeader struct {
  1090. start [8]byte
  1091. reserved_1 int32
  1092. reserved_2 int32
  1093. reserved_3 int32
  1094. reserved_4 int32
  1095. reserved_5 int32
  1096. body_size int32
  1097. bytes_321 int32
  1098. }
  1099. /**
  1100. * EIF格式消息体
  1101. */
  1102. type EIFBody struct {
  1103. three byte
  1104. class string
  1105. semicolon string
  1106. kv_pairs string
  1107. end string
  1108. newline string
  1109. one byte
  1110. }
  1111. /**
  1112. * EIF格式错误
  1113. */
  1114. type EIFFormatError string
  1115. func (eiffe EIFFormatError) Error() string { return "EIFFormat Error: " + string(eiffe) }
  1116. func (*EIFFormator) EncodeSimply(class string, key_equal_value_semicolon_pairs string) []byte {
  1117. str := fmt.Sprintf("%s;%sEND\n", class, key_equal_value_semicolon_pairs)
  1118. bs := str2bytes(str)
  1119. size := len(bs) + 2
  1120. var bb bytes.Buffer
  1121. bb.WriteString("<START>>")
  1122. bb.Write(make([]byte, 20))
  1123. binary.Write(&bb, binary.BigEndian, int32(size))
  1124. binary.Write(&bb, binary.BigEndian, int32(size))
  1125. bb.WriteByte(3)
  1126. bb.Write(bs)
  1127. bb.WriteByte(1)
  1128. return bb.Bytes()
  1129. }
  1130. func (formater *EIFFormator) Encode(class string, data map[string]interface{}) []byte {
  1131. kvs := ""
  1132. for k := range data {
  1133. v := fmt.Sprintf("%v", data[k])
  1134. v = strings.Replace(v, "'", "''", -1)
  1135. kvs = fmt.Sprintf("%s%s='%s';", kvs, k, v)
  1136. }
  1137. return formater.EncodeSimply(class, kvs)
  1138. }
  1139. func (formater *EIFFormator) ParseBody(buf []byte) (string, map[string]string) {
  1140. class := ""
  1141. in_str := byte(0)
  1142. var bs bytes.Buffer
  1143. n := len(buf)
  1144. k := ""
  1145. v := ""
  1146. data := make(map[string]string)
  1147. semicolon_key_value_pairs := func() {
  1148. v = string(bs.Bytes()) //分号前为value
  1149. bs.Reset() //清除缓存,准备接收下一个键值对
  1150. if class == "" {
  1151. class = v //第一个分号前的值,class中不能包含=';,不检查格式错误
  1152. } else {
  1153. data[k] = v //键值数据
  1154. }
  1155. }
  1156. for i := 0; i < n; {
  1157. c := buf[i]
  1158. i++
  1159. if in_str != 0 {
  1160. if c == in_str {
  1161. if i < n && buf[i] == c {
  1162. bs.WriteByte(c) // 两个引号合并为一个引号
  1163. i++
  1164. } else {
  1165. in_str = 0 //结束引号
  1166. }
  1167. } else {
  1168. bs.WriteByte(c) //引号内字符
  1169. }
  1170. } else if c == '\'' || c == '"' {
  1171. in_str = c //开始引号
  1172. } else if c == '=' {
  1173. k = string(bs.Bytes()) //等号前为key
  1174. bs.Reset()
  1175. } else if c == ';' {
  1176. semicolon_key_value_pairs()
  1177. } else {
  1178. bs.WriteByte(c) //非引号内字符
  1179. }
  1180. }
  1181. if bs.Len() != 0 {
  1182. //格式错误,最后不是以分号结束,将错就错将所有剩余内存作为最后一个value处理
  1183. semicolon_key_value_pairs()
  1184. }
  1185. return class, data
  1186. }
  1187. func (formater *EIFFormator) ParseHeader(buf []byte) (int, []byte, error) {
  1188. if len(buf) < 41 {
  1189. return -1, nil, nil //数据加载中
  1190. }
  1191. var eifheader EIFHeader
  1192. binary.Read(bytes.NewReader(buf[28:32]), binary.BigEndian, &eifheader.body_size)
  1193. binary.Read(bytes.NewReader(buf[32:36]), binary.BigEndian, &eifheader.bytes_321)
  1194. //formater.Trace('X', "eifheader.bytes_321=%v", eifheader.bytes_321)
  1195. if eifheader.bytes_321 != eifheader.body_size && eifheader.bytes_321 != 4 {
  1196. return -1, nil, EIFFormatError(fmt.Sprintf("Data bytes_321 %d is not match body_size %d or 4", eifheader.bytes_321, eifheader.body_size))
  1197. }
  1198. if eifheader.body_size < 5 {
  1199. return -1, nil, EIFFormatError(fmt.Sprintf("Data body size %d<5", eifheader.body_size))
  1200. }
  1201. if eifheader.body_size > 100*1024*1024 {
  1202. return -1, nil, EIFFormatError(fmt.Sprintf("Data body size %d>100M", eifheader.body_size))
  1203. }
  1204. if len(buf) < int(36+eifheader.body_size) {
  1205. return -1, nil, nil //数据加载中
  1206. }
  1207. if buf[36] != 3 || buf[36+eifheader.body_size-1] != 1 {
  1208. return -1, nil, EIFFormatError(fmt.Sprintf("end point is not match, (%d!=3 or %d!=1)", buf[36], buf[36+eifheader.body_size-1]))
  1209. }
  1210. end_flag := bytes2str(buf[36+eifheader.body_size-6 : 36+eifheader.body_size-1])
  1211. if end_flag != ";END\n" {
  1212. return -1, nil, EIFFormatError(fmt.Sprintf("end flag is not match, %s", end_flag))
  1213. }
  1214. buf = buf[37 : 36+eifheader.body_size-6+1] //保留最后一个分号,方便解析
  1215. return int(36 + eifheader.body_size), buf, nil
  1216. }
  1217. func (formater *EIFFormator) Decode(buf []byte) (int, string, map[string]string, error) {
  1218. end_index, buf, err := formater.ParseHeader(buf)
  1219. //formater.Trace('D', "decode message %s", bytes2str(buf))
  1220. //内容解析
  1221. class, data := formater.ParseBody(buf)
  1222. return end_index, class, data, err
  1223. }
  1224. /**
  1225. * EIF信息匹配器
  1226. */
  1227. type EIFMessageMatcher struct {
  1228. *Tracer
  1229. connector *TCPConnector
  1230. formator *EIFFormator
  1231. msgbody []byte
  1232. }
  1233. func (matcher *EIFMessageMatcher) Initialize(connector *TCPConnector) {
  1234. matcher.Tracer = NewTracer(connector.module)
  1235. matcher.connector = connector
  1236. matcher.formator = &EIFFormator{}
  1237. }
  1238. func (matcher *EIFMessageMatcher) MatchBegin(buf []byte) (int, int) {
  1239. return FindBytes(buf, []byte("<START>>"))
  1240. }
  1241. func (matcher *EIFMessageMatcher) MatchEnd(buf []byte) (int, int) {
  1242. buf_size, buf, err := matcher.formator.ParseHeader(buf)
  1243. if buf != nil {
  1244. matcher.msgbody = buf
  1245. return buf_size, buf_size
  1246. }
  1247. if err != nil {
  1248. matcher.Trace('D', "EIFFormat MatchEnd Error: %s", err.Error())
  1249. }
  1250. return -1, 0
  1251. }
  1252. type EIFMessageProc func(remote_addr string, class string, data map[string]string)
  1253. type EIFMessageProcessor struct {
  1254. msgproc EIFMessageProc
  1255. }
  1256. func (eifmsg *EIFMessageProcessor) MatchType(matcher MessageMatcher) bool {
  1257. _, ok := matcher.(*EIFMessageMatcher)
  1258. return ok
  1259. }
  1260. func (eifmsg *EIFMessageProcessor) MessageProc(matcher MessageMatcher, buf []byte) {
  1261. mm := matcher.(*EIFMessageMatcher)
  1262. buf = make([]byte, len(mm.msgbody))
  1263. copy(buf, mm.msgbody)
  1264. go func(remote_addr string, buf []byte) {
  1265. class, data := mm.formator.ParseBody(buf)
  1266. eifmsg.msgproc(remote_addr, class, data)
  1267. }(mm.connector.remote_addr, buf)
  1268. }
  1269. func KVsParse(kvs ...string) map[string]string {
  1270. retmap := make(map[string]string)
  1271. for i := range kvs {
  1272. kv := kvs[i]
  1273. n := strings.Index(kv, "=")
  1274. if n >= 0 {
  1275. k := kv[:n]
  1276. v := kv[n+1:]
  1277. retmap[k] = v
  1278. } else {
  1279. retmap[kv] = kv
  1280. }
  1281. }
  1282. return retmap
  1283. }
  1284. var Config Configuration
  1285. func init() {
  1286. RegisterMessageClass("<START>>\x00", &EIFMessageMatcher{}, &EIFMessageProcessor{})
  1287. RegisterMessageClass("`\r\n", &CommandMatcher{}, &CommandProcessor{})
  1288. RegisterMessageClass("\r\n", &CommandMatcher{}, &CommandProcessor{})
  1289. config := KVsParse(os.Environ()...)
  1290. for k, v := range KVsParse(os.Args...) {
  1291. config[k] = v
  1292. }
  1293. Config.Initialize(config)
  1294. }
  1295. type EIFSender struct {
  1296. *Tracer
  1297. *Configuration
  1298. tcpSender *TCPSender
  1299. }
  1300. func (sender *EIFSender) Send(class string, data map[string]string) (int, error) {
  1301. if sender.tcpSender == nil {
  1302. sender.Configuration.InitWithDefaultConfig(&Config)
  1303. if sender.ID == "" {
  1304. sender.ID = "eifsender"
  1305. }
  1306. sender.tcpSender = NewTCPSender(sender.ID, sender.ServerLocation, sender.ServerPort)
  1307. }
  1308. var eifformator EIFFormator
  1309. dat := make(map[string]interface{})
  1310. for k, v := range data {
  1311. dat[k] = v
  1312. }
  1313. bs := eifformator.Encode(class, dat)
  1314. for i := 0; i < len(bs); {
  1315. ns, err := sender.tcpSender.Send(bs)
  1316. i += ns
  1317. if err != nil {
  1318. return i, err
  1319. }
  1320. }
  1321. return len(bs), nil
  1322. }
  1323. func (sender *EIFSender) Close() {
  1324. tcpSender := sender.tcpSender
  1325. if tcpSender != nil {
  1326. tcpSender.Close()
  1327. sender.tcpSender = nil
  1328. }
  1329. }
  1330. func EIFSend(server string, port int, class string, kvs string) {
  1331. eifsender := EIFSender{
  1332. Configuration: &Configuration{
  1333. ID: "estemp",
  1334. ServerLocation: server,
  1335. ServerPort: port,
  1336. },
  1337. }
  1338. defer eifsender.Close()
  1339. eifformator := &EIFFormator{}
  1340. _, data := eifformator.ParseBody(str2bytes(class + ";" + kvs))
  1341. eifsender.Send(class, data)
  1342. }
  1343. type EIFReceiver struct {
  1344. *Tracer
  1345. *Configuration
  1346. tcpReceiver *TCPReceiver
  1347. }
  1348. /**
  1349. * 启动EIFReceiver,开启监听端口,处理数据接收、分类、解析,直至进程被终止
  1350. */
  1351. func (receiver *EIFReceiver) Run(eifmsgproc EIFMessageProc) {
  1352. receiver.Configuration.InitWithDefaultConfig(&Config)
  1353. break_signal := make(chan os.Signal, 1)
  1354. signal.Notify(break_signal, os.Interrupt, os.Kill, syscall.SIGTERM)
  1355. // 将通用消息处理函数转换为EIF消息处理函数
  1356. eifmsgprocessor := &EIFMessageProcessor{
  1357. msgproc: eifmsgproc,
  1358. }
  1359. receiver.tcpReceiver = NewTCPReceiver(receiver.Configuration, nil, eifmsgprocessor)
  1360. receiver.tcpReceiver.Start()
  1361. bs := <-break_signal
  1362. receiver.tcpReceiver.Trace('D', "%v", bs)
  1363. receiver.tcpReceiver.Stop()
  1364. }
  1365. /**
  1366. * 判断EIFReceiver是否处于监听状态
  1367. */
  1368. func (receiver *EIFReceiver) IsRunning() bool {
  1369. return receiver.tcpReceiver != nil && receiver.tcpReceiver.IsAlive()
  1370. }
  1371. /**
  1372. * 交互命令处理,通过 telnet 127.0.0.1 9999 进行命令交互,可用于测试、监控、集成
  1373. */
  1374. type CommandProcessor struct {
  1375. }
  1376. func (cmd *CommandProcessor) MatchType(mmatcher MessageMatcher) bool {
  1377. _, ok := mmatcher.(*CommandMatcher)
  1378. return ok
  1379. }
  1380. func (cmd *CommandProcessor) outputHelp(mmatcher MessageMatcher) {
  1381. matcher := mmatcher.(*CommandMatcher)
  1382. _, err := matcher.connector.Send([]byte("----------------------\n" +
  1383. "` 进入或退出命令交互模式\n" +
  1384. "c|x|q|close|exit|quit 断开连接\n" +
  1385. "h|help 打印帮助信息\n" +
  1386. "s|send ip:port class key=value;... 发送一条EIF格式的消息,如:s 127.0.0.1:9999 test number=101;sleep=100;msg=hello world.\n" +
  1387. "----------------------\n"))
  1388. if err != nil {
  1389. matcher.Trace('D', "Error send response: %s", err.Error())
  1390. }
  1391. }
  1392. func (cmd *CommandProcessor) MessageProc(mmatcher MessageMatcher, buf []byte) {
  1393. matcher := mmatcher.(*CommandMatcher)
  1394. if matcher.mode == 1 {
  1395. buf = buf[1:]
  1396. matcher.mode = 2
  1397. }
  1398. m, _ := regexp.Match(`^\s*(?:help|h)\s*$`, buf)
  1399. if m || matcher.welcome == 0 {
  1400. cmd.outputHelp(matcher)
  1401. matcher.welcome = 1
  1402. }
  1403. str := bytes2str(buf[:len(buf)-2])
  1404. re_exit, err := regexp.Compile(`^\s*(?:close|c|exit|x|quit|q)\s*$`)
  1405. if err != nil {
  1406. matcher.Trace('D', "Regexp error: %s", err.Error())
  1407. }
  1408. matcher.Trace('D', "received: %s", str)
  1409. if re_exit.MatchString(str) {
  1410. matcher.Trace('D', "match command exit")
  1411. matcher.connector.Close()
  1412. }
  1413. re_send, err := regexp.Compile(`^\s*(?:send|s)\s+([^:]+):(\d+)\s+(\S*)\s+(.*)$`)
  1414. if err != nil {
  1415. matcher.Trace('D', "Regexp error: %s", err.Error())
  1416. }
  1417. eif_send_args := re_send.FindStringSubmatch(str)
  1418. if len(eif_send_args) > 4 {
  1419. port, _ := strconv.Atoi(eif_send_args[2])
  1420. EIFSend(eif_send_args[1], port, eif_send_args[3], eif_send_args[4])
  1421. }
  1422. m, _ = regexp.Match("^\\s*`\\s*$", buf)
  1423. if m {
  1424. matcher.mode = 0
  1425. }
  1426. if matcher.mode != 0 {
  1427. matcher.connector.Send([]byte("> "))
  1428. }
  1429. }