Fix term signal.

This commit changes raft so that term changes are made immediately and
term change signals are made afterward. Previously, election timeouts
were invalidated by incoming term changes which caused an election loop.

Stale term was also fixed and http/pprof was added too.
pull/2236/head
Ben Johnson 2015-04-10 12:59:55 -06:00
parent c757039f70
commit eaf4bfca0a
7 changed files with 32 additions and 7 deletions

View File

@ -3,6 +3,7 @@
### Bugfixes
- [#2225](https://github.com/influxdb/influxdb/pull/2225): Make keywords completely case insensitive
- [#2228](https://github.com/influxdb/influxdb/pull/2228): Accept keyword default unquoted in ALTER RETENTION POLICY statement
- [#2236](https://github.com/influxdb/influxdb/pull/2236): Immediate term changes, fix stale write issue, net/http/pprof
## v0.9.0-rc22 [2015-04-09]

View File

@ -184,6 +184,10 @@ type Config struct {
WriteInterval Duration `toml:"write-interval"`
} `toml:"monitoring"`
Debugging struct {
PprofEnabled bool `toml:"pprof-enabled"`
} `toml:"debugging"`
ContinuousQuery struct {
// when continuous queries are run we'll automatically recompute previous intervals
// in case lagged data came in. Set to zero if you never have lagged data. We do

View File

@ -4,6 +4,7 @@ import (
"log"
"math/rand"
"net/http"
"net/http/pprof"
"net/url"
"strings"
@ -29,6 +30,21 @@ func NewHandler() *Handler {
// ServeHTTP responds to HTTP request to the handler.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Debug routes.
if h.Config.Debugging.PprofEnabled && strings.HasPrefix(r.URL.Path, "/debug/pprof") {
switch r.URL.Path {
case "/debug/pprof/cmdline":
pprof.Cmdline(w, r)
case "/debug/pprof/profile":
pprof.Profile(w, r)
case "/debug/pprof/symbol":
pprof.Symbol(w, r)
default:
pprof.Index(w, r)
}
return
}
// FIXME: This is very brittle. Refactor to have common path prefix
if strings.HasPrefix(r.URL.Path, "/raft") {
h.serveRaft(w, r)

View File

@ -495,12 +495,9 @@ func (b *Broker) applySetTopicMaxIndex(m *Message) {
if t := b.topics[topicID]; t != nil {
t.mu.Lock()
defer t.mu.Unlock()
// Track the highest replicated index per data node URL
t.indexByURL[u] = index
if t.index < index {
t.index = index
}
}
}
@ -815,6 +812,9 @@ func (t *Topic) WriteMessage(m *Message) error {
return fmt.Errorf("write segment: %s", err)
}
// Update index.
t.index = m.Index
return nil
}

View File

@ -133,9 +133,6 @@ func TestBroker_Apply_SetMaxTopicIndex(t *testing.T) {
t.Fatalf("apply error: %s", err)
}
if topic := b.Topic(20); topic.Index() != 5 {
t.Fatalf("unexpected topic index: %d", topic.Index())
}
if topic := b.Topic(20); topic.IndexForURL(*testDataURL) != 5 {
t.Fatalf("unexpected topic url index: %d", topic.IndexForURL(*testDataURL))
}

View File

@ -546,6 +546,12 @@ func (l *Log) mustSetTermIfHigher(term uint64) {
if err := l.setTerm(term); err != nil {
panic("unable to set term: " + err.Error())
}
// Signal term change.
select {
case l.terms <- struct{}{}:
default:
}
}
// readConfig reads the configuration from disk.

View File

@ -1771,6 +1771,7 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
for _, p := range points {
measurement, series := db.MeasurementAndSeries(p.Name, p.Tags)
if series == nil {
s.Logger.Printf("series not found: name=%s, tags=%#v", p.Name, p.Tags)
return ErrSeriesNotFound
}