From aa17ef55f925c1670b238332721af79280d0fce6 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 14 Nov 2017 14:33:16 +0000 Subject: [PATCH] Implement FGA on SHOW SERIES --- internal/authorizer.go | 38 ++++++++++++++ tsdb/index/tsi1/index.go | 8 +++ tsdb/shard_test.go | 111 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 157 insertions(+) create mode 100644 internal/authorizer.go diff --git a/internal/authorizer.go b/internal/authorizer.go new file mode 100644 index 0000000000..07847f5258 --- /dev/null +++ b/internal/authorizer.go @@ -0,0 +1,38 @@ +package internal + +import ( + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxql" +) + +// AuthorizerMock is a mockable implementation of a query.Authorizer. +type AuthorizerMock struct { + AuthorizeDatabaseFn func(influxql.Privilege, string) bool + AuthorizeQueryFn func(database string, query *influxql.Query) error + AuthorizeSeriesReadFn func(database string, measurement []byte, tags models.Tags) bool + AuthorizeSeriesWriteFn func(database string, measurement []byte, tags models.Tags) bool +} + +// AuthorizeDatabase determines if the provided privilege is sufficient to +// authorise access to the database. +func (a *AuthorizerMock) AuthorizeDatabase(p influxql.Privilege, name string) bool { + return a.AuthorizeDatabaseFn(p, name) +} + +// AuthorizeQuery determins if the query can be executed against the provided +// database. +func (a *AuthorizerMock) AuthorizeQuery(database string, query *influxql.Query) error { + return a.AuthorizeQueryFn(database, query) +} + +// AuthorizeSeriesRead determines if the series comprising measurement and tags +// can be read on the provided database. +func (a *AuthorizerMock) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool { + return a.AuthorizeSeriesReadFn(database, measurement, tags) +} + +// AuthorizeSeriesWrite determines if the series comprising measurement and tags +// can be written to, on the provided database. +func (a *AuthorizerMock) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool { + return a.AuthorizeSeriesWriteFn(database, measurement, tags) +} diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 4dd2a4a5ae..e92c367ffe 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -1239,6 +1239,14 @@ func (itr *seriesPointIterator) Next() (*query.FloatPoint, error) { continue } + // TODO(edd): It seems to me like this authorisation check should be + // further down in the index. At this point we're going to be filtering + // series that have already been materialised in the LogFiles and + // IndexFiles. + if itr.opt.Authorizer != nil && !itr.opt.Authorizer.AuthorizeSeriesRead(itr.fs.database, e.Name(), e.Tags()) { + continue + } + // Convert to a key. key := string(models.MakeKey(e.Name(), e.Tags())) diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 5221435720..189d23238a 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -1,6 +1,7 @@ package tsdb_test import ( + "bytes" "context" "fmt" "io/ioutil" @@ -14,6 +15,8 @@ import ( "testing" "time" + "github.com/influxdata/influxdb/internal" + "github.com/davecgh/go-spew/spew" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" @@ -774,6 +777,105 @@ cpu,host=serverB,region=uswest value=25 0 } } +func TestShard_CreateIterator_Series_Auth(t *testing.T) { + var sh *Shard + var itr query.Iterator + + type variant struct { + name string + m *influxql.Measurement + aux []influxql.VarRef + } + + examples := []variant{ + { + name: "use_index", + m: &influxql.Measurement{Name: "cpu"}, + aux: []influxql.VarRef{{Val: "_seriesKey", Type: influxql.String}}, + }, + { + name: "use_cursors", + m: &influxql.Measurement{Name: "cpu", SystemIterator: "_series"}, + aux: []influxql.VarRef{{Val: "key", Type: influxql.String}}, + }, + } + + test := func(index string, v variant) error { + sh = MustNewOpenShard(index) + sh.MustWritePointsString(` +cpu,host=serverA,region=uswest value=100 0 +cpu,host=serverA,region=uswest value=50,val2=5 10 +cpu,host=serverB,region=uswest value=25 0 +cpu,secret=foo value=100 0 +`) + + seriesAuthorizer := &internal.AuthorizerMock{ + AuthorizeSeriesReadFn: func(database string, measurement []byte, tags models.Tags) bool { + if database == "" || !bytes.Equal(measurement, []byte("cpu")) || tags.GetString("secret") != "" { + t.Logf("Rejecting series db=%s, m=%s, tags=%v", database, measurement, tags) + return false + } + return true + }, + } + + // Create iterator for case where we use cursors (e.g., where time + // included in a SHOW SERIES query). + var err error + itr, err = sh.CreateIterator(context.Background(), v.m, query.IteratorOptions{ + Aux: v.aux, + Ascending: true, + StartTime: influxql.MinTime, + EndTime: influxql.MaxTime, + Authorizer: seriesAuthorizer, + }) + if err != nil { + return err + } + + if itr == nil { + return fmt.Errorf("iterator is nil") + } + + fitr := itr.(query.FloatIterator) + defer fitr.Close() + var expCount = 2 + var gotCount int + for { + f, err := fitr.Next() + if err != nil { + return err + } + + if f == nil { + break + } + + if got := f.Aux[0].(string); strings.Contains(got, "secret") { + return fmt.Errorf("got a series %q that should be filtered", got) + } + gotCount++ + } + + if gotCount != expCount { + return fmt.Errorf("got %d series, expected %d", gotCount, expCount) + } + return nil + } + + for _, index := range tsdb.RegisteredIndexes() { + for _, example := range examples { + t.Run(index+"_"+example.name, func(t *testing.T) { + if err := test(index, example); err != nil { + t.Fatal(err) + } + }) + } + sh.Close() + itr.Close() + } +} + func TestShard_Disabled_WriteQuery(t *testing.T) { var sh *Shard @@ -1564,6 +1666,15 @@ func NewShard(index string) *Shard { } } +// MustNewOpenShard creates and opens a shard with the provided index. +func MustNewOpenShard(index string) *Shard { + sh := NewShard(index) + if err := sh.Open(); err != nil { + panic(err) + } + return sh +} + // Close closes the shard and removes all underlying data. func (sh *Shard) Close() error { defer os.RemoveAll(sh.path)