refactor: use remote write func in NewDurableQueueManager (#22888)
parent b9b86a19a3
commit 6ee472725f

@@ -7,7 +7,6 @@ import (
 	"os"
 	"path/filepath"
 	"sync"
 	"time"

 	"github.com/influxdata/influxdb/v2/kit/platform"
 	"github.com/influxdata/influxdb/v2/pkg/durablequeue"
@@ -20,6 +19,8 @@ type replicationQueue struct {
 	done    chan struct{}
 	receive chan struct{}
 	logger  *zap.Logger
+
+	writeFunc func([]byte) error
 }

 type durableQueueManager struct {
@@ -27,6 +28,8 @@ type durableQueueManager struct {
 	logger            *zap.Logger
 	queuePath         string
 	mutex             sync.RWMutex
+
+	writeFunc func([]byte) error
 }

 var errStartup = errors.New("startup tasks for replications durable queue management failed, see server logs for details")
@@ -34,7 +37,7 @@ var errShutdown = errors.New("shutdown tasks for replications durable queues fai

 // NewDurableQueueManager creates a new durableQueueManager struct, for managing durable queues associated with
 //replication streams.
-func NewDurableQueueManager(log *zap.Logger, queuePath string) *durableQueueManager {
+func NewDurableQueueManager(log *zap.Logger, queuePath string, writeFunc func([]byte) error) *durableQueueManager {
 	replicationQueues := make(map[platform.ID]*replicationQueue)

 	os.MkdirAll(queuePath, 0777)
@@ -43,6 +46,7 @@ func NewDurableQueueManager(log *zap.Logger, queuePath string) *durableQueueMana
 		replicationQueues: replicationQueues,
 		logger:            log,
 		queuePath:         queuePath,
+		writeFunc:         writeFunc,
 	}
 }

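For orientation, here is a rough sketch of how a caller could supply a real remote-write function through the new third parameter. Everything below is an illustrative assumption, not part of this commit (which wires in the no-op internal.WriteFunc placeholder instead): the endpoint URL, content type, and error handling are made up, and the snippet assumes the usual imports (bytes, fmt, net/http, go.uber.org/zap).

	// Hypothetical caller-side write function; all names and the URL are illustrative.
	remoteWrite := func(b []byte) error {
		// Forward one batch of queued line protocol to a remote host.
		resp, err := http.Post("http://remote-host:8086/api/v2/write?bucket=example", "text/plain", bytes.NewReader(b))
		if err != nil {
			return err // network failure: surfaced to SendWrite, which logs it and stops this pass
		}
		defer resp.Body.Close()
		if resp.StatusCode/100 != 2 {
			return fmt.Errorf("remote write failed: %s", resp.Status)
		}
		return nil
	}

	qm := NewDurableQueueManager(zap.NewNop(), "/tmp/replicationq", remoteWrite)
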
@@ -88,10 +92,11 @@ func (qm *durableQueueManager) InitializeQueue(replicationID platform.ID, maxQue

 	// Map new durable queue and scanner to its corresponding replication stream via replication ID
 	rq := replicationQueue{
-		queue:   newQueue,
-		logger:  qm.logger,
-		done:    make(chan struct{}),
-		receive: make(chan struct{}),
+		queue:     newQueue,
+		done:      make(chan struct{}),
+		receive:   make(chan struct{}),
+		logger:    qm.logger.With(zap.String("replication_id", replicationID.String())),
+		writeFunc: qm.writeFunc,
 	}
 	qm.replicationQueues[replicationID] = &rq
 	rq.Open()
@@ -114,72 +119,74 @@ func (rq *replicationQueue) Close() error {
 	return rq.queue.Close()
 }

+// WriteFunc is currently a placeholder for the "default" behavior
+// of the queue scanner sending data from the durable queue to a remote host.
+func WriteFunc(b []byte) error {
+	return nil
+}
+
 func (rq *replicationQueue) run() {
 	defer rq.wg.Done()

 	retryInterval := time.Second
 	retryTimer := time.NewTicker(retryInterval)
 	defer retryTimer.Stop()

-	writer := func() {
-		for {
-			_, err := rq.SendWrite(func(b []byte) error {
-				rq.logger.Info("written bytes", zap.Int("len", len(b)))
-				return nil
-			})
-			if err != nil {
-				if err == io.EOF {
-					// No more data
-					// Handle this gracefully, as it is an expected error to receive
-				} else {
-					// todo more error handling
-					panic(1)
-				}
-				break
-			}
-		}
-	}
-
 	for {
 		select {
 		case <-rq.done: // end the goroutine when done is messaged
 			return
 		case <-retryTimer.C: // run the scanner every 1s
-			writer()
 		case <-rq.receive: // run the scanner on data append
-			writer()
+			for rq.SendWrite(rq.writeFunc) {
+			}
 		}
 	}
 }

-func (rq *replicationQueue) SendWrite(dp func([]byte) error) (int, error) {
-	// err here can be io.EOF, indicating nothing to write
+// SendWrite processes data enqueued into the durablequeue.Queue.
+// SendWrite is responsible for processing all data in the queue at the time of calling.
+// Retryable errors should be handled and retried in the dp function.
+// Unprocessable data should be dropped in the dp function.
+func (rq *replicationQueue) SendWrite(dp func([]byte) error) bool {
+
+	// Any error in creating the scanner should exit the loop in run()
+	// Either it is io.EOF indicating no data, or some other failure in making
+	// the Scanner object that we don't know how to handle.
 	scan, err := rq.queue.NewScanner()
 	if err != nil {
-		return 0, err
+		if err != io.EOF {
+			rq.logger.Error("Error creating replications queue scanner", zap.Error(err))
+		}
+		return false
 	}

-	var count int
 	for scan.Next() {
-		// This may return io.EOF to indicate an empty queue
-
+		// An io.EOF error here indicates that there is no more data
+		// left to process, and is an expected error.
+		if scan.Err() == io.EOF {
+			break
+		}
+
+		// Any other here indicates a problem, so we log the error and
+		// drop the data with a call to scan.Advance() later.
 		if scan.Err() != nil {
-			err = scan.Err()
 			rq.logger.Info("Segment read error.", zap.Error(scan.Err()))
 			break
 		}

+		// An error here indicates an unhandlable error. Data is not corrupt, and
+		// the remote write is not retryable. A potential example of an error here
+		// is an authentication error with the remote host.
 		if err = dp(scan.Bytes()); err != nil {
-			break
+			rq.logger.Error("Error in replication stream", zap.Error(err))
+			return false
 		}
-		count += len(scan.Bytes())
 	}

-	if err != nil { // todo handle "skippable" errors
-		rq.logger.Info("Segment read error.", zap.Error(scan.Err()))
+	if _, err = scan.Advance(); err != nil {
+		if err != io.EOF {
+			rq.logger.Error("Error in replication queue scanner", zap.Error(err))
+		}
+		return false
 	}

-	if _, err := scan.Advance(); err != nil {
-		return count, err
-	}
-	return count, nil
+	return true
 }

 // DeleteQueue deletes a durable queue and its associated data on disk.

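The doc comments above push retry and drop decisions into the dp callback: SendWrite treats a nil return as success and keeps draining the queue, while a non-nil return aborts the pass and reports false. A minimal sketch of a callback shaped to that contract follows; sendToRemote, isRetryable, and isUnprocessable are hypothetical helpers, and the three-attempt backoff is an arbitrary illustration, not anything this commit implements.

	// Sketch only: a dp callback honoring the SendWrite contract described above.
	func exampleWriteFunc(b []byte) error {
		const maxAttempts = 3 // illustrative retry budget

		var err error
		for i := 0; i < maxAttempts; i++ {
			err = sendToRemote(b) // hypothetical helper performing the remote write
			if err == nil {
				return nil // success: the scanner advances past this data
			}
			if isUnprocessable(err) { // hypothetical check, e.g. data the remote will never accept
				return nil // drop the data by reporting success
			}
			if !isRetryable(err) { // hypothetical check, e.g. an authentication failure
				break
			}
			time.Sleep(time.Second << i) // crude backoff between retries
		}
		return err // unhandlable: SendWrite logs it and returns false
	}
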
@@ -279,10 +286,11 @@ func (qm *durableQueueManager) StartReplicationQueues(trackedReplications map[pl
 			continue
 		} else {
 			qm.replicationQueues[id] = &replicationQueue{
-				queue:   queue,
-				logger:  qm.logger,
-				done:    make(chan struct{}),
-				receive: make(chan struct{}),
+				queue:     queue,
+				done:      make(chan struct{}),
+				receive:   make(chan struct{}),
+				logger:    qm.logger.With(zap.String("replication_id", id.String())),
+				writeFunc: qm.writeFunc,
 			}
 			qm.replicationQueues[id].Open()
 			qm.logger.Info("Opened replication stream", zap.String("id", id.String()), zap.String("path", queue.Dir()))

@@ -9,9 +9,7 @@ import (
 	"github.com/influxdata/influxdb/v2"
 	"github.com/influxdata/influxdb/v2/kit/platform"
 	"github.com/stretchr/testify/require"
-	"go.uber.org/zap"
 	"go.uber.org/zap/zaptest"
-	"go.uber.org/zap/zaptest/observer"
 )

 var (
@@ -32,79 +30,41 @@ func TestCreateNewQueueDirExists(t *testing.T) {
 	require.DirExists(t, filepath.Join(queuePath, id1.String()))
 }

-func TestEnqueueScanLog(t *testing.T) {
+func TestEnqueueScan(t *testing.T) {
 	t.Parallel()

-	// Initialize queue manager with zap observer (to allow assertions on log messages)
-	enginePath, err := os.MkdirTemp("", "engine")
-	require.NoError(t, err)
-	queuePath := filepath.Join(enginePath, "replicationq")
-
-	observedZapCore, observedLogs := observer.New(zap.InfoLevel)
-	observedLogger := zap.New(observedZapCore)
-
-	qm := NewDurableQueueManager(observedLogger, queuePath)
+	queuePath, qm := initQueueManager(t)
 	defer os.RemoveAll(filepath.Dir(queuePath))

 	// Create new queue
-	err = qm.InitializeQueue(id1, maxQueueSizeBytes)
+	err := qm.InitializeQueue(id1, maxQueueSizeBytes)
 	require.NoError(t, err)

 	// Enqueue some data
 	testData := "weather,location=us-midwest temperature=82 1465839830100400200"
+	qm.writeFunc = getTestWriteFunc(t, testData)
 	err = qm.EnqueueData(id1, []byte(testData))
 	require.NoError(t, err)

 	// Give it a second to scan the queue
 	time.Sleep(time.Second)
-
-	// Check that data the scanner logs is the same as what was enqueued
-	require.Equal(t, 1, observedLogs.Len())
-	allLogs := observedLogs.All()
-	firstLog := allLogs[0]
-	require.Equal(t, firstLog.Message, "written bytes")
-	require.Equal(t, int64(62), firstLog.ContextMap()["len"])
 }

-func TestEnqueueScanLogMultiple(t *testing.T) {
+func TestEnqueueScanMultiple(t *testing.T) {
 	t.Parallel()

-	// Initialize queue manager with zap observer (to allow assertions on log messages)
-	enginePath, err := os.MkdirTemp("", "engine")
-	require.NoError(t, err)
-	queuePath := filepath.Join(enginePath, "replicationq")
-
-	observedZapCore, observedLogs := observer.New(zap.InfoLevel)
-	observedLogger := zap.New(observedZapCore)
-
-	qm := NewDurableQueueManager(observedLogger, queuePath)
+	queuePath, qm := initQueueManager(t)
 	defer os.RemoveAll(filepath.Dir(queuePath))

 	// Create new queue
-	err = qm.InitializeQueue(id1, maxQueueSizeBytes)
+	err := qm.InitializeQueue(id1, maxQueueSizeBytes)
 	require.NoError(t, err)

 	// Enqueue some data
-	testData1 := "weather,location=us-midwest temperature=82 1465839830100400200"
-	err = qm.EnqueueData(id1, []byte(testData1))
+	testData := "weather,location=us-midwest temperature=82 1465839830100400200"
+	qm.writeFunc = getTestWriteFunc(t, testData)
+	err = qm.EnqueueData(id1, []byte(testData))
 	require.NoError(t, err)

-	testData2 := "weather,location=us-midwest temperature=83 1465839830100400201"
-	err = qm.EnqueueData(id1, []byte(testData2))
+	err = qm.EnqueueData(id1, []byte(testData))
 	require.NoError(t, err)

 	// Give it a second to scan the queue
 	time.Sleep(time.Second)
-
-	// Check that data the scanner logs is the same as what was enqueued
-	require.Equal(t, 2, observedLogs.Len())
-	allLogs := observedLogs.All()
-
-	require.Equal(t, allLogs[0].Message, "written bytes")
-	require.Equal(t, int64(62), allLogs[0].ContextMap()["len"])
-
-	require.Equal(t, allLogs[1].Message, "written bytes")
-	require.Equal(t, int64(62), allLogs[1].ContextMap()["len"])
 }

 func TestCreateNewQueueDuplicateID(t *testing.T) {

@@ -310,7 +270,7 @@ func initQueueManager(t *testing.T) (string, *durableQueueManager) {
 	queuePath := filepath.Join(enginePath, "replicationq")

 	logger := zaptest.NewLogger(t)
-	qm := NewDurableQueueManager(logger, queuePath)
+	qm := NewDurableQueueManager(logger, queuePath, WriteFunc)

 	return queuePath, qm
 }
@@ -327,6 +287,14 @@ func shutdown(t *testing.T, qm *durableQueueManager) {
 	qm.replicationQueues = emptyMap
 }

+func getTestWriteFunc(t *testing.T, expected string) func([]byte) error {
+	t.Helper()
+	return func(b []byte) error {
+		require.Equal(t, expected, string(b))
+		return nil
+	}
+}
+
 func TestEnqueueData(t *testing.T) {
 	t.Parallel()

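With getTestWriteFunc, the tests assert on the bytes handed to the injected write function instead of observing log output. One possible refinement, not part of this change, would be for the test write function to also signal a channel so tests could wait on delivery (with a timeout) rather than sleeping for a fixed second. A hypothetical variant:

	// Hypothetical helper: signals each delivery so a test can select on done
	// with a timeout instead of calling time.Sleep.
	func getSignallingWriteFunc(t *testing.T, expected string, done chan<- struct{}) func([]byte) error {
		t.Helper()
		return func(b []byte) error {
			require.Equal(t, expected, string(b))
			done <- struct{}{}
			return nil
		}
	}
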
@@ -335,7 +303,7 @@ func TestEnqueueData(t *testing.T) {
 	defer os.RemoveAll(queuePath)

 	logger := zaptest.NewLogger(t)
-	qm := NewDurableQueueManager(logger, queuePath)
+	qm := NewDurableQueueManager(logger, queuePath, WriteFunc)

 	require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes))
 	require.DirExists(t, filepath.Join(queuePath, id1.String()))
@@ -365,6 +333,8 @@ func TestEnqueueData(t *testing.T) {
 }

 func TestGoroutineReceives(t *testing.T) {
+	t.Parallel()
+
 	path, qm := initQueueManager(t)
 	defer os.RemoveAll(path)
 	require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes))
@@ -386,6 +356,8 @@ func TestGoroutineReceives(t *testing.T) {
 }

 func TestGoroutineCloses(t *testing.T) {
+	t.Parallel()
+
 	path, qm := initQueueManager(t)
 	defer os.RemoveAll(path)
 	require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes))

@@ -56,6 +56,7 @@ func NewService(store *sqlite.SqlStore, bktSvc BucketService, localWriter storag
 		durableQueueManager: internal.NewDurableQueueManager(
 			log,
 			filepath.Join(enginePath, "replicationq"),
+			internal.WriteFunc,
 		),
 	}
 }