vnode.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. package chord
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "log"
  6. "time"
  7. )
  8. // Converts the ID to string
  9. func (vn *Vnode) String() string {
  10. return fmt.Sprintf("%x", vn.Id)
  11. }
  12. // Initializes a local vnode
  13. func (vn *LocalVnode) Init(idx int) {
  14. // Generate an ID
  15. vn.GenId(uint16(idx))
  16. // Set our host
  17. vn.Host = vn.Ring.Config.Hostname
  18. // Initialize all state
  19. vn.Successors = make([]*Vnode, vn.Ring.Config.NumSuccessors)
  20. vn.Finger = make([]*Vnode, vn.Ring.Config.HashBits)
  21. // Register with the RPC mechanism
  22. vn.Ring.Transport.Register(&vn.Vnode, vn)
  23. }
  24. // Schedules the Vnode to do regular maintenence
  25. func (vn *LocalVnode) Schedule() {
  26. // Setup our stabilize timer
  27. vn.Timer = time.AfterFunc(RandStabilize(vn.Ring.Config), vn.Stabilize)
  28. }
  29. // Generates an ID for the node
  30. func (vn *LocalVnode) GenId(idx uint16) {
  31. // Use the hash funciton
  32. conf := vn.Ring.Config
  33. hash := conf.HashFunc()
  34. hash.Write([]byte(conf.Hostname))
  35. binary.Write(hash, binary.BigEndian, idx)
  36. // Use the hash as the ID
  37. vn.Id = hash.Sum(nil)
  38. }
  39. // Called to periodically stabilize the vnode
  40. func (vn *LocalVnode) Stabilize() {
  41. // Clear the timer
  42. vn.Timer = nil
  43. // Check for shutdown
  44. if vn.Ring.ChanShutdown != nil {
  45. vn.Ring.ChanShutdown <- true
  46. return
  47. }
  48. // Setup the next stabilize timer
  49. defer vn.Schedule()
  50. // Check for new successor
  51. if err := vn.CheckNewSuccessor(); err != nil {
  52. log.Printf("[ERR] Error checking for new successor: %s", err)
  53. }
  54. // Notify the successor
  55. if err := vn.NotifySuccessor(); err != nil {
  56. log.Printf("[ERR] Error notifying successor: %s", err)
  57. }
  58. // Finger table fix up
  59. if err := vn.FixFingerTable(); err != nil {
  60. log.Printf("[ERR] Error fixing finger table: %s", err)
  61. }
  62. // Check the predecessor
  63. if err := vn.CheckPredecessor(); err != nil {
  64. log.Printf("[ERR] Error checking predecessor: %s", err)
  65. }
  66. // Set the last stabilized time
  67. vn.Stabilized = time.Now()
  68. }
  69. // Checks for a new successor
  70. func (vn *LocalVnode) CheckNewSuccessor() error {
  71. // Ask our successor for it's predecessor
  72. trans := vn.Ring.Transport
  73. CHECK_NEW_SUC:
  74. succ := vn.Successors[0]
  75. if succ == nil {
  76. panic("Node has no successor!")
  77. }
  78. maybe_suc, err := trans.GetPredecessor(succ)
  79. if err != nil {
  80. // Check if we have succ list, try to contact next live succ
  81. known := vn.KnownSuccessors()
  82. if known > 1 {
  83. for i := 0; i < known; i++ {
  84. if alive, _ := trans.Ping(vn.Successors[0]); !alive {
  85. // Don't eliminate the last successor we know of
  86. if i+1 == known {
  87. return fmt.Errorf("All known Successors dead!")
  88. }
  89. // Advance the Successors list past the dead one
  90. copy(vn.Successors[0:], vn.Successors[1:])
  91. vn.Successors[known-1-i] = nil
  92. } else {
  93. // Found live successor, check for new one
  94. goto CHECK_NEW_SUC
  95. }
  96. }
  97. }
  98. return err
  99. }
  100. // Check if we should replace our successor
  101. if maybe_suc != nil && Between(vn.Id, succ.Id, maybe_suc.Id) {
  102. // Check if new successor is alive before switching
  103. alive, err := trans.Ping(maybe_suc)
  104. if alive && err == nil {
  105. copy(vn.Successors[1:], vn.Successors[0:len(vn.Successors)-1])
  106. vn.Successors[0] = maybe_suc
  107. } else {
  108. return err
  109. }
  110. }
  111. return nil
  112. }
  113. // RPC: Invoked to return out predecessor
  114. func (vn *LocalVnode) GetPredecessor() (*Vnode, error) {
  115. return vn.Predecessor, nil
  116. }
  117. // Notifies our successor of us, updates successor list
  118. func (vn *LocalVnode) NotifySuccessor() error {
  119. // Notify successor
  120. succ := vn.Successors[0]
  121. succ_list, err := vn.Ring.Transport.Notify(succ, &vn.Vnode)
  122. if err != nil {
  123. return err
  124. }
  125. // Trim the Successors list if too long
  126. max_succ := vn.Ring.Config.NumSuccessors
  127. if len(succ_list) > max_succ-1 {
  128. succ_list = succ_list[:max_succ-1]
  129. }
  130. // Update local Successors list
  131. for idx, s := range succ_list {
  132. if s == nil {
  133. break
  134. }
  135. // Ensure we don't set ourselves as a successor!
  136. if s == nil || s.String() == vn.String() {
  137. break
  138. }
  139. vn.Successors[idx+1] = s
  140. }
  141. return nil
  142. }
  143. // RPC: Notify is invoked when a Vnode gets notified
  144. func (vn *LocalVnode) Notify(maybe_pred *Vnode) ([]*Vnode, error) {
  145. // Check if we should update our predecessor
  146. if vn.Predecessor == nil || Between(vn.Predecessor.Id, vn.Id, maybe_pred.Id) {
  147. // Inform the delegate
  148. conf := vn.Ring.Config
  149. old := vn.Predecessor
  150. vn.Ring.InvokeDelegate(func() {
  151. conf.Delegate.NewPredecessor(&vn.Vnode, maybe_pred, old)
  152. })
  153. vn.Predecessor = maybe_pred
  154. }
  155. // Return our Successors list
  156. return vn.Successors, nil
  157. }
  158. // Fixes up the finger table
  159. func (vn *LocalVnode) FixFingerTable() error {
  160. // Determine the offset
  161. hb := vn.Ring.Config.HashBits
  162. offset := PowerOffset(vn.Id, vn.LastFinger, hb)
  163. // Find the successor
  164. nodes, err := vn.FindSuccessors(1, offset)
  165. if nodes == nil || len(nodes) == 0 || err != nil {
  166. return err
  167. }
  168. node := nodes[0]
  169. // Update the finger table
  170. vn.Finger[vn.LastFinger] = node
  171. // Try to skip as many finger entries as possible
  172. for {
  173. next := vn.LastFinger + 1
  174. if next >= hb {
  175. break
  176. }
  177. offset := PowerOffset(vn.Id, next, hb)
  178. // While the node is the successor, update the finger entries
  179. if BetweenRightIncl(vn.Id, node.Id, offset) {
  180. vn.Finger[next] = node
  181. vn.LastFinger = next
  182. } else {
  183. break
  184. }
  185. }
  186. // Increment to the index to repair
  187. if vn.LastFinger+1 == hb {
  188. vn.LastFinger = 0
  189. } else {
  190. vn.LastFinger++
  191. }
  192. return nil
  193. }
  194. // Checks the health of our predecessor
  195. func (vn *LocalVnode) CheckPredecessor() error {
  196. // Check predecessor
  197. if vn.Predecessor != nil {
  198. res, err := vn.Ring.Transport.Ping(vn.Predecessor)
  199. if err != nil {
  200. return err
  201. }
  202. // Predecessor is dead
  203. if !res {
  204. vn.Predecessor = nil
  205. }
  206. }
  207. return nil
  208. }
  209. // Finds next N Successors. N must be <= NumSuccessors
  210. func (vn *LocalVnode) FindSuccessors(n int, key []byte) ([]*Vnode, error) {
  211. // Check if we are the immediate predecessor
  212. if BetweenRightIncl(vn.Id, vn.Successors[0].Id, key) {
  213. return vn.Successors[:n], nil
  214. }
  215. // Try the closest preceeding nodes
  216. cp := ClosestPreceedingVnodeIterator{}
  217. cp.Init(vn, key)
  218. for {
  219. // Get the next closest node
  220. closest := cp.Next()
  221. if closest == nil {
  222. break
  223. }
  224. // Try that node, break on success
  225. res, err := vn.Ring.Transport.FindSuccessors(closest, n, key)
  226. if err == nil {
  227. return res, nil
  228. } else {
  229. log.Printf("[ERR] Failed to contact %s. Got %s", closest.String(), err)
  230. }
  231. }
  232. // Determine how many Successors we know of
  233. Successors := vn.KnownSuccessors()
  234. // Check if the ID is between us and any non-immediate Successors
  235. for i := 1; i <= Successors-n; i++ {
  236. if BetweenRightIncl(vn.Id, vn.Successors[i].Id, key) {
  237. remain := vn.Successors[i:]
  238. if len(remain) > n {
  239. remain = remain[:n]
  240. }
  241. return remain, nil
  242. }
  243. }
  244. // Checked all closer nodes and our Successors!
  245. return nil, fmt.Errorf("Exhausted all preceeding nodes!")
  246. }
  247. // Instructs the vnode to leave
  248. func (vn *LocalVnode) Leave() error {
  249. // Inform the delegate we are leaving
  250. conf := vn.Ring.Config
  251. pred := vn.Predecessor
  252. succ := vn.Successors[0]
  253. vn.Ring.InvokeDelegate(func() {
  254. conf.Delegate.Leaving(&vn.Vnode, pred, succ)
  255. })
  256. // Notify predecessor to advance to their next successor
  257. var err error
  258. trans := vn.Ring.Transport
  259. if vn.Predecessor != nil {
  260. err = trans.SkipSuccessor(vn.Predecessor, &vn.Vnode)
  261. }
  262. // Notify successor to clear old predecessor
  263. err = MergeErrors(err, trans.ClearPredecessor(vn.Successors[0], &vn.Vnode))
  264. return err
  265. }
  266. // Used to clear our predecessor when a node is leaving
  267. func (vn *LocalVnode) ClearPredecessor(p *Vnode) error {
  268. if vn.Predecessor != nil && vn.Predecessor.String() == p.String() {
  269. // Inform the delegate
  270. conf := vn.Ring.Config
  271. old := vn.Predecessor
  272. vn.Ring.InvokeDelegate(func() {
  273. conf.Delegate.PredecessorLeaving(&vn.Vnode, old)
  274. })
  275. vn.Predecessor = nil
  276. }
  277. return nil
  278. }
  279. // Used to skip a successor when a node is leaving
  280. func (vn *LocalVnode) SkipSuccessor(s *Vnode) error {
  281. // Skip if we have a match
  282. if vn.Successors[0].String() == s.String() {
  283. // Inform the delegate
  284. conf := vn.Ring.Config
  285. old := vn.Successors[0]
  286. vn.Ring.InvokeDelegate(func() {
  287. conf.Delegate.SuccessorLeaving(&vn.Vnode, old)
  288. })
  289. known := vn.KnownSuccessors()
  290. copy(vn.Successors[0:], vn.Successors[1:])
  291. vn.Successors[known-1] = nil
  292. }
  293. return nil
  294. }
  295. // Determine how many Successors we know of
  296. func (vn *LocalVnode) KnownSuccessors() (Successors int) {
  297. for i := 0; i < len(vn.Successors); i++ {
  298. if vn.Successors[i] != nil {
  299. Successors = i + 1
  300. }
  301. }
  302. return
  303. }