From 815f740f4c4f4253dffd0a18471eb9ba02f39c0a Mon Sep 17 00:00:00 2001 From: Joe LeGasse Date: Fri, 5 May 2017 13:20:00 -0400 Subject: [PATCH] initial fga work wip wip fix tests / build --- cmd/influx_inspect/report/report.go | 2 +- cmd/influxd/run/server.go | 2 +- coordinator/meta_client.go | 2 +- coordinator/meta_client_test.go | 4 +-- coordinator/points_writer.go | 11 ++++-- coordinator/points_writer_test.go | 12 +++---- coordinator/statement_executor.go | 1 + coordinator/statement_executor_test.go | 12 +++++++ influxql/iterator.go | 11 ++++++ influxql/query_executor.go | 17 +++++++++- influxql/select.go | 3 ++ internal/meta_client.go | 14 ++++---- models/points.go | 6 ++-- models/points_test.go | 12 ------- services/collectd/service.go | 4 +-- services/collectd/service_test.go | 2 +- services/graphite/service.go | 4 +-- services/graphite/service_test.go | 2 +- services/httpd/handler.go | 36 +++++++++++--------- services/httpd/handler_test.go | 14 ++++---- services/httpd/requests.go | 4 +-- services/meta/client.go | 12 +++---- services/meta/client_test.go | 20 +++++------ services/meta/data.go | 47 ++++++++++++++++++++++---- services/meta/query_authorizer.go | 7 +++- services/opentsdb/handler.go | 4 +-- services/opentsdb/service.go | 4 +-- services/opentsdb/service_test.go | 2 +- services/udp/service.go | 4 +-- services/udp/service_test.go | 2 +- tests/server_helpers.go | 8 ++--- tests/server_test.go | 2 +- tsdb/engine/tsm1/engine.go | 2 +- tsdb/index/tsi1/index.go | 5 +-- 34 files changed, 184 insertions(+), 110 deletions(-) diff --git a/cmd/influx_inspect/report/report.go b/cmd/influx_inspect/report/report.go index 20407f1c82..86e9f5f2d2 100644 --- a/cmd/influx_inspect/report/report.go +++ b/cmd/influx_inspect/report/report.go @@ -102,7 +102,7 @@ func (cmd *Command) Run(args ...string) error { if cmd.detailed { sep := strings.Index(string(key), "#!~#") seriesKey, field := key[:sep], key[sep+4:] - measurement, tags, _ := models.ParseKey(seriesKey) + measurement, tags := models.ParseKey(seriesKey) measCount, ok := measCardinalities[measurement] if !ok { diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index b9cfae9b7f..2f6d4ab52c 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -602,7 +602,7 @@ func stopProfile() { type monitorPointsWriter coordinator.PointsWriter func (pw *monitorPointsWriter) WritePoints(database, retentionPolicy string, points models.Points) error { - return (*coordinator.PointsWriter)(pw).WritePoints(database, retentionPolicy, models.ConsistencyLevelAny, points) + return (*coordinator.PointsWriter)(pw).WritePointsPrivileged(database, retentionPolicy, models.ConsistencyLevelAny, points) } func raftDBExists(dir string) error { diff --git a/coordinator/meta_client.go b/coordinator/meta_client.go index b459bb6b48..1fb825fa33 100644 --- a/coordinator/meta_client.go +++ b/coordinator/meta_client.go @@ -14,7 +14,7 @@ type MetaClient interface { CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error) CreateSubscription(database, rp, name, mode string, destinations []string) error - CreateUser(name, password string, admin bool) (*meta.UserInfo, error) + CreateUser(name, password string, admin bool) (meta.User, error) Database(name string) *meta.DatabaseInfo Databases() []meta.DatabaseInfo DropShard(id uint64) error diff --git a/coordinator/meta_client_test.go b/coordinator/meta_client_test.go index 43c8d11338..fd4064dbdb 100644 --- a/coordinator/meta_client_test.go +++ b/coordinator/meta_client_test.go @@ -14,7 +14,7 @@ type MetaClient struct { CreateDatabaseWithRetentionPolicyFn func(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) CreateRetentionPolicyFn func(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error) CreateSubscriptionFn func(database, rp, name, mode string, destinations []string) error - CreateUserFn func(name, password string, admin bool) (*meta.UserInfo, error) + CreateUserFn func(name, password string, admin bool) (meta.User, error) DatabaseFn func(name string) *meta.DatabaseInfo DatabasesFn func() []meta.DatabaseInfo DataNodeFn func(id uint64) (*meta.NodeInfo, error) @@ -63,7 +63,7 @@ func (c *MetaClient) CreateSubscription(database, rp, name, mode string, destina return c.CreateSubscriptionFn(database, rp, name, mode, destinations) } -func (c *MetaClient) CreateUser(name, password string, admin bool) (*meta.UserInfo, error) { +func (c *MetaClient) CreateUser(name, password string, admin bool) (meta.User, error) { return c.CreateUserFn(name, password, admin) } diff --git a/coordinator/points_writer.go b/coordinator/points_writer.go index b3ea6ed7da..d163b9087e 100644 --- a/coordinator/points_writer.go +++ b/coordinator/points_writer.go @@ -281,11 +281,16 @@ func (l sgList) Append(sgi meta.ShardGroupInfo) sgList { // WritePointsInto is a copy of WritePoints that uses a tsdb structure instead of // a cluster structure for information. This is to avoid a circular dependency. func (w *PointsWriter) WritePointsInto(p *IntoWriteRequest) error { - return w.WritePoints(p.Database, p.RetentionPolicy, models.ConsistencyLevelOne, p.Points) + return w.WritePointsPrivileged(p.Database, p.RetentionPolicy, models.ConsistencyLevelOne, p.Points) } -// WritePoints writes across multiple local and remote data nodes according the consistency level. -func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { +// WritePoints writes the data to the underlying storage. consitencyLevel and user are only used for clustered scenarios +func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user string, points []models.Point) error { + return w.WritePointsPrivileged(database, retentionPolicy, consistencyLevel, points) +} + +// WritePointsPrivileged writes the data to the underlying storage, consitencyLevel is only used for clustered scenarios +func (w *PointsWriter) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { atomic.AddInt64(&w.stats.WriteReq, 1) atomic.AddInt64(&w.stats.PointWriteReq, int64(len(points))) diff --git a/coordinator/points_writer_test.go b/coordinator/points_writer_test.go index 3358d01825..e9312b1c73 100644 --- a/coordinator/points_writer_test.go +++ b/coordinator/points_writer_test.go @@ -346,25 +346,25 @@ func TestPointsWriter_WritePoints(t *testing.T) { c.Open() defer c.Close() - err := c.WritePoints(pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points) + err := c.WritePointsPrivileged(pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points) if err == nil && test.expErr != nil { - t.Errorf("PointsWriter.WritePoints(): '%s' error: got %v, exp %v", test.name, err, test.expErr) + t.Errorf("PointsWriter.WritePointsPrivileged(): '%s' error: got %v, exp %v", test.name, err, test.expErr) } if err != nil && test.expErr == nil { - t.Errorf("PointsWriter.WritePoints(): '%s' error: got %v, exp %v", test.name, err, test.expErr) + t.Errorf("PointsWriter.WritePointsPrivileged(): '%s' error: got %v, exp %v", test.name, err, test.expErr) } if err != nil && test.expErr != nil && err.Error() != test.expErr.Error() { - t.Errorf("PointsWriter.WritePoints(): '%s' error: got %v, exp %v", test.name, err, test.expErr) + t.Errorf("PointsWriter.WritePointsPrivileged(): '%s' error: got %v, exp %v", test.name, err, test.expErr) } if test.expErr == nil { select { case p := <-subPoints: if !reflect.DeepEqual(p, pr) { - t.Errorf("PointsWriter.WritePoints(): '%s' error: unexpected WritePointsRequest got %v, exp %v", test.name, p, pr) + t.Errorf("PointsWriter.WritePointsPrivileged(): '%s' error: unexpected WritePointsRequest got %v, exp %v", test.name, p, pr) } default: - t.Errorf("PointsWriter.WritePoints(): '%s' error: Subscriber.Points not called", test.name) + t.Errorf("PointsWriter.WritePointsPrivileged(): '%s' error: Subscriber.Points not called", test.name) } } } diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index 10a6484e15..7300f1a849 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -530,6 +530,7 @@ func (e *StatementExecutor) createIterators(stmt *influxql.SelectStatement, ctx InterruptCh: ctx.InterruptCh, NodeID: ctx.ExecutionOptions.NodeID, MaxSeriesN: e.MaxSelectSeriesN, + Authorizer: ctx.Authorizer, } // Replace instances of "now()" with the current time, and check the resultant times. diff --git a/coordinator/statement_executor_test.go b/coordinator/statement_executor_test.go index 7268b26074..7ae58cef28 100644 --- a/coordinator/statement_executor_test.go +++ b/coordinator/statement_executor_test.go @@ -201,6 +201,18 @@ func (a *mockAuthorizer) AuthorizeDatabase(p influxql.Privilege, name string) bo return a.AuthorizeDatabaseFn(p, name) } +func (m *mockAuthorizer) AuthorizeQuery(database string, query *influxql.Query) error { + panic("fail") +} + +func (m *mockAuthorizer) AuthorizeSeriesRead(database string, series string) bool { + panic("fail") +} + +func (m *mockAuthorizer) AuthorizeSeriesWrite(database string, series string) bool { + panic("fail") +} + func TestQueryExecutor_ExecuteQuery_ShowDatabases(t *testing.T) { qe := influxql.NewQueryExecutor() qe.StatementExecutor = &coordinator.StatementExecutor{ diff --git a/influxql/iterator.go b/influxql/iterator.go index aa945e99b1..b5c4406e49 100644 --- a/influxql/iterator.go +++ b/influxql/iterator.go @@ -643,6 +643,12 @@ type FieldMapper interface { TypeMapper } +// SeriesAuthorizer can be used to limit access on a per-series basis +type SeriesAuthorizer interface { + AuthorizeSeriesRead(database, series string) bool + AuthorizeSeriesWrite(database, series string) bool +} + // IteratorOptions is an object passed to CreateIterator to specify creation options. type IteratorOptions struct { // Expression to iterate for. @@ -694,10 +700,14 @@ type IteratorOptions struct { // If this channel is set and is closed, the iterator should try to exit // and close as soon as possible. InterruptCh <-chan struct{} + + // Authorizer can limit acccess to data + Authorizer Authorizer } // newIteratorOptionsStmt creates the iterator options from stmt. func newIteratorOptionsStmt(stmt *SelectStatement, sopt *SelectOptions) (opt IteratorOptions, err error) { + // Determine time range from the condition. startTime, endTime, err := TimeRange(stmt.Condition) if err != nil { @@ -769,6 +779,7 @@ func newIteratorOptionsStmt(stmt *SelectStatement, sopt *SelectOptions) (opt Ite if sopt != nil { opt.MaxSeriesN = sopt.MaxSeriesN opt.InterruptCh = sopt.InterruptCh + opt.Authorizer = sopt.Authorizer } return opt, nil diff --git a/influxql/query_executor.go b/influxql/query_executor.go index fbd356a287..adf82e9280 100644 --- a/influxql/query_executor.go +++ b/influxql/query_executor.go @@ -60,6 +60,15 @@ func ErrMaxConcurrentQueriesLimitExceeded(n, limit int) error { type Authorizer interface { // AuthorizeDatabase indicates whether the given Privilege is authorized on the database with the given name. AuthorizeDatabase(p Privilege, name string) bool + + // AuthorizeQuery returns an error if the query cannot be executed + AuthorizeQuery(database string, query *Query) error + + // AuthorizeSeriesRead determines if a series is authorized for reading + AuthorizeSeriesRead(database, series string) bool + + // AuthorizeSeriesWrite determines if a series is authorized for writing + AuthorizeSeriesWrite(database, series string) bool } // OpenAuthorizer is the Authorizer used when authorization is disabled. @@ -69,7 +78,13 @@ type OpenAuthorizer struct{} var _ Authorizer = OpenAuthorizer{} // AuthorizeDatabase returns true to allow any operation on a database. -func (OpenAuthorizer) AuthorizeDatabase(Privilege, string) bool { return true } +func (_ OpenAuthorizer) AuthorizeDatabase(Privilege, string) bool { return true } + +func (_ OpenAuthorizer) AuthorizeSeriesRead(database string, series string) bool { return true } + +func (_ OpenAuthorizer) AuthorizeSeriesWrite(database string, series string) bool { return true } + +func (_ OpenAuthorizer) AuthorizeQuery(_ string, _ *Query) error { return nil } // ExecutionOptions contains the options for executing a query. type ExecutionOptions struct { diff --git a/influxql/select.go b/influxql/select.go index 916288df42..76e2627544 100644 --- a/influxql/select.go +++ b/influxql/select.go @@ -10,6 +10,9 @@ import ( // SelectOptions are options that customize the select call. type SelectOptions struct { + // Authorizer is used to limit access to data + Authorizer Authorizer + // The lower bound for a select call. MinTime time.Time diff --git a/internal/meta_client.go b/internal/meta_client.go index 7f1f0c445d..e87607ebcc 100644 --- a/internal/meta_client.go +++ b/internal/meta_client.go @@ -16,7 +16,7 @@ type MetaClientMock struct { CreateRetentionPolicyFn func(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error) CreateShardGroupFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) CreateSubscriptionFn func(database, rp, name, mode string, destinations []string) error - CreateUserFn func(name, password string, admin bool) (*meta.UserInfo, error) + CreateUserFn func(name, password string, admin bool) (meta.User, error) DatabaseFn func(name string) *meta.DatabaseInfo DatabasesFn func() []meta.DatabaseInfo @@ -34,7 +34,7 @@ type MetaClientMock struct { RetentionPolicyFn func(database, name string) (rpi *meta.RetentionPolicyInfo, err error) - AuthenticateFn func(username, password string) (ui *meta.UserInfo, err error) + AuthenticateFn func(username, password string) (ui meta.User, err error) AdminUserExistsFn func() bool SetAdminPrivilegeFn func(username string, admin bool) error SetDataFn func(*meta.Data) error @@ -45,7 +45,7 @@ type MetaClientMock struct { UpdateUserFn func(name, password string) error UserPrivilegeFn func(username, database string) (*influxql.Privilege, error) UserPrivilegesFn func(username string) (map[string]influxql.Privilege, error) - UserFn func(username string) (*meta.UserInfo, error) + UserFn func(username string) (meta.User, error) UsersFn func() []meta.UserInfo } @@ -77,7 +77,7 @@ func (c *MetaClientMock) CreateSubscription(database, rp, name, mode string, des return c.CreateSubscriptionFn(database, rp, name, mode, destinations) } -func (c *MetaClientMock) CreateUser(name, password string, admin bool) (*meta.UserInfo, error) { +func (c *MetaClientMock) CreateUser(name, password string, admin bool) (meta.User, error) { return c.CreateUserFn(name, password, admin) } @@ -153,13 +153,13 @@ func (c *MetaClientMock) UserPrivileges(username string) (map[string]influxql.Pr return c.UserPrivilegesFn(username) } -func (c *MetaClientMock) Authenticate(username, password string) (*meta.UserInfo, error) { +func (c *MetaClientMock) Authenticate(username, password string) (meta.User, error) { return c.AuthenticateFn(username, password) } func (c *MetaClientMock) AdminUserExists() bool { return c.AdminUserExistsFn() } -func (c *MetaClientMock) User(username string) (*meta.UserInfo, error) { return c.UserFn(username) } -func (c *MetaClientMock) Users() []meta.UserInfo { return c.UsersFn() } +func (c *MetaClientMock) User(username string) (meta.User, error) { return c.UserFn(username) } +func (c *MetaClientMock) Users() []meta.UserInfo { return c.UsersFn() } func (c *MetaClientMock) Open() error { return c.OpenFn() } func (c *MetaClientMock) Data() meta.Data { return c.DataFn() } diff --git a/models/points.go b/models/points.go index 1bea4bb1a5..b2d2348117 100644 --- a/models/points.go +++ b/models/points.go @@ -237,7 +237,7 @@ func ParsePointsString(buf string) ([]Point, error) { // // NOTE: to minimize heap allocations, the returned Tags will refer to subslices of buf. // This can have the unintended effect preventing buf from being garbage collected. -func ParseKey(buf []byte) (string, Tags, error) { +func ParseKey(buf []byte) (string, Tags) { // Ignore the error because scanMeasurement returns "missing fields" which we ignore // when just parsing a key state, i, _ := scanMeasurement(buf, 0) @@ -246,9 +246,9 @@ func ParseKey(buf []byte) (string, Tags, error) { if state == tagKeyState { tags = parseTags(buf) // scanMeasurement returns the location of the comma if there are tags, strip that off - return string(buf[:i-1]), tags, nil + return string(buf[:i-1]), tags } - return string(buf[:i]), tags, nil + return string(buf[:i]), tags } func ParseTags(buf []byte) (Tags, error) { diff --git a/models/points_test.go b/models/points_test.go index de4bd92e81..4875aa7363 100644 --- a/models/points_test.go +++ b/models/points_test.go @@ -2088,18 +2088,6 @@ func TestNewPointsRejectsMaxKey(t *testing.T) { } -func TestParseKeyEmpty(t *testing.T) { - if _, _, err := models.ParseKey(nil); err != nil { - t.Fatalf("unexpected error: %v", err) - } -} - -func TestParseKeyMissingValue(t *testing.T) { - if _, _, err := models.ParseKey([]byte("cpu,foo ")); err != nil { - t.Fatalf("unexpected error: %v", err) - } -} - func TestPoint_FieldIterator_Simple(t *testing.T) { p, err := models.ParsePoints([]byte(`m v=42i,f=42 36`)) diff --git a/services/collectd/service.go b/services/collectd/service.go index cbfb90a0d7..f24236af3e 100644 --- a/services/collectd/service.go +++ b/services/collectd/service.go @@ -35,7 +35,7 @@ const ( // pointsWriter is an internal interface to make testing easier. type pointsWriter interface { - WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error + WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error } // metaClient is an internal interface to make testing easier. @@ -374,7 +374,7 @@ func (s *Service) writePoints() { continue } - if err := s.PointsWriter.WritePoints(s.Config.Database, s.Config.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil { + if err := s.PointsWriter.WritePointsPrivileged(s.Config.Database, s.Config.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil { atomic.AddInt64(&s.stats.BatchesTransmitted, 1) atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch))) } else { diff --git a/services/collectd/service_test.go b/services/collectd/service_test.go index 78522b7d79..e9d4709373 100644 --- a/services/collectd/service_test.go +++ b/services/collectd/service_test.go @@ -365,7 +365,7 @@ func NewTestService(batchSize int, batchDuration time.Duration) *TestService { return s } -func (w *TestService) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { +func (w *TestService) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { return w.WritePointsFn(database, retentionPolicy, consistencyLevel, points) } diff --git a/services/graphite/service.go b/services/graphite/service.go index a17412a675..1b28d2f9c1 100644 --- a/services/graphite/service.go +++ b/services/graphite/service.go @@ -79,7 +79,7 @@ type Service struct { DeregisterDiagnosticsClient(name string) } PointsWriter interface { - WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error + WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error } MetaClient interface { CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) @@ -444,7 +444,7 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) { continue } - if err := s.PointsWriter.WritePoints(s.database, s.retentionPolicy, models.ConsistencyLevelAny, batch); err == nil { + if err := s.PointsWriter.WritePointsPrivileged(s.database, s.retentionPolicy, models.ConsistencyLevelAny, batch); err == nil { atomic.AddInt64(&s.stats.BatchesTransmitted, 1) atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch))) } else { diff --git a/services/graphite/service_test.go b/services/graphite/service_test.go index ca3fe30eac..fa5d330c8f 100644 --- a/services/graphite/service_test.go +++ b/services/graphite/service_test.go @@ -304,6 +304,6 @@ func NewTestService(c *Config) *TestService { return service } -func (s *TestService) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { +func (s *TestService) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { return s.WritePointsFn(database, retentionPolicy, consistencyLevel, points) } diff --git a/services/httpd/handler.go b/services/httpd/handler.go index a859dd3ef2..6e14c01e3c 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -75,13 +75,13 @@ type Handler struct { MetaClient interface { Database(name string) *meta.DatabaseInfo Databases() []meta.DatabaseInfo - Authenticate(username, password string) (ui *meta.UserInfo, err error) - User(username string) (*meta.UserInfo, error) + Authenticate(username, password string) (ui meta.User, err error) + User(username string) (meta.User, error) AdminUserExists() bool } QueryAuthorizer interface { - AuthorizeQuery(u *meta.UserInfo, query *influxql.Query, database string) error + AuthorizeQuery(u meta.User, query *influxql.Query, database string) error } WriteAuthorizer interface { @@ -96,7 +96,7 @@ type Handler struct { } PointsWriter interface { - WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error + WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user string, points []models.Point) error } Config *Config @@ -217,7 +217,7 @@ func (h *Handler) AddRoutes(routes ...Route) { var handler http.Handler // If it's a handler func that requires authorization, wrap it in authentication - if hf, ok := r.HandlerFunc.(func(http.ResponseWriter, *http.Request, *meta.UserInfo)); ok { + if hf, ok := r.HandlerFunc.(func(http.ResponseWriter, *http.Request, meta.User)); ok { handler = authenticate(hf, h, h.Config.AuthEnabled) } @@ -277,7 +277,7 @@ func (h *Handler) writeHeader(w http.ResponseWriter, code int) { } // serveQuery parses an incoming query and, if valid, executes the query. -func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) { +func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.User) { atomic.AddInt64(&h.stats.QueryRequests, 1) defer func(start time.Time) { atomic.AddInt64(&h.stats.QueryRequestDuration, time.Since(start).Nanoseconds()) @@ -578,7 +578,7 @@ func (h *Handler) async(query *influxql.Query, results <-chan *influxql.Result) } // serveWrite receives incoming series data in line protocol format and writes it to the database. -func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) { +func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.User) { atomic.AddInt64(&h.stats.WriteRequests, 1) atomic.AddInt64(&h.stats.ActiveWriteRequests, 1) defer func(start time.Time) { @@ -598,16 +598,20 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta. return } - if h.Config.AuthEnabled && user == nil { - h.httpError(w, fmt.Sprintf("user is required to write to database %q", database), http.StatusForbidden) - return - } + userName := "" if h.Config.AuthEnabled { - if err := h.WriteAuthorizer.AuthorizeWrite(user.Name, database); err != nil { - h.httpError(w, fmt.Sprintf("%q user is not authorized to write to database %q", user.Name, database), http.StatusForbidden) + if user == nil { + h.httpError(w, fmt.Sprintf("user is required to write to database %q", database), http.StatusForbidden) return } + + if err := h.WriteAuthorizer.AuthorizeWrite(user.ID(), database); err != nil { + h.httpError(w, fmt.Sprintf("%q user is not authorized to write to database %q", user.ID(), database), http.StatusForbidden) + return + } + + userName = user.ID() } // Handle gzip decoding of the body @@ -670,7 +674,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta. } // Write points. - if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, points); influxdb.IsClientError(err) { + if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, userName, points); influxdb.IsClientError(err) { atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points))) h.httpError(w, err.Error(), http.StatusBadRequest) return @@ -1015,14 +1019,14 @@ func parseCredentials(r *http.Request) (*credentials, error) { // // There is one exception: if there are no users in the system, authentication is not required. This // is to facilitate bootstrapping of a system with authentication enabled. -func authenticate(inner func(http.ResponseWriter, *http.Request, *meta.UserInfo), h *Handler, requireAuthentication bool) http.Handler { +func authenticate(inner func(http.ResponseWriter, *http.Request, meta.User), h *Handler, requireAuthentication bool) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Return early if we are not authenticating if !requireAuthentication { inner(w, r, nil) return } - var user *meta.UserInfo + var user meta.User // TODO corylanou: never allow this in the future without users if requireAuthentication && h.MetaClient.AdminUserExists() { diff --git a/services/httpd/handler_test.go b/services/httpd/handler_test.go index 3151e13f1e..f170ac712b 100644 --- a/services/httpd/handler_test.go +++ b/services/httpd/handler_test.go @@ -92,7 +92,7 @@ func TestHandler_Query_Auth(t *testing.T) { // Set mock meta client functions for the handler to use. h.MetaClient.AdminUserExistsFn = func() bool { return true } - h.MetaClient.UserFn = func(username string) (*meta.UserInfo, error) { + h.MetaClient.UserFn = func(username string) (meta.User, error) { if username != "user1" { return nil, meta.ErrUserNotFound } @@ -103,7 +103,7 @@ func TestHandler_Query_Auth(t *testing.T) { }, nil } - h.MetaClient.AuthenticateFn = func(u, p string) (*meta.UserInfo, error) { + h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) { if u != "user1" { return nil, fmt.Errorf("unexpected user: exp: user1, got: %s", u) } else if p != "abcd" { @@ -113,7 +113,7 @@ func TestHandler_Query_Auth(t *testing.T) { } // Set mock query authorizer for handler to use. - h.QueryAuthorizer.AuthorizeQueryFn = func(u *meta.UserInfo, query *influxql.Query, database string) error { + h.QueryAuthorizer.AuthorizeQueryFn = func(u meta.User, query *influxql.Query, database string) error { return nil } @@ -345,11 +345,11 @@ func TestHandler_Query_ErrInvalidQuery(t *testing.T) { // Ensure the handler returns an appropriate 401 or 403 status when authentication or authorization fails. func TestHandler_Query_ErrAuthorize(t *testing.T) { h := NewHandler(true) - h.QueryAuthorizer.AuthorizeQueryFn = func(u *meta.UserInfo, q *influxql.Query, db string) error { + h.QueryAuthorizer.AuthorizeQueryFn = func(u meta.User, q *influxql.Query, db string) error { return errors.New("marker") } h.MetaClient.AdminUserExistsFn = func() bool { return true } - h.MetaClient.AuthenticateFn = func(u, p string) (*meta.UserInfo, error) { + h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) { users := []meta.UserInfo{ { @@ -643,10 +643,10 @@ func (e *HandlerStatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx // HandlerQueryAuthorizer is a mock implementation of Handler.QueryAuthorizer. type HandlerQueryAuthorizer struct { - AuthorizeQueryFn func(u *meta.UserInfo, query *influxql.Query, database string) error + AuthorizeQueryFn func(u meta.User, query *influxql.Query, database string) error } -func (a *HandlerQueryAuthorizer) AuthorizeQuery(u *meta.UserInfo, query *influxql.Query, database string) error { +func (a *HandlerQueryAuthorizer) AuthorizeQuery(u meta.User, query *influxql.Query, database string) error { return a.AuthorizeQueryFn(u, query, database) } diff --git a/services/httpd/requests.go b/services/httpd/requests.go index 46a9651a79..8fee1c06cc 100644 --- a/services/httpd/requests.go +++ b/services/httpd/requests.go @@ -109,7 +109,7 @@ func (rt *RequestTracker) TrackRequests() *RequestProfile { return profile } -func (rt *RequestTracker) Add(req *http.Request, user *meta.UserInfo) { +func (rt *RequestTracker) Add(req *http.Request, user meta.User) { rt.mu.RLock() if rt.profiles.Len() == 0 { rt.mu.RUnlock() @@ -125,7 +125,7 @@ func (rt *RequestTracker) Add(req *http.Request, user *meta.UserInfo) { info.IPAddr = host if user != nil { - info.Username = user.Name + info.Username = user.ID() } // Add the request info to the profiles. diff --git a/services/meta/client.go b/services/meta/client.go index e8e660fffc..6bba53dc7d 100644 --- a/services/meta/client.go +++ b/services/meta/client.go @@ -370,7 +370,7 @@ func (c *Client) Users() []UserInfo { } // User returns the user with the given name, or ErrUserNotFound. -func (c *Client) User(name string) (*UserInfo, error) { +func (c *Client) User(name string) (User, error) { c.mu.RLock() defer c.mu.RUnlock() @@ -406,14 +406,14 @@ func (c *Client) saltedHash(password string) (salt, hash []byte, err error) { } // CreateUser adds a user with the given name and password and admin status. -func (c *Client) CreateUser(name, password string, admin bool) (*UserInfo, error) { +func (c *Client) CreateUser(name, password string, admin bool) (User, error) { c.mu.Lock() defer c.mu.Unlock() data := c.cacheData.Clone() // See if the user already exists. - if u := data.User(name); u != nil { + if u := data.user(name); u != nil { if err := bcrypt.CompareHashAndPassword([]byte(u.Hash), []byte(password)); err != nil || u.Admin != admin { return nil, ErrUserExists } @@ -430,7 +430,7 @@ func (c *Client) CreateUser(name, password string, admin bool) (*UserInfo, error return nil, err } - u := data.User(name) + u := data.user(name) if err := c.commit(data); err != nil { return nil, err @@ -551,10 +551,10 @@ func (c *Client) AdminUserExists() bool { } // Authenticate returns a UserInfo if the username and password match an existing entry. -func (c *Client) Authenticate(username, password string) (*UserInfo, error) { +func (c *Client) Authenticate(username, password string) (User, error) { // Find user. c.mu.RLock() - userInfo := c.cacheData.User(username) + userInfo := c.cacheData.user(username) c.mu.RUnlock() if userInfo == nil { return nil, ErrUserNotFound diff --git a/services/meta/client_test.go b/services/meta/client_test.go index fab121511c..23acaf68aa 100644 --- a/services/meta/client_test.go +++ b/services/meta/client_test.go @@ -596,15 +596,15 @@ func TestMetaClient_CreateUser(t *testing.T) { if err != nil { t.Fatal(err) } - if exp, got := "fred", u.Name; exp != got { + if exp, got := "fred", u.ID(); exp != got { t.Fatalf("unexpected user name: exp: %s got: %s", exp, got) } - if !u.Admin { + if !u.IsAdmin() { t.Fatalf("expected user to be admin") } u, err = c.Authenticate("fred", "supersecure") - if u == nil || err != nil || u.Name != "fred" { + if u == nil || err != nil || u.ID() != "fred" { t.Fatalf("failed to authenticate") } @@ -633,7 +633,7 @@ func TestMetaClient_CreateUser(t *testing.T) { // Auth for new password should succeed. u, err = c.Authenticate("fred", "moresupersecure") - if u == nil || err != nil || u.Name != "fred" { + if u == nil || err != nil || u.ID() != "fred" { t.Fatalf("failed to authenticate") } @@ -647,10 +647,10 @@ func TestMetaClient_CreateUser(t *testing.T) { if err != nil { t.Fatal(err) } - if exp, got := "wilma", u.Name; exp != got { + if exp, got := "wilma", u.ID(); exp != got { t.Fatalf("unexpected user name: exp: %s got: %s", exp, got) } - if u.Admin { + if u.IsAdmin() { t.Fatalf("expected user not to be an admin") } @@ -667,10 +667,10 @@ func TestMetaClient_CreateUser(t *testing.T) { if err != nil { t.Fatal(err) } - if exp, got := "wilma", u.Name; exp != got { + if exp, got := "wilma", u.ID(); exp != got { t.Fatalf("unexpected user name: exp: %s got: %s", exp, got) } - if !u.Admin { + if !u.IsAdmin() { t.Fatalf("expected user to be an admin") } @@ -683,10 +683,10 @@ func TestMetaClient_CreateUser(t *testing.T) { if err != nil { t.Fatal(err) } - if exp, got := "wilma", u.Name; exp != got { + if exp, got := "wilma", u.ID(); exp != got { t.Fatalf("unexpected user name: exp: %s got: %s", exp, got) } - if u.Admin { + if u.IsAdmin() { t.Fatalf("expected user not to be an admin") } diff --git a/services/meta/data.go b/services/meta/data.go index dc71484d73..66f34dbf9f 100644 --- a/services/meta/data.go +++ b/services/meta/data.go @@ -519,8 +519,7 @@ func (data *Data) DropSubscription(database, rp, name string) error { return ErrSubscriptionNotFound } -// User returns a user by username. -func (data *Data) User(username string) *UserInfo { +func (data *Data) user(username string) *UserInfo { for i := range data.Users { if data.Users[i].Name == username { return &data.Users[i] @@ -529,6 +528,16 @@ func (data *Data) User(username string) *UserInfo { return nil } +// User returns a user by username. +func (data *Data) User(username string) User { + u := data.user(username) + if u == nil { + // prevent non-nil interface with nil pointer + return nil + } + return u +} + // CreateUser creates a new user. func (data *Data) CreateUser(name, hash string, admin bool) error { // Ensure the user doesn't already exist. @@ -597,7 +606,7 @@ func (data *Data) CloneUsers() []UserInfo { // SetPrivilege sets a privilege for a user on a database. func (data *Data) SetPrivilege(name, database string, p influxql.Privilege) error { - ui := data.User(name) + ui := data.user(name) if ui == nil { return ErrUserNotFound } @@ -612,7 +621,7 @@ func (data *Data) SetPrivilege(name, database string, p influxql.Privilege) erro // SetAdminPrivilege sets the admin privilege for a user. func (data *Data) SetAdminPrivilege(name string, admin bool) error { - ui := data.User(name) + ui := data.user(name) if ui == nil { return ErrUserNotFound } @@ -632,7 +641,7 @@ func (data Data) AdminUserExists() bool { // UserPrivileges gets the privileges for a user. func (data *Data) UserPrivileges(name string) (map[string]influxql.Privilege, error) { - ui := data.User(name) + ui := data.user(name) if ui == nil { return nil, ErrUserNotFound } @@ -642,7 +651,7 @@ func (data *Data) UserPrivileges(name string) (map[string]influxql.Privilege, er // UserPrivilege gets the privilege for a user on a database. func (data *Data) UserPrivilege(name, database string) (*influxql.Privilege, error) { - ui := data.User(name) + ui := data.user(name) if ui == nil { return nil, ErrUserNotFound } @@ -1419,6 +1428,8 @@ func (cqi *ContinuousQueryInfo) unmarshal(pb *internal.ContinuousQueryInfo) { cqi.Query = pb.GetQuery() } +var _ influxql.Authorizer = (*UserInfo)(nil) + // UserInfo represents metadata about a user in the system. type UserInfo struct { // User's name. @@ -1434,7 +1445,19 @@ type UserInfo struct { Privileges map[string]influxql.Privilege } -var _ influxql.Authorizer = (*UserInfo)(nil) +type User interface { + influxql.Authorizer + ID() string + IsAdmin() bool +} + +func (u *UserInfo) ID() string { + return u.Name +} + +func (u *UserInfo) IsAdmin() bool { + return u.Admin +} // AuthorizeDatabase returns true if the user is authorized for the given privilege on the given database. func (ui *UserInfo) AuthorizeDatabase(privilege influxql.Privilege, database string) bool { @@ -1445,6 +1468,16 @@ func (ui *UserInfo) AuthorizeDatabase(privilege influxql.Privilege, database str return ok && (p == privilege || p == influxql.AllPrivileges) } +// AuthorizeSeriesRead is used to limit access per-series (enterprise only) +func (u *UserInfo) AuthorizeSeriesRead(database string, series string) bool { + return true +} + +// AuthorizeSeriesWrite is used to limit access per-series (enterprise only) +func (u *UserInfo) AuthorizeSeriesWrite(database string, series string) bool { + return true +} + // clone returns a deep copy of si. func (ui UserInfo) clone() UserInfo { other := ui diff --git a/services/meta/query_authorizer.go b/services/meta/query_authorizer.go index f15e5bb520..8c51f65a0c 100644 --- a/services/meta/query_authorizer.go +++ b/services/meta/query_authorizer.go @@ -22,7 +22,7 @@ func NewQueryAuthorizer(c *Client) *QueryAuthorizer { // Database can be "" for queries that do not require a database. // If no user is provided it will return an error unless the query's first statement is to create // a root user. -func (a *QueryAuthorizer) AuthorizeQuery(u *UserInfo, query *influxql.Query, database string) error { +func (a *QueryAuthorizer) AuthorizeQuery(u User, query *influxql.Query, database string) error { // Special case if no users exist. if n := a.Client.UserCount(); n == 0 { // Ensure there is at least one statement. @@ -48,6 +48,11 @@ func (a *QueryAuthorizer) AuthorizeQuery(u *UserInfo, query *influxql.Query, dat } } + return u.AuthorizeQuery(database, query) +} + +func (u *UserInfo) AuthorizeQuery(database string, query *influxql.Query) error { + // Admin privilege allows the user to execute all statements. if u.Admin { return nil diff --git a/services/opentsdb/handler.go b/services/opentsdb/handler.go index 59ac021de0..b2d7e24c80 100644 --- a/services/opentsdb/handler.go +++ b/services/opentsdb/handler.go @@ -24,7 +24,7 @@ type Handler struct { RetentionPolicy string PointsWriter interface { - WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error + WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error } Logger zap.Logger @@ -126,7 +126,7 @@ func (h *Handler) servePut(w http.ResponseWriter, r *http.Request) { } // Write points. - if err := h.PointsWriter.WritePoints(h.Database, h.RetentionPolicy, models.ConsistencyLevelAny, points); influxdb.IsClientError(err) { + if err := h.PointsWriter.WritePointsPrivileged(h.Database, h.RetentionPolicy, models.ConsistencyLevelAny, points); influxdb.IsClientError(err) { h.Logger.Info(fmt.Sprint("write series error: ", err)) http.Error(w, "write series error: "+err.Error(), http.StatusBadRequest) return diff --git a/services/opentsdb/service.go b/services/opentsdb/service.go index e5bc6a5a98..1a942dd958 100644 --- a/services/opentsdb/service.go +++ b/services/opentsdb/service.go @@ -60,7 +60,7 @@ type Service struct { RetentionPolicy string PointsWriter interface { - WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error + WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error } MetaClient interface { CreateDatabase(name string) (*meta.DatabaseInfo, error) @@ -459,7 +459,7 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) { continue } - if err := s.PointsWriter.WritePoints(s.Database, s.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil { + if err := s.PointsWriter.WritePointsPrivileged(s.Database, s.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil { atomic.AddInt64(&s.stats.BatchesTransmitted, 1) atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch))) } else { diff --git a/services/opentsdb/service_test.go b/services/opentsdb/service_test.go index a8234553dd..ed1abe32b7 100644 --- a/services/opentsdb/service_test.go +++ b/services/opentsdb/service_test.go @@ -290,6 +290,6 @@ func NewTestService(database string, bind string) *TestService { return service } -func (s *TestService) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { +func (s *TestService) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { return s.WritePointsFn(database, retentionPolicy, consistencyLevel, points) } diff --git a/services/udp/service.go b/services/udp/service.go index 3eecf2aaab..f5b244cb02 100644 --- a/services/udp/service.go +++ b/services/udp/service.go @@ -49,7 +49,7 @@ type Service struct { config Config PointsWriter interface { - WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error + WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error } MetaClient interface { @@ -162,7 +162,7 @@ func (s *Service) writer() { continue } - if err := s.PointsWriter.WritePoints(s.config.Database, s.config.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil { + if err := s.PointsWriter.WritePointsPrivileged(s.config.Database, s.config.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil { atomic.AddInt64(&s.stats.BatchesTransmitted, 1) atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch))) } else { diff --git a/services/udp/service_test.go b/services/udp/service_test.go index 8d66475b36..bb0e667899 100644 --- a/services/udp/service_test.go +++ b/services/udp/service_test.go @@ -155,6 +155,6 @@ func NewTestService(c *Config) *TestService { return service } -func (s *TestService) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { +func (s *TestService) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { return s.WritePointsFn(database, retentionPolicy, consistencyLevel, points) } diff --git a/tests/server_helpers.go b/tests/server_helpers.go index 3aa634cb43..752be5b7ed 100644 --- a/tests/server_helpers.go +++ b/tests/server_helpers.go @@ -41,7 +41,7 @@ type Server interface { Write(db, rp, body string, params url.Values) (results string, err error) MustWrite(db, rp, body string, params url.Values) string - WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error + WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user string, points []models.Point) error } // RemoteServer is a Server that is accessed remotely via the HTTP API @@ -154,7 +154,7 @@ func (s *RemoteServer) Reset() error { } -func (s *RemoteServer) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { +func (s *RemoteServer) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user string, points []models.Point) error { panic("WritePoints not implemented") } @@ -328,10 +328,10 @@ func (s *LocalServer) Reset() error { return nil } -func (s *LocalServer) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { +func (s *LocalServer) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user string, points []models.Point) error { s.mu.RLock() defer s.mu.RUnlock() - return s.PointsWriter.WritePoints(database, retentionPolicy, consistencyLevel, points) + return s.PointsWriter.WritePoints(database, retentionPolicy, consistencyLevel, user, points) } // client abstract querying and writing to a Server using HTTP diff --git a/tests/server_test.go b/tests/server_test.go index 53c43e4882..65800e5d4f 100644 --- a/tests/server_test.go +++ b/tests/server_test.go @@ -7930,7 +7930,7 @@ func TestServer_ConcurrentPointsWriter_Subscriber(t *testing.T) { Database: "db0", RetentionPolicy: "rp0", } - s.WritePoints(wpr.Database, wpr.RetentionPolicy, models.ConsistencyLevelAny, wpr.Points) + s.WritePoints(wpr.Database, wpr.RetentionPolicy, models.ConsistencyLevelAny, "", wpr.Points) } } }() diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 3c152ec089..c1f2ebeb97 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -1747,7 +1747,7 @@ func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, name string, s // createVarRefSeriesIterator creates an iterator for a variable reference for a series. func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, name string, seriesKey string, t *influxql.TagSet, filter influxql.Expr, conditionFields []influxql.VarRef, opt influxql.IteratorOptions) (influxql.Iterator, error) { - _, tfs, _ := models.ParseKey([]byte(seriesKey)) + _, tfs := models.ParseKey([]byte(seriesKey)) tags := influxql.NewTags(tfs.Map()) // Create options specific for this series. diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index d634a715d2..72e4ca6086 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -529,10 +529,7 @@ func (i *Index) DropSeries(key []byte) error { i.mu.RLock() defer i.mu.RUnlock() - name, tags, err := models.ParseKey(key) - if err != nil { - return err - } + name, tags := models.ParseKey(key) mname := []byte(name) if err := i.activeLogFile.DeleteSeries(mname, tags); err != nil {