diff --git a/CHANGELOG.md b/CHANGELOG.md index d550dd7252..1ddce5f840 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ ### Features - [#8495](https://github.com/influxdata/influxdb/pull/8495): Improve CLI connection warnings +- [#9146](https://github.com/influxdata/influxdb/issues/9146): Backup can produce data in the same format as the enterprise backup/restore tool. +- [#8879](https://github.com/influxdata/influxdb/issues/8879): Export functionality using start/end to filter exported data by timestamp - [#9084](https://github.com/influxdata/influxdb/pull/9084): Handle high cardinality deletes in TSM engine - [#9162](https://github.com/influxdata/influxdb/pull/9162): Improve inmem index startup performance for high cardinality. - [#8491](https://github.com/influxdata/influxdb/pull/8491): Add further tsi support for streaming/copying shards. @@ -21,6 +23,18 @@ - [#9144](https://github.com/influxdata/influxdb/issues/9144): Fix query compilation so multiple nested distinct calls is allowable - [#8789](https://github.com/influxdata/influxdb/issues/8789): Fix CLI to allow quoted database names in use statement +## v1.4.3 [unreleased] + +### Configuration Changes + +#### `[data]` Section + +The default value for `cache-snapshot-memory-size` has been changed from `25m` to `256m`. + +### Bugfixes + +- [#9201](https://github.com/influxdata/influxdb/issues/9201): Fix higher disk i/o utilization + ## v1.4.2 [2017-11-15] Refer to the 1.4.0 breaking changes section if `influxd` fails to start with an `incompatible tsi1 index MANIFEST` error. diff --git a/cmd/influxd/backup/backup.go b/cmd/influxd/backup/backup.go index ff7fea05ca..e5c4b61864 100644 --- a/cmd/influxd/backup/backup.go +++ b/cmd/influxd/backup/backup.go @@ -1,7 +1,8 @@ -// Package backup is the backup subcommand for the influxd command. +// Package backup implements both the backup and export subcommands for the influxd command. package backup import ( + "compress/gzip" "encoding/binary" "encoding/json" "errors" @@ -13,9 +14,9 @@ import ( "os" "path/filepath" "strconv" - "strings" "time" + "github.com/influxdata/influxdb/cmd/influxd/backup_util" "github.com/influxdata/influxdb/services/snapshotter" "github.com/influxdata/influxdb/tcp" "github.com/influxdata/influxdb/tsdb" @@ -43,9 +44,20 @@ type Command struct { Stderr io.Writer Stdout io.Writer - host string - path string - database string + host string + path string + database string + retentionPolicy string + shardID string + + isBackup bool + since time.Time + start time.Time + end time.Time + + enterprise bool + manifest backup_util.Manifest + enterpriseFileBase string } // NewCommand returns a new instance of Command with default settings. @@ -63,100 +75,230 @@ func (cmd *Command) Run(args ...string) error { cmd.StderrLogger = log.New(cmd.Stderr, "", log.LstdFlags) // Parse command line arguments. 
- retentionPolicy, shardID, since, err := cmd.parseFlags(args) + err := cmd.parseFlags(args) if err != nil { return err } - // based on the arguments passed in we only backup the minimum - if shardID != "" { + if cmd.shardID != "" { // always backup the metastore if err := cmd.backupMetastore(); err != nil { return err } - err = cmd.backupShard(retentionPolicy, shardID, since) - } else if retentionPolicy != "" { - err = cmd.backupRetentionPolicy(retentionPolicy, since) + err = cmd.backupShard(cmd.database, cmd.retentionPolicy, cmd.shardID) + + } else if cmd.retentionPolicy != "" { + // always backup the metastore + if err := cmd.backupMetastore(); err != nil { + return err + } + err = cmd.backupRetentionPolicy() } else if cmd.database != "" { - err = cmd.backupDatabase(since) + // always backup the metastore + if err := cmd.backupMetastore(); err != nil { + return err + } + err = cmd.backupDatabase() } else { - err = cmd.backupMetastore() + // always backup the metastore + if err := cmd.backupMetastore(); err != nil { + return err + } + + cmd.StdoutLogger.Println("No database, retention policy or shard ID given. Full meta store backed up.") + if cmd.enterprise { + cmd.StdoutLogger.Println("Backing up all databases in enterprise format") + if err := cmd.backupDatabase(); err != nil { + cmd.StderrLogger.Printf("backup failed: %v", err) + return err + } + + } + + } + + if cmd.enterprise { + cmd.manifest.Platform = "OSS" + filename := cmd.enterpriseFileBase + ".manifest" + if err := cmd.manifest.Save(filepath.Join(cmd.path, filename)); err != nil { + cmd.StderrLogger.Printf("manifest save failed: %v", err) + return err + } } if err != nil { cmd.StderrLogger.Printf("backup failed: %v", err) return err } - cmd.StdoutLogger.Println("backup complete") return nil } // parseFlags parses and validates the command line arguments into a request object. 
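+// A sketch of accepted invocations (the database name and paths are illustrative):
+//
+//	influxd backup -database mydb /tmp/backup
+//	influxd backup -database mydb -since 2017-12-01T00:00:00Z /tmp/backup
+//	influxd backup -database mydb -start 2017-12-01T00:00:00Z -end 2018-01-01T00:00:00Z /tmp/export
+//
+// -since and -start/-end are mutually exclusive, -start must be earlier than -end, and
+// exactly one destination path is required; -enterprise may be combined with any of the
+// above to emit files in the Enterprise backup/restore format.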
-func (cmd *Command) parseFlags(args []string) (retentionPolicy, shardID string, since time.Time, err error) { +func (cmd *Command) parseFlags(args []string) (err error) { fs := flag.NewFlagSet("", flag.ContinueOnError) fs.StringVar(&cmd.host, "host", "localhost:8088", "") fs.StringVar(&cmd.database, "database", "", "") - fs.StringVar(&retentionPolicy, "retention", "", "") - fs.StringVar(&shardID, "shard", "", "") + fs.StringVar(&cmd.retentionPolicy, "retention", "", "") + fs.StringVar(&cmd.shardID, "shard", "", "") var sinceArg string + var startArg string + var endArg string fs.StringVar(&sinceArg, "since", "", "") + fs.StringVar(&startArg, "start", "", "") + fs.StringVar(&endArg, "end", "", "") + fs.BoolVar(&cmd.enterprise, "enterprise", false, "") fs.SetOutput(cmd.Stderr) fs.Usage = cmd.printUsage err = fs.Parse(args) if err != nil { - return + return err } + + // for enterprise saving, if needed + cmd.enterpriseFileBase = time.Now().UTC().Format(backup_util.EnterpriseFileNamePattern) + + // if startArg and endArg are unspecified, then assume we are doing a full backup of the DB + cmd.isBackup = startArg == "" && endArg == "" + if sinceArg != "" { - since, err = time.Parse(time.RFC3339, sinceArg) + cmd.since, err = time.Parse(time.RFC3339, sinceArg) if err != nil { - return + return err + } + } + if startArg != "" { + if cmd.isBackup { + return errors.New("backup command uses one of -since or -start/-end") + } + cmd.start, err = time.Parse(time.RFC3339, startArg) + if err != nil { + return err + } + } + + if endArg != "" { + if cmd.isBackup { + return errors.New("backup command uses one of -since or -start/-end") + } + cmd.end, err = time.Parse(time.RFC3339, endArg) + if err != nil { + return err + } + + // start should be < end + if !cmd.start.Before(cmd.end) { + return errors.New("start date must be before end date") } } // Ensure that only one arg is specified. 
- if fs.NArg() == 0 { - return "", "", time.Unix(0, 0), errors.New("backup destination path required") - } else if fs.NArg() != 1 { - return "", "", time.Unix(0, 0), errors.New("only one backup path allowed") + if fs.NArg() != 1 { + return errors.New("Exactly one backup path is required.") } cmd.path = fs.Arg(0) err = os.MkdirAll(cmd.path, 0700) - return + return err } -// backupShard will write a tar archive of the passed in shard with any TSM files that have been -// created since the time passed in -func (cmd *Command) backupShard(retentionPolicy string, shardID string, since time.Time) error { - id, err := strconv.ParseUint(shardID, 10, 64) +func (cmd *Command) backupShard(db, rp, sid string) error { + reqType := snapshotter.RequestShardBackup + if !cmd.isBackup { + reqType = snapshotter.RequestShardExport + } + + id, err := strconv.ParseUint(sid, 10, 64) if err != nil { return err } - shardArchivePath, err := cmd.nextPath(filepath.Join(cmd.path, fmt.Sprintf(BackupFilePattern, cmd.database, retentionPolicy, id))) + shardArchivePath, err := cmd.nextPath(filepath.Join(cmd.path, fmt.Sprintf(backup_util.BackupFilePattern, db, rp, id))) if err != nil { return err } cmd.StdoutLogger.Printf("backing up db=%v rp=%v shard=%v to %s since %s", - cmd.database, retentionPolicy, shardID, shardArchivePath, since) + db, rp, sid, shardArchivePath, cmd.since) req := &snapshotter.Request{ - Type: snapshotter.RequestShardBackup, - Database: cmd.database, - RetentionPolicy: retentionPolicy, - ShardID: id, - Since: since, + Type: reqType, + BackupDatabase: db, + BackupRetentionPolicy: rp, + ShardID: id, + Since: cmd.since, + ExportStart: cmd.start, + ExportEnd: cmd.end, } // TODO: verify shard backup data - return cmd.downloadAndVerify(req, shardArchivePath, nil) + err = cmd.downloadAndVerify(req, shardArchivePath, nil) + + if err != nil { + return err + } + + if cmd.enterprise { + f, err := os.Open(shardArchivePath) + defer f.Close() + defer os.Remove(shardArchivePath) + + filePrefix := cmd.enterpriseFileBase + ".s" + sid + filename := filePrefix + ".tar.gz" + out, err := os.OpenFile(filepath.Join(cmd.path, filename), os.O_CREATE|os.O_RDWR, 0600) + + zw := gzip.NewWriter(out) + zw.Name = filePrefix + ".tar" + + cw := backup_util.CountingWriter{Writer: zw} + + _, err = io.Copy(&cw, f) + if err != nil { + if err := zw.Close(); err != nil { + return err + } + + if err := out.Close(); err != nil { + return err + } + return err + } + + shardid, err := strconv.ParseUint(sid, 10, 64) + if err != nil { + if err := zw.Close(); err != nil { + return err + } + + if err := out.Close(); err != nil { + return err + } + return err + } + cmd.manifest.Files = append(cmd.manifest.Files, backup_util.Entry{ + Database: db, + Policy: rp, + ShardID: shardid, + FileName: filename, + Size: cw.Total, + LastModified: 0, + }) + + if err := zw.Close(); err != nil { + return err + } + + if err := out.Close(); err != nil { + return err + } + } + return nil + } // backupSeriesFile will write a tar archive of the series file for the database. 
@@ -168,20 +310,20 @@ func (cmd *Command) backupSeriesFile() error { cmd.StdoutLogger.Printf("backing up series file to %s", seriesFileArchivePath) req := &snapshotter.Request{ - Type: snapshotter.RequestSeriesFileBackup, - Database: cmd.database, + Type: snapshotter.RequestSeriesFileBackup, + BackupDatabase: cmd.database, } return cmd.downloadAndVerify(req, seriesFileArchivePath, nil) } -// backupDatabase will request the database information from the server and then backup the metastore and -// every shard in every retention policy in the database. Each shard will be written to a separate tar. -func (cmd *Command) backupDatabase(since time.Time) error { - cmd.StdoutLogger.Printf("backing up db=%s since %s", cmd.database, since) +// backupDatabase will request the database information from the server and then backup +// every shard in every retention policy in the database. Each shard will be written to a separate file. +func (cmd *Command) backupDatabase() error { + cmd.StdoutLogger.Printf("backing up db=%s", cmd.database) req := &snapshotter.Request{ - Type: snapshotter.RequestDatabaseInfo, - Database: cmd.database, + Type: snapshotter.RequestDatabaseInfo, + BackupDatabase: cmd.database, } response, err := cmd.requestInfo(req) @@ -189,18 +331,18 @@ func (cmd *Command) backupDatabase(since time.Time) error { return err } - return cmd.backupResponsePaths(response, since) + return cmd.backupResponsePaths(response) } // backupRetentionPolicy will request the retention policy information from the server and then backup -// the metastore and every shard in the retention policy. Each shard will be written to a separate tar. -func (cmd *Command) backupRetentionPolicy(retentionPolicy string, since time.Time) error { - cmd.StdoutLogger.Printf("backing up rp=%s since %s", retentionPolicy, since) +// every shard in the retention policy. Each shard will be written to a separate file. 
+func (cmd *Command) backupRetentionPolicy() error { + cmd.StdoutLogger.Printf("backing up rp=%s since %s", cmd.retentionPolicy, cmd.since) req := &snapshotter.Request{ - Type: snapshotter.RequestRetentionPolicyInfo, - Database: cmd.database, - RetentionPolicy: retentionPolicy, + Type: snapshotter.RequestRetentionPolicyInfo, + BackupDatabase: cmd.database, + BackupRetentionPolicy: cmd.retentionPolicy, } response, err := cmd.requestInfo(req) @@ -208,23 +350,22 @@ func (cmd *Command) backupRetentionPolicy(retentionPolicy string, since time.Tim return err } - return cmd.backupResponsePaths(response, since) + return cmd.backupResponsePaths(response) } -// backupResponsePaths will backup the metastore and all shard paths in the response struct -func (cmd *Command) backupResponsePaths(response *snapshotter.Response, since time.Time) error { - if err := cmd.backupMetastore(); err != nil { - return err - } +// backupResponsePaths will backup all shards identified by shard paths in the response struct +func (cmd *Command) backupResponsePaths(response *snapshotter.Response) error { // loop through the returned paths and back up each shard for _, path := range response.Paths { - rp, id, err := retentionAndShardFromPath(path) + db, rp, id, err := backup_util.DBRetentionAndShardFromPath(path) if err != nil { return err } - if err := cmd.backupShard(rp, id, since); err != nil { + err = cmd.backupShard(db, rp, id) + + if err != nil { return err } } @@ -236,10 +377,10 @@ func (cmd *Command) backupResponsePaths(response *snapshotter.Response, since ti return nil } -// backupMetastore will backup the metastore on the host to the passed in path. Database and retention policy backups -// will force a backup of the metastore as well as requesting a specific shard backup from the command line +// backupMetastore will backup the whole metastore on the host to the backup path +// if useDB is non-empty, it will backup metadata only for the named database. 
func (cmd *Command) backupMetastore() error { - metastoreArchivePath, err := cmd.nextPath(filepath.Join(cmd.path, Metafile)) + metastoreArchivePath, err := cmd.nextPath(filepath.Join(cmd.path, backup_util.Metafile)) if err != nil { return err } @@ -250,13 +391,24 @@ func (cmd *Command) backupMetastore() error { Type: snapshotter.RequestMetastoreBackup, } - return cmd.downloadAndVerify(req, metastoreArchivePath, func(file string) error { - binData, err := ioutil.ReadFile(file) + err = cmd.downloadAndVerify(req, metastoreArchivePath, func(file string) error { + f, err := os.Open(file) + if err != nil { + return err + } + defer f.Close() + + var magicByte [8]byte + n, err := io.ReadFull(f, magicByte[:]) if err != nil { return err } - magic := binary.BigEndian.Uint64(binData[:8]) + if n < 8 { + return errors.New("Not enough bytes data to verify") + } + + magic := binary.BigEndian.Uint64(magicByte[:]) if magic != snapshotter.BackupMagicHeader { cmd.StderrLogger.Println("Invalid metadata blob, ensure the metadata service is running (default port 8088)") return errors.New("invalid metadata received") @@ -264,6 +416,28 @@ func (cmd *Command) backupMetastore() error { return nil }) + + if err != nil { + return err + } + + if cmd.enterprise { + metaBytes, err := backup_util.GetMetaBytes(metastoreArchivePath) + defer os.Remove(metastoreArchivePath) + if err != nil { + return err + } + filename := cmd.enterpriseFileBase + ".meta" + if err := ioutil.WriteFile(filepath.Join(cmd.path, filename), metaBytes, 0644); err != nil { + fmt.Fprintln(cmd.Stdout, "Error.") + return err + } + + cmd.manifest.Meta.FileName = filename + cmd.manifest.Meta.Size = int64(len(metaBytes)) + } + + return nil } // nextPath returns the next file to write to. @@ -282,8 +456,7 @@ func (cmd *Command) nextPath(path string) (string, error) { // downloadAndVerify will download either the metastore or shard to a temp file and then // rename it to a good backup file name after complete func (cmd *Command) downloadAndVerify(req *snapshotter.Request, path string, validator func(string) error) error { - - tmppath := path + Suffix + tmppath := path + backup_util.Suffix if err := cmd.download(req, tmppath); err != nil { return err } @@ -333,6 +506,11 @@ func (cmd *Command) download(req *snapshotter.Request, path string) error { } defer conn.Close() + _, err = conn.Write([]byte{byte(req.Type)}) + if err != nil { + return err + } + // Write the request if err := json.NewEncoder(conn).Encode(req); err != nil { return fmt.Errorf("encode snapshot request: %s", err) @@ -357,11 +535,16 @@ func (cmd *Command) download(req *snapshotter.Request, path string) error { // requestInfo will request the database or retention policy information from the host func (cmd *Command) requestInfo(request *snapshotter.Request) (*snapshotter.Response, error) { // Connect to snapshotter service. 
+ var r snapshotter.Response conn, err := tcp.Dial("tcp", cmd.host, snapshotter.MuxHeader) if err != nil { return nil, err } defer conn.Close() + _, err = conn.Write([]byte{byte(request.Type)}) + if err != nil { + return &r, err + } // Write the request if err := json.NewEncoder(conn).Encode(request); err != nil { @@ -369,7 +552,7 @@ func (cmd *Command) requestInfo(request *snapshotter.Request) (*snapshotter.Resp } // Read the response - var r snapshotter.Response + if err := json.NewDecoder(conn).Decode(&r); err != nil { return nil, err } @@ -379,7 +562,7 @@ func (cmd *Command) requestInfo(request *snapshotter.Request) (*snapshotter.Resp // printUsage prints the usage message to STDERR. func (cmd *Command) printUsage() { - fmt.Fprintf(cmd.Stdout, `Downloads a snapshot of a data node and saves it to disk. + fmt.Fprintf(cmd.Stdout, `Downloads a file level age-based snapshot of a data node and saves it to disk. Usage: influxd backup [flags] PATH @@ -393,18 +576,14 @@ Usage: influxd backup [flags] PATH Optional. The shard id to backup. If specified, retention is required. -since <2015-12-24T08:12:23Z> Optional. Do an incremental backup since the passed in RFC3339 - formatted time. + formatted time. Not compatible with -start or -end. + -start <2015-12-24T08:12:23Z> + All points earlier than this time stamp will be excluded from the export. Not compatible with -since. + -end <2015-12-24T08:12:23Z> + All points later than this time stamp will be excluded from the export. Not compatible with -since. + -enterprise + Generate backup files in the format used for influxdb enterprise. `) -} -// retentionAndShardFromPath will take the shard relative path and split it into the -// retention policy name and shard ID. The first part of the path should be the database name. -func retentionAndShardFromPath(path string) (retention, shard string, err error) { - a := strings.Split(path, string(filepath.Separator)) - if len(a) != 3 { - return "", "", fmt.Errorf("expected database, retention policy, and shard id in path: %s", path) - } - - return a[1], a[2], nil } diff --git a/cmd/influxd/backup_util/backup_util.go b/cmd/influxd/backup_util/backup_util.go new file mode 100644 index 0000000000..9c2a36d43a --- /dev/null +++ b/cmd/influxd/backup_util/backup_util.go @@ -0,0 +1,146 @@ +package backup_util + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + "os" + "strings" + + "encoding/json" + "github.com/influxdata/influxdb/services/snapshotter" + "io/ioutil" + "path/filepath" +) + +const ( + // Suffix is a suffix added to the backup while it's in-process. + Suffix = ".pending" + + // Metafile is the base name given to the metastore backups. + Metafile = "meta" + + // BackupFilePattern is the beginning of the pattern for a backup + // file. They follow the scheme ... 
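+ // For example, database "telegraf", retention policy "autogen" and shard ID 2
+ // would produce the prefix "telegraf.autogen.00002" (names here are illustrative).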
+ BackupFilePattern = "%s.%s.%05d" + + EnterpriseFileNamePattern = "20060102T150405Z" + + OSSManifest = "OSS" + + ENTManifest = "ENT" +) + +func GetMetaBytes(fname string) ([]byte, error) { + f, err := os.Open(fname) + if err != nil { + return []byte{}, err + } + + var buf bytes.Buffer + if _, err := io.Copy(&buf, f); err != nil { + return []byte{}, fmt.Errorf("copy: %s", err) + } + + b := buf.Bytes() + var i int + + // Make sure the file is actually a meta store backup file + magic := binary.BigEndian.Uint64(b[:8]) + if magic != snapshotter.BackupMagicHeader { + return []byte{}, fmt.Errorf("invalid metadata file") + } + i += 8 + + // Size of the meta store bytes + length := int(binary.BigEndian.Uint64(b[i : i+8])) + i += 8 + metaBytes := b[i : i+length] + + return metaBytes, nil +} + +// Manifest lists the meta and shard file information contained in the backup. +// If Limited is false, the manifest contains a full backup, otherwise +// it is a partial backup. +type Manifest struct { + Platform string `json:"platform"` + Meta MetaEntry `json:"meta"` + Limited bool `json:"limited"` + Files []Entry `json:"files"` + + // If limited is true, then one (or all) of the following fields will be set + + Database string `json:"database,omitempty"` + Policy string `json:"policy,omitempty"` + ShardID uint64 `json:"shard_id,omitempty"` +} + +// Entry contains the data information for a backed up shard. +type Entry struct { + Database string `json:"database"` + Policy string `json:"policy"` + ShardID uint64 `json:"shardID"` + FileName string `json:"fileName"` + Size int64 `json:"size"` + LastModified int64 `json:"lastModified"` +} + +func (e *Entry) SizeOrZero() int64 { + if e == nil { + return 0 + } + return e.Size +} + +// MetaEntry contains the meta store information for a backup. +type MetaEntry struct { + FileName string `json:"fileName"` + Size int64 `json:"size"` +} + +// Size returns the size of the manifest. +func (m *Manifest) Size() int64 { + if m == nil { + return 0 + } + + size := m.Meta.Size + + for _, f := range m.Files { + size += f.Size + } + return size +} + +func (manifest *Manifest) Save(filename string) error { + b, err := json.MarshalIndent(manifest, "", " ") + if err != nil { + return fmt.Errorf("create manifest: %v", err) + } + + return ioutil.WriteFile(filename, b, 0600) +} + +type CountingWriter struct { + io.Writer + Total int64 // Total # of bytes transferred +} + +func (w *CountingWriter) Write(p []byte) (n int, err error) { + n, err = w.Writer.Write(p) + w.Total += int64(n) + return +} + +// retentionAndShardFromPath will take the shard relative path and split it into the +// retention policy name and shard ID. The first part of the path should be the database name. 
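+// For example (illustrative), the relative shard path "mydb/autogen/10" yields
+// db="mydb", retention="autogen", shard="10"; a path that does not have exactly
+// three elements returns an error.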
+func DBRetentionAndShardFromPath(path string) (db, retention, shard string, err error) { + a := strings.Split(path, string(filepath.Separator)) + if len(a) != 3 { + return "", "", "", fmt.Errorf("expected database, retention policy, and shard id in path: %s", path) + } + + return a[0], a[1], a[2], nil +} diff --git a/cmd/influxd/restore/restore.go b/cmd/influxd/restore/restore.go index b4e1446cfc..ac3b872caa 100644 --- a/cmd/influxd/restore/restore.go +++ b/cmd/influxd/restore/restore.go @@ -14,7 +14,7 @@ import ( "path/filepath" "strconv" - "github.com/influxdata/influxdb/cmd/influxd/backup" + "github.com/influxdata/influxdb/cmd/influxd/backup_util" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/services/snapshotter" "github.com/influxdata/influxdb/tsdb" @@ -157,7 +157,7 @@ func (cmd *Command) unpackSeriesFile() error { // cluster and replaces the root metadata. func (cmd *Command) unpackMeta() error { // find the meta file - metaFiles, err := filepath.Glob(filepath.Join(cmd.backupFilesPath, backup.Metafile+".*")) + metaFiles, err := filepath.Glob(filepath.Join(cmd.backupFilesPath, backup_util.Metafile+".*")) if err != nil { return err } @@ -268,7 +268,7 @@ func (cmd *Command) unpackShard(shardID string) error { } // find the shard backup files - pat := filepath.Join(cmd.backupFilesPath, fmt.Sprintf(backup.BackupFilePattern, cmd.database, cmd.retention, id)) + pat := filepath.Join(cmd.backupFilesPath, fmt.Sprintf(backup_util.BackupFilePattern, cmd.database, cmd.retention, id)) return cmd.unpackFiles(pat + ".*") } diff --git a/etc/config.sample.toml b/etc/config.sample.toml index c56eb1ca63..5f9a4e8f78 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -79,7 +79,7 @@ # snapshot the cache and write it to a TSM file, freeing up memory # Valid size suffixes are k, m, or g (case insensitive, 1024 = 1k). # Values without a size suffix are in bytes. 
- # cache-snapshot-memory-size = "25m" + # cache-snapshot-memory-size = "256m" # CacheSnapshotWriteColdDuration is the length of time at # which the engine will snapshot the cache and write it to diff --git a/internal/tsdb_store.go b/internal/tsdb_store.go index 119c6e7c06..e2f27ca1a8 100644 --- a/internal/tsdb_store.go +++ b/internal/tsdb_store.go @@ -15,6 +15,7 @@ import ( type TSDBStoreMock struct { BackupShardFn func(id uint64, since time.Time, w io.Writer) error BackupSeriesFileFn func(database string, w io.Writer) error + ExportShardFn func(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error CloseFn func() error CreateShardFn func(database, policy string, shardID uint64, enabled bool) error CreateShardSnapshotFn func(id uint64) (string, error) @@ -54,6 +55,9 @@ func (s *TSDBStoreMock) BackupShard(id uint64, since time.Time, w io.Writer) err func (s *TSDBStoreMock) BackupSeriesFile(database string, w io.Writer) error { return s.BackupSeriesFileFn(database, w) } +func (s *TSDBStoreMock) ExportShard(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error { + return s.ExportShardFn(id, ExportStart, ExportEnd, w) +} func (s *TSDBStoreMock) Close() error { return s.CloseFn() } func (s *TSDBStoreMock) CreateShard(database string, retentionPolicy string, shardID uint64, enabled bool) error { return s.CreateShardFn(database, retentionPolicy, shardID, enabled) diff --git a/man/influxd-backup.txt b/man/influxd-backup.txt index d7177ba3b3..188a2b09ab 100644 --- a/man/influxd-backup.txt +++ b/man/influxd-backup.txt @@ -5,6 +5,7 @@ NAME ---- influxd-backup - Downloads a snapshot of a data node and saves it to disk + SYNOPSIS -------- 'influxd backup' [options] @@ -30,6 +31,13 @@ OPTIONS -since <2015-12-24T08:12:13Z>:: Do an incremental backup since the passed in time. The time needs to be in the RFC3339 format. Optional. +-start <2015-12-24T08:12:23Z>:: + All points earlier than this time stamp will be excluded from the export. Not compatible with -since. +-end <2015-12-24T08:12:23Z>:: + All points later than this time stamp will be excluded from the export. Not compatible with -since. +-enterprise:: + Generate backup files in the format used for influxdb enterprise. 
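+
+EXAMPLES
+--------
+An illustrative incremental backup written in the Enterprise-compatible format (the
+database name and destination path are examples only):
+
+  influxd backup -database mydb -since 2015-12-24T08:12:23Z -enterprise /tmp/mydb-backup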
+ SEE ALSO -------- *influxd-restore*(1) diff --git a/services/snapshotter/client.go b/services/snapshotter/client.go index ac6c87411b..2c1e1b8ada 100644 --- a/services/snapshotter/client.go +++ b/services/snapshotter/client.go @@ -64,6 +64,10 @@ func (c *Client) doRequest(req *Request) ([]byte, error) { defer conn.Close() // Write the request + _, err = conn.Write([]byte{byte(req.Type)}) + if err != nil { + return nil, err + } if err := json.NewEncoder(conn).Encode(req); err != nil { return nil, fmt.Errorf("encode snapshot request: %s", err) } diff --git a/services/snapshotter/client_test.go b/services/snapshotter/client_test.go index ee5c5f2a73..b4427ee16d 100644 --- a/services/snapshotter/client_test.go +++ b/services/snapshotter/client_test.go @@ -58,6 +58,12 @@ func TestClient_MetastoreBackup_InvalidMetadata(t *testing.T) { return } + var typ [1]byte + if _, err := conn.Read(typ[:]); err != nil { + t.Errorf("unable to read typ header: %s", err) + return + } + var m map[string]interface{} dec := json.NewDecoder(conn) if err := dec.Decode(&m); err != nil { diff --git a/services/snapshotter/service.go b/services/snapshotter/service.go index fa02218f92..452beb5b94 100644 --- a/services/snapshotter/service.go +++ b/services/snapshotter/service.go @@ -42,6 +42,7 @@ type Service struct { TSDBStore interface { BackupShard(id uint64, since time.Time, w io.Writer) error BackupSeriesFile(database string, w io.Writer) error + ExportShard(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error Shard(id uint64) *tsdb.Shard ShardRelativePath(id uint64) (string, error) } @@ -69,7 +70,9 @@ func (s *Service) Open() error { // Close implements the Service interface. func (s *Service) Close() error { if s.Listener != nil { - s.Listener.Close() + if err := s.Listener.Close(); err != nil { + return err + } } s.wg.Wait() return nil @@ -87,6 +90,7 @@ func (s *Service) serve() { for { // Wait for next connection. conn, err := s.Listener.Accept() + if err != nil && strings.Contains(err.Error(), "connection closed") { s.Logger.Info("snapshot listener closed") return @@ -109,28 +113,39 @@ func (s *Service) serve() { // handleConn processes conn. This is run in a separate goroutine. 
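+// The connection now begins with a single request-type byte followed by the
+// JSON-encoded Request, so a minimal client-side sketch (error handling omitted;
+// see Client.doRequest in client.go for the real sequence) is:
+//
+//	conn.Write([]byte{byte(req.Type)})   // one-byte request type first
+//	json.NewEncoder(conn).Encode(req)    // then the JSON request body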
func (s *Service) handleConn(conn net.Conn) error { + var typ [1]byte + + _, err := conn.Read(typ[:]) + if err != nil { + return err + } + r, err := s.readRequest(conn) if err != nil { return fmt.Errorf("read request: %s", err) } - switch r.Type { + switch RequestType(typ[0]) { case RequestShardBackup: if err := s.TSDBStore.BackupShard(r.ShardID, r.Since, conn); err != nil { return err } + case RequestShardExport: + if err := s.TSDBStore.ExportShard(r.ShardID, r.ExportStart, r.ExportEnd, conn); err != nil { + return err + } case RequestMetastoreBackup: if err := s.writeMetaStore(conn); err != nil { return err } case RequestSeriesFileBackup: - if err := s.TSDBStore.BackupSeriesFile(r.Database, conn); err != nil { + if err := s.TSDBStore.BackupSeriesFile(r.BackupDatabase, conn); err != nil { return err } case RequestDatabaseInfo: - return s.writeDatabaseInfo(conn, r.Database) + return s.writeDatabaseInfo(conn, r.BackupDatabase) case RequestRetentionPolicyInfo: - return s.writeRetentionPolicyInfo(conn, r.Database, r.RetentionPolicy) + return s.writeRetentionPolicyInfo(conn, r.BackupDatabase, r.BackupRetentionPolicy) default: return fmt.Errorf("request type unknown: %v", r.Type) } @@ -141,6 +156,7 @@ func (s *Service) handleConn(conn net.Conn) error { func (s *Service) writeMetaStore(conn net.Conn) error { // Retrieve and serialize the current meta data. metaBlob, err := s.MetaClient.MarshalBinary() + if err != nil { return fmt.Errorf("marshal meta: %s", err) } @@ -179,29 +195,37 @@ func (s *Service) writeMetaStore(conn net.Conn) error { // this server into the connection. func (s *Service) writeDatabaseInfo(conn net.Conn, database string) error { res := Response{} - db := s.MetaClient.Database(database) - if db == nil { - return influxdb.ErrDatabaseNotFound(database) + dbs := []meta.DatabaseInfo{} + if database != "" { + db := s.MetaClient.Database(database) + if db == nil { + return influxdb.ErrDatabaseNotFound(database) + } + dbs = append(dbs, *db) + } else { + // we'll allow collecting info on all databases + dbs = s.MetaClient.(*meta.Client).Databases() } - for _, rp := range db.RetentionPolicies { - for _, sg := range rp.ShardGroups { - for _, sh := range sg.Shards { - // ignore if the shard isn't on the server - if s.TSDBStore.Shard(sh.ID) == nil { - continue - } + for _, db := range dbs { + for _, rp := range db.RetentionPolicies { + for _, sg := range rp.ShardGroups { + for _, sh := range sg.Shards { + // ignore if the shard isn't on the server + if s.TSDBStore.Shard(sh.ID) == nil { + continue + } - path, err := s.TSDBStore.ShardRelativePath(sh.ID) - if err != nil { - return err - } + path, err := s.TSDBStore.ShardRelativePath(sh.ID) + if err != nil { + return err + } - res.Paths = append(res.Paths, path) + res.Paths = append(res.Paths, path) + } } } } - if err := json.NewEncoder(conn).Encode(res); err != nil { return fmt.Errorf("encode response: %s", err.Error()) } @@ -281,16 +305,33 @@ const ( // RequestRetentionPolicyInfo represents a request for retention policy info. RequestRetentionPolicyInfo + + // RequestShardExport represents a request to export Shard data. Similar to a backup, but shards + // may be filtered based on the start/end times on each block. + RequestShardExport + + // RequestMetaStoreUpdate represents a request to upload a metafile that will be used to do a live update + // to the existing metastore. + RequestMetaStoreUpdate + + // RequestShardUpdate will initiate the upload of a shard data tar file + // and have the engine import the data. 
+ RequestShardUpdate ) // Request represents a request for a specific backup or for information // about the shards on this server for a database or retention policy. type Request struct { - Type RequestType - Database string - RetentionPolicy string - ShardID uint64 - Since time.Time + Type RequestType + BackupDatabase string + RestoreDatabase string + BackupRetentionPolicy string + RestoreRetentionPolicy string + ShardID uint64 + Since time.Time + ExportStart time.Time + ExportEnd time.Time + UploadSize int64 } // Response contains the relative paths for all the shards on this server diff --git a/services/snapshotter/service_test.go b/services/snapshotter/service_test.go index ab23d71780..203807f7e5 100644 --- a/services/snapshotter/service_test.go +++ b/services/snapshotter/service_test.go @@ -132,6 +132,10 @@ func TestSnapshotter_RequestShardBackup(t *testing.T) { Since: time.Unix(0, 0), } conn.Write([]byte{snapshotter.MuxHeader}) + _, err = conn.Write([]byte{byte(req.Type)}) + if err != nil { + t.Errorf("could not encode request type to conn: %v", err) + } enc := json.NewEncoder(conn) if err := enc.Encode(&req); err != nil { t.Errorf("unable to encode request: %s", err) @@ -222,10 +226,14 @@ func TestSnapshotter_RequestDatabaseInfo(t *testing.T) { defer conn.Close() req := snapshotter.Request{ - Type: snapshotter.RequestDatabaseInfo, - Database: "db0", + Type: snapshotter.RequestDatabaseInfo, + BackupDatabase: "db0", } conn.Write([]byte{snapshotter.MuxHeader}) + _, err = conn.Write([]byte{byte(req.Type)}) + if err != nil { + t.Errorf("could not encode request type to conn: %v", err) + } enc := json.NewEncoder(conn) if err := enc.Encode(&req); err != nil { t.Errorf("unable to encode request: %s", err) @@ -272,10 +280,14 @@ func TestSnapshotter_RequestDatabaseInfo_ErrDatabaseNotFound(t *testing.T) { defer conn.Close() req := snapshotter.Request{ - Type: snapshotter.RequestDatabaseInfo, - Database: "doesnotexist", + Type: snapshotter.RequestDatabaseInfo, + BackupDatabase: "doesnotexist", } conn.Write([]byte{snapshotter.MuxHeader}) + _, err = conn.Write([]byte{byte(req.Type)}) + if err != nil { + t.Errorf("could not encode request type to conn: %v", err) + } enc := json.NewEncoder(conn) if err := enc.Encode(&req); err != nil { t.Errorf("unable to encode request: %s", err) @@ -332,11 +344,15 @@ func TestSnapshotter_RequestRetentionPolicyInfo(t *testing.T) { defer conn.Close() req := snapshotter.Request{ - Type: snapshotter.RequestRetentionPolicyInfo, - Database: "db0", - RetentionPolicy: "rp0", + Type: snapshotter.RequestRetentionPolicyInfo, + BackupDatabase: "db0", + BackupRetentionPolicy: "rp0", } conn.Write([]byte{snapshotter.MuxHeader}) + _, err = conn.Write([]byte{byte(req.Type)}) + if err != nil { + t.Errorf("could not encode request type to conn: %v", err) + } enc := json.NewEncoder(conn) if err := enc.Encode(&req); err != nil { t.Errorf("unable to encode request: %s", err) diff --git a/tsdb/config.go b/tsdb/config.go index 6ab91feaad..5041a44cae 100644 --- a/tsdb/config.go +++ b/tsdb/config.go @@ -26,7 +26,7 @@ const ( // DefaultCacheSnapshotMemorySize is the size at which the engine will // snapshot the cache and write it to a TSM file, freeing up memory - DefaultCacheSnapshotMemorySize = 25 * 1024 * 1024 // 25MB + DefaultCacheSnapshotMemorySize = 256 * 1024 * 1024 // 256MB // DefaultCacheSnapshotWriteColdDuration is the length of time at which // the engine will snapshot the cache and write it to a new TSM file if diff --git a/tsdb/engine.go b/tsdb/engine.go index 
2028110683..e0fe2d02fc 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -42,6 +42,7 @@ type Engine interface { CreateSnapshot() (string, error) Backup(w io.Writer, basePath string, since time.Time) error + Export(w io.Writer, basePath string, start time.Time, end time.Time) error Restore(r io.Reader, basePath string) error Import(r io.Reader, basePath string) error diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index acc888bc2c..57d938a993 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -263,16 +263,9 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup { minGenerations = level + 1 } - // Each compaction group should run against 4 generations. For level 1, since these - // can get created much more quickly, bump the grouping to 8 to keep file counts lower. - groupSize := 4 - if level == 1 { - groupSize = 8 - } - var cGroups []CompactionGroup for _, group := range levelGroups { - for _, chunk := range group.chunk(groupSize) { + for _, chunk := range group.chunk(4) { var cGroup CompactionGroup var hasTombstones bool for _, gen := range chunk { @@ -1027,16 +1020,28 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([ } func (c *Compactor) write(path string, iter KeyIterator) (err error) { - fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666) + fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666) if err != nil { return errCompactionInProgress{err: err} } // Create the write for the new TSM file. - w, err := NewTSMWriter(fd) - if err != nil { - return err + var w TSMWriter + + // Use a disk based TSM buffer if it looks like we might create a big index + // in memory. + if iter.EstimatedIndexSize() > 64*1024*1024 { + w, err = NewTSMWriterWithDiskBuffer(fd) + if err != nil { + return err + } + } else { + w, err = NewTSMWriter(fd) + if err != nil { + return err + } } + defer func() { closeErr := w.Close() if err == nil { @@ -1145,6 +1150,10 @@ type KeyIterator interface { // Err returns any errors encountered during iteration. Err() error + + // EstimatedIndexSize returns the estimated size of the index that would + // be required to store all the series and entries in the KeyIterator. + EstimatedIndexSize() int } // tsmKeyIterator implements the KeyIterator for set of TSMReaders. Iteration produces @@ -1278,6 +1287,14 @@ func (k *tsmKeyIterator) hasMergedValues() bool { len(k.mergedBooleanValues) > 0 } +func (k *tsmKeyIterator) EstimatedIndexSize() int { + var size uint32 + for _, r := range k.readers { + size += r.IndexSize() + } + return int(size) / len(k.readers) +} + // Next returns true if there are any values remaining in the iterator. func (k *tsmKeyIterator) Next() bool { RETRY: @@ -1516,6 +1533,11 @@ func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIte return cki } +func (c *cacheKeyIterator) EstimatedIndexSize() int { + // We return 0 here since we already have all the entries in memory to write an index. 
+ return 0 +} + func (c *cacheKeyIterator) encode() { concurrency := runtime.GOMAXPROCS(0) n := len(c.ready) diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index e3608f9837..69e5b0c392 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -1792,14 +1792,6 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) { Path: "08-01.tsm1", Size: 1 * 1024 * 1024, }, - tsm1.FileStat{ - Path: "09-01.tsm1", - Size: 1 * 1024 * 1024, - }, - tsm1.FileStat{ - Path: "10-01.tsm1", - Size: 1 * 1024 * 1024, - }, } cp := tsm1.NewDefaultPlanner( @@ -1810,8 +1802,8 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) { }, tsdb.DefaultCompactFullWriteColdDuration, ) - expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]} - expFiles2 := []tsm1.FileStat{data[8], data[9]} + expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]} + expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]} tsm := cp.PlanLevel(1) if exp, got := len(expFiles1), len(tsm[0]); got != exp { @@ -1887,8 +1879,8 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) { }, tsdb.DefaultCompactFullWriteColdDuration, ) - expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]} - expFiles2 := []tsm1.FileStat{data[8], data[9]} + expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]} + expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]} tsm := cp.PlanLevel(1) if exp, got := len(expFiles1), len(tsm[0]); got != exp { diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index ff9e8e6788..bf7b990036 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -747,6 +747,142 @@ func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error { return nil } +func (e *Engine) Export(w io.Writer, basePath string, start time.Time, end time.Time) error { + path, err := e.CreateSnapshot() + if err != nil { + return err + } + + // Remove the temporary snapshot dir + defer os.RemoveAll(path) + if err := e.index.SnapshotTo(path); err != nil { + return err + } + + tw := tar.NewWriter(w) + defer tw.Close() + + // Recursively read all files from path. + files, err := readDir(path, "") + if err != nil { + return err + } + + for _, file := range files { + if !strings.HasSuffix(file, ".tsm") { + if err := e.writeFileToBackup(file, basePath, filepath.Join(path, file), tw); err != nil { + return err + } + } + + var tombstonePath string + f, err := os.Open(filepath.Join(path, file)) + if err != nil { + return err + } + r, err := NewTSMReader(f) + if err != nil { + return err + } + + // Grab the tombstone file if one exists. + if r.HasTombstones() { + tombstonePath = filepath.Base(r.TombstoneFiles()[0].Path) + } + + min, max := r.TimeRange() + stun := start.UnixNano() + eun := end.UnixNano() + + // We overlap time ranges, we need to filter the file + if min >= stun && min <= eun && max > eun || // overlap to the right + max >= stun && max <= eun && min < stun || // overlap to the left + min <= stun && max >= eun { // TSM file has a range LARGER than the boundary + err := e.filterFileToBackup(r, file, basePath, filepath.Join(path, file), start.UnixNano(), end.UnixNano(), tw) + if err != nil { + if err := r.Close(); err != nil { + return err + } + return err + } + + } + + // above is the only case where we need to keep the reader open. 
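+ // To recap (explanatory note): a file reaches filterFileToBackup only when its
+ // [min,max] time range partially overlaps or fully contains the requested
+ // [start,end] window; a file that lies entirely inside the window is copied
+ // verbatim below, and a file wholly outside the window is skipped.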
+ if err := r.Close(); err != nil { + return err + } + + // the TSM file is 100% inside the range, so we can just write it without scanning each block + if min >= start.UnixNano() && max <= end.UnixNano() { + if err := e.writeFileToBackup(file, basePath, filepath.Join(path, file), tw); err != nil { + return err + } + } + + // if this TSM file had a tombstone we'll write out the whole thing too. + if tombstonePath != "" { + if err := e.writeFileToBackup(tombstonePath, basePath, filepath.Join(path, tombstonePath), tw); err != nil { + return err + } + } + + } + + return nil +} + +func (e *Engine) filterFileToBackup(r *TSMReader, name, shardRelativePath, fullPath string, start, end int64, tw *tar.Writer) error { + path := fullPath + ".tmp" + out, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + return err + } + defer os.Remove(path) + + w, err := NewTSMWriter(out) + if err != nil { + return err + } + defer w.Close() + + // implicit else: here we iterate over the blocks and only keep the ones we really want. + var bi *BlockIterator + bi = r.BlockIterator() + + for bi.Next() { + // not concerned with typ or checksum since we are just blindly writing back, with no decoding + key, minTime, maxTime, _, _, buf, err := bi.Read() + if err != nil { + return err + } + if minTime >= start && minTime <= end || + maxTime >= start && maxTime <= end || + minTime <= start && maxTime >= end { + err := w.WriteBlock(key, minTime, maxTime, buf) + if err != nil { + return err + } + } + } + + if err := bi.Err(); err != nil { + return err + } + + err = w.WriteIndex() + if err != nil { + return err + } + + // make sure the whole file is out to disk + if err := w.Flush(); err != nil { + return err + } + + return e.writeFileToBackup(name, shardRelativePath, path, tw) +} + // writeFileToBackup copies the file into the tar archive. Files will use the shardRelativePath // in their names. This should be the // part of the path. func (e *Engine) writeFileToBackup(name string, shardRelativePath, fullPath string, tw *tar.Writer) error { @@ -1471,7 +1607,7 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool { } func (e *Engine) compact(quit <-chan struct{}) { - t := time.NewTicker(time.Second) + t := time.NewTicker(5 * time.Second) defer t.Stop() for { diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 120b215b4d..54a3dfbeb2 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -256,6 +256,268 @@ func TestEngine_Backup(t *testing.T) { } } +func TestEngine_Export(t *testing.T) { + // Generate temporary file. + f, _ := ioutil.TempFile("", "tsm") + f.Close() + os.Remove(f.Name()) + walPath := filepath.Join(f.Name(), "wal") + os.MkdirAll(walPath, 0777) + defer os.RemoveAll(f.Name()) + + // Create a few points. + p1 := MustParsePointString("cpu,host=A value=1.1 1000000000") + p2 := MustParsePointString("cpu,host=B value=1.2 2000000000") + p3 := MustParsePointString("cpu,host=C value=1.3 3000000000") + + // Write those points to the engine. 
+ db := path.Base(f.Name()) + opt := tsdb.NewEngineOptions() + opt.InmemIndex = inmem.NewIndex(db) + idx := tsdb.MustOpenIndex(1, db, filepath.Join(f.Name(), "index"), opt) + defer idx.Close() + + e := tsm1.NewEngine(1, idx, db, f.Name(), walPath, opt).(*tsm1.Engine) + + // mock the planner so compactions don't run during the test + e.CompactionPlan = &mockPlanner{} + + if err := e.Open(); err != nil { + t.Fatalf("failed to open tsm1 engine: %s", err.Error()) + } + + if err := e.WritePoints([]models.Point{p1}); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + if err := e.WriteSnapshot(); err != nil { + t.Fatalf("failed to snapshot: %s", err.Error()) + } + + if err := e.WritePoints([]models.Point{p2}); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + if err := e.WriteSnapshot(); err != nil { + t.Fatalf("failed to snapshot: %s", err.Error()) + } + + if err := e.WritePoints([]models.Point{p3}); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + + // export the whole DB + var exBuf bytes.Buffer + if err := e.Export(&exBuf, "", time.Unix(0, 0), time.Unix(0, 4000000000)); err != nil { + t.Fatalf("failed to export: %s", err.Error()) + } + + var bkBuf bytes.Buffer + if err := e.Backup(&bkBuf, "", time.Unix(0, 0)); err != nil { + t.Fatalf("failed to backup: %s", err.Error()) + } + + if len(e.FileStore.Files()) != 3 { + t.Fatalf("file count wrong: exp: %d, got: %d", 3, len(e.FileStore.Files())) + } + + fileNames := map[string]bool{} + for _, f := range e.FileStore.Files() { + fileNames[filepath.Base(f.Path())] = true + } + + fileData, err := getExportData(&exBuf) + if err != nil { + t.Errorf("Error extracting data from export: %s", err.Error()) + } + + // TEST 1: did we get any extra files not found in the store? + for k, _ := range fileData { + if _, ok := fileNames[k]; !ok { + t.Errorf("exported a file not in the store: %s", k) + } + } + + // TEST 2: did we miss any files that the store had? + for k, _ := range fileNames { + if _, ok := fileData[k]; !ok { + t.Errorf("failed to export a file from the store: %s", k) + } + } + + // TEST 3: Does 'backup' get the same files + bits? + tr := tar.NewReader(&bkBuf) + + th, err := tr.Next() + for err == nil { + expData, ok := fileData[th.Name] + if !ok { + t.Errorf("Extra file in backup: %q", th.Name) + continue + } + + buf := new(bytes.Buffer) + if _, err := io.Copy(buf, tr); err != nil { + t.Fatal(err) + } + + if !equalBuffers(expData, buf) { + t.Errorf("2Difference in data between backup and Export for file %s", th.Name) + } + + th, err = tr.Next() + } + + if t.Failed() { + t.FailNow() + } + + // TEST 4: Are subsets (1), (2), (3), (1,2), (2,3) accurately found in the larger export? 
+ // export the whole DB + var ex1 bytes.Buffer + if err := e.Export(&ex1, "", time.Unix(0, 0), time.Unix(0, 1000000000)); err != nil { + t.Fatalf("failed to export: %s", err.Error()) + } + ex1Data, err := getExportData(&ex1) + if err != nil { + t.Errorf("Error extracting data from export: %s", err.Error()) + } + + for k, v := range ex1Data { + fullExp, ok := fileData[k] + if !ok { + t.Errorf("Extracting subset resulted in file not found in full export: %s", err.Error()) + continue + } + if !equalBuffers(fullExp, v) { + t.Errorf("2Difference in data between backup and Export for file %s", th.Name) + } + + } + + var ex2 bytes.Buffer + if err := e.Export(&ex2, "", time.Unix(0, 1000000001), time.Unix(0, 2000000000)); err != nil { + t.Fatalf("failed to export: %s", err.Error()) + } + + ex2Data, err := getExportData(&ex2) + if err != nil { + t.Errorf("Error extracting data from export: %s", err.Error()) + } + + for k, v := range ex2Data { + fullExp, ok := fileData[k] + if !ok { + t.Errorf("Extracting subset resulted in file not found in full export: %s", err.Error()) + continue + } + if !equalBuffers(fullExp, v) { + t.Errorf("2Difference in data between backup and Export for file %s", th.Name) + } + + } + + var ex3 bytes.Buffer + if err := e.Export(&ex3, "", time.Unix(0, 2000000001), time.Unix(0, 3000000000)); err != nil { + t.Fatalf("failed to export: %s", err.Error()) + } + + ex3Data, err := getExportData(&ex3) + if err != nil { + t.Errorf("Error extracting data from export: %s", err.Error()) + } + + for k, v := range ex3Data { + fullExp, ok := fileData[k] + if !ok { + t.Errorf("Extracting subset resulted in file not found in full export: %s", err.Error()) + continue + } + if !equalBuffers(fullExp, v) { + t.Errorf("2Difference in data between backup and Export for file %s", th.Name) + } + + } + + var ex12 bytes.Buffer + if err := e.Export(&ex12, "", time.Unix(0, 0), time.Unix(0, 2000000000)); err != nil { + t.Fatalf("failed to export: %s", err.Error()) + } + + ex12Data, err := getExportData(&ex12) + if err != nil { + t.Errorf("Error extracting data from export: %s", err.Error()) + } + + for k, v := range ex12Data { + fullExp, ok := fileData[k] + if !ok { + t.Errorf("Extracting subset resulted in file not found in full export: %s", err.Error()) + continue + } + if !equalBuffers(fullExp, v) { + t.Errorf("2Difference in data between backup and Export for file %s", th.Name) + } + + } + + var ex23 bytes.Buffer + if err := e.Export(&ex23, "", time.Unix(0, 1000000001), time.Unix(0, 3000000000)); err != nil { + t.Fatalf("failed to export: %s", err.Error()) + } + + ex23Data, err := getExportData(&ex23) + if err != nil { + t.Errorf("Error extracting data from export: %s", err.Error()) + } + + for k, v := range ex23Data { + fullExp, ok := fileData[k] + if !ok { + t.Errorf("Extracting subset resulted in file not found in full export: %s", err.Error()) + continue + } + if !equalBuffers(fullExp, v) { + t.Errorf("2Difference in data between backup and Export for file %s", th.Name) + } + + } +} + +func equalBuffers(bufA, bufB *bytes.Buffer) bool { + for i, v := range bufA.Bytes() { + if v != bufB.Bytes()[i] { + return false + } + } + return true +} + +func getExportData(exBuf *bytes.Buffer) (map[string]*bytes.Buffer, error) { + + tr := tar.NewReader(exBuf) + + fileData := make(map[string]*bytes.Buffer) + + // TEST 1: Get the bits for each file. 
If we got a file the store doesn't know about, report error + for { + th, err := tr.Next() + if err == io.EOF { + break + } + if err != nil { + return nil, err + } + + buf := new(bytes.Buffer) + if _, err := io.Copy(buf, tr); err != nil { + return nil, err + } + fileData[th.Name] = buf + + } + + return fileData, nil +} + // Ensure engine can create an ascending iterator for cached values. func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) { t.Parallel() diff --git a/tsdb/engine/tsm1/writer.go b/tsdb/engine/tsm1/writer.go index b00ec3c4af..eda8985f2e 100644 --- a/tsdb/engine/tsm1/writer.go +++ b/tsdb/engine/tsm1/writer.go @@ -97,6 +97,10 @@ const ( // max length of a key in an index entry (measurement + tags) maxKeyLength = (1 << (2 * 8)) - 1 + + // The threshold amount data written before we periodically fsync a TSM file. This helps avoid + // long pauses due to very large fsyncs at the end of writing a TSM file. + fsyncEvery = 512 * 1024 * 1024 ) var ( @@ -233,7 +237,7 @@ func (e *IndexEntry) String() string { // NewIndexWriter returns a new IndexWriter. func NewIndexWriter() IndexWriter { - buf := bytes.NewBuffer(make([]byte, 0, 4096)) + buf := bytes.NewBuffer(make([]byte, 0, 1024*1024)) return &directIndex{buf: buf, w: bufio.NewWriter(buf)} } @@ -253,6 +257,9 @@ type indexBlock struct { type directIndex struct { keyCount int size uint32 + + // The bytes written count of when we last fsync'd + lastSync uint32 fd *os.File buf *bytes.Buffer @@ -377,7 +384,7 @@ func (d *directIndex) WriteTo(w io.Writer) (int64, error) { return 0, err } - return io.Copy(w, bufio.NewReader(d.fd)) + return io.Copy(w, bufio.NewReaderSize(d.fd, 1024*1024)) } func (d *directIndex) flush(w io.Writer) (int64, error) { @@ -435,6 +442,15 @@ func (d *directIndex) flush(w io.Writer) (int64, error) { d.indexEntries.Type = 0 d.indexEntries.entries = d.indexEntries.entries[:0] + // If this is a disk based index and we've written more than the fsync threshold, + // fsync the data to avoid long pauses later on. + if d.fd != nil && d.size-d.lastSync > fsyncEvery { + if err := d.fd.Sync(); err != nil { + return N, err + } + d.lastSync = d.size + } + return N, nil } @@ -486,18 +502,30 @@ type tsmWriter struct { w *bufio.Writer index IndexWriter n int64 + + // The bytes written count of when we last fsync'd + lastSync int64 } // NewTSMWriter returns a new TSMWriter writing to w. func NewTSMWriter(w io.Writer) (TSMWriter, error) { + index := NewIndexWriter() + return &tsmWriter{wrapped: w, w: bufio.NewWriterSize(w, 1024*1024), index: index}, nil +} + +// NewTSMWriterWithDiskBuffer returns a new TSMWriter writing to w and will use a disk +// based buffer for the TSM index if possible. +func NewTSMWriterWithDiskBuffer(w io.Writer) (TSMWriter, error) { var index IndexWriter - if fw, ok := w.(*os.File); ok && !strings.HasSuffix(fw.Name(), "01.tsm.tmp") { - f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666) + // Make sure is a File so we can write the temp index alongside it. 
+ if fw, ok := w.(*os.File); ok { + f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666) if err != nil { return nil, err } index = NewDiskIndexWriter(f) } else { + // w is not a file, just use an inmem index index = NewIndexWriter() } @@ -612,6 +640,14 @@ func (t *tsmWriter) WriteBlock(key []byte, minTime, maxTime int64, block []byte) // Increment file position pointer (checksum + block len) t.n += int64(n) + // fsync the file periodically to avoid long pauses with very big files. + if t.n-t.lastSync > fsyncEvery { + if err := t.sync(); err != nil { + return err + } + t.lastSync = t.n + } + if len(t.index.Entries(key)) >= maxIndexEntries { return ErrMaxBlocksExceeded } @@ -646,6 +682,10 @@ func (t *tsmWriter) Flush() error { return err } + return t.sync() +} + +func (t *tsmWriter) sync() error { if f, ok := t.wrapped.(*os.File); ok { if err := f.Sync(); err != nil { return err diff --git a/tsdb/shard.go b/tsdb/shard.go index cb094e5f8b..9f70a66a0c 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -1011,6 +1011,14 @@ func (s *Shard) Backup(w io.Writer, basePath string, since time.Time) error { return engine.Backup(w, basePath, since) } +func (s *Shard) Export(w io.Writer, basePath string, start time.Time, end time.Time) error { + engine, err := s.engine() + if err != nil { + return err + } + return engine.Export(w, basePath, start, end) +} + // Restore restores data to the underlying engine for the shard. // The shard is reopened after restore. func (s *Shard) Restore(r io.Reader, basePath string) error { diff --git a/tsdb/store.go b/tsdb/store.go index 78e08d78ef..41e4285b5c 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -170,6 +170,12 @@ func (s *Store) loadShards() error { lim := s.EngineOptions.Config.MaxConcurrentCompactions if lim == 0 { lim = runtime.GOMAXPROCS(0) / 2 // Default to 50% of cores for compactions + + // On systems with more cores, cap at 4 to reduce disk utilization + if lim > 4 { + lim = 4 + } + if lim < 1 { lim = 1 } @@ -851,6 +857,20 @@ func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error { return shard.Backup(w, path, since) } +func (s *Store) ExportShard(id uint64, start time.Time, end time.Time, w io.Writer) error { + shard := s.Shard(id) + if shard == nil { + return fmt.Errorf("shard %d doesn't exist on this server", id) + } + + path, err := relativePath(s.path, shard.path) + if err != nil { + return err + } + + return shard.Export(w, path, start, end) +} + // RestoreShard restores a backup from r to a given shard. // This will only overwrite files included in the backup. func (s *Store) RestoreShard(id uint64, r io.Reader) error {