519 lines
16 KiB
Go
519 lines
16 KiB
Go
package internal
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/fs"
|
|
"math"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/influxdata/influxdb/v2"
|
|
"github.com/influxdata/influxdb/v2/kit/platform"
|
|
"github.com/influxdata/influxdb/v2/pkg/durablequeue"
|
|
"github.com/influxdata/influxdb/v2/replications/metrics"
|
|
"github.com/influxdata/influxdb/v2/replications/remotewrite"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Timing and retention defaults for replication queue maintenance.
const (
	// scannerAdvanceInterval is how often SendWrite commits scanner progress
	// back to the underlying queue during an extended run of writes.
	scannerAdvanceInterval = 10 * time.Second
	// purgeInterval is how often run() checks for and purges data older than maxAge.
	purgeInterval = 60 * time.Second
	// defaultMaxAge is the retention used when a replication's configured max
	// age is negative (treated as unset in newReplicationQueue).
	defaultMaxAge = 7 * 24 * time.Hour // 1 week
)
|
|
|
|
// remoteWriter sends a batch of queued bytes to a remote endpoint.
// attempt is the count of consecutive failed writes so far (see SendWrite);
// on failure the returned duration is how long to wait before retrying.
type remoteWriter interface {
	Write(data []byte, attempt int) (time.Duration, error)
}
|
|
|
|
// replicationQueue couples a durable on-disk queue with the goroutine that
// drains it to a remote endpoint for a single replication stream.
type replicationQueue struct {
	id            platform.ID
	orgID         platform.ID
	localBucketID platform.ID
	queue         *durablequeue.Queue
	wg            sync.WaitGroup // tracks the run() goroutine so Close can wait for it
	done          chan struct{}  // closed by Close to stop run()
	receive       chan struct{}  // buffered (size 1) signal that data was enqueued; see run() for rationale
	logger        *zap.Logger
	metrics       *metrics.ReplicationsMetrics
	remoteWriter  remoteWriter
	failedWrites  int           // consecutive failed remote writes; reset to zero on success
	maxAge        time.Duration // entries older than this are purged by run(); 0 disables purging
}
|
|
|
|
// durableQueueManager tracks and manages the durable queues for all
// replication streams, keyed by replication ID.
type durableQueueManager struct {
	replicationQueues map[platform.ID]*replicationQueue
	logger            *zap.Logger
	queuePath         string       // root directory; each queue lives in a subdirectory named by replication ID
	mutex             sync.RWMutex // guards replicationQueues
	metrics           *metrics.ReplicationsMetrics
	configStore       remotewrite.HttpConfigStore
}
|
|
|
|
// errStartup is returned by StartReplicationQueues when one or more queues
// failed to start; per-queue details are logged rather than wrapped.
var errStartup = errors.New("startup tasks for replications durable queue management failed, see server logs for details")

// errShutdown is returned by CloseAll when one or more queues failed to close;
// per-queue details are logged rather than wrapped.
var errShutdown = errors.New("shutdown tasks for replications durable queues failed, see server logs for details")
|
|
|
|
// NewDurableQueueManager creates a new durableQueueManager struct, for managing durable queues associated with
|
|
// replication streams.
|
|
func NewDurableQueueManager(log *zap.Logger, queuePath string, metrics *metrics.ReplicationsMetrics, configStore remotewrite.HttpConfigStore) *durableQueueManager {
|
|
replicationQueues := make(map[platform.ID]*replicationQueue)
|
|
|
|
os.MkdirAll(queuePath, 0777)
|
|
|
|
return &durableQueueManager{
|
|
replicationQueues: replicationQueues,
|
|
logger: log,
|
|
queuePath: queuePath,
|
|
metrics: metrics,
|
|
configStore: configStore,
|
|
}
|
|
}
|
|
|
|
// InitializeQueue creates and opens a new durable queue which is associated with a replication stream.
|
|
func (qm *durableQueueManager) InitializeQueue(replicationID platform.ID, maxQueueSizeBytes int64, orgID platform.ID, localBucketID platform.ID, maxAgeSeconds int64) error {
|
|
qm.mutex.Lock()
|
|
defer qm.mutex.Unlock()
|
|
|
|
// Check for duplicate replication ID
|
|
if _, exists := qm.replicationQueues[replicationID]; exists {
|
|
return fmt.Errorf("durable queue already exists for replication ID %q", replicationID)
|
|
}
|
|
|
|
// Set up path for new queue on disk
|
|
dir := filepath.Join(
|
|
qm.queuePath,
|
|
replicationID.String(),
|
|
)
|
|
if err := os.MkdirAll(dir, 0777); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Create a new durable queue
|
|
newQueue, err := durablequeue.NewQueue(
|
|
dir,
|
|
maxQueueSizeBytes,
|
|
durablequeue.DefaultSegmentSize,
|
|
&durablequeue.SharedCount{},
|
|
durablequeue.MaxWritesPending,
|
|
func(bytes []byte) error {
|
|
return nil
|
|
},
|
|
)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Open the new queue
|
|
if err := newQueue.Open(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Map new durable queue and scanner to its corresponding replication stream via replication ID
|
|
rq := qm.newReplicationQueue(replicationID, orgID, localBucketID, newQueue, maxAgeSeconds)
|
|
qm.replicationQueues[replicationID] = rq
|
|
rq.Open()
|
|
|
|
qm.logger.Debug("Created new durable queue for replication stream",
|
|
zap.String("id", replicationID.String()), zap.String("path", dir))
|
|
|
|
return nil
|
|
}
|
|
|
|
// Open starts the queue's processing goroutine. The goroutine is tracked by
// rq.wg so that Close can wait for it to finish.
func (rq *replicationQueue) Open() {
	rq.wg.Add(1)
	go rq.run()
}
|
|
|
|
// Close stops the queue's run() goroutine, waits for it to exit, and then
// closes the underlying durable queue.
func (rq *replicationQueue) Close() error {
	// NOTE(review): closing receive before done means a concurrent EnqueueData
	// send on receive would panic; callers appear to rely on external
	// coordination here — confirm before reordering.
	close(rq.receive)
	close(rq.done)
	rq.wg.Wait() // wait for goroutine to finish processing all messages
	return rq.queue.Close()
}
|
|
|
|
func (rq *replicationQueue) run() {
|
|
defer rq.wg.Done()
|
|
retry := time.NewTimer(math.MaxInt64)
|
|
purgeTicker := time.NewTicker(purgeInterval)
|
|
|
|
sendWrite := func() time.Duration {
|
|
for {
|
|
waitForRetry, shouldRetry := rq.SendWrite()
|
|
|
|
if !shouldRetry {
|
|
return math.MaxInt64
|
|
}
|
|
|
|
// immediately retry if the wait time is zero
|
|
if waitForRetry == 0 {
|
|
continue
|
|
}
|
|
return waitForRetry
|
|
}
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-rq.done: // end the goroutine when done is messaged
|
|
return
|
|
case <-rq.receive: // run the scanner on data append
|
|
// Receive channel has a buffer to prevent a potential race condition where rq.SendWrite has reached EOF and will
|
|
// return false, but data is queued after evaluating the scanner and before the loop is ready to select on the
|
|
// receive channel again. This would result in data remaining unprocessed in the queue until the next send on the
|
|
// receive channel since the send to the receive channel in qm.EnqueueData is non-blocking. The buffer ensures
|
|
// that rq.SendWrite will be called again in this situation and not leave data in the queue. Outside of this
|
|
// specific scenario, the buffer might result in an extra call to rq.SendWrite that will immediately return on
|
|
// EOF.
|
|
retryTime := sendWrite()
|
|
if !retry.Stop() {
|
|
<-retry.C
|
|
}
|
|
retry.Reset(retryTime)
|
|
case <-retry.C:
|
|
retryTime := sendWrite()
|
|
retry.Reset(retryTime)
|
|
case <-purgeTicker.C:
|
|
if rq.maxAge != 0 {
|
|
rq.queue.PurgeOlderThan(time.Now().Add(-rq.maxAge))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// SendWrite processes data enqueued into the durablequeue.Queue.
|
|
// SendWrite is responsible for processing all data in the queue at the time of calling.
|
|
func (rq *replicationQueue) SendWrite() (waitForRetry time.Duration, shouldRetry bool) {
|
|
// Any error in creating the scanner should exit the loop in run()
|
|
// Either it is io.EOF indicating no data, or some other failure in making
|
|
// the Scanner object that we don't know how to handle.
|
|
scan, err := rq.queue.NewScanner()
|
|
if err != nil {
|
|
if !errors.Is(err, io.EOF) {
|
|
rq.logger.Error("Error creating replications queue scanner", zap.Error(err))
|
|
}
|
|
return 0, false
|
|
}
|
|
|
|
advanceScanner := func() error {
|
|
if _, err = scan.Advance(); err != nil {
|
|
if err != io.EOF {
|
|
rq.logger.Error("Error in replication queue scanner", zap.Error(err))
|
|
}
|
|
return err
|
|
}
|
|
rq.metrics.Dequeue(rq.id, rq.queue.TotalBytes())
|
|
return nil
|
|
}
|
|
|
|
ticker := time.NewTicker(scannerAdvanceInterval)
|
|
defer ticker.Stop()
|
|
|
|
for scan.Next() {
|
|
if err := scan.Err(); err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
// An io.EOF error here indicates that there is no more data left to process, and is an expected error.
|
|
return 0, false
|
|
}
|
|
// Any other error here indicates a problem reading the data from the queue, so we log the error and drop the data
|
|
// with a call to scan.Advance() later.
|
|
rq.logger.Info("Segment read error.", zap.Error(scan.Err()))
|
|
}
|
|
|
|
if waitForRetry, err := rq.remoteWriter.Write(scan.Bytes(), rq.failedWrites); err != nil {
|
|
rq.failedWrites++
|
|
// We failed the remote write. Do not advance the scanner
|
|
rq.logger.Error("Error in replication stream", zap.Error(err), zap.Int("retries", rq.failedWrites))
|
|
return waitForRetry, true
|
|
}
|
|
|
|
// a successful write resets the number of failed write attempts to zero
|
|
rq.failedWrites = 0
|
|
|
|
// Advance the scanner periodically to prevent extended runs of local writes without updating the underlying queue
|
|
// position.
|
|
select {
|
|
case <-ticker.C:
|
|
if err := advanceScanner(); err != nil {
|
|
return 0, false
|
|
}
|
|
default:
|
|
}
|
|
}
|
|
|
|
if err := advanceScanner(); err != nil {
|
|
return 0, false
|
|
}
|
|
return 0, true
|
|
}
|
|
|
|
// DeleteQueue deletes a durable queue and its associated data on disk.
|
|
func (qm *durableQueueManager) DeleteQueue(replicationID platform.ID) error {
|
|
qm.mutex.Lock()
|
|
defer qm.mutex.Unlock()
|
|
|
|
if _, exist := qm.replicationQueues[replicationID]; !exist {
|
|
return fmt.Errorf("durable queue not found for replication ID %q", replicationID)
|
|
}
|
|
|
|
rq := qm.replicationQueues[replicationID]
|
|
|
|
// Close the queue
|
|
if err := rq.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
qm.logger.Debug("Closed replication stream durable queue",
|
|
zap.String("id", replicationID.String()), zap.String("path", rq.queue.Dir()))
|
|
|
|
// Delete any enqueued, un-flushed data on disk for this queue
|
|
if err := rq.queue.Remove(); err != nil {
|
|
return err
|
|
}
|
|
|
|
qm.logger.Debug("Deleted data associated with replication stream durable queue",
|
|
zap.String("id", replicationID.String()), zap.String("path", rq.queue.Dir()))
|
|
|
|
// Remove entry from replicationQueues map
|
|
delete(qm.replicationQueues, replicationID)
|
|
|
|
return nil
|
|
}
|
|
|
|
// UpdateMaxQueueSize updates the maximum size of a durable queue.
|
|
func (qm *durableQueueManager) UpdateMaxQueueSize(replicationID platform.ID, maxQueueSizeBytes int64) error {
|
|
qm.mutex.RLock()
|
|
defer qm.mutex.RUnlock()
|
|
|
|
if _, exist := qm.replicationQueues[replicationID]; !exist {
|
|
return fmt.Errorf("durable queue not found for replication ID %q", replicationID)
|
|
}
|
|
|
|
if err := qm.replicationQueues[replicationID].queue.SetMaxSize(maxQueueSizeBytes); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// CurrentQueueSizes returns the current size-on-disk for the requested set of durable queues.
|
|
func (qm *durableQueueManager) CurrentQueueSizes(ids []platform.ID) (map[platform.ID]int64, error) {
|
|
qm.mutex.RLock()
|
|
defer qm.mutex.RUnlock()
|
|
|
|
sizes := make(map[platform.ID]int64, len(ids))
|
|
|
|
for _, id := range ids {
|
|
if _, exist := qm.replicationQueues[id]; !exist {
|
|
return nil, fmt.Errorf("durable queue not found for replication ID %q", id)
|
|
}
|
|
sizes[id] = qm.replicationQueues[id].queue.DiskUsage()
|
|
}
|
|
|
|
return sizes, nil
|
|
}
|
|
|
|
// Returns the remaining number of bytes in Queue to be read:
|
|
func (qm *durableQueueManager) RemainingQueueSizes(ids []platform.ID) (map[platform.ID]int64, error) {
|
|
qm.mutex.RLock()
|
|
defer qm.mutex.RUnlock()
|
|
|
|
sizes := make(map[platform.ID]int64, len(ids))
|
|
|
|
for _, id := range ids {
|
|
if _, exist := qm.replicationQueues[id]; !exist {
|
|
return nil, fmt.Errorf("durable queue not found for replication ID %q", id)
|
|
}
|
|
sizes[id] = qm.replicationQueues[id].queue.TotalBytes()
|
|
}
|
|
|
|
return sizes, nil
|
|
}
|
|
|
|
// StartReplicationQueues updates the durableQueueManager.replicationQueues map, fully removing any partially deleted
|
|
// queues (present on disk, but not tracked in sqlite), opening all current queues, and logging info for each.
|
|
func (qm *durableQueueManager) StartReplicationQueues(trackedReplications map[platform.ID]*influxdb.TrackedReplication) error {
|
|
errOccurred := false
|
|
|
|
for id, repl := range trackedReplications {
|
|
// Re-initialize a queue struct for each replication stream from sqlite
|
|
queue, err := durablequeue.NewQueue(
|
|
filepath.Join(qm.queuePath, id.String()),
|
|
repl.MaxQueueSizeBytes,
|
|
durablequeue.DefaultSegmentSize,
|
|
&durablequeue.SharedCount{},
|
|
durablequeue.MaxWritesPending,
|
|
func(bytes []byte) error {
|
|
return nil
|
|
},
|
|
)
|
|
|
|
if err != nil {
|
|
qm.logger.Error("failed to initialize replication stream durable queue", zap.Error(err))
|
|
errOccurred = true
|
|
continue
|
|
}
|
|
|
|
// Open and map the queue struct to its replication ID
|
|
if err := queue.Open(); err != nil {
|
|
// This could have errored after a backup/restore (we do not persist the replicationq).
|
|
// Check if the dir exists, create if it doesn't, then open and carry on
|
|
if pErr, ok := err.(*fs.PathError); ok {
|
|
path := pErr.Path
|
|
if _, err := os.Stat(path); err != nil && os.IsNotExist(err) {
|
|
if err := os.MkdirAll(path, 0777); err != nil {
|
|
qm.logger.Error("error attempting to recreate missing replication queue", zap.Error(err), zap.String("id", id.String()), zap.String("path", path))
|
|
errOccurred = true
|
|
continue
|
|
}
|
|
|
|
if err := queue.Open(); err != nil {
|
|
qm.logger.Error("error attempting to open replication queue", zap.Error(err), zap.String("id", id.String()), zap.String("path", path))
|
|
errOccurred = true
|
|
continue
|
|
}
|
|
|
|
qm.replicationQueues[id] = qm.newReplicationQueue(id, repl.OrgID, repl.LocalBucketID, queue, repl.MaxAgeSeconds)
|
|
qm.replicationQueues[id].Open()
|
|
qm.logger.Info("Opened replication stream", zap.String("id", id.String()), zap.String("path", queue.Dir()))
|
|
}
|
|
} else {
|
|
qm.logger.Error("failed to open replication stream durable queue", zap.Error(err), zap.String("id", id.String()), zap.String("path", queue.Dir()))
|
|
errOccurred = true
|
|
}
|
|
} else {
|
|
qm.replicationQueues[id] = qm.newReplicationQueue(id, repl.OrgID, repl.LocalBucketID, queue, repl.MaxAgeSeconds)
|
|
qm.replicationQueues[id].Open()
|
|
qm.logger.Info("Opened replication stream", zap.String("id", id.String()), zap.String("path", queue.Dir()))
|
|
}
|
|
}
|
|
|
|
if errOccurred {
|
|
return errStartup
|
|
}
|
|
|
|
// Get contents of replicationq directory
|
|
entries, err := os.ReadDir(qm.queuePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, entry := range entries {
|
|
// Skip over non-relevant entries (must be a dir named with a replication ID)
|
|
if !entry.IsDir() {
|
|
continue
|
|
}
|
|
|
|
id, err := platform.IDFromString(entry.Name())
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
// Partial delete found, needs to be fully removed
|
|
if _, exist := qm.replicationQueues[*id]; !exist {
|
|
if err := os.RemoveAll(filepath.Join(qm.queuePath, id.String())); err != nil {
|
|
qm.logger.Error("failed to remove durable queue during partial delete cleanup", zap.Error(err), zap.String("id", id.String()))
|
|
errOccurred = true
|
|
}
|
|
}
|
|
}
|
|
|
|
if errOccurred {
|
|
return errStartup
|
|
} else {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// CloseAll loops through all current replication stream queues and closes them without deleting on-disk resources
|
|
func (qm *durableQueueManager) CloseAll() error {
|
|
errOccurred := false
|
|
|
|
for id, replicationQueue := range qm.replicationQueues {
|
|
if err := replicationQueue.Close(); err != nil {
|
|
qm.logger.Error("failed to close durable queue", zap.Error(err), zap.String("id", id.String()))
|
|
errOccurred = true
|
|
}
|
|
}
|
|
|
|
if errOccurred {
|
|
return errShutdown
|
|
} else {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// EnqueueData persists a set of bytes to a replication's durable queue.
|
|
func (qm *durableQueueManager) EnqueueData(replicationID platform.ID, data []byte, numPoints int) (err error) {
|
|
qm.mutex.RLock()
|
|
defer qm.mutex.RUnlock()
|
|
|
|
// Update metrics if data fails to be added to queue
|
|
defer func() {
|
|
if err != nil {
|
|
qm.metrics.EnqueueError(replicationID, len(data), numPoints)
|
|
}
|
|
}()
|
|
|
|
rq, ok := qm.replicationQueues[replicationID]
|
|
if !ok {
|
|
return fmt.Errorf("durable queue not found for replication ID %q", replicationID)
|
|
}
|
|
|
|
if err := rq.queue.Append(data); err != nil {
|
|
return err
|
|
}
|
|
// Update metrics for this replication queue when adding data to the queue.
|
|
qm.metrics.EnqueueData(replicationID, len(data), numPoints, rq.queue.TotalBytes())
|
|
|
|
// Send to the replication receive channel if it is not full to activate the queue processing. If the receive channel
|
|
// is full, don't block further writes and return.
|
|
select {
|
|
case qm.replicationQueues[replicationID].receive <- struct{}{}:
|
|
default:
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (qm *durableQueueManager) newReplicationQueue(id platform.ID, orgID platform.ID, localBucketID platform.ID, queue *durablequeue.Queue, maxAgeSeconds int64) *replicationQueue {
|
|
logger := qm.logger.With(zap.String("replication_id", id.String()))
|
|
done := make(chan struct{})
|
|
// check for max age minimum
|
|
var maxAgeTime time.Duration
|
|
if maxAgeSeconds < 0 {
|
|
maxAgeTime = defaultMaxAge
|
|
} else {
|
|
maxAgeTime = time.Duration(maxAgeSeconds) * time.Second
|
|
}
|
|
|
|
return &replicationQueue{
|
|
id: id,
|
|
orgID: orgID,
|
|
localBucketID: localBucketID,
|
|
queue: queue,
|
|
done: done,
|
|
receive: make(chan struct{}, 1),
|
|
logger: logger,
|
|
metrics: qm.metrics,
|
|
remoteWriter: remotewrite.NewWriter(id, qm.configStore, qm.metrics, logger, done),
|
|
maxAge: maxAgeTime,
|
|
}
|
|
}
|
|
|
|
// GetReplications returns the ids of all currently registered replication streams matching the provided orgID
|
|
// and localBucketID
|
|
func (qm *durableQueueManager) GetReplications(orgID platform.ID, localBucketID platform.ID) []platform.ID {
|
|
replications := make([]platform.ID, 0, len(qm.replicationQueues))
|
|
|
|
for _, repl := range qm.replicationQueues {
|
|
if repl.orgID == orgID && repl.localBucketID == localBucketID {
|
|
replications = append(replications, repl.id)
|
|
}
|
|
}
|
|
return replications
|
|
}
|