From 9c1d7d00a9fa89ea667e233e21e26aba91f5a04b Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 6 Dec 2017 09:35:24 -0700 Subject: [PATCH 1/7] Switch O_SYNC to periodic fsync O_SYNC was added with writing TSM files to fix an issue where the final fsync at the end cause the process to stall. This ends up increase disk util to much so this change switches to use multiple fsyncs while writing the TSM file instead of O_SYNC or one large one at the end. --- tsdb/engine/tsm1/compact.go | 2 +- tsdb/engine/tsm1/writer.go | 37 ++++++++++++++++++++++++++++++++++--- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index acc888bc2c..67cab267ce 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -1027,7 +1027,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([ } func (c *Compactor) write(path string, iter KeyIterator) (err error) { - fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666) + fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666) if err != nil { return errCompactionInProgress{err: err} } diff --git a/tsdb/engine/tsm1/writer.go b/tsdb/engine/tsm1/writer.go index b00ec3c4af..146a9fea17 100644 --- a/tsdb/engine/tsm1/writer.go +++ b/tsdb/engine/tsm1/writer.go @@ -97,6 +97,10 @@ const ( // max length of a key in an index entry (measurement + tags) maxKeyLength = (1 << (2 * 8)) - 1 + + // The threshold amount data written before we periodically fsync a TSM file. This helps avoid + // long pauses due to very large fsyncs at the end of writing a TSM file. + fsyncEvery = 512 * 1024 * 1024 ) var ( @@ -233,7 +237,7 @@ func (e *IndexEntry) String() string { // NewIndexWriter returns a new IndexWriter. func NewIndexWriter() IndexWriter { - buf := bytes.NewBuffer(make([]byte, 0, 4096)) + buf := bytes.NewBuffer(make([]byte, 0, 1024*1024)) return &directIndex{buf: buf, w: bufio.NewWriter(buf)} } @@ -253,6 +257,9 @@ type indexBlock struct { type directIndex struct { keyCount int size uint32 + + // The bytes written count of when we last fsync'd + lastSync uint32 fd *os.File buf *bytes.Buffer @@ -377,7 +384,7 @@ func (d *directIndex) WriteTo(w io.Writer) (int64, error) { return 0, err } - return io.Copy(w, bufio.NewReader(d.fd)) + return io.Copy(w, bufio.NewReaderSize(d.fd, 1024*1024)) } func (d *directIndex) flush(w io.Writer) (int64, error) { @@ -435,6 +442,15 @@ func (d *directIndex) flush(w io.Writer) (int64, error) { d.indexEntries.Type = 0 d.indexEntries.entries = d.indexEntries.entries[:0] + // If this is a disk based index and we've written more than the fsync threshold, + // fsync the data to avoid long pauses later on. + if d.fd != nil && d.size-d.lastSync > fsyncEvery { + if err := d.fd.Sync(); err != nil { + return N, err + } + d.lastSync = d.size + } + return N, nil } @@ -486,13 +502,16 @@ type tsmWriter struct { w *bufio.Writer index IndexWriter n int64 + + // The bytes written count of when we last fsync'd + lastSync int64 } // NewTSMWriter returns a new TSMWriter writing to w. func NewTSMWriter(w io.Writer) (TSMWriter, error) { var index IndexWriter if fw, ok := w.(*os.File); ok && !strings.HasSuffix(fw.Name(), "01.tsm.tmp") { - f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666) + f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666) if err != nil { return nil, err } @@ -612,6 +631,14 @@ func (t *tsmWriter) WriteBlock(key []byte, minTime, maxTime int64, block []byte) // Increment file position pointer (checksum + block len) t.n += int64(n) + // fsync the file periodically to avoid long pauses with very big files. + if t.n-t.lastSync > fsyncEvery { + if err := t.sync(); err != nil { + return err + } + t.lastSync = t.n + } + if len(t.index.Entries(key)) >= maxIndexEntries { return ErrMaxBlocksExceeded } @@ -646,6 +673,10 @@ func (t *tsmWriter) Flush() error { return err } + return t.sync() +} + +func (t *tsmWriter) sync() error { if f, ok := t.wrapped.(*os.File); ok { if err := f.Sync(); err != nil { return err From e584cb6842b89f9c3e166fc3226b89a31101dcac Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 6 Dec 2017 09:39:03 -0700 Subject: [PATCH 2/7] Increase cache-snapshot-memory-size default With the recent changes to compactions and snapshotting, the current default can create lots of small level 1 TSM files. This increases the default in order to create larger level 1 files and less disk utilization. --- etc/config.sample.toml | 2 +- tsdb/config.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/etc/config.sample.toml b/etc/config.sample.toml index c56eb1ca63..5f9a4e8f78 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -79,7 +79,7 @@ # snapshot the cache and write it to a TSM file, freeing up memory # Valid size suffixes are k, m, or g (case insensitive, 1024 = 1k). # Values without a size suffix are in bytes. - # cache-snapshot-memory-size = "25m" + # cache-snapshot-memory-size = "256m" # CacheSnapshotWriteColdDuration is the length of time at # which the engine will snapshot the cache and write it to diff --git a/tsdb/config.go b/tsdb/config.go index 6ab91feaad..5041a44cae 100644 --- a/tsdb/config.go +++ b/tsdb/config.go @@ -26,7 +26,7 @@ const ( // DefaultCacheSnapshotMemorySize is the size at which the engine will // snapshot the cache and write it to a TSM file, freeing up memory - DefaultCacheSnapshotMemorySize = 25 * 1024 * 1024 // 25MB + DefaultCacheSnapshotMemorySize = 256 * 1024 * 1024 // 256MB // DefaultCacheSnapshotWriteColdDuration is the length of time at which // the engine will snapshot the cache and write it to a new TSM file if From 56d8f05f1211467d9228f0f5ce748b8fa81b4eee Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 6 Dec 2017 09:40:49 -0700 Subject: [PATCH 3/7] Cap concurrent compactions when large number of cores exists The default max-concurrent-compactions settings allows up to 50% of cores to be used for compactions. When the number of cores is high (>8), this can lead to high disk utilization. Capping at 4 and combined with high snapshot sizes seems to keep the compaction backlog reasonable and not tax the disks as much. Systems with lots of IOPS, RAM and CPU cores may want to increase these. --- tsdb/store.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tsdb/store.go b/tsdb/store.go index 14e4e87620..ae889c46cf 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -169,6 +169,12 @@ func (s *Store) loadShards() error { lim := s.EngineOptions.Config.MaxConcurrentCompactions if lim == 0 { lim = runtime.GOMAXPROCS(0) / 2 // Default to 50% of cores for compactions + + // On systems with more cores, cap at 4 to reduce disk utilization + if lim > 4 { + lim = 4 + } + if lim < 1 { lim = 1 } From 0a85ce2b73552f95e5f3152f11573f842932b638 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 6 Dec 2017 09:45:01 -0700 Subject: [PATCH 4/7] Schedule compactions less aggressively This runs the scheduler every 5s instead of every 1s as well as reduces the scope of a level 1 plan. --- tsdb/engine/tsm1/compact.go | 9 +-------- tsdb/engine/tsm1/compact_test.go | 16 ++++------------ tsdb/engine/tsm1/engine.go | 2 +- 3 files changed, 6 insertions(+), 21 deletions(-) diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 67cab267ce..39759be3a9 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -263,16 +263,9 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup { minGenerations = level + 1 } - // Each compaction group should run against 4 generations. For level 1, since these - // can get created much more quickly, bump the grouping to 8 to keep file counts lower. - groupSize := 4 - if level == 1 { - groupSize = 8 - } - var cGroups []CompactionGroup for _, group := range levelGroups { - for _, chunk := range group.chunk(groupSize) { + for _, chunk := range group.chunk(4) { var cGroup CompactionGroup var hasTombstones bool for _, gen := range chunk { diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index e3608f9837..69e5b0c392 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -1792,14 +1792,6 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) { Path: "08-01.tsm1", Size: 1 * 1024 * 1024, }, - tsm1.FileStat{ - Path: "09-01.tsm1", - Size: 1 * 1024 * 1024, - }, - tsm1.FileStat{ - Path: "10-01.tsm1", - Size: 1 * 1024 * 1024, - }, } cp := tsm1.NewDefaultPlanner( @@ -1810,8 +1802,8 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) { }, tsdb.DefaultCompactFullWriteColdDuration, ) - expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]} - expFiles2 := []tsm1.FileStat{data[8], data[9]} + expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]} + expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]} tsm := cp.PlanLevel(1) if exp, got := len(expFiles1), len(tsm[0]); got != exp { @@ -1887,8 +1879,8 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) { }, tsdb.DefaultCompactFullWriteColdDuration, ) - expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]} - expFiles2 := []tsm1.FileStat{data[8], data[9]} + expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]} + expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]} tsm := cp.PlanLevel(1) if exp, got := len(expFiles1), len(tsm[0]); got != exp { diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 4d2773af66..63b6a72584 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -1509,7 +1509,7 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool { } func (e *Engine) compact(quit <-chan struct{}) { - t := time.NewTicker(time.Second) + t := time.NewTicker(5 * time.Second) defer t.Stop() for { From 9f2a422039a3a1d3f536724c51cb73bbeeccbb23 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 6 Dec 2017 13:29:04 -0700 Subject: [PATCH 5/7] Use disk based TSM index more selectively The disk based temp index for writing a TSM file was used for compactions other than snapshot compactions. That meant it was used even for smaller compactiont that would not use much memory. An unintended side-effect of this is higher disk IO when copying the index to the final file. This switches when to use the index based on the estimated size of the new index that will be written. This isn't exact, but seems to work kick in at higher cardinality and larger compactions when it is necessary to avoid OOMs. --- tsdb/engine/tsm1/compact.go | 35 ++++++++++++++++++++++++++++++++--- tsdb/engine/tsm1/writer.go | 11 ++++++++++- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 39759be3a9..57d938a993 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -1026,10 +1026,22 @@ func (c *Compactor) write(path string, iter KeyIterator) (err error) { } // Create the write for the new TSM file. - w, err := NewTSMWriter(fd) - if err != nil { - return err + var w TSMWriter + + // Use a disk based TSM buffer if it looks like we might create a big index + // in memory. + if iter.EstimatedIndexSize() > 64*1024*1024 { + w, err = NewTSMWriterWithDiskBuffer(fd) + if err != nil { + return err + } + } else { + w, err = NewTSMWriter(fd) + if err != nil { + return err + } } + defer func() { closeErr := w.Close() if err == nil { @@ -1138,6 +1150,10 @@ type KeyIterator interface { // Err returns any errors encountered during iteration. Err() error + + // EstimatedIndexSize returns the estimated size of the index that would + // be required to store all the series and entries in the KeyIterator. + EstimatedIndexSize() int } // tsmKeyIterator implements the KeyIterator for set of TSMReaders. Iteration produces @@ -1271,6 +1287,14 @@ func (k *tsmKeyIterator) hasMergedValues() bool { len(k.mergedBooleanValues) > 0 } +func (k *tsmKeyIterator) EstimatedIndexSize() int { + var size uint32 + for _, r := range k.readers { + size += r.IndexSize() + } + return int(size) / len(k.readers) +} + // Next returns true if there are any values remaining in the iterator. func (k *tsmKeyIterator) Next() bool { RETRY: @@ -1509,6 +1533,11 @@ func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIte return cki } +func (c *cacheKeyIterator) EstimatedIndexSize() int { + // We return 0 here since we already have all the entries in memory to write an index. + return 0 +} + func (c *cacheKeyIterator) encode() { concurrency := runtime.GOMAXPROCS(0) n := len(c.ready) diff --git a/tsdb/engine/tsm1/writer.go b/tsdb/engine/tsm1/writer.go index 146a9fea17..eda8985f2e 100644 --- a/tsdb/engine/tsm1/writer.go +++ b/tsdb/engine/tsm1/writer.go @@ -509,14 +509,23 @@ type tsmWriter struct { // NewTSMWriter returns a new TSMWriter writing to w. func NewTSMWriter(w io.Writer) (TSMWriter, error) { + index := NewIndexWriter() + return &tsmWriter{wrapped: w, w: bufio.NewWriterSize(w, 1024*1024), index: index}, nil +} + +// NewTSMWriterWithDiskBuffer returns a new TSMWriter writing to w and will use a disk +// based buffer for the TSM index if possible. +func NewTSMWriterWithDiskBuffer(w io.Writer) (TSMWriter, error) { var index IndexWriter - if fw, ok := w.(*os.File); ok && !strings.HasSuffix(fw.Name(), "01.tsm.tmp") { + // Make sure is a File so we can write the temp index alongside it. + if fw, ok := w.(*os.File); ok { f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666) if err != nil { return nil, err } index = NewDiskIndexWriter(f) } else { + // w is not a file, just use an inmem index index = NewIndexWriter() } From 0b929fe669cfaa6ddf243bb5337e1b2f77c9fb34 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 6 Dec 2017 13:39:38 -0700 Subject: [PATCH 6/7] Update changelog --- CHANGELOG.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d550dd7252..153ddd75fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,18 @@ - [#9144](https://github.com/influxdata/influxdb/issues/9144): Fix query compilation so multiple nested distinct calls is allowable - [#8789](https://github.com/influxdata/influxdb/issues/8789): Fix CLI to allow quoted database names in use statement +## v1.4.3 [unreleased] + +### Configuration Changes + +#### `[data]` Section + +The default value for `cache-snapshot-memory-size` has been changed from `25m` to `256m`. + +### Bugfixes + +- [#9201](https://github.com/influxdata/influxdb/issues/9201): Fix higher disk i/o utilization + ## v1.4.2 [2017-11-15] Refer to the 1.4.0 breaking changes section if `influxd` fails to start with an `incompatible tsi1 index MANIFEST` error. From a0b2195d6baa7c8afdabca3f058381de4fe7884b Mon Sep 17 00:00:00 2001 From: Adam Date: Thu, 7 Dec 2017 11:35:20 -0500 Subject: [PATCH 7/7] Pulled in backup-relevant code for review (#9193) for issue #8879 --- CHANGELOG.md | 2 + cmd/influxd/backup/backup.go | 350 ++++++++++++++++++------- cmd/influxd/backup_util/backup_util.go | 146 +++++++++++ cmd/influxd/restore/restore.go | 6 +- internal/tsdb_store.go | 4 + man/influxd-backup.txt | 8 + services/snapshotter/client.go | 4 + services/snapshotter/client_test.go | 6 + services/snapshotter/service.go | 91 +++++-- services/snapshotter/service_test.go | 30 ++- tsdb/engine.go | 1 + tsdb/engine/tsm1/engine.go | 136 ++++++++++ tsdb/engine/tsm1/engine_test.go | 262 ++++++++++++++++++ tsdb/shard.go | 8 + tsdb/store.go | 14 + 15 files changed, 942 insertions(+), 126 deletions(-) create mode 100644 cmd/influxd/backup_util/backup_util.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 153ddd75fb..1ddce5f840 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ ### Features - [#8495](https://github.com/influxdata/influxdb/pull/8495): Improve CLI connection warnings +- [#9146](https://github.com/influxdata/influxdb/issues/9146): Backup can produce data in the same format as the enterprise backup/restore tool. +- [#8879](https://github.com/influxdata/influxdb/issues/8879): Export functionality using start/end to filter exported data by timestamp - [#9084](https://github.com/influxdata/influxdb/pull/9084): Handle high cardinality deletes in TSM engine - [#9162](https://github.com/influxdata/influxdb/pull/9162): Improve inmem index startup performance for high cardinality. - [#8491](https://github.com/influxdata/influxdb/pull/8491): Add further tsi support for streaming/copying shards. diff --git a/cmd/influxd/backup/backup.go b/cmd/influxd/backup/backup.go index db90fcea47..13e14d4b9b 100644 --- a/cmd/influxd/backup/backup.go +++ b/cmd/influxd/backup/backup.go @@ -1,4 +1,4 @@ -// Package backup is the backup subcommand for the influxd command. +// Package backup implements both the backup and export subcommands for the influxd command. package backup import ( @@ -8,28 +8,17 @@ import ( "flag" "fmt" "io" - "io/ioutil" "log" "os" "path/filepath" "strconv" - "strings" "time" + "compress/gzip" + "github.com/influxdata/influxdb/cmd/influxd/backup_util" "github.com/influxdata/influxdb/services/snapshotter" "github.com/influxdata/influxdb/tcp" -) - -const ( - // Suffix is a suffix added to the backup while it's in-process. - Suffix = ".pending" - - // Metafile is the base name given to the metastore backups. - Metafile = "meta" - - // BackupFilePattern is the beginning of the pattern for a backup - // file. They follow the scheme ... - BackupFilePattern = "%s.%s.%05d" + "io/ioutil" ) // Command represents the program execution for "influxd backup". @@ -42,9 +31,20 @@ type Command struct { Stderr io.Writer Stdout io.Writer - host string - path string - database string + host string + path string + database string + retentionPolicy string + shardID string + + isBackup bool + since time.Time + start time.Time + end time.Time + + enterprise bool + manifest backup_util.Manifest + enterpriseFileBase string } // NewCommand returns a new instance of Command with default settings. @@ -62,110 +62,240 @@ func (cmd *Command) Run(args ...string) error { cmd.StderrLogger = log.New(cmd.Stderr, "", log.LstdFlags) // Parse command line arguments. - retentionPolicy, shardID, since, err := cmd.parseFlags(args) + err := cmd.parseFlags(args) if err != nil { return err } - // based on the arguments passed in we only backup the minimum - if shardID != "" { + if cmd.shardID != "" { // always backup the metastore if err := cmd.backupMetastore(); err != nil { return err } - err = cmd.backupShard(retentionPolicy, shardID, since) - } else if retentionPolicy != "" { - err = cmd.backupRetentionPolicy(retentionPolicy, since) + err = cmd.backupShard(cmd.database, cmd.retentionPolicy, cmd.shardID) + + } else if cmd.retentionPolicy != "" { + // always backup the metastore + if err := cmd.backupMetastore(); err != nil { + return err + } + err = cmd.backupRetentionPolicy() } else if cmd.database != "" { - err = cmd.backupDatabase(since) + // always backup the metastore + if err := cmd.backupMetastore(); err != nil { + return err + } + err = cmd.backupDatabase() } else { - err = cmd.backupMetastore() + // always backup the metastore + if err := cmd.backupMetastore(); err != nil { + return err + } + + cmd.StdoutLogger.Println("No database, retention policy or shard ID given. Full meta store backed up.") + if cmd.enterprise { + cmd.StdoutLogger.Println("Backing up all databases in enterprise format") + if err := cmd.backupDatabase(); err != nil { + cmd.StderrLogger.Printf("backup failed: %v", err) + return err + } + + } + + } + + if cmd.enterprise { + cmd.manifest.Platform = "OSS" + filename := cmd.enterpriseFileBase + ".manifest" + if err := cmd.manifest.Save(filepath.Join(cmd.path, filename)); err != nil { + cmd.StderrLogger.Printf("manifest save failed: %v", err) + return err + } } if err != nil { cmd.StderrLogger.Printf("backup failed: %v", err) return err } - cmd.StdoutLogger.Println("backup complete") return nil } // parseFlags parses and validates the command line arguments into a request object. -func (cmd *Command) parseFlags(args []string) (retentionPolicy, shardID string, since time.Time, err error) { +func (cmd *Command) parseFlags(args []string) (err error) { fs := flag.NewFlagSet("", flag.ContinueOnError) fs.StringVar(&cmd.host, "host", "localhost:8088", "") fs.StringVar(&cmd.database, "database", "", "") - fs.StringVar(&retentionPolicy, "retention", "", "") - fs.StringVar(&shardID, "shard", "", "") + fs.StringVar(&cmd.retentionPolicy, "retention", "", "") + fs.StringVar(&cmd.shardID, "shard", "", "") var sinceArg string + var startArg string + var endArg string fs.StringVar(&sinceArg, "since", "", "") + fs.StringVar(&startArg, "start", "", "") + fs.StringVar(&endArg, "end", "", "") + fs.BoolVar(&cmd.enterprise, "enterprise", false, "") fs.SetOutput(cmd.Stderr) fs.Usage = cmd.printUsage err = fs.Parse(args) if err != nil { - return + return err } + + // for enterprise saving, if needed + cmd.enterpriseFileBase = time.Now().UTC().Format(backup_util.EnterpriseFileNamePattern) + + // if startArg and endArg are unspecified, then assume we are doing a full backup of the DB + cmd.isBackup = startArg == "" && endArg == "" + if sinceArg != "" { - since, err = time.Parse(time.RFC3339, sinceArg) + cmd.since, err = time.Parse(time.RFC3339, sinceArg) if err != nil { - return + return err + } + } + if startArg != "" { + if cmd.isBackup { + return errors.New("backup command uses one of -since or -start/-end") + } + cmd.start, err = time.Parse(time.RFC3339, startArg) + if err != nil { + return err + } + } + + if endArg != "" { + if cmd.isBackup { + return errors.New("backup command uses one of -since or -start/-end") + } + cmd.end, err = time.Parse(time.RFC3339, endArg) + if err != nil { + return err + } + + // start should be < end + if !cmd.start.Before(cmd.end) { + return errors.New("start date must be before end date") } } // Ensure that only one arg is specified. - if fs.NArg() == 0 { - return "", "", time.Unix(0, 0), errors.New("backup destination path required") - } else if fs.NArg() != 1 { - return "", "", time.Unix(0, 0), errors.New("only one backup path allowed") + if fs.NArg() != 1 { + return errors.New("Exactly one backup path is required.") } cmd.path = fs.Arg(0) err = os.MkdirAll(cmd.path, 0700) - return + return err } -// backupShard will write a tar archive of the passed in shard with any TSM files that have been -// created since the time passed in -func (cmd *Command) backupShard(retentionPolicy string, shardID string, since time.Time) error { - id, err := strconv.ParseUint(shardID, 10, 64) +func (cmd *Command) backupShard(db, rp, sid string) error { + reqType := snapshotter.RequestShardBackup + if !cmd.isBackup { + reqType = snapshotter.RequestShardExport + } + + id, err := strconv.ParseUint(sid, 10, 64) if err != nil { return err } - shardArchivePath, err := cmd.nextPath(filepath.Join(cmd.path, fmt.Sprintf(BackupFilePattern, cmd.database, retentionPolicy, id))) + shardArchivePath, err := cmd.nextPath(filepath.Join(cmd.path, fmt.Sprintf(backup_util.BackupFilePattern, db, rp, id))) if err != nil { return err } cmd.StdoutLogger.Printf("backing up db=%v rp=%v shard=%v to %s since %s", - cmd.database, retentionPolicy, shardID, shardArchivePath, since) + db, rp, sid, shardArchivePath, cmd.since) req := &snapshotter.Request{ - Type: snapshotter.RequestShardBackup, - Database: cmd.database, - RetentionPolicy: retentionPolicy, - ShardID: id, - Since: since, + Type: reqType, + BackupDatabase: db, + BackupRetentionPolicy: rp, + ShardID: id, + Since: cmd.since, + ExportStart: cmd.start, + ExportEnd: cmd.end, } // TODO: verify shard backup data - return cmd.downloadAndVerify(req, shardArchivePath, nil) + err = cmd.downloadAndVerify(req, shardArchivePath, nil) + + if err != nil { + return err + } + + if cmd.enterprise { + f, err := os.Open(shardArchivePath) + defer f.Close() + defer os.Remove(shardArchivePath) + + filePrefix := cmd.enterpriseFileBase + ".s" + sid + filename := filePrefix + ".tar.gz" + out, err := os.OpenFile(filepath.Join(cmd.path, filename), os.O_CREATE|os.O_RDWR, 0600) + + zw := gzip.NewWriter(out) + zw.Name = filePrefix + ".tar" + + cw := backup_util.CountingWriter{Writer: zw} + + _, err = io.Copy(&cw, f) + if err != nil { + if err := zw.Close(); err != nil { + return err + } + + if err := out.Close(); err != nil { + return err + } + return err + } + + shardid, err := strconv.ParseUint(sid, 10, 64) + if err != nil { + if err := zw.Close(); err != nil { + return err + } + + if err := out.Close(); err != nil { + return err + } + return err + } + cmd.manifest.Files = append(cmd.manifest.Files, backup_util.Entry{ + Database: db, + Policy: rp, + ShardID: shardid, + FileName: filename, + Size: cw.Total, + LastModified: 0, + }) + + if err := zw.Close(); err != nil { + return err + } + + if err := out.Close(); err != nil { + return err + } + } + return nil + } -// backupDatabase will request the database information from the server and then backup the metastore and -// every shard in every retention policy in the database. Each shard will be written to a separate tar. -func (cmd *Command) backupDatabase(since time.Time) error { - cmd.StdoutLogger.Printf("backing up db=%s since %s", cmd.database, since) +// backupDatabase will request the database information from the server and then backup +// every shard in every retention policy in the database. Each shard will be written to a separate file. +func (cmd *Command) backupDatabase() error { + cmd.StdoutLogger.Printf("backing up db=%s", cmd.database) req := &snapshotter.Request{ - Type: snapshotter.RequestDatabaseInfo, - Database: cmd.database, + Type: snapshotter.RequestDatabaseInfo, + BackupDatabase: cmd.database, } response, err := cmd.requestInfo(req) @@ -173,18 +303,18 @@ func (cmd *Command) backupDatabase(since time.Time) error { return err } - return cmd.backupResponsePaths(response, since) + return cmd.backupResponsePaths(response) } // backupRetentionPolicy will request the retention policy information from the server and then backup -// the metastore and every shard in the retention policy. Each shard will be written to a separate tar. -func (cmd *Command) backupRetentionPolicy(retentionPolicy string, since time.Time) error { - cmd.StdoutLogger.Printf("backing up rp=%s since %s", retentionPolicy, since) +// every shard in the retention policy. Each shard will be written to a separate file. +func (cmd *Command) backupRetentionPolicy() error { + cmd.StdoutLogger.Printf("backing up rp=%s since %s", cmd.retentionPolicy, cmd.since) req := &snapshotter.Request{ - Type: snapshotter.RequestRetentionPolicyInfo, - Database: cmd.database, - RetentionPolicy: retentionPolicy, + Type: snapshotter.RequestRetentionPolicyInfo, + BackupDatabase: cmd.database, + BackupRetentionPolicy: cmd.retentionPolicy, } response, err := cmd.requestInfo(req) @@ -192,23 +322,22 @@ func (cmd *Command) backupRetentionPolicy(retentionPolicy string, since time.Tim return err } - return cmd.backupResponsePaths(response, since) + return cmd.backupResponsePaths(response) } -// backupResponsePaths will backup the metastore and all shard paths in the response struct -func (cmd *Command) backupResponsePaths(response *snapshotter.Response, since time.Time) error { - if err := cmd.backupMetastore(); err != nil { - return err - } +// backupResponsePaths will backup all shards identified by shard paths in the response struct +func (cmd *Command) backupResponsePaths(response *snapshotter.Response) error { // loop through the returned paths and back up each shard for _, path := range response.Paths { - rp, id, err := retentionAndShardFromPath(path) + db, rp, id, err := backup_util.DBRetentionAndShardFromPath(path) if err != nil { return err } - if err := cmd.backupShard(rp, id, since); err != nil { + err = cmd.backupShard(db, rp, id) + + if err != nil { return err } } @@ -216,10 +345,10 @@ func (cmd *Command) backupResponsePaths(response *snapshotter.Response, since ti return nil } -// backupMetastore will backup the metastore on the host to the passed in path. Database and retention policy backups -// will force a backup of the metastore as well as requesting a specific shard backup from the command line +// backupMetastore will backup the whole metastore on the host to the backup path +// if useDB is non-empty, it will backup metadata only for the named database. func (cmd *Command) backupMetastore() error { - metastoreArchivePath, err := cmd.nextPath(filepath.Join(cmd.path, Metafile)) + metastoreArchivePath, err := cmd.nextPath(filepath.Join(cmd.path, backup_util.Metafile)) if err != nil { return err } @@ -230,13 +359,24 @@ func (cmd *Command) backupMetastore() error { Type: snapshotter.RequestMetastoreBackup, } - return cmd.downloadAndVerify(req, metastoreArchivePath, func(file string) error { - binData, err := ioutil.ReadFile(file) + err = cmd.downloadAndVerify(req, metastoreArchivePath, func(file string) error { + f, err := os.Open(file) + if err != nil { + return err + } + defer f.Close() + + var magicByte [8]byte + n, err := io.ReadFull(f, magicByte[:]) if err != nil { return err } - magic := binary.BigEndian.Uint64(binData[:8]) + if n < 8 { + return errors.New("Not enough bytes data to verify") + } + + magic := binary.BigEndian.Uint64(magicByte[:]) if magic != snapshotter.BackupMagicHeader { cmd.StderrLogger.Println("Invalid metadata blob, ensure the metadata service is running (default port 8088)") return errors.New("invalid metadata received") @@ -244,6 +384,28 @@ func (cmd *Command) backupMetastore() error { return nil }) + + if err != nil { + return err + } + + if cmd.enterprise { + metaBytes, err := backup_util.GetMetaBytes(metastoreArchivePath) + defer os.Remove(metastoreArchivePath) + if err != nil { + return err + } + filename := cmd.enterpriseFileBase + ".meta" + if err := ioutil.WriteFile(filepath.Join(cmd.path, filename), metaBytes, 0644); err != nil { + fmt.Fprintln(cmd.Stdout, "Error.") + return err + } + + cmd.manifest.Meta.FileName = filename + cmd.manifest.Meta.Size = int64(len(metaBytes)) + } + + return nil } // nextPath returns the next file to write to. @@ -262,7 +424,7 @@ func (cmd *Command) nextPath(path string) (string, error) { // downloadAndVerify will download either the metastore or shard to a temp file and then // rename it to a good backup file name after complete func (cmd *Command) downloadAndVerify(req *snapshotter.Request, path string, validator func(string) error) error { - tmppath := path + Suffix + tmppath := path + backup_util.Suffix if err := cmd.download(req, tmppath); err != nil { return err } @@ -312,6 +474,11 @@ func (cmd *Command) download(req *snapshotter.Request, path string) error { } defer conn.Close() + _, err = conn.Write([]byte{byte(req.Type)}) + if err != nil { + return err + } + // Write the request if err := json.NewEncoder(conn).Encode(req); err != nil { return fmt.Errorf("encode snapshot request: %s", err) @@ -336,11 +503,16 @@ func (cmd *Command) download(req *snapshotter.Request, path string) error { // requestInfo will request the database or retention policy information from the host func (cmd *Command) requestInfo(request *snapshotter.Request) (*snapshotter.Response, error) { // Connect to snapshotter service. + var r snapshotter.Response conn, err := tcp.Dial("tcp", cmd.host, snapshotter.MuxHeader) if err != nil { return nil, err } defer conn.Close() + _, err = conn.Write([]byte{byte(request.Type)}) + if err != nil { + return &r, err + } // Write the request if err := json.NewEncoder(conn).Encode(request); err != nil { @@ -348,7 +520,7 @@ func (cmd *Command) requestInfo(request *snapshotter.Request) (*snapshotter.Resp } // Read the response - var r snapshotter.Response + if err := json.NewDecoder(conn).Decode(&r); err != nil { return nil, err } @@ -358,7 +530,7 @@ func (cmd *Command) requestInfo(request *snapshotter.Request) (*snapshotter.Resp // printUsage prints the usage message to STDERR. func (cmd *Command) printUsage() { - fmt.Fprintf(cmd.Stdout, `Downloads a snapshot of a data node and saves it to disk. + fmt.Fprintf(cmd.Stdout, `Downloads a file level age-based snapshot of a data node and saves it to disk. Usage: influxd backup [flags] PATH @@ -372,18 +544,14 @@ Usage: influxd backup [flags] PATH Optional. The shard id to backup. If specified, retention is required. -since <2015-12-24T08:12:23Z> Optional. Do an incremental backup since the passed in RFC3339 - formatted time. + formatted time. Not compatible with -start or -end. + -start <2015-12-24T08:12:23Z> + All points earlier than this time stamp will be excluded from the export. Not compatible with -since. + -end <2015-12-24T08:12:23Z> + All points later than this time stamp will be excluded from the export. Not compatible with -since. + -enterprise + Generate backup files in the format used for influxdb enterprise. `) -} -// retentionAndShardFromPath will take the shard relative path and split it into the -// retention policy name and shard ID. The first part of the path should be the database name. -func retentionAndShardFromPath(path string) (retention, shard string, err error) { - a := strings.Split(path, string(filepath.Separator)) - if len(a) != 3 { - return "", "", fmt.Errorf("expected database, retention policy, and shard id in path: %s", path) - } - - return a[1], a[2], nil } diff --git a/cmd/influxd/backup_util/backup_util.go b/cmd/influxd/backup_util/backup_util.go new file mode 100644 index 0000000000..9c2a36d43a --- /dev/null +++ b/cmd/influxd/backup_util/backup_util.go @@ -0,0 +1,146 @@ +package backup_util + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + "os" + "strings" + + "encoding/json" + "github.com/influxdata/influxdb/services/snapshotter" + "io/ioutil" + "path/filepath" +) + +const ( + // Suffix is a suffix added to the backup while it's in-process. + Suffix = ".pending" + + // Metafile is the base name given to the metastore backups. + Metafile = "meta" + + // BackupFilePattern is the beginning of the pattern for a backup + // file. They follow the scheme ... + BackupFilePattern = "%s.%s.%05d" + + EnterpriseFileNamePattern = "20060102T150405Z" + + OSSManifest = "OSS" + + ENTManifest = "ENT" +) + +func GetMetaBytes(fname string) ([]byte, error) { + f, err := os.Open(fname) + if err != nil { + return []byte{}, err + } + + var buf bytes.Buffer + if _, err := io.Copy(&buf, f); err != nil { + return []byte{}, fmt.Errorf("copy: %s", err) + } + + b := buf.Bytes() + var i int + + // Make sure the file is actually a meta store backup file + magic := binary.BigEndian.Uint64(b[:8]) + if magic != snapshotter.BackupMagicHeader { + return []byte{}, fmt.Errorf("invalid metadata file") + } + i += 8 + + // Size of the meta store bytes + length := int(binary.BigEndian.Uint64(b[i : i+8])) + i += 8 + metaBytes := b[i : i+length] + + return metaBytes, nil +} + +// Manifest lists the meta and shard file information contained in the backup. +// If Limited is false, the manifest contains a full backup, otherwise +// it is a partial backup. +type Manifest struct { + Platform string `json:"platform"` + Meta MetaEntry `json:"meta"` + Limited bool `json:"limited"` + Files []Entry `json:"files"` + + // If limited is true, then one (or all) of the following fields will be set + + Database string `json:"database,omitempty"` + Policy string `json:"policy,omitempty"` + ShardID uint64 `json:"shard_id,omitempty"` +} + +// Entry contains the data information for a backed up shard. +type Entry struct { + Database string `json:"database"` + Policy string `json:"policy"` + ShardID uint64 `json:"shardID"` + FileName string `json:"fileName"` + Size int64 `json:"size"` + LastModified int64 `json:"lastModified"` +} + +func (e *Entry) SizeOrZero() int64 { + if e == nil { + return 0 + } + return e.Size +} + +// MetaEntry contains the meta store information for a backup. +type MetaEntry struct { + FileName string `json:"fileName"` + Size int64 `json:"size"` +} + +// Size returns the size of the manifest. +func (m *Manifest) Size() int64 { + if m == nil { + return 0 + } + + size := m.Meta.Size + + for _, f := range m.Files { + size += f.Size + } + return size +} + +func (manifest *Manifest) Save(filename string) error { + b, err := json.MarshalIndent(manifest, "", " ") + if err != nil { + return fmt.Errorf("create manifest: %v", err) + } + + return ioutil.WriteFile(filename, b, 0600) +} + +type CountingWriter struct { + io.Writer + Total int64 // Total # of bytes transferred +} + +func (w *CountingWriter) Write(p []byte) (n int, err error) { + n, err = w.Writer.Write(p) + w.Total += int64(n) + return +} + +// retentionAndShardFromPath will take the shard relative path and split it into the +// retention policy name and shard ID. The first part of the path should be the database name. +func DBRetentionAndShardFromPath(path string) (db, retention, shard string, err error) { + a := strings.Split(path, string(filepath.Separator)) + if len(a) != 3 { + return "", "", "", fmt.Errorf("expected database, retention policy, and shard id in path: %s", path) + } + + return a[0], a[1], a[2], nil +} diff --git a/cmd/influxd/restore/restore.go b/cmd/influxd/restore/restore.go index 932aeb7236..dd4eb6a048 100644 --- a/cmd/influxd/restore/restore.go +++ b/cmd/influxd/restore/restore.go @@ -14,7 +14,7 @@ import ( "path/filepath" "strconv" - "github.com/influxdata/influxdb/cmd/influxd/backup" + "github.com/influxdata/influxdb/cmd/influxd/backup_util" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/services/snapshotter" ) @@ -116,7 +116,7 @@ func (cmd *Command) parseFlags(args []string) error { // cluster and replaces the root metadata. func (cmd *Command) unpackMeta() error { // find the meta file - metaFiles, err := filepath.Glob(filepath.Join(cmd.backupFilesPath, backup.Metafile+".*")) + metaFiles, err := filepath.Glob(filepath.Join(cmd.backupFilesPath, backup_util.Metafile+".*")) if err != nil { return err } @@ -227,7 +227,7 @@ func (cmd *Command) unpackShard(shardID string) error { } // find the shard backup files - pat := filepath.Join(cmd.backupFilesPath, fmt.Sprintf(backup.BackupFilePattern, cmd.database, cmd.retention, id)) + pat := filepath.Join(cmd.backupFilesPath, fmt.Sprintf(backup_util.BackupFilePattern, cmd.database, cmd.retention, id)) return cmd.unpackFiles(pat + ".*") } diff --git a/internal/tsdb_store.go b/internal/tsdb_store.go index 1e0a4dec35..3054c8cf61 100644 --- a/internal/tsdb_store.go +++ b/internal/tsdb_store.go @@ -14,6 +14,7 @@ import ( // TSDBStoreMock is a mockable implementation of tsdb.Store. type TSDBStoreMock struct { BackupShardFn func(id uint64, since time.Time, w io.Writer) error + ExportShardFn func(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error CloseFn func() error CreateShardFn func(database, policy string, shardID uint64, enabled bool) error CreateShardSnapshotFn func(id uint64) (string, error) @@ -50,6 +51,9 @@ type TSDBStoreMock struct { func (s *TSDBStoreMock) BackupShard(id uint64, since time.Time, w io.Writer) error { return s.BackupShardFn(id, since, w) } +func (s *TSDBStoreMock) ExportShard(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error { + return s.ExportShardFn(id, ExportStart, ExportEnd, w) +} func (s *TSDBStoreMock) Close() error { return s.CloseFn() } func (s *TSDBStoreMock) CreateShard(database string, retentionPolicy string, shardID uint64, enabled bool) error { return s.CreateShardFn(database, retentionPolicy, shardID, enabled) diff --git a/man/influxd-backup.txt b/man/influxd-backup.txt index d7177ba3b3..188a2b09ab 100644 --- a/man/influxd-backup.txt +++ b/man/influxd-backup.txt @@ -5,6 +5,7 @@ NAME ---- influxd-backup - Downloads a snapshot of a data node and saves it to disk + SYNOPSIS -------- 'influxd backup' [options] @@ -30,6 +31,13 @@ OPTIONS -since <2015-12-24T08:12:13Z>:: Do an incremental backup since the passed in time. The time needs to be in the RFC3339 format. Optional. +-start <2015-12-24T08:12:23Z>:: + All points earlier than this time stamp will be excluded from the export. Not compatible with -since. +-end <2015-12-24T08:12:23Z>:: + All points later than this time stamp will be excluded from the export. Not compatible with -since. +-enterprise:: + Generate backup files in the format used for influxdb enterprise. + SEE ALSO -------- *influxd-restore*(1) diff --git a/services/snapshotter/client.go b/services/snapshotter/client.go index ac6c87411b..2c1e1b8ada 100644 --- a/services/snapshotter/client.go +++ b/services/snapshotter/client.go @@ -64,6 +64,10 @@ func (c *Client) doRequest(req *Request) ([]byte, error) { defer conn.Close() // Write the request + _, err = conn.Write([]byte{byte(req.Type)}) + if err != nil { + return nil, err + } if err := json.NewEncoder(conn).Encode(req); err != nil { return nil, fmt.Errorf("encode snapshot request: %s", err) } diff --git a/services/snapshotter/client_test.go b/services/snapshotter/client_test.go index ee5c5f2a73..b4427ee16d 100644 --- a/services/snapshotter/client_test.go +++ b/services/snapshotter/client_test.go @@ -58,6 +58,12 @@ func TestClient_MetastoreBackup_InvalidMetadata(t *testing.T) { return } + var typ [1]byte + if _, err := conn.Read(typ[:]); err != nil { + t.Errorf("unable to read typ header: %s", err) + return + } + var m map[string]interface{} dec := json.NewDecoder(conn) if err := dec.Decode(&m); err != nil { diff --git a/services/snapshotter/service.go b/services/snapshotter/service.go index 5abde36217..6053e3ed99 100644 --- a/services/snapshotter/service.go +++ b/services/snapshotter/service.go @@ -41,6 +41,7 @@ type Service struct { TSDBStore interface { BackupShard(id uint64, since time.Time, w io.Writer) error + ExportShard(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error Shard(id uint64) *tsdb.Shard ShardRelativePath(id uint64) (string, error) } @@ -68,7 +69,9 @@ func (s *Service) Open() error { // Close implements the Service interface. func (s *Service) Close() error { if s.Listener != nil { - s.Listener.Close() + if err := s.Listener.Close(); err != nil { + return err + } } s.wg.Wait() return nil @@ -86,6 +89,7 @@ func (s *Service) serve() { for { // Wait for next connection. conn, err := s.Listener.Accept() + if err != nil && strings.Contains(err.Error(), "connection closed") { s.Logger.Info("snapshot listener closed") return @@ -108,24 +112,35 @@ func (s *Service) serve() { // handleConn processes conn. This is run in a separate goroutine. func (s *Service) handleConn(conn net.Conn) error { + var typ [1]byte + + _, err := conn.Read(typ[:]) + if err != nil { + return err + } + r, err := s.readRequest(conn) if err != nil { return fmt.Errorf("read request: %s", err) } - switch r.Type { + switch RequestType(typ[0]) { case RequestShardBackup: if err := s.TSDBStore.BackupShard(r.ShardID, r.Since, conn); err != nil { return err } + case RequestShardExport: + if err := s.TSDBStore.ExportShard(r.ShardID, r.ExportStart, r.ExportEnd, conn); err != nil { + return err + } case RequestMetastoreBackup: if err := s.writeMetaStore(conn); err != nil { return err } case RequestDatabaseInfo: - return s.writeDatabaseInfo(conn, r.Database) + return s.writeDatabaseInfo(conn, r.BackupDatabase) case RequestRetentionPolicyInfo: - return s.writeRetentionPolicyInfo(conn, r.Database, r.RetentionPolicy) + return s.writeRetentionPolicyInfo(conn, r.BackupDatabase, r.BackupRetentionPolicy) default: return fmt.Errorf("request type unknown: %v", r.Type) } @@ -136,6 +151,7 @@ func (s *Service) handleConn(conn net.Conn) error { func (s *Service) writeMetaStore(conn net.Conn) error { // Retrieve and serialize the current meta data. metaBlob, err := s.MetaClient.MarshalBinary() + if err != nil { return fmt.Errorf("marshal meta: %s", err) } @@ -174,29 +190,37 @@ func (s *Service) writeMetaStore(conn net.Conn) error { // this server into the connection. func (s *Service) writeDatabaseInfo(conn net.Conn, database string) error { res := Response{} - db := s.MetaClient.Database(database) - if db == nil { - return influxdb.ErrDatabaseNotFound(database) + dbs := []meta.DatabaseInfo{} + if database != "" { + db := s.MetaClient.Database(database) + if db == nil { + return influxdb.ErrDatabaseNotFound(database) + } + dbs = append(dbs, *db) + } else { + // we'll allow collecting info on all databases + dbs = s.MetaClient.(*meta.Client).Databases() } - for _, rp := range db.RetentionPolicies { - for _, sg := range rp.ShardGroups { - for _, sh := range sg.Shards { - // ignore if the shard isn't on the server - if s.TSDBStore.Shard(sh.ID) == nil { - continue - } + for _, db := range dbs { + for _, rp := range db.RetentionPolicies { + for _, sg := range rp.ShardGroups { + for _, sh := range sg.Shards { + // ignore if the shard isn't on the server + if s.TSDBStore.Shard(sh.ID) == nil { + continue + } - path, err := s.TSDBStore.ShardRelativePath(sh.ID) - if err != nil { - return err - } + path, err := s.TSDBStore.ShardRelativePath(sh.ID) + if err != nil { + return err + } - res.Paths = append(res.Paths, path) + res.Paths = append(res.Paths, path) + } } } } - if err := json.NewEncoder(conn).Encode(res); err != nil { return fmt.Errorf("encode response: %s", err.Error()) } @@ -273,16 +297,33 @@ const ( // RequestRetentionPolicyInfo represents a request for retention policy info. RequestRetentionPolicyInfo + + // RequestShardExport represents a request to export Shard data. Similar to a backup, but shards + // may be filtered based on the start/end times on each block. + RequestShardExport + + // RequestMetaStoreUpdate represents a request to upload a metafile that will be used to do a live update + // to the existing metastore. + RequestMetaStoreUpdate + + // RequestShardUpdate will initiate the upload of a shard data tar file + // and have the engine import the data. + RequestShardUpdate ) // Request represents a request for a specific backup or for information // about the shards on this server for a database or retention policy. type Request struct { - Type RequestType - Database string - RetentionPolicy string - ShardID uint64 - Since time.Time + Type RequestType + BackupDatabase string + RestoreDatabase string + BackupRetentionPolicy string + RestoreRetentionPolicy string + ShardID uint64 + Since time.Time + ExportStart time.Time + ExportEnd time.Time + UploadSize int64 } // Response contains the relative paths for all the shards on this server diff --git a/services/snapshotter/service_test.go b/services/snapshotter/service_test.go index ab23d71780..203807f7e5 100644 --- a/services/snapshotter/service_test.go +++ b/services/snapshotter/service_test.go @@ -132,6 +132,10 @@ func TestSnapshotter_RequestShardBackup(t *testing.T) { Since: time.Unix(0, 0), } conn.Write([]byte{snapshotter.MuxHeader}) + _, err = conn.Write([]byte{byte(req.Type)}) + if err != nil { + t.Errorf("could not encode request type to conn: %v", err) + } enc := json.NewEncoder(conn) if err := enc.Encode(&req); err != nil { t.Errorf("unable to encode request: %s", err) @@ -222,10 +226,14 @@ func TestSnapshotter_RequestDatabaseInfo(t *testing.T) { defer conn.Close() req := snapshotter.Request{ - Type: snapshotter.RequestDatabaseInfo, - Database: "db0", + Type: snapshotter.RequestDatabaseInfo, + BackupDatabase: "db0", } conn.Write([]byte{snapshotter.MuxHeader}) + _, err = conn.Write([]byte{byte(req.Type)}) + if err != nil { + t.Errorf("could not encode request type to conn: %v", err) + } enc := json.NewEncoder(conn) if err := enc.Encode(&req); err != nil { t.Errorf("unable to encode request: %s", err) @@ -272,10 +280,14 @@ func TestSnapshotter_RequestDatabaseInfo_ErrDatabaseNotFound(t *testing.T) { defer conn.Close() req := snapshotter.Request{ - Type: snapshotter.RequestDatabaseInfo, - Database: "doesnotexist", + Type: snapshotter.RequestDatabaseInfo, + BackupDatabase: "doesnotexist", } conn.Write([]byte{snapshotter.MuxHeader}) + _, err = conn.Write([]byte{byte(req.Type)}) + if err != nil { + t.Errorf("could not encode request type to conn: %v", err) + } enc := json.NewEncoder(conn) if err := enc.Encode(&req); err != nil { t.Errorf("unable to encode request: %s", err) @@ -332,11 +344,15 @@ func TestSnapshotter_RequestRetentionPolicyInfo(t *testing.T) { defer conn.Close() req := snapshotter.Request{ - Type: snapshotter.RequestRetentionPolicyInfo, - Database: "db0", - RetentionPolicy: "rp0", + Type: snapshotter.RequestRetentionPolicyInfo, + BackupDatabase: "db0", + BackupRetentionPolicy: "rp0", } conn.Write([]byte{snapshotter.MuxHeader}) + _, err = conn.Write([]byte{byte(req.Type)}) + if err != nil { + t.Errorf("could not encode request type to conn: %v", err) + } enc := json.NewEncoder(conn) if err := enc.Encode(&req); err != nil { t.Errorf("unable to encode request: %s", err) diff --git a/tsdb/engine.go b/tsdb/engine.go index 55858737aa..ba27d6ff2e 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -42,6 +42,7 @@ type Engine interface { CreateSnapshot() (string, error) Backup(w io.Writer, basePath string, since time.Time) error + Export(w io.Writer, basePath string, start time.Time, end time.Time) error Restore(r io.Reader, basePath string) error Import(r io.Reader, basePath string) error diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 63b6a72584..434d9e93ae 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -776,6 +776,142 @@ func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error { return nil } +func (e *Engine) Export(w io.Writer, basePath string, start time.Time, end time.Time) error { + path, err := e.CreateSnapshot() + if err != nil { + return err + } + + // Remove the temporary snapshot dir + defer os.RemoveAll(path) + if err := e.index.SnapshotTo(path); err != nil { + return err + } + + tw := tar.NewWriter(w) + defer tw.Close() + + // Recursively read all files from path. + files, err := readDir(path, "") + if err != nil { + return err + } + + for _, file := range files { + if !strings.HasSuffix(file, ".tsm") { + if err := e.writeFileToBackup(file, basePath, filepath.Join(path, file), tw); err != nil { + return err + } + } + + var tombstonePath string + f, err := os.Open(filepath.Join(path, file)) + if err != nil { + return err + } + r, err := NewTSMReader(f) + if err != nil { + return err + } + + // Grab the tombstone file if one exists. + if r.HasTombstones() { + tombstonePath = filepath.Base(r.TombstoneFiles()[0].Path) + } + + min, max := r.TimeRange() + stun := start.UnixNano() + eun := end.UnixNano() + + // We overlap time ranges, we need to filter the file + if min >= stun && min <= eun && max > eun || // overlap to the right + max >= stun && max <= eun && min < stun || // overlap to the left + min <= stun && max >= eun { // TSM file has a range LARGER than the boundary + err := e.filterFileToBackup(r, file, basePath, filepath.Join(path, file), start.UnixNano(), end.UnixNano(), tw) + if err != nil { + if err := r.Close(); err != nil { + return err + } + return err + } + + } + + // above is the only case where we need to keep the reader open. + if err := r.Close(); err != nil { + return err + } + + // the TSM file is 100% inside the range, so we can just write it without scanning each block + if min >= start.UnixNano() && max <= end.UnixNano() { + if err := e.writeFileToBackup(file, basePath, filepath.Join(path, file), tw); err != nil { + return err + } + } + + // if this TSM file had a tombstone we'll write out the whole thing too. + if tombstonePath != "" { + if err := e.writeFileToBackup(tombstonePath, basePath, filepath.Join(path, tombstonePath), tw); err != nil { + return err + } + } + + } + + return nil +} + +func (e *Engine) filterFileToBackup(r *TSMReader, name, shardRelativePath, fullPath string, start, end int64, tw *tar.Writer) error { + path := fullPath + ".tmp" + out, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + return err + } + defer os.Remove(path) + + w, err := NewTSMWriter(out) + if err != nil { + return err + } + defer w.Close() + + // implicit else: here we iterate over the blocks and only keep the ones we really want. + var bi *BlockIterator + bi = r.BlockIterator() + + for bi.Next() { + // not concerned with typ or checksum since we are just blindly writing back, with no decoding + key, minTime, maxTime, _, _, buf, err := bi.Read() + if err != nil { + return err + } + if minTime >= start && minTime <= end || + maxTime >= start && maxTime <= end || + minTime <= start && maxTime >= end { + err := w.WriteBlock(key, minTime, maxTime, buf) + if err != nil { + return err + } + } + } + + if err := bi.Err(); err != nil { + return err + } + + err = w.WriteIndex() + if err != nil { + return err + } + + // make sure the whole file is out to disk + if err := w.Flush(); err != nil { + return err + } + + return e.writeFileToBackup(name, shardRelativePath, path, tw) +} + // writeFileToBackup copies the file into the tar archive. Files will use the shardRelativePath // in their names. This should be the // part of the path. func (e *Engine) writeFileToBackup(name string, shardRelativePath, fullPath string, tw *tar.Writer) error { diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index f0c8b285b7..842d414c92 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -253,6 +253,268 @@ func TestEngine_Backup(t *testing.T) { } } +func TestEngine_Export(t *testing.T) { + // Generate temporary file. + f, _ := ioutil.TempFile("", "tsm") + f.Close() + os.Remove(f.Name()) + walPath := filepath.Join(f.Name(), "wal") + os.MkdirAll(walPath, 0777) + defer os.RemoveAll(f.Name()) + + // Create a few points. + p1 := MustParsePointString("cpu,host=A value=1.1 1000000000") + p2 := MustParsePointString("cpu,host=B value=1.2 2000000000") + p3 := MustParsePointString("cpu,host=C value=1.3 3000000000") + + // Write those points to the engine. + db := path.Base(f.Name()) + opt := tsdb.NewEngineOptions() + opt.InmemIndex = inmem.NewIndex(db) + idx := tsdb.MustOpenIndex(1, db, filepath.Join(f.Name(), "index"), opt) + defer idx.Close() + + e := tsm1.NewEngine(1, idx, db, f.Name(), walPath, opt).(*tsm1.Engine) + + // mock the planner so compactions don't run during the test + e.CompactionPlan = &mockPlanner{} + + if err := e.Open(); err != nil { + t.Fatalf("failed to open tsm1 engine: %s", err.Error()) + } + + if err := e.WritePoints([]models.Point{p1}); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + if err := e.WriteSnapshot(); err != nil { + t.Fatalf("failed to snapshot: %s", err.Error()) + } + + if err := e.WritePoints([]models.Point{p2}); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + if err := e.WriteSnapshot(); err != nil { + t.Fatalf("failed to snapshot: %s", err.Error()) + } + + if err := e.WritePoints([]models.Point{p3}); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + + // export the whole DB + var exBuf bytes.Buffer + if err := e.Export(&exBuf, "", time.Unix(0, 0), time.Unix(0, 4000000000)); err != nil { + t.Fatalf("failed to export: %s", err.Error()) + } + + var bkBuf bytes.Buffer + if err := e.Backup(&bkBuf, "", time.Unix(0, 0)); err != nil { + t.Fatalf("failed to backup: %s", err.Error()) + } + + if len(e.FileStore.Files()) != 3 { + t.Fatalf("file count wrong: exp: %d, got: %d", 3, len(e.FileStore.Files())) + } + + fileNames := map[string]bool{} + for _, f := range e.FileStore.Files() { + fileNames[filepath.Base(f.Path())] = true + } + + fileData, err := getExportData(&exBuf) + if err != nil { + t.Errorf("Error extracting data from export: %s", err.Error()) + } + + // TEST 1: did we get any extra files not found in the store? + for k, _ := range fileData { + if _, ok := fileNames[k]; !ok { + t.Errorf("exported a file not in the store: %s", k) + } + } + + // TEST 2: did we miss any files that the store had? + for k, _ := range fileNames { + if _, ok := fileData[k]; !ok { + t.Errorf("failed to export a file from the store: %s", k) + } + } + + // TEST 3: Does 'backup' get the same files + bits? + tr := tar.NewReader(&bkBuf) + + th, err := tr.Next() + for err == nil { + expData, ok := fileData[th.Name] + if !ok { + t.Errorf("Extra file in backup: %q", th.Name) + continue + } + + buf := new(bytes.Buffer) + if _, err := io.Copy(buf, tr); err != nil { + t.Fatal(err) + } + + if !equalBuffers(expData, buf) { + t.Errorf("2Difference in data between backup and Export for file %s", th.Name) + } + + th, err = tr.Next() + } + + if t.Failed() { + t.FailNow() + } + + // TEST 4: Are subsets (1), (2), (3), (1,2), (2,3) accurately found in the larger export? + // export the whole DB + var ex1 bytes.Buffer + if err := e.Export(&ex1, "", time.Unix(0, 0), time.Unix(0, 1000000000)); err != nil { + t.Fatalf("failed to export: %s", err.Error()) + } + ex1Data, err := getExportData(&ex1) + if err != nil { + t.Errorf("Error extracting data from export: %s", err.Error()) + } + + for k, v := range ex1Data { + fullExp, ok := fileData[k] + if !ok { + t.Errorf("Extracting subset resulted in file not found in full export: %s", err.Error()) + continue + } + if !equalBuffers(fullExp, v) { + t.Errorf("2Difference in data between backup and Export for file %s", th.Name) + } + + } + + var ex2 bytes.Buffer + if err := e.Export(&ex2, "", time.Unix(0, 1000000001), time.Unix(0, 2000000000)); err != nil { + t.Fatalf("failed to export: %s", err.Error()) + } + + ex2Data, err := getExportData(&ex2) + if err != nil { + t.Errorf("Error extracting data from export: %s", err.Error()) + } + + for k, v := range ex2Data { + fullExp, ok := fileData[k] + if !ok { + t.Errorf("Extracting subset resulted in file not found in full export: %s", err.Error()) + continue + } + if !equalBuffers(fullExp, v) { + t.Errorf("2Difference in data between backup and Export for file %s", th.Name) + } + + } + + var ex3 bytes.Buffer + if err := e.Export(&ex3, "", time.Unix(0, 2000000001), time.Unix(0, 3000000000)); err != nil { + t.Fatalf("failed to export: %s", err.Error()) + } + + ex3Data, err := getExportData(&ex3) + if err != nil { + t.Errorf("Error extracting data from export: %s", err.Error()) + } + + for k, v := range ex3Data { + fullExp, ok := fileData[k] + if !ok { + t.Errorf("Extracting subset resulted in file not found in full export: %s", err.Error()) + continue + } + if !equalBuffers(fullExp, v) { + t.Errorf("2Difference in data between backup and Export for file %s", th.Name) + } + + } + + var ex12 bytes.Buffer + if err := e.Export(&ex12, "", time.Unix(0, 0), time.Unix(0, 2000000000)); err != nil { + t.Fatalf("failed to export: %s", err.Error()) + } + + ex12Data, err := getExportData(&ex12) + if err != nil { + t.Errorf("Error extracting data from export: %s", err.Error()) + } + + for k, v := range ex12Data { + fullExp, ok := fileData[k] + if !ok { + t.Errorf("Extracting subset resulted in file not found in full export: %s", err.Error()) + continue + } + if !equalBuffers(fullExp, v) { + t.Errorf("2Difference in data between backup and Export for file %s", th.Name) + } + + } + + var ex23 bytes.Buffer + if err := e.Export(&ex23, "", time.Unix(0, 1000000001), time.Unix(0, 3000000000)); err != nil { + t.Fatalf("failed to export: %s", err.Error()) + } + + ex23Data, err := getExportData(&ex23) + if err != nil { + t.Errorf("Error extracting data from export: %s", err.Error()) + } + + for k, v := range ex23Data { + fullExp, ok := fileData[k] + if !ok { + t.Errorf("Extracting subset resulted in file not found in full export: %s", err.Error()) + continue + } + if !equalBuffers(fullExp, v) { + t.Errorf("2Difference in data between backup and Export for file %s", th.Name) + } + + } +} + +func equalBuffers(bufA, bufB *bytes.Buffer) bool { + for i, v := range bufA.Bytes() { + if v != bufB.Bytes()[i] { + return false + } + } + return true +} + +func getExportData(exBuf *bytes.Buffer) (map[string]*bytes.Buffer, error) { + + tr := tar.NewReader(exBuf) + + fileData := make(map[string]*bytes.Buffer) + + // TEST 1: Get the bits for each file. If we got a file the store doesn't know about, report error + for { + th, err := tr.Next() + if err == io.EOF { + break + } + if err != nil { + return nil, err + } + + buf := new(bytes.Buffer) + if _, err := io.Copy(buf, tr); err != nil { + return nil, err + } + fileData[th.Name] = buf + + } + + return fileData, nil +} + // Ensure engine can create an ascending iterator for cached values. func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) { t.Parallel() diff --git a/tsdb/shard.go b/tsdb/shard.go index 7c4287ef8b..492b447d40 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -1084,6 +1084,14 @@ func (s *Shard) Backup(w io.Writer, basePath string, since time.Time) error { return engine.Backup(w, basePath, since) } +func (s *Shard) Export(w io.Writer, basePath string, start time.Time, end time.Time) error { + engine, err := s.engine() + if err != nil { + return err + } + return engine.Export(w, basePath, start, end) +} + // Restore restores data to the underlying engine for the shard. // The shard is reopened after restore. func (s *Shard) Restore(r io.Reader, basePath string) error { diff --git a/tsdb/store.go b/tsdb/store.go index ae889c46cf..6d3c6bdea2 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -796,6 +796,20 @@ func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error { return shard.Backup(w, path, since) } +func (s *Store) ExportShard(id uint64, start time.Time, end time.Time, w io.Writer) error { + shard := s.Shard(id) + if shard == nil { + return fmt.Errorf("shard %d doesn't exist on this server", id) + } + + path, err := relativePath(s.path, shard.path) + if err != nil { + return err + } + + return shard.Export(w, path, start, end) +} + // RestoreShard restores a backup from r to a given shard. // This will only overwrite files included in the backup. func (s *Store) RestoreShard(id uint64, r io.Reader) error {