From ff1270dfeb32de5e9f322b7f6a323784ba454894 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 7 Apr 2017 11:20:07 -0600 Subject: [PATCH] Fix dropping fields created data corruption The Point is intended to be immutable after being parsed since it is shared by several goroutines. When dropping a field (e.g. time), corrupted data can result if one goroutine is delete the field while another is marshaling the underlying byte slices. To avoid this, the shard will just skip invalid fields and series instead of trying to mutate them by deleting them. --- models/points.go | 23 ------- models/points_test.go | 124 ------------------------------------- tsdb/engine/tsm1/engine.go | 13 +++- tsdb/shard.go | 24 ++++--- tsdb/shard_test.go | 28 ++------- 5 files changed, 34 insertions(+), 178 deletions(-) diff --git a/models/points.go b/models/points.go index 90af497abb..18ae59ab0e 100644 --- a/models/points.go +++ b/models/points.go @@ -159,9 +159,6 @@ type FieldIterator interface { // FloatValue returns the float value of the current field. FloatValue() (float64, error) - // Delete deletes the current field. - Delete() - // Reset resets the iterator to its initial state. Reset() } @@ -1931,26 +1928,6 @@ func (p *point) FloatValue() (float64, error) { return f, nil } -// Delete deletes the current field. -func (p *point) Delete() { - switch { - case p.it.end == p.it.start: - case p.it.end >= len(p.fields): - // Remove the trailing comma if there are more than one fields - p.fields = bytes.TrimSuffix(p.fields[:p.it.start], []byte(",")) - - case p.it.start == 0: - p.fields = p.fields[p.it.end:] - default: - p.fields = append(p.fields[:p.it.start], p.fields[p.it.end:]...) - } - - p.it.end = p.it.start - p.it.key = nil - p.it.valueBuf = nil - p.it.fieldType = Empty -} - // Reset resets the iterator to its initial state. func (p *point) Reset() { p.it.fieldType = Empty diff --git a/models/points_test.go b/models/points_test.go index da49d10adf..7cb79742f7 100644 --- a/models/points_test.go +++ b/models/points_test.go @@ -2164,130 +2164,6 @@ m a=2i,b=3i,c=true,d="stuff",e=-0.23,f=123.456 } } -func TestPoint_FieldIterator_Delete_Begin(t *testing.T) { - points, err := models.ParsePointsString(`m a=1,b=2,c=3`) - if err != nil || len(points) != 1 { - t.Fatal("failed parsing point") - } - - fi := points[0].FieldIterator() - fi.Next() // a - fi.Delete() - - fi.Reset() - - got := toFields(fi) - exp := models.Fields{"b": float64(2), "c": float64(3)} - - if !reflect.DeepEqual(got, exp) { - t.Fatalf("Delete failed, got %#v, exp %#v", got, exp) - } - - if _, err = models.ParsePointsString(points[0].String()); err != nil { - t.Fatalf("Failed to parse point: %v", err) - } -} - -func TestPoint_FieldIterator_Delete_Middle(t *testing.T) { - points, err := models.ParsePointsString(`m a=1,b=2,c=3`) - if err != nil || len(points) != 1 { - t.Fatal("failed parsing point") - } - - fi := points[0].FieldIterator() - fi.Next() // a - fi.Next() // b - fi.Delete() - - fi.Reset() - - got := toFields(fi) - exp := models.Fields{"a": float64(1), "c": float64(3)} - - if !reflect.DeepEqual(got, exp) { - t.Fatalf("Delete failed, got %#v, exp %#v", got, exp) - } - - if _, err = models.ParsePointsString(points[0].String()); err != nil { - t.Fatalf("Failed to parse point: %v", err) - } -} - -func TestPoint_FieldIterator_Delete_End(t *testing.T) { - points, err := models.ParsePointsString(`m a=1,b=2,c=3`) - if err != nil || len(points) != 1 { - t.Fatal("failed parsing point") - } - - fi := points[0].FieldIterator() - fi.Next() // a - fi.Next() // b - fi.Next() // c - fi.Delete() - - fi.Reset() - - got := toFields(fi) - exp := models.Fields{"a": float64(1), "b": float64(2)} - - if !reflect.DeepEqual(got, exp) { - t.Fatalf("Delete failed, got %#v, exp %#v", got, exp) - } - - if _, err = models.ParsePointsString(points[0].String()); err != nil { - t.Fatalf("Failed to parse point: %v", err) - } -} - -func TestPoint_FieldIterator_Delete_Nothing(t *testing.T) { - points, err := models.ParsePointsString(`m a=1,b=2,c=3`) - if err != nil || len(points) != 1 { - t.Fatal("failed parsing point") - } - - fi := points[0].FieldIterator() - fi.Delete() - - fi.Reset() - - got := toFields(fi) - exp := models.Fields{"a": float64(1), "b": float64(2), "c": float64(3)} - - if !reflect.DeepEqual(got, exp) { - t.Fatalf("Delete failed, got %#v, exp %#v", got, exp) - } - - if _, err = models.ParsePointsString(points[0].String()); err != nil { - t.Fatalf("Failed to parse point: %v", err) - } -} - -func TestPoint_FieldIterator_Delete_Twice(t *testing.T) { - points, err := models.ParsePointsString(`m a=1,b=2,c=3`) - if err != nil || len(points) != 1 { - t.Fatal("failed parsing point") - } - - fi := points[0].FieldIterator() - fi.Next() // a - fi.Next() // b - fi.Delete() - fi.Delete() // no-op - - fi.Reset() - - got := toFields(fi) - exp := models.Fields{"a": float64(1), "c": float64(3)} - - if !reflect.DeepEqual(got, exp) { - t.Fatalf("Delete failed, got %#v, exp %#v", got, exp) - } - - if _, err = models.ParsePointsString(points[0].String()); err != nil { - t.Fatalf("Failed to parse point: %v", err) - } -} - func TestEscapeStringField(t *testing.T) { cases := []struct { in string diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index a5d538db5d..26e46fe2e3 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -32,8 +32,12 @@ func init() { tsdb.RegisterEngine("tsm1", NewEngine) } -// Ensure Engine implements the interface. -var _ tsdb.Engine = &Engine{} +var ( + // Ensure Engine implements the interface. + _ tsdb.Engine = &Engine{} + // Static objects to prevent small allocs. + timeBytes = []byte("time") +) const ( // keyFieldSeparator separates the series key from the field name in the composite key @@ -671,6 +675,11 @@ func (e *Engine) WritePoints(points []models.Point) error { iter := p.FieldIterator() t := p.Time().UnixNano() for iter.Next() { + // Skip fields name "time", they are illegal + if bytes.Equal(iter.FieldKey(), timeBytes) { + continue + } + keyBuf = append(keyBuf[:baseLen], iter.FieldKey()...) var v Value switch iter.Type() { diff --git a/tsdb/shard.go b/tsdb/shard.go index 00e89f746c..c23d88a593 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -546,26 +546,26 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point, var skip bool for i, p := range points { skip = false - // verify the tags and fields + + // Drop any series w/ a "time" tag, these are illegal tags := p.Tags() if v := tags.Get(timeBytes); v != nil { - s.logger.Info(fmt.Sprintf("dropping tag 'time' from '%s'\n", p.PrecisionString(""))) - tags.Delete(timeBytes) - p.SetTags(tags) + dropped++ + continue } var validField bool iter := p.FieldIterator() for iter.Next() { if bytes.Equal(iter.FieldKey(), timeBytes) { - s.logger.Info(fmt.Sprintf("dropping field 'time' from '%s'\n", p.PrecisionString(""))) - iter.Delete() continue } validField = true + break } if !validField { + dropped++ continue } @@ -592,10 +592,14 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point, // see if the field definitions need to be saved to the shard mf := s.engine.MeasurementFields(p.Name()) - if mf == nil { var createType influxql.DataType for iter.Next() { + // Skip fields name "time", they are illegal + if bytes.Equal(iter.FieldKey(), timeBytes) { + continue + } + switch iter.Type() { case models.Float: createType = influxql.Float @@ -631,6 +635,12 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point, // validate field types and encode data for iter.Next() { + + // Skip fields name "time", they are illegal + if bytes.Equal(iter.FieldKey(), timeBytes) { + continue + } + var fieldType influxql.DataType switch iter.Type() { case models.Float: diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 54ca5efb60..39fb5d6b70 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -1,7 +1,6 @@ package tsdb_test import ( - "bytes" "fmt" "io/ioutil" "os" @@ -19,7 +18,6 @@ import ( "github.com/influxdata/influxdb/pkg/deep" "github.com/influxdata/influxdb/tsdb" _ "github.com/influxdata/influxdb/tsdb/engine" - "github.com/uber-go/zap" ) // DefaultPrecision is the precision used by the MustWritePointsString() function. @@ -229,12 +227,8 @@ func TestWriteTimeTag(t *testing.T) { time.Unix(1, 2), ) - buf := bytes.NewBuffer(nil) - sh.WithLogger(zap.New(zap.NewTextEncoder(), zap.Output(zap.AddSync(buf)))) - if err := sh.WritePoints([]models.Point{pt}); err != nil { - t.Fatalf("unexpected error: %v", err) - } else if got, exp := buf.String(), "dropping field 'time'"; !strings.Contains(got, exp) { - t.Fatalf("unexpected log message: %s", strings.TrimSpace(got)) + if err := sh.WritePoints([]models.Point{pt}); err == nil { + t.Fatal("expected error: got nil") } m := index.Measurement("cpu") @@ -249,12 +243,8 @@ func TestWriteTimeTag(t *testing.T) { time.Unix(1, 2), ) - buf = bytes.NewBuffer(nil) - sh.WithLogger(zap.New(zap.NewTextEncoder(), zap.Output(zap.AddSync(buf)))) if err := sh.WritePoints([]models.Point{pt}); err != nil { t.Fatalf("unexpected error: %v", err) - } else if got, exp := buf.String(), "dropping field 'time'"; !strings.Contains(got, exp) { - t.Fatalf("unexpected log message: %s", strings.TrimSpace(got)) } m = index.Measurement("cpu") @@ -290,20 +280,14 @@ func TestWriteTimeField(t *testing.T) { time.Unix(1, 2), ) - buf := bytes.NewBuffer(nil) - sh.WithLogger(zap.New(zap.NewTextEncoder(), zap.Output(zap.AddSync(buf)))) - if err := sh.WritePoints([]models.Point{pt}); err != nil { - t.Fatalf("unexpected error: %v", err) - } else if got, exp := buf.String(), "dropping tag 'time'"; !strings.Contains(got, exp) { - t.Fatalf("unexpected log message: %s", strings.TrimSpace(got)) + if err := sh.WritePoints([]models.Point{pt}); err == nil { + t.Fatal("expected error: got nil") } key := models.MakeKey([]byte("cpu"), nil) series := index.Series(string(key)) - if series == nil { - t.Fatal("expected series") - } else if len(series.Tags()) != 0 { - t.Fatalf("unexpected number of tags: got=%v exp=%v", len(series.Tags()), 0) + if series != nil { + t.Fatal("unexpected series") } }