Wrap raft.Log mutex.

pull/2257/head
Ben Johnson 2015-04-12 12:57:34 -06:00
parent 64d9c5784f
commit 908fe7db4a
1 changed file with 120 additions and 107 deletions
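This is a mechanical refactor: every direct `l.mu.Lock()` / `l.mu.Unlock()` pair in raft.Log is routed through new `lock()` / `unlock()` wrapper methods, and a `printCaller()` helper (hence the new `runtime` import) is defined beside them. A minimal sketch of the pattern, using a stripped-down Log type; the commented-out `printCaller` calls show how the helper could be switched on while chasing a lock bug, since the hunks below define it but never call it:

```go
package raft

import (
	"log"
	"path/filepath"
	"runtime"
	"sync"
)

// Log is a stripped-down stand-in for raft.Log (assumption: the real
// type carries many more fields).
type Log struct {
	mu     sync.Mutex
	path   string
	Logger *log.Logger
}

// lock/unlock are the single choke point for all mutex traffic.
func (l *Log) lock()   { /* l.printCaller("lock") */ l.mu.Lock() }
func (l *Log) unlock() { /* l.printCaller("unlock") */ l.mu.Unlock() }

// printCaller logs the file:line two frames up the stack -- i.e. the
// code that invoked lock() or unlock().
func (l *Log) printCaller(label string) {
	_, file, line, _ := runtime.Caller(2)
	l.Logger.Printf("%s: %s:%d", label, filepath.Base(file), line)
}

// Path shows the accessor shape repeated throughout the diff:
// take the lock, read a field, return.
func (l *Log) Path() string {
	l.lock()
	defer l.unlock()
	return l.path
}
```

The payoff is that lock tracing can be toggled in one place instead of editing dozens of call sites.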

@@ -14,6 +14,7 @@ import (
"net/url"
"os"
"path/filepath"
"runtime"
"runtime/debug"
"sort"
"strconv"
@@ -208,33 +209,41 @@ func NewLog() *Log {
return l
}
func (l *Log) lock() { l.mu.Lock() }
func (l *Log) unlock() { l.mu.Unlock() }
func (l *Log) printCaller(label string) {
_, file, line, _ := runtime.Caller(2)
l.Logger.Printf("%s: %s:%d", label, filepath.Base(file), line)
}
// Path returns the data path of the Raft log.
// Returns an empty string if the log is closed.
func (l *Log) Path() string {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
return l.path
}
// URL returns the URL for the log.
func (l *Log) URL() url.URL {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
return l.url
}
// SetURL sets the URL for the log. This must be set before opening.
func (l *Log) SetURL(u url.URL) {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
assert(!l.opened(), "url cannot be set while log is open")
l.url = u
}
// URLs returns a list of all URLs in the cluster.
func (l *Log) URLs() []url.URL {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
if l.config == nil {
return nil
@@ -254,8 +263,8 @@ func (l *Log) configPath() string { return filepath.Join(l.path, "config") }
// Opened returns true if the log is currently open.
func (l *Log) Opened() bool {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
return l.opened()
}
@@ -263,43 +272,43 @@ func (l *Log) opened() bool { return l.path != "" }
// ID returns the log's identifier.
func (l *Log) ID() uint64 {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
return l.id
}
// State returns the current state.
func (l *Log) State() State {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
return l.state
}
// LastLogIndexTerm returns the last index & term from the log.
func (l *Log) LastLogIndexTerm() (index, term uint64) {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
return l.lastLogIndex, l.lastLogTerm
}
// CommitIndex returns the highest committed index.
func (l *Log) CommitIndex() uint64 {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
return l.commitIndex
}
// Term returns the current term.
func (l *Log) Term() uint64 {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
return l.term
}
// Config returns the log's current configuration.
func (l *Log) Config() *Config {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
if l.config != nil {
return l.config.Clone()
}
@@ -312,8 +321,8 @@ func (l *Log) Open(path string) error {
var closing chan struct{}
var config *Config
if err := func() error {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
// Validate initial log state.
if l.opened() {
@@ -395,8 +404,8 @@ func (l *Log) Open(path string) error {
// Close closes the log.
func (l *Log) Close() error {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
return l.close()
}
@@ -411,9 +420,9 @@ func (l *Log) close() error {
if l.closing != nil {
close(l.closing)
l.closing = nil
l.mu.Unlock()
l.unlock()
l.wg.Wait()
l.mu.Lock()
l.lock()
}
// Close the writers.
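The hunk above is the one non-mechanical spot in the file: close() deliberately drops the mutex around wg.Wait(). A sketch of why, on the assumption (implied by the surrounding code) that the goroutines tracked by l.wg acquire the same mutex before they can exit; waiting while still holding it would deadlock:

```go
package raft

import "sync"

// closer carries just the fields needed to show the pattern
// (assumed, not the real raft.Log).
type closer struct {
	mu      sync.Mutex
	closing chan struct{}
	wg      sync.WaitGroup
}

func (c *closer) lock()   { c.mu.Lock() }
func (c *closer) unlock() { c.mu.Unlock() }

func (c *closer) close() {
	c.lock()
	defer c.unlock()
	if c.closing != nil {
		close(c.closing) // signal background goroutines to exit
		c.closing = nil
		// The goroutines may need c.mu on their way out, so release
		// it while waiting, then reacquire; the deferred unlock()
		// above pairs with this final lock().
		c.unlock()
		c.wg.Wait()
		c.lock()
	}
}
```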
@@ -435,8 +444,8 @@ func (l *Log) close() error {
}
func (l *Log) setReaderWithLock(r io.ReadCloser) error {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
return l.setReader(r)
}
@@ -597,8 +606,8 @@ func (l *Log) writeConfig(config *Config) error {
func (l *Log) Initialize() error {
var config *Config
if err := func() error {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
// Return error if log is not open or is already a member of a cluster.
if !l.opened() {
@@ -678,16 +687,16 @@ func (l *Log) tracef(msg string, v ...interface{}) {
// IsLeader returns true if the log is the current leader.
func (l *Log) IsLeader() bool {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
return l.id != 0 && l.id == l.leaderID
}
// Leader returns the id and URL associated with the current leader.
// Returns zero if there is no current leader.
func (l *Log) Leader() (id uint64, u url.URL) {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
return l.leader()
}
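Leader() above also illustrates the file's naming convention: the exported method takes the lock and delegates to a lowercase twin (leader()) that assumes the lock is already held, so internal callers can compose several helpers under one acquisition. The same pairing appears as Opened/opened and Close/close. A sketch:

```go
package raft

import (
	"net/url"
	"sync"
)

// peers is an assumed stand-in; the real lookup goes through l.config.
type peers struct {
	mu       sync.Mutex
	leaderID uint64
	urls     map[uint64]url.URL
}

func (p *peers) lock()   { p.mu.Lock() }
func (p *peers) unlock() { p.mu.Unlock() }

// Leader is safe for concurrent use.
func (p *peers) Leader() (uint64, url.URL) {
	p.lock()
	defer p.unlock()
	return p.leader()
}

// leader must be called with p.mu held. A missing leader yields the
// zero url.URL, matching the "returns zero" contract above.
func (p *peers) leader() (uint64, url.URL) {
	return p.leaderID, p.urls[p.leaderID]
}
```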
@@ -709,8 +718,8 @@ func (l *Log) leader() (id uint64, u url.URL) {
// ClusterID returns the identifier for the cluster.
// Returns zero if the cluster has not been initialized yet.
func (l *Log) ClusterID() uint64 {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
if l.config == nil {
return 0
}
@@ -723,8 +732,8 @@ func (l *Log) Join(u url.URL) error {
// Validate under lock.
var nodeURL url.URL
if err := func() error {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
if !l.opened() {
return ErrClosed
@@ -753,8 +762,8 @@ func (l *Log) Join(u url.URL) error {
// Lock once the join request is returned.
if err := func() error {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
// Write identifier.
if err := l.writeID(id); err != nil {
@@ -785,8 +794,8 @@ func (l *Log) Join(u url.URL) error {
// Leave removes the log from cluster membership and removes the log data.
func (l *Log) Leave() error {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
// TODO(benbjohnson): Check if open.
// TODO(benbjohnson): Apply remove peer command.
@@ -814,8 +823,8 @@ func (l *Log) stateLoop(closing <-chan struct{}, state State, stateChanged chan
// Transition to new state.
var transitioning chan struct{}
func() {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
l.Logger.Printf("log state change: %s => %s (term=%d)", l.state, state, l.term)
l.state = state
@@ -872,13 +881,13 @@ func (l *Log) followerLoop(closing <-chan struct{}) State {
l.tracef("followerLoop: heartbeat: term=%d, idx=%d", hb.term, hb.commitIndex)
// Update term, commit index & leader.
l.mu.Lock()
l.lock()
l.mustSetTermIfHigher(hb.term)
if hb.commitIndex > l.commitIndex {
l.commitIndex = hb.commitIndex
}
l.leaderID = hb.leaderID
l.mu.Unlock()
l.unlock()
}
}
}
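mustSetTermIfHigher shows up at every heartbeat and vote site in this diff. Judging only from its name and call sites, its contract is a monotonic term bump performed with the mutex held; a sketch of that contract, deliberately omitting whatever persistence and l.terms signaling the real method performs:

```go
package raft

import "sync"

// termLog is an assumed fragment of raft.Log.
type termLog struct {
	mu   sync.Mutex
	term uint64
}

// mustSetTermIfHigher advances the term but never rewinds it.
// Caller must hold l.mu (i.e. call between lock() and unlock()).
func (l *termLog) mustSetTermIfHigher(term uint64) {
	if term > l.term {
		l.term = term
	}
}
```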
@@ -896,10 +905,10 @@ func (l *Log) readFromLeader(wg *sync.WaitGroup) {
}
// Retrieve the term, last log index, & leader URL.
l.mu.Lock()
l.lock()
id, lastLogIndex, term := l.id, l.lastLogIndex, l.term
_, u := l.leader()
l.mu.Unlock()
l.unlock()
// If no leader exists then wait momentarily and retry.
if u.Host == "" {
@@ -960,7 +969,7 @@ func (l *Log) candidateLoop(closing <-chan struct{}) State {
// TODO: prevote
// Increment term and request votes.
l.mu.Lock()
l.lock()
l.mustSetTermIfHigher(l.term + 1)
l.votedFor = l.id
term := l.term
@@ -969,7 +978,7 @@ func (l *Log) candidateLoop(closing <-chan struct{}) State {
case <-l.terms:
default:
}
l.mu.Unlock()
l.unlock()
// Ensure all candidate goroutines complete before transitioning to another state.
var wg sync.WaitGroup
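The `select { case <-l.terms: default: }` block above is a non-blocking drain: before campaigning, the candidate clears any stale term-change notification so an old signal cannot immediately bounce it back to follower. The idiom in isolation (sketch only; l.terms in the real code is a notification channel):

```go
package main

import "fmt"

// drainOne consumes at most one pending signal without blocking.
func drainOne(ch chan struct{}) {
	select {
	case <-ch: // stale notification: discard it
	default: // nothing pending: carry on
	}
}

func main() {
	terms := make(chan struct{}, 1)
	terms <- struct{}{}     // signal left over from an earlier term bump
	drainOne(terms)         // cleared before the election starts
	fmt.Println(len(terms)) // 0
}
```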
@@ -986,12 +995,12 @@ func (l *Log) candidateLoop(closing <-chan struct{}) State {
case <-closing:
return Stopped
case hb := <-l.heartbeats:
l.mu.Lock()
l.lock()
l.mustSetTermIfHigher(hb.term)
if hb.term >= l.term {
l.leaderID = hb.leaderID
}
l.mu.Unlock()
l.unlock()
case <-l.terms:
return Follower
case <-elected:
@@ -1007,14 +1016,14 @@ func (l *Log) elect(term uint64, elected chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
// Ensure we are in the same term and copy properties.
l.mu.Lock()
l.lock()
if term != l.term {
l.mu.Unlock()
l.unlock()
return
}
id, config := l.id, l.config
lastLogIndex, lastLogTerm := l.lastLogIndex, l.lastLogTerm
l.mu.Unlock()
l.unlock()
// Request votes from peers.
votes := make(chan struct{}, len(config.Nodes))
@@ -1028,9 +1037,9 @@ func (l *Log) elect(term uint64, elected chan struct{}, wg *sync.WaitGroup) {
// If an error occurred then update term.
if err != nil {
l.mu.Lock()
l.lock()
l.mustSetTermIfHigher(peerTerm)
l.mu.Unlock()
l.unlock()
return
}
votes <- struct{}{}
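elect() also shows the snapshot-then-release discipline used throughout the file: copy the fields an RPC needs while the mutex is held, then drop the lock before any network I/O. A sketch, with a hypothetical sendVoteRequest standing in for the transport call:

```go
package raft

import "sync"

type node struct {
	mu   sync.Mutex
	id   uint64
	term uint64
}

func (n *node) lock()   { n.mu.Lock() }
func (n *node) unlock() { n.mu.Unlock() }

// requestVote copies state under the lock, then performs the slow call
// with the lock released so other goroutines are not blocked on I/O.
func (n *node) requestVote(sendVoteRequest func(id, term uint64) error) error {
	n.lock()
	id, term := n.id, n.term
	n.unlock()
	return sendVoteRequest(id, term)
}
```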
@@ -1067,14 +1076,14 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State {
defer close(l.transitioning)
// Retrieve leader's term.
l.mu.Lock()
l.lock()
term := l.term
select {
case <-l.terms:
default:
}
l.mu.Unlock()
l.unlock()
// Read log from leader in a separate goroutine.
for {
@@ -1089,15 +1098,15 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State {
return Stopped
case <-l.terms: // step down on higher term
l.mu.Lock()
l.lock()
l.truncateTo(l.commitIndex)
l.mu.Unlock()
l.unlock()
return Follower
case hb := <-l.heartbeats: // update term, if necessary
l.mu.Lock()
l.lock()
l.mustSetTermIfHigher(hb.term)
l.mu.Unlock()
l.unlock()
case commitIndex, ok := <-committed:
// Quorum not reached, try again.
@@ -1106,12 +1115,12 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State {
}
// Quorum reached, set new commit index.
l.mu.Lock()
l.lock()
if commitIndex > l.commitIndex {
l.tracef("leaderLoop: committed: idx=%d", commitIndex)
l.commitIndex = commitIndex
}
l.mu.Unlock()
l.unlock()
continue
}
}
@@ -1122,13 +1131,13 @@ func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup
defer wg.Done()
// Ensure term is correct and retrieve current state.
l.mu.Lock()
l.lock()
if l.term != term {
l.mu.Unlock()
l.unlock()
return
}
commitIndex, localIndex, leaderID, config := l.commitIndex, l.lastLogIndex, l.id, l.config
l.mu.Unlock()
l.unlock()
// Commit latest index if there are no peers.
if config == nil || len(config.Nodes) <= 1 {
@@ -1198,8 +1207,8 @@ func (l *Log) Apply(command []byte) (uint64, error) {
}
func (l *Log) internalApply(typ LogEntryType, command []byte) (index uint64, err error) {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
// Do not apply if this node is not the leader.
if l.state != Leader {
@@ -1234,9 +1243,9 @@ func (l *Log) Wait(idx uint64) error {
// TODO(benbjohnson): Add timeout.
for {
l.mu.Lock()
l.lock()
state, index := l.state, l.FSM.Index()
l.mu.Unlock()
l.unlock()
if state == Stopped {
return ErrClosed
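Wait(), like waitCommitted() and waitUncommitted() below, polls instead of blocking on a condition variable: each pass holds the lock only long enough to read two fields, then releases it before sleeping. The sleep lives outside the shown hunks, so the interval below is an assumption:

```go
package raft

import (
	"errors"
	"sync"
	"time"
)

var errClosed = errors.New("raft: log closed")

type waiter struct {
	mu      sync.Mutex
	stopped bool
	index   uint64
}

func (w *waiter) lock()   { w.mu.Lock() }
func (w *waiter) unlock() { w.mu.Unlock() }

func (w *waiter) wait(idx uint64) error {
	for {
		w.lock()
		stopped, index := w.stopped, w.index // read under lock
		w.unlock()                           // never sleep holding the mutex
		if stopped {
			return errClosed
		}
		if index >= idx {
			return nil
		}
		time.Sleep(time.Millisecond) // assumed interval
	}
}
```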
@@ -1250,9 +1259,9 @@ func (l *Log) Wait(idx uint64) error {
// waitCommitted blocks until a given committed index is reached.
func (l *Log) waitCommitted(index uint64) error {
for {
l.mu.Lock()
l.lock()
state, committedIndex := l.state, l.commitIndex
l.mu.Unlock()
l.unlock()
if state == Stopped {
return ErrClosed
@@ -1266,10 +1275,10 @@ func (l *Log) waitCommitted(index uint64) error {
// waitUncommitted blocks until a given uncommitted index is reached.
func (l *Log) waitUncommitted(index uint64) error {
for {
l.mu.Lock()
l.lock()
lastLogIndex := l.lastLogIndex
//l.tracef("waitUncommitted: %s / %d", l.state, l.lastLogIndex)
l.mu.Unlock()
l.unlock()
if lastLogIndex >= index {
return nil
@@ -1362,9 +1371,9 @@ func (l *Log) applier(closing <-chan struct{}) {
}
// Trim entries.
l.mu.Lock()
l.lock()
l.trim()
l.mu.Unlock()
l.unlock()
// Signal clock that apply is done.
close(confirm)
@@ -1373,8 +1382,8 @@ func (l *Log) applier(closing <-chan struct{}) {
// applyNextUnappliedEntry applies the next committed entry that has not yet been applied.
func (l *Log) applyNextUnappliedEntry(closing <-chan struct{}) error {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
// Verify, under lock, that we're not closing.
select {
@@ -1525,8 +1534,8 @@ func (l *Log) AddPeer(u url.URL) (uint64, uint64, *Config, error) {
}
// Lock while we look up the node.
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
// Look up node.
n := l.config.NodeByURL(u)
@@ -1539,8 +1548,8 @@ func (l *Log) AddPeer(u url.URL) (uint64, uint64, *Config, error) {
// RemovePeer removes an existing peer from the cluster by id.
func (l *Log) RemovePeer(id uint64) error {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
// TODO(benbjohnson): Apply removePeerCommand.
@@ -1550,8 +1559,8 @@ func (l *Log) RemovePeer(id uint64) error {
// Heartbeat establishes dominance by the current leader.
// Returns the current term and highest written log entry index.
func (l *Log) Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64, err error) {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
// Check if log is closed.
if !l.opened() || l.state == Stopped {
@@ -1569,7 +1578,6 @@ func (l *Log) Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64, err error) {
select {
case l.heartbeats <- heartbeat{term: term, commitIndex: commitIndex, leaderID: leaderID}:
default:
}
l.tracef("recv heartbeat: (term=%d, commit=%d, leaderID: %d) (index=%d, term=%d)", term, commitIndex, leaderID, l.lastLogIndex, l.term)
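The send into l.heartbeats above is the mirror image of the candidate's drain: a non-blocking send, so a slow state loop can never stall the Heartbeat RPC handler. At worst a heartbeat is dropped, and the next one supersedes it. Reduced to a sketch:

```go
package raft

type heartbeat struct {
	term, commitIndex, leaderID uint64
}

// offer hands hb to the state loop if it is keeping up; otherwise it
// drops the heartbeat rather than blocking the RPC handler.
func offer(ch chan heartbeat, hb heartbeat) bool {
	select {
	case ch <- hb:
		return true
	default:
		return false
	}
}
```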
@@ -1578,8 +1586,8 @@ func (l *Log) Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64, err error) {
// RequestVote requests a vote from the log.
func (l *Log) RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) (peerTerm uint64, err error) {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
// Check if log is closed.
if !l.opened() {
@@ -1628,9 +1636,9 @@ func (l *Log) WriteEntriesTo(w io.Writer, id, term, index uint64) error {
// Write the snapshot and advance the writer through the log.
// If an error occurs then remove the writer.
if err := l.writeTo(writer, id, term, index); err != nil {
l.mu.Lock()
l.lock()
l.removeWriter(writer)
l.mu.Unlock()
l.unlock()
return err
}
@@ -1641,8 +1649,8 @@ func (l *Log) WriteEntriesTo(w io.Writer, id, term, index uint64) error {
// validates writer and adds it to the list of writers.
func (l *Log) initWriter(w io.Writer, id, term, index uint64) (*logWriter, error) {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
// Check if log is closed.
if !l.opened() {
@@ -1713,8 +1721,8 @@ func (l *Log) writeTo(writer *logWriter, id, term, index uint64) error {
// replays entries since the snapshot's index and begins tailing the log.
func (l *Log) advanceWriter(writer *logWriter) error {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
// Check if writer has been closed during snapshot.
select {
@@ -1759,8 +1767,8 @@ func (l *Log) removeWriter(writer *logWriter) {
// Flush pushes out buffered data for all open writers.
func (l *Log) Flush() {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
for _, w := range l.writers {
flushWriter(w.Writer)
}
@@ -1808,8 +1816,8 @@ func (l *Log) ReadFrom(r io.ReadCloser) error {
default:
// Append entry to the log.
if err := func() error {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
if err := l.append(e); err != nil {
return fmt.Errorf("append: %s", err)
}
@@ -1831,8 +1839,8 @@ func (l *Log) applyConfigLogEntry(e *LogEntry) error {
}
// Write the configuration to disk.
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
if err := l.writeConfig(config); err != nil {
return fmt.Errorf("write config: %s", err)
}
@@ -1843,8 +1851,13 @@ func (l *Log) applyConfigLogEntry(e *LogEntry) error {
// applySnapshotLogEntry restores a snapshot log entry.
func (l *Log) applySnapshotLogEntry(e *LogEntry, r io.Reader) error {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
// Log snapshotting time.
start := time.Now()
l.Logger.Printf("applying snapshot: begin")
defer func() { l.Logger.Printf("applying snapshot: done (%s)", time.Since(start)) }()
// Let the FSM rebuild its state from the data in r.
if _, err := l.FSM.ReadFrom(r); err != nil {
@@ -1862,8 +1875,8 @@ func (l *Log) applySnapshotLogEntry(e *LogEntry, r io.Reader) error {
// Initializes the ReadFrom() call under a lock and swaps out the readers.
func (l *Log) initReadFrom(r io.ReadCloser) error {
l.mu.Lock()
defer l.mu.Unlock()
l.lock()
defer l.unlock()
// Check if log is closed.
if !l.opened() {