123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- package chord
- import (
- "fmt"
- "sync"
- )
- // Wraps vnode and object
- type LocalRPC struct {
- *Vnode
- Obj VnodeRPC
- }
- // LocalTransport is used to provides fast routing to Vnodes running
- // locally using direct method calls. For any non-local Vnodes, the
- // request is passed on to another transport.
- type LocalTransport struct {
- host string
- Remote Transport
- lock sync.RWMutex
- Local map[string]*LocalRPC
- }
- // Creates a local transport to wrap a remote transport
- func InitLocalTransport(remote Transport) Transport {
- // Replace a nil transport with black hole
- if remote == nil {
- remote = &BlackholeTransport{}
- }
- local := make(map[string]*LocalRPC)
- return &LocalTransport{Remote: remote, Local: local}
- }
- // Checks for a local vnode
- func (lt *LocalTransport) get(vn *Vnode) (VnodeRPC, bool) {
- key := vn.String()
- lt.lock.RLock()
- defer lt.lock.RUnlock()
- w, ok := lt.Local[key]
- if ok {
- return w.Obj, ok
- } else {
- return nil, ok
- }
- }
- func (lt *LocalTransport) ListVnodes(host string) ([]*Vnode, error) {
- // Check if this is a local host
- if host == lt.host {
- // Generate all the local clients
- res := make([]*Vnode, 0, len(lt.Local))
- // Build list
- lt.lock.RLock()
- for _, v := range lt.Local {
- res = append(res, v.Vnode)
- }
- lt.lock.RUnlock()
- return res, nil
- }
- // Pass onto remote
- return lt.Remote.ListVnodes(host)
- }
- func (lt *LocalTransport) Ping(vn *Vnode) (bool, error) {
- // Look for it locally
- _, ok := lt.get(vn)
- // If it exists locally, handle it
- if ok {
- return true, nil
- }
- // Pass onto remote
- return lt.Remote.Ping(vn)
- }
- func (lt *LocalTransport) GetPredecessor(vn *Vnode) (*Vnode, error) {
- // Look for it locally
- obj, ok := lt.get(vn)
- // If it exists locally, handle it
- if ok {
- return obj.GetPredecessor()
- }
- // Pass onto remote
- return lt.Remote.GetPredecessor(vn)
- }
- func (lt *LocalTransport) Notify(vn, self *Vnode) ([]*Vnode, error) {
- // Look for it locally
- obj, ok := lt.get(vn)
- // If it exists locally, handle it
- if ok {
- return obj.Notify(self)
- }
- // Pass onto remote
- return lt.Remote.Notify(vn, self)
- }
- func (lt *LocalTransport) FindSuccessors(vn *Vnode, n int, key []byte) ([]*Vnode, error) {
- // Look for it locally
- obj, ok := lt.get(vn)
- // If it exists locally, handle it
- if ok {
- return obj.FindSuccessors(n, key)
- }
- // Pass onto remote
- return lt.Remote.FindSuccessors(vn, n, key)
- }
- func (lt *LocalTransport) ClearPredecessor(target, self *Vnode) error {
- // Look for it locally
- obj, ok := lt.get(target)
- // If it exists locally, handle it
- if ok {
- return obj.ClearPredecessor(self)
- }
- // Pass onto remote
- return lt.Remote.ClearPredecessor(target, self)
- }
- func (lt *LocalTransport) SkipSuccessor(target, self *Vnode) error {
- // Look for it locally
- obj, ok := lt.get(target)
- // If it exists locally, handle it
- if ok {
- return obj.SkipSuccessor(self)
- }
- // Pass onto remote
- return lt.Remote.SkipSuccessor(target, self)
- }
- func (lt *LocalTransport) Register(v *Vnode, o VnodeRPC) {
- // Register local instance
- key := v.String()
- lt.lock.Lock()
- lt.host = v.Host
- lt.Local[key] = &LocalRPC{v, o}
- lt.lock.Unlock()
- // Register with remote transport
- lt.Remote.Register(v, o)
- }
- func (lt *LocalTransport) Deregister(v *Vnode) {
- key := v.String()
- lt.lock.Lock()
- delete(lt.Local, key)
- lt.lock.Unlock()
- }
- // BlackholeTransport is used to provide an implemenation of the Transport that
- // does not actually do anything. Any operation will result in an error.
- type BlackholeTransport struct {
- }
- func (*BlackholeTransport) ListVnodes(host string) ([]*Vnode, error) {
- return nil, fmt.Errorf("Failed to connect! Blackhole: %s.", host)
- }
- func (*BlackholeTransport) Ping(vn *Vnode) (bool, error) {
- return false, nil
- }
- func (*BlackholeTransport) GetPredecessor(vn *Vnode) (*Vnode, error) {
- return nil, fmt.Errorf("Failed to connect! Blackhole: %s.", vn.String())
- }
- func (*BlackholeTransport) Notify(vn, self *Vnode) ([]*Vnode, error) {
- return nil, fmt.Errorf("Failed to connect! Blackhole: %s", vn.String())
- }
- func (*BlackholeTransport) FindSuccessors(vn *Vnode, n int, key []byte) ([]*Vnode, error) {
- return nil, fmt.Errorf("Failed to connect! Blackhole: %s", vn.String())
- }
- func (*BlackholeTransport) ClearPredecessor(target, self *Vnode) error {
- return fmt.Errorf("Failed to connect! Blackhole: %s", target.String())
- }
- func (*BlackholeTransport) SkipSuccessor(target, self *Vnode) error {
- return fmt.Errorf("Failed to connect! Blackhole: %s", target.String())
- }
- func (*BlackholeTransport) Register(v *Vnode, o VnodeRPC) {
- }
|