Code review fixes
parent
b78ac4bf15
commit
29011c5cf2
|
@ -1322,7 +1322,7 @@ type JoinResponse struct {
|
|||
EnableRaft *bool `protobuf:"varint,2,opt" json:"EnableRaft,omitempty"`
|
||||
// The addresses of raft peers to use if joining as a raft member. If not joining
|
||||
// as a raft member, these are the nodes running raft.
|
||||
Peers []string `protobuf:"bytes,3,rep" json:"Peers,omitempty"`
|
||||
RaftNodes []string `protobuf:"bytes,3,rep" json:"RaftNodes,omitempty"`
|
||||
// The node ID assigned to the requesting node.
|
||||
NodeID *uint64 `protobuf:"varint,4,opt" json:"NodeID,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
|
@ -1346,9 +1346,9 @@ func (m *JoinResponse) GetEnableRaft() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (m *JoinResponse) GetPeers() []string {
|
||||
func (m *JoinResponse) GetRaftNodes() []string {
|
||||
if m != nil {
|
||||
return m.Peers
|
||||
return m.RaftNodes
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -98,7 +98,7 @@ message Command {
|
|||
UpdateUserCommand = 15;
|
||||
SetPrivilegeCommand = 16;
|
||||
SetDataCommand = 17;
|
||||
SetAdminPrivilegeCommand = 18;
|
||||
SetAdminPrivilegeCommand = 18;
|
||||
}
|
||||
|
||||
required Type type = 1;
|
||||
|
@ -301,9 +301,10 @@ message JoinResponse {
|
|||
|
||||
// Indicates that this node should take part in the raft cluster.
|
||||
optional bool EnableRaft = 2;
|
||||
|
||||
// The addresses of raft peers to use if joining as a raft member. If not joining
|
||||
// as a raft member, these are the nodes running raft.
|
||||
repeated string Peers = 3;
|
||||
repeated string RaftNodes = 3;
|
||||
|
||||
// The node ID assigned to the requesting node.
|
||||
optional uint64 NodeID = 4;
|
||||
|
|
16
meta/rpc.go
16
meta/rpc.go
|
@ -15,7 +15,10 @@ import (
|
|||
)
|
||||
|
||||
// Max size of a message before we treat the size as invalid
|
||||
const MaxMessageSize = 1024 * 1024 * 1024
|
||||
const (
|
||||
MaxMessageSize = 1024 * 1024 * 1024
|
||||
leaderDialTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
// RPC handles request/response style messaging between cluster nodes
|
||||
type RPC struct {
|
||||
|
@ -36,7 +39,7 @@ type RPC struct {
|
|||
|
||||
type JoinResult struct {
|
||||
RaftEnabled bool
|
||||
Peers []string
|
||||
RaftNodes []string
|
||||
NodeID uint64
|
||||
}
|
||||
|
||||
|
@ -51,7 +54,7 @@ func (r *RPC) proxyLeader(conn *net.TCPConn) {
|
|||
return
|
||||
}
|
||||
|
||||
leaderConn, err := net.DialTimeout("tcp", r.store.Leader(), 10*time.Second)
|
||||
leaderConn, err := net.DialTimeout("tcp", r.store.Leader(), leaderDialTimeout)
|
||||
if err != nil {
|
||||
r.sendError(conn, fmt.Sprintf("dial leader: %v", err))
|
||||
return
|
||||
|
@ -250,7 +253,7 @@ func (r *RPC) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinRespon
|
|||
},
|
||||
//EnableRaft: proto.Bool(contains(r.store.Peers(), *req.Addr)),
|
||||
EnableRaft: proto.Bool(false),
|
||||
Peers: r.store.Peers(),
|
||||
RaftNodes: r.store.Peers(),
|
||||
NodeID: proto.Uint64(nodeID),
|
||||
}, err
|
||||
|
||||
|
@ -325,7 +328,7 @@ func (r *RPC) join(localAddr, remoteAddr string) (*JoinResult, error) {
|
|||
case *internal.JoinResponse:
|
||||
return &JoinResult{
|
||||
RaftEnabled: t.GetEnableRaft(),
|
||||
Peers: t.GetPeers(),
|
||||
RaftNodes: t.GetRaftNodes(),
|
||||
NodeID: t.GetNodeID(),
|
||||
}, nil
|
||||
case *internal.ErrorResponse:
|
||||
|
@ -338,7 +341,6 @@ func (r *RPC) join(localAddr, remoteAddr string) (*JoinResult, error) {
|
|||
// call sends an encoded request to the remote leader and returns
|
||||
// an encoded response value.
|
||||
func (r *RPC) call(dest string, req proto.Message) (proto.Message, error) {
|
||||
|
||||
// Determine type of request
|
||||
var rpcType internal.RPCType
|
||||
switch t := req.(type) {
|
||||
|
@ -351,7 +353,7 @@ func (r *RPC) call(dest string, req proto.Message) (proto.Message, error) {
|
|||
}
|
||||
|
||||
// Create a connection to the leader.
|
||||
conn, err := net.DialTimeout("tcp", dest, 10*time.Second)
|
||||
conn, err := net.DialTimeout("tcp", dest, leaderDialTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -161,12 +161,12 @@ func TestRPCJoin(t *testing.T) {
|
|||
t.Fatalf("raft enabled mismatch: got %v, exp %v", res.RaftEnabled, exp)
|
||||
}
|
||||
|
||||
if exp := 1; len(res.Peers) != exp {
|
||||
t.Fatalf("raft peer mismatch: got %v, exp %v", len(res.Peers), exp)
|
||||
if exp := 1; len(res.RaftNodes) != exp {
|
||||
t.Fatalf("raft peer mismatch: got %v, exp %v", len(res.RaftNodes), exp)
|
||||
}
|
||||
|
||||
if exp := "1.2.3.4:1234"; res.Peers[0] != exp {
|
||||
t.Fatalf("raft peer mismatch: got %v, exp %v", res.Peers[0], exp)
|
||||
if exp := "1.2.3.4:1234"; res.RaftNodes[0] != exp {
|
||||
t.Fatalf("raft peer mismatch: got %v, exp %v", res.RaftNodes[0], exp)
|
||||
}
|
||||
|
||||
if exp := uint64(100); res.NodeID != exp {
|
||||
|
|
|
@ -15,7 +15,7 @@ import (
|
|||
|
||||
// raftState abstracts the interaction of the raft consensus layer
|
||||
// across local or remote nodes. It is a form of the state design pattern and allows
|
||||
// the meta.Store to change how its behavior with the raft layer at runtime.
|
||||
// the meta.Store to change its behavior with the raft layer at runtime.
|
||||
type raftState interface {
|
||||
openRaft() error
|
||||
initialize() error
|
||||
|
@ -31,7 +31,7 @@ type raftState interface {
|
|||
snapshot() error
|
||||
}
|
||||
|
||||
// localRaft is a consensus strategy that uses a local raft implementation fo
|
||||
// localRaft is a consensus strategy that uses a local raft implementation for
|
||||
// consensus operations.
|
||||
type localRaft struct {
|
||||
store *Store
|
||||
|
|
|
@ -184,9 +184,9 @@ func (s *Store) Open() error {
|
|||
joined = true
|
||||
|
||||
s.Logger.Printf("joined remote node %v", join)
|
||||
s.Logger.Printf("raftEnabled=%v peers=%v", res.RaftEnabled, res.Peers)
|
||||
s.Logger.Printf("raftEnabled=%v raftNodes=%v", res.RaftEnabled, res.RaftNodes)
|
||||
|
||||
s.peers = res.Peers
|
||||
s.peers = res.RaftNodes
|
||||
s.id = res.NodeID
|
||||
|
||||
if err := s.writeNodeID(res.NodeID); err != nil {
|
||||
|
@ -573,9 +573,8 @@ func (s *Store) handleExecConn(conn net.Conn) {
|
|||
// serveRPCListener processes remote exec connections.
|
||||
// This function runs in a separate goroutine.
|
||||
func (s *Store) serveRPCListener() {
|
||||
<-s.ready
|
||||
|
||||
defer s.wg.Done()
|
||||
<-s.ready
|
||||
|
||||
for {
|
||||
// Accept next TCP connection.
|
||||
|
|
Loading…
Reference in New Issue