Merge pull request #2418 from influxdb/jw-int-tests

Fix raft node getting stuck in candidate state
Jason Wilder 2015-04-24 16:18:10 -06:00
commit 2d2c806f36
3 changed files with 43 additions and 34 deletions
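Before this change, a node that lost an election could get stuck in the candidate state: when a heartbeat arrived from the newly elected leader, the candidate recorded the leader's ID but never transitioned back to follower, so it kept re-running elections against a live leader. The candidateLoop hunk in raft/log.go below makes the candidate step down as soon as it sees a heartbeat at its own term or higher; the remaining hunks route log output through a new printf helper that stamps each message with the node's state and ID, and demote chatty messages to tracef. A minimal sketch of the step-down transition, with illustrative names rather than the influxdb source:

package main

import "fmt"

// State is the role a raft node is currently playing.
type State int

const (
	Follower State = iota
	Candidate
	Leader
)

type heartbeat struct{ term, leaderID uint64 }

type node struct{ term, leaderID uint64 }

// onHeartbeat returns the state a campaigning candidate should move to.
// A heartbeat at the candidate's term or higher means another node has
// already won the election, so the candidate must step down to Follower;
// recording the leader ID while staying a candidate is the livelock this
// change removes.
func (n *node) onHeartbeat(hb heartbeat, current State) State {
	if hb.term >= n.term {
		n.term = hb.term
		n.leaderID = hb.leaderID
		return Follower
	}
	return current // stale heartbeat from an older term; keep campaigning
}

func main() {
	n := &node{term: 3}
	fmt.Println(n.onHeartbeat(heartbeat{term: 3, leaderID: 1}, Candidate) == Follower) // true
}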

CHANGELOG.md

@@ -5,6 +5,7 @@
- [#2404](https://github.com/influxdb/influxdb/pull/2404): Mean and percentile function fixes
- [#2408](https://github.com/influxdb/influxdb/pull/2408): Fix snapshot 500 error
- [#1896](https://github.com/influxdb/influxdb/issues/1896): Excessive heartbeater logging of "connection refused" on cluster node stop
+- [#2418](https://github.com/influxdb/influxdb/pull/2418): Fix raft node getting stuck in candidate state
### Features
- [#2410](https://github.com/influxdb/influxdb/pull/2410) Allow configuration of Raft timers

cmd/influxd/server_integration_test.go

@@ -350,7 +350,7 @@ func runTest_rawDataReturnsInOrder(t *testing.T, testName string, nodes Cluster,
expected = fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:00Z",%d]]}]}]}`, numPoints-1)
got, ok, nOK := queryAndWait(t, nodes, database, `SELECT count(value) FROM cpu`, expected, "", 120*time.Second)
if !ok {
t.Errorf("test %s:rawDataReturnsInOrder failed, SELECT count() query returned unexpected data\nexp: %s\n, got: %s\n%d nodes responded correctly", testName, expected, got, nOK)
t.Errorf("test %s:rawDataReturnsInOrder failed, SELECT count() query returned unexpected data\nexp: %s\ngot: %s\n%d nodes responded correctly", testName, expected, got, nOK)
dumpClusterDiags(t, testName, nodes)
dumpClusterStats(t, testName, nodes)
}
@@ -1524,7 +1524,6 @@ func Test3NodeServerFailover(t *testing.T) {
// ensure that all queries work if there are more nodes in a cluster than the replication factor
// and there is more than 1 shards
func Test5NodeClusterPartiallyReplicated(t *testing.T) {
t.Skip("unstable, skipping for now")
t.Parallel()
testName := "5-node server integration partial replication"
if testing.Short() {

raft/log.go

@@ -393,7 +393,7 @@ func (l *Log) Open(path string) error {
go l.applier(l.closing)
if l.config != nil {
l.Logger.Printf("log open: created at %s, with ID %d, term %d, last applied index of %d", path, l.id, l.term, l.lastLogIndex)
l.printf("log open: created at %s, with ID %d, term %d, last applied index of %d", path, l.id, l.term, l.lastLogIndex)
}
// Retrieve variables to use while starting state loop.
@@ -419,7 +419,7 @@ func (l *Log) Open(path string) error {
l.startStateLoop(closing, Follower)
}
} else {
l.Logger.Printf("log pending: waiting for initialization or join")
l.printf("log pending: waiting for initialization or join")
}
return nil
@@ -555,7 +555,7 @@ func (l *Log) writeTerm(term uint64) error {
// setTerm sets the current term and clears the vote.
func (l *Log) setTerm(term uint64) error {
l.Logger.Printf("changing term: %d => %d", l.term, term)
l.printf("changing term: %d => %d", l.term, term)
if err := l.writeTerm(term); err != nil {
return err
@@ -668,7 +668,7 @@ func (l *Log) Initialize() error {
// Begin state loop as leader.
l.startStateLoop(l.closing, Leader)
l.Logger.Printf("log initialize: promoted to 'leader' with cluster ID %d, log ID %d, term %d",
l.printf("log initialize: promoted to 'leader' with cluster ID %d, log ID %d, term %d",
config.ClusterID, l.id, l.term)
// Set initial configuration.
@@ -693,10 +693,14 @@ func (l *Log) trace(v ...interface{}) {
// trace writes a formatted log message if DebugEnabled is true.
func (l *Log) tracef(msg string, v ...interface{}) {
if l.DebugEnabled {
-l.Logger.Printf(msg+"\n", v...)
+l.printf(msg, v...)
}
}
+func (l *Log) printf(msg string, v ...interface{}) {
+l.Logger.Printf(fmt.Sprintf("%s[%d]: ", l.state, l.id)+msg+"\n", v...)
+}
// IsLeader returns true if the log is the current leader.
func (l *Log) IsLeader() bool {
l.lock()
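The printf helper added above prefixes every message with the node's current state and ID, so interleaved output from a multi-node cluster can be attributed at a glance. A small illustration of the resulting prefix, assuming the node's State value renders as "candidate":

package main

import (
	"fmt"
	"log"
	"os"
)

func main() {
	logger := log.New(os.Stderr, "", log.LstdFlags)
	state, id := "candidate", uint64(2) // stand-ins for l.state and l.id
	// Mirrors l.Logger.Printf(fmt.Sprintf("%s[%d]: ", l.state, l.id)+msg+"\n", v...)
	logger.Printf(fmt.Sprintf("%s[%d]: ", state, id)+"changing term: %d => %d\n", 1, 2)
	// Output resembles: 2015/04/24 16:18:10 candidate[2]: changing term: 1 => 2
}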
@@ -838,7 +842,7 @@ func (l *Log) stateLoop(closing <-chan struct{}, state State, stateChanged chan
l.lock()
defer l.unlock()
l.Logger.Printf("log state change: %s => %s (term=%d)", l.state, state, l.term)
l.tracef("log state change: %s => %s (term=%d)", l.state, state, l.term)
l.state = state
l.transitioning = make(chan struct{}, 0)
transitioning = l.transitioning
@@ -901,7 +905,7 @@ func (l *Log) followerLoop(closing <-chan struct{}) State {
return Candidate
case hb := <-l.heartbeats:
l.tracef("followerLoop: heartbeat: term=%d, idx=%d", hb.term, hb.commitIndex)
l.tracef("followerLoop: recv heartbeat: term=%d, idx=%d", hb.term, hb.commitIndex)
// Update term, commit index & leader.
l.lock()
@@ -917,7 +921,7 @@ func (l *Log) followerLoop(closing <-chan struct{}) State {
func (l *Log) readFromLeader(wg *sync.WaitGroup) {
defer wg.Done()
l.tracef("readFromLeader:")
l.tracef("readFromLeader")
for {
select {
@@ -944,7 +948,7 @@ func (l *Log) readFromLeader(wg *sync.WaitGroup) {
l.tracef("readFromLeader: read from: %s, id=%d, term=%d, index=%d", u.String(), id, term, lastLogIndex)
r, err := l.Transport.ReadFrom(u, id, term, lastLogIndex)
if err != nil {
l.Logger.Printf("connect stream: %s", err)
l.printf("readFromLeader: connect stream: %s", err)
time.Sleep(500 * time.Millisecond)
continue
}
@@ -958,7 +962,7 @@ func (l *Log) readFromLeader(wg *sync.WaitGroup) {
// truncateTo removes all uncommitted entries up to index.
func (l *Log) truncateTo(index uint64) {
-assert(index >= l.commitIndex, "cannot truncate to before the commit index: index=%d, commit=%d", index, l.commitIndex)
+assert(index >= l.commitIndex, "cannot truncate to before the commit index: id=%d index=%d, commit=%d", l.id, index, l.commitIndex)
// Ignore if there are no entries.
// Ignore if all entries are before the index.
@@ -981,7 +985,8 @@ func (l *Log) truncateTo(index uint64) {
l.entries = l.entries[:index-emin+1]
l.lastLogIndex = index
-assert(l.entries[len(l.entries)-1].Index == index, "last entry in truncation not index: emax=%d, index=%d", l.entries[len(l.entries)-1].Index, index)
+assert(l.entries[len(l.entries)-1].Index == index, "last entry in truncation not index: id=%d emax=%d, index=%d",
+l.id, l.entries[len(l.entries)-1].Index, index)
}
// candidateLoop requests vote from other nodes in an attempt to become leader.
@@ -1021,7 +1026,10 @@ func (l *Log) candidateLoop(closing <-chan struct{}) State {
l.lock()
l.mustSetTermIfHigher(hb.term)
if hb.term >= l.term {
l.tracef("candidateLoop: new leader: old=%d new=%d", l.leaderID, hb.leaderID)
l.leaderID = hb.leaderID
l.unlock()
return Follower
}
l.unlock()
case <-l.terms:
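This hunk is the substance of the fix: on a heartbeat whose term is at least the candidate's own, the node no longer just records the leader ID and keeps campaigning; it releases the lock and returns Follower, which ends the election loop that previously left it stuck as a candidate.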
@@ -1056,7 +1064,7 @@ func (l *Log) elect(term uint64, elected chan struct{}, wg *sync.WaitGroup) {
}
go func(n *ConfigNode) {
peerTerm, err := l.Transport.RequestVote(n.URL, term, id, lastLogIndex, lastLogTerm)
l.Logger.Printf("send req vote(term=%d, candidateID=%d, lastLogIndex=%d, lastLogTerm=%d) (term=%d, err=%v)", term, id, lastLogIndex, lastLogTerm, peerTerm, err)
l.tracef("send req vote(term=%d, candidateID=%d, lastLogIndex=%d, lastLogTerm=%d) (term=%d, err=%v)", term, id, lastLogIndex, lastLogTerm, peerTerm, err)
// If an error occured then update term.
if err != nil {
@@ -1169,7 +1177,7 @@ func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup
return
}
l.tracef("send heartbeat: start: n=%d", len(config.Nodes))
l.tracef("leaderLoop: send heartbeat: start: n=%d", len(config.Nodes))
// Send heartbeats to all peers.
peerIndices := make(chan uint64, len(config.Nodes))
@@ -1183,13 +1191,13 @@ func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup
c := atomic.AddInt64(&n.HeartbeatErrorCount, 1)
// log heartbeat error once every 15 seconds to avoid flooding the logs
if time.Now().Unix()-atomic.LoadInt64(&n.LastHeartbeatError) > heartbeartErrorLogThreshold {
l.Logger.Printf("send heartbeat: error: cnt=%d %s", c, err)
l.printf("leaderLoop: send heartbeat: error: cnt=%d %s", c, err)
atomic.StoreInt64(&n.LastHeartbeatError, time.Now().Unix())
}
return
}
if atomic.LoadInt64(&n.LastHeartbeatError) != 0 {
l.Logger.Printf("send heartbeat: success url=%s", n.URL.String())
l.printf("leaderLoop: send heartbeat: success url=%s", n.URL.String())
atomic.StoreInt64(&n.LastHeartbeatError, 0)
atomic.StoreInt64(&n.HeartbeatErrorCount, 0)
}
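The hunk above counts every heartbeat failure but logs at most one per threshold interval, so a peer that is down does not flood the logs on every heartbeat tick. A self-contained sketch of the same pattern (the constant, with its spelling, and the per-peer fields follow the diff; the rest is illustrative):

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

const heartbeartErrorLogThreshold = 15 // seconds between logged errors, as in the diff

type peer struct {
	LastHeartbeatError  int64 // unix time of the last logged error; 0 means healthy
	HeartbeatErrorCount int64 // consecutive failures since the last success
}

// logHeartbeatError counts every failure but logs only one per interval.
func logHeartbeatError(n *peer, err error) {
	c := atomic.AddInt64(&n.HeartbeatErrorCount, 1)
	if time.Now().Unix()-atomic.LoadInt64(&n.LastHeartbeatError) > heartbeartErrorLogThreshold {
		fmt.Printf("send heartbeat: error: cnt=%d %s\n", c, err)
		atomic.StoreInt64(&n.LastHeartbeatError, time.Now().Unix())
	}
}

func main() {
	p := &peer{}
	for i := 0; i < 3; i++ { // only the first failure is logged
		logHeartbeatError(p, errors.New("connection refused"))
	}
}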
@@ -1204,10 +1212,10 @@ func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup
for {
select {
case <-l.transitioning:
l.tracef("send heartbeat: transitioning")
l.tracef("leaderLoop: send heartbeat: transitioning")
return
case peerIndex := <-peerIndices:
l.tracef("send heartbeat: index: idx=%d, idxs=%+v", peerIndex, indexes)
l.tracef("leaderLoop: send heartbeat: index: idx=%d, idxs=%+v", peerIndex, indexes)
indexes = append(indexes, peerIndex) // collect responses
case ch := <-after:
// Once we have enough indices then return the lowest index
@@ -1217,9 +1225,9 @@ func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup
// Return highest index reported by quorum.
sort.Sort(sort.Reverse(uint64Slice(indexes)))
committed <- indexes[quorumN-1]
l.tracef("send heartbeat: commit: idx=%d, idxs=%+v", commitIndex, indexes)
l.tracef("leaderLoop: send heartbeat: commit: idx=%d, idxs=%+v", commitIndex, indexes)
} else {
l.tracef("send heartbeat: no quorum: idxs=%+v", indexes)
l.tracef("leaderLoop: send heartbeat: no quorum: idxs=%+v", indexes)
close(committed)
}
close(ch)
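The quorum calculation above sorts the last log index reported by each peer in descending order and commits through indexes[quorumN-1], the highest index known to be replicated on at least a quorum of nodes. A worked example with made-up values:

package main

import (
	"fmt"
	"sort"
)

// uint64Slice matches the sort helper used in the diff above.
type uint64Slice []uint64

func (p uint64Slice) Len() int           { return len(p) }
func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p uint64Slice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }

func main() {
	// Last log index reported by each of 5 peers; quorum is 3.
	indexes := []uint64{7, 5, 9, 3, 9}
	quorumN := 3
	sort.Sort(sort.Reverse(uint64Slice(indexes))) // [9 9 7 5 3]
	// The (quorumN-1)th element is the highest index present on at least
	// quorumN nodes, so it is safe to commit through it.
	fmt.Println(indexes[quorumN-1]) // 7
}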
@@ -1366,7 +1374,7 @@ func (l *Log) appendToWriters(buf []byte) {
// If an error occurs then remove the writer and close it.
if _, err := w.Write(buf); err != nil {
l.Logger.Printf("append to writers error: %s", err)
l.printf("append to writers error: %s", err)
l.removeWriter(w)
i--
continue
@@ -1607,13 +1615,13 @@ func (l *Log) Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64
// Check if log is closed.
if !l.opened() || l.state == Stopped {
l.Logger.Printf("recv heartbeat: closed")
l.tracef("recv heartbeat: closed")
return 0, ErrClosed
}
// Ignore if the incoming term is less than the log's term.
if term < l.term {
l.Logger.Printf("recv heartbeat: stale term, ignore: %d < %d", term, l.term)
l.tracef("recv heartbeat: stale term, ignore: %d < %d", term, l.term)
return l.lastLogIndex, ErrStaleTerm
}
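Demoting these two messages from Logger.Printf to tracef means they are emitted only when DebugEnabled is set; a heartbeat against a closed log or with a stale term is routine while elections are in flight, not an error worth logging by default.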
@@ -1644,7 +1652,8 @@ func (l *Log) RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) (
}
defer func() {
l.Logger.Printf("recv req vote(term=%d, candidateID=%d, lastLogIndex=%d, lastLogTerm=%d) (err=%v)", term, candidateID, lastLogIndex, lastLogTerm, err)
l.tracef("recv req vote: (term=%d, candidateID=%d, lastLogIndex=%d, lastLogTerm=%d) (err=%v)",
term, candidateID, lastLogIndex, lastLogTerm, err)
}()
// Deny vote if:
@@ -1679,7 +1688,7 @@ func (l *Log) WriteEntriesTo(w io.Writer, id, term, index uint64) error {
// Validate and initialize the writer.
writer, err := l.initWriter(w, id, term, index)
if err != nil {
l.Logger.Printf("unable to init writer: %s", err)
l.printf("unable to init writer: %s", err)
return err
}
@@ -1689,7 +1698,7 @@ func (l *Log) WriteEntriesTo(w io.Writer, id, term, index uint64) error {
l.lock()
l.removeWriter(writer)
l.unlock()
l.Logger.Printf("unable to write entries: %s", err)
l.printf("unable to write entries: %s", err)
return err
}
@@ -1812,7 +1821,7 @@ func (l *Log) removeWriter(writer *logWriter) {
l.writers[len(l.writers)-1] = nil
l.writers = l.writers[:len(l.writers)-1]
_ = w.Close()
l.Logger.Printf("writer removed: %#v", w)
l.printf("writer removed: id=%d idx=%d", w.id, w.snapshotIndex)
break
}
}
@@ -1833,7 +1842,7 @@ func (l *Log) ReadFrom(r io.ReadCloser) error {
return nil
}
l.Logger.Printf("reading from stream")
l.tracef("reading from stream")
// Continually decode entries.
dec := NewLogEntryDecoder(r)
@@ -1852,13 +1861,13 @@ func (l *Log) ReadFrom(r io.ReadCloser) error {
case logEntryConfig:
l.tracef("ReadFrom: config")
if err := l.applyConfigLogEntry(e); err != nil {
l.Logger.Printf("error reading config from stream: %s", err)
l.printf("error reading config from stream: %s", err)
return fmt.Errorf("apply config log entry: %s", err)
}
case logEntrySnapshot:
if err := l.applySnapshotLogEntry(e, r); err != nil {
l.Logger.Printf("error snapshotting from stream: %s", err)
l.printf("error snapshotting from stream: %s", err)
return fmt.Errorf("apply snapshot log entry: %s", err)
}
@@ -1873,7 +1882,7 @@ func (l *Log) ReadFrom(r io.ReadCloser) error {
return nil
}(); err != nil {
l.Logger.Printf("error appending from stream: %s", err)
l.printf("error appending from stream: %s", err)
return err
}
}
@@ -1910,8 +1919,8 @@ func (l *Log) applySnapshotLogEntry(e *LogEntry, r io.Reader) error {
// Log snapshotting time.
start := time.Now()
l.Logger.Printf("applying snapshot: begin")
defer func() { l.Logger.Printf("applying snapshot: done (%s)", time.Since(start)) }()
l.printf("applying snapshot: begin")
defer func() { l.printf("applying snapshot: done (%s)", time.Since(start)) }()
// Let the FSM rebuild its state from the data in r.
if _, err := l.FSM.ReadFrom(r); err != nil {