Add empty series sketches back to tsi1 index

This commit adds initial empty sketches back to the tsi1 index, as well
as ensuring that ephemeral sketches in the index `LogFile` are updated
accordingly.

The commit also adds a test that verifies that the merged sketches at
the store level produce the correct results under writes, deletions and
re-opening of the store.

This commit does not provide working sketches for post-compaction on the
tsi1 index.
pull/9412/head
Edd Robinson 2018-02-01 16:20:52 +00:00 committed by Ben Johnson
parent c8f30da88a
commit 544329380f
No known key found for this signature in database
GPG Key ID: 81741CD251883081
12 changed files with 290 additions and 26 deletions

View File

@ -163,6 +163,10 @@ func (h *Plus) Add(v []byte) {
// Count returns a cardinality estimate.
func (h *Plus) Count() uint64 {
if h == nil {
return 0 // Nothing to do.
}
if h.sparse {
h.mergeSparse()
return uint64(h.linearCount(h.mp, h.mp-uint32(h.sparseList.count)))

View File

@ -57,6 +57,7 @@ type Engine interface {
DeleteSeriesRange(itr SeriesIterator, min, max int64) error
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesN() int64
MeasurementExists(name []byte) (bool, error)

View File

@ -532,10 +532,20 @@ func (e *Engine) SeriesN() int64 {
return e.index.SeriesN()
}
// MeasurementsSketches returns sketches that describe the cardinality of the
// measurements in this shard and measurements that were in this shard, but have
// been tombstoned.
func (e *Engine) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
return e.index.MeasurementsSketches()
}
// SeriesSketches returns sketches that describe the cardinality of the
// series in this shard and series that were in this shard, but have
// been tombstoned.
func (e *Engine) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
return e.index.SeriesSketches()
}
// LastModified returns the time when this shard was last modified.
func (e *Engine) LastModified() time.Time {
walTime := e.WAL.LastWriteTime()

View File

@ -36,6 +36,7 @@ type Index interface {
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesN() int64
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
HasTagKey(name, key []byte) (bool, error)
HasTagValue(name, key, value []byte) (bool, error)

View File

@ -408,6 +408,19 @@ func (fs *FileSet) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, e
return sketch, tsketch, nil
}
// SeriesSketches returns the merged measurement sketches for the FileSet.
func (fs *FileSet) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
sketch, tsketch := hll.NewDefaultPlus(), hll.NewDefaultPlus()
// Iterate over all the files and merge the sketches into the result.
for _, f := range fs.files {
if err := f.MergeSeriesSketches(sketch, tsketch); err != nil {
return nil, nil, err
}
}
return sketch, tsketch, nil
}
// File represents a log or index file.
type File interface {
Close() error
@ -433,6 +446,7 @@ type File interface {
// Sketches for cardinality estimation
MergeMeasurementsSketches(s, t estimator.Sketch) error
MergeSeriesSketches(s, t estimator.Sketch) error
// Bitmap series existance.
SeriesIDSet() (*tsdb.SeriesIDSet, error)

View File

@ -594,7 +594,7 @@ func (i *Index) DropMeasurementIfSeriesNotExist(name []byte) error {
}
// MeasurementsSketches returns the two sketches for the index by merging all
// instances of the type sketch types in all the index files.
// instances of the type sketch types in all the partitions.
func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
s, ts := hll.NewDefaultPlus(), hll.NewDefaultPlus()
for _, p := range i.partitions {
@ -614,8 +614,27 @@ func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, erro
return s, ts, nil
}
// SeriesN returns the number of unique non-tombstoned series in this index.
//
// SeriesSketches returns the two sketches for the index by merging all
// instances of the type sketch types in all the partitions.
func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
s, ts := hll.NewDefaultPlus(), hll.NewDefaultPlus()
for _, p := range i.partitions {
// Get partition's measurement sketches and merge.
ps, pts, err := p.SeriesSketches()
if err != nil {
return nil, nil, err
}
if err := s.Merge(ps); err != nil {
return nil, nil, err
} else if err := ts.Merge(pts); err != nil {
return nil, nil, err
}
}
return s, ts, nil
}
// Since indexes are not shared across shards, the count returned by SeriesN
// cannot be combined with other shard's results. If you need to count series
// across indexes then use either the database-wide series file, or merge the

View File

@ -52,6 +52,9 @@ type IndexFile struct {
seriesIDSetData []byte
tombstoneSeriesIDSetData []byte
// Series sketches
sketch, tSketch estimator.Sketch
// Sortable identifier & filepath to the log file.
level int
id int
@ -340,8 +343,8 @@ func (f *IndexFile) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterat
return f.mblk.SeriesIDIterator(name)
}
// MergeMeasurementsSketches merges the index file's series sketches into the provided
// sketches.
// MergeMeasurementsSketches merges the index file's measurements sketches into
// the provided sketches.
func (f *IndexFile) MergeMeasurementsSketches(s, t estimator.Sketch) error {
if err := s.Merge(f.mblk.sketch); err != nil {
return err
@ -349,6 +352,15 @@ func (f *IndexFile) MergeMeasurementsSketches(s, t estimator.Sketch) error {
return t.Merge(f.mblk.tSketch)
}
// MergeSeriesSketches merges the index file's series sketches into the provided
// sketches.
func (f *IndexFile) MergeSeriesSketches(s, t estimator.Sketch) error {
if err := s.Merge(f.sketch); err != nil {
return err
}
return t.Merge(f.tSketch)
}
// ReadIndexFileTrailer returns the index file trailer from data.
func ReadIndexFileTrailer(data []byte) (IndexFileTrailer, error) {
var t IndexFileTrailer

View File

@ -162,7 +162,6 @@ func (f *LogFile) Close() error {
}
f.mms = make(logMeasurements)
return nil
}
@ -658,15 +657,13 @@ func (f *LogFile) execSeriesEntry(e *LogEntry) {
mm.tagSet[string(k)] = ts
}
// TODO(edd) increment series count....
f.sSketch.Add(seriesKey) // Add series to sketch.
f.mSketch.Add(name) // Add measurement to sketch as this may be the fist series for the measurement.
// Add/remove from appropriate series id sets.
if !deleted {
f.sSketch.Add(seriesKey) // Add series to sketch - key in series file format.
f.seriesIDSet.Add(e.SeriesID)
f.tombstoneSeriesIDSet.Remove(e.SeriesID)
} else {
f.sTSketch.Add(seriesKey) // Add series to tombstone sketch - key in series file format.
f.seriesIDSet.Remove(e.SeriesID)
f.tombstoneSeriesIDSet.Add(e.SeriesID)
}
@ -732,6 +729,9 @@ func (f *LogFile) createMeasurementIfNotExists(name []byte) *logMeasurement {
series: make(map[uint64]struct{}),
}
f.mms[string(name)] = mm
// Add measurement to sketch.
f.mSketch.Add(name)
}
return mm
}
@ -968,6 +968,20 @@ func (f *LogFile) MergeMeasurementsSketches(sketch, tsketch estimator.Sketch) er
return tsketch.Merge(f.mTSketch)
}
// MergeSeriesSketches merges the series sketches belonging to this
// LogFile into the provided sketches.
//
// MergeSeriesSketches is safe for concurrent use by multiple goroutines.
func (f *LogFile) MergeSeriesSketches(sketch, tsketch estimator.Sketch) error {
f.mu.RLock()
defer f.mu.RUnlock()
if err := sketch.Merge(f.sSketch); err != nil {
return err
}
return tsketch.Merge(f.sTSketch)
}
// LogEntry represents a single log entry in the write-ahead log.
type LogEntry struct {
Flag byte // flag

View File

@ -622,7 +622,7 @@ func (i *Partition) DropSeries(seriesID uint64) error {
return i.CheckLogFile()
}
// MeasurementsSketches returns the two sketches for the index by merging all
// MeasurementsSketches returns the two sketches for the partition by merging all
// instances of the type sketch types in all the index files.
func (i *Partition) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
fs, err := i.RetainFileSet()
@ -633,6 +633,17 @@ func (i *Partition) MeasurementsSketches() (estimator.Sketch, estimator.Sketch,
return fs.MeasurementsSketches()
}
// SeriesSketches returns the two sketches for the partition by merging all
// instances of the type sketch types in all the index files.
func (i *Partition) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
fs, err := i.RetainFileSet()
if err != nil {
return nil, nil, err
}
defer fs.Release()
return fs.SeriesSketches()
}
// HasTagKey returns true if tag key exists.
func (i *Partition) HasTagKey(name, key []byte) (bool, error) {
fs, err := i.RetainFileSet()

View File

@ -705,6 +705,15 @@ func (s *Shard) SeriesN() int64 {
return engine.SeriesN()
}
// SeriesSketches returns the measurement sketches for the shard.
func (s *Shard) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
engine, err := s.engine()
if err != nil {
return nil, nil, err
}
return engine.SeriesSketches()
}
// MeasurementsSketches returns the measurement sketches for the shard.
func (s *Shard) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
engine, err := s.engine()

View File

@ -849,7 +849,9 @@ func (s *Store) DiskSize() (int64, error) {
return size, nil
}
func (s *Store) estimateCardinality(dbName string, getSketches func(*Shard) (estimator.Sketch, estimator.Sketch, error)) (int64, error) {
// sketchesForDatabase returns merged sketches for the provided database, by
// walking each shard in the database and merging the sketches found there.
func (s *Store) sketchesForDatabase(dbName string, getSketches func(*Shard) (estimator.Sketch, estimator.Sketch, error)) (estimator.Sketch, estimator.Sketch, error) {
var (
ss estimator.Sketch // Sketch estimating number of items.
ts estimator.Sketch // Sketch estimating number of tombstoned items.
@ -863,27 +865,26 @@ func (s *Store) estimateCardinality(dbName string, getSketches func(*Shard) (est
for _, shard := range shards {
s, t, err := getSketches(shard)
if err != nil {
return 0, err
return nil, nil, err
}
if ss == nil {
ss, ts = s, t
} else if err = ss.Merge(s); err != nil {
return 0, err
return nil, nil, err
} else if err = ts.Merge(t); err != nil {
return 0, err
return nil, nil, err
}
}
if ss != nil {
return int64(ss.Count() - ts.Count()), nil
}
return 0, nil
return ss, ts, nil
}
// SeriesCardinality returns the series cardinality for the provided database.
// SeriesCardinality returns the exact series cardinality for the provided
// database.
//
// Cardinality is calculated exactly by unioning all shards' bitsets of series
// IDs. The result of this method cannot be combined with any other results.
//
// Cardinality is calculated exactly by unioning all shards' bitsets of series IDs.
func (s *Store) SeriesCardinality(database string) (int64, error) {
s.mu.RLock()
shards := s.filterShards(byDatabase(database))
@ -911,12 +912,46 @@ func (s *Store) SeriesCardinality(database string) (int64, error) {
return int64(ss.Cardinality()), nil
}
// MeasurementsCardinality returns the measurement cardinality for the provided
// database.
// SeriesSketches returns the sketches associated with the series data in all
// the shards in the provided database.
//
// Cardinality is calculated using a sketch-based estimation.
// The returned sketches can be combined with other sketches to provide an
// estimation across distributed databases.
func (s *Store) SeriesSketches(database string) (estimator.Sketch, estimator.Sketch, error) {
return s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
if sh == nil {
return nil, nil, errors.New("shard nil, can't get cardinality")
}
return sh.SeriesSketches()
})
}
// MeasurementsCardinality returns an estimation of the measurement cardinality
// for the provided database.
//
// Cardinality is calculated using a sketch-based estimation. The result of this
// method cannot be combined with any other results.
func (s *Store) MeasurementsCardinality(database string) (int64, error) {
return s.estimateCardinality(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
ss, ts, err := s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
if sh == nil {
return nil, nil, errors.New("shard nil, can't get cardinality")
}
return sh.MeasurementsSketches()
})
if err != nil {
return 0, err
}
return int64(ss.Count() - ts.Count()), nil
}
// MeasurementsSketches returns the sketches associated with the measurement
// data in all the shards in the provided database.
//
// The returned sketches can be combined with other sketches to provide an
// estimation across distributed databases.
func (s *Store) MeasurementsSketches(database string) (estimator.Sketch, estimator.Sketch, error) {
return s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
if sh == nil {
return nil, nil, errors.New("shard nil, can't get cardinality")
}

View File

@ -16,6 +16,8 @@ import (
"testing"
"time"
"github.com/influxdata/influxdb/tsdb/index/inmem"
"github.com/davecgh/go-spew/spew"
"github.com/influxdata/influxdb/internal"
"github.com/influxdata/influxdb/logger"
@ -941,6 +943,138 @@ func TestStore_Cardinality_Compactions(t *testing.T) {
}
}
func TestStore_Sketches(t *testing.T) {
t.Parallel()
checkCardinalities := func(store *tsdb.Store, series, tseries, measurements, tmeasurements int) error {
// Get sketches and check cardinality...
sketch, tsketch, err := store.SeriesSketches("db")
if err != nil {
return err
}
// delta calculates a rough 10% delta. If i is small then a minimum value
// of 2 is used.
delta := func(i int) int {
v := i / 10
if v == 0 {
v = 2
}
return v
}
// series cardinality should be well within 10%.
if got, exp := int(sketch.Count()), series; got-exp < -delta(series) || got-exp > delta(series) {
return fmt.Errorf("got series cardinality %d, expected ~%d", got, exp)
}
// check series tombstones
if got, exp := int(tsketch.Count()), tseries; got-exp < -delta(tseries) || got-exp > delta(tseries) {
return fmt.Errorf("got series tombstone cardinality %d, expected ~%d", got, exp)
}
// Check measurement cardinality.
if sketch, tsketch, err = store.MeasurementsSketches("db"); err != nil {
return err
}
if got, exp := int(sketch.Count()), measurements; got-exp < -delta(measurements) || got-exp > delta(measurements) {
return fmt.Errorf("got measurement cardinality %d, expected ~%d", got, exp)
}
if got, exp := int(tsketch.Count()), tmeasurements; got-exp < -delta(tmeasurements) || got-exp > delta(tmeasurements) {
return fmt.Errorf("got measurement tombstone cardinality %d, expected ~%d", got, exp)
}
return nil
}
test := func(index string) error {
store := MustOpenStore(index)
defer store.Close()
// Generate point data to write to the shards.
series := genTestSeries(10, 2, 4) // 160 series
points := make([]models.Point, 0, len(series))
for _, s := range series {
points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now()))
}
// Create requested number of shards in the store & write points across
// shards such that we never write the same series to multiple shards.
for shardID := 0; shardID < 4; shardID++ {
if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil {
return fmt.Errorf("create shard: %s", err)
}
if err := store.BatchWrite(shardID, points[shardID*40:(shardID+1)*40]); err != nil {
return fmt.Errorf("batch write: %s", err)
}
}
// Check cardinalities
if err := checkCardinalities(store.Store, 160, 0, 10, 0); err != nil {
return fmt.Errorf("[initial] %v", err)
}
// Reopen the store.
if err := store.Reopen(); err != nil {
return err
}
// Check cardinalities
if err := checkCardinalities(store.Store, 160, 0, 10, 0); err != nil {
return fmt.Errorf("[initial|re-open] %v", err)
}
// Delete half the the measurements data
mnames, err := store.MeasurementNames(nil, "db", nil)
if err != nil {
return err
}
for _, name := range mnames[:len(mnames)/2] {
if err := store.DeleteSeries("db", []influxql.Source{&influxql.Measurement{Name: string(name)}}, nil); err != nil {
return err
}
}
// Check cardinalities - tombstones should be in
if err := checkCardinalities(store.Store, 160, 80, 10, 5); err != nil {
return fmt.Errorf("[initial|re-open|delete] %v", err)
}
// Reopen the store.
if err := store.Reopen(); err != nil {
return err
}
// Check cardinalities. In this case, the indexes behave differently.
//
// - The inmem index will report that there are 80 series and no tombstones.
// - The tsi1 index will report that there are 160 series and 80 tombstones.
//
// The result is the same, but the implementation differs.
expS, expTS, expM, expTM := 160, 80, 10, 5
if index == inmem.IndexName {
expS, expTS, expM, expTM = 80, 0, 5, 0
}
if err := checkCardinalities(store.Store, expS, expTS, expM, expTM); err != nil {
return fmt.Errorf("[initial|re-open|delete|re-open] %v", err)
}
return nil
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
if err := test(index); err != nil {
t.Fatal(err)
}
})
}
}
func TestStore_TagValues(t *testing.T) {
t.Parallel()