package durablequeue
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Possible errors returned by a queue.
var (
	// ErrNotOpen is returned when operating on a queue or segment that is not open.
	ErrNotOpen = fmt.Errorf("queue not open")
	// ErrQueueFull is returned when an append would push the queue past its maximum size.
	ErrQueueFull = fmt.Errorf("queue is full")
	// ErrQueueBlocked is returned when too many appends are already in flight.
	ErrQueueBlocked = fmt.Errorf("queue is blocked")
	// ErrSegmentFull is returned by a segment that has reached its maximum size.
	ErrSegmentFull = fmt.Errorf("segment is full")
)
|
|
|
|
const (
	// DefaultSegmentSize is the default maximum size, in bytes, of a single
	// segment file (10 MiB).
	DefaultSegmentSize = 10 * 1024 * 1024
	// footerSize is the size, in bytes, of the footer at the end of every
	// segment file, which stores the current head offset.
	footerSize = 8
)
|
|
|
|
// MaxWritesPending is the number of writes that can be pending at any given time.
// (Not referenced within this file — presumably consumed by callers; confirm.)
const MaxWritesPending = 1024
|
|
|
|
// Queue is a bounded, disk-backed, append-only type that combines Queue and
|
|
// log semantics. byte slices can be appended and read back in-order.
|
|
// The Queue maintains a pointer to the current Head
|
|
// byte slice and can re-read from the Head until it has been advanced.
|
|
//
|
|
// Internally, the Queue writes byte slices to multiple segment files so
|
|
// that disk space can be reclaimed. When a segment file is larger than
|
|
// the max segment size, a new file is created. Segments are removed
|
|
// after their Head pointer has advanced past the last entry. The first
|
|
// segment is the head, and the last segment is the tail. Reads are from
|
|
// the head segment and writes tail segment.
|
|
//
|
|
// queues can have a max size configured such that when the size of all
|
|
// segments on disk exceeds the size, write will fail.
|
|
//
|
|
// ┌─────┐
|
|
// │Head │
|
|
// ├─────┘
|
|
// │
|
|
// ▼
|
|
// ┌─────────────────┐ ┌─────────────────┐┌─────────────────┐
|
|
// │Segment 1 - 10MB │ │Segment 2 - 10MB ││Segment 3 - 10MB │
|
|
// └─────────────────┘ └─────────────────┘└─────────────────┘
|
|
// ▲
|
|
// │
|
|
// │
|
|
// ┌─────┐
|
|
// │Tail │
|
|
// └─────┘
|
|
type Queue struct {
	// mu guards the mutable fields below.
	mu sync.RWMutex

	// Directory to create segments
	dir string

	// The head and tail segments. Reads are from the beginning of head,
	// writes are appended to the tail.
	head, tail *segment

	// The maximum size in bytes of a segment file before a new one should be created
	maxSegmentSize int64

	// The maximum size allowed in bytes of all segments before writes will return
	// an error
	maxSize int64
	// queueTotalSize tracks the total on-disk size; it is supplied by the
	// caller and accessed atomically (presumably shared across queues — confirm).
	queueTotalSize *SharedCount

	// The segments that exist on disk
	segments segments

	// verifyBlockFn is used to verify a block within a segment contains valid data.
	verifyBlockFn func([]byte) error

	// Channel used for throttling append requests.
	// Acts as a counting semaphore sized by the depth passed to NewQueue.
	appendCh chan struct{}

	// scratch is a temporary in-memory space for staging writes
	scratch bytes.Buffer

	logger *zap.Logger
}
|
|
|
|
// SharedCount manages an integer value, which can be read/written concurrently.
type SharedCount struct {
	// value is accessed only via sync/atomic operations.
	value int64
}

// Add adds delta to the counter value.
func (sc *SharedCount) Add(delta int64) {
	atomic.AddInt64(&sc.value, delta)
}

// Value returns the current counter value.
func (sc *SharedCount) Value() int64 {
	return atomic.LoadInt64(&sc.value)
}
|
|
|
|
// QueuePos describes the queue's current head and tail positions, each
// rendered by Position as "<segment path>:<offset>".
type QueuePos struct {
	Head string
	Tail string
}
|
|
|
|
// segments is a list of segment files, sortable by segment ID via sort.Interface.
type segments []*segment

func (a segments) Len() int { return len(a) }
func (a segments) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a segments) Less(i, j int) bool { return a[i].id < a[j].id }
|
|
|
|
// NewQueue create a Queue that will store segments in dir and that will consume no more than maxSize on disk.
//
// maxSegmentSize caps individual segment files; depth bounds the number of
// concurrent Append calls; verifyBlockFn is invoked on blocks read back from
// disk to detect corruption. maxSize must be at least twice maxSegmentSize.
func NewQueue(dir string, maxSize int64, maxSegmentSize int64, queueTotalSize *SharedCount, depth int, verifyBlockFn func([]byte) error) (*Queue, error) {
	if maxSize < 2*maxSegmentSize {
		return nil, fmt.Errorf("max queue size %d too small: must be at least twice the max segment size %d", maxSize, maxSegmentSize)
	}

	return &Queue{
		dir: dir,
		maxSegmentSize: maxSegmentSize,
		maxSize: maxSize,
		queueTotalSize: queueTotalSize,
		segments: segments{},
		appendCh: make(chan struct{}, depth),
		logger: zap.NewNop(),
		verifyBlockFn: verifyBlockFn,
	}, nil
}
|
|
|
|
// WithLogger sets the internal logger to the logger passed in.
// Not safe for concurrent use with an open queue; set before Open.
func (l *Queue) WithLogger(log *zap.Logger) {
	l.logger = log
}
|
|
|
|
// SetMaxSize updates the max queue size to the passed-in value.
|
|
//
|
|
// Max queue size must be at least twice the current max segment size, otherwise an error will be returned.
|
|
//
|
|
// If the new value is smaller than the amount of data currently in the queue,
|
|
// writes will be rejected until the queue drains to below the new maximum.
|
|
func (l *Queue) SetMaxSize(maxSize int64) error {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
if maxSize < 2*l.maxSegmentSize {
|
|
return fmt.Errorf("queue size %d too small: must be at least %d bytes", maxSize, 2*l.maxSegmentSize)
|
|
}
|
|
|
|
l.maxSize = maxSize
|
|
return nil
|
|
}
|
|
|
|
// Open opens the queue for reading and writing.
|
|
func (l *Queue) Open() error {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
segments, err := l.loadSegments()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
l.segments = segments
|
|
|
|
if len(l.segments) == 0 {
|
|
if err := l.addSegment(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
l.head = l.segments[0]
|
|
l.tail = l.segments[len(l.segments)-1]
|
|
|
|
// If the Head has been fully advanced and the segment size is modified,
|
|
// existing segments an get stuck and never allow clients to advance further.
|
|
// This advances the segment if the current Head is already at the end.
|
|
_, err = l.head.current()
|
|
if err == io.EOF {
|
|
return l.trimHead(false)
|
|
}
|
|
|
|
l.queueTotalSize.Add(l.DiskUsage())
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close stops the queue for reading and writing.
|
|
func (l *Queue) Close() error {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
for _, s := range l.segments {
|
|
if err := s.close(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
l.head = nil
|
|
l.tail = nil
|
|
l.segments = nil
|
|
return nil
|
|
}
|
|
|
|
// Remove removes all underlying file-based resources for the queue.
|
|
// It is an error to call this on an open queue.
|
|
func (l *Queue) Remove() error {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
if l.head != nil || l.tail != nil || l.segments != nil {
|
|
return fmt.Errorf("queue is open")
|
|
}
|
|
|
|
return os.RemoveAll(l.dir)
|
|
}
|
|
|
|
// RemoveSegments removes all segments for the queue.
|
|
// It is an error to call this on an open queue.
|
|
func (l *Queue) RemoveSegments() error {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
if l.head != nil || l.tail != nil || l.segments != nil {
|
|
return fmt.Errorf("queue is open")
|
|
}
|
|
|
|
files, err := os.ReadDir(l.dir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, segment := range files {
|
|
// Segments should be files. Skip anything that is a dir.
|
|
if segment.IsDir() {
|
|
continue
|
|
}
|
|
|
|
// Segments file names are all numeric
|
|
_, err := strconv.ParseUint(segment.Name(), 10, 64)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
path := filepath.Join(l.dir, segment.Name())
|
|
if err := os.Remove(path); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SetMaxSegmentSize updates the max segment size for new and existing (tail) segments.
|
|
//
|
|
// The new segment size must be less than half the current max queue size, otherwise an error will be returned.
|
|
func (l *Queue) SetMaxSegmentSize(size int64) error {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
if 2*size > l.maxSize {
|
|
return fmt.Errorf("segment size %d is too large: must be at most half of max queue size %d", size, l.maxSize)
|
|
}
|
|
|
|
l.maxSegmentSize = size
|
|
|
|
for _, s := range l.segments {
|
|
s.SetMaxSegmentSize(size)
|
|
}
|
|
|
|
if l.tail.diskUsage() >= l.maxSegmentSize {
|
|
if err := l.addSegment(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// PurgeOlderThan trims head segments whose last modification time is strictly
// before when (truncated to the second).
//
// It returns nil once the head segment is at least as new as the cutoff, or
// the first error encountered while inspecting or trimming segments.
func (l *Queue) PurgeOlderThan(when time.Time) error {
	l.mu.Lock()
	defer l.mu.Unlock()

	if len(l.segments) == 0 {
		return nil
	}

	cutoff := when.Truncate(time.Second)
	for {
		mod, err := l.head.lastModified()
		if err != nil {
			return err
		}

		// Stop once the head segment is as new as (or newer than) the cutoff.
		if mod.After(cutoff) || mod.Equal(cutoff) {
			return nil
		}

		// If this is the last segment, first append a new one allowing
		// trimming to proceed.
		if len(l.segments) == 1 {
			if err := l.addSegment(); err != nil {
				return err
			}
		}

		if err := l.trimHead(false); err != nil {
			return err
		}
	}
}
|
|
|
|
// LastModified returns the last time the queue was modified.
|
|
func (l *Queue) LastModified() (time.Time, error) {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
|
|
if l.tail != nil {
|
|
return l.tail.lastModified()
|
|
}
|
|
return time.Time{}.UTC(), nil
|
|
}
|
|
|
|
func (l *Queue) Position() (*QueuePos, error) {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
|
|
qp := &QueuePos{}
|
|
if l.head != nil {
|
|
qp.Head = fmt.Sprintf("%s:%d", l.head.path, l.head.pos)
|
|
}
|
|
if l.tail != nil {
|
|
qp.Tail = fmt.Sprintf("%s:%d", l.tail.path, l.tail.filePos())
|
|
}
|
|
return qp, nil
|
|
}
|
|
|
|
// Empty returns whether the queue's underlying segments are empty.
//
// NOTE(review): l.tail is dereferenced without a nil check, so calling Empty
// on a queue that has never been opened (or has been closed) will panic —
// confirm callers only invoke it on an open queue.
func (l *Queue) Empty() bool {
	l.mu.RLock()
	empty := l.tail.empty()
	l.mu.RUnlock()
	return empty
}
|
|
|
|
// TotalBytes returns the number of bytes of data remaining in the queue.
|
|
func (l *Queue) TotalBytes() int64 {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
var n int64
|
|
for _, s := range l.segments {
|
|
n += s.totalBytes()
|
|
}
|
|
return n
|
|
}
|
|
|
|
// Dir returns the directory associated with the queue.
//
// NOTE(review): l.dir is read without holding l.mu; safe provided dir is
// never mutated after construction, which holds for the code visible here.
func (l *Queue) Dir() string {
	return l.dir
}
|
|
|
|
// DiskUsage returns the total size on disk used by the Queue.
//
// NOTE(review): iterates l.segments without acquiring l.mu — Open calls it
// with the lock already held; confirm any external callers also serialize.
func (l *Queue) DiskUsage() int64 {
	var size int64
	for _, s := range l.segments {
		size += s.diskUsage()
	}
	return size
}
|
|
|
|
// addSegment creates a new empty segment file.
|
|
func (l *Queue) addSegment() error {
|
|
nextID, err := l.nextSegmentID()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
segment, err := newSegment(filepath.Join(l.dir, strconv.FormatUint(nextID, 10)), l.maxSegmentSize, l.verifyBlockFn)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
l.tail = segment
|
|
l.segments = append(l.segments, segment)
|
|
return nil
|
|
}
|
|
|
|
// loadSegments loads all segments on disk.
|
|
func (l *Queue) loadSegments() (segments, error) {
|
|
var ss segments
|
|
|
|
files, err := os.ReadDir(l.dir)
|
|
if err != nil {
|
|
return ss, err
|
|
}
|
|
|
|
for _, segment := range files {
|
|
// Segments should be files. Skip anything that is a dir.
|
|
if segment.IsDir() {
|
|
continue
|
|
}
|
|
|
|
// Segments file names are all numeric
|
|
_, err := strconv.ParseUint(segment.Name(), 10, 64)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
path := filepath.Join(l.dir, segment.Name())
|
|
l.logger.Info("Loading", zap.String("path", path))
|
|
segment, err := newSegment(path, l.maxSegmentSize, l.verifyBlockFn)
|
|
if err != nil {
|
|
return ss, err
|
|
}
|
|
|
|
// Segment repair can leave files that have no data to process. If this happens,
|
|
// the queue can get stuck. We need to remove any empty segments to prevent this.
|
|
if segment.empty() {
|
|
if err := segment.close(); err != nil {
|
|
return ss, err
|
|
}
|
|
if err := os.Remove(segment.path); err != nil {
|
|
return ss, err
|
|
}
|
|
continue
|
|
}
|
|
|
|
ss = append(ss, segment)
|
|
}
|
|
sort.Sort(ss)
|
|
|
|
return ss, nil
|
|
}
|
|
|
|
// nextSegmentID returns the next segment ID that is free.
|
|
func (l *Queue) nextSegmentID() (uint64, error) {
|
|
segments, err := os.ReadDir(l.dir)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
var maxID uint64
|
|
for _, segment := range segments {
|
|
// Segments should be files. Skip anything that is not a dir.
|
|
if segment.IsDir() {
|
|
continue
|
|
}
|
|
|
|
// Segments file names are all numeric
|
|
segmentID, err := strconv.ParseUint(segment.Name(), 10, 64)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
if segmentID > maxID {
|
|
maxID = segmentID
|
|
}
|
|
}
|
|
|
|
return maxID + 1, nil
|
|
}
|
|
|
|
// TotalSegments determines how many segments the current Queue is
|
|
// utilising. Empty segments at the end of the Queue are not counted.
|
|
func (l *Queue) TotalSegments() int {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
n := len(l.segments)
|
|
|
|
// Check last segment's size and if empty, ignore it.
|
|
if n > 0 && l.segments[n-1].empty() {
|
|
n--
|
|
}
|
|
return n
|
|
}
|
|
|
|
// Append appends a byte slice to the end of the queue.
//
// Returns ErrQueueBlocked when too many appends are already in flight,
// ErrNotOpen if the queue is closed, and ErrQueueFull when accepting the
// entry would push the shared total size over the queue's maximum.
func (l *Queue) Append(b []byte) error {
	// Only allow append if there aren't too many concurrent requests.
	// appendCh acts as a counting semaphore sized by the depth given to NewQueue.
	select {
	case l.appendCh <- struct{}{}:
		defer func() { <-l.appendCh }()
	default:
		return ErrQueueBlocked
	}

	l.mu.Lock()
	defer l.mu.Unlock()

	if l.tail == nil {
		return ErrNotOpen
	}

	// Reject the write if it would exceed the configured maximum queue size.
	if l.queueTotalSize.Value()+int64(len(b)) > l.maxSize {
		return ErrQueueFull
	}

	// Append the entry to the tail, if the segment is full,
	// try to create new segment and retry the append
	bytesWritten, err := l.tail.append(b, &l.scratch)
	if err == ErrSegmentFull {
		if err := l.addSegment(); err != nil {
			return err
		}
		bytesWritten, err = l.tail.append(b, &l.scratch)
	}

	// Only account for the bytes if the write actually succeeded.
	if err == nil {
		l.queueTotalSize.Add(bytesWritten)
	}

	return err
}
|
|
|
|
// Current returns the current byte slice at the Head of the queue.
|
|
func (l *Queue) Current() ([]byte, error) {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
if l.head == nil {
|
|
return nil, ErrNotOpen
|
|
}
|
|
|
|
return l.head.current()
|
|
}
|
|
|
|
// Peek returns the next n byte slices at the Head of the queue.
|
|
func (l *Queue) PeekN(n int) ([][]byte, error) {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
if l.head == nil {
|
|
return nil, ErrNotOpen
|
|
}
|
|
|
|
return l.head.peek(n)
|
|
}
|
|
|
|
func (l *Queue) NewScanner() (Scanner, error) {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
if l.head == nil {
|
|
return nil, ErrNotOpen
|
|
}
|
|
|
|
ss, err := l.head.newScanner()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &queueScanner{q: l, ss: ss}, nil
|
|
}
|
|
|
|
// Advance moves the Head point to the next byte slice in the queue.
|
|
func (l *Queue) Advance() error {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
if l.head == nil {
|
|
return ErrNotOpen
|
|
}
|
|
|
|
err := l.head.advance()
|
|
if err == io.EOF {
|
|
if err := l.trimHead(false); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// trimHead removes the head segment once it has been consumed, promoting the
// next segment to head and crediting the freed bytes back to the shared size
// counter. If force is true, or the head is the queue's only (full) segment,
// a fresh tail segment is added first so trimming can proceed.
// Callers must hold l.mu.
func (l *Queue) trimHead(force bool) error {
	// If there is only one segment, but it's full, add a new segment so
	// the Head segment can be trimmed.
	// Note: && binds tighter than ||, so force alone also triggers this.
	if len(l.segments) == 1 && l.head.full() || force {
		if err := l.addSegment(); err != nil {
			return err
		}
	}

	var bytesDeleted int64

	if len(l.segments) > 1 {
		l.segments = l.segments[1:]

		bytesDeleted = l.head.diskUsage()

		// Close/remove failures are logged rather than returned: the segment
		// has already been unlinked from the queue's in-memory view.
		err := l.head.close()
		if err != nil {
			l.logger.Info("Failed to close segment file.", zap.Error(err), zap.String("path", l.head.path))
		}

		err = os.Remove(l.head.path)
		if err != nil {
			l.logger.Info("Failed to remove segment file.", zap.Error(err), zap.String("path", l.head.path))
		}
		l.head = l.segments[0]
	}

	// Credit the deleted bytes back to the shared counter (zero if nothing trimmed).
	l.queueTotalSize.Add(-bytesDeleted)

	return nil
}
|
|
|
|
// Segment is a Queue using a single file. The structure of a segment is a series
|
|
// lengths + block with a single footer point to the position in the segment of the
|
|
// current Head block.
|
|
//
|
|
// ┌──────────────────────────┐ ┌──────────────────────────┐ ┌────────────┐
|
|
// │ Block 1 │ │ Block 2 │ │ Footer │
|
|
// └──────────────────────────┘ └──────────────────────────┘ └────────────┘
|
|
// ┌────────────┐┌────────────┐ ┌────────────┐┌────────────┐ ┌────────────┐
|
|
// │Block 1 Len ││Block 1 Body│ │Block 2 Len ││Block 2 Body│ │Head Offset │
|
|
// │ 8 bytes ││ N bytes │ │ 8 bytes ││ N bytes │ │ 8 bytes │
|
|
// └────────────┘└────────────┘ └────────────┘└────────────┘ └────────────┘
|
|
//
|
|
// The footer holds the pointer to the Head entry at the end of the segment to allow writes
|
|
// to seek to the end and write sequentially (vs having to seek back to the beginning of
|
|
// the segment to update the Head pointer). Reads must seek to the end then back into the
|
|
// segment offset stored in the footer.
|
|
//
|
|
// Segments store arbitrary byte slices and leave the serialization to the caller. Segments
|
|
// are created with a max size and will block writes when the segment is full.
|
|
type segment struct {
	// mu guards the mutable fields below (size, maxSize, pos, file).
	mu sync.RWMutex
	size int64 // Size of the entire segment file, including previously read blocks and the footer.
	maxSize int64 // Maximum size of the segment file.
	pos int64 // Position (offset) of current block.
	file *os.File // Underlying file representing the segment.

	// verifyBlockFn is used to verify a block within a segment contains valid data.
	verifyBlockFn func([]byte) error

	path string // Path of underlying file as passed to newSegment.
	id uint64 // Segment ID as encoded in the file name of the segment.
}
|
|
|
|
// newSegment opens (creating if necessary) the segment file at path and
// prepares it for reading and writing.
//
// The file's base name must be a decimal segment ID. If an existing file is
// already larger than maxSize, maxSize is raised to the file size so the
// segment remains readable.
func newSegment(path string, maxSize int64, verifyBlockFn func([]byte) error) (*segment, error) {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0600)
	if err != nil {
		return nil, err
	}

	// The segment ID is encoded in the file name.
	id, err := strconv.ParseUint(filepath.Base(f.Name()), 10, 64)
	if err != nil {
		return nil, err
	}

	stats, err := os.Stat(path)
	if err != nil {
		return nil, err
	}

	// If the segment file is larger than the default segment size then we
	// should consider a size under the file size valid.
	if maxSize < stats.Size() {
		maxSize = stats.Size()
	}

	s := &segment{
		id: id,
		file: f,
		path: path,
		size: stats.Size(),
		maxSize: maxSize,
		verifyBlockFn: verifyBlockFn,
	}

	// open() assumes the lock is held while it mutates pos/size.
	s.mu.Lock()
	defer s.mu.Unlock()
	if err := s.open(); err != nil {
		return nil, err
	}

	return s, nil
}
|
|
|
|
// open initializes the segment for reading: for a brand new file it writes an
// initial zero footer; for an existing file it loads the head position from
// the footer, validates the current block, and repairs/truncates the segment
// when corruption is detected (recursively re-opening after a repair).
// Callers must hold l.mu.
func (l *segment) open() error {
	// If it's a new segment then write the location of the current record in this segment
	if l.size == 0 {
		l.pos = 0

		if err := l.writeUint64(uint64(l.pos)); err != nil {
			return err
		}

		if err := l.file.Sync(); err != nil {
			return err
		}

		// The file now contains only the 8-byte footer.
		l.size = footerSize

		return nil
	}

	// Existing segment so read the current position and the size of the current block
	if err := l.seekEnd(-footerSize); err != nil {
		return err
	}

	pos, err := l.readUint64()
	if err != nil {
		return err
	}

	// Check if the segment is corrupted. A segment is corrupted if the position
	// value doesn't point to a valid location in the segment.
	if pos > uint64(l.size)-footerSize {
		if pos, err = l.repair(); err != nil {
			return err
		}
	}

	// Move to the part of the segment where the next block to read is.
	// If we had to repair the segment, this will be the beginning of the
	// segment.
	l.pos = int64(pos)
	if err := l.seekToCurrent(); err != nil {
		return err
	}

	// If we're at the end of the segment, we're done.
	if l.pos >= l.size-footerSize {
		return nil
	}

	// Read the current block size.
	currentSize, err := l.readUint64()
	if err != nil {
		return err
	}

	// Is the size reported larger than what could possibly left? If so, it's corrupted.
	if int64(currentSize) > l.size-footerSize-l.pos || int64(currentSize) < 0 {
		if _, err = l.repair(); err != nil {
			return err
		}
		// Re-open from the repaired state.
		return l.open()
	}

	// Extract the block data.
	block := make([]byte, int64(currentSize))
	if err := l.readBytes(block); err != nil {
		// Failing to read a full block also indicates corruption.
		if _, err = l.repair(); err != nil {
			return err
		}
		return l.open()
	}

	// Seek back to the beginning of the block data.
	if err := l.seek(l.pos + 8); err != nil {
		return err
	}

	// Verify the block data.
	if err := l.verifyBlockFn(block); err != nil {
		// Verification of the block failed... This means we need to
		// truncate the segment.
		if err = l.file.Truncate(l.pos); err != nil {
			return err
		}

		if err := l.seek(l.pos); err != nil {
			return err
		}

		// Start from the beginning of the segment again.
		// TODO(edd): This could be improved to point at the last block in
		// the segment...
		if err = l.writeUint64(0); err != nil {
			return err
		}

		if err = l.file.Sync(); err != nil {
			return err
		}
		// The truncated file is the retained data plus a fresh footer.
		l.size = l.pos + footerSize

		// re-open the segment.
		return l.open()
	}
	return nil
}
|
|
|
|
// full returns true if the segment can no longer accept writes.
|
|
func (l *segment) full() bool {
|
|
l.mu.RLock()
|
|
b := l.size >= l.maxSize
|
|
l.mu.RUnlock()
|
|
return b
|
|
}
|
|
|
|
// repair fixes a corrupted segment.
//
// A segment is either corrupted within a block, or the eight byte position
// value in the footer is itself corrupted (more unlikely).
//
// A corrupted segment is corrected by walking the segment until the corrupted
// block is located, which is then truncated. Regardless of which way the
// segment is corrupted, a new position pointing to the beginning of the
// segment, is written into the footer.
//
// repair returns the new position value that the segment should continue to be
// processed from.
//
// Note: if a block has been corrupted internally, e.g., due to a bit flip,
// repair will not be able to detect this.
func (l *segment) repair() (pos uint64, err error) {
	// Seek to beginning of segment.
	if err = l.seek(0); err != nil {
		return pos, err
	}

	var (
		recordSize uint64
		offset int64
		truncate bool
	)

	// Seek through each block in the segment until we have either read up to
	// the footer, or we reach the end of the segment prematurely.
	for {
		offset = l.filePos()

		if offset == l.size-footerSize {
			// Segment looks good as we've successfully reached the end. Segment
			// position in footer must be bad. This is a very unlikely case,
			// since it means only the last eight bytes of an otherwise
			// acceptable segment were corrupted.
			break
		}

		// Read the record size.
		if recordSize, err = l.readUint64(); err != nil {
			truncate = true
			break
		}

		// Skip the rest of the record. If we go beyond the end of the segment,
		// or we hit an error, then we will truncate.
		if _, err = l.file.Seek(int64(recordSize), io.SeekCurrent); err != nil || l.filePos() > l.size-footerSize {
			truncate = true
			break
		}
	}

	if truncate {
		// We reached the end of the segment before we were supposed to, which
		// means the last block is short. Truncate the corrupted last block
		// onwards.
		if err = l.file.Truncate(offset); err != nil {
			return pos, err
		}
	}

	// Set the position as the beginning of the segment, so that the entire
	// segment will be replayed. offset is now the end of the retained data,
	// which is where the fresh footer (holding pos, still zero) is written.
	if err = l.seek(offset); err != nil {
		return pos, err
	}

	if err = l.writeUint64(pos); err != nil {
		return pos, err
	}

	if err = l.file.Sync(); err != nil {
		return pos, err
	}

	// New size is the retained data plus the 8-byte footer.
	l.size = offset + 8
	return pos, err // Current implementation always returns 0 position.
}
|
|
|
|
// append adds byte slice to the end of segment.
//
// The entry is staged in scratch as [8-byte big-endian length][body] followed
// by a rewritten 8-byte footer, then written over the old footer in a single
// write. It returns the number of queue bytes consumed (length prefix + body),
// or ErrSegmentFull once the segment has exceeded its maximum size.
func (l *segment) append(b []byte, scratch *bytes.Buffer) (int64, error) {
	l.mu.Lock()
	defer l.mu.Unlock()

	if l.file == nil {
		return 0, ErrNotOpen
	}

	if l.size > l.maxSize {
		return 0, ErrSegmentFull
	}

	// Position the write at the footer so the new entry overwrites it.
	if err := l.seekEnd(-footerSize); err != nil {
		return 0, err
	}

	// TODO(SGC): error condition: (len(b) + l.size) > l.maxSize == true; scanner.Next will fail reading last block and get stuck

	// If the size of this block is over the max size of the file,
	// update the max file size so we don't get an error indicating
	// the size is invalid when reading it back.
	l64 := int64(len(b))
	if l64 > l.maxSize {
		l.maxSize = l64
	}

	// Construct the segment entry in memory first so it can be
	// written to file atomically.
	scratch.Reset()

	// Length prefix.
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], uint64(len(b)))
	if _, err := scratch.Write(buf[:]); err != nil {
		return 0, err
	}

	// Block body.
	if _, err := scratch.Write(b); err != nil {
		return 0, err
	}

	// Re-written footer: the (unchanged) current head position.
	binary.BigEndian.PutUint64(buf[:], uint64(l.pos))
	if _, err := scratch.Write(buf[:]); err != nil {
		return 0, err
	}

	// Write the segment entry to disk.
	if err := l.writeBytes(scratch.Bytes()); err != nil {
		return 0, err
	}

	if err := l.file.Sync(); err != nil {
		return 0, err
	}

	// The file grew by the entry only; the footer was overwritten in place.
	bytesWritten := int64(len(b)) + 8 // uint64 for length
	l.size += bytesWritten

	return bytesWritten, nil
}
|
|
|
|
// empty returns whether there are any remaining blocks to be read from the segment.
|
|
func (l *segment) empty() bool {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
return int64(l.pos) == l.size-footerSize
|
|
}
|
|
|
|
// current returns the byte slice that the current segment points to.
//
// It returns io.EOF when the read position has reached the footer (nothing
// left to read in this segment).
func (l *segment) current() ([]byte, error) {
	l.mu.Lock()
	defer l.mu.Unlock()

	if int64(l.pos) == l.size-footerSize {
		return nil, io.EOF
	}

	if err := l.seekToCurrent(); err != nil {
		return nil, err
	}

	// read the record size
	sz, err := l.readUint64()
	if err != nil {
		return nil, err
	}

	// Guard against a corrupt length prefix.
	if sz > uint64(l.maxSize) {
		return nil, fmt.Errorf("record size out of range: max %d: got %d", l.maxSize, sz)
	}

	b := make([]byte, sz)
	if err := l.readBytes(b); err != nil {
		return nil, err
	}

	return b, nil
}
|
|
|
|
// peek returns up to the next n byte slices starting at the current read
// position, without advancing it.
//
// io.EOF is returned only when the segment is already fully consumed;
// running out of blocks mid-iteration returns the blocks collected so far
// with a nil error.
func (l *segment) peek(n int) ([][]byte, error) {
	l.mu.Lock()
	defer l.mu.Unlock()

	if int64(l.pos) == l.size-footerSize {
		return nil, io.EOF
	}

	if err := l.seekToCurrent(); err != nil {
		return nil, err
	}

	var blocks [][]byte
	// pos shadows l.pos locally so the segment's read position is untouched.
	pos := l.pos
	for i := 0; i < n; i++ {

		// Stop cleanly when we've walked up to the footer.
		if int64(pos) == l.size-footerSize {
			return blocks, nil
		}

		// read the record size
		sz, err := l.readUint64()
		if err == io.EOF {
			return blocks, nil
		} else if err != nil {
			return nil, err
		}
		pos += 8

		// A zero-length record carries no data; skip it.
		if sz == 0 {
			continue
		}

		// Guard against a corrupt length prefix.
		if sz > uint64(l.maxSize) {
			return nil, fmt.Errorf("record size out of range: max %d: got %d", l.maxSize, sz)
		}

		pos += int64(sz)

		b := make([]byte, sz)
		if err := l.readBytes(b); err != nil {
			return nil, err
		}
		blocks = append(blocks, b)
	}
	return blocks, nil
}
|
|
|
|
// advance advances the current value pointer.
//
// Usually a scanner should be used instead of calling advance
//
// NOTE(review): the seek/read below happen before advanceTo acquires l.mu —
// confirm callers serialize access (Queue.Advance does hold the queue lock).
func (l *segment) advance() error {
	if err := l.seekToCurrent(); err != nil {
		return err
	}
	// Read the current block's length so we know how far to move.
	sz, err := l.readUint64()
	if err != nil {
		return err
	}
	currentSize := int64(sz)
	// Skip the 8-byte length prefix plus the block body.
	return l.advanceTo(l.pos + currentSize + 8)
}
|
|
|
|
// advanceTo advances the segment to the position specified by pos
//
// The new position is persisted into the segment footer and the file synced
// before returning. Returns io.EOF when the new position is at (or beyond)
// the end of the segment data, and an error if pos would move backwards.
func (l *segment) advanceTo(pos int64) error {
	l.mu.Lock()
	defer l.mu.Unlock()

	if l.file == nil {
		return ErrNotOpen
	}

	// Positions only ever move forward.
	if pos < l.pos {
		return fmt.Errorf("attempt to unread queue from %d to %d", l.pos, pos)
	}

	l.pos = pos

	// If we're attempting to move beyond the end of the file, can't advance
	if int64(pos) > l.size-footerSize {
		return io.EOF
	}

	// Persist the new position into the footer.
	if err := l.seekEnd(-footerSize); err != nil {
		return err
	}

	if err := l.writeUint64(uint64(pos)); err != nil {
		return err
	}

	if err := l.file.Sync(); err != nil {
		return err
	}

	if err := l.seekToCurrent(); err != nil {
		return err
	}

	// Read (and discard) the next block's length prefix — presumably to
	// surface read errors early; TODO confirm the intent.
	_, err := l.readUint64()
	if err != nil {
		return err
	}

	if int64(l.pos) == l.size-footerSize {
		return io.EOF
	}

	return nil
}
|
|
|
|
// totalBytes returns the number of bytes remaining in the segment file, excluding the footer.
|
|
func (l *segment) totalBytes() (n int64) {
|
|
l.mu.RLock()
|
|
n = l.size - int64(l.pos) - footerSize
|
|
l.mu.RUnlock()
|
|
return
|
|
}
|
|
|
|
func (l *segment) close() error {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
err := l.file.Close()
|
|
l.file = nil
|
|
return err
|
|
}
|
|
|
|
func (l *segment) lastModified() (time.Time, error) {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
|
|
if l.file == nil {
|
|
return time.Time{}, ErrNotOpen
|
|
}
|
|
|
|
stats, err := os.Stat(l.file.Name())
|
|
if err != nil {
|
|
return time.Time{}, err
|
|
}
|
|
return stats.ModTime().UTC(), nil
|
|
}
|
|
|
|
func (l *segment) diskUsage() int64 {
|
|
l.mu.RLock()
|
|
defer l.mu.RUnlock()
|
|
return l.size
|
|
}
|
|
|
|
// SetMaxSegmentSize updates this segment's maximum size in bytes.
func (l *segment) SetMaxSegmentSize(size int64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.maxSize = size
}
|
|
|
|
func (l *segment) seekToCurrent() error {
|
|
return l.seek(int64(l.pos))
|
|
}
|
|
|
|
func (l *segment) seek(pos int64) error {
|
|
n, err := l.file.Seek(pos, io.SeekStart)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if n != pos {
|
|
return fmt.Errorf("bad seek. exp %v, got %v", pos, n)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// seekEnd positions the file pos bytes relative to the end of the file
// (pos is typically negative, e.g. -footerSize to land on the footer).
func (l *segment) seekEnd(pos int64) error {
	_, err := l.file.Seek(pos, io.SeekEnd)
	return err
}
|
|
|
|
// filePos returns the file's current offset.
//
// NOTE(review): the Seek error is deliberately ignored; on failure this
// returns 0, which callers treat as a real position — confirm acceptable.
func (l *segment) filePos() int64 {
	n, _ := l.file.Seek(0, io.SeekCurrent)
	return n
}
|
|
|
|
func (l *segment) readUint64() (uint64, error) {
|
|
var b [8]byte
|
|
if err := l.readBytes(b[:]); err != nil {
|
|
return 0, err
|
|
}
|
|
return binary.BigEndian.Uint64(b[:]), nil
|
|
}
|
|
|
|
func (l *segment) writeUint64(sz uint64) error {
|
|
var buf [8]byte
|
|
binary.BigEndian.PutUint64(buf[:], sz)
|
|
return l.writeBytes(buf[:])
|
|
}
|
|
|
|
func (l *segment) writeBytes(b []byte) error {
|
|
n, err := l.file.Write(b)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if n != len(b) {
|
|
return fmt.Errorf("short write. got %d, exp %d", n, len(b))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (l *segment) readBytes(b []byte) error {
|
|
n, err := l.file.Read(b)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if n != len(b) {
|
|
return fmt.Errorf("bad read. exp %v, got %v", len(b), n)
|
|
}
|
|
return nil
|
|
}
|