feat: deleting a bucket also deletes all associated replications (#22424)
parent
1482bdfa38
commit
153a89dba0
|
@ -647,6 +647,17 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
|
|||
NotificationRuleFinder: notificationRuleSvc,
|
||||
}
|
||||
|
||||
remotesSvc := remotes.NewService(m.sqlStore)
|
||||
remotesServer := remotesTransport.NewInstrumentedRemotesHandler(
|
||||
m.log.With(zap.String("handler", "remotes")), m.reg, remotesSvc)
|
||||
|
||||
replicationSvc := replications.NewService(m.sqlStore, ts, m.log.With(zap.String("service", "replications")))
|
||||
replicationServer := replicationTransport.NewInstrumentedReplicationHandler(
|
||||
m.log.With(zap.String("handler", "replications")), m.reg, replicationSvc)
|
||||
|
||||
ts.BucketService = replications.NewBucketService(
|
||||
m.log.With(zap.String("service", "replication_buckets")), ts.BucketService, replicationSvc)
|
||||
|
||||
m.apibackend = &http.APIBackend{
|
||||
AssetsPath: opts.AssetsPath,
|
||||
UIDisabled: opts.UIDisabled,
|
||||
|
@ -852,14 +863,6 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
|
|||
),
|
||||
)
|
||||
|
||||
remotesSvc := remotes.NewService(m.sqlStore)
|
||||
remotesServer := remotesTransport.NewInstrumentedRemotesHandler(
|
||||
m.log.With(zap.String("handler", "remotes")), m.reg, remotesSvc)
|
||||
|
||||
replicationSvc := replications.NewService(m.sqlStore, ts)
|
||||
replicationServer := replicationTransport.NewInstrumentedReplicationHandler(
|
||||
m.log.With(zap.String("handler", "replications")), m.reg, replicationSvc)
|
||||
|
||||
platformHandler := http.NewPlatformHandler(
|
||||
m.apibackend,
|
||||
http.WithResourceHandler(stacksHTTPServer),
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
package replications
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type ReplicationDeleter interface {
|
||||
// DeleteBucketReplications deletes all replications registered to the local bucket
|
||||
// with the given ID.
|
||||
DeleteBucketReplications(context.Context, platform.ID) error
|
||||
}
|
||||
|
||||
type bucketService struct {
|
||||
influxdb.BucketService
|
||||
logger *zap.Logger
|
||||
replicationDeleter ReplicationDeleter
|
||||
}
|
||||
|
||||
func NewBucketService(log *zap.Logger, bucketSvc influxdb.BucketService, deleter ReplicationDeleter) *bucketService {
|
||||
return &bucketService{
|
||||
BucketService: bucketSvc,
|
||||
logger: log,
|
||||
replicationDeleter: deleter,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *bucketService) DeleteBucket(ctx context.Context, id platform.ID) error {
|
||||
if err := s.BucketService.DeleteBucket(ctx, id); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.replicationDeleter.DeleteBucketReplications(ctx, id); err != nil {
|
||||
s.logger.Error("Failed to delete replications for bucket",
|
||||
zap.String("bucket_id", id.String()), zap.Error(err))
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/snowflake"
|
||||
"github.com/influxdata/influxdb/v2/sqlite"
|
||||
"github.com/mattn/go-sqlite3"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var errReplicationNotFound = &ierrors.Error{
|
||||
|
@ -37,12 +38,13 @@ func errLocalBucketNotFound(id platform.ID, cause error) error {
|
|||
}
|
||||
}
|
||||
|
||||
func NewService(store *sqlite.SqlStore, bktSvc BucketService) *service {
|
||||
func NewService(store *sqlite.SqlStore, bktSvc BucketService, log *zap.Logger) *service {
|
||||
return &service{
|
||||
store: store,
|
||||
idGenerator: snowflake.NewIDGenerator(),
|
||||
bucketService: bktSvc,
|
||||
validator: internal.NewValidator(),
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -61,6 +63,7 @@ type service struct {
|
|||
idGenerator platform.IDGenerator
|
||||
bucketService BucketService
|
||||
validator ReplicationValidator
|
||||
log *zap.Logger
|
||||
}
|
||||
|
||||
func (s service) ListReplications(ctx context.Context, filter influxdb.ReplicationListFilter) (*influxdb.Replications, error) {
|
||||
|
@ -264,6 +267,26 @@ func (s service) DeleteReplication(ctx context.Context, id platform.ID) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s service) DeleteBucketReplications(ctx context.Context, localBucketID platform.ID) error {
|
||||
s.store.Mu.Lock()
|
||||
defer s.store.Mu.Unlock()
|
||||
|
||||
q := sq.Delete("replications").Where(sq.Eq{"local_bucket_id": localBucketID}).Suffix("RETURNING id")
|
||||
query, args, err := q.ToSql()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var deleted []string
|
||||
if err := s.store.DB.SelectContext(ctx, &deleted, query, args...); err != nil {
|
||||
return err
|
||||
}
|
||||
s.log.Debug("Deleted all replications for local bucket",
|
||||
zap.String("bucket_id", localBucketID.String()), zap.Strings("ids", deleted))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s service) ValidateReplication(ctx context.Context, id platform.ID) error {
|
||||
config, err := s.getFullHTTPConfig(ctx, id)
|
||||
if err != nil {
|
||||
|
|
|
@ -418,6 +418,46 @@ func TestDeleteReplication(t *testing.T) {
|
|||
require.Nil(t, got)
|
||||
}
|
||||
|
||||
func TestDeleteReplications(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
svc, mocks, clean := newTestService(t)
|
||||
defer clean(t)
|
||||
|
||||
// Deleting when there is no bucket is OK.
|
||||
require.NoError(t, svc.DeleteBucketReplications(ctx, replication.LocalBucketID))
|
||||
|
||||
// Register a handful of replications.
|
||||
createReq2, createReq3 := createReq, createReq
|
||||
createReq2.Name, createReq3.Name = "test2", "test3"
|
||||
createReq2.LocalBucketID = platform.ID(77777)
|
||||
createReq3.RemoteID = updatedReplication.RemoteID
|
||||
mocks.bucketSvc.EXPECT().RLock().Times(3)
|
||||
mocks.bucketSvc.EXPECT().RUnlock().Times(3)
|
||||
mocks.bucketSvc.EXPECT().FindBucketByID(gomock.Any(), createReq.LocalBucketID).Return(&influxdb.Bucket{}, nil).Times(2)
|
||||
mocks.bucketSvc.EXPECT().FindBucketByID(gomock.Any(), createReq2.LocalBucketID).Return(&influxdb.Bucket{}, nil)
|
||||
insertRemote(t, svc.store, createReq.RemoteID)
|
||||
insertRemote(t, svc.store, createReq3.RemoteID)
|
||||
|
||||
for _, req := range []influxdb.CreateReplicationRequest{createReq, createReq2, createReq3} {
|
||||
_, err := svc.CreateReplication(ctx, req)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
listed, err := svc.ListReplications(ctx, influxdb.ReplicationListFilter{OrgID: replication.OrgID})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, listed.Replications, 3)
|
||||
|
||||
// Delete 2/3 by bucket ID.
|
||||
require.NoError(t, svc.DeleteBucketReplications(ctx, createReq.LocalBucketID))
|
||||
|
||||
// Ensure they were deleted.
|
||||
listed, err = svc.ListReplications(ctx, influxdb.ReplicationListFilter{OrgID: replication.OrgID})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, listed.Replications, 1)
|
||||
require.Equal(t, createReq2.LocalBucketID, listed.Replications[0].LocalBucketID)
|
||||
}
|
||||
|
||||
func TestListReplications(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
@ -538,6 +578,7 @@ func newTestService(t *testing.T) (*service, mocks, func(t *testing.T)) {
|
|||
idGenerator: mock.NewIncrementingIDGenerator(initID),
|
||||
bucketService: mocks.bucketSvc,
|
||||
validator: mocks.validator,
|
||||
log: logger,
|
||||
}
|
||||
|
||||
return &svc, mocks, clean
|
||||
|
|
Loading…
Reference in New Issue