123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- package mgossip
- import (
- "math"
- "git.wecise.com/wecise/odbserver/mring/matrix"
- "git.wecise.com/wecise/odbserver/mring/mgossip"
- )
// Router exposes the two lifecycle steps of a routing component.
// Both steps report failure through their error result.
type Router interface {
	// Init performs the initialisation step.
	Init() error
	// Start performs the start-up step.
	Start() error
}
- type mRouter struct {
- cnode matrix.ClusterNode
- mtopo mgossip.Topology
- }
- func NewMRouter(cnode matrix.ClusterNode) Router {
- mtopo := mgossip.MTopology(cnode)
- mr := &mRouter{
- cnode: cnode,
- mtopo: mtopo}
- // mtopo.OnTopoChange(func(connectedPeers map[matrix.PeerID]matrix.PeerIDs, update *mgossip.TopoInfo) {
- // peers := matrix.MPeerIDSet(cnode.Peers())
- // if !mpeers.Equal(peers) {
- // mpeers = peers
- // logger.Info(cnode.Name(), cnode.ID(), ":\n", mpeers.List())
- // }
- // })
- return mr
- }
- func (mr *mRouter) Init() (err error) {
- // 拓扑管理初始化,开启本地网络监听端口,
- if err = mr.mtopo.Init(); err != nil {
- return
- }
- return
- }
- func (mr *mRouter) Start() (err error) {
- // 拓扑管理初始化,开启本地网络监听端口,
- if err = mr.mtopo.Start(); err != nil {
- return
- }
- return
- }
- func (mr *mRouter) topoGraph(ti *mgossip.TopoInfo) {
- if ti == nil {
- return
- }
- fw := newFloydWarshall(len(ti.Nodes))
- for pid, ni := range ti.Nodes {
- bi := ni.BaseInfo
- if bi == nil {
- continue // 正常情况下不会为空
- }
- for rid, pi := range ni.Peers {
- // TODO 统计分析得出连接权重
- if len(pi.Connections) > 0 {
- fw.addEdge(pid, rid, 1)
- fw.addEdge(rid, pid, 1)
- }
- }
- }
- return
- }
// graph is a single outgoing edge stored in an adjacency list.
type graph struct {
	to int     // node index of the edge's destination
	wt float64 // weight of the edge
}
- type FloydWarshall struct {
- mindex map[matrix.PeerID]int
- rindex []matrix.PeerID
- mgraph [][]graph
- mpath [][][]matrix.PeerID
- mdist [][]float64
- }
- func newFloydWarshall(nodescount int) *FloydWarshall {
- return &FloydWarshall{
- mindex: map[matrix.PeerID]int{},
- rindex: make([]matrix.PeerID, nodescount+1),
- mgraph: make([][]graph, nodescount+1),
- }
- }
- func (fw *FloydWarshall) index(pid matrix.PeerID) int {
- fi, has := fw.mindex[pid]
- if !has {
- fi = len(fw.mindex) + 1
- fw.mindex[pid] = fi
- }
- if len(fw.rindex) < fi+1 {
- fw.rindex = append(fw.rindex, pid)
- } else {
- fw.rindex[fi] = pid
- }
- return fi
- }
- func (fw *FloydWarshall) addEdge(from, to matrix.PeerID, weight float64) {
- fi := fw.index(from)
- ti := fw.index(to)
- if len(fw.mgraph) < fi+1 {
- fw.mgraph = append(fw.mgraph, []graph{})
- }
- fw.mgraph[fi] = append(fw.mgraph[fi], graph{ti, weight})
- }
- func (fw *FloydWarshall) calculate() {
- fw.mpath = make([][][]matrix.PeerID, len(fw.mgraph))
- fw.mdist = make([][]float64, len(fw.mgraph))
- for i := range fw.mdist {
- pi := make([][]matrix.PeerID, len(fw.mgraph))
- di := make([]float64, len(fw.mgraph))
- for j := range di {
- di[j] = math.Inf(1)
- pi[j] = []matrix.PeerID{}
- }
- di[i] = 0
- fw.mdist[i] = di
- fw.mpath[i] = pi
- }
- for u, graphs := range fw.mgraph {
- for _, v := range graphs {
- fw.mdist[u][v.to] = v.wt
- fw.mpath[u][v.to] = []matrix.PeerID{fw.rindex[u], fw.rindex[v.to]}
- }
- }
- for k, dk := range fw.mdist {
- pk := fw.mpath[k]
- for i, di := range fw.mdist {
- pi := fw.mpath[i]
- for j, dij := range di {
- if d := di[k] + dk[j]; dij > d {
- di[j] = d
- pi[j] = append(pi[k], pk[j][1:]...)
- }
- }
- }
- }
- }
- func (fw *FloydWarshall) getPath(from, to matrix.PeerID) (dist float64, path matrix.PeerIDs) {
- fi := fw.index(from)
- ti := fw.index(to)
- if fi >= len(fw.mpath) || fi >= len(fw.mdist) {
- return 0, nil
- }
- if ti >= len(fw.mpath[fi]) || ti >= len(fw.mdist[fi]) {
- return 0, nil
- }
- return fw.mdist[fi][ti], fw.mpath[fi][ti]
- }
|