Implement TSI index versioning
This commit adds a basic TSI versioning scheme, by adding a Version field to an index's MANIFEST file. Existing TSI indexes will not have this field present in their MANIFEST files, and thus will be deemed incomatible with the current version. Users with existing TSI indexes will be able to remove them, and convert the resulting inmem indexes to the current version of a TSI index using the influx_inspect tooling.pull/8857/head
parent
1028818ba6
commit
ea104596f0
|
@ -23,8 +23,13 @@ import (
|
|||
"github.com/uber-go/zap"
|
||||
)
|
||||
|
||||
// IndexName is the name of the index.
|
||||
const IndexName = "tsi1"
|
||||
const (
|
||||
// IndexName is the name of the index.
|
||||
IndexName = "tsi1"
|
||||
|
||||
// Version is the current version of the TSI index.
|
||||
Version = 1
|
||||
)
|
||||
|
||||
// Default compaction thresholds.
|
||||
const (
|
||||
|
@ -95,6 +100,9 @@ type Index struct {
|
|||
CompactionMonitorInterval time.Duration
|
||||
|
||||
logger zap.Logger
|
||||
|
||||
// Index's version.
|
||||
version int
|
||||
}
|
||||
|
||||
// NewIndex returns a new instance of Index.
|
||||
|
@ -106,10 +114,15 @@ func NewIndex() *Index {
|
|||
MaxLogFileSize: DefaultMaxLogFileSize,
|
||||
CompactionEnabled: true,
|
||||
|
||||
logger: zap.New(zap.NullEncoder()),
|
||||
logger: zap.New(zap.NullEncoder()),
|
||||
version: Version,
|
||||
}
|
||||
}
|
||||
|
||||
// ErrIncompatibleVersion is returned when attempting to read from an
|
||||
// incompatible tsi1 manifest file.
|
||||
var ErrIncompatibleVersion = errors.New("incompatible tsi1 index MANIFEST")
|
||||
|
||||
func (i *Index) Type() string { return IndexName }
|
||||
|
||||
// Open opens the index.
|
||||
|
@ -134,6 +147,11 @@ func (i *Index) Open() error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Check to see if the MANIFEST file is compatible with the current Index.
|
||||
if err := m.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Copy compaction levels to the index.
|
||||
i.levels = make([]CompactionLevel, len(m.Levels))
|
||||
copy(i.levels, m.Levels)
|
||||
|
@ -287,8 +305,9 @@ func (i *Index) ManifestPath() string {
|
|||
// Manifest returns a manifest for the index.
|
||||
func (i *Index) Manifest() *Manifest {
|
||||
m := &Manifest{
|
||||
Levels: i.levels,
|
||||
Files: make([]string, len(i.fileSet.files)),
|
||||
Levels: i.levels,
|
||||
Files: make([]string, len(i.fileSet.files)),
|
||||
Version: i.version,
|
||||
}
|
||||
|
||||
for j, f := range i.fileSet.files {
|
||||
|
@ -1221,12 +1240,16 @@ func ParseFilename(name string) (level, id int) {
|
|||
type Manifest struct {
|
||||
Levels []CompactionLevel `json:"levels,omitempty"`
|
||||
Files []string `json:"files,omitempty"`
|
||||
|
||||
// Version should be updated whenever the TSI format has changed.
|
||||
Version int `json:"version,omitempty"`
|
||||
}
|
||||
|
||||
// NewManifest returns a new instance of Manifest with default compaction levels.
|
||||
func NewManifest() *Manifest {
|
||||
m := &Manifest{
|
||||
Levels: make([]CompactionLevel, len(DefaultCompactionLevels)),
|
||||
Levels: make([]CompactionLevel, len(DefaultCompactionLevels)),
|
||||
Version: Version,
|
||||
}
|
||||
copy(m.Levels, DefaultCompactionLevels[:])
|
||||
return m
|
||||
|
@ -1242,6 +1265,17 @@ func (m *Manifest) HasFile(name string) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// Validate checks if the Manifest's version is compatible with this version
|
||||
// of the tsi1 index.
|
||||
func (m *Manifest) Validate() error {
|
||||
// If we don't have an explicit version in the manifest file then we know
|
||||
// it's not compatible with the latest tsi1 Index.
|
||||
if m.Version != Version {
|
||||
return ErrIncompatibleVersion
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReadManifestFile reads a manifest from a file path.
|
||||
func ReadManifestFile(path string) (*Manifest, error) {
|
||||
buf, err := ioutil.ReadFile(path)
|
||||
|
|
|
@ -2,7 +2,9 @@ package tsi1_test
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"testing"
|
||||
|
@ -243,6 +245,71 @@ func TestIndex_DropMeasurement(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestIndex_Open(t *testing.T) {
|
||||
// Opening a fresh index should set the MANIFEST version to current version.
|
||||
idx := NewIndex()
|
||||
t.Run("open new index", func(t *testing.T) {
|
||||
if err := idx.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Check version set appropriately.
|
||||
if got, exp := idx.Manifest().Version, 1; got != exp {
|
||||
t.Fatalf("got index version %d, expected %d", got, exp)
|
||||
}
|
||||
})
|
||||
|
||||
// Reopening an open index should return an error.
|
||||
t.Run("reopen open index", func(t *testing.T) {
|
||||
err := idx.Open()
|
||||
if err == nil {
|
||||
idx.Close()
|
||||
t.Fatal("didn't get an error on reopen, but expected one")
|
||||
}
|
||||
idx.Close()
|
||||
})
|
||||
|
||||
// Opening an incompatible index should return an error.
|
||||
incompatibleVersions := []int{-1, 0, 2}
|
||||
for _, v := range incompatibleVersions {
|
||||
t.Run(fmt.Sprintf("incompatible index version: %d", v), func(t *testing.T) {
|
||||
idx = NewIndex()
|
||||
// Manually create a MANIFEST file for an incompatible index version.
|
||||
mpath := filepath.Join(idx.Path, tsi1.ManifestFileName)
|
||||
m := tsi1.NewManifest()
|
||||
m.Levels = nil
|
||||
m.Version = v // Set example MANIFEST version.
|
||||
if err := tsi1.WriteManifestFile(mpath, m); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Log the MANIFEST file.
|
||||
data, err := ioutil.ReadFile(mpath)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
t.Logf("Incompatible MANIFEST: %s", data)
|
||||
|
||||
// Opening this index should return an error because the MANIFEST has an
|
||||
// incompatible version.
|
||||
err = idx.Open()
|
||||
if err != tsi1.ErrIncompatibleVersion {
|
||||
idx.Close()
|
||||
t.Fatalf("got error %v, expected %v", err, tsi1.ErrIncompatibleVersion)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestIndex_Manifest(t *testing.T) {
|
||||
t.Run("current MANIFEST", func(t *testing.T) {
|
||||
idx := MustOpenIndex()
|
||||
if got, exp := idx.Manifest().Version, tsi1.Version; got != exp {
|
||||
t.Fatalf("got MANIFEST version %d, expected %d", got, exp)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Index is a test wrapper for tsi1.Index.
|
||||
type Index struct {
|
||||
*tsi1.Index
|
||||
|
|
Loading…
Reference in New Issue