Sketches working

pull/7913/head
Edd Robinson 2016-09-23 14:33:47 +01:00 committed by Ben Johnson
parent d19fbf5ab4
commit bd8dd9a291
No known key found for this signature in database
GPG Key ID: 81741CD251883081
10 changed files with 185 additions and 372 deletions

View File

@ -4,8 +4,8 @@ import (
"fmt"
"io"
"github.com/cespare/xxhash"
"github.com/clarkduvall/hyperloglog"
"github.com/influxdata/influxdb/pkg/murmur3"
)
// Sketch is the interface representing a sketch for estimating cardinality.
@ -48,8 +48,7 @@ func (h hash64) Sum64() uint64 {
}
func (s *HyperLogLogPlus) Add(v []byte) error {
hash := hash64(murmur3.Sum64(v))
s.hll.Add(hash)
s.hll.Add(hash64(xxhash.Sum64(v)))
return nil
}
@ -84,5 +83,5 @@ func (s *HyperLogLogPlus) Merge(sketch Sketch) error {
return fmt.Errorf("sketch is of type %T", sketch)
}
return s.Merge(other)
return s.hll.Merge(other.hll)
}

View File

@ -1,224 +0,0 @@
package murmur3
// NOTE(edd): This code is adapted from Sébastien Paolacci's
// implementation of murmur3's 128-bit hash. It was adapted so that
// the direct murmur3.Hash128 implementation could be used directly
// instead of through the hash.Hash interface.
import (
"hash"
"unsafe"
)
const (
c1_128 = 0x87c37b91114253d5
c2_128 = 0x4cf5ad432745937f
)
// Make sure interfaces are correctly implemented.
var (
_ hash.Hash = new(Hash128)
)
// Hash128 represents a partial evaluation of a 128-bit hash.
//
// Hash128 should also be used when 64-bit hashes are required. In such a case,
// the first part of the running hash may be used.
type Hash128 struct {
clen int // Digested input cumulative length.
tail []byte // 0 to Size()-1 bytes view of `buf'.
buf [16]byte // Expected (but not required) to be Size() large.
h1 uint64 // Unfinalized running hash part 1.
h2 uint64 // Unfinalized running hash part 2.
}
// BlockSize returns the hash's underlying block size.
func (h *Hash128) BlockSize() int { return 1 }
// Write (via the embedded io.Writer interface) adds more data to the running
// hash.
func (h *Hash128) Write(p []byte) (n int, err error) {
n = len(p)
h.clen += n
if len(h.tail) > 0 {
// Stick back pending bytes.
nfree := h.Size() - len(h.tail) // nfree ∈ [1, h.Size()-1].
if nfree < len(p) {
// One full block can be formeh.
block := append(h.tail, p[:nfree]...)
p = p[nfree:]
_ = h.bmix(block) // No tail.
} else {
// Tail's buf is large enough to prevent reallocs.
p = append(h.tail, p...)
}
}
h.tail = h.bmix(p)
// Keep own copy of the 0 to Size()-1 pending bytes.
nn := copy(h.buf[:], h.tail)
h.tail = h.buf[:nn]
return n, nil
}
// Reset resets the Hash to its initial state.
func (h *Hash128) Reset() {
h.clen = 0
h.tail = nil
h.h1, h.h2 = 0, 0
}
// Size returns the size of the hash.
func (h *Hash128) Size() int { return 16 }
// Sum appends the current hash to b and returns the resulting slice.
func (h *Hash128) Sum(b []byte) []byte {
h1, h2 := h.Sum128()
return append(b,
byte(h1>>56), byte(h1>>48), byte(h1>>40), byte(h1>>32),
byte(h1>>24), byte(h1>>16), byte(h1>>8), byte(h1),
byte(h2>>56), byte(h2>>48), byte(h2>>40), byte(h2>>32),
byte(h2>>24), byte(h2>>16), byte(h2>>8), byte(h2),
)
}
func (h *Hash128) bmix(p []byte) (tail []byte) {
h1, h2 := h.h1, h.h2
nblocks := len(p) / 16
for i := 0; i < nblocks; i++ {
t := (*[2]uint64)(unsafe.Pointer(&p[i*16]))
k1, k2 := t[0], t[1]
k1 *= c1_128
k1 = (k1 << 31) | (k1 >> 33) // rotl64(k1, 31)
k1 *= c2_128
h1 ^= k1
h1 = (h1 << 27) | (h1 >> 37) // rotl64(h1, 27)
h1 += h2
h1 = h1*5 + 0x52dce729
k2 *= c2_128
k2 = (k2 << 33) | (k2 >> 31) // rotl64(k2, 33)
k2 *= c1_128
h2 ^= k2
h2 = (h2 << 31) | (h2 >> 33) // rotl64(h2, 31)
h2 += h1
h2 = h2*5 + 0x38495ab5
}
h.h1, h.h2 = h1, h2
return p[nblocks*h.Size():]
}
// Sum128 returns the 128-bit current hash.
func (h *Hash128) Sum128() (h1, h2 uint64) {
h1, h2 = h.h1, h.h2
var k1, k2 uint64
switch len(h.tail) & 15 {
case 15:
k2 ^= uint64(h.tail[14]) << 48
fallthrough
case 14:
k2 ^= uint64(h.tail[13]) << 40
fallthrough
case 13:
k2 ^= uint64(h.tail[12]) << 32
fallthrough
case 12:
k2 ^= uint64(h.tail[11]) << 24
fallthrough
case 11:
k2 ^= uint64(h.tail[10]) << 16
fallthrough
case 10:
k2 ^= uint64(h.tail[9]) << 8
fallthrough
case 9:
k2 ^= uint64(h.tail[8]) << 0
k2 *= c2_128
k2 = (k2 << 33) | (k2 >> 31) // rotl64(k2, 33)
k2 *= c1_128
h2 ^= k2
fallthrough
case 8:
k1 ^= uint64(h.tail[7]) << 56
fallthrough
case 7:
k1 ^= uint64(h.tail[6]) << 48
fallthrough
case 6:
k1 ^= uint64(h.tail[5]) << 40
fallthrough
case 5:
k1 ^= uint64(h.tail[4]) << 32
fallthrough
case 4:
k1 ^= uint64(h.tail[3]) << 24
fallthrough
case 3:
k1 ^= uint64(h.tail[2]) << 16
fallthrough
case 2:
k1 ^= uint64(h.tail[1]) << 8
fallthrough
case 1:
k1 ^= uint64(h.tail[0]) << 0
k1 *= c1_128
k1 = (k1 << 31) | (k1 >> 33) // rotl64(k1, 31)
k1 *= c2_128
h1 ^= k1
}
h1 ^= uint64(h.clen)
h2 ^= uint64(h.clen)
h1 += h2
h2 += h1
h1 = fmix64(h1)
h2 = fmix64(h2)
h1 += h2
h2 += h1
return h1, h2
}
func fmix64(k uint64) uint64 {
k ^= k >> 33
k *= 0xff51afd7ed558ccd
k ^= k >> 33
k *= 0xc4ceb9fe1a85ec53
k ^= k >> 33
return k
}
/*
func rotl64(x uint64, r byte) uint64 {
return (x << r) | (x >> (64 - r))
}
*/
// Sum128 returns the MurmurHash3 sum of data. It is equivalent to the
// following sequence (without the extra burden and the extra allocation):
// hasher := New128()
// hasher.Write(data)
// return hasher.Sum128()
func Sum128(data []byte) (h1 uint64, h2 uint64) {
h := new(Hash128)
h.tail = h.bmix(data)
h.clen = len(data)
return h.Sum128()
}

View File

@ -1,60 +0,0 @@
package murmur3
// NOTE(edd): This code is adapted from Sébastien Paolacci's
// implementation of murmur3's 64-bit hash. It was adapted so that
// the direct murmur3.Hash64 implementation could be used directly
// instead of through the hash.Hash interface.
import (
"hash"
)
// Make sure interfaces are correctly implemented.
var (
_ hash.Hash = new(Hash64)
_ hash.Hash64 = new(Hash64)
)
// Hash64 is half a Hash128.
type Hash64 Hash128
// BlockSize returns the hash's underlying block size.
func (h *Hash64) BlockSize() int { return 1 }
// Reset resets the Hash to its initial state.
func (h *Hash64) Reset() {
(*Hash128)(h).Reset()
}
// Size returns the size of the hash.
func (h *Hash64) Size() int { return 8 }
// Write (via the embedded io.Writer interface) adds more data to the running
// hash.
func (h *Hash64) Write(p []byte) (n int, err error) {
return (*Hash128)(h).Write(p)
}
// Sum appends the current hash to b and returns the resulting slice.
func (h *Hash64) Sum(b []byte) []byte {
h1 := h.Sum64()
return append(b,
byte(h1>>56), byte(h1>>48), byte(h1>>40), byte(h1>>32),
byte(h1>>24), byte(h1>>16), byte(h1>>8), byte(h1))
}
// Sum64 returns the 64-bit current hash.
func (h *Hash64) Sum64() uint64 {
h1, _ := (*Hash128)(h).Sum128()
return h1
}
// Sum64 returns the MurmurHash3 sum of data. It is equivalent to the
// following sequence (without the extra burden and the extra allocation):
// hasher := New128()
// hasher.Write(data)
// return hasher.Sum64()
func Sum64(data []byte) uint64 {
h1, _ := Sum128(data)
return h1
}

View File

@ -1,49 +0,0 @@
package murmur3_test
import (
"crypto/rand"
"github.com/cespare/xxhash"
// "github.com/OneOfOne/xxhash"
// "github.com/spaolacci/murmur3"
"testing"
)
func benchmark(b *testing.B, hashFn func([]byte) uint64, n int) {
// generate random data.
var buf = make([]byte, n)
if m, err := rand.Read(buf); err != nil {
b.Fatal(err)
} else if m != n {
b.Fatalf("only wrote %d bytes to buffer of size %d", m, n)
}
b.ReportAllocs()
b.ResetTimer()
var bv uint64
for i := 0; i < b.N; i++ {
bv += hashFn(buf)
}
}
var (
// hashFn = xxhash.Checksum64
hashFn = xxhash.Sum64
// hashFn = murmur3.Sum64
)
func BenchmarkHash_5(b *testing.B) { benchmark(b, hashFn, 5) }
func BenchmarkHash_10(b *testing.B) { benchmark(b, hashFn, 10) }
func BenchmarkHash_15(b *testing.B) { benchmark(b, hashFn, 15) }
func BenchmarkHash_20(b *testing.B) { benchmark(b, hashFn, 20) }
func BenchmarkHash_25(b *testing.B) { benchmark(b, hashFn, 25) }
func BenchmarkHash_30(b *testing.B) { benchmark(b, hashFn, 30) }
func BenchmarkHash_35(b *testing.B) { benchmark(b, hashFn, 35) }
func BenchmarkHash_40(b *testing.B) { benchmark(b, hashFn, 40) }
func BenchmarkHash_45(b *testing.B) { benchmark(b, hashFn, 45) }
func BenchmarkHash_50(b *testing.B) { benchmark(b, hashFn, 50) }
func BenchmarkHash_100(b *testing.B) { benchmark(b, hashFn, 100) }
func BenchmarkHash_250(b *testing.B) { benchmark(b, hashFn, 250) }
func BenchmarkHash_500(b *testing.B) { benchmark(b, hashFn, 500) }

View File

@ -46,14 +46,13 @@ type Engine interface {
Series(key string) (*Series, error)
SeriesN() (uint64, error)
SeriesSketch() (estimator.Sketch, error)
MeasurementsSketch() (estimator.Sketch, error)
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
CreateMeasurement(name string) (*Measurement, error)
DeleteMeasurement(name string, seriesKeys []string) error
Measurement(name string) (*Measurement, error)
Measurements() (Measurements, error)
MeasurementCardinality() (n int64, err error)
MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error)
MeasurementsByRegex(re *regexp.Regexp) (Measurements, error)
MeasurementFields(measurement string) *MeasurementFields

View File

@ -289,10 +289,6 @@ func (e *Engine) Measurements() (tsdb.Measurements, error) {
return e.index.Measurements()
}
func (e *Engine) MeasurementCardinality() (int64, error) {
panic("TODO: edd")
}
func (e *Engine) MeasurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool, error) {
return e.index.MeasurementsByExpr(expr)
}
@ -325,12 +321,12 @@ func (e *Engine) SeriesN() (uint64, error) {
return e.index.SeriesN()
}
func (e *Engine) SeriesSketch() (estimator.Sketch, error) {
return e.index.SeriesSketch()
func (e *Engine) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
return e.index.SeriesSketches()
}
func (e *Engine) MeasurementsSketch() (estimator.Sketch, error) {
return e.index.MeasurementsSketch()
func (e *Engine) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
return e.index.MeasurementsSketches()
}
// EngineStatistics maintains statistics for the engine.

View File

@ -25,8 +25,8 @@ type Index interface {
DropSeries(keys []string) error
SeriesN() (uint64, error)
SeriesSketch() (estimator.Sketch, error)
MeasurementsSketch() (estimator.Sketch, error)
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
Statistics(tags map[string]string) []models.Statistic
TagsForSeries(key string) (models.Tags, error)

View File

@ -34,8 +34,8 @@ type DatabaseIndex struct {
series map[string]*Series // map series key to the Series object
lastID uint64 // last used series ID. They're in memory only for this shard
seriesSketch *estimator.HyperLogLogPlus
measurementsSketch *estimator.HyperLogLogPlus
seriesSketch, seriesTSSketch *estimator.HyperLogLogPlus
measurementsSketch, measurementsTSSketch *estimator.HyperLogLogPlus
name string // name of the database represented by this index
@ -52,13 +52,17 @@ func NewDatabaseIndex(name string) (index *DatabaseIndex, err error) {
stats: &IndexStatistics{},
defaultTags: models.StatisticTags{"database": name},
}
if index.seriesSketch, err = estimator.NewHyperLogLogPlus(14); err != nil {
return nil, err
} else if index.seriesTSSketch, err = estimator.NewHyperLogLogPlus(14); err != nil {
return nil, err
} else if index.measurementsSketch, err = estimator.NewHyperLogLogPlus(14); err != nil {
return nil, err
} else if index.measurementsTSSketch, err = estimator.NewHyperLogLogPlus(14); err != nil {
return nil, err
}
if index.measurementsSketch, err = estimator.NewHyperLogLogPlus(14); err != nil {
return nil, err
}
return index, nil
}
@ -99,10 +103,10 @@ func (d *DatabaseIndex) SeriesN() (uint64, error) {
}
// SeriesSketch returns the sketch for the series.
func (d *DatabaseIndex) SeriesSketch() (estimator.Sketch, error) {
func (d *DatabaseIndex) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
d.mu.RLock()
defer d.mu.RUnlock()
return d.seriesSketch, nil
return d.seriesSketch, d.seriesTSSketch, nil
}
// Measurement returns the measurement object from the index by the name
@ -113,10 +117,10 @@ func (d *DatabaseIndex) Measurement(name string) (*Measurement, error) {
}
// MeasurementsSketch returns the sketch for the series.
func (d *DatabaseIndex) MeasurementsSketch() (estimator.Sketch, error) {
func (d *DatabaseIndex) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
d.mu.RLock()
defer d.mu.RUnlock()
return d.measurementsSketch, nil
return d.measurementsSketch, d.measurementsTSSketch, nil
}
// MeasurementsByName returns a list of measurements.

View File

@ -553,39 +553,61 @@ func (s *Store) DiskSize() (int64, error) {
return size, nil
}
// SeriesCardinality returns the series cardinality for the provided database.
func (s *Store) SeriesCardinality(database string) (int64, error) {
func (s *Store) cardinalityEstimate(dbName string, getSketches func(*Shard) (estimator.Sketch, estimator.Sketch, error)) (int64, error) {
var (
ss estimator.Sketch // Sketch estimating number of items.
ts estimator.Sketch // Sketch estimating number of tombstoned items.
)
s.mu.RLock()
shards := s.filterShards(byDatabase(database))
shards := s.filterShards(byDatabase(dbName))
s.mu.RUnlock()
var sketch estimator.Sketch
// Iterate over all shards for the database and combine all of the series
// Iterate over all shards for the database and combine all of the sketches.
// sketches.
for _, shard := range shards {
other, err := shard.engine.SeriesSketch()
s, t, err := getSketches(shard)
if err != nil {
return 0, err
}
if sketch == nil {
sketch = other
} else if err = sketch.Merge(other); err != nil {
if ss == nil {
ss, ts = s, t
} else if err = ss.Merge(s); err != nil {
return 0, err
} else if err = ts.Merge(t); err != nil {
return 0, err
}
}
if sketch != nil {
cnt, err := sketch.Count()
return int64(cnt), err
if ss != nil {
pos, err := ss.Count()
if err != nil {
return 0, err
}
neg, err := ts.Count()
if err != nil {
return 0, err
}
return int64(pos - neg), nil
}
return 0, nil
}
// SeriesCardinality returns the series cardinality for the provided database.
func (s *Store) SeriesCardinality(database string) (int64, error) {
return s.cardinalityEstimate(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
return sh.engine.SeriesSketches()
})
}
// MeasurementsCardinality returns the measurement cardinality for the provided
// database.
func (s *Store) MeasurementsCardinality(database string) (int64, error) {
panic("TODO: edd")
return s.cardinalityEstimate(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
return sh.engine.MeasurementsSketches()
})
}
// BackupShard will get the shard and have the engine backup since the passed in

View File

@ -4,6 +4,8 @@ import (
"bytes"
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"path/filepath"
"reflect"
@ -406,6 +408,124 @@ func TestStore_BackupRestoreShard(t *testing.T) {
}
}
func TestStore_SeriesCardinality_Unique(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test in short mode.")
}
store := MustOpenStore()
defer store.Close()
// Generate point data to write to the shards.
series := genTestSeries(64, 5, 5) // 200,000 series
expCardinality := len(series)
points := make([]models.Point, 0, len(series))
for _, s := range series {
points = append(points, models.MustNewPoint(s.Measurement, s.Series.Tags, map[string]interface{}{"value": 1.0}, time.Now()))
}
// Create requested number of shards in the store & write points across
// shards such that we never write the same series to multiple shards.
for shardID := 0; shardID < 10; shardID++ {
if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil {
t.Fatalf("create shard: %s", err)
}
if err := store.BatchWrite(shardID, points[shardID*20000:(shardID+1)*20000]); err != nil {
t.Fatalf("batch write: %s", err)
}
}
// Estimate the series cardinality...
cardinality, err := store.Store.SeriesCardinality("db")
if err != nil {
t.Fatal(err)
}
// Estimated cardinality should be well within 1.5% of the actual cardinality.
if got, exp := math.Abs(float64(cardinality)-float64(expCardinality))/float64(expCardinality), 0.015; got > exp {
t.Fatalf("got epsilon of %v for cardinality %d (expected %d), which is larger than expected %v", got, cardinality, expCardinality, exp)
}
}
// This test tests cardinality estimation when series data is duplicated across
// multiple shards.
func TestStore_SeriesCardinality_Duplicates(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test in short mode.")
}
store := MustOpenStore()
defer store.Close()
// Generate point data to write to the shards.
series := genTestSeries(64, 5, 5) // 200,000 series.
expCardinality := len(series)
points := make([]models.Point, 0, len(series))
for _, s := range series {
points = append(points, models.MustNewPoint(s.Measurement, s.Series.Tags, map[string]interface{}{"value": 1.0}, time.Now()))
}
// Create requested number of shards in the store & write points.
for shardID := 0; shardID < 10; shardID++ {
if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil {
t.Fatalf("create shard: %s", err)
}
var from, to int
if shardID == 0 {
// if it's the first shard then write all of the points.
from, to = 0, len(points)-1
} else {
// For other shards we write a random sub-section of all the points.
// which will duplicate the series and shouldn't increase the
// cardinality.
from, to := rand.Intn(len(points)), rand.Intn(len(points))
if from > to {
from, to = to, from
}
}
if err := store.BatchWrite(shardID, points[from:to]); err != nil {
t.Fatalf("batch write: %s", err)
}
}
// Estimate the series cardinality...
cardinality, err := store.Store.SeriesCardinality("db")
if err != nil {
t.Fatal(err)
}
// Estimated cardinality should be well within 1.5% of the actual cardinality.
if got, exp := math.Abs(float64(cardinality)-float64(expCardinality))/float64(expCardinality), 0.015; got > exp {
t.Fatalf("got epsilon of %v for cardinality %d (expected %d), which is larger than expected %v", got, cardinality, expCardinality, exp)
}
}
func BenchmarkStore_SeriesCardinality_100_Shards(b *testing.B) {
store := MustOpenStore()
defer store.Close()
// Write a point to 100 shards.
for shardID := 0; shardID < 100; shardID++ {
if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil {
b.Fatalf("create shard: %s", err)
}
err := store.WriteToShard(uint64(shardID), []models.Point{models.MustNewPoint("cpu", nil, map[string]interface{}{"value": 1.0}, time.Now())})
if err != nil {
b.Fatalf("write: %s", err)
}
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _ = store.SeriesCardinality("db")
}
}
func BenchmarkStoreOpen_200KSeries_100Shards(b *testing.B) { benchmarkStoreOpen(b, 64, 5, 5, 1, 100) }
func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int) {
@ -476,6 +596,12 @@ func NewStore() *Store {
// MustOpenStore returns a new, open Store at a temporary path.
func MustOpenStore() *Store {
s := NewStore()
// Quieten the logs.
if !testing.Verbose() {
s.SetLogOutput(ioutil.Discard)
}
if err := s.Open(); err != nil {
panic(err)
}