merge master
commit
7d13bf3262
14
CHANGELOG.md
14
CHANGELOG.md
|
|
@ -7,6 +7,8 @@
|
|||
### Features
|
||||
|
||||
- [#8495](https://github.com/influxdata/influxdb/pull/8495): Improve CLI connection warnings
|
||||
- [#9146](https://github.com/influxdata/influxdb/issues/9146): Backup can produce data in the same format as the enterprise backup/restore tool.
|
||||
- [#8879](https://github.com/influxdata/influxdb/issues/8879): Export functionality using start/end to filter exported data by timestamp
|
||||
- [#9084](https://github.com/influxdata/influxdb/pull/9084): Handle high cardinality deletes in TSM engine
|
||||
- [#9162](https://github.com/influxdata/influxdb/pull/9162): Improve inmem index startup performance for high cardinality.
|
||||
- [#8491](https://github.com/influxdata/influxdb/pull/8491): Add further tsi support for streaming/copying shards.
|
||||
|
|
@ -21,6 +23,18 @@
|
|||
- [#9144](https://github.com/influxdata/influxdb/issues/9144): Fix query compilation so multiple nested distinct calls is allowable
|
||||
- [#8789](https://github.com/influxdata/influxdb/issues/8789): Fix CLI to allow quoted database names in use statement
|
||||
|
||||
## v1.4.3 [unreleased]
|
||||
|
||||
### Configuration Changes
|
||||
|
||||
#### `[data]` Section
|
||||
|
||||
The default value for `cache-snapshot-memory-size` has been changed from `25m` to `256m`.
|
||||
|
||||
### Bugfixes
|
||||
|
||||
- [#9201](https://github.com/influxdata/influxdb/issues/9201): Fix higher disk i/o utilization
|
||||
|
||||
## v1.4.2 [2017-11-15]
|
||||
|
||||
Refer to the 1.4.0 breaking changes section if `influxd` fails to start with an `incompatible tsi1 index MANIFEST` error.
|
||||
|
|
|
|||
|
|
@ -1,7 +1,8 @@
|
|||
// Package backup is the backup subcommand for the influxd command.
|
||||
// Package backup implements both the backup and export subcommands for the influxd command.
|
||||
package backup
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
|
|
@ -13,9 +14,9 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/cmd/influxd/backup_util"
|
||||
"github.com/influxdata/influxdb/services/snapshotter"
|
||||
"github.com/influxdata/influxdb/tcp"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
|
|
@ -43,9 +44,20 @@ type Command struct {
|
|||
Stderr io.Writer
|
||||
Stdout io.Writer
|
||||
|
||||
host string
|
||||
path string
|
||||
database string
|
||||
host string
|
||||
path string
|
||||
database string
|
||||
retentionPolicy string
|
||||
shardID string
|
||||
|
||||
isBackup bool
|
||||
since time.Time
|
||||
start time.Time
|
||||
end time.Time
|
||||
|
||||
enterprise bool
|
||||
manifest backup_util.Manifest
|
||||
enterpriseFileBase string
|
||||
}
|
||||
|
||||
// NewCommand returns a new instance of Command with default settings.
|
||||
|
|
@ -63,100 +75,230 @@ func (cmd *Command) Run(args ...string) error {
|
|||
cmd.StderrLogger = log.New(cmd.Stderr, "", log.LstdFlags)
|
||||
|
||||
// Parse command line arguments.
|
||||
retentionPolicy, shardID, since, err := cmd.parseFlags(args)
|
||||
err := cmd.parseFlags(args)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// based on the arguments passed in we only backup the minimum
|
||||
if shardID != "" {
|
||||
if cmd.shardID != "" {
|
||||
// always backup the metastore
|
||||
if err := cmd.backupMetastore(); err != nil {
|
||||
return err
|
||||
}
|
||||
err = cmd.backupShard(retentionPolicy, shardID, since)
|
||||
} else if retentionPolicy != "" {
|
||||
err = cmd.backupRetentionPolicy(retentionPolicy, since)
|
||||
err = cmd.backupShard(cmd.database, cmd.retentionPolicy, cmd.shardID)
|
||||
|
||||
} else if cmd.retentionPolicy != "" {
|
||||
// always backup the metastore
|
||||
if err := cmd.backupMetastore(); err != nil {
|
||||
return err
|
||||
}
|
||||
err = cmd.backupRetentionPolicy()
|
||||
} else if cmd.database != "" {
|
||||
err = cmd.backupDatabase(since)
|
||||
// always backup the metastore
|
||||
if err := cmd.backupMetastore(); err != nil {
|
||||
return err
|
||||
}
|
||||
err = cmd.backupDatabase()
|
||||
} else {
|
||||
err = cmd.backupMetastore()
|
||||
// always backup the metastore
|
||||
if err := cmd.backupMetastore(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cmd.StdoutLogger.Println("No database, retention policy or shard ID given. Full meta store backed up.")
|
||||
if cmd.enterprise {
|
||||
cmd.StdoutLogger.Println("Backing up all databases in enterprise format")
|
||||
if err := cmd.backupDatabase(); err != nil {
|
||||
cmd.StderrLogger.Printf("backup failed: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if cmd.enterprise {
|
||||
cmd.manifest.Platform = "OSS"
|
||||
filename := cmd.enterpriseFileBase + ".manifest"
|
||||
if err := cmd.manifest.Save(filepath.Join(cmd.path, filename)); err != nil {
|
||||
cmd.StderrLogger.Printf("manifest save failed: %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
cmd.StderrLogger.Printf("backup failed: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
cmd.StdoutLogger.Println("backup complete")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// parseFlags parses and validates the command line arguments into a request object.
|
||||
func (cmd *Command) parseFlags(args []string) (retentionPolicy, shardID string, since time.Time, err error) {
|
||||
func (cmd *Command) parseFlags(args []string) (err error) {
|
||||
fs := flag.NewFlagSet("", flag.ContinueOnError)
|
||||
|
||||
fs.StringVar(&cmd.host, "host", "localhost:8088", "")
|
||||
fs.StringVar(&cmd.database, "database", "", "")
|
||||
fs.StringVar(&retentionPolicy, "retention", "", "")
|
||||
fs.StringVar(&shardID, "shard", "", "")
|
||||
fs.StringVar(&cmd.retentionPolicy, "retention", "", "")
|
||||
fs.StringVar(&cmd.shardID, "shard", "", "")
|
||||
var sinceArg string
|
||||
var startArg string
|
||||
var endArg string
|
||||
fs.StringVar(&sinceArg, "since", "", "")
|
||||
fs.StringVar(&startArg, "start", "", "")
|
||||
fs.StringVar(&endArg, "end", "", "")
|
||||
fs.BoolVar(&cmd.enterprise, "enterprise", false, "")
|
||||
|
||||
fs.SetOutput(cmd.Stderr)
|
||||
fs.Usage = cmd.printUsage
|
||||
|
||||
err = fs.Parse(args)
|
||||
if err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
// for enterprise saving, if needed
|
||||
cmd.enterpriseFileBase = time.Now().UTC().Format(backup_util.EnterpriseFileNamePattern)
|
||||
|
||||
// if startArg and endArg are unspecified, then assume we are doing a full backup of the DB
|
||||
cmd.isBackup = startArg == "" && endArg == ""
|
||||
|
||||
if sinceArg != "" {
|
||||
since, err = time.Parse(time.RFC3339, sinceArg)
|
||||
cmd.since, err = time.Parse(time.RFC3339, sinceArg)
|
||||
if err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
}
|
||||
if startArg != "" {
|
||||
if cmd.isBackup {
|
||||
return errors.New("backup command uses one of -since or -start/-end")
|
||||
}
|
||||
cmd.start, err = time.Parse(time.RFC3339, startArg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if endArg != "" {
|
||||
if cmd.isBackup {
|
||||
return errors.New("backup command uses one of -since or -start/-end")
|
||||
}
|
||||
cmd.end, err = time.Parse(time.RFC3339, endArg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// start should be < end
|
||||
if !cmd.start.Before(cmd.end) {
|
||||
return errors.New("start date must be before end date")
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that only one arg is specified.
|
||||
if fs.NArg() == 0 {
|
||||
return "", "", time.Unix(0, 0), errors.New("backup destination path required")
|
||||
} else if fs.NArg() != 1 {
|
||||
return "", "", time.Unix(0, 0), errors.New("only one backup path allowed")
|
||||
if fs.NArg() != 1 {
|
||||
return errors.New("Exactly one backup path is required.")
|
||||
}
|
||||
cmd.path = fs.Arg(0)
|
||||
|
||||
err = os.MkdirAll(cmd.path, 0700)
|
||||
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
// backupShard will write a tar archive of the passed in shard with any TSM files that have been
|
||||
// created since the time passed in
|
||||
func (cmd *Command) backupShard(retentionPolicy string, shardID string, since time.Time) error {
|
||||
id, err := strconv.ParseUint(shardID, 10, 64)
|
||||
func (cmd *Command) backupShard(db, rp, sid string) error {
|
||||
reqType := snapshotter.RequestShardBackup
|
||||
if !cmd.isBackup {
|
||||
reqType = snapshotter.RequestShardExport
|
||||
}
|
||||
|
||||
id, err := strconv.ParseUint(sid, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
shardArchivePath, err := cmd.nextPath(filepath.Join(cmd.path, fmt.Sprintf(BackupFilePattern, cmd.database, retentionPolicy, id)))
|
||||
shardArchivePath, err := cmd.nextPath(filepath.Join(cmd.path, fmt.Sprintf(backup_util.BackupFilePattern, db, rp, id)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cmd.StdoutLogger.Printf("backing up db=%v rp=%v shard=%v to %s since %s",
|
||||
cmd.database, retentionPolicy, shardID, shardArchivePath, since)
|
||||
db, rp, sid, shardArchivePath, cmd.since)
|
||||
|
||||
req := &snapshotter.Request{
|
||||
Type: snapshotter.RequestShardBackup,
|
||||
Database: cmd.database,
|
||||
RetentionPolicy: retentionPolicy,
|
||||
ShardID: id,
|
||||
Since: since,
|
||||
Type: reqType,
|
||||
BackupDatabase: db,
|
||||
BackupRetentionPolicy: rp,
|
||||
ShardID: id,
|
||||
Since: cmd.since,
|
||||
ExportStart: cmd.start,
|
||||
ExportEnd: cmd.end,
|
||||
}
|
||||
|
||||
// TODO: verify shard backup data
|
||||
return cmd.downloadAndVerify(req, shardArchivePath, nil)
|
||||
err = cmd.downloadAndVerify(req, shardArchivePath, nil)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if cmd.enterprise {
|
||||
f, err := os.Open(shardArchivePath)
|
||||
defer f.Close()
|
||||
defer os.Remove(shardArchivePath)
|
||||
|
||||
filePrefix := cmd.enterpriseFileBase + ".s" + sid
|
||||
filename := filePrefix + ".tar.gz"
|
||||
out, err := os.OpenFile(filepath.Join(cmd.path, filename), os.O_CREATE|os.O_RDWR, 0600)
|
||||
|
||||
zw := gzip.NewWriter(out)
|
||||
zw.Name = filePrefix + ".tar"
|
||||
|
||||
cw := backup_util.CountingWriter{Writer: zw}
|
||||
|
||||
_, err = io.Copy(&cw, f)
|
||||
if err != nil {
|
||||
if err := zw.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := out.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
shardid, err := strconv.ParseUint(sid, 10, 64)
|
||||
if err != nil {
|
||||
if err := zw.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := out.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
cmd.manifest.Files = append(cmd.manifest.Files, backup_util.Entry{
|
||||
Database: db,
|
||||
Policy: rp,
|
||||
ShardID: shardid,
|
||||
FileName: filename,
|
||||
Size: cw.Total,
|
||||
LastModified: 0,
|
||||
})
|
||||
|
||||
if err := zw.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := out.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
// backupSeriesFile will write a tar archive of the series file for the database.
|
||||
|
|
@ -168,20 +310,20 @@ func (cmd *Command) backupSeriesFile() error {
|
|||
|
||||
cmd.StdoutLogger.Printf("backing up series file to %s", seriesFileArchivePath)
|
||||
req := &snapshotter.Request{
|
||||
Type: snapshotter.RequestSeriesFileBackup,
|
||||
Database: cmd.database,
|
||||
Type: snapshotter.RequestSeriesFileBackup,
|
||||
BackupDatabase: cmd.database,
|
||||
}
|
||||
return cmd.downloadAndVerify(req, seriesFileArchivePath, nil)
|
||||
}
|
||||
|
||||
// backupDatabase will request the database information from the server and then backup the metastore and
|
||||
// every shard in every retention policy in the database. Each shard will be written to a separate tar.
|
||||
func (cmd *Command) backupDatabase(since time.Time) error {
|
||||
cmd.StdoutLogger.Printf("backing up db=%s since %s", cmd.database, since)
|
||||
// backupDatabase will request the database information from the server and then backup
|
||||
// every shard in every retention policy in the database. Each shard will be written to a separate file.
|
||||
func (cmd *Command) backupDatabase() error {
|
||||
cmd.StdoutLogger.Printf("backing up db=%s", cmd.database)
|
||||
|
||||
req := &snapshotter.Request{
|
||||
Type: snapshotter.RequestDatabaseInfo,
|
||||
Database: cmd.database,
|
||||
Type: snapshotter.RequestDatabaseInfo,
|
||||
BackupDatabase: cmd.database,
|
||||
}
|
||||
|
||||
response, err := cmd.requestInfo(req)
|
||||
|
|
@ -189,18 +331,18 @@ func (cmd *Command) backupDatabase(since time.Time) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return cmd.backupResponsePaths(response, since)
|
||||
return cmd.backupResponsePaths(response)
|
||||
}
|
||||
|
||||
// backupRetentionPolicy will request the retention policy information from the server and then backup
|
||||
// the metastore and every shard in the retention policy. Each shard will be written to a separate tar.
|
||||
func (cmd *Command) backupRetentionPolicy(retentionPolicy string, since time.Time) error {
|
||||
cmd.StdoutLogger.Printf("backing up rp=%s since %s", retentionPolicy, since)
|
||||
// every shard in the retention policy. Each shard will be written to a separate file.
|
||||
func (cmd *Command) backupRetentionPolicy() error {
|
||||
cmd.StdoutLogger.Printf("backing up rp=%s since %s", cmd.retentionPolicy, cmd.since)
|
||||
|
||||
req := &snapshotter.Request{
|
||||
Type: snapshotter.RequestRetentionPolicyInfo,
|
||||
Database: cmd.database,
|
||||
RetentionPolicy: retentionPolicy,
|
||||
Type: snapshotter.RequestRetentionPolicyInfo,
|
||||
BackupDatabase: cmd.database,
|
||||
BackupRetentionPolicy: cmd.retentionPolicy,
|
||||
}
|
||||
|
||||
response, err := cmd.requestInfo(req)
|
||||
|
|
@ -208,23 +350,22 @@ func (cmd *Command) backupRetentionPolicy(retentionPolicy string, since time.Tim
|
|||
return err
|
||||
}
|
||||
|
||||
return cmd.backupResponsePaths(response, since)
|
||||
return cmd.backupResponsePaths(response)
|
||||
}
|
||||
|
||||
// backupResponsePaths will backup the metastore and all shard paths in the response struct
|
||||
func (cmd *Command) backupResponsePaths(response *snapshotter.Response, since time.Time) error {
|
||||
if err := cmd.backupMetastore(); err != nil {
|
||||
return err
|
||||
}
|
||||
// backupResponsePaths will backup all shards identified by shard paths in the response struct
|
||||
func (cmd *Command) backupResponsePaths(response *snapshotter.Response) error {
|
||||
|
||||
// loop through the returned paths and back up each shard
|
||||
for _, path := range response.Paths {
|
||||
rp, id, err := retentionAndShardFromPath(path)
|
||||
db, rp, id, err := backup_util.DBRetentionAndShardFromPath(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := cmd.backupShard(rp, id, since); err != nil {
|
||||
err = cmd.backupShard(db, rp, id)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
@ -236,10 +377,10 @@ func (cmd *Command) backupResponsePaths(response *snapshotter.Response, since ti
|
|||
return nil
|
||||
}
|
||||
|
||||
// backupMetastore will backup the metastore on the host to the passed in path. Database and retention policy backups
|
||||
// will force a backup of the metastore as well as requesting a specific shard backup from the command line
|
||||
// backupMetastore will backup the whole metastore on the host to the backup path
|
||||
// if useDB is non-empty, it will backup metadata only for the named database.
|
||||
func (cmd *Command) backupMetastore() error {
|
||||
metastoreArchivePath, err := cmd.nextPath(filepath.Join(cmd.path, Metafile))
|
||||
metastoreArchivePath, err := cmd.nextPath(filepath.Join(cmd.path, backup_util.Metafile))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -250,13 +391,24 @@ func (cmd *Command) backupMetastore() error {
|
|||
Type: snapshotter.RequestMetastoreBackup,
|
||||
}
|
||||
|
||||
return cmd.downloadAndVerify(req, metastoreArchivePath, func(file string) error {
|
||||
binData, err := ioutil.ReadFile(file)
|
||||
err = cmd.downloadAndVerify(req, metastoreArchivePath, func(file string) error {
|
||||
f, err := os.Open(file)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
var magicByte [8]byte
|
||||
n, err := io.ReadFull(f, magicByte[:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
magic := binary.BigEndian.Uint64(binData[:8])
|
||||
if n < 8 {
|
||||
return errors.New("Not enough bytes data to verify")
|
||||
}
|
||||
|
||||
magic := binary.BigEndian.Uint64(magicByte[:])
|
||||
if magic != snapshotter.BackupMagicHeader {
|
||||
cmd.StderrLogger.Println("Invalid metadata blob, ensure the metadata service is running (default port 8088)")
|
||||
return errors.New("invalid metadata received")
|
||||
|
|
@ -264,6 +416,28 @@ func (cmd *Command) backupMetastore() error {
|
|||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if cmd.enterprise {
|
||||
metaBytes, err := backup_util.GetMetaBytes(metastoreArchivePath)
|
||||
defer os.Remove(metastoreArchivePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
filename := cmd.enterpriseFileBase + ".meta"
|
||||
if err := ioutil.WriteFile(filepath.Join(cmd.path, filename), metaBytes, 0644); err != nil {
|
||||
fmt.Fprintln(cmd.Stdout, "Error.")
|
||||
return err
|
||||
}
|
||||
|
||||
cmd.manifest.Meta.FileName = filename
|
||||
cmd.manifest.Meta.Size = int64(len(metaBytes))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// nextPath returns the next file to write to.
|
||||
|
|
@ -282,8 +456,7 @@ func (cmd *Command) nextPath(path string) (string, error) {
|
|||
// downloadAndVerify will download either the metastore or shard to a temp file and then
|
||||
// rename it to a good backup file name after complete
|
||||
func (cmd *Command) downloadAndVerify(req *snapshotter.Request, path string, validator func(string) error) error {
|
||||
|
||||
tmppath := path + Suffix
|
||||
tmppath := path + backup_util.Suffix
|
||||
if err := cmd.download(req, tmppath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -333,6 +506,11 @@ func (cmd *Command) download(req *snapshotter.Request, path string) error {
|
|||
}
|
||||
defer conn.Close()
|
||||
|
||||
_, err = conn.Write([]byte{byte(req.Type)})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Write the request
|
||||
if err := json.NewEncoder(conn).Encode(req); err != nil {
|
||||
return fmt.Errorf("encode snapshot request: %s", err)
|
||||
|
|
@ -357,11 +535,16 @@ func (cmd *Command) download(req *snapshotter.Request, path string) error {
|
|||
// requestInfo will request the database or retention policy information from the host
|
||||
func (cmd *Command) requestInfo(request *snapshotter.Request) (*snapshotter.Response, error) {
|
||||
// Connect to snapshotter service.
|
||||
var r snapshotter.Response
|
||||
conn, err := tcp.Dial("tcp", cmd.host, snapshotter.MuxHeader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer conn.Close()
|
||||
_, err = conn.Write([]byte{byte(request.Type)})
|
||||
if err != nil {
|
||||
return &r, err
|
||||
}
|
||||
|
||||
// Write the request
|
||||
if err := json.NewEncoder(conn).Encode(request); err != nil {
|
||||
|
|
@ -369,7 +552,7 @@ func (cmd *Command) requestInfo(request *snapshotter.Request) (*snapshotter.Resp
|
|||
}
|
||||
|
||||
// Read the response
|
||||
var r snapshotter.Response
|
||||
|
||||
if err := json.NewDecoder(conn).Decode(&r); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -379,7 +562,7 @@ func (cmd *Command) requestInfo(request *snapshotter.Request) (*snapshotter.Resp
|
|||
|
||||
// printUsage prints the usage message to STDERR.
|
||||
func (cmd *Command) printUsage() {
|
||||
fmt.Fprintf(cmd.Stdout, `Downloads a snapshot of a data node and saves it to disk.
|
||||
fmt.Fprintf(cmd.Stdout, `Downloads a file level age-based snapshot of a data node and saves it to disk.
|
||||
|
||||
Usage: influxd backup [flags] PATH
|
||||
|
||||
|
|
@ -393,18 +576,14 @@ Usage: influxd backup [flags] PATH
|
|||
Optional. The shard id to backup. If specified, retention is required.
|
||||
-since <2015-12-24T08:12:23Z>
|
||||
Optional. Do an incremental backup since the passed in RFC3339
|
||||
formatted time.
|
||||
formatted time. Not compatible with -start or -end.
|
||||
-start <2015-12-24T08:12:23Z>
|
||||
All points earlier than this time stamp will be excluded from the export. Not compatible with -since.
|
||||
-end <2015-12-24T08:12:23Z>
|
||||
All points later than this time stamp will be excluded from the export. Not compatible with -since.
|
||||
-enterprise
|
||||
Generate backup files in the format used for influxdb enterprise.
|
||||
|
||||
`)
|
||||
}
|
||||
|
||||
// retentionAndShardFromPath will take the shard relative path and split it into the
|
||||
// retention policy name and shard ID. The first part of the path should be the database name.
|
||||
func retentionAndShardFromPath(path string) (retention, shard string, err error) {
|
||||
a := strings.Split(path, string(filepath.Separator))
|
||||
if len(a) != 3 {
|
||||
return "", "", fmt.Errorf("expected database, retention policy, and shard id in path: %s", path)
|
||||
}
|
||||
|
||||
return a[1], a[2], nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,146 @@
|
|||
package backup_util
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"encoding/json"
|
||||
"github.com/influxdata/influxdb/services/snapshotter"
|
||||
"io/ioutil"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
const (
|
||||
// Suffix is a suffix added to the backup while it's in-process.
|
||||
Suffix = ".pending"
|
||||
|
||||
// Metafile is the base name given to the metastore backups.
|
||||
Metafile = "meta"
|
||||
|
||||
// BackupFilePattern is the beginning of the pattern for a backup
|
||||
// file. They follow the scheme <database>.<retention>.<shardID>.<increment>
|
||||
BackupFilePattern = "%s.%s.%05d"
|
||||
|
||||
EnterpriseFileNamePattern = "20060102T150405Z"
|
||||
|
||||
OSSManifest = "OSS"
|
||||
|
||||
ENTManifest = "ENT"
|
||||
)
|
||||
|
||||
func GetMetaBytes(fname string) ([]byte, error) {
|
||||
f, err := os.Open(fname)
|
||||
if err != nil {
|
||||
return []byte{}, err
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
if _, err := io.Copy(&buf, f); err != nil {
|
||||
return []byte{}, fmt.Errorf("copy: %s", err)
|
||||
}
|
||||
|
||||
b := buf.Bytes()
|
||||
var i int
|
||||
|
||||
// Make sure the file is actually a meta store backup file
|
||||
magic := binary.BigEndian.Uint64(b[:8])
|
||||
if magic != snapshotter.BackupMagicHeader {
|
||||
return []byte{}, fmt.Errorf("invalid metadata file")
|
||||
}
|
||||
i += 8
|
||||
|
||||
// Size of the meta store bytes
|
||||
length := int(binary.BigEndian.Uint64(b[i : i+8]))
|
||||
i += 8
|
||||
metaBytes := b[i : i+length]
|
||||
|
||||
return metaBytes, nil
|
||||
}
|
||||
|
||||
// Manifest lists the meta and shard file information contained in the backup.
|
||||
// If Limited is false, the manifest contains a full backup, otherwise
|
||||
// it is a partial backup.
|
||||
type Manifest struct {
|
||||
Platform string `json:"platform"`
|
||||
Meta MetaEntry `json:"meta"`
|
||||
Limited bool `json:"limited"`
|
||||
Files []Entry `json:"files"`
|
||||
|
||||
// If limited is true, then one (or all) of the following fields will be set
|
||||
|
||||
Database string `json:"database,omitempty"`
|
||||
Policy string `json:"policy,omitempty"`
|
||||
ShardID uint64 `json:"shard_id,omitempty"`
|
||||
}
|
||||
|
||||
// Entry contains the data information for a backed up shard.
|
||||
type Entry struct {
|
||||
Database string `json:"database"`
|
||||
Policy string `json:"policy"`
|
||||
ShardID uint64 `json:"shardID"`
|
||||
FileName string `json:"fileName"`
|
||||
Size int64 `json:"size"`
|
||||
LastModified int64 `json:"lastModified"`
|
||||
}
|
||||
|
||||
func (e *Entry) SizeOrZero() int64 {
|
||||
if e == nil {
|
||||
return 0
|
||||
}
|
||||
return e.Size
|
||||
}
|
||||
|
||||
// MetaEntry contains the meta store information for a backup.
|
||||
type MetaEntry struct {
|
||||
FileName string `json:"fileName"`
|
||||
Size int64 `json:"size"`
|
||||
}
|
||||
|
||||
// Size returns the size of the manifest.
|
||||
func (m *Manifest) Size() int64 {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
size := m.Meta.Size
|
||||
|
||||
for _, f := range m.Files {
|
||||
size += f.Size
|
||||
}
|
||||
return size
|
||||
}
|
||||
|
||||
func (manifest *Manifest) Save(filename string) error {
|
||||
b, err := json.MarshalIndent(manifest, "", " ")
|
||||
if err != nil {
|
||||
return fmt.Errorf("create manifest: %v", err)
|
||||
}
|
||||
|
||||
return ioutil.WriteFile(filename, b, 0600)
|
||||
}
|
||||
|
||||
type CountingWriter struct {
|
||||
io.Writer
|
||||
Total int64 // Total # of bytes transferred
|
||||
}
|
||||
|
||||
func (w *CountingWriter) Write(p []byte) (n int, err error) {
|
||||
n, err = w.Writer.Write(p)
|
||||
w.Total += int64(n)
|
||||
return
|
||||
}
|
||||
|
||||
// retentionAndShardFromPath will take the shard relative path and split it into the
|
||||
// retention policy name and shard ID. The first part of the path should be the database name.
|
||||
func DBRetentionAndShardFromPath(path string) (db, retention, shard string, err error) {
|
||||
a := strings.Split(path, string(filepath.Separator))
|
||||
if len(a) != 3 {
|
||||
return "", "", "", fmt.Errorf("expected database, retention policy, and shard id in path: %s", path)
|
||||
}
|
||||
|
||||
return a[0], a[1], a[2], nil
|
||||
}
|
||||
|
|
@ -14,7 +14,7 @@ import (
|
|||
"path/filepath"
|
||||
"strconv"
|
||||
|
||||
"github.com/influxdata/influxdb/cmd/influxd/backup"
|
||||
"github.com/influxdata/influxdb/cmd/influxd/backup_util"
|
||||
"github.com/influxdata/influxdb/services/meta"
|
||||
"github.com/influxdata/influxdb/services/snapshotter"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
|
|
@ -157,7 +157,7 @@ func (cmd *Command) unpackSeriesFile() error {
|
|||
// cluster and replaces the root metadata.
|
||||
func (cmd *Command) unpackMeta() error {
|
||||
// find the meta file
|
||||
metaFiles, err := filepath.Glob(filepath.Join(cmd.backupFilesPath, backup.Metafile+".*"))
|
||||
metaFiles, err := filepath.Glob(filepath.Join(cmd.backupFilesPath, backup_util.Metafile+".*"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -268,7 +268,7 @@ func (cmd *Command) unpackShard(shardID string) error {
|
|||
}
|
||||
|
||||
// find the shard backup files
|
||||
pat := filepath.Join(cmd.backupFilesPath, fmt.Sprintf(backup.BackupFilePattern, cmd.database, cmd.retention, id))
|
||||
pat := filepath.Join(cmd.backupFilesPath, fmt.Sprintf(backup_util.BackupFilePattern, cmd.database, cmd.retention, id))
|
||||
return cmd.unpackFiles(pat + ".*")
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -79,7 +79,7 @@
|
|||
# snapshot the cache and write it to a TSM file, freeing up memory
|
||||
# Valid size suffixes are k, m, or g (case insensitive, 1024 = 1k).
|
||||
# Values without a size suffix are in bytes.
|
||||
# cache-snapshot-memory-size = "25m"
|
||||
# cache-snapshot-memory-size = "256m"
|
||||
|
||||
# CacheSnapshotWriteColdDuration is the length of time at
|
||||
# which the engine will snapshot the cache and write it to
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ import (
|
|||
type TSDBStoreMock struct {
|
||||
BackupShardFn func(id uint64, since time.Time, w io.Writer) error
|
||||
BackupSeriesFileFn func(database string, w io.Writer) error
|
||||
ExportShardFn func(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error
|
||||
CloseFn func() error
|
||||
CreateShardFn func(database, policy string, shardID uint64, enabled bool) error
|
||||
CreateShardSnapshotFn func(id uint64) (string, error)
|
||||
|
|
@ -54,6 +55,9 @@ func (s *TSDBStoreMock) BackupShard(id uint64, since time.Time, w io.Writer) err
|
|||
func (s *TSDBStoreMock) BackupSeriesFile(database string, w io.Writer) error {
|
||||
return s.BackupSeriesFileFn(database, w)
|
||||
}
|
||||
func (s *TSDBStoreMock) ExportShard(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error {
|
||||
return s.ExportShardFn(id, ExportStart, ExportEnd, w)
|
||||
}
|
||||
func (s *TSDBStoreMock) Close() error { return s.CloseFn() }
|
||||
func (s *TSDBStoreMock) CreateShard(database string, retentionPolicy string, shardID uint64, enabled bool) error {
|
||||
return s.CreateShardFn(database, retentionPolicy, shardID, enabled)
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ NAME
|
|||
----
|
||||
influxd-backup - Downloads a snapshot of a data node and saves it to disk
|
||||
|
||||
|
||||
SYNOPSIS
|
||||
--------
|
||||
'influxd backup' [options]
|
||||
|
|
@ -30,6 +31,13 @@ OPTIONS
|
|||
-since <2015-12-24T08:12:13Z>::
|
||||
Do an incremental backup since the passed in time. The time needs to be in the RFC3339 format. Optional.
|
||||
|
||||
-start <2015-12-24T08:12:23Z>::
|
||||
All points earlier than this time stamp will be excluded from the export. Not compatible with -since.
|
||||
-end <2015-12-24T08:12:23Z>::
|
||||
All points later than this time stamp will be excluded from the export. Not compatible with -since.
|
||||
-enterprise::
|
||||
Generate backup files in the format used for influxdb enterprise.
|
||||
|
||||
SEE ALSO
|
||||
--------
|
||||
*influxd-restore*(1)
|
||||
|
|
|
|||
|
|
@ -64,6 +64,10 @@ func (c *Client) doRequest(req *Request) ([]byte, error) {
|
|||
defer conn.Close()
|
||||
|
||||
// Write the request
|
||||
_, err = conn.Write([]byte{byte(req.Type)})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := json.NewEncoder(conn).Encode(req); err != nil {
|
||||
return nil, fmt.Errorf("encode snapshot request: %s", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -58,6 +58,12 @@ func TestClient_MetastoreBackup_InvalidMetadata(t *testing.T) {
|
|||
return
|
||||
}
|
||||
|
||||
var typ [1]byte
|
||||
if _, err := conn.Read(typ[:]); err != nil {
|
||||
t.Errorf("unable to read typ header: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
var m map[string]interface{}
|
||||
dec := json.NewDecoder(conn)
|
||||
if err := dec.Decode(&m); err != nil {
|
||||
|
|
|
|||
|
|
@ -42,6 +42,7 @@ type Service struct {
|
|||
TSDBStore interface {
|
||||
BackupShard(id uint64, since time.Time, w io.Writer) error
|
||||
BackupSeriesFile(database string, w io.Writer) error
|
||||
ExportShard(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error
|
||||
Shard(id uint64) *tsdb.Shard
|
||||
ShardRelativePath(id uint64) (string, error)
|
||||
}
|
||||
|
|
@ -69,7 +70,9 @@ func (s *Service) Open() error {
|
|||
// Close implements the Service interface.
|
||||
func (s *Service) Close() error {
|
||||
if s.Listener != nil {
|
||||
s.Listener.Close()
|
||||
if err := s.Listener.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
s.wg.Wait()
|
||||
return nil
|
||||
|
|
@ -87,6 +90,7 @@ func (s *Service) serve() {
|
|||
for {
|
||||
// Wait for next connection.
|
||||
conn, err := s.Listener.Accept()
|
||||
|
||||
if err != nil && strings.Contains(err.Error(), "connection closed") {
|
||||
s.Logger.Info("snapshot listener closed")
|
||||
return
|
||||
|
|
@ -109,28 +113,39 @@ func (s *Service) serve() {
|
|||
|
||||
// handleConn processes conn. This is run in a separate goroutine.
|
||||
func (s *Service) handleConn(conn net.Conn) error {
|
||||
var typ [1]byte
|
||||
|
||||
_, err := conn.Read(typ[:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r, err := s.readRequest(conn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read request: %s", err)
|
||||
}
|
||||
|
||||
switch r.Type {
|
||||
switch RequestType(typ[0]) {
|
||||
case RequestShardBackup:
|
||||
if err := s.TSDBStore.BackupShard(r.ShardID, r.Since, conn); err != nil {
|
||||
return err
|
||||
}
|
||||
case RequestShardExport:
|
||||
if err := s.TSDBStore.ExportShard(r.ShardID, r.ExportStart, r.ExportEnd, conn); err != nil {
|
||||
return err
|
||||
}
|
||||
case RequestMetastoreBackup:
|
||||
if err := s.writeMetaStore(conn); err != nil {
|
||||
return err
|
||||
}
|
||||
case RequestSeriesFileBackup:
|
||||
if err := s.TSDBStore.BackupSeriesFile(r.Database, conn); err != nil {
|
||||
if err := s.TSDBStore.BackupSeriesFile(r.BackupDatabase, conn); err != nil {
|
||||
return err
|
||||
}
|
||||
case RequestDatabaseInfo:
|
||||
return s.writeDatabaseInfo(conn, r.Database)
|
||||
return s.writeDatabaseInfo(conn, r.BackupDatabase)
|
||||
case RequestRetentionPolicyInfo:
|
||||
return s.writeRetentionPolicyInfo(conn, r.Database, r.RetentionPolicy)
|
||||
return s.writeRetentionPolicyInfo(conn, r.BackupDatabase, r.BackupRetentionPolicy)
|
||||
default:
|
||||
return fmt.Errorf("request type unknown: %v", r.Type)
|
||||
}
|
||||
|
|
@ -141,6 +156,7 @@ func (s *Service) handleConn(conn net.Conn) error {
|
|||
func (s *Service) writeMetaStore(conn net.Conn) error {
|
||||
// Retrieve and serialize the current meta data.
|
||||
metaBlob, err := s.MetaClient.MarshalBinary()
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal meta: %s", err)
|
||||
}
|
||||
|
|
@ -179,29 +195,37 @@ func (s *Service) writeMetaStore(conn net.Conn) error {
|
|||
// this server into the connection.
|
||||
func (s *Service) writeDatabaseInfo(conn net.Conn, database string) error {
|
||||
res := Response{}
|
||||
db := s.MetaClient.Database(database)
|
||||
if db == nil {
|
||||
return influxdb.ErrDatabaseNotFound(database)
|
||||
dbs := []meta.DatabaseInfo{}
|
||||
if database != "" {
|
||||
db := s.MetaClient.Database(database)
|
||||
if db == nil {
|
||||
return influxdb.ErrDatabaseNotFound(database)
|
||||
}
|
||||
dbs = append(dbs, *db)
|
||||
} else {
|
||||
// we'll allow collecting info on all databases
|
||||
dbs = s.MetaClient.(*meta.Client).Databases()
|
||||
}
|
||||
|
||||
for _, rp := range db.RetentionPolicies {
|
||||
for _, sg := range rp.ShardGroups {
|
||||
for _, sh := range sg.Shards {
|
||||
// ignore if the shard isn't on the server
|
||||
if s.TSDBStore.Shard(sh.ID) == nil {
|
||||
continue
|
||||
}
|
||||
for _, db := range dbs {
|
||||
for _, rp := range db.RetentionPolicies {
|
||||
for _, sg := range rp.ShardGroups {
|
||||
for _, sh := range sg.Shards {
|
||||
// ignore if the shard isn't on the server
|
||||
if s.TSDBStore.Shard(sh.ID) == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
path, err := s.TSDBStore.ShardRelativePath(sh.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
path, err := s.TSDBStore.ShardRelativePath(sh.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
res.Paths = append(res.Paths, path)
|
||||
res.Paths = append(res.Paths, path)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(conn).Encode(res); err != nil {
|
||||
return fmt.Errorf("encode response: %s", err.Error())
|
||||
}
|
||||
|
|
@ -281,16 +305,33 @@ const (
|
|||
|
||||
// RequestRetentionPolicyInfo represents a request for retention policy info.
|
||||
RequestRetentionPolicyInfo
|
||||
|
||||
// RequestShardExport represents a request to export Shard data. Similar to a backup, but shards
|
||||
// may be filtered based on the start/end times on each block.
|
||||
RequestShardExport
|
||||
|
||||
// RequestMetaStoreUpdate represents a request to upload a metafile that will be used to do a live update
|
||||
// to the existing metastore.
|
||||
RequestMetaStoreUpdate
|
||||
|
||||
// RequestShardUpdate will initiate the upload of a shard data tar file
|
||||
// and have the engine import the data.
|
||||
RequestShardUpdate
|
||||
)
|
||||
|
||||
// Request represents a request for a specific backup or for information
|
||||
// about the shards on this server for a database or retention policy.
|
||||
type Request struct {
|
||||
Type RequestType
|
||||
Database string
|
||||
RetentionPolicy string
|
||||
ShardID uint64
|
||||
Since time.Time
|
||||
Type RequestType
|
||||
BackupDatabase string
|
||||
RestoreDatabase string
|
||||
BackupRetentionPolicy string
|
||||
RestoreRetentionPolicy string
|
||||
ShardID uint64
|
||||
Since time.Time
|
||||
ExportStart time.Time
|
||||
ExportEnd time.Time
|
||||
UploadSize int64
|
||||
}
|
||||
|
||||
// Response contains the relative paths for all the shards on this server
|
||||
|
|
|
|||
|
|
@ -132,6 +132,10 @@ func TestSnapshotter_RequestShardBackup(t *testing.T) {
|
|||
Since: time.Unix(0, 0),
|
||||
}
|
||||
conn.Write([]byte{snapshotter.MuxHeader})
|
||||
_, err = conn.Write([]byte{byte(req.Type)})
|
||||
if err != nil {
|
||||
t.Errorf("could not encode request type to conn: %v", err)
|
||||
}
|
||||
enc := json.NewEncoder(conn)
|
||||
if err := enc.Encode(&req); err != nil {
|
||||
t.Errorf("unable to encode request: %s", err)
|
||||
|
|
@ -222,10 +226,14 @@ func TestSnapshotter_RequestDatabaseInfo(t *testing.T) {
|
|||
defer conn.Close()
|
||||
|
||||
req := snapshotter.Request{
|
||||
Type: snapshotter.RequestDatabaseInfo,
|
||||
Database: "db0",
|
||||
Type: snapshotter.RequestDatabaseInfo,
|
||||
BackupDatabase: "db0",
|
||||
}
|
||||
conn.Write([]byte{snapshotter.MuxHeader})
|
||||
_, err = conn.Write([]byte{byte(req.Type)})
|
||||
if err != nil {
|
||||
t.Errorf("could not encode request type to conn: %v", err)
|
||||
}
|
||||
enc := json.NewEncoder(conn)
|
||||
if err := enc.Encode(&req); err != nil {
|
||||
t.Errorf("unable to encode request: %s", err)
|
||||
|
|
@ -272,10 +280,14 @@ func TestSnapshotter_RequestDatabaseInfo_ErrDatabaseNotFound(t *testing.T) {
|
|||
defer conn.Close()
|
||||
|
||||
req := snapshotter.Request{
|
||||
Type: snapshotter.RequestDatabaseInfo,
|
||||
Database: "doesnotexist",
|
||||
Type: snapshotter.RequestDatabaseInfo,
|
||||
BackupDatabase: "doesnotexist",
|
||||
}
|
||||
conn.Write([]byte{snapshotter.MuxHeader})
|
||||
_, err = conn.Write([]byte{byte(req.Type)})
|
||||
if err != nil {
|
||||
t.Errorf("could not encode request type to conn: %v", err)
|
||||
}
|
||||
enc := json.NewEncoder(conn)
|
||||
if err := enc.Encode(&req); err != nil {
|
||||
t.Errorf("unable to encode request: %s", err)
|
||||
|
|
@ -332,11 +344,15 @@ func TestSnapshotter_RequestRetentionPolicyInfo(t *testing.T) {
|
|||
defer conn.Close()
|
||||
|
||||
req := snapshotter.Request{
|
||||
Type: snapshotter.RequestRetentionPolicyInfo,
|
||||
Database: "db0",
|
||||
RetentionPolicy: "rp0",
|
||||
Type: snapshotter.RequestRetentionPolicyInfo,
|
||||
BackupDatabase: "db0",
|
||||
BackupRetentionPolicy: "rp0",
|
||||
}
|
||||
conn.Write([]byte{snapshotter.MuxHeader})
|
||||
_, err = conn.Write([]byte{byte(req.Type)})
|
||||
if err != nil {
|
||||
t.Errorf("could not encode request type to conn: %v", err)
|
||||
}
|
||||
enc := json.NewEncoder(conn)
|
||||
if err := enc.Encode(&req); err != nil {
|
||||
t.Errorf("unable to encode request: %s", err)
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ const (
|
|||
|
||||
// DefaultCacheSnapshotMemorySize is the size at which the engine will
|
||||
// snapshot the cache and write it to a TSM file, freeing up memory
|
||||
DefaultCacheSnapshotMemorySize = 25 * 1024 * 1024 // 25MB
|
||||
DefaultCacheSnapshotMemorySize = 256 * 1024 * 1024 // 256MB
|
||||
|
||||
// DefaultCacheSnapshotWriteColdDuration is the length of time at which
|
||||
// the engine will snapshot the cache and write it to a new TSM file if
|
||||
|
|
|
|||
|
|
@ -42,6 +42,7 @@ type Engine interface {
|
|||
|
||||
CreateSnapshot() (string, error)
|
||||
Backup(w io.Writer, basePath string, since time.Time) error
|
||||
Export(w io.Writer, basePath string, start time.Time, end time.Time) error
|
||||
Restore(r io.Reader, basePath string) error
|
||||
Import(r io.Reader, basePath string) error
|
||||
|
||||
|
|
|
|||
|
|
@ -263,16 +263,9 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
|
|||
minGenerations = level + 1
|
||||
}
|
||||
|
||||
// Each compaction group should run against 4 generations. For level 1, since these
|
||||
// can get created much more quickly, bump the grouping to 8 to keep file counts lower.
|
||||
groupSize := 4
|
||||
if level == 1 {
|
||||
groupSize = 8
|
||||
}
|
||||
|
||||
var cGroups []CompactionGroup
|
||||
for _, group := range levelGroups {
|
||||
for _, chunk := range group.chunk(groupSize) {
|
||||
for _, chunk := range group.chunk(4) {
|
||||
var cGroup CompactionGroup
|
||||
var hasTombstones bool
|
||||
for _, gen := range chunk {
|
||||
|
|
@ -1027,16 +1020,28 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
|
|||
}
|
||||
|
||||
func (c *Compactor) write(path string, iter KeyIterator) (err error) {
|
||||
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
|
||||
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
|
||||
if err != nil {
|
||||
return errCompactionInProgress{err: err}
|
||||
}
|
||||
|
||||
// Create the write for the new TSM file.
|
||||
w, err := NewTSMWriter(fd)
|
||||
if err != nil {
|
||||
return err
|
||||
var w TSMWriter
|
||||
|
||||
// Use a disk based TSM buffer if it looks like we might create a big index
|
||||
// in memory.
|
||||
if iter.EstimatedIndexSize() > 64*1024*1024 {
|
||||
w, err = NewTSMWriterWithDiskBuffer(fd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
w, err = NewTSMWriter(fd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
defer func() {
|
||||
closeErr := w.Close()
|
||||
if err == nil {
|
||||
|
|
@ -1145,6 +1150,10 @@ type KeyIterator interface {
|
|||
|
||||
// Err returns any errors encountered during iteration.
|
||||
Err() error
|
||||
|
||||
// EstimatedIndexSize returns the estimated size of the index that would
|
||||
// be required to store all the series and entries in the KeyIterator.
|
||||
EstimatedIndexSize() int
|
||||
}
|
||||
|
||||
// tsmKeyIterator implements the KeyIterator for set of TSMReaders. Iteration produces
|
||||
|
|
@ -1278,6 +1287,14 @@ func (k *tsmKeyIterator) hasMergedValues() bool {
|
|||
len(k.mergedBooleanValues) > 0
|
||||
}
|
||||
|
||||
func (k *tsmKeyIterator) EstimatedIndexSize() int {
|
||||
var size uint32
|
||||
for _, r := range k.readers {
|
||||
size += r.IndexSize()
|
||||
}
|
||||
return int(size) / len(k.readers)
|
||||
}
|
||||
|
||||
// Next returns true if there are any values remaining in the iterator.
|
||||
func (k *tsmKeyIterator) Next() bool {
|
||||
RETRY:
|
||||
|
|
@ -1516,6 +1533,11 @@ func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIte
|
|||
return cki
|
||||
}
|
||||
|
||||
func (c *cacheKeyIterator) EstimatedIndexSize() int {
|
||||
// We return 0 here since we already have all the entries in memory to write an index.
|
||||
return 0
|
||||
}
|
||||
|
||||
func (c *cacheKeyIterator) encode() {
|
||||
concurrency := runtime.GOMAXPROCS(0)
|
||||
n := len(c.ready)
|
||||
|
|
|
|||
|
|
@ -1792,14 +1792,6 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) {
|
|||
Path: "08-01.tsm1",
|
||||
Size: 1 * 1024 * 1024,
|
||||
},
|
||||
tsm1.FileStat{
|
||||
Path: "09-01.tsm1",
|
||||
Size: 1 * 1024 * 1024,
|
||||
},
|
||||
tsm1.FileStat{
|
||||
Path: "10-01.tsm1",
|
||||
Size: 1 * 1024 * 1024,
|
||||
},
|
||||
}
|
||||
|
||||
cp := tsm1.NewDefaultPlanner(
|
||||
|
|
@ -1810,8 +1802,8 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) {
|
|||
}, tsdb.DefaultCompactFullWriteColdDuration,
|
||||
)
|
||||
|
||||
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]}
|
||||
expFiles2 := []tsm1.FileStat{data[8], data[9]}
|
||||
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
|
||||
expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]}
|
||||
|
||||
tsm := cp.PlanLevel(1)
|
||||
if exp, got := len(expFiles1), len(tsm[0]); got != exp {
|
||||
|
|
@ -1887,8 +1879,8 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) {
|
|||
}, tsdb.DefaultCompactFullWriteColdDuration,
|
||||
)
|
||||
|
||||
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]}
|
||||
expFiles2 := []tsm1.FileStat{data[8], data[9]}
|
||||
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
|
||||
expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]}
|
||||
|
||||
tsm := cp.PlanLevel(1)
|
||||
if exp, got := len(expFiles1), len(tsm[0]); got != exp {
|
||||
|
|
|
|||
|
|
@ -747,6 +747,142 @@ func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (e *Engine) Export(w io.Writer, basePath string, start time.Time, end time.Time) error {
|
||||
path, err := e.CreateSnapshot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Remove the temporary snapshot dir
|
||||
defer os.RemoveAll(path)
|
||||
if err := e.index.SnapshotTo(path); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tw := tar.NewWriter(w)
|
||||
defer tw.Close()
|
||||
|
||||
// Recursively read all files from path.
|
||||
files, err := readDir(path, "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, file := range files {
|
||||
if !strings.HasSuffix(file, ".tsm") {
|
||||
if err := e.writeFileToBackup(file, basePath, filepath.Join(path, file), tw); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
var tombstonePath string
|
||||
f, err := os.Open(filepath.Join(path, file))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r, err := NewTSMReader(f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Grab the tombstone file if one exists.
|
||||
if r.HasTombstones() {
|
||||
tombstonePath = filepath.Base(r.TombstoneFiles()[0].Path)
|
||||
}
|
||||
|
||||
min, max := r.TimeRange()
|
||||
stun := start.UnixNano()
|
||||
eun := end.UnixNano()
|
||||
|
||||
// We overlap time ranges, we need to filter the file
|
||||
if min >= stun && min <= eun && max > eun || // overlap to the right
|
||||
max >= stun && max <= eun && min < stun || // overlap to the left
|
||||
min <= stun && max >= eun { // TSM file has a range LARGER than the boundary
|
||||
err := e.filterFileToBackup(r, file, basePath, filepath.Join(path, file), start.UnixNano(), end.UnixNano(), tw)
|
||||
if err != nil {
|
||||
if err := r.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// above is the only case where we need to keep the reader open.
|
||||
if err := r.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// the TSM file is 100% inside the range, so we can just write it without scanning each block
|
||||
if min >= start.UnixNano() && max <= end.UnixNano() {
|
||||
if err := e.writeFileToBackup(file, basePath, filepath.Join(path, file), tw); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// if this TSM file had a tombstone we'll write out the whole thing too.
|
||||
if tombstonePath != "" {
|
||||
if err := e.writeFileToBackup(tombstonePath, basePath, filepath.Join(path, tombstonePath), tw); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Engine) filterFileToBackup(r *TSMReader, name, shardRelativePath, fullPath string, start, end int64, tw *tar.Writer) error {
|
||||
path := fullPath + ".tmp"
|
||||
out, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer os.Remove(path)
|
||||
|
||||
w, err := NewTSMWriter(out)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer w.Close()
|
||||
|
||||
// implicit else: here we iterate over the blocks and only keep the ones we really want.
|
||||
var bi *BlockIterator
|
||||
bi = r.BlockIterator()
|
||||
|
||||
for bi.Next() {
|
||||
// not concerned with typ or checksum since we are just blindly writing back, with no decoding
|
||||
key, minTime, maxTime, _, _, buf, err := bi.Read()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if minTime >= start && minTime <= end ||
|
||||
maxTime >= start && maxTime <= end ||
|
||||
minTime <= start && maxTime >= end {
|
||||
err := w.WriteBlock(key, minTime, maxTime, buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := bi.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = w.WriteIndex()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// make sure the whole file is out to disk
|
||||
if err := w.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return e.writeFileToBackup(name, shardRelativePath, path, tw)
|
||||
}
|
||||
|
||||
// writeFileToBackup copies the file into the tar archive. Files will use the shardRelativePath
|
||||
// in their names. This should be the <db>/<retention policy>/<id> part of the path.
|
||||
func (e *Engine) writeFileToBackup(name string, shardRelativePath, fullPath string, tw *tar.Writer) error {
|
||||
|
|
@ -1471,7 +1607,7 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool {
|
|||
}
|
||||
|
||||
func (e *Engine) compact(quit <-chan struct{}) {
|
||||
t := time.NewTicker(time.Second)
|
||||
t := time.NewTicker(5 * time.Second)
|
||||
defer t.Stop()
|
||||
|
||||
for {
|
||||
|
|
|
|||
|
|
@ -256,6 +256,268 @@ func TestEngine_Backup(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestEngine_Export(t *testing.T) {
|
||||
// Generate temporary file.
|
||||
f, _ := ioutil.TempFile("", "tsm")
|
||||
f.Close()
|
||||
os.Remove(f.Name())
|
||||
walPath := filepath.Join(f.Name(), "wal")
|
||||
os.MkdirAll(walPath, 0777)
|
||||
defer os.RemoveAll(f.Name())
|
||||
|
||||
// Create a few points.
|
||||
p1 := MustParsePointString("cpu,host=A value=1.1 1000000000")
|
||||
p2 := MustParsePointString("cpu,host=B value=1.2 2000000000")
|
||||
p3 := MustParsePointString("cpu,host=C value=1.3 3000000000")
|
||||
|
||||
// Write those points to the engine.
|
||||
db := path.Base(f.Name())
|
||||
opt := tsdb.NewEngineOptions()
|
||||
opt.InmemIndex = inmem.NewIndex(db)
|
||||
idx := tsdb.MustOpenIndex(1, db, filepath.Join(f.Name(), "index"), opt)
|
||||
defer idx.Close()
|
||||
|
||||
e := tsm1.NewEngine(1, idx, db, f.Name(), walPath, opt).(*tsm1.Engine)
|
||||
|
||||
// mock the planner so compactions don't run during the test
|
||||
e.CompactionPlan = &mockPlanner{}
|
||||
|
||||
if err := e.Open(); err != nil {
|
||||
t.Fatalf("failed to open tsm1 engine: %s", err.Error())
|
||||
}
|
||||
|
||||
if err := e.WritePoints([]models.Point{p1}); err != nil {
|
||||
t.Fatalf("failed to write points: %s", err.Error())
|
||||
}
|
||||
if err := e.WriteSnapshot(); err != nil {
|
||||
t.Fatalf("failed to snapshot: %s", err.Error())
|
||||
}
|
||||
|
||||
if err := e.WritePoints([]models.Point{p2}); err != nil {
|
||||
t.Fatalf("failed to write points: %s", err.Error())
|
||||
}
|
||||
if err := e.WriteSnapshot(); err != nil {
|
||||
t.Fatalf("failed to snapshot: %s", err.Error())
|
||||
}
|
||||
|
||||
if err := e.WritePoints([]models.Point{p3}); err != nil {
|
||||
t.Fatalf("failed to write points: %s", err.Error())
|
||||
}
|
||||
|
||||
// export the whole DB
|
||||
var exBuf bytes.Buffer
|
||||
if err := e.Export(&exBuf, "", time.Unix(0, 0), time.Unix(0, 4000000000)); err != nil {
|
||||
t.Fatalf("failed to export: %s", err.Error())
|
||||
}
|
||||
|
||||
var bkBuf bytes.Buffer
|
||||
if err := e.Backup(&bkBuf, "", time.Unix(0, 0)); err != nil {
|
||||
t.Fatalf("failed to backup: %s", err.Error())
|
||||
}
|
||||
|
||||
if len(e.FileStore.Files()) != 3 {
|
||||
t.Fatalf("file count wrong: exp: %d, got: %d", 3, len(e.FileStore.Files()))
|
||||
}
|
||||
|
||||
fileNames := map[string]bool{}
|
||||
for _, f := range e.FileStore.Files() {
|
||||
fileNames[filepath.Base(f.Path())] = true
|
||||
}
|
||||
|
||||
fileData, err := getExportData(&exBuf)
|
||||
if err != nil {
|
||||
t.Errorf("Error extracting data from export: %s", err.Error())
|
||||
}
|
||||
|
||||
// TEST 1: did we get any extra files not found in the store?
|
||||
for k, _ := range fileData {
|
||||
if _, ok := fileNames[k]; !ok {
|
||||
t.Errorf("exported a file not in the store: %s", k)
|
||||
}
|
||||
}
|
||||
|
||||
// TEST 2: did we miss any files that the store had?
|
||||
for k, _ := range fileNames {
|
||||
if _, ok := fileData[k]; !ok {
|
||||
t.Errorf("failed to export a file from the store: %s", k)
|
||||
}
|
||||
}
|
||||
|
||||
// TEST 3: Does 'backup' get the same files + bits?
|
||||
tr := tar.NewReader(&bkBuf)
|
||||
|
||||
th, err := tr.Next()
|
||||
for err == nil {
|
||||
expData, ok := fileData[th.Name]
|
||||
if !ok {
|
||||
t.Errorf("Extra file in backup: %q", th.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
if _, err := io.Copy(buf, tr); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !equalBuffers(expData, buf) {
|
||||
t.Errorf("2Difference in data between backup and Export for file %s", th.Name)
|
||||
}
|
||||
|
||||
th, err = tr.Next()
|
||||
}
|
||||
|
||||
if t.Failed() {
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
// TEST 4: Are subsets (1), (2), (3), (1,2), (2,3) accurately found in the larger export?
|
||||
// export the whole DB
|
||||
var ex1 bytes.Buffer
|
||||
if err := e.Export(&ex1, "", time.Unix(0, 0), time.Unix(0, 1000000000)); err != nil {
|
||||
t.Fatalf("failed to export: %s", err.Error())
|
||||
}
|
||||
ex1Data, err := getExportData(&ex1)
|
||||
if err != nil {
|
||||
t.Errorf("Error extracting data from export: %s", err.Error())
|
||||
}
|
||||
|
||||
for k, v := range ex1Data {
|
||||
fullExp, ok := fileData[k]
|
||||
if !ok {
|
||||
t.Errorf("Extracting subset resulted in file not found in full export: %s", err.Error())
|
||||
continue
|
||||
}
|
||||
if !equalBuffers(fullExp, v) {
|
||||
t.Errorf("2Difference in data between backup and Export for file %s", th.Name)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
var ex2 bytes.Buffer
|
||||
if err := e.Export(&ex2, "", time.Unix(0, 1000000001), time.Unix(0, 2000000000)); err != nil {
|
||||
t.Fatalf("failed to export: %s", err.Error())
|
||||
}
|
||||
|
||||
ex2Data, err := getExportData(&ex2)
|
||||
if err != nil {
|
||||
t.Errorf("Error extracting data from export: %s", err.Error())
|
||||
}
|
||||
|
||||
for k, v := range ex2Data {
|
||||
fullExp, ok := fileData[k]
|
||||
if !ok {
|
||||
t.Errorf("Extracting subset resulted in file not found in full export: %s", err.Error())
|
||||
continue
|
||||
}
|
||||
if !equalBuffers(fullExp, v) {
|
||||
t.Errorf("2Difference in data between backup and Export for file %s", th.Name)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
var ex3 bytes.Buffer
|
||||
if err := e.Export(&ex3, "", time.Unix(0, 2000000001), time.Unix(0, 3000000000)); err != nil {
|
||||
t.Fatalf("failed to export: %s", err.Error())
|
||||
}
|
||||
|
||||
ex3Data, err := getExportData(&ex3)
|
||||
if err != nil {
|
||||
t.Errorf("Error extracting data from export: %s", err.Error())
|
||||
}
|
||||
|
||||
for k, v := range ex3Data {
|
||||
fullExp, ok := fileData[k]
|
||||
if !ok {
|
||||
t.Errorf("Extracting subset resulted in file not found in full export: %s", err.Error())
|
||||
continue
|
||||
}
|
||||
if !equalBuffers(fullExp, v) {
|
||||
t.Errorf("2Difference in data between backup and Export for file %s", th.Name)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
var ex12 bytes.Buffer
|
||||
if err := e.Export(&ex12, "", time.Unix(0, 0), time.Unix(0, 2000000000)); err != nil {
|
||||
t.Fatalf("failed to export: %s", err.Error())
|
||||
}
|
||||
|
||||
ex12Data, err := getExportData(&ex12)
|
||||
if err != nil {
|
||||
t.Errorf("Error extracting data from export: %s", err.Error())
|
||||
}
|
||||
|
||||
for k, v := range ex12Data {
|
||||
fullExp, ok := fileData[k]
|
||||
if !ok {
|
||||
t.Errorf("Extracting subset resulted in file not found in full export: %s", err.Error())
|
||||
continue
|
||||
}
|
||||
if !equalBuffers(fullExp, v) {
|
||||
t.Errorf("2Difference in data between backup and Export for file %s", th.Name)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
var ex23 bytes.Buffer
|
||||
if err := e.Export(&ex23, "", time.Unix(0, 1000000001), time.Unix(0, 3000000000)); err != nil {
|
||||
t.Fatalf("failed to export: %s", err.Error())
|
||||
}
|
||||
|
||||
ex23Data, err := getExportData(&ex23)
|
||||
if err != nil {
|
||||
t.Errorf("Error extracting data from export: %s", err.Error())
|
||||
}
|
||||
|
||||
for k, v := range ex23Data {
|
||||
fullExp, ok := fileData[k]
|
||||
if !ok {
|
||||
t.Errorf("Extracting subset resulted in file not found in full export: %s", err.Error())
|
||||
continue
|
||||
}
|
||||
if !equalBuffers(fullExp, v) {
|
||||
t.Errorf("2Difference in data between backup and Export for file %s", th.Name)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func equalBuffers(bufA, bufB *bytes.Buffer) bool {
|
||||
for i, v := range bufA.Bytes() {
|
||||
if v != bufB.Bytes()[i] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func getExportData(exBuf *bytes.Buffer) (map[string]*bytes.Buffer, error) {
|
||||
|
||||
tr := tar.NewReader(exBuf)
|
||||
|
||||
fileData := make(map[string]*bytes.Buffer)
|
||||
|
||||
// TEST 1: Get the bits for each file. If we got a file the store doesn't know about, report error
|
||||
for {
|
||||
th, err := tr.Next()
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
if _, err := io.Copy(buf, tr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fileData[th.Name] = buf
|
||||
|
||||
}
|
||||
|
||||
return fileData, nil
|
||||
}
|
||||
|
||||
// Ensure engine can create an ascending iterator for cached values.
|
||||
func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
|
|
|||
|
|
@ -97,6 +97,10 @@ const (
|
|||
|
||||
// max length of a key in an index entry (measurement + tags)
|
||||
maxKeyLength = (1 << (2 * 8)) - 1
|
||||
|
||||
// The threshold amount data written before we periodically fsync a TSM file. This helps avoid
|
||||
// long pauses due to very large fsyncs at the end of writing a TSM file.
|
||||
fsyncEvery = 512 * 1024 * 1024
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
@ -233,7 +237,7 @@ func (e *IndexEntry) String() string {
|
|||
|
||||
// NewIndexWriter returns a new IndexWriter.
|
||||
func NewIndexWriter() IndexWriter {
|
||||
buf := bytes.NewBuffer(make([]byte, 0, 4096))
|
||||
buf := bytes.NewBuffer(make([]byte, 0, 1024*1024))
|
||||
return &directIndex{buf: buf, w: bufio.NewWriter(buf)}
|
||||
}
|
||||
|
||||
|
|
@ -253,6 +257,9 @@ type indexBlock struct {
|
|||
type directIndex struct {
|
||||
keyCount int
|
||||
size uint32
|
||||
|
||||
// The bytes written count of when we last fsync'd
|
||||
lastSync uint32
|
||||
fd *os.File
|
||||
buf *bytes.Buffer
|
||||
|
||||
|
|
@ -377,7 +384,7 @@ func (d *directIndex) WriteTo(w io.Writer) (int64, error) {
|
|||
return 0, err
|
||||
}
|
||||
|
||||
return io.Copy(w, bufio.NewReader(d.fd))
|
||||
return io.Copy(w, bufio.NewReaderSize(d.fd, 1024*1024))
|
||||
}
|
||||
|
||||
func (d *directIndex) flush(w io.Writer) (int64, error) {
|
||||
|
|
@ -435,6 +442,15 @@ func (d *directIndex) flush(w io.Writer) (int64, error) {
|
|||
d.indexEntries.Type = 0
|
||||
d.indexEntries.entries = d.indexEntries.entries[:0]
|
||||
|
||||
// If this is a disk based index and we've written more than the fsync threshold,
|
||||
// fsync the data to avoid long pauses later on.
|
||||
if d.fd != nil && d.size-d.lastSync > fsyncEvery {
|
||||
if err := d.fd.Sync(); err != nil {
|
||||
return N, err
|
||||
}
|
||||
d.lastSync = d.size
|
||||
}
|
||||
|
||||
return N, nil
|
||||
|
||||
}
|
||||
|
|
@ -486,18 +502,30 @@ type tsmWriter struct {
|
|||
w *bufio.Writer
|
||||
index IndexWriter
|
||||
n int64
|
||||
|
||||
// The bytes written count of when we last fsync'd
|
||||
lastSync int64
|
||||
}
|
||||
|
||||
// NewTSMWriter returns a new TSMWriter writing to w.
|
||||
func NewTSMWriter(w io.Writer) (TSMWriter, error) {
|
||||
index := NewIndexWriter()
|
||||
return &tsmWriter{wrapped: w, w: bufio.NewWriterSize(w, 1024*1024), index: index}, nil
|
||||
}
|
||||
|
||||
// NewTSMWriterWithDiskBuffer returns a new TSMWriter writing to w and will use a disk
|
||||
// based buffer for the TSM index if possible.
|
||||
func NewTSMWriterWithDiskBuffer(w io.Writer) (TSMWriter, error) {
|
||||
var index IndexWriter
|
||||
if fw, ok := w.(*os.File); ok && !strings.HasSuffix(fw.Name(), "01.tsm.tmp") {
|
||||
f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
|
||||
// Make sure is a File so we can write the temp index alongside it.
|
||||
if fw, ok := w.(*os.File); ok {
|
||||
f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
index = NewDiskIndexWriter(f)
|
||||
} else {
|
||||
// w is not a file, just use an inmem index
|
||||
index = NewIndexWriter()
|
||||
}
|
||||
|
||||
|
|
@ -612,6 +640,14 @@ func (t *tsmWriter) WriteBlock(key []byte, minTime, maxTime int64, block []byte)
|
|||
// Increment file position pointer (checksum + block len)
|
||||
t.n += int64(n)
|
||||
|
||||
// fsync the file periodically to avoid long pauses with very big files.
|
||||
if t.n-t.lastSync > fsyncEvery {
|
||||
if err := t.sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
t.lastSync = t.n
|
||||
}
|
||||
|
||||
if len(t.index.Entries(key)) >= maxIndexEntries {
|
||||
return ErrMaxBlocksExceeded
|
||||
}
|
||||
|
|
@ -646,6 +682,10 @@ func (t *tsmWriter) Flush() error {
|
|||
return err
|
||||
}
|
||||
|
||||
return t.sync()
|
||||
}
|
||||
|
||||
func (t *tsmWriter) sync() error {
|
||||
if f, ok := t.wrapped.(*os.File); ok {
|
||||
if err := f.Sync(); err != nil {
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -1011,6 +1011,14 @@ func (s *Shard) Backup(w io.Writer, basePath string, since time.Time) error {
|
|||
return engine.Backup(w, basePath, since)
|
||||
}
|
||||
|
||||
func (s *Shard) Export(w io.Writer, basePath string, start time.Time, end time.Time) error {
|
||||
engine, err := s.engine()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return engine.Export(w, basePath, start, end)
|
||||
}
|
||||
|
||||
// Restore restores data to the underlying engine for the shard.
|
||||
// The shard is reopened after restore.
|
||||
func (s *Shard) Restore(r io.Reader, basePath string) error {
|
||||
|
|
|
|||
|
|
@ -170,6 +170,12 @@ func (s *Store) loadShards() error {
|
|||
lim := s.EngineOptions.Config.MaxConcurrentCompactions
|
||||
if lim == 0 {
|
||||
lim = runtime.GOMAXPROCS(0) / 2 // Default to 50% of cores for compactions
|
||||
|
||||
// On systems with more cores, cap at 4 to reduce disk utilization
|
||||
if lim > 4 {
|
||||
lim = 4
|
||||
}
|
||||
|
||||
if lim < 1 {
|
||||
lim = 1
|
||||
}
|
||||
|
|
@ -851,6 +857,20 @@ func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error {
|
|||
return shard.Backup(w, path, since)
|
||||
}
|
||||
|
||||
func (s *Store) ExportShard(id uint64, start time.Time, end time.Time, w io.Writer) error {
|
||||
shard := s.Shard(id)
|
||||
if shard == nil {
|
||||
return fmt.Errorf("shard %d doesn't exist on this server", id)
|
||||
}
|
||||
|
||||
path, err := relativePath(s.path, shard.path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return shard.Export(w, path, start, end)
|
||||
}
|
||||
|
||||
// RestoreShard restores a backup from r to a given shard.
|
||||
// This will only overwrite files included in the backup.
|
||||
func (s *Store) RestoreShard(id uint64, r io.Reader) error {
|
||||
|
|
|
|||
Loading…
Reference in New Issue