diff --git a/CHANGELOG.md b/CHANGELOG.md index 33c647c9a0..5a5456fc58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - [#5016](https://github.com/influxdb/influxdb/pull/5016): Don't panic if Meta data directory not writable. Thanks @oiooj - [#5059](https://github.com/influxdb/influxdb/pull/5059): Fix unmarshal of database error by client code. Thanks @farshidtz - [#4940](https://github.com/influxdb/influxdb/pull/4940): Fix distributed aggregate query query error. Thanks @li-ang +- [#4622](https://github.com/influxdb/influxdb/issues/4622): Fix panic when passing too large of timestamps to OpenTSDB input. ## v0.9.6 [2015-12-09] diff --git a/meta/store.go b/meta/store.go index 4792a025e2..6015f7ea68 100644 --- a/meta/store.go +++ b/meta/store.go @@ -24,6 +24,7 @@ import ( "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/meta/internal" + "github.com/influxdb/influxdb/models" "golang.org/x/crypto/bcrypt" ) @@ -1225,6 +1226,10 @@ func (s *Store) DropRetentionPolicy(database, name string) error { // CreateShardGroup creates a new shard group in a retention policy for a given time. func (s *Store) CreateShardGroup(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) { + // Check the time is valid since we are about to encode it as an int64 + if err := models.CheckTime(timestamp); err != nil { + return nil, err + } if err := s.exec(internal.Command_CreateShardGroupCommand, internal.E_CreateShardGroupCommand_Command, &internal.CreateShardGroupCommand{ Database: proto.String(database), @@ -1250,7 +1255,11 @@ func (s *Store) CreateShardGroupIfNotExists(database, policy string, timestamp t // Attempt to create database. sgi, err := s.CreateShardGroup(database, policy, timestamp) if err == ErrShardGroupExists { - return s.ShardGroupByTimestamp(database, policy, timestamp) + sgi, err = s.ShardGroupByTimestamp(database, policy, timestamp) + } + // Check that we are returning either an error or a valid shard group. + if sgi == nil && err == nil { + return nil, errors.New("failed to create a new shard group, error unknown.") } return sgi, err } diff --git a/models/points.go b/models/points.go index 274369a6fb..b7669a1b54 100644 --- a/models/points.go +++ b/models/points.go @@ -232,11 +232,32 @@ func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, err if err != nil { return nil, err } - pt.time = time.Unix(0, ts*pt.GetPrecisionMultiplier(precision)).UTC() + pt.time, err = SafeCalcTime(ts, precision) + if err != nil { + return nil, err + } } return pt, nil } +// GetPrecisionMultiplier will return a multiplier for the precision specified +func GetPrecisionMultiplier(precision string) int64 { + d := time.Nanosecond + switch precision { + case "u": + d = time.Microsecond + case "ms": + d = time.Millisecond + case "s": + d = time.Second + case "m": + d = time.Minute + case "h": + d = time.Hour + } + return int64(d) +} + // scanKey scans buf starting at i for the measurement and tag portion of the point. // It returns the ending position and the byte slice of key within buf. If there // are tags, they will be sorted if they are not already. @@ -1028,11 +1049,16 @@ func unescapeStringField(in string) string { } // NewPoint returns a new point with the given measurement name, tags, fields and timestamp. If -// an unsupported field value (NaN) is passed, this function returns an error. +// an unsupported field value (NaN) or out of range time is passed, this function returns an error. func NewPoint(name string, tags Tags, fields Fields, time time.Time) (Point, error) { if len(fields) == 0 { return nil, fmt.Errorf("Point without fields is unsupported") } + if !time.IsZero() { + if err := CheckTime(time); err != nil { + return nil, err + } + } for key, value := range fields { if fv, ok := value.(float64); ok { @@ -1200,24 +1226,6 @@ func (p *point) SetPrecision(precision string) { } } -// GetPrecisionMultiplier will return a multiplier for the precision specified -func (p *point) GetPrecisionMultiplier(precision string) int64 { - d := time.Nanosecond - switch precision { - case "u": - d = time.Microsecond - case "ms": - d = time.Millisecond - case "s": - d = time.Second - case "m": - d = time.Minute - case "h": - d = time.Hour - } - return int64(d) -} - func (p *point) String() string { if p.Time().IsZero() { return string(p.Key()) + " " + string(p.fields) @@ -1264,7 +1272,7 @@ func (p *point) PrecisionString(precision string) string { return fmt.Sprintf("%s %s", p.Key(), string(p.fields)) } return fmt.Sprintf("%s %s %d", p.Key(), string(p.fields), - p.UnixNano()/p.GetPrecisionMultiplier(precision)) + p.UnixNano()/GetPrecisionMultiplier(precision)) } func (p *point) RoundedString(d time.Duration) string { diff --git a/models/points_test.go b/models/points_test.go index ccbb2cdd81..36f2986d81 100644 --- a/models/points_test.go +++ b/models/points_test.go @@ -43,6 +43,24 @@ func BenchmarkParsePointNoTags(b *testing.B) { } } +func BenchmarkParsePointWithPrecisionN(b *testing.B) { + line := `cpu value=1i 1000000000` + defaultTime := time.Now().UTC() + for i := 0; i < b.N; i++ { + models.ParsePointsWithPrecision([]byte(line), defaultTime, "n") + b.SetBytes(int64(len(line))) + } +} + +func BenchmarkParsePointWithPrecisionU(b *testing.B) { + line := `cpu value=1i 1000000000` + defaultTime := time.Now().UTC() + for i := 0; i < b.N; i++ { + models.ParsePointsWithPrecision([]byte(line), defaultTime, "u") + b.SetBytes(int64(len(line))) + } +} + func BenchmarkParsePointsTagsSorted2(b *testing.B) { line := `cpu,host=serverA,region=us-west value=1i 1000000000` for i := 0; i < b.N; i++ { diff --git a/models/time.go b/models/time.go new file mode 100644 index 0000000000..2da5076a63 --- /dev/null +++ b/models/time.go @@ -0,0 +1,52 @@ +package models + +// Helper time methods since parsing time can easily overflow and we only support a +// specific time range. + +import ( + "fmt" + "math" + "time" +) + +var ( + // Maximum time that can be represented via int64 nanoseconds since the epoch. + MaxNanoTime = time.Unix(0, math.MaxInt64).UTC() + // Minumum time that can be represented via int64 nanoseconds since the epoch. + MinNanoTime = time.Unix(0, math.MinInt64).UTC() + + // The time is out of the representable range using int64 nanoseconds since the epoch. + ErrTimeOutOfRange = fmt.Errorf("time outside range %s - %s", MinNanoTime, MaxNanoTime) +) + +// Safely calculate the time given. Will return error if the time is outside the +// supported range. +func SafeCalcTime(timestamp int64, precision string) (time.Time, error) { + mult := GetPrecisionMultiplier(precision) + if t, ok := safeSignedMult(timestamp, mult); ok { + return time.Unix(0, t).UTC(), nil + } else { + return time.Time{}, ErrTimeOutOfRange + } + +} + +// Check that a time is within the safe range. +func CheckTime(t time.Time) error { + if t.Before(MinNanoTime) || t.After(MaxNanoTime) { + return ErrTimeOutOfRange + } + return nil +} + +// Perform the multiplication and check to make sure it didn't overflow. +func safeSignedMult(a, b int64) (int64, bool) { + if a == 0 || b == 0 || a == 1 || b == 1 { + return a * b, true + } + if a == math.MinInt64 || b == math.MaxInt64 { + return 0, false + } + c := a * b + return c, c/b == a +} diff --git a/services/collectd/service.go b/services/collectd/service.go index e8bf39d227..a60deecf48 100644 --- a/services/collectd/service.go +++ b/services/collectd/service.go @@ -22,13 +22,14 @@ const leaderWaitTimeout = 30 * time.Second // statistics gathered by the collectd service. const ( - statPointsReceived = "pointsRx" - statBytesReceived = "bytesRx" - statPointsParseFail = "pointsParseFail" - statReadFail = "readFail" - statBatchesTrasmitted = "batchesTx" - statPointsTransmitted = "pointsTx" - statBatchesTransmitFail = "batchesTxFail" + statPointsReceived = "pointsRx" + statBytesReceived = "bytesRx" + statPointsParseFail = "pointsParseFail" + statReadFail = "readFail" + statBatchesTrasmitted = "batchesTx" + statPointsTransmitted = "pointsTx" + statBatchesTransmitFail = "batchesTxFail" + statDroppedPointsInvalid = "droppedPointsInvalid" ) // pointsWriter is an internal interface to make testing easier. @@ -233,7 +234,7 @@ func (s *Service) handleMessage(buffer []byte) { return } for _, packet := range *packets { - points := Unmarshal(&packet) + points := s.UnmarshalCollectd(&packet) for _, p := range points { s.batcher.In() <- p } @@ -266,7 +267,7 @@ func (s *Service) writePoints() { } // Unmarshal translates a collectd packet into InfluxDB data points. -func Unmarshal(packet *gollectd.Packet) []models.Point { +func (s *Service) UnmarshalCollectd(packet *gollectd.Packet) []models.Point { // Prefer high resolution timestamp. var timestamp time.Time if packet.TimeHR > 0 { @@ -302,8 +303,10 @@ func Unmarshal(packet *gollectd.Packet) []models.Point { tags["type_instance"] = packet.TypeInstance } p, err := models.NewPoint(name, tags, fields, timestamp) - // Drop points values of NaN since they are not supported + // Drop invalid points if err != nil { + s.Logger.Printf("Dropping point %v: %v", p.Name, err) + s.statMap.Add(statDroppedPointsInvalid, 1) continue } diff --git a/services/opentsdb/handler.go b/services/opentsdb/handler.go index d379ce4459..91c212cd8a 100644 --- a/services/opentsdb/handler.go +++ b/services/opentsdb/handler.go @@ -5,6 +5,7 @@ import ( "compress/gzip" "encoding/json" "errors" + "expvar" "io" "log" "net" @@ -27,6 +28,8 @@ type Handler struct { } Logger *log.Logger + + statMap *expvar.Map } func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -113,6 +116,7 @@ func (h *Handler) servePut(w http.ResponseWriter, r *http.Request) { pt, err := models.NewPoint(p.Metric, p.Tags, map[string]interface{}{"value": p.Value}, ts) if err != nil { h.Logger.Printf("Dropping point %v: %v", p.Metric, err) + h.statMap.Add(statDroppedPointsInvalid, 1) continue } points = append(points, pt) diff --git a/services/opentsdb/service.go b/services/opentsdb/service.go index 62d1c9494e..8d576f3dd6 100644 --- a/services/opentsdb/service.go +++ b/services/opentsdb/service.go @@ -42,6 +42,7 @@ const ( statBatchesTransmitFail = "batchesTxFail" statConnectionsActive = "connsActive" statConnectionsHandled = "connsHandled" + statDroppedPointsInvalid = "droppedPointsInvalid" ) // Service manages the listener and handler for an HTTP endpoint. @@ -367,6 +368,7 @@ func (s *Service) serveHTTP() { ConsistencyLevel: s.ConsistencyLevel, PointsWriter: s.PointsWriter, Logger: s.Logger, + statMap: s.statMap, }} srv.Serve(s.httpln) }