Wire in HLL estimator

pull/7913/head
Edd Robinson 2016-09-21 16:04:37 +01:00 committed by Ben Johnson
parent 2b8efefef4
commit d19fbf5ab4
No known key found for this signature in database
GPG Key ID: 81741CD251883081
13 changed files with 567 additions and 37 deletions

88
pkg/estimator/hll.go Normal file
View File

@ -0,0 +1,88 @@
package estimator
import (
"fmt"
"io"
"github.com/clarkduvall/hyperloglog"
"github.com/influxdata/influxdb/pkg/murmur3"
)
// Sketch is the interface representing a sketch for estimating cardinality.
type Sketch interface {
// Add adds a single value to the sketch.
Add(v []byte) error
// Count returns a cardinality estimate for the sketch.
Count() (count uint64, err error)
// ReadFrom implements the io.ReaderAt interface.
//
// Implementations of the ReadFrom method should ensure that values are
// streamed from the provided reader into the Sketch.
ReadFrom(r io.Reader) (n int64, err error)
// Merge merges another sketch into this one.
Merge(s Sketch) error
}
type HyperLogLogPlus struct {
hll *hyperloglog.HyperLogLogPlus
}
func NewHyperLogLogPlus(precision uint8) (*HyperLogLogPlus, error) {
hll, err := hyperloglog.NewPlus(precision)
if err != nil {
return nil, err
}
return &HyperLogLogPlus{hll: hll}, nil
}
// hash64 implements the hyperloglog.Hash64 interface.
// See: https://godoc.org/github.com/clarkduvall/hyperloglog#Hash64
type hash64 uint64
func (h hash64) Sum64() uint64 {
return uint64(h)
}
func (s *HyperLogLogPlus) Add(v []byte) error {
hash := hash64(murmur3.Sum64(v))
s.hll.Add(hash)
return nil
}
func (s *HyperLogLogPlus) Count() (count uint64, err error) { return s.hll.Count(), nil }
func (s *HyperLogLogPlus) ReadFrom(r io.Reader) (n int64, err error) {
var (
m int
buf [4]byte
)
for err == nil {
if m, err = r.Read(buf[:]); err == nil {
if m < len(buf) {
err = fmt.Errorf("short read. Only read %d bytes", m)
} else {
n += int64(m)
err = s.Add(buf[:])
}
}
}
if err != io.EOF {
return 0, err
}
return n, nil
}
func (s *HyperLogLogPlus) Merge(sketch Sketch) error {
other, ok := sketch.(*HyperLogLogPlus)
if !ok {
return fmt.Errorf("sketch is of type %T", sketch)
}
return s.Merge(other)
}

224
pkg/murmur3/murmur128.go Normal file
View File

@ -0,0 +1,224 @@
package murmur3
// NOTE(edd): This code is adapted from Sébastien Paolacci's
// implementation of murmur3's 128-bit hash. It was adapted so that
// the direct murmur3.Hash128 implementation could be used directly
// instead of through the hash.Hash interface.
import (
"hash"
"unsafe"
)
const (
c1_128 = 0x87c37b91114253d5
c2_128 = 0x4cf5ad432745937f
)
// Make sure interfaces are correctly implemented.
var (
_ hash.Hash = new(Hash128)
)
// Hash128 represents a partial evaluation of a 128-bit hash.
//
// Hash128 should also be used when 64-bit hashes are required. In such a case,
// the first part of the running hash may be used.
type Hash128 struct {
clen int // Digested input cumulative length.
tail []byte // 0 to Size()-1 bytes view of `buf'.
buf [16]byte // Expected (but not required) to be Size() large.
h1 uint64 // Unfinalized running hash part 1.
h2 uint64 // Unfinalized running hash part 2.
}
// BlockSize returns the hash's underlying block size.
func (h *Hash128) BlockSize() int { return 1 }
// Write (via the embedded io.Writer interface) adds more data to the running
// hash.
func (h *Hash128) Write(p []byte) (n int, err error) {
n = len(p)
h.clen += n
if len(h.tail) > 0 {
// Stick back pending bytes.
nfree := h.Size() - len(h.tail) // nfree ∈ [1, h.Size()-1].
if nfree < len(p) {
// One full block can be formeh.
block := append(h.tail, p[:nfree]...)
p = p[nfree:]
_ = h.bmix(block) // No tail.
} else {
// Tail's buf is large enough to prevent reallocs.
p = append(h.tail, p...)
}
}
h.tail = h.bmix(p)
// Keep own copy of the 0 to Size()-1 pending bytes.
nn := copy(h.buf[:], h.tail)
h.tail = h.buf[:nn]
return n, nil
}
// Reset resets the Hash to its initial state.
func (h *Hash128) Reset() {
h.clen = 0
h.tail = nil
h.h1, h.h2 = 0, 0
}
// Size returns the size of the hash.
func (h *Hash128) Size() int { return 16 }
// Sum appends the current hash to b and returns the resulting slice.
func (h *Hash128) Sum(b []byte) []byte {
h1, h2 := h.Sum128()
return append(b,
byte(h1>>56), byte(h1>>48), byte(h1>>40), byte(h1>>32),
byte(h1>>24), byte(h1>>16), byte(h1>>8), byte(h1),
byte(h2>>56), byte(h2>>48), byte(h2>>40), byte(h2>>32),
byte(h2>>24), byte(h2>>16), byte(h2>>8), byte(h2),
)
}
func (h *Hash128) bmix(p []byte) (tail []byte) {
h1, h2 := h.h1, h.h2
nblocks := len(p) / 16
for i := 0; i < nblocks; i++ {
t := (*[2]uint64)(unsafe.Pointer(&p[i*16]))
k1, k2 := t[0], t[1]
k1 *= c1_128
k1 = (k1 << 31) | (k1 >> 33) // rotl64(k1, 31)
k1 *= c2_128
h1 ^= k1
h1 = (h1 << 27) | (h1 >> 37) // rotl64(h1, 27)
h1 += h2
h1 = h1*5 + 0x52dce729
k2 *= c2_128
k2 = (k2 << 33) | (k2 >> 31) // rotl64(k2, 33)
k2 *= c1_128
h2 ^= k2
h2 = (h2 << 31) | (h2 >> 33) // rotl64(h2, 31)
h2 += h1
h2 = h2*5 + 0x38495ab5
}
h.h1, h.h2 = h1, h2
return p[nblocks*h.Size():]
}
// Sum128 returns the 128-bit current hash.
func (h *Hash128) Sum128() (h1, h2 uint64) {
h1, h2 = h.h1, h.h2
var k1, k2 uint64
switch len(h.tail) & 15 {
case 15:
k2 ^= uint64(h.tail[14]) << 48
fallthrough
case 14:
k2 ^= uint64(h.tail[13]) << 40
fallthrough
case 13:
k2 ^= uint64(h.tail[12]) << 32
fallthrough
case 12:
k2 ^= uint64(h.tail[11]) << 24
fallthrough
case 11:
k2 ^= uint64(h.tail[10]) << 16
fallthrough
case 10:
k2 ^= uint64(h.tail[9]) << 8
fallthrough
case 9:
k2 ^= uint64(h.tail[8]) << 0
k2 *= c2_128
k2 = (k2 << 33) | (k2 >> 31) // rotl64(k2, 33)
k2 *= c1_128
h2 ^= k2
fallthrough
case 8:
k1 ^= uint64(h.tail[7]) << 56
fallthrough
case 7:
k1 ^= uint64(h.tail[6]) << 48
fallthrough
case 6:
k1 ^= uint64(h.tail[5]) << 40
fallthrough
case 5:
k1 ^= uint64(h.tail[4]) << 32
fallthrough
case 4:
k1 ^= uint64(h.tail[3]) << 24
fallthrough
case 3:
k1 ^= uint64(h.tail[2]) << 16
fallthrough
case 2:
k1 ^= uint64(h.tail[1]) << 8
fallthrough
case 1:
k1 ^= uint64(h.tail[0]) << 0
k1 *= c1_128
k1 = (k1 << 31) | (k1 >> 33) // rotl64(k1, 31)
k1 *= c2_128
h1 ^= k1
}
h1 ^= uint64(h.clen)
h2 ^= uint64(h.clen)
h1 += h2
h2 += h1
h1 = fmix64(h1)
h2 = fmix64(h2)
h1 += h2
h2 += h1
return h1, h2
}
func fmix64(k uint64) uint64 {
k ^= k >> 33
k *= 0xff51afd7ed558ccd
k ^= k >> 33
k *= 0xc4ceb9fe1a85ec53
k ^= k >> 33
return k
}
/*
func rotl64(x uint64, r byte) uint64 {
return (x << r) | (x >> (64 - r))
}
*/
// Sum128 returns the MurmurHash3 sum of data. It is equivalent to the
// following sequence (without the extra burden and the extra allocation):
// hasher := New128()
// hasher.Write(data)
// return hasher.Sum128()
func Sum128(data []byte) (h1 uint64, h2 uint64) {
h := new(Hash128)
h.tail = h.bmix(data)
h.clen = len(data)
return h.Sum128()
}

60
pkg/murmur3/murmur64.go Normal file
View File

@ -0,0 +1,60 @@
package murmur3
// NOTE(edd): This code is adapted from Sébastien Paolacci's
// implementation of murmur3's 64-bit hash. It was adapted so that
// the direct murmur3.Hash64 implementation could be used directly
// instead of through the hash.Hash interface.
import (
"hash"
)
// Make sure interfaces are correctly implemented.
var (
_ hash.Hash = new(Hash64)
_ hash.Hash64 = new(Hash64)
)
// Hash64 is half a Hash128.
type Hash64 Hash128
// BlockSize returns the hash's underlying block size.
func (h *Hash64) BlockSize() int { return 1 }
// Reset resets the Hash to its initial state.
func (h *Hash64) Reset() {
(*Hash128)(h).Reset()
}
// Size returns the size of the hash.
func (h *Hash64) Size() int { return 8 }
// Write (via the embedded io.Writer interface) adds more data to the running
// hash.
func (h *Hash64) Write(p []byte) (n int, err error) {
return (*Hash128)(h).Write(p)
}
// Sum appends the current hash to b and returns the resulting slice.
func (h *Hash64) Sum(b []byte) []byte {
h1 := h.Sum64()
return append(b,
byte(h1>>56), byte(h1>>48), byte(h1>>40), byte(h1>>32),
byte(h1>>24), byte(h1>>16), byte(h1>>8), byte(h1))
}
// Sum64 returns the 64-bit current hash.
func (h *Hash64) Sum64() uint64 {
h1, _ := (*Hash128)(h).Sum128()
return h1
}
// Sum64 returns the MurmurHash3 sum of data. It is equivalent to the
// following sequence (without the extra burden and the extra allocation):
// hasher := New128()
// hasher.Write(data)
// return hasher.Sum64()
func Sum64(data []byte) uint64 {
h1, _ := Sum128(data)
return h1
}

View File

@ -0,0 +1,49 @@
package murmur3_test
import (
"crypto/rand"
"github.com/cespare/xxhash"
// "github.com/OneOfOne/xxhash"
// "github.com/spaolacci/murmur3"
"testing"
)
func benchmark(b *testing.B, hashFn func([]byte) uint64, n int) {
// generate random data.
var buf = make([]byte, n)
if m, err := rand.Read(buf); err != nil {
b.Fatal(err)
} else if m != n {
b.Fatalf("only wrote %d bytes to buffer of size %d", m, n)
}
b.ReportAllocs()
b.ResetTimer()
var bv uint64
for i := 0; i < b.N; i++ {
bv += hashFn(buf)
}
}
var (
// hashFn = xxhash.Checksum64
hashFn = xxhash.Sum64
// hashFn = murmur3.Sum64
)
func BenchmarkHash_5(b *testing.B) { benchmark(b, hashFn, 5) }
func BenchmarkHash_10(b *testing.B) { benchmark(b, hashFn, 10) }
func BenchmarkHash_15(b *testing.B) { benchmark(b, hashFn, 15) }
func BenchmarkHash_20(b *testing.B) { benchmark(b, hashFn, 20) }
func BenchmarkHash_25(b *testing.B) { benchmark(b, hashFn, 25) }
func BenchmarkHash_30(b *testing.B) { benchmark(b, hashFn, 30) }
func BenchmarkHash_35(b *testing.B) { benchmark(b, hashFn, 35) }
func BenchmarkHash_40(b *testing.B) { benchmark(b, hashFn, 40) }
func BenchmarkHash_45(b *testing.B) { benchmark(b, hashFn, 45) }
func BenchmarkHash_50(b *testing.B) { benchmark(b, hashFn, 50) }
func BenchmarkHash_100(b *testing.B) { benchmark(b, hashFn, 100) }
func BenchmarkHash_250(b *testing.B) { benchmark(b, hashFn, 250) }
func BenchmarkHash_500(b *testing.B) { benchmark(b, hashFn, 500) }

View File

@ -11,6 +11,7 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/estimator"
"go.uber.org/zap"
)
@ -43,7 +44,10 @@ type Engine interface {
CreateSeries(measurment string, series *Series) (*Series, error)
DeleteSeriesRange(keys []string, min, max int64) error
Series(key string) (*Series, error)
SeriesCardinality() (n int64, err error)
SeriesN() (uint64, error)
SeriesSketch() (estimator.Sketch, error)
MeasurementsSketch() (estimator.Sketch, error)
CreateMeasurement(name string) (*Measurement, error)
DeleteMeasurement(name string, seriesKeys []string) error

View File

@ -20,6 +20,7 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/tsdb"
"go.uber.org/zap"
)
@ -320,8 +321,16 @@ func (e *Engine) MeasurementFields(measurement string) *tsdb.MeasurementFields {
return m
}
func (e *Engine) SeriesCardinality() (int64, error) {
panic("TODO: edd")
func (e *Engine) SeriesN() (uint64, error) {
return e.index.SeriesN()
}
func (e *Engine) SeriesSketch() (estimator.Sketch, error) {
return e.index.SeriesSketch()
}
func (e *Engine) MeasurementsSketch() (estimator.Sketch, error) {
return e.index.MeasurementsSketch()
}
// EngineStatistics maintains statistics for the engine.
@ -460,6 +469,11 @@ func (e *Engine) WithLogger(log zap.Logger) {
func (e *Engine) LoadMetadataIndex(shardID uint64, index *tsdb.DatabaseIndex) error {
now := time.Now()
// Open the index if it's not already open.
if err := index.Open(); err != nil {
return err
}
// Save reference to index for iterator creation.
e.index = index
e.FileStore.dereferencer = index

View File

@ -38,7 +38,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
}
// Load metadata index.
index := tsdb.NewDatabaseIndex("db")
index := MustNewDatabaseIndex("db")
if err := e.LoadMetadataIndex(1, index); err != nil {
t.Fatal(err)
}
@ -64,7 +64,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
}
// Load metadata index.
index = tsdb.NewDatabaseIndex("db")
index = MustNewDatabaseIndex("db")
if err := e.LoadMetadataIndex(1, index); err != nil {
t.Fatal(err)
}
@ -91,7 +91,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
}
// Load metadata index.
index = tsdb.NewDatabaseIndex("db")
index = MustNewDatabaseIndex("db")
if err := e.LoadMetadataIndex(1, index); err != nil {
t.Fatal(err)
}
@ -571,7 +571,7 @@ func TestEngine_DeleteSeries(t *testing.T) {
// Write those points to the engine.
e := tsm1.NewEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)
e.LoadMetadataIndex(1, tsdb.NewDatabaseIndex("db0")) // Initialise an index
e.LoadMetadataIndex(1, MustNewDatabaseIndex("db0")) // Initialise an index
// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
@ -964,7 +964,7 @@ func MustOpenEngine() *Engine {
if err := e.Open(); err != nil {
panic(err)
}
if err := e.LoadMetadataIndex(1, tsdb.NewDatabaseIndex("db")); err != nil {
if err := e.LoadMetadataIndex(1, MustNewDatabaseIndex("db")); err != nil {
panic(err)
}
return e
@ -1010,6 +1010,16 @@ func (e *Engine) MustMeasurement(name string) *tsdb.Measurement {
return m
}
// MustNewDatabaseIndex creates a tsdb.DatabaseIndex, panicking if there is an
// error doing do.
func MustNewDatabaseIndex(name string) *tsdb.DatabaseIndex {
index, err := tsdb.NewDatabaseIndex(name)
if err != nil {
panic(err)
}
return index
}
// WritePointsString parses a string buffer and writes the points.
func (e *Engine) WritePointsString(buf ...string) error {
return e.WritePoints(MustParsePointsString(strings.Join(buf, "\n")))

View File

@ -5,6 +5,7 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/estimator"
)
type Index interface {
@ -23,7 +24,9 @@ type Index interface {
Series(key string) (*Series, error)
DropSeries(keys []string) error
SeriesN() (int64, error)
SeriesN() (uint64, error)
SeriesSketch() (estimator.Sketch, error)
MeasurementsSketch() (estimator.Sketch, error)
Statistics(tags map[string]string) []models.Statistic
TagsForSeries(key string) (models.Tags, error)

View File

@ -12,6 +12,7 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/escape"
"github.com/influxdata/influxdb/pkg/estimator"
internal "github.com/influxdata/influxdb/tsdb/internal"
"github.com/gogo/protobuf/proto"
@ -33,6 +34,9 @@ type DatabaseIndex struct {
series map[string]*Series // map series key to the Series object
lastID uint64 // last used series ID. They're in memory only for this shard
seriesSketch *estimator.HyperLogLogPlus
measurementsSketch *estimator.HyperLogLogPlus
name string // name of the database represented by this index
stats *IndexStatistics
@ -40,18 +44,26 @@ type DatabaseIndex struct {
}
// NewDatabaseIndex returns a new initialized DatabaseIndex.
func NewDatabaseIndex(name string) *DatabaseIndex {
return &DatabaseIndex{
func NewDatabaseIndex(name string) (index *DatabaseIndex, err error) {
index = &DatabaseIndex{
measurements: make(map[string]*Measurement),
series: make(map[string]*Series),
name: name,
stats: &IndexStatistics{},
defaultTags: models.StatisticTags{"database": name},
}
if index.seriesSketch, err = estimator.NewHyperLogLogPlus(14); err != nil {
return nil, err
}
if index.measurementsSketch, err = estimator.NewHyperLogLogPlus(14); err != nil {
return nil, err
}
return index, nil
}
func (d *DatabaseIndex) Open() error { return nil }
func (d *DatabaseIndex) Close() error { return nil }
func (d *DatabaseIndex) Open() (err error) { return nil }
func (d *DatabaseIndex) Close() error { return nil }
// IndexStatistics maintains statistics for the index.
type IndexStatistics struct {
@ -79,11 +91,18 @@ func (d *DatabaseIndex) Series(key string) (*Series, error) {
return s, nil
}
// SeriesN returns the number of series.
func (d *DatabaseIndex) SeriesN() (int64, error) {
// SeriesN returns the exact number of series in the index.
func (d *DatabaseIndex) SeriesN() (uint64, error) {
d.mu.RLock()
defer d.mu.RUnlock()
return int64(len(d.series)), nil
return uint64(len(d.series)), nil
}
// SeriesSketch returns the sketch for the series.
func (d *DatabaseIndex) SeriesSketch() (estimator.Sketch, error) {
d.mu.RLock()
defer d.mu.RUnlock()
return d.seriesSketch, nil
}
// Measurement returns the measurement object from the index by the name
@ -93,6 +112,13 @@ func (d *DatabaseIndex) Measurement(name string) (*Measurement, error) {
return d.measurements[name], nil
}
// MeasurementsSketch returns the sketch for the series.
func (d *DatabaseIndex) MeasurementsSketch() (estimator.Sketch, error) {
d.mu.RLock()
defer d.mu.RUnlock()
return d.measurementsSketch, nil
}
// MeasurementsByName returns a list of measurements.
func (d *DatabaseIndex) MeasurementsByName(names []string) ([]*Measurement, error) {
d.mu.RLock()
@ -142,7 +168,10 @@ func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, ser
d.mu.RUnlock()
// get or create the measurement index
m, _ := d.CreateMeasurementIndexIfNotExists(measurementName)
m, err := d.CreateMeasurementIndexIfNotExists(measurementName)
if err != nil {
return nil, err
}
d.mu.Lock()
// Check for the series again under a write lock
@ -161,6 +190,10 @@ func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, ser
m.AddSeries(series)
// Add the series to the series sketch.
if err := d.seriesSketch.Add([]byte(series.Key)); err != nil {
return nil, err
}
atomic.AddInt64(&d.stats.NumSeries, 1)
d.mu.Unlock()
@ -191,6 +224,11 @@ func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) (*Measure
if m == nil {
m = NewMeasurement(name)
d.measurements[name] = m
// Add the measurement to the measurements sketch.
if err := d.measurementsSketch.Add([]byte(name)); err != nil {
return nil, err
}
atomic.AddInt64(&d.stats.NumMeasurements, 1)
}
return m, nil

View File

@ -248,7 +248,11 @@ func BenchmarkCreateSeriesIndex_1M(b *testing.B) {
func benchmarkCreateSeriesIndex(b *testing.B, series []*TestSeries) {
idxs := make([]*tsdb.DatabaseIndex, 0, b.N)
for i := 0; i < b.N; i++ {
idxs = append(idxs, tsdb.NewDatabaseIndex(fmt.Sprintf("db%d", i)))
index, err := tsdb.NewDatabaseIndex(fmt.Sprintf("db%d", i))
if err != nil {
b.Fatal(err)
}
idxs = append(idxs, index)
}
b.ResetTimer()

View File

@ -201,7 +201,12 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
return nil
}
seriesN, _ := s.engine.SeriesCardinality()
seriesN, err := s.engine.SeriesN()
if err != nil {
s.logger.Print(err)
seriesN = 0
}
tags = s.defaultTags.Merge(tags)
statistics := []models.Statistic{{
Name: "shard",
@ -258,17 +263,23 @@ func (s *Shard) Open() error {
// Load metadata index.
start := time.Now()
if err := e.LoadMetadataIndex(s.id, NewDatabaseIndex(s.database)); err != nil {
index, err := NewDatabaseIndex(s.database)
if err != nil {
return err
}
if err := e.LoadMetadataIndex(s.id, index); err != nil {
return err
}
s.engine = e
count, err := s.engine.SeriesCardinality()
seriesN, err := s.engine.SeriesN()
if err != nil {
return err
}
atomic.AddInt64(&s.stats.SeriesCreated, int64(count))
// Store statistic of exact number of series in shard.
atomic.AddInt64(&s.stats.SeriesCreated, int64(seriesN))
s.logger.Info(fmt.Sprintf("%s database index loaded in %s", s.path, time.Now().Sub(start)))
@ -566,15 +577,15 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
}
if ss == nil {
cnt, err := s.engine.SeriesN()
sn, err := s.engine.SeriesN()
if err != nil {
return nil, nil, err
}
if s.options.Config.MaxSeriesPerDatabase > 0 && cnt+1 > int64(s.options.Config.MaxSeriesPerDatabase) {
if s.options.Config.MaxSeriesPerDatabase > 0 && sn+1 > int64(s.options.Config.MaxSeriesPerDatabase) {
atomic.AddInt64(&s.stats.WritePointsDropped, 1)
dropped++
reason = fmt.Sprintf("db %s max series limit reached: (%d/%d)", s.database, cnt, s.options.Config.MaxSeriesPerDatabase)
reason = fmt.Sprintf("db %s max series limit reached: (%d/%d)", s.database, sn, s.options.Config.MaxSeriesPerDatabase)
continue
}
@ -667,12 +678,12 @@ func (s *Shard) MeasurementsByExpr(cond influxql.Expr) (Measurements, bool, erro
return s.engine.MeasurementsByExpr(cond)
}
// SeriesCardinality returns the number of series buckets on the shard.
func (s *Shard) SeriesCardinality() (int64, error) {
// SeriesN returns the exact number of series in the shard.
func (s *Shard) SeriesN() (uint64, error) {
if err := s.ready(); err != nil {
return 0, err
}
return s.engine.SeriesCardinality()
return s.engine.SeriesN()
}
// Series returns a series by key.

View File

@ -65,12 +65,12 @@ func TestShardWriteAndIndex(t *testing.T) {
}
validateIndex := func() {
cnt, err := sh.SeriesCardinality()
cnt, err := sh.SeriesN()
if err != nil {
t.Fatal(err)
}
if got, exp := cnt, int64(1); got != exp {
if got, exp := cnt, uint64(1); got != exp {
t.Fatalf("got %v series, exp %v series in index", got, exp)
}
@ -346,11 +346,11 @@ func TestShardWriteAddNewField(t *testing.T) {
t.Fatalf(err.Error())
}
cnt, err := sh.SeriesCardinality()
cnt, err := sh.SeriesN()
if err != nil {
t.Fatal(err)
}
if got, exp := cnt, int64(1); got != exp {
if got, exp := cnt, uint64(1); got != exp {
t.Fatalf("got %d series, exp %d series in index", got, exp)
}
@ -470,11 +470,11 @@ func TestShard_Close_RemoveIndex(t *testing.T) {
t.Fatalf(err.Error())
}
cnt, err := sh.SeriesCardinality()
cnt, err := sh.SeriesN()
if err != nil {
t.Fatal(err)
}
if got, exp := cnt, int64(1); got != exp {
if got, exp := cnt, uint64(1); got != exp {
t.Fatalf("got %d series, exp %d series in index", got, exp)
}
@ -482,10 +482,10 @@ func TestShard_Close_RemoveIndex(t *testing.T) {
sh.Close()
sh.Open()
if cnt, err = sh.SeriesCardinality(); err != nil {
if cnt, err = sh.SeriesN(); err != nil {
t.Fatal(err)
}
if got, exp := cnt, int64(1); got != exp {
if got, exp := cnt, uint64(1); got != exp {
t.Fatalf("got %d series, exp %d series in index", got, exp)
}

View File

@ -16,6 +16,7 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/pkg/limiter"
"go.uber.org/zap"
)
@ -554,7 +555,31 @@ func (s *Store) DiskSize() (int64, error) {
// SeriesCardinality returns the series cardinality for the provided database.
func (s *Store) SeriesCardinality(database string) (int64, error) {
panic("TODO: edd")
s.mu.RLock()
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()
var sketch estimator.Sketch
// Iterate over all shards for the database and combine all of the series
// sketches.
for _, shard := range shards {
other, err := shard.engine.SeriesSketch()
if err != nil {
return 0, err
}
if sketch == nil {
sketch = other
} else if err = sketch.Merge(other); err != nil {
return 0, err
}
}
if sketch != nil {
cnt, err := sketch.Count()
return int64(cnt), err
}
return 0, nil
}
// MeasurementsCardinality returns the measurement cardinality for the provided