fix: advance replications queue after successful remote writes (#22967)

* fix: advance replications queue after successful remote writes to prevent data duplication on errors

* fix: loop on sendwrite

* chore: remove flaky test

* chore: add TODO about future optimization
pull/22981/head
William Baker 2021-12-08 12:52:46 -06:00 committed by GitHub
parent 39eeb3e456
commit e5cbd279ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 19 additions and 24 deletions

View File

@ -151,46 +151,41 @@ func (rq *replicationQueue) SendWrite() bool {
// the Scanner object that we don't know how to handle.
scan, err := rq.queue.NewScanner()
if err != nil {
if err != io.EOF {
if !errors.Is(err, io.EOF) {
rq.logger.Error("Error creating replications queue scanner", zap.Error(err))
}
return false
}
for scan.Next() {
// An io.EOF error here indicates that there is no more data
// left to process, and is an expected error.
if scan.Err() == io.EOF {
break
}
// Any other here indicates a problem, so we log the error and
// drop the data with a call to scan.Advance() later.
if scan.Err() != nil {
if err := scan.Err(); err != nil {
if errors.Is(err, io.EOF) {
// An io.EOF error here indicates that there is no more data left to process, and is an expected error.
return false
}
// Any other error here indicates a problem reading the data from the queue, so we log the error and drop the data
// with a call to scan.Advance() later.
rq.logger.Info("Segment read error.", zap.Error(scan.Err()))
// TODO: Add metrics collection for dropped data here.
break
}
// An error here indicates an unhandlable error. Data is not corrupt, and
// the remote write is not retryable.
if err = rq.remoteWriter.Write(scan.Bytes()); err != nil {
// An error here indicates an unhandleable remote write error. The scanner will not be advanced.
rq.logger.Error("Error in replication stream", zap.Error(err))
return false
}
}
// Update metrics after the call to scan.Advance()
defer func() {
rq.metrics.Dequeue(rq.id, rq.queue.TotalBytes())
}()
if _, err = scan.Advance(); err != nil {
if err != io.EOF {
rq.logger.Error("Error in replication queue scanner", zap.Error(err))
// TODO: As a potential future optimization, Advance() could be called only if a certain amount of time has passed
// since an Advance(), a certain amount of data has been transferred, a certain number of writes, etc. to avoid an
// fsync after every remote write.
if _, err = scan.Advance(); err != nil {
if err != io.EOF {
rq.logger.Error("Error in replication queue scanner", zap.Error(err))
}
return false
}
return false
rq.metrics.Dequeue(rq.id, rq.queue.TotalBytes())
}
return true
}