diff --git a/Godeps b/Godeps index 92e38372de..b0222ef6d4 100644 --- a/Godeps +++ b/Godeps @@ -36,6 +36,7 @@ go.uber.org/multierr fb7d312c2c04c34f0ad621048bbb953b168f9ff6 go.uber.org/zap 35aad584952c3e7020db7b839f6b102de6271f89 golang.org/x/crypto 9477e0b78b9ac3d0b03822fd95422e2fe07627cd golang.org/x/net 9dfe39835686865bff950a07b394c12a98ddc811 +golang.org/x/sync fd80eb99c8f653c847d294a001bdf2a3a6f768f5 golang.org/x/sys 062cd7e4e68206d8bab9b18396626e855c992658 golang.org/x/text a71fd10341b064c10f4a81ceac72bcf70f26ea34 golang.org/x/time 6dc17368e09b0e8634d71cac8168d853e869a0c7 diff --git a/pkg/binaryutil/binaryutil.go b/pkg/binaryutil/binaryutil.go new file mode 100644 index 0000000000..b1d5f2ad06 --- /dev/null +++ b/pkg/binaryutil/binaryutil.go @@ -0,0 +1,22 @@ +package binaryutil + +// VarintSize returns the number of bytes to varint encode x. +// This code is copied from encoding/binary.PutVarint() with the buffer removed. +func VarintSize(x int64) int { + ux := uint64(x) << 1 + if x < 0 { + ux = ^ux + } + return UvarintSize(ux) +} + +// UvarintSize returns the number of bytes to uvarint encode x. +// This code is copied from encoding/binary.PutUvarint() with the buffer removed. +func UvarintSize(x uint64) int { + i := 0 + for x >= 0x80 { + x >>= 7 + i++ + } + return i + 1 +} diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 25ac383d64..53475c4fbc 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -14,6 +14,7 @@ import ( "path/filepath" "reflect" "runtime" + "sort" "strings" "sync" "testing" @@ -1185,6 +1186,7 @@ func TestEngine_DeleteSeriesRange(t *testing.T) { name, tags := e.sfile.Series(elem.SeriesID) gotKeys = append(gotKeys, string(models.MakeKey(name, tags))) } + sort.Strings(gotKeys) if !reflect.DeepEqual(gotKeys, expKeys) { t.Fatalf("got keys %v, expected %v", gotKeys, expKeys) diff --git a/tsdb/index.go b/tsdb/index.go index 8c6b8d3e50..05e4eec803 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -151,6 +151,25 @@ type SeriesIDIterator interface { Close() error } +// ReadAllSeriesIDIterator returns all ids from the iterator. +func ReadAllSeriesIDIterator(itr SeriesIDIterator) ([]uint64, error) { + if itr == nil { + return nil, nil + } + + var a []uint64 + for { + e, err := itr.Next() + if err != nil { + return nil, err + } else if e.SeriesID == 0 { + break + } + a = append(a, e.SeriesID) + } + return a, nil +} + // NewSeriesIDSliceIterator returns a SeriesIDIterator that iterates over a slice. func NewSeriesIDSliceIterator(ids []uint64) *SeriesIDSliceIterator { return &SeriesIDSliceIterator{ids: ids} diff --git a/tsdb/index/inmem/meta.go b/tsdb/index/inmem/meta.go index e5ed4200ed..8cff602641 100644 --- a/tsdb/index/inmem/meta.go +++ b/tsdb/index/inmem/meta.go @@ -266,15 +266,7 @@ func (m *measurement) AddSeries(s *series) bool { valueMap = newTagKeyValue() m.seriesByTagKeyValue[string(t.Key)] = valueMap } - ids := valueMap.LoadByte(t.Value) - ids = append(ids, s.ID) - - // most of the time the series ID will be higher than all others because it's a new - // series. So don't do the sort if we don't have to. - if len(ids) > 1 && ids[len(ids)-1] < ids[len(ids)-2] { - sort.Sort(ids) - } - valueMap.StoreByte(t.Value, ids) + valueMap.InsertSeriesIDByte(t.Value, s.ID) } return true @@ -1292,13 +1284,13 @@ func (s *series) Deleted() bool { // // TODO(edd): This could possibly be replaced by a sync.Map once we use Go 1.9. 
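Editor's note on the new pkg/binaryutil helpers above: they are described as copies of encoding/binary.PutVarint/PutUvarint with the buffer removed, so a quick way to convince yourself of that is to compare them against what the standard encoders actually write. A minimal check, assuming the import path github.com/influxdata/influxdb/pkg/binaryutil that this diff introduces:

package binaryutil_test

import (
	"encoding/binary"
	"testing"

	"github.com/influxdata/influxdb/pkg/binaryutil"
)

// TestSizesMatchEncoding verifies the size helpers agree with the number of
// bytes the standard library encoders write for the same values.
func TestSizesMatchEncoding(t *testing.T) {
	buf := make([]byte, binary.MaxVarintLen64)
	for _, x := range []int64{0, 1, -1, 127, 128, -12345, 1 << 40} {
		if got, want := binaryutil.VarintSize(x), binary.PutVarint(buf, x); got != want {
			t.Fatalf("VarintSize(%d) = %d, PutVarint wrote %d bytes", x, got, want)
		}
	}
	for _, x := range []uint64{0, 1, 127, 128, 16383, 16384, 1 << 63} {
		if got, want := binaryutil.UvarintSize(x), binary.PutUvarint(buf, x); got != want {
			t.Fatalf("UvarintSize(%d) = %d, PutUvarint wrote %d bytes", x, got, want)
		}
	}
}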
type tagKeyValue struct { - mu sync.RWMutex - valueIDs map[string]seriesIDs + mu sync.RWMutex + entries map[string]*tagKeyValueEntry } // NewTagKeyValue initialises a new TagKeyValue. func newTagKeyValue() *tagKeyValue { - return &tagKeyValue{valueIDs: make(map[string]seriesIDs)} + return &tagKeyValue{entries: make(map[string]*tagKeyValueEntry)} } // Cardinality returns the number of values in the TagKeyValue. @@ -1309,7 +1301,7 @@ func (t *tagKeyValue) Cardinality() int { t.mu.RLock() defer t.mu.RUnlock() - return len(t.valueIDs) + return len(t.entries) } // Contains returns true if the TagKeyValue contains value. @@ -1320,10 +1312,34 @@ func (t *tagKeyValue) Contains(value string) bool { t.mu.RLock() defer t.mu.RUnlock() - _, ok := t.valueIDs[value] + _, ok := t.entries[value] return ok } +// InsertSeriesID adds a series id to the tag key value. +func (t *tagKeyValue) InsertSeriesID(value string, id uint64) { + t.mu.Lock() + entry := t.entries[value] + if entry == nil { + entry = newTagKeyValueEntry() + t.entries[value] = entry + } + entry.m[id] = struct{}{} + t.mu.Unlock() +} + +// InsertSeriesIDByte adds a series id to the tag key value. +func (t *tagKeyValue) InsertSeriesIDByte(value []byte, id uint64) { + t.mu.Lock() + entry := t.entries[string(value)] + if entry == nil { + entry = newTagKeyValueEntry() + t.entries[string(value)] = entry + } + entry.m[id] = struct{}{} + t.mu.Unlock() +} + // Load returns the SeriesIDs for the provided tag value. func (t *tagKeyValue) Load(value string) seriesIDs { if t == nil { @@ -1331,9 +1347,10 @@ func (t *tagKeyValue) Load(value string) seriesIDs { } t.mu.RLock() - sIDs := t.valueIDs[value] + entry := t.entries[value] + ids := entry.ids() t.mu.RUnlock() - return sIDs + return ids } // LoadByte returns the SeriesIDs for the provided tag value. It makes use of @@ -1344,9 +1361,10 @@ func (t *tagKeyValue) LoadByte(value []byte) seriesIDs { } t.mu.RLock() - sIDs := t.valueIDs[string(value)] + entry := t.entries[string(value)] + ids := entry.ids() t.mu.RUnlock() - return sIDs + return ids } // Range calls f sequentially on each key and value. A call to Range on a nil @@ -1360,8 +1378,9 @@ func (t *tagKeyValue) Range(f func(tagValue string, a seriesIDs) bool) { t.mu.RLock() defer t.mu.RUnlock() - for tagValue, a := range t.valueIDs { - if !f(tagValue, a) { + for tagValue, entry := range t.entries { + ids := entry.ids() + if !f(tagValue, ids) { return } } @@ -1376,18 +1395,33 @@ func (t *tagKeyValue) RangeAll(f func(k string, a seriesIDs)) { }) } -// Store stores ids under the value key. -func (t *tagKeyValue) Store(value string, ids seriesIDs) { - t.mu.Lock() - t.valueIDs[value] = ids - t.mu.Unlock() +type tagKeyValueEntry struct { + m map[uint64]struct{} // series id set + a seriesIDs // lazily sorted list of series. } -// StoreByte stores ids under the value key. 
-func (t *tagKeyValue) StoreByte(value []byte, ids seriesIDs) { - t.mu.Lock() - t.valueIDs[string(value)] = ids - t.mu.Unlock() +func newTagKeyValueEntry() *tagKeyValueEntry { + return &tagKeyValueEntry{m: make(map[uint64]struct{})} +} + +func (e *tagKeyValueEntry) ids() seriesIDs { + if e == nil { + return nil + } + + if len(e.a) == len(e.m) { + return e.a + } + + a := make(seriesIDs, 0, len(e.m)) + for id := range e.m { + a = append(a, id) + } + sort.Sort(a) + + e.a = a + return e.a + } // SeriesIDs is a convenience type for sorting, checking equality, and doing diff --git a/tsdb/index/tsi1/file_set_test.go b/tsdb/index/tsi1/file_set_test.go index be396ebb42..1c2a44f91c 100644 --- a/tsdb/index/tsi1/file_set_test.go +++ b/tsdb/index/tsi1/file_set_test.go @@ -1,9 +1,13 @@ package tsi1_test import ( + "fmt" + "reflect" + "sort" "testing" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/tsdb" ) // Ensure fileset can return an iterator over all series in the index. @@ -35,26 +39,12 @@ func TestFileSet_SeriesIDIterator(t *testing.T) { if itr == nil { t.Fatal("expected iterator") } - - if elem, err := itr.Next(); err != nil { - t.Fatal(err) - } else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region east}]` { - t.Fatalf("unexpected series: %s/%s", name, tags.String()) - } - if elem, err := itr.Next(); err != nil { - t.Fatal(err) - } else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region west}]` { - t.Fatalf("unexpected series: %s/%s", name, tags.String()) - } - if elem, err := itr.Next(); err != nil { - t.Fatal(err) - } else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `mem` || tags.String() != `[{region east}]` { - t.Fatalf("unexpected series: %s/%s", name, tags.String()) - } - if elem, err := itr.Next(); err != nil { - t.Fatal(err) - } else if elem.SeriesID != 0 { - t.Fatalf("expected eof, got: %d", elem.SeriesID) + if result := MustReadAllSeriesIDIteratorString(fs.SeriesFile(), itr); !reflect.DeepEqual(result, []string{ + "cpu,[{region east}]", + "cpu,[{region west}]", + "mem,[{region east}]", + }) { + t.Fatalf("unexpected keys: %s", result) } }) @@ -80,37 +70,14 @@ func TestFileSet_SeriesIDIterator(t *testing.T) { t.Fatal("expected iterator") } - allexpected := []struct { - name string - tagset string - }{ - {`cpu`, `[{region east}]`}, - {`cpu`, `[{region west}]`}, - {`mem`, `[{region east}]`}, - {`disk`, `[]`}, - {`cpu`, `[{region north}]`}, - } - - for _, expected := range allexpected { - e, err := itr.Next() - if err != nil { - t.Fatal(err) - } - - if name, tags := fs.SeriesFile().Series(e.SeriesID); string(name) != expected.name || tags.String() != expected.tagset { - t.Fatalf("unexpected series: %s/%s", name, tags.String()) - } - } - - // Check for end of iterator... 
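The tagKeyValueEntry introduced above trades the old insert-time sort for a set plus a lazily rebuilt sorted slice: inserts become a plain map write, and the sort only runs on a read that follows new inserts (detected by len(e.a) != len(e.m)); repeated reads in between reuse the cached slice, which is what Load, LoadByte and Range now rely on. A standalone sketch of that caching pattern — the names here are illustrative stand-ins, not the unexported inmem types:

package main

import (
	"fmt"
	"sort"
)

// entry mirrors the idea behind tagKeyValueEntry: a set of ids plus a lazily
// rebuilt sorted slice that is reused while no new ids arrive.
type entry struct {
	m map[uint64]struct{} // series id set
	a []uint64            // cached sorted ids
}

func (e *entry) insert(id uint64) { e.m[id] = struct{}{} }

func (e *entry) ids() []uint64 {
	if len(e.a) == len(e.m) { // nothing inserted since the last sort
		return e.a
	}
	a := make([]uint64, 0, len(e.m))
	for id := range e.m {
		a = append(a, id)
	}
	sort.Slice(a, func(i, j int) bool { return a[i] < a[j] })
	e.a = a
	return e.a
}

func main() {
	e := &entry{m: make(map[uint64]struct{})}
	e.insert(9)
	e.insert(3)
	fmt.Println(e.ids()) // [3 9]   sorted on first read
	fmt.Println(e.ids()) // [3 9]   cached, no re-sort
	e.insert(5)
	fmt.Println(e.ids()) // [3 5 9] cache invalidated by the new id
}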
- e, err := itr.Next() - if err != nil { - t.Fatal(err) - } - - if e.SeriesID != 0 { - name, tags := fs.SeriesFile().Series(e.SeriesID) - t.Fatalf("got: %s/%s, but expected EOF", name, tags.String()) + if result := MustReadAllSeriesIDIteratorString(fs.SeriesFile(), itr); !reflect.DeepEqual(result, []string{ + "cpu,[{region east}]", + "cpu,[{region north}]", + "cpu,[{region west}]", + "disk,[]", + "mem,[{region east}]", + }) { + t.Fatalf("unexpected keys: %s", result) } }) } @@ -145,20 +112,11 @@ func TestFileSet_MeasurementSeriesIDIterator(t *testing.T) { t.Fatal("expected iterator") } - if elem, err := itr.Next(); err != nil { - t.Fatal(err) - } else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region east}]` { - t.Fatalf("unexpected series: %s/%s", name, tags.String()) - } - if elem, err := itr.Next(); err != nil { - t.Fatal(err) - } else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region west}]` { - t.Fatalf("unexpected series: %s/%s", name, tags.String()) - } - if elem, err := itr.Next(); err != nil { - t.Fatal(err) - } else if elem.SeriesID != 0 { - t.Fatalf("expected eof, got: %d", elem.SeriesID) + if result := MustReadAllSeriesIDIteratorString(fs.SeriesFile(), itr); !reflect.DeepEqual(result, []string{ + "cpu,[{region east}]", + "cpu,[{region west}]", + }) { + t.Fatalf("unexpected keys: %s", result) } }) @@ -183,25 +141,12 @@ func TestFileSet_MeasurementSeriesIDIterator(t *testing.T) { t.Fatalf("expected iterator") } - if elem, err := itr.Next(); err != nil { - t.Fatal(err) - } else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region east}]` { - t.Fatalf("unexpected series: %s/%s", name, tags.String()) - } - if elem, err := itr.Next(); err != nil { - t.Fatal(err) - } else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region west}]` { - t.Fatalf("unexpected series: %s/%s", name, tags.String()) - } - if elem, err := itr.Next(); err != nil { - t.Fatal(err) - } else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region north}]` { - t.Fatalf("unexpected series: %s/%s", name, tags.String()) - } - if elem, err := itr.Next(); err != nil { - t.Fatal(err) - } else if elem.SeriesID != 0 { - t.Fatalf("expected eof, got: %d", elem.SeriesID) + if result := MustReadAllSeriesIDIteratorString(fs.SeriesFile(), itr); !reflect.DeepEqual(result, []string{ + "cpu,[{region east}]", + "cpu,[{region north}]", + "cpu,[{region west}]", + }) { + t.Fatalf("unexpected keys: %s", result) } }) } @@ -348,3 +293,23 @@ func TestFileSet_TagKeyIterator(t *testing.T) { } }) } + +func MustReadAllSeriesIDIteratorString(sfile *tsdb.SeriesFile, itr tsdb.SeriesIDIterator) []string { + // Read all ids. + ids, err := tsdb.ReadAllSeriesIDIterator(itr) + if err != nil { + panic(err) + } + + // Convert to keys and sort. + keys := sfile.SeriesKeys(ids) + sort.Slice(keys, func(i, j int) bool { return tsdb.CompareSeriesKeys(keys[i], keys[j]) == -1 }) + + // Convert to strings. 
+ a := make([]string, len(keys)) + for i := range a { + name, tags := tsdb.ParseSeriesKey(keys[i]) + a[i] = fmt.Sprintf("%s,%s", name, tags.String()) + } + return a +} diff --git a/tsdb/index/tsi1/index_file.go b/tsdb/index/tsi1/index_file.go index fa3e0be437..9be24fb29d 100644 --- a/tsdb/index/tsi1/index_file.go +++ b/tsdb/index/tsi1/index_file.go @@ -245,16 +245,13 @@ func (f *IndexFile) TagValueSeriesIDIterator(name, key, value []byte) tsdb.Serie } // Find value element. - ve := tblk.TagValueElem(key, value) - if ve == nil { + n, data := tblk.TagValueSeriesData(key, value) + if n == 0 { return nil } // Create an iterator over value's series. - return &rawSeriesIDIterator{ - n: ve.(*TagBlockValueElem).series.n, - data: ve.(*TagBlockValueElem).series.data, - } + return &rawSeriesIDIterator{n: n, data: data} } // TagKey returns a tag key. diff --git a/tsdb/index/tsi1/index_test.go b/tsdb/index/tsi1/index_test.go index 4f92e1a71b..318588bf59 100644 --- a/tsdb/index/tsi1/index_test.go +++ b/tsdb/index/tsi1/index_test.go @@ -306,8 +306,8 @@ func TestIndex_DiskSizeBytes(t *testing.T) { } // Verify on disk size is the same in each stage. - // There are four series, and each series id is 8 bytes plus one byte for the tombstone header - expSize := int64(4 * 9) + // Each series stores flag(1) + series(uvarint(2)) + len(name)(1) + len(key)(1) + len(value)(1) + checksum(4). + expSize := int64(4 * 10) // Each MANIFEST file is 419 bytes and there are tsi1.DefaultPartitionN of them expSize += int64(tsi1.DefaultPartitionN * 419) diff --git a/tsdb/index/tsi1/tag_block.go b/tsdb/index/tsi1/tag_block.go index e636b164b5..ca1d06fba9 100644 --- a/tsdb/index/tsi1/tag_block.go +++ b/tsdb/index/tsi1/tag_block.go @@ -90,6 +90,14 @@ func (blk *TagBlock) UnmarshalBinary(data []byte) error { // TagKeyElem returns an element for a tag key. // Returns an element with a nil key if not found. func (blk *TagBlock) TagKeyElem(key []byte) TagKeyElem { + var elem TagBlockKeyElem + if !blk.DecodeTagKeyElem(key, &elem) { + return nil + } + return &elem +} + +func (blk *TagBlock) DecodeTagKeyElem(key []byte, elem *TagBlockKeyElem) bool { keyN := int64(binary.BigEndian.Uint64(blk.hashData[:TagKeyNSize])) hash := rhh.HashKey(key) pos := hash % keyN @@ -100,21 +108,20 @@ func (blk *TagBlock) TagKeyElem(key []byte) TagKeyElem { // Find offset of tag key. offset := binary.BigEndian.Uint64(blk.hashData[TagKeyNSize+(pos*TagKeyOffsetSize):]) if offset == 0 { - return nil + return false } // Parse into element. - var e TagBlockKeyElem - e.unmarshal(blk.data[offset:], blk.data) + elem.unmarshal(blk.data[offset:], blk.data) // Return if keys match. - if bytes.Equal(e.key, key) { - return &e + if bytes.Equal(elem.key, key) { + return true } // Check if we've exceeded the probe distance. - if d > rhh.Dist(rhh.HashKey(e.key), pos, keyN) { - return nil + if d > rhh.Dist(rhh.HashKey(elem.key), pos, keyN) { + return false } // Move position forward. @@ -122,21 +129,39 @@ func (blk *TagBlock) TagKeyElem(key []byte) TagKeyElem { d++ if d > keyN { - return nil + return false } } } // TagValueElem returns an element for a tag value. func (blk *TagBlock) TagValueElem(key, value []byte) TagValueElem { - // Find key element, exit if not found. - kelem, _ := blk.TagKeyElem(key).(*TagBlockKeyElem) - if kelem == nil { + var valueElem TagBlockValueElem + if !blk.DecodeTagValueElem(key, value, &valueElem) { return nil } + return &valueElem +} + +// TagValueElem returns an element for a tag value. 
+func (blk *TagBlock) TagValueSeriesData(key, value []byte) (uint64, []byte) { + var valueElem TagBlockValueElem + if !blk.DecodeTagValueElem(key, value, &valueElem) { + return 0, nil + } + return valueElem.series.n, valueElem.series.data +} + +// DecodeTagValueElem returns an element for a tag value. +func (blk *TagBlock) DecodeTagValueElem(key, value []byte, valueElem *TagBlockValueElem) bool { + // Find key element, exit if not found. + var keyElem TagBlockKeyElem + if !blk.DecodeTagKeyElem(key, &keyElem) { + return false + } // Slice hash index data. - hashData := kelem.hashIndex.buf + hashData := keyElem.hashIndex.buf valueN := int64(binary.BigEndian.Uint64(hashData[:TagValueNSize])) hash := rhh.HashKey(value) @@ -148,22 +173,21 @@ func (blk *TagBlock) TagValueElem(key, value []byte) TagValueElem { // Find offset of tag value. offset := binary.BigEndian.Uint64(hashData[TagValueNSize+(pos*TagValueOffsetSize):]) if offset == 0 { - return nil + return false } // Parse into element. - var e TagBlockValueElem - e.unmarshal(blk.data[offset:]) + valueElem.unmarshal(blk.data[offset:]) // Return if values match. - if bytes.Equal(e.value, value) { - return &e + if bytes.Equal(valueElem.value, value) { + return true } // Check if we've exceeded the probe distance. - max := rhh.Dist(rhh.HashKey(e.value), pos, valueN) + max := rhh.Dist(rhh.HashKey(valueElem.value), pos, valueN) if d > max { - return nil + return false } // Move position forward. @@ -171,7 +195,7 @@ func (blk *TagBlock) TagValueElem(key, value []byte) TagValueElem { d++ if d > valueN { - return nil + return false } } } diff --git a/tsdb/series_file.go b/tsdb/series_file.go index b1a8599fdd..383608f83a 100644 --- a/tsdb/series_file.go +++ b/tsdb/series_file.go @@ -5,42 +5,34 @@ import ( "encoding/binary" "errors" "fmt" - "io/ioutil" "os" "path/filepath" - "sync" - "time" + "sort" + "github.com/cespare/xxhash" "github.com/influxdata/influxdb/models" - "github.com/influxdata/influxdb/pkg/rhh" + "github.com/influxdata/influxdb/pkg/binaryutil" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) var ( - ErrSeriesFileClosed = errors.New("tsdb: series file closed") + ErrSeriesFileClosed = errors.New("tsdb: series file closed") + ErrInvalidSeriesPartitionID = errors.New("tsdb: invalid series partition id") ) // SeriesIDSize is the size in bytes of a series key ID. const SeriesIDSize = 8 -// DefaultSeriesFileCompactThreshold is the number of series IDs to hold in the in-memory -// series map before compacting and rebuilding the on-disk representation. -const DefaultSeriesFileCompactThreshold = 1 << 20 // 1M +const ( + // SeriesFilePartitionN is the number of partitions a series file is split into. + SeriesFilePartitionN = 8 +) // SeriesFile represents the section of the index that holds series data. type SeriesFile struct { - mu sync.RWMutex - wg sync.WaitGroup - path string - closed bool - - segments []*SeriesSegment - index *SeriesIndex - seq uint64 // series id sequence - - compacting bool - - CompactThreshold int + path string + partitions []*SeriesPartition Logger *zap.Logger } @@ -48,83 +40,28 @@ type SeriesFile struct { // NewSeriesFile returns a new instance of SeriesFile. func NewSeriesFile(path string) *SeriesFile { return &SeriesFile{ - path: path, - CompactThreshold: DefaultSeriesFileCompactThreshold, - Logger: zap.NewNop(), + path: path, + Logger: zap.NewNop(), } } // Open memory maps the data file at the file's path. 
func (f *SeriesFile) Open() error { - if f.closed { - return errors.New("tsdb: cannot reopen series file") - } - // Create path if it doesn't exist. if err := os.MkdirAll(filepath.Join(f.path), 0777); err != nil { return err } - // Open components. - if err := func() (err error) { - if err := f.openSegments(); err != nil { + // Open partitions. + f.partitions = make([]*SeriesPartition, 0, SeriesFilePartitionN) + for i := 0; i < SeriesFilePartitionN; i++ { + p := NewSeriesPartition(i, f.SeriesPartitionPath(i)) + p.Logger = f.Logger.With(zap.Int("partition", p.ID())) + if err := p.Open(); err != nil { + f.Close() return err } - - // Init last segment for writes. - if err := f.activeSegment().InitForWrite(); err != nil { - return err - } - - f.index = NewSeriesIndex(f.IndexPath()) - if err := f.index.Open(); err != nil { - return err - } else if f.index.Recover(f.segments); err != nil { - return err - } - - return nil - }(); err != nil { - f.Close() - return err - } - - return nil -} - -func (f *SeriesFile) openSegments() error { - fis, err := ioutil.ReadDir(f.path) - if err != nil { - return err - } - - for _, fi := range fis { - segmentID, err := ParseSeriesSegmentFilename(fi.Name()) - if err != nil { - continue - } - - segment := NewSeriesSegment(segmentID, filepath.Join(f.path, fi.Name())) - if err := segment.Open(); err != nil { - return err - } - f.segments = append(f.segments, segment) - } - - // Find max series id by searching segments in reverse order. - for i := len(f.segments) - 1; i >= 0; i-- { - if f.seq = f.segments[i].MaxSeriesID(); f.seq > 0 { - break - } - } - - // Create initial segment if none exist. - if len(f.segments) == 0 { - segment, err := CreateSeriesSegment(0, filepath.Join(f.path, "0000")) - if err != nil { - return err - } - f.segments = append(f.segments, segment) + f.partitions = append(f.partitions, p) } return nil @@ -132,172 +69,63 @@ func (f *SeriesFile) openSegments() error { // Close unmaps the data file. func (f *SeriesFile) Close() (err error) { - f.wg.Wait() - - f.mu.Lock() - defer f.mu.Unlock() - - f.closed = true - - for _, s := range f.segments { - if e := s.Close(); e != nil && err == nil { + for _, p := range f.partitions { + if e := p.Close(); e != nil && err == nil { err = e } } - f.segments = nil - - if f.index != nil { - if e := f.index.Close(); e != nil && err == nil { - err = e - } - } - f.index = nil - + f.partitions = nil return err } // Path returns the path to the file. func (f *SeriesFile) Path() string { return f.path } -// Path returns the path to the series index. -func (f *SeriesFile) IndexPath() string { return filepath.Join(f.path, "index") } +// SeriesPartitionPath returns the path to a given partition. +func (f *SeriesFile) SeriesPartitionPath(i int) string { + return filepath.Join(f.path, fmt.Sprintf("%02x", i)) +} + +// Partitions returns all partitions. +func (f *SeriesFile) Partitions() []*SeriesPartition { return f.partitions } // CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist. // The returned ids list returns values for new series and zero for existing series. 
func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags, buf []byte) (ids []uint64, err error) { - f.mu.RLock() - if f.closed { - f.mu.RUnlock() - return nil, ErrSeriesFileClosed + keys := GenerateSeriesKeys(names, tagsSlice) + keyPartitionIDs := f.SeriesKeysPartitionIDs(keys) + ids = make([]uint64, len(keys)) + + var g errgroup.Group + for i := range f.partitions { + p := f.partitions[i] + g.Go(func() error { + return p.CreateSeriesListIfNotExists(keys, keyPartitionIDs, ids) + }) } - ids, ok := f.index.FindIDListByNameTags(f.segments, names, tagsSlice, buf) - if ok { - f.mu.RUnlock() - return ids, nil + if err := g.Wait(); err != nil { + return nil, err } - f.mu.RUnlock() - - type keyRange struct { - id uint64 - offset int64 - } - newKeyRanges := make([]keyRange, 0, len(names)) - - // Obtain write lock to create new series. - f.mu.Lock() - defer f.mu.Unlock() - - if f.closed { - return nil, ErrSeriesFileClosed - } - - // Track offsets of duplicate series. - newIDs := make(map[string]uint64, len(ids)) - - for i := range names { - // Skip series that have already been created. - if ids[i] != 0 { - continue - } - - // Generate series key. - buf = AppendSeriesKey(buf[:0], names[i], tagsSlice[i]) - - // Re-attempt lookup under write lock. - if ids[i] = newIDs[string(buf)]; ids[i] != 0 { - continue - } else if ids[i] = f.index.FindIDByNameTags(f.segments, names[i], tagsSlice[i], buf); ids[i] != 0 { - continue - } - - // Write to series log and save offset. - id, offset, err := f.insert(buf) - if err != nil { - return nil, err - } - - // Append new key to be added to hash map after flush. - ids[i] = id - newIDs[string(buf)] = id - newKeyRanges = append(newKeyRanges, keyRange{id, offset}) - } - - // Flush active segment writes so we can access data in mmap. - if segment := f.activeSegment(); segment != nil { - if err := segment.Flush(); err != nil { - return nil, err - } - } - - // Add keys to hash map(s). - for _, keyRange := range newKeyRanges { - f.index.Insert(f.seriesKeyByOffset(keyRange.offset), keyRange.id, keyRange.offset) - } - - // Check if we've crossed the compaction threshold. - if !f.compacting && f.CompactThreshold != 0 && f.index.InMemCount() >= uint64(f.CompactThreshold) { - f.compacting = true - logger := f.Logger.With(zap.String("path", f.path)) - logger.Info("beginning series file compaction") - - startTime := time.Now() - f.wg.Add(1) - go func() { - defer f.wg.Done() - - if err := NewSeriesFileCompactor().Compact(f); err != nil { - logger.With(zap.Error(err)).Error("series file compaction failed") - } - - logger.With(zap.Duration("elapsed", time.Since(startTime))).Info("completed series file compaction") - - // Clear compaction flag. - f.mu.Lock() - f.compacting = false - f.mu.Unlock() - }() - } - return ids, nil } // DeleteSeriesID flags a series as permanently deleted. // If the series is reintroduced later then it must create a new id. func (f *SeriesFile) DeleteSeriesID(id uint64) error { - f.mu.Lock() - defer f.mu.Unlock() - - if f.closed { - return ErrSeriesFileClosed + p := f.SeriesIDPartition(id) + if p == nil { + return ErrInvalidSeriesPartitionID } - - // Already tombstoned, ignore. - if f.index.IsDeleted(id) { - return nil - } - - // Write tombstone entry. - _, err := f.writeLogEntry(AppendSeriesEntry(nil, SeriesEntryTombstoneFlag, id, nil)) - if err != nil { - return err - } - - // Mark tombstone in memory. 
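The new SeriesFile.CreateSeriesListIfNotExists above fans the work out with errgroup: keys are hashed to a partition up front, each partition gets its own goroutine, and every goroutine fills only the ids slots whose keys map to it, so the shared result slice needs no locking. Because partition membership is decided purely by the key hash, a given series always routes to the same partition across restarts. A reduced sketch of the pattern, using a stand-in hash and a placeholder create function rather than the real xxhash routing and SeriesPartition API:

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

const partitionN = 8 // mirrors SeriesFilePartitionN

// partitionID is a stand-in for the xxhash routing in SeriesKeyPartitionID;
// any stable hash of the key works for the sketch.
func partitionID(key []byte) int {
	var h uint64
	for _, b := range key {
		h = h*31 + uint64(b)
	}
	return int(h % partitionN)
}

// createAll routes each key to its partition and lets one goroutine per
// partition fill in the result slots that belong to it. The goroutines write
// disjoint indexes of ids, so the shared slice needs no extra locking.
func createAll(keys [][]byte, create func(partition int, key []byte) uint64) ([]uint64, error) {
	ids := make([]uint64, len(keys))
	pids := make([]int, len(keys))
	for i, k := range keys {
		pids[i] = partitionID(k)
	}

	var g errgroup.Group
	for p := 0; p < partitionN; p++ {
		p := p // capture the loop variable for the goroutine
		g.Go(func() error {
			for i := range keys {
				if pids[i] != p {
					continue // slot owned by another partition
				}
				ids[i] = create(p, keys[i]) // placeholder for SeriesPartition.CreateSeriesListIfNotExists
			}
			return nil
		})
	}
	return ids, g.Wait()
}

func main() {
	keys := [][]byte{[]byte("cpu,region=east"), []byte("mem,region=west")}
	ids, err := createAll(keys, func(p int, key []byte) uint64 { return uint64(p + 1) }) // dummy id per partition
	fmt.Println(ids, err)
}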
- f.index.Delete(id) - - return nil + return p.DeleteSeriesID(id) } // IsDeleted returns true if the ID has been deleted before. func (f *SeriesFile) IsDeleted(id uint64) bool { - f.mu.RLock() - if f.closed { - f.mu.RUnlock() + p := f.SeriesIDPartition(id) + if p == nil { return false } - v := f.index.IsDeleted(id) - f.mu.RUnlock() - return v + return p.IsDeleted(id) } // SeriesKey returns the series key for a given id. @@ -305,14 +133,20 @@ func (f *SeriesFile) SeriesKey(id uint64) []byte { if id == 0 { return nil } - f.mu.RLock() - if f.closed { - f.mu.RUnlock() + p := f.SeriesIDPartition(id) + if p == nil { return nil } - key := f.seriesKeyByOffset(f.index.FindOffsetByID(id)) - f.mu.RUnlock() - return key + return p.SeriesKey(id) +} + +// SeriesKeys returns a list of series keys from a list of ids. +func (f *SeriesFile) SeriesKeys(ids []uint64) [][]byte { + keys := make([][]byte, len(ids)) + for i := range ids { + keys[i] = f.SeriesKey(ids[i]) + } + return keys } // Series returns the parsed series name and tags for an offset. @@ -326,14 +160,12 @@ func (f *SeriesFile) Series(id uint64) ([]byte, models.Tags) { // SeriesID return the series id for the series. func (f *SeriesFile) SeriesID(name []byte, tags models.Tags, buf []byte) uint64 { - f.mu.RLock() - if f.closed { - f.mu.RUnlock() + key := AppendSeriesKey(buf[:0], name, tags) + keyPartition := f.SeriesKeyPartition(key) + if keyPartition == nil { return 0 } - id := f.index.FindIDBySeriesKey(f.segments, AppendSeriesKey(buf[:0], name, tags)) - f.mu.RUnlock() - return id + return keyPartition.FindIDBySeriesKey(key) } // HasSeries return true if the series exists. @@ -343,104 +175,53 @@ func (f *SeriesFile) HasSeries(name []byte, tags models.Tags, buf []byte) bool { // SeriesCount returns the number of series. func (f *SeriesFile) SeriesCount() uint64 { - f.mu.RLock() - if f.closed { - f.mu.RUnlock() - return 0 + var n uint64 + for _, p := range f.partitions { + n += p.SeriesCount() } - n := f.index.Count() - f.mu.RUnlock() return n } // SeriesIterator returns an iterator over all the series. func (f *SeriesFile) SeriesIDIterator() SeriesIDIterator { var ids []uint64 - for _, segment := range f.segments { - ids = segment.AppendSeriesIDs(ids) + for _, p := range f.partitions { + ids = p.AppendSeriesIDs(ids) } + sort.Sort(uint64Slice(ids)) return NewSeriesIDSliceIterator(ids) } -// activeSegment returns the last segment. -func (f *SeriesFile) activeSegment() *SeriesSegment { - if len(f.segments) == 0 { +func (f *SeriesFile) SeriesIDPartitionID(id uint64) int { + return int(id & 0xFF) +} + +func (f *SeriesFile) SeriesIDPartition(id uint64) *SeriesPartition { + partitionID := f.SeriesIDPartitionID(id) + if partitionID >= len(f.partitions) { return nil } - return f.segments[len(f.segments)-1] + return f.partitions[partitionID] } -func (f *SeriesFile) insert(key []byte) (id uint64, offset int64, err error) { - id = f.seq + 1 - - offset, err = f.writeLogEntry(AppendSeriesEntry(nil, SeriesEntryInsertFlag, id, key)) - if err != nil { - return 0, 0, err +func (f *SeriesFile) SeriesKeysPartitionIDs(keys [][]byte) []int { + partitionIDs := make([]int, len(keys)) + for i := range keys { + partitionIDs[i] = f.SeriesKeyPartitionID(keys[i]) } - - f.seq++ - return id, offset, nil + return partitionIDs } -// writeLogEntry appends an entry to the end of the active segment. -// If there is no more room in the segment then a new segment is added. 
-func (f *SeriesFile) writeLogEntry(data []byte) (offset int64, err error) { - segment := f.activeSegment() - if segment == nil || !segment.CanWrite(data) { - if segment, err = f.createSegment(); err != nil { - return 0, err - } - } - return segment.WriteLogEntry(data) +func (f *SeriesFile) SeriesKeyPartitionID(key []byte) int { + return int(xxhash.Sum64(key) % SeriesFilePartitionN) } -// createSegment appends a new segment -func (f *SeriesFile) createSegment() (*SeriesSegment, error) { - // Close writer for active segment, if one exists. - if segment := f.activeSegment(); segment != nil { - if err := segment.CloseForWrite(); err != nil { - return nil, err - } - } - - // Generate a new sequential segment identifier. - var id uint16 - if len(f.segments) > 0 { - id = f.segments[len(f.segments)-1].ID() + 1 - } - filename := fmt.Sprintf("%04x", id) - - // Generate new empty segment. - segment, err := CreateSeriesSegment(id, filepath.Join(f.path, filename)) - if err != nil { - return nil, err - } - f.segments = append(f.segments, segment) - - // Allow segment to write. - if err := segment.InitForWrite(); err != nil { - return nil, err - } - - return segment, nil -} - -func (f *SeriesFile) seriesKeyByOffset(offset int64) []byte { - if offset == 0 { +func (f *SeriesFile) SeriesKeyPartition(key []byte) *SeriesPartition { + partitionID := f.SeriesKeyPartitionID(key) + if partitionID >= len(f.partitions) { return nil } - - segmentID, pos := SplitSeriesOffset(offset) - for _, segment := range f.segments { - if segment.ID() != segmentID { - continue - } - - key, _ := ReadSeriesKey(segment.Slice(pos + SeriesEntryHeaderSize)) - return key - } - - return nil + return f.partitions[partitionID] } // AppendSeriesKey serializes name and tags to a byte slice. @@ -598,6 +379,41 @@ func CompareSeriesKeys(a, b []byte) int { } } +// GenerateSeriesKeys generates series keys for a list of names & tags using +// a single large memory block. +func GenerateSeriesKeys(names [][]byte, tagsSlice []models.Tags) [][]byte { + buf := make([]byte, 0, SeriesKeysSize(names, tagsSlice)) + keys := make([][]byte, len(names)) + for i := range names { + offset := len(buf) + buf = AppendSeriesKey(buf, names[i], tagsSlice[i]) + keys[i] = buf[offset:] + } + return keys +} + +// SeriesKeysSize returns the number of bytes required to encode a list of name/tags. +func SeriesKeysSize(names [][]byte, tagsSlice []models.Tags) int { + var n int + for i := range names { + n += SeriesKeySize(names[i], tagsSlice[i]) + } + return n +} + +// SeriesKeySize returns the number of bytes required to encode a series key. +func SeriesKeySize(name []byte, tags models.Tags) int { + var n int + n += 2 + len(name) + n += binaryutil.UvarintSize(uint64(len(tags))) + for _, tag := range tags { + n += 2 + len(tag.Key) + n += 2 + len(tag.Value) + } + n += binaryutil.UvarintSize(uint64(n)) + return n +} + type seriesKeys [][]byte func (a seriesKeys) Len() int { return len(a) } @@ -606,216 +422,8 @@ func (a seriesKeys) Less(i, j int) bool { return CompareSeriesKeys(a[i], a[j]) == -1 } -// SeriesFileCompactor represents an object reindexes a series file and optionally compacts segments. -type SeriesFileCompactor struct{} +type uint64Slice []uint64 -// NewSeriesFileCompactor returns a new instance of SeriesFileCompactor. -func NewSeriesFileCompactor() *SeriesFileCompactor { - return &SeriesFileCompactor{} -} - -// Compact rebuilds the series file index. 
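GenerateSeriesKeys and SeriesKeysSize above exist so that a whole batch of series keys can be laid out in a single allocation, with SeriesKeySize pre-computing how many bytes one key needs. A worked example of the arithmetic for a hypothetical series cpu,region=east; the byte-by-byte interpretation in the comments is inferred from the terms of SeriesKeySize, not spelled out elsewhere in this diff:

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/tsdb"
)

func main() {
	name := []byte("cpu")
	tags := models.NewTags(map[string]string{"region": "east"})

	// Following the terms of SeriesKeySize:
	//   2 + len("cpu")    = 5   name length prefix + name
	//   UvarintSize(1)    = 1   number of tags
	//   2 + len("region") = 8   key length prefix + key
	//   2 + len("east")   = 6   value length prefix + value
	//   UvarintSize(20)   = 1   total-size prefix over the 20 bytes above
	// which gives 21 bytes for this key.
	fmt.Println(tsdb.SeriesKeySize(name, tags)) // 21

	// GenerateSeriesKeys sums these sizes up front so every key in the batch
	// shares one backing buffer.
	keys := tsdb.GenerateSeriesKeys([][]byte{name}, []models.Tags{tags})
	fmt.Println(len(keys[0])) // expected to match SeriesKeySize above
}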
-func (c *SeriesFileCompactor) Compact(f *SeriesFile) error { - // Snapshot the partitions and index so we can check tombstones and replay at the end under lock. - f.mu.RLock() - segments := CloneSeriesSegments(f.segments) - index := f.index.Clone() - seriesN := f.index.Count() - f.mu.RUnlock() - - // Compact index to a temporary location. - indexPath := index.path + ".compacting" - if err := c.compactIndexTo(index, seriesN, segments, indexPath); err != nil { - return err - } - - // Swap compacted index under lock & replay since compaction. - if err := func() error { - f.mu.Lock() - defer f.mu.Unlock() - - // Reopen index with new file. - if err := f.index.Close(); err != nil { - return err - } else if err := os.Rename(indexPath, index.path); err != nil { - return err - } else if err := f.index.Open(); err != nil { - return err - } - - // Replay new entries. - if err := f.index.Recover(f.segments); err != nil { - return err - } - return nil - }(); err != nil { - return err - } - - return nil -} - -func (c *SeriesFileCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) error { - hdr := NewSeriesIndexHeader() - hdr.Count = seriesN - hdr.Capacity = pow2((int64(hdr.Count) * 100) / SeriesIndexLoadFactor) - - // Allocate space for maps. - keyIDMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize)) - idOffsetMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize)) - - // Reindex all partitions. - for _, segment := range segments { - errDone := errors.New("done") - - if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error { - // Make sure we don't go past the offset where the compaction began. - if offset >= index.maxOffset { - return errDone - } - - // Only process insert entries. - switch flag { - case SeriesEntryInsertFlag: // fallthrough - case SeriesEntryTombstoneFlag: - return nil - default: - return fmt.Errorf("unexpected series file log entry flag: %d", flag) - } - - // Ignore entry if tombstoned. - if index.IsDeleted(id) { - return nil - } - - // Save max series identifier processed. - hdr.MaxSeriesID, hdr.MaxOffset = id, offset - - // Insert into maps. - c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset) - return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id) - }); err == errDone { - break - } else if err != nil { - return err - } - } - - // Open file handler. - f, err := os.Create(path) - if err != nil { - return err - } - defer f.Close() - - // Calculate map positions. - hdr.KeyIDMap.Offset, hdr.KeyIDMap.Size = SeriesIndexHeaderSize, int64(len(keyIDMap)) - hdr.IDOffsetMap.Offset, hdr.IDOffsetMap.Size = hdr.KeyIDMap.Offset+hdr.KeyIDMap.Size, int64(len(idOffsetMap)) - - // Write header. - if _, err := hdr.WriteTo(f); err != nil { - return err - } - - // Write maps. - if _, err := f.Write(keyIDMap); err != nil { - return err - } else if _, err := f.Write(idOffsetMap); err != nil { - return err - } - - // Sync & close. - if err := f.Sync(); err != nil { - return err - } else if err := f.Close(); err != nil { - return err - } - - return nil -} - -func (c *SeriesFileCompactor) insertKeyIDMap(dst []byte, capacity int64, segments []*SeriesSegment, key []byte, offset int64, id uint64) error { - mask := capacity - 1 - hash := rhh.HashKey(key) - - // Continue searching until we find an empty slot or lower probe distance. 
- for i, dist, pos := int64(0), int64(0), hash&mask; ; i, dist, pos = i+1, dist+1, (pos+1)&mask { - assert(i <= capacity, "key/id map full") - elem := dst[(pos * SeriesIndexElemSize):] - - // If empty slot found or matching offset, insert and exit. - elemOffset := int64(binary.BigEndian.Uint64(elem[:8])) - elemID := binary.BigEndian.Uint64(elem[8:]) - if elemOffset == 0 || elemOffset == offset { - binary.BigEndian.PutUint64(elem[:8], uint64(offset)) - binary.BigEndian.PutUint64(elem[8:], id) - return nil - } - - // Read key at position & hash. - elemKey := ReadSeriesKeyFromSegments(segments, elemOffset+SeriesEntryHeaderSize) - elemHash := rhh.HashKey(elemKey) - - // If the existing elem has probed less than us, then swap places with - // existing elem, and keep going to find another slot for that elem. - if d := rhh.Dist(elemHash, pos, capacity); d < dist { - // Insert current values. - binary.BigEndian.PutUint64(elem[:8], uint64(offset)) - binary.BigEndian.PutUint64(elem[8:], id) - - // Swap with values in that position. - hash, key, offset, id = elemHash, elemKey, elemOffset, elemID - - // Update current distance. - dist = d - } - } -} - -func (c *SeriesFileCompactor) insertIDOffsetMap(dst []byte, capacity int64, id uint64, offset int64) { - mask := capacity - 1 - hash := rhh.HashUint64(id) - - // Continue searching until we find an empty slot or lower probe distance. - for i, dist, pos := int64(0), int64(0), hash&mask; ; i, dist, pos = i+1, dist+1, (pos+1)&mask { - assert(i <= capacity, "id/offset map full") - elem := dst[(pos * SeriesIndexElemSize):] - - // If empty slot found or matching id, insert and exit. - elemID := binary.BigEndian.Uint64(elem[:8]) - elemOffset := int64(binary.BigEndian.Uint64(elem[8:])) - if elemOffset == 0 || elemOffset == offset { - binary.BigEndian.PutUint64(elem[:8], id) - binary.BigEndian.PutUint64(elem[8:], uint64(offset)) - return - } - - // Hash key. - elemHash := rhh.HashUint64(elemID) - - // If the existing elem has probed less than us, then swap places with - // existing elem, and keep going to find another slot for that elem. - if d := rhh.Dist(elemHash, pos, capacity); d < dist { - // Insert current values. - binary.BigEndian.PutUint64(elem[:8], id) - binary.BigEndian.PutUint64(elem[8:], uint64(offset)) - - // Swap with values in that position. - hash, id, offset = elemHash, elemID, elemOffset - - // Update current distance. - dist = d - } - } -} - -// pow2 returns the number that is the next highest power of 2. -// Returns v if it is a power of 2. -func pow2(v int64) int64 { - for i := int64(2); i < 1<<62; i *= 2 { - if i >= v { - return i - } - } - panic("unreachable") -} +func (a uint64Slice) Len() int { return len(a) } +func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] } diff --git a/tsdb/series_file_test.go b/tsdb/series_file_test.go index 81b20933ac..d13f516bca 100644 --- a/tsdb/series_file_test.go +++ b/tsdb/series_file_test.go @@ -48,9 +48,13 @@ func TestSeriesFile_Series(t *testing.T) { // Ensure series file can be compacted. func TestSeriesFileCompactor(t *testing.T) { sfile := MustOpenSeriesFile() - sfile.CompactThreshold = 0 defer sfile.Close() + // Disable automatic compactions. + for _, p := range sfile.Partitions() { + p.CompactThreshold = 0 + } + var names [][]byte var tagsSlice []models.Tags for i := 0; i < 10000; i++ { @@ -66,10 +70,12 @@ func TestSeriesFileCompactor(t *testing.T) { t.Fatalf("unexpected series count: %d", n) } - // Compact in-place. 
- compactor := tsdb.NewSeriesFileCompactor() - if err := compactor.Compact(sfile.SeriesFile); err != nil { - t.Fatal(err) + // Compact in-place for each partition. + for _, p := range sfile.Partitions() { + compactor := tsdb.NewSeriesPartitionCompactor() + if err := compactor.Compact(p); err != nil { + t.Fatal(err) + } } // Verify all series exist. diff --git a/tsdb/series_index.go b/tsdb/series_index.go index dbcff6ecb4..ea37629c19 100644 --- a/tsdb/series_index.go +++ b/tsdb/series_index.go @@ -182,7 +182,7 @@ func (idx *SeriesIndex) execEntry(flag uint8, id uint64, offset int64, key []byt func (idx *SeriesIndex) FindIDBySeriesKey(segments []*SeriesSegment, key []byte) uint64 { if v := idx.keyIDMap.Get(key); v != nil { - if id, _ := v.(uint64); id != 0 { + if id, _ := v.(uint64); id != 0 && !idx.IsDeleted(id) { return id } } @@ -204,7 +204,11 @@ func (idx *SeriesIndex) FindIDBySeriesKey(segments []*SeriesSegment, key []byte) if d > rhh.Dist(elemHash, pos, idx.capacity) { return 0 } else if elemHash == hash && bytes.Equal(elemKey, key) { - return binary.BigEndian.Uint64(elem[8:]) + id := binary.BigEndian.Uint64(elem[8:]) + if idx.IsDeleted(id) { + return 0 + } + return id } } } diff --git a/tsdb/series_partition.go b/tsdb/series_partition.go new file mode 100644 index 0000000000..565bc94bf4 --- /dev/null +++ b/tsdb/series_partition.go @@ -0,0 +1,665 @@ +package tsdb + +import ( + "encoding/binary" + "errors" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sync" + "time" + + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/rhh" + "go.uber.org/zap" +) + +var ( + ErrSeriesPartitionClosed = errors.New("tsdb: series partition closed") +) + +// DefaultSeriesPartitionCompactThreshold is the number of series IDs to hold in the in-memory +// series map before compacting and rebuilding the on-disk representation. +const DefaultSeriesPartitionCompactThreshold = 1 << 17 // 128K + +// SeriesPartition represents a subset of series file data. +type SeriesPartition struct { + mu sync.RWMutex + wg sync.WaitGroup + id int + path string + closed bool + + segments []*SeriesSegment + index *SeriesIndex + seq uint64 // series id sequence + + compacting bool + + CompactThreshold int + + Logger *zap.Logger +} + +// NewSeriesPartition returns a new instance of SeriesPartition. +func NewSeriesPartition(id int, path string) *SeriesPartition { + return &SeriesPartition{ + id: id, + path: path, + CompactThreshold: DefaultSeriesPartitionCompactThreshold, + Logger: zap.NewNop(), + } +} + +// Open memory maps the data file at the partition's path. +func (p *SeriesPartition) Open() error { + if p.closed { + return errors.New("tsdb: cannot reopen series partition") + } + + // Create path if it doesn't exist. + if err := os.MkdirAll(filepath.Join(p.path), 0777); err != nil { + return err + } + + // Open components. + if err := func() (err error) { + if err := p.openSegments(); err != nil { + return err + } + + // Init last segment for writes. 
+ if err := p.activeSegment().InitForWrite(); err != nil { + return err + } + + p.index = NewSeriesIndex(p.IndexPath()) + if err := p.index.Open(); err != nil { + return err + } else if p.index.Recover(p.segments); err != nil { + return err + } + + return nil + }(); err != nil { + p.Close() + return err + } + + return nil +} + +func (p *SeriesPartition) openSegments() error { + fis, err := ioutil.ReadDir(p.path) + if err != nil { + return err + } + + for _, fi := range fis { + segmentID, err := ParseSeriesSegmentFilename(fi.Name()) + if err != nil { + continue + } + + segment := NewSeriesSegment(segmentID, filepath.Join(p.path, fi.Name())) + if err := segment.Open(); err != nil { + return err + } + p.segments = append(p.segments, segment) + } + + // Find max series id by searching segments in reverse order. + for i := len(p.segments) - 1; i >= 0; i-- { + if p.seq = p.segments[i].MaxSeriesID(); p.seq > 0 { + break + } + } + + // Create initial segment if none exist. + if len(p.segments) == 0 { + segment, err := CreateSeriesSegment(0, filepath.Join(p.path, "0000")) + if err != nil { + return err + } + p.segments = append(p.segments, segment) + } + + return nil +} + +// Close unmaps the data files. +func (p *SeriesPartition) Close() (err error) { + p.wg.Wait() + + p.mu.Lock() + defer p.mu.Unlock() + + p.closed = true + + for _, s := range p.segments { + if e := s.Close(); e != nil && err == nil { + err = e + } + } + p.segments = nil + + if p.index != nil { + if e := p.index.Close(); e != nil && err == nil { + err = e + } + } + p.index = nil + + return err +} + +// ID returns the partition id. +func (p *SeriesPartition) ID() int { return p.id } + +// Path returns the path to the partition. +func (p *SeriesPartition) Path() string { return p.path } + +// Path returns the path to the series index. +func (p *SeriesPartition) IndexPath() string { return filepath.Join(p.path, "index") } + +// CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist. +// The returned ids list returns values for new series and zero for existing series. +func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitionIDs []int, ids []uint64) error { + var writeRequired bool + p.mu.RLock() + if p.closed { + p.mu.RUnlock() + return ErrSeriesPartitionClosed + } + for i := range keys { + if keyPartitionIDs[i] != p.id { + continue + } + id := p.index.FindIDBySeriesKey(p.segments, keys[i]) + if id == 0 { + writeRequired = true + continue + } + ids[i] = id + } + p.mu.RUnlock() + + // Exit if all series for this partition already exist. + if !writeRequired { + return nil + } + + type keyRange struct { + id uint64 + offset int64 + } + newKeyRanges := make([]keyRange, 0, len(keys)) + + // Obtain write lock to create new series. + p.mu.Lock() + defer p.mu.Unlock() + + if p.closed { + return ErrSeriesPartitionClosed + } + + // Track offsets of duplicate series. + newIDs := make(map[string]uint64, len(ids)) + + for i := range keys { + // Skip series that don't belong to the partition or have already been created. + if keyPartitionIDs[i] != p.id || ids[i] != 0 { + continue + } + + // Re-attempt lookup under write lock. + key := keys[i] + if ids[i] = newIDs[string(key)]; ids[i] != 0 { + continue + } else if ids[i] = p.index.FindIDBySeriesKey(p.segments, key); ids[i] != 0 { + continue + } + + // Write to series log and save offset. + id, offset, err := p.insert(key) + if err != nil { + return err + } + + // Append new key to be added to hash map after flush. 
+ ids[i] = id + newIDs[string(key)] = id + newKeyRanges = append(newKeyRanges, keyRange{id, offset}) + } + + // Flush active segment writes so we can access data in mmap. + if segment := p.activeSegment(); segment != nil { + if err := segment.Flush(); err != nil { + return err + } + } + + // Add keys to hash map(s). + for _, keyRange := range newKeyRanges { + p.index.Insert(p.seriesKeyByOffset(keyRange.offset), keyRange.id, keyRange.offset) + } + + // Check if we've crossed the compaction threshold. + if !p.compacting && p.CompactThreshold != 0 && p.index.InMemCount() >= uint64(p.CompactThreshold) { + p.compacting = true + logger := p.Logger.With(zap.String("path", p.path)) + logger.Info("beginning series partition compaction") + + startTime := time.Now() + p.wg.Add(1) + go func() { + defer p.wg.Done() + + if err := NewSeriesPartitionCompactor().Compact(p); err != nil { + logger.With(zap.Error(err)).Error("series partition compaction failed") + } + + logger.With(zap.Duration("elapsed", time.Since(startTime))).Info("completed series partition compaction") + + // Clear compaction flag. + p.mu.Lock() + p.compacting = false + p.mu.Unlock() + }() + } + + return nil +} + +// DeleteSeriesID flags a series as permanently deleted. +// If the series is reintroduced later then it must create a new id. +func (p *SeriesPartition) DeleteSeriesID(id uint64) error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.closed { + return ErrSeriesPartitionClosed + } + + // Already tombstoned, ignore. + if p.index.IsDeleted(id) { + return nil + } + + // Write tombstone entry. + _, err := p.writeLogEntry(AppendSeriesEntry(nil, SeriesEntryTombstoneFlag, id, nil)) + if err != nil { + return err + } + + // Mark tombstone in memory. + p.index.Delete(id) + + return nil +} + +// IsDeleted returns true if the ID has been deleted before. +func (p *SeriesPartition) IsDeleted(id uint64) bool { + p.mu.RLock() + if p.closed { + p.mu.RUnlock() + return false + } + v := p.index.IsDeleted(id) + p.mu.RUnlock() + return v +} + +// SeriesKey returns the series key for a given id. +func (p *SeriesPartition) SeriesKey(id uint64) []byte { + if id == 0 { + return nil + } + p.mu.RLock() + if p.closed { + p.mu.RUnlock() + return nil + } + key := p.seriesKeyByOffset(p.index.FindOffsetByID(id)) + p.mu.RUnlock() + return key +} + +// Series returns the parsed series name and tags for an offset. +func (p *SeriesPartition) Series(id uint64) ([]byte, models.Tags) { + key := p.SeriesKey(id) + if key == nil { + return nil, nil + } + return ParseSeriesKey(key) +} + +// FindIDBySeriesKey return the series id for the series key. +func (p *SeriesPartition) FindIDBySeriesKey(key []byte) uint64 { + p.mu.RLock() + if p.closed { + p.mu.RUnlock() + return 0 + } + id := p.index.FindIDBySeriesKey(p.segments, key) + p.mu.RUnlock() + return id +} + +// SeriesCount returns the number of series. +func (p *SeriesPartition) SeriesCount() uint64 { + p.mu.RLock() + if p.closed { + p.mu.RUnlock() + return 0 + } + n := p.index.Count() + p.mu.RUnlock() + return n +} + +// AppendSeriesIDs returns a list of all series ids. +func (p *SeriesPartition) AppendSeriesIDs(a []uint64) []uint64 { + for _, segment := range p.segments { + a = segment.AppendSeriesIDs(a) + } + return a +} + +// activeSegment returns the last segment. 
+func (p *SeriesPartition) activeSegment() *SeriesSegment { + if len(p.segments) == 0 { + return nil + } + return p.segments[len(p.segments)-1] +} + +func (p *SeriesPartition) insert(key []byte) (id uint64, offset int64, err error) { + // ID is built using a autoincrement sequence joined to the partition id. + // Format: + id = ((p.seq + 1) << 8) | uint64(p.id) + + offset, err = p.writeLogEntry(AppendSeriesEntry(nil, SeriesEntryInsertFlag, id, key)) + if err != nil { + return 0, 0, err + } + + p.seq++ + return id, offset, nil +} + +// writeLogEntry appends an entry to the end of the active segment. +// If there is no more room in the segment then a new segment is added. +func (p *SeriesPartition) writeLogEntry(data []byte) (offset int64, err error) { + segment := p.activeSegment() + if segment == nil || !segment.CanWrite(data) { + if segment, err = p.createSegment(); err != nil { + return 0, err + } + } + return segment.WriteLogEntry(data) +} + +// createSegment appends a new segment +func (p *SeriesPartition) createSegment() (*SeriesSegment, error) { + // Close writer for active segment, if one exists. + if segment := p.activeSegment(); segment != nil { + if err := segment.CloseForWrite(); err != nil { + return nil, err + } + } + + // Generate a new sequential segment identifier. + var id uint16 + if len(p.segments) > 0 { + id = p.segments[len(p.segments)-1].ID() + 1 + } + filename := fmt.Sprintf("%04x", id) + + // Generate new empty segment. + segment, err := CreateSeriesSegment(id, filepath.Join(p.path, filename)) + if err != nil { + return nil, err + } + p.segments = append(p.segments, segment) + + // Allow segment to write. + if err := segment.InitForWrite(); err != nil { + return nil, err + } + + return segment, nil +} + +func (p *SeriesPartition) seriesKeyByOffset(offset int64) []byte { + if offset == 0 { + return nil + } + + segmentID, pos := SplitSeriesOffset(offset) + for _, segment := range p.segments { + if segment.ID() != segmentID { + continue + } + + key, _ := ReadSeriesKey(segment.Slice(pos + SeriesEntryHeaderSize)) + return key + } + + return nil +} + +// SeriesPartitionCompactor represents an object reindexes a series partition and optionally compacts segments. +type SeriesPartitionCompactor struct{} + +// NewSeriesPartitionCompactor returns a new instance of SeriesPartitionCompactor. +func NewSeriesPartitionCompactor() *SeriesPartitionCompactor { + return &SeriesPartitionCompactor{} +} + +// Compact rebuilds the series partition index. +func (c *SeriesPartitionCompactor) Compact(p *SeriesPartition) error { + // Snapshot the partitions and index so we can check tombstones and replay at the end under lock. + p.mu.RLock() + segments := CloneSeriesSegments(p.segments) + index := p.index.Clone() + seriesN := p.index.Count() + p.mu.RUnlock() + + // Compact index to a temporary location. + indexPath := index.path + ".compacting" + if err := c.compactIndexTo(index, seriesN, segments, indexPath); err != nil { + return err + } + + // Swap compacted index under lock & replay since compaction. + if err := func() error { + p.mu.Lock() + defer p.mu.Unlock() + + // Reopen index with new file. + if err := p.index.Close(); err != nil { + return err + } else if err := os.Rename(indexPath, index.path); err != nil { + return err + } else if err := p.index.Open(); err != nil { + return err + } + + // Replay new entries. 
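SeriesPartition.insert above packs the partition id into the series id itself: the low byte carries the partition, the partition-local sequence sits in the upper bits, and SeriesFile.SeriesIDPartitionID recovers the partition with id & 0xFF. With SeriesFilePartitionN fixed at 8, only three bits of that byte are used, so the layout leaves room to raise the partition count without changing the id format. A small illustration with made-up numbers:

package main

import "fmt"

func main() {
	const partitionID = 3
	seq := uint64(5) // partition-local sequence before this insert

	// SeriesPartition.insert: low byte = partition id, upper bits = sequence.
	id := ((seq + 1) << 8) | uint64(partitionID)
	fmt.Printf("id = %d (0x%x)\n", id, id) // id = 1539 (0x603)

	// SeriesFile.SeriesIDPartitionID recovers the partition with a mask.
	fmt.Println(int(id & 0xFF)) // 3
}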
+ if err := p.index.Recover(p.segments); err != nil { + return err + } + return nil + }(); err != nil { + return err + } + + return nil +} + +func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) error { + hdr := NewSeriesIndexHeader() + hdr.Count = seriesN + hdr.Capacity = pow2((int64(hdr.Count) * 100) / SeriesIndexLoadFactor) + + // Allocate space for maps. + keyIDMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize)) + idOffsetMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize)) + + // Reindex all partitions. + for _, segment := range segments { + errDone := errors.New("done") + + if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error { + // Make sure we don't go past the offset where the compaction began. + if offset >= index.maxOffset { + return errDone + } + + // Only process insert entries. + switch flag { + case SeriesEntryInsertFlag: // fallthrough + case SeriesEntryTombstoneFlag: + return nil + default: + return fmt.Errorf("unexpected series partition log entry flag: %d", flag) + } + + // Ignore entry if tombstoned. + if index.IsDeleted(id) { + return nil + } + + // Save max series identifier processed. + hdr.MaxSeriesID, hdr.MaxOffset = id, offset + + // Insert into maps. + c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset) + return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id) + }); err == errDone { + break + } else if err != nil { + return err + } + } + + // Open file handler. + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + + // Calculate map positions. + hdr.KeyIDMap.Offset, hdr.KeyIDMap.Size = SeriesIndexHeaderSize, int64(len(keyIDMap)) + hdr.IDOffsetMap.Offset, hdr.IDOffsetMap.Size = hdr.KeyIDMap.Offset+hdr.KeyIDMap.Size, int64(len(idOffsetMap)) + + // Write header. + if _, err := hdr.WriteTo(f); err != nil { + return err + } + + // Write maps. + if _, err := f.Write(keyIDMap); err != nil { + return err + } else if _, err := f.Write(idOffsetMap); err != nil { + return err + } + + // Sync & close. + if err := f.Sync(); err != nil { + return err + } else if err := f.Close(); err != nil { + return err + } + + return nil +} + +func (c *SeriesPartitionCompactor) insertKeyIDMap(dst []byte, capacity int64, segments []*SeriesSegment, key []byte, offset int64, id uint64) error { + mask := capacity - 1 + hash := rhh.HashKey(key) + + // Continue searching until we find an empty slot or lower probe distance. + for i, dist, pos := int64(0), int64(0), hash&mask; ; i, dist, pos = i+1, dist+1, (pos+1)&mask { + assert(i <= capacity, "key/id map full") + elem := dst[(pos * SeriesIndexElemSize):] + + // If empty slot found or matching offset, insert and exit. + elemOffset := int64(binary.BigEndian.Uint64(elem[:8])) + elemID := binary.BigEndian.Uint64(elem[8:]) + if elemOffset == 0 || elemOffset == offset { + binary.BigEndian.PutUint64(elem[:8], uint64(offset)) + binary.BigEndian.PutUint64(elem[8:], id) + return nil + } + + // Read key at position & hash. + elemKey := ReadSeriesKeyFromSegments(segments, elemOffset+SeriesEntryHeaderSize) + elemHash := rhh.HashKey(elemKey) + + // If the existing elem has probed less than us, then swap places with + // existing elem, and keep going to find another slot for that elem. + if d := rhh.Dist(elemHash, pos, capacity); d < dist { + // Insert current values. 
+ binary.BigEndian.PutUint64(elem[:8], uint64(offset)) + binary.BigEndian.PutUint64(elem[8:], id) + + // Swap with values in that position. + hash, key, offset, id = elemHash, elemKey, elemOffset, elemID + + // Update current distance. + dist = d + } + } +} + +func (c *SeriesPartitionCompactor) insertIDOffsetMap(dst []byte, capacity int64, id uint64, offset int64) { + mask := capacity - 1 + hash := rhh.HashUint64(id) + + // Continue searching until we find an empty slot or lower probe distance. + for i, dist, pos := int64(0), int64(0), hash&mask; ; i, dist, pos = i+1, dist+1, (pos+1)&mask { + assert(i <= capacity, "id/offset map full") + elem := dst[(pos * SeriesIndexElemSize):] + + // If empty slot found or matching id, insert and exit. + elemID := binary.BigEndian.Uint64(elem[:8]) + elemOffset := int64(binary.BigEndian.Uint64(elem[8:])) + if elemOffset == 0 || elemOffset == offset { + binary.BigEndian.PutUint64(elem[:8], id) + binary.BigEndian.PutUint64(elem[8:], uint64(offset)) + return + } + + // Hash key. + elemHash := rhh.HashUint64(elemID) + + // If the existing elem has probed less than us, then swap places with + // existing elem, and keep going to find another slot for that elem. + if d := rhh.Dist(elemHash, pos, capacity); d < dist { + // Insert current values. + binary.BigEndian.PutUint64(elem[:8], id) + binary.BigEndian.PutUint64(elem[8:], uint64(offset)) + + // Swap with values in that position. + hash, id, offset = elemHash, elemID, elemOffset + + // Update current distance. + dist = d + } + } +} + +// pow2 returns the number that is the next highest power of 2. +// Returns v if it is a power of 2. +func pow2(v int64) int64 { + for i := int64(2); i < 1<<62; i *= 2 { + if i >= v { + return i + } + } + panic("unreachable") +} diff --git a/tsdb/store.go b/tsdb/store.go index 20d4d42c08..1e106a72eb 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -843,23 +843,11 @@ func (s *Store) estimateCardinality(dbName string, getSketches func(*Shard) (est // SeriesCardinality returns the series cardinality for the provided database. func (s *Store) SeriesCardinality(database string) (int64, error) { - s.mu.RLock() - shards := s.filterShards(byDatabase(database)) - s.mu.RUnlock() - - // TODO(benbjohnson): Series file will be shared by the DB. - var max int64 - for _, shard := range shards { - index, err := shard.Index() - if err != nil { - return 0, err - } - - if n := index.SeriesN(); n > max { - max = n - } + sfile := s.seriesFile(database) + if sfile == nil { + return 0, nil } - return max, nil + return int64(sfile.SeriesCount()), nil } // MeasurementsCardinality returns the measurement cardinality for the provided
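Both insertKeyIDMap and insertIDOffsetMap above are Robin Hood hash insertions: while probing for a slot, if the resident element turns out to be closer to its home bucket than the element being inserted, the two swap and probing continues with the evicted element, which keeps probe lengths even across the table. A self-contained sketch of that control flow on a plain integer table — it mirrors the swap logic only, not the rhh package's actual hashing, and uses 0 as the empty marker much as the maps treat a zero offset as empty:

package main

import "fmt"

const capacity = 8 // power of two, like the compacted index maps

func home(key uint64) int { return int(key % capacity) } // stand-in hash
func dist(key uint64, pos int) int {
	return (pos - home(key) + capacity) % capacity // how far key sits past its home slot
}

// robinHoodInsert places key, swapping with any resident key that has probed a
// shorter distance, then keeps walking to re-place the evicted key.
func robinHoodInsert(table []uint64, key uint64) {
	pos, d := home(key), 0
	for {
		if table[pos] == 0 { // empty slot: done
			table[pos] = key
			return
		}
		if rd := dist(table[pos], pos); rd < d {
			table[pos], key = key, table[pos] // take the slot from the "richer" resident
			d = rd
		}
		pos, d = (pos+1)%capacity, d+1
	}
}

func main() {
	table := make([]uint64, capacity)
	// 16, 24 and 32 all hash to slot 0; 17 hashes to slot 1 and is gradually
	// displaced rightward by the later keys that are farther from home.
	for _, k := range []uint64{17, 16, 24, 32} {
		robinHoodInsert(table, k)
	}
	fmt.Println(table) // [16 24 32 17 0 0 0 0]
}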