net.go 17 KB


  1. package chord
  2. import (
  3. "encoding/gob"
  4. "fmt"
  5. "log"
  6. "net"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. )
  11. /*
  12. TCPTransport provides a TCP based Chord transport layer. This allows Chord
  13. to be implemented over a network, instead of only using the LocalTransport. It is
  14. meant to be a simple implementation, optimizing for simplicity instead of performance.
  15. Messages are sent with a header frame, followed by a body frame. All data is encoded
  16. using the GOB format for simplicity.
  17. Internally, there is 1 Goroutine listening for inbound connections, 1 Goroutine PER
  18. inbound connection.
  19. */
  20. type TCPTransport struct {
  21. sock *net.TCPListener
  22. timeout time.Duration
  23. maxIdle time.Duration
  24. lock sync.RWMutex
  25. local map[string]*localRPC
  26. inbound map[*net.TCPConn]struct{}
  27. poolLock sync.Mutex
  28. pool map[string][]*tcpOutConn
  29. shutdown int32
  30. }
  31. type tcpOutConn struct {
  32. host string
  33. sock *net.TCPConn
  34. header tcpHeader
  35. enc *gob.Encoder
  36. dec *gob.Decoder
  37. used time.Time
  38. }
  39. const (
  40. tcpPing = iota
  41. tcpListReq
  42. tcpGetPredReq
  43. tcpNotifyReq
  44. tcpFindSucReq
  45. tcpClearPredReq
  46. tcpSkipSucReq
  47. )
  48. type tcpHeader struct {
  49. ReqType int
  50. }
  51. // Potential body types
  52. type tcpBodyError struct {
  53. Err error
  54. }
  55. type tcpBodyString struct {
  56. S string
  57. }
  58. type tcpBodyVnode struct {
  59. Vn *Vnode
  60. }
  61. type tcpBodyTwoVnode struct {
  62. Target *Vnode
  63. Vn *Vnode
  64. }
  65. type tcpBodyFindSuc struct {
  66. Target *Vnode
  67. Num int
  68. Key []byte
  69. }
  70. type tcpBodyVnodeError struct {
  71. Vnode *Vnode
  72. Err error
  73. }
  74. type tcpBodyVnodeListError struct {
  75. Vnodes []*Vnode
  76. Err error
  77. }
  78. type tcpBodyBoolError struct {
  79. B bool
  80. Err error
  81. }
  82. // Creates a new TCP transport on the given listen address with the
  83. // configured timeout duration.
  84. func InitTCPTransport(listen string, timeout time.Duration) (*TCPTransport, error) {
  85. // Try to start the listener
  86. sock, err := net.Listen("tcp", listen)
  87. if err != nil {
  88. return nil, err
  89. }
  90. // allocate maps
  91. local := make(map[string]*localRPC)
  92. inbound := make(map[*net.TCPConn]struct{})
  93. pool := make(map[string][]*tcpOutConn)
  94. // Maximum age of a connection
  95. maxIdle := time.Duration(300 * time.Second)
  96. // Setup the transport
  97. tcp := &TCPTransport{sock: sock.(*net.TCPListener),
  98. timeout: timeout,
  99. maxIdle: maxIdle,
  100. local: local,
  101. inbound: inbound,
  102. pool: pool}
  103. // Listen for connections
  104. go tcp.listen()
  105. // Reap old connections
  106. go tcp.reapOld()
  107. // Done
  108. return tcp, nil
  109. }
  110. // Checks for a local vnode
  111. func (t *TCPTransport) get(vn *Vnode) (VnodeRPC, bool) {
  112. key := vn.String()
  113. t.lock.RLock()
  114. defer t.lock.RUnlock()
  115. w, ok := t.local[key]
  116. if ok {
  117. return w.obj, ok
  118. } else {
  119. return nil, ok
  120. }
  121. }
  122. // Gets an outbound connection to a host
  123. func (t *TCPTransport) getConn(host string) (*tcpOutConn, error) {
  124. // Check if we have a conn cached
  125. var out *tcpOutConn
  126. t.poolLock.Lock()
  127. if atomic.LoadInt32(&t.shutdown) == 1 {
  128. t.poolLock.Unlock()
  129. return nil, fmt.Errorf("TCP transport is shutdown")
  130. }
  131. list, ok := t.pool[host]
  132. if ok && len(list) > 0 {
  133. out = list[len(list)-1]
  134. list = list[:len(list)-1]
  135. t.pool[host] = list
  136. }
  137. t.poolLock.Unlock()
  138. if out != nil {
  139. // Verify that the socket is valid. Might be closed.
  140. if _, err := out.sock.Read(nil); err == nil {
  141. return out, nil
  142. }
  143. out.sock.Close()
  144. }
  145. // Try to establish a connection
  146. conn, err := net.DialTimeout("tcp", host, t.timeout)
  147. if err != nil {
  148. return nil, err
  149. }
  150. // Setup the socket
  151. sock := conn.(*net.TCPConn)
  152. t.setupConn(sock)
  153. enc := gob.NewEncoder(sock)
  154. dec := gob.NewDecoder(sock)
  155. now := time.Now()
  156. // Wrap the sock
  157. out = &tcpOutConn{host: host, sock: sock, enc: enc, dec: dec, used: now}
  158. return out, nil
  159. }
  160. // Returns an outbound TCP connection to the pool
  161. func (t *TCPTransport) returnConn(o *tcpOutConn) {
  162. // Update the last used time
  163. o.used = time.Now()
  164. // Push back into the pool
  165. t.poolLock.Lock()
  166. defer t.poolLock.Unlock()
  167. if atomic.LoadInt32(&t.shutdown) == 1 {
  168. o.sock.Close()
  169. return
  170. }
  171. list, _ := t.pool[o.host]
  172. t.pool[o.host] = append(list, o)
  173. }
  174. // Setup a connection
  175. func (t *TCPTransport) setupConn(c *net.TCPConn) {
  176. c.SetNoDelay(true)
  177. c.SetKeepAlive(true)
  178. }
  179. // Gets a list of the Vnodes on the box
  180. func (t *TCPTransport) ListVnodes(host string) ([]*Vnode, error) {
  181. // Get a conn
  182. out, err := t.getConn(host)
  183. if err != nil {
  184. return nil, err
  185. }
  186. // Response channels
  187. respChan := make(chan []*Vnode, 1)
  188. errChan := make(chan error, 1)
  189. go func() {
  190. // Send a list command
  191. out.header.ReqType = tcpListReq
  192. body := tcpBodyString{S: host}
  193. if err := out.enc.Encode(&out.header); err != nil {
  194. errChan <- err
  195. return
  196. }
  197. if err := out.enc.Encode(&body); err != nil {
  198. errChan <- err
  199. return
  200. }
  201. // Read in the response
  202. resp := tcpBodyVnodeListError{}
  203. if err := out.dec.Decode(&resp); err != nil {
  204. errChan <- err
  205. }
  206. // Return the connection
  207. t.returnConn(out)
  208. if resp.Err == nil {
  209. respChan <- resp.Vnodes
  210. } else {
  211. errChan <- resp.Err
  212. }
  213. }()
  214. select {
  215. case <-time.After(t.timeout):
  216. return nil, fmt.Errorf("Command timed out!")
  217. case err := <-errChan:
  218. return nil, err
  219. case res := <-respChan:
  220. return res, nil
  221. }
  222. }
  223. // Ping a Vnode, check for liveness
  224. func (t *TCPTransport) Ping(vn *Vnode) (bool, error) {
  225. // Get a conn
  226. out, err := t.getConn(vn.Host)
  227. if err != nil {
  228. return false, err
  229. }
  230. // Response channels
  231. respChan := make(chan bool, 1)
  232. errChan := make(chan error, 1)
  233. go func() {
  234. // Send a list command
  235. out.header.ReqType = tcpPing
  236. body := tcpBodyVnode{Vn: vn}
  237. if err := out.enc.Encode(&out.header); err != nil {
  238. errChan <- err
  239. return
  240. }
  241. if err := out.enc.Encode(&body); err != nil {
  242. errChan <- err
  243. return
  244. }
  245. // Read in the response
  246. resp := tcpBodyBoolError{}
  247. if err := out.dec.Decode(&resp); err != nil {
  248. errChan <- err
  249. return
  250. }
  251. // Return the connection
  252. t.returnConn(out)
  253. if resp.Err == nil {
  254. respChan <- resp.B
  255. } else {
  256. errChan <- resp.Err
  257. }
  258. }()
  259. select {
  260. case <-time.After(t.timeout):
  261. return false, fmt.Errorf("Command timed out!")
  262. case err := <-errChan:
  263. return false, err
  264. case res := <-respChan:
  265. return res, nil
  266. }
  267. }
  268. // Request a nodes predecessor
  269. func (t *TCPTransport) GetPredecessor(vn *Vnode) (*Vnode, error) {
  270. // Get a conn
  271. out, err := t.getConn(vn.Host)
  272. if err != nil {
  273. return nil, err
  274. }
  275. respChan := make(chan *Vnode, 1)
  276. errChan := make(chan error, 1)
  277. go func() {
  278. // Send a list command
  279. out.header.ReqType = tcpGetPredReq
  280. body := tcpBodyVnode{Vn: vn}
  281. if err := out.enc.Encode(&out.header); err != nil {
  282. errChan <- err
  283. return
  284. }
  285. if err := out.enc.Encode(&body); err != nil {
  286. errChan <- err
  287. return
  288. }
  289. // Read in the response
  290. resp := tcpBodyVnodeError{}
  291. if err := out.dec.Decode(&resp); err != nil {
  292. errChan <- err
  293. return
  294. }
  295. // Return the connection
  296. t.returnConn(out)
  297. if resp.Err == nil {
  298. respChan <- resp.Vnode
  299. } else {
  300. errChan <- resp.Err
  301. }
  302. }()
  303. select {
  304. case <-time.After(t.timeout):
  305. return nil, fmt.Errorf("Command timed out!")
  306. case err := <-errChan:
  307. return nil, err
  308. case res := <-respChan:
  309. return res, nil
  310. }
  311. }
  312. // Notify our successor of ourselves
  313. func (t *TCPTransport) Notify(target, self *Vnode) ([]*Vnode, error) {
  314. // Get a conn
  315. out, err := t.getConn(target.Host)
  316. if err != nil {
  317. return nil, err
  318. }
  319. respChan := make(chan []*Vnode, 1)
  320. errChan := make(chan error, 1)
  321. go func() {
  322. // Send a list command
  323. out.header.ReqType = tcpNotifyReq
  324. body := tcpBodyTwoVnode{Target: target, Vn: self}
  325. if err := out.enc.Encode(&out.header); err != nil {
  326. errChan <- err
  327. return
  328. }
  329. if err := out.enc.Encode(&body); err != nil {
  330. errChan <- err
  331. return
  332. }
  333. // Read in the response
  334. resp := tcpBodyVnodeListError{}
  335. if err := out.dec.Decode(&resp); err != nil {
  336. errChan <- err
  337. return
  338. }
  339. // Return the connection
  340. t.returnConn(out)
  341. if resp.Err == nil {
  342. respChan <- resp.Vnodes
  343. } else {
  344. errChan <- resp.Err
  345. }
  346. }()
  347. select {
  348. case <-time.After(t.timeout):
  349. return nil, fmt.Errorf("Command timed out!")
  350. case err := <-errChan:
  351. return nil, err
  352. case res := <-respChan:
  353. return res, nil
  354. }
  355. }
  356. // Find a successor
  357. func (t *TCPTransport) FindSuccessors(vn *Vnode, n int, k []byte) ([]*Vnode, error) {
  358. // Get a conn
  359. out, err := t.getConn(vn.Host)
  360. if err != nil {
  361. return nil, err
  362. }
  363. respChan := make(chan []*Vnode, 1)
  364. errChan := make(chan error, 1)
  365. go func() {
  366. // Send a list command
  367. out.header.ReqType = tcpFindSucReq
  368. body := tcpBodyFindSuc{Target: vn, Num: n, Key: k}
  369. if err := out.enc.Encode(&out.header); err != nil {
  370. errChan <- err
  371. return
  372. }
  373. if err := out.enc.Encode(&body); err != nil {
  374. errChan <- err
  375. return
  376. }
  377. // Read in the response
  378. resp := tcpBodyVnodeListError{}
  379. if err := out.dec.Decode(&resp); err != nil {
  380. errChan <- err
  381. return
  382. }
  383. // Return the connection
  384. t.returnConn(out)
  385. if resp.Err == nil {
  386. respChan <- resp.Vnodes
  387. } else {
  388. errChan <- resp.Err
  389. }
  390. }()
  391. select {
  392. case <-time.After(t.timeout):
  393. return nil, fmt.Errorf("Command timed out!")
  394. case err := <-errChan:
  395. return nil, err
  396. case res := <-respChan:
  397. return res, nil
  398. }
  399. }
  400. // Clears a predecessor if it matches a given vnode. Used to leave.
  401. func (t *TCPTransport) ClearPredecessor(target, self *Vnode) error {
  402. // Get a conn
  403. out, err := t.getConn(target.Host)
  404. if err != nil {
  405. return err
  406. }
  407. respChan := make(chan bool, 1)
  408. errChan := make(chan error, 1)
  409. go func() {
  410. // Send a list command
  411. out.header.ReqType = tcpClearPredReq
  412. body := tcpBodyTwoVnode{Target: target, Vn: self}
  413. if err := out.enc.Encode(&out.header); err != nil {
  414. errChan <- err
  415. return
  416. }
  417. if err := out.enc.Encode(&body); err != nil {
  418. errChan <- err
  419. return
  420. }
  421. // Read in the response
  422. resp := tcpBodyError{}
  423. if err := out.dec.Decode(&resp); err != nil {
  424. errChan <- err
  425. return
  426. }
  427. // Return the connection
  428. t.returnConn(out)
  429. if resp.Err == nil {
  430. respChan <- true
  431. } else {
  432. errChan <- resp.Err
  433. }
  434. }()
  435. select {
  436. case <-time.After(t.timeout):
  437. return fmt.Errorf("Command timed out!")
  438. case err := <-errChan:
  439. return err
  440. case <-respChan:
  441. return nil
  442. }
  443. }
  444. // Instructs a node to skip a given successor. Used to leave.
  445. func (t *TCPTransport) SkipSuccessor(target, self *Vnode) error {
  446. // Get a conn
  447. out, err := t.getConn(target.Host)
  448. if err != nil {
  449. return err
  450. }
  451. respChan := make(chan bool, 1)
  452. errChan := make(chan error, 1)
  453. go func() {
  454. // Send a list command
  455. out.header.ReqType = tcpSkipSucReq
  456. body := tcpBodyTwoVnode{Target: target, Vn: self}
  457. if err := out.enc.Encode(&out.header); err != nil {
  458. errChan <- err
  459. return
  460. }
  461. if err := out.enc.Encode(&body); err != nil {
  462. errChan <- err
  463. return
  464. }
  465. // Read in the response
  466. resp := tcpBodyError{}
  467. if err := out.dec.Decode(&resp); err != nil {
  468. errChan <- err
  469. return
  470. }
  471. // Return the connection
  472. t.returnConn(out)
  473. if resp.Err == nil {
  474. respChan <- true
  475. } else {
  476. errChan <- resp.Err
  477. }
  478. }()
  479. select {
  480. case <-time.After(t.timeout):
  481. return fmt.Errorf("Command timed out!")
  482. case err := <-errChan:
  483. return err
  484. case <-respChan:
  485. return nil
  486. }
  487. }
  488. // Register for an RPC callbacks
  489. func (t *TCPTransport) Register(v *Vnode, o VnodeRPC) {
  490. key := v.String()
  491. t.lock.Lock()
  492. t.local[key] = &localRPC{v, o}
  493. t.lock.Unlock()
  494. }
  495. // Shutdown the TCP transport
  496. func (t *TCPTransport) Shutdown() {
  497. atomic.StoreInt32(&t.shutdown, 1)
  498. t.sock.Close()
  499. // Close all the inbound connections
  500. t.lock.RLock()
  501. for conn := range t.inbound {
  502. conn.Close()
  503. }
  504. t.lock.RUnlock()
  505. // Close all the outbound
  506. t.poolLock.Lock()
  507. for _, conns := range t.pool {
  508. for _, out := range conns {
  509. out.sock.Close()
  510. }
  511. }
  512. t.pool = nil
  513. t.poolLock.Unlock()
  514. }
  515. // Closes old outbound connections
  516. func (t *TCPTransport) reapOld() {
  517. for {
  518. if atomic.LoadInt32(&t.shutdown) == 1 {
  519. return
  520. }
  521. time.Sleep(30 * time.Second)
  522. t.reapOnce()
  523. }
  524. }
  525. func (t *TCPTransport) reapOnce() {
  526. t.poolLock.Lock()
  527. defer t.poolLock.Unlock()
  528. for host, conns := range t.pool {
  529. max := len(conns)
  530. for i := 0; i < max; i++ {
  531. if time.Since(conns[i].used) > t.maxIdle {
  532. conns[i].sock.Close()
  533. conns[i], conns[max-1] = conns[max-1], nil
  534. max--
  535. i--
  536. }
  537. }
  538. // Trim any idle conns
  539. t.pool[host] = conns[:max]
  540. }
  541. }
  542. // Listens for inbound connections
  543. func (t *TCPTransport) listen() {
  544. for {
  545. conn, err := t.sock.AcceptTCP()
  546. if err != nil {
  547. if atomic.LoadInt32(&t.shutdown) == 0 {
  548. fmt.Printf("[ERR] Error accepting TCP connection! %s", err)
  549. continue
  550. } else {
  551. return
  552. }
  553. }
  554. // Setup the conn
  555. t.setupConn(conn)
  556. // Register the inbound conn
  557. t.lock.Lock()
  558. t.inbound[conn] = struct{}{}
  559. t.lock.Unlock()
  560. // Start handler
  561. go t.handleConn(conn)
  562. }
  563. }
  564. // Handles inbound TCP connections
  565. func (t *TCPTransport) handleConn(conn *net.TCPConn) {
  566. // Defer the cleanup
  567. defer func() {
  568. t.lock.Lock()
  569. delete(t.inbound, conn)
  570. t.lock.Unlock()
  571. conn.Close()
  572. }()
  573. dec := gob.NewDecoder(conn)
  574. enc := gob.NewEncoder(conn)
  575. header := tcpHeader{}
  576. var sendResp interface{}
  577. for {
  578. // Get the header
  579. if err := dec.Decode(&header); err != nil {
  580. if atomic.LoadInt32(&t.shutdown) == 0 && err.Error() != "EOF" {
  581. log.Printf("[ERR] Failed to decode TCP header! Got %s", err)
  582. }
  583. return
  584. }
  585. // Read in the body and process request
  586. switch header.ReqType {
  587. case tcpPing:
  588. body := tcpBodyVnode{}
  589. if err := dec.Decode(&body); err != nil {
  590. log.Printf("[ERR] Failed to decode TCP body! Got %s", err)
  591. return
  592. }
  593. // Generate a response
  594. _, ok := t.get(body.Vn)
  595. if ok {
  596. sendResp = tcpBodyBoolError{B: ok, Err: nil}
  597. } else {
  598. sendResp = tcpBodyBoolError{B: ok, Err: fmt.Errorf("Target VN not found! Target %s:%s",
  599. body.Vn.Host, body.Vn.String())}
  600. }
  601. case tcpListReq:
  602. body := tcpBodyString{}
  603. if err := dec.Decode(&body); err != nil {
  604. log.Printf("[ERR] Failed to decode TCP body! Got %s", err)
  605. return
  606. }
  607. // Generate all the local clients
  608. res := make([]*Vnode, 0, len(t.local))
  609. // Build list
  610. t.lock.RLock()
  611. for _, v := range t.local {
  612. res = append(res, v.vnode)
  613. }
  614. t.lock.RUnlock()
  615. // Make response
  616. sendResp = tcpBodyVnodeListError{Vnodes: trimSlice(res)}
  617. case tcpGetPredReq:
  618. body := tcpBodyVnode{}
  619. if err := dec.Decode(&body); err != nil {
  620. log.Printf("[ERR] Failed to decode TCP body! Got %s", err)
  621. return
  622. }
  623. // Generate a response
  624. obj, ok := t.get(body.Vn)
  625. resp := tcpBodyVnodeError{}
  626. sendResp = &resp
  627. if ok {
  628. node, err := obj.GetPredecessor()
  629. resp.Vnode = node
  630. resp.Err = err
  631. } else {
  632. resp.Err = fmt.Errorf("Target VN not found! Target %s:%s",
  633. body.Vn.Host, body.Vn.String())
  634. }
  635. case tcpNotifyReq:
  636. body := tcpBodyTwoVnode{}
  637. if err := dec.Decode(&body); err != nil {
  638. log.Printf("[ERR] Failed to decode TCP body! Got %s", err)
  639. return
  640. }
  641. if body.Target == nil {
  642. return
  643. }
  644. // Generate a response
  645. obj, ok := t.get(body.Target)
  646. resp := tcpBodyVnodeListError{}
  647. sendResp = &resp
  648. if ok {
  649. nodes, err := obj.Notify(body.Vn)
  650. resp.Vnodes = trimSlice(nodes)
  651. resp.Err = err
  652. } else {
  653. resp.Err = fmt.Errorf("Target VN not found! Target %s:%s",
  654. body.Target.Host, body.Target.String())
  655. }
  656. case tcpFindSucReq:
  657. body := tcpBodyFindSuc{}
  658. if err := dec.Decode(&body); err != nil {
  659. log.Printf("[ERR] Failed to decode TCP body! Got %s", err)
  660. return
  661. }
  662. // Generate a response
  663. obj, ok := t.get(body.Target)
  664. resp := tcpBodyVnodeListError{}
  665. sendResp = &resp
  666. if ok {
  667. nodes, err := obj.FindSuccessors(body.Num, body.Key)
  668. resp.Vnodes = trimSlice(nodes)
  669. resp.Err = err
  670. } else {
  671. resp.Err = fmt.Errorf("Target VN not found! Target %s:%s",
  672. body.Target.Host, body.Target.String())
  673. }
  674. case tcpClearPredReq:
  675. body := tcpBodyTwoVnode{}
  676. if err := dec.Decode(&body); err != nil {
  677. log.Printf("[ERR] Failed to decode TCP body! Got %s", err)
  678. return
  679. }
  680. // Generate a response
  681. obj, ok := t.get(body.Target)
  682. resp := tcpBodyError{}
  683. sendResp = &resp
  684. if ok {
  685. resp.Err = obj.ClearPredecessor(body.Vn)
  686. } else {
  687. resp.Err = fmt.Errorf("Target VN not found! Target %s:%s",
  688. body.Target.Host, body.Target.String())
  689. }
  690. case tcpSkipSucReq:
  691. body := tcpBodyTwoVnode{}
  692. if err := dec.Decode(&body); err != nil {
  693. log.Printf("[ERR] Failed to decode TCP body! Got %s", err)
  694. return
  695. }
  696. // Generate a response
  697. obj, ok := t.get(body.Target)
  698. resp := tcpBodyError{}
  699. sendResp = &resp
  700. if ok {
  701. resp.Err = obj.SkipSuccessor(body.Vn)
  702. } else {
  703. resp.Err = fmt.Errorf("Target VN not found! Target %s:%s",
  704. body.Target.Host, body.Target.String())
  705. }
  706. default:
  707. log.Printf("[ERR] Unknown request type! Got %d", header.ReqType)
  708. return
  709. }
  710. // Send the response
  711. if err := enc.Encode(sendResp); err != nil {
  712. log.Printf("[ERR] Failed to send TCP body! Got %s", err)
  713. return
  714. }
  715. }
  716. }
  717. // Trims the slice to remove nil elements
  718. func trimSlice(vn []*Vnode) []*Vnode {
  719. if vn == nil {
  720. return vn
  721. }
  722. // Find a non-nil index
  723. idx := len(vn) - 1
  724. for vn[idx] == nil {
  725. idx--
  726. }
  727. return vn[:idx+1]
  728. }