tsdb: remove hll sketches
This keeps file compatibility by just writing out zeros for the sizes and offsets. Perhaps it's ok to just nuke everything and remove the data. It also keeps the hll package because it seems generally useful even if it's not currently being used.
parent 381d449b82
commit a7657ac409
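The compatibility approach described in the message above (keep the trailer the same size, but write zeros where the sketch offsets and sizes used to be) can be sketched in isolation. The following is only an illustration under assumptions: this writeUint64To is a local stand-in modelled on the helper of the same name in the diff below, and the non-zero field values are invented; it is not the actual tsi1 trailer code.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeUint64To writes v to w in big-endian form and adds the bytes written to *n.
func writeUint64To(w io.Writer, v uint64, n *int64) error {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], v)
	nn, err := w.Write(buf[:])
	*n += int64(nn)
	return err
}

func main() {
	var buf bytes.Buffer
	var n int64

	// Fields that are still in use keep their real values...
	_ = writeUint64To(&buf, 42, &n) // e.g. an offset that is still written
	_ = writeUint64To(&buf, 10, &n) // e.g. its size

	// ...while the four retired sketch fields (offset and size for the series
	// sketch and the tombstone series sketch) are written as zeros, so the
	// trailer keeps the same byte layout and existing readers still parse it.
	for i := 0; i < 4; i++ {
		_ = writeUint64To(&buf, 0, &n)
	}

	fmt.Println("trailer bytes written:", n) // 6 fields * 8 bytes = 48
}

On the read side the diff takes the mirror step: instead of decoding those fields into sketch offsets, it simply skips them (buf = buf[8*4:]).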
@@ -9,7 +9,6 @@ import (
	"github.com/influxdata/influxql"
	"github.com/influxdata/platform/models"
	"github.com/influxdata/platform/pkg/estimator"
	"github.com/influxdata/platform/pkg/limiter"
	"go.uber.org/zap"
)

@@ -39,8 +38,6 @@ type Engine interface {
	DeleteSeriesRange(itr SeriesIterator, min, max int64) error
	DeleteSeriesRangeWithPredicate(itr SeriesIterator, predicate func(name []byte, tags models.Tags) (int64, int64, bool)) error

	MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
	SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
	SeriesN() int64

	MeasurementExists(name []byte) (bool, error)

@@ -75,11 +75,8 @@ The series block stores raw series keys in sorted order. It also provides hash
indexes so that series can be looked up quickly. Hash indexes are inserted
periodically so that memory size is limited at write time. Once all the series
and hash indexes have been written then a list of index entries are written
so that hash indexes can be looked up via binary search.

The end of the block contains two HyperLogLog++ sketches which track the
estimated number of created series and deleted series. After the sketches is
a trailer which contains metadata about the block.
so that hash indexes can be looked up via binary search. After the entries
is a trailer which contains metadata about the block.

┏━━━━━━━SeriesBlock━━━━━━━━┓
┃ ┌──────────────────────┐ ┃

@@ -105,8 +102,6 @@ a trailer which contains metadata about the block.
┃ ├──────────────────────┤ ┃
┃ │ Index Entries │ ┃
┃ ├──────────────────────┤ ┃
┃ │ HLL Sketches │ ┃
┃ ├──────────────────────┤ ┃
┃ │ Trailer │ ┃
┃ └──────────────────────┘ ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━┛

@@ -167,9 +162,6 @@ series offsets, and the offset to their tag block. This allows all series for
a measurement to be traversed quickly and it allows fast direct lookups of
measurements and their tags.

This block also contains HyperLogLog++ sketches for new and deleted
measurements.

┏━━━━Measurement Block━━━━━┓
┃ ┌──────────────────────┐ ┃
┃ │ Measurement │ ┃

@@ -182,8 +174,6 @@ measurements.
┃ │ Hash Index │ ┃
┃ │ │ ┃
┃ ├──────────────────────┤ ┃
┃ │ HLL Sketches │ ┃
┃ ├──────────────────────┤ ┃
┃ │ Trailer │ ┃
┃ └──────────────────────┘ ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━┛

@@ -8,8 +8,6 @@ import (
	"unsafe"

	"github.com/influxdata/influxql"
	"github.com/influxdata/platform/pkg/estimator"
	"github.com/influxdata/platform/pkg/estimator/hll"
	"github.com/influxdata/platform/tsdb"
)

@@ -408,36 +406,6 @@ func (fs *FileSet) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.Serie
	return tsdb.NewSeriesIDSetIterator(ss), nil
}

// MeasurementsSketches returns the merged measurement sketches for the FileSet.
func (fs *FileSet) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
	sketch, tSketch := hll.NewDefaultPlus(), hll.NewDefaultPlus()
	for _, f := range fs.files {
		if s, t, err := f.MeasurementsSketches(); err != nil {
			return nil, nil, err
		} else if err := sketch.Merge(s); err != nil {
			return nil, nil, err
		} else if err := tSketch.Merge(t); err != nil {
			return nil, nil, err
		}
	}
	return sketch, tSketch, nil
}

// SeriesSketches returns the merged measurement sketches for the FileSet.
func (fs *FileSet) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
	sketch, tSketch := hll.NewDefaultPlus(), hll.NewDefaultPlus()
	for _, f := range fs.files {
		if s, t, err := f.SeriesSketches(); err != nil {
			return nil, nil, err
		} else if err := sketch.Merge(s); err != nil {
			return nil, nil, err
		} else if err := tSketch.Merge(t); err != nil {
			return nil, nil, err
		}
	}
	return sketch, tSketch, nil
}

// File represents a log or index file.
type File interface {
	Close() error

@@ -461,10 +429,6 @@ type File interface {
	TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator
	TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesIDSet, error)

	// Sketches for cardinality estimation
	MeasurementsSketches() (s, t estimator.Sketch, err error)
	SeriesSketches() (s, t estimator.Sketch, err error)

	// Bitmap series existance.
	SeriesIDSet() (*tsdb.SeriesIDSet, error)
	TombstoneSeriesIDSet() (*tsdb.SeriesIDSet, error)

@@ -20,8 +20,6 @@ import (
	"github.com/influxdata/influxdb/query"
	"github.com/influxdata/influxql"
	"github.com/influxdata/platform/models"
	"github.com/influxdata/platform/pkg/estimator"
	"github.com/influxdata/platform/pkg/estimator/hll"
	"github.com/influxdata/platform/pkg/slices"
	"github.com/influxdata/platform/tsdb"
	"go.uber.org/zap"

@@ -128,10 +126,6 @@ type Index struct {
	sfile *tsdb.SeriesFile // series lookup file
	database string // Name of database.

	// Cached sketches.
	mSketch, mTSketch estimator.Sketch // Measurement sketches
	sSketch, sTSketch estimator.Sketch // Series sketches

	// Index's version.
	version int

@@ -152,10 +146,6 @@ func NewIndex(sfile *tsdb.SeriesFile, database string, c Config, options ...Inde
		version: Version,
		sfile: sfile,
		database: database,
		mSketch: hll.NewDefaultPlus(),
		mTSketch: hll.NewDefaultPlus(),
		sSketch: hll.NewDefaultPlus(),
		sTSketch: hll.NewDefaultPlus(),
		PartitionN: DefaultPartitionN,
	}

@@ -182,10 +172,6 @@ func (i *Index) Bytes() int {
	b += int(unsafe.Sizeof(i.logger))
	b += int(unsafe.Sizeof(i.sfile))
	// Do not count SeriesFile because it belongs to the code that constructed this Index.
	b += int(unsafe.Sizeof(i.mSketch)) + i.mSketch.Bytes()
	b += int(unsafe.Sizeof(i.mTSketch)) + i.mTSketch.Bytes()
	b += int(unsafe.Sizeof(i.sSketch)) + i.sSketch.Bytes()
	b += int(unsafe.Sizeof(i.sTSketch)) + i.sTSketch.Bytes()
	b += int(unsafe.Sizeof(i.database)) + len(i.database)
	b += int(unsafe.Sizeof(i.version))
	b += int(unsafe.Sizeof(i.PartitionN))

@@ -278,13 +264,6 @@ func (i *Index) Open() error {
		}
	}

	// Refresh cached sketches.
	if err := i.updateSeriesSketches(); err != nil {
		return err
	} else if err := i.updateMeasurementSketches(); err != nil {
		return err
	}

	// Mark opened.
	i.opened = true
	i.logger.Info("Index opened", zap.Int("partitions", partitionN))

@@ -364,36 +343,6 @@ func (i *Index) availableThreads() int {
	return n
}

// updateMeasurementSketches rebuilds the cached measurement sketches.
func (i *Index) updateMeasurementSketches() error {
	i.mSketch, i.mTSketch = hll.NewDefaultPlus(), hll.NewDefaultPlus()
	for j := 0; j < int(i.PartitionN); j++ {
		if s, t, err := i.partitions[j].MeasurementsSketches(); err != nil {
			return err
		} else if i.mSketch.Merge(s); err != nil {
			return err
		} else if i.mTSketch.Merge(t); err != nil {
			return err
		}
	}
	return nil
}

// updateSeriesSketches rebuilds the cached series sketches.
func (i *Index) updateSeriesSketches() error {
	i.sSketch, i.sTSketch = hll.NewDefaultPlus(), hll.NewDefaultPlus()
	for j := 0; j < int(i.PartitionN); j++ {
		if s, t, err := i.partitions[j].SeriesSketches(); err != nil {
			return err
		} else if i.sSketch.Merge(s); err != nil {
			return err
		} else if i.sTSketch.Merge(t); err != nil {
			return err
		}
	}
	return nil
}

// ForEachMeasurementName iterates over all measurement names in the index,
// applying fn. It returns the first error encountered, if any.
//

@@ -632,12 +581,6 @@ func (i *Index) DropMeasurement(name []byte) error {
		}
	}

	// Update sketches.
	i.mTSketch.Add(name)
	if err := i.updateSeriesSketches(); err != nil {
		return err
	}

	return nil
}

@@ -734,14 +677,6 @@ func (i *Index) CreateSeriesListIfNotExists(collection *tsdb.SeriesCollection) e
		}
	}

	// Update sketches.
	for _, key := range collection.Keys {
		i.sSketch.Add(key)
	}
	for _, name := range collection.Names {
		i.mSketch.Add(name)
	}

	return nil
}

@@ -762,8 +697,6 @@ func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags, typ
	if err != nil {
		return err
	}
	i.sSketch.Add(key)
	i.mSketch.Add(name)

	if len(ids) == 0 || ids[0].IsZero() {
		return nil // No new series, nothing further to update.

@@ -805,9 +738,6 @@ func (i *Index) DropSeries(seriesID tsdb.SeriesID, key []byte, cascade bool) err
		return err
	}

	// Add sketch tombstone.
	i.sTSketch.Add(key)

	if !cascade {
		return nil
	}

@@ -856,16 +786,6 @@ func (i *Index) DropMeasurementIfSeriesNotExist(name []byte) error {
	return i.DropMeasurement(name)
}

// MeasurementsSketches returns the two measurement sketches for the index.
func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
	return i.mSketch, i.mTSketch, nil
}

// SeriesSketches returns the two series sketches for the index.
func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
	return i.sSketch, i.sTSketch, nil
}

// SeriesN returns the series cardinality in the index. It is the sum of all
// partition cardinalities.
func (i *Index) SeriesN() int64 {

@@ -10,8 +10,6 @@ import (
	"unsafe"

	"github.com/influxdata/platform/models"
	"github.com/influxdata/platform/pkg/estimator"
	"github.com/influxdata/platform/pkg/estimator/hll"
	"github.com/influxdata/platform/pkg/mmap"
	"github.com/influxdata/platform/tsdb"
)

@@ -32,8 +30,7 @@ const (
		8 + 8 + // measurement block offset + size
		8 + 8 + // series id set offset + size
		8 + 8 + // tombstone series id set offset + size
		8 + 8 + // series sketch offset + size
		8 + 8 + // tombstone series sketch offset + size
		8 + 8 + 8 + 8 + // legacy sketch info
		0
)

@@ -57,9 +54,6 @@ type IndexFile struct {
	seriesIDSetData []byte
	tombstoneSeriesIDSetData []byte

	// Series sketch data.
	sketchData, tSketchData []byte

	// Sortable identifier & filepath to the log file.
	level int
	id int

@@ -181,10 +175,6 @@ func (f *IndexFile) UnmarshalBinary(data []byte) error {
		return err
	}

	// Slice series sketch data.
	f.sketchData = data[t.SeriesSketch.Offset : t.SeriesSketch.Offset+t.SeriesSketch.Size]
	f.tSketchData = data[t.TombstoneSeriesSketch.Offset : t.TombstoneSeriesSketch.Offset+t.TombstoneSeriesSketch.Size]

	// Slice series set data.
	f.seriesIDSetData = data[t.SeriesIDSet.Offset : t.SeriesIDSet.Offset+t.SeriesIDSet.Size]
	f.tombstoneSeriesIDSetData = data[t.TombstoneSeriesIDSet.Offset : t.TombstoneSeriesIDSet.Offset+t.TombstoneSeriesIDSet.Size]

@@ -382,25 +372,6 @@ func (f *IndexFile) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterat
	return f.mblk.SeriesIDIterator(name)
}

// MeasurementsSketches returns existence and tombstone sketches for measurements.
func (f *IndexFile) MeasurementsSketches() (sketch, tSketch estimator.Sketch, err error) {
	return f.mblk.Sketches()
}

// SeriesSketches returns existence and tombstone sketches for series.
func (f *IndexFile) SeriesSketches() (sketch, tSketch estimator.Sketch, err error) {
	sketch = hll.NewDefaultPlus()
	if err := sketch.UnmarshalBinary(f.sketchData); err != nil {
		return nil, nil, err
	}

	tSketch = hll.NewDefaultPlus()
	if err := tSketch.UnmarshalBinary(f.tSketchData); err != nil {
		return nil, nil, err
	}
	return sketch, tSketch, nil
}

// ReadIndexFileTrailer returns the index file trailer from data.
func ReadIndexFileTrailer(data []byte) (IndexFileTrailer, error) {
	var t IndexFileTrailer

@@ -426,13 +397,8 @@ func ReadIndexFileTrailer(data []byte) (IndexFileTrailer, error) {
	t.TombstoneSeriesIDSet.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
	t.TombstoneSeriesIDSet.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]

	// Read series sketch set info.
	t.SeriesSketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
	t.SeriesSketch.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]

	// Read series tombstone sketch info.
	t.TombstoneSeriesSketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
	t.TombstoneSeriesSketch.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
	// Skip over any legacy sketch data.
	buf = buf[8*4:]

	if len(buf) != 2 { // Version field still in buffer.
		return t, fmt.Errorf("unread %d bytes left unread in trailer", len(buf)-2)

@@ -458,16 +424,6 @@ type IndexFileTrailer struct {
		Offset int64
		Size int64
	}

	SeriesSketch struct {
		Offset int64
		Size int64
	}

	TombstoneSeriesSketch struct {
		Offset int64
		Size int64
	}
}

// WriteTo writes the trailer to w.

@@ -493,18 +449,11 @@ func (t *IndexFileTrailer) WriteTo(w io.Writer) (n int64, err error) {
		return n, err
	}

	// Write series sketch info.
	if err := writeUint64To(w, uint64(t.SeriesSketch.Offset), &n); err != nil {
		return n, err
	} else if err := writeUint64To(w, uint64(t.SeriesSketch.Size), &n); err != nil {
		return n, err
	}

	// Write series tombstone sketch info.
	if err := writeUint64To(w, uint64(t.TombstoneSeriesSketch.Offset), &n); err != nil {
		return n, err
	} else if err := writeUint64To(w, uint64(t.TombstoneSeriesSketch.Size), &n); err != nil {
		return n, err
	// Write legacy sketch info.
	for i := 0; i < 4; i++ {
		if err := writeUint64To(w, 0, &n); err != nil {
			return n, err
		}
	}

	// Write index file encoding version.

@@ -8,7 +8,6 @@ import (
	"time"

	"github.com/influxdata/platform/pkg/bytesutil"
	"github.com/influxdata/platform/pkg/estimator/hll"
	"github.com/influxdata/platform/tsdb"
)

@@ -205,21 +204,6 @@ func (p IndexFiles) CompactTo(w io.Writer, sfile *tsdb.SeriesFile, m, k uint64,
		return n, err
	}

	// Generate sketches from series sets.
	sketch := hll.NewDefaultPlus()
	seriesIDSet.ForEach(func(id tsdb.SeriesID) {
		if key := sfile.SeriesKey(id); key != nil {
			sketch.Add(key)
		}
	})

	tSketch := hll.NewDefaultPlus()
	tombstoneSeriesIDSet.ForEach(func(id tsdb.SeriesID) {
		if key := sfile.SeriesKey(id); key != nil {
			tSketch.Add(key)
		}
	})

	// Write series set.
	t.SeriesIDSet.Offset = n
	nn, err := seriesIDSet.WriteTo(bw)

@@ -236,26 +220,6 @@ func (p IndexFiles) CompactTo(w io.Writer, sfile *tsdb.SeriesFile, m, k uint64,
	}
	t.TombstoneSeriesIDSet.Size = n - t.TombstoneSeriesIDSet.Offset

	// Write series sketches. TODO(edd): Implement WriterTo on HLL++.
	t.SeriesSketch.Offset = n
	data, err := sketch.MarshalBinary()
	if err != nil {
		return n, err
	} else if _, err := bw.Write(data); err != nil {
		return n, err
	}
	t.SeriesSketch.Size = int64(len(data))
	n += t.SeriesSketch.Size

	t.TombstoneSeriesSketch.Offset = n
	if data, err = tSketch.MarshalBinary(); err != nil {
		return n, err
	} else if _, err := bw.Write(data); err != nil {
		return n, err
	}
	t.TombstoneSeriesSketch.Size = int64(len(data))
	n += t.TombstoneSeriesSketch.Size

	// Write trailer.
	nn, err = t.WriteTo(bw)
	n += nn

@@ -16,8 +16,6 @@ import (

	"github.com/influxdata/influxdb/pkg/bloom"
	"github.com/influxdata/platform/models"
	"github.com/influxdata/platform/pkg/estimator"
	"github.com/influxdata/platform/pkg/estimator/hll"
	"github.com/influxdata/platform/pkg/mmap"
	"github.com/influxdata/platform/tsdb"
)

@@ -876,32 +874,6 @@ func (f *LogFile) CompactTo(w io.Writer, m, k uint64, cancel <-chan struct{}) (n
	}
	t.TombstoneSeriesIDSet.Size = n - t.TombstoneSeriesIDSet.Offset

	// Build series sketches.
	sSketch, sTSketch, err := f.seriesSketches()
	if err != nil {
		return n, err
	}

	// Write series sketches.
	t.SeriesSketch.Offset = n
	data, err := sSketch.MarshalBinary()
	if err != nil {
		return n, err
	} else if _, err := bw.Write(data); err != nil {
		return n, err
	}
	t.SeriesSketch.Size = int64(len(data))
	n += t.SeriesSketch.Size

	t.TombstoneSeriesSketch.Offset = n
	if data, err = sTSketch.MarshalBinary(); err != nil {
		return n, err
	} else if _, err := bw.Write(data); err != nil {
		return n, err
	}
	t.TombstoneSeriesSketch.Size = int64(len(data))
	n += t.TombstoneSeriesSketch.Size

	// Write trailer.
	nn, err = t.WriteTo(bw)
	n += nn

@@ -1034,47 +1006,6 @@ type logFileMeasurementCompactInfo struct {
	size int64
}

// MeasurementsSketches returns sketches for existing and tombstoned measurement names.
func (f *LogFile) MeasurementsSketches() (sketch, tSketch estimator.Sketch, err error) {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.measurementsSketches()
}

func (f *LogFile) measurementsSketches() (sketch, tSketch estimator.Sketch, err error) {
	sketch, tSketch = hll.NewDefaultPlus(), hll.NewDefaultPlus()
	for _, mm := range f.mms {
		if mm.deleted {
			tSketch.Add(mm.name)
		} else {
			sketch.Add(mm.name)
		}
	}
	return sketch, tSketch, nil
}

// SeriesSketches returns sketches for existing and tombstoned series.
func (f *LogFile) SeriesSketches() (sketch, tSketch estimator.Sketch, err error) {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.seriesSketches()
}

func (f *LogFile) seriesSketches() (sketch, tSketch estimator.Sketch, err error) {
	sketch = hll.NewDefaultPlus()
	f.seriesIDSet.ForEach(func(id tsdb.SeriesID) {
		name, keys := f.sfile.Series(id)
		sketch.Add(models.MakeKey(name, keys))
	})

	tSketch = hll.NewDefaultPlus()
	f.tombstoneSeriesIDSet.ForEach(func(id tsdb.SeriesID) {
		name, keys := f.sfile.Series(id)
		sketch.Add(models.MakeKey(name, keys))
	})
	return sketch, tSketch, nil
}

// MeasurementCardinalityStats returns cardinality stats for this log file.
func (f *LogFile) MeasurementCardinalityStats() MeasurementCardinalityStats {
	f.mu.RLock()

@@ -8,8 +8,6 @@ import (
	"sort"
	"unsafe"

	"github.com/influxdata/platform/pkg/estimator"
	"github.com/influxdata/platform/pkg/estimator/hll"
	"github.com/influxdata/platform/pkg/rhh"
	"github.com/influxdata/platform/tsdb"
)

@@ -33,8 +31,7 @@ const (
		2 + // version
		8 + 8 + // data offset/size
		8 + 8 + // hash index offset/size
		8 + 8 + // measurement sketch offset/size
		8 + 8 // tombstone measurement sketch offset/size
		8 + 8 + 8 + 8 // legacy sketch info

	// Measurement key block fields.
	MeasurementNSize = 8

@@ -54,9 +51,6 @@ type MeasurementBlock struct {
	data []byte
	hashData []byte

	// Measurement sketch and tombstone sketch for cardinality estimation.
	sketchData, tSketchData []byte

	version int // block version
}

@@ -131,10 +125,6 @@ func (blk *MeasurementBlock) UnmarshalBinary(data []byte) error {
	blk.hashData = data[t.HashIndex.Offset:]
	blk.hashData = blk.hashData[:t.HashIndex.Size]

	// Initialise sketch data.
	blk.sketchData = data[t.Sketch.Offset:][:t.Sketch.Size]
	blk.tSketchData = data[t.TSketch.Offset:][:t.TSketch.Size]

	return nil
}

@@ -156,20 +146,6 @@ func (blk *MeasurementBlock) SeriesIDIterator(name []byte) tsdb.SeriesIDIterator
	return &rawSeriesIDIterator{n: e.series.n, data: e.series.data}
}

// Sketches returns existence and tombstone measurement sketches.
func (blk *MeasurementBlock) Sketches() (sketch, tSketch estimator.Sketch, err error) {
	sketch = hll.NewDefaultPlus()
	if err := sketch.UnmarshalBinary(blk.sketchData); err != nil {
		return nil, nil, err
	}

	tSketch = hll.NewDefaultPlus()
	if err := tSketch.UnmarshalBinary(blk.tSketchData); err != nil {
		return nil, nil, err
	}
	return sketch, tSketch, nil
}

// blockMeasurementIterator iterates over a list measurements in a block.
type blockMeasurementIterator struct {
	elem MeasurementBlockElem

@@ -249,18 +225,6 @@ type MeasurementBlockTrailer struct {
		Offset int64
		Size int64
	}

	// Offset and size of cardinality sketch for measurements.
	Sketch struct {
		Offset int64
		Size int64
	}

	// Offset and size of cardinality sketch for tombstoned measurements.
	TSketch struct {
		Offset int64
		Size int64
	}
}

// ReadMeasurementBlockTrailer returns the block trailer from data.

@@ -284,13 +248,8 @@ func ReadMeasurementBlockTrailer(data []byte) (MeasurementBlockTrailer, error) {
	t.HashIndex.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
	t.HashIndex.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]

	// Read measurement sketch info.
	t.Sketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
	t.Sketch.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]

	// Read tombstone measurement sketch info.
	t.TSketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
	t.TSketch.Size = int64(binary.BigEndian.Uint64(buf[0:8]))
	// Skip over old sketch info
	buf = buf[4*8:]

	return t, nil
}

@@ -311,18 +270,11 @@ func (t *MeasurementBlockTrailer) WriteTo(w io.Writer) (n int64, err error) {
		return n, err
	}

	// Write measurement sketch info.
	if err := writeUint64To(w, uint64(t.Sketch.Offset), &n); err != nil {
		return n, err
	} else if err := writeUint64To(w, uint64(t.Sketch.Size), &n); err != nil {
		return n, err
	}

	// Write tombstone measurement sketch info.
	if err := writeUint64To(w, uint64(t.TSketch.Offset), &n); err != nil {
		return n, err
	} else if err := writeUint64To(w, uint64(t.TSketch.Size), &n); err != nil {
		return n, err
	// Write legacy sketch info.
	for i := 0; i < 4; i++ {
		if err := writeUint64To(w, 0, &n); err != nil {
			return n, err
		}
	}

	// Write measurement block version.

@@ -480,17 +432,12 @@ func (e *MeasurementBlockElem) UnmarshalBinary(data []byte) error {
type MeasurementBlockWriter struct {
	buf bytes.Buffer
	mms map[string]measurement

	// Measurement sketch and tombstoned measurement sketch.
	sketch, tSketch estimator.Sketch
}

// NewMeasurementBlockWriter returns a new MeasurementBlockWriter.
func NewMeasurementBlockWriter() *MeasurementBlockWriter {
	return &MeasurementBlockWriter{
		mms: make(map[string]measurement),
		sketch: hll.NewDefaultPlus(),
		tSketch: hll.NewDefaultPlus(),
		mms: make(map[string]measurement),
	}
}

@@ -509,25 +456,12 @@ func (mw *MeasurementBlockWriter) Add(name []byte, deleted bool, offset, size in
	}

	mw.mms[string(name)] = mm

	if deleted {
		mw.tSketch.Add(name)
	} else {
		mw.sketch.Add(name)
	}
}

// WriteTo encodes the measurements to w.
func (mw *MeasurementBlockWriter) WriteTo(w io.Writer) (n int64, err error) {
	var t MeasurementBlockTrailer

	// The sketches must be set before calling WriteTo.
	if mw.sketch == nil {
		return 0, errors.New("measurement sketch not set")
	} else if mw.tSketch == nil {
		return 0, errors.New("measurement tombstone sketch not set")
	}

	// Sort names.
	names := make([]string, 0, len(mw.mms))
	for name := range mw.mms {

@@ -589,19 +523,6 @@ func (mw *MeasurementBlockWriter) WriteTo(w io.Writer) (n int64, err error) {
	}
	t.HashIndex.Size = n - t.HashIndex.Offset

	// Write the sketches out.
	t.Sketch.Offset = n
	if err := writeSketchTo(w, mw.sketch, &n); err != nil {
		return n, err
	}
	t.Sketch.Size = n - t.Sketch.Offset

	t.TSketch.Offset = n
	if err := writeSketchTo(w, mw.tSketch, &n); err != nil {
		return n, err
	}
	t.TSketch.Size = n - t.TSketch.Offset

	// Write trailer.
	nn, err := t.WriteTo(w)
	n += nn

@@ -656,19 +577,6 @@ func (mw *MeasurementBlockWriter) writeMeasurementTo(w io.Writer, name []byte, m
	return err
}

// writeSketchTo writes an estimator.Sketch into w, updating the number of bytes
// written via n.
func writeSketchTo(w io.Writer, s estimator.Sketch, n *int64) error {
	data, err := s.MarshalBinary()
	if err != nil {
		return err
	}

	nn, err := w.Write(data)
	*n += int64(nn)
	return err
}

type measurement struct {
	deleted bool
	tagBlock struct {

@@ -18,18 +18,16 @@ func TestReadMeasurementBlockTrailer(t *testing.T) {
		blockversion = uint16(1)
		blockOffset, blockSize = uint64(1), uint64(2500)
		hashIdxOffset, hashIdxSize = uint64(2501), uint64(1000)
		sketchOffset, sketchSize = uint64(3501), uint64(250)
		tsketchOffset, tsketchSize = uint64(3751), uint64(250)
	)

	binary.BigEndian.PutUint64(data[0:], blockOffset)
	binary.BigEndian.PutUint64(data[8:], blockSize)
	binary.BigEndian.PutUint64(data[16:], hashIdxOffset)
	binary.BigEndian.PutUint64(data[24:], hashIdxSize)
	binary.BigEndian.PutUint64(data[32:], sketchOffset)
	binary.BigEndian.PutUint64(data[40:], sketchSize)
	binary.BigEndian.PutUint64(data[48:], tsketchOffset)
	binary.BigEndian.PutUint64(data[56:], tsketchSize)
	binary.BigEndian.PutUint64(data[32:], 0)
	binary.BigEndian.PutUint64(data[40:], 0)
	binary.BigEndian.PutUint64(data[48:], 0)
	binary.BigEndian.PutUint64(data[56:], 0)
	binary.BigEndian.PutUint16(data[64:], blockversion)

	trailer, err := tsi1.ReadMeasurementBlockTrailer(data)

@@ -43,11 +41,7 @@ func TestReadMeasurementBlockTrailer(t *testing.T) {
		trailer.Data.Offset == int64(blockOffset) &&
		trailer.Data.Size == int64(blockSize) &&
		trailer.HashIndex.Offset == int64(hashIdxOffset) &&
		trailer.HashIndex.Size == int64(hashIdxSize) &&
		trailer.Sketch.Offset == int64(sketchOffset) &&
		trailer.Sketch.Size == int64(sketchSize) &&
		trailer.TSketch.Offset == int64(tsketchOffset) &&
		trailer.TSketch.Size == int64(tsketchSize)
		trailer.HashIndex.Size == int64(hashIdxSize)

	if !ok {
		t.Fatalf("got %v\nwhich does not match expected", trailer)

@@ -65,14 +59,6 @@ func TestMeasurementBlockTrailer_WriteTo(t *testing.T) {
			Offset int64
			Size int64
		}{Offset: 3, Size: 4},
		Sketch: struct {
			Offset int64
			Size int64
		}{Offset: 5, Size: 6},
		TSketch: struct {
			Offset int64
			Size int64
		}{Offset: 7, Size: 8},
	}

	var buf bytes.Buffer

@@ -91,10 +77,10 @@ func TestMeasurementBlockTrailer_WriteTo(t *testing.T) {
		"0000000000000002" + // data size
		"0000000000000003" + // hash index offset
		"0000000000000004" + // hash index size
		"0000000000000005" + // sketch offset
		"0000000000000006" + // sketch size
		"0000000000000007" + // tsketch offset
		"0000000000000008" + // tsketch size
		"0000000000000000" + // legacy sketch offset
		"0000000000000000" + // legacy sketch size
		"0000000000000000" + // legacy tsketch offset
		"0000000000000000" + // legacy tsketch size
		"0001" // version

	if got, exp := fmt.Sprintf("%x", buf.String()), exp; got != exp {

@@ -18,7 +18,6 @@ import (
	"github.com/influxdata/influxql"
	"github.com/influxdata/platform/logger"
	"github.com/influxdata/platform/pkg/bytesutil"
	"github.com/influxdata/platform/pkg/estimator"
	"github.com/influxdata/platform/tsdb"
	"go.uber.org/zap"
)

@@ -697,28 +696,6 @@ func (p *Partition) DropSeries(seriesID tsdb.SeriesID) error {
	return p.CheckLogFile()
}

// MeasurementsSketches returns the two sketches for the partition by merging all
// instances of the type sketch types in all the index files.
func (p *Partition) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
	fs, err := p.RetainFileSet()
	if err != nil {
		return nil, nil, err
	}
	defer fs.Release()
	return fs.MeasurementsSketches()
}

// SeriesSketches returns the two sketches for the partition by merging all
// instances of the type sketch types in all the index files.
func (p *Partition) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
	fs, err := p.RetainFileSet()
	if err != nil {
		return nil, nil, err
	}
	defer fs.Release()
	return fs.SeriesSketches()
}

// HasTagKey returns true if tag key exists.
func (p *Partition) HasTagKey(name, key []byte) (bool, error) {
	fs, err := p.RetainFileSet()

@@ -22,7 +22,6 @@ import (
	"github.com/influxdata/platform/logger"
	"github.com/influxdata/platform/models"
	"github.com/influxdata/platform/pkg/bytesutil"
	"github.com/influxdata/platform/pkg/estimator"
	"github.com/influxdata/platform/pkg/limiter"
	"github.com/influxdata/platform/tsdb"
	"github.com/influxdata/platform/tsdb/tsi1"

@@ -464,20 +463,6 @@ func (e *Engine) SeriesN() int64 {
	return e.index.SeriesN()
}

// MeasurementsSketches returns sketches that describe the cardinality of the
// measurements in this shard and measurements that were in this shard, but have
// been tombstoned.
func (e *Engine) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
	return e.index.MeasurementsSketches()
}

// SeriesSketches returns sketches that describe the cardinality of the
// series in this shard and series that were in this shard, but have
// been tombstoned.
func (e *Engine) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
	return e.index.SeriesSketches()
}

// LastModified returns the time when this shard was last modified.
func (e *Engine) LastModified() time.Time {
	fsTime := e.FileStore.LastModified()