From 29011c5cf24ef32294cf30ed44350b221cb75613 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Mon, 20 Jul 2015 14:59:00 -0600 Subject: [PATCH] Code review fixes --- meta/internal/meta.pb.go | 6 +++--- meta/internal/meta.proto | 5 +++-- meta/rpc.go | 16 +++++++++------- meta/rpc_test.go | 8 ++++---- meta/state.go | 4 ++-- meta/store.go | 7 +++---- 6 files changed, 24 insertions(+), 22 deletions(-) diff --git a/meta/internal/meta.pb.go b/meta/internal/meta.pb.go index 94294ef1b4..efc891dede 100644 --- a/meta/internal/meta.pb.go +++ b/meta/internal/meta.pb.go @@ -1322,7 +1322,7 @@ type JoinResponse struct { EnableRaft *bool `protobuf:"varint,2,opt" json:"EnableRaft,omitempty"` // The addresses of raft peers to use if joining as a raft member. If not joining // as a raft member, these are the nodes running raft. - Peers []string `protobuf:"bytes,3,rep" json:"Peers,omitempty"` + RaftNodes []string `protobuf:"bytes,3,rep" json:"RaftNodes,omitempty"` // The node ID assigned to the requesting node. NodeID *uint64 `protobuf:"varint,4,opt" json:"NodeID,omitempty"` XXX_unrecognized []byte `json:"-"` @@ -1346,9 +1346,9 @@ func (m *JoinResponse) GetEnableRaft() bool { return false } -func (m *JoinResponse) GetPeers() []string { +func (m *JoinResponse) GetRaftNodes() []string { if m != nil { - return m.Peers + return m.RaftNodes } return nil } diff --git a/meta/internal/meta.proto b/meta/internal/meta.proto index da510914c1..62550d348c 100644 --- a/meta/internal/meta.proto +++ b/meta/internal/meta.proto @@ -98,7 +98,7 @@ message Command { UpdateUserCommand = 15; SetPrivilegeCommand = 16; SetDataCommand = 17; - SetAdminPrivilegeCommand = 18; + SetAdminPrivilegeCommand = 18; } required Type type = 1; @@ -301,9 +301,10 @@ message JoinResponse { // Indicates that this node should take part in the raft cluster. optional bool EnableRaft = 2; + // The addresses of raft peers to use if joining as a raft member. If not joining // as a raft member, these are the nodes running raft. - repeated string Peers = 3; + repeated string RaftNodes = 3; // The node ID assigned to the requesting node. optional uint64 NodeID = 4; diff --git a/meta/rpc.go b/meta/rpc.go index 506f36eb47..9db03bb813 100644 --- a/meta/rpc.go +++ b/meta/rpc.go @@ -15,7 +15,10 @@ import ( ) // Max size of a message before we treat the size as invalid -const MaxMessageSize = 1024 * 1024 * 1024 +const ( + MaxMessageSize = 1024 * 1024 * 1024 + leaderDialTimeout = 10 * time.Second +) // RPC handles request/response style messaging between cluster nodes type RPC struct { @@ -36,7 +39,7 @@ type RPC struct { type JoinResult struct { RaftEnabled bool - Peers []string + RaftNodes []string NodeID uint64 } @@ -51,7 +54,7 @@ func (r *RPC) proxyLeader(conn *net.TCPConn) { return } - leaderConn, err := net.DialTimeout("tcp", r.store.Leader(), 10*time.Second) + leaderConn, err := net.DialTimeout("tcp", r.store.Leader(), leaderDialTimeout) if err != nil { r.sendError(conn, fmt.Sprintf("dial leader: %v", err)) return @@ -250,7 +253,7 @@ func (r *RPC) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinRespon }, //EnableRaft: proto.Bool(contains(r.store.Peers(), *req.Addr)), EnableRaft: proto.Bool(false), - Peers: r.store.Peers(), + RaftNodes: r.store.Peers(), NodeID: proto.Uint64(nodeID), }, err @@ -325,7 +328,7 @@ func (r *RPC) join(localAddr, remoteAddr string) (*JoinResult, error) { case *internal.JoinResponse: return &JoinResult{ RaftEnabled: t.GetEnableRaft(), - Peers: t.GetPeers(), + RaftNodes: t.GetRaftNodes(), NodeID: t.GetNodeID(), }, nil case *internal.ErrorResponse: @@ -338,7 +341,6 @@ func (r *RPC) join(localAddr, remoteAddr string) (*JoinResult, error) { // call sends an encoded request to the remote leader and returns // an encoded response value. func (r *RPC) call(dest string, req proto.Message) (proto.Message, error) { - // Determine type of request var rpcType internal.RPCType switch t := req.(type) { @@ -351,7 +353,7 @@ func (r *RPC) call(dest string, req proto.Message) (proto.Message, error) { } // Create a connection to the leader. - conn, err := net.DialTimeout("tcp", dest, 10*time.Second) + conn, err := net.DialTimeout("tcp", dest, leaderDialTimeout) if err != nil { return nil, err } diff --git a/meta/rpc_test.go b/meta/rpc_test.go index 7654ef9f6b..d9c258357a 100644 --- a/meta/rpc_test.go +++ b/meta/rpc_test.go @@ -161,12 +161,12 @@ func TestRPCJoin(t *testing.T) { t.Fatalf("raft enabled mismatch: got %v, exp %v", res.RaftEnabled, exp) } - if exp := 1; len(res.Peers) != exp { - t.Fatalf("raft peer mismatch: got %v, exp %v", len(res.Peers), exp) + if exp := 1; len(res.RaftNodes) != exp { + t.Fatalf("raft peer mismatch: got %v, exp %v", len(res.RaftNodes), exp) } - if exp := "1.2.3.4:1234"; res.Peers[0] != exp { - t.Fatalf("raft peer mismatch: got %v, exp %v", res.Peers[0], exp) + if exp := "1.2.3.4:1234"; res.RaftNodes[0] != exp { + t.Fatalf("raft peer mismatch: got %v, exp %v", res.RaftNodes[0], exp) } if exp := uint64(100); res.NodeID != exp { diff --git a/meta/state.go b/meta/state.go index b96422f3d7..164f76702d 100644 --- a/meta/state.go +++ b/meta/state.go @@ -15,7 +15,7 @@ import ( // raftState abstracts the interaction of the raft consensus layer // across local or remote nodes. It is a form of the state design pattern and allows -// the meta.Store to change how its behavior with the raft layer at runtime. +// the meta.Store to change its behavior with the raft layer at runtime. type raftState interface { openRaft() error initialize() error @@ -31,7 +31,7 @@ type raftState interface { snapshot() error } -// localRaft is a consensus strategy that uses a local raft implementation fo +// localRaft is a consensus strategy that uses a local raft implementation for // consensus operations. type localRaft struct { store *Store diff --git a/meta/store.go b/meta/store.go index 079d539102..3a8713b4ca 100644 --- a/meta/store.go +++ b/meta/store.go @@ -184,9 +184,9 @@ func (s *Store) Open() error { joined = true s.Logger.Printf("joined remote node %v", join) - s.Logger.Printf("raftEnabled=%v peers=%v", res.RaftEnabled, res.Peers) + s.Logger.Printf("raftEnabled=%v raftNodes=%v", res.RaftEnabled, res.RaftNodes) - s.peers = res.Peers + s.peers = res.RaftNodes s.id = res.NodeID if err := s.writeNodeID(res.NodeID); err != nil { @@ -573,9 +573,8 @@ func (s *Store) handleExecConn(conn net.Conn) { // serveRPCListener processes remote exec connections. // This function runs in a separate goroutine. func (s *Store) serveRPCListener() { - <-s.ready - defer s.wg.Done() + <-s.ready for { // Accept next TCP connection.