feat: added replications queue management to launcher tasks (#22820)

* feat: added replications queue management to launcher tasks

* refactor: separated sql logic into replications service rather than durable queue manager

* refactor: extended replications feature flag to launcher code and minor change to startup function param

* chore: added unit test coverage for replications server startup queue management

* refactor: made error messages reusable and factored out unecessary string from queue management tests

* refactor: changed queue management error names to pass linter check
pull/22858/head
mcfarlm3 2021-11-09 11:32:07 -08:00 committed by GitHub
parent f6568a7ff6
commit cd0243d2b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 387 additions and 55 deletions

View File

@ -662,6 +662,22 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
ts.BucketService = replications.NewBucketService(
m.log.With(zap.String("service", "replication_buckets")), ts.BucketService, replicationSvc)
replicationsFlag := feature.ReplicationStreamBackend()
if replicationsFlag.Enabled(ctx, m.flagger) {
if err = replicationSvc.Open(ctx); err != nil {
m.log.Error("Failed to open replications service", zap.Error(err))
return err
}
m.closers = append(m.closers, labeledCloser{
label: "replications",
closer: func(context.Context) error {
return replicationSvc.Close()
},
})
}
errorHandler := kithttp.NewErrorHandler(m.log.With(zap.String("handler", "error_logger")))
m.apibackend = &http.APIBackend{
AssetsPath: opts.AssetsPath,

View File

@ -1,6 +1,7 @@
package internal
import (
"errors"
"fmt"
"os"
"path/filepath"
@ -14,23 +15,28 @@ import (
type durableQueueManager struct {
replicationQueues map[platform.ID]*durablequeue.Queue
logger *zap.Logger
enginePath string
queuePath string
mutex sync.RWMutex
}
var errStartup = errors.New("startup tasks for replications durable queue management failed, see server logs for details")
var errShutdown = errors.New("shutdown tasks for replications durable queues failed, see server logs for details")
// NewDurableQueueManager creates a new durableQueueManager struct, for managing durable queues associated with
//replication streams.
func NewDurableQueueManager(log *zap.Logger, enginePath string) *durableQueueManager {
func NewDurableQueueManager(log *zap.Logger, queuePath string) *durableQueueManager {
replicationQueues := make(map[platform.ID]*durablequeue.Queue)
os.MkdirAll(queuePath, 0777)
return &durableQueueManager{
replicationQueues: replicationQueues,
logger: log,
enginePath: enginePath,
queuePath: queuePath,
}
}
// InitializeQueue creates a new durable queue which is associated with a replication stream.
// InitializeQueue creates and opens a new durable queue which is associated with a replication stream.
func (qm *durableQueueManager) InitializeQueue(replicationID platform.ID, maxQueueSizeBytes int64) error {
qm.mutex.Lock()
defer qm.mutex.Unlock()
@ -42,8 +48,7 @@ func (qm *durableQueueManager) InitializeQueue(replicationID platform.ID, maxQue
// Set up path for new queue on disk
dir := filepath.Join(
qm.enginePath,
"replicationq",
qm.queuePath,
replicationID.String(),
)
if err := os.MkdirAll(dir, 0777); err != nil {
@ -111,7 +116,7 @@ func (qm *durableQueueManager) DeleteQueue(replicationID platform.ID) error {
return nil
}
// UpdateMaxQueueSize updates the maximum size of the durable queue.
// UpdateMaxQueueSize updates the maximum size of a durable queue.
func (qm *durableQueueManager) UpdateMaxQueueSize(replicationID platform.ID, maxQueueSizeBytes int64) error {
qm.mutex.RLock()
defer qm.mutex.RUnlock()
@ -143,3 +148,93 @@ func (qm *durableQueueManager) CurrentQueueSizes(ids []platform.ID) (map[platfor
return sizes, nil
}
// StartReplicationQueues updates the durableQueueManager.replicationQueues map, fully removing any partially deleted
// queues (present on disk, but not tracked in sqlite), opening all current queues, and logging info for each.
func (qm *durableQueueManager) StartReplicationQueues(trackedReplications map[platform.ID]int64) error {
errOccurred := false
for id, size := range trackedReplications {
// Re-initialize a queue struct for each replication stream from sqlite
queue, err := durablequeue.NewQueue(
filepath.Join(qm.queuePath, id.String()),
size,
durablequeue.DefaultSegmentSize,
&durablequeue.SharedCount{},
durablequeue.MaxWritesPending,
func(bytes []byte) error {
return nil
},
)
if err != nil {
qm.logger.Error("failed to initialize replication stream durable queue", zap.Error(err))
errOccurred = true
continue
}
// Open and map the queue struct to its replication ID
if err := queue.Open(); err != nil {
qm.logger.Error("failed to open replication stream durable queue", zap.Error(err), zap.String("id", id.String()))
errOccurred = true
continue
} else {
qm.replicationQueues[id] = queue
qm.logger.Info("Opened replication stream", zap.String("id", id.String()), zap.String("path", queue.Dir()))
}
}
if errOccurred {
return errStartup
}
// Get contents of replicationq directory
entries, err := os.ReadDir(qm.queuePath)
if err != nil {
return err
}
for _, entry := range entries {
// Skip over non-relevant entries (must be a dir named with a replication ID)
if !entry.IsDir() {
continue
}
id, err := platform.IDFromString(entry.Name())
if err != nil {
continue
}
// Partial delete found, needs to be fully removed
if qm.replicationQueues[*id] == nil {
if err := os.RemoveAll(filepath.Join(qm.queuePath, id.String())); err != nil {
qm.logger.Error("failed to remove durable queue during partial delete cleanup", zap.Error(err), zap.String("id", id.String()))
errOccurred = true
}
}
}
if errOccurred {
return errStartup
} else {
return nil
}
}
// CloseAll loops through all current replication stream queues and closes them without deleting on-disk resources
func (qm *durableQueueManager) CloseAll() error {
errOccurred := false
for id, queue := range qm.replicationQueues {
if err := queue.Close(); err != nil {
qm.logger.Error("failed to close durable queue", zap.Error(err), zap.String("id", id.String()))
errOccurred = true
}
}
if errOccurred {
return errShutdown
} else {
return nil
}
}

View File

@ -7,94 +7,245 @@ import (
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/pkg/durablequeue"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
var (
replicationID = platform.ID(1)
id1 = platform.ID(1)
id2 = platform.ID(2)
maxQueueSizeBytes = 3 * influxdb.DefaultReplicationMaxQueueSizeBytes
)
func TestCreateNewQueueDirExists(t *testing.T) {
t.Parallel()
tempEnginePath, err := os.MkdirTemp("", "engine")
require.NoError(t, err)
defer os.RemoveAll(tempEnginePath)
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, tempEnginePath)
err = qm.InitializeQueue(replicationID, maxQueueSizeBytes)
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
require.NoError(t, err)
require.DirExists(t, filepath.Join(tempEnginePath, "replicationq", replicationID.String()))
require.DirExists(t, filepath.Join(queuePath, id1.String()))
}
func TestCreateNewQueueDuplicateID(t *testing.T) {
t.Parallel()
tempEnginePath, err := os.MkdirTemp("", "engine")
require.NoError(t, err)
defer os.RemoveAll(tempEnginePath)
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
// Create a valid new queue
logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, tempEnginePath)
err = qm.InitializeQueue(replicationID, maxQueueSizeBytes)
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
require.NoError(t, err)
// Try to initialize another queue with the same replication ID
err = qm.InitializeQueue(replicationID, maxQueueSizeBytes)
err = qm.InitializeQueue(id1, maxQueueSizeBytes)
require.EqualError(t, err, "durable queue already exists for replication ID \"0000000000000001\"")
}
func TestDeleteQueueDirRemoved(t *testing.T) {
t.Parallel()
tempEnginePath, err := os.MkdirTemp("", "engine")
require.NoError(t, err)
defer os.RemoveAll(tempEnginePath)
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
// Create a valid new queue
logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, tempEnginePath)
err = qm.InitializeQueue(replicationID, maxQueueSizeBytes)
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
require.NoError(t, err)
require.DirExists(t, filepath.Join(tempEnginePath, "replicationq", replicationID.String()))
require.DirExists(t, filepath.Join(queuePath, id1.String()))
// Delete queue and make sure its queue has been deleted from disk
err = qm.DeleteQueue(replicationID)
err = qm.DeleteQueue(id1)
require.NoError(t, err)
require.NoDirExists(t, filepath.Join(tempEnginePath, "replicationq", replicationID.String()))
require.NoDirExists(t, filepath.Join(queuePath, id1.String()))
}
func TestDeleteQueueNonexistentID(t *testing.T) {
t.Parallel()
tempEnginePath, err := os.MkdirTemp("", "engine")
require.NoError(t, err)
defer os.RemoveAll(tempEnginePath)
logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, tempEnginePath)
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
// Delete nonexistent queue
err = qm.DeleteQueue(replicationID)
err := qm.DeleteQueue(id1)
require.EqualError(t, err, "durable queue not found for replication ID \"0000000000000001\"")
}
func TestUpdateMaxQueueSizeNonexistentID(t *testing.T) {
t.Parallel()
tempEnginePath, err := os.MkdirTemp("", "engine")
require.NoError(t, err)
defer os.RemoveAll(tempEnginePath)
logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, tempEnginePath)
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
// Update nonexistent queue
err = qm.UpdateMaxQueueSize(replicationID, influxdb.DefaultReplicationMaxQueueSizeBytes)
err := qm.UpdateMaxQueueSize(id1, influxdb.DefaultReplicationMaxQueueSizeBytes)
require.EqualError(t, err, "durable queue not found for replication ID \"0000000000000001\"")
}
func TestStartReplicationQueue(t *testing.T) {
t.Parallel()
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
// Create new queue
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id1.String()))
// Represents the replications tracked in sqlite, this one is tracked
trackedReplications := make(map[platform.ID]int64)
trackedReplications[id1] = maxQueueSizeBytes
// Simulate server shutdown by closing all queues and clearing replicationQueues map
shutdown(t, qm)
// Call startup function
err = qm.StartReplicationQueues(trackedReplications)
require.NoError(t, err)
// Make sure queue is stored in map
require.NotNil(t, qm.replicationQueues[id1])
// Ensure queue is open by trying to remove, will error if open
err = qm.replicationQueues[id1].Remove()
require.Errorf(t, err, "queue is open")
}
func TestStartReplicationQueuePartialDelete(t *testing.T) {
t.Parallel()
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
// Create new queue
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id1.String()))
// Represents the replications tracked in sqlite, replication above is not tracked (not present in map)
trackedReplications := make(map[platform.ID]int64)
// Simulate server shutdown by closing all queues and clearing replicationQueues map
shutdown(t, qm)
// Call startup function
err = qm.StartReplicationQueues(trackedReplications)
require.NoError(t, err)
// Make sure queue is not stored in map
require.Nil(t, qm.replicationQueues[id1])
// Check for queue on disk, should be removed
require.NoDirExists(t, filepath.Join(queuePath, id1.String()))
}
func TestStartReplicationQueuesMultiple(t *testing.T) {
t.Parallel()
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
// Create queue1
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id1.String()))
// Create queue2
err = qm.InitializeQueue(id2, maxQueueSizeBytes)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id2.String()))
// Represents the replications tracked in sqlite, both replications above are tracked
trackedReplications := make(map[platform.ID]int64)
trackedReplications[id1] = maxQueueSizeBytes
trackedReplications[id2] = maxQueueSizeBytes
// Simulate server shutdown by closing all queues and clearing replicationQueues map
shutdown(t, qm)
// Call startup function
err = qm.StartReplicationQueues(trackedReplications)
require.NoError(t, err)
// Make sure both queues are stored in map
require.NotNil(t, qm.replicationQueues[id1])
require.NotNil(t, qm.replicationQueues[id2])
// Make sure both queues are present on disk
require.DirExists(t, filepath.Join(queuePath, id1.String()))
require.DirExists(t, filepath.Join(queuePath, id2.String()))
// Ensure both queues are open by trying to remove, will error if open
err = qm.replicationQueues[id1].Remove()
require.Errorf(t, err, "queue is open")
err = qm.replicationQueues[id2].Remove()
require.Errorf(t, err, "queue is open")
}
func TestStartReplicationQueuesMultipleWithPartialDelete(t *testing.T) {
t.Parallel()
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
// Create queue1
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id1.String()))
// Create queue2
err = qm.InitializeQueue(id2, maxQueueSizeBytes)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id2.String()))
// Represents the replications tracked in sqlite, queue1 is tracked and queue2 is not
trackedReplications := make(map[platform.ID]int64)
trackedReplications[id1] = maxQueueSizeBytes
// Simulate server shutdown by closing all queues and clearing replicationQueues map
shutdown(t, qm)
// Call startup function
err = qm.StartReplicationQueues(trackedReplications)
require.NoError(t, err)
// Make sure queue1 is in replicationQueues map and queue2 is not
require.NotNil(t, qm.replicationQueues[id1])
require.Nil(t, qm.replicationQueues[id2])
// Make sure queue1 is present on disk and queue2 has been removed
require.DirExists(t, filepath.Join(queuePath, id1.String()))
require.NoDirExists(t, filepath.Join(queuePath, id2.String()))
// Ensure queue1 is open by trying to remove, will error if open
err = qm.replicationQueues[id1].Remove()
require.Errorf(t, err, "queue is open")
}
func initQueueManager(t *testing.T) (string, *durableQueueManager) {
t.Helper()
enginePath, err := os.MkdirTemp("", "engine")
require.NoError(t, err)
queuePath := filepath.Join(enginePath, "replicationq")
logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, queuePath)
return queuePath, qm
}
func shutdown(t *testing.T, qm *durableQueueManager) {
t.Helper()
// Close all queues
err := qm.CloseAll()
require.NoError(t, err)
// Clear replication queues map
emptyMap := make(map[platform.ID]*durablequeue.Queue)
qm.replicationQueues = emptyMap
}

View File

@ -49,6 +49,20 @@ func (mr *MockDurableQueueManagerMockRecorder) CurrentQueueSizes(arg0 interface{
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CurrentQueueSizes", reflect.TypeOf((*MockDurableQueueManager)(nil).CurrentQueueSizes), arg0)
}
// CloseAll mocks base method.
func (m *MockDurableQueueManager) CloseAll() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CloseAll")
ret0, _ := ret[0].(error)
return ret0
}
// CloseAll indicates an expected call of CloseAll.
func (mr *MockDurableQueueManagerMockRecorder) CloseAll() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseAll", reflect.TypeOf((*MockDurableQueueManager)(nil).CloseAll))
}
// DeleteQueue mocks base method.
func (m *MockDurableQueueManager) DeleteQueue(arg0 platform.ID) error {
m.ctrl.T.Helper()
@ -77,6 +91,20 @@ func (mr *MockDurableQueueManagerMockRecorder) InitializeQueue(arg0, arg1 interf
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InitializeQueue", reflect.TypeOf((*MockDurableQueueManager)(nil).InitializeQueue), arg0, arg1)
}
// StartReplicationQueues mocks base method.
func (m *MockDurableQueueManager) StartReplicationQueues(arg0 map[platform.ID]int64) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StartReplicationQueues", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// StartReplicationQueues indicates an expected call of StartReplicationQueues.
func (mr *MockDurableQueueManagerMockRecorder) StartReplicationQueues(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartReplicationQueues", reflect.TypeOf((*MockDurableQueueManager)(nil).StartReplicationQueues), arg0)
}
// UpdateMaxQueueSize mocks base method.
func (m *MockDurableQueueManager) UpdateMaxQueueSize(arg0 platform.ID, arg1 int64) error {
m.ctrl.T.Helper()

View File

@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"path/filepath"
sq "github.com/Masterminds/squirrel"
"github.com/influxdata/influxdb/v2"
@ -40,12 +41,15 @@ func errLocalBucketNotFound(id platform.ID, cause error) error {
func NewService(store *sqlite.SqlStore, bktSvc BucketService, log *zap.Logger, enginePath string) *service {
return &service{
store: store,
idGenerator: snowflake.NewIDGenerator(),
bucketService: bktSvc,
validator: internal.NewValidator(),
log: log,
durableQueueManager: internal.NewDurableQueueManager(log, enginePath),
store: store,
idGenerator: snowflake.NewIDGenerator(),
bucketService: bktSvc,
validator: internal.NewValidator(),
log: log,
durableQueueManager: internal.NewDurableQueueManager(
log,
filepath.Join(enginePath, "replicationq"),
),
}
}
@ -64,6 +68,8 @@ type DurableQueueManager interface {
DeleteQueue(replicationID platform.ID) error
UpdateMaxQueueSize(replicationID platform.ID, maxQueueSizeBytes int64) error
CurrentQueueSizes(ids []platform.ID) (map[platform.ID]int64, error)
StartReplicationQueues(trackedReplications map[platform.ID]int64) error
CloseAll() error
}
type service struct {
@ -154,7 +160,7 @@ func (s service) CreateReplication(ctx context.Context, request influxdb.CreateR
cleanupQueue := func() {
if cleanupErr := s.durableQueueManager.DeleteQueue(newID); cleanupErr != nil {
s.log.Warn("durable queue remaining on disk after initialization failure", zap.Error(cleanupErr), zap.String("ID", newID.String()))
s.log.Warn("durable queue remaining on disk after initialization failure", zap.Error(cleanupErr), zap.String("id", newID.String()))
}
}
@ -270,7 +276,7 @@ func (s service) UpdateReplication(ctx context.Context, id platform.ID, request
if request.MaxQueueSizeBytes != nil {
if err := s.durableQueueManager.UpdateMaxQueueSize(id, *request.MaxQueueSizeBytes); err != nil {
s.log.Warn("actual max queue size does not match the max queue size recorded in database", zap.String("ID", id.String()))
s.log.Warn("actual max queue size does not match the max queue size recorded in database", zap.String("id", id.String()))
return nil, err
}
}
@ -353,12 +359,12 @@ func (s service) DeleteBucketReplications(ctx context.Context, localBucketID pla
for _, replication := range deleted {
id, err := platform.IDFromString(replication)
if err != nil {
s.log.Error("durable queue remaining on disk after deletion failure", zap.Error(err), zap.String("ID", replication))
s.log.Error("durable queue remaining on disk after deletion failure", zap.Error(err), zap.String("id", replication))
errOccurred = true
}
if err := s.durableQueueManager.DeleteQueue(*id); err != nil {
s.log.Error("durable queue remaining on disk after deletion failure", zap.Error(err), zap.String("ID", replication))
s.log.Error("durable queue remaining on disk after deletion failure", zap.Error(err), zap.String("id", replication))
errOccurred = true
}
}
@ -424,3 +430,39 @@ func (s service) populateRemoteHTTPConfig(ctx context.Context, id platform.ID, t
return nil
}
func (s service) Open(ctx context.Context) error {
var trackedReplications influxdb.Replications
// Get replications from sqlite
q := sq.Select(
"id", "max_queue_size_bytes").
From("replications")
query, args, err := q.ToSql()
if err != nil {
return err
}
if err := s.store.DB.SelectContext(ctx, &trackedReplications.Replications, query, args...); err != nil {
return err
}
trackedReplicationsMap := make(map[platform.ID]int64)
for _, r := range trackedReplications.Replications {
trackedReplicationsMap[r.ID] = r.MaxQueueSizeBytes
}
// Queue manager completes startup tasks
if err := s.durableQueueManager.StartReplicationQueues(trackedReplicationsMap); err != nil {
return err
}
return nil
}
func (s service) Close() error {
if err := s.durableQueueManager.CloseAll(); err != nil {
return err
}
return nil
}