influxdb/meta/store.go

2088 lines
54 KiB
Go
Raw Normal View History

package meta
import (
"bytes"
crand "crypto/rand"
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
"io"
2015-06-01 17:20:57 +00:00
"io/ioutil"
"log"
"math/rand"
"net"
"os"
"path/filepath"
2015-06-01 17:20:57 +00:00
"strconv"
2015-06-06 03:58:54 +00:00
"strings"
"sync"
"time"
"github.com/gogo/protobuf/proto"
"github.com/hashicorp/raft"
2015-05-27 19:41:58 +00:00
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta/internal"
2015-05-21 20:06:01 +00:00
"golang.org/x/crypto/bcrypt"
)
// tcp.Mux header bytes.
const (
MuxRaftHeader = 0
MuxExecHeader = 1
MuxRPCHeader = 5
// SaltBytes is the number of bytes used for salts
SaltBytes = 32
2015-08-05 20:48:30 +00:00
DefaultSyncNodeDelay = time.Second
)
// ExecMagic is the first 4 bytes sent to a remote exec connection to verify
// that it is coming from a remote exec client connection.
const ExecMagic = "EXEC"
// Retention policy settings.
const (
AutoCreateRetentionPolicyName = "default"
AutoCreateRetentionPolicyPeriod = 0
RetentionPolicyMinDuration = time.Hour
// MaxAutoCreatedRetentionPolicyReplicaN is the maximum replication factor that will
// be set for auto-created retention policies.
MaxAutoCreatedRetentionPolicyReplicaN = 3
)
// Raft configuration.
const (
raftLogCacheSize = 512
raftSnapshotsRetained = 2
raftTransportMaxPool = 3
raftTransportTimeout = 10 * time.Second
MaxRaftNodes = 3
)
// Store represents a raft-backed metastore.
type Store struct {
2015-05-27 19:41:58 +00:00
mu sync.RWMutex
path string
opened bool
id uint64 // local node id
2015-06-01 17:20:57 +00:00
// All peers in cluster. Used during bootstrapping.
peers []string
data *Data
2015-07-23 02:51:42 +00:00
rpc *rpc
2015-08-05 20:48:30 +00:00
// The address used by other nodes to reach this node.
RemoteAddr net.Addr
raftState raftState
2015-09-30 17:10:09 +00:00
ready chan struct{}
err chan error
closing chan struct{}
wg sync.WaitGroup
changed chan struct{}
// clusterTracingEnabled controls whether low-level cluster communcation is logged.
// Useful for troubleshooting
clusterTracingEnabled bool
2015-06-07 00:03:52 +00:00
retentionAutoCreate bool
// The listeners to accept raft and remote exec connections from.
RaftListener net.Listener
ExecListener net.Listener
// The listener for higher-level, cluster operations
RPCListener net.Listener
// The advertised hostname of the store.
Addr net.Addr
// The amount of time before a follower starts a new election.
HeartbeatTimeout time.Duration
// The amount of time before a candidate starts a new election.
ElectionTimeout time.Duration
// The amount of time without communication to the cluster before a
// leader steps down to a follower state.
LeaderLeaseTimeout time.Duration
// The amount of time without an apply before sending a heartbeat.
CommitTimeout time.Duration
2015-06-26 19:30:00 +00:00
// Authentication cache.
authCache map[string]authUser
2015-06-26 19:30:00 +00:00
2015-06-29 17:37:36 +00:00
// hashPassword generates a cryptographically secure hash for password.
// Returns an error if the password is invalid or a hash cannot be generated.
hashPassword HashPasswordFn
Logger *log.Logger
}
type authUser struct {
salt []byte
hash []byte
}
// NewStore returns a new instance of Store.
func NewStore(c *Config) *Store {
s := &Store{
path: c.Dir,
peers: c.Peers,
data: &Data{},
2015-09-30 17:10:09 +00:00
ready: make(chan struct{}),
err: make(chan error),
closing: make(chan struct{}),
changed: make(chan struct{}),
clusterTracingEnabled: c.ClusterTracing,
retentionAutoCreate: c.RetentionAutoCreate,
2015-06-01 22:00:13 +00:00
HeartbeatTimeout: time.Duration(c.HeartbeatTimeout),
ElectionTimeout: time.Duration(c.ElectionTimeout),
LeaderLeaseTimeout: time.Duration(c.LeaderLeaseTimeout),
CommitTimeout: time.Duration(c.CommitTimeout),
authCache: make(map[string]authUser, 0),
2015-06-29 17:37:36 +00:00
hashPassword: func(password string) ([]byte, error) {
return bcrypt.GenerateFromPassword([]byte(password), BcryptCost)
},
Logger: log.New(os.Stderr, "[metastore] ", log.LstdFlags),
}
s.raftState = &localRaft{store: s}
2015-07-23 02:51:42 +00:00
s.rpc = &rpc{
store: s,
tracingEnabled: c.ClusterTracing,
2015-07-23 02:51:42 +00:00
logger: s.Logger,
}
return s
}
// Path returns the root path when open.
// Returns an empty string when the store is closed.
2015-05-27 19:41:58 +00:00
func (s *Store) Path() string { return s.path }
2015-06-01 17:20:57 +00:00
// IDPath returns the path to the local node ID file.
func (s *Store) IDPath() string { return filepath.Join(s.path, "id") }
// Open opens and initializes the raft store.
2015-05-27 19:41:58 +00:00
func (s *Store) Open() error {
// Verify that no more than 3 peers.
// https://github.com/influxdb/influxdb/issues/2750
if len(s.peers) > MaxRaftNodes {
return ErrTooManyPeers
}
// Verify listeners are set.
if s.RaftListener == nil {
panic("Store.RaftListener not set")
} else if s.ExecListener == nil {
panic("Store.ExecListener not set")
} else if s.RPCListener == nil {
panic("Store.RPCListener not set")
}
s.Logger.Printf("Using data dir: %v", s.Path())
2015-06-01 17:20:57 +00:00
if err := func() error {
s.mu.Lock()
defer s.mu.Unlock()
2015-06-01 17:20:57 +00:00
// Check if store has already been opened.
if s.opened {
return ErrStoreOpen
}
s.opened = true
// load our raft state
if err := s.loadState(); err != nil {
return err
}
// Create the root directory if it doesn't already exist.
if err := s.createRootDir(); err != nil {
return fmt.Errorf("mkdir all: %s", err)
}
// Open the raft store.
if err := s.openRaft(); err != nil {
return fmt.Errorf("raft: %s", err)
}
// Initialize the store, if necessary.
if err := s.initialize(); err != nil {
return fmt.Errorf("initialize raft: %s", err)
}
// Load existing ID, if exists.
2015-06-01 17:20:57 +00:00
if err := s.readID(); err != nil {
return fmt.Errorf("read id: %s", err)
}
return nil
}(); err != nil {
s.close()
return err
}
// Begin serving listener.
s.wg.Add(1)
go s.serveExecListener()
s.wg.Add(1)
go s.serveRPCListener()
// Join an existing cluster if we needed
if err := s.joinCluster(); err != nil {
return fmt.Errorf("join: %v", err)
}
2015-06-01 17:20:57 +00:00
// If the ID doesn't exist then create a new node.
if s.id == 0 {
go s.init()
} else {
go s.syncNodeInfo()
close(s.ready)
}
// Wait for a leader to be elected so we know the raft log is loaded
// and up to date
<-s.ready
return s.WaitForLeader(0)
}
// syncNodeInfo continuously tries to update the current nodes hostname
// in the meta store. It will retry until successful.
func (s *Store) syncNodeInfo() error {
<-s.ready
for {
if err := func() error {
if err := s.WaitForLeader(0); err != nil {
return err
}
ni, err := s.Node(s.id)
if err != nil {
return err
}
if ni == nil {
return ErrNodeNotFound
}
if ni.Host == s.RemoteAddr.String() {
s.Logger.Printf("Updated node id=%d hostname=%v", s.id, s.RemoteAddr.String())
return nil
}
_, err = s.UpdateNode(s.id, s.RemoteAddr.String())
if err != nil {
return err
}
return nil
}(); err != nil {
2015-08-05 16:25:55 +00:00
// If we get an error, the cluster has not stabilized so just try again
2015-08-05 20:48:30 +00:00
time.Sleep(DefaultSyncNodeDelay)
continue
}
return nil
}
}
// loadState sets the appropriate raftState from our persistent storage
func (s *Store) loadState() error {
peers, err := readPeersJSON(filepath.Join(s.path, "peers.json"))
if err != nil {
return err
}
// If we have existing peers, use those. This will override what's in the
// config.
if len(peers) > 0 {
s.peers = peers
}
// if no peers on disk, we need to start raft in order to initialize a new
// cluster or join an existing one.
if len(peers) == 0 {
s.raftState = &localRaft{store: s}
// if we have a raft database, (maybe restored), we should start raft locally
} else if _, err := os.Stat(filepath.Join(s.path, "raft.db")); err == nil {
s.raftState = &localRaft{store: s}
// otherwise, we should use remote raft
} else {
s.raftState = &remoteRaft{store: s}
}
return nil
}
func (s *Store) joinCluster() error {
// No join options, so nothing to do
if len(s.peers) == 0 {
return nil
}
// We already have a node ID so were already part of a cluster,
// don't join again so we can use our existing state.
if s.id != 0 {
s.Logger.Printf("Skipping cluster join: already member of cluster: nodeId=%v raftEnabled=%v peers=%v",
s.id, raft.PeerContained(s.peers, s.RemoteAddr.String()), s.peers)
return nil
}
s.Logger.Printf("Joining cluster at: %v", s.peers)
for {
for _, join := range s.peers {
res, err := s.rpc.join(s.RemoteAddr.String(), join)
if err != nil {
2015-08-13 22:58:25 +00:00
s.Logger.Printf("Join node %v failed: %v: retrying...", join, err)
continue
}
s.Logger.Printf("Joined remote node %v", join)
s.Logger.Printf("nodeId=%v raftEnabled=%v peers=%v", res.NodeID, res.RaftEnabled, res.RaftNodes)
s.peers = res.RaftNodes
s.id = res.NodeID
if err := s.writeNodeID(res.NodeID); err != nil {
s.Logger.Printf("Write node id failed: %v", err)
break
}
if !res.RaftEnabled {
// Shutdown our local raft and transition to a remote raft state
if err := s.enableRemoteRaft(); err != nil {
s.Logger.Printf("Enable remote raft failed: %v", err)
break
}
}
return nil
}
time.Sleep(time.Second)
}
}
func (s *Store) enableLocalRaft() error {
if _, ok := s.raftState.(*localRaft); ok {
return nil
}
s.Logger.Printf("Switching to local raft")
lr := &localRaft{store: s}
return s.changeState(lr)
}
func (s *Store) enableRemoteRaft() error {
if _, ok := s.raftState.(*remoteRaft); ok {
return nil
}
s.Logger.Printf("Switching to remote raft")
rr := &remoteRaft{store: s}
return s.changeState(rr)
}
func (s *Store) changeState(state raftState) error {
if err := s.raftState.close(); err != nil {
return err
}
// Clear out any persistent state
if err := s.raftState.remove(); err != nil {
return err
}
s.raftState = state
if err := s.raftState.open(); err != nil {
return err
}
return nil
}
// openRaft initializes the raft store.
func (s *Store) openRaft() error {
2015-07-23 16:49:43 +00:00
return s.raftState.open()
}
// initialize attempts to bootstrap the raft store if there are no committed entries.
func (s *Store) initialize() error {
2015-07-17 16:50:33 +00:00
return s.raftState.initialize()
}
// Close closes the store and shuts down the node in the cluster.
func (s *Store) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.close()
}
// WaitForDataChanged will block the current goroutine until the metastore index has
// be updated.
func (s *Store) WaitForDataChanged() error {
2015-07-27 21:30:22 +00:00
s.mu.RLock()
changed := s.changed
s.mu.RUnlock()
for {
select {
case <-s.closing:
return errors.New("closing")
2015-07-27 21:30:22 +00:00
case <-changed:
return nil
}
}
}
func (s *Store) close() error {
// Check if store has already been closed.
2015-05-27 19:41:58 +00:00
if !s.opened {
2015-05-21 20:06:01 +00:00
return ErrStoreClosed
}
2015-05-27 19:41:58 +00:00
s.opened = false
// Notify goroutines of close.
close(s.closing)
// FIXME(benbjohnson): s.wg.Wait()
if s.raftState != nil {
2015-07-17 16:50:33 +00:00
s.raftState.close()
s.raftState = nil
}
return nil
}
2015-06-01 17:20:57 +00:00
// readID reads the local node ID from the ID file.
func (s *Store) readID() error {
b, err := ioutil.ReadFile(s.IDPath())
if os.IsNotExist(err) {
s.id = 0
return nil
} else if err != nil {
return fmt.Errorf("read file: %s", err)
}
id, err := strconv.ParseUint(string(b), 10, 64)
if err != nil {
return fmt.Errorf("parse id: %s", err)
}
s.id = id
return nil
}
// init initializes the store in a separate goroutine.
// This occurs when the store first creates or joins a cluster.
// The ready channel is closed once the store is initialized.
func (s *Store) init() {
// Create a node for this store.
if err := s.createLocalNode(); err != nil {
s.err <- fmt.Errorf("create local node: %s", err)
return
}
// Notify the ready channel.
close(s.ready)
}
2015-06-01 17:20:57 +00:00
// createLocalNode creates the node for this local instance.
// Writes the id of the node to file on success.
func (s *Store) createLocalNode() error {
// Wait for leader.
if err := s.WaitForLeader(0); err != nil {
return fmt.Errorf("wait for leader: %s", err)
}
2015-06-01 17:20:57 +00:00
// Create new node.
ni, err := s.CreateNode(s.RemoteAddr.String())
2015-06-01 17:20:57 +00:00
if err != nil {
return fmt.Errorf("create node: %s", err)
}
// Write node id to file.
if err := s.writeNodeID(ni.ID); err != nil {
2015-06-01 17:20:57 +00:00
return fmt.Errorf("write file: %s", err)
}
// Set ID locally.
s.id = ni.ID
s.Logger.Printf("Created local node: id=%d, host=%s", s.id, s.RemoteAddr)
2015-06-01 17:20:57 +00:00
return nil
}
func (s *Store) createRootDir() error {
return os.MkdirAll(s.path, 0777)
}
func (s *Store) writeNodeID(id uint64) error {
if err := s.createRootDir(); err != nil {
return err
}
return ioutil.WriteFile(s.IDPath(), []byte(strconv.FormatUint(id, 10)), 0666)
}
2015-06-25 23:52:47 +00:00
// Snapshot saves a snapshot of the current state.
func (s *Store) Snapshot() error {
2015-07-17 18:43:42 +00:00
return s.raftState.snapshot()
2015-06-25 23:52:47 +00:00
}
// WaitForLeader sleeps until a leader is found or a timeout occurs.
// timeout == 0 means to wait forever.
func (s *Store) WaitForLeader(timeout time.Duration) error {
// Begin timeout timer.
timer := time.NewTimer(timeout)
defer timer.Stop()
// Continually check for leader until timeout.
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-s.closing:
return errors.New("closing")
case <-timer.C:
if timeout != 0 {
return errors.New("timeout")
}
case <-ticker.C:
if s.Leader() != "" {
return nil
}
}
}
}
// Ready returns a channel that is closed once the store is initialized.
func (s *Store) Ready() <-chan struct{} { return s.ready }
// Err returns a channel for all out-of-band errors.
func (s *Store) Err() <-chan error { return s.err }
// IsLeader returns true if the store is currently the leader.
2015-06-06 00:49:12 +00:00
func (s *Store) IsLeader() bool {
s.mu.RLock()
defer s.mu.RUnlock()
if s.raftState == nil {
return false
}
2015-07-17 17:50:06 +00:00
return s.raftState.isLeader()
2015-06-06 00:49:12 +00:00
}
2015-06-11 07:03:10 +00:00
// Leader returns what the store thinks is the current leader. An empty
// string indicates no leader exists.
func (s *Store) Leader() string {
s.mu.RLock()
defer s.mu.RUnlock()
if s.raftState == nil {
return ""
}
2015-07-17 16:50:33 +00:00
return s.raftState.leader()
2015-06-11 07:03:10 +00:00
}
// SetPeers sets a list of peers in the cluster.
func (s *Store) SetPeers(addrs []string) error {
2015-07-17 18:03:04 +00:00
return s.raftState.setPeers(addrs)
}
// AddPeer adds addr to the list of peers in the cluster.
func (s *Store) AddPeer(addr string) error {
2015-07-17 18:09:07 +00:00
return s.raftState.addPeer(addr)
}
// Peers returns the list of peers in the cluster.
func (s *Store) Peers() ([]string, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.raftState.peers()
}
// serveExecListener processes remote exec connections.
// This function runs in a separate goroutine.
func (s *Store) serveExecListener() {
defer s.wg.Done()
for {
// Accept next TCP connection.
conn, err := s.ExecListener.Accept()
if err != nil {
2015-06-06 03:58:54 +00:00
if strings.Contains(err.Error(), "connection closed") {
return
}
s.Logger.Printf("temporary accept error: %s", err)
continue
}
// Handle connection in a separate goroutine.
s.wg.Add(1)
go s.handleExecConn(conn)
}
}
// handleExecConn reads a command from the connection and executes it.
func (s *Store) handleExecConn(conn net.Conn) {
defer s.wg.Done()
// Nodes not part of the raft cluster may initiate remote exec commands
// but may not know who the current leader of the cluster. If we are not
// the leader, proxy the request to the current leader.
if !s.IsLeader() {
if s.Leader() == s.RemoteAddr.String() {
s.Logger.Printf("No leader")
return
}
leaderConn, err := net.DialTimeout("tcp", s.Leader(), 10*time.Second)
if err != nil {
s.Logger.Printf("Dial leader: %v", err)
return
}
defer leaderConn.Close()
leaderConn.Write([]byte{MuxExecHeader})
if err := proxy(leaderConn.(*net.TCPConn), conn.(*net.TCPConn)); err != nil {
s.Logger.Printf("Leader proxy error: %v", err)
}
conn.Close()
return
}
// Read and execute command.
err := func() error {
// Read marker message.
b := make([]byte, 4)
if _, err := io.ReadFull(conn, b); err != nil {
return fmt.Errorf("read magic: %s", err)
} else if string(b) != ExecMagic {
return fmt.Errorf("invalid exec magic: %q", string(b))
}
// Read command size.
var sz uint64
if err := binary.Read(conn, binary.BigEndian, &sz); err != nil {
return fmt.Errorf("read size: %s", err)
}
// Read command.
buf := make([]byte, sz)
if _, err := io.ReadFull(conn, buf); err != nil {
return fmt.Errorf("read command: %s", err)
}
// Ensure command can be deserialized before applying.
if err := proto.Unmarshal(buf, &internal.Command{}); err != nil {
return fmt.Errorf("unable to unmarshal command: %s", err)
}
// Apply against the raft log.
if err := s.apply(buf); err != nil {
2015-09-30 04:52:52 +00:00
return err
}
return nil
}()
// Build response message.
var resp internal.Response
resp.OK = proto.Bool(err == nil)
2015-07-17 18:34:39 +00:00
resp.Index = proto.Uint64(s.raftState.lastIndex())
if err != nil {
resp.Error = proto.String(err.Error())
}
// Encode response back to connection.
if b, err := proto.Marshal(&resp); err != nil {
panic(err)
} else if err = binary.Write(conn, binary.BigEndian, uint64(len(b))); err != nil {
s.Logger.Printf("Unable to write exec response size: %s", err)
} else if _, err = conn.Write(b); err != nil {
s.Logger.Printf("Unable to write exec response: %s", err)
}
conn.Close()
}
// serveRPCListener processes remote exec connections.
// This function runs in a separate goroutine.
func (s *Store) serveRPCListener() {
defer s.wg.Done()
for {
// Accept next TCP connection.
conn, err := s.RPCListener.Accept()
if err != nil {
if strings.Contains(err.Error(), "connection closed") {
return
} else {
s.Logger.Printf("temporary accept error: %s", err)
continue
}
}
// Handle connection in a separate goroutine.
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.rpc.handleRPCConn(conn)
}()
}
}
// MarshalBinary encodes the store's data to a binary protobuf format.
func (s *Store) MarshalBinary() ([]byte, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.data.MarshalBinary()
}
// ClusterID returns the unique identifier for the cluster.
// This is generated once a node has been created.
func (s *Store) ClusterID() (id uint64, err error) {
err = s.read(func(data *Data) error {
id = data.ClusterID
return nil
})
return
}
2015-06-01 17:20:57 +00:00
// NodeID returns the identifier for the local node.
// Panics if the node has not joined the cluster.
func (s *Store) NodeID() uint64 { return s.id }
2015-05-21 20:06:01 +00:00
// Node returns a node by id.
func (s *Store) Node(id uint64) (ni *NodeInfo, err error) {
err = s.read(func(data *Data) error {
ni = data.Node(id)
if ni == nil {
return errInvalidate
}
return nil
})
return
}
2015-05-21 20:06:01 +00:00
// NodeByHost returns a node by hostname.
func (s *Store) NodeByHost(host string) (ni *NodeInfo, err error) {
err = s.read(func(data *Data) error {
ni = data.NodeByHost(host)
if ni == nil {
return errInvalidate
}
return nil
})
return
}
2015-05-27 19:41:58 +00:00
// Nodes returns a list of all nodes.
func (s *Store) Nodes() (a []NodeInfo, err error) {
err = s.read(func(data *Data) error {
a = data.Nodes
return nil
})
return
}
// CreateNode creates a new node in the store.
func (s *Store) CreateNode(host string) (*NodeInfo, error) {
2015-05-21 20:06:01 +00:00
if err := s.exec(internal.Command_CreateNodeCommand, internal.E_CreateNodeCommand_Command,
&internal.CreateNodeCommand{
Host: proto.String(host),
Rand: proto.Uint64(uint64(rand.Int63())),
},
); err != nil {
return nil, err
}
return s.NodeByHost(host)
}
2015-08-05 16:25:55 +00:00
// UpdateNode updates an existing node in the store.
2015-07-28 20:22:32 +00:00
func (s *Store) UpdateNode(id uint64, host string) (*NodeInfo, error) {
if err := s.exec(internal.Command_UpdateNodeCommand, internal.E_UpdateNodeCommand_Command,
&internal.UpdateNodeCommand{
ID: proto.Uint64(id),
Host: proto.String(host),
},
); err != nil {
return nil, err
}
return s.NodeByHost(host)
}
2015-05-21 20:06:01 +00:00
// DeleteNode removes a node from the metastore by id.
2015-10-03 02:49:11 +00:00
func (s *Store) DeleteNode(id uint64, force bool) error {
ni := s.data.Node(id)
if ni == nil {
return ErrNodeNotFound
}
return s.exec(internal.Command_DeleteNodeCommand, internal.E_DeleteNodeCommand_Command,
2015-05-21 20:06:01 +00:00
&internal.DeleteNodeCommand{
2015-10-03 02:49:11 +00:00
ID: proto.Uint64(id),
Force: proto.Bool(force),
2015-09-29 21:10:47 +00:00
},
)
2015-09-25 18:54:11 +00:00
}
2015-05-21 20:06:01 +00:00
// Database returns a database by name.
func (s *Store) Database(name string) (di *DatabaseInfo, err error) {
err = s.read(func(data *Data) error {
di = data.Database(name)
if di == nil {
return errInvalidate
}
return nil
})
return
}
2015-05-27 19:41:58 +00:00
// Databases returns a list of all databases.
func (s *Store) Databases() (dis []DatabaseInfo, err error) {
err = s.read(func(data *Data) error {
dis = data.Databases
return nil
})
return
}
2015-05-21 20:06:01 +00:00
// CreateDatabase creates a new database in the store.
func (s *Store) CreateDatabase(name string) (*DatabaseInfo, error) {
if err := s.exec(internal.Command_CreateDatabaseCommand, internal.E_CreateDatabaseCommand_Command,
&internal.CreateDatabaseCommand{
Name: proto.String(name),
},
); err != nil {
return nil, err
}
s.Logger.Printf("database '%s' created", name)
2015-06-07 00:03:52 +00:00
if s.retentionAutoCreate {
// Read node count.
// Retention policies must be fully replicated.
var nodeN int
if err := s.read(func(data *Data) error {
nodeN = len(data.Nodes)
return nil
}); err != nil {
return nil, fmt.Errorf("read: %s", err)
}
if nodeN > MaxAutoCreatedRetentionPolicyReplicaN {
nodeN = MaxAutoCreatedRetentionPolicyReplicaN
}
// Create a retention policy.
2015-06-07 00:03:52 +00:00
rpi := NewRetentionPolicyInfo(AutoCreateRetentionPolicyName)
rpi.ReplicaN = nodeN
rpi.Duration = AutoCreateRetentionPolicyPeriod
if _, err := s.CreateRetentionPolicy(name, rpi); err != nil {
return nil, err
}
// Set it as the default retention policy.
2015-06-07 00:03:52 +00:00
if err := s.SetDefaultRetentionPolicy(name, AutoCreateRetentionPolicyName); err != nil {
return nil, err
}
}
2015-05-21 20:06:01 +00:00
return s.Database(name)
}
2015-05-27 19:41:58 +00:00
// CreateDatabaseIfNotExists creates a new database in the store if it doesn't already exist.
func (s *Store) CreateDatabaseIfNotExists(name string) (*DatabaseInfo, error) {
// Try to find database locally first.
if di, err := s.Database(name); err != nil {
return nil, err
} else if di != nil {
return di, nil
}
// Attempt to create database.
di, err := s.CreateDatabase(name)
if err == ErrDatabaseExists {
2015-05-27 19:41:58 +00:00
return s.Database(name)
}
return di, err
2015-05-27 19:41:58 +00:00
}
2015-05-21 20:06:01 +00:00
// DropDatabase removes a database from the metastore by name.
func (s *Store) DropDatabase(name string) error {
return s.exec(internal.Command_DropDatabaseCommand, internal.E_DropDatabaseCommand_Command,
&internal.DropDatabaseCommand{
Name: proto.String(name),
},
)
}
// RetentionPolicy returns a retention policy for a database by name.
func (s *Store) RetentionPolicy(database, name string) (rpi *RetentionPolicyInfo, err error) {
err = s.read(func(data *Data) error {
rpi, err = data.RetentionPolicy(database, name)
if err != nil {
return err
} else if rpi == nil {
return errInvalidate
}
return nil
})
return
}
2015-05-27 19:41:58 +00:00
// DefaultRetentionPolicy returns the default retention policy for a database.
func (s *Store) DefaultRetentionPolicy(database string) (rpi *RetentionPolicyInfo, err error) {
err = s.read(func(data *Data) error {
di := data.Database(database)
if di == nil {
return ErrDatabaseNotFound
}
for i := range di.RetentionPolicies {
if di.RetentionPolicies[i].Name == di.DefaultRetentionPolicy {
rpi = &di.RetentionPolicies[i]
return nil
}
}
return errInvalidate
})
return
}
// RetentionPolicies returns a list of all retention policies for a database.
func (s *Store) RetentionPolicies(database string) (a []RetentionPolicyInfo, err error) {
err = s.read(func(data *Data) error {
di := data.Database(database)
if di != nil {
return ErrDatabaseNotFound
}
a = di.RetentionPolicies
return nil
})
return
}
2015-05-21 20:06:01 +00:00
// CreateRetentionPolicy creates a new retention policy for a database.
func (s *Store) CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo) (*RetentionPolicyInfo, error) {
if rpi.Duration < RetentionPolicyMinDuration && rpi.Duration != 0 {
return nil, ErrRetentionPolicyDurationTooLow
}
2015-05-21 20:06:01 +00:00
if err := s.exec(internal.Command_CreateRetentionPolicyCommand, internal.E_CreateRetentionPolicyCommand_Command,
&internal.CreateRetentionPolicyCommand{
Database: proto.String(database),
RetentionPolicy: rpi.marshal(),
2015-05-21 20:06:01 +00:00
},
); err != nil {
return nil, err
}
s.Logger.Printf("retention policy '%s' for database '%s' created", rpi.Name, database)
2015-05-21 20:06:01 +00:00
return s.RetentionPolicy(database, rpi.Name)
}
2015-05-27 19:41:58 +00:00
// CreateRetentionPolicyIfNotExists creates a new policy in the store if it doesn't already exist.
func (s *Store) CreateRetentionPolicyIfNotExists(database string, rpi *RetentionPolicyInfo) (*RetentionPolicyInfo, error) {
// Try to find policy locally first.
if rpi, err := s.RetentionPolicy(database, rpi.Name); err != nil {
return nil, err
} else if rpi != nil {
return rpi, nil
}
// Attempt to create policy.
other, err := s.CreateRetentionPolicy(database, rpi)
if err == ErrRetentionPolicyExists {
2015-05-27 19:41:58 +00:00
return s.RetentionPolicy(database, rpi.Name)
}
return other, err
2015-05-27 19:41:58 +00:00
}
2015-05-21 20:06:01 +00:00
// SetDefaultRetentionPolicy sets the default retention policy for a database.
func (s *Store) SetDefaultRetentionPolicy(database, name string) error {
return s.exec(internal.Command_SetDefaultRetentionPolicyCommand, internal.E_SetDefaultRetentionPolicyCommand_Command,
&internal.SetDefaultRetentionPolicyCommand{
Database: proto.String(database),
Name: proto.String(name),
},
)
}
// UpdateRetentionPolicy updates an existing retention policy.
func (s *Store) UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) error {
var newName *string
if rpu.Name != nil {
newName = rpu.Name
}
var duration *int64
if rpu.Duration != nil {
value := int64(*rpu.Duration)
duration = &value
}
var replicaN *uint32
if rpu.ReplicaN != nil {
2015-05-21 20:06:01 +00:00
value := uint32(*rpu.ReplicaN)
replicaN = &value
}
return s.exec(internal.Command_UpdateRetentionPolicyCommand, internal.E_UpdateRetentionPolicyCommand_Command,
&internal.UpdateRetentionPolicyCommand{
Database: proto.String(database),
Name: proto.String(name),
NewName: newName,
Duration: duration,
ReplicaN: replicaN,
},
)
}
// DropRetentionPolicy removes a policy from a database by name.
func (s *Store) DropRetentionPolicy(database, name string) error {
return s.exec(internal.Command_DropRetentionPolicyCommand, internal.E_DropRetentionPolicyCommand_Command,
&internal.DropRetentionPolicyCommand{
Database: proto.String(database),
Name: proto.String(name),
},
)
}
// CreateShardGroup creates a new shard group in a retention policy for a given time.
func (s *Store) CreateShardGroup(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
if err := s.exec(internal.Command_CreateShardGroupCommand, internal.E_CreateShardGroupCommand_Command,
&internal.CreateShardGroupCommand{
Database: proto.String(database),
Policy: proto.String(policy),
Timestamp: proto.Int64(timestamp.UnixNano()),
},
); err != nil {
return nil, err
}
return s.ShardGroupByTimestamp(database, policy, timestamp)
}
2015-05-27 19:41:58 +00:00
// CreateShardGroupIfNotExists creates a new shard group if one doesn't already exist.
func (s *Store) CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
// Try to find shard group locally first.
if sgi, err := s.ShardGroupByTimestamp(database, policy, timestamp); err != nil {
return nil, err
2015-06-04 22:50:33 +00:00
} else if sgi != nil && !sgi.Deleted() {
2015-05-27 19:41:58 +00:00
return sgi, nil
}
// Attempt to create database.
sgi, err := s.CreateShardGroup(database, policy, timestamp)
if err == ErrShardGroupExists {
2015-05-27 19:41:58 +00:00
return s.ShardGroupByTimestamp(database, policy, timestamp)
}
return sgi, err
2015-05-27 19:41:58 +00:00
}
2015-05-21 20:06:01 +00:00
// DeleteShardGroup removes an existing shard group from a policy by ID.
func (s *Store) DeleteShardGroup(database, policy string, id uint64) error {
return s.exec(internal.Command_DeleteShardGroupCommand, internal.E_DeleteShardGroupCommand_Command,
&internal.DeleteShardGroupCommand{
Database: proto.String(database),
Policy: proto.String(policy),
ShardGroupID: proto.Uint64(id),
},
)
}
2015-05-27 19:41:58 +00:00
// ShardGroups returns a list of all shard groups for a policy by timestamp.
func (s *Store) ShardGroups(database, policy string) (a []ShardGroupInfo, err error) {
err = s.read(func(data *Data) error {
a, err = data.ShardGroups(database, policy)
if err != nil {
return err
}
return nil
})
return
}
// ShardGroupsByTimeRange returns a slice of ShardGroups that may contain data for the given time range. ShardGroups
// are sorted by start time.
func (s *Store) ShardGroupsByTimeRange(database, policy string, tmin, tmax time.Time) (a []ShardGroupInfo, err error) {
err = s.read(func(data *Data) error {
a, err = data.ShardGroupsByTimeRange(database, policy, tmin, tmax)
if err != nil {
return err
} else if a == nil {
return errInvalidate
}
return nil
})
return
}
// VisitRetentionPolicies calls the given function with full retention policy details.
func (s *Store) VisitRetentionPolicies(f func(d DatabaseInfo, r RetentionPolicyInfo)) {
s.read(func(data *Data) error {
for _, di := range data.Databases {
for _, rp := range di.RetentionPolicies {
f(di, rp)
}
}
return nil
})
return
}
2015-05-21 20:06:01 +00:00
// ShardGroupByTimestamp returns a shard group for a policy by timestamp.
func (s *Store) ShardGroupByTimestamp(database, policy string, timestamp time.Time) (sgi *ShardGroupInfo, err error) {
err = s.read(func(data *Data) error {
sgi, err = data.ShardGroupByTimestamp(database, policy, timestamp)
if err != nil {
return err
} else if sgi == nil {
return errInvalidate
}
return nil
})
return
}
func (s *Store) ShardOwner(shardID uint64) (database, policy string, sgi *ShardGroupInfo) {
s.read(func(data *Data) error {
for _, dbi := range data.Databases {
for _, rpi := range dbi.RetentionPolicies {
for _, g := range rpi.ShardGroups {
if g.Deleted() {
continue
}
for _, sh := range g.Shards {
if sh.ID == shardID {
database = dbi.Name
policy = rpi.Name
sgi = &g
return nil
}
}
}
}
}
return errInvalidate
})
return
}
2015-05-21 20:06:01 +00:00
// CreateContinuousQuery creates a new continuous query on the store.
func (s *Store) CreateContinuousQuery(database, name, query string) error {
2015-05-21 20:06:01 +00:00
return s.exec(internal.Command_CreateContinuousQueryCommand, internal.E_CreateContinuousQueryCommand_Command,
&internal.CreateContinuousQueryCommand{
Database: proto.String(database),
Name: proto.String(name),
Query: proto.String(query),
2015-05-21 20:06:01 +00:00
},
)
}
// DropContinuousQuery removes a continuous query from the store.
func (s *Store) DropContinuousQuery(database, name string) error {
2015-05-21 20:06:01 +00:00
return s.exec(internal.Command_DropContinuousQueryCommand, internal.E_DropContinuousQueryCommand_Command,
&internal.DropContinuousQueryCommand{
Database: proto.String(database),
Name: proto.String(name),
2015-05-21 20:06:01 +00:00
},
)
}
// User returns a user by name.
func (s *Store) User(name string) (ui *UserInfo, err error) {
err = s.read(func(data *Data) error {
ui = data.User(name)
if ui == nil {
return errInvalidate
}
return nil
})
return
}
// Users returns a list of all users.
func (s *Store) Users() (a []UserInfo, err error) {
err = s.read(func(data *Data) error {
a = data.Users
return nil
})
return
}
2015-05-27 19:41:58 +00:00
// AdminUserExists returns true if an admin user exists on the system.
func (s *Store) AdminUserExists() (exists bool, err error) {
err = s.read(func(data *Data) error {
for i := range data.Users {
if data.Users[i].Admin {
exists = true
break
}
}
return nil
})
return
}
2015-06-29 17:37:36 +00:00
// ErrAuthenticate is returned when authentication fails.
var ErrAuthenticate = errors.New("authentication failed")
2015-05-27 19:41:58 +00:00
// Authenticate retrieves a user with a matching username and password.
func (s *Store) Authenticate(username, password string) (ui *UserInfo, err error) {
err = s.read(func(data *Data) error {
s.mu.Lock()
defer s.mu.Unlock()
2015-05-27 19:41:58 +00:00
// Find user.
u := data.User(username)
if u == nil {
return ErrUserNotFound
}
2015-06-26 19:30:00 +00:00
// Check the local auth cache first.
if au, ok := s.authCache[username]; ok {
// verify the password using the cached salt and hash
hashed, err := s.hashWithSalt(au.salt, password)
if err != nil {
return err
}
if bytes.Equal(hashed, au.hash) {
2015-06-29 17:37:36 +00:00
ui = u
return nil
}
return ErrAuthenticate
2015-06-26 19:30:00 +00:00
}
2015-05-27 19:41:58 +00:00
// Compare password with user hash.
if err := bcrypt.CompareHashAndPassword([]byte(u.Hash), []byte(password)); err != nil {
2015-06-29 17:37:36 +00:00
return ErrAuthenticate
2015-05-27 19:41:58 +00:00
}
// generate a salt and hash of the password for the cache
salt, hashed, err := s.saltedHash(password)
if err != nil {
return err
}
s.authCache[username] = authUser{salt: salt, hash: hashed}
2015-06-26 19:30:00 +00:00
2015-05-27 19:41:58 +00:00
ui = u
return nil
})
return
}
// hashWithSalt returns a salted hash of password using salt
func (s *Store) hashWithSalt(salt []byte, password string) ([]byte, error) {
hasher := sha256.New()
hasher.Write(append(salt, []byte(password)...))
return hasher.Sum(nil), nil
}
// saltedHash returns a salt and salted hash of password
func (s *Store) saltedHash(password string) (salt, hash []byte, err error) {
salt = make([]byte, SaltBytes)
_, err = io.ReadFull(crand.Reader, salt)
if err != nil {
return
}
hash, err = s.hashWithSalt(salt, password)
return
}
2015-05-21 20:06:01 +00:00
// CreateUser creates a new user in the store.
func (s *Store) CreateUser(name, password string, admin bool) (*UserInfo, error) {
// Hash the password before serializing it.
2015-06-29 17:37:36 +00:00
hash, err := s.hashPassword(password)
2015-05-21 20:06:01 +00:00
if err != nil {
return nil, err
}
2015-05-21 20:06:01 +00:00
// Serialize command and send it to the leader.
if err := s.exec(internal.Command_CreateUserCommand, internal.E_CreateUserCommand_Command,
&internal.CreateUserCommand{
Name: proto.String(name),
Hash: proto.String(string(hash)),
Admin: proto.Bool(admin),
},
); err != nil {
return nil, err
}
return s.User(name)
}
2015-05-21 20:06:01 +00:00
// DropUser removes a user from the metastore by name.
func (s *Store) DropUser(name string) error {
return s.exec(internal.Command_DropUserCommand, internal.E_DropUserCommand_Command,
&internal.DropUserCommand{
Name: proto.String(name),
},
)
}
2015-05-21 20:06:01 +00:00
// UpdateUser updates an existing user in the store.
func (s *Store) UpdateUser(name, password string) error {
// Hash the password before serializing it.
2015-06-29 17:37:36 +00:00
hash, err := s.hashPassword(password)
2015-05-21 20:06:01 +00:00
if err != nil {
return err
}
2015-05-21 20:06:01 +00:00
// Serialize command and send it to the leader.
return s.exec(internal.Command_UpdateUserCommand, internal.E_UpdateUserCommand_Command,
&internal.UpdateUserCommand{
Name: proto.String(name),
Hash: proto.String(string(hash)),
},
)
}
2015-05-27 19:41:58 +00:00
// SetPrivilege sets a privilege for a user on a database.
func (s *Store) SetPrivilege(username, database string, p influxql.Privilege) error {
return s.exec(internal.Command_SetPrivilegeCommand, internal.E_SetPrivilegeCommand_Command,
&internal.SetPrivilegeCommand{
Username: proto.String(username),
Database: proto.String(database),
Privilege: proto.Int32(int32(p)),
},
)
}
// SetAdminPrivilege sets the admin privilege for a user on a database.
func (s *Store) SetAdminPrivilege(username string, admin bool) error {
return s.exec(internal.Command_SetAdminPrivilegeCommand, internal.E_SetAdminPrivilegeCommand_Command,
&internal.SetAdminPrivilegeCommand{
Username: proto.String(username),
Admin: proto.Bool(admin),
},
)
}
// UserPrivileges returns a list of all databases.
func (s *Store) UserPrivileges(username string) (p map[string]influxql.Privilege, err error) {
err = s.read(func(data *Data) error {
p, err = data.UserPrivileges(username)
return err
})
return
}
// UserPrivilege returns the privilege for a database.
func (s *Store) UserPrivilege(username, database string) (p *influxql.Privilege, err error) {
err = s.read(func(data *Data) error {
p, err = data.UserPrivilege(username, database)
return err
})
return
}
// UserCount returns the number of users defined in the cluster.
func (s *Store) UserCount() (count int, err error) {
err = s.read(func(data *Data) error {
count = len(data.Users)
return nil
})
return
}
// PrecreateShardGroups creates shard groups whose endtime is before the 'to' time passed in, but
// is yet to expire before 'from'. This is to avoid the need for these shards to be created when data
// for the corresponding time range arrives. Shard creation involves Raft consensus, and precreation
// avoids taking the hit at write-time.
func (s *Store) PrecreateShardGroups(from, to time.Time) error {
s.read(func(data *Data) error {
for _, di := range data.Databases {
for _, rp := range di.RetentionPolicies {
if len(rp.ShardGroups) == 0 {
// No data was ever written to this group, or all groups have been deleted.
continue
}
g := rp.ShardGroups[len(rp.ShardGroups)-1] // Get the last group in time.
if !g.Deleted() && g.EndTime.Before(to) && g.EndTime.After(from) {
// Group is not deleted, will end before the future time, but is still yet to expire.
// This last check is important, so the system doesn't create shards groups wholly
// in the past.
// Create successive shard group.
nextShardGroupTime := g.EndTime.Add(1 * time.Nanosecond)
if newGroup, err := s.CreateShardGroupIfNotExists(di.Name, rp.Name, nextShardGroupTime); err != nil {
s.Logger.Printf("failed to create successive shard group for group %d: %s",
g.ID, err.Error())
} else {
s.Logger.Printf("new shard group %d successfully created for database %s, retention policy %s",
newGroup.ID, di.Name, rp.Name)
}
}
}
}
return nil
})
return nil
}
// SetData force overwrites the root data.
// This should only be used when restoring a snapshot.
func (s *Store) SetData(data *Data) error {
return s.exec(internal.Command_SetDataCommand, internal.E_SetDataCommand_Command,
&internal.SetDataCommand{
Data: data.marshal(),
},
)
}
2015-05-21 20:06:01 +00:00
// read executes a function with the current metadata.
// If an error is returned then the cache is invalidated and retried.
//
// The error returned by the retry is passed through to the original caller
// unless the error is errInvalidate. A nil error is passed through when
// errInvalidate is returned.
func (s *Store) read(fn func(*Data) error) error {
// First use the cached metadata.
s.mu.RLock()
data := s.data
s.mu.RUnlock()
2015-05-21 20:06:01 +00:00
// Execute fn against cached data.
// Return immediately if there was no error.
if err := fn(data); err == nil {
return nil
}
2015-05-21 20:06:01 +00:00
// If an error occurred then invalidate cache and retry.
if err := s.invalidate(); err != nil {
return err
}
// Re-read the metadata.
s.mu.RLock()
data = s.data
s.mu.RUnlock()
// Passthrough error unless it is a cache invalidation.
if err := fn(data); err != nil && err != errInvalidate {
return err
}
2015-05-21 20:06:01 +00:00
return nil
}
// errInvalidate is returned to read() when the cache should be invalidated
// but an error should not be passed through to the caller.
var errInvalidate = errors.New("invalidate cache")
func (s *Store) invalidate() error {
2015-07-17 16:50:33 +00:00
return s.raftState.invalidate()
2015-05-21 20:06:01 +00:00
}
func (s *Store) exec(typ internal.Command_Type, desc *proto.ExtensionDesc, value interface{}) error {
// Create command.
cmd := &internal.Command{Type: &typ}
2015-05-21 20:06:01 +00:00
err := proto.SetExtension(cmd, desc, value)
assert(err == nil, "proto.SetExtension: %s", err)
// Marshal to a byte slice.
b, err := proto.Marshal(cmd)
2015-05-21 20:06:01 +00:00
assert(err == nil, "proto.Marshal: %s", err)
// Apply the command if this is the leader.
// Otherwise remotely execute the command against the current leader.
if s.raftState.isLeader() {
return s.apply(b)
}
return s.remoteExec(b)
}
// apply applies a serialized command to the raft log.
func (s *Store) apply(b []byte) error {
2015-07-17 18:39:13 +00:00
return s.raftState.apply(b)
}
// remoteExec sends an encoded command to the remote leader.
func (s *Store) remoteExec(b []byte) error {
// Retrieve the current known leader.
2015-07-17 16:50:33 +00:00
leader := s.raftState.leader()
if leader == "" {
return errors.New("no leader")
}
// Create a connection to the leader.
conn, err := net.DialTimeout("tcp", leader, 10*time.Second)
if err != nil {
return err
}
defer conn.Close()
// Write a marker byte for exec messages.
_, err = conn.Write([]byte{MuxExecHeader})
if err != nil {
return err
}
// Write a marker message.
_, err = conn.Write([]byte(ExecMagic))
if err != nil {
return err
}
// Write command size & bytes.
if err := binary.Write(conn, binary.BigEndian, uint64(len(b))); err != nil {
return fmt.Errorf("write command size: %s", err)
} else if _, err := conn.Write(b); err != nil {
return fmt.Errorf("write command: %s", err)
}
// Read response bytes.
var sz uint64
if err := binary.Read(conn, binary.BigEndian, &sz); err != nil {
return fmt.Errorf("read response size: %s", err)
}
buf := make([]byte, sz)
if _, err := io.ReadFull(conn, buf); err != nil {
return fmt.Errorf("read response: %s", err)
}
// Unmarshal response.
var resp internal.Response
if err := proto.Unmarshal(buf, &resp); err != nil {
return fmt.Errorf("unmarshal response: %s", err)
} else if !resp.GetOK() {
2015-09-30 04:52:52 +00:00
return lookupError(fmt.Errorf(resp.GetError()))
}
// Wait for local FSM to sync to index.
if err := s.sync(resp.GetIndex(), 5*time.Second); err != nil {
return fmt.Errorf("sync: %s", err)
}
return nil
}
// sync polls the state machine until it reaches a given index.
func (s *Store) sync(index uint64, timeout time.Duration) error {
2015-07-17 16:50:33 +00:00
return s.raftState.sync(index, timeout)
}
func (s *Store) cachedData() *Data {
s.mu.RLock()
defer s.mu.RUnlock()
return s.data.Clone()
}
2015-06-29 17:37:36 +00:00
// BcryptCost is the cost associated with generating password with Bcrypt.
// This setting is lowered during testing to improve test suite performance.
var BcryptCost = 10
// HashPasswordFn represnets a password hashing function.
type HashPasswordFn func(password string) ([]byte, error)
// GetHashPasswordFn returns the current password hashing function.
func (s *Store) GetHashPasswordFn() HashPasswordFn {
s.mu.RLock()
defer s.mu.RUnlock()
2015-06-29 17:37:36 +00:00
return s.hashPassword
}
// SetHashPasswordFn sets the password hashing function.
func (s *Store) SetHashPasswordFn(fn HashPasswordFn) {
s.mu.Lock()
defer s.mu.Unlock()
s.hashPassword = fn
}
2015-05-21 20:06:01 +00:00
// storeFSM represents the finite state machine used by Store to interact with Raft.
type storeFSM Store
2015-05-21 20:06:01 +00:00
func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
var cmd internal.Command
if err := proto.Unmarshal(l.Data, &cmd); err != nil {
panic(fmt.Errorf("cannot marshal command: %x", l.Data))
}
2015-05-21 20:06:01 +00:00
// Lock the store.
s := (*Store)(fsm)
s.mu.Lock()
defer s.mu.Unlock()
err := func() interface{} {
switch cmd.GetType() {
case internal.Command_CreateNodeCommand:
return fsm.applyCreateNodeCommand(&cmd)
case internal.Command_DeleteNodeCommand:
return fsm.applyDeleteNodeCommand(&cmd)
case internal.Command_CreateDatabaseCommand:
return fsm.applyCreateDatabaseCommand(&cmd)
case internal.Command_DropDatabaseCommand:
return fsm.applyDropDatabaseCommand(&cmd)
case internal.Command_CreateRetentionPolicyCommand:
return fsm.applyCreateRetentionPolicyCommand(&cmd)
case internal.Command_DropRetentionPolicyCommand:
return fsm.applyDropRetentionPolicyCommand(&cmd)
case internal.Command_SetDefaultRetentionPolicyCommand:
return fsm.applySetDefaultRetentionPolicyCommand(&cmd)
case internal.Command_UpdateRetentionPolicyCommand:
return fsm.applyUpdateRetentionPolicyCommand(&cmd)
case internal.Command_CreateShardGroupCommand:
return fsm.applyCreateShardGroupCommand(&cmd)
case internal.Command_DeleteShardGroupCommand:
return fsm.applyDeleteShardGroupCommand(&cmd)
case internal.Command_CreateContinuousQueryCommand:
return fsm.applyCreateContinuousQueryCommand(&cmd)
case internal.Command_DropContinuousQueryCommand:
return fsm.applyDropContinuousQueryCommand(&cmd)
case internal.Command_CreateUserCommand:
return fsm.applyCreateUserCommand(&cmd)
case internal.Command_DropUserCommand:
return fsm.applyDropUserCommand(&cmd)
case internal.Command_UpdateUserCommand:
return fsm.applyUpdateUserCommand(&cmd)
case internal.Command_SetPrivilegeCommand:
return fsm.applySetPrivilegeCommand(&cmd)
case internal.Command_SetAdminPrivilegeCommand:
return fsm.applySetAdminPrivilegeCommand(&cmd)
case internal.Command_SetDataCommand:
return fsm.applySetDataCommand(&cmd)
2015-07-28 20:22:32 +00:00
case internal.Command_UpdateNodeCommand:
return fsm.applyUpdateNodeCommand(&cmd)
default:
panic(fmt.Errorf("cannot apply command: %x", l.Data))
}
}()
// Copy term and index to new metadata.
fsm.data.Term = l.Term
fsm.data.Index = l.Index
close(s.changed)
s.changed = make(chan struct{})
return err
}
2015-05-21 20:06:01 +00:00
func (fsm *storeFSM) applyCreateNodeCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateNodeCommand_Command)
v := ext.(*internal.CreateNodeCommand)
// Copy data and update.
2015-05-21 20:06:01 +00:00
other := fsm.data.Clone()
if err := other.CreateNode(v.GetHost()); err != nil {
return err
}
// If the cluster ID hasn't been set then use the command's random number.
if other.ClusterID == 0 {
other.ClusterID = uint64(v.GetRand())
}
fsm.data = other
return nil
}
2015-07-28 20:22:32 +00:00
func (fsm *storeFSM) applyUpdateNodeCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_UpdateNodeCommand_Command)
v := ext.(*internal.UpdateNodeCommand)
// Copy data and update.
other := fsm.data.Clone()
ni := other.Node(v.GetID())
if ni == nil {
return ErrNodeNotFound
}
ni.Host = v.GetHost()
fsm.data = other
return nil
}
2015-05-21 20:06:01 +00:00
func (fsm *storeFSM) applyDeleteNodeCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_DeleteNodeCommand_Command)
v := ext.(*internal.DeleteNodeCommand)
// Copy data and update.
other := fsm.data.Clone()
2015-10-03 02:49:11 +00:00
if err := other.DeleteNode(v.GetID(), v.GetForce()); err != nil {
2015-05-21 20:06:01 +00:00
return err
}
fsm.data = other
2015-10-03 02:49:11 +00:00
id := v.GetID()
fsm.Logger.Printf("node '%d' removed", id)
return nil
}
2015-05-21 20:06:01 +00:00
func (fsm *storeFSM) applyCreateDatabaseCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateDatabaseCommand_Command)
v := ext.(*internal.CreateDatabaseCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.CreateDatabase(v.GetName()); err != nil {
return err
}
fsm.data = other
return nil
}
2015-05-21 20:06:01 +00:00
func (fsm *storeFSM) applyDropDatabaseCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_DropDatabaseCommand_Command)
v := ext.(*internal.DropDatabaseCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.DropDatabase(v.GetName()); err != nil {
return err
}
fsm.data = other
return nil
}
2015-05-21 20:06:01 +00:00
func (fsm *storeFSM) applyCreateRetentionPolicyCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateRetentionPolicyCommand_Command)
v := ext.(*internal.CreateRetentionPolicyCommand)
pb := v.GetRetentionPolicy()
// Copy data and update.
other := fsm.data.Clone()
if err := other.CreateRetentionPolicy(v.GetDatabase(),
&RetentionPolicyInfo{
Name: pb.GetName(),
ReplicaN: int(pb.GetReplicaN()),
Duration: time.Duration(pb.GetDuration()),
ShardGroupDuration: time.Duration(pb.GetShardGroupDuration()),
}); err != nil {
return err
}
fsm.data = other
return nil
}
2015-05-21 20:06:01 +00:00
func (fsm *storeFSM) applyDropRetentionPolicyCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_DropRetentionPolicyCommand_Command)
v := ext.(*internal.DropRetentionPolicyCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.DropRetentionPolicy(v.GetDatabase(), v.GetName()); err != nil {
return err
}
fsm.data = other
return nil
}
2015-05-21 20:06:01 +00:00
func (fsm *storeFSM) applySetDefaultRetentionPolicyCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_SetDefaultRetentionPolicyCommand_Command)
v := ext.(*internal.SetDefaultRetentionPolicyCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.SetDefaultRetentionPolicy(v.GetDatabase(), v.GetName()); err != nil {
return err
}
fsm.data = other
return nil
}
2015-05-21 20:06:01 +00:00
func (fsm *storeFSM) applyUpdateRetentionPolicyCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_UpdateRetentionPolicyCommand_Command)
v := ext.(*internal.UpdateRetentionPolicyCommand)
// Create update object.
rpu := RetentionPolicyUpdate{Name: v.NewName}
if v.Duration != nil {
value := time.Duration(v.GetDuration())
rpu.Duration = &value
}
if v.ReplicaN != nil {
value := int(v.GetReplicaN())
rpu.ReplicaN = &value
}
// Copy data and update.
other := fsm.data.Clone()
if err := other.UpdateRetentionPolicy(v.GetDatabase(), v.GetName(), &rpu); err != nil {
return err
}
fsm.data = other
return nil
}
2015-05-21 20:06:01 +00:00
func (fsm *storeFSM) applyCreateShardGroupCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateShardGroupCommand_Command)
v := ext.(*internal.CreateShardGroupCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.CreateShardGroup(v.GetDatabase(), v.GetPolicy(), time.Unix(0, v.GetTimestamp())); err != nil {
return err
}
fsm.data = other
return nil
}
2015-05-21 20:06:01 +00:00
func (fsm *storeFSM) applyDeleteShardGroupCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_DeleteShardGroupCommand_Command)
v := ext.(*internal.DeleteShardGroupCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.DeleteShardGroup(v.GetDatabase(), v.GetPolicy(), v.GetShardGroupID()); err != nil {
return err
}
fsm.data = other
return nil
}
2015-05-21 20:06:01 +00:00
func (fsm *storeFSM) applyCreateContinuousQueryCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateContinuousQueryCommand_Command)
v := ext.(*internal.CreateContinuousQueryCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.CreateContinuousQuery(v.GetDatabase(), v.GetName(), v.GetQuery()); err != nil {
2015-05-21 20:06:01 +00:00
return err
}
fsm.data = other
return nil
}
2015-05-21 20:06:01 +00:00
func (fsm *storeFSM) applyDropContinuousQueryCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_DropContinuousQueryCommand_Command)
v := ext.(*internal.DropContinuousQueryCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.DropContinuousQuery(v.GetDatabase(), v.GetName()); err != nil {
2015-05-21 20:06:01 +00:00
return err
}
fsm.data = other
return nil
}
2015-05-21 20:06:01 +00:00
func (fsm *storeFSM) applyCreateUserCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateUserCommand_Command)
v := ext.(*internal.CreateUserCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.CreateUser(v.GetName(), v.GetHash(), v.GetAdmin()); err != nil {
return err
}
fsm.data = other
return nil
}
2015-05-21 20:06:01 +00:00
func (fsm *storeFSM) applyDropUserCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_DropUserCommand_Command)
v := ext.(*internal.DropUserCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.DropUser(v.GetName()); err != nil {
return err
}
fsm.data = other
2015-06-26 19:30:00 +00:00
delete(fsm.authCache, v.GetName())
2015-05-21 20:06:01 +00:00
return nil
}
2015-05-21 20:06:01 +00:00
func (fsm *storeFSM) applyUpdateUserCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_UpdateUserCommand_Command)
v := ext.(*internal.UpdateUserCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.UpdateUser(v.GetName(), v.GetHash()); err != nil {
return err
}
fsm.data = other
2015-06-26 19:30:00 +00:00
delete(fsm.authCache, v.GetName())
2015-05-21 20:06:01 +00:00
return nil
}
2015-05-21 20:06:01 +00:00
func (fsm *storeFSM) applySetPrivilegeCommand(cmd *internal.Command) interface{} {
2015-05-27 19:41:58 +00:00
ext, _ := proto.GetExtension(cmd, internal.E_SetPrivilegeCommand_Command)
v := ext.(*internal.SetPrivilegeCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.SetPrivilege(v.GetUsername(), v.GetDatabase(), influxql.Privilege(v.GetPrivilege())); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applySetAdminPrivilegeCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_SetAdminPrivilegeCommand_Command)
v := ext.(*internal.SetAdminPrivilegeCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.SetAdminPrivilege(v.GetUsername(), v.GetAdmin()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applySetDataCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_SetDataCommand_Command)
v := ext.(*internal.SetDataCommand)
// Overwrite data.
fsm.data = &Data{}
fsm.data.unmarshal(v.GetData())
return nil
}
2015-05-21 20:06:01 +00:00
func (fsm *storeFSM) Snapshot() (raft.FSMSnapshot, error) {
s := (*Store)(fsm)
s.mu.Lock()
defer s.mu.Unlock()
2015-05-21 20:06:01 +00:00
return &storeFSMSnapshot{Data: (*Store)(fsm).data}, nil
}
2015-05-21 20:06:01 +00:00
func (fsm *storeFSM) Restore(r io.ReadCloser) error {
// Read all bytes.
2015-06-25 23:52:47 +00:00
b, err := ioutil.ReadAll(r)
if err != nil {
return err
}
2015-06-25 23:52:47 +00:00
// Decode metadata.
data := &Data{}
if err := data.UnmarshalBinary(b); err != nil {
return err
}
// Set metadata on store.
2015-06-26 02:26:53 +00:00
// NOTE: No lock because Hashicorp Raft doesn't call Restore concurrently
// with any other function.
2015-06-25 23:52:47 +00:00
fsm.data = data
return nil
}
2015-05-21 20:06:01 +00:00
type storeFSMSnapshot struct {
Data *Data
}
2015-05-21 20:06:01 +00:00
func (s *storeFSMSnapshot) Persist(sink raft.SnapshotSink) error {
2015-06-26 02:26:53 +00:00
err := func() error {
// Encode data.
p, err := s.Data.MarshalBinary()
if err != nil {
return err
}
2015-06-25 23:52:47 +00:00
2015-06-26 02:26:53 +00:00
// Write data to sink.
if _, err := sink.Write(p); err != nil {
return err
}
// Close the sink.
if err := sink.Close(); err != nil {
return err
}
2015-06-25 23:52:47 +00:00
2015-06-26 02:26:53 +00:00
return nil
}()
if err != nil {
sink.Cancel()
2015-06-25 23:52:47 +00:00
return err
}
return nil
}
// Release is invoked when we are finished with the snapshot
2015-05-21 20:06:01 +00:00
func (s *storeFSMSnapshot) Release() {}
// raftLayer wraps the connection so it can be re-used for forwarding.
type raftLayer struct {
ln net.Listener
addr net.Addr
conn chan net.Conn
closed chan struct{}
}
// newRaftLayer returns a new instance of raftLayer.
func newRaftLayer(ln net.Listener, addr net.Addr) *raftLayer {
return &raftLayer{
ln: ln,
addr: addr,
conn: make(chan net.Conn),
closed: make(chan struct{}),
}
}
// Addr returns the local address for the layer.
func (l *raftLayer) Addr() net.Addr { return l.addr }
// Dial creates a new network connection.
func (l *raftLayer) Dial(addr string, timeout time.Duration) (net.Conn, error) {
conn, err := net.DialTimeout("tcp", addr, timeout)
if err != nil {
return nil, err
}
// Write a marker byte for raft messages.
_, err = conn.Write([]byte{MuxRaftHeader})
if err != nil {
conn.Close()
return nil, err
}
return conn, err
}
// Accept waits for the next connection.
func (l *raftLayer) Accept() (net.Conn, error) { return l.ln.Accept() }
// Close closes the layer.
func (l *raftLayer) Close() error { return l.ln.Close() }
// RetentionPolicyUpdate represents retention policy fields to be updated.
type RetentionPolicyUpdate struct {
Name *string
Duration *time.Duration
2015-05-21 20:06:01 +00:00
ReplicaN *int
}
func (rpu *RetentionPolicyUpdate) SetName(v string) { rpu.Name = &v }
func (rpu *RetentionPolicyUpdate) SetDuration(v time.Duration) { rpu.Duration = &v }
func (rpu *RetentionPolicyUpdate) SetReplicaN(v int) { rpu.ReplicaN = &v }
// assert will panic with a given formatted message if the given condition is false.
func assert(condition bool, msg string, v ...interface{}) {
if !condition {
panic(fmt.Sprintf("assert failed: "+msg, v...))
}
}