buildable again. lots of WIP

pull/5428/head
Cory LaNou 2015-12-19 11:10:33 -06:00 committed by David Norton
parent b0d0668138
commit d3ab0b5ae6
6 changed files with 216 additions and 81 deletions

View File

@ -7,7 +7,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta/internal"
"github.com/influxdb/influxdb/services/meta/internal"
)
//go:generate protoc --gogo_out=. internal/meta.proto

View File

@ -14,7 +14,7 @@ import (
"time"
"github.com/gogo/protobuf/proto"
"github.com/influxdb/influxdb/meta/internal"
"github.com/influxdb/influxdb/services/meta/internal"
"github.com/influxdb/influxdb/uuid"
)

View File

@ -15,51 +15,59 @@ import (
"github.com/hashicorp/raft-boltdb"
)
// Raft configuration.
const (
raftLogCacheSize = 512
raftSnapshotsRetained = 2
raftTransportMaxPool = 3
raftTransportTimeout = 10 * time.Second
)
// raftState is a consensus strategy that uses a local raft implementation for
// consensus operations.
type raftState struct {
wg sync.WaitGroup
config *Config
closing chan struct{}
raft *raft.Raft
transport *raft.NetworkTransport
peerStore raft.PeerStore
raftStore *raftboltdb.BoltStore
raftLayer *raftLayer
peers []string
ln *net.Listener
logger *log.Logger
wg sync.WaitGroup
config *Config
closing chan struct{}
raft *raft.Raft
transport *raft.NetworkTransport
peerStore raft.PeerStore
raftStore *raftboltdb.BoltStore
raftLayer *raftLayer
joinPeers []string
ln net.Listener
logger *log.Logger
remoteAddr net.Addr
path string
}
func newRaftState(c *Config, peers []string, ln *net.Listener, l *log.Logger) *raftState {
func newRaftState(c *Config, joinPeers []string) *raftState {
return &raftState{
config: c,
peers: peers,
logger: l,
ln: ln,
config: c,
joinPeers: joinPeers,
}
}
func (r *raftState) open() error {
func (r *raftState) open(s *store) error {
r.closing = make(chan struct{})
// Setup raft configuration.
config := raft.DefaultConfig()
config.LogOutput = ioutil.Discard
if s.clusterTracingEnabled {
config.Logger = s.logger
if r.config.ClusterTracing {
config.Logger = r.logger
}
config.HeartbeatTimeout = r.config.HeartbeatTimeout
config.ElectionTimeout = r.config.ElectionTimeout
config.LeaderLeaseTimeout = r.config.LeaderLeaseTimeout
config.CommitTimeout = r.config.CommitTimeout
config.HeartbeatTimeout = time.Duration(r.config.HeartbeatTimeout)
config.ElectionTimeout = time.Duration(r.config.ElectionTimeout)
config.LeaderLeaseTimeout = time.Duration(r.config.LeaderLeaseTimeout)
config.CommitTimeout = time.Duration(r.config.CommitTimeout)
// Since we never actually call `removePeer`, this is safe.
// If we decide to call removePeer in the future, we will have to re-evaluate how to handle this.
config.ShutdownOnRemove = false
// If no peers are set in the config, or there is only one and it is this node, start as a single server.
if len(s.peers) <= 1 {
if len(r.joinPeers) <= 1 {
config.EnableSingleNode = true
// Ensure we can always become the leader
config.DisableBootstrapAfterElect = false
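The explicit time.Duration(...) conversions above suggest the Config timeout fields are now a text-parsable duration wrapper rather than plain time.Duration values. A minimal sketch of such a wrapper, assuming a toml.Duration-style type (not necessarily the exact type used in this branch):

package meta

import "time"

// Duration is a time.Duration that can be decoded from config text such as
// "10s", then converted back with time.Duration(d) at the call site.
type Duration time.Duration

// UnmarshalText implements encoding.TextUnmarshaler so config parsers can
// decode human-readable duration strings.
func (d *Duration) UnmarshalText(text []byte) error {
    v, err := time.ParseDuration(string(text))
    if err != nil {
        return err
    }
    *d = Duration(v)
    return nil
}

// MarshalText implements encoding.TextMarshaler for round-tripping configs.
func (d Duration) MarshalText() ([]byte, error) {
    return []byte(time.Duration(d).String()), nil
}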
@ -72,7 +80,7 @@ func (r *raftState) open() error {
r.transport = raft.NewNetworkTransport(r.raftLayer, 3, 10*time.Second, config.LogOutput)
// Create peer storage.
r.peerStore = raft.NewJSONPeers(s.path, r.transport)
r.peerStore = raft.NewJSONPeers(r.path, r.transport)
peers, err := r.peerStore.Peers()
if err != nil {
@ -82,30 +90,30 @@ func (r *raftState) open() error {
// For single-node clusters, we can update the raft peers before we start the cluster if the hostname
// has changed.
if config.EnableSingleNode {
if err := r.peerStore.SetPeers([]string{s.RemoteAddr.String()}); err != nil {
if err := r.peerStore.SetPeers([]string{r.remoteAddr.String()}); err != nil {
return err
}
peers = []string{s.RemoteAddr.String()}
peers = []string{r.remoteAddr.String()}
}
// If we have multiple nodes in the cluster, make sure our address is in the raft peers or
// we won't be able to boot into the cluster because the other peers will reject our new hostname. This
// is difficult to resolve automatically because we need to have all the raft peers agree on the current members
// of the cluster before we can change them.
if len(peers) > 0 && !raft.PeerContained(peers, s.RemoteAddr.String()) {
s.logger.Printf("%s is not in the list of raft peers. Please update %v/peers.json on all raft nodes to have the same contents.", s.RemoteAddr.String(), s.Path())
return fmt.Errorf("peers out of sync: %v not in %v", s.RemoteAddr.String(), peers)
if len(peers) > 0 && !raft.PeerContained(peers, r.remoteAddr.String()) {
r.logger.Printf("%s is not in the list of raft peers. Please update %v/peers.json on all raft nodes to have the same contents.", r.remoteAddr.String(), r.path)
return fmt.Errorf("peers out of sync: %v not in %v", r.remoteAddr.String(), peers)
}
// Create the log store and stable store.
store, err := raftboltdb.NewBoltStore(filepath.Join(s.path, "raft.db"))
store, err := raftboltdb.NewBoltStore(filepath.Join(r.path, "raft.db"))
if err != nil {
return fmt.Errorf("new bolt store: %s", err)
}
r.raftStore = store
// Create the snapshot store.
snapshots, err := raft.NewFileSnapshotStore(s.path, raftSnapshotsRetained, os.Stderr)
snapshots, err := raft.NewFileSnapshotStore(r.path, raftSnapshotsRetained, os.Stderr)
if err != nil {
return fmt.Errorf("file snapshot store: %s", err)
}
@ -177,7 +185,7 @@ func (r *raftState) initialize() error {
}
// Force set peers.
if err := r.setPeers(r.peers); err != nil {
if err := r.setPeers(r.joinPeers); err != nil {
return fmt.Errorf("set raft peers: %s", err)
}
@ -196,9 +204,11 @@ func (r *raftState) apply(b []byte) error {
// No other non-nil objects should be returned.
resp := f.Response()
if err, ok := resp.(error); ok {
return lookupError(err)
return err
}
if resp != nil {
panic(fmt.Sprintf("unexpected response: %#v", resp))
}
assert(resp == nil, "unexpected response: %#v", resp)
return nil
}
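The apply() change above replaces an inline panic with an assert call; a minimal sketch of the helper this assumes (the real definition lives elsewhere in the package):

// assert panics with a formatted message when the condition is false.
func assert(condition bool, msg string, v ...interface{}) {
    if !condition {
        panic(fmt.Sprintf("assert failed: "+msg, v...))
    }
}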
@ -292,13 +302,6 @@ func (l *raftLayer) Dial(addr string, timeout time.Duration) (net.Conn, error) {
if err != nil {
return nil, err
}
// Write a marker byte for raft messages.
_, err = conn.Write([]byte{MuxRaftHeader})
if err != nil {
conn.Close()
return nil, err
}
return conn, err
}

View File

@ -44,6 +44,9 @@ func (s *Service) Open() error {
// Open the store
store := newStore(s.config)
s.store = store
if err := s.store.open(); err != nil {
return err
}
handler := newHandler(s.config)
handler.logger = s.Logger

View File

@ -3,11 +3,14 @@ package meta
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net"
"os"
"path/filepath"
"strconv"
"sync"
"time"
@ -15,15 +18,30 @@ import (
)
type store struct {
mu sync.RWMutex
closing chan struct{}
id uint64 // local node id
mu sync.RWMutex
closing chan struct{}
config *Config
data *Data
raftState *raftState
dataChanged chan struct{}
ready chan struct{}
addr string
raftln net.Listener
path string
opened bool
peers []string
logger *log.Logger
// Authentication cache.
authCache map[string]authUser
}
type authUser struct {
salt []byte
hash []byte
}
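The new authCache/authUser fields imply a password-check fast path: cache a salt plus a cheap hash after the first successful (expensive) verification, then compare against the cache on later requests. A self-contained sketch of that idea with hypothetical names; the actual authentication code is not part of this diff:

package metaauth

import (
    "bytes"
    "crypto/rand"
    "crypto/sha256"
)

// authUser mirrors the cached credential shape from the diff above.
type authUser struct {
    salt []byte
    hash []byte
}

// newAuthEntry builds a cache record for a password that has already passed
// the expensive verification (e.g. bcrypt) once.
func newAuthEntry(password string) (authUser, error) {
    salt := make([]byte, 16)
    if _, err := rand.Read(salt); err != nil {
        return authUser{}, err
    }
    h := sha256.New()
    h.Write(salt)
    h.Write([]byte(password))
    return authUser{salt: salt, hash: h.Sum(nil)}, nil
}

// checkAuthEntry reports whether password matches the cached salt+hash,
// keeping the hot path cheap.
func checkAuthEntry(au authUser, password string) bool {
    h := sha256.New()
    h.Write(au.salt)
    h.Write([]byte(password))
    return bytes.Equal(h.Sum(nil), au.hash)
}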
func newStore(c *Config) *store {
@ -31,11 +49,19 @@ func newStore(c *Config) *store {
data: &Data{
Index: 1,
},
ready: make(chan struct{}),
closing: make(chan struct{}),
dataChanged: make(chan struct{}),
addr: c.RaftAddr,
addr: c.RaftBindAddress,
path: c.Dir,
config: c,
}
if c.LoggingEnabled {
s.logger = log.New(os.Stderr, "[metastore] ", log.LstdFlags)
} else {
s.logger = log.New(ioutil.Discard, "", 0)
}
return &s
}
@ -46,9 +72,9 @@ func (s *store) open() error {
return err
}
s.raftln = ln
s.addr = ln.Addr()
s.addr = ln.Addr().String()
s.Logger.Printf("Using data dir: %v", s.path)
s.logger.Printf("Using data dir: %v", s.path)
if err := func() error {
s.mu.Lock()
@ -76,7 +102,7 @@ func (s *store) open() error {
}
// Initialize the store, if necessary.
if err := s.initialize(); err != nil {
if err := s.raftState.initialize(); err != nil {
return fmt.Errorf("initialize raft: %s", err)
}
@ -90,13 +116,6 @@ func (s *store) open() error {
return err
}
// Begin serving listener.
s.wg.Add(1)
go s.serveExecListener()
s.wg.Add(1)
go s.serveRPCListener()
// Join an existing cluster if needed
if err := s.joinCluster(); err != nil {
return fmt.Errorf("join: %v", err)
@ -104,25 +123,23 @@ func (s *store) open() error {
// If the ID doesn't exist then create a new node.
if s.id == 0 {
go s.init()
go s.raftState.initialize()
} else {
go s.syncNodeInfo()
// TODO: enable node info sync
// All this does is update the raft peers with this node's new hostname, if it changed,
// based on this node's ID.
// go s.syncNodeInfo()
close(s.ready)
}
// Wait for a leader to be elected so we know the raft log is loaded
// and up to date
<-s.ready
if err := s.WaitForLeader(0); err != nil {
if err := s.waitForLeader(0); err != nil {
return err
}
if s.raftPromotionEnabled {
s.wg.Add(1)
s.Logger.Printf("spun up monitoring for %d", s.NodeID())
go s.monitorPeerHealth()
}
return nil
}
@ -167,6 +184,86 @@ func readPeersJSON(path string) ([]string, error) {
return peers, nil
}
// IDPath returns the path to the local node ID file.
func (s *store) IDPath() string { return filepath.Join(s.path, "id") }
// readID reads the local node ID from the ID file.
func (s *store) readID() error {
b, err := ioutil.ReadFile(s.IDPath())
if os.IsNotExist(err) {
s.id = 0
return nil
} else if err != nil {
return fmt.Errorf("read file: %s", err)
}
id, err := strconv.ParseUint(string(b), 10, 64)
if err != nil {
return fmt.Errorf("parse id: %s", err)
}
s.id = id
return nil
}
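readID above has a write-side counterpart, s.writeNodeID, referenced by the commented-out join code further down but not shown in this diff. A hedged sketch of what it could look like, writing the same format readID parses and using only imports already present in this file:

// writeNodeID persists the local node ID so readID can recover it on restart
// (sketch only; the real helper is referenced but not included in this diff).
func (s *store) writeNodeID(id uint64) error {
    if err := os.MkdirAll(s.path, 0755); err != nil {
        return err
    }
    return ioutil.WriteFile(s.IDPath(), []byte(strconv.FormatUint(id, 10)), 0666)
}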
func (s *store) openRaft() error {
rs := newRaftState(s.config, s.peers)
rs.ln = s.raftln
rs.logger = s.logger
rs.path = s.path
rs.remoteAddr = s.raftln.Addr()
// Keep the raft state on the store and surface any open error.
if err := rs.open(s); err != nil {
return err
}
s.raftState = rs
return nil
}
func (s *store) joinCluster() error {
// No join options, so nothing to do
if len(s.peers) == 0 {
return nil
}
// We already have a node ID, so we're already part of a cluster;
// don't join again, so we can use our existing state.
if s.id != 0 {
s.logger.Printf("Skipping cluster join: already member of cluster: nodeId=%v raftEnabled=%v peers=%v",
s.id, raft.PeerContained(s.peers, s.addr), s.peers)
return nil
}
s.logger.Printf("Joining cluster at: %v", s.peers)
for {
for _, join := range s.peers {
// delete me:
_ = join
// TODO rework this to use the HTTP endpoint for joining
//res, err := s.rpc.join(s.RemoteAddr.String(), join)
//if err != nil {
//s.logger.Printf("Join node %v failed: %v: retrying...", join, err)
//continue
//}
//s.logger.Printf("Joined remote node %v", join)
//s.logger.Printf("nodeId=%v raftEnabled=%v peers=%v", res.NodeID, res.RaftEnabled, res.RaftNodes)
//s.peers = res.RaftNodes
//s.id = res.NodeID
//if err := s.writeNodeID(res.NodeID); err != nil {
//s.logger.Printf("Write node id failed: %v", err)
//break
//}
return nil
}
time.Sleep(time.Second)
}
}
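For the TODO above about joining over the HTTP endpoint, a hypothetical sketch of what such a request could look like; the endpoint path, field names, and joinResult type are assumptions rather than the actual API, and net/http would need to be added to this file's imports:

// joinResult is a hypothetical response shape for an HTTP join endpoint.
type joinResult struct {
    NodeID    uint64   `json:"nodeID"`
    RaftNodes []string `json:"raftNodes"`
}

// httpJoin posts this node's raft address to a peer's (assumed) /join
// endpoint and decodes the assigned node ID plus the current raft peers.
func httpJoin(peer, raftAddr string) (*joinResult, error) {
    body, err := json.Marshal(map[string]string{"addr": raftAddr})
    if err != nil {
        return nil, err
    }
    resp, err := http.Post("http://"+peer+"/join", "application/json", bytes.NewReader(body))
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("join failed: %s", resp.Status)
    }
    var res joinResult
    if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
        return nil, err
    }
    return &res, nil
}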
func (s *store) close() error {
s.mu.Lock()
defer s.mu.Unlock()
@ -196,14 +293,40 @@ func (s *store) afterIndex(index uint64) <-chan struct{} {
return s.dataChanged
}
// waitForLeader sleeps until a leader is found or a timeout occurs.
// timeout == 0 means to wait forever.
func (s *store) waitForLeader(timeout time.Duration) error {
// Begin timeout timer.
timer := time.NewTimer(timeout)
defer timer.Stop()
// Continually check for leader until timeout.
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-s.closing:
return errors.New("closing")
case <-timer.C:
if timeout != 0 {
return errors.New("timeout")
}
case <-ticker.C:
if s.leader() != "" {
return nil
}
}
}
}
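A small usage sketch of waitForLeader at a hypothetical call site, illustrating the timeout semantics implemented above (0 waits indefinitely; a non-zero duration returns a timeout error):

// Wait up to 30 seconds for a raft leader before serving requests.
if err := s.waitForLeader(30 * time.Second); err != nil {
    return fmt.Errorf("no raft leader elected: %s", err)
}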
// isLeader returns true if the store is currently the leader.
func (s *store) isLeader() bool {
s.mu.RLock()
defer s.mu.RUnlock()
if s.raft == nil {
if s.raftState == nil {
return false
}
return s.raft.State() == raft.Leader
return s.raftState.raft.State() == raft.Leader
}
// leader returns what the store thinks is the current leader. An empty
@ -211,7 +334,10 @@ func (s *store) isLeader() bool {
func (s *store) leader() string {
s.mu.RLock()
defer s.mu.RUnlock()
return s.raft.Leader()
if s.raftState == nil {
return ""
}
return s.raftState.raft.Leader()
}
// index returns the current store index.
@ -226,7 +352,7 @@ func (s *store) apply(b []byte) error {
s.mu.RLock()
defer s.mu.RUnlock()
// Apply to raft log.
f := s.raft.Apply(b, 0)
f := s.raftState.raft.Apply(b, 0)
if err := f.Error(); err != nil {
return err
}

View File

@ -9,11 +9,11 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/hashicorp/raft"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta/internal"
"github.com/influxdb/influxdb/services/meta/internal"
)
// storeFSM represents the finite state machine used by Store to interact with Raft.
type storeFSM Store
type storeFSM store
func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
var cmd internal.Command
@ -22,7 +22,7 @@ func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
}
// Lock the store.
s := (*Store)(fsm)
s := (*store)(fsm)
s.mu.Lock()
defer s.mu.Unlock()
@ -80,7 +80,10 @@ func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
// Copy term and index to new metadata.
fsm.data.Term = l.Term
fsm.data.Index = l.Index
s.notifyChanged()
// signal that the data changed
close(s.dataChanged)
s.dataChanged = make(chan struct{})
return err
}
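The close-and-replace pattern above pairs with afterIndex, shown earlier in store.go returning s.dataChanged: every applied command closes the current channel, so any goroutine holding it wakes up. A hedged sketch of a waiter built on that contract (hypothetical helper, assumed semantics):

// waitForChange blocks until the FSM applies another entry (the channel from
// afterIndex is closed on every change) or the timeout elapses.
func waitForChange(s *store, index uint64, timeout time.Duration) error {
    select {
    case <-s.afterIndex(index):
        return nil
    case <-time.After(timeout):
        return errors.New("timeout waiting for metadata change")
    }
}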
@ -95,17 +98,17 @@ func (fsm *storeFSM) applyRemovePeerCommand(cmd *internal.Command) interface{} {
// Only do this if you are the leader
if fsm.raftState.isLeader() {
// Remove that node from the raft peers
fsm.Logger.Printf("removing peer for node id %d, %s", id, addr)
fsm.logger.Printf("removing peer for node id %d, %s", id, addr)
if err := fsm.raftState.removePeer(addr); err != nil {
fsm.Logger.Printf("error removing peer: %s", err)
fsm.logger.Printf("error removing peer: %s", err)
}
}
// If this is the node being shut down, close raft
if fsm.id == id {
fsm.Logger.Printf("shutting down raft for %s", addr)
fsm.logger.Printf("shutting down raft for %s", addr)
if err := fsm.raftState.close(); err != nil {
fsm.Logger.Printf("failed to shut down raft: %s", err)
fsm.logger.Printf("failed to shut down raft: %s", err)
}
}
@ -160,7 +163,7 @@ func (fsm *storeFSM) applyDeleteNodeCommand(cmd *internal.Command) interface{} {
fsm.data = other
id := v.GetID()
fsm.Logger.Printf("node '%d' removed", id)
fsm.logger.Printf("node '%d' removed", id)
return nil
}
@ -431,11 +434,11 @@ func (fsm *storeFSM) applySetDataCommand(cmd *internal.Command) interface{} {
}
func (fsm *storeFSM) Snapshot() (raft.FSMSnapshot, error) {
s := (*Store)(fsm)
s := (*store)(fsm)
s.mu.Lock()
defer s.mu.Unlock()
return &storeFSMSnapshot{Data: (*Store)(fsm).data}, nil
return &storeFSMSnapshot{Data: (*store)(fsm).data}, nil
}
func (fsm *storeFSM) Restore(r io.ReadCloser) error {
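Snapshot above hands back a storeFSMSnapshot wrapping the current Data; that type sits outside this diff, but a hedged sketch of a raft.FSMSnapshot implementation for it could look like the following (assumes Data has a MarshalBinary method, as the meta package's Data type does):

// storeFSMSnapshot captures the metadata state at snapshot time.
type storeFSMSnapshot struct {
    Data *Data
}

// Persist serializes the captured Data into the snapshot sink.
func (s *storeFSMSnapshot) Persist(sink raft.SnapshotSink) error {
    b, err := s.Data.MarshalBinary()
    if err != nil {
        sink.Cancel()
        return err
    }
    if _, err := sink.Write(b); err != nil {
        sink.Cancel()
        return err
    }
    return sink.Close()
}

// Release is a no-op; nothing is held open after Persist completes.
func (s *storeFSMSnapshot) Release() {}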