diff --git a/tsdb/index/tsi1/doc.go b/tsdb/index/tsi1/doc.go index 094bd7dfac..cb35edbfa8 100644 --- a/tsdb/index/tsi1/doc.go +++ b/tsdb/index/tsi1/doc.go @@ -57,6 +57,14 @@ Series: ║ │ Series Data Offset │ ║ ║ ├─────────────────────────────┤ ║ ║ │ Series Data Pos │ ║ + ║ ├─────────────────────────────┤ ║ + ║ │ Sketch Offset │ ║ + ║ ├─────────────────────────────┤ ║ + ║ │ Sketch Size │ ║ + ║ ├─────────────────────────────┤ ║ + ║ │ Tomb Sketch Offset │ ║ + ║ ├─────────────────────────────┤ ║ + ║ │ Tomb Sketch Size │ ║ ║ └─────────────────────────────┘ ║ ╚═════════════════════════════════╝ diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 7d028b208a..b74241d058 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -618,30 +618,37 @@ func (i *Index) SeriesN() (n uint64, err error) { return i.logFiles[0].SeriesN(), nil } -func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { - //FIXME(edd) - return nil, nil, fmt.Errorf("SeriesSketches not implemented") +func (i *Index) sketches(nextSketches func(*IndexFile) (estimator.Sketch, estimator.Sketch)) (estimator.Sketch, estimator.Sketch, error) { + sketch, tsketch := hll.NewDefaultPlus(), hll.NewDefaultPlus() + // Iterate over all the index files and merge all the sketches. + for _, f := range i.indexFiles { + s, t := nextSketches(f) + if err := sketch.Merge(s); err != nil { + return nil, nil, err + } + + if err := tsketch.Merge(t); err != nil { + return nil, nil, err + } + } + return sketch, tsketch, nil +} + +// SeriesSketches returns the series sketch and the tombstoned series sketch +// for the index, merged from the series block sketches of all the index files. 
+func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { + return i.sketches(func(f *IndexFile) (estimator.Sketch, estimator.Sketch) { + return f.sblk.sketch, f.sblk.tsketch + }) } // MeasurementsSketches returns the two sketches for the index by merging all // instances of the type sketch types in all the indexes files. func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { - var ( - sketch estimator.Sketch = hll.NewDefaultPlus() - tsketch estimator.Sketch = hll.NewDefaultPlus() - ) - - // Iterate over all the index files and merge all the sketches. - for _, f := range i.indexFiles { - if err := sketch.Merge(f.mblk.sketch); err != nil { - return nil, nil, err - } - if err := tsketch.Merge(f.mblk.tsketch); err != nil { - return nil, nil, err - } - } - return sketch, tsketch, nil + return i.sketches(func(f *IndexFile) (estimator.Sketch, estimator.Sketch) { + return f.mblk.sketch, f.mblk.tsketch + }) } // Dereference is a nop. diff --git a/tsdb/index/tsi1/index_files.go b/tsdb/index/tsi1/index_files.go index 415eb0c08d..1b8076fcc2 100644 --- a/tsdb/index/tsi1/index_files.go +++ b/tsdb/index/tsi1/index_files.go @@ -131,13 +131,30 @@ func (p *IndexFiles) WriteTo(w io.Writer) (n int64, err error) { func (p *IndexFiles) writeSeriesBlockTo(w io.Writer, info *indexCompactInfo, n *int64) error { itr := p.SeriesIterator() + sw := NewSeriesBlockWriter() + + // As the index files are merged together, it's possible that series were + // added, removed and then added again over time. Since sketches cannot have + // values removed from them, the series would be in both the resulting + // series and tombstoned series sketches. So that a series only appears in + // one of the sketches, we rebuild some fresh sketches during the + // compaction. + // + // We update these sketches below as we iterate through the series in these + // index files. 
+ sw.sketch, sw.tsketch = hll.NewDefaultPlus(), hll.NewDefaultPlus() // Write all series. - sw := NewSeriesBlockWriter() for e := itr.Next(); e != nil; e = itr.Next() { if err := sw.Add(e.Name(), e.Tags()); err != nil { return err } + + if e.Deleted() { + sw.tsketch.Add(e.SeriesKey()) + } else { + sw.sketch.Add(e.SeriesKey()) + } } // Flush series list. diff --git a/tsdb/index/tsi1/series_block.go b/tsdb/index/tsi1/series_block.go index 107362a31f..2f3efb1964 100644 --- a/tsdb/index/tsi1/series_block.go +++ b/tsdb/index/tsi1/series_block.go @@ -11,6 +11,8 @@ import ( "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/estimator" + "github.com/influxdata/influxdb/pkg/estimator/hll" "github.com/influxdata/influxdb/pkg/rhh" ) @@ -30,6 +32,8 @@ const ( 8 + 8 + // series data offset/size 8 + 8 + // term index offset/size 8 + 8 + // series index offset/size + 8 + 8 + // series sketch offset/size + 8 + 8 + // tombstone series sketch offset/size 0 // Other field sizes @@ -58,6 +62,10 @@ type SeriesBlock struct { seriesData []byte seriesIndex []byte seriesIndexN uint32 + + // Series block sketch and tombstone sketch for cardinality + // estimation. + sketch, tsketch estimator.Sketch } // Series returns a series element. @@ -318,6 +326,18 @@ func (blk *SeriesBlock) UnmarshalBinary(data []byte) error { blk.seriesIndexN = binary.BigEndian.Uint32(blk.seriesIndex[:4]) blk.seriesIndex = blk.seriesIndex[4:] + // Allocate the sketches before decoding into them (a nil *hll.Plus has nowhere to decode to). We're currently using HLL+. + s, ts := hll.NewDefaultPlus(), hll.NewDefaultPlus() + if err := s.UnmarshalBinary(data[t.Sketch.Offset:][:t.Sketch.Size]); err != nil { + return err + } + blk.sketch = s + + if err := ts.UnmarshalBinary(data[t.TSketch.Offset:][:t.TSketch.Size]); err != nil { + return err + } + blk.tsketch = ts + return nil } @@ -399,6 +419,10 @@ type SeriesBlockWriter struct { // Term list is available after writer has been written. termList *TermList + + // Series sketch and tombstoned series sketch. 
These must be + // set before calling WriteTo. + sketch, tsketch estimator.Sketch } // NewSeriesBlockWriter returns a new instance of SeriesBlockWriter. @@ -446,6 +470,13 @@ func (sw *SeriesBlockWriter) append(name []byte, tags models.Tags, deleted bool) func (sw *SeriesBlockWriter) WriteTo(w io.Writer) (n int64, err error) { var t SeriesBlockTrailer + // The sketches must be set before calling WriteTo. + if sw.sketch == nil { + return 0, errors.New("series sketch not set") + } else if sw.tsketch == nil { + return 0, errors.New("series tombstone sketch not set") + } + terms := NewTermList(sw.terms) // Write term dictionary. @@ -487,6 +518,19 @@ func (sw *SeriesBlockWriter) WriteTo(w io.Writer) (n int64, err error) { return n, err } + // Write the sketches out. + t.Sketch.Offset = n + if err := writeSketchTo(w, sw.sketch, &n); err != nil { + return n, err + } + t.Sketch.Size = n - t.Sketch.Offset + + t.TSketch.Offset = n + if err := writeSketchTo(w, sw.tsketch, &n); err != nil { + return n, err + } + t.TSketch.Size = n - t.TSketch.Offset + // Save term list for future encoding. sw.termList = terms @@ -633,7 +677,6 @@ func (sw *SeriesBlockWriter) writeSeriesIndexTo(w io.Writer, terms *TermList, n return err } } - return nil } @@ -683,6 +726,14 @@ func ReadSeriesBlockTrailer(data []byte) SeriesBlockTrailer { t.Series.Index.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] t.Series.Index.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] + // Read series sketch info. + t.Sketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] + t.Sketch.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] + + // Read tombstone series sketch info. + t.TSketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] + t.TSketch.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] + return t } @@ -709,6 +760,18 @@ type SeriesBlockTrailer struct { Size int64 } } + + // Offset and size of cardinality sketch for series. 
+ Sketch struct { + Offset int64 + Size int64 + } + + // Offset and size of cardinality sketch for tombstoned series. + TSketch struct { + Offset int64 + Size int64 + } } func (t SeriesBlockTrailer) WriteTo(w io.Writer) (n int64, err error) { @@ -736,6 +799,19 @@ func (t SeriesBlockTrailer) WriteTo(w io.Writer) (n int64, err error) { return n, err } + // Write series sketch info. + if err := writeUint64To(w, uint64(t.Sketch.Offset), &n); err != nil { + return n, err + } else if err := writeUint64To(w, uint64(t.Sketch.Size), &n); err != nil { + return n, err + } + + // Write tombstone series sketch info. + if err := writeUint64To(w, uint64(t.TSketch.Offset), &n); err != nil { + return n, err + } else if err := writeUint64To(w, uint64(t.TSketch.Size), &n); err != nil { + return n, err + } return n, nil } diff --git a/tsdb/index/tsi1/tsi1.go b/tsdb/index/tsi1/tsi1.go index b0636836f7..f2e9a8aa66 100644 --- a/tsdb/index/tsi1/tsi1.go +++ b/tsdb/index/tsi1/tsi1.go @@ -378,6 +378,7 @@ type seriesElem struct { deleted bool } +func (e *seriesElem) SeriesKey() []byte { return models.MakeKey(e.name, e.tags) } func (e *seriesElem) Name() []byte { return e.name } func (e *seriesElem) Tags() models.Tags { return e.tags } func (e *seriesElem) Deleted() bool { return e.deleted }