transport.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package chord
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. // Wraps vnode and object
  7. type LocalRPC struct {
  8. *Vnode
  9. Obj VnodeRPC
  10. }
  11. // LocalTransport is used to provides fast routing to Vnodes running
  12. // locally using direct method calls. For any non-local Vnodes, the
  13. // request is passed on to another transport.
  14. type LocalTransport struct {
  15. host string
  16. Remote Transport
  17. lock sync.RWMutex
  18. Local map[string]*LocalRPC
  19. }
  20. // Creates a local transport to wrap a remote transport
  21. func InitLocalTransport(remote Transport) Transport {
  22. // Replace a nil transport with black hole
  23. if remote == nil {
  24. remote = &BlackholeTransport{}
  25. }
  26. local := make(map[string]*LocalRPC)
  27. return &LocalTransport{Remote: remote, Local: local}
  28. }
  29. // Checks for a local vnode
  30. func (lt *LocalTransport) get(vn *Vnode) (VnodeRPC, bool) {
  31. key := vn.String()
  32. lt.lock.RLock()
  33. defer lt.lock.RUnlock()
  34. w, ok := lt.Local[key]
  35. if ok {
  36. return w.Obj, ok
  37. } else {
  38. return nil, ok
  39. }
  40. }
  41. func (lt *LocalTransport) ListVnodes(host string) ([]*Vnode, error) {
  42. // Check if this is a local host
  43. if host == lt.host {
  44. // Generate all the local clients
  45. res := make([]*Vnode, 0, len(lt.Local))
  46. // Build list
  47. lt.lock.RLock()
  48. for _, v := range lt.Local {
  49. res = append(res, v.Vnode)
  50. }
  51. lt.lock.RUnlock()
  52. return res, nil
  53. }
  54. // Pass onto remote
  55. return lt.Remote.ListVnodes(host)
  56. }
  57. func (lt *LocalTransport) Ping(vn *Vnode) (bool, error) {
  58. // Look for it locally
  59. _, ok := lt.get(vn)
  60. // If it exists locally, handle it
  61. if ok {
  62. return true, nil
  63. }
  64. // Pass onto remote
  65. return lt.Remote.Ping(vn)
  66. }
  67. func (lt *LocalTransport) GetPredecessor(vn *Vnode) (*Vnode, error) {
  68. // Look for it locally
  69. obj, ok := lt.get(vn)
  70. // If it exists locally, handle it
  71. if ok {
  72. return obj.GetPredecessor()
  73. }
  74. // Pass onto remote
  75. return lt.Remote.GetPredecessor(vn)
  76. }
  77. func (lt *LocalTransport) Notify(vn, self *Vnode) ([]*Vnode, error) {
  78. // Look for it locally
  79. obj, ok := lt.get(vn)
  80. // If it exists locally, handle it
  81. if ok {
  82. return obj.Notify(self)
  83. }
  84. // Pass onto remote
  85. return lt.Remote.Notify(vn, self)
  86. }
  87. func (lt *LocalTransport) FindSuccessors(vn *Vnode, n int, key []byte) ([]*Vnode, error) {
  88. // Look for it locally
  89. obj, ok := lt.get(vn)
  90. // If it exists locally, handle it
  91. if ok {
  92. return obj.FindSuccessors(n, key)
  93. }
  94. // Pass onto remote
  95. return lt.Remote.FindSuccessors(vn, n, key)
  96. }
  97. func (lt *LocalTransport) ClearPredecessor(target, self *Vnode) error {
  98. // Look for it locally
  99. obj, ok := lt.get(target)
  100. // If it exists locally, handle it
  101. if ok {
  102. return obj.ClearPredecessor(self)
  103. }
  104. // Pass onto remote
  105. return lt.Remote.ClearPredecessor(target, self)
  106. }
  107. func (lt *LocalTransport) SkipSuccessor(target, self *Vnode) error {
  108. // Look for it locally
  109. obj, ok := lt.get(target)
  110. // If it exists locally, handle it
  111. if ok {
  112. return obj.SkipSuccessor(self)
  113. }
  114. // Pass onto remote
  115. return lt.Remote.SkipSuccessor(target, self)
  116. }
  117. func (lt *LocalTransport) Register(v *Vnode, o VnodeRPC) {
  118. // Register local instance
  119. key := v.String()
  120. lt.lock.Lock()
  121. lt.host = v.Host
  122. lt.Local[key] = &LocalRPC{v, o}
  123. lt.lock.Unlock()
  124. // Register with remote transport
  125. lt.Remote.Register(v, o)
  126. }
  127. func (lt *LocalTransport) Deregister(v *Vnode) {
  128. key := v.String()
  129. lt.lock.Lock()
  130. delete(lt.Local, key)
  131. lt.lock.Unlock()
  132. }
  133. // BlackholeTransport is used to provide an implemenation of the Transport that
  134. // does not actually do anything. Any operation will result in an error.
  135. type BlackholeTransport struct {
  136. }
  137. func (*BlackholeTransport) ListVnodes(host string) ([]*Vnode, error) {
  138. return nil, fmt.Errorf("Failed to connect! Blackhole: %s.", host)
  139. }
  140. func (*BlackholeTransport) Ping(vn *Vnode) (bool, error) {
  141. return false, nil
  142. }
  143. func (*BlackholeTransport) GetPredecessor(vn *Vnode) (*Vnode, error) {
  144. return nil, fmt.Errorf("Failed to connect! Blackhole: %s.", vn.String())
  145. }
  146. func (*BlackholeTransport) Notify(vn, self *Vnode) ([]*Vnode, error) {
  147. return nil, fmt.Errorf("Failed to connect! Blackhole: %s", vn.String())
  148. }
  149. func (*BlackholeTransport) FindSuccessors(vn *Vnode, n int, key []byte) ([]*Vnode, error) {
  150. return nil, fmt.Errorf("Failed to connect! Blackhole: %s", vn.String())
  151. }
  152. func (*BlackholeTransport) ClearPredecessor(target, self *Vnode) error {
  153. return fmt.Errorf("Failed to connect! Blackhole: %s", target.String())
  154. }
  155. func (*BlackholeTransport) SkipSuccessor(target, self *Vnode) error {
  156. return fmt.Errorf("Failed to connect! Blackhole: %s", target.String())
  157. }
  158. func (*BlackholeTransport) Register(v *Vnode, o VnodeRPC) {
  159. }