fix: handle replication missing queue (#24123)
* fix: replications should start up after backup/restore
* chore: refactor
* test: improve logging and handle test better

pull/24135/head
parent
387d9007a7
commit
77fd64a975
|
@ -4,6 +4,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
@ -358,9 +359,31 @@ func (qm *durableQueueManager) StartReplicationQueues(trackedReplications map[pl
|
|||
|
||||
// Open and map the queue struct to its replication ID
|
||||
if err := queue.Open(); err != nil {
|
||||
qm.logger.Error("failed to open replication stream durable queue", zap.Error(err), zap.String("id", id.String()))
|
||||
errOccurred = true
|
||||
continue
|
||||
// This could have errored after a backup/restore (we do not persist the replicationq).
|
||||
// Check if the dir exists, create if it doesn't, then open and carry on
|
||||
if pErr, ok := err.(*fs.PathError); ok {
|
||||
path := pErr.Path
|
||||
if _, err := os.Stat(path); err != nil && os.IsNotExist(err) {
|
||||
if err := os.MkdirAll(path, 0777); err != nil {
|
||||
qm.logger.Error("error attempting to recreate missing replication queue", zap.Error(err), zap.String("id", id.String()), zap.String("path", path))
|
||||
errOccurred = true
|
||||
continue
|
||||
}
|
||||
|
||||
if err := queue.Open(); err != nil {
|
||||
qm.logger.Error("error attempting to open replication queue", zap.Error(err), zap.String("id", id.String()), zap.String("path", path))
|
||||
errOccurred = true
|
||||
continue
|
||||
}
|
||||
|
||||
qm.replicationQueues[id] = qm.newReplicationQueue(id, repl.OrgID, repl.LocalBucketID, queue, repl.MaxAgeSeconds)
|
||||
qm.replicationQueues[id].Open()
|
||||
qm.logger.Info("Opened replication stream", zap.String("id", id.String()), zap.String("path", queue.Dir()))
|
||||
}
|
||||
} else {
|
||||
qm.logger.Error("failed to open replication stream durable queue", zap.Error(err), zap.String("id", id.String()), zap.String("path", queue.Dir()))
|
||||
errOccurred = true
|
||||
}
|
||||
} else {
|
||||
qm.replicationQueues[id] = qm.newReplicationQueue(id, repl.OrgID, repl.LocalBucketID, queue, repl.MaxAgeSeconds)
|
||||
qm.replicationQueues[id].Open()
|
||||
|
|
|
@ -695,3 +695,42 @@ func TestGetReplications(t *testing.T) {
|
|||
repls = qm.GetReplications(orgID2, localBucketID2)
|
||||
require.ElementsMatch(t, expectedRepls, repls)
|
||||
}
|
||||
|
||||
func TestReplicationStartMissingQueue(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
queuePath, qm := initQueueManager(t)
|
||||
defer os.RemoveAll(filepath.Dir(queuePath))
|
||||
|
||||
// Create new queue
|
||||
err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0)
|
||||
require.NoError(t, err)
|
||||
require.DirExists(t, filepath.Join(queuePath, id1.String()))
|
||||
|
||||
// Represents the replications tracked in sqlite, this one is tracked
|
||||
trackedReplications := make(map[platform.ID]*influxdb.TrackedReplication)
|
||||
trackedReplications[id1] = &influxdb.TrackedReplication{
|
||||
MaxQueueSizeBytes: maxQueueSizeBytes,
|
||||
MaxAgeSeconds: 0,
|
||||
OrgID: orgID1,
|
||||
LocalBucketID: localBucketID1,
|
||||
}
|
||||
|
||||
// Simulate server shutdown by closing all queues and clearing replicationQueues map
|
||||
shutdown(t, qm)
|
||||
|
||||
// Delete the queue to simulate restoring from a backup
|
||||
err = os.RemoveAll(filepath.Join(queuePath))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Call startup function
|
||||
err = qm.StartReplicationQueues(trackedReplications)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Make sure queue is stored in map
|
||||
require.NotNil(t, qm.replicationQueues[id1])
|
||||
|
||||
// Ensure queue is open by trying to remove, will error if open
|
||||
err = qm.replicationQueues[id1].queue.Remove()
|
||||
require.Errorf(t, err, "queue is open")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue