Merge pull request #4900 from influxdb/jw-compact

WAL segment compaction
pull/4902/head
Jason Wilder 2015-11-24 21:35:13 -07:00
commit d931f5dd22
12 changed files with 1940 additions and 106 deletions


@ -101,6 +101,17 @@ func u64tob(v uint64) []byte {
return b
}
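// btou32 converts a 4-byte slice into a uint32.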
func btou32(b []byte) uint32 {
return binary.BigEndian.Uint32(b)
}
// u32tob converts a uint32 into a 4-byte slice.
func u32tob(v uint32) []byte {
b := make([]byte, 4)
binary.BigEndian.PutUint32(b, v)
return b
}
// ShardIDs is a collection of uint64 values that represent shard IDs.
type ShardIDs []uint64
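A quick round-trip sanity check of the new helpers; a minimal standalone sketch using only the standard library:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// u32tob: encode 42 as 4 big-endian bytes.
	b := make([]byte, 4)
	binary.BigEndian.PutUint32(b, 42)
	fmt.Printf("% x\n", b) // 00 00 00 2a

	// btou32: decode the 4 bytes back to a uint32.
	fmt.Println(binary.BigEndian.Uint32(b)) // 42
}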


@ -16,7 +16,8 @@ Displays detailed information about InfluxDB data files.
println(`Commands:
info - displays series meta-data for all shards. Default location [$HOME/.influxdb]
dumptsm - dumps low-level details about tsm1 files.`)
dumptsm - dumps low-level details about tsm1 files.
dumptsmdev - dumps low-level details about tsm1dev files.`)
println()
}
@ -80,6 +81,38 @@ func main() {
opts.dumpBlocks = opts.dumpBlocks || dumpAll || opts.filterKey != ""
opts.dumpIndex = opts.dumpIndex || dumpAll || opts.filterKey != ""
cmdDumpTsm1(opts)
case "dumptsmdev":
var dumpAll bool
opts := &tsdmDumpOpts{}
fs := flag.NewFlagSet("file", flag.ExitOnError)
fs.BoolVar(&opts.dumpIndex, "index", false, "Dump raw index data")
fs.BoolVar(&opts.dumpBlocks, "blocks", false, "Dump raw block data")
fs.BoolVar(&dumpAll, "all", false, "Dump all data. Caution: This may print a lot of information")
fs.StringVar(&opts.filterKey, "filter-key", "", "Only display index and block data that match this key substring")
fs.Usage = func() {
println("Usage: influx_inspect dumptsm [options] <path>\n\n Dumps low-level details about tsm1 files.")
println()
println("Options:")
fs.PrintDefaults()
os.Exit(0)
}
if err := fs.Parse(flag.Args()[1:]); err != nil {
fmt.Printf("%v", err)
os.Exit(1)
}
if len(fs.Args()) == 0 || fs.Args()[0] == "" {
fmt.Printf("TSM file not specified\n\n")
fs.Usage()
fs.PrintDefaults()
os.Exit(1)
}
opts.path = fs.Args()[0]
opts.dumpBlocks = opts.dumpBlocks || dumpAll || opts.filterKey != ""
opts.dumpIndex = opts.dumpIndex || dumpAll || opts.filterKey != ""
cmdDumpTsm1dev(opts)
default:
flag.Usage()
os.Exit(1)


@ -441,3 +441,196 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
println()
}
}
func cmdDumpTsm1dev(opts *tsdmDumpOpts) {
var errors []error
f, err := os.Open(opts.path)
if err != nil {
println(err.Error())
os.Exit(1)
}
// Get the file size
stat, err := f.Stat()
if err != nil {
println(err.Error())
os.Exit(1)
}
b := make([]byte, 8)
r, err := tsm1.NewTSMReader(f)
if err != nil {
println("Error opening TSM files: ", err.Error())
}
defer r.Close()
minTime, maxTime := r.TimeRange()
keys := r.Keys()
blockStats := &blockStats{}
println("Summary:")
fmt.Printf(" File: %s\n", opts.path)
fmt.Printf(" Time Range: %s - %s\n",
minTime.UTC().Format(time.RFC3339Nano),
maxTime.UTC().Format(time.RFC3339Nano),
)
fmt.Printf(" Duration: %s ", maxTime.Sub(minTime))
fmt.Printf(" Series: %d ", len(keys))
fmt.Printf(" File Size: %d\n", stat.Size())
println()
tw := tabwriter.NewWriter(os.Stdout, 8, 8, 1, '\t', 0)
fmt.Fprintln(tw, " "+strings.Join([]string{"Pos", "Min Time", "Max Time", "Ofs", "Size", "Key", "Field"}, "\t"))
var pos int
for _, key := range keys {
for _, e := range r.Entries(key) {
pos++
split := strings.Split(key, "#!~#")
// We don't know if we have fields, so use an informative default.
var measurement, field string = "UNKNOWN", "UNKNOWN"
// Possible corruption? Try to read as much as we can and point to the problem.
measurement = split[0]
if len(split) > 1 {
field = split[1]
}
if opts.filterKey != "" && !strings.Contains(key, opts.filterKey) {
continue
}
fmt.Fprintln(tw, " "+strings.Join([]string{
strconv.FormatInt(int64(pos), 10),
e.MinTime.UTC().Format(time.RFC3339Nano),
e.MaxTime.UTC().Format(time.RFC3339Nano),
strconv.FormatInt(int64(e.Offset), 10),
strconv.FormatInt(int64(e.Size), 10),
measurement,
field,
}, "\t"))
}
}
if opts.dumpIndex {
println("Index:")
tw.Flush()
println()
}
tw = tabwriter.NewWriter(os.Stdout, 8, 8, 1, '\t', 0)
fmt.Fprintln(tw, " "+strings.Join([]string{"Blk", "Chk", "Ofs", "Len", "Type", "Min Time", "Points", "Enc [T/V]", "Len [T/V]"}, "\t"))
// Starting at 5 because the magic number is 4 bytes + 1 byte version
i := int64(5)
var blockCount, pointCount, blockSize int64
indexSize := r.IndexSize()
// Start at the beginning and read every block
for _, key := range keys {
for _, e := range r.Entries(key) {
f.Seek(int64(e.Offset), 0)
f.Read(b[:4])
chksum := btou32(b)
buf := make([]byte, e.Size)
f.Read(buf)
blockSize += int64(len(buf)) + 4
startTime := time.Unix(0, int64(btou64(buf[:8])))
blockType := buf[8]
encoded := buf[9:]
var v []tsm1.Value
v, err := tsm1.DecodeBlock(buf, v)
if err != nil {
fmt.Printf("error: %v\n", err.Error())
os.Exit(1)
}
pointCount += int64(len(v))
// Length of the timestamp block
tsLen, j := binary.Uvarint(encoded)
// Unpack the timestamp bytes
ts := encoded[int(j) : int(j)+int(tsLen)]
// Unpack the value bytes
values := encoded[int(j)+int(tsLen):]
tsEncoding := timeEnc[int(ts[0]>>4)]
vEncoding := encDescs[int(blockType+1)][values[0]>>4]
typeDesc := blockTypes[blockType]
blockStats.inc(0, ts[0]>>4)
blockStats.inc(int(blockType+1), values[0]>>4)
blockStats.size(len(buf))
if opts.filterKey != "" && !strings.Contains(key, opts.filterKey) {
i += (4 + int64(e.Size))
blockCount++
continue
}
fmt.Fprintln(tw, " "+strings.Join([]string{
strconv.FormatInt(blockCount, 10),
strconv.FormatUint(uint64(chksum), 10),
strconv.FormatInt(i, 10),
strconv.FormatInt(int64(len(buf)), 10),
typeDesc,
startTime.UTC().Format(time.RFC3339Nano),
strconv.FormatInt(int64(len(v)), 10),
fmt.Sprintf("%s/%s", tsEncoding, vEncoding),
fmt.Sprintf("%d/%d", len(ts), len(values)),
}, "\t"))
i += (4 + int64(e.Size))
blockCount++
}
}
if opts.dumpBlocks {
println("Blocks:")
tw.Flush()
println()
}
fmt.Printf("Statistics\n")
fmt.Printf(" Blocks:\n")
fmt.Printf(" Total: %d Size: %d Min: %d Max: %d Avg: %d\n",
blockCount, blockSize, blockStats.min, blockStats.max, blockSize/blockCount)
fmt.Printf(" Index:\n")
fmt.Printf(" Total: %d Size: %d\n", blockCount, indexSize)
fmt.Printf(" Points:\n")
fmt.Printf(" Total: %d", pointCount)
println()
println(" Encoding:")
for i, counts := range blockStats.counts {
if len(counts) == 0 {
continue
}
fmt.Printf(" %s: ", strings.Title(fieldType[i]))
for j, v := range counts {
fmt.Printf("\t%s: %d (%d%%) ", encDescs[i][j], v, int(float64(v)/float64(blockCount)*100))
}
println()
}
fmt.Printf(" Compression:\n")
fmt.Printf(" Per block: %0.2f bytes/point\n", float64(blockSize)/float64(pointCount))
fmt.Printf(" Total: %0.2f bytes/point\n", float64(stat.Size())/float64(pointCount))
if len(errors) > 0 {
println()
fmt.Printf("Errors (%d):\n", len(errors))
for _, err := range errors {
fmt.Printf(" * %v\n", err)
}
println()
}
}
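For reference, a typical invocation of the new subcommand (the shard path and file name are hypothetical; the flags are the ones registered above):

influx_inspect dumptsmdev -all /var/opt/influxdb/data/db/shard/0000001.tsm1dev
influx_inspect dumptsmdev -blocks -filter-key "cpu,host=A" /var/opt/influxdb/data/db/shard/0000001.tsm1dev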


@ -47,6 +47,7 @@ const (
B1Format EngineFormat = iota
BZ1Format
TSM1Format
TSM1DevFormat
)
// NewEngineFunc creates a new engine.

tsdb/engine/tsm1/compact.go (new file, 298 lines)

@ -0,0 +1,298 @@
package tsm1
// Compactions are the process of creating read-optimized TSM files.
// The files are created by converting write-optimized WAL entries
// to read-optimized TSM format. They can also be created from existing
// TSM files when there are tombstone records that need to be removed, points
// that were overwritten by later writes and need to be updated, or multiple
// smaller TSM files that need to be merged to reduce file counts and improve
// compression ratios.
//
// The compaction process is stream-oriented, using multiple readers and
// iterators. The resulting stream is written sorted and chunked to allow for
// one-pass writing of a new TSM file.
import (
"fmt"
"os"
"path/filepath"
"sort"
)
var errMaxFileExceeded = fmt.Errorf("max file exceeded")
// Compactor merges multiple WAL segments and TSM files into one or more
// new TSM files.
type Compactor struct {
Dir string
MaxFileSize int
currentID int
merge *MergeIterator
}
// Compact converts WAL segments and TSM files into new TSM files.
func (c *Compactor) Compact(walSegments []string) ([]string, error) {
var walReaders []*WALSegmentReader
// For each segment, create a reader to iterate over each WAL entry
for _, path := range walSegments {
f, err := os.Open(path)
if err != nil {
return nil, err
}
r := NewWALSegmentReader(f)
defer r.Close()
walReaders = append(walReaders, r)
}
// WALKeyIterator allows all the segments to be ordered by key and
// sorted values during compaction.
walKeyIterator, err := NewWALKeyIterator(walReaders...)
if err != nil {
return nil, err
}
// Merge iterator combines the WAL and TSM iterators (note: TSM iteration is
// not in place yet). It will also chunk the values into 1000 element blocks.
c.merge = NewMergeIterator(walKeyIterator, 1000)
defer c.merge.Close()
// These are the new TSM files written
var files []string
for {
// TODO: this needs to be initialized based on the existing files on disk
c.currentID++
// New TSM files are written to a temp file and renamed when fully completed.
fileName := filepath.Join(c.Dir, fmt.Sprintf("%07d.%s.tmp", c.currentID, Format))
// Write as much as possible to this file
err := c.write(fileName)
// We've hit the max file limit and there is more to write. Create a new file
// and continue.
if err == errMaxFileExceeded {
files = append(files, fileName)
continue
}
// We hit an error but didn't finish the compaction. Remove the temp file and abort.
if err != nil {
os.RemoveAll(fileName)
return nil, err
}
files = append(files, fileName)
break
}
c.merge = nil
return files, nil
}
func (c *Compactor) write(path string) error {
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return err
}
// Create the writer for the new TSM file.
w, err := NewTSMWriter(fd)
if err != nil {
return err
}
for c.merge.Next() {
// Each call to read returns the next sorted key (or the prior one if there are
// more values to write). The size of values will be less than or equal to our
// chunk size (1000)
key, values, err := c.merge.Read()
if err != nil {
return err
}
// Write the key and value
if err := w.Write(key, values); err != nil {
return err
}
// If we have a max file size configured and we're over it, close out the file
// and return the error.
if c.MaxFileSize != 0 && w.Size() > c.MaxFileSize {
if err := w.WriteIndex(); err != nil {
return err
}
if err := w.Close(); err != nil {
return err
}
return errMaxFileExceeded
}
}
// We're all done. Close out the file.
if err := w.WriteIndex(); err != nil {
return err
}
if err := w.Close(); err != nil {
return err
}
return nil
}
// MergeIterator merges multiple KeyIterators while chunking each read call
// into a fixed size. Each iteration, the lowest lexicographically ordered
// key is returned with the next set of values for that key ordered by time. Values
// with identical times are overwritten by the WAL KeyIterator.
//
// Moving through the full iteration cycle will result in sorted, unique chunks of values
// up to a max size. Each key returned will be greater than or equal to the prior
// key returned.
type MergeIterator struct {
// wal is the iterator for multiple WAL segments combined
wal KeyIterator
// size is the maximum value of a chunk to return
size int
// key is the current iteration series key
key string
// walBuf is the remaining values from the last wal Read call
walBuf []Value
// chunk is the current set of values that will be returned by Read
chunk []Value
// err is any error returned by an underlying iterator to be returned by Read
err error
}
func (m *MergeIterator) Next() bool {
// Prime the wal buffer if possible
if len(m.walBuf) == 0 && m.wal.Next() {
k, v, err := m.wal.Read()
m.key = k
m.err = err
m.walBuf = v
}
// Move size elements into the current chunk and slice the same
// amount off of the wal buffer.
if m.size < len(m.walBuf) {
m.chunk = m.walBuf[:m.size]
m.walBuf = m.walBuf[m.size:]
} else {
m.chunk = m.walBuf
m.walBuf = m.walBuf[:0]
}
return len(m.chunk) > 0
}
func (m *MergeIterator) Read() (string, []Value, error) {
return m.key, m.chunk, m.err
}
func (m *MergeIterator) Close() error {
m.walBuf = nil
m.chunk = nil
return m.wal.Close()
}
func NewMergeIterator(WAL KeyIterator, size int) *MergeIterator {
m := &MergeIterator{
wal: WAL,
size: size,
}
return m
}
// KeyIterator allows iteration over set of keys and values in sorted order.
type KeyIterator interface {
Next() bool
Read() (string, []Value, error)
Close() error
}
// walKeyIterator allows WAL segments to be iterated over in sorted order.
type walKeyIterator struct {
k string
Order []string
Series map[string]Values
}
func (k *walKeyIterator) Next() bool {
if len(k.Order) == 0 {
return false
}
k.k = k.Order[0]
k.Order = k.Order[1:]
return true
}
func (k *walKeyIterator) Read() (string, []Value, error) {
return k.k, k.Series[k.k], nil
}
func (k *walKeyIterator) Close() error {
k.Order = nil
k.Series = nil
return nil
}
func NewWALKeyIterator(readers ...*WALSegmentReader) (KeyIterator, error) {
series := map[string]Values{}
order := []string{}
// Iterate over each reader in order. Later readers will overwrite earlier ones if values
// overlap.
for _, r := range readers {
for r.Next() {
entry, err := r.Read()
if err != nil {
return nil, err
}
switch t := entry.(type) {
case *WriteWALEntry:
// Each point needs to be decomposed from a time with multiple fields to a (time, value) tuple
for k, v := range t.Values {
// Just append each point as we see it. Dedup and sorting happens later.
series[k] = append(series[k], v...)
}
case *DeleteWALEntry:
// Each key is a series, measurement + tagset string
for _, k := range t.Keys {
// seriesKey is specific to a field: measurement + tagset string + sep + field name
for seriesKey := range series {
// If the delete key matches the series key exactly, remove the values we have
if k == seriesKey {
delete(series, seriesKey)
}
}
}
}
}
}
// Need to create the order that we'll iterate over (sorted key), as well as
// sort and dedup all the points for each key.
for k, v := range series {
order = append(order, k)
series[k] = v.Deduplicate()
}
sort.Strings(order)
return &walKeyIterator{
Series: series,
Order: order,
}, nil
}
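A minimal usage sketch of the Compactor, mirroring the tests later in this PR; dir and segPath (a fully closed WAL segment file) are assumed to be provided by the caller, and the 50MB limit is illustrative:

package tsm1_test

import (
	"log"

	"github.com/influxdb/influxdb/tsdb/engine/tsm1"
)

func compactSegment(dir, segPath string) []string {
	compactor := &tsm1.Compactor{
		Dir:         dir,              // directory where new TSM files are written
		MaxFileSize: 50 * 1024 * 1024, // roll to a new file past ~50MB; 0 means unlimited
	}
	files, err := compactor.Compact([]string{segPath})
	if err != nil {
		log.Fatalf("compact: %v", err)
	}
	// files holds one or more *.tmp TSM files to be renamed into place
	// by the engine (see replaceFiles in the engine changes below).
	return files
}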


@ -0,0 +1,878 @@
package tsm1_test
import (
"fmt"
"os"
"testing"
"time"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
)
// Tests that a single WAL segment can be read and iterated over
func TestKeyIterator_WALSegment_Single(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
v1 := tsm1.NewValue(time.Unix(1, 0), 1.1)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v1},
}
entries := []tsm1.WALEntry{
&tsm1.WriteWALEntry{
Values: writes,
},
}
r := MustWALSegment(dir, entries)
iter, err := tsm1.NewWALKeyIterator(r)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
var readValues bool
for iter.Next() {
key, values, err := iter.Read()
if err != nil {
t.Fatalf("unexpected error read: %v", err)
}
if got, exp := key, "cpu,host=A#!~#value"; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(values), len(writes); got != exp {
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
}
for _, v := range values {
readValues = true
assertValueEqual(t, v, v1)
}
}
if !readValues {
t.Fatalf("failed to read any values")
}
}
// Tests that duplicate point values are merged
func TestKeyIterator_WALSegment_Duplicate(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
v1 := tsm1.NewValue(time.Unix(1, 0), int64(1))
v2 := tsm1.NewValue(time.Unix(1, 0), int64(2))
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v1, v2},
}
entries := []tsm1.WALEntry{
&tsm1.WriteWALEntry{
Values: writes,
},
}
r := MustWALSegment(dir, entries)
iter, err := tsm1.NewWALKeyIterator(r)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
var readValues bool
for iter.Next() {
key, values, err := iter.Read()
if err != nil {
t.Fatalf("unexpected error read: %v", err)
}
if got, exp := key, "cpu,host=A#!~#value"; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(values), 1; got != exp {
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
}
readValues = true
assertValueEqual(t, values[0], v2)
}
if !readValues {
t.Fatalf("failed to read any values")
}
}
// Tests that multiple WAL segments can be read and iterated over
func TestKeyIterator_WALSegment_Multiple(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
v1 := tsm1.NewValue(time.Unix(1, 0), int64(1))
points1 := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v1},
}
entries := []tsm1.WALEntry{
&tsm1.WriteWALEntry{
Values: points1,
},
}
r1 := MustWALSegment(dir, entries)
v2 := tsm1.NewValue(time.Unix(2, 0), int64(2))
points2 := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v2},
}
entries = []tsm1.WALEntry{
&tsm1.WriteWALEntry{
Values: points2,
},
}
r2 := MustWALSegment(dir, entries)
iter, err := tsm1.NewWALKeyIterator(r1, r2)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
var readValues bool
for iter.Next() {
key, values, err := iter.Read()
if err != nil {
t.Fatalf("unexpected error read: %v", err)
}
if got, exp := key, "cpu,host=A#!~#value"; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(values), 2; got != exp {
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
}
readValues = true
assertValueEqual(t, values[0], v1)
assertValueEqual(t, values[1], v2)
}
if !readValues {
t.Fatalf("failed to read any values")
}
}
// Tests that multiple WAL segments with out-of-order points are
// sorted while iterating
func TestKeyIterator_WALSegment_MultiplePointsSorted(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
v1 := tsm1.NewValue(time.Unix(2, 0), int64(2))
points1 := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v1},
}
entries := []tsm1.WALEntry{
&tsm1.WriteWALEntry{
Values: points1,
},
}
r1 := MustWALSegment(dir, entries)
v2 := tsm1.NewValue(time.Unix(1, 0), int64(1))
points2 := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v2},
}
entries = []tsm1.WALEntry{
&tsm1.WriteWALEntry{
Values: points2,
},
}
r2 := MustWALSegment(dir, entries)
iter, err := tsm1.NewWALKeyIterator(r1, r2)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
var readValues bool
for iter.Next() {
key, values, err := iter.Read()
if err != nil {
t.Fatalf("unexpected error read: %v", err)
}
if got, exp := key, "cpu,host=A#!~#value"; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(values), 2; got != exp {
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
}
readValues = true
assertValueEqual(t, values[0], v2)
assertValueEqual(t, values[1], v1)
}
if !readValues {
t.Fatalf("failed to read any values")
}
}
// Tests that multiple keys are iterated over in sorted order
func TestKeyIterator_WALSegment_MultipleKeysSorted(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
v1 := tsm1.NewValue(time.Unix(1, 0), float64(1))
points1 := map[string][]tsm1.Value{
"cpu,host=B#!~#value": []tsm1.Value{v1},
}
entries := []tsm1.WALEntry{
&tsm1.WriteWALEntry{
Values: points1,
},
}
r1 := MustWALSegment(dir, entries)
v2 := tsm1.NewValue(time.Unix(1, 0), float64(1))
points2 := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v2},
}
entries = []tsm1.WALEntry{
&tsm1.WriteWALEntry{
Values: points2,
},
}
r2 := MustWALSegment(dir, entries)
iter, err := tsm1.NewWALKeyIterator(r1, r2)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
var readValues bool
var data = []struct {
key string
value tsm1.Value
}{
{"cpu,host=A#!~#value", v2},
{"cpu,host=B#!~#value", v1},
}
for iter.Next() {
key, values, err := iter.Read()
if err != nil {
t.Fatalf("unexpected error read: %v", err)
}
if got, exp := key, data[0].key; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(values), 1; got != exp {
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
}
readValues = true
assertValueEqual(t, values[0], data[0].value)
data = data[1:]
}
if !readValues {
t.Fatalf("failed to read any values")
}
}
// Tests that deletes after writes remove the previously written values
func TestKeyIterator_WALSegment_MultipleKeysDeleted(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
v1 := tsm1.NewValue(time.Unix(1, 0), float64(1))
points1 := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v1},
}
entries := []tsm1.WALEntry{
&tsm1.WriteWALEntry{
Values: points1,
},
}
r1 := MustWALSegment(dir, entries)
v2 := tsm1.NewValue(time.Unix(1, 0), float64(1))
v3 := tsm1.NewValue(time.Unix(1, 0), float64(1))
points2 := map[string][]tsm1.Value{
"cpu,host=A#!~#count": []tsm1.Value{v2},
"cpu,host=B#!~#value": []tsm1.Value{v3},
}
entries = []tsm1.WALEntry{
&tsm1.WriteWALEntry{
Values: points2,
},
&tsm1.DeleteWALEntry{
Keys: []string{
"cpu,host=A#!~#count",
"cpu,host=A#!~#value",
},
},
}
r2 := MustWALSegment(dir, entries)
iter, err := tsm1.NewWALKeyIterator(r1, r2)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
var readValues bool
var data = []struct {
key string
value tsm1.Value
}{
{"cpu,host=B#!~#value", v3},
}
for iter.Next() {
key, values, err := iter.Read()
if err != nil {
t.Fatalf("unexpected error read: %v", err)
}
if got, exp := key, data[0].key; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(values), 1; got != exp {
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
}
readValues = true
assertValueEqual(t, values[0], data[0].value)
data = data[1:]
}
if !readValues {
t.Fatalf("failed to read any values")
}
}
// Tests that writes and deletes followed by more writes return the
// correct values.
func TestKeyIterator_WALSegment_WriteAfterDelete(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
v1 := tsm1.NewValue(time.Unix(1, 0), float64(1))
points1 := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v1},
}
entries := []tsm1.WALEntry{
&tsm1.WriteWALEntry{
Values: points1,
},
}
r1 := MustWALSegment(dir, entries)
v2 := tsm1.NewValue(time.Unix(1, 0), float64(1))
v3 := tsm1.NewValue(time.Unix(1, 0), float64(1))
points2 := map[string][]tsm1.Value{
"cpu,host=A#!~#count": []tsm1.Value{v2},
"cpu,host=B#!~#value": []tsm1.Value{v3},
}
entries = []tsm1.WALEntry{
&tsm1.DeleteWALEntry{
Keys: []string{
"cpu,host=A#!~#count",
"cpu,host=A#!~#value",
},
},
&tsm1.WriteWALEntry{
Values: points2,
},
}
r2 := MustWALSegment(dir, entries)
iter, err := tsm1.NewWALKeyIterator(r1, r2)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
var readValues bool
var data = []struct {
key string
value tsm1.Value
}{
{"cpu,host=A#!~#count", v2},
{"cpu,host=B#!~#value", v3},
}
for iter.Next() {
key, values, err := iter.Read()
if err != nil {
t.Fatalf("unexpected error read: %v", err)
}
if got, exp := key, data[0].key; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(values), 1; got != exp {
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
}
readValues = true
assertValueEqual(t, values[0], data[0].value)
data = data[1:]
}
if !readValues {
t.Fatalf("failed to read any values")
}
}
// Tests that the merge iterator over a WAL returns points ordered correctly.
func TestMergeIterator_Single(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
v1 := tsm1.NewValue(time.Unix(1, 0), float64(1))
v2 := tsm1.NewValue(time.Unix(2, 0), float64(2))
points := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v1, v2},
}
entries := []tsm1.WALEntry{
&tsm1.WriteWALEntry{
Values: points,
},
}
r := MustWALSegment(dir, entries)
iter, err := tsm1.NewWALKeyIterator(r)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
// Read should return a chunk of 1 value
m := tsm1.NewMergeIterator(iter, 1)
var readValues bool
for _, p := range points {
if !m.Next() {
t.Fatalf("expected next, got false")
}
key, values, err := m.Read()
if err != nil {
t.Fatalf("unexpected error reading: %v", err)
}
if got, exp := key, "cpu,host=A#!~#value"; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(values), 1; got != exp {
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
}
readValues = true
assertValueEqual(t, values[0], p[0])
}
if !readValues {
t.Fatalf("failed to read any values")
}
}
// Tests that the merge iterator over a WAL returns points ordered by key and time.
func TestMergeIterator_MultipleKeys(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
v1 := tsm1.NewValue(time.Unix(1, 0), float64(1))
v2 := tsm1.NewValue(time.Unix(1, 0), float64(1))
v3 := tsm1.NewValue(time.Unix(2, 0), float64(2))
v4 := tsm1.NewValue(time.Unix(2, 0), float64(2))
v5 := tsm1.NewValue(time.Unix(1, 0), float64(3)) // overwrites v1
points1 := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v1, v3},
"cpu,host=B#!~#value": []tsm1.Value{v2},
}
points2 := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v5},
"cpu,host=B#!~#value": []tsm1.Value{v4},
}
entries := []tsm1.WALEntry{
&tsm1.WriteWALEntry{
Values: points1,
},
&tsm1.WriteWALEntry{
Values: points2,
},
}
r := MustWALSegment(dir, entries)
iter, err := tsm1.NewWALKeyIterator(r)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
m := tsm1.NewMergeIterator(iter, 2)
var data = []struct {
key string
points []tsm1.Value
}{
{"cpu,host=A#!~#value", []tsm1.Value{v5, v3}},
{"cpu,host=B#!~#value", []tsm1.Value{v2, v4}},
}
for _, p := range data {
if !m.Next() {
t.Fatalf("expected next, got false")
}
key, values, err := m.Read()
if err != nil {
t.Fatalf("unexpected error reading: %v", err)
}
if got, exp := key, p.key; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(values), len(p.points); got != exp {
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
}
for i, point := range p.points {
assertValueEqual(t, values[i], point)
}
}
}
// Tests that the merge iterator does not pull in deleted WAL entries.
func TestMergeIterator_DeletedKeys(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
v1 := tsm1.NewValue(time.Unix(1, 0), float64(1))
v2 := tsm1.NewValue(time.Unix(1, 0), float64(1))
v3 := tsm1.NewValue(time.Unix(2, 0), float64(2))
v4 := tsm1.NewValue(time.Unix(2, 0), float64(2))
v5 := tsm1.NewValue(time.Unix(1, 0), float64(3)) // overwrites v1
points1 := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v1, v3},
"cpu,host=B#!~#value": []tsm1.Value{v2},
}
points2 := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v5},
"cpu,host=B#!~#value": []tsm1.Value{v4},
}
entries := []tsm1.WALEntry{
&tsm1.WriteWALEntry{
Values: points1,
},
&tsm1.WriteWALEntry{
Values: points2,
},
&tsm1.DeleteWALEntry{
Keys: []string{"cpu,host=A#!~#value"},
},
}
r := MustWALSegment(dir, entries)
iter, err := tsm1.NewWALKeyIterator(r)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
m := tsm1.NewMergeIterator(iter, 2)
var data = []struct {
key string
points []tsm1.Value
}{
{"cpu,host=B#!~#value", []tsm1.Value{v2, v4}},
}
for _, p := range data {
if !m.Next() {
t.Fatalf("expected next, got false")
}
key, values, err := m.Read()
if err != nil {
t.Fatalf("unexpected error reading: %v", err)
}
if got, exp := key, p.key; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(values), len(p.points); got != exp {
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
}
for i, point := range p.points {
assertValueEqual(t, values[i], point)
}
}
}
// Tests compacting a single WAL segment into one TSM file
func TestCompactor_SingleWALSegment(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
v1 := tsm1.NewValue(time.Unix(1, 0), float64(1))
v2 := tsm1.NewValue(time.Unix(1, 0), float64(1))
v3 := tsm1.NewValue(time.Unix(2, 0), float64(2))
points1 := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v1},
"cpu,host=B#!~#value": []tsm1.Value{v2, v3},
}
entries := []tsm1.WALEntry{
&tsm1.WriteWALEntry{
Values: points1,
},
}
f := MustTempFile(dir)
defer f.Close()
w := tsm1.NewWALSegmentWriter(f)
for _, e := range entries {
if err := w.Write(e); err != nil {
t.Fatalf("unexpected error writing entry: %v", err)
}
}
compactor := &tsm1.Compactor{
Dir: dir,
}
files, err := compactor.Compact([]string{f.Name()})
if err != nil {
t.Fatalf("unexpected error compacting: %v", err)
}
if got, exp := len(files), 1; got != exp {
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
}
f, err = os.Open(files[0])
if err != nil {
t.Fatalf("unexpected error openting tsm: %v", err)
}
r, err := tsm1.NewTSMReader(f)
if err != nil {
t.Fatalf("unexpected error creating tsm reader: %v", err)
}
keys := r.Keys()
if got, exp := len(keys), 2; got != exp {
t.Fatalf("keys length mismatch: got %v, exp %v", got, exp)
}
var data = []struct {
key string
points []tsm1.Value
}{
{"cpu,host=A#!~#value", []tsm1.Value{v1}},
{"cpu,host=B#!~#value", []tsm1.Value{v2, v3}},
}
for _, p := range data {
values, err := r.ReadAll(p.key)
if err != nil {
t.Fatalf("unexpected error reading: %v", err)
}
if got, exp := len(values), len(p.points); got != exp {
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
}
for i, point := range p.points {
assertValueEqual(t, values[i], point)
}
}
}
// Tests compacting multiple WAL segments into one TSM file
func TestCompactor_MultipleWALSegment(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
// First WAL segment
v1 := tsm1.NewValue(time.Unix(1, 0), float64(1))
v2 := tsm1.NewValue(time.Unix(1, 0), float64(1))
v3 := tsm1.NewValue(time.Unix(2, 0), float64(2))
points1 := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v1, v3},
"cpu,host=B#!~#value": []tsm1.Value{v2},
}
entries := []tsm1.WALEntry{
&tsm1.WriteWALEntry{
Values: points1,
},
}
f1 := MustTempFile(dir)
defer f1.Close()
w := tsm1.NewWALSegmentWriter(f1)
for _, e := range entries {
if err := w.Write(e); err != nil {
t.Fatalf("unexpected error writing entry: %v", err)
}
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing writer: %v", err)
}
// Second WAL segment
v4 := tsm1.NewValue(time.Unix(2, 0), float64(2))
v5 := tsm1.NewValue(time.Unix(3, 0), float64(1))
v6 := tsm1.NewValue(time.Unix(4, 0), float64(1))
points2 := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v5, v6},
"cpu,host=B#!~#value": []tsm1.Value{v4},
}
entries = []tsm1.WALEntry{
&tsm1.WriteWALEntry{
Values: points2,
},
}
f2 := MustTempFile(dir)
defer f2.Close()
w = tsm1.NewWALSegmentWriter(f2)
for _, e := range entries {
if err := w.Write(e); err != nil {
t.Fatalf("unexpected error writing entry: %v", err)
}
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing writer: %v", err)
}
compactor := &tsm1.Compactor{
Dir: dir,
}
files, err := compactor.Compact([]string{f1.Name(), f2.Name()})
if err != nil {
t.Fatalf("unexpected error compacting: %v", err)
}
if got, exp := len(files), 1; got != exp {
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
}
f, err := os.Open(files[0])
if err != nil {
t.Fatalf("unexpected error openting tsm: %v", err)
}
defer f.Close()
r, err := tsm1.NewTSMReader(f)
if err != nil {
t.Fatalf("unexpected error creating tsm reader: %v", err)
}
defer r.Close()
keys := r.Keys()
if got, exp := len(keys), 2; got != exp {
t.Fatalf("keys length mismatch: got %v, exp %v", got, exp)
}
var data = []struct {
key string
points []tsm1.Value
}{
{"cpu,host=A#!~#value", []tsm1.Value{v1, v3, v5, v6}},
{"cpu,host=B#!~#value", []tsm1.Value{v2, v4}},
}
for _, p := range data {
values, err := r.ReadAll(p.key)
if err != nil {
t.Fatalf("unexpected error reading: %v", err)
}
if got, exp := len(values), len(p.points); got != exp {
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
}
for i, point := range p.points {
assertValueEqual(t, values[i], point)
}
}
}
func assertValueEqual(t *testing.T, a, b tsm1.Value) {
if got, exp := a.Time(), b.Time(); !got.Equal(exp) {
t.Fatalf("time mismatch: got %v, exp %v", got, exp)
}
if got, exp := a.Value(), b.Value(); got != exp {
t.Fatalf("value mismatch: got %v, exp %v", got, exp)
}
}
func assertEqual(t *testing.T, a tsm1.Value, b models.Point, field string) {
if got, exp := a.Time(), b.Time(); !got.Equal(exp) {
t.Fatalf("time mismatch: got %v, exp %v", got, exp)
}
if got, exp := a.Value(), b.Fields()[field]; got != exp {
t.Fatalf("value mismatch: got %v, exp %v", got, exp)
}
}
func MustWALSegment(dir string, entries []tsm1.WALEntry) *tsm1.WALSegmentReader {
f := MustTempFile(dir)
w := tsm1.NewWALSegmentWriter(f)
for _, e := range entries {
if err := w.Write(e); err != nil {
panic(fmt.Sprintf("write WAL entry: %v", err))
}
}
if _, err := f.Seek(0, os.SEEK_SET); err != nil {
panic(fmt.Sprintf("seek WAL: %v", err))
}
return tsm1.NewWALSegmentReader(f)
}
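The duplicate handling exercised above comes from Values.Deduplicate, which NewWALKeyIterator applies per key; a small sketch of the expected semantics, assuming the later value wins on a timestamp collision as the Duplicate test asserts:

vals := tsm1.Values{
	tsm1.NewValue(time.Unix(1, 0), int64(1)),
	tsm1.NewValue(time.Unix(1, 0), int64(2)), // same timestamp, written later
}
deduped := vals.Deduplicate()
// len(deduped) == 1 and deduped[0].Value() == int64(2)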


@ -67,6 +67,7 @@ import (
"fmt"
"hash/crc32"
"io"
"math"
"os"
"sort"
"sync"
@ -109,6 +110,9 @@ type TSMWriter interface {
// Closes any underlying file resources.
Close() error
// Size returns the current size in bytes of the file
Size() int
}
// TSMIndex represent the index section of a TSM file. The index records all
@ -139,6 +143,9 @@ type TSMIndex interface {
// Keys returns the unique set of keys in the index.
Keys() []string
// Size returns the size of the current index in bytes
Size() int
// Type returns the block type of the values stored for the key. Returns one of
// BlockFloat64, BlockInt64, BlockBool, BlockString. If key does not exist,
// an error is returned.
@ -367,6 +374,10 @@ func (d *directIndex) UnmarshalBinary(b []byte) error {
return nil
}
func (d *directIndex) Size() int {
return 0
}
// indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This
// implementation can be used for indexes that may be MMAPed into memory.
type indirectIndex struct {
@ -597,6 +608,10 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
return nil
}
func (d *indirectIndex) Size() int {
return 0
}
// tsmWriter writes keys and values in the TSM format
type tsmWriter struct {
w io.Writer
@ -667,6 +682,10 @@ func (t *tsmWriter) Close() error {
return nil
}
func (t *tsmWriter) Size() int {
return int(t.n) + t.index.Size()
}
type tsmReader struct {
mu sync.Mutex
@ -879,6 +898,30 @@ func (t *tsmReader) Delete(key string) error {
return nil
}
// TimeRange returns the min and max time across all keys in the file.
func (t *tsmReader) TimeRange() (time.Time, time.Time) {
min, max := time.Unix(0, math.MaxInt64), time.Unix(0, math.MinInt64)
for _, k := range t.index.Keys() {
for _, e := range t.index.Entries(k) {
if e.MinTime.Before(min) {
min = e.MinTime
}
if e.MaxTime.After(max) {
max = e.MaxTime
}
}
}
return min, max
}
func (t *tsmReader) Entries(key string) []*IndexEntry {
return t.index.Entries(key)
}
func (t *tsmReader) IndexSize() int {
return t.index.Size()
}
type indexEntries struct {
Type byte
entries []*IndexEntry

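The new reader accessors make it straightforward to summarize a file the way dumptsmdev does; a hedged sketch (f is an already opened *os.File; note that both index Size implementations above are stubbed to return 0 for now):

r, err := tsm1.NewTSMReader(f)
if err != nil {
	log.Fatal(err)
}
defer r.Close()

minTime, maxTime := r.TimeRange()
fmt.Printf("series=%d indexBytes=%d span=%s\n",
	len(r.Keys()), r.IndexSize(), maxTime.Sub(minTime))

for _, key := range r.Keys() {
	for _, e := range r.Entries(key) {
		fmt.Printf("%s: offset=%d size=%d\n", key, e.Offset, e.Size)
	}
}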

@ -32,6 +32,7 @@ type Value interface {
UnixNano() int64
Value() interface{}
Size() int
String() string
}
func NewValue(t time.Time, value interface{}) Value {
@ -55,6 +56,7 @@ func (e *EmptyValue) UnixNano() int64 { return tsdb.EOF }
func (e *EmptyValue) Time() time.Time { return time.Unix(0, tsdb.EOF) }
func (e *EmptyValue) Value() interface{} { return nil }
func (e *EmptyValue) Size() int { return 0 }
func (e *EmptyValue) String() string { return "" }
// Values represented a time ascending sorted collection of Value types.
// the underlying type should be the same across all values, but the interface
@ -210,6 +212,10 @@ func (f *FloatValue) Size() int {
return 16
}
func (f *FloatValue) String() string {
return fmt.Sprintf("%v %v", f.Time(), f.Value())
}
func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) {
if len(values) == 0 {
return nil, nil
@ -316,6 +322,10 @@ func (b *BoolValue) Value() interface{} {
return b.value
}
func (f *BoolValue) String() string {
return fmt.Sprintf("%v %v", f.Time(), f.Value())
}
func encodeBoolBlock(buf []byte, values []Value) ([]byte, error) {
if len(values) == 0 {
return nil, nil
@ -417,7 +427,9 @@ func (v *Int64Value) Size() int {
return 16
}
func (v *Int64Value) String() string { return fmt.Sprintf("%v", v.value) }
func (f *Int64Value) String() string {
return fmt.Sprintf("%v %v", f.Time(), f.Value())
}
func encodeInt64Block(buf []byte, values []Value) ([]byte, error) {
tsEnc := NewTimeEncoder()
@ -508,7 +520,9 @@ func (v *StringValue) Size() int {
return 8 + len(v.value)
}
func (v *StringValue) String() string { return v.value }
func (f *StringValue) String() string {
return fmt.Sprintf("%v %v", f.Time(), f.Value())
}
func encodeStringBlock(buf []byte, values []Value) ([]byte, error) {
tsEnc := NewTimeEncoder()

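With these additions every concrete Value renders as its time followed by its value; for example (the exact time rendering comes from Go's default %v formatting of time.Time):

v := tsm1.NewValue(time.Unix(1, 0), 1.1)
fmt.Println(v.String()) // e.g. "1970-01-01 00:00:01 +0000 UTC 1.1"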

@ -1,15 +1,24 @@
package tsm1
import (
"fmt"
"io"
"log"
"os"
"sync"
"time"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/tsdb"
)
// minCompactionSegments is the number of WAL segments that must be
// closed in order for a compaction to run. A lower value would shorten
// compaction times and memory requirements, but produce more TSM files
// with lower compression ratios. A higher value increases compaction times
// and memory usage but produces more dense TSM files.
const minCompactionSegments = 10
func init() {
tsdb.RegisterEngine("tsm1dev", NewDevEngine)
}
@ -24,7 +33,8 @@ type DevEngine struct {
path string
logger *log.Logger
WAL *WAL
WAL *WAL
Compactor *Compactor
RotateFileSize uint32
MaxFileSize uint32
@ -36,11 +46,16 @@ func NewDevEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engi
w := NewWAL(walPath)
w.LoggingEnabled = opt.Config.WALLoggingEnabled
c := &Compactor{
Dir: path,
}
e := &DevEngine{
path: path,
logger: log.New(os.Stderr, "[tsm1dev] ", log.LstdFlags),
WAL: w,
Compactor: c,
RotateFileSize: DefaultRotateFileSize,
MaxFileSize: MaxDataFileSize,
MaxPointsPerBlock: DefaultMaxPointsPerBlock,
@ -58,15 +73,21 @@ func (e *DevEngine) PerformMaintenance() {
// Format returns the format type of this engine
func (e *DevEngine) Format() tsdb.EngineFormat {
return tsdb.TSM1Format
return tsdb.TSM1DevFormat
}
// Open opens and initializes the engine.
func (e *DevEngine) Open() error {
if err := os.MkdirAll(e.path, 0777); err != nil {
return err
}
if err := e.WAL.Open(); err != nil {
return err
}
go e.compact()
return nil
}
@ -91,7 +112,15 @@ func (e *DevEngine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseInd
// WritePoints writes metadata and point data into the engine.
// Returns an error if new points are added to an existing key.
func (e *DevEngine) WritePoints(points []models.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
return e.WAL.WritePoints(points)
values := map[string][]Value{}
for _, p := range points {
for k, v := range p.Fields() {
key := fmt.Sprintf("%s%s%s", p.Key(), keyFieldSeparator, k)
values[key] = append(values[key], NewValue(p.Time(), v))
}
}
return e.WAL.WritePoints(values)
}
// DeleteSeries deletes the series from the engine.
@ -115,3 +144,55 @@ func (e *DevEngine) Begin(writable bool) (tsdb.Tx, error) {
}
func (e *DevEngine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") }
func (e *DevEngine) compact() {
for {
// Grab the closed segments that are no longer being written to
segments, err := e.WAL.ClosedSegments()
if err != nil {
e.logger.Printf("error retrieving closed WAL segments: %v", err)
time.Sleep(time.Second)
continue
}
// NOTE: This logic is temporary. Only compact the closed segments if we
// have at least 10 of them.
n := minCompactionSegments
if len(segments) < n {
time.Sleep(time.Second)
continue
}
// If we have more than 10, just compact 10 to keep compaction times bounded.
compact := segments[:n]
start := time.Now()
files, err := e.Compactor.Compact(compact)
if err != nil {
e.logger.Printf("error compacting WAL segments: %v", err)
}
// TODO: this is stubbed out but would be the place to replace files in the
// file store with the new compacted versions.
e.replaceFiles(files, compact)
// TODO: if replacement succeeds, we'd update the cache with the latest checkpoint.
// e.Cache.SetCheckpoint(...)
e.logger.Printf("compacted %d segments into %d files in %s", len(compact), len(files), time.Since(start))
}
}
func (e *DevEngine) replaceFiles(tsm, segments []string) {
// TODO: this is temporary, this func should replace the files in the file store
// The new TSM files have a .tmp extension. First rename them.
for _, f := range tsm {
os.Rename(f, f[:len(f)-4])
}
// The segments are fully compacted, delete them.
for _, f := range segments {
os.RemoveAll(f)
}
}
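The fixed-width slice f[:len(f)-4] works because the Compactor always appends exactly ".tmp"; an equivalent, slightly more defensive sketch using the standard library would be:

for _, f := range tsm {
	// TrimSuffix is a no-op when the suffix is absent, so a stray
	// non-.tmp name is left untouched rather than mangled.
	if err := os.Rename(f, strings.TrimSuffix(f, ".tmp")); err != nil {
		e.logger.Printf("error renaming %s: %v", f, err)
	}
}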

tsdb/engine/tsm1/pools.go (new file, 147 lines)

@ -0,0 +1,147 @@
package tsm1
import "sync"
var (
bufPool sync.Pool
float64ValuePool sync.Pool
int64ValuePool sync.Pool
boolValuePool sync.Pool
stringValuePool sync.Pool
)
// getBuf returns a buffer with length size from the buffer pool.
func getBuf(size int) []byte {
x := bufPool.Get()
if x == nil {
return make([]byte, size)
}
buf := x.([]byte)
if cap(buf) < size {
return make([]byte, size)
}
return buf[:size]
}
// putBuf returns a buffer to the pool.
func putBuf(buf []byte) {
bufPool.Put(buf)
}
// getFloat64Values returns a []Value of length size from the float64 value pool.
func getFloat64Values(size int) []Value {
var buf []Value
x := float64ValuePool.Get()
if x == nil {
buf = make([]Value, size)
} else {
buf = x.([]Value)
}
if cap(buf) < size {
return make([]Value, size)
}
for i, v := range buf {
if v == nil {
buf[i] = &FloatValue{}
}
}
return buf[:size]
}
// putFloat64Values returns a []Value to the float64 value pool.
func putFloat64Values(buf []Value) {
float64ValuePool.Put(buf)
}
// getInt64Values returns a []Value of length size from the int64 value pool.
func getInt64Values(size int) []Value {
var buf []Value
x := int64ValuePool.Get()
if x == nil {
buf = make([]Value, size)
} else {
buf = x.([]Value)
}
if cap(buf) < size {
return make([]Value, size)
}
for i, v := range buf {
if v == nil {
buf[i] = &Int64Value{}
}
}
return buf[:size]
}
// putInt64Values returns a []Value to the int64 value pool.
func putInt64Values(buf []Value) {
int64ValuePool.Put(buf)
}
// getBoolValues returns a []Value of length size from the bool value pool.
func getBoolValues(size int) []Value {
var buf []Value
x := boolValuePool.Get()
if x == nil {
buf = make([]Value, size)
} else {
buf = x.([]Value)
}
if cap(buf) < size {
return make([]Value, size)
}
for i, v := range buf {
if v == nil {
buf[i] = &BoolValue{}
}
}
return buf[:size]
}
// putStringValues returns a []Value to the string value pool.
func putStringValues(buf []Value) {
stringValuePool.Put(buf)
}
// getStringValues returns a []Value of length size from the string value pool.
func getStringValues(size int) []Value {
var buf []Value
x := stringValuePool.Get()
if x == nil {
buf = make([]Value, size)
} else {
buf = x.([]Value)
}
if cap(buf) < size {
return make([]Value, size)
}
for i, v := range buf {
if v == nil {
buf[i] = &StringValue{}
}
}
return buf[:size]
}
// putBoolValues returns a []Value to the bool value pool.
func putBoolValues(buf []Value) {
boolValuePool.Put(buf)
}
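// putValue returns a []Value to the pool matching the type of its first element.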
func putValue(buf []Value) {
if len(buf) > 0 {
switch buf[0].(type) {
case *FloatValue:
putFloat64Values(buf)
case *Int64Value:
putInt64Values(buf)
case *BoolValue:
putBoolValues(buf)
case *StringValue:
putStringValues(buf)
}
}
}
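Within the tsm1 package the intended pattern for these pools is get, fill, use, put; a minimal sketch (the fill step via type assertion is exactly what UnmarshalBinary does):

buf := getBuf(1024) // []byte of length 1024 from the pool
defer putBuf(buf)

vals := getFloat64Values(100) // []Value backed by *FloatValue entries
for i := range vals {
	if fv, ok := vals[i].(*FloatValue); ok {
		fv.time = time.Unix(int64(i), 0)
		fv.value = 1.1
	}
}
// ... use vals ...
putValue(vals) // routes the slice back to the matching pool by element type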


@ -4,21 +4,21 @@ import (
"fmt"
"io"
"log"
"math"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"github.com/influxdb/influxdb/models"
"time"
"github.com/golang/snappy"
)
const (
// DefaultSegmentSize of 2MB is the size at which segment files will be rolled over
DefaultSegmentSize = 2 * 1024 * 1024
// DefaultSegmentSize of 10MB is the size at which segment files will be rolled over
DefaultSegmentSize = 10 * 1024 * 1024
// WALFileExtension is the file extension we expect for WAL segments
WALFileExtension = "wal"
@ -26,6 +26,11 @@ const (
WALFilePrefix = "_"
defaultBufLen = 1024 << 10 // 1MB (sized for batches of 5000 points)
float64EntryType = 1
int64EntryType = 2
boolEntryType = 3
stringEntryType = 4
)
// walEntry is a byte written to a wal segment file that indicates what the following compressed block contains
@ -38,8 +43,6 @@ const (
var ErrWALClosed = fmt.Errorf("WAL closed")
var bufPool sync.Pool
type WAL struct {
mu sync.RWMutex
@ -100,9 +103,9 @@ func (l *WAL) Open() error {
return nil
}
func (l *WAL) WritePoints(points []models.Point) error {
func (l *WAL) WritePoints(values map[string][]Value) error {
entry := &WriteWALEntry{
Points: points,
Values: values,
}
if err := l.writeToLog(entry); err != nil {
@ -114,12 +117,17 @@ func (l *WAL) WritePoints(points []models.Point) error {
func (l *WAL) ClosedSegments() ([]string, error) {
l.mu.RLock()
defer l.mu.RUnlock()
var activePath string
if l.currentSegmentWriter != nil {
activePath = l.currentSegmentWriter.Path()
}
// Not loading files from disk so nothing to do
if l.path == "" {
l.mu.RUnlock()
return nil, nil
}
l.mu.RUnlock()
files, err := l.segmentFileNames()
if err != nil {
@ -129,7 +137,7 @@ func (l *WAL) ClosedSegments() ([]string, error) {
var names []string
for _, fn := range files {
// Skip the active segment
if l.currentSegmentWriter != nil && fn == l.currentSegmentWriter.Path() {
if fn == activePath {
continue
}
@ -164,16 +172,20 @@ func (l *WAL) writeToLog(entry WALEntry) error {
}
func (l *WAL) rollSegment() error {
l.mu.Lock()
defer l.mu.Unlock()
l.mu.RLock()
if l.currentSegmentWriter == nil || l.currentSegmentWriter.Size() > DefaultSegmentSize {
l.mu.RUnlock()
l.mu.Lock()
defer l.mu.Unlock()
if err := l.newSegmentFile(); err != nil {
// A drop database or RP call could trigger this error if writes were in-flight
// when the drop statement executes.
return fmt.Errorf("error opening new segment file for wal: %v", err)
}
return nil
}
l.mu.RUnlock()
return nil
}
@ -243,26 +255,77 @@ type WALEntry interface {
// WriteWALEntry represents a write of points.
type WriteWALEntry struct {
Points []models.Point
Values map[string][]Value
}
// Encode converts the WriteWALEntry into a byte stream using dst if it
// is large enough. If dst is too small, the slice will be grown to fit the
// encoded entry.
func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) {
// The entry's values are encoded as follows:
//
// For each key and slice of values, first a 1 byte type for the []Values
// slice is written. Following the type, the length and key bytes are written.
// Following the key, a 4 byte count is followed by each value as an 8 byte time
// and an N byte value. The value encoding depends on the type: float64 and
// int64 use 8 bytes, bool uses 1 byte, and string is length-prefixed like the key.
//
// This structure is then repeated for each key and value slice.
//
// ┌────────────────────────────────────────────────────────────────────┐
// │ WriteWALEntry │
// ├──────┬─────────┬────────┬───────┬─────────┬─────────┬───┬──────┬───┤
// │ Type │ Key Len │ Key │ Count │ Time │ Value │...│ Type │...│
// │1 byte│ 4 bytes │ N bytes│4 bytes│ 8 bytes │ N bytes │ │1 byte│ │
// └──────┴─────────┴────────┴───────┴─────────┴─────────┴───┴──────┴───┘
var n int
for _, p := range w.Points {
// Marshaling points to bytes is relatively expensive, only do it once
bytes, err := p.MarshalBinary()
if err != nil {
return nil, err
for k, v := range w.Values {
switch v[0].Value().(type) {
case float64:
dst[n] = float64EntryType
case int64:
dst[n] = int64EntryType
case bool:
dst[n] = boolEntryType
case string:
dst[n] = stringEntryType
default:
return nil, fmt.Errorf("unsupported value type: %#v", v[0].Value())
}
n++
// Make sure we have enough space in our buf before copying. If not,
// grow the buf.
if len(bytes)+4 > len(dst)-n {
grow := make([]byte, len(bytes)*2)
if len(k)+2+len(v)*8+4 > len(dst)-n {
grow := make([]byte, len(dst)*2)
dst = append(dst, grow...)
}
n += copy(dst[n:], u32tob(uint32(len(bytes))))
n += copy(dst[n:], bytes)
n += copy(dst[n:], u16tob(uint16(len(k))))
n += copy(dst[n:], []byte(k))
n += copy(dst[n:], u32tob(uint32(len(v))))
for _, vv := range v {
n += copy(dst[n:], u64tob(uint64(vv.Time().UnixNano())))
switch t := vv.Value().(type) {
case float64:
n += copy(dst[n:], u64tob(uint64(math.Float64bits(t))))
case int64:
n += copy(dst[n:], u64tob(uint64(t)))
case bool:
if t {
n += copy(dst[n:], []byte{1})
} else {
n += copy(dst[n:], []byte{0})
}
case string:
n += copy(dst[n:], u32tob(uint32(len(t))))
n += copy(dst[n:], []byte(t))
}
}
}
return dst[:n], nil
@ -276,17 +339,76 @@ func (w *WriteWALEntry) MarshalBinary() ([]byte, error) {
func (w *WriteWALEntry) UnmarshalBinary(b []byte) error {
var i int
for i < len(b) {
length := int(btou32(b[i : i+4]))
typ := b[i]
i++
length := int(btou16(b[i : i+2]))
i += 2
k := string(b[i : i+length])
i += length
nvals := int(btou32(b[i : i+4]))
i += 4
point, err := models.NewPointFromBytes(b[i : i+length])
if err != nil {
return err
var values []Value
switch typ {
case float64EntryType:
values = getFloat64Values(nvals)
case int64EntryType:
values = getInt64Values(nvals)
case boolEntryType:
values = getBoolValues(nvals)
case stringEntryType:
values = getStringValues(nvals)
default:
return fmt.Errorf("unsupported value type: %#v", typ)
}
i += length
w.Points = append(w.Points, point)
for j := 0; j < nvals; j++ {
t := time.Unix(0, int64(btou64(b[i:i+8])))
i += 8
switch typ {
case float64EntryType:
v := math.Float64frombits((btou64(b[i : i+8])))
i += 8
if fv, ok := values[j].(*FloatValue); ok {
fv.time = t
fv.value = v
}
case int64EntryType:
v := int64(btou64(b[i : i+8]))
i += 8
if fv, ok := values[j].(*Int64Value); ok {
fv.time = t
fv.value = v
}
case boolEntryType:
v := b[i]
i += 1
if fv, ok := values[j].(*BoolValue); ok {
fv.time = t
if v == 1 {
fv.value = true
} else {
fv.value = false
}
}
case stringEntryType:
length := int(btou32(b[i : i+4]))
i += 4
v := string(b[i : i+length])
i += length
if fv, ok := values[j].(*StringValue); ok {
fv.time = t
fv.value = v
}
default:
return fmt.Errorf("unsupported value type: %#v", typ)
}
}
w.Values[k] = values
}
return nil
}
@ -333,8 +455,7 @@ func (w *DeleteWALEntry) Type() walEntryType {
// WALSegmentWriter writes WAL segments.
type WALSegmentWriter struct {
mu sync.RWMutex
mu sync.RWMutex
w io.WriteCloser
size int
}
@ -371,10 +492,12 @@ func (w *WALSegmentWriter) Write(e WALEntry) error {
if _, err := w.w.Write([]byte{byte(e.Type())}); err != nil {
return err
}
if _, err := w.w.Write(u32tob(uint32(len(compressed)))); err != nil {
if _, err = w.w.Write(u32tob(uint32(len(compressed)))); err != nil {
return err
}
if _, err := w.w.Write(compressed); err != nil {
if _, err = w.w.Write(compressed); err != nil {
return err
}
@ -459,10 +582,7 @@ func (r *WALSegmentReader) Next() bool {
return true
}
buf := getBuf(defaultBufLen)
defer putBuf(buf)
data, err := snappy.Decode(buf, b[:length])
data, err := snappy.Decode(nil, b[:length])
if err != nil {
r.err = err
return true
@ -471,7 +591,9 @@ func (r *WALSegmentReader) Next() bool {
// and marshal it and send it to the cache
switch walEntryType(entryType) {
case WriteWALEntryType:
r.entry = &WriteWALEntry{}
r.entry = &WriteWALEntry{
Values: map[string][]Value{},
}
case DeleteWALEntryType:
r.entry = &DeleteWALEntry{}
default:
@ -494,6 +616,10 @@ func (r *WALSegmentReader) Error() error {
return r.err
}
func (r *WALSegmentReader) Close() error {
return r.r.Close()
}
// idFromFileName parses the segment file ID from its name
func idFromFileName(name string) (int, error) {
parts := strings.Split(filepath.Base(name), ".")
@ -505,21 +631,3 @@ func idFromFileName(name string) (int, error) {
return int(id), err
}
// getBuf returns a buffer with length size from the buffer pool.
func getBuf(size int) []byte {
x := bufPool.Get()
if x == nil {
return make([]byte, size)
}
buf := x.([]byte)
if cap(buf) < size {
return make([]byte, size)
}
return buf[:size]
}
// putBuf returns a buffer to the pool.
func putBuf(buf []byte) {
bufPool.Put(buf)
}
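A round-trip sketch of the new entry encoding, using MarshalBinary and UnmarshalBinary as defined above; the key follows the measurement+tagset, separator, field convention from the tests:

entry := &tsm1.WriteWALEntry{
	Values: map[string][]tsm1.Value{
		"cpu,host=A#!~#value": []tsm1.Value{
			tsm1.NewValue(time.Unix(1, 0), 1.1),
		},
	},
}

b, err := entry.MarshalBinary()
if err != nil {
	log.Fatal(err)
}

// The segment reader primes Values before decoding, and so must we.
decoded := &tsm1.WriteWALEntry{Values: map[string][]tsm1.Value{}}
if err := decoded.UnmarshalBinary(b); err != nil {
	log.Fatal(err)
}
// decoded.Values now mirrors entry.Values.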


@ -1,11 +1,10 @@
package tsm1_test
import (
"fmt"
"os"
"testing"
"time"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
)
@ -15,14 +14,20 @@ func TestWALWriter_WritePoints_Single(t *testing.T) {
f := MustTempFile(dir)
w := tsm1.NewWALSegmentWriter(f)
p1 := parsePoint("cpu,host=A value=1.1 1000000000")
p1 := tsm1.NewValue(time.Unix(1, 0), 1.1)
p2 := tsm1.NewValue(time.Unix(1, 0), int64(1))
p3 := tsm1.NewValue(time.Unix(1, 0), true)
p4 := tsm1.NewValue(time.Unix(1, 0), "string")
points := []models.Point{
p1,
values := map[string][]tsm1.Value{
"cpu,host=A#!~#float": []tsm1.Value{p1},
"cpu,host=A#!~#int": []tsm1.Value{p2},
"cpu,host=A#!~#bool": []tsm1.Value{p3},
"cpu,host=A#!~#string": []tsm1.Value{p4},
}
entry := &tsm1.WriteWALEntry{
Points: points,
Values: values,
}
if err := w.Write(entry); err != nil {
@ -49,9 +54,11 @@ func TestWALWriter_WritePoints_Single(t *testing.T) {
t.Fatalf("expected WriteWALEntry: got %#v", e)
}
for i, p := range e.Points {
if exp, got := points[i].String(), p.String(); exp != got {
t.Fatalf("points mismatch: got %v, exp %v", got, exp)
for k, v := range e.Values {
for i, vv := range v {
if got, exp := vv.String(), values[k][i].String(); got != exp {
t.Fatalf("points mismatch: got %v, exp %v", got, exp)
}
}
}
}
@ -62,21 +69,20 @@ func TestWALWriter_WritePoints_Multiple(t *testing.T) {
f := MustTempFile(dir)
w := tsm1.NewWALSegmentWriter(f)
p1 := parsePoint("cpu,host=A value=1.1 1000000000")
p2 := parsePoint("cpu,host=B value=1.1 1000000000")
p1 := tsm1.NewValue(time.Unix(1, 0), int64(1))
p2 := tsm1.NewValue(time.Unix(1, 0), int64(2))
exp := [][]models.Point{
[]models.Point{
p1,
},
[]models.Point{
p2,
},
exp := []struct {
key string
values []tsm1.Value
}{
{"cpu,host=A#!~#value", []tsm1.Value{p1}},
{"cpu,host=B#!~#value", []tsm1.Value{p2}},
}
for _, e := range exp {
for _, v := range exp {
entry := &tsm1.WriteWALEntry{
Points: e,
Values: map[string][]tsm1.Value{v.key: v.values},
}
if err := w.Write(entry); err != nil {
@ -106,10 +112,19 @@ func TestWALWriter_WritePoints_Multiple(t *testing.T) {
t.Fatalf("expected WriteWALEntry: got %#v", e)
}
points := e.Points
for i, p := range ep {
if exp, got := points[i].String(), p.String(); exp != got {
t.Fatalf("points mismatch: got %v, exp %v", got, exp)
for k, v := range e.Values {
if got, exp := k, ep.key; got != exp {
t.Fatalf("key mismatch. got %v, exp %v", got, exp)
}
if got, exp := len(v), len(ep.values); got != exp {
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
}
for i, vv := range v {
if got, exp := vv.String(), ep.values[i].String(); got != exp {
t.Fatalf("points mismatch: got %v, exp %v", got, exp)
}
}
}
}
@ -164,19 +179,22 @@ func TestWALWriter_WritePointsDelete_Multiple(t *testing.T) {
f := MustTempFile(dir)
w := tsm1.NewWALSegmentWriter(f)
p1 := parsePoint("cpu,host=A value=1.1 1000000000")
write := &tsm1.WriteWALEntry{
Points: []models.Point{p1},
p1 := tsm1.NewValue(time.Unix(1, 0), true)
values := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{p1},
}
if err := w.Write(write); err != nil {
writeEntry := &tsm1.WriteWALEntry{
Values: values,
}
if err := w.Write(writeEntry); err != nil {
fatal(t, "write points", err)
}
// Write the delete entry
deleteEntry := &tsm1.DeleteWALEntry{
Keys: []string{"cpu"},
Keys: []string{"cpu,host=A#!~value"},
}
if err := w.Write(deleteEntry); err != nil {
@ -205,10 +223,15 @@ func TestWALWriter_WritePointsDelete_Multiple(t *testing.T) {
t.Fatalf("expected WriteWALEntry: got %#v", e)
}
points := e.Points
for i, p := range write.Points {
if exp, got := points[i].String(), p.String(); exp != got {
t.Fatalf("points mismatch: got %v, exp %v", got, exp)
for k, v := range e.Values {
if got, exp := len(v), len(values[k]); got != exp {
t.Fatalf("values length mismatch: got %v, exp %v", got, exp)
}
for i, vv := range v {
if got, exp := vv.String(), values[k][i].String(); got != exp {
t.Fatalf("points mismatch: got %v, exp %v", got, exp)
}
}
}
@ -254,8 +277,10 @@ func TestWAL_ClosedSegments(t *testing.T) {
t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp)
}
if err := w.WritePoints([]models.Point{
parsePoint("cpu,host=A value=1.1 1000000000"),
if err := w.WritePoints(map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{
tsm1.NewValue(time.Unix(1, 0), 1.1),
},
}); err != nil {
t.Fatalf("error writing points: %v", err)
}
@ -323,9 +348,10 @@ func TestWAL_Delete(t *testing.T) {
}
func BenchmarkWALSegmentWriter(b *testing.B) {
points := make([]models.Point, 5000)
for i := range points {
points[i] = parsePoint(fmt.Sprintf("cpu,host=host-%d value=1.1 1000000000", i))
points := map[string][]tsm1.Value{}
for i := 0; i < 5000; i++ {
k := "cpu,host=A#!~#value"
points[k] = append(points[k], tsm1.NewValue(time.Unix(int64(i), 0), 1.1))
}
dir := MustTempDir()
@ -335,7 +361,7 @@ func BenchmarkWALSegmentWriter(b *testing.B) {
w := tsm1.NewWALSegmentWriter(f)
write := &tsm1.WriteWALEntry{
Points: points,
Values: points,
}
b.ResetTimer()
@ -347,9 +373,10 @@ func BenchmarkWALSegmentWriter(b *testing.B) {
}
func BenchmarkWALSegmentReader(b *testing.B) {
points := make([]models.Point, 5000)
for i := range points {
points[i] = parsePoint(fmt.Sprintf("cpu,host=host-%d value=1.1 1000000000", i))
points := map[string][]tsm1.Value{}
for i := 0; i < 5000; i++ {
k := "cpu,host=A#!~#value"
points[k] = append(points[k], tsm1.NewValue(time.Unix(int64(i), 0), 1.1))
}
dir := MustTempDir()
@ -359,7 +386,7 @@ func BenchmarkWALSegmentReader(b *testing.B) {
w := tsm1.NewWALSegmentWriter(f)
write := &tsm1.WriteWALEntry{
Points: points,
Values: points,
}
for i := 0; i < 100; i++ {