chord_test.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. package chord_test
  2. import (
  3. "fmt"
  4. "runtime"
  5. "testing"
  6. "time"
  7. "trial/go-chord"
  8. )
  9. type MultiLocalTrans struct {
  10. remote chord.Transport
  11. hosts map[string]*chord.LocalTransport
  12. }
  13. func InitMLTransport() *MultiLocalTrans {
  14. hosts := make(map[string]*chord.LocalTransport)
  15. remote := &chord.BlackholeTransport{}
  16. ml := &MultiLocalTrans{hosts: hosts}
  17. ml.remote = remote
  18. return ml
  19. }
  20. func (ml *MultiLocalTrans) ListVnodes(host string) ([]*chord.Vnode, error) {
  21. if local, ok := ml.hosts[host]; ok {
  22. return local.ListVnodes(host)
  23. }
  24. return ml.remote.ListVnodes(host)
  25. }
  26. // Ping a Vnode, check for liveness
  27. func (ml *MultiLocalTrans) Ping(v *chord.Vnode) (bool, error) {
  28. if local, ok := ml.hosts[v.Host]; ok {
  29. return local.Ping(v)
  30. }
  31. return ml.remote.Ping(v)
  32. }
  33. // Request a nodes predecessor
  34. func (ml *MultiLocalTrans) GetPredecessor(v *chord.Vnode) (*chord.Vnode, error) {
  35. if local, ok := ml.hosts[v.Host]; ok {
  36. return local.GetPredecessor(v)
  37. }
  38. return ml.remote.GetPredecessor(v)
  39. }
  40. // Notify our successor of ourselves
  41. func (ml *MultiLocalTrans) Notify(target, self *chord.Vnode) ([]*chord.Vnode, error) {
  42. if local, ok := ml.hosts[target.Host]; ok {
  43. return local.Notify(target, self)
  44. }
  45. return ml.remote.Notify(target, self)
  46. }
  47. // Find a successor
  48. func (ml *MultiLocalTrans) FindSuccessors(v *chord.Vnode, n int, k []byte) ([]*chord.Vnode, error) {
  49. if local, ok := ml.hosts[v.Host]; ok {
  50. return local.FindSuccessors(v, n, k)
  51. }
  52. return ml.remote.FindSuccessors(v, n, k)
  53. }
  54. // Clears a predecessor if it matches a given vnode. Used to leave.
  55. func (ml *MultiLocalTrans) ClearPredecessor(target, self *chord.Vnode) error {
  56. if local, ok := ml.hosts[target.Host]; ok {
  57. return local.ClearPredecessor(target, self)
  58. }
  59. return ml.remote.ClearPredecessor(target, self)
  60. }
  61. // Instructs a node to skip a given successor. Used to leave.
  62. func (ml *MultiLocalTrans) SkipSuccessor(target, self *chord.Vnode) error {
  63. if local, ok := ml.hosts[target.Host]; ok {
  64. return local.SkipSuccessor(target, self)
  65. }
  66. return ml.remote.SkipSuccessor(target, self)
  67. }
  68. func (ml *MultiLocalTrans) Register(v *chord.Vnode, o chord.VnodeRPC) {
  69. local, ok := ml.hosts[v.Host]
  70. if !ok {
  71. local = chord.InitLocalTransport(nil).(*chord.LocalTransport)
  72. ml.hosts[v.Host] = local
  73. }
  74. local.Register(v, o)
  75. }
  76. func (ml *MultiLocalTrans) Deregister(host string) {
  77. delete(ml.hosts, host)
  78. }
  79. func TestDefaultConfig(t *testing.T) {
  80. conf := chord.DefaultConfig("test")
  81. if conf.Hostname != "test" {
  82. t.Fatalf("bad hostname")
  83. }
  84. if conf.NumVnodes != 8 {
  85. t.Fatalf("bad num Vnodes")
  86. }
  87. if conf.NumSuccessors != 8 {
  88. t.Fatalf("bad num succ")
  89. }
  90. if conf.HashFunc == nil {
  91. t.Fatalf("bad hash")
  92. }
  93. if conf.HashBits != 160 {
  94. t.Fatalf("bad hash bits")
  95. }
  96. if conf.StabilizeMin != time.Duration(15*time.Second) {
  97. t.Fatalf("bad min stable")
  98. }
  99. if conf.StabilizeMax != time.Duration(45*time.Second) {
  100. t.Fatalf("bad max stable")
  101. }
  102. if conf.Delegate != nil {
  103. t.Fatalf("bad delegate")
  104. }
  105. }
  106. func fastConf() *chord.Config {
  107. conf := chord.DefaultConfig("test")
  108. conf.StabilizeMin = time.Duration(15 * time.Millisecond)
  109. conf.StabilizeMax = time.Duration(45 * time.Millisecond)
  110. return conf
  111. }
  112. func TestCreateShutdown(t *testing.T) {
  113. // Start the timer thread
  114. time.After(15)
  115. conf := fastConf()
  116. numGo := runtime.NumGoroutine()
  117. r, err := chord.Create(conf, nil)
  118. if err != nil {
  119. t.Fatalf("unexpected err. %s", err)
  120. }
  121. r.Shutdown()
  122. after := runtime.NumGoroutine()
  123. if after != numGo {
  124. t.Fatalf("unexpected routines! A:%d B:%d", after, numGo)
  125. }
  126. }
  127. func TestJoin(t *testing.T) {
  128. // Create a multi transport
  129. ml := InitMLTransport()
  130. // Create the initial ring
  131. conf := fastConf()
  132. r, err := chord.Create(conf, ml)
  133. if err != nil {
  134. t.Fatalf("unexpected err. %s", err)
  135. }
  136. // Create a second ring
  137. conf2 := fastConf()
  138. conf2.Hostname = "test2"
  139. r2, err := chord.Join(conf2, ml, "test")
  140. if err != nil {
  141. t.Fatalf("failed to join local node! Got %s", err)
  142. }
  143. // Shutdown
  144. r.Shutdown()
  145. r2.Shutdown()
  146. }
  147. func TestJoinDeadHost(t *testing.T) {
  148. // Create a multi transport
  149. ml := InitMLTransport()
  150. // Create the initial ring
  151. conf := fastConf()
  152. _, err := chord.Join(conf, ml, "noop")
  153. if err == nil {
  154. t.Fatalf("expected err!")
  155. }
  156. }
  157. func TestLeave(t *testing.T) {
  158. // Create a multi transport
  159. ml := InitMLTransport()
  160. // Create the initial ring
  161. conf := fastConf()
  162. r, err := chord.Create(conf, ml)
  163. if err != nil {
  164. t.Fatalf("unexpected err. %s", err)
  165. }
  166. // Create a second ring
  167. conf2 := fastConf()
  168. conf2.Hostname = "test2"
  169. r2, err := chord.Join(conf2, ml, "test")
  170. if err != nil {
  171. t.Fatalf("failed to join local node! Got %s", err)
  172. }
  173. // Wait for some stabilization
  174. <-time.After(100 * time.Millisecond)
  175. // Node 1 should leave
  176. r.Leave()
  177. ml.Deregister("test")
  178. // Wait for stabilization
  179. <-time.After(100 * time.Millisecond)
  180. // Verify r2 ring is still in tact
  181. num := len(r2.Vnodes)
  182. for idx, vn := range r2.Vnodes {
  183. if vn.Successors[0] != &r2.Vnodes[(idx+1)%num].Vnode {
  184. t.Fatalf("bad successor! Got:%s:%s", vn.Successors[0].Host,
  185. vn.Successors[0])
  186. }
  187. }
  188. }
  189. func TestLookupBadN(t *testing.T) {
  190. // Create a multi transport
  191. ml := InitMLTransport()
  192. // Create the initial ring
  193. conf := fastConf()
  194. r, err := chord.Create(conf, ml)
  195. if err != nil {
  196. t.Fatalf("unexpected err. %s", err)
  197. }
  198. vns, err := r.Lookup(5, []byte("test"))
  199. fmt.Println("vns::", fmt.Sprintf("%s", vns))
  200. // if err == nil {
  201. // t.Fatalf("expected err!")
  202. // }
  203. for idx := range vns {
  204. fmt.Println(vns[idx].String(), vns[idx].Host, fmt.Sprintf("%x", vns[idx].Id))
  205. }
  206. }
  207. func TestLookup(t *testing.T) {
  208. // Create a multi transport
  209. ml := InitMLTransport()
  210. // Create the initial ring
  211. conf := fastConf()
  212. conf.NumVnodes = 8
  213. conf.NumSuccessors = 8
  214. r, err := chord.Create(conf, ml)
  215. if err != nil {
  216. t.Fatalf("unexpected err. %s", err)
  217. }
  218. fmt.Println("conf::", fmt.Sprintf("%#v", conf))
  219. fmt.Println("ml::", fmt.Sprintf("%#v", ml))
  220. fmt.Println("r::", fmt.Sprintf("%#v", r))
  221. // Create a second ring
  222. conf2 := fastConf()
  223. conf2.NumVnodes = 8
  224. conf2.NumSuccessors = 8
  225. conf2.Hostname = "test2"
  226. r2, err := chord.Join(conf2, ml, "test")
  227. if err != nil {
  228. t.Fatalf("failed to join local node! Got %s", err)
  229. }
  230. fmt.Println("conf2::", fmt.Sprintf("%#v", conf2))
  231. fmt.Println("ml::", fmt.Sprintf("%#v", ml))
  232. fmt.Println("r2::", fmt.Sprintf("%#v", r2))
  233. fmt.Println("r::", fmt.Sprintf("%#v", r))
  234. for n := 3; n <= 3; n++ {
  235. // Create a second ring
  236. confx := fastConf()
  237. confx.NumVnodes = 8
  238. confx.NumSuccessors = 8
  239. confx.Hostname = fmt.Sprintf("%s%d", "test", n)
  240. _, err = chord.Join(confx, ml, "test")
  241. if err != nil {
  242. t.Fatalf("failed to join local node! Got %s", err)
  243. }
  244. }
  245. // Wait for some stabilization
  246. <-time.After(100 * time.Millisecond)
  247. // Try key lookup
  248. keys := [][]byte{[]byte("test"), []byte("foo"), []byte("bar"), []byte("trial")}
  249. for _, k := range keys {
  250. fmt.Println("lookup", fmt.Sprintf("%s", k))
  251. vn1, err := r.Lookup(2, k)
  252. if err != nil {
  253. t.Fatalf("unexpected err %s", err)
  254. }
  255. vn2, err := r2.Lookup(2, k)
  256. if err != nil {
  257. t.Fatalf("unexpected err %s", err)
  258. }
  259. if len(vn1) != len(vn2) {
  260. t.Fatalf("result len differs!")
  261. }
  262. fmt.Println("vn1", fmt.Sprintf("%#v", vn1))
  263. fmt.Println("vn2", fmt.Sprintf("%#v", vn2))
  264. for idx := range vn1 {
  265. fmt.Println(vn1[idx].String(), vn1[idx].Host, fmt.Sprintf("%x", vn1[idx].Id))
  266. if vn1[idx].String() != vn2[idx].String() {
  267. t.Fatalf("results differ!")
  268. }
  269. }
  270. }
  271. }
  272. func TestLookupN(t *testing.T) {
  273. // Create a multi transport
  274. ml := InitMLTransport()
  275. // Create the initial ring
  276. conf := fastConf()
  277. conf.NumVnodes = 16
  278. conf.NumSuccessors = 8
  279. r, err := chord.Create(conf, ml)
  280. if err != nil {
  281. t.Fatalf("unexpected err. %s", err)
  282. }
  283. fmt.Println("conf::", fmt.Sprintf("%#v", conf))
  284. fmt.Println("ml::", fmt.Sprintf("%#v", ml))
  285. fmt.Println("r::", fmt.Sprintf("%#v", r))
  286. for n := 2; n <= 4; n++ {
  287. // Create a second ring
  288. confx := fastConf()
  289. confx.NumVnodes = 16
  290. confx.NumSuccessors = 8
  291. confx.Hostname = fmt.Sprintf("%s%d", "test", n)
  292. _, err = chord.Join(confx, ml, "test")
  293. if err != nil {
  294. t.Fatalf("failed to join local node! Got %s", err)
  295. }
  296. }
  297. // Wait for some stabilization
  298. <-time.After(100 * time.Millisecond)
  299. kc := map[string]int{}
  300. for i := 0; i < 10000; i++ {
  301. k := []byte(fmt.Sprint("key", i))
  302. fmt.Println("lookup", fmt.Sprintf("%s", k))
  303. vns, err := r.Lookup(3, k)
  304. if err != nil {
  305. t.Fatalf("unexpected err %s", err)
  306. }
  307. for idx := range vns {
  308. kc[vns[idx].Host]++
  309. fmt.Println(vns[idx].String(), vns[idx].Host, fmt.Sprintf("%x", vns[idx].Id))
  310. }
  311. }
  312. fmt.Println("count:", kc)
  313. }