diff --git a/replications/internal/queue_management.go b/replications/internal/queue_management.go index dc68173828..55e20a27be 100644 --- a/replications/internal/queue_management.go +++ b/replications/internal/queue_management.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "sync" + "time" "github.com/influxdata/influxdb/v2/kit/platform" "github.com/influxdata/influxdb/v2/pkg/durablequeue" @@ -15,6 +16,10 @@ import ( "go.uber.org/zap" ) +const ( + scannerAdvanceInterval = 10 * time.Second +) + type remoteWriter interface { Write([]byte) error } @@ -157,6 +162,20 @@ func (rq *replicationQueue) SendWrite() bool { return false } + advanceScanner := func() error { + if _, err = scan.Advance(); err != nil { + if err != io.EOF { + rq.logger.Error("Error in replication queue scanner", zap.Error(err)) + } + return err + } + rq.metrics.Dequeue(rq.id, rq.queue.TotalBytes()) + return nil + } + + ticker := time.NewTicker(scannerAdvanceInterval) + defer ticker.Stop() + for scan.Next() { if err := scan.Err(); err != nil { if errors.Is(err, io.EOF) { @@ -174,18 +193,20 @@ func (rq *replicationQueue) SendWrite() bool { return false } - // TODO: As a potential future optimization, Advance() could be called only if a certain amount of time has passed - // since an Advance(), a certain amount of data has been transferred, a certain number of writes, etc. to avoid an - // fsync after every remote write. - if _, err = scan.Advance(); err != nil { - if err != io.EOF { - rq.logger.Error("Error in replication queue scanner", zap.Error(err)) + // Advance the scanner periodically to prevent extended runs of local writes without updating the underlying queue + // position. + select { + case <-ticker.C: + if err := advanceScanner(); err != nil { + return false } - return false + default: } - rq.metrics.Dequeue(rq.id, rq.queue.TotalBytes()) } + if err := advanceScanner(); err != nil { + return false + } return true }