fix(inmem): remove inmem.Service

pull/13039/head
zhulongcheng 2019-04-08 14:26:32 +08:00
parent cacd6a8223
commit df9702a4be
31 changed files with 0 additions and 3726 deletions

View File

@ -1,225 +0,0 @@
package inmem
import (
"context"
platform "github.com/influxdata/influxdb"
)
func (s *Service) loadAuthorization(ctx context.Context, id platform.ID) (*platform.Authorization, *platform.Error) {
i, ok := s.authorizationKV.Load(id.String())
if !ok {
return nil, &platform.Error{
Code: platform.ENotFound,
Msg: "authorization not found",
}
}
a, ok := i.(platform.Authorization)
if !ok {
return nil, &platform.Error{
Code: platform.EInternal,
Msg: "value found in map is not an authorization",
}
}
if a.Status == "" {
a.Status = platform.Active
}
return &a, nil
}
// PutAuthorization overwrites the authorization with the contents of a.
func (s *Service) PutAuthorization(ctx context.Context, a *platform.Authorization) error {
if a.Status == "" {
a.Status = platform.Active
}
s.authorizationKV.Store(a.ID.String(), *a)
return nil
}
// FindAuthorizationByID returns an authorization given an ID.
func (s *Service) FindAuthorizationByID(ctx context.Context, id platform.ID) (*platform.Authorization, error) {
var err error
a, pe := s.loadAuthorization(ctx, id)
if pe != nil {
pe.Op = OpPrefix + platform.OpFindAuthorizationByID
err = pe
}
return a, err
}
// FindAuthorizationByToken returns an authorization given a token.
func (s *Service) FindAuthorizationByToken(ctx context.Context, t string) (*platform.Authorization, error) {
var err error
op := OpPrefix + platform.OpFindAuthorizationByToken
var n int
var as []*platform.Authorization
as, n, err = s.FindAuthorizations(ctx, platform.AuthorizationFilter{Token: &t})
if err != nil {
return nil, &platform.Error{
Err: err,
Op: op,
}
}
if n < 1 {
return nil, &platform.Error{
Code: platform.ENotFound,
Msg: "authorization not found",
Op: op,
}
}
return as[0], nil
}
func filterAuthorizationsFn(filter platform.AuthorizationFilter) func(a *platform.Authorization) bool {
if filter.ID != nil {
return func(a *platform.Authorization) bool {
return a.ID == *filter.ID
}
}
if filter.Token != nil {
return func(a *platform.Authorization) bool {
return a.Token == *filter.Token
}
}
if filter.UserID != nil {
return func(a *platform.Authorization) bool {
return a.UserID == *filter.UserID
}
}
return func(a *platform.Authorization) bool { return true }
}
// FindAuthorizations returns all authorizations matching the filter.
func (s *Service) FindAuthorizations(ctx context.Context, filter platform.AuthorizationFilter, opt ...platform.FindOptions) ([]*platform.Authorization, int, error) {
op := OpPrefix + platform.OpFindAuthorizations
if filter.ID != nil {
a, err := s.FindAuthorizationByID(ctx, *filter.ID)
if err != nil {
return nil, 0, &platform.Error{
Err: err,
Op: op,
}
}
return []*platform.Authorization{a}, 1, nil
}
var as []*platform.Authorization
if filter.User != nil {
u, err := s.findUserByName(ctx, *filter.User)
if err != nil {
return nil, 0, &platform.Error{
Op: op,
Err: err,
}
}
filter.UserID = &u.ID
}
var err error
filterF := filterAuthorizationsFn(filter)
s.authorizationKV.Range(func(k, v interface{}) bool {
a, ok := v.(platform.Authorization)
if !ok {
err = &platform.Error{
Code: platform.EInternal,
Msg: "value found in map is not an authorization",
Op: op,
}
return false
}
if filterF(&a) {
as = append(as, &a)
}
return true
})
if err != nil {
return nil, 0, err
}
return as, len(as), nil
}
// CreateAuthorization sets a.Token and a.ID and creates an platform.Authorization
func (s *Service) CreateAuthorization(ctx context.Context, a *platform.Authorization) error {
op := OpPrefix + platform.OpCreateAuthorization
_, pErr := s.FindUserByID(ctx, a.UserID)
if pErr != nil {
return platform.ErrUnableToCreateToken
}
_, pErr = s.FindOrganizationByID(ctx, a.OrgID)
if pErr != nil {
return platform.ErrUnableToCreateToken
}
if a.Token == "" {
var err error
a.Token, err = s.TokenGenerator.Token()
if err != nil {
return &platform.Error{
Err: err,
Op: op,
}
}
}
a.ID = s.IDGenerator.ID()
a.Status = platform.Active
return s.PutAuthorization(ctx, a)
}
// DeleteAuthorization deletes an authorization associated with id.
func (s *Service) DeleteAuthorization(ctx context.Context, id platform.ID) error {
if _, err := s.FindAuthorizationByID(ctx, id); err != nil {
return &platform.Error{
Err: err,
Op: OpPrefix + platform.OpDeleteAuthorization,
}
}
s.authorizationKV.Delete(id.String())
return nil
}
// UpdateAuthorization updates the status and description if available.
func (s *Service) UpdateAuthorization(ctx context.Context, id platform.ID, upd *platform.AuthorizationUpdate) error {
op := OpPrefix + platform.OpUpdateAuthorization
a, err := s.FindAuthorizationByID(ctx, id)
if err != nil {
return &platform.Error{
Err: err,
Op: op,
}
}
if upd.Status != nil {
status := *upd.Status
switch status {
case platform.Active, platform.Inactive:
default:
return &platform.Error{
Code: platform.EInvalid,
Msg: "unknown authorization status",
Op: op,
}
}
a.Status = status
}
if upd.Description != nil {
a.Description = *upd.Description
}
return s.PutAuthorization(ctx, a)
}

View File

@ -1,40 +0,0 @@
package inmem
import (
"context"
"testing"
platform "github.com/influxdata/influxdb"
platformtesting "github.com/influxdata/influxdb/testing"
)
func initAuthorizationService(f platformtesting.AuthorizationFields, t *testing.T) (platform.AuthorizationService, string, func()) {
s := NewService()
s.IDGenerator = f.IDGenerator
s.TokenGenerator = f.TokenGenerator
ctx := context.Background()
for _, u := range f.Users {
if err := s.PutUser(ctx, u); err != nil {
t.Fatalf("failed to populate users")
}
}
for _, o := range f.Orgs {
if err := s.PutOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate organizations")
}
}
for _, u := range f.Authorizations {
if err := s.PutAuthorization(ctx, u); err != nil {
t.Fatalf("failed to populate authorizations")
}
}
return s, OpPrefix, func() {}
}
func TestAuthorizationService(t *testing.T) {
platformtesting.AuthorizationService(initAuthorizationService, t)
}

View File

@ -1,344 +0,0 @@
package inmem
import (
"context"
"fmt"
"sort"
platform "github.com/influxdata/influxdb"
)
var (
errBucketNotFound = "bucket not found"
)
func (s *Service) loadBucket(ctx context.Context, id platform.ID) (*platform.Bucket, *platform.Error) {
i, ok := s.bucketKV.Load(id.String())
if !ok {
return nil, &platform.Error{
Code: platform.ENotFound,
Msg: errBucketNotFound,
}
}
b, ok := i.(platform.Bucket)
if !ok {
return nil, &platform.Error{
Code: platform.EInternal,
Msg: fmt.Sprintf("type %T is not a bucket", i),
}
}
if err := s.setOrganizationNameOnBucket(ctx, &b); err != nil {
return nil, &platform.Error{
Err: err,
}
}
return &b, nil
}
func (s *Service) setOrganizationNameOnBucket(ctx context.Context, b *platform.Bucket) error {
o, err := s.loadOrganization(b.OrganizationID)
if err != nil {
return err
}
b.Organization = o.Name
return nil
}
// FindBucketByID returns a single bucket by ID.
func (s *Service) FindBucketByID(ctx context.Context, id platform.ID) (*platform.Bucket, error) {
var b *platform.Bucket
var pe *platform.Error
var err error
if b, pe = s.loadBucket(ctx, id); pe != nil {
pe.Op = OpPrefix + platform.OpFindBucketByID
err = pe
}
return b, err
}
func (s *Service) forEachBucket(ctx context.Context, descending bool, fn func(b *platform.Bucket) bool) error {
var err error
bs := make([]*platform.Bucket, 0)
s.bucketKV.Range(func(k, v interface{}) bool {
b, ok := v.(platform.Bucket)
if !ok {
err = fmt.Errorf("type %T is not a bucket", v)
return false
}
bs = append(bs, &b)
return true
})
// sort by id
sort.Slice(bs, func(i, j int) bool {
if descending {
return bs[i].ID.String() > bs[j].ID.String()
}
return bs[i].ID.String() < bs[j].ID.String()
})
for _, b := range bs {
if !fn(b) {
return nil
}
}
return err
}
func (s *Service) filterBuckets(ctx context.Context, fn func(b *platform.Bucket) bool, opts ...platform.FindOptions) ([]*platform.Bucket, error) {
var offset, limit, count int
var descending bool
if len(opts) > 0 {
offset = opts[0].Offset
limit = opts[0].Limit
descending = opts[0].Descending
}
buckets := []*platform.Bucket{}
err := s.forEachBucket(ctx, descending, func(b *platform.Bucket) bool {
if fn(b) {
if count >= offset {
buckets = append(buckets, b)
}
count += 1
}
if limit > 0 && len(buckets) >= limit {
return false
}
return true
})
if err != nil {
return nil, err
}
return buckets, nil
}
// FindBucket returns the first bucket that matches filter.
func (s *Service) FindBucket(ctx context.Context, filter platform.BucketFilter) (*platform.Bucket, error) {
op := OpPrefix + platform.OpFindBucket
var err error
var b *platform.Bucket
if filter.ID == nil && filter.Name == nil && filter.OrganizationID == nil {
return nil, &platform.Error{
Code: platform.EInvalid,
Op: op,
Msg: "no filter parameters provided",
}
}
// filter by bucket id
if filter.ID != nil {
b, err = s.FindBucketByID(ctx, *filter.ID)
if err != nil {
return nil, &platform.Error{
Op: op,
Err: err,
}
}
return b, nil
}
bs, n, err := s.FindBuckets(ctx, filter)
if err != nil {
return nil, err
}
if n < 1 && filter.Name != nil {
return nil, &platform.Error{
Code: platform.ENotFound,
Op: op,
Msg: fmt.Sprintf("bucket %q not found", *filter.Name),
}
} else if n < 1 {
return nil, &platform.Error{
Code: platform.ENotFound,
Op: op,
Msg: "bucket not found",
}
}
return bs[0], nil
}
func (s *Service) findBuckets(ctx context.Context, filter platform.BucketFilter, opt ...platform.FindOptions) ([]*platform.Bucket, *platform.Error) {
// filter by bucket id
if filter.ID != nil {
b, err := s.FindBucketByID(ctx, *filter.ID)
if err != nil {
return nil, &platform.Error{
Err: err,
}
}
return []*platform.Bucket{b}, nil
}
if filter.Organization != nil {
o, err := s.findOrganizationByName(ctx, *filter.Organization)
if err != nil {
return nil, &platform.Error{
Err: err,
}
}
filter.OrganizationID = &o.ID
}
filterFunc := func(b *platform.Bucket) bool { return true }
if filter.Name != nil && filter.OrganizationID != nil {
filterFunc = func(b *platform.Bucket) bool {
return b.Name == *filter.Name && b.OrganizationID == *filter.OrganizationID
}
} else if filter.Name != nil {
// filter by bucket name
filterFunc = func(b *platform.Bucket) bool {
return b.Name == *filter.Name
}
} else if filter.OrganizationID != nil {
// filter by organization id
filterFunc = func(b *platform.Bucket) bool {
return b.OrganizationID == *filter.OrganizationID
}
}
bs, err := s.filterBuckets(ctx, filterFunc, opt...)
if err != nil {
return nil, &platform.Error{
Err: err,
}
}
return bs, nil
}
// FindBuckets returns a list of buckets that match filter and the total count of matching buckets.
// Additional options provide pagination & sorting.
func (s *Service) FindBuckets(ctx context.Context, filter platform.BucketFilter, opt ...platform.FindOptions) ([]*platform.Bucket, int, error) {
var err error
bs, pe := s.findBuckets(ctx, filter, opt...)
if pe != nil {
pe.Op = OpPrefix + platform.OpFindBuckets
err = pe
return nil, 0, err
}
for _, b := range bs {
if err := s.setOrganizationNameOnBucket(ctx, b); err != nil {
return nil, 0, err
}
}
return bs, len(bs), nil
}
// CreateBucket creates a new bucket and sets b.ID with the new identifier.
func (s *Service) CreateBucket(ctx context.Context, b *platform.Bucket) error {
if b.OrganizationID.Valid() {
_, pe := s.FindOrganizationByID(ctx, b.OrganizationID)
if pe != nil {
return &platform.Error{
Err: pe,
Op: OpPrefix + platform.OpCreateBucket,
}
}
} else {
o, pe := s.findOrganizationByName(ctx, b.Organization)
if pe != nil {
return &platform.Error{
Err: pe,
Op: OpPrefix + platform.OpCreateBucket,
}
}
b.OrganizationID = o.ID
}
filter := platform.BucketFilter{
Name: &b.Name,
OrganizationID: &b.OrganizationID,
}
if _, err := s.FindBucket(ctx, filter); err == nil {
return &platform.Error{
Code: platform.EConflict,
Op: OpPrefix + platform.OpCreateBucket,
Msg: fmt.Sprintf("bucket with name %s already exists", b.Name),
}
}
b.ID = s.IDGenerator.ID()
return s.PutBucket(ctx, b)
}
// PutBucket sets bucket with the current ID.
func (s *Service) PutBucket(ctx context.Context, b *platform.Bucket) error {
s.bucketKV.Store(b.ID.String(), *b)
return nil
}
// UpdateBucket updates a single bucket with changeset.
// Returns the new bucket state after update.
func (s *Service) UpdateBucket(ctx context.Context, id platform.ID, upd platform.BucketUpdate) (*platform.Bucket, error) {
b, err := s.FindBucketByID(ctx, id)
if err != nil {
return nil, &platform.Error{
Op: OpPrefix + platform.OpUpdateBucket,
Err: err,
}
}
if upd.Name != nil {
b.Name = *upd.Name
}
if upd.RetentionPeriod != nil {
b.RetentionPeriod = *upd.RetentionPeriod
}
b0, err := s.FindBucket(ctx, platform.BucketFilter{
Name: upd.Name,
})
if err == nil && b0.ID != id {
return nil, &platform.Error{
Code: platform.EConflict,
Msg: "bucket name is not unique",
}
}
s.bucketKV.Store(b.ID.String(), b)
return b, nil
}
// DeleteBucket removes a bucket by ID.
func (s *Service) DeleteBucket(ctx context.Context, id platform.ID) error {
if _, err := s.FindBucketByID(ctx, id); err != nil {
return &platform.Error{
Op: OpPrefix + platform.OpDeleteBucket,
Err: err,
}
}
s.bucketKV.Delete(id.String())
// return s.deleteLabel(ctx, platform.LabelFilter{ResourceID: id})
return nil
}
// DeleteOrganizationBuckets removes all the buckets for a given org
func (s *Service) DeleteOrganizationBuckets(ctx context.Context, id platform.ID) error {
bucks, err := s.findBuckets(ctx, platform.BucketFilter{
OrganizationID: &id,
})
if err != nil {
return err
}
for _, buck := range bucks {
s.bucketKV.Delete(buck.ID.String())
}
return nil
}

View File

@ -1,30 +0,0 @@
package inmem
import (
"context"
"testing"
platform "github.com/influxdata/influxdb"
platformtesting "github.com/influxdata/influxdb/testing"
)
func initBucketService(f platformtesting.BucketFields, t *testing.T) (platform.BucketService, string, func()) {
s := NewService()
s.IDGenerator = f.IDGenerator
ctx := context.Background()
for _, o := range f.Organizations {
if err := s.PutOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate organizations")
}
}
for _, b := range f.Buckets {
if err := s.PutBucket(ctx, b); err != nil {
t.Fatalf("failed to populate buckets")
}
}
return s, OpPrefix, func() {}
}
func TestBucketService(t *testing.T) {
platformtesting.BucketService(initBucketService, t)
}

View File

@ -1,494 +0,0 @@
package inmem
import (
"context"
"fmt"
platform "github.com/influxdata/influxdb"
)
func (s *Service) loadDashboard(ctx context.Context, id platform.ID) (*platform.Dashboard, *platform.Error) {
i, ok := s.dashboardKV.Load(id.String())
if !ok {
return nil, &platform.Error{
Code: platform.ENotFound,
Msg: platform.ErrDashboardNotFound,
}
}
d, ok := i.(*platform.Dashboard)
if !ok {
return nil, &platform.Error{
Code: platform.EInvalid,
Msg: fmt.Sprintf("type %T is not a dashboard", i),
}
}
return d, nil
}
// FindDashboardByID returns a single dashboard by ID.
func (s *Service) FindDashboardByID(ctx context.Context, id platform.ID) (*platform.Dashboard, error) {
d, pe := s.loadDashboard(ctx, id)
if pe != nil {
return nil, &platform.Error{
Op: OpPrefix + platform.OpFindDashboardByID,
Err: pe,
}
}
return d, nil
}
func filterDashboardFn(filter platform.DashboardFilter) func(d *platform.Dashboard) bool {
if filter.OrganizationID != nil {
return func(d *platform.Dashboard) bool {
return d.OrganizationID == *filter.OrganizationID
}
}
if len(filter.IDs) > 0 {
m := map[platform.ID]struct{}{}
for _, id := range filter.IDs {
if id != nil {
m[*id] = struct{}{}
}
}
return func(d *platform.Dashboard) bool {
_, ok := m[d.ID]
return ok
}
}
return func(d *platform.Dashboard) bool { return true }
}
func (s *Service) forEachDashboard(ctx context.Context, opts platform.FindOptions, fn func(d *platform.Dashboard) bool) error {
var err error
ds := make([]*platform.Dashboard, 0)
s.dashboardKV.Range(func(k, v interface{}) bool {
d, ok := v.(*platform.Dashboard)
if !ok {
err = fmt.Errorf("type %T is not a dashboard", v)
return false
}
ds = append(ds, d)
return true
})
platform.SortDashboards(opts, ds)
for _, d := range ds {
if !fn(d) {
return nil
}
}
return err
}
// FindDashboards implements platform.DashboardService interface.
func (s *Service) FindDashboards(ctx context.Context, filter platform.DashboardFilter, opts platform.FindOptions) ([]*platform.Dashboard, int, error) {
var ds []*platform.Dashboard
op := OpPrefix + platform.OpFindDashboards
if len(filter.IDs) == 1 {
d, err := s.FindDashboardByID(ctx, *filter.IDs[0])
if err != nil && platform.ErrorCode(err) != platform.ENotFound {
return ds, 0, &platform.Error{
Err: err,
Op: op,
}
}
if d == nil {
return ds, 0, nil
}
return []*platform.Dashboard{d}, 1, nil
}
var count int
filterFn := filterDashboardFn(filter)
err := s.forEachDashboard(ctx, opts, func(d *platform.Dashboard) bool {
if filterFn(d) {
if count >= opts.Offset {
ds = append(ds, d)
}
count++
}
if opts.Limit > 0 && len(ds) >= opts.Limit {
return false
}
return true
})
if err != nil {
return nil, 0, err
}
return ds, len(ds), err
}
// CreateDashboard implements platform.DashboardService interface.
func (s *Service) CreateDashboard(ctx context.Context, d *platform.Dashboard) error {
d.ID = s.IDGenerator.ID()
d.Meta.CreatedAt = s.time()
err := s.PutDashboardWithMeta(ctx, d)
if err != nil {
return &platform.Error{
Err: err,
Op: platform.OpCreateDashboard,
}
}
return nil
}
// PutDashboard implements platform.DashboardService interface.
func (s *Service) PutDashboard(ctx context.Context, d *platform.Dashboard) error {
for _, cell := range d.Cells {
if err := s.PutCellView(ctx, cell); err != nil {
return err
}
}
s.dashboardKV.Store(d.ID.String(), d)
return nil
}
// PutCellView puts the view for a cell.
func (s *Service) PutCellView(ctx context.Context, cell *platform.Cell) error {
v := &platform.View{}
v.ID = cell.ID
return s.PutView(ctx, v)
}
// PutDashboardWithMeta sets a dashboard while updating the meta field of a dashboard.
func (s *Service) PutDashboardWithMeta(ctx context.Context, d *platform.Dashboard) error {
d.Meta.UpdatedAt = s.time()
return s.PutDashboard(ctx, d)
}
// UpdateDashboard implements platform.DashboardService interface.
func (s *Service) UpdateDashboard(ctx context.Context, id platform.ID, upd platform.DashboardUpdate) (*platform.Dashboard, error) {
op := OpPrefix + platform.OpUpdateDashboard
if err := upd.Valid(); err != nil {
return nil, &platform.Error{
Err: err,
Op: op,
}
}
d, err := s.FindDashboardByID(ctx, id)
if err != nil {
return nil, &platform.Error{
Err: err,
Op: op,
}
}
if err := upd.Apply(d); err != nil {
return nil, &platform.Error{
Err: err,
Op: op,
}
}
if err := s.PutDashboardWithMeta(ctx, d); err != nil {
return nil, &platform.Error{
Err: err,
Op: op,
}
}
return d, nil
}
// DeleteDashboard implements platform.DashboardService interface.
func (s *Service) DeleteDashboard(ctx context.Context, id platform.ID) error {
op := OpPrefix + platform.OpDeleteDashboard
if _, err := s.FindDashboardByID(ctx, id); err != nil {
return &platform.Error{
Err: err,
Op: op,
}
}
s.dashboardKV.Delete(id.String())
// err := s.deleteLabel(ctx, platform.LabelFilter{ResourceID: id})
// if err != nil {
// return &platform.Error{
// Err: err,
// Op: op,
// }
// }
return nil
}
// AddDashboardCell adds a new cell to the dashboard.
func (s *Service) AddDashboardCell(ctx context.Context, id platform.ID, cell *platform.Cell, opts platform.AddDashboardCellOptions) error {
op := OpPrefix + platform.OpAddDashboardCell
d, err := s.FindDashboardByID(ctx, id)
if err != nil {
return &platform.Error{
Err: err,
Op: op,
}
}
cell.ID = s.IDGenerator.ID()
if err := s.createCellView(ctx, cell); err != nil {
return &platform.Error{
Err: err,
Op: op,
}
}
d.Cells = append(d.Cells, cell)
if err = s.PutDashboardWithMeta(ctx, d); err != nil {
return &platform.Error{
Err: err,
Op: op,
}
}
return nil
}
func (s *Service) createCellView(ctx context.Context, cell *platform.Cell) *platform.Error {
// If not view exists create the view
view := &platform.View{}
view.ID = cell.ID
if err := s.PutView(ctx, view); err != nil {
return &platform.Error{
Err: err,
}
}
return nil
}
// PutDashboardCell replaces a dashboad cell with the cell contents.
func (s *Service) PutDashboardCell(ctx context.Context, id platform.ID, cell *platform.Cell) error {
d, err := s.FindDashboardByID(ctx, id)
if err != nil {
return err
}
view := &platform.View{}
view.ID = cell.ID
if err := s.PutView(ctx, view); err != nil {
return err
}
d.Cells = append(d.Cells, cell)
return s.PutDashboard(ctx, d)
}
// RemoveDashboardCell removes a dashboard cell from the dashboard.
func (s *Service) RemoveDashboardCell(ctx context.Context, dashboardID platform.ID, cellID platform.ID) error {
op := OpPrefix + platform.OpRemoveDashboardCell
d, err := s.FindDashboardByID(ctx, dashboardID)
if err != nil {
return &platform.Error{
Err: err,
Op: op,
}
}
idx := -1
for i, cell := range d.Cells {
if cell.ID == cellID {
idx = i
break
}
}
if idx == -1 {
return &platform.Error{
Code: platform.ENotFound,
Op: op,
Msg: platform.ErrCellNotFound,
}
}
if err := s.DeleteView(ctx, d.Cells[idx].ID); err != nil {
return &platform.Error{
Err: err,
Op: op,
}
}
d.Cells = append(d.Cells[:idx], d.Cells[idx+1:]...)
return s.PutDashboardWithMeta(ctx, d)
}
// UpdateDashboardCell will remove a cell from a dashboard.
func (s *Service) UpdateDashboardCell(ctx context.Context, dashboardID platform.ID, cellID platform.ID, upd platform.CellUpdate) (*platform.Cell, error) {
op := OpPrefix + platform.OpUpdateDashboardCell
if err := upd.Valid(); err != nil {
return nil, &platform.Error{
Err: err,
Op: op,
}
}
d, err := s.FindDashboardByID(ctx, dashboardID)
if err != nil {
return nil, &platform.Error{
Err: err,
Op: op,
}
}
idx := -1
for i, cell := range d.Cells {
if cell.ID == cellID {
idx = i
break
}
}
if idx == -1 {
return nil, &platform.Error{
Msg: platform.ErrCellNotFound,
Op: op,
Code: platform.ENotFound,
}
}
if err := upd.Apply(d.Cells[idx]); err != nil {
return nil, &platform.Error{
Err: err,
Op: op,
}
}
cell := d.Cells[idx]
if err := s.PutDashboardWithMeta(ctx, d); err != nil {
return nil, &platform.Error{
Err: err,
Op: op,
}
}
return cell, nil
}
// ReplaceDashboardCells replaces many dashboard cells.
func (s *Service) ReplaceDashboardCells(ctx context.Context, id platform.ID, cs []*platform.Cell) error {
op := OpPrefix + platform.OpReplaceDashboardCells
d, err := s.FindDashboardByID(ctx, id)
if err != nil {
return &platform.Error{
Err: err,
Op: op,
}
}
ids := map[string]*platform.Cell{}
for _, cell := range d.Cells {
ids[cell.ID.String()] = cell
}
for _, cell := range cs {
if !cell.ID.Valid() {
return &platform.Error{
Code: platform.EInvalid,
Op: op,
Msg: "cannot provide empty cell id",
}
}
if _, ok := ids[cell.ID.String()]; !ok {
return &platform.Error{
Code: platform.EConflict,
Op: op,
Msg: "cannot replace cells that were not already present",
}
}
}
d.Cells = cs
return s.PutDashboardWithMeta(ctx, d)
}
// GetDashboardCellView retrieves the view for a dashboard cell.
func (s *Service) GetDashboardCellView(ctx context.Context, dashboardID, cellID platform.ID) (*platform.View, error) {
v, err := s.FindViewByID(ctx, cellID)
if err != nil {
return nil, &platform.Error{
Err: err,
Op: OpPrefix + platform.OpGetDashboardCellView,
}
}
return v, nil
}
// UpdateDashboardCellView updates the view for a dashboard cell.
func (s *Service) UpdateDashboardCellView(ctx context.Context, dashboardID, cellID platform.ID, upd platform.ViewUpdate) (*platform.View, error) {
op := OpPrefix + platform.OpUpdateDashboardCellView
v, err := s.FindViewByID(ctx, cellID)
if err != nil {
return nil, &platform.Error{
Err: err,
Op: op,
}
}
if err := upd.Apply(v); err != nil {
return nil, &platform.Error{
Err: err,
Op: op,
}
}
if err := s.PutView(ctx, v); err != nil {
return nil, &platform.Error{
Err: err,
Op: op,
}
}
return v, nil
}
func (s *Service) loadView(ctx context.Context, id platform.ID) (*platform.View, *platform.Error) {
i, ok := s.viewKV.Load(id.String())
if !ok {
return nil, &platform.Error{
Code: platform.ENotFound,
Msg: "view not found",
}
}
d, ok := i.(*platform.View)
if !ok {
return nil, &platform.Error{
Code: platform.EInvalid,
Msg: fmt.Sprintf("type %T is not a view", i),
}
}
return d, nil
}
// FindViewByID returns a single view by ID.
func (s *Service) FindViewByID(ctx context.Context, id platform.ID) (*platform.View, error) {
v, pe := s.loadView(ctx, id)
if pe != nil {
return nil, pe
}
return v, nil
}
// PutView sets view with the current ID.
func (s *Service) PutView(ctx context.Context, c *platform.View) error {
if c.Properties == nil {
c.Properties = platform.EmptyViewProperties{}
}
s.viewKV.Store(c.ID.String(), c)
return nil
}
// DeleteView removes a view by ID.
func (s *Service) DeleteView(ctx context.Context, id platform.ID) error {
if _, err := s.FindViewByID(ctx, id); err != nil {
return err
}
s.viewKV.Delete(id.String())
return nil
}

View File

@ -1,26 +0,0 @@
package inmem
import (
"context"
"testing"
platform "github.com/influxdata/influxdb"
platformtesting "github.com/influxdata/influxdb/testing"
)
func initDashboardService(f platformtesting.DashboardFields, t *testing.T) (platform.DashboardService, string, func()) {
s := NewService()
s.IDGenerator = f.IDGenerator
ctx := context.Background()
s.WithTime(f.NowFn)
for _, b := range f.Dashboards {
if err := s.PutDashboard(ctx, b); err != nil {
t.Fatalf("failed to populate Dashboards")
}
}
return s, OpPrefix, func() {}
}
func TestDashboardService(t *testing.T) {
platformtesting.DashboardService(initDashboardService, t)
}

View File

@ -1,150 +0,0 @@
package inmem
import (
"context"
"errors"
"fmt"
"path"
platform "github.com/influxdata/influxdb"
)
var (
errDBRPMappingNotFound = fmt.Errorf("dbrp mapping not found")
)
func encodeDBRPMappingKey(cluster, db, rp string) string {
return path.Join(cluster, db, rp)
}
func (c *Service) loadDBRPMapping(ctx context.Context, cluster, db, rp string) (*platform.DBRPMapping, error) {
i, ok := c.dbrpMappingKV.Load(encodeDBRPMappingKey(cluster, db, rp))
if !ok {
return nil, errDBRPMappingNotFound
}
m, ok := i.(platform.DBRPMapping)
if !ok {
return nil, fmt.Errorf("type %T is not a dbrp mapping", i)
}
return &m, nil
}
// FindBy returns a single dbrp mapping by cluster, db and rp.
func (s *Service) FindBy(ctx context.Context, cluster, db, rp string) (*platform.DBRPMapping, error) {
return s.loadDBRPMapping(ctx, cluster, db, rp)
}
func (c *Service) forEachDBRPMapping(ctx context.Context, fn func(m *platform.DBRPMapping) bool) error {
var err error
c.dbrpMappingKV.Range(func(k, v interface{}) bool {
m, ok := v.(platform.DBRPMapping)
if !ok {
err = fmt.Errorf("type %T is not a dbrp mapping", v)
return false
}
return fn(&m)
})
return err
}
func (s *Service) filterDBRPMappings(ctx context.Context, fn func(m *platform.DBRPMapping) bool) ([]*platform.DBRPMapping, error) {
mappings := []*platform.DBRPMapping{}
err := s.forEachDBRPMapping(ctx, func(m *platform.DBRPMapping) bool {
if fn(m) {
mappings = append(mappings, m)
}
return true
})
if err != nil {
return nil, err
}
return mappings, nil
}
// Find returns the first dbrp mapping that matches filter.
func (s *Service) Find(ctx context.Context, filter platform.DBRPMappingFilter) (*platform.DBRPMapping, error) {
if filter.Cluster == nil && filter.Database == nil && filter.RetentionPolicy == nil {
return nil, fmt.Errorf("no filter parameters provided")
}
// filter by dbrpMapping id
if filter.Cluster != nil && filter.Database != nil && filter.RetentionPolicy != nil {
return s.FindBy(ctx, *filter.Cluster, *filter.Database, *filter.RetentionPolicy)
}
mappings, n, err := s.FindMany(ctx, filter)
if err != nil {
return nil, err
}
if n < 1 {
return nil, errDBRPMappingNotFound
}
return mappings[0], nil
}
// FindMany returns a list of dbrpMappings that match filter and the total count of matching dbrp mappings.
// Additional options provide pagination & sorting.
func (s *Service) FindMany(ctx context.Context, filter platform.DBRPMappingFilter, opt ...platform.FindOptions) ([]*platform.DBRPMapping, int, error) {
// filter by dbrpMapping id
if filter.Cluster != nil && filter.Database != nil && filter.RetentionPolicy != nil {
m, err := s.FindBy(ctx, *filter.Cluster, *filter.Database, *filter.RetentionPolicy)
if err != nil {
return nil, 0, err
}
return []*platform.DBRPMapping{m}, 1, nil
}
filterFunc := func(mapping *platform.DBRPMapping) bool {
return (filter.Cluster == nil || (*filter.Cluster) == mapping.Cluster) &&
(filter.Database == nil || (*filter.Database) == mapping.Database) &&
(filter.RetentionPolicy == nil || (*filter.RetentionPolicy) == mapping.RetentionPolicy) &&
(filter.Default == nil || (*filter.Default) == mapping.Default)
}
mappings, err := s.filterDBRPMappings(ctx, filterFunc)
if err != nil {
return nil, 0, err
}
return mappings, len(mappings), nil
}
// Create creates a new dbrp mapping.
func (s *Service) Create(ctx context.Context, m *platform.DBRPMapping) error {
if err := m.Validate(); err != nil {
return nil
}
existing, err := s.loadDBRPMapping(ctx, m.Cluster, m.Database, m.RetentionPolicy)
if err != nil {
if err == errDBRPMappingNotFound {
return s.PutDBRPMapping(ctx, m)
}
return err
}
if !existing.Equal(m) {
return errors.New("dbrp mapping already exists")
}
return s.PutDBRPMapping(ctx, m)
}
// PutDBRPMapping sets dbrpMapping with the current ID.
func (s *Service) PutDBRPMapping(ctx context.Context, m *platform.DBRPMapping) error {
k := encodeDBRPMappingKey(m.Cluster, m.Database, m.RetentionPolicy)
s.dbrpMappingKV.Store(k, *m)
return nil
}
// Delete removes a dbrp mapping
func (s *Service) Delete(ctx context.Context, cluster, db, rp string) error {
s.dbrpMappingKV.Delete(encodeDBRPMappingKey(cluster, db, rp))
return nil
}

View File

@ -1,43 +0,0 @@
package inmem
import (
"context"
"testing"
platform "github.com/influxdata/influxdb"
platformtesting "github.com/influxdata/influxdb/testing"
)
func initDBRPMappingService(f platformtesting.DBRPMappingFields, t *testing.T) (platform.DBRPMappingService, func()) {
s := NewService()
ctx := context.TODO()
if err := f.Populate(ctx, s); err != nil {
t.Fatal(err)
}
return s, func() {}
}
func TestDBRPMappingService_CreateDBRPMapping(t *testing.T) {
t.Parallel()
platformtesting.CreateDBRPMapping(initDBRPMappingService, t)
}
func TestDBRPMappingService_FindDBRPMappingByKey(t *testing.T) {
t.Parallel()
platformtesting.FindDBRPMappingByKey(initDBRPMappingService, t)
}
func TestDBRPMappingService_FindDBRPMappings(t *testing.T) {
t.Parallel()
platformtesting.FindDBRPMappings(initDBRPMappingService, t)
}
func TestDBRPMappingService_DeleteDBRPMapping(t *testing.T) {
t.Parallel()
platformtesting.DeleteDBRPMapping(initDBRPMappingService, t)
}
func TestDBRPMappingService_FindDBRPMapping(t *testing.T) {
t.Parallel()
platformtesting.FindDBRPMapping(initDBRPMappingService, t)
}

View File

@ -1,222 +0,0 @@
package inmem
import (
"context"
"fmt"
"path"
"github.com/influxdata/influxdb"
)
func (s *Service) loadLabel(ctx context.Context, id influxdb.ID) (*influxdb.Label, error) {
i, ok := s.labelKV.Load(id.String())
if !ok {
return nil, &influxdb.Error{
Code: influxdb.ENotFound,
Err: influxdb.ErrLabelNotFound,
}
}
l, ok := i.(influxdb.Label)
if !ok {
return nil, fmt.Errorf("type %T is not a label", i)
}
return &l, nil
}
func (s *Service) forEachLabel(ctx context.Context, fn func(m *influxdb.Label) bool) error {
var err error
s.labelKV.Range(func(k, v interface{}) bool {
l, ok := v.(influxdb.Label)
if !ok {
err = fmt.Errorf("type %T is not a label", v)
return false
}
return fn(&l)
})
return err
}
func (s *Service) forEachLabelMapping(ctx context.Context, fn func(m *influxdb.LabelMapping) bool) error {
var err error
s.labelMappingKV.Range(func(k, v interface{}) bool {
m, ok := v.(influxdb.LabelMapping)
if !ok {
err = fmt.Errorf("type %T is not a label mapping", v)
return false
}
return fn(&m)
})
return err
}
func (s *Service) filterLabels(ctx context.Context, fn func(m *influxdb.Label) bool) ([]*influxdb.Label, error) {
labels := []*influxdb.Label{}
err := s.forEachLabel(ctx, func(l *influxdb.Label) bool {
if fn(l) {
labels = append(labels, l)
}
return true
})
if err != nil {
return nil, err
}
return labels, nil
}
func (s *Service) filterLabelMappings(ctx context.Context, fn func(m *influxdb.LabelMapping) bool) ([]*influxdb.LabelMapping, error) {
mappings := []*influxdb.LabelMapping{}
err := s.forEachLabelMapping(ctx, func(m *influxdb.LabelMapping) bool {
if fn(m) {
mappings = append(mappings, m)
}
return true
})
if err != nil {
return nil, err
}
return mappings, nil
}
func encodeLabelMappingKey(m *influxdb.LabelMapping) string {
return path.Join(m.ResourceID.String(), m.LabelID.String())
}
// FindLabelByID returns a single user by ID.
func (s *Service) FindLabelByID(ctx context.Context, id influxdb.ID) (*influxdb.Label, error) {
return s.loadLabel(ctx, id)
}
// FindLabels will retrieve a list of labels from storage.
func (s *Service) FindLabels(ctx context.Context, filter influxdb.LabelFilter, opt ...influxdb.FindOptions) ([]*influxdb.Label, error) {
filterFunc := func(label *influxdb.Label) bool {
return (filter.Name == "" || (filter.Name == label.Name))
}
labels, err := s.filterLabels(ctx, filterFunc)
if err != nil {
return nil, err
}
return labels, nil
}
// FindResourceLabels returns a list of labels that are mapped to a resource.
func (s *Service) FindResourceLabels(ctx context.Context, filter influxdb.LabelMappingFilter) ([]*influxdb.Label, error) {
filterFunc := func(mapping *influxdb.LabelMapping) bool {
return (filter.ResourceID.String() == mapping.ResourceID.String())
}
mappings, err := s.filterLabelMappings(ctx, filterFunc)
if err != nil {
return nil, err
}
ls := []*influxdb.Label{}
for _, m := range mappings {
l, err := s.FindLabelByID(ctx, m.LabelID)
if err != nil {
return nil, err
}
ls = append(ls, l)
}
return ls, nil
}
// CreateLabel creates a new label.
func (s *Service) CreateLabel(ctx context.Context, l *influxdb.Label) error {
l.ID = s.IDGenerator.ID()
s.labelKV.Store(l.ID, *l)
return nil
}
// CreateLabelMapping creates a mapping that associates a label to a resource.
func (s *Service) CreateLabelMapping(ctx context.Context, m *influxdb.LabelMapping) error {
_, err := s.FindLabelByID(ctx, m.LabelID)
if err != nil {
return &influxdb.Error{
Err: err,
Op: influxdb.OpCreateLabel,
}
}
s.labelMappingKV.Store(encodeLabelMappingKey(m), *m)
return nil
}
// UpdateLabel updates a label.
func (s *Service) UpdateLabel(ctx context.Context, id influxdb.ID, upd influxdb.LabelUpdate) (*influxdb.Label, error) {
label, err := s.FindLabelByID(ctx, id)
if err != nil {
return nil, &influxdb.Error{
Code: influxdb.ENotFound,
Op: OpPrefix + influxdb.OpUpdateLabel,
Err: influxdb.ErrLabelNotFound,
}
}
if len(upd.Properties) > 0 && label.Properties == nil {
label.Properties = make(map[string]string)
}
for k, v := range upd.Properties {
if v == "" {
delete(label.Properties, k)
} else {
label.Properties[k] = v
}
}
if upd.Name != "" {
label.Name = upd.Name
}
if err := label.Validate(); err != nil {
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Op: OpPrefix + influxdb.OpUpdateLabel,
Err: err,
}
}
s.labelKV.Store(label.ID.String(), *label)
return label, nil
}
// PutLabel writes a label directly to the database without generating IDs
// or making checks.
func (s *Service) PutLabel(ctx context.Context, l *influxdb.Label) error {
s.labelKV.Store(l.ID.String(), *l)
return nil
}
// DeleteLabel deletes a label.
func (s *Service) DeleteLabel(ctx context.Context, id influxdb.ID) error {
label, err := s.FindLabelByID(ctx, id)
if label == nil && err != nil {
return &influxdb.Error{
Code: influxdb.ENotFound,
Op: OpPrefix + influxdb.OpDeleteLabel,
Err: influxdb.ErrLabelNotFound,
}
}
s.labelKV.Delete(id.String())
return nil
}
// DeleteLabelMapping deletes a label mapping.
func (s *Service) DeleteLabelMapping(ctx context.Context, m *influxdb.LabelMapping) error {
s.labelMappingKV.Delete(encodeLabelMappingKey(m))
return nil
}

View File

@ -1,33 +0,0 @@
package inmem
import (
"context"
"testing"
platform "github.com/influxdata/influxdb"
platformtesting "github.com/influxdata/influxdb/testing"
)
func initLabelService(f platformtesting.LabelFields, t *testing.T) (platform.LabelService, string, func()) {
s := NewService()
s.IDGenerator = f.IDGenerator
ctx := context.Background()
for _, l := range f.Labels {
if err := s.PutLabel(ctx, l); err != nil {
t.Fatalf("failed to populate labels")
}
}
for _, m := range f.Mappings {
if err := s.CreateLabelMapping(ctx, m); err != nil {
t.Fatalf("failed to populate label mappings")
}
}
return s, OpPrefix, func() {}
}
func TestLabelService(t *testing.T) {
t.Parallel()
platformtesting.LabelService(initLabelService, t)
}

View File

@ -1,58 +0,0 @@
package inmem
import (
"context"
platform "github.com/influxdata/influxdb"
)
var _ platform.LookupService = (*Service)(nil)
// Name returns the name for the resource and ID.
func (s *Service) Name(ctx context.Context, resource platform.ResourceType, id platform.ID) (string, error) {
if err := resource.Valid(); err != nil {
return "", err
}
if ok := id.Valid(); !ok {
return "", platform.ErrInvalidID
}
switch resource {
case platform.TasksResourceType: // 5 // TODO(goller): unify task bolt storage here so we can lookup names
case platform.AuthorizationsResourceType: // 0 TODO(goller): authorizations should also have optional names
case platform.SourcesResourceType: // 4 TODO(goller): no inmen version of sources service: https://github.com/influxdata/platform/issues/2145
case platform.BucketsResourceType: // 1
r, err := s.FindBucketByID(ctx, id)
if err != nil {
return "", err
}
return r.Name, nil
case platform.DashboardsResourceType: // 2
r, err := s.FindDashboardByID(ctx, id)
if err != nil {
return "", err
}
return r.Name, nil
case platform.OrgsResourceType: // 3
r, err := s.FindOrganizationByID(ctx, id)
if err != nil {
return "", err
}
return r.Name, nil
case platform.TelegrafsResourceType: // 6
r, err := s.FindTelegrafConfigByID(ctx, id)
if err != nil {
return "", err
}
return r.Name, nil
case platform.UsersResourceType: // 7
r, err := s.FindUserByID(ctx, id)
if err != nil {
return "", err
}
return r.Name, nil
}
return "", nil
}

View File

@ -1,223 +0,0 @@
package inmem
import (
"context"
"testing"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/mock"
platformtesting "github.com/influxdata/influxdb/testing"
)
var (
testID = platform.ID(1)
testIDStr = testID.String()
)
func TestService_Name(t *testing.T) {
type initFn func(ctx context.Context, s *Service) error
type args struct {
resource platform.Resource
init initFn
}
tests := []struct {
name string
args args
want string
wantErr bool
}{
{
name: "error if id is invalid",
args: args{
resource: platform.Resource{
Type: platform.DashboardsResourceType,
ID: platformtesting.IDPtr(platform.InvalidID()),
},
},
wantErr: true,
},
{
name: "error if resource is invalid",
args: args{
resource: platform.Resource{
Type: platform.ResourceType("invalid"),
},
},
wantErr: true,
},
{
name: "authorization resource without a name returns empty string",
args: args{
resource: platform.Resource{
Type: platform.AuthorizationsResourceType,
ID: platformtesting.IDPtr(testID),
},
},
want: "",
},
{
name: "task resource without a name returns an empty string",
args: args{
resource: platform.Resource{
Type: platform.TasksResourceType,
ID: platformtesting.IDPtr(testID),
},
},
want: "",
},
{
name: "bucket with existing id returns name",
args: args{
resource: platform.Resource{
Type: platform.BucketsResourceType,
ID: platformtesting.IDPtr(testID),
},
init: func(ctx context.Context, s *Service) error {
_ = s.CreateOrganization(ctx, &platform.Organization{
Name: "o1",
})
return s.CreateBucket(ctx, &platform.Bucket{
Name: "b1",
OrganizationID: testID,
})
},
},
want: "b1",
},
{
name: "bucket with non-existent id returns error",
args: args{
resource: platform.Resource{
Type: platform.BucketsResourceType,
ID: platformtesting.IDPtr(testID),
},
},
wantErr: true,
},
{
name: "dashboard with existing id returns name",
args: args{
resource: platform.Resource{
Type: platform.DashboardsResourceType,
ID: platformtesting.IDPtr(testID),
},
init: func(ctx context.Context, s *Service) error {
return s.CreateDashboard(ctx, &platform.Dashboard{
Name: "dashboard1",
})
},
},
want: "dashboard1",
},
{
name: "dashboard with non-existent id returns error",
args: args{
resource: platform.Resource{
Type: platform.DashboardsResourceType,
ID: platformtesting.IDPtr(testID),
},
},
wantErr: true,
},
{
name: "org with existing id returns name",
args: args{
resource: platform.Resource{
Type: platform.OrgsResourceType,
ID: platformtesting.IDPtr(testID),
},
init: func(ctx context.Context, s *Service) error {
return s.CreateOrganization(ctx, &platform.Organization{
Name: "org1",
})
},
},
want: "org1",
},
{
name: "org with non-existent id returns error",
args: args{
resource: platform.Resource{
Type: platform.OrgsResourceType,
ID: platformtesting.IDPtr(testID),
},
},
wantErr: true,
},
{
name: "telegraf with existing id returns name",
args: args{
resource: platform.Resource{
Type: platform.TelegrafsResourceType,
ID: platformtesting.IDPtr(testID),
},
init: func(ctx context.Context, s *Service) error {
return s.CreateTelegrafConfig(ctx, &platform.TelegrafConfig{
OrganizationID: platformtesting.MustIDBase16("0000000000000009"),
Name: "telegraf1",
}, testID)
},
},
want: "telegraf1",
},
{
name: "telegraf with non-existent id returns error",
args: args{
resource: platform.Resource{
Type: platform.TelegrafsResourceType,
ID: platformtesting.IDPtr(testID),
},
},
wantErr: true,
},
{
name: "user with existing id returns name",
args: args{
resource: platform.Resource{
Type: platform.UsersResourceType,
ID: platformtesting.IDPtr(testID),
},
init: func(ctx context.Context, s *Service) error {
return s.CreateUser(ctx, &platform.User{
Name: "user1",
})
},
},
want: "user1",
},
{
name: "user with non-existent id returns error",
args: args{
resource: platform.Resource{
Type: platform.UsersResourceType,
ID: platformtesting.IDPtr(testID),
},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := NewService()
s.IDGenerator = mock.NewIDGenerator(testIDStr, t)
ctx := context.Background()
if tt.args.init != nil {
if err := tt.args.init(ctx, s); err != nil {
t.Errorf("Service.Name() unable to initialize service: %v", err)
}
}
id := platform.InvalidID()
if tt.args.resource.ID != nil {
id = *tt.args.resource.ID
}
got, err := s.Name(ctx, tt.args.resource.Type, id)
if (err != nil) != tt.wantErr {
t.Errorf("Service.Name() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("Service.Name() = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -1,117 +0,0 @@
package inmem
import (
"context"
"fmt"
"time"
platform "github.com/influxdata/influxdb"
)
const onboardingKey = "onboarding_key"
var _ platform.OnboardingService = (*Service)(nil)
// IsOnboarding checks onboardingBucket
// to see if the onboarding key is true.
func (s *Service) IsOnboarding(ctx context.Context) (isOnboarding bool, err error) {
result, ok := s.onboardingKV.Load(onboardingKey)
isOnboarding = !ok || !result.(bool)
return isOnboarding, nil
}
// PutOnboardingStatus will put the isOnboarding to storage
func (s *Service) PutOnboardingStatus(ctx context.Context, v bool) error {
s.onboardingKV.Store(onboardingKey, v)
return nil
}
// Generate OnboardingResults from onboarding request,
// update storage so this request will be disabled for the second run.
func (s *Service) Generate(ctx context.Context, req *platform.OnboardingRequest) (*platform.OnboardingResults, error) {
isOnboarding, err := s.IsOnboarding(ctx)
if err != nil {
return nil, err
}
if !isOnboarding {
return nil, &platform.Error{
Code: platform.EConflict,
Msg: "onboarding has already been completed",
}
}
if req.Password == "" {
return nil, &platform.Error{
Code: platform.EEmptyValue,
Msg: "password is empty",
}
}
if req.User == "" {
return nil, &platform.Error{
Code: platform.EEmptyValue,
Msg: "username is empty",
}
}
if req.Org == "" {
return nil, &platform.Error{
Code: platform.EEmptyValue,
Msg: "org name is empty",
}
}
if req.Bucket == "" {
return nil, &platform.Error{
Code: platform.EEmptyValue,
Msg: "bucket name is empty",
}
}
u := &platform.User{Name: req.User}
if err := s.CreateUser(ctx, u); err != nil {
return nil, err
}
if err = s.SetPassword(ctx, u.Name, req.Password); err != nil {
return nil, err
}
o := &platform.Organization{
Name: req.Org,
}
if err = s.CreateOrganization(ctx, o); err != nil {
return nil, err
}
bucket := &platform.Bucket{
Name: req.Bucket,
Organization: o.Name,
OrganizationID: o.ID,
RetentionPeriod: time.Duration(req.RetentionPeriod) * time.Hour,
}
if err = s.CreateBucket(ctx, bucket); err != nil {
return nil, err
}
auth := &platform.Authorization{
UserID: u.ID,
Description: fmt.Sprintf("%s's Token", u.Name),
OrgID: o.ID,
Permissions: platform.OperPermissions(),
Token: req.Token,
}
if err = s.CreateAuthorization(ctx, auth); err != nil {
return nil, err
}
if err = s.PutOnboardingStatus(ctx, true); err != nil {
return nil, err
}
return &platform.OnboardingResults{
User: u,
Org: o,
Bucket: bucket,
Auth: auth,
}, nil
}

View File

@ -1,24 +0,0 @@
package inmem
import (
"context"
"testing"
platform "github.com/influxdata/influxdb"
platformtesting "github.com/influxdata/influxdb/testing"
)
func initOnboardingService(f platformtesting.OnboardingFields, t *testing.T) (platform.OnboardingService, func()) {
s := NewService()
s.IDGenerator = f.IDGenerator
s.TokenGenerator = f.TokenGenerator
ctx := context.TODO()
if err := s.PutOnboardingStatus(ctx, !f.IsOnboarding); err != nil {
t.Fatalf("failed to set new onboarding finished: %v", err)
}
return s, func() {}
}
func TestGenerate(t *testing.T) {
platformtesting.Generate(initOnboardingService, t)
}

View File

@ -1,236 +0,0 @@
package inmem
import (
"context"
"fmt"
platform "github.com/influxdata/influxdb"
)
const (
errOrganizationNotFound = "organization not found"
)
func (s *Service) loadOrganization(id platform.ID) (*platform.Organization, *platform.Error) {
i, ok := s.organizationKV.Load(id.String())
if !ok {
return nil, &platform.Error{
Code: platform.ENotFound,
Msg: errOrganizationNotFound,
}
}
b, ok := i.(*platform.Organization)
if !ok {
return nil, &platform.Error{
Code: platform.EInternal,
Msg: fmt.Sprintf("type %T is not a organization", i),
}
}
return b, nil
}
func (s *Service) forEachOrganization(ctx context.Context, fn func(b *platform.Organization) bool) error {
var err error
s.organizationKV.Range(func(k, v interface{}) bool {
o, ok := v.(*platform.Organization)
if !ok {
err = fmt.Errorf("type %T is not a organization", v)
return false
}
return fn(o)
})
return err
}
func (s *Service) filterOrganizations(ctx context.Context, fn func(b *platform.Organization) bool) ([]*platform.Organization, *platform.Error) {
orgs := []*platform.Organization{}
err := s.forEachOrganization(ctx, func(o *platform.Organization) bool {
if fn(o) {
orgs = append(orgs, o)
}
return true
})
if err != nil {
return nil, &platform.Error{
Err: err,
}
}
return orgs, nil
}
// FindOrganizationByID returns a single organization by ID.
func (s *Service) FindOrganizationByID(ctx context.Context, id platform.ID) (*platform.Organization, error) {
o, pe := s.loadOrganization(id)
if pe != nil {
return nil, &platform.Error{
Op: OpPrefix + platform.OpFindOrganizationByID,
Err: pe,
}
}
return o, nil
}
// FindOrganization returns the first organization that matches a filter.
func (s *Service) FindOrganization(ctx context.Context, filter platform.OrganizationFilter) (*platform.Organization, error) {
op := OpPrefix + platform.OpFindOrganization
if filter.ID == nil && filter.Name == nil {
return nil, &platform.Error{
Code: platform.EInvalid,
Op: op,
Msg: "no filter parameters provided",
}
}
if filter.ID != nil {
o, err := s.FindOrganizationByID(ctx, *filter.ID)
if err != nil {
return nil, &platform.Error{
Op: op,
Err: err,
}
}
return o, nil
}
orgs, n, err := s.FindOrganizations(ctx, filter)
if err != nil {
return nil, &platform.Error{
Op: op,
Err: err,
}
}
if n < 1 {
msg := errOrganizationNotFound
if filter.Name != nil {
msg = fmt.Sprintf("organization name \"%s\" not found", *filter.Name)
}
return nil, &platform.Error{
Code: platform.ENotFound,
Op: op,
Msg: msg,
}
}
return orgs[0], nil
}
// FindOrganizations returns a list of organizations that match filter and the total count of matching organizations.
func (s *Service) FindOrganizations(ctx context.Context, filter platform.OrganizationFilter, opt ...platform.FindOptions) ([]*platform.Organization, int, error) {
op := OpPrefix + platform.OpFindOrganizations
if filter.ID != nil {
o, err := s.FindOrganizationByID(ctx, *filter.ID)
if err != nil {
return nil, 0, &platform.Error{
Op: op,
Err: err,
}
}
return []*platform.Organization{o}, 1, nil
}
filterFunc := func(o *platform.Organization) bool { return true }
if filter.Name != nil {
filterFunc = func(o *platform.Organization) bool {
return o.Name == *filter.Name
}
}
orgs, pe := s.filterOrganizations(ctx, filterFunc)
if pe != nil {
return nil, 0, &platform.Error{
Err: pe,
Op: op,
}
}
if len(orgs) == 0 {
msg := errOrganizationNotFound
if filter.Name != nil {
msg = fmt.Sprintf("organization name \"%s\" not found", *filter.Name)
}
return orgs, 0, &platform.Error{
Code: platform.ENotFound,
Op: op,
Msg: msg,
}
}
return orgs, len(orgs), nil
}
func (s *Service) findOrganizationByName(ctx context.Context, n string) (*platform.Organization, *platform.Error) {
o, err := s.FindOrganization(ctx, platform.OrganizationFilter{Name: &n})
if err != nil {
return nil, &platform.Error{
Err: err,
}
}
return o, nil
}
// CreateOrganization creates a new organization and sets b.ID with the new identifier.
func (s *Service) CreateOrganization(ctx context.Context, o *platform.Organization) error {
op := OpPrefix + platform.OpCreateOrganization
if _, err := s.FindOrganization(ctx, platform.OrganizationFilter{Name: &o.Name}); err == nil {
return &platform.Error{
Code: platform.EConflict,
Op: op,
Msg: fmt.Sprintf("organization with name %s already exists", o.Name),
}
}
o.ID = s.IDGenerator.ID()
err := s.PutOrganization(ctx, o)
if err != nil {
return &platform.Error{
Op: op,
Err: err,
}
}
return nil
}
// PutOrganization will put a organization without setting an ID.
func (s *Service) PutOrganization(ctx context.Context, o *platform.Organization) error {
s.organizationKV.Store(o.ID.String(), o)
return nil
}
// UpdateOrganization updates a organization according the parameters set on upd.
func (s *Service) UpdateOrganization(ctx context.Context, id platform.ID, upd platform.OrganizationUpdate) (*platform.Organization, error) {
o, err := s.FindOrganizationByID(ctx, id)
if err != nil {
return nil, &platform.Error{
Err: err,
Op: OpPrefix + platform.OpUpdateOrganization,
}
}
if upd.Name != nil {
o.Name = *upd.Name
}
s.organizationKV.Store(o.ID.String(), o)
return o, nil
}
// DeleteOrganization deletes a organization and prunes it from the index.
func (s *Service) DeleteOrganization(ctx context.Context, id platform.ID) error {
if _, err := s.FindOrganizationByID(ctx, id); err != nil {
return &platform.Error{
Err: err,
Op: OpPrefix + platform.OpDeleteOrganization,
}
}
s.organizationKV.Delete(id.String())
return nil
}

View File

@ -1,25 +0,0 @@
package inmem
import (
"context"
"testing"
platform "github.com/influxdata/influxdb"
platformtesting "github.com/influxdata/influxdb/testing"
)
func initOrganizationService(f platformtesting.OrganizationFields, t *testing.T) (platform.OrganizationService, string, func()) {
s := NewService()
s.IDGenerator = f.IDGenerator
ctx := context.TODO()
for _, o := range f.Organizations {
if err := s.PutOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate organizations")
}
}
return s, OpPrefix, func() {}
}
func TestOrganizationService(t *testing.T) {
platformtesting.OrganizationService(initOrganizationService, t)
}

View File

@ -1,76 +0,0 @@
package inmem
import (
"context"
"fmt"
platform "github.com/influxdata/influxdb"
"golang.org/x/crypto/bcrypt"
)
// MinPasswordLength is the shortest password we allow into the system.
const MinPasswordLength = 8
var (
// EIncorrectPassword is returned when any password operation fails in which
// we do not want to leak information.
EIncorrectPassword = &platform.Error{
Msg: "<forbidden> your username or password is incorrect",
}
// EShortPassword is used when a password is less than the minimum
// acceptable password length.
EShortPassword = &platform.Error{
Msg: "<invalid> passwords must be at least 8 characters long",
}
)
var _ platform.PasswordsService = (*Service)(nil)
// HashCost is currently using bcrypt defaultCost
const HashCost = bcrypt.DefaultCost
// SetPassword stores the password hash associated with a user.
func (s *Service) SetPassword(ctx context.Context, name string, password string) error {
if len(password) < MinPasswordLength {
return EShortPassword
}
u, err := s.FindUser(ctx, platform.UserFilter{Name: &name})
if err != nil {
return EIncorrectPassword
}
hash, err := bcrypt.GenerateFromPassword([]byte(password), HashCost)
if err != nil {
return err
}
s.basicAuthKV.Store(u.ID.String(), hash)
return nil
}
// ComparePassword compares a provided password with the stored password hash.
func (s *Service) ComparePassword(ctx context.Context, name string, password string) error {
u, err := s.FindUser(ctx, platform.UserFilter{Name: &name})
if err != nil {
return EIncorrectPassword
}
hash, ok := s.basicAuthKV.Load(u.ID.String())
if !ok {
hash = []byte{}
}
if err := bcrypt.CompareHashAndPassword(hash.([]byte), []byte(password)); err != nil {
return fmt.Errorf("<forbidden> your username or password is incorrect")
}
return nil
}
// CompareAndSetPassword replaces the old password with the new password if thee old password is correct.
func (s *Service) CompareAndSetPassword(ctx context.Context, name string, old string, new string) error {
if err := s.ComparePassword(ctx, name, old); err != nil {
return err
}
return s.SetPassword(ctx, name, new)
}

View File

@ -1,38 +0,0 @@
package inmem
import (
"context"
"testing"
platform "github.com/influxdata/influxdb"
platformtesting "github.com/influxdata/influxdb/testing"
)
func initPasswordsService(f platformtesting.PasswordFields, t *testing.T) (platform.PasswordsService, func()) {
s := NewService()
s.IDGenerator = f.IDGenerator
ctx := context.Background()
for _, u := range f.Users {
if err := s.PutUser(ctx, u); err != nil {
t.Fatalf("failed to populate users")
}
}
for i := range f.Passwords {
if err := s.SetPassword(ctx, f.Users[i].Name, f.Passwords[i]); err != nil {
t.Fatalf("error setting passsword user, %s %s: %v", f.Users[i].Name, f.Passwords[i], err)
}
}
return s, func() {}
}
func TestPasswords(t *testing.T) {
t.Parallel()
platformtesting.PasswordsService(initPasswordsService, t)
}
func TestPasswords_CompareAndSet(t *testing.T) {
t.Parallel()
platformtesting.CompareAndSetPassword(initPasswordsService, t)
}

View File

@ -1,161 +0,0 @@
package inmem
import (
"context"
"fmt"
platform "github.com/influxdata/influxdb"
)
const (
errScraperTargetNotFound = "scraper target is not found"
)
var _ platform.ScraperTargetStoreService = (*Service)(nil)
func (s *Service) loadScraperTarget(id platform.ID) (*platform.ScraperTarget, *platform.Error) {
i, ok := s.scraperTargetKV.Load(id.String())
if !ok {
return nil, &platform.Error{
Code: platform.ENotFound,
Msg: errScraperTargetNotFound,
}
}
b, ok := i.(platform.ScraperTarget)
if !ok {
return nil, &platform.Error{
Code: platform.EInvalid,
Msg: fmt.Sprintf("type %T is not a scraper target", i),
}
}
return &b, nil
}
// ListTargets will list all scrape targets.
func (s *Service) ListTargets(ctx context.Context) (list []platform.ScraperTarget, err error) {
list = make([]platform.ScraperTarget, 0)
s.scraperTargetKV.Range(func(_, v interface{}) bool {
b, ok := v.(platform.ScraperTarget)
if !ok {
err = &platform.Error{
Code: platform.EInvalid,
Msg: fmt.Sprintf("type %T is not a scraper target", v),
}
return false
}
list = append(list, b)
return true
})
return list, err
}
// AddTarget add a new scraper target into storage.
func (s *Service) AddTarget(ctx context.Context, target *platform.ScraperTarget, userID platform.ID) (err error) {
target.ID = s.IDGenerator.ID()
if !target.OrgID.Valid() {
return &platform.Error{
Code: platform.EInvalid,
Msg: "provided organization ID has invalid format",
Op: OpPrefix + platform.OpAddTarget,
}
}
if !target.BucketID.Valid() {
return &platform.Error{
Code: platform.EInvalid,
Msg: "provided bucket ID has invalid format",
Op: OpPrefix + platform.OpAddTarget,
}
}
if err := s.PutTarget(ctx, target); err != nil {
return &platform.Error{
Op: OpPrefix + platform.OpAddTarget,
Err: err,
}
}
urm := &platform.UserResourceMapping{
ResourceID: target.ID,
UserID: userID,
UserType: platform.Owner,
ResourceType: platform.ScraperResourceType,
}
if err := s.CreateUserResourceMapping(ctx, urm); err != nil {
return err
}
return nil
}
// RemoveTarget removes a scraper target from the bucket.
func (s *Service) RemoveTarget(ctx context.Context, id platform.ID) error {
if _, pe := s.loadScraperTarget(id); pe != nil {
return &platform.Error{
Err: pe,
Op: OpPrefix + platform.OpRemoveTarget,
}
}
s.scraperTargetKV.Delete(id.String())
err := s.deleteUserResourceMapping(ctx, platform.UserResourceMappingFilter{
ResourceID: id,
ResourceType: platform.ScraperResourceType,
})
if err != nil {
return &platform.Error{
Code: platform.ErrorCode(err),
Op: OpPrefix + platform.OpRemoveTarget,
Err: err,
}
}
return nil
}
// UpdateTarget updates a scraper target.
func (s *Service) UpdateTarget(ctx context.Context, update *platform.ScraperTarget, userID platform.ID) (target *platform.ScraperTarget, err error) {
op := OpPrefix + platform.OpUpdateTarget
if !update.ID.Valid() {
return nil, &platform.Error{
Code: platform.EInvalid,
Op: op,
Msg: "provided scraper target ID has invalid format",
}
}
oldTarget, pe := s.loadScraperTarget(update.ID)
if pe != nil {
return nil, &platform.Error{
Op: op,
Err: pe,
}
}
if !update.OrgID.Valid() {
update.OrgID = oldTarget.OrgID
}
if !update.BucketID.Valid() {
update.BucketID = oldTarget.BucketID
}
if err = s.PutTarget(ctx, update); err != nil {
return nil, &platform.Error{
Op: op,
Err: pe,
}
}
return update, nil
}
// GetTargetByID retrieves a scraper target by id.
func (s *Service) GetTargetByID(ctx context.Context, id platform.ID) (target *platform.ScraperTarget, err error) {
var pe *platform.Error
if target, pe = s.loadScraperTarget(id); pe != nil {
return nil, &platform.Error{
Op: OpPrefix + platform.OpGetTargetByID,
Err: pe,
}
}
return target, nil
}
// PutTarget will put a scraper target without setting an ID.
func (s *Service) PutTarget(ctx context.Context, target *platform.ScraperTarget) error {
s.scraperTargetKV.Store(target.ID.String(), *target)
return nil
}

View File

@ -1,30 +0,0 @@
package inmem
import (
"context"
"testing"
platform "github.com/influxdata/influxdb"
platformtesting "github.com/influxdata/influxdb/testing"
)
func initScraperTargetStoreService(f platformtesting.TargetFields, t *testing.T) (platform.ScraperTargetStoreService, string, func()) {
s := NewService()
s.IDGenerator = f.IDGenerator
ctx := context.Background()
for _, target := range f.Targets {
if err := s.PutTarget(ctx, target); err != nil {
t.Fatalf("failed to populate scraper targets")
}
}
for _, m := range f.UserResourceMappings {
if err := s.PutUserResourceMapping(ctx, m); err != nil {
t.Fatalf("failed to populate user resource mapping")
}
}
return s, OpPrefix, func() {}
}
func TestScraperTargetStoreService(t *testing.T) {
platformtesting.ScraperService(initScraperTargetStoreService, t)
}

View File

@ -1,89 +0,0 @@
package inmem
import (
"context"
"sync"
"time"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/rand"
"github.com/influxdata/influxdb/snowflake"
)
// Service implements various top level services.
type Service struct {
authorizationKV sync.Map
organizationKV sync.Map
bucketKV sync.Map
userKV sync.Map
dashboardKV sync.Map
viewKV sync.Map
variableKV sync.Map
dbrpMappingKV sync.Map
userResourceMappingKV sync.Map
labelKV sync.Map
labelMappingKV sync.Map
scraperTargetKV sync.Map
telegrafConfigKV sync.Map
onboardingKV sync.Map
basicAuthKV sync.Map
sessionKV sync.Map
sourceKV sync.Map
TokenGenerator platform.TokenGenerator
IDGenerator platform.IDGenerator
time func() time.Time
}
// NewService creates an instance of a Service.
func NewService() *Service {
s := &Service{
TokenGenerator: rand.NewTokenGenerator(64),
IDGenerator: snowflake.NewIDGenerator(),
time: time.Now,
}
s.initializeSources(context.TODO())
return s
}
// WithTime sets the function for computing the current time. Used for updating meta data
// about objects stored. Should only be used in tests for mocking.
func (s *Service) WithTime(fn func() time.Time) {
s.time = fn
}
// Flush removes all data from the in-memory store
func (s *Service) Flush() {
s.flush(&s.authorizationKV)
s.flush(&s.organizationKV)
s.flush(&s.bucketKV)
s.flush(&s.userKV)
s.flush(&s.dashboardKV)
s.flush(&s.viewKV)
s.flush(&s.variableKV)
s.flush(&s.dbrpMappingKV)
s.flush(&s.userResourceMappingKV)
s.flush(&s.labelKV)
s.flush(&s.labelMappingKV)
s.flush(&s.scraperTargetKV)
s.flush(&s.telegrafConfigKV)
s.flush(&s.onboardingKV)
s.flush(&s.basicAuthKV)
s.flush(&s.sessionKV)
s.flush(&s.sourceKV)
}
func (s *Service) flush(m *sync.Map) {
keys := []interface{}{}
f := func(key, value interface{}) bool {
keys = append(keys, key)
return true
}
m.Range(f)
for _, k := range keys {
m.Delete(k)
}
}

View File

@ -1,99 +0,0 @@
package inmem
import (
"context"
"time"
platform "github.com/influxdata/influxdb"
)
// RenewSession extends the expire time to newExpiration.
func (s *Service) RenewSession(ctx context.Context, session *platform.Session, newExpiration time.Time) error {
if session == nil {
return &platform.Error{
Msg: "session is nil",
}
}
session.ExpiresAt = newExpiration
return s.PutSession(ctx, session)
}
// FindSession retrieves the session found at the provided key.
func (s *Service) FindSession(ctx context.Context, key string) (*platform.Session, error) {
result, found := s.sessionKV.Load(key)
if !found {
return nil, &platform.Error{
Code: platform.ENotFound,
Msg: platform.ErrSessionNotFound,
}
}
sess := new(platform.Session)
*sess = result.(platform.Session)
// TODO(desa): these values should be cached so it's not so expensive to lookup each time.
f := platform.UserResourceMappingFilter{UserID: sess.UserID}
mappings, _, err := s.FindUserResourceMappings(ctx, f)
if err != nil {
return nil, &platform.Error{
Err: err,
}
}
ps := make([]platform.Permission, 0, len(mappings))
for _, m := range mappings {
p, err := m.ToPermissions()
if err != nil {
return nil, &platform.Error{
Err: err,
}
}
ps = append(ps, p...)
}
ps = append(ps, platform.MePermissions(sess.UserID)...)
sess.Permissions = ps
return sess, nil
}
func (s *Service) PutSession(ctx context.Context, sess *platform.Session) error {
s.sessionKV.Store(sess.Key, *sess)
return nil
}
// ExpireSession expires the session at the provided key.
func (s *Service) ExpireSession(ctx context.Context, key string) error {
return nil
}
// CreateSession creates a session for a user with the users maximal privileges.
func (s *Service) CreateSession(ctx context.Context, user string) (*platform.Session, error) {
u, pe := s.findUserByName(ctx, user)
if pe != nil {
return nil, &platform.Error{
Err: pe,
}
}
sess := &platform.Session{}
sess.ID = s.IDGenerator.ID()
k, err := s.TokenGenerator.Token()
if err != nil {
return nil, &platform.Error{
Err: err,
}
}
sess.Key = k
sess.UserID = u.ID
sess.CreatedAt = time.Now()
// TODO(desa): make this configurable
sess.ExpiresAt = sess.CreatedAt.Add(time.Hour)
// TODO(desa): not totally sure what to do here. Possibly we should have a maximal privilege permission.
sess.Permissions = []platform.Permission{}
if err := s.PutSession(ctx, sess); err != nil {
return nil, err
}
return sess, nil
}

View File

@ -1,147 +0,0 @@
package inmem
import (
"context"
"fmt"
platform "github.com/influxdata/influxdb"
)
// DefaultSource is the default source.
var DefaultSource = platform.Source{
Default: true,
Name: "autogen",
Type: platform.SelfSourceType,
}
const (
// DefaultSourceID it the default source identifier
DefaultSourceID = "020f755c3c082000"
// DefaultSourceOrganizationID is the default source's organization identifier
DefaultSourceOrganizationID = "50616e67652c206c"
)
func init() {
if err := DefaultSource.ID.DecodeFromString(DefaultSourceID); err != nil {
panic(fmt.Sprintf("failed to decode default source id: %v", err))
}
if err := DefaultSource.OrganizationID.DecodeFromString(DefaultSourceOrganizationID); err != nil {
panic(fmt.Sprintf("failed to decode default source organization id: %v", err))
}
}
func (s *Service) initializeSources(ctx context.Context) error {
_, pe := s.FindSourceByID(ctx, DefaultSource.ID)
if pe != nil && platform.ErrorCode(pe) != platform.ENotFound {
return pe
}
if platform.ErrorCode(pe) == platform.ENotFound {
if err := s.PutSource(ctx, &DefaultSource); err != nil {
return err
}
}
return nil
}
// DefaultSource retrieves the default source.
func (s *Service) DefaultSource(ctx context.Context) (*platform.Source, error) {
// TODO(desa): make this faster by putting the default source in an index.
srcs, _, err := s.FindSources(ctx, platform.FindOptions{})
if err != nil {
return nil, err
}
for _, src := range srcs {
if src.Default {
return src, nil
}
}
return nil, &platform.Error{
Code: platform.ENotFound,
Msg: "no default source found",
}
}
// FindSourceByID retrieves a source by id.
func (s *Service) FindSourceByID(ctx context.Context, id platform.ID) (*platform.Source, error) {
i, ok := s.sourceKV.Load(id.String())
if !ok {
return nil, &platform.Error{
Code: platform.ENotFound,
Msg: platform.ErrSourceNotFound,
}
}
src, ok := i.(*platform.Source)
if !ok {
return nil, &platform.Error{
Code: platform.EInvalid,
Msg: fmt.Sprintf("type %T is not a source", i),
}
}
return src, nil
}
// FindSources retrives all sources that match an arbitrary source filter.
// Filters using ID, or OrganizationID and source Name should be efficient.
// Other filters will do a linear scan across all sources searching for a match.
func (s *Service) FindSources(ctx context.Context, opt platform.FindOptions) ([]*platform.Source, int, error) {
var ds []*platform.Source
s.sourceKV.Range(func(k, v interface{}) bool {
d, ok := v.(*platform.Source)
if !ok {
return false
}
ds = append(ds, d)
return true
})
return ds, len(ds), nil
}
// CreateSource creates a platform source and sets s.ID.
func (s *Service) CreateSource(ctx context.Context, src *platform.Source) error {
src.ID = s.IDGenerator.ID()
if err := s.PutSource(ctx, src); err != nil {
return &platform.Error{
Err: err,
}
}
return nil
}
// PutSource will put a source without setting an ID.
func (s *Service) PutSource(ctx context.Context, src *platform.Source) error {
s.sourceKV.Store(src.ID.String(), src)
return nil
}
// UpdateSource updates a source according the parameters set on upd.
func (s *Service) UpdateSource(ctx context.Context, id platform.ID, upd platform.SourceUpdate) (*platform.Source, error) {
src, err := s.FindSourceByID(ctx, id)
if err != nil {
return nil, &platform.Error{
Err: err,
Op: OpPrefix + platform.OpUpdateSource,
}
}
upd.Apply(src)
s.sourceKV.Store(src.ID.String(), src)
return src, nil
}
// DeleteSource deletes a source and prunes it from the index.
func (s *Service) DeleteSource(ctx context.Context, id platform.ID) error {
if _, err := s.FindSourceByID(ctx, id); err != nil {
return &platform.Error{
Err: err,
Op: OpPrefix + platform.OpDeleteSource,
}
}
s.sourceKV.Delete(id.String())
return nil
}

View File

@ -1,176 +0,0 @@
package inmem
import (
"context"
platform "github.com/influxdata/influxdb"
)
var _ platform.TelegrafConfigStore = new(Service)
// FindTelegrafConfigByID returns a single telegraf config by ID.
func (s *Service) FindTelegrafConfigByID(ctx context.Context, id platform.ID) (tc *platform.TelegrafConfig, err error) {
var pErr *platform.Error
tc, pErr = s.findTelegrafConfigByID(ctx, id)
if pErr != nil {
err = pErr
}
return tc, err
}
func (s *Service) findTelegrafConfigByID(ctx context.Context, id platform.ID) (*platform.TelegrafConfig, *platform.Error) {
if !id.Valid() {
return nil, &platform.Error{
Code: platform.EInvalid,
Msg: "provided telegraf configuration ID has invalid format",
}
}
result, found := s.telegrafConfigKV.Load(id)
if !found {
return nil, &platform.Error{
Code: platform.ENotFound,
Msg: platform.ErrTelegrafConfigNotFound,
}
}
tc := new(platform.TelegrafConfig)
*tc = result.(platform.TelegrafConfig)
return tc, nil
}
func (s *Service) findTelegrafConfigs(ctx context.Context, filter platform.TelegrafConfigFilter, opt ...platform.FindOptions) ([]*platform.TelegrafConfig, int, *platform.Error) {
tcs := make([]*platform.TelegrafConfig, 0)
m, _, err := s.FindUserResourceMappings(ctx, filter.UserResourceMappingFilter)
if err != nil {
return nil, 0, &platform.Error{
Err: err,
}
}
if len(m) == 0 {
return tcs, 0, nil
}
for _, item := range m {
tc, err := s.findTelegrafConfigByID(ctx, item.ResourceID)
if err != nil && platform.ErrorCode(err) != platform.ENotFound {
return nil, 0, &platform.Error{
// return internal error, for any mapping issue
Err: err,
}
}
if tc != nil {
// Restrict results by organization ID, if it has been provided
if filter.OrganizationID != nil && filter.OrganizationID.Valid() && tc.OrganizationID != *filter.OrganizationID {
continue
}
tcs = append(tcs, tc)
}
}
return tcs, len(tcs), nil
}
// FindTelegrafConfigs returns a list of telegraf configs that match filter and the total count of matching telegraf configs.
// Additional options provide pagination & sorting.
func (s *Service) FindTelegrafConfigs(ctx context.Context, filter platform.TelegrafConfigFilter, opt ...platform.FindOptions) (tcs []*platform.TelegrafConfig, n int, err error) {
op := OpPrefix + platform.OpFindTelegrafConfigs
var pErr *platform.Error
tcs, n, pErr = s.findTelegrafConfigs(ctx, filter)
if pErr != nil {
pErr.Op = op
err = pErr
}
return tcs, n, err
}
func (s *Service) putTelegrafConfig(ctx context.Context, tc *platform.TelegrafConfig) *platform.Error {
if !tc.ID.Valid() {
return &platform.Error{
Code: platform.EEmptyValue,
Err: platform.ErrInvalidID,
}
}
if !tc.OrganizationID.Valid() {
return &platform.Error{
Code: platform.EEmptyValue,
Msg: platform.ErrTelegrafConfigInvalidOrganizationID,
}
}
s.telegrafConfigKV.Store(tc.ID, *tc)
return nil
}
// CreateTelegrafConfig creates a new telegraf config and sets b.ID with the new identifier.
func (s *Service) CreateTelegrafConfig(ctx context.Context, tc *platform.TelegrafConfig, userID platform.ID) error {
op := OpPrefix + platform.OpCreateTelegrafConfig
tc.ID = s.IDGenerator.ID()
pErr := s.putTelegrafConfig(ctx, tc)
if pErr != nil {
pErr.Op = op
return pErr
}
urm := &platform.UserResourceMapping{
ResourceID: tc.ID,
UserID: userID,
UserType: platform.Owner,
ResourceType: platform.TelegrafsResourceType,
}
if err := s.CreateUserResourceMapping(ctx, urm); err != nil {
return err
}
return nil
}
// UpdateTelegrafConfig updates a single telegraf config.
// Returns the new telegraf config after update.
func (s *Service) UpdateTelegrafConfig(ctx context.Context, id platform.ID, tc *platform.TelegrafConfig, userID platform.ID) (*platform.TelegrafConfig, error) {
var err error
op := OpPrefix + platform.OpUpdateTelegrafConfig
current, pErr := s.findTelegrafConfigByID(ctx, id)
if pErr != nil {
pErr.Op = op
err = pErr
return nil, err
}
tc.ID = id
// OrganizationID can not be updated
tc.OrganizationID = current.OrganizationID
pErr = s.putTelegrafConfig(ctx, tc)
if pErr != nil {
pErr.Op = op
err = pErr
}
return tc, err
}
// DeleteTelegrafConfig removes a telegraf config by ID.
func (s *Service) DeleteTelegrafConfig(ctx context.Context, id platform.ID) error {
op := OpPrefix + platform.OpDeleteTelegrafConfig
var err error
if !id.Valid() {
return &platform.Error{
Msg: "provided telegraf configuration ID has invalid format",
Code: platform.EInvalid,
}
}
if _, pErr := s.findTelegrafConfigByID(ctx, id); pErr != nil {
return pErr
}
s.telegrafConfigKV.Delete(id)
err = s.deleteUserResourceMapping(ctx, platform.UserResourceMappingFilter{
ResourceID: id,
ResourceType: platform.TelegrafsResourceType,
})
if err != nil {
return &platform.Error{
Code: platform.ErrorCode(err),
Op: op,
Err: err,
}
}
return nil
}

View File

@ -1,30 +0,0 @@
package inmem
import (
"context"
"testing"
platform "github.com/influxdata/influxdb"
platformtesting "github.com/influxdata/influxdb/testing"
)
func initTelegrafStore(f platformtesting.TelegrafConfigFields, t *testing.T) (platform.TelegrafConfigStore, func()) {
s := NewService()
s.IDGenerator = f.IDGenerator
ctx := context.Background()
for _, m := range f.UserResourceMappings {
if err := s.PutUserResourceMapping(ctx, m); err != nil {
t.Fatalf("failed to populate user resource mapping")
}
}
for _, tc := range f.TelegrafConfigs {
if err := s.putTelegrafConfig(ctx, tc); err != nil {
t.Fatalf("failed to populate telegraf configs")
}
}
return s, func() {}
}
func TestTelegrafStore(t *testing.T) {
platformtesting.TelegrafConfigStore(initTelegrafStore, t)
}

View File

@ -1,128 +0,0 @@
package inmem
import (
"context"
"fmt"
"path"
platform "github.com/influxdata/influxdb"
)
func encodeUserResourceMappingKey(resourceID, userID platform.ID) string {
return path.Join(resourceID.String(), userID.String())
}
func (s *Service) loadUserResourceMapping(ctx context.Context, resourceID, userID platform.ID) (*platform.UserResourceMapping, error) {
i, ok := s.userResourceMappingKV.Load(encodeUserResourceMappingKey(resourceID, userID))
if !ok {
return nil, &platform.Error{
Msg: "user to resource mapping not found",
Code: platform.ENotFound,
}
}
m, ok := i.(platform.UserResourceMapping)
if !ok {
return nil, fmt.Errorf("type %T is not an userResource mapping", i)
}
return &m, nil
}
func (s *Service) FindUserResourceBy(ctx context.Context, resourceID, userID platform.ID) (*platform.UserResourceMapping, error) {
return s.loadUserResourceMapping(ctx, resourceID, userID)
}
func (s *Service) forEachUserResourceMapping(ctx context.Context, fn func(m *platform.UserResourceMapping) bool) error {
var err error
s.userResourceMappingKV.Range(func(k, v interface{}) bool {
m, ok := v.(platform.UserResourceMapping)
if !ok {
err = fmt.Errorf("type %T is not a userResource mapping", v)
return false
}
return fn(&m)
})
return err
}
func (s *Service) filterUserResourceMappings(ctx context.Context, fn func(m *platform.UserResourceMapping) bool) ([]*platform.UserResourceMapping, error) {
mappings := []*platform.UserResourceMapping{}
err := s.forEachUserResourceMapping(ctx, func(m *platform.UserResourceMapping) bool {
if fn(m) {
mappings = append(mappings, m)
}
return true
})
if err != nil {
return nil, err
}
return mappings, nil
}
func (s *Service) FindUserResourceMappings(ctx context.Context, filter platform.UserResourceMappingFilter, opt ...platform.FindOptions) ([]*platform.UserResourceMapping, int, error) {
if filter.ResourceID.Valid() && filter.UserID.Valid() {
m, err := s.FindUserResourceBy(ctx, filter.ResourceID, filter.UserID)
if err != nil {
return nil, 0, err
}
return []*platform.UserResourceMapping{m}, 1, nil
}
filterFunc := func(mapping *platform.UserResourceMapping) bool {
return (!filter.UserID.Valid() || (filter.UserID == mapping.UserID)) &&
(!filter.ResourceID.Valid() || (filter.ResourceID == mapping.ResourceID)) &&
(filter.UserType == "" || (filter.UserType == mapping.UserType)) &&
(filter.ResourceType == "" || (filter.ResourceType == mapping.ResourceType))
}
mappings, err := s.filterUserResourceMappings(ctx, filterFunc)
if err != nil {
return nil, 0, err
}
return mappings, len(mappings), nil
}
func (s *Service) CreateUserResourceMapping(ctx context.Context, m *platform.UserResourceMapping) error {
mapping, _ := s.FindUserResourceBy(ctx, m.ResourceID, m.UserID)
if mapping != nil {
return &platform.Error{
Code: platform.EInternal,
Msg: fmt.Sprintf("Unexpected error when assigning user to a resource: mapping for user %s already exists", m.UserID),
}
}
s.userResourceMappingKV.Store(encodeUserResourceMappingKey(m.ResourceID, m.UserID), *m)
return nil
}
func (s *Service) PutUserResourceMapping(ctx context.Context, m *platform.UserResourceMapping) error {
s.userResourceMappingKV.Store(encodeUserResourceMappingKey(m.ResourceID, m.UserID), *m)
return nil
}
func (s *Service) DeleteUserResourceMapping(ctx context.Context, resourceID, userID platform.ID) error {
mapping, err := s.FindUserResourceBy(ctx, resourceID, userID)
if mapping == nil && err != nil {
return err
}
s.userResourceMappingKV.Delete(encodeUserResourceMappingKey(resourceID, userID))
return nil
}
func (s *Service) deleteUserResourceMapping(ctx context.Context, filter platform.UserResourceMappingFilter) error {
mappings, _, err := s.FindUserResourceMappings(ctx, filter)
if mappings == nil && err != nil {
return err
}
for _, m := range mappings {
s.userResourceMappingKV.Delete(encodeUserResourceMappingKey(m.ResourceID, m.UserID))
}
return nil
}

View File

@ -1,33 +0,0 @@
package inmem
import (
"context"
"testing"
platform "github.com/influxdata/influxdb"
platformtesting "github.com/influxdata/influxdb/testing"
)
func initUserResourceMappingService(f platformtesting.UserResourceFields, t *testing.T) (platform.UserResourceMappingService, func()) {
s := NewService()
ctx := context.TODO()
for _, m := range f.UserResourceMappings {
if err := s.CreateUserResourceMapping(ctx, m); err != nil {
t.Fatalf("failed to populate mappings")
}
}
return s, func() {}
}
func TestUserResourceMappingService_FindUserResourceMappings(t *testing.T) {
platformtesting.FindUserResourceMappings(initUserResourceMappingService, t)
}
func TestUserResourceMappingService_CreateUserResourceMapping(t *testing.T) {
platformtesting.CreateUserResourceMapping(initUserResourceMappingService, t)
}
func TestUserResourceMappingService_DeleteUserResourceMapping(t *testing.T) {
platformtesting.DeleteUserResourceMapping(initUserResourceMappingService, t)
}

View File

@ -1,199 +0,0 @@
package inmem
import (
"context"
"fmt"
platform "github.com/influxdata/influxdb"
)
var _ platform.UserService = (*Service)(nil)
func (s *Service) loadUser(id platform.ID) (*platform.User, *platform.Error) {
i, ok := s.userKV.Load(id.String())
if !ok {
return nil, &platform.Error{
Code: platform.ENotFound,
Msg: "user not found",
}
}
b, ok := i.(*platform.User)
if !ok {
return nil, &platform.Error{
Code: platform.EInternal,
Msg: fmt.Sprintf("type %T is not a user", i),
}
}
return b, nil
}
func (s *Service) forEachUser(ctx context.Context, fn func(b *platform.User) bool) error {
var err error
s.userKV.Range(func(k, v interface{}) bool {
o, ok := v.(*platform.User)
if !ok {
err = fmt.Errorf("type %T is not a user", v)
return false
}
return fn(o)
})
return err
}
// FindUserByID returns a single user by ID.
func (s *Service) FindUserByID(ctx context.Context, id platform.ID) (u *platform.User, err error) {
var pe *platform.Error
u, pe = s.loadUser(id)
if pe != nil {
err = &platform.Error{
Op: OpPrefix + platform.OpFindUserByID,
Err: pe,
}
}
return u, err
}
func (s *Service) findUserByName(ctx context.Context, n string) (*platform.User, error) {
return s.FindUser(ctx, platform.UserFilter{Name: &n})
}
// FindUser returns the first user that matches a filter.
func (s *Service) FindUser(ctx context.Context, filter platform.UserFilter) (*platform.User, error) {
op := OpPrefix + platform.OpFindUser
if filter.ID != nil {
u, err := s.FindUserByID(ctx, *filter.ID)
if err != nil {
return nil, &platform.Error{
Op: op,
Err: err,
}
}
return u, nil
}
if filter.Name != nil {
var u *platform.User
err := s.forEachUser(ctx, func(user *platform.User) bool {
if user.Name == *filter.Name {
u = user
return false
}
return true
})
if err != nil {
return nil, err
}
if u == nil {
return nil, &platform.Error{
Code: platform.ENotFound,
Op: op,
Msg: "user not found",
}
}
return u, nil
}
return nil, &platform.Error{
Code: platform.EInvalid,
Op: op,
Msg: "expected filter to contain name",
}
}
// FindUsers will retrieve a list of users from storage.
func (s *Service) FindUsers(ctx context.Context, filter platform.UserFilter, opt ...platform.FindOptions) ([]*platform.User, int, error) {
op := OpPrefix + platform.OpFindUsers
if filter.ID != nil {
u, err := s.FindUserByID(ctx, *filter.ID)
if err != nil {
return nil, 0, &platform.Error{
Err: err,
Op: op,
}
}
return []*platform.User{u}, 1, nil
}
if filter.Name != nil {
u, err := s.FindUser(ctx, filter)
if err != nil {
return nil, 0, &platform.Error{
Err: err,
Op: op,
}
}
return []*platform.User{u}, 1, nil
}
users := []*platform.User{}
err := s.forEachUser(ctx, func(user *platform.User) bool {
users = append(users, user)
return true
})
if err != nil {
return nil, 0, err
}
return users, len(users), nil
}
// CreateUser will create an user into storage.
func (s *Service) CreateUser(ctx context.Context, u *platform.User) error {
if _, err := s.FindUser(ctx, platform.UserFilter{Name: &u.Name}); err == nil {
return &platform.Error{
Code: platform.EConflict,
Op: OpPrefix + platform.OpCreateUser,
Msg: fmt.Sprintf("user with name %s already exists", u.Name),
}
}
u.ID = s.IDGenerator.ID()
s.PutUser(ctx, u)
return nil
}
// PutUser put a user into storage.
func (s *Service) PutUser(ctx context.Context, o *platform.User) error {
s.userKV.Store(o.ID.String(), o)
return nil
}
// UpdateUser update a user in storage.
func (s *Service) UpdateUser(ctx context.Context, id platform.ID, upd platform.UserUpdate) (*platform.User, error) {
o, err := s.FindUserByID(ctx, id)
if err != nil {
return nil, &platform.Error{
Err: err,
Op: OpPrefix + platform.OpUpdateUser,
}
}
if upd.Name != nil {
o.Name = *upd.Name
}
s.userKV.Store(o.ID.String(), o)
return o, nil
}
// DeleteUser remove a user from storage.
func (s *Service) DeleteUser(ctx context.Context, id platform.ID) error {
if _, err := s.FindUserByID(ctx, id); err != nil {
return &platform.Error{
Err: err,
Op: OpPrefix + platform.OpDeleteUser,
}
}
s.userKV.Delete(id.String())
return nil
}

View File

@ -1,38 +0,0 @@
package inmem
import (
"context"
"testing"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/kv"
influxdbtesting "github.com/influxdata/influxdb/testing"
)
func initUserService(f influxdbtesting.UserFields, t *testing.T) (influxdb.UserService, string, func()) {
s := NewKVStore()
svc := kv.NewService(s)
svc.IDGenerator = f.IDGenerator
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing user service: %v", err)
}
for _, u := range f.Users {
if err := svc.PutUser(ctx, u); err != nil {
t.Fatalf("failed to populate users")
}
}
return svc, "kv/", func() {
for _, u := range f.Users {
if err := svc.DeleteUser(ctx, u.ID); err != nil {
t.Logf("failed to remove users: %v", err)
}
}
}
}
func TestUserService(t *testing.T) {
t.Parallel()
influxdbtesting.UserService(initUserService, t)
}

View File

@ -1,158 +0,0 @@
package inmem
import (
"context"
"fmt"
platform "github.com/influxdata/influxdb"
)
func (s *Service) loadVariable(ctx context.Context, id platform.ID) (*platform.Variable, *platform.Error) {
r, ok := s.variableKV.Load(id.String())
if !ok {
return nil, &platform.Error{
Code: platform.ENotFound,
Msg: platform.ErrVariableNotFound,
}
}
m, ok := r.(*platform.Variable)
if !ok {
return nil, &platform.Error{
Code: platform.EInvalid,
Msg: fmt.Sprintf("type %T is not a variable", r),
}
}
return m, nil
}
// FindVariableByID implements the platform.VariableService interface
func (s *Service) FindVariableByID(ctx context.Context, id platform.ID) (*platform.Variable, error) {
m, pe := s.loadVariable(ctx, id)
if pe != nil {
return nil, &platform.Error{
Err: pe,
Op: OpPrefix + platform.OpFindVariableByID,
}
}
return m, nil
}
func filterVariablesFn(filter platform.VariableFilter) func(m *platform.Variable) bool {
if filter.ID != nil {
return func(m *platform.Variable) bool {
return m.ID == *filter.ID
}
}
if filter.OrganizationID != nil {
return func(m *platform.Variable) bool {
return m.OrganizationID == *filter.OrganizationID
}
}
return func(m *platform.Variable) bool { return true }
}
// FindVariables implements the platform.VariableService interface
func (s *Service) FindVariables(ctx context.Context, filter platform.VariableFilter, opt ...platform.FindOptions) ([]*platform.Variable, error) {
op := OpPrefix + platform.OpFindVariables
var variables []*platform.Variable
if filter.ID != nil {
m, err := s.FindVariableByID(ctx, *filter.ID)
if err != nil && platform.ErrorCode(err) != platform.ENotFound {
return variables, &platform.Error{
Err: err,
Op: op,
}
}
if m == nil {
return variables, nil
}
return []*platform.Variable{m}, nil
}
filterFn := filterVariablesFn(filter)
s.variableKV.Range(func(k, v interface{}) bool {
variable, ok := v.(*platform.Variable)
if !ok {
return false
}
if filterFn(variable) {
variables = append(variables, variable)
}
return true
})
return variables, nil
}
// CreateVariable implements the platform.VariableService interface
func (s *Service) CreateVariable(ctx context.Context, m *platform.Variable) error {
op := OpPrefix + platform.OpCreateVariable
m.ID = s.IDGenerator.ID()
err := s.ReplaceVariable(ctx, m)
if err != nil {
return &platform.Error{
Op: op,
Err: err,
}
}
return nil
}
// UpdateVariable implements the platform.VariableService interface
func (s *Service) UpdateVariable(ctx context.Context, id platform.ID, update *platform.VariableUpdate) (*platform.Variable, error) {
op := OpPrefix + platform.OpUpdateVariable
variable, err := s.FindVariableByID(ctx, id)
if err != nil {
return nil, &platform.Error{
Op: op,
Err: err,
}
}
if err := update.Apply(variable); err != nil {
return nil, &platform.Error{
Op: op,
Err: err,
}
}
if err := s.ReplaceVariable(ctx, variable); err != nil {
return nil, &platform.Error{
Op: op,
Err: err,
}
}
return variable, nil
}
// DeleteVariable implements the platform.VariableService interface
func (s *Service) DeleteVariable(ctx context.Context, id platform.ID) error {
op := OpPrefix + platform.OpDeleteVariable
_, err := s.FindVariableByID(ctx, id)
if err != nil {
return &platform.Error{
Op: op,
Err: err,
}
}
s.variableKV.Delete(id.String())
return nil
}
// ReplaceVariable stores a Variable in the key value store
func (s *Service) ReplaceVariable(ctx context.Context, m *platform.Variable) error {
s.variableKV.Store(m.ID.String(), m)
return nil
}

View File

@ -1,34 +0,0 @@
package inmem
import (
"context"
"testing"
platform "github.com/influxdata/influxdb"
platformtesting "github.com/influxdata/influxdb/testing"
)
func initVariableService(f platformtesting.VariableFields, t *testing.T) (platform.VariableService, string, func()) {
s := NewService()
s.IDGenerator = f.IDGenerator
ctx := context.TODO()
for _, variable := range f.Variables {
if err := s.ReplaceVariable(ctx, variable); err != nil {
t.Fatalf("failed to populate variables")
}
}
done := func() {
for _, variable := range f.Variables {
if err := s.DeleteVariable(ctx, variable.ID); err != nil {
t.Fatalf("failed to clean up variables bolt test: %v", err)
}
}
}
return s, OpPrefix, done
}
func TestVariableService(t *testing.T) {
platformtesting.VariableService(initVariableService, t)
}