Move remaining raft impl details to local raft state

pull/3372/head
Jason Wilder 2015-07-17 13:27:09 -06:00
parent 790733daad
commit 85db9c46e8
2 changed files with 43 additions and 39 deletions

View File

@ -34,7 +34,12 @@ type raftState interface {
// localRaft is a consensus strategy that uses a local raft implementation fo
// consensus operations.
type localRaft struct {
store *Store
store *Store
raft *raft.Raft
transport *raft.NetworkTransport
peerStore raft.PeerStore
raftStore *raftboltdb.BoltStore
raftLayer *raftLayer
}
func (r *localRaft) invalidate() error {
@ -61,20 +66,20 @@ func (r *localRaft) openRaft() error {
}
// Build raft layer to multiplex listener.
s.raftLayer = newRaftLayer(s.RaftListener, s.Addr)
r.raftLayer = newRaftLayer(s.RaftListener, s.Addr)
// Create a transport layer
s.transport = raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second, os.Stderr)
r.transport = raft.NewNetworkTransport(r.raftLayer, 3, 10*time.Second, os.Stderr)
// Create peer storage.
s.peerStore = raft.NewJSONPeers(s.path, s.transport)
r.peerStore = raft.NewJSONPeers(s.path, r.transport)
// Create the log store and stable store.
store, err := raftboltdb.NewBoltStore(filepath.Join(s.path, "raft.db"))
if err != nil {
return fmt.Errorf("new bolt store: %s", err)
}
s.store = store
r.raftStore = store
// Create the snapshot store.
snapshots, err := raft.NewFileSnapshotStore(s.path, raftSnapshotsRetained, os.Stderr)
@ -83,29 +88,28 @@ func (r *localRaft) openRaft() error {
}
// Create raft log.
ra, err := raft.NewRaft(config, (*storeFSM)(s), store, store, snapshots, s.peerStore, s.transport)
ra, err := raft.NewRaft(config, (*storeFSM)(s), store, store, snapshots, r.peerStore, r.transport)
if err != nil {
return fmt.Errorf("new raft: %s", err)
}
s.raft = ra
r.raft = ra
return nil
}
func (r *localRaft) close() error {
s := r.store
// Shutdown raft.
if s.raft != nil {
s.raft.Shutdown()
s.raft = nil
if r.raft != nil {
r.raft.Shutdown()
r.raft = nil
}
if s.transport != nil {
s.transport.Close()
s.transport = nil
if r.transport != nil {
r.transport.Close()
r.transport = nil
}
if s.store != nil {
s.store.Close()
s.store = nil
if r.raftStore != nil {
r.raftStore.Close()
r.raftStore = nil
}
return nil
@ -114,14 +118,14 @@ func (r *localRaft) close() error {
func (r *localRaft) initialize() error {
s := r.store
// If we have committed entries then the store is already in the cluster.
if index, err := s.store.LastIndex(); err != nil {
if index, err := r.raftStore.LastIndex(); err != nil {
return fmt.Errorf("last index: %s", err)
} else if index > 0 {
return nil
}
// Force set peers.
if err := s.SetPeers(s.peers); err != nil {
if err := r.setPeers(s.peers); err != nil {
return fmt.Errorf("set raft peers: %s", err)
}
@ -130,9 +134,8 @@ func (r *localRaft) initialize() error {
// apply applies a serialized command to the raft log.
func (r *localRaft) apply(b []byte) error {
s := r.store
// Apply to raft log.
f := s.raft.Apply(b, 0)
f := r.raft.Apply(b, 0)
if err := f.Error(); err != nil {
return err
}
@ -149,7 +152,7 @@ func (r *localRaft) apply(b []byte) error {
}
func (r *localRaft) lastIndex() uint64 {
return r.store.raft.LastIndex()
return r.raft.LastIndex()
}
func (r *localRaft) sync(index uint64, timeout time.Duration) error {
@ -180,14 +183,13 @@ func (r *localRaft) sync(index uint64, timeout time.Duration) error {
}
func (r *localRaft) snapshot() error {
future := r.store.raft.Snapshot()
future := r.raft.Snapshot()
return future.Error()
}
// addPeer adds addr to the list of peers in the cluster.
func (r *localRaft) addPeer(addr string) error {
s := r.store
peers, err := s.peerStore.Peers()
peers, err := r.peerStore.Peers()
if err != nil {
return err
}
@ -196,7 +198,7 @@ func (r *localRaft) addPeer(addr string) error {
return nil
}
if fut := s.raft.AddPeer(addr); fut.Error() != nil {
if fut := r.raft.AddPeer(addr); fut.Error() != nil {
return fut.Error()
}
return nil
@ -212,21 +214,24 @@ func (r *localRaft) setPeers(addrs []string) error {
}
a[i] = addr.String()
}
return r.store.raft.SetPeers(a).Error()
return r.raft.SetPeers(a).Error()
}
func (r *localRaft) leader() string {
if r.store.raft == nil {
if r.raft == nil {
return ""
}
return r.store.raft.Leader()
return r.raft.Leader()
}
func (r *localRaft) isLeader() bool {
r.store.mu.RLock()
defer r.store.mu.RUnlock()
return r.store.raft.State() == raft.Leader
if r.raft == nil {
return false
}
return r.raft.State() == raft.Leader
}
// remoteRaft is a consensus strategy that uses a remote raft cluster for

View File

@ -21,7 +21,6 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta/internal"
"golang.org/x/crypto/bcrypt"
@ -74,12 +73,8 @@ type Store struct {
rpc *RPC
remoteAddr net.Addr
raft *raft.Raft
raftLayer *raftLayer
peerStore raft.PeerStore
transport *raft.NetworkTransport
store *raftboltdb.BoltStore
raftState raftState
raftState raftState
ready chan struct{}
err chan error
@ -153,6 +148,7 @@ func NewStore(c Config) *Store {
Logger: log.New(os.Stderr, "[metastore] ", log.LstdFlags),
}
s.raftState = &localRaft{store: s}
s.rpc = &RPC{
store: s,
Logger: s.Logger,
@ -169,8 +165,6 @@ func (s *Store) IDPath() string { return filepath.Join(s.path, "id") }
// Open opens and initializes the raft store.
func (s *Store) Open() error {
s.raftState = &localRaft{s}
// If we have a join addr, attempt to join
if s.join != "" {
res, err := s.rpc.join(s.Addr.String(), s.join)
@ -428,6 +422,11 @@ func (s *Store) Err() <-chan error { return s.err }
// IsLeader returns true if the store is currently the leader.
func (s *Store) IsLeader() bool {
s.mu.RLock()
defer s.mu.RUnlock()
if s.raftState == nil {
return false
}
return s.raftState.isLeader()
}