Merge pull request #8437 from influxdata/jl-points-auth

Fine Grained Authorization
pull/8445/head
Ryan Betts 2017-05-31 10:23:49 -04:00 committed by GitHub
commit 19ef39d947
43 changed files with 276 additions and 169 deletions

View File

@ -102,7 +102,7 @@ func (cmd *Command) Run(args ...string) error {
if cmd.detailed {
sep := strings.Index(string(key), "#!~#")
seriesKey, field := key[:sep], key[sep+4:]
measurement, tags, _ := models.ParseKey(seriesKey)
measurement, tags := models.ParseKey(seriesKey)
measCount, ok := measCardinalities[measurement]
if !ok {

View File

@ -602,7 +602,7 @@ func stopProfile() {
type monitorPointsWriter coordinator.PointsWriter
func (pw *monitorPointsWriter) WritePoints(database, retentionPolicy string, points models.Points) error {
return (*coordinator.PointsWriter)(pw).WritePoints(database, retentionPolicy, models.ConsistencyLevelAny, points)
return (*coordinator.PointsWriter)(pw).WritePointsPrivileged(database, retentionPolicy, models.ConsistencyLevelAny, points)
}
func raftDBExists(dir string) error {

View File

@ -14,7 +14,7 @@ type MetaClient interface {
CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error)
CreateSubscription(database, rp, name, mode string, destinations []string) error
CreateUser(name, password string, admin bool) (*meta.UserInfo, error)
CreateUser(name, password string, admin bool) (meta.User, error)
Database(name string) *meta.DatabaseInfo
Databases() []meta.DatabaseInfo
DropShard(id uint64) error

View File

@ -14,7 +14,7 @@ type MetaClient struct {
CreateDatabaseWithRetentionPolicyFn func(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
CreateRetentionPolicyFn func(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error)
CreateSubscriptionFn func(database, rp, name, mode string, destinations []string) error
CreateUserFn func(name, password string, admin bool) (*meta.UserInfo, error)
CreateUserFn func(name, password string, admin bool) (meta.User, error)
DatabaseFn func(name string) *meta.DatabaseInfo
DatabasesFn func() []meta.DatabaseInfo
DataNodeFn func(id uint64) (*meta.NodeInfo, error)
@ -63,7 +63,7 @@ func (c *MetaClient) CreateSubscription(database, rp, name, mode string, destina
return c.CreateSubscriptionFn(database, rp, name, mode, destinations)
}
func (c *MetaClient) CreateUser(name, password string, admin bool) (*meta.UserInfo, error) {
func (c *MetaClient) CreateUser(name, password string, admin bool) (meta.User, error) {
return c.CreateUserFn(name, password, admin)
}

View File

@ -281,11 +281,16 @@ func (l sgList) Append(sgi meta.ShardGroupInfo) sgList {
// WritePointsInto is a copy of WritePoints that uses a tsdb structure instead of
// a cluster structure for information. This is to avoid a circular dependency.
func (w *PointsWriter) WritePointsInto(p *IntoWriteRequest) error {
return w.WritePoints(p.Database, p.RetentionPolicy, models.ConsistencyLevelOne, p.Points)
return w.WritePointsPrivileged(p.Database, p.RetentionPolicy, models.ConsistencyLevelOne, p.Points)
}
// WritePoints writes across multiple local and remote data nodes according the consistency level.
func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
// WritePoints writes the data to the underlying storage. consistencyLevel and user are only used for clustered scenarios
func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error {
return w.WritePointsPrivileged(database, retentionPolicy, consistencyLevel, points)
}
// WritePointsPrivileged writes the data to the underlying storage, consistencyLevel is only used for clustered scenarios
func (w *PointsWriter) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
atomic.AddInt64(&w.stats.WriteReq, 1)
atomic.AddInt64(&w.stats.PointWriteReq, int64(len(points)))

View File

@ -346,25 +346,25 @@ func TestPointsWriter_WritePoints(t *testing.T) {
c.Open()
defer c.Close()
err := c.WritePoints(pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points)
err := c.WritePointsPrivileged(pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points)
if err == nil && test.expErr != nil {
t.Errorf("PointsWriter.WritePoints(): '%s' error: got %v, exp %v", test.name, err, test.expErr)
t.Errorf("PointsWriter.WritePointsPrivileged(): '%s' error: got %v, exp %v", test.name, err, test.expErr)
}
if err != nil && test.expErr == nil {
t.Errorf("PointsWriter.WritePoints(): '%s' error: got %v, exp %v", test.name, err, test.expErr)
t.Errorf("PointsWriter.WritePointsPrivileged(): '%s' error: got %v, exp %v", test.name, err, test.expErr)
}
if err != nil && test.expErr != nil && err.Error() != test.expErr.Error() {
t.Errorf("PointsWriter.WritePoints(): '%s' error: got %v, exp %v", test.name, err, test.expErr)
t.Errorf("PointsWriter.WritePointsPrivileged(): '%s' error: got %v, exp %v", test.name, err, test.expErr)
}
if test.expErr == nil {
select {
case p := <-subPoints:
if !reflect.DeepEqual(p, pr) {
t.Errorf("PointsWriter.WritePoints(): '%s' error: unexpected WritePointsRequest got %v, exp %v", test.name, p, pr)
t.Errorf("PointsWriter.WritePointsPrivileged(): '%s' error: unexpected WritePointsRequest got %v, exp %v", test.name, p, pr)
}
default:
t.Errorf("PointsWriter.WritePoints(): '%s' error: Subscriber.Points not called", test.name)
t.Errorf("PointsWriter.WritePointsPrivileged(): '%s' error: Subscriber.Points not called", test.name)
}
}
}
@ -422,7 +422,7 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) {
c.Open()
defer c.Close()
err := c.WritePoints(pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points)
err := c.WritePointsPrivileged(pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points)
if _, ok := err.(tsdb.PartialWriteError); !ok {
t.Errorf("PointsWriter.WritePoints(): got %v, exp %v", err, tsdb.PartialWriteError{})
}

View File

@ -530,6 +530,7 @@ func (e *StatementExecutor) createIterators(stmt *influxql.SelectStatement, ctx
InterruptCh: ctx.InterruptCh,
NodeID: ctx.ExecutionOptions.NodeID,
MaxSeriesN: e.MaxSelectSeriesN,
Authorizer: ctx.Authorizer,
}
// Replace instances of "now()" with the current time, and check the resultant times.

View File

@ -201,6 +201,18 @@ func (a *mockAuthorizer) AuthorizeDatabase(p influxql.Privilege, name string) bo
return a.AuthorizeDatabaseFn(p, name)
}
func (m *mockAuthorizer) AuthorizeQuery(database string, query *influxql.Query) error {
panic("fail")
}
func (m *mockAuthorizer) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool {
panic("fail")
}
func (m *mockAuthorizer) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool {
panic("fail")
}
func TestQueryExecutor_ExecuteQuery_ShowDatabases(t *testing.T) {
qe := influxql.NewQueryExecutor()
qe.StatementExecutor = &coordinator.StatementExecutor{

View File

@ -20,6 +20,14 @@ func ErrRetentionPolicyNotFound(name string) error {
return fmt.Errorf("retention policy not found: %s", name)
}
// IsAuthorizationError indicates whether an error is due to an authorization failure
func IsAuthorizationError(err error) bool {
e, ok := err.(interface {
AuthorizationFailed() bool
})
return ok && e.AuthorizationFailed()
}
// IsClientError indicates whether an error is a known client error.
func IsClientError(err error) bool {
if err == nil {

View File

@ -694,10 +694,14 @@ type IteratorOptions struct {
// If this channel is set and is closed, the iterator should try to exit
// and close as soon as possible.
InterruptCh <-chan struct{}
// Authorizer can limit access to data
Authorizer Authorizer
}
// newIteratorOptionsStmt creates the iterator options from stmt.
func newIteratorOptionsStmt(stmt *SelectStatement, sopt *SelectOptions) (opt IteratorOptions, err error) {
// Determine time range from the condition.
startTime, endTime, err := TimeRange(stmt.Condition)
if err != nil {
@ -769,6 +773,7 @@ func newIteratorOptionsStmt(stmt *SelectStatement, sopt *SelectOptions) (opt Ite
if sopt != nil {
opt.MaxSeriesN = sopt.MaxSeriesN
opt.InterruptCh = sopt.InterruptCh
opt.Authorizer = sopt.Authorizer
}
return opt, nil

View File

@ -60,6 +60,15 @@ func ErrMaxConcurrentQueriesLimitExceeded(n, limit int) error {
type Authorizer interface {
// AuthorizeDatabase indicates whether the given Privilege is authorized on the database with the given name.
AuthorizeDatabase(p Privilege, name string) bool
// AuthorizeQuery returns an error if the query cannot be executed
AuthorizeQuery(database string, query *Query) error
// AuthorizeSeriesRead determines if a series is authorized for reading
AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool
// AuthorizeSeriesWrite determines if a series is authorized for writing
AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool
}
// OpenAuthorizer is the Authorizer used when authorization is disabled.
@ -69,7 +78,17 @@ type OpenAuthorizer struct{}
var _ Authorizer = OpenAuthorizer{}
// AuthorizeDatabase returns true to allow any operation on a database.
func (OpenAuthorizer) AuthorizeDatabase(Privilege, string) bool { return true }
func (_ OpenAuthorizer) AuthorizeDatabase(Privilege, string) bool { return true }
func (_ OpenAuthorizer) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool {
return true
}
func (_ OpenAuthorizer) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool {
return true
}
func (_ OpenAuthorizer) AuthorizeQuery(_ string, _ *Query) error { return nil }
// ExecutionOptions contains the options for executing a query.
type ExecutionOptions struct {

View File

@ -10,6 +10,9 @@ import (
// SelectOptions are options that customize the select call.
type SelectOptions struct {
// Authorizer is used to limit access to data
Authorizer Authorizer
// The lower bound for a select call.
MinTime time.Time

View File

@ -16,7 +16,7 @@ type MetaClientMock struct {
CreateRetentionPolicyFn func(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error)
CreateShardGroupFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
CreateSubscriptionFn func(database, rp, name, mode string, destinations []string) error
CreateUserFn func(name, password string, admin bool) (*meta.UserInfo, error)
CreateUserFn func(name, password string, admin bool) (meta.User, error)
DatabaseFn func(name string) *meta.DatabaseInfo
DatabasesFn func() []meta.DatabaseInfo
@ -34,7 +34,7 @@ type MetaClientMock struct {
RetentionPolicyFn func(database, name string) (rpi *meta.RetentionPolicyInfo, err error)
AuthenticateFn func(username, password string) (ui *meta.UserInfo, err error)
AuthenticateFn func(username, password string) (ui meta.User, err error)
AdminUserExistsFn func() bool
SetAdminPrivilegeFn func(username string, admin bool) error
SetDataFn func(*meta.Data) error
@ -45,7 +45,7 @@ type MetaClientMock struct {
UpdateUserFn func(name, password string) error
UserPrivilegeFn func(username, database string) (*influxql.Privilege, error)
UserPrivilegesFn func(username string) (map[string]influxql.Privilege, error)
UserFn func(username string) (*meta.UserInfo, error)
UserFn func(username string) (meta.User, error)
UsersFn func() []meta.UserInfo
}
@ -77,7 +77,7 @@ func (c *MetaClientMock) CreateSubscription(database, rp, name, mode string, des
return c.CreateSubscriptionFn(database, rp, name, mode, destinations)
}
func (c *MetaClientMock) CreateUser(name, password string, admin bool) (*meta.UserInfo, error) {
func (c *MetaClientMock) CreateUser(name, password string, admin bool) (meta.User, error) {
return c.CreateUserFn(name, password, admin)
}
@ -153,13 +153,13 @@ func (c *MetaClientMock) UserPrivileges(username string) (map[string]influxql.Pr
return c.UserPrivilegesFn(username)
}
func (c *MetaClientMock) Authenticate(username, password string) (*meta.UserInfo, error) {
func (c *MetaClientMock) Authenticate(username, password string) (meta.User, error) {
return c.AuthenticateFn(username, password)
}
func (c *MetaClientMock) AdminUserExists() bool { return c.AdminUserExistsFn() }
func (c *MetaClientMock) User(username string) (*meta.UserInfo, error) { return c.UserFn(username) }
func (c *MetaClientMock) Users() []meta.UserInfo { return c.UsersFn() }
func (c *MetaClientMock) User(username string) (meta.User, error) { return c.UserFn(username) }
func (c *MetaClientMock) Users() []meta.UserInfo { return c.UsersFn() }
func (c *MetaClientMock) Open() error { return c.OpenFn() }
func (c *MetaClientMock) Data() meta.Data { return c.DataFn() }

View File

@ -237,7 +237,7 @@ func ParsePointsString(buf string) ([]Point, error) {
//
// NOTE: to minimize heap allocations, the returned Tags will refer to subslices of buf.
// This can have the unintended effect of preventing buf from being garbage collected.
func ParseKey(buf []byte) (string, Tags, error) {
func ParseKey(buf []byte) (string, Tags) {
// Ignore the error because scanMeasurement returns "missing fields" which we ignore
// when just parsing a key
state, i, _ := scanMeasurement(buf, 0)
@ -246,9 +246,9 @@ func ParseKey(buf []byte) (string, Tags, error) {
if state == tagKeyState {
tags = parseTags(buf)
// scanMeasurement returns the location of the comma if there are tags, strip that off
return string(buf[:i-1]), tags, nil
return string(buf[:i-1]), tags
}
return string(buf[:i]), tags, nil
return string(buf[:i]), tags
}
func ParseTags(buf []byte) (Tags, error) {

View File

@ -2088,18 +2088,6 @@ func TestNewPointsRejectsMaxKey(t *testing.T) {
}
func TestParseKeyEmpty(t *testing.T) {
if _, _, err := models.ParseKey(nil); err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
func TestParseKeyMissingValue(t *testing.T) {
if _, _, err := models.ParseKey([]byte("cpu,foo ")); err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
func TestPoint_FieldIterator_Simple(t *testing.T) {
p, err := models.ParsePoints([]byte(`m v=42i,f=42 36`))

View File

@ -35,7 +35,7 @@ const (
// pointsWriter is an internal interface to make testing easier.
type pointsWriter interface {
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
}
// metaClient is an internal interface to make testing easier.
@ -374,7 +374,7 @@ func (s *Service) writePoints() {
continue
}
if err := s.PointsWriter.WritePoints(s.Config.Database, s.Config.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
if err := s.PointsWriter.WritePointsPrivileged(s.Config.Database, s.Config.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
} else {

View File

@ -365,7 +365,7 @@ func NewTestService(batchSize int, batchDuration time.Duration) *TestService {
return s
}
func (w *TestService) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
func (w *TestService) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
return w.WritePointsFn(database, retentionPolicy, consistencyLevel, points)
}

View File

@ -79,7 +79,7 @@ type Service struct {
DeregisterDiagnosticsClient(name string)
}
PointsWriter interface {
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
}
MetaClient interface {
CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
@ -444,7 +444,7 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
continue
}
if err := s.PointsWriter.WritePoints(s.database, s.retentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
if err := s.PointsWriter.WritePointsPrivileged(s.database, s.retentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
} else {

View File

@ -304,6 +304,6 @@ func NewTestService(c *Config) *TestService {
return service
}
func (s *TestService) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
func (s *TestService) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
return s.WritePointsFn(database, retentionPolicy, consistencyLevel, points)
}

View File

@ -75,13 +75,13 @@ type Handler struct {
MetaClient interface {
Database(name string) *meta.DatabaseInfo
Databases() []meta.DatabaseInfo
Authenticate(username, password string) (ui *meta.UserInfo, err error)
User(username string) (*meta.UserInfo, error)
Authenticate(username, password string) (ui meta.User, err error)
User(username string) (meta.User, error)
AdminUserExists() bool
}
QueryAuthorizer interface {
AuthorizeQuery(u *meta.UserInfo, query *influxql.Query, database string) error
AuthorizeQuery(u meta.User, query *influxql.Query, database string) error
}
WriteAuthorizer interface {
@ -96,7 +96,7 @@ type Handler struct {
}
PointsWriter interface {
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error
}
Config *Config
@ -217,7 +217,7 @@ func (h *Handler) AddRoutes(routes ...Route) {
var handler http.Handler
// If it's a handler func that requires authorization, wrap it in authentication
if hf, ok := r.HandlerFunc.(func(http.ResponseWriter, *http.Request, *meta.UserInfo)); ok {
if hf, ok := r.HandlerFunc.(func(http.ResponseWriter, *http.Request, meta.User)); ok {
handler = authenticate(hf, h, h.Config.AuthEnabled)
}
@ -277,7 +277,7 @@ func (h *Handler) writeHeader(w http.ResponseWriter, code int) {
}
// serveQuery parses an incoming query and, if valid, executes the query.
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.User) {
atomic.AddInt64(&h.stats.QueryRequests, 1)
defer func(start time.Time) {
atomic.AddInt64(&h.stats.QueryRequestDuration, time.Since(start).Nanoseconds())
@ -578,7 +578,7 @@ func (h *Handler) async(query *influxql.Query, results <-chan *influxql.Result)
}
// serveWrite receives incoming series data in line protocol format and writes it to the database.
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.User) {
atomic.AddInt64(&h.stats.WriteRequests, 1)
atomic.AddInt64(&h.stats.ActiveWriteRequests, 1)
defer func(start time.Time) {
@ -598,14 +598,14 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
return
}
if h.Config.AuthEnabled && user == nil {
h.httpError(w, fmt.Sprintf("user is required to write to database %q", database), http.StatusForbidden)
return
}
if h.Config.AuthEnabled {
if err := h.WriteAuthorizer.AuthorizeWrite(user.Name, database); err != nil {
h.httpError(w, fmt.Sprintf("%q user is not authorized to write to database %q", user.Name, database), http.StatusForbidden)
if user == nil {
h.httpError(w, fmt.Sprintf("user is required to write to database %q", database), http.StatusForbidden)
return
}
if err := h.WriteAuthorizer.AuthorizeWrite(user.ID(), database); err != nil {
h.httpError(w, fmt.Sprintf("%q user is not authorized to write to database %q", user.ID(), database), http.StatusForbidden)
return
}
}
@ -670,10 +670,14 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
}
// Write points.
if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, points); influxdb.IsClientError(err) {
if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points); influxdb.IsClientError(err) {
atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
h.httpError(w, err.Error(), http.StatusBadRequest)
return
} else if influxdb.IsAuthorizationError(err) {
atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
h.httpError(w, err.Error(), http.StatusForbidden)
return
} else if werr, ok := err.(tsdb.PartialWriteError); ok {
atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)-werr.Dropped))
atomic.AddInt64(&h.stats.PointsWrittenDropped, int64(werr.Dropped))
@ -1015,14 +1019,14 @@ func parseCredentials(r *http.Request) (*credentials, error) {
//
// There is one exception: if there are no users in the system, authentication is not required. This
// is to facilitate bootstrapping of a system with authentication enabled.
func authenticate(inner func(http.ResponseWriter, *http.Request, *meta.UserInfo), h *Handler, requireAuthentication bool) http.Handler {
func authenticate(inner func(http.ResponseWriter, *http.Request, meta.User), h *Handler, requireAuthentication bool) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Return early if we are not authenticating
if !requireAuthentication {
inner(w, r, nil)
return
}
var user *meta.UserInfo
var user meta.User
// TODO corylanou: never allow this in the future without users
if requireAuthentication && h.MetaClient.AdminUserExists() {

View File

@ -92,7 +92,7 @@ func TestHandler_Query_Auth(t *testing.T) {
// Set mock meta client functions for the handler to use.
h.MetaClient.AdminUserExistsFn = func() bool { return true }
h.MetaClient.UserFn = func(username string) (*meta.UserInfo, error) {
h.MetaClient.UserFn = func(username string) (meta.User, error) {
if username != "user1" {
return nil, meta.ErrUserNotFound
}
@ -103,7 +103,7 @@ func TestHandler_Query_Auth(t *testing.T) {
}, nil
}
h.MetaClient.AuthenticateFn = func(u, p string) (*meta.UserInfo, error) {
h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) {
if u != "user1" {
return nil, fmt.Errorf("unexpected user: exp: user1, got: %s", u)
} else if p != "abcd" {
@ -113,7 +113,7 @@ func TestHandler_Query_Auth(t *testing.T) {
}
// Set mock query authorizer for handler to use.
h.QueryAuthorizer.AuthorizeQueryFn = func(u *meta.UserInfo, query *influxql.Query, database string) error {
h.QueryAuthorizer.AuthorizeQueryFn = func(u meta.User, query *influxql.Query, database string) error {
return nil
}
@ -368,11 +368,11 @@ func TestHandler_Query_ErrInvalidQuery(t *testing.T) {
// Ensure the handler returns an appropriate 401 or 403 status when authentication or authorization fails.
func TestHandler_Query_ErrAuthorize(t *testing.T) {
h := NewHandler(true)
h.QueryAuthorizer.AuthorizeQueryFn = func(u *meta.UserInfo, q *influxql.Query, db string) error {
h.QueryAuthorizer.AuthorizeQueryFn = func(u meta.User, q *influxql.Query, db string) error {
return errors.New("marker")
}
h.MetaClient.AdminUserExistsFn = func() bool { return true }
h.MetaClient.AuthenticateFn = func(u, p string) (*meta.UserInfo, error) {
h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) {
users := []meta.UserInfo{
{
@ -666,10 +666,10 @@ func (e *HandlerStatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx
// HandlerQueryAuthorizer is a mock implementation of Handler.QueryAuthorizer.
type HandlerQueryAuthorizer struct {
AuthorizeQueryFn func(u *meta.UserInfo, query *influxql.Query, database string) error
AuthorizeQueryFn func(u meta.User, query *influxql.Query, database string) error
}
func (a *HandlerQueryAuthorizer) AuthorizeQuery(u *meta.UserInfo, query *influxql.Query, database string) error {
func (a *HandlerQueryAuthorizer) AuthorizeQuery(u meta.User, query *influxql.Query, database string) error {
return a.AuthorizeQueryFn(u, query, database)
}

View File

@ -109,7 +109,7 @@ func (rt *RequestTracker) TrackRequests() *RequestProfile {
return profile
}
func (rt *RequestTracker) Add(req *http.Request, user *meta.UserInfo) {
func (rt *RequestTracker) Add(req *http.Request, user meta.User) {
rt.mu.RLock()
if rt.profiles.Len() == 0 {
rt.mu.RUnlock()
@ -125,7 +125,7 @@ func (rt *RequestTracker) Add(req *http.Request, user *meta.UserInfo) {
info.IPAddr = host
if user != nil {
info.Username = user.Name
info.Username = user.ID()
}
// Add the request info to the profiles.

View File

@ -370,7 +370,7 @@ func (c *Client) Users() []UserInfo {
}
// User returns the user with the given name, or ErrUserNotFound.
func (c *Client) User(name string) (*UserInfo, error) {
func (c *Client) User(name string) (User, error) {
c.mu.RLock()
defer c.mu.RUnlock()
@ -406,14 +406,14 @@ func (c *Client) saltedHash(password string) (salt, hash []byte, err error) {
}
// CreateUser adds a user with the given name and password and admin status.
func (c *Client) CreateUser(name, password string, admin bool) (*UserInfo, error) {
func (c *Client) CreateUser(name, password string, admin bool) (User, error) {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
// See if the user already exists.
if u := data.User(name); u != nil {
if u := data.user(name); u != nil {
if err := bcrypt.CompareHashAndPassword([]byte(u.Hash), []byte(password)); err != nil || u.Admin != admin {
return nil, ErrUserExists
}
@ -430,7 +430,7 @@ func (c *Client) CreateUser(name, password string, admin bool) (*UserInfo, error
return nil, err
}
u := data.User(name)
u := data.user(name)
if err := c.commit(data); err != nil {
return nil, err
@ -551,10 +551,10 @@ func (c *Client) AdminUserExists() bool {
}
// Authenticate returns a UserInfo if the username and password match an existing entry.
func (c *Client) Authenticate(username, password string) (*UserInfo, error) {
func (c *Client) Authenticate(username, password string) (User, error) {
// Find user.
c.mu.RLock()
userInfo := c.cacheData.User(username)
userInfo := c.cacheData.user(username)
c.mu.RUnlock()
if userInfo == nil {
return nil, ErrUserNotFound

View File

@ -596,15 +596,15 @@ func TestMetaClient_CreateUser(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if exp, got := "fred", u.Name; exp != got {
if exp, got := "fred", u.ID(); exp != got {
t.Fatalf("unexpected user name: exp: %s got: %s", exp, got)
}
if !u.Admin {
if !u.IsAdmin() {
t.Fatalf("expected user to be admin")
}
u, err = c.Authenticate("fred", "supersecure")
if u == nil || err != nil || u.Name != "fred" {
if u == nil || err != nil || u.ID() != "fred" {
t.Fatalf("failed to authenticate")
}
@ -633,7 +633,7 @@ func TestMetaClient_CreateUser(t *testing.T) {
// Auth for new password should succeed.
u, err = c.Authenticate("fred", "moresupersecure")
if u == nil || err != nil || u.Name != "fred" {
if u == nil || err != nil || u.ID() != "fred" {
t.Fatalf("failed to authenticate")
}
@ -647,10 +647,10 @@ func TestMetaClient_CreateUser(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if exp, got := "wilma", u.Name; exp != got {
if exp, got := "wilma", u.ID(); exp != got {
t.Fatalf("unexpected user name: exp: %s got: %s", exp, got)
}
if u.Admin {
if u.IsAdmin() {
t.Fatalf("expected user not to be an admin")
}
@ -667,10 +667,10 @@ func TestMetaClient_CreateUser(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if exp, got := "wilma", u.Name; exp != got {
if exp, got := "wilma", u.ID(); exp != got {
t.Fatalf("unexpected user name: exp: %s got: %s", exp, got)
}
if !u.Admin {
if !u.IsAdmin() {
t.Fatalf("expected user to be an admin")
}
@ -683,10 +683,10 @@ func TestMetaClient_CreateUser(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if exp, got := "wilma", u.Name; exp != got {
if exp, got := "wilma", u.ID(); exp != got {
t.Fatalf("unexpected user name: exp: %s got: %s", exp, got)
}
if u.Admin {
if u.IsAdmin() {
t.Fatalf("expected user not to be an admin")
}

View File

@ -519,8 +519,7 @@ func (data *Data) DropSubscription(database, rp, name string) error {
return ErrSubscriptionNotFound
}
// User returns a user by username.
func (data *Data) User(username string) *UserInfo {
func (data *Data) user(username string) *UserInfo {
for i := range data.Users {
if data.Users[i].Name == username {
return &data.Users[i]
@ -529,6 +528,16 @@ func (data *Data) User(username string) *UserInfo {
return nil
}
// User returns a user by username.
func (data *Data) User(username string) User {
u := data.user(username)
if u == nil {
// prevent non-nil interface with nil pointer
return nil
}
return u
}
// CreateUser creates a new user.
func (data *Data) CreateUser(name, hash string, admin bool) error {
// Ensure the user doesn't already exist.
@ -597,7 +606,7 @@ func (data *Data) CloneUsers() []UserInfo {
// SetPrivilege sets a privilege for a user on a database.
func (data *Data) SetPrivilege(name, database string, p influxql.Privilege) error {
ui := data.User(name)
ui := data.user(name)
if ui == nil {
return ErrUserNotFound
}
@ -612,7 +621,7 @@ func (data *Data) SetPrivilege(name, database string, p influxql.Privilege) erro
// SetAdminPrivilege sets the admin privilege for a user.
func (data *Data) SetAdminPrivilege(name string, admin bool) error {
ui := data.User(name)
ui := data.user(name)
if ui == nil {
return ErrUserNotFound
}
@ -632,7 +641,7 @@ func (data Data) AdminUserExists() bool {
// UserPrivileges gets the privileges for a user.
func (data *Data) UserPrivileges(name string) (map[string]influxql.Privilege, error) {
ui := data.User(name)
ui := data.user(name)
if ui == nil {
return nil, ErrUserNotFound
}
@ -642,7 +651,7 @@ func (data *Data) UserPrivileges(name string) (map[string]influxql.Privilege, er
// UserPrivilege gets the privilege for a user on a database.
func (data *Data) UserPrivilege(name, database string) (*influxql.Privilege, error) {
ui := data.User(name)
ui := data.user(name)
if ui == nil {
return nil, ErrUserNotFound
}
@ -1419,6 +1428,8 @@ func (cqi *ContinuousQueryInfo) unmarshal(pb *internal.ContinuousQueryInfo) {
cqi.Query = pb.GetQuery()
}
var _ influxql.Authorizer = (*UserInfo)(nil)
// UserInfo represents metadata about a user in the system.
type UserInfo struct {
// User's name.
@ -1434,7 +1445,19 @@ type UserInfo struct {
Privileges map[string]influxql.Privilege
}
var _ influxql.Authorizer = (*UserInfo)(nil)
type User interface {
influxql.Authorizer
ID() string
IsAdmin() bool
}
func (u *UserInfo) ID() string {
return u.Name
}
func (u *UserInfo) IsAdmin() bool {
return u.Admin
}
// AuthorizeDatabase returns true if the user is authorized for the given privilege on the given database.
func (ui *UserInfo) AuthorizeDatabase(privilege influxql.Privilege, database string) bool {
@ -1445,6 +1468,16 @@ func (ui *UserInfo) AuthorizeDatabase(privilege influxql.Privilege, database str
return ok && (p == privilege || p == influxql.AllPrivileges)
}
// AuthorizeSeriesRead is used to limit access per-series (enterprise only)
func (u *UserInfo) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool {
return true
}
// AuthorizeSeriesWrite is used to limit access per-series (enterprise only)
func (u *UserInfo) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool {
return true
}
// clone returns a deep copy of si.
func (ui UserInfo) clone() UserInfo {
other := ui

View File

@ -22,7 +22,7 @@ func NewQueryAuthorizer(c *Client) *QueryAuthorizer {
// Database can be "" for queries that do not require a database.
// If no user is provided it will return an error unless the query's first statement is to create
// a root user.
func (a *QueryAuthorizer) AuthorizeQuery(u *UserInfo, query *influxql.Query, database string) error {
func (a *QueryAuthorizer) AuthorizeQuery(u User, query *influxql.Query, database string) error {
// Special case if no users exist.
if n := a.Client.UserCount(); n == 0 {
// Ensure there is at least one statement.
@ -48,6 +48,11 @@ func (a *QueryAuthorizer) AuthorizeQuery(u *UserInfo, query *influxql.Query, dat
}
}
return u.AuthorizeQuery(database, query)
}
func (u *UserInfo) AuthorizeQuery(database string, query *influxql.Query) error {
// Admin privilege allows the user to execute all statements.
if u.Admin {
return nil

View File

@ -24,7 +24,7 @@ type Handler struct {
RetentionPolicy string
PointsWriter interface {
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
}
Logger zap.Logger
@ -126,7 +126,7 @@ func (h *Handler) servePut(w http.ResponseWriter, r *http.Request) {
}
// Write points.
if err := h.PointsWriter.WritePoints(h.Database, h.RetentionPolicy, models.ConsistencyLevelAny, points); influxdb.IsClientError(err) {
if err := h.PointsWriter.WritePointsPrivileged(h.Database, h.RetentionPolicy, models.ConsistencyLevelAny, points); influxdb.IsClientError(err) {
h.Logger.Info(fmt.Sprint("write series error: ", err))
http.Error(w, "write series error: "+err.Error(), http.StatusBadRequest)
return

View File

@ -60,7 +60,7 @@ type Service struct {
RetentionPolicy string
PointsWriter interface {
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
}
MetaClient interface {
CreateDatabase(name string) (*meta.DatabaseInfo, error)
@ -459,7 +459,7 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
continue
}
if err := s.PointsWriter.WritePoints(s.Database, s.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
if err := s.PointsWriter.WritePointsPrivileged(s.Database, s.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
} else {

View File

@ -290,6 +290,6 @@ func NewTestService(database string, bind string) *TestService {
return service
}
func (s *TestService) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
func (s *TestService) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
return s.WritePointsFn(database, retentionPolicy, consistencyLevel, points)
}

View File

@ -49,7 +49,7 @@ type Service struct {
config Config
PointsWriter interface {
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
}
MetaClient interface {
@ -162,7 +162,7 @@ func (s *Service) writer() {
continue
}
if err := s.PointsWriter.WritePoints(s.config.Database, s.config.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
if err := s.PointsWriter.WritePointsPrivileged(s.config.Database, s.config.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
} else {

View File

@ -155,6 +155,6 @@ func NewTestService(c *Config) *TestService {
return service
}
func (s *TestService) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
func (s *TestService) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
return s.WritePointsFn(database, retentionPolicy, consistencyLevel, points)
}

View File

@ -41,7 +41,7 @@ type Server interface {
Write(db, rp, body string, params url.Values) (results string, err error)
MustWrite(db, rp, body string, params url.Values) string
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error
}
// RemoteServer is a Server that is accessed remotely via the HTTP API
@ -154,7 +154,7 @@ func (s *RemoteServer) Reset() error {
}
func (s *RemoteServer) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
func (s *RemoteServer) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error {
panic("WritePoints not implemented")
}
@ -328,10 +328,10 @@ func (s *LocalServer) Reset() error {
return nil
}
func (s *LocalServer) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
func (s *LocalServer) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error {
s.mu.RLock()
defer s.mu.RUnlock()
return s.PointsWriter.WritePoints(database, retentionPolicy, consistencyLevel, points)
return s.PointsWriter.WritePoints(database, retentionPolicy, consistencyLevel, user, points)
}
// client abstract querying and writing to a Server using HTTP

View File

@ -8020,7 +8020,7 @@ func TestServer_ConcurrentPointsWriter_Subscriber(t *testing.T) {
Database: "db0",
RetentionPolicy: "rp0",
}
s.WritePoints(wpr.Database, wpr.RetentionPolicy, models.ConsistencyLevelAny, wpr.Points)
s.WritePoints(wpr.Database, wpr.RetentionPolicy, models.ConsistencyLevelAny, nil, wpr.Points)
}
}
}()

View File

@ -89,7 +89,7 @@ const (
)
// NewEngineFunc creates a new engine.
type NewEngineFunc func(id uint64, i Index, path string, walPath string, options EngineOptions) Engine
type NewEngineFunc func(id uint64, i Index, database, path string, walPath string, options EngineOptions) Engine
// newEngineFuncs is a lookup of engine constructors by name.
var newEngineFuncs = make(map[string]NewEngineFunc)
@ -114,10 +114,10 @@ func RegisteredEngines() []string {
// NewEngine returns an instance of an engine based on its format.
// If the path does not exist then the DefaultFormat is used.
func NewEngine(id uint64, i Index, path string, walPath string, options EngineOptions) (Engine, error) {
func NewEngine(id uint64, i Index, database, path string, walPath string, options EngineOptions) (Engine, error) {
// Create a new engine
if _, err := os.Stat(path); os.IsNotExist(err) {
return newEngineFuncs[options.EngineVersion](id, i, path, walPath, options), nil
return newEngineFuncs[options.EngineVersion](id, i, database, path, walPath, options), nil
}
// If it's a dir then it's a tsm1 engine
@ -136,7 +136,7 @@ func NewEngine(id uint64, i Index, path string, walPath string, options EngineOp
return nil, fmt.Errorf("invalid engine format: %q", format)
}
return fn(id, i, path, walPath, options), nil
return fn(id, i, database, path, walPath, options), nil
}
// EngineOptions represents the options used to initialize the engine.

View File

@ -105,6 +105,7 @@ type Engine struct {
snapWG sync.WaitGroup // waitgroup for running snapshot compactions
id uint64
database string
path string
logger zap.Logger // Logger to be used for important messages
traceLogger zap.Logger // Logger to be used when trace-logging is on.
@ -140,7 +141,7 @@ type Engine struct {
}
// NewEngine returns a new instance of Engine.
func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine {
func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine {
w := NewWAL(walPath)
w.syncDelay = time.Duration(opt.Config.WALFsyncDelay)
@ -155,6 +156,7 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, opt tsdb.
logger := zap.New(zap.NullEncoder())
e := &Engine{
id: id,
database: database,
path: path,
index: idx,
logger: logger,
@ -1747,7 +1749,7 @@ func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, name string, s
// createVarRefSeriesIterator creates an iterator for a variable reference for a series.
func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, name string, seriesKey string, t *influxql.TagSet, filter influxql.Expr, conditionFields []influxql.VarRef, opt influxql.IteratorOptions) (influxql.Iterator, error) {
_, tfs, _ := models.ParseKey([]byte(seriesKey))
_, tfs := models.ParseKey([]byte(seriesKey))
tags := influxql.NewTags(tfs.Map())
// Create options specific for this series.

View File

@ -17,6 +17,8 @@ import (
"testing"
"time"
"path"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/deep"
@ -159,12 +161,13 @@ func TestEngine_Backup(t *testing.T) {
p3 := MustParsePointString("cpu,host=C value=1.3 3000000000")
// Write those points to the engine.
db := path.Base(f.Name())
opt := tsdb.NewEngineOptions()
opt.InmemIndex = inmem.NewIndex()
idx := tsdb.MustOpenIndex(1, filepath.Join(f.Name(), "index"), opt)
opt.InmemIndex = inmem.NewIndex(db)
idx := tsdb.MustOpenIndex(1, db, filepath.Join(f.Name(), "index"), opt)
defer idx.Close()
e := tsm1.NewEngine(1, idx, f.Name(), walPath, opt).(*tsm1.Engine)
e := tsm1.NewEngine(1, idx, db, f.Name(), walPath, opt).(*tsm1.Engine)
// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
@ -589,11 +592,12 @@ func TestEngine_DeleteSeries(t *testing.T) {
p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000")
// Write those points to the engine.
db := path.Base(f.Name())
opt := tsdb.NewEngineOptions()
opt.InmemIndex = inmem.NewIndex()
idx := tsdb.MustOpenIndex(1, filepath.Join(f.Name(), "index"), opt)
opt.InmemIndex = inmem.NewIndex(db)
idx := tsdb.MustOpenIndex(1, db, filepath.Join(f.Name(), "index"), opt)
defer idx.Close()
e := tsm1.NewEngine(1, idx, f.Name(), walPath, opt).(*tsm1.Engine)
e := tsm1.NewEngine(1, idx, db, f.Name(), walPath, opt).(*tsm1.Engine)
// e.LoadMetadataIndex(1, MustNewDatabaseIndex("db0")) // Initialise an index
// mock the planner so compactions don't run during the test
@ -644,12 +648,13 @@ func TestEngine_LastModified(t *testing.T) {
p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000")
// Write those points to the engine.
db := path.Base(dir)
opt := tsdb.NewEngineOptions()
opt.InmemIndex = inmem.NewIndex()
idx := tsdb.MustOpenIndex(1, filepath.Join(dir, "index"), opt)
opt.InmemIndex = inmem.NewIndex(db)
idx := tsdb.MustOpenIndex(1, db, filepath.Join(dir, "index"), opt)
defer idx.Close()
e := tsm1.NewEngine(1, idx, dir, walPath, opt).(*tsm1.Engine)
e := tsm1.NewEngine(1, idx, db, dir, walPath, opt).(*tsm1.Engine)
// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
@ -985,14 +990,16 @@ func NewEngine() *Engine {
panic(err)
}
db := path.Base(root)
opt := tsdb.NewEngineOptions()
opt.InmemIndex = inmem.NewIndex()
opt.InmemIndex = inmem.NewIndex(db)
idx := tsdb.MustOpenIndex(1, filepath.Join(root, "data", "index"), opt)
idx := tsdb.MustOpenIndex(1, db, filepath.Join(root, "data", "index"), opt)
return &Engine{
Engine: tsm1.NewEngine(1,
idx,
db,
filepath.Join(root, "data"),
filepath.Join(root, "wal"),
opt).(*tsm1.Engine),
@ -1030,13 +1037,15 @@ func (e *Engine) Reopen() error {
return err
}
db := path.Base(e.root)
opt := tsdb.NewEngineOptions()
opt.InmemIndex = inmem.NewIndex()
opt.InmemIndex = inmem.NewIndex(db)
e.index = tsdb.MustOpenIndex(1, filepath.Join(e.root, "data", "index"), opt)
e.index = tsdb.MustOpenIndex(1, db, filepath.Join(e.root, "data", "index"), opt)
e.Engine = tsm1.NewEngine(1,
e.index,
db,
filepath.Join(e.root, "data"),
filepath.Join(e.root, "wal"),
opt).(*tsm1.Engine)

View File

@ -70,7 +70,7 @@ const (
)
// NewIndexFunc creates a new index.
type NewIndexFunc func(id uint64, path string, options EngineOptions) Index
type NewIndexFunc func(id uint64, database, path string, options EngineOptions) Index
// newIndexFuncs is a lookup of index constructors by name.
var newIndexFuncs = make(map[string]NewIndexFunc)
@ -95,7 +95,7 @@ func RegisteredIndexes() []string {
// NewIndex returns an instance of an index based on its format.
// If the path does not exist then the DefaultFormat is used.
func NewIndex(id uint64, path string, options EngineOptions) (Index, error) {
func NewIndex(id uint64, database, path string, options EngineOptions) (Index, error) {
format := options.IndexVersion
// Use default format unless existing directory exists.
@ -113,11 +113,11 @@ func NewIndex(id uint64, path string, options EngineOptions) (Index, error) {
if fn == nil {
return nil, fmt.Errorf("invalid index format: %q", format)
}
return fn(id, path, options), nil
return fn(id, database, path, options), nil
}
func MustOpenIndex(id uint64, path string, options EngineOptions) Index {
idx, err := NewIndex(id, path, options)
func MustOpenIndex(id uint64, database, path string, options EngineOptions) Index {
idx, err := NewIndex(id, database, path, options)
if err != nil {
panic(err)
} else if err := idx.Open(); err != nil {

View File

@ -33,10 +33,10 @@ import (
const IndexName = "inmem"
func init() {
tsdb.NewInmemIndex = func(name string) (interface{}, error) { return NewIndex(), nil }
tsdb.NewInmemIndex = func(name string) (interface{}, error) { return NewIndex(name), nil }
tsdb.RegisterIndex(IndexName, func(id uint64, path string, opt tsdb.EngineOptions) tsdb.Index {
return NewShardIndex(id, path, opt)
tsdb.RegisterIndex(IndexName, func(id uint64, database, path string, opt tsdb.EngineOptions) tsdb.Index {
return NewShardIndex(id, database, path, opt)
})
}
@ -46,6 +46,8 @@ func init() {
type Index struct {
mu sync.RWMutex
database string
// In-memory metadata index, built on load and updated when new series come in
measurements map[string]*Measurement // measurement name to object and index
series map[string]*Series // map series key to the Series object
@ -56,8 +58,9 @@ type Index struct {
}
// NewIndex returns a new initialized Index.
func NewIndex() *Index {
func NewIndex(database string) *Index {
index := &Index{
database: database,
measurements: make(map[string]*Measurement),
series: make(map[string]*Series),
}
@ -210,7 +213,7 @@ func (i *Index) CreateMeasurementIndexIfNotExists(name []byte) *Measurement {
// and acquire the write lock
m = i.measurements[string(name)]
if m == nil {
m = NewMeasurement(string(name))
m = NewMeasurement(i.database, string(name))
i.measurements[string(name)] = m
// Add the measurement to the measurements sketch.
@ -831,7 +834,7 @@ func (i *ShardIndex) TagSets(name []byte, opt influxql.IteratorOptions) ([]*infl
}
// NewShardIndex returns a new index for a shard.
func NewShardIndex(id uint64, path string, opt tsdb.EngineOptions) tsdb.Index {
func NewShardIndex(id uint64, database, path string, opt tsdb.EngineOptions) tsdb.Index {
return &ShardIndex{
Index: opt.InmemIndex.(*Index),
id: id,

View File

@ -17,8 +17,11 @@ import (
// goroutine safe while un-exported functions assume the caller will use the
// appropriate locks.
type Measurement struct {
database string
Name string `json:"name,omitempty"`
name []byte // cached version as []byte
mu sync.RWMutex
Name string `json:"name,omitempty"`
fieldNames map[string]struct{}
// in-memory index fields
@ -30,9 +33,11 @@ type Measurement struct {
}
// NewMeasurement allocates and initializes a new Measurement.
func NewMeasurement(name string) *Measurement {
func NewMeasurement(database, name string) *Measurement {
return &Measurement{
database: database,
Name: name,
name: []byte(name),
fieldNames: make(map[string]struct{}),
seriesByID: make(map[uint64]*Series),
@ -338,6 +343,10 @@ func (m *Measurement) TagSets(shardID uint64, opt influxql.IteratorOptions) ([]*
continue
}
if opt.Authorizer != nil && !opt.Authorizer.AuthorizeSeriesRead(m.database, m.name, s.Tags()) {
continue
}
var tagsAsKey []byte
if len(dims) > 0 {
tagsAsKey = tsdb.MakeTagsKey(dims, s.Tags())
@ -353,7 +362,7 @@ func (m *Measurement) TagSets(shardID uint64, opt influxql.IteratorOptions) ([]*
tagSets[string(tagsAsKey)] = tagSet
}
// Associate the series and filter with the Tagset.
tagSet.AddFilter(m.seriesByID[id].Key, filters[id])
tagSet.AddFilter(s.Key, filters[id])
seriesN++
}
// Release the lock while we sort all the tags

View File

@ -93,7 +93,7 @@ func TestSeriesIDs_Reject(t *testing.T) {
}
func TestMeasurement_AppendSeriesKeysByID_Missing(t *testing.T) {
m := inmem.NewMeasurement("cpu")
m := inmem.NewMeasurement("foo", "cpu")
var dst []string
dst = m.AppendSeriesKeysByID(dst, []uint64{1})
if exp, got := 0, len(dst); exp != got {
@ -102,7 +102,7 @@ func TestMeasurement_AppendSeriesKeysByID_Missing(t *testing.T) {
}
func TestMeasurement_AppendSeriesKeysByID_Exists(t *testing.T) {
m := inmem.NewMeasurement("cpu")
m := inmem.NewMeasurement("foo", "cpu")
s := inmem.NewSeries([]byte("cpu,host=foo"), models.Tags{models.NewTag([]byte("host"), []byte("foo"))})
s.ID = 1
m.AddSeries(s)
@ -119,7 +119,7 @@ func TestMeasurement_AppendSeriesKeysByID_Exists(t *testing.T) {
}
func TestMeasurement_TagsSet_Deadlock(t *testing.T) {
m := inmem.NewMeasurement("cpu")
m := inmem.NewMeasurement("foo", "cpu")
s1 := inmem.NewSeries([]byte("cpu,host=foo"), models.Tags{models.NewTag([]byte("host"), []byte("foo"))})
s1.ID = 1
m.AddSeries(s1)
@ -138,7 +138,7 @@ func TestMeasurement_TagsSet_Deadlock(t *testing.T) {
}
func TestMeasurement_ForEachSeriesByExpr_Deadlock(t *testing.T) {
m := inmem.NewMeasurement("cpu")
m := inmem.NewMeasurement("foo", "cpu")
s1 := inmem.NewSeries([]byte("cpu,host=foo"), models.Tags{models.NewTag([]byte("host"), []byte("foo"))})
s1.ID = 1
m.AddSeries(s1)
@ -159,7 +159,7 @@ func TestMeasurement_ForEachSeriesByExpr_Deadlock(t *testing.T) {
}
func BenchmarkMeasurement_SeriesIDForExp_EQRegex(b *testing.B) {
m := inmem.NewMeasurement("cpu")
m := inmem.NewMeasurement("foo", "cpu")
for i := 0; i < 100000; i++ {
s := inmem.NewSeries([]byte("cpu"), models.Tags{models.NewTag(
[]byte("host"),
@ -190,7 +190,7 @@ func BenchmarkMeasurement_SeriesIDForExp_EQRegex(b *testing.B) {
}
func BenchmarkMeasurement_SeriesIDForExp_NERegex(b *testing.B) {
m := inmem.NewMeasurement("cpu")
m := inmem.NewMeasurement("foo", "cpu")
for i := 0; i < 100000; i++ {
s := inmem.NewSeries([]byte("cpu"), models.Tags{models.Tag{
Key: []byte("host"),
@ -222,7 +222,7 @@ func BenchmarkMeasurement_SeriesIDForExp_NERegex(b *testing.B) {
}
func benchmarkTagSets(b *testing.B, n int, opt influxql.IteratorOptions) {
m := inmem.NewMeasurement("m")
m := inmem.NewMeasurement("foo", "m")
for i := 0; i < n; i++ {
tags := map[string]string{"tag1": "value1", "tag2": "value2"}
s := inmem.NewSeries([]byte(fmt.Sprintf("m,tag1=value1,tag2=value2")), models.NewTags(tags))

View File

@ -31,9 +31,10 @@ const (
)
func init() {
tsdb.RegisterIndex(IndexName, func(id uint64, path string, opt tsdb.EngineOptions) tsdb.Index {
tsdb.RegisterIndex(IndexName, func(id uint64, database, path string, opt tsdb.EngineOptions) tsdb.Index {
idx := NewIndex()
idx.ShardID = id
idx.Database = database
idx.Path = path
idx.options = opt
return idx
@ -79,6 +80,9 @@ type Index struct {
// Associated shard info.
ShardID uint64
// Name of database.
Database string
// Root directory of the index files.
Path string
@ -529,10 +533,7 @@ func (i *Index) DropSeries(key []byte) error {
i.mu.RLock()
defer i.mu.RUnlock()
name, tags, err := models.ParseKey(key)
if err != nil {
return err
}
name, tags := models.ParseKey(key)
mname := []byte(name)
if err := i.activeLogFile.DeleteSeries(mname, tags); err != nil {

View File

@ -265,7 +265,7 @@ func (s *Shard) Open() error {
// Initialize underlying index.
ipath := filepath.Join(s.path, "index")
idx, err := NewIndex(s.id, ipath, s.options)
idx, err := NewIndex(s.id, s.database, ipath, s.options)
if err != nil {
return err
}
@ -278,7 +278,7 @@ func (s *Shard) Open() error {
idx.WithLogger(s.baseLogger)
// Initialize underlying engine.
e, err := NewEngine(s.id, idx, s.path, s.walPath, s.options)
e, err := NewEngine(s.id, idx, s.database, s.path, s.walPath, s.options)
if err != nil {
return err
}

View File

@ -30,7 +30,7 @@ func TestShardWriteAndIndex(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = inmem.NewIndex()
opts.InmemIndex = inmem.NewIndex(path.Base(tmpDir))
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
@ -98,7 +98,7 @@ func TestMaxSeriesLimit(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.Config.MaxSeriesPerDatabase = 1000
opts.InmemIndex = inmem.NewIndex()
opts.InmemIndex = inmem.NewIndex(path.Base(tmpDir))
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
@ -151,7 +151,7 @@ func TestShard_MaxTagValuesLimit(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.Config.MaxValuesPerTag = 1000
opts.InmemIndex = inmem.NewIndex()
opts.InmemIndex = inmem.NewIndex(path.Base(tmpDir))
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
@ -203,7 +203,7 @@ func TestWriteTimeTag(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = inmem.NewIndex()
opts.InmemIndex = inmem.NewIndex(path.Base(tmpDir))
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
@ -251,7 +251,7 @@ func TestWriteTimeField(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = inmem.NewIndex()
opts.InmemIndex = inmem.NewIndex(path.Base(tmpDir))
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
@ -284,7 +284,7 @@ func TestShardWriteAddNewField(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = inmem.NewIndex()
opts.InmemIndex = inmem.NewIndex(path.Base(tmpDir))
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
@ -334,7 +334,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = inmem.NewIndex()
opts.InmemIndex = inmem.NewIndex(path.Base(tmpDir))
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
@ -418,7 +418,7 @@ func TestShard_WritePoints_FieldConflictConcurrentQuery(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = inmem.NewIndex()
opts.InmemIndex = inmem.NewIndex(path.Base(tmpDir))
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
@ -563,7 +563,7 @@ func TestShard_Close_RemoveIndex(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = inmem.NewIndex()
opts.InmemIndex = inmem.NewIndex(path.Base(tmpDir))
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
@ -1089,23 +1089,23 @@ type Shard struct {
// NewShard returns a new instance of Shard with temp paths.
func NewShard() *Shard {
// Create temporary path for data and WAL.
path, err := ioutil.TempDir("", "influxdb-tsdb-")
dir, err := ioutil.TempDir("", "influxdb-tsdb-")
if err != nil {
panic(err)
}
// Build engine options.
opt := tsdb.NewEngineOptions()
opt.Config.WALDir = filepath.Join(path, "wal")
opt.InmemIndex = inmem.NewIndex()
opt.Config.WALDir = filepath.Join(dir, "wal")
opt.InmemIndex = inmem.NewIndex(path.Base(dir))
return &Shard{
Shard: tsdb.NewShard(0,
filepath.Join(path, "data", "db0", "rp0", "1"),
filepath.Join(path, "wal", "db0", "rp0", "1"),
filepath.Join(dir, "data", "db0", "rp0", "1"),
filepath.Join(dir, "wal", "db0", "rp0", "1"),
opt,
),
path: path,
path: dir,
}
}