172 lines
3.5 KiB
Go
172 lines
3.5 KiB
Go
package durablequeue
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
)
|
|
|
|
type Scanner interface {
|
|
// Next returns the current block and advances the scanner to the next block.
|
|
Next() bool
|
|
|
|
// Err returns any non io.EOF error as a result of calling the Next function.
|
|
Err() error
|
|
|
|
// Bytes returns the most recent block generated by a call to Next. A new buffer
|
|
// is generated with each call to Next, so the buffer may be retained by the caller.
|
|
Bytes() []byte
|
|
|
|
// Advance moves the head pointer to the next byte slice in the queue.
|
|
// Advance is guaranteed to make forward progress and is idempotent.
|
|
Advance() (int64, error)
|
|
}
|
|
|
|
type queueScanner struct {
|
|
q *Queue
|
|
ss *segmentScanner
|
|
}
|
|
|
|
func (qs *queueScanner) Next() bool {
|
|
return qs.ss.Next()
|
|
}
|
|
|
|
func (qs *queueScanner) Err() error {
|
|
return qs.ss.Err()
|
|
}
|
|
|
|
func (qs *queueScanner) Bytes() []byte {
|
|
return qs.ss.Bytes()
|
|
}
|
|
|
|
func (qs *queueScanner) Advance() (n int64, err error) {
|
|
n, err = qs.ss.Advance()
|
|
// always advance to the next segment if the current segment presents any error
|
|
// condition, which either indicates success (io.EOF) or corruption of some kind.
|
|
if err != nil {
|
|
qs.q.mu.Lock()
|
|
defer qs.q.mu.Unlock()
|
|
|
|
// retry under lock - otherwise it is possible a write happened between getting the EOF
|
|
// and taking the queue lock.
|
|
if err == io.EOF {
|
|
n, err = qs.ss.Advance()
|
|
if err == nil {
|
|
return n, nil
|
|
}
|
|
}
|
|
|
|
// If the error was not EOF, force the segment to be trimmed
|
|
force := err != io.EOF
|
|
if trimErr := qs.q.trimHead(force); trimErr != nil {
|
|
return 0, trimErr
|
|
}
|
|
if err != io.EOF {
|
|
// We are dropping writes due to this error, so we should report it
|
|
return n, fmt.Errorf("dropped bad disk queue segment: %w", err)
|
|
}
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
type segmentScanner struct {
|
|
s *segment
|
|
pos int64
|
|
n int64
|
|
buf []byte
|
|
err error
|
|
eof bool
|
|
|
|
//TODO(SGC): consider adding backing buffer once we send writes to remote node as single array
|
|
}
|
|
|
|
var _ Scanner = (*segmentScanner)(nil)
|
|
|
|
func (l *segment) newScanner() (*segmentScanner, error) {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
// If we're at the end of the file, can't advance
|
|
if int64(l.pos) == l.size-footerSize {
|
|
return nil, io.EOF
|
|
}
|
|
|
|
if err := l.seekToCurrent(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &segmentScanner{s: l, pos: l.pos}, nil
|
|
}
|
|
|
|
func (ss *segmentScanner) Next() bool {
|
|
ss.s.mu.Lock()
|
|
defer ss.s.mu.Unlock()
|
|
|
|
if ss.eof || ss.err != nil {
|
|
return false
|
|
}
|
|
|
|
if err := ss.s.seek(ss.pos); err != nil {
|
|
ss.setErr(err)
|
|
return false
|
|
}
|
|
|
|
for {
|
|
if int64(ss.pos) == ss.s.size-footerSize {
|
|
ss.eof = true
|
|
return false
|
|
}
|
|
|
|
ss.n++
|
|
// read the record size
|
|
sz, err := ss.s.readUint64()
|
|
if err == io.EOF {
|
|
return false
|
|
} else if err != nil {
|
|
ss.setErr(err)
|
|
return false
|
|
}
|
|
|
|
ss.pos += 8 + int64(sz)
|
|
if sz == 0 {
|
|
continue
|
|
}
|
|
|
|
if sz > uint64(ss.s.maxSize) {
|
|
ss.setErr(fmt.Errorf("record size out of range: max %d: got %d", ss.s.maxSize, sz))
|
|
return false
|
|
}
|
|
|
|
// The node processor will hold a reference to ss.buf via the Bytes method,
|
|
// so it's important to create a new slice here,
|
|
// even though it looks like we could reslice ss.buf.
|
|
ss.buf = make([]byte, sz)
|
|
|
|
if err := ss.s.readBytes(ss.buf); err != nil {
|
|
ss.setErr(err)
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
}
|
|
|
|
func (ss *segmentScanner) setErr(err error) {
|
|
ss.err = err
|
|
ss.buf = nil
|
|
}
|
|
|
|
func (ss *segmentScanner) Err() error {
|
|
return ss.err
|
|
}
|
|
|
|
func (ss *segmentScanner) Bytes() []byte {
|
|
return ss.buf
|
|
}
|
|
|
|
func (ss *segmentScanner) Advance() (int64, error) {
|
|
if ss.err != nil {
|
|
return ss.n, ss.err
|
|
}
|
|
return ss.n, ss.s.advanceTo(ss.pos)
|
|
}
|