diff --git a/CHANGELOG.md b/CHANGELOG.md index 22a11f706e..617d228892 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ - [#2404](https://github.com/influxdb/influxdb/pull/2404): Mean and percentile function fixes - [#2408](https://github.com/influxdb/influxdb/pull/2408): Fix snapshot 500 error - [#1896](https://github.com/influxdb/influxdb/issues/1896): Excessive heartbeater logging of "connection refused" on cluster node stop +- [#2418](https://github.com/influxdb/influxdb/pull/2418): Fix raft node getting stuck in candidate state ### Features - [#2410](https://github.com/influxdb/influxdb/pull/2410) Allow configuration of Raft timers diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index 0ba7e94fdd..3d531cc801 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -350,7 +350,7 @@ func runTest_rawDataReturnsInOrder(t *testing.T, testName string, nodes Cluster, expected = fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:00Z",%d]]}]}]}`, numPoints-1) got, ok, nOK := queryAndWait(t, nodes, database, `SELECT count(value) FROM cpu`, expected, "", 120*time.Second) if !ok { - t.Errorf("test %s:rawDataReturnsInOrder failed, SELECT count() query returned unexpected data\nexp: %s\n, got: %s\n%d nodes responded correctly", testName, expected, got, nOK) + t.Errorf("test %s:rawDataReturnsInOrder failed, SELECT count() query returned unexpected data\nexp: %s\ngot: %s\n%d nodes responded correctly", testName, expected, got, nOK) dumpClusterDiags(t, testName, nodes) dumpClusterStats(t, testName, nodes) } @@ -1524,7 +1524,6 @@ func Test3NodeServerFailover(t *testing.T) { // ensure that all queries work if there are more nodes in a cluster than the replication factor // and there is more than 1 shards func Test5NodeClusterPartiallyReplicated(t *testing.T) { - t.Skip("unstable, skipping for now") t.Parallel() testName := "5-node server integration partial replication" if testing.Short() { diff --git a/raft/log.go b/raft/log.go index 03acf4083a..87ccda290f 100644 --- a/raft/log.go +++ b/raft/log.go @@ -393,7 +393,7 @@ func (l *Log) Open(path string) error { go l.applier(l.closing) if l.config != nil { - l.Logger.Printf("log open: created at %s, with ID %d, term %d, last applied index of %d", path, l.id, l.term, l.lastLogIndex) + l.printf("log open: created at %s, with ID %d, term %d, last applied index of %d", path, l.id, l.term, l.lastLogIndex) } // Retrieve variables to use while starting state loop. @@ -419,7 +419,7 @@ func (l *Log) Open(path string) error { l.startStateLoop(closing, Follower) } } else { - l.Logger.Printf("log pending: waiting for initialization or join") + l.printf("log pending: waiting for initialization or join") } return nil @@ -555,7 +555,7 @@ func (l *Log) writeTerm(term uint64) error { // setTerm sets the current term and clears the vote. func (l *Log) setTerm(term uint64) error { - l.Logger.Printf("changing term: %d => %d", l.term, term) + l.printf("changing term: %d => %d", l.term, term) if err := l.writeTerm(term); err != nil { return err @@ -668,7 +668,7 @@ func (l *Log) Initialize() error { // Begin state loop as leader. l.startStateLoop(l.closing, Leader) - l.Logger.Printf("log initialize: promoted to 'leader' with cluster ID %d, log ID %d, term %d", + l.printf("log initialize: promoted to 'leader' with cluster ID %d, log ID %d, term %d", config.ClusterID, l.id, l.term) // Set initial configuration. @@ -693,10 +693,14 @@ func (l *Log) trace(v ...interface{}) { // trace writes a formatted log message if DebugEnabled is true. func (l *Log) tracef(msg string, v ...interface{}) { if l.DebugEnabled { - l.Logger.Printf(msg+"\n", v...) + l.printf(msg, v...) } } +func (l *Log) printf(msg string, v ...interface{}) { + l.Logger.Printf(fmt.Sprintf("%s[%d]: ", l.state, l.id)+msg+"\n", v...) +} + // IsLeader returns true if the log is the current leader. func (l *Log) IsLeader() bool { l.lock() @@ -838,7 +842,7 @@ func (l *Log) stateLoop(closing <-chan struct{}, state State, stateChanged chan l.lock() defer l.unlock() - l.Logger.Printf("log state change: %s => %s (term=%d)", l.state, state, l.term) + l.tracef("log state change: %s => %s (term=%d)", l.state, state, l.term) l.state = state l.transitioning = make(chan struct{}, 0) transitioning = l.transitioning @@ -901,7 +905,7 @@ func (l *Log) followerLoop(closing <-chan struct{}) State { return Candidate case hb := <-l.heartbeats: - l.tracef("followerLoop: heartbeat: term=%d, idx=%d", hb.term, hb.commitIndex) + l.tracef("followerLoop: recv heartbeat: term=%d, idx=%d", hb.term, hb.commitIndex) // Update term, commit index & leader. l.lock() @@ -917,7 +921,7 @@ func (l *Log) followerLoop(closing <-chan struct{}) State { func (l *Log) readFromLeader(wg *sync.WaitGroup) { defer wg.Done() - l.tracef("readFromLeader:") + l.tracef("readFromLeader") for { select { @@ -944,7 +948,7 @@ func (l *Log) readFromLeader(wg *sync.WaitGroup) { l.tracef("readFromLeader: read from: %s, id=%d, term=%d, index=%d", u.String(), id, term, lastLogIndex) r, err := l.Transport.ReadFrom(u, id, term, lastLogIndex) if err != nil { - l.Logger.Printf("connect stream: %s", err) + l.printf("readFromLeader: connect stream: %s", err) time.Sleep(500 * time.Millisecond) continue } @@ -958,7 +962,7 @@ func (l *Log) readFromLeader(wg *sync.WaitGroup) { // truncateTo removes all uncommitted entries up to index. func (l *Log) truncateTo(index uint64) { - assert(index >= l.commitIndex, "cannot truncate to before the commit index: index=%d, commit=%d", index, l.commitIndex) + assert(index >= l.commitIndex, "cannot truncate to before the commit index: id=%d index=%d, commit=%d", l.id, index, l.commitIndex) // Ignore if there are no entries. // Ignore if all entries are before the index. @@ -981,7 +985,8 @@ func (l *Log) truncateTo(index uint64) { l.entries = l.entries[:index-emin+1] l.lastLogIndex = index - assert(l.entries[len(l.entries)-1].Index == index, "last entry in truncation not index: emax=%d, index=%d", l.entries[len(l.entries)-1].Index, index) + assert(l.entries[len(l.entries)-1].Index == index, "last entry in truncation not index: id=%d emax=%d, index=%d", + l.id, l.entries[len(l.entries)-1].Index, index) } // candidateLoop requests vote from other nodes in an attempt to become leader. @@ -1021,7 +1026,10 @@ func (l *Log) candidateLoop(closing <-chan struct{}) State { l.lock() l.mustSetTermIfHigher(hb.term) if hb.term >= l.term { + l.tracef("candidateLoop: new leader: old=%d new=%d", l.leaderID, hb.leaderID) l.leaderID = hb.leaderID + l.unlock() + return Follower } l.unlock() case <-l.terms: @@ -1056,7 +1064,7 @@ func (l *Log) elect(term uint64, elected chan struct{}, wg *sync.WaitGroup) { } go func(n *ConfigNode) { peerTerm, err := l.Transport.RequestVote(n.URL, term, id, lastLogIndex, lastLogTerm) - l.Logger.Printf("send req vote(term=%d, candidateID=%d, lastLogIndex=%d, lastLogTerm=%d) (term=%d, err=%v)", term, id, lastLogIndex, lastLogTerm, peerTerm, err) + l.tracef("send req vote(term=%d, candidateID=%d, lastLogIndex=%d, lastLogTerm=%d) (term=%d, err=%v)", term, id, lastLogIndex, lastLogTerm, peerTerm, err) // If an error occured then update term. if err != nil { @@ -1169,7 +1177,7 @@ func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup return } - l.tracef("send heartbeat: start: n=%d", len(config.Nodes)) + l.tracef("leaderLoop: send heartbeat: start: n=%d", len(config.Nodes)) // Send heartbeats to all peers. peerIndices := make(chan uint64, len(config.Nodes)) @@ -1183,13 +1191,13 @@ func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup c := atomic.AddInt64(&n.HeartbeatErrorCount, 1) // log heartbeat error once every 15 seconds to avoid flooding the logs if time.Now().Unix()-atomic.LoadInt64(&n.LastHeartbeatError) > heartbeartErrorLogThreshold { - l.Logger.Printf("send heartbeat: error: cnt=%d %s", c, err) + l.printf("leaderLoop: send heartbeat: error: cnt=%d %s", c, err) atomic.StoreInt64(&n.LastHeartbeatError, time.Now().Unix()) } return } if atomic.LoadInt64(&n.LastHeartbeatError) != 0 { - l.Logger.Printf("send heartbeat: success url=%s", n.URL.String()) + l.printf("leaderLoop: send heartbeat: success url=%s", n.URL.String()) atomic.StoreInt64(&n.LastHeartbeatError, 0) atomic.StoreInt64(&n.HeartbeatErrorCount, 0) } @@ -1204,10 +1212,10 @@ func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup for { select { case <-l.transitioning: - l.tracef("send heartbeat: transitioning") + l.tracef("leaderLoop: send heartbeat: transitioning") return case peerIndex := <-peerIndices: - l.tracef("send heartbeat: index: idx=%d, idxs=%+v", peerIndex, indexes) + l.tracef("leaderLoop: send heartbeat: index: idx=%d, idxs=%+v", peerIndex, indexes) indexes = append(indexes, peerIndex) // collect responses case ch := <-after: // Once we have enough indices then return the lowest index @@ -1217,9 +1225,9 @@ func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup // Return highest index reported by quorum. sort.Sort(sort.Reverse(uint64Slice(indexes))) committed <- indexes[quorumN-1] - l.tracef("send heartbeat: commit: idx=%d, idxs=%+v", commitIndex, indexes) + l.tracef("leaderLoop: send heartbeat: commit: idx=%d, idxs=%+v", commitIndex, indexes) } else { - l.tracef("send heartbeat: no quorum: idxs=%+v", indexes) + l.tracef("leaderLoop: send heartbeat: no quorum: idxs=%+v", indexes) close(committed) } close(ch) @@ -1366,7 +1374,7 @@ func (l *Log) appendToWriters(buf []byte) { // If an error occurs then remove the writer and close it. if _, err := w.Write(buf); err != nil { - l.Logger.Printf("append to writers error: %s", err) + l.printf("append to writers error: %s", err) l.removeWriter(w) i-- continue @@ -1607,13 +1615,13 @@ func (l *Log) Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64 // Check if log is closed. if !l.opened() || l.state == Stopped { - l.Logger.Printf("recv heartbeat: closed") + l.tracef("recv heartbeat: closed") return 0, ErrClosed } // Ignore if the incoming term is less than the log's term. if term < l.term { - l.Logger.Printf("recv heartbeat: stale term, ignore: %d < %d", term, l.term) + l.tracef("recv heartbeat: stale term, ignore: %d < %d", term, l.term) return l.lastLogIndex, ErrStaleTerm } @@ -1644,7 +1652,8 @@ func (l *Log) RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) ( } defer func() { - l.Logger.Printf("recv req vote(term=%d, candidateID=%d, lastLogIndex=%d, lastLogTerm=%d) (err=%v)", term, candidateID, lastLogIndex, lastLogTerm, err) + l.tracef("recv req vote: (term=%d, candidateID=%d, lastLogIndex=%d, lastLogTerm=%d) (err=%v)", + term, candidateID, lastLogIndex, lastLogTerm, err) }() // Deny vote if: @@ -1679,7 +1688,7 @@ func (l *Log) WriteEntriesTo(w io.Writer, id, term, index uint64) error { // Validate and initialize the writer. writer, err := l.initWriter(w, id, term, index) if err != nil { - l.Logger.Printf("unable to init writer: %s", err) + l.printf("unable to init writer: %s", err) return err } @@ -1689,7 +1698,7 @@ func (l *Log) WriteEntriesTo(w io.Writer, id, term, index uint64) error { l.lock() l.removeWriter(writer) l.unlock() - l.Logger.Printf("unable to write entries: %s", err) + l.printf("unable to write entries: %s", err) return err } @@ -1812,7 +1821,7 @@ func (l *Log) removeWriter(writer *logWriter) { l.writers[len(l.writers)-1] = nil l.writers = l.writers[:len(l.writers)-1] _ = w.Close() - l.Logger.Printf("writer removed: %#v", w) + l.printf("writer removed: id=%d idx=%d", w.id, w.snapshotIndex) break } } @@ -1833,7 +1842,7 @@ func (l *Log) ReadFrom(r io.ReadCloser) error { return nil } - l.Logger.Printf("reading from stream") + l.tracef("reading from stream") // Continually decode entries. dec := NewLogEntryDecoder(r) @@ -1852,13 +1861,13 @@ func (l *Log) ReadFrom(r io.ReadCloser) error { case logEntryConfig: l.tracef("ReadFrom: config") if err := l.applyConfigLogEntry(e); err != nil { - l.Logger.Printf("error reading config from stream: %s", err) + l.printf("error reading config from stream: %s", err) return fmt.Errorf("apply config log entry: %s", err) } case logEntrySnapshot: if err := l.applySnapshotLogEntry(e, r); err != nil { - l.Logger.Printf("error snapshotting from stream: %s", err) + l.printf("error snapshotting from stream: %s", err) return fmt.Errorf("apply snapshot log entry: %s", err) } @@ -1873,7 +1882,7 @@ func (l *Log) ReadFrom(r io.ReadCloser) error { return nil }(); err != nil { - l.Logger.Printf("error appending from stream: %s", err) + l.printf("error appending from stream: %s", err) return err } } @@ -1910,8 +1919,8 @@ func (l *Log) applySnapshotLogEntry(e *LogEntry, r io.Reader) error { // Log snapshotting time. start := time.Now() - l.Logger.Printf("applying snapshot: begin") - defer func() { l.Logger.Printf("applying snapshot: done (%s)", time.Since(start)) }() + l.printf("applying snapshot: begin") + defer func() { l.printf("applying snapshot: done (%s)", time.Since(start)) }() // Let the FSM rebuild its state from the data in r. if _, err := l.FSM.ReadFrom(r); err != nil {