2015-11-18 06:08:29 +00:00
|
|
|
package tsm1
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"log"
|
2015-11-21 22:50:28 +00:00
|
|
|
"math"
|
2015-11-18 06:08:29 +00:00
|
|
|
"os"
|
|
|
|
"path/filepath"
|
|
|
|
"sort"
|
|
|
|
"strconv"
|
|
|
|
"strings"
|
|
|
|
"sync"
|
2015-11-21 22:50:28 +00:00
|
|
|
"time"
|
2015-11-18 06:08:29 +00:00
|
|
|
|
|
|
|
"github.com/golang/snappy"
|
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
2015-11-25 04:24:13 +00:00
|
|
|
// DefaultSegmentSize of 10MB is the size at which segment files will be rolled over
|
2015-11-21 22:50:28 +00:00
|
|
|
DefaultSegmentSize = 10 * 1024 * 1024
|
2015-11-18 06:08:29 +00:00
|
|
|
|
|
|
|
// FileExtension is the file extension we expect for wal segments
|
|
|
|
WALFileExtension = "wal"
|
|
|
|
|
|
|
|
WALFilePrefix = "_"
|
|
|
|
|
2015-11-19 06:15:44 +00:00
|
|
|
defaultBufLen = 1024 << 10 // 1MB (sized for batches of 5000 points)
|
2015-11-21 22:50:28 +00:00
|
|
|
|
|
|
|
float64EntryType = 1
|
|
|
|
int64EntryType = 2
|
|
|
|
boolEntryType = 3
|
|
|
|
stringEntryType = 4
|
2015-11-18 06:08:29 +00:00
|
|
|
)
|
|
|
|
|
2015-11-25 20:51:56 +00:00
|
|
|
// SegmentInfo represents metadata about a segment.
|
|
|
|
type SegmentInfo struct {
|
|
|
|
name string
|
|
|
|
id int
|
|
|
|
}
|
|
|
|
|
2015-12-06 23:50:39 +00:00
|
|
|
// WalEntryType is a byte written to a wal segment file that indicates what the following compressed block contains
|
|
|
|
type WalEntryType byte
|
2015-11-18 06:08:29 +00:00
|
|
|
|
|
|
|
const (
|
2015-12-06 23:50:39 +00:00
|
|
|
WriteWALEntryType WalEntryType = 0x01
|
|
|
|
DeleteWALEntryType WalEntryType = 0x02
|
2015-11-18 06:08:29 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
var ErrWALClosed = fmt.Errorf("WAL closed")
|
|
|
|
|
|
|
|
type WAL struct {
|
2015-12-06 23:50:39 +00:00
|
|
|
mu sync.RWMutex
|
|
|
|
lastWriteTime time.Time
|
2015-11-18 22:42:48 +00:00
|
|
|
|
2015-11-18 06:08:29 +00:00
|
|
|
path string
|
|
|
|
|
|
|
|
// write variables
|
2015-11-18 22:42:48 +00:00
|
|
|
currentSegmentID int
|
|
|
|
currentSegmentWriter *WALSegmentWriter
|
2015-11-18 06:08:29 +00:00
|
|
|
|
|
|
|
// cache and flush variables
|
|
|
|
closing chan struct{}
|
|
|
|
|
|
|
|
// WALOutput is the writer used by the logger.
|
|
|
|
LogOutput io.Writer
|
|
|
|
logger *log.Logger
|
|
|
|
|
|
|
|
// SegmentSize is the file size at which a segment file will be rotated
|
|
|
|
SegmentSize int
|
|
|
|
|
|
|
|
// LoggingEnabled specifies if detailed logs should be output
|
|
|
|
LoggingEnabled bool
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewWAL(path string) *WAL {
|
|
|
|
return &WAL{
|
|
|
|
path: path,
|
|
|
|
|
|
|
|
// these options should be overriden by any options in the config
|
2015-11-18 22:42:48 +00:00
|
|
|
LogOutput: os.Stderr,
|
|
|
|
SegmentSize: DefaultSegmentSize,
|
2015-12-06 23:50:39 +00:00
|
|
|
logger: log.New(os.Stderr, "[tsm1wal] ", log.LstdFlags),
|
2015-11-18 22:42:48 +00:00
|
|
|
closing: make(chan struct{}),
|
2015-11-18 06:08:29 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Path returns the path the log was initialized with.
|
2015-11-18 22:42:48 +00:00
|
|
|
func (l *WAL) Path() string {
|
|
|
|
l.mu.RLock()
|
|
|
|
defer l.mu.RUnlock()
|
|
|
|
return l.path
|
|
|
|
}
|
2015-11-18 06:08:29 +00:00
|
|
|
|
|
|
|
// Open opens and initializes the Log. Will recover from previous unclosed shutdowns
|
|
|
|
func (l *WAL) Open() error {
|
2015-11-18 22:42:48 +00:00
|
|
|
l.mu.Lock()
|
|
|
|
defer l.mu.Unlock()
|
|
|
|
|
2015-11-18 06:08:29 +00:00
|
|
|
if l.LoggingEnabled {
|
2015-12-06 23:50:39 +00:00
|
|
|
l.logger.Printf("tsm1 WAL starting with %d segment size\n", l.SegmentSize)
|
|
|
|
l.logger.Printf("tsm1 WAL writing to %s\n", l.path)
|
2015-11-18 06:08:29 +00:00
|
|
|
}
|
|
|
|
if err := os.MkdirAll(l.path, 0777); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2015-12-01 04:25:30 +00:00
|
|
|
segments, err := segmentFileNames(l.path)
|
|
|
|
if err != nil {
|
2015-11-30 19:29:24 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2015-12-01 04:25:30 +00:00
|
|
|
if len(segments) > 0 {
|
|
|
|
lastSegment := segments[len(segments)-1]
|
|
|
|
id, err := idFromFileName(lastSegment)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
l.currentSegmentID = id
|
|
|
|
stat, err := os.Stat(lastSegment)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if stat.Size() == 0 {
|
|
|
|
os.Remove(lastSegment)
|
|
|
|
}
|
|
|
|
if err := l.newSegmentFile(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
l.closing = make(chan struct{})
|
|
|
|
|
2015-12-07 19:35:37 +00:00
|
|
|
l.lastWriteTime = time.Now()
|
|
|
|
|
2015-11-18 06:08:29 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-11-25 20:23:10 +00:00
|
|
|
// WritePoints writes the given points to the WAL. Returns the WAL segment ID to
|
|
|
|
// which the points were written. If an error is returned the segment ID should
|
|
|
|
// be ignored.
|
|
|
|
func (l *WAL) WritePoints(values map[string][]Value) (int, error) {
|
2015-11-18 06:08:29 +00:00
|
|
|
entry := &WriteWALEntry{
|
2015-11-21 22:50:28 +00:00
|
|
|
Values: values,
|
2015-11-18 06:08:29 +00:00
|
|
|
}
|
|
|
|
|
2015-11-25 20:23:10 +00:00
|
|
|
id, err := l.writeToLog(entry)
|
|
|
|
if err != nil {
|
|
|
|
return -1, err
|
2015-11-18 06:08:29 +00:00
|
|
|
}
|
|
|
|
|
2015-11-25 20:23:10 +00:00
|
|
|
return id, nil
|
2015-11-18 06:08:29 +00:00
|
|
|
}
|
|
|
|
|
2015-12-03 13:11:50 +00:00
|
|
|
func (l *WAL) ClosedSegments() ([]string, error) {
|
2015-11-18 22:42:48 +00:00
|
|
|
l.mu.RLock()
|
2015-11-30 19:29:24 +00:00
|
|
|
defer l.mu.RUnlock()
|
2015-11-18 22:42:48 +00:00
|
|
|
// Not loading files from disk so nothing to do
|
|
|
|
if l.path == "" {
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
|
2015-12-03 13:11:50 +00:00
|
|
|
var currentFile string
|
2015-12-01 04:25:30 +00:00
|
|
|
if l.currentSegmentWriter != nil {
|
2015-12-06 23:50:39 +00:00
|
|
|
currentFile = l.currentSegmentWriter.path()
|
2015-12-03 13:11:50 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
files, err := segmentFileNames(l.path)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
var closedFiles []string
|
|
|
|
for _, fn := range files {
|
|
|
|
// Skip the current path
|
|
|
|
if fn == currentFile {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
closedFiles = append(closedFiles, fn)
|
2015-12-01 04:25:30 +00:00
|
|
|
}
|
|
|
|
|
2015-12-03 13:11:50 +00:00
|
|
|
return closedFiles, nil
|
2015-11-18 22:42:48 +00:00
|
|
|
}
|
|
|
|
|
2015-12-03 21:23:01 +00:00
|
|
|
func (l *WAL) Remove(files []string) error {
|
|
|
|
l.mu.Lock()
|
|
|
|
defer l.mu.Unlock()
|
|
|
|
for _, fn := range files {
|
|
|
|
os.RemoveAll(fn)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-12-06 23:50:39 +00:00
|
|
|
// LastWriteTime is the last time anything was written to the WAL
|
|
|
|
func (l *WAL) LastWriteTime() time.Time {
|
2015-11-19 06:15:44 +00:00
|
|
|
l.mu.RLock()
|
2015-12-06 23:50:39 +00:00
|
|
|
defer l.mu.RUnlock()
|
|
|
|
return l.lastWriteTime
|
|
|
|
}
|
|
|
|
|
|
|
|
func (l *WAL) writeToLog(entry WALEntry) (int, error) {
|
|
|
|
// encode and compress the entry while we're not locked
|
|
|
|
bytes := make([]byte, defaultBufLen)
|
|
|
|
|
|
|
|
b, err := entry.Encode(bytes)
|
|
|
|
if err != nil {
|
|
|
|
return -1, err
|
|
|
|
}
|
|
|
|
|
|
|
|
compressed := snappy.Encode(b, b)
|
|
|
|
|
|
|
|
l.mu.Lock()
|
|
|
|
defer l.mu.Unlock()
|
|
|
|
|
2015-11-18 06:08:29 +00:00
|
|
|
// Make sure the log has not been closed
|
|
|
|
select {
|
|
|
|
case <-l.closing:
|
2015-11-25 20:23:10 +00:00
|
|
|
return -1, ErrWALClosed
|
2015-11-18 06:08:29 +00:00
|
|
|
default:
|
|
|
|
}
|
2015-11-30 19:29:24 +00:00
|
|
|
|
2015-12-06 23:50:39 +00:00
|
|
|
// roll the segment file if needed
|
2015-11-19 17:54:58 +00:00
|
|
|
if err := l.rollSegment(); err != nil {
|
2015-11-25 20:23:10 +00:00
|
|
|
return -1, fmt.Errorf("error rolling WAL segment: %v", err)
|
2015-11-18 06:08:29 +00:00
|
|
|
}
|
|
|
|
|
2015-12-06 23:50:39 +00:00
|
|
|
// write and sync
|
|
|
|
if err := l.currentSegmentWriter.Write(entry.Type(), compressed); err != nil {
|
2015-11-25 20:23:10 +00:00
|
|
|
return -1, fmt.Errorf("error writing WAL entry: %v", err)
|
2015-11-18 06:08:29 +00:00
|
|
|
}
|
|
|
|
|
2015-12-06 23:50:39 +00:00
|
|
|
l.lastWriteTime = time.Now()
|
|
|
|
|
|
|
|
return l.currentSegmentID, l.currentSegmentWriter.sync()
|
2015-11-18 22:42:48 +00:00
|
|
|
}
|
2015-11-18 06:08:29 +00:00
|
|
|
|
2015-12-01 04:25:30 +00:00
|
|
|
// rollSegment closes the current segment and opens a new one if the current segment is over
|
|
|
|
// the max segment size.
|
2015-11-19 17:54:58 +00:00
|
|
|
func (l *WAL) rollSegment() error {
|
2015-12-06 23:50:39 +00:00
|
|
|
if l.currentSegmentWriter == nil || l.currentSegmentWriter.size > DefaultSegmentSize {
|
2015-11-19 17:54:58 +00:00
|
|
|
if err := l.newSegmentFile(); err != nil {
|
|
|
|
// A drop database or RP call could trigger this error if writes were in-flight
|
|
|
|
// when the drop statement executes.
|
2015-12-03 21:23:01 +00:00
|
|
|
return fmt.Errorf("error opening new segment file for wal (2): %v", err)
|
2015-11-19 17:54:58 +00:00
|
|
|
}
|
2015-11-21 22:50:28 +00:00
|
|
|
return nil
|
2015-11-19 17:54:58 +00:00
|
|
|
}
|
2015-12-06 23:50:39 +00:00
|
|
|
|
2015-11-19 17:54:58 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-12-01 04:25:30 +00:00
|
|
|
// CloseSegment closes the current segment if it is non-empty and opens a new one.
|
|
|
|
func (l *WAL) CloseSegment() error {
|
|
|
|
l.mu.Lock()
|
|
|
|
defer l.mu.Unlock()
|
2015-12-06 23:50:39 +00:00
|
|
|
if l.currentSegmentWriter == nil || l.currentSegmentWriter.size > 0 {
|
2015-12-01 04:25:30 +00:00
|
|
|
if err := l.newSegmentFile(); err != nil {
|
|
|
|
// A drop database or RP call could trigger this error if writes were in-flight
|
|
|
|
// when the drop statement executes.
|
2015-12-03 21:23:01 +00:00
|
|
|
return fmt.Errorf("error opening new segment file for wal (1): %v", err)
|
2015-12-01 04:25:30 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-11-25 20:23:10 +00:00
|
|
|
// Delete deletes the given keys, returning the segment ID for the operation.
|
|
|
|
func (l *WAL) Delete(keys []string) (int, error) {
|
2015-12-07 19:35:37 +00:00
|
|
|
if len(keys) == 0 {
|
|
|
|
return 0, nil
|
|
|
|
}
|
2015-11-18 22:42:48 +00:00
|
|
|
entry := &DeleteWALEntry{
|
|
|
|
Keys: keys,
|
|
|
|
}
|
2015-11-18 06:08:29 +00:00
|
|
|
|
2015-11-25 20:23:10 +00:00
|
|
|
id, err := l.writeToLog(entry)
|
|
|
|
if err != nil {
|
|
|
|
return -1, err
|
2015-11-18 06:08:29 +00:00
|
|
|
}
|
2015-11-25 20:23:10 +00:00
|
|
|
return id, nil
|
2015-11-18 06:08:29 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Close will finish any flush that is currently in process and close file handles
|
|
|
|
func (l *WAL) Close() error {
|
2015-11-18 22:42:48 +00:00
|
|
|
l.mu.Lock()
|
|
|
|
defer l.mu.Unlock()
|
2015-11-18 06:08:29 +00:00
|
|
|
|
|
|
|
// Close, but don't set to nil so future goroutines can still be signaled
|
|
|
|
close(l.closing)
|
|
|
|
|
2015-11-18 22:42:48 +00:00
|
|
|
if l.currentSegmentWriter != nil {
|
2015-12-06 23:50:39 +00:00
|
|
|
l.currentSegmentWriter.close()
|
2015-12-04 16:09:39 +00:00
|
|
|
l.currentSegmentWriter = nil
|
2015-11-18 06:08:29 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// segmentFileNames will return all files that are WAL segment files in sorted order by ascending ID
|
2015-11-30 19:29:24 +00:00
|
|
|
func segmentFileNames(dir string) ([]string, error) {
|
|
|
|
names, err := filepath.Glob(filepath.Join(dir, fmt.Sprintf("%s*.%s", WALFilePrefix, WALFileExtension)))
|
2015-11-18 06:08:29 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
sort.Strings(names)
|
|
|
|
return names, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// newSegmentFile will close the current segment file and open a new one, updating bookkeeping info on the log
|
|
|
|
func (l *WAL) newSegmentFile() error {
|
|
|
|
l.currentSegmentID++
|
2015-11-18 22:42:48 +00:00
|
|
|
if l.currentSegmentWriter != nil {
|
2015-12-06 23:50:39 +00:00
|
|
|
if err := l.currentSegmentWriter.close(); err != nil {
|
2015-11-18 06:08:29 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fileName := filepath.Join(l.path, fmt.Sprintf("%s%05d.%s", WALFilePrefix, l.currentSegmentID, WALFileExtension))
|
2015-11-23 21:01:09 +00:00
|
|
|
fd, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666)
|
2015-11-18 06:08:29 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2015-11-23 21:01:09 +00:00
|
|
|
l.currentSegmentWriter = NewWALSegmentWriter(fd)
|
2015-11-18 06:08:29 +00:00
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// WALEntry is record stored in each WAL segment. Each entry has a type
|
|
|
|
// and an opaque, type dependent byte slice data attribute.
|
|
|
|
type WALEntry interface {
|
2015-12-06 23:50:39 +00:00
|
|
|
Type() WalEntryType
|
2015-11-18 06:08:29 +00:00
|
|
|
Encode(dst []byte) ([]byte, error)
|
|
|
|
MarshalBinary() ([]byte, error)
|
2015-11-19 06:15:44 +00:00
|
|
|
UnmarshalBinary(b []byte) error
|
2015-11-18 06:08:29 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// WriteWALEntry represents a write of points.
|
|
|
|
type WriteWALEntry struct {
|
2015-11-21 22:50:28 +00:00
|
|
|
Values map[string][]Value
|
2015-11-18 06:08:29 +00:00
|
|
|
}
|
|
|
|
|
2015-11-24 16:44:37 +00:00
|
|
|
// Encode converts the WriteWALEntry into a byte stream using dst if it
|
|
|
|
// is large enough. If dst is too small, the slice will be grown to fit the
|
|
|
|
// encoded entry.
|
2015-11-18 06:08:29 +00:00
|
|
|
func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) {
|
2015-11-24 16:44:37 +00:00
|
|
|
// The entries values are encode as follows:
|
|
|
|
//
|
|
|
|
// For each key and slice of values, first a 1 byte type for the []Values
|
|
|
|
// slice is written. Following the type, the length and key bytes are written.
|
|
|
|
// Following the key, a 4 byte count followed by each value as a 8 byte time
|
|
|
|
// and N byte value. The value is dependent on the type being encoded. float64,
|
|
|
|
// int64, use 8 bytes, bool uses 1 byte, and string is similar to the key encoding.
|
|
|
|
//
|
|
|
|
// This structure is then repeated for each key an value slices.
|
|
|
|
//
|
|
|
|
// ┌────────────────────────────────────────────────────────────────────┐
|
|
|
|
// │ WriteWALEntry │
|
|
|
|
// ├──────┬─────────┬────────┬───────┬─────────┬─────────┬───┬──────┬───┤
|
|
|
|
// │ Type │ Key Len │ Key │ Count │ Time │ Value │...│ Type │...│
|
|
|
|
// │1 byte│ 4 bytes │ N bytes│4 bytes│ 8 bytes │ N bytes │ │1 byte│ │
|
|
|
|
// └──────┴─────────┴────────┴───────┴─────────┴─────────┴───┴──────┴───┘
|
2015-11-18 06:08:29 +00:00
|
|
|
var n int
|
2015-11-21 22:50:28 +00:00
|
|
|
|
|
|
|
for k, v := range w.Values {
|
|
|
|
|
2015-12-10 00:56:01 +00:00
|
|
|
// Make sure we have enough space in our buf before copying. If not,
|
|
|
|
// grow the buf.
|
|
|
|
if len(dst[:n])+2+len(k)+len(v)*8+4 > len(dst) {
|
|
|
|
grow := make([]byte, len(dst)*2)
|
|
|
|
dst = append(dst, grow...)
|
|
|
|
}
|
|
|
|
|
2015-11-21 22:50:28 +00:00
|
|
|
switch v[0].Value().(type) {
|
|
|
|
case float64:
|
|
|
|
dst[n] = float64EntryType
|
|
|
|
case int64:
|
|
|
|
dst[n] = int64EntryType
|
|
|
|
case bool:
|
|
|
|
dst[n] = boolEntryType
|
|
|
|
case string:
|
|
|
|
dst[n] = stringEntryType
|
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("unsupported value type: %#v", v[0].Value())
|
2015-11-18 06:08:29 +00:00
|
|
|
}
|
2015-11-21 22:50:28 +00:00
|
|
|
n++
|
2015-11-18 06:08:29 +00:00
|
|
|
|
2015-11-21 22:50:28 +00:00
|
|
|
n += copy(dst[n:], u16tob(uint16(len(k))))
|
|
|
|
n += copy(dst[n:], []byte(k))
|
|
|
|
|
|
|
|
n += copy(dst[n:], u32tob(uint32(len(v))))
|
|
|
|
|
|
|
|
for _, vv := range v {
|
2015-12-10 00:56:01 +00:00
|
|
|
|
|
|
|
// Grow our slice if needed
|
|
|
|
if len(dst[:n])+16 > len(dst) {
|
|
|
|
grow := make([]byte, len(dst)*2)
|
|
|
|
dst = append(dst, grow...)
|
|
|
|
}
|
|
|
|
|
2015-11-21 22:50:28 +00:00
|
|
|
n += copy(dst[n:], u64tob(uint64(vv.Time().UnixNano())))
|
|
|
|
switch t := vv.Value().(type) {
|
|
|
|
case float64:
|
|
|
|
n += copy(dst[n:], u64tob(uint64(math.Float64bits(t))))
|
|
|
|
case int64:
|
|
|
|
n += copy(dst[n:], u64tob(uint64(t)))
|
|
|
|
case bool:
|
|
|
|
if t {
|
|
|
|
n += copy(dst[n:], []byte{1})
|
|
|
|
} else {
|
|
|
|
n += copy(dst[n:], []byte{0})
|
|
|
|
}
|
|
|
|
case string:
|
|
|
|
n += copy(dst[n:], u32tob(uint32(len(t))))
|
|
|
|
n += copy(dst[n:], []byte(t))
|
|
|
|
}
|
|
|
|
}
|
2015-11-18 06:08:29 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return dst[:n], nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *WriteWALEntry) MarshalBinary() ([]byte, error) {
|
|
|
|
// Temp buffer to write marshaled points into
|
2015-11-19 06:15:44 +00:00
|
|
|
b := make([]byte, defaultBufLen)
|
2015-11-18 06:08:29 +00:00
|
|
|
return w.Encode(b)
|
|
|
|
}
|
|
|
|
|
2015-11-19 06:15:44 +00:00
|
|
|
func (w *WriteWALEntry) UnmarshalBinary(b []byte) error {
|
2015-11-18 06:08:29 +00:00
|
|
|
var i int
|
|
|
|
for i < len(b) {
|
2015-11-21 22:50:28 +00:00
|
|
|
typ := b[i]
|
|
|
|
i++
|
|
|
|
|
|
|
|
length := int(btou16(b[i : i+2]))
|
|
|
|
i += 2
|
|
|
|
k := string(b[i : i+length])
|
|
|
|
i += length
|
|
|
|
|
|
|
|
nvals := int(btou32(b[i : i+4]))
|
2015-11-18 06:08:29 +00:00
|
|
|
i += 4
|
|
|
|
|
2015-11-21 22:50:28 +00:00
|
|
|
var values []Value
|
|
|
|
switch typ {
|
|
|
|
case float64EntryType:
|
|
|
|
values = getFloat64Values(nvals)
|
|
|
|
case int64EntryType:
|
|
|
|
values = getInt64Values(nvals)
|
|
|
|
case boolEntryType:
|
|
|
|
values = getBoolValues(nvals)
|
|
|
|
case stringEntryType:
|
|
|
|
values = getStringValues(nvals)
|
|
|
|
default:
|
|
|
|
return fmt.Errorf("unsupported value type: %#v", typ)
|
2015-11-18 06:08:29 +00:00
|
|
|
}
|
2015-11-21 22:50:28 +00:00
|
|
|
|
|
|
|
for j := 0; j < nvals; j++ {
|
|
|
|
t := time.Unix(0, int64(btou64(b[i:i+8])))
|
|
|
|
i += 8
|
|
|
|
|
|
|
|
switch typ {
|
|
|
|
case float64EntryType:
|
|
|
|
v := math.Float64frombits((btou64(b[i : i+8])))
|
|
|
|
i += 8
|
|
|
|
if fv, ok := values[j].(*FloatValue); ok {
|
|
|
|
fv.time = t
|
|
|
|
fv.value = v
|
|
|
|
}
|
|
|
|
case int64EntryType:
|
|
|
|
v := int64(btou64(b[i : i+8]))
|
|
|
|
i += 8
|
|
|
|
if fv, ok := values[j].(*Int64Value); ok {
|
|
|
|
fv.time = t
|
|
|
|
fv.value = v
|
|
|
|
}
|
|
|
|
case boolEntryType:
|
|
|
|
v := b[i]
|
|
|
|
i += 1
|
|
|
|
if fv, ok := values[j].(*BoolValue); ok {
|
|
|
|
fv.time = t
|
|
|
|
if v == 1 {
|
|
|
|
fv.value = true
|
|
|
|
} else {
|
|
|
|
fv.value = false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
case stringEntryType:
|
|
|
|
length := int(btou32(b[i : i+4]))
|
|
|
|
i += 4
|
|
|
|
v := string(b[i : i+length])
|
|
|
|
i += length
|
|
|
|
if fv, ok := values[j].(*StringValue); ok {
|
|
|
|
fv.time = t
|
|
|
|
fv.value = v
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
return fmt.Errorf("unsupported value type: %#v", typ)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
w.Values[k] = values
|
2015-11-18 06:08:29 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-12-06 23:50:39 +00:00
|
|
|
func (w *WriteWALEntry) Type() WalEntryType {
|
2015-11-18 06:08:29 +00:00
|
|
|
return WriteWALEntryType
|
|
|
|
}
|
|
|
|
|
|
|
|
// DeleteWALEntry represents the deletion of multiple series.
|
|
|
|
type DeleteWALEntry struct {
|
|
|
|
Keys []string
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *DeleteWALEntry) MarshalBinary() ([]byte, error) {
|
2015-11-19 06:15:44 +00:00
|
|
|
b := make([]byte, defaultBufLen)
|
2015-11-18 06:08:29 +00:00
|
|
|
return w.Encode(b)
|
|
|
|
}
|
|
|
|
|
2015-11-19 06:15:44 +00:00
|
|
|
func (w *DeleteWALEntry) UnmarshalBinary(b []byte) error {
|
2015-11-18 06:08:29 +00:00
|
|
|
w.Keys = strings.Split(string(b), "\n")
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *DeleteWALEntry) Encode(dst []byte) ([]byte, error) {
|
|
|
|
var n int
|
|
|
|
for _, k := range w.Keys {
|
2015-12-08 17:47:06 +00:00
|
|
|
if len(dst[:n])+1+len(k) > len(dst) {
|
|
|
|
grow := make([]byte, len(dst)*2)
|
2015-11-18 06:08:29 +00:00
|
|
|
dst = append(dst, grow...)
|
|
|
|
}
|
|
|
|
|
|
|
|
n += copy(dst[n:], k)
|
|
|
|
n += copy(dst[n:], "\n")
|
|
|
|
}
|
|
|
|
|
|
|
|
// We return n-1 to strip off the last newline so that unmarshalling the value
|
|
|
|
// does not produce an empty string
|
|
|
|
return []byte(dst[:n-1]), nil
|
|
|
|
}
|
|
|
|
|
2015-12-06 23:50:39 +00:00
|
|
|
func (w *DeleteWALEntry) Type() WalEntryType {
|
2015-11-18 06:08:29 +00:00
|
|
|
return DeleteWALEntryType
|
|
|
|
}
|
|
|
|
|
|
|
|
// WALSegmentWriter writes WAL segments.
|
|
|
|
type WALSegmentWriter struct {
|
|
|
|
w io.WriteCloser
|
|
|
|
size int
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewWALSegmentWriter(w io.WriteCloser) *WALSegmentWriter {
|
|
|
|
return &WALSegmentWriter{
|
|
|
|
w: w,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-12-06 23:50:39 +00:00
|
|
|
func (w *WALSegmentWriter) path() string {
|
2015-11-18 22:42:48 +00:00
|
|
|
if f, ok := w.w.(*os.File); ok {
|
|
|
|
return f.Name()
|
|
|
|
}
|
|
|
|
return ""
|
|
|
|
}
|
|
|
|
|
2015-12-06 23:50:39 +00:00
|
|
|
func (w *WALSegmentWriter) Write(entryType WalEntryType, compressed []byte) error {
|
|
|
|
if _, err := w.w.Write([]byte{byte(entryType)}); err != nil {
|
2015-11-18 06:08:29 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2015-12-06 23:50:39 +00:00
|
|
|
if _, err := w.w.Write(u32tob(uint32(len(compressed)))); err != nil {
|
2015-11-19 06:15:44 +00:00
|
|
|
return err
|
|
|
|
}
|
2015-11-21 22:50:28 +00:00
|
|
|
|
2015-12-06 23:50:39 +00:00
|
|
|
if _, err := w.w.Write(compressed); err != nil {
|
2015-11-19 06:15:44 +00:00
|
|
|
return err
|
2015-11-18 06:08:29 +00:00
|
|
|
}
|
|
|
|
|
2015-11-23 21:01:09 +00:00
|
|
|
// 5 is the 1 byte type + 4 byte uint32 length
|
2015-11-19 06:15:44 +00:00
|
|
|
w.size += len(compressed) + 5
|
2015-11-18 06:08:29 +00:00
|
|
|
|
2015-11-18 22:42:48 +00:00
|
|
|
return nil
|
2015-11-18 06:08:29 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Sync flushes the file systems in-memory copy of recently written data to disk.
|
2015-12-06 23:50:39 +00:00
|
|
|
func (w *WALSegmentWriter) sync() error {
|
2015-11-18 06:08:29 +00:00
|
|
|
if f, ok := w.w.(*os.File); ok {
|
|
|
|
return f.Sync()
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-12-06 23:50:39 +00:00
|
|
|
func (w *WALSegmentWriter) close() error {
|
2015-11-18 06:08:29 +00:00
|
|
|
return w.w.Close()
|
|
|
|
}
|
|
|
|
|
|
|
|
// WALSegmentReader reads WAL segments.
|
|
|
|
type WALSegmentReader struct {
|
|
|
|
r io.ReadCloser
|
|
|
|
entry WALEntry
|
2015-12-06 01:44:40 +00:00
|
|
|
n int64
|
2015-11-18 06:08:29 +00:00
|
|
|
err error
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewWALSegmentReader(r io.ReadCloser) *WALSegmentReader {
|
|
|
|
return &WALSegmentReader{
|
|
|
|
r: r,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Next indicates if there is a value to read
|
|
|
|
func (r *WALSegmentReader) Next() bool {
|
2015-11-19 06:15:44 +00:00
|
|
|
b := getBuf(defaultBufLen)
|
2015-11-18 06:08:29 +00:00
|
|
|
defer putBuf(b)
|
2015-12-06 01:44:40 +00:00
|
|
|
var nReadOK int
|
2015-11-18 06:08:29 +00:00
|
|
|
|
|
|
|
// read the type and the length of the entry
|
2015-12-06 01:44:40 +00:00
|
|
|
n, err := io.ReadFull(r.r, b[:5])
|
2015-11-18 06:08:29 +00:00
|
|
|
if err == io.EOF {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
r.err = err
|
|
|
|
// We return true here because we want the client code to call read which
|
|
|
|
// will return the this error to be handled.
|
|
|
|
return true
|
|
|
|
}
|
2015-12-06 01:44:40 +00:00
|
|
|
nReadOK += n
|
2015-11-18 06:08:29 +00:00
|
|
|
|
|
|
|
entryType := b[0]
|
|
|
|
length := btou32(b[1:5])
|
|
|
|
|
|
|
|
// read the compressed block and decompress it
|
|
|
|
if int(length) > len(b) {
|
|
|
|
b = make([]byte, length)
|
|
|
|
}
|
|
|
|
|
2015-12-06 01:44:40 +00:00
|
|
|
n, err = io.ReadFull(r.r, b[:length])
|
2015-11-18 06:08:29 +00:00
|
|
|
if err != nil {
|
|
|
|
r.err = err
|
|
|
|
return true
|
|
|
|
}
|
2015-12-06 01:44:40 +00:00
|
|
|
nReadOK += n
|
2015-11-18 06:08:29 +00:00
|
|
|
|
2015-11-21 22:50:28 +00:00
|
|
|
data, err := snappy.Decode(nil, b[:length])
|
2015-11-18 06:08:29 +00:00
|
|
|
if err != nil {
|
|
|
|
r.err = err
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
// and marshal it and send it to the cache
|
2015-12-06 23:50:39 +00:00
|
|
|
switch WalEntryType(entryType) {
|
2015-11-18 06:08:29 +00:00
|
|
|
case WriteWALEntryType:
|
2015-11-21 22:50:28 +00:00
|
|
|
r.entry = &WriteWALEntry{
|
|
|
|
Values: map[string][]Value{},
|
|
|
|
}
|
2015-11-18 06:08:29 +00:00
|
|
|
case DeleteWALEntryType:
|
|
|
|
r.entry = &DeleteWALEntry{}
|
|
|
|
default:
|
|
|
|
r.err = fmt.Errorf("unknown wal entry type: %v", entryType)
|
|
|
|
return true
|
|
|
|
}
|
2015-11-19 06:15:44 +00:00
|
|
|
r.err = r.entry.UnmarshalBinary(data)
|
2015-12-06 01:44:40 +00:00
|
|
|
if r.err == nil {
|
|
|
|
// Read and decode of this entry was successful.
|
|
|
|
r.n += int64(nReadOK)
|
|
|
|
}
|
2015-11-18 06:08:29 +00:00
|
|
|
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *WALSegmentReader) Read() (WALEntry, error) {
|
|
|
|
if r.err != nil {
|
|
|
|
return nil, r.err
|
|
|
|
}
|
|
|
|
return r.entry, nil
|
|
|
|
}
|
|
|
|
|
2015-12-06 01:44:40 +00:00
|
|
|
// Count returns the total number of bytes read successfully from the segment, as
|
|
|
|
// of the last call to Read(). The segment is guaranteed to be valid up to and
|
|
|
|
// including this number of bytes.
|
|
|
|
func (r *WALSegmentReader) Count() int64 {
|
|
|
|
return r.n
|
|
|
|
}
|
|
|
|
|
2015-11-18 06:08:29 +00:00
|
|
|
func (r *WALSegmentReader) Error() error {
|
|
|
|
return r.err
|
|
|
|
}
|
|
|
|
|
2015-11-21 22:50:28 +00:00
|
|
|
func (r *WALSegmentReader) Close() error {
|
|
|
|
return r.r.Close()
|
|
|
|
}
|
|
|
|
|
2015-11-18 06:08:29 +00:00
|
|
|
// idFromFileName parses the segment file ID from its name
|
|
|
|
func idFromFileName(name string) (int, error) {
|
|
|
|
parts := strings.Split(filepath.Base(name), ".")
|
|
|
|
if len(parts) != 2 {
|
|
|
|
return 0, fmt.Errorf("file %s has wrong name format to have an id", name)
|
|
|
|
}
|
|
|
|
|
|
|
|
id, err := strconv.ParseUint(parts[0][1:], 10, 32)
|
|
|
|
|
|
|
|
return int(id), err
|
|
|
|
}
|