diff --git a/CHANGELOG.md b/CHANGELOG.md index 792aa0e88e..83d9c84520 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ 1. [16341](https://github.com/influxdata/influxdb/pull/16341): Extend pkger apply functionality with ability to provide secrets outside of pkg 1. [16345](https://github.com/influxdata/influxdb/pull/16345): Add hide headers flag to influx cli task find cmd 1. [16336](https://github.com/influxdata/influxdb/pull/16336): Manual Overrides for Readiness Endpoint +1. [16347](https://github.com/influxdata/influxdb/pull/16347): Drop legacy inmem service implementation in favor of kv service with inmem dependency ### Bug Fixes diff --git a/authorizer/task_test.go b/authorizer/task_test.go index 4d20e42fd0..0b589fa74c 100644 --- a/authorizer/task_test.go +++ b/authorizer/task_test.go @@ -10,6 +10,7 @@ import ( pctx "github.com/influxdata/influxdb/context" "github.com/influxdata/influxdb/http" "github.com/influxdata/influxdb/inmem" + "github.com/influxdata/influxdb/kv" "github.com/influxdata/influxdb/mock" _ "github.com/influxdata/influxdb/query/builtin" "github.com/influxdata/influxdb/task/backend" @@ -18,7 +19,7 @@ import ( ) func TestOnboardingValidation(t *testing.T) { - svc := inmem.NewService() + svc := newKVSVC(t) ts := authorizer.NewTaskService(zaptest.NewLogger(t), mockTaskService(3, 2, 1)) r, err := svc.Generate(context.Background(), &influxdb.OnboardingRequest{ @@ -28,7 +29,6 @@ func TestOnboardingValidation(t *testing.T) { Bucket: "holder", RetentionPeriod: 1, }) - if err != nil { t.Fatal(err) } @@ -120,9 +120,9 @@ func TestValidations(t *testing.T) { otherOrg = &influxdb.Organization{Name: "other_org"} ) - inmem := inmem.NewService() + svc := newKVSVC(t) - r, err := inmem.Generate(context.Background(), &influxdb.OnboardingRequest{ + r, err := svc.Generate(context.Background(), &influxdb.OnboardingRequest{ User: "Setec Astronomy", Password: "too many secrets", Org: "thing", @@ -133,7 +133,7 @@ func TestValidations(t *testing.T) { t.Fatal(err) } - if err := inmem.CreateOrganization(context.Background(), otherOrg); err != nil { + if err := svc.CreateOrganization(context.Background(), otherOrg); err != nil { t.Fatal(err) } @@ -142,7 +142,7 @@ func TestValidations(t *testing.T) { OrgID: otherOrg.ID, } - if err = inmem.CreateBucket(context.Background(), otherBucket); err != nil { + if err = svc.CreateBucket(context.Background(), otherBucket); err != nil { t.Fatal(err) } @@ -571,3 +571,13 @@ from(bucket:"holder") |> range(start:-5m) |> to(bucket:"holder", org:"thing")` }) } } + +func newKVSVC(t *testing.T) *kv.Service { + t.Helper() + + svc := kv.NewService(zaptest.NewLogger(t), inmem.NewKVStore()) + if err := svc.Initialize(context.Background()); err != nil { + t.Fatal(err) + } + return svc +} diff --git a/crud_log.go b/crud_log.go index f77b27bf2b..5099247785 100644 --- a/crud_log.go +++ b/crud_log.go @@ -21,7 +21,7 @@ func (log *CRUDLog) SetCreatedAt(now time.Time) { log.CreatedAt = now } -// SetUpdated set the updated time. +// SetUpdatedAt set the updated time. func (log *CRUDLog) SetUpdatedAt(now time.Time) { log.UpdatedAt = now } diff --git a/http/auth_test.go b/http/auth_test.go index 8beb4b339e..05826b6c40 100644 --- a/http/auth_test.go +++ b/http/auth_test.go @@ -935,7 +935,7 @@ func initAuthorizationService(f platformtesting.AuthorizationFields, t *testing. done := server.Close - return &AuthorizationService{Client: httpClient}, inmem.OpPrefix, done + return &AuthorizationService{Client: httpClient}, "", done } func TestAuthorizationService_CreateAuthorization(t *testing.T) { diff --git a/http/bucket_test.go b/http/bucket_test.go index cfa9354535..8ff97cab36 100644 --- a/http/bucket_test.go +++ b/http/bucket_test.go @@ -1204,12 +1204,11 @@ func initBucketService(f platformtesting.BucketFields, t *testing.T) (platform.B handler := NewBucketHandler(zaptest.NewLogger(t), bucketBackend) server := httptest.NewServer(handler) client := BucketService{ - Client: mustNewHTTPClient(t, server.URL, ""), - OpPrefix: inmem.OpPrefix, + Client: mustNewHTTPClient(t, server.URL, ""), } done := server.Close - return &client, inmem.OpPrefix, done + return &client, "", done } func TestBucketService(t *testing.T) { diff --git a/http/dashboard_test.go b/http/dashboard_test.go index 17e62bda42..80d9c92aaf 100644 --- a/http/dashboard_test.go +++ b/http/dashboard_test.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/httprouter" platform "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/inmem" + "github.com/influxdata/influxdb/kv" "github.com/influxdata/influxdb/mock" platformtesting "github.com/influxdata/influxdb/testing" "github.com/yudai/gojsondiff" @@ -1755,9 +1756,8 @@ func Test_dashboardCellIDPath(t *testing.T) { func initDashboardService(f platformtesting.DashboardFields, t *testing.T) (platform.DashboardService, string, func()) { t.Helper() - svc := inmem.NewService() + svc := newInMemKVSVC(t) svc.IDGenerator = f.IDGenerator - svc.TimeGenerator = f.TimeGenerator ctx := context.Background() for _, d := range f.Dashboards { if err := svc.PutDashboard(ctx, d); err != nil { @@ -1772,14 +1772,13 @@ func initDashboardService(f platformtesting.DashboardFields, t *testing.T) (plat server := httptest.NewServer(h) client := DashboardService{Client: mustNewHTTPClient(t, server.URL, "")} - done := server.Close - return &client, inmem.OpPrefix, done + return &client, "", server.Close } func TestDashboardService(t *testing.T) { t.Parallel() - platformtesting.DashboardService(initDashboardService, t) + platformtesting.DeleteDashboard(initDashboardService, t) } func TestService_handlePostDashboardLabel(t *testing.T) { @@ -1917,3 +1916,13 @@ func jsonEqual(s1, s2 string) (eq bool, diff string, err error) { return cmp.Equal(o1, o2), diff, err } + +func newInMemKVSVC(t *testing.T) *kv.Service { + t.Helper() + + svc := kv.NewService(zaptest.NewLogger(t), inmem.NewKVStore()) + if err := svc.Initialize(context.Background()); err != nil { + t.Fatal(err) + } + return svc +} diff --git a/http/label_test.go b/http/label_test.go index bfcf4ca869..19e4e3e6ae 100644 --- a/http/label_test.go +++ b/http/label_test.go @@ -617,12 +617,11 @@ func initLabelService(f platformtesting.LabelFields, t *testing.T) (platform.Lab handler := NewLabelHandler(zaptest.NewLogger(t), svc, ErrorHandler(0)) server := httptest.NewServer(handler) client := LabelService{ - Client: mustNewHTTPClient(t, server.URL, ""), - OpPrefix: inmem.OpPrefix, + Client: mustNewHTTPClient(t, server.URL, ""), } done := server.Close - return &client, inmem.OpPrefix, done + return &client, "", done } func TestLabelService(t *testing.T) { diff --git a/http/org_test.go b/http/org_test.go index 2b060c9499..9bfdf21ce5 100644 --- a/http/org_test.go +++ b/http/org_test.go @@ -59,12 +59,11 @@ func initOrganizationService(f platformtesting.OrganizationFields, t *testing.T) handler := NewOrgHandler(zaptest.NewLogger(t), orgBackend) server := httptest.NewServer(handler) client := OrganizationService{ - Client: mustNewHTTPClient(t, server.URL, ""), - OpPrefix: inmem.OpPrefix, + Client: mustNewHTTPClient(t, server.URL, ""), } done := server.Close - return &client, inmem.OpPrefix, done + return &client, "", done } func TestOrganizationService(t *testing.T) { diff --git a/http/query_handler_test.go b/http/query_handler_test.go index 406270115d..e289616316 100644 --- a/http/query_handler_test.go +++ b/http/query_handler_test.go @@ -23,7 +23,6 @@ import ( platform "github.com/influxdata/influxdb" icontext "github.com/influxdata/influxdb/context" "github.com/influxdata/influxdb/http/metric" - "github.com/influxdata/influxdb/inmem" "github.com/influxdata/influxdb/kit/check" tracetesting "github.com/influxdata/influxdb/kit/tracing/testing" influxmock "github.com/influxdata/influxdb/mock" @@ -324,12 +323,12 @@ var _ metric.EventRecorder = noopEventRecorder{} func TestFluxHandler_PostQuery_Errors(t *testing.T) { defer tracetesting.SetupInMemoryTracing(t.Name())() - i := inmem.NewService() + orgSVC := newInMemKVSVC(t) b := &FluxBackend{ HTTPErrorHandler: ErrorHandler(0), log: zaptest.NewLogger(t), QueryEventRecorder: noopEventRecorder{}, - OrganizationService: i, + OrganizationService: orgSVC, ProxyQueryService: &mock.ProxyQueryService{ QueryF: func(ctx context.Context, w io.Writer, req *query.ProxyRequest) (flux.Statistics, error) { return flux.Statistics{}, &influxdb.Error{ @@ -409,7 +408,7 @@ func TestFluxHandler_PostQuery_Errors(t *testing.T) { t.Run("valid request but executing query results in client error", func(t *testing.T) { org := influxdb.Organization{Name: t.Name()} - if err := i.CreateOrganization(context.Background(), &org); err != nil { + if err := orgSVC.CreateOrganization(context.Background(), &org); err != nil { t.Fatal(err) } diff --git a/http/scraper_service_test.go b/http/scraper_service_test.go index fed17d5add..8a49d544f3 100644 --- a/http/scraper_service_test.go +++ b/http/scraper_service_test.go @@ -14,7 +14,6 @@ import ( "github.com/influxdata/influxdb" platcontext "github.com/influxdata/influxdb/context" httpMock "github.com/influxdata/influxdb/http/mock" - "github.com/influxdata/influxdb/inmem" "github.com/influxdata/influxdb/mock" platformtesting "github.com/influxdata/influxdb/testing" "go.uber.org/zap/zaptest" @@ -814,7 +813,7 @@ func TestService_handlePatchScraperTarget(t *testing.T) { func initScraperService(f platformtesting.TargetFields, t *testing.T) (influxdb.ScraperTargetStoreService, string, func()) { t.Helper() - svc := inmem.NewService() + svc := newInMemKVSVC(t) svc.IDGenerator = f.IDGenerator ctx := context.Background() @@ -824,7 +823,7 @@ func initScraperService(f platformtesting.TargetFields, t *testing.T) (influxdb. } } for _, m := range f.UserResourceMappings { - if err := svc.PutUserResourceMapping(ctx, m); err != nil { + if err := svc.CreateUserResourceMapping(ctx, m); err != nil { t.Fatalf("failed to populate user resource mapping") } } @@ -864,14 +863,13 @@ func initScraperService(f platformtesting.TargetFields, t *testing.T) (influxdb. UserResourceMappingService: svc, OrganizationService: svc, ScraperService: ScraperService{ - Token: "tok", - Addr: server.URL, - OpPrefix: inmem.OpPrefix, + Token: "tok", + Addr: server.URL, }, } done := server.Close - return &client, inmem.OpPrefix, done + return &client, "", done } func TestScraperService(t *testing.T) { diff --git a/http/task_service_test.go b/http/task_service_test.go index aab4b0e9d3..179e1f75a9 100644 --- a/http/task_service_test.go +++ b/http/task_service_test.go @@ -16,7 +16,6 @@ import ( "github.com/influxdata/httprouter" platform "github.com/influxdata/influxdb" pcontext "github.com/influxdata/influxdb/context" - "github.com/influxdata/influxdb/inmem" "github.com/influxdata/influxdb/mock" _ "github.com/influxdata/influxdb/query/builtin" "github.com/influxdata/influxdb/task/backend" @@ -27,6 +26,7 @@ import ( // NewMockTaskBackend returns a TaskBackend with mock services. func NewMockTaskBackend(t *testing.T) *TaskBackend { + t.Helper() return &TaskBackend{ log: zaptest.NewLogger(t).With(zap.String("handler", "task")), @@ -55,7 +55,7 @@ func NewMockTaskBackend(t *testing.T) *TaskBackend { return org, nil }, }, - UserResourceMappingService: inmem.NewService(), + UserResourceMappingService: newInMemKVSVC(t), LabelService: mock.NewLabelService(), UserService: mock.NewUserService(), } @@ -820,7 +820,7 @@ func TestTaskHandler_handleGetRuns(t *testing.T) { func TestTaskHandler_NotFoundStatus(t *testing.T) { // Ensure that the HTTP handlers return 404s for missing resources, and OKs for matching. - im := inmem.NewService() + im := newInMemKVSVC(t) taskBackend := NewMockTaskBackend(t) taskBackend.HTTPErrorHandler = ErrorHandler(0) h := NewTaskHandler(zaptest.NewLogger(t), taskBackend) @@ -1210,7 +1210,7 @@ func TestService_handlePostTaskLabel(t *testing.T) { // Test that org name to org ID translation happens properly in the HTTP layer. // Regression test for https://github.com/influxdata/influxdb/issues/12089. func TestTaskHandler_CreateTaskWithOrgName(t *testing.T) { - i := inmem.NewService() + i := newInMemKVSVC(t) ctx := context.Background() // Set up user and org. @@ -1303,7 +1303,7 @@ func TestTaskHandler_CreateTaskWithOrgName(t *testing.T) { func TestTaskHandler_Sessions(t *testing.T) { t.Skip("rework these") // Common setup to get a working base for using tasks. - i := inmem.NewService() + i := newInMemKVSVC(t) ctx := context.Background() diff --git a/http/user_test.go b/http/user_test.go index 10dcddf0f0..69a02603ad 100644 --- a/http/user_test.go +++ b/http/user_test.go @@ -9,7 +9,6 @@ import ( "testing" platform "github.com/influxdata/influxdb" - "github.com/influxdata/influxdb/inmem" "github.com/influxdata/influxdb/mock" "github.com/influxdata/influxdb/pkg/testttp" platformtesting "github.com/influxdata/influxdb/testing" @@ -29,7 +28,7 @@ func NewMockUserBackend(t *testing.T) *UserBackend { func initUserService(f platformtesting.UserFields, t *testing.T) (platform.UserService, string, func()) { t.Helper() - svc := inmem.NewService() + svc := newInMemKVSVC(t) svc.IDGenerator = f.IDGenerator ctx := context.Background() @@ -51,11 +50,10 @@ func initUserService(f platformtesting.UserFields, t *testing.T) (platform.UserS } client := UserService{ - Client: httpClient, - OpPrefix: inmem.OpPrefix, + Client: httpClient, } - return &client, inmem.OpPrefix, server.Close + return &client, "", server.Close } func TestUserService(t *testing.T) { diff --git a/http/variable_test.go b/http/variable_test.go index 4cbecd4f7f..b1bfc4f1bd 100644 --- a/http/variable_test.go +++ b/http/variable_test.go @@ -13,8 +13,6 @@ import ( "github.com/influxdata/httprouter" platform "github.com/influxdata/influxdb" - "github.com/influxdata/influxdb/inmem" - "github.com/influxdata/influxdb/kv" "github.com/influxdata/influxdb/mock" platformtesting "github.com/influxdata/influxdb/testing" "go.uber.org/zap/zaptest" @@ -890,14 +888,11 @@ func TestService_handlePostVariableLabel(t *testing.T) { } func initVariableService(f platformtesting.VariableFields, t *testing.T) (platform.VariableService, string, func()) { - svc := kv.NewService(zaptest.NewLogger(t), inmem.NewKVStore()) + svc := newInMemKVSVC(t) svc.IDGenerator = f.IDGenerator svc.TimeGenerator = f.TimeGenerator ctx := context.Background() - if err := svc.Initialize(ctx); err != nil { - t.Fatal(err) - } for _, v := range f.Variables { if err := svc.ReplaceVariable(ctx, v); err != nil { @@ -915,7 +910,7 @@ func initVariableService(f platformtesting.VariableFields, t *testing.T) (platfo } done := server.Close - return &client, inmem.OpPrefix, done + return &client, "", done } func TestVariableService(t *testing.T) { diff --git a/inmem/auth_service.go b/inmem/auth_service.go deleted file mode 100644 index 9379a19058..0000000000 --- a/inmem/auth_service.go +++ /dev/null @@ -1,250 +0,0 @@ -package inmem - -import ( - "context" - - platform "github.com/influxdata/influxdb" -) - -func (s *Service) loadAuthorization(ctx context.Context, id platform.ID) (*platform.Authorization, *platform.Error) { - i, ok := s.authorizationKV.Load(id.String()) - if !ok { - return nil, &platform.Error{ - Code: platform.ENotFound, - Msg: "authorization not found", - } - } - - a, ok := i.(platform.Authorization) - if !ok { - return nil, &platform.Error{ - Code: platform.EInternal, - Msg: "value found in map is not an authorization", - } - } - - if a.Status == "" { - a.Status = platform.Active - } - - return &a, nil -} - -// PutAuthorization overwrites the authorization with the contents of a. -func (s *Service) PutAuthorization(ctx context.Context, a *platform.Authorization) error { - if a.Status == "" { - a.Status = platform.Active - } - s.authorizationKV.Store(a.ID.String(), *a) - return nil -} - -// FindAuthorizationByID returns an authorization given an ID. -func (s *Service) FindAuthorizationByID(ctx context.Context, id platform.ID) (*platform.Authorization, error) { - var err error - a, pe := s.loadAuthorization(ctx, id) - if pe != nil { - pe.Op = OpPrefix + platform.OpFindAuthorizationByID - err = pe - } - return a, err -} - -// FindAuthorizationByToken returns an authorization given a token. -func (s *Service) FindAuthorizationByToken(ctx context.Context, t string) (*platform.Authorization, error) { - var err error - op := OpPrefix + platform.OpFindAuthorizationByToken - var n int - var as []*platform.Authorization - as, n, err = s.FindAuthorizations(ctx, platform.AuthorizationFilter{Token: &t}) - if err != nil { - return nil, &platform.Error{ - Err: err, - Op: op, - } - } - if n < 1 { - return nil, &platform.Error{ - Code: platform.ENotFound, - Msg: "authorization not found", - Op: op, - } - } - return as[0], nil -} - -func filterAuthorizationsFn(filter platform.AuthorizationFilter) func(a *platform.Authorization) bool { - if filter.ID != nil { - return func(a *platform.Authorization) bool { - return a.ID == *filter.ID - } - } - - if filter.Token != nil { - return func(a *platform.Authorization) bool { - return a.Token == *filter.Token - } - } - - // Filter by org and user - if filter.OrgID != nil && filter.UserID != nil { - return func(a *platform.Authorization) bool { - return a.OrgID == *filter.OrgID && a.UserID == *filter.UserID - } - } - - if filter.OrgID != nil { - return func(a *platform.Authorization) bool { - return a.OrgID == *filter.OrgID - } - } - - if filter.UserID != nil { - return func(a *platform.Authorization) bool { - return a.UserID == *filter.UserID - } - } - - return func(a *platform.Authorization) bool { return true } -} - -// FindAuthorizations returns all authorizations matching the filter. -func (s *Service) FindAuthorizations(ctx context.Context, filter platform.AuthorizationFilter, opt ...platform.FindOptions) ([]*platform.Authorization, int, error) { - op := OpPrefix + platform.OpFindAuthorizations - if filter.ID != nil { - a, err := s.FindAuthorizationByID(ctx, *filter.ID) - if err != nil { - return nil, 0, &platform.Error{ - Err: err, - Op: op, - } - } - - return []*platform.Authorization{a}, 1, nil - } - - var as []*platform.Authorization - if filter.User != nil { - u, err := s.findUserByName(ctx, *filter.User) - if err != nil { - return nil, 0, &platform.Error{ - Op: op, - Err: err, - } - } - filter.UserID = &u.ID - } - - if filter.Org != nil { - o, err := s.findOrganizationByName(ctx, *filter.Org) - if err != nil { - return nil, 0, &platform.Error{ - Op: op, - Err: err, - } - } - filter.OrgID = &o.ID - } - - var err error - filterF := filterAuthorizationsFn(filter) - s.authorizationKV.Range(func(k, v interface{}) bool { - a, ok := v.(platform.Authorization) - if !ok { - err = &platform.Error{ - Code: platform.EInternal, - Msg: "value found in map is not an authorization", - Op: op, - } - return false - } - - if filterF(&a) { - as = append(as, &a) - } - - return true - }) - - if err != nil { - return nil, 0, err - } - - return as, len(as), nil -} - -// CreateAuthorization sets a.Token and a.ID and creates an platform.Authorization -func (s *Service) CreateAuthorization(ctx context.Context, a *platform.Authorization) error { - op := OpPrefix + platform.OpCreateAuthorization - - _, pErr := s.FindUserByID(ctx, a.UserID) - if pErr != nil { - return platform.ErrUnableToCreateToken - } - - _, pErr = s.FindOrganizationByID(ctx, a.OrgID) - if pErr != nil { - return platform.ErrUnableToCreateToken - } - - if a.Token == "" { - var err error - a.Token, err = s.TokenGenerator.Token() - if err != nil { - return &platform.Error{ - Err: err, - Op: op, - } - } - } - - a.ID = s.IDGenerator.ID() - a.Status = platform.Active - - return s.PutAuthorization(ctx, a) -} - -// DeleteAuthorization deletes an authorization associated with id. -func (s *Service) DeleteAuthorization(ctx context.Context, id platform.ID) error { - if _, err := s.FindAuthorizationByID(ctx, id); err != nil { - return &platform.Error{ - Err: err, - Op: OpPrefix + platform.OpDeleteAuthorization, - } - } - - s.authorizationKV.Delete(id.String()) - return nil -} - -// UpdateAuthorization updates the status and description if available. -func (s *Service) UpdateAuthorization(ctx context.Context, id platform.ID, upd *platform.AuthorizationUpdate) (*platform.Authorization, error) { - op := OpPrefix + platform.OpUpdateAuthorization - a, err := s.FindAuthorizationByID(ctx, id) - if err != nil { - return nil, &platform.Error{ - Err: err, - Op: op, - } - } - - if upd.Status != nil { - status := *upd.Status - switch status { - case platform.Active, platform.Inactive: - default: - return nil, &platform.Error{ - Code: platform.EInvalid, - Msg: "unknown authorization status", - Op: op, - } - } - a.Status = status - } - - if upd.Description != nil { - a.Description = *upd.Description - } - - return a, s.PutAuthorization(ctx, a) -} diff --git a/inmem/auth_test.go b/inmem/auth_test.go deleted file mode 100644 index f50b47ceec..0000000000 --- a/inmem/auth_test.go +++ /dev/null @@ -1,42 +0,0 @@ -package inmem - -import ( - "context" - "testing" - - platform "github.com/influxdata/influxdb" - platformtesting "github.com/influxdata/influxdb/testing" -) - -func initAuthorizationService(f platformtesting.AuthorizationFields, t *testing.T) (platform.AuthorizationService, string, func()) { - s := NewService() - s.IDGenerator = f.IDGenerator - s.TokenGenerator = f.TokenGenerator - s.TimeGenerator = f.TimeGenerator - ctx := context.Background() - - for _, u := range f.Users { - if err := s.PutUser(ctx, u); err != nil { - t.Fatalf("failed to populate users") - } - } - - for _, o := range f.Orgs { - if err := s.PutOrganization(ctx, o); err != nil { - t.Fatalf("failed to populate organizations") - } - } - - for _, u := range f.Authorizations { - if err := s.PutAuthorization(ctx, u); err != nil { - t.Fatalf("failed to populate authorizations") - } - } - - return s, OpPrefix, func() {} -} - -func TestAuthorizationService(t *testing.T) { - t.Skip("This service is not used, we use the kv inmem implementation") - platformtesting.AuthorizationService(initAuthorizationService, t) -} diff --git a/inmem/bucket_service.go b/inmem/bucket_service.go deleted file mode 100644 index 7c265df94e..0000000000 --- a/inmem/bucket_service.go +++ /dev/null @@ -1,328 +0,0 @@ -package inmem - -import ( - "context" - "errors" - "fmt" - "sort" - - platform "github.com/influxdata/influxdb" -) - -var ( - errBucketNotFound = "bucket not found" -) - -func (s *Service) loadBucket(ctx context.Context, id platform.ID) (*platform.Bucket, *platform.Error) { - i, ok := s.bucketKV.Load(id.String()) - if !ok { - return nil, &platform.Error{ - Code: platform.ENotFound, - Msg: errBucketNotFound, - } - } - - b, ok := i.(platform.Bucket) - if !ok { - return nil, &platform.Error{ - Code: platform.EInternal, - Msg: fmt.Sprintf("type %T is not a bucket", i), - } - } - - return &b, nil -} - -// FindBucketByID returns a single bucket by ID. -func (s *Service) FindBucketByID(ctx context.Context, id platform.ID) (*platform.Bucket, error) { - var b *platform.Bucket - var pe *platform.Error - var err error - - if b, pe = s.loadBucket(ctx, id); pe != nil { - pe.Op = OpPrefix + platform.OpFindBucketByID - err = pe - } - return b, err -} - -// FindBucketByName returns a single bucket by name -// NOTE: Currently not implemented -func (s *Service) FindBucketByName(ctx context.Context, orgID platform.ID, n string) (*platform.Bucket, error) { - return nil, errors.New("not implemented") -} - -func (s *Service) forEachBucket(ctx context.Context, descending bool, fn func(b *platform.Bucket) bool) error { - var err error - bs := make([]*platform.Bucket, 0) - s.bucketKV.Range(func(k, v interface{}) bool { - b, ok := v.(platform.Bucket) - if !ok { - err = fmt.Errorf("type %T is not a bucket", v) - return false - } - - bs = append(bs, &b) - return true - }) - - // sort by id - sort.Slice(bs, func(i, j int) bool { - if descending { - return bs[i].ID.String() > bs[j].ID.String() - } - return bs[i].ID.String() < bs[j].ID.String() - }) - - for _, b := range bs { - if !fn(b) { - return nil - } - } - - return err -} - -func (s *Service) filterBuckets(ctx context.Context, fn func(b *platform.Bucket) bool, opts ...platform.FindOptions) ([]*platform.Bucket, error) { - var offset, limit, count int - var descending bool - if len(opts) > 0 { - offset = opts[0].Offset - limit = opts[0].Limit - descending = opts[0].Descending - } - - buckets := []*platform.Bucket{} - err := s.forEachBucket(ctx, descending, func(b *platform.Bucket) bool { - if fn(b) { - if count >= offset { - buckets = append(buckets, b) - } - count++ - } - - if limit > 0 && len(buckets) >= limit { - return false - } - - return true - }) - - if err != nil { - return nil, err - } - - return buckets, nil -} - -// FindBucket returns the first bucket that matches filter. -func (s *Service) FindBucket(ctx context.Context, filter platform.BucketFilter) (*platform.Bucket, error) { - op := OpPrefix + platform.OpFindBucket - var err error - var b *platform.Bucket - - if filter.ID == nil && filter.Name == nil && filter.OrganizationID == nil { - return nil, &platform.Error{ - Code: platform.EInvalid, - Op: op, - Msg: "no filter parameters provided", - } - } - - // filter by bucket id - if filter.ID != nil { - b, err = s.FindBucketByID(ctx, *filter.ID) - if err != nil { - return nil, &platform.Error{ - Op: op, - Err: err, - } - } - return b, nil - } - - bs, n, err := s.FindBuckets(ctx, filter) - if err != nil { - return nil, err - } - - if n < 1 && filter.Name != nil { - return nil, &platform.Error{ - Code: platform.ENotFound, - Op: op, - Msg: fmt.Sprintf("bucket %q not found", *filter.Name), - } - } else if n < 1 { - return nil, &platform.Error{ - Code: platform.ENotFound, - Op: op, - Msg: "bucket not found", - } - } - - return bs[0], nil -} - -func (s *Service) findBuckets(ctx context.Context, filter platform.BucketFilter, opt ...platform.FindOptions) ([]*platform.Bucket, *platform.Error) { - // filter by bucket id - if filter.ID != nil { - b, err := s.FindBucketByID(ctx, *filter.ID) - if err != nil { - return nil, &platform.Error{ - Err: err, - } - } - - return []*platform.Bucket{b}, nil - } - - if filter.Org != nil { - o, err := s.findOrganizationByName(ctx, *filter.Org) - if err != nil { - return nil, &platform.Error{ - Err: err, - } - } - filter.OrganizationID = &o.ID - } - - filterFunc := func(b *platform.Bucket) bool { return true } - - if filter.Name != nil && filter.OrganizationID != nil { - filterFunc = func(b *platform.Bucket) bool { - return b.Name == *filter.Name && b.OrgID == *filter.OrganizationID - } - } else if filter.Name != nil { - // filter by bucket name - filterFunc = func(b *platform.Bucket) bool { - return b.Name == *filter.Name - } - } else if filter.OrganizationID != nil { - // filter by organization id - filterFunc = func(b *platform.Bucket) bool { - return b.OrgID == *filter.OrganizationID - } - } - - bs, err := s.filterBuckets(ctx, filterFunc, opt...) - if err != nil { - return nil, &platform.Error{ - Err: err, - } - } - return bs, nil -} - -// FindBuckets returns a list of buckets that match filter and the total count of matching buckets. -// Additional options provide pagination & sorting. -func (s *Service) FindBuckets(ctx context.Context, filter platform.BucketFilter, opt ...platform.FindOptions) ([]*platform.Bucket, int, error) { - var err error - bs, pe := s.findBuckets(ctx, filter, opt...) - if pe != nil { - pe.Op = OpPrefix + platform.OpFindBuckets - err = pe - return nil, 0, err - } - return bs, len(bs), nil -} - -// CreateBucket creates a new bucket and sets b.ID with the new identifier. -func (s *Service) CreateBucket(ctx context.Context, b *platform.Bucket) error { - if b.OrgID.Valid() { - _, pe := s.FindOrganizationByID(ctx, b.OrgID) - if pe != nil { - return &platform.Error{ - Err: pe, - Op: OpPrefix + platform.OpCreateBucket, - } - } - } - filter := platform.BucketFilter{ - Name: &b.Name, - OrganizationID: &b.OrgID, - } - if _, err := s.FindBucket(ctx, filter); err == nil { - return &platform.Error{ - Code: platform.EConflict, - Op: OpPrefix + platform.OpCreateBucket, - Msg: fmt.Sprintf("bucket with name %s already exists", b.Name), - } - } - b.ID = s.IDGenerator.ID() - b.CreatedAt = s.Now() - b.UpdatedAt = s.Now() - return s.PutBucket(ctx, b) -} - -// PutBucket sets bucket with the current ID. -func (s *Service) PutBucket(ctx context.Context, b *platform.Bucket) error { - s.bucketKV.Store(b.ID.String(), *b) - return nil -} - -// UpdateBucket updates a single bucket with changeset. -// Returns the new bucket state after update. -func (s *Service) UpdateBucket(ctx context.Context, id platform.ID, upd platform.BucketUpdate) (*platform.Bucket, error) { - b, err := s.FindBucketByID(ctx, id) - if err != nil { - return nil, &platform.Error{ - Op: OpPrefix + platform.OpUpdateBucket, - Err: err, - } - } - - if upd.Name != nil { - b.Name = *upd.Name - } - - if upd.RetentionPeriod != nil { - b.RetentionPeriod = *upd.RetentionPeriod - } - - if upd.Description != nil { - b.Description = *upd.Description - } - - b0, err := s.FindBucket(ctx, platform.BucketFilter{ - Name: upd.Name, - }) - if err == nil && b0.ID != id { - return nil, &platform.Error{ - Code: platform.EConflict, - Msg: "bucket name is not unique", - } - } - - b.UpdatedAt = s.Now() - s.bucketKV.Store(b.ID.String(), b) - - return b, nil -} - -// DeleteBucket removes a bucket by ID. -func (s *Service) DeleteBucket(ctx context.Context, id platform.ID) error { - if _, err := s.FindBucketByID(ctx, id); err != nil { - return &platform.Error{ - Op: OpPrefix + platform.OpDeleteBucket, - Err: err, - } - } - s.bucketKV.Delete(id.String()) - - // return s.deleteLabel(ctx, platform.LabelFilter{ResourceID: id}) - return nil -} - -// DeleteOrganizationBuckets removes all the buckets for a given org -func (s *Service) DeleteOrganizationBuckets(ctx context.Context, id platform.ID) error { - bucks, err := s.findBuckets(ctx, platform.BucketFilter{ - OrganizationID: &id, - }) - if err != nil { - return err - } - for _, buck := range bucks { - s.bucketKV.Delete(buck.ID.String()) - } - return nil -} diff --git a/inmem/bucket_test.go b/inmem/bucket_test.go deleted file mode 100644 index 00fdf02b8e..0000000000 --- a/inmem/bucket_test.go +++ /dev/null @@ -1,35 +0,0 @@ -package inmem - -import ( - "context" - "testing" - - platform "github.com/influxdata/influxdb" - platformtesting "github.com/influxdata/influxdb/testing" -) - -func initBucketService(f platformtesting.BucketFields, t *testing.T) (platform.BucketService, string, func()) { - s := NewService() - s.IDGenerator = f.IDGenerator - s.TimeGenerator = f.TimeGenerator - if f.TimeGenerator == nil { - s.TimeGenerator = platform.RealTimeGenerator{} - } - ctx := context.Background() - for _, o := range f.Organizations { - if err := s.PutOrganization(ctx, o); err != nil { - t.Fatalf("failed to populate organizations") - } - } - for _, b := range f.Buckets { - if err := s.PutBucket(ctx, b); err != nil { - t.Fatalf("failed to populate buckets") - } - } - return s, OpPrefix, func() {} -} - -func TestBucketService(t *testing.T) { - t.Skip("bucket service no longer used. Remove all of this inmem stuff") - platformtesting.BucketService(initBucketService, t) -} diff --git a/inmem/dashboard.go b/inmem/dashboard.go deleted file mode 100644 index 3b21c93b68..0000000000 --- a/inmem/dashboard.go +++ /dev/null @@ -1,495 +0,0 @@ -package inmem - -import ( - "context" - "fmt" - - platform "github.com/influxdata/influxdb" -) - -func (s *Service) loadDashboard(ctx context.Context, id platform.ID) (*platform.Dashboard, *platform.Error) { - i, ok := s.dashboardKV.Load(id.String()) - if !ok { - return nil, &platform.Error{ - Code: platform.ENotFound, - Msg: platform.ErrDashboardNotFound, - } - } - - d, ok := i.(*platform.Dashboard) - if !ok { - return nil, &platform.Error{ - Code: platform.EInvalid, - Msg: fmt.Sprintf("type %T is not a dashboard", i), - } - } - return d, nil -} - -// FindDashboardByID returns a single dashboard by ID. -func (s *Service) FindDashboardByID(ctx context.Context, id platform.ID) (*platform.Dashboard, error) { - d, pe := s.loadDashboard(ctx, id) - if pe != nil { - return nil, &platform.Error{ - Op: OpPrefix + platform.OpFindDashboardByID, - Err: pe, - } - } - return d, nil -} - -func filterDashboardFn(filter platform.DashboardFilter) func(d *platform.Dashboard) bool { - if filter.OrganizationID != nil { - return func(d *platform.Dashboard) bool { - return d.OrganizationID == *filter.OrganizationID - } - } - - if len(filter.IDs) > 0 { - m := map[platform.ID]struct{}{} - for _, id := range filter.IDs { - if id != nil { - m[*id] = struct{}{} - } - } - return func(d *platform.Dashboard) bool { - _, ok := m[d.ID] - return ok - } - } - - return func(d *platform.Dashboard) bool { return true } -} - -func (s *Service) forEachDashboard(ctx context.Context, opts platform.FindOptions, fn func(d *platform.Dashboard) bool) error { - var err error - ds := make([]*platform.Dashboard, 0) - s.dashboardKV.Range(func(k, v interface{}) bool { - d, ok := v.(*platform.Dashboard) - if !ok { - err = fmt.Errorf("type %T is not a dashboard", v) - return false - } - ds = append(ds, d) - - return true - }) - - platform.SortDashboards(opts, ds) - - for _, d := range ds { - if !fn(d) { - return nil - } - } - - return err -} - -// FindDashboards implements platform.DashboardService interface. -func (s *Service) FindDashboards(ctx context.Context, filter platform.DashboardFilter, opts platform.FindOptions) ([]*platform.Dashboard, int, error) { - var ds []*platform.Dashboard - op := OpPrefix + platform.OpFindDashboards - if len(filter.IDs) == 1 { - d, err := s.FindDashboardByID(ctx, *filter.IDs[0]) - if err != nil && platform.ErrorCode(err) != platform.ENotFound { - return ds, 0, &platform.Error{ - Err: err, - Op: op, - } - } - if d == nil { - return ds, 0, nil - } - return []*platform.Dashboard{d}, 1, nil - } - - var count int - filterFn := filterDashboardFn(filter) - err := s.forEachDashboard(ctx, opts, func(d *platform.Dashboard) bool { - if filterFn(d) { - if count >= opts.Offset { - ds = append(ds, d) - } - count++ - } - if opts.Limit > 0 && len(ds) >= opts.Limit { - return false - } - return true - }) - - if err != nil { - return nil, 0, err - } - - return ds, len(ds), err -} - -// CreateDashboard implements platform.DashboardService interface. -func (s *Service) CreateDashboard(ctx context.Context, d *platform.Dashboard) error { - d.ID = s.IDGenerator.ID() - d.Meta.CreatedAt = s.Now() - d.Meta.UpdatedAt = s.Now() - err := s.PutDashboardWithMeta(ctx, d) - if err != nil { - return &platform.Error{ - Err: err, - Op: platform.OpCreateDashboard, - } - } - return nil -} - -// PutDashboard implements platform.DashboardService interface. -func (s *Service) PutDashboard(ctx context.Context, d *platform.Dashboard) error { - for _, cell := range d.Cells { - if err := s.PutCellView(ctx, cell); err != nil { - return err - } - } - s.dashboardKV.Store(d.ID.String(), d) - return nil -} - -// PutCellView puts the view for a cell. -func (s *Service) PutCellView(ctx context.Context, cell *platform.Cell) error { - v := &platform.View{} - v.ID = cell.ID - return s.PutView(ctx, v) -} - -// PutDashboardWithMeta sets a dashboard while updating the meta field of a dashboard. -func (s *Service) PutDashboardWithMeta(ctx context.Context, d *platform.Dashboard) error { - d.Meta.UpdatedAt = s.Now() - return s.PutDashboard(ctx, d) -} - -// UpdateDashboard implements platform.DashboardService interface. -func (s *Service) UpdateDashboard(ctx context.Context, id platform.ID, upd platform.DashboardUpdate) (*platform.Dashboard, error) { - op := OpPrefix + platform.OpUpdateDashboard - if err := upd.Valid(); err != nil { - return nil, &platform.Error{ - Err: err, - Op: op, - } - } - - d, err := s.FindDashboardByID(ctx, id) - if err != nil { - return nil, &platform.Error{ - Err: err, - Op: op, - } - } - - if err := upd.Apply(d); err != nil { - return nil, &platform.Error{ - Err: err, - Op: op, - } - } - - if err := s.PutDashboardWithMeta(ctx, d); err != nil { - return nil, &platform.Error{ - Err: err, - Op: op, - } - } - - return d, nil -} - -// DeleteDashboard implements platform.DashboardService interface. -func (s *Service) DeleteDashboard(ctx context.Context, id platform.ID) error { - op := OpPrefix + platform.OpDeleteDashboard - if _, err := s.FindDashboardByID(ctx, id); err != nil { - return &platform.Error{ - Err: err, - Op: op, - } - } - s.dashboardKV.Delete(id.String()) - // err := s.deleteLabel(ctx, platform.LabelFilter{ResourceID: id}) - // if err != nil { - // return &platform.Error{ - // Err: err, - // Op: op, - // } - // } - return nil -} - -// AddDashboardCell adds a new cell to the dashboard. -func (s *Service) AddDashboardCell(ctx context.Context, id platform.ID, cell *platform.Cell, opts platform.AddDashboardCellOptions) error { - op := OpPrefix + platform.OpAddDashboardCell - d, err := s.FindDashboardByID(ctx, id) - if err != nil { - return &platform.Error{ - Err: err, - Op: op, - } - } - cell.ID = s.IDGenerator.ID() - if err := s.createCellView(ctx, cell); err != nil { - return &platform.Error{ - Err: err, - Op: op, - } - } - - d.Cells = append(d.Cells, cell) - if err = s.PutDashboardWithMeta(ctx, d); err != nil { - return &platform.Error{ - Err: err, - Op: op, - } - } - return nil -} - -func (s *Service) createCellView(ctx context.Context, cell *platform.Cell) *platform.Error { - // If not view exists create the view - view := &platform.View{} - view.ID = cell.ID - if err := s.PutView(ctx, view); err != nil { - return &platform.Error{ - Err: err, - } - } - - return nil -} - -// PutDashboardCell replaces a dashboard cell with the cell contents. -func (s *Service) PutDashboardCell(ctx context.Context, id platform.ID, cell *platform.Cell) error { - d, err := s.FindDashboardByID(ctx, id) - if err != nil { - return err - } - view := &platform.View{} - view.ID = cell.ID - if err := s.PutView(ctx, view); err != nil { - return err - } - - d.Cells = append(d.Cells, cell) - return s.PutDashboard(ctx, d) -} - -// RemoveDashboardCell removes a dashboard cell from the dashboard. -func (s *Service) RemoveDashboardCell(ctx context.Context, dashboardID platform.ID, cellID platform.ID) error { - op := OpPrefix + platform.OpRemoveDashboardCell - d, err := s.FindDashboardByID(ctx, dashboardID) - if err != nil { - return &platform.Error{ - Err: err, - Op: op, - } - } - - idx := -1 - for i, cell := range d.Cells { - if cell.ID == cellID { - idx = i - break - } - } - if idx == -1 { - return &platform.Error{ - Code: platform.ENotFound, - Op: op, - Msg: platform.ErrCellNotFound, - } - } - - if err := s.DeleteView(ctx, d.Cells[idx].ID); err != nil { - return &platform.Error{ - Err: err, - Op: op, - } - } - - d.Cells = append(d.Cells[:idx], d.Cells[idx+1:]...) - return s.PutDashboardWithMeta(ctx, d) - -} - -// UpdateDashboardCell will remove a cell from a dashboard. -func (s *Service) UpdateDashboardCell(ctx context.Context, dashboardID platform.ID, cellID platform.ID, upd platform.CellUpdate) (*platform.Cell, error) { - op := OpPrefix + platform.OpUpdateDashboardCell - if err := upd.Valid(); err != nil { - return nil, &platform.Error{ - Err: err, - Op: op, - } - } - d, err := s.FindDashboardByID(ctx, dashboardID) - if err != nil { - return nil, &platform.Error{ - Err: err, - Op: op, - } - } - - idx := -1 - for i, cell := range d.Cells { - if cell.ID == cellID { - idx = i - break - } - } - if idx == -1 { - return nil, &platform.Error{ - Msg: platform.ErrCellNotFound, - Op: op, - Code: platform.ENotFound, - } - } - - if err := upd.Apply(d.Cells[idx]); err != nil { - return nil, &platform.Error{ - Err: err, - Op: op, - } - } - - cell := d.Cells[idx] - - if err := s.PutDashboardWithMeta(ctx, d); err != nil { - return nil, &platform.Error{ - Err: err, - Op: op, - } - } - - return cell, nil -} - -// ReplaceDashboardCells replaces many dashboard cells. -func (s *Service) ReplaceDashboardCells(ctx context.Context, id platform.ID, cs []*platform.Cell) error { - op := OpPrefix + platform.OpReplaceDashboardCells - d, err := s.FindDashboardByID(ctx, id) - if err != nil { - return &platform.Error{ - Err: err, - Op: op, - } - } - - ids := map[string]*platform.Cell{} - for _, cell := range d.Cells { - ids[cell.ID.String()] = cell - } - - for _, cell := range cs { - if !cell.ID.Valid() { - return &platform.Error{ - Code: platform.EInvalid, - Op: op, - Msg: "cannot provide empty cell id", - } - } - - if _, ok := ids[cell.ID.String()]; !ok { - return &platform.Error{ - Code: platform.EConflict, - Op: op, - Msg: "cannot replace cells that were not already present", - } - } - } - - d.Cells = cs - - return s.PutDashboardWithMeta(ctx, d) -} - -// GetDashboardCellView retrieves the view for a dashboard cell. -func (s *Service) GetDashboardCellView(ctx context.Context, dashboardID, cellID platform.ID) (*platform.View, error) { - v, err := s.FindViewByID(ctx, cellID) - if err != nil { - return nil, &platform.Error{ - Err: err, - Op: OpPrefix + platform.OpGetDashboardCellView, - } - } - - return v, nil -} - -// UpdateDashboardCellView updates the view for a dashboard cell. -func (s *Service) UpdateDashboardCellView(ctx context.Context, dashboardID, cellID platform.ID, upd platform.ViewUpdate) (*platform.View, error) { - op := OpPrefix + platform.OpUpdateDashboardCellView - - v, err := s.FindViewByID(ctx, cellID) - if err != nil { - return nil, &platform.Error{ - Err: err, - Op: op, - } - } - - if err := upd.Apply(v); err != nil { - return nil, &platform.Error{ - Err: err, - Op: op, - } - } - - if err := s.PutView(ctx, v); err != nil { - return nil, &platform.Error{ - Err: err, - Op: op, - } - } - - return v, nil -} - -func (s *Service) loadView(ctx context.Context, id platform.ID) (*platform.View, *platform.Error) { - i, ok := s.viewKV.Load(id.String()) - if !ok { - return nil, &platform.Error{ - Code: platform.ENotFound, - Msg: "view not found", - } - } - - d, ok := i.(*platform.View) - if !ok { - return nil, &platform.Error{ - Code: platform.EInvalid, - Msg: fmt.Sprintf("type %T is not a view", i), - } - } - return d, nil -} - -// FindViewByID returns a single view by ID. -func (s *Service) FindViewByID(ctx context.Context, id platform.ID) (*platform.View, error) { - v, pe := s.loadView(ctx, id) - if pe != nil { - return nil, pe - } - return v, nil -} - -// PutView sets view with the current ID. -func (s *Service) PutView(ctx context.Context, c *platform.View) error { - if c.Properties == nil { - c.Properties = platform.EmptyViewProperties{} - } - s.viewKV.Store(c.ID.String(), c) - return nil -} - -// DeleteView removes a view by ID. -func (s *Service) DeleteView(ctx context.Context, id platform.ID) error { - if _, err := s.FindViewByID(ctx, id); err != nil { - return err - } - - s.viewKV.Delete(id.String()) - return nil -} diff --git a/inmem/dashboard_test.go b/inmem/dashboard_test.go deleted file mode 100644 index f72f9a61a2..0000000000 --- a/inmem/dashboard_test.go +++ /dev/null @@ -1,26 +0,0 @@ -package inmem - -import ( - "context" - "testing" - - platform "github.com/influxdata/influxdb" - platformtesting "github.com/influxdata/influxdb/testing" -) - -func initDashboardService(f platformtesting.DashboardFields, t *testing.T) (platform.DashboardService, string, func()) { - s := NewService() - s.IDGenerator = f.IDGenerator - s.TimeGenerator = f.TimeGenerator - ctx := context.Background() - for _, b := range f.Dashboards { - if err := s.PutDashboard(ctx, b); err != nil { - t.Fatalf("failed to populate Dashboards") - } - } - return s, OpPrefix, func() {} -} - -func TestDashboardService(t *testing.T) { - platformtesting.DashboardService(initDashboardService, t) -} diff --git a/inmem/label_service.go b/inmem/label_service.go deleted file mode 100644 index f8eb83bb4d..0000000000 --- a/inmem/label_service.go +++ /dev/null @@ -1,222 +0,0 @@ -package inmem - -import ( - "context" - "fmt" - "path" - - "github.com/influxdata/influxdb" -) - -func (s *Service) loadLabel(ctx context.Context, id influxdb.ID) (*influxdb.Label, error) { - i, ok := s.labelKV.Load(id.String()) - if !ok { - return nil, &influxdb.Error{ - Code: influxdb.ENotFound, - Msg: influxdb.ErrLabelNotFound, - } - } - - l, ok := i.(influxdb.Label) - if !ok { - return nil, fmt.Errorf("type %T is not a label", i) - } - - return &l, nil -} - -func (s *Service) forEachLabel(ctx context.Context, fn func(m *influxdb.Label) bool) error { - var err error - s.labelKV.Range(func(k, v interface{}) bool { - l, ok := v.(influxdb.Label) - if !ok { - err = fmt.Errorf("type %T is not a label", v) - return false - } - return fn(&l) - }) - - return err -} - -func (s *Service) forEachLabelMapping(ctx context.Context, fn func(m *influxdb.LabelMapping) bool) error { - var err error - s.labelMappingKV.Range(func(k, v interface{}) bool { - m, ok := v.(influxdb.LabelMapping) - if !ok { - err = fmt.Errorf("type %T is not a label mapping", v) - return false - } - return fn(&m) - }) - - return err -} - -func (s *Service) filterLabels(ctx context.Context, fn func(m *influxdb.Label) bool) ([]*influxdb.Label, error) { - labels := []*influxdb.Label{} - err := s.forEachLabel(ctx, func(l *influxdb.Label) bool { - if fn(l) { - labels = append(labels, l) - } - return true - }) - - if err != nil { - return nil, err - } - - return labels, nil -} - -func (s *Service) filterLabelMappings(ctx context.Context, fn func(m *influxdb.LabelMapping) bool) ([]*influxdb.LabelMapping, error) { - mappings := []*influxdb.LabelMapping{} - err := s.forEachLabelMapping(ctx, func(m *influxdb.LabelMapping) bool { - if fn(m) { - mappings = append(mappings, m) - } - return true - }) - - if err != nil { - return nil, err - } - - return mappings, nil -} - -func encodeLabelMappingKey(m *influxdb.LabelMapping) string { - return path.Join(m.ResourceID.String(), m.LabelID.String()) -} - -// FindLabelByID returns a single user by ID. -func (s *Service) FindLabelByID(ctx context.Context, id influxdb.ID) (*influxdb.Label, error) { - return s.loadLabel(ctx, id) -} - -// FindLabels will retrieve a list of labels from storage. -func (s *Service) FindLabels(ctx context.Context, filter influxdb.LabelFilter, opt ...influxdb.FindOptions) ([]*influxdb.Label, error) { - filterFunc := func(label *influxdb.Label) bool { - return (filter.Name == "" || (filter.Name == label.Name)) - } - - labels, err := s.filterLabels(ctx, filterFunc) - if err != nil { - return nil, err - } - - return labels, nil -} - -// FindResourceLabels returns a list of labels that are mapped to a resource. -func (s *Service) FindResourceLabels(ctx context.Context, filter influxdb.LabelMappingFilter) ([]*influxdb.Label, error) { - filterFunc := func(mapping *influxdb.LabelMapping) bool { - return (filter.ResourceID.String() == mapping.ResourceID.String()) - } - - mappings, err := s.filterLabelMappings(ctx, filterFunc) - if err != nil { - return nil, err - } - - ls := []*influxdb.Label{} - for _, m := range mappings { - l, err := s.FindLabelByID(ctx, m.LabelID) - if err != nil { - return nil, err - } - - ls = append(ls, l) - } - - return ls, nil -} - -// CreateLabel creates a new label. -func (s *Service) CreateLabel(ctx context.Context, l *influxdb.Label) error { - l.ID = s.IDGenerator.ID() - s.labelKV.Store(l.ID, *l) - return nil -} - -// CreateLabelMapping creates a mapping that associates a label to a resource. -func (s *Service) CreateLabelMapping(ctx context.Context, m *influxdb.LabelMapping) error { - _, err := s.FindLabelByID(ctx, m.LabelID) - if err != nil { - return &influxdb.Error{ - Err: err, - Op: influxdb.OpCreateLabel, - } - } - - s.labelMappingKV.Store(encodeLabelMappingKey(m), *m) - return nil -} - -// UpdateLabel updates a label. -func (s *Service) UpdateLabel(ctx context.Context, id influxdb.ID, upd influxdb.LabelUpdate) (*influxdb.Label, error) { - label, err := s.FindLabelByID(ctx, id) - if err != nil { - return nil, &influxdb.Error{ - Code: influxdb.ENotFound, - Op: OpPrefix + influxdb.OpUpdateLabel, - Msg: influxdb.ErrLabelNotFound, - } - } - - if len(upd.Properties) > 0 && label.Properties == nil { - label.Properties = make(map[string]string) - } - - for k, v := range upd.Properties { - if v == "" { - delete(label.Properties, k) - } else { - label.Properties[k] = v - } - } - - if upd.Name != "" { - label.Name = upd.Name - } - - if err := label.Validate(); err != nil { - return nil, &influxdb.Error{ - Code: influxdb.EInvalid, - Op: OpPrefix + influxdb.OpUpdateLabel, - Err: err, - } - } - - s.labelKV.Store(label.ID.String(), *label) - - return label, nil -} - -// PutLabel writes a label directly to the database without generating IDs -// or making checks. -func (s *Service) PutLabel(ctx context.Context, l *influxdb.Label) error { - s.labelKV.Store(l.ID.String(), *l) - return nil -} - -// DeleteLabel deletes a label. -func (s *Service) DeleteLabel(ctx context.Context, id influxdb.ID) error { - label, err := s.FindLabelByID(ctx, id) - if label == nil && err != nil { - return &influxdb.Error{ - Code: influxdb.ENotFound, - Op: OpPrefix + influxdb.OpDeleteLabel, - Msg: influxdb.ErrLabelNotFound, - } - } - - s.labelKV.Delete(id.String()) - return nil -} - -// DeleteLabelMapping deletes a label mapping. -func (s *Service) DeleteLabelMapping(ctx context.Context, m *influxdb.LabelMapping) error { - s.labelMappingKV.Delete(encodeLabelMappingKey(m)) - return nil -} diff --git a/inmem/lookup_service.go b/inmem/lookup_service.go deleted file mode 100644 index 1b0b8e1e68..0000000000 --- a/inmem/lookup_service.go +++ /dev/null @@ -1,58 +0,0 @@ -package inmem - -import ( - "context" - - platform "github.com/influxdata/influxdb" -) - -var _ platform.LookupService = (*Service)(nil) - -// Name returns the name for the resource and ID. -func (s *Service) Name(ctx context.Context, resource platform.ResourceType, id platform.ID) (string, error) { - if err := resource.Valid(); err != nil { - return "", err - } - - if ok := id.Valid(); !ok { - return "", platform.ErrInvalidID - } - - switch resource { - case platform.TasksResourceType: // 5 // TODO(goller): unify task bolt storage here so we can lookup names - case platform.AuthorizationsResourceType: // 0 TODO(goller): authorizations should also have optional names - case platform.SourcesResourceType: // 4 TODO(goller): no inmen version of sources service: https://github.com/influxdata/platform/issues/2145 - case platform.BucketsResourceType: // 1 - r, err := s.FindBucketByID(ctx, id) - if err != nil { - return "", err - } - return r.Name, nil - case platform.DashboardsResourceType: // 2 - r, err := s.FindDashboardByID(ctx, id) - if err != nil { - return "", err - } - return r.Name, nil - case platform.OrgsResourceType: // 3 - r, err := s.FindOrganizationByID(ctx, id) - if err != nil { - return "", err - } - return r.Name, nil - case platform.TelegrafsResourceType: // 6 - r, err := s.FindTelegrafConfigByID(ctx, id) - if err != nil { - return "", err - } - return r.Name, nil - case platform.UsersResourceType: // 7 - r, err := s.FindUserByID(ctx, id) - if err != nil { - return "", err - } - return r.Name, nil - } - - return "", nil -} diff --git a/inmem/lookup_service_test.go b/inmem/lookup_service_test.go deleted file mode 100644 index a4e342d66d..0000000000 --- a/inmem/lookup_service_test.go +++ /dev/null @@ -1,223 +0,0 @@ -package inmem - -import ( - "context" - "testing" - - platform "github.com/influxdata/influxdb" - "github.com/influxdata/influxdb/mock" - platformtesting "github.com/influxdata/influxdb/testing" -) - -var ( - testID = platform.ID(1) - testIDStr = testID.String() -) - -func TestService_Name(t *testing.T) { - type initFn func(ctx context.Context, s *Service) error - type args struct { - resource platform.Resource - init initFn - } - tests := []struct { - name string - args args - want string - wantErr bool - }{ - { - name: "error if id is invalid", - args: args{ - resource: platform.Resource{ - Type: platform.DashboardsResourceType, - ID: platformtesting.IDPtr(platform.InvalidID()), - }, - }, - wantErr: true, - }, - { - name: "error if resource is invalid", - args: args{ - resource: platform.Resource{ - Type: platform.ResourceType("invalid"), - }, - }, - wantErr: true, - }, - { - name: "authorization resource without a name returns empty string", - args: args{ - resource: platform.Resource{ - Type: platform.AuthorizationsResourceType, - ID: platformtesting.IDPtr(testID), - }, - }, - want: "", - }, - { - name: "task resource without a name returns an empty string", - args: args{ - resource: platform.Resource{ - Type: platform.TasksResourceType, - ID: platformtesting.IDPtr(testID), - }, - }, - want: "", - }, - { - name: "bucket with existing id returns name", - args: args{ - resource: platform.Resource{ - Type: platform.BucketsResourceType, - ID: platformtesting.IDPtr(testID), - }, - init: func(ctx context.Context, s *Service) error { - _ = s.CreateOrganization(ctx, &platform.Organization{ - Name: "o1", - }) - return s.CreateBucket(ctx, &platform.Bucket{ - Name: "b1", - OrgID: testID, - }) - }, - }, - want: "b1", - }, - { - name: "bucket with non-existent id returns error", - args: args{ - resource: platform.Resource{ - Type: platform.BucketsResourceType, - ID: platformtesting.IDPtr(testID), - }, - }, - wantErr: true, - }, - { - name: "dashboard with existing id returns name", - args: args{ - resource: platform.Resource{ - Type: platform.DashboardsResourceType, - ID: platformtesting.IDPtr(testID), - }, - init: func(ctx context.Context, s *Service) error { - return s.CreateDashboard(ctx, &platform.Dashboard{ - Name: "dashboard1", - }) - }, - }, - want: "dashboard1", - }, - { - name: "dashboard with non-existent id returns error", - args: args{ - resource: platform.Resource{ - Type: platform.DashboardsResourceType, - ID: platformtesting.IDPtr(testID), - }, - }, - wantErr: true, - }, - { - name: "org with existing id returns name", - args: args{ - resource: platform.Resource{ - Type: platform.OrgsResourceType, - ID: platformtesting.IDPtr(testID), - }, - init: func(ctx context.Context, s *Service) error { - return s.CreateOrganization(ctx, &platform.Organization{ - Name: "org1", - }) - }, - }, - want: "org1", - }, - { - name: "org with non-existent id returns error", - args: args{ - resource: platform.Resource{ - Type: platform.OrgsResourceType, - ID: platformtesting.IDPtr(testID), - }, - }, - wantErr: true, - }, - { - name: "telegraf with existing id returns name", - args: args{ - resource: platform.Resource{ - Type: platform.TelegrafsResourceType, - ID: platformtesting.IDPtr(testID), - }, - init: func(ctx context.Context, s *Service) error { - return s.CreateTelegrafConfig(ctx, &platform.TelegrafConfig{ - OrgID: platformtesting.MustIDBase16("0000000000000009"), - Name: "telegraf1", - }, testID) - }, - }, - want: "telegraf1", - }, - { - name: "telegraf with non-existent id returns error", - args: args{ - resource: platform.Resource{ - Type: platform.TelegrafsResourceType, - ID: platformtesting.IDPtr(testID), - }, - }, - wantErr: true, - }, - { - name: "user with existing id returns name", - args: args{ - resource: platform.Resource{ - Type: platform.UsersResourceType, - ID: platformtesting.IDPtr(testID), - }, - init: func(ctx context.Context, s *Service) error { - return s.CreateUser(ctx, &platform.User{ - Name: "user1", - }) - }, - }, - want: "user1", - }, - { - name: "user with non-existent id returns error", - args: args{ - resource: platform.Resource{ - Type: platform.UsersResourceType, - ID: platformtesting.IDPtr(testID), - }, - }, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := NewService() - s.IDGenerator = mock.NewIDGenerator(testIDStr, t) - ctx := context.Background() - if tt.args.init != nil { - if err := tt.args.init(ctx, s); err != nil { - t.Errorf("Service.Name() unable to initialize service: %v", err) - } - } - id := platform.InvalidID() - if tt.args.resource.ID != nil { - id = *tt.args.resource.ID - } - got, err := s.Name(ctx, tt.args.resource.Type, id) - if (err != nil) != tt.wantErr { - t.Errorf("Service.Name() error = %v, wantErr %v", err, tt.wantErr) - return - } - if got != tt.want { - t.Errorf("Service.Name() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/inmem/onboarding.go b/inmem/onboarding.go deleted file mode 100644 index 7ce7a8b77f..0000000000 --- a/inmem/onboarding.go +++ /dev/null @@ -1,116 +0,0 @@ -package inmem - -import ( - "context" - "fmt" - "time" - - platform "github.com/influxdata/influxdb" -) - -const onboardingKey = "onboarding_key" - -var _ platform.OnboardingService = (*Service)(nil) - -// IsOnboarding checks onboardingBucket -// to see if the onboarding key is true. -func (s *Service) IsOnboarding(ctx context.Context) (isOnboarding bool, err error) { - result, ok := s.onboardingKV.Load(onboardingKey) - isOnboarding = !ok || !result.(bool) - return isOnboarding, nil -} - -// PutOnboardingStatus will put the isOnboarding to storage -func (s *Service) PutOnboardingStatus(ctx context.Context, v bool) error { - s.onboardingKV.Store(onboardingKey, v) - return nil -} - -// Generate OnboardingResults from onboarding request, -// update storage so this request will be disabled for the second run. -func (s *Service) Generate(ctx context.Context, req *platform.OnboardingRequest) (*platform.OnboardingResults, error) { - isOnboarding, err := s.IsOnboarding(ctx) - if err != nil { - return nil, err - } - if !isOnboarding { - return nil, &platform.Error{ - Code: platform.EConflict, - Msg: "onboarding has already been completed", - } - } - - if req.Password == "" { - return nil, &platform.Error{ - Code: platform.EEmptyValue, - Msg: "password is empty", - } - } - - if req.User == "" { - return nil, &platform.Error{ - Code: platform.EEmptyValue, - Msg: "username is empty", - } - } - - if req.Org == "" { - return nil, &platform.Error{ - Code: platform.EEmptyValue, - Msg: "org name is empty", - } - } - - if req.Bucket == "" { - return nil, &platform.Error{ - Code: platform.EEmptyValue, - Msg: "bucket name is empty", - } - } - - u := &platform.User{Name: req.User} - if err := s.CreateUser(ctx, u); err != nil { - return nil, err - } - - if err = s.SetPassword(ctx, u.ID, req.Password); err != nil { - return nil, err - } - - o := &platform.Organization{ - Name: req.Org, - } - if err = s.CreateOrganization(ctx, o); err != nil { - return nil, err - } - bucket := &platform.Bucket{ - Name: req.Bucket, - OrgID: o.ID, - RetentionPeriod: time.Duration(req.RetentionPeriod) * time.Hour, - } - if err = s.CreateBucket(ctx, bucket); err != nil { - return nil, err - } - - auth := &platform.Authorization{ - UserID: u.ID, - Description: fmt.Sprintf("%s's Token", u.Name), - OrgID: o.ID, - Permissions: platform.OperPermissions(), - Token: req.Token, - } - if err = s.CreateAuthorization(ctx, auth); err != nil { - return nil, err - } - - if err = s.PutOnboardingStatus(ctx, true); err != nil { - return nil, err - } - - return &platform.OnboardingResults{ - User: u, - Org: o, - Bucket: bucket, - Auth: auth, - }, nil -} diff --git a/inmem/onboarding_test.go b/inmem/onboarding_test.go deleted file mode 100644 index e4f5a94c26..0000000000 --- a/inmem/onboarding_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package inmem - -import ( - "context" - "testing" - - platform "github.com/influxdata/influxdb" - platformtesting "github.com/influxdata/influxdb/testing" -) - -func initOnboardingService(f platformtesting.OnboardingFields, t *testing.T) (platform.OnboardingService, func()) { - s := NewService() - s.IDGenerator = f.IDGenerator - s.TokenGenerator = f.TokenGenerator - s.TimeGenerator = f.TimeGenerator - if f.TimeGenerator == nil { - s.TimeGenerator = platform.RealTimeGenerator{} - } - ctx := context.TODO() - if err := s.PutOnboardingStatus(ctx, !f.IsOnboarding); err != nil { - t.Fatalf("failed to set new onboarding finished: %v", err) - } - return s, func() {} -} - -func TestGenerate(t *testing.T) { - t.Skip("these are no longer being used, using kv inmem instead") - platformtesting.Generate(initOnboardingService, t) -} diff --git a/inmem/organization_service.go b/inmem/organization_service.go deleted file mode 100644 index 23d6f0c812..0000000000 --- a/inmem/organization_service.go +++ /dev/null @@ -1,255 +0,0 @@ -package inmem - -import ( - "context" - "fmt" - "strings" - - platform "github.com/influxdata/influxdb" -) - -const ( - errOrganizationNotFound = "organization not found" -) - -func (s *Service) loadOrganization(id platform.ID) (*platform.Organization, *platform.Error) { - i, ok := s.organizationKV.Load(id.String()) - if !ok { - return nil, &platform.Error{ - Code: platform.ENotFound, - Msg: errOrganizationNotFound, - } - } - - b, ok := i.(*platform.Organization) - if !ok { - return nil, &platform.Error{ - Code: platform.EInternal, - Msg: fmt.Sprintf("type %T is not a organization", i), - } - } - return b, nil -} - -func (s *Service) forEachOrganization(ctx context.Context, fn func(b *platform.Organization) bool) error { - var err error - s.organizationKV.Range(func(k, v interface{}) bool { - o, ok := v.(*platform.Organization) - if !ok { - err = fmt.Errorf("type %T is not a organization", v) - return false - } - - return fn(o) - }) - - return err -} - -func (s *Service) filterOrganizations(ctx context.Context, fn func(b *platform.Organization) bool) ([]*platform.Organization, *platform.Error) { - orgs := []*platform.Organization{} - err := s.forEachOrganization(ctx, func(o *platform.Organization) bool { - if fn(o) { - orgs = append(orgs, o) - } - return true - }) - - if err != nil { - return nil, &platform.Error{ - Err: err, - } - } - - return orgs, nil -} - -// FindOrganizationByID returns a single organization by ID. -func (s *Service) FindOrganizationByID(ctx context.Context, id platform.ID) (*platform.Organization, error) { - o, pe := s.loadOrganization(id) - if pe != nil { - return nil, &platform.Error{ - Op: OpPrefix + platform.OpFindOrganizationByID, - Err: pe, - } - } - return o, nil -} - -// FindOrganization returns the first organization that matches a filter. -func (s *Service) FindOrganization(ctx context.Context, filter platform.OrganizationFilter) (*platform.Organization, error) { - op := OpPrefix + platform.OpFindOrganization - if filter.ID == nil && filter.Name == nil { - return nil, &platform.Error{ - Op: op, - Err: platform.ErrInvalidOrgFilter, - } - } - - if filter.ID != nil { - o, err := s.FindOrganizationByID(ctx, *filter.ID) - if err != nil { - return nil, &platform.Error{ - Op: op, - Err: err, - } - } - return o, nil - } - - orgs, n, err := s.FindOrganizations(ctx, filter) - if err != nil { - return nil, &platform.Error{ - Op: op, - Err: err, - } - } - if n < 1 { - msg := errOrganizationNotFound - if filter.Name != nil { - msg = fmt.Sprintf("organization name \"%s\" not found", *filter.Name) - } - - return nil, &platform.Error{ - Code: platform.ENotFound, - Op: op, - Msg: msg, - } - } - - return orgs[0], nil -} - -// FindOrganizations returns a list of organizations that match filter and the total count of matching organizations. -func (s *Service) FindOrganizations(ctx context.Context, filter platform.OrganizationFilter, opt ...platform.FindOptions) ([]*platform.Organization, int, error) { - op := OpPrefix + platform.OpFindOrganizations - if filter.ID != nil { - o, err := s.FindOrganizationByID(ctx, *filter.ID) - if err != nil { - return nil, 0, &platform.Error{ - Op: op, - Err: err, - } - } - - return []*platform.Organization{o}, 1, nil - } - - filterFunc := func(o *platform.Organization) bool { return true } - if filter.Name != nil { - filterFunc = func(o *platform.Organization) bool { - return o.Name == *filter.Name - } - } - - orgs, pe := s.filterOrganizations(ctx, filterFunc) - if pe != nil { - return nil, 0, &platform.Error{ - Err: pe, - Op: op, - } - } - - if len(orgs) == 0 { - msg := errOrganizationNotFound - if filter.Name != nil { - msg = fmt.Sprintf("organization name \"%s\" not found", *filter.Name) - } - - return orgs, 0, &platform.Error{ - Code: platform.ENotFound, - Op: op, - Msg: msg, - } - } - - return orgs, len(orgs), nil -} - -func (s *Service) findOrganizationByName(ctx context.Context, n string) (*platform.Organization, *platform.Error) { - o, err := s.FindOrganization(ctx, platform.OrganizationFilter{Name: &n}) - if err != nil { - return nil, &platform.Error{ - Err: err, - } - } - return o, nil -} - -// CreateOrganization creates a new organization and sets b.ID with the new identifier. -func (s *Service) CreateOrganization(ctx context.Context, o *platform.Organization) error { - op := OpPrefix + platform.OpCreateOrganization - if o.Name = strings.TrimSpace(o.Name); o.Name == "" { - return platform.ErrOrgNameisEmpty - } - if _, err := s.FindOrganization(ctx, platform.OrganizationFilter{Name: &o.Name}); err == nil { - return &platform.Error{ - Code: platform.EConflict, - Op: op, - Msg: fmt.Sprintf("organization with name %s already exists", o.Name), - } - } - o.ID = s.IDGenerator.ID() - o.CreatedAt = s.Now() - o.UpdatedAt = s.Now() - err := s.PutOrganization(ctx, o) - if err != nil { - return &platform.Error{ - Op: op, - Err: err, - } - } - return nil -} - -// PutOrganization will put a organization without setting an ID. -func (s *Service) PutOrganization(ctx context.Context, o *platform.Organization) error { - s.organizationKV.Store(o.ID.String(), o) - return nil -} - -// UpdateOrganization updates a organization according the parameters set on upd. -func (s *Service) UpdateOrganization(ctx context.Context, id platform.ID, upd platform.OrganizationUpdate) (*platform.Organization, error) { - o, err := s.FindOrganizationByID(ctx, id) - if err != nil { - return nil, &platform.Error{ - Err: err, - Op: OpPrefix + platform.OpUpdateOrganization, - } - } - - if upd.Name != nil { - if *upd.Name = strings.TrimSpace(*upd.Name); *upd.Name == "" { - return nil, platform.ErrOrgNameisEmpty - } - if _, err := s.FindOrganization(ctx, platform.OrganizationFilter{Name: upd.Name}); err == nil { - return nil, &platform.Error{ - Code: platform.EConflict, - Msg: fmt.Sprintf("organization with name %s already exists", *upd.Name), - } - } - o.Name = *upd.Name - } - - if upd.Description != nil { - o.Description = *upd.Description - } - - o.UpdatedAt = s.Now() - - s.organizationKV.Store(o.ID.String(), o) - - return o, nil -} - -// DeleteOrganization deletes a organization and prunes it from the index. -func (s *Service) DeleteOrganization(ctx context.Context, id platform.ID) error { - if _, err := s.FindOrganizationByID(ctx, id); err != nil { - return &platform.Error{ - Err: err, - Op: OpPrefix + platform.OpDeleteOrganization, - } - } - s.organizationKV.Delete(id.String()) - return nil -} diff --git a/inmem/organization_test.go b/inmem/organization_test.go deleted file mode 100644 index 17a3593bd6..0000000000 --- a/inmem/organization_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package inmem - -import ( - "context" - "testing" - - platform "github.com/influxdata/influxdb" - platformtesting "github.com/influxdata/influxdb/testing" -) - -func initOrganizationService(f platformtesting.OrganizationFields, t *testing.T) (platform.OrganizationService, string, func()) { - s := NewService() - s.IDGenerator = f.IDGenerator - s.TimeGenerator = f.TimeGenerator - if f.TimeGenerator == nil { - s.TimeGenerator = platform.RealTimeGenerator{} - } - ctx := context.TODO() - for _, o := range f.Organizations { - if err := s.PutOrganization(ctx, o); err != nil { - t.Fatalf("failed to populate organizations") - } - } - return s, OpPrefix, func() {} -} - -func TestOrganizationService(t *testing.T) { - t.Skip("organization service no longer used. Remove all of this inmem stuff") - platformtesting.OrganizationService(initOrganizationService, t) -} diff --git a/inmem/passwords.go b/inmem/passwords.go deleted file mode 100644 index 5c532a3233..0000000000 --- a/inmem/passwords.go +++ /dev/null @@ -1,84 +0,0 @@ -package inmem - -import ( - "context" - - platform "github.com/influxdata/influxdb" - "golang.org/x/crypto/bcrypt" -) - -// MinPasswordLength is the shortest password we allow into the system. -const MinPasswordLength = 8 - -var ( - // EIncorrectPassword is returned when any password operation fails in which - // we do not want to leak information. - EIncorrectPassword = &platform.Error{ - Code: platform.EForbidden, - Msg: "your username or password is incorrect", - } - - // EIncorrectUser is returned when any user is failed to be found which indicates - // the userID provided is for a user that does not exist. - EIncorrectUser = &platform.Error{ - Code: platform.EForbidden, - Msg: "your userID is incorrect", - } - - // EShortPassword is used when a password is less than the minimum - // acceptable password length. - EShortPassword = &platform.Error{ - Code: platform.EInvalid, - Msg: "passwords must be at least 8 characters long", - } -) - -var _ platform.PasswordsService = (*Service)(nil) - -// HashCost is currently using bcrypt defaultCost -const HashCost = bcrypt.DefaultCost - -// SetPassword stores the password hash associated with a user. -func (s *Service) SetPassword(ctx context.Context, userID platform.ID, password string) error { - if len(password) < MinPasswordLength { - return EShortPassword - } - - u, err := s.FindUserByID(ctx, userID) - if err != nil { - return EIncorrectUser - } - hash, err := bcrypt.GenerateFromPassword([]byte(password), HashCost) - if err != nil { - return err - } - - s.basicAuthKV.Store(u.ID.String(), hash) - - return nil -} - -// ComparePassword compares a provided password with the stored password hash. -func (s *Service) ComparePassword(ctx context.Context, userID platform.ID, password string) error { - u, err := s.FindUserByID(ctx, userID) - if err != nil { - return EIncorrectUser - } - hash, ok := s.basicAuthKV.Load(u.ID.String()) - if !ok { - hash = []byte{} - } - - if err := bcrypt.CompareHashAndPassword(hash.([]byte), []byte(password)); err != nil { - return EIncorrectPassword - } - return nil -} - -// CompareAndSetPassword replaces the old password with the new password if thee old password is correct. -func (s *Service) CompareAndSetPassword(ctx context.Context, userID platform.ID, old string, new string) error { - if err := s.ComparePassword(ctx, userID, old); err != nil { - return err - } - return s.SetPassword(ctx, userID, new) -} diff --git a/inmem/passwords_test.go b/inmem/passwords_test.go deleted file mode 100644 index 94657e4f67..0000000000 --- a/inmem/passwords_test.go +++ /dev/null @@ -1,38 +0,0 @@ -package inmem - -import ( - "context" - "testing" - - platform "github.com/influxdata/influxdb" - platformtesting "github.com/influxdata/influxdb/testing" -) - -func initPasswordsService(f platformtesting.PasswordFields, t *testing.T) (platform.PasswordsService, func()) { - s := NewService() - s.IDGenerator = f.IDGenerator - ctx := context.Background() - for _, u := range f.Users { - if err := s.PutUser(ctx, u); err != nil { - t.Fatalf("failed to populate users") - } - } - - for i := range f.Passwords { - if err := s.SetPassword(ctx, f.Users[i].ID, f.Passwords[i]); err != nil { - t.Fatalf("error setting passsword user, %s %s: %v", f.Users[i].Name, f.Passwords[i], err) - } - } - - return s, func() {} -} - -func TestPasswords(t *testing.T) { - t.Parallel() - platformtesting.PasswordsService(initPasswordsService, t) -} - -func TestPasswords_CompareAndSet(t *testing.T) { - t.Parallel() - platformtesting.CompareAndSetPassword(initPasswordsService, t) -} diff --git a/inmem/scraper.go b/inmem/scraper.go deleted file mode 100644 index e766aa5ed5..0000000000 --- a/inmem/scraper.go +++ /dev/null @@ -1,189 +0,0 @@ -package inmem - -import ( - "context" - "fmt" - - "github.com/influxdata/influxdb" -) - -const ( - errScraperTargetNotFound = "scraper target is not found" -) - -var _ influxdb.ScraperTargetStoreService = (*Service)(nil) - -func (s *Service) loadScraperTarget(id influxdb.ID) (*influxdb.ScraperTarget, *influxdb.Error) { - i, ok := s.scraperTargetKV.Load(id.String()) - if !ok { - return nil, &influxdb.Error{ - Code: influxdb.ENotFound, - Msg: errScraperTargetNotFound, - } - } - - b, ok := i.(influxdb.ScraperTarget) - if !ok { - return nil, &influxdb.Error{ - Code: influxdb.EInvalid, - Msg: fmt.Sprintf("type %T is not a scraper target", i), - } - } - return &b, nil -} - -// ListTargets will list all scrape targets. -func (s *Service) ListTargets(ctx context.Context, filter influxdb.ScraperTargetFilter) (list []influxdb.ScraperTarget, err error) { - list = make([]influxdb.ScraperTarget, 0) - s.scraperTargetKV.Range(func(_, v interface{}) bool { - target, ok := v.(influxdb.ScraperTarget) - if !ok { - err = &influxdb.Error{ - Code: influxdb.EInvalid, - Msg: fmt.Sprintf("type %T is not a scraper target", v), - } - return false - } - if filter.IDs != nil { - if _, ok := filter.IDs[target.ID]; !ok { - return true - } - } - if filter.Name != nil && target.Name != *filter.Name { - return true - } - if filter.Org != nil { - o, orgErr := s.findOrganizationByName(ctx, *filter.Org) - if orgErr != nil { - err = orgErr - return false - } - if target.OrgID != o.ID { - return true - } - } - if filter.OrgID != nil { - o, orgErr := s.FindOrganizationByID(ctx, *filter.OrgID) - if orgErr != nil { - err = orgErr - return true - } - if target.OrgID != o.ID { - return true - } - } - list = append(list, target) - return true - }) - return list, err -} - -// AddTarget add a new scraper target into storage. -func (s *Service) AddTarget(ctx context.Context, target *influxdb.ScraperTarget, userID influxdb.ID) (err error) { - target.ID = s.IDGenerator.ID() - if !target.OrgID.Valid() { - return &influxdb.Error{ - Code: influxdb.EInvalid, - Msg: "provided organization ID has invalid format", - Op: OpPrefix + influxdb.OpAddTarget, - } - } - if !target.BucketID.Valid() { - return &influxdb.Error{ - Code: influxdb.EInvalid, - Msg: "provided bucket ID has invalid format", - Op: OpPrefix + influxdb.OpAddTarget, - } - } - if err := s.PutTarget(ctx, target); err != nil { - return &influxdb.Error{ - Op: OpPrefix + influxdb.OpAddTarget, - Err: err, - } - } - urm := &influxdb.UserResourceMapping{ - ResourceID: target.ID, - UserID: userID, - UserType: influxdb.Owner, - ResourceType: influxdb.ScraperResourceType, - } - if err := s.CreateUserResourceMapping(ctx, urm); err != nil { - return err - } - return nil -} - -// RemoveTarget removes a scraper target from the bucket. -func (s *Service) RemoveTarget(ctx context.Context, id influxdb.ID) error { - if _, pe := s.loadScraperTarget(id); pe != nil { - return &influxdb.Error{ - Err: pe, - Op: OpPrefix + influxdb.OpRemoveTarget, - } - } - s.scraperTargetKV.Delete(id.String()) - err := s.deleteUserResourceMapping(ctx, influxdb.UserResourceMappingFilter{ - ResourceID: id, - ResourceType: influxdb.ScraperResourceType, - }) - if err != nil { - return &influxdb.Error{ - Code: influxdb.ErrorCode(err), - Op: OpPrefix + influxdb.OpRemoveTarget, - Err: err, - } - } - - return nil -} - -// UpdateTarget updates a scraper target. -func (s *Service) UpdateTarget(ctx context.Context, update *influxdb.ScraperTarget, userID influxdb.ID) (target *influxdb.ScraperTarget, err error) { - op := OpPrefix + influxdb.OpUpdateTarget - if !update.ID.Valid() { - return nil, &influxdb.Error{ - Code: influxdb.EInvalid, - Op: op, - Msg: "provided scraper target ID has invalid format", - } - } - oldTarget, pe := s.loadScraperTarget(update.ID) - if pe != nil { - return nil, &influxdb.Error{ - Op: op, - Err: pe, - } - } - if !update.OrgID.Valid() { - update.OrgID = oldTarget.OrgID - } - if !update.BucketID.Valid() { - update.BucketID = oldTarget.BucketID - } - if err = s.PutTarget(ctx, update); err != nil { - return nil, &influxdb.Error{ - Op: op, - Err: pe, - } - } - - return update, nil -} - -// GetTargetByID retrieves a scraper target by id. -func (s *Service) GetTargetByID(ctx context.Context, id influxdb.ID) (target *influxdb.ScraperTarget, err error) { - var pe *influxdb.Error - if target, pe = s.loadScraperTarget(id); pe != nil { - return nil, &influxdb.Error{ - Op: OpPrefix + influxdb.OpGetTargetByID, - Err: pe, - } - } - return target, nil -} - -// PutTarget will put a scraper target without setting an ID. -func (s *Service) PutTarget(ctx context.Context, target *influxdb.ScraperTarget) error { - s.scraperTargetKV.Store(target.ID.String(), *target) - return nil -} diff --git a/inmem/scraper_test.go b/inmem/scraper_test.go deleted file mode 100644 index 37f11f6f00..0000000000 --- a/inmem/scraper_test.go +++ /dev/null @@ -1,35 +0,0 @@ -package inmem - -import ( - "context" - "testing" - - "github.com/influxdata/influxdb" - platformtesting "github.com/influxdata/influxdb/testing" -) - -func initScraperTargetStoreService(f platformtesting.TargetFields, t *testing.T) (influxdb.ScraperTargetStoreService, string, func()) { - s := NewService() - s.IDGenerator = f.IDGenerator - ctx := context.Background() - for _, target := range f.Targets { - if err := s.PutTarget(ctx, target); err != nil { - t.Fatalf("failed to populate scraper targets") - } - } - for _, m := range f.UserResourceMappings { - if err := s.PutUserResourceMapping(ctx, m); err != nil { - t.Fatalf("failed to populate user resource mapping") - } - } - for _, o := range f.Organizations { - if err := s.PutOrganization(ctx, o); err != nil { - t.Fatalf("failed to populate orgs") - } - } - return s, OpPrefix, func() {} -} - -func TestScraperTargetStoreService(t *testing.T) { - platformtesting.ScraperService(initScraperTargetStoreService, t) -} diff --git a/inmem/session.go b/inmem/session.go deleted file mode 100644 index 241693122b..0000000000 --- a/inmem/session.go +++ /dev/null @@ -1,99 +0,0 @@ -package inmem - -import ( - "context" - "time" - - platform "github.com/influxdata/influxdb" -) - -// RenewSession extends the expire time to newExpiration. -func (s *Service) RenewSession(ctx context.Context, session *platform.Session, newExpiration time.Time) error { - if session == nil { - return &platform.Error{ - Msg: "session is nil", - } - } - session.ExpiresAt = newExpiration - return s.PutSession(ctx, session) -} - -// FindSession retrieves the session found at the provided key. -func (s *Service) FindSession(ctx context.Context, key string) (*platform.Session, error) { - result, found := s.sessionKV.Load(key) - if !found { - return nil, &platform.Error{ - Code: platform.ENotFound, - Msg: platform.ErrSessionNotFound, - } - } - - sess := new(platform.Session) - *sess = result.(platform.Session) - - // TODO(desa): these values should be cached so it's not so expensive to lookup each time. - f := platform.UserResourceMappingFilter{UserID: sess.UserID} - mappings, _, err := s.FindUserResourceMappings(ctx, f) - if err != nil { - return nil, &platform.Error{ - Err: err, - } - } - - ps := make([]platform.Permission, 0, len(mappings)) - for _, m := range mappings { - p, err := m.ToPermissions() - if err != nil { - return nil, &platform.Error{ - Err: err, - } - } - - ps = append(ps, p...) - } - ps = append(ps, platform.MePermissions(sess.UserID)...) - sess.Permissions = ps - return sess, nil -} - -func (s *Service) PutSession(ctx context.Context, sess *platform.Session) error { - s.sessionKV.Store(sess.Key, *sess) - return nil -} - -// ExpireSession expires the session at the provided key. -func (s *Service) ExpireSession(ctx context.Context, key string) error { - return nil -} - -// CreateSession creates a session for a user with the users maximal privileges. -func (s *Service) CreateSession(ctx context.Context, user string) (*platform.Session, error) { - u, pe := s.findUserByName(ctx, user) - if pe != nil { - return nil, &platform.Error{ - Err: pe, - } - } - - sess := &platform.Session{} - sess.ID = s.IDGenerator.ID() - k, err := s.TokenGenerator.Token() - if err != nil { - return nil, &platform.Error{ - Err: err, - } - } - sess.Key = k - sess.UserID = u.ID - sess.CreatedAt = time.Now() - // TODO(desa): make this configurable - sess.ExpiresAt = sess.CreatedAt.Add(time.Hour) - // TODO(desa): not totally sure what to do here. Possibly we should have a maximal privilege permission. - sess.Permissions = []platform.Permission{} - - if err := s.PutSession(ctx, sess); err != nil { - return nil, err - } - - return sess, nil -} diff --git a/inmem/source.go b/inmem/source.go index e2dde0b364..ac41d2cd72 100644 --- a/inmem/source.go +++ b/inmem/source.go @@ -102,46 +102,8 @@ func (s *Service) FindSources(ctx context.Context, opt platform.FindOptions) ([] return ds, len(ds), nil } -// CreateSource creates a platform source and sets s.ID. -func (s *Service) CreateSource(ctx context.Context, src *platform.Source) error { - src.ID = s.IDGenerator.ID() - if err := s.PutSource(ctx, src); err != nil { - return &platform.Error{ - Err: err, - } - } - return nil -} - // PutSource will put a source without setting an ID. func (s *Service) PutSource(ctx context.Context, src *platform.Source) error { s.sourceKV.Store(src.ID.String(), src) return nil } - -// UpdateSource updates a source according the parameters set on upd. -func (s *Service) UpdateSource(ctx context.Context, id platform.ID, upd platform.SourceUpdate) (*platform.Source, error) { - src, err := s.FindSourceByID(ctx, id) - if err != nil { - return nil, &platform.Error{ - Err: err, - Op: OpPrefix + platform.OpUpdateSource, - } - } - - upd.Apply(src) - s.sourceKV.Store(src.ID.String(), src) - return src, nil -} - -// DeleteSource deletes a source and prunes it from the index. -func (s *Service) DeleteSource(ctx context.Context, id platform.ID) error { - if _, err := s.FindSourceByID(ctx, id); err != nil { - return &platform.Error{ - Err: err, - Op: OpPrefix + platform.OpDeleteSource, - } - } - s.sourceKV.Delete(id.String()) - return nil -} diff --git a/inmem/task_test.go b/inmem/task_test.go deleted file mode 100644 index ff324e3798..0000000000 --- a/inmem/task_test.go +++ /dev/null @@ -1,108 +0,0 @@ -package inmem_test - -import ( - "context" - "encoding/json" - "testing" - - "github.com/influxdata/influxdb" - "github.com/influxdata/influxdb/inmem" - "github.com/influxdata/influxdb/kv" - "github.com/influxdata/influxdb/snowflake" - "go.uber.org/zap/zaptest" -) - -var ( - taskBucket = []byte("tasksv1") - organizationBucket = []byte("organizationsv1") - authBucket = []byte("authorizationsv1") - idgen influxdb.IDGenerator = snowflake.NewIDGenerator() -) - -func BenchmarkFindTaskByID_CursorHints(b *testing.B) { - kvs := inmem.NewKVStore() - ctx := context.Background() - _ = kvs.Update(ctx, func(tx kv.Tx) error { - createData(b, tx) - createTasks(b, tx) - - return nil - }) - - s := kv.NewService(zaptest.NewLogger(b), kvs) - _ = s.Initialize(ctx) - - b.ResetTimer() - b.ReportAllocs() - for i := 0; i < b.N; i++ { - _, _ = s.FindTaskByID(ctx, 1) - } -} - -func createData(tb testing.TB, tx kv.Tx) { - tb.Helper() - - authB, err := tx.Bucket(authBucket) - if err != nil { - tb.Fatal("authBucket:", err) - } - orgB, err := tx.Bucket(organizationBucket) - if err != nil { - tb.Fatal("organizationBucket:", err) - } - - a := influxdb.Authorization{ - Permissions: influxdb.OperPermissions(), - } - o := influxdb.Organization{} - - var orgID = influxdb.ID(1e4) - var userID = influxdb.ID(1e7) - for i := 1; i <= 1000; i++ { - o.ID = orgID - val := mustMarshal(tb, &o) - key, _ := a.OrgID.Encode() - _ = orgB.Put(key, val) - - a.OrgID = o.ID - orgID++ - - for j := 1; j <= 5; j++ { - a.ID = idgen.ID() - a.UserID = userID - userID++ - - val = mustMarshal(tb, &a) - key, _ = a.ID.Encode() - _ = authB.Put(key, val) - } - } -} - -func createTasks(tb testing.TB, tx kv.Tx) { - tb.Helper() - - taskB, err := tx.Bucket(taskBucket) - if err != nil { - tb.Fatal("taskBucket:", err) - } - - t := influxdb.Task{ - ID: 1, - OrganizationID: 1e4, - OwnerID: 1e7, - } - - val := mustMarshal(tb, &t) - key, _ := t.ID.Encode() - _ = taskB.Put(key, val) -} - -func mustMarshal(t testing.TB, v interface{}) []byte { - t.Helper() - d, err := json.Marshal(v) - if err != nil { - t.Fatal(err) - } - return d -} diff --git a/inmem/telegraf.go b/inmem/telegraf.go deleted file mode 100644 index 6b4ac4cd55..0000000000 --- a/inmem/telegraf.go +++ /dev/null @@ -1,176 +0,0 @@ -package inmem - -import ( - "context" - - platform "github.com/influxdata/influxdb" -) - -var _ platform.TelegrafConfigStore = new(Service) - -// FindTelegrafConfigByID returns a single telegraf config by ID. -func (s *Service) FindTelegrafConfigByID(ctx context.Context, id platform.ID) (tc *platform.TelegrafConfig, err error) { - var pErr *platform.Error - tc, pErr = s.findTelegrafConfigByID(ctx, id) - if pErr != nil { - err = pErr - } - return tc, err -} - -func (s *Service) findTelegrafConfigByID(ctx context.Context, id platform.ID) (*platform.TelegrafConfig, *platform.Error) { - if !id.Valid() { - return nil, &platform.Error{ - Code: platform.EInvalid, - Msg: "provided telegraf configuration ID has invalid format", - } - } - result, found := s.telegrafConfigKV.Load(id) - if !found { - return nil, &platform.Error{ - Code: platform.ENotFound, - Msg: platform.ErrTelegrafConfigNotFound, - } - } - tc := new(platform.TelegrafConfig) - *tc = result.(platform.TelegrafConfig) - return tc, nil -} - -func (s *Service) findTelegrafConfigs(ctx context.Context, filter platform.TelegrafConfigFilter, opt ...platform.FindOptions) ([]*platform.TelegrafConfig, int, *platform.Error) { - tcs := make([]*platform.TelegrafConfig, 0) - m, _, err := s.FindUserResourceMappings(ctx, filter.UserResourceMappingFilter) - if err != nil { - return nil, 0, &platform.Error{ - Err: err, - } - } - if len(m) == 0 { - return tcs, 0, nil - } - for _, item := range m { - tc, err := s.findTelegrafConfigByID(ctx, item.ResourceID) - if err != nil && platform.ErrorCode(err) != platform.ENotFound { - return nil, 0, &platform.Error{ - // return internal error, for any mapping issue - Err: err, - } - } - if tc != nil { - // Restrict results by organization ID, if it has been provided - if filter.OrgID != nil && filter.OrgID.Valid() && tc.OrgID != *filter.OrgID { - continue - } - tcs = append(tcs, tc) - } - } - - return tcs, len(tcs), nil -} - -// FindTelegrafConfigs returns a list of telegraf configs that match filter and the total count of matching telegraf configs. -// Additional options provide pagination & sorting. -func (s *Service) FindTelegrafConfigs(ctx context.Context, filter platform.TelegrafConfigFilter, opt ...platform.FindOptions) (tcs []*platform.TelegrafConfig, n int, err error) { - op := OpPrefix + platform.OpFindTelegrafConfigs - var pErr *platform.Error - tcs, n, pErr = s.findTelegrafConfigs(ctx, filter) - if pErr != nil { - pErr.Op = op - err = pErr - } - return tcs, n, err -} - -func (s *Service) putTelegrafConfig(ctx context.Context, tc *platform.TelegrafConfig) *platform.Error { - if !tc.ID.Valid() { - return &platform.Error{ - Code: platform.EEmptyValue, - Err: platform.ErrInvalidID, - } - } - if !tc.OrgID.Valid() { - return &platform.Error{ - Code: platform.EEmptyValue, - Msg: platform.ErrTelegrafConfigInvalidOrgID, - } - } - s.telegrafConfigKV.Store(tc.ID, *tc) - return nil -} - -// CreateTelegrafConfig creates a new telegraf config and sets b.ID with the new identifier. -func (s *Service) CreateTelegrafConfig(ctx context.Context, tc *platform.TelegrafConfig, userID platform.ID) error { - op := OpPrefix + platform.OpCreateTelegrafConfig - tc.ID = s.IDGenerator.ID() - - pErr := s.putTelegrafConfig(ctx, tc) - if pErr != nil { - pErr.Op = op - return pErr - } - - urm := &platform.UserResourceMapping{ - ResourceID: tc.ID, - UserID: userID, - UserType: platform.Owner, - ResourceType: platform.TelegrafsResourceType, - } - if err := s.CreateUserResourceMapping(ctx, urm); err != nil { - return err - } - - return nil -} - -// UpdateTelegrafConfig updates a single telegraf config. -// Returns the new telegraf config after update. -func (s *Service) UpdateTelegrafConfig(ctx context.Context, id platform.ID, tc *platform.TelegrafConfig, userID platform.ID) (*platform.TelegrafConfig, error) { - var err error - op := OpPrefix + platform.OpUpdateTelegrafConfig - current, pErr := s.findTelegrafConfigByID(ctx, id) - if pErr != nil { - pErr.Op = op - err = pErr - return nil, err - } - tc.ID = id - // OrganizationID can not be updated - tc.OrgID = current.OrgID - pErr = s.putTelegrafConfig(ctx, tc) - if pErr != nil { - pErr.Op = op - err = pErr - } - - return tc, err -} - -// DeleteTelegrafConfig removes a telegraf config by ID. -func (s *Service) DeleteTelegrafConfig(ctx context.Context, id platform.ID) error { - op := OpPrefix + platform.OpDeleteTelegrafConfig - var err error - if !id.Valid() { - return &platform.Error{ - Msg: "provided telegraf configuration ID has invalid format", - Code: platform.EInvalid, - } - } - if _, pErr := s.findTelegrafConfigByID(ctx, id); pErr != nil { - return pErr - } - s.telegrafConfigKV.Delete(id) - - err = s.deleteUserResourceMapping(ctx, platform.UserResourceMappingFilter{ - ResourceID: id, - ResourceType: platform.TelegrafsResourceType, - }) - - if err != nil { - return &platform.Error{ - Code: platform.ErrorCode(err), - Op: op, - Err: err, - } - } - return nil -} diff --git a/inmem/telegraf_test.go b/inmem/telegraf_test.go deleted file mode 100644 index 94cd856395..0000000000 --- a/inmem/telegraf_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package inmem - -import ( - "context" - "testing" - - platform "github.com/influxdata/influxdb" - platformtesting "github.com/influxdata/influxdb/testing" -) - -func initTelegrafStore(f platformtesting.TelegrafConfigFields, t *testing.T) (platform.TelegrafConfigStore, func()) { - s := NewService() - s.IDGenerator = f.IDGenerator - ctx := context.Background() - for _, m := range f.UserResourceMappings { - if err := s.PutUserResourceMapping(ctx, m); err != nil { - t.Fatalf("failed to populate user resource mapping") - } - } - for _, tc := range f.TelegrafConfigs { - if err := s.putTelegrafConfig(ctx, tc); err != nil { - t.Fatalf("failed to populate telegraf configs") - } - } - return s, func() {} -} - -func TestTelegrafStore(t *testing.T) { - platformtesting.TelegrafConfigStore(initTelegrafStore, t) -} diff --git a/inmem/user_resource_mapping_service.go b/inmem/user_resource_mapping_service.go deleted file mode 100644 index ba70b89591..0000000000 --- a/inmem/user_resource_mapping_service.go +++ /dev/null @@ -1,128 +0,0 @@ -package inmem - -import ( - "context" - "fmt" - "path" - - platform "github.com/influxdata/influxdb" -) - -func encodeUserResourceMappingKey(resourceID, userID platform.ID) string { - return path.Join(resourceID.String(), userID.String()) -} - -func (s *Service) loadUserResourceMapping(ctx context.Context, resourceID, userID platform.ID) (*platform.UserResourceMapping, error) { - i, ok := s.userResourceMappingKV.Load(encodeUserResourceMappingKey(resourceID, userID)) - if !ok { - return nil, &platform.Error{ - Msg: "user to resource mapping not found", - Code: platform.ENotFound, - } - } - - m, ok := i.(platform.UserResourceMapping) - if !ok { - return nil, fmt.Errorf("type %T is not an userResource mapping", i) - } - - return &m, nil -} - -func (s *Service) FindUserResourceBy(ctx context.Context, resourceID, userID platform.ID) (*platform.UserResourceMapping, error) { - return s.loadUserResourceMapping(ctx, resourceID, userID) -} - -func (s *Service) forEachUserResourceMapping(ctx context.Context, fn func(m *platform.UserResourceMapping) bool) error { - var err error - s.userResourceMappingKV.Range(func(k, v interface{}) bool { - m, ok := v.(platform.UserResourceMapping) - if !ok { - err = fmt.Errorf("type %T is not a userResource mapping", v) - return false - } - return fn(&m) - }) - - return err -} - -func (s *Service) filterUserResourceMappings(ctx context.Context, fn func(m *platform.UserResourceMapping) bool) ([]*platform.UserResourceMapping, error) { - mappings := []*platform.UserResourceMapping{} - err := s.forEachUserResourceMapping(ctx, func(m *platform.UserResourceMapping) bool { - if fn(m) { - mappings = append(mappings, m) - } - return true - }) - - if err != nil { - return nil, err - } - - return mappings, nil -} - -func (s *Service) FindUserResourceMappings(ctx context.Context, filter platform.UserResourceMappingFilter, opt ...platform.FindOptions) ([]*platform.UserResourceMapping, int, error) { - if filter.ResourceID.Valid() && filter.UserID.Valid() { - m, err := s.FindUserResourceBy(ctx, filter.ResourceID, filter.UserID) - if err != nil { - return nil, 0, err - } - return []*platform.UserResourceMapping{m}, 1, nil - } - - filterFunc := func(mapping *platform.UserResourceMapping) bool { - return (!filter.UserID.Valid() || (filter.UserID == mapping.UserID)) && - (!filter.ResourceID.Valid() || (filter.ResourceID == mapping.ResourceID)) && - (filter.UserType == "" || (filter.UserType == mapping.UserType)) && - (filter.ResourceType == "" || (filter.ResourceType == mapping.ResourceType)) - } - - mappings, err := s.filterUserResourceMappings(ctx, filterFunc) - if err != nil { - return nil, 0, err - } - - return mappings, len(mappings), nil -} - -func (s *Service) CreateUserResourceMapping(ctx context.Context, m *platform.UserResourceMapping) error { - mapping, _ := s.FindUserResourceBy(ctx, m.ResourceID, m.UserID) - if mapping != nil { - return &platform.Error{ - Code: platform.EInternal, - Msg: fmt.Sprintf("Unexpected error when assigning user to a resource: mapping for user %s already exists", m.UserID), - } - } - - s.userResourceMappingKV.Store(encodeUserResourceMappingKey(m.ResourceID, m.UserID), *m) - return nil -} - -func (s *Service) PutUserResourceMapping(ctx context.Context, m *platform.UserResourceMapping) error { - s.userResourceMappingKV.Store(encodeUserResourceMappingKey(m.ResourceID, m.UserID), *m) - return nil -} - -func (s *Service) DeleteUserResourceMapping(ctx context.Context, resourceID, userID platform.ID) error { - mapping, err := s.FindUserResourceBy(ctx, resourceID, userID) - if mapping == nil && err != nil { - return err - } - - s.userResourceMappingKV.Delete(encodeUserResourceMappingKey(resourceID, userID)) - return nil -} - -func (s *Service) deleteUserResourceMapping(ctx context.Context, filter platform.UserResourceMappingFilter) error { - mappings, _, err := s.FindUserResourceMappings(ctx, filter) - if mappings == nil && err != nil { - return err - } - for _, m := range mappings { - s.userResourceMappingKV.Delete(encodeUserResourceMappingKey(m.ResourceID, m.UserID)) - } - - return nil -} diff --git a/inmem/user_resource_mapping_test.go b/inmem/user_resource_mapping_test.go deleted file mode 100644 index 3b98e2994e..0000000000 --- a/inmem/user_resource_mapping_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package inmem - -import ( - "context" - "testing" - - platform "github.com/influxdata/influxdb" - platformtesting "github.com/influxdata/influxdb/testing" -) - -func initUserResourceMappingService(f platformtesting.UserResourceFields, t *testing.T) (platform.UserResourceMappingService, func()) { - s := NewService() - ctx := context.TODO() - for _, m := range f.UserResourceMappings { - if err := s.CreateUserResourceMapping(ctx, m); err != nil { - t.Fatalf("failed to populate mappings") - } - } - - return s, func() {} -} - -func TestUserResourceMappingService_FindUserResourceMappings(t *testing.T) { - platformtesting.FindUserResourceMappings(initUserResourceMappingService, t) -} - -func TestUserResourceMappingService_CreateUserResourceMapping(t *testing.T) { - platformtesting.CreateUserResourceMapping(initUserResourceMappingService, t) -} - -func TestUserResourceMappingService_DeleteUserResourceMapping(t *testing.T) { - platformtesting.DeleteUserResourceMapping(initUserResourceMappingService, t) -} diff --git a/inmem/user_service.go b/inmem/user_service.go deleted file mode 100644 index 30be24b5f0..0000000000 --- a/inmem/user_service.go +++ /dev/null @@ -1,205 +0,0 @@ -package inmem - -import ( - "context" - "fmt" - - "github.com/influxdata/influxdb" - platform "github.com/influxdata/influxdb" -) - -var _ platform.UserService = (*Service)(nil) - -func (s *Service) loadUser(id platform.ID) (*platform.User, *platform.Error) { - i, ok := s.userKV.Load(id.String()) - if !ok { - return nil, &platform.Error{ - Code: platform.ENotFound, - Msg: "user not found", - } - } - - b, ok := i.(*platform.User) - if !ok { - return nil, &platform.Error{ - Code: platform.EInternal, - Msg: fmt.Sprintf("type %T is not a user", i), - } - } - return b, nil -} - -func (s *Service) forEachUser(ctx context.Context, fn func(b *platform.User) bool) error { - var err error - s.userKV.Range(func(k, v interface{}) bool { - o, ok := v.(*platform.User) - if !ok { - err = fmt.Errorf("type %T is not a user", v) - return false - } - - return fn(o) - }) - - return err -} - -// FindUserByID returns a single user by ID. -func (s *Service) FindUserByID(ctx context.Context, id platform.ID) (u *platform.User, err error) { - var pe *platform.Error - u, pe = s.loadUser(id) - if pe != nil { - err = &platform.Error{ - Op: OpPrefix + platform.OpFindUserByID, - Err: pe, - } - } - return u, err -} - -func (s *Service) findUserByName(ctx context.Context, n string) (*platform.User, error) { - return s.FindUser(ctx, platform.UserFilter{Name: &n}) -} - -// FindUser returns the first user that matches a filter. -func (s *Service) FindUser(ctx context.Context, filter platform.UserFilter) (*platform.User, error) { - op := OpPrefix + platform.OpFindUser - if filter.ID != nil { - u, err := s.FindUserByID(ctx, *filter.ID) - if err != nil { - return nil, &platform.Error{ - Op: op, - Err: err, - } - } - return u, nil - } - - if filter.Name != nil { - var u *platform.User - - err := s.forEachUser(ctx, func(user *platform.User) bool { - if user.Name == *filter.Name { - u = user - return false - } - return true - }) - - if err != nil { - return nil, err - } - - if u == nil { - return nil, &platform.Error{ - Code: platform.ENotFound, - Op: op, - Msg: "user not found", - } - } - - return u, nil - } - - return nil, &platform.Error{ - Code: platform.EInvalid, - Op: op, - Msg: "expected filter to contain name", - } -} - -// FindUsers will retrieve a list of users from storage. -func (s *Service) FindUsers(ctx context.Context, filter platform.UserFilter, opt ...platform.FindOptions) ([]*platform.User, int, error) { - op := OpPrefix + platform.OpFindUsers - if filter.ID != nil { - u, err := s.FindUserByID(ctx, *filter.ID) - if err != nil { - return nil, 0, &platform.Error{ - Err: err, - Op: op, - } - } - - return []*platform.User{u}, 1, nil - } - if filter.Name != nil { - u, err := s.FindUser(ctx, filter) - if err != nil { - return nil, 0, &platform.Error{ - Err: err, - Op: op, - } - } - - return []*platform.User{u}, 1, nil - } - - users := []*platform.User{} - - err := s.forEachUser(ctx, func(user *platform.User) bool { - users = append(users, user) - return true - }) - - if err != nil { - return nil, 0, err - } - - return users, len(users), nil -} - -// CreateUser will create an user into storage. -func (s *Service) CreateUser(ctx context.Context, u *platform.User) error { - if _, err := s.FindUser(ctx, platform.UserFilter{Name: &u.Name}); err == nil { - return &platform.Error{ - Code: platform.EConflict, - Op: OpPrefix + platform.OpCreateUser, - Msg: fmt.Sprintf("user with name %s already exists", u.Name), - } - } - u.ID = s.IDGenerator.ID() - u.Status = influxdb.Active - s.PutUser(ctx, u) - return nil -} - -// PutUser put a user into storage. -func (s *Service) PutUser(ctx context.Context, o *platform.User) error { - s.userKV.Store(o.ID.String(), o) - return nil -} - -// UpdateUser update a user in storage. -func (s *Service) UpdateUser(ctx context.Context, id platform.ID, upd platform.UserUpdate) (*platform.User, error) { - o, err := s.FindUserByID(ctx, id) - if err != nil { - return nil, &platform.Error{ - Err: err, - Op: OpPrefix + platform.OpUpdateUser, - } - } - - if upd.Name != nil { - o.Name = *upd.Name - } - - if upd.Status != nil { - o.Status = *upd.Status - } - - s.userKV.Store(o.ID.String(), o) - - return o, nil -} - -// DeleteUser remove a user from storage. -func (s *Service) DeleteUser(ctx context.Context, id platform.ID) error { - if _, err := s.FindUserByID(ctx, id); err != nil { - return &platform.Error{ - Err: err, - Op: OpPrefix + platform.OpDeleteUser, - } - } - s.userKV.Delete(id.String()) - return nil -} diff --git a/inmem/user_test.go b/inmem/user_test.go deleted file mode 100644 index 226cd831d3..0000000000 --- a/inmem/user_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package inmem - -import ( - "context" - "testing" - - "github.com/influxdata/influxdb" - "github.com/influxdata/influxdb/kv" - influxdbtesting "github.com/influxdata/influxdb/testing" - "go.uber.org/zap/zaptest" -) - -func initUserService(f influxdbtesting.UserFields, t *testing.T) (influxdb.UserService, string, func()) { - s := NewKVStore() - svc := kv.NewService(zaptest.NewLogger(t), s) - svc.IDGenerator = f.IDGenerator - ctx := context.Background() - if err := svc.Initialize(ctx); err != nil { - t.Fatalf("error initializing user service: %v", err) - } - - for _, u := range f.Users { - if err := svc.PutUser(ctx, u); err != nil { - t.Fatalf("failed to populate users") - } - } - return svc, "kv/", func() { - for _, u := range f.Users { - if err := svc.DeleteUser(ctx, u.ID); err != nil { - t.Logf("failed to remove users: %v", err) - } - } - } -} - -func TestUserService(t *testing.T) { - t.Parallel() - influxdbtesting.UserService(initUserService, t) -} diff --git a/inmem/variable.go b/inmem/variable.go deleted file mode 100644 index 5977be8564..0000000000 --- a/inmem/variable.go +++ /dev/null @@ -1,161 +0,0 @@ -package inmem - -import ( - "context" - "fmt" - - platform "github.com/influxdata/influxdb" -) - -func (s *Service) loadVariable(ctx context.Context, id platform.ID) (*platform.Variable, *platform.Error) { - r, ok := s.variableKV.Load(id.String()) - if !ok { - return nil, &platform.Error{ - Code: platform.ENotFound, - Msg: platform.ErrVariableNotFound, - } - } - - m, ok := r.(*platform.Variable) - if !ok { - return nil, &platform.Error{ - Code: platform.EInvalid, - Msg: fmt.Sprintf("type %T is not a variable", r), - } - } - - return m, nil -} - -// FindVariableByID implements the platform.VariableService interface -func (s *Service) FindVariableByID(ctx context.Context, id platform.ID) (*platform.Variable, error) { - m, pe := s.loadVariable(ctx, id) - if pe != nil { - return nil, &platform.Error{ - Err: pe, - Op: OpPrefix + platform.OpFindVariableByID, - } - } - - return m, nil -} - -func filterVariablesFn(filter platform.VariableFilter) func(m *platform.Variable) bool { - if filter.ID != nil { - return func(m *platform.Variable) bool { - return m.ID == *filter.ID - } - } - - if filter.OrganizationID != nil { - return func(m *platform.Variable) bool { - return m.OrganizationID == *filter.OrganizationID - } - } - - return func(m *platform.Variable) bool { return true } -} - -// FindVariables implements the platform.VariableService interface -func (s *Service) FindVariables(ctx context.Context, filter platform.VariableFilter, opt ...platform.FindOptions) ([]*platform.Variable, error) { - op := OpPrefix + platform.OpFindVariables - var variables []*platform.Variable - - if filter.ID != nil { - m, err := s.FindVariableByID(ctx, *filter.ID) - if err != nil && platform.ErrorCode(err) != platform.ENotFound { - return variables, &platform.Error{ - Err: err, - Op: op, - } - } - if m == nil { - return variables, nil - } - - return []*platform.Variable{m}, nil - } - - filterFn := filterVariablesFn(filter) - s.variableKV.Range(func(k, v interface{}) bool { - variable, ok := v.(*platform.Variable) - if !ok { - return false - } - if filterFn(variable) { - variables = append(variables, variable) - } - - return true - }) - - return variables, nil -} - -// CreateVariable implements the platform.VariableService interface -func (s *Service) CreateVariable(ctx context.Context, m *platform.Variable) error { - op := OpPrefix + platform.OpCreateVariable - m.ID = s.IDGenerator.ID() - err := s.ReplaceVariable(ctx, m) - now := s.Now() - m.CreatedAt = now - m.UpdatedAt = now - if err != nil { - return &platform.Error{ - Op: op, - Err: err, - } - } - - return nil -} - -// UpdateVariable implements the platform.VariableService interface -func (s *Service) UpdateVariable(ctx context.Context, id platform.ID, update *platform.VariableUpdate) (*platform.Variable, error) { - op := OpPrefix + platform.OpUpdateVariable - variable, err := s.FindVariableByID(ctx, id) - if err != nil { - return nil, &platform.Error{ - Op: op, - Err: err, - } - } - now := s.Now() - variable.UpdatedAt = now - if err := update.Apply(variable); err != nil { - return nil, &platform.Error{ - Op: op, - Err: err, - } - } - - if err := s.ReplaceVariable(ctx, variable); err != nil { - return nil, &platform.Error{ - Op: op, - Err: err, - } - } - return variable, nil -} - -// DeleteVariable implements the platform.VariableService interface -func (s *Service) DeleteVariable(ctx context.Context, id platform.ID) error { - op := OpPrefix + platform.OpDeleteVariable - _, err := s.FindVariableByID(ctx, id) - if err != nil { - return &platform.Error{ - Op: op, - Err: err, - } - } - - s.variableKV.Delete(id.String()) - - return nil -} - -// ReplaceVariable stores a Variable in the key value store -func (s *Service) ReplaceVariable(ctx context.Context, m *platform.Variable) error { - s.variableKV.Store(m.ID.String(), m) - return nil -} diff --git a/kv/dashboard.go b/kv/dashboard.go index 381a3e0d74..d442c8c160 100644 --- a/kv/dashboard.go +++ b/kv/dashboard.go @@ -854,9 +854,9 @@ func (s *Service) DeleteDashboard(ctx context.Context, id influxdb.ID) error { } func (s *Service) deleteDashboard(ctx context.Context, tx Tx, id influxdb.ID) error { - d, pe := s.findDashboardByID(ctx, tx, id) - if pe != nil { - return pe + d, err := s.findDashboardByID(ctx, tx, id) + if err != nil { + return err } for _, cell := range d.Cells { @@ -956,6 +956,7 @@ func (s *Service) appendDashboardEventToLog(ctx context.Context, tx Tx, id influ // TODO(desa): this is fragile and non explicit since it requires an authorizer to be on context. It should be // replaced with a higher level transaction so that adding to the log can take place in the http handler // where the userID will exist explicitly. + a, err := icontext.GetAuthorizer(ctx) if err == nil { // Add the user to the log if you can, but don't error if its not there. diff --git a/storage/bucket_service_test.go b/storage/bucket_service_test.go index e571a0a29d..b4859fe658 100644 --- a/storage/bucket_service_test.go +++ b/storage/bucket_service_test.go @@ -6,7 +6,9 @@ import ( platform "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/inmem" + "github.com/influxdata/influxdb/kv" "github.com/influxdata/influxdb/storage" + "go.uber.org/zap/zaptest" ) func TestBucketService(t *testing.T) { @@ -21,7 +23,7 @@ func TestBucketService(t *testing.T) { t.Fatal("expected error, got nil") } - inmemService := inmem.NewService() + inmemService := newInMemKVSVC(t) service = storage.NewBucketService(inmemService, nil) if err := service.DeleteBucket(context.TODO(), *i); err == nil { @@ -61,3 +63,13 @@ func (m *MockDeleter) DeleteBucket(_ context.Context, orgID, bucketID platform.I m.orgID, m.bucketID = orgID, bucketID return nil } + +func newInMemKVSVC(t *testing.T) *kv.Service { + t.Helper() + + svc := kv.NewService(zaptest.NewLogger(t), inmem.NewKVStore()) + if err := svc.Initialize(context.Background()); err != nil { + t.Fatal(err) + } + return svc +}