feat: advance queue scanner periodically instead of every remote write (#22981)
parent
e3ff434f81
commit
a7a5233432
|
@ -7,6 +7,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||||
"github.com/influxdata/influxdb/v2/pkg/durablequeue"
|
"github.com/influxdata/influxdb/v2/pkg/durablequeue"
|
||||||
|
@ -15,6 +16,10 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
scannerAdvanceInterval = 10 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
type remoteWriter interface {
|
type remoteWriter interface {
|
||||||
Write([]byte) error
|
Write([]byte) error
|
||||||
}
|
}
|
||||||
|
@ -157,6 +162,20 @@ func (rq *replicationQueue) SendWrite() bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
advanceScanner := func() error {
|
||||||
|
if _, err = scan.Advance(); err != nil {
|
||||||
|
if err != io.EOF {
|
||||||
|
rq.logger.Error("Error in replication queue scanner", zap.Error(err))
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
rq.metrics.Dequeue(rq.id, rq.queue.TotalBytes())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ticker := time.NewTicker(scannerAdvanceInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
for scan.Next() {
|
for scan.Next() {
|
||||||
if err := scan.Err(); err != nil {
|
if err := scan.Err(); err != nil {
|
||||||
if errors.Is(err, io.EOF) {
|
if errors.Is(err, io.EOF) {
|
||||||
|
@ -174,18 +193,20 @@ func (rq *replicationQueue) SendWrite() bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: As a potential future optimization, Advance() could be called only if a certain amount of time has passed
|
// Advance the scanner periodically to prevent extended runs of local writes without updating the underlying queue
|
||||||
// since an Advance(), a certain amount of data has been transferred, a certain number of writes, etc. to avoid an
|
// position.
|
||||||
// fsync after every remote write.
|
select {
|
||||||
if _, err = scan.Advance(); err != nil {
|
case <-ticker.C:
|
||||||
if err != io.EOF {
|
if err := advanceScanner(); err != nil {
|
||||||
rq.logger.Error("Error in replication queue scanner", zap.Error(err))
|
return false
|
||||||
}
|
}
|
||||||
return false
|
default:
|
||||||
}
|
}
|
||||||
rq.metrics.Dequeue(rq.id, rq.queue.TotalBytes())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := advanceScanner(); err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue