Remove Dev prefix from tsm engine/tx

pull/5331/head
Jason Wilder 2016-01-10 15:59:46 -07:00
parent 9c851f790e
commit 24f1bcfd20
2 changed files with 54 additions and 54 deletions

View File

@ -18,11 +18,11 @@ import (
) )
func init() { func init() {
tsdb.RegisterEngine("tsm1", NewDevEngine) tsdb.RegisterEngine("tsm1", NewEngine)
} }
// Ensure Engine implements the interface. // Ensure Engine implements the interface.
var _ tsdb.Engine = &DevEngine{} var _ tsdb.Engine = &Engine{}
const ( const (
// keyFieldSeparator separates the series key from the field name in the composite key // keyFieldSeparator separates the series key from the field name in the composite key
@ -31,7 +31,7 @@ const (
) )
// Engine represents a storage engine with compressed blocks. // Engine represents a storage engine with compressed blocks.
type DevEngine struct { type Engine struct {
mu sync.RWMutex mu sync.RWMutex
done chan struct{} done chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
@ -57,8 +57,8 @@ type DevEngine struct {
CacheFlushWriteColdDuration time.Duration CacheFlushWriteColdDuration time.Duration
} }
// NewDevEngine returns a new instance of Engine. // NewEngine returns a new instance of Engine.
func NewDevEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine { func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine {
w := NewWAL(walPath) w := NewWAL(walPath)
w.LoggingEnabled = opt.Config.WALLoggingEnabled w.LoggingEnabled = opt.Config.WALLoggingEnabled
@ -72,7 +72,7 @@ func NewDevEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engi
FileStore: fs, FileStore: fs,
} }
e := &DevEngine{ e := &Engine{
path: path, path: path,
logger: log.New(os.Stderr, "[tsm1] ", log.LstdFlags), logger: log.New(os.Stderr, "[tsm1] ", log.LstdFlags),
@ -95,19 +95,19 @@ func NewDevEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engi
} }
// Path returns the path the engine was opened with. // Path returns the path the engine was opened with.
func (e *DevEngine) Path() string { return e.path } func (e *Engine) Path() string { return e.path }
// PerformMaintenance is for periodic maintenance of the store. A no-op for b1 // PerformMaintenance is for periodic maintenance of the store. A no-op for b1
func (e *DevEngine) PerformMaintenance() { func (e *Engine) PerformMaintenance() {
} }
// Format returns the format type of this engine // Format returns the format type of this engine
func (e *DevEngine) Format() tsdb.EngineFormat { func (e *Engine) Format() tsdb.EngineFormat {
return tsdb.TSM1Format return tsdb.TSM1Format
} }
// Open opens and initializes the engine. // Open opens and initializes the engine.
func (e *DevEngine) Open() error { func (e *Engine) Open() error {
e.done = make(chan struct{}) e.done = make(chan struct{})
e.Compactor.Cancel = e.done e.Compactor.Cancel = e.done
@ -142,7 +142,7 @@ func (e *DevEngine) Open() error {
} }
// Close closes the engine. // Close closes the engine.
func (e *DevEngine) Close() error { func (e *Engine) Close() error {
// Shutdown goroutines and wait. // Shutdown goroutines and wait.
close(e.done) close(e.done)
e.wg.Wait() e.wg.Wait()
@ -158,10 +158,10 @@ func (e *DevEngine) Close() error {
} }
// SetLogOutput is a no-op. // SetLogOutput is a no-op.
func (e *DevEngine) SetLogOutput(w io.Writer) {} func (e *Engine) SetLogOutput(w io.Writer) {}
// LoadMetadataIndex loads the shard metadata into memory. // LoadMetadataIndex loads the shard metadata into memory.
func (e *DevEngine) LoadMetadataIndex(_ *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error { func (e *Engine) LoadMetadataIndex(_ *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
keys := e.FileStore.Keys() keys := e.FileStore.Keys()
keysLoaded := make(map[string]bool) keysLoaded := make(map[string]bool)
@ -213,7 +213,7 @@ func (e *DevEngine) LoadMetadataIndex(_ *tsdb.Shard, index *tsdb.DatabaseIndex,
// that new TSM files will not be able to be created in this shard while the // that new TSM files will not be able to be created in this shard while the
// backup is running. For shards that are still acively getting writes, this // backup is running. For shards that are still acively getting writes, this
// could cause the WAL to backup, increasing memory usage and evenutally rejecting writes. // could cause the WAL to backup, increasing memory usage and evenutally rejecting writes.
func (e *DevEngine) Backup(w io.Writer, basePath string, since time.Time) error { func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error {
if err := e.WriteSnapshot(); err != nil { if err := e.WriteSnapshot(); err != nil {
return err return err
} }
@ -248,7 +248,7 @@ func (e *DevEngine) Backup(w io.Writer, basePath string, since time.Time) error
// writeFileToBackup will copy the file into the tar archive. Files will use the shardRelativePath // writeFileToBackup will copy the file into the tar archive. Files will use the shardRelativePath
// in their names. This should be the <db>/<retention policy>/<id> part of the path // in their names. This should be the <db>/<retention policy>/<id> part of the path
func (e *DevEngine) writeFileToBackup(f FileStat, shardRelativePath string, tw *tar.Writer) error { func (e *Engine) writeFileToBackup(f FileStat, shardRelativePath string, tw *tar.Writer) error {
h := &tar.Header{ h := &tar.Header{
Name: filepath.Join(shardRelativePath, filepath.Base(f.Path)), Name: filepath.Join(shardRelativePath, filepath.Base(f.Path)),
ModTime: f.LastModified, ModTime: f.LastModified,
@ -271,7 +271,7 @@ func (e *DevEngine) writeFileToBackup(f FileStat, shardRelativePath string, tw *
// addToIndexFromKey will pull the measurement name, series key, and field name from a composite key and add it to the // addToIndexFromKey will pull the measurement name, series key, and field name from a composite key and add it to the
// database index and measurement fields // database index and measurement fields
func (e *DevEngine) addToIndexFromKey(key string, fieldType influxql.DataType, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error { func (e *Engine) addToIndexFromKey(key string, fieldType influxql.DataType, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
seriesKey, field := seriesAndFieldFromCompositeKey(key) seriesKey, field := seriesAndFieldFromCompositeKey(key)
measurement := tsdb.MeasurementFromSeriesKey(seriesKey) measurement := tsdb.MeasurementFromSeriesKey(seriesKey)
@ -304,7 +304,7 @@ func (e *DevEngine) addToIndexFromKey(key string, fieldType influxql.DataType, i
// WritePoints writes metadata and point data into the engine. // WritePoints writes metadata and point data into the engine.
// Returns an error if new points are added to an existing key. // Returns an error if new points are added to an existing key.
func (e *DevEngine) WritePoints(points []models.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error { func (e *Engine) WritePoints(points []models.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
values := map[string][]Value{} values := map[string][]Value{}
for _, p := range points { for _, p := range points {
for k, v := range p.Fields() { for k, v := range p.Fields() {
@ -327,7 +327,7 @@ func (e *DevEngine) WritePoints(points []models.Point, measurementFieldsToSave m
} }
// DeleteSeries deletes the series from the engine. // DeleteSeries deletes the series from the engine.
func (e *DevEngine) DeleteSeries(seriesKeys []string) error { func (e *Engine) DeleteSeries(seriesKeys []string) error {
e.mu.RLock() e.mu.RLock()
defer e.mu.RUnlock() defer e.mu.RUnlock()
@ -369,24 +369,24 @@ func (e *DevEngine) DeleteSeries(seriesKeys []string) error {
} }
// DeleteMeasurement deletes a measurement and all related series. // DeleteMeasurement deletes a measurement and all related series.
func (e *DevEngine) DeleteMeasurement(name string, seriesKeys []string) error { func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error {
return e.DeleteSeries(seriesKeys) return e.DeleteSeries(seriesKeys)
} }
// SeriesCount returns the number of series buckets on the shard. // SeriesCount returns the number of series buckets on the shard.
func (e *DevEngine) SeriesCount() (n int, err error) { func (e *Engine) SeriesCount() (n int, err error) {
return 0, nil return 0, nil
} }
// Begin starts a new transaction on the engine. // Begin starts a new transaction on the engine.
func (e *DevEngine) Begin(writable bool) (tsdb.Tx, error) { func (e *Engine) Begin(writable bool) (tsdb.Tx, error) {
return &devTx{engine: e}, nil return &tx{engine: e}, nil
} }
func (e *DevEngine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") } func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") }
// WriteSnapshot will snapshot the cache and write a new TSM file with its contents, releasing the snapshot when done. // WriteSnapshot will snapshot the cache and write a new TSM file with its contents, releasing the snapshot when done.
func (e *DevEngine) WriteSnapshot() error { func (e *Engine) WriteSnapshot() error {
// Lock and grab the cache snapshot along with all the closed WAL // Lock and grab the cache snapshot along with all the closed WAL
// filenames associated with the snapshot // filenames associated with the snapshot
closedFiles, snapshot, compactor, err := func() ([]string, *Cache, *Compactor, error) { closedFiles, snapshot, compactor, err := func() ([]string, *Cache, *Compactor, error) {
@ -415,7 +415,7 @@ func (e *DevEngine) WriteSnapshot() error {
} }
// writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments // writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments
func (e *DevEngine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache, compactor *Compactor) error { func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache, compactor *Compactor) error {
// write the new snapshot files // write the new snapshot files
newFiles, err := compactor.WriteSnapshot(snapshot) newFiles, err := compactor.WriteSnapshot(snapshot)
if err != nil { if err != nil {
@ -443,7 +443,7 @@ func (e *DevEngine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache
} }
// compactCache continually checks if the WAL cache should be written to disk // compactCache continually checks if the WAL cache should be written to disk
func (e *DevEngine) compactCache() { func (e *Engine) compactCache() {
defer e.wg.Done() defer e.wg.Done()
for { for {
select { select {
@ -464,7 +464,7 @@ func (e *DevEngine) compactCache() {
// ShouldCompactCache returns true if the Cache is over its flush threshold // ShouldCompactCache returns true if the Cache is over its flush threshold
// or if the passed in lastWriteTime is older than the write cold threshold // or if the passed in lastWriteTime is older than the write cold threshold
func (e *DevEngine) ShouldCompactCache(lastWriteTime time.Time) bool { func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool {
sz := e.Cache.Size() sz := e.Cache.Size()
if sz == 0 { if sz == 0 {
@ -475,7 +475,7 @@ func (e *DevEngine) ShouldCompactCache(lastWriteTime time.Time) bool {
time.Now().Sub(lastWriteTime) > e.CacheFlushWriteColdDuration time.Now().Sub(lastWriteTime) > e.CacheFlushWriteColdDuration
} }
func (e *DevEngine) compactTSMLevel(fast bool, level int) { func (e *Engine) compactTSMLevel(fast bool, level int) {
defer e.wg.Done() defer e.wg.Done()
for { for {
@ -539,7 +539,7 @@ func (e *DevEngine) compactTSMLevel(fast bool, level int) {
} }
} }
func (e *DevEngine) compactTSMFull() { func (e *Engine) compactTSMFull() {
defer e.wg.Done() defer e.wg.Done()
for { for {
@ -592,7 +592,7 @@ func (e *DevEngine) compactTSMFull() {
} }
// reloadCache reads the WAL segment files and loads them into the cache. // reloadCache reads the WAL segment files and loads them into the cache.
func (e *DevEngine) reloadCache() error { func (e *Engine) reloadCache() error {
files, err := segmentFileNames(e.WAL.Path()) files, err := segmentFileNames(e.WAL.Path())
if err != nil { if err != nil {
return err return err
@ -606,7 +606,7 @@ func (e *DevEngine) reloadCache() error {
return nil return nil
} }
func (e *DevEngine) cleanup() error { func (e *Engine) cleanup() error {
files, err := filepath.Glob(filepath.Join(e.path, fmt.Sprintf("*.%s", CompactionTempExtension))) files, err := filepath.Glob(filepath.Join(e.path, fmt.Sprintf("*.%s", CompactionTempExtension)))
if err != nil { if err != nil {
return fmt.Errorf("error getting compaction checkpoints: %s", err.Error()) return fmt.Errorf("error getting compaction checkpoints: %s", err.Error())
@ -620,18 +620,18 @@ func (e *DevEngine) cleanup() error {
return nil return nil
} }
func (e *DevEngine) KeyCursor(key string) *KeyCursor { func (e *Engine) KeyCursor(key string) *KeyCursor {
e.mu.RLock() e.mu.RLock()
defer e.mu.RUnlock() defer e.mu.RUnlock()
return e.FileStore.KeyCursor(key) return e.FileStore.KeyCursor(key)
} }
type devTx struct { type tx struct {
engine *DevEngine engine *Engine
} }
// Cursor returns a cursor for all cached and TSM-based data. // Cursor returns a cursor for all cached and TSM-based data.
func (t *devTx) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor { func (t *tx) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor {
if len(fields) == 1 { if len(fields) == 1 {
key := SeriesFieldKey(series, fields[0]) key := SeriesFieldKey(series, fields[0])
return &devCursor{ return &devCursor{
@ -664,10 +664,10 @@ func (t *devTx) Cursor(series string, fields []string, dec *tsdb.FieldCodec, asc
return NewMultiFieldCursor(cursorFields, cursors, ascending) return NewMultiFieldCursor(cursorFields, cursors, ascending)
} }
func (t *devTx) Rollback() error { return nil } func (t *tx) Rollback() error { return nil }
func (t *devTx) Size() int64 { panic("not implemented") } func (t *tx) Size() int64 { panic("not implemented") }
func (t *devTx) Commit() error { panic("not implemented") } func (t *tx) Commit() error { panic("not implemented") }
func (t *devTx) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") } func (t *tx) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") }
// devCursor is a cursor that combines both TSM and cached data. // devCursor is a cursor that combines both TSM and cached data.
type devCursor struct { type devCursor struct {

View File

@ -17,7 +17,7 @@ import (
) )
// Ensure an engine containing cached values responds correctly to queries. // Ensure an engine containing cached values responds correctly to queries.
func TestDevEngine_QueryCache_Ascending(t *testing.T) { func TestEngine_QueryCache_Ascending(t *testing.T) {
// Generate temporary file. // Generate temporary file.
f, _ := ioutil.TempFile("", "tsm") f, _ := ioutil.TempFile("", "tsm")
f.Close() f.Close()
@ -32,7 +32,7 @@ func TestDevEngine_QueryCache_Ascending(t *testing.T) {
p3 := parsePoint("cpu,host=A value=1.3 3000000000") p3 := parsePoint("cpu,host=A value=1.3 3000000000")
// Write those points to the engine. // Write those points to the engine.
e := NewDevEngine(f.Name(), walPath, tsdb.NewEngineOptions()) e := NewEngine(f.Name(), walPath, tsdb.NewEngineOptions())
if err := e.Open(); err != nil { if err := e.Open(); err != nil {
t.Fatalf("failed to open tsm1 engine: %s", err.Error()) t.Fatalf("failed to open tsm1 engine: %s", err.Error())
} }
@ -41,7 +41,7 @@ func TestDevEngine_QueryCache_Ascending(t *testing.T) {
} }
// Start a query transactions and get a cursor. // Start a query transactions and get a cursor.
tx := devTx{engine: e.(*DevEngine)} tx := tx{engine: e.(*Engine)}
ascCursor := tx.Cursor("cpu,host=A", []string{"value"}, nil, true) ascCursor := tx.Cursor("cpu,host=A", []string{"value"}, nil, true)
k, v := ascCursor.SeekTo(1) k, v := ascCursor.SeekTo(1)
@ -76,7 +76,7 @@ func TestDevEngine_QueryCache_Ascending(t *testing.T) {
} }
// Ensure an engine containing cached values responds correctly to queries. // Ensure an engine containing cached values responds correctly to queries.
func TestDevEngine_QueryTSM_Ascending(t *testing.T) { func TestEngine_QueryTSM_Ascending(t *testing.T) {
fs := NewFileStore("") fs := NewFileStore("")
// Setup 3 files // Setup 3 files
@ -133,7 +133,7 @@ func TestDevEngine_QueryTSM_Ascending(t *testing.T) {
} }
// Ensure an engine containing cached values responds correctly to queries. // Ensure an engine containing cached values responds correctly to queries.
func TestDevEngine_QueryCache_Descending(t *testing.T) { func TestEngine_QueryCache_Descending(t *testing.T) {
// Generate temporary file. // Generate temporary file.
f, _ := ioutil.TempFile("", "tsm") f, _ := ioutil.TempFile("", "tsm")
f.Close() f.Close()
@ -148,7 +148,7 @@ func TestDevEngine_QueryCache_Descending(t *testing.T) {
p3 := parsePoint("cpu,host=A value=1.3 3000000000") p3 := parsePoint("cpu,host=A value=1.3 3000000000")
// Write those points to the engine. // Write those points to the engine.
e := NewDevEngine(f.Name(), walPath, tsdb.NewEngineOptions()) e := NewEngine(f.Name(), walPath, tsdb.NewEngineOptions())
if err := e.Open(); err != nil { if err := e.Open(); err != nil {
t.Fatalf("failed to open tsm1 engine: %s", err.Error()) t.Fatalf("failed to open tsm1 engine: %s", err.Error())
} }
@ -157,7 +157,7 @@ func TestDevEngine_QueryCache_Descending(t *testing.T) {
} }
// Start a query transactions and get a cursor. // Start a query transactions and get a cursor.
tx := devTx{engine: e.(*DevEngine)} tx := tx{engine: e.(*Engine)}
descCursor := tx.Cursor("cpu,host=A", []string{"value"}, nil, false) descCursor := tx.Cursor("cpu,host=A", []string{"value"}, nil, false)
k, v := descCursor.SeekTo(4000000000) k, v := descCursor.SeekTo(4000000000)
@ -177,7 +177,7 @@ func TestDevEngine_QueryCache_Descending(t *testing.T) {
} }
// Ensure an engine containing cached values responds correctly to queries. // Ensure an engine containing cached values responds correctly to queries.
func TestDevEngine_QueryTSM_Descending(t *testing.T) { func TestEngine_QueryTSM_Descending(t *testing.T) {
fs := NewFileStore("") fs := NewFileStore("")
// Setup 3 files // Setup 3 files
@ -218,7 +218,7 @@ func TestDevEngine_QueryTSM_Descending(t *testing.T) {
} }
} }
func TestDevEngine_LoadMetadataIndex(t *testing.T) { func TestEngine_LoadMetadataIndex(t *testing.T) {
// Generate temporary file. // Generate temporary file.
f, _ := ioutil.TempFile("", "tsm") f, _ := ioutil.TempFile("", "tsm")
f.Close() f.Close()
@ -232,7 +232,7 @@ func TestDevEngine_LoadMetadataIndex(t *testing.T) {
p2 := parsePoint("cpu,host=B value=1.2 2000000000") p2 := parsePoint("cpu,host=B value=1.2 2000000000")
// Write those points to the engine. // Write those points to the engine.
e := NewDevEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*DevEngine) e := NewEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*Engine)
if err := e.Open(); err != nil { if err := e.Open(); err != nil {
t.Fatalf("failed to open tsm1 engine: %s", err.Error()) t.Fatalf("failed to open tsm1 engine: %s", err.Error())
} }
@ -317,7 +317,7 @@ func TestDevEngine_LoadMetadataIndex(t *testing.T) {
} }
// Ensure that deletes only sent to the WAL will clear out the data from the cache on restart // Ensure that deletes only sent to the WAL will clear out the data from the cache on restart
func TestDevEngine_DeleteWALLoadMetadata(t *testing.T) { func TestEngine_DeleteWALLoadMetadata(t *testing.T) {
// Generate temporary file. // Generate temporary file.
f, _ := ioutil.TempFile("", "tsm") f, _ := ioutil.TempFile("", "tsm")
f.Close() f.Close()
@ -331,7 +331,7 @@ func TestDevEngine_DeleteWALLoadMetadata(t *testing.T) {
p2 := parsePoint("cpu,host=B value=1.2 2000000000") p2 := parsePoint("cpu,host=B value=1.2 2000000000")
// Write those points to the engine. // Write those points to the engine.
e := NewDevEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*DevEngine) e := NewEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*Engine)
if err := e.Open(); err != nil { if err := e.Open(); err != nil {
t.Fatalf("failed to open tsm1 engine: %s", err.Error()) t.Fatalf("failed to open tsm1 engine: %s", err.Error())
} }
@ -347,7 +347,7 @@ func TestDevEngine_DeleteWALLoadMetadata(t *testing.T) {
t.Fatalf("error closing: %s", err.Error()) t.Fatalf("error closing: %s", err.Error())
} }
e = NewDevEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*DevEngine) e = NewEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*Engine)
if err := e.Open(); err != nil { if err := e.Open(); err != nil {
t.Fatalf("failed to open tsm1 engine: %s", err.Error()) t.Fatalf("failed to open tsm1 engine: %s", err.Error())
} }
@ -362,7 +362,7 @@ func TestDevEngine_DeleteWALLoadMetadata(t *testing.T) {
} }
// Ensure that the engine will backup any TSM files created since the passed in time // Ensure that the engine will backup any TSM files created since the passed in time
func TestDevEngine_Backup(t *testing.T) { func TestEngine_Backup(t *testing.T) {
// Generate temporary file. // Generate temporary file.
f, _ := ioutil.TempFile("", "tsm") f, _ := ioutil.TempFile("", "tsm")
f.Close() f.Close()
@ -377,7 +377,7 @@ func TestDevEngine_Backup(t *testing.T) {
p3 := parsePoint("cpu,host=C value=1.3 3000000000") p3 := parsePoint("cpu,host=C value=1.3 3000000000")
// Write those points to the engine. // Write those points to the engine.
e := NewDevEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*DevEngine) e := NewEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*Engine)
// mock the planner so compactions don't run during the test // mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{} e.CompactionPlan = &mockPlanner{}