libf há 2 anos atrás
pai
commit
c72e33fc32

+ 224 - 0
chord/chord.go

@@ -0,0 +1,224 @@
+/*
+Package chord provides an implementation of the
+Chord network protocol.
+*/
+package chord
+
+import (
+	"crypto/sha1"
+	"fmt"
+	"hash"
+	"time"
+)
+
+// Transport declares the operations a Chord ring needs to reach Vnodes
+type Transport interface {
+	// ListVnodes gets a list of the Vnodes on the named host
+	ListVnodes(string) ([]*Vnode, error)
+
+	// Ping checks a Vnode for liveness
+	Ping(*Vnode) (bool, error)
+
+	// GetPredecessor requests a node's predecessor
+	GetPredecessor(*Vnode) (*Vnode, error)
+
+	// Notify tells target (our successor) about self
+	Notify(target, self *Vnode) ([]*Vnode, error)
+
+	// FindSuccessors asks a Vnode for a number of successors of a key
+	FindSuccessors(*Vnode, int, []byte) ([]*Vnode, error)
+
+	// ClearPredecessor clears target's predecessor if it matches self. Used to leave.
+	ClearPredecessor(target, self *Vnode) error
+
+	// SkipSuccessor instructs target to skip self as a successor. Used to leave.
+	SkipSuccessor(target, self *Vnode) error
+
+	// Register registers the handler that receives the RPC callbacks for a Vnode
+	Register(*Vnode, VnodeRPC)
+}
+
+// VnodeRPC lists the methods to invoke on the registered Vnodes; they mirror the Transport calls minus the target Vnode
+type VnodeRPC interface {
+	GetPredecessor() (*Vnode, error)
+	Notify(*Vnode) ([]*Vnode, error)
+	FindSuccessors(int, []byte) ([]*Vnode, error)
+	ClearPredecessor(*Vnode) error
+	SkipSuccessor(*Vnode) error
+}
+
+// Delegate is the callback interface notified of ring events
+type Delegate interface {
+	NewPredecessor(local, remoteNew, remotePrev *Vnode)
+	Leaving(local, pred, succ *Vnode)
+	PredecessorLeaving(local, remote *Vnode)
+	SuccessorLeaving(local, remote *Vnode)
+	Shutdown() // NOTE(review): the ring tests expect this to run when the delegate handler is stopped
+}
+
+// Config holds the configuration for Chord nodes
+type Config struct {
+	Hostname      string           // Local host name
+	NumVnodes     int              // Number of Vnodes per physical node
+	HashFunc      func() hash.Hash // Hash function to use
+	StabilizeMin  time.Duration    // Minimum stabilization time
+	StabilizeMax  time.Duration    // Maximum stabilization time
+	NumSuccessors int              // Number of Successors to maintain
+	Delegate      Delegate         // Invoked to handle ring events
+	HashBits      int              // Bit size of the hash function; Create and Join overwrite it from HashFunc
+}
+
+// Vnode represents a Vnode, local or remote
+type Vnode struct {
+	Id   []byte // Virtual ID
+	Host string // Host identifier
+}
+
+// LocalVnode represents a local Vnode
+type LocalVnode struct {
+	Vnode
+	Ring        *Ring    // Ring this Vnode belongs to
+	Successors  []*Vnode // Successor list; unused trailing slots are nil
+	Finger      []*Vnode // Finger table entries; may contain nil
+	LastFinger  int
+	Predecessor *Vnode
+	Stabilized  time.Time
+	Timer       *time.Timer // NOTE(review): presumably the stabilization timer set by Ring.Schedule — confirm
+}
+
+// Ring stores the state required for a Chord ring
+type Ring struct {
+	Config       *Config
+	Transport    Transport
+	Vnodes       []*LocalVnode // Local Vnodes of this ring
+	delegateCh   chan func()
+	ChanShutdown chan bool
+}
+
+// DefaultConfig returns the default Ring configuration for hostname:
+// 8 Vnodes, 8 successors, SHA-1 (160 bits), a stabilization window of
+// 15 to 45 seconds and no delegate.
+func DefaultConfig(hostname string) *Config {
+	conf := &Config{
+		Hostname:      hostname,
+		NumVnodes:     8,
+		NumSuccessors: 8,
+		HashFunc:      sha1.New,
+		HashBits:      160,
+		StabilizeMin:  15 * time.Second,
+		StabilizeMax:  45 * time.Second,
+		Delegate:      nil,
+	}
+	return conf
+}
+
+// Create builds a new Chord ring from the given config and transport.
+// conf.HashBits is overwritten with the bit size of conf.HashFunc.
+// The returned error is always nil.
+func Create(conf *Config, trans Transport) (*Ring, error) {
+	// Derive the hash width in bits from the digest size in bytes
+	digestBytes := conf.HashFunc().Size()
+	conf.HashBits = digestBytes * 8
+
+	// Bring up the ring: init, wire local successors, then schedule
+	r := new(Ring)
+	r.Init(conf, trans)
+	r.SetLocalSuccessors()
+	r.Schedule()
+	return r, nil
+}
+
+// Join joins an existing Chord ring through the host named existing.
+//
+// It overwrites conf.HashBits from conf.HashFunc, asks the existing host for
+// its Vnodes, and for every local Vnode queries the nearest remote Vnode for
+// up to conf.NumSuccessors successors. It then starts the delegate handler
+// (when a delegate is configured) and runs one stabilization per local Vnode.
+//
+// An error is returned when the remote host cannot be listed, has no Vnodes,
+// or yields no successors for one of the local Vnodes.
+func Join(conf *Config, trans Transport, existing string) (*Ring, error) {
+	// Initialize the hash bits
+	conf.HashBits = conf.HashFunc().Size() * 8
+
+	// Request a list of Vnodes from the remote host
+	hosts, err := trans.ListVnodes(existing)
+	if err != nil {
+		return nil, err
+	}
+	if len(hosts) == 0 {
+		return nil, fmt.Errorf("Remote host has no Vnodes!")
+	}
+
+	// Create a ring
+	ring := &Ring{}
+	ring.Init(conf, trans)
+
+	// Acquire a live successor for each Vnode
+	for _, vn := range ring.Vnodes {
+		// Get the nearest remote vnode
+		nearest := NearestVnodeToKey(hosts, vn.Id)
+
+		// Query for a list of Successors to this Vnode
+		succs, err := trans.FindSuccessors(nearest, conf.NumSuccessors, vn.Id)
+		if err != nil {
+			return nil, fmt.Errorf("Failed to find successor for Vnodes! Got %s", err)
+		}
+		if len(succs) == 0 {
+			return nil, fmt.Errorf("Failed to find successor for Vnodes! Got no Vnodes!")
+		}
+
+		// Assign the Successors. copy stops at the shorter of the two
+		// slices, so a remote that answers with more entries than we keep
+		// cannot index past the end of vn.Successors.
+		copy(vn.Successors, succs)
+	}
+
+	// Start delegate handler
+	if ring.Config.Delegate != nil {
+		go ring.DelegateHandler()
+	}
+
+	// Do a fast stabilization, will schedule regular execution
+	for _, vn := range ring.Vnodes {
+		vn.Stabilize()
+	}
+	return ring, nil
+}
+
+// Leave leaves the Chord ring and shuts down the local Vnodes.
+// The errors reported by the individual Vnodes are merged into the result.
+func (r *Ring) Leave() error {
+	// Stop the Vnodes first so no further stabilization runs are started
+	r.StopVnodes()
+
+	// Ask every Vnode to leave, accumulating whatever errors come back
+	var merged error
+	for i := range r.Vnodes {
+		merged = MergeErrors(merged, r.Vnodes[i].Leave())
+	}
+
+	// Let the pending delegate callbacks finish before returning
+	r.StopDelegate()
+	return merged
+}
+
+// Shutdown stops the local Vnodes and then the delegate handler of the ring; unlike Leave it does not ask the Vnodes to leave.
+// Blocks until all the Vnodes terminate.
+func (r *Ring) Shutdown() {
+	r.StopVnodes()
+	r.StopDelegate()
+}
+
+// Lookup hashes key with the configured hash function and returns up to n
+// successors of that hash, found through the nearest local Vnode.
+//
+// n must not exceed Config.NumSuccessors, otherwise an error is returned.
+// Trailing nil entries are trimmed from the result, so the returned slice may
+// hold fewer than n Vnodes and may be empty.
+func (r *Ring) Lookup(n int, key []byte) ([]*Vnode, error) {
+	// Ensure that n is sane
+	if n > r.Config.NumSuccessors {
+		return nil, fmt.Errorf("Cannot ask for more Successors than NumSuccessors!")
+	}
+
+	// Hash the key
+	h := r.Config.HashFunc()
+	h.Write(key)
+	keyHash := h.Sum(nil)
+
+	// Find the nearest local vnode
+	nearest := r.NearestVnode(keyHash)
+
+	// Use the nearest node for the lookup
+	successors, err := nearest.FindSuccessors(n, keyHash)
+	if err != nil {
+		return nil, err
+	}
+
+	// Trim the nil Successors. The length guard keeps an empty or all-nil
+	// result from indexing at -1.
+	for len(successors) > 0 && successors[len(successors)-1] == nil {
+		successors = successors[:len(successors)-1]
+	}
+	return successors, nil
+}

+ 276 - 0
chord/chord_test/chord_test.go

@@ -0,0 +1,276 @@
+package chord_test
+
+import (
+	"runtime"
+	"testing"
+	"time"
+	"trial/chord"
+)
+
+// MultiLocalTrans is a test transport that keeps one local transport per
+// host name and hands everything else to a fallback remote transport.
+type MultiLocalTrans struct {
+	remote chord.Transport                 // fallback for hosts that are not registered
+	hosts  map[string]*chord.LocalTransport // per-host local transports
+}
+
+// InitMLTransport returns a MultiLocalTrans with no registered hosts and a
+// blackhole transport as its remote fallback.
+func InitMLTransport() *MultiLocalTrans {
+	return &MultiLocalTrans{
+		remote: &chord.BlackholeTransport{},
+		hosts:  make(map[string]*chord.LocalTransport),
+	}
+}
+
+// ListVnodes lists the Vnodes of host through its local transport, or through
+// the remote fallback when the host is not registered.
+func (ml *MultiLocalTrans) ListVnodes(host string) ([]*chord.Vnode, error) {
+	local, known := ml.hosts[host]
+	if !known {
+		return ml.remote.ListVnodes(host)
+	}
+	return local.ListVnodes(host)
+}
+
+// Ping checks a Vnode for liveness through its host's local transport, or
+// through the remote fallback when the host is not registered.
+func (ml *MultiLocalTrans) Ping(v *chord.Vnode) (bool, error) {
+	local, known := ml.hosts[v.Host]
+	if !known {
+		return ml.remote.Ping(v)
+	}
+	return local.Ping(v)
+}
+
+// GetPredecessor requests the predecessor of v, routed by v.Host.
+func (ml *MultiLocalTrans) GetPredecessor(v *chord.Vnode) (*chord.Vnode, error) {
+	local, known := ml.hosts[v.Host]
+	if !known {
+		return ml.remote.GetPredecessor(v)
+	}
+	return local.GetPredecessor(v)
+}
+
+// Notify tells target (our successor) about self, routed by target.Host.
+func (ml *MultiLocalTrans) Notify(target, self *chord.Vnode) ([]*chord.Vnode, error) {
+	local, known := ml.hosts[target.Host]
+	if !known {
+		return ml.remote.Notify(target, self)
+	}
+	return local.Notify(target, self)
+}
+
+// FindSuccessors forwards a successor query for key k to v, routed by v.Host.
+func (ml *MultiLocalTrans) FindSuccessors(v *chord.Vnode, n int, k []byte) ([]*chord.Vnode, error) {
+	local, known := ml.hosts[v.Host]
+	if !known {
+		return ml.remote.FindSuccessors(v, n, k)
+	}
+	return local.FindSuccessors(v, n, k)
+}
+
+// ClearPredecessor clears target's predecessor if it matches self, routed by
+// target.Host. Used to leave.
+func (ml *MultiLocalTrans) ClearPredecessor(target, self *chord.Vnode) error {
+	local, known := ml.hosts[target.Host]
+	if !known {
+		return ml.remote.ClearPredecessor(target, self)
+	}
+	return local.ClearPredecessor(target, self)
+}
+
+// SkipSuccessor instructs target to skip self as a successor, routed by
+// target.Host. Used to leave.
+func (ml *MultiLocalTrans) SkipSuccessor(target, self *chord.Vnode) error {
+	local, known := ml.hosts[target.Host]
+	if !known {
+		return ml.remote.SkipSuccessor(target, self)
+	}
+	return local.SkipSuccessor(target, self)
+}
+
+// Register registers the RPC handler o for v with the local transport of
+// v.Host, creating that transport the first time the host is seen.
+func (ml *MultiLocalTrans) Register(v *chord.Vnode, o chord.VnodeRPC) {
+	if _, known := ml.hosts[v.Host]; !known {
+		ml.hosts[v.Host] = chord.InitLocalTransport(nil).(*chord.LocalTransport)
+	}
+	ml.hosts[v.Host].Register(v, o)
+}
+
+// Deregister forgets the local transport of host, so later calls for that
+// host go to the remote fallback.
+func (ml *MultiLocalTrans) Deregister(host string) {
+	delete(ml.hosts, host)
+}
+
+// TestDefaultConfig verifies every field of the default configuration.
+func TestDefaultConfig(t *testing.T) {
+	conf := chord.DefaultConfig("test")
+
+	// The cases are checked top to bottom; the first mismatch fails the test
+	switch {
+	case conf.Hostname != "test":
+		t.Fatalf("bad hostname")
+	case conf.NumVnodes != 8:
+		t.Fatalf("bad num Vnodes")
+	case conf.NumSuccessors != 8:
+		t.Fatalf("bad num succ")
+	case conf.HashFunc == nil:
+		t.Fatalf("bad hash")
+	case conf.HashBits != 160:
+		t.Fatalf("bad hash bits")
+	case conf.StabilizeMin != 15*time.Second:
+		t.Fatalf("bad min stable")
+	case conf.StabilizeMax != 45*time.Second:
+		t.Fatalf("bad max stable")
+	case conf.Delegate != nil:
+		t.Fatalf("bad delegate")
+	}
+}
+
+// fastConf returns the default config for host "test" with the stabilization
+// window shrunk to 15-45 milliseconds so tests converge quickly.
+func fastConf() *chord.Config {
+	c := chord.DefaultConfig("test")
+	c.StabilizeMin, c.StabilizeMax = 15*time.Millisecond, 45*time.Millisecond
+	return c
+}
+
+// TestCreateShutdown checks that creating a ring and shutting it down again
+// leaves the goroutine count where it started.
+func TestCreateShutdown(t *testing.T) {
+	// Touch the timer machinery first; presumably this keeps any goroutine
+	// it starts out of the difference measured below
+	time.After(15)
+
+	before := runtime.NumGoroutine()
+	ring, err := chord.Create(fastConf(), nil)
+	if err != nil {
+		t.Fatalf("unexpected err. %s", err)
+	}
+	ring.Shutdown()
+
+	if after := runtime.NumGoroutine(); after != before {
+		t.Fatalf("unexpected routines! A:%d B:%d", after, before)
+	}
+}
+
+// TestJoin creates a ring on host "test" and joins it from host "test2" over
+// a shared multi transport.
+func TestJoin(t *testing.T) {
+	trans := InitMLTransport()
+
+	// First ring
+	first, err := chord.Create(fastConf(), trans)
+	if err != nil {
+		t.Fatalf("unexpected err. %s", err)
+	}
+
+	// Second ring joins through the first host
+	joinConf := fastConf()
+	joinConf.Hostname = "test2"
+	second, err := chord.Join(joinConf, trans, "test")
+	if err != nil {
+		t.Fatalf("failed to join local node! Got %s", err)
+	}
+
+	// Shutdown
+	first.Shutdown()
+	second.Shutdown()
+}
+
+// TestJoinDeadHost expects Join to fail when the target host is unknown to
+// the transport.
+func TestJoinDeadHost(t *testing.T) {
+	trans := InitMLTransport()
+
+	// "noop" was never registered, so the request goes to the blackhole remote
+	if _, err := chord.Join(fastConf(), trans, "noop"); err == nil {
+		t.Fatalf("expected err!")
+	}
+}
+
+// TestLeave joins two rings, lets the first one leave, and verifies that the
+// Vnodes of the remaining ring end up pointing at each other in order.
+func TestLeave(t *testing.T) {
+	// Create a multi transport
+	ml := InitMLTransport()
+
+	// Create the initial ring
+	conf := fastConf()
+	r, err := chord.Create(conf, ml)
+	if err != nil {
+		t.Fatalf("unexpected err. %s", err)
+	}
+
+	// Create a second ring
+	conf2 := fastConf()
+	conf2.Hostname = "test2"
+	r2, err := chord.Join(conf2, ml, "test")
+	if err != nil {
+		t.Fatalf("failed to join local node! Got %s", err)
+	}
+	// Stop r2 when the test ends so its stabilization timers do not keep
+	// firing during later tests
+	defer r2.Shutdown()
+
+	// Wait for some stabilization
+	time.Sleep(100 * time.Millisecond)
+
+	// Node 1 should leave
+	r.Leave()
+	ml.Deregister("test")
+
+	// Wait for stabilization
+	time.Sleep(100 * time.Millisecond)
+
+	// Verify r2 ring is still intact: each Vnode's first successor must be
+	// the next local Vnode, wrapping around at the end
+	num := len(r2.Vnodes)
+	for idx, vn := range r2.Vnodes {
+		succ := vn.Successors[0]
+		if succ == nil {
+			// Report a missing successor instead of dereferencing nil below
+			t.Fatalf("bad successor! Got nil")
+		}
+		if succ != &r2.Vnodes[(idx+1)%num].Vnode {
+			t.Fatalf("bad successor! Got:%s:%s", succ.Host, succ)
+		}
+	}
+}
+
+// TestLookupBadN expects Lookup to reject a request for more successors (10)
+// than the ring is configured to maintain (8).
+func TestLookupBadN(t *testing.T) {
+	// Create a multi transport
+	ml := InitMLTransport()
+
+	// Create the initial ring
+	conf := fastConf()
+	r, err := chord.Create(conf, ml)
+	if err != nil {
+		t.Fatalf("unexpected err. %s", err)
+	}
+	// Stop the ring when the test ends so its timers do not leak
+	defer r.Shutdown()
+
+	_, err = r.Lookup(10, []byte("test"))
+	if err == nil {
+		t.Fatalf("expected err!")
+	}
+}
+
+// TestLookup verifies that two joined rings return the same successors for
+// the same keys.
+func TestLookup(t *testing.T) {
+	// Create a multi transport
+	ml := InitMLTransport()
+
+	// Create the initial ring
+	conf := fastConf()
+	r, err := chord.Create(conf, ml)
+	if err != nil {
+		t.Fatalf("unexpected err. %s", err)
+	}
+	// Stop the ring when the test ends so its timers do not leak
+	defer r.Shutdown()
+
+	// Create a second ring
+	conf2 := fastConf()
+	conf2.Hostname = "test2"
+	r2, err := chord.Join(conf2, ml, "test")
+	if err != nil {
+		t.Fatalf("failed to join local node! Got %s", err)
+	}
+	defer r2.Shutdown()
+
+	// Wait for some stabilization
+	time.Sleep(100 * time.Millisecond)
+
+	// Try key lookup
+	keys := [][]byte{[]byte("test"), []byte("foo"), []byte("bar")}
+	for _, k := range keys {
+		vn1, err := r.Lookup(3, k)
+		if err != nil {
+			t.Fatalf("unexpected err %s", err)
+		}
+		vn2, err := r2.Lookup(3, k)
+		if err != nil {
+			t.Fatalf("unexpected err %s", err)
+		}
+		if len(vn1) != len(vn2) {
+			t.Fatalf("result len differs!")
+		}
+		for idx := range vn1 {
+			if vn1[idx].String() != vn2[idx].String() {
+				t.Fatalf("results differ!")
+			}
+		}
+	}
+}

+ 184 - 0
chord/chord_test/iter_closest_test.go

@@ -0,0 +1,184 @@
+package chord_test
+
+import (
+	"math/big"
+	"testing"
+	"trial/chord"
+)
+
+// TestNextClosest walks the closest-preceding iterator for key 32 on a
+// 6-bit ring and checks the order in which candidates are produced.
+func TestNextClosest(t *testing.T) {
+	// Vnodes on the ring (mod 64); IDs 20 and 40 are deliberately absent
+	v1 := &chord.Vnode{Id: []byte{1}}
+	v2 := &chord.Vnode{Id: []byte{10}}
+	v4 := &chord.Vnode{Id: []byte{32}}
+	v6 := &chord.Vnode{Id: []byte{59}}
+	v7 := &chord.Vnode{Id: []byte{62}}
+
+	// Local vnode at ID 54 with both successors and fingers populated
+	local := &chord.LocalVnode{
+		Successors: []*chord.Vnode{v6, v7, nil},
+		Finger:     []*chord.Vnode{v6, v6, v7, v1, v2, v4, nil},
+		Ring:       &chord.Ring{Config: &chord.Config{HashBits: 6}},
+	}
+	local.Id = []byte{54}
+
+	// Iterator for key 32
+	iter := &chord.ClosestPreceedingVnodeIterator{}
+	iter.Init(local, []byte{32})
+
+	// Expected sequence, ending with nil once the iterator is exhausted
+	expected := []struct {
+		label string
+		want  *chord.Vnode
+	}{
+		{"v2", v2},
+		{"v1", v1},
+		{"v7", v7},
+		{"v6", v6},
+		{"nil", nil},
+	}
+	for _, e := range expected {
+		if got := iter.Next(); got != e.want {
+			t.Fatalf("Expect %s. %v", e.label, got)
+		}
+	}
+}
+
+// TestNextClosestNoSucc repeats the iterator walk with an empty successor
+// list, so every candidate has to come from the finger table.
+func TestNextClosestNoSucc(t *testing.T) {
+	// Vnodes on the ring (mod 64); IDs 20 and 40 are deliberately absent
+	v1 := &chord.Vnode{Id: []byte{1}}
+	v2 := &chord.Vnode{Id: []byte{10}}
+	v4 := &chord.Vnode{Id: []byte{32}}
+	v6 := &chord.Vnode{Id: []byte{59}}
+	v7 := &chord.Vnode{Id: []byte{62}}
+
+	// Local vnode at ID 54 whose only successor slot is nil
+	local := &chord.LocalVnode{
+		Successors: []*chord.Vnode{nil},
+		Finger:     []*chord.Vnode{v6, v6, v7, v1, v2, v4, nil},
+		Ring:       &chord.Ring{Config: &chord.Config{HashBits: 6}},
+	}
+	local.Id = []byte{54}
+
+	// Iterator for key 32
+	iter := &chord.ClosestPreceedingVnodeIterator{}
+	iter.Init(local, []byte{32})
+
+	// Expected sequence, ending with nil once the iterator is exhausted
+	expected := []struct {
+		label string
+		want  *chord.Vnode
+	}{
+		{"v2", v2},
+		{"v1", v1},
+		{"v7", v7},
+		{"v6", v6},
+		{"nil", nil},
+	}
+	for _, e := range expected {
+		if got := iter.Next(); got != e.want {
+			t.Fatalf("Expect %s. %v", e.label, got)
+		}
+	}
+}
+
+// TestNextClosestNoFinger repeats the iterator walk with an all-nil finger
+// table, so every candidate has to come from the successor list.
+func TestNextClosestNoFinger(t *testing.T) {
+	// Only the two Vnodes used as successors exist on the ring (mod 64)
+	v6 := &chord.Vnode{Id: []byte{59}}
+	v7 := &chord.Vnode{Id: []byte{62}}
+
+	// Local vnode at ID 54; v7 appears twice in the successor list
+	local := &chord.LocalVnode{
+		Successors: []*chord.Vnode{v6, v7, v7, nil},
+		Finger:     []*chord.Vnode{nil, nil, nil},
+		Ring:       &chord.Ring{Config: &chord.Config{HashBits: 6}},
+	}
+	local.Id = []byte{54}
+
+	// Iterator for key 32
+	iter := &chord.ClosestPreceedingVnodeIterator{}
+	iter.Init(local, []byte{32})
+
+	// Expected sequence, ending with nil once the iterator is exhausted
+	expected := []struct {
+		label string
+		want  *chord.Vnode
+	}{
+		{"v7", v7},
+		{"v6", v6},
+		{"nil", nil},
+	}
+	for _, e := range expected {
+		if got := iter.Next(); got != e.want {
+			t.Fatalf("Expect %s. %v", e.label, got)
+		}
+	}
+}
+
+// TestClosest checks that ClosestPreceedingVnode picks the Vnode at ID 32
+// over the one at ID 128 for key 45, whatever the argument order.
+func TestClosest(t *testing.T) {
+	far := &chord.Vnode{Id: []byte{128}}
+	near := &chord.Vnode{Id: []byte{32}}
+	key := []byte{45}
+
+	if got := chord.ClosestPreceedingVnode(far, near, key, 8); got != near {
+		t.Fatalf("expect b to be closer!")
+	}
+	if got := chord.ClosestPreceedingVnode(near, far, key, 8); got != near {
+		t.Fatalf("expect b to be closer!")
+	}
+}
+
+// TestDistance checks Distance on rings of 64, 128 and 256 positions,
+// including a case that wraps around the end of the ring.
+func TestDistance(t *testing.T) {
+	cases := []struct {
+		a, b []byte
+		bits int
+		want int64
+	}{
+		{[]byte{63}, []byte{3}, 6, 4},    // Ring size of 64, wraps around
+		{[]byte{0}, []byte{65}, 7, 65},   // Ring size of 128
+		{[]byte{1}, []byte{255}, 8, 254}, // Ring size of 256
+	}
+	for _, c := range cases {
+		d := chord.Distance(c.a, c.b, c.bits)
+		if d.Cmp(big.NewInt(c.want)) != 0 {
+			t.Fatalf("expect distance %d! %v", c.want, d)
+		}
+	}
+}

+ 93 - 0
chord/chord_test/net_test.go

@@ -0,0 +1,93 @@
+package chord_test
+
+import (
+	"fmt"
+	"testing"
+	"time"
+	"trial/chord"
+)
+
+// prepRing prepares a config and a TCP transport listening on
+// localhost:port, with a 15-45ms stabilization window and a 20ms timeout.
+func prepRing(port int) (*chord.Config, *chord.TCPTransport, error) {
+	addr := fmt.Sprintf("localhost:%d", port)
+
+	trans, err := chord.InitTCPTransport(addr, 20*time.Millisecond)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	conf := chord.DefaultConfig(addr)
+	conf.StabilizeMin = 15 * time.Millisecond
+	conf.StabilizeMax = 45 * time.Millisecond
+	return conf, trans, nil
+}
+
+// TestTCPJoin creates a ring on port 10025 and joins it from port 10026
+// over the TCP transport.
+func TestTCPJoin(t *testing.T) {
+	// Prepare both nodes
+	confA, transA, err := prepRing(10025)
+	if err != nil {
+		t.Fatalf("unexpected err. %s", err)
+	}
+	confB, transB, err := prepRing(10026)
+	if err != nil {
+		t.Fatalf("unexpected err. %s", err)
+	}
+
+	// Node A starts the ring
+	ringA, err := chord.Create(confA, transA)
+	if err != nil {
+		t.Fatalf("unexpected err. %s", err)
+	}
+
+	// Node B joins through node A
+	ringB, err := chord.Join(confB, transB, confA.Hostname)
+	if err != nil {
+		t.Fatalf("failed to join local node! Got %s", err)
+	}
+
+	// Rings first, transports afterwards
+	ringA.Shutdown()
+	ringB.Shutdown()
+	transA.Shutdown()
+	transB.Shutdown()
+}
+
+// TestTCPLeave joins two TCP rings, lets the first one leave, and verifies
+// that every Vnode of the remaining ring has a successor on its own host.
+func TestTCPLeave(t *testing.T) {
+	// Prepare to create 2 nodes
+	c1, t1, err := prepRing(10027)
+	if err != nil {
+		t.Fatalf("unexpected err. %s", err)
+	}
+	c2, t2, err := prepRing(10028)
+	if err != nil {
+		t.Fatalf("unexpected err. %s", err)
+	}
+
+	// Create initial ring
+	r1, err := chord.Create(c1, t1)
+	if err != nil {
+		t.Fatalf("unexpected err. %s", err)
+	}
+
+	// Join ring
+	r2, err := chord.Join(c2, t2, c1.Hostname)
+	if err != nil {
+		t.Fatalf("failed to join local node! Got %s", err)
+	}
+	// Release the second node when the test ends so its timers and its
+	// listener do not leak. Deferred calls run in reverse order, so the
+	// ring stops before its transport.
+	defer t2.Shutdown()
+	defer r2.Shutdown()
+
+	// Wait for some stabilization
+	time.Sleep(100 * time.Millisecond)
+
+	// Node 1 should leave
+	r1.Leave()
+	t1.Shutdown()
+
+	// Wait for stabilization
+	time.Sleep(100 * time.Millisecond)
+
+	// Verify r2 ring is still intact
+	for _, vn := range r2.Vnodes {
+		succ := vn.Successors[0]
+		if succ == nil {
+			// Report a missing successor instead of dereferencing nil below
+			t.Fatalf("bad successor! Got nil")
+		}
+		if succ.Host != r2.Config.Hostname {
+			t.Fatalf("bad successor! Got:%s:%s", succ.Host, succ)
+		}
+	}
+}

+ 187 - 0
chord/chord_test/ring_test.go

@@ -0,0 +1,187 @@
+package chord_test
+
+import (
+	"bytes"
+	"crypto/sha1"
+	"sort"
+	"testing"
+	"time"
+	"trial/chord"
+)
+
+// MockDelegate is a chord.Delegate that ignores every ring event and only
+// records whether Shutdown was called.
+type MockDelegate struct {
+	shutdown bool // set once Shutdown has run
+}
+
+// NewPredecessor ignores the event.
+func (m *MockDelegate) NewPredecessor(_, _, _ *chord.Vnode) {}
+
+// Leaving ignores the event.
+func (m *MockDelegate) Leaving(_, _, _ *chord.Vnode) {}
+
+// PredecessorLeaving ignores the event.
+func (m *MockDelegate) PredecessorLeaving(_, _ *chord.Vnode) {}
+
+// SuccessorLeaving ignores the event.
+func (m *MockDelegate) SuccessorLeaving(_, _ *chord.Vnode) {}
+
+// Shutdown records that the delegate was shut down.
+func (m *MockDelegate) Shutdown() {
+	m.shutdown = true
+}
+
+// makeRing returns an initialized ring with 5 Vnodes, 8 successors, SHA-1
+// hashing and a nil transport. Nothing is scheduled.
+func makeRing() *chord.Ring {
+	r := new(chord.Ring)
+	r.Init(&chord.Config{
+		NumVnodes:     5,
+		NumSuccessors: 8,
+		HashFunc:      sha1.New,
+		HashBits:      160,
+		StabilizeMin:  time.Second,
+		StabilizeMax:  5 * time.Second,
+	}, nil)
+	return r
+}
+
+// TestRingInit checks that Init stores the config, supplies a transport even
+// when given nil, and creates fully wired Vnodes.
+func TestRingInit(t *testing.T) {
+	conf := chord.DefaultConfig("test")
+	ring := new(chord.Ring)
+	ring.Init(conf, nil)
+
+	// Ring-level state
+	if ring.Config != conf {
+		t.Fatalf("wrong config")
+	}
+	if ring.Transport == nil {
+		t.Fatalf("missing transport")
+	}
+
+	// Every configured Vnode must exist, point back at the ring and have an ID
+	for i := 0; i < conf.NumVnodes; i++ {
+		vn := ring.Vnodes[i]
+		if vn == nil {
+			t.Fatalf("missing vnode!")
+		}
+		if vn.Ring != ring {
+			t.Fatalf("ring missing!")
+		}
+		if vn.Id == nil {
+			t.Fatalf("ID not initialized!")
+		}
+	}
+}
+
+// TestRingLen checks that Len reports the 5 Vnodes makeRing configures.
+func TestRingLen(t *testing.T) {
+	if n := makeRing().Len(); n != 5 {
+		t.Fatalf("wrong len")
+	}
+}
+
+// TestRingSort checks that sorting the ring orders its 5 Vnodes by strictly
+// increasing ID.
+func TestRingSort(t *testing.T) {
+	ring := makeRing()
+	sort.Sort(ring)
+
+	// Compare each of the four adjacent pairs
+	for i := 0; i < 4; i++ {
+		if bytes.Compare(ring.Vnodes[i].Id, ring.Vnodes[i+1].Id) != -1 {
+			t.Fatalf("bad sort")
+		}
+	}
+}
+
+// TestRingNearest checks NearestVnode for a key between two Vnodes and for a
+// key below every Vnode, which wraps to the last one.
+func TestRingNearest(t *testing.T) {
+	ring := makeRing()
+
+	// Give the five Vnodes small, known IDs
+	for i, id := range []byte{2, 4, 7, 10, 14} {
+		ring.Vnodes[i].Id = []byte{id}
+	}
+
+	// Key 6 sits between IDs 4 and 7
+	if near := ring.NearestVnode([]byte{6}); near != ring.Vnodes[1] {
+		t.Fatalf("got wrong node back!")
+	}
+
+	// Key 0 is below every ID
+	if near := ring.NearestVnode([]byte{0}); near != ring.Vnodes[4] {
+		t.Fatalf("got wrong node back!")
+	}
+}
+
+// TestRingSchedule checks that Schedule gives every Vnode a timer.
+func TestRingSchedule(t *testing.T) {
+	ring := makeRing()
+	ring.SetLocalSuccessors()
+	ring.Schedule()
+
+	for _, vn := range ring.Vnodes {
+		if vn.Timer == nil {
+			t.Fatalf("expected timer!")
+		}
+	}
+	ring.StopVnodes()
+}
+
+// TestRingSetLocalSucc checks that, in a 5-Vnode ring, each Vnode gets the
+// other four Vnodes as successors and nothing more.
+func TestRingSetLocalSucc(t *testing.T) {
+	ring := makeRing()
+	ring.SetLocalSuccessors()
+
+	for _, vn := range ring.Vnodes {
+		// Slots 0-3 hold the other four Vnodes
+		for _, succ := range vn.Successors[:4] {
+			if succ == nil {
+				t.Fatalf("expected successor!")
+			}
+		}
+		// A Vnode is not its own successor, so slot 4 stays empty
+		if vn.Successors[4] != nil {
+			t.Fatalf("should not have 5th successor!")
+		}
+	}
+
+	// Verify the successors manually for node 3: they are Vnodes 3, 4, 0, 1
+	third := ring.Vnodes[2]
+	for k := 0; k < 4; k++ {
+		if third.Successors[k] != &ring.Vnodes[(3+k)%5].Vnode {
+			t.Fatalf("bad succ!")
+		}
+	}
+}
+
+// TestRingDelegate checks that a function handed to InvokeDelegate runs
+// within a second and that StopDelegate shuts the delegate down.
+func TestRingDelegate(t *testing.T) {
+	delegate := &MockDelegate{}
+	ring := makeRing()
+	ring.SetLocalSuccessors()
+	ring.Config.Delegate = delegate
+	ring.Schedule()
+
+	// Queue a callback that flips a flag
+	var ran bool
+	done := ring.InvokeDelegate(func() {
+		println("run!")
+		ran = true
+	})
+	if done == nil {
+		t.Fatalf("expected chan")
+	}
+
+	// Wait for the callback, but not forever
+	select {
+	case <-done:
+	case <-time.After(time.Second):
+		t.Fatalf("timeout")
+	}
+	if !ran {
+		t.Fatalf("b should be true")
+	}
+
+	ring.StopDelegate()
+	if !delegate.shutdown {
+		t.Fatalf("delegate did not get shutdown")
+	}
+}

+ 307 - 0
chord/chord_test/transport_test.go

@@ -0,0 +1,307 @@
+package chord_test
+
+import (
+	"bytes"
+	"strings"
+	"testing"
+	"trial/chord"
+)
+
+// MockVnodeRPC is a canned chord.VnodeRPC: it answers with its preset fields
+// and records the arguments it was called with.
+type MockVnodeRPC struct {
+	err       error          // returned by GetPredecessor, Notify and FindSuccessors
+	pred      *chord.Vnode   // returned by GetPredecessor; reset by ClearPredecessor
+	not_pred  *chord.Vnode   // last Vnode passed to Notify
+	succ_list []*chord.Vnode // returned by Notify
+	key       []byte         // last key passed to FindSuccessors
+	succ      []*chord.Vnode // returned by FindSuccessors
+	skip      *chord.Vnode   // last Vnode passed to SkipSuccessor
+}
+
+// GetPredecessor returns the preset predecessor and error.
+func (m *MockVnodeRPC) GetPredecessor() (*chord.Vnode, error) {
+	return m.pred, m.err
+}
+
+// Notify records the notifying Vnode and returns the preset successor list.
+func (m *MockVnodeRPC) Notify(from *chord.Vnode) ([]*chord.Vnode, error) {
+	m.not_pred = from
+	return m.succ_list, m.err
+}
+
+// FindSuccessors records the key and returns the preset successors; n is ignored.
+func (m *MockVnodeRPC) FindSuccessors(n int, key []byte) ([]*chord.Vnode, error) {
+	m.key = key
+	return m.succ, m.err
+}
+
+// ClearPredecessor drops the preset predecessor unconditionally.
+func (m *MockVnodeRPC) ClearPredecessor(_ *chord.Vnode) error {
+	m.pred = nil
+	return nil
+}
+
+// SkipSuccessor records the Vnode that should be skipped.
+func (m *MockVnodeRPC) SkipSuccessor(target *chord.Vnode) error {
+	m.skip = target
+	return nil
+}
+
+// makeLocal returns a fresh LocalTransport created with a nil remote.
+func makeLocal() *chord.LocalTransport {
+	trans := chord.InitLocalTransport(nil)
+	return trans.(*chord.LocalTransport)
+}
+
+// TestInitLocalTransport checks that a new LocalTransport has a remote
+// fallback and a Vnode map even when created with a nil remote.
+func TestInitLocalTransport(t *testing.T) {
+	local := makeLocal()
+	if local.Remote == nil {
+		t.Fatalf("bad remote")
+	}
+	if local.Local == nil {
+		t.Fatalf("missing map")
+	}
+}
+
+// TestLocalList checks that ListVnodes returns exactly the Vnode registered
+// for the queried host.
+func TestLocalList(t *testing.T) {
+	trans := makeLocal()
+	vn := &chord.Vnode{Id: []byte{1}, Host: "test"}
+	trans.Register(vn, &MockVnodeRPC{})
+
+	list, err := trans.ListVnodes("test")
+	if err != nil {
+		t.Fatalf("unexpected err. %s", err)
+	}
+	if len(list) != 1 || list[0] != vn {
+		t.Fatal("local list failed", list)
+	}
+}
+
+// TestLocalListRemote checks that listing a host other than the registered
+// one fails.
+func TestLocalListRemote(t *testing.T) {
+	trans := makeLocal()
+	trans.Register(&chord.Vnode{Id: []byte{1}, Host: "test"}, &MockVnodeRPC{})
+
+	if _, err := trans.ListVnodes("remote"); err == nil {
+		t.Fatalf("expected err!")
+	}
+}
+
+// TestLocalPing checks that pinging a registered Vnode succeeds.
+func TestLocalPing(t *testing.T) {
+	trans := makeLocal()
+	vn := &chord.Vnode{Id: []byte{1}}
+	trans.Register(vn, &MockVnodeRPC{})
+
+	alive, err := trans.Ping(vn)
+	if !alive || err != nil {
+		t.Fatalf("local ping failed")
+	}
+}
+
+// TestLocalMissingPing checks that pinging a Vnode that was never registered
+// does not succeed.
+func TestLocalMissingPing(t *testing.T) {
+	trans := makeLocal()
+	trans.Register(&chord.Vnode{Id: []byte{2}}, &MockVnodeRPC{})
+
+	// Ping some other node that was not registered
+	stranger := &chord.Vnode{Id: []byte{3}}
+	alive, _ := trans.Ping(stranger)
+	if alive {
+		t.Fatalf("ping succeeded")
+	}
+}
+
+// TestLocalGetPredecessor checks that GetPredecessor reaches the handler of
+// a registered Vnode, looked up by ID, and fails for an unknown Vnode.
+func TestLocalGetPredecessor(t *testing.T) {
+	trans := makeLocal()
+	pred := &chord.Vnode{Id: []byte{10}}
+	trans.Register(&chord.Vnode{Id: []byte{42}}, &MockVnodeRPC{pred: pred, err: nil})
+
+	// A distinct Vnode value carrying the same ID must hit the same handler
+	sameID := &chord.Vnode{Id: []byte{42}}
+	got, err := trans.GetPredecessor(sameID)
+	if err != nil {
+		t.Fatalf("local GetPredecessor failed")
+	}
+	if got != pred {
+		t.Fatalf("got wrong predecessor")
+	}
+
+	// An unregistered ID falls through to the remote, which fails
+	if _, err = trans.GetPredecessor(&chord.Vnode{Id: []byte{1}}); err == nil {
+		t.Fatalf("expected error!")
+	}
+}
+
+// TestLocalNotify checks that Notify delivers the notifying Vnode to the
+// registered handler and returns its successor list, and that notifying an
+// unknown Vnode fails.
+func TestLocalNotify(t *testing.T) {
+	l := makeLocal()
+	suc1 := &chord.Vnode{Id: []byte{10}}
+	suc2 := &chord.Vnode{Id: []byte{20}}
+	suc3 := &chord.Vnode{Id: []byte{30}}
+	succ_list := []*chord.Vnode{suc1, suc2, suc3}
+
+	mockVN := &MockVnodeRPC{succ_list: succ_list, err: nil}
+	vn := &chord.Vnode{Id: []byte{0}}
+	l.Register(vn, mockVN)
+
+	self := &chord.Vnode{Id: []byte{60}}
+	res, err := l.Notify(vn, self)
+	if err != nil {
+		t.Fatalf("local notify failed")
+	}
+	// Check the length before indexing so a short reply fails the test
+	// instead of panicking
+	if len(res) < 3 || res[0] != suc1 || res[1] != suc2 || res[2] != suc3 {
+		t.Fatalf("got wrong successor list")
+	}
+	if mockVN.not_pred != self {
+		t.Fatalf("didn't get notified correctly!")
+	}
+
+	unknown := &chord.Vnode{Id: []byte{1}}
+	_, err = l.Notify(unknown, self)
+	if err == nil {
+		t.Fatalf("remote notify should fail")
+	}
+}
+
+// TestLocalFindSucc checks that FindSuccessors passes the key to the
+// registered handler and returns its answer, and that querying an unknown
+// Vnode fails.
+func TestLocalFindSucc(t *testing.T) {
+	trans := makeLocal()
+	want := []*chord.Vnode{{Id: []byte{40}}}
+
+	mock := &MockVnodeRPC{succ: want, err: nil}
+	vn := &chord.Vnode{Id: []byte{12}}
+	trans.Register(vn, mock)
+
+	key := []byte("test")
+	got, err := trans.FindSuccessors(vn, 1, key)
+	if err != nil {
+		t.Fatalf("local FindSuccessor failed")
+	}
+	if got[0] != want[0] {
+		t.Fatalf("got wrong successor")
+	}
+	if !bytes.Equal(mock.key, key) {
+		t.Fatalf("didn't get key correctly!")
+	}
+
+	// An unregistered Vnode falls through to the remote, which fails
+	if _, err = trans.FindSuccessors(&chord.Vnode{Id: []byte{1}}, 1, key); err == nil {
+		t.Fatalf("remote find should fail")
+	}
+}
+
+// TestLocalClearPred checks that ClearPredecessor reaches the registered
+// handler and fails for an unknown Vnode.
+func TestLocalClearPred(t *testing.T) {
+	trans := makeLocal()
+	pred := &chord.Vnode{Id: []byte{10}}
+	mock := &MockVnodeRPC{pred: pred}
+	vn := &chord.Vnode{Id: []byte{12}}
+	trans.Register(vn, mock)
+
+	if err := trans.ClearPredecessor(vn, pred); err != nil {
+		t.Fatalf("local ClearPredecessor failed")
+	}
+	if mock.pred != nil {
+		t.Fatalf("clear failed")
+	}
+
+	// An unregistered Vnode falls through to the remote, which fails
+	stranger := &chord.Vnode{Id: []byte{1}}
+	if err := trans.ClearPredecessor(stranger, pred); err == nil {
+		t.Fatalf("remote clear should fail")
+	}
+}
+
+// TestLocalSkipSucc checks that SkipSuccessor reaches the registered handler
+// and fails for an unknown Vnode.
+func TestLocalSkipSucc(t *testing.T) {
+	trans := makeLocal()
+	mock := &MockVnodeRPC{succ: []*chord.Vnode{{Id: []byte{40}}}}
+	vn := &chord.Vnode{Id: []byte{12}}
+	trans.Register(vn, mock)
+
+	skipped := &chord.Vnode{Id: []byte{40}}
+	if err := trans.SkipSuccessor(vn, skipped); err != nil {
+		t.Fatalf("local Skip failed")
+	}
+	if mock.skip != skipped {
+		t.Fatalf("skip failed")
+	}
+
+	// An unregistered Vnode falls through to the remote, which fails
+	stranger := &chord.Vnode{Id: []byte{1}}
+	if err := trans.SkipSuccessor(stranger, skipped); err == nil {
+		t.Fatalf("remote skip should fail")
+	}
+}
+
+// TestLocalDeregister checks that a Vnode answers pings while registered and
+// stops answering once it is deregistered.
+func TestLocalDeregister(t *testing.T) {
+	trans := makeLocal()
+	vn := &chord.Vnode{Id: []byte{1}}
+	trans.Register(vn, &MockVnodeRPC{})
+
+	alive, err := trans.Ping(vn)
+	if !alive || err != nil {
+		t.Fatalf("local ping failed")
+	}
+
+	trans.Deregister(vn)
+	alive, _ = trans.Ping(vn)
+	if alive {
+		t.Fatalf("local ping succeeded")
+	}
+}
+
+// TestBHList checks that the blackhole transport lists nothing and reports
+// an error.
+func TestBHList(t *testing.T) {
+	blackhole := chord.BlackholeTransport{}
+	vnodes, err := blackhole.ListVnodes("test")
+	if err == nil || vnodes != nil {
+		t.Fatalf("expected fail")
+	}
+}
+
+// TestBHPing checks that the blackhole transport reports a Vnode as not
+// alive, without returning an error.
+func TestBHPing(t *testing.T) {
+	blackhole := chord.BlackholeTransport{}
+	alive, err := blackhole.Ping(&chord.Vnode{Id: []byte{12}})
+	if err != nil || alive {
+		t.Fatalf("expected fail")
+	}
+}
+
+// TestBHGetPred checks that GetPredecessor on the blackhole transport fails
+// with a "Failed to connect!" error.
+func TestBHGetPred(t *testing.T) {
+	bh := chord.BlackholeTransport{}
+	vn := &chord.Vnode{Id: []byte{12}}
+	_, err := bh.GetPredecessor(vn)
+	// A nil error or a short message fails the test instead of panicking
+	if err == nil || !strings.HasPrefix(err.Error(), "Failed to connect!") {
+		t.Fatalf("expected fail")
+	}
+}
+
+// TestBHNotify checks that Notify on the blackhole transport fails with a
+// "Failed to connect!" error.
+func TestBHNotify(t *testing.T) {
+	bh := chord.BlackholeTransport{}
+	vn := &chord.Vnode{Id: []byte{12}}
+	vn2 := &chord.Vnode{Id: []byte{42}}
+	_, err := bh.Notify(vn, vn2)
+	// A nil error or a short message fails the test instead of panicking
+	if err == nil || !strings.HasPrefix(err.Error(), "Failed to connect!") {
+		t.Fatalf("expected fail")
+	}
+}
+
+// TestBHFindSuccessors checks that FindSuccessors on the blackhole transport
+// fails with a "Failed to connect!" error.
+func TestBHFindSuccessors(t *testing.T) {
+	bh := chord.BlackholeTransport{}
+	vn := &chord.Vnode{Id: []byte{12}}
+	_, err := bh.FindSuccessors(vn, 1, []byte("test"))
+	// A nil error or a short message fails the test instead of panicking
+	if err == nil || !strings.HasPrefix(err.Error(), "Failed to connect!") {
+		t.Fatalf("expected fail")
+	}
+}
+
+// TestBHClearPred checks that ClearPredecessor on the blackhole transport
+// fails with a "Failed to connect!" error.
+func TestBHClearPred(t *testing.T) {
+	bh := chord.BlackholeTransport{}
+	vn := &chord.Vnode{Id: []byte{12}}
+	s := &chord.Vnode{Id: []byte{50}}
+	err := bh.ClearPredecessor(vn, s)
+	// A nil error or a short message fails the test instead of panicking
+	if err == nil || !strings.HasPrefix(err.Error(), "Failed to connect!") {
+		t.Fatalf("expected fail")
+	}
+}
+
+// TestBHSkipSucc checks that SkipSuccessor on the blackhole transport fails
+// with a "Failed to connect!" error.
+func TestBHSkipSucc(t *testing.T) {
+	bh := chord.BlackholeTransport{}
+	vn := &chord.Vnode{Id: []byte{12}}
+	s := &chord.Vnode{Id: []byte{50}}
+	err := bh.SkipSuccessor(vn, s)
+	// A nil error or a short message fails the test instead of panicking
+	if err == nil || !strings.HasPrefix(err.Error(), "Failed to connect!") {
+		t.Fatalf("expected fail")
+	}
+}

+ 168 - 0
chord/chord_test/util_test.go

@@ -0,0 +1,168 @@
+package chord_test
+
+import (
+	"errors"
+	"testing"
+	"time"
+	"trial/chord"
+)
+
+// TestRandStabilize draws 1000 stabilization delays and checks that each
+// one lies within [StabilizeMin, StabilizeMax] and that the draws rarely
+// repeat.
+func TestRandStabilize(t *testing.T) {
+	lower := time.Duration(10 * time.Second)
+	upper := time.Duration(30 * time.Second)
+	cfg := &chord.Config{StabilizeMin: lower, StabilizeMax: upper}
+
+	var samples []time.Duration
+	for n := 0; n < 1000; n++ {
+		delay := chord.RandStabilize(cfg)
+		samples = append(samples, delay)
+		switch {
+		case delay < lower:
+			t.Fatalf("after below min")
+		case delay > upper:
+			t.Fatalf("after above max")
+		}
+	}
+
+	// Every ordered pair of distinct indexes holding the same value counts
+	// once, so a single repeated value contributes two collisions.
+	collisions := 0
+	for a := range samples {
+		for b := range samples {
+			if a != b && samples[a] == samples[b] {
+				collisions++
+			}
+		}
+	}
+
+	if collisions > 3 {
+		t.Fatalf("too many collisions! %d", collisions)
+	}
+}
+
+// TestBetween checks the non-wrapping case of chord.Between: a key strictly
+// inside (t1, t2) is accepted, while both endpoints and a key beyond t2 are
+// rejected.
+func TestBetween(t *testing.T) {
+	t1 := []byte{0, 0, 0, 0}
+	t2 := []byte{1, 0, 0, 0}
+	k := []byte{0, 0, 5, 0}
+	if !chord.Between(t1, t2, k) {
+		t.Fatalf("expected k chord.Between!")
+	}
+	if chord.Between(t1, t2, t1) {
+		t.Fatalf("dont expect t1 chord.Between!")
+	}
+	// The right endpoint is excluded as well; name t2 in the failure so the
+	// message matches the value actually being checked.
+	if chord.Between(t1, t2, t2) {
+		t.Fatalf("dont expect t2 chord.Between!")
+	}
+
+	k = []byte{2, 0, 0, 0}
+	if chord.Between(t1, t2, k) {
+		t.Fatalf("dont expect k chord.Between!")
+	}
+}
+
+// TestBetweenWrap checks chord.Between when the interval wraps around the
+// end of the id space (the first bound is numerically above the second).
+func TestBetweenWrap(t *testing.T) {
+	lo := []byte{0xff, 0, 0, 0}
+	hi := []byte{1, 0, 0, 0}
+
+	// A key numerically below the upper bound.
+	if key := []byte{0, 0, 5, 0}; !chord.Between(lo, hi, key) {
+		t.Fatalf("expected k chord.Between!")
+	}
+
+	// A key numerically above the lower bound.
+	if key := []byte{0xff, 0xff, 0, 0}; !chord.Between(lo, hi, key) {
+		t.Fatalf("expect k chord.Between!")
+	}
+}
+
+// TestBetweenRightIncl checks that chord.BetweenRightIncl accepts a key
+// equal to the right bound of a non-wrapping interval.
+func TestBetweenRightIncl(t *testing.T) {
+	start := []byte{0, 0, 0, 0}
+	end := []byte{1, 0, 0, 0}
+	key := []byte{1, 0, 0, 0}
+	if included := chord.BetweenRightIncl(start, end, key); !included {
+		t.Fatalf("expected k chord.Between!")
+	}
+}
+
+// TestBetweenRightInclWrap checks that chord.BetweenRightIncl accepts a key
+// equal to the right bound when the first bound is numerically above the
+// second.
+func TestBetweenRightInclWrap(t *testing.T) {
+	start := []byte{0xff, 0, 0, 0}
+	end := []byte{1, 0, 0, 0}
+	key := []byte{1, 0, 0, 0}
+	if included := chord.BetweenRightIncl(start, end, key); !included {
+		t.Fatalf("expected k chord.Between!")
+	}
+}
+
+// TestPowerOffset checks chord.PowerOffset for a 32-bit id space: first an
+// all-zero id with exponent 30, then an id with exponent 23 whose expected
+// result is the bytes 01 7f ff ff.
+func TestPowerOffset(t *testing.T) {
+	bits := 32
+
+	base := []byte{0, 0, 0, 0}
+	power := 30
+	got := chord.PowerOffset(base, power, bits)
+	if got[0] != 64 {
+		t.Fatalf("unexpected val! %v", got)
+	}
+
+	// Byte i of the id holds bits 8*i .. 8*i+7: 0-7, 8-15, 16-23, 24-31.
+	base = []byte{0, 0xff, 0xff, 0xff}
+	power = 23
+	got = chord.PowerOffset(base, power, bits)
+	if !(got[0] == 1 && got[1] == 0x7f && got[2] == 0xff && got[3] == 0xff) {
+		t.Fatalf("unexpected val! %v", got)
+	}
+}
+
+// TestMax checks that chord.Max returns the larger argument regardless of
+// argument order.
+func TestMax(t *testing.T) {
+	if got := chord.Max(-10, 10); got != 10 {
+		t.Fatalf("bad chord.Max")
+	}
+	if got := chord.Max(10, -10); got != 10 {
+		t.Fatalf("bad chord.Max")
+	}
+}
+
+// TestMin checks that chord.Min returns the smaller argument regardless of
+// argument order.
+func TestMin(t *testing.T) {
+	if got := chord.Min(-10, 10); got != -10 {
+		t.Fatalf("bad chord.Min")
+	}
+	if got := chord.Min(10, -10); got != -10 {
+		t.Fatalf("bad chord.Min")
+	}
+}
+
+// TestNearestVnodesKey checks chord.NearestVnodeToKey against five vnodes
+// with ids 2, 4, 7, 10 and 14.
+func TestNearestVnodesKey(t *testing.T) {
+	nodes := []*chord.Vnode{
+		{Id: []byte{2}},
+		{Id: []byte{4}},
+		{Id: []byte{7}},
+		{Id: []byte{10}},
+		{Id: []byte{14}},
+	}
+
+	// Key 6 sits between ids 4 and 7; the node with id 4 is expected.
+	if got := chord.NearestVnodeToKey(nodes, []byte{6}); got != nodes[1] {
+		t.Fatalf("got wrong node back!")
+	}
+
+	// Key 0 is below every id; the last node (id 14) is expected.
+	if got := chord.NearestVnodeToKey(nodes, []byte{0}); got != nodes[4] {
+		t.Fatalf("got wrong node back!")
+	}
+}
+
+// TestMergeErrors checks chord.MergeErrors: a single non-nil error is
+// returned unchanged, two nils give nil, and two errors are joined with a
+// newline between their messages.
+func TestMergeErrors(t *testing.T) {
+	first := errors.New("test1")
+	second := errors.New("test2")
+
+	if got := chord.MergeErrors(first, nil); got != first {
+		t.Fatalf("bad merge")
+	}
+	if got := chord.MergeErrors(nil, first); got != first {
+		t.Fatalf("bad merge")
+	}
+	if got := chord.MergeErrors(nil, nil); got != nil {
+		t.Fatalf("bad merge")
+	}
+	if msg := chord.MergeErrors(first, second).Error(); msg != "test1\ntest2" {
+		t.Fatalf("bad merge")
+	}
+}

+ 685 - 0
chord/chord_test/vnode_test.go

@@ -0,0 +1,685 @@
+package chord_test
+
+import (
+	"bytes"
+	"crypto/sha1"
+	"sort"
+	"testing"
+	"time"
+	"trial/chord"
+)
+
+// makeVnode builds a LocalVnode attached to a fresh ring configured with
+// SHA-1, eight successors, a 10s-30s stabilization window and a local
+// transport. Init is not called on the returned vnode.
+func makeVnode() *chord.LocalVnode {
+	conf := &chord.Config{
+		NumSuccessors: 8,
+		StabilizeMin:  time.Duration(10 * time.Second),
+		StabilizeMax:  time.Duration(30 * time.Second),
+		HashFunc:      sha1.New,
+	}
+	ring := &chord.Ring{
+		Config:    conf,
+		Transport: chord.InitLocalTransport(nil),
+	}
+	return &chord.LocalVnode{Ring: ring}
+}
+
+// TestVnodeInit checks that Init populates the id, successor list and
+// finger table and leaves the stabilization timer unset.
+func TestVnodeInit(t *testing.T) {
+	node := makeVnode()
+	node.Init(0)
+
+	switch {
+	case node.Id == nil:
+		t.Fatalf("unexpected nil")
+	case node.Successors == nil:
+		t.Fatalf("unexpected nil")
+	case node.Finger == nil:
+		t.Fatalf("unexpected nil")
+	case node.Timer != nil:
+		t.Fatalf("unexpected timer")
+	}
+}
+
+// TestVnodeSchedule checks that Schedule sets the vnode's timer.
+func TestVnodeSchedule(t *testing.T) {
+	node := makeVnode()
+	node.Schedule()
+	if timer := node.Timer; timer == nil {
+		t.Fatalf("unexpected nil")
+	}
+}
+
+// TestGenId generates ids for indexes 0 through 15 on one vnode and checks
+// that no two of them are equal.
+func TestGenId(t *testing.T) {
+	node := makeVnode()
+	var generated [][]byte
+	for idx := uint16(0); idx < 16; idx++ {
+		node.GenId(idx)
+		generated = append(generated, node.Id)
+	}
+
+	for a := range generated {
+		for b := range generated {
+			if a != b && bytes.Equal(generated[b], generated[a]) {
+				t.Fatalf("unexpected id collision!")
+			}
+		}
+	}
+}
+
+// TestVnodeStabilizeShutdown checks that Stabilize, when the ring has a
+// shutdown channel, clears the timer, leaves the stabilized timestamp at
+// its zero value and posts a message on the shutdown channel.
+func TestVnodeStabilizeShutdown(t *testing.T) {
+	node := makeVnode()
+	node.Schedule()
+	node.Ring.ChanShutdown = make(chan bool, 1)
+	node.Stabilize()
+
+	if node.Timer != nil {
+		t.Fatalf("unexpected timer")
+	}
+	if stamped := !node.Stabilized.IsZero(); stamped {
+		t.Fatalf("unexpected time")
+	}
+
+	// Non-blocking receive: the message must already be waiting.
+	select {
+	case <-node.Ring.ChanShutdown:
+	default:
+		t.Fatalf("expected message")
+	}
+}
+
+// TestVnodeStabilizeResched checks that Stabilize on a vnode that is its
+// own successor leaves a timer set and records a stabilization time.
+func TestVnodeStabilizeResched(t *testing.T) {
+	node := makeVnode()
+	node.Init(1)
+	node.Successors[0] = &node.Vnode
+	node.Schedule()
+	node.Stabilize()
+
+	if node.Timer == nil {
+		t.Fatalf("expected timer")
+	}
+	if unset := node.Stabilized.IsZero(); unset {
+		t.Fatalf("expected time")
+	}
+
+	// Stop the pending timer before the test returns.
+	node.Timer.Stop()
+}
+
+// TestVnodeKnownSucc checks that KnownSuccessors reports 0 right after Init
+// and 1 once the first successor slot is filled.
+func TestVnodeKnownSucc(t *testing.T) {
+	node := makeVnode()
+	node.Init(0)
+	if known := node.KnownSuccessors(); known != 0 {
+		t.Fatalf("wrong num known!")
+	}
+
+	node.Successors[0] = &chord.Vnode{Id: []byte{1}}
+	if known := node.KnownSuccessors(); known != 1 {
+		t.Fatalf("wrong num known!")
+	}
+}
+
+// TestVnodeCheckNewSuccAlivePanic expects CheckNewSuccessor to panic when
+// the vnode has no successors set.
+func TestVnodeCheckNewSuccAlivePanic(t *testing.T) {
+	defer func() {
+		recovered := recover()
+		if recovered == nil {
+			t.Fatalf("expected panic!")
+		}
+	}()
+
+	node := makeVnode()
+	node.Init(1)
+	node.CheckNewSuccessor()
+}
+
+// TestVnodeCheckNewSuccAlive checks that CheckNewSuccessor keeps the
+// current successor when that successor already has this vnode as its
+// predecessor.
+func TestVnodeCheckNewSuccAlive(t *testing.T) {
+	first := makeVnode()
+	first.Init(1)
+
+	// The second vnode shares the first one's ring.
+	second := makeVnode()
+	second.Ring = first.Ring
+	second.Init(2)
+	second.Predecessor = &first.Vnode
+	first.Successors[0] = &second.Vnode
+
+	pred, _ := second.GetPredecessor()
+	if pred != &first.Vnode {
+		t.Fatalf("expected vn1 as predecessor")
+	}
+
+	err := first.CheckNewSuccessor()
+	if err != nil {
+		t.Fatalf("unexpected err %s", err)
+	}
+
+	if first.Successors[0] != &second.Vnode {
+		t.Fatalf("unexpected successor!")
+	}
+}
+
+// TestVnodeCheckNewSuccDead checks that CheckNewSuccessor returns an error
+// when the only successor is a vnode unknown to the transport, and that the
+// successor entry is left in place.
+func TestVnodeCheckNewSuccDead(t *testing.T) {
+	vn1 := makeVnode()
+	vn1.Init(1)
+	vn1.Successors[0] = &chord.Vnode{Id: []byte{0}}
+
+	// This branch is taken when no error came back, so say that an error
+	// was expected instead of printing a nil error.
+	if err := vn1.CheckNewSuccessor(); err == nil {
+		t.Fatalf("expected err!")
+	}
+
+	if vn1.Successors[0].String() != "00" {
+		t.Fatalf("unexpected successor!")
+	}
+}
+
+// TestVnodeCheckNewSuccDeadAlternate checks that CheckNewSuccessor moves on
+// to the next known successor when the first one has been deregistered.
+func TestVnodeCheckNewSuccDeadAlternate(t *testing.T) {
+	ring := makeRing()
+	sort.Sort(ring)
+
+	first, second, third := ring.Vnodes[0], ring.Vnodes[1], ring.Vnodes[2]
+
+	first.Successors[0] = &second.Vnode
+	first.Successors[1] = &third.Vnode
+	second.Predecessor = &first.Vnode
+	third.Predecessor = &second.Vnode
+
+	// Take the second vnode out of the transport
+	local := ring.Transport.(*chord.LocalTransport)
+	local.Deregister(&second.Vnode)
+
+	// No error is expected
+	err := first.CheckNewSuccessor()
+	if err != nil {
+		t.Fatalf("unexpected err %s", err)
+	}
+
+	// The third vnode should now be the primary successor
+	if first.Successors[0] != &third.Vnode {
+		t.Fatalf("unexpected successor!")
+	}
+}
+
+// TestVnodeCheckNewSuccAllDeadAlternates checks that CheckNewSuccessor
+// reports "All known Successors dead!" when every known successor has been
+// deregistered from the transport.
+func TestVnodeCheckNewSuccAllDeadAlternates(t *testing.T) {
+	r := makeRing()
+	sort.Sort(r)
+
+	vn1 := r.Vnodes[0]
+	vn2 := r.Vnodes[1]
+	vn3 := r.Vnodes[2]
+
+	vn1.Successors[0] = &vn2.Vnode
+	vn1.Successors[1] = &vn3.Vnode
+	vn2.Predecessor = &vn1.Vnode
+	vn3.Predecessor = &vn2.Vnode
+
+	// Remove vn2 and vn3
+	(r.Transport.(*chord.LocalTransport)).Deregister(&vn2.Vnode)
+	(r.Transport.(*chord.LocalTransport)).Deregister(&vn3.Vnode)
+
+	// Should get an error. Check for nil first so a missing error fails the
+	// test instead of panicking on err.Error().
+	err := vn1.CheckNewSuccessor()
+	if err == nil || err.Error() != "All known Successors dead!" {
+		t.Fatalf("unexpected err %v", err)
+	}
+
+	// Should just be vn3
+	if vn1.Successors[0] != &vn3.Vnode {
+		t.Fatalf("unexpected successor!")
+	}
+}
+
+// TestVnodeCheckNewSuccNewSucc checks that CheckNewSuccessor adopts the
+// successor's predecessor as the new primary successor and keeps the old
+// successor in second place.
+func TestVnodeCheckNewSuccNewSucc(t *testing.T) {
+	ring := makeRing()
+	sort.Sort(ring)
+
+	first, second, third := ring.Vnodes[0], ring.Vnodes[1], ring.Vnodes[2]
+
+	// The first vnode points past the second one, straight at the third.
+	first.Successors[0] = &third.Vnode
+	second.Predecessor = &first.Vnode
+	third.Predecessor = &second.Vnode
+
+	// The third vnode reports the second as its predecessor
+	pred, _ := third.GetPredecessor()
+	if pred != &second.Vnode {
+		t.Fatalf("expected vn2 as predecessor")
+	}
+
+	// No error is expected
+	err := first.CheckNewSuccessor()
+	if err != nil {
+		t.Fatalf("unexpected err %s", err)
+	}
+
+	// Primary successor should now be the second vnode
+	if first.Successors[0] != &second.Vnode {
+		t.Fatalf("unexpected successor! %s", first.Successors[0])
+	}
+
+	// The third vnode should have moved to the second slot
+	if first.Successors[1] != &third.Vnode {
+		t.Fatalf("unexpected 2nd successor!")
+	}
+}
+
+// TestVnodeCheckNewSuccNewSuccDead checks that CheckNewSuccessor keeps the
+// current successor when the predecessor that successor reports has been
+// deregistered.
+func TestVnodeCheckNewSuccNewSuccDead(t *testing.T) {
+	ring := makeRing()
+	sort.Sort(ring)
+
+	first, second, third := ring.Vnodes[0], ring.Vnodes[1], ring.Vnodes[2]
+
+	first.Successors[0] = &third.Vnode
+	second.Predecessor = &first.Vnode
+	third.Predecessor = &second.Vnode
+
+	// Take the second vnode out of the transport
+	local := ring.Transport.(*chord.LocalTransport)
+	local.Deregister(&second.Vnode)
+
+	// No error is expected
+	err := first.CheckNewSuccessor()
+	if err != nil {
+		t.Fatalf("unexpected err %s", err)
+	}
+
+	// The third vnode should remain the primary successor
+	if first.Successors[0] != &third.Vnode {
+		t.Fatalf("unexpected successor!")
+	}
+}
+
+// TestVnodeNotifySucc checks that NotifySuccessor copies the successor's
+// own successor list into slots 1..3 of the caller and leaves the
+// successor's predecessor unchanged.
+func TestVnodeNotifySucc(t *testing.T) {
+	ring := makeRing()
+	sort.Sort(ring)
+
+	far1 := &chord.Vnode{Id: []byte{1}}
+	far2 := &chord.Vnode{Id: []byte{2}}
+	far3 := &chord.Vnode{Id: []byte{3}}
+
+	first, second := ring.Vnodes[0], ring.Vnodes[1]
+	first.Successors[0] = &second.Vnode
+	second.Predecessor = &first.Vnode
+	second.Successors[0] = far1
+	second.Successors[1] = far2
+	second.Successors[2] = far3
+
+	// No error is expected
+	err := first.NotifySuccessor()
+	if err != nil {
+		t.Fatalf("unexpected err %s", err)
+	}
+
+	// The caller's list should now continue with the successor's entries
+	if first.Successors[1] != far1 {
+		t.Fatalf("bad succ 1")
+	}
+	if first.Successors[2] != far2 {
+		t.Fatalf("bad succ 2")
+	}
+	if first.Successors[3] != far3 {
+		t.Fatalf("bad succ 3")
+	}
+
+	// The successor's predecessor must be unchanged
+	if second.Predecessor != &first.Vnode {
+		t.Fatalf("bad predecessor")
+	}
+}
+
+// TestVnodeNotifySuccDead checks that NotifySuccessor returns an error when
+// the successor has been deregistered from the transport.
+func TestVnodeNotifySuccDead(t *testing.T) {
+	ring := makeRing()
+	sort.Sort(ring)
+
+	first, second := ring.Vnodes[0], ring.Vnodes[1]
+	first.Successors[0] = &second.Vnode
+	second.Predecessor = &first.Vnode
+
+	// Take the second vnode out of the transport
+	local := ring.Transport.(*chord.LocalTransport)
+	local.Deregister(&second.Vnode)
+
+	// An error is expected
+	err := first.NotifySuccessor()
+	if err == nil {
+		t.Fatalf("expected err!")
+	}
+}
+
+// TestVnodeNotifySamePred checks that Notify called with the vnode that is
+// already the predecessor returns the receiver's successor list and keeps
+// the predecessor unchanged.
+func TestVnodeNotifySamePred(t *testing.T) {
+	ring := makeRing()
+	sort.Sort(ring)
+
+	far1 := &chord.Vnode{Id: []byte{1}}
+	far2 := &chord.Vnode{Id: []byte{2}}
+	far3 := &chord.Vnode{Id: []byte{3}}
+
+	first, second := ring.Vnodes[0], ring.Vnodes[1]
+	first.Successors[0] = &second.Vnode
+	second.Predecessor = &first.Vnode
+	second.Successors[0] = far1
+	second.Successors[1] = far2
+	second.Successors[2] = far3
+
+	list, err := second.Notify(&first.Vnode)
+	if err != nil {
+		t.Fatalf("unexpected error! %s", err)
+	}
+	if list[0] != far1 {
+		t.Fatalf("unexpected succ 0")
+	}
+	if list[1] != far2 {
+		t.Fatalf("unexpected succ 1")
+	}
+	if list[2] != far3 {
+		t.Fatalf("unexpected succ 2")
+	}
+	if second.Predecessor != &first.Vnode {
+		t.Fatalf("unexpected pred")
+	}
+}
+
+// TestVnodeNotifyNoPred checks that Notify on a vnode whose predecessor was
+// not set by the test returns the successor list and records the notifier
+// as the predecessor.
+func TestVnodeNotifyNoPred(t *testing.T) {
+	ring := makeRing()
+	sort.Sort(ring)
+
+	far1 := &chord.Vnode{Id: []byte{1}}
+	far2 := &chord.Vnode{Id: []byte{2}}
+	far3 := &chord.Vnode{Id: []byte{3}}
+
+	first, second := ring.Vnodes[0], ring.Vnodes[1]
+	second.Successors[0] = far1
+	second.Successors[1] = far2
+	second.Successors[2] = far3
+
+	list, err := second.Notify(&first.Vnode)
+	if err != nil {
+		t.Fatalf("unexpected error! %s", err)
+	}
+	if list[0] != far1 {
+		t.Fatalf("unexpected succ 0")
+	}
+	if list[1] != far2 {
+		t.Fatalf("unexpected succ 1")
+	}
+	if list[2] != far3 {
+		t.Fatalf("unexpected succ 2")
+	}
+	if second.Predecessor != &first.Vnode {
+		t.Fatalf("unexpected pred")
+	}
+}
+
+// TestVnodeNotifyNewPred checks that Notify replaces the current
+// predecessor with the notifier when the notifier sits between the old
+// predecessor and the receiver in the sorted ring.
+func TestVnodeNotifyNewPred(t *testing.T) {
+	ring := makeRing()
+	sort.Sort(ring)
+
+	first, second, third := ring.Vnodes[0], ring.Vnodes[1], ring.Vnodes[2]
+	third.Predecessor = &first.Vnode
+
+	if _, err := third.Notify(&second.Vnode); err != nil {
+		t.Fatalf("unexpected error! %s", err)
+	}
+	if third.Predecessor != &second.Vnode {
+		t.Fatalf("unexpected pred")
+	}
+}
+
+// TestVnodeFixFinger links the ring's vnodes into a cycle and checks two
+// consecutive FixFingerTable calls on the first vnode: the first must
+// advance LastFinger to 158 with every earlier finger pointing at the
+// primary successor, the second must bring LastFinger back to 0.
+func TestVnodeFixFinger(t *testing.T) {
+	ring := makeRing()
+	sort.Sort(ring)
+	count := len(ring.Vnodes)
+	for idx := 0; idx < count; idx++ {
+		ring.Vnodes[idx].Init(idx)
+		ring.Vnodes[idx].Successors[0] = &ring.Vnodes[(idx+1)%count].Vnode
+	}
+
+	// The first pass must succeed
+	node := ring.Vnodes[0]
+	err := node.FixFingerTable()
+	if err != nil {
+		t.Fatalf("unexpected err, %s", err)
+	}
+
+	// The finger cursor must have advanced
+	if node.LastFinger != 158 {
+		t.Fatalf("unexpected last finger! %d", node.LastFinger)
+	}
+
+	// Every entry before the cursor must be the primary successor
+	for f := 0; f < node.LastFinger; f++ {
+		if node.Finger[f] != node.Successors[0] {
+			t.Fatalf("unexpected finger entry!")
+		}
+	}
+
+	// The second pass must succeed and reset the cursor
+	err = node.FixFingerTable()
+	if err != nil {
+		t.Fatalf("unexpected err, %s", err)
+	}
+	if node.LastFinger != 0 {
+		t.Fatalf("unexpected last finger! %d", node.LastFinger)
+	}
+}
+
+// TestVnodeCheckPredNoPred checks that CheckPredecessor returns no error on
+// a freshly initialised vnode.
+func TestVnodeCheckPredNoPred(t *testing.T) {
+	node := makeVnode()
+	node.Init(0)
+
+	err := node.CheckPredecessor()
+	if err != nil {
+		t.Fatalf("unpexected err! %s", err)
+	}
+}
+
+// TestVnodeCheckLivePred checks that CheckPredecessor keeps a predecessor
+// that is still registered with the transport.
+func TestVnodeCheckLivePred(t *testing.T) {
+	ring := makeRing()
+	sort.Sort(ring)
+
+	first, second := ring.Vnodes[0], ring.Vnodes[1]
+	second.Predecessor = &first.Vnode
+
+	err := second.CheckPredecessor()
+	if err != nil {
+		t.Fatalf("unexpected error! %s", err)
+	}
+	if second.Predecessor != &first.Vnode {
+		t.Fatalf("unexpected pred")
+	}
+}
+
+// TestVnodeCheckDeadPred checks that CheckPredecessor clears a predecessor
+// that has been deregistered from the transport, without returning an
+// error.
+func TestVnodeCheckDeadPred(t *testing.T) {
+	ring := makeRing()
+	sort.Sort(ring)
+
+	first, second := ring.Vnodes[0], ring.Vnodes[1]
+	second.Predecessor = &first.Vnode
+
+	// Take the first vnode out of the transport
+	local := ring.Transport.(*chord.LocalTransport)
+	local.Deregister(&first.Vnode)
+
+	err := second.CheckPredecessor()
+	if err != nil {
+		t.Fatalf("unexpected error! %s", err)
+	}
+	if second.Predecessor != nil {
+		t.Fatalf("unexpected pred")
+	}
+}
+
+// TestVnodeFindSuccessors links the ring's vnodes into a cycle with one
+// successor each and checks that a lookup from every vnode returns the
+// primary successor of the vnode nearest to the key.
+func TestVnodeFindSuccessors(t *testing.T) {
+	ring := makeRing()
+	sort.Sort(ring)
+	count := len(ring.Vnodes)
+	for idx := 0; idx < count; idx++ {
+		ring.Vnodes[idx].Successors[0] = &ring.Vnodes[(idx+1)%count].Vnode
+	}
+
+	// Hash a fixed string to obtain the lookup key
+	hasher := ring.Config.HashFunc()
+	hasher.Write([]byte("test"))
+	key := hasher.Sum(nil)
+
+	// Expected answer: the primary successor of the nearest vnode
+	want := ring.NearestVnode(key).Successors[0]
+
+	// Look the key up starting from every vnode
+	for _, node := range ring.Vnodes {
+		found, err := node.FindSuccessors(1, key)
+		if err != nil {
+			t.Fatalf("unexpected err! %s", err)
+		}
+
+		// The starting vnode must not change the answer
+		if found[0] != want {
+			t.Fatalf("unexpected succ! K:%x Exp: %s Got:%s",
+				key, want, found[0])
+		}
+	}
+}
+
+// TestVnodeFindSuccessorsMultSucc repeats the successor lookup check with
+// three successors configured on every vnode.
+func TestVnodeFindSuccessorsMultSucc(t *testing.T) {
+	ring := makeRing()
+	sort.Sort(ring)
+	count := len(ring.Vnodes)
+	for idx := 0; idx < count; idx++ {
+		node := ring.Vnodes[idx]
+		node.Successors[0] = &ring.Vnodes[(idx+1)%count].Vnode
+		node.Successors[1] = &ring.Vnodes[(idx+2)%count].Vnode
+		node.Successors[2] = &ring.Vnodes[(idx+3)%count].Vnode
+	}
+
+	// Hash a fixed string to obtain the lookup key
+	hasher := ring.Config.HashFunc()
+	hasher.Write([]byte("test"))
+	key := hasher.Sum(nil)
+
+	// Expected answer: the primary successor of the nearest vnode
+	want := ring.NearestVnode(key).Successors[0]
+
+	// Look the key up starting from every vnode
+	for _, node := range ring.Vnodes {
+		found, err := node.FindSuccessors(1, key)
+		if err != nil {
+			t.Fatalf("unexpected err! %s", err)
+		}
+
+		// The starting vnode must not change the answer
+		if found[0] != want {
+			t.Fatalf("unexpected succ! K:%x Exp: %s Got:%s",
+				key, want, found[0])
+		}
+	}
+}
+
+// TestVnodeFindSuccessorsSomeDead repeats the successor lookup check with
+// two successors per vnode after deregistering vnodes 0 and 3 from the
+// transport.
+func TestVnodeFindSuccessorsSomeDead(t *testing.T) {
+	ring := makeRing()
+	sort.Sort(ring)
+	count := len(ring.Vnodes)
+	for idx := 0; idx < count; idx++ {
+		node := ring.Vnodes[idx]
+		node.Successors[0] = &ring.Vnodes[(idx+1)%count].Vnode
+		node.Successors[1] = &ring.Vnodes[(idx+2)%count].Vnode
+	}
+
+	// Take two vnodes out of the transport
+	(ring.Transport.(*chord.LocalTransport)).Deregister(&ring.Vnodes[0].Vnode)
+	(ring.Transport.(*chord.LocalTransport)).Deregister(&ring.Vnodes[3].Vnode)
+
+	// Hash a fixed string to obtain the lookup key
+	hasher := ring.Config.HashFunc()
+	hasher.Write([]byte("test"))
+	key := hasher.Sum(nil)
+
+	// Expected answer: the primary successor of the nearest vnode
+	want := ring.NearestVnode(key).Successors[0]
+
+	// Look the key up starting from every vnode
+	for idx, node := range ring.Vnodes {
+		found, err := node.FindSuccessors(1, key)
+		if err != nil {
+			t.Fatalf("(%d) unexpected err! %s", idx, err)
+		}
+
+		// The starting vnode must not change the answer
+		if found[0] != want {
+			t.Fatalf("(%d) unexpected succ! K:%x Exp: %s Got:%s",
+				idx, key, want, found[0])
+		}
+	}
+}
+
+// TestVnodeClearPred checks that ClearPredecessor removes the predecessor
+// only when it is called with the vnode currently stored there.
+func TestVnodeClearPred(t *testing.T) {
+	node := makeVnode()
+	node.Init(0)
+
+	// Clearing with the stored predecessor removes it.
+	current := &chord.Vnode{Id: []byte{12}}
+	node.Predecessor = current
+	node.ClearPredecessor(current)
+	if node.Predecessor != nil {
+		t.Fatalf("expect no predecessor!")
+	}
+
+	// Clearing with a different vnode leaves the stored one in place.
+	other := &chord.Vnode{Id: []byte{14}}
+	node.Predecessor = current
+	node.ClearPredecessor(other)
+	if node.Predecessor != current {
+		t.Fatalf("expect p predecessor!")
+	}
+}
+
+// TestVnodeSkipSucc checks that SkipSuccessor ignores a vnode that is not
+// the primary successor and, for the primary successor, shifts the list so
+// that the next entry takes its place.
+func TestVnodeSkipSucc(t *testing.T) {
+	node := makeVnode()
+	node.Init(0)
+
+	primary := &chord.Vnode{Id: []byte{10}}
+	backup := &chord.Vnode{Id: []byte{11}}
+	spare := &chord.Vnode{Id: []byte{12}}
+
+	node.Successors[0] = primary
+	node.Successors[1] = backup
+	node.Successors[2] = spare
+
+	// Skipping a non-primary successor changes nothing
+	err := node.SkipSuccessor(backup)
+	if err != nil {
+		t.Fatalf("unexpected err")
+	}
+	if node.Successors[0] != primary {
+		t.Fatalf("unexpected suc")
+	}
+
+	// Skipping the primary successor promotes the next one
+	err = node.SkipSuccessor(primary)
+	if err != nil {
+		t.Fatalf("unexpected err")
+	}
+	if node.Successors[0] != backup {
+		t.Fatalf("unexpected suc")
+	}
+	if known := node.KnownSuccessors(); known != 2 {
+		t.Fatalf("bad num of suc")
+	}
+}
+
+// TestVnodeLeave wires every vnode with a predecessor and two successors,
+// makes vnode 0 leave, and checks that vnode 4 now points at vnode 1 and
+// that vnode 1 no longer has a predecessor.
+func TestVnodeLeave(t *testing.T) {
+	ring := makeRing()
+	sort.Sort(ring)
+	count := len(ring.Vnodes)
+	for idx := 0; idx < count; idx++ {
+		node := ring.Vnodes[idx]
+		node.Predecessor = &ring.Vnodes[(idx+count-1)%count].Vnode
+		node.Successors[0] = &ring.Vnodes[(idx+1)%count].Vnode
+		node.Successors[1] = &ring.Vnodes[(idx+2)%count].Vnode
+	}
+
+	// Vnode 0 leaves the ring
+	err := ring.Vnodes[0].Leave()
+	if err != nil {
+		t.Fatalf("unexpected err")
+	}
+
+	if ring.Vnodes[4].Successors[0] != &ring.Vnodes[1].Vnode {
+		t.Fatalf("unexpected suc!")
+	}
+	if ring.Vnodes[1].Predecessor != nil {
+		t.Fatalf("unexpected pred!")
+	}
+}

+ 117 - 0
chord/iter_closest.go

@@ -0,0 +1,117 @@
+package chord
+
+import (
+	"math/big"
+)
+
+// ClosestPreceedingVnodeIterator holds the state for walking a local
+// vnode's successor list and finger table, from the highest index down, in
+// search of vnodes whose id lies between the local vnode's id and a key.
+// Call Init before the first call to Next.
+type ClosestPreceedingVnodeIterator struct {
+	// key is the key being looked up.
+	key           []byte
+	// vn is the local vnode whose Successors and Finger are scanned.
+	vn            *LocalVnode
+	// finger_idx is the finger table index at which the next scan starts.
+	finger_idx    int
+	// successor_idx is the successor list index at which the next scan starts.
+	successor_idx int
+	// yielded records the String() of every vnode already returned by Next.
+	yielded       map[string]struct{}
+}
+
+// Init prepares the iterator to scan vn for vnodes preceding key. Both scan
+// positions start at the last index of their table and the set of already
+// yielded vnodes is reset.
+func (cp *ClosestPreceedingVnodeIterator) Init(vn *LocalVnode, key []byte) {
+	cp.vn, cp.key = vn, key
+	cp.finger_idx = len(vn.Finger) - 1
+	cp.successor_idx = len(vn.Successors) - 1
+	cp.yielded = map[string]struct{}{}
+}
+
+// Next returns the next candidate vnode, or nil when neither table has any
+// candidate left. A candidate is a non-nil entry that has not been yielded
+// before and whose id satisfies Between(vn.Id, key, id). When both the
+// successor list and the finger table offer a candidate, the one picked by
+// ClosestPreceedingVnode is returned and only that table's scan position is
+// advanced past it.
+func (cp *ClosestPreceedingVnodeIterator) Next() *Vnode {
+	// Try to find each node
+	var successor_node *Vnode
+	var finger_node *Vnode
+
+	// Scan to find the next successor
+	vn := cp.vn
+	var i int
+	for i = cp.successor_idx; i >= 0; i-- {
+		if vn.Successors[i] == nil {
+			continue
+		}
+		if _, ok := cp.yielded[vn.Successors[i].String()]; ok {
+			continue
+		}
+		if Between(vn.Id, cp.key, vn.Successors[i].Id) {
+			successor_node = vn.Successors[i]
+			break
+		}
+	}
+	// i is the index of the candidate found, or -1 when the scan ran out.
+	cp.successor_idx = i
+
+	// Scan to find the next Finger
+	for i = cp.finger_idx; i >= 0; i-- {
+		if vn.Finger[i] == nil {
+			continue
+		}
+		if _, ok := cp.yielded[vn.Finger[i].String()]; ok {
+			continue
+		}
+		if Between(vn.Id, cp.key, vn.Finger[i].Id) {
+			finger_node = vn.Finger[i]
+			break
+		}
+	}
+	// Same convention as above: index of the candidate, or -1.
+	cp.finger_idx = i
+
+	// Determine which node is better
+	if successor_node != nil && finger_node != nil {
+		// Determine the closer node
+		hb := cp.vn.Ring.Config.HashBits
+		closest := ClosestPreceedingVnode(successor_node,
+			finger_node, cp.key, hb)
+		// Only step past the table that supplied the winner; the other
+		// table keeps its position, so its candidate is found again by the
+		// next call.
+		if closest == successor_node {
+			cp.successor_idx--
+		} else {
+			cp.finger_idx--
+		}
+		cp.yielded[closest.String()] = struct{}{}
+		return closest
+
+	} else if successor_node != nil {
+		cp.successor_idx--
+		cp.yielded[successor_node.String()] = struct{}{}
+		return successor_node
+
+	} else if finger_node != nil {
+		cp.finger_idx--
+		cp.yielded[finger_node.String()] = struct{}{}
+		return finger_node
+	}
+
+	// Neither table produced a candidate.
+	return nil
+}
+
+// ClosestPreceedingVnode returns whichever of a and b has the smaller
+// forward Distance to key on a ring of 2^bits ids. When both distances are
+// equal, a is returned.
+func ClosestPreceedingVnode(a, b *Vnode, key []byte, bits int) *Vnode {
+	if Distance(a.Id, key, bits).Cmp(Distance(b.Id, key, bits)) > 0 {
+		return b
+	}
+	return a
+}
+
+// Computes the forward Distance from a to b modulus a ring size
+func Distance(a, b []byte, bits int) *big.Int {
+	// Get the ring size
+	var ring big.Int
+	ring.Exp(big.NewInt(2), big.NewInt(int64(bits)), nil)
+
+	// Convert to int
+	var a_int, b_int big.Int
+	(&a_int).SetBytes(a)
+	(&b_int).SetBytes(b)
+
+	// Compute the distances
+	var dist big.Int
+	(&dist).Sub(&b_int, &a_int)
+
+	// Distance modulus ring size
+	(&dist).Mod(&dist, &ring)
+	return &dist
+}

+ 824 - 0
chord/net.go

@@ -0,0 +1,824 @@
+package chord
+
+import (
+	"encoding/gob"
+	"fmt"
+	"log"
+	"net"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+/*
+TCPTransport provides a TCP based Chord transport layer. This allows Chord
+to be implemented over a network, instead of only using the LocalTransport. It is
+meant to be a simple implementation, optimizing for simplicity instead of performance.
+Messages are sent with a header frame, followed by a body frame. All data is encoded
+using the GOB format for simplicity.
+
+Internally, there is 1 Goroutine listening for inbound connections, 1 Goroutine PER
+inbound connection.
+
+Outbound connections are cached per host in a pool and reused between requests.
+*/
+type TCPTransport struct {
+	// sock is the listening socket created by InitTCPTransport.
+	sock     *net.TCPListener
+	// timeout bounds both dialing a host and waiting for a command's reply.
+	timeout  time.Duration
+	// maxIdle is the maximum age of a connection (set to 300 seconds).
+	maxIdle  time.Duration
+	// lock guards local; Shutdown also read-locks it while walking inbound.
+	lock     sync.RWMutex
+	// local maps Vnode.String() to the RPC handler registered for it.
+	local    map[string]*localRPC
+	// inbound is the set of accepted connections.
+	inbound  map[*net.TCPConn]struct{}
+	// poolLock guards pool.
+	poolLock sync.Mutex
+	// pool holds idle outbound connections, keyed by host.
+	pool     map[string][]*tcpOutConn
+	// shutdown is set to 1 (atomically) once Shutdown has been called.
+	shutdown int32
+}
+
+// tcpOutConn is one outbound connection together with the gob encoder and
+// decoder bound to it. Instances are created by getConn and kept in the
+// transport's pool between requests.
+type tcpOutConn struct {
+	// host is the address that was dialed; it is also the pool key.
+	host   string
+	// sock is the underlying TCP connection.
+	sock   *net.TCPConn
+	// header is reused for every request sent on this connection.
+	header tcpHeader
+	// enc writes gob frames to sock.
+	enc    *gob.Encoder
+	// dec reads gob frames from sock.
+	dec    *gob.Decoder
+	// used is the creation time, refreshed whenever the connection is
+	// returned to the pool.
+	used   time.Time
+}
+
+// Request types carried in tcpHeader.ReqType, numbered from 0 via iota.
+// Each one is sent by the TCPTransport method named in its comment.
+const (
+	// Ping
+	tcpPing = iota
+	// ListVnodes
+	tcpListReq
+	// GetPredecessor
+	tcpGetPredReq
+	// Notify
+	tcpNotifyReq
+	// FindSuccessors
+	tcpFindSucReq
+	// ClearPredecessor
+	tcpClearPredReq
+	// SkipSuccessor
+	tcpSkipSucReq
+)
+
+// tcpHeader is the frame sent ahead of every request body; ReqType holds
+// one of the tcp* request type constants.
+type tcpHeader struct {
+	ReqType int
+}
+
+// Potential body types
+//
+// NOTE(review): several bodies carry an interface-typed Err field; gob only
+// transmits interface values whose concrete type has been registered —
+// confirm the error types in use are registered with gob.
+
+// tcpBodyError is the reply to ClearPredecessor and SkipSuccessor requests.
+type tcpBodyError struct {
+	Err error
+}
+
+// tcpBodyString is the request body of ListVnodes; S carries the host.
+type tcpBodyString struct {
+	S string
+}
+
+// tcpBodyVnode is the request body of Ping and GetPredecessor.
+type tcpBodyVnode struct {
+	Vn *Vnode
+}
+
+// tcpBodyTwoVnode is the request body of Notify, ClearPredecessor and
+// SkipSuccessor: Target is the vnode addressed, Vn the calling vnode.
+type tcpBodyTwoVnode struct {
+	Target *Vnode
+	Vn     *Vnode
+}
+
+// tcpBodyFindSuc is the request body of FindSuccessors.
+type tcpBodyFindSuc struct {
+	Target *Vnode
+	Num    int
+	Key    []byte
+}
+
+// tcpBodyVnodeError is the reply to a GetPredecessor request.
+type tcpBodyVnodeError struct {
+	Vnode *Vnode
+	Err   error
+}
+
+// tcpBodyVnodeListError is the reply to ListVnodes, Notify and
+// FindSuccessors requests.
+type tcpBodyVnodeListError struct {
+	Vnodes []*Vnode
+	Err    error
+}
+
+// tcpBodyBoolError is the reply to a Ping request.
+type tcpBodyBoolError struct {
+	B   bool
+	Err error
+}
+
+// InitTCPTransport starts listening for TCP connections on the listen
+// address and returns a transport that uses timeout for dialing and for
+// waiting on replies. Two goroutines are started before returning: one
+// running listen and one running reapOld. An error is returned when the
+// listener cannot be opened.
+func InitTCPTransport(listen string, timeout time.Duration) (*TCPTransport, error) {
+	listener, err := net.Listen("tcp", listen)
+	if err != nil {
+		return nil, err
+	}
+
+	tcp := &TCPTransport{
+		sock:    listener.(*net.TCPListener),
+		timeout: timeout,
+		// Maximum age of a connection
+		maxIdle: 300 * time.Second,
+		local:   make(map[string]*localRPC),
+		inbound: make(map[*net.TCPConn]struct{}),
+		pool:    make(map[string][]*tcpOutConn),
+	}
+
+	// Accept inbound connections and reap old ones in the background
+	go tcp.listen()
+	go tcp.reapOld()
+
+	return tcp, nil
+}
+
+// get looks up the RPC handler registered for vn. The second result reports
+// whether a handler was found; when it is false the first result is nil.
+func (t *TCPTransport) get(vn *Vnode) (VnodeRPC, bool) {
+	key := vn.String()
+
+	t.lock.RLock()
+	defer t.lock.RUnlock()
+
+	if entry, found := t.local[key]; found {
+		return entry.obj, true
+	}
+	return nil, false
+}
+
+// getConn returns an outbound connection to host. The most recently pooled
+// connection for that host is reused when it passes a read check;
+// otherwise a new connection is dialed with t.timeout. An error is
+// returned once the transport has been shut down or when dialing fails.
+func (t *TCPTransport) getConn(host string) (*tcpOutConn, error) {
+	// Check if we have a conn cached
+	var out *tcpOutConn
+	t.poolLock.Lock()
+	if atomic.LoadInt32(&t.shutdown) == 1 {
+		t.poolLock.Unlock()
+		return nil, fmt.Errorf("TCP transport is shutdown")
+	}
+	list, ok := t.pool[host]
+	if ok && len(list) > 0 {
+		// Pop the last entry of the host's list.
+		out = list[len(list)-1]
+		list = list[:len(list)-1]
+		t.pool[host] = list
+	}
+	t.poolLock.Unlock()
+	if out != nil {
+		// Verify that the socket is valid. Might be closed.
+		// NOTE(review): a read into a nil buffer may not notice a
+		// connection closed by the peer — confirm this check is effective.
+		if _, err := out.sock.Read(nil); err == nil {
+			return out, nil
+		}
+		out.sock.Close()
+	}
+
+	// Try to establish a connection
+	conn, err := net.DialTimeout("tcp", host, t.timeout)
+	if err != nil {
+		return nil, err
+	}
+
+	// Setup the socket
+	sock := conn.(*net.TCPConn)
+	t.setupConn(sock)
+	enc := gob.NewEncoder(sock)
+	dec := gob.NewDecoder(sock)
+	now := time.Now()
+
+	// Wrap the sock
+	out = &tcpOutConn{host: host, sock: sock, enc: enc, dec: dec, used: now}
+	return out, nil
+}
+
+// returnConn hands an outbound connection back to the pool after stamping
+// it with the current time. If the transport has been shut down the
+// connection is closed instead of being pooled.
+func (t *TCPTransport) returnConn(o *tcpOutConn) {
+	// Record when the connection was last used
+	o.used = time.Now()
+
+	t.poolLock.Lock()
+	defer t.poolLock.Unlock()
+
+	// Do not pool anything once shutdown has started
+	if atomic.LoadInt32(&t.shutdown) == 1 {
+		o.sock.Close()
+		return
+	}
+
+	// Append to the host's list; a missing entry behaves as an empty list
+	t.pool[o.host] = append(t.pool[o.host], o)
+}
+
+// setupConn applies the socket options used for every connection: Nagle's
+// algorithm is disabled and TCP keep-alives are enabled. Errors from
+// setting the options are ignored.
+func (t *TCPTransport) setupConn(c *net.TCPConn) {
+	_ = c.SetNoDelay(true)
+	_ = c.SetKeepAlive(true)
+}
+
+// ListVnodes gets a list of the Vnodes on the box identified by host.
+//
+// The request runs in its own goroutine so that the call can give up after
+// t.timeout. It returns the decoded vnode list, or an error when no
+// connection can be obtained, when encoding or decoding fails, when the
+// remote side reports an error, or when the timeout elapses first. The
+// connection goes back to the pool only after a reply was decoded.
+func (t *TCPTransport) ListVnodes(host string) ([]*Vnode, error) {
+	// Get a conn
+	out, err := t.getConn(host)
+	if err != nil {
+		return nil, err
+	}
+
+	// Response channels; buffered so the goroutine never blocks on send,
+	// even after the caller has timed out.
+	respChan := make(chan []*Vnode, 1)
+	errChan := make(chan error, 1)
+
+	go func() {
+		// Send a list command
+		out.header.ReqType = tcpListReq
+		body := tcpBodyString{S: host}
+		if err := out.enc.Encode(&out.header); err != nil {
+			errChan <- err
+			return
+		}
+		if err := out.enc.Encode(&body); err != nil {
+			errChan <- err
+			return
+		}
+
+		// Read in the response
+		resp := tcpBodyVnodeListError{}
+		if err := out.dec.Decode(&resp); err != nil {
+			errChan <- err
+			// Stop here: the connection is in an unknown state and must
+			// not be pooled, and no result may be queued next to the error.
+			return
+		}
+
+		// Return the connection
+		t.returnConn(out)
+		if resp.Err == nil {
+			respChan <- resp.Vnodes
+		} else {
+			errChan <- resp.Err
+		}
+	}()
+
+	select {
+	case <-time.After(t.timeout):
+		return nil, fmt.Errorf("Command timed out!")
+	case err := <-errChan:
+		return nil, err
+	case res := <-respChan:
+		return res, nil
+	}
+}
+
+// Ping a Vnode, check for liveness.
+//
+// The request runs in its own goroutine so that the call can give up after
+// t.timeout. It returns the boolean from the reply, or false and an error
+// when no connection can be obtained, when encoding or decoding fails,
+// when the remote side reports an error, or when the timeout elapses
+// first. The connection goes back to the pool only after a reply was
+// decoded.
+func (t *TCPTransport) Ping(vn *Vnode) (bool, error) {
+	// Get a conn
+	out, err := t.getConn(vn.Host)
+	if err != nil {
+		return false, err
+	}
+
+	// Response channels; buffered so the goroutine never blocks on send,
+	// even after the caller has timed out.
+	respChan := make(chan bool, 1)
+	errChan := make(chan error, 1)
+
+	go func() {
+		// Send a ping command
+		out.header.ReqType = tcpPing
+		body := tcpBodyVnode{Vn: vn}
+		if err := out.enc.Encode(&out.header); err != nil {
+			errChan <- err
+			return
+		}
+		if err := out.enc.Encode(&body); err != nil {
+			errChan <- err
+			return
+		}
+
+		// Read in the response
+		resp := tcpBodyBoolError{}
+		if err := out.dec.Decode(&resp); err != nil {
+			errChan <- err
+			return
+		}
+
+		// Return the connection
+		t.returnConn(out)
+		if resp.Err == nil {
+			respChan <- resp.B
+		} else {
+			errChan <- resp.Err
+		}
+	}()
+
+	select {
+	case <-time.After(t.timeout):
+		return false, fmt.Errorf("Command timed out!")
+	case err := <-errChan:
+		return false, err
+	case res := <-respChan:
+		return res, nil
+	}
+}
+
+// Request a nodes predecessor.
+//
+// The request runs in its own goroutine so that the call can give up after
+// t.timeout. It returns the vnode from the reply, or an error when no
+// connection can be obtained, when encoding or decoding fails, when the
+// remote side reports an error, or when the timeout elapses first. The
+// connection goes back to the pool only after a reply was decoded.
+func (t *TCPTransport) GetPredecessor(vn *Vnode) (*Vnode, error) {
+	// Get a conn
+	out, err := t.getConn(vn.Host)
+	if err != nil {
+		return nil, err
+	}
+
+	// Buffered so the goroutine never blocks on send, even after the
+	// caller has timed out.
+	respChan := make(chan *Vnode, 1)
+	errChan := make(chan error, 1)
+
+	go func() {
+		// Send a get-predecessor command
+		out.header.ReqType = tcpGetPredReq
+		body := tcpBodyVnode{Vn: vn}
+		if err := out.enc.Encode(&out.header); err != nil {
+			errChan <- err
+			return
+		}
+		if err := out.enc.Encode(&body); err != nil {
+			errChan <- err
+			return
+		}
+
+		// Read in the response
+		resp := tcpBodyVnodeError{}
+		if err := out.dec.Decode(&resp); err != nil {
+			errChan <- err
+			return
+		}
+
+		// Return the connection
+		t.returnConn(out)
+		if resp.Err == nil {
+			respChan <- resp.Vnode
+		} else {
+			errChan <- resp.Err
+		}
+	}()
+
+	select {
+	case <-time.After(t.timeout):
+		return nil, fmt.Errorf("Command timed out!")
+	case err := <-errChan:
+		return nil, err
+	case res := <-respChan:
+		return res, nil
+	}
+}
+
+// Notify our successor of ourselves.
+//
+// target is the vnode being notified and self the vnode announcing itself.
+// The request runs in its own goroutine so that the call can give up after
+// t.timeout. It returns the vnode list from the reply, or an error when no
+// connection can be obtained, when encoding or decoding fails, when the
+// remote side reports an error, or when the timeout elapses first. The
+// connection goes back to the pool only after a reply was decoded.
+func (t *TCPTransport) Notify(target, self *Vnode) ([]*Vnode, error) {
+	// Get a conn
+	out, err := t.getConn(target.Host)
+	if err != nil {
+		return nil, err
+	}
+
+	// Buffered so the goroutine never blocks on send, even after the
+	// caller has timed out.
+	respChan := make(chan []*Vnode, 1)
+	errChan := make(chan error, 1)
+
+	go func() {
+		// Send a notify command
+		out.header.ReqType = tcpNotifyReq
+		body := tcpBodyTwoVnode{Target: target, Vn: self}
+		if err := out.enc.Encode(&out.header); err != nil {
+			errChan <- err
+			return
+		}
+		if err := out.enc.Encode(&body); err != nil {
+			errChan <- err
+			return
+		}
+
+		// Read in the response
+		resp := tcpBodyVnodeListError{}
+		if err := out.dec.Decode(&resp); err != nil {
+			errChan <- err
+			return
+		}
+
+		// Return the connection
+		t.returnConn(out)
+		if resp.Err == nil {
+			respChan <- resp.Vnodes
+		} else {
+			errChan <- resp.Err
+		}
+	}()
+
+	select {
+	case <-time.After(t.timeout):
+		return nil, fmt.Errorf("Command timed out!")
+	case err := <-errChan:
+		return nil, err
+	case res := <-respChan:
+		return res, nil
+	}
+}
+
+// Find a successor.
+//
+// The request asks vn for n successors of key k. It runs in its own
+// goroutine so that the call can give up after t.timeout. It returns the
+// vnode list from the reply, or an error when no connection can be
+// obtained, when encoding or decoding fails, when the remote side reports
+// an error, or when the timeout elapses first. The connection goes back to
+// the pool only after a reply was decoded.
+func (t *TCPTransport) FindSuccessors(vn *Vnode, n int, k []byte) ([]*Vnode, error) {
+	// Get a conn
+	out, err := t.getConn(vn.Host)
+	if err != nil {
+		return nil, err
+	}
+
+	// Buffered so the goroutine never blocks on send, even after the
+	// caller has timed out.
+	respChan := make(chan []*Vnode, 1)
+	errChan := make(chan error, 1)
+
+	go func() {
+		// Send a find-successors command
+		out.header.ReqType = tcpFindSucReq
+		body := tcpBodyFindSuc{Target: vn, Num: n, Key: k}
+		if err := out.enc.Encode(&out.header); err != nil {
+			errChan <- err
+			return
+		}
+		if err := out.enc.Encode(&body); err != nil {
+			errChan <- err
+			return
+		}
+
+		// Read in the response
+		resp := tcpBodyVnodeListError{}
+		if err := out.dec.Decode(&resp); err != nil {
+			errChan <- err
+			return
+		}
+
+		// Return the connection
+		t.returnConn(out)
+		if resp.Err == nil {
+			respChan <- resp.Vnodes
+		} else {
+			errChan <- resp.Err
+		}
+	}()
+
+	select {
+	case <-time.After(t.timeout):
+		return nil, fmt.Errorf("Command timed out!")
+	case err := <-errChan:
+		return nil, err
+	case res := <-respChan:
+		return res, nil
+	}
+}
+
+// Clears a predecessor if it matches a given vnode. Used to leave.
+//
+// target is the vnode addressed and self the vnode sent along with the
+// request. The request runs in its own goroutine so that the call can give
+// up after t.timeout. It returns nil when the reply carries no error;
+// otherwise it returns the connection, encoding, decoding, remote or
+// timeout error. The connection goes back to the pool only after a reply
+// was decoded.
+func (t *TCPTransport) ClearPredecessor(target, self *Vnode) error {
+	// Get a conn
+	out, err := t.getConn(target.Host)
+	if err != nil {
+		return err
+	}
+
+	// Buffered so the goroutine never blocks on send, even after the
+	// caller has timed out.
+	respChan := make(chan bool, 1)
+	errChan := make(chan error, 1)
+
+	go func() {
+		// Send a clear-predecessor command
+		out.header.ReqType = tcpClearPredReq
+		body := tcpBodyTwoVnode{Target: target, Vn: self}
+		if err := out.enc.Encode(&out.header); err != nil {
+			errChan <- err
+			return
+		}
+		if err := out.enc.Encode(&body); err != nil {
+			errChan <- err
+			return
+		}
+
+		// Read in the response
+		resp := tcpBodyError{}
+		if err := out.dec.Decode(&resp); err != nil {
+			errChan <- err
+			return
+		}
+
+		// Return the connection
+		t.returnConn(out)
+		if resp.Err == nil {
+			// The value is only a success signal.
+			respChan <- true
+		} else {
+			errChan <- resp.Err
+		}
+	}()
+
+	select {
+	case <-time.After(t.timeout):
+		return fmt.Errorf("Command timed out!")
+	case err := <-errChan:
+		return err
+	case <-respChan:
+		return nil
+	}
+}
+
+// SkipSuccessor instructs the node hosting target to skip the successor
+// self. Used to leave.
+//
+// The exchange runs in a helper goroutine so the caller can give up after
+// t.timeout. A connection on which the exchange failed is closed instead of
+// being handed back via returnConn: it was previously left open and
+// unreferenced, leaking the socket.
+func (t *TCPTransport) SkipSuccessor(target, self *Vnode) error {
+	// Get a conn
+	out, err := t.getConn(target.Host)
+	if err != nil {
+		return err
+	}
+
+	// Buffered so the goroutine never blocks if the caller already timed out.
+	respChan := make(chan bool, 1)
+	errChan := make(chan error, 1)
+
+	go func() {
+		// fail discards the broken connection and reports the error.
+		fail := func(err error) {
+			out.sock.Close()
+			errChan <- err
+		}
+
+		// Send a skip-successor command
+		out.header.ReqType = tcpSkipSucReq
+		body := tcpBodyTwoVnode{Target: target, Vn: self}
+		if err := out.enc.Encode(&out.header); err != nil {
+			fail(err)
+			return
+		}
+		if err := out.enc.Encode(&body); err != nil {
+			fail(err)
+			return
+		}
+
+		// Read in the response
+		resp := tcpBodyError{}
+		if err := out.dec.Decode(&resp); err != nil {
+			fail(err)
+			return
+		}
+
+		// Return the connection
+		t.returnConn(out)
+		if resp.Err == nil {
+			respChan <- true
+		} else {
+			errChan <- resp.Err
+		}
+	}()
+
+	select {
+	case <-time.After(t.timeout):
+		return fmt.Errorf("Command timed out!")
+	case err := <-errChan:
+		return err
+	case <-respChan:
+		return nil
+	}
+}
+
+// Register stores the RPC handler o for vnode v in the transport's local
+// table, keyed by the vnode's string form.
+func (t *TCPTransport) Register(v *Vnode, o VnodeRPC) {
+	id := v.String()
+
+	t.lock.Lock()
+	defer t.lock.Unlock()
+	t.local[id] = &localRPC{v, o}
+}
+
+// Shutdown stops the TCP transport: it raises the shutdown flag, closes the
+// listening socket, closes every tracked inbound connection and every
+// pooled outbound connection, and drops the pool.
+func (t *TCPTransport) Shutdown() {
+	// Raise the flag before closing anything; listen and handleConn
+	// consult it when their socket operations fail.
+	atomic.StoreInt32(&t.shutdown, 1)
+	t.sock.Close()
+
+	// Inbound side: the connection set is only read here.
+	t.lock.RLock()
+	for c := range t.inbound {
+		c.Close()
+	}
+	t.lock.RUnlock()
+
+	// Outbound side: close every pooled connection, then forget the pool.
+	t.poolLock.Lock()
+	for _, pooled := range t.pool {
+		for i := range pooled {
+			pooled[i].sock.Close()
+		}
+	}
+	t.pool = nil
+	t.poolLock.Unlock()
+}
+
+// reapOld loops until the shutdown flag is set, sleeping 30 seconds and then
+// running reapOnce on each pass. The flag is checked only before the sleep.
+func (t *TCPTransport) reapOld() {
+	for atomic.LoadInt32(&t.shutdown) != 1 {
+		time.Sleep(30 * time.Second)
+		t.reapOnce()
+	}
+}
+
+// reapOnce makes a single pass over the outbound pool, closing every
+// connection unused for longer than t.maxIdle and trimming it from its
+// host's slice.
+func (t *TCPTransport) reapOnce() {
+	t.poolLock.Lock()
+	defer t.poolLock.Unlock()
+
+	for host, conns := range t.pool {
+		// conns[:live] is the part of the slice still being kept.
+		live := len(conns)
+		for i := 0; i < live; {
+			if time.Since(conns[i].used) <= t.maxIdle {
+				i++
+				continue
+			}
+
+			// Close the idle conn, move the last kept conn into its
+			// slot and re-examine index i on the next iteration.
+			conns[i].sock.Close()
+			live--
+			conns[i], conns[live] = conns[live], nil
+		}
+
+		// Trim any idle conns
+		t.pool[host] = conns[:live]
+	}
+}
+
+// listen accepts inbound TCP connections until the transport is shut down.
+// Each accepted connection is configured via setupConn, recorded in
+// t.inbound and served by its own handleConn goroutine.
+func (t *TCPTransport) listen() {
+	for {
+		conn, err := t.sock.AcceptTCP()
+		if err != nil {
+			// Once Shutdown has run, an accept error is the signal to stop.
+			if atomic.LoadInt32(&t.shutdown) != 0 {
+				return
+			}
+			// Use the logger like the rest of this file; the previous
+			// fmt.Printf wrote to stdout without a trailing newline.
+			log.Printf("[ERR] Error accepting TCP connection! %s", err)
+			continue
+		}
+
+		// Setup the conn
+		t.setupConn(conn)
+
+		// Register the inbound conn
+		t.lock.Lock()
+		t.inbound[conn] = struct{}{}
+		t.lock.Unlock()
+
+		// Start handler
+		go t.handleConn(conn)
+	}
+}
+
+// handleConn serves requests on one inbound connection until a header or
+// body fails to decode, a response fails to encode, or a malformed or
+// unknown request arrives. Each request is a gob-encoded tcpHeader followed
+// by a type-specific body, and is answered with one gob-encoded response.
+//
+// A request whose vnode fields are nil is treated as malformed and ends the
+// connection, because the handlers below dereference those vnodes.
+func (t *TCPTransport) handleConn(conn *net.TCPConn) {
+	// Defer the cleanup
+	defer func() {
+		t.lock.Lock()
+		delete(t.inbound, conn)
+		t.lock.Unlock()
+		conn.Close()
+	}()
+
+	dec := gob.NewDecoder(conn)
+	enc := gob.NewEncoder(conn)
+	header := tcpHeader{}
+	var sendResp interface{}
+	for {
+		// Get the header
+		if err := dec.Decode(&header); err != nil {
+			// Stay quiet on shutdown and on a clean disconnect.
+			if atomic.LoadInt32(&t.shutdown) == 0 && err.Error() != "EOF" {
+				log.Printf("[ERR] Failed to decode TCP header! Got %s", err)
+			}
+			return
+		}
+
+		// Read in the body and process request
+		switch header.ReqType {
+		case tcpPing:
+			body := tcpBodyVnode{}
+			if err := dec.Decode(&body); err != nil {
+				log.Printf("[ERR] Failed to decode TCP body! Got %s", err)
+				return
+			}
+			if body.Vn == nil {
+				return
+			}
+
+			// Generate a response
+			_, ok := t.get(body.Vn)
+			if ok {
+				sendResp = tcpBodyBoolError{B: ok, Err: nil}
+			} else {
+				sendResp = tcpBodyBoolError{B: ok, Err: fmt.Errorf("Target VN not found! Target %s:%s",
+					body.Vn.Host, body.Vn.String())}
+			}
+
+		case tcpListReq:
+			body := tcpBodyString{}
+			if err := dec.Decode(&body); err != nil {
+				log.Printf("[ERR] Failed to decode TCP body! Got %s", err)
+				return
+			}
+
+			// Build the list of local vnodes. The map length is read
+			// under the lock because Register writes the map while
+			// holding it.
+			t.lock.RLock()
+			res := make([]*Vnode, 0, len(t.local))
+			for _, v := range t.local {
+				res = append(res, v.vnode)
+			}
+			t.lock.RUnlock()
+
+			// Make response
+			sendResp = tcpBodyVnodeListError{Vnodes: trimSlice(res)}
+
+		case tcpGetPredReq:
+			body := tcpBodyVnode{}
+			if err := dec.Decode(&body); err != nil {
+				log.Printf("[ERR] Failed to decode TCP body! Got %s", err)
+				return
+			}
+			if body.Vn == nil {
+				return
+			}
+
+			// Generate a response
+			obj, ok := t.get(body.Vn)
+			resp := tcpBodyVnodeError{}
+			sendResp = &resp
+			if ok {
+				node, err := obj.GetPredecessor()
+				resp.Vnode = node
+				resp.Err = err
+			} else {
+				resp.Err = fmt.Errorf("Target VN not found! Target %s:%s",
+					body.Vn.Host, body.Vn.String())
+			}
+
+		case tcpNotifyReq:
+			body := tcpBodyTwoVnode{}
+			if err := dec.Decode(&body); err != nil {
+				log.Printf("[ERR] Failed to decode TCP body! Got %s", err)
+				return
+			}
+			if body.Target == nil || body.Vn == nil {
+				return
+			}
+
+			// Generate a response
+			obj, ok := t.get(body.Target)
+			resp := tcpBodyVnodeListError{}
+			sendResp = &resp
+			if ok {
+				nodes, err := obj.Notify(body.Vn)
+				resp.Vnodes = trimSlice(nodes)
+				resp.Err = err
+			} else {
+				resp.Err = fmt.Errorf("Target VN not found! Target %s:%s",
+					body.Target.Host, body.Target.String())
+			}
+
+		case tcpFindSucReq:
+			body := tcpBodyFindSuc{}
+			if err := dec.Decode(&body); err != nil {
+				log.Printf("[ERR] Failed to decode TCP body! Got %s", err)
+				return
+			}
+			if body.Target == nil {
+				return
+			}
+
+			// Generate a response
+			obj, ok := t.get(body.Target)
+			resp := tcpBodyVnodeListError{}
+			sendResp = &resp
+			if ok {
+				nodes, err := obj.FindSuccessors(body.Num, body.Key)
+				resp.Vnodes = trimSlice(nodes)
+				resp.Err = err
+			} else {
+				resp.Err = fmt.Errorf("Target VN not found! Target %s:%s",
+					body.Target.Host, body.Target.String())
+			}
+
+		case tcpClearPredReq:
+			body := tcpBodyTwoVnode{}
+			if err := dec.Decode(&body); err != nil {
+				log.Printf("[ERR] Failed to decode TCP body! Got %s", err)
+				return
+			}
+			if body.Target == nil || body.Vn == nil {
+				return
+			}
+
+			// Generate a response
+			obj, ok := t.get(body.Target)
+			resp := tcpBodyError{}
+			sendResp = &resp
+			if ok {
+				resp.Err = obj.ClearPredecessor(body.Vn)
+			} else {
+				resp.Err = fmt.Errorf("Target VN not found! Target %s:%s",
+					body.Target.Host, body.Target.String())
+			}
+
+		case tcpSkipSucReq:
+			body := tcpBodyTwoVnode{}
+			if err := dec.Decode(&body); err != nil {
+				log.Printf("[ERR] Failed to decode TCP body! Got %s", err)
+				return
+			}
+			if body.Target == nil || body.Vn == nil {
+				return
+			}
+
+			// Generate a response
+			obj, ok := t.get(body.Target)
+			resp := tcpBodyError{}
+			sendResp = &resp
+			if ok {
+				resp.Err = obj.SkipSuccessor(body.Vn)
+			} else {
+				resp.Err = fmt.Errorf("Target VN not found! Target %s:%s",
+					body.Target.Host, body.Target.String())
+			}
+
+		default:
+			log.Printf("[ERR] Unknown request type! Got %d", header.ReqType)
+			return
+		}
+
+		// Send the response
+		if err := enc.Encode(sendResp); err != nil {
+			log.Printf("[ERR] Failed to send TCP body! Got %s", err)
+			return
+		}
+	}
+}
+
+// trimSlice returns vn with its trailing nil elements cut off. Nil elements
+// before the last non-nil entry are kept. A nil slice is returned as-is; an
+// empty or all-nil slice yields an empty slice (this used to index vn[-1]
+// and panic).
+func trimSlice(vn []*Vnode) []*Vnode {
+	if vn == nil {
+		return vn
+	}
+
+	// Walk back to the last non-nil index, stopping at the front.
+	idx := len(vn) - 1
+	for idx >= 0 && vn[idx] == nil {
+		idx--
+	}
+	return vn[:idx+1]
+}

+ 130 - 0
chord/ring.go

@@ -0,0 +1,130 @@
+package chord
+
+import (
+	"bytes"
+	"log"
+	"sort"
+)
+
+// Init prepares the ring from conf: it wraps trans in a local transport,
+// creates the delegate channel, builds and initializes conf.NumVnodes local
+// vnodes, and finally sorts them using the ring's sort.Interface methods.
+func (r *Ring) Init(conf *Config, trans Transport) {
+	r.Config = conf
+	r.Transport = InitLocalTransport(trans)
+	r.delegateCh = make(chan func(), 32)
+	r.Vnodes = make([]*LocalVnode, conf.NumVnodes)
+
+	// Each vnode needs its Ring set before Init is called on it.
+	for idx := range r.Vnodes {
+		local := &LocalVnode{}
+		local.Ring = r
+		r.Vnodes[idx] = local
+		local.Init(idx)
+	}
+
+	sort.Sort(r)
+}
+
+// Len returns the number of Vnodes in the ring. Together with Less and Swap
+// it lets sort.Sort order a Ring.
+func (r *Ring) Len() int {
+	return len(r.Vnodes)
+}
+
+// Less reports whether the ID of the vnode at index i compares bytewise
+// lower than the ID of the vnode at index j.
+func (r *Ring) Less(i, j int) bool {
+	left, right := r.Vnodes[i].Id, r.Vnodes[j].Id
+	return bytes.Compare(left, right) < 0
+}
+
+// Swap exchanges the Vnodes stored at indexes i and j.
+func (r *Ring) Swap(i, j int) {
+	held := r.Vnodes[i]
+	r.Vnodes[i] = r.Vnodes[j]
+	r.Vnodes[j] = held
+}
+
+// NearestVnode scans the local vnodes from the highest index down and
+// returns the first one whose ID compares bytewise lower than key. If no
+// vnode qualifies, the vnode at the highest index is returned.
+func (r *Ring) NearestVnode(key []byte) *LocalVnode {
+	last := len(r.Vnodes) - 1
+	for i := last; i >= 0; i-- {
+		if candidate := r.Vnodes[i]; bytes.Compare(candidate.Id, key) < 0 {
+			return candidate
+		}
+	}
+	// Nothing precedes the key: fall back to the last vnode.
+	return r.Vnodes[last]
+}
+
+// Schedule starts the delegate handler goroutine when a delegate is
+// configured, then calls Schedule on every vnode in the ring.
+func (r *Ring) Schedule() {
+	if r.Config.Delegate != nil {
+		go r.DelegateHandler()
+	}
+	for _, local := range r.Vnodes {
+		local.Schedule()
+	}
+}
+
+// StopVnodes creates the shutdown channel and then blocks until it has
+// received one value on it per configured vnode.
+func (r *Ring) StopVnodes() {
+	r.ChanShutdown = make(chan bool, r.Config.NumVnodes)
+	for acked := 0; acked < r.Config.NumVnodes; acked++ {
+		<-r.ChanShutdown
+	}
+}
+
+// StopDelegate does nothing without a configured delegate. Otherwise it
+// queues the delegate's Shutdown call, waits for it to complete, and closes
+// the delegate channel.
+func (r *Ring) StopDelegate() {
+	if r.Config.Delegate == nil {
+		return
+	}
+
+	// The queue is served in order, so once Shutdown has run every
+	// earlier delegate message has been processed.
+	done := r.InvokeDelegate(r.Config.Delegate.Shutdown)
+	<-done
+	close(r.delegateCh)
+}
+
+// SetLocalSuccessors fills each vnode's successor list with the vnodes that
+// follow it in r.Vnodes, wrapping around at the end. Each list receives at
+// most Config.NumSuccessors entries and never more than the number of other
+// local vnodes.
+func (r *Ring) SetLocalSuccessors() {
+	total := len(r.Vnodes)
+	fill := Min(r.Config.NumSuccessors, total-1)
+	for pos, local := range r.Vnodes {
+		// off is the distance, in ring positions, from local to its successor.
+		for off := 1; off <= fill; off++ {
+			local.Successors[off-1] = &r.Vnodes[(pos+off)%total].Vnode
+		}
+	}
+}
+
+// InvokeDelegate queues f on the delegate channel and returns a channel that
+// receives one value after f has run, including when f panics. It returns
+// nil, without queueing anything, if no delegate is configured.
+func (r *Ring) InvokeDelegate(f func()) chan struct{} {
+	if r.Config.Delegate == nil {
+		return nil
+	}
+
+	// Buffered so the completion signal never blocks the handler.
+	done := make(chan struct{}, 1)
+	r.delegateCh <- func() {
+		defer func() {
+			done <- struct{}{}
+		}()
+		f()
+	}
+	return done
+}
+
+// DelegateHandler runs each function received on the delegate channel via
+// safeInvoke, in arrival order, and returns once the channel is closed. It
+// is meant to run in its own goroutine.
+func (r *Ring) DelegateHandler() {
+	for f := range r.delegateCh {
+		r.safeInvoke(f)
+	}
+}
+
+// safeInvoke calls f and recovers from any panic it raises, logging the
+// recovered value instead of letting it propagate to the caller.
+func (r *Ring) safeInvoke(f func()) {
+	defer func() {
+		// %v because a panic value can be of any type; %s rendered
+		// non-string values as "%!s(...)". The value is named rec so it
+		// does not shadow the receiver r.
+		if rec := recover(); rec != nil {
+			log.Printf("Caught a panic invoking a delegate function! Got: %v", rec)
+		}
+	}()
+	f()
+}

+ 199 - 0
chord/transport.go

@@ -0,0 +1,199 @@
+package chord
+
+import (
+	"fmt"
+	"sync"
+)
+
+// localRPC pairs a registered vnode with the object that handles RPCs
+// addressed to it.
+type localRPC struct {
+	vnode *Vnode   // the registered vnode
+	obj   VnodeRPC // handler invoked for requests targeting vnode
+}
+
+// LocalTransport provides fast routing to Vnodes running locally by using
+// direct method calls. For any non-local Vnodes, the request is passed on
+// to another transport.
+type LocalTransport struct {
+	host   string               // Host of the most recently registered vnode
+	Remote Transport            // transport used for vnodes that are not registered locally
+	lock   sync.RWMutex         // held when Local (and host, in Register) is accessed
+	Local  map[string]*localRPC // registered vnodes, keyed by Vnode.String()
+}
+
+// InitLocalTransport returns a LocalTransport that wraps remote. A nil
+// remote is replaced by a BlackholeTransport.
+func InitLocalTransport(remote Transport) Transport {
+	if remote == nil {
+		// Nothing to forward to: route remote calls into a black hole.
+		remote = &BlackholeTransport{}
+	}
+
+	return &LocalTransport{
+		Remote: remote,
+		Local:  make(map[string]*localRPC),
+	}
+}
+
+// get looks up the RPC handler registered for vn. The boolean is false, and
+// the handler nil, when vn is not registered locally.
+func (lt *LocalTransport) get(vn *Vnode) (VnodeRPC, bool) {
+	key := vn.String()
+
+	lt.lock.RLock()
+	defer lt.lock.RUnlock()
+	if entry, found := lt.Local[key]; found {
+		return entry.obj, true
+	}
+	return nil, false
+}
+
+// ListVnodes returns the locally registered vnodes when host equals the
+// host recorded by Register; any other host is forwarded to the remote
+// transport.
+//
+// lt.host and lt.Local are written by Register under lt.lock, so both are
+// read here with the read lock held. The lock is released before the
+// remote call is made.
+func (lt *LocalTransport) ListVnodes(host string) ([]*Vnode, error) {
+	lt.lock.RLock()
+	if host != lt.host {
+		lt.lock.RUnlock()
+
+		// Pass onto remote
+		return lt.Remote.ListVnodes(host)
+	}
+
+	// Generate all the local clients
+	res := make([]*Vnode, 0, len(lt.Local))
+	for _, v := range lt.Local {
+		res = append(res, v.vnode)
+	}
+	lt.lock.RUnlock()
+
+	return res, nil
+}
+
+// Ping reports a locally registered vnode as alive without any network
+// traffic; every other vnode is pinged through the remote transport.
+func (lt *LocalTransport) Ping(vn *Vnode) (bool, error) {
+	if _, local := lt.get(vn); local {
+		return true, nil
+	}
+	return lt.Remote.Ping(vn)
+}
+
+// GetPredecessor calls the handler directly when vn is registered locally
+// and otherwise forwards the request to the remote transport.
+func (lt *LocalTransport) GetPredecessor(vn *Vnode) (*Vnode, error) {
+	if handler, local := lt.get(vn); local {
+		return handler.GetPredecessor()
+	}
+	return lt.Remote.GetPredecessor(vn)
+}
+
+// Notify delivers self to vn's handler directly when vn is registered
+// locally and otherwise forwards the notification to the remote transport.
+func (lt *LocalTransport) Notify(vn, self *Vnode) ([]*Vnode, error) {
+	if handler, local := lt.get(vn); local {
+		return handler.Notify(self)
+	}
+	return lt.Remote.Notify(vn, self)
+}
+
+// FindSuccessors calls vn's handler directly when vn is registered locally
+// and otherwise forwards the lookup to the remote transport.
+func (lt *LocalTransport) FindSuccessors(vn *Vnode, n int, key []byte) ([]*Vnode, error) {
+	if handler, local := lt.get(vn); local {
+		return handler.FindSuccessors(n, key)
+	}
+	return lt.Remote.FindSuccessors(vn, n, key)
+}
+
+// ClearPredecessor calls target's handler directly when target is
+// registered locally and otherwise forwards the request to the remote
+// transport.
+func (lt *LocalTransport) ClearPredecessor(target, self *Vnode) error {
+	if handler, local := lt.get(target); local {
+		return handler.ClearPredecessor(self)
+	}
+	return lt.Remote.ClearPredecessor(target, self)
+}
+
+// SkipSuccessor calls target's handler directly when target is registered
+// locally and otherwise forwards the request to the remote transport.
+func (lt *LocalTransport) SkipSuccessor(target, self *Vnode) error {
+	if handler, local := lt.get(target); local {
+		return handler.SkipSuccessor(self)
+	}
+	return lt.Remote.SkipSuccessor(target, self)
+}
+
+// Register records v and its handler o in the local table, remembers v's
+// host as the local host, and then registers the pair with the remote
+// transport as well.
+func (lt *LocalTransport) Register(v *Vnode, o VnodeRPC) {
+	id := v.String()
+	entry := &localRPC{vnode: v, obj: o}
+
+	lt.lock.Lock()
+	lt.host = v.Host
+	lt.Local[id] = entry
+	lt.lock.Unlock()
+
+	// The remote transport is called only after the lock is released.
+	lt.Remote.Register(v, o)
+}
+
+// Deregister removes v from the local table. The remote transport is not
+// informed.
+func (lt *LocalTransport) Deregister(v *Vnode) {
+	id := v.String()
+
+	lt.lock.Lock()
+	defer lt.lock.Unlock()
+	delete(lt.Local, id)
+}
+
+// BlackholeTransport is an implementation of the Transport that does not
+// actually do anything. Every operation except Ping and Register results in
+// an error; Ping reports the node as not alive and Register is a no-op.
+type BlackholeTransport struct {
+}
+
+// ListVnodes always fails, naming the requested host in the error.
+func (*BlackholeTransport) ListVnodes(host string) ([]*Vnode, error) {
+	return nil, fmt.Errorf("Failed to connect! Blackhole: %s.", host)
+}
+
+// Ping always reports the vnode as not alive, without an error.
+func (*BlackholeTransport) Ping(vn *Vnode) (bool, error) {
+	return false, nil
+}
+
+// GetPredecessor always fails, naming the target vnode in the error.
+func (*BlackholeTransport) GetPredecessor(vn *Vnode) (*Vnode, error) {
+	return nil, fmt.Errorf("Failed to connect! Blackhole: %s.", vn.String())
+}
+
+// Notify always fails, naming the target vnode in the error.
+func (*BlackholeTransport) Notify(vn, self *Vnode) ([]*Vnode, error) {
+	return nil, fmt.Errorf("Failed to connect! Blackhole: %s", vn.String())
+}
+
+// FindSuccessors always fails, naming the target vnode in the error.
+func (*BlackholeTransport) FindSuccessors(vn *Vnode, n int, key []byte) ([]*Vnode, error) {
+	return nil, fmt.Errorf("Failed to connect! Blackhole: %s", vn.String())
+}
+
+// ClearPredecessor always fails, naming the target vnode in the error.
+func (*BlackholeTransport) ClearPredecessor(target, self *Vnode) error {
+	return fmt.Errorf("Failed to connect! Blackhole: %s", target.String())
+}
+
+// SkipSuccessor always fails, naming the target vnode in the error.
+func (*BlackholeTransport) SkipSuccessor(target, self *Vnode) error {
+	return fmt.Errorf("Failed to connect! Blackhole: %s", target.String())
+}
+
+// Register does nothing.
+func (*BlackholeTransport) Register(v *Vnode, o VnodeRPC) {
+}

+ 112 - 0
chord/util.go

@@ -0,0 +1,112 @@
+package chord
+
+import (
+	"bytes"
+	"fmt"
+	"math/big"
+	"math/rand"
+	"time"
+)
+
+// RandStabilize picks a random stabilization delay of at least
+// conf.StabilizeMin and below conf.StabilizeMax (assuming min <= max).
+func RandStabilize(conf *Config) time.Duration {
+	span := float64(conf.StabilizeMax - conf.StabilizeMin)
+	return time.Duration(rand.Float64()*span + float64(conf.StabilizeMin))
+}
+
+// Between reports whether key lies strictly between id1 and id2 on the
+// ring, excluding both endpoints. When id1 is greater than id2 the interval
+// wraps around the end of the ID space.
+func Between(id1, id2, key []byte) bool {
+	afterStart := bytes.Compare(id1, key) < 0 // id1 < key
+	beforeEnd := bytes.Compare(id2, key) > 0  // key < id2
+
+	// Wrapped interval: the key may sit on either side of the wrap point.
+	if bytes.Compare(id1, id2) > 0 {
+		return afterStart || beforeEnd
+	}
+
+	// Normal interval: the key must satisfy both bounds.
+	return afterStart && beforeEnd
+}
+
+// BetweenRightIncl reports whether key lies in the ring interval (id1, id2],
+// excluding id1 and including id2. When id1 is greater than id2 the interval
+// wraps around the end of the ID space.
+func BetweenRightIncl(id1, id2, key []byte) bool {
+	afterStart := bytes.Compare(id1, key) < 0 // id1 < key
+	notPastEnd := bytes.Compare(id2, key) >= 0 // key <= id2
+
+	// Wrapped interval: the key may sit on either side of the wrap point.
+	if bytes.Compare(id1, id2) > 0 {
+		return afterStart || notPastEnd
+	}
+
+	return afterStart && notPastEnd
+}
+
+// PowerOffset computes (id + 2^exp) % (2^mod), treating id as a big-endian
+// unsigned integer.
+//
+// The result is left-padded with zero bytes to len(id). big.Int.Bytes drops
+// leading zeros, so the unpadded value could be shorter than the IDs it is
+// later compared against with bytes.Compare and would then sort incorrectly.
+// If the value needs more than len(id) bytes (mod larger than the ID width)
+// it is returned unpadded.
+func PowerOffset(id []byte, exp int, mod int) []byte {
+	// Convert the ID to a bigint
+	idInt := new(big.Int).SetBytes(id)
+
+	// 2^exp is the offset to add, 2^mod the size of the ID space.
+	two := big.NewInt(2)
+	offset := new(big.Int).Exp(two, big.NewInt(int64(exp)), nil)
+	ceil := new(big.Int).Exp(two, big.NewInt(int64(mod)), nil)
+
+	// Sum, then wrap around the ring
+	sum := new(big.Int).Add(idInt, offset)
+	raw := sum.Mod(sum, ceil).Bytes()
+	if len(raw) >= len(id) {
+		return raw
+	}
+
+	// Restore the leading zero bytes that Bytes() stripped.
+	off := make([]byte, len(id))
+	copy(off[len(id)-len(raw):], raw)
+	return off
+}
+
+// Max returns the larger of a and b.
+func Max(a, b int) int {
+	if a < b {
+		return b
+	}
+	return a
+}
+
+// Min returns the smaller of a and b.
+func Min(a, b int) int {
+	if a > b {
+		return b
+	}
+	return a
+}
+
+// NearestVnodeToKey scans Vnodes from the highest index down and returns
+// the first vnode whose ID compares bytewise lower than key. If none
+// qualifies, the vnode at the highest index is returned.
+func NearestVnodeToKey(Vnodes []*Vnode, key []byte) *Vnode {
+	last := len(Vnodes) - 1
+	for i := last; i >= 0; i-- {
+		if candidate := Vnodes[i]; bytes.Compare(candidate.Id, key) < 0 {
+			return candidate
+		}
+	}
+	// Nothing precedes the key: fall back to the last vnode.
+	return Vnodes[last]
+}
+
+// MergeErrors combines two errors into one. If either is nil the other is
+// returned unchanged (nil when both are); otherwise a new error is built
+// from both messages, separated by a newline.
+func MergeErrors(err1, err2 error) error {
+	switch {
+	case err1 == nil:
+		return err2
+	case err2 == nil:
+		return err1
+	}
+	return fmt.Errorf("%s\n%s", err1, err2)
+}

+ 353 - 0
chord/vnode.go

@@ -0,0 +1,353 @@
+package chord
+
+import (
+	"encoding/binary"
+	"fmt"
+	"log"
+	"time"
+)
+
+// String returns the vnode's ID formatted as lowercase hexadecimal. The
+// transports in this package use this value as the map key for a vnode.
+func (vn *Vnode) String() string {
+	return fmt.Sprintf("%x", vn.Id)
+}
+
+// Init prepares a local vnode: it derives the ID from idx, copies the host
+// name from the ring configuration, allocates the successor list and finger
+// table, and registers the vnode with the ring's transport. vn.Ring must be
+// set before Init is called.
+func (vn *LocalVnode) Init(idx int) {
+	conf := vn.Ring.Config
+
+	// Identity
+	vn.GenId(uint16(idx))
+	vn.Host = conf.Hostname
+
+	// Routing state, sized from the configuration
+	vn.Successors = make([]*Vnode, conf.NumSuccessors)
+	vn.Finger = make([]*Vnode, conf.HashBits)
+
+	// Make the vnode reachable through the RPC mechanism
+	vn.Ring.Transport.Register(&vn.Vnode, vn)
+}
+
+// Schedule arms a one-shot timer that calls Stabilize after a random delay
+// chosen by RandStabilize.
+func (vn *LocalVnode) Schedule() {
+	delay := RandStabilize(vn.Ring.Config)
+	vn.Timer = time.AfterFunc(delay, vn.Stabilize)
+}
+
+// GenId sets the vnode's ID to the hash of the configured host name
+// followed by idx encoded as a big-endian uint16, using the configured hash
+// function.
+func (vn *LocalVnode) GenId(idx uint16) {
+	conf := vn.Ring.Config
+
+	// Feed host name and index into a fresh hash instance.
+	h := conf.HashFunc()
+	h.Write([]byte(conf.Hostname))
+	binary.Write(h, binary.BigEndian, idx)
+
+	// The digest is the ID.
+	vn.Id = h.Sum(nil)
+}
+
+// Stabilize runs one round of maintenance for the vnode. It is the callback
+// armed by Schedule and re-arms the timer when it returns, unless the ring
+// is shutting down.
+func (vn *LocalVnode) Stabilize() {
+	// The timer that invoked us has fired; drop the reference.
+	vn.Timer = nil
+
+	// A non-nil shutdown channel means the ring is stopping: report in
+	// and do not reschedule.
+	if done := vn.Ring.ChanShutdown; done != nil {
+		done <- true
+		return
+	}
+
+	// Re-arm the timer once this round finishes.
+	defer vn.Schedule()
+
+	// Maintenance steps, run in order. A failing step is logged and does
+	// not stop the remaining steps.
+	steps := []struct {
+		format string
+		run    func() error
+	}{
+		{"[ERR] Error checking for new successor: %s", vn.CheckNewSuccessor},
+		{"[ERR] Error notifying successor: %s", vn.NotifySuccessor},
+		{"[ERR] Error fixing finger table: %s", vn.FixFingerTable},
+		{"[ERR] Error checking predecessor: %s", vn.CheckPredecessor},
+	}
+	for _, step := range steps {
+		if err := step.run(); err != nil {
+			log.Printf(step.format, err)
+		}
+	}
+
+	// Record when this round completed.
+	vn.Stabilized = time.Now()
+}
+
+// CheckNewSuccessor asks the current first successor for its predecessor
+// and, if that node lies strictly between this vnode and the successor and
+// answers a ping, makes it the new first successor.
+//
+// If the first successor cannot be reached and more than one successor is
+// known, dead entries are dropped from the front of the list until a live
+// one is found, and the check is retried against it. Panics if the vnode
+// has no first successor.
+func (vn *LocalVnode) CheckNewSuccessor() error {
+	// Ask our successor for it's predecessor
+	trans := vn.Ring.Transport
+
+CHECK_NEW_SUC:
+	succ := vn.Successors[0]
+	if succ == nil {
+		panic("Node has no successor!")
+	}
+	maybe_suc, err := trans.GetPredecessor(succ)
+	if err != nil {
+		// Check if we have succ list, try to contact next live succ
+		known := vn.KnownSuccessors()
+		if known > 1 {
+			for i := 0; i < known; i++ {
+				// Always probe the head; earlier iterations shift the list left.
+				if alive, _ := trans.Ping(vn.Successors[0]); !alive {
+					// Don't eliminate the last successor we know of
+					if i+1 == known {
+						return fmt.Errorf("All known Successors dead!")
+					}
+
+					// Advance the Successors list past the dead one
+					copy(vn.Successors[0:], vn.Successors[1:])
+					// The shift duplicated the last known entry; clear the stale slot.
+					vn.Successors[known-1-i] = nil
+				} else {
+					// Found live successor, check for new one
+					goto CHECK_NEW_SUC
+				}
+			}
+		}
+		// Only one successor is known: report the original failure.
+		return err
+	}
+
+	// Check if we should replace our successor
+	if maybe_suc != nil && Between(vn.Id, succ.Id, maybe_suc.Id) {
+		// Check if new successor is alive before switching
+		alive, err := trans.Ping(maybe_suc)
+		if alive && err == nil {
+			// Shift the list right by one (dropping the last entry) and
+			// put the new successor at the head.
+			copy(vn.Successors[1:], vn.Successors[0:len(vn.Successors)-1])
+			vn.Successors[0] = maybe_suc
+		} else {
+			// err may be nil here when the ping merely reported "not alive".
+			return err
+		}
+	}
+	return nil
+}
+
+// GetPredecessor is the RPC handler that returns our current predecessor,
+// which may be nil. The returned error is always nil.
+func (vn *LocalVnode) GetPredecessor() (*Vnode, error) {
+	return vn.Predecessor, nil
+}
+
+// NotifySuccessor tells the first successor about this vnode and copies the
+// successor list it returns into our own list, starting at index 1. Copying
+// stops at the first nil entry or at an entry that is this vnode itself.
+func (vn *LocalVnode) NotifySuccessor() error {
+	remote, err := vn.Ring.Transport.Notify(vn.Successors[0], &vn.Vnode)
+	if err != nil {
+		return err
+	}
+
+	// Index 0 stays reserved for the successor we just notified, so at
+	// most NumSuccessors-1 remote entries fit.
+	if room := vn.Ring.Config.NumSuccessors - 1; len(remote) > room {
+		remote = remote[:room]
+	}
+
+	// Ensure we don't set ourselves as a successor!
+	for i := 0; i < len(remote) && remote[i] != nil && remote[i].String() != vn.String(); i++ {
+		vn.Successors[i+1] = remote[i]
+	}
+	return nil
+}
+
+// Notify is the RPC handler invoked when another vnode announces itself as
+// a possible predecessor. The candidate is adopted when we have no
+// predecessor, or when it lies strictly between the current predecessor and
+// us; in that case the delegate is informed. Our successor list is returned
+// in every case.
+func (vn *LocalVnode) Notify(maybe_pred *Vnode) ([]*Vnode, error) {
+	prev := vn.Predecessor
+	if prev != nil && !Between(prev.Id, vn.Id, maybe_pred.Id) {
+		// The current predecessor is kept.
+		return vn.Successors, nil
+	}
+
+	// Tell the delegate about the change before recording it.
+	conf := vn.Ring.Config
+	vn.Ring.InvokeDelegate(func() {
+		conf.Delegate.NewPredecessor(&vn.Vnode, maybe_pred, prev)
+	})
+	vn.Predecessor = maybe_pred
+
+	return vn.Successors, nil
+}
+
+// FixFingerTable repairs the finger table entry at index vn.LastFinger and
+// then every immediately following entry that resolves to the same node.
+// vn.LastFinger is left at the next index to repair, wrapping to 0 after
+// the last entry.
+//
+// If the successor lookup fails or returns no nodes, the table is left
+// untouched and the lookup error (nil when the list was merely empty) is
+// returned.
+func (vn *LocalVnode) FixFingerTable() error {
+	// Determine the offset
+	hb := vn.Ring.Config.HashBits
+	offset := PowerOffset(vn.Id, vn.LastFinger, hb)
+
+	// Find the successor
+	nodes, err := vn.FindSuccessors(1, offset)
+	if nodes == nil || len(nodes) == 0 || err != nil {
+		return err
+	}
+	node := nodes[0]
+
+	// Update the finger table
+	vn.Finger[vn.LastFinger] = node
+
+	// Try to skip as many finger entries as possible
+	for {
+		next := vn.LastFinger + 1
+		if next >= hb {
+			break
+		}
+		offset := PowerOffset(vn.Id, next, hb)
+
+		// While the node is the successor, update the finger entries
+		if BetweenRightIncl(vn.Id, node.Id, offset) {
+			vn.Finger[next] = node
+			vn.LastFinger = next
+		} else {
+			break
+		}
+	}
+
+	// Increment to the index to repair
+	if vn.LastFinger+1 == hb {
+		vn.LastFinger = 0
+	} else {
+		vn.LastFinger++
+	}
+
+	return nil
+}
+
+// CheckPredecessor pings the current predecessor, if there is one, and
+// clears it when the ping reports it as not alive. A ping error is returned
+// and leaves the predecessor unchanged.
+func (vn *LocalVnode) CheckPredecessor() error {
+	pred := vn.Predecessor
+	if pred == nil {
+		return nil
+	}
+
+	alive, err := vn.Ring.Transport.Ping(pred)
+	if err != nil {
+		return err
+	}
+	if !alive {
+		// Predecessor is dead
+		vn.Predecessor = nil
+	}
+	return nil
+}
+
+// FindSuccessors finds the next n successors of key. n must be
+// <= NumSuccessors, since the result may be sliced directly from our own
+// successor list.
+//
+// The lookup proceeds in three stages: answer from our own list when key
+// falls between us and our first successor; otherwise forward to the
+// closest preceding vnodes in turn; otherwise fall back to the later
+// entries of our own list. An error is returned when all three fail.
+func (vn *LocalVnode) FindSuccessors(n int, key []byte) ([]*Vnode, error) {
+	// Check if we are the immediate predecessor
+	if BetweenRightIncl(vn.Id, vn.Successors[0].Id, key) {
+		return vn.Successors[:n], nil
+	}
+
+	// Try the closest preceding nodes
+	cp := ClosestPreceedingVnodeIterator{}
+	cp.Init(vn, key)
+	for {
+		// Get the next closest node
+		closest := cp.Next()
+		if closest == nil {
+			break
+		}
+
+		// Try that node, break on success
+		res, err := vn.Ring.Transport.FindSuccessors(closest, n, key)
+		if err == nil {
+			return res, nil
+		} else {
+			log.Printf("[ERR] Failed to contact %s. Got %s", closest.String(), err)
+		}
+	}
+
+	// Determine how many Successors we know of
+	Successors := vn.KnownSuccessors()
+
+	// Check if the ID is between us and any non-immediate Successors
+	for i := 1; i <= Successors-n; i++ {
+		if BetweenRightIncl(vn.Id, vn.Successors[i].Id, key) {
+			// Return at most n entries starting at the matching successor.
+			remain := vn.Successors[i:]
+			if len(remain) > n {
+				remain = remain[:n]
+			}
+			return remain, nil
+		}
+	}
+
+	// Checked all closer nodes and our Successors!
+	return nil, fmt.Errorf("Exhausted all preceeding nodes!")
+}
+
+// Leave announces that this vnode is leaving: the delegate is informed, the
+// predecessor (if any) is told to skip us as its successor, and the first
+// successor is told to clear us as its predecessor. Errors from the two
+// remote calls are merged into the returned error.
+func (vn *LocalVnode) Leave() error {
+	// Capture the neighbours for the delegate callback.
+	conf := vn.Ring.Config
+	pred, succ := vn.Predecessor, vn.Successors[0]
+	vn.Ring.InvokeDelegate(func() {
+		conf.Delegate.Leaving(&vn.Vnode, pred, succ)
+	})
+
+	trans := vn.Ring.Transport
+
+	// Notify predecessor to advance to their next successor
+	var skipErr error
+	if p := vn.Predecessor; p != nil {
+		skipErr = trans.SkipSuccessor(p, &vn.Vnode)
+	}
+
+	// Notify successor to clear old predecessor
+	clearErr := trans.ClearPredecessor(vn.Successors[0], &vn.Vnode)
+	return MergeErrors(skipErr, clearErr)
+}
+
+// ClearPredecessor forgets our predecessor if it is the vnode p, informing
+// the delegate that the predecessor is leaving. Any other p is ignored. The
+// returned error is always nil.
+func (vn *LocalVnode) ClearPredecessor(p *Vnode) error {
+	old := vn.Predecessor
+	if old == nil || old.String() != p.String() {
+		return nil
+	}
+
+	// Inform the delegate
+	conf := vn.Ring.Config
+	vn.Ring.InvokeDelegate(func() {
+		conf.Delegate.PredecessorLeaving(&vn.Vnode, old)
+	})
+	vn.Predecessor = nil
+	return nil
+}
+
+// SkipSuccessor drops our first successor if it is the vnode s, informing
+// the delegate that the successor is leaving and shifting the rest of the
+// list forward by one. Any other s is ignored. The returned error is always
+// nil.
+func (vn *LocalVnode) SkipSuccessor(s *Vnode) error {
+	head := vn.Successors[0]
+	if head.String() != s.String() {
+		return nil
+	}
+
+	// Inform the delegate
+	conf := vn.Ring.Config
+	vn.Ring.InvokeDelegate(func() {
+		conf.Delegate.SuccessorLeaving(&vn.Vnode, head)
+	})
+
+	// Shift the list left and clear the slot the shift left duplicated.
+	known := vn.KnownSuccessors()
+	copy(vn.Successors, vn.Successors[1:])
+	vn.Successors[known-1] = nil
+	return nil
+}
+
+// KnownSuccessors returns one more than the index of the last non-nil entry
+// in the successor list, or 0 when every entry is nil.
+func (vn *LocalVnode) KnownSuccessors() (Successors int) {
+	for pos, s := range vn.Successors {
+		if s != nil {
+			Successors = pos + 1
+		}
+	}
+	return
+}

+ 66 - 20
iotest/iotest.go

@@ -32,7 +32,7 @@ import (
 
 func init() {
 	logger.SetFormat("yyyy-MM-dd HH:mm:ss [pid] [level] msg", "\n")
-	logger.SetRollingFile("", "iotest.log", -1, 1*1024*1024, math.MaxInt64, 1)
+	logger.SetRollingFile("", "dfstest.log", -1, 1*1024*1024, math.MaxInt64, 1)
 }
 
 var mdfs billy.Filesystem
@@ -72,7 +72,17 @@ func initDFS(keyspace string) error {
 // 	return os.Remove(filePath)
 // }
 
-func writefile(filePath string, content []byte) error {
+func writefile(filePath string, content []byte) (err error) {
+	// defer func() {
+	// 	x := recover()
+	// 	if x != nil {
+	// 		if e, ok := x.(error); ok {
+	// 			err = e
+	// 		} else {
+	// 			err = fmt.Errorf("writefile error, %v", x)
+	// 		}
+	// 	}
+	// }()
 	dir := path.Dir(filePath)
 	fi, e := mdfs.Stat(dir)
 	if os.IsNotExist(e) || !fi.IsDir() {
@@ -92,7 +102,17 @@ func writefile(filePath string, content []byte) error {
 	return err
 }
 
-func readfile(filePath string) ([]byte, error) {
+func readfile(filePath string) (bs []byte, err error) {
+	// defer func() {
+	// 	x := recover()
+	// 	if x != nil {
+	// 		if e, ok := x.(error); ok {
+	// 			err = e
+	// 		} else {
+	// 			err = fmt.Errorf("readfile error, %v", x)
+	// 		}
+	// 	}
+	// }()
 	var size int
 	if info, err := mdfs.Stat(filePath); err == nil {
 		size64 := info.Size()
@@ -134,25 +154,39 @@ func readfile(filePath string) ([]byte, error) {
 	}
 }
 
-func deletefile(filePath string) error {
+func deletefile(filePath string) (err error) {
+	// defer func() {
+	// 	x := recover()
+	// 	if x != nil {
+	// 		if e, ok := x.(error); ok {
+	// 			err = e
+	// 		} else {
+	// 			err = fmt.Errorf("deletefile error, %v", x)
+	// 		}
+	// 	}
+	// }()
 	return mdfs.Remove(filePath)
 }
 
 func main() {
 	// 设置命令行参数
 	keyspace := flag.String("keyspace", "ootest", "keyspace")
-	rootPath := flag.String("root-path", "_test", "文件目录")
+	rootPath := flag.String("root-path", "/opt/dfstest", "文件目录")
 	delay := flag.Duration("delay", 30*time.Second, "删除延迟时间")
-	concurLimit := flag.Int("concur-limit", 10, "最大并发数")
+	concurLimit := flag.Int("concur-limit", 1, "最大并发数")
 	fileSizeMin := flag.Int("file-size-min", 1024, "最小文件尺寸")
 	fileSizeMax := flag.Int("file-size-max", 10240, "最大文件尺寸")
 	flag.Parse()
 
+	// mutex := sync.Mutex{}
+	// errorcount := map[string]int{}
+
 	// 初始化随机数生成器
 	rand.Seed(time.Now().UnixNano())
 
 	if err := initDFS(*keyspace); err != nil {
 		logger.Error(err)
+		return
 	}
 
 	type fileinfo struct {
@@ -176,8 +210,8 @@ func main() {
 					<-writeChan
 				}()
 				// 随机生成文件名和文件内容
-				dirName := fmt.Sprintf("%02d", rand.Intn(99)+1)
-				fileName := fmt.Sprintf("%06d", i) + ".txt"
+				dirName := fmt.Sprintf("%02d", rand.Intn(100))
+				fileName := fmt.Sprintf("%03d", rand.Intn(1000)) + ".txt"
 				filePath := filepath.Join(*rootPath, dirName, fileName)
 				fileBinarySize := (rand.Intn(*fileSizeMax-*fileSizeMin) + *fileSizeMin) / 4 * 3 // 二进制长度
 
@@ -186,11 +220,13 @@ func main() {
 				rand.Read(bs)
 				content := base64.RawURLEncoding.EncodeToString(bs)
 
-				// 写入文件
-				if err := writefile(filePath, []byte(content)); err != nil {
-					logger.Error("写入文件", filePath, "失败:", err)
-					return
-				}
+				go func() {
+					// 写入文件
+					if err := writefile(filePath, []byte(content)); err != nil {
+						logger.Error("写入文件", filePath, "失败:", err)
+						return
+					}
+				}()
 
 				// 写入完成
 				checkChan <- &fileinfo{
@@ -203,23 +239,33 @@ func main() {
 	}()
 
 	var successCount int32
+	var failedCount int32
 
 	// 并发验证和删除文件
 	go func() {
 		for fi := range checkChan {
 			go func(fi *fileinfo) {
 				// 读取文件
-				content, err := readfile(fi.filepath)
-				if err != nil {
-					logger.Error("读取文件", fi.filepath, "失败:", err)
-					return
+				var content []byte
+				var err error
+				var t = time.Now()
+				for (err == nil || len(content) == 0) && time.Since(t) < 60*time.Second {
+					content, err = readfile(fi.filepath)
+					if err != nil {
+						if !os.IsNotExist(err) {
+							logger.Error("读取文件", fi.filepath, "失败:", err)
+							return
+						}
+						time.Sleep(10 * time.Millisecond)
+					}
 				}
 
 				// 验证文件内容是否正确
 				expectedSize := fi.filesize
 				if len(content) != expectedSize || !bytes.Equal(content, []byte(fi.content)) {
-					logger.Error("验证失败", fi.filepath, fmt.Sprintf("文件内容与期望值不一致,写入内容为:\n%s\n读出内容为:\n%s", string(content), fi.content))
-					os.Exit(1)
+					// logger.Error("验证失败", fi.filepath, fmt.Sprintf("文件内容与期望值不一致,写入内容为:\n%s\n读出内容为:\n%s", fi.content, string(content)))
+					// os.Exit(1)
+					atomic.AddInt32(&failedCount, 1)
 					return
 				}
 
@@ -241,7 +287,7 @@ func main() {
 		select {
 		case <-t.C:
 			// 输出计数信息
-			logger.Info("验证成功的文件数:", successCount)
+			logger.Info("读写一致验证成功数:", successCount, "失败数", failedCount)
 		}
 	}
 }

+ 1 - 1
go.mod

@@ -1,4 +1,4 @@
-module test
+module trial
 
 go 1.19
 

+ 1 - 1
grpc-odbserver/grpc/test/test.go

@@ -5,8 +5,8 @@ import (
 	"fmt"
 	"log"
 	"net"
-	"test/grpc-odbserver/grpc/api"
 	"time"
+	"trial/grpc-odbserver/grpc/api"
 
 	"google.golang.org/grpc"
 	"gopkg.in/yaml.v3"

+ 2 - 2
grpc-odbserver/main.go

@@ -4,8 +4,8 @@ import (
 	"os"
 	"os/signal"
 	"syscall"
-	"test/grpc-odbserver/mnode"
-	"test/grpc-odbserver/msn"
+	"trial/grpc-odbserver/mnode"
+	"trial/grpc-odbserver/msn"
 
 	"git.wecise.com/wecise/common/matrix/logger"
 	"git.wecise.com/wecise/common/matrix/util"

+ 1 - 1
grpc-odbserver/matrix/topo.go

@@ -5,8 +5,8 @@ import (
 	"fmt"
 	"log"
 	"net"
-	"test/grpc-odbserver/grpc/api"
 	"time"
+	"trial/grpc-odbserver/grpc/api"
 
 	"google.golang.org/grpc"
 	"gopkg.in/yaml.v3"

+ 1 - 1
grpc-odbserver/mnode/mnode.go

@@ -2,7 +2,7 @@ package mnode
 
 import (
 	"sync"
-	"test/grpc-odbserver/api"
+	"trial/grpc-odbserver/api"
 
 	"git.wecise.com/wecise/odbserver/mring/matrix"
 	"git.wecise.com/wecise/odbserver/mring/mnet"

+ 1 - 1
grpc-odbserver/mnode/topo.go

@@ -4,8 +4,8 @@ import (
 	"context"
 	"fmt"
 	"log"
-	"test/grpc-odbserver/grpc/api"
 	"time"
+	"trial/grpc-odbserver/grpc/api"
 
 	"google.golang.org/grpc"
 	"gopkg.in/yaml.v3"