Add series sketches

pull/7913/head
Edd Robinson 2016-11-28 13:21:39 +00:00 committed by Ben Johnson
parent 695adafc00
commit 190c78c644
No known key found for this signature in database
GPG Key ID: 81741CD251883081
5 changed files with 129 additions and 20 deletions

View File

@ -57,6 +57,14 @@ Series:
Series Data Offset <uint64>
Series Data Pos <uint64>
Sketch Offset <uint64>
Sketch Size <uint64>
Tomb Sketch Offset <uint64>
Tomb Sketch Size <uint64>

View File

@ -618,30 +618,37 @@ func (i *Index) SeriesN() (n uint64, err error) {
return i.logFiles[0].SeriesN(), nil
}
func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
//FIXME(edd)
return nil, nil, fmt.Errorf("SeriesSketches not implemented")
func (i *Index) sketches(nextSketches func(*IndexFile) (estimator.Sketch, estimator.Sketch)) (estimator.Sketch, estimator.Sketch, error) {
sketch, tsketch := hll.NewDefaultPlus(), hll.NewDefaultPlus()
// Iterate over all the index files and merge all the sketches.
for _, f := range i.indexFiles {
s, t := nextSketches(f)
if err := sketch.Merge(s); err != nil {
return nil, nil, err
}
if err := tsketch.Merge(t); err != nil {
return nil, nil, err
}
}
return sketch, tsketch, nil
}
// SeriesSketches returns the two sketches for the index by merging all
// instances of the type sketch types in all the indexes files.
func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
return i.sketches(func(i *IndexFile) (estimator.Sketch, estimator.Sketch) {
return i.sblk.sketch, i.sblk.tsketch
})
}
// MeasurementsSketches returns the two sketches for the index by merging all
// instances of the type sketch types in all the indexes files.
func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
var (
sketch estimator.Sketch = hll.NewDefaultPlus()
tsketch estimator.Sketch = hll.NewDefaultPlus()
)
// Iterate over all the index files and merge all the sketches.
for _, f := range i.indexFiles {
if err := sketch.Merge(f.mblk.sketch); err != nil {
return nil, nil, err
}
if err := tsketch.Merge(f.mblk.tsketch); err != nil {
return nil, nil, err
}
}
return sketch, tsketch, nil
return i.sketches(func(i *IndexFile) (estimator.Sketch, estimator.Sketch) {
return i.mblk.sketch, i.mblk.tsketch
})
}
// Dereference is a nop.

View File

@ -131,13 +131,30 @@ func (p *IndexFiles) WriteTo(w io.Writer) (n int64, err error) {
func (p *IndexFiles) writeSeriesBlockTo(w io.Writer, info *indexCompactInfo, n *int64) error {
itr := p.SeriesIterator()
sw := NewSeriesBlockWriter()
// As the index files are merged together, it's possible that series were
// added, removed and then added again over time. Since sketches cannot have
// values removed from them, the series would be in both the resulting
// series and tombstoned series sketches. So that a series only appears in
// one of the sketches, we rebuild some fresh sketches during the
// compaction.
//
// We update these sketches below as we iterate through the series in these
// index files.
sw.sketch, sw.tsketch = hll.NewDefaultPlus(), hll.NewDefaultPlus()
// Write all series.
sw := NewSeriesBlockWriter()
for e := itr.Next(); e != nil; e = itr.Next() {
if err := sw.Add(e.Name(), e.Tags()); err != nil {
return err
}
if e.Deleted() {
sw.tsketch.Add(e.SeriesKey())
} else {
sw.sketch.Add(e.SeriesKey())
}
}
// Flush series list.

View File

@ -11,6 +11,8 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/pkg/estimator/hll"
"github.com/influxdata/influxdb/pkg/rhh"
)
@ -30,6 +32,8 @@ const (
8 + 8 + // series data offset/size
8 + 8 + // term index offset/size
8 + 8 + // series index offset/size
8 + 8 + // series sketch offset/size
8 + 8 + // tombstone series sketch offset/size
0
// Other field sizes
@ -58,6 +62,10 @@ type SeriesBlock struct {
seriesData []byte
seriesIndex []byte
seriesIndexN uint32
// Series block sketch and tombstone sketch for cardinality
// estimation.
sketch, tsketch estimator.Sketch
}
// Series returns a series element.
@ -318,6 +326,18 @@ func (blk *SeriesBlock) UnmarshalBinary(data []byte) error {
blk.seriesIndexN = binary.BigEndian.Uint32(blk.seriesIndex[:4])
blk.seriesIndex = blk.seriesIndex[4:]
// Initialise sketches. We're currently using HLL+.
var s, ts *hll.Plus
if err := s.UnmarshalBinary(data[t.Sketch.Offset:][:t.Sketch.Size]); err != nil {
return err
}
blk.sketch = s
if err := ts.UnmarshalBinary(data[t.TSketch.Offset:][:t.TSketch.Size]); err != nil {
return err
}
blk.tsketch = ts
return nil
}
@ -399,6 +419,10 @@ type SeriesBlockWriter struct {
// Term list is available after writer has been written.
termList *TermList
// Series sketch and tombstoned series sketch. These must be
// set before calling WriteTo.
sketch, tsketch estimator.Sketch
}
// NewSeriesBlockWriter returns a new instance of SeriesBlockWriter.
@ -446,6 +470,13 @@ func (sw *SeriesBlockWriter) append(name []byte, tags models.Tags, deleted bool)
func (sw *SeriesBlockWriter) WriteTo(w io.Writer) (n int64, err error) {
var t SeriesBlockTrailer
// The sketches must be set before calling WriteTo.
if sw.sketch == nil {
return 0, errors.New("series sketch not set")
} else if sw.tsketch == nil {
return 0, errors.New("series tombstone sketch not set")
}
terms := NewTermList(sw.terms)
// Write term dictionary.
@ -487,6 +518,19 @@ func (sw *SeriesBlockWriter) WriteTo(w io.Writer) (n int64, err error) {
return n, err
}
// Write the sketches out.
t.Sketch.Offset = n
if err := writeSketchTo(w, sw.sketch, &n); err != nil {
return n, err
}
t.Sketch.Size = n - t.Sketch.Offset
t.TSketch.Offset = n
if err := writeSketchTo(w, sw.tsketch, &n); err != nil {
return n, err
}
t.TSketch.Size = n - t.TSketch.Offset
// Save term list for future encoding.
sw.termList = terms
@ -633,7 +677,6 @@ func (sw *SeriesBlockWriter) writeSeriesIndexTo(w io.Writer, terms *TermList, n
return err
}
}
return nil
}
@ -683,6 +726,14 @@ func ReadSeriesBlockTrailer(data []byte) SeriesBlockTrailer {
t.Series.Index.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
t.Series.Index.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
// Read series sketch info.
t.Sketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
t.Sketch.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
// Read tombstone series sketch info.
t.TSketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
t.TSketch.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
return t
}
@ -709,6 +760,18 @@ type SeriesBlockTrailer struct {
Size int64
}
}
// Offset and size of cardinality sketch for measurements.
Sketch struct {
Offset int64
Size int64
}
// Offset and size of cardinality sketch for tombstoned measurements.
TSketch struct {
Offset int64
Size int64
}
}
func (t SeriesBlockTrailer) WriteTo(w io.Writer) (n int64, err error) {
@ -736,6 +799,19 @@ func (t SeriesBlockTrailer) WriteTo(w io.Writer) (n int64, err error) {
return n, err
}
// Write measurement sketch info.s
if err := writeUint64To(w, uint64(t.Sketch.Offset), &n); err != nil {
return n, err
} else if err := writeUint64To(w, uint64(t.Sketch.Size), &n); err != nil {
return n, err
}
// Write tombstone measurement sketch info.
if err := writeUint64To(w, uint64(t.TSketch.Offset), &n); err != nil {
return n, err
} else if err := writeUint64To(w, uint64(t.TSketch.Size), &n); err != nil {
return n, err
}
return n, nil
}

View File

@ -378,6 +378,7 @@ type seriesElem struct {
deleted bool
}
func (e *seriesElem) SeriesKey() []byte { return models.MakeKey(e.name, e.tags) }
func (e *seriesElem) Name() []byte { return e.name }
func (e *seriesElem) Tags() models.Tags { return e.tags }
func (e *seriesElem) Deleted() bool { return e.deleted }