From 544329380f166d2b3b2665a71d8783c65ad3edeb Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Thu, 1 Feb 2018 16:20:52 +0000 Subject: [PATCH] Add empty series sketches back to tsi1 index This commit adds initial empty sketches back to the tsi1 index, as well as ensuring that ephemeral sketches in the index `LogFile` are updated accordingly. The commit also adds a test that verifies that the merged sketches at the store level produce the correct results under writes, deletions and re-opening of the store. This commit does not provide working sketches for post-compaction on the tsi1 index. --- pkg/estimator/hll/hll.go | 4 + tsdb/engine.go | 1 + tsdb/engine/tsm1/engine.go | 10 +++ tsdb/index.go | 1 + tsdb/index/tsi1/file_set.go | 14 ++++ tsdb/index/tsi1/index.go | 25 ++++++- tsdb/index/tsi1/index_file.go | 16 +++- tsdb/index/tsi1/log_file.go | 24 ++++-- tsdb/index/tsi1/partition.go | 13 +++- tsdb/shard.go | 9 +++ tsdb/store.go | 65 +++++++++++++---- tsdb/store_test.go | 134 ++++++++++++++++++++++++++++++++++ 12 files changed, 290 insertions(+), 26 deletions(-) diff --git a/pkg/estimator/hll/hll.go b/pkg/estimator/hll/hll.go index 35c8ec5d46..8aa823c595 100644 --- a/pkg/estimator/hll/hll.go +++ b/pkg/estimator/hll/hll.go @@ -163,6 +163,10 @@ func (h *Plus) Add(v []byte) { // Count returns a cardinality estimate. func (h *Plus) Count() uint64 { + if h == nil { + return 0 // Nothing to do. + } + if h.sparse { h.mergeSparse() return uint64(h.linearCount(h.mp, h.mp-uint32(h.sparseList.count))) diff --git a/tsdb/engine.go b/tsdb/engine.go index 9fcbffb968..3c8d05d38e 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -57,6 +57,7 @@ type Engine interface { DeleteSeriesRange(itr SeriesIterator, min, max int64) error MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) + SeriesSketches() (estimator.Sketch, estimator.Sketch, error) SeriesN() int64 MeasurementExists(name []byte) (bool, error) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 45f99c0a1e..bfd4db3b03 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -532,10 +532,20 @@ func (e *Engine) SeriesN() int64 { return e.index.SeriesN() } +// MeasurementsSketches returns sketches that describe the cardinality of the +// measurements in this shard and measurements that were in this shard, but have +// been tombstoned. func (e *Engine) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { return e.index.MeasurementsSketches() } +// SeriesSketches returns sketches that describe the cardinality of the +// series in this shard and series that were in this shard, but have +// been tombstoned. +func (e *Engine) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { + return e.index.SeriesSketches() +} + // LastModified returns the time when this shard was last modified. func (e *Engine) LastModified() time.Time { walTime := e.WAL.LastWriteTime() diff --git a/tsdb/index.go b/tsdb/index.go index b2b9e18008..d6c0c46387 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -36,6 +36,7 @@ type Index interface { MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) SeriesN() int64 + SeriesSketches() (estimator.Sketch, estimator.Sketch, error) HasTagKey(name, key []byte) (bool, error) HasTagValue(name, key, value []byte) (bool, error) diff --git a/tsdb/index/tsi1/file_set.go b/tsdb/index/tsi1/file_set.go index 4797b95fa8..3470987ddb 100644 --- a/tsdb/index/tsi1/file_set.go +++ b/tsdb/index/tsi1/file_set.go @@ -408,6 +408,19 @@ func (fs *FileSet) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, e return sketch, tsketch, nil } +// SeriesSketches returns the merged measurement sketches for the FileSet. +func (fs *FileSet) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { + sketch, tsketch := hll.NewDefaultPlus(), hll.NewDefaultPlus() + + // Iterate over all the files and merge the sketches into the result. + for _, f := range fs.files { + if err := f.MergeSeriesSketches(sketch, tsketch); err != nil { + return nil, nil, err + } + } + return sketch, tsketch, nil +} + // File represents a log or index file. type File interface { Close() error @@ -433,6 +446,7 @@ type File interface { // Sketches for cardinality estimation MergeMeasurementsSketches(s, t estimator.Sketch) error + MergeSeriesSketches(s, t estimator.Sketch) error // Bitmap series existance. SeriesIDSet() (*tsdb.SeriesIDSet, error) diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 58205b8b8d..a57f796515 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -594,7 +594,7 @@ func (i *Index) DropMeasurementIfSeriesNotExist(name []byte) error { } // MeasurementsSketches returns the two sketches for the index by merging all -// instances of the type sketch types in all the index files. +// instances of the type sketch types in all the partitions. func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { s, ts := hll.NewDefaultPlus(), hll.NewDefaultPlus() for _, p := range i.partitions { @@ -614,8 +614,27 @@ func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, erro return s, ts, nil } -// SeriesN returns the number of unique non-tombstoned series in this index. -// +// SeriesSketches returns the two sketches for the index by merging all +// instances of the type sketch types in all the partitions. +func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { + s, ts := hll.NewDefaultPlus(), hll.NewDefaultPlus() + for _, p := range i.partitions { + // Get partition's measurement sketches and merge. + ps, pts, err := p.SeriesSketches() + if err != nil { + return nil, nil, err + } + + if err := s.Merge(ps); err != nil { + return nil, nil, err + } else if err := ts.Merge(pts); err != nil { + return nil, nil, err + } + } + + return s, ts, nil +} + // Since indexes are not shared across shards, the count returned by SeriesN // cannot be combined with other shard's results. If you need to count series // across indexes then use either the database-wide series file, or merge the diff --git a/tsdb/index/tsi1/index_file.go b/tsdb/index/tsi1/index_file.go index 6b215efd77..e2e9c02f8e 100644 --- a/tsdb/index/tsi1/index_file.go +++ b/tsdb/index/tsi1/index_file.go @@ -52,6 +52,9 @@ type IndexFile struct { seriesIDSetData []byte tombstoneSeriesIDSetData []byte + // Series sketches + sketch, tSketch estimator.Sketch + // Sortable identifier & filepath to the log file. level int id int @@ -340,8 +343,8 @@ func (f *IndexFile) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterat return f.mblk.SeriesIDIterator(name) } -// MergeMeasurementsSketches merges the index file's series sketches into the provided -// sketches. +// MergeMeasurementsSketches merges the index file's measurements sketches into +// the provided sketches. func (f *IndexFile) MergeMeasurementsSketches(s, t estimator.Sketch) error { if err := s.Merge(f.mblk.sketch); err != nil { return err @@ -349,6 +352,15 @@ func (f *IndexFile) MergeMeasurementsSketches(s, t estimator.Sketch) error { return t.Merge(f.mblk.tSketch) } +// MergeSeriesSketches merges the index file's series sketches into the provided +// sketches. +func (f *IndexFile) MergeSeriesSketches(s, t estimator.Sketch) error { + if err := s.Merge(f.sketch); err != nil { + return err + } + return t.Merge(f.tSketch) +} + // ReadIndexFileTrailer returns the index file trailer from data. func ReadIndexFileTrailer(data []byte) (IndexFileTrailer, error) { var t IndexFileTrailer diff --git a/tsdb/index/tsi1/log_file.go b/tsdb/index/tsi1/log_file.go index 245acccb3e..dc127e7d2b 100644 --- a/tsdb/index/tsi1/log_file.go +++ b/tsdb/index/tsi1/log_file.go @@ -162,7 +162,6 @@ func (f *LogFile) Close() error { } f.mms = make(logMeasurements) - return nil } @@ -658,15 +657,13 @@ func (f *LogFile) execSeriesEntry(e *LogEntry) { mm.tagSet[string(k)] = ts } - // TODO(edd) increment series count.... - f.sSketch.Add(seriesKey) // Add series to sketch. - f.mSketch.Add(name) // Add measurement to sketch as this may be the fist series for the measurement. - // Add/remove from appropriate series id sets. if !deleted { + f.sSketch.Add(seriesKey) // Add series to sketch - key in series file format. f.seriesIDSet.Add(e.SeriesID) f.tombstoneSeriesIDSet.Remove(e.SeriesID) } else { + f.sTSketch.Add(seriesKey) // Add series to tombstone sketch - key in series file format. f.seriesIDSet.Remove(e.SeriesID) f.tombstoneSeriesIDSet.Add(e.SeriesID) } @@ -732,6 +729,9 @@ func (f *LogFile) createMeasurementIfNotExists(name []byte) *logMeasurement { series: make(map[uint64]struct{}), } f.mms[string(name)] = mm + + // Add measurement to sketch. + f.mSketch.Add(name) } return mm } @@ -968,6 +968,20 @@ func (f *LogFile) MergeMeasurementsSketches(sketch, tsketch estimator.Sketch) er return tsketch.Merge(f.mTSketch) } +// MergeSeriesSketches merges the series sketches belonging to this +// LogFile into the provided sketches. +// +// MergeSeriesSketches is safe for concurrent use by multiple goroutines. +func (f *LogFile) MergeSeriesSketches(sketch, tsketch estimator.Sketch) error { + f.mu.RLock() + defer f.mu.RUnlock() + + if err := sketch.Merge(f.sSketch); err != nil { + return err + } + return tsketch.Merge(f.sTSketch) +} + // LogEntry represents a single log entry in the write-ahead log. type LogEntry struct { Flag byte // flag diff --git a/tsdb/index/tsi1/partition.go b/tsdb/index/tsi1/partition.go index 28a79e6e94..d0cb63df98 100644 --- a/tsdb/index/tsi1/partition.go +++ b/tsdb/index/tsi1/partition.go @@ -622,7 +622,7 @@ func (i *Partition) DropSeries(seriesID uint64) error { return i.CheckLogFile() } -// MeasurementsSketches returns the two sketches for the index by merging all +// MeasurementsSketches returns the two sketches for the partition by merging all // instances of the type sketch types in all the index files. func (i *Partition) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { fs, err := i.RetainFileSet() @@ -633,6 +633,17 @@ func (i *Partition) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, return fs.MeasurementsSketches() } +// SeriesSketches returns the two sketches for the partition by merging all +// instances of the type sketch types in all the index files. +func (i *Partition) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { + fs, err := i.RetainFileSet() + if err != nil { + return nil, nil, err + } + defer fs.Release() + return fs.SeriesSketches() +} + // HasTagKey returns true if tag key exists. func (i *Partition) HasTagKey(name, key []byte) (bool, error) { fs, err := i.RetainFileSet() diff --git a/tsdb/shard.go b/tsdb/shard.go index a55d053b20..954cb19cd0 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -705,6 +705,15 @@ func (s *Shard) SeriesN() int64 { return engine.SeriesN() } +// SeriesSketches returns the measurement sketches for the shard. +func (s *Shard) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { + engine, err := s.engine() + if err != nil { + return nil, nil, err + } + return engine.SeriesSketches() +} + // MeasurementsSketches returns the measurement sketches for the shard. func (s *Shard) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { engine, err := s.engine() diff --git a/tsdb/store.go b/tsdb/store.go index 88194d3835..e8ab5db650 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -849,7 +849,9 @@ func (s *Store) DiskSize() (int64, error) { return size, nil } -func (s *Store) estimateCardinality(dbName string, getSketches func(*Shard) (estimator.Sketch, estimator.Sketch, error)) (int64, error) { +// sketchesForDatabase returns merged sketches for the provided database, by +// walking each shard in the database and merging the sketches found there. +func (s *Store) sketchesForDatabase(dbName string, getSketches func(*Shard) (estimator.Sketch, estimator.Sketch, error)) (estimator.Sketch, estimator.Sketch, error) { var ( ss estimator.Sketch // Sketch estimating number of items. ts estimator.Sketch // Sketch estimating number of tombstoned items. @@ -863,27 +865,26 @@ func (s *Store) estimateCardinality(dbName string, getSketches func(*Shard) (est for _, shard := range shards { s, t, err := getSketches(shard) if err != nil { - return 0, err + return nil, nil, err } if ss == nil { ss, ts = s, t } else if err = ss.Merge(s); err != nil { - return 0, err + return nil, nil, err } else if err = ts.Merge(t); err != nil { - return 0, err + return nil, nil, err } } - - if ss != nil { - return int64(ss.Count() - ts.Count()), nil - } - return 0, nil + return ss, ts, nil } -// SeriesCardinality returns the series cardinality for the provided database. +// SeriesCardinality returns the exact series cardinality for the provided +// database. +// +// Cardinality is calculated exactly by unioning all shards' bitsets of series +// IDs. The result of this method cannot be combined with any other results. // -// Cardinality is calculated exactly by unioning all shards' bitsets of series IDs. func (s *Store) SeriesCardinality(database string) (int64, error) { s.mu.RLock() shards := s.filterShards(byDatabase(database)) @@ -911,12 +912,46 @@ func (s *Store) SeriesCardinality(database string) (int64, error) { return int64(ss.Cardinality()), nil } -// MeasurementsCardinality returns the measurement cardinality for the provided -// database. +// SeriesSketches returns the sketches associated with the series data in all +// the shards in the provided database. // -// Cardinality is calculated using a sketch-based estimation. +// The returned sketches can be combined with other sketches to provide an +// estimation across distributed databases. +func (s *Store) SeriesSketches(database string) (estimator.Sketch, estimator.Sketch, error) { + return s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) { + if sh == nil { + return nil, nil, errors.New("shard nil, can't get cardinality") + } + return sh.SeriesSketches() + }) +} + +// MeasurementsCardinality returns an estimation of the measurement cardinality +// for the provided database. +// +// Cardinality is calculated using a sketch-based estimation. The result of this +// method cannot be combined with any other results. func (s *Store) MeasurementsCardinality(database string) (int64, error) { - return s.estimateCardinality(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) { + ss, ts, err := s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) { + if sh == nil { + return nil, nil, errors.New("shard nil, can't get cardinality") + } + return sh.MeasurementsSketches() + }) + + if err != nil { + return 0, err + } + return int64(ss.Count() - ts.Count()), nil +} + +// MeasurementsSketches returns the sketches associated with the measurement +// data in all the shards in the provided database. +// +// The returned sketches can be combined with other sketches to provide an +// estimation across distributed databases. +func (s *Store) MeasurementsSketches(database string) (estimator.Sketch, estimator.Sketch, error) { + return s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) { if sh == nil { return nil, nil, errors.New("shard nil, can't get cardinality") } diff --git a/tsdb/store_test.go b/tsdb/store_test.go index 48c96d9ea5..6255e00326 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -16,6 +16,8 @@ import ( "testing" "time" + "github.com/influxdata/influxdb/tsdb/index/inmem" + "github.com/davecgh/go-spew/spew" "github.com/influxdata/influxdb/internal" "github.com/influxdata/influxdb/logger" @@ -941,6 +943,138 @@ func TestStore_Cardinality_Compactions(t *testing.T) { } } +func TestStore_Sketches(t *testing.T) { + t.Parallel() + + checkCardinalities := func(store *tsdb.Store, series, tseries, measurements, tmeasurements int) error { + // Get sketches and check cardinality... + sketch, tsketch, err := store.SeriesSketches("db") + if err != nil { + return err + } + + // delta calculates a rough 10% delta. If i is small then a minimum value + // of 2 is used. + delta := func(i int) int { + v := i / 10 + if v == 0 { + v = 2 + } + return v + } + + // series cardinality should be well within 10%. + if got, exp := int(sketch.Count()), series; got-exp < -delta(series) || got-exp > delta(series) { + return fmt.Errorf("got series cardinality %d, expected ~%d", got, exp) + } + + // check series tombstones + if got, exp := int(tsketch.Count()), tseries; got-exp < -delta(tseries) || got-exp > delta(tseries) { + return fmt.Errorf("got series tombstone cardinality %d, expected ~%d", got, exp) + } + + // Check measurement cardinality. + if sketch, tsketch, err = store.MeasurementsSketches("db"); err != nil { + return err + } + + if got, exp := int(sketch.Count()), measurements; got-exp < -delta(measurements) || got-exp > delta(measurements) { + return fmt.Errorf("got measurement cardinality %d, expected ~%d", got, exp) + } + + if got, exp := int(tsketch.Count()), tmeasurements; got-exp < -delta(tmeasurements) || got-exp > delta(tmeasurements) { + return fmt.Errorf("got measurement tombstone cardinality %d, expected ~%d", got, exp) + } + return nil + } + + test := func(index string) error { + store := MustOpenStore(index) + defer store.Close() + + // Generate point data to write to the shards. + series := genTestSeries(10, 2, 4) // 160 series + + points := make([]models.Point, 0, len(series)) + for _, s := range series { + points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now())) + } + + // Create requested number of shards in the store & write points across + // shards such that we never write the same series to multiple shards. + for shardID := 0; shardID < 4; shardID++ { + if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil { + return fmt.Errorf("create shard: %s", err) + } + + if err := store.BatchWrite(shardID, points[shardID*40:(shardID+1)*40]); err != nil { + return fmt.Errorf("batch write: %s", err) + } + } + + // Check cardinalities + if err := checkCardinalities(store.Store, 160, 0, 10, 0); err != nil { + return fmt.Errorf("[initial] %v", err) + } + + // Reopen the store. + if err := store.Reopen(); err != nil { + return err + } + + // Check cardinalities + if err := checkCardinalities(store.Store, 160, 0, 10, 0); err != nil { + return fmt.Errorf("[initial|re-open] %v", err) + } + + // Delete half the the measurements data + mnames, err := store.MeasurementNames(nil, "db", nil) + if err != nil { + return err + } + + for _, name := range mnames[:len(mnames)/2] { + if err := store.DeleteSeries("db", []influxql.Source{&influxql.Measurement{Name: string(name)}}, nil); err != nil { + return err + } + } + + // Check cardinalities - tombstones should be in + if err := checkCardinalities(store.Store, 160, 80, 10, 5); err != nil { + return fmt.Errorf("[initial|re-open|delete] %v", err) + } + + // Reopen the store. + if err := store.Reopen(); err != nil { + return err + } + + // Check cardinalities. In this case, the indexes behave differently. + // + // - The inmem index will report that there are 80 series and no tombstones. + // - The tsi1 index will report that there are 160 series and 80 tombstones. + // + // The result is the same, but the implementation differs. + expS, expTS, expM, expTM := 160, 80, 10, 5 + if index == inmem.IndexName { + expS, expTS, expM, expTM = 80, 0, 5, 0 + } + + if err := checkCardinalities(store.Store, expS, expTS, expM, expTM); err != nil { + return fmt.Errorf("[initial|re-open|delete|re-open] %v", err) + } + return nil + } + + for _, index := range tsdb.RegisteredIndexes() { + t.Run(index, func(t *testing.T) { + if err := test(index); err != nil { + t.Fatal(err) + } + }) + } +} + func TestStore_TagValues(t *testing.T) { t.Parallel()