pull/5428/head
Cory LaNou 2015-12-18 17:31:24 -05:00 committed by David Norton
parent 94b05404dc
commit b0d0668138
10 changed files with 4655 additions and 117 deletions

1295 services/meta/data.go (new file)

File diff suppressed because it is too large.

108 services/meta/errors.go (new file)

@@ -0,0 +1,108 @@
package meta
import (
"errors"
"fmt"
)
var (
// ErrStoreOpen is returned when opening an already open store.
ErrStoreOpen = errors.New("store already open")
// ErrStoreClosed is returned when closing an already closed store.
ErrStoreClosed = errors.New("raft store already closed")
// ErrTooManyPeers is returned when more than 3 peers are used.
ErrTooManyPeers = errors.New("too many peers; influxdb v0.9.0 is limited to 3 nodes in a cluster")
)
var (
// ErrNodeExists is returned when creating an already existing node.
ErrNodeExists = errors.New("node already exists")
// ErrNodeNotFound is returned when mutating a node that doesn't exist.
ErrNodeNotFound = errors.New("node not found")
// ErrNodesRequired is returned when at least one node is required for an operation.
// This occurs when creating a shard group.
ErrNodesRequired = errors.New("at least one node required")
// ErrNodeIDRequired is returned when using a zero node id.
ErrNodeIDRequired = errors.New("node id must be greater than 0")
// ErrNodeUnableToDropFinalNode is returned if the node being dropped is the last
// node in the cluster
ErrNodeUnableToDropFinalNode = errors.New("unable to drop the final node in a cluster")
)
var (
// ErrDatabaseExists is returned when creating an already existing database.
ErrDatabaseExists = errors.New("database already exists")
// ErrDatabaseNameRequired is returned when creating a database without a name.
ErrDatabaseNameRequired = errors.New("database name required")
)
var (
// ErrRetentionPolicyExists is returned when creating an already existing policy.
ErrRetentionPolicyExists = errors.New("retention policy already exists")
// ErrRetentionPolicyDefault is returned when attempting a prohibited operation
// on a default retention policy.
ErrRetentionPolicyDefault = errors.New("retention policy is default")
// ErrRetentionPolicyNameRequired is returned when creating a policy without a name.
ErrRetentionPolicyNameRequired = errors.New("retention policy name required")
// ErrRetentionPolicyNameExists is returned when renaming a policy to
// the same name as another existing policy.
ErrRetentionPolicyNameExists = errors.New("retention policy name already exists")
// ErrRetentionPolicyDurationTooLow is returned when updating a retention
// policy that has a duration lower than the allowed minimum.
ErrRetentionPolicyDurationTooLow = fmt.Errorf("retention policy duration must be at least %s",
MinRetentionPolicyDuration)
// ErrReplicationFactorTooLow is returned when the replication factor is not in an
// acceptable range.
ErrReplicationFactorTooLow = errors.New("replication factor must be greater than 0")
)
var (
// ErrShardGroupExists is returned when creating an already existing shard group.
ErrShardGroupExists = errors.New("shard group already exists")
// ErrShardGroupNotFound is returned when mutating a shard group that doesn't exist.
ErrShardGroupNotFound = errors.New("shard group not found")
// ErrShardNotReplicated is returned if the node requested to be dropped has
// the last copy of a shard present and the force keyword was not used
ErrShardNotReplicated = errors.New("shard not replicated")
)
var (
// ErrContinuousQueryExists is returned when creating an already existing continuous query.
ErrContinuousQueryExists = errors.New("continuous query already exists")
// ErrContinuousQueryNotFound is returned when removing a continuous query that doesn't exist.
ErrContinuousQueryNotFound = errors.New("continuous query not found")
)
var (
// ErrSubscriptionExists is returned when creating an already existing subscription.
ErrSubscriptionExists = errors.New("subscription already exists")
// ErrSubscriptionNotFound is returned when removing a subscription that doesn't exist.
ErrSubscriptionNotFound = errors.New("subscription not found")
)
var (
// ErrUserExists is returned when creating an already existing user.
ErrUserExists = errors.New("user already exists")
// ErrUserNotFound is returned when mutating a user that doesn't exist.
ErrUserNotFound = errors.New("user not found")
// ErrUsernameRequired is returned when creating a user without a username.
ErrUsernameRequired = errors.New("username required")
)
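These are sentinel values compared by identity, so callers can branch on specific failure modes instead of parsing error strings. A minimal caller-side sketch; the createDatabase helper is hypothetical and not part of this change:

// ensureDatabase treats "already exists" as success so the operation is
// idempotent; any other sentinel error is surfaced to the caller.
func ensureDatabase(name string) error {
	err := createDatabase(name) // hypothetical helper returning the errors above
	switch err {
	case nil, ErrDatabaseExists:
		return nil
	case ErrDatabaseNameRequired:
		return fmt.Errorf("cannot create database %q: %s", name, err)
	default:
		return err
	}
}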

services/meta/handler.go

@@ -2,9 +2,10 @@ package meta
import (
"compress/gzip"
"expvar"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
@@ -12,13 +13,14 @@ import (
"strings"
"time"
"github.com/gogo/protobuf/proto"
"github.com/influxdb/influxdb/meta/internal"
"github.com/influxdb/influxdb/uuid"
)
//type store interface {
// AfterIndex(index int) <-chan struct{}
// Database(name string) (*DatabaseInfo, error)
//}
// execMagic is the first 4 bytes sent to a remote exec connection to verify
// that it is coming from a remote exec client connection.
const execMagic = "EXEC"
// handler represents an HTTP handler for the meta service.
type handler struct {
@@ -29,9 +31,12 @@ type handler struct {
loggingEnabled bool // Log every HTTP access.
pprofEnabled bool
store interface {
AfterIndex(index int) <-chan struct{}
Snapshot() ([]byte, error)
SetCache(b []byte)
afterIndex(index uint64) <-chan struct{}
index() uint64
isLeader() bool
leader() string
snapshot() (*Data, error)
apply(b []byte) error
}
}
@@ -71,29 +76,98 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
case "POST":
h.WrapHandler("execute", h.serveExec).ServeHTTP(w, r)
default:
http.Error(w, "bad request", http.StatusBadRequest)
http.Error(w, "", http.StatusBadRequest)
}
}
// serveExec executes the requested command.
func (h *handler) serveExec(w http.ResponseWriter, r *http.Request) {
// If not the leader, redirect.
if !h.store.isLeader() {
l := h.store.leader()
if l == "" {
h.httpError(errors.New("no leader"), w, http.StatusServiceUnavailable)
return
}
l = r.URL.Scheme + "//" + l + "/execute"
http.Redirect(w, r, l, http.StatusFound)
return
}
body, err := ioutil.ReadAll(r.Body)
if err != nil {
h.httpError(err, w, http.StatusInternalServerError)
return
}
if err := validateCommand(body); err != nil {
h.httpError(err, w, http.StatusBadRequest)
return
}
resp := h.exec(body)
b, err := proto.Marshal(resp)
if err != nil {
h.httpError(err, w, http.StatusInternalServerError)
return
}
w.Header().Add("Content-Type", "application/octet-stream")
w.Write(b)
}
func validateCommand(b []byte) error {
// Read marker message.
if len(b) < 4 {
return errors.New("invalid execMagic size")
} else if string(b[:4]) != execMagic {
return fmt.Errorf("invalid exec magic: %q", string(b[:4]))
}
// Ensure command can be deserialized before applying.
if err := proto.Unmarshal(b[4:], &internal.Command{}); err != nil {
return fmt.Errorf("unable to unmarshal command: %s", err)
}
return nil
}
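// Illustrative sketch, not part of this change: how a client would frame the
// request body that validateCommand expects — the 4-byte execMagic marker
// followed by a proto-encoded internal.Command. The helper name is made up.
func marshalExecCommand(cmd *internal.Command) ([]byte, error) {
	buf, err := proto.Marshal(cmd)
	if err != nil {
		return nil, err
	}
	// Prepend the marker so the server can cheaply reject non-exec payloads
	// before attempting to unmarshal the command.
	return append([]byte(execMagic), buf...), nil
}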
func (h *handler) exec(b []byte) *internal.Response {
if err := h.store.apply(b); err != nil {
return &internal.Response{
OK: proto.Bool(false),
Error: proto.String(err.Error()),
}
}
return &internal.Response{
OK: proto.Bool(true),
Index: proto.Uint64(h.store.index()),
}
}
// serveSnapshot is a long polling http connection to serve cache updates
func (h *handler) serveSnapshot(w http.ResponseWriter, r *http.Request) {
// get the current index that client has
index, _ := strconv.Atoi(r.URL.Query().Get("index"))
index, err := strconv.ParseUint(r.URL.Query().Get("index"), 10, 64)
if err != nil {
http.Error(w, "error parsing index", http.StatusBadRequest)
return
}
select {
case <-h.store.AfterIndex(index):
case <-h.store.afterIndex(index):
// Send updated snapshot to client.
ss, err := h.store.Snapshot()
ss, err := h.store.snapshot()
if err != nil {
h.logger.Println(err)
http.Error(w, "", http.StatusInternalServerError)
h.httpError(err, w, http.StatusInternalServerError)
return
}
w.Write(ss)
b, err := ss.MarshalBinary()
if err != nil {
h.httpError(err, w, http.StatusInternalServerError)
return
}
w.Write(b)
case <-w.(http.CloseNotifier).CloseNotify():
// Client closed the connection so we're done.
return
@@ -105,21 +179,6 @@ func (h *handler) servePing(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("ACK"))
}
// serveExpvar serves registered expvar information over HTTP.
func serveExpvar(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
fmt.Fprintf(w, "{\n")
first := true
expvar.Do(func(kv expvar.KeyValue) {
if !first {
fmt.Fprintf(w, ",\n")
}
first = false
fmt.Fprintf(w, "%q: %s", kv.Key, kv.Value)
})
fmt.Fprintf(w, "\n}\n")
}
type gzipResponseWriter struct {
io.Writer
http.ResponseWriter
@@ -197,3 +256,10 @@ func recovery(inner http.Handler, name string, weblog *log.Logger) http.Handler
inner.ServeHTTP(l, r)
})
}
func (h *handler) httpError(err error, w http.ResponseWriter, status int) {
if h.loggingEnabled {
h.logger.Println(err)
}
http.Error(w, "", status)
}
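Together with afterIndex on the store, serveSnapshot gives followers a simple long-poll protocol: send the index you already hold, block until the store has moved past it, then read back a binary-marshaled Data snapshot. A rough client-side sketch under those assumptions; the function name, imports, and error handling are illustrative and not part of this change:

// pollSnapshot long-polls the meta endpoint for a snapshot newer than index.
func pollSnapshot(metaAddr string, index uint64) (*Data, error) {
	resp, err := http.Get(fmt.Sprintf("http://%s?index=%d", metaAddr, index))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	b, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	// serveSnapshot writes Data.MarshalBinary output, so decode it with the
	// matching UnmarshalBinary.
	data := &Data{}
	if err := data.UnmarshalBinary(b); err != nil {
		return nil, err
	}
	return data, nil
}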

File diff suppressed because it is too large.

services/meta/internal/meta.proto

@@ -0,0 +1,377 @@
package internal;
//========================================================================
//
// Metadata
//
//========================================================================
message Data {
required uint64 Term = 1;
required uint64 Index = 2;
required uint64 ClusterID = 3;
repeated NodeInfo Nodes = 4;
repeated DatabaseInfo Databases = 5;
repeated UserInfo Users = 6;
required uint64 MaxNodeID = 7;
required uint64 MaxShardGroupID = 8;
required uint64 MaxShardID = 9;
}
message NodeInfo {
required uint64 ID = 1;
required string Host = 2;
}
message DatabaseInfo {
required string Name = 1;
required string DefaultRetentionPolicy = 2;
repeated RetentionPolicyInfo RetentionPolicies = 3;
repeated ContinuousQueryInfo ContinuousQueries = 4;
}
message RetentionPolicyInfo {
required string Name = 1;
required int64 Duration = 2;
required int64 ShardGroupDuration = 3;
required uint32 ReplicaN = 4;
repeated ShardGroupInfo ShardGroups = 5;
repeated SubscriptionInfo Subscriptions = 6;
}
message ShardGroupInfo {
required uint64 ID = 1;
required int64 StartTime = 2;
required int64 EndTime = 3;
required int64 DeletedAt = 4;
repeated ShardInfo Shards = 5;
}
message ShardInfo {
required uint64 ID = 1;
repeated uint64 OwnerIDs = 2 [deprecated=true];
repeated ShardOwner Owners = 3;
}
message SubscriptionInfo {
required string Name = 1;
required string Mode = 2;
repeated string Destinations = 3;
}
message ShardOwner {
required uint64 NodeID = 1;
}
message ContinuousQueryInfo {
required string Name = 1;
required string Query = 2;
}
message UserInfo {
required string Name = 1;
required string Hash = 2;
required bool Admin = 3;
repeated UserPrivilege Privileges = 4;
}
message UserPrivilege {
required string Database = 1;
required int32 Privilege = 2;
}
//========================================================================
//
// COMMANDS
//
//========================================================================
message Command {
extensions 100 to max;
enum Type {
CreateNodeCommand = 1;
DeleteNodeCommand = 2;
CreateDatabaseCommand = 3;
DropDatabaseCommand = 4;
CreateRetentionPolicyCommand = 5;
DropRetentionPolicyCommand = 6;
SetDefaultRetentionPolicyCommand = 7;
UpdateRetentionPolicyCommand = 8;
CreateShardGroupCommand = 9;
DeleteShardGroupCommand = 10;
CreateContinuousQueryCommand = 11;
DropContinuousQueryCommand = 12;
CreateUserCommand = 13;
DropUserCommand = 14;
UpdateUserCommand = 15;
SetPrivilegeCommand = 16;
SetDataCommand = 17;
SetAdminPrivilegeCommand = 18;
UpdateNodeCommand = 19;
CreateSubscriptionCommand = 21;
DropSubscriptionCommand = 22;
RemovePeerCommand = 23;
}
required Type type = 1;
}
message CreateNodeCommand {
extend Command {
optional CreateNodeCommand command = 101;
}
required string Host = 1;
required uint64 Rand = 2;
}
message DeleteNodeCommand {
extend Command {
optional DeleteNodeCommand command = 102;
}
required uint64 ID = 1;
required bool Force = 2;
}
message CreateDatabaseCommand {
extend Command {
optional CreateDatabaseCommand command = 103;
}
required string Name = 1;
}
message DropDatabaseCommand {
extend Command {
optional DropDatabaseCommand command = 104;
}
required string Name = 1;
}
message CreateRetentionPolicyCommand {
extend Command {
optional CreateRetentionPolicyCommand command = 105;
}
required string Database = 1;
required RetentionPolicyInfo RetentionPolicy = 2;
}
message DropRetentionPolicyCommand {
extend Command {
optional DropRetentionPolicyCommand command = 106;
}
required string Database = 1;
required string Name = 2;
}
message SetDefaultRetentionPolicyCommand {
extend Command {
optional SetDefaultRetentionPolicyCommand command = 107;
}
required string Database = 1;
required string Name = 2;
}
message UpdateRetentionPolicyCommand {
extend Command {
optional UpdateRetentionPolicyCommand command = 108;
}
required string Database = 1;
required string Name = 2;
optional string NewName = 3;
optional int64 Duration = 4;
optional uint32 ReplicaN = 5;
}
message CreateShardGroupCommand {
extend Command {
optional CreateShardGroupCommand command = 109;
}
required string Database = 1;
required string Policy = 2;
required int64 Timestamp = 3;
}
message DeleteShardGroupCommand {
extend Command {
optional DeleteShardGroupCommand command = 110;
}
required string Database = 1;
required string Policy = 2;
required uint64 ShardGroupID = 3;
}
message CreateContinuousQueryCommand {
extend Command {
optional CreateContinuousQueryCommand command = 111;
}
required string Database = 1;
required string Name = 2;
required string Query = 3;
}
message DropContinuousQueryCommand {
extend Command {
optional DropContinuousQueryCommand command = 112;
}
required string Database = 1;
required string Name = 2;
}
message CreateUserCommand {
extend Command {
optional CreateUserCommand command = 113;
}
required string Name = 1;
required string Hash = 2;
required bool Admin = 3;
}
message DropUserCommand {
extend Command {
optional DropUserCommand command = 114;
}
required string Name = 1;
}
message UpdateUserCommand {
extend Command {
optional UpdateUserCommand command = 115;
}
required string Name = 1;
required string Hash = 2;
}
message SetPrivilegeCommand {
extend Command {
optional SetPrivilegeCommand command = 116;
}
required string Username = 1;
required string Database = 2;
required int32 Privilege = 3;
}
message SetDataCommand {
extend Command {
optional SetDataCommand command = 117;
}
required Data Data = 1;
}
message SetAdminPrivilegeCommand {
extend Command {
optional SetAdminPrivilegeCommand command = 118;
}
required string Username = 1;
required bool Admin = 2;
}
message UpdateNodeCommand {
extend Command {
optional UpdateNodeCommand command = 119;
}
required uint64 ID = 1;
required string Host = 2;
}
message CreateSubscriptionCommand {
extend Command {
optional CreateSubscriptionCommand command = 121;
}
required string Name = 1;
required string Database = 2;
required string RetentionPolicy = 3;
required string Mode = 4;
repeated string Destinations = 5;
}
message DropSubscriptionCommand {
extend Command {
optional DropSubscriptionCommand command = 122;
}
required string Name = 1;
required string Database = 2;
required string RetentionPolicy = 3;
}
message RemovePeerCommand {
extend Command {
optional RemovePeerCommand command = 123;
}
required uint64 ID = 1;
required string Addr = 2;
}
message Response {
required bool OK = 1;
optional string Error = 2;
optional uint64 Index = 3;
}
//========================================================================
//
// RPC - higher-level cluster communication operations
//
//========================================================================
enum RPCType {
Error = 1;
FetchData = 2;
Join = 3;
PromoteRaft = 4;
}
message ResponseHeader {
required bool OK = 1;
optional string Error = 2;
}
message ErrorResponse {
required ResponseHeader Header = 1;
}
message FetchDataRequest {
required uint64 Index = 1;
required uint64 Term = 2;
optional bool Blocking = 3 [default = false];
}
message FetchDataResponse {
required ResponseHeader Header = 1;
required uint64 Index = 2;
required uint64 Term = 3;
optional bytes Data = 4;
}
message JoinRequest {
required string Addr = 1;
}
message JoinResponse {
required ResponseHeader Header = 1;
// Indicates that this node should take part in the raft cluster.
optional bool EnableRaft = 2;
// The addresses of raft peers to use if joining as a raft member. If not joining
// as a raft member, these are the nodes running raft.
repeated string RaftNodes = 3;
// The node ID assigned to the requesting node.
optional uint64 NodeID = 4;
}
message PromoteRaftRequest {
required string Addr = 1;
repeated string RaftNodes = 2;
}
message PromoteRaftResponse {
required ResponseHeader Header = 1;
optional bool Success = 2;
}
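Every concrete command rides inside the Command envelope as a proto2 extension in the 100-to-max range; the FSM later pulls it back out with proto.GetExtension (see store_fsm.go below). A hedged sketch of the encoding side in Go, using the gogo/protobuf API the handler already imports; the helper name is illustrative:

// newCreateDatabaseCommand builds the wire form of a CreateDatabaseCommand.
func newCreateDatabaseCommand(name string) (*internal.Command, error) {
	cmd := &internal.Command{Type: internal.Command_CreateDatabaseCommand.Enum()}
	err := proto.SetExtension(cmd, internal.E_CreateDatabaseCommand_Command,
		&internal.CreateDatabaseCommand{Name: proto.String(name)})
	if err != nil {
		return nil, err
	}
	return cmd, nil
}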

309 services/meta/raft_state.go (new file)

@@ -0,0 +1,309 @@
package meta
import (
"errors"
"fmt"
"io/ioutil"
"log"
"net"
"os"
"path/filepath"
"sync"
"time"
"github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb"
)
// raftState is a consensus strategy that uses a local raft implementation for
// consensus operations.
type raftState struct {
wg sync.WaitGroup
config *Config
closing chan struct{}
raft *raft.Raft
transport *raft.NetworkTransport
peerStore raft.PeerStore
raftStore *raftboltdb.BoltStore
raftLayer *raftLayer
peers []string
ln net.Listener
logger *log.Logger
}
func newRaftState(c *Config, peers []string, ln net.Listener, l *log.Logger) *raftState {
return &raftState{
config: c,
peers: peers,
logger: l,
ln: ln,
}
}
func (r *raftState) open() error {
r.closing = make(chan struct{})
// Setup raft configuration.
config := raft.DefaultConfig()
config.LogOutput = ioutil.Discard
if s.clusterTracingEnabled {
config.Logger = s.logger
}
config.HeartbeatTimeout = r.config.HeartbeatTimeout
config.ElectionTimeout = r.config.ElectionTimeout
config.LeaderLeaseTimeout = r.config.LeaderLeaseTimeout
config.CommitTimeout = r.config.CommitTimeout
// Since we never call `removePeer`, this is safe.
// If we decide to call removePeer in the future, we will have to re-evaluate how to handle this.
config.ShutdownOnRemove = false
// If no peers are set in the config or there is one and we are it, then start as a single server.
if len(r.peers) <= 1 {
config.EnableSingleNode = true
// Ensure we can always become the leader
config.DisableBootstrapAfterElect = false
}
// Build raft layer to multiplex listener.
r.raftLayer = newRaftLayer(r.ln, r.remoteAddr)
// Create a transport layer
r.transport = raft.NewNetworkTransport(r.raftLayer, 3, 10*time.Second, config.LogOutput)
// Create peer storage.
r.peerStore = raft.NewJSONPeers(s.path, r.transport)
peers, err := r.peerStore.Peers()
if err != nil {
return err
}
// For single-node clusters, we can update the raft peers before we start the cluster if the hostname
// has changed.
if config.EnableSingleNode {
if err := r.peerStore.SetPeers([]string{s.RemoteAddr.String()}); err != nil {
return err
}
peers = []string{s.RemoteAddr.String()}
}
// If we have multiple nodes in the cluster, make sure our address is in the raft peers or
// we won't be able to boot into the cluster because the other peers will reject our new hostname. This
// is difficult to resolve automatically because we need to have all the raft peers agree on the current members
// of the cluster before we can change them.
if len(peers) > 0 && !raft.PeerContained(peers, s.RemoteAddr.String()) {
s.logger.Printf("%s is not in the list of raft peers. Please update %v/peers.json on all raft nodes to have the same contents.", s.RemoteAddr.String(), s.Path())
return fmt.Errorf("peers out of sync: %v not in %v", s.RemoteAddr.String(), peers)
}
// Create the log store and stable store.
store, err := raftboltdb.NewBoltStore(filepath.Join(s.path, "raft.db"))
if err != nil {
return fmt.Errorf("new bolt store: %s", err)
}
r.raftStore = store
// Create the snapshot store.
snapshots, err := raft.NewFileSnapshotStore(s.path, raftSnapshotsRetained, os.Stderr)
if err != nil {
return fmt.Errorf("file snapshot store: %s", err)
}
// Create raft log.
ra, err := raft.NewRaft(config, (*storeFSM)(s), store, store, snapshots, r.peerStore, r.transport)
if err != nil {
return fmt.Errorf("new raft: %s", err)
}
r.raft = ra
r.wg.Add(1)
go r.logLeaderChanges()
return nil
}
func (r *raftState) logLeaderChanges() {
defer r.wg.Done()
// Logs our current state (Node at 1.2.3.4:8088 [Follower])
r.logger.Printf(r.raft.String())
for {
select {
case <-r.closing:
return
case <-r.raft.LeaderCh():
peers, err := r.peers()
if err != nil {
r.logger.Printf("failed to lookup peers: %v", err)
}
r.logger.Printf("%v. peers=%v", r.raft.String(), peers)
}
}
}
func (r *raftState) close() error {
if r.closing != nil {
close(r.closing)
}
r.wg.Wait()
if r.transport != nil {
r.transport.Close()
r.transport = nil
}
// Shutdown raft.
if r.raft != nil {
if err := r.raft.Shutdown().Error(); err != nil {
return err
}
r.raft = nil
}
if r.raftStore != nil {
r.raftStore.Close()
r.raftStore = nil
}
return nil
}
func (r *raftState) initialize() error {
// If we have committed entries then the store is already in the cluster.
if index, err := r.raftStore.LastIndex(); err != nil {
return fmt.Errorf("last index: %s", err)
} else if index > 0 {
return nil
}
// Force set peers.
if err := r.setPeers(r.peers); err != nil {
return fmt.Errorf("set raft peers: %s", err)
}
return nil
}
// apply applies a serialized command to the raft log.
func (r *raftState) apply(b []byte) error {
// Apply to raft log.
f := r.raft.Apply(b, 0)
if err := f.Error(); err != nil {
return err
}
// Return response if it's an error.
// No other non-nil objects should be returned.
resp := f.Response()
if err, ok := resp.(error); ok {
return lookupError(err)
}
assert(resp == nil, "unexpected response: %#v", resp)
return nil
}
func (r *raftState) lastIndex() uint64 {
return r.raft.LastIndex()
}
func (r *raftState) snapshot() error {
future := r.raft.Snapshot()
return future.Error()
}
// addPeer adds addr to the list of peers in the cluster.
func (r *raftState) addPeer(addr string) error {
peers, err := r.peerStore.Peers()
if err != nil {
return err
}
if len(peers) >= 3 {
return nil
}
if fut := r.raft.AddPeer(addr); fut.Error() != nil {
return fut.Error()
}
return nil
}
// removePeer removes addr from the list of peers in the cluster.
func (r *raftState) removePeer(addr string) error {
// Only do this on the leader
if !r.isLeader() {
return errors.New("not the leader")
}
if fut := r.raft.RemovePeer(addr); fut.Error() != nil {
return fut.Error()
}
return nil
}
// setPeers sets a list of peers in the cluster.
func (r *raftState) setPeers(addrs []string) error {
return r.raft.SetPeers(addrs).Error()
}
func (r *raftState) peers() ([]string, error) {
return r.peerStore.Peers()
}
func (r *raftState) leader() string {
if r.raft == nil {
return ""
}
return r.raft.Leader()
}
func (r *raftState) isLeader() bool {
if r.raft == nil {
return false
}
return r.raft.State() == raft.Leader
}
// raftLayer wraps the connection so it can be re-used for forwarding.
type raftLayer struct {
ln net.Listener
addr net.Addr
conn chan net.Conn
closed chan struct{}
}
// newRaftLayer returns a new instance of raftLayer.
func newRaftLayer(ln net.Listener, addr net.Addr) *raftLayer {
return &raftLayer{
ln: ln,
addr: addr,
conn: make(chan net.Conn),
closed: make(chan struct{}),
}
}
// Addr returns the local address for the layer.
func (l *raftLayer) Addr() net.Addr { return l.addr }
// Dial creates a new network connection.
func (l *raftLayer) Dial(addr string, timeout time.Duration) (net.Conn, error) {
conn, err := net.DialTimeout("tcp", addr, timeout)
if err != nil {
return nil, err
}
// Write a marker byte for raft messages.
_, err = conn.Write([]byte{MuxRaftHeader})
if err != nil {
conn.Close()
return nil, err
}
return conn, err
}
// Accept waits for the next connection.
func (l *raftLayer) Accept() (net.Conn, error) { return l.ln.Accept() }
// Close closes the layer.
func (l *raftLayer) Close() error { return l.ln.Close() }

services/meta/service.go

@@ -19,12 +19,8 @@ type Service struct {
https bool
cert string
err chan error
Logger *log.Logger
store interface {
Close() error
}
Logger *log.Logger
store *store
}
// NewService returns a new instance of Service.
@@ -102,7 +98,7 @@ func (s *Service) Close() error {
return s.ln.Close()
}
if err := s.store.Close(); err != nil {
if err := s.store.close(); err != nil {
return err
}

services/meta/service_test.go

@@ -1,12 +1,10 @@
package meta
import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
"testing"
"time"
)
func TestService_Open(t *testing.T) {
@@ -48,58 +46,58 @@ func TestService_PingEndpoint(t *testing.T) {
}
}
func TestService_LongPollCache(t *testing.T) {
cfg := newConfig()
s := NewService(cfg)
if err := s.Open(); err != nil {
t.Fatal(err)
}
url, err := url.Parse(s.URL())
if err != nil {
t.Fatal(err)
}
ch := make(chan []byte)
reqFunc := func(index int) {
println("here1 ******")
u := fmt.Sprintf("http://%s?index=%d", url, index)
resp, err := http.Get(u)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
println("here2 ******")
buf := make([]byte, 10)
n, err := resp.Body.Read(buf)
if err != nil {
t.Fatal(err)
}
println(string(buf[:n]))
println("here3 ******")
ch <- buf[:n]
}
go reqFunc(0)
go reqFunc(1)
go func() {
time.Sleep(1 * time.Second)
s.handler.store.SetCache([]byte("world"))
}()
for n := 0; n < 2; n++ {
b := <-ch
t.Log(string(b))
println("client read cache update")
}
close(ch)
if err := s.Close(); err != nil {
t.Fatal(err)
}
}
//func TestService_LongPollCache(t *testing.T) {
// cfg := newConfig()
// s := NewService(cfg)
// if err := s.Open(); err != nil {
// t.Fatal(err)
// }
//
// url, err := url.Parse(s.URL())
// if err != nil {
// t.Fatal(err)
// }
//
// ch := make(chan []byte)
//
// reqFunc := func(index int) {
// println("here1 ******")
// u := fmt.Sprintf("http://%s?index=%d", url, index)
// resp, err := http.Get(u)
// if err != nil {
// t.Fatal(err)
// }
// defer resp.Body.Close()
// println("here2 ******")
//
// buf := make([]byte, 10)
// n, err := resp.Body.Read(buf)
// if err != nil {
// t.Fatal(err)
// }
// println(string(buf[:n]))
// println("here3 ******")
// ch <- buf[:n]
// }
//
// go reqFunc(0)
// go reqFunc(1)
// go func() {
// time.Sleep(1 * time.Second)
// s.handler.store.SetCache([]byte("world"))
// }()
//
// for n := 0; n < 2; n++ {
// b := <-ch
// t.Log(string(b))
// println("client read cache update")
// }
// close(ch)
//
// if err := s.Close(); err != nil {
// t.Fatal(err)
// }
//}
func newConfig() *Config {
cfg := NewConfig()

services/meta/store.go

@@ -1,59 +1,263 @@
package meta
import "sync"
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"os"
"path/filepath"
"sync"
"time"
"github.com/hashicorp/raft"
)
type store struct {
mu sync.RWMutex
index int
closing chan struct{}
cache []byte
cacheChanged chan struct{}
mu sync.RWMutex
closing chan struct{}
data *Data
raftState *raftState
dataChanged chan struct{}
addr string
raftln net.Listener
path string
opened bool
}
func newStore(c *Config) *store {
s := store{
index: 1,
closing: make(chan struct{}),
cache: []byte("hello"),
cacheChanged: make(chan struct{}),
data: &Data{
Index: 1,
},
closing: make(chan struct{}),
dataChanged: make(chan struct{}),
addr: c.RaftAddr,
path: c.Dir,
}
return &s
}
func (s *store) Close() error {
// open opens and initializes the raft store.
func (s *store) open() error {
ln, err := net.Listen("tcp", s.addr)
if err != nil {
return err
}
s.raftln = ln
s.addr = ln.Addr().String()
s.Logger.Printf("Using data dir: %v", s.path)
if err := func() error {
s.mu.Lock()
defer s.mu.Unlock()
// Check if store has already been opened.
if s.opened {
return ErrStoreOpen
}
s.opened = true
// load our raft peers
if err := s.loadPeers(); err != nil {
return err
}
// Create the root directory if it doesn't already exist.
if err := os.MkdirAll(s.path, 0777); err != nil {
return fmt.Errorf("mkdir all: %s", err)
}
// Open the raft store.
if err := s.openRaft(); err != nil {
return fmt.Errorf("raft: %s", err)
}
// Initialize the store, if necessary.
if err := s.initialize(); err != nil {
return fmt.Errorf("initialize raft: %s", err)
}
// Load existing ID, if exists.
if err := s.readID(); err != nil {
return fmt.Errorf("read id: %s", err)
}
return nil
}(); err != nil {
return err
}
// Begin serving listener.
s.wg.Add(1)
go s.serveExecListener()
s.wg.Add(1)
go s.serveRPCListener()
// Join an existing cluster if needed.
if err := s.joinCluster(); err != nil {
return fmt.Errorf("join: %v", err)
}
// If the ID doesn't exist then create a new node.
if s.id == 0 {
go s.init()
} else {
go s.syncNodeInfo()
close(s.ready)
}
// Wait for a leader to be elected so we know the raft log is loaded
// and up to date
<-s.ready
if err := s.WaitForLeader(0); err != nil {
return err
}
if s.raftPromotionEnabled {
s.wg.Add(1)
s.Logger.Printf("spun up monitoring for %d", s.NodeID())
go s.monitorPeerHealth()
}
return nil
}
// loadPeers sets the appropriate peers from our persistent storage
func (s *store) loadPeers() error {
peers, err := readPeersJSON(filepath.Join(s.path, "peers.json"))
if err != nil {
return err
}
// If we have existing peers, use those. This will override what's in the
// config.
if len(peers) > 0 {
s.peers = peers
if _, err := os.Stat(filepath.Join(s.path, "raft.db")); err != nil {
return err
}
}
return nil
}
func readPeersJSON(path string) ([]string, error) {
// Read the file
buf, err := ioutil.ReadFile(path)
if err != nil && !os.IsNotExist(err) {
return nil, err
}
// Check for no peers
if len(buf) == 0 {
return nil, nil
}
// Decode the peers
var peers []string
dec := json.NewDecoder(bytes.NewReader(buf))
if err := dec.Decode(&peers); err != nil {
return nil, err
}
return peers, nil
}
func (s *store) close() error {
s.mu.Lock()
defer s.mu.Unlock()
close(s.closing)
return nil
}
func (s *store) SetCache(b []byte) {
s.mu.Lock()
defer s.mu.Unlock()
s.cache = b
s.index++
close(s.cacheChanged)
s.cacheChanged = make(chan struct{})
}
func (s *store) Snapshot() ([]byte, error) {
func (s *store) snapshot() (*Data, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.cache, nil
return s.data.Clone(), nil
}
// AfterIndex returns a channel that will be closed to signal
// afterIndex returns a channel that will be closed to signal
// the caller when an updated snapshot is available.
func (s *store) AfterIndex(index int) <-chan struct{} {
func (s *store) afterIndex(index uint64) <-chan struct{} {
s.mu.RLock()
defer s.mu.RUnlock()
if index < s.index {
if index < s.data.Index {
// Client needs update so return a closed channel.
ch := make(chan struct{})
close(ch)
return ch
}
return s.cacheChanged
return s.dataChanged
}
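// notifyChanged is called by the FSM after every applied command (see
// store_fsm.go below) but its body is not part of this diff. A sketch of the
// broadcast it presumably performs, following the close-and-replace pattern
// the old SetCache used. The caller already holds s.mu, so no locking here:
// closing dataChanged wakes every afterIndex waiter, and a fresh channel is
// installed for the next round of waiters.
func (s *store) notifyChanged() {
	close(s.dataChanged)
	s.dataChanged = make(chan struct{})
}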
// isLeader returns true if the store is currently the leader.
func (s *store) isLeader() bool {
s.mu.RLock()
defer s.mu.RUnlock()
if s.raftState == nil || s.raftState.raft == nil {
return false
}
return s.raftState.raft.State() == raft.Leader
}
// leader returns what the store thinks is the current leader. An empty
// string indicates no leader exists.
func (s *store) leader() string {
s.mu.RLock()
defer s.mu.RUnlock()
return s.raftState.leader()
}
// index returns the current store index.
func (s *store) index() uint64 {
s.mu.RLock()
defer s.mu.RUnlock()
return s.data.Index
}
// apply applies a command to raft.
func (s *store) apply(b []byte) error {
s.mu.RLock()
defer s.mu.RUnlock()
// Apply to raft log.
f := s.raftState.raft.Apply(b, 0)
if err := f.Error(); err != nil {
return err
}
// Return response if it's an error.
// No other non-nil objects should be returned.
resp := f.Response()
if err, ok := resp.(error); ok {
return err
}
// resp should either be an error or nil.
if resp != nil {
panic(fmt.Sprintf("unexpected response: %#v", resp))
}
return nil
}
// RetentionPolicyUpdate represents retention policy fields to be updated.
type RetentionPolicyUpdate struct {
Name *string
Duration *time.Duration
ReplicaN *int
}
// SetName sets the RetentionPolicyUpdate.Name
func (rpu *RetentionPolicyUpdate) SetName(v string) { rpu.Name = &v }
// SetDuration sets the RetentionPolicyUpdate.Duration
func (rpu *RetentionPolicyUpdate) SetDuration(v time.Duration) { rpu.Duration = &v }
// SetReplicaN sets the RetentionPolicyUpdate.ReplicaN
func (rpu *RetentionPolicyUpdate) SetReplicaN(v int) { rpu.ReplicaN = &v }
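The pointer fields on RetentionPolicyUpdate let callers distinguish "leave unchanged" (nil) from "set to this value", which is why the setters take plain values and store their addresses. A small usage sketch; the wrapper function is illustrative:

// Request a new duration and replication factor; Name stays nil, so
// UpdateRetentionPolicy leaves the policy name unchanged.
func exampleRetentionPolicyUpdate() *RetentionPolicyUpdate {
	var rpu RetentionPolicyUpdate
	rpu.SetDuration(7 * 24 * time.Hour)
	rpu.SetReplicaN(2)
	return &rpu
}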

496 services/meta/store_fsm.go (new file)

@@ -0,0 +1,496 @@
package meta
import (
"fmt"
"io"
"io/ioutil"
"time"
"github.com/gogo/protobuf/proto"
"github.com/hashicorp/raft"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta/internal"
)
// storeFSM represents the finite state machine used by the store to interact with Raft.
type storeFSM store
func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
var cmd internal.Command
if err := proto.Unmarshal(l.Data, &cmd); err != nil {
panic(fmt.Errorf("cannot marshal command: %x", l.Data))
}
// Lock the store.
s := (*store)(fsm)
s.mu.Lock()
defer s.mu.Unlock()
err := func() interface{} {
switch cmd.GetType() {
case internal.Command_RemovePeerCommand:
return fsm.applyRemovePeerCommand(&cmd)
case internal.Command_CreateNodeCommand:
return fsm.applyCreateNodeCommand(&cmd)
case internal.Command_DeleteNodeCommand:
return fsm.applyDeleteNodeCommand(&cmd)
case internal.Command_CreateDatabaseCommand:
return fsm.applyCreateDatabaseCommand(&cmd)
case internal.Command_DropDatabaseCommand:
return fsm.applyDropDatabaseCommand(&cmd)
case internal.Command_CreateRetentionPolicyCommand:
return fsm.applyCreateRetentionPolicyCommand(&cmd)
case internal.Command_DropRetentionPolicyCommand:
return fsm.applyDropRetentionPolicyCommand(&cmd)
case internal.Command_SetDefaultRetentionPolicyCommand:
return fsm.applySetDefaultRetentionPolicyCommand(&cmd)
case internal.Command_UpdateRetentionPolicyCommand:
return fsm.applyUpdateRetentionPolicyCommand(&cmd)
case internal.Command_CreateShardGroupCommand:
return fsm.applyCreateShardGroupCommand(&cmd)
case internal.Command_DeleteShardGroupCommand:
return fsm.applyDeleteShardGroupCommand(&cmd)
case internal.Command_CreateContinuousQueryCommand:
return fsm.applyCreateContinuousQueryCommand(&cmd)
case internal.Command_DropContinuousQueryCommand:
return fsm.applyDropContinuousQueryCommand(&cmd)
case internal.Command_CreateSubscriptionCommand:
return fsm.applyCreateSubscriptionCommand(&cmd)
case internal.Command_DropSubscriptionCommand:
return fsm.applyDropSubscriptionCommand(&cmd)
case internal.Command_CreateUserCommand:
return fsm.applyCreateUserCommand(&cmd)
case internal.Command_DropUserCommand:
return fsm.applyDropUserCommand(&cmd)
case internal.Command_UpdateUserCommand:
return fsm.applyUpdateUserCommand(&cmd)
case internal.Command_SetPrivilegeCommand:
return fsm.applySetPrivilegeCommand(&cmd)
case internal.Command_SetAdminPrivilegeCommand:
return fsm.applySetAdminPrivilegeCommand(&cmd)
case internal.Command_SetDataCommand:
return fsm.applySetDataCommand(&cmd)
case internal.Command_UpdateNodeCommand:
return fsm.applyUpdateNodeCommand(&cmd)
default:
panic(fmt.Errorf("cannot apply command: %x", l.Data))
}
}()
// Copy term and index to new metadata.
fsm.data.Term = l.Term
fsm.data.Index = l.Index
s.notifyChanged()
return err
}
func (fsm *storeFSM) applyRemovePeerCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_RemovePeerCommand_Command)
v := ext.(*internal.RemovePeerCommand)
id := v.GetID()
addr := v.GetAddr()
// Only do this if you are the leader
if fsm.raftState.isLeader() {
//Remove that node from the peer
fsm.Logger.Printf("removing peer for node id %d, %s", id, addr)
if err := fsm.raftState.removePeer(addr); err != nil {
fsm.Logger.Printf("error removing peer: %s", err)
}
}
// If this is the node being shutdown, close raft
if fsm.id == id {
fsm.Logger.Printf("shutting down raft for %s", addr)
if err := fsm.raftState.close(); err != nil {
fsm.Logger.Printf("failed to shut down raft: %s", err)
}
}
return nil
}
func (fsm *storeFSM) applyCreateNodeCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateNodeCommand_Command)
v := ext.(*internal.CreateNodeCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.CreateNode(v.GetHost()); err != nil {
return err
}
// If the cluster ID hasn't been set then use the command's random number.
if other.ClusterID == 0 {
other.ClusterID = uint64(v.GetRand())
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyUpdateNodeCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_UpdateNodeCommand_Command)
v := ext.(*internal.UpdateNodeCommand)
// Copy data and update.
other := fsm.data.Clone()
ni := other.Node(v.GetID())
if ni == nil {
return ErrNodeNotFound
}
ni.Host = v.GetHost()
fsm.data = other
return nil
}
func (fsm *storeFSM) applyDeleteNodeCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_DeleteNodeCommand_Command)
v := ext.(*internal.DeleteNodeCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.DeleteNode(v.GetID(), v.GetForce()); err != nil {
return err
}
fsm.data = other
id := v.GetID()
fsm.Logger.Printf("node '%d' removed", id)
return nil
}
func (fsm *storeFSM) applyCreateDatabaseCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateDatabaseCommand_Command)
v := ext.(*internal.CreateDatabaseCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.CreateDatabase(v.GetName()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyDropDatabaseCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_DropDatabaseCommand_Command)
v := ext.(*internal.DropDatabaseCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.DropDatabase(v.GetName()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyCreateRetentionPolicyCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateRetentionPolicyCommand_Command)
v := ext.(*internal.CreateRetentionPolicyCommand)
pb := v.GetRetentionPolicy()
// Copy data and update.
other := fsm.data.Clone()
if err := other.CreateRetentionPolicy(v.GetDatabase(),
&RetentionPolicyInfo{
Name: pb.GetName(),
ReplicaN: int(pb.GetReplicaN()),
Duration: time.Duration(pb.GetDuration()),
ShardGroupDuration: time.Duration(pb.GetShardGroupDuration()),
}); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyDropRetentionPolicyCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_DropRetentionPolicyCommand_Command)
v := ext.(*internal.DropRetentionPolicyCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.DropRetentionPolicy(v.GetDatabase(), v.GetName()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applySetDefaultRetentionPolicyCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_SetDefaultRetentionPolicyCommand_Command)
v := ext.(*internal.SetDefaultRetentionPolicyCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.SetDefaultRetentionPolicy(v.GetDatabase(), v.GetName()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyUpdateRetentionPolicyCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_UpdateRetentionPolicyCommand_Command)
v := ext.(*internal.UpdateRetentionPolicyCommand)
// Create update object.
rpu := RetentionPolicyUpdate{Name: v.NewName}
if v.Duration != nil {
value := time.Duration(v.GetDuration())
rpu.Duration = &value
}
if v.ReplicaN != nil {
value := int(v.GetReplicaN())
rpu.ReplicaN = &value
}
// Copy data and update.
other := fsm.data.Clone()
if err := other.UpdateRetentionPolicy(v.GetDatabase(), v.GetName(), &rpu); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyCreateShardGroupCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateShardGroupCommand_Command)
v := ext.(*internal.CreateShardGroupCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.CreateShardGroup(v.GetDatabase(), v.GetPolicy(), time.Unix(0, v.GetTimestamp())); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyDeleteShardGroupCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_DeleteShardGroupCommand_Command)
v := ext.(*internal.DeleteShardGroupCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.DeleteShardGroup(v.GetDatabase(), v.GetPolicy(), v.GetShardGroupID()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyCreateContinuousQueryCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateContinuousQueryCommand_Command)
v := ext.(*internal.CreateContinuousQueryCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.CreateContinuousQuery(v.GetDatabase(), v.GetName(), v.GetQuery()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyDropContinuousQueryCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_DropContinuousQueryCommand_Command)
v := ext.(*internal.DropContinuousQueryCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.DropContinuousQuery(v.GetDatabase(), v.GetName()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyCreateSubscriptionCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateSubscriptionCommand_Command)
v := ext.(*internal.CreateSubscriptionCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.CreateSubscription(v.GetDatabase(), v.GetRetentionPolicy(), v.GetName(), v.GetMode(), v.GetDestinations()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyDropSubscriptionCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_DropSubscriptionCommand_Command)
v := ext.(*internal.DropSubscriptionCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.DropSubscription(v.GetDatabase(), v.GetRetentionPolicy(), v.GetName()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyCreateUserCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateUserCommand_Command)
v := ext.(*internal.CreateUserCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.CreateUser(v.GetName(), v.GetHash(), v.GetAdmin()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyDropUserCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_DropUserCommand_Command)
v := ext.(*internal.DropUserCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.DropUser(v.GetName()); err != nil {
return err
}
fsm.data = other
delete(fsm.authCache, v.GetName())
return nil
}
func (fsm *storeFSM) applyUpdateUserCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_UpdateUserCommand_Command)
v := ext.(*internal.UpdateUserCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.UpdateUser(v.GetName(), v.GetHash()); err != nil {
return err
}
fsm.data = other
delete(fsm.authCache, v.GetName())
return nil
}
func (fsm *storeFSM) applySetPrivilegeCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_SetPrivilegeCommand_Command)
v := ext.(*internal.SetPrivilegeCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.SetPrivilege(v.GetUsername(), v.GetDatabase(), influxql.Privilege(v.GetPrivilege())); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applySetAdminPrivilegeCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_SetAdminPrivilegeCommand_Command)
v := ext.(*internal.SetAdminPrivilegeCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.SetAdminPrivilege(v.GetUsername(), v.GetAdmin()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applySetDataCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_SetDataCommand_Command)
v := ext.(*internal.SetDataCommand)
// Overwrite data.
fsm.data = &Data{}
fsm.data.unmarshal(v.GetData())
return nil
}
func (fsm *storeFSM) Snapshot() (raft.FSMSnapshot, error) {
s := (*store)(fsm)
s.mu.Lock()
defer s.mu.Unlock()
return &storeFSMSnapshot{Data: (*store)(fsm).data}, nil
}
func (fsm *storeFSM) Restore(r io.ReadCloser) error {
// Read all bytes.
b, err := ioutil.ReadAll(r)
if err != nil {
return err
}
// Decode metadata.
data := &Data{}
if err := data.UnmarshalBinary(b); err != nil {
return err
}
// Set metadata on store.
// NOTE: No lock because Hashicorp Raft doesn't call Restore concurrently
// with any other function.
fsm.data = data
return nil
}
type storeFSMSnapshot struct {
Data *Data
}
func (s *storeFSMSnapshot) Persist(sink raft.SnapshotSink) error {
err := func() error {
// Encode data.
p, err := s.Data.MarshalBinary()
if err != nil {
return err
}
// Write data to sink.
if _, err := sink.Write(p); err != nil {
return err
}
// Close the sink.
if err := sink.Close(); err != nil {
return err
}
return nil
}()
if err != nil {
sink.Cancel()
return err
}
return nil
}
// Release is invoked when we are finished with the snapshot
func (s *storeFSMSnapshot) Release() {}
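Persist and Restore are the two halves of raft snapshotting: the FSM's Data is marshaled into the snapshot sink, and on recovery the same bytes are unmarshaled into a fresh Data. A hedged round-trip sketch using an in-memory sink; memorySink and the helper are test stand-ins, not part of this change or of hashicorp/raft, and a bytes import is assumed alongside the file's existing ones:

// memorySink is a minimal in-memory raft.SnapshotSink for illustration only.
type memorySink struct {
	bytes.Buffer
	id string
}

func (s *memorySink) ID() string    { return s.id }
func (s *memorySink) Close() error  { return nil }
func (s *memorySink) Cancel() error { return nil }

// snapshotRoundTrip persists fsm's data to the sink and restores it into a
// second FSM, mirroring what raft does across a snapshot/restore cycle.
func snapshotRoundTrip(fsm *storeFSM) (*storeFSM, error) {
	snap, err := fsm.Snapshot()
	if err != nil {
		return nil, err
	}
	sink := &memorySink{id: "example"}
	if err := snap.Persist(sink); err != nil {
		return nil, err
	}
	restored := &storeFSM{}
	if err := restored.Restore(ioutil.NopCloser(&sink.Buffer)); err != nil {
		return nil, err
	}
	return restored, nil
}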