commit
1536cd52ee
|
@ -4,6 +4,7 @@
|
||||||
- [#3376](https://github.com/influxdb/influxdb/pull/3376): Support for remote shard query mapping
|
- [#3376](https://github.com/influxdb/influxdb/pull/3376): Support for remote shard query mapping
|
||||||
- [#3372](https://github.com/influxdb/influxdb/pull/3372): Support joining nodes to existing cluster
|
- [#3372](https://github.com/influxdb/influxdb/pull/3372): Support joining nodes to existing cluster
|
||||||
- [#3426](https://github.com/influxdb/influxdb/pull/3426): Additional logging for continuous queries. Thanks @jhorwit2
|
- [#3426](https://github.com/influxdb/influxdb/pull/3426): Additional logging for continuous queries. Thanks @jhorwit2
|
||||||
|
- [#3478](https://github.com/influxdb/influxdb/pull/3478)): Support incremental cluster joins
|
||||||
|
|
||||||
### Bugfixes
|
### Bugfixes
|
||||||
- [#3405](https://github.com/influxdb/influxdb/pull/3405): Prevent database panic when fields are missing. Thanks @jhorwit2
|
- [#3405](https://github.com/influxdb/influxdb/pull/3405): Prevent database panic when fields are missing. Thanks @jhorwit2
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/BurntSushi/toml"
|
"github.com/BurntSushi/toml"
|
||||||
)
|
)
|
||||||
|
@ -83,7 +84,7 @@ func (cmd *Command) Run(args ...string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if options.Join != "" {
|
if options.Join != "" {
|
||||||
config.Meta.Join = options.Join
|
config.Meta.Peers = strings.Split(options.Join, ",")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate the configuration.
|
// Validate the configuration.
|
||||||
|
|
|
@ -373,8 +373,14 @@ func (s *Server) Close() error {
|
||||||
// startServerReporting starts periodic server reporting.
|
// startServerReporting starts periodic server reporting.
|
||||||
func (s *Server) startServerReporting() {
|
func (s *Server) startServerReporting() {
|
||||||
for {
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.closing:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
if err := s.MetaStore.WaitForLeader(30 * time.Second); err != nil {
|
if err := s.MetaStore.WaitForLeader(30 * time.Second); err != nil {
|
||||||
log.Printf("no leader available for reporting: %s", err.Error())
|
log.Printf("no leader available for reporting: %s", err.Error())
|
||||||
|
time.Sleep(time.Second)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
s.reportServer()
|
s.reportServer()
|
||||||
|
|
|
@ -38,9 +38,6 @@ type Config struct {
|
||||||
LeaderLeaseTimeout toml.Duration `toml:"leader-lease-timeout"`
|
LeaderLeaseTimeout toml.Duration `toml:"leader-lease-timeout"`
|
||||||
CommitTimeout toml.Duration `toml:"commit-timeout"`
|
CommitTimeout toml.Duration `toml:"commit-timeout"`
|
||||||
ClusterTracing bool `toml:"cluster-tracing"`
|
ClusterTracing bool `toml:"cluster-tracing"`
|
||||||
|
|
||||||
// The join command-line argument
|
|
||||||
Join string `toml:"-"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConfig() *Config {
|
func NewConfig() *Config {
|
||||||
|
|
49
meta/rpc.go
49
meta/rpc.go
|
@ -11,6 +11,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gogo/protobuf/proto"
|
"github.com/gogo/protobuf/proto"
|
||||||
|
"github.com/hashicorp/raft"
|
||||||
"github.com/influxdb/influxdb/meta/internal"
|
"github.com/influxdb/influxdb/meta/internal"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -29,7 +30,7 @@ type rpc struct {
|
||||||
cachedData() *Data
|
cachedData() *Data
|
||||||
IsLeader() bool
|
IsLeader() bool
|
||||||
Leader() string
|
Leader() string
|
||||||
Peers() []string
|
Peers() ([]string, error)
|
||||||
AddPeer(host string) error
|
AddPeer(host string) error
|
||||||
CreateNode(host string) (*NodeInfo, error)
|
CreateNode(host string) (*NodeInfo, error)
|
||||||
NodeByHost(host string) (*NodeInfo, error)
|
NodeByHost(host string) (*NodeInfo, error)
|
||||||
|
@ -215,26 +216,33 @@ func (r *rpc) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinRespon
|
||||||
r.traceCluster("join request from: %v", *req.Addr)
|
r.traceCluster("join request from: %v", *req.Addr)
|
||||||
|
|
||||||
node, err := func() (*NodeInfo, error) {
|
node, err := func() (*NodeInfo, error) {
|
||||||
|
|
||||||
// attempt to create the node
|
// attempt to create the node
|
||||||
node, err := r.store.CreateNode(*req.Addr)
|
node, err := r.store.CreateNode(*req.Addr)
|
||||||
// if it exists, return the existing node
|
// if it exists, return the existing node
|
||||||
if err == ErrNodeExists {
|
if err == ErrNodeExists {
|
||||||
return r.store.NodeByHost(*req.Addr)
|
node, err = r.store.NodeByHost(*req.Addr)
|
||||||
|
if err != nil {
|
||||||
|
return node, err
|
||||||
|
}
|
||||||
|
r.logger.Printf("existing node re-joined: id=%v addr=%v", node.ID, node.Host)
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return nil, fmt.Errorf("create node: %v", err)
|
return nil, fmt.Errorf("create node: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: jwilder: adding raft nodes is tricky since going
|
peers, err := r.store.Peers()
|
||||||
// from 1 node (leader) to two kills the cluster because
|
if err != nil {
|
||||||
// quorum is lost after adding the second node. For now,
|
return nil, fmt.Errorf("list peers: %v", err)
|
||||||
// can only add non-raft enabled nodes
|
}
|
||||||
|
|
||||||
// If we have less than 3 nodes, add them as raft peers
|
// If we have less than 3 nodes, add them as raft peers if they are not
|
||||||
// if len(r.store.Peers()) < MaxRaftNodes {
|
// already a peer
|
||||||
// if err = r.store.AddPeer(*req.Addr); err != nil {
|
if len(peers) < MaxRaftNodes && !raft.PeerContained(peers, *req.Addr) {
|
||||||
// return node, fmt.Errorf("add peer: %v", err)
|
r.logger.Printf("adding new raft peer: nodeId=%v addr=%v", node.ID, *req.Addr)
|
||||||
// }
|
if err = r.store.AddPeer(*req.Addr); err != nil {
|
||||||
// }
|
return node, fmt.Errorf("add peer: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
return node, err
|
return node, err
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -247,13 +255,18 @@ func (r *rpc) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinRespon
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// get the current raft peers
|
||||||
|
peers, err := r.store.Peers()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("list peers: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
return &internal.JoinResponse{
|
return &internal.JoinResponse{
|
||||||
Header: &internal.ResponseHeader{
|
Header: &internal.ResponseHeader{
|
||||||
OK: proto.Bool(true),
|
OK: proto.Bool(true),
|
||||||
},
|
},
|
||||||
//EnableRaft: proto.Bool(contains(r.store.Peers(), *req.Addr)),
|
EnableRaft: proto.Bool(raft.PeerContained(peers, *req.Addr)),
|
||||||
EnableRaft: proto.Bool(false),
|
RaftNodes: peers,
|
||||||
RaftNodes: r.store.Peers(),
|
|
||||||
NodeID: proto.Uint64(nodeID),
|
NodeID: proto.Uint64(nodeID),
|
||||||
}, err
|
}, err
|
||||||
|
|
||||||
|
@ -355,7 +368,7 @@ func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) {
|
||||||
// Create a connection to the leader.
|
// Create a connection to the leader.
|
||||||
conn, err := net.DialTimeout("tcp", dest, leaderDialTimeout)
|
conn, err := net.DialTimeout("tcp", dest, leaderDialTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("rpc dial: %v", err)
|
||||||
}
|
}
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
|
|
||||||
|
@ -382,11 +395,13 @@ func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) {
|
||||||
|
|
||||||
// Should always have a size and type
|
// Should always have a size and type
|
||||||
if exp := 16; len(data) < exp {
|
if exp := 16; len(data) < exp {
|
||||||
|
r.traceCluster("recv: %v", string(data))
|
||||||
return nil, fmt.Errorf("rpc %v failed: short read: got %v, exp %v", rpcType, len(data), exp)
|
return nil, fmt.Errorf("rpc %v failed: short read: got %v, exp %v", rpcType, len(data), exp)
|
||||||
}
|
}
|
||||||
|
|
||||||
sz := btou64(data[0:8])
|
sz := btou64(data[0:8])
|
||||||
if len(data[8:]) != int(sz) {
|
if len(data[8:]) != int(sz) {
|
||||||
|
r.traceCluster("recv: %v", string(data))
|
||||||
return nil, fmt.Errorf("rpc %v failed: short read: got %v, exp %v", rpcType, len(data[8:]), sz)
|
return nil, fmt.Errorf("rpc %v failed: short read: got %v, exp %v", rpcType, len(data[8:]), sz)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -421,7 +436,7 @@ func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) {
|
||||||
|
|
||||||
func (r *rpc) traceCluster(msg string, args ...interface{}) {
|
func (r *rpc) traceCluster(msg string, args ...interface{}) {
|
||||||
if r.tracingEnabled {
|
if r.tracingEnabled {
|
||||||
r.logger.Printf("rpc error: "+msg, args...)
|
r.logger.Printf("rpc: "+msg, args...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -159,7 +159,7 @@ func TestRPCJoin(t *testing.T) {
|
||||||
t.Fatalf("failed to join: %v", err)
|
t.Fatalf("failed to join: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if exp := false; res.RaftEnabled != false {
|
if exp := true; res.RaftEnabled != true {
|
||||||
t.Fatalf("raft enabled mismatch: got %v, exp %v", res.RaftEnabled, exp)
|
t.Fatalf("raft enabled mismatch: got %v, exp %v", res.RaftEnabled, exp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -230,7 +230,7 @@ func (f *fakeStore) cachedData() *Data {
|
||||||
|
|
||||||
func (f *fakeStore) IsLeader() bool { return true }
|
func (f *fakeStore) IsLeader() bool { return true }
|
||||||
func (f *fakeStore) Leader() string { return f.leader }
|
func (f *fakeStore) Leader() string { return f.leader }
|
||||||
func (f *fakeStore) Peers() []string { return []string{f.leader} }
|
func (f *fakeStore) Peers() ([]string, error) { return []string{f.leader}, nil }
|
||||||
func (f *fakeStore) AddPeer(host string) error { return nil }
|
func (f *fakeStore) AddPeer(host string) error { return nil }
|
||||||
func (f *fakeStore) CreateNode(host string) (*NodeInfo, error) {
|
func (f *fakeStore) CreateNode(host string) (*NodeInfo, error) {
|
||||||
return &NodeInfo{ID: f.newNodeID, Host: host}, nil
|
return &NodeInfo{ID: f.newNodeID, Host: host}, nil
|
||||||
|
|
|
@ -1,8 +1,11 @@
|
||||||
package meta
|
package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
|
@ -17,13 +20,15 @@ import (
|
||||||
// across local or remote nodes. It is a form of the state design pattern and allows
|
// across local or remote nodes. It is a form of the state design pattern and allows
|
||||||
// the meta.Store to change its behavior with the raft layer at runtime.
|
// the meta.Store to change its behavior with the raft layer at runtime.
|
||||||
type raftState interface {
|
type raftState interface {
|
||||||
openRaft() error
|
open() error
|
||||||
|
remove() error
|
||||||
initialize() error
|
initialize() error
|
||||||
leader() string
|
leader() string
|
||||||
isLeader() bool
|
isLeader() bool
|
||||||
sync(index uint64, timeout time.Duration) error
|
sync(index uint64, timeout time.Duration) error
|
||||||
setPeers(addrs []string) error
|
setPeers(addrs []string) error
|
||||||
addPeer(addr string) error
|
addPeer(addr string) error
|
||||||
|
peers() ([]string, error)
|
||||||
invalidate() error
|
invalidate() error
|
||||||
close() error
|
close() error
|
||||||
lastIndex() uint64
|
lastIndex() uint64
|
||||||
|
@ -42,6 +47,19 @@ type localRaft struct {
|
||||||
raftLayer *raftLayer
|
raftLayer *raftLayer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *localRaft) remove() error {
|
||||||
|
if err := os.RemoveAll(filepath.Join(r.store.path, "raft.db")); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := os.RemoveAll(filepath.Join(r.store.path, "peers.json")); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := os.RemoveAll(filepath.Join(r.store.path, "snapshots")); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r *localRaft) updateMetaData(ms *Data) {
|
func (r *localRaft) updateMetaData(ms *Data) {
|
||||||
if ms == nil {
|
if ms == nil {
|
||||||
return
|
return
|
||||||
|
@ -76,7 +94,7 @@ func (r *localRaft) invalidate() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *localRaft) openRaft() error {
|
func (r *localRaft) open() error {
|
||||||
s := r.store
|
s := r.store
|
||||||
// Setup raft configuration.
|
// Setup raft configuration.
|
||||||
config := raft.DefaultConfig()
|
config := raft.DefaultConfig()
|
||||||
|
@ -89,11 +107,6 @@ func (r *localRaft) openRaft() error {
|
||||||
// If no peers are set in the config then start as a single server.
|
// If no peers are set in the config then start as a single server.
|
||||||
config.EnableSingleNode = (len(s.peers) == 0)
|
config.EnableSingleNode = (len(s.peers) == 0)
|
||||||
|
|
||||||
// Ensure our addr is in the peer list
|
|
||||||
if config.EnableSingleNode {
|
|
||||||
s.peers = append(s.peers, s.Addr.String())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Build raft layer to multiplex listener.
|
// Build raft layer to multiplex listener.
|
||||||
r.raftLayer = newRaftLayer(s.RaftListener, s.Addr)
|
r.raftLayer = newRaftLayer(s.RaftListener, s.Addr)
|
||||||
|
|
||||||
|
@ -246,6 +259,10 @@ func (r *localRaft) setPeers(addrs []string) error {
|
||||||
return r.raft.SetPeers(a).Error()
|
return r.raft.SetPeers(a).Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *localRaft) peers() ([]string, error) {
|
||||||
|
return r.peerStore.Peers()
|
||||||
|
}
|
||||||
|
|
||||||
func (r *localRaft) leader() string {
|
func (r *localRaft) leader() string {
|
||||||
if r.raft == nil {
|
if r.raft == nil {
|
||||||
return ""
|
return ""
|
||||||
|
@ -269,6 +286,10 @@ type remoteRaft struct {
|
||||||
store *Store
|
store *Store
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *remoteRaft) remove() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r *remoteRaft) updateMetaData(ms *Data) {
|
func (r *remoteRaft) updateMetaData(ms *Data) {
|
||||||
if ms == nil {
|
if ms == nil {
|
||||||
return
|
return
|
||||||
|
@ -300,7 +321,15 @@ func (r *remoteRaft) invalidate() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *remoteRaft) setPeers(addrs []string) error {
|
func (r *remoteRaft) setPeers(addrs []string) error {
|
||||||
return nil
|
// Convert to JSON
|
||||||
|
var buf bytes.Buffer
|
||||||
|
enc := json.NewEncoder(&buf)
|
||||||
|
if err := enc.Encode(addrs); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write out as JSON
|
||||||
|
return ioutil.WriteFile(filepath.Join(r.store.path, "peers.json"), buf.Bytes(), 0755)
|
||||||
}
|
}
|
||||||
|
|
||||||
// addPeer adds addr to the list of peers in the cluster.
|
// addPeer adds addr to the list of peers in the cluster.
|
||||||
|
@ -308,7 +337,15 @@ func (r *remoteRaft) addPeer(addr string) error {
|
||||||
return fmt.Errorf("cannot add peer using remote raft")
|
return fmt.Errorf("cannot add peer using remote raft")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *remoteRaft) openRaft() error {
|
func (r *remoteRaft) peers() ([]string, error) {
|
||||||
|
return readPeersJSON(filepath.Join(r.store.path, "peers.json"))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *remoteRaft) open() error {
|
||||||
|
if err := r.setPeers(r.store.peers); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -366,3 +403,25 @@ func (r *remoteRaft) sync(index uint64, timeout time.Duration) error {
|
||||||
func (r *remoteRaft) snapshot() error {
|
func (r *remoteRaft) snapshot() error {
|
||||||
return fmt.Errorf("cannot snapshot while in remote raft state")
|
return fmt.Errorf("cannot snapshot while in remote raft state")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func readPeersJSON(path string) ([]string, error) {
|
||||||
|
// Read the file
|
||||||
|
buf, err := ioutil.ReadFile(path)
|
||||||
|
if err != nil && !os.IsNotExist(err) {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for no peers
|
||||||
|
if len(buf) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decode the peers
|
||||||
|
var peers []string
|
||||||
|
dec := json.NewDecoder(bytes.NewReader(buf))
|
||||||
|
if err := dec.Decode(&peers); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return peers, nil
|
||||||
|
}
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
type StatementExecutor struct {
|
type StatementExecutor struct {
|
||||||
Store interface {
|
Store interface {
|
||||||
Nodes() ([]NodeInfo, error)
|
Nodes() ([]NodeInfo, error)
|
||||||
|
Peers() ([]string, error)
|
||||||
|
|
||||||
Database(name string) (*DatabaseInfo, error)
|
Database(name string) (*DatabaseInfo, error)
|
||||||
Databases() ([]DatabaseInfo, error)
|
Databases() ([]DatabaseInfo, error)
|
||||||
|
@ -127,9 +128,14 @@ func (e *StatementExecutor) executeShowServersStatement(q *influxql.ShowServersS
|
||||||
return &influxql.Result{Err: err}
|
return &influxql.Result{Err: err}
|
||||||
}
|
}
|
||||||
|
|
||||||
row := &influxql.Row{Columns: []string{"id", "url"}}
|
peers, err := e.Store.Peers()
|
||||||
|
if err != nil {
|
||||||
|
return &influxql.Result{Err: err}
|
||||||
|
}
|
||||||
|
|
||||||
|
row := &influxql.Row{Columns: []string{"id", "url", "raft"}}
|
||||||
for _, ni := range nis {
|
for _, ni := range nis {
|
||||||
row.Values = append(row.Values, []interface{}{ni.ID, "http://" + ni.Host})
|
row.Values = append(row.Values, []interface{}{ni.ID, "http://" + ni.Host, contains(peers, ni.Host)})
|
||||||
}
|
}
|
||||||
return &influxql.Result{Series: []*influxql.Row{row}}
|
return &influxql.Result{Series: []*influxql.Row{row}}
|
||||||
}
|
}
|
||||||
|
|
|
@ -121,15 +121,18 @@ func TestStatementExecutor_ExecuteStatement_ShowServers(t *testing.T) {
|
||||||
{ID: 2, Host: "node1"},
|
{ID: 2, Host: "node1"},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
e.Store.PeersFn = func() ([]string, error) {
|
||||||
|
return []string{"node0"}, nil
|
||||||
|
}
|
||||||
|
|
||||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW SERVERS`)); res.Err != nil {
|
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW SERVERS`)); res.Err != nil {
|
||||||
t.Fatal(res.Err)
|
t.Fatal(res.Err)
|
||||||
} else if !reflect.DeepEqual(res.Series, influxql.Rows{
|
} else if !reflect.DeepEqual(res.Series, influxql.Rows{
|
||||||
{
|
{
|
||||||
Columns: []string{"id", "url"},
|
Columns: []string{"id", "url", "raft"},
|
||||||
Values: [][]interface{}{
|
Values: [][]interface{}{
|
||||||
{uint64(1), "http://node0"},
|
{uint64(1), "http://node0", true},
|
||||||
{uint64(2), "http://node1"},
|
{uint64(2), "http://node1", false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}) {
|
}) {
|
||||||
|
@ -778,6 +781,7 @@ func NewStatementExecutor() *StatementExecutor {
|
||||||
// StatementExecutorStore represents a mock implementation of StatementExecutor.Store.
|
// StatementExecutorStore represents a mock implementation of StatementExecutor.Store.
|
||||||
type StatementExecutorStore struct {
|
type StatementExecutorStore struct {
|
||||||
NodesFn func() ([]meta.NodeInfo, error)
|
NodesFn func() ([]meta.NodeInfo, error)
|
||||||
|
PeersFn func() ([]string, error)
|
||||||
DatabaseFn func(name string) (*meta.DatabaseInfo, error)
|
DatabaseFn func(name string) (*meta.DatabaseInfo, error)
|
||||||
DatabasesFn func() ([]meta.DatabaseInfo, error)
|
DatabasesFn func() ([]meta.DatabaseInfo, error)
|
||||||
CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error)
|
CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error)
|
||||||
|
@ -804,6 +808,10 @@ func (s *StatementExecutorStore) Nodes() ([]meta.NodeInfo, error) {
|
||||||
return s.NodesFn()
|
return s.NodesFn()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *StatementExecutorStore) Peers() ([]string, error) {
|
||||||
|
return s.PeersFn()
|
||||||
|
}
|
||||||
|
|
||||||
func (s *StatementExecutorStore) Database(name string) (*meta.DatabaseInfo, error) {
|
func (s *StatementExecutorStore) Database(name string) (*meta.DatabaseInfo, error) {
|
||||||
return s.DatabaseFn(name)
|
return s.DatabaseFn(name)
|
||||||
}
|
}
|
||||||
|
|
183
meta/store.go
183
meta/store.go
|
@ -66,7 +66,6 @@ type Store struct {
|
||||||
|
|
||||||
// All peers in cluster. Used during bootstrapping.
|
// All peers in cluster. Used during bootstrapping.
|
||||||
peers []string
|
peers []string
|
||||||
join string
|
|
||||||
|
|
||||||
data *Data
|
data *Data
|
||||||
|
|
||||||
|
@ -131,7 +130,6 @@ func NewStore(c *Config) *Store {
|
||||||
s := &Store{
|
s := &Store{
|
||||||
path: c.Dir,
|
path: c.Dir,
|
||||||
peers: c.Peers,
|
peers: c.Peers,
|
||||||
join: c.Join,
|
|
||||||
data: &Data{},
|
data: &Data{},
|
||||||
|
|
||||||
ready: make(chan struct{}),
|
ready: make(chan struct{}),
|
||||||
|
@ -171,41 +169,6 @@ func (s *Store) IDPath() string { return filepath.Join(s.path, "id") }
|
||||||
|
|
||||||
// Open opens and initializes the raft store.
|
// Open opens and initializes the raft store.
|
||||||
func (s *Store) Open() error {
|
func (s *Store) Open() error {
|
||||||
// If we have a join addr, attempt to join
|
|
||||||
if s.join != "" {
|
|
||||||
|
|
||||||
joined := false
|
|
||||||
for _, join := range strings.Split(s.join, ",") {
|
|
||||||
res, err := s.rpc.join(s.Addr.String(), join)
|
|
||||||
if err != nil {
|
|
||||||
s.Logger.Printf("join failed: %v", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
joined = true
|
|
||||||
|
|
||||||
s.Logger.Printf("joined remote node %v", join)
|
|
||||||
s.Logger.Printf("raftEnabled=%v raftNodes=%v", res.RaftEnabled, res.RaftNodes)
|
|
||||||
|
|
||||||
s.peers = res.RaftNodes
|
|
||||||
s.id = res.NodeID
|
|
||||||
|
|
||||||
if err := s.writeNodeID(res.NodeID); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if !res.RaftEnabled {
|
|
||||||
s.raftState = &remoteRaft{s}
|
|
||||||
if err := s.invalidate(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !joined {
|
|
||||||
return fmt.Errorf("failed to join existing cluster at %v", s.join)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify that no more than 3 peers.
|
// Verify that no more than 3 peers.
|
||||||
// https://github.com/influxdb/influxdb/issues/2750
|
// https://github.com/influxdb/influxdb/issues/2750
|
||||||
if len(s.peers) > MaxRaftNodes {
|
if len(s.peers) > MaxRaftNodes {
|
||||||
|
@ -231,6 +194,11 @@ func (s *Store) Open() error {
|
||||||
}
|
}
|
||||||
s.opened = true
|
s.opened = true
|
||||||
|
|
||||||
|
// load our raft state
|
||||||
|
if err := s.loadState(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Create the root directory if it doesn't already exist.
|
// Create the root directory if it doesn't already exist.
|
||||||
if err := s.createRootDir(); err != nil {
|
if err := s.createRootDir(); err != nil {
|
||||||
return fmt.Errorf("mkdir all: %s", err)
|
return fmt.Errorf("mkdir all: %s", err)
|
||||||
|
@ -264,6 +232,11 @@ func (s *Store) Open() error {
|
||||||
s.wg.Add(1)
|
s.wg.Add(1)
|
||||||
go s.serveRPCListener()
|
go s.serveRPCListener()
|
||||||
|
|
||||||
|
// Join an existing cluster if we needed
|
||||||
|
if err := s.joinCluster(); err != nil {
|
||||||
|
return fmt.Errorf("join: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// If the ID doesn't exist then create a new node.
|
// If the ID doesn't exist then create a new node.
|
||||||
if s.id == 0 {
|
if s.id == 0 {
|
||||||
go s.init()
|
go s.init()
|
||||||
|
@ -274,9 +247,125 @@ func (s *Store) Open() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// loadState sets the appropriate raftState from our persistent storage
|
||||||
|
func (s *Store) loadState() error {
|
||||||
|
peers, err := readPeersJSON(filepath.Join(s.path, "peers.json"))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we have existing peers, use those. This will override what's in the
|
||||||
|
// config.
|
||||||
|
if len(peers) > 0 {
|
||||||
|
s.peers = peers
|
||||||
|
}
|
||||||
|
|
||||||
|
// if no peers on disk, we need to start raft in order to initialize a new
|
||||||
|
// cluster or join an existing one.
|
||||||
|
if len(peers) == 0 {
|
||||||
|
s.raftState = &localRaft{store: s}
|
||||||
|
// if we have a raft database, (maybe restored), we should start raft locally
|
||||||
|
} else if _, err := os.Stat(filepath.Join(s.path, "raft.db")); err == nil {
|
||||||
|
s.raftState = &localRaft{store: s}
|
||||||
|
// otherwise, we should use remote raft
|
||||||
|
} else {
|
||||||
|
s.raftState = &remoteRaft{store: s}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) joinCluster() error {
|
||||||
|
|
||||||
|
// No join options, so nothing to do
|
||||||
|
if len(s.peers) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// We already have a node ID so were already part of a cluster,
|
||||||
|
// don't join again so we can use our existing state.
|
||||||
|
if s.id != 0 {
|
||||||
|
s.Logger.Printf("skipping join: already member of cluster: nodeId=%v raftEnabled=%v raftNodes=%v",
|
||||||
|
s.id, raft.PeerContained(s.peers, s.Addr.String()), s.peers)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Logger.Printf("joining cluster at: %v", s.peers)
|
||||||
|
for {
|
||||||
|
for _, join := range s.peers {
|
||||||
|
res, err := s.rpc.join(s.Addr.String(), join)
|
||||||
|
if err != nil {
|
||||||
|
s.Logger.Printf("join failed: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Logger.Printf("joined remote node %v", join)
|
||||||
|
s.Logger.Printf("nodeId=%v raftEnabled=%v raftNodes=%v", res.NodeID, res.RaftEnabled, res.RaftNodes)
|
||||||
|
|
||||||
|
s.peers = res.RaftNodes
|
||||||
|
s.id = res.NodeID
|
||||||
|
|
||||||
|
if err := s.writeNodeID(res.NodeID); err != nil {
|
||||||
|
s.Logger.Printf("write node id failed: %v", err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if !res.RaftEnabled {
|
||||||
|
// Shutdown our local raft and transition to a remote raft state
|
||||||
|
if err := s.enableRemoteRaft(); err != nil {
|
||||||
|
s.Logger.Printf("enable remote raft failed: %v", err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Logger.Printf("join failed: retrying...")
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) enableLocalRaft() error {
|
||||||
|
if _, ok := s.raftState.(*localRaft); ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
s.Logger.Printf("switching to local raft")
|
||||||
|
|
||||||
|
lr := &localRaft{store: s}
|
||||||
|
return s.changeState(lr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) enableRemoteRaft() error {
|
||||||
|
if _, ok := s.raftState.(*remoteRaft); ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Logger.Printf("switching to remote raft")
|
||||||
|
rr := &remoteRaft{store: s}
|
||||||
|
return s.changeState(rr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) changeState(state raftState) error {
|
||||||
|
if err := s.raftState.close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear out any persistent state
|
||||||
|
if err := s.raftState.remove(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.raftState = state
|
||||||
|
|
||||||
|
if err := s.raftState.open(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// openRaft initializes the raft store.
|
// openRaft initializes the raft store.
|
||||||
func (s *Store) openRaft() error {
|
func (s *Store) openRaft() error {
|
||||||
return s.raftState.openRaft()
|
return s.raftState.open()
|
||||||
}
|
}
|
||||||
|
|
||||||
// initialize attempts to bootstrap the raft store if there are no committed entries.
|
// initialize attempts to bootstrap the raft store if there are no committed entries.
|
||||||
|
@ -294,11 +383,15 @@ func (s *Store) Close() error {
|
||||||
// WaitForDataChanged will block the current goroutine until the metastore index has
|
// WaitForDataChanged will block the current goroutine until the metastore index has
|
||||||
// be updated.
|
// be updated.
|
||||||
func (s *Store) WaitForDataChanged() error {
|
func (s *Store) WaitForDataChanged() error {
|
||||||
|
s.mu.RLock()
|
||||||
|
changed := s.changed
|
||||||
|
s.mu.RUnlock()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-s.closing:
|
case <-s.closing:
|
||||||
return errors.New("closing")
|
return errors.New("closing")
|
||||||
case <-s.changed:
|
case <-changed:
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -404,10 +497,6 @@ func (s *Store) Snapshot() error {
|
||||||
// WaitForLeader sleeps until a leader is found or a timeout occurs.
|
// WaitForLeader sleeps until a leader is found or a timeout occurs.
|
||||||
// timeout == 0 means to wait forever.
|
// timeout == 0 means to wait forever.
|
||||||
func (s *Store) WaitForLeader(timeout time.Duration) error {
|
func (s *Store) WaitForLeader(timeout time.Duration) error {
|
||||||
if s.Leader() != "" {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Begin timeout timer.
|
// Begin timeout timer.
|
||||||
timer := time.NewTimer(timeout)
|
timer := time.NewTimer(timeout)
|
||||||
defer timer.Stop()
|
defer timer.Stop()
|
||||||
|
@ -452,6 +541,9 @@ func (s *Store) IsLeader() bool {
|
||||||
func (s *Store) Leader() string {
|
func (s *Store) Leader() string {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
defer s.mu.RUnlock()
|
defer s.mu.RUnlock()
|
||||||
|
if s.raftState == nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
return s.raftState.leader()
|
return s.raftState.leader()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -466,10 +558,10 @@ func (s *Store) AddPeer(addr string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Peers returns the list of peers in the cluster.
|
// Peers returns the list of peers in the cluster.
|
||||||
func (s *Store) Peers() []string {
|
func (s *Store) Peers() ([]string, error) {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
defer s.mu.RUnlock()
|
defer s.mu.RUnlock()
|
||||||
return s.peers
|
return s.raftState.peers()
|
||||||
}
|
}
|
||||||
|
|
||||||
// serveExecListener processes remote exec connections.
|
// serveExecListener processes remote exec connections.
|
||||||
|
@ -574,7 +666,6 @@ func (s *Store) handleExecConn(conn net.Conn) {
|
||||||
// This function runs in a separate goroutine.
|
// This function runs in a separate goroutine.
|
||||||
func (s *Store) serveRPCListener() {
|
func (s *Store) serveRPCListener() {
|
||||||
defer s.wg.Done()
|
defer s.wg.Done()
|
||||||
<-s.ready
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
// Accept next TCP connection.
|
// Accept next TCP connection.
|
||||||
|
|
|
@ -787,33 +787,101 @@ func TestCluster_Open(t *testing.T) {
|
||||||
t.Fatal("no leader found")
|
t.Fatal("no leader found")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add a database to each node.
|
// ensure all the nodes see the same metastore data
|
||||||
for i, s := range c.Stores {
|
assertDatabaseReplicated(t, c)
|
||||||
if di, err := s.CreateDatabase(fmt.Sprintf("db%d", i)); err != nil {
|
}
|
||||||
t.Fatal(err)
|
|
||||||
} else if di == nil {
|
// Ensure a multi-node cluster can start, join the cluster, and the first three members are raft nodes.
|
||||||
t.Fatal("expected database")
|
func TestCluster_OpenRaft(t *testing.T) {
|
||||||
|
// Start a single node.
|
||||||
|
c := MustOpenCluster(1)
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
// Check that the node becomes leader.
|
||||||
|
if s := c.Leader(); s == nil {
|
||||||
|
t.Fatal("no leader found")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add 5 more nodes.
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
if err := c.Join(); err != nil {
|
||||||
|
t.Fatalf("failed to join cluster: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify that each store has all databases.
|
// ensure we have 3 raft nodes
|
||||||
for i := 0; i < len(c.Stores); i++ {
|
assertRaftPeerNodes(t, c, 3)
|
||||||
for _, s := range c.Stores {
|
|
||||||
if di, err := s.Database(fmt.Sprintf("db%d", i)); err != nil {
|
// ensure all the nodes see the same metastore data
|
||||||
t.Fatal(err)
|
assertDatabaseReplicated(t, c)
|
||||||
} else if di == nil {
|
}
|
||||||
t.Fatal("expected database")
|
|
||||||
}
|
// Ensure a multi-node cluster can restart
|
||||||
|
func TestCluster_Restart(t *testing.T) {
|
||||||
|
// Start a single node.
|
||||||
|
c := MustOpenCluster(1)
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
// Check that one node is leader.
|
||||||
|
if s := c.Leader(); s == nil {
|
||||||
|
t.Fatal("no leader found")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add 5 more ndes, 2 should become raft peers, 3 remote raft clients
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
if err := c.Join(); err != nil {
|
||||||
|
t.Fatalf("failed to join cluster: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The tests use a host host assigned listener port. We need to re-use
|
||||||
|
// the original ports when the new cluster is restarted so that the existing
|
||||||
|
// peer store addresses can be reached.
|
||||||
|
addrs := []string{}
|
||||||
|
|
||||||
|
// Make sure we keep files on disk when we shutdown as well as record the
|
||||||
|
// current cluster IP addresses
|
||||||
|
for _, s := range c.Stores {
|
||||||
|
s.LeaveFiles = true
|
||||||
|
addrs = append(addrs, s.Addr.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop the cluster
|
||||||
|
if err := c.Close(); err != nil {
|
||||||
|
t.Fatalf("failed to close cluster: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait a bit to avoid spurious port in use conflict errors from trying to
|
||||||
|
// start the new cluster to fast
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
// Re-create the cluster nodes from existing disk paths and addresses
|
||||||
|
stores := []*Store{}
|
||||||
|
for i, s := range c.Stores {
|
||||||
|
store := MustOpenStoreWithPath(addrs[i], s.Path())
|
||||||
|
stores = append(stores, store)
|
||||||
|
}
|
||||||
|
c.Stores = stores
|
||||||
|
|
||||||
|
// Wait for the cluster to stabilize
|
||||||
|
if err := c.WaitForLeader(); err != nil {
|
||||||
|
t.Fatal("no leader found")
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensure we have 3 raft nodes
|
||||||
|
assertRaftPeerNodes(t, c, 3)
|
||||||
|
|
||||||
|
// ensure all the nodes see the same metastore data
|
||||||
|
assertDatabaseReplicated(t, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store is a test wrapper for meta.Store.
|
// Store is a test wrapper for meta.Store.
|
||||||
type Store struct {
|
type Store struct {
|
||||||
*meta.Store
|
*meta.Store
|
||||||
Listener net.Listener
|
BindAddress string
|
||||||
Stderr bytes.Buffer
|
Listener net.Listener
|
||||||
LeaveFiles bool // set to true to leave temporary files on close
|
Stderr bytes.Buffer
|
||||||
|
LeaveFiles bool // set to true to leave temporary files on close
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStore returns a new test wrapper for Store.
|
// NewStore returns a new test wrapper for Store.
|
||||||
|
@ -828,7 +896,16 @@ func NewStore(c *meta.Config) *Store {
|
||||||
|
|
||||||
// MustOpenStore opens a store in a temporary path. Panic on error.
|
// MustOpenStore opens a store in a temporary path. Panic on error.
|
||||||
func MustOpenStore() *Store {
|
func MustOpenStore() *Store {
|
||||||
s := NewStore(NewConfig(MustTempFile()))
|
return MustOpenStoreWithPath("", MustTempFile())
|
||||||
|
}
|
||||||
|
|
||||||
|
// MustOpenStoreWith opens a store from a given path. Panic on error.
|
||||||
|
func MustOpenStoreWithPath(addr, path string) *Store {
|
||||||
|
c := NewConfig(path)
|
||||||
|
s := NewStore(c)
|
||||||
|
if addr != "" {
|
||||||
|
s.BindAddress = addr
|
||||||
|
}
|
||||||
if err := s.Open(); err != nil {
|
if err := s.Open(); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
@ -845,8 +922,13 @@ func MustOpenStore() *Store {
|
||||||
|
|
||||||
// Open opens the store on a random TCP port.
|
// Open opens the store on a random TCP port.
|
||||||
func (s *Store) Open() error {
|
func (s *Store) Open() error {
|
||||||
|
|
||||||
|
addr := "127.0.0.1:0"
|
||||||
|
if s.BindAddress != "" {
|
||||||
|
addr = s.BindAddress
|
||||||
|
}
|
||||||
// Open a TCP port.
|
// Open a TCP port.
|
||||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
ln, err := net.Listen("tcp", addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("listen: %s", err)
|
return fmt.Errorf("listen: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -895,17 +977,21 @@ func NewConfig(path string) *meta.Config {
|
||||||
|
|
||||||
// Cluster represents a group of stores joined as a raft cluster.
|
// Cluster represents a group of stores joined as a raft cluster.
|
||||||
type Cluster struct {
|
type Cluster struct {
|
||||||
|
path string
|
||||||
Stores []*Store
|
Stores []*Store
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewCluster returns a cluster of n stores within path.
|
// NewCluster returns a cluster of n stores within path.
|
||||||
func NewCluster(path string, n int) *Cluster {
|
func NewCluster(path string, n int) *Cluster {
|
||||||
c := &Cluster{}
|
c := &Cluster{path: path}
|
||||||
|
|
||||||
// Construct a list of temporary peers.
|
peers := []string{}
|
||||||
peers := make([]string, n)
|
if n > 1 {
|
||||||
for i := range peers {
|
// Construct a list of temporary peers.
|
||||||
peers[i] = "127.0.0.1:0"
|
peers := make([]string, n)
|
||||||
|
for i := range peers {
|
||||||
|
peers[i] = "127.0.0.1:0"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create new stores with temporary peers.
|
// Create new stores with temporary peers.
|
||||||
|
@ -937,6 +1023,23 @@ func MustOpenCluster(n int) *Cluster {
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Cluster) Join() error {
|
||||||
|
config := NewConfig(filepath.Join(c.path, strconv.Itoa(len(c.Stores))))
|
||||||
|
config.Peers = []string{c.Stores[0].Addr.String()}
|
||||||
|
s := NewStore(config)
|
||||||
|
if err := s.Open(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case err := <-s.Err():
|
||||||
|
panic(fmt.Sprintf("store: i=%d, addr=%s, err=%s", len(c.Stores), s.Addr.String(), err))
|
||||||
|
case <-s.Ready():
|
||||||
|
}
|
||||||
|
|
||||||
|
c.Stores = append(c.Stores, s)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Open opens and initializes all stores in the cluster.
|
// Open opens and initializes all stores in the cluster.
|
||||||
func (c *Cluster) Open() error {
|
func (c *Cluster) Open() error {
|
||||||
if err := func() error {
|
if err := func() error {
|
||||||
|
@ -972,6 +1075,15 @@ func (c *Cluster) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Cluster) WaitForLeader() error {
|
||||||
|
for _, s := range c.Stores {
|
||||||
|
if err := s.WaitForLeader(5 * time.Second); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Leader returns the store that is currently leader.
|
// Leader returns the store that is currently leader.
|
||||||
func (c *Cluster) Leader() *Store {
|
func (c *Cluster) Leader() *Store {
|
||||||
for _, s := range c.Stores {
|
for _, s := range c.Stores {
|
||||||
|
@ -994,3 +1106,44 @@ func MustTempFile() string {
|
||||||
func mockHashPassword(password string) ([]byte, error) {
|
func mockHashPassword(password string) ([]byte, error) {
|
||||||
return []byte(password), nil
|
return []byte(password), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// assertRaftPeerNodes counts the number of nodes running with a local raft
|
||||||
|
// database and asserts that the count is equal to n
|
||||||
|
func assertRaftPeerNodes(t *testing.T, c *Cluster, n int) {
|
||||||
|
// Ensure we have the required number of raft nodes
|
||||||
|
raftCount := 0
|
||||||
|
for _, s := range c.Stores {
|
||||||
|
if _, err := os.Stat(filepath.Join(s.Path(), "raft.db")); err == nil {
|
||||||
|
raftCount += 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if raftCount != n {
|
||||||
|
t.Errorf("raft nodes mismatch: got %v, exp %v", raftCount, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// assertDatabaseReplicated creates a new database named after each node and
|
||||||
|
// then verifies that each node can see all the created databases from their
|
||||||
|
// local meta data
|
||||||
|
func assertDatabaseReplicated(t *testing.T, c *Cluster) {
|
||||||
|
// Add a database to each node.
|
||||||
|
for i, s := range c.Stores {
|
||||||
|
if di, err := s.CreateDatabase(fmt.Sprintf("db%d", i)); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if di == nil {
|
||||||
|
t.Fatal("expected database")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify that each store has all databases.
|
||||||
|
for i := 0; i < len(c.Stores); i++ {
|
||||||
|
for _, s := range c.Stores {
|
||||||
|
if di, err := s.Database(fmt.Sprintf("db%d", i)); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if di == nil {
|
||||||
|
t.Fatal("expected database")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue