From 908fe7db4a51da83f6bacdb16128953d857f7fd0 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sun, 12 Apr 2015 12:57:34 -0600 Subject: [PATCH] Wrap raft.Log mutex. --- raft/log.go | 227 +++++++++++++++++++++++++++------------------------- 1 file changed, 120 insertions(+), 107 deletions(-) diff --git a/raft/log.go b/raft/log.go index 25138d6e57..43e5e91be1 100644 --- a/raft/log.go +++ b/raft/log.go @@ -14,6 +14,7 @@ import ( "net/url" "os" "path/filepath" + "runtime" "runtime/debug" "sort" "strconv" @@ -208,33 +209,41 @@ func NewLog() *Log { return l } +func (l *Log) lock() { l.mu.Lock() } +func (l *Log) unlock() { l.mu.Unlock() } + +func (l *Log) printCaller(label string) { + _, file, line, _ := runtime.Caller(2) + l.Logger.Printf("%s: %s:%d", label, filepath.Base(file), line) +} + // Path returns the data path of the Raft log. // Returns an empty string if the log is closed. func (l *Log) Path() string { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() return l.path } // URL returns the URL for the log. func (l *Log) URL() url.URL { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() return l.url } // SetURL sets the URL for the log. This must be set before opening. func (l *Log) SetURL(u url.URL) { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() assert(!l.opened(), "url cannot be set while log is open") l.url = u } // URLs returns a list of all URLs in the cluster. func (l *Log) URLs() []url.URL { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() if l.config == nil { return nil @@ -254,8 +263,8 @@ func (l *Log) configPath() string { return filepath.Join(l.path, "config") } // Opened returns true if the log is currently open. func (l *Log) Opened() bool { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() return l.opened() } @@ -263,43 +272,43 @@ func (l *Log) opened() bool { return l.path != "" } // ID returns the log's identifier. func (l *Log) ID() uint64 { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() return l.id } // State returns the current state. func (l *Log) State() State { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() return l.state } // LastLogIndexTerm returns the last index & term from the log. func (l *Log) LastLogIndexTerm() (index, term uint64) { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() return l.lastLogIndex, l.lastLogTerm } // CommtIndex returns the highest committed index. func (l *Log) CommitIndex() uint64 { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() return l.commitIndex } // Term returns the current term. func (l *Log) Term() uint64 { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() return l.term } // Config returns a the log's current configuration. func (l *Log) Config() *Config { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() if l.config != nil { return l.config.Clone() } @@ -312,8 +321,8 @@ func (l *Log) Open(path string) error { var closing chan struct{} var config *Config if err := func() error { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() // Validate initial log state. if l.opened() { @@ -395,8 +404,8 @@ func (l *Log) Open(path string) error { // Close closes the log. func (l *Log) Close() error { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() return l.close() } @@ -411,9 +420,9 @@ func (l *Log) close() error { if l.closing != nil { close(l.closing) l.closing = nil - l.mu.Unlock() + l.unlock() l.wg.Wait() - l.mu.Lock() + l.lock() } // Close the writers. @@ -435,8 +444,8 @@ func (l *Log) close() error { } func (l *Log) setReaderWithLock(r io.ReadCloser) error { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() return l.setReader(r) } @@ -597,8 +606,8 @@ func (l *Log) writeConfig(config *Config) error { func (l *Log) Initialize() error { var config *Config if err := func() error { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() // Return error if log is not open or is already a member of a cluster. if !l.opened() { @@ -678,16 +687,16 @@ func (l *Log) tracef(msg string, v ...interface{}) { // IsLeader returns true if the log is the current leader. func (l *Log) IsLeader() bool { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() return l.id != 0 && l.id == l.leaderID } // Leader returns the id and URL associated with the current leader. // Returns zero if there is no current leader. func (l *Log) Leader() (id uint64, u url.URL) { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() return l.leader() } @@ -709,8 +718,8 @@ func (l *Log) leader() (id uint64, u url.URL) { // ClusterID returns the identifier for the cluster. // Returns zero if the cluster has not been initialized yet. func (l *Log) ClusterID() uint64 { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() if l.config == nil { return 0 } @@ -723,8 +732,8 @@ func (l *Log) Join(u url.URL) error { // Validate under lock. var nodeURL url.URL if err := func() error { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() if !l.opened() { return ErrClosed @@ -753,8 +762,8 @@ func (l *Log) Join(u url.URL) error { // Lock once the join request is returned. if err := func() error { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() // Write identifier. if err := l.writeID(id); err != nil { @@ -785,8 +794,8 @@ func (l *Log) Join(u url.URL) error { // Leave removes the log from cluster membership and removes the log data. func (l *Log) Leave() error { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() // TODO(benbjohnson): Check if open. // TODO(benbjohnson): Apply remove peer command. @@ -814,8 +823,8 @@ func (l *Log) stateLoop(closing <-chan struct{}, state State, stateChanged chan // Transition to new state. var transitioning chan struct{} func() { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() l.Logger.Printf("log state change: %s => %s (term=%d)", l.state, state, l.term) l.state = state @@ -872,13 +881,13 @@ func (l *Log) followerLoop(closing <-chan struct{}) State { l.tracef("followerLoop: heartbeat: term=%d, idx=%d", hb.term, hb.commitIndex) // Update term, commit index & leader. - l.mu.Lock() + l.lock() l.mustSetTermIfHigher(hb.term) if hb.commitIndex > l.commitIndex { l.commitIndex = hb.commitIndex } l.leaderID = hb.leaderID - l.mu.Unlock() + l.unlock() } } } @@ -896,10 +905,10 @@ func (l *Log) readFromLeader(wg *sync.WaitGroup) { } // Retrieve the term, last log index, & leader URL. - l.mu.Lock() + l.lock() id, lastLogIndex, term := l.id, l.lastLogIndex, l.term _, u := l.leader() - l.mu.Unlock() + l.unlock() // If no leader exists then wait momentarily and retry. if u.Host == "" { @@ -960,7 +969,7 @@ func (l *Log) candidateLoop(closing <-chan struct{}) State { // TODO: prevote // Increment term and request votes. - l.mu.Lock() + l.lock() l.mustSetTermIfHigher(l.term + 1) l.votedFor = l.id term := l.term @@ -969,7 +978,7 @@ func (l *Log) candidateLoop(closing <-chan struct{}) State { case <-l.terms: default: } - l.mu.Unlock() + l.unlock() // Ensure all candidate goroutines complete before transitioning to another state. var wg sync.WaitGroup @@ -986,12 +995,12 @@ func (l *Log) candidateLoop(closing <-chan struct{}) State { case <-closing: return Stopped case hb := <-l.heartbeats: - l.mu.Lock() + l.lock() l.mustSetTermIfHigher(hb.term) if hb.term >= l.term { l.leaderID = hb.leaderID } - l.mu.Unlock() + l.unlock() case <-l.terms: return Follower case <-elected: @@ -1007,14 +1016,14 @@ func (l *Log) elect(term uint64, elected chan struct{}, wg *sync.WaitGroup) { defer wg.Done() // Ensure we are in the same term and copy properties. - l.mu.Lock() + l.lock() if term != l.term { - l.mu.Unlock() + l.unlock() return } id, config := l.id, l.config lastLogIndex, lastLogTerm := l.lastLogIndex, l.lastLogTerm - l.mu.Unlock() + l.unlock() // Request votes from peers. votes := make(chan struct{}, len(config.Nodes)) @@ -1028,9 +1037,9 @@ func (l *Log) elect(term uint64, elected chan struct{}, wg *sync.WaitGroup) { // If an error occured then update term. if err != nil { - l.mu.Lock() + l.lock() l.mustSetTermIfHigher(peerTerm) - l.mu.Unlock() + l.unlock() return } votes <- struct{}{} @@ -1067,14 +1076,14 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State { defer close(l.transitioning) // Retrieve leader's term. - l.mu.Lock() + l.lock() term := l.term select { case <-l.terms: default: } - l.mu.Unlock() + l.unlock() // Read log from leader in a separate goroutine. for { @@ -1089,15 +1098,15 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State { return Stopped case <-l.terms: // step down on higher term - l.mu.Lock() + l.lock() l.truncateTo(l.commitIndex) - l.mu.Unlock() + l.unlock() return Follower case hb := <-l.heartbeats: // update term, if necessary - l.mu.Lock() + l.lock() l.mustSetTermIfHigher(hb.term) - l.mu.Unlock() + l.unlock() case commitIndex, ok := <-committed: // Quorum not reached, try again. @@ -1106,12 +1115,12 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State { } // Quorum reached, set new commit index. - l.mu.Lock() + l.lock() if commitIndex > l.commitIndex { l.tracef("leaderLoop: committed: idx=%d", commitIndex) l.commitIndex = commitIndex } - l.mu.Unlock() + l.unlock() continue } } @@ -1122,13 +1131,13 @@ func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup defer wg.Done() // Ensure term is correct and retrieve current state. - l.mu.Lock() + l.lock() if l.term != term { - l.mu.Unlock() + l.unlock() return } commitIndex, localIndex, leaderID, config := l.commitIndex, l.lastLogIndex, l.id, l.config - l.mu.Unlock() + l.unlock() // Commit latest index if there are no peers. if config == nil || len(config.Nodes) <= 1 { @@ -1198,8 +1207,8 @@ func (l *Log) Apply(command []byte) (uint64, error) { } func (l *Log) internalApply(typ LogEntryType, command []byte) (index uint64, err error) { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() // Do not apply if this node is not the leader. if l.state != Leader { @@ -1234,9 +1243,9 @@ func (l *Log) Wait(idx uint64) error { // TODO(benbjohnson): Add timeout. for { - l.mu.Lock() + l.lock() state, index := l.state, l.FSM.Index() - l.mu.Unlock() + l.unlock() if state == Stopped { return ErrClosed @@ -1250,9 +1259,9 @@ func (l *Log) Wait(idx uint64) error { // waitCommitted blocks until a given committed index is reached. func (l *Log) waitCommitted(index uint64) error { for { - l.mu.Lock() + l.lock() state, committedIndex := l.state, l.commitIndex - l.mu.Unlock() + l.unlock() if state == Stopped { return ErrClosed @@ -1266,10 +1275,10 @@ func (l *Log) waitCommitted(index uint64) error { // waitUncommitted blocks until a given uncommitted index is reached. func (l *Log) waitUncommitted(index uint64) error { for { - l.mu.Lock() + l.lock() lastLogIndex := l.lastLogIndex //l.tracef("waitUncommitted: %s / %d", l.state, l.lastLogIndex) - l.mu.Unlock() + l.unlock() if lastLogIndex >= index { return nil @@ -1362,9 +1371,9 @@ func (l *Log) applier(closing <-chan struct{}) { } // Trim entries. - l.mu.Lock() + l.lock() l.trim() - l.mu.Unlock() + l.unlock() // Signal clock that apply is done. close(confirm) @@ -1373,8 +1382,8 @@ func (l *Log) applier(closing <-chan struct{}) { // applyNextUnappliedEntry applies the next committed entry that has not yet been applied. func (l *Log) applyNextUnappliedEntry(closing <-chan struct{}) error { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() // Verify, under lock, that we're not closing. select { @@ -1525,8 +1534,8 @@ func (l *Log) AddPeer(u url.URL) (uint64, uint64, *Config, error) { } // Lock while we look up the node. - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() // Look up node. n := l.config.NodeByURL(u) @@ -1539,8 +1548,8 @@ func (l *Log) AddPeer(u url.URL) (uint64, uint64, *Config, error) { // RemovePeer removes an existing peer from the cluster by id. func (l *Log) RemovePeer(id uint64) error { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() // TODO(benbjohnson): Apply removePeerCommand. @@ -1550,8 +1559,8 @@ func (l *Log) RemovePeer(id uint64) error { // Heartbeat establishes dominance by the current leader. // Returns the current term and highest written log entry index. func (l *Log) Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64, err error) { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() // Check if log is closed. if !l.opened() || l.state == Stopped { @@ -1569,7 +1578,6 @@ func (l *Log) Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64 select { case l.heartbeats <- heartbeat{term: term, commitIndex: commitIndex, leaderID: leaderID}: default: - } l.tracef("recv heartbeat: (term=%d, commit=%d, leaderID: %d) (index=%d, term=%d)", term, commitIndex, leaderID, l.lastLogIndex, l.term) @@ -1578,8 +1586,8 @@ func (l *Log) Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64 // RequestVote requests a vote from the log. func (l *Log) RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) (peerTerm uint64, err error) { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() // Check if log is closed. if !l.opened() { @@ -1628,9 +1636,9 @@ func (l *Log) WriteEntriesTo(w io.Writer, id, term, index uint64) error { // Write the snapshot and advance the writer through the log. // If an error occurs then remove the writer. if err := l.writeTo(writer, id, term, index); err != nil { - l.mu.Lock() + l.lock() l.removeWriter(writer) - l.mu.Unlock() + l.unlock() return err } @@ -1641,8 +1649,8 @@ func (l *Log) WriteEntriesTo(w io.Writer, id, term, index uint64) error { // validates writer and adds it to the list of writers. func (l *Log) initWriter(w io.Writer, id, term, index uint64) (*logWriter, error) { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() // Check if log is closed. if !l.opened() { @@ -1713,8 +1721,8 @@ func (l *Log) writeTo(writer *logWriter, id, term, index uint64) error { // replays entries since the snapshot's index and begins tailing the log. func (l *Log) advanceWriter(writer *logWriter) error { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() // Check if writer has been closed during snapshot. select { @@ -1759,8 +1767,8 @@ func (l *Log) removeWriter(writer *logWriter) { // Flush pushes out buffered data for all open writers. func (l *Log) Flush() { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() for _, w := range l.writers { flushWriter(w.Writer) } @@ -1808,8 +1816,8 @@ func (l *Log) ReadFrom(r io.ReadCloser) error { default: // Append entry to the log. if err := func() error { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() if err := l.append(e); err != nil { return fmt.Errorf("append: %s", err) } @@ -1831,8 +1839,8 @@ func (l *Log) applyConfigLogEntry(e *LogEntry) error { } // Write the configuration to disk. - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() if err := l.writeConfig(config); err != nil { return fmt.Errorf("write config: %s", err) } @@ -1843,8 +1851,13 @@ func (l *Log) applyConfigLogEntry(e *LogEntry) error { // applySnapshotLogEntry restores a snapshot log entry. func (l *Log) applySnapshotLogEntry(e *LogEntry, r io.Reader) error { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() + + // Log snapshotting time. + start := time.Now() + l.Logger.Printf("applying snapshot: begin") + defer func() { l.Logger.Printf("applying snapshot: done (%s)", time.Since(start)) }() // Let the FSM rebuild its state from the data in r. if _, err := l.FSM.ReadFrom(r); err != nil { @@ -1862,8 +1875,8 @@ func (l *Log) applySnapshotLogEntry(e *LogEntry, r io.Reader) error { // Initializes the ReadFrom() call under a lock and swaps out the readers. func (l *Log) initReadFrom(r io.ReadCloser) error { - l.mu.Lock() - defer l.mu.Unlock() + l.lock() + defer l.unlock() // Check if log is closed. if !l.opened() {