feat: add replications queue scanner (#22873)
Co-authored-by: mcfarlm3 <58636946+mcfarlm3@users.noreply.github.com>
parent 16e3b165ca
commit 40d9587ece
@@ -3,17 +3,27 @@ package internal
 import (
 	"errors"
 	"fmt"
+	"io"
 	"os"
 	"path/filepath"
 	"sync"
+	"time"
 
 	"github.com/influxdata/influxdb/v2/kit/platform"
 	"github.com/influxdata/influxdb/v2/pkg/durablequeue"
 	"go.uber.org/zap"
 )
 
+type replicationQueue struct {
+	queue   *durablequeue.Queue
+	wg      sync.WaitGroup
+	done    chan struct{}
+	receive chan struct{}
+	logger  *zap.Logger
+}
+
 type durableQueueManager struct {
-	replicationQueues map[platform.ID]*durablequeue.Queue
+	replicationQueues map[platform.ID]*replicationQueue
 	logger            *zap.Logger
 	queuePath         string
 	mutex             sync.RWMutex
@@ -25,7 +35,7 @@ var errShutdown = errors.New("shutdown tasks for replications durable queues fai
 // NewDurableQueueManager creates a new durableQueueManager struct, for managing durable queues associated with
 //replication streams.
 func NewDurableQueueManager(log *zap.Logger, queuePath string) *durableQueueManager {
-	replicationQueues := make(map[platform.ID]*durablequeue.Queue)
+	replicationQueues := make(map[platform.ID]*replicationQueue)
 
 	os.MkdirAll(queuePath, 0777)
 
@@ -71,44 +81,133 @@ func (qm *durableQueueManager) InitializeQueue(replicationID platform.ID, maxQue
 		return err
 	}
 
-	// Map new durable queue to its corresponding replication stream via replication ID
-	qm.replicationQueues[replicationID] = newQueue
-
 	// Open the new queue
 	if err := newQueue.Open(); err != nil {
 		return err
 	}
 
+	// Map new durable queue and scanner to its corresponding replication stream via replication ID
+	rq := replicationQueue{
+		queue:   newQueue,
+		logger:  qm.logger,
+		done:    make(chan struct{}),
+		receive: make(chan struct{}),
+	}
+	qm.replicationQueues[replicationID] = &rq
+	rq.Open()
+
 	qm.logger.Debug("Created new durable queue for replication stream",
 		zap.String("id", replicationID.String()), zap.String("path", dir))
 
 	return nil
 }
 
+func (rq *replicationQueue) Open() {
+	rq.wg.Add(1)
+	go rq.run()
+}
+
+func (rq *replicationQueue) Close() error {
+	close(rq.receive)
+	close(rq.done)
+	rq.wg.Wait() // wait for goroutine to finish processing all messages
+	return rq.queue.Close()
+}
+
+func (rq *replicationQueue) run() {
+	defer rq.wg.Done()
+
+	retryInterval := time.Second
+	retryTimer := time.NewTicker(retryInterval)
+	defer retryTimer.Stop()
+
+	writer := func() {
+		for {
+			_, err := rq.SendWrite(func(b []byte) error {
+				rq.logger.Info("written bytes", zap.Int("len", len(b)))
+				return nil
+			})
+			if err != nil {
+				if err == io.EOF {
+					// No more data
+					// Handle this gracefully, as it is an expected error to receive
+				} else {
+					// todo more error handling
+					panic(1)
+				}
+				break
+			}
+		}
+	}
+
+	for {
+		select {
+		case <-rq.done: // end the goroutine when done is messaged
+			return
+		case <-retryTimer.C: // run the scanner every 1s
+			writer()
+		case <-rq.receive: // run the scanner on data append
+			writer()
+		}
+	}
+}
+
+func (rq *replicationQueue) SendWrite(dp func([]byte) error) (int, error) {
+	// err here can be io.EOF, indicating nothing to write
+	scan, err := rq.queue.NewScanner()
+	if err != nil {
+		return 0, err
+	}
+
+	var count int
+	for scan.Next() {
+		// This may return io.EOF to indicate an empty queue
+		if scan.Err() != nil {
+			err = scan.Err()
+			break
+		}
+		if err = dp(scan.Bytes()); err != nil {
+			break
+		}
+		count += len(scan.Bytes())
+	}
+
+	if err != nil { // todo handle "skippable" errors
+		rq.logger.Info("Segment read error.", zap.Error(scan.Err()))
+	}
+
+	if _, err := scan.Advance(); err != nil {
+		return count, err
+	}
+	return count, nil
+}
+
 // DeleteQueue deletes a durable queue and its associated data on disk.
 func (qm *durableQueueManager) DeleteQueue(replicationID platform.ID) error {
 	qm.mutex.Lock()
 	defer qm.mutex.Unlock()
 
-	if qm.replicationQueues[replicationID] == nil {
+	if _, exist := qm.replicationQueues[replicationID]; !exist {
 		return fmt.Errorf("durable queue not found for replication ID %q", replicationID)
 	}
 
+	rq := qm.replicationQueues[replicationID]
+
 	// Close the queue
-	if err := qm.replicationQueues[replicationID].Close(); err != nil {
+	if err := rq.Close(); err != nil {
 		return err
 	}
 
 	qm.logger.Debug("Closed replication stream durable queue",
-		zap.String("id", replicationID.String()), zap.String("path", qm.replicationQueues[replicationID].Dir()))
+		zap.String("id", replicationID.String()), zap.String("path", rq.queue.Dir()))
 
 	// Delete any enqueued, un-flushed data on disk for this queue
-	if err := qm.replicationQueues[replicationID].Remove(); err != nil {
+	if err := rq.queue.Remove(); err != nil {
 		return err
 	}
 
 	qm.logger.Debug("Deleted data associated with replication stream durable queue",
-		zap.String("id", replicationID.String()), zap.String("path", qm.replicationQueues[replicationID].Dir()))
+		zap.String("id", replicationID.String()), zap.String("path", rq.queue.Dir()))
 
 	// Remove entry from replicationQueues map
 	delete(qm.replicationQueues, replicationID)
@@ -121,11 +220,11 @@ func (qm *durableQueueManager) UpdateMaxQueueSize(replicationID platform.ID, max
 	qm.mutex.RLock()
 	defer qm.mutex.RUnlock()
 
-	if qm.replicationQueues[replicationID] == nil {
+	if _, exist := qm.replicationQueues[replicationID]; !exist {
 		return fmt.Errorf("durable queue not found for replication ID %q", replicationID)
 	}
 
-	if err := qm.replicationQueues[replicationID].SetMaxSize(maxQueueSizeBytes); err != nil {
+	if err := qm.replicationQueues[replicationID].queue.SetMaxSize(maxQueueSizeBytes); err != nil {
 		return err
 	}
 
@@ -140,10 +239,10 @@ func (qm *durableQueueManager) CurrentQueueSizes(ids []platform.ID) (map[platfor
 	sizes := make(map[platform.ID]int64, len(ids))
 
 	for _, id := range ids {
-		if qm.replicationQueues[id] == nil {
+		if _, exist := qm.replicationQueues[id]; !exist {
 			return nil, fmt.Errorf("durable queue not found for replication ID %q", id)
 		}
-		sizes[id] = qm.replicationQueues[id].DiskUsage()
+		sizes[id] = qm.replicationQueues[id].queue.DiskUsage()
 	}
 
 	return sizes, nil
@@ -179,7 +278,13 @@ func (qm *durableQueueManager) StartReplicationQueues(trackedReplications map[pl
 			errOccurred = true
 			continue
 		} else {
-			qm.replicationQueues[id] = queue
+			qm.replicationQueues[id] = &replicationQueue{
+				queue:   queue,
+				logger:  qm.logger,
+				done:    make(chan struct{}),
+				receive: make(chan struct{}),
+			}
+			qm.replicationQueues[id].Open()
 			qm.logger.Info("Opened replication stream", zap.String("id", id.String()), zap.String("path", queue.Dir()))
 		}
 	}
@@ -206,7 +311,7 @@ func (qm *durableQueueManager) StartReplicationQueues(trackedReplications map[pl
 		}
 
 		// Partial delete found, needs to be fully removed
-		if qm.replicationQueues[*id] == nil {
+		if _, exist := qm.replicationQueues[*id]; !exist {
 			if err := os.RemoveAll(filepath.Join(qm.queuePath, id.String())); err != nil {
 				qm.logger.Error("failed to remove durable queue during partial delete cleanup", zap.Error(err), zap.String("id", id.String()))
 				errOccurred = true
@@ -225,8 +330,8 @@ func (qm *durableQueueManager) StartReplicationQueues(trackedReplications map[pl
 func (qm *durableQueueManager) CloseAll() error {
 	errOccurred := false
 
-	for id, queue := range qm.replicationQueues {
-		if err := queue.Close(); err != nil {
+	for id, replicationQueue := range qm.replicationQueues {
+		if err := replicationQueue.Close(); err != nil {
 			qm.logger.Error("failed to close durable queue", zap.Error(err), zap.String("id", id.String()))
 			errOccurred = true
 		}
@@ -244,13 +349,14 @@ func (qm *durableQueueManager) EnqueueData(replicationID platform.ID, data []byt
 	qm.mutex.RLock()
 	defer qm.mutex.RUnlock()
 
-	if qm.replicationQueues[replicationID] == nil {
+	if _, exist := qm.replicationQueues[replicationID]; !exist {
 		return fmt.Errorf("durable queue not found for replication ID %q", replicationID)
 	}
 
-	if err := qm.replicationQueues[replicationID].Append(data); err != nil {
+	if err := qm.replicationQueues[replicationID].queue.Append(data); err != nil {
 		return err
 	}
+	qm.replicationQueues[replicationID].receive <- struct{}{}
 
 	return nil
 }
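
The heart of the change is the select loop in replicationQueue.run(): each queue gets a goroutine that drains the durable queue whenever the one-second retry ticker fires or the receive channel is signalled by EnqueueData, and exits once done is closed and the WaitGroup unwinds. The standalone sketch below shows the same ticker-plus-notification pattern in isolation; the scannerLoop type and its drain callback are invented for illustration and are not part of this commit.

// Minimal sketch (not influxdb code) of the scanner-loop pattern used by
// replicationQueue.run(): drain on a retry tick or a receive signal, stop on done.
package main

import (
	"fmt"
	"sync"
	"time"
)

type scannerLoop struct {
	wg      sync.WaitGroup
	done    chan struct{}
	receive chan struct{}
	drain   func() // stands in for SendWrite in this sketch
}

func (s *scannerLoop) open() {
	s.wg.Add(1)
	go s.run()
}

func (s *scannerLoop) run() {
	defer s.wg.Done()
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-s.done: // shut the goroutine down
			return
		case <-ticker.C: // periodic retry
			s.drain()
		case <-s.receive: // data was appended
			s.drain()
		}
	}
}

func main() {
	s := &scannerLoop{
		done:    make(chan struct{}),
		receive: make(chan struct{}),
		drain:   func() { fmt.Println("scanning queue") },
	}
	s.open()
	s.receive <- struct{}{} // simulate EnqueueData notifying the scanner
	close(s.done)
	s.wg.Wait()
}
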
@@ -4,12 +4,14 @@ import (
 	"os"
 	"path/filepath"
 	"testing"
+	"time"
 
 	"github.com/influxdata/influxdb/v2"
 	"github.com/influxdata/influxdb/v2/kit/platform"
 	"github.com/influxdata/influxdb/v2/pkg/durablequeue"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zaptest"
+	"go.uber.org/zap/zaptest/observer"
 )
 
 var (
@@ -30,6 +32,81 @@ func TestCreateNewQueueDirExists(t *testing.T) {
 	require.DirExists(t, filepath.Join(queuePath, id1.String()))
 }
 
+func TestEnqueueScanLog(t *testing.T) {
+	t.Parallel()
+
+	// Initialize queue manager with zap observer (to allow assertions on log messages)
+	enginePath, err := os.MkdirTemp("", "engine")
+	require.NoError(t, err)
+	queuePath := filepath.Join(enginePath, "replicationq")
+
+	observedZapCore, observedLogs := observer.New(zap.InfoLevel)
+	observedLogger := zap.New(observedZapCore)
+
+	qm := NewDurableQueueManager(observedLogger, queuePath)
+	defer os.RemoveAll(filepath.Dir(queuePath))
+
+	// Create new queue
+	err = qm.InitializeQueue(id1, maxQueueSizeBytes)
+	require.NoError(t, err)
+
+	// Enqueue some data
+	testData := "weather,location=us-midwest temperature=82 1465839830100400200"
+	err = qm.EnqueueData(id1, []byte(testData))
+	require.NoError(t, err)
+
+	// Give it a second to scan the queue
+	time.Sleep(time.Second)
+
+	// Check that data the scanner logs is the same as what was enqueued
+	require.Equal(t, 1, observedLogs.Len())
+	allLogs := observedLogs.All()
+	firstLog := allLogs[0]
+	require.Equal(t, firstLog.Message, "written bytes")
+	require.Equal(t, int64(62), firstLog.ContextMap()["len"])
+}
+
+func TestEnqueueScanLogMultiple(t *testing.T) {
+	t.Parallel()
+
+	// Initialize queue manager with zap observer (to allow assertions on log messages)
+	enginePath, err := os.MkdirTemp("", "engine")
+	require.NoError(t, err)
+	queuePath := filepath.Join(enginePath, "replicationq")
+
+	observedZapCore, observedLogs := observer.New(zap.InfoLevel)
+	observedLogger := zap.New(observedZapCore)
+
+	qm := NewDurableQueueManager(observedLogger, queuePath)
+	defer os.RemoveAll(filepath.Dir(queuePath))
+
+	// Create new queue
+	err = qm.InitializeQueue(id1, maxQueueSizeBytes)
+	require.NoError(t, err)
+
+	// Enqueue some data
+	testData1 := "weather,location=us-midwest temperature=82 1465839830100400200"
+	err = qm.EnqueueData(id1, []byte(testData1))
+	require.NoError(t, err)
+
+	testData2 := "weather,location=us-midwest temperature=83 1465839830100400201"
+	err = qm.EnqueueData(id1, []byte(testData2))
+	require.NoError(t, err)
+
+	// Give it a second to scan the queue
+	time.Sleep(time.Second)
+
+	// Check that data the scanner logs is the same as what was enqueued
+	require.Equal(t, 2, observedLogs.Len())
+	allLogs := observedLogs.All()
+
+	require.Equal(t, allLogs[0].Message, "written bytes")
+	require.Equal(t, int64(62), allLogs[0].ContextMap()["len"])
+
+	require.Equal(t, allLogs[1].Message, "written bytes")
+	require.Equal(t, int64(62), allLogs[1].ContextMap()["len"])
+}
+
 func TestCreateNewQueueDuplicateID(t *testing.T) {
 	t.Parallel()
 
@@ -110,7 +187,7 @@ func TestStartReplicationQueue(t *testing.T) {
 	require.NotNil(t, qm.replicationQueues[id1])
 
 	// Ensure queue is open by trying to remove, will error if open
-	err = qm.replicationQueues[id1].Remove()
+	err = qm.replicationQueues[id1].queue.Remove()
 	require.Errorf(t, err, "queue is open")
 }
 
@@ -179,9 +256,9 @@ func TestStartReplicationQueuesMultiple(t *testing.T) {
 	require.DirExists(t, filepath.Join(queuePath, id2.String()))
 
 	// Ensure both queues are open by trying to remove, will error if open
-	err = qm.replicationQueues[id1].Remove()
+	err = qm.replicationQueues[id1].queue.Remove()
 	require.Errorf(t, err, "queue is open")
-	err = qm.replicationQueues[id2].Remove()
+	err = qm.replicationQueues[id2].queue.Remove()
 	require.Errorf(t, err, "queue is open")
 }
 
@@ -221,7 +298,7 @@ func TestStartReplicationQueuesMultipleWithPartialDelete(t *testing.T) {
 	require.NoDirExists(t, filepath.Join(queuePath, id2.String()))
 
 	// Ensure queue1 is open by trying to remove, will error if open
-	err = qm.replicationQueues[id1].Remove()
+	err = qm.replicationQueues[id1].queue.Remove()
 	require.Errorf(t, err, "queue is open")
 }
 
@@ -246,7 +323,7 @@ func shutdown(t *testing.T, qm *durableQueueManager) {
 	require.NoError(t, err)
 
 	// Clear replication queues map
-	emptyMap := make(map[platform.ID]*durablequeue.Queue)
+	emptyMap := make(map[platform.ID]*replicationQueue)
 	qm.replicationQueues = emptyMap
 }
 
@@ -270,13 +347,56 @@ func TestEnqueueData(t *testing.T) {
 
 	data := "some fake data"
 
+	// close the scanner goroutine to specifically test EnqueueData()
+	rq, ok := qm.replicationQueues[id1]
+	require.True(t, ok)
+	close(rq.done)
+	go func() { <-rq.receive }() // absorb the receive to avoid testcase deadlock
+
 	require.NoError(t, qm.EnqueueData(id1, []byte(data)))
 	sizes, err = qm.CurrentQueueSizes([]platform.ID{id1})
 	require.NoError(t, err)
 	require.Greater(t, sizes[id1], int64(8))
 
-	written, err := qm.replicationQueues[id1].Current()
+	written, err := qm.replicationQueues[id1].queue.Current()
 	require.NoError(t, err)
 
 	require.Equal(t, data, string(written))
 }
+
+func TestGoroutineReceives(t *testing.T) {
+	path, qm := initQueueManager(t)
+	defer os.RemoveAll(path)
+	require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes))
+	require.DirExists(t, filepath.Join(path, id1.String()))
+
+	rq, ok := qm.replicationQueues[id1]
+	require.True(t, ok)
+	require.NotNil(t, rq)
+	close(rq.done) // atypical from normal behavior, but lets us receive channels to test
+
+	go func() { require.NoError(t, qm.EnqueueData(id1, []byte("1234"))) }()
+	select {
+	case <-rq.receive:
+		return
+	case <-time.After(time.Second):
+		t.Fatal("Test timed out")
+		return
+	}
+}
+
+func TestGoroutineCloses(t *testing.T) {
+	path, qm := initQueueManager(t)
+	defer os.RemoveAll(path)
+	require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes))
+	require.DirExists(t, filepath.Join(path, id1.String()))
+
+	rq, ok := qm.replicationQueues[id1]
+	require.True(t, ok)
+	require.NotNil(t, rq)
+	require.NoError(t, qm.CloseAll())
+
+	// wg should be zero here, indicating that the goroutine has closed
+	// if this does not panic, then the routine is still active
+	require.Panics(t, func() { rq.wg.Add(-1) })
+}
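
Taken together, the tests above exercise the intended lifecycle: InitializeQueue creates the durable queue and starts its scanner goroutine, EnqueueData appends data and nudges the scanner through the receive channel, and CloseAll tears everything down. For orientation only, here is a hedged usage sketch written as if it lived in the same internal package; exampleUsage, the queue path, and the size literal are illustrative assumptions, not code from this commit.

package internal

import (
	"github.com/influxdata/influxdb/v2/kit/platform"
	"go.uber.org/zap"
)

// exampleUsage is an illustrative helper (not part of the commit) showing how the
// manager and its per-queue scanner are expected to be driven.
func exampleUsage(log *zap.Logger) error {
	qm := NewDurableQueueManager(log, "/tmp/replicationq") // example path

	id := platform.ID(1)
	if err := qm.InitializeQueue(id, 67108860); err != nil { // also starts the scanner goroutine
		return err
	}

	// EnqueueData appends to the durable queue and then signals the scanner via the
	// receive channel, so data is processed immediately rather than on the 1s ticker.
	line := "weather,location=us-midwest temperature=82 1465839830100400200"
	if err := qm.EnqueueData(id, []byte(line)); err != nil {
		return err
	}

	// CloseAll closes each queue's receive/done channels and waits for its goroutine.
	return qm.CloseAll()
}
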