chord.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
/*
Package chord provides an implementation of the Chord network protocol.
*/
  5. package chord
  6. import (
  7. "crypto/sha1"
  8. "fmt"
  9. "hash"
  10. "time"
  11. )
  12. // Implements the methods needed for a Chord ring
  13. type Transport interface {
  14. // Gets a list of the Vnodes on the box
  15. ListVnodes(string) ([]*Vnode, error)
  16. // Ping a Vnode, check for liveness
  17. Ping(*Vnode) (bool, error)
  18. // Request a nodes predecessor
  19. GetPredecessor(*Vnode) (*Vnode, error)
  20. // Notify our successor of ourselves
  21. Notify(target, self *Vnode) ([]*Vnode, error)
  22. // Find a successor
  23. FindSuccessors(*Vnode, int, []byte) ([]*Vnode, error)
  24. // Clears a predecessor if it matches a given vnode. Used to leave.
  25. ClearPredecessor(target, self *Vnode) error
  26. // Instructs a node to skip a given successor. Used to leave.
  27. SkipSuccessor(target, self *Vnode) error
  28. // Register for an RPC callbacks
  29. Register(*Vnode, VnodeRPC)
  30. }
  31. // These are the methods to invoke on the registered Vnodes
  32. type VnodeRPC interface {
  33. GetPredecessor() (*Vnode, error)
  34. Notify(*Vnode) ([]*Vnode, error)
  35. FindSuccessors(int, []byte) ([]*Vnode, error)
  36. ClearPredecessor(*Vnode) error
  37. SkipSuccessor(*Vnode) error
  38. }
  39. // Delegate to notify on ring events
  40. type Delegate interface {
  41. NewPredecessor(local, remoteNew, remotePrev *Vnode)
  42. Leaving(local, pred, succ *Vnode)
  43. PredecessorLeaving(local, remote *Vnode)
  44. SuccessorLeaving(local, remote *Vnode)
  45. Shutdown()
  46. }
  47. // Configuration for Chord nodes
  48. type Config struct {
  49. Hostname string // Local host name
  50. NumVnodes int // Number of Vnodes per physical node
  51. HashFunc func() hash.Hash // Hash function to use
  52. StabilizeMin time.Duration // Minimum stabilization time
  53. StabilizeMax time.Duration // Maximum stabilization time
  54. NumSuccessors int // Number of Successors to maintain
  55. Delegate Delegate // Invoked to handle ring events
  56. HashBits int // Bit size of the hash function
  57. }
  58. // Represents an Vnode, local or remote
  59. type Vnode struct {
  60. Id []byte // Virtual ID
  61. Host string // Host identifier
  62. }
  63. // Represents a local Vnode
  64. type LocalVnode struct {
  65. Vnode
  66. Ring *Ring
  67. Successors []*Vnode
  68. Finger []*Vnode
  69. LastFinger int
  70. Predecessor *Vnode
  71. Stabilized time.Time
  72. Timer *time.Timer
  73. }
  74. // Stores the state required for a Chord ring
  75. type Ring struct {
  76. Config *Config
  77. Transport Transport
  78. Vnodes []*LocalVnode
  79. delegateCh chan func()
  80. ChanShutdown chan bool
  81. }
  82. // Returns the default Ring configuration
  83. func DefaultConfig(hostname string) *Config {
  84. return &Config{
  85. hostname,
  86. 8, // 8 Vnodes
  87. sha1.New, // SHA1
  88. time.Duration(15 * time.Second),
  89. time.Duration(45 * time.Second),
  90. 8, // 8 Successors
  91. nil, // No delegate
  92. 160, // 160bit hash function
  93. }
  94. }
  95. // Creates a new Chord ring given the config and transport
  96. func Create(conf *Config, trans Transport) (*Ring, error) {
  97. // Initialize the hash bits
  98. conf.HashBits = conf.HashFunc().Size() * 8
  99. // Create and initialize a ring
  100. ring := &Ring{}
  101. ring.Init(conf, trans)
  102. ring.SetLocalSuccessors()
  103. ring.Schedule()
  104. return ring, nil
  105. }
  106. // Joins an existing Chord ring
  107. func Join(conf *Config, trans Transport, existing string) (*Ring, error) {
  108. // Initialize the hash bits
  109. conf.HashBits = conf.HashFunc().Size() * 8
  110. // Request a list of Vnodes from the remote host
  111. hosts, err := trans.ListVnodes(existing)
  112. if err != nil {
  113. return nil, err
  114. }
  115. if hosts == nil || len(hosts) == 0 {
  116. return nil, fmt.Errorf("Remote host has no Vnodes!")
  117. }
  118. // Create a ring
  119. ring := &Ring{}
  120. ring.Init(conf, trans)
  121. // Acquire a live successor for each Vnode
  122. for _, vn := range ring.Vnodes {
  123. // Get the nearest remote vnode
  124. nearest := NearestVnodeToKey(hosts, vn.Id)
  125. // Query for a list of Successors to this Vnode
  126. succs, err := trans.FindSuccessors(nearest, conf.NumSuccessors, vn.Id)
  127. if err != nil {
  128. return nil, fmt.Errorf("Failed to find successor for Vnodes! Got %s", err)
  129. }
  130. if succs == nil || len(succs) == 0 {
  131. return nil, fmt.Errorf("Failed to find successor for Vnodes! Got no Vnodes!")
  132. }
  133. // Assign the Successors
  134. for idx, s := range succs {
  135. vn.Successors[idx] = s
  136. }
  137. }
  138. // Start delegate handler
  139. if ring.Config.Delegate != nil {
  140. go ring.DelegateHandler()
  141. }
  142. // Do a fast stabilization, will schedule regular execution
  143. for _, vn := range ring.Vnodes {
  144. vn.Stabilize()
  145. }
  146. return ring, nil
  147. }
  148. // Leaves a given Chord ring and shuts down the local Vnodes
  149. func (r *Ring) Leave() error {
  150. // Shutdown the Vnodes first to avoid further stabilization runs
  151. r.StopVnodes()
  152. // Instruct each vnode to leave
  153. var err error
  154. for _, vn := range r.Vnodes {
  155. err = MergeErrors(err, vn.Leave())
  156. }
  157. // Wait for the delegate callbacks to complete
  158. r.StopDelegate()
  159. return err
  160. }
  161. // Shutdown shuts down the local processes in a given Chord ring
  162. // Blocks until all the Vnodes terminate.
  163. func (r *Ring) Shutdown() {
  164. r.StopVnodes()
  165. r.StopDelegate()
  166. }
  167. // Does a key lookup for up to N Successors of a key
  168. func (r *Ring) Lookup(n int, key []byte) ([]*Vnode, error) {
  169. // Ensure that n is sane
  170. if n > r.Config.NumSuccessors {
  171. return nil, fmt.Errorf("Cannot ask for more Successors than NumSuccessors!")
  172. }
  173. // Hash the key
  174. h := r.Config.HashFunc()
  175. h.Write(key)
  176. key_hash := h.Sum(nil)
  177. // Find the nearest local vnode
  178. nearest := r.NearestVnode(key_hash)
  179. // Use the nearest node for the lookup
  180. Successors, err := nearest.FindSuccessors(n, key_hash)
  181. if err != nil {
  182. return nil, err
  183. }
  184. // Trim the nil Successors
  185. for Successors[len(Successors)-1] == nil {
  186. Successors = Successors[:len(Successors)-1]
  187. }
  188. return Successors, nil
  189. }