feat(storage): implement backup and restore (#16504)

* feat(backup): `influx backup` creates data backup

* feat(backup): initial restore work

* feat(restore): initial restore impl

Adds a restore tool which does offline restore of data and metadata.

* fix(restore): pr cleanup

* fix(restore): fix data dir creation

* fix(restore): pr cleanup

* chore: amend CHANGELOG

* fix: restore to empty dir fails differently

* feat(backup): backup and restore credentials

Saves the credentials file to backups and restores it from backups.

Additionally adds some logging for errors when fetching backup files.

* fix(restore): add missed commit

* fix(restore): pr cleanup

* fix(restore): fix default credentials restore path

* fix(backup): actually copy the credentials file for the backup

* fix: dirs get 0777, files get 0666

* fix: small review feedback

Co-authored-by: tmgordeeva <tanya@influxdata.com>
pull/16614/head
Jacob Marble 2020-01-21 14:22:45 -08:00 committed by GitHub
parent 9d7f6a523d
commit b836ab9c17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 966 additions and 26 deletions

View File

@ -5,6 +5,7 @@
1. [16523](https://github.com/influxdata/influxdb/pull/16523): Change influx packages to be CRD compliant
1. [16547](https://github.com/influxdata/influxdb/pull/16547): Allow trailing newline in credentials file and CLI integration
1. [16545](https://github.com/influxdata/influxdb/pull/16545): Add support for prefixed cursor search to ForwardCursor types
1. [16504](https://github.com/influxdata/influxdb/pull/16504): Add backup and restore
### UI Improvements

View File

@ -11,15 +11,23 @@ import (
// IsAllowed checks to see if an action is authorized by retrieving the authorizer
// off of context and authorizing the action appropriately.
func IsAllowed(ctx context.Context, p influxdb.Permission) error {
return IsAllowedAll(ctx, []influxdb.Permission{p})
}
// IsAllowedAll checks to see if an action is authorized by ALL permissions.
// Also see IsAllowed.
func IsAllowedAll(ctx context.Context, permissions []influxdb.Permission) error {
a, err := influxdbcontext.GetAuthorizer(ctx)
if err != nil {
return err
}
if !a.Allowed(p) {
return &influxdb.Error{
Code: influxdb.EUnauthorized,
Msg: fmt.Sprintf("%s is unauthorized", p),
for _, p := range permissions {
if !a.Allowed(p) {
return &influxdb.Error{
Code: influxdb.EUnauthorized,
Msg: fmt.Sprintf("%s is unauthorized", p),
}
}
}

48
authorizer/backup.go Normal file
View File

@ -0,0 +1,48 @@
package authorizer
import (
"context"
"io"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/kit/tracing"
)
// Compile-time check that BackupService satisfies influxdb.BackupService.
var _ influxdb.BackupService = (*BackupService)(nil)

// BackupService wraps a influxdb.BackupService and authorizes actions
// against it appropriately.
type BackupService struct {
	s influxdb.BackupService // underlying service that calls are delegated to
}

// NewBackupService constructs an instance of an authorizing backup service.
func NewBackupService(s influxdb.BackupService) *BackupService {
	return &BackupService{
		s: s,
	}
}

// CreateBackup requires read permission on every resource type before
// delegating to the underlying service.
func (b BackupService) CreateBackup(ctx context.Context) (int, []string, error) {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil {
		return 0, nil, err
	}
	return b.s.CreateBackup(ctx)
}

// FetchBackupFile requires read permission on every resource type before
// delegating to the underlying service.
func (b BackupService) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil {
		return err
	}
	return b.s.FetchBackupFile(ctx, backupID, backupFile, w)
}

// InternalBackupPath delegates directly; no authorization check is performed
// here.
func (b BackupService) InternalBackupPath(backupID int) string {
	return b.s.InternalBackupPath(backupID)
}

View File

@ -335,6 +335,16 @@ func OperPermissions() []Permission {
return ps
}
// ReadAllPermissions returns a read permission for every resource type.
// It mirrors OperPermissions but grants read-only access, so it also covers
// read-only users.
func ReadAllPermissions() []Permission {
	ps := make([]Permission, 0, len(AllResourceTypes))
	for _, t := range AllResourceTypes {
		ps = append(ps, Permission{Action: ReadAction, Resource: Resource{Type: t}})
	}
	return ps
}
// OwnerPermissions are the default permissions for those who own a resource.
func OwnerPermissions(orgID ID) []Permission {
ps := []Permission{}

23
backup.go Normal file
View File

@ -0,0 +1,23 @@
package influxdb
import (
"context"
"io"
)
// BackupService represents the data backup functions of InfluxDB.
type BackupService interface {
	// CreateBackup creates a local copy (hard links) of the TSM data for all orgs and buckets.
	// The return values are used to download each backup file.
	CreateBackup(context.Context) (backupID int, backupFiles []string, err error)

	// FetchBackupFile downloads one backup file, data or metadata.
	FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error

	// InternalBackupPath is a utility to determine the on-disk location of a backup fileset.
	// NOTE(review): the HTTP client implementation panics here, so this
	// appears intended for server-internal use only — confirm.
	InternalBackupPath(backupID int) string
}

// KVBackupService represents the meta data backup functions of InfluxDB.
type KVBackupService interface {
	// Backup creates a live backup copy of the metadata database.
	Backup(ctx context.Context, w io.Writer) error
}

View File

@ -14,6 +14,8 @@ import (
"go.uber.org/zap"
)
const DefaultFilename = "influxd.bolt"
// Client is a client for the boltDB data store.
type Client struct {
Path string

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"os"
"path/filepath"
"time"
@ -124,6 +125,17 @@ func (s *KVStore) Update(ctx context.Context, fn func(tx kv.Tx) error) error {
})
}
// Backup copies all K:Vs to a writer, in BoltDB format.
// The copy is produced inside a bolt read transaction, so it is a consistent
// snapshot of the database at the time the transaction starts.
func (s *KVStore) Backup(ctx context.Context, w io.Writer) error {
	span, _ := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	return s.db.View(func(tx *bolt.Tx) error {
		_, err := tx.WriteTo(w)
		return err
	})
}
// Tx is a light wrapper around a boltdb transaction. It implements kv.Tx.
type Tx struct {
tx *bolt.Tx

111
cmd/influx/backup.go Normal file
View File

@ -0,0 +1,111 @@
package main
import (
"context"
"fmt"
"os"
"path/filepath"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/bolt"
"github.com/influxdata/influxdb/http"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/multierr"
)
// cmdBackup constructs the "influx backup" sub-command, which downloads a
// backup of the running server's data and metadata into a local directory.
func cmdBackup() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "backup",
		Short: "Backup the data in InfluxDB",
		Long: fmt.Sprintf(
			`Backs up data and meta data for the running InfluxDB instance.
Downloaded files are written to the directory indicated by --path.
The target directory, and any parent directories, are created automatically.
Data file have extension .tsm; meta data is written to %s in the same directory.`,
			bolt.DefaultFilename),
		RunE: backupF,
	}
	opts := flagOpts{
		{
			DestP:    &backupFlags.Path,
			Flag:     "path",
			Short:    'p',
			EnvVar:   "PATH",
			Desc:     "directory path to write backup files to",
			Required: true,
		},
	}
	opts.mustRegister(cmd)

	return cmd
}

// backupFlags holds the CLI flag values for the backup command.
var backupFlags struct {
	Path string // target directory for downloaded backup files
}

// init binds the PATH environment variable so it can supply the backup
// destination when the --path flag is not given.
func init() {
	err := viper.BindEnv("PATH")
	if err != nil {
		panic(err)
	}
	if h := viper.GetString("PATH"); h != "" {
		backupFlags.Path = h
	}
}

// newBackupService returns an HTTP-backed influxdb.BackupService configured
// with the host and token from the global CLI flags.
func newBackupService() (influxdb.BackupService, error) {
	return &http.BackupService{
		Addr:  flags.host,
		Token: flags.token,
	}, nil
}
// backupF implements "influx backup": it asks the server to create a backup,
// then downloads every backup file into backupFlags.Path.
//
// It returns an error if the command is run with --local, if no path is
// given, or if any file cannot be created, fetched, or closed.
func backupF(cmd *cobra.Command, args []string) error {
	ctx := context.Background()

	if flags.local {
		return fmt.Errorf("local flag not supported for backup command")
	}

	if backupFlags.Path == "" {
		return fmt.Errorf("must specify path")
	}

	if err := os.MkdirAll(backupFlags.Path, 0777); err != nil && !os.IsExist(err) {
		return err
	}

	backupService, err := newBackupService()
	if err != nil {
		return err
	}

	id, backupFilenames, err := backupService.CreateBackup(ctx)
	if err != nil {
		return err
	}

	fmt.Printf("Backup ID %d contains %d files\n", id, len(backupFilenames))

	for _, backupFilename := range backupFilenames {
		dest := filepath.Join(backupFlags.Path, backupFilename)
		w, err := os.OpenFile(dest, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
		if err != nil {
			return err
		}
		if err := backupService.FetchBackupFile(ctx, id, backupFilename, w); err != nil {
			// Close the partial file; report both errors if closing fails too.
			return multierr.Append(fmt.Errorf("error fetching file %s: %v", backupFilename, err), w.Close())
		}
		if err := w.Close(); err != nil {
			return err
		}
	}

	// Fix: terminate the status line with a newline so the shell prompt does
	// not run into it.
	fmt.Println("Backup complete")

	return nil
}

View File

@ -118,6 +118,7 @@ func influxCmd(opts ...genericCLIOptFn) *cobra.Command {
cmd.AddCommand(
cmdAuth(),
cmdBackup(),
cmdBucket(runEWrapper),
cmdDelete(),
cmdOrganization(runEWrapper),
@ -199,7 +200,7 @@ func defaultTokenPath() (string, string, error) {
if err != nil {
return "", "", err
}
return filepath.Join(dir, "credentials"), dir, nil
return filepath.Join(dir, http.DefaultTokenFile), dir, nil
}
func getTokenFromDefaultPath() string {

View File

@ -2,6 +2,7 @@ package launcher
import (
"context"
"io"
"io/ioutil"
"os"
"sync"
@ -29,6 +30,7 @@ type Engine interface {
storage.PointsWriter
storage.BucketDeleter
prom.PrometheusCollector
influxdb.BackupService
SeriesCardinality() int64
@ -165,3 +167,15 @@ func (t *TemporaryEngine) Flush(ctx context.Context) {
t.log.Fatal("unable to open engine", zap.Error(err))
}
}
// CreateBackup delegates to the underlying engine.
func (t *TemporaryEngine) CreateBackup(ctx context.Context) (int, []string, error) {
	return t.engine.CreateBackup(ctx)
}

// FetchBackupFile delegates to the underlying engine.
func (t *TemporaryEngine) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error {
	return t.engine.FetchBackupFile(ctx, backupID, backupFile, w)
}

// InternalBackupPath delegates to the underlying engine.
func (t *TemporaryEngine) InternalBackupPath(backupID int) string {
	return t.engine.InternalBackupPath(backupID)
}

View File

@ -147,7 +147,7 @@ func buildLauncherCommand(l *Launcher, cmd *cobra.Command) {
{
DestP: &l.boltPath,
Flag: "bolt-path",
Default: filepath.Join(dir, "influxd.bolt"),
Default: filepath.Join(dir, bolt.DefaultFilename),
Desc: "path to boltdb database",
},
{
@ -581,6 +581,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
var (
deleteService platform.DeleteService = m.engine
pointsWriter storage.PointsWriter = m.engine
backupService platform.BackupService = m.engine
)
// TODO(cwolff): Figure out a good default per-query memory limit:
@ -772,6 +773,8 @@ func (m *Launcher) run(ctx context.Context) (err error) {
NewQueryService: source.NewQueryService,
PointsWriter: pointsWriter,
DeleteService: deleteService,
BackupService: backupService,
KVBackupService: m.kvService,
AuthorizationService: authSvc,
// Wrap the BucketService in a storage backed one that will ensure deleted buckets are removed from the storage engine.
BucketService: storage.NewBucketService(bucketSvc, m.engine),

View File

@ -17,6 +17,7 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/lang"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/bolt"
influxdbcontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/http"
"github.com/influxdata/influxdb/kv"
@ -79,7 +80,7 @@ func RunTestLauncherOrFail(tb testing.TB, ctx context.Context, args ...string) *
// Run executes the program with additional arguments to set paths and ports.
func (tl *TestLauncher) Run(ctx context.Context, args ...string) error {
args = append(args, "--bolt-path", filepath.Join(tl.Path, "influxd.bolt"))
args = append(args, "--bolt-path", filepath.Join(tl.Path, bolt.DefaultFilename))
args = append(args, "--engine-path", filepath.Join(tl.Path, "engine"))
args = append(args, "--http-bind-address", "127.0.0.1:0")
args = append(args, "--log-level", "debug")

View File

@ -12,6 +12,7 @@ import (
"github.com/influxdata/influxdb/cmd/influxd/generate"
"github.com/influxdata/influxdb/cmd/influxd/inspect"
"github.com/influxdata/influxdb/cmd/influxd/launcher"
"github.com/influxdata/influxdb/cmd/influxd/restore"
_ "github.com/influxdata/influxdb/query/builtin"
_ "github.com/influxdata/influxdb/tsdb/tsi1"
_ "github.com/influxdata/influxdb/tsdb/tsm1"
@ -46,6 +47,7 @@ func init() {
rootCmd.AddCommand(launcher.NewCommand())
rootCmd.AddCommand(generate.Command)
rootCmd.AddCommand(inspect.NewCommand())
rootCmd.AddCommand(restore.Command)
// TODO: this should be removed in the future: https://github.com/influxdata/influxdb/issues/16220
if os.Getenv("QUERY_TRACING") == "1" {

View File

@ -0,0 +1,293 @@
package restore
import (
"fmt"
"io"
"os"
"path/filepath"
"strings"
"github.com/influxdata/influxdb/bolt"
"github.com/influxdata/influxdb/cmd/influxd/inspect"
"github.com/influxdata/influxdb/http"
"github.com/influxdata/influxdb/internal/fs"
"github.com/influxdata/influxdb/kit/cli"
"github.com/influxdata/influxdb/storage"
"github.com/spf13/cobra"
)
// Command is the "influxd restore" command: an offline restore of a backup
// fileset produced by "influx backup". It is wired up via restoreE.
var Command = &cobra.Command{
	Use:   "restore",
	Short: "Restore data and metadata from a backup",
	Long: `
This command restores data and metadata from a backup fileset.
Any existing metadata and data will be temporarily moved while restore runs
and deleted after restore completes.
Rebuilding the index and series file uses default options as in
"influxd inspect build-tsi" with the given target engine path.
For additional performance options, run restore with "-rebuild-index false"
and build-tsi afterwards.
NOTES:
* The influxd server should not be running when using the restore tool
as it replaces all data and metadata.
`,
	Args: cobra.ExactArgs(0),
	RunE: restoreE,
}

// flags holds the restore command's CLI flag values; defaults are bound in
// init below.
var flags struct {
	boltPath   string // target bolt database path to write
	enginePath string // target engine directory to populate
	credPath   string // target credentials file path to write
	backupPath string // source directory containing the backup fileset
	rebuildTSI bool   // whether to rebuild the TSI index after restore
}
// init registers the restore command's flags and their defaults, all rooted
// in the standard influx directory.
func init() {
	dir, err := fs.InfluxDir()
	if err != nil {
		panic(fmt.Errorf("failed to determine influx directory: %s", err))
	}

	Command.Flags().SortFlags = false
	pfs := Command.PersistentFlags()
	pfs.SortFlags = false

	opts := []cli.Opt{
		{
			DestP: &flags.boltPath,
			Flag:  "bolt-path",
			// Fix: the default previously used http.DefaultTokenFile
			// ("credentials"), pointing bolt-path at the credentials file
			// instead of the bolt database.
			Default: filepath.Join(dir, bolt.DefaultFilename),
			Desc:    "path to target boltdb database",
		},
		{
			DestP:   &flags.enginePath,
			Flag:    "engine-path",
			Default: filepath.Join(dir, "engine"),
			Desc:    "path to target persistent engine files",
		},
		{
			DestP:   &flags.credPath,
			Flag:    "credentials-path",
			Default: filepath.Join(dir, http.DefaultTokenFile),
			// Fix: the description was a copy-paste of engine-path's.
			Desc: "path to target credentials file",
		},
		{
			DestP:   &flags.backupPath,
			Flag:    "backup-path",
			Default: "",
			Desc:    "path to backup files",
		},
		{
			DestP:   &flags.rebuildTSI,
			Flag:    "rebuild-index",
			Default: true,
			Desc:    "if true, rebuild the TSI index and series file based on the given engine path (equivalent to influxd inspect build-tsi)",
		},
	}
	cli.BindOptions(Command, opts)
}
// restoreE orchestrates a full offline restore: it moves the existing bolt,
// credentials, and engine data aside, copies the backup fileset into place,
// optionally rebuilds the TSI index, and finally removes the moved-aside
// temporary copies.
func restoreE(cmd *cobra.Command, args []string) error {
	if flags.backupPath == "" {
		return fmt.Errorf("no backup path given")
	}

	// Move existing state aside first so a failed restore leaves the
	// originals recoverable in their .tmp locations.
	if err := moveBolt(); err != nil {
		return fmt.Errorf("failed to move existing bolt file: %v", err)
	}
	if err := moveCredentials(); err != nil {
		return fmt.Errorf("failed to move existing credentials file: %v", err)
	}
	if err := moveEngine(); err != nil {
		return fmt.Errorf("failed to move existing engine data: %v", err)
	}

	if err := restoreBolt(); err != nil {
		return fmt.Errorf("failed to restore bolt file: %v", err)
	}
	if err := restoreCred(); err != nil {
		return fmt.Errorf("failed to restore credentials file: %v", err)
	}
	if err := restoreEngine(); err != nil {
		return fmt.Errorf("failed to restore all TSM files: %v", err)
	}

	if flags.rebuildTSI {
		sFilePath := filepath.Join(flags.enginePath, storage.DefaultSeriesFileDirectoryName)
		indexPath := filepath.Join(flags.enginePath, storage.DefaultIndexDirectoryName)

		rebuild := inspect.NewBuildTSICommand()
		rebuild.SetArgs([]string{"--sfile-path", sFilePath, "--tsi-path", indexPath})
		// Fix: the rebuild result was previously discarded; a failed index
		// rebuild must fail the restore.
		if err := rebuild.Execute(); err != nil {
			return fmt.Errorf("failed to rebuild TSI index: %v", err)
		}
	}

	if err := removeTmpBolt(); err != nil {
		return fmt.Errorf("restore completed, but failed to cleanup temporary bolt file: %v", err)
	}
	// Fix: the temporary credentials file created by moveCredentials was
	// never cleaned up.
	if err := removeTmpCred(); err != nil {
		return fmt.Errorf("restore completed, but failed to cleanup temporary credentials file: %v", err)
	}
	if err := removeTmpEngine(); err != nil {
		return fmt.Errorf("restore completed, but failed to cleanup temporary engine data: %v", err)
	}

	return nil
}
// moveBolt renames the existing bolt database, if any, to a ".tmp" sibling so
// restore can replace it while keeping a fallback copy. A missing bolt file
// is not an error.
func moveBolt() error {
	_, err := os.Stat(flags.boltPath)
	if os.IsNotExist(err) {
		return nil
	}
	if err != nil {
		return err
	}

	// Clear any stale .tmp left over from a previous run before renaming.
	if err := removeTmpBolt(); err != nil {
		return err
	}
	return os.Rename(flags.boltPath, flags.boltPath+".tmp")
}
// moveCredentials renames the existing credentials file, if any, to a ".tmp"
// sibling so restore can replace it while keeping a fallback copy. A missing
// credentials file is not an error.
func moveCredentials() error {
	_, err := os.Stat(flags.credPath)
	if os.IsNotExist(err) {
		return nil
	}
	if err != nil {
		return err
	}

	// Clear any stale .tmp left over from a previous run before renaming.
	if err := removeTmpCred(); err != nil {
		return err
	}
	return os.Rename(flags.credPath, flags.credPath+".tmp")
}
// moveEngine renames the existing engine directory, if any, to the temporary
// engine path and recreates an empty engine directory in its place. A missing
// engine directory is not an error.
func moveEngine() error {
	if _, err := os.Stat(flags.enginePath); os.IsNotExist(err) {
		return nil
	} else if err != nil {
		return err
	}

	// Clear any stale temporary engine left over from a previous run.
	if err := removeTmpEngine(); err != nil {
		return err
	}

	if err := os.Rename(flags.enginePath, tmpEnginePath()); err != nil {
		return err
	}

	return os.MkdirAll(flags.enginePath, 0777)
}
// tmpEnginePath returns the scratch location the old engine directory is
// moved to during restore.
// NOTE(review): this evaluates to filepath.Dir(enginePath) + "tmp" — e.g.
// "/a/b/engine" maps to "/a/btmp", not "<enginePath>.tmp" as the bolt and
// credentials paths do; confirm this location is intended.
func tmpEnginePath() string {
	return filepath.Dir(flags.enginePath) + "tmp"
}
// removeTmpBolt deletes the temporary bolt file left by moveBolt, if present.
func removeTmpBolt() error {
	return removeIfExists(flags.boltPath + ".tmp")
}

// removeTmpEngine deletes the temporary engine directory left by moveEngine,
// if present.
func removeTmpEngine() error {
	return removeIfExists(tmpEnginePath())
}

// removeTmpCred deletes the temporary credentials file left by
// moveCredentials, if present.
func removeTmpCred() error {
	return removeIfExists(flags.credPath + ".tmp")
}
func removeIfExists(path string) error {
if _, err := os.Stat(path); os.IsNotExist(err) {
return nil
} else if err != nil {
return err
} else {
return os.RemoveAll(path)
}
}
// restoreBolt copies the bolt metadata database from the backup fileset to
// the target bolt path and reports the result on stdout.
func restoreBolt() error {
	backupBolt := filepath.Join(flags.backupPath, bolt.DefaultFilename)

	if err := restoreFile(backupBolt, flags.boltPath, "bolt"); err != nil {
		return err
	}
	fmt.Printf("Restored Bolt to %s from %s\n", flags.boltPath, backupBolt)
	return nil
}
// restoreEngine copies every .tsm file found under the backup path into the
// engine's data directory, creating it if necessary.
func restoreEngine() error {
	dataDir := filepath.Join(flags.enginePath, "/data")
	if err := os.MkdirAll(dataDir, 0777); err != nil {
		return err
	}

	count := 0
	err := filepath.Walk(flags.backupPath, func(path string, info os.FileInfo, err error) error {
		// Fix: errors reported by Walk were previously ignored.
		if err != nil {
			return err
		}
		// Fix: strings.Contains(path, ".tsm") also matched any path merely
		// containing ".tsm" anywhere; match the suffix of regular files only.
		if info.IsDir() || !strings.HasSuffix(path, ".tsm") {
			return nil
		}
		if err := copyTSMFile(path, dataDir); err != nil {
			return err
		}
		count++
		return nil
	})
	if err != nil {
		return err
	}
	fmt.Printf("Restored %d TSM files to %v\n", count, dataDir)
	return nil
}

// copyTSMFile copies one TSM file into dataDir under its base name.
func copyTSMFile(path, dataDir string) error {
	f, err := os.OpenFile(path, os.O_RDONLY, 0666)
	if err != nil {
		return fmt.Errorf("error opening TSM file: %v", err)
	}
	defer f.Close()

	tsmPath := filepath.Join(dataDir, filepath.Base(path))
	w, err := os.OpenFile(tsmPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
	if err != nil {
		return err
	}
	if _, err := io.Copy(w, f); err != nil {
		w.Close()
		return err
	}
	// Fix: report close errors so short writes are not silently dropped.
	return w.Close()
}
func restoreFile(backup string, target string, filetype string) error {
f, err := os.Open(backup)
if err != nil {
return fmt.Errorf("no %s file in backup: %v", filetype, err)
}
defer f.Close()
if err := os.MkdirAll(filepath.Dir(target), 0777); err != nil {
return err
}
w, err := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil && !os.IsNotExist(err) {
return err
}
defer w.Close()
_, err = io.Copy(w, f)
return err
}
// restoreCred copies the credentials file from the backup fileset to the
// target credentials path and reports the result on stdout.
func restoreCred() error {
	backupCred := filepath.Join(flags.backupPath, http.DefaultTokenFile)

	if err := restoreFile(backupCred, flags.credPath, "credentials"); err != nil {
		return err
	}
	fmt.Printf("Restored credentials to %s from %s\n", flags.credPath, backupCred)
	return nil
}

View File

@ -51,6 +51,8 @@ type APIBackend struct {
PointsWriter storage.PointsWriter
DeleteService influxdb.DeleteService
BackupService influxdb.BackupService
KVBackupService influxdb.KVBackupService
AuthorizationService influxdb.AuthorizationService
BucketService influxdb.BucketService
SessionService influxdb.SessionService
@ -213,6 +215,10 @@ func NewAPIHandler(b *APIBackend, opts ...APIHandlerOptFn) *APIHandler {
variableBackend.VariableService = authorizer.NewVariableService(b.VariableService)
h.Mount(prefixVariables, NewVariableHandler(b.Logger, variableBackend))
backupBackend := NewBackupBackend(b)
backupBackend.BackupService = authorizer.NewBackupService(backupBackend.BackupService)
h.Mount(prefixBackup, NewBackupHandler(backupBackend))
writeBackend := NewWriteBackend(b.Logger.With(zap.String("handler", "write")), b)
h.Mount(prefixWrite, NewWriteHandler(b.Logger, writeBackend,
WithMaxBatchSizeBytes(b.MaxBatchSizeBytes),
@ -231,6 +237,7 @@ var apiLinks = map[string]interface{}{
// when adding new links, please take care to keep this list alphabetical
// as this makes it easier to verify values against the swagger document.
"authorizations": "/api/v2/authorizations",
"backup": "/api/v2/backup",
"buckets": "/api/v2/buckets",
"dashboards": "/api/v2/dashboards",
"external": map[string]string{

260
http/backup_service.go Normal file
View File

@ -0,0 +1,260 @@
package http
import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path"
"path/filepath"
"strconv"
"time"
"github.com/influxdata/httprouter"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/bolt"
"github.com/influxdata/influxdb/internal/fs"
"github.com/influxdata/influxdb/kit/tracing"
"go.uber.org/multierr"
"go.uber.org/zap"
)
// DefaultTokenFile is the name of the file the CLI token (credentials) is
// stored in, both on disk and inside a backup fileset.
const DefaultTokenFile = "credentials"

// BackupBackend is all services and associated parameters required to construct the BackupHandler.
type BackupBackend struct {
	Logger *zap.Logger
	influxdb.HTTPErrorHandler

	BackupService   influxdb.BackupService
	KVBackupService influxdb.KVBackupService
}

// NewBackupBackend returns a new instance of BackupBackend.
func NewBackupBackend(b *APIBackend) *BackupBackend {
	return &BackupBackend{
		Logger: b.Logger.With(zap.String("handler", "backup")),

		HTTPErrorHandler: b.HTTPErrorHandler,
		BackupService:    b.BackupService,
		KVBackupService:  b.KVBackupService,
	}
}

// BackupHandler is the HTTP handler serving the backup endpoints.
type BackupHandler struct {
	*httprouter.Router
	influxdb.HTTPErrorHandler
	Logger *zap.Logger

	BackupService   influxdb.BackupService
	KVBackupService influxdb.KVBackupService
}

const (
	prefixBackup        = "/api/v2/backup"
	backupIDParamName   = "backup_id"
	backupFileParamName = "backup_file"
	// backupFilePath is the route for fetching a single backup file:
	// /api/v2/backup/:backup_id/file/:backup_file
	backupFilePath = prefixBackup + "/:" + backupIDParamName + "/file/:" + backupFileParamName

	// httpClientTimeout bounds client-side backup requests; backup files can
	// be large, so allow up to an hour.
	httpClientTimeout = time.Hour
)
// composeBackupFilePath builds the request path used to fetch one file of a
// backup, e.g. /api/v2/backup/12/file/000001-01.tsm.
func composeBackupFilePath(backupID int, backupFile string) string {
	// backupFile is already a string, so the former fmt.Sprint(backupFile)
	// was redundant; strconv.Itoa converts the ID without fmt's boxing.
	return path.Join(prefixBackup, strconv.Itoa(backupID), "file", backupFile)
}
// NewBackupHandler creates a new handler at /api/v2/backup to receive backup requests.
// POST creates a backup; GET on the file route downloads one backup file.
func NewBackupHandler(b *BackupBackend) *BackupHandler {
	h := &BackupHandler{
		HTTPErrorHandler: b.HTTPErrorHandler,
		Router:           NewRouter(b.HTTPErrorHandler),
		Logger:           b.Logger,
		BackupService:    b.BackupService,
		KVBackupService:  b.KVBackupService,
	}

	h.HandlerFunc(http.MethodPost, prefixBackup, h.handleCreate)
	h.HandlerFunc(http.MethodGet, backupFilePath, h.handleFetchFile)

	return h
}

// backup is the JSON response body for a backup creation request.
type backup struct {
	ID    int      `json:"id,omitempty"`    // process-local backup identifier
	Files []string `json:"files,omitempty"` // file names fetchable via the file route
}
// handleCreate handles POST /api/v2/backup: it snapshots the TSM data, copies
// the bolt metadata store and the credentials file into the backup directory,
// then responds with the backup ID and the list of downloadable files.
// On failure before the response is written, the backup directory is removed.
func (h *BackupHandler) handleCreate(w http.ResponseWriter, r *http.Request) {
	span, r := tracing.ExtractFromHTTPRequest(r, "BackupHandler.handleCreate")
	defer span.Finish()

	ctx := r.Context()

	id, files, err := h.BackupService.CreateBackup(ctx)
	if err != nil {
		h.HandleHTTPError(ctx, err, w)
		return
	}

	internalBackupPath := h.BackupService.InternalBackupPath(id)

	boltPath := filepath.Join(internalBackupPath, bolt.DefaultFilename)
	boltFile, err := os.OpenFile(boltPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0660)
	if err != nil {
		err = multierr.Append(err, os.RemoveAll(internalBackupPath))
		h.HandleHTTPError(ctx, err, w)
		return
	}

	if err = h.KVBackupService.Backup(ctx, boltFile); err != nil {
		// Fix: close the bolt file before cleanup; it was previously leaked.
		err = multierr.Append(err, boltFile.Close())
		err = multierr.Append(err, os.RemoveAll(internalBackupPath))
		h.HandleHTTPError(ctx, err, w)
		return
	}
	// Fix: close the bolt copy so all bytes reach disk before clients can
	// fetch it; the original never closed the file.
	if err = boltFile.Close(); err != nil {
		err = multierr.Append(err, os.RemoveAll(internalBackupPath))
		h.HandleHTTPError(ctx, err, w)
		return
	}
	files = append(files, bolt.DefaultFilename)

	credBackupPath := filepath.Join(internalBackupPath, DefaultTokenFile)

	credPath, err := defaultTokenPath()
	if err != nil {
		h.HandleHTTPError(ctx, err, w)
		return
	}
	token, err := ioutil.ReadFile(credPath)
	if err != nil {
		h.HandleHTTPError(ctx, err, w)
		return
	}
	// token is already []byte; the former []byte(token) conversion was a no-op.
	if err := ioutil.WriteFile(credBackupPath, token, 0600); err != nil {
		h.HandleHTTPError(ctx, err, w)
		return
	}
	files = append(files, DefaultTokenFile)

	b := backup{
		ID:    id,
		Files: files,
	}
	if err = json.NewEncoder(w).Encode(&b); err != nil {
		err = multierr.Append(err, os.RemoveAll(internalBackupPath))
		h.HandleHTTPError(ctx, err, w)
		return
	}
}
// handleFetchFile handles GET /api/v2/backup/:backup_id/file/:backup_file,
// streaming the requested backup file to the response writer.
func (h *BackupHandler) handleFetchFile(w http.ResponseWriter, r *http.Request) {
	span, r := tracing.ExtractFromHTTPRequest(r, "BackupHandler.handleFetchFile")
	defer span.Finish()

	ctx := r.Context()
	params := httprouter.ParamsFromContext(ctx)
	// The backup ID route parameter must be an integer.
	backupID, err := strconv.Atoi(params.ByName("backup_id"))
	if err != nil {
		h.HandleHTTPError(ctx, err, w)
		return
	}
	backupFile := params.ByName("backup_file")

	if err = h.BackupService.FetchBackupFile(ctx, backupID, backupFile, w); err != nil {
		h.HandleHTTPError(ctx, err, w)
		return
	}
}
// BackupService is the client implementation of influxdb.BackupService.
type BackupService struct {
	Addr               string // base address of the InfluxDB server
	Token              string // authorization token sent with each request
	InsecureSkipVerify bool   // skip TLS certificate verification
}

// CreateBackup asks the server to create a backup and returns the backup ID
// together with the list of file names that can then be fetched.
func (s *BackupService) CreateBackup(ctx context.Context) (int, []string, error) {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	u, err := NewURL(s.Addr, prefixBackup)
	if err != nil {
		return 0, nil, err
	}

	req, err := http.NewRequest(http.MethodPost, u.String(), nil)
	if err != nil {
		return 0, nil, err
	}
	SetToken(s.Token, req)
	req = req.WithContext(ctx)

	hc := NewClient(u.Scheme, s.InsecureSkipVerify)
	// Creating a backup can take a long time; use the extended timeout.
	hc.Timeout = httpClientTimeout
	resp, err := hc.Do(req)
	if err != nil {
		return 0, nil, err
	}
	defer resp.Body.Close()
	if err := CheckError(resp); err != nil {
		return 0, nil, err
	}

	var b backup
	if err = json.NewDecoder(resp.Body).Decode(&b); err != nil {
		return 0, nil, err
	}
	return b.ID, b.Files, nil
}
// FetchBackupFile downloads one backup file from the server and streams its
// contents into w.
func (s *BackupService) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	u, err := NewURL(s.Addr, composeBackupFilePath(backupID, backupFile))
	if err != nil {
		return err
	}

	req, err := http.NewRequest(http.MethodGet, u.String(), nil)
	if err != nil {
		return err
	}
	SetToken(s.Token, req)
	req = req.WithContext(ctx)

	// Backup files may be large; use the extended client timeout.
	client := NewClient(u.Scheme, s.InsecureSkipVerify)
	client.Timeout = httpClientTimeout

	res, err := client.Do(req)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if err := CheckError(res); err != nil {
		return err
	}

	_, err = io.Copy(w, res.Body)
	return err
}
// defaultTokenPath returns the location of the credentials file inside the
// influx directory.
func defaultTokenPath() (string, error) {
	dir, err := fs.InfluxDir()
	if err != nil {
		return "", err
	}
	return filepath.Join(dir, DefaultTokenFile), nil
}

// InternalBackupPath is a server-side utility and is deliberately not
// implemented on the HTTP client; calling it is a programmer error.
func (s *BackupService) InternalBackupPath(backupID int) string {
	panic("internal method not implemented here")
}

View File

@ -40,6 +40,7 @@ type Service struct {
InsecureSkipVerify bool
*AuthorizationService
*BackupService
*BucketService
*DashboardService
*OrganizationService
@ -60,11 +61,15 @@ func NewService(addr, token string) (*Service, error) {
Addr: addr,
Token: token,
AuthorizationService: &AuthorizationService{Client: httpClient},
BucketService: &BucketService{Client: httpClient},
DashboardService: &DashboardService{Client: httpClient},
OrganizationService: &OrganizationService{Client: httpClient},
UserService: &UserService{Client: httpClient},
VariableService: &VariableService{Client: httpClient},
BackupService: &BackupService{
Addr: addr,
Token: token,
},
BucketService: &BucketService{Client: httpClient},
DashboardService: &DashboardService{Client: httpClient},
OrganizationService: &OrganizationService{Client: httpClient},
UserService: &UserService{Client: httpClient},
VariableService: &VariableService{Client: httpClient},
WriteService: &WriteService{
Addr: addr,
Token: token,

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"sync"
"github.com/google/btree"
@ -56,6 +57,10 @@ func (s *KVStore) Update(ctx context.Context, fn func(kv.Tx) error) error {
})
}
func (s *KVStore) Backup(ctx context.Context, w io.Writer) error {
panic("not implemented")
}
// Flush removes all data from the buckets. Used for testing.
func (s *KVStore) Flush(ctx context.Context) {
s.mu.Lock()

10
kv/backup.go Normal file
View File

@ -0,0 +1,10 @@
package kv
import (
"context"
"io"
)
// Backup copies all metadata K:Vs from the service's underlying store to w.
func (s *Service) Backup(ctx context.Context, w io.Writer) error {
	return s.kv.Backup(ctx, w)
}

View File

@ -2,6 +2,7 @@ package kv_test
import (
"context"
"io"
"testing"
"time"
@ -21,6 +22,10 @@ func (s mockStore) Update(context.Context, func(kv.Tx) error) error {
return nil
}
// Backup implements kv.Store for the mock; it writes nothing and reports
// success.
func (s mockStore) Backup(ctx context.Context, w io.Writer) error {
	return nil
}
func TestNewService(t *testing.T) {
s := kv.NewService(zaptest.NewLogger(t), mockStore{})

View File

@ -3,6 +3,7 @@ package kv
import (
"context"
"errors"
"io"
)
var (
@ -29,6 +30,8 @@ type Store interface {
View(context.Context, func(Tx) error) error
// Update opens up a transaction that will mutate data.
Update(context.Context, func(Tx) error) error
// Backup copies all K:Vs to a writer, file format determined by implementation.
Backup(ctx context.Context, w io.Writer) error
}
// Tx is a transaction in the store.

View File

@ -2,6 +2,7 @@ package mock
import (
"context"
"io"
"github.com/influxdata/influxdb/kv"
)
@ -12,6 +13,7 @@ var _ (kv.Store) = (*Store)(nil)
type Store struct {
ViewFn func(func(kv.Tx) error) error
UpdateFn func(func(kv.Tx) error) error
BackupFn func(ctx context.Context, w io.Writer) error
}
// View opens up a transaction that will not write to any data. Implementing interfaces
@ -25,6 +27,10 @@ func (s *Store) Update(ctx context.Context, fn func(kv.Tx) error) error {
return s.UpdateFn(fn)
}
// Backup delegates to the configurable BackupFn.
// NOTE(review): like the other mock methods, this panics if BackupFn is nil.
func (s *Store) Backup(ctx context.Context, w io.Writer) error {
	return s.BackupFn(ctx, w)
}
var _ (kv.Tx) = (*Tx)(nil)
// Tx is mock of a kv.Tx.

View File

@ -3,9 +3,12 @@ package storage
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"math"
"os"
"path/filepath"
"sync"
"time"
@ -21,7 +24,9 @@ import (
"github.com/influxdata/influxdb/tsdb/tsm1"
"github.com/influxdata/influxdb/tsdb/value"
"github.com/influxdata/influxql"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/multierr"
"go.uber.org/zap"
)
@ -662,6 +667,104 @@ func (e *Engine) deleteBucketRangeLocked(ctx context.Context, orgID, bucketID pl
return e.engine.DeletePrefixRange(ctx, name, min, max, pred)
}
// CreateBackup creates a "snapshot" of all TSM data in the Engine.
//   1) Snapshot the cache to ensure the backup includes all data written before now.
//   2) Create hard links to all TSM files, in a new directory within the engine root directory.
//   3) Return a unique backup ID (invalid after the process terminates) and list of files.
func (e *Engine) CreateBackup(ctx context.Context) (int, []string, error) {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	if e.closing == nil {
		return 0, nil, ErrEngineClosed
	}

	// Flush the cache so the snapshot captures everything written so far.
	if err := e.engine.WriteSnapshot(ctx, tsm1.CacheStatusBackup); err != nil {
		return 0, nil, err
	}

	id, snapshotPath, err := e.engine.FileStore.CreateSnapshot(ctx)
	if err != nil {
		return 0, nil, err
	}

	// Return bare file names only; callers resolve them against the backup
	// path when fetching.
	fileInfos, err := ioutil.ReadDir(snapshotPath)
	if err != nil {
		return 0, nil, err
	}
	filenames := make([]string, len(fileInfos))
	for i, fi := range fileInfos {
		filenames[i] = fi.Name()
	}
	return id, filenames, nil
}
// FetchBackupFile writes a given backup file to the provided writer.
// After a successful write, the internal copy is removed.
func (e *Engine) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error {
	span, _ := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	e.mu.RLock()
	defer e.mu.RUnlock()

	if e.closing == nil {
		return ErrEngineClosed
	}

	if err := e.fetchBackup(ctx, backupID, backupFile, w); err != nil {
		e.logger.Error("Failed to fetch file for backup", zap.Error(err), zap.Int("backup_id", backupID), zap.String("backup_file", backupFile))
		return err
	}

	// Best-effort cleanup of the served file: a removal failure is logged but
	// does not fail the fetch, since the data was already delivered.
	backupPath := e.engine.FileStore.InternalBackupPath(backupID)
	backupFileFullPath := filepath.Join(backupPath, backupFile)
	if err := os.Remove(backupFileFullPath); err != nil {
		e.logger.Info("Failed to remove backup file after fetch", zap.Error(err), zap.Int("backup_id", backupID), zap.String("backup_file", backupFile))
	}

	return nil
}
// fetchBackup streams the contents of backupFile from backup backupID to w.
// It validates that the backup directory exists and is a directory before
// opening the file, and reports Close errors from the source file.
func (e *Engine) fetchBackup(ctx context.Context, backupID int, backupFile string, w io.Writer) error {
	backupPath := e.engine.FileStore.InternalBackupPath(backupID)
	if fi, err := os.Stat(backupPath); err != nil {
		if os.IsNotExist(err) {
			return errors.Errorf("backup %d not found", backupID)
		}
		return errors.WithMessagef(err, "failed to locate backup %d", backupID)
	} else if !fi.IsDir() {
		return errors.Errorf("error in filesystem path of backup %d", backupID)
	}

	backupFileFullPath := filepath.Join(backupPath, backupFile)
	file, err := os.Open(backupFileFullPath)
	if err != nil {
		if os.IsNotExist(err) {
			return errors.Errorf("backup file %d/%s not found", backupID, backupFile)
		}
		return errors.WithMessagef(err, "failed to open backup file %d/%s", backupID, backupFile)
	}
	// No deferred Close here: every path below closes the file exactly once.
	// (The original code combined a defer with explicit Close calls, closing
	// the file twice on both the error and success paths.)
	if _, err = io.Copy(w, file); err != nil {
		// Combine the copy error with any close error; the copy error is primary.
		err = multierr.Append(err, file.Close())
		return errors.WithMessagef(err, "failed to copy backup file %d/%s to writer", backupID, backupFile)
	}
	if err = file.Close(); err != nil {
		return errors.WithMessagef(err, "failed to close backup file %d/%s", backupID, backupFile)
	}
	return nil
}
// InternalBackupPath provides the internal, full path directory name of the backup.
// This should not be exposed via API.
// It delegates to the underlying FileStore, which derives the path from the
// backup ID; the path is only meaningful within this process's lifetime.
func (e *Engine) InternalBackupPath(backupID int) string {
	return e.engine.FileStore.InternalBackupPath(backupID)
}
// SeriesCardinality returns the number of series in the engine.
func (e *Engine) SeriesCardinality() int64 {
e.mu.RLock()

View File

@ -799,8 +799,9 @@ func (e *Engine) WriteSnapshot(ctx context.Context, status CacheStatus) error {
if err != nil && err != errCompactionsDisabled {
e.logger.Info("Error writing snapshot", zap.Error(err))
}
e.compactionTracker.SnapshotAttempted(err == nil || err == errCompactionsDisabled ||
err == ErrSnapshotInProgress, status, time.Since(start))
e.compactionTracker.SnapshotAttempted(
err == nil || err == errCompactionsDisabled || err == ErrSnapshotInProgress,
status, time.Since(start))
if err != nil {
return err
@ -932,6 +933,7 @@ const (
CacheStatusColdNoWrites // The cache has not been written to for long enough that it should be snapshotted.
CacheStatusRetention // The cache was snapshotted before running retention.
CacheStatusFullCompaction // The cache was snapshotted as part of a full compaction.
CacheStatusBackup // The cache was snapshotted before running backup.
)
// ShouldCompactCache returns a status indicating if the Cache should be

View File

@ -1210,7 +1210,7 @@ func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location {
// CreateSnapshot creates hardlinks for all tsm and tombstone files
// in the path provided.
func (f *FileStore) CreateSnapshot(ctx context.Context) (string, error) {
func (f *FileStore) CreateSnapshot(ctx context.Context) (backupID int, backupDirFullPath string, err error) {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
@ -1230,30 +1230,35 @@ func (f *FileStore) CreateSnapshot(ctx context.Context) (string, error) {
// increment and keep track of the current temp dir for when we drop the lock.
// this ensures we are the only writer to the directory.
f.currentTempDirID += 1
tmpPath := fmt.Sprintf("%d.%s", f.currentTempDirID, TmpTSMFileExtension)
tmpPath = filepath.Join(f.dir, tmpPath)
backupID = f.currentTempDirID
f.mu.Unlock()
backupDirFullPath = f.InternalBackupPath(backupID)
// create the tmp directory and add the hard links. there is no longer any shared
// mutable state.
err := os.Mkdir(tmpPath, 0777)
err = os.Mkdir(backupDirFullPath, 0777)
if err != nil {
return "", err
return 0, "", err
}
for _, tsmf := range files {
newpath := filepath.Join(tmpPath, filepath.Base(tsmf.Path()))
newpath := filepath.Join(backupDirFullPath, filepath.Base(tsmf.Path()))
if err := os.Link(tsmf.Path(), newpath); err != nil {
return "", fmt.Errorf("error creating tsm hard link: %q", err)
return 0, "", fmt.Errorf("error creating tsm hard link: %q", err)
}
for _, tf := range tsmf.TombstoneFiles() {
newpath := filepath.Join(tmpPath, filepath.Base(tf.Path))
newpath := filepath.Join(backupDirFullPath, filepath.Base(tf.Path))
if err := os.Link(tf.Path, newpath); err != nil {
return "", fmt.Errorf("error creating tombstone hard link: %q", err)
return 0, "", fmt.Errorf("error creating tombstone hard link: %q", err)
}
}
}
return tmpPath, nil
return backupID, backupDirFullPath, nil
}
// InternalBackupPath returns the full path of the snapshot directory for the
// given backup ID, named "<id>.<TmpTSMFileExtension>" under the store's dir.
func (f *FileStore) InternalBackupPath(backupID int) string {
	dirName := fmt.Sprintf("%d.%s", backupID, TmpTSMFileExtension)
	return filepath.Join(f.dir, dirName)
}
// MeasurementStats returns the sum of all measurement stats within the store.

View File

@ -2724,7 +2724,7 @@ func TestFileStore_CreateSnapshot(t *testing.T) {
t.Fatalf("unexpected error delete range: %v", err)
}
s, e := fs.CreateSnapshot(context.Background())
_, s, e := fs.CreateSnapshot(context.Background())
if e != nil {
t.Fatal(e)
}