Add series type validation to Engine

This is the start of per-series validation that occurs in the
Engine write path.  It uses an in-memory radix tree to reduce
memory usage and is re-built on demand the first time a series
is written.
pull/9748/head
Jason Wilder 2018-04-17 22:50:07 -06:00
parent 2c8efb2609
commit 2be2418b89
4 changed files with 187 additions and 7 deletions

View File

@ -514,6 +514,38 @@ func (c *Cache) Split(n int) []*Cache {
return caches
}
// Type returns the series type for a key.
func (c *Cache) Type(key []byte) (models.FieldType, error) {
c.mu.RLock()
e := c.store.entry(key)
if e == nil && c.snapshot != nil {
e = c.snapshot.store.entry(key)
}
c.mu.RUnlock()
if e != nil {
typ, err := e.InfluxQLType()
if err != nil {
return models.Empty, tsdb.ErrUnknownFieldType
}
switch typ {
case influxql.Float:
return models.Float, nil
case influxql.Integer:
return models.Integer, nil
case influxql.Unsigned:
return models.Unsigned, nil
case influxql.Boolean:
return models.Boolean, nil
case influxql.String:
return models.String, nil
}
}
return models.Empty, tsdb.ErrUnknownFieldType
}
// Values returns a copy of all values, deduped and sorted, for the given key.
func (c *Cache) Values(key []byte) Values {
var snapshotEntries *entry

View File

@ -26,6 +26,7 @@ import (
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/influxdata/influxdb/pkg/metrics"
"github.com/influxdata/influxdb/pkg/radix"
intar "github.com/influxdata/influxdb/pkg/tar"
"github.com/influxdata/influxdb/pkg/tracing"
"github.com/influxdata/influxdb/query"
@ -187,6 +188,9 @@ type Engine struct {
// provides access to the total set of series IDs
seriesIDSets tsdb.SeriesIDSets
// seriesTypeMap maps a series key to field type
seriesTypeMap *radix.Tree
}
// NewEngine returns a new instance of Engine.
@ -236,6 +240,7 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
compactionLimiter: opt.CompactionLimiter,
scheduler: newScheduler(stats, opt.CompactionLimiter.Capacity()),
seriesIDSets: opt.SeriesIDSets,
seriesTypeMap: radix.New(),
}
if e.traceLogging {
@ -1187,8 +1192,12 @@ func (e *Engine) addToIndexFromKey(keys [][]byte, fieldTypes []influxql.DataType
// It returns an error if new points are added to an existing key.
func (e *Engine) WritePoints(points []models.Point) error {
values := make(map[string][]Value, len(points))
var keyBuf []byte
var baseLen int
var (
keyBuf []byte
baseLen int
seriesErr error
)
for _, p := range points {
keyBuf = append(keyBuf[:0], p.Key()...)
keyBuf = append(keyBuf, keyFieldSeparator...)
@ -1202,6 +1211,36 @@ func (e *Engine) WritePoints(points []models.Point) error {
}
keyBuf = append(keyBuf[:baseLen], iter.FieldKey()...)
kstr := string(keyBuf)
// Fast-path check to see if the field for the series already exists.
if v, ok := e.seriesTypeMap.Get(kstr); !ok {
if typ, err := e.Type(keyBuf); err != nil {
// Field type is unknown, we can try to add it.
} else if typ != iter.Type() {
// Existing type is different from what was passed in, we need to drop
// this write and refresh the series type map.
seriesErr = tsdb.ErrFieldTypeConflict
e.seriesTypeMap.Insert(kstr, int(typ))
continue
}
// Doesn't exsts, so try to insert
vv, ok := e.seriesTypeMap.Insert(kstr, int(iter.Type()))
// We didn't insert and the type that exists isn't what we tried to insert, so
// we have a conflict and must drop this field/series.
if !ok || vv != int(iter.Type()) {
seriesErr = tsdb.ErrFieldTypeConflict
continue
}
} else if v != int(iter.Type()) {
// The series already exists, but with a different type. This is also a type conflict
// and we need to drop this field/series.
seriesErr = tsdb.ErrFieldTypeConflict
continue
}
var v Value
switch iter.Type() {
case models.Float:
@ -1233,7 +1272,7 @@ func (e *Engine) WritePoints(points []models.Point) error {
default:
return fmt.Errorf("unknown field type for %s: %s", string(iter.FieldKey()), p.String())
}
values[string(keyBuf)] = append(values[string(keyBuf)], v)
values[kstr] = append(values[kstr], v)
}
}
@ -1241,15 +1280,16 @@ func (e *Engine) WritePoints(points []models.Point) error {
defer e.mu.RUnlock()
// first try to write to the cache
err := e.Cache.WriteMulti(values)
if err != nil {
if err := e.Cache.WriteMulti(values); err != nil {
return err
}
if e.WALEnabled {
_, err = e.WAL.WriteMulti(values)
if _, err := e.WAL.WriteMulti(values); err != nil {
return err
}
}
return err
return seriesErr
}
// DeleteSeriesRange removes the values between min and max (inclusive) from all series
@ -2843,6 +2883,32 @@ func (e *Engine) IteratorCost(measurement string, opt query.IteratorOptions) (qu
return cost, nil
}
// Type returns FieldType for a series. If the series does not
// exist, ErrUnkownFieldType is returned.
func (e *Engine) Type(series []byte) (models.FieldType, error) {
if typ, err := e.Cache.Type(series); err == nil {
return typ, nil
}
typ, err := e.FileStore.Type(series)
if err != nil {
return 0, err
}
switch typ {
case BlockFloat64:
return models.Float, nil
case BlockInteger:
return models.Integer, nil
case BlockUnsigned:
return models.Unsigned, nil
case BlockString:
return models.String, nil
case BlockBoolean:
return models.Boolean, nil
}
return 0, tsdb.ErrUnknownFieldType
}
func (e *Engine) seriesCost(seriesKey, field string, tmin, tmax int64) query.IteratorCost {
key := SeriesFieldKeyBytes(seriesKey, field)
c := e.FileStore.Cost(key, tmin, tmax)

View File

@ -1892,6 +1892,85 @@ func TestEngine_DisableEnableCompactions_Concurrent(t *testing.T) {
}
}
func TestEngine_WritePoints_TypeConflict(t *testing.T) {
t.Parallel()
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
e := MustOpenEngine(index)
defer e.Close()
if err := e.WritePointsString(
`cpu,host=A value=1.1 1`,
`cpu,host=A value=1i 2`,
); err == nil {
t.Fatalf("expected field type conflict")
} else if err != tsdb.ErrFieldTypeConflict {
t.Fatalf("error mismatch: got %v, exp %v", err, tsdb.ErrFieldTypeConflict)
}
// Series type should be a float
got, err := e.Type([]byte(tsm1.SeriesFieldKey("cpu,host=A", "value")))
if err != nil {
t.Fatalf("unexpected error getting field type: %v", err)
}
if exp := models.Float; got != exp {
t.Fatalf("field type mismatch: got %v, exp %v", got, exp)
}
values := e.Cache.Values([]byte(tsm1.SeriesFieldKey("cpu,host=A", "value")))
if got, exp := len(values), 1; got != exp {
t.Fatalf("values len mismatch: got %v, exp %v", got, exp)
}
})
}
}
func TestEngine_WritePoints_Reload(t *testing.T) {
t.Parallel()
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
e := MustOpenEngine(index)
defer e.Close()
if err := e.WritePointsString(
`cpu,host=A value=1.1 1`,
); err != nil {
t.Fatalf("expected field type conflict")
}
// Series type should be a float
got, err := e.Type([]byte(tsm1.SeriesFieldKey("cpu,host=A", "value")))
if err != nil {
t.Fatalf("unexpected error getting field type: %v", err)
}
if exp := models.Float; got != exp {
t.Fatalf("field type mismatch: got %v, exp %v", got, exp)
}
if err := e.WriteSnapshot(); err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
println("repopen")
if err := e.Reopen(); err != nil {
t.Fatalf("unexpected error reopning engine: %v", err)
}
if err := e.WritePointsString(
`cpu,host=A value=1i 1`,
); err != tsdb.ErrFieldTypeConflict {
t.Fatalf("expected field type conflict: got %v", err)
}
})
}
}
func BenchmarkEngine_WritePoints(b *testing.B) {
batchSizes := []int{10, 100, 1000, 5000, 10000}
for _, sz := range batchSizes {

View File

@ -68,6 +68,9 @@ var (
// the file's magic number.
ErrUnknownFieldsFormat = errors.New("unknown field index format")
// ErrUnknownFieldType is returned when the type of a field cannot be determined.
ErrUnknownFieldType = errors.New("unknown field type")
// ErrShardNotIdle is returned when an operation requring the shard to be idle/cold is
// attempted on a hot shard.
ErrShardNotIdle = errors.New("shard not idle")