Simplify WAL to not compact since it doesn't really help the engine anyway

pull/4011/head
Paul Dix 2015-09-04 18:54:37 -07:00
parent e38a204afc
commit a1fb77198b
3 changed files with 255 additions and 800 deletions

View File

@ -30,7 +30,7 @@ const (
// DefaultFlushColdInterval specifies how long after a partition has been cold
// for writes that a full flush and compaction are forced
DefaultFlushColdInterval = 5 * time.Minute
DefaultFlushColdInterval = 5 * time.Second
// DefaultPartitionSizeThreshold specifies when a partition gets to this size in
// memory, we should slow down writes until it gets a chance to compact.
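
For context, a minimal sketch of how a caller might pick up the new defaults; the import path and data directory are assumptions here, and in real use Log.Index must be set to an IndexWriter implementation before Open:

package main

import (
	"log"
	"time"

	"github.com/influxdb/influxdb/tsdb/engine/wal" // assumed package path
)

func main() {
	w := wal.NewLog("/var/lib/influxdb/wal") // hypothetical data directory
	// With compaction gone, a cold partition is flushed to the index wholesale,
	// so the cold interval can safely drop from 5 minutes to 5 seconds.
	w.FlushColdInterval = 5 * time.Second
	// w.Index = myIndexWriter // hypothetical; an IndexWriter is required before Open
	if err := w.Open(); err != nil {
		log.Fatal(err)
	}
	defer w.Close()
}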

View File

@ -26,7 +26,6 @@ import (
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"io"
"log"
"os"
@ -45,9 +44,6 @@ const (
// DefaultSegmentSize of 2MB is the size at which segment files will be rolled over
DefaultSegmentSize = 2 * 1024 * 1024
// PartitionCount is the number of partitions in the WAL
PartitionCount = 5
// FileExtension is the file extension we expect for wal segments
FileExtension = "wal"
@ -114,8 +110,8 @@ type Log struct {
LogOutput io.Writer
logger *log.Logger
mu sync.RWMutex
partitions map[uint8]*Partition
mu sync.RWMutex
partition *Partition
// metaFile is the file that compressed metadata like series and fields are written to
metaFile *os.File
@ -141,12 +137,6 @@ type Log struct {
// PartitionSizeThreshold specifies when a partition should be forced to be flushed.
PartitionSizeThreshold uint64
// partitionCount is the number of separate partitions to create for the WAL.
// Compactions happen per partition. So this number will affect what percentage
// of the WAL gets compacted at a time. For instance, a setting of 10 means
// we generally will be compacting about 10% of the WAL at a time.
partitionCount uint64
// Index is the database that series data gets flushed to once it gets compacted
// out of the WAL.
Index IndexWriter
@ -176,7 +166,6 @@ func NewLog(path string) *Log {
CompactionThreshold: tsdb.DefaultCompactionThreshold,
PartitionSizeThreshold: tsdb.DefaultPartitionSizeThreshold,
ReadySeriesSize: tsdb.DefaultReadySeriesSize,
partitionCount: PartitionCount,
flushCheckInterval: defaultFlushCheckInterval,
logger: log.New(os.Stderr, "[wal] ", log.LstdFlags),
}
@ -198,17 +187,14 @@ func (l *Log) Open() error {
return err
}
// open the partitions
l.partitions = make(map[uint8]*Partition)
for i := uint64(1); i <= l.partitionCount; i++ {
p, err := NewPartition(uint8(i), l.path, l.SegmentSize, l.PartitionSizeThreshold, l.ReadySeriesSize, l.FlushColdInterval, l.Index)
if err != nil {
return err
}
p.log = l
l.partitions[uint8(i)] = p
// open the partition
p, err := NewPartition(uint8(1), l.path, l.SegmentSize, l.PartitionSizeThreshold, l.ReadySeriesSize, l.FlushColdInterval, l.Index)
if err != nil {
return err
}
if err := l.openPartitionFiles(); err != nil {
p.log = l
l.partition = p
if err := l.openPartitionFile(); err != nil {
return err
}
@ -241,7 +227,7 @@ func (l *Log) Cursor(key string, direction tsdb.Direction) tsdb.Cursor {
l.mu.RLock()
defer l.mu.RUnlock()
return l.partition([]byte(key)).cursor(key, direction)
return l.partition.cursor(key, direction)
}
func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error {
@ -252,17 +238,7 @@ func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.Measureme
}
// persist the raw point data
l.mu.RLock()
partitionsToWrite := l.pointsToPartitions(points)
l.mu.RUnlock()
for p, points := range partitionsToWrite {
if err := p.Write(points); err != nil {
return err
}
}
return nil
return l.partition.Write(points)
}
// Flush will force a flush on the partition
@ -270,13 +246,7 @@ func (l *Log) Flush() error {
l.mu.RLock()
defer l.mu.RUnlock()
for _, p := range l.partitions {
if err := p.flushAndCompact(idleFlush); err != nil {
return err
}
}
return nil
return l.partition.flushAndCompact(idleFlush)
}
// LoadMetadataIndex loads the new series and fields files into memory and flushes them to the BoltDB index. This function
@ -347,11 +317,7 @@ func (l *Log) DeleteSeries(keys []string) error {
l.mu.Lock()
defer l.mu.Unlock()
for _, p := range l.partitions {
p.deleteSeries(keys)
}
return nil
return l.partition.deleteSeries(keys)
}
// readMetadataFile will read the entire contents of the meta file and return a slice of the
@ -498,55 +464,41 @@ func (l *Log) metadataFiles() ([]string, error) {
return a, nil
}
// pointsToPartitions returns a map that organizes the points into the partitions they should be mapped to
func (l *Log) pointsToPartitions(points []tsdb.Point) map[*Partition][]tsdb.Point {
m := make(map[*Partition][]tsdb.Point)
for _, p := range points {
pp := l.partition(p.Key())
m[pp] = append(m[pp], p)
}
return m
}
// openPartitionFiles will open all partitions and read their segment files
func (l *Log) openPartitionFiles() error {
results := make(chan error, len(l.partitions))
for _, p := range l.partitions {
go func(p *Partition) {
p.mu.Lock()
defer p.mu.Unlock()
// Recover from a partial compaction.
if err := p.recoverCompactionFile(); err != nil {
results <- fmt.Errorf("recover compaction files: %s", err)
return
}
fileNames, err := p.segmentFileNames()
if err != nil {
results <- err
return
}
for _, n := range fileNames {
entries, err := p.readFile(n)
if err != nil {
results <- err
return
}
for _, e := range entries {
p.addToCache(e.key, e.data, e.timestamp)
}
}
results <- nil
}(p)
// openPartitionFile will open the partition and flush all segment files to the index
func (l *Log) openPartitionFile() error {
// Recover from a partial compaction.
if err := l.partition.recoverCompactionFile(); err != nil {
return fmt.Errorf("recover compaction files: %s", err)
}
for i := 0; i < len(l.partitions); i++ {
err := <-results
fileNames, err := l.partition.segmentFileNames()
if err != nil {
return err
}
if l.LoggingEnabled && len(fileNames) > 0 {
l.logger.Println("reading WAL files to flush to index")
}
for _, n := range fileNames {
entries, err := l.partition.readFile(n)
if err != nil {
return err
}
seriesToFlush := make(map[string][][]byte)
for _, e := range entries {
seriesToFlush[string(e.key)] = append(seriesToFlush[string(e.key)], MarshalEntry(e.timestamp, e.data))
}
if l.LoggingEnabled {
l.logger.Printf("writing %d series from WAL file %s to index\n", len(seriesToFlush), n)
}
if err := l.Index.WriteIndex(seriesToFlush, nil, nil); err != nil {
return err
}
if err := os.Remove(n); err != nil {
return err
}
}
return nil
@ -569,22 +521,19 @@ func (l *Log) Close() error {
l.mu.Lock()
defer l.mu.Unlock()
// clear the cache
// close partition and metafile
if err := l.close(); err != nil {
return err
}
l.partitions = nil
return nil
}
// close all the open Log partitions and file handles
func (l *Log) close() error {
for _, p := range l.partitions {
if err := p.Close(); err != nil {
// log and skip so we can close the other partitions
l.logger.Println("error closing partition:", err)
}
if err := l.partition.Close(); err != nil {
// log and skip so we can still close the meta file
l.logger.Println("error closing partition:", err)
}
if err := l.metaFile.Close(); err != nil {
@ -599,11 +548,10 @@ func (l *Log) close() error {
func (l *Log) triggerAutoFlush() {
l.mu.RLock()
defer l.mu.RUnlock()
for _, p := range l.partitions {
if f := p.shouldFlush(l.MaxSeriesSize, l.CompactionThreshold); f != noFlush {
if err := p.flushAndCompact(f); err != nil {
l.logger.Printf("error flushing partition %d: %s\n", p.id, err)
}
if f := l.partition.shouldFlush(); f != noFlush {
if err := l.partition.flushAndCompact(f); err != nil {
l.logger.Printf("error flushing partition: %s\n", err)
}
}
}
@ -716,23 +664,6 @@ func (l *Log) flushMetadata() error {
return nil
}
// walPartition returns the partition number that key belongs to.
func (l *Log) partition(key []byte) *Partition {
h := fnv.New64a()
h.Write(key)
id := uint8(h.Sum64()%l.partitionCount + 1)
p := l.partitions[id]
if p == nil {
p, err := NewPartition(id, l.path, l.SegmentSize, l.PartitionSizeThreshold, l.ReadySeriesSize, l.FlushColdInterval, l.Index)
if err != nil {
panic(err)
}
p.log = l
l.partitions[id] = p
}
return p
}
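
The removed sharding is small enough to restate as a standalone sketch: series keys were hashed with FNV-64a and mapped onto partition IDs 1 through partitionCount, which is why hash/fnv drops out of the imports above.

package main

import (
	"fmt"
	"hash/fnv"
)

// partitionID mirrors the deleted Log.partition lookup: hash the series key
// with FNV-64a and map it onto one of partitionCount partitions (IDs 1..n).
func partitionID(key []byte, partitionCount uint64) uint8 {
	h := fnv.New64a()
	h.Write(key)
	return uint8(h.Sum64()%partitionCount + 1)
}

func main() {
	// with the old default of 5 partitions, every key lands in 1..5
	fmt.Println(partitionID([]byte("cpu,host=A"), 5))
}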
// Partition is a set of files for a partition of the WAL. Multiple partitions were originally
// used so compactions would only touch a portion of the WAL at a time; the WAL now uses a single partition
type Partition struct {
@ -818,6 +749,7 @@ func (p *Partition) Close() error {
if err := p.currentSegmentFile.Close(); err != nil {
return err
}
p.currentSegmentFile = nil
return nil
}
@ -889,10 +821,8 @@ func (p *Partition) Write(points []tsdb.Point) error {
p.addToCache(pp.Key(), pp.Data(), pp.UnixNano())
}
}
if err := p.currentSegmentFile.Sync(); err != nil {
return err
}
return nil
return p.currentSegmentFile.Sync()
}
// newSegmentFile will close the current segment file and open a new one, updating bookkeeping info on the partition
@ -938,11 +868,10 @@ func (p *Partition) fileIDFromName(name string) (uint32, error) {
return uint32(id), nil
}
// shouldFlush returns a flushType that indicates if a partition should be flushed and why. The criteria are:
// maxSeriesSize - flush if any series in the partition has exceeded this size threshold
// readySeriesSize - a series is ready to flush once it has this much data in it
// compactionThreshold - a partition is ready to flush if this percentage of series has hit the readySeriesSize or greater
func (p *Partition) shouldFlush(maxSeriesSize int, compactionThreshold float64) flushType {
// shouldFlush returns a flushType that indicates whether the partition should be flushed and why:
// it flushes if the partition hasn't received a write within the configured cold interval, or if
// the in-memory cache has grown too large.
func (p *Partition) shouldFlush() flushType {
p.mu.Lock()
defer p.mu.Unlock()
@ -958,25 +887,10 @@ func (p *Partition) shouldFlush(maxSeriesSize int, compactionThreshold float64)
return idleFlush
}
countReady := 0
for _, c := range p.cache {
// if we have a series with the max possible size, shortcut out because we need to flush
if c.size > maxSeriesSize {
return thresholdFlush
} else if c.size > p.readySeriesSize {
countReady += 1
}
}
if float64(countReady)/float64(len(p.cache)) > compactionThreshold {
return thresholdFlush
}
return noFlush
}
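
Pieced together, the surviving checks amount to the sketch below. It lives inside the wal package; lastWriteTime, flushColdInterval, and sizeThreshold are assumed field names, since they sit outside the visible hunks:

// a sketch of the simplified decision, not the literal method body
func (p *Partition) shouldFlushSketch() flushType {
	p.mu.Lock()
	defer p.mu.Unlock()

	// an empty cache never needs a flush
	if len(p.cache) == 0 {
		return noFlush
	}
	// cold for writes longer than the configured interval: flush everything
	if time.Since(p.lastWriteTime) > p.flushColdInterval {
		return idleFlush
	}
	// the in-memory cache outgrew its threshold: flush everything
	if p.memorySize > p.sizeThreshold {
		return memoryFlush
	}
	return noFlush
}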
// prepareSeriesToFlush will empty the cache of series that are ready based on their size
// and return information for the compaction process to use.
// prepareSeriesToFlush will empty the cache of series and return compaction information
func (p *Partition) prepareSeriesToFlush(readySeriesSize int, flush flushType) (*compactionInfo, error) {
p.mu.Lock()
defer p.mu.Unlock()
@ -990,36 +904,15 @@ func (p *Partition) prepareSeriesToFlush(readySeriesSize int, flush flushType) (
}
p.compactionRunning = true
// we've been ordered to flush and compact. iterate until we have at least
// some series to flush by cutting the ready size in half each iteration
// if we didn't come up with any
var seriesToFlush map[string][][]byte
var size int
// if this flush is being triggered because the partition is idle, all series hit the threshold
if flush == idleFlush {
for _, c := range p.cache {
size += c.size
}
seriesToFlush = make(map[string][][]byte)
for k, c := range p.cache {
seriesToFlush[k] = c.points
}
p.cache = make(map[string]*cacheEntry)
} else {
// only grab the series that hit the thresold. loop until we have series to flush
for {
s, n := p.seriesToFlush(readySeriesSize)
if len(s) > 0 {
seriesToFlush = s
size += n
break
}
// we didn't get any series to flush so cut the ready size in half
// and see if there are series that are ready at that level
readySeriesSize = readySeriesSize / 2
}
for _, c := range p.cache {
size += c.size
}
seriesToFlush := make(map[string][][]byte)
for k, c := range p.cache {
seriesToFlush[k] = c.points
}
p.cache = make(map[string]*cacheEntry)
c := &compactionInfo{seriesToFlush: seriesToFlush, flushSize: size}
@ -1047,29 +940,6 @@ func (p *Partition) prepareSeriesToFlush(readySeriesSize int, flush flushType) (
return c, nil
}
// seriesToFlush will clear the cache of series over the give threshold and return
// them in a new map along with their combined size
func (p *Partition) seriesToFlush(readySeriesSize int) (map[string][][]byte, int) {
seriesToFlush := make(map[string][][]byte)
size := 0
for k, c := range p.cache {
// if the series is over the threshold, save it in the map to flush later
if c.size >= readySeriesSize {
size += c.size
seriesToFlush[k] = c.points
// always hand the index data that is sorted
if c.isDirtySort {
sort.Sort(tsdb.ByteSlices(seriesToFlush[k]))
}
delete(p.cache, k)
}
}
return seriesToFlush, size
}
// flushAndCompact will flush the entire in-memory cache to the index and then remove
// the old segment files that held the flushed data
func (p *Partition) flushAndCompact(flush flushType) error {
@ -1083,35 +953,29 @@ func (p *Partition) flushAndCompact(flush flushType) error {
return nil
}
// Lock before flushing to ensure timing is not spent waiting for lock.
func() {
p.log.flushLock.Lock()
defer p.log.flushLock.Unlock()
if p.log.LoggingEnabled {
ftype := "idle"
if flush == thresholdFlush {
ftype = "threshold"
} else if flush == memoryFlush {
ftype = "memory"
}
pointCount := 0
for _, a := range c.seriesToFlush {
pointCount += len(a)
}
p.log.logger.Printf("Flush due to %s. Flushing %d series with %d points and %d bytes from partition %d. Compacting %d series\n", ftype, len(c.seriesToFlush), pointCount, c.flushSize, p.id, c.countCompacting)
}
startTime := time.Now()
if p.log.LoggingEnabled {
ftype := "idle"
if flush == thresholdFlush {
ftype = "threshold"
} else if flush == memoryFlush {
ftype = "memory"
}
pointCount := 0
for _, a := range c.seriesToFlush {
pointCount += len(a)
}
p.log.logger.Printf("Flush due to %s. Flushing %d series with %d points and %d bytes from partition %d. Compacting %d series\n", ftype, len(c.seriesToFlush), pointCount, c.flushSize, p.id, c.countCompacting)
}
// write the data to the index first
if err := p.index.WriteIndex(c.seriesToFlush, nil, nil); err != nil {
// if we can't write the index, we should just bring down the server hard
panic(fmt.Sprintf("error writing the wal to the index: %s", err.Error()))
}
if p.log.LoggingEnabled {
p.log.logger.Printf("write to index of partition %d took %s\n", p.id, time.Since(startTime))
}
}()
startTime := time.Now()
// write the data to the index first
if err := p.index.WriteIndex(c.seriesToFlush, nil, nil); err != nil {
// if we can't write the index, we should just bring down the server hard
panic(fmt.Sprintf("error writing the wal to the index: %s", err.Error()))
}
if p.log.LoggingEnabled {
p.log.logger.Printf("write to index of partition %d took %s\n", p.id, time.Since(startTime))
}
// clear the flush cache and reset the memory thresholds
p.mu.Lock()
@ -1126,28 +990,18 @@ func (p *Partition) flushAndCompact(flush flushType) error {
p.mu.Unlock()
}()
startTime := time.Now()
err = p.compactFiles(c, flush)
if p.log.LoggingEnabled {
p.log.logger.Printf("compaction of partition %d took %s\n", p.id, time.Since(startTime))
}
return err
return p.removeOldSegmentFiles(c)
}
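
Condensed, the post-commit flush path is three steps: snapshot and clear the cache, write the snapshot to the index, then drop the old segment files. A sketch using the helpers shown in this diff, with locking, logging, and the flush-cache bookkeeping elided:

// a condensed sketch of the flush path, inside the wal package
func (p *Partition) flushSketch(flush flushType) error {
	// snapshot the entire cache and clear it under lock
	c, err := p.prepareSeriesToFlush(p.readySeriesSize, flush)
	if err != nil {
		return err
	} else if c == nil {
		return nil // nothing to flush, or a flush is already running
	}
	// the index write comes first: the data must be durable there before
	// the WAL segments backing it can be deleted
	if err := p.index.WriteIndex(c.seriesToFlush, nil, nil); err != nil {
		return err
	}
	// everything in the old segment files is now in the index
	return p.removeOldSegmentFiles(c)
}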
func (p *Partition) compactFiles(c *compactionInfo, flush flushType) error {
// removeOldSegmentFiles will delete all files that have been flushed to the
// index based on the information in the compaction info.
func (p *Partition) removeOldSegmentFiles(c *compactionInfo) error {
// collect the old segment files so the flushed ones can be removed
fileNames, err := p.segmentFileNames()
if err != nil {
return err
}
// all compacted data from the segments will go into this file
compactionFile, err := p.os.OpenCompactionFile(p.compactionFileName(), os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return err
}
for _, n := range fileNames {
id, err := p.idFromFileName(n)
if err != nil {
@ -1159,98 +1013,11 @@ func (p *Partition) compactFiles(c *compactionInfo, flush flushType) error {
break
}
f, err := p.os.OpenSegmentFile(n, os.O_RDONLY, 0666)
if err != nil {
return err
}
sf := newSegment(f, p.log.logger)
var entries []*entry
for {
name, a, err := sf.readCompressedBlock()
if name != "" {
continue // skip name blocks
} else if err != nil {
return err
} else if a == nil {
break
}
// only compact the entries from series that haven't been flushed
for _, e := range a {
if _, ok := c.seriesToFlush[string(e.key)]; !ok {
entries = append(entries, e)
}
}
}
if err := p.writeCompactionEntry(compactionFile, f.Name(), entries); err != nil {
return err
}
// now close and delete the file
if err := f.Close(); err != nil {
return err
}
if err := os.Remove(n); err != nil {
return err
}
}
// close the compaction file and rename it so that it will appear as the very first segment
if err := compactionFile.Close(); err != nil {
return err
}
// if it's an idle flush remove the compaction file
if flush == idleFlush {
return os.Remove(compactionFile.Name())
}
return p.os.Rename(compactionFile.Name(), p.fileNameForSegment(1))
}
// writeCompactionEntry will write a marker for the beginning of the file we're compacting, a compressed block
// for all entries, then a marker for the end of the file
func (p *Partition) writeCompactionEntry(f *os.File, filename string, entries []*entry) error {
if err := p.writeCompactionFileName(f, filename); err != nil {
return err
}
var block bytes.Buffer
for _, e := range entries {
marshalWALEntry(&block, e.key, e.timestamp, e.data)
}
b := snappy.Encode(nil, block.Bytes())
if _, err := f.Write(u64tob(uint64(len(b)))); err != nil {
return err
}
if _, err := f.Write(b); err != nil {
return err
}
return f.Sync()
}
// writeCompactionFileName will write a compaction log length entry and the name of the file that is compacted
func (p *Partition) writeCompactionFileName(f *os.File, filename string) error {
length := u64tob(uint64(len([]byte(filename))))
// the beginning of the length has two bytes to indicate that this is a compaction log entry
length[0] = 0xFF
length[1] = 0xFF
if _, err := f.Write(length); err != nil {
return err
}
if _, err := f.Write([]byte(filename)); err != nil {
return err
}
return nil
}
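
For anyone still holding pre-upgrade WAL directories, the retired record format is worth spelling out: a file-name record is an 8-byte length whose top two bytes are overwritten with 0xFF as a marker, followed by the name and then a length-prefixed snappy block of entries. A sketch of how a reader would tell the two record types apart, assuming u64tob is big-endian as the rest of the package implies:

package main

import (
	"encoding/binary"
	"fmt"
)

// isFileNameRecord reports whether an 8-byte length prefix carries the
// 0xFF 0xFF marker that writeCompactionFileName stamped on it.
func isFileNameRecord(length [8]byte) bool {
	return length[0] == 0xFF && length[1] == 0xFF
}

// fileNameLength recovers the name length by zeroing the marker bytes,
// the inverse of how the writer overwrote them.
func fileNameLength(length [8]byte) uint64 {
	length[0], length[1] = 0, 0
	return binary.BigEndian.Uint64(length[:])
}

func main() {
	rec := [8]byte{0xFF, 0xFF, 0, 0, 0, 0, 0, 13} // a 13-byte file name follows
	fmt.Println(isFileNameRecord(rec), fileNameLength(rec)) // true 13
}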
@ -1329,11 +1096,6 @@ func (p *Partition) recoverCompactionFile() error {
// readFile will read a segment file and return the entries it contains
func (p *Partition) readFile(path string) (entries []*entry, err error) {
id, err := p.fileIDFromName(path)
if err != nil {
return nil, err
}
f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return nil, err
@ -1354,15 +1116,8 @@ func (p *Partition) readFile(path string) (entries []*entry, err error) {
entries = append(entries, a...)
}
// if this is the highest segment file, it'll be the one we use, otherwise close it out now that we're done reading
if id > p.currentSegmentID {
p.currentSegmentID = id
p.currentSegmentFile = f
p.currentSegmentSize = sf.size
} else {
if err := f.Close(); err != nil {
return nil, err
}
if err := f.Close(); err != nil {
return nil, err
}
return
}
@ -1441,7 +1196,7 @@ func (p *Partition) idFromFileName(name string) (uint32, error) {
// segmentFileNames returns all the segment file names for the partition
func (p *Partition) segmentFileNames() ([]string, error) {
path := filepath.Join(p.path, fmt.Sprintf("%02d.*.%s", p.id, FileExtension))
path := filepath.Join(p.path, fmt.Sprintf("*.%s", FileExtension))
return filepath.Glob(path)
}
@ -1449,32 +1204,12 @@ func (p *Partition) segmentFileNames() ([]string, error) {
// from any of the series passed in.
func (p *Partition) deleteSeries(keys []string) error {
p.mu.Lock()
defer p.mu.Unlock()
p.compactionRunning = true
// remove the series from the cache and prepare the compaction info
size := 0
seriesToFlush := make(map[string][][]byte)
for _, k := range keys {
entry := p.cache[k]
if entry != nil {
seriesToFlush[k] = entry.points
size += entry.size
delete(p.cache, k)
}
delete(p.cache, k)
}
p.memorySize -= uint64(size)
p.mu.Unlock()
c := &compactionInfo{seriesToFlush: seriesToFlush, flushSize: size}
// roll over a new segment file so we can compact all the old ones
if err := p.newSegmentFile(); err != nil {
return err
}
c.compactFilesLessThan = p.currentSegmentID
return p.compactFiles(c, deleteFlush)
return p.flushAndCompact(deleteFlush)
}
// compactionInfo is a data object with information about a compaction running

View File

@ -3,19 +3,14 @@ package wal
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"reflect"
"testing"
"time"
// "runtime"
// "sync"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/tsdb"
)
@ -45,39 +40,50 @@ func TestWAL_WritePoints(t *testing.T) {
t.Fatalf("failed to write points: %s", err.Error())
}
verify := func() {
c := log.Cursor("cpu,host=A", tsdb.Forward)
k, v := c.Seek(inttob(1))
c := log.Cursor("cpu,host=A", tsdb.Forward)
k, v := c.Seek(inttob(1))
// ensure the series are there and points are in order
if bytes.Compare(v, p1.Data()) != 0 {
t.Fatalf("expected to seek to first point but got key and value: %v %v", k, v)
}
k, v = c.Next()
if bytes.Compare(v, p2.Data()) != 0 {
t.Fatalf("expected to seek to first point but got key and value: %v %v", k, v)
}
k, v = c.Next()
if k != nil {
t.Fatalf("expected nil on last seek: %v %v", k, v)
}
c = log.Cursor("cpu,host=B", tsdb.Forward)
k, v = c.Next()
if bytes.Compare(v, p3.Data()) != 0 {
t.Fatalf("expected to seek to first point but got key and value: %v %v", k, v)
}
// ensure the series are there and points are in order
if bytes.Compare(v, p1.Data()) != 0 {
t.Fatalf("expected to seek to first point but got key and value: %v %v", k, v)
}
verify()
k, v = c.Next()
if bytes.Compare(v, p2.Data()) != 0 {
t.Fatalf("expected to seek to first point but got key and value: %v %v", k, v)
}
// ensure that we can close and re-open the log with points still there
k, v = c.Next()
if k != nil {
t.Fatalf("expected nil on last seek: %v %v", k, v)
}
c = log.Cursor("cpu,host=B", tsdb.Forward)
k, v = c.Next()
if bytes.Compare(v, p3.Data()) != 0 {
t.Fatalf("expected to seek to first point but got key and value: %v %v", k, v)
}
// ensure that we can close and re-open the log with points getting to the index
log.Close()
log.Open()
verify()
points := make([]map[string][][]byte, 0)
log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
points = append(points, pointsByKey)
return nil
}}
if err := log.Open(); err != nil {
t.Fatal("error opening log", err)
}
p := points[0]
if len(p["cpu,host=A"]) != 2 {
t.Fatal("expected two points for cpu,host=A flushed to index")
}
if len(p["cpu,host=B"]) != 1 {
t.Fatal("expected one point for cpu,host=B flushed to index")
}
// ensure we can write new points into the series
p4 := parsePoint("cpu,host=A value=1.0 7", codec)
@ -91,38 +97,38 @@ func TestWAL_WritePoints(t *testing.T) {
t.Fatalf("failed to write points: %s", err.Error())
}
verify2 := func() {
c := log.Cursor("cpu,host=A", tsdb.Forward)
k, v := c.Next()
if bytes.Compare(v, p1.Data()) != 0 {
t.Fatalf("order wrong, expected p1, %v %v %v", v, k, p1.Data())
}
_, v = c.Next()
if bytes.Compare(v, p6.Data()) != 0 {
t.Fatal("order wrong, expected p6")
}
_, v = c.Next()
if bytes.Compare(v, p2.Data()) != 0 {
t.Fatal("order wrong, expected p6")
}
_, v = c.Next()
if bytes.Compare(v, p4.Data()) != 0 {
t.Fatal("order wrong, expected p6")
}
c = log.Cursor("cpu,host=C", tsdb.Forward)
_, v = c.Next()
if bytes.Compare(v, p5.Data()) != 0 {
t.Fatal("order wrong, expected p6")
}
c = log.Cursor("cpu,host=A", tsdb.Forward)
k, v = c.Next()
if bytes.Compare(v, p6.Data()) != 0 {
t.Fatal("order wrong, expected p6")
}
_, v = c.Next()
if bytes.Compare(v, p4.Data()) != 0 {
t.Fatal("order wrong, expected p6")
}
verify2()
c = log.Cursor("cpu,host=C", tsdb.Forward)
_, v = c.Next()
if bytes.Compare(v, p5.Data()) != 0 {
t.Fatal("order wrong, expected p6")
}
log.Close()
log.Open()
if err := log.Close(); err != nil {
t.Fatal("error closing log", err)
}
verify2()
points = make([]map[string][][]byte, 0)
if err := log.Open(); err != nil {
t.Fatal("error opening log", err)
}
p = points[0]
if len(p["cpu,host=A"]) != 2 {
t.Fatal("expected two points for cpu,host=A flushed to index")
}
if len(p["cpu,host=C"]) != 1 {
t.Fatal("expected one point for cpu,host=B flushed to index")
}
}
func TestWAL_CorruptDataLengthSize(t *testing.T) {
@ -149,32 +155,38 @@ func TestWAL_CorruptDataLengthSize(t *testing.T) {
t.Fatalf("failed to write points: %s", err.Error())
}
verify := func() {
c := log.Cursor("cpu,host=A", tsdb.Forward)
_, v := c.Next()
if bytes.Compare(v, p1.Data()) != 0 {
t.Fatal("p1 value wrong")
}
_, v = c.Next()
if bytes.Compare(v, p2.Data()) != 0 {
t.Fatal("p2 value wrong")
}
_, v = c.Next()
if v != nil {
t.Fatal("expected cursor to return nil")
}
c := log.Cursor("cpu,host=A", tsdb.Forward)
_, v := c.Next()
if bytes.Compare(v, p1.Data()) != 0 {
t.Fatal("p1 value wrong")
}
_, v = c.Next()
if bytes.Compare(v, p2.Data()) != 0 {
t.Fatal("p2 value wrong")
}
_, v = c.Next()
if v != nil {
t.Fatal("expected cursor to return nil")
}
verify()
// now write junk data and ensure that we can close, re-open and read
f := log.partitions[1].currentSegmentFile
f := log.partition.currentSegmentFile
f.Write([]byte{0x23, 0x12})
f.Sync()
log.Close()
points := make([]map[string][][]byte, 0)
log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
points = append(points, pointsByKey)
return nil
}}
log.Open()
verify()
p := points[0]
if len(p["cpu,host=A"]) != 2 {
t.Fatal("expected two points for cpu,host=A")
}
// now write new data and ensure it's all good
p3 := parsePoint("cpu,host=A value=29.2 6", codec)
@ -182,26 +194,20 @@ func TestWAL_CorruptDataLengthSize(t *testing.T) {
t.Fatalf("failed to write point: %s", err.Error())
}
verify = func() {
c := log.Cursor("cpu,host=A", tsdb.Forward)
_, v := c.Next()
if bytes.Compare(v, p1.Data()) != 0 {
t.Fatal("p1 value wrong")
}
_, v = c.Next()
if bytes.Compare(v, p2.Data()) != 0 {
t.Fatal("p2 value wrong")
}
_, v = c.Next()
if bytes.Compare(v, p3.Data()) != 0 {
t.Fatal("p3 value wrong")
}
c = log.Cursor("cpu,host=A", tsdb.Forward)
_, v = c.Next()
if bytes.Compare(v, p3.Data()) != 0 {
t.Fatal("p3 value wrong")
}
verify()
log.Close()
points = make([]map[string][][]byte, 0)
log.Open()
verify()
p = points[0]
if len(p["cpu,host=A"]) != 1 {
t.Fatal("expected two points for cpu,host=A")
}
}
func TestWAL_CorruptDataBlock(t *testing.T) {
@ -228,27 +234,23 @@ func TestWAL_CorruptDataBlock(t *testing.T) {
t.Fatalf("failed to write points: %s", err.Error())
}
verify := func() {
c := log.Cursor("cpu,host=A", tsdb.Forward)
_, v := c.Next()
if bytes.Compare(v, p1.Data()) != 0 {
t.Fatal("p1 value wrong")
}
_, v = c.Next()
if bytes.Compare(v, p2.Data()) != 0 {
t.Fatal("p2 value wrong")
}
_, v = c.Next()
if v != nil {
t.Fatal("expected cursor to return nil")
}
c := log.Cursor("cpu,host=A", tsdb.Forward)
_, v := c.Next()
if bytes.Compare(v, p1.Data()) != 0 {
t.Fatal("p1 value wrong")
}
_, v = c.Next()
if bytes.Compare(v, p2.Data()) != 0 {
t.Fatal("p2 value wrong")
}
_, v = c.Next()
if v != nil {
t.Fatal("expected cursor to return nil")
}
verify()
// now write junk data and ensure that we can close, re-open and read
f := log.partitions[1].currentSegmentFile
f := log.partition.currentSegmentFile
f.Write(u64tob(23))
// now write a bunch of garbage
for i := 0; i < 1000; i++ {
@ -257,51 +259,6 @@ func TestWAL_CorruptDataBlock(t *testing.T) {
f.Sync()
log.Close()
log.Open()
verify()
// now write new data and ensure it's all good
p3 := parsePoint("cpu,host=A value=29.2 6", codec)
if err := log.WritePoints([]tsdb.Point{p3}, nil, nil); err != nil {
t.Fatalf("failed to write point: %s", err.Error())
}
verify = func() {
c := log.Cursor("cpu,host=A", tsdb.Forward)
_, v := c.Next()
if bytes.Compare(v, p1.Data()) != 0 {
t.Fatal("p1 value wrong")
}
_, v = c.Next()
if bytes.Compare(v, p2.Data()) != 0 {
t.Fatal("p2 value wrong")
}
_, v = c.Next()
if bytes.Compare(v, p3.Data()) != 0 {
t.Fatal("p3 value wrong", p3.Data(), v)
}
}
verify()
log.Close()
log.Open()
verify()
}
// Ensure the wal flushes and compacts after a partition has enough series in
// it with enough data to flush
func TestWAL_CompactAfterPercentageThreshold(t *testing.T) {
log := openTestWAL()
log.partitionCount = 2
log.CompactionThreshold = 0.7
log.ReadySeriesSize = 1024
// set this high so that a flush doesn't automatically kick in and mess up our test
log.flushCheckInterval = time.Minute
defer log.Close()
defer os.RemoveAll(log.path)
points := make([]map[string][][]byte, 0)
log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
@ -309,99 +266,38 @@ func TestWAL_CompactAfterPercentageThreshold(t *testing.T) {
return nil
}}
if err := log.Open(); err != nil {
t.Fatalf("couldn't open wal: %s", err.Error())
}
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {
ID: uint8(1),
Name: "value",
Type: influxql.Float,
},
})
numSeries := 100
b := make([]byte, 70*5000)
for i := 1; i <= 100; i++ {
buf := bytes.NewBuffer(b)
for j := 1; j <= numSeries; j++ {
buf.WriteString(fmt.Sprintf("cpu,host=A,region=uswest%d value=%.3f %d\n", j, rand.Float64(), i))
}
// ensure that before we go over the threshold it isn't marked for flushing
if i < 50 {
// interleave data for some series that won't be ready to flush
buf.WriteString(fmt.Sprintf("cpu,host=A,region=useast1 value=%.3f %d\n", rand.Float64(), i))
buf.WriteString(fmt.Sprintf("cpu,host=A,region=useast3 value=%.3f %d\n", rand.Float64(), i))
// ensure that as a whole its not ready for flushing yet
if log.partitions[1].shouldFlush(tsdb.DefaultMaxSeriesSize, tsdb.DefaultCompactionThreshold) != noFlush {
t.Fatal("expected partition 1 to return false from shouldFlush")
}
}
// write the batch out
if err := log.WritePoints(parsePoints(buf.String(), codec), nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
buf = bytes.NewBuffer(b)
}
// ensure we have some data
c := log.Cursor("cpu,host=A,region=uswest23", tsdb.Forward)
k, v := c.Next()
if btou64(k) != 1 {
t.Fatalf("expected timestamp of 1, but got %v %v", k, v)
}
// ensure it is marked as should flush because of the threshold
if log.partitions[1].shouldFlush(tsdb.DefaultMaxSeriesSize, tsdb.DefaultCompactionThreshold) != thresholdFlush {
t.Fatal("expected partition 1 to return true from shouldFlush")
}
if err := log.partitions[1].flushAndCompact(thresholdFlush); err != nil {
t.Fatalf("error flushing and compacting: %s", err.Error())
}
// should be nil
c = log.Cursor("cpu,host=A,region=uswest23", tsdb.Forward)
k, v = c.Next()
if k != nil || v != nil {
t.Fatal("expected cache to be nil after flush: ", k, v)
}
c = log.Cursor("cpu,host=A,region=useast1", tsdb.Forward)
k, v = c.Next()
if btou64(k) != 1 {
t.Fatal("expected cache to be there after flush and compact: ", k, v)
}
if len(points) == 0 {
t.Fatal("expected points to be flushed to index")
}
// now close and re-open the wal and ensure the compacted data is gone and other data is still there
log.Close()
log.Open()
c = log.Cursor("cpu,host=A,region=uswest23", tsdb.Forward)
k, v = c.Next()
if k != nil || v != nil {
t.Fatal("expected cache to be nil after flush and re-open: ", k, v)
p := points[0]
if len(p["cpu,host=A"]) != 2 {
t.Fatal("expected two points for cpu,host=A")
}
c = log.Cursor("cpu,host=A,region=useast1", tsdb.Forward)
k, v = c.Next()
if btou64(k) != 1 {
t.Fatal("expected cache to be there after flush and compact: ", k, v)
// now write new data and ensure it's all good
p3 := parsePoint("cpu,host=A value=29.2 6", codec)
if err := log.WritePoints([]tsdb.Point{p3}, nil, nil); err != nil {
t.Fatalf("failed to write point: %s", err.Error())
}
c = log.Cursor("cpu,host=A", tsdb.Forward)
_, v = c.Next()
if bytes.Compare(v, p3.Data()) != 0 {
t.Fatal("p3 value wrong", p3.Data(), v)
}
log.Close()
points = make([]map[string][][]byte, 0)
log.Open()
p = points[0]
if len(p["cpu,host=A"]) != 1 {
t.Fatal("expected two points for cpu,host=A")
}
}
// Ensure the wal forces a full flush after not having a write in a given interval of time
func TestWAL_CompactAfterTimeWithoutWrite(t *testing.T) {
log := openTestWAL()
log.partitionCount = 1
// set this low
log.flushCheckInterval = 10 * time.Millisecond
@ -453,16 +349,16 @@ func TestWAL_CompactAfterTimeWithoutWrite(t *testing.T) {
time.Sleep(700 * time.Millisecond)
// ensure that as a whole its not ready for flushing yet
if f := log.partitions[1].shouldFlush(tsdb.DefaultMaxSeriesSize, tsdb.DefaultCompactionThreshold); f != noFlush {
if f := log.partition.shouldFlush(); f != noFlush {
t.Fatalf("expected partition 1 to return noFlush from shouldFlush %v", f)
}
// ensure that the partition is empty
if log.partitions[1].memorySize != 0 || len(log.partitions[1].cache) != 0 {
if log.partition.memorySize != 0 || len(log.partition.cache) != 0 {
t.Fatal("expected partition to be empty")
}
// ensure that we didn't bother to open a new segment file
if log.partitions[1].currentSegmentFile != nil {
if log.partition.currentSegmentFile != nil {
t.Fatal("expected partition to not have an open segment file")
}
}
@ -592,10 +488,6 @@ func TestWAL_DeleteSeries(t *testing.T) {
defer log.Close()
defer os.RemoveAll(log.path)
if err := log.Open(); err != nil {
t.Fatalf("couldn't open wal: %s", err.Error())
}
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {
ID: uint8(1),
@ -605,7 +497,9 @@ func TestWAL_DeleteSeries(t *testing.T) {
})
var seriesToIndex []*tsdb.SeriesCreate
var points map[string][][]byte
log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
points = pointsByKey
seriesToIndex = append(seriesToIndex, seriesToCreate...)
return nil
}}
@ -615,6 +509,10 @@ func TestWAL_DeleteSeries(t *testing.T) {
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "B"})), map[string]string{"host": "B"})},
}
if err := log.Open(); err != nil {
t.Fatalf("couldn't open wal: %s", err.Error())
}
// test that we can write to two different series
p1 := parsePoint("cpu,host=A value=23.2 1", codec)
p2 := parsePoint("cpu,host=B value=0.9 2", codec)
@ -641,9 +539,15 @@ func TestWAL_DeleteSeries(t *testing.T) {
}
// ensure data is there
if len(points["cpu,host=A"]) != 2 {
t.Fatal("expected cpu,host=A to be flushed to the index")
}
if len(points["cpu,host=B"]) != 0 {
t.Fatal("expected cpu,host=B to have no points in index")
}
c = log.Cursor("cpu,host=A", tsdb.Forward)
if k, _ := c.Next(); btou64(k) != 1 {
t.Fatal("expected data point for cpu,host=A")
if k, _ := c.Next(); k != nil {
t.Fatal("expected data to be out of the cache cpu,host=A")
}
// ensure series is deleted
@ -670,111 +574,19 @@ func TestWAL_DeleteSeries(t *testing.T) {
t.Fatalf("error closing log: %s", err.Error())
}
points = make(map[string][][]byte)
if err := log.Open(); err != nil {
t.Fatalf("error opening log: %s", err.Error())
}
// ensure data is there
c = log.Cursor("cpu,host=A", tsdb.Forward)
if k, _ := c.Next(); btou64(k) != 1 {
t.Fatal("expected data point for cpu,host=A")
}
// ensure series is deleted
c = log.Cursor("cpu,host=B", tsdb.Forward)
if k, _ := c.Next(); k != nil {
t.Fatal("expected no data for cpu,host=B")
}
}
// Ensure a partial compaction can be recovered from.
func TestWAL_Compact_Recovery(t *testing.T) {
log := openTestWAL()
log.partitionCount = 1
log.CompactionThreshold = 0.7
log.ReadySeriesSize = 1024
log.flushCheckInterval = time.Minute
defer log.Close()
defer os.RemoveAll(log.path)
points := make([]map[string][][]byte, 0)
log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
points = append(points, pointsByKey)
return nil
}}
if err := log.Open(); err != nil {
t.Fatalf("couldn't open wal: %s", err.Error())
}
// Retrieve partition.
p := log.partitions[1]
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {
ID: uint8(1),
Name: "value",
Type: influxql.Float,
},
})
b := make([]byte, 70*5000)
for i := 1; i <= 100; i++ {
buf := bytes.NewBuffer(b)
for j := 1; j <= 1000; j++ {
buf.WriteString(fmt.Sprintf("cpu,host=A,region=uswest%d value=%.3f %d\n", j, rand.Float64(), i))
}
buf.WriteString(fmt.Sprintf("cpu,host=A,region=uswest%d value=%.3f %d\n", rand.Int(), rand.Float64(), i))
// Write the batch out.
if err := log.WritePoints(parsePoints(buf.String(), codec), nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
}
// Mock second open call to fail.
p.os.OpenSegmentFile = func(name string, flag int, perm os.FileMode) (file *os.File, err error) {
if filepath.Base(name) == "01.000001.wal" {
return os.OpenFile(name, flag, perm)
}
return nil, errors.New("marker")
}
if err := p.flushAndCompact(thresholdFlush); err == nil || err.Error() != "marker" {
t.Fatalf("unexpected flush error: %s", err)
}
p.os.OpenSegmentFile = os.OpenFile
// Append second file to simulate partial write.
func() {
f, err := os.OpenFile(p.compactionFileName(), os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
t.Fatal(err)
}
defer f.Close()
// Append filename and partial data.
if err := p.writeCompactionEntry(f, "01.000002.wal", []*entry{{key: []byte("foo"), data: []byte("bar"), timestamp: 100}}); err != nil {
t.Fatal(err)
}
// Truncate by a few bytes.
if fi, err := f.Stat(); err != nil {
t.Fatal(err)
} else if err = f.Truncate(fi.Size() - 2); err != nil {
t.Fatal(err)
}
}()
// Now close and re-open the wal and ensure there are no errors.
log.Close()
if err := log.Open(); err != nil {
t.Fatalf("unexpected open error: %s", err)
// ensure data wasn't flushed on open
if len(points) != 0 {
t.Fatal("expected no data to be flushed on open")
}
}
func TestWAL_QueryDuringCompaction(t *testing.T) {
log := openTestWAL()
log.partitionCount = 1
defer log.Close()
defer os.RemoveAll(log.path)
@ -915,98 +727,6 @@ func TestWAL_Cursor_Reverse(t *testing.T) {
}
}
// test that partitions get compacted and flushed when number of series hits compaction threshold
// test that partitions get compacted and flushed when a single series hits the compaction threshold
// test that writes slow down when the partition size threshold is hit
// func TestWAL_MultipleSegments(t *testing.T) {
// runtime.GOMAXPROCS(8)
// log := openTestWAL()
// defer log.Close()
// defer os.RemoveAll(log.path)
// log.PartitionSizeThreshold = 1024 * 1024 * 100
// flushCount := 0
// log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte) error {
// flushCount += 1
// fmt.Println("FLUSH: ", len(pointsByKey))
// return nil
// }}
// if err := log.Open(); err != nil {
// t.Fatalf("couldn't open wal: ", err.Error())
// }
// codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
// "value": {
// ID: uint8(1),
// Name: "value",
// Type: influxql.Float,
// },
// })
// startTime := time.Now()
// numSeries := 5000
// perPost := 5000
// b := make([]byte, 70*5000)
// totalPoints := 0
// for i := 1; i <= 10000; i++ {
// fmt.Println("WRITING: ", i*numSeries)
// n := 0
// buf := bytes.NewBuffer(b)
// var wg sync.WaitGroup
// for j := 1; j <= numSeries; j++ {
// totalPoints += 1
// n += 1
// buf.WriteString(fmt.Sprintf("cpu,host=A,region=uswest%d value=%.3f %d\n", j, rand.Float64(), i))
// if n >= perPost {
// go func(b string) {
// wg.Add(1)
// if err := log.WritePoints(parsePoints(b, codec)); err != nil {
// t.Fatalf("failed to write points: %s", err.Error())
// }
// wg.Done()
// }(buf.String())
// buf = bytes.NewBuffer(b)
// n = 0
// }
// }
// wg.Wait()
// }
// fmt.Println("PATH: ", log.path)
// dur := time.Now().Sub(startTime)
// fmt.Println("TIME TO WRITE: ", totalPoints, dur, float64(totalPoints)/dur.Seconds())
// fmt.Println("FLUSH COUNT: ", flushCount)
// for _, p := range log.partitions {
// fmt.Println("SIZE: ", p.memorySize/1024/1024)
// }
// max := 0
// for _, p := range log.partitions {
// for k, s := range p.cacheSizes {
// if s > max {
// fmt.Println(k, s)
// max = s
// }
// }
// }
// fmt.Println("CLOSING")
// log.Close()
// fmt.Println("TEST OPENING")
// startTime = time.Now()
// log.Open()
// fmt.Println("TIME TO OPEN: ", time.Now().Sub(startTime))
// for _, p := range log.partitions {
// fmt.Println("SIZE: ", p.memorySize)
// }
// c := log.Cursor("cpu,host=A,region=uswest10")
// k, v := c.Seek(inttob(23))
// fmt.Println("VALS: ", k, v)
// time.Sleep(time.Minute)
// }
type testIndexWriter struct {
fn func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
}