feat(storage): Enforce single org for series key reads

pull/12069/head
Stuart Carnie 2019-02-15 14:30:50 -07:00
parent a8bdc9c9ab
commit 01b5fccfbe
4 changed files with 73 additions and 26 deletions

View File

@ -336,7 +336,8 @@ func (e *Engine) CreateSeriesCursor(ctx context.Context, req SeriesCursorRequest
if e.closing == nil {
return nil, ErrEngineClosed
}
return newSeriesCursor(req, e.index, cond)
return newSeriesCursor(req, e.index, e.sfile, cond)
}
// CreateCursorIterator creates a CursorIterator for usage with the read service.

View File

@ -47,9 +47,6 @@ func newIndexSeriesCursor(ctx context.Context, src *readSource, req *datatypes.R
}
p := &indexSeriesCursor{row: reads.SeriesRow{Query: tsdb.CursorIterators{queries}}}
m := tsdb.EncodeName(platform.ID(src.OrganizationID), platform.ID(src.BucketID))
mi := tsdb.NewMeasurementSliceIterator([][]byte{m[:]})
if root := req.Predicate.GetRoot(); root != nil {
if p.cond, err = reads.NodeToExpr(root, nil); err != nil {
return nil, err
@ -66,7 +63,10 @@ func newIndexSeriesCursor(ctx context.Context, src *readSource, req *datatypes.R
}
}
p.sqry, err = engine.CreateSeriesCursor(ctx, storage.SeriesCursorRequest{Measurements: mi}, opt.Condition)
scr := storage.SeriesCursorRequest{
Name: tsdb.EncodeName(platform.ID(src.OrganizationID), platform.ID(src.BucketID)),
}
p.sqry, err = engine.CreateSeriesCursor(ctx, scr, opt.Condition)
if err != nil {
p.Close()
return nil, err

View File

@ -1,6 +1,7 @@
package storage
import (
"bytes"
"errors"
"sort"
"sync"
@ -11,20 +12,28 @@ import (
"github.com/influxdata/influxql"
)
var (
errUnexpectedOrg = errors.New("seriesCursor: unexpected org")
errUnexpectedTagComparisonOperator = errors.New("seriesCursor: unexpected tag comparison operator")
)
type SeriesCursor interface {
Close() error
Next() (*SeriesCursorRow, error)
}
type SeriesCursorRequest struct {
Measurements tsdb.MeasurementIterator
// Name contains the tsdb encoded org and bucket ID
Name [16]byte
}
// seriesCursor is an implementation of SeriesCursor over an tsi1.Index.
type seriesCursor struct {
once sync.Once
index *tsi1.Index
sfile *tsdb.SeriesFile
mitr tsdb.MeasurementIterator
orgID []byte
keys [][]byte
ofs int
row SeriesCursorRow
@ -37,25 +46,19 @@ type SeriesCursorRow struct {
}
// newSeriesCursor returns a new instance of SeriesCursor.
func newSeriesCursor(req SeriesCursorRequest, index *tsi1.Index, cond influxql.Expr) (_ SeriesCursor, err error) {
// Only equality operators are allowed.
influxql.WalkFunc(cond, func(node influxql.Node) {
switch n := node.(type) {
case *influxql.BinaryExpr:
switch n.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX, influxql.OR, influxql.AND:
default:
err = errors.New("invalid tag comparison operator")
func newSeriesCursor(req SeriesCursorRequest, index *tsi1.Index, sfile *tsdb.SeriesFile, cond influxql.Expr) (_ SeriesCursor, err error) {
if cond != nil {
var err error
influxql.WalkFunc(cond, func(node influxql.Node) {
switch n := node.(type) {
case *influxql.BinaryExpr:
switch n.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX, influxql.OR, influxql.AND:
default:
err = errUnexpectedTagComparisonOperator
}
}
}
})
if err != nil {
return nil, err
}
mitr := req.Measurements
if mitr == nil {
mitr, err = index.MeasurementIterator()
})
if err != nil {
return nil, err
}
@ -63,7 +66,9 @@ func newSeriesCursor(req SeriesCursorRequest, index *tsi1.Index, cond influxql.E
return &seriesCursor{
index: index,
mitr: mitr,
sfile: sfile,
mitr: tsdb.NewMeasurementSliceIterator([][]byte{req.Name[:]}),
orgID: req.Name[:8],
cond: cond,
}, nil
}
@ -98,6 +103,9 @@ func (cur *seriesCursor) Next() (*SeriesCursorRow, error) {
}
cur.row.Name, cur.row.Tags = tsdb.ParseSeriesKey(cur.keys[cur.ofs])
if !bytes.HasPrefix(cur.row.Name, cur.orgID) {
return nil, errUnexpectedOrg
}
cur.ofs++
return &cur.row, nil
}
@ -123,7 +131,7 @@ func (cur *seriesCursor) readSeriesKeys(name []byte) error {
break
}
key := cur.index.SeriesFile().SeriesKey(elem.SeriesID)
key := cur.sfile.SeriesKey(elem.SeriesID)
if len(key) == 0 {
continue
}

View File

@ -0,0 +1,38 @@
package storage
import (
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/tsdb"
)
func Test_NewSeriesCursor_UnexpectedOrg(t *testing.T) {
makeKey := func(org, bucket uint64) []byte {
name := tsdb.EncodeName(influxdb.ID(org), influxdb.ID(bucket))
return tsdb.AppendSeriesKey(nil, name[:], nil)
}
nm := tsdb.EncodeName(0x0f0f, 0xb0b0)
cur := &seriesCursor{
keys: [][]byte{
makeKey(0x0f0f, 0xb0b0),
makeKey(0xffff, 0xb0b0),
},
orgID: nm[:8],
}
_, err := cur.Next()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
_, err = cur.Next()
if err == nil {
t.Fatal("expected error")
}
if !cmp.Equal(err.Error(), errUnexpectedOrg.Error()) {
t.Errorf("unexpected error -got/+exp\n%s", cmp.Diff(err.Error(), errUnexpectedOrg.Error()))
}
}