From 695adafc0036fc575efe1fc806f9b3f61c5f3ac7 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Thu, 17 Nov 2016 16:33:35 +0000 Subject: [PATCH] Add measurement sketches --- pkg/estimator/hll/hll.go | 3 + tsdb/index/inmem/inmem.go | 8 +- tsdb/index/tsi1/doc.go | 20 +++- tsdb/index/tsi1/index.go | 20 +++- tsdb/index/tsi1/index_files.go | 34 +++++++ tsdb/index/tsi1/index_test.go | 2 +- tsdb/index/tsi1/measurement_block.go | 111 +++++++++++++++++++--- tsdb/index/tsi1/measurement_block_test.go | 93 ++++++++++++++++++ tsdb/index/tsi1/series_block.go | 4 - 9 files changed, 270 insertions(+), 25 deletions(-) diff --git a/pkg/estimator/hll/hll.go b/pkg/estimator/hll/hll.go index b94cd359c0..f4926bc0f4 100644 --- a/pkg/estimator/hll/hll.go +++ b/pkg/estimator/hll/hll.go @@ -28,6 +28,9 @@ import ( // Current version of HLL implementation. const version uint8 = 1 +// DefaultPrecision is the default precision. +const DefaultPrecision = 16 + // Plus implements the Hyperloglog++ algorithm, described in the following // paper: http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/40671.pdf // diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go index 465c361829..a225a99ff3 100644 --- a/tsdb/index/inmem/inmem.go +++ b/tsdb/index/inmem/inmem.go @@ -65,10 +65,10 @@ func NewIndex(id uint64, path string, opt tsdb.EngineOptions) *Index { series: make(map[string]*tsdb.Series), } - index.seriesSketch = hll.MustNewPlus(16) - index.seriesTSSketch = hll.MustNewPlus(16) - index.measurementsSketch = hll.MustNewPlus(16) - index.measurementsTSSketch = hll.MustNewPlus(16) + index.seriesSketch = hll.NewDefaultPlus() + index.seriesTSSketch = hll.NewDefaultPlus() + index.measurementsSketch = hll.NewDefaultPlus() + index.measurementsTSSketch = hll.NewDefaultPlus() return index } diff --git a/tsdb/index/tsi1/doc.go b/tsdb/index/tsi1/doc.go index c74fb7505f..094bd7dfac 100644 --- a/tsdb/index/tsi1/doc.go +++ b/tsdb/index/tsi1/doc.go @@ -177,16 +177,34 @@ 
Measurements ║ ┃ └───────────────────────────────┘ ┃ ║ ║ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ║ ║ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ║ + ║ ┃ Sketch ┃ ║ + ║ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ║ + ║ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ║ + ║ ┃ Tombstone Sketch ┃ ║ + ║ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ║ + ║ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ║ ║ ┃ Trailer ┃ ║ ║ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ║ ╚═══════════════════════════════════════╝ ╔════════════Trailer══════════════╗ ║ ┌─────────────────────────────┐ ║ - ║ │ Hash Index Offset │ ║ + ║ │ Block Offset │ ║ ║ ├─────────────────────────────┤ ║ ║ │ Block Size │ ║ ║ ├─────────────────────────────┤ ║ + ║ │ Hash Index Offset │ ║ + ║ ├─────────────────────────────┤ ║ + ║ │ Hash Index Size │ ║ + ║ ├─────────────────────────────┤ ║ + ║ │ Sketch Offset │ ║ + ║ ├─────────────────────────────┤ ║ + ║ │ Sketch Size │ ║ + ║ ├─────────────────────────────┤ ║ + ║ │ Tomb Sketch Offset │ ║ + ║ ├─────────────────────────────┤ ║ + ║ │ Tomb Sketch Size │ ║ + ║ ├─────────────────────────────┤ ║ ║ │ Block Version │ ║ ║ └─────────────────────────────┘ ║ ╚═════════════════════════════════╝ diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index a67a5235e1..7d028b208a 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -15,6 +15,8 @@ import ( "sync" "time" + "github.com/influxdata/influxdb/pkg/estimator/hll" + "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/bytesutil" @@ -622,10 +624,24 @@ func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { } +// MeasurementsSketches returns the two sketches for the index by merging all +// instances of the type sketch types in all the indexes files. 
func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { - //FIXME(edd) - return nil, nil, fmt.Errorf("MeasurementSketches not implemented") + var ( + sketch estimator.Sketch = hll.NewDefaultPlus() + tsketch estimator.Sketch = hll.NewDefaultPlus() + ) + // Iterate over all the index files and merge all the sketches. + for _, f := range i.indexFiles { + if err := sketch.Merge(f.mblk.sketch); err != nil { + return nil, nil, err + } + if err := tsketch.Merge(f.mblk.tsketch); err != nil { + return nil, nil, err + } + } + return sketch, tsketch, nil } // Dereference is a nop. diff --git a/tsdb/index/tsi1/index_files.go b/tsdb/index/tsi1/index_files.go index 8e66bfc70d..415eb0c08d 100644 --- a/tsdb/index/tsi1/index_files.go +++ b/tsdb/index/tsi1/index_files.go @@ -4,6 +4,8 @@ import ( "fmt" "io" "sort" + + "github.com/influxdata/influxdb/pkg/estimator/hll" ) // IndexFiles represents a layered set of index files. @@ -216,6 +218,22 @@ func (p *IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactI func (p *IndexFiles) writeMeasurementBlockTo(w io.Writer, info *indexCompactInfo, n *int64) error { mw := NewMeasurementBlockWriter() + // As the index files are merged together, it's possible that measurements + // were added, removed and then added again over time. Since sketches cannot + // have values removed from them, the measurements would be in both the + // resulting measurements and tombstoned measurements sketches. So that a + // measurement only appears in one of the sketches, we rebuild some fresh + // sketches during the compaction. + mw.sketch, mw.tsketch = hll.NewDefaultPlus(), hll.NewDefaultPlus() + itr := p.MeasurementIterator() + for e := itr.Next(); e != nil; e = itr.Next() { + if e.Deleted() { + mw.tsketch.Add(e.Name()) + } else { + mw.sketch.Add(e.Name()) + } + } + // Add measurement data. for _, name := range info.names { // Look-up series ids. 
@@ -235,6 +253,22 @@ func (p *IndexFiles) writeMeasurementBlockTo(w io.Writer, info *indexCompactInfo mw.Add(name, pos.offset, pos.size, seriesIDs) } + // Generate merged sketches to write out. + sketch, tsketch := hll.NewDefaultPlus(), hll.NewDefaultPlus() + + // merge all the sketches in the index files together. + for _, idx := range *p { + if err := sketch.Merge(idx.mblk.sketch); err != nil { + return err + } + if err := tsketch.Merge(idx.mblk.tsketch); err != nil { + return err + } + } + + // Set the merged sketches on the measurement block writer. + mw.sketch, mw.tsketch = sketch, tsketch + // Write data to writer. nn, err := mw.WriteTo(w) *n += nn diff --git a/tsdb/index/tsi1/index_test.go b/tsdb/index/tsi1/index_test.go index 11f1a048eb..49405b08c1 100644 --- a/tsdb/index/tsi1/index_test.go +++ b/tsdb/index/tsi1/index_test.go @@ -282,7 +282,7 @@ func TestIndex_TagKeyIterator(t *testing.T) { } else if e := itr.Next(); string(e.Key()) != `type` { t.Fatalf("unexpected key(%s): %s", state, e.Key()) } else if e := itr.Next(); e != nil { - t.Fatalf("expected nil key(%s): %s/%s", state, e.Key()) + t.Fatalf("expected nil key(%s): %s", state, e.Key()) } }); err != nil { t.Fatal(err) diff --git a/tsdb/index/tsi1/measurement_block.go b/tsdb/index/tsi1/measurement_block.go index bddc1fc0a1..4b40aa8675 100644 --- a/tsdb/index/tsi1/measurement_block.go +++ b/tsdb/index/tsi1/measurement_block.go @@ -7,6 +7,8 @@ import ( "io" "sort" + "github.com/influxdata/influxdb/pkg/estimator" + "github.com/influxdata/influxdb/pkg/estimator/hll" "github.com/influxdata/influxdb/pkg/rhh" ) @@ -27,7 +29,9 @@ const ( MeasurementTrailerSize = 0 + 2 + // version 8 + 8 + // data offset/size - 8 + 8 // hash index offset/size + 8 + 8 + // hash index offset/size + 8 + 8 + // measurement sketch offset/size + 8 + 8 // tombstone measurement sketch offset/size // Measurement key block fields. MeasurementNSize = 4 @@ -36,8 +40,8 @@ const ( // Measurement errors. 
var ( - ErrUnsupportedMeasurementBlockVersion = errors.New("unsupported meaurement block version") - ErrMeasurementBlockSizeMismatch = errors.New("meaurement block size mismatch") + ErrUnsupportedMeasurementBlockVersion = errors.New("unsupported measurement block version") + ErrMeasurementBlockSizeMismatch = errors.New("measurement block size mismatch") ) // MeasurementBlock represents a collection of all measurements in an index. @@ -45,6 +49,10 @@ type MeasurementBlock struct { data []byte hashData []byte + // Measurement block sketch and tombstone sketch for cardinality + // estimation. + sketch, tsketch estimator.Sketch + version int // block version } @@ -109,6 +117,18 @@ func (blk *MeasurementBlock) UnmarshalBinary(data []byte) error { blk.hashData = data[t.HashIndex.Offset:] blk.hashData = blk.hashData[:t.HashIndex.Size] + // Initialise sketches. We're currently using HLL+. + var s, ts *hll.Plus + if err := s.UnmarshalBinary(data[t.Sketch.Offset:][:t.Sketch.Size]); err != nil { + return err + } + blk.sketch = s + + if err := ts.UnmarshalBinary(data[t.TSketch.Offset:][:t.TSketch.Size]); err != nil { + return err + } + blk.tsketch = ts + return nil } @@ -180,20 +200,32 @@ type MeasurementBlockTrailer struct { Offset int64 Size int64 } + + // Offset and size of cardinality sketch for measurements. + Sketch struct { + Offset int64 + Size int64 + } + + // Offset and size of cardinality sketch for tombstoned measurements. + TSketch struct { + Offset int64 + Size int64 + } } // ReadMeasurementBlockTrailer returns the block trailer from data. func ReadMeasurementBlockTrailer(data []byte) (MeasurementBlockTrailer, error) { var t MeasurementBlockTrailer - // Read version. + // Read version (which is located in the last two bytes of the trailer). t.Version = int(binary.BigEndian.Uint16(data[len(data)-2:])) if t.Version != MeasurementBlockVersion { return t, ErrUnsupportedIndexFileVersion } // Slice trailer data. 
- buf := data[len(data)-IndexFileTrailerSize:] + buf := data[len(data)-MeasurementTrailerSize:] // Read data section info. t.Data.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] @@ -203,6 +235,14 @@ func ReadMeasurementBlockTrailer(data []byte) (MeasurementBlockTrailer, error) { t.HashIndex.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] t.HashIndex.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] + // Read measurement sketch info. + t.Sketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] + t.Sketch.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] + + // Read tombstone measurement sketch info. + t.TSketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] + t.TSketch.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] + return t, nil } @@ -222,7 +262,21 @@ func (t *MeasurementBlockTrailer) WriteTo(w io.Writer) (n int64, err error) { return n, err } - // Write index file encoding version. + // Write measurement sketch info. + if err := writeUint64To(w, uint64(t.Sketch.Offset), &n); err != nil { + return n, err + } else if err := writeUint64To(w, uint64(t.Sketch.Size), &n); err != nil { + return n, err + } + + // Write tombstone measurement sketch info. + if err := writeUint64To(w, uint64(t.TSketch.Offset), &n); err != nil { + return n, err + } else if err := writeUint64To(w, uint64(t.TSketch.Size), &n); err != nil { + return n, err + } + + // Write measurement block version. if err := writeUint16To(w, MeasurementBlockVersion, &n); err != nil { return n, err } @@ -306,6 +360,10 @@ func (e *MeasurementBlockElem) UnmarshalBinary(data []byte) error { // MeasurementBlockWriter writes a measurement block. type MeasurementBlockWriter struct { mms map[string]measurement + + // Measurement sketch and tombstoned measurement sketch. These must be + // set before calling WriteTo. + sketch, tsketch estimator.Sketch } // NewMeasurementBlockWriter returns a new MeasurementBlockWriter. 
@@ -324,17 +382,17 @@ func (mw *MeasurementBlockWriter) Add(name []byte, offset, size int64, seriesIDs mw.mms[string(name)] = mm } -// Delete marks a measurement as tombstoned. -func (mw *MeasurementBlockWriter) Delete(name []byte) { - mm := mw.mms[string(name)] - mm.deleted = true - mw.mms[string(name)] = mm -} - // WriteTo encodes the measurements to w. func (mw *MeasurementBlockWriter) WriteTo(w io.Writer) (n int64, err error) { var t MeasurementBlockTrailer + // The sketches must be set before calling WriteTo. + if mw.sketch == nil { + return 0, errors.New("measurement sketch not set") + } else if mw.tsketch == nil { + return 0, errors.New("measurement tombstone sketch not set") + } + // Sort names. names := make([]string, 0, len(mw.mms)) for name := range mw.mms { @@ -396,6 +454,19 @@ func (mw *MeasurementBlockWriter) WriteTo(w io.Writer) (n int64, err error) { } t.HashIndex.Size = n - t.HashIndex.Offset + // Write the sketches out. + t.Sketch.Offset = n + if err := writeSketchTo(w, mw.sketch, &n); err != nil { + return n, err + } + t.Sketch.Size = n - t.Sketch.Offset + + t.TSketch.Offset = n + if err := writeSketchTo(w, mw.tsketch, &n); err != nil { + return n, err + } + t.TSketch.Size = n - t.TSketch.Offset + // Write trailer. nn, err := t.WriteTo(w) n += nn @@ -439,6 +510,20 @@ func (mw *MeasurementBlockWriter) writeMeasurementTo(w io.Writer, name []byte, m return nil } +// writeSketchTo writes an estimator.Sketch into w, updating the number of bytes +// written via n. +func writeSketchTo(w io.Writer, s estimator.Sketch, n *int64) error { + // TODO(edd): implement io.WriterTo on sketches. 
+ data, err := s.MarshalBinary() + if err != nil { + return err + } + + nn, err := w.Write(data) + *n += int64(nn) + return err +} + type measurement struct { deleted bool tagBlock struct { diff --git a/tsdb/index/tsi1/measurement_block_test.go b/tsdb/index/tsi1/measurement_block_test.go index 68edd1384a..7a0a5e6c2b 100644 --- a/tsdb/index/tsi1/measurement_block_test.go +++ b/tsdb/index/tsi1/measurement_block_test.go @@ -2,12 +2,105 @@ package tsi1_test import ( "bytes" + "encoding/binary" + "fmt" "reflect" "testing" "github.com/influxdata/influxdb/tsdb/index/tsi1" ) +func TestReadMeasurementBlockTrailer(t *testing.T) { + // Build a trailer + var ( + data = make([]byte, tsi1.MeasurementTrailerSize) + blockversion = uint16(1) + blockOffset, blockSize = uint64(1), uint64(2500) + hashIdxOffset, hashIdxSize = uint64(2501), uint64(1000) + sketchOffset, sketchSize = uint64(3501), uint64(250) + tsketchOffset, tsketchSize = uint64(3751), uint64(250) + ) + + binary.BigEndian.PutUint64(data[0:], blockOffset) + binary.BigEndian.PutUint64(data[8:], blockSize) + binary.BigEndian.PutUint64(data[16:], hashIdxOffset) + binary.BigEndian.PutUint64(data[24:], hashIdxSize) + binary.BigEndian.PutUint64(data[32:], sketchOffset) + binary.BigEndian.PutUint64(data[40:], sketchSize) + binary.BigEndian.PutUint64(data[48:], tsketchOffset) + binary.BigEndian.PutUint64(data[56:], tsketchSize) + binary.BigEndian.PutUint16(data[64:], blockversion) + + trailer, err := tsi1.ReadMeasurementBlockTrailer(data) + if err != nil { + t.Logf("trailer is: %#v\n", trailer) + t.Fatal(err) + } + + ok := true && + trailer.Version == int(blockversion) && + trailer.Data.Offset == int64(blockOffset) && + trailer.Data.Size == int64(blockSize) && + trailer.HashIndex.Offset == int64(hashIdxOffset) && + trailer.HashIndex.Size == int64(hashIdxSize) && + trailer.Sketch.Offset == int64(sketchOffset) && + trailer.Sketch.Size == int64(sketchSize) && + trailer.TSketch.Offset == int64(tsketchOffset) && + trailer.TSketch.Size 
== int64(tsketchSize) + + if !ok { + t.Fatalf("got %v\nwhich does not match expected", trailer) + } +} + +func TestMeasurementBlockTrailer_WriteTo(t *testing.T) { + var trailer = tsi1.MeasurementBlockTrailer{ + Version: 1, + Data: struct { + Offset int64 + Size int64 + }{Offset: 1, Size: 2}, + HashIndex: struct { + Offset int64 + Size int64 + }{Offset: 3, Size: 4}, + Sketch: struct { + Offset int64 + Size int64 + }{Offset: 5, Size: 6}, + TSketch: struct { + Offset int64 + Size int64 + }{Offset: 7, Size: 8}, + } + + var buf bytes.Buffer + n, err := trailer.WriteTo(&buf) + if got, exp := n, int64(tsi1.MeasurementTrailerSize); got != exp { + t.Fatalf("got %v, exp %v", got, exp) + } + + if got := err; got != nil { + t.Fatalf("got %v, exp %v", got, nil) + } + + // Verify trailer written correctly. + exp := "" + + "0000000000000001" + // data offset + "0000000000000002" + // data size + "0000000000000003" + // hash index offset + "0000000000000004" + // hash index size + "0000000000000005" + // sketch offset + "0000000000000006" + // sketch size + "0000000000000007" + // tsketch offset + "0000000000000008" + // tsketch size + "0001" // version + + if got, exp := fmt.Sprintf("%x", buf.String()), exp; got != exp { + t.Fatalf("got %v, exp %v", got, exp) + } +} + // Ensure measurement blocks can be written and opened. func TestMeasurementBlockWriter(t *testing.T) { // Write 3 measurements to writer. diff --git a/tsdb/index/tsi1/series_block.go b/tsdb/index/tsi1/series_block.go index 9192abbad7..107362a31f 100644 --- a/tsdb/index/tsi1/series_block.go +++ b/tsdb/index/tsi1/series_block.go @@ -101,8 +101,6 @@ func (blk *SeriesBlock) Series(name []byte, tags models.Tags) SeriesElem { return nil } } - - return nil } // SeriesOffset returns offset of the encoded series key. @@ -269,8 +267,6 @@ func (blk *SeriesBlock) EncodeTerm(v []byte) uint32 { return 0 } } - - return 0 } // TermCount returns the number of terms within the dictionary.