Merge pull request #6593 from influxdata/cjl-copyshard

create shard snapshot
pull/6598/head
Cory LaNou 2016-05-10 20:01:59 -05:00
commit c32906a366
8 changed files with 185 additions and 2 deletions

View File

@ -37,7 +37,8 @@ With this release InfluxDB is moving to Go v1.6.
- [#6534](https://github.com/influxdata/influxdb/pull/6534): Move to Go v1.6.2 (over Go v1.4.3)
- [#6522](https://github.com/influxdata/influxdb/pull/6522): Dump TSM files to line protocol
- [#6585](https://github.com/influxdata/influxdb/pull/6585): Parallelize iterators
- [#6502](https://github.com/influxdata/influxdb/pull/6502): Add ability to copy shard via rpc calls. Remove deprecated copier service..
- [#6502](https://github.com/influxdata/influxdb/pull/6502): Add ability to copy shard via rpc calls. Remove deprecated copier service.
- [#6593](https://github.com/influxdata/influxdb/pull/6593): Add ability to create snapshots of shards.
### Bugfixes

View File

@ -43,6 +43,7 @@ type Engine interface {
DeleteMeasurement(name string, seriesKeys []string) error
SeriesCount() (n int, err error)
MeasurementFields(measurement string) *MeasurementFields
CreateSnapshot() (string, error)
// Format will return the format for the engine
Format() EngineFormat

View File

@ -4,6 +4,7 @@ import (
"archive/tar"
"fmt"
"io"
"io/ioutil"
"log"
"math"
"os"
@ -578,6 +579,19 @@ func (e *Engine) WriteSnapshot() error {
return e.writeSnapshotAndCommit(closedFiles, snapshot, compactor)
}
// CreateSnapshot writes a snapshot of the engine via WriteSnapshot and then
// asks the FileStore to create a temporary directory holding hard links to
// the underlying shard files.
//
// It returns the path of that directory. If WriteSnapshot fails, its error is
// returned and no directory is created.
func (e *Engine) CreateSnapshot() (string, error) {
	if err := e.WriteSnapshot(); err != nil {
		// Propagate the failure; returning a nil error here would hand the
		// caller an empty path that looks like a successful snapshot.
		return "", err
	}

	e.mu.RLock()
	defer e.mu.RUnlock()

	return e.FileStore.CreateSnapshot()
}
// writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments
func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache, compactor *Compactor) (err error) {
@ -797,6 +811,20 @@ func (e *Engine) cleanup() error {
return fmt.Errorf("error removing temp compaction files: %v", err)
}
}
allfiles, err := ioutil.ReadDir(e.path)
if err != nil {
return err
}
for _, f := range allfiles {
// Check to see if there are any `.tmp` directories that were left over from failed shard snapshots
if f.IsDir() && strings.HasSuffix(f.Name(), ".tmp") {
if err := os.Remove(f.Name()); err != nil {
return fmt.Errorf("error removing tmp snapshot directory %q: %s", f.Name(), err)
}
}
}
return nil
}

View File

@ -4,6 +4,7 @@ import (
"expvar"
"fmt"
"io"
"io/ioutil"
"log"
"math"
"os"
@ -20,7 +21,7 @@ import (
type TSMFile interface {
// Path returns the underlying file path for the TSMFile. If the file
// has not be written or loaded from disk, the zero value is returne.
// has not be written or loaded from disk, the zero value is returned.
Path() string
// Read returns all the values in the block where time t resides
@ -113,6 +114,8 @@ type FileStore struct {
traceLogging bool
statMap *expvar.Map
currentTempDirID int
}
type FileStat struct {
@ -294,6 +297,24 @@ func (f *FileStore) Open() error {
return nil
}
// find the current max ID for temp directories
tmpfiles, err := ioutil.ReadDir(f.dir)
if err != nil {
return err
}
for _, fi := range tmpfiles {
if fi.IsDir() && strings.HasSuffix(fi.Name(), ".tmp") {
ss := strings.Split(filepath.Base(fi.Name()), ".")
if len(ss) == 2 {
if i, err := strconv.Atoi(ss[0]); err != nil {
if i > f.currentTempDirID {
f.currentTempDirID = i
}
}
}
}
}
files, err := filepath.Glob(filepath.Join(f.dir, fmt.Sprintf("*.%s", TSMFileExtension)))
if err != nil {
return err
@ -589,6 +610,43 @@ func (f *FileStore) locations(key string, t int64, ascending bool) []*location {
return locations
}
// CreateSnapshot creates a new temporary directory named "<id>.tmp" inside the
// file store's directory and fills it with hard links to every TSM file
// returned by Files, together with each file's tombstone files.
//
// It returns the path of the new directory. If the directory cannot be
// created, or any hard link fails, an error is returned along with an empty
// path, and the partially populated directory is removed.
func (f *FileStore) CreateSnapshot() (string, error) {
	files := f.Files()

	// Reserve the next directory ID and copy it while the write lock is still
	// held, so concurrent callers can never build the same path.
	f.mu.Lock()
	f.currentTempDirID++
	id := f.currentTempDirID
	f.mu.Unlock()

	// NOTE(review): the read lock is held for the whole linking phase, as in
	// the original; presumably this keeps the file set stable while links are
	// created - confirm against the other users of f.mu.
	f.mu.RLock()
	defer f.mu.RUnlock()

	tmpPath := filepath.Join(f.dir, fmt.Sprintf("%d.tmp", id))
	if err := os.Mkdir(tmpPath, 0777); err != nil {
		return "", err
	}

	for _, tsmf := range files {
		newpath := filepath.Join(tmpPath, filepath.Base(tsmf.Path()))
		if err := os.Link(tsmf.Path(), newpath); err != nil {
			// Best-effort cleanup: the caller receives no path, so it could
			// not remove the half-built directory itself.
			_ = os.RemoveAll(tmpPath)
			return "", fmt.Errorf("error creating tsm hard link: %q", err)
		}

		// Check for tombstones and link those as well.
		for _, tf := range tsmf.TombstoneFiles() {
			tfpath := filepath.Join(f.dir, tf.Path)
			newpath := filepath.Join(tmpPath, filepath.Base(tf.Path))
			if err := os.Link(tfpath, newpath); err != nil {
				_ = os.RemoveAll(tmpPath)
				return "", fmt.Errorf("error creating tombstone hard link: %q", err)
			}
		}
	}

	return tmpPath, nil
}
// ParseTSMFileName parses the generation and sequence from a TSM file name.
func ParseTSMFileName(name string) (int, int, error) {
base := filepath.Base(name)

View File

@ -1538,7 +1538,60 @@ func TestFileStore_Stats(t *testing.T) {
if got, exp := len(stats), 3; got != exp {
t.Fatalf("file count mismatch: got %v, exp %v", got, exp)
}
}
// TestFileStore_CreateSnapshot adds three TSM files to a file store, creates a
// tombstone through DeleteRange, and then verifies that the directory returned
// by CreateSnapshot is non-empty and contains an entry for every TSM file and
// every tombstone file reported by the store.
func TestFileStore_CreateSnapshot(t *testing.T) {
	tmpDir := MustTempDir()
	defer os.RemoveAll(tmpDir)

	store := tsm1.NewFileStore(tmpDir)

	// Three single-point files for the same key.
	tsmFiles, err := newFiles(tmpDir,
		keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 1.0)}},
		keyValues{"cpu", []tsm1.Value{tsm1.NewValue(1, 2.0)}},
		keyValues{"cpu", []tsm1.Value{tsm1.NewValue(2, 3.0)}},
	)
	if err != nil {
		t.Fatalf("unexpected error creating files: %v", err)
	}
	store.Add(tsmFiles...)

	// Deleting a range produces a tombstone that the snapshot must include.
	if err := store.DeleteRange([]string{"cpu"}, 1, 1); err != nil {
		t.Fatalf("unexpected error delete range: %v", err)
	}

	snapDir, err := store.CreateSnapshot()
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("temp file for hard links: %q", snapDir)

	entries, err := ioutil.ReadDir(snapDir)
	if err != nil {
		t.Fatal(err)
	}
	if len(entries) == 0 {
		t.Fatal("no files found")
	}

	// requireLink fails the test when the snapshot directory has no entry
	// with the same base name as the given source path.
	requireLink := func(src string) {
		link := filepath.Join(snapDir, filepath.Base(src))
		t.Logf("checking for existence of hard link %q", link)
		if _, statErr := os.Stat(link); os.IsNotExist(statErr) {
			t.Fatalf("unable to find file %q", link)
		}
	}

	for _, tsmFile := range store.Files() {
		requireLink(tsmFile.Path())
		for _, tombstone := range tsmFile.TombstoneFiles() {
			requireLink(tombstone.Path)
		}
	}
}
func newFileDir(dir string, values ...keyValues) ([]string, error) {

View File

@ -561,6 +561,14 @@ func (s *Shard) Restore(r io.Reader, basePath string) error {
return s.Open()
}
// CreateSnapshot delegates to the shard's engine while holding the shard's
// read lock, and returns the path and error the engine reports. On success
// the path is a temporary directory containing hard links to the underlying
// shard files.
func (s *Shard) CreateSnapshot() (string, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	snapshotDir, err := s.engine.CreateSnapshot()
	return snapshotDir, err
}
// Shards represents a sortable list of shards.
type Shards []*Shard

View File

@ -308,6 +308,17 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
return nil
}
// CreateShardSnapshot creates a snapshot of the shard with the given id and
// returns the path to it. ErrShardNotFound is returned when the store has no
// shard with that id.
//
// The caller is responsible for cleaning up (removing) the returned path.
func (s *Store) CreateShardSnapshot(id uint64) (string, error) {
	if shard := s.Shard(id); shard != nil {
		return shard.CreateSnapshot()
	}
	return "", ErrShardNotFound
}
// DeleteShard removes a shard from disk.
func (s *Store) DeleteShard(shardID uint64) error {
s.mu.Lock()

View File

@ -137,6 +137,29 @@ func TestStore_DeleteShard(t *testing.T) {
}
}
// Ensure the store can create a snapshot of a shard and returns a non-empty
// directory name for it.
func TestStore_CreateShardSnapShot(t *testing.T) {
	s := MustOpenStore()
	defer s.Close()

	// Create a new shard and verify that it, and its database index, exist.
	if err := s.CreateShard("db0", "rp0", 1); err != nil {
		t.Fatal(err)
	}
	if s.Shard(1) == nil {
		t.Fatalf("expected shard")
	}
	if s.DatabaseIndex("db0") == nil {
		t.Errorf("expected database index")
	}

	// Snapshot the shard; the call must succeed and name a directory.
	switch snapshotDir, err := s.CreateShardSnapshot(1); {
	case err != nil:
		t.Fatal(err)
	case snapshotDir == "":
		t.Fatal("empty directory name")
	}
}
// Ensure the store reports an error when it can't open a database directory.
func TestStore_Open_InvalidDatabaseFile(t *testing.T) {
s := NewStore()