| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 | package chordimport (	"fmt"	"sync")// Wraps vnode and objecttype LocalRPC struct {	*Vnode	Obj VnodeRPC}// LocalTransport is used to provides fast routing to Vnodes running// locally using direct method calls. For any non-local Vnodes, the// request is passed on to another transport.type LocalTransport struct {	host   string	Remote Transport	lock   sync.RWMutex	Local  map[string]*LocalRPC}// Creates a local transport to wrap a remote transportfunc InitLocalTransport(remote Transport) Transport {	// Replace a nil transport with black hole	if remote == nil {		remote = &BlackholeTransport{}	}	local := make(map[string]*LocalRPC)	return &LocalTransport{Remote: remote, Local: local}}// Checks for a local vnodefunc (lt *LocalTransport) get(vn *Vnode) (VnodeRPC, bool) {	key := vn.String()	lt.lock.RLock()	defer lt.lock.RUnlock()	w, ok := lt.Local[key]	if ok {		return w.Obj, ok	} else {		return nil, ok	}}func (lt *LocalTransport) ListVnodes(host string) ([]*Vnode, error) {	// Check if this is a local host	if host == lt.host {		// Generate all the local clients		res := make([]*Vnode, 0, len(lt.Local))		// Build list		lt.lock.RLock()		for _, v := range lt.Local {			res = append(res, v.Vnode)		}		lt.lock.RUnlock()		return res, nil	}	// Pass onto remote	return lt.Remote.ListVnodes(host)}func (lt *LocalTransport) Ping(vn *Vnode) (bool, error) {	// Look for it locally	_, ok := lt.get(vn)	// If it exists locally, handle it	if ok {		return true, nil	}	// Pass onto remote	return lt.Remote.Ping(vn)}func (lt *LocalTransport) GetPredecessor(vn *Vnode) (*Vnode, error) {	// Look for it locally	obj, ok := lt.get(vn)	// If it exists locally, handle it	if ok {		return obj.GetPredecessor()	}	// Pass onto remote	return lt.Remote.GetPredecessor(vn)}func (lt *LocalTransport) Notify(vn, self *Vnode) ([]*Vnode, error) {	// Look for it locally	obj, ok := lt.get(vn)	// If it exists locally, handle it	if ok {		return obj.Notify(self)	}	// Pass onto remote	return lt.Remote.Notify(vn, self)}func (lt *LocalTransport) FindSuccessors(vn *Vnode, n int, key []byte) ([]*Vnode, error) {	// Look for it locally	obj, ok := lt.get(vn)	// If it exists locally, handle it	if ok {		return obj.FindSuccessors(n, key)	}	// Pass onto remote	return lt.Remote.FindSuccessors(vn, n, key)}func (lt *LocalTransport) ClearPredecessor(target, self *Vnode) error {	// Look for it locally	obj, ok := lt.get(target)	// If it exists locally, handle it	if ok {		return obj.ClearPredecessor(self)	}	// Pass onto remote	return lt.Remote.ClearPredecessor(target, self)}func (lt *LocalTransport) SkipSuccessor(target, self *Vnode) error {	// Look for it locally	obj, ok := lt.get(target)	// If it exists locally, handle it	if ok {		return obj.SkipSuccessor(self)	}	// Pass onto remote	return lt.Remote.SkipSuccessor(target, self)}func (lt *LocalTransport) Register(v *Vnode, o VnodeRPC) {	// Register local instance	key := v.String()	lt.lock.Lock()	lt.host = v.Host	lt.Local[key] = &LocalRPC{v, o}	lt.lock.Unlock()	// Register with remote transport	lt.Remote.Register(v, o)}func (lt *LocalTransport) Deregister(v *Vnode) {	key := v.String()	lt.lock.Lock()	delete(lt.Local, key)	lt.lock.Unlock()}// BlackholeTransport is used to provide an implemenation of the Transport that// does not actually do anything. Any operation will result in an error.type BlackholeTransport struct {}func (*BlackholeTransport) ListVnodes(host string) ([]*Vnode, error) {	return nil, fmt.Errorf("Failed to connect! Blackhole: %s.", host)}func (*BlackholeTransport) Ping(vn *Vnode) (bool, error) {	return false, nil}func (*BlackholeTransport) GetPredecessor(vn *Vnode) (*Vnode, error) {	return nil, fmt.Errorf("Failed to connect! Blackhole: %s.", vn.String())}func (*BlackholeTransport) Notify(vn, self *Vnode) ([]*Vnode, error) {	return nil, fmt.Errorf("Failed to connect! Blackhole: %s", vn.String())}func (*BlackholeTransport) FindSuccessors(vn *Vnode, n int, key []byte) ([]*Vnode, error) {	return nil, fmt.Errorf("Failed to connect! Blackhole: %s", vn.String())}func (*BlackholeTransport) ClearPredecessor(target, self *Vnode) error {	return fmt.Errorf("Failed to connect! Blackhole: %s", target.String())}func (*BlackholeTransport) SkipSuccessor(target, self *Vnode) error {	return fmt.Errorf("Failed to connect! Blackhole: %s", target.String())}func (*BlackholeTransport) Register(v *Vnode, o VnodeRPC) {}
 |