ring.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package chord
  2. import (
  3. "bytes"
  4. "log"
  5. "sort"
  6. )
  7. func (r *Ring) Init(conf *Config, trans Transport) {
  8. // Set our variables
  9. r.Config = conf
  10. r.Vnodes = make([]*LocalVnode, conf.NumVnodes)
  11. r.Transport = InitLocalTransport(trans)
  12. r.delegateCh = make(chan func(), 32)
  13. // Initializes the Vnodes
  14. for i := 0; i < conf.NumVnodes; i++ {
  15. vn := &LocalVnode{}
  16. r.Vnodes[i] = vn
  17. vn.Ring = r
  18. vn.Init(i)
  19. }
  20. // Sort the Vnodes
  21. sort.Sort(r)
  22. }
  23. // Len is the number of Vnodes
  24. func (r *Ring) Len() int {
  25. return len(r.Vnodes)
  26. }
  27. // Less returns whether the vnode with index i should sort
  28. // before the vnode with index j.
  29. func (r *Ring) Less(i, j int) bool {
  30. return bytes.Compare(r.Vnodes[i].Id, r.Vnodes[j].Id) == -1
  31. }
  32. // Swap swaps the Vnodes with indexes i and j.
  33. func (r *Ring) Swap(i, j int) {
  34. r.Vnodes[i], r.Vnodes[j] = r.Vnodes[j], r.Vnodes[i]
  35. }
  36. // Returns the nearest local vnode to the key
  37. func (r *Ring) NearestVnode(key []byte) *LocalVnode {
  38. for i := len(r.Vnodes) - 1; i >= 0; i-- {
  39. if bytes.Compare(r.Vnodes[i].Id, key) == -1 {
  40. return r.Vnodes[i]
  41. }
  42. }
  43. // Return the last vnode
  44. return r.Vnodes[len(r.Vnodes)-1]
  45. }
  46. // Schedules each vnode in the ring
  47. func (r *Ring) Schedule() {
  48. if r.Config.Delegate != nil {
  49. go r.DelegateHandler()
  50. }
  51. for i := 0; i < len(r.Vnodes); i++ {
  52. r.Vnodes[i].Schedule()
  53. }
  54. }
  55. // Wait for all the Vnodes to shutdown
  56. func (r *Ring) StopVnodes() {
  57. r.ChanShutdown = make(chan bool, r.Config.NumVnodes)
  58. for i := 0; i < r.Config.NumVnodes; i++ {
  59. <-r.ChanShutdown
  60. }
  61. }
  62. // Stops the delegate handler
  63. func (r *Ring) StopDelegate() {
  64. if r.Config.Delegate != nil {
  65. // Wait for all delegate messages to be processed
  66. <-r.InvokeDelegate(r.Config.Delegate.Shutdown)
  67. close(r.delegateCh)
  68. }
  69. }
  70. // Initializes the Vnodes with their local Successors
  71. func (r *Ring) SetLocalSuccessors() {
  72. numV := len(r.Vnodes)
  73. numSuc := Min(r.Config.NumSuccessors, numV-1)
  74. for idx, vnode := range r.Vnodes {
  75. for i := 0; i < numSuc; i++ {
  76. vnode.Successors[i] = &r.Vnodes[(idx+i+1)%numV].Vnode
  77. }
  78. }
  79. }
  80. // Invokes a function on the delegate and returns completion channel
  81. func (r *Ring) InvokeDelegate(f func()) chan struct{} {
  82. if r.Config.Delegate == nil {
  83. return nil
  84. }
  85. ch := make(chan struct{}, 1)
  86. wrapper := func() {
  87. defer func() {
  88. ch <- struct{}{}
  89. }()
  90. f()
  91. }
  92. r.delegateCh <- wrapper
  93. return ch
  94. }
  95. // This handler runs in a go routine to invoke methods on the delegate
  96. func (r *Ring) DelegateHandler() {
  97. for {
  98. f, ok := <-r.delegateCh
  99. if !ok {
  100. break
  101. }
  102. r.safeInvoke(f)
  103. }
  104. }
  105. // Called to safely call a function on the delegate
  106. func (r *Ring) safeInvoke(f func()) {
  107. defer func() {
  108. if r := recover(); r != nil {
  109. log.Printf("Caught a panic invoking a delegate function! Got: %s", r)
  110. }
  111. }()
  112. f()
  113. }