diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index bf62e31b84..86258a3126 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -655,7 +655,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { remotesServer := remotesTransport.NewInstrumentedRemotesHandler( m.log.With(zap.String("handler", "remotes")), m.reg, remotesSvc) - replicationSvc := replications.NewService(m.sqlStore, ts, m.log.With(zap.String("service", "replications"))) + replicationSvc := replications.NewService(m.sqlStore, ts, m.log.With(zap.String("service", "replications")), opts.EnginePath) replicationServer := replicationTransport.NewInstrumentedReplicationHandler( m.log.With(zap.String("handler", "replications")), m.reg, replicationSvc) diff --git a/pkg/durablequeue/queue.go b/pkg/durablequeue/queue.go index 2ab5686c63..9de352c765 100644 --- a/pkg/durablequeue/queue.go +++ b/pkg/durablequeue/queue.go @@ -360,6 +360,11 @@ func (l *Queue) TotalBytes() int64 { return n } +// Dir returns the directory associated with the queue. +func (l *Queue) Dir() string { + return l.dir +} + // diskUsage returns the total size on disk used by the Queue. func (l *Queue) diskUsage() int64 { var size int64 diff --git a/replications/internal/queue_management.go b/replications/internal/queue_management.go new file mode 100644 index 0000000000..2f1751b0ac --- /dev/null +++ b/replications/internal/queue_management.go @@ -0,0 +1,128 @@ +package internal + +import ( + "fmt" + "os" + "path/filepath" + "sync" + + "github.com/influxdata/influxdb/v2/kit/platform" + "github.com/influxdata/influxdb/v2/pkg/durablequeue" + "go.uber.org/zap" +) + +type durableQueueManager struct { + replicationQueues map[platform.ID]*durablequeue.Queue + logger *zap.Logger + enginePath string + mutex sync.RWMutex +} + +// NewDurableQueueManager creates a new durableQueueManager struct, for managing durable queues associated with +//replication streams. 
+func NewDurableQueueManager(log *zap.Logger, enginePath string) *durableQueueManager {
+	return &durableQueueManager{
+		replicationQueues: make(map[platform.ID]*durablequeue.Queue),
+		logger:            log,
+		enginePath:        enginePath,
+	}
+}
+
+// InitializeQueue creates and opens a new durable queue for a replication stream,
+// backed by the directory <enginePath>/replicationq/<replicationID>.
+//
+// It returns an error if a queue is already registered for replicationID, or if the
+// directory cannot be created or the queue cannot be constructed or opened. The queue
+// is registered with the manager only once it has been opened successfully.
+func (qm *durableQueueManager) InitializeQueue(replicationID platform.ID, maxQueueSizeBytes int64) error {
+	qm.mutex.Lock()
+	defer qm.mutex.Unlock()
+
+	// Check for duplicate replication ID
+	if _, exists := qm.replicationQueues[replicationID]; exists {
+		return fmt.Errorf("durable queue already exists for replication ID %q", replicationID)
+	}
+
+	// Set up path for new queue on disk
+	dir := filepath.Join(qm.enginePath, "replicationq", replicationID.String())
+	if err := os.MkdirAll(dir, 0777); err != nil {
+		return err
+	}
+
+	// Create a new durable queue
+	newQueue, err := durablequeue.NewQueue(
+		dir,
+		maxQueueSizeBytes,
+		durablequeue.DefaultSegmentSize,
+		&durablequeue.SharedCount{},
+		durablequeue.MaxWritesPending,
+		// Callback that accepts every payload.
+		// NOTE(review): confirm the role of this callback against durablequeue.NewQueue.
+		func([]byte) error { return nil },
+	)
+	if err != nil {
+		return err
+	}
+
+	// Open the queue before registering it. If Open fails, no entry may be left in the
+	// map: a stale entry would make every later InitializeQueue call for this ID fail
+	// with "already exists" even though no usable queue was ever created.
+	if err := newQueue.Open(); err != nil {
+		return err
+	}
+
+	// Map new durable queue to its corresponding replication stream via replication ID
+	qm.replicationQueues[replicationID] = newQueue
+
+	qm.logger.Debug("Created new durable queue for replication stream",
+		zap.String("id", replicationID.String()), zap.String("path", dir))
+
+	return nil
+}
+
+// DeleteQueue deletes a durable queue and its associated data on disk.
+func (qm *durableQueueManager) DeleteQueue(replicationID platform.ID) error {
+	qm.mutex.Lock()
+	defer qm.mutex.Unlock()
+
+	// Look the queue up once; every later step operates on this handle.
+	queue := qm.replicationQueues[replicationID]
+	if queue == nil {
+		return fmt.Errorf("durable queue not found for replication ID %q", replicationID)
+	}
+
+	idField := zap.String("id", replicationID.String())
+
+	// Shut the queue down first.
+	if err := queue.Close(); err != nil {
+		return err
+	}
+	qm.logger.Debug("Closed replication stream durable queue", idField, zap.String("path", queue.Dir()))
+
+	// Drop any enqueued, un-flushed data this queue still has on disk.
+	if err := queue.Remove(); err != nil {
+		return err
+	}
+	qm.logger.Debug("Deleted data associated with replication stream durable queue", idField, zap.String("path", queue.Dir()))
+
+	// Forget the queue only after close and removal both succeeded.
+	delete(qm.replicationQueues, replicationID)
+
+	return nil
+}
+
+// UpdateMaxQueueSize updates the maximum size of the durable queue.
+func (qm *durableQueueManager) UpdateMaxQueueSize(replicationID platform.ID, maxQueueSizeBytes int64) error { + qm.mutex.RLock() + defer qm.mutex.RUnlock() + + if qm.replicationQueues[replicationID] == nil { + return fmt.Errorf("durable queue not found for replication ID %q", replicationID) + } + + if err := qm.replicationQueues[replicationID].SetMaxSize(maxQueueSizeBytes); err != nil { + return err + } + + return nil +} diff --git a/replications/internal/queue_management_test.go b/replications/internal/queue_management_test.go new file mode 100644 index 0000000000..216daa2930 --- /dev/null +++ b/replications/internal/queue_management_test.go @@ -0,0 +1,100 @@ +package internal + +import ( + "os" + "path/filepath" + "testing" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/platform" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +var ( + replicationID = platform.ID(1) + maxQueueSizeBytes = 3 * influxdb.DefaultReplicationMaxQueueSizeBytes +) + +func TestCreateNewQueueDirExists(t *testing.T) { + t.Parallel() + + tempEnginePath, err := os.MkdirTemp("", "engine") + require.NoError(t, err) + defer os.RemoveAll(tempEnginePath) + + logger := zaptest.NewLogger(t) + qm := NewDurableQueueManager(logger, tempEnginePath) + err = qm.InitializeQueue(replicationID, maxQueueSizeBytes) + + require.NoError(t, err) + require.DirExists(t, filepath.Join(tempEnginePath, "replicationq", replicationID.String())) +} + +func TestCreateNewQueueDuplicateID(t *testing.T) { + t.Parallel() + + tempEnginePath, err := os.MkdirTemp("", "engine") + require.NoError(t, err) + defer os.RemoveAll(tempEnginePath) + + // Create a valid new queue + logger := zaptest.NewLogger(t) + qm := NewDurableQueueManager(logger, tempEnginePath) + err = qm.InitializeQueue(replicationID, maxQueueSizeBytes) + require.NoError(t, err) + + // Try to initialize another queue with the same replication ID + err = qm.InitializeQueue(replicationID, maxQueueSizeBytes) + 
require.EqualError(t, err, "durable queue already exists for replication ID \"0000000000000001\"") +} + +func TestDeleteQueueDirRemoved(t *testing.T) { + t.Parallel() + + tempEnginePath, err := os.MkdirTemp("", "engine") + require.NoError(t, err) + defer os.RemoveAll(tempEnginePath) + + // Create a valid new queue + logger := zaptest.NewLogger(t) + qm := NewDurableQueueManager(logger, tempEnginePath) + err = qm.InitializeQueue(replicationID, maxQueueSizeBytes) + require.NoError(t, err) + require.DirExists(t, filepath.Join(tempEnginePath, "replicationq", replicationID.String())) + + // Delete queue and make sure its queue has been deleted from disk + err = qm.DeleteQueue(replicationID) + require.NoError(t, err) + require.NoDirExists(t, filepath.Join(tempEnginePath, "replicationq", replicationID.String())) +} + +func TestDeleteQueueNonexistentID(t *testing.T) { + t.Parallel() + + tempEnginePath, err := os.MkdirTemp("", "engine") + require.NoError(t, err) + defer os.RemoveAll(tempEnginePath) + + logger := zaptest.NewLogger(t) + qm := NewDurableQueueManager(logger, tempEnginePath) + + // Delete nonexistent queue + err = qm.DeleteQueue(replicationID) + require.EqualError(t, err, "durable queue not found for replication ID \"0000000000000001\"") +} + +func TestUpdateMaxQueueSizeNonexistentID(t *testing.T) { + t.Parallel() + + tempEnginePath, err := os.MkdirTemp("", "engine") + require.NoError(t, err) + defer os.RemoveAll(tempEnginePath) + + logger := zaptest.NewLogger(t) + qm := NewDurableQueueManager(logger, tempEnginePath) + + // Update nonexistent queue + err = qm.UpdateMaxQueueSize(replicationID, influxdb.DefaultReplicationMaxQueueSizeBytes) + require.EqualError(t, err, "durable queue not found for replication ID \"0000000000000001\"") +} diff --git a/replications/mock/queue_management.go b/replications/mock/queue_management.go new file mode 100644 index 0000000000..dc29d0b891 --- /dev/null +++ b/replications/mock/queue_management.go @@ -0,0 +1,77 @@ +// Code 
generated by MockGen. DO NOT EDIT. +// Source: github.com/influxdata/influxdb/v2/replications (interfaces: DurableQueueManager) + +// Package mock is a generated GoMock package. +package mock + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + platform "github.com/influxdata/influxdb/v2/kit/platform" +) + +// MockDurableQueueManager is a mock of DurableQueueManager interface. +type MockDurableQueueManager struct { + ctrl *gomock.Controller + recorder *MockDurableQueueManagerMockRecorder +} + +// MockDurableQueueManagerMockRecorder is the mock recorder for MockDurableQueueManager. +type MockDurableQueueManagerMockRecorder struct { + mock *MockDurableQueueManager +} + +// NewMockDurableQueueManager creates a new mock instance. +func NewMockDurableQueueManager(ctrl *gomock.Controller) *MockDurableQueueManager { + mock := &MockDurableQueueManager{ctrl: ctrl} + mock.recorder = &MockDurableQueueManagerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockDurableQueueManager) EXPECT() *MockDurableQueueManagerMockRecorder { + return m.recorder +} + +// DeleteQueue mocks base method. +func (m *MockDurableQueueManager) DeleteQueue(arg0 platform.ID) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteQueue", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteQueue indicates an expected call of DeleteQueue. +func (mr *MockDurableQueueManagerMockRecorder) DeleteQueue(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteQueue", reflect.TypeOf((*MockDurableQueueManager)(nil).DeleteQueue), arg0) +} + +// InitializeQueue mocks base method. 
+func (m *MockDurableQueueManager) InitializeQueue(arg0 platform.ID, arg1 int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InitializeQueue", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// InitializeQueue indicates an expected call of InitializeQueue. +func (mr *MockDurableQueueManagerMockRecorder) InitializeQueue(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InitializeQueue", reflect.TypeOf((*MockDurableQueueManager)(nil).InitializeQueue), arg0, arg1) +} + +// UpdateMaxQueueSize mocks base method. +func (m *MockDurableQueueManager) UpdateMaxQueueSize(arg0 platform.ID, arg1 int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateMaxQueueSize", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateMaxQueueSize indicates an expected call of UpdateMaxQueueSize. +func (mr *MockDurableQueueManagerMockRecorder) UpdateMaxQueueSize(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateMaxQueueSize", reflect.TypeOf((*MockDurableQueueManager)(nil).UpdateMaxQueueSize), arg0, arg1) +} diff --git a/replications/service.go b/replications/service.go index d3b3a482ed..38cabc6d44 100644 --- a/replications/service.go +++ b/replications/service.go @@ -38,13 +38,14 @@ func errLocalBucketNotFound(id platform.ID, cause error) error { } } -func NewService(store *sqlite.SqlStore, bktSvc BucketService, log *zap.Logger) *service { +func NewService(store *sqlite.SqlStore, bktSvc BucketService, log *zap.Logger, enginePath string) *service { return &service{ - store: store, - idGenerator: snowflake.NewIDGenerator(), - bucketService: bktSvc, - validator: internal.NewValidator(), - log: log, + store: store, + idGenerator: snowflake.NewIDGenerator(), + bucketService: bktSvc, + validator: internal.NewValidator(), + log: log, + durableQueueManager: internal.NewDurableQueueManager(log, enginePath), } } 
@@ -58,12 +59,19 @@ type BucketService interface { FindBucketByID(ctx context.Context, id platform.ID) (*influxdb.Bucket, error) } +type DurableQueueManager interface { + InitializeQueue(replicationID platform.ID, maxQueueSizeBytes int64) error + DeleteQueue(replicationID platform.ID) error + UpdateMaxQueueSize(replicationID platform.ID, maxQueueSizeBytes int64) error +} + type service struct { - store *sqlite.SqlStore - idGenerator platform.IDGenerator - bucketService BucketService - validator ReplicationValidator - log *zap.Logger + store *sqlite.SqlStore + idGenerator platform.IDGenerator + bucketService BucketService + validator ReplicationValidator + durableQueueManager DurableQueueManager + log *zap.Logger } func (s service) ListReplications(ctx context.Context, filter influxdb.ReplicationListFilter) (*influxdb.Replications, error) { @@ -108,9 +116,14 @@ func (s service) CreateReplication(ctx context.Context, request influxdb.CreateR return nil, errLocalBucketNotFound(request.LocalBucketID, err) } + newID := s.idGenerator.ID() + if err := s.durableQueueManager.InitializeQueue(newID, request.MaxQueueSizeBytes); err != nil { + return nil, err + } + q := sq.Insert("replications"). SetMap(sq.Eq{ - "id": s.idGenerator.ID(), + "id": newID, "org_id": request.OrgID, "name": request.Name, "description": request.Description, @@ -124,18 +137,29 @@ func (s service) CreateReplication(ctx context.Context, request influxdb.CreateR }). 
Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes, current_queue_size_bytes") + cleanupQueue := func() { + if cleanupErr := s.durableQueueManager.DeleteQueue(newID); cleanupErr != nil { + s.log.Warn("durable queue remaining on disk after initialization failure", zap.Error(cleanupErr), zap.String("ID", newID.String())) + } + } + query, args, err := q.ToSql() if err != nil { + cleanupQueue() return nil, err } var r influxdb.Replication + if err := s.store.DB.GetContext(ctx, &r, query, args...); err != nil { if sqlErr, ok := err.(sqlite3.Error); ok && sqlErr.ExtendedCode == sqlite3.ErrConstraintForeignKey { + cleanupQueue() return nil, errRemoteNotFound(request.RemoteID, err) } + cleanupQueue() return nil, err } + return &r, nil } @@ -222,6 +246,13 @@ func (s service) UpdateReplication(ctx context.Context, id platform.ID, request } return nil, err } + + if request.MaxQueueSizeBytes != nil { + if err := s.durableQueueManager.UpdateMaxQueueSize(id, *request.MaxQueueSizeBytes); err != nil { + s.log.Warn("actual max queue size does not match the max queue size recorded in database", zap.String("ID", id.String())) + return nil, err + } + } return &r, nil } @@ -267,6 +298,11 @@ func (s service) DeleteReplication(ctx context.Context, id platform.ID) error { } return err } + + if err := s.durableQueueManager.DeleteQueue(id); err != nil { + return err + } + return nil } @@ -284,9 +320,30 @@ func (s service) DeleteBucketReplications(ctx context.Context, localBucketID pla if err := s.store.DB.SelectContext(ctx, &deleted, query, args...); err != nil { return err } + + errOccurred := false + for _, replication := range deleted { + id, err := platform.IDFromString(replication) + if err != nil { + s.log.Error("durable queue remaining on disk after deletion failure", zap.Error(err), zap.String("ID", replication)) + errOccurred = true + // A failed parse yields a nil id; skip the queue deletion instead of dereferencing it below. + continue + } + + if err := s.durableQueueManager.DeleteQueue(*id); err != nil { + 
s.log.Error("durable queue remaining on disk after deletion failure", zap.Error(err), zap.String("ID", replication)) + errOccurred = true + } + } + s.log.Debug("Deleted all replications for local bucket", zap.String("bucket_id", localBucketID.String()), zap.Strings("ids", deleted)) + if errOccurred { + return fmt.Errorf("deleting replications for bucket %q failed, see server logs for details", localBucketID) + } + return nil } diff --git a/replications/service_test.go b/replications/service_test.go index 42a88c8771..2e9ad41205 100644 --- a/replications/service_test.go +++ b/replications/service_test.go @@ -93,6 +93,7 @@ func TestCreateAndGetReplication(t *testing.T) { require.Equal(t, errReplicationNotFound, svc.ValidateReplication(ctx, initID)) // Create a replication, check the results. + mocks.durableQueueManager.EXPECT().InitializeQueue(initID, createReq.MaxQueueSizeBytes) created, err := svc.CreateReplication(ctx, createReq) require.NoError(t, err) require.Equal(t, replication, *created) @@ -143,6 +144,8 @@ func TestCreateMissingRemote(t *testing.T) { mocks.bucketSvc.EXPECT().FindBucketByID(gomock.Any(), createReq.LocalBucketID). Return(&influxdb.Bucket{}, nil) + mocks.durableQueueManager.EXPECT().InitializeQueue(initID, createReq.MaxQueueSizeBytes) + mocks.durableQueueManager.EXPECT().DeleteQueue(initID) created, err := svc.CreateReplication(ctx, createReq) require.Error(t, err) require.Contains(t, err.Error(), fmt.Sprintf("remote %q not found", createReq.RemoteID)) @@ -239,11 +242,13 @@ func TestUpdateAndGetReplication(t *testing.T) { require.Nil(t, updated) // Create a replication. + mocks.durableQueueManager.EXPECT().InitializeQueue(initID, createReq.MaxQueueSizeBytes) created, err := svc.CreateReplication(ctx, createReq) require.NoError(t, err) require.Equal(t, replication, *created) // Update the replication. 
+ mocks.durableQueueManager.EXPECT().UpdateMaxQueueSize(initID, *updateReq.MaxQueueSizeBytes) updated, err = svc.UpdateReplication(ctx, initID, updateReq) require.NoError(t, err) require.Equal(t, updatedReplication, *updated) @@ -262,6 +267,7 @@ func TestUpdateMissingRemote(t *testing.T) { Return(&influxdb.Bucket{}, nil) // Create a replication. + mocks.durableQueueManager.EXPECT().InitializeQueue(initID, createReq.MaxQueueSizeBytes) created, err := svc.CreateReplication(ctx, createReq) require.NoError(t, err) require.Equal(t, replication, *created) @@ -291,6 +297,7 @@ func TestUpdateNoop(t *testing.T) { Return(&influxdb.Bucket{}, nil) // Create a replication. + mocks.durableQueueManager.EXPECT().InitializeQueue(initID, createReq.MaxQueueSizeBytes) created, err := svc.CreateReplication(ctx, createReq) require.NoError(t, err) require.Equal(t, replication, *created) @@ -317,6 +324,7 @@ func TestValidateUpdatedReplicationWithoutPersisting(t *testing.T) { Return(&influxdb.Bucket{}, nil) // Create a replication. + mocks.durableQueueManager.EXPECT().InitializeQueue(initID, createReq.MaxQueueSizeBytes) created, err := svc.CreateReplication(ctx, createReq) require.NoError(t, err) require.Equal(t, replication, *created) @@ -345,6 +353,7 @@ func TestValidateUpdatedReplicationWithoutPersisting(t *testing.T) { Return(&influxdb.Bucket{}, nil) // Create a replication. + mocks.durableQueueManager.EXPECT().InitializeQueue(initID, createReq.MaxQueueSizeBytes) created, err := svc.CreateReplication(ctx, createReq) require.NoError(t, err) require.Equal(t, replication, *created) @@ -375,6 +384,7 @@ func TestValidateUpdatedReplicationWithoutPersisting(t *testing.T) { Return(&influxdb.Bucket{}, nil) // Create a replication. 
+ mocks.durableQueueManager.EXPECT().InitializeQueue(initID, createReq.MaxQueueSizeBytes) created, err := svc.CreateReplication(ctx, createReq) require.NoError(t, err) require.Equal(t, replication, *created) @@ -407,9 +417,11 @@ func TestDeleteReplication(t *testing.T) { require.Equal(t, errReplicationNotFound, svc.DeleteReplication(ctx, initID)) // Create a replication, then delete it. + mocks.durableQueueManager.EXPECT().InitializeQueue(initID, createReq.MaxQueueSizeBytes) created, err := svc.CreateReplication(ctx, createReq) require.NoError(t, err) require.Equal(t, replication, *created) + mocks.durableQueueManager.EXPECT().DeleteQueue(initID) require.NoError(t, svc.DeleteReplication(ctx, initID)) // Looking up the ID should again produce an error. @@ -440,6 +452,7 @@ func TestDeleteReplications(t *testing.T) { insertRemote(t, svc.store, createReq3.RemoteID) for _, req := range []influxdb.CreateReplicationRequest{createReq, createReq2, createReq3} { + mocks.durableQueueManager.EXPECT().InitializeQueue(gomock.Any(), req.MaxQueueSizeBytes) _, err := svc.CreateReplication(ctx, req) require.NoError(t, err) } @@ -449,6 +462,7 @@ func TestDeleteReplications(t *testing.T) { require.Len(t, listed.Replications, 3) // Delete 2/3 by bucket ID. + mocks.durableQueueManager.EXPECT().DeleteQueue(gomock.Any()).Times(2) require.NoError(t, svc.DeleteBucketReplications(ctx, createReq.LocalBucketID)) // Ensure they were deleted. 
@@ -476,6 +490,7 @@ func TestListReplications(t *testing.T) { var allReplications []influxdb.Replication for _, req := range []influxdb.CreateReplicationRequest{createReq, createReq2, createReq3} { + mocks.durableQueueManager.EXPECT().InitializeQueue(gomock.Any(), createReq.MaxQueueSizeBytes) created, err := svc.CreateReplication(ctx, req) require.NoError(t, err) allReplications = append(allReplications, *created) @@ -554,8 +569,9 @@ func TestListReplications(t *testing.T) { } type mocks struct { - bucketSvc *replicationsMock.MockBucketService - validator *replicationsMock.MockReplicationValidator + bucketSvc *replicationsMock.MockBucketService + validator *replicationsMock.MockReplicationValidator + durableQueueManager *replicationsMock.MockDurableQueueManager } func newTestService(t *testing.T) (*service, mocks, func(t *testing.T)) { @@ -570,15 +586,17 @@ func newTestService(t *testing.T) (*service, mocks, func(t *testing.T)) { ctrl := gomock.NewController(t) mocks := mocks{ - bucketSvc: replicationsMock.NewMockBucketService(ctrl), - validator: replicationsMock.NewMockReplicationValidator(ctrl), + bucketSvc: replicationsMock.NewMockBucketService(ctrl), + validator: replicationsMock.NewMockReplicationValidator(ctrl), + durableQueueManager: replicationsMock.NewMockDurableQueueManager(ctrl), } svc := service{ - store: store, - idGenerator: mock.NewIncrementingIDGenerator(initID), - bucketService: mocks.bucketSvc, - validator: mocks.validator, - log: logger, + store: store, + idGenerator: mock.NewIncrementingIDGenerator(initID), + bucketService: mocks.bucketSvc, + validator: mocks.validator, + log: logger, + durableQueueManager: mocks.durableQueueManager, } return &svc, mocks, clean