From 79168cf671d2b2cd46bb8644023993a2080f1dad Mon Sep 17 00:00:00 2001 From: Sam Arnold Date: Tue, 23 Mar 2021 09:29:52 -0300 Subject: [PATCH] refactor: separate coarse and fine permission interfaces (#20996) (#21035) (cherry picked from commit b7e7de24d6a71fe6a0b999dbf1ae3c51e099ad13) --- coordinator/statement_executor.go | 8 +- coordinator/statement_executor_test.go | 22 +---- internal/authorizer.go | 6 +- internal/tsdb_store.go | 12 +-- query/executor.go | 42 ++++++--- query/iterator.go | 2 +- query/select.go | 2 +- query/subquery_test.go | 3 + services/httpd/handler.go | 30 ++++-- services/httpd/handler_test.go | 4 + services/meta/data.go | 13 ++- services/meta/query_authorizer.go | 125 ++++++++++++++----------- services/meta/write_authorizer.go | 19 +++- tsdb/index.go | 40 ++++---- tsdb/index/inmem/inmem.go | 12 +-- tsdb/index/inmem/meta.go | 4 +- tsdb/shard.go | 2 +- tsdb/store.go | 6 +- 18 files changed, 211 insertions(+), 141 deletions(-) diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index 533f9463b7..6d5083bad4 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -661,7 +661,7 @@ func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql func (e *StatementExecutor) executeShowDatabasesStatement(ctx *query.ExecutionContext, q *influxql.ShowDatabasesStatement) (models.Rows, error) { dis := e.MetaClient.Databases() - a := ctx.ExecutionOptions.Authorizer + a := ctx.ExecutionOptions.CoarseAuthorizer row := &models.Row{Name: "databases", Columns: []string{"name"}} for _, di := range dis { @@ -1374,9 +1374,9 @@ type TSDBStore interface { DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error DeleteShard(id uint64) error - MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) - TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) - TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) + MeasurementNames(ctx context.Context, auth query.FineAuthorizer, database string, cond influxql.Expr) ([][]byte, error) + TagKeys(ctx context.Context, auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) + TagValues(ctx context.Context, auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) SeriesCardinality(ctx context.Context, database string) (int64, error) MeasurementsCardinality(ctx context.Context, database string) (int64, error) diff --git a/coordinator/statement_executor_test.go b/coordinator/statement_executor_test.go index 5d64c63e98..8855f4ac31 100644 --- a/coordinator/statement_executor_test.go +++ b/coordinator/statement_executor_test.go @@ -383,26 +383,14 @@ func TestStatementExecutor_NormalizeDeleteSeries(t *testing.T) { } } -type mockAuthorizer struct { +type mockCoarseAuthorizer struct { AuthorizeDatabaseFn func(influxql.Privilege, string) bool } -func (a *mockAuthorizer) AuthorizeDatabase(p influxql.Privilege, name string) bool { +func (a *mockCoarseAuthorizer) AuthorizeDatabase(p influxql.Privilege, name string) bool { return a.AuthorizeDatabaseFn(p, name) } -func (m *mockAuthorizer) AuthorizeQuery(database string, query *influxql.Query) error { - panic("fail") -} - -func (m *mockAuthorizer) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool { - panic("fail") -} - -func (m *mockAuthorizer) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool { - panic("fail") -} - func TestQueryExecutor_ExecuteQuery_ShowDatabases(t *testing.T) { qe := query.NewExecutor() qe.StatementExecutor = &coordinator.StatementExecutor{ @@ -416,7 +404,7 @@ func TestQueryExecutor_ExecuteQuery_ShowDatabases(t *testing.T) { } opt := query.ExecutionOptions{ - Authorizer: &mockAuthorizer{ + CoarseAuthorizer: &mockCoarseAuthorizer{ AuthorizeDatabaseFn: func(p influxql.Privilege, name string) bool { return name == "db2" || name == "db4" }, @@ -468,11 +456,11 @@ func NewQueryExecutor() *QueryExecutor { return nil } - e.TSDBStore.MeasurementNamesFn = func(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) { + e.TSDBStore.MeasurementNamesFn = func(auth query.FineAuthorizer, database string, cond influxql.Expr) ([][]byte, error) { return nil, nil } - e.TSDBStore.TagValuesFn = func(_ query.Authorizer, _ []uint64, _ influxql.Expr) ([]tsdb.TagValues, error) { + e.TSDBStore.TagValuesFn = func(_ query.FineAuthorizer, _ []uint64, _ influxql.Expr) ([]tsdb.TagValues, error) { return nil, nil } diff --git a/internal/authorizer.go b/internal/authorizer.go index 07847f5258..5a2e9df1c2 100644 --- a/internal/authorizer.go +++ b/internal/authorizer.go @@ -5,7 +5,7 @@ import ( "github.com/influxdata/influxql" ) -// AuthorizerMock is a mockable implementation of a query.Authorizer. +// AuthorizerMock is a mockable implementation of a query.FineAuthorizer + query.CoarseAuthorizer type AuthorizerMock struct { AuthorizeDatabaseFn func(influxql.Privilege, string) bool AuthorizeQueryFn func(database string, query *influxql.Query) error @@ -36,3 +36,7 @@ func (a *AuthorizerMock) AuthorizeSeriesRead(database string, measurement []byte func (a *AuthorizerMock) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool { return a.AuthorizeSeriesWriteFn(database, measurement, tags) } + +func (a *AuthorizerMock) IsOpen() bool { + return false +} diff --git a/internal/tsdb_store.go b/internal/tsdb_store.go index 6c53778551..fc3eecfd3a 100644 --- a/internal/tsdb_store.go +++ b/internal/tsdb_store.go @@ -31,7 +31,7 @@ type TSDBStoreMock struct { ImportShardFn func(id uint64, r io.Reader) error MeasurementSeriesCountsFn func(database string) (measuments int, series int) MeasurementsCardinalityFn func(database string) (int64, error) - MeasurementNamesFn func(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) + MeasurementNamesFn func(auth query.FineAuthorizer, database string, cond influxql.Expr) ([][]byte, error) OpenFn func() error PathFn func() string RestoreShardFn func(id uint64, r io.Reader) error @@ -44,8 +44,8 @@ type TSDBStoreMock struct { ShardRelativePathFn func(id uint64) (string, error) ShardsFn func(ids []uint64) []*tsdb.Shard StatisticsFn func(tags map[string]string) []models.Statistic - TagKeysFn func(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) - TagValuesFn func(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) + TagKeysFn func(auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) + TagValuesFn func(auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) WithLoggerFn func(log *zap.Logger) WriteToShardFn func(shardID uint64, points []models.Point) error } @@ -93,7 +93,7 @@ func (s *TSDBStoreMock) ExpandSources(sources influxql.Sources) (influxql.Source func (s *TSDBStoreMock) ImportShard(id uint64, r io.Reader) error { return s.ImportShardFn(id, r) } -func (s *TSDBStoreMock) MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) { +func (s *TSDBStoreMock) MeasurementNames(ctx context.Context, auth query.FineAuthorizer, database string, cond influxql.Expr) ([][]byte, error) { return s.MeasurementNamesFn(auth, database, cond) } func (s *TSDBStoreMock) MeasurementSeriesCounts(database string) (measuments int, series int) { @@ -138,10 +138,10 @@ func (s *TSDBStoreMock) Shards(ids []uint64) []*tsdb.Shard { func (s *TSDBStoreMock) Statistics(tags map[string]string) []models.Statistic { return s.StatisticsFn(tags) } -func (s *TSDBStoreMock) TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) { +func (s *TSDBStoreMock) TagKeys(ctx context.Context, auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) { return s.TagKeysFn(auth, shardIDs, cond) } -func (s *TSDBStoreMock) TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) { +func (s *TSDBStoreMock) TagValues(ctx context.Context, auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) { return s.TagValuesFn(auth, shardIDs, cond) } func (s *TSDBStoreMock) WithLogger(log *zap.Logger) { diff --git a/query/executor.go b/query/executor.go index beddc3ea88..e15ee91980 100644 --- a/query/executor.go +++ b/query/executor.go @@ -68,19 +68,33 @@ func ErrMaxConcurrentQueriesLimitExceeded(n, limit int) error { return fmt.Errorf("max-concurrent-queries limit exceeded(%d, %d)", n, limit) } -// Authorizer determines if certain operations are authorized. -type Authorizer interface { +// CoarseAuthorizer determines if certain operations are authorized at the database level. +// +// It is supported both in OSS and Enterprise. +type CoarseAuthorizer interface { // AuthorizeDatabase indicates whether the given Privilege is authorized on the database with the given name. AuthorizeDatabase(p influxql.Privilege, name string) bool +} - // AuthorizeQuery returns an error if the query cannot be executed - AuthorizeQuery(database string, query *influxql.Query) error +type openCoarseAuthorizer struct{} +func (a openCoarseAuthorizer) AuthorizeDatabase(influxql.Privilege, string) bool { return true } + +// OpenCoarseAuthorizer is a fully permissive implementation of CoarseAuthorizer. +var OpenCoarseAuthorizer openCoarseAuthorizer + +// FineAuthorizer determines if certain operations are authorized at the series level. +// +// It is only supported in InfluxDB Enterprise. In OSS it always returns true. +type FineAuthorizer interface { // AuthorizeSeriesRead determines if a series is authorized for reading AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool // AuthorizeSeriesWrite determines if a series is authorized for writing AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool + + // IsOpen guarantees that the other methods of a FineAuthorizer always return true. + IsOpen() bool } // OpenAuthorizer is the Authorizer used when authorization is disabled. @@ -90,9 +104,6 @@ type openAuthorizer struct{} // OpenAuthorizer can be shared by all goroutines. var OpenAuthorizer = openAuthorizer{} -// AuthorizeDatabase returns true to allow any operation on a database. -func (a openAuthorizer) AuthorizeDatabase(influxql.Privilege, string) bool { return true } - // AuthorizeSeriesRead allows access to any series. func (a openAuthorizer) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool { return true @@ -103,6 +114,8 @@ func (a openAuthorizer) AuthorizeSeriesWrite(database string, measurement []byte return true } +func (a openAuthorizer) IsOpen() bool { return true } + // AuthorizeSeriesRead allows any query to execute. func (a openAuthorizer) AuthorizeQuery(_ string, _ *influxql.Query) error { return nil } @@ -110,11 +123,8 @@ func (a openAuthorizer) AuthorizeQuery(_ string, _ *influxql.Query) error { retu // authorize anything. A nil Authorizer returns true for this function, and this // function should be preferred over directly checking if an Authorizer is nil // or not. -func AuthorizerIsOpen(a Authorizer) bool { - if u, ok := a.(interface{ AuthorizeUnrestricted() bool }); ok { - return u.AuthorizeUnrestricted() - } - return a == nil || a == OpenAuthorizer +func AuthorizerIsOpen(a FineAuthorizer) bool { + return a == nil || a.IsOpen() } // ExecutionOptions contains the options for executing a query. @@ -125,9 +135,11 @@ type ExecutionOptions struct { // The retention policy the query is running against. RetentionPolicy string - // How to determine whether the query is allowed to execute, - // what resources can be returned in SHOW queries, etc. - Authorizer Authorizer + // Authorizer handles series-level authorization + Authorizer FineAuthorizer + + // CoarseAuthorizer handles database-level authorization + CoarseAuthorizer CoarseAuthorizer // The requested maximum number of points to return in each result. ChunkSize int diff --git a/query/iterator.go b/query/iterator.go index 8b4b42007c..79a84a3bf8 100644 --- a/query/iterator.go +++ b/query/iterator.go @@ -613,7 +613,7 @@ type IteratorOptions struct { InterruptCh <-chan struct{} // Authorizer can limit access to data - Authorizer Authorizer + Authorizer FineAuthorizer } // newIteratorOptionsStmt creates the iterator options from stmt. diff --git a/query/select.go b/query/select.go index c8702ba178..ceb11c3de7 100644 --- a/query/select.go +++ b/query/select.go @@ -23,7 +23,7 @@ var DefaultTypeMapper = influxql.MultiTypeMapper( // SelectOptions are options that customize the select call. type SelectOptions struct { // Authorizer is used to limit access to data - Authorizer Authorizer + Authorizer FineAuthorizer // Node to exclusively read from. // If zero, all nodes are used. diff --git a/query/subquery_test.go b/query/subquery_test.go index 6589e524b2..8789642def 100644 --- a/query/subquery_test.go +++ b/query/subquery_test.go @@ -402,6 +402,9 @@ func (*openAuthorizer) AuthorizeSeriesRead(database string, measurement []byte, func (*openAuthorizer) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool { return true } +func (*openAuthorizer) IsOpen() bool { + return true +} // Ensure that the subquery gets passed the query authorizer. func TestSubquery_Authorizer(t *testing.T) { diff --git a/services/httpd/handler.go b/services/httpd/handler.go index ef3731ca3b..145d7c92f9 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -85,6 +85,21 @@ type Route struct { HandlerFunc interface{} } +type QueryAuthorizer interface { + AuthorizeQuery(u meta.User, query *influxql.Query, database string) error + AuthorizeDatabase(u meta.User, priv influxql.Privilege, database string) error +} + +// userQueryAuthorizer binds the QueryAuthorizer with a specific user for consumption by the query engine. +type userQueryAuthorizer struct { + auth QueryAuthorizer + user meta.User +} + +func (a *userQueryAuthorizer) AuthorizeDatabase(p influxql.Privilege, name string) bool { + return a.auth.AuthorizeDatabase(a.user, p, name) == nil +} + // Handler represents an HTTP handler for the InfluxDB server. type Handler struct { mux *pat.PatternServeMux @@ -99,9 +114,7 @@ type Handler struct { AdminUserExists() bool } - QueryAuthorizer interface { - AuthorizeQuery(u meta.User, query *influxql.Query, database string) error - } + QueryAuthorizer QueryAuthorizer WriteAuthorizer interface { AuthorizeWrite(username, database string) error @@ -234,6 +247,7 @@ func NewHandler(c Config) *Handler { if h.Config.AuthEnabled && h.Config.PprofEnabled && h.Config.PprofAuthEnabled { authWrapper = func(handler func(http.ResponseWriter, *http.Request)) interface{} { return func(w http.ResponseWriter, r *http.Request, user meta.User) { + // TODO: This is the only place we use AuthorizeUnrestricted. It would be better to use an explicit permission if user == nil || !user.AuthorizeUnrestricted() { h.Logger.Info("Unauthorized request", zap.String("user", user.ID()), zap.String("path", r.URL.Path)) h.httpError(w, "error authorizing admin access", http.StatusForbidden) @@ -595,14 +609,18 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U ChunkSize: chunkSize, ReadOnly: r.Method == "GET", NodeID: nodeID, + // Authorizer is for fine grained auth, not supported by oss. + Authorizer: query.OpenAuthorizer, } if h.Config.AuthEnabled { // The current user determines the authorized actions. - opts.Authorizer = user + opts.CoarseAuthorizer = &userQueryAuthorizer{ + auth: h.QueryAuthorizer, + user: user, + } } else { - // Auth is disabled, so allow everything. - opts.Authorizer = query.OpenAuthorizer + opts.CoarseAuthorizer = query.OpenCoarseAuthorizer } // Make sure if the client disconnects we signal the query to abort diff --git a/services/httpd/handler_test.go b/services/httpd/handler_test.go index df5eb410b9..c6c758f919 100644 --- a/services/httpd/handler_test.go +++ b/services/httpd/handler_test.go @@ -2040,6 +2040,10 @@ func (a *HandlerQueryAuthorizer) AuthorizeQuery(u meta.User, query *influxql.Que return a.AuthorizeQueryFn(u, query, database) } +func (a *HandlerQueryAuthorizer) AuthorizeDatabase(u meta.User, priv influxql.Privilege, database string) error { + panic("not implemented") +} + type HandlerPointsWriter struct { WritePointsFn func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error } diff --git a/services/meta/data.go b/services/meta/data.go index f0bc7e365c..06803d4c6f 100644 --- a/services/meta/data.go +++ b/services/meta/data.go @@ -1574,7 +1574,7 @@ func (cqi *ContinuousQueryInfo) unmarshal(pb *internal.ContinuousQueryInfo) { cqi.Query = pb.GetQuery() } -var _ query.Authorizer = (*UserInfo)(nil) +var _ query.FineAuthorizer = (*UserInfo)(nil) // UserInfo represents metadata about a user in the system. type UserInfo struct { @@ -1592,7 +1592,7 @@ type UserInfo struct { } type User interface { - query.Authorizer + query.FineAuthorizer ID() string AuthorizeUnrestricted() bool } @@ -1620,7 +1620,14 @@ func (u *UserInfo) AuthorizeSeriesWrite(database string, measurement []byte, tag return true } -// AuthorizeUnrestricted allows admins to shortcut access checks. +// IsOpen is a method on FineAuthorizer to indicate all fine auth is permitted and short circuit some checks. +func (u *UserInfo) IsOpen() bool { + return true +} + +// AuthorizeUnrestricted identifies the admin user +// +// Only the pprof endpoint uses this, we should prefer to have explicit permissioning instead. func (u *UserInfo) AuthorizeUnrestricted() bool { return u.Admin } diff --git a/services/meta/query_authorizer.go b/services/meta/query_authorizer.go index acf92a8663..741544c644 100644 --- a/services/meta/query_authorizer.go +++ b/services/meta/query_authorizer.go @@ -48,7 +48,64 @@ func (a *QueryAuthorizer) AuthorizeQuery(u User, query *influxql.Query, database } } - return u.AuthorizeQuery(database, query) + // There is only one OSS implementation of the User interface, and the OSS QueryAuthorizer only works + // with the OSS UserInfo. There is a similar tight coupling between the Enterprise QueryAuthorizer and + // Enterprise UserInfo in closed-source code. + switch user := u.(type) { + case *UserInfo: + // Admin privilege allows the user to execute all statements. + if user.Admin { + return nil + } + + // Check each statement in the query. + for _, stmt := range query.Statements { + // Get the privileges required to execute the statement. + privs, err := stmt.RequiredPrivileges() + if err != nil { + return err + } + + // Make sure the user has the privileges required to execute + // each statement. + for _, p := range privs { + if p.Admin { + // Admin privilege already checked so statement requiring admin + // privilege cannot be run. + return &ErrAuthorize{ + Query: query, + User: user.Name, + Database: database, + Message: fmt.Sprintf("statement '%s', requires admin privilege", stmt), + } + } + + // Use the db name specified by the statement or the db + // name passed by the caller if one wasn't specified by + // the statement. + db := p.Name + if db == "" { + db = database + } + if !user.AuthorizeDatabase(p.Privilege, db) { + return &ErrAuthorize{ + Query: query, + User: user.Name, + Database: database, + Message: fmt.Sprintf("statement '%s', requires %s on %s", stmt, p.Privilege.String(), db), + } + } + } + } + return nil + default: + } + return &ErrAuthorize{ + Query: query, + User: u.ID(), + Database: database, + Message: fmt.Sprintf("Invalid OSS user type %T", u), + } } func (a *QueryAuthorizer) AuthorizeDatabase(u User, priv influxql.Privilege, database string) error { @@ -59,63 +116,23 @@ func (a *QueryAuthorizer) AuthorizeDatabase(u User, priv influxql.Privilege, dat } } - if !u.AuthorizeDatabase(priv, database) { - return &ErrAuthorize{ - Database: database, - Message: fmt.Sprintf("user %q, requires %s for database %q", u.ID(), priv.String(), database), + switch user := u.(type) { + case *UserInfo: + if !user.AuthorizeDatabase(priv, database) { + return &ErrAuthorize{ + Database: database, + Message: fmt.Sprintf("user %q, requires %s for database %q", u.ID(), priv.String(), database), + } } - } - - return nil -} - -func (u *UserInfo) AuthorizeQuery(database string, query *influxql.Query) error { - - // Admin privilege allows the user to execute all statements. - if u.Admin { return nil + default: + } + return &ErrAuthorize{ + Database: database, + User: u.ID(), + Message: fmt.Sprintf("Internal error - incorrect oss user type %T", u), } - // Check each statement in the query. - for _, stmt := range query.Statements { - // Get the privileges required to execute the statement. - privs, err := stmt.RequiredPrivileges() - if err != nil { - return err - } - - // Make sure the user has the privileges required to execute - // each statement. - for _, p := range privs { - if p.Admin { - // Admin privilege already checked so statement requiring admin - // privilege cannot be run. - return &ErrAuthorize{ - Query: query, - User: u.Name, - Database: database, - Message: fmt.Sprintf("statement '%s', requires admin privilege", stmt), - } - } - - // Use the db name specified by the statement or the db - // name passed by the caller if one wasn't specified by - // the statement. - db := p.Name - if db == "" { - db = database - } - if !u.AuthorizeDatabase(p.Privilege, db) { - return &ErrAuthorize{ - Query: query, - User: u.Name, - Database: database, - Message: fmt.Sprintf("statement '%s', requires %s on %s", stmt, p.Privilege.String(), db), - } - } - } - } - return nil } // ErrAuthorize represents an authorization error. diff --git a/services/meta/write_authorizer.go b/services/meta/write_authorizer.go index 51f3ebd038..541671366c 100644 --- a/services/meta/write_authorizer.go +++ b/services/meta/write_authorizer.go @@ -19,11 +19,28 @@ func NewWriteAuthorizer(c *Client) *WriteAuthorizer { // AuthorizeWrite returns nil if the user has permission to write to the database. func (a WriteAuthorizer) AuthorizeWrite(username, database string) error { u, err := a.Client.User(username) - if err != nil || u == nil || !u.AuthorizeDatabase(influxql.WritePrivilege, database) { + if err != nil || u == nil { return &ErrAuthorize{ Database: database, Message: fmt.Sprintf("%s not authorized to write to %s", username, database), } } + // There is only one OSS implementation of the User interface, and the OSS WriteAuthorizer only works + // with the OSS UserInfo. There is a similar tight coupling between the Enterprise WriteAuthorizer and + // Enterprise UserInfo in closed-source code. + switch user := u.(type) { + case *UserInfo: + if !user.AuthorizeDatabase(influxql.WritePrivilege, database) { + return &ErrAuthorize{ + Database: database, + Message: fmt.Sprintf("%s not authorized to write to %s", username, database), + } + } + default: + return &ErrAuthorize{ + Database: database, + Message: fmt.Sprintf("Internal error - wrong type %T for oss user", u), + } + } return nil } diff --git a/tsdb/index.go b/tsdb/index.go index 474eeb0579..c08a295421 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -1314,7 +1314,7 @@ func (is IndexSet) DedupeInmemIndexes() IndexSet { // MeasurementNamesByExpr returns a slice of measurement names matching the // provided condition. If no condition is provided then all names are returned. -func (is IndexSet) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { +func (is IndexSet) MeasurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) { release := is.SeriesFile.Retain() defer release() @@ -1354,7 +1354,7 @@ func (is IndexSet) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.E return slices.CopyChunkedByteSlices(names, 1000), nil } -func (is IndexSet) measurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { +func (is IndexSet) measurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) { if expr == nil { return nil, nil } @@ -1421,7 +1421,7 @@ func (is IndexSet) measurementNamesByExpr(auth query.Authorizer, expr influxql.E } // measurementNamesByNameFilter returns matching measurement names in sorted order. -func (is IndexSet) measurementNamesByNameFilter(auth query.Authorizer, op influxql.Token, val string, regex *regexp.Regexp) ([][]byte, error) { +func (is IndexSet) measurementNamesByNameFilter(auth query.FineAuthorizer, op influxql.Token, val string, regex *regexp.Regexp) ([][]byte, error) { itr, err := is.measurementIterator() if err != nil { return nil, err @@ -1463,7 +1463,7 @@ func (is IndexSet) measurementNamesByNameFilter(auth query.Authorizer, op influx // provided condition. If no condition is provided then all names are returned. // This behaves differently from MeasurementNamesByExpr because it will // return measurements using flux predicates. -func (is IndexSet) MeasurementNamesByPredicate(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { +func (is IndexSet) MeasurementNamesByPredicate(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) { release := is.SeriesFile.Retain() defer release() @@ -1503,7 +1503,7 @@ func (is IndexSet) MeasurementNamesByPredicate(auth query.Authorizer, expr influ return slices.CopyChunkedByteSlices(names, 1000), nil } -func (is IndexSet) measurementNamesByPredicate(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { +func (is IndexSet) measurementNamesByPredicate(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) { if expr == nil { return nil, nil } @@ -1569,7 +1569,7 @@ func (is IndexSet) measurementNamesByPredicate(auth query.Authorizer, expr influ } } -func (is IndexSet) measurementNamesByTagFilter(auth query.Authorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) { +func (is IndexSet) measurementNamesByTagFilter(auth query.FineAuthorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) { var names [][]byte mitr, err := is.measurementIterator() @@ -1696,7 +1696,7 @@ func (is IndexSet) measurementNamesByTagFilter(auth query.Authorizer, op influxq return names, nil } -func (is IndexSet) measurementNamesByTagPredicate(auth query.Authorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) { +func (is IndexSet) measurementNamesByTagPredicate(auth query.FineAuthorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) { var names [][]byte mitr, err := is.measurementIterator() @@ -1707,14 +1707,14 @@ func (is IndexSet) measurementNamesByTagPredicate(auth query.Authorizer, op infl } defer mitr.Close() - var checkMeasurement func(auth query.Authorizer, me []byte) (bool, error) + var checkMeasurement func(auth query.FineAuthorizer, me []byte) (bool, error) switch op { case influxql.EQ: - checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) { + checkMeasurement = func(auth query.FineAuthorizer, me []byte) (bool, error) { return is.measurementHasTagValue(auth, me, []byte(key), []byte(val)) } case influxql.NEQ: - checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) { + checkMeasurement = func(auth query.FineAuthorizer, me []byte) (bool, error) { // If there is an authorized series in this measurement and that series // does not contain the tag key/value. ok := is.measurementAuthorizedSeries(auth, me, func(tags models.Tags) bool { @@ -1723,11 +1723,11 @@ func (is IndexSet) measurementNamesByTagPredicate(auth query.Authorizer, op infl return ok, nil } case influxql.EQREGEX: - checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) { + checkMeasurement = func(auth query.FineAuthorizer, me []byte) (bool, error) { return is.measurementHasTagValueRegex(auth, me, []byte(key), regex) } case influxql.NEQREGEX: - checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) { + checkMeasurement = func(auth query.FineAuthorizer, me []byte) (bool, error) { // If there is an authorized series in this measurement and that series // does not contain the tag key/value. ok := is.measurementAuthorizedSeries(auth, me, func(tags models.Tags) bool { @@ -1761,7 +1761,7 @@ func (is IndexSet) measurementNamesByTagPredicate(auth query.Authorizer, op infl // measurementAuthorizedSeries determines if the measurement contains a series // that is authorized to be read. -func (is IndexSet) measurementAuthorizedSeries(auth query.Authorizer, name []byte, exclude func(tags models.Tags) bool) bool { +func (is IndexSet) measurementAuthorizedSeries(auth query.FineAuthorizer, name []byte, exclude func(tags models.Tags) bool) bool { if query.AuthorizerIsOpen(auth) && exclude == nil { return true } @@ -1797,7 +1797,7 @@ func (is IndexSet) measurementAuthorizedSeries(auth query.Authorizer, name []byt } } -func (is IndexSet) measurementHasTagValue(auth query.Authorizer, me, key, value []byte) (bool, error) { +func (is IndexSet) measurementHasTagValue(auth query.FineAuthorizer, me, key, value []byte) (bool, error) { if len(value) == 0 { return is.measurementHasEmptyTagValue(auth, me, key) } @@ -1835,7 +1835,7 @@ func (is IndexSet) measurementHasTagValue(auth query.Authorizer, me, key, value } } -func (is IndexSet) measurementHasEmptyTagValue(auth query.Authorizer, me, key []byte) (bool, error) { +func (is IndexSet) measurementHasEmptyTagValue(auth query.FineAuthorizer, me, key []byte) (bool, error) { // Any series that does not have a tag key // has an empty tag value for that key. // Iterate through all of the series to find one @@ -1870,7 +1870,7 @@ func (is IndexSet) measurementHasEmptyTagValue(auth query.Authorizer, me, key [] } } -func (is IndexSet) measurementHasTagValueRegex(auth query.Authorizer, me, key []byte, value *regexp.Regexp) (bool, error) { +func (is IndexSet) measurementHasTagValueRegex(auth query.FineAuthorizer, me, key []byte, value *regexp.Regexp) (bool, error) { // If the regex matches the empty string, do a special check to see // if we have an empty tag value. if matchEmpty := value.MatchString(""); matchEmpty { @@ -2031,7 +2031,7 @@ func (is IndexSet) tagValueIterator(name, key []byte) (TagValueIterator, error) // TagKeyHasAuthorizedSeries determines if there exists an authorized series for // the provided measurement name and tag key. -func (is IndexSet) TagKeyHasAuthorizedSeries(auth query.Authorizer, name, tagKey []byte) (bool, error) { +func (is IndexSet) TagKeyHasAuthorizedSeries(auth query.FineAuthorizer, name, tagKey []byte) (bool, error) { if !is.HasInmemIndex() && query.AuthorizerIsOpen(auth) { return true, nil } @@ -2673,7 +2673,7 @@ func (is IndexSet) matchTagValueNotEqualNotEmptySeriesIDIterator(name, key []byt // // N.B tagValuesByKeyAndExpr relies on keys being sorted in ascending // lexicographic order. -func (is IndexSet) TagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, fieldset *MeasurementFieldSet) ([]map[string]struct{}, error) { +func (is IndexSet) TagValuesByKeyAndExpr(auth query.FineAuthorizer, name []byte, keys []string, expr influxql.Expr, fieldset *MeasurementFieldSet) ([]map[string]struct{}, error) { release := is.SeriesFile.Retain() defer release() return is.tagValuesByKeyAndExpr(auth, name, keys, expr) @@ -2684,7 +2684,7 @@ func (is IndexSet) TagValuesByKeyAndExpr(auth query.Authorizer, name []byte, key // // tagValuesByKeyAndExpr guarantees to never take any locks on the underlying // series file. -func (is IndexSet) tagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr) ([]map[string]struct{}, error) { +func (is IndexSet) tagValuesByKeyAndExpr(auth query.FineAuthorizer, name []byte, keys []string, expr influxql.Expr) ([]map[string]struct{}, error) { database := is.Database() valueExpr := influxql.CloneExpr(expr) @@ -2771,7 +2771,7 @@ func (is IndexSet) tagValuesByKeyAndExpr(auth query.Authorizer, name []byte, key } // MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression. -func (is IndexSet) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { +func (is IndexSet) MeasurementTagKeyValuesByExpr(auth query.FineAuthorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { if len(keys) == 0 { return nil, nil } diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go index adb17c7a67..c60e5a2b30 100644 --- a/tsdb/index/inmem/inmem.go +++ b/tsdb/index/inmem/inmem.go @@ -378,7 +378,7 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s // TagKeyHasAuthorizedSeries determines if there exists an authorized series for // the provided measurement name and tag key. -func (i *Index) TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, key string) bool { +func (i *Index) TagKeyHasAuthorizedSeries(auth query.FineAuthorizer, name []byte, key string) bool { i.mu.RLock() mm := i.measurements[string(name)] i.mu.RUnlock() @@ -422,7 +422,7 @@ func (i *Index) TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, ke // // See tsm1.Engine.MeasurementTagKeyValuesByExpr for a fuller description of this // method. -func (i *Index) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { +func (i *Index) MeasurementTagKeyValuesByExpr(auth query.FineAuthorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { i.mu.RLock() mm := i.measurements[string(name)] i.mu.RUnlock() @@ -540,7 +540,7 @@ func (i *Index) TagsForSeries(key string) (models.Tags, error) { // // TODO(edd): Remove authorisation from these methods. There shouldn't need to // be any auth passed down into the index. -func (i *Index) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { +func (i *Index) MeasurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) { i.mu.RLock() defer i.mu.RUnlock() @@ -559,7 +559,7 @@ func (i *Index) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr return i.measurementNamesByExpr(auth, expr) } -func (i *Index) measurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { +func (i *Index) measurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) { if expr == nil { return nil, nil } @@ -625,7 +625,7 @@ func (i *Index) measurementNamesByExpr(auth query.Authorizer, expr influxql.Expr } // measurementNamesByNameFilter returns the sorted measurements matching a name. -func (i *Index) measurementNamesByNameFilter(auth query.Authorizer, op influxql.Token, val string, regex *regexp.Regexp) [][]byte { +func (i *Index) measurementNamesByNameFilter(auth query.FineAuthorizer, op influxql.Token, val string, regex *regexp.Regexp) [][]byte { var names [][]byte for _, m := range i.measurements { var matched bool @@ -649,7 +649,7 @@ func (i *Index) measurementNamesByNameFilter(auth query.Authorizer, op influxql. } // measurementNamesByTagFilters returns the sorted measurements matching the filters on tag values. -func (i *Index) measurementNamesByTagFilters(auth query.Authorizer, filter *TagFilter) [][]byte { +func (i *Index) measurementNamesByTagFilters(auth query.FineAuthorizer, filter *TagFilter) [][]byte { // Build a list of measurements matching the filters. var names [][]byte var tagMatch bool diff --git a/tsdb/index/inmem/meta.go b/tsdb/index/inmem/meta.go index 1b48f3eb8b..dccd358053 100644 --- a/tsdb/index/inmem/meta.go +++ b/tsdb/index/inmem/meta.go @@ -90,7 +90,7 @@ func (m *measurement) bytes() int { // Authorized determines if this Measurement is authorized to be read, according // to the provided Authorizer. A measurement is authorized to be read if at // least one undeleted series from the measurement is authorized to be read. -func (m *measurement) Authorized(auth query.Authorizer) bool { +func (m *measurement) Authorized(auth query.FineAuthorizer) bool { // Note(edd): the cost of this check scales linearly with the number of series // belonging to a measurement, which means it may become expensive when there // are large numbers of series on a measurement. @@ -1423,7 +1423,7 @@ func (m *measurement) TagKeys() []string { } // TagValues returns all the values for the given tag key, in an arbitrary order. -func (m *measurement) TagValues(auth query.Authorizer, key string) []string { +func (m *measurement) TagValues(auth query.FineAuthorizer, key string) []string { m.mu.RLock() defer m.mu.RUnlock() values := make([]string, 0, m.seriesByTagKeyValue[key].Cardinality()) diff --git a/tsdb/shard.go b/tsdb/shard.go index 94da424e78..7d777887f3 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -839,7 +839,7 @@ func (s *Shard) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s // MeasurementTagKeyValuesByExpr returns all the tag keys values for the // provided expression. -func (s *Shard) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { +func (s *Shard) MeasurementTagKeyValuesByExpr(auth query.FineAuthorizer, name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { index, err := s.Index() if err != nil { return nil, err diff --git a/tsdb/store.go b/tsdb/store.go index 73017c8180..2ada9f3173 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -1455,7 +1455,7 @@ func (s *Store) WriteToShardWithContext(ctx context.Context, shardID uint64, poi // MeasurementNames returns a slice of all measurements. Measurements accepts an // optional condition expression. If cond is nil, then all measurements for the // database will be returned. -func (s *Store) MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) { +func (s *Store) MeasurementNames(ctx context.Context, auth query.FineAuthorizer, database string, cond influxql.Expr) ([][]byte, error) { s.mu.RLock() shards := s.filterShards(byDatabase(database)) s.mu.RUnlock() @@ -1502,7 +1502,7 @@ func (a TagKeysSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a TagKeysSlice) Less(i, j int) bool { return a[i].Measurement < a[j].Measurement } // TagKeys returns the tag keys in the given database, matching the condition. -func (s *Store) TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagKeys, error) { +func (s *Store) TagKeys(ctx context.Context, auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]TagKeys, error) { if len(shardIDs) == 0 { return nil, nil } @@ -1680,7 +1680,7 @@ func (a tagValuesSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[ // TagValues returns the tag keys and values for the provided shards, where the // tag values satisfy the provided condition. -func (s *Store) TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagValues, error) { +func (s *Store) TagValues(ctx context.Context, auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]TagValues, error) { if cond == nil { return nil, errors.New("a condition is required") }