mnode.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package mnode
  2. import (
  3. "sync"
  4. "trial/grpc-odbserver/api"
  5. "git.wecise.com/wecise/mring/matrix"
  6. "git.wecise.com/wecise/mring/mnet"
  7. )
  8. const UNKOWN = 0
  9. const RUNNING = 1
  10. const SUSPEND = 2
  11. type MNode struct {
  12. listener mnet.Listener
  13. mutex sync.Mutex
  14. status int
  15. services map[string]api.Service
  16. }
  17. var _ = api.Node((*MNode)(nil))
  18. func (mn *MNode) ID() int64 {
  19. return int64(matrix.ID)
  20. }
  21. func (mn *MNode) Name() string {
  22. return mnet.MClusterNode(matrix.ID).Name()
  23. }
  24. func (mn *MNode) Init() {
  25. mn.services = make(map[string]api.Service)
  26. }
  27. func (mn *MNode) Destroy() {
  28. mn.Stop()
  29. for _, svc := range mn.services {
  30. svc.Destroy()
  31. }
  32. mn.services = nil
  33. }
  34. func (mn *MNode) RegisterService(svc api.Service) {
  35. svc.Init()
  36. mn.mutex.Lock()
  37. defer mn.mutex.Unlock()
  38. if osvc, ok := mn.services[svc.Name()]; ok {
  39. osvc.Stop()
  40. osvc.Destroy()
  41. }
  42. mn.services[svc.Name()] = svc
  43. if mn.status == RUNNING {
  44. svc.Start()
  45. }
  46. }
  47. func (mn *MNode) UnregisterService(svc api.Service) {
  48. mn.mutex.Lock()
  49. defer mn.mutex.Unlock()
  50. if osvc, ok := mn.services[svc.Name()]; ok && osvc == svc {
  51. osvc.Stop()
  52. osvc.Destroy()
  53. }
  54. }
  55. func (mn *MNode) Start() {
  56. mn.mutex.Lock()
  57. defer mn.mutex.Unlock()
  58. if mn.status == RUNNING {
  59. return
  60. }
  61. cfg := matrix.Config()
  62. listener, err := mnet.MListener(cfg.NodeName, "", cfg.Port, nil).Listen()
  63. if err != nil {
  64. panic(err)
  65. }
  66. mn.listener = listener
  67. for _, svc := range mn.services {
  68. svc.Start()
  69. }
  70. mn.status = RUNNING
  71. }
  72. func (mn *MNode) Stop() {
  73. mn.mutex.Lock()
  74. defer mn.mutex.Unlock()
  75. if mn.status == SUSPEND {
  76. return
  77. }
  78. mn.status = SUSPEND
  79. for _, svc := range mn.services {
  80. svc.Stop()
  81. }
  82. mn.listener.Close()
  83. mn.listener = nil
  84. }