Add prefix key to tombstoner

This commit adds support for "prefix keys". Prefix keys differ from
regular tombstone key entries in that the key of the entry acts as a
prefix: it matches every series whose key begins with that prefix, for
the given time range.

This means only one entry is needed to delete many series.

The key of a tombstone entry now has a maximum length of 16777215 bytes
(24 bits), with the remaining 8 high bits of the length field available
for setting further options / meta information about the tombstone entry.

In this case, the top bit is used to indicate that the tombstone entry
is intended to be a prefix. This leaves 7 spare bits for future use.
pull/10616/head
Edd Robinson 2018-12-17 13:28:46 +00:00
parent 1b5ad5e129
commit 262772544a
2 changed files with 305 additions and 157 deletions

View File

@ -1,5 +1,45 @@
package tsm1
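/*
Tombstone file format:

Tombstone File
	+----------+---------------------------+
	|  Header  |     Tombstone Entries     |
	| 4 bytes  |                           |
	+----------+---------------------------+

Tombstone Entry
	+------------+----------+------------+---------+----------+----------+
	| Prefix Bit | Reserved | Key Length |   Key   | Min Time | Max Time |
	|   1 bit    |  7 bits  |  24 bits   | N bytes | 8 bytes  | 8 bytes  |
	+------------+----------+------------+---------+----------+----------+
*/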
import (
"bufio"
"compress/gzip"
@ -19,13 +59,9 @@ import (
const (
headerSize = 4
v4header = 0x1504
v5header = 0x1505
)
var (
errIncompatibleV4Version = errors.New("incompatible v4 version")
errIncompatibleV5Version = errors.New("incompatible v5 version")
)
var errIncompatibleV4Version = errors.New("incompatible v4 version")
// Tombstoner records tombstones when entries are deleted.
type Tombstoner struct {
@ -72,11 +108,23 @@ type Tombstone struct {
// Key is the tombstoned series key.
Key []byte
// Prefix indicates if this tombstone entry is a prefix key, meaning all
// keys with a prefix matching Key should be removed for the [Min, Max] range.
Prefix bool
// Min and Max are the min and max unix nanosecond time ranges of Key that are deleted. If
// the full range is deleted, both values are -1.
Min, Max int64
}
// String returns a one-line description of the tombstone: a label ("Prefix"
// for prefix entries, "Key" otherwise), the quoted key, and the [Min, Max]
// range.
func (t Tombstone) String() string {
	const layout = "%s: %q, [%d, %d]"

	if t.Prefix {
		return fmt.Sprintf(layout, "Prefix", t.Key, t.Min, t.Max)
	}
	return fmt.Sprintf(layout, "Key", t.Key, t.Min, t.Max)
}
// WithObserver sets a FileStoreObserver for when the tombstone file is written.
func (t *Tombstoner) WithObserver(obs FileStoreObserver) {
if obs == nil {
@ -85,6 +133,40 @@ func (t *Tombstoner) WithObserver(obs FileStoreObserver) {
t.obs = obs
}
// AddPrefix adds a prefix-based tombstone key. It delegates to AddPrefixRange
// using the full int64 range, math.MinInt64 through math.MaxInt64.
func (t *Tombstoner) AddPrefix(key []byte) error {
	const (
		rangeMin int64 = math.MinInt64
		rangeMax int64 = math.MaxInt64
	)
	return t.AddPrefixRange(key, rangeMin, rangeMax)
}
// AddPrefixRange adds a prefix-based tombstone key with an explicit range.
//
// The entry is written with Prefix set and with min and max as its range.
// Nothing is written, and nil is returned, when FilterFn is set and rejects
// key, or when Path is empty.
func (t *Tombstoner) AddPrefixRange(key []byte, min, max int64) error {
	if filter := t.FilterFn; filter != nil && !filter(key) {
		return nil
	}

	t.mu.Lock()
	defer t.mu.Unlock()

	// If this TSMFile has not been written (mainly in tests), don't write a
	// tombstone because the keys will not be written when it's actually saved.
	if t.Path == "" {
		return nil
	}

	t.statsLoaded = false

	err := t.prepareLatest()
	if err != nil {
		return err
	}

	entry := Tombstone{
		Key:    key,
		Prefix: true,
		Min:    min,
		Max:    max,
	}
	return t.writeTombstoneV4(t.gz, entry)
}
// Add adds all keys, across all timestamps, to the tombstone.
func (t *Tombstoner) Add(keys [][]byte) error {
return t.AddRange(keys, math.MinInt64, math.MaxInt64)
@ -111,26 +193,24 @@ func (t *Tombstoner) AddRange(keys [][]byte, min, max int64) error {
t.statsLoaded = false
if err := t.prepareLatest(); err == errIncompatibleV5Version {
for _, k := range keys {
if t.FilterFn != nil && !t.FilterFn(k) {
continue
}
if err := t.writeTombstoneV4(t.gz, Tombstone{
Key: k,
Min: min,
Max: max,
}); err != nil {
return err
}
}
} else if err != nil {
if err := t.prepareLatest(); err != nil {
return err
}
// WRITE V5 TOMBSTONE
for _, k := range keys {
if t.FilterFn != nil && !t.FilterFn(k) {
continue
}
if err := t.writeTombstoneV4(t.gz, Tombstone{
Key: k,
Min: min,
Max: max,
Prefix: false,
}); err != nil {
return err
}
}
return nil
}
@ -234,45 +314,12 @@ func (t *Tombstoner) Walk(fn func(t Tombstone) error) error {
header := binary.BigEndian.Uint32(b[:])
if header == v4header {
return t.readTombstoneV4(f, fn)
} else if header == v5header {
return t.readTombstone(f, fn)
}
return errors.New("invalid tombstone file")
}
// func (t *Tombstoner) writeTombstoneV3(tombstones []Tombstone) error {
// tmp, err := ioutil.TempFile(filepath.Dir(t.Path), "tombstone")
// if err != nil {
// return err
// }
// defer tmp.Close()
// var b [8]byte
// bw := bufio.NewWriterSize(tmp, 1024*1024)
// binary.BigEndian.PutUint32(b[:4], v3header)
// if _, err := bw.Write(b[:4]); err != nil {
// return err
// }
// gz := gzip.NewWriter(bw)
// for _, ts := range tombstones {
// if err := t.writeTombstoneV4(gz, ts); err != nil {
// return err
// }
// }
// t.gz = gz
// t.bw = bw
// t.pendingFile = tmp
// t.tombstones = t.tombstones[:0]
// return t.commit()
// }
func (t *Tombstoner) prepareLatest() error {
if t.pendingFile != nil {
if t.pendingFile != nil { // There is already a pending tombstone file open.
return nil
}
@ -287,18 +334,18 @@ func (t *Tombstoner) prepareLatest() error {
os.Remove(tmp.Name())
}
// Copy the existing v4 file if it exists
// Copy the existing v5 file if it exists
f, err := os.Open(t.tombstonePath())
if !os.IsNotExist(err) {
defer f.Close()
var b [4]byte
if n, err := f.Read(b[:]); n == 4 && err == nil {
header := binary.BigEndian.Uint32(b[:])
// There is an existing tombstone on disk and it's not a v5.
// version again.
if header != v5header {
// There is an existing tombstone on disk and it's not a v4.
// We can't support it.
if header != v4header {
removeTmp()
return errIncompatibleV5Version
return errIncompatibleV4Version
}
// Seek back to the beginning we copy the header
@ -324,7 +371,7 @@ func (t *Tombstoner) prepareLatest() error {
// Write the header only if the file is new
if os.IsNotExist(err) {
binary.BigEndian.PutUint32(b[:4], v5header)
binary.BigEndian.PutUint32(b[:4], v4header)
if _, err := bw.Write(b[:4]); err != nil {
removeTmp()
return err
@ -395,80 +442,6 @@ func (t *Tombstoner) rollback() error {
return os.Remove(tmpFilename)
}
// readTombstone is a placeholder: it ignores f and fn and returns nil without
// reading any entries.
func (t *Tombstoner) readTombstone(f *os.File, fn func(t Tombstone) error) error {
return nil
}
// readTombstoneV3 reads the third version of tombstone files, which store each
// key together with the time range of the points deleted for that key. The
// format is binary and compressed with gzip.
//
// fn is called once per entry decoded from f and then once per entry held in
// t.tombstones. The first error returned by fn, or by a read, stops the walk
// and is returned. Running out of data while reading a key-length field is
// treated as the end of the entries rather than as an error.
func (t *Tombstoner) readTombstoneV3(f *os.File, fn func(t Tombstone) error) error {
	// Skip the header; it has already been checked earlier.
	if _, err := f.Seek(headerSize, io.SeekStart); err != nil {
		return err
	}

	zr, err := gzip.NewReader(bufio.NewReader(f))
	if err != nil {
		return err
	}
	defer zr.Close()

	scratch := make([]byte, 4096)

	// readTime decodes the next big-endian 8-byte value from the stream.
	readTime := func() (int64, error) {
		if _, err := io.ReadFull(zr, scratch[:8]); err != nil {
			return 0, err
		}
		return int64(binary.BigEndian.Uint64(scratch[:8])), nil
	}

	for {
		_, err := io.ReadFull(zr, scratch[:4])
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			break
		}
		if err != nil {
			return err
		}

		size := int(binary.BigEndian.Uint32(scratch[:4]))
		if size > len(scratch) {
			scratch = make([]byte, size)
		}
		if _, err := io.ReadFull(zr, scratch[:size]); err != nil {
			return err
		}

		// The scratch buffer is reused, so the key needs its own storage.
		entry := Tombstone{Key: make([]byte, size)}
		copy(entry.Key, scratch[:size])

		if entry.Min, err = readTime(); err != nil {
			return err
		}
		if entry.Max, err = readTime(); err != nil {
			return err
		}

		if err := fn(entry); err != nil {
			return err
		}
	}

	for _, ts := range t.tombstones {
		if err := fn(ts); err != nil {
			return err
		}
	}
	return nil
}
// readTombstoneV4 reads the fourth version of tombstone files that are capable
// of storing multiple v3 files appended together.
func (t *Tombstoner) readTombstoneV4(f *os.File, fn func(t Tombstone) error) error {
@ -482,9 +455,12 @@ func (t *Tombstoner) readTombstoneV4(f *os.File, fn func(t Tombstone) error) err
return err
}
}
var (
prefix bool
min, max int64
key []byte
kmask = 0xff000000 // Mask for non key-length bits
)
br := bufio.NewReaderSize(f, 64*1024)
@ -508,6 +484,11 @@ func (t *Tombstoner) readTombstoneV4(f *os.File, fn func(t Tombstone) error) err
}
keyLen := int(binary.BigEndian.Uint32(b[:4]))
prefix = keyLen>>31 == 1 // Prefix is set according to whether the highest bit is set.
// Remove 8 MSB to get correct length.
keyLen &^= kmask
if keyLen+16 > len(b) {
b = make([]byte, keyLen+16)
}
@ -532,9 +513,10 @@ func (t *Tombstoner) readTombstoneV4(f *os.File, fn func(t Tombstone) error) err
max = int64(binary.BigEndian.Uint64(maxBuf))
if err := fn(Tombstone{
Key: key,
Min: min,
Max: max,
Key: key,
Min: min,
Max: max,
Prefix: prefix,
}); err != nil {
return err
}
@ -584,13 +566,27 @@ func (t *Tombstoner) tombstonePath() string {
}
func (t *Tombstoner) writeTombstoneV4(dst io.Writer, ts Tombstone) error {
binary.BigEndian.PutUint32(t.tmp[:4], uint32(len(ts.Key)))
maxKeyLen := 0x00ffffff // 24 bit key length. Top 8 bits for other information.
// Maximum key length. Leaves 8 spare bits.
if len(ts.Key) > maxKeyLen {
return fmt.Errorf("key has length %d, maximum allowed key length %d", len(ts.Key), maxKeyLen)
}
l := uint32(len(ts.Key))
if ts.Prefix {
// A mask to set the prefix bit on a tombstone.
l |= (1 << 31)
}
binary.BigEndian.PutUint32(t.tmp[:4], l)
if _, err := dst.Write(t.tmp[:4]); err != nil {
return err
}
if _, err := dst.Write([]byte(ts.Key)); err != nil {
return err
}
binary.BigEndian.PutUint64(t.tmp[:], uint64(ts.Min))
if _, err := dst.Write(t.tmp[:]); err != nil {
return err
@ -600,5 +596,3 @@ func (t *Tombstoner) writeTombstoneV4(dst io.Writer, ts Tombstone) error {
_, err := dst.Write(t.tmp[:])
return err
}
// writeTombstone is a placeholder: it writes nothing to dst and returns nil.
func (t *Tombstoner) writeTombstone(dst io.Writer, ts Tombstone) error { return nil }

View File

@ -5,6 +5,7 @@ import (
"encoding/hex"
"fmt"
"io"
"math"
"os"
"reflect"
"testing"
@ -15,7 +16,6 @@ import (
)
func TestTombstoner_Add(t *testing.T) {
t.Skip("TODO")
dir := MustTempDir()
defer func() { os.RemoveAll(dir) }()
@ -74,10 +74,97 @@ func TestTombstoner_Add(t *testing.T) {
if got, exp := string(entries[0].Key), "foo"; got != exp {
t.Fatalf("value mismatch: got %v, exp %v", got, exp)
}
if got, exp := entries[0].Prefix, false; got != exp {
t.Fatalf("value mismatch: got %v, exp %v", got, exp)
}
}
// TestTombstoner_AddPrefix checks that a key added with AddPrefix is read back
// as a single prefix entry whose range is math.MinInt64 to math.MaxInt64.
func TestTombstoner_AddPrefix(t *testing.T) {
	dir := MustTempDir()
	defer os.RemoveAll(dir)

	file := MustTempFile(dir)
	tombstoner := tsm1.NewTombstoner(file.Name(), nil)

	// Before anything is added there are no entries and no tombstone files.
	if got, exp := len(mustReadAll(tombstoner)), 0; got != exp {
		t.Fatalf("length mismatch: got %v, exp %v", got, exp)
	}
	if got, exp := len(tombstoner.TombstoneFiles()), 0; got != exp {
		t.Fatalf("stat length mismatch: got %v, exp %v", got, exp)
	}

	if err := tombstoner.AddPrefix([]byte("some-prefix")); err != nil {
		t.Fatal(err)
	}
	if err := tombstoner.Flush(); err != nil {
		t.Fatalf("unexpected error flushing tombstone: %v", err)
	}

	want := tsm1.Tombstone{
		Key:    []byte("some-prefix"),
		Prefix: true,
		Min:    math.MinInt64,
		Max:    math.MaxInt64,
	}

	entries := mustReadAll(tombstoner)
	if got, exp := len(entries), 1; got != exp {
		t.Fatalf("length mismatch: got %v, exp %v", got, exp)
	}
	if got := entries[0]; !reflect.DeepEqual(got, want) {
		t.Fatalf("unexpected tombstone entry. Got %s, expected %s", got, want)
	}
}
// TestTombstoner_AddPrefixRange checks that a key added with AddPrefixRange is
// read back as a single prefix entry carrying the explicit range it was given.
func TestTombstoner_AddPrefixRange(t *testing.T) {
	dir := MustTempDir()
	defer os.RemoveAll(dir)

	file := MustTempFile(dir)
	tombstoner := tsm1.NewTombstoner(file.Name(), nil)

	// Before anything is added there are no entries and no tombstone files.
	if got, exp := len(mustReadAll(tombstoner)), 0; got != exp {
		t.Fatalf("length mismatch: got %v, exp %v", got, exp)
	}
	if got, exp := len(tombstoner.TombstoneFiles()), 0; got != exp {
		t.Fatalf("stat length mismatch: got %v, exp %v", got, exp)
	}

	if err := tombstoner.AddPrefixRange([]byte("some-prefix"), 20, 30); err != nil {
		t.Fatal(err)
	}
	if err := tombstoner.Flush(); err != nil {
		t.Fatalf("unexpected error flushing tombstone: %v", err)
	}

	want := tsm1.Tombstone{
		Key:    []byte("some-prefix"),
		Prefix: true,
		Min:    20,
		Max:    30,
	}

	entries := mustReadAll(tombstoner)
	if got, exp := len(entries), 1; got != exp {
		t.Fatalf("length mismatch: got %v, exp %v", got, exp)
	}
	if got := entries[0]; !reflect.DeepEqual(got, want) {
		t.Fatalf("unexpected tombstone entry. Got %s, expected %s", got, want)
	}
}
func TestTombstoner_Add_LargeKey(t *testing.T) {
t.Skip("TODO")
dir := MustTempDir()
defer func() { os.RemoveAll(dir) }()
@ -139,8 +226,43 @@ func TestTombstoner_Add_LargeKey(t *testing.T) {
}
}
// TestTombstoner_Add_KeyTooBig checks the key length limit: Add succeeds for a
// key of exactly 0x00ffffff bytes and returns an error for a key one byte
// longer. Flush is expected to succeed after both attempts.
func TestTombstoner_Add_KeyTooBig(t *testing.T) {
	const largestKey = 0x00ffffff

	dir := MustTempDir()
	defer os.RemoveAll(dir)

	file := MustTempFile(dir)
	tombstoner := tsm1.NewTombstoner(file.Name(), nil)

	// Before anything is added there are no entries and no tombstone files.
	if got, exp := len(mustReadAll(tombstoner)), 0; got != exp {
		t.Fatalf("length mismatch: got %v, exp %v", got, exp)
	}
	if got, exp := len(tombstoner.TombstoneFiles()), 0; got != exp {
		t.Fatalf("stat length mismatch: got %v, exp %v", got, exp)
	}

	// A key of exactly the largest length is accepted.
	key := bytes.Repeat([]byte{'a'}, largestKey)
	if err := tombstoner.Add([][]byte{key}); err != nil {
		t.Fatal(err)
	}
	if err := tombstoner.Flush(); err != nil {
		t.Fatalf("unexpected error flushing tombstone: %v", err)
	}

	// One more byte pushes the key over the limit, so Add must fail.
	key = append(key, 'a')
	err := tombstoner.Add([][]byte{key})
	if err == nil {
		t.Fatalf("got no error, expected key length error")
	}
	if err := tombstoner.Flush(); err != nil {
		t.Fatalf("unexpected error flushing tombstone: %v", err)
	}
}
func TestTombstoner_Add_Multiple(t *testing.T) {
t.Skip("TODO")
dir := MustTempDir()
defer func() { os.RemoveAll(dir) }()
@ -210,10 +332,17 @@ func TestTombstoner_Add_Multiple(t *testing.T) {
t.Fatalf("value mismatch: got %v, exp %v", got, exp)
}
if got, exp := entries[0].Prefix, false; got != exp {
t.Fatalf("value mismatch: got %v, exp %v", got, exp)
}
if got, exp := string(entries[1].Key), "bar"; got != exp {
t.Fatalf("value mismatch: got %v, exp %v", got, exp)
}
if got, exp := entries[1].Prefix, false; got != exp {
t.Fatalf("value mismatch: got %v, exp %v", got, exp)
}
}
func TestTombstoner_Add_Empty(t *testing.T) {
@ -249,7 +378,6 @@ func TestTombstoner_Add_Empty(t *testing.T) {
}
func TestTombstoner_Delete(t *testing.T) {
t.Skip("TODO")
dir := MustTempDir()
defer func() { os.RemoveAll(dir) }()
@ -289,7 +417,7 @@ func TestTombstoner_Delete(t *testing.T) {
}
}
func TestTombstoner_V4(t *testing.T) {
func TestTombstoner_Existing(t *testing.T) {
dir := MustTempDir()
defer func() { os.RemoveAll(dir) }()
@ -331,9 +459,8 @@ func TestTombstoner_V4(t *testing.T) {
panic(err)
}
ts := tsm1.NewTombstoner(name, nil)
t.Run("read", func(t *testing.T) {
ts := tsm1.NewTombstoner(name, nil)
var gotKeys []string
if err := ts.Walk(func(tombstone tsm1.Tombstone) error {
gotKeys = append(gotKeys, string(tombstone.Key))
@ -341,6 +468,8 @@ func TestTombstoner_V4(t *testing.T) {
t.Fatalf("got max time %d, expected %d", got, exp)
} else if got, exp := tombstone.Max, expMax; got != exp {
t.Fatalf("got max time %d, expected %d", got, exp)
} else if got, exp := tombstone.Prefix, false; got != exp {
t.Fatalf("got prefix key, expected non-prefix key")
}
return nil
}); err != nil {
@ -351,12 +480,36 @@ func TestTombstoner_V4(t *testing.T) {
t.Fatalf("tombstone entries differ:\n%s\n", cmp.Diff(gotKeys, expKeys, nil))
}
})
}
func TestTombstoner_ReadV5(t *testing.T) {
dir := MustTempDir()
defer func() { os.RemoveAll(dir) }()
t.Skip("TODO")
t.Run("add_prefix", func(t *testing.T) {
ts := tsm1.NewTombstoner(name, nil)
if err := ts.AddPrefixRange([]byte("new-prefix"), 10, 20); err != nil {
t.Fatal(err)
}
if err := ts.Flush(); err != nil {
t.Fatal(err)
}
var got tsm1.Tombstone
if err := ts.Walk(func(tombstone tsm1.Tombstone) error {
got = tombstone
return nil
}); err != nil {
t.Fatal(err)
}
exp := tsm1.Tombstone{
Key: []byte("new-prefix"),
Min: 10,
Max: 20,
Prefix: true,
}
if !reflect.DeepEqual(got, exp) {
t.Fatalf("unexpected tombstone entry. Got %s, expected %s", got, exp)
}
})
}
func mustReadAll(t *tsm1.Tombstoner) []tsm1.Tombstone {
@ -365,9 +518,10 @@ func mustReadAll(t *tsm1.Tombstoner) []tsm1.Tombstone {
b := make([]byte, len(t.Key))
copy(b, t.Key)
tombstones = append(tombstones, tsm1.Tombstone{
Min: t.Min,
Max: t.Max,
Key: b,
Min: t.Min,
Max: t.Max,
Key: b,
Prefix: t.Prefix,
})
return nil
}); err != nil {