Skeleton storage engine

pull/10616/head
Edd Robinson 2018-10-02 16:06:37 +01:00
parent 8c338df860
commit 981b2cdbea
13 changed files with 172 additions and 186 deletions

15
storage/config.go Normal file
View File

@ -0,0 +1,15 @@
package storage
import (
"github.com/influxdata/platform/tsdb"
"github.com/influxdata/platform/tsdb/index/tsi1"
)
// Config defaults
const ()
// Config holds the configuration for an Engine.
type Config struct {
EngineOptions tsdb.EngineOptions `toml:"-"`
Index tsi1.Config `toml:"index"`
}

60
storage/engine.go Normal file
View File

@ -0,0 +1,60 @@
package storage
import (
"path/filepath"
"github.com/influxdata/platform/tsdb"
"github.com/influxdata/platform/tsdb/index/tsi1"
"github.com/influxdata/platform/tsdb/tsm1"
"go.uber.org/zap"
)
type Engine struct {
config Config
index *tsi1.Index
sfile *tsdb.SeriesFile
engine *tsm1.Engine
logger *zap.Logger
}
func NewEngine(path string, c Config) *Engine {
e := &Engine{
sfile: tsdb.NewSeriesFile(filepath.Join(path, tsdb.SeriesFileDirectory)),
logger: zap.NewNop(),
}
// Initialise index.
index := tsi1.NewIndex(e.sfile, "remove me", c.Index,
tsi1.WithPath(filepath.Join(path, tsi1.DefaultIndexDirectoryName)),
)
e.index = index
// Initialise Engine
engine := tsm1.NewEngine(0, tsdb.Index(e.index), filepath.Join(path, "data"), "remove-me-wal", e.sfile, c.EngineOptions)
// TODO(edd): Once the tsdb.Engine abstraction is gone, this won't be needed.
e.engine = engine.(*tsm1.Engine)
return e
}
// WithLogger sets the logger on the Store. It must be called before Open.
func (e *Engine) WithLogger(log *zap.Logger) {
e.logger = log.With(zap.String("service", "storage"))
e.sfile.WithLogger(e.logger)
e.index.WithLogger(e.logger)
e.engine.WithLogger(e.logger)
}
// Open opens the store and all underlying resources. It returns an error if
// any of the underlying systems fail to open.
func (e *Engine) Open() error {
return nil
}
// Close closes the store and all underlying resources. It returns an error if
// any of the underlying systems fail to close.
func (e *Engine) Close() error {
return nil
}

View File

@ -54,10 +54,6 @@ const (
// DefaultMaxConcurrentCompactions is the maximum number of concurrent full and level compactions
// that can run at one time. A value of 0 results in 50% of runtime.GOMAXPROCS(0) used at runtime.
DefaultMaxConcurrentCompactions = 0
// DefaultMaxIndexLogFileSize is the default threshold, in bytes, when an index
// write-ahead log file will compact into an index file.
DefaultMaxIndexLogFileSize = 1 * 1024 * 1024 // 1MB
)
// Config holds the configuration for the tsbd package.
@ -88,12 +84,6 @@ type Config struct {
// not affected by this limit. A value of 0 limits compactions to runtime.GOMAXPROCS(0).
MaxConcurrentCompactions int `toml:"max-concurrent-compactions"`
// MaxIndexLogFileSize is the threshold, in bytes, when an index write-ahead log file will
// compact into an index file. Lower sizes will cause log files to be compacted more quickly
// and result in lower heap usage at the expense of write throughput. Higher sizes will
// be compacted less frequently, store more series in-memory, and provide higher write throughput.
MaxIndexLogFileSize toml.Size `toml:"max-index-log-file-size"`
TraceLoggingEnabled bool `toml:"trace-logging-enabled"`
// TSMWillNeed controls whether we hint to the kernel that we intend to
@ -119,8 +109,6 @@ func NewConfig() Config {
CompactThroughputBurst: toml.Size(DefaultCompactThroughputBurst),
MaxConcurrentCompactions: DefaultMaxConcurrentCompactions,
MaxIndexLogFileSize: toml.Size(DefaultMaxIndexLogFileSize),
TraceLoggingEnabled: false,
TSMWillNeed: false,
}

View File

@ -2477,16 +2477,6 @@ func NewIndex(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile
return fn(id, "remove-me", path, seriesIDSet, sfile, options), nil
}
func MustOpenIndex(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) Index {
idx, err := NewIndex(id, database, path, seriesIDSet, sfile, options)
if err != nil {
panic(err)
} else if err := idx.Open(); err != nil {
panic(err)
}
return idx
}
// assert will panic with a given formatted message if the given condition is false.
func assert(condition bool, msg string, v ...interface{}) {
if !condition {

27
tsdb/index/tsi1/config.go Normal file
View File

@ -0,0 +1,27 @@
package tsi1
import "github.com/influxdata/influxdb/toml"
// DefaultMaxIndexLogFileSize is the default threshold, in bytes, when an index
// write-ahead log file will compact into an index file.
const DefaultMaxIndexLogFileSize = 1 * 1024 * 1024 // 1MB
// DefaultIndexDirectoryName is the default name of the directory holding the
// index data.
const DefaultIndexDirectoryName = "index"
// Config holds configurable Index options.
type Config struct {
// MaxIndexLogFileSize is the threshold, in bytes, when an index write-ahead log file will
// compact into an index file. Lower sizes will cause log files to be compacted more quickly
// and result in lower heap usage at the expense of write throughput. Higher sizes will
// be compacted less frequently, store more series in-memory, and provide higher write throughput.
MaxIndexLogFileSize toml.Size `toml:"max-index-log-file-size"`
}
// NewConfig returns a new Config.
func NewConfig() Config {
return Config{
MaxIndexLogFileSize: toml.Size(DefaultMaxIndexLogFileSize),
}
}

View File

@ -8,11 +8,12 @@ import (
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/tsdb"
"github.com/influxdata/platform/tsdb/index/tsi1"
)
// Ensure fileset can return an iterator over all series in the index.
func TestFileSet_SeriesIDIterator(t *testing.T) {
idx := MustOpenIndex(1)
idx := MustOpenIndex(1, tsi1.NewConfig())
defer idx.Close()
// Create initial set of series.
@ -81,7 +82,7 @@ func TestFileSet_SeriesIDIterator(t *testing.T) {
// Ensure fileset can return an iterator over all series for one measurement.
func TestFileSet_MeasurementSeriesIDIterator(t *testing.T) {
idx := MustOpenIndex(1)
idx := MustOpenIndex(1, tsi1.NewConfig())
defer idx.Close()
// Create initial set of series.
@ -147,7 +148,7 @@ func TestFileSet_MeasurementSeriesIDIterator(t *testing.T) {
// Ensure fileset can return an iterator over all measurements for the index.
func TestFileSet_MeasurementIterator(t *testing.T) {
idx := MustOpenIndex(1)
idx := MustOpenIndex(1, tsi1.NewConfig())
defer idx.Close()
// Create initial set of series.
@ -221,7 +222,7 @@ func TestFileSet_MeasurementIterator(t *testing.T) {
// Ensure fileset can return an iterator over all keys for one measurement.
func TestFileSet_TagKeyIterator(t *testing.T) {
idx := MustOpenIndex(1)
idx := MustOpenIndex(1, tsi1.NewConfig())
defer idx.Close()
// Create initial set of series.

View File

@ -51,10 +51,10 @@ func init() {
}
}
tsdb.RegisterIndex(IndexName, func(_ uint64, db, path string, _ *tsdb.SeriesIDSet, sfile *tsdb.SeriesFile, opt tsdb.EngineOptions) tsdb.Index {
idx := NewIndex(sfile, db, WithPath(path), WithMaximumLogFileSize(int64(opt.Config.MaxIndexLogFileSize)))
return idx
})
// tsdb.RegisterIndex(IndexName, func(_ uint64, db, path string, _ *tsdb.SeriesIDSet, sfile *tsdb.SeriesFile, ) tsdb.Index {
// idx := NewIndex(sfile, db, WithPath(path), WithMaximumLogFileSize(int64(opt.Config.MaxIndexLogFileSize)))
// return idx
// })
}
// DefaultPartitionN determines how many shards the index will be partitioned into.
@ -85,21 +85,6 @@ var DisableCompactions = func() IndexOption {
}
}
// WithLogger sets the logger for the Index.
var WithLogger = func(l zap.Logger) IndexOption {
return func(i *Index) {
i.logger = l.With(zap.String("index", "tsi"))
}
}
// WithMaximumLogFileSize sets the maximum size of LogFiles before they're
// compacted into IndexFiles.
var WithMaximumLogFileSize = func(size int64) IndexOption {
return func(i *Index) {
i.maxLogFileSize = size
}
}
// DisableFsync disables flushing and syncing of underlying files. Primarily this
// impacts the LogFiles. This option can be set when working with the index in
// an offline manner, for cases where a hard failure can be overcome by re-running the tooling.
@ -160,10 +145,10 @@ func (i *Index) UniqueReferenceID() uintptr {
}
// NewIndex returns a new instance of Index.
func NewIndex(sfile *tsdb.SeriesFile, database string, options ...IndexOption) *Index {
func NewIndex(sfile *tsdb.SeriesFile, database string, c Config, options ...IndexOption) *Index {
idx := &Index{
tagValueCache: NewTagValueSeriesIDCache(DefaultSeriesIDSetCacheSize),
maxLogFileSize: tsdb.DefaultMaxIndexLogFileSize,
maxLogFileSize: int64(c.MaxIndexLogFileSize),
logger: zap.NewNop(),
version: Version,
sfile: sfile,
@ -219,8 +204,6 @@ func (i *Index) Database() string {
// It's not safe to call WithLogger after the index has been opened, or before
// it has been closed.
func (i *Index) WithLogger(l *zap.Logger) {
i.mu.Lock()
defer i.mu.Unlock()
i.logger = l.With(zap.String("index", "tsi"))
}

View File

@ -19,7 +19,7 @@ const M, K = 4096, 6
// Ensure index can iterate over all measurement names.
func TestIndex_ForEachMeasurementName(t *testing.T) {
idx := MustOpenIndex(1)
idx := MustOpenIndex(1, tsi1.NewConfig())
defer idx.Close()
// Add series to index.
@ -72,7 +72,7 @@ func TestIndex_ForEachMeasurementName(t *testing.T) {
// Ensure index can return whether a measurement exists.
func TestIndex_MeasurementExists(t *testing.T) {
idx := MustOpenIndex(1)
idx := MustOpenIndex(1, tsi1.NewConfig())
defer idx.Close()
// Add series to index.
@ -134,7 +134,7 @@ func TestIndex_MeasurementExists(t *testing.T) {
// Ensure index can return a list of matching measurements.
func TestIndex_MeasurementNamesByRegex(t *testing.T) {
idx := MustOpenIndex(1)
idx := MustOpenIndex(1, tsi1.NewConfig())
defer idx.Close()
// Add series to index.
@ -159,7 +159,7 @@ func TestIndex_MeasurementNamesByRegex(t *testing.T) {
// Ensure index can delete a measurement and all related keys, values, & series.
func TestIndex_DropMeasurement(t *testing.T) {
idx := MustOpenIndex(1)
idx := MustOpenIndex(1, tsi1.NewConfig())
defer idx.Close()
// Add series to index.
@ -206,7 +206,7 @@ func TestIndex_DropMeasurement(t *testing.T) {
func TestIndex_Open(t *testing.T) {
// Opening a fresh index should set the MANIFEST version to current version.
idx := NewIndex(tsi1.DefaultPartitionN)
idx := NewIndex(tsi1.DefaultPartitionN, tsi1.NewConfig())
t.Run("open new index", func(t *testing.T) {
if err := idx.Open(); err != nil {
t.Fatal(err)
@ -235,7 +235,7 @@ func TestIndex_Open(t *testing.T) {
incompatibleVersions := []int{-1, 0, 2}
for _, v := range incompatibleVersions {
t.Run(fmt.Sprintf("incompatible index version: %d", v), func(t *testing.T) {
idx = NewIndex(tsi1.DefaultPartitionN)
idx = NewIndex(tsi1.DefaultPartitionN, tsi1.NewConfig())
// Manually create a MANIFEST file for an incompatible index version.
// under one of the partitions.
partitionPath := filepath.Join(idx.Path(), "2")
@ -269,7 +269,7 @@ func TestIndex_Open(t *testing.T) {
func TestIndex_Manifest(t *testing.T) {
t.Run("current MANIFEST", func(t *testing.T) {
idx := MustOpenIndex(tsi1.DefaultPartitionN)
idx := MustOpenIndex(tsi1.DefaultPartitionN, tsi1.NewConfig())
// Check version set appropriately.
for i := 0; uint64(i) < tsi1.DefaultPartitionN; i++ {
@ -282,7 +282,7 @@ func TestIndex_Manifest(t *testing.T) {
}
func TestIndex_DiskSizeBytes(t *testing.T) {
idx := MustOpenIndex(tsi1.DefaultPartitionN)
idx := MustOpenIndex(tsi1.DefaultPartitionN, tsi1.NewConfig())
defer idx.Close()
// Add series to index.
@ -312,20 +312,24 @@ func TestIndex_DiskSizeBytes(t *testing.T) {
// Index is a test wrapper for tsi1.Index.
type Index struct {
*tsi1.Index
Config tsi1.Config
SeriesFile *SeriesFile
}
// NewIndex returns a new instance of Index at a temporary path.
func NewIndex(partitionN uint64) *Index {
idx := &Index{SeriesFile: NewSeriesFile()}
idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, "db0", tsi1.WithPath(MustTempDir()))
func NewIndex(partitionN uint64, c tsi1.Config) *Index {
idx := &Index{
Config: tsi1.NewConfig(),
SeriesFile: NewSeriesFile(),
}
idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, "db0", idx.Config, tsi1.WithPath(MustTempDir()))
idx.Index.PartitionN = partitionN
return idx
}
// MustOpenIndex returns a new, open index. Panic on error.
func MustOpenIndex(partitionN uint64) *Index {
idx := NewIndex(partitionN)
func MustOpenIndex(partitionN uint64, c tsi1.Config) *Index {
idx := NewIndex(partitionN, c)
if err := idx.Open(); err != nil {
panic(err)
}
@ -362,7 +366,7 @@ func (idx *Index) Reopen() error {
}
partitionN := idx.Index.PartitionN // Remember how many partitions to use.
idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, "db0", tsi1.WithPath(idx.Index.Path()))
idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, "db0", idx.Config, tsi1.WithPath(idx.Index.Path()))
idx.Index.PartitionN = partitionN
return idx.Open()
}

View File

@ -92,7 +92,7 @@ func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition {
sfile: sfile,
seriesIDSet: tsdb.NewSeriesIDSet(),
MaxLogFileSize: tsdb.DefaultMaxIndexLogFileSize,
MaxLogFileSize: DefaultMaxIndexLogFileSize,
// compactionEnabled: true,
compactionInterrupt: make(chan struct{}),

View File

@ -66,7 +66,7 @@ func TestIndexSet_MeasurementNamesByExpr(t *testing.T) {
// Setup indexes
indexes := map[string]*Index{}
for _, name := range tsdb.RegisteredIndexes() {
idx := MustOpenNewIndex(name)
idx := MustOpenNewIndex(tsi1.NewConfig())
idx.AddSeries("cpu", map[string]string{"region": "east"}, models.Integer)
idx.AddSeries("cpu", map[string]string{"region": "west", "secret": "foo"}, models.Integer)
idx.AddSeries("disk", map[string]string{"secret": "foo"}, models.Integer)
@ -107,114 +107,20 @@ func TestIndexSet_MeasurementNamesByExpr(t *testing.T) {
}
}
func TestIndex_Sketches(t *testing.T) {
checkCardinalities := func(t *testing.T, index *Index, state string, series, tseries, measurements, tmeasurements int) {
t.Helper()
// Get sketches and check cardinality...
sketch, tsketch, err := index.SeriesSketches()
if err != nil {
t.Fatal(err)
}
// delta calculates a rough 10% delta. If i is small then a minimum value
// of 2 is used.
delta := func(i int) int {
v := i / 10
if v == 0 {
v = 2
}
return v
}
// series cardinality should be well within 10%.
if got, exp := int(sketch.Count()), series; got-exp < -delta(series) || got-exp > delta(series) {
t.Errorf("[%s] got series cardinality %d, expected ~%d", state, got, exp)
}
// check series tombstones
if got, exp := int(tsketch.Count()), tseries; got-exp < -delta(tseries) || got-exp > delta(tseries) {
t.Errorf("[%s] got series tombstone cardinality %d, expected ~%d", state, got, exp)
}
// Check measurement cardinality.
if sketch, tsketch, err = index.MeasurementsSketches(); err != nil {
t.Fatal(err)
}
if got, exp := int(sketch.Count()), measurements; got != exp { //got-exp < -delta(measurements) || got-exp > delta(measurements) {
t.Errorf("[%s] got measurement cardinality %d, expected ~%d", state, got, exp)
}
if got, exp := int(tsketch.Count()), tmeasurements; got != exp { //got-exp < -delta(tmeasurements) || got-exp > delta(tmeasurements) {
t.Errorf("[%s] got measurement tombstone cardinality %d, expected ~%d", state, got, exp)
}
}
idx := MustNewIndex()
// Override the log file max size to force a log file compaction sooner.
// This way, we will test the sketches are correct when they have been
// compacted into IndexFiles, and also when they're loaded from
// IndexFiles after a re-open.
tsi1.WithMaximumLogFileSize(1 << 10)(idx.Index)
// Open the index
idx.MustOpen()
defer idx.Close()
series := genTestSeries(10, 5, 3)
// Add series to index.
for _, serie := range series {
if err := idx.AddSeries(serie.Measurement, serie.Tags.Map(), serie.Type); err != nil {
t.Fatal(err)
}
}
// Check cardinalities after adding series.
checkCardinalities(t, idx, "initial", 2430, 0, 10, 0)
// Re-open the index.
if err := idx.Reopen(); err != nil {
panic(err)
}
// Check cardinalities after the reopen
checkCardinalities(t, idx, "initial|reopen", 2430, 0, 10, 0)
// Drop some series
if err := idx.DropMeasurement([]byte("measurement2")); err != nil {
t.Fatal(err)
} else if err := idx.DropMeasurement([]byte("measurement5")); err != nil {
t.Fatal(err)
}
// Check cardinalities after the delete
checkCardinalities(t, idx, "initial|reopen|delete", 2923, 0, 10, 2)
// Re-open the index.
if err := idx.Reopen(); err != nil {
panic(err)
}
// Check cardinalities after the reopen
checkCardinalities(t, idx, "initial|reopen|delete|reopen", 2923, 0, 10, 2)
}
// Index wraps a series file and index.
type Index struct {
rootPath string
config tsi1.Config
*tsi1.Index
rootPath string
indexType string
sfile *tsdb.SeriesFile
sfile *tsdb.SeriesFile
}
// MustNewIndex will initialize a new index using the provide type. It creates
// everything under the same root directory so it can be cleanly removed on Close.
//
// The index will not be opened.
func MustNewIndex() *Index {
opts := tsdb.NewEngineOptions()
func MustNewIndex(c tsi1.Config) *Index {
rootPath, err := ioutil.TempDir("", "influxdb-tsdb")
if err != nil {
panic(err)
@ -230,17 +136,15 @@ func MustNewIndex() *Index {
panic(err)
}
i, err := tsdb.NewIndex(0, "db0", filepath.Join(rootPath, "index"), tsdb.NewSeriesIDSet(), sfile, opts)
if err != nil {
panic(err)
}
i := tsi1.NewIndex(sfile, "remove-me", c, tsi1.WithPath(filepath.Join(rootPath, "index")))
if testing.Verbose() {
i.WithLogger(logger.New(os.Stderr))
}
idx := &Index{
Index: i.(*tsi1.Index),
config: c,
Index: i,
rootPath: rootPath,
sfile: sfile,
}
@ -249,8 +153,8 @@ func MustNewIndex() *Index {
// MustOpenNewIndex will initialize a new index using the provide type and opens
// it.
func MustOpenNewIndex(index string) *Index {
idx := MustNewIndex()
func MustOpenNewIndex(c tsi1.Config) *Index {
idx := MustNewIndex(c)
idx.MustOpen()
return idx
}
@ -287,13 +191,7 @@ func (i *Index) Reopen() error {
return err
}
opts := tsdb.NewEngineOptions()
idx, err := tsdb.NewIndex(0, "db0", filepath.Join(i.rootPath, "index"), tsdb.NewSeriesIDSet(), i.sfile, opts)
if err != nil {
return err
}
i.Index = idx.(*tsi1.Index)
i.Index = tsi1.NewIndex(i.SeriesFile(), "remove-me", i.config, tsi1.WithPath(filepath.Join(i.rootPath, "index")))
return i.Index.Open()
}
@ -371,7 +269,7 @@ func BenchmarkIndexSet_TagSets(b *testing.B) {
b.Run("1M series", func(b *testing.B) {
b.ReportAllocs()
for _, indexType := range tsdb.RegisteredIndexes() {
idx := MustOpenNewIndex(indexType)
idx := MustOpenNewIndex(tsi1.NewConfig())
setup(idx)
name := []byte("m4")
@ -442,7 +340,7 @@ func BenchmarkIndex_ConcurrentWriteQuery(b *testing.B) {
}
runBenchmark := func(b *testing.B, index string, queryN int) {
idx := MustOpenNewIndex(index)
idx := MustOpenNewIndex(tsi1.NewConfig())
var wg sync.WaitGroup
begin := make(chan struct{})
@ -498,7 +396,7 @@ func BenchmarkIndex_ConcurrentWriteQuery(b *testing.B) {
}
// Re-open everything
idx = MustOpenNewIndex(index)
idx = MustOpenNewIndex(tsi1.NewConfig())
wg.Add(1)
begin = make(chan struct{})
once = sync.Once{}

View File

@ -48,6 +48,11 @@ func NewSeriesFile(path string) *SeriesFile {
}
}
// WithLogger sets the logger on the SeriesFile and all underlying partitions. It must be called before Open.
func (f *SeriesFile) WithLogger(log *zap.Logger) {
f.Logger = log.With(zap.String("service", "series-file"))
}
// Open memory maps the data file at the file's path.
func (f *SeriesFile) Open() error {
// Wait for all references to be released and prevent new ones from being acquired.
@ -62,6 +67,7 @@ func (f *SeriesFile) Open() error {
// Open partitions.
f.partitions = make([]*SeriesPartition, 0, SeriesFilePartitionN)
for i := 0; i < SeriesFilePartitionN; i++ {
// TODO(edd): These partition initialisation should be moved up to NewSeriesFile.
p := NewSeriesPartition(i, f.SeriesPartitionPath(i))
p.Logger = f.Logger.With(zap.Int("partition", p.ID()))
if err := p.Open(); err != nil {

View File

@ -16,6 +16,7 @@ import (
)
func TestShardWriteAndIndex(t *testing.T) {
t.Skip("Move to the new engine")
tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
tmpShard := filepath.Join(tmpDir, "shard")
@ -83,6 +84,7 @@ func TestShardWriteAndIndex(t *testing.T) {
}
func TestWriteTimeTag(t *testing.T) {
t.Skip("Move to the new engine")
tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
tmpShard := filepath.Join(tmpDir, "shard")
@ -123,6 +125,7 @@ func TestWriteTimeTag(t *testing.T) {
}
func TestWriteTimeField(t *testing.T) {
t.Skip("Move to the new engine")
tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
tmpShard := filepath.Join(tmpDir, "shard")
@ -152,6 +155,7 @@ func TestWriteTimeField(t *testing.T) {
}
func TestShardWriteAddNewField(t *testing.T) {
t.Skip("Move to the new engine")
tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
tmpShard := filepath.Join(tmpDir, "shard")
@ -199,6 +203,7 @@ func TestShardWriteAddNewField(t *testing.T) {
// Ensures that when a shard is closed, it removes any series meta-data
// from the index.
func TestShard_Close_RemoveIndex(t *testing.T) {
t.Skip("Move to the new engine")
tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
tmpShard := filepath.Join(tmpDir, "shard")

View File

@ -26,6 +26,7 @@ import (
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/pkg/deep"
"github.com/influxdata/platform/tsdb"
"github.com/influxdata/platform/tsdb/index/tsi1"
"github.com/influxdata/platform/tsdb/tsm1"
)
@ -199,7 +200,7 @@ func TestEngine_Backup(t *testing.T) {
db := path.Base(f.Name())
opt := tsdb.NewEngineOptions()
idx := tsdb.MustOpenIndex(1, db, filepath.Join(f.Name(), "index"), tsdb.NewSeriesIDSet(), sfile.SeriesFile, opt)
idx := MustOpenIndex(1, db, filepath.Join(f.Name(), "index"), tsdb.NewSeriesIDSet(), sfile.SeriesFile, opt)
defer idx.Close()
e := tsm1.NewEngine(1, idx, f.Name(), walPath, sfile.SeriesFile, opt).(*tsm1.Engine)
@ -306,7 +307,7 @@ func TestEngine_Export(t *testing.T) {
db := path.Base(f.Name())
opt := tsdb.NewEngineOptions()
idx := tsdb.MustOpenIndex(1, db, filepath.Join(f.Name(), "index"), tsdb.NewSeriesIDSet(), sfile.SeriesFile, opt)
idx := MustOpenIndex(1, db, filepath.Join(f.Name(), "index"), tsdb.NewSeriesIDSet(), sfile.SeriesFile, opt)
defer idx.Close()
e := tsm1.NewEngine(1, idx, f.Name(), walPath, sfile.SeriesFile, opt).(*tsm1.Engine)
@ -1649,7 +1650,7 @@ func TestEngine_SnapshotsDisabled(t *testing.T) {
db := path.Base(dir)
opt := tsdb.NewEngineOptions()
idx := tsdb.MustOpenIndex(1, db, filepath.Join(dir, "index"), tsdb.NewSeriesIDSet(), sfile.SeriesFile, opt)
idx := MustOpenIndex(1, db, filepath.Join(dir, "index"), tsdb.NewSeriesIDSet(), sfile.SeriesFile, opt)
defer idx.Close()
e := tsm1.NewEngine(1, idx, dir, walPath, sfile.SeriesFile, opt).(*tsm1.Engine)
@ -2321,7 +2322,7 @@ func NewEngine() (*Engine, error) {
opt.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{seriesIDs})
idxPath := filepath.Join(dbPath, "index")
idx := tsdb.MustOpenIndex(1, db, idxPath, seriesIDs, sfile, opt)
idx := MustOpenIndex(1, db, idxPath, seriesIDs, sfile, opt)
tsm1Engine := tsm1.NewEngine(1, idx, filepath.Join(root, "data"), filepath.Join(root, "wal"), sfile, opt).(*tsm1.Engine)
@ -2390,7 +2391,7 @@ func (e *Engine) Reopen() error {
opt.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{seriesIDSet})
// Re-open index.
e.index = tsdb.MustOpenIndex(1, db, e.indexPath, seriesIDSet, e.sfile, opt)
e.index = MustOpenIndex(1, db, e.indexPath, seriesIDSet, e.sfile, opt)
// Re-initialize engine.
e.Engine = tsm1.NewEngine(1, e.index, filepath.Join(e.root, "data"), filepath.Join(e.root, "wal"), e.sfile, opt).(*tsm1.Engine)
@ -2460,6 +2461,14 @@ func (e *Engine) MustWriteSnapshot() {
}
}
func MustOpenIndex(id uint64, database, path string, seriesIDSet *tsdb.SeriesIDSet, sfile *tsdb.SeriesFile, options tsdb.EngineOptions) tsdb.Index {
idx := tsi1.NewIndex(sfile, database, tsi1.NewConfig(), tsi1.WithPath(path))
if err := idx.Open(); err != nil {
panic(err)
}
return idx
}
// SeriesFile is a test wrapper for tsdb.SeriesFile.
type SeriesFile struct {
*tsdb.SeriesFile