Initial refactor of tsi1.Index

This commit carries out the initial refactor of the tsi1.Index into
tsi1.Partition. We then create a new tsi1.Index that will be an
abstraction over a collection of Partitions.
pull/9150/head
Edd Robinson 2017-09-27 15:09:44 +01:00 committed by Ben Johnson
parent fb646549f4
commit 7aa9de508d
No known key found for this signature in database
GPG Key ID: 81741CD251883081
7 changed files with 723 additions and 426 deletions

View File

@ -138,9 +138,10 @@ func (cmd *Command) readFileSet() (*tsi1.Index, *tsi1.FileSet, error) {
if err != nil {
return nil, nil, err
} else if fi.IsDir() {
idx := tsi1.NewIndex()
idx.Path = cmd.paths[0]
idx.CompactionEnabled = false
idx := tsi1.NewIndex(
tsi1.WithPath(cmd.paths[0]),
tsi1.WithCompactions(false),
)
if err := idx.Open(); err != nil {
return nil, nil, err
}

View File

@ -87,9 +87,10 @@ func (cmd *Command) run(dataDir, walDir string) error {
}
// Open TSI index in temporary path.
tsiIndex := tsi1.NewIndex()
tsiIndex.Path = tmpPath
tsiIndex.WithLogger(cmd.Logger)
tsiIndex := tsi1.NewIndex(
tsi1.WithPath(tmpPath),
tsi1.WithLogger(cmd.Logger),
)
cmd.Logger.Info("opening tsi index in temporary location", zap.String("path", tmpPath))
if err := tsiIndex.Open(); err != nil {
return err

283
tsdb/index/tsi1/index.go Normal file
View File

@ -0,0 +1,283 @@
package tsi1
import (
"errors"
"regexp"
"sync"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxql"
"github.com/uber-go/zap"
)
// IndexName is the name of the index.
const IndexName = "tsi1"
// An IndexOption is a functional option for changing the configuration of
// an Index.
type IndexOption func(i *Index)
// WithPath sets the root path of the Index
var WithPath = func(path string) IndexOption {
return func(i *Index) {
i.path = path
}
}
// WithCompactions enables or disabled compactions on the Index.
var WithCompactions = func(enabled bool) IndexOption {
return func(i *Index) {
i.CompactionEnabled = enabled
}
}
// WithLogger sets the logger for the Index.
var WithLogger = func(l zap.Logger) IndexOption {
return func(i *Index) {
i.logger = l.With(zap.String("index", "tsi"))
}
}
// Index represents a collection of layered index files and WAL.
type Index struct {
mu sync.RWMutex
opened bool
// The following can be set when initialising an Index.
path string // Root directory of the index files.
CompactionEnabled bool // Frequency of compaction checks.
logger zap.Logger // Index's logger.
// Close management.
once sync.Once
closing chan struct{}
// Index's version.
version int
}
// NewIndex returns a new instance of Index.
func NewIndex(options ...IndexOption) *Index {
i := &Index{
closing: make(chan struct{}),
logger: zap.New(zap.NullEncoder()),
version: Version,
}
for _, option := range options {
option(i)
}
return i
}
// Type returns the type of Index this is.
func (i *Index) Type() string { return IndexName }
// Open opens the index.
func (i *Index) Open() error {
i.mu.Lock()
defer i.mu.Unlock()
if i.opened {
return errors.New("index already open")
}
// TODO(edd): Open all the Partitions.
// Mark opened.
i.opened = true
return nil
}
// Wait returns once outstanding compactions have finished.
func (i *Index) Wait() {
// TODO(edd): Wait on each partition.
}
// Close closes the index.
func (i *Index) Close() error {
// Wait for goroutines to finish.
i.once.Do(func() { close(i.closing) })
i.Wait()
// Lock index and close partitions.
i.mu.Lock()
defer i.mu.Unlock()
// TODO(edd): Close Partitions.
return nil
}
// RetainFileSet returns the current fileset for all partitions in the Index.
func (i *Index) RetainFileSet() *FileSet {
// TODO(edd): Merge all FileSets for all partitions. For the moment we will
// just append them from each partition.
return nil
}
// SetFieldSet sets a shared field set from the engine.
func (i *Index) SetFieldSet(fs *tsdb.MeasurementFieldSet) {
i.mu.Lock()
// TODO(edd): set the field set on all the Partitions?
i.mu.Unlock()
}
// ForEachMeasurementName iterates over all measurement names in the index.
func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error {
// TODO(edd): Call on each partition. Could be done in parallel?
return nil
}
// MeasurementExists returns true if a measurement exists.
func (i *Index) MeasurementExists(name []byte) (bool, error) {
// TODO(edd): Call on each Partition. In parallel?
return false, nil
}
// MeasurementNamesByExpr returns measurement names for the provided expression.
func (i *Index) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) {
// TODO(edd): Call on each partition. Merge results.
return nil, nil
}
// MeasurementNamesByRegex returns measurement names for the provided regex.
func (i *Index) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
// TODO(edd): Call on each Partition. Merge results.
return nil, nil
}
// DropMeasurement deletes a measurement from the index.
func (i *Index) DropMeasurement(name []byte) error {
// TODO(edd): Call on each Partition. In parallel?
return nil
}
// CreateSeriesListIfNotExists creates a list of series if they doesn't exist in bulk.
func (i *Index) CreateSeriesListIfNotExists(_, names [][]byte, tagsSlice []models.Tags) error {
// TODO(edd): Call on correct Partition.
return nil
}
// InitializeSeries is a no-op. This only applies to the in-memory index.
func (i *Index) InitializeSeries(key, name []byte, tags models.Tags) error {
return nil
}
// CreateSeriesIfNotExists creates a series if it doesn't exist or is deleted.
func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
// TODO(edd): Call on correct Partition.
return nil
}
// DropSeries drops the provided series from the index.
func (i *Index) DropSeries(key []byte) error {
// TODO(edd): Call on correct Partition.
return nil
}
// MeasurementsSketches returns the two sketches for the index by merging all
// instances of the type sketch types in all the index files.
func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
// TODO(edd): Merge sketches for each partition.
return nil, nil, nil
}
// SeriesN returns the number of unique non-tombstoned series in the index.
// Since indexes are not shared across shards, the count returned by SeriesN
// cannot be combined with other shard's results. If you need to count series
// across indexes then use SeriesSketches and merge the results from other
// indexes.
func (i *Index) SeriesN() int64 {
// TODO(edd): Sum over all Partitions.
return 0
}
// HasTagKey returns true if tag key exists.
func (i *Index) HasTagKey(name, key []byte) (bool, error) {
// TODO(edd): Check on each Partition? In parallel?
return false, nil
}
// MeasurementTagKeysByExpr extracts the tag keys wanted by the expression.
func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
// TODO(edd): Call on each partition. Merge results.
return nil, nil
}
// MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression.
//
// See tsm1.Engine.MeasurementTagKeyValuesByExpr for a fuller description of this
// method.
func (i *Index) MeasurementTagKeyValuesByExpr(name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
// TODO(edd): Merge results from each Partition.
return nil, nil
}
// ForEachMeasurementTagKey iterates over all tag keys in a measurement and applies
// the provided function.
func (i *Index) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
// TODO(edd): Apply fn on each Partition. In parallel?
return nil
}
// TagKeyCardinality always returns zero.
// It is not possible to determine cardinality of tags across index files, and
// thus it cannot be done across partitions.
func (i *Index) TagKeyCardinality(name, key []byte) int {
return 0
}
// MeasurementSeriesKeysByExpr returns a list of series keys matching expr.
func (i *Index) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) {
// TODO(edd): Merge results from each Partition.
return nil, nil
}
// TagSets returns an ordered list of tag sets for a measurement by dimension
// and filtered by an optional conditional expression.
func (i *Index) TagSets(name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) {
// TODO(edd): Merge results from each Partition.
return nil, nil
}
// SnapshotTo creates hard links to the file set into path.
func (i *Index) SnapshotTo(path string) error {
// TODO(edd): Call on each Partition.
return nil
}
// SetFieldName is a no-op on tsi1.
func (i *Index) SetFieldName(measurement []byte, name string) {}
// RemoveShard is a no-op on tsi1.
func (i *Index) RemoveShard(shardID uint64) {}
// AssignShard is a no-op on tsi1.
func (i *Index) AssignShard(k string, shardID uint64) {}
// UnassignShard simply calls into DropSeries.
func (i *Index) UnassignShard(k string, shardID uint64) error {
// This can be called directly once inmem is gone.
return i.DropSeries([]byte(k))
}
// SeriesPointIterator returns an influxql iterator over all series.
func (i *Index) SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error) {
// TODO(edd): Create iterators for each Partition and return a merged
// iterator.
return nil, nil
}
// Compact requests a compaction of log files.
func (i *Index) Compact() {
// TODO(edd): Request compactions on each Partition?
}
// Rebuild is a no-op on tsi1.
func (i *Index) Rebuild() {}

View File

@ -235,14 +235,6 @@ func (p IndexFiles) writeMeasurementBlockTo(w io.Writer, info *indexCompactInfo,
for m := mitr.Next(); m != nil; m = mitr.Next() {
name := m.Name()
// Look-up series ids.
itr := p.MeasurementSeriesIDIterator(name)
var seriesIDs []uint32
for e := itr.Next(); e.SeriesID != 0; e = itr.Next() {
seriesIDs = append(seriesIDs, e.SeriesID)
}
sort.Sort(uint32Slice(seriesIDs))
// Look-up series ids.
itr := p.MeasurementSeriesIDIterator(name)
var seriesIDs []uint64

View File

@ -0,0 +1,330 @@
package tsi1_test
import (
"fmt"
"os"
"reflect"
"regexp"
"testing"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb/index/tsi1"
"github.com/influxdata/influxql"
)
// Bloom filter settings used in tests.
const M, K = 4096, 6
// Ensure index can iterate over all measurement names.
func TestIndex_ForEachMeasurementName(t *testing.T) {
idx := MustOpenIndex()
defer idx.Close()
// Add series to index.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})},
}); err != nil {
t.Fatal(err)
}
// Verify measurements are returned.
idx.Run(t, func(t *testing.T) {
var names []string
if err := idx.ForEachMeasurementName(func(name []byte) error {
names = append(names, string(name))
return nil
}); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(names, []string{"cpu", "mem"}) {
t.Fatalf("unexpected names: %#v", names)
}
})
// Add more series.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("disk")},
{Name: []byte("mem")},
}); err != nil {
t.Fatal(err)
}
// Verify new measurements.
idx.Run(t, func(t *testing.T) {
var names []string
if err := idx.ForEachMeasurementName(func(name []byte) error {
names = append(names, string(name))
return nil
}); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(names, []string{"cpu", "disk", "mem"}) {
t.Fatalf("unexpected names: %#v", names)
}
})
}
// Ensure index can return whether a measurement exists.
func TestIndex_MeasurementExists(t *testing.T) {
idx := MustOpenIndex()
defer idx.Close()
// Add series to index.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
}); err != nil {
t.Fatal(err)
}
// Verify measurement exists.
idx.Run(t, func(t *testing.T) {
if v, err := idx.MeasurementExists([]byte("cpu")); err != nil {
t.Fatal(err)
} else if !v {
t.Fatal("expected measurement to exist")
}
})
// Delete one series.
if err := idx.DropSeries(models.MakeKey([]byte("cpu"), models.NewTags(map[string]string{"region": "east"}))); err != nil {
t.Fatal(err)
}
// Verify measurement still exists.
idx.Run(t, func(t *testing.T) {
if v, err := idx.MeasurementExists([]byte("cpu")); err != nil {
t.Fatal(err)
} else if !v {
t.Fatal("expected measurement to still exist")
}
})
// Delete second series.
if err := idx.DropSeries(models.MakeKey([]byte("cpu"), models.NewTags(map[string]string{"region": "west"}))); err != nil {
t.Fatal(err)
}
// Verify measurement is now deleted.
idx.Run(t, func(t *testing.T) {
if v, err := idx.MeasurementExists([]byte("cpu")); err != nil {
t.Fatal(err)
} else if v {
t.Fatal("expected measurement to be deleted")
}
})
}
// Ensure index can return a list of matching measurements.
func TestIndex_MeasurementNamesByExpr(t *testing.T) {
idx := MustOpenIndex()
defer idx.Close()
// Add series to index.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
{Name: []byte("disk"), Tags: models.NewTags(map[string]string{"region": "north"})},
{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "west", "country": "us"})},
}); err != nil {
t.Fatal(err)
}
// Retrieve measurements by expression
idx.Run(t, func(t *testing.T) {
t.Run("EQ", func(t *testing.T) {
names, err := idx.MeasurementNamesByExpr(influxql.MustParseExpr(`region = 'west'`))
if err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(names, [][]byte{[]byte("cpu"), []byte("mem")}) {
t.Fatalf("unexpected names: %v", names)
}
})
t.Run("NEQ", func(t *testing.T) {
names, err := idx.MeasurementNamesByExpr(influxql.MustParseExpr(`region != 'east'`))
if err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(names, [][]byte{[]byte("disk"), []byte("mem")}) {
t.Fatalf("unexpected names: %v", names)
}
})
t.Run("EQREGEX", func(t *testing.T) {
names, err := idx.MeasurementNamesByExpr(influxql.MustParseExpr(`region =~ /east|west/`))
if err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(names, [][]byte{[]byte("cpu"), []byte("mem")}) {
t.Fatalf("unexpected names: %v", names)
}
})
t.Run("NEQREGEX", func(t *testing.T) {
names, err := idx.MeasurementNamesByExpr(influxql.MustParseExpr(`country !~ /^u/`))
if err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(names, [][]byte{[]byte("cpu"), []byte("disk")}) {
t.Fatalf("unexpected names: %v", names)
}
})
})
}
// Ensure index can return a list of matching measurements.
func TestIndex_MeasurementNamesByRegex(t *testing.T) {
idx := MustOpenIndex()
defer idx.Close()
// Add series to index.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("cpu")},
{Name: []byte("disk")},
{Name: []byte("mem")},
}); err != nil {
t.Fatal(err)
}
// Retrieve measurements by regex.
idx.Run(t, func(t *testing.T) {
names, err := idx.MeasurementNamesByRegex(regexp.MustCompile(`cpu|mem`))
if err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(names, [][]byte{[]byte("cpu"), []byte("mem")}) {
t.Fatalf("unexpected names: %v", names)
}
})
}
// Ensure index can delete a measurement and all related keys, values, & series.
func TestIndex_DropMeasurement(t *testing.T) {
idx := MustOpenIndex()
defer idx.Close()
// Add series to index.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
{Name: []byte("disk"), Tags: models.NewTags(map[string]string{"region": "north"})},
{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "west", "country": "us"})},
}); err != nil {
t.Fatal(err)
}
// Drop measurement.
if err := idx.DropMeasurement([]byte("cpu")); err != nil {
t.Fatal(err)
}
// Verify data is gone in each stage.
idx.Run(t, func(t *testing.T) {
// Verify measurement is gone.
if v, err := idx.MeasurementExists([]byte("cpu")); err != nil {
t.Fatal(err)
} else if v {
t.Fatal("expected no measurement")
}
// Obtain file set to perform lower level checks.
fs := idx.RetainFileSet()
defer fs.Release()
// Verify tags & values are gone.
if e := fs.TagKeyIterator([]byte("cpu")).Next(); e != nil && !e.Deleted() {
t.Fatal("expected deleted tag key")
}
if itr := fs.TagValueIterator([]byte("cpu"), []byte("region")); itr != nil {
t.Fatal("expected nil tag value iterator")
}
})
}
// Index is a test wrapper for tsi1.Index.
type Index struct {
Path string
*tsi1.Index
}
// NewIndex returns a new instance of Index at a temporary path.
func NewIndex() *Index {
var idx = &Index{Path: MustTempDir()}
idx.Index = tsi1.NewIndex(tsi1.WithPath(idx.Path))
return idx
}
// MustOpenIndex returns a new, open index. Panic on error.
func MustOpenIndex() *Index {
idx := NewIndex()
if err := idx.Open(); err != nil {
panic(err)
}
return idx
}
// Close closes and removes the index directory.
func (idx *Index) Close() error {
defer os.RemoveAll(idx.Path)
return idx.Index.Close()
}
// Reopen closes and opens the index.
func (idx *Index) Reopen() error {
if err := idx.Index.Close(); err != nil {
return err
}
path := idx.Path
idx.Index = tsi1.NewIndex()
idx.Path = path
if err := idx.Open(); err != nil {
return err
}
return nil
}
// Run executes a subtest for each of several different states:
//
// - Immediately
// - After reopen
// - After compaction
// - After reopen again
//
// The index should always respond in the same fashion regardless of
// how data is stored. This helper allows the index to be easily tested
// in all major states.
func (idx *Index) Run(t *testing.T, fn func(t *testing.T)) {
// Invoke immediately.
t.Run("state=initial", fn)
// Reopen and invoke again.
if err := idx.Reopen(); err != nil {
t.Fatalf("reopen error: %s", err)
}
t.Run("state=reopen", fn)
// TODO: Request a compaction.
// if err := idx.Compact(); err != nil {
// t.Fatalf("compact error: %s", err)
// }
// t.Run("state=post-compaction", fn)
// Reopen and invoke again.
if err := idx.Reopen(); err != nil {
t.Fatalf("post-compaction reopen error: %s", err)
}
t.Run("state=post-compaction-reopen", fn)
}
// CreateSeriesSliceIfNotExists creates multiple series at a time.
func (idx *Index) CreateSeriesSliceIfNotExists(a []Series) error {
for i, s := range a {
if err := idx.CreateSeriesListIfNotExists(nil, [][]byte{s.Name}, []models.Tags{s.Tags}); err != nil {
return fmt.Errorf("i=%d, name=%s, tags=%v, err=%s", i, s.Name, s.Tags, err)
}
}
return nil
}

View File

@ -24,29 +24,11 @@ import (
"github.com/uber-go/zap"
)
const (
// IndexName is the name of the index.
IndexName = "tsi1"
// Version is the current version of the TSI index.
const Version = 1
// Version is the current version of the TSI index.
Version = 1
)
// Default compaction thresholds.
const (
DefaultMaxLogFileSize = 5 * 1024 * 1024
)
func init() {
tsdb.RegisterIndex(IndexName, func(id uint64, database, path string, opt tsdb.EngineOptions) tsdb.Index {
idx := NewIndex()
idx.ShardID = id
idx.Database = database
idx.Path = path
idx.options = opt
return idx
})
}
// DefaultMaxLogFileSize is the default compaction threshold.
const DefaultMaxLogFileSize = 5 * 1024 * 1024
// File extensions.
const (
@ -56,22 +38,16 @@ const (
CompactingExt = ".compacting"
)
const (
// ManifestFileName is the name of the index manifest file.
ManifestFileName = "MANIFEST"
// ManifestFileName is the name of the index manifest file.
const ManifestFileName = "MANIFEST"
// SeriesFileName is the name of the series file.
SeriesFileName = "series"
)
// SeriesFileName is the name of the series file.
const SeriesFileName = "series"
// Ensure index implements the interface.
var _ tsdb.Index = &Index{}
// Index represents a collection of layered index files and WAL.
type Index struct {
mu sync.RWMutex
opened bool
options tsdb.EngineOptions
// Partition represents a collection of layered index files and WAL.
type Partition struct {
mu sync.RWMutex
opened bool
sfile *SeriesFile // series lookup file
activeLogFile *LogFile // current log file
@ -90,9 +66,6 @@ type Index struct {
// Fieldset shared with engine.
fieldset *tsdb.MeasurementFieldSet
// Associated shard info.
ShardID uint64
// Name of database.
Database string
@ -112,9 +85,9 @@ type Index struct {
version int
}
// NewIndex returns a new instance of Index.
func NewIndex() *Index {
return &Index{
// NewPartition returns a new instance of Partition.
func NewPartition() *Partition {
return &Partition{
closing: make(chan struct{}),
// Default compaction thresholds.
@ -130,15 +103,13 @@ func NewIndex() *Index {
// incompatible tsi1 manifest file.
var ErrIncompatibleVersion = errors.New("incompatible tsi1 index MANIFEST")
func (i *Index) Type() string { return IndexName }
// Open opens the index.
func (i *Index) Open() error {
// Open opens the partition.
func (i *Partition) Open() error {
i.mu.Lock()
defer i.mu.Unlock()
if i.opened {
return errors.New("index already open")
return errors.New("index partition already open")
}
// Create directory if it doesn't exist.
@ -229,7 +200,7 @@ func (i *Index) Open() error {
}
// openLogFile opens a log file and appends it to the index.
func (i *Index) openLogFile(path string) (*LogFile, error) {
func (i *Partition) openLogFile(path string) (*LogFile, error) {
f := NewLogFile(i.sfile, path)
if err := f.Open(); err != nil {
return nil, err
@ -238,7 +209,7 @@ func (i *Index) openLogFile(path string) (*LogFile, error) {
}
// openIndexFile opens a log file and appends it to the index.
func (i *Index) openIndexFile(path string) (*IndexFile, error) {
func (i *Partition) openIndexFile(path string) (*IndexFile, error) {
f := NewIndexFile(i.sfile)
f.SetPath(path)
if err := f.Open(); err != nil {
@ -248,7 +219,7 @@ func (i *Index) openIndexFile(path string) (*IndexFile, error) {
}
// deleteNonManifestFiles removes all files not in the manifest.
func (i *Index) deleteNonManifestFiles(m *Manifest) error {
func (i *Partition) deleteNonManifestFiles(m *Manifest) error {
dir, err := os.Open(i.Path)
if err != nil {
return err
@ -276,12 +247,12 @@ func (i *Index) deleteNonManifestFiles(m *Manifest) error {
}
// Wait returns once outstanding compactions have finished.
func (i *Index) Wait() {
func (i *Partition) Wait() {
i.wg.Wait()
}
// Close closes the index.
func (i *Index) Close() error {
func (i *Partition) Close() error {
// Wait for goroutines to finish.
i.once.Do(func() { close(i.closing) })
i.wg.Wait()
@ -300,29 +271,29 @@ func (i *Index) Close() error {
}
// NextSequence returns the next file identifier.
func (i *Index) NextSequence() int {
func (i *Partition) NextSequence() int {
i.mu.Lock()
defer i.mu.Unlock()
return i.nextSequence()
}
func (i *Index) nextSequence() int {
func (i *Partition) nextSequence() int {
i.seq++
return i.seq
}
// ManifestPath returns the path to the index's manifest file.
func (i *Index) ManifestPath() string {
func (i *Partition) ManifestPath() string {
return filepath.Join(i.Path, ManifestFileName)
}
// SeriesFilePath returns the path to the index's series file.
func (i *Index) SeriesFilePath() string {
func (i *Partition) SeriesFilePath() string {
return filepath.Join(i.Path, SeriesFileName)
}
// Manifest returns a manifest for the index.
func (i *Index) Manifest() *Manifest {
func (i *Partition) Manifest() *Manifest {
m := &Manifest{
Levels: i.levels,
Files: make([]string, len(i.fileSet.files)),
@ -337,41 +308,41 @@ func (i *Index) Manifest() *Manifest {
}
// writeManifestFile writes the manifest to the appropriate file path.
func (i *Index) writeManifestFile() error {
func (i *Partition) writeManifestFile() error {
return WriteManifestFile(i.ManifestPath(), i.Manifest())
}
// WithLogger sets the logger for the index.
func (i *Index) WithLogger(logger zap.Logger) {
func (i *Partition) WithLogger(logger zap.Logger) {
i.logger = logger.With(zap.String("index", "tsi"))
}
// SetFieldSet sets a shared field set from the engine.
func (i *Index) SetFieldSet(fs *tsdb.MeasurementFieldSet) {
func (i *Partition) SetFieldSet(fs *tsdb.MeasurementFieldSet) {
i.mu.Lock()
i.fieldset = fs
i.mu.Unlock()
}
// RetainFileSet returns the current fileset and adds a reference count.
func (i *Index) RetainFileSet() *FileSet {
func (i *Partition) RetainFileSet() *FileSet {
i.mu.RLock()
fs := i.retainFileSet()
i.mu.RUnlock()
return fs
}
func (i *Index) retainFileSet() *FileSet {
func (i *Partition) retainFileSet() *FileSet {
fs := i.fileSet
fs.Retain()
return fs
}
// FileN returns the active files in the file set.
func (i *Index) FileN() int { return len(i.fileSet.files) }
func (i *Partition) FileN() int { return len(i.fileSet.files) }
// prependActiveLogFile adds a new log file so that the current log file can be compacted.
func (i *Index) prependActiveLogFile() error {
func (i *Partition) prependActiveLogFile() error {
// Open file and insert it into the first position.
f, err := i.openLogFile(filepath.Join(i.Path, FormatLogFileName(i.nextSequence())))
if err != nil {
@ -392,7 +363,7 @@ func (i *Index) prependActiveLogFile() error {
}
// ForEachMeasurementName iterates over all measurement names in the index.
func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error {
func (i *Partition) ForEachMeasurementName(fn func(name []byte) error) error {
fs := i.RetainFileSet()
defer fs.Release()
@ -411,14 +382,14 @@ func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error {
}
// MeasurementExists returns true if a measurement exists.
func (i *Index) MeasurementExists(name []byte) (bool, error) {
func (i *Partition) MeasurementExists(name []byte) (bool, error) {
fs := i.RetainFileSet()
defer fs.Release()
m := fs.Measurement(name)
return m != nil && !m.Deleted(), nil
}
func (i *Index) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) {
func (i *Partition) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) {
fs := i.RetainFileSet()
defer fs.Release()
@ -428,7 +399,7 @@ func (i *Index) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) {
return bytesutil.CloneSlice(names), err
}
func (i *Index) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
func (i *Partition) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
fs := i.RetainFileSet()
defer fs.Release()
@ -448,7 +419,7 @@ func (i *Index) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
}
// DropMeasurement deletes a measurement from the index.
func (i *Index) DropMeasurement(name []byte) error {
func (i *Partition) DropMeasurement(name []byte) error {
fs := i.RetainFileSet()
defer fs.Release()
@ -516,7 +487,7 @@ func (i *Index) DropMeasurement(name []byte) error {
}
// CreateSeriesListIfNotExists creates a list of series if they doesn't exist in bulk.
func (i *Index) CreateSeriesListIfNotExists(_, names [][]byte, tagsSlice []models.Tags) error {
func (i *Partition) CreateSeriesListIfNotExists(_, names [][]byte, tagsSlice []models.Tags) error {
// All slices must be of equal length.
if len(names) != len(tagsSlice) {
return errors.New("names/tags length mismatch")
@ -539,16 +510,16 @@ func (i *Index) CreateSeriesListIfNotExists(_, names [][]byte, tagsSlice []model
}
// InitializeSeries is a no-op. This only applies to the in-memory index.
func (i *Index) InitializeSeries(key, name []byte, tags models.Tags) error {
func (i *Partition) InitializeSeries(key, name []byte, tags models.Tags) error {
return nil
}
// CreateSeriesIfNotExists creates a series if it doesn't exist or is deleted.
func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
func (i *Partition) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
return i.CreateSeriesListIfNotExists(nil, [][]byte{name}, []models.Tags{tags})
}
func (i *Index) DropSeries(key []byte) error {
func (i *Partition) DropSeries(key []byte) error {
if err := func() error {
i.mu.RLock()
defer i.mu.RUnlock()
@ -592,7 +563,7 @@ func (i *Index) DropSeries(key []byte) error {
// MeasurementsSketches returns the two sketches for the index by merging all
// instances of the type sketch types in all the index files.
func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
func (i *Partition) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
fs := i.RetainFileSet()
defer fs.Release()
return fs.MeasurementsSketches()
@ -603,19 +574,19 @@ func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, erro
// cannot be combined with other shard's results. If you need to count series
// across indexes then use SeriesSketches and merge the results from other
// indexes.
func (i *Index) SeriesN() int64 {
func (i *Partition) SeriesN() int64 {
return int64(i.sfile.SeriesCount())
}
// HasTagKey returns true if tag key exists.
func (i *Index) HasTagKey(name, key []byte) (bool, error) {
func (i *Partition) HasTagKey(name, key []byte) (bool, error) {
fs := i.RetainFileSet()
defer fs.Release()
return fs.HasTagKey(name, key), nil
}
// MeasurementTagKeysByExpr extracts the tag keys wanted by the expression.
func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
func (i *Partition) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
fs := i.RetainFileSet()
defer fs.Release()
return fs.MeasurementTagKeysByExpr(name, expr)
@ -625,7 +596,7 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s
//
// See tsm1.Engine.MeasurementTagKeyValuesByExpr for a fuller description of this
// method.
func (i *Index) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
func (i *Partition) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
fs := i.RetainFileSet()
defer fs.Release()
@ -685,7 +656,7 @@ func (i *Index) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte
}
// ForEachMeasurementTagKey iterates over all tag keys in a measurement.
func (i *Index) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
func (i *Partition) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
fs := i.RetainFileSet()
defer fs.Release()
@ -705,12 +676,12 @@ func (i *Index) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error)
// TagKeyCardinality always returns zero.
// It is not possible to determine cardinality of tags across index files.
func (i *Index) TagKeyCardinality(name, key []byte) int {
func (i *Partition) TagKeyCardinality(name, key []byte) int {
return 0
}
// MeasurementSeriesKeysByExpr returns a list of series keys matching expr.
func (i *Index) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) {
func (i *Partition) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) {
fs := i.RetainFileSet()
defer fs.Release()
@ -722,7 +693,7 @@ func (i *Index) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([]
// TagSets returns an ordered list of tag sets for a measurement by dimension
// and filtered by an optional conditional expression.
func (i *Index) TagSets(name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) {
func (i *Partition) TagSets(name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) {
fs := i.RetainFileSet()
defer fs.Release()
@ -811,7 +782,7 @@ func (i *Index) TagSets(name []byte, opt query.IteratorOptions) ([]*query.TagSet
}
// SnapshotTo creates hard links to the file set into path.
func (i *Index) SnapshotTo(path string) error {
func (i *Partition) SnapshotTo(path string) error {
i.mu.Lock()
defer i.mu.Unlock()
@ -847,31 +818,31 @@ func (i *Index) SnapshotTo(path string) error {
return nil
}
func (i *Index) SetFieldName(measurement []byte, name string) {}
func (i *Index) RemoveShard(shardID uint64) {}
func (i *Index) AssignShard(k string, shardID uint64) {}
func (i *Partition) SetFieldName(measurement []byte, name string) {}
func (i *Partition) RemoveShard(shardID uint64) {}
func (i *Partition) AssignShard(k string, shardID uint64) {}
func (i *Index) UnassignShard(k string, shardID uint64) error {
func (i *Partition) UnassignShard(k string, shardID uint64) error {
// This can be called directly once inmem is gone.
return i.DropSeries([]byte(k))
}
// SeriesPointIterator returns an influxql iterator over all series.
func (i *Index) SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error) {
func (i *Partition) SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error) {
// NOTE: The iterator handles releasing the file set.
fs := i.RetainFileSet()
return newSeriesPointIterator(fs, i.fieldset, opt), nil
}
// Compact requests a compaction of log files.
func (i *Index) Compact() {
func (i *Partition) Compact() {
i.mu.Lock()
defer i.mu.Unlock()
i.compact()
}
// compact compacts continguous groups of files that are not currently compacting.
func (i *Index) compact() {
func (i *Partition) compact() {
if !i.CompactionEnabled {
return
}
@ -927,7 +898,7 @@ func (i *Index) compact() {
// compactToLevel compacts a set of files into a new file. Replaces old files with
// compacted file on successful completion. This runs in a separate goroutine.
func (i *Index) compactToLevel(files []*IndexFile, level int) {
func (i *Partition) compactToLevel(files []*IndexFile, level int) {
assert(len(files) >= 2, "at least two index files are required for compaction")
assert(level > 0, "cannot compact level zero")
@ -1022,9 +993,9 @@ func (i *Index) compactToLevel(files []*IndexFile, level int) {
}
}
func (i *Index) Rebuild() {}
func (i *Partition) Rebuild() {}
func (i *Index) CheckLogFile() error {
func (i *Partition) CheckLogFile() error {
// Check log file size under read lock.
if size := func() int64 {
i.mu.RLock()
@ -1040,7 +1011,7 @@ func (i *Index) CheckLogFile() error {
return i.checkLogFile()
}
func (i *Index) checkLogFile() error {
func (i *Partition) checkLogFile() error {
if i.activeLogFile.Size() < i.MaxLogFileSize {
return nil
}
@ -1067,7 +1038,7 @@ func (i *Index) checkLogFile() error {
// compactLogFile compacts f into a tsi file. The new file will share the
// same identifier but will have a ".tsi" extension. Once the log file is
// compacted then the manifest is updated and the log file is discarded.
func (i *Index) compactLogFile(logFile *LogFile) {
func (i *Partition) compactLogFile(logFile *LogFile) {
start := time.Now()
// Retrieve identifier from current path.

View File

@ -5,277 +5,42 @@ import (
"io/ioutil"
"os"
"path/filepath"
"reflect"
"regexp"
"testing"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb/index/tsi1"
"github.com/influxdata/influxql"
)
// Bloom filter settings used in tests.
const M, K = 4096, 6
// Ensure index can iterate over all measurement names.
func TestIndex_ForEachMeasurementName(t *testing.T) {
idx := MustOpenIndex()
defer idx.Close()
// Add series to index.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})},
}); err != nil {
t.Fatal(err)
}
// Verify measurements are returned.
idx.Run(t, func(t *testing.T) {
var names []string
if err := idx.ForEachMeasurementName(func(name []byte) error {
names = append(names, string(name))
return nil
}); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(names, []string{"cpu", "mem"}) {
t.Fatalf("unexpected names: %#v", names)
}
})
// Add more series.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("disk")},
{Name: []byte("mem")},
}); err != nil {
t.Fatal(err)
}
// Verify new measurements.
idx.Run(t, func(t *testing.T) {
var names []string
if err := idx.ForEachMeasurementName(func(name []byte) error {
names = append(names, string(name))
return nil
}); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(names, []string{"cpu", "disk", "mem"}) {
t.Fatalf("unexpected names: %#v", names)
}
})
}
// Ensure index can return whether a measurement exists.
func TestIndex_MeasurementExists(t *testing.T) {
idx := MustOpenIndex()
defer idx.Close()
// Add series to index.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
}); err != nil {
t.Fatal(err)
}
// Verify measurement exists.
idx.Run(t, func(t *testing.T) {
if v, err := idx.MeasurementExists([]byte("cpu")); err != nil {
t.Fatal(err)
} else if !v {
t.Fatal("expected measurement to exist")
}
})
// Delete one series.
if err := idx.DropSeries(models.MakeKey([]byte("cpu"), models.NewTags(map[string]string{"region": "east"}))); err != nil {
t.Fatal(err)
}
// Verify measurement still exists.
idx.Run(t, func(t *testing.T) {
if v, err := idx.MeasurementExists([]byte("cpu")); err != nil {
t.Fatal(err)
} else if !v {
t.Fatal("expected measurement to still exist")
}
})
// Delete second series.
if err := idx.DropSeries(models.MakeKey([]byte("cpu"), models.NewTags(map[string]string{"region": "west"}))); err != nil {
t.Fatal(err)
}
// Verify measurement is now deleted.
idx.Run(t, func(t *testing.T) {
if v, err := idx.MeasurementExists([]byte("cpu")); err != nil {
t.Fatal(err)
} else if v {
t.Fatal("expected measurement to be deleted")
}
})
}
// Ensure index can return a list of matching measurements.
func TestIndex_MeasurementNamesByExpr(t *testing.T) {
idx := MustOpenIndex()
defer idx.Close()
// Add series to index.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
{Name: []byte("disk"), Tags: models.NewTags(map[string]string{"region": "north"})},
{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "west", "country": "us"})},
}); err != nil {
t.Fatal(err)
}
// Retrieve measurements by expression
idx.Run(t, func(t *testing.T) {
t.Run("EQ", func(t *testing.T) {
names, err := idx.MeasurementNamesByExpr(influxql.MustParseExpr(`region = 'west'`))
if err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(names, [][]byte{[]byte("cpu"), []byte("mem")}) {
t.Fatalf("unexpected names: %v", names)
}
})
t.Run("NEQ", func(t *testing.T) {
names, err := idx.MeasurementNamesByExpr(influxql.MustParseExpr(`region != 'east'`))
if err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(names, [][]byte{[]byte("disk"), []byte("mem")}) {
t.Fatalf("unexpected names: %v", names)
}
})
t.Run("EQREGEX", func(t *testing.T) {
names, err := idx.MeasurementNamesByExpr(influxql.MustParseExpr(`region =~ /east|west/`))
if err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(names, [][]byte{[]byte("cpu"), []byte("mem")}) {
t.Fatalf("unexpected names: %v", names)
}
})
t.Run("NEQREGEX", func(t *testing.T) {
names, err := idx.MeasurementNamesByExpr(influxql.MustParseExpr(`country !~ /^u/`))
if err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(names, [][]byte{[]byte("cpu"), []byte("disk")}) {
t.Fatalf("unexpected names: %v", names)
}
})
})
}
// Ensure index can return a list of matching measurements.
func TestIndex_MeasurementNamesByRegex(t *testing.T) {
idx := MustOpenIndex()
defer idx.Close()
// Add series to index.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("cpu")},
{Name: []byte("disk")},
{Name: []byte("mem")},
}); err != nil {
t.Fatal(err)
}
// Retrieve measurements by regex.
idx.Run(t, func(t *testing.T) {
names, err := idx.MeasurementNamesByRegex(regexp.MustCompile(`cpu|mem`))
if err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(names, [][]byte{[]byte("cpu"), []byte("mem")}) {
t.Fatalf("unexpected names: %v", names)
}
})
}
// Ensure index can delete a measurement and all related keys, values, & series.
func TestIndex_DropMeasurement(t *testing.T) {
idx := MustOpenIndex()
defer idx.Close()
// Add series to index.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
{Name: []byte("disk"), Tags: models.NewTags(map[string]string{"region": "north"})},
{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "west", "country": "us"})},
}); err != nil {
t.Fatal(err)
}
// Drop measurement.
if err := idx.DropMeasurement([]byte("cpu")); err != nil {
t.Fatal(err)
}
// Verify data is gone in each stage.
idx.Run(t, func(t *testing.T) {
// Verify measurement is gone.
if v, err := idx.MeasurementExists([]byte("cpu")); err != nil {
t.Fatal(err)
} else if v {
t.Fatal("expected no measurement")
}
// Obtain file set to perform lower level checks.
fs := idx.RetainFileSet()
defer fs.Release()
// Verify tags & values are gone.
if e := fs.TagKeyIterator([]byte("cpu")).Next(); e != nil && !e.Deleted() {
t.Fatal("expected deleted tag key")
}
if itr := fs.TagValueIterator([]byte("cpu"), []byte("region")); itr != nil {
t.Fatal("expected nil tag value iterator")
}
})
}
func TestIndex_Open(t *testing.T) {
func TestPartition_Open(t *testing.T) {
// Opening a fresh index should set the MANIFEST version to current version.
idx := NewIndex()
p := NewPartition()
t.Run("open new index", func(t *testing.T) {
if err := idx.Open(); err != nil {
if err := p.Open(); err != nil {
t.Fatal(err)
}
// Check version set appropriately.
if got, exp := idx.Manifest().Version, 1; got != exp {
if got, exp := p.Manifest().Version, 1; got != exp {
t.Fatalf("got index version %d, expected %d", got, exp)
}
})
// Reopening an open index should return an error.
t.Run("reopen open index", func(t *testing.T) {
err := idx.Open()
err := p.Open()
if err == nil {
idx.Close()
p.Close()
t.Fatal("didn't get an error on reopen, but expected one")
}
idx.Close()
p.Close()
})
// Opening an incompatible index should return an error.
incompatibleVersions := []int{-1, 0, 2}
for _, v := range incompatibleVersions {
t.Run(fmt.Sprintf("incompatible index version: %d", v), func(t *testing.T) {
idx = NewIndex()
p = NewPartition()
// Manually create a MANIFEST file for an incompatible index version.
mpath := filepath.Join(idx.Path, tsi1.ManifestFileName)
mpath := filepath.Join(p.Path, tsi1.ManifestFileName)
m := tsi1.NewManifest()
m.Levels = nil
m.Version = v // Set example MANIFEST version.
@ -292,105 +57,59 @@ func TestIndex_Open(t *testing.T) {
// Opening this index should return an error because the MANIFEST has an
// incompatible version.
err = idx.Open()
err = p.Open()
if err != tsi1.ErrIncompatibleVersion {
idx.Close()
p.Close()
t.Fatalf("got error %v, expected %v", err, tsi1.ErrIncompatibleVersion)
}
})
}
}
func TestIndex_Manifest(t *testing.T) {
func TestPartition_Manifest(t *testing.T) {
t.Run("current MANIFEST", func(t *testing.T) {
idx := MustOpenIndex()
if got, exp := idx.Manifest().Version, tsi1.Version; got != exp {
p := MustOpenPartition()
if got, exp := p.Manifest().Version, tsi1.Version; got != exp {
t.Fatalf("got MANIFEST version %d, expected %d", got, exp)
}
})
}
// Index is a test wrapper for tsi1.Index.
type Index struct {
*tsi1.Index
// Partition is a test wrapper for tsi1.Partition.
type Partition struct {
*tsi1.Partition
}
// NewIndex returns a new instance of Index at a temporary path.
func NewIndex() *Index {
idx := &Index{Index: tsi1.NewIndex()}
idx.Path = MustTempDir()
return idx
// NewPartition returns a new instance of Partition at a temporary path.
func NewPartition() *Partition {
p := &Partition{Partition: tsi1.NewPartition()}
p.Path = MustTempDir()
return p
}
// MustOpenIndex returns a new, open index. Panic on error.
func MustOpenIndex() *Index {
idx := NewIndex()
if err := idx.Open(); err != nil {
// MustOpenPartition returns a new, open index. Panic on error.
func MustOpenPartition() *Partition {
p := NewPartition()
if err := p.Open(); err != nil {
panic(err)
}
return idx
return p
}
// Close closes and removes the index directory.
func (idx *Index) Close() error {
defer os.RemoveAll(idx.Path)
return idx.Index.Close()
func (p *Partition) Close() error {
defer os.RemoveAll(p.Path)
return p.Partition.Close()
}
// Reopen closes and opens the index.
func (idx *Index) Reopen() error {
if err := idx.Index.Close(); err != nil {
func (p *Partition) Reopen() error {
if err := p.Partition.Close(); err != nil {
return err
}
path := idx.Path
idx.Index = tsi1.NewIndex()
idx.Path = path
if err := idx.Open(); err != nil {
return err
}
return nil
}
// Run executes a subtest for each of several different states:
//
// - Immediately
// - After reopen
// - After compaction
// - After reopen again
//
// The index should always respond in the same fashion regardless of
// how data is stored. This helper allows the index to be easily tested
// in all major states.
func (idx *Index) Run(t *testing.T, fn func(t *testing.T)) {
// Invoke immediately.
t.Run("state=initial", fn)
// Reopen and invoke again.
if err := idx.Reopen(); err != nil {
t.Fatalf("reopen error: %s", err)
}
t.Run("state=reopen", fn)
// TODO: Request a compaction.
// if err := idx.Compact(); err != nil {
// t.Fatalf("compact error: %s", err)
// }
// t.Run("state=post-compaction", fn)
// Reopen and invoke again.
if err := idx.Reopen(); err != nil {
t.Fatalf("post-compaction reopen error: %s", err)
}
t.Run("state=post-compaction-reopen", fn)
}
// CreateSeriesSliceIfNotExists creates multiple series at a time.
func (idx *Index) CreateSeriesSliceIfNotExists(a []Series) error {
for i, s := range a {
if err := idx.CreateSeriesListIfNotExists(nil, [][]byte{s.Name}, []models.Tags{s.Tags}); err != nil {
return fmt.Errorf("i=%d, name=%s, tags=%v, err=%s", i, s.Name, s.Tags, err)
}
}
return nil
path := p.Path
p.Partition = tsi1.NewPartition()
p.Path = path
return p.Open()
}