| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 | package chordimport (	"bytes"	"log"	"sort")func (r *Ring) Init(conf *Config, trans Transport) {	// Set our variables	r.Config = conf	r.Vnodes = make([]*LocalVnode, conf.NumVnodes)	r.Transport = InitLocalTransport(trans)	r.delegateCh = make(chan func(), 32)	// Initializes the Vnodes	for i := 0; i < conf.NumVnodes; i++ {		vn := &LocalVnode{}		r.Vnodes[i] = vn		vn.Ring = r		vn.Init(i)	}	// Sort the Vnodes	sort.Sort(r)}// Len is the number of Vnodesfunc (r *Ring) Len() int {	return len(r.Vnodes)}// Less returns whether the vnode with index i should sort// before the vnode with index j.func (r *Ring) Less(i, j int) bool {	return bytes.Compare(r.Vnodes[i].Id, r.Vnodes[j].Id) == -1}// Swap swaps the Vnodes with indexes i and j.func (r *Ring) Swap(i, j int) {	r.Vnodes[i], r.Vnodes[j] = r.Vnodes[j], r.Vnodes[i]}// Returns the nearest local vnode to the keyfunc (r *Ring) NearestVnode(key []byte) *LocalVnode {	for i := len(r.Vnodes) - 1; i >= 0; i-- {		if bytes.Compare(r.Vnodes[i].Id, key) == -1 {			return r.Vnodes[i]		}	}	// Return the last vnode	return r.Vnodes[len(r.Vnodes)-1]}// Schedules each vnode in the ringfunc (r *Ring) Schedule() {	if r.Config.Delegate != nil {		go r.DelegateHandler()	}	for i := 0; i < len(r.Vnodes); i++ {		r.Vnodes[i].Schedule()	}}// Wait for all the Vnodes to shutdownfunc (r *Ring) StopVnodes() {	r.ChanShutdown = make(chan bool, r.Config.NumVnodes)	for i := 0; i < r.Config.NumVnodes; i++ {		<-r.ChanShutdown	}}// Stops the delegate handlerfunc (r *Ring) StopDelegate() {	if r.Config.Delegate != nil {		// Wait for all delegate messages to be processed		<-r.InvokeDelegate(r.Config.Delegate.Shutdown)		close(r.delegateCh)	}}// Initializes the Vnodes with their local Successorsfunc (r *Ring) SetLocalSuccessors() {	numV := len(r.Vnodes)	numSuc := Min(r.Config.NumSuccessors, numV-1)	for idx, vnode := range r.Vnodes {		for i := 0; i < numSuc; i++ {			vnode.Successors[i] = &r.Vnodes[(idx+i+1)%numV].Vnode		}	}}// Invokes a function on the delegate and returns completion channelfunc (r *Ring) InvokeDelegate(f func()) chan struct{} {	if r.Config.Delegate == nil {		return nil	}	ch := make(chan struct{}, 1)	wrapper := func() {		defer func() {			ch <- struct{}{}		}()		f()	}	r.delegateCh <- wrapper	return ch}// This handler runs in a go routine to invoke methods on the delegatefunc (r *Ring) DelegateHandler() {	for {		f, ok := <-r.delegateCh		if !ok {			break		}		r.safeInvoke(f)	}}// Called to safely call a function on the delegatefunc (r *Ring) safeInvoke(f func()) {	defer func() {		if r := recover(); r != nil {			log.Printf("Caught a panic invoking a delegate function! Got: %s", r)		}	}()	f()}
 |