123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- package chord
- import (
- "bytes"
- "log"
- "sort"
- )
- func (r *Ring) Init(conf *Config, trans Transport) {
- // Set our variables
- r.Config = conf
- r.Vnodes = make([]*LocalVnode, conf.NumVnodes)
- r.Transport = InitLocalTransport(trans)
- r.delegateCh = make(chan func(), 32)
- // Initializes the Vnodes
- for i := 0; i < conf.NumVnodes; i++ {
- vn := &LocalVnode{}
- r.Vnodes[i] = vn
- vn.Ring = r
- vn.Init(i)
- }
- // Sort the Vnodes
- sort.Sort(r)
- }
- // Len is the number of Vnodes
- func (r *Ring) Len() int {
- return len(r.Vnodes)
- }
- // Less returns whether the vnode with index i should sort
- // before the vnode with index j.
- func (r *Ring) Less(i, j int) bool {
- return bytes.Compare(r.Vnodes[i].Id, r.Vnodes[j].Id) == -1
- }
- // Swap swaps the Vnodes with indexes i and j.
- func (r *Ring) Swap(i, j int) {
- r.Vnodes[i], r.Vnodes[j] = r.Vnodes[j], r.Vnodes[i]
- }
- // Returns the nearest local vnode to the key
- func (r *Ring) NearestVnode(key []byte) *LocalVnode {
- for i := len(r.Vnodes) - 1; i >= 0; i-- {
- if bytes.Compare(r.Vnodes[i].Id, key) == -1 {
- return r.Vnodes[i]
- }
- }
- // Return the last vnode
- return r.Vnodes[len(r.Vnodes)-1]
- }
- // Schedules each vnode in the ring
- func (r *Ring) Schedule() {
- if r.Config.Delegate != nil {
- go r.DelegateHandler()
- }
- for i := 0; i < len(r.Vnodes); i++ {
- r.Vnodes[i].Schedule()
- }
- }
- // Wait for all the Vnodes to shutdown
- func (r *Ring) StopVnodes() {
- r.ChanShutdown = make(chan bool, r.Config.NumVnodes)
- for i := 0; i < r.Config.NumVnodes; i++ {
- <-r.ChanShutdown
- }
- }
- // Stops the delegate handler
- func (r *Ring) StopDelegate() {
- if r.Config.Delegate != nil {
- // Wait for all delegate messages to be processed
- <-r.InvokeDelegate(r.Config.Delegate.Shutdown)
- close(r.delegateCh)
- }
- }
- // Initializes the Vnodes with their local Successors
- func (r *Ring) SetLocalSuccessors() {
- numV := len(r.Vnodes)
- numSuc := Min(r.Config.NumSuccessors, numV-1)
- for idx, vnode := range r.Vnodes {
- for i := 0; i < numSuc; i++ {
- vnode.Successors[i] = &r.Vnodes[(idx+i+1)%numV].Vnode
- }
- }
- }
- // Invokes a function on the delegate and returns completion channel
- func (r *Ring) InvokeDelegate(f func()) chan struct{} {
- if r.Config.Delegate == nil {
- return nil
- }
- ch := make(chan struct{}, 1)
- wrapper := func() {
- defer func() {
- ch <- struct{}{}
- }()
- f()
- }
- r.delegateCh <- wrapper
- return ch
- }
- // This handler runs in a go routine to invoke methods on the delegate
- func (r *Ring) DelegateHandler() {
- for {
- f, ok := <-r.delegateCh
- if !ok {
- break
- }
- r.safeInvoke(f)
- }
- }
- // Called to safely call a function on the delegate
- func (r *Ring) safeInvoke(f func()) {
- defer func() {
- if r := recover(); r != nil {
- log.Printf("Caught a panic invoking a delegate function! Got: %s", r)
- }
- }()
- f()
- }
|