diff --git a/pkg/estimator/hll.go b/pkg/estimator/hll.go index 194e4907c8..be4b32c5db 100644 --- a/pkg/estimator/hll.go +++ b/pkg/estimator/hll.go @@ -4,8 +4,8 @@ import ( "fmt" "io" + "github.com/cespare/xxhash" "github.com/clarkduvall/hyperloglog" - "github.com/influxdata/influxdb/pkg/murmur3" ) // Sketch is the interface representing a sketch for estimating cardinality. @@ -48,8 +48,7 @@ func (h hash64) Sum64() uint64 { } func (s *HyperLogLogPlus) Add(v []byte) error { - hash := hash64(murmur3.Sum64(v)) - s.hll.Add(hash) + s.hll.Add(hash64(xxhash.Sum64(v))) return nil } @@ -84,5 +83,5 @@ func (s *HyperLogLogPlus) Merge(sketch Sketch) error { return fmt.Errorf("sketch is of type %T", sketch) } - return s.Merge(other) + return s.hll.Merge(other.hll) } diff --git a/pkg/murmur3/murmur128.go b/pkg/murmur3/murmur128.go deleted file mode 100644 index e6451c315d..0000000000 --- a/pkg/murmur3/murmur128.go +++ /dev/null @@ -1,224 +0,0 @@ -package murmur3 - -// NOTE(edd): This code is adapted from Sébastien Paolacci's -// implementation of murmur3's 128-bit hash. It was adapted so that -// the direct murmur3.Hash128 implementation could be used directly -// instead of through the hash.Hash interface. - -import ( - "hash" - "unsafe" -) - -const ( - c1_128 = 0x87c37b91114253d5 - c2_128 = 0x4cf5ad432745937f -) - -// Make sure interfaces are correctly implemented. -var ( - _ hash.Hash = new(Hash128) -) - -// Hash128 represents a partial evaluation of a 128-bit hash. -// -// Hash128 should also be used when 64-bit hashes are required. In such a case, -// the first part of the running hash may be used. -type Hash128 struct { - clen int // Digested input cumulative length. - tail []byte // 0 to Size()-1 bytes view of `buf'. - buf [16]byte // Expected (but not required) to be Size() large. - - h1 uint64 // Unfinalized running hash part 1. - h2 uint64 // Unfinalized running hash part 2. -} - -// BlockSize returns the hash's underlying block size. -func (h *Hash128) BlockSize() int { return 1 } - -// Write (via the embedded io.Writer interface) adds more data to the running -// hash. -func (h *Hash128) Write(p []byte) (n int, err error) { - n = len(p) - h.clen += n - - if len(h.tail) > 0 { - // Stick back pending bytes. - nfree := h.Size() - len(h.tail) // nfree ∈ [1, h.Size()-1]. - if nfree < len(p) { - // One full block can be formeh. - block := append(h.tail, p[:nfree]...) - p = p[nfree:] - _ = h.bmix(block) // No tail. - } else { - // Tail's buf is large enough to prevent reallocs. - p = append(h.tail, p...) - } - } - - h.tail = h.bmix(p) - - // Keep own copy of the 0 to Size()-1 pending bytes. - nn := copy(h.buf[:], h.tail) - h.tail = h.buf[:nn] - - return n, nil -} - -// Reset resets the Hash to its initial state. -func (h *Hash128) Reset() { - h.clen = 0 - h.tail = nil - h.h1, h.h2 = 0, 0 -} - -// Size returns the size of the hash. -func (h *Hash128) Size() int { return 16 } - -// Sum appends the current hash to b and returns the resulting slice. 
-func (h *Hash128) Sum(b []byte) []byte { - h1, h2 := h.Sum128() - return append(b, - byte(h1>>56), byte(h1>>48), byte(h1>>40), byte(h1>>32), - byte(h1>>24), byte(h1>>16), byte(h1>>8), byte(h1), - - byte(h2>>56), byte(h2>>48), byte(h2>>40), byte(h2>>32), - byte(h2>>24), byte(h2>>16), byte(h2>>8), byte(h2), - ) -} - -func (h *Hash128) bmix(p []byte) (tail []byte) { - h1, h2 := h.h1, h.h2 - - nblocks := len(p) / 16 - for i := 0; i < nblocks; i++ { - t := (*[2]uint64)(unsafe.Pointer(&p[i*16])) - k1, k2 := t[0], t[1] - - k1 *= c1_128 - k1 = (k1 << 31) | (k1 >> 33) // rotl64(k1, 31) - k1 *= c2_128 - h1 ^= k1 - - h1 = (h1 << 27) | (h1 >> 37) // rotl64(h1, 27) - h1 += h2 - h1 = h1*5 + 0x52dce729 - - k2 *= c2_128 - k2 = (k2 << 33) | (k2 >> 31) // rotl64(k2, 33) - k2 *= c1_128 - h2 ^= k2 - - h2 = (h2 << 31) | (h2 >> 33) // rotl64(h2, 31) - h2 += h1 - h2 = h2*5 + 0x38495ab5 - } - h.h1, h.h2 = h1, h2 - return p[nblocks*h.Size():] -} - -// Sum128 returns the 128-bit current hash. -func (h *Hash128) Sum128() (h1, h2 uint64) { - - h1, h2 = h.h1, h.h2 - - var k1, k2 uint64 - switch len(h.tail) & 15 { - case 15: - k2 ^= uint64(h.tail[14]) << 48 - fallthrough - case 14: - k2 ^= uint64(h.tail[13]) << 40 - fallthrough - case 13: - k2 ^= uint64(h.tail[12]) << 32 - fallthrough - case 12: - k2 ^= uint64(h.tail[11]) << 24 - fallthrough - case 11: - k2 ^= uint64(h.tail[10]) << 16 - fallthrough - case 10: - k2 ^= uint64(h.tail[9]) << 8 - fallthrough - case 9: - k2 ^= uint64(h.tail[8]) << 0 - - k2 *= c2_128 - k2 = (k2 << 33) | (k2 >> 31) // rotl64(k2, 33) - k2 *= c1_128 - h2 ^= k2 - - fallthrough - - case 8: - k1 ^= uint64(h.tail[7]) << 56 - fallthrough - case 7: - k1 ^= uint64(h.tail[6]) << 48 - fallthrough - case 6: - k1 ^= uint64(h.tail[5]) << 40 - fallthrough - case 5: - k1 ^= uint64(h.tail[4]) << 32 - fallthrough - case 4: - k1 ^= uint64(h.tail[3]) << 24 - fallthrough - case 3: - k1 ^= uint64(h.tail[2]) << 16 - fallthrough - case 2: - k1 ^= uint64(h.tail[1]) << 8 - fallthrough - case 1: - k1 ^= uint64(h.tail[0]) << 0 - k1 *= c1_128 - k1 = (k1 << 31) | (k1 >> 33) // rotl64(k1, 31) - k1 *= c2_128 - h1 ^= k1 - } - - h1 ^= uint64(h.clen) - h2 ^= uint64(h.clen) - - h1 += h2 - h2 += h1 - - h1 = fmix64(h1) - h2 = fmix64(h2) - - h1 += h2 - h2 += h1 - - return h1, h2 -} - -func fmix64(k uint64) uint64 { - k ^= k >> 33 - k *= 0xff51afd7ed558ccd - k ^= k >> 33 - k *= 0xc4ceb9fe1a85ec53 - k ^= k >> 33 - return k -} - -/* -func rotl64(x uint64, r byte) uint64 { - return (x << r) | (x >> (64 - r)) -} -*/ - -// Sum128 returns the MurmurHash3 sum of data. It is equivalent to the -// following sequence (without the extra burden and the extra allocation): -// hasher := New128() -// hasher.Write(data) -// return hasher.Sum128() -func Sum128(data []byte) (h1 uint64, h2 uint64) { - h := new(Hash128) - h.tail = h.bmix(data) - h.clen = len(data) - return h.Sum128() -} diff --git a/pkg/murmur3/murmur64.go b/pkg/murmur3/murmur64.go deleted file mode 100644 index 0db482dccd..0000000000 --- a/pkg/murmur3/murmur64.go +++ /dev/null @@ -1,60 +0,0 @@ -package murmur3 - -// NOTE(edd): This code is adapted from Sébastien Paolacci's -// implementation of murmur3's 64-bit hash. It was adapted so that -// the direct murmur3.Hash64 implementation could be used directly -// instead of through the hash.Hash interface. - -import ( - "hash" -) - -// Make sure interfaces are correctly implemented. -var ( - _ hash.Hash = new(Hash64) - _ hash.Hash64 = new(Hash64) -) - -// Hash64 is half a Hash128. 
-type Hash64 Hash128 - -// BlockSize returns the hash's underlying block size. -func (h *Hash64) BlockSize() int { return 1 } - -// Reset resets the Hash to its initial state. -func (h *Hash64) Reset() { - (*Hash128)(h).Reset() -} - -// Size returns the size of the hash. -func (h *Hash64) Size() int { return 8 } - -// Write (via the embedded io.Writer interface) adds more data to the running -// hash. -func (h *Hash64) Write(p []byte) (n int, err error) { - return (*Hash128)(h).Write(p) -} - -// Sum appends the current hash to b and returns the resulting slice. -func (h *Hash64) Sum(b []byte) []byte { - h1 := h.Sum64() - return append(b, - byte(h1>>56), byte(h1>>48), byte(h1>>40), byte(h1>>32), - byte(h1>>24), byte(h1>>16), byte(h1>>8), byte(h1)) -} - -// Sum64 returns the 64-bit current hash. -func (h *Hash64) Sum64() uint64 { - h1, _ := (*Hash128)(h).Sum128() - return h1 -} - -// Sum64 returns the MurmurHash3 sum of data. It is equivalent to the -// following sequence (without the extra burden and the extra allocation): -// hasher := New128() -// hasher.Write(data) -// return hasher.Sum64() -func Sum64(data []byte) uint64 { - h1, _ := Sum128(data) - return h1 -} diff --git a/pkg/murmur3/murmur64_test.go b/pkg/murmur3/murmur64_test.go deleted file mode 100644 index 1b751ee839..0000000000 --- a/pkg/murmur3/murmur64_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package murmur3_test - -import ( - "crypto/rand" - - "github.com/cespare/xxhash" - - // "github.com/OneOfOne/xxhash" - // "github.com/spaolacci/murmur3" - - "testing" -) - -func benchmark(b *testing.B, hashFn func([]byte) uint64, n int) { - // generate random data. - var buf = make([]byte, n) - if m, err := rand.Read(buf); err != nil { - b.Fatal(err) - } else if m != n { - b.Fatalf("only wrote %d bytes to buffer of size %d", m, n) - } - - b.ReportAllocs() - b.ResetTimer() - var bv uint64 - for i := 0; i < b.N; i++ { - bv += hashFn(buf) - } -} - -var ( - // hashFn = xxhash.Checksum64 - hashFn = xxhash.Sum64 - // hashFn = murmur3.Sum64 -) - -func BenchmarkHash_5(b *testing.B) { benchmark(b, hashFn, 5) } -func BenchmarkHash_10(b *testing.B) { benchmark(b, hashFn, 10) } -func BenchmarkHash_15(b *testing.B) { benchmark(b, hashFn, 15) } -func BenchmarkHash_20(b *testing.B) { benchmark(b, hashFn, 20) } -func BenchmarkHash_25(b *testing.B) { benchmark(b, hashFn, 25) } -func BenchmarkHash_30(b *testing.B) { benchmark(b, hashFn, 30) } -func BenchmarkHash_35(b *testing.B) { benchmark(b, hashFn, 35) } -func BenchmarkHash_40(b *testing.B) { benchmark(b, hashFn, 40) } -func BenchmarkHash_45(b *testing.B) { benchmark(b, hashFn, 45) } -func BenchmarkHash_50(b *testing.B) { benchmark(b, hashFn, 50) } -func BenchmarkHash_100(b *testing.B) { benchmark(b, hashFn, 100) } -func BenchmarkHash_250(b *testing.B) { benchmark(b, hashFn, 250) } -func BenchmarkHash_500(b *testing.B) { benchmark(b, hashFn, 500) } diff --git a/tsdb/engine.go b/tsdb/engine.go index 5bf9bff328..7b0404ecc5 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -46,14 +46,13 @@ type Engine interface { Series(key string) (*Series, error) SeriesN() (uint64, error) - SeriesSketch() (estimator.Sketch, error) - MeasurementsSketch() (estimator.Sketch, error) + SeriesSketches() (estimator.Sketch, estimator.Sketch, error) + MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) CreateMeasurement(name string) (*Measurement, error) DeleteMeasurement(name string, seriesKeys []string) error Measurement(name string) (*Measurement, error) Measurements() (Measurements, error) - 
MeasurementCardinality() (n int64, err error)
 	MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error)
 	MeasurementsByRegex(re *regexp.Regexp) (Measurements, error)
 	MeasurementFields(measurement string) *MeasurementFields
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index 6d69331dba..a08fcb836b 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -289,10 +289,6 @@ func (e *Engine) Measurements() (tsdb.Measurements, error) {
 	return e.index.Measurements()
 }
 
-func (e *Engine) MeasurementCardinality() (int64, error) {
-	panic("TODO: edd")
-}
-
 func (e *Engine) MeasurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool, error) {
 	return e.index.MeasurementsByExpr(expr)
 }
@@ -325,12 +321,12 @@ func (e *Engine) SeriesN() (uint64, error) {
 	return e.index.SeriesN()
 }
 
-func (e *Engine) SeriesSketch() (estimator.Sketch, error) {
-	return e.index.SeriesSketch()
+func (e *Engine) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
+	return e.index.SeriesSketches()
 }
 
-func (e *Engine) MeasurementsSketch() (estimator.Sketch, error) {
-	return e.index.MeasurementsSketch()
+func (e *Engine) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
+	return e.index.MeasurementsSketches()
 }
 
 // EngineStatistics maintains statistics for the engine.
diff --git a/tsdb/index.go b/tsdb/index.go
index 48646f09ba..b8bc131c79 100644
--- a/tsdb/index.go
+++ b/tsdb/index.go
@@ -25,8 +25,8 @@ type Index interface {
 	DropSeries(keys []string) error
 
 	SeriesN() (uint64, error)
-	SeriesSketch() (estimator.Sketch, error)
-	MeasurementsSketch() (estimator.Sketch, error)
+	SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
+	MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
 
 	Statistics(tags map[string]string) []models.Statistic
 	TagsForSeries(key string) (models.Tags, error)
diff --git a/tsdb/meta.go b/tsdb/meta.go
index f824ecce4d..856f072f35 100644
--- a/tsdb/meta.go
+++ b/tsdb/meta.go
@@ -34,8 +34,8 @@ type DatabaseIndex struct {
 	series map[string]*Series // map series key to the Series object
 	lastID uint64             // last used series ID. They're in memory only for this shard
 
-	seriesSketch       *estimator.HyperLogLogPlus
-	measurementsSketch *estimator.HyperLogLogPlus
+	seriesSketch, seriesTSSketch             *estimator.HyperLogLogPlus
+	measurementsSketch, measurementsTSSketch *estimator.HyperLogLogPlus
 
 	name string // name of the database represented by this index
 
@@ -52,13 +52,17 @@ func NewDatabaseIndex(name string) (index *DatabaseIndex, err error) {
 		stats:       &IndexStatistics{},
 		defaultTags: models.StatisticTags{"database": name},
 	}
+
 	if index.seriesSketch, err = estimator.NewHyperLogLogPlus(14); err != nil {
 		return nil, err
+	} else if index.seriesTSSketch, err = estimator.NewHyperLogLogPlus(14); err != nil {
+		return nil, err
+	} else if index.measurementsSketch, err = estimator.NewHyperLogLogPlus(14); err != nil {
+		return nil, err
+	} else if index.measurementsTSSketch, err = estimator.NewHyperLogLogPlus(14); err != nil {
+		return nil, err
 	}
 
-	if index.measurementsSketch, err = estimator.NewHyperLogLogPlus(14); err != nil {
-		return nil, err
-	}
 	return index, nil
 }
 
@@ -99,10 +103,10 @@ func (d *DatabaseIndex) SeriesN() (uint64, error) {
 }
 
-// SeriesSketch returns the sketch for the series.
-func (d *DatabaseIndex) SeriesSketch() (estimator.Sketch, error) {
+// SeriesSketches returns the sketch for the series and the sketch for tombstoned series.
+func (d *DatabaseIndex) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
 	d.mu.RLock()
 	defer d.mu.RUnlock()
-	return d.seriesSketch, nil
+	return d.seriesSketch, d.seriesTSSketch, nil
 }
 
 // Measurement returns the measurement object from the index by the name
@@ -113,10 +117,10 @@ func (d *DatabaseIndex) Measurement(name string) (*Measurement, error) {
 }
 
-// MeasurementsSketch returns the sketch for the series.
-func (d *DatabaseIndex) MeasurementsSketch() (estimator.Sketch, error) {
+// MeasurementsSketches returns the sketch for measurements and the sketch for tombstoned measurements.
+func (d *DatabaseIndex) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
 	d.mu.RLock()
 	defer d.mu.RUnlock()
-	return d.measurementsSketch, nil
+	return d.measurementsSketch, d.measurementsTSSketch, nil
 }
 
 // MeasurementsByName returns a list of measurements.
diff --git a/tsdb/store.go b/tsdb/store.go
index 0fdd494974..980361d1e4 100644
--- a/tsdb/store.go
+++ b/tsdb/store.go
@@ -553,39 +553,61 @@ func (s *Store) DiskSize() (int64, error) {
 	return size, nil
 }
 
-// SeriesCardinality returns the series cardinality for the provided database.
-func (s *Store) SeriesCardinality(database string) (int64, error) {
+func (s *Store) cardinalityEstimate(dbName string, getSketches func(*Shard) (estimator.Sketch, estimator.Sketch, error)) (int64, error) {
+	var (
+		ss estimator.Sketch // Sketch estimating number of items.
+		ts estimator.Sketch // Sketch estimating number of tombstoned items.
+	)
+
 	s.mu.RLock()
-	shards := s.filterShards(byDatabase(database))
+	shards := s.filterShards(byDatabase(dbName))
 	s.mu.RUnlock()
 
-	var sketch estimator.Sketch
-
-	// Iterate over all shards for the database and combine all of the series
-	// sketches.
+	// Iterate over all shards for the database and combine all of the
+	// sketches.
 	for _, shard := range shards {
-		other, err := shard.engine.SeriesSketch()
+		s, t, err := getSketches(shard)
 		if err != nil {
 			return 0, err
 		}
 
-		if sketch == nil {
-			sketch = other
-		} else if err = sketch.Merge(other); err != nil {
+		if ss == nil {
+			ss, ts = s, t
+		} else if err = ss.Merge(s); err != nil {
+			return 0, err
+		} else if err = ts.Merge(t); err != nil {
 			return 0, err
 		}
 	}
 
-	if sketch != nil {
-		cnt, err := sketch.Count()
-		return int64(cnt), err
+	if ss != nil {
+		pos, err := ss.Count()
+		if err != nil {
+			return 0, err
+		}
+
+		neg, err := ts.Count()
+		if err != nil {
+			return 0, err
+		}
+		return int64(pos - neg), nil
 	}
 	return 0, nil
 }
 
+// SeriesCardinality returns the series cardinality for the provided database.
+func (s *Store) SeriesCardinality(database string) (int64, error) {
+	return s.cardinalityEstimate(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
+		return sh.engine.SeriesSketches()
+	})
+}
+
 // MeasurementsCardinality returns the measurement cardinality for the provided
 // database.
func (s *Store) MeasurementsCardinality(database string) (int64, error) {
-	panic("TODO: edd")
+	return s.cardinalityEstimate(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
+		return sh.engine.MeasurementsSketches()
+	})
 }
 
 // BackupShard will get the shard and have the engine backup since the passed in
diff --git a/tsdb/store_test.go b/tsdb/store_test.go
index 06ae2a68b6..778694e221 100644
--- a/tsdb/store_test.go
+++ b/tsdb/store_test.go
@@ -4,6 +4,8 @@ import (
 	"bytes"
 	"fmt"
 	"io/ioutil"
+	"math"
+	"math/rand"
 	"os"
 	"path/filepath"
 	"reflect"
@@ -406,6 +408,124 @@ func TestStore_BackupRestoreShard(t *testing.T) {
 	}
 }
 
+func TestStore_SeriesCardinality_Unique(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping test in short mode.")
+	}
+
+	store := MustOpenStore()
+	defer store.Close()
+
+	// Generate point data to write to the shards.
+	series := genTestSeries(64, 5, 5) // 200,000 series
+	expCardinality := len(series)
+
+	points := make([]models.Point, 0, len(series))
+	for _, s := range series {
+		points = append(points, models.MustNewPoint(s.Measurement, s.Series.Tags, map[string]interface{}{"value": 1.0}, time.Now()))
+	}
+
+	// Create requested number of shards in the store & write points across
+	// shards such that we never write the same series to multiple shards.
+	for shardID := 0; shardID < 10; shardID++ {
+		if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil {
+			t.Fatalf("create shard: %s", err)
+		}
+		if err := store.BatchWrite(shardID, points[shardID*20000:(shardID+1)*20000]); err != nil {
+			t.Fatalf("batch write: %s", err)
+		}
+	}
+
+	// Estimate the series cardinality...
+	cardinality, err := store.Store.SeriesCardinality("db")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Estimated cardinality should be well within 1.5% of the actual cardinality.
+	if got, exp := math.Abs(float64(cardinality)-float64(expCardinality))/float64(expCardinality), 0.015; got > exp {
+		t.Fatalf("got epsilon of %v for cardinality %d (expected %d), which is larger than expected %v", got, cardinality, expCardinality, exp)
+	}
+}
+
+// Tests cardinality estimation when series data is duplicated across multiple
+// shards.
+func TestStore_SeriesCardinality_Duplicates(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping test in short mode.")
+	}
+
+	store := MustOpenStore()
+	defer store.Close()
+
+	// Generate point data to write to the shards.
+	series := genTestSeries(64, 5, 5) // 200,000 series.
+	expCardinality := len(series)
+
+	points := make([]models.Point, 0, len(series))
+	for _, s := range series {
+		points = append(points, models.MustNewPoint(s.Measurement, s.Series.Tags, map[string]interface{}{"value": 1.0}, time.Now()))
+	}
+
+	// Create requested number of shards in the store & write points.
+	for shardID := 0; shardID < 10; shardID++ {
+		if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil {
+			t.Fatalf("create shard: %s", err)
+		}
+
+		var from, to int
+		if shardID == 0 {
+			// If it's the first shard then write all of the points.
+			from, to = 0, len(points)
+		} else {
+			// For other shards we write a random sub-section of all the points,
+			// which will duplicate the series and shouldn't increase the
+			// cardinality.
+			from, to = rand.Intn(len(points)), rand.Intn(len(points))
+			if from > to {
+				from, to = to, from
+			}
+		}
+
+		if err := store.BatchWrite(shardID, points[from:to]); err != nil {
+			t.Fatalf("batch write: %s", err)
+		}
+	}
+
+	// Estimate the series cardinality...
+ cardinality, err := store.Store.SeriesCardinality("db") + if err != nil { + t.Fatal(err) + } + + // Estimated cardinality should be well within 1.5% of the actual cardinality. + if got, exp := math.Abs(float64(cardinality)-float64(expCardinality))/float64(expCardinality), 0.015; got > exp { + t.Fatalf("got epsilon of %v for cardinality %d (expected %d), which is larger than expected %v", got, cardinality, expCardinality, exp) + } +} + +func BenchmarkStore_SeriesCardinality_100_Shards(b *testing.B) { + store := MustOpenStore() + defer store.Close() + + // Write a point to 100 shards. + for shardID := 0; shardID < 100; shardID++ { + if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil { + b.Fatalf("create shard: %s", err) + } + + err := store.WriteToShard(uint64(shardID), []models.Point{models.MustNewPoint("cpu", nil, map[string]interface{}{"value": 1.0}, time.Now())}) + if err != nil { + b.Fatalf("write: %s", err) + } + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = store.SeriesCardinality("db") + } +} + func BenchmarkStoreOpen_200KSeries_100Shards(b *testing.B) { benchmarkStoreOpen(b, 64, 5, 5, 1, 100) } func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int) { @@ -476,6 +596,12 @@ func NewStore() *Store { // MustOpenStore returns a new, open Store at a temporary path. func MustOpenStore() *Store { s := NewStore() + + // Quieten the logs. + if !testing.Verbose() { + s.SetLogOutput(ioutil.Discard) + } + if err := s.Open(); err != nil { panic(err) }
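
Illustration (editor's sketch, not part of the diff): the new cardinality path merges each shard's "live" sketch and its "tombstone" sketch separately, then reports the live count minus the tombstone count. Below is a minimal, self-contained example of that flow, assuming only the pkg/estimator API visible above (NewHyperLogLogPlus, Add, Merge, Count); the series keys are invented for the example.

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/pkg/estimator"
)

// mustHLL builds a HyperLogLog++ sketch with precision 14, the same
// precision NewDatabaseIndex uses above.
func mustHLL() *estimator.HyperLogLogPlus {
	h, err := estimator.NewHyperLogLogPlus(14)
	if err != nil {
		panic(err)
	}
	return h
}

func main() {
	// One live/tombstone pair per shard, mirroring the
	// seriesSketch/seriesTSSketch pair added to DatabaseIndex.
	shard1, shard1TS := mustHLL(), mustHLL()
	shard2, shard2TS := mustHLL(), mustHLL()

	// Shard 1 holds series a and b; shard 2 holds b and c, then drops c.
	shard1.Add([]byte("cpu,host=a"))
	shard1.Add([]byte("cpu,host=b"))
	shard2.Add([]byte("cpu,host=b"))
	shard2.Add([]byte("cpu,host=c"))
	shard2TS.Add([]byte("cpu,host=c")) // tombstone for the dropped series

	// Merge the per-shard sketches, as Store.cardinalityEstimate does.
	if err := shard1.Merge(shard2); err != nil {
		panic(err)
	}
	if err := shard1TS.Merge(shard2TS); err != nil {
		panic(err)
	}

	pos, _ := shard1.Count()   // ~3: distinct series ever written (a, b, c)
	neg, _ := shard1TS.Count() // ~1: distinct series tombstoned (c)
	fmt.Println(int64(pos - neg)) // ~2 live series (a and b)
}

Note that the subtraction is itself approximate: HLL++ sketches only grow, so a series that is tombstoned and later re-created keeps contributing to the tombstone sketch until the sketches are rebuilt, and the two estimates carry independent error.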