diff --git a/kv/migration/all/0010_add-index-telegraf-by-org.go b/kv/migration/all/0010_add-index-telegraf-by-org.go new file mode 100644 index 0000000000..9e904cdde3 --- /dev/null +++ b/kv/migration/all/0010_add-index-telegraf-by-org.go @@ -0,0 +1,9 @@ +package all + +import ( + "github.com/influxdata/influxdb/v2/kv" + "github.com/influxdata/influxdb/v2/telegraf" +) + +// Migration0010_AddIndexTelegrafByOrg adds the index telegraf configs by organization ID +var Migration0010_AddIndexTelegrafByOrg = kv.NewIndexMigration(telegraf.ByOrganizationIndexMapping, kv.WithIndexMigrationCleanup) diff --git a/kv/migration/all/all.go b/kv/migration/all/all.go index 56db939dff..54715d48d1 100644 --- a/kv/migration/all/all.go +++ b/kv/migration/all/all.go @@ -25,5 +25,7 @@ var Migrations = [...]migration.Spec{ Migration0008_LegacyAuthBuckets, // LegacyAuthPasswordBuckets Migration0009_LegacyAuthPasswordBuckets, + // add index telegraf by org + Migration0010_AddIndexTelegrafByOrg, // {{ do_not_edit . }} } diff --git a/telegraf.go b/telegraf.go index dde584b194..a09213b1dd 100644 --- a/telegraf.go +++ b/telegraf.go @@ -32,10 +32,6 @@ var ( // TelegrafConfigStore represents a service for managing telegraf config data. type TelegrafConfigStore interface { - // UserResourceMappingService must be part of all TelegrafConfigStore service, - // for create, search, delete. - UserResourceMappingService - // FindTelegrafConfigByID returns a single telegraf config by ID. FindTelegrafConfigByID(ctx context.Context, id ID) (*TelegrafConfig, error) diff --git a/telegraf/index.go b/telegraf/index.go new file mode 100644 index 0000000000..c22d153fb1 --- /dev/null +++ b/telegraf/index.go @@ -0,0 +1,26 @@ +package telegraf + +import ( + "encoding/json" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kv" +) + +var ( + // ByOrganizationIndexMapping is the mapping definition for fetching + // telegrafs by organization ID. + ByOrganizationIndexMapping = kv.NewIndexMapping( + []byte("telegrafv1"), + []byte("telegrafbyorgindexv1"), + func(v []byte) ([]byte, error) { + var telegraf influxdb.TelegrafConfig + if err := json.Unmarshal(v, &telegraf); err != nil { + return nil, err + } + + id, _ := telegraf.OrgID.Encode() + return id, nil + }, + ) +) diff --git a/telegraf/service/telegraf.go b/telegraf/service/telegraf.go new file mode 100644 index 0000000000..040d56e395 --- /dev/null +++ b/telegraf/service/telegraf.go @@ -0,0 +1,390 @@ +package service + +import ( + "context" + "encoding/json" + "fmt" + + influxdb "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kv" + "github.com/influxdata/influxdb/v2/snowflake" + "github.com/influxdata/influxdb/v2/telegraf" +) + +var ( + // ErrTelegrafNotFound is used when the telegraf configuration is not found. + ErrTelegrafNotFound = &influxdb.Error{ + Msg: "telegraf configuration not found", + Code: influxdb.ENotFound, + } + + // ErrInvalidTelegrafID is used when the service was provided + // an invalid ID format. + ErrInvalidTelegrafID = &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "provided telegraf configuration ID has invalid format", + } + + // ErrInvalidTelegrafOrgID is the error message for a missing or invalid organization ID. + ErrInvalidTelegrafOrgID = &influxdb.Error{ + Code: influxdb.EEmptyValue, + Msg: "provided telegraf configuration organization ID is missing or invalid", + } +) + +// UnavailableTelegrafServiceError is used if we aren't able to interact with the +// store, it means the store is not available at the moment (e.g. network). +func UnavailableTelegrafServiceError(err error) *influxdb.Error { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: fmt.Sprintf("Unable to connect to telegraf service. Please try again; Err: %v", err), + Op: "kv/telegraf", + } +} + +// InternalTelegrafServiceError is used when the error comes from an +// internal system. +func InternalTelegrafServiceError(err error) *influxdb.Error { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: fmt.Sprintf("Unknown internal telegraf data error; Err: %v", err), + Op: "kv/telegraf", + } +} + +// CorruptTelegrafError is used when the config cannot be unmarshalled from the +// bytes stored in the kv. +func CorruptTelegrafError(err error) *influxdb.Error { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: fmt.Sprintf("Unknown internal telegraf data error; Err: %v", err), + Op: "kv/telegraf", + } +} + +// ErrUnprocessableTelegraf is used when a telegraf is not able to be converted to JSON. +func ErrUnprocessableTelegraf(err error) *influxdb.Error { + return &influxdb.Error{ + Code: influxdb.EUnprocessableEntity, + Msg: fmt.Sprintf("unable to convert telegraf configuration into JSON; Err %v", err), + } +} + +var ( + telegrafBucket = []byte("telegrafv1") + telegrafPluginsBucket = []byte("telegrafPluginsv1") +) + +var _ influxdb.TelegrafConfigStore = (*Service)(nil) + +// Service is a telegraf config service. +type Service struct { + kv kv.Store + + byOrganisationIndex *kv.Index + + IDGenerator influxdb.IDGenerator +} + +// New constructs and configures a new telegraf config service. +func New(store kv.Store) *Service { + return &Service{ + kv: store, + byOrganisationIndex: kv.NewIndex( + telegraf.ByOrganizationIndexMapping, + kv.WithIndexReadPathEnabled, + ), + IDGenerator: snowflake.NewIDGenerator(), + } +} + +func (s *Service) telegrafBucket(tx kv.Tx) (kv.Bucket, error) { + b, err := tx.Bucket(telegrafBucket) + if err != nil { + return nil, UnavailableTelegrafServiceError(err) + } + return b, nil +} + +func (s *Service) telegrafPluginsBucket(tx kv.Tx) (kv.Bucket, error) { + b, err := tx.Bucket(telegrafPluginsBucket) + if err != nil { + return nil, UnavailableTelegrafServiceError(err) + } + return b, nil +} + +// FindTelegrafConfigByID returns a single telegraf config by ID. +func (s *Service) FindTelegrafConfigByID(ctx context.Context, id influxdb.ID) (*influxdb.TelegrafConfig, error) { + var ( + tc *influxdb.TelegrafConfig + err error + ) + + err = s.kv.View(ctx, func(tx kv.Tx) error { + tc, err = s.findTelegrafConfigByID(ctx, tx, id) + return err + }) + + return tc, err +} + +func (s *Service) findTelegrafConfigByID(ctx context.Context, tx kv.Tx, id influxdb.ID) (*influxdb.TelegrafConfig, error) { + encID, err := id.Encode() + if err != nil { + return nil, ErrInvalidTelegrafID + } + + bucket, err := s.telegrafBucket(tx) + if err != nil { + return nil, err + } + + v, err := bucket.Get(encID) + if kv.IsNotFound(err) { + return nil, ErrTelegrafNotFound + } + if err != nil { + return nil, InternalTelegrafServiceError(err) + } + + return unmarshalTelegraf(v) +} + +// FindTelegrafConfigs returns a list of telegraf configs that match filter and the total count of matching telegraf configs. +// Additional options provide pagination & sorting. +func (s *Service) FindTelegrafConfigs(ctx context.Context, filter influxdb.TelegrafConfigFilter, opt ...influxdb.FindOptions) (tcs []*influxdb.TelegrafConfig, n int, err error) { + err = s.kv.View(ctx, func(tx kv.Tx) error { + tcs, n, err = s.findTelegrafConfigs(ctx, tx, filter) + return err + }) + return tcs, n, err +} + +func (s *Service) findTelegrafConfigs(ctx context.Context, tx kv.Tx, filter influxdb.TelegrafConfigFilter, opt ...influxdb.FindOptions) ([]*influxdb.TelegrafConfig, int, error) { + tcs := make([]*influxdb.TelegrafConfig, 0) + + visit := func(k, v []byte) error { + var tc influxdb.TelegrafConfig + if err := json.Unmarshal(v, &tc); err != nil { + return err + } + + tcs = append(tcs, &tc) + + return nil + + } + + if filter.OrgID == nil { + // forward cursor entire bucket + bucket, err := s.telegrafBucket(tx) + if err != nil { + return nil, 0, err + } + + // TODO(georgemac): convert find options into cursor options + cursor, err := bucket.ForwardCursor(nil) + if err != nil { + return nil, 0, err + } + + return tcs, len(tcs), kv.WalkCursor(ctx, cursor, visit) + } + + orgID, err := filter.OrgID.Encode() + if err != nil { + return nil, 0, err + } + + return tcs, len(tcs), s.byOrganisationIndex.Walk(ctx, tx, orgID, visit) +} + +// PutTelegrafConfig put a telegraf config to storage. +func (s *Service) PutTelegrafConfig(ctx context.Context, tc *influxdb.TelegrafConfig) error { + return s.kv.Update(ctx, func(tx kv.Tx) (err error) { + return s.putTelegrafConfig(ctx, tx, tc) + }) +} + +func (s *Service) putTelegrafConfig(ctx context.Context, tx kv.Tx, tc *influxdb.TelegrafConfig) error { + encodedID, err := tc.ID.Encode() + if err != nil { + return ErrInvalidTelegrafID + } + + if !tc.OrgID.Valid() { + return ErrInvalidTelegrafOrgID + } + + orgID, err := tc.OrgID.Encode() + if err != nil { + return err + } + + // insert index entry for orgID -> id + if err := s.byOrganisationIndex.Insert(tx, orgID, encodedID); err != nil { + return err + } + + v, err := marshalTelegraf(tc) + if err != nil { + return err + } + + bucket, err := s.telegrafBucket(tx) + if err != nil { + return err + } + + if err := bucket.Put(encodedID, v); err != nil { + return UnavailableTelegrafServiceError(err) + } + + return s.putTelegrafConfigStats(encodedID, tx, tc) +} + +func (s *Service) putTelegrafConfigStats(encodedID []byte, tx kv.Tx, tc *influxdb.TelegrafConfig) error { + bucket, err := s.telegrafPluginsBucket(tx) + if err != nil { + return err + } + + v, err := marshalTelegrafPlugins(tc.CountPlugins()) + if err != nil { + return err + } + + if err := bucket.Put(encodedID, v); err != nil { + return UnavailableTelegrafServiceError(err) + } + + return nil +} + +// CreateTelegrafConfig creates a new telegraf config and sets b.ID with the new identifier. +func (s *Service) CreateTelegrafConfig(ctx context.Context, tc *influxdb.TelegrafConfig, userID influxdb.ID) error { + return s.kv.Update(ctx, func(tx kv.Tx) error { + return s.createTelegrafConfig(ctx, tx, tc, userID) + }) +} + +func (s *Service) createTelegrafConfig(ctx context.Context, tx kv.Tx, tc *influxdb.TelegrafConfig, userID influxdb.ID) error { + tc.ID = s.IDGenerator.ID() + + return s.putTelegrafConfig(ctx, tx, tc) +} + +// UpdateTelegrafConfig updates a single telegraf config. +// Returns the new telegraf config after update. +func (s *Service) UpdateTelegrafConfig(ctx context.Context, id influxdb.ID, tc *influxdb.TelegrafConfig, userID influxdb.ID) (*influxdb.TelegrafConfig, error) { + var err error + err = s.kv.Update(ctx, func(tx kv.Tx) error { + tc, err = s.updateTelegrafConfig(ctx, tx, id, tc, userID) + return err + }) + return tc, err +} + +func (s *Service) updateTelegrafConfig(ctx context.Context, tx kv.Tx, id influxdb.ID, tc *influxdb.TelegrafConfig, userID influxdb.ID) (*influxdb.TelegrafConfig, error) { + current, err := s.findTelegrafConfigByID(ctx, tx, id) + if err != nil { + return nil, err + } + + // ID and OrganizationID can not be updated + tc.ID = current.ID + tc.OrgID = current.OrgID + err = s.putTelegrafConfig(ctx, tx, tc) + return tc, err +} + +// DeleteTelegrafConfig removes a telegraf config by ID. +func (s *Service) DeleteTelegrafConfig(ctx context.Context, id influxdb.ID) error { + return s.kv.Update(ctx, func(tx kv.Tx) error { + return s.deleteTelegrafConfig(ctx, tx, id) + }) +} + +func (s *Service) deleteTelegrafConfig(ctx context.Context, tx kv.Tx, id influxdb.ID) error { + tc, err := s.findTelegrafConfigByID(ctx, tx, id) + if err != nil { + return err + } + + encodedID, err := tc.ID.Encode() + if err != nil { + return ErrInvalidTelegrafID + } + + orgID, err := tc.OrgID.Encode() + if err != nil { + return err + } + + // removing index entry for orgID -> id + if err := s.byOrganisationIndex.Delete(tx, orgID, encodedID); err != nil { + return err + } + + bucket, err := s.telegrafBucket(tx) + if err != nil { + return err + } + + _, err = bucket.Get(encodedID) + if kv.IsNotFound(err) { + return ErrTelegrafNotFound + } + if err != nil { + return InternalTelegrafServiceError(err) + } + + if err := bucket.Delete(encodedID); err != nil { + return UnavailableTelegrafServiceError(err) + } + + return s.deleteTelegrafConfigStats(encodedID, tx) +} + +func (s *Service) deleteTelegrafConfigStats(encodedID []byte, tx kv.Tx) error { + bucket, err := s.telegrafPluginsBucket(tx) + if err != nil { + return err + } + + if err := bucket.Delete(encodedID); err != nil { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: fmt.Sprintf("Unable to connect to telegraf config stats service. Please try again; Err: %v", err), + Op: "kv/telegraf", + } + } + + return nil +} + +// unmarshalTelegraf turns the stored byte slice in the kv into a *influxdb.TelegrafConfig. +func unmarshalTelegraf(v []byte) (*influxdb.TelegrafConfig, error) { + t := &influxdb.TelegrafConfig{} + if err := json.Unmarshal(v, t); err != nil { + return nil, CorruptTelegrafError(err) + } + return t, nil +} + +func marshalTelegraf(tc *influxdb.TelegrafConfig) ([]byte, error) { + v, err := json.Marshal(tc) + if err != nil { + return nil, ErrUnprocessableTelegraf(err) + } + return v, nil +} + +func marshalTelegrafPlugins(plugins map[string]float64) ([]byte, error) { + v, err := json.Marshal(plugins) + if err != nil { + return nil, ErrUnprocessableTelegraf(err) + } + return v, nil +} diff --git a/telegraf/service/telegraf_test.go b/telegraf/service/telegraf_test.go new file mode 100644 index 0000000000..cac1625bf6 --- /dev/null +++ b/telegraf/service/telegraf_test.go @@ -0,0 +1,84 @@ +package service_test + +import ( + "context" + "errors" + "io/ioutil" + "os" + "testing" + + influxdb "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/bolt" + "github.com/influxdata/influxdb/v2/kv" + "github.com/influxdata/influxdb/v2/kv/migration/all" + telegrafservice "github.com/influxdata/influxdb/v2/telegraf/service" + telegraftesting "github.com/influxdata/influxdb/v2/telegraf/service/testing" + "go.uber.org/zap/zaptest" +) + +func TestBoltTelegrafService(t *testing.T) { + telegraftesting.TelegrafConfigStore(initBoltTelegrafService, t) +} + +func NewTestBoltStore(t *testing.T) (kv.SchemaStore, func(), error) { + f, err := ioutil.TempFile("", "influxdata-bolt-") + if err != nil { + return nil, nil, errors.New("unable to open temporary boltdb file") + } + f.Close() + + ctx := context.Background() + logger := zaptest.NewLogger(t) + path := f.Name() + + // skip fsync to improve test performance + s := bolt.NewKVStore(logger, path, bolt.WithNoSync) + if err := s.Open(context.Background()); err != nil { + return nil, nil, err + } + + if err := all.Up(ctx, logger, s); err != nil { + return nil, nil, err + } + + close := func() { + s.Close() + os.Remove(path) + } + + return s, close, nil +} + +func initBoltTelegrafService(f telegraftesting.TelegrafConfigFields, t *testing.T) (influxdb.TelegrafConfigStore, func()) { + s, closeBolt, err := NewTestBoltStore(t) + if err != nil { + t.Fatalf("failed to create new kv store: %v", err) + } + + svc, closeSvc := initTelegrafService(s, f, t) + return svc, func() { + closeSvc() + closeBolt() + } +} + +func initTelegrafService(s kv.SchemaStore, f telegraftesting.TelegrafConfigFields, t *testing.T) (influxdb.TelegrafConfigStore, func()) { + ctx := context.Background() + + svc := telegrafservice.New(s) + svc.IDGenerator = f.IDGenerator + + for _, tc := range f.TelegrafConfigs { + if err := svc.PutTelegrafConfig(ctx, tc); err != nil { + t.Fatalf("failed to populate telegraf config: %v", err) + } + } + + return svc, func() { + for _, tc := range f.TelegrafConfigs { + if err := svc.DeleteTelegrafConfig(ctx, tc.ID); err != nil { + t.Logf("failed to remove telegraf config: %v", err) + } + } + } +} diff --git a/telegraf/service/testing/testing.go b/telegraf/service/testing/testing.go new file mode 100644 index 0000000000..545002fdec --- /dev/null +++ b/telegraf/service/testing/testing.go @@ -0,0 +1,856 @@ +package testing + +import ( + "context" + "fmt" + "sort" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + influxdb "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/mock" + "github.com/influxdata/influxdb/v2/telegraf/plugins/inputs" + "github.com/influxdata/influxdb/v2/telegraf/plugins/outputs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var ( + oneID = influxdb.ID(1) + twoID = influxdb.ID(2) + threeID = influxdb.ID(3) + fourID = influxdb.ID(4) +) + +// TelegrafConfigFields includes prepopulated data for mapping tests. +type TelegrafConfigFields struct { + IDGenerator influxdb.IDGenerator + TelegrafConfigs []*influxdb.TelegrafConfig +} + +var telegrafCmpOptions = cmp.Options{ + cmpopts.IgnoreUnexported( + inputs.CPUStats{}, + inputs.MemStats{}, + inputs.Kubernetes{}, + inputs.File{}, + outputs.File{}, + outputs.InfluxDBV2{}, + ), + cmp.Transformer("Sort", func(in []*influxdb.TelegrafConfig) []*influxdb.TelegrafConfig { + out := append([]*influxdb.TelegrafConfig(nil), in...) + sort.Slice(out, func(i, j int) bool { + return out[i].ID > out[j].ID + }) + return out + }), +} + +type telegrafTestFactoryFunc func(TelegrafConfigFields, *testing.T) (influxdb.TelegrafConfigStore, func()) + +// TelegrafConfigStore tests all the service functions. +func TelegrafConfigStore( + init telegrafTestFactoryFunc, t *testing.T, +) { + tests := []struct { + name string + fn func(init telegrafTestFactoryFunc, + t *testing.T) + }{ + { + name: "CreateTelegrafConfig", + fn: CreateTelegrafConfig, + }, + { + name: "FindTelegrafConfigByID", + fn: FindTelegrafConfigByID, + }, + { + name: "FindTelegrafConfigs", + fn: FindTelegrafConfigs, + }, + { + name: "UpdateTelegrafConfig", + fn: UpdateTelegrafConfig, + }, + { + name: "DeleteTelegrafConfig", + fn: DeleteTelegrafConfig, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt := tt + t.Parallel() + tt.fn(init, t) + }) + } +} + +// CreateTelegrafConfig testing. +func CreateTelegrafConfig( + init telegrafTestFactoryFunc, + t *testing.T, +) { + type args struct { + telegrafConfig *influxdb.TelegrafConfig + userID influxdb.ID + } + type wants struct { + err error + telegrafs []*influxdb.TelegrafConfig + } + + tests := []struct { + name string + fields TelegrafConfigFields + args args + wants wants + }{ + { + name: "create telegraf config without organization ID should error", + fields: TelegrafConfigFields{ + IDGenerator: mock.NewStaticIDGenerator(oneID), + TelegrafConfigs: []*influxdb.TelegrafConfig{}, + }, + args: args{ + telegrafConfig: &influxdb.TelegrafConfig{}, + }, + wants: wants{ + err: &influxdb.Error{ + Code: influxdb.EEmptyValue, + Msg: influxdb.ErrTelegrafConfigInvalidOrgID, + }, + }, + }, + { + name: "create telegraf config with empty set", + fields: TelegrafConfigFields{ + IDGenerator: mock.NewStaticIDGenerator(oneID), + TelegrafConfigs: []*influxdb.TelegrafConfig{}, + }, + args: args{ + userID: threeID, + telegrafConfig: &influxdb.TelegrafConfig{ + OrgID: twoID, + Name: "name1", + Config: "[[inputs.cpu]]\n[[outputs.influxdb_v2]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + }, + wants: wants{ + telegrafs: []*influxdb.TelegrafConfig{ + { + ID: oneID, + OrgID: twoID, + Name: "name1", + Config: "[[inputs.cpu]]\n[[outputs.influxdb_v2]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + }, + }, + }, + { + name: "basic create telegraf config", + fields: TelegrafConfigFields{ + IDGenerator: mock.NewStaticIDGenerator(twoID), + TelegrafConfigs: []*influxdb.TelegrafConfig{ + { + ID: oneID, + OrgID: twoID, + Name: "tc1", + Config: "[[inputs.mem_stats]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + }, + }, + args: args{ + userID: threeID, + telegrafConfig: &influxdb.TelegrafConfig{ + OrgID: twoID, + Name: "name2", + Config: "[[inputs.cpu]]\n[[outputs.influxdb_v2]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, // for inmem test as it doesn't unmarshal.. + }, + }, + wants: wants{ + telegrafs: []*influxdb.TelegrafConfig{ + { + ID: oneID, + OrgID: twoID, + Name: "tc1", + Config: "[[inputs.mem_stats]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + { + ID: twoID, + OrgID: twoID, + Name: "name2", + Config: "[[inputs.cpu]]\n[[outputs.influxdb_v2]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, done := init(tt.fields, t) + defer done() + ctx := context.Background() + err := s.CreateTelegrafConfig(ctx, tt.args.telegrafConfig, tt.args.userID) + if (err != nil) != (tt.wants.err != nil) { + t.Fatalf("expected error '%v' got '%v'", tt.wants.err, err) + } + if tt.wants.err == nil && !tt.args.telegrafConfig.ID.Valid() { + t.Fatalf("telegraf config ID not set from CreateTelegrafConfig") + } + + if err != nil && tt.wants.err != nil { + if influxdb.ErrorCode(err) != influxdb.ErrorCode(tt.wants.err) { + t.Fatalf("expected error messages to match '%v' got '%v'", influxdb.ErrorCode(tt.wants.err), influxdb.ErrorCode(err)) + } + } + + filter := influxdb.TelegrafConfigFilter{} + tcs, _, err := s.FindTelegrafConfigs(ctx, filter) + if err != nil { + t.Fatalf("failed to retrieve telegraf configs: %v", err) + } + if diff := cmp.Diff(tcs, tt.wants.telegrafs, telegrafCmpOptions...); diff != "" { + t.Errorf("telegraf configs are different -got/+want\ndiff %s", diff) + } + }) + } +} + +// FindTelegrafConfigByID testing. +func FindTelegrafConfigByID( + init telegrafTestFactoryFunc, + t *testing.T, +) { + type args struct { + id influxdb.ID + } + type wants struct { + err error + telegrafConfig *influxdb.TelegrafConfig + } + + tests := []struct { + name string + fields TelegrafConfigFields + args args + wants wants + }{ + { + name: "bad id", + fields: TelegrafConfigFields{ + TelegrafConfigs: []*influxdb.TelegrafConfig{ + { + ID: oneID, + OrgID: twoID, + Name: "tc1", + Config: "[[inputs.cpu]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + { + ID: twoID, + OrgID: twoID, + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.mem]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + }, + }, + args: args{ + id: influxdb.ID(0), + }, + wants: wants{ + err: fmt.Errorf("provided telegraf configuration ID has invalid format"), + }, + }, + { + name: "not found", + fields: TelegrafConfigFields{ + TelegrafConfigs: []*influxdb.TelegrafConfig{ + { + ID: oneID, + OrgID: twoID, + Name: "tc1", + Config: "[[inputs.cpu]]\n", + }, + { + ID: twoID, + OrgID: twoID, + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.mem]]\n", + }, + }, + }, + args: args{ + id: threeID, + }, + wants: wants{ + err: &influxdb.Error{ + Code: influxdb.ENotFound, + Msg: "telegraf configuration not found", + }, + }, + }, + { + name: "basic find telegraf config by id", + fields: TelegrafConfigFields{ + TelegrafConfigs: []*influxdb.TelegrafConfig{ + { + ID: oneID, + OrgID: threeID, + Name: "tc1", + Config: "[[inputs.cpu]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + { + ID: twoID, + OrgID: threeID, + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.mem]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + }, + }, + args: args{ + id: twoID, + }, + wants: wants{ + telegrafConfig: &influxdb.TelegrafConfig{ + ID: twoID, + OrgID: threeID, + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.mem]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, done := init(tt.fields, t) + defer done() + ctx := context.Background() + + tc, err := s.FindTelegrafConfigByID(ctx, tt.args.id) + if (err != nil) != (tt.wants.err != nil) { + t.Fatalf("expected errors to be equal '%v' got '%v'", tt.wants.err, err) + } + + if err != nil && tt.wants.err != nil { + if want, got := tt.wants.err.Error(), err.Error(); want != got { + t.Fatalf("expected error '%s' got '%s'", want, got) + } + } + if diff := cmp.Diff(tc, tt.wants.telegrafConfig, telegrafCmpOptions...); diff != "" { + t.Errorf("telegraf configs are different -got/+want\ndiff %s", diff) + } + }) + } +} + +// FindTelegrafConfigs testing +func FindTelegrafConfigs( + init telegrafTestFactoryFunc, + t *testing.T, +) { + type args struct { + filter influxdb.TelegrafConfigFilter + } + + type wants struct { + telegrafConfigs []*influxdb.TelegrafConfig + err error + } + tests := []struct { + name string + fields TelegrafConfigFields + args args + wants wants + }{ + { + name: "find nothing (empty set)", + fields: TelegrafConfigFields{ + TelegrafConfigs: []*influxdb.TelegrafConfig{}, + }, + args: args{ + filter: influxdb.TelegrafConfigFilter{}, + }, + wants: wants{ + telegrafConfigs: []*influxdb.TelegrafConfig{}, + }, + }, + { + name: "find all telegraf configs (across orgs)", + fields: TelegrafConfigFields{ + IDGenerator: mock.NewIncrementingIDGenerator(oneID), + TelegrafConfigs: []*influxdb.TelegrafConfig{ + { + ID: oneID, + OrgID: twoID, + Name: "tc1", + Config: "[[inputs.cpu]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + { + ID: twoID, + OrgID: threeID, + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.mem]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + }, + }, + args: args{ + filter: influxdb.TelegrafConfigFilter{}, + }, + wants: wants{ + telegrafConfigs: []*influxdb.TelegrafConfig{ + { + ID: oneID, + OrgID: twoID, + Name: "tc1", + Config: "[[inputs.cpu]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + { + ID: twoID, + OrgID: threeID, + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.mem]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + }, + }, + }, + { + name: "filter by organization only", + fields: TelegrafConfigFields{ + IDGenerator: mock.NewIncrementingIDGenerator(oneID), + TelegrafConfigs: []*influxdb.TelegrafConfig{ + { + ID: oneID, + OrgID: fourID, + Name: "tc1", + Config: "[[inputs.cpu]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + { + ID: twoID, + OrgID: fourID, + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.mem]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + { + ID: threeID, + OrgID: oneID, + Name: "tc3", + Config: "[[inputs.cpu]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + { + ID: fourID, + OrgID: oneID, + Name: "tc4", + Config: "[[inputs.cpu]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + }, + }, + args: args{ + filter: influxdb.TelegrafConfigFilter{ + OrgID: &oneID, + }, + }, + wants: wants{ + telegrafConfigs: []*influxdb.TelegrafConfig{ + { + ID: threeID, + OrgID: oneID, + Name: "tc3", + Config: "[[inputs.cpu]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + { + ID: fourID, + OrgID: oneID, + Name: "tc4", + Config: "[[inputs.cpu]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + }, + }, + }, + { + name: "empty for provided org", + fields: TelegrafConfigFields{ + IDGenerator: mock.NewIncrementingIDGenerator(oneID), + TelegrafConfigs: []*influxdb.TelegrafConfig{ + { + ID: oneID, + OrgID: threeID, + Name: "tc1", + Config: "[[inputs.cpu]]\n", + }, + { + ID: twoID, + OrgID: threeID, + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.mem]]\n", + }, + }, + }, + args: args{ + filter: influxdb.TelegrafConfigFilter{ + OrgID: &oneID, + }, + }, + wants: wants{ + telegrafConfigs: []*influxdb.TelegrafConfig{}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, done := init(tt.fields, t) + defer done() + ctx := context.Background() + + tcs, _, err := s.FindTelegrafConfigs(ctx, tt.args.filter) + if err != nil && tt.wants.err == nil { + t.Fatalf("expected errors to be nil got '%v'", err) + } + + require.Equal(t, tt.wants.err, err) + assert.Equal(t, tt.wants.telegrafConfigs, tcs) + }) + } +} + +// UpdateTelegrafConfig testing. +func UpdateTelegrafConfig( + init telegrafTestFactoryFunc, + t *testing.T, +) { + type args struct { + userID influxdb.ID + id influxdb.ID + telegrafConfig *influxdb.TelegrafConfig + } + + type wants struct { + telegrafConfig *influxdb.TelegrafConfig + err error + } + tests := []struct { + name string + fields TelegrafConfigFields + args args + wants wants + }{ + { + name: "can't find the id", + fields: TelegrafConfigFields{ + TelegrafConfigs: []*influxdb.TelegrafConfig{ + { + ID: oneID, + OrgID: fourID, + Name: "tc1", + Config: "[[inputs.cpu]]\n", + }, + { + ID: twoID, + OrgID: fourID, + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.mem]]\n", + }, + }, + }, + args: args{ + userID: threeID, + id: fourID, + telegrafConfig: &influxdb.TelegrafConfig{ + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.mem]]\n", + }, + }, + wants: wants{ + err: &influxdb.Error{ + Code: influxdb.ENotFound, + Msg: fmt.Sprintf("telegraf config with ID %v not found", fourID), + }, + }, + }, + { + fields: TelegrafConfigFields{ + TelegrafConfigs: []*influxdb.TelegrafConfig{ + { + ID: oneID, + OrgID: fourID, + Name: "tc1", + Config: "[[inputs.cpu]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + { + ID: twoID, + OrgID: fourID, + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.mem]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + }, + }, + args: args{ + userID: fourID, + id: twoID, + telegrafConfig: &influxdb.TelegrafConfig{ + OrgID: oneID, // notice this get ignored - ie., resulting TelegrafConfig will have OrgID equal to fourID + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.mem]]\n", + }, + }, + wants: wants{ + telegrafConfig: &influxdb.TelegrafConfig{ + ID: twoID, + OrgID: fourID, + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.mem]]\n", + }, + }, + }, + { + name: "config update", + fields: TelegrafConfigFields{ + TelegrafConfigs: []*influxdb.TelegrafConfig{ + { + ID: oneID, + OrgID: oneID, + Name: "tc1", + Config: "[[inputs.cpu]]\n", + }, + { + ID: twoID, + OrgID: oneID, + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.kubernetes]]\n[[inputs.kubernetes]]\n", + }, + }, + }, + args: args{ + userID: fourID, + id: twoID, + telegrafConfig: &influxdb.TelegrafConfig{ + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.kubernetes]]\n[[inputs.kubernetes]]\n", + }, + }, + wants: wants{ + telegrafConfig: &influxdb.TelegrafConfig{ + ID: twoID, + OrgID: oneID, + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.kubernetes]]\n[[inputs.kubernetes]]\n", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, done := init(tt.fields, t) + defer done() + ctx := context.Background() + + tc, err := s.UpdateTelegrafConfig(ctx, tt.args.id, + tt.args.telegrafConfig, tt.args.userID) + if err != nil && tt.wants.err == nil { + t.Fatalf("expected errors to be nil got '%v'", err) + } + if err != nil && tt.wants.err != nil { + if influxdb.ErrorCode(err) != influxdb.ErrorCode(tt.wants.err) { + t.Fatalf("expected error '%v' got '%v'", tt.wants.err, err) + } + } + if diff := cmp.Diff(tc, tt.wants.telegrafConfig, telegrafCmpOptions...); tt.wants.err == nil && diff != "" { + fmt.Println(tc.Metadata, tt.wants.telegrafConfig.Metadata) + t.Errorf("telegraf configs are different -got/+want\ndiff %s", diff) + } + }) + } +} + +// DeleteTelegrafConfig testing. +func DeleteTelegrafConfig( + init telegrafTestFactoryFunc, + t *testing.T, +) { + type args struct { + id influxdb.ID + } + + type wants struct { + telegrafConfigs []*influxdb.TelegrafConfig + err error + } + tests := []struct { + name string + fields TelegrafConfigFields + args args + wants wants + }{ + { + name: "bad id", + fields: TelegrafConfigFields{ + IDGenerator: mock.NewIncrementingIDGenerator(oneID), + TelegrafConfigs: []*influxdb.TelegrafConfig{ + { + ID: oneID, + OrgID: fourID, + Name: "tc1", + Config: "[[inputs.cpu]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + { + ID: twoID, + OrgID: fourID, + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.mem]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + }, + }, + args: args{ + id: influxdb.ID(0), + }, + wants: wants{ + err: fmt.Errorf("provided telegraf configuration ID has invalid format"), + telegrafConfigs: []*influxdb.TelegrafConfig{ + { + ID: oneID, + OrgID: fourID, + Name: "tc1", + Config: "[[inputs.cpu]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + { + ID: twoID, + OrgID: fourID, + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.mem]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + }, + }, + }, + { + name: "none existing config", + fields: TelegrafConfigFields{ + IDGenerator: mock.NewIncrementingIDGenerator(oneID), + TelegrafConfigs: []*influxdb.TelegrafConfig{ + { + ID: oneID, + OrgID: threeID, + Name: "tc1", + Config: "[[inputs.cpu]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + { + ID: twoID, + OrgID: threeID, + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.mem]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + }, + }, + args: args{ + id: fourID, + }, + wants: wants{ + err: fmt.Errorf("telegraf configuration not found"), + telegrafConfigs: []*influxdb.TelegrafConfig{ + { + ID: oneID, + OrgID: threeID, + Name: "tc1", + Config: "[[inputs.cpu]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + { + ID: twoID, + OrgID: threeID, + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.mem]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + }, + }, + }, + { + name: "regular delete", + fields: TelegrafConfigFields{ + IDGenerator: mock.NewIncrementingIDGenerator(oneID), + TelegrafConfigs: []*influxdb.TelegrafConfig{ + { + ID: oneID, + OrgID: twoID, + Name: "tc1", + Config: "[[inputs.cpu]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + { + ID: twoID, + OrgID: twoID, + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.mem]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + }, + }, + args: args{ + id: twoID, + }, + wants: wants{ + telegrafConfigs: []*influxdb.TelegrafConfig{ + { + ID: oneID, + OrgID: twoID, + Name: "tc1", + Config: "[[inputs.cpu]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, done := init(tt.fields, t) + defer done() + ctx := context.Background() + err := s.DeleteTelegrafConfig(ctx, tt.args.id) + if err != nil && tt.wants.err == nil { + t.Fatalf("expected errors to be nil got '%v'", err) + } + + if err != nil && tt.wants.err != nil { + if want, got := tt.wants.err.Error(), err.Error(); want != got { + t.Fatalf("expected error '%v' got '%v'", tt.wants.err, err) + } + } + + tcs, _, err := s.FindTelegrafConfigs(ctx, influxdb.TelegrafConfigFilter{}) + require.NoError(t, err) + assert.Equal(t, tt.wants.telegrafConfigs, tcs) + }) + } +}