commit d610a79487

Godeps
@@ -36,6 +36,7 @@ go.uber.org/multierr fb7d312c2c04c34f0ad621048bbb953b168f9ff6
go.uber.org/zap 35aad584952c3e7020db7b839f6b102de6271f89
golang.org/x/crypto 9477e0b78b9ac3d0b03822fd95422e2fe07627cd
golang.org/x/net 9dfe39835686865bff950a07b394c12a98ddc811
golang.org/x/sync fd80eb99c8f653c847d294a001bdf2a3a6f768f5
golang.org/x/sys 062cd7e4e68206d8bab9b18396626e855c992658
golang.org/x/text a71fd10341b064c10f4a81ceac72bcf70f26ea34
golang.org/x/time 6dc17368e09b0e8634d71cac8168d853e869a0c7
@@ -0,0 +1,22 @@
package binaryutil

// VarintSize returns the number of bytes to varint encode x.
// This code is copied from encoding/binary.PutVarint() with the buffer removed.
func VarintSize(x int64) int {
	ux := uint64(x) << 1
	if x < 0 {
		ux = ^ux
	}
	return UvarintSize(ux)
}

// UvarintSize returns the number of bytes to uvarint encode x.
// This code is copied from encoding/binary.PutUvarint() with the buffer removed.
func UvarintSize(x uint64) int {
	i := 0
	for x >= 0x80 {
		x >>= 7
		i++
	}
	return i + 1
}
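As a quick sanity check (not part of the commit), the size helper can be compared against the byte count that encoding/binary.PutUvarint actually writes; the sketch below re-declares the same loop locally so it runs standalone.

package main

import (
	"encoding/binary"
	"fmt"
)

// uvarintSize mirrors binaryutil.UvarintSize from the hunk above.
func uvarintSize(x uint64) int {
	i := 0
	for x >= 0x80 {
		x >>= 7
		i++
	}
	return i + 1
}

func main() {
	buf := make([]byte, binary.MaxVarintLen64)
	for _, x := range []uint64{0, 127, 128, 16383, 16384, 1 << 60} {
		want := binary.PutUvarint(buf, x) // bytes actually written
		got := uvarintSize(x)             // size computed without a buffer
		fmt.Printf("x=%d want=%d got=%d\n", x, want, got)
	}
}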
@@ -14,6 +14,7 @@ import (
	"path/filepath"
	"reflect"
	"runtime"
	"sort"
	"strings"
	"sync"
	"testing"
@@ -1185,6 +1186,7 @@ func TestEngine_DeleteSeriesRange(t *testing.T) {
		name, tags := e.sfile.Series(elem.SeriesID)
		gotKeys = append(gotKeys, string(models.MakeKey(name, tags)))
	}
	sort.Strings(gotKeys)

	if !reflect.DeepEqual(gotKeys, expKeys) {
		t.Fatalf("got keys %v, expected %v", gotKeys, expKeys)
@@ -151,6 +151,25 @@ type SeriesIDIterator interface {
	Close() error
}

// ReadAllSeriesIDIterator returns all ids from the iterator.
func ReadAllSeriesIDIterator(itr SeriesIDIterator) ([]uint64, error) {
	if itr == nil {
		return nil, nil
	}

	var a []uint64
	for {
		e, err := itr.Next()
		if err != nil {
			return nil, err
		} else if e.SeriesID == 0 {
			break
		}
		a = append(a, e.SeriesID)
	}
	return a, nil
}

// NewSeriesIDSliceIterator returns a SeriesIDIterator that iterates over a slice.
func NewSeriesIDSliceIterator(ids []uint64) *SeriesIDSliceIterator {
	return &SeriesIDSliceIterator{ids: ids}
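A minimal usage sketch (not from the commit) pairing the new helper with the slice-backed iterator; it assumes the github.com/influxdata/influxdb/tsdb import path used elsewhere in this diff and that *SeriesIDSliceIterator satisfies SeriesIDIterator, as the code above indicates.

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/tsdb"
)

func main() {
	itr := tsdb.NewSeriesIDSliceIterator([]uint64{1, 2, 3})
	ids, err := tsdb.ReadAllSeriesIDIterator(itr)
	if err != nil {
		panic(err)
	}
	fmt.Println(ids) // [1 2 3]
}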
@@ -266,15 +266,7 @@ func (m *measurement) AddSeries(s *series) bool {
			valueMap = newTagKeyValue()
			m.seriesByTagKeyValue[string(t.Key)] = valueMap
		}
		ids := valueMap.LoadByte(t.Value)
		ids = append(ids, s.ID)

		// most of the time the series ID will be higher than all others because it's a new
		// series. So don't do the sort if we don't have to.
		if len(ids) > 1 && ids[len(ids)-1] < ids[len(ids)-2] {
			sort.Sort(ids)
		}
		valueMap.StoreByte(t.Value, ids)
		valueMap.InsertSeriesIDByte(t.Value, s.ID)
	}

	return true
@@ -1292,13 +1284,13 @@ func (s *series) Deleted() bool {
//
// TODO(edd): This could possibly be replaced by a sync.Map once we use Go 1.9.
type tagKeyValue struct {
	mu       sync.RWMutex
	valueIDs map[string]seriesIDs
	mu      sync.RWMutex
	entries map[string]*tagKeyValueEntry
}

// NewTagKeyValue initialises a new TagKeyValue.
func newTagKeyValue() *tagKeyValue {
	return &tagKeyValue{valueIDs: make(map[string]seriesIDs)}
	return &tagKeyValue{entries: make(map[string]*tagKeyValueEntry)}
}

// Cardinality returns the number of values in the TagKeyValue.
@@ -1309,7 +1301,7 @@ func (t *tagKeyValue) Cardinality() int {

	t.mu.RLock()
	defer t.mu.RUnlock()
	return len(t.valueIDs)
	return len(t.entries)
}

// Contains returns true if the TagKeyValue contains value.
@@ -1320,10 +1312,34 @@ func (t *tagKeyValue) Contains(value string) bool {

	t.mu.RLock()
	defer t.mu.RUnlock()
	_, ok := t.valueIDs[value]
	_, ok := t.entries[value]
	return ok
}

// InsertSeriesID adds a series id to the tag key value.
func (t *tagKeyValue) InsertSeriesID(value string, id uint64) {
	t.mu.Lock()
	entry := t.entries[value]
	if entry == nil {
		entry = newTagKeyValueEntry()
		t.entries[value] = entry
	}
	entry.m[id] = struct{}{}
	t.mu.Unlock()
}

// InsertSeriesIDByte adds a series id to the tag key value.
func (t *tagKeyValue) InsertSeriesIDByte(value []byte, id uint64) {
	t.mu.Lock()
	entry := t.entries[string(value)]
	if entry == nil {
		entry = newTagKeyValueEntry()
		t.entries[string(value)] = entry
	}
	entry.m[id] = struct{}{}
	t.mu.Unlock()
}

// Load returns the SeriesIDs for the provided tag value.
func (t *tagKeyValue) Load(value string) seriesIDs {
	if t == nil {
@@ -1331,9 +1347,10 @@ func (t *tagKeyValue) Load(value string) seriesIDs {
	}

	t.mu.RLock()
	sIDs := t.valueIDs[value]
	entry := t.entries[value]
	ids := entry.ids()
	t.mu.RUnlock()
	return sIDs
	return ids
}

// LoadByte returns the SeriesIDs for the provided tag value. It makes use of
@@ -1344,9 +1361,10 @@ func (t *tagKeyValue) LoadByte(value []byte) seriesIDs {
	}

	t.mu.RLock()
	sIDs := t.valueIDs[string(value)]
	entry := t.entries[string(value)]
	ids := entry.ids()
	t.mu.RUnlock()
	return sIDs
	return ids
}

// Range calls f sequentially on each key and value. A call to Range on a nil
@@ -1360,8 +1378,9 @@ func (t *tagKeyValue) Range(f func(tagValue string, a seriesIDs) bool) {

	t.mu.RLock()
	defer t.mu.RUnlock()
	for tagValue, a := range t.valueIDs {
		if !f(tagValue, a) {
	for tagValue, entry := range t.entries {
		ids := entry.ids()
		if !f(tagValue, ids) {
			return
		}
	}
@@ -1376,18 +1395,33 @@ func (t *tagKeyValue) RangeAll(f func(k string, a seriesIDs)) {
	})
}

// Store stores ids under the value key.
func (t *tagKeyValue) Store(value string, ids seriesIDs) {
	t.mu.Lock()
	t.valueIDs[value] = ids
	t.mu.Unlock()
type tagKeyValueEntry struct {
	m map[uint64]struct{} // series id set
	a seriesIDs           // lazily sorted list of series.
}

// StoreByte stores ids under the value key.
func (t *tagKeyValue) StoreByte(value []byte, ids seriesIDs) {
	t.mu.Lock()
	t.valueIDs[string(value)] = ids
	t.mu.Unlock()
func newTagKeyValueEntry() *tagKeyValueEntry {
	return &tagKeyValueEntry{m: make(map[uint64]struct{})}
}

func (e *tagKeyValueEntry) ids() seriesIDs {
	if e == nil {
		return nil
	}

	if len(e.a) == len(e.m) {
		return e.a
	}

	a := make(seriesIDs, 0, len(e.m))
	for id := range e.m {
		a = append(a, id)
	}
	sort.Sort(a)

	e.a = a
	return e.a

}

// SeriesIDs is a convenience type for sorting, checking equality, and doing
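The entry type above keeps an unsorted id set plus a lazily rebuilt sorted slice. The standalone sketch below (illustrative only, plain uint64 slices instead of the seriesIDs type) shows the same memoization idea: the sorted slice is rebuilt only when its length no longer matches the set.

package main

import (
	"fmt"
	"sort"
)

// entry mirrors the shape of tagKeyValueEntry: a set of ids plus a cached,
// lazily sorted slice.
type entry struct {
	m map[uint64]struct{} // id set
	a []uint64            // cached sorted ids
}

func (e *entry) insert(id uint64) { e.m[id] = struct{}{} }

// ids rebuilds the sorted slice only when the cache is stale.
func (e *entry) ids() []uint64 {
	if len(e.a) == len(e.m) {
		return e.a // cache still valid: no ids added since the last sort
	}
	a := make([]uint64, 0, len(e.m))
	for id := range e.m {
		a = append(a, id)
	}
	sort.Slice(a, func(i, j int) bool { return a[i] < a[j] })
	e.a = a
	return e.a
}

func main() {
	e := &entry{m: make(map[uint64]struct{})}
	e.insert(3)
	e.insert(1)
	fmt.Println(e.ids()) // [1 3] — sorted on first read
	fmt.Println(e.ids()) // [1 3] — served from the cache
}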
@ -1,9 +1,13 @@
|
|||
package tsi1_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
)
|
||||
|
||||
// Ensure fileset can return an iterator over all series in the index.
|
||||
|
@ -35,26 +39,12 @@ func TestFileSet_SeriesIDIterator(t *testing.T) {
|
|||
if itr == nil {
|
||||
t.Fatal("expected iterator")
|
||||
}
|
||||
|
||||
if elem, err := itr.Next(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region east}]` {
|
||||
t.Fatalf("unexpected series: %s/%s", name, tags.String())
|
||||
}
|
||||
if elem, err := itr.Next(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region west}]` {
|
||||
t.Fatalf("unexpected series: %s/%s", name, tags.String())
|
||||
}
|
||||
if elem, err := itr.Next(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `mem` || tags.String() != `[{region east}]` {
|
||||
t.Fatalf("unexpected series: %s/%s", name, tags.String())
|
||||
}
|
||||
if elem, err := itr.Next(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if elem.SeriesID != 0 {
|
||||
t.Fatalf("expected eof, got: %d", elem.SeriesID)
|
||||
if result := MustReadAllSeriesIDIteratorString(fs.SeriesFile(), itr); !reflect.DeepEqual(result, []string{
|
||||
"cpu,[{region east}]",
|
||||
"cpu,[{region west}]",
|
||||
"mem,[{region east}]",
|
||||
}) {
|
||||
t.Fatalf("unexpected keys: %s", result)
|
||||
}
|
||||
})
|
||||
|
||||
|
@ -80,37 +70,14 @@ func TestFileSet_SeriesIDIterator(t *testing.T) {
|
|||
t.Fatal("expected iterator")
|
||||
}
|
||||
|
||||
allexpected := []struct {
|
||||
name string
|
||||
tagset string
|
||||
}{
|
||||
{`cpu`, `[{region east}]`},
|
||||
{`cpu`, `[{region west}]`},
|
||||
{`mem`, `[{region east}]`},
|
||||
{`disk`, `[]`},
|
||||
{`cpu`, `[{region north}]`},
|
||||
}
|
||||
|
||||
for _, expected := range allexpected {
|
||||
e, err := itr.Next()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if name, tags := fs.SeriesFile().Series(e.SeriesID); string(name) != expected.name || tags.String() != expected.tagset {
|
||||
t.Fatalf("unexpected series: %s/%s", name, tags.String())
|
||||
}
|
||||
}
|
||||
|
||||
// Check for end of iterator...
|
||||
e, err := itr.Next()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if e.SeriesID != 0 {
|
||||
name, tags := fs.SeriesFile().Series(e.SeriesID)
|
||||
t.Fatalf("got: %s/%s, but expected EOF", name, tags.String())
|
||||
if result := MustReadAllSeriesIDIteratorString(fs.SeriesFile(), itr); !reflect.DeepEqual(result, []string{
|
||||
"cpu,[{region east}]",
|
||||
"cpu,[{region north}]",
|
||||
"cpu,[{region west}]",
|
||||
"disk,[]",
|
||||
"mem,[{region east}]",
|
||||
}) {
|
||||
t.Fatalf("unexpected keys: %s", result)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -145,20 +112,11 @@ func TestFileSet_MeasurementSeriesIDIterator(t *testing.T) {
|
|||
t.Fatal("expected iterator")
|
||||
}
|
||||
|
||||
if elem, err := itr.Next(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region east}]` {
|
||||
t.Fatalf("unexpected series: %s/%s", name, tags.String())
|
||||
}
|
||||
if elem, err := itr.Next(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region west}]` {
|
||||
t.Fatalf("unexpected series: %s/%s", name, tags.String())
|
||||
}
|
||||
if elem, err := itr.Next(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if elem.SeriesID != 0 {
|
||||
t.Fatalf("expected eof, got: %d", elem.SeriesID)
|
||||
if result := MustReadAllSeriesIDIteratorString(fs.SeriesFile(), itr); !reflect.DeepEqual(result, []string{
|
||||
"cpu,[{region east}]",
|
||||
"cpu,[{region west}]",
|
||||
}) {
|
||||
t.Fatalf("unexpected keys: %s", result)
|
||||
}
|
||||
})
|
||||
|
||||
|
@ -183,25 +141,12 @@ func TestFileSet_MeasurementSeriesIDIterator(t *testing.T) {
|
|||
t.Fatalf("expected iterator")
|
||||
}
|
||||
|
||||
if elem, err := itr.Next(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region east}]` {
|
||||
t.Fatalf("unexpected series: %s/%s", name, tags.String())
|
||||
}
|
||||
if elem, err := itr.Next(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region west}]` {
|
||||
t.Fatalf("unexpected series: %s/%s", name, tags.String())
|
||||
}
|
||||
if elem, err := itr.Next(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region north}]` {
|
||||
t.Fatalf("unexpected series: %s/%s", name, tags.String())
|
||||
}
|
||||
if elem, err := itr.Next(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if elem.SeriesID != 0 {
|
||||
t.Fatalf("expected eof, got: %d", elem.SeriesID)
|
||||
if result := MustReadAllSeriesIDIteratorString(fs.SeriesFile(), itr); !reflect.DeepEqual(result, []string{
|
||||
"cpu,[{region east}]",
|
||||
"cpu,[{region north}]",
|
||||
"cpu,[{region west}]",
|
||||
}) {
|
||||
t.Fatalf("unexpected keys: %s", result)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@@ -348,3 +293,23 @@ func TestFileSet_TagKeyIterator(t *testing.T) {
		}
	})
}

func MustReadAllSeriesIDIteratorString(sfile *tsdb.SeriesFile, itr tsdb.SeriesIDIterator) []string {
	// Read all ids.
	ids, err := tsdb.ReadAllSeriesIDIterator(itr)
	if err != nil {
		panic(err)
	}

	// Convert to keys and sort.
	keys := sfile.SeriesKeys(ids)
	sort.Slice(keys, func(i, j int) bool { return tsdb.CompareSeriesKeys(keys[i], keys[j]) == -1 })

	// Convert to strings.
	a := make([]string, len(keys))
	for i := range a {
		name, tags := tsdb.ParseSeriesKey(keys[i])
		a[i] = fmt.Sprintf("%s,%s", name, tags.String())
	}
	return a
}
@@ -245,16 +245,13 @@ func (f *IndexFile) TagValueSeriesIDIterator(name, key, value []byte) tsdb.Serie
	}

	// Find value element.
	ve := tblk.TagValueElem(key, value)
	if ve == nil {
	n, data := tblk.TagValueSeriesData(key, value)
	if n == 0 {
		return nil
	}

	// Create an iterator over value's series.
	return &rawSeriesIDIterator{
		n:    ve.(*TagBlockValueElem).series.n,
		data: ve.(*TagBlockValueElem).series.data,
	}
	return &rawSeriesIDIterator{n: n, data: data}
}

// TagKey returns a tag key.
@@ -306,8 +306,8 @@ func TestIndex_DiskSizeBytes(t *testing.T) {
	}

	// Verify on disk size is the same in each stage.
	// There are four series, and each series id is 8 bytes plus one byte for the tombstone header
	expSize := int64(4 * 9)
	// Each series stores flag(1) + series(uvarint(2)) + len(name)(1) + len(key)(1) + len(value)(1) + checksum(4).
	expSize := int64(4 * 10)

	// Each MANIFEST file is 419 bytes and there are tsi1.DefaultPartitionN of them
	expSize += int64(tsi1.DefaultPartitionN * 419)
@ -90,6 +90,14 @@ func (blk *TagBlock) UnmarshalBinary(data []byte) error {
|
|||
// TagKeyElem returns an element for a tag key.
|
||||
// Returns an element with a nil key if not found.
|
||||
func (blk *TagBlock) TagKeyElem(key []byte) TagKeyElem {
|
||||
var elem TagBlockKeyElem
|
||||
if !blk.DecodeTagKeyElem(key, &elem) {
|
||||
return nil
|
||||
}
|
||||
return &elem
|
||||
}
|
||||
|
||||
func (blk *TagBlock) DecodeTagKeyElem(key []byte, elem *TagBlockKeyElem) bool {
|
||||
keyN := int64(binary.BigEndian.Uint64(blk.hashData[:TagKeyNSize]))
|
||||
hash := rhh.HashKey(key)
|
||||
pos := hash % keyN
|
||||
|
@ -100,21 +108,20 @@ func (blk *TagBlock) TagKeyElem(key []byte) TagKeyElem {
|
|||
// Find offset of tag key.
|
||||
offset := binary.BigEndian.Uint64(blk.hashData[TagKeyNSize+(pos*TagKeyOffsetSize):])
|
||||
if offset == 0 {
|
||||
return nil
|
||||
return false
|
||||
}
|
||||
|
||||
// Parse into element.
|
||||
var e TagBlockKeyElem
|
||||
e.unmarshal(blk.data[offset:], blk.data)
|
||||
elem.unmarshal(blk.data[offset:], blk.data)
|
||||
|
||||
// Return if keys match.
|
||||
if bytes.Equal(e.key, key) {
|
||||
return &e
|
||||
if bytes.Equal(elem.key, key) {
|
||||
return true
|
||||
}
|
||||
|
||||
// Check if we've exceeded the probe distance.
|
||||
if d > rhh.Dist(rhh.HashKey(e.key), pos, keyN) {
|
||||
return nil
|
||||
if d > rhh.Dist(rhh.HashKey(elem.key), pos, keyN) {
|
||||
return false
|
||||
}
|
||||
|
||||
// Move position forward.
|
||||
|
@ -122,21 +129,39 @@ func (blk *TagBlock) TagKeyElem(key []byte) TagKeyElem {
|
|||
d++
|
||||
|
||||
if d > keyN {
|
||||
return nil
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TagValueElem returns an element for a tag value.
|
||||
func (blk *TagBlock) TagValueElem(key, value []byte) TagValueElem {
|
||||
// Find key element, exit if not found.
|
||||
kelem, _ := blk.TagKeyElem(key).(*TagBlockKeyElem)
|
||||
if kelem == nil {
|
||||
var valueElem TagBlockValueElem
|
||||
if !blk.DecodeTagValueElem(key, value, &valueElem) {
|
||||
return nil
|
||||
}
|
||||
return &valueElem
|
||||
}
|
||||
|
||||
// TagValueElem returns an element for a tag value.
|
||||
func (blk *TagBlock) TagValueSeriesData(key, value []byte) (uint64, []byte) {
|
||||
var valueElem TagBlockValueElem
|
||||
if !blk.DecodeTagValueElem(key, value, &valueElem) {
|
||||
return 0, nil
|
||||
}
|
||||
return valueElem.series.n, valueElem.series.data
|
||||
}
|
||||
|
||||
// DecodeTagValueElem returns an element for a tag value.
|
||||
func (blk *TagBlock) DecodeTagValueElem(key, value []byte, valueElem *TagBlockValueElem) bool {
|
||||
// Find key element, exit if not found.
|
||||
var keyElem TagBlockKeyElem
|
||||
if !blk.DecodeTagKeyElem(key, &keyElem) {
|
||||
return false
|
||||
}
|
||||
|
||||
// Slice hash index data.
|
||||
hashData := kelem.hashIndex.buf
|
||||
hashData := keyElem.hashIndex.buf
|
||||
|
||||
valueN := int64(binary.BigEndian.Uint64(hashData[:TagValueNSize]))
|
||||
hash := rhh.HashKey(value)
|
||||
|
@ -148,22 +173,21 @@ func (blk *TagBlock) TagValueElem(key, value []byte) TagValueElem {
|
|||
// Find offset of tag value.
|
||||
offset := binary.BigEndian.Uint64(hashData[TagValueNSize+(pos*TagValueOffsetSize):])
|
||||
if offset == 0 {
|
||||
return nil
|
||||
return false
|
||||
}
|
||||
|
||||
// Parse into element.
|
||||
var e TagBlockValueElem
|
||||
e.unmarshal(blk.data[offset:])
|
||||
valueElem.unmarshal(blk.data[offset:])
|
||||
|
||||
// Return if values match.
|
||||
if bytes.Equal(e.value, value) {
|
||||
return &e
|
||||
if bytes.Equal(valueElem.value, value) {
|
||||
return true
|
||||
}
|
||||
|
||||
// Check if we've exceeded the probe distance.
|
||||
max := rhh.Dist(rhh.HashKey(e.value), pos, valueN)
|
||||
max := rhh.Dist(rhh.HashKey(valueElem.value), pos, valueN)
|
||||
if d > max {
|
||||
return nil
|
||||
return false
|
||||
}
|
||||
|
||||
// Move position forward.
|
||||
|
@ -171,7 +195,7 @@ func (blk *TagBlock) TagValueElem(key, value []byte) TagValueElem {
|
|||
d++
|
||||
|
||||
if d > valueN {
|
||||
return nil
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,42 +5,34 @@ import (
|
|||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
"sort"
|
||||
|
||||
"github.com/cespare/xxhash"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/pkg/rhh"
|
||||
"github.com/influxdata/influxdb/pkg/binaryutil"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrSeriesFileClosed = errors.New("tsdb: series file closed")
|
||||
ErrSeriesFileClosed = errors.New("tsdb: series file closed")
|
||||
ErrInvalidSeriesPartitionID = errors.New("tsdb: invalid series partition id")
|
||||
)
|
||||
|
||||
// SeriesIDSize is the size in bytes of a series key ID.
|
||||
const SeriesIDSize = 8
|
||||
|
||||
// DefaultSeriesFileCompactThreshold is the number of series IDs to hold in the in-memory
|
||||
// series map before compacting and rebuilding the on-disk representation.
|
||||
const DefaultSeriesFileCompactThreshold = 1 << 20 // 1M
|
||||
const (
|
||||
// SeriesFilePartitionN is the number of partitions a series file is split into.
|
||||
SeriesFilePartitionN = 8
|
||||
)
|
||||
|
||||
// SeriesFile represents the section of the index that holds series data.
|
||||
type SeriesFile struct {
|
||||
mu sync.RWMutex
|
||||
wg sync.WaitGroup
|
||||
path string
|
||||
closed bool
|
||||
|
||||
segments []*SeriesSegment
|
||||
index *SeriesIndex
|
||||
seq uint64 // series id sequence
|
||||
|
||||
compacting bool
|
||||
|
||||
CompactThreshold int
|
||||
path string
|
||||
partitions []*SeriesPartition
|
||||
|
||||
Logger *zap.Logger
|
||||
}
|
||||
|
@ -48,83 +40,28 @@ type SeriesFile struct {
|
|||
// NewSeriesFile returns a new instance of SeriesFile.
|
||||
func NewSeriesFile(path string) *SeriesFile {
|
||||
return &SeriesFile{
|
||||
path: path,
|
||||
CompactThreshold: DefaultSeriesFileCompactThreshold,
|
||||
Logger: zap.NewNop(),
|
||||
path: path,
|
||||
Logger: zap.NewNop(),
|
||||
}
|
||||
}
|
||||
|
||||
// Open memory maps the data file at the file's path.
|
||||
func (f *SeriesFile) Open() error {
|
||||
if f.closed {
|
||||
return errors.New("tsdb: cannot reopen series file")
|
||||
}
|
||||
|
||||
// Create path if it doesn't exist.
|
||||
if err := os.MkdirAll(filepath.Join(f.path), 0777); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Open components.
|
||||
if err := func() (err error) {
|
||||
if err := f.openSegments(); err != nil {
|
||||
// Open partitions.
|
||||
f.partitions = make([]*SeriesPartition, 0, SeriesFilePartitionN)
|
||||
for i := 0; i < SeriesFilePartitionN; i++ {
|
||||
p := NewSeriesPartition(i, f.SeriesPartitionPath(i))
|
||||
p.Logger = f.Logger.With(zap.Int("partition", p.ID()))
|
||||
if err := p.Open(); err != nil {
|
||||
f.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
// Init last segment for writes.
|
||||
if err := f.activeSegment().InitForWrite(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
f.index = NewSeriesIndex(f.IndexPath())
|
||||
if err := f.index.Open(); err != nil {
|
||||
return err
|
||||
} else if f.index.Recover(f.segments); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}(); err != nil {
|
||||
f.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *SeriesFile) openSegments() error {
|
||||
fis, err := ioutil.ReadDir(f.path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, fi := range fis {
|
||||
segmentID, err := ParseSeriesSegmentFilename(fi.Name())
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
segment := NewSeriesSegment(segmentID, filepath.Join(f.path, fi.Name()))
|
||||
if err := segment.Open(); err != nil {
|
||||
return err
|
||||
}
|
||||
f.segments = append(f.segments, segment)
|
||||
}
|
||||
|
||||
// Find max series id by searching segments in reverse order.
|
||||
for i := len(f.segments) - 1; i >= 0; i-- {
|
||||
if f.seq = f.segments[i].MaxSeriesID(); f.seq > 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Create initial segment if none exist.
|
||||
if len(f.segments) == 0 {
|
||||
segment, err := CreateSeriesSegment(0, filepath.Join(f.path, "0000"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
f.segments = append(f.segments, segment)
|
||||
f.partitions = append(f.partitions, p)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -132,172 +69,63 @@ func (f *SeriesFile) openSegments() error {
|
|||
|
||||
// Close unmaps the data file.
|
||||
func (f *SeriesFile) Close() (err error) {
|
||||
f.wg.Wait()
|
||||
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
f.closed = true
|
||||
|
||||
for _, s := range f.segments {
|
||||
if e := s.Close(); e != nil && err == nil {
|
||||
for _, p := range f.partitions {
|
||||
if e := p.Close(); e != nil && err == nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
f.segments = nil
|
||||
|
||||
if f.index != nil {
|
||||
if e := f.index.Close(); e != nil && err == nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
f.index = nil
|
||||
|
||||
f.partitions = nil
|
||||
return err
|
||||
}
|
||||
|
||||
// Path returns the path to the file.
|
||||
func (f *SeriesFile) Path() string { return f.path }
|
||||
|
||||
// Path returns the path to the series index.
|
||||
func (f *SeriesFile) IndexPath() string { return filepath.Join(f.path, "index") }
|
||||
// SeriesPartitionPath returns the path to a given partition.
|
||||
func (f *SeriesFile) SeriesPartitionPath(i int) string {
|
||||
return filepath.Join(f.path, fmt.Sprintf("%02x", i))
|
||||
}
|
||||
|
||||
// Partitions returns all partitions.
|
||||
func (f *SeriesFile) Partitions() []*SeriesPartition { return f.partitions }
|
||||
|
||||
// CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist.
|
||||
// The returned ids list contains values for new series and zero for existing series.
|
||||
func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags, buf []byte) (ids []uint64, err error) {
|
||||
f.mu.RLock()
|
||||
if f.closed {
|
||||
f.mu.RUnlock()
|
||||
return nil, ErrSeriesFileClosed
|
||||
keys := GenerateSeriesKeys(names, tagsSlice)
|
||||
keyPartitionIDs := f.SeriesKeysPartitionIDs(keys)
|
||||
ids = make([]uint64, len(keys))
|
||||
|
||||
var g errgroup.Group
|
||||
for i := range f.partitions {
|
||||
p := f.partitions[i]
|
||||
g.Go(func() error {
|
||||
return p.CreateSeriesListIfNotExists(keys, keyPartitionIDs, ids)
|
||||
})
|
||||
}
|
||||
ids, ok := f.index.FindIDListByNameTags(f.segments, names, tagsSlice, buf)
|
||||
if ok {
|
||||
f.mu.RUnlock()
|
||||
return ids, nil
|
||||
if err := g.Wait(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
f.mu.RUnlock()
|
||||
|
||||
type keyRange struct {
|
||||
id uint64
|
||||
offset int64
|
||||
}
|
||||
newKeyRanges := make([]keyRange, 0, len(names))
|
||||
|
||||
// Obtain write lock to create new series.
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
if f.closed {
|
||||
return nil, ErrSeriesFileClosed
|
||||
}
|
||||
|
||||
// Track offsets of duplicate series.
|
||||
newIDs := make(map[string]uint64, len(ids))
|
||||
|
||||
for i := range names {
|
||||
// Skip series that have already been created.
|
||||
if ids[i] != 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Generate series key.
|
||||
buf = AppendSeriesKey(buf[:0], names[i], tagsSlice[i])
|
||||
|
||||
// Re-attempt lookup under write lock.
|
||||
if ids[i] = newIDs[string(buf)]; ids[i] != 0 {
|
||||
continue
|
||||
} else if ids[i] = f.index.FindIDByNameTags(f.segments, names[i], tagsSlice[i], buf); ids[i] != 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Write to series log and save offset.
|
||||
id, offset, err := f.insert(buf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Append new key to be added to hash map after flush.
|
||||
ids[i] = id
|
||||
newIDs[string(buf)] = id
|
||||
newKeyRanges = append(newKeyRanges, keyRange{id, offset})
|
||||
}
|
||||
|
||||
// Flush active segment writes so we can access data in mmap.
|
||||
if segment := f.activeSegment(); segment != nil {
|
||||
if err := segment.Flush(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Add keys to hash map(s).
|
||||
for _, keyRange := range newKeyRanges {
|
||||
f.index.Insert(f.seriesKeyByOffset(keyRange.offset), keyRange.id, keyRange.offset)
|
||||
}
|
||||
|
||||
// Check if we've crossed the compaction threshold.
|
||||
if !f.compacting && f.CompactThreshold != 0 && f.index.InMemCount() >= uint64(f.CompactThreshold) {
|
||||
f.compacting = true
|
||||
logger := f.Logger.With(zap.String("path", f.path))
|
||||
logger.Info("beginning series file compaction")
|
||||
|
||||
startTime := time.Now()
|
||||
f.wg.Add(1)
|
||||
go func() {
|
||||
defer f.wg.Done()
|
||||
|
||||
if err := NewSeriesFileCompactor().Compact(f); err != nil {
|
||||
logger.With(zap.Error(err)).Error("series file compaction failed")
|
||||
}
|
||||
|
||||
logger.With(zap.Duration("elapsed", time.Since(startTime))).Info("completed series file compaction")
|
||||
|
||||
// Clear compaction flag.
|
||||
f.mu.Lock()
|
||||
f.compacting = false
|
||||
f.mu.Unlock()
|
||||
}()
|
||||
}
|
||||
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
// DeleteSeriesID flags a series as permanently deleted.
|
||||
// If the series is reintroduced later then it must create a new id.
|
||||
func (f *SeriesFile) DeleteSeriesID(id uint64) error {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
if f.closed {
|
||||
return ErrSeriesFileClosed
|
||||
p := f.SeriesIDPartition(id)
|
||||
if p == nil {
|
||||
return ErrInvalidSeriesPartitionID
|
||||
}
|
||||
|
||||
// Already tombstoned, ignore.
|
||||
if f.index.IsDeleted(id) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write tombstone entry.
|
||||
_, err := f.writeLogEntry(AppendSeriesEntry(nil, SeriesEntryTombstoneFlag, id, nil))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Mark tombstone in memory.
|
||||
f.index.Delete(id)
|
||||
|
||||
return nil
|
||||
return p.DeleteSeriesID(id)
|
||||
}
|
||||
|
||||
// IsDeleted returns true if the ID has been deleted before.
|
||||
func (f *SeriesFile) IsDeleted(id uint64) bool {
|
||||
f.mu.RLock()
|
||||
if f.closed {
|
||||
f.mu.RUnlock()
|
||||
p := f.SeriesIDPartition(id)
|
||||
if p == nil {
|
||||
return false
|
||||
}
|
||||
v := f.index.IsDeleted(id)
|
||||
f.mu.RUnlock()
|
||||
return v
|
||||
return p.IsDeleted(id)
|
||||
}
|
||||
|
||||
// SeriesKey returns the series key for a given id.
|
||||
|
@ -305,14 +133,20 @@ func (f *SeriesFile) SeriesKey(id uint64) []byte {
|
|||
if id == 0 {
|
||||
return nil
|
||||
}
|
||||
f.mu.RLock()
|
||||
if f.closed {
|
||||
f.mu.RUnlock()
|
||||
p := f.SeriesIDPartition(id)
|
||||
if p == nil {
|
||||
return nil
|
||||
}
|
||||
key := f.seriesKeyByOffset(f.index.FindOffsetByID(id))
|
||||
f.mu.RUnlock()
|
||||
return key
|
||||
return p.SeriesKey(id)
|
||||
}
|
||||
|
||||
// SeriesKeys returns a list of series keys from a list of ids.
|
||||
func (f *SeriesFile) SeriesKeys(ids []uint64) [][]byte {
|
||||
keys := make([][]byte, len(ids))
|
||||
for i := range ids {
|
||||
keys[i] = f.SeriesKey(ids[i])
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
// Series returns the parsed series name and tags for an offset.
|
||||
|
@ -326,14 +160,12 @@ func (f *SeriesFile) Series(id uint64) ([]byte, models.Tags) {
|
|||
|
||||
// SeriesID return the series id for the series.
|
||||
func (f *SeriesFile) SeriesID(name []byte, tags models.Tags, buf []byte) uint64 {
|
||||
f.mu.RLock()
|
||||
if f.closed {
|
||||
f.mu.RUnlock()
|
||||
key := AppendSeriesKey(buf[:0], name, tags)
|
||||
keyPartition := f.SeriesKeyPartition(key)
|
||||
if keyPartition == nil {
|
||||
return 0
|
||||
}
|
||||
id := f.index.FindIDBySeriesKey(f.segments, AppendSeriesKey(buf[:0], name, tags))
|
||||
f.mu.RUnlock()
|
||||
return id
|
||||
return keyPartition.FindIDBySeriesKey(key)
|
||||
}
|
||||
|
||||
// HasSeries return true if the series exists.
|
||||
|
@ -343,104 +175,53 @@ func (f *SeriesFile) HasSeries(name []byte, tags models.Tags, buf []byte) bool {
|
|||
|
||||
// SeriesCount returns the number of series.
|
||||
func (f *SeriesFile) SeriesCount() uint64 {
|
||||
f.mu.RLock()
|
||||
if f.closed {
|
||||
f.mu.RUnlock()
|
||||
return 0
|
||||
var n uint64
|
||||
for _, p := range f.partitions {
|
||||
n += p.SeriesCount()
|
||||
}
|
||||
n := f.index.Count()
|
||||
f.mu.RUnlock()
|
||||
return n
|
||||
}
|
||||
|
||||
// SeriesIDIterator returns an iterator over all the series.
|
||||
func (f *SeriesFile) SeriesIDIterator() SeriesIDIterator {
|
||||
var ids []uint64
|
||||
for _, segment := range f.segments {
|
||||
ids = segment.AppendSeriesIDs(ids)
|
||||
for _, p := range f.partitions {
|
||||
ids = p.AppendSeriesIDs(ids)
|
||||
}
|
||||
sort.Sort(uint64Slice(ids))
|
||||
return NewSeriesIDSliceIterator(ids)
|
||||
}
|
||||
|
||||
// activeSegment returns the last segment.
|
||||
func (f *SeriesFile) activeSegment() *SeriesSegment {
|
||||
if len(f.segments) == 0 {
|
||||
func (f *SeriesFile) SeriesIDPartitionID(id uint64) int {
|
||||
return int(id & 0xFF)
|
||||
}
|
||||
|
||||
func (f *SeriesFile) SeriesIDPartition(id uint64) *SeriesPartition {
|
||||
partitionID := f.SeriesIDPartitionID(id)
|
||||
if partitionID >= len(f.partitions) {
|
||||
return nil
|
||||
}
|
||||
return f.segments[len(f.segments)-1]
|
||||
return f.partitions[partitionID]
|
||||
}
|
||||
|
||||
func (f *SeriesFile) insert(key []byte) (id uint64, offset int64, err error) {
|
||||
id = f.seq + 1
|
||||
|
||||
offset, err = f.writeLogEntry(AppendSeriesEntry(nil, SeriesEntryInsertFlag, id, key))
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
func (f *SeriesFile) SeriesKeysPartitionIDs(keys [][]byte) []int {
|
||||
partitionIDs := make([]int, len(keys))
|
||||
for i := range keys {
|
||||
partitionIDs[i] = f.SeriesKeyPartitionID(keys[i])
|
||||
}
|
||||
|
||||
f.seq++
|
||||
return id, offset, nil
|
||||
return partitionIDs
|
||||
}
|
||||
|
||||
// writeLogEntry appends an entry to the end of the active segment.
|
||||
// If there is no more room in the segment then a new segment is added.
|
||||
func (f *SeriesFile) writeLogEntry(data []byte) (offset int64, err error) {
|
||||
segment := f.activeSegment()
|
||||
if segment == nil || !segment.CanWrite(data) {
|
||||
if segment, err = f.createSegment(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
return segment.WriteLogEntry(data)
|
||||
func (f *SeriesFile) SeriesKeyPartitionID(key []byte) int {
|
||||
return int(xxhash.Sum64(key) % SeriesFilePartitionN)
|
||||
}
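A small standalone sketch (not part of the commit) of the key-to-partition routing performed by SeriesKeyPartitionID above; it reuses github.com/cespare/xxhash, which this file's diff already imports, and a local constant standing in for SeriesFilePartitionN.

package main

import (
	"fmt"

	"github.com/cespare/xxhash"
)

// seriesFilePartitionN mirrors the SeriesFilePartitionN constant in the diff.
const seriesFilePartitionN = 8

// keyPartitionID maps a series key to one of the partitions, matching
// SeriesKeyPartitionID: hash the key and take the remainder.
func keyPartitionID(key []byte) int {
	return int(xxhash.Sum64(key) % seriesFilePartitionN)
}

func main() {
	for _, k := range []string{"cpu,region=east", "cpu,region=west", "mem,region=east"} {
		fmt.Printf("%-18s -> partition %d\n", k, keyPartitionID([]byte(k)))
	}
}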
|
||||
|
||||
// createSegment appends a new segment
|
||||
func (f *SeriesFile) createSegment() (*SeriesSegment, error) {
|
||||
// Close writer for active segment, if one exists.
|
||||
if segment := f.activeSegment(); segment != nil {
|
||||
if err := segment.CloseForWrite(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Generate a new sequential segment identifier.
|
||||
var id uint16
|
||||
if len(f.segments) > 0 {
|
||||
id = f.segments[len(f.segments)-1].ID() + 1
|
||||
}
|
||||
filename := fmt.Sprintf("%04x", id)
|
||||
|
||||
// Generate new empty segment.
|
||||
segment, err := CreateSeriesSegment(id, filepath.Join(f.path, filename))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
f.segments = append(f.segments, segment)
|
||||
|
||||
// Allow segment to write.
|
||||
if err := segment.InitForWrite(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return segment, nil
|
||||
}
|
||||
|
||||
func (f *SeriesFile) seriesKeyByOffset(offset int64) []byte {
|
||||
if offset == 0 {
|
||||
func (f *SeriesFile) SeriesKeyPartition(key []byte) *SeriesPartition {
|
||||
partitionID := f.SeriesKeyPartitionID(key)
|
||||
if partitionID >= len(f.partitions) {
|
||||
return nil
|
||||
}
|
||||
|
||||
segmentID, pos := SplitSeriesOffset(offset)
|
||||
for _, segment := range f.segments {
|
||||
if segment.ID() != segmentID {
|
||||
continue
|
||||
}
|
||||
|
||||
key, _ := ReadSeriesKey(segment.Slice(pos + SeriesEntryHeaderSize))
|
||||
return key
|
||||
}
|
||||
|
||||
return nil
|
||||
return f.partitions[partitionID]
|
||||
}
|
||||
|
||||
// AppendSeriesKey serializes name and tags to a byte slice.
|
||||
|
@ -598,6 +379,41 @@ func CompareSeriesKeys(a, b []byte) int {
|
|||
}
|
||||
}
|
||||
|
||||
// GenerateSeriesKeys generates series keys for a list of names & tags using
|
||||
// a single large memory block.
|
||||
func GenerateSeriesKeys(names [][]byte, tagsSlice []models.Tags) [][]byte {
|
||||
buf := make([]byte, 0, SeriesKeysSize(names, tagsSlice))
|
||||
keys := make([][]byte, len(names))
|
||||
for i := range names {
|
||||
offset := len(buf)
|
||||
buf = AppendSeriesKey(buf, names[i], tagsSlice[i])
|
||||
keys[i] = buf[offset:]
|
||||
}
|
||||
return keys
|
||||
}
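GenerateSeriesKeys avoids a per-key allocation by sizing one buffer up front and handing out sub-slices of it. A hedged sketch of the same pattern (illustrative names, not the commit's code):

package main

import "fmt"

// packAll copies every input into one pre-sized buffer and returns sub-slices
// pointing into it, so only the backing buffer is allocated.
func packAll(items [][]byte) [][]byte {
	total := 0
	for _, it := range items {
		total += len(it)
	}
	buf := make([]byte, 0, total) // exact capacity: append never reallocates
	out := make([][]byte, len(items))
	for i, it := range items {
		offset := len(buf)
		buf = append(buf, it...)
		out[i] = buf[offset:len(buf):len(buf)] // cap the sub-slice at its own end
	}
	return out
}

func main() {
	for _, k := range packAll([][]byte{[]byte("cpu"), []byte("mem"), []byte("disk")}) {
		fmt.Println(string(k))
	}
}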
|
||||
|
||||
// SeriesKeysSize returns the number of bytes required to encode a list of name/tags.
|
||||
func SeriesKeysSize(names [][]byte, tagsSlice []models.Tags) int {
|
||||
var n int
|
||||
for i := range names {
|
||||
n += SeriesKeySize(names[i], tagsSlice[i])
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// SeriesKeySize returns the number of bytes required to encode a series key.
|
||||
func SeriesKeySize(name []byte, tags models.Tags) int {
|
||||
var n int
|
||||
n += 2 + len(name)
|
||||
n += binaryutil.UvarintSize(uint64(len(tags)))
|
||||
for _, tag := range tags {
|
||||
n += 2 + len(tag.Key)
|
||||
n += 2 + len(tag.Value)
|
||||
}
|
||||
n += binaryutil.UvarintSize(uint64(n))
|
||||
return n
|
||||
}
|
||||
|
||||
type seriesKeys [][]byte
|
||||
|
||||
func (a seriesKeys) Len() int { return len(a) }
|
||||
|
@ -606,216 +422,8 @@ func (a seriesKeys) Less(i, j int) bool {
|
|||
return CompareSeriesKeys(a[i], a[j]) == -1
|
||||
}
|
||||
|
||||
// SeriesFileCompactor represents an object reindexes a series file and optionally compacts segments.
|
||||
type SeriesFileCompactor struct{}
|
||||
type uint64Slice []uint64
|
||||
|
||||
// NewSeriesFileCompactor returns a new instance of SeriesFileCompactor.
|
||||
func NewSeriesFileCompactor() *SeriesFileCompactor {
|
||||
return &SeriesFileCompactor{}
|
||||
}
|
||||
|
||||
// Compact rebuilds the series file index.
|
||||
func (c *SeriesFileCompactor) Compact(f *SeriesFile) error {
|
||||
// Snapshot the partitions and index so we can check tombstones and replay at the end under lock.
|
||||
f.mu.RLock()
|
||||
segments := CloneSeriesSegments(f.segments)
|
||||
index := f.index.Clone()
|
||||
seriesN := f.index.Count()
|
||||
f.mu.RUnlock()
|
||||
|
||||
// Compact index to a temporary location.
|
||||
indexPath := index.path + ".compacting"
|
||||
if err := c.compactIndexTo(index, seriesN, segments, indexPath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Swap compacted index under lock & replay since compaction.
|
||||
if err := func() error {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
// Reopen index with new file.
|
||||
if err := f.index.Close(); err != nil {
|
||||
return err
|
||||
} else if err := os.Rename(indexPath, index.path); err != nil {
|
||||
return err
|
||||
} else if err := f.index.Open(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Replay new entries.
|
||||
if err := f.index.Recover(f.segments); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *SeriesFileCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) error {
|
||||
hdr := NewSeriesIndexHeader()
|
||||
hdr.Count = seriesN
|
||||
hdr.Capacity = pow2((int64(hdr.Count) * 100) / SeriesIndexLoadFactor)
|
||||
|
||||
// Allocate space for maps.
|
||||
keyIDMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))
|
||||
idOffsetMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))
|
||||
|
||||
// Reindex all partitions.
|
||||
for _, segment := range segments {
|
||||
errDone := errors.New("done")
|
||||
|
||||
if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
|
||||
// Make sure we don't go past the offset where the compaction began.
|
||||
if offset >= index.maxOffset {
|
||||
return errDone
|
||||
}
|
||||
|
||||
// Only process insert entries.
|
||||
switch flag {
|
||||
case SeriesEntryInsertFlag: // fallthrough
|
||||
case SeriesEntryTombstoneFlag:
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("unexpected series file log entry flag: %d", flag)
|
||||
}
|
||||
|
||||
// Ignore entry if tombstoned.
|
||||
if index.IsDeleted(id) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Save max series identifier processed.
|
||||
hdr.MaxSeriesID, hdr.MaxOffset = id, offset
|
||||
|
||||
// Insert into maps.
|
||||
c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset)
|
||||
return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id)
|
||||
}); err == errDone {
|
||||
break
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Open file handler.
|
||||
f, err := os.Create(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Calculate map positions.
|
||||
hdr.KeyIDMap.Offset, hdr.KeyIDMap.Size = SeriesIndexHeaderSize, int64(len(keyIDMap))
|
||||
hdr.IDOffsetMap.Offset, hdr.IDOffsetMap.Size = hdr.KeyIDMap.Offset+hdr.KeyIDMap.Size, int64(len(idOffsetMap))
|
||||
|
||||
// Write header.
|
||||
if _, err := hdr.WriteTo(f); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Write maps.
|
||||
if _, err := f.Write(keyIDMap); err != nil {
|
||||
return err
|
||||
} else if _, err := f.Write(idOffsetMap); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Sync & close.
|
||||
if err := f.Sync(); err != nil {
|
||||
return err
|
||||
} else if err := f.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *SeriesFileCompactor) insertKeyIDMap(dst []byte, capacity int64, segments []*SeriesSegment, key []byte, offset int64, id uint64) error {
|
||||
mask := capacity - 1
|
||||
hash := rhh.HashKey(key)
|
||||
|
||||
// Continue searching until we find an empty slot or lower probe distance.
|
||||
for i, dist, pos := int64(0), int64(0), hash&mask; ; i, dist, pos = i+1, dist+1, (pos+1)&mask {
|
||||
assert(i <= capacity, "key/id map full")
|
||||
elem := dst[(pos * SeriesIndexElemSize):]
|
||||
|
||||
// If empty slot found or matching offset, insert and exit.
|
||||
elemOffset := int64(binary.BigEndian.Uint64(elem[:8]))
|
||||
elemID := binary.BigEndian.Uint64(elem[8:])
|
||||
if elemOffset == 0 || elemOffset == offset {
|
||||
binary.BigEndian.PutUint64(elem[:8], uint64(offset))
|
||||
binary.BigEndian.PutUint64(elem[8:], id)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Read key at position & hash.
|
||||
elemKey := ReadSeriesKeyFromSegments(segments, elemOffset+SeriesEntryHeaderSize)
|
||||
elemHash := rhh.HashKey(elemKey)
|
||||
|
||||
// If the existing elem has probed less than us, then swap places with
|
||||
// existing elem, and keep going to find another slot for that elem.
|
||||
if d := rhh.Dist(elemHash, pos, capacity); d < dist {
|
||||
// Insert current values.
|
||||
binary.BigEndian.PutUint64(elem[:8], uint64(offset))
|
||||
binary.BigEndian.PutUint64(elem[8:], id)
|
||||
|
||||
// Swap with values in that position.
|
||||
hash, key, offset, id = elemHash, elemKey, elemOffset, elemID
|
||||
|
||||
// Update current distance.
|
||||
dist = d
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *SeriesFileCompactor) insertIDOffsetMap(dst []byte, capacity int64, id uint64, offset int64) {
|
||||
mask := capacity - 1
|
||||
hash := rhh.HashUint64(id)
|
||||
|
||||
// Continue searching until we find an empty slot or lower probe distance.
|
||||
for i, dist, pos := int64(0), int64(0), hash&mask; ; i, dist, pos = i+1, dist+1, (pos+1)&mask {
|
||||
assert(i <= capacity, "id/offset map full")
|
||||
elem := dst[(pos * SeriesIndexElemSize):]
|
||||
|
||||
// If empty slot found or matching id, insert and exit.
|
||||
elemID := binary.BigEndian.Uint64(elem[:8])
|
||||
elemOffset := int64(binary.BigEndian.Uint64(elem[8:]))
|
||||
if elemOffset == 0 || elemOffset == offset {
|
||||
binary.BigEndian.PutUint64(elem[:8], id)
|
||||
binary.BigEndian.PutUint64(elem[8:], uint64(offset))
|
||||
return
|
||||
}
|
||||
|
||||
// Hash key.
|
||||
elemHash := rhh.HashUint64(elemID)
|
||||
|
||||
// If the existing elem has probed less than us, then swap places with
|
||||
// existing elem, and keep going to find another slot for that elem.
|
||||
if d := rhh.Dist(elemHash, pos, capacity); d < dist {
|
||||
// Insert current values.
|
||||
binary.BigEndian.PutUint64(elem[:8], id)
|
||||
binary.BigEndian.PutUint64(elem[8:], uint64(offset))
|
||||
|
||||
// Swap with values in that position.
|
||||
hash, id, offset = elemHash, elemID, elemOffset
|
||||
|
||||
// Update current distance.
|
||||
dist = d
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// pow2 returns the number that is the next highest power of 2.
|
||||
// Returns v if it is a power of 2.
|
||||
func pow2(v int64) int64 {
|
||||
for i := int64(2); i < 1<<62; i *= 2 {
|
||||
if i >= v {
|
||||
return i
|
||||
}
|
||||
}
|
||||
panic("unreachable")
|
||||
}
|
||||
func (a uint64Slice) Len() int { return len(a) }
|
||||
func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] }
|
||||
|
|
|
@@ -48,9 +48,13 @@ func TestSeriesFile_Series(t *testing.T) {
// Ensure series file can be compacted.
func TestSeriesFileCompactor(t *testing.T) {
	sfile := MustOpenSeriesFile()
	sfile.CompactThreshold = 0
	defer sfile.Close()

	// Disable automatic compactions.
	for _, p := range sfile.Partitions() {
		p.CompactThreshold = 0
	}

	var names [][]byte
	var tagsSlice []models.Tags
	for i := 0; i < 10000; i++ {
@@ -66,10 +70,12 @@ func TestSeriesFileCompactor(t *testing.T) {
		t.Fatalf("unexpected series count: %d", n)
	}

	// Compact in-place.
	compactor := tsdb.NewSeriesFileCompactor()
	if err := compactor.Compact(sfile.SeriesFile); err != nil {
		t.Fatal(err)
	// Compact in-place for each partition.
	for _, p := range sfile.Partitions() {
		compactor := tsdb.NewSeriesPartitionCompactor()
		if err := compactor.Compact(p); err != nil {
			t.Fatal(err)
		}
	}

	// Verify all series exist.
@@ -182,7 +182,7 @@ func (idx *SeriesIndex) execEntry(flag uint8, id uint64, offset int64, key []byt

func (idx *SeriesIndex) FindIDBySeriesKey(segments []*SeriesSegment, key []byte) uint64 {
	if v := idx.keyIDMap.Get(key); v != nil {
		if id, _ := v.(uint64); id != 0 {
		if id, _ := v.(uint64); id != 0 && !idx.IsDeleted(id) {
			return id
		}
	}
@@ -204,7 +204,11 @@ func (idx *SeriesIndex) FindIDBySeriesKey(segments []*SeriesSegment, key []byte)
		if d > rhh.Dist(elemHash, pos, idx.capacity) {
			return 0
		} else if elemHash == hash && bytes.Equal(elemKey, key) {
			return binary.BigEndian.Uint64(elem[8:])
			id := binary.BigEndian.Uint64(elem[8:])
			if idx.IsDeleted(id) {
				return 0
			}
			return id
		}
	}
}
@ -0,0 +1,665 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/pkg/rhh"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrSeriesPartitionClosed = errors.New("tsdb: series partition closed")
|
||||
)
|
||||
|
||||
// DefaultSeriesPartitionCompactThreshold is the number of series IDs to hold in the in-memory
|
||||
// series map before compacting and rebuilding the on-disk representation.
|
||||
const DefaultSeriesPartitionCompactThreshold = 1 << 17 // 128K
|
||||
|
||||
// SeriesPartition represents a subset of series file data.
|
||||
type SeriesPartition struct {
|
||||
mu sync.RWMutex
|
||||
wg sync.WaitGroup
|
||||
id int
|
||||
path string
|
||||
closed bool
|
||||
|
||||
segments []*SeriesSegment
|
||||
index *SeriesIndex
|
||||
seq uint64 // series id sequence
|
||||
|
||||
compacting bool
|
||||
|
||||
CompactThreshold int
|
||||
|
||||
Logger *zap.Logger
|
||||
}
|
||||
|
||||
// NewSeriesPartition returns a new instance of SeriesPartition.
|
||||
func NewSeriesPartition(id int, path string) *SeriesPartition {
|
||||
return &SeriesPartition{
|
||||
id: id,
|
||||
path: path,
|
||||
CompactThreshold: DefaultSeriesPartitionCompactThreshold,
|
||||
Logger: zap.NewNop(),
|
||||
}
|
||||
}
|
||||
|
||||
// Open memory maps the data file at the partition's path.
|
||||
func (p *SeriesPartition) Open() error {
|
||||
if p.closed {
|
||||
return errors.New("tsdb: cannot reopen series partition")
|
||||
}
|
||||
|
||||
// Create path if it doesn't exist.
|
||||
if err := os.MkdirAll(filepath.Join(p.path), 0777); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Open components.
|
||||
if err := func() (err error) {
|
||||
if err := p.openSegments(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Init last segment for writes.
|
||||
if err := p.activeSegment().InitForWrite(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p.index = NewSeriesIndex(p.IndexPath())
|
||||
if err := p.index.Open(); err != nil {
|
||||
return err
|
||||
} else if err := p.index.Recover(p.segments); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}(); err != nil {
|
||||
p.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *SeriesPartition) openSegments() error {
|
||||
fis, err := ioutil.ReadDir(p.path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, fi := range fis {
|
||||
segmentID, err := ParseSeriesSegmentFilename(fi.Name())
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
segment := NewSeriesSegment(segmentID, filepath.Join(p.path, fi.Name()))
|
||||
if err := segment.Open(); err != nil {
|
||||
return err
|
||||
}
|
||||
p.segments = append(p.segments, segment)
|
||||
}
|
||||
|
||||
// Find max series id by searching segments in reverse order.
|
||||
for i := len(p.segments) - 1; i >= 0; i-- {
|
||||
if p.seq = p.segments[i].MaxSeriesID(); p.seq > 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Create initial segment if none exist.
|
||||
if len(p.segments) == 0 {
|
||||
segment, err := CreateSeriesSegment(0, filepath.Join(p.path, "0000"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.segments = append(p.segments, segment)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close unmaps the data files.
|
||||
func (p *SeriesPartition) Close() (err error) {
|
||||
p.wg.Wait()
|
||||
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
p.closed = true
|
||||
|
||||
for _, s := range p.segments {
|
||||
if e := s.Close(); e != nil && err == nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
p.segments = nil
|
||||
|
||||
if p.index != nil {
|
||||
if e := p.index.Close(); e != nil && err == nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
p.index = nil
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// ID returns the partition id.
|
||||
func (p *SeriesPartition) ID() int { return p.id }
|
||||
|
||||
// Path returns the path to the partition.
|
||||
func (p *SeriesPartition) Path() string { return p.path }
|
||||
|
||||
// IndexPath returns the path to the series index.
|
||||
func (p *SeriesPartition) IndexPath() string { return filepath.Join(p.path, "index") }
|
||||
|
||||
// CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist.
|
||||
// The returned ids list contains values for new series and zero for existing series.
|
||||
func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitionIDs []int, ids []uint64) error {
|
||||
var writeRequired bool
|
||||
p.mu.RLock()
|
||||
if p.closed {
|
||||
p.mu.RUnlock()
|
||||
return ErrSeriesPartitionClosed
|
||||
}
|
||||
for i := range keys {
|
||||
if keyPartitionIDs[i] != p.id {
|
||||
continue
|
||||
}
|
||||
id := p.index.FindIDBySeriesKey(p.segments, keys[i])
|
||||
if id == 0 {
|
||||
writeRequired = true
|
||||
continue
|
||||
}
|
||||
ids[i] = id
|
||||
}
|
||||
p.mu.RUnlock()
|
||||
|
||||
// Exit if all series for this partition already exist.
|
||||
if !writeRequired {
|
||||
return nil
|
||||
}
|
||||
|
||||
type keyRange struct {
|
||||
id uint64
|
||||
offset int64
|
||||
}
|
||||
newKeyRanges := make([]keyRange, 0, len(keys))
|
||||
|
||||
// Obtain write lock to create new series.
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if p.closed {
|
||||
return ErrSeriesPartitionClosed
|
||||
}
|
||||
|
||||
// Track offsets of duplicate series.
|
||||
newIDs := make(map[string]uint64, len(ids))
|
||||
|
||||
for i := range keys {
|
||||
// Skip series that don't belong to the partition or have already been created.
|
||||
if keyPartitionIDs[i] != p.id || ids[i] != 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Re-attempt lookup under write lock.
|
||||
key := keys[i]
|
||||
if ids[i] = newIDs[string(key)]; ids[i] != 0 {
|
||||
continue
|
||||
} else if ids[i] = p.index.FindIDBySeriesKey(p.segments, key); ids[i] != 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Write to series log and save offset.
|
||||
id, offset, err := p.insert(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Append new key to be added to hash map after flush.
|
||||
ids[i] = id
|
||||
newIDs[string(key)] = id
|
||||
newKeyRanges = append(newKeyRanges, keyRange{id, offset})
|
||||
}
|
||||
|
||||
// Flush active segment writes so we can access data in mmap.
|
||||
if segment := p.activeSegment(); segment != nil {
|
||||
if err := segment.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Add keys to hash map(s).
|
||||
for _, keyRange := range newKeyRanges {
|
||||
p.index.Insert(p.seriesKeyByOffset(keyRange.offset), keyRange.id, keyRange.offset)
|
||||
}
|
||||
|
||||
// Check if we've crossed the compaction threshold.
|
||||
if !p.compacting && p.CompactThreshold != 0 && p.index.InMemCount() >= uint64(p.CompactThreshold) {
|
||||
p.compacting = true
|
||||
logger := p.Logger.With(zap.String("path", p.path))
|
||||
logger.Info("beginning series partition compaction")
|
||||
|
||||
startTime := time.Now()
|
||||
p.wg.Add(1)
|
||||
go func() {
|
||||
defer p.wg.Done()
|
||||
|
||||
if err := NewSeriesPartitionCompactor().Compact(p); err != nil {
|
||||
logger.With(zap.Error(err)).Error("series partition compaction failed")
|
||||
}
|
||||
|
||||
logger.With(zap.Duration("elapsed", time.Since(startTime))).Info("completed series partition compaction")
|
||||
|
||||
// Clear compaction flag.
|
||||
p.mu.Lock()
|
||||
p.compacting = false
|
||||
p.mu.Unlock()
|
||||
}()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteSeriesID flags a series as permanently deleted.
|
||||
// If the series is reintroduced later then it must create a new id.
|
||||
func (p *SeriesPartition) DeleteSeriesID(id uint64) error {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if p.closed {
|
||||
return ErrSeriesPartitionClosed
|
||||
}
|
||||
|
||||
// Already tombstoned, ignore.
|
||||
if p.index.IsDeleted(id) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write tombstone entry.
|
||||
_, err := p.writeLogEntry(AppendSeriesEntry(nil, SeriesEntryTombstoneFlag, id, nil))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Mark tombstone in memory.
|
||||
p.index.Delete(id)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsDeleted returns true if the ID has been deleted before.
|
||||
func (p *SeriesPartition) IsDeleted(id uint64) bool {
|
||||
p.mu.RLock()
|
||||
if p.closed {
|
||||
p.mu.RUnlock()
|
||||
return false
|
||||
}
|
||||
v := p.index.IsDeleted(id)
|
||||
p.mu.RUnlock()
|
||||
return v
|
||||
}
|
||||
|
||||
// SeriesKey returns the series key for a given id.
|
||||
func (p *SeriesPartition) SeriesKey(id uint64) []byte {
|
||||
if id == 0 {
|
||||
return nil
|
||||
}
|
||||
p.mu.RLock()
|
||||
if p.closed {
|
||||
p.mu.RUnlock()
|
||||
return nil
|
||||
}
|
||||
key := p.seriesKeyByOffset(p.index.FindOffsetByID(id))
|
||||
p.mu.RUnlock()
|
||||
return key
|
||||
}
|
||||
|
||||
// Series returns the parsed series name and tags for an offset.
|
||||
func (p *SeriesPartition) Series(id uint64) ([]byte, models.Tags) {
|
||||
key := p.SeriesKey(id)
|
||||
if key == nil {
|
||||
return nil, nil
|
||||
}
|
||||
return ParseSeriesKey(key)
|
||||
}
|
||||
|
||||
// FindIDBySeriesKey returns the series id for the series key.
func (p *SeriesPartition) FindIDBySeriesKey(key []byte) uint64 {
    p.mu.RLock()
    if p.closed {
        p.mu.RUnlock()
        return 0
    }
    id := p.index.FindIDBySeriesKey(p.segments, key)
    p.mu.RUnlock()
    return id
}

// SeriesCount returns the number of series.
func (p *SeriesPartition) SeriesCount() uint64 {
    p.mu.RLock()
    if p.closed {
        p.mu.RUnlock()
        return 0
    }
    n := p.index.Count()
    p.mu.RUnlock()
    return n
}

// AppendSeriesIDs returns a list of all series ids.
func (p *SeriesPartition) AppendSeriesIDs(a []uint64) []uint64 {
    for _, segment := range p.segments {
        a = segment.AppendSeriesIDs(a)
    }
    return a
}

// activeSegment returns the last segment.
func (p *SeriesPartition) activeSegment() *SeriesSegment {
    if len(p.segments) == 0 {
        return nil
    }
    return p.segments[len(p.segments)-1]
}
func (p *SeriesPartition) insert(key []byte) (id uint64, offset int64, err error) {
    // ID is built using an autoincrement sequence joined to the partition id.
    // Format: <seq(7b)><partition(1b)>
    id = ((p.seq + 1) << 8) | uint64(p.id)

    offset, err = p.writeLogEntry(AppendSeriesEntry(nil, SeriesEntryInsertFlag, id, key))
    if err != nil {
        return 0, 0, err
    }

    p.seq++
    return id, offset, nil
}
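The ID layout above packs a 7-byte sequence number above a 1-byte partition number, so the owning partition of any series ID can be recovered with a mask. A small sketch of encoding and decoding under that layout; the helper names are illustrative, not part of the package.

package main

import "fmt"

// joinSeriesID packs a sequence number and a partition number into one ID,
// matching the <seq(7b)><partition(1b)> layout described above.
func joinSeriesID(seq uint64, partition uint8) uint64 {
    return (seq << 8) | uint64(partition)
}

// splitSeriesID recovers the sequence and partition from a packed ID.
func splitSeriesID(id uint64) (seq uint64, partition uint8) {
    return id >> 8, uint8(id & 0xFF)
}

func main() {
    id := joinSeriesID(42, 3)
    seq, part := splitSeriesID(id)
    fmt.Printf("id=%#x seq=%d partition=%d\n", id, seq, part) // id=0x2a03 seq=42 partition=3
}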
// writeLogEntry appends an entry to the end of the active segment.
// If there is no more room in the segment then a new segment is added.
func (p *SeriesPartition) writeLogEntry(data []byte) (offset int64, err error) {
    segment := p.activeSegment()
    if segment == nil || !segment.CanWrite(data) {
        if segment, err = p.createSegment(); err != nil {
            return 0, err
        }
    }
    return segment.WriteLogEntry(data)
}
// createSegment appends a new segment to the partition.
func (p *SeriesPartition) createSegment() (*SeriesSegment, error) {
    // Close writer for active segment, if one exists.
    if segment := p.activeSegment(); segment != nil {
        if err := segment.CloseForWrite(); err != nil {
            return nil, err
        }
    }

    // Generate a new sequential segment identifier.
    var id uint16
    if len(p.segments) > 0 {
        id = p.segments[len(p.segments)-1].ID() + 1
    }
    filename := fmt.Sprintf("%04x", id)

    // Generate new empty segment.
    segment, err := CreateSeriesSegment(id, filepath.Join(p.path, filename))
    if err != nil {
        return nil, err
    }
    p.segments = append(p.segments, segment)

    // Allow segment to write.
    if err := segment.InitForWrite(); err != nil {
        return nil, err
    }

    return segment, nil
}
func (p *SeriesPartition) seriesKeyByOffset(offset int64) []byte {
    if offset == 0 {
        return nil
    }

    segmentID, pos := SplitSeriesOffset(offset)
    for _, segment := range p.segments {
        if segment.ID() != segmentID {
            continue
        }

        key, _ := ReadSeriesKey(segment.Slice(pos + SeriesEntryHeaderSize))
        return key
    }

    return nil
}
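seriesKeyByOffset relies on the offset identifying both the segment and the position inside it. Assuming the common layout of a 16-bit segment ID in the upper bits and a 32-bit byte position in the lower bits (SplitSeriesOffset itself is defined elsewhere in the package, so this layout is an assumption made for illustration), the split could look like the following sketch.

package main

import "fmt"

// splitOffset mirrors what a SplitSeriesOffset-style helper could do if the
// offset stores <segmentID(16b)><pos(32b)>; this layout is assumed here for
// illustration and is not a quote of the package's helper.
func splitOffset(offset int64) (segmentID uint16, pos uint32) {
    return uint16(offset >> 32), uint32(offset & 0xFFFFFFFF)
}

func joinOffset(segmentID uint16, pos uint32) int64 {
    return (int64(segmentID) << 32) | int64(pos)
}

func main() {
    off := joinOffset(2, 0x1000)
    seg, pos := splitOffset(off)
    fmt.Printf("offset=%#x segment=%d pos=%#x\n", off, seg, pos)
}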
// SeriesPartitionCompactor represents an object that reindexes a series partition and optionally compacts segments.
type SeriesPartitionCompactor struct{}

// NewSeriesPartitionCompactor returns a new instance of SeriesPartitionCompactor.
func NewSeriesPartitionCompactor() *SeriesPartitionCompactor {
    return &SeriesPartitionCompactor{}
}
// Compact rebuilds the series partition index.
func (c *SeriesPartitionCompactor) Compact(p *SeriesPartition) error {
    // Snapshot the partitions and index so we can check tombstones and replay at the end under lock.
    p.mu.RLock()
    segments := CloneSeriesSegments(p.segments)
    index := p.index.Clone()
    seriesN := p.index.Count()
    p.mu.RUnlock()

    // Compact index to a temporary location.
    indexPath := index.path + ".compacting"
    if err := c.compactIndexTo(index, seriesN, segments, indexPath); err != nil {
        return err
    }

    // Swap compacted index under lock & replay since compaction.
    if err := func() error {
        p.mu.Lock()
        defer p.mu.Unlock()

        // Reopen index with new file.
        if err := p.index.Close(); err != nil {
            return err
        } else if err := os.Rename(indexPath, index.path); err != nil {
            return err
        } else if err := p.index.Open(); err != nil {
            return err
        }

        // Replay new entries.
        if err := p.index.Recover(p.segments); err != nil {
            return err
        }
        return nil
    }(); err != nil {
        return err
    }

    return nil
}
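Compact writes the rebuilt index to an ".compacting" temporary path and only swaps it in under the write lock, via close, rename, and reopen, so readers never see a half-written file, and entries logged during the compaction are replayed afterwards. A generic sketch of that swap step is below; the file names and contents are placeholders, and it leans on rename over an existing path being atomic on POSIX filesystems.

package main

import (
    "fmt"
    "log"
    "os"
    "path/filepath"
)

// swapFile atomically replaces dst with the fully written tmp file. Renaming
// over an existing file is what makes the close/rename/reopen pattern safe.
func swapFile(tmp, dst string) error {
    return os.Rename(tmp, dst)
}

func main() {
    dir, err := os.MkdirTemp("", "index-swap")
    if err != nil {
        log.Fatal(err)
    }
    defer os.RemoveAll(dir)

    dst := filepath.Join(dir, "index")
    tmp := dst + ".compacting"

    // Write the old version and the fully built replacement.
    if err := os.WriteFile(dst, []byte("old index"), 0o666); err != nil {
        log.Fatal(err)
    }
    if err := os.WriteFile(tmp, []byte("compacted index"), 0o666); err != nil {
        log.Fatal(err)
    }

    // Swap and read back through the original path.
    if err := swapFile(tmp, dst); err != nil {
        log.Fatal(err)
    }
    data, _ := os.ReadFile(dst)
    fmt.Println(string(data)) // compacted index
}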
func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) error {
    hdr := NewSeriesIndexHeader()
    hdr.Count = seriesN
    hdr.Capacity = pow2((int64(hdr.Count) * 100) / SeriesIndexLoadFactor)

    // Allocate space for maps.
    keyIDMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))
    idOffsetMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))

    // Reindex all partitions.
    for _, segment := range segments {
        errDone := errors.New("done")

        if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
            // Make sure we don't go past the offset where the compaction began.
            if offset >= index.maxOffset {
                return errDone
            }

            // Only process insert entries.
            switch flag {
            case SeriesEntryInsertFlag: // fallthrough
            case SeriesEntryTombstoneFlag:
                return nil
            default:
                return fmt.Errorf("unexpected series partition log entry flag: %d", flag)
            }

            // Ignore entry if tombstoned.
            if index.IsDeleted(id) {
                return nil
            }

            // Save max series identifier processed.
            hdr.MaxSeriesID, hdr.MaxOffset = id, offset

            // Insert into maps.
            c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset)
            return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id)
        }); err == errDone {
            break
        } else if err != nil {
            return err
        }
    }

    // Open file handle.
    f, err := os.Create(path)
    if err != nil {
        return err
    }
    defer f.Close()

    // Calculate map positions.
    hdr.KeyIDMap.Offset, hdr.KeyIDMap.Size = SeriesIndexHeaderSize, int64(len(keyIDMap))
    hdr.IDOffsetMap.Offset, hdr.IDOffsetMap.Size = hdr.KeyIDMap.Offset+hdr.KeyIDMap.Size, int64(len(idOffsetMap))

    // Write header.
    if _, err := hdr.WriteTo(f); err != nil {
        return err
    }

    // Write maps.
    if _, err := f.Write(keyIDMap); err != nil {
        return err
    } else if _, err := f.Write(idOffsetMap); err != nil {
        return err
    }

    // Sync & close.
    if err := f.Sync(); err != nil {
        return err
    } else if err := f.Close(); err != nil {
        return err
    }

    return nil
}
func (c *SeriesPartitionCompactor) insertKeyIDMap(dst []byte, capacity int64, segments []*SeriesSegment, key []byte, offset int64, id uint64) error {
    mask := capacity - 1
    hash := rhh.HashKey(key)

    // Continue searching until we find an empty slot or lower probe distance.
    for i, dist, pos := int64(0), int64(0), hash&mask; ; i, dist, pos = i+1, dist+1, (pos+1)&mask {
        assert(i <= capacity, "key/id map full")
        elem := dst[(pos * SeriesIndexElemSize):]

        // If empty slot found or matching offset, insert and exit.
        elemOffset := int64(binary.BigEndian.Uint64(elem[:8]))
        elemID := binary.BigEndian.Uint64(elem[8:])
        if elemOffset == 0 || elemOffset == offset {
            binary.BigEndian.PutUint64(elem[:8], uint64(offset))
            binary.BigEndian.PutUint64(elem[8:], id)
            return nil
        }

        // Read key at position & hash.
        elemKey := ReadSeriesKeyFromSegments(segments, elemOffset+SeriesEntryHeaderSize)
        elemHash := rhh.HashKey(elemKey)

        // If the existing elem has probed less than us, then swap places with
        // existing elem, and keep going to find another slot for that elem.
        if d := rhh.Dist(elemHash, pos, capacity); d < dist {
            // Insert current values.
            binary.BigEndian.PutUint64(elem[:8], uint64(offset))
            binary.BigEndian.PutUint64(elem[8:], id)

            // Swap with values in that position.
            hash, key, offset, id = elemHash, elemKey, elemOffset, elemID

            // Update current distance.
            dist = d
        }
    }
}
func (c *SeriesPartitionCompactor) insertIDOffsetMap(dst []byte, capacity int64, id uint64, offset int64) {
    mask := capacity - 1
    hash := rhh.HashUint64(id)

    // Continue searching until we find an empty slot or lower probe distance.
    for i, dist, pos := int64(0), int64(0), hash&mask; ; i, dist, pos = i+1, dist+1, (pos+1)&mask {
        assert(i <= capacity, "id/offset map full")
        elem := dst[(pos * SeriesIndexElemSize):]

        // If empty slot found or matching id, insert and exit.
        elemID := binary.BigEndian.Uint64(elem[:8])
        elemOffset := int64(binary.BigEndian.Uint64(elem[8:]))
        if elemOffset == 0 || elemOffset == offset {
            binary.BigEndian.PutUint64(elem[:8], id)
            binary.BigEndian.PutUint64(elem[8:], uint64(offset))
            return
        }

        // Hash key.
        elemHash := rhh.HashUint64(elemID)

        // If the existing elem has probed less than us, then swap places with
        // existing elem, and keep going to find another slot for that elem.
        if d := rhh.Dist(elemHash, pos, capacity); d < dist {
            // Insert current values.
            binary.BigEndian.PutUint64(elem[:8], id)
            binary.BigEndian.PutUint64(elem[8:], uint64(offset))

            // Swap with values in that position.
            hash, id, offset = elemHash, elemID, elemOffset

            // Update current distance.
            dist = d
        }
    }
}
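Both insert helpers above use Robin Hood hashing: each probe compares how far the incoming element has travelled from its home slot against how far the resident element has, and the resident yields its slot whenever it has probed less. A small standalone sketch of that probe loop over uint64 keys, independent of the rhh package used above and with an identity hash purely for illustration:

package main

import "fmt"

type entry struct {
    key uint64 // 0 means the slot is empty
    val uint64
}

// dist is how far pos is from the element's home slot, wrapping around.
func dist(home, pos, capacity uint64) uint64 {
    return (pos + capacity - home) % capacity
}

// insert places key/val using Robin Hood probing: on each collision the
// element closer to its home slot yields to the one that is farther away.
func insert(table []entry, key, val uint64) {
    capacity := uint64(len(table))
    mask := capacity - 1
    pos := key & mask // identity "hash" keeps the sketch simple
    var d uint64
    for {
        e := &table[pos]
        if e.key == 0 || e.key == key {
            e.key, e.val = key, val
            return
        }
        if resident := dist(e.key&mask, pos, capacity); resident < d {
            // The resident has probed less, so it moves on instead of us.
            key, e.key = e.key, key
            val, e.val = e.val, val
            d = resident
        }
        pos = (pos + 1) & mask
        d++
    }
}

func main() {
    table := make([]entry, 8)                // capacity must be a power of two
    for _, k := range []uint64{1, 9, 17, 2} { // 1, 9, 17 all hash to slot 1
        insert(table, k, k*100)
    }
    for i, e := range table {
        if e.key != 0 {
            fmt.Printf("slot %d: key=%d val=%d\n", i, e.key, e.val)
        }
    }
}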
// pow2 returns the number that is the next highest power of 2.
// Returns v if it is a power of 2.
func pow2(v int64) int64 {
    for i := int64(2); i < 1<<62; i *= 2 {
        if i >= v {
            return i
        }
    }
    panic("unreachable")
}
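hdr.Capacity in compactIndexTo is sized so the rebuilt hash maps stay below the index load factor: the series count is scaled by 100/SeriesIndexLoadFactor and then rounded up to a power of two with pow2. A worked sketch of that arithmetic, assuming for illustration a 90 percent load factor (the package's actual constant may differ):

package main

import "fmt"

// pow2 returns the smallest power of two that is >= v (and >= 2),
// mirroring the helper above.
func pow2(v int64) int64 {
    for i := int64(2); i < 1<<62; i *= 2 {
        if i >= v {
            return i
        }
    }
    panic("unreachable")
}

func main() {
    const loadFactor = 90 // percent; illustrative assumption
    for _, count := range []int64{1000, 5000, 1 << 20} {
        capacity := pow2((count * 100) / loadFactor)
        fmt.Printf("count=%d -> capacity=%d (resulting load=%.1f%%)\n",
            count, capacity, 100*float64(count)/float64(capacity))
    }
}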
@ -843,23 +843,11 @@ func (s *Store) estimateCardinality(dbName string, getSketches func(*Shard) (est
// SeriesCardinality returns the series cardinality for the provided database.
func (s *Store) SeriesCardinality(database string) (int64, error) {
    s.mu.RLock()
    shards := s.filterShards(byDatabase(database))
    s.mu.RUnlock()

    // TODO(benbjohnson): Series file will be shared by the DB.
    var max int64
    for _, shard := range shards {
        index, err := shard.Index()
        if err != nil {
            return 0, err
        }

        if n := index.SeriesN(); n > max {
            max = n
        }
    sfile := s.seriesFile(database)
    if sfile == nil {
        return 0, nil
    }
    return max, nil
    return int64(sfile.SeriesCount()), nil
}

// MeasurementsCardinality returns the measurement cardinality for the provided