Implement FGA on SHOW SERIES
parent
8acab9b5ac
commit
aa17ef55f9
|
@ -0,0 +1,38 @@
|
|||
package internal
|
||||
|
||||
import (
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxql"
|
||||
)
|
||||
|
||||
// AuthorizerMock is a mockable implementation of a query.Authorizer.
|
||||
type AuthorizerMock struct {
|
||||
AuthorizeDatabaseFn func(influxql.Privilege, string) bool
|
||||
AuthorizeQueryFn func(database string, query *influxql.Query) error
|
||||
AuthorizeSeriesReadFn func(database string, measurement []byte, tags models.Tags) bool
|
||||
AuthorizeSeriesWriteFn func(database string, measurement []byte, tags models.Tags) bool
|
||||
}
|
||||
|
||||
// AuthorizeDatabase determines if the provided privilege is sufficient to
|
||||
// authorise access to the database.
|
||||
func (a *AuthorizerMock) AuthorizeDatabase(p influxql.Privilege, name string) bool {
|
||||
return a.AuthorizeDatabaseFn(p, name)
|
||||
}
|
||||
|
||||
// AuthorizeQuery determins if the query can be executed against the provided
|
||||
// database.
|
||||
func (a *AuthorizerMock) AuthorizeQuery(database string, query *influxql.Query) error {
|
||||
return a.AuthorizeQueryFn(database, query)
|
||||
}
|
||||
|
||||
// AuthorizeSeriesRead determines if the series comprising measurement and tags
|
||||
// can be read on the provided database.
|
||||
func (a *AuthorizerMock) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool {
|
||||
return a.AuthorizeSeriesReadFn(database, measurement, tags)
|
||||
}
|
||||
|
||||
// AuthorizeSeriesWrite determines if the series comprising measurement and tags
|
||||
// can be written to, on the provided database.
|
||||
func (a *AuthorizerMock) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool {
|
||||
return a.AuthorizeSeriesWriteFn(database, measurement, tags)
|
||||
}
|
|
@ -1239,6 +1239,14 @@ func (itr *seriesPointIterator) Next() (*query.FloatPoint, error) {
|
|||
continue
|
||||
}
|
||||
|
||||
// TODO(edd): It seems to me like this authorisation check should be
|
||||
// further down in the index. At this point we're going to be filtering
|
||||
// series that have already been materialised in the LogFiles and
|
||||
// IndexFiles.
|
||||
if itr.opt.Authorizer != nil && !itr.opt.Authorizer.AuthorizeSeriesRead(itr.fs.database, e.Name(), e.Tags()) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Convert to a key.
|
||||
key := string(models.MakeKey(e.Name(), e.Tags()))
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package tsdb_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
|
@ -14,6 +15,8 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/internal"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
|
@ -774,6 +777,105 @@ cpu,host=serverB,region=uswest value=25 0
|
|||
}
|
||||
}
|
||||
|
||||
func TestShard_CreateIterator_Series_Auth(t *testing.T) {
|
||||
var sh *Shard
|
||||
var itr query.Iterator
|
||||
|
||||
type variant struct {
|
||||
name string
|
||||
m *influxql.Measurement
|
||||
aux []influxql.VarRef
|
||||
}
|
||||
|
||||
examples := []variant{
|
||||
{
|
||||
name: "use_index",
|
||||
m: &influxql.Measurement{Name: "cpu"},
|
||||
aux: []influxql.VarRef{{Val: "_seriesKey", Type: influxql.String}},
|
||||
},
|
||||
{
|
||||
name: "use_cursors",
|
||||
m: &influxql.Measurement{Name: "cpu", SystemIterator: "_series"},
|
||||
aux: []influxql.VarRef{{Val: "key", Type: influxql.String}},
|
||||
},
|
||||
}
|
||||
|
||||
test := func(index string, v variant) error {
|
||||
sh = MustNewOpenShard(index)
|
||||
sh.MustWritePointsString(`
|
||||
cpu,host=serverA,region=uswest value=100 0
|
||||
cpu,host=serverA,region=uswest value=50,val2=5 10
|
||||
cpu,host=serverB,region=uswest value=25 0
|
||||
cpu,secret=foo value=100 0
|
||||
`)
|
||||
|
||||
seriesAuthorizer := &internal.AuthorizerMock{
|
||||
AuthorizeSeriesReadFn: func(database string, measurement []byte, tags models.Tags) bool {
|
||||
if database == "" || !bytes.Equal(measurement, []byte("cpu")) || tags.GetString("secret") != "" {
|
||||
t.Logf("Rejecting series db=%s, m=%s, tags=%v", database, measurement, tags)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
},
|
||||
}
|
||||
|
||||
// Create iterator for case where we use cursors (e.g., where time
|
||||
// included in a SHOW SERIES query).
|
||||
var err error
|
||||
itr, err = sh.CreateIterator(context.Background(), v.m, query.IteratorOptions{
|
||||
Aux: v.aux,
|
||||
Ascending: true,
|
||||
StartTime: influxql.MinTime,
|
||||
EndTime: influxql.MaxTime,
|
||||
Authorizer: seriesAuthorizer,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if itr == nil {
|
||||
return fmt.Errorf("iterator is nil")
|
||||
}
|
||||
|
||||
fitr := itr.(query.FloatIterator)
|
||||
defer fitr.Close()
|
||||
var expCount = 2
|
||||
var gotCount int
|
||||
for {
|
||||
f, err := fitr.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if f == nil {
|
||||
break
|
||||
}
|
||||
|
||||
if got := f.Aux[0].(string); strings.Contains(got, "secret") {
|
||||
return fmt.Errorf("got a series %q that should be filtered", got)
|
||||
}
|
||||
gotCount++
|
||||
}
|
||||
|
||||
if gotCount != expCount {
|
||||
return fmt.Errorf("got %d series, expected %d", gotCount, expCount)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
for _, example := range examples {
|
||||
t.Run(index+"_"+example.name, func(t *testing.T) {
|
||||
if err := test(index, example); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
sh.Close()
|
||||
itr.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func TestShard_Disabled_WriteQuery(t *testing.T) {
|
||||
var sh *Shard
|
||||
|
||||
|
@ -1564,6 +1666,15 @@ func NewShard(index string) *Shard {
|
|||
}
|
||||
}
|
||||
|
||||
// MustNewOpenShard creates and opens a shard with the provided index.
|
||||
func MustNewOpenShard(index string) *Shard {
|
||||
sh := NewShard(index)
|
||||
if err := sh.Open(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return sh
|
||||
}
|
||||
|
||||
// Close closes the shard and removes all underlying data.
|
||||
func (sh *Shard) Close() error {
|
||||
defer os.RemoveAll(sh.path)
|
||||
|
|
Loading…
Reference in New Issue