package mgossip

import (
	"math"

	"git.wecise.com/wecise/odbserver/mring/matrix"
	"git.wecise.com/wecise/odbserver/mring/mgossip"
)

// Router is the lifecycle interface of the message router: Init must be
// called before Start.
type Router interface {
	Init() error
	Start() error
}

// mRouter is the Router implementation backed by a gossip topology.
type mRouter struct {
	cnode matrix.ClusterNode // cluster node this router was created for
	mtopo mgossip.Topology   // topology obtained from mgossip.MTopology(cnode)
}

// NewMRouter creates a Router for cnode, using the topology returned by
// mgossip.MTopology(cnode).
func NewMRouter(cnode matrix.ClusterNode) Router {
	mtopo := mgossip.MTopology(cnode)
	mr := &mRouter{
		cnode: cnode,
		mtopo: mtopo,
	}
	// mtopo.OnTopoChange(func(connectedPeers map[matrix.PeerID]matrix.PeerIDs, update *mgossip.TopoInfo) {
	// 	peers := matrix.MPeerIDSet(cnode.Peers())
	// 	if !mpeers.Equal(peers) {
	// 		mpeers = peers
	// 		logger.Info(cnode.Name(), cnode.ID(), ":\n", mpeers.List())
	// 	}
	// })
	return mr
}

// Init initializes topology management (per the original note: this opens the
// local network listening port) and returns any error from the topology.
func (mr *mRouter) Init() (err error) {
	return mr.mtopo.Init()
}

// Start starts the underlying topology and returns any error it reports.
func (mr *mRouter) Start() (err error) {
	return mr.mtopo.Start()
}

// topoGraph builds an undirected, unit-weight graph from the topology
// snapshot ti: every peer entry with at least one connection becomes an edge
// in both directions. A nil ti is ignored.
//
// NOTE(review): the graph is built but neither calculated nor stored yet, so
// this method currently has no observable effect.
func (mr *mRouter) topoGraph(ti *mgossip.TopoInfo) {
	if ti == nil {
		return
	}
	fw := newFloydWarshall(len(ti.Nodes))
	for pid, ni := range ti.Nodes {
		if ni.BaseInfo == nil {
			continue // should not be nil under normal circumstances
		}
		for rid, pi := range ni.Peers {
			// TODO: derive the connection weight from statistical analysis.
			if len(pi.Connections) > 0 {
				fw.addEdge(pid, rid, 1)
				fw.addEdge(rid, pid, 1)
			}
		}
	}
}

// graph is one outgoing edge in an adjacency list.
type graph struct {
	to int     // internal index of the destination node
	wt float64 // edge weight
}

// FloydWarshall computes all-pairs shortest paths between peers.
//
// Peers are mapped to dense internal indexes starting at 1; index 0 is never
// assigned. Add edges with addEdge, then call calculate, then query with
// getPath.
type FloydWarshall struct {
	mindex map[matrix.PeerID]int // peer id -> internal index
	rindex []matrix.PeerID       // internal index -> peer id
	mgraph [][]graph             // adjacency lists, indexed by source index
	mpath  [][][]matrix.PeerID   // mpath[i][j]: node sequence of the shortest path i -> j
	mdist  [][]float64           // mdist[i][j]: length of the shortest path i -> j
}

// newFloydWarshall returns an empty graph pre-sized for nodescount nodes.
// The count is only a sizing hint; the graph grows as edges are added.
func newFloydWarshall(nodescount int) *FloydWarshall {
	if nodescount < 0 {
		nodescount = 0 // make() panics on a negative length
	}
	return &FloydWarshall{
		mindex: map[matrix.PeerID]int{},
		rindex: make([]matrix.PeerID, nodescount+1),
		mgraph: make([][]graph, nodescount+1),
	}
}

// index returns the internal index of pid, assigning the next free index
// (starting at 1) the first time pid is seen.
func (fw *FloydWarshall) index(pid matrix.PeerID) int {
	if fi, ok := fw.mindex[pid]; ok {
		return fi
	}
	fi := len(fw.mindex) + 1
	fw.mindex[pid] = fi
	// Make sure the reverse table has a slot for the new index.
	for len(fw.rindex) <= fi {
		var unset matrix.PeerID
		fw.rindex = append(fw.rindex, unset)
	}
	fw.rindex[fi] = pid
	return fi
}

// addEdge adds a directed edge from -> to with the given weight.
func (fw *FloydWarshall) addEdge(from, to matrix.PeerID, weight float64) {
	fi := fw.index(from)
	ti := fw.index(to)
	// The adjacency table must cover both endpoints: calculate sizes its
	// matrices from len(fw.mgraph) and indexes them by destination as well.
	top := fi
	if ti > top {
		top = ti
	}
	for len(fw.mgraph) <= top {
		fw.mgraph = append(fw.mgraph, nil)
	}
	fw.mgraph[fi] = append(fw.mgraph[fi], graph{to: ti, wt: weight})
}

// calculate (re)computes the all-pairs distance and path matrices from the
// edges added so far. Unreachable pairs keep distance +Inf and an empty path;
// the distance from a node to itself is 0 with an empty path.
func (fw *FloydWarshall) calculate() {
	n := len(fw.mgraph)
	fw.mpath = make([][][]matrix.PeerID, n)
	fw.mdist = make([][]float64, n)
	for i := range fw.mdist {
		dists := make([]float64, n)
		paths := make([][]matrix.PeerID, n)
		for j := range dists {
			dists[j] = math.Inf(1)
			paths[j] = []matrix.PeerID{}
		}
		dists[i] = 0
		fw.mdist[i] = dists
		fw.mpath[i] = paths
	}

	// Seed the matrices with the direct edges.
	for u, edges := range fw.mgraph {
		for _, e := range edges {
			if e.to == u || e.to < 0 || e.to >= n {
				continue // a self-loop must not replace the zero diagonal
			}
			if e.wt >= fw.mdist[u][e.to] {
				continue // keep the lightest of several parallel edges
			}
			fw.mdist[u][e.to] = e.wt
			fw.mpath[u][e.to] = []matrix.PeerID{fw.rindex[u], fw.rindex[e.to]}
		}
	}

	// Relax every pair (i, j) through every intermediate node k.
	for k := 0; k < n; k++ {
		dk, pk := fw.mdist[k], fw.mpath[k]
		for i := 0; i < n; i++ {
			di, pi := fw.mdist[i], fw.mpath[i]
			if math.IsInf(di[k], 1) {
				continue // k is unreachable from i, nothing can improve
			}
			for j := 0; j < n; j++ {
				d := di[k] + dk[j]
				if !(d < di[j]) {
					continue
				}
				head, tail := pi[k], pk[j]
				// Build the joined path in a fresh slice: appending to head
				// directly could write into spare capacity shared with other
				// stored paths and corrupt them.
				joined := make([]matrix.PeerID, 0, len(head)+len(tail))
				joined = append(joined, head...)
				if len(tail) > 0 {
					joined = append(joined, tail[1:]...) // tail[0] duplicates head's last node
				}
				di[j] = d
				pi[j] = joined
			}
		}
	}
}

// getPath returns the shortest distance and node sequence from -> to as
// computed by the last call to calculate. It never modifies the graph.
//
// When either peer is unknown, or calculate has not covered it yet, the
// result is (+Inf, nil). For known but unreachable pairs the distance is +Inf
// and the path is empty; for from == to the distance is 0 and the path empty.
func (fw *FloydWarshall) getPath(from, to matrix.PeerID) (dist float64, path matrix.PeerIDs) {
	fi, ok := fw.mindex[from]
	if !ok {
		return math.Inf(1), nil
	}
	ti, ok := fw.mindex[to]
	if !ok {
		return math.Inf(1), nil
	}
	if fi >= len(fw.mpath) || fi >= len(fw.mdist) {
		return math.Inf(1), nil
	}
	if ti >= len(fw.mpath[fi]) || ti >= len(fw.mdist[fi]) {
		return math.Inf(1), nil
	}
	return fw.mdist[fi][ti], fw.mpath[fi][ti]
}