diff --git a/CHANGELOG.md b/CHANGELOG.md index dc91b48431..f94cc21728 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ 1. [16523](https://github.com/influxdata/influxdb/pull/16523): Change influx packages to be CRD compliant 1. [16547](https://github.com/influxdata/influxdb/pull/16547): Allow trailing newline in credentials file and CLI integration 1. [16545](https://github.com/influxdata/influxdb/pull/16545): Add support for prefixed cursor search to ForwardCursor types +1. [16504](https://github.com/influxdata/influxdb/pull/16504): Add backup and restore ### UI Improvements diff --git a/authorizer/authorize.go b/authorizer/authorize.go index 7fb718c70b..778b2b2ed6 100644 --- a/authorizer/authorize.go +++ b/authorizer/authorize.go @@ -11,15 +11,23 @@ import ( // IsAllowed checks to see if an action is authorized by retrieving the authorizer // off of context and authorizing the action appropriately. func IsAllowed(ctx context.Context, p influxdb.Permission) error { + return IsAllowedAll(ctx, []influxdb.Permission{p}) +} + +// IsAllowedAll checks to see if an action is authorized by ALL permissions. +// Also see IsAllowed. +func IsAllowedAll(ctx context.Context, permissions []influxdb.Permission) error { a, err := influxdbcontext.GetAuthorizer(ctx) if err != nil { return err } - if !a.Allowed(p) { - return &influxdb.Error{ - Code: influxdb.EUnauthorized, - Msg: fmt.Sprintf("%s is unauthorized", p), + for _, p := range permissions { + if !a.Allowed(p) { + return &influxdb.Error{ + Code: influxdb.EUnauthorized, + Msg: fmt.Sprintf("%s is unauthorized", p), + } } } diff --git a/authorizer/backup.go b/authorizer/backup.go new file mode 100644 index 0000000000..4dc502e102 --- /dev/null +++ b/authorizer/backup.go @@ -0,0 +1,48 @@ +package authorizer + +import ( + "context" + "io" + + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/kit/tracing" +) + +var _ influxdb.BackupService = (*BackupService)(nil) + +// BackupService wraps a influxdb.BackupService and authorizes actions +// against it appropriately. +type BackupService struct { + s influxdb.BackupService +} + +// NewBackupService constructs an instance of an authorizing backup service. +func NewBackupService(s influxdb.BackupService) *BackupService { + return &BackupService{ + s: s, + } +} + +func (b BackupService) CreateBackup(ctx context.Context) (int, []string, error) { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil { + return 0, nil, err + } + return b.s.CreateBackup(ctx) +} + +func (b BackupService) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil { + return err + } + return b.s.FetchBackupFile(ctx, backupID, backupFile, w) +} + +func (b BackupService) InternalBackupPath(backupID int) string { + return b.s.InternalBackupPath(backupID) +} diff --git a/authz.go b/authz.go index b9a049ed82..97d2e0fc46 100644 --- a/authz.go +++ b/authz.go @@ -335,6 +335,16 @@ func OperPermissions() []Permission { return ps } +// ReadAllPermissions represents permission to read all data and metadata. +// Like OperPermissions, but allows read-only users. +func ReadAllPermissions() []Permission { + ps := make([]Permission, len(AllResourceTypes)) + for i, t := range AllResourceTypes { + ps[i] = Permission{Action: ReadAction, Resource: Resource{Type: t}} + } + return ps +} + // OwnerPermissions are the default permissions for those who own a resource. func OwnerPermissions(orgID ID) []Permission { ps := []Permission{} diff --git a/backup.go b/backup.go new file mode 100644 index 0000000000..fe2ed03a71 --- /dev/null +++ b/backup.go @@ -0,0 +1,23 @@ +package influxdb + +import ( + "context" + "io" +) + +// BackupService represents the data backup functions of InfluxDB. +type BackupService interface { + // CreateBackup creates a local copy (hard links) of the TSM data for all orgs and buckets. + // The return values are used to download each backup file. + CreateBackup(context.Context) (backupID int, backupFiles []string, err error) + // FetchBackupFile downloads one backup file, data or metadata. + FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error + // InternalBackupPath is a utility to determine the on-disk location of a backup fileset. + InternalBackupPath(backupID int) string +} + +// KVBackupService represents the meta data backup functions of InfluxDB. +type KVBackupService interface { + // Backup creates a live backup copy of the metadata database. + Backup(ctx context.Context, w io.Writer) error +} diff --git a/bolt/bbolt.go b/bolt/bbolt.go index e24eee8228..6b97ed0d6c 100644 --- a/bolt/bbolt.go +++ b/bolt/bbolt.go @@ -14,6 +14,8 @@ import ( "go.uber.org/zap" ) +const DefaultFilename = "influxd.bolt" + // Client is a client for the boltDB data store. type Client struct { Path string diff --git a/bolt/kv.go b/bolt/kv.go index 2961dd931b..7334bf5b49 100644 --- a/bolt/kv.go +++ b/bolt/kv.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "os" "path/filepath" "time" @@ -124,6 +125,17 @@ func (s *KVStore) Update(ctx context.Context, fn func(tx kv.Tx) error) error { }) } +// Backup copies all K:Vs to a writer, in BoltDB format. +func (s *KVStore) Backup(ctx context.Context, w io.Writer) error { + span, _ := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + return s.db.View(func(tx *bolt.Tx) error { + _, err := tx.WriteTo(w) + return err + }) +} + // Tx is a light wrapper around a boltdb transaction. It implements kv.Tx. type Tx struct { tx *bolt.Tx diff --git a/cmd/influx/backup.go b/cmd/influx/backup.go new file mode 100644 index 0000000000..ce52eaf251 --- /dev/null +++ b/cmd/influx/backup.go @@ -0,0 +1,111 @@ +package main + +import ( + "context" + "fmt" + "os" + "path/filepath" + + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/bolt" + "github.com/influxdata/influxdb/http" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "go.uber.org/multierr" +) + +func cmdBackup() *cobra.Command { + cmd := &cobra.Command{ + Use: "backup", + Short: "Backup the data in InfluxDB", + Long: fmt.Sprintf( + `Backs up data and meta data for the running InfluxDB instance. +Downloaded files are written to the directory indicated by --path. +The target directory, and any parent directories, are created automatically. +Data file have extension .tsm; meta data is written to %s in the same directory.`, + bolt.DefaultFilename), + RunE: backupF, + } + opts := flagOpts{ + { + DestP: &backupFlags.Path, + Flag: "path", + Short: 'p', + EnvVar: "PATH", + Desc: "directory path to write backup files to", + Required: true, + }, + } + opts.mustRegister(cmd) + + return cmd +} + +var backupFlags struct { + Path string +} + +func init() { + err := viper.BindEnv("PATH") + if err != nil { + panic(err) + } + if h := viper.GetString("PATH"); h != "" { + backupFlags.Path = h + } +} + +func newBackupService() (influxdb.BackupService, error) { + return &http.BackupService{ + Addr: flags.host, + Token: flags.token, + }, nil +} + +func backupF(cmd *cobra.Command, args []string) error { + ctx := context.Background() + + if flags.local { + return fmt.Errorf("local flag not supported for backup command") + } + + if backupFlags.Path == "" { + return fmt.Errorf("must specify path") + } + + err := os.MkdirAll(backupFlags.Path, 0777) + if err != nil && !os.IsExist(err) { + return err + } + + backupService, err := newBackupService() + if err != nil { + return err + } + + id, backupFilenames, err := backupService.CreateBackup(ctx) + if err != nil { + return err + } + + fmt.Printf("Backup ID %d contains %d files\n", id, len(backupFilenames)) + + for _, backupFilename := range backupFilenames { + dest := filepath.Join(backupFlags.Path, backupFilename) + w, err := os.OpenFile(dest, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) + if err != nil { + return err + } + err = backupService.FetchBackupFile(ctx, id, backupFilename, w) + if err != nil { + return multierr.Append(fmt.Errorf("error fetching file %s: %v", backupFilename, err), w.Close()) + } + if err = w.Close(); err != nil { + return err + } + } + + fmt.Printf("Backup complete") + + return nil +} diff --git a/cmd/influx/main.go b/cmd/influx/main.go index 3a5c566f63..3990cc5e0f 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -118,6 +118,7 @@ func influxCmd(opts ...genericCLIOptFn) *cobra.Command { cmd.AddCommand( cmdAuth(), + cmdBackup(), cmdBucket(runEWrapper), cmdDelete(), cmdOrganization(runEWrapper), @@ -199,7 +200,7 @@ func defaultTokenPath() (string, string, error) { if err != nil { return "", "", err } - return filepath.Join(dir, "credentials"), dir, nil + return filepath.Join(dir, http.DefaultTokenFile), dir, nil } func getTokenFromDefaultPath() string { diff --git a/cmd/influxd/launcher/engine.go b/cmd/influxd/launcher/engine.go index c0bb5679bb..29a6c9c2f8 100644 --- a/cmd/influxd/launcher/engine.go +++ b/cmd/influxd/launcher/engine.go @@ -2,6 +2,7 @@ package launcher import ( "context" + "io" "io/ioutil" "os" "sync" @@ -29,6 +30,7 @@ type Engine interface { storage.PointsWriter storage.BucketDeleter prom.PrometheusCollector + influxdb.BackupService SeriesCardinality() int64 @@ -165,3 +167,15 @@ func (t *TemporaryEngine) Flush(ctx context.Context) { t.log.Fatal("unable to open engine", zap.Error(err)) } } + +func (t *TemporaryEngine) CreateBackup(ctx context.Context) (int, []string, error) { + return t.engine.CreateBackup(ctx) +} + +func (t *TemporaryEngine) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error { + return t.engine.FetchBackupFile(ctx, backupID, backupFile, w) +} + +func (t *TemporaryEngine) InternalBackupPath(backupID int) string { + return t.engine.InternalBackupPath(backupID) +} diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 184779b9d9..9586d0902a 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -147,7 +147,7 @@ func buildLauncherCommand(l *Launcher, cmd *cobra.Command) { { DestP: &l.boltPath, Flag: "bolt-path", - Default: filepath.Join(dir, "influxd.bolt"), + Default: filepath.Join(dir, bolt.DefaultFilename), Desc: "path to boltdb database", }, { @@ -581,6 +581,7 @@ func (m *Launcher) run(ctx context.Context) (err error) { var ( deleteService platform.DeleteService = m.engine pointsWriter storage.PointsWriter = m.engine + backupService platform.BackupService = m.engine ) // TODO(cwolff): Figure out a good default per-query memory limit: @@ -772,6 +773,8 @@ func (m *Launcher) run(ctx context.Context) (err error) { NewQueryService: source.NewQueryService, PointsWriter: pointsWriter, DeleteService: deleteService, + BackupService: backupService, + KVBackupService: m.kvService, AuthorizationService: authSvc, // Wrap the BucketService in a storage backed one that will ensure deleted buckets are removed from the storage engine. BucketService: storage.NewBucketService(bucketSvc, m.engine), diff --git a/cmd/influxd/launcher/launcher_helpers.go b/cmd/influxd/launcher/launcher_helpers.go index 3c64a95b51..f8ac54878f 100644 --- a/cmd/influxd/launcher/launcher_helpers.go +++ b/cmd/influxd/launcher/launcher_helpers.go @@ -17,6 +17,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/lang" platform "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/bolt" influxdbcontext "github.com/influxdata/influxdb/context" "github.com/influxdata/influxdb/http" "github.com/influxdata/influxdb/kv" @@ -79,7 +80,7 @@ func RunTestLauncherOrFail(tb testing.TB, ctx context.Context, args ...string) * // Run executes the program with additional arguments to set paths and ports. func (tl *TestLauncher) Run(ctx context.Context, args ...string) error { - args = append(args, "--bolt-path", filepath.Join(tl.Path, "influxd.bolt")) + args = append(args, "--bolt-path", filepath.Join(tl.Path, bolt.DefaultFilename)) args = append(args, "--engine-path", filepath.Join(tl.Path, "engine")) args = append(args, "--http-bind-address", "127.0.0.1:0") args = append(args, "--log-level", "debug") diff --git a/cmd/influxd/main.go b/cmd/influxd/main.go index ba29ac9e5c..444d60c1d5 100644 --- a/cmd/influxd/main.go +++ b/cmd/influxd/main.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/influxdb/cmd/influxd/generate" "github.com/influxdata/influxdb/cmd/influxd/inspect" "github.com/influxdata/influxdb/cmd/influxd/launcher" + "github.com/influxdata/influxdb/cmd/influxd/restore" _ "github.com/influxdata/influxdb/query/builtin" _ "github.com/influxdata/influxdb/tsdb/tsi1" _ "github.com/influxdata/influxdb/tsdb/tsm1" @@ -46,6 +47,7 @@ func init() { rootCmd.AddCommand(launcher.NewCommand()) rootCmd.AddCommand(generate.Command) rootCmd.AddCommand(inspect.NewCommand()) + rootCmd.AddCommand(restore.Command) // TODO: this should be removed in the future: https://github.com/influxdata/influxdb/issues/16220 if os.Getenv("QUERY_TRACING") == "1" { diff --git a/cmd/influxd/restore/command.go b/cmd/influxd/restore/command.go new file mode 100644 index 0000000000..3fa467e94d --- /dev/null +++ b/cmd/influxd/restore/command.go @@ -0,0 +1,293 @@ +package restore + +import ( + "fmt" + "io" + "os" + "path/filepath" + "strings" + + "github.com/influxdata/influxdb/bolt" + "github.com/influxdata/influxdb/cmd/influxd/inspect" + "github.com/influxdata/influxdb/http" + "github.com/influxdata/influxdb/internal/fs" + "github.com/influxdata/influxdb/kit/cli" + "github.com/influxdata/influxdb/storage" + "github.com/spf13/cobra" +) + +var Command = &cobra.Command{ + Use: "restore", + Short: "Restore data and metadata from a backup", + Long: ` +This command restores data and metadata from a backup fileset. + +Any existing metadata and data will be temporarily moved while restore runs +and deleted after restore completes. + +Rebuilding the index and series file uses default options as in +"influxd inspect build-tsi" with the given target engine path. +For additional performance options, run restore with "-rebuild-index false" +and build-tsi afterwards. + +NOTES: + +* The influxd server should not be running when using the restore tool + as it replaces all data and metadata. +`, + Args: cobra.ExactArgs(0), + RunE: restoreE, +} + +var flags struct { + boltPath string + enginePath string + credPath string + backupPath string + rebuildTSI bool +} + +func init() { + dir, err := fs.InfluxDir() + if err != nil { + panic(fmt.Errorf("failed to determine influx directory: %s", err)) + } + + Command.Flags().SortFlags = false + + pfs := Command.PersistentFlags() + pfs.SortFlags = false + + opts := []cli.Opt{ + { + DestP: &flags.boltPath, + Flag: "bolt-path", + Default: filepath.Join(dir, http.DefaultTokenFile), + Desc: "path to target boltdb database", + }, + { + DestP: &flags.enginePath, + Flag: "engine-path", + Default: filepath.Join(dir, "engine"), + Desc: "path to target persistent engine files", + }, + { + DestP: &flags.credPath, + Flag: "credentials-path", + Default: filepath.Join(dir, http.DefaultTokenFile), + Desc: "path to target persistent engine files", + }, + { + DestP: &flags.backupPath, + Flag: "backup-path", + Default: "", + Desc: "path to backup files", + }, + { + DestP: &flags.rebuildTSI, + Flag: "rebuild-index", + Default: true, + Desc: "if true, rebuild the TSI index and series file based on the given engine path (equivalent to influxd inspect build-tsi)", + }, + } + + cli.BindOptions(Command, opts) +} + +func restoreE(cmd *cobra.Command, args []string) error { + if flags.backupPath == "" { + return fmt.Errorf("no backup path given") + } + + if err := moveBolt(); err != nil { + return fmt.Errorf("failed to move existing bolt file: %v", err) + } + + if err := moveCredentials(); err != nil { + return fmt.Errorf("failed to move existing credentials file: %v", err) + } + + if err := moveEngine(); err != nil { + return fmt.Errorf("failed to move existing engine data: %v", err) + } + + if err := restoreBolt(); err != nil { + return fmt.Errorf("failed to restore bolt file: %v", err) + } + + if err := restoreCred(); err != nil { + return fmt.Errorf("failed to restore credentials file: %v", err) + } + + if err := restoreEngine(); err != nil { + return fmt.Errorf("failed to restore all TSM files: %v", err) + } + + if flags.rebuildTSI { + sFilePath := filepath.Join(flags.enginePath, storage.DefaultSeriesFileDirectoryName) + indexPath := filepath.Join(flags.enginePath, storage.DefaultIndexDirectoryName) + + rebuild := inspect.NewBuildTSICommand() + rebuild.SetArgs([]string{"--sfile-path", sFilePath, "--tsi-path", indexPath}) + rebuild.Execute() + } + + if err := removeTmpBolt(); err != nil { + return fmt.Errorf("restore completed, but failed to cleanup temporary bolt file: %v", err) + } + + if err := removeTmpEngine(); err != nil { + return fmt.Errorf("restore completed, but failed to cleanup temporary engine data: %v", err) + } + + return nil +} + +func moveBolt() error { + if _, err := os.Stat(flags.boltPath); os.IsNotExist(err) { + return nil + } else if err != nil { + return err + } + + if err := removeTmpBolt(); err != nil { + return err + } + + return os.Rename(flags.boltPath, flags.boltPath+".tmp") +} + +func moveCredentials() error { + if _, err := os.Stat(flags.credPath); os.IsNotExist(err) { + return nil + } else if err != nil { + return err + } + + if err := removeTmpCred(); err != nil { + return err + } + + return os.Rename(flags.credPath, flags.credPath+".tmp") +} + +func moveEngine() error { + if _, err := os.Stat(flags.enginePath); os.IsNotExist(err) { + return nil + } else if err != nil { + return err + } + + if err := removeTmpEngine(); err != nil { + return err + } + + if err := os.Rename(flags.enginePath, tmpEnginePath()); err != nil { + return err + } + + return os.MkdirAll(flags.enginePath, 0777) +} + +func tmpEnginePath() string { + return filepath.Dir(flags.enginePath) + "tmp" +} + +func removeTmpBolt() error { + return removeIfExists(flags.boltPath + ".tmp") +} + +func removeTmpEngine() error { + return removeIfExists(tmpEnginePath()) +} + +func removeTmpCred() error { + return removeIfExists(flags.credPath + ".tmp") +} + +func removeIfExists(path string) error { + if _, err := os.Stat(path); os.IsNotExist(err) { + return nil + } else if err != nil { + return err + } else { + return os.RemoveAll(path) + } +} + +func restoreBolt() error { + backupBolt := filepath.Join(flags.backupPath, bolt.DefaultFilename) + + if err := restoreFile(backupBolt, flags.boltPath, "bolt"); err != nil { + return err + } + + fmt.Printf("Restored Bolt to %s from %s\n", flags.boltPath, backupBolt) + return nil +} + +func restoreEngine() error { + dataDir := filepath.Join(flags.enginePath, "/data") + if err := os.MkdirAll(dataDir, 0777); err != nil { + return err + } + + count := 0 + err := filepath.Walk(flags.backupPath, func(path string, info os.FileInfo, err error) error { + if strings.Contains(path, ".tsm") { + f, err := os.OpenFile(path, os.O_RDONLY, 0666) + if err != nil { + return fmt.Errorf("error opening TSM file: %v", err) + } + defer f.Close() + + tsmPath := filepath.Join(dataDir, filepath.Base(path)) + w, err := os.OpenFile(tsmPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) + if err != nil { + return err + } + defer w.Close() + + _, err = io.Copy(w, f) + if err != nil { + return err + } + count++ + return nil + } + return nil + }) + fmt.Printf("Restored %d TSM files to %v\n", count, dataDir) + return err +} + +func restoreFile(backup string, target string, filetype string) error { + f, err := os.Open(backup) + if err != nil { + return fmt.Errorf("no %s file in backup: %v", filetype, err) + } + defer f.Close() + + if err := os.MkdirAll(filepath.Dir(target), 0777); err != nil { + return err + } + w, err := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) + if err != nil && !os.IsNotExist(err) { + return err + } + defer w.Close() + + _, err = io.Copy(w, f) + return err +} + +func restoreCred() error { + backupCred := filepath.Join(flags.backupPath, http.DefaultTokenFile) + + if err := restoreFile(backupCred, flags.credPath, "credentials"); err != nil { + return err + } + + fmt.Printf("Restored credentials to %s from %s\n", flags.credPath, backupCred) + return nil +} diff --git a/http/api_handler.go b/http/api_handler.go index 52b280e6af..6d05233680 100644 --- a/http/api_handler.go +++ b/http/api_handler.go @@ -51,6 +51,8 @@ type APIBackend struct { PointsWriter storage.PointsWriter DeleteService influxdb.DeleteService + BackupService influxdb.BackupService + KVBackupService influxdb.KVBackupService AuthorizationService influxdb.AuthorizationService BucketService influxdb.BucketService SessionService influxdb.SessionService @@ -213,6 +215,10 @@ func NewAPIHandler(b *APIBackend, opts ...APIHandlerOptFn) *APIHandler { variableBackend.VariableService = authorizer.NewVariableService(b.VariableService) h.Mount(prefixVariables, NewVariableHandler(b.Logger, variableBackend)) + backupBackend := NewBackupBackend(b) + backupBackend.BackupService = authorizer.NewBackupService(backupBackend.BackupService) + h.Mount(prefixBackup, NewBackupHandler(backupBackend)) + writeBackend := NewWriteBackend(b.Logger.With(zap.String("handler", "write")), b) h.Mount(prefixWrite, NewWriteHandler(b.Logger, writeBackend, WithMaxBatchSizeBytes(b.MaxBatchSizeBytes), @@ -231,6 +237,7 @@ var apiLinks = map[string]interface{}{ // when adding new links, please take care to keep this list alphabetical // as this makes it easier to verify values against the swagger document. "authorizations": "/api/v2/authorizations", + "backup": "/api/v2/backup", "buckets": "/api/v2/buckets", "dashboards": "/api/v2/dashboards", "external": map[string]string{ diff --git a/http/backup_service.go b/http/backup_service.go new file mode 100644 index 0000000000..947902ee7c --- /dev/null +++ b/http/backup_service.go @@ -0,0 +1,260 @@ +package http + +import ( + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "os" + "path" + "path/filepath" + "strconv" + "time" + + "github.com/influxdata/httprouter" + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/bolt" + "github.com/influxdata/influxdb/internal/fs" + "github.com/influxdata/influxdb/kit/tracing" + "go.uber.org/multierr" + "go.uber.org/zap" +) + +const DefaultTokenFile = "credentials" + +// BackupBackend is all services and associated parameters required to construct the BackupHandler. +type BackupBackend struct { + Logger *zap.Logger + influxdb.HTTPErrorHandler + + BackupService influxdb.BackupService + KVBackupService influxdb.KVBackupService +} + +// NewBackupBackend returns a new instance of BackupBackend. +func NewBackupBackend(b *APIBackend) *BackupBackend { + return &BackupBackend{ + Logger: b.Logger.With(zap.String("handler", "backup")), + + HTTPErrorHandler: b.HTTPErrorHandler, + BackupService: b.BackupService, + KVBackupService: b.KVBackupService, + } +} + +type BackupHandler struct { + *httprouter.Router + influxdb.HTTPErrorHandler + Logger *zap.Logger + + BackupService influxdb.BackupService + KVBackupService influxdb.KVBackupService +} + +const ( + prefixBackup = "/api/v2/backup" + backupIDParamName = "backup_id" + backupFileParamName = "backup_file" + backupFilePath = prefixBackup + "/:" + backupIDParamName + "/file/:" + backupFileParamName + + httpClientTimeout = time.Hour +) + +func composeBackupFilePath(backupID int, backupFile string) string { + return path.Join(prefixBackup, fmt.Sprint(backupID), "file", fmt.Sprint(backupFile)) +} + +// NewBackupHandler creates a new handler at /api/v2/backup to receive backup requests. +func NewBackupHandler(b *BackupBackend) *BackupHandler { + h := &BackupHandler{ + HTTPErrorHandler: b.HTTPErrorHandler, + Router: NewRouter(b.HTTPErrorHandler), + Logger: b.Logger, + BackupService: b.BackupService, + KVBackupService: b.KVBackupService, + } + + h.HandlerFunc(http.MethodPost, prefixBackup, h.handleCreate) + h.HandlerFunc(http.MethodGet, backupFilePath, h.handleFetchFile) + + return h +} + +type backup struct { + ID int `json:"id,omitempty"` + Files []string `json:"files,omitempty"` +} + +func (h *BackupHandler) handleCreate(w http.ResponseWriter, r *http.Request) { + span, r := tracing.ExtractFromHTTPRequest(r, "BackupHandler.handleCreate") + defer span.Finish() + + ctx := r.Context() + + id, files, err := h.BackupService.CreateBackup(ctx) + if err != nil { + h.HandleHTTPError(ctx, err, w) + return + } + + internalBackupPath := h.BackupService.InternalBackupPath(id) + + boltPath := filepath.Join(internalBackupPath, bolt.DefaultFilename) + boltFile, err := os.OpenFile(boltPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0660) + if err != nil { + err = multierr.Append(err, os.RemoveAll(internalBackupPath)) + h.HandleHTTPError(ctx, err, w) + return + } + + if err = h.KVBackupService.Backup(ctx, boltFile); err != nil { + err = multierr.Append(err, os.RemoveAll(internalBackupPath)) + h.HandleHTTPError(ctx, err, w) + return + } + + files = append(files, bolt.DefaultFilename) + + credBackupPath := filepath.Join(internalBackupPath, DefaultTokenFile) + + credPath, err := defaultTokenPath() + if err != nil { + h.HandleHTTPError(ctx, err, w) + return + } + token, err := ioutil.ReadFile(credPath) + if err != nil { + h.HandleHTTPError(ctx, err, w) + return + } + + if err := ioutil.WriteFile(credBackupPath, []byte(token), 0600); err != nil { + h.HandleHTTPError(ctx, err, w) + return + } + + files = append(files, DefaultTokenFile) + + b := backup{ + ID: id, + Files: files, + } + if err = json.NewEncoder(w).Encode(&b); err != nil { + err = multierr.Append(err, os.RemoveAll(internalBackupPath)) + h.HandleHTTPError(ctx, err, w) + return + } +} + +func (h *BackupHandler) handleFetchFile(w http.ResponseWriter, r *http.Request) { + span, r := tracing.ExtractFromHTTPRequest(r, "BackupHandler.handleFetchFile") + defer span.Finish() + + ctx := r.Context() + + params := httprouter.ParamsFromContext(ctx) + backupID, err := strconv.Atoi(params.ByName("backup_id")) + if err != nil { + h.HandleHTTPError(ctx, err, w) + return + } + backupFile := params.ByName("backup_file") + + if err = h.BackupService.FetchBackupFile(ctx, backupID, backupFile, w); err != nil { + h.HandleHTTPError(ctx, err, w) + return + } +} + +// BackupService is the client implementation of influxdb.BackupService. +type BackupService struct { + Addr string + Token string + InsecureSkipVerify bool +} + +func (s *BackupService) CreateBackup(ctx context.Context) (int, []string, error) { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + u, err := NewURL(s.Addr, prefixBackup) + if err != nil { + return 0, nil, err + } + + req, err := http.NewRequest(http.MethodPost, u.String(), nil) + if err != nil { + return 0, nil, err + } + SetToken(s.Token, req) + req = req.WithContext(ctx) + + hc := NewClient(u.Scheme, s.InsecureSkipVerify) + hc.Timeout = httpClientTimeout + resp, err := hc.Do(req) + if err != nil { + return 0, nil, err + } + defer resp.Body.Close() + + if err := CheckError(resp); err != nil { + return 0, nil, err + } + + var b backup + if err = json.NewDecoder(resp.Body).Decode(&b); err != nil { + return 0, nil, err + } + + return b.ID, b.Files, nil +} + +func (s *BackupService) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + u, err := NewURL(s.Addr, composeBackupFilePath(backupID, backupFile)) + if err != nil { + return err + } + + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + return err + } + SetToken(s.Token, req) + req = req.WithContext(ctx) + + hc := NewClient(u.Scheme, s.InsecureSkipVerify) + hc.Timeout = httpClientTimeout + resp, err := hc.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if err := CheckError(resp); err != nil { + return err + } + + _, err = io.Copy(w, resp.Body) + if err != nil { + return err + } + + return nil +} + +func defaultTokenPath() (string, error) { + dir, err := fs.InfluxDir() + if err != nil { + return "", err + } + return filepath.Join(dir, DefaultTokenFile), nil +} + +func (s *BackupService) InternalBackupPath(backupID int) string { + panic("internal method not implemented here") +} diff --git a/http/client.go b/http/client.go index ef671f735b..0b3dc0af2b 100644 --- a/http/client.go +++ b/http/client.go @@ -40,6 +40,7 @@ type Service struct { InsecureSkipVerify bool *AuthorizationService + *BackupService *BucketService *DashboardService *OrganizationService @@ -60,11 +61,15 @@ func NewService(addr, token string) (*Service, error) { Addr: addr, Token: token, AuthorizationService: &AuthorizationService{Client: httpClient}, - BucketService: &BucketService{Client: httpClient}, - DashboardService: &DashboardService{Client: httpClient}, - OrganizationService: &OrganizationService{Client: httpClient}, - UserService: &UserService{Client: httpClient}, - VariableService: &VariableService{Client: httpClient}, + BackupService: &BackupService{ + Addr: addr, + Token: token, + }, + BucketService: &BucketService{Client: httpClient}, + DashboardService: &DashboardService{Client: httpClient}, + OrganizationService: &OrganizationService{Client: httpClient}, + UserService: &UserService{Client: httpClient}, + VariableService: &VariableService{Client: httpClient}, WriteService: &WriteService{ Addr: addr, Token: token, diff --git a/inmem/kv.go b/inmem/kv.go index f2866401a3..cd360b5b4b 100644 --- a/inmem/kv.go +++ b/inmem/kv.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "sync" "github.com/google/btree" @@ -56,6 +57,10 @@ func (s *KVStore) Update(ctx context.Context, fn func(kv.Tx) error) error { }) } +func (s *KVStore) Backup(ctx context.Context, w io.Writer) error { + panic("not implemented") +} + // Flush removes all data from the buckets. Used for testing. func (s *KVStore) Flush(ctx context.Context) { s.mu.Lock() diff --git a/kv/backup.go b/kv/backup.go new file mode 100644 index 0000000000..0744bfb677 --- /dev/null +++ b/kv/backup.go @@ -0,0 +1,10 @@ +package kv + +import ( + "context" + "io" +) + +func (s *Service) Backup(ctx context.Context, w io.Writer) error { + return s.kv.Backup(ctx, w) +} diff --git a/kv/service_test.go b/kv/service_test.go index 50dc017613..0ecccb1e28 100644 --- a/kv/service_test.go +++ b/kv/service_test.go @@ -2,6 +2,7 @@ package kv_test import ( "context" + "io" "testing" "time" @@ -21,6 +22,10 @@ func (s mockStore) Update(context.Context, func(kv.Tx) error) error { return nil } +func (s mockStore) Backup(ctx context.Context, w io.Writer) error { + return nil +} + func TestNewService(t *testing.T) { s := kv.NewService(zaptest.NewLogger(t), mockStore{}) diff --git a/kv/store.go b/kv/store.go index fa889a6e03..e1354ecc95 100644 --- a/kv/store.go +++ b/kv/store.go @@ -3,6 +3,7 @@ package kv import ( "context" "errors" + "io" ) var ( @@ -29,6 +30,8 @@ type Store interface { View(context.Context, func(Tx) error) error // Update opens up a transaction that will mutate data. Update(context.Context, func(Tx) error) error + // Backup copies all K:Vs to a writer, file format determined by implementation. + Backup(ctx context.Context, w io.Writer) error } // Tx is a transaction in the store. diff --git a/mock/kv.go b/mock/kv.go index 87359a9c01..881c0b1b7b 100644 --- a/mock/kv.go +++ b/mock/kv.go @@ -2,6 +2,7 @@ package mock import ( "context" + "io" "github.com/influxdata/influxdb/kv" ) @@ -12,6 +13,7 @@ var _ (kv.Store) = (*Store)(nil) type Store struct { ViewFn func(func(kv.Tx) error) error UpdateFn func(func(kv.Tx) error) error + BackupFn func(ctx context.Context, w io.Writer) error } // View opens up a transaction that will not write to any data. Implementing interfaces @@ -25,6 +27,10 @@ func (s *Store) Update(ctx context.Context, fn func(kv.Tx) error) error { return s.UpdateFn(fn) } +func (s *Store) Backup(ctx context.Context, w io.Writer) error { + return s.BackupFn(ctx, w) +} + var _ (kv.Tx) = (*Tx)(nil) // Tx is mock of a kv.Tx. diff --git a/storage/engine.go b/storage/engine.go index 67da18fb02..32150c9570 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -3,9 +3,12 @@ package storage import ( "bytes" "context" - "errors" "fmt" + "io" + "io/ioutil" "math" + "os" + "path/filepath" "sync" "time" @@ -21,7 +24,9 @@ import ( "github.com/influxdata/influxdb/tsdb/tsm1" "github.com/influxdata/influxdb/tsdb/value" "github.com/influxdata/influxql" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/multierr" "go.uber.org/zap" ) @@ -662,6 +667,104 @@ func (e *Engine) deleteBucketRangeLocked(ctx context.Context, orgID, bucketID pl return e.engine.DeletePrefixRange(ctx, name, min, max, pred) } +// CreateBackup creates a "snapshot" of all TSM data in the Engine. +// 1) Snapshot the cache to ensure the backup includes all data written before now. +// 2) Create hard links to all TSM files, in a new directory within the engine root directory. +// 3) Return a unique backup ID (invalid after the process terminates) and list of files. +func (e *Engine) CreateBackup(ctx context.Context) (int, []string, error) { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + if e.closing == nil { + return 0, nil, ErrEngineClosed + } + + if err := e.engine.WriteSnapshot(ctx, tsm1.CacheStatusBackup); err != nil { + return 0, nil, err + } + + id, snapshotPath, err := e.engine.FileStore.CreateSnapshot(ctx) + if err != nil { + return 0, nil, err + } + + fileInfos, err := ioutil.ReadDir(snapshotPath) + if err != nil { + return 0, nil, err + } + filenames := make([]string, len(fileInfos)) + for i, fi := range fileInfos { + filenames[i] = fi.Name() + } + + return id, filenames, nil +} + +// FetchBackupFile writes a given backup file to the provided writer. +// After a successful write, the internal copy is removed. +func (e *Engine) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error { + span, _ := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + e.mu.RLock() + defer e.mu.RUnlock() + if e.closing == nil { + return ErrEngineClosed + } + + if err := e.fetchBackup(ctx, backupID, backupFile, w); err != nil { + e.logger.Error("Failed to fetch file for backup", zap.Error(err), zap.Int("backup_id", backupID), zap.String("backup_file", backupFile)) + return err + } + + backupPath := e.engine.FileStore.InternalBackupPath(backupID) + backupFileFullPath := filepath.Join(backupPath, backupFile) + if err := os.Remove(backupFileFullPath); err != nil { + e.logger.Info("Failed to remove backup file after fetch", zap.Error(err), zap.Int("backup_id", backupID), zap.String("backup_file", backupFile)) + } + + return nil +} + +func (e *Engine) fetchBackup(ctx context.Context, backupID int, backupFile string, w io.Writer) error { + backupPath := e.engine.FileStore.InternalBackupPath(backupID) + if fi, err := os.Stat(backupPath); err != nil { + if os.IsNotExist(err) { + return errors.Errorf("backup %d not found", backupID) + } + return errors.WithMessagef(err, "failed to locate backup %d", backupID) + } else if !fi.IsDir() { + return errors.Errorf("error in filesystem path of backup %d", backupID) + } + + backupFileFullPath := filepath.Join(backupPath, backupFile) + file, err := os.Open(backupFileFullPath) + if err != nil { + if os.IsNotExist(err) { + return errors.Errorf("backup file %d/%s not found", backupID, backupFile) + } + return errors.WithMessagef(err, "failed to open backup file %d/%s", backupID, backupFile) + } + defer file.Close() + + if _, err = io.Copy(w, file); err != nil { + err = multierr.Append(err, file.Close()) + return errors.WithMessagef(err, "failed to copy backup file %d/%s to writer", backupID, backupFile) + } + + if err = file.Close(); err != nil { + return errors.WithMessagef(err, "failed to close backup file %d/%s", backupID, backupFile) + } + + return nil +} + +// InternalBackupPath provides the internal, full path directory name of the backup. +// This should not be exposed via API. +func (e *Engine) InternalBackupPath(backupID int) string { + return e.engine.FileStore.InternalBackupPath(backupID) +} + // SeriesCardinality returns the number of series in the engine. func (e *Engine) SeriesCardinality() int64 { e.mu.RLock() diff --git a/tsdb/tsm1/engine.go b/tsdb/tsm1/engine.go index bd733a817e..21fe48fc77 100644 --- a/tsdb/tsm1/engine.go +++ b/tsdb/tsm1/engine.go @@ -799,8 +799,9 @@ func (e *Engine) WriteSnapshot(ctx context.Context, status CacheStatus) error { if err != nil && err != errCompactionsDisabled { e.logger.Info("Error writing snapshot", zap.Error(err)) } - e.compactionTracker.SnapshotAttempted(err == nil || err == errCompactionsDisabled || - err == ErrSnapshotInProgress, status, time.Since(start)) + e.compactionTracker.SnapshotAttempted( + err == nil || err == errCompactionsDisabled || err == ErrSnapshotInProgress, + status, time.Since(start)) if err != nil { return err @@ -932,6 +933,7 @@ const ( CacheStatusColdNoWrites // The cache has not been written to for long enough that it should be snapshotted. CacheStatusRetention // The cache was snapshotted before running retention. CacheStatusFullCompaction // The cache was snapshotted as part of a full compaction. + CacheStatusBackup // The cache was snapshotted before running backup. ) // ShouldCompactCache returns a status indicating if the Cache should be diff --git a/tsdb/tsm1/file_store.go b/tsdb/tsm1/file_store.go index 0497e160ed..9c46f944dd 100644 --- a/tsdb/tsm1/file_store.go +++ b/tsdb/tsm1/file_store.go @@ -1210,7 +1210,7 @@ func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location { // CreateSnapshot creates hardlinks for all tsm and tombstone files // in the path provided. -func (f *FileStore) CreateSnapshot(ctx context.Context) (string, error) { +func (f *FileStore) CreateSnapshot(ctx context.Context) (backupID int, backupDirFullPath string, err error) { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() @@ -1230,30 +1230,35 @@ func (f *FileStore) CreateSnapshot(ctx context.Context) (string, error) { // increment and keep track of the current temp dir for when we drop the lock. // this ensures we are the only writer to the directory. f.currentTempDirID += 1 - tmpPath := fmt.Sprintf("%d.%s", f.currentTempDirID, TmpTSMFileExtension) - tmpPath = filepath.Join(f.dir, tmpPath) + backupID = f.currentTempDirID f.mu.Unlock() + backupDirFullPath = f.InternalBackupPath(backupID) + // create the tmp directory and add the hard links. there is no longer any shared // mutable state. - err := os.Mkdir(tmpPath, 0777) + err = os.Mkdir(backupDirFullPath, 0777) if err != nil { - return "", err + return 0, "", err } for _, tsmf := range files { - newpath := filepath.Join(tmpPath, filepath.Base(tsmf.Path())) + newpath := filepath.Join(backupDirFullPath, filepath.Base(tsmf.Path())) if err := os.Link(tsmf.Path(), newpath); err != nil { - return "", fmt.Errorf("error creating tsm hard link: %q", err) + return 0, "", fmt.Errorf("error creating tsm hard link: %q", err) } for _, tf := range tsmf.TombstoneFiles() { - newpath := filepath.Join(tmpPath, filepath.Base(tf.Path)) + newpath := filepath.Join(backupDirFullPath, filepath.Base(tf.Path)) if err := os.Link(tf.Path, newpath); err != nil { - return "", fmt.Errorf("error creating tombstone hard link: %q", err) + return 0, "", fmt.Errorf("error creating tombstone hard link: %q", err) } } } - return tmpPath, nil + return backupID, backupDirFullPath, nil +} + +func (f *FileStore) InternalBackupPath(backupID int) string { + return filepath.Join(f.dir, fmt.Sprintf("%d.%s", backupID, TmpTSMFileExtension)) } // MeasurementStats returns the sum of all measurement stats within the store. diff --git a/tsdb/tsm1/file_store_test.go b/tsdb/tsm1/file_store_test.go index 16546da6c8..244b34b9b9 100644 --- a/tsdb/tsm1/file_store_test.go +++ b/tsdb/tsm1/file_store_test.go @@ -2724,7 +2724,7 @@ func TestFileStore_CreateSnapshot(t *testing.T) { t.Fatalf("unexpected error delete range: %v", err) } - s, e := fs.CreateSnapshot(context.Background()) + _, s, e := fs.CreateSnapshot(context.Background()) if e != nil { t.Fatal(e) }