influxdb/pkg/durablequeue/scanner.go

package durablequeue

import (
	"fmt"
	"io"
)

// Scanner is the interface for reading blocks from a durable queue segment.
type Scanner interface {
	// Next advances the scanner to the next block, returning true if a block
	// was read and false once the segment is exhausted or an error occurs.
	Next() bool

	// Err returns any error, other than io.EOF, encountered during a call to Next.
	Err() error

	// Bytes returns the most recent block generated by a call to Next. A new buffer
	// is allocated on each call to Next, so the buffer may be retained by the caller.
	Bytes() []byte

	// Advance moves the head pointer to the next byte slice in the queue.
	// Advance is guaranteed to make forward progress and is idempotent.
	Advance() (int64, error)
}
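
// A typical consumption loop over a Scanner; a minimal sketch, assuming a
// hypothetical newScanner constructor on Queue and a caller-supplied handle
// function (neither is defined in this file):
//
//	scan, err := q.newScanner() // hypothetical; returns a Scanner
//	if err != nil {
//		return err // e.g. io.EOF when there is nothing to scan
//	}
//	for scan.Next() {
//		if err := handle(scan.Bytes()); err != nil {
//			return err
//		}
//	}
//	if err := scan.Err(); err != nil {
//		return err
//	}
//	// Mark the consumed blocks as processed; idempotent, so safe to retry.
//	_, err = scan.Advance()
//	return err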

// queueScanner adapts a segmentScanner to its owning Queue, trimming
// exhausted or corrupt head segments when Advance is called.
type queueScanner struct {
	q  *Queue
	ss *segmentScanner
}

func (qs *queueScanner) Next() bool {
	return qs.ss.Next()
}

func (qs *queueScanner) Err() error {
	return qs.ss.Err()
}

func (qs *queueScanner) Bytes() []byte {
	return qs.ss.Bytes()
}

func (qs *queueScanner) Advance() (n int64, err error) {
	n, err = qs.ss.Advance()
	// Any error from the segment scanner means the current segment is finished
	// with, either successfully (io.EOF) or through corruption of some kind,
	// so always move on to the next segment.
	if err != nil {
		qs.q.mu.Lock()
		defer qs.q.mu.Unlock()

		// Retry under the lock; otherwise it is possible a write happened
		// between getting the EOF and taking the queue lock.
		if err == io.EOF {
			n, err = qs.ss.Advance()
			if err == nil {
				return n, nil
			}
		}

		// If the error was not EOF, force the segment to be trimmed.
		force := err != io.EOF
		if trimErr := qs.q.trimHead(force); trimErr != nil {
			return 0, trimErr
		}
		if err != io.EOF {
			// We are dropping writes due to this error, so report it.
			return n, fmt.Errorf("dropped bad disk queue segment: %w", err)
		}
	}
	return n, nil
}
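
// Because the dropped-segment error wraps the underlying failure with %w,
// a caller can still inspect the cause via the standard errors package.
// A minimal sketch (the sentinel compared against is shown purely for
// illustration, not because this file guarantees it):
//
//	if _, err := scan.Advance(); err != nil {
//		if errors.Is(err, io.ErrUnexpectedEOF) { // sentinel for illustration only
//			log.Printf("dropped corrupt segment: %v", err)
//		}
//	}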

// segmentScanner implements Scanner over a single on-disk segment.
type segmentScanner struct {
	s   *segment
	pos int64  // byte offset of the scanner within the segment
	n   int64  // number of records scanned so far
	buf []byte // most recent block, reallocated on every successful Next
	err error  // sticky error; once set, Next always returns false
	eof bool   // true once the scanner has consumed the entire segment

	// TODO(SGC): consider adding backing buffer once we send writes to remote node as single array
}

var _ Scanner = (*segmentScanner)(nil)

func (l *segment) newScanner() (*segmentScanner, error) {
	l.mu.Lock()
	defer l.mu.Unlock()

	// If we're at the end of the file, we can't advance.
	if int64(l.pos) == l.size-footerSize {
		return nil, io.EOF
	}
	if err := l.seekToCurrent(); err != nil {
		return nil, err
	}
	return &segmentScanner{s: l, pos: l.pos}, nil
}
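
// Record framing, as the scanner below assumes it: each record is an 8-byte
// length prefix followed by the payload; a zero-length record is skipped
// (presumably padding or a tombstone); and the scannable region ends
// footerSize bytes before the end of the segment file.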

func (ss *segmentScanner) Next() bool {
	ss.s.mu.Lock()
	defer ss.s.mu.Unlock()

	if ss.eof || ss.err != nil {
		return false
	}
	if err := ss.s.seek(ss.pos); err != nil {
		ss.setErr(err)
		return false
	}

	for {
		if int64(ss.pos) == ss.s.size-footerSize {
			ss.eof = true
			return false
		}

		ss.n++

		// Read the record size.
		sz, err := ss.s.readUint64()
		if err == io.EOF {
			return false
		} else if err != nil {
			ss.setErr(err)
			return false
		}

		ss.pos += 8 + int64(sz)
		if sz == 0 {
			continue
		}
		if sz > uint64(ss.s.maxSize) {
			ss.setErr(fmt.Errorf("record size out of range: max %d: got %d", ss.s.maxSize, sz))
			return false
		}

		// The node processor will hold a reference to ss.buf via the Bytes method,
		// so it's important to create a new slice here,
		// even though it looks like we could reslice ss.buf.
		ss.buf = make([]byte, sz)
		if err := ss.s.readBytes(ss.buf); err != nil {
			ss.setErr(err)
			return false
		}
		return true
	}
}
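
// For example, a record whose length prefix starts at offset P and whose
// payload is 100 bytes occupies the byte range [P, P+108): 8 bytes of prefix
// plus 100 bytes of payload, after which ss.pos is P+108.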

func (ss *segmentScanner) setErr(err error) {
	ss.err = err
	ss.buf = nil
}

func (ss *segmentScanner) Err() error {
	return ss.err
}

func (ss *segmentScanner) Bytes() []byte {
	return ss.buf
}

func (ss *segmentScanner) Advance() (int64, error) {
	if ss.err != nil {
		return ss.n, ss.err
	}
	return ss.n, ss.s.advanceTo(ss.pos)
}