router_floyd.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package mgossip
  2. import (
  3. "math"
  4. "git.wecise.com/wecise/odbserver/mring/matrix"
  5. "git.wecise.com/wecise/odbserver/mring/mgossip"
  6. )
  7. type Router interface {
  8. Init() error
  9. Start() error
  10. }
  11. type mRouter struct {
  12. cnode matrix.ClusterNode
  13. mtopo mgossip.Topology
  14. }
  15. func NewMRouter(cnode matrix.ClusterNode) Router {
  16. mtopo := mgossip.MTopology(cnode)
  17. mr := &mRouter{
  18. cnode: cnode,
  19. mtopo: mtopo}
  20. // mtopo.OnTopoChange(func(connectedPeers map[matrix.PeerID]matrix.PeerIDs, update *mgossip.TopoInfo) {
  21. // peers := matrix.MPeerIDSet(cnode.Peers())
  22. // if !mpeers.Equal(peers) {
  23. // mpeers = peers
  24. // logger.Info(cnode.Name(), cnode.ID(), ":\n", mpeers.List())
  25. // }
  26. // })
  27. return mr
  28. }
  29. func (mr *mRouter) Init() (err error) {
  30. // 拓扑管理初始化,开启本地网络监听端口,
  31. if err = mr.mtopo.Init(); err != nil {
  32. return
  33. }
  34. return
  35. }
  36. func (mr *mRouter) Start() (err error) {
  37. // 拓扑管理初始化,开启本地网络监听端口,
  38. if err = mr.mtopo.Start(); err != nil {
  39. return
  40. }
  41. return
  42. }
  43. func (mr *mRouter) topoGraph(ti *mgossip.TopoInfo) {
  44. if ti == nil {
  45. return
  46. }
  47. fw := newFloydWarshall(len(ti.Nodes))
  48. for pid, ni := range ti.Nodes {
  49. bi := ni.BaseInfo
  50. if bi == nil {
  51. continue // 正常情况下不会为空
  52. }
  53. for rid, pi := range ni.Peers {
  54. // TODO 统计分析得出连接权重
  55. if len(pi.Connections) > 0 {
  56. fw.addEdge(pid, rid, 1)
  57. fw.addEdge(rid, pid, 1)
  58. }
  59. }
  60. }
  61. return
  62. }
  63. type graph struct {
  64. to int
  65. wt float64
  66. }
  67. type FloydWarshall struct {
  68. mindex map[matrix.PeerID]int
  69. rindex []matrix.PeerID
  70. mgraph [][]graph
  71. mpath [][][]matrix.PeerID
  72. mdist [][]float64
  73. }
  74. func newFloydWarshall(nodescount int) *FloydWarshall {
  75. return &FloydWarshall{
  76. mindex: map[matrix.PeerID]int{},
  77. rindex: make([]matrix.PeerID, nodescount+1),
  78. mgraph: make([][]graph, nodescount+1),
  79. }
  80. }
  81. func (fw *FloydWarshall) index(pid matrix.PeerID) int {
  82. fi, has := fw.mindex[pid]
  83. if !has {
  84. fi = len(fw.mindex) + 1
  85. fw.mindex[pid] = fi
  86. }
  87. if len(fw.rindex) < fi+1 {
  88. fw.rindex = append(fw.rindex, pid)
  89. } else {
  90. fw.rindex[fi] = pid
  91. }
  92. return fi
  93. }
  94. func (fw *FloydWarshall) addEdge(from, to matrix.PeerID, weight float64) {
  95. fi := fw.index(from)
  96. ti := fw.index(to)
  97. if len(fw.mgraph) < fi+1 {
  98. fw.mgraph = append(fw.mgraph, []graph{})
  99. }
  100. fw.mgraph[fi] = append(fw.mgraph[fi], graph{ti, weight})
  101. }
  102. func (fw *FloydWarshall) calculate() {
  103. fw.mpath = make([][][]matrix.PeerID, len(fw.mgraph))
  104. fw.mdist = make([][]float64, len(fw.mgraph))
  105. for i := range fw.mdist {
  106. pi := make([][]matrix.PeerID, len(fw.mgraph))
  107. di := make([]float64, len(fw.mgraph))
  108. for j := range di {
  109. di[j] = math.Inf(1)
  110. pi[j] = []matrix.PeerID{}
  111. }
  112. di[i] = 0
  113. fw.mdist[i] = di
  114. fw.mpath[i] = pi
  115. }
  116. for u, graphs := range fw.mgraph {
  117. for _, v := range graphs {
  118. fw.mdist[u][v.to] = v.wt
  119. fw.mpath[u][v.to] = []matrix.PeerID{fw.rindex[u], fw.rindex[v.to]}
  120. }
  121. }
  122. for k, dk := range fw.mdist {
  123. pk := fw.mpath[k]
  124. for i, di := range fw.mdist {
  125. pi := fw.mpath[i]
  126. for j, dij := range di {
  127. if d := di[k] + dk[j]; dij > d {
  128. di[j] = d
  129. pi[j] = append(pi[k], pk[j][1:]...)
  130. }
  131. }
  132. }
  133. }
  134. }
  135. func (fw *FloydWarshall) getPath(from, to matrix.PeerID) (dist float64, path matrix.PeerIDs) {
  136. fi := fw.index(from)
  137. ti := fw.index(to)
  138. if fi >= len(fw.mpath) || fi >= len(fw.mdist) {
  139. return 0, nil
  140. }
  141. if ti >= len(fw.mpath[fi]) || ti >= len(fw.mdist[fi]) {
  142. return 0, nil
  143. }
  144. return fw.mdist[fi][ti], fw.mpath[fi][ti]
  145. }