fix remove peer, add leader to show servers
parent
3071eec2ca
commit
c108c6eb96
|
@ -28,7 +28,7 @@ type raftState interface {
|
|||
sync(index uint64, timeout time.Duration) error
|
||||
setPeers(addrs []string) error
|
||||
addPeer(addr string) error
|
||||
removePeer(addr string, cleanup bool) error
|
||||
removePeer(addr string) error
|
||||
peers() ([]string, error)
|
||||
invalidate() error
|
||||
close() error
|
||||
|
@ -318,17 +318,13 @@ func (r *localRaft) addPeer(addr string) error {
|
|||
}
|
||||
|
||||
// removePeer removes addr from the list of peers in the cluster.
|
||||
func (r *localRaft) removePeer(addr string, cleanup bool) error {
|
||||
func (r *localRaft) removePeer(addr string) error {
|
||||
// Only do this on the leader
|
||||
if r.isLeader() {
|
||||
if fut := r.raft.RemovePeer(addr); fut.Error() != nil {
|
||||
return fut.Error()
|
||||
}
|
||||
}
|
||||
// clean up the directories if this is the node removed
|
||||
if cleanup {
|
||||
return r.remove()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -414,7 +410,7 @@ func (r *remoteRaft) addPeer(addr string) error {
|
|||
}
|
||||
|
||||
// removePeer does nothing for remoteRaft.
|
||||
func (r *remoteRaft) removePeer(addr string, cleanup bool) error {
|
||||
func (r *remoteRaft) removePeer(addr string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@ type StatementExecutor struct {
|
|||
Store interface {
|
||||
Nodes() ([]NodeInfo, error)
|
||||
Peers() ([]string, error)
|
||||
Leader() string
|
||||
|
||||
DeleteNode(nodeID uint64, force bool) error
|
||||
Database(name string) (*DatabaseInfo, error)
|
||||
|
@ -145,9 +146,11 @@ func (e *StatementExecutor) executeShowServersStatement(q *influxql.ShowServersS
|
|||
return &influxql.Result{Err: err}
|
||||
}
|
||||
|
||||
row := &models.Row{Columns: []string{"id", "cluster_addr", "raft"}}
|
||||
leader := e.Store.Leader()
|
||||
|
||||
row := &models.Row{Columns: []string{"id", "cluster_addr", "raft", "raft-leader"}}
|
||||
for _, ni := range nis {
|
||||
row.Values = append(row.Values, []interface{}{ni.ID, ni.Host, contains(peers, ni.Host)})
|
||||
row.Values = append(row.Values, []interface{}{ni.ID, ni.Host, contains(peers, ni.Host), leader == ni.Host})
|
||||
}
|
||||
return &influxql.Result{Series: []*models.Row{row}}
|
||||
}
|
||||
|
|
|
@ -125,15 +125,18 @@ func TestStatementExecutor_ExecuteStatement_ShowServers(t *testing.T) {
|
|||
e.Store.PeersFn = func() ([]string, error) {
|
||||
return []string{"node0"}, nil
|
||||
}
|
||||
e.Store.LeaderFn = func() string {
|
||||
return "node0"
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW SERVERS`)); res.Err != nil {
|
||||
t.Fatal(res.Err)
|
||||
} else if !reflect.DeepEqual(res.Series, models.Rows{
|
||||
{
|
||||
Columns: []string{"id", "cluster_addr", "raft"},
|
||||
Columns: []string{"id", "cluster_addr", "raft", "raft-leader"},
|
||||
Values: [][]interface{}{
|
||||
{uint64(1), "node0", true},
|
||||
{uint64(2), "node1", false},
|
||||
{uint64(1), "node0", true, true},
|
||||
{uint64(2), "node1", false, false},
|
||||
},
|
||||
},
|
||||
}) {
|
||||
|
@ -834,6 +837,7 @@ func NewStatementExecutor() *StatementExecutor {
|
|||
type StatementExecutorStore struct {
|
||||
NodesFn func() ([]meta.NodeInfo, error)
|
||||
PeersFn func() ([]string, error)
|
||||
LeaderFn func() string
|
||||
DatabaseFn func(name string) (*meta.DatabaseInfo, error)
|
||||
DatabasesFn func() ([]meta.DatabaseInfo, error)
|
||||
CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error)
|
||||
|
@ -865,6 +869,13 @@ func (s *StatementExecutorStore) Peers() ([]string, error) {
|
|||
return s.PeersFn()
|
||||
}
|
||||
|
||||
func (s *StatementExecutorStore) Leader() string {
|
||||
if s.LeaderFn != nil {
|
||||
return s.LeaderFn()
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (s *StatementExecutorStore) DeleteNode(nodeID uint64, force bool) error {
|
||||
return s.DeleteNodeFn(nodeID, force)
|
||||
}
|
||||
|
|
|
@ -824,6 +824,11 @@ func (s *Store) UpdateNode(id uint64, host string) (*NodeInfo, error) {
|
|||
|
||||
// DeleteNode removes a node from the metastore by id.
|
||||
func (s *Store) DeleteNode(id uint64, force bool) error {
|
||||
ni := s.data.Node(id)
|
||||
if ni == nil {
|
||||
return ErrNodeNotFound
|
||||
}
|
||||
|
||||
err := s.exec(internal.Command_DeleteNodeCommand, internal.E_DeleteNodeCommand_Command,
|
||||
&internal.DeleteNodeCommand{
|
||||
ID: proto.Uint64(id),
|
||||
|
@ -838,7 +843,7 @@ func (s *Store) DeleteNode(id uint64, force bool) error {
|
|||
return s.exec(internal.Command_RemovePeerCommand, internal.E_RemovePeerCommand_Command,
|
||||
&internal.RemovePeerCommand{
|
||||
ID: proto.Uint64(id),
|
||||
Addr: proto.String(s.RemoteAddr.String()),
|
||||
Addr: proto.String(ni.Host),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
@ -1686,7 +1691,7 @@ func (fsm *storeFSM) applyRemovePeerCommand(cmd *internal.Command) interface{} {
|
|||
addr := v.GetAddr()
|
||||
//Remove that node from the peer
|
||||
fsm.Logger.Printf("removing peer for node id %d, %s", id, addr)
|
||||
if err := fsm.raftState.removePeer(addr, id == fsm.id); err != nil {
|
||||
if err := fsm.raftState.removePeer(addr); err != nil {
|
||||
fsm.Logger.Printf("error removing peer: %s", err)
|
||||
}
|
||||
return nil
|
||||
|
|
Loading…
Reference in New Issue