Handle error when point doesn't have a shard group
pull/5076/head
Nathaniel Cook 2015-12-09 15:09:56 -07:00
commit 50176c4c86
8 changed files with 129 additions and 32 deletions

View File

@ -6,6 +6,7 @@
- [#5016](https://github.com/influxdb/influxdb/pull/5016): Don't panic if Meta data directory not writable. Thanks @oiooj
- [#5059](https://github.com/influxdb/influxdb/pull/5059): Fix unmarshal of database error by client code. Thanks @farshidtz
- [#4940](https://github.com/influxdb/influxdb/pull/4940): Fix distributed aggregate query error. Thanks @li-ang
- [#4622](https://github.com/influxdb/influxdb/issues/4622): Fix panic when passing too large of timestamps to OpenTSDB input.
## v0.9.6 [2015-12-09]

View File

@ -24,6 +24,7 @@ import (
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta/internal"
"github.com/influxdb/influxdb/models"
"golang.org/x/crypto/bcrypt"
)
@ -1225,6 +1226,10 @@ func (s *Store) DropRetentionPolicy(database, name string) error {
// CreateShardGroup creates a new shard group in a retention policy for a given time.
func (s *Store) CreateShardGroup(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
// Check the time is valid since we are about to encode it as an int64
if err := models.CheckTime(timestamp); err != nil {
return nil, err
}
if err := s.exec(internal.Command_CreateShardGroupCommand, internal.E_CreateShardGroupCommand_Command,
&internal.CreateShardGroupCommand{
Database: proto.String(database),
@ -1250,7 +1255,11 @@ func (s *Store) CreateShardGroupIfNotExists(database, policy string, timestamp t
// Attempt to create database.
sgi, err := s.CreateShardGroup(database, policy, timestamp)
if err == ErrShardGroupExists {
return s.ShardGroupByTimestamp(database, policy, timestamp)
sgi, err = s.ShardGroupByTimestamp(database, policy, timestamp)
}
// Check that we are returning either an error or a valid shard group.
if sgi == nil && err == nil {
return nil, errors.New("failed to create a new shard group, error unknown.")
}
return sgi, err
}

View File

@ -232,11 +232,32 @@ func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, err
if err != nil {
return nil, err
}
pt.time = time.Unix(0, ts*pt.GetPrecisionMultiplier(precision)).UTC()
pt.time, err = SafeCalcTime(ts, precision)
if err != nil {
return nil, err
}
}
return pt, nil
}
// GetPrecisionMultiplier will return a multiplier for the precision specified.
// Unrecognized precision strings (including "n" and "") map to nanoseconds.
func GetPrecisionMultiplier(precision string) int64 {
	switch precision {
	case "u":
		return int64(time.Microsecond)
	case "ms":
		return int64(time.Millisecond)
	case "s":
		return int64(time.Second)
	case "m":
		return int64(time.Minute)
	case "h":
		return int64(time.Hour)
	default:
		return int64(time.Nanosecond)
	}
}
// scanKey scans buf starting at i for the measurement and tag portion of the point.
// It returns the ending position and the byte slice of key within buf. If there
// are tags, they will be sorted if they are not already.
@ -1028,11 +1049,16 @@ func unescapeStringField(in string) string {
}
// NewPoint returns a new point with the given measurement name, tags, fields and timestamp. If
// an unsupported field value (NaN) is passed, this function returns an error.
// an unsupported field value (NaN) or out of range time is passed, this function returns an error.
func NewPoint(name string, tags Tags, fields Fields, time time.Time) (Point, error) {
if len(fields) == 0 {
return nil, fmt.Errorf("Point without fields is unsupported")
}
if !time.IsZero() {
if err := CheckTime(time); err != nil {
return nil, err
}
}
for key, value := range fields {
if fv, ok := value.(float64); ok {
@ -1200,24 +1226,6 @@ func (p *point) SetPrecision(precision string) {
}
}
// GetPrecisionMultiplier will return a multiplier for the precision specified.
// Any precision other than the listed units falls back to nanoseconds.
func (p *point) GetPrecisionMultiplier(precision string) int64 {
	switch precision {
	case "u":
		return int64(time.Microsecond)
	case "ms":
		return int64(time.Millisecond)
	case "s":
		return int64(time.Second)
	case "m":
		return int64(time.Minute)
	case "h":
		return int64(time.Hour)
	default:
		return int64(time.Nanosecond)
	}
}
func (p *point) String() string {
if p.Time().IsZero() {
return string(p.Key()) + " " + string(p.fields)
@ -1264,7 +1272,7 @@ func (p *point) PrecisionString(precision string) string {
return fmt.Sprintf("%s %s", p.Key(), string(p.fields))
}
return fmt.Sprintf("%s %s %d", p.Key(), string(p.fields),
p.UnixNano()/p.GetPrecisionMultiplier(precision))
p.UnixNano()/GetPrecisionMultiplier(precision))
}
func (p *point) RoundedString(d time.Duration) string {

View File

@ -43,6 +43,24 @@ func BenchmarkParsePointNoTags(b *testing.B) {
}
}
// BenchmarkParsePointWithPrecisionN measures parsing a single point with
// nanosecond precision.
func BenchmarkParsePointWithPrecisionN(b *testing.B) {
	line := `cpu value=1i 1000000000`
	defaultTime := time.Now().UTC()
	// SetBytes only needs to be called once, not on every iteration.
	b.SetBytes(int64(len(line)))
	for i := 0; i < b.N; i++ {
		models.ParsePointsWithPrecision([]byte(line), defaultTime, "n")
	}
}
// BenchmarkParsePointWithPrecisionU measures parsing a single point with
// microsecond precision.
func BenchmarkParsePointWithPrecisionU(b *testing.B) {
	line := `cpu value=1i 1000000000`
	defaultTime := time.Now().UTC()
	// SetBytes only needs to be called once, not on every iteration.
	b.SetBytes(int64(len(line)))
	for i := 0; i < b.N; i++ {
		models.ParsePointsWithPrecision([]byte(line), defaultTime, "u")
	}
}
func BenchmarkParsePointsTagsSorted2(b *testing.B) {
line := `cpu,host=serverA,region=us-west value=1i 1000000000`
for i := 0; i < b.N; i++ {

52
models/time.go Normal file
View File

@ -0,0 +1,52 @@
package models
// Helper time methods since parsing time can easily overflow and we only support a
// specific time range.
import (
"fmt"
"math"
"time"
)
var (
	// MaxNanoTime is the maximum time that can be represented via int64 nanoseconds since the epoch.
	MaxNanoTime = time.Unix(0, math.MaxInt64).UTC()
	// MinNanoTime is the minimum time that can be represented via int64 nanoseconds since the epoch.
	MinNanoTime = time.Unix(0, math.MinInt64).UTC()
	// ErrTimeOutOfRange is returned when a time is outside the representable range using int64 nanoseconds since the epoch.
	ErrTimeOutOfRange = fmt.Errorf("time outside range %s - %s", MinNanoTime, MaxNanoTime)
)
// SafeCalcTime safely calculates the time given a timestamp and its precision.
// It returns ErrTimeOutOfRange if the resulting time cannot be represented as
// int64 nanoseconds since the epoch (the supported range).
func SafeCalcTime(timestamp int64, precision string) (time.Time, error) {
	mult := GetPrecisionMultiplier(precision)
	if t, ok := safeSignedMult(timestamp, mult); ok {
		return time.Unix(0, t).UTC(), nil
	}
	// Multiplication overflowed: the time is outside the representable range.
	return time.Time{}, ErrTimeOutOfRange
}
// Check that a time is within the safe range.
func CheckTime(t time.Time) error {
if t.Before(MinNanoTime) || t.After(MaxNanoTime) {
return ErrTimeOutOfRange
}
return nil
}
// Perform the multiplication and check to make sure it didn't overflow.
func safeSignedMult(a, b int64) (int64, bool) {
if a == 0 || b == 0 || a == 1 || b == 1 {
return a * b, true
}
if a == math.MinInt64 || b == math.MaxInt64 {
return 0, false
}
c := a * b
return c, c/b == a
}

View File

@ -22,13 +22,14 @@ const leaderWaitTimeout = 30 * time.Second
// statistics gathered by the collectd service.
const (
statPointsReceived = "pointsRx"
statBytesReceived = "bytesRx"
statPointsParseFail = "pointsParseFail"
statReadFail = "readFail"
statBatchesTrasmitted = "batchesTx"
statPointsTransmitted = "pointsTx"
statBatchesTransmitFail = "batchesTxFail"
statPointsReceived = "pointsRx"
statBytesReceived = "bytesRx"
statPointsParseFail = "pointsParseFail"
statReadFail = "readFail"
statBatchesTrasmitted = "batchesTx"
statPointsTransmitted = "pointsTx"
statBatchesTransmitFail = "batchesTxFail"
statDroppedPointsInvalid = "droppedPointsInvalid"
)
// pointsWriter is an internal interface to make testing easier.
@ -233,7 +234,7 @@ func (s *Service) handleMessage(buffer []byte) {
return
}
for _, packet := range *packets {
points := Unmarshal(&packet)
points := s.UnmarshalCollectd(&packet)
for _, p := range points {
s.batcher.In() <- p
}
@ -266,7 +267,7 @@ func (s *Service) writePoints() {
}
// Unmarshal translates a collectd packet into InfluxDB data points.
func Unmarshal(packet *gollectd.Packet) []models.Point {
func (s *Service) UnmarshalCollectd(packet *gollectd.Packet) []models.Point {
// Prefer high resolution timestamp.
var timestamp time.Time
if packet.TimeHR > 0 {
@ -302,8 +303,10 @@ func Unmarshal(packet *gollectd.Packet) []models.Point {
tags["type_instance"] = packet.TypeInstance
}
p, err := models.NewPoint(name, tags, fields, timestamp)
// Drop points values of NaN since they are not supported
// Drop invalid points
if err != nil {
s.Logger.Printf("Dropping point %v: %v", p.Name, err)
s.statMap.Add(statDroppedPointsInvalid, 1)
continue
}

View File

@ -5,6 +5,7 @@ import (
"compress/gzip"
"encoding/json"
"errors"
"expvar"
"io"
"log"
"net"
@ -27,6 +28,8 @@ type Handler struct {
}
Logger *log.Logger
statMap *expvar.Map
}
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -113,6 +116,7 @@ func (h *Handler) servePut(w http.ResponseWriter, r *http.Request) {
pt, err := models.NewPoint(p.Metric, p.Tags, map[string]interface{}{"value": p.Value}, ts)
if err != nil {
h.Logger.Printf("Dropping point %v: %v", p.Metric, err)
h.statMap.Add(statDroppedPointsInvalid, 1)
continue
}
points = append(points, pt)

View File

@ -42,6 +42,7 @@ const (
statBatchesTransmitFail = "batchesTxFail"
statConnectionsActive = "connsActive"
statConnectionsHandled = "connsHandled"
statDroppedPointsInvalid = "droppedPointsInvalid"
)
// Service manages the listener and handler for an HTTP endpoint.
@ -367,6 +368,7 @@ func (s *Service) serveHTTP() {
ConsistencyLevel: s.ConsistencyLevel,
PointsWriter: s.PointsWriter,
Logger: s.Logger,
statMap: s.statMap,
}}
srv.Serve(s.httpln)
}