Merge pull request #12349 from influxdata/jmw-validate-before-cache

storage: detect conflicting types in a single batch of points
Jeff Wendling 2019-03-06 11:14:20 -07:00 committed by GitHub
commit 8a2ac94b48
4 changed files with 69 additions and 17 deletions
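
In short: when a single batch writes two different field types to the same series and field, the write now comes back as a partial write and the conflicting point is dropped, instead of both types being handed to the cache. The snippet below is an illustrative sketch only, not part of the diff; it mirrors the new TestEngine_WriteConflictingBatch further down and the tsm1 WritePoints flow, and it assumes the exported tsdb/tsm1 package path for CollectionToValues.

package main

import (
	"fmt"
	"time"

	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/tsdb"
	"github.com/influxdata/influxdb/tsdb/tsm1"
)

func main() {
	// Same measurement, tags, field, and timestamp, but "value" is a float in
	// the first point and an integer in the second.
	pt1 := models.MustNewPoint("cpu",
		models.NewTags(map[string]string{"host": "server"}),
		map[string]interface{}{"value": 1.0},
		time.Unix(1, 2))
	pt2 := models.MustNewPoint("cpu",
		models.NewTags(map[string]string{"host": "server"}),
		map[string]interface{}{"value": 2}, // integer: conflicts with the float above
		time.Unix(1, 2))

	collection := tsdb.NewSeriesCollection([]models.Point{pt1, pt2})

	// The conversion keeps the first type it sees for a series+field, drops the
	// conflicting entry, and records the reason on the collection.
	values, err := tsm1.CollectionToValues(collection)
	if err != nil {
		panic(err)
	}

	fmt.Println("series/field entries kept:", len(values))
	fmt.Println("partial write error:", collection.PartialWriteError())
}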

View File

@@ -5,11 +5,12 @@ import (
	"context"
	"errors"
	"fmt"
-	"github.com/opentracing/opentracing-go"
	"math"
	"sync"
	"time"

+	"github.com/opentracing/opentracing-go"
+
	platform "github.com/influxdata/influxdb"
	"github.com/influxdata/influxdb/logger"
	"github.com/influxdata/influxdb/models"
@@ -404,8 +405,8 @@ func (e *Engine) WritePoints(ctx context.Context, points []models.Point) error {
		return ErrEngineClosed
	}

-	// Convert the points to values for adding to the WAL/Cache.
-	values, err := tsm1.PointsToValues(collection.Points)
+	// Convert the collection to values for adding to the WAL/Cache.
+	values, err := tsm1.CollectionToValues(collection)
	if err != nil {
		return err
	}
@@ -433,7 +434,7 @@ func (e *Engine) writePointsLocked(collection *tsdb.SeriesCollection, values map
	// more than the points so we need to recreate them.
	if collection.PartialWriteError() != nil {
		var err error
-		values, err = tsm1.PointsToValues(collection.Points)
+		values, err = tsm1.CollectionToValues(collection)
		if err != nil {
			return err
		}

View File

@@ -132,7 +132,7 @@ func TestEngine_WriteAddNewField(t *testing.T) {

	err := engine.Write1xPoints([]models.Point{pt})
	if err != nil {
-		t.Fatalf(err.Error())
+		t.Fatal(err)
	}

	pt = models.MustNewPoint(
@@ -144,7 +144,7 @@ func TestEngine_WriteAddNewField(t *testing.T) {

	err = engine.Write1xPoints([]models.Point{pt})
	if err != nil {
-		t.Fatalf(err.Error())
+		t.Fatal(err)
	}

	if got, exp := engine.SeriesCardinality(), int64(2); got != exp {
@@ -166,7 +166,7 @@ func TestEngine_DeleteBucket(t *testing.T) {

	err := engine.Write1xPoints([]models.Point{pt})
	if err != nil {
-		t.Fatalf(err.Error())
+		t.Fatal(err)
	}

	pt = models.MustNewPoint(
@@ -179,7 +179,7 @@ func TestEngine_DeleteBucket(t *testing.T) {
	// Same org, different bucket.
	err = engine.Write1xPointsWithOrgBucket([]models.Point{pt}, "3131313131313131", "8888888888888888")
	if err != nil {
-		t.Fatalf(err.Error())
+		t.Fatal(err)
	}

	if got, exp := engine.SeriesCardinality(), int64(3); got != exp {
@@ -230,7 +230,7 @@ func TestEngineClose_RemoveIndex(t *testing.T) {

	err := engine.Write1xPoints([]models.Point{pt})
	if err != nil {
-		t.Fatalf(err.Error())
+		t.Fatal(err)
	}

	if got, exp := engine.SeriesCardinality(), int64(1); got != exp {
@@ -266,6 +266,30 @@ func TestEngine_WALDisabled(t *testing.T) {
	}
}

+func TestEngine_WriteConflictingBatch(t *testing.T) {
+	engine := NewDefaultEngine()
+	defer engine.Close()
+	engine.MustOpen()
+
+	pt1 := models.MustNewPoint(
+		"cpu",
+		models.NewTags(map[string]string{"host": "server"}),
+		map[string]interface{}{"value": 1.0},
+		time.Unix(1, 2),
+	)
+	pt2 := models.MustNewPoint(
+		"cpu",
+		models.NewTags(map[string]string{"host": "server"}),
+		map[string]interface{}{"value": 2},
+		time.Unix(1, 2),
+	)
+
+	err := engine.Write1xPoints([]models.Point{pt1, pt2})
+	if _, ok := err.(tsdb.PartialWriteError); !ok {
+		t.Fatal("expected partial write error. got:", err)
+	}
+}
+
func BenchmarkDeleteBucket(b *testing.B) {
	var engine *Engine
	setup := func(card int) {

View File

@@ -588,12 +588,18 @@ func (e *Engine) Free() error {

// WritePoints saves the set of points in the engine.
func (e *Engine) WritePoints(points []models.Point) error {
-	values, err := PointsToValues(points)
+	collection := tsdb.NewSeriesCollection(points)
+
+	values, err := CollectionToValues(collection)
	if err != nil {
		return err
	}

-	return e.WriteValues(values)
+	if err := e.WriteValues(values); err != nil {
+		return err
+	}
+
+	return collection.PartialWriteError()
}

// WriteValues saves the set of values in the engine.
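
Since WritePoints now surfaces collection.PartialWriteError() after the values are written, a caller can treat a batch with dropped points differently from a write that failed outright. The helper below is a hypothetical caller-side sketch (writeBatch is not part of this change); it assumes tsdb.PartialWriteError carries the Reason and Dropped bookkeeping that CollectionToValues records on the collection.

package example

import (
	"log"

	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/tsdb"
	"github.com/influxdata/influxdb/tsdb/tsm1"
)

// writeBatch is a hypothetical helper: a partial write (some points dropped,
// e.g. conflicting field types within one batch) is logged and tolerated,
// while any other error is returned to the caller.
func writeBatch(e *tsm1.Engine, points []models.Point) error {
	err := e.WritePoints(points)
	if err == nil {
		return nil
	}
	if pwe, ok := err.(tsdb.PartialWriteError); ok {
		log.Printf("partial write: dropped %d point(s): %s", pwe.Dropped, pwe.Reason)
		return nil
	}
	return err
}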

View File

@@ -5,6 +5,7 @@ import (
	"time"

	"github.com/influxdata/influxdb/models"
+	"github.com/influxdata/influxdb/tsdb"
	"github.com/influxdata/influxdb/tsdb/value"
)
@@ -50,21 +51,25 @@ func NewBooleanValue(t int64, v bool) Value { return value.NewBooleanValue(t, v)

// NewStringValue returns a new string value.
func NewStringValue(t int64, v string) Value { return value.NewStringValue(t, v) }

-// PointsToValues takes in a slice of points and returns it as a map of series key to
+// CollectionToValues takes in a series collection and returns it as a map of series key to
// values. It returns an error if any of the points could not be converted.
-func PointsToValues(points []models.Point) (map[string][]Value, error) {
-	values := make(map[string][]Value, len(points))
+func CollectionToValues(collection *tsdb.SeriesCollection) (map[string][]Value, error) {
+	values := make(map[string][]Value, collection.Length())
	var (
		keyBuf  []byte
		baseLen int
	)
-	for _, p := range points {
-		keyBuf = append(keyBuf[:0], p.Key()...)
+
+	j := 0
+	for citer := collection.Iterator(); citer.Next(); {
+		keyBuf = append(keyBuf[:0], citer.Key()...)
		keyBuf = append(keyBuf, keyFieldSeparator...)
		baseLen = len(keyBuf)

+		p := citer.Point()
		iter := p.FieldIterator()
		t := p.Time().UnixNano()
+
		for iter.Next() {
			keyBuf = append(keyBuf[:baseLen], iter.FieldKey()...)
@@ -100,10 +105,26 @@ func PointsToValues(points []models.Point) (map[string][]Value, error) {
				return nil, fmt.Errorf("unknown field type for %s: %s",
					string(iter.FieldKey()), p.String())
			}

-			values[string(keyBuf)] = append(values[string(keyBuf)], v)
+			vs, ok := values[string(keyBuf)]
+			if ok && len(vs) > 0 && valueType(vs[0]) != valueType(v) {
+				if collection.Reason == "" {
+					collection.Reason = fmt.Sprintf(
+						"conflicting field type: %s has field type %T but expected %T",
+						citer.Key(), v.Value(), vs[0].Value())
+				}
+				collection.Dropped++
+				collection.DroppedKeys = append(collection.DroppedKeys, citer.Key())
+
+				continue
+			}
+
+			values[string(keyBuf)] = append(vs, v)
+			collection.Copy(j, citer.Index())
+			j++
		}
	}

+	collection.Truncate(j)
	return values, nil
}
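
For reference, the map that CollectionToValues returns is keyed by the series key joined with the field key, with one slice of values per entry; conflicts never reach the map and instead show up as Dropped/DroppedKeys/Reason on the collection. The helper below is an illustrative sketch (dumpValues is not part of this diff), using only identifiers that appear in the hunks above.

package example

import (
	"fmt"

	"github.com/influxdata/influxdb/tsdb"
	"github.com/influxdata/influxdb/tsdb/tsm1"
)

// dumpValues converts a collection and prints what would be handed to the
// cache/WAL, plus any conflict bookkeeping recorded during conversion.
func dumpValues(collection *tsdb.SeriesCollection) error {
	values, err := tsm1.CollectionToValues(collection)
	if err != nil {
		return err
	}
	for key, vs := range values {
		// key is the series key plus the field key.
		fmt.Printf("%q -> %d value(s), first is %T\n", key, len(vs), vs[0].Value())
	}
	if collection.Dropped > 0 {
		fmt.Printf("dropped %d point(s): %s\n", collection.Dropped, collection.Reason)
	}
	return collection.PartialWriteError()
}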