From 090f681737896feb99870ee534c9ccc4203b9a6d Mon Sep 17 00:00:00 2001 From: Jeffrey Smith II Date: Thu, 16 Jun 2022 14:48:06 -0400 Subject: [PATCH] feat: Add remotes and replications to telemetry (#23456) * feat: start work on remotes/replications phone home data * feat: add remotes/replications phone home data (no tests * refactor: use erroring binary conversions * style: gofmt * refactor: improve some error handling * style: cleanup * feat: add tests * refactor: just list remotes/replications rather than decrement * chore: linting fix Co-authored-by: DStrand1 --- .gitignore | 1 + bolt/bbolt.go | 2 + bolt/metrics.go | 29 ++++ bolt/metrics_test.go | 2 + cmd/influxd/launcher/launcher.go | 4 +- ...dd_remotes_replications_metrics_buckets.go | 14 ++ kv/migration/all/all.go | 2 + prometheus/filter.go | 2 +- remotes/transport/http.go | 6 +- remotes/transport/middleware_kv.go | 99 ++++++++++++ remotes/transport/middleware_kv_test.go | 149 ++++++++++++++++++ replications/transport/http.go | 5 +- replications/transport/middleware_kv.go | 109 +++++++++++++ replications/transport/middleware_kv_test.go | 147 +++++++++++++++++ telemetry/metrics.go | 2 + 15 files changed, 568 insertions(+), 5 deletions(-) create mode 100644 kv/migration/all/0020_add_remotes_replications_metrics_buckets.go create mode 100644 remotes/transport/middleware_kv.go create mode 100644 remotes/transport/middleware_kv_test.go create mode 100644 replications/transport/middleware_kv.go create mode 100644 replications/transport/middleware_kv_test.go diff --git a/.gitignore b/.gitignore index b5d6b34ba1..7dd8d22efe 100644 --- a/.gitignore +++ b/.gitignore @@ -32,6 +32,7 @@ test.key /fluxd /transpilerd /bin +/internal/cmd/kvmigrate/kvmigrate # Project tools that you might install with go build. /editorconfig-checker diff --git a/bolt/bbolt.go b/bolt/bbolt.go index e6b5d9a3a0..9d6c8bdace 100644 --- a/bolt/bbolt.go +++ b/bolt/bbolt.go @@ -102,6 +102,8 @@ func (c *Client) initialize(ctx context.Context) error { scraperBucket, telegrafBucket, telegrafPluginsBucket, + remoteBucket, + replicationBucket, userBucket, } for _, bktName := range bkts { diff --git a/bolt/metrics.go b/bolt/metrics.go index 6a88e26ae2..d47314cdd7 100644 --- a/bolt/metrics.go +++ b/bolt/metrics.go @@ -21,6 +21,8 @@ var ( scraperBucket = []byte("scraperv2") telegrafBucket = []byte("telegrafv1") telegrafPluginsBucket = []byte("telegrafPluginsv1") + remoteBucket = []byte("remotesv2") + replicationBucket = []byte("replicationsv2") userBucket = []byte("usersv1") ) @@ -65,6 +67,16 @@ var ( "Number of individual telegraf plugins configured", []string{"plugin"}, nil) + remoteDesc = prometheus.NewDesc( + "influxdb_remotes_total", + "Number of total remote connections configured on the server", + nil, nil) + + replicationDesc = prometheus.NewDesc( + "influxdb_replications_total", + "Number of total replication configurations on the server", + nil, nil) + boltWritesDesc = prometheus.NewDesc( "boltdb_writes_total", "Total number of boltdb writes", @@ -85,6 +97,8 @@ func (c *Client) Describe(ch chan<- *prometheus.Desc) { ch <- dashboardsDesc ch <- scrapersDesc ch <- telegrafsDesc + ch <- remoteDesc + ch <- replicationDesc ch <- boltWritesDesc ch <- boltReadsDesc @@ -209,12 +223,15 @@ func (c *Client) Collect(ch chan<- prometheus.Metric) { orgs, buckets, users, tokens := 0, 0, 0, 0 dashboards, scrapers, telegrafs := 0, 0, 0 + remotes, replications := 0, 0 _ = c.db.View(func(tx *bolt.Tx) error { buckets = tx.Bucket(bucketBucket).Stats().KeyN dashboards = tx.Bucket(dashboardBucket).Stats().KeyN orgs = tx.Bucket(organizationBucket).Stats().KeyN scrapers = tx.Bucket(scraperBucket).Stats().KeyN telegrafs = tx.Bucket(telegrafBucket).Stats().KeyN + remotes = tx.Bucket(remoteBucket).Stats().KeyN + replications = tx.Bucket(replicationBucket).Stats().KeyN tokens = tx.Bucket(authorizationBucket).Stats().KeyN users = tx.Bucket(userBucket).Stats().KeyN return nil @@ -262,5 +279,17 @@ func (c *Client) Collect(ch chan<- prometheus.Metric) { float64(telegrafs), ) + ch <- prometheus.MustNewConstMetric( + remoteDesc, + prometheus.CounterValue, + float64(remotes), + ) + + ch <- prometheus.MustNewConstMetric( + replicationDesc, + prometheus.CounterValue, + float64(replications), + ) + c.pluginsCollector.Collect(ch) } diff --git a/bolt/metrics_test.go b/bolt/metrics_test.go index 1c177196b6..fc834279f1 100644 --- a/bolt/metrics_test.go +++ b/bolt/metrics_test.go @@ -38,6 +38,8 @@ func TestInitialMetrics(t *testing.T) { "influxdb_users_total": 0, "influxdb_tokens_total": 0, "influxdb_dashboards_total": 0, + "influxdb_remotes_total": 0, + "influxdb_replications_total": 0, "boltdb_reads_total": 0, } for name, count := range metrics { diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 4df96843b3..c78c2304c0 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -363,11 +363,11 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { remotesSvc := remotes.NewService(m.sqlStore) remotesServer := remotesTransport.NewInstrumentedRemotesHandler( - m.log.With(zap.String("handler", "remotes")), m.reg, remotesSvc) + m.log.With(zap.String("handler", "remotes")), m.reg, m.kvStore, remotesSvc) replicationSvc, replicationsMetrics := replications.NewService(m.sqlStore, ts, pointsWriter, m.log.With(zap.String("service", "replications")), opts.EnginePath, opts.InstanceID) replicationServer := replicationTransport.NewInstrumentedReplicationHandler( - m.log.With(zap.String("handler", "replications")), m.reg, replicationSvc) + m.log.With(zap.String("handler", "replications")), m.reg, m.kvStore, replicationSvc) ts.BucketService = replications.NewBucketService( m.log.With(zap.String("service", "replication_buckets")), ts.BucketService, replicationSvc) diff --git a/kv/migration/all/0020_add_remotes_replications_metrics_buckets.go b/kv/migration/all/0020_add_remotes_replications_metrics_buckets.go new file mode 100644 index 0000000000..30d6dedfca --- /dev/null +++ b/kv/migration/all/0020_add_remotes_replications_metrics_buckets.go @@ -0,0 +1,14 @@ +package all + +import "github.com/influxdata/influxdb/v2/kv/migration" + +var ( + remoteMetricsBucket = []byte("remotesv2") + replicationsMetricsBucket = []byte("replicationsv2") +) + +var Migration0020_Add_remotes_replications_metrics_buckets = migration.CreateBuckets( + "create remotes and replications metrics buckets", + remoteMetricsBucket, + replicationsMetricsBucket, +) diff --git a/kv/migration/all/all.go b/kv/migration/all/all.go index 1ecd7afdee..2ee8878d21 100644 --- a/kv/migration/all/all.go +++ b/kv/migration/all/all.go @@ -45,5 +45,7 @@ var Migrations = [...]migration.Spec{ Migration0018_RepairMissingShardGroupDurations, // add remotes and replications resource types to operator and all-access tokens Migration0019_AddRemotesReplicationsToTokens, + // add_remotes_replications_metrics_buckets + Migration0020_Add_remotes_replications_metrics_buckets, // {{ do_not_edit . }} } diff --git a/prometheus/filter.go b/prometheus/filter.go index c11a033800..0426afb821 100644 --- a/prometheus/filter.go +++ b/prometheus/filter.go @@ -35,7 +35,7 @@ func NewMatcher() Matcher { return Matcher{} } -// Family helps constuct match by adding a metric family to match to. +// Family helps construct match by adding a metric family to match to. func (m Matcher) Family(name string, lps ...*dto.LabelPair) Matcher { // prometheus metrics labels are sorted by label name. sort.Slice(lps, func(i, j int) bool { diff --git a/remotes/transport/http.go b/remotes/transport/http.go index 1ae8df32fb..37fe1da32f 100644 --- a/remotes/transport/http.go +++ b/remotes/transport/http.go @@ -4,6 +4,8 @@ import ( "context" "net/http" + "github.com/influxdata/influxdb/v2/kv" + "github.com/go-chi/chi" "github.com/go-chi/chi/middleware" "github.com/influxdata/influxdb/v2" @@ -56,7 +58,9 @@ type RemoteConnectionHandler struct { remotesService RemoteConnectionService } -func NewInstrumentedRemotesHandler(log *zap.Logger, reg prometheus.Registerer, svc RemoteConnectionService) *RemoteConnectionHandler { +func NewInstrumentedRemotesHandler(log *zap.Logger, reg prometheus.Registerer, kv kv.Store, svc RemoteConnectionService) *RemoteConnectionHandler { + // Collect telemetry. + svc = newTelemetryCollectingService(kv, svc) // Collect metrics. svc = newMetricCollectingService(reg, svc) // Wrap logging. diff --git a/remotes/transport/middleware_kv.go b/remotes/transport/middleware_kv.go new file mode 100644 index 0000000000..d9e8714d78 --- /dev/null +++ b/remotes/transport/middleware_kv.go @@ -0,0 +1,99 @@ +package transport + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + + "github.com/influxdata/influxdb/v2/kit/platform" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kv" +) + +var remotesBucket = []byte("remotesv2") + +func newTelemetryCollectingService(kv kv.Store, underlying RemoteConnectionService) *telemetryService { + return &telemetryService{ + kv: kv, + underlying: underlying, + } +} + +type telemetryService struct { + kv kv.Store + underlying RemoteConnectionService +} + +func (t telemetryService) ListRemoteConnections(ctx context.Context, filter influxdb.RemoteConnectionListFilter) (*influxdb.RemoteConnections, error) { + return t.underlying.ListRemoteConnections(ctx, filter) +} + +func (t telemetryService) GetRemoteConnection(ctx context.Context, id platform.ID) (*influxdb.RemoteConnection, error) { + return t.underlying.GetRemoteConnection(ctx, id) +} + +func (t telemetryService) UpdateRemoteConnection(ctx context.Context, id platform.ID, request influxdb.UpdateRemoteConnectionRequest) (*influxdb.RemoteConnection, error) { + return t.underlying.UpdateRemoteConnection(ctx, id, request) +} + +func (t telemetryService) CreateRemoteConnection(ctx context.Context, request influxdb.CreateRemoteConnectionRequest) (*influxdb.RemoteConnection, error) { + conn, err := t.underlying.CreateRemoteConnection(ctx, request) + if err != nil { + return conn, err + } + err = t.storeRemoteMetrics(ctx, request.OrgID) + return conn, err +} + +func (t telemetryService) DeleteRemoteConnection(ctx context.Context, id platform.ID) error { + rc, err := t.underlying.GetRemoteConnection(ctx, id) + if err != nil { + return err + } + + err = t.underlying.DeleteRemoteConnection(ctx, id) + if err != nil { + return err + } + + return t.storeRemoteMetrics(ctx, rc.OrgID) +} + +func (t telemetryService) storeRemoteMetrics(ctx context.Context, orgID platform.ID) error { + if err := t.kv.Update(ctx, func(tx kv.Tx) error { + encodedID, err := orgID.Encode() + if err != nil { + return platform.ErrInvalidID + } + bucket, err := tx.Bucket(remotesBucket) + if err != nil { + return err + } + count, err := t.countRemotes(ctx, orgID) + if err != nil { + return err + } + return bucket.Put(encodedID, count) + }); err != nil { + return fmt.Errorf("updating telemetry failed: %v", err) + } + + return nil +} + +func (t telemetryService) countRemotes(ctx context.Context, orgID platform.ID) ([]byte, error) { + req := influxdb.RemoteConnectionListFilter{ + OrgID: orgID, + } + list, err := t.underlying.ListRemoteConnections(ctx, req) + if err != nil { + return nil, err + } + + b := make([]byte, 0, 8) + buf := bytes.NewBuffer(b) + err = binary.Write(buf, binary.BigEndian, int64(len(list.Remotes))) + return buf.Bytes(), err +} diff --git a/remotes/transport/middleware_kv_test.go b/remotes/transport/middleware_kv_test.go new file mode 100644 index 0000000000..d9afedc7c7 --- /dev/null +++ b/remotes/transport/middleware_kv_test.go @@ -0,0 +1,149 @@ +package transport + +import ( + "context" + "encoding/binary" + "testing" + + "github.com/golang/mock/gomock" + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/inmem" + "github.com/influxdata/influxdb/v2/kit/platform" + "github.com/influxdata/influxdb/v2/kv" + "github.com/influxdata/influxdb/v2/kv/migration" + "github.com/influxdata/influxdb/v2/kv/migration/all" + "github.com/influxdata/influxdb/v2/remotes/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func getCount(kvStore kv.Store, orgID platform.ID) (uint64, error) { + var count uint64 + if err := kvStore.Update(context.Background(), func(tx kv.Tx) error { + encodedID, err := orgID.Encode() + if err != nil { + return err + } + bucket, err := tx.Bucket([]byte("remotesv2")) + if err != nil { + return err + } + c, err := bucket.Get(encodedID) + if err != nil { + return err + } + + count = binary.BigEndian.Uint64(c) + return nil + }); err != nil { + return 0, err + } + + return count, nil +} + +func TestRemoteCreateKVUpdate(t *testing.T) { + kvStore := inmem.NewKVStore() + gmock := gomock.NewController(t) + defer gmock.Finish() + mockRemote := mock.NewMockRemoteConnectionService(gmock) + telemetry := newTelemetryCollectingService(kvStore, mockRemote) + + kvMigrator, err := migration.NewMigrator( + zap.L(), + kvStore, + all.Migrations[:]..., + ) + require.NoError(t, err) + require.NoError(t, kvMigrator.Up(context.Background())) + + ctx := context.Background() + req := influxdb.CreateRemoteConnectionRequest{ + OrgID: platform.ID(1), + Name: "test1", + RemoteURL: "cloud2.influxdata.com", + RemoteToken: "testtoken", + RemoteOrgID: platform.ID(2), + } + + remoteConnction := influxdb.RemoteConnection{ + OrgID: platform.ID(1), + } + remoteConnections := influxdb.RemoteConnections{ + Remotes: []influxdb.RemoteConnection{remoteConnction}, + } + + mockRemote.EXPECT().CreateRemoteConnection(ctx, req).Return(&remoteConnction, nil).Times(1) + mockRemote.EXPECT().ListRemoteConnections(ctx, influxdb.RemoteConnectionListFilter{OrgID: req.OrgID}).Return(&remoteConnections, nil).Times(1) + + remote, err := telemetry.CreateRemoteConnection(ctx, req) + require.NoError(t, err) + + count, err := getCount(kvStore, remote.OrgID) + require.NoError(t, err) + require.Equal(t, int64(1), int64(count)) +} + +func TestRemoteDeleteKVUpdate(t *testing.T) { + kvStore := inmem.NewKVStore() + gmock := gomock.NewController(t) + defer gmock.Finish() + mockRemote := mock.NewMockRemoteConnectionService(gmock) + telemetry := newTelemetryCollectingService(kvStore, mockRemote) + + ctx := context.Background() + + kvMigrator, err := migration.NewMigrator( + zap.L(), + kvStore, + all.Migrations[:]..., + ) + require.NoError(t, err) + require.NoError(t, kvMigrator.Up(ctx)) + + req := influxdb.CreateRemoteConnectionRequest{ + OrgID: platform.ID(1), + Name: "test1", + RemoteURL: "cloud2.influxdata.com", + RemoteToken: "testtoken", + RemoteOrgID: platform.ID(2), + } + req2 := req + req2.Name = "test2" + + remoteConnection1 := influxdb.RemoteConnection{ + ID: platform.ID(1), + OrgID: platform.ID(1), + } + remoteConnection2 := remoteConnection1 + remoteConnection2.ID = platform.ID(2) + + remoteConnectionsPreDelete := influxdb.RemoteConnections{ + Remotes: []influxdb.RemoteConnection{remoteConnection1, remoteConnection2}, + } + + remoteConnectionsPostDelete := influxdb.RemoteConnections{ + Remotes: []influxdb.RemoteConnection{remoteConnection1}, + } + + mockRemote.EXPECT().CreateRemoteConnection(ctx, req).Return(&remoteConnection1, nil).Times(1) + mockRemote.EXPECT().CreateRemoteConnection(ctx, req2).Return(&remoteConnection2, nil).Times(1) + mockRemote.EXPECT().ListRemoteConnections(ctx, influxdb.RemoteConnectionListFilter{OrgID: req.OrgID}).Return(&remoteConnectionsPreDelete, nil).Times(2) + + mockRemote.EXPECT().DeleteRemoteConnection(ctx, remoteConnection1.ID).Return(nil).Times(1) + mockRemote.EXPECT().GetRemoteConnection(ctx, remoteConnection1.ID).Return(&remoteConnection1, nil).Times(1) + mockRemote.EXPECT().ListRemoteConnections(ctx, influxdb.RemoteConnectionListFilter{OrgID: req.OrgID}).Return(&remoteConnectionsPostDelete, nil).Times(1) + + _, err = telemetry.CreateRemoteConnection(ctx, req) + require.NoError(t, err) + + remote, err := telemetry.CreateRemoteConnection(ctx, req2) + require.NoError(t, err) + + err = telemetry.DeleteRemoteConnection(ctx, remoteConnection1.ID) + require.NoError(t, err) + + count, err := getCount(kvStore, remote.OrgID) + require.NoError(t, err) + require.Equal(t, int64(1), int64(count)) +} diff --git a/replications/transport/http.go b/replications/transport/http.go index 6e5e6ea347..14e99a86da 100644 --- a/replications/transport/http.go +++ b/replications/transport/http.go @@ -10,6 +10,7 @@ import ( "github.com/influxdata/influxdb/v2/kit/platform" "github.com/influxdata/influxdb/v2/kit/platform/errors" kithttp "github.com/influxdata/influxdb/v2/kit/transport/http" + "github.com/influxdata/influxdb/v2/kv" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -78,7 +79,9 @@ type ReplicationHandler struct { replicationsService ReplicationService } -func NewInstrumentedReplicationHandler(log *zap.Logger, reg prometheus.Registerer, svc ReplicationService) *ReplicationHandler { +func NewInstrumentedReplicationHandler(log *zap.Logger, reg prometheus.Registerer, kv kv.Store, svc ReplicationService) *ReplicationHandler { + // Collect telemetry + svc = newTelemetryCollectingService(kv, svc) // Collect metrics. svc = newMetricCollectingService(reg, svc) // Wrap logging. diff --git a/replications/transport/middleware_kv.go b/replications/transport/middleware_kv.go new file mode 100644 index 0000000000..8dd8d35f06 --- /dev/null +++ b/replications/transport/middleware_kv.go @@ -0,0 +1,109 @@ +package transport + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + + "github.com/influxdata/influxdb/v2/kit/platform" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kv" +) + +var replicationsBucket = []byte("replicationsv2") + +func newTelemetryCollectingService(kv kv.Store, underlying ReplicationService) *telemetryService { + return &telemetryService{ + kv: kv, + underlying: underlying, + } +} + +type telemetryService struct { + kv kv.Store + underlying ReplicationService +} + +func (t telemetryService) ListReplications(ctx context.Context, filter influxdb.ReplicationListFilter) (*influxdb.Replications, error) { + return t.underlying.ListReplications(ctx, filter) +} + +func (t telemetryService) GetReplication(ctx context.Context, id platform.ID) (*influxdb.Replication, error) { + return t.underlying.GetReplication(ctx, id) +} + +func (t telemetryService) UpdateReplication(ctx context.Context, id platform.ID, request influxdb.UpdateReplicationRequest) (*influxdb.Replication, error) { + return t.underlying.UpdateReplication(ctx, id, request) +} + +func (t telemetryService) ValidateNewReplication(ctx context.Context, request influxdb.CreateReplicationRequest) error { + return t.underlying.ValidateNewReplication(ctx, request) +} + +func (t telemetryService) ValidateUpdatedReplication(ctx context.Context, id platform.ID, request influxdb.UpdateReplicationRequest) error { + return t.underlying.ValidateUpdatedReplication(ctx, id, request) +} + +func (t telemetryService) ValidateReplication(ctx context.Context, id platform.ID) error { + return t.underlying.ValidateReplication(ctx, id) +} + +func (t telemetryService) CreateReplication(ctx context.Context, request influxdb.CreateReplicationRequest) (*influxdb.Replication, error) { + conn, err := t.underlying.CreateReplication(ctx, request) + if err != nil { + return conn, err + } + err = t.storeReplicationMetrics(ctx, request.OrgID) + return conn, err +} + +func (t telemetryService) DeleteReplication(ctx context.Context, id platform.ID) error { + rc, err := t.underlying.GetReplication(ctx, id) + if err != nil { + return err + } + + err = t.underlying.DeleteReplication(ctx, id) + if err != nil { + return err + } + return t.storeReplicationMetrics(ctx, rc.OrgID) +} + +func (t telemetryService) storeReplicationMetrics(ctx context.Context, orgID platform.ID) error { + if err := t.kv.Update(ctx, func(tx kv.Tx) error { + encodedID, err := orgID.Encode() + if err != nil { + return platform.ErrInvalidID + } + bucket, err := tx.Bucket(replicationsBucket) + if err != nil { + return err + } + count, err := t.countReplications(ctx, orgID) + if err != nil { + return err + } + return bucket.Put(encodedID, count) + }); err != nil { + return fmt.Errorf("updating telemetry failed: %v", err) + } + return nil +} + +func (t telemetryService) countReplications(ctx context.Context, orgID platform.ID) ([]byte, error) { + req := influxdb.ReplicationListFilter{ + OrgID: orgID, + } + list, err := t.underlying.ListReplications(ctx, req) + if err != nil { + return nil, err + } + + b := make([]byte, 0, 8) + buf := bytes.NewBuffer(b) + err = binary.Write(buf, binary.BigEndian, int64(len(list.Replications))) + return buf.Bytes(), err +} diff --git a/replications/transport/middleware_kv_test.go b/replications/transport/middleware_kv_test.go new file mode 100644 index 0000000000..acdc4bd806 --- /dev/null +++ b/replications/transport/middleware_kv_test.go @@ -0,0 +1,147 @@ +package transport + +import ( + "context" + "encoding/binary" + "testing" + + "github.com/golang/mock/gomock" + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/inmem" + "github.com/influxdata/influxdb/v2/kit/platform" + "github.com/influxdata/influxdb/v2/kv" + "github.com/influxdata/influxdb/v2/kv/migration" + "github.com/influxdata/influxdb/v2/kv/migration/all" + "github.com/influxdata/influxdb/v2/replications/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func getCount(kvStore kv.Store, orgID platform.ID) (uint64, error) { + var count uint64 + if err := kvStore.Update(context.Background(), func(tx kv.Tx) error { + encodedID, err := orgID.Encode() + if err != nil { + return err + } + bucket, err := tx.Bucket([]byte("replicationsv2")) + if err != nil { + return err + } + c, err := bucket.Get(encodedID) + if err != nil { + return err + } + + count = binary.BigEndian.Uint64(c) + return nil + }); err != nil { + return 0, err + } + + return count, nil +} + +func TestReplicationCreateKVUpdate(t *testing.T) { + kvStore := inmem.NewKVStore() + gmock := gomock.NewController(t) + defer gmock.Finish() + mockRemote := mock.NewMockReplicationService(gmock) + telemetry := newTelemetryCollectingService(kvStore, mockRemote) + + kvMigrator, err := migration.NewMigrator( + zap.L(), + kvStore, + all.Migrations[:]..., + ) + require.NoError(t, err) + require.NoError(t, kvMigrator.Up(context.Background())) + + ctx := context.Background() + req := influxdb.CreateReplicationRequest{ + OrgID: platform.ID(1), + Name: "test1", + RemoteBucketID: platform.ID(11), + LocalBucketID: platform.ID(22), + } + + replication := influxdb.Replication{ + OrgID: platform.ID(1), + } + replications := influxdb.Replications{ + Replications: []influxdb.Replication{replication}, + } + + mockRemote.EXPECT().CreateReplication(ctx, req).Return(&replication, nil).Times(1) + mockRemote.EXPECT().ListReplications(ctx, influxdb.ReplicationListFilter{OrgID: req.OrgID}).Return(&replications, nil).Times(1) + + repl, err := telemetry.CreateReplication(ctx, req) + require.NoError(t, err) + + count, err := getCount(kvStore, repl.OrgID) + require.NoError(t, err) + require.Equal(t, int64(1), int64(count)) +} + +func TestReplicationDeleteKVUpdate(t *testing.T) { + kvStore := inmem.NewKVStore() + gmock := gomock.NewController(t) + defer gmock.Finish() + mockRemote := mock.NewMockReplicationService(gmock) + telemetry := newTelemetryCollectingService(kvStore, mockRemote) + + ctx := context.Background() + + kvMigrator, err := migration.NewMigrator( + zap.L(), + kvStore, + all.Migrations[:]..., + ) + require.NoError(t, err) + require.NoError(t, kvMigrator.Up(ctx)) + + req := influxdb.CreateReplicationRequest{ + OrgID: platform.ID(1), + Name: "test1", + RemoteBucketID: platform.ID(11), + LocalBucketID: platform.ID(22), + } + req2 := req + req2.Name = "test2" + + replication1 := influxdb.Replication{ + ID: platform.ID(1), + OrgID: platform.ID(1), + } + replication2 := replication1 + replication2.ID = platform.ID(2) + + remoteConnectionsPreDelete := influxdb.Replications{ + Replications: []influxdb.Replication{replication1, replication2}, + } + + remoteConnectionsPostDelete := influxdb.Replications{ + Replications: []influxdb.Replication{replication1}, + } + + mockRemote.EXPECT().CreateReplication(ctx, req).Return(&replication1, nil).Times(1) + mockRemote.EXPECT().CreateReplication(ctx, req2).Return(&replication2, nil).Times(1) + mockRemote.EXPECT().ListReplications(ctx, influxdb.ReplicationListFilter{OrgID: req.OrgID}).Return(&remoteConnectionsPreDelete, nil).Times(2) + + mockRemote.EXPECT().DeleteReplication(ctx, replication1.ID).Return(nil).Times(1) + mockRemote.EXPECT().GetReplication(ctx, replication1.ID).Return(&replication1, nil).Times(1) + mockRemote.EXPECT().ListReplications(ctx, influxdb.ReplicationListFilter{OrgID: req.OrgID}).Return(&remoteConnectionsPostDelete, nil).Times(1) + + _, err = telemetry.CreateReplication(ctx, req) + require.NoError(t, err) + + repl, err := telemetry.CreateReplication(ctx, req2) + require.NoError(t, err) + + err = telemetry.DeleteReplication(ctx, replication1.ID) + require.NoError(t, err) + + count, err := getCount(kvStore, repl.OrgID) + require.NoError(t, err) + require.Equal(t, int64(1), int64(count)) +} diff --git a/telemetry/metrics.go b/telemetry/metrics.go index e6a9c46c89..47dd467498 100644 --- a/telemetry/metrics.go +++ b/telemetry/metrics.go @@ -21,6 +21,8 @@ var telemetryMatcher = pr.NewMatcher(). Family("influxdb_scrapers_total"). Family("influxdb_telegrafs_total"). Family("influxdb_telegraf_plugins_count"). + Family("influxdb_remotes_total"). + Family("influxdb_replications_total"). Family("task_scheduler_claims_active"). // Count of currently active tasks /* * Count of API requests including success and failure