diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 5f45639889..c6ca4bddbd 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -647,6 +647,17 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { NotificationRuleFinder: notificationRuleSvc, } + remotesSvc := remotes.NewService(m.sqlStore) + remotesServer := remotesTransport.NewInstrumentedRemotesHandler( + m.log.With(zap.String("handler", "remotes")), m.reg, remotesSvc) + + replicationSvc := replications.NewService(m.sqlStore, ts, m.log.With(zap.String("service", "replications"))) + replicationServer := replicationTransport.NewInstrumentedReplicationHandler( + m.log.With(zap.String("handler", "replications")), m.reg, replicationSvc) + + ts.BucketService = replications.NewBucketService( + m.log.With(zap.String("service", "replication_buckets")), ts.BucketService, replicationSvc) + m.apibackend = &http.APIBackend{ AssetsPath: opts.AssetsPath, UIDisabled: opts.UIDisabled, @@ -852,14 +863,6 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { ), ) - remotesSvc := remotes.NewService(m.sqlStore) - remotesServer := remotesTransport.NewInstrumentedRemotesHandler( - m.log.With(zap.String("handler", "remotes")), m.reg, remotesSvc) - - replicationSvc := replications.NewService(m.sqlStore, ts) - replicationServer := replicationTransport.NewInstrumentedReplicationHandler( - m.log.With(zap.String("handler", "replications")), m.reg, replicationSvc) - platformHandler := http.NewPlatformHandler( m.apibackend, http.WithResourceHandler(stacksHTTPServer), diff --git a/replications/bucket_service.go b/replications/bucket_service.go new file mode 100644 index 0000000000..f07858dbc2 --- /dev/null +++ b/replications/bucket_service.go @@ -0,0 +1,40 @@ +package replications + +import ( + "context" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/platform" + "go.uber.org/zap" +) + +type ReplicationDeleter interface { + // DeleteBucketReplications deletes all replications registered to the local bucket + // with the given ID. + DeleteBucketReplications(context.Context, platform.ID) error +} + +type bucketService struct { + influxdb.BucketService + logger *zap.Logger + replicationDeleter ReplicationDeleter +} + +func NewBucketService(log *zap.Logger, bucketSvc influxdb.BucketService, deleter ReplicationDeleter) *bucketService { + return &bucketService{ + BucketService: bucketSvc, + logger: log, + replicationDeleter: deleter, + } +} + +func (s *bucketService) DeleteBucket(ctx context.Context, id platform.ID) error { + if err := s.BucketService.DeleteBucket(ctx, id); err != nil { + return err + } + if err := s.replicationDeleter.DeleteBucketReplications(ctx, id); err != nil { + s.logger.Error("Failed to delete replications for bucket", + zap.String("bucket_id", id.String()), zap.Error(err)) + } + return nil +} diff --git a/replications/service.go b/replications/service.go index b27c55d7e9..69abcd65b7 100644 --- a/replications/service.go +++ b/replications/service.go @@ -14,6 +14,7 @@ import ( "github.com/influxdata/influxdb/v2/snowflake" "github.com/influxdata/influxdb/v2/sqlite" "github.com/mattn/go-sqlite3" + "go.uber.org/zap" ) var errReplicationNotFound = &ierrors.Error{ @@ -37,12 +38,13 @@ func errLocalBucketNotFound(id platform.ID, cause error) error { } } -func NewService(store *sqlite.SqlStore, bktSvc BucketService) *service { +func NewService(store *sqlite.SqlStore, bktSvc BucketService, log *zap.Logger) *service { return &service{ store: store, idGenerator: snowflake.NewIDGenerator(), bucketService: bktSvc, validator: internal.NewValidator(), + log: log, } } @@ -61,6 +63,7 @@ type service struct { idGenerator platform.IDGenerator bucketService BucketService validator ReplicationValidator + log *zap.Logger } func (s service) ListReplications(ctx context.Context, filter influxdb.ReplicationListFilter) (*influxdb.Replications, error) { @@ -264,6 +267,26 @@ func (s service) DeleteReplication(ctx context.Context, id platform.ID) error { return nil } +func (s service) DeleteBucketReplications(ctx context.Context, localBucketID platform.ID) error { + s.store.Mu.Lock() + defer s.store.Mu.Unlock() + + q := sq.Delete("replications").Where(sq.Eq{"local_bucket_id": localBucketID}).Suffix("RETURNING id") + query, args, err := q.ToSql() + if err != nil { + return err + } + + var deleted []string + if err := s.store.DB.SelectContext(ctx, &deleted, query, args...); err != nil { + return err + } + s.log.Debug("Deleted all replications for local bucket", + zap.String("bucket_id", localBucketID.String()), zap.Strings("ids", deleted)) + + return nil +} + func (s service) ValidateReplication(ctx context.Context, id platform.ID) error { config, err := s.getFullHTTPConfig(ctx, id) if err != nil { diff --git a/replications/service_test.go b/replications/service_test.go index d01a7edead..42a88c8771 100644 --- a/replications/service_test.go +++ b/replications/service_test.go @@ -418,6 +418,46 @@ func TestDeleteReplication(t *testing.T) { require.Nil(t, got) } +func TestDeleteReplications(t *testing.T) { + t.Parallel() + + svc, mocks, clean := newTestService(t) + defer clean(t) + + // Deleting when there is no bucket is OK. + require.NoError(t, svc.DeleteBucketReplications(ctx, replication.LocalBucketID)) + + // Register a handful of replications. + createReq2, createReq3 := createReq, createReq + createReq2.Name, createReq3.Name = "test2", "test3" + createReq2.LocalBucketID = platform.ID(77777) + createReq3.RemoteID = updatedReplication.RemoteID + mocks.bucketSvc.EXPECT().RLock().Times(3) + mocks.bucketSvc.EXPECT().RUnlock().Times(3) + mocks.bucketSvc.EXPECT().FindBucketByID(gomock.Any(), createReq.LocalBucketID).Return(&influxdb.Bucket{}, nil).Times(2) + mocks.bucketSvc.EXPECT().FindBucketByID(gomock.Any(), createReq2.LocalBucketID).Return(&influxdb.Bucket{}, nil) + insertRemote(t, svc.store, createReq.RemoteID) + insertRemote(t, svc.store, createReq3.RemoteID) + + for _, req := range []influxdb.CreateReplicationRequest{createReq, createReq2, createReq3} { + _, err := svc.CreateReplication(ctx, req) + require.NoError(t, err) + } + + listed, err := svc.ListReplications(ctx, influxdb.ReplicationListFilter{OrgID: replication.OrgID}) + require.NoError(t, err) + require.Len(t, listed.Replications, 3) + + // Delete 2/3 by bucket ID. + require.NoError(t, svc.DeleteBucketReplications(ctx, createReq.LocalBucketID)) + + // Ensure they were deleted. + listed, err = svc.ListReplications(ctx, influxdb.ReplicationListFilter{OrgID: replication.OrgID}) + require.NoError(t, err) + require.Len(t, listed.Replications, 1) + require.Equal(t, createReq2.LocalBucketID, listed.Replications[0].LocalBucketID) +} + func TestListReplications(t *testing.T) { t.Parallel() @@ -538,6 +578,7 @@ func newTestService(t *testing.T) (*service, mocks, func(t *testing.T)) { idGenerator: mock.NewIncrementingIDGenerator(initID), bucketService: mocks.bucketSvc, validator: mocks.validator, + log: logger, } return &svc, mocks, clean