Merge pull request #9412 from influxdata/er-series-sketches

Sketches, Cont'd
Ben Johnson 2018-02-08 11:41:39 -07:00 committed by GitHub
commit 0af6fb8e10
20 changed files with 672 additions and 93 deletions

View File

@ -172,7 +172,7 @@ func (cmd *Command) processShard(sfile *tsdb.SeriesFile, dbName, rpName string,
}
// Open TSI index in temporary path.
tsiIndex := tsi1.NewIndex(sfile, tsi1.WithPath(tmpPath))
tsiIndex := tsi1.NewIndex(sfile, dbName, tsi1.WithPath(tmpPath))
tsiIndex.WithLogger(cmd.Logger)
cmd.Logger.Info("opening tsi index in temporary location", zap.String("path", tmpPath))
if err := tsiIndex.Open(); err != nil {

View File

@ -174,6 +174,7 @@ func (cmd *Command) readFileSet(sfile *tsdb.SeriesFile) (*tsi1.Index, *tsi1.File
return nil, nil, err
} else if fi.IsDir() {
idx := tsi1.NewIndex(sfile,
"",
tsi1.WithPath(cmd.paths[0]),
tsi1.DisableCompactions(),
)

View File

@ -163,6 +163,10 @@ func (h *Plus) Add(v []byte) {
// Count returns a cardinality estimate.
func (h *Plus) Count() uint64 {
if h == nil {
return 0 // Nothing to do.
}
if h.sparse {
h.mergeSparse()
return uint64(h.linearCount(h.mp, h.mp-uint32(h.sparseList.count)))
@ -275,13 +279,16 @@ func (h *Plus) MarshalBinary() (data []byte, err error) {
// UnmarshalBinary implements the encoding.BinaryUnmarshaler interface.
func (h *Plus) UnmarshalBinary(data []byte) error {
if len(data) < 12 {
return fmt.Errorf("provided buffer %v too short for initializing HLL sketch", data)
}
// Unmarshal version. We may need this in the future if we make
// incompatible changes.
_ = data[0]
// Unmarshal precision.
p := uint8(data[1])
newh, err := NewPlus(p)
if err != nil {
return err
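
For orientation, here is a minimal end-to-end sketch of the Plus API touched above: Add, Count, and the binary round trip that UnmarshalBinary now guards with the length check. It assumes only the identifiers visible in these hunks (hll.NewDefaultPlus, Add, Count, MarshalBinary, UnmarshalBinary); the rest is illustrative.

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/pkg/estimator/hll"
)

func main() {
	// Build a sketch and add some series keys.
	s := hll.NewDefaultPlus()
	for i := 0; i < 1000; i++ {
		s.Add([]byte(fmt.Sprintf("cpu,host=server-%d", i)))
	}
	fmt.Println("estimate:", s.Count()) // ~1000, within a few percent

	// Round-trip through the encoding that UnmarshalBinary validates above.
	data, err := s.MarshalBinary()
	if err != nil {
		panic(err)
	}
	s2 := hll.NewDefaultPlus()
	if err := s2.UnmarshalBinary(data); err != nil {
		panic(err)
	}
	fmt.Println("after round-trip:", s2.Count()) // same estimate
}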

View File

@ -57,6 +57,7 @@ type Engine interface {
DeleteSeriesRange(itr SeriesIterator, min, max int64) error
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesN() int64
MeasurementExists(name []byte) (bool, error)

View File

@ -532,10 +532,20 @@ func (e *Engine) SeriesN() int64 {
return e.index.SeriesN()
}
// MeasurementsSketches returns sketches that describe the cardinality of the
// measurements in this shard and measurements that were in this shard, but have
// been tombstoned.
func (e *Engine) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
return e.index.MeasurementsSketches()
}
// SeriesSketches returns sketches that describe the cardinality of the
// series in this shard and series that were in this shard, but have
// been tombstoned.
func (e *Engine) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
return e.index.SeriesSketches()
}
// LastModified returns the time when this shard was last modified.
func (e *Engine) LastModified() time.Time {
walTime := e.WAL.LastWriteTime()

View File

@ -36,6 +36,7 @@ type Index interface {
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesN() int64
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
HasTagKey(name, key []byte) (bool, error)
HasTagValue(name, key, value []byte) (bool, error)

View File

@ -408,6 +408,19 @@ func (fs *FileSet) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, e
return sketch, tsketch, nil
}
// SeriesSketches returns the merged series sketches for the FileSet.
func (fs *FileSet) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
sketch, tsketch := hll.NewDefaultPlus(), hll.NewDefaultPlus()
// Iterate over all the files and merge the sketches into the result.
for _, f := range fs.files {
if err := f.MergeSeriesSketches(sketch, tsketch); err != nil {
return nil, nil, err
}
}
return sketch, tsketch, nil
}
// File represents a log or index file.
type File interface {
Close() error
@ -433,6 +446,7 @@ type File interface {
// Sketches for cardinality estimation
MergeMeasurementsSketches(s, t estimator.Sketch) error
MergeSeriesSketches(s, t estimator.Sketch) error
// Bitmap series existence.
SeriesIDSet() (*tsdb.SeriesIDSet, error)
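
A note on the pattern here: HLL++ supports insertion and merge but not deletion, so each file tracks a pair of sketches, one for series ever added and one for series ever tombstoned, and a live estimate is the difference of the two counts. A hedged helper over the estimator.Sketch interface above (illustrative, not part of the diff):

// liveEstimate returns the estimated live cardinality from an (added,
// tombstoned) sketch pair. Hypothetical helper for illustration only.
func liveEstimate(s, ts estimator.Sketch) int64 {
	return int64(s.Count() - ts.Count())
}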

View File

@ -39,8 +39,7 @@ func init() {
}
tsdb.RegisterIndex(IndexName, func(_ uint64, db, path string, _ *tsdb.SeriesIDSet, sfile *tsdb.SeriesFile, _ tsdb.EngineOptions) tsdb.Index {
idx := NewIndex(sfile, WithPath(path))
idx.database = db
idx := NewIndex(sfile, db, WithPath(path))
return idx
})
}
@ -77,36 +76,46 @@ var WithLogger = func(l zap.Logger) IndexOption {
}
}
// WithMaximumLogFileSize sets the maximum size of LogFiles before they're
// compacted into IndexFiles.
var WithMaximumLogFileSize = func(size int64) IndexOption {
return func(i *Index) {
i.maxLogFileSize = size
}
}
// Index represents a collection of layered index files and WAL.
type Index struct {
mu sync.RWMutex
partitions []*Partition
opened bool
// The following can be set when initialising an Index.
// The following may be set when initializing an Index.
path string // Root directory of the index partitions.
disableCompactions bool // Initially disables compactions on the index.
maxLogFileSize int64 // Maximum size of a LogFile before it's compacted.
logger *zap.Logger // Index's logger.
sfile *tsdb.SeriesFile // series lookup file
// The following must be set when initializing an Index.
sfile *tsdb.SeriesFile // series lookup file
database string // Name of database.
// Index's version.
version int
// Name of database.
database string
// Number of partitions used by the index.
PartitionN uint64
}
// NewIndex returns a new instance of Index.
func NewIndex(sfile *tsdb.SeriesFile, options ...IndexOption) *Index {
func NewIndex(sfile *tsdb.SeriesFile, database string, options ...IndexOption) *Index {
idx := &Index{
logger: zap.NewNop(),
version: Version,
sfile: sfile,
PartitionN: DefaultPartitionN,
maxLogFileSize: DefaultMaxLogFileSize,
logger: zap.NewNop(),
version: Version,
sfile: sfile,
database: database,
PartitionN: DefaultPartitionN,
}
for _, option := range options {
@ -128,10 +137,6 @@ func (i *Index) Database() string {
func (i *Index) WithLogger(l *zap.Logger) {
i.mu.Lock()
defer i.mu.Unlock()
for i, p := range i.partitions {
p.logger = l.With(zap.String("index", "tsi"), zap.String("partition", fmt.Sprint(i+1)))
}
i.logger = l.With(zap.String("index", "tsi"))
}
@ -167,12 +172,13 @@ func (i *Index) Open() error {
return err
}
// Inititalise index partitions.
// Initialize index partitions.
i.partitions = make([]*Partition, i.PartitionN)
for j := 0; j < len(i.partitions); j++ {
p := NewPartition(i.sfile, filepath.Join(i.path, fmt.Sprint(j)))
p.MaxLogFileSize = i.maxLogFileSize
p.Database = i.database
p.logger = i.logger.With(zap.String("partition", fmt.Sprint(j+1)))
p.logger = i.logger.With(zap.String("index", "tsi"), zap.String("partition", fmt.Sprint(j+1)))
i.partitions[j] = p
}
@ -251,6 +257,8 @@ func (i *Index) Close() error {
}
}
// Mark index as closed.
i.opened = false
return nil
}
@ -594,7 +602,7 @@ func (i *Index) DropMeasurementIfSeriesNotExist(name []byte) error {
}
// MeasurementsSketches returns the two sketches for the index by merging all
// instances of the sketch types in all the index files.
// instances of the sketch types in all the partitions.
func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
s, ts := hll.NewDefaultPlus(), hll.NewDefaultPlus()
for _, p := range i.partitions {
@ -614,8 +622,27 @@ func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, erro
return s, ts, nil
}
// SeriesN returns the number of unique non-tombstoned series in this index.
//
// SeriesSketches returns the two sketches for the index by merging all
// instances of the sketch types in all the partitions.
func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
s, ts := hll.NewDefaultPlus(), hll.NewDefaultPlus()
for _, p := range i.partitions {
// Get partition's measurement sketches and merge.
ps, pts, err := p.SeriesSketches()
if err != nil {
return nil, nil, err
}
if err := s.Merge(ps); err != nil {
return nil, nil, err
} else if err := ts.Merge(pts); err != nil {
return nil, nil, err
}
}
return s, ts, nil
}
// Since indexes are not shared across shards, the count returned by SeriesN
// cannot be combined with other shards' results. If you need to count series
// across indexes then use either the database-wide series file, or merge the

View File

@ -10,6 +10,7 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/pkg/estimator/hll"
"github.com/influxdata/influxdb/pkg/mmap"
"github.com/influxdata/influxdb/tsdb"
)
@ -25,10 +26,13 @@ const (
// IndexFile trailer fields
IndexFileVersionSize = 2
// IndexFileTrailerSize is the size of the trailer. Currently 82 bytes.
IndexFileTrailerSize = IndexFileVersionSize +
8 + 8 + // measurement block offset + size
8 + 8 + // series id set offset + size
8 + 8 + // tombstone series id set offset + size
8 + 8 + // series sketch offset + size
8 + 8 + // tombstone series sketch offset + size
0
)
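
The updated trailer size checks out: 2 version bytes plus five (offset, size) pairs of two uint64s each, i.e. 2 + 5*16 = 82 bytes, matching the "Currently 82 bytes" comment.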
@ -52,6 +56,9 @@ type IndexFile struct {
seriesIDSetData []byte
tombstoneSeriesIDSetData []byte
// Series sketches
sketch, tSketch estimator.Sketch
// Sortable identifier & filepath to the index file.
level int
id int
@ -66,11 +73,22 @@ type IndexFile struct {
// NewIndexFile returns a new instance of IndexFile.
func NewIndexFile(sfile *tsdb.SeriesFile) *IndexFile {
return &IndexFile{sfile: sfile}
return &IndexFile{
sfile: sfile,
sketch: hll.NewDefaultPlus(),
tSketch: hll.NewDefaultPlus(),
}
}
// Open memory maps the data file at the file's path.
func (f *IndexFile) Open() error {
defer func() {
if err := recover(); err != nil {
err = fmt.Errorf("[Index file: %s] %v", f.path, err)
panic(err)
}
}()
// Extract identifier from path name.
f.id, f.level = ParseFilename(f.Path())
@ -138,12 +156,23 @@ func (f *IndexFile) UnmarshalBinary(data []byte) error {
return err
}
// Slice series sketch data.
buf := data[t.SeriesSketch.Offset : t.SeriesSketch.Offset+t.SeriesSketch.Size]
if err := f.sketch.UnmarshalBinary(buf); err != nil {
return err
}
buf = data[t.TombstoneSeriesSketch.Offset : t.TombstoneSeriesSketch.Offset+t.TombstoneSeriesSketch.Size]
if err := f.tSketch.UnmarshalBinary(buf); err != nil {
return err
}
// Slice series set data.
f.seriesIDSetData = data[t.SeriesIDSet.Offset : t.SeriesIDSet.Offset+t.SeriesIDSet.Size]
f.tombstoneSeriesIDSetData = data[t.TombstoneSeriesIDSet.Offset : t.TombstoneSeriesIDSet.Offset+t.TombstoneSeriesIDSet.Size]
// Slice measurement block data.
buf := data[t.MeasurementBlock.Offset:]
buf = data[t.MeasurementBlock.Offset:]
buf = buf[:t.MeasurementBlock.Size]
// Unmarshal measurement block.
@ -340,8 +369,8 @@ func (f *IndexFile) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterat
return f.mblk.SeriesIDIterator(name)
}
// MergeMeasurementsSketches merges the index file's series sketches into the provided
// sketches.
// MergeMeasurementsSketches merges the index file's measurements sketches into
// the provided sketches.
func (f *IndexFile) MergeMeasurementsSketches(s, t estimator.Sketch) error {
if err := s.Merge(f.mblk.sketch); err != nil {
return err
@ -349,6 +378,15 @@ func (f *IndexFile) MergeMeasurementsSketches(s, t estimator.Sketch) error {
return t.Merge(f.mblk.tSketch)
}
// MergeSeriesSketches merges the index file's series sketches into the provided
// sketches.
func (f *IndexFile) MergeSeriesSketches(s, t estimator.Sketch) error {
if err := s.Merge(f.sketch); err != nil {
return err
}
return t.Merge(f.tSketch)
}
// ReadIndexFileTrailer returns the index file trailer from data.
func ReadIndexFileTrailer(data []byte) (IndexFileTrailer, error) {
var t IndexFileTrailer
@ -370,10 +408,21 @@ func ReadIndexFileTrailer(data []byte) (IndexFileTrailer, error) {
t.SeriesIDSet.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
t.SeriesIDSet.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
// Read series id set info.
// Read series tombstone id set info.
t.TombstoneSeriesIDSet.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
t.TombstoneSeriesIDSet.Size = int64(binary.BigEndian.Uint64(buf[0:8]))
t.TombstoneSeriesIDSet.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
// Read series sketch info.
t.SeriesSketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
t.SeriesSketch.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
// Read series tombstone sketch info.
t.TombstoneSeriesSketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
t.TombstoneSeriesSketch.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
if len(buf) != 2 { // Version field still in buffer.
return t, fmt.Errorf("unread %d bytes left unread in trailer", len(buf)-2)
}
return t, nil
}
@ -395,6 +444,16 @@ type IndexFileTrailer struct {
Offset int64
Size int64
}
SeriesSketch struct {
Offset int64
Size int64
}
TombstoneSeriesSketch struct {
Offset int64
Size int64
}
}
// WriteTo writes the trailer to w.
@ -420,6 +479,20 @@ func (t *IndexFileTrailer) WriteTo(w io.Writer) (n int64, err error) {
return n, err
}
// Write series sketch info.
if err := writeUint64To(w, uint64(t.SeriesSketch.Offset), &n); err != nil {
return n, err
} else if err := writeUint64To(w, uint64(t.SeriesSketch.Size), &n); err != nil {
return n, err
}
// Write series tombstone sketch info.
if err := writeUint64To(w, uint64(t.TombstoneSeriesSketch.Offset), &n); err != nil {
return n, err
} else if err := writeUint64To(w, uint64(t.TombstoneSeriesSketch.Size), &n); err != nil {
return n, err
}
// Write index file encoding version.
if err := writeUint16To(w, IndexFileVersion, &n); err != nil {
return n, err
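
Each trailer section is written as a big-endian (offset, size) pair of uint64s, emitted by WriteTo via writeUint64To and consumed in the same order by ReadIndexFileTrailer above. A standalone sketch of that encoding using only the standard library (writePair is an illustrative stand-in, not the diff's helper):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// writePair appends an (offset, size) pair in big-endian byte order,
// mirroring the trailer layout in this diff.
func writePair(w *bytes.Buffer, offset, size int64) {
	var b [8]byte
	binary.BigEndian.PutUint64(b[:], uint64(offset))
	w.Write(b[:])
	binary.BigEndian.PutUint64(b[:], uint64(size))
	w.Write(b[:])
}

func main() {
	var buf bytes.Buffer
	writePair(&buf, 1024, 38) // e.g. a series sketch at offset 1024, 38 bytes long

	data := buf.Bytes()
	offset := int64(binary.BigEndian.Uint64(data[0:8]))
	size := int64(binary.BigEndian.Uint64(data[8:16]))
	fmt.Println(offset, size) // 1024 38
}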

View File

@ -114,11 +114,11 @@ func CreateIndexFile(sfile *tsdb.SeriesFile, series []Series) (*tsi1.IndexFile,
}
// Load index file from buffer.
var f tsi1.IndexFile
f := tsi1.NewIndexFile(sfile)
if err := f.UnmarshalBinary(buf.Bytes()); err != nil {
return nil, err
}
return &f, nil
return f, nil
}
// GenerateIndexFile generates an index file from a set of series based on the count arguments.
@ -137,11 +137,11 @@ func GenerateIndexFile(sfile *tsdb.SeriesFile, measurementN, tagN, valueN int) (
}
// Load index file from buffer.
var f tsi1.IndexFile
f := tsi1.NewIndexFile(sfile)
if err := f.UnmarshalBinary(buf.Bytes()); err != nil {
return nil, err
}
return &f, nil
return f, nil
}
func MustGenerateIndexFile(sfile *tsdb.SeriesFile, measurementN, tagN, valueN int) *tsi1.IndexFile {

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/influxdata/influxdb/pkg/bytesutil"
"github.com/influxdata/influxdb/pkg/estimator/hll"
"github.com/influxdata/influxdb/tsdb"
)
@ -197,6 +198,21 @@ func (p IndexFiles) CompactTo(w io.Writer, sfile *tsdb.SeriesFile, m, k uint64,
return n, err
}
// Generate sketches from series sets.
sketch := hll.NewDefaultPlus()
seriesIDSet.ForEach(func(id uint64) {
if key := sfile.SeriesKey(id); key != nil {
sketch.Add(key)
}
})
tSketch := hll.NewDefaultPlus()
tombstoneSeriesIDSet.ForEach(func(id uint64) {
if key := sfile.SeriesKey(id); key != nil {
tSketch.Add(key)
}
})
// Write series set.
t.SeriesIDSet.Offset = n
nn, err := seriesIDSet.WriteTo(bw)
@ -213,6 +229,26 @@ func (p IndexFiles) CompactTo(w io.Writer, sfile *tsdb.SeriesFile, m, k uint64,
}
t.TombstoneSeriesIDSet.Size = n - t.TombstoneSeriesIDSet.Offset
// Write series sketches. TODO(edd): Implement WriterTo on HLL++.
t.SeriesSketch.Offset = n
data, err := sketch.MarshalBinary()
if err != nil {
return n, err
} else if _, err := bw.Write(data); err != nil {
return n, err
}
t.SeriesSketch.Size = int64(len(data))
n += t.SeriesSketch.Size
t.TombstoneSeriesSketch.Offset = n
if data, err = tSketch.MarshalBinary(); err != nil {
return n, err
} else if _, err := bw.Write(data); err != nil {
return n, err
}
t.TombstoneSeriesSketch.Size = int64(len(data))
n += t.TombstoneSeriesSketch.Size
// Write trailer.
nn, err = t.WriteTo(bw)
n += nn
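
Note that CompactTo regenerates both sketches from the merged series ID bitsets (looking each ID up in the series file) rather than merging the input files' sketches, presumably so the estimates are re-based on the exact post-compaction sets and HLL error does not accumulate across compactions.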

View File

@ -42,7 +42,7 @@ func TestIndexFiles_WriteTo(t *testing.T) {
}
// Unmarshal buffer into a new index file.
var f tsi1.IndexFile
f := tsi1.NewIndexFile(sfile.SeriesFile)
if err := f.UnmarshalBinary(buf.Bytes()); err != nil {
t.Fatal(err)
}

View File

@ -317,7 +317,7 @@ type Index struct {
// NewIndex returns a new instance of Index at a temporary path.
func NewIndex(partitionN uint64) *Index {
idx := &Index{SeriesFile: NewSeriesFile()}
idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, tsi1.WithPath(MustTempDir()))
idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, "db0", tsi1.WithPath(MustTempDir()))
idx.Index.PartitionN = partitionN
return idx
}
@ -361,7 +361,7 @@ func (idx *Index) Reopen() error {
}
partitionN := idx.Index.PartitionN // Remember how many partitions to use.
idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, tsi1.WithPath(idx.Index.Path()))
idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, "db0", tsi1.WithPath(idx.Index.Path()))
idx.Index.PartitionN = partitionN
return idx.Open()
}

View File

@ -162,7 +162,6 @@ func (f *LogFile) Close() error {
}
f.mms = make(logMeasurements)
return nil
}
@ -658,15 +657,13 @@ func (f *LogFile) execSeriesEntry(e *LogEntry) {
mm.tagSet[string(k)] = ts
}
// TODO(edd) increment series count....
f.sSketch.Add(seriesKey) // Add series to sketch.
f.mSketch.Add(name) // Add measurement to sketch as this may be the first series for the measurement.
// Add/remove from appropriate series id sets.
if !deleted {
f.sSketch.Add(seriesKey) // Add series to sketch - key in series file format.
f.seriesIDSet.Add(e.SeriesID)
f.tombstoneSeriesIDSet.Remove(e.SeriesID)
} else {
f.sTSketch.Add(seriesKey) // Add series to tombstone sketch - key in series file format.
f.seriesIDSet.Remove(e.SeriesID)
f.tombstoneSeriesIDSet.Add(e.SeriesID)
}
@ -732,6 +729,9 @@ func (f *LogFile) createMeasurementIfNotExists(name []byte) *logMeasurement {
series: make(map[uint64]struct{}),
}
f.mms[string(name)] = mm
// Add measurement to sketch.
f.mSketch.Add(name)
}
return mm
}
@ -822,6 +822,26 @@ func (f *LogFile) CompactTo(w io.Writer, m, k uint64, cancel <-chan struct{}) (n
}
t.TombstoneSeriesIDSet.Size = n - t.TombstoneSeriesIDSet.Offset
// Write series sketches. TODO(edd): Implement WriterTo on HLL++.
t.SeriesSketch.Offset = n
data, err := f.sSketch.MarshalBinary()
if err != nil {
return n, err
} else if _, err := bw.Write(data); err != nil {
return n, err
}
t.SeriesSketch.Size = int64(len(data))
n += t.SeriesSketch.Size
t.TombstoneSeriesSketch.Offset = n
if data, err = f.sTSketch.MarshalBinary(); err != nil {
return n, err
} else if _, err := bw.Write(data); err != nil {
return n, err
}
t.TombstoneSeriesSketch.Size = int64(len(data))
n += t.TombstoneSeriesSketch.Size
// Write trailer.
nn, err = t.WriteTo(bw)
n += nn
@ -968,6 +988,20 @@ func (f *LogFile) MergeMeasurementsSketches(sketch, tsketch estimator.Sketch) er
return tsketch.Merge(f.mTSketch)
}
// MergeSeriesSketches merges the series sketches belonging to this
// LogFile into the provided sketches.
//
// MergeSeriesSketches is safe for concurrent use by multiple goroutines.
func (f *LogFile) MergeSeriesSketches(sketch, tsketch estimator.Sketch) error {
f.mu.RLock()
defer f.mu.RUnlock()
if err := sketch.Merge(f.sSketch); err != nil {
return err
}
return tsketch.Merge(f.sTSketch)
}
// LogEntry represents a single log entry in the write-ahead log.
type LogEntry struct {
Flag byte // flag

View File

@ -564,6 +564,22 @@ func (i *Partition) DropMeasurement(name []byte) error {
}
}
// Delete all series.
if itr := fs.MeasurementSeriesIDIterator(name); itr != nil {
defer itr.Close()
for {
elem, err := itr.Next()
if err != nil {
return err
} else if elem.SeriesID == 0 {
break
}
if err := i.activeLogFile.DeleteSeriesID(elem.SeriesID); err != nil {
return err
}
}
}
// Mark measurement as deleted.
if err := func() error {
i.mu.RLock()
@ -622,7 +638,7 @@ func (i *Partition) DropSeries(seriesID uint64) error {
return i.CheckLogFile()
}
// MeasurementsSketches returns the two sketches for the index by merging all
// MeasurementsSketches returns the two sketches for the partition by merging all
// instances of the sketch types in all the index files.
func (i *Partition) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
fs, err := i.RetainFileSet()
@ -633,6 +649,17 @@ func (i *Partition) MeasurementsSketches() (estimator.Sketch, estimator.Sketch,
return fs.MeasurementsSketches()
}
// SeriesSketches returns the two sketches for the partition by merging all
// instances of the sketch types in all the index files.
func (i *Partition) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
fs, err := i.RetainFileSet()
if err != nil {
return nil, nil, err
}
defer fs.Release()
return fs.SeriesSketches()
}
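
The DropMeasurement change above is what feeds the series tombstone numbers exercised in the tests below: dropping a measurement now walks MeasurementSeriesIDIterator and deletes each series ID individually, which routes every series through execSeriesEntry and into both the tombstone series ID set and the tombstone series sketch.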
// HasTagKey returns true if tag key exists.
func (i *Partition) HasTagKey(name, key []byte) (bool, error) {
fs, err := i.RetainFileSet()

View File

@ -9,10 +9,12 @@ import (
"testing"
"github.com/influxdata/influxdb/internal"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/slices"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/index/inmem"
"github.com/influxdata/influxdb/tsdb/index/tsi1"
"github.com/influxdata/influxql"
)
@ -55,7 +57,7 @@ func TestIndexSet_MeasurementNamesByExpr(t *testing.T) {
// Setup indexes
indexes := map[string]*Index{}
for _, name := range tsdb.RegisteredIndexes() {
idx := MustNewIndex(name)
idx := MustOpenNewIndex(name)
idx.AddSeries("cpu", map[string]string{"region": "east"})
idx.AddSeries("cpu", map[string]string{"region": "west", "secret": "foo"})
idx.AddSeries("disk", map[string]string{"secret": "foo"})
@ -131,17 +133,134 @@ func TestIndexSet_MeasurementNamesByExpr(t *testing.T) {
}
}
type Index struct {
tsdb.Index
rootPath string
sfile *tsdb.SeriesFile
func TestIndex_Sketches(t *testing.T) {
checkCardinalities := func(t *testing.T, index *Index, state string, series, tseries, measurements, tmeasurements int) {
// Get sketches and check cardinality...
sketch, tsketch, err := index.SeriesSketches()
if err != nil {
t.Fatal(err)
}
// delta calculates a rough 10% delta. If i is small then a minimum value
// of 2 is used.
delta := func(i int) int {
v := i / 10
if v == 0 {
v = 2
}
return v
}
// series cardinality should be well within 10%.
if got, exp := int(sketch.Count()), series; got-exp < -delta(series) || got-exp > delta(series) {
t.Errorf("[%s] got series cardinality %d, expected ~%d", state, got, exp)
}
// check series tombstones
if got, exp := int(tsketch.Count()), tseries; got-exp < -delta(tseries) || got-exp > delta(tseries) {
t.Errorf("[%s] got series tombstone cardinality %d, expected ~%d", state, got, exp)
}
// Check measurement cardinality.
if sketch, tsketch, err = index.MeasurementsSketches(); err != nil {
t.Fatal(err)
}
if got, exp := int(sketch.Count()), measurements; got != exp { //got-exp < -delta(measurements) || got-exp > delta(measurements) {
t.Errorf("[%s] got measurement cardinality %d, expected ~%d", state, got, exp)
}
if got, exp := int(tsketch.Count()), tmeasurements; got != exp { //got-exp < -delta(tmeasurements) || got-exp > delta(tmeasurements) {
t.Errorf("[%s] got measurement tombstone cardinality %d, expected ~%d", state, got, exp)
}
}
test := func(t *testing.T, index string) error {
idx := MustNewIndex(index)
if index, ok := idx.Index.(*tsi1.Index); ok {
// Override the log file max size to force a log file compaction sooner.
// This way, we will test the sketches are correct when they have been
// compacted into IndexFiles, and also when they're loaded from
// IndexFiles after a re-open.
tsi1.WithMaximumLogFileSize(1 << 10)(index)
}
// Open the index
idx.MustOpen()
defer idx.Close()
series := genTestSeries(10, 5, 3)
// Add series to index.
for _, serie := range series {
if err := idx.AddSeries(serie.Measurement, serie.Tags.Map()); err != nil {
t.Fatal(err)
}
}
// Check cardinalities after adding series.
checkCardinalities(t, idx, "initial", 2430, 0, 10, 0)
// Re-open step only applies to the TSI index.
if _, ok := idx.Index.(*tsi1.Index); ok {
// Re-open the index.
if err := idx.Reopen(); err != nil {
panic(err)
}
// Check cardinalities after the reopen
checkCardinalities(t, idx, "initial|reopen", 2430, 0, 10, 0)
}
// Drop some series
if err := idx.DropMeasurement([]byte("measurement2")); err != nil {
return err
} else if err := idx.DropMeasurement([]byte("measurement5")); err != nil {
return err
}
// Check cardinalities after the delete
checkCardinalities(t, idx, "initial|reopen|delete", 2430, 486, 10, 2)
// Re-open step only applies to the TSI index.
if _, ok := idx.Index.(*tsi1.Index); ok {
// Re-open the index.
if err := idx.Reopen(); err != nil {
panic(err)
}
// Check cardinalities after the reopen
checkCardinalities(t, idx, "initial|reopen|delete|reopen", 2430, 486, 10, 2)
}
return nil
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
if err := test(t, index); err != nil {
t.Fatal(err)
}
})
}
}
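
For the constants used above: genTestSeries(10, 5, 3) produces 10 measurements with 3^5 = 243 tag sets each, i.e. 2430 series, so the 10% delta lets the series estimate drift by up to 243 either way; dropping measurement2 and measurement5 tombstones 2 * 243 = 486 series, which matches the expected tombstone counts.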
// Index wraps a series file and index.
type Index struct {
tsdb.Index
rootPath string
indexType string
sfile *tsdb.SeriesFile
}
// MustNewIndex will initialize a new index using the provided type. It creates
// everything under the same root directory so it can be cleanly removed on Close.
//
// The index will not be opened.
func MustNewIndex(index string) *Index {
opts := tsdb.NewEngineOptions()
opts.IndexVersion = index
rootPath, err := ioutil.TempDir("", "influxdb-tsdb")
if err != nil {
panic(err)
}
@ -160,14 +279,39 @@ func MustNewIndex(index string) *Index {
opts.InmemIndex = inmem.NewIndex("db0", sfile)
}
i, err := tsdb.NewIndex(0, "db0", filepath.Join(rootPath, "index"), tsdb.NewSeriesIDSet(), sfile, opts)
if err != nil {
panic(err)
}
if testing.Verbose() {
i.WithLogger(logger.New(os.Stderr))
}
idx := &Index{
Index: tsdb.MustOpenIndex(0, "db0", filepath.Join(rootPath, "index"), tsdb.NewSeriesIDSet(), sfile, opts),
rootPath: rootPath,
sfile: sfile,
Index: i,
indexType: index,
rootPath: rootPath,
sfile: sfile,
}
return idx
}
// MustOpenNewIndex will initialize a new index using the provided type and
// opens it.
func MustOpenNewIndex(index string) *Index {
idx := MustNewIndex(index)
idx.MustOpen()
return idx
}
// MustOpen opens the underlying index or panics.
func (i *Index) MustOpen() {
if err := i.Index.Open(); err != nil {
panic(err)
}
}
func (idx *Index) IndexSet() *tsdb.IndexSet {
return &tsdb.IndexSet{Indexes: []tsdb.Index{idx.Index}, SeriesFile: idx.sfile}
}
@ -178,6 +322,36 @@ func (idx *Index) AddSeries(name string, tags map[string]string) error {
return idx.CreateSeriesIfNotExists([]byte(key), []byte(name), t)
}
// Reopen closes and re-opens the underlying index, without removing any data.
func (i *Index) Reopen() error {
if err := i.Index.Close(); err != nil {
return err
}
if err := i.sfile.Close(); err != nil {
return err
}
i.sfile = tsdb.NewSeriesFile(i.sfile.Path())
if err := i.sfile.Open(); err != nil {
return err
}
opts := tsdb.NewEngineOptions()
opts.IndexVersion = i.indexType
if i.indexType == inmem.IndexName {
opts.InmemIndex = inmem.NewIndex("db0", i.sfile)
}
idx, err := tsdb.NewIndex(0, "db0", filepath.Join(i.rootPath, "index"), tsdb.NewSeriesIDSet(), i.sfile, opts)
if err != nil {
return err
}
i.Index = idx
return i.Index.Open()
}
// Close closes the index cleanly and removes all on-disk data.
func (i *Index) Close() error {
if err := i.Index.Close(); err != nil {
return err
@ -186,5 +360,6 @@ func (i *Index) Close() error {
if err := i.sfile.Close(); err != nil {
return err
}
return os.RemoveAll(i.rootPath)
return os.RemoveAll(i.rootPath)
}

View File

@ -148,7 +148,7 @@ type TestSeries struct {
func genTestSeries(mCnt, tCnt, vCnt int) []*TestSeries {
measurements := genStrList("measurement", mCnt)
tagSets := NewTagSetGenerator(tCnt, vCnt).AllSets()
var series []*TestSeries
series := make([]*TestSeries, 0, mCnt*len(tagSets))
for _, m := range measurements {
for _, ts := range tagSets {
series = append(series, &TestSeries{

View File

@ -705,6 +705,15 @@ func (s *Shard) SeriesN() int64 {
return engine.SeriesN()
}
// SeriesSketches returns the series sketches for the shard.
func (s *Shard) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
engine, err := s.engine()
if err != nil {
return nil, nil, err
}
return engine.SeriesSketches()
}
// MeasurementsSketches returns the measurement sketches for the shard.
func (s *Shard) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
engine, err := s.engine()

View File

@ -849,7 +849,9 @@ func (s *Store) DiskSize() (int64, error) {
return size, nil
}
func (s *Store) estimateCardinality(dbName string, getSketches func(*Shard) (estimator.Sketch, estimator.Sketch, error)) (int64, error) {
// sketchesForDatabase returns merged sketches for the provided database, by
// walking each shard in the database and merging the sketches found there.
func (s *Store) sketchesForDatabase(dbName string, getSketches func(*Shard) (estimator.Sketch, estimator.Sketch, error)) (estimator.Sketch, estimator.Sketch, error) {
var (
ss estimator.Sketch // Sketch estimating number of items.
ts estimator.Sketch // Sketch estimating number of tombstoned items.
@ -863,27 +865,26 @@ func (s *Store) estimateCardinality(dbName string, getSketches func(*Shard) (est
for _, shard := range shards {
s, t, err := getSketches(shard)
if err != nil {
return 0, err
return nil, nil, err
}
if ss == nil {
ss, ts = s, t
} else if err = ss.Merge(s); err != nil {
return 0, err
return nil, nil, err
} else if err = ts.Merge(t); err != nil {
return 0, err
return nil, nil, err
}
}
if ss != nil {
return int64(ss.Count() - ts.Count()), nil
}
return 0, nil
return ss, ts, nil
}
// SeriesCardinality returns the series cardinality for the provided database.
// SeriesCardinality returns the exact series cardinality for the provided
// database.
//
// Cardinality is calculated exactly by unioning all shards' bitsets of series
// IDs. The result of this method cannot be combined with any other results.
//
// Cardinality is calculated exactly by unioning all shards' bitsets of series IDs.
func (s *Store) SeriesCardinality(database string) (int64, error) {
s.mu.RLock()
shards := s.filterShards(byDatabase(database))
@ -911,12 +912,46 @@ func (s *Store) SeriesCardinality(database string) (int64, error) {
return int64(ss.Cardinality()), nil
}
// MeasurementsCardinality returns the measurement cardinality for the provided
// database.
// SeriesSketches returns the sketches associated with the series data in all
// the shards in the provided database.
//
// Cardinality is calculated using a sketch-based estimation.
// The returned sketches can be combined with other sketches to provide an
// estimation across distributed databases.
func (s *Store) SeriesSketches(database string) (estimator.Sketch, estimator.Sketch, error) {
return s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
if sh == nil {
return nil, nil, errors.New("shard nil, can't get cardinality")
}
return sh.SeriesSketches()
})
}
// MeasurementsCardinality returns an estimation of the measurement cardinality
// for the provided database.
//
// Cardinality is calculated using a sketch-based estimation. The result of this
// method cannot be combined with any other results.
func (s *Store) MeasurementsCardinality(database string) (int64, error) {
return s.estimateCardinality(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
ss, ts, err := s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
if sh == nil {
return nil, nil, errors.New("shard nil, can't get cardinality")
}
return sh.MeasurementsSketches()
})
if err != nil {
return 0, err
}
return int64(ss.Count() - ts.Count()), nil
}
// MeasurementsSketches returns the sketches associated with the measurement
// data in all the shards in the provided database.
//
// The returned sketches can be combined with other sketches to provide an
// estimation across distributed databases.
func (s *Store) MeasurementsSketches(database string) (estimator.Sketch, estimator.Sketch, error) {
return s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
if sh == nil {
return nil, nil, errors.New("shard nil, can't get cardinality")
}
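
To make the new surface concrete, a hedged usage sketch of the Store methods added here; estimateSeries is an illustrative helper, not part of the diff:

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/tsdb"
)

// estimateSeries estimates the live series count for a database from the
// store's merged sketches. Illustration only.
func estimateSeries(s *tsdb.Store, db string) (int64, error) {
	ss, ts, err := s.SeriesSketches(db)
	if err != nil {
		return 0, err
	}
	if ss == nil {
		return 0, nil // database has no shards
	}
	// Live estimate: everything ever added minus everything tombstoned.
	return int64(ss.Count() - ts.Count()), nil
}

func main() {
	store := tsdb.NewStore("/tmp/influxdb-data")
	if err := store.Open(); err != nil {
		panic(err)
	}
	defer store.Close()

	n, err := estimateSeries(store, "db0")
	if err != nil {
		panic(err)
	}
	fmt.Println("estimated live series:", n)
}

Unlike SeriesCardinality, the sketches themselves can be shipped elsewhere and merged with sketches from other stores before counting, which is the distributed use case the doc comments call out.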

View File

@ -16,6 +16,8 @@ import (
"testing"
"time"
"github.com/influxdata/influxdb/tsdb/index/inmem"
"github.com/davecgh/go-spew/spew"
"github.com/influxdata/influxdb/internal"
"github.com/influxdata/influxdb/logger"
@ -280,8 +282,7 @@ func TestStore_Open(t *testing.T) {
t.Parallel()
test := func(index string) {
s := NewStore()
s.EngineOptions.IndexVersion = index
s := NewStore(index)
defer s.Close()
if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp0", "2"), 0777); err != nil {
@ -324,8 +325,7 @@ func TestStore_Open_InvalidDatabaseFile(t *testing.T) {
t.Parallel()
test := func(index string) {
s := NewStore()
s.EngineOptions.IndexVersion = index
s := NewStore(index)
defer s.Close()
// Create a file instead of a directory for a database.
@ -351,8 +351,7 @@ func TestStore_Open_InvalidRetentionPolicy(t *testing.T) {
t.Parallel()
test := func(index string) {
s := NewStore()
s.EngineOptions.IndexVersion = index
s := NewStore(index)
defer s.Close()
// Create an RP file instead of a directory.
@ -382,8 +381,7 @@ func TestStore_Open_InvalidShard(t *testing.T) {
t.Parallel()
test := func(index string) {
s := NewStore()
s.EngineOptions.IndexVersion = index
s := NewStore(index)
defer s.Close()
// Create a non-numeric shard file.
@ -712,8 +710,7 @@ func TestStore_Cardinality_Tombstoning(t *testing.T) {
}
test := func(index string) {
store := NewStore()
store.EngineOptions.IndexVersion = index
store := NewStore(index)
if err := store.Open(); err != nil {
panic(err)
}
@ -771,8 +768,6 @@ func testStoreCardinalityUnique(t *testing.T, store *Store) {
}
func TestStore_Cardinality_Unique(t *testing.T) {
t.Skip("TODO(benbjohnson): Merge series file to DB level")
t.Parallel()
if testing.Short() || os.Getenv("GORACE") != "" || os.Getenv("APPVEYOR") != "" {
@ -780,8 +775,7 @@ func TestStore_Cardinality_Unique(t *testing.T) {
}
test := func(index string) {
store := NewStore()
store.EngineOptions.IndexVersion = index
store := NewStore(index)
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
if err := store.Open(); err != nil {
panic(err)
@ -863,8 +857,7 @@ func TestStore_Cardinality_Duplicates(t *testing.T) {
}
test := func(index string) {
store := NewStore()
store.EngineOptions.IndexVersion = index
store := NewStore(index)
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
if err := store.Open(); err != nil {
panic(err)
@ -932,8 +925,7 @@ func TestStore_Cardinality_Compactions(t *testing.T) {
}
test := func(index string) error {
store := NewStore()
store.EngineOptions.Config.Index = "inmem"
store := NewStore(index)
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
if err := store.Open(); err != nil {
panic(err)
@ -951,6 +943,138 @@ func TestStore_Cardinality_Compactions(t *testing.T) {
}
}
func TestStore_Sketches(t *testing.T) {
t.Parallel()
checkCardinalities := func(store *tsdb.Store, series, tseries, measurements, tmeasurements int) error {
// Get sketches and check cardinality...
sketch, tsketch, err := store.SeriesSketches("db")
if err != nil {
return err
}
// delta calculates a rough 10% delta. If i is small then a minimum value
// of 2 is used.
delta := func(i int) int {
v := i / 10
if v == 0 {
v = 2
}
return v
}
// series cardinality should be well within 10%.
if got, exp := int(sketch.Count()), series; got-exp < -delta(series) || got-exp > delta(series) {
return fmt.Errorf("got series cardinality %d, expected ~%d", got, exp)
}
// check series tombstones
if got, exp := int(tsketch.Count()), tseries; got-exp < -delta(tseries) || got-exp > delta(tseries) {
return fmt.Errorf("got series tombstone cardinality %d, expected ~%d", got, exp)
}
// Check measurement cardinality.
if sketch, tsketch, err = store.MeasurementsSketches("db"); err != nil {
return err
}
if got, exp := int(sketch.Count()), measurements; got-exp < -delta(measurements) || got-exp > delta(measurements) {
return fmt.Errorf("got measurement cardinality %d, expected ~%d", got, exp)
}
if got, exp := int(tsketch.Count()), tmeasurements; got-exp < -delta(tmeasurements) || got-exp > delta(tmeasurements) {
return fmt.Errorf("got measurement tombstone cardinality %d, expected ~%d", got, exp)
}
return nil
}
test := func(index string) error {
store := MustOpenStore(index)
defer store.Close()
// Generate point data to write to the shards.
series := genTestSeries(10, 2, 4) // 160 series
points := make([]models.Point, 0, len(series))
for _, s := range series {
points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now()))
}
// Create requested number of shards in the store & write points across
// shards such that we never write the same series to multiple shards.
for shardID := 0; shardID < 4; shardID++ {
if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil {
return fmt.Errorf("create shard: %s", err)
}
if err := store.BatchWrite(shardID, points[shardID*40:(shardID+1)*40]); err != nil {
return fmt.Errorf("batch write: %s", err)
}
}
// Check cardinalities
if err := checkCardinalities(store.Store, 160, 0, 10, 0); err != nil {
return fmt.Errorf("[initial] %v", err)
}
// Reopen the store.
if err := store.Reopen(); err != nil {
return err
}
// Check cardinalities
if err := checkCardinalities(store.Store, 160, 0, 10, 0); err != nil {
return fmt.Errorf("[initial|re-open] %v", err)
}
// Delete half of the measurements' data
mnames, err := store.MeasurementNames(nil, "db", nil)
if err != nil {
return err
}
for _, name := range mnames[:len(mnames)/2] {
if err := store.DeleteSeries("db", []influxql.Source{&influxql.Measurement{Name: string(name)}}, nil); err != nil {
return err
}
}
// Check cardinalities - tombstones should now be present
if err := checkCardinalities(store.Store, 160, 80, 10, 5); err != nil {
return fmt.Errorf("[initial|re-open|delete] %v", err)
}
// Reopen the store.
if err := store.Reopen(); err != nil {
return err
}
// Check cardinalities. In this case, the indexes behave differently.
//
// - The inmem index will report that there are 80 series and no tombstones.
// - The tsi1 index will report that there are 160 series and 80 tombstones.
//
// The result is the same, but the implementation differs.
expS, expTS, expM, expTM := 160, 80, 10, 5
if index == inmem.IndexName {
expS, expTS, expM, expTM = 80, 0, 5, 0
}
if err := checkCardinalities(store.Store, expS, expTS, expM, expTM); err != nil {
return fmt.Errorf("[initial|re-open|delete|re-open] %v", err)
}
return nil
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
if err := test(index); err != nil {
t.Fatal(err)
}
})
}
}
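
The divergence after the final reopen comes down to where sketch state lives: inmem rebuilds its index from the shards' data when the store opens, so deleted series simply never re-enter it (80 live, no tombstones), while tsi1 reloads the sketches persisted in its index file trailers and so retains both sides (160 added, 80 tombstoned). Either way the live estimate is 160 - 80 = 80.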
func TestStore_TagValues(t *testing.T) {
t.Parallel()
@ -1400,8 +1524,7 @@ func createTagValues(mname string, kvs map[string][]string) tsdb.TagValues {
func BenchmarkStore_SeriesCardinality_100_Shards(b *testing.B) {
for _, index := range tsdb.RegisteredIndexes() {
store := NewStore()
store.EngineOptions.IndexVersion = index
store := NewStore(index)
if err := store.Open(); err != nil {
panic(err)
}
@ -1504,8 +1627,7 @@ func BenchmarkStore_TagValues(b *testing.B) {
var s *Store
setup := func(shards, measurements, tagValues int, index string, useRandom bool) []uint64 { // returns shard ids
s = NewStore()
s.EngineOptions.IndexVersion = index
s = NewStore(index)
if err := s.Open(); err != nil {
panic(err)
}
@ -1607,16 +1729,18 @@ func BenchmarkStore_TagValues(b *testing.B) {
// Store is a test wrapper for tsdb.Store.
type Store struct {
*tsdb.Store
index string
}
// NewStore returns a new instance of Store with a temporary path.
func NewStore() *Store {
func NewStore(index string) *Store {
path, err := ioutil.TempDir("", "influxdb-tsdb-")
if err != nil {
panic(err)
}
s := &Store{Store: tsdb.NewStore(path)}
s := &Store{Store: tsdb.NewStore(path), index: index}
s.EngineOptions.IndexVersion = index
s.EngineOptions.Config.WALDir = filepath.Join(path, "wal")
s.EngineOptions.Config.TraceLoggingEnabled = true
@ -1630,8 +1754,7 @@ func NewStore() *Store {
// MustOpenStore returns a new, open Store using the specified index,
// at a temporary path.
func MustOpenStore(index string) *Store {
s := NewStore()
s.EngineOptions.IndexVersion = index
s := NewStore(index)
if err := s.Open(); err != nil {
panic(err)
@ -1646,7 +1769,13 @@ func (s *Store) Reopen() error {
}
s.Store = tsdb.NewStore(s.Path())
s.EngineOptions.IndexVersion = s.index
s.EngineOptions.Config.WALDir = filepath.Join(s.Path(), "wal")
s.EngineOptions.Config.TraceLoggingEnabled = true
if testing.Verbose() {
s.WithLogger(logger.New(os.Stdout))
}
return s.Store.Open()
}