refactor(telegraf): add new isolated config service

pull/19902/head
George MacRorie 2020-10-26 11:48:42 +00:00 committed by George
parent 37ef9d2791
commit 222470b1ef
7 changed files with 1367 additions and 4 deletions

View File

@ -0,0 +1,9 @@
package all
import (
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/telegraf"
)
// Migration0010_AddIndexTelegrafByOrg adds the index telegraf configs by organization ID
var Migration0010_AddIndexTelegrafByOrg = kv.NewIndexMigration(telegraf.ByOrganizationIndexMapping, kv.WithIndexMigrationCleanup)

View File

@ -25,5 +25,7 @@ var Migrations = [...]migration.Spec{
Migration0008_LegacyAuthBuckets,
// LegacyAuthPasswordBuckets
Migration0009_LegacyAuthPasswordBuckets,
// add index telegraf by org
Migration0010_AddIndexTelegrafByOrg,
// {{ do_not_edit . }}
}

View File

@ -32,10 +32,6 @@ var (
// TelegrafConfigStore represents a service for managing telegraf config data.
type TelegrafConfigStore interface {
// UserResourceMappingService must be part of all TelegrafConfigStore service,
// for create, search, delete.
UserResourceMappingService
// FindTelegrafConfigByID returns a single telegraf config by ID.
FindTelegrafConfigByID(ctx context.Context, id ID) (*TelegrafConfig, error)

26
telegraf/index.go Normal file
View File

@ -0,0 +1,26 @@
package telegraf
import (
"encoding/json"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
)
var (
// ByOrganizationIndexMapping is the mapping definition for fetching
// telegrafs by organization ID.
ByOrganizationIndexMapping = kv.NewIndexMapping(
[]byte("telegrafv1"),
[]byte("telegrafbyorgindexv1"),
func(v []byte) ([]byte, error) {
var telegraf influxdb.TelegrafConfig
if err := json.Unmarshal(v, &telegraf); err != nil {
return nil, err
}
id, _ := telegraf.OrgID.Encode()
return id, nil
},
)
)

View File

@ -0,0 +1,390 @@
package service
import (
"context"
"encoding/json"
"fmt"
influxdb "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/snowflake"
"github.com/influxdata/influxdb/v2/telegraf"
)
var (
// ErrTelegrafNotFound is used when the telegraf configuration is not found.
ErrTelegrafNotFound = &influxdb.Error{
Msg: "telegraf configuration not found",
Code: influxdb.ENotFound,
}
// ErrInvalidTelegrafID is used when the service was provided
// an invalid ID format.
ErrInvalidTelegrafID = &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "provided telegraf configuration ID has invalid format",
}
// ErrInvalidTelegrafOrgID is the error message for a missing or invalid organization ID.
ErrInvalidTelegrafOrgID = &influxdb.Error{
Code: influxdb.EEmptyValue,
Msg: "provided telegraf configuration organization ID is missing or invalid",
}
)
// UnavailableTelegrafServiceError is used if we aren't able to interact with the
// store, it means the store is not available at the moment (e.g. network).
func UnavailableTelegrafServiceError(err error) *influxdb.Error {
return &influxdb.Error{
Code: influxdb.EInternal,
Msg: fmt.Sprintf("Unable to connect to telegraf service. Please try again; Err: %v", err),
Op: "kv/telegraf",
}
}
// InternalTelegrafServiceError is used when the error comes from an
// internal system.
func InternalTelegrafServiceError(err error) *influxdb.Error {
return &influxdb.Error{
Code: influxdb.EInternal,
Msg: fmt.Sprintf("Unknown internal telegraf data error; Err: %v", err),
Op: "kv/telegraf",
}
}
// CorruptTelegrafError is used when the config cannot be unmarshalled from the
// bytes stored in the kv.
func CorruptTelegrafError(err error) *influxdb.Error {
return &influxdb.Error{
Code: influxdb.EInternal,
Msg: fmt.Sprintf("Unknown internal telegraf data error; Err: %v", err),
Op: "kv/telegraf",
}
}
// ErrUnprocessableTelegraf is used when a telegraf is not able to be converted to JSON.
func ErrUnprocessableTelegraf(err error) *influxdb.Error {
return &influxdb.Error{
Code: influxdb.EUnprocessableEntity,
Msg: fmt.Sprintf("unable to convert telegraf configuration into JSON; Err %v", err),
}
}
var (
telegrafBucket = []byte("telegrafv1")
telegrafPluginsBucket = []byte("telegrafPluginsv1")
)
var _ influxdb.TelegrafConfigStore = (*Service)(nil)
// Service is a telegraf config service.
type Service struct {
kv kv.Store
byOrganisationIndex *kv.Index
IDGenerator influxdb.IDGenerator
}
// New constructs and configures a new telegraf config service.
func New(store kv.Store) *Service {
return &Service{
kv: store,
byOrganisationIndex: kv.NewIndex(
telegraf.ByOrganizationIndexMapping,
kv.WithIndexReadPathEnabled,
),
IDGenerator: snowflake.NewIDGenerator(),
}
}
func (s *Service) telegrafBucket(tx kv.Tx) (kv.Bucket, error) {
b, err := tx.Bucket(telegrafBucket)
if err != nil {
return nil, UnavailableTelegrafServiceError(err)
}
return b, nil
}
func (s *Service) telegrafPluginsBucket(tx kv.Tx) (kv.Bucket, error) {
b, err := tx.Bucket(telegrafPluginsBucket)
if err != nil {
return nil, UnavailableTelegrafServiceError(err)
}
return b, nil
}
// FindTelegrafConfigByID returns a single telegraf config by ID.
func (s *Service) FindTelegrafConfigByID(ctx context.Context, id influxdb.ID) (*influxdb.TelegrafConfig, error) {
var (
tc *influxdb.TelegrafConfig
err error
)
err = s.kv.View(ctx, func(tx kv.Tx) error {
tc, err = s.findTelegrafConfigByID(ctx, tx, id)
return err
})
return tc, err
}
func (s *Service) findTelegrafConfigByID(ctx context.Context, tx kv.Tx, id influxdb.ID) (*influxdb.TelegrafConfig, error) {
encID, err := id.Encode()
if err != nil {
return nil, ErrInvalidTelegrafID
}
bucket, err := s.telegrafBucket(tx)
if err != nil {
return nil, err
}
v, err := bucket.Get(encID)
if kv.IsNotFound(err) {
return nil, ErrTelegrafNotFound
}
if err != nil {
return nil, InternalTelegrafServiceError(err)
}
return unmarshalTelegraf(v)
}
// FindTelegrafConfigs returns a list of telegraf configs that match filter and the total count of matching telegraf configs.
// Additional options provide pagination & sorting.
func (s *Service) FindTelegrafConfigs(ctx context.Context, filter influxdb.TelegrafConfigFilter, opt ...influxdb.FindOptions) (tcs []*influxdb.TelegrafConfig, n int, err error) {
err = s.kv.View(ctx, func(tx kv.Tx) error {
tcs, n, err = s.findTelegrafConfigs(ctx, tx, filter)
return err
})
return tcs, n, err
}
func (s *Service) findTelegrafConfigs(ctx context.Context, tx kv.Tx, filter influxdb.TelegrafConfigFilter, opt ...influxdb.FindOptions) ([]*influxdb.TelegrafConfig, int, error) {
tcs := make([]*influxdb.TelegrafConfig, 0)
visit := func(k, v []byte) error {
var tc influxdb.TelegrafConfig
if err := json.Unmarshal(v, &tc); err != nil {
return err
}
tcs = append(tcs, &tc)
return nil
}
if filter.OrgID == nil {
// forward cursor entire bucket
bucket, err := s.telegrafBucket(tx)
if err != nil {
return nil, 0, err
}
// TODO(georgemac): convert find options into cursor options
cursor, err := bucket.ForwardCursor(nil)
if err != nil {
return nil, 0, err
}
return tcs, len(tcs), kv.WalkCursor(ctx, cursor, visit)
}
orgID, err := filter.OrgID.Encode()
if err != nil {
return nil, 0, err
}
return tcs, len(tcs), s.byOrganisationIndex.Walk(ctx, tx, orgID, visit)
}
// PutTelegrafConfig put a telegraf config to storage.
func (s *Service) PutTelegrafConfig(ctx context.Context, tc *influxdb.TelegrafConfig) error {
return s.kv.Update(ctx, func(tx kv.Tx) (err error) {
return s.putTelegrafConfig(ctx, tx, tc)
})
}
func (s *Service) putTelegrafConfig(ctx context.Context, tx kv.Tx, tc *influxdb.TelegrafConfig) error {
encodedID, err := tc.ID.Encode()
if err != nil {
return ErrInvalidTelegrafID
}
if !tc.OrgID.Valid() {
return ErrInvalidTelegrafOrgID
}
orgID, err := tc.OrgID.Encode()
if err != nil {
return err
}
// insert index entry for orgID -> id
if err := s.byOrganisationIndex.Insert(tx, orgID, encodedID); err != nil {
return err
}
v, err := marshalTelegraf(tc)
if err != nil {
return err
}
bucket, err := s.telegrafBucket(tx)
if err != nil {
return err
}
if err := bucket.Put(encodedID, v); err != nil {
return UnavailableTelegrafServiceError(err)
}
return s.putTelegrafConfigStats(encodedID, tx, tc)
}
func (s *Service) putTelegrafConfigStats(encodedID []byte, tx kv.Tx, tc *influxdb.TelegrafConfig) error {
bucket, err := s.telegrafPluginsBucket(tx)
if err != nil {
return err
}
v, err := marshalTelegrafPlugins(tc.CountPlugins())
if err != nil {
return err
}
if err := bucket.Put(encodedID, v); err != nil {
return UnavailableTelegrafServiceError(err)
}
return nil
}
// CreateTelegrafConfig creates a new telegraf config and sets b.ID with the new identifier.
func (s *Service) CreateTelegrafConfig(ctx context.Context, tc *influxdb.TelegrafConfig, userID influxdb.ID) error {
return s.kv.Update(ctx, func(tx kv.Tx) error {
return s.createTelegrafConfig(ctx, tx, tc, userID)
})
}
func (s *Service) createTelegrafConfig(ctx context.Context, tx kv.Tx, tc *influxdb.TelegrafConfig, userID influxdb.ID) error {
tc.ID = s.IDGenerator.ID()
return s.putTelegrafConfig(ctx, tx, tc)
}
// UpdateTelegrafConfig updates a single telegraf config.
// Returns the new telegraf config after update.
func (s *Service) UpdateTelegrafConfig(ctx context.Context, id influxdb.ID, tc *influxdb.TelegrafConfig, userID influxdb.ID) (*influxdb.TelegrafConfig, error) {
var err error
err = s.kv.Update(ctx, func(tx kv.Tx) error {
tc, err = s.updateTelegrafConfig(ctx, tx, id, tc, userID)
return err
})
return tc, err
}
func (s *Service) updateTelegrafConfig(ctx context.Context, tx kv.Tx, id influxdb.ID, tc *influxdb.TelegrafConfig, userID influxdb.ID) (*influxdb.TelegrafConfig, error) {
current, err := s.findTelegrafConfigByID(ctx, tx, id)
if err != nil {
return nil, err
}
// ID and OrganizationID can not be updated
tc.ID = current.ID
tc.OrgID = current.OrgID
err = s.putTelegrafConfig(ctx, tx, tc)
return tc, err
}
// DeleteTelegrafConfig removes a telegraf config by ID.
func (s *Service) DeleteTelegrafConfig(ctx context.Context, id influxdb.ID) error {
return s.kv.Update(ctx, func(tx kv.Tx) error {
return s.deleteTelegrafConfig(ctx, tx, id)
})
}
func (s *Service) deleteTelegrafConfig(ctx context.Context, tx kv.Tx, id influxdb.ID) error {
tc, err := s.findTelegrafConfigByID(ctx, tx, id)
if err != nil {
return err
}
encodedID, err := tc.ID.Encode()
if err != nil {
return ErrInvalidTelegrafID
}
orgID, err := tc.OrgID.Encode()
if err != nil {
return err
}
// removing index entry for orgID -> id
if err := s.byOrganisationIndex.Delete(tx, orgID, encodedID); err != nil {
return err
}
bucket, err := s.telegrafBucket(tx)
if err != nil {
return err
}
_, err = bucket.Get(encodedID)
if kv.IsNotFound(err) {
return ErrTelegrafNotFound
}
if err != nil {
return InternalTelegrafServiceError(err)
}
if err := bucket.Delete(encodedID); err != nil {
return UnavailableTelegrafServiceError(err)
}
return s.deleteTelegrafConfigStats(encodedID, tx)
}
func (s *Service) deleteTelegrafConfigStats(encodedID []byte, tx kv.Tx) error {
bucket, err := s.telegrafPluginsBucket(tx)
if err != nil {
return err
}
if err := bucket.Delete(encodedID); err != nil {
return &influxdb.Error{
Code: influxdb.EInternal,
Msg: fmt.Sprintf("Unable to connect to telegraf config stats service. Please try again; Err: %v", err),
Op: "kv/telegraf",
}
}
return nil
}
// unmarshalTelegraf turns the stored byte slice in the kv into a *influxdb.TelegrafConfig.
func unmarshalTelegraf(v []byte) (*influxdb.TelegrafConfig, error) {
t := &influxdb.TelegrafConfig{}
if err := json.Unmarshal(v, t); err != nil {
return nil, CorruptTelegrafError(err)
}
return t, nil
}
func marshalTelegraf(tc *influxdb.TelegrafConfig) ([]byte, error) {
v, err := json.Marshal(tc)
if err != nil {
return nil, ErrUnprocessableTelegraf(err)
}
return v, nil
}
func marshalTelegrafPlugins(plugins map[string]float64) ([]byte, error) {
v, err := json.Marshal(plugins)
if err != nil {
return nil, ErrUnprocessableTelegraf(err)
}
return v, nil
}

View File

@ -0,0 +1,84 @@
package service_test
import (
"context"
"errors"
"io/ioutil"
"os"
"testing"
influxdb "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration/all"
telegrafservice "github.com/influxdata/influxdb/v2/telegraf/service"
telegraftesting "github.com/influxdata/influxdb/v2/telegraf/service/testing"
"go.uber.org/zap/zaptest"
)
func TestBoltTelegrafService(t *testing.T) {
telegraftesting.TelegrafConfigStore(initBoltTelegrafService, t)
}
func NewTestBoltStore(t *testing.T) (kv.SchemaStore, func(), error) {
f, err := ioutil.TempFile("", "influxdata-bolt-")
if err != nil {
return nil, nil, errors.New("unable to open temporary boltdb file")
}
f.Close()
ctx := context.Background()
logger := zaptest.NewLogger(t)
path := f.Name()
// skip fsync to improve test performance
s := bolt.NewKVStore(logger, path, bolt.WithNoSync)
if err := s.Open(context.Background()); err != nil {
return nil, nil, err
}
if err := all.Up(ctx, logger, s); err != nil {
return nil, nil, err
}
close := func() {
s.Close()
os.Remove(path)
}
return s, close, nil
}
func initBoltTelegrafService(f telegraftesting.TelegrafConfigFields, t *testing.T) (influxdb.TelegrafConfigStore, func()) {
s, closeBolt, err := NewTestBoltStore(t)
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
svc, closeSvc := initTelegrafService(s, f, t)
return svc, func() {
closeSvc()
closeBolt()
}
}
func initTelegrafService(s kv.SchemaStore, f telegraftesting.TelegrafConfigFields, t *testing.T) (influxdb.TelegrafConfigStore, func()) {
ctx := context.Background()
svc := telegrafservice.New(s)
svc.IDGenerator = f.IDGenerator
for _, tc := range f.TelegrafConfigs {
if err := svc.PutTelegrafConfig(ctx, tc); err != nil {
t.Fatalf("failed to populate telegraf config: %v", err)
}
}
return svc, func() {
for _, tc := range f.TelegrafConfigs {
if err := svc.DeleteTelegrafConfig(ctx, tc.ID); err != nil {
t.Logf("failed to remove telegraf config: %v", err)
}
}
}
}

View File

@ -0,0 +1,856 @@
package testing
import (
"context"
"fmt"
"sort"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
influxdb "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/telegraf/plugins/inputs"
"github.com/influxdata/influxdb/v2/telegraf/plugins/outputs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var (
oneID = influxdb.ID(1)
twoID = influxdb.ID(2)
threeID = influxdb.ID(3)
fourID = influxdb.ID(4)
)
// TelegrafConfigFields includes prepopulated data for mapping tests.
type TelegrafConfigFields struct {
IDGenerator influxdb.IDGenerator
TelegrafConfigs []*influxdb.TelegrafConfig
}
var telegrafCmpOptions = cmp.Options{
cmpopts.IgnoreUnexported(
inputs.CPUStats{},
inputs.MemStats{},
inputs.Kubernetes{},
inputs.File{},
outputs.File{},
outputs.InfluxDBV2{},
),
cmp.Transformer("Sort", func(in []*influxdb.TelegrafConfig) []*influxdb.TelegrafConfig {
out := append([]*influxdb.TelegrafConfig(nil), in...)
sort.Slice(out, func(i, j int) bool {
return out[i].ID > out[j].ID
})
return out
}),
}
type telegrafTestFactoryFunc func(TelegrafConfigFields, *testing.T) (influxdb.TelegrafConfigStore, func())
// TelegrafConfigStore tests all the service functions.
func TelegrafConfigStore(
init telegrafTestFactoryFunc, t *testing.T,
) {
tests := []struct {
name string
fn func(init telegrafTestFactoryFunc,
t *testing.T)
}{
{
name: "CreateTelegrafConfig",
fn: CreateTelegrafConfig,
},
{
name: "FindTelegrafConfigByID",
fn: FindTelegrafConfigByID,
},
{
name: "FindTelegrafConfigs",
fn: FindTelegrafConfigs,
},
{
name: "UpdateTelegrafConfig",
fn: UpdateTelegrafConfig,
},
{
name: "DeleteTelegrafConfig",
fn: DeleteTelegrafConfig,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt := tt
t.Parallel()
tt.fn(init, t)
})
}
}
// CreateTelegrafConfig testing.
func CreateTelegrafConfig(
init telegrafTestFactoryFunc,
t *testing.T,
) {
type args struct {
telegrafConfig *influxdb.TelegrafConfig
userID influxdb.ID
}
type wants struct {
err error
telegrafs []*influxdb.TelegrafConfig
}
tests := []struct {
name string
fields TelegrafConfigFields
args args
wants wants
}{
{
name: "create telegraf config without organization ID should error",
fields: TelegrafConfigFields{
IDGenerator: mock.NewStaticIDGenerator(oneID),
TelegrafConfigs: []*influxdb.TelegrafConfig{},
},
args: args{
telegrafConfig: &influxdb.TelegrafConfig{},
},
wants: wants{
err: &influxdb.Error{
Code: influxdb.EEmptyValue,
Msg: influxdb.ErrTelegrafConfigInvalidOrgID,
},
},
},
{
name: "create telegraf config with empty set",
fields: TelegrafConfigFields{
IDGenerator: mock.NewStaticIDGenerator(oneID),
TelegrafConfigs: []*influxdb.TelegrafConfig{},
},
args: args{
userID: threeID,
telegrafConfig: &influxdb.TelegrafConfig{
OrgID: twoID,
Name: "name1",
Config: "[[inputs.cpu]]\n[[outputs.influxdb_v2]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
},
wants: wants{
telegrafs: []*influxdb.TelegrafConfig{
{
ID: oneID,
OrgID: twoID,
Name: "name1",
Config: "[[inputs.cpu]]\n[[outputs.influxdb_v2]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
},
},
},
{
name: "basic create telegraf config",
fields: TelegrafConfigFields{
IDGenerator: mock.NewStaticIDGenerator(twoID),
TelegrafConfigs: []*influxdb.TelegrafConfig{
{
ID: oneID,
OrgID: twoID,
Name: "tc1",
Config: "[[inputs.mem_stats]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
},
},
args: args{
userID: threeID,
telegrafConfig: &influxdb.TelegrafConfig{
OrgID: twoID,
Name: "name2",
Config: "[[inputs.cpu]]\n[[outputs.influxdb_v2]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}}, // for inmem test as it doesn't unmarshal..
},
},
wants: wants{
telegrafs: []*influxdb.TelegrafConfig{
{
ID: oneID,
OrgID: twoID,
Name: "tc1",
Config: "[[inputs.mem_stats]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
{
ID: twoID,
OrgID: twoID,
Name: "name2",
Config: "[[inputs.cpu]]\n[[outputs.influxdb_v2]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, done := init(tt.fields, t)
defer done()
ctx := context.Background()
err := s.CreateTelegrafConfig(ctx, tt.args.telegrafConfig, tt.args.userID)
if (err != nil) != (tt.wants.err != nil) {
t.Fatalf("expected error '%v' got '%v'", tt.wants.err, err)
}
if tt.wants.err == nil && !tt.args.telegrafConfig.ID.Valid() {
t.Fatalf("telegraf config ID not set from CreateTelegrafConfig")
}
if err != nil && tt.wants.err != nil {
if influxdb.ErrorCode(err) != influxdb.ErrorCode(tt.wants.err) {
t.Fatalf("expected error messages to match '%v' got '%v'", influxdb.ErrorCode(tt.wants.err), influxdb.ErrorCode(err))
}
}
filter := influxdb.TelegrafConfigFilter{}
tcs, _, err := s.FindTelegrafConfigs(ctx, filter)
if err != nil {
t.Fatalf("failed to retrieve telegraf configs: %v", err)
}
if diff := cmp.Diff(tcs, tt.wants.telegrafs, telegrafCmpOptions...); diff != "" {
t.Errorf("telegraf configs are different -got/+want\ndiff %s", diff)
}
})
}
}
// FindTelegrafConfigByID testing.
func FindTelegrafConfigByID(
init telegrafTestFactoryFunc,
t *testing.T,
) {
type args struct {
id influxdb.ID
}
type wants struct {
err error
telegrafConfig *influxdb.TelegrafConfig
}
tests := []struct {
name string
fields TelegrafConfigFields
args args
wants wants
}{
{
name: "bad id",
fields: TelegrafConfigFields{
TelegrafConfigs: []*influxdb.TelegrafConfig{
{
ID: oneID,
OrgID: twoID,
Name: "tc1",
Config: "[[inputs.cpu]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
{
ID: twoID,
OrgID: twoID,
Name: "tc2",
Config: "[[inputs.file]]\n[[inputs.mem]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
},
},
args: args{
id: influxdb.ID(0),
},
wants: wants{
err: fmt.Errorf("provided telegraf configuration ID has invalid format"),
},
},
{
name: "not found",
fields: TelegrafConfigFields{
TelegrafConfigs: []*influxdb.TelegrafConfig{
{
ID: oneID,
OrgID: twoID,
Name: "tc1",
Config: "[[inputs.cpu]]\n",
},
{
ID: twoID,
OrgID: twoID,
Name: "tc2",
Config: "[[inputs.file]]\n[[inputs.mem]]\n",
},
},
},
args: args{
id: threeID,
},
wants: wants{
err: &influxdb.Error{
Code: influxdb.ENotFound,
Msg: "telegraf configuration not found",
},
},
},
{
name: "basic find telegraf config by id",
fields: TelegrafConfigFields{
TelegrafConfigs: []*influxdb.TelegrafConfig{
{
ID: oneID,
OrgID: threeID,
Name: "tc1",
Config: "[[inputs.cpu]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
{
ID: twoID,
OrgID: threeID,
Name: "tc2",
Config: "[[inputs.file]]\n[[inputs.mem]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
},
},
args: args{
id: twoID,
},
wants: wants{
telegrafConfig: &influxdb.TelegrafConfig{
ID: twoID,
OrgID: threeID,
Name: "tc2",
Config: "[[inputs.file]]\n[[inputs.mem]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, done := init(tt.fields, t)
defer done()
ctx := context.Background()
tc, err := s.FindTelegrafConfigByID(ctx, tt.args.id)
if (err != nil) != (tt.wants.err != nil) {
t.Fatalf("expected errors to be equal '%v' got '%v'", tt.wants.err, err)
}
if err != nil && tt.wants.err != nil {
if want, got := tt.wants.err.Error(), err.Error(); want != got {
t.Fatalf("expected error '%s' got '%s'", want, got)
}
}
if diff := cmp.Diff(tc, tt.wants.telegrafConfig, telegrafCmpOptions...); diff != "" {
t.Errorf("telegraf configs are different -got/+want\ndiff %s", diff)
}
})
}
}
// FindTelegrafConfigs testing
func FindTelegrafConfigs(
init telegrafTestFactoryFunc,
t *testing.T,
) {
type args struct {
filter influxdb.TelegrafConfigFilter
}
type wants struct {
telegrafConfigs []*influxdb.TelegrafConfig
err error
}
tests := []struct {
name string
fields TelegrafConfigFields
args args
wants wants
}{
{
name: "find nothing (empty set)",
fields: TelegrafConfigFields{
TelegrafConfigs: []*influxdb.TelegrafConfig{},
},
args: args{
filter: influxdb.TelegrafConfigFilter{},
},
wants: wants{
telegrafConfigs: []*influxdb.TelegrafConfig{},
},
},
{
name: "find all telegraf configs (across orgs)",
fields: TelegrafConfigFields{
IDGenerator: mock.NewIncrementingIDGenerator(oneID),
TelegrafConfigs: []*influxdb.TelegrafConfig{
{
ID: oneID,
OrgID: twoID,
Name: "tc1",
Config: "[[inputs.cpu]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
{
ID: twoID,
OrgID: threeID,
Name: "tc2",
Config: "[[inputs.file]]\n[[inputs.mem]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
},
},
args: args{
filter: influxdb.TelegrafConfigFilter{},
},
wants: wants{
telegrafConfigs: []*influxdb.TelegrafConfig{
{
ID: oneID,
OrgID: twoID,
Name: "tc1",
Config: "[[inputs.cpu]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
{
ID: twoID,
OrgID: threeID,
Name: "tc2",
Config: "[[inputs.file]]\n[[inputs.mem]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
},
},
},
{
name: "filter by organization only",
fields: TelegrafConfigFields{
IDGenerator: mock.NewIncrementingIDGenerator(oneID),
TelegrafConfigs: []*influxdb.TelegrafConfig{
{
ID: oneID,
OrgID: fourID,
Name: "tc1",
Config: "[[inputs.cpu]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
{
ID: twoID,
OrgID: fourID,
Name: "tc2",
Config: "[[inputs.file]]\n[[inputs.mem]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
{
ID: threeID,
OrgID: oneID,
Name: "tc3",
Config: "[[inputs.cpu]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
{
ID: fourID,
OrgID: oneID,
Name: "tc4",
Config: "[[inputs.cpu]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
},
},
args: args{
filter: influxdb.TelegrafConfigFilter{
OrgID: &oneID,
},
},
wants: wants{
telegrafConfigs: []*influxdb.TelegrafConfig{
{
ID: threeID,
OrgID: oneID,
Name: "tc3",
Config: "[[inputs.cpu]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
{
ID: fourID,
OrgID: oneID,
Name: "tc4",
Config: "[[inputs.cpu]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
},
},
},
{
name: "empty for provided org",
fields: TelegrafConfigFields{
IDGenerator: mock.NewIncrementingIDGenerator(oneID),
TelegrafConfigs: []*influxdb.TelegrafConfig{
{
ID: oneID,
OrgID: threeID,
Name: "tc1",
Config: "[[inputs.cpu]]\n",
},
{
ID: twoID,
OrgID: threeID,
Name: "tc2",
Config: "[[inputs.file]]\n[[inputs.mem]]\n",
},
},
},
args: args{
filter: influxdb.TelegrafConfigFilter{
OrgID: &oneID,
},
},
wants: wants{
telegrafConfigs: []*influxdb.TelegrafConfig{},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, done := init(tt.fields, t)
defer done()
ctx := context.Background()
tcs, _, err := s.FindTelegrafConfigs(ctx, tt.args.filter)
if err != nil && tt.wants.err == nil {
t.Fatalf("expected errors to be nil got '%v'", err)
}
require.Equal(t, tt.wants.err, err)
assert.Equal(t, tt.wants.telegrafConfigs, tcs)
})
}
}
// UpdateTelegrafConfig testing.
func UpdateTelegrafConfig(
init telegrafTestFactoryFunc,
t *testing.T,
) {
type args struct {
userID influxdb.ID
id influxdb.ID
telegrafConfig *influxdb.TelegrafConfig
}
type wants struct {
telegrafConfig *influxdb.TelegrafConfig
err error
}
tests := []struct {
name string
fields TelegrafConfigFields
args args
wants wants
}{
{
name: "can't find the id",
fields: TelegrafConfigFields{
TelegrafConfigs: []*influxdb.TelegrafConfig{
{
ID: oneID,
OrgID: fourID,
Name: "tc1",
Config: "[[inputs.cpu]]\n",
},
{
ID: twoID,
OrgID: fourID,
Name: "tc2",
Config: "[[inputs.file]]\n[[inputs.mem]]\n",
},
},
},
args: args{
userID: threeID,
id: fourID,
telegrafConfig: &influxdb.TelegrafConfig{
Name: "tc2",
Config: "[[inputs.file]]\n[[inputs.mem]]\n",
},
},
wants: wants{
err: &influxdb.Error{
Code: influxdb.ENotFound,
Msg: fmt.Sprintf("telegraf config with ID %v not found", fourID),
},
},
},
{
fields: TelegrafConfigFields{
TelegrafConfigs: []*influxdb.TelegrafConfig{
{
ID: oneID,
OrgID: fourID,
Name: "tc1",
Config: "[[inputs.cpu]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
{
ID: twoID,
OrgID: fourID,
Name: "tc2",
Config: "[[inputs.file]]\n[[inputs.mem]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
},
},
args: args{
userID: fourID,
id: twoID,
telegrafConfig: &influxdb.TelegrafConfig{
OrgID: oneID, // notice this get ignored - ie., resulting TelegrafConfig will have OrgID equal to fourID
Name: "tc2",
Config: "[[inputs.file]]\n[[inputs.mem]]\n",
},
},
wants: wants{
telegrafConfig: &influxdb.TelegrafConfig{
ID: twoID,
OrgID: fourID,
Name: "tc2",
Config: "[[inputs.file]]\n[[inputs.mem]]\n",
},
},
},
{
name: "config update",
fields: TelegrafConfigFields{
TelegrafConfigs: []*influxdb.TelegrafConfig{
{
ID: oneID,
OrgID: oneID,
Name: "tc1",
Config: "[[inputs.cpu]]\n",
},
{
ID: twoID,
OrgID: oneID,
Name: "tc2",
Config: "[[inputs.file]]\n[[inputs.kubernetes]]\n[[inputs.kubernetes]]\n",
},
},
},
args: args{
userID: fourID,
id: twoID,
telegrafConfig: &influxdb.TelegrafConfig{
Name: "tc2",
Config: "[[inputs.file]]\n[[inputs.kubernetes]]\n[[inputs.kubernetes]]\n",
},
},
wants: wants{
telegrafConfig: &influxdb.TelegrafConfig{
ID: twoID,
OrgID: oneID,
Name: "tc2",
Config: "[[inputs.file]]\n[[inputs.kubernetes]]\n[[inputs.kubernetes]]\n",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, done := init(tt.fields, t)
defer done()
ctx := context.Background()
tc, err := s.UpdateTelegrafConfig(ctx, tt.args.id,
tt.args.telegrafConfig, tt.args.userID)
if err != nil && tt.wants.err == nil {
t.Fatalf("expected errors to be nil got '%v'", err)
}
if err != nil && tt.wants.err != nil {
if influxdb.ErrorCode(err) != influxdb.ErrorCode(tt.wants.err) {
t.Fatalf("expected error '%v' got '%v'", tt.wants.err, err)
}
}
if diff := cmp.Diff(tc, tt.wants.telegrafConfig, telegrafCmpOptions...); tt.wants.err == nil && diff != "" {
fmt.Println(tc.Metadata, tt.wants.telegrafConfig.Metadata)
t.Errorf("telegraf configs are different -got/+want\ndiff %s", diff)
}
})
}
}
// DeleteTelegrafConfig testing.
func DeleteTelegrafConfig(
init telegrafTestFactoryFunc,
t *testing.T,
) {
type args struct {
id influxdb.ID
}
type wants struct {
telegrafConfigs []*influxdb.TelegrafConfig
err error
}
tests := []struct {
name string
fields TelegrafConfigFields
args args
wants wants
}{
{
name: "bad id",
fields: TelegrafConfigFields{
IDGenerator: mock.NewIncrementingIDGenerator(oneID),
TelegrafConfigs: []*influxdb.TelegrafConfig{
{
ID: oneID,
OrgID: fourID,
Name: "tc1",
Config: "[[inputs.cpu]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
{
ID: twoID,
OrgID: fourID,
Name: "tc2",
Config: "[[inputs.file]]\n[[inputs.mem]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
},
},
args: args{
id: influxdb.ID(0),
},
wants: wants{
err: fmt.Errorf("provided telegraf configuration ID has invalid format"),
telegrafConfigs: []*influxdb.TelegrafConfig{
{
ID: oneID,
OrgID: fourID,
Name: "tc1",
Config: "[[inputs.cpu]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
{
ID: twoID,
OrgID: fourID,
Name: "tc2",
Config: "[[inputs.file]]\n[[inputs.mem]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
},
},
},
{
name: "none existing config",
fields: TelegrafConfigFields{
IDGenerator: mock.NewIncrementingIDGenerator(oneID),
TelegrafConfigs: []*influxdb.TelegrafConfig{
{
ID: oneID,
OrgID: threeID,
Name: "tc1",
Config: "[[inputs.cpu]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
{
ID: twoID,
OrgID: threeID,
Name: "tc2",
Config: "[[inputs.file]]\n[[inputs.mem]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
},
},
args: args{
id: fourID,
},
wants: wants{
err: fmt.Errorf("telegraf configuration not found"),
telegrafConfigs: []*influxdb.TelegrafConfig{
{
ID: oneID,
OrgID: threeID,
Name: "tc1",
Config: "[[inputs.cpu]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
{
ID: twoID,
OrgID: threeID,
Name: "tc2",
Config: "[[inputs.file]]\n[[inputs.mem]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
},
},
},
{
name: "regular delete",
fields: TelegrafConfigFields{
IDGenerator: mock.NewIncrementingIDGenerator(oneID),
TelegrafConfigs: []*influxdb.TelegrafConfig{
{
ID: oneID,
OrgID: twoID,
Name: "tc1",
Config: "[[inputs.cpu]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
{
ID: twoID,
OrgID: twoID,
Name: "tc2",
Config: "[[inputs.file]]\n[[inputs.mem]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
},
},
args: args{
id: twoID,
},
wants: wants{
telegrafConfigs: []*influxdb.TelegrafConfig{
{
ID: oneID,
OrgID: twoID,
Name: "tc1",
Config: "[[inputs.cpu]]\n",
Metadata: map[string]interface{}{"buckets": []interface{}{}},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, done := init(tt.fields, t)
defer done()
ctx := context.Background()
err := s.DeleteTelegrafConfig(ctx, tt.args.id)
if err != nil && tt.wants.err == nil {
t.Fatalf("expected errors to be nil got '%v'", err)
}
if err != nil && tt.wants.err != nil {
if want, got := tt.wants.err.Error(), err.Error(); want != got {
t.Fatalf("expected error '%v' got '%v'", tt.wants.err, err)
}
}
tcs, _, err := s.FindTelegrafConfigs(ctx, influxdb.TelegrafConfigFilter{})
require.NoError(t, err)
assert.Equal(t, tt.wants.telegrafConfigs, tcs)
})
}
}