feat: Add remotes and replications to telemetry (#23456)
* feat: start work on remotes/replications phone home data * feat: add remotes/replications phone home data (no tests * refactor: use erroring binary conversions * style: gofmt * refactor: improve some error handling * style: cleanup * feat: add tests * refactor: just list remotes/replications rather than decrement * chore: linting fix Co-authored-by: DStrand1 <dstrandboge@influxdata.com>pull/23310/merge v2.3.0
parent
8bd4fc502d
commit
090f681737
|
@ -32,6 +32,7 @@ test.key
|
|||
/fluxd
|
||||
/transpilerd
|
||||
/bin
|
||||
/internal/cmd/kvmigrate/kvmigrate
|
||||
|
||||
# Project tools that you might install with go build.
|
||||
/editorconfig-checker
|
||||
|
|
|
@ -102,6 +102,8 @@ func (c *Client) initialize(ctx context.Context) error {
|
|||
scraperBucket,
|
||||
telegrafBucket,
|
||||
telegrafPluginsBucket,
|
||||
remoteBucket,
|
||||
replicationBucket,
|
||||
userBucket,
|
||||
}
|
||||
for _, bktName := range bkts {
|
||||
|
|
|
@ -21,6 +21,8 @@ var (
|
|||
scraperBucket = []byte("scraperv2")
|
||||
telegrafBucket = []byte("telegrafv1")
|
||||
telegrafPluginsBucket = []byte("telegrafPluginsv1")
|
||||
remoteBucket = []byte("remotesv2")
|
||||
replicationBucket = []byte("replicationsv2")
|
||||
userBucket = []byte("usersv1")
|
||||
)
|
||||
|
||||
|
@ -65,6 +67,16 @@ var (
|
|||
"Number of individual telegraf plugins configured",
|
||||
[]string{"plugin"}, nil)
|
||||
|
||||
remoteDesc = prometheus.NewDesc(
|
||||
"influxdb_remotes_total",
|
||||
"Number of total remote connections configured on the server",
|
||||
nil, nil)
|
||||
|
||||
replicationDesc = prometheus.NewDesc(
|
||||
"influxdb_replications_total",
|
||||
"Number of total replication configurations on the server",
|
||||
nil, nil)
|
||||
|
||||
boltWritesDesc = prometheus.NewDesc(
|
||||
"boltdb_writes_total",
|
||||
"Total number of boltdb writes",
|
||||
|
@ -85,6 +97,8 @@ func (c *Client) Describe(ch chan<- *prometheus.Desc) {
|
|||
ch <- dashboardsDesc
|
||||
ch <- scrapersDesc
|
||||
ch <- telegrafsDesc
|
||||
ch <- remoteDesc
|
||||
ch <- replicationDesc
|
||||
ch <- boltWritesDesc
|
||||
ch <- boltReadsDesc
|
||||
|
||||
|
@ -209,12 +223,15 @@ func (c *Client) Collect(ch chan<- prometheus.Metric) {
|
|||
|
||||
orgs, buckets, users, tokens := 0, 0, 0, 0
|
||||
dashboards, scrapers, telegrafs := 0, 0, 0
|
||||
remotes, replications := 0, 0
|
||||
_ = c.db.View(func(tx *bolt.Tx) error {
|
||||
buckets = tx.Bucket(bucketBucket).Stats().KeyN
|
||||
dashboards = tx.Bucket(dashboardBucket).Stats().KeyN
|
||||
orgs = tx.Bucket(organizationBucket).Stats().KeyN
|
||||
scrapers = tx.Bucket(scraperBucket).Stats().KeyN
|
||||
telegrafs = tx.Bucket(telegrafBucket).Stats().KeyN
|
||||
remotes = tx.Bucket(remoteBucket).Stats().KeyN
|
||||
replications = tx.Bucket(replicationBucket).Stats().KeyN
|
||||
tokens = tx.Bucket(authorizationBucket).Stats().KeyN
|
||||
users = tx.Bucket(userBucket).Stats().KeyN
|
||||
return nil
|
||||
|
@ -262,5 +279,17 @@ func (c *Client) Collect(ch chan<- prometheus.Metric) {
|
|||
float64(telegrafs),
|
||||
)
|
||||
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
remoteDesc,
|
||||
prometheus.CounterValue,
|
||||
float64(remotes),
|
||||
)
|
||||
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
replicationDesc,
|
||||
prometheus.CounterValue,
|
||||
float64(replications),
|
||||
)
|
||||
|
||||
c.pluginsCollector.Collect(ch)
|
||||
}
|
||||
|
|
|
@ -38,6 +38,8 @@ func TestInitialMetrics(t *testing.T) {
|
|||
"influxdb_users_total": 0,
|
||||
"influxdb_tokens_total": 0,
|
||||
"influxdb_dashboards_total": 0,
|
||||
"influxdb_remotes_total": 0,
|
||||
"influxdb_replications_total": 0,
|
||||
"boltdb_reads_total": 0,
|
||||
}
|
||||
for name, count := range metrics {
|
||||
|
|
|
@ -363,11 +363,11 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
|
|||
|
||||
remotesSvc := remotes.NewService(m.sqlStore)
|
||||
remotesServer := remotesTransport.NewInstrumentedRemotesHandler(
|
||||
m.log.With(zap.String("handler", "remotes")), m.reg, remotesSvc)
|
||||
m.log.With(zap.String("handler", "remotes")), m.reg, m.kvStore, remotesSvc)
|
||||
|
||||
replicationSvc, replicationsMetrics := replications.NewService(m.sqlStore, ts, pointsWriter, m.log.With(zap.String("service", "replications")), opts.EnginePath, opts.InstanceID)
|
||||
replicationServer := replicationTransport.NewInstrumentedReplicationHandler(
|
||||
m.log.With(zap.String("handler", "replications")), m.reg, replicationSvc)
|
||||
m.log.With(zap.String("handler", "replications")), m.reg, m.kvStore, replicationSvc)
|
||||
ts.BucketService = replications.NewBucketService(
|
||||
m.log.With(zap.String("service", "replication_buckets")), ts.BucketService, replicationSvc)
|
||||
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
package all
|
||||
|
||||
import "github.com/influxdata/influxdb/v2/kv/migration"
|
||||
|
||||
var (
|
||||
remoteMetricsBucket = []byte("remotesv2")
|
||||
replicationsMetricsBucket = []byte("replicationsv2")
|
||||
)
|
||||
|
||||
var Migration0020_Add_remotes_replications_metrics_buckets = migration.CreateBuckets(
|
||||
"create remotes and replications metrics buckets",
|
||||
remoteMetricsBucket,
|
||||
replicationsMetricsBucket,
|
||||
)
|
|
@ -45,5 +45,7 @@ var Migrations = [...]migration.Spec{
|
|||
Migration0018_RepairMissingShardGroupDurations,
|
||||
// add remotes and replications resource types to operator and all-access tokens
|
||||
Migration0019_AddRemotesReplicationsToTokens,
|
||||
// add_remotes_replications_metrics_buckets
|
||||
Migration0020_Add_remotes_replications_metrics_buckets,
|
||||
// {{ do_not_edit . }}
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ func NewMatcher() Matcher {
|
|||
return Matcher{}
|
||||
}
|
||||
|
||||
// Family helps constuct match by adding a metric family to match to.
|
||||
// Family helps construct match by adding a metric family to match to.
|
||||
func (m Matcher) Family(name string, lps ...*dto.LabelPair) Matcher {
|
||||
// prometheus metrics labels are sorted by label name.
|
||||
sort.Slice(lps, func(i, j int) bool {
|
||||
|
|
|
@ -4,6 +4,8 @@ import (
|
|||
"context"
|
||||
"net/http"
|
||||
|
||||
"github.com/influxdata/influxdb/v2/kv"
|
||||
|
||||
"github.com/go-chi/chi"
|
||||
"github.com/go-chi/chi/middleware"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
|
@ -56,7 +58,9 @@ type RemoteConnectionHandler struct {
|
|||
remotesService RemoteConnectionService
|
||||
}
|
||||
|
||||
func NewInstrumentedRemotesHandler(log *zap.Logger, reg prometheus.Registerer, svc RemoteConnectionService) *RemoteConnectionHandler {
|
||||
func NewInstrumentedRemotesHandler(log *zap.Logger, reg prometheus.Registerer, kv kv.Store, svc RemoteConnectionService) *RemoteConnectionHandler {
|
||||
// Collect telemetry.
|
||||
svc = newTelemetryCollectingService(kv, svc)
|
||||
// Collect metrics.
|
||||
svc = newMetricCollectingService(reg, svc)
|
||||
// Wrap logging.
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
package transport
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
|
||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/kv"
|
||||
)
|
||||
|
||||
var remotesBucket = []byte("remotesv2")
|
||||
|
||||
func newTelemetryCollectingService(kv kv.Store, underlying RemoteConnectionService) *telemetryService {
|
||||
return &telemetryService{
|
||||
kv: kv,
|
||||
underlying: underlying,
|
||||
}
|
||||
}
|
||||
|
||||
type telemetryService struct {
|
||||
kv kv.Store
|
||||
underlying RemoteConnectionService
|
||||
}
|
||||
|
||||
func (t telemetryService) ListRemoteConnections(ctx context.Context, filter influxdb.RemoteConnectionListFilter) (*influxdb.RemoteConnections, error) {
|
||||
return t.underlying.ListRemoteConnections(ctx, filter)
|
||||
}
|
||||
|
||||
func (t telemetryService) GetRemoteConnection(ctx context.Context, id platform.ID) (*influxdb.RemoteConnection, error) {
|
||||
return t.underlying.GetRemoteConnection(ctx, id)
|
||||
}
|
||||
|
||||
func (t telemetryService) UpdateRemoteConnection(ctx context.Context, id platform.ID, request influxdb.UpdateRemoteConnectionRequest) (*influxdb.RemoteConnection, error) {
|
||||
return t.underlying.UpdateRemoteConnection(ctx, id, request)
|
||||
}
|
||||
|
||||
func (t telemetryService) CreateRemoteConnection(ctx context.Context, request influxdb.CreateRemoteConnectionRequest) (*influxdb.RemoteConnection, error) {
|
||||
conn, err := t.underlying.CreateRemoteConnection(ctx, request)
|
||||
if err != nil {
|
||||
return conn, err
|
||||
}
|
||||
err = t.storeRemoteMetrics(ctx, request.OrgID)
|
||||
return conn, err
|
||||
}
|
||||
|
||||
func (t telemetryService) DeleteRemoteConnection(ctx context.Context, id platform.ID) error {
|
||||
rc, err := t.underlying.GetRemoteConnection(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = t.underlying.DeleteRemoteConnection(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return t.storeRemoteMetrics(ctx, rc.OrgID)
|
||||
}
|
||||
|
||||
func (t telemetryService) storeRemoteMetrics(ctx context.Context, orgID platform.ID) error {
|
||||
if err := t.kv.Update(ctx, func(tx kv.Tx) error {
|
||||
encodedID, err := orgID.Encode()
|
||||
if err != nil {
|
||||
return platform.ErrInvalidID
|
||||
}
|
||||
bucket, err := tx.Bucket(remotesBucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
count, err := t.countRemotes(ctx, orgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return bucket.Put(encodedID, count)
|
||||
}); err != nil {
|
||||
return fmt.Errorf("updating telemetry failed: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t telemetryService) countRemotes(ctx context.Context, orgID platform.ID) ([]byte, error) {
|
||||
req := influxdb.RemoteConnectionListFilter{
|
||||
OrgID: orgID,
|
||||
}
|
||||
list, err := t.underlying.ListRemoteConnections(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b := make([]byte, 0, 8)
|
||||
buf := bytes.NewBuffer(b)
|
||||
err = binary.Write(buf, binary.BigEndian, int64(len(list.Remotes)))
|
||||
return buf.Bytes(), err
|
||||
}
|
|
@ -0,0 +1,149 @@
|
|||
package transport
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/inmem"
|
||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||
"github.com/influxdata/influxdb/v2/kv"
|
||||
"github.com/influxdata/influxdb/v2/kv/migration"
|
||||
"github.com/influxdata/influxdb/v2/kv/migration/all"
|
||||
"github.com/influxdata/influxdb/v2/remotes/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func getCount(kvStore kv.Store, orgID platform.ID) (uint64, error) {
|
||||
var count uint64
|
||||
if err := kvStore.Update(context.Background(), func(tx kv.Tx) error {
|
||||
encodedID, err := orgID.Encode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bucket, err := tx.Bucket([]byte("remotesv2"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c, err := bucket.Get(encodedID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
count = binary.BigEndian.Uint64(c)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return count, nil
|
||||
}
|
||||
|
||||
func TestRemoteCreateKVUpdate(t *testing.T) {
|
||||
kvStore := inmem.NewKVStore()
|
||||
gmock := gomock.NewController(t)
|
||||
defer gmock.Finish()
|
||||
mockRemote := mock.NewMockRemoteConnectionService(gmock)
|
||||
telemetry := newTelemetryCollectingService(kvStore, mockRemote)
|
||||
|
||||
kvMigrator, err := migration.NewMigrator(
|
||||
zap.L(),
|
||||
kvStore,
|
||||
all.Migrations[:]...,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, kvMigrator.Up(context.Background()))
|
||||
|
||||
ctx := context.Background()
|
||||
req := influxdb.CreateRemoteConnectionRequest{
|
||||
OrgID: platform.ID(1),
|
||||
Name: "test1",
|
||||
RemoteURL: "cloud2.influxdata.com",
|
||||
RemoteToken: "testtoken",
|
||||
RemoteOrgID: platform.ID(2),
|
||||
}
|
||||
|
||||
remoteConnction := influxdb.RemoteConnection{
|
||||
OrgID: platform.ID(1),
|
||||
}
|
||||
remoteConnections := influxdb.RemoteConnections{
|
||||
Remotes: []influxdb.RemoteConnection{remoteConnction},
|
||||
}
|
||||
|
||||
mockRemote.EXPECT().CreateRemoteConnection(ctx, req).Return(&remoteConnction, nil).Times(1)
|
||||
mockRemote.EXPECT().ListRemoteConnections(ctx, influxdb.RemoteConnectionListFilter{OrgID: req.OrgID}).Return(&remoteConnections, nil).Times(1)
|
||||
|
||||
remote, err := telemetry.CreateRemoteConnection(ctx, req)
|
||||
require.NoError(t, err)
|
||||
|
||||
count, err := getCount(kvStore, remote.OrgID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(1), int64(count))
|
||||
}
|
||||
|
||||
func TestRemoteDeleteKVUpdate(t *testing.T) {
|
||||
kvStore := inmem.NewKVStore()
|
||||
gmock := gomock.NewController(t)
|
||||
defer gmock.Finish()
|
||||
mockRemote := mock.NewMockRemoteConnectionService(gmock)
|
||||
telemetry := newTelemetryCollectingService(kvStore, mockRemote)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
kvMigrator, err := migration.NewMigrator(
|
||||
zap.L(),
|
||||
kvStore,
|
||||
all.Migrations[:]...,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, kvMigrator.Up(ctx))
|
||||
|
||||
req := influxdb.CreateRemoteConnectionRequest{
|
||||
OrgID: platform.ID(1),
|
||||
Name: "test1",
|
||||
RemoteURL: "cloud2.influxdata.com",
|
||||
RemoteToken: "testtoken",
|
||||
RemoteOrgID: platform.ID(2),
|
||||
}
|
||||
req2 := req
|
||||
req2.Name = "test2"
|
||||
|
||||
remoteConnection1 := influxdb.RemoteConnection{
|
||||
ID: platform.ID(1),
|
||||
OrgID: platform.ID(1),
|
||||
}
|
||||
remoteConnection2 := remoteConnection1
|
||||
remoteConnection2.ID = platform.ID(2)
|
||||
|
||||
remoteConnectionsPreDelete := influxdb.RemoteConnections{
|
||||
Remotes: []influxdb.RemoteConnection{remoteConnection1, remoteConnection2},
|
||||
}
|
||||
|
||||
remoteConnectionsPostDelete := influxdb.RemoteConnections{
|
||||
Remotes: []influxdb.RemoteConnection{remoteConnection1},
|
||||
}
|
||||
|
||||
mockRemote.EXPECT().CreateRemoteConnection(ctx, req).Return(&remoteConnection1, nil).Times(1)
|
||||
mockRemote.EXPECT().CreateRemoteConnection(ctx, req2).Return(&remoteConnection2, nil).Times(1)
|
||||
mockRemote.EXPECT().ListRemoteConnections(ctx, influxdb.RemoteConnectionListFilter{OrgID: req.OrgID}).Return(&remoteConnectionsPreDelete, nil).Times(2)
|
||||
|
||||
mockRemote.EXPECT().DeleteRemoteConnection(ctx, remoteConnection1.ID).Return(nil).Times(1)
|
||||
mockRemote.EXPECT().GetRemoteConnection(ctx, remoteConnection1.ID).Return(&remoteConnection1, nil).Times(1)
|
||||
mockRemote.EXPECT().ListRemoteConnections(ctx, influxdb.RemoteConnectionListFilter{OrgID: req.OrgID}).Return(&remoteConnectionsPostDelete, nil).Times(1)
|
||||
|
||||
_, err = telemetry.CreateRemoteConnection(ctx, req)
|
||||
require.NoError(t, err)
|
||||
|
||||
remote, err := telemetry.CreateRemoteConnection(ctx, req2)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = telemetry.DeleteRemoteConnection(ctx, remoteConnection1.ID)
|
||||
require.NoError(t, err)
|
||||
|
||||
count, err := getCount(kvStore, remote.OrgID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(1), int64(count))
|
||||
}
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||
"github.com/influxdata/influxdb/v2/kit/platform/errors"
|
||||
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
|
||||
"github.com/influxdata/influxdb/v2/kv"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -78,7 +79,9 @@ type ReplicationHandler struct {
|
|||
replicationsService ReplicationService
|
||||
}
|
||||
|
||||
func NewInstrumentedReplicationHandler(log *zap.Logger, reg prometheus.Registerer, svc ReplicationService) *ReplicationHandler {
|
||||
func NewInstrumentedReplicationHandler(log *zap.Logger, reg prometheus.Registerer, kv kv.Store, svc ReplicationService) *ReplicationHandler {
|
||||
// Collect telemetry
|
||||
svc = newTelemetryCollectingService(kv, svc)
|
||||
// Collect metrics.
|
||||
svc = newMetricCollectingService(reg, svc)
|
||||
// Wrap logging.
|
||||
|
|
|
@ -0,0 +1,109 @@
|
|||
package transport
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
|
||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/kv"
|
||||
)
|
||||
|
||||
var replicationsBucket = []byte("replicationsv2")
|
||||
|
||||
func newTelemetryCollectingService(kv kv.Store, underlying ReplicationService) *telemetryService {
|
||||
return &telemetryService{
|
||||
kv: kv,
|
||||
underlying: underlying,
|
||||
}
|
||||
}
|
||||
|
||||
type telemetryService struct {
|
||||
kv kv.Store
|
||||
underlying ReplicationService
|
||||
}
|
||||
|
||||
func (t telemetryService) ListReplications(ctx context.Context, filter influxdb.ReplicationListFilter) (*influxdb.Replications, error) {
|
||||
return t.underlying.ListReplications(ctx, filter)
|
||||
}
|
||||
|
||||
func (t telemetryService) GetReplication(ctx context.Context, id platform.ID) (*influxdb.Replication, error) {
|
||||
return t.underlying.GetReplication(ctx, id)
|
||||
}
|
||||
|
||||
func (t telemetryService) UpdateReplication(ctx context.Context, id platform.ID, request influxdb.UpdateReplicationRequest) (*influxdb.Replication, error) {
|
||||
return t.underlying.UpdateReplication(ctx, id, request)
|
||||
}
|
||||
|
||||
func (t telemetryService) ValidateNewReplication(ctx context.Context, request influxdb.CreateReplicationRequest) error {
|
||||
return t.underlying.ValidateNewReplication(ctx, request)
|
||||
}
|
||||
|
||||
func (t telemetryService) ValidateUpdatedReplication(ctx context.Context, id platform.ID, request influxdb.UpdateReplicationRequest) error {
|
||||
return t.underlying.ValidateUpdatedReplication(ctx, id, request)
|
||||
}
|
||||
|
||||
func (t telemetryService) ValidateReplication(ctx context.Context, id platform.ID) error {
|
||||
return t.underlying.ValidateReplication(ctx, id)
|
||||
}
|
||||
|
||||
func (t telemetryService) CreateReplication(ctx context.Context, request influxdb.CreateReplicationRequest) (*influxdb.Replication, error) {
|
||||
conn, err := t.underlying.CreateReplication(ctx, request)
|
||||
if err != nil {
|
||||
return conn, err
|
||||
}
|
||||
err = t.storeReplicationMetrics(ctx, request.OrgID)
|
||||
return conn, err
|
||||
}
|
||||
|
||||
func (t telemetryService) DeleteReplication(ctx context.Context, id platform.ID) error {
|
||||
rc, err := t.underlying.GetReplication(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = t.underlying.DeleteReplication(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return t.storeReplicationMetrics(ctx, rc.OrgID)
|
||||
}
|
||||
|
||||
func (t telemetryService) storeReplicationMetrics(ctx context.Context, orgID platform.ID) error {
|
||||
if err := t.kv.Update(ctx, func(tx kv.Tx) error {
|
||||
encodedID, err := orgID.Encode()
|
||||
if err != nil {
|
||||
return platform.ErrInvalidID
|
||||
}
|
||||
bucket, err := tx.Bucket(replicationsBucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
count, err := t.countReplications(ctx, orgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return bucket.Put(encodedID, count)
|
||||
}); err != nil {
|
||||
return fmt.Errorf("updating telemetry failed: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t telemetryService) countReplications(ctx context.Context, orgID platform.ID) ([]byte, error) {
|
||||
req := influxdb.ReplicationListFilter{
|
||||
OrgID: orgID,
|
||||
}
|
||||
list, err := t.underlying.ListReplications(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b := make([]byte, 0, 8)
|
||||
buf := bytes.NewBuffer(b)
|
||||
err = binary.Write(buf, binary.BigEndian, int64(len(list.Replications)))
|
||||
return buf.Bytes(), err
|
||||
}
|
|
@ -0,0 +1,147 @@
|
|||
package transport
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/inmem"
|
||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||
"github.com/influxdata/influxdb/v2/kv"
|
||||
"github.com/influxdata/influxdb/v2/kv/migration"
|
||||
"github.com/influxdata/influxdb/v2/kv/migration/all"
|
||||
"github.com/influxdata/influxdb/v2/replications/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func getCount(kvStore kv.Store, orgID platform.ID) (uint64, error) {
|
||||
var count uint64
|
||||
if err := kvStore.Update(context.Background(), func(tx kv.Tx) error {
|
||||
encodedID, err := orgID.Encode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bucket, err := tx.Bucket([]byte("replicationsv2"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c, err := bucket.Get(encodedID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
count = binary.BigEndian.Uint64(c)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return count, nil
|
||||
}
|
||||
|
||||
func TestReplicationCreateKVUpdate(t *testing.T) {
|
||||
kvStore := inmem.NewKVStore()
|
||||
gmock := gomock.NewController(t)
|
||||
defer gmock.Finish()
|
||||
mockRemote := mock.NewMockReplicationService(gmock)
|
||||
telemetry := newTelemetryCollectingService(kvStore, mockRemote)
|
||||
|
||||
kvMigrator, err := migration.NewMigrator(
|
||||
zap.L(),
|
||||
kvStore,
|
||||
all.Migrations[:]...,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, kvMigrator.Up(context.Background()))
|
||||
|
||||
ctx := context.Background()
|
||||
req := influxdb.CreateReplicationRequest{
|
||||
OrgID: platform.ID(1),
|
||||
Name: "test1",
|
||||
RemoteBucketID: platform.ID(11),
|
||||
LocalBucketID: platform.ID(22),
|
||||
}
|
||||
|
||||
replication := influxdb.Replication{
|
||||
OrgID: platform.ID(1),
|
||||
}
|
||||
replications := influxdb.Replications{
|
||||
Replications: []influxdb.Replication{replication},
|
||||
}
|
||||
|
||||
mockRemote.EXPECT().CreateReplication(ctx, req).Return(&replication, nil).Times(1)
|
||||
mockRemote.EXPECT().ListReplications(ctx, influxdb.ReplicationListFilter{OrgID: req.OrgID}).Return(&replications, nil).Times(1)
|
||||
|
||||
repl, err := telemetry.CreateReplication(ctx, req)
|
||||
require.NoError(t, err)
|
||||
|
||||
count, err := getCount(kvStore, repl.OrgID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(1), int64(count))
|
||||
}
|
||||
|
||||
func TestReplicationDeleteKVUpdate(t *testing.T) {
|
||||
kvStore := inmem.NewKVStore()
|
||||
gmock := gomock.NewController(t)
|
||||
defer gmock.Finish()
|
||||
mockRemote := mock.NewMockReplicationService(gmock)
|
||||
telemetry := newTelemetryCollectingService(kvStore, mockRemote)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
kvMigrator, err := migration.NewMigrator(
|
||||
zap.L(),
|
||||
kvStore,
|
||||
all.Migrations[:]...,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, kvMigrator.Up(ctx))
|
||||
|
||||
req := influxdb.CreateReplicationRequest{
|
||||
OrgID: platform.ID(1),
|
||||
Name: "test1",
|
||||
RemoteBucketID: platform.ID(11),
|
||||
LocalBucketID: platform.ID(22),
|
||||
}
|
||||
req2 := req
|
||||
req2.Name = "test2"
|
||||
|
||||
replication1 := influxdb.Replication{
|
||||
ID: platform.ID(1),
|
||||
OrgID: platform.ID(1),
|
||||
}
|
||||
replication2 := replication1
|
||||
replication2.ID = platform.ID(2)
|
||||
|
||||
remoteConnectionsPreDelete := influxdb.Replications{
|
||||
Replications: []influxdb.Replication{replication1, replication2},
|
||||
}
|
||||
|
||||
remoteConnectionsPostDelete := influxdb.Replications{
|
||||
Replications: []influxdb.Replication{replication1},
|
||||
}
|
||||
|
||||
mockRemote.EXPECT().CreateReplication(ctx, req).Return(&replication1, nil).Times(1)
|
||||
mockRemote.EXPECT().CreateReplication(ctx, req2).Return(&replication2, nil).Times(1)
|
||||
mockRemote.EXPECT().ListReplications(ctx, influxdb.ReplicationListFilter{OrgID: req.OrgID}).Return(&remoteConnectionsPreDelete, nil).Times(2)
|
||||
|
||||
mockRemote.EXPECT().DeleteReplication(ctx, replication1.ID).Return(nil).Times(1)
|
||||
mockRemote.EXPECT().GetReplication(ctx, replication1.ID).Return(&replication1, nil).Times(1)
|
||||
mockRemote.EXPECT().ListReplications(ctx, influxdb.ReplicationListFilter{OrgID: req.OrgID}).Return(&remoteConnectionsPostDelete, nil).Times(1)
|
||||
|
||||
_, err = telemetry.CreateReplication(ctx, req)
|
||||
require.NoError(t, err)
|
||||
|
||||
repl, err := telemetry.CreateReplication(ctx, req2)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = telemetry.DeleteReplication(ctx, replication1.ID)
|
||||
require.NoError(t, err)
|
||||
|
||||
count, err := getCount(kvStore, repl.OrgID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(1), int64(count))
|
||||
}
|
|
@ -21,6 +21,8 @@ var telemetryMatcher = pr.NewMatcher().
|
|||
Family("influxdb_scrapers_total").
|
||||
Family("influxdb_telegrafs_total").
|
||||
Family("influxdb_telegraf_plugins_count").
|
||||
Family("influxdb_remotes_total").
|
||||
Family("influxdb_replications_total").
|
||||
Family("task_scheduler_claims_active"). // Count of currently active tasks
|
||||
/*
|
||||
* Count of API requests including success and failure
|
||||
|
|
Loading…
Reference in New Issue