diff --git a/meta/state.go b/meta/state.go index 0cc269a234..8f8c72ad55 100644 --- a/meta/state.go +++ b/meta/state.go @@ -28,7 +28,7 @@ type raftState interface { sync(index uint64, timeout time.Duration) error setPeers(addrs []string) error addPeer(addr string) error - removePeer(addr string, cleanup bool) error + removePeer(addr string) error peers() ([]string, error) invalidate() error close() error @@ -318,17 +318,13 @@ func (r *localRaft) addPeer(addr string) error { } // removePeer removes addr from the list of peers in the cluster. -func (r *localRaft) removePeer(addr string, cleanup bool) error { +func (r *localRaft) removePeer(addr string) error { // Only do this on the leader if r.isLeader() { if fut := r.raft.RemovePeer(addr); fut.Error() != nil { return fut.Error() } } - // clean up the directories if this is the node removed - if cleanup { - return r.remove() - } return nil } @@ -414,7 +410,7 @@ func (r *remoteRaft) addPeer(addr string) error { } // removePeer does nothing for remoteRaft. -func (r *remoteRaft) removePeer(addr string, cleanup bool) error { +func (r *remoteRaft) removePeer(addr string) error { return nil } diff --git a/meta/statement_executor.go b/meta/statement_executor.go index 78631c18f0..2e18418ee0 100644 --- a/meta/statement_executor.go +++ b/meta/statement_executor.go @@ -15,6 +15,7 @@ type StatementExecutor struct { Store interface { Nodes() ([]NodeInfo, error) Peers() ([]string, error) + Leader() string DeleteNode(nodeID uint64, force bool) error Database(name string) (*DatabaseInfo, error) @@ -145,9 +146,11 @@ func (e *StatementExecutor) executeShowServersStatement(q *influxql.ShowServersS return &influxql.Result{Err: err} } - row := &models.Row{Columns: []string{"id", "cluster_addr", "raft"}} + leader := e.Store.Leader() + + row := &models.Row{Columns: []string{"id", "cluster_addr", "raft", "raft-leader"}} for _, ni := range nis { - row.Values = append(row.Values, []interface{}{ni.ID, ni.Host, contains(peers, ni.Host)}) + row.Values = append(row.Values, []interface{}{ni.ID, ni.Host, contains(peers, ni.Host), leader == ni.Host}) } return &influxql.Result{Series: []*models.Row{row}} } diff --git a/meta/statement_executor_test.go b/meta/statement_executor_test.go index 10f34488f8..c2441cadd4 100644 --- a/meta/statement_executor_test.go +++ b/meta/statement_executor_test.go @@ -125,15 +125,18 @@ func TestStatementExecutor_ExecuteStatement_ShowServers(t *testing.T) { e.Store.PeersFn = func() ([]string, error) { return []string{"node0"}, nil } + e.Store.LeaderFn = func() string { + return "node0" + } if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW SERVERS`)); res.Err != nil { t.Fatal(res.Err) } else if !reflect.DeepEqual(res.Series, models.Rows{ { - Columns: []string{"id", "cluster_addr", "raft"}, + Columns: []string{"id", "cluster_addr", "raft", "raft-leader"}, Values: [][]interface{}{ - {uint64(1), "node0", true}, - {uint64(2), "node1", false}, + {uint64(1), "node0", true, true}, + {uint64(2), "node1", false, false}, }, }, }) { @@ -834,6 +837,7 @@ func NewStatementExecutor() *StatementExecutor { type StatementExecutorStore struct { NodesFn func() ([]meta.NodeInfo, error) PeersFn func() ([]string, error) + LeaderFn func() string DatabaseFn func(name string) (*meta.DatabaseInfo, error) DatabasesFn func() ([]meta.DatabaseInfo, error) CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error) @@ -865,6 +869,13 @@ func (s *StatementExecutorStore) Peers() ([]string, error) { return s.PeersFn() } +func (s *StatementExecutorStore) Leader() string { + if s.LeaderFn != nil { + return s.LeaderFn() + } + return "" +} + func (s *StatementExecutorStore) DeleteNode(nodeID uint64, force bool) error { return s.DeleteNodeFn(nodeID, force) } diff --git a/meta/store.go b/meta/store.go index e666582d8b..81486fc3dc 100644 --- a/meta/store.go +++ b/meta/store.go @@ -824,6 +824,11 @@ func (s *Store) UpdateNode(id uint64, host string) (*NodeInfo, error) { // DeleteNode removes a node from the metastore by id. func (s *Store) DeleteNode(id uint64, force bool) error { + ni := s.data.Node(id) + if ni == nil { + return ErrNodeNotFound + } + err := s.exec(internal.Command_DeleteNodeCommand, internal.E_DeleteNodeCommand_Command, &internal.DeleteNodeCommand{ ID: proto.Uint64(id), @@ -838,7 +843,7 @@ func (s *Store) DeleteNode(id uint64, force bool) error { return s.exec(internal.Command_RemovePeerCommand, internal.E_RemovePeerCommand_Command, &internal.RemovePeerCommand{ ID: proto.Uint64(id), - Addr: proto.String(s.RemoteAddr.String()), + Addr: proto.String(ni.Host), }, ) } @@ -1686,7 +1691,7 @@ func (fsm *storeFSM) applyRemovePeerCommand(cmd *internal.Command) interface{} { addr := v.GetAddr() //Remove that node from the peer fsm.Logger.Printf("removing peer for node id %d, %s", id, addr) - if err := fsm.raftState.removePeer(addr, id == fsm.id); err != nil { + if err := fsm.raftState.removePeer(addr); err != nil { fsm.Logger.Printf("error removing peer: %s", err) } return nil