feat: add auth to remotes & replications APIs (#22744)

pull/22785/head
Daniel Moran 2021-10-26 11:32:35 -04:00 committed by GitHub
parent ca6b99f9a2
commit 58139c47b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 595 additions and 149 deletions

View File

@ -139,6 +139,10 @@ const (
NotebooksResourceType = ResourceType("notebooks") // 18
// AnnotationsResourceType gives permission to one or more annotations.
AnnotationsResourceType = ResourceType("annotations") // 19
// RemotesResourceType gives permission to one or more remote connections.
RemotesResourceType = ResourceType("remotes") // 20
// ReplicationsResourceType gives permission to one or more replications.
ReplicationsResourceType = ResourceType("replications") // 21
)
// AllResourceTypes is the list of all known resource types.
@ -163,28 +167,11 @@ var AllResourceTypes = []ResourceType{
DBRPResourceType, // 17
NotebooksResourceType, // 18
AnnotationsResourceType, // 19
RemotesResourceType, // 20
ReplicationsResourceType, // 21
// NOTE: when modifying this list, please update the swagger for components.schemas.Permission resource enum.
}
// OrgResourceTypes is the list of all known resource types that belong to an organization.
var OrgResourceTypes = []ResourceType{
BucketsResourceType, // 1
DashboardsResourceType, // 2
SourcesResourceType, // 4
TasksResourceType, // 5
TelegrafsResourceType, // 6
UsersResourceType, // 7
VariablesResourceType, // 8
SecretsResourceType, // 10
DocumentsResourceType, // 13
NotificationRuleResourceType, // 14
NotificationEndpointResourceType, // 15
ChecksResourceType, // 16
DBRPResourceType, // 17
NotebooksResourceType, // 18
AnnotationsResourceType, // 19
}
// Valid checks if the resource type is a member of the ResourceType enum.
func (r Resource) Valid() (err error) {
return r.Type.Valid()
@ -213,6 +200,8 @@ func (t ResourceType) Valid() (err error) {
case DBRPResourceType: // 17
case NotebooksResourceType: // 18
case AnnotationsResourceType: // 19
case RemotesResourceType: // 20
case ReplicationsResourceType: // 21
default:
err = ErrInvalidResourceType
}

View File

@ -29,6 +29,6 @@ func Test_Auth_Basic(t *testing.T) {
`ID User Name User ID Description Token Permissions`+"\n"+
`08371db24dcc8000 testuser 08371db1dd8c8000 testuser's Token A9Ovdl8SmP-rfp8wQ2vJoPUsZoQQJ3EochD88SlJcgrcLw4HBwgUqpSHQxc9N9Drg0_aY6Lp1jutBRcKhbV7aQ== \[read:authorizations write:authorizations read:buckets write:buckets read:dashboards write:dashboards read:orgs write:orgs read:sources write:sources read:tasks write:tasks read:telegrafs write:telegrafs read:users write:users read:variables write:variables read:scrapers write:scrapers read:secrets write:secrets read:labels write:labels read:views write:views read:documents write:documents read:notificationRules write:notificationRules read:notificationEndpoints write:notificationEndpoints read:checks write:checks read:dbrp write:dbrp read:notebooks write:notebooks read:annotations write:annotations\]`+"\n"+
`08371deae98c8000 testuser 08371db1dd8c8000 testuser's read buckets token 4-pZrlm84u9uiMVrPBeITe46KxfdEnvTX5H2CZh38BtAsXX4O47b8QwZ9jHL_Cek2w-VbVfRxDpo0Mu8ORiqyQ== \[read:orgs/dd7cd2292f6e974a/buckets\]`+"\n"+
`[^\t]* testuser [^\t]* testuser's Recovery Token [^\t]* \[read:authorizations write:authorizations read:buckets write:buckets read:dashboards write:dashboards read:orgs write:orgs read:sources write:sources read:tasks write:tasks read:telegrafs write:telegrafs read:users write:users read:variables write:variables read:scrapers write:scrapers read:secrets write:secrets read:labels write:labels read:views write:views read:documents write:documents read:notificationRules write:notificationRules read:notificationEndpoints write:notificationEndpoints read:checks write:checks read:dbrp write:dbrp read:notebooks write:notebooks read:annotations write:annotations\]`+"\n",
`[^\t]* testuser [^\t]* testuser's Recovery Token [^\t]* \[read:authorizations write:authorizations read:buckets write:buckets read:dashboards write:dashboards read:orgs write:orgs read:sources write:sources read:tasks write:tasks read:telegrafs write:telegrafs read:users write:users read:variables write:variables read:scrapers write:scrapers read:secrets write:secrets read:labels write:labels read:views write:views read:documents write:documents read:notificationRules write:notificationRules read:notificationEndpoints write:notificationEndpoints read:checks write:checks read:dbrp write:dbrp read:notebooks write:notebooks read:annotations write:annotations read:remotes write:remotes read:replications write:replications\]`+"\n",
testhelper.MustRunCommand(t, NewAuthCommand(), "list", "--bolt-path", db.Name()))
}

View File

@ -43,6 +43,8 @@ func TestResourceListHandler(t *testing.T) {
string(influxdb.DBRPResourceType),
string(influxdb.NotebooksResourceType),
string(influxdb.AnnotationsResourceType),
string(influxdb.RemotesResourceType),
string(influxdb.ReplicationsResourceType),
}
resp := w.Result()

View File

@ -6,27 +6,36 @@ import (
"sort"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/kv"
)
var Migration0016_AddAnnotationsNotebooksToOperToken = UpOnlyMigration(
"add annotations and notebooks resource types to operator token",
func(ctx context.Context, store kv.SchemaStore) error {
migrateTokensMigration(
func(t influxdb.Authorization) bool {
return permListsMatch(preNotebooksAnnotationsOpPerms(), t.Permissions)
},
func(t *influxdb.Authorization) {
t.Permissions = append(t.Permissions, notebooksAndAnnotationsPerms(0)...)
},
),
)
func migrateTokensMigration(
checkToken func(influxdb.Authorization) bool,
updateToken func(*influxdb.Authorization),
) func(context.Context, kv.SchemaStore) error {
return func(ctx context.Context, store kv.SchemaStore) error {
authBucket := []byte("authorizationsv1")
// Find the operator token that needs updated
// There will usually be 1 operator token. If somebody has deleted their
// operator token, we don't necessarily want to make it so that influxdb
// won't start, so store a list of the found operator tokens so that it can
// be iterated on later.
opTokens := []influxdb.Authorization{}
// First find all tokens matching the predicate.
var tokens []influxdb.Authorization
if err := store.View(ctx, func(tx kv.Tx) error {
bkt, err := tx.Bucket(authBucket)
if err != nil {
return err
}
cursor, err := bkt.ForwardCursor(nil)
if err != nil {
return err
@ -37,54 +46,51 @@ var Migration0016_AddAnnotationsNotebooksToOperToken = UpOnlyMigration(
if err := json.Unmarshal(v, &t); err != nil {
return false, err
}
// Add any tokens to the list that match the list of permission from an
// "old" operator token
if permListsMatch(oldOpPerms(), t.Permissions) {
opTokens = append(opTokens, t)
if checkToken(t) {
tokens = append(tokens, t)
}
return true, nil
})
}); err != nil {
return err
}
// Go through the list of operator tokens found and update their permissions
// list. There should be only 1, but if there are somehow more this will
// update all of them.
for _, t := range opTokens {
encodedID, err := t.ID.Encode()
// Next, update all the extracted tokens.
for i := range tokens {
updateToken(&tokens[i])
}
// Finally, persist the updated tokens back to the DB.
if err := store.Update(ctx, func(tx kv.Tx) error {
bkt, err := tx.Bucket(authBucket)
if err != nil {
return err
}
t.Permissions = append(t.Permissions, extraPerms()...)
v, err := json.Marshal(t)
if err != nil {
return err
}
if err := store.Update(ctx, func(tx kv.Tx) error {
bkt, err := tx.Bucket(authBucket)
for _, t := range tokens {
encodedID, err := t.ID.Encode()
if err != nil {
return err
}
return bkt.Put(encodedID, v)
}); err != nil {
return err
v, err := json.Marshal(t)
if err != nil {
return err
}
if err := bkt.Put(encodedID, v); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}
return nil
},
)
}
}
// extraPerms returns the list of additional permissions that need added for
// notebooksAndAnnotationsPerms returns the list of additional permissions that need added for
// annotations and notebooks.
func extraPerms() []influxdb.Permission {
func notebooksAndAnnotationsPerms(orgID platform.ID) []influxdb.Permission {
resTypes := []influxdb.Resource{
{
Type: influxdb.AnnotationsResourceType,
@ -93,13 +99,18 @@ func extraPerms() []influxdb.Permission {
Type: influxdb.NotebooksResourceType,
},
}
return permListFromResources(resTypes)
perms := permListFromResources(resTypes)
if orgID.Valid() {
for i := range perms {
perms[i].Resource.OrgID = &orgID
}
}
return perms
}
// oldOpPerms is the list of permissions from an "old" operator token - prior to
// the addition of the notebooks an annotations resource type.
func oldOpPerms() []influxdb.Permission {
// preNotebooksAnnotationsOpPerms is the list of permissions from a 2.0.x operator token,
// prior to the addition of the notebooks and annotations resource types.
func preNotebooksAnnotationsOpPerms() []influxdb.Permission {
resTypes := []influxdb.Resource{
{
Type: influxdb.AuthorizationsResourceType,

View File

@ -60,7 +60,7 @@ func TestMigration_AnnotationsNotebooksOperToken(t *testing.T) {
ID: id2, // an operator token
OrgID: OrgID,
UserID: UserID,
Permissions: oldOpPerms(),
Permissions: preNotebooksAnnotationsOpPerms(),
},
}
@ -113,7 +113,7 @@ func TestMigration_AnnotationsNotebooksOperToken(t *testing.T) {
var token influxdb.Authorization
require.NoError(t, json.Unmarshal(b, &token))
require.ElementsMatch(t, append(oldOpPerms(), extraPerms()...), token.Permissions)
require.ElementsMatch(t, append(preNotebooksAnnotationsOpPerms(), notebooksAndAnnotationsPerms(0)...), token.Permissions)
return nil
})
require.NoError(t, err)

View File

@ -1,95 +1,26 @@
package all
import (
"context"
"encoding/json"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/kv"
)
var Migration0017_AddAnnotationsNotebooksToAllAccessTokens = UpOnlyMigration(
"add annotations and notebooks resource types to all-access tokens",
func(ctx context.Context, store kv.SchemaStore) error {
authBucket := []byte("authorizationsv1")
// Find all-access tokens that need to be updated.
tokens := []influxdb.Authorization{}
if err := store.View(ctx, func(tx kv.Tx) error {
bkt, err := tx.Bucket(authBucket)
if err != nil {
return err
}
cursor, err := bkt.ForwardCursor(nil)
if err != nil {
return err
}
return kv.WalkCursor(ctx, cursor, func(_, v []byte) (bool, error) {
var t influxdb.Authorization
if err := json.Unmarshal(v, &t); err != nil {
return false, err
}
// Add any tokens to the list that match the list of permission from an
// "old" all-access token
if permListsMatch(oldAllAccessPerms(t.OrgID, t.UserID), t.Permissions) {
tokens = append(tokens, t)
}
return true, nil
})
}); err != nil {
return err
}
// Go through the list of all-access tokens found and update their permissions list.
for _, t := range tokens {
encodedID, err := t.ID.Encode()
if err != nil {
return err
}
t.Permissions = append(t.Permissions, extraAllAccessPerms(t.OrgID)...)
v, err := json.Marshal(t)
if err != nil {
return err
}
if err := store.Update(ctx, func(tx kv.Tx) error {
bkt, err := tx.Bucket(authBucket)
if err != nil {
return err
}
return bkt.Put(encodedID, v)
}); err != nil {
return err
}
}
return nil
},
migrateTokensMigration(
func(t influxdb.Authorization) bool {
return permListsMatch(preNotebooksAnnotationsAllAccessPerms(t.OrgID, t.UserID), t.Permissions)
},
func(t *influxdb.Authorization) {
t.Permissions = append(t.Permissions, notebooksAndAnnotationsPerms(t.OrgID)...)
},
),
)
// extraAllAccessPerms returns the list of additional permissions that need added for
// annotations and notebooks.
func extraAllAccessPerms(orgId platform.ID) []influxdb.Permission {
perms := extraPerms()
for i := range perms {
perms[i].Resource.OrgID = &orgId
}
return perms
}
// oldAllAccessPerms is the list of permissions from an "old" all-access token - prior to
// the addition of the notebooks an annotations resource type.
func oldAllAccessPerms(orgId platform.ID, userId platform.ID) []influxdb.Permission {
opPerms := oldOpPerms()
// preNotebooksAnnotationsAllAccessPerms is the list of permissions from a 2.0.x all-access token,
// prior to the addition of the notebooks and annotations resource types.
func preNotebooksAnnotationsAllAccessPerms(orgId platform.ID, userId platform.ID) []influxdb.Permission {
opPerms := preNotebooksAnnotationsOpPerms()
perms := make([]influxdb.Permission, 0, len(opPerms)-1) // -1 because write-org permission isn't included.
for _, p := range opPerms {
if p.Resource.Type == influxdb.OrgsResourceType {

View File

@ -43,7 +43,7 @@ func TestMigration_AnnotationsNotebooksAllAccessToken(t *testing.T) {
ID: id2, // an all-access token
OrgID: OrgID,
UserID: UserID,
Permissions: oldAllAccessPerms(OrgID, UserID),
Permissions: preNotebooksAnnotationsAllAccessPerms(OrgID, UserID),
},
}
@ -96,7 +96,7 @@ func TestMigration_AnnotationsNotebooksAllAccessToken(t *testing.T) {
var token influxdb.Authorization
require.NoError(t, json.Unmarshal(b, &token))
require.ElementsMatch(t, append(oldAllAccessPerms(OrgID, UserID), extraAllAccessPerms(OrgID)...), token.Permissions)
require.ElementsMatch(t, append(preNotebooksAnnotationsAllAccessPerms(OrgID, UserID), notebooksAndAnnotationsPerms(OrgID)...), token.Permissions)
return nil
})
require.NoError(t, err)

View File

@ -0,0 +1,49 @@
package all
import (
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
)
var Migration0019_AddRemotesReplicationsToTokens = UpOnlyMigration(
"add remotes and replications resource types to operator and all-access tokens",
migrateTokensMigration(
func(t influxdb.Authorization) bool {
return permListsMatch(preReplicationOpPerms(), t.Permissions) ||
permListsMatch(preReplicationAllAccessPerms(t.OrgID, t.UserID), t.Permissions)
},
func(t *influxdb.Authorization) {
if permListsMatch(preReplicationOpPerms(), t.Permissions) {
t.Permissions = append(t.Permissions, remotesAndReplicationsPerms(0)...)
} else {
t.Permissions = append(t.Permissions, remotesAndReplicationsPerms(t.OrgID)...)
}
},
),
)
func preReplicationOpPerms() []influxdb.Permission {
return append(preNotebooksAnnotationsOpPerms(), notebooksAndAnnotationsPerms(0)...)
}
func preReplicationAllAccessPerms(orgID platform.ID, userID platform.ID) []influxdb.Permission {
return append(preNotebooksAnnotationsAllAccessPerms(orgID, userID), notebooksAndAnnotationsPerms(orgID)...)
}
func remotesAndReplicationsPerms(orgID platform.ID) []influxdb.Permission {
resTypes := []influxdb.Resource{
{
Type: influxdb.RemotesResourceType,
},
{
Type: influxdb.ReplicationsResourceType,
},
}
perms := permListFromResources(resTypes)
if orgID.Valid() {
for i := range perms {
perms[i].Resource.OrgID = &orgID
}
}
return perms
}

View File

@ -0,0 +1,210 @@
package all
import (
"context"
"encoding/json"
"testing"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/snowflake"
"github.com/stretchr/testify/require"
)
func TestMigration_RemotesReplicationsOperToken(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
// Run up to migration 18.
ts := newService(t, ctx, 18)
// Auth bucket contains the authorizations AKA tokens
authBucket := []byte("authorizationsv1")
// The store returned by newService will include an operator token with the
// current system's entire list of resources already, so remove that before
// proceeding with the tests.
err := ts.Store.Update(context.Background(), func(tx kv.Tx) error {
bkt, err := tx.Bucket(authBucket)
require.NoError(t, err)
cursor, err := bkt.ForwardCursor(nil)
require.NoError(t, err)
return kv.WalkCursor(ctx, cursor, func(k, _ []byte) (bool, error) {
err := bkt.Delete(k)
require.NoError(t, err)
return true, nil
})
})
require.NoError(t, err)
// Verify that running the migration in the absence of an operator token will
// not crash influxdb.
require.NoError(t, Migration0019_AddRemotesReplicationsToTokens.Up(context.Background(), ts.Store))
// Seed some authorizations
id1 := snowflake.NewIDGenerator().ID()
id2 := snowflake.NewIDGenerator().ID()
OrgID := ts.Org.ID
UserID := ts.User.ID
auths := []influxdb.Authorization{
{
ID: id1, // a non-operator token
OrgID: OrgID,
UserID: UserID,
Permissions: permsShouldNotChange(),
},
{
ID: id2, // an operator token
OrgID: OrgID,
UserID: UserID,
Permissions: preReplicationOpPerms(),
},
}
for _, a := range auths {
js, err := json.Marshal(a)
require.NoError(t, err)
idBytes, err := a.ID.Encode()
require.NoError(t, err)
err = ts.Store.Update(context.Background(), func(tx kv.Tx) error {
bkt, err := tx.Bucket(authBucket)
require.NoError(t, err)
return bkt.Put(idBytes, js)
})
require.NoError(t, err)
}
// Run the migration
require.NoError(t, Migration0019_AddRemotesReplicationsToTokens.Up(context.Background(), ts.Store))
// the first item should not be changed
encoded1, err := id1.Encode()
require.NoError(t, err)
err = ts.Store.View(context.Background(), func(tx kv.Tx) error {
bkt, err := tx.Bucket(authBucket)
require.NoError(t, err)
b, err := bkt.Get(encoded1)
require.NoError(t, err)
var token influxdb.Authorization
require.NoError(t, json.Unmarshal(b, &token))
require.Equal(t, auths[0], token)
return nil
})
require.NoError(t, err)
// the second item is the 2.0.x operator token and should have been updated
// with a new permissions list
encoded2, err := id2.Encode()
require.NoError(t, err)
err = ts.Store.View(context.Background(), func(tx kv.Tx) error {
bkt, err := tx.Bucket(authBucket)
require.NoError(t, err)
b, err := bkt.Get(encoded2)
require.NoError(t, err)
var token influxdb.Authorization
require.NoError(t, json.Unmarshal(b, &token))
require.ElementsMatch(t, append(preReplicationOpPerms(), remotesAndReplicationsPerms(0)...), token.Permissions)
return nil
})
require.NoError(t, err)
}
func TestMigration_RemotesReplicationsAllAccessToken(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
// Run up to migration 18.
ts := newService(t, ctx, 18)
// Auth bucket contains the authorizations AKA tokens
authBucket := []byte("authorizationsv1")
// Verify that running the migration in the absence of an all-access token will
// not crash influxdb.
require.NoError(t, Migration0019_AddRemotesReplicationsToTokens.Up(context.Background(), ts.Store))
// Seed some authorizations
id1 := snowflake.NewIDGenerator().ID()
id2 := snowflake.NewIDGenerator().ID()
OrgID := ts.Org.ID
UserID := ts.User.ID
auths := []influxdb.Authorization{
{
ID: id1, // a non-all-access token
OrgID: OrgID,
UserID: UserID,
Permissions: orgPermsShouldNotChange(OrgID),
},
{
ID: id2, // an all-access token
OrgID: OrgID,
UserID: UserID,
Permissions: preReplicationAllAccessPerms(OrgID, UserID),
},
}
for _, a := range auths {
js, err := json.Marshal(a)
require.NoError(t, err)
idBytes, err := a.ID.Encode()
require.NoError(t, err)
err = ts.Store.Update(context.Background(), func(tx kv.Tx) error {
bkt, err := tx.Bucket(authBucket)
require.NoError(t, err)
return bkt.Put(idBytes, js)
})
require.NoError(t, err)
}
// Run the migration
require.NoError(t, Migration0019_AddRemotesReplicationsToTokens.Up(context.Background(), ts.Store))
// the first item should not be changed
encoded1, err := id1.Encode()
require.NoError(t, err)
err = ts.Store.View(context.Background(), func(tx kv.Tx) error {
bkt, err := tx.Bucket(authBucket)
require.NoError(t, err)
b, err := bkt.Get(encoded1)
require.NoError(t, err)
var token influxdb.Authorization
require.NoError(t, json.Unmarshal(b, &token))
require.Equal(t, auths[0], token)
return nil
})
require.NoError(t, err)
// the second item is a 2.0.x all-access token and should have been updated
// with a new permissions list
encoded2, err := id2.Encode()
require.NoError(t, err)
err = ts.Store.View(context.Background(), func(tx kv.Tx) error {
bkt, err := tx.Bucket(authBucket)
require.NoError(t, err)
b, err := bkt.Get(encoded2)
require.NoError(t, err)
var token influxdb.Authorization
require.NoError(t, json.Unmarshal(b, &token))
require.ElementsMatch(t, append(preReplicationAllAccessPerms(OrgID, UserID), remotesAndReplicationsPerms(OrgID)...), token.Permissions)
return nil
})
require.NoError(t, err)
}

View File

@ -43,5 +43,7 @@ var Migrations = [...]migration.Spec{
Migration0017_AddAnnotationsNotebooksToAllAccessTokens,
// repair missing shard group durations
Migration0018_RepairMissingShardGroupDurations,
// add remotes and replications resource types to operator and all-access tokens
Migration0019_AddRemotesReplicationsToTokens,
// {{ do_not_edit . }}
}

View File

@ -74,6 +74,8 @@ func NewInstrumentedRemotesHandler(log *zap.Logger, reg prometheus.Registerer, s
svc = newMetricCollectingService(reg, svc)
// Wrap logging.
svc = newLoggingService(log, svc)
// Wrap authz.
svc = newAuthCheckingService(svc)
return newRemoteConnectionHandler(log, svc)
}

View File

@ -0,0 +1,111 @@
package transport
import (
"context"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/authorizer"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
)
func newAuthCheckingService(underlying RemoteConnectionService) *authCheckingService {
return &authCheckingService{underlying}
}
type authCheckingService struct {
underlying RemoteConnectionService
}
var _ RemoteConnectionService = (*authCheckingService)(nil)
func (a authCheckingService) ListRemoteConnections(ctx context.Context, filter influxdb.RemoteConnectionListFilter) (*influxdb.RemoteConnections, error) {
rs, err := a.underlying.ListRemoteConnections(ctx, filter)
if err != nil {
return nil, err
}
rrs := rs.Remotes[:0]
for _, r := range rs.Remotes {
_, _, err := authorizer.AuthorizeRead(ctx, influxdb.RemotesResourceType, r.ID, r.OrgID)
if err != nil && errors.ErrorCode(err) != errors.EUnauthorized {
return nil, err
}
if errors.ErrorCode(err) == errors.EUnauthorized {
continue
}
rrs = append(rrs, r)
}
return &influxdb.RemoteConnections{Remotes: rrs}, nil
}
func (a authCheckingService) CreateRemoteConnection(ctx context.Context, request influxdb.CreateRemoteConnectionRequest) (*influxdb.RemoteConnection, error) {
if _, _, err := authorizer.AuthorizeCreate(ctx, influxdb.RemotesResourceType, request.OrgID); err != nil {
return nil, err
}
return a.underlying.CreateRemoteConnection(ctx, request)
}
func (a authCheckingService) ValidateNewRemoteConnection(ctx context.Context, request influxdb.CreateRemoteConnectionRequest) error {
if _, _, err := authorizer.AuthorizeCreate(ctx, influxdb.RemotesResourceType, request.OrgID); err != nil {
return err
}
return a.underlying.ValidateNewRemoteConnection(ctx, request)
}
func (a authCheckingService) GetRemoteConnection(ctx context.Context, id platform.ID) (*influxdb.RemoteConnection, error) {
r, err := a.underlying.GetRemoteConnection(ctx, id)
if err != nil {
return nil, err
}
if _, _, err := authorizer.AuthorizeRead(ctx, influxdb.RemotesResourceType, id, r.OrgID); err != nil {
return nil, err
}
return r, nil
}
func (a authCheckingService) UpdateRemoteConnection(ctx context.Context, id platform.ID, request influxdb.UpdateRemoteConnectionRequest) (*influxdb.RemoteConnection, error) {
r, err := a.underlying.GetRemoteConnection(ctx, id)
if err != nil {
return nil, err
}
if _, _, err := authorizer.AuthorizeWrite(ctx, influxdb.RemotesResourceType, id, r.OrgID); err != nil {
return nil, err
}
return a.underlying.UpdateRemoteConnection(ctx, id, request)
}
func (a authCheckingService) ValidateUpdatedRemoteConnection(ctx context.Context, id platform.ID, request influxdb.UpdateRemoteConnectionRequest) error {
r, err := a.underlying.GetRemoteConnection(ctx, id)
if err != nil {
return err
}
if _, _, err := authorizer.AuthorizeWrite(ctx, influxdb.RemotesResourceType, id, r.OrgID); err != nil {
return err
}
return a.underlying.ValidateUpdatedRemoteConnection(ctx, id, request)
}
func (a authCheckingService) DeleteRemoteConnection(ctx context.Context, id platform.ID) error {
r, err := a.underlying.GetRemoteConnection(ctx, id)
if err != nil {
return err
}
if _, _, err := authorizer.AuthorizeWrite(ctx, influxdb.RemotesResourceType, id, r.OrgID); err != nil {
return err
}
return a.underlying.DeleteRemoteConnection(ctx, id)
}
func (a authCheckingService) ValidateRemoteConnection(ctx context.Context, id platform.ID) error {
r, err := a.underlying.GetRemoteConnection(ctx, id)
if err != nil {
return err
}
if _, _, err := authorizer.AuthorizeRead(ctx, influxdb.RemotesResourceType, id, r.OrgID); err != nil {
return err
}
return a.underlying.ValidateRemoteConnection(ctx, id)
}

View File

@ -84,6 +84,8 @@ func NewInstrumentedReplicationHandler(log *zap.Logger, reg prometheus.Registere
svc = newMetricCollectingService(reg, svc)
// Wrap logging.
svc = newLoggingService(log, svc)
// Wrap authz.
svc = newAuthCheckingService(svc)
return newReplicationHandler(log, svc)
}

View File

@ -0,0 +1,131 @@
package transport
import (
"context"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/authorizer"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
)
func newAuthCheckingService(underlying ReplicationService) *authCheckingService {
return &authCheckingService{underlying}
}
type authCheckingService struct {
underlying ReplicationService
}
var _ ReplicationService = (*authCheckingService)(nil)
func (a authCheckingService) ListReplications(ctx context.Context, filter influxdb.ReplicationListFilter) (*influxdb.Replications, error) {
rs, err := a.underlying.ListReplications(ctx, filter)
if err != nil {
return nil, err
}
rrs := rs.Replications[:0]
for _, r := range rs.Replications {
_, _, err := authorizer.AuthorizeRead(ctx, influxdb.ReplicationsResourceType, r.ID, r.OrgID)
if err != nil && errors.ErrorCode(err) != errors.EUnauthorized {
return nil, err
}
if errors.ErrorCode(err) == errors.EUnauthorized {
continue
}
rrs = append(rrs, r)
}
return &influxdb.Replications{Replications: rrs}, nil
}
func (a authCheckingService) CreateReplication(ctx context.Context, request influxdb.CreateReplicationRequest) (*influxdb.Replication, error) {
if err := a.authCreateReplication(ctx, request); err != nil {
return nil, err
}
return a.underlying.CreateReplication(ctx, request)
}
func (a authCheckingService) ValidateNewReplication(ctx context.Context, request influxdb.CreateReplicationRequest) error {
if err := a.authCreateReplication(ctx, request); err != nil {
return err
}
return a.underlying.ValidateNewReplication(ctx, request)
}
func (a authCheckingService) authCreateReplication(ctx context.Context, request influxdb.CreateReplicationRequest) error {
if _, _, err := authorizer.AuthorizeCreate(ctx, influxdb.ReplicationsResourceType, request.OrgID); err != nil {
return err
}
// N.B. creating a replication requires read-access to both the source bucket and the target remote.
if _, _, err := authorizer.AuthorizeRead(ctx, influxdb.BucketsResourceType, request.LocalBucketID, request.OrgID); err != nil {
return err
}
if _, _, err := authorizer.AuthorizeRead(ctx, influxdb.RemotesResourceType, request.RemoteID, request.OrgID); err != nil {
return err
}
return nil
}
func (a authCheckingService) GetReplication(ctx context.Context, id platform.ID) (*influxdb.Replication, error) {
r, err := a.underlying.GetReplication(ctx, id)
if err != nil {
return nil, err
}
if _, _, err := authorizer.AuthorizeRead(ctx, influxdb.ReplicationsResourceType, id, r.OrgID); err != nil {
return nil, err
}
return r, nil
}
func (a authCheckingService) UpdateReplication(ctx context.Context, id platform.ID, request influxdb.UpdateReplicationRequest) (*influxdb.Replication, error) {
if err := a.authUpdateReplication(ctx, id, request); err != nil {
return nil, err
}
return a.underlying.UpdateReplication(ctx, id, request)
}
func (a authCheckingService) ValidateUpdatedReplication(ctx context.Context, id platform.ID, request influxdb.UpdateReplicationRequest) error {
if err := a.authUpdateReplication(ctx, id, request); err != nil {
return err
}
return a.underlying.ValidateUpdatedReplication(ctx, id, request)
}
func (a authCheckingService) authUpdateReplication(ctx context.Context, id platform.ID, request influxdb.UpdateReplicationRequest) error {
r, err := a.underlying.GetReplication(ctx, id)
if err != nil {
return err
}
if _, _, err := authorizer.AuthorizeWrite(ctx, influxdb.ReplicationsResourceType, id, r.OrgID); err != nil {
return err
}
if request.RemoteID != nil {
if _, _, err := authorizer.AuthorizeRead(ctx, influxdb.RemotesResourceType, *request.RemoteID, r.OrgID); err != nil {
return err
}
}
return nil
}
func (a authCheckingService) DeleteReplication(ctx context.Context, id platform.ID) error {
r, err := a.underlying.GetReplication(ctx, id)
if err != nil {
return err
}
if _, _, err := authorizer.AuthorizeWrite(ctx, influxdb.ReplicationsResourceType, id, r.OrgID); err != nil {
return err
}
return a.underlying.DeleteReplication(ctx, id)
}
func (a authCheckingService) ValidateReplication(ctx context.Context, id platform.ID) error {
r, err := a.underlying.GetReplication(ctx, id)
if err != nil {
return err
}
if _, _, err := authorizer.AuthorizeRead(ctx, influxdb.ReplicationsResourceType, id, r.OrgID); err != nil {
return err
}
return a.underlying.ValidateReplication(ctx, id)
}

View File

@ -10,7 +10,7 @@ declare -r ROOT_DIR=$(dirname ${SCRIPT_DIR})
declare -r STATIC_DIR="$ROOT_DIR/static"
# Pins the swagger that will be downloaded to a specific commit
declare -r OPENAPI_SHA=ecdcd4b1baace6942e567771518ba19caf381652
declare -r OPENAPI_SHA=2e699cf815ccd77803febed8f124cc65e1f97358
# Don't do a shallow clone since the commit we want might be several commits
# back; but do only clone the main branch.

View File

@ -156,6 +156,10 @@ func TestOnboardAuth(t *testing.T) {
{Action: influxdb.WriteAction, Resource: influxdb.Resource{Type: influxdb.NotebooksResourceType}},
{Action: influxdb.ReadAction, Resource: influxdb.Resource{Type: influxdb.AnnotationsResourceType}},
{Action: influxdb.WriteAction, Resource: influxdb.Resource{Type: influxdb.AnnotationsResourceType}},
{Action: influxdb.ReadAction, Resource: influxdb.Resource{Type: influxdb.RemotesResourceType}},
{Action: influxdb.WriteAction, Resource: influxdb.Resource{Type: influxdb.RemotesResourceType}},
{Action: influxdb.ReadAction, Resource: influxdb.Resource{Type: influxdb.ReplicationsResourceType}},
{Action: influxdb.WriteAction, Resource: influxdb.Resource{Type: influxdb.ReplicationsResourceType}},
}
if !cmp.Equal(auth.Permissions, expectedPerm) {
t.Fatalf("unequal permissions: \n %+v", cmp.Diff(auth.Permissions, expectedPerm))

View File

@ -147,6 +147,8 @@ func TestFindPermissionsFromUser(t *testing.T) {
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.DBRPResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.NotebooksResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.AnnotationsResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.RemotesResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.ReplicationsResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{Type: influxdb.UsersResourceType, ID: &u.ID}},
influxdb.Permission{Action: influxdb.WriteAction, Resource: influxdb.Resource{Type: influxdb.UsersResourceType, ID: &u.ID}},
}