fix #408. Make InfluxDB recover from internal bugs and panics
parent
cd7c639436
commit
bf9975be66
|
@ -15,6 +15,7 @@
|
|||
- [Issue #403](https://github.com/influxdb/influxdb/issues/403). Filtering should work with join queries
|
||||
- [Issue #404](https://github.com/influxdb/influxdb/issues/404). Filtering with invalid condition shouldn't crash the server
|
||||
- [Issue #405](https://github.com/influxdb/influxdb/issues/405). Percentile shouldn't crash for small number of values
|
||||
- [Issue #408](https://github.com/influxdb/influxdb/issues/408). Make InfluxDB recover from internal bugs and panics
|
||||
- Close leveldb databases properly if we couldn't create a new Shard. See leveldb\_shard\_datastore\_test:131
|
||||
|
||||
## v0.5.4 [2014-04-02]
|
||||
|
|
|
@ -202,6 +202,10 @@ func (self *ShardData) WriteLocalOnly(request *p.Request) error {
|
|||
}
|
||||
|
||||
func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Response) {
|
||||
defer common.RecoverFunc(querySpec.Database(), querySpec.GetQueryString(), func(err interface{}) {
|
||||
response <- &p.Response{Type: &endStreamResponse, ErrorMessage: p.String(fmt.Sprintf("%s", err))}
|
||||
})
|
||||
|
||||
// This is only for queries that are deletes or drops. They need to be sent everywhere as opposed to just the local or one of the remote shards.
|
||||
// But this boolean should only be set to true on the server that receives the initial query.
|
||||
if querySpec.RunAgainstAllServersInShard {
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
package common
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
)
|
||||
|
||||
func RecoverFunc(database, query string, cleanup func(err interface{})) {
|
||||
if err := recover(); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "********************************BUG********************************\n")
|
||||
buf := make([]byte, 1024)
|
||||
n := runtime.Stack(buf, false)
|
||||
fmt.Fprintf(os.Stderr, "Database: %s\n", database)
|
||||
fmt.Fprintf(os.Stderr, "Query: [%s]\n", query)
|
||||
fmt.Fprintf(os.Stderr, "Error: %s. Stacktrace: %s\n", err, string(buf[:n]))
|
||||
err = NewQueryError(InternalError, "Internal Error")
|
||||
if cleanup != nil {
|
||||
cleanup(err)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -7,11 +7,9 @@ import (
|
|||
"engine"
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"parser"
|
||||
"protocol"
|
||||
"regexp"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -81,7 +79,7 @@ func NewCoordinatorImpl(config *configuration.Configuration, raftServer ClusterC
|
|||
func (self *CoordinatorImpl) RunQuery(user common.User, database string, queryString string, seriesWriter SeriesWriter) (err error) {
|
||||
log.Debug("COORD: RunQuery: %s", queryString)
|
||||
// don't let a panic pass beyond RunQuery
|
||||
defer recoverFunc(database, queryString)
|
||||
defer common.RecoverFunc(database, queryString, nil)
|
||||
|
||||
q, err := parser.ParseQuery(queryString)
|
||||
if err != nil {
|
||||
|
@ -436,18 +434,6 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri
|
|||
return err
|
||||
}
|
||||
|
||||
func recoverFunc(database, query string) {
|
||||
if err := recover(); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "********************************BUG********************************\n")
|
||||
buf := make([]byte, 1024)
|
||||
n := runtime.Stack(buf, false)
|
||||
fmt.Fprintf(os.Stderr, "Database: %s\n", database)
|
||||
fmt.Fprintf(os.Stderr, "Query: [%s]\n", query)
|
||||
fmt.Fprintf(os.Stderr, "Error: %s. Stacktrace: %s\n", err, string(buf[:n]))
|
||||
err = common.NewQueryError(common.InternalError, "Internal Error")
|
||||
}
|
||||
}
|
||||
|
||||
func (self *CoordinatorImpl) ForceCompaction(user common.User) error {
|
||||
if !user.IsClusterAdmin() {
|
||||
return fmt.Errorf("Insufficient permissions to force a log compaction")
|
||||
|
@ -491,11 +477,11 @@ func (self *CoordinatorImpl) ProcessContinuousQueries(db string, series *protoco
|
|||
tableValue := table.Name
|
||||
if regex, ok := tableValue.GetCompiledRegex(); ok {
|
||||
if regex.MatchString(incomingSeriesName) {
|
||||
self.InterpolateValuesAndCommit(db, series, targetName, false)
|
||||
self.InterpolateValuesAndCommit(query.GetQueryString(), db, series, targetName, false)
|
||||
}
|
||||
} else {
|
||||
if tableValue.Name == incomingSeriesName {
|
||||
self.InterpolateValuesAndCommit(db, series, targetName, false)
|
||||
self.InterpolateValuesAndCommit(query.GetQueryString(), db, series, targetName, false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -503,7 +489,9 @@ func (self *CoordinatorImpl) ProcessContinuousQueries(db string, series *protoco
|
|||
}
|
||||
}
|
||||
|
||||
func (self *CoordinatorImpl) InterpolateValuesAndCommit(db string, series *protocol.Series, targetName string, assignSequenceNumbers bool) error {
|
||||
func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string, series *protocol.Series, targetName string, assignSequenceNumbers bool) error {
|
||||
defer common.RecoverFunc(db, query, nil)
|
||||
|
||||
targetName = strings.Replace(targetName, ":series_name", *series.Name, -1)
|
||||
type sequenceKey struct {
|
||||
seriesName string
|
||||
|
|
|
@ -3,15 +3,12 @@ package coordinator
|
|||
import (
|
||||
"bytes"
|
||||
"cluster"
|
||||
log "code.google.com/p/log4go"
|
||||
"common"
|
||||
"configuration"
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/goraft/raft"
|
||||
"github.com/gorilla/mux"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"net"
|
||||
|
@ -23,6 +20,10 @@ import (
|
|||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "code.google.com/p/log4go"
|
||||
"github.com/goraft/raft"
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -464,7 +465,7 @@ func (s *RaftServer) runContinuousQuery(db string, query *parser.SelectQuery, st
|
|||
queryString := query.GetQueryStringForContinuousQuery(start, end)
|
||||
|
||||
f := func(series *protocol.Series) error {
|
||||
return s.coordinator.InterpolateValuesAndCommit(db, series, targetName, true)
|
||||
return s.coordinator.InterpolateValuesAndCommit(query.GetQueryString(), db, series, targetName, true)
|
||||
}
|
||||
|
||||
writer := NewContinuousQueryWriter(f)
|
||||
|
|
Loading…
Reference in New Issue