From d19fbf5ab42a79d043d3abab5b21a4fe5ae6108f Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Wed, 21 Sep 2016 16:04:37 +0100 Subject: [PATCH] Wire in HLL estimator --- pkg/estimator/hll.go | 88 +++++++++++++ pkg/murmur3/murmur128.go | 224 ++++++++++++++++++++++++++++++++ pkg/murmur3/murmur64.go | 60 +++++++++ pkg/murmur3/murmur64_test.go | 49 +++++++ tsdb/engine.go | 6 +- tsdb/engine/tsm1/engine.go | 18 ++- tsdb/engine/tsm1/engine_test.go | 20 ++- tsdb/index.go | 5 +- tsdb/meta.go | 54 ++++++-- tsdb/meta_test.go | 6 +- tsdb/shard.go | 31 +++-- tsdb/shard_test.go | 16 +-- tsdb/store.go | 27 +++- 13 files changed, 567 insertions(+), 37 deletions(-) create mode 100644 pkg/estimator/hll.go create mode 100644 pkg/murmur3/murmur128.go create mode 100644 pkg/murmur3/murmur64.go create mode 100644 pkg/murmur3/murmur64_test.go diff --git a/pkg/estimator/hll.go b/pkg/estimator/hll.go new file mode 100644 index 0000000000..194e4907c8 --- /dev/null +++ b/pkg/estimator/hll.go @@ -0,0 +1,88 @@ +package estimator + +import ( + "fmt" + "io" + + "github.com/clarkduvall/hyperloglog" + "github.com/influxdata/influxdb/pkg/murmur3" +) + +// Sketch is the interface representing a sketch for estimating cardinality. +type Sketch interface { + // Add adds a single value to the sketch. + Add(v []byte) error + + // Count returns a cardinality estimate for the sketch. + Count() (count uint64, err error) + + // ReadFrom implements the io.ReaderAt interface. + // + // Implementations of the ReadFrom method should ensure that values are + // streamed from the provided reader into the Sketch. + ReadFrom(r io.Reader) (n int64, err error) + + // Merge merges another sketch into this one. + Merge(s Sketch) error +} + +type HyperLogLogPlus struct { + hll *hyperloglog.HyperLogLogPlus +} + +func NewHyperLogLogPlus(precision uint8) (*HyperLogLogPlus, error) { + hll, err := hyperloglog.NewPlus(precision) + if err != nil { + return nil, err + } + + return &HyperLogLogPlus{hll: hll}, nil +} + +// hash64 implements the hyperloglog.Hash64 interface. +// See: https://godoc.org/github.com/clarkduvall/hyperloglog#Hash64 +type hash64 uint64 + +func (h hash64) Sum64() uint64 { + return uint64(h) +} + +func (s *HyperLogLogPlus) Add(v []byte) error { + hash := hash64(murmur3.Sum64(v)) + s.hll.Add(hash) + return nil +} + +func (s *HyperLogLogPlus) Count() (count uint64, err error) { return s.hll.Count(), nil } + +func (s *HyperLogLogPlus) ReadFrom(r io.Reader) (n int64, err error) { + var ( + m int + buf [4]byte + ) + + for err == nil { + if m, err = r.Read(buf[:]); err == nil { + if m < len(buf) { + err = fmt.Errorf("short read. Only read %d bytes", m) + } else { + n += int64(m) + err = s.Add(buf[:]) + } + } + } + + if err != io.EOF { + return 0, err + } + return n, nil +} + +func (s *HyperLogLogPlus) Merge(sketch Sketch) error { + other, ok := sketch.(*HyperLogLogPlus) + if !ok { + return fmt.Errorf("sketch is of type %T", sketch) + } + + return s.Merge(other) +} diff --git a/pkg/murmur3/murmur128.go b/pkg/murmur3/murmur128.go new file mode 100644 index 0000000000..e6451c315d --- /dev/null +++ b/pkg/murmur3/murmur128.go @@ -0,0 +1,224 @@ +package murmur3 + +// NOTE(edd): This code is adapted from Sébastien Paolacci's +// implementation of murmur3's 128-bit hash. It was adapted so that +// the direct murmur3.Hash128 implementation could be used directly +// instead of through the hash.Hash interface. + +import ( + "hash" + "unsafe" +) + +const ( + c1_128 = 0x87c37b91114253d5 + c2_128 = 0x4cf5ad432745937f +) + +// Make sure interfaces are correctly implemented. +var ( + _ hash.Hash = new(Hash128) +) + +// Hash128 represents a partial evaluation of a 128-bit hash. +// +// Hash128 should also be used when 64-bit hashes are required. In such a case, +// the first part of the running hash may be used. +type Hash128 struct { + clen int // Digested input cumulative length. + tail []byte // 0 to Size()-1 bytes view of `buf'. + buf [16]byte // Expected (but not required) to be Size() large. + + h1 uint64 // Unfinalized running hash part 1. + h2 uint64 // Unfinalized running hash part 2. +} + +// BlockSize returns the hash's underlying block size. +func (h *Hash128) BlockSize() int { return 1 } + +// Write (via the embedded io.Writer interface) adds more data to the running +// hash. +func (h *Hash128) Write(p []byte) (n int, err error) { + n = len(p) + h.clen += n + + if len(h.tail) > 0 { + // Stick back pending bytes. + nfree := h.Size() - len(h.tail) // nfree ∈ [1, h.Size()-1]. + if nfree < len(p) { + // One full block can be formeh. + block := append(h.tail, p[:nfree]...) + p = p[nfree:] + _ = h.bmix(block) // No tail. + } else { + // Tail's buf is large enough to prevent reallocs. + p = append(h.tail, p...) + } + } + + h.tail = h.bmix(p) + + // Keep own copy of the 0 to Size()-1 pending bytes. + nn := copy(h.buf[:], h.tail) + h.tail = h.buf[:nn] + + return n, nil +} + +// Reset resets the Hash to its initial state. +func (h *Hash128) Reset() { + h.clen = 0 + h.tail = nil + h.h1, h.h2 = 0, 0 +} + +// Size returns the size of the hash. +func (h *Hash128) Size() int { return 16 } + +// Sum appends the current hash to b and returns the resulting slice. +func (h *Hash128) Sum(b []byte) []byte { + h1, h2 := h.Sum128() + return append(b, + byte(h1>>56), byte(h1>>48), byte(h1>>40), byte(h1>>32), + byte(h1>>24), byte(h1>>16), byte(h1>>8), byte(h1), + + byte(h2>>56), byte(h2>>48), byte(h2>>40), byte(h2>>32), + byte(h2>>24), byte(h2>>16), byte(h2>>8), byte(h2), + ) +} + +func (h *Hash128) bmix(p []byte) (tail []byte) { + h1, h2 := h.h1, h.h2 + + nblocks := len(p) / 16 + for i := 0; i < nblocks; i++ { + t := (*[2]uint64)(unsafe.Pointer(&p[i*16])) + k1, k2 := t[0], t[1] + + k1 *= c1_128 + k1 = (k1 << 31) | (k1 >> 33) // rotl64(k1, 31) + k1 *= c2_128 + h1 ^= k1 + + h1 = (h1 << 27) | (h1 >> 37) // rotl64(h1, 27) + h1 += h2 + h1 = h1*5 + 0x52dce729 + + k2 *= c2_128 + k2 = (k2 << 33) | (k2 >> 31) // rotl64(k2, 33) + k2 *= c1_128 + h2 ^= k2 + + h2 = (h2 << 31) | (h2 >> 33) // rotl64(h2, 31) + h2 += h1 + h2 = h2*5 + 0x38495ab5 + } + h.h1, h.h2 = h1, h2 + return p[nblocks*h.Size():] +} + +// Sum128 returns the 128-bit current hash. +func (h *Hash128) Sum128() (h1, h2 uint64) { + + h1, h2 = h.h1, h.h2 + + var k1, k2 uint64 + switch len(h.tail) & 15 { + case 15: + k2 ^= uint64(h.tail[14]) << 48 + fallthrough + case 14: + k2 ^= uint64(h.tail[13]) << 40 + fallthrough + case 13: + k2 ^= uint64(h.tail[12]) << 32 + fallthrough + case 12: + k2 ^= uint64(h.tail[11]) << 24 + fallthrough + case 11: + k2 ^= uint64(h.tail[10]) << 16 + fallthrough + case 10: + k2 ^= uint64(h.tail[9]) << 8 + fallthrough + case 9: + k2 ^= uint64(h.tail[8]) << 0 + + k2 *= c2_128 + k2 = (k2 << 33) | (k2 >> 31) // rotl64(k2, 33) + k2 *= c1_128 + h2 ^= k2 + + fallthrough + + case 8: + k1 ^= uint64(h.tail[7]) << 56 + fallthrough + case 7: + k1 ^= uint64(h.tail[6]) << 48 + fallthrough + case 6: + k1 ^= uint64(h.tail[5]) << 40 + fallthrough + case 5: + k1 ^= uint64(h.tail[4]) << 32 + fallthrough + case 4: + k1 ^= uint64(h.tail[3]) << 24 + fallthrough + case 3: + k1 ^= uint64(h.tail[2]) << 16 + fallthrough + case 2: + k1 ^= uint64(h.tail[1]) << 8 + fallthrough + case 1: + k1 ^= uint64(h.tail[0]) << 0 + k1 *= c1_128 + k1 = (k1 << 31) | (k1 >> 33) // rotl64(k1, 31) + k1 *= c2_128 + h1 ^= k1 + } + + h1 ^= uint64(h.clen) + h2 ^= uint64(h.clen) + + h1 += h2 + h2 += h1 + + h1 = fmix64(h1) + h2 = fmix64(h2) + + h1 += h2 + h2 += h1 + + return h1, h2 +} + +func fmix64(k uint64) uint64 { + k ^= k >> 33 + k *= 0xff51afd7ed558ccd + k ^= k >> 33 + k *= 0xc4ceb9fe1a85ec53 + k ^= k >> 33 + return k +} + +/* +func rotl64(x uint64, r byte) uint64 { + return (x << r) | (x >> (64 - r)) +} +*/ + +// Sum128 returns the MurmurHash3 sum of data. It is equivalent to the +// following sequence (without the extra burden and the extra allocation): +// hasher := New128() +// hasher.Write(data) +// return hasher.Sum128() +func Sum128(data []byte) (h1 uint64, h2 uint64) { + h := new(Hash128) + h.tail = h.bmix(data) + h.clen = len(data) + return h.Sum128() +} diff --git a/pkg/murmur3/murmur64.go b/pkg/murmur3/murmur64.go new file mode 100644 index 0000000000..0db482dccd --- /dev/null +++ b/pkg/murmur3/murmur64.go @@ -0,0 +1,60 @@ +package murmur3 + +// NOTE(edd): This code is adapted from Sébastien Paolacci's +// implementation of murmur3's 64-bit hash. It was adapted so that +// the direct murmur3.Hash64 implementation could be used directly +// instead of through the hash.Hash interface. + +import ( + "hash" +) + +// Make sure interfaces are correctly implemented. +var ( + _ hash.Hash = new(Hash64) + _ hash.Hash64 = new(Hash64) +) + +// Hash64 is half a Hash128. +type Hash64 Hash128 + +// BlockSize returns the hash's underlying block size. +func (h *Hash64) BlockSize() int { return 1 } + +// Reset resets the Hash to its initial state. +func (h *Hash64) Reset() { + (*Hash128)(h).Reset() +} + +// Size returns the size of the hash. +func (h *Hash64) Size() int { return 8 } + +// Write (via the embedded io.Writer interface) adds more data to the running +// hash. +func (h *Hash64) Write(p []byte) (n int, err error) { + return (*Hash128)(h).Write(p) +} + +// Sum appends the current hash to b and returns the resulting slice. +func (h *Hash64) Sum(b []byte) []byte { + h1 := h.Sum64() + return append(b, + byte(h1>>56), byte(h1>>48), byte(h1>>40), byte(h1>>32), + byte(h1>>24), byte(h1>>16), byte(h1>>8), byte(h1)) +} + +// Sum64 returns the 64-bit current hash. +func (h *Hash64) Sum64() uint64 { + h1, _ := (*Hash128)(h).Sum128() + return h1 +} + +// Sum64 returns the MurmurHash3 sum of data. It is equivalent to the +// following sequence (without the extra burden and the extra allocation): +// hasher := New128() +// hasher.Write(data) +// return hasher.Sum64() +func Sum64(data []byte) uint64 { + h1, _ := Sum128(data) + return h1 +} diff --git a/pkg/murmur3/murmur64_test.go b/pkg/murmur3/murmur64_test.go new file mode 100644 index 0000000000..1b751ee839 --- /dev/null +++ b/pkg/murmur3/murmur64_test.go @@ -0,0 +1,49 @@ +package murmur3_test + +import ( + "crypto/rand" + + "github.com/cespare/xxhash" + + // "github.com/OneOfOne/xxhash" + // "github.com/spaolacci/murmur3" + + "testing" +) + +func benchmark(b *testing.B, hashFn func([]byte) uint64, n int) { + // generate random data. + var buf = make([]byte, n) + if m, err := rand.Read(buf); err != nil { + b.Fatal(err) + } else if m != n { + b.Fatalf("only wrote %d bytes to buffer of size %d", m, n) + } + + b.ReportAllocs() + b.ResetTimer() + var bv uint64 + for i := 0; i < b.N; i++ { + bv += hashFn(buf) + } +} + +var ( + // hashFn = xxhash.Checksum64 + hashFn = xxhash.Sum64 + // hashFn = murmur3.Sum64 +) + +func BenchmarkHash_5(b *testing.B) { benchmark(b, hashFn, 5) } +func BenchmarkHash_10(b *testing.B) { benchmark(b, hashFn, 10) } +func BenchmarkHash_15(b *testing.B) { benchmark(b, hashFn, 15) } +func BenchmarkHash_20(b *testing.B) { benchmark(b, hashFn, 20) } +func BenchmarkHash_25(b *testing.B) { benchmark(b, hashFn, 25) } +func BenchmarkHash_30(b *testing.B) { benchmark(b, hashFn, 30) } +func BenchmarkHash_35(b *testing.B) { benchmark(b, hashFn, 35) } +func BenchmarkHash_40(b *testing.B) { benchmark(b, hashFn, 40) } +func BenchmarkHash_45(b *testing.B) { benchmark(b, hashFn, 45) } +func BenchmarkHash_50(b *testing.B) { benchmark(b, hashFn, 50) } +func BenchmarkHash_100(b *testing.B) { benchmark(b, hashFn, 100) } +func BenchmarkHash_250(b *testing.B) { benchmark(b, hashFn, 250) } +func BenchmarkHash_500(b *testing.B) { benchmark(b, hashFn, 500) } diff --git a/tsdb/engine.go b/tsdb/engine.go index 2833cd2b00..5bf9bff328 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/estimator" "go.uber.org/zap" ) @@ -43,7 +44,10 @@ type Engine interface { CreateSeries(measurment string, series *Series) (*Series, error) DeleteSeriesRange(keys []string, min, max int64) error Series(key string) (*Series, error) - SeriesCardinality() (n int64, err error) + + SeriesN() (uint64, error) + SeriesSketch() (estimator.Sketch, error) + MeasurementsSketch() (estimator.Sketch, error) CreateMeasurement(name string) (*Measurement, error) DeleteMeasurement(name string, seriesKeys []string) error diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index dd7149ac55..6d69331dba 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -20,6 +20,7 @@ import ( "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/estimator" "github.com/influxdata/influxdb/tsdb" "go.uber.org/zap" ) @@ -320,8 +321,16 @@ func (e *Engine) MeasurementFields(measurement string) *tsdb.MeasurementFields { return m } -func (e *Engine) SeriesCardinality() (int64, error) { - panic("TODO: edd") +func (e *Engine) SeriesN() (uint64, error) { + return e.index.SeriesN() +} + +func (e *Engine) SeriesSketch() (estimator.Sketch, error) { + return e.index.SeriesSketch() +} + +func (e *Engine) MeasurementsSketch() (estimator.Sketch, error) { + return e.index.MeasurementsSketch() } // EngineStatistics maintains statistics for the engine. @@ -460,6 +469,11 @@ func (e *Engine) WithLogger(log zap.Logger) { func (e *Engine) LoadMetadataIndex(shardID uint64, index *tsdb.DatabaseIndex) error { now := time.Now() + // Open the index if it's not already open. + if err := index.Open(); err != nil { + return err + } + // Save reference to index for iterator creation. e.index = index e.FileStore.dereferencer = index diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 736f609760..6d1ee5cb67 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -38,7 +38,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) { } // Load metadata index. - index := tsdb.NewDatabaseIndex("db") + index := MustNewDatabaseIndex("db") if err := e.LoadMetadataIndex(1, index); err != nil { t.Fatal(err) } @@ -64,7 +64,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) { } // Load metadata index. - index = tsdb.NewDatabaseIndex("db") + index = MustNewDatabaseIndex("db") if err := e.LoadMetadataIndex(1, index); err != nil { t.Fatal(err) } @@ -91,7 +91,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) { } // Load metadata index. - index = tsdb.NewDatabaseIndex("db") + index = MustNewDatabaseIndex("db") if err := e.LoadMetadataIndex(1, index); err != nil { t.Fatal(err) } @@ -571,7 +571,7 @@ func TestEngine_DeleteSeries(t *testing.T) { // Write those points to the engine. e := tsm1.NewEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine) - e.LoadMetadataIndex(1, tsdb.NewDatabaseIndex("db0")) // Initialise an index + e.LoadMetadataIndex(1, MustNewDatabaseIndex("db0")) // Initialise an index // mock the planner so compactions don't run during the test e.CompactionPlan = &mockPlanner{} @@ -964,7 +964,7 @@ func MustOpenEngine() *Engine { if err := e.Open(); err != nil { panic(err) } - if err := e.LoadMetadataIndex(1, tsdb.NewDatabaseIndex("db")); err != nil { + if err := e.LoadMetadataIndex(1, MustNewDatabaseIndex("db")); err != nil { panic(err) } return e @@ -1010,6 +1010,16 @@ func (e *Engine) MustMeasurement(name string) *tsdb.Measurement { return m } +// MustNewDatabaseIndex creates a tsdb.DatabaseIndex, panicking if there is an +// error doing do. +func MustNewDatabaseIndex(name string) *tsdb.DatabaseIndex { + index, err := tsdb.NewDatabaseIndex(name) + if err != nil { + panic(err) + } + return index +} + // WritePointsString parses a string buffer and writes the points. func (e *Engine) WritePointsString(buf ...string) error { return e.WritePoints(MustParsePointsString(strings.Join(buf, "\n"))) diff --git a/tsdb/index.go b/tsdb/index.go index d27e086a80..48646f09ba 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -5,6 +5,7 @@ import ( "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/estimator" ) type Index interface { @@ -23,7 +24,9 @@ type Index interface { Series(key string) (*Series, error) DropSeries(keys []string) error - SeriesN() (int64, error) + SeriesN() (uint64, error) + SeriesSketch() (estimator.Sketch, error) + MeasurementsSketch() (estimator.Sketch, error) Statistics(tags map[string]string) []models.Statistic TagsForSeries(key string) (models.Tags, error) diff --git a/tsdb/meta.go b/tsdb/meta.go index f25218ade6..f824ecce4d 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/escape" + "github.com/influxdata/influxdb/pkg/estimator" internal "github.com/influxdata/influxdb/tsdb/internal" "github.com/gogo/protobuf/proto" @@ -33,6 +34,9 @@ type DatabaseIndex struct { series map[string]*Series // map series key to the Series object lastID uint64 // last used series ID. They're in memory only for this shard + seriesSketch *estimator.HyperLogLogPlus + measurementsSketch *estimator.HyperLogLogPlus + name string // name of the database represented by this index stats *IndexStatistics @@ -40,18 +44,26 @@ type DatabaseIndex struct { } // NewDatabaseIndex returns a new initialized DatabaseIndex. -func NewDatabaseIndex(name string) *DatabaseIndex { - return &DatabaseIndex{ +func NewDatabaseIndex(name string) (index *DatabaseIndex, err error) { + index = &DatabaseIndex{ measurements: make(map[string]*Measurement), series: make(map[string]*Series), name: name, stats: &IndexStatistics{}, defaultTags: models.StatisticTags{"database": name}, } + if index.seriesSketch, err = estimator.NewHyperLogLogPlus(14); err != nil { + return nil, err + } + + if index.measurementsSketch, err = estimator.NewHyperLogLogPlus(14); err != nil { + return nil, err + } + return index, nil } -func (d *DatabaseIndex) Open() error { return nil } -func (d *DatabaseIndex) Close() error { return nil } +func (d *DatabaseIndex) Open() (err error) { return nil } +func (d *DatabaseIndex) Close() error { return nil } // IndexStatistics maintains statistics for the index. type IndexStatistics struct { @@ -79,11 +91,18 @@ func (d *DatabaseIndex) Series(key string) (*Series, error) { return s, nil } -// SeriesN returns the number of series. -func (d *DatabaseIndex) SeriesN() (int64, error) { +// SeriesN returns the exact number of series in the index. +func (d *DatabaseIndex) SeriesN() (uint64, error) { d.mu.RLock() defer d.mu.RUnlock() - return int64(len(d.series)), nil + return uint64(len(d.series)), nil +} + +// SeriesSketch returns the sketch for the series. +func (d *DatabaseIndex) SeriesSketch() (estimator.Sketch, error) { + d.mu.RLock() + defer d.mu.RUnlock() + return d.seriesSketch, nil } // Measurement returns the measurement object from the index by the name @@ -93,6 +112,13 @@ func (d *DatabaseIndex) Measurement(name string) (*Measurement, error) { return d.measurements[name], nil } +// MeasurementsSketch returns the sketch for the series. +func (d *DatabaseIndex) MeasurementsSketch() (estimator.Sketch, error) { + d.mu.RLock() + defer d.mu.RUnlock() + return d.measurementsSketch, nil +} + // MeasurementsByName returns a list of measurements. func (d *DatabaseIndex) MeasurementsByName(names []string) ([]*Measurement, error) { d.mu.RLock() @@ -142,7 +168,10 @@ func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, ser d.mu.RUnlock() // get or create the measurement index - m, _ := d.CreateMeasurementIndexIfNotExists(measurementName) + m, err := d.CreateMeasurementIndexIfNotExists(measurementName) + if err != nil { + return nil, err + } d.mu.Lock() // Check for the series again under a write lock @@ -161,6 +190,10 @@ func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, ser m.AddSeries(series) + // Add the series to the series sketch. + if err := d.seriesSketch.Add([]byte(series.Key)); err != nil { + return nil, err + } atomic.AddInt64(&d.stats.NumSeries, 1) d.mu.Unlock() @@ -191,6 +224,11 @@ func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) (*Measure if m == nil { m = NewMeasurement(name) d.measurements[name] = m + + // Add the measurement to the measurements sketch. + if err := d.measurementsSketch.Add([]byte(name)); err != nil { + return nil, err + } atomic.AddInt64(&d.stats.NumMeasurements, 1) } return m, nil diff --git a/tsdb/meta_test.go b/tsdb/meta_test.go index d3a33b8d96..09bfe5dfda 100644 --- a/tsdb/meta_test.go +++ b/tsdb/meta_test.go @@ -248,7 +248,11 @@ func BenchmarkCreateSeriesIndex_1M(b *testing.B) { func benchmarkCreateSeriesIndex(b *testing.B, series []*TestSeries) { idxs := make([]*tsdb.DatabaseIndex, 0, b.N) for i := 0; i < b.N; i++ { - idxs = append(idxs, tsdb.NewDatabaseIndex(fmt.Sprintf("db%d", i))) + index, err := tsdb.NewDatabaseIndex(fmt.Sprintf("db%d", i)) + if err != nil { + b.Fatal(err) + } + idxs = append(idxs, index) } b.ResetTimer() diff --git a/tsdb/shard.go b/tsdb/shard.go index 19141e3b53..f8cfe86012 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -201,7 +201,12 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic { return nil } - seriesN, _ := s.engine.SeriesCardinality() + seriesN, err := s.engine.SeriesN() + if err != nil { + s.logger.Print(err) + seriesN = 0 + } + tags = s.defaultTags.Merge(tags) statistics := []models.Statistic{{ Name: "shard", @@ -258,17 +263,23 @@ func (s *Shard) Open() error { // Load metadata index. start := time.Now() - if err := e.LoadMetadataIndex(s.id, NewDatabaseIndex(s.database)); err != nil { + index, err := NewDatabaseIndex(s.database) + if err != nil { + return err + } + + if err := e.LoadMetadataIndex(s.id, index); err != nil { return err } s.engine = e - count, err := s.engine.SeriesCardinality() + seriesN, err := s.engine.SeriesN() if err != nil { return err } - atomic.AddInt64(&s.stats.SeriesCreated, int64(count)) + // Store statistic of exact number of series in shard. + atomic.AddInt64(&s.stats.SeriesCreated, int64(seriesN)) s.logger.Info(fmt.Sprintf("%s database index loaded in %s", s.path, time.Now().Sub(start))) @@ -566,15 +577,15 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point, } if ss == nil { - cnt, err := s.engine.SeriesN() + sn, err := s.engine.SeriesN() if err != nil { return nil, nil, err } - if s.options.Config.MaxSeriesPerDatabase > 0 && cnt+1 > int64(s.options.Config.MaxSeriesPerDatabase) { + if s.options.Config.MaxSeriesPerDatabase > 0 && sn+1 > int64(s.options.Config.MaxSeriesPerDatabase) { atomic.AddInt64(&s.stats.WritePointsDropped, 1) dropped++ - reason = fmt.Sprintf("db %s max series limit reached: (%d/%d)", s.database, cnt, s.options.Config.MaxSeriesPerDatabase) + reason = fmt.Sprintf("db %s max series limit reached: (%d/%d)", s.database, sn, s.options.Config.MaxSeriesPerDatabase) continue } @@ -667,12 +678,12 @@ func (s *Shard) MeasurementsByExpr(cond influxql.Expr) (Measurements, bool, erro return s.engine.MeasurementsByExpr(cond) } -// SeriesCardinality returns the number of series buckets on the shard. -func (s *Shard) SeriesCardinality() (int64, error) { +// SeriesN returns the exact number of series in the shard. +func (s *Shard) SeriesN() (uint64, error) { if err := s.ready(); err != nil { return 0, err } - return s.engine.SeriesCardinality() + return s.engine.SeriesN() } // Series returns a series by key. diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index d75a86e733..453e44e442 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -65,12 +65,12 @@ func TestShardWriteAndIndex(t *testing.T) { } validateIndex := func() { - cnt, err := sh.SeriesCardinality() + cnt, err := sh.SeriesN() if err != nil { t.Fatal(err) } - if got, exp := cnt, int64(1); got != exp { + if got, exp := cnt, uint64(1); got != exp { t.Fatalf("got %v series, exp %v series in index", got, exp) } @@ -346,11 +346,11 @@ func TestShardWriteAddNewField(t *testing.T) { t.Fatalf(err.Error()) } - cnt, err := sh.SeriesCardinality() + cnt, err := sh.SeriesN() if err != nil { t.Fatal(err) } - if got, exp := cnt, int64(1); got != exp { + if got, exp := cnt, uint64(1); got != exp { t.Fatalf("got %d series, exp %d series in index", got, exp) } @@ -470,11 +470,11 @@ func TestShard_Close_RemoveIndex(t *testing.T) { t.Fatalf(err.Error()) } - cnt, err := sh.SeriesCardinality() + cnt, err := sh.SeriesN() if err != nil { t.Fatal(err) } - if got, exp := cnt, int64(1); got != exp { + if got, exp := cnt, uint64(1); got != exp { t.Fatalf("got %d series, exp %d series in index", got, exp) } @@ -482,10 +482,10 @@ func TestShard_Close_RemoveIndex(t *testing.T) { sh.Close() sh.Open() - if cnt, err = sh.SeriesCardinality(); err != nil { + if cnt, err = sh.SeriesN(); err != nil { t.Fatal(err) } - if got, exp := cnt, int64(1); got != exp { + if got, exp := cnt, uint64(1); got != exp { t.Fatalf("got %d series, exp %d series in index", got, exp) } diff --git a/tsdb/store.go b/tsdb/store.go index 9de5022015..0fdd494974 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -16,6 +16,7 @@ import ( "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/estimator" "github.com/influxdata/influxdb/pkg/limiter" "go.uber.org/zap" ) @@ -554,7 +555,31 @@ func (s *Store) DiskSize() (int64, error) { // SeriesCardinality returns the series cardinality for the provided database. func (s *Store) SeriesCardinality(database string) (int64, error) { - panic("TODO: edd") + s.mu.RLock() + shards := s.filterShards(byDatabase(database)) + s.mu.RUnlock() + + var sketch estimator.Sketch + // Iterate over all shards for the database and combine all of the series + // sketches. + for _, shard := range shards { + other, err := shard.engine.SeriesSketch() + if err != nil { + return 0, err + } + + if sketch == nil { + sketch = other + } else if err = sketch.Merge(other); err != nil { + return 0, err + } + } + + if sketch != nil { + cnt, err := sketch.Count() + return int64(cnt), err + } + return 0, nil } // MeasurementsCardinality returns the measurement cardinality for the provided