refactor: separate coarse and fine permission interfaces (#20996)

pull/21041/head
Sam Arnold 2021-03-22 10:52:33 -03:00 committed by GitHub
parent d73df4ef53
commit b7e7de24d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 212 additions and 142 deletions

View File

@ -661,7 +661,7 @@ func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql
func (e *StatementExecutor) executeShowDatabasesStatement(ctx *query.ExecutionContext, q *influxql.ShowDatabasesStatement) (models.Rows, error) {
dis := e.MetaClient.Databases()
a := ctx.ExecutionOptions.Authorizer
a := ctx.ExecutionOptions.CoarseAuthorizer
row := &models.Row{Name: "databases", Columns: []string{"name"}}
for _, di := range dis {
@ -1374,9 +1374,9 @@ type TSDBStore interface {
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
DeleteShard(id uint64) error
MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
MeasurementNames(ctx context.Context, auth query.FineAuthorizer, database string, cond influxql.Expr) ([][]byte, error)
TagKeys(ctx context.Context, auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
TagValues(ctx context.Context, auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
SeriesCardinality(ctx context.Context, database string) (int64, error)
MeasurementsCardinality(ctx context.Context, database string) (int64, error)

View File

@ -383,26 +383,14 @@ func TestStatementExecutor_NormalizeDeleteSeries(t *testing.T) {
}
}
type mockAuthorizer struct {
type mockCoarseAuthorizer struct {
AuthorizeDatabaseFn func(influxql.Privilege, string) bool
}
func (a *mockAuthorizer) AuthorizeDatabase(p influxql.Privilege, name string) bool {
func (a *mockCoarseAuthorizer) AuthorizeDatabase(p influxql.Privilege, name string) bool {
return a.AuthorizeDatabaseFn(p, name)
}
func (m *mockAuthorizer) AuthorizeQuery(database string, query *influxql.Query) error {
panic("fail")
}
func (m *mockAuthorizer) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool {
panic("fail")
}
func (m *mockAuthorizer) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool {
panic("fail")
}
func TestQueryExecutor_ExecuteQuery_ShowDatabases(t *testing.T) {
qe := query.NewExecutor()
qe.StatementExecutor = &coordinator.StatementExecutor{
@ -416,7 +404,7 @@ func TestQueryExecutor_ExecuteQuery_ShowDatabases(t *testing.T) {
}
opt := query.ExecutionOptions{
Authorizer: &mockAuthorizer{
CoarseAuthorizer: &mockCoarseAuthorizer{
AuthorizeDatabaseFn: func(p influxql.Privilege, name string) bool {
return name == "db2" || name == "db4"
},
@ -468,11 +456,11 @@ func NewQueryExecutor() *QueryExecutor {
return nil
}
e.TSDBStore.MeasurementNamesFn = func(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
e.TSDBStore.MeasurementNamesFn = func(auth query.FineAuthorizer, database string, cond influxql.Expr) ([][]byte, error) {
return nil, nil
}
e.TSDBStore.TagValuesFn = func(_ query.Authorizer, _ []uint64, _ influxql.Expr) ([]tsdb.TagValues, error) {
e.TSDBStore.TagValuesFn = func(_ query.FineAuthorizer, _ []uint64, _ influxql.Expr) ([]tsdb.TagValues, error) {
return nil, nil
}

View File

@ -5,7 +5,7 @@ import (
"github.com/influxdata/influxql"
)
// AuthorizerMock is a mockable implementation of a query.Authorizer.
// AuthorizerMock is a mockable implementation of a query.FineAuthorizer + query.CoarseAuthorizer
type AuthorizerMock struct {
AuthorizeDatabaseFn func(influxql.Privilege, string) bool
AuthorizeQueryFn func(database string, query *influxql.Query) error
@ -36,3 +36,7 @@ func (a *AuthorizerMock) AuthorizeSeriesRead(database string, measurement []byte
func (a *AuthorizerMock) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool {
return a.AuthorizeSeriesWriteFn(database, measurement, tags)
}
func (a *AuthorizerMock) IsOpen() bool {
return false
}

View File

@ -31,7 +31,7 @@ type TSDBStoreMock struct {
ImportShardFn func(id uint64, r io.Reader) error
MeasurementSeriesCountsFn func(database string) (measuments int, series int)
MeasurementsCardinalityFn func(database string) (int64, error)
MeasurementNamesFn func(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
MeasurementNamesFn func(auth query.FineAuthorizer, database string, cond influxql.Expr) ([][]byte, error)
OpenFn func() error
PathFn func() string
RestoreShardFn func(id uint64, r io.Reader) error
@ -44,8 +44,8 @@ type TSDBStoreMock struct {
ShardRelativePathFn func(id uint64) (string, error)
ShardsFn func(ids []uint64) []*tsdb.Shard
StatisticsFn func(tags map[string]string) []models.Statistic
TagKeysFn func(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
TagValuesFn func(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
TagKeysFn func(auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
TagValuesFn func(auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
WithLoggerFn func(log *zap.Logger)
WriteToShardFn func(ctx tsdb.WriteContext, shardID uint64, points []models.Point) error
}
@ -93,7 +93,7 @@ func (s *TSDBStoreMock) ExpandSources(sources influxql.Sources) (influxql.Source
func (s *TSDBStoreMock) ImportShard(id uint64, r io.Reader) error {
return s.ImportShardFn(id, r)
}
func (s *TSDBStoreMock) MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
func (s *TSDBStoreMock) MeasurementNames(ctx context.Context, auth query.FineAuthorizer, database string, cond influxql.Expr) ([][]byte, error) {
return s.MeasurementNamesFn(auth, database, cond)
}
func (s *TSDBStoreMock) MeasurementSeriesCounts(database string) (measuments int, series int) {
@ -138,10 +138,10 @@ func (s *TSDBStoreMock) Shards(ids []uint64) []*tsdb.Shard {
func (s *TSDBStoreMock) Statistics(tags map[string]string) []models.Statistic {
return s.StatisticsFn(tags)
}
func (s *TSDBStoreMock) TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) {
func (s *TSDBStoreMock) TagKeys(ctx context.Context, auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) {
return s.TagKeysFn(auth, shardIDs, cond)
}
func (s *TSDBStoreMock) TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) {
func (s *TSDBStoreMock) TagValues(ctx context.Context, auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) {
return s.TagValuesFn(auth, shardIDs, cond)
}
func (s *TSDBStoreMock) WithLogger(log *zap.Logger) {

View File

@ -68,19 +68,33 @@ func ErrMaxConcurrentQueriesLimitExceeded(n, limit int) error {
return fmt.Errorf("max-concurrent-queries limit exceeded(%d, %d)", n, limit)
}
// Authorizer determines if certain operations are authorized.
type Authorizer interface {
// CoarseAuthorizer determines if certain operations are authorized at the database level.
//
// It is supported both in OSS and Enterprise.
type CoarseAuthorizer interface {
// AuthorizeDatabase indicates whether the given Privilege is authorized on the database with the given name.
AuthorizeDatabase(p influxql.Privilege, name string) bool
}
// AuthorizeQuery returns an error if the query cannot be executed
AuthorizeQuery(database string, query *influxql.Query) error
type openCoarseAuthorizer struct{}
func (a openCoarseAuthorizer) AuthorizeDatabase(influxql.Privilege, string) bool { return true }
// OpenCoarseAuthorizer is a fully permissive implementation of CoarseAuthorizer.
var OpenCoarseAuthorizer openCoarseAuthorizer
// FineAuthorizer determines if certain operations are authorized at the series level.
//
// It is only supported in InfluxDB Enterprise. In OSS it always returns true.
type FineAuthorizer interface {
// AuthorizeSeriesRead determines if a series is authorized for reading
AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool
// AuthorizeSeriesWrite determines if a series is authorized for writing
AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool
// IsOpen guarantees that the other methods of a FineAuthorizer always return true.
IsOpen() bool
}
// OpenAuthorizer is the Authorizer used when authorization is disabled.
@ -90,9 +104,6 @@ type openAuthorizer struct{}
// OpenAuthorizer can be shared by all goroutines.
var OpenAuthorizer = openAuthorizer{}
// AuthorizeDatabase returns true to allow any operation on a database.
func (a openAuthorizer) AuthorizeDatabase(influxql.Privilege, string) bool { return true }
// AuthorizeSeriesRead allows access to any series.
func (a openAuthorizer) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool {
return true
@ -103,6 +114,8 @@ func (a openAuthorizer) AuthorizeSeriesWrite(database string, measurement []byte
return true
}
func (a openAuthorizer) IsOpen() bool { return true }
// AuthorizeSeriesRead allows any query to execute.
func (a openAuthorizer) AuthorizeQuery(_ string, _ *influxql.Query) error { return nil }
@ -110,11 +123,8 @@ func (a openAuthorizer) AuthorizeQuery(_ string, _ *influxql.Query) error { retu
// authorize anything. A nil Authorizer returns true for this function, and this
// function should be preferred over directly checking if an Authorizer is nil
// or not.
func AuthorizerIsOpen(a Authorizer) bool {
if u, ok := a.(interface{ AuthorizeUnrestricted() bool }); ok {
return u.AuthorizeUnrestricted()
}
return a == nil || a == OpenAuthorizer
func AuthorizerIsOpen(a FineAuthorizer) bool {
return a == nil || a.IsOpen()
}
// ExecutionOptions contains the options for executing a query.
@ -125,9 +135,11 @@ type ExecutionOptions struct {
// The retention policy the query is running against.
RetentionPolicy string
// How to determine whether the query is allowed to execute,
// what resources can be returned in SHOW queries, etc.
Authorizer Authorizer
// Authorizer handles series-level authorization
Authorizer FineAuthorizer
// CoarseAuthorizer handles database-level authorization
CoarseAuthorizer CoarseAuthorizer
// The requested maximum number of points to return in each result.
ChunkSize int

View File

@ -620,7 +620,7 @@ type IteratorOptions struct {
InterruptCh <-chan struct{}
// Authorizer can limit access to data
Authorizer Authorizer
Authorizer FineAuthorizer
}
// newIteratorOptionsStmt creates the iterator options from stmt.

View File

@ -23,7 +23,7 @@ var DefaultTypeMapper = influxql.MultiTypeMapper(
// SelectOptions are options that customize the select call.
type SelectOptions struct {
// Authorizer is used to limit access to data
Authorizer Authorizer
Authorizer FineAuthorizer
// Node to exclusively read from.
// If zero, all nodes are used.

View File

@ -402,6 +402,9 @@ func (*openAuthorizer) AuthorizeSeriesRead(database string, measurement []byte,
func (*openAuthorizer) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool {
return true
}
func (*openAuthorizer) IsOpen() bool {
return true
}
// Ensure that the subquery gets passed the query authorizer.
func TestSubquery_Authorizer(t *testing.T) {

View File

@ -83,6 +83,21 @@ type Route struct {
HandlerFunc interface{}
}
type QueryAuthorizer interface {
AuthorizeQuery(u meta.User, query *influxql.Query, database string) error
AuthorizeDatabase(u meta.User, priv influxql.Privilege, database string) error
}
// userQueryAuthorizer binds the QueryAuthorizer with a specific user for consumption by the query engine.
type userQueryAuthorizer struct {
auth QueryAuthorizer
user meta.User
}
func (a *userQueryAuthorizer) AuthorizeDatabase(p influxql.Privilege, name string) bool {
return a.auth.AuthorizeDatabase(a.user, p, name) == nil
}
// Handler represents an HTTP handler for the InfluxDB server.
type Handler struct {
mux *pat.PatternServeMux
@ -97,9 +112,7 @@ type Handler struct {
AdminUserExists() bool
}
QueryAuthorizer interface {
AuthorizeQuery(u meta.User, query *influxql.Query, database string) error
}
QueryAuthorizer QueryAuthorizer
WriteAuthorizer interface {
AuthorizeWrite(username, database string) error
@ -240,6 +253,7 @@ func NewHandler(c Config) *Handler {
if h.Config.AuthEnabled && h.Config.PprofEnabled && h.Config.PprofAuthEnabled {
authWrapper = func(handler func(http.ResponseWriter, *http.Request)) interface{} {
return func(w http.ResponseWriter, r *http.Request, user meta.User) {
// TODO: This is the only place we use AuthorizeUnrestricted. It would be better to use an explicit permission
if user == nil || !user.AuthorizeUnrestricted() {
h.Logger.Info("Unauthorized request", zap.String("user", user.ID()), zap.String("path", r.URL.Path))
h.httpError(w, "error authorizing admin access", http.StatusForbidden)
@ -603,14 +617,18 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U
ChunkSize: chunkSize,
ReadOnly: r.Method == "GET",
NodeID: nodeID,
// Authorizer is for fine grained auth, not supported by oss.
Authorizer: query.OpenAuthorizer,
}
if h.Config.AuthEnabled {
// The current user determines the authorized actions.
opts.Authorizer = user
opts.CoarseAuthorizer = &userQueryAuthorizer{
auth: h.QueryAuthorizer,
user: user,
}
} else {
// Auth is disabled, so allow everything.
opts.Authorizer = query.OpenAuthorizer
opts.CoarseAuthorizer = query.OpenCoarseAuthorizer
}
// Make sure if the client disconnects we signal the query to abort
@ -1239,7 +1257,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
h.httpError(w, fmt.Sprintf("user is required to read from database %q", db), http.StatusForbidden)
return
}
if !user.AuthorizeDatabase(influxql.ReadPrivilege, db) {
if h.QueryAuthorizer.AuthorizeDatabase(user, influxql.ReadPrivilege, db) != nil {
h.httpError(w, fmt.Sprintf("user %q is not authorized to read from database %q", user.ID(), db), http.StatusForbidden)
return
}

View File

@ -2166,6 +2166,10 @@ func (a *HandlerQueryAuthorizer) AuthorizeQuery(u meta.User, query *influxql.Que
return a.AuthorizeQueryFn(u, query, database)
}
func (a *HandlerQueryAuthorizer) AuthorizeDatabase(u meta.User, priv influxql.Privilege, database string) error {
panic("not implemented")
}
type HandlerPointsWriter struct {
WritePointsFn func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error
}

View File

@ -1574,7 +1574,7 @@ func (cqi *ContinuousQueryInfo) unmarshal(pb *internal.ContinuousQueryInfo) {
cqi.Query = pb.GetQuery()
}
var _ query.Authorizer = (*UserInfo)(nil)
var _ query.FineAuthorizer = (*UserInfo)(nil)
// UserInfo represents metadata about a user in the system.
type UserInfo struct {
@ -1592,7 +1592,7 @@ type UserInfo struct {
}
type User interface {
query.Authorizer
query.FineAuthorizer
ID() string
AuthorizeUnrestricted() bool
}
@ -1620,7 +1620,14 @@ func (u *UserInfo) AuthorizeSeriesWrite(database string, measurement []byte, tag
return true
}
// AuthorizeUnrestricted allows admins to shortcut access checks.
// IsOpen is a method on FineAuthorizer to indicate all fine auth is permitted and short circuit some checks.
func (u *UserInfo) IsOpen() bool {
return true
}
// AuthorizeUnrestricted identifies the admin user
//
// Only the pprof endpoint uses this, we should prefer to have explicit permissioning instead.
func (u *UserInfo) AuthorizeUnrestricted() bool {
return u.Admin
}

View File

@ -48,7 +48,64 @@ func (a *QueryAuthorizer) AuthorizeQuery(u User, query *influxql.Query, database
}
}
return u.AuthorizeQuery(database, query)
// There is only one OSS implementation of the User interface, and the OSS QueryAuthorizer only works
// with the OSS UserInfo. There is a similar tight coupling between the Enterprise QueryAuthorizer and
// Enterprise UserInfo in closed-source code.
switch user := u.(type) {
case *UserInfo:
// Admin privilege allows the user to execute all statements.
if user.Admin {
return nil
}
// Check each statement in the query.
for _, stmt := range query.Statements {
// Get the privileges required to execute the statement.
privs, err := stmt.RequiredPrivileges()
if err != nil {
return err
}
// Make sure the user has the privileges required to execute
// each statement.
for _, p := range privs {
if p.Admin {
// Admin privilege already checked so statement requiring admin
// privilege cannot be run.
return &ErrAuthorize{
Query: query,
User: user.Name,
Database: database,
Message: fmt.Sprintf("statement '%s', requires admin privilege", stmt),
}
}
// Use the db name specified by the statement or the db
// name passed by the caller if one wasn't specified by
// the statement.
db := p.Name
if db == "" {
db = database
}
if !user.AuthorizeDatabase(p.Privilege, db) {
return &ErrAuthorize{
Query: query,
User: user.Name,
Database: database,
Message: fmt.Sprintf("statement '%s', requires %s on %s", stmt, p.Privilege.String(), db),
}
}
}
}
return nil
default:
}
return &ErrAuthorize{
Query: query,
User: u.ID(),
Database: database,
Message: fmt.Sprintf("Invalid OSS user type %T", u),
}
}
func (a *QueryAuthorizer) AuthorizeDatabase(u User, priv influxql.Privilege, database string) error {
@ -59,63 +116,23 @@ func (a *QueryAuthorizer) AuthorizeDatabase(u User, priv influxql.Privilege, dat
}
}
if !u.AuthorizeDatabase(priv, database) {
return &ErrAuthorize{
Database: database,
Message: fmt.Sprintf("user %q, requires %s for database %q", u.ID(), priv.String(), database),
switch user := u.(type) {
case *UserInfo:
if !user.AuthorizeDatabase(priv, database) {
return &ErrAuthorize{
Database: database,
Message: fmt.Sprintf("user %q, requires %s for database %q", u.ID(), priv.String(), database),
}
}
}
return nil
}
func (u *UserInfo) AuthorizeQuery(database string, query *influxql.Query) error {
// Admin privilege allows the user to execute all statements.
if u.Admin {
return nil
default:
}
return &ErrAuthorize{
Database: database,
User: u.ID(),
Message: fmt.Sprintf("Internal error - incorrect oss user type %T", u),
}
// Check each statement in the query.
for _, stmt := range query.Statements {
// Get the privileges required to execute the statement.
privs, err := stmt.RequiredPrivileges()
if err != nil {
return err
}
// Make sure the user has the privileges required to execute
// each statement.
for _, p := range privs {
if p.Admin {
// Admin privilege already checked so statement requiring admin
// privilege cannot be run.
return &ErrAuthorize{
Query: query,
User: u.Name,
Database: database,
Message: fmt.Sprintf("statement '%s', requires admin privilege", stmt),
}
}
// Use the db name specified by the statement or the db
// name passed by the caller if one wasn't specified by
// the statement.
db := p.Name
if db == "" {
db = database
}
if !u.AuthorizeDatabase(p.Privilege, db) {
return &ErrAuthorize{
Query: query,
User: u.Name,
Database: database,
Message: fmt.Sprintf("statement '%s', requires %s on %s", stmt, p.Privilege.String(), db),
}
}
}
}
return nil
}
// ErrAuthorize represents an authorization error.

View File

@ -19,11 +19,28 @@ func NewWriteAuthorizer(c *Client) *WriteAuthorizer {
// AuthorizeWrite returns nil if the user has permission to write to the database.
func (a WriteAuthorizer) AuthorizeWrite(username, database string) error {
u, err := a.Client.User(username)
if err != nil || u == nil || !u.AuthorizeDatabase(influxql.WritePrivilege, database) {
if err != nil || u == nil {
return &ErrAuthorize{
Database: database,
Message: fmt.Sprintf("%s not authorized to write to %s", username, database),
}
}
// There is only one OSS implementation of the User interface, and the OSS WriteAuthorizer only works
// with the OSS UserInfo. There is a similar tight coupling between the Enterprise WriteAuthorizer and
// Enterprise UserInfo in closed-source code.
switch user := u.(type) {
case *UserInfo:
if !user.AuthorizeDatabase(influxql.WritePrivilege, database) {
return &ErrAuthorize{
Database: database,
Message: fmt.Sprintf("%s not authorized to write to %s", username, database),
}
}
default:
return &ErrAuthorize{
Database: database,
Message: fmt.Sprintf("Internal error - wrong type %T for oss user", u),
}
}
return nil
}

View File

@ -1320,7 +1320,7 @@ func (is IndexSet) DedupeInmemIndexes() IndexSet {
// MeasurementNamesByExpr returns a slice of measurement names matching the
// provided condition. If no condition is provided then all names are returned.
func (is IndexSet) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
func (is IndexSet) MeasurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) {
release := is.SeriesFile.Retain()
defer release()
@ -1360,7 +1360,7 @@ func (is IndexSet) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.E
return slices.CopyChunkedByteSlices(names, 1000), nil
}
func (is IndexSet) measurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
func (is IndexSet) measurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) {
if expr == nil {
return nil, nil
}
@ -1427,7 +1427,7 @@ func (is IndexSet) measurementNamesByExpr(auth query.Authorizer, expr influxql.E
}
// measurementNamesByNameFilter returns matching measurement names in sorted order.
func (is IndexSet) measurementNamesByNameFilter(auth query.Authorizer, op influxql.Token, val string, regex *regexp.Regexp) ([][]byte, error) {
func (is IndexSet) measurementNamesByNameFilter(auth query.FineAuthorizer, op influxql.Token, val string, regex *regexp.Regexp) ([][]byte, error) {
itr, err := is.measurementIterator()
if err != nil {
return nil, err
@ -1469,7 +1469,7 @@ func (is IndexSet) measurementNamesByNameFilter(auth query.Authorizer, op influx
// provided condition. If no condition is provided then all names are returned.
// This behaves differently from MeasurementNamesByExpr because it will
// return measurements using flux predicates.
func (is IndexSet) MeasurementNamesByPredicate(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
func (is IndexSet) MeasurementNamesByPredicate(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) {
release := is.SeriesFile.Retain()
defer release()
@ -1509,7 +1509,7 @@ func (is IndexSet) MeasurementNamesByPredicate(auth query.Authorizer, expr influ
return slices.CopyChunkedByteSlices(names, 1000), nil
}
func (is IndexSet) measurementNamesByPredicate(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
func (is IndexSet) measurementNamesByPredicate(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) {
if expr == nil {
return nil, nil
}
@ -1575,7 +1575,7 @@ func (is IndexSet) measurementNamesByPredicate(auth query.Authorizer, expr influ
}
}
func (is IndexSet) measurementNamesByTagFilter(auth query.Authorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) {
func (is IndexSet) measurementNamesByTagFilter(auth query.FineAuthorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) {
var names [][]byte
mitr, err := is.measurementIterator()
@ -1702,7 +1702,7 @@ func (is IndexSet) measurementNamesByTagFilter(auth query.Authorizer, op influxq
return names, nil
}
func (is IndexSet) measurementNamesByTagPredicate(auth query.Authorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) {
func (is IndexSet) measurementNamesByTagPredicate(auth query.FineAuthorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) {
var names [][]byte
mitr, err := is.measurementIterator()
@ -1713,14 +1713,14 @@ func (is IndexSet) measurementNamesByTagPredicate(auth query.Authorizer, op infl
}
defer mitr.Close()
var checkMeasurement func(auth query.Authorizer, me []byte) (bool, error)
var checkMeasurement func(auth query.FineAuthorizer, me []byte) (bool, error)
switch op {
case influxql.EQ:
checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) {
checkMeasurement = func(auth query.FineAuthorizer, me []byte) (bool, error) {
return is.measurementHasTagValue(auth, me, []byte(key), []byte(val))
}
case influxql.NEQ:
checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) {
checkMeasurement = func(auth query.FineAuthorizer, me []byte) (bool, error) {
// If there is an authorized series in this measurement and that series
// does not contain the tag key/value.
ok := is.measurementAuthorizedSeries(auth, me, func(tags models.Tags) bool {
@ -1729,11 +1729,11 @@ func (is IndexSet) measurementNamesByTagPredicate(auth query.Authorizer, op infl
return ok, nil
}
case influxql.EQREGEX:
checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) {
checkMeasurement = func(auth query.FineAuthorizer, me []byte) (bool, error) {
return is.measurementHasTagValueRegex(auth, me, []byte(key), regex)
}
case influxql.NEQREGEX:
checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) {
checkMeasurement = func(auth query.FineAuthorizer, me []byte) (bool, error) {
// If there is an authorized series in this measurement and that series
// does not contain the tag key/value.
ok := is.measurementAuthorizedSeries(auth, me, func(tags models.Tags) bool {
@ -1767,7 +1767,7 @@ func (is IndexSet) measurementNamesByTagPredicate(auth query.Authorizer, op infl
// measurementAuthorizedSeries determines if the measurement contains a series
// that is authorized to be read.
func (is IndexSet) measurementAuthorizedSeries(auth query.Authorizer, name []byte, exclude func(tags models.Tags) bool) bool {
func (is IndexSet) measurementAuthorizedSeries(auth query.FineAuthorizer, name []byte, exclude func(tags models.Tags) bool) bool {
if query.AuthorizerIsOpen(auth) && exclude == nil {
return true
}
@ -1803,7 +1803,7 @@ func (is IndexSet) measurementAuthorizedSeries(auth query.Authorizer, name []byt
}
}
func (is IndexSet) measurementHasTagValue(auth query.Authorizer, me, key, value []byte) (bool, error) {
func (is IndexSet) measurementHasTagValue(auth query.FineAuthorizer, me, key, value []byte) (bool, error) {
if len(value) == 0 {
return is.measurementHasEmptyTagValue(auth, me, key)
}
@ -1841,7 +1841,7 @@ func (is IndexSet) measurementHasTagValue(auth query.Authorizer, me, key, value
}
}
func (is IndexSet) measurementHasEmptyTagValue(auth query.Authorizer, me, key []byte) (bool, error) {
func (is IndexSet) measurementHasEmptyTagValue(auth query.FineAuthorizer, me, key []byte) (bool, error) {
// Any series that does not have a tag key
// has an empty tag value for that key.
// Iterate through all of the series to find one
@ -1876,7 +1876,7 @@ func (is IndexSet) measurementHasEmptyTagValue(auth query.Authorizer, me, key []
}
}
func (is IndexSet) measurementHasTagValueRegex(auth query.Authorizer, me, key []byte, value *regexp.Regexp) (bool, error) {
func (is IndexSet) measurementHasTagValueRegex(auth query.FineAuthorizer, me, key []byte, value *regexp.Regexp) (bool, error) {
// If the regex matches the empty string, do a special check to see
// if we have an empty tag value.
if matchEmpty := value.MatchString(""); matchEmpty {
@ -2037,7 +2037,7 @@ func (is IndexSet) tagValueIterator(name, key []byte) (TagValueIterator, error)
// TagKeyHasAuthorizedSeries determines if there exists an authorized series for
// the provided measurement name and tag key.
func (is IndexSet) TagKeyHasAuthorizedSeries(auth query.Authorizer, name, tagKey []byte) (bool, error) {
func (is IndexSet) TagKeyHasAuthorizedSeries(auth query.FineAuthorizer, name, tagKey []byte) (bool, error) {
if !is.HasInmemIndex() && query.AuthorizerIsOpen(auth) {
return true, nil
}
@ -2244,7 +2244,7 @@ func (is IndexSet) measurementSeriesByExprIterator(name []byte, expr influxql.Ex
type measurementSeriesKeyByExprIterator struct {
ids SeriesIDIterator
is IndexSet
auth query.Authorizer
auth query.FineAuthorizer
once sync.Once
releaser func()
}
@ -2309,7 +2309,7 @@ func (itr *measurementSeriesKeyByExprIterator) Close() error {
// MeasurementSeriesKeyByExprIterator iterates through series, filtered by an expression on the tags.
// Any non-tag expressions will be filtered as if the field had the zero value.
func (is IndexSet) MeasurementSeriesKeyByExprIterator(name []byte, expr influxql.Expr, auth query.Authorizer) (SeriesKeyIterator, error) {
func (is IndexSet) MeasurementSeriesKeyByExprIterator(name []byte, expr influxql.Expr, auth query.FineAuthorizer) (SeriesKeyIterator, error) {
release := is.SeriesFile.Retain()
// Create iterator for all matching series.
ids, err := is.measurementSeriesByExprIterator(name, expr)
@ -2770,7 +2770,7 @@ func (is IndexSet) matchTagValueNotEqualNotEmptySeriesIDIterator(name, key []byt
//
// tagValuesByKeyAndExpr guarantees to never take any locks on the underlying
// series file.
func (is IndexSet) tagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr) ([]map[string]struct{}, error) {
func (is IndexSet) tagValuesByKeyAndExpr(auth query.FineAuthorizer, name []byte, keys []string, expr influxql.Expr) ([]map[string]struct{}, error) {
database := is.Database()
valueExpr, remainingExpr, err := influxql.PartitionExpr(influxql.CloneExpr(expr), func(e influxql.Expr) (bool, error) {
@ -2874,7 +2874,7 @@ func (is IndexSet) tagValuesByKeyAndExpr(auth query.Authorizer, name []byte, key
}
// MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression.
func (is IndexSet) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
func (is IndexSet) MeasurementTagKeyValuesByExpr(auth query.FineAuthorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
if len(keys) == 0 {
return nil, nil
}

View File

@ -378,7 +378,7 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s
// TagKeyHasAuthorizedSeries determines if there exists an authorized series for
// the provided measurement name and tag key.
func (i *Index) TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, key string) bool {
func (i *Index) TagKeyHasAuthorizedSeries(auth query.FineAuthorizer, name []byte, key string) bool {
i.mu.RLock()
mm := i.measurements[string(name)]
i.mu.RUnlock()
@ -422,7 +422,7 @@ func (i *Index) TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, ke
//
// See tsm1.Engine.MeasurementTagKeyValuesByExpr for a fuller description of this
// method.
func (i *Index) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
func (i *Index) MeasurementTagKeyValuesByExpr(auth query.FineAuthorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
i.mu.RLock()
mm := i.measurements[string(name)]
i.mu.RUnlock()
@ -540,7 +540,7 @@ func (i *Index) TagsForSeries(key string) (models.Tags, error) {
//
// TODO(edd): Remove authorisation from these methods. There shouldn't need to
// be any auth passed down into the index.
func (i *Index) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
func (i *Index) MeasurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) {
i.mu.RLock()
defer i.mu.RUnlock()
@ -559,7 +559,7 @@ func (i *Index) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr
return i.measurementNamesByExpr(auth, expr)
}
func (i *Index) measurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
func (i *Index) measurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) {
if expr == nil {
return nil, nil
}
@ -625,7 +625,7 @@ func (i *Index) measurementNamesByExpr(auth query.Authorizer, expr influxql.Expr
}
// measurementNamesByNameFilter returns the sorted measurements matching a name.
func (i *Index) measurementNamesByNameFilter(auth query.Authorizer, op influxql.Token, val string, regex *regexp.Regexp) [][]byte {
func (i *Index) measurementNamesByNameFilter(auth query.FineAuthorizer, op influxql.Token, val string, regex *regexp.Regexp) [][]byte {
var names [][]byte
for _, m := range i.measurements {
var matched bool
@ -649,7 +649,7 @@ func (i *Index) measurementNamesByNameFilter(auth query.Authorizer, op influxql.
}
// measurementNamesByTagFilters returns the sorted measurements matching the filters on tag values.
func (i *Index) measurementNamesByTagFilters(auth query.Authorizer, filter *TagFilter) [][]byte {
func (i *Index) measurementNamesByTagFilters(auth query.FineAuthorizer, filter *TagFilter) [][]byte {
// Build a list of measurements matching the filters.
var names [][]byte
var tagMatch bool

View File

@ -90,7 +90,7 @@ func (m *measurement) bytes() int {
// Authorized determines if this Measurement is authorized to be read, according
// to the provided Authorizer. A measurement is authorized to be read if at
// least one undeleted series from the measurement is authorized to be read.
func (m *measurement) Authorized(auth query.Authorizer) bool {
func (m *measurement) Authorized(auth query.FineAuthorizer) bool {
// Note(edd): the cost of this check scales linearly with the number of series
// belonging to a measurement, which means it may become expensive when there
// are large numbers of series on a measurement.
@ -1423,7 +1423,7 @@ func (m *measurement) TagKeys() []string {
}
// TagValues returns all the values for the given tag key, in an arbitrary order.
func (m *measurement) TagValues(auth query.Authorizer, key string) []string {
func (m *measurement) TagValues(auth query.FineAuthorizer, key string) []string {
m.mu.RLock()
defer m.mu.RUnlock()
values := make([]string, 0, m.seriesByTagKeyValue[key].Cardinality())

View File

@ -1564,7 +1564,7 @@ func (s *Store) WriteToShard(writeCtx WriteContext, shardID uint64, points []mod
// MeasurementNames returns a slice of all measurements. Measurements accepts an
// optional condition expression. If cond is nil, then all measurements for the
// database will be returned.
func (s *Store) MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
func (s *Store) MeasurementNames(ctx context.Context, auth query.FineAuthorizer, database string, cond influxql.Expr) ([][]byte, error) {
s.mu.RLock()
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()
@ -1611,7 +1611,7 @@ func (a TagKeysSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a TagKeysSlice) Less(i, j int) bool { return a[i].Measurement < a[j].Measurement }
// TagKeys returns the tag keys in the given database, matching the condition.
func (s *Store) TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagKeys, error) {
func (s *Store) TagKeys(ctx context.Context, auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]TagKeys, error) {
if len(shardIDs) == 0 {
return nil, nil
}
@ -1809,7 +1809,7 @@ func isTagKeyClause(e influxql.Expr) (bool, error) {
// TagValues returns the tag keys and values for the provided shards, where the
// tag values satisfy the provided condition.
func (s *Store) TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagValues, error) {
func (s *Store) TagValues(ctx context.Context, auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]TagValues, error) {
if len(shardIDs) == 0 {
return nil, nil
}