Fix dropping fields created data corruption
The Point is intended to be immutable after being parsed since it is shared by several goroutines. When dropping a field (e.g. time), corrupted data can result if one goroutine is delete the field while another is marshaling the underlying byte slices. To avoid this, the shard will just skip invalid fields and series instead of trying to mutate them by deleting them.pull/8272/head
parent
fad4784f9e
commit
ff1270dfeb
|
@ -159,9 +159,6 @@ type FieldIterator interface {
|
|||
// FloatValue returns the float value of the current field.
|
||||
FloatValue() (float64, error)
|
||||
|
||||
// Delete deletes the current field.
|
||||
Delete()
|
||||
|
||||
// Reset resets the iterator to its initial state.
|
||||
Reset()
|
||||
}
|
||||
|
@ -1931,26 +1928,6 @@ func (p *point) FloatValue() (float64, error) {
|
|||
return f, nil
|
||||
}
|
||||
|
||||
// Delete deletes the current field.
|
||||
func (p *point) Delete() {
|
||||
switch {
|
||||
case p.it.end == p.it.start:
|
||||
case p.it.end >= len(p.fields):
|
||||
// Remove the trailing comma if there are more than one fields
|
||||
p.fields = bytes.TrimSuffix(p.fields[:p.it.start], []byte(","))
|
||||
|
||||
case p.it.start == 0:
|
||||
p.fields = p.fields[p.it.end:]
|
||||
default:
|
||||
p.fields = append(p.fields[:p.it.start], p.fields[p.it.end:]...)
|
||||
}
|
||||
|
||||
p.it.end = p.it.start
|
||||
p.it.key = nil
|
||||
p.it.valueBuf = nil
|
||||
p.it.fieldType = Empty
|
||||
}
|
||||
|
||||
// Reset resets the iterator to its initial state.
|
||||
func (p *point) Reset() {
|
||||
p.it.fieldType = Empty
|
||||
|
|
|
@ -2164,130 +2164,6 @@ m a=2i,b=3i,c=true,d="stuff",e=-0.23,f=123.456
|
|||
}
|
||||
}
|
||||
|
||||
func TestPoint_FieldIterator_Delete_Begin(t *testing.T) {
|
||||
points, err := models.ParsePointsString(`m a=1,b=2,c=3`)
|
||||
if err != nil || len(points) != 1 {
|
||||
t.Fatal("failed parsing point")
|
||||
}
|
||||
|
||||
fi := points[0].FieldIterator()
|
||||
fi.Next() // a
|
||||
fi.Delete()
|
||||
|
||||
fi.Reset()
|
||||
|
||||
got := toFields(fi)
|
||||
exp := models.Fields{"b": float64(2), "c": float64(3)}
|
||||
|
||||
if !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("Delete failed, got %#v, exp %#v", got, exp)
|
||||
}
|
||||
|
||||
if _, err = models.ParsePointsString(points[0].String()); err != nil {
|
||||
t.Fatalf("Failed to parse point: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPoint_FieldIterator_Delete_Middle(t *testing.T) {
|
||||
points, err := models.ParsePointsString(`m a=1,b=2,c=3`)
|
||||
if err != nil || len(points) != 1 {
|
||||
t.Fatal("failed parsing point")
|
||||
}
|
||||
|
||||
fi := points[0].FieldIterator()
|
||||
fi.Next() // a
|
||||
fi.Next() // b
|
||||
fi.Delete()
|
||||
|
||||
fi.Reset()
|
||||
|
||||
got := toFields(fi)
|
||||
exp := models.Fields{"a": float64(1), "c": float64(3)}
|
||||
|
||||
if !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("Delete failed, got %#v, exp %#v", got, exp)
|
||||
}
|
||||
|
||||
if _, err = models.ParsePointsString(points[0].String()); err != nil {
|
||||
t.Fatalf("Failed to parse point: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPoint_FieldIterator_Delete_End(t *testing.T) {
|
||||
points, err := models.ParsePointsString(`m a=1,b=2,c=3`)
|
||||
if err != nil || len(points) != 1 {
|
||||
t.Fatal("failed parsing point")
|
||||
}
|
||||
|
||||
fi := points[0].FieldIterator()
|
||||
fi.Next() // a
|
||||
fi.Next() // b
|
||||
fi.Next() // c
|
||||
fi.Delete()
|
||||
|
||||
fi.Reset()
|
||||
|
||||
got := toFields(fi)
|
||||
exp := models.Fields{"a": float64(1), "b": float64(2)}
|
||||
|
||||
if !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("Delete failed, got %#v, exp %#v", got, exp)
|
||||
}
|
||||
|
||||
if _, err = models.ParsePointsString(points[0].String()); err != nil {
|
||||
t.Fatalf("Failed to parse point: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPoint_FieldIterator_Delete_Nothing(t *testing.T) {
|
||||
points, err := models.ParsePointsString(`m a=1,b=2,c=3`)
|
||||
if err != nil || len(points) != 1 {
|
||||
t.Fatal("failed parsing point")
|
||||
}
|
||||
|
||||
fi := points[0].FieldIterator()
|
||||
fi.Delete()
|
||||
|
||||
fi.Reset()
|
||||
|
||||
got := toFields(fi)
|
||||
exp := models.Fields{"a": float64(1), "b": float64(2), "c": float64(3)}
|
||||
|
||||
if !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("Delete failed, got %#v, exp %#v", got, exp)
|
||||
}
|
||||
|
||||
if _, err = models.ParsePointsString(points[0].String()); err != nil {
|
||||
t.Fatalf("Failed to parse point: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPoint_FieldIterator_Delete_Twice(t *testing.T) {
|
||||
points, err := models.ParsePointsString(`m a=1,b=2,c=3`)
|
||||
if err != nil || len(points) != 1 {
|
||||
t.Fatal("failed parsing point")
|
||||
}
|
||||
|
||||
fi := points[0].FieldIterator()
|
||||
fi.Next() // a
|
||||
fi.Next() // b
|
||||
fi.Delete()
|
||||
fi.Delete() // no-op
|
||||
|
||||
fi.Reset()
|
||||
|
||||
got := toFields(fi)
|
||||
exp := models.Fields{"a": float64(1), "c": float64(3)}
|
||||
|
||||
if !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("Delete failed, got %#v, exp %#v", got, exp)
|
||||
}
|
||||
|
||||
if _, err = models.ParsePointsString(points[0].String()); err != nil {
|
||||
t.Fatalf("Failed to parse point: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEscapeStringField(t *testing.T) {
|
||||
cases := []struct {
|
||||
in string
|
||||
|
|
|
@ -32,8 +32,12 @@ func init() {
|
|||
tsdb.RegisterEngine("tsm1", NewEngine)
|
||||
}
|
||||
|
||||
// Ensure Engine implements the interface.
|
||||
var _ tsdb.Engine = &Engine{}
|
||||
var (
|
||||
// Ensure Engine implements the interface.
|
||||
_ tsdb.Engine = &Engine{}
|
||||
// Static objects to prevent small allocs.
|
||||
timeBytes = []byte("time")
|
||||
)
|
||||
|
||||
const (
|
||||
// keyFieldSeparator separates the series key from the field name in the composite key
|
||||
|
@ -671,6 +675,11 @@ func (e *Engine) WritePoints(points []models.Point) error {
|
|||
iter := p.FieldIterator()
|
||||
t := p.Time().UnixNano()
|
||||
for iter.Next() {
|
||||
// Skip fields name "time", they are illegal
|
||||
if bytes.Equal(iter.FieldKey(), timeBytes) {
|
||||
continue
|
||||
}
|
||||
|
||||
keyBuf = append(keyBuf[:baseLen], iter.FieldKey()...)
|
||||
var v Value
|
||||
switch iter.Type() {
|
||||
|
|
|
@ -546,26 +546,26 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
|
|||
var skip bool
|
||||
for i, p := range points {
|
||||
skip = false
|
||||
// verify the tags and fields
|
||||
|
||||
// Drop any series w/ a "time" tag, these are illegal
|
||||
tags := p.Tags()
|
||||
if v := tags.Get(timeBytes); v != nil {
|
||||
s.logger.Info(fmt.Sprintf("dropping tag 'time' from '%s'\n", p.PrecisionString("")))
|
||||
tags.Delete(timeBytes)
|
||||
p.SetTags(tags)
|
||||
dropped++
|
||||
continue
|
||||
}
|
||||
|
||||
var validField bool
|
||||
iter := p.FieldIterator()
|
||||
for iter.Next() {
|
||||
if bytes.Equal(iter.FieldKey(), timeBytes) {
|
||||
s.logger.Info(fmt.Sprintf("dropping field 'time' from '%s'\n", p.PrecisionString("")))
|
||||
iter.Delete()
|
||||
continue
|
||||
}
|
||||
validField = true
|
||||
break
|
||||
}
|
||||
|
||||
if !validField {
|
||||
dropped++
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -592,10 +592,14 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
|
|||
|
||||
// see if the field definitions need to be saved to the shard
|
||||
mf := s.engine.MeasurementFields(p.Name())
|
||||
|
||||
if mf == nil {
|
||||
var createType influxql.DataType
|
||||
for iter.Next() {
|
||||
// Skip fields name "time", they are illegal
|
||||
if bytes.Equal(iter.FieldKey(), timeBytes) {
|
||||
continue
|
||||
}
|
||||
|
||||
switch iter.Type() {
|
||||
case models.Float:
|
||||
createType = influxql.Float
|
||||
|
@ -631,6 +635,12 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
|
|||
|
||||
// validate field types and encode data
|
||||
for iter.Next() {
|
||||
|
||||
// Skip fields name "time", they are illegal
|
||||
if bytes.Equal(iter.FieldKey(), timeBytes) {
|
||||
continue
|
||||
}
|
||||
|
||||
var fieldType influxql.DataType
|
||||
switch iter.Type() {
|
||||
case models.Float:
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package tsdb_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
@ -19,7 +18,6 @@ import (
|
|||
"github.com/influxdata/influxdb/pkg/deep"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
_ "github.com/influxdata/influxdb/tsdb/engine"
|
||||
"github.com/uber-go/zap"
|
||||
)
|
||||
|
||||
// DefaultPrecision is the precision used by the MustWritePointsString() function.
|
||||
|
@ -229,12 +227,8 @@ func TestWriteTimeTag(t *testing.T) {
|
|||
time.Unix(1, 2),
|
||||
)
|
||||
|
||||
buf := bytes.NewBuffer(nil)
|
||||
sh.WithLogger(zap.New(zap.NewTextEncoder(), zap.Output(zap.AddSync(buf))))
|
||||
if err := sh.WritePoints([]models.Point{pt}); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
} else if got, exp := buf.String(), "dropping field 'time'"; !strings.Contains(got, exp) {
|
||||
t.Fatalf("unexpected log message: %s", strings.TrimSpace(got))
|
||||
if err := sh.WritePoints([]models.Point{pt}); err == nil {
|
||||
t.Fatal("expected error: got nil")
|
||||
}
|
||||
|
||||
m := index.Measurement("cpu")
|
||||
|
@ -249,12 +243,8 @@ func TestWriteTimeTag(t *testing.T) {
|
|||
time.Unix(1, 2),
|
||||
)
|
||||
|
||||
buf = bytes.NewBuffer(nil)
|
||||
sh.WithLogger(zap.New(zap.NewTextEncoder(), zap.Output(zap.AddSync(buf))))
|
||||
if err := sh.WritePoints([]models.Point{pt}); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
} else if got, exp := buf.String(), "dropping field 'time'"; !strings.Contains(got, exp) {
|
||||
t.Fatalf("unexpected log message: %s", strings.TrimSpace(got))
|
||||
}
|
||||
|
||||
m = index.Measurement("cpu")
|
||||
|
@ -290,20 +280,14 @@ func TestWriteTimeField(t *testing.T) {
|
|||
time.Unix(1, 2),
|
||||
)
|
||||
|
||||
buf := bytes.NewBuffer(nil)
|
||||
sh.WithLogger(zap.New(zap.NewTextEncoder(), zap.Output(zap.AddSync(buf))))
|
||||
if err := sh.WritePoints([]models.Point{pt}); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
} else if got, exp := buf.String(), "dropping tag 'time'"; !strings.Contains(got, exp) {
|
||||
t.Fatalf("unexpected log message: %s", strings.TrimSpace(got))
|
||||
if err := sh.WritePoints([]models.Point{pt}); err == nil {
|
||||
t.Fatal("expected error: got nil")
|
||||
}
|
||||
|
||||
key := models.MakeKey([]byte("cpu"), nil)
|
||||
series := index.Series(string(key))
|
||||
if series == nil {
|
||||
t.Fatal("expected series")
|
||||
} else if len(series.Tags()) != 0 {
|
||||
t.Fatalf("unexpected number of tags: got=%v exp=%v", len(series.Tags()), 0)
|
||||
if series != nil {
|
||||
t.Fatal("unexpected series")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue