From bf9975be66d830d4eddf98a6c4d5a0e498e2a310 Mon Sep 17 00:00:00 2001 From: John Shahid Date: Fri, 4 Apr 2014 15:58:02 -0400 Subject: [PATCH] fix #408. Make InfluxDB recover from internal bugs and panics --- CHANGELOG.md | 1 + src/cluster/shard.go | 4 ++++ src/common/recover.go | 22 ++++++++++++++++++++++ src/coordinator/coordinator.go | 24 ++++++------------------ src/coordinator/raft_server.go | 9 +++++---- 5 files changed, 38 insertions(+), 22 deletions(-) create mode 100644 src/common/recover.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 57ac939d78..917dc48494 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ - [Issue #403](https://github.com/influxdb/influxdb/issues/403). Filtering should work with join queries - [Issue #404](https://github.com/influxdb/influxdb/issues/404). Filtering with invalid condition shouldn't crash the server - [Issue #405](https://github.com/influxdb/influxdb/issues/405). Percentile shouldn't crash for small number of values +- [Issue #408](https://github.com/influxdb/influxdb/issues/408). Make InfluxDB recover from internal bugs and panics - Close leveldb databases properly if we couldn't create a new Shard. See leveldb\_shard\_datastore\_test:131 ## v0.5.4 [2014-04-02] diff --git a/src/cluster/shard.go b/src/cluster/shard.go index 989f0a8907..a1c430ae6e 100644 --- a/src/cluster/shard.go +++ b/src/cluster/shard.go @@ -202,6 +202,10 @@ func (self *ShardData) WriteLocalOnly(request *p.Request) error { } func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Response) { + defer common.RecoverFunc(querySpec.Database(), querySpec.GetQueryString(), func(err interface{}) { + response <- &p.Response{Type: &endStreamResponse, ErrorMessage: p.String(fmt.Sprintf("%s", err))} + }) + // This is only for queries that are deletes or drops. They need to be sent everywhere as opposed to just the local or one of the remote shards. // But this boolean should only be set to true on the server that receives the initial query. if querySpec.RunAgainstAllServersInShard { diff --git a/src/common/recover.go b/src/common/recover.go new file mode 100644 index 0000000000..0dbad0da6c --- /dev/null +++ b/src/common/recover.go @@ -0,0 +1,22 @@ +package common + +import ( + "fmt" + "os" + "runtime" +) + +func RecoverFunc(database, query string, cleanup func(err interface{})) { + if err := recover(); err != nil { + fmt.Fprintf(os.Stderr, "********************************BUG********************************\n") + buf := make([]byte, 1024) + n := runtime.Stack(buf, false) + fmt.Fprintf(os.Stderr, "Database: %s\n", database) + fmt.Fprintf(os.Stderr, "Query: [%s]\n", query) + fmt.Fprintf(os.Stderr, "Error: %s. Stacktrace: %s\n", err, string(buf[:n])) + err = NewQueryError(InternalError, "Internal Error") + if cleanup != nil { + cleanup(err) + } + } +} diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index a94099217d..c8c673f248 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -7,11 +7,9 @@ import ( "engine" "fmt" "math" - "os" "parser" "protocol" "regexp" - "runtime" "strings" "sync" "time" @@ -81,7 +79,7 @@ func NewCoordinatorImpl(config *configuration.Configuration, raftServer ClusterC func (self *CoordinatorImpl) RunQuery(user common.User, database string, queryString string, seriesWriter SeriesWriter) (err error) { log.Debug("COORD: RunQuery: %s", queryString) // don't let a panic pass beyond RunQuery - defer recoverFunc(database, queryString) + defer common.RecoverFunc(database, queryString, nil) q, err := parser.ParseQuery(queryString) if err != nil { @@ -436,18 +434,6 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri return err } -func recoverFunc(database, query string) { - if err := recover(); err != nil { - fmt.Fprintf(os.Stderr, "********************************BUG********************************\n") - buf := make([]byte, 1024) - n := runtime.Stack(buf, false) - fmt.Fprintf(os.Stderr, "Database: %s\n", database) - fmt.Fprintf(os.Stderr, "Query: [%s]\n", query) - fmt.Fprintf(os.Stderr, "Error: %s. Stacktrace: %s\n", err, string(buf[:n])) - err = common.NewQueryError(common.InternalError, "Internal Error") - } -} - func (self *CoordinatorImpl) ForceCompaction(user common.User) error { if !user.IsClusterAdmin() { return fmt.Errorf("Insufficient permissions to force a log compaction") @@ -491,11 +477,11 @@ func (self *CoordinatorImpl) ProcessContinuousQueries(db string, series *protoco tableValue := table.Name if regex, ok := tableValue.GetCompiledRegex(); ok { if regex.MatchString(incomingSeriesName) { - self.InterpolateValuesAndCommit(db, series, targetName, false) + self.InterpolateValuesAndCommit(query.GetQueryString(), db, series, targetName, false) } } else { if tableValue.Name == incomingSeriesName { - self.InterpolateValuesAndCommit(db, series, targetName, false) + self.InterpolateValuesAndCommit(query.GetQueryString(), db, series, targetName, false) } } } @@ -503,7 +489,9 @@ func (self *CoordinatorImpl) ProcessContinuousQueries(db string, series *protoco } } -func (self *CoordinatorImpl) InterpolateValuesAndCommit(db string, series *protocol.Series, targetName string, assignSequenceNumbers bool) error { +func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string, series *protocol.Series, targetName string, assignSequenceNumbers bool) error { + defer common.RecoverFunc(db, query, nil) + targetName = strings.Replace(targetName, ":series_name", *series.Name, -1) type sequenceKey struct { seriesName string diff --git a/src/coordinator/raft_server.go b/src/coordinator/raft_server.go index a64784e6fe..2c4111339f 100644 --- a/src/coordinator/raft_server.go +++ b/src/coordinator/raft_server.go @@ -3,15 +3,12 @@ package coordinator import ( "bytes" "cluster" - log "code.google.com/p/log4go" "common" "configuration" "encoding/binary" "encoding/json" "errors" "fmt" - "github.com/goraft/raft" - "github.com/gorilla/mux" "io/ioutil" "math/rand" "net" @@ -23,6 +20,10 @@ import ( "strings" "sync" "time" + + log "code.google.com/p/log4go" + "github.com/goraft/raft" + "github.com/gorilla/mux" ) const ( @@ -464,7 +465,7 @@ func (s *RaftServer) runContinuousQuery(db string, query *parser.SelectQuery, st queryString := query.GetQueryStringForContinuousQuery(start, end) f := func(series *protocol.Series) error { - return s.coordinator.InterpolateValuesAndCommit(db, series, targetName, true) + return s.coordinator.InterpolateValuesAndCommit(query.GetQueryString(), db, series, targetName, true) } writer := NewContinuousQueryWriter(f)