Add measurement sketches

pull/7913/head
Edd Robinson 2016-11-17 16:33:35 +00:00 committed by Ben Johnson
parent 1339c7b146
commit 695adafc00
No known key found for this signature in database
GPG Key ID: 81741CD251883081
9 changed files with 270 additions and 25 deletions

View File

@ -28,6 +28,9 @@ import (
// Current version of HLL implementation.
const version uint8 = 1
// DefaultPrecision is the default precision.
const DefaultPrecision = 16
// Plus implements the Hyperloglog++ algorithm, described in the following
// paper: http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/40671.pdf
//

View File

@ -65,10 +65,10 @@ func NewIndex(id uint64, path string, opt tsdb.EngineOptions) *Index {
series: make(map[string]*tsdb.Series),
}
index.seriesSketch = hll.MustNewPlus(16)
index.seriesTSSketch = hll.MustNewPlus(16)
index.measurementsSketch = hll.MustNewPlus(16)
index.measurementsTSSketch = hll.MustNewPlus(16)
index.seriesSketch = hll.NewDefaultPlus()
index.seriesTSSketch = hll.NewDefaultPlus()
index.measurementsSketch = hll.NewDefaultPlus()
index.measurementsTSSketch = hll.NewDefaultPlus()
return index
}

View File

@ -177,16 +177,34 @@ Measurements
Sketch
Tombstone Sketch
Trailer
Trailer
Hash Index Offset <uint64>
Block Offset <uint64>
Block Size <uint64>
Hash Index Offset <uint64>
Hash Index Size <uint64>
Sketch Offset <uint64>
Sketch Size <uint64>
Tomb Sketch Offset <uint64>
Tomb Sketch Size <uint64>
Block Version <uint16>

View File

@ -15,6 +15,8 @@ import (
"sync"
"time"
"github.com/influxdata/influxdb/pkg/estimator/hll"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/bytesutil"
@ -622,10 +624,24 @@ func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
}
// MeasurementsSketches returns the two sketches for the index by merging all
// instances of the type sketch types in all the indexes files.
func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
//FIXME(edd)
return nil, nil, fmt.Errorf("MeasurementSketches not implemented")
var (
sketch estimator.Sketch = hll.NewDefaultPlus()
tsketch estimator.Sketch = hll.NewDefaultPlus()
)
// Iterate over all the index files and merge all the sketches.
for _, f := range i.indexFiles {
if err := sketch.Merge(f.mblk.sketch); err != nil {
return nil, nil, err
}
if err := tsketch.Merge(f.mblk.tsketch); err != nil {
return nil, nil, err
}
}
return sketch, tsketch, nil
}
// Dereference is a nop.

View File

@ -4,6 +4,8 @@ import (
"fmt"
"io"
"sort"
"github.com/influxdata/influxdb/pkg/estimator/hll"
)
// IndexFiles represents a layered set of index files.
@ -216,6 +218,22 @@ func (p *IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactI
func (p *IndexFiles) writeMeasurementBlockTo(w io.Writer, info *indexCompactInfo, n *int64) error {
mw := NewMeasurementBlockWriter()
// As the index files are merged together, it's possible that measurements
// were added, removed and then added again over time. Since sketches cannot
// have values removed from them, the measurements would be in both the
// resulting measurements and tombstoned measurements sketches. So that a
// measurements only appears in one of the sketches, we rebuild some fresh
// sketches during the compaction.
mw.sketch, mw.tsketch = hll.NewDefaultPlus(), hll.NewDefaultPlus()
itr := p.MeasurementIterator()
for e := itr.Next(); e != nil; e = itr.Next() {
if e.Deleted() {
mw.tsketch.Add(e.Name())
} else {
mw.sketch.Add(e.Name())
}
}
// Add measurement data.
for _, name := range info.names {
// Look-up series ids.
@ -235,6 +253,22 @@ func (p *IndexFiles) writeMeasurementBlockTo(w io.Writer, info *indexCompactInfo
mw.Add(name, pos.offset, pos.size, seriesIDs)
}
// Generate merged sketches to write out.
sketch, tsketch := hll.NewDefaultPlus(), hll.NewDefaultPlus()
// merge all the sketches in the index files together.
for _, idx := range *p {
if err := sketch.Merge(idx.mblk.sketch); err != nil {
return err
}
if err := tsketch.Merge(idx.mblk.tsketch); err != nil {
return err
}
}
// Set the merged sketches on the measurement block writer.
mw.sketch, mw.tsketch = sketch, tsketch
// Write data to writer.
nn, err := mw.WriteTo(w)
*n += nn

View File

@ -282,7 +282,7 @@ func TestIndex_TagKeyIterator(t *testing.T) {
} else if e := itr.Next(); string(e.Key()) != `type` {
t.Fatalf("unexpected key(%s): %s", state, e.Key())
} else if e := itr.Next(); e != nil {
t.Fatalf("expected nil key(%s): %s/%s", state, e.Key())
t.Fatalf("expected nil key(%s): %s", state, e.Key())
}
}); err != nil {
t.Fatal(err)

View File

@ -7,6 +7,8 @@ import (
"io"
"sort"
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/pkg/estimator/hll"
"github.com/influxdata/influxdb/pkg/rhh"
)
@ -27,7 +29,9 @@ const (
MeasurementTrailerSize = 0 +
2 + // version
8 + 8 + // data offset/size
8 + 8 // hash index offset/size
8 + 8 + // hash index offset/size
8 + 8 + // measurement sketch offset/size
8 + 8 // tombstone measurement sketch offset/size
// Measurement key block fields.
MeasurementNSize = 4
@ -36,8 +40,8 @@ const (
// Measurement errors.
var (
ErrUnsupportedMeasurementBlockVersion = errors.New("unsupported meaurement block version")
ErrMeasurementBlockSizeMismatch = errors.New("meaurement block size mismatch")
ErrUnsupportedMeasurementBlockVersion = errors.New("unsupported measurement block version")
ErrMeasurementBlockSizeMismatch = errors.New("measurement block size mismatch")
)
// MeasurementBlock represents a collection of all measurements in an index.
@ -45,6 +49,10 @@ type MeasurementBlock struct {
data []byte
hashData []byte
// Measurement block sketch and tombstone sketch for cardinality
// estimation.
sketch, tsketch estimator.Sketch
version int // block version
}
@ -109,6 +117,18 @@ func (blk *MeasurementBlock) UnmarshalBinary(data []byte) error {
blk.hashData = data[t.HashIndex.Offset:]
blk.hashData = blk.hashData[:t.HashIndex.Size]
// Initialise sketches. We're currently using HLL+.
var s, ts *hll.Plus
if err := s.UnmarshalBinary(data[t.Sketch.Offset:][:t.Sketch.Size]); err != nil {
return err
}
blk.sketch = s
if err := ts.UnmarshalBinary(data[t.TSketch.Offset:][:t.TSketch.Size]); err != nil {
return err
}
blk.tsketch = ts
return nil
}
@ -180,20 +200,32 @@ type MeasurementBlockTrailer struct {
Offset int64
Size int64
}
// Offset and size of cardinality sketch for measurements.
Sketch struct {
Offset int64
Size int64
}
// Offset and size of cardinality sketch for tombstoned measurements.
TSketch struct {
Offset int64
Size int64
}
}
// ReadMeasurementBlockTrailer returns the block trailer from data.
func ReadMeasurementBlockTrailer(data []byte) (MeasurementBlockTrailer, error) {
var t MeasurementBlockTrailer
// Read version.
// Read version (which is located in the last two bytes of the trailer).
t.Version = int(binary.BigEndian.Uint16(data[len(data)-2:]))
if t.Version != MeasurementBlockVersion {
return t, ErrUnsupportedIndexFileVersion
}
// Slice trailer data.
buf := data[len(data)-IndexFileTrailerSize:]
buf := data[len(data)-MeasurementTrailerSize:]
// Read data section info.
t.Data.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
@ -203,6 +235,14 @@ func ReadMeasurementBlockTrailer(data []byte) (MeasurementBlockTrailer, error) {
t.HashIndex.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
t.HashIndex.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
// Read measurment sketch info.
t.Sketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
t.Sketch.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
// Read tombstone measurment sketch info.
t.TSketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
t.TSketch.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
return t, nil
}
@ -222,7 +262,21 @@ func (t *MeasurementBlockTrailer) WriteTo(w io.Writer) (n int64, err error) {
return n, err
}
// Write index file encoding version.
// Write measurement sketch info.
if err := writeUint64To(w, uint64(t.Sketch.Offset), &n); err != nil {
return n, err
} else if err := writeUint64To(w, uint64(t.Sketch.Size), &n); err != nil {
return n, err
}
// Write tombstone measurement sketch info.
if err := writeUint64To(w, uint64(t.TSketch.Offset), &n); err != nil {
return n, err
} else if err := writeUint64To(w, uint64(t.TSketch.Size), &n); err != nil {
return n, err
}
// Write measurement block version.
if err := writeUint16To(w, MeasurementBlockVersion, &n); err != nil {
return n, err
}
@ -306,6 +360,10 @@ func (e *MeasurementBlockElem) UnmarshalBinary(data []byte) error {
// MeasurementBlockWriter writes a measurement block.
type MeasurementBlockWriter struct {
mms map[string]measurement
// Measurement sketch and tombstoned measurement sketch. These must be
// set before calling WriteTo.
sketch, tsketch estimator.Sketch
}
// NewMeasurementBlockWriter returns a new MeasurementBlockWriter.
@ -324,17 +382,17 @@ func (mw *MeasurementBlockWriter) Add(name []byte, offset, size int64, seriesIDs
mw.mms[string(name)] = mm
}
// Delete marks a measurement as tombstoned.
func (mw *MeasurementBlockWriter) Delete(name []byte) {
mm := mw.mms[string(name)]
mm.deleted = true
mw.mms[string(name)] = mm
}
// WriteTo encodes the measurements to w.
func (mw *MeasurementBlockWriter) WriteTo(w io.Writer) (n int64, err error) {
var t MeasurementBlockTrailer
// The sketches must be set before calling WriteTo.
if mw.sketch == nil {
return 0, errors.New("measurement sketch not set")
} else if mw.tsketch == nil {
return 0, errors.New("measurement tombstone sketch not set")
}
// Sort names.
names := make([]string, 0, len(mw.mms))
for name := range mw.mms {
@ -396,6 +454,19 @@ func (mw *MeasurementBlockWriter) WriteTo(w io.Writer) (n int64, err error) {
}
t.HashIndex.Size = n - t.HashIndex.Offset
// Write the sketches out.
t.Sketch.Offset = n
if err := writeSketchTo(w, mw.sketch, &n); err != nil {
return n, err
}
t.Sketch.Size = n - t.Sketch.Offset
t.TSketch.Offset = n
if err := writeSketchTo(w, mw.tsketch, &n); err != nil {
return n, err
}
t.TSketch.Size = n - t.TSketch.Offset
// Write trailer.
nn, err := t.WriteTo(w)
n += nn
@ -439,6 +510,20 @@ func (mw *MeasurementBlockWriter) writeMeasurementTo(w io.Writer, name []byte, m
return nil
}
// writeSketchTo writes an estimator.Sketch into w, updating the number of bytes
// written via n.
func writeSketchTo(w io.Writer, s estimator.Sketch, n *int64) error {
// TODO(edd): implement io.WriterTo on sketches.
data, err := s.MarshalBinary()
if err != nil {
return err
}
nn, err := w.Write(data)
*n += int64(nn)
return err
}
type measurement struct {
deleted bool
tagBlock struct {

View File

@ -2,12 +2,105 @@ package tsi1_test
import (
"bytes"
"encoding/binary"
"fmt"
"reflect"
"testing"
"github.com/influxdata/influxdb/tsdb/index/tsi1"
)
func TestReadMeasurementBlockTrailer(t *testing.T) {
// Build a trailer
var (
data = make([]byte, tsi1.MeasurementTrailerSize)
blockversion = uint16(1)
blockOffset, blockSize = uint64(1), uint64(2500)
hashIdxOffset, hashIdxSize = uint64(2501), uint64(1000)
sketchOffset, sketchSize = uint64(3501), uint64(250)
tsketchOffset, tsketchSize = uint64(3751), uint64(250)
)
binary.BigEndian.PutUint64(data[0:], blockOffset)
binary.BigEndian.PutUint64(data[8:], blockSize)
binary.BigEndian.PutUint64(data[16:], hashIdxOffset)
binary.BigEndian.PutUint64(data[24:], hashIdxSize)
binary.BigEndian.PutUint64(data[32:], sketchOffset)
binary.BigEndian.PutUint64(data[40:], sketchSize)
binary.BigEndian.PutUint64(data[48:], tsketchOffset)
binary.BigEndian.PutUint64(data[56:], tsketchSize)
binary.BigEndian.PutUint16(data[64:], blockversion)
trailer, err := tsi1.ReadMeasurementBlockTrailer(data)
if err != nil {
t.Logf("trailer is: %#v\n", trailer)
t.Fatal(err)
}
ok := true &&
trailer.Version == int(blockversion) &&
trailer.Data.Offset == int64(blockOffset) &&
trailer.Data.Size == int64(blockSize) &&
trailer.HashIndex.Offset == int64(hashIdxOffset) &&
trailer.HashIndex.Size == int64(hashIdxSize) &&
trailer.Sketch.Offset == int64(sketchOffset) &&
trailer.Sketch.Size == int64(sketchSize) &&
trailer.TSketch.Offset == int64(tsketchOffset) &&
trailer.TSketch.Size == int64(tsketchSize)
if !ok {
t.Fatalf("got %v\nwhich does not match expected", trailer)
}
}
func TestMeasurementBlockTrailer_WriteTo(t *testing.T) {
var trailer = tsi1.MeasurementBlockTrailer{
Version: 1,
Data: struct {
Offset int64
Size int64
}{Offset: 1, Size: 2},
HashIndex: struct {
Offset int64
Size int64
}{Offset: 3, Size: 4},
Sketch: struct {
Offset int64
Size int64
}{Offset: 5, Size: 6},
TSketch: struct {
Offset int64
Size int64
}{Offset: 7, Size: 8},
}
var buf bytes.Buffer
n, err := trailer.WriteTo(&buf)
if got, exp := n, int64(tsi1.MeasurementTrailerSize); got != exp {
t.Fatalf("got %v, exp %v", got, exp)
}
if got := err; got != nil {
t.Fatalf("got %v, exp %v", got, nil)
}
// Verify trailer written correctly.
exp := "" +
"0000000000000001" + // data offset
"0000000000000002" + // data size
"0000000000000003" + // hash index offset
"0000000000000004" + // hash index size
"0000000000000005" + // sketch offset
"0000000000000006" + // sketch size
"0000000000000007" + // tsketch offset
"0000000000000008" + // tsketch size
"0001" // version
if got, exp := fmt.Sprintf("%x", buf.String()), exp; got != exp {
t.Fatalf("got %v, exp %v", got, exp)
}
}
// Ensure measurement blocks can be written and opened.
func TestMeasurementBlockWriter(t *testing.T) {
// Write 3 measurements to writer.

View File

@ -101,8 +101,6 @@ func (blk *SeriesBlock) Series(name []byte, tags models.Tags) SeriesElem {
return nil
}
}
return nil
}
// SeriesOffset returns offset of the encoded series key.
@ -269,8 +267,6 @@ func (blk *SeriesBlock) EncodeTerm(v []byte) uint32 {
return 0
}
}
return 0
}
// TermCount returns the number of terms within the dictionary.