feat: replication apis durable queue management (#22719)

* feat: added durable queue management to replications service

* refactor: improved mapping of replication streams to durable queues

* refactor: modified replication stream durable queues to use user-specified engine path

* chore: generated test mocks for replications DurableQueueManager

* chore: add test coverage for replications durable queue manager

* refactor: made changes based on code review, added mutex to durableQueueManager, improved error logging

* chore: ran make fmt

* refactor: further improvements to error logging
pull/22785/head
mcfarlm3 2021-10-26 12:14:29 -07:00 committed by GitHub
parent fba7fac534
commit 8825cd5d50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 405 additions and 22 deletions

View File

@ -655,7 +655,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
remotesServer := remotesTransport.NewInstrumentedRemotesHandler(
m.log.With(zap.String("handler", "remotes")), m.reg, remotesSvc)
replicationSvc := replications.NewService(m.sqlStore, ts, m.log.With(zap.String("service", "replications")))
replicationSvc := replications.NewService(m.sqlStore, ts, m.log.With(zap.String("service", "replications")), opts.EnginePath)
replicationServer := replicationTransport.NewInstrumentedReplicationHandler(
m.log.With(zap.String("handler", "replications")), m.reg, replicationSvc)

View File

@ -360,6 +360,11 @@ func (l *Queue) TotalBytes() int64 {
return n
}
// Dir returns the directory associated with the queue.
func (l *Queue) Dir() string {
return l.dir
}
// diskUsage returns the total size on disk used by the Queue.
func (l *Queue) diskUsage() int64 {
var size int64

View File

@ -0,0 +1,128 @@
package internal
import (
"fmt"
"os"
"path/filepath"
"sync"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/pkg/durablequeue"
"go.uber.org/zap"
)
type durableQueueManager struct {
replicationQueues map[platform.ID]*durablequeue.Queue
logger *zap.Logger
enginePath string
mutex sync.RWMutex
}
// NewDurableQueueManager creates a new durableQueueManager struct, for managing durable queues associated with
//replication streams.
func NewDurableQueueManager(log *zap.Logger, enginePath string) *durableQueueManager {
replicationQueues := make(map[platform.ID]*durablequeue.Queue)
return &durableQueueManager{
replicationQueues: replicationQueues,
logger: log,
enginePath: enginePath,
}
}
// InitializeQueue creates a new durable queue which is associated with a replication stream.
func (qm *durableQueueManager) InitializeQueue(replicationID platform.ID, maxQueueSizeBytes int64) error {
qm.mutex.Lock()
defer qm.mutex.Unlock()
// Check for duplicate replication ID
if _, exists := qm.replicationQueues[replicationID]; exists {
return fmt.Errorf("durable queue already exists for replication ID %q", replicationID)
}
// Set up path for new queue on disk
dir := filepath.Join(
qm.enginePath,
"replicationq",
replicationID.String(),
)
if err := os.MkdirAll(dir, 0777); err != nil {
return err
}
// Create a new durable queue
newQueue, err := durablequeue.NewQueue(
dir,
maxQueueSizeBytes,
durablequeue.DefaultSegmentSize,
&durablequeue.SharedCount{},
durablequeue.MaxWritesPending,
func(bytes []byte) error {
return nil
},
)
if err != nil {
return err
}
// Map new durable queue to its corresponding replication stream via replication ID
qm.replicationQueues[replicationID] = newQueue
// Open the new queue
if err := newQueue.Open(); err != nil {
return err
}
qm.logger.Debug("Created new durable queue for replication stream",
zap.String("id", replicationID.String()), zap.String("path", dir))
return nil
}
// DeleteQueue deletes a durable queue and its associated data on disk.
func (qm *durableQueueManager) DeleteQueue(replicationID platform.ID) error {
qm.mutex.Lock()
defer qm.mutex.Unlock()
if qm.replicationQueues[replicationID] == nil {
return fmt.Errorf("durable queue not found for replication ID %q", replicationID)
}
// Close the queue
if err := qm.replicationQueues[replicationID].Close(); err != nil {
return err
}
qm.logger.Debug("Closed replication stream durable queue",
zap.String("id", replicationID.String()), zap.String("path", qm.replicationQueues[replicationID].Dir()))
// Delete any enqueued, un-flushed data on disk for this queue
if err := qm.replicationQueues[replicationID].Remove(); err != nil {
return err
}
qm.logger.Debug("Deleted data associated with replication stream durable queue",
zap.String("id", replicationID.String()), zap.String("path", qm.replicationQueues[replicationID].Dir()))
// Remove entry from replicationQueues map
delete(qm.replicationQueues, replicationID)
return nil
}
// UpdateMaxQueueSize updates the maximum size of the durable queue.
func (qm *durableQueueManager) UpdateMaxQueueSize(replicationID platform.ID, maxQueueSizeBytes int64) error {
qm.mutex.RLock()
defer qm.mutex.RUnlock()
if qm.replicationQueues[replicationID] == nil {
return fmt.Errorf("durable queue not found for replication ID %q", replicationID)
}
if err := qm.replicationQueues[replicationID].SetMaxSize(maxQueueSizeBytes); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,100 @@
package internal
import (
"os"
"path/filepath"
"testing"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
var (
replicationID = platform.ID(1)
maxQueueSizeBytes = 3 * influxdb.DefaultReplicationMaxQueueSizeBytes
)
func TestCreateNewQueueDirExists(t *testing.T) {
t.Parallel()
tempEnginePath, err := os.MkdirTemp("", "engine")
require.NoError(t, err)
defer os.RemoveAll(tempEnginePath)
logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, tempEnginePath)
err = qm.InitializeQueue(replicationID, maxQueueSizeBytes)
require.NoError(t, err)
require.DirExists(t, filepath.Join(tempEnginePath, "replicationq", replicationID.String()))
}
func TestCreateNewQueueDuplicateID(t *testing.T) {
t.Parallel()
tempEnginePath, err := os.MkdirTemp("", "engine")
require.NoError(t, err)
defer os.RemoveAll(tempEnginePath)
// Create a valid new queue
logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, tempEnginePath)
err = qm.InitializeQueue(replicationID, maxQueueSizeBytes)
require.NoError(t, err)
// Try to initialize another queue with the same replication ID
err = qm.InitializeQueue(replicationID, maxQueueSizeBytes)
require.EqualError(t, err, "durable queue already exists for replication ID \"0000000000000001\"")
}
func TestDeleteQueueDirRemoved(t *testing.T) {
t.Parallel()
tempEnginePath, err := os.MkdirTemp("", "engine")
require.NoError(t, err)
defer os.RemoveAll(tempEnginePath)
// Create a valid new queue
logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, tempEnginePath)
err = qm.InitializeQueue(replicationID, maxQueueSizeBytes)
require.NoError(t, err)
require.DirExists(t, filepath.Join(tempEnginePath, "replicationq", replicationID.String()))
// Delete queue and make sure its queue has been deleted from disk
err = qm.DeleteQueue(replicationID)
require.NoError(t, err)
require.NoDirExists(t, filepath.Join(tempEnginePath, "replicationq", replicationID.String()))
}
func TestDeleteQueueNonexistentID(t *testing.T) {
t.Parallel()
tempEnginePath, err := os.MkdirTemp("", "engine")
require.NoError(t, err)
defer os.RemoveAll(tempEnginePath)
logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, tempEnginePath)
// Delete nonexistent queue
err = qm.DeleteQueue(replicationID)
require.EqualError(t, err, "durable queue not found for replication ID \"0000000000000001\"")
}
func TestUpdateMaxQueueSizeNonexistentID(t *testing.T) {
t.Parallel()
tempEnginePath, err := os.MkdirTemp("", "engine")
require.NoError(t, err)
defer os.RemoveAll(tempEnginePath)
logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, tempEnginePath)
// Update nonexistent queue
err = qm.UpdateMaxQueueSize(replicationID, influxdb.DefaultReplicationMaxQueueSizeBytes)
require.EqualError(t, err, "durable queue not found for replication ID \"0000000000000001\"")
}

View File

@ -0,0 +1,77 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/influxdata/influxdb/v2/replications (interfaces: DurableQueueManager)
// Package mock is a generated GoMock package.
package mock
import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
platform "github.com/influxdata/influxdb/v2/kit/platform"
)
// MockDurableQueueManager is a mock of DurableQueueManager interface.
type MockDurableQueueManager struct {
ctrl *gomock.Controller
recorder *MockDurableQueueManagerMockRecorder
}
// MockDurableQueueManagerMockRecorder is the mock recorder for MockDurableQueueManager.
type MockDurableQueueManagerMockRecorder struct {
mock *MockDurableQueueManager
}
// NewMockDurableQueueManager creates a new mock instance.
func NewMockDurableQueueManager(ctrl *gomock.Controller) *MockDurableQueueManager {
mock := &MockDurableQueueManager{ctrl: ctrl}
mock.recorder = &MockDurableQueueManagerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockDurableQueueManager) EXPECT() *MockDurableQueueManagerMockRecorder {
return m.recorder
}
// DeleteQueue mocks base method.
func (m *MockDurableQueueManager) DeleteQueue(arg0 platform.ID) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DeleteQueue", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// DeleteQueue indicates an expected call of DeleteQueue.
func (mr *MockDurableQueueManagerMockRecorder) DeleteQueue(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteQueue", reflect.TypeOf((*MockDurableQueueManager)(nil).DeleteQueue), arg0)
}
// InitializeQueue mocks base method.
func (m *MockDurableQueueManager) InitializeQueue(arg0 platform.ID, arg1 int64) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "InitializeQueue", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// InitializeQueue indicates an expected call of InitializeQueue.
func (mr *MockDurableQueueManagerMockRecorder) InitializeQueue(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InitializeQueue", reflect.TypeOf((*MockDurableQueueManager)(nil).InitializeQueue), arg0, arg1)
}
// UpdateMaxQueueSize mocks base method.
func (m *MockDurableQueueManager) UpdateMaxQueueSize(arg0 platform.ID, arg1 int64) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateMaxQueueSize", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// UpdateMaxQueueSize indicates an expected call of UpdateMaxQueueSize.
func (mr *MockDurableQueueManagerMockRecorder) UpdateMaxQueueSize(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateMaxQueueSize", reflect.TypeOf((*MockDurableQueueManager)(nil).UpdateMaxQueueSize), arg0, arg1)
}

View File

@ -38,13 +38,14 @@ func errLocalBucketNotFound(id platform.ID, cause error) error {
}
}
func NewService(store *sqlite.SqlStore, bktSvc BucketService, log *zap.Logger) *service {
func NewService(store *sqlite.SqlStore, bktSvc BucketService, log *zap.Logger, enginePath string) *service {
return &service{
store: store,
idGenerator: snowflake.NewIDGenerator(),
bucketService: bktSvc,
validator: internal.NewValidator(),
log: log,
durableQueueManager: internal.NewDurableQueueManager(log, enginePath),
}
}
@ -58,11 +59,18 @@ type BucketService interface {
FindBucketByID(ctx context.Context, id platform.ID) (*influxdb.Bucket, error)
}
type DurableQueueManager interface {
InitializeQueue(replicationID platform.ID, maxQueueSizeBytes int64) error
DeleteQueue(replicationID platform.ID) error
UpdateMaxQueueSize(replicationID platform.ID, maxQueueSizeBytes int64) error
}
type service struct {
store *sqlite.SqlStore
idGenerator platform.IDGenerator
bucketService BucketService
validator ReplicationValidator
durableQueueManager DurableQueueManager
log *zap.Logger
}
@ -108,9 +116,14 @@ func (s service) CreateReplication(ctx context.Context, request influxdb.CreateR
return nil, errLocalBucketNotFound(request.LocalBucketID, err)
}
newID := s.idGenerator.ID()
if err := s.durableQueueManager.InitializeQueue(newID, request.MaxQueueSizeBytes); err != nil {
return nil, err
}
q := sq.Insert("replications").
SetMap(sq.Eq{
"id": s.idGenerator.ID(),
"id": newID,
"org_id": request.OrgID,
"name": request.Name,
"description": request.Description,
@ -124,18 +137,29 @@ func (s service) CreateReplication(ctx context.Context, request influxdb.CreateR
}).
Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes, current_queue_size_bytes")
cleanupQueue := func() {
if cleanupErr := s.durableQueueManager.DeleteQueue(newID); cleanupErr != nil {
s.log.Warn("durable queue remaining on disk after initialization failure", zap.Error(cleanupErr), zap.String("ID", newID.String()))
}
}
query, args, err := q.ToSql()
if err != nil {
cleanupQueue()
return nil, err
}
var r influxdb.Replication
if err := s.store.DB.GetContext(ctx, &r, query, args...); err != nil {
if sqlErr, ok := err.(sqlite3.Error); ok && sqlErr.ExtendedCode == sqlite3.ErrConstraintForeignKey {
cleanupQueue()
return nil, errRemoteNotFound(request.RemoteID, err)
}
cleanupQueue()
return nil, err
}
return &r, nil
}
@ -222,6 +246,13 @@ func (s service) UpdateReplication(ctx context.Context, id platform.ID, request
}
return nil, err
}
if request.MaxQueueSizeBytes != nil {
if err := s.durableQueueManager.UpdateMaxQueueSize(id, *request.MaxQueueSizeBytes); err != nil {
s.log.Warn("actual max queue size does not match the max queue size recorded in database", zap.String("ID", id.String()))
return nil, err
}
}
return &r, nil
}
@ -267,6 +298,11 @@ func (s service) DeleteReplication(ctx context.Context, id platform.ID) error {
}
return err
}
if err := s.durableQueueManager.DeleteQueue(id); err != nil {
return err
}
return nil
}
@ -284,9 +320,28 @@ func (s service) DeleteBucketReplications(ctx context.Context, localBucketID pla
if err := s.store.DB.SelectContext(ctx, &deleted, query, args...); err != nil {
return err
}
errOccurred := false
for _, replication := range deleted {
id, err := platform.IDFromString(replication)
if err != nil {
s.log.Error("durable queue remaining on disk after deletion failure", zap.Error(err), zap.String("ID", replication))
errOccurred = true
}
if err := s.durableQueueManager.DeleteQueue(*id); err != nil {
s.log.Error("durable queue remaining on disk after deletion failure", zap.Error(err), zap.String("ID", replication))
errOccurred = true
}
}
s.log.Debug("Deleted all replications for local bucket",
zap.String("bucket_id", localBucketID.String()), zap.Strings("ids", deleted))
if errOccurred {
return fmt.Errorf("deleting replications for bucket %q failed, see server logs for details", localBucketID)
}
return nil
}

View File

@ -93,6 +93,7 @@ func TestCreateAndGetReplication(t *testing.T) {
require.Equal(t, errReplicationNotFound, svc.ValidateReplication(ctx, initID))
// Create a replication, check the results.
mocks.durableQueueManager.EXPECT().InitializeQueue(initID, createReq.MaxQueueSizeBytes)
created, err := svc.CreateReplication(ctx, createReq)
require.NoError(t, err)
require.Equal(t, replication, *created)
@ -143,6 +144,8 @@ func TestCreateMissingRemote(t *testing.T) {
mocks.bucketSvc.EXPECT().FindBucketByID(gomock.Any(), createReq.LocalBucketID).
Return(&influxdb.Bucket{}, nil)
mocks.durableQueueManager.EXPECT().InitializeQueue(initID, createReq.MaxQueueSizeBytes)
mocks.durableQueueManager.EXPECT().DeleteQueue(initID)
created, err := svc.CreateReplication(ctx, createReq)
require.Error(t, err)
require.Contains(t, err.Error(), fmt.Sprintf("remote %q not found", createReq.RemoteID))
@ -239,11 +242,13 @@ func TestUpdateAndGetReplication(t *testing.T) {
require.Nil(t, updated)
// Create a replication.
mocks.durableQueueManager.EXPECT().InitializeQueue(initID, createReq.MaxQueueSizeBytes)
created, err := svc.CreateReplication(ctx, createReq)
require.NoError(t, err)
require.Equal(t, replication, *created)
// Update the replication.
mocks.durableQueueManager.EXPECT().UpdateMaxQueueSize(initID, *updateReq.MaxQueueSizeBytes)
updated, err = svc.UpdateReplication(ctx, initID, updateReq)
require.NoError(t, err)
require.Equal(t, updatedReplication, *updated)
@ -262,6 +267,7 @@ func TestUpdateMissingRemote(t *testing.T) {
Return(&influxdb.Bucket{}, nil)
// Create a replication.
mocks.durableQueueManager.EXPECT().InitializeQueue(initID, createReq.MaxQueueSizeBytes)
created, err := svc.CreateReplication(ctx, createReq)
require.NoError(t, err)
require.Equal(t, replication, *created)
@ -291,6 +297,7 @@ func TestUpdateNoop(t *testing.T) {
Return(&influxdb.Bucket{}, nil)
// Create a replication.
mocks.durableQueueManager.EXPECT().InitializeQueue(initID, createReq.MaxQueueSizeBytes)
created, err := svc.CreateReplication(ctx, createReq)
require.NoError(t, err)
require.Equal(t, replication, *created)
@ -317,6 +324,7 @@ func TestValidateUpdatedReplicationWithoutPersisting(t *testing.T) {
Return(&influxdb.Bucket{}, nil)
// Create a replication.
mocks.durableQueueManager.EXPECT().InitializeQueue(initID, createReq.MaxQueueSizeBytes)
created, err := svc.CreateReplication(ctx, createReq)
require.NoError(t, err)
require.Equal(t, replication, *created)
@ -345,6 +353,7 @@ func TestValidateUpdatedReplicationWithoutPersisting(t *testing.T) {
Return(&influxdb.Bucket{}, nil)
// Create a replication.
mocks.durableQueueManager.EXPECT().InitializeQueue(initID, createReq.MaxQueueSizeBytes)
created, err := svc.CreateReplication(ctx, createReq)
require.NoError(t, err)
require.Equal(t, replication, *created)
@ -375,6 +384,7 @@ func TestValidateUpdatedReplicationWithoutPersisting(t *testing.T) {
Return(&influxdb.Bucket{}, nil)
// Create a replication.
mocks.durableQueueManager.EXPECT().InitializeQueue(initID, createReq.MaxQueueSizeBytes)
created, err := svc.CreateReplication(ctx, createReq)
require.NoError(t, err)
require.Equal(t, replication, *created)
@ -407,9 +417,11 @@ func TestDeleteReplication(t *testing.T) {
require.Equal(t, errReplicationNotFound, svc.DeleteReplication(ctx, initID))
// Create a replication, then delete it.
mocks.durableQueueManager.EXPECT().InitializeQueue(initID, createReq.MaxQueueSizeBytes)
created, err := svc.CreateReplication(ctx, createReq)
require.NoError(t, err)
require.Equal(t, replication, *created)
mocks.durableQueueManager.EXPECT().DeleteQueue(initID)
require.NoError(t, svc.DeleteReplication(ctx, initID))
// Looking up the ID should again produce an error.
@ -440,6 +452,7 @@ func TestDeleteReplications(t *testing.T) {
insertRemote(t, svc.store, createReq3.RemoteID)
for _, req := range []influxdb.CreateReplicationRequest{createReq, createReq2, createReq3} {
mocks.durableQueueManager.EXPECT().InitializeQueue(gomock.Any(), req.MaxQueueSizeBytes)
_, err := svc.CreateReplication(ctx, req)
require.NoError(t, err)
}
@ -449,6 +462,7 @@ func TestDeleteReplications(t *testing.T) {
require.Len(t, listed.Replications, 3)
// Delete 2/3 by bucket ID.
mocks.durableQueueManager.EXPECT().DeleteQueue(gomock.Any()).Times(2)
require.NoError(t, svc.DeleteBucketReplications(ctx, createReq.LocalBucketID))
// Ensure they were deleted.
@ -476,6 +490,7 @@ func TestListReplications(t *testing.T) {
var allReplications []influxdb.Replication
for _, req := range []influxdb.CreateReplicationRequest{createReq, createReq2, createReq3} {
mocks.durableQueueManager.EXPECT().InitializeQueue(gomock.Any(), createReq.MaxQueueSizeBytes)
created, err := svc.CreateReplication(ctx, req)
require.NoError(t, err)
allReplications = append(allReplications, *created)
@ -556,6 +571,7 @@ func TestListReplications(t *testing.T) {
type mocks struct {
bucketSvc *replicationsMock.MockBucketService
validator *replicationsMock.MockReplicationValidator
durableQueueManager *replicationsMock.MockDurableQueueManager
}
func newTestService(t *testing.T) (*service, mocks, func(t *testing.T)) {
@ -572,6 +588,7 @@ func newTestService(t *testing.T) (*service, mocks, func(t *testing.T)) {
mocks := mocks{
bucketSvc: replicationsMock.NewMockBucketService(ctrl),
validator: replicationsMock.NewMockReplicationValidator(ctrl),
durableQueueManager: replicationsMock.NewMockDurableQueueManager(ctrl),
}
svc := service{
store: store,
@ -579,6 +596,7 @@ func newTestService(t *testing.T) (*service, mocks, func(t *testing.T)) {
bucketService: mocks.bucketSvc,
validator: mocks.validator,
log: logger,
durableQueueManager: mocks.durableQueueManager,
}
return &svc, mocks, clean