Pulled in backup-relevant code for review (#9193)

for issue #8879
pull/9211/head
Adam 2017-12-07 11:35:20 -05:00 committed by GitHub
parent f250b64721
commit a0b2195d6b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 942 additions and 126 deletions

View File

@ -7,6 +7,8 @@
### Features
- [#8495](https://github.com/influxdata/influxdb/pull/8495): Improve CLI connection warnings
- [#9146](https://github.com/influxdata/influxdb/issues/9146): Backup can produce data in the same format as the enterprise backup/restore tool.
- [#8879](https://github.com/influxdata/influxdb/issues/8879): Export functionality using start/end to filter exported data by timestamp
- [#9084](https://github.com/influxdata/influxdb/pull/9084): Handle high cardinality deletes in TSM engine
- [#9162](https://github.com/influxdata/influxdb/pull/9162): Improve inmem index startup performance for high cardinality.
- [#8491](https://github.com/influxdata/influxdb/pull/8491): Add further tsi support for streaming/copying shards.

View File

@ -1,4 +1,4 @@
// Package backup is the backup subcommand for the influxd command.
// Package backup implements both the backup and export subcommands for the influxd command.
package backup
import (
@ -8,28 +8,17 @@ import (
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"compress/gzip"
"github.com/influxdata/influxdb/cmd/influxd/backup_util"
"github.com/influxdata/influxdb/services/snapshotter"
"github.com/influxdata/influxdb/tcp"
)
const (
// Suffix is a suffix added to the backup while it's in-process.
Suffix = ".pending"
// Metafile is the base name given to the metastore backups.
Metafile = "meta"
// BackupFilePattern is the beginning of the pattern for a backup
// file. They follow the scheme <database>.<retention>.<shardID>.<increment>
BackupFilePattern = "%s.%s.%05d"
"io/ioutil"
)
// Command represents the program execution for "influxd backup".
@ -45,6 +34,17 @@ type Command struct {
host string
path string
database string
retentionPolicy string
shardID string
isBackup bool
since time.Time
start time.Time
end time.Time
enterprise bool
manifest backup_util.Manifest
enterpriseFileBase string
}
// NewCommand returns a new instance of Command with default settings.
@ -62,110 +62,240 @@ func (cmd *Command) Run(args ...string) error {
cmd.StderrLogger = log.New(cmd.Stderr, "", log.LstdFlags)
// Parse command line arguments.
retentionPolicy, shardID, since, err := cmd.parseFlags(args)
err := cmd.parseFlags(args)
if err != nil {
return err
}
// based on the arguments passed in we only backup the minimum
if shardID != "" {
if cmd.shardID != "" {
// always backup the metastore
if err := cmd.backupMetastore(); err != nil {
return err
}
err = cmd.backupShard(retentionPolicy, shardID, since)
} else if retentionPolicy != "" {
err = cmd.backupRetentionPolicy(retentionPolicy, since)
err = cmd.backupShard(cmd.database, cmd.retentionPolicy, cmd.shardID)
} else if cmd.retentionPolicy != "" {
// always backup the metastore
if err := cmd.backupMetastore(); err != nil {
return err
}
err = cmd.backupRetentionPolicy()
} else if cmd.database != "" {
err = cmd.backupDatabase(since)
// always backup the metastore
if err := cmd.backupMetastore(); err != nil {
return err
}
err = cmd.backupDatabase()
} else {
err = cmd.backupMetastore()
// always backup the metastore
if err := cmd.backupMetastore(); err != nil {
return err
}
cmd.StdoutLogger.Println("No database, retention policy or shard ID given. Full meta store backed up.")
if cmd.enterprise {
cmd.StdoutLogger.Println("Backing up all databases in enterprise format")
if err := cmd.backupDatabase(); err != nil {
cmd.StderrLogger.Printf("backup failed: %v", err)
return err
}
}
}
if cmd.enterprise {
cmd.manifest.Platform = "OSS"
filename := cmd.enterpriseFileBase + ".manifest"
if err := cmd.manifest.Save(filepath.Join(cmd.path, filename)); err != nil {
cmd.StderrLogger.Printf("manifest save failed: %v", err)
return err
}
}
if err != nil {
cmd.StderrLogger.Printf("backup failed: %v", err)
return err
}
cmd.StdoutLogger.Println("backup complete")
return nil
}
// parseFlags parses and validates the command line arguments into a request object.
func (cmd *Command) parseFlags(args []string) (retentionPolicy, shardID string, since time.Time, err error) {
func (cmd *Command) parseFlags(args []string) (err error) {
fs := flag.NewFlagSet("", flag.ContinueOnError)
fs.StringVar(&cmd.host, "host", "localhost:8088", "")
fs.StringVar(&cmd.database, "database", "", "")
fs.StringVar(&retentionPolicy, "retention", "", "")
fs.StringVar(&shardID, "shard", "", "")
fs.StringVar(&cmd.retentionPolicy, "retention", "", "")
fs.StringVar(&cmd.shardID, "shard", "", "")
var sinceArg string
var startArg string
var endArg string
fs.StringVar(&sinceArg, "since", "", "")
fs.StringVar(&startArg, "start", "", "")
fs.StringVar(&endArg, "end", "", "")
fs.BoolVar(&cmd.enterprise, "enterprise", false, "")
fs.SetOutput(cmd.Stderr)
fs.Usage = cmd.printUsage
err = fs.Parse(args)
if err != nil {
return
return err
}
// for enterprise saving, if needed
cmd.enterpriseFileBase = time.Now().UTC().Format(backup_util.EnterpriseFileNamePattern)
// if startArg and endArg are unspecified, then assume we are doing a full backup of the DB
cmd.isBackup = startArg == "" && endArg == ""
if sinceArg != "" {
since, err = time.Parse(time.RFC3339, sinceArg)
cmd.since, err = time.Parse(time.RFC3339, sinceArg)
if err != nil {
return
return err
}
}
if startArg != "" {
if cmd.isBackup {
return errors.New("backup command uses one of -since or -start/-end")
}
cmd.start, err = time.Parse(time.RFC3339, startArg)
if err != nil {
return err
}
}
if endArg != "" {
if cmd.isBackup {
return errors.New("backup command uses one of -since or -start/-end")
}
cmd.end, err = time.Parse(time.RFC3339, endArg)
if err != nil {
return err
}
// start should be < end
if !cmd.start.Before(cmd.end) {
return errors.New("start date must be before end date")
}
}
// Ensure that only one arg is specified.
if fs.NArg() == 0 {
return "", "", time.Unix(0, 0), errors.New("backup destination path required")
} else if fs.NArg() != 1 {
return "", "", time.Unix(0, 0), errors.New("only one backup path allowed")
if fs.NArg() != 1 {
return errors.New("Exactly one backup path is required.")
}
cmd.path = fs.Arg(0)
err = os.MkdirAll(cmd.path, 0700)
return
return err
}
// backupShard will write an archive of the passed in shard to the backup path. For a plain
// backup it requests TSM data created since cmd.since; for an export (-start/-end given)
// the request asks the server to filter the shard data by that time range.
func (cmd *Command) backupShard(retentionPolicy string, shardID string, since time.Time) error {
id, err := strconv.ParseUint(shardID, 10, 64)
func (cmd *Command) backupShard(db, rp, sid string) error {
reqType := snapshotter.RequestShardBackup
if !cmd.isBackup {
reqType = snapshotter.RequestShardExport
}
id, err := strconv.ParseUint(sid, 10, 64)
if err != nil {
return err
}
shardArchivePath, err := cmd.nextPath(filepath.Join(cmd.path, fmt.Sprintf(BackupFilePattern, cmd.database, retentionPolicy, id)))
shardArchivePath, err := cmd.nextPath(filepath.Join(cmd.path, fmt.Sprintf(backup_util.BackupFilePattern, db, rp, id)))
if err != nil {
return err
}
cmd.StdoutLogger.Printf("backing up db=%v rp=%v shard=%v to %s since %s",
cmd.database, retentionPolicy, shardID, shardArchivePath, since)
db, rp, sid, shardArchivePath, cmd.since)
req := &snapshotter.Request{
Type: snapshotter.RequestShardBackup,
Database: cmd.database,
RetentionPolicy: retentionPolicy,
Type: reqType,
BackupDatabase: db,
BackupRetentionPolicy: rp,
ShardID: id,
Since: since,
Since: cmd.since,
ExportStart: cmd.start,
ExportEnd: cmd.end,
}
// TODO: verify shard backup data
return cmd.downloadAndVerify(req, shardArchivePath, nil)
err = cmd.downloadAndVerify(req, shardArchivePath, nil)
if err != nil {
return err
}
// backupDatabase will request the database information from the server and then backup the metastore and
// every shard in every retention policy in the database. Each shard will be written to a separate tar.
func (cmd *Command) backupDatabase(since time.Time) error {
cmd.StdoutLogger.Printf("backing up db=%s since %s", cmd.database, since)
if cmd.enterprise {
f, err := os.Open(shardArchivePath)
defer f.Close()
defer os.Remove(shardArchivePath)
filePrefix := cmd.enterpriseFileBase + ".s" + sid
filename := filePrefix + ".tar.gz"
out, err := os.OpenFile(filepath.Join(cmd.path, filename), os.O_CREATE|os.O_RDWR, 0600)
zw := gzip.NewWriter(out)
zw.Name = filePrefix + ".tar"
cw := backup_util.CountingWriter{Writer: zw}
_, err = io.Copy(&cw, f)
if err != nil {
if err := zw.Close(); err != nil {
return err
}
if err := out.Close(); err != nil {
return err
}
return err
}
shardid, err := strconv.ParseUint(sid, 10, 64)
if err != nil {
if err := zw.Close(); err != nil {
return err
}
if err := out.Close(); err != nil {
return err
}
return err
}
cmd.manifest.Files = append(cmd.manifest.Files, backup_util.Entry{
Database: db,
Policy: rp,
ShardID: shardid,
FileName: filename,
Size: cw.Total,
LastModified: 0,
})
if err := zw.Close(); err != nil {
return err
}
if err := out.Close(); err != nil {
return err
}
}
return nil
}
// backupDatabase will request the database information from the server and then backup
// every shard in every retention policy in the database. Each shard will be written to a separate file.
func (cmd *Command) backupDatabase() error {
cmd.StdoutLogger.Printf("backing up db=%s", cmd.database)
req := &snapshotter.Request{
Type: snapshotter.RequestDatabaseInfo,
Database: cmd.database,
BackupDatabase: cmd.database,
}
response, err := cmd.requestInfo(req)
@ -173,18 +303,18 @@ func (cmd *Command) backupDatabase(since time.Time) error {
return err
}
return cmd.backupResponsePaths(response, since)
return cmd.backupResponsePaths(response)
}
// backupRetentionPolicy will request the retention policy information from the server and then backup
// the metastore and every shard in the retention policy. Each shard will be written to a separate tar.
func (cmd *Command) backupRetentionPolicy(retentionPolicy string, since time.Time) error {
cmd.StdoutLogger.Printf("backing up rp=%s since %s", retentionPolicy, since)
// every shard in the retention policy. Each shard will be written to a separate file.
func (cmd *Command) backupRetentionPolicy() error {
cmd.StdoutLogger.Printf("backing up rp=%s since %s", cmd.retentionPolicy, cmd.since)
req := &snapshotter.Request{
Type: snapshotter.RequestRetentionPolicyInfo,
Database: cmd.database,
RetentionPolicy: retentionPolicy,
BackupDatabase: cmd.database,
BackupRetentionPolicy: cmd.retentionPolicy,
}
response, err := cmd.requestInfo(req)
@ -192,23 +322,22 @@ func (cmd *Command) backupRetentionPolicy(retentionPolicy string, since time.Tim
return err
}
return cmd.backupResponsePaths(response, since)
return cmd.backupResponsePaths(response)
}
// backupResponsePaths will backup the metastore and all shard paths in the response struct
func (cmd *Command) backupResponsePaths(response *snapshotter.Response, since time.Time) error {
if err := cmd.backupMetastore(); err != nil {
return err
}
// backupResponsePaths will backup all shards identified by shard paths in the response struct
func (cmd *Command) backupResponsePaths(response *snapshotter.Response) error {
// loop through the returned paths and back up each shard
for _, path := range response.Paths {
rp, id, err := retentionAndShardFromPath(path)
db, rp, id, err := backup_util.DBRetentionAndShardFromPath(path)
if err != nil {
return err
}
if err := cmd.backupShard(rp, id, since); err != nil {
err = cmd.backupShard(db, rp, id)
if err != nil {
return err
}
}
@ -216,10 +345,10 @@ func (cmd *Command) backupResponsePaths(response *snapshotter.Response, since ti
return nil
}
// backupMetastore will backup the metastore on the host to the passed in path. Database and retention policy backups
// will force a backup of the metastore as well as requesting a specific shard backup from the command line
// backupMetastore will backup the whole metastore on the host to the backup path.
// NOTE(review): the function takes no database argument, so the complete metastore is
// always written — the earlier mention of a useDB parameter no longer applies.
func (cmd *Command) backupMetastore() error {
metastoreArchivePath, err := cmd.nextPath(filepath.Join(cmd.path, Metafile))
metastoreArchivePath, err := cmd.nextPath(filepath.Join(cmd.path, backup_util.Metafile))
if err != nil {
return err
}
@ -230,13 +359,24 @@ func (cmd *Command) backupMetastore() error {
Type: snapshotter.RequestMetastoreBackup,
}
return cmd.downloadAndVerify(req, metastoreArchivePath, func(file string) error {
binData, err := ioutil.ReadFile(file)
err = cmd.downloadAndVerify(req, metastoreArchivePath, func(file string) error {
f, err := os.Open(file)
if err != nil {
return err
}
defer f.Close()
var magicByte [8]byte
n, err := io.ReadFull(f, magicByte[:])
if err != nil {
return err
}
magic := binary.BigEndian.Uint64(binData[:8])
if n < 8 {
return errors.New("Not enough bytes data to verify")
}
magic := binary.BigEndian.Uint64(magicByte[:])
if magic != snapshotter.BackupMagicHeader {
cmd.StderrLogger.Println("Invalid metadata blob, ensure the metadata service is running (default port 8088)")
return errors.New("invalid metadata received")
@ -244,6 +384,28 @@ func (cmd *Command) backupMetastore() error {
return nil
})
if err != nil {
return err
}
if cmd.enterprise {
metaBytes, err := backup_util.GetMetaBytes(metastoreArchivePath)
defer os.Remove(metastoreArchivePath)
if err != nil {
return err
}
filename := cmd.enterpriseFileBase + ".meta"
if err := ioutil.WriteFile(filepath.Join(cmd.path, filename), metaBytes, 0644); err != nil {
fmt.Fprintln(cmd.Stdout, "Error.")
return err
}
cmd.manifest.Meta.FileName = filename
cmd.manifest.Meta.Size = int64(len(metaBytes))
}
return nil
}
// nextPath returns the next file to write to.
@ -262,7 +424,7 @@ func (cmd *Command) nextPath(path string) (string, error) {
// downloadAndVerify will download either the metastore or shard to a temp file and then
// rename it to a good backup file name after complete
func (cmd *Command) downloadAndVerify(req *snapshotter.Request, path string, validator func(string) error) error {
tmppath := path + Suffix
tmppath := path + backup_util.Suffix
if err := cmd.download(req, tmppath); err != nil {
return err
}
@ -312,6 +474,11 @@ func (cmd *Command) download(req *snapshotter.Request, path string) error {
}
defer conn.Close()
_, err = conn.Write([]byte{byte(req.Type)})
if err != nil {
return err
}
// Write the request
if err := json.NewEncoder(conn).Encode(req); err != nil {
return fmt.Errorf("encode snapshot request: %s", err)
@ -336,11 +503,16 @@ func (cmd *Command) download(req *snapshotter.Request, path string) error {
// requestInfo will request the database or retention policy information from the host
func (cmd *Command) requestInfo(request *snapshotter.Request) (*snapshotter.Response, error) {
// Connect to snapshotter service.
var r snapshotter.Response
conn, err := tcp.Dial("tcp", cmd.host, snapshotter.MuxHeader)
if err != nil {
return nil, err
}
defer conn.Close()
_, err = conn.Write([]byte{byte(request.Type)})
if err != nil {
return &r, err
}
// Write the request
if err := json.NewEncoder(conn).Encode(request); err != nil {
@ -348,7 +520,7 @@ func (cmd *Command) requestInfo(request *snapshotter.Request) (*snapshotter.Resp
}
// Read the response
var r snapshotter.Response
if err := json.NewDecoder(conn).Decode(&r); err != nil {
return nil, err
}
@ -358,7 +530,7 @@ func (cmd *Command) requestInfo(request *snapshotter.Request) (*snapshotter.Resp
// printUsage prints the usage message to STDERR.
func (cmd *Command) printUsage() {
fmt.Fprintf(cmd.Stdout, `Downloads a snapshot of a data node and saves it to disk.
fmt.Fprintf(cmd.Stdout, `Downloads a file level age-based snapshot of a data node and saves it to disk.
Usage: influxd backup [flags] PATH
@ -372,18 +544,14 @@ Usage: influxd backup [flags] PATH
Optional. The shard id to backup. If specified, retention is required.
-since <2015-12-24T08:12:23Z>
Optional. Do an incremental backup since the passed in RFC3339
formatted time.
formatted time. Not compatible with -start or -end.
-start <2015-12-24T08:12:23Z>
All points earlier than this time stamp will be excluded from the export. Not compatible with -since.
-end <2015-12-24T08:12:23Z>
All points later than this time stamp will be excluded from the export. Not compatible with -since.
-enterprise
Generate backup files in the format used for influxdb enterprise.
`)
}
// retentionAndShardFromPath will take the shard relative path and split it into the
// retention policy name and shard ID. The first part of the path should be the database name.
func retentionAndShardFromPath(path string) (retention, shard string, err error) {
a := strings.Split(path, string(filepath.Separator))
if len(a) != 3 {
return "", "", fmt.Errorf("expected database, retention policy, and shard id in path: %s", path)
}
return a[1], a[2], nil
}

View File

@ -0,0 +1,146 @@
package backup_util
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"os"
"strings"
"encoding/json"
"github.com/influxdata/influxdb/services/snapshotter"
"io/ioutil"
"path/filepath"
)
const (
	// Suffix is a suffix added to the backup while it's in-process.
	Suffix = ".pending"

	// Metafile is the base name given to the metastore backups.
	Metafile = "meta"

	// BackupFilePattern is the beginning of the pattern for a backup
	// file. They follow the scheme <database>.<retention>.<shardID>.<increment>
	BackupFilePattern = "%s.%s.%05d"

	// EnterpriseFileNamePattern is the reference-time layout (see time.Format)
	// used to build the timestamped base name of enterprise-format backup files.
	EnterpriseFileNamePattern = "20060102T150405Z"

	// OSSManifest and ENTManifest identify which product produced a backup
	// manifest — presumably intended as values for Manifest.Platform
	// (callers currently assign the literal "OSS"; verify before relying on these).
	OSSManifest = "OSS"

	ENTManifest = "ENT"
)
// GetMetaBytes extracts the raw metastore payload from the backup file at
// fname. The file layout is: an 8-byte big-endian magic header
// (snapshotter.BackupMagicHeader), an 8-byte big-endian payload length, then
// the payload itself. It returns the payload bytes, or an error if the file
// cannot be read or does not match that layout.
func GetMetaBytes(fname string) ([]byte, error) {
	f, err := os.Open(fname)
	if err != nil {
		return []byte{}, err
	}
	// BUGFIX: the file handle was previously never closed.
	defer f.Close()

	var buf bytes.Buffer
	if _, err := io.Copy(&buf, f); err != nil {
		return []byte{}, fmt.Errorf("copy: %s", err)
	}

	b := buf.Bytes()
	var i int

	// BUGFIX: guard the fixed-size header reads; a truncated file previously
	// panicked on the slice expressions below instead of returning an error.
	if len(b) < 16 {
		return []byte{}, fmt.Errorf("invalid metadata file")
	}

	// Make sure the file is actually a meta store backup file.
	magic := binary.BigEndian.Uint64(b[:8])
	if magic != snapshotter.BackupMagicHeader {
		return []byte{}, fmt.Errorf("invalid metadata file")
	}
	i += 8

	// Size of the meta store bytes.
	length := int(binary.BigEndian.Uint64(b[i : i+8]))
	i += 8

	// BUGFIX: validate the declared payload length against the actual data.
	if length < 0 || i+length > len(b) {
		return []byte{}, fmt.Errorf("invalid metadata file")
	}

	metaBytes := b[i : i+length]

	return metaBytes, nil
}
// Manifest lists the meta and shard file information contained in the backup.
// If Limited is false, the manifest contains a full backup, otherwise
// it is a partial backup.
type Manifest struct {
	Platform string    `json:"platform"`
	Meta     MetaEntry `json:"meta"`
	Limited  bool      `json:"limited"`
	Files    []Entry   `json:"files"`

	// If limited is true, then one (or all) of the following fields will be set
	Database string `json:"database,omitempty"`
	Policy   string `json:"policy,omitempty"`
	ShardID  uint64 `json:"shard_id,omitempty"`
}

// Entry contains the data information for a backed up shard.
type Entry struct {
	Database     string `json:"database"`
	Policy       string `json:"policy"`
	ShardID      uint64 `json:"shardID"`
	FileName     string `json:"fileName"`
	Size         int64  `json:"size"`
	LastModified int64  `json:"lastModified"`
}

// SizeOrZero returns the entry's file size, or zero when the entry is nil,
// so callers can total sizes without nil checks.
func (e *Entry) SizeOrZero() int64 {
	if e == nil {
		return 0
	}
	return e.Size
}

// MetaEntry contains the meta store information for a backup.
type MetaEntry struct {
	FileName string `json:"fileName"`
	Size     int64  `json:"size"`
}

// Size returns the size of the manifest: the meta entry size plus the sizes
// of all shard files. A nil manifest has size zero.
func (m *Manifest) Size() int64 {
	if m == nil {
		return 0
	}

	size := m.Meta.Size
	for _, f := range m.Files {
		size += f.Size
	}
	return size
}

// Save writes the manifest as indented JSON to filename with mode 0600.
// Receiver renamed from `manifest` to `m` for consistency with Size.
func (m *Manifest) Save(filename string) error {
	b, err := json.MarshalIndent(m, "", " ")
	if err != nil {
		return fmt.Errorf("create manifest: %v", err)
	}
	return ioutil.WriteFile(filename, b, 0600)
}

// CountingWriter wraps an io.Writer and tracks the total number of bytes
// written through it.
type CountingWriter struct {
	io.Writer
	Total int64 // Total # of bytes transferred
}

// Write forwards p to the wrapped writer and adds the number of bytes
// actually written to Total.
func (w *CountingWriter) Write(p []byte) (n int, err error) {
	n, err = w.Writer.Write(p)
	w.Total += int64(n)
	return
}
// DBRetentionAndShardFromPath splits a shard's relative path, of the form
// <database>/<retention policy>/<shard id>, into its three components.
// It returns an error when the path does not contain exactly three elements.
// (Doc comment updated: it previously referred to the old name
// retentionAndShardFromPath and omitted the database return value.)
func DBRetentionAndShardFromPath(path string) (db, retention, shard string, err error) {
	a := strings.Split(path, string(filepath.Separator))
	if len(a) != 3 {
		return "", "", "", fmt.Errorf("expected database, retention policy, and shard id in path: %s", path)
	}

	return a[0], a[1], a[2], nil
}

View File

@ -14,7 +14,7 @@ import (
"path/filepath"
"strconv"
"github.com/influxdata/influxdb/cmd/influxd/backup"
"github.com/influxdata/influxdb/cmd/influxd/backup_util"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/services/snapshotter"
)
@ -116,7 +116,7 @@ func (cmd *Command) parseFlags(args []string) error {
// cluster and replaces the root metadata.
func (cmd *Command) unpackMeta() error {
// find the meta file
metaFiles, err := filepath.Glob(filepath.Join(cmd.backupFilesPath, backup.Metafile+".*"))
metaFiles, err := filepath.Glob(filepath.Join(cmd.backupFilesPath, backup_util.Metafile+".*"))
if err != nil {
return err
}
@ -227,7 +227,7 @@ func (cmd *Command) unpackShard(shardID string) error {
}
// find the shard backup files
pat := filepath.Join(cmd.backupFilesPath, fmt.Sprintf(backup.BackupFilePattern, cmd.database, cmd.retention, id))
pat := filepath.Join(cmd.backupFilesPath, fmt.Sprintf(backup_util.BackupFilePattern, cmd.database, cmd.retention, id))
return cmd.unpackFiles(pat + ".*")
}

View File

@ -14,6 +14,7 @@ import (
// TSDBStoreMock is a mockable implementation of tsdb.Store.
type TSDBStoreMock struct {
BackupShardFn func(id uint64, since time.Time, w io.Writer) error
ExportShardFn func(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error
CloseFn func() error
CreateShardFn func(database, policy string, shardID uint64, enabled bool) error
CreateShardSnapshotFn func(id uint64) (string, error)
@ -50,6 +51,9 @@ type TSDBStoreMock struct {
func (s *TSDBStoreMock) BackupShard(id uint64, since time.Time, w io.Writer) error {
return s.BackupShardFn(id, since, w)
}
func (s *TSDBStoreMock) ExportShard(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error {
return s.ExportShardFn(id, ExportStart, ExportEnd, w)
}
func (s *TSDBStoreMock) Close() error { return s.CloseFn() }
func (s *TSDBStoreMock) CreateShard(database string, retentionPolicy string, shardID uint64, enabled bool) error {
return s.CreateShardFn(database, retentionPolicy, shardID, enabled)

View File

@ -5,6 +5,7 @@ NAME
----
influxd-backup - Downloads a snapshot of a data node and saves it to disk
SYNOPSIS
--------
'influxd backup' [options]
@ -30,6 +31,13 @@ OPTIONS
-since <2015-12-24T08:12:13Z>::
Do an incremental backup since the passed in time. The time needs to be in the RFC3339 format. Optional.
-start <2015-12-24T08:12:23Z>::
All points earlier than this time stamp will be excluded from the export. Not compatible with -since.
-end <2015-12-24T08:12:23Z>::
All points later than this time stamp will be excluded from the export. Not compatible with -since.
-enterprise::
Generate backup files in the format used for influxdb enterprise.
SEE ALSO
--------
*influxd-restore*(1)

View File

@ -64,6 +64,10 @@ func (c *Client) doRequest(req *Request) ([]byte, error) {
defer conn.Close()
// Write the request
_, err = conn.Write([]byte{byte(req.Type)})
if err != nil {
return nil, err
}
if err := json.NewEncoder(conn).Encode(req); err != nil {
return nil, fmt.Errorf("encode snapshot request: %s", err)
}

View File

@ -58,6 +58,12 @@ func TestClient_MetastoreBackup_InvalidMetadata(t *testing.T) {
return
}
var typ [1]byte
if _, err := conn.Read(typ[:]); err != nil {
t.Errorf("unable to read typ header: %s", err)
return
}
var m map[string]interface{}
dec := json.NewDecoder(conn)
if err := dec.Decode(&m); err != nil {

View File

@ -41,6 +41,7 @@ type Service struct {
TSDBStore interface {
BackupShard(id uint64, since time.Time, w io.Writer) error
ExportShard(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error
Shard(id uint64) *tsdb.Shard
ShardRelativePath(id uint64) (string, error)
}
@ -68,7 +69,9 @@ func (s *Service) Open() error {
// Close implements the Service interface.
func (s *Service) Close() error {
if s.Listener != nil {
s.Listener.Close()
if err := s.Listener.Close(); err != nil {
return err
}
}
s.wg.Wait()
return nil
@ -86,6 +89,7 @@ func (s *Service) serve() {
for {
// Wait for next connection.
conn, err := s.Listener.Accept()
if err != nil && strings.Contains(err.Error(), "connection closed") {
s.Logger.Info("snapshot listener closed")
return
@ -108,24 +112,35 @@ func (s *Service) serve() {
// handleConn processes conn. This is run in a separate goroutine.
func (s *Service) handleConn(conn net.Conn) error {
var typ [1]byte
_, err := conn.Read(typ[:])
if err != nil {
return err
}
r, err := s.readRequest(conn)
if err != nil {
return fmt.Errorf("read request: %s", err)
}
switch r.Type {
switch RequestType(typ[0]) {
case RequestShardBackup:
if err := s.TSDBStore.BackupShard(r.ShardID, r.Since, conn); err != nil {
return err
}
case RequestShardExport:
if err := s.TSDBStore.ExportShard(r.ShardID, r.ExportStart, r.ExportEnd, conn); err != nil {
return err
}
case RequestMetastoreBackup:
if err := s.writeMetaStore(conn); err != nil {
return err
}
case RequestDatabaseInfo:
return s.writeDatabaseInfo(conn, r.Database)
return s.writeDatabaseInfo(conn, r.BackupDatabase)
case RequestRetentionPolicyInfo:
return s.writeRetentionPolicyInfo(conn, r.Database, r.RetentionPolicy)
return s.writeRetentionPolicyInfo(conn, r.BackupDatabase, r.BackupRetentionPolicy)
default:
return fmt.Errorf("request type unknown: %v", r.Type)
}
@ -136,6 +151,7 @@ func (s *Service) handleConn(conn net.Conn) error {
func (s *Service) writeMetaStore(conn net.Conn) error {
// Retrieve and serialize the current meta data.
metaBlob, err := s.MetaClient.MarshalBinary()
if err != nil {
return fmt.Errorf("marshal meta: %s", err)
}
@ -174,11 +190,19 @@ func (s *Service) writeMetaStore(conn net.Conn) error {
// this server into the connection.
func (s *Service) writeDatabaseInfo(conn net.Conn, database string) error {
res := Response{}
dbs := []meta.DatabaseInfo{}
if database != "" {
db := s.MetaClient.Database(database)
if db == nil {
return influxdb.ErrDatabaseNotFound(database)
}
dbs = append(dbs, *db)
} else {
// we'll allow collecting info on all databases
dbs = s.MetaClient.(*meta.Client).Databases()
}
for _, db := range dbs {
for _, rp := range db.RetentionPolicies {
for _, sg := range rp.ShardGroups {
for _, sh := range sg.Shards {
@ -196,7 +220,7 @@ func (s *Service) writeDatabaseInfo(conn net.Conn, database string) error {
}
}
}
}
if err := json.NewEncoder(conn).Encode(res); err != nil {
return fmt.Errorf("encode response: %s", err.Error())
}
@ -273,16 +297,33 @@ const (
// RequestRetentionPolicyInfo represents a request for retention policy info.
RequestRetentionPolicyInfo
// RequestShardExport represents a request to export Shard data. Similar to a backup, but shards
// may be filtered based on the start/end times on each block.
RequestShardExport
// RequestMetaStoreUpdate represents a request to upload a metafile that will be used to do a live update
// to the existing metastore.
RequestMetaStoreUpdate
// RequestShardUpdate will initiate the upload of a shard data tar file
// and have the engine import the data.
RequestShardUpdate
)
// Request represents a request for a specific backup or for information
// about the shards on this server for a database or retention policy.
type Request struct {
Type RequestType
Database string
RetentionPolicy string
BackupDatabase string
RestoreDatabase string
BackupRetentionPolicy string
RestoreRetentionPolicy string
ShardID uint64
Since time.Time
ExportStart time.Time
ExportEnd time.Time
UploadSize int64
}
// Response contains the relative paths for all the shards on this server

View File

@ -132,6 +132,10 @@ func TestSnapshotter_RequestShardBackup(t *testing.T) {
Since: time.Unix(0, 0),
}
conn.Write([]byte{snapshotter.MuxHeader})
_, err = conn.Write([]byte{byte(req.Type)})
if err != nil {
t.Errorf("could not encode request type to conn: %v", err)
}
enc := json.NewEncoder(conn)
if err := enc.Encode(&req); err != nil {
t.Errorf("unable to encode request: %s", err)
@ -223,9 +227,13 @@ func TestSnapshotter_RequestDatabaseInfo(t *testing.T) {
req := snapshotter.Request{
Type: snapshotter.RequestDatabaseInfo,
Database: "db0",
BackupDatabase: "db0",
}
conn.Write([]byte{snapshotter.MuxHeader})
_, err = conn.Write([]byte{byte(req.Type)})
if err != nil {
t.Errorf("could not encode request type to conn: %v", err)
}
enc := json.NewEncoder(conn)
if err := enc.Encode(&req); err != nil {
t.Errorf("unable to encode request: %s", err)
@ -273,9 +281,13 @@ func TestSnapshotter_RequestDatabaseInfo_ErrDatabaseNotFound(t *testing.T) {
req := snapshotter.Request{
Type: snapshotter.RequestDatabaseInfo,
Database: "doesnotexist",
BackupDatabase: "doesnotexist",
}
conn.Write([]byte{snapshotter.MuxHeader})
_, err = conn.Write([]byte{byte(req.Type)})
if err != nil {
t.Errorf("could not encode request type to conn: %v", err)
}
enc := json.NewEncoder(conn)
if err := enc.Encode(&req); err != nil {
t.Errorf("unable to encode request: %s", err)
@ -333,10 +345,14 @@ func TestSnapshotter_RequestRetentionPolicyInfo(t *testing.T) {
req := snapshotter.Request{
Type: snapshotter.RequestRetentionPolicyInfo,
Database: "db0",
RetentionPolicy: "rp0",
BackupDatabase: "db0",
BackupRetentionPolicy: "rp0",
}
conn.Write([]byte{snapshotter.MuxHeader})
_, err = conn.Write([]byte{byte(req.Type)})
if err != nil {
t.Errorf("could not encode request type to conn: %v", err)
}
enc := json.NewEncoder(conn)
if err := enc.Encode(&req); err != nil {
t.Errorf("unable to encode request: %s", err)

View File

@ -42,6 +42,7 @@ type Engine interface {
CreateSnapshot() (string, error)
Backup(w io.Writer, basePath string, since time.Time) error
Export(w io.Writer, basePath string, start time.Time, end time.Time) error
Restore(r io.Reader, basePath string) error
Import(r io.Reader, basePath string) error

View File

@ -776,6 +776,142 @@ func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error {
return nil
}
// Export writes a tar archive of the shard's data to w, restricted to points
// whose timestamps fall within [start, end]. Non-TSM files are copied
// verbatim; TSM files fully inside the range are copied whole, files that
// partially overlap the range are rewritten with only the overlapping blocks,
// and files entirely outside the range are skipped. Tombstone files, when
// present, are always copied whole.
func (e *Engine) Export(w io.Writer, basePath string, start time.Time, end time.Time) error {
	path, err := e.CreateSnapshot()
	if err != nil {
		return err
	}
	// Remove the temporary snapshot dir
	defer os.RemoveAll(path)

	if err := e.index.SnapshotTo(path); err != nil {
		return err
	}

	tw := tar.NewWriter(w)
	defer tw.Close()

	// Recursively read all files from path.
	files, err := readDir(path, "")
	if err != nil {
		return err
	}

	for _, file := range files {
		if !strings.HasSuffix(file, ".tsm") {
			if err := e.writeFileToBackup(file, basePath, filepath.Join(path, file), tw); err != nil {
				return err
			}
			// BUGFIX: the original fell through here and tried to open the
			// non-TSM file with NewTSMReader below, which would fail.
			continue
		}

		if err := e.exportTSMFile(file, basePath, path, start, end, tw); err != nil {
			return err
		}
	}

	return nil
}

// exportTSMFile writes a single TSM file (and its tombstone, if any) from the
// snapshot at snapshotPath into tw, filtered to the [start, end] time range.
func (e *Engine) exportTSMFile(file, basePath, snapshotPath string, start, end time.Time, tw *tar.Writer) error {
	f, err := os.Open(filepath.Join(snapshotPath, file))
	if err != nil {
		return err
	}
	r, err := NewTSMReader(f)
	if err != nil {
		// BUGFIX: previously f leaked when NewTSMReader failed.
		f.Close()
		return err
	}
	defer r.Close()

	// Grab the tombstone file if one exists.
	var tombstonePath string
	if r.HasTombstones() {
		tombstonePath = filepath.Base(r.TombstoneFiles()[0].Path)
	}

	min, max := r.TimeRange()
	stun := start.UnixNano()
	eun := end.UnixNano()

	switch {
	case min >= stun && max <= eun:
		// The TSM file is 100% inside the range, so we can just write it
		// without scanning each block.
		if err := e.writeFileToBackup(file, basePath, filepath.Join(snapshotPath, file), tw); err != nil {
			return err
		}
	case min <= eun && max >= stun:
		// The file partially overlaps the range (to the left, to the right,
		// or spanning it entirely), so filter it block by block.
		// BUGFIX: the original conditions let a file whose range exactly
		// equaled [start, end] match both the filter case and the whole-file
		// case, writing it into the archive twice; the cases are now
		// mutually exclusive.
		if err := e.filterFileToBackup(r, file, basePath, filepath.Join(snapshotPath, file), stun, eun, tw); err != nil {
			return err
		}
	default:
		// Entirely outside the range: skip the data file.
	}

	// if this TSM file had a tombstone we'll write out the whole thing too.
	if tombstonePath != "" {
		if err := e.writeFileToBackup(tombstonePath, basePath, filepath.Join(snapshotPath, tombstonePath), tw); err != nil {
			return err
		}
	}

	return nil
}
// filterFileToBackup writes a copy of the TSM file r/fullPath into tw, keeping
// only blocks whose time range intersects [start, end] (nanoseconds). The
// filtered copy is staged in a ".tmp" file beside the original and removed
// after it has been added to the archive.
func (e *Engine) filterFileToBackup(r *TSMReader, name, shardRelativePath, fullPath string, start, end int64, tw *tar.Writer) error {
	tmpPath := fullPath + ".tmp"
	out, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_RDWR, 0666)
	if err != nil {
		return err
	}
	defer os.Remove(tmpPath)

	w, err := NewTSMWriter(out)
	if err != nil {
		return err
	}
	defer w.Close()

	// Copy every block that intersects the requested range. Blocks are written
	// back verbatim, so there is no need to decode them or verify checksums.
	iter := r.BlockIterator()
	for iter.Next() {
		key, minTime, maxTime, _, _, buf, err := iter.Read()
		if err != nil {
			return err
		}
		inRange := (minTime >= start && minTime <= end) ||
			(maxTime >= start && maxTime <= end) ||
			(minTime <= start && maxTime >= end)
		if inRange {
			if err := w.WriteBlock(key, minTime, maxTime, buf); err != nil {
				return err
			}
		}
	}
	if err := iter.Err(); err != nil {
		return err
	}

	if err := w.WriteIndex(); err != nil {
		return err
	}
	// make sure the whole file is out to disk
	if err := w.Flush(); err != nil {
		return err
	}

	return e.writeFileToBackup(name, shardRelativePath, tmpPath, tw)
}
// writeFileToBackup copies the file into the tar archive. Files will use the shardRelativePath
// in their names. This should be the <db>/<retention policy>/<id> part of the path.
func (e *Engine) writeFileToBackup(name string, shardRelativePath, fullPath string, tw *tar.Writer) error {

View File

@ -253,6 +253,268 @@ func TestEngine_Backup(t *testing.T) {
}
}
// TestEngine_Export verifies that Export produces the same files and bytes as
// Backup for a full-range export, and that time-bounded subset exports only
// contain files (byte-identical) from the full export.
func TestEngine_Export(t *testing.T) {
	// Generate temporary file.
	f, _ := ioutil.TempFile("", "tsm")
	f.Close()
	os.Remove(f.Name())
	walPath := filepath.Join(f.Name(), "wal")
	os.MkdirAll(walPath, 0777)
	defer os.RemoveAll(f.Name())

	// Create a few points.
	p1 := MustParsePointString("cpu,host=A value=1.1 1000000000")
	p2 := MustParsePointString("cpu,host=B value=1.2 2000000000")
	p3 := MustParsePointString("cpu,host=C value=1.3 3000000000")

	// Write those points to the engine.
	db := path.Base(f.Name())
	opt := tsdb.NewEngineOptions()
	opt.InmemIndex = inmem.NewIndex(db)
	idx := tsdb.MustOpenIndex(1, db, filepath.Join(f.Name(), "index"), opt)
	defer idx.Close()
	e := tsm1.NewEngine(1, idx, db, f.Name(), walPath, opt).(*tsm1.Engine)

	// mock the planner so compactions don't run during the test
	e.CompactionPlan = &mockPlanner{}

	if err := e.Open(); err != nil {
		t.Fatalf("failed to open tsm1 engine: %s", err.Error())
	}

	// Snapshot between writes so each point lands in its own TSM file.
	if err := e.WritePoints([]models.Point{p1}); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}
	if err := e.WriteSnapshot(); err != nil {
		t.Fatalf("failed to snapshot: %s", err.Error())
	}
	if err := e.WritePoints([]models.Point{p2}); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}
	if err := e.WriteSnapshot(); err != nil {
		t.Fatalf("failed to snapshot: %s", err.Error())
	}
	if err := e.WritePoints([]models.Point{p3}); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}

	// export the whole DB
	var exBuf bytes.Buffer
	if err := e.Export(&exBuf, "", time.Unix(0, 0), time.Unix(0, 4000000000)); err != nil {
		t.Fatalf("failed to export: %s", err.Error())
	}

	var bkBuf bytes.Buffer
	if err := e.Backup(&bkBuf, "", time.Unix(0, 0)); err != nil {
		t.Fatalf("failed to backup: %s", err.Error())
	}

	if len(e.FileStore.Files()) != 3 {
		t.Fatalf("file count wrong: exp: %d, got: %d", 3, len(e.FileStore.Files()))
	}

	fileNames := map[string]bool{}
	for _, f := range e.FileStore.Files() {
		fileNames[filepath.Base(f.Path())] = true
	}

	fileData, err := getExportData(&exBuf)
	if err != nil {
		t.Errorf("error extracting data from export: %s", err.Error())
	}

	// TEST 1: did we get any extra files not found in the store?
	for k := range fileData {
		if _, ok := fileNames[k]; !ok {
			t.Errorf("exported a file not in the store: %s", k)
		}
	}

	// TEST 2: did we miss any files that the store had?
	for k := range fileNames {
		if _, ok := fileData[k]; !ok {
			t.Errorf("failed to export a file from the store: %s", k)
		}
	}

	// TEST 3: Does 'backup' get the same files + bits?
	tr := tar.NewReader(&bkBuf)
	th, err := tr.Next()
	for err == nil {
		expData, ok := fileData[th.Name]
		if !ok {
			t.Errorf("extra file in backup: %q", th.Name)
			// BUG FIX: advance the tar reader before continuing, otherwise
			// this branch looped forever on the same header.
			th, err = tr.Next()
			continue
		}
		buf := new(bytes.Buffer)
		if _, err := io.Copy(buf, tr); err != nil {
			t.Fatal(err)
		}
		if !equalBuffers(expData, buf) {
			t.Errorf("difference in data between backup and export for file %s", th.Name)
		}
		th, err = tr.Next()
	}
	if t.Failed() {
		t.FailNow()
	}

	// checkSubset exports [startNS, endNS] and verifies that every exported
	// file also appears, byte-identical, in the full export.
	// (BUG FIX: the original copies of this loop reported a stale th.Name and
	// formatted a nil error instead of the offending file name.)
	checkSubset := func(label string, startNS, endNS int64) {
		var buf bytes.Buffer
		if err := e.Export(&buf, "", time.Unix(0, startNS), time.Unix(0, endNS)); err != nil {
			t.Fatalf("failed to export subset %s: %s", label, err.Error())
		}
		subData, err := getExportData(&buf)
		if err != nil {
			t.Errorf("error extracting data from subset export %s: %s", label, err.Error())
			return
		}
		for k, v := range subData {
			fullExp, ok := fileData[k]
			if !ok {
				t.Errorf("subset export %s contains file not found in full export: %s", label, k)
				continue
			}
			if !equalBuffers(fullExp, v) {
				t.Errorf("difference in data between full export and subset export %s for file %s", label, k)
			}
		}
	}

	// TEST 4: Are subsets (1), (2), (3), (1,2), (2,3) accurately found in the
	// larger export?
	checkSubset("(1)", 0, 1000000000)
	checkSubset("(2)", 1000000001, 2000000000)
	checkSubset("(3)", 2000000001, 3000000000)
	checkSubset("(1,2)", 0, 2000000000)
	checkSubset("(2,3)", 1000000001, 3000000000)
}
func equalBuffers(bufA, bufB *bytes.Buffer) bool {
for i, v := range bufA.Bytes() {
if v != bufB.Bytes()[i] {
return false
}
}
return true
}
func getExportData(exBuf *bytes.Buffer) (map[string]*bytes.Buffer, error) {
tr := tar.NewReader(exBuf)
fileData := make(map[string]*bytes.Buffer)
// TEST 1: Get the bits for each file. If we got a file the store doesn't know about, report error
for {
th, err := tr.Next()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
buf := new(bytes.Buffer)
if _, err := io.Copy(buf, tr); err != nil {
return nil, err
}
fileData[th.Name] = buf
}
return fileData, nil
}
// Ensure engine can create an ascending iterator for cached values.
func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) {
t.Parallel()

View File

@ -1084,6 +1084,14 @@ func (s *Shard) Backup(w io.Writer, basePath string, since time.Time) error {
return engine.Backup(w, basePath, since)
}
// Export delegates to the underlying engine, streaming the shard's data for
// the given time range to w.
func (s *Shard) Export(w io.Writer, basePath string, start time.Time, end time.Time) error {
	eng, err := s.engine()
	if err != nil {
		return err
	}
	return eng.Export(w, basePath, start, end)
}
// Restore restores data to the underlying engine for the shard.
// The shard is reopened after restore.
func (s *Shard) Restore(r io.Reader, basePath string) error {

View File

@ -796,6 +796,20 @@ func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error {
return shard.Backup(w, path, since)
}
// ExportShard streams an export of the shard with the given id, limited to the
// given time range, to w. It returns an error if the shard does not exist on
// this server.
func (s *Store) ExportShard(id uint64, start time.Time, end time.Time, w io.Writer) error {
	sh := s.Shard(id)
	if sh == nil {
		return fmt.Errorf("shard %d doesn't exist on this server", id)
	}
	relPath, err := relativePath(s.path, sh.path)
	if err != nil {
		return err
	}
	return sh.Export(w, relPath, start, end)
}
// RestoreShard restores a backup from r to a given shard.
// This will only overwrite files included in the backup.
func (s *Store) RestoreShard(id uint64, r io.Reader) error {