chore: remove kv Session service and sessionsv1 bucket (#18823)

* chore: remove sessionsv1 bucket

* chore: add extra configuration to sessions.Service
pull/18845/head
George 2020-07-02 15:15:08 +01:00 committed by GitHub
parent 3b50d08cd9
commit 4a6bc1d74f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 243 additions and 327 deletions

View File

@ -925,7 +925,13 @@ func (m *Launcher) run(ctx context.Context) (err error) {
var sessionSvc platform.SessionService
{
sessionSvc = session.NewService(session.NewStorage(inmem.NewSessionStore()), userSvc, userResourceSvc, authSvc, time.Duration(m.sessionLength)*time.Minute)
sessionSvc = session.NewService(
session.NewStorage(inmem.NewSessionStore()),
userSvc,
userResourceSvc,
authSvc,
session.WithSessionLength(time.Duration(m.sessionLength)*time.Minute),
)
sessionSvc = session.NewSessionMetrics(m.reg, sessionSvc)
sessionSvc = session.NewSessionLogger(m.log.With(zap.String("service", "session")), sessionSvc)
}

View File

@ -43,7 +43,6 @@ func (m InitialMigration) Up(ctx context.Context, store SchemaStore) error {
userpasswordBucket,
scrapersBucket,
secretBucket,
sessionBucket,
telegrafBucket,
telegrafPluginsBucket,
urmBucket,
@ -62,6 +61,8 @@ func (m InitialMigration) Up(ctx context.Context, store SchemaStore) error {
variableBucket,
variableIndexBucket,
variableOrgsIndex,
// deprecated: removed in later migration
[]byte("sessionsv1"),
} {
if err := store.CreateBucket(ctx, bucket); err != nil {
return err

View File

@ -0,0 +1,7 @@
package all
import "github.com/influxdata/influxdb/v2/kv/migration"
// Migration0006_DeleteBucketSessionsv1 removes the sessionsv1 bucket
// from the backing kv store.
var Migration0006_DeleteBucketSessionsv1 = migration.DeleteBuckets("delete sessionsv1 bucket", []byte("sessionsv1"))

View File

@ -17,5 +17,7 @@ var Migrations = [...]migration.Spec{
Migration0004_AddDbrpBuckets,
// add pkger buckets
Migration0005_AddPkgerBuckets,
// delete bucket sessionsv1
Migration0006_DeleteBucketSessionsv1,
// {{ do_not_edit . }}
}

View File

@ -6,41 +6,78 @@ import (
"github.com/influxdata/influxdb/v2/kv"
)
// CreateBucketsMigration is a migration Spec which creates
type bucketMigrationType string
const (
createBucketMigration = bucketMigrationType("create")
deleteBucketMigration = bucketMigrationType("delete")
)
// BucketsMigration is a migration Spec which creates
// the provided list of buckets on a store when Up is called
// and deletes them on Down.
type CreateBucketsMigration struct {
type BucketsMigration struct {
typ bucketMigrationType
name string
buckets [][]byte
}
// CreateBuckets returns a new CreateBucketsMigration Spec.
func CreateBuckets(name string, buckets ...[]byte) Spec {
return CreateBucketsMigration{name, buckets}
// CreateBuckets returns a new BucketsMigration Spec.
func CreateBuckets(name string, bucket []byte, extraBuckets ...[]byte) Spec {
buckets := append([][]byte{bucket}, extraBuckets...)
return BucketsMigration{createBucketMigration, name, buckets}
}
// DeleteBuckets returns a new BucketsMigration Spec.
func DeleteBuckets(name string, bucket []byte, extraBuckets ...[]byte) Spec {
buckets := append([][]byte{bucket}, extraBuckets...)
return BucketsMigration{deleteBucketMigration, name, buckets}
}
// MigrationName returns the name of the migration.
func (c CreateBucketsMigration) MigrationName() string {
return c.name
func (m BucketsMigration) MigrationName() string {
return m.name
}
// Up creates the buckets on the store.
func (c CreateBucketsMigration) Up(ctx context.Context, store kv.SchemaStore) error {
for _, bucket := range c.buckets {
if err := store.CreateBucket(ctx, bucket); err != nil {
func (m BucketsMigration) Up(ctx context.Context, store kv.SchemaStore) error {
var fn func(context.Context, []byte) error
switch m.typ {
case createBucketMigration:
fn = store.CreateBucket
case deleteBucketMigration:
fn = store.DeleteBucket
default:
panic("unrecognized buckets migration type")
}
for _, bucket := range m.buckets {
if err := fn(ctx, bucket); err != nil {
return err
}
}
return nil
}
// Down delets the buckets on the store.
func (c CreateBucketsMigration) Down(ctx context.Context, store kv.SchemaStore) error {
for _, bucket := range c.buckets {
if err := store.DeleteBucket(ctx, bucket); err != nil {
func (m BucketsMigration) Down(ctx context.Context, store kv.SchemaStore) error {
var fn func(context.Context, []byte) error
switch m.typ {
case createBucketMigration:
fn = store.DeleteBucket
case deleteBucketMigration:
fn = store.CreateBucket
default:
panic("unrecognized buckets migration type")
}
for _, bucket := range m.buckets {
if err := fn(ctx, bucket); err != nil {
return err
}
}
return nil

View File

@ -0,0 +1,97 @@
package migration
import (
"context"
"errors"
"testing"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
)
func Test_BucketsMigration_CreateBuckets(t *testing.T) {
var (
ctx = context.Background()
bucket = []byte("some_bucket")
store = inmem.NewKVStore()
)
// bucket should not exist
bucketShouldNotExist(t, store, bucket)
// build new create buckets migration
migration := CreateBuckets("create bucket some_bucket", bucket)
// apply migration up
if err := migration.Up(ctx, store); err != nil {
t.Fatal("unexpected error", err)
}
// bucket should now exist
bucketShouldExist(t, store, bucket)
// apply migration down
if err := migration.Down(ctx, store); err != nil {
t.Fatal("unexpected error", err)
}
// bucket should no longer exist
bucketShouldNotExist(t, store, bucket)
}
func Test_BucketsMigration_DeleteBuckets(t *testing.T) {
var (
ctx = context.Background()
bucket = []byte("some_bucket")
store = inmem.NewKVStore()
)
// initially create bucket
if err := store.CreateBucket(ctx, bucket); err != nil {
t.Fatal("unexpected error", err)
}
// ensure bucket is there to start with
bucketShouldExist(t, store, bucket)
// build new delete buckets migration
migration := DeleteBuckets("delete bucket some_bucket", bucket)
// apply migration up
if err := migration.Up(ctx, store); err != nil {
t.Fatal("unexpected error", err)
}
// bucket should have been removed
bucketShouldNotExist(t, store, bucket)
// apply migration down
if err := migration.Down(ctx, store); err != nil {
t.Fatal("unexpected error", err)
}
// bucket should exist again
bucketShouldExist(t, store, bucket)
}
func bucketShouldExist(t *testing.T, store kv.Store, bucket []byte) {
t.Helper()
if err := store.View(context.Background(), func(tx kv.Tx) error {
_, err := tx.Bucket(bucket)
return err
}); err != nil {
t.Fatal("unexpected error", err)
}
}
func bucketShouldNotExist(t *testing.T, store kv.Store, bucket []byte) {
t.Helper()
if err := store.View(context.Background(), func(tx kv.Tx) error {
_, err := tx.Bucket(bucket)
return err
}); !errors.Is(err, kv.ErrBucketNotFound) {
t.Fatalf("expected bucket not found, got %q", err)
}
}

View File

@ -1,250 +0,0 @@
package kv
import (
"context"
"encoding/json"
"time"
"github.com/influxdata/influxdb/v2"
)
var (
sessionBucket = []byte("sessionsv1")
)
var _ influxdb.SessionService = (*Service)(nil)
// RenewSession extends the expire time to newExpiration.
func (s *Service) RenewSession(ctx context.Context, session *influxdb.Session, newExpiration time.Time) error {
if session == nil {
return &influxdb.Error{
Msg: "session is nil",
}
}
// session already has longer expiration
if newExpiration.Before(session.ExpiresAt) {
return nil
}
return s.kv.Update(ctx, func(tx Tx) error {
sess, err := s.findSession(ctx, tx, session.Key)
if err != nil {
return err
}
// session already has longer expiration
if newExpiration.Before(session.ExpiresAt) {
return nil
}
sess.ExpiresAt = newExpiration
if err := s.putSession(ctx, tx, sess); err != nil {
return &influxdb.Error{
Err: err,
}
}
*session = *sess
return nil
})
}
// FindSession retrieves the session found at the provided key.
func (s *Service) FindSession(ctx context.Context, key string) (*influxdb.Session, error) {
var sess *influxdb.Session
err := s.kv.View(ctx, func(tx Tx) error {
s, err := s.findSession(ctx, tx, key)
if err != nil {
return err
}
sess = s
return nil
})
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
if err := sess.Expired(); err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
return sess, nil
}
func (s *Service) findSession(ctx context.Context, tx Tx, key string) (*influxdb.Session, error) {
b, err := tx.Bucket(sessionBucket)
if err != nil {
return nil, err
}
v, err := b.Get([]byte(key))
if IsNotFound(err) {
return nil, &influxdb.Error{
Code: influxdb.ENotFound,
Msg: influxdb.ErrSessionNotFound,
}
}
if err != nil {
return nil, err
}
sn := &influxdb.Session{}
if err := json.Unmarshal(v, sn); err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
ps, err := s.maxPermissions(ctx, tx, sn.UserID)
if err != nil {
return nil, err
}
sn.Permissions = ps
return sn, nil
}
func (s *Service) maxPermissions(ctx context.Context, tx Tx, userID influxdb.ID) ([]influxdb.Permission, error) {
// TODO(desa): these values should be cached so it's not so expensive to lookup each time.
f := influxdb.UserResourceMappingFilter{UserID: userID}
mappings, err := s.findUserResourceMappings(ctx, tx, f)
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
ps := make([]influxdb.Permission, 0, len(mappings))
for _, m := range mappings {
p, err := m.ToPermissions()
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
ps = append(ps, p...)
}
ps = append(ps, influxdb.MePermissions(userID)...)
if !s.disableAuthorizationsForMaxPermissions(ctx) {
// TODO(desa): this is super expensive, we should keep a list of a users maximal privileges somewhere
// we did this so that the oper token would be used in a users permissions.
af := influxdb.AuthorizationFilter{UserID: &userID}
as, err := s.findAuthorizations(ctx, tx, af)
if err != nil {
return nil, err
}
for _, a := range as {
ps = append(ps, a.Permissions...)
}
}
return ps, nil
}
// PutSession puts the session at key.
func (s *Service) PutSession(ctx context.Context, sn *influxdb.Session) error {
return s.kv.Update(ctx, func(tx Tx) error {
if err := s.putSession(ctx, tx, sn); err != nil {
return err
}
return nil
})
}
func (s *Service) putSession(ctx context.Context, tx Tx, sn *influxdb.Session) error {
v, err := json.Marshal(sn)
if err != nil {
return &influxdb.Error{
Err: err,
}
}
b, err := tx.Bucket(sessionBucket)
if err != nil {
return err
}
if err := b.Put([]byte(sn.Key), v); err != nil {
return &influxdb.Error{
Err: err,
}
}
return nil
}
// ExpireSession expires the session at the provided key.
func (s *Service) ExpireSession(ctx context.Context, key string) error {
return s.kv.Update(ctx, func(tx Tx) error {
sn, err := s.findSession(ctx, tx, key)
if err != nil {
return err
}
sn.ExpiresAt = time.Now()
if err := s.putSession(ctx, tx, sn); err != nil {
return err
}
return nil
})
}
// CreateSession creates a session for a user with the users maximal privileges.
func (s *Service) CreateSession(ctx context.Context, user string) (*influxdb.Session, error) {
var sess *influxdb.Session
err := s.kv.Update(ctx, func(tx Tx) error {
sn, err := s.createSession(ctx, tx, user)
if err != nil {
return err
}
sess = sn
return nil
})
if err != nil {
return nil, err
}
return sess, nil
}
func (s *Service) createSession(ctx context.Context, tx Tx, user string) (*influxdb.Session, error) {
u, pe := s.findUserByName(ctx, tx, user)
if pe != nil {
return nil, pe
}
sn := &influxdb.Session{}
sn.ID = s.IDGenerator.ID()
k, err := s.TokenGenerator.Token()
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
sn.Key = k
sn.UserID = u.ID
sn.CreatedAt = time.Now()
sn.ExpiresAt = sn.CreatedAt.Add(s.Config.SessionLength)
// TODO(desa): not totally sure what to do here. Possibly we should have a maximal privilege permission.
sn.Permissions = []influxdb.Permission{}
if err := s.putSession(ctx, tx, sn); err != nil {
return nil, err
}
return sn, nil
}

View File

@ -1,53 +0,0 @@
package kv_test
import (
"context"
"testing"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
)
func TestBoltSessionService(t *testing.T) {
influxdbtesting.SessionService(initBoltSessionService, t)
}
func initBoltSessionService(f influxdbtesting.SessionFields, t *testing.T) (influxdb.SessionService, string, func()) {
s, closeBolt, err := NewTestBoltStore(t)
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
svc, op, closeSvc := initSessionService(s, f, t)
return svc, op, func() {
closeSvc()
closeBolt()
}
}
func initSessionService(s kv.SchemaStore, f influxdbtesting.SessionFields, t *testing.T) (influxdb.SessionService, string, func()) {
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s)
svc.IDGenerator = f.IDGenerator
svc.TokenGenerator = f.TokenGenerator
for _, u := range f.Users {
if err := svc.PutUser(ctx, u); err != nil {
t.Fatalf("failed to populate users")
}
}
for _, s := range f.Sessions {
if err := svc.PutSession(ctx, s); err != nil {
t.Fatalf("failed to populate sessions")
}
}
return svc, kv.OpPrefix, func() {
for _, u := range f.Users {
if err := svc.DeleteUser(ctx, u.ID); err != nil {
t.Logf("failed to remove users: %v", err)
}
}
}
}

View File

@ -1798,3 +1798,42 @@ func ExtractTaskOptions(ctx context.Context, lang influxdb.FluxLanguageService,
}
return options.FromScript(lang, matches[0])
}
func (s *Service) maxPermissions(ctx context.Context, tx Tx, userID influxdb.ID) ([]influxdb.Permission, error) {
// TODO(desa): these values should be cached so it's not so expensive to lookup each time.
f := influxdb.UserResourceMappingFilter{UserID: userID}
mappings, err := s.findUserResourceMappings(ctx, tx, f)
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
ps := make([]influxdb.Permission, 0, len(mappings))
for _, m := range mappings {
p, err := m.ToPermissions()
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
ps = append(ps, p...)
}
ps = append(ps, influxdb.MePermissions(userID)...)
if !s.disableAuthorizationsForMaxPermissions(ctx) {
// TODO(desa): this is super expensive, we should keep a list of a users maximal privileges somewhere
// we did this so that the oper token would be used in a users permissions.
af := influxdb.AuthorizationFilter{UserID: &userID}
as, err := s.findAuthorizations(ctx, tx, af)
if err != nil {
return nil, err
}
for _, a := range as {
ps = append(ps, a.Permissions...)
}
}
return ps, nil
}

View File

@ -24,23 +24,53 @@ type Service struct {
disableAuthorizationsForMaxPermissions func(context.Context) bool
}
// NewService creates a new session service
func NewService(store *Storage, userService influxdb.UserService, urmService influxdb.UserResourceMappingService, authSvc influxdb.AuthorizationService, sessionLength time.Duration) *Service {
if sessionLength <= 0 {
sessionLength = time.Hour
// ServiceOption is a functional option for configuring a *Service
type ServiceOption func(*Service)
// WithSessionLength configures the length of the session with the provided
// duration when the resulting option is called on a *Service.
func WithSessionLength(length time.Duration) ServiceOption {
return func(s *Service) {
s.sessionLength = length
}
return &Service{
}
// WithIDGenerator overrides the default ID generator with the one
// provided to this function when called on a *Service
func WithIDGenerator(gen influxdb.IDGenerator) ServiceOption {
return func(s *Service) {
s.idGen = gen
}
}
// WithTokenGenerator overrides the default token generator with the one
// provided to this function when called on a *Service
func WithTokenGenerator(gen influxdb.TokenGenerator) ServiceOption {
return func(s *Service) {
s.tokenGen = gen
}
}
// NewService creates a new session service
func NewService(store *Storage, userService influxdb.UserService, urmService influxdb.UserResourceMappingService, authSvc influxdb.AuthorizationService, opts ...ServiceOption) *Service {
service := &Service{
store: store,
userService: userService,
urmService: urmService,
authService: authSvc,
sessionLength: sessionLength,
sessionLength: time.Hour,
idGen: snowflake.NewIDGenerator(),
tokenGen: rand.NewTokenGenerator(64),
disableAuthorizationsForMaxPermissions: func(context.Context) bool {
return false
},
}
for _, opt := range opts {
opt(service)
}
return service
}
// WithMaxPermissionFunc sets the useAuthorizationsForMaxPermissions function

View File

@ -34,14 +34,14 @@ func initSessionService(f influxdbtesting.SessionFields, t *testing.T) (influxdb
FindAuthorizationsFn: func(context.Context, influxdb.AuthorizationFilter, ...influxdb.FindOptions) ([]*influxdb.Authorization, int, error) {
return []*influxdb.Authorization{}, 0, nil
},
}, time.Minute)
}, WithSessionLength(time.Minute))
if f.IDGenerator != nil {
svc.idGen = f.IDGenerator
WithIDGenerator(f.IDGenerator)(svc)
}
if f.TokenGenerator != nil {
svc.tokenGen = f.TokenGenerator
WithTokenGenerator(f.TokenGenerator)(svc)
}
for _, u := range f.Users {