From 23679c2375161ddf98bcd1ea13b98432c8a95c87 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Thu, 29 Oct 2020 16:43:02 -0600 Subject: [PATCH 1/7] feat: Implement backup/restore CLI subcommands. --- authorizer/backup.go | 26 ++- authorizer/restore.go | 46 ++++ backup.go | 68 +++++- cmd/influx/backup.go | 343 +++++++++++++++++++++++++----- cmd/influx/main.go | 1 + cmd/influx/restore.go | 347 +++++++++++++++++++++++++++++++ cmd/influxd/launcher/engine.go | 17 +- cmd/influxd/launcher/launcher.go | 9 +- cmd/influxd/restore/command.go | 304 --------------------------- http/api_handler.go | 7 +- http/backup_service.go | 190 +++++------------ http/restore_service.go | 189 +++++++++++++++++ storage/bucket_service.go | 1 + storage/engine.go | 123 ++++++++--- tsdb/engine/tsm1/engine.go | 10 +- v1/services/meta/client.go | 4 + v1/services/meta/data.go | 15 ++ 17 files changed, 1129 insertions(+), 571 deletions(-) create mode 100644 authorizer/restore.go create mode 100644 cmd/influx/restore.go delete mode 100644 cmd/influxd/restore/command.go create mode 100644 http/restore_service.go diff --git a/authorizer/backup.go b/authorizer/backup.go index 76c398fdc9..c19d6c937d 100644 --- a/authorizer/backup.go +++ b/authorizer/backup.go @@ -23,26 +23,24 @@ func NewBackupService(s influxdb.BackupService) *BackupService { } } -func (b BackupService) CreateBackup(ctx context.Context) (int, []string, error) { - span, ctx := tracing.StartSpanFromContext(ctx) - defer span.Finish() - - if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil { - return 0, nil, err - } - return b.s.CreateBackup(ctx) -} - -func (b BackupService) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error { +func (b BackupService) BackupKVStore(ctx context.Context, w io.Writer) error { span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() + // TODO(bbj): Correct permissions. if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil { return err } - return b.s.FetchBackupFile(ctx, backupID, backupFile, w) + return b.s.BackupKVStore(ctx, w) } -func (b BackupService) InternalBackupPath(backupID int) string { - return b.s.InternalBackupPath(backupID) +func (b BackupService) BackupShard(ctx context.Context, w io.Writer, shardID uint64) error { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + // TODO(bbj): Correct permissions. + if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil { + return err + } + return b.s.BackupShard(ctx, w, shardID) } diff --git a/authorizer/restore.go b/authorizer/restore.go new file mode 100644 index 0000000000..40cb421c43 --- /dev/null +++ b/authorizer/restore.go @@ -0,0 +1,46 @@ +package authorizer + +import ( + "context" + "io" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/tracing" +) + +var _ influxdb.RestoreService = (*RestoreService)(nil) + +// RestoreService wraps a influxdb.RestoreService and authorizes actions +// against it appropriately. +type RestoreService struct { + s influxdb.RestoreService +} + +// NewRestoreService constructs an instance of an authorizing restore service. +func NewRestoreService(s influxdb.RestoreService) *RestoreService { + return &RestoreService{ + s: s, + } +} + +func (b RestoreService) RestoreBucket(ctx context.Context, id influxdb.ID, dbi []byte) (shardIDMap map[uint64]uint64, err error) { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + // TODO(bbj): Correct permissions. 
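+	// As in the backup authorizer, this is a placeholder check: the caller
+	// must currently hold read access to every resource before a restore is
+	// allowed; the check is not yet scoped to the bucket being restored.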
+ if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil { + return nil, err + } + return b.s.RestoreBucket(ctx, id, dbi) +} + +func (b RestoreService) RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + // TODO(bbj): Correct permissions. + if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil { + return err + } + return b.s.RestoreShard(ctx, shardID, r) +} diff --git a/backup.go b/backup.go index fe2ed03a71..8777542c9b 100644 --- a/backup.go +++ b/backup.go @@ -3,21 +3,67 @@ package influxdb import ( "context" "io" + "time" +) + +const ( + BackupFilenamePattern = "20060102T150405Z" ) // BackupService represents the data backup functions of InfluxDB. type BackupService interface { - // CreateBackup creates a local copy (hard links) of the TSM data for all orgs and buckets. - // The return values are used to download each backup file. - CreateBackup(context.Context) (backupID int, backupFiles []string, err error) - // FetchBackupFile downloads one backup file, data or metadata. - FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error - // InternalBackupPath is a utility to determine the on-disk location of a backup fileset. - InternalBackupPath(backupID int) string + // BackupKVStore creates a live backup copy of the metadata database. + BackupKVStore(ctx context.Context, w io.Writer) error + + // BackupShard downloads a backup file for a single shard. + BackupShard(ctx context.Context, w io.Writer, shardID uint64) error } -// KVBackupService represents the meta data backup functions of InfluxDB. -type KVBackupService interface { - // Backup creates a live backup copy of the metadata database. - Backup(ctx context.Context, w io.Writer) error +// RestoreService represents the data restore functions of InfluxDB. +type RestoreService interface { + // RestoreKVStore restores the metadata database. + RestoreBucket(ctx context.Context, id ID, rpiData []byte) (shardIDMap map[uint64]uint64, err error) + + // RestoreShard uploads a backup file for a single shard. + RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error +} + +// Manifest lists the KV and shard file information contained in the backup. +// If Limited is false, the manifest contains a full backup, otherwise +// it is a partial backup. +type Manifest struct { + KV ManifestKVEntry `json:"kv"` + Files []ManifestEntry `json:"files"` + + // If limited is true, then one (or all) of the following fields will be set + + OrganizationID string `json:"organizationID,omitempty"` + BucketID string `json:"bucketID,omitempty"` +} + +// ManifestEntry contains the data information for a backed up shard. +type ManifestEntry struct { + OrganizationID string `json:"organizationID"` + OrganizationName string `json:"organizationName"` + BucketID string `json:"bucketID"` + BucketName string `json:"bucketName"` + ShardID uint64 `json:"shardID"` + FileName string `json:"fileName"` + Size int64 `json:"size"` + LastModified time.Time `json:"lastModified"` +} + +// ManifestKVEntry contains the KV store information for a backup. +type ManifestKVEntry struct { + FileName string `json:"fileName"` + Size int64 `json:"size"` +} + +// Size returns the size of the manifest. 
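+// The value is the KV store file size plus the sizes of all shard files
+// listed in the manifest, i.e. the total size of the backed-up data rather
+// than the size of the manifest file itself.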
+func (m *Manifest) Size() int64 { + n := m.KV.Size + for _, f := range m.Files { + n += f.Size + } + return n } diff --git a/cmd/influx/backup.go b/cmd/influx/backup.go index bee51275ed..0235dbc0e4 100644 --- a/cmd/influx/backup.go +++ b/cmd/influx/backup.go @@ -1,97 +1,330 @@ package main import ( + "compress/gzip" "context" + "encoding/json" "fmt" + "io/ioutil" "os" "path/filepath" + "time" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/bolt" "github.com/influxdata/influxdb/v2/http" + "github.com/influxdata/influxdb/v2/kv" + influxlogger "github.com/influxdata/influxdb/v2/logger" + "github.com/influxdata/influxdb/v2/v1/services/meta" "github.com/spf13/cobra" - "go.uber.org/multierr" + "go.uber.org/zap" ) -func cmdBackup(f *globalFlags, opt genericCLIOpts) *cobra.Command { - cmd := opt.newCmd("backup", backupF, false) - cmd.Short = "Backup the data in InfluxDB" - cmd.Long = fmt.Sprintf( - `Backs up data and meta data for the running InfluxDB instance. -Downloaded files are written to the directory indicated by --path. -The target directory, and any parent directories, are created automatically. -Data file have extension .tsm; meta data is written to %s in the same directory.`, - bolt.DefaultFilename) +func cmdBackup(f *globalFlags, opts genericCLIOpts) *cobra.Command { + return newCmdBackupBuilder(f, opts).cmdBackup() +} - f.registerFlags(cmd) +type backupSVCsFn func() (influxdb.BackupService, error) - opts := flagOpts{ - { - DestP: &backupFlags.Path, - Flag: "path", - Short: 'p', - EnvVar: "PATH", - Desc: "directory path to write backup files to", - Required: true, - }, +type cmdBackupBuilder struct { + genericCLIOpts + *globalFlags + + bucketID string + bucketName string + org organization + outputPath string + + manifest influxdb.Manifest + baseName string + + backupService *http.BackupService + kvStore *bolt.KVStore + kvService *kv.Service + metaClient *meta.Client + + logger *zap.Logger +} + +func newCmdBackupBuilder(f *globalFlags, opts genericCLIOpts) *cmdBackupBuilder { + return &cmdBackupBuilder{ + genericCLIOpts: opts, + globalFlags: f, } - opts.mustRegister(cmd) +} +func (b *cmdBackupBuilder) cmdBackup() *cobra.Command { + cmd := b.newCmd("backup", b.backupRunE) + b.org.register(cmd, true) + cmd.Flags().StringVar(&b.bucketID, "bucket-id", "", "The ID of the bucket to backup") + cmd.Flags().StringVarP(&b.bucketName, "bucket", "b", "", "The name of the bucket to backup") + cmd.Use = "backup [flags] path" + cmd.Args = func(cmd *cobra.Command, args []string) error { + if len(args) == 0 { + return fmt.Errorf("must specify output path") + } else if len(args) > 1 { + return fmt.Errorf("too many args specified") + } + b.outputPath = args[0] + return nil + } + cmd.Short = "Backup database" + cmd.Long = ` +Backs up InfluxDB to a directory. 
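+The target directory receives a timestamped manifest (.manifest), a snapshot
+of the metadata store (.bolt), and one gzipped tar of TSM data per shard.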
+ +Examples: + # backup all data + influx backup /path/to/backup +` return cmd } -var backupFlags struct { - Path string +func (b *cmdBackupBuilder) manifestPath() string { + return fmt.Sprintf("%s.manifest", b.baseName) } -func newBackupService() (influxdb.BackupService, error) { - ac := flags.config() - return &http.BackupService{ - Addr: ac.Host, - Token: ac.Token, - }, nil +func (b *cmdBackupBuilder) kvPath() string { + return fmt.Sprintf("%s.bolt", b.baseName) } -func backupF(cmd *cobra.Command, args []string) error { +func (b *cmdBackupBuilder) shardPath(id uint64) string { + return fmt.Sprintf("%s.s%d", b.baseName, id) + ".tar.gz" +} + +func (b *cmdBackupBuilder) backupRunE(cmd *cobra.Command, args []string) (err error) { ctx := context.Background() - if backupFlags.Path == "" { - return fmt.Errorf("must specify path") - } - - err := os.MkdirAll(backupFlags.Path, 0777) - if err != nil && !os.IsExist(err) { + // Create top level logger + logconf := influxlogger.NewConfig() + if b.logger, err = logconf.New(os.Stdout); err != nil { return err } - backupService, err := newBackupService() - if err != nil { + // Determine a base + b.baseName = time.Now().UTC().Format(influxdb.BackupFilenamePattern) + + // Ensure directory exsits. + if err := os.MkdirAll(b.outputPath, 0777); err != nil { return err } - id, backupFilenames, err := backupService.CreateBackup(ctx) - if err != nil { + ac := flags.config() + b.backupService = &http.BackupService{ + Addr: ac.Host, + Token: ac.Token, + } + + // Back up Bolt database to file. + if err := b.backupKVStore(ctx); err != nil { return err } - fmt.Printf("Backup ID %d contains %d files\n", id, len(backupFilenames)) + // Open bolt DB. + boltClient := bolt.NewClient(b.logger) + boltClient.Path = filepath.Join(b.outputPath, b.kvPath()) + if err := boltClient.Open(ctx); err != nil { + return err + } + defer boltClient.Close() - for _, backupFilename := range backupFilenames { - dest := filepath.Join(backupFlags.Path, backupFilename) - w, err := os.OpenFile(dest, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) - if err != nil { - return err - } - err = backupService.FetchBackupFile(ctx, id, backupFilename, w) - if err != nil { - return multierr.Append(fmt.Errorf("error fetching file %s: %v", backupFilename, err), w.Close()) - } - if err = w.Close(); err != nil { - return err - } + // Open meta store so we can iterate over meta data. + b.kvStore = bolt.NewKVStore(b.logger, filepath.Join(b.outputPath, b.kvPath())) + b.kvStore.WithDB(boltClient.DB()) + b.kvService = kv.NewService(b.logger, b.kvStore, kv.ServiceConfig{}) + + b.metaClient = meta.NewClient(meta.NewConfig(), b.kvStore) + if err := b.metaClient.Open(); err != nil { + return err } - fmt.Printf("Backup complete") + // Filter through organizations & buckets to backup appropriate shards. + if err := b.backupOrganizations(ctx); err != nil { + return err + } + + if err := b.writeManifest(ctx); err != nil { + return err + } + + b.logger.Info("Backup complete") return nil } + +// backupKVStore streams the bolt KV file to a file at path. +func (b *cmdBackupBuilder) backupKVStore(ctx context.Context) error { + path := filepath.Join(b.outputPath, b.kvPath()) + b.logger.Info("Backing up KV store", zap.String("path", b.kvPath())) + + // Open writer to output file. + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + + // Stream bolt file from server, sync, and ensure file closes correctly. 
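+	// The snapshot must be fully flushed and closed here because backupRunE
+	// reopens this same file as its local metadata source when enumerating
+	// organizations, buckets, and shards.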
+ if err := b.backupService.BackupKVStore(ctx, f); err != nil { + return err + } else if err := f.Sync(); err != nil { + return err + } else if err := f.Close(); err != nil { + return err + } + + // Lookup file size. + fi, err := os.Stat(path) + if err != nil { + return err + } + b.manifest.KV = influxdb.ManifestKVEntry{ + FileName: b.kvPath(), + Size: fi.Size(), + } + + return nil +} + +func (b *cmdBackupBuilder) backupOrganizations(ctx context.Context) (err error) { + // Build a filter if org ID or org name were specified. + var filter influxdb.OrganizationFilter + if b.org.id != "" { + if filter.ID, err = influxdb.IDFromString(b.org.id); err != nil { + return err + } + } else if b.org.name != "" { + filter.Name = &b.org.name + } + + // Retrieve a list of all matching organizations. + orgs, _, err := b.kvService.FindOrganizations(ctx, filter) + if err != nil { + return err + } + + // Back up buckets in each matching organization. + for _, org := range orgs { + b.logger.Info("Backing up organization", zap.String("id", org.ID.String()), zap.String("name", org.Name)) + if err := b.backupBuckets(ctx, org); err != nil { + return err + } + } + return nil +} + +func (b *cmdBackupBuilder) backupBuckets(ctx context.Context, org *influxdb.Organization) (err error) { + // Build a filter if bucket ID or bucket name were specified. + var filter influxdb.BucketFilter + filter.OrganizationID = &org.ID + if b.bucketID != "" { + if filter.ID, err = influxdb.IDFromString(b.bucketID); err != nil { + return err + } + } else if b.bucketName != "" { + filter.Name = &b.bucketName + } + + // Retrieve a list of all matching organizations. + buckets, _, err := b.kvService.FindBuckets(ctx, filter) + if err != nil { + return err + } + + // Back up shards in each matching bucket. + for _, bkt := range buckets { + if err := b.backupBucket(ctx, org, bkt); err != nil { + return err + } + } + return nil +} + +func (b *cmdBackupBuilder) backupBucket(ctx context.Context, org *influxdb.Organization, bkt *influxdb.Bucket) (err error) { + b.logger.Info("Backing up bucket", zap.String("id", bkt.ID.String()), zap.String("name", bkt.Name)) + + // Lookup matching database from the meta store. + dbi := b.metaClient.Database(bkt.ID.String()) + if dbi == nil { + return fmt.Errorf("bucket database not found: %s", bkt.ID.String()) + } + + // Iterate over and backup each shard. + for _, rpi := range dbi.RetentionPolicies { + for _, sg := range rpi.ShardGroups { + for _, sh := range sg.Shards { + if err := b.backupShard(ctx, org, bkt, rpi.Name, sh.ID); err != nil { + return err + } + } + } + } + return nil +} + +// backupShard streams a tar of TSM data for shard. +func (b *cmdBackupBuilder) backupShard(ctx context.Context, org *influxdb.Organization, bkt *influxdb.Bucket, policy string, shardID uint64) error { + path := filepath.Join(b.outputPath, b.shardPath(shardID)) + b.logger.Info("Backing up shard", zap.Uint64("id", shardID), zap.String("path", b.shardPath(shardID))) + + // Open writer to output file. + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + + // Wrap file writer with a gzip writer. + gw := gzip.NewWriter(f) + defer gw.Close() + + // Stream file from server, sync, and ensure file closes correctly. + if err := b.backupService.BackupShard(ctx, gw, shardID); err != nil { + return err + } else if err := gw.Close(); err != nil { + return err + } else if err := f.Sync(); err != nil { + return err + } else if err := f.Close(); err != nil { + return err + } + + // Determine file size. 
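+	// The size and modification time are recorded in the manifest entry below;
+	// a later restore uses the modification time to pick the most recent copy
+	// of each shard when several backups share a directory.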
+ fi, err := os.Stat(path) + if err != nil { + return err + } + + // Update manifest. + b.manifest.Files = append(b.manifest.Files, influxdb.ManifestEntry{ + OrganizationID: org.ID.String(), + OrganizationName: org.Name, + BucketID: bkt.ID.String(), + BucketName: bkt.Name, + ShardID: shardID, + FileName: b.shardPath(shardID), + Size: fi.Size(), + LastModified: fi.ModTime().UTC(), + }) + + return nil +} + +// writeManifest writes the manifest file out. +func (b *cmdBackupBuilder) writeManifest(ctx context.Context) error { + path := filepath.Join(b.outputPath, b.manifestPath()) + b.logger.Info("Writing manifest", zap.String("path", b.manifestPath())) + + buf, err := json.MarshalIndent(b.manifest, "", " ") + if err != nil { + return fmt.Errorf("create manifest: %w", err) + } + buf = append(buf, '\n') + return ioutil.WriteFile(path, buf, 0600) +} + +func (b *cmdBackupBuilder) newCmd(use string, runE func(*cobra.Command, []string) error) *cobra.Command { + cmd := b.genericCLIOpts.newCmd(use, runE, true) + b.genericCLIOpts.registerPrintOptions(cmd) + b.globalFlags.registerFlags(cmd) + return cmd +} diff --git a/cmd/influx/main.go b/cmd/influx/main.go index bf2a5b2457..1bdcb9af57 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -328,6 +328,7 @@ func influxCmd(opts ...genericCLIOptFn) *cobra.Command { cmdOrganization, cmdPing, cmdQuery, + cmdRestore, cmdSecret, cmdSetup, cmdStack, diff --git a/cmd/influx/restore.go b/cmd/influx/restore.go new file mode 100644 index 0000000000..1ebdcfdc5e --- /dev/null +++ b/cmd/influx/restore.go @@ -0,0 +1,347 @@ +package main + +import ( + "compress/gzip" + "context" + "encoding/json" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sort" + "strings" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/bolt" + "github.com/influxdata/influxdb/v2/http" + "github.com/influxdata/influxdb/v2/kv" + influxlogger "github.com/influxdata/influxdb/v2/logger" + "github.com/influxdata/influxdb/v2/v1/services/meta" + "github.com/spf13/cobra" + "go.uber.org/zap" +) + +func cmdRestore(f *globalFlags, opts genericCLIOpts) *cobra.Command { + return newCmdRestoreBuilder(f, opts).cmdRestore() +} + +type restoreSVCsFn func() (influxdb.RestoreService, error) + +type cmdRestoreBuilder struct { + genericCLIOpts + *globalFlags + + bucketID string + bucketName string + newBucketName string + newOrgName string + org organization + shardID uint64 + path string + + kvEntry *influxdb.ManifestKVEntry + shardEntries map[uint64]*influxdb.ManifestEntry + + orgService *http.OrganizationService + bucketService *http.BucketService + restoreService *http.RestoreService + kvService *kv.Service + metaClient *meta.Client + + logger *zap.Logger +} + +func newCmdRestoreBuilder(f *globalFlags, opts genericCLIOpts) *cmdRestoreBuilder { + return &cmdRestoreBuilder{ + genericCLIOpts: opts, + globalFlags: f, + + shardEntries: make(map[uint64]*influxdb.ManifestEntry), + } +} + +func (b *cmdRestoreBuilder) cmdRestore() *cobra.Command { + cmd := b.newCmd("restore", b.restoreRunE) + b.org.register(cmd, true) + cmd.Flags().StringVar(&b.bucketID, "bucket-id", "", "The ID of the bucket to restore") + cmd.Flags().StringVarP(&b.bucketName, "bucket", "b", "", "The name of the bucket to restore") + cmd.Flags().StringVar(&b.newBucketName, "new-bucket", "", "The name of the bucket to restore to") + cmd.Flags().StringVar(&b.newOrgName, "new-org", "", "The name of the organization to restore to") + cmd.Flags().Uint64Var(&b.shardID, "shard-id", 0, "The shard to restore") + 
cmd.Flags().StringVar(&b.path, "input", "", "Local backup data path (required)") + cmd.Use = "restore [flags] path" + cmd.Args = func(cmd *cobra.Command, args []string) error { + if len(args) == 0 { + return fmt.Errorf("must specify path to backup directory") + } else if len(args) > 1 { + return fmt.Errorf("too many args specified") + } + b.path = args[0] + return nil + } + cmd.Short = "Restores a backup directory to InfluxDB." + cmd.Long = ` +Restore influxdb. + +Examples: + # restore all data + influx restore /path/to/restore +` + return cmd +} + +func (b *cmdRestoreBuilder) restoreRunE(cmd *cobra.Command, args []string) (err error) { + ctx := context.Background() + + // Create top level logger + logconf := influxlogger.NewConfig() + if b.logger, err = logconf.New(os.Stdout); err != nil { + return err + } + + // Read in set of KV data & shard data to restore. + if err := b.loadIncremental(); err != nil { + return fmt.Errorf("restore failed while processing manifest files: %s", err.Error()) + } else if b.kvEntry == nil { + return fmt.Errorf("No manifest files found in: %s\n", b.path) + + } + + ac := flags.config() + b.restoreService = &http.RestoreService{ + Addr: ac.Host, + Token: ac.Token, + } + + client, err := newHTTPClient() + if err != nil { + return err + } + + b.orgService = &http.OrganizationService{Client: client} + b.bucketService = &http.BucketService{Client: client} + + // Open bolt DB. + boltClient := bolt.NewClient(b.logger) + boltClient.Path = filepath.Join(b.path, b.kvEntry.FileName) + if err := boltClient.Open(ctx); err != nil { + return err + } + defer boltClient.Close() + + // Open meta store so we can iterate over meta data. + kvStore := bolt.NewKVStore(b.logger, boltClient.Path) + kvStore.WithDB(boltClient.DB()) + b.kvService = kv.NewService(b.logger, kvStore, kv.ServiceConfig{}) + + b.metaClient = meta.NewClient(meta.NewConfig(), kvStore) + if err := b.metaClient.Open(); err != nil { + return err + } + + // Filter through organizations & buckets to restore appropriate shards. + if err := b.restoreOrganizations(ctx); err != nil { + return err + } + + b.logger.Info("Restore complete") + + return nil +} + +func (b *cmdRestoreBuilder) restoreOrganizations(ctx context.Context) (err error) { + // Build a filter if org ID or org name were specified. + var filter influxdb.OrganizationFilter + if b.org.id != "" { + if filter.ID, err = influxdb.IDFromString(b.org.id); err != nil { + return err + } + } else if b.org.name != "" { + filter.Name = &b.org.name + } + + // Retrieve a list of all matching organizations. + orgs, _, err := b.kvService.FindOrganizations(ctx, filter) + if err != nil { + return err + } + + // Restore matching organizations. + for _, org := range orgs { + if err := b.restoreOrganization(ctx, org); err != nil { + return err + } + } + return nil +} + +func (b *cmdRestoreBuilder) restoreOrganization(ctx context.Context, org *influxdb.Organization) (err error) { + b.logger.Info("Restoring organization", zap.String("id", org.ID.String()), zap.String("name", org.Name)) + + // Create organization on server, if it doesn't already exist. 
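+	// The lookup is by name: an organization with the same name already on the
+	// target server is reused, otherwise it is recreated from the backed-up
+	// metadata.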
+ if a, _, err := b.orgService.FindOrganizations(ctx, influxdb.OrganizationFilter{Name: &org.Name}); err != nil { + return fmt.Errorf("cannot find existing organization: %w", err) + } else if len(a) == 0 { + tmp := *org // copy so we don't lose our ID + if err := b.orgService.CreateOrganization(ctx, &tmp); err != nil { + return fmt.Errorf("cannot create organization: %w", err) + } + } + + // Build a filter if bucket ID or bucket name were specified. + var filter influxdb.BucketFilter + filter.OrganizationID = &org.ID + if b.bucketID != "" { + if filter.ID, err = influxdb.IDFromString(b.bucketID); err != nil { + return err + } + } else if b.bucketName != "" { + filter.Name = &b.bucketName + } + + // Retrieve a list of all buckets for the organization in the local backup. + buckets, _, err := b.kvService.FindBuckets(ctx, filter) + if err != nil { + return err + } + + // Restore each matching bucket. + for _, bkt := range buckets { + // Skip internal buckets. + if strings.HasPrefix(bkt.Name, "_") { + continue + } + + if err := b.restoreBucket(ctx, org, bkt); err != nil { + return err + } + } + return nil +} + +func (b *cmdRestoreBuilder) restoreBucket(ctx context.Context, org *influxdb.Organization, bkt *influxdb.Bucket) (err error) { + b.logger.Info("Restoring bucket", zap.String("id", bkt.ID.String()), zap.String("name", bkt.Name)) + + // Create bucket on server. + newBucket := *bkt + if b.newBucketName != "" { + newBucket.Name = b.newBucketName + } + if err := b.bucketService.CreateBucket(ctx, &newBucket); err != nil { + return fmt.Errorf("cannot create bucket: %w", err) + } + + // Lookup matching database from the meta store. + dbi := b.metaClient.Database(bkt.ID.String()) + if dbi == nil { + return fmt.Errorf("bucket database not found: %s", bkt.ID.String()) + } + + // Serialize to protobufs. + buf, err := dbi.MarshalBinary() + if err != nil { + return fmt.Errorf("cannot marshal database info: %w", err) + } + + shardIDMap, err := b.restoreService.RestoreBucket(ctx, newBucket.ID, buf) + if err != nil { + return fmt.Errorf("cannot restore bucket: %w", err) + } + + // Restore each shard for the bucket. + for _, file := range b.shardEntries { + if bkt.ID.String() != file.BucketID { + continue + } else if b.shardID != 0 && b.shardID != file.ShardID { + continue + } + + // Skip if shard metadata was not imported. + newID, ok := shardIDMap[file.ShardID] + if !ok { + b.logger.Warn("Meta info not found, skipping file", zap.Uint64("shard", file.ShardID), zap.String("bucket_id", file.BucketID), zap.String("filename", file.FileName)) + return nil + } + + if err := b.restoreShard(ctx, newID, file); err != nil { + return err + } + } + + return nil +} + +func (b *cmdRestoreBuilder) restoreShard(ctx context.Context, newShardID uint64, file *influxdb.ManifestEntry) error { + b.logger.Info("Restoring shard live from backup", zap.Uint64("shard", newShardID), zap.String("filename", file.FileName)) + + f, err := os.Open(filepath.Join(b.path, file.FileName)) + if err != nil { + return err + } + defer f.Close() + + gr, err := gzip.NewReader(f) + if err != nil { + return err + } + defer gr.Close() + + return b.restoreService.RestoreShard(ctx, newShardID, gr) +} + +// loadIncremental loads multiple manifest files from a given directory. +func (b *cmdRestoreBuilder) loadIncremental() error { + // Read all manifest files from path, sort in descending time. 
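+	// Manifest file names start with a UTC timestamp (BackupFilenamePattern),
+	// so a reverse lexicographic sort puts the newest manifests first.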
+ manifests, err := filepath.Glob(filepath.Join(b.path, "*.manifest")) + if err != nil { + return err + } else if len(manifests) == 0 { + return nil + } + sort.Sort(sort.Reverse(sort.StringSlice(manifests))) + + b.shardEntries = make(map[uint64]*influxdb.ManifestEntry) + for _, filename := range manifests { + // Skip file if it is a directory. + if fi, err := os.Stat(filename); err != nil { + return err + } else if fi.IsDir() { + continue + } + + // Read manifest file for backup. + var manifest influxdb.Manifest + if buf, err := ioutil.ReadFile(filename); err != nil { + return err + } else if err := json.Unmarshal(buf, &manifest); err != nil { + return fmt.Errorf("read manifest: %v", err) + } + + // Save latest KV entry. + if b.kvEntry == nil { + b.kvEntry = &manifest.KV + } + + // Load most recent backup per shard. + for i := range manifest.Files { + sh := manifest.Files[i] + if _, err := os.Stat(filepath.Join(b.path, sh.FileName)); err != nil { + continue + } + + entry := b.shardEntries[sh.ShardID] + if entry == nil || sh.LastModified.After(entry.LastModified) { + b.shardEntries[sh.ShardID] = &sh + } + } + } + + return nil +} + +func (b *cmdRestoreBuilder) newCmd(use string, runE func(*cobra.Command, []string) error) *cobra.Command { + cmd := b.genericCLIOpts.newCmd(use, runE, true) + b.genericCLIOpts.registerPrintOptions(cmd) + b.globalFlags.registerFlags(cmd) + return cmd +} diff --git a/cmd/influxd/launcher/engine.go b/cmd/influxd/launcher/engine.go index 78c653989a..c12af95de7 100644 --- a/cmd/influxd/launcher/engine.go +++ b/cmd/influxd/launcher/engine.go @@ -27,6 +27,7 @@ type Engine interface { storage.EngineSchema prom.PrometheusCollector influxdb.BackupService + influxdb.RestoreService SeriesCardinality(orgID, bucketID influxdb.ID) int64 @@ -157,16 +158,20 @@ func (t *TemporaryEngine) Flush(ctx context.Context) { } } -func (t *TemporaryEngine) CreateBackup(ctx context.Context) (int, []string, error) { - return t.engine.CreateBackup(ctx) +func (t *TemporaryEngine) BackupKVStore(ctx context.Context, w io.Writer) error { + return t.engine.BackupKVStore(ctx, w) } -func (t *TemporaryEngine) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error { - return t.engine.FetchBackupFile(ctx, backupID, backupFile, w) +func (t *TemporaryEngine) RestoreBucket(ctx context.Context, id influxdb.ID, dbi []byte) (map[uint64]uint64, error) { + return t.engine.RestoreBucket(ctx, id, dbi) } -func (t *TemporaryEngine) InternalBackupPath(backupID int) string { - return t.engine.InternalBackupPath(backupID) +func (t *TemporaryEngine) BackupShard(ctx context.Context, w io.Writer, shardID uint64) error { + return t.engine.BackupShard(ctx, w, shardID) +} + +func (t *TemporaryEngine) RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error { + return t.engine.RestoreShard(ctx, shardID, r) } func (t *TemporaryEngine) TSDBStore() storage.TSDBStore { diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index a4408a2e63..a4cfa75579 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -854,9 +854,10 @@ func (m *Launcher) run(ctx context.Context) (err error) { m.reg.MustRegister(m.engine.PrometheusCollectors()...) 
var ( - deleteService platform.DeleteService = m.engine - pointsWriter storage.PointsWriter = m.engine - backupService platform.BackupService = m.engine + deleteService platform.DeleteService = m.engine + pointsWriter storage.PointsWriter = m.engine + backupService platform.BackupService = m.engine + restoreService platform.RestoreService = m.engine ) deps, err := influxdb.NewDependencies( @@ -1208,7 +1209,7 @@ func (m *Launcher) run(ctx context.Context) (err error) { }, DeleteService: deleteService, BackupService: backupService, - KVBackupService: m.kvService, + RestoreService: restoreService, AuthorizationService: authSvc, AuthorizerV1: authorizerV1, AlgoWProxy: &http.NoopProxyHandler{}, diff --git a/cmd/influxd/restore/command.go b/cmd/influxd/restore/command.go deleted file mode 100644 index 7617e54a0a..0000000000 --- a/cmd/influxd/restore/command.go +++ /dev/null @@ -1,304 +0,0 @@ -package restore - -import ( - "fmt" - "io" - "os" - "path/filepath" - "strings" - - "github.com/influxdata/influxdb/v2/bolt" - "github.com/influxdata/influxdb/v2/internal/fs" - "github.com/influxdata/influxdb/v2/kit/cli" - "github.com/spf13/cobra" -) - -var Command = &cobra.Command{ - Use: "restore", - Short: "Restore data and metadata from a backup", - Long: ` -This command restores data and metadata from a backup fileset. - -Any existing metadata and data will be temporarily moved while restore runs -and deleted after restore completes. - -Rebuilding the index and series file uses default options as in -"influxd inspect build-tsi" with the given target engine path. -For additional performance options, run restore with "-rebuild-index false" -and build-tsi afterwards. - -NOTES: - -* The influxd server should not be running when using the restore tool - as it replaces all data and metadata. 
-`, - Args: cobra.ExactArgs(0), - RunE: restoreE, -} - -var flags struct { - boltPath string - enginePath string - credPath string - backupPath string - rebuildTSI bool -} - -func init() { - dir, err := fs.InfluxDir() - if err != nil { - panic(fmt.Errorf("failed to determine influx directory: %s", err)) - } - - Command.Flags().SortFlags = false - - pfs := Command.PersistentFlags() - pfs.SortFlags = false - - opts := []cli.Opt{ - { - DestP: &flags.boltPath, - Flag: "bolt-path", - Default: filepath.Join(dir, bolt.DefaultFilename), - Desc: "path to target boltdb database", - }, - { - DestP: &flags.enginePath, - Flag: "engine-path", - Default: filepath.Join(dir, "engine"), - Desc: "path to target persistent engine files", - }, - { - DestP: &flags.credPath, - Flag: "credentials-path", - Default: filepath.Join(dir, fs.DefaultTokenFile), - Desc: "path to target credentials file", - }, - { - DestP: &flags.backupPath, - Flag: "backup-path", - Default: "", - Desc: "path to backup files", - }, - { - DestP: &flags.rebuildTSI, - Flag: "rebuild-index", - Default: true, - Desc: "if true, rebuild the TSI index and series file based on the given engine path (equivalent to influxd inspect build-tsi)", - }, - } - - cli.BindOptions(Command, opts) -} - -func restoreE(cmd *cobra.Command, args []string) error { - if flags.backupPath == "" { - return fmt.Errorf("no backup path given") - } - - if err := moveBolt(); err != nil { - return fmt.Errorf("failed to move existing bolt file: %v", err) - } - - if err := moveCredentials(); err != nil { - return fmt.Errorf("failed to move existing credentials file: %v", err) - } - - if err := moveEngine(); err != nil { - return fmt.Errorf("failed to move existing engine data: %v", err) - } - - if err := restoreBolt(); err != nil { - return fmt.Errorf("failed to restore bolt file: %v", err) - } - - if err := restoreCred(); err != nil { - return fmt.Errorf("failed to restore credentials file: %v", err) - } - - if err := restoreEngine(); err != nil { - return fmt.Errorf("failed to restore all TSM files: %v", err) - } - - if flags.rebuildTSI { - // FIXME: Implement rebuildTSI - panic("not implemented") - //sFilePath := filepath.Join(flags.enginePath, storage.DefaultSeriesFileDirectoryName) - //indexPath := filepath.Join(flags.enginePath, storage.DefaultIndexDirectoryName) - - //rebuild := inspect.NewBuildTSICommand() - //rebuild.SetArgs([]string{"--sfile-path", sFilePath, "--tsi-path", indexPath}) - //rebuild.Execute() - } - - if err := removeTmpBolt(); err != nil { - return fmt.Errorf("restore completed, but failed to cleanup temporary bolt file: %v", err) - } - - if err := removeTmpCred(); err != nil { - return fmt.Errorf("restore completed, but failed to cleanup temporary credentials file: %v", err) - } - - if err := removeTmpEngine(); err != nil { - return fmt.Errorf("restore completed, but failed to cleanup temporary engine data: %v", err) - } - - return nil -} - -func moveBolt() error { - if _, err := os.Stat(flags.boltPath); os.IsNotExist(err) { - return nil - } else if err != nil { - return err - } - - if err := removeTmpBolt(); err != nil { - return err - } - - return os.Rename(flags.boltPath, flags.boltPath+".tmp") -} - -func moveCredentials() error { - if _, err := os.Stat(flags.credPath); os.IsNotExist(err) { - return nil - } else if err != nil { - return err - } - - if err := removeTmpCred(); err != nil { - return err - } - - return os.Rename(flags.credPath, flags.credPath+".tmp") -} - -func moveEngine() error { - if _, err := os.Stat(flags.enginePath); 
os.IsNotExist(err) { - return nil - } else if err != nil { - return err - } - - if err := removeTmpEngine(); err != nil { - return err - } - - if err := os.Rename(flags.enginePath, tmpEnginePath()); err != nil { - return err - } - - return os.MkdirAll(flags.enginePath, 0777) -} - -func tmpEnginePath() string { - return filepath.Dir(flags.enginePath) + "tmp" -} - -func removeTmpBolt() error { - return removeIfExists(flags.boltPath + ".tmp") -} - -func removeTmpEngine() error { - return removeIfExists(tmpEnginePath()) -} - -func removeTmpCred() error { - return removeIfExists(flags.credPath + ".tmp") -} - -func removeIfExists(path string) error { - if _, err := os.Stat(path); os.IsNotExist(err) { - return nil - } else if err != nil { - return err - } else { - return os.RemoveAll(path) - } -} - -func restoreBolt() error { - backupBolt := filepath.Join(flags.backupPath, bolt.DefaultFilename) - - if err := restoreFile(backupBolt, flags.boltPath, "bolt"); err != nil { - return err - } - - fmt.Printf("Restored Bolt to %s from %s\n", flags.boltPath, backupBolt) - return nil -} - -func restoreEngine() error { - dataDir := filepath.Join(flags.enginePath, "/data") - if err := os.MkdirAll(dataDir, 0777); err != nil { - return err - } - - count := 0 - err := filepath.Walk(flags.backupPath, func(path string, info os.FileInfo, err error) error { - if strings.Contains(path, ".tsm") { - f, err := os.OpenFile(path, os.O_RDONLY, 0666) - if err != nil { - return fmt.Errorf("error opening TSM file: %v", err) - } - defer f.Close() - - tsmPath := filepath.Join(dataDir, filepath.Base(path)) - w, err := os.OpenFile(tsmPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) - if err != nil { - return err - } - defer w.Close() - - _, err = io.Copy(w, f) - if err != nil { - return err - } - count++ - return nil - } - return nil - }) - fmt.Printf("Restored %d TSM files to %v\n", count, dataDir) - return err -} - -func restoreFile(backup string, target string, filetype string) error { - f, err := os.Open(backup) - if err != nil { - return fmt.Errorf("no %s file in backup: %v", filetype, err) - } - defer f.Close() - - if err := os.MkdirAll(filepath.Dir(target), 0777); err != nil { - return err - } - w, err := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) - if err != nil && !os.IsNotExist(err) { - return err - } - defer w.Close() - - _, err = io.Copy(w, f) - return err -} - -func restoreCred() error { - backupCred := filepath.Join(flags.backupPath, fs.DefaultTokenFile) - - _, err := os.Stat(backupCred) - if os.IsNotExist(err) { - fmt.Printf("No credentials file found in backup, skipping.\n") - return nil - } else if err != nil { - return err - } - - if err := restoreFile(backupCred, flags.credPath, "credentials"); err != nil { - return err - } - - fmt.Printf("Restored credentials to %s from %s\n", flags.credPath, backupCred) - return nil -} diff --git a/http/api_handler.go b/http/api_handler.go index e880d101f9..d8d5376c11 100644 --- a/http/api_handler.go +++ b/http/api_handler.go @@ -58,7 +58,7 @@ type APIBackend struct { PointsWriter storage.PointsWriter DeleteService influxdb.DeleteService BackupService influxdb.BackupService - KVBackupService influxdb.KVBackupService + RestoreService influxdb.RestoreService AuthorizationService influxdb.AuthorizationService AuthorizerV1 influxdb.AuthorizerV1 OnboardingService influxdb.OnboardingService @@ -193,6 +193,10 @@ func NewAPIHandler(b *APIBackend, opts ...APIHandlerOptFn) *APIHandler { backupBackend.BackupService = 
authorizer.NewBackupService(backupBackend.BackupService) h.Mount(prefixBackup, NewBackupHandler(backupBackend)) + restoreBackend := NewRestoreBackend(b) + restoreBackend.RestoreService = authorizer.NewRestoreService(restoreBackend.RestoreService) + h.Mount(prefixRestore, NewRestoreHandler(restoreBackend)) + h.Mount(dbrp.PrefixDBRP, dbrp.NewHTTPHandler(b.Logger, b.DBRPService, b.OrganizationService)) writeBackend := NewWriteBackend(b.Logger.With(zap.String("handler", "write")), b) @@ -234,6 +238,7 @@ var apiLinks = map[string]interface{}{ "analyze": "/api/v2/query/analyze", "suggestions": "/api/v2/query/suggestions", }, + "restore": "/api/v2/restore", "setup": "/api/v2/setup", "signin": "/api/v2/signin", "signout": "/api/v2/signout", diff --git a/http/backup_service.go b/http/backup_service.go index f840c14dda..ddc2acbc06 100644 --- a/http/backup_service.go +++ b/http/backup_service.go @@ -2,23 +2,18 @@ package http import ( "context" - "encoding/json" "fmt" "io" - "io/ioutil" "net/http" - "os" - "path" "path/filepath" "strconv" "time" "github.com/influxdata/httprouter" "github.com/influxdata/influxdb/v2" - "github.com/influxdata/influxdb/v2/bolt" + // "github.com/influxdata/influxdb/v2/bolt" "github.com/influxdata/influxdb/v2/internal/fs" "github.com/influxdata/influxdb/v2/kit/tracing" - "go.uber.org/multierr" "go.uber.org/zap" ) @@ -27,8 +22,7 @@ type BackupBackend struct { Logger *zap.Logger influxdb.HTTPErrorHandler - BackupService influxdb.BackupService - KVBackupService influxdb.KVBackupService + BackupService influxdb.BackupService } // NewBackupBackend returns a new instance of BackupBackend. @@ -38,7 +32,6 @@ func NewBackupBackend(b *APIBackend) *BackupBackend { HTTPErrorHandler: b.HTTPErrorHandler, BackupService: b.BackupService, - KVBackupService: b.KVBackupService, } } @@ -48,23 +41,17 @@ type BackupHandler struct { influxdb.HTTPErrorHandler Logger *zap.Logger - BackupService influxdb.BackupService - KVBackupService influxdb.KVBackupService + BackupService influxdb.BackupService } const ( - prefixBackup = "/api/v2/backup" - backupIDParamName = "backup_id" - backupFileParamName = "backup_file" - backupFilePath = prefixBackup + "/:" + backupIDParamName + "/file/:" + backupFileParamName + prefixBackup = "/api/v2/backup" + backupKVStorePath = prefixBackup + "/kv" + backupShardPath = prefixBackup + "/shards/:shardID" httpClientTimeout = time.Hour ) -func composeBackupFilePath(backupID int, backupFile string) string { - return path.Join(prefixBackup, fmt.Sprint(backupID), "file", fmt.Sprint(backupFile)) -} - // NewBackupHandler creates a new handler at /api/v2/backup to receive backup requests. 
func NewBackupHandler(b *BackupBackend) *BackupHandler { h := &BackupHandler{ @@ -72,107 +59,40 @@ func NewBackupHandler(b *BackupBackend) *BackupHandler { Router: NewRouter(b.HTTPErrorHandler), Logger: b.Logger, BackupService: b.BackupService, - KVBackupService: b.KVBackupService, } - h.HandlerFunc(http.MethodPost, prefixBackup, h.handleCreate) - h.HandlerFunc(http.MethodGet, backupFilePath, h.handleFetchFile) + h.HandlerFunc(http.MethodGet, backupKVStorePath, h.handleBackupKVStore) + h.HandlerFunc(http.MethodGet, backupShardPath, h.handleBackupShard) return h } -type backup struct { - ID int `json:"id,omitempty"` - Files []string `json:"files,omitempty"` -} - -func (h *BackupHandler) handleCreate(w http.ResponseWriter, r *http.Request) { - span, r := tracing.ExtractFromHTTPRequest(r, "BackupHandler.handleCreate") +func (h *BackupHandler) handleBackupKVStore(w http.ResponseWriter, r *http.Request) { + span, r := tracing.ExtractFromHTTPRequest(r, "BackupHandler.handleBackupKVStore") defer span.Finish() ctx := r.Context() - id, files, err := h.BackupService.CreateBackup(ctx) - if err != nil { - h.HandleHTTPError(ctx, err, w) - return - } - - internalBackupPath := h.BackupService.InternalBackupPath(id) - - boltPath := filepath.Join(internalBackupPath, bolt.DefaultFilename) - boltFile, err := os.OpenFile(boltPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0660) - if err != nil { - err = multierr.Append(err, os.RemoveAll(internalBackupPath)) - h.HandleHTTPError(ctx, err, w) - return - } - - if err = h.KVBackupService.Backup(ctx, boltFile); err != nil { - err = multierr.Append(err, os.RemoveAll(internalBackupPath)) - h.HandleHTTPError(ctx, err, w) - return - } - - files = append(files, bolt.DefaultFilename) - - credsExist, err := h.backupCredentials(internalBackupPath) - - if err != nil { - h.HandleHTTPError(ctx, err, w) - return - } - - if credsExist { - files = append(files, fs.DefaultConfigsFile) - } - - b := backup{ - ID: id, - Files: files, - } - if err = json.NewEncoder(w).Encode(&b); err != nil { - err = multierr.Append(err, os.RemoveAll(internalBackupPath)) + if err := h.BackupService.BackupKVStore(ctx, w); err != nil { h.HandleHTTPError(ctx, err, w) return } } -func (h *BackupHandler) backupCredentials(internalBackupPath string) (bool, error) { - credBackupPath := filepath.Join(internalBackupPath, fs.DefaultConfigsFile) - - credPath, err := defaultConfigsPath() - if err != nil { - return false, err - } - token, err := ioutil.ReadFile(credPath) - if err != nil && !os.IsNotExist(err) { - return false, err - } else if os.IsNotExist(err) { - return false, nil - } - - if err := ioutil.WriteFile(credBackupPath, []byte(token), 0600); err != nil { - return false, err - } - return true, nil -} - -func (h *BackupHandler) handleFetchFile(w http.ResponseWriter, r *http.Request) { - span, r := tracing.ExtractFromHTTPRequest(r, "BackupHandler.handleFetchFile") +func (h *BackupHandler) handleBackupShard(w http.ResponseWriter, r *http.Request) { + span, r := tracing.ExtractFromHTTPRequest(r, "BackupHandler.handleBackupShard") defer span.Finish() ctx := r.Context() params := httprouter.ParamsFromContext(ctx) - backupID, err := strconv.Atoi(params.ByName("backup_id")) + shardID, err := strconv.ParseUint(params.ByName("shardID"), 10, 64) if err != nil { h.HandleHTTPError(ctx, err, w) return } - backupFile := params.ByName("backup_file") - if err = h.BackupService.FetchBackupFile(ctx, backupID, backupFile, w); err != nil { + if err := h.BackupService.BackupShard(ctx, w, shardID); err != nil { 
h.HandleHTTPError(ctx, err, w) return } @@ -185,47 +105,11 @@ type BackupService struct { InsecureSkipVerify bool } -func (s *BackupService) CreateBackup(ctx context.Context) (int, []string, error) { +func (s *BackupService) BackupKVStore(ctx context.Context, w io.Writer) error { span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() - u, err := NewURL(s.Addr, prefixBackup) - if err != nil { - return 0, nil, err - } - - req, err := http.NewRequest(http.MethodPost, u.String(), nil) - if err != nil { - return 0, nil, err - } - SetToken(s.Token, req) - req = req.WithContext(ctx) - - hc := NewClient(u.Scheme, s.InsecureSkipVerify) - hc.Timeout = httpClientTimeout - resp, err := hc.Do(req) - if err != nil { - return 0, nil, err - } - defer resp.Body.Close() - - if err := CheckError(resp); err != nil { - return 0, nil, err - } - - var b backup - if err = json.NewDecoder(resp.Body).Decode(&b); err != nil { - return 0, nil, err - } - - return b.ID, b.Files, nil -} - -func (s *BackupService) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error { - span, ctx := tracing.StartSpanFromContext(ctx) - defer span.Finish() - - u, err := NewURL(s.Addr, composeBackupFilePath(backupID, backupFile)) + u, err := NewURL(s.Addr, prefixBackup+"/kv") if err != nil { return err } @@ -249,12 +133,44 @@ func (s *BackupService) FetchBackupFile(ctx context.Context, backupID int, backu return err } - _, err = io.Copy(w, resp.Body) + if _, err := io.Copy(w, resp.Body); err != nil { + return err + } + return resp.Body.Close() +} + +func (s *BackupService) BackupShard(ctx context.Context, w io.Writer, shardID uint64) error { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + u, err := NewURL(s.Addr, fmt.Sprintf(prefixBackup+"/shards/%d", shardID)) if err != nil { return err } - return nil + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + return err + } + SetToken(s.Token, req) + req = req.WithContext(ctx) + + hc := NewClient(u.Scheme, s.InsecureSkipVerify) + hc.Timeout = httpClientTimeout + resp, err := hc.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if err := CheckError(resp); err != nil { + return err + } + + if _, err := io.Copy(w, resp.Body); err != nil { + return err + } + return resp.Body.Close() } func defaultConfigsPath() (string, error) { @@ -264,7 +180,3 @@ func defaultConfigsPath() (string, error) { } return filepath.Join(dir, fs.DefaultConfigsFile), nil } - -func (s *BackupService) InternalBackupPath(backupID int) string { - panic("internal method not implemented here") -} diff --git a/http/restore_service.go b/http/restore_service.go new file mode 100644 index 0000000000..4ebbe91387 --- /dev/null +++ b/http/restore_service.go @@ -0,0 +1,189 @@ +package http + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "strconv" + + "github.com/influxdata/httprouter" + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/tracing" + "go.uber.org/zap" +) + +// RestoreBackend is all services and associated parameters required to construct the RestoreHandler. +type RestoreBackend struct { + Logger *zap.Logger + influxdb.HTTPErrorHandler + + RestoreService influxdb.RestoreService +} + +// NewRestoreBackend returns a new instance of RestoreBackend. 
+func NewRestoreBackend(b *APIBackend) *RestoreBackend { + return &RestoreBackend{ + Logger: b.Logger.With(zap.String("handler", "restore")), + + HTTPErrorHandler: b.HTTPErrorHandler, + RestoreService: b.RestoreService, + } +} + +// RestoreHandler is http handler for restore service. +type RestoreHandler struct { + *httprouter.Router + influxdb.HTTPErrorHandler + Logger *zap.Logger + + RestoreService influxdb.RestoreService +} + +const ( + prefixRestore = "/api/v2/restore" + restoreBucketPath = prefixRestore + "/buckets/:bucketID" + restoreShardPath = prefixRestore + "/shards/:shardID" +) + +// NewRestoreHandler creates a new handler at /api/v2/restore to receive restore requests. +func NewRestoreHandler(b *RestoreBackend) *RestoreHandler { + h := &RestoreHandler{ + HTTPErrorHandler: b.HTTPErrorHandler, + Router: NewRouter(b.HTTPErrorHandler), + Logger: b.Logger, + RestoreService: b.RestoreService, + } + + h.HandlerFunc(http.MethodPost, restoreBucketPath, h.handleRestoreBucket) + h.HandlerFunc(http.MethodPost, restoreShardPath, h.handleRestoreShard) + + return h +} + +func (h *RestoreHandler) handleRestoreBucket(w http.ResponseWriter, r *http.Request) { + span, r := tracing.ExtractFromHTTPRequest(r, "RestoreHandler.handleRestoreBucket") + defer span.Finish() + + ctx := r.Context() + + // Read bucket ID. + bucketID, err := decodeIDFromCtx(r.Context(), "bucketID") + if err != nil { + h.HandleHTTPError(ctx, err, w) + return + } + + // Read serialized DBI data. + buf, err := ioutil.ReadAll(r.Body) + if err != nil { + h.HandleHTTPError(ctx, err, w) + return + } + + shardIDMap, err := h.RestoreService.RestoreBucket(ctx, bucketID, buf) + if err != nil { + h.HandleHTTPError(ctx, err, w) + return + } + + if err := json.NewEncoder(w).Encode(shardIDMap); err != nil { + h.HandleHTTPError(ctx, err, w) + return + } +} + +func (h *RestoreHandler) handleRestoreShard(w http.ResponseWriter, r *http.Request) { + span, r := tracing.ExtractFromHTTPRequest(r, "RestoreHandler.handleRestoreShard") + defer span.Finish() + + ctx := r.Context() + + params := httprouter.ParamsFromContext(ctx) + shardID, err := strconv.ParseUint(params.ByName("shardID"), 10, 64) + if err != nil { + h.HandleHTTPError(ctx, err, w) + return + } + + if err := h.RestoreService.RestoreShard(ctx, shardID, r.Body); err != nil { + h.HandleHTTPError(ctx, err, w) + return + } +} + +// RestoreService is the client implementation of influxdb.RestoreService. 
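+// It issues requests against the /api/v2/restore endpoints and is the client
+// used by the influx restore command.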
+type RestoreService struct { + Addr string + Token string + InsecureSkipVerify bool +} + +func (s *RestoreService) RestoreBucket(ctx context.Context, id influxdb.ID, dbi []byte) (map[uint64]uint64, error) { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + u, err := NewURL(s.Addr, prefixRestore+fmt.Sprintf("/buckets/%s", id.String())) + if err != nil { + return nil, err + } + + req, err := http.NewRequest(http.MethodPost, u.String(), bytes.NewReader(dbi)) + if err != nil { + return nil, err + } + SetToken(s.Token, req) + req = req.WithContext(ctx) + + hc := NewClient(u.Scheme, s.InsecureSkipVerify) + hc.Timeout = httpClientTimeout + resp, err := hc.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if err := CheckError(resp); err != nil { + return nil, err + } + + shardIDMap := make(map[uint64]uint64) + if err := json.NewDecoder(resp.Body).Decode(&shardIDMap); err != nil { + return nil, err + } + return shardIDMap, nil +} + +func (s *RestoreService) RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + u, err := NewURL(s.Addr, fmt.Sprintf(prefixRestore+"/shards/%d", shardID)) + if err != nil { + return err + } + + req, err := http.NewRequest(http.MethodPost, u.String(), r) + if err != nil { + return err + } + SetToken(s.Token, req) + req = req.WithContext(ctx) + + hc := NewClient(u.Scheme, s.InsecureSkipVerify) + hc.Timeout = httpClientTimeout + resp, err := hc.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if err := CheckError(resp); err != nil { + return err + } + + return nil +} diff --git a/storage/bucket_service.go b/storage/bucket_service.go index 8f86ced624..df9335638d 100644 --- a/storage/bucket_service.go +++ b/storage/bucket_service.go @@ -33,6 +33,7 @@ func NewBucketService(log *zap.Logger, s influxdb.BucketService, engine EngineSc BucketService: s, log: log, engine: engine, + log: logger, } } diff --git a/storage/engine.go b/storage/engine.go index d9edcb536a..c90c603b2b 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -79,6 +79,9 @@ type MetaClient interface { RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error) ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error + Backup(ctx context.Context, w io.Writer) error + Data() meta.Data + SetData(data *meta.Data) error } type TSDBStore interface { @@ -306,41 +309,105 @@ func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID return ErrNotImplemented } -// CreateBackup creates a "snapshot" of all TSM data in the Engine. -// 1) Snapshot the cache to ensure the backup includes all data written before now. -// 2) Create hard links to all TSM files, in a new directory within the engine root directory. -// 3) Return a unique backup ID (invalid after the process terminates) and list of files. -// -// TODO - do we need this? -// -func (e *Engine) CreateBackup(ctx context.Context) (int, []string, error) { +func (e *Engine) BackupKVStore(ctx context.Context, w io.Writer) error { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() - if e.closing == nil { - return 0, nil, ErrEngineClosed - } - - return 0, nil, nil -} - -// FetchBackupFile writes a given backup file to the provided writer. -// After a successful write, the internal copy is removed. 
-func (e *Engine) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error { - // TODO - need? - return nil -} - -// InternalBackupPath provides the internal, full path directory name of the backup. -// This should not be exposed via API. -func (e *Engine) InternalBackupPath(backupID int) string { e.mu.RLock() defer e.mu.RUnlock() + if e.closing == nil { - return "" + return ErrEngineClosed } - // TODO - need? - return "" + + return e.metaClient.Backup(ctx, w) +} + +func (e *Engine) BackupShard(ctx context.Context, w io.Writer, shardID uint64) error { + span, _ := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + e.mu.RLock() + defer e.mu.RUnlock() + + if e.closing == nil { + return ErrEngineClosed + } + + return e.tsdbStore.BackupShard(shardID, time.Time{}, w) +} + +func (e *Engine) RestoreBucket(ctx context.Context, id influxdb.ID, buf []byte) (map[uint64]uint64, error) { + span, _ := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + e.mu.RLock() + defer e.mu.RUnlock() + + if e.closing == nil { + return nil, ErrEngineClosed + } + + var newDBI meta.DatabaseInfo + if err := newDBI.UnmarshalBinary(buf); err != nil { + return nil, err + } + + data := e.metaClient.Data() + dbi := data.Database(id.String()) + if dbi == nil { + return nil, fmt.Errorf("bucket dbi for %q not found during restore", newDBI.Name) + } else if len(newDBI.RetentionPolicies) != 1 { + return nil, fmt.Errorf("bucket must have 1 retention policy; attempting to restore %d retention policies", len(newDBI.RetentionPolicies)) + } + + dbi.RetentionPolicies = newDBI.RetentionPolicies + dbi.ContinuousQueries = newDBI.ContinuousQueries + + // Generate shard ID mapping. + shardIDMap := make(map[uint64]uint64) + rpi := newDBI.RetentionPolicies[0] + for j, sgi := range rpi.ShardGroups { + data.MaxShardGroupID++ + rpi.ShardGroups[j].ID = data.MaxShardGroupID + + for k := range sgi.Shards { + data.MaxShardID++ + shardIDMap[sgi.Shards[k].ID] = data.MaxShardID + sgi.Shards[k].ID = data.MaxShardID + sgi.Shards[k].Owners = []meta.ShardOwner{} + } + } + + // Update data. + if err := e.metaClient.SetData(&data); err != nil { + return nil, err + } + + // Create shards. + for _, sgi := range rpi.ShardGroups { + for _, sh := range sgi.Shards { + if err := e.tsdbStore.CreateShard(dbi.Name, rpi.Name, sh.ID, true); err != nil { + return nil, err + } + } + } + + return shardIDMap, nil +} + +func (e *Engine) RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error { + span, _ := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + e.mu.RLock() + defer e.mu.RUnlock() + + if e.closing == nil { + return ErrEngineClosed + } + + return e.tsdbStore.RestoreShard(shardID, r) } // SeriesCardinality returns the number of series in the engine. diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index cd4472d601..93ae652e43 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -1199,15 +1199,7 @@ func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string, as return "", nil } - nativeFileName := filepath.FromSlash(hdr.Name) - // Skip file if it does not have a matching prefix. - if !strings.HasPrefix(nativeFileName, shardRelativePath) { - return "", nil - } - filename, err := filepath.Rel(shardRelativePath, nativeFileName) - if err != nil { - return "", err - } + filename := filepath.Base(filepath.FromSlash(hdr.Name)) // If this is a directory entry (usually just `index` for tsi), create it an move on. 
if hdr.Typeflag == tar.TypeDir { diff --git a/v1/services/meta/client.go b/v1/services/meta/client.go index fd1ec402e3..2d4466c3de 100644 --- a/v1/services/meta/client.go +++ b/v1/services/meta/client.go @@ -1036,6 +1036,10 @@ func (c *Client) Load() error { }) } +func (c *Client) Backup(ctx context.Context, w io.Writer) error { + return c.store.Backup(ctx, w) +} + type uint64Slice []uint64 func (a uint64Slice) Len() int { return len(a) } diff --git a/v1/services/meta/data.go b/v1/services/meta/data.go index fa2c8b6650..709dc32f99 100644 --- a/v1/services/meta/data.go +++ b/v1/services/meta/data.go @@ -994,6 +994,21 @@ func (di DatabaseInfo) clone() DatabaseInfo { return other } +// MarshalBinary encodes dbi to a binary format. +func (dbi *DatabaseInfo) MarshalBinary() ([]byte, error) { + return proto.Marshal(dbi.marshal()) +} + +// UnmarshalBinary decodes dbi from a binary format. +func (dbi *DatabaseInfo) UnmarshalBinary(data []byte) error { + var pb internal.DatabaseInfo + if err := proto.Unmarshal(data, &pb); err != nil { + return err + } + dbi.unmarshal(&pb) + return nil +} + // marshal serializes to a protobuf representation. func (di DatabaseInfo) marshal() *internal.DatabaseInfo { pb := &internal.DatabaseInfo{} From 6e1097c625906a1e641034020b4285115aa1a008 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 30 Oct 2020 13:46:54 -0600 Subject: [PATCH 2/7] feat: Minor updates to backup/restore --- authorizer/backup.go | 5 ++-- backup.go | 2 +- bucket.go | 6 +++++ cmd/influx/backup.go | 20 ++++++++-------- cmd/influx/restore.go | 42 +++++++++++++++++++++------------- cmd/influxd/launcher/engine.go | 4 ++-- http/backup_service.go | 26 +++++++++++---------- storage/engine.go | 4 ++-- 8 files changed, 63 insertions(+), 46 deletions(-) diff --git a/authorizer/backup.go b/authorizer/backup.go index c19d6c937d..98966643d5 100644 --- a/authorizer/backup.go +++ b/authorizer/backup.go @@ -3,6 +3,7 @@ package authorizer import ( "context" "io" + "time" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/kit/tracing" @@ -34,7 +35,7 @@ func (b BackupService) BackupKVStore(ctx context.Context, w io.Writer) error { return b.s.BackupKVStore(ctx, w) } -func (b BackupService) BackupShard(ctx context.Context, w io.Writer, shardID uint64) error { +func (b BackupService) BackupShard(ctx context.Context, w io.Writer, shardID uint64, since time.Time) error { span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() @@ -42,5 +43,5 @@ func (b BackupService) BackupShard(ctx context.Context, w io.Writer, shardID uin if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil { return err } - return b.s.BackupShard(ctx, w, shardID) + return b.s.BackupShard(ctx, w, shardID, since) } diff --git a/backup.go b/backup.go index 8777542c9b..c14bfd7e16 100644 --- a/backup.go +++ b/backup.go @@ -16,7 +16,7 @@ type BackupService interface { BackupKVStore(ctx context.Context, w io.Writer) error // BackupShard downloads a backup file for a single shard. - BackupShard(ctx context.Context, w io.Writer, shardID uint64) error + BackupShard(ctx context.Context, w io.Writer, shardID uint64, since time.Time) error } // RestoreService represents the data restore functions of InfluxDB. diff --git a/bucket.go b/bucket.go index b2d8070b7d..9dbc70552a 100644 --- a/bucket.go +++ b/bucket.go @@ -39,6 +39,12 @@ type Bucket struct { CRUDLog } +// Clone returns a shallow copy of b. 
+func (b *Bucket) Clone() *Bucket { + other := *b + return &other +} + // BucketType differentiates system buckets from user buckets. type BucketType int diff --git a/cmd/influx/backup.go b/cmd/influx/backup.go index 0235dbc0e4..2664f356ca 100644 --- a/cmd/influx/backup.go +++ b/cmd/influx/backup.go @@ -24,8 +24,6 @@ func cmdBackup(f *globalFlags, opts genericCLIOpts) *cobra.Command { return newCmdBackupBuilder(f, opts).cmdBackup() } -type backupSVCsFn func() (influxdb.BackupService, error) - type cmdBackupBuilder struct { genericCLIOpts *globalFlags @@ -33,7 +31,7 @@ type cmdBackupBuilder struct { bucketID string bucketName string org organization - outputPath string + path string manifest influxdb.Manifest baseName string @@ -65,7 +63,7 @@ func (b *cmdBackupBuilder) cmdBackup() *cobra.Command { } else if len(args) > 1 { return fmt.Errorf("too many args specified") } - b.outputPath = args[0] + b.path = args[0] return nil } cmd.Short = "Backup database" @@ -104,7 +102,7 @@ func (b *cmdBackupBuilder) backupRunE(cmd *cobra.Command, args []string) (err er b.baseName = time.Now().UTC().Format(influxdb.BackupFilenamePattern) // Ensure directory exsits. - if err := os.MkdirAll(b.outputPath, 0777); err != nil { + if err := os.MkdirAll(b.path, 0777); err != nil { return err } @@ -121,14 +119,14 @@ func (b *cmdBackupBuilder) backupRunE(cmd *cobra.Command, args []string) (err er // Open bolt DB. boltClient := bolt.NewClient(b.logger) - boltClient.Path = filepath.Join(b.outputPath, b.kvPath()) + boltClient.Path = filepath.Join(b.path, b.kvPath()) if err := boltClient.Open(ctx); err != nil { return err } defer boltClient.Close() // Open meta store so we can iterate over meta data. - b.kvStore = bolt.NewKVStore(b.logger, filepath.Join(b.outputPath, b.kvPath())) + b.kvStore = bolt.NewKVStore(b.logger, filepath.Join(b.path, b.kvPath())) b.kvStore.WithDB(boltClient.DB()) b.kvService = kv.NewService(b.logger, b.kvStore, kv.ServiceConfig{}) @@ -153,7 +151,7 @@ func (b *cmdBackupBuilder) backupRunE(cmd *cobra.Command, args []string) (err er // backupKVStore streams the bolt KV file to a file at path. func (b *cmdBackupBuilder) backupKVStore(ctx context.Context) error { - path := filepath.Join(b.outputPath, b.kvPath()) + path := filepath.Join(b.path, b.kvPath()) b.logger.Info("Backing up KV store", zap.String("path", b.kvPath())) // Open writer to output file. @@ -263,7 +261,7 @@ func (b *cmdBackupBuilder) backupBucket(ctx context.Context, org *influxdb.Organ // backupShard streams a tar of TSM data for shard. func (b *cmdBackupBuilder) backupShard(ctx context.Context, org *influxdb.Organization, bkt *influxdb.Bucket, policy string, shardID uint64) error { - path := filepath.Join(b.outputPath, b.shardPath(shardID)) + path := filepath.Join(b.path, b.shardPath(shardID)) b.logger.Info("Backing up shard", zap.Uint64("id", shardID), zap.String("path", b.shardPath(shardID))) // Open writer to output file. @@ -278,7 +276,7 @@ func (b *cmdBackupBuilder) backupShard(ctx context.Context, org *influxdb.Organi defer gw.Close() // Stream file from server, sync, and ensure file closes correctly. - if err := b.backupService.BackupShard(ctx, gw, shardID); err != nil { + if err := b.backupService.BackupShard(ctx, gw, shardID, time.Time{}); err != nil { return err } else if err := gw.Close(); err != nil { return err @@ -311,7 +309,7 @@ func (b *cmdBackupBuilder) backupShard(ctx context.Context, org *influxdb.Organi // writeManifest writes the manifest file out. 
func (b *cmdBackupBuilder) writeManifest(ctx context.Context) error { - path := filepath.Join(b.outputPath, b.manifestPath()) + path := filepath.Join(b.path, b.manifestPath()) b.logger.Info("Writing manifest", zap.String("path", b.manifestPath())) buf, err := json.MarshalIndent(b.manifest, "", " ") diff --git a/cmd/influx/restore.go b/cmd/influx/restore.go index 1ebdcfdc5e..288569eebd 100644 --- a/cmd/influx/restore.go +++ b/cmd/influx/restore.go @@ -25,8 +25,6 @@ func cmdRestore(f *globalFlags, opts genericCLIOpts) *cobra.Command { return newCmdRestoreBuilder(f, opts).cmdRestore() } -type restoreSVCsFn func() (influxdb.RestoreService, error) - type cmdRestoreBuilder struct { genericCLIOpts *globalFlags @@ -36,7 +34,6 @@ type cmdRestoreBuilder struct { newBucketName string newOrgName string org organization - shardID uint64 path string kvEntry *influxdb.ManifestKVEntry @@ -67,7 +64,6 @@ func (b *cmdRestoreBuilder) cmdRestore() *cobra.Command { cmd.Flags().StringVarP(&b.bucketName, "bucket", "b", "", "The name of the bucket to restore") cmd.Flags().StringVar(&b.newBucketName, "new-bucket", "", "The name of the bucket to restore to") cmd.Flags().StringVar(&b.newOrgName, "new-org", "", "The name of the organization to restore to") - cmd.Flags().Uint64Var(&b.shardID, "shard-id", 0, "The shard to restore") cmd.Flags().StringVar(&b.path, "input", "", "Local backup data path (required)") cmd.Use = "restore [flags] path" cmd.Args = func(cmd *cobra.Command, args []string) error { @@ -99,12 +95,18 @@ func (b *cmdRestoreBuilder) restoreRunE(cmd *cobra.Command, args []string) (err return err } + // Ensure org/bucket filters are set if a new org/bucket name is specified. + if b.newOrgName != "" && b.org.id == "" && b.org.name == "" { + return fmt.Errorf("must specify source org id or name when renaming restored org") + } else if b.newBucketName != "" && b.bucketID == "" && b.bucketName == "" { + return fmt.Errorf("must specify source bucket id or name when renaming restored bucket") + } + // Read in set of KV data & shard data to restore. if err := b.loadIncremental(); err != nil { return fmt.Errorf("restore failed while processing manifest files: %s", err.Error()) } else if b.kvEntry == nil { - return fmt.Errorf("No manifest files found in: %s\n", b.path) - + return fmt.Errorf("no manifest files found in: %s", b.path) } ac := flags.config() @@ -178,19 +180,25 @@ func (b *cmdRestoreBuilder) restoreOrganizations(ctx context.Context) (err error func (b *cmdRestoreBuilder) restoreOrganization(ctx context.Context, org *influxdb.Organization) (err error) { b.logger.Info("Restoring organization", zap.String("id", org.ID.String()), zap.String("name", org.Name)) + newOrg := *org + if b.newOrgName != "" { + newOrg.Name = b.newOrgName + } + // Create organization on server, if it doesn't already exist. 
- if a, _, err := b.orgService.FindOrganizations(ctx, influxdb.OrganizationFilter{Name: &org.Name}); err != nil { - return fmt.Errorf("cannot find existing organization: %w", err) - } else if len(a) == 0 { - tmp := *org // copy so we don't lose our ID - if err := b.orgService.CreateOrganization(ctx, &tmp); err != nil { + if o, err := b.orgService.FindOrganization(ctx, influxdb.OrganizationFilter{Name: &newOrg.Name}); influxdb.ErrorCode(err) == influxdb.ENotFound { + if err := b.orgService.CreateOrganization(ctx, &newOrg); err != nil { return fmt.Errorf("cannot create organization: %w", err) } + } else if err != nil { + return fmt.Errorf("cannot find existing organization: %#v", err) + } else { + newOrg.ID = o.ID } // Build a filter if bucket ID or bucket name were specified. var filter influxdb.BucketFilter - filter.OrganizationID = &org.ID + filter.OrganizationID = &org.ID // match on backup's org ID if b.bucketID != "" { if filter.ID, err = influxdb.IDFromString(b.bucketID); err != nil { return err @@ -212,14 +220,17 @@ func (b *cmdRestoreBuilder) restoreOrganization(ctx context.Context, org *influx continue } - if err := b.restoreBucket(ctx, org, bkt); err != nil { + bkt = bkt.Clone() + bkt.OrgID = newOrg.ID + + if err := b.restoreBucket(ctx, bkt); err != nil { return err } } return nil } -func (b *cmdRestoreBuilder) restoreBucket(ctx context.Context, org *influxdb.Organization, bkt *influxdb.Bucket) (err error) { +func (b *cmdRestoreBuilder) restoreBucket(ctx context.Context, bkt *influxdb.Bucket) (err error) { b.logger.Info("Restoring bucket", zap.String("id", bkt.ID.String()), zap.String("name", bkt.Name)) // Create bucket on server. @@ -232,6 +243,7 @@ func (b *cmdRestoreBuilder) restoreBucket(ctx context.Context, org *influxdb.Org } // Lookup matching database from the meta store. + // Search using bucket ID from backup. dbi := b.metaClient.Database(bkt.ID.String()) if dbi == nil { return fmt.Errorf("bucket database not found: %s", bkt.ID.String()) @@ -252,8 +264,6 @@ func (b *cmdRestoreBuilder) restoreBucket(ctx context.Context, org *influxdb.Org for _, file := range b.shardEntries { if bkt.ID.String() != file.BucketID { continue - } else if b.shardID != 0 && b.shardID != file.ShardID { - continue } // Skip if shard metadata was not imported. 
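
Usage sketch for the rename-on-restore path above (illustrative only; the bucket
names and the backup path are placeholders, and the flag spellings follow the cobra
definitions in this command). Because of the new validation, the source bucket must
be identified by --bucket or --bucket-id whenever --new-bucket is given:

    influx restore --bucket telegraf --new-bucket telegraf-restored --input ./backup-dir

This recreates the backed-up "telegraf" bucket under the new name, leaving any
existing bucket on the server untouched.
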
diff --git a/cmd/influxd/launcher/engine.go b/cmd/influxd/launcher/engine.go index c12af95de7..1b984ec15e 100644 --- a/cmd/influxd/launcher/engine.go +++ b/cmd/influxd/launcher/engine.go @@ -166,8 +166,8 @@ func (t *TemporaryEngine) RestoreBucket(ctx context.Context, id influxdb.ID, dbi return t.engine.RestoreBucket(ctx, id, dbi) } -func (t *TemporaryEngine) BackupShard(ctx context.Context, w io.Writer, shardID uint64) error { - return t.engine.BackupShard(ctx, w, shardID) +func (t *TemporaryEngine) BackupShard(ctx context.Context, w io.Writer, shardID uint64, since time.Time) error { + return t.engine.BackupShard(ctx, w, shardID, since) } func (t *TemporaryEngine) RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error { diff --git a/http/backup_service.go b/http/backup_service.go index ddc2acbc06..9652bae3c7 100644 --- a/http/backup_service.go +++ b/http/backup_service.go @@ -5,14 +5,13 @@ import ( "fmt" "io" "net/http" - "path/filepath" + "net/url" "strconv" "time" "github.com/influxdata/httprouter" "github.com/influxdata/influxdb/v2" // "github.com/influxdata/influxdb/v2/bolt" - "github.com/influxdata/influxdb/v2/internal/fs" "github.com/influxdata/influxdb/v2/kit/tracing" "go.uber.org/zap" ) @@ -92,7 +91,15 @@ func (h *BackupHandler) handleBackupShard(w http.ResponseWriter, r *http.Request return } - if err := h.BackupService.BackupShard(ctx, w, shardID); err != nil { + var since time.Time + if s := r.URL.Query().Get("since"); s != "" { + if since, err = time.ParseInLocation(time.RFC3339, s, time.UTC); err != nil { + h.HandleHTTPError(ctx, err, w) + return + } + } + + if err := h.BackupService.BackupShard(ctx, w, shardID, since); err != nil { h.HandleHTTPError(ctx, err, w) return } @@ -139,7 +146,7 @@ func (s *BackupService) BackupKVStore(ctx context.Context, w io.Writer) error { return resp.Body.Close() } -func (s *BackupService) BackupShard(ctx context.Context, w io.Writer, shardID uint64) error { +func (s *BackupService) BackupShard(ctx context.Context, w io.Writer, shardID uint64, since time.Time) error { span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() @@ -147,6 +154,9 @@ func (s *BackupService) BackupShard(ctx context.Context, w io.Writer, shardID ui if err != nil { return err } + if !since.IsZero() { + u.RawQuery = (url.Values{"since": {since.UTC().Format(time.RFC3339)}}).Encode() + } req, err := http.NewRequest(http.MethodGet, u.String(), nil) if err != nil { @@ -172,11 +182,3 @@ func (s *BackupService) BackupShard(ctx context.Context, w io.Writer, shardID ui } return resp.Body.Close() } - -func defaultConfigsPath() (string, error) { - dir, err := fs.InfluxDir() - if err != nil { - return "", err - } - return filepath.Join(dir, fs.DefaultConfigsFile), nil -} diff --git a/storage/engine.go b/storage/engine.go index c90c603b2b..aba8c9672a 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -323,7 +323,7 @@ func (e *Engine) BackupKVStore(ctx context.Context, w io.Writer) error { return e.metaClient.Backup(ctx, w) } -func (e *Engine) BackupShard(ctx context.Context, w io.Writer, shardID uint64) error { +func (e *Engine) BackupShard(ctx context.Context, w io.Writer, shardID uint64, since time.Time) error { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() @@ -334,7 +334,7 @@ func (e *Engine) BackupShard(ctx context.Context, w io.Writer, shardID uint64) e return ErrEngineClosed } - return e.tsdbStore.BackupShard(shardID, time.Time{}, w) + return e.tsdbStore.BackupShard(shardID, since, w) } func (e *Engine) RestoreBucket(ctx 
context.Context, id influxdb.ID, buf []byte) (map[uint64]uint64, error) { From ea1a3dbe608e9b78ea84eeb9e2d3f5a91c256f6c Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 2 Nov 2020 08:37:46 -0700 Subject: [PATCH 3/7] fix: Return ENotFound for BackupShard() --- tsdb/store.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tsdb/store.go b/tsdb/store.go index 8ed9f78aab..55b53fd225 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -1192,7 +1192,10 @@ func (s *Store) MeasurementsSketches(database string) (estimator.Sketch, estimat func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error { shard := s.Shard(id) if shard == nil { - return fmt.Errorf("shard %d doesn't exist on this server", id) + return &influxdb.Error{ + Code: influxdb.ENotFound, + Msg: fmt.Sprintf("shard %d not found", id), + } } path, err := relativePath(s.path, shard.path) @@ -1206,7 +1209,10 @@ func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error { func (s *Store) ExportShard(id uint64, start time.Time, end time.Time, w io.Writer) error { shard := s.Shard(id) if shard == nil { - return fmt.Errorf("shard %d doesn't exist on this server", id) + return &influxdb.Error{ + Code: influxdb.ENotFound, + Msg: fmt.Sprintf("shard %d not found", id), + } } path, err := relativePath(s.path, shard.path) From 5f1968b331840f4bc14525326f10d48f1284e645 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 2 Nov 2020 08:40:24 -0700 Subject: [PATCH 4/7] fix: Skip deleted shard groups during backup --- cmd/influx/backup.go | 9 ++++++++- storage/engine.go | 4 ++++ tsdb/store.go | 1 + 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/cmd/influx/backup.go b/cmd/influx/backup.go index 2664f356ca..f5804155aa 100644 --- a/cmd/influx/backup.go +++ b/cmd/influx/backup.go @@ -249,8 +249,15 @@ func (b *cmdBackupBuilder) backupBucket(ctx context.Context, org *influxdb.Organ // Iterate over and backup each shard. for _, rpi := range dbi.RetentionPolicies { for _, sg := range rpi.ShardGroups { + if sg.Deleted() { + continue + } + for _, sh := range sg.Shards { - if err := b.backupShard(ctx, org, bkt, rpi.Name, sh.ID); err != nil { + if err := b.backupShard(ctx, org, bkt, rpi.Name, sh.ID); influxdb.ErrorCode(err) == influxdb.ENotFound { + b.logger.Warn("Shard removed during backup", zap.Uint64("shard_id", sh.ID)) + continue + } else if err != nil { return err } } diff --git a/storage/engine.go b/storage/engine.go index aba8c9672a..4bb91ca443 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -386,6 +386,10 @@ func (e *Engine) RestoreBucket(ctx context.Context, id influxdb.ID, buf []byte) // Create shards. 
for _, sgi := range rpi.ShardGroups { + if sgi.Deleted() { + continue + } + for _, sh := range sgi.Shards { if err := e.tsdbStore.CreateShard(dbi.Name, rpi.Name, sh.ID, true); err != nil { return nil, err diff --git a/tsdb/store.go b/tsdb/store.go index 55b53fd225..2db74fcc7c 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -17,6 +17,7 @@ import ( "sync" "time" + "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/influxql/query" "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/models" From 2c554ae9742b1f2233cd4bee7f99042638e171a1 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 2 Nov 2020 10:10:09 -0700 Subject: [PATCH 5/7] fix: Use operator permissions for backup/restore --- authorizer/backup.go | 6 ++---- authorizer/restore.go | 4 ++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/authorizer/backup.go b/authorizer/backup.go index 98966643d5..05bf9a5bd5 100644 --- a/authorizer/backup.go +++ b/authorizer/backup.go @@ -28,8 +28,7 @@ func (b BackupService) BackupKVStore(ctx context.Context, w io.Writer) error { span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() - // TODO(bbj): Correct permissions. - if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil { + if err := IsAllowedAll(ctx, influxdb.OperPermissions()); err != nil { return err } return b.s.BackupKVStore(ctx, w) @@ -39,8 +38,7 @@ func (b BackupService) BackupShard(ctx context.Context, w io.Writer, shardID uin span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() - // TODO(bbj): Correct permissions. - if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil { + if err := IsAllowedAll(ctx, influxdb.OperPermissions()); err != nil { return err } return b.s.BackupShard(ctx, w, shardID, since) diff --git a/authorizer/restore.go b/authorizer/restore.go index 40cb421c43..4523e75160 100644 --- a/authorizer/restore.go +++ b/authorizer/restore.go @@ -28,7 +28,7 @@ func (b RestoreService) RestoreBucket(ctx context.Context, id influxdb.ID, dbi [ defer span.Finish() // TODO(bbj): Correct permissions. - if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil { + if err := IsAllowedAll(ctx, influxdb.OperPermissions()); err != nil { return nil, err } return b.s.RestoreBucket(ctx, id, dbi) @@ -39,7 +39,7 @@ func (b RestoreService) RestoreShard(ctx context.Context, shardID uint64, r io.R defer span.Finish() // TODO(bbj): Correct permissions. 
- if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil { + if err := IsAllowedAll(ctx, influxdb.OperPermissions()); err != nil { return err } return b.s.RestoreShard(ctx, shardID, r) From 419b0cf76b65d4eca19f116015932a62c7c6f65f Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 3 Nov 2020 16:36:28 -0700 Subject: [PATCH 6/7] feat: Implement full restore --- authorizer/restore.go | 12 ++++- backup.go | 3 ++ bolt/kv.go | 91 +++++++++++++++++++++++++++----- cmd/influx/restore.go | 42 +++++++++++++++ cmd/influxd/launcher/engine.go | 4 ++ go.sum | 1 + http/restore_service.go | 45 ++++++++++++++++ inmem/kv.go | 4 ++ kv/service_test.go | 4 ++ kv/store.go | 2 + mock/kv.go | 11 ++-- storage/bucket_service.go | 1 - storage/engine.go | 40 ++++++++++++++ tsdb/store.go | 10 ++++ v1/services/meta/client.go | 7 +++ v1/services/meta/filestore/kv.go | 4 ++ 16 files changed, 262 insertions(+), 19 deletions(-) diff --git a/authorizer/restore.go b/authorizer/restore.go index 4523e75160..f2fd87fa5f 100644 --- a/authorizer/restore.go +++ b/authorizer/restore.go @@ -23,11 +23,20 @@ func NewRestoreService(s influxdb.RestoreService) *RestoreService { } } +func (b RestoreService) RestoreKVStore(ctx context.Context, r io.Reader) error { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + if err := IsAllowedAll(ctx, influxdb.OperPermissions()); err != nil { + return err + } + return b.s.RestoreKVStore(ctx, r) +} + func (b RestoreService) RestoreBucket(ctx context.Context, id influxdb.ID, dbi []byte) (shardIDMap map[uint64]uint64, err error) { span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() - // TODO(bbj): Correct permissions. if err := IsAllowedAll(ctx, influxdb.OperPermissions()); err != nil { return nil, err } @@ -38,7 +47,6 @@ func (b RestoreService) RestoreShard(ctx context.Context, shardID uint64, r io.R span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() - // TODO(bbj): Correct permissions. if err := IsAllowedAll(ctx, influxdb.OperPermissions()); err != nil { return err } diff --git a/backup.go b/backup.go index c14bfd7e16..7a15169113 100644 --- a/backup.go +++ b/backup.go @@ -21,6 +21,9 @@ type BackupService interface { // RestoreService represents the data restore functions of InfluxDB. type RestoreService interface { + // RestoreKVStore restores & replaces metadata database. + RestoreKVStore(ctx context.Context, r io.Reader) error + // RestoreKVStore restores the metadata database. RestoreBucket(ctx context.Context, id ID, rpiData []byte) (shardIDMap map[uint64]uint64, err error) diff --git a/bolt/kv.go b/bolt/kv.go index 6af664238e..2a0de90f87 100644 --- a/bolt/kv.go +++ b/bolt/kv.go @@ -8,10 +8,12 @@ import ( "io" "os" "path/filepath" + "sync" "time" "github.com/influxdata/influxdb/v2/kit/tracing" "github.com/influxdata/influxdb/v2/kv" + "github.com/influxdata/influxdb/v2/pkg/fs" bolt "go.etcd.io/bbolt" "go.uber.org/zap" ) @@ -22,6 +24,7 @@ var _ kv.SchemaStore = (*KVStore)(nil) // KVStore is a kv.Store backed by boltdb. type KVStore struct { path string + mu sync.RWMutex db *bolt.DB log *zap.Logger @@ -53,6 +56,11 @@ func NewKVStore(log *zap.Logger, path string, opts ...KVOption) *KVStore { return store } +// tempPath returns the path to the temporary file used by Restore(). +func (s *KVStore) tempPath() string { + return s.path + ".tmp" +} + // Open creates boltDB file it doesn't exists and opens it otherwise. 
func (s *KVStore) Open(ctx context.Context) error { span, _ := tracing.StartSpanFromContext(ctx) @@ -67,30 +75,46 @@ func (s *KVStore) Open(ctx context.Context) error { return err } + // Remove any temporary file created during a failed restore. + if err := os.Remove(s.tempPath()); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("unable to remove boltdb partial restore file: %w", err) + } + // Open database file. - db, err := bolt.Open(s.path, 0600, &bolt.Options{Timeout: 1 * time.Second}) - if err != nil { + if err := s.openDB(); err != nil { return fmt.Errorf("unable to open boltdb file %v", err) } - s.db = db - - db.NoSync = s.noSync s.log.Info("Resources opened", zap.String("path", s.path)) return nil } +func (s *KVStore) openDB() (err error) { + if s.db, err = bolt.Open(s.path, 0600, &bolt.Options{Timeout: 1 * time.Second}); err != nil { + return fmt.Errorf("unable to open boltdb file %v", err) + } + s.db.NoSync = s.noSync + return nil +} + // Close the connection to the bolt database func (s *KVStore) Close() error { - if s.db != nil { - return s.db.Close() + if db := s.DB(); db != nil { + return db.Close() } return nil } +// DB returns a reference to the current Bolt database. +func (s *KVStore) DB() *bolt.DB { + s.mu.RLock() + defer s.mu.RUnlock() + return s.db +} + // Flush removes all bolt keys within each bucket. func (s *KVStore) Flush(ctx context.Context) { - _ = s.db.Update( + _ = s.DB().Update( func(tx *bolt.Tx) error { return tx.ForEach(func(name []byte, b *bolt.Bucket) error { s.cleanBucket(tx, b) @@ -117,6 +141,8 @@ func (s *KVStore) cleanBucket(tx *bolt.Tx, b *bolt.Bucket) { // WithDB sets the boltdb on the store. func (s *KVStore) WithDB(db *bolt.DB) { + s.mu.Lock() + defer s.mu.Unlock() s.db = db } @@ -125,7 +151,7 @@ func (s *KVStore) View(ctx context.Context, fn func(tx kv.Tx) error) error { span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() - return s.db.View(func(tx *bolt.Tx) error { + return s.DB().View(func(tx *bolt.Tx) error { return fn(&Tx{ tx: tx, ctx: ctx, @@ -138,7 +164,7 @@ func (s *KVStore) Update(ctx context.Context, fn func(tx kv.Tx) error) error { span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() - return s.db.Update(func(tx *bolt.Tx) error { + return s.DB().Update(func(tx *bolt.Tx) error { return fn(&Tx{ tx: tx, ctx: ctx, @@ -149,7 +175,7 @@ func (s *KVStore) Update(ctx context.Context, fn func(tx kv.Tx) error) error { // CreateBucket creates a bucket in the underlying boltdb store if it // does not already exist func (s *KVStore) CreateBucket(ctx context.Context, name []byte) error { - return s.db.Update(func(tx *bolt.Tx) error { + return s.DB().Update(func(tx *bolt.Tx) error { _, err := tx.CreateBucketIfNotExists(name) return err }) @@ -158,7 +184,7 @@ func (s *KVStore) CreateBucket(ctx context.Context, name []byte) error { // DeleteBucket creates a bucket in the underlying boltdb store if it // does not already exist func (s *KVStore) DeleteBucket(ctx context.Context, name []byte) error { - return s.db.Update(func(tx *bolt.Tx) error { + return s.DB().Update(func(tx *bolt.Tx) error { if err := tx.DeleteBucket(name); err != nil && !errors.Is(err, bolt.ErrBucketNotFound) { return err } @@ -172,12 +198,51 @@ func (s *KVStore) Backup(ctx context.Context, w io.Writer) error { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() - return s.db.View(func(tx *bolt.Tx) error { + return s.DB().View(func(tx *bolt.Tx) error { _, err := tx.WriteTo(w) return err }) } +// Restore replaces the underlying database 
with the data from r. +func (s *KVStore) Restore(ctx context.Context, r io.Reader) error { + if err := func() error { + f, err := os.Create(s.tempPath()) + if err != nil { + return err + } + defer f.Close() + + if _, err := io.Copy(f, r); err != nil { + return err + } else if err := f.Sync(); err != nil { + return err + } else if err := f.Close(); err != nil { + return err + } + + // Swap and reopen under lock. + s.mu.Lock() + defer s.mu.Unlock() + + if err := s.db.Close(); err != nil { + return err + } + + // Atomically swap temporary file with current DB file. + if err := fs.RenameFileWithReplacement(s.tempPath(), s.path); err != nil { + return err + } + + // Reopen with new database file. + return s.openDB() + }(); err != nil { + os.Remove(s.tempPath()) // clean up on error + return err + } + return nil +} + // Tx is a light wrapper around a boltdb transaction. It implements kv.Tx. type Tx struct { tx *bolt.Tx diff --git a/cmd/influx/restore.go b/cmd/influx/restore.go index 288569eebd..87170efd08 100644 --- a/cmd/influx/restore.go +++ b/cmd/influx/restore.go @@ -29,6 +29,7 @@ type cmdRestoreBuilder struct { genericCLIOpts *globalFlags + full bool bucketID string bucketName string newBucketName string @@ -60,6 +61,7 @@ func newCmdRestoreBuilder(f *globalFlags, opts genericCLIOpts) *cmdRestoreBuilde func (b *cmdRestoreBuilder) cmdRestore() *cobra.Command { cmd := b.newCmd("restore", b.restoreRunE) b.org.register(cmd, true) + cmd.Flags().BoolVar(&b.full, "full", false, "Fully restore and replace all data on server") cmd.Flags().StringVar(&b.bucketID, "bucket-id", "", "The ID of the bucket to restore") cmd.Flags().StringVarP(&b.bucketName, "bucket", "b", "", "The name of the bucket to restore") cmd.Flags().StringVar(&b.newBucketName, "new-bucket", "", "The name of the bucket to restore to") @@ -123,6 +125,46 @@ func (b *cmdRestoreBuilder) restoreRunE(cmd *cobra.Command, args []string) (err b.orgService = &http.OrganizationService{Client: client} b.bucketService = &http.BucketService{Client: client} + if !b.full { + return b.restorePartial(ctx) + } + return b.restoreFull(ctx) +} + +// restoreFull completely replaces the bolt metadata file and restores all shard data. +func (b *cmdRestoreBuilder) restoreFull(ctx context.Context) (err error) { + if err := b.restoreKVStore(ctx); err != nil { + return err + } + + // Restore each shard for the bucket. + for _, file := range b.shardEntries { + if err := b.restoreShard(ctx, file.ShardID, file); err != nil { + return err + } + } + + return nil +} + +func (b *cmdRestoreBuilder) restoreKVStore(ctx context.Context) (err error) { + f, err := os.Open(filepath.Join(b.path, b.kvEntry.FileName)) + if err != nil { + return err + } + defer f.Close() + + if err := b.restoreService.RestoreKVStore(ctx, f); err != nil { + return err + } + b.logger.Info("Full metadata restored.") + + return nil +} + +// restorePartial restores shard data to a server without deleting existing data. +// Organizations & buckets are created as needed. Cannot overwrite an existing bucket. +func (b *cmdRestoreBuilder) restorePartial(ctx context.Context) (err error) { // Open bolt DB. 
boltClient := bolt.NewClient(b.logger) boltClient.Path = filepath.Join(b.path, b.kvEntry.FileName) diff --git a/cmd/influxd/launcher/engine.go b/cmd/influxd/launcher/engine.go index 1b984ec15e..3bfb15e7b8 100644 --- a/cmd/influxd/launcher/engine.go +++ b/cmd/influxd/launcher/engine.go @@ -162,6 +162,10 @@ func (t *TemporaryEngine) BackupKVStore(ctx context.Context, w io.Writer) error return t.engine.BackupKVStore(ctx, w) } +func (t *TemporaryEngine) RestoreKVStore(ctx context.Context, r io.Reader) error { + return t.engine.RestoreKVStore(ctx, r) +} + func (t *TemporaryEngine) RestoreBucket(ctx context.Context, id influxdb.ID, dbi []byte) (map[uint64]uint64, error) { return t.engine.RestoreBucket(ctx, id, dbi) } diff --git a/go.sum b/go.sum index b1fddc8b24..e02ebe6cf8 100644 --- a/go.sum +++ b/go.sum @@ -328,6 +328,7 @@ github.com/influxdata/flux v0.93.0 h1:PHx2zMkknjwjsNOH040lDahoilbuuRTUCV7Ownrrdk github.com/influxdata/flux v0.93.0/go.mod h1:9csju6RUyFbwxcIR0Nyr8Z+fh2O4axq0zJE6DGHg1Cc= github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 h1:WQsmW0fXO4ZE/lFGIE84G6rIV5SJN3P3sjIXAP1a8eU= github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69/go.mod h1:pwymjR6SrP3gD3pRj9RJwdl1j5s3doEEV8gS4X9qSzA= +github.com/influxdata/influxdb v1.8.3 h1:WEypI1BQFTT4teLM+1qkEcvUi0dAvopAI/ir0vAiBg8= github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 h1:CFx+pP90q/qg3spoiZjf8donE4WpAdjeJfPOcoNqkWo= github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo= github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e h1:/o3vQtpWJhvnIbXley4/jwzzqNeigJK9z+LZcJZ9zfM= diff --git a/http/restore_service.go b/http/restore_service.go index 4ebbe91387..2369ded664 100644 --- a/http/restore_service.go +++ b/http/restore_service.go @@ -45,6 +45,7 @@ type RestoreHandler struct { const ( prefixRestore = "/api/v2/restore" + restoreKVPath = prefixRestore + "/kv" restoreBucketPath = prefixRestore + "/buckets/:bucketID" restoreShardPath = prefixRestore + "/shards/:shardID" ) @@ -58,12 +59,25 @@ func NewRestoreHandler(b *RestoreBackend) *RestoreHandler { RestoreService: b.RestoreService, } + h.HandlerFunc(http.MethodPost, restoreKVPath, h.handleRestoreKVStore) h.HandlerFunc(http.MethodPost, restoreBucketPath, h.handleRestoreBucket) h.HandlerFunc(http.MethodPost, restoreShardPath, h.handleRestoreShard) return h } +func (h *RestoreHandler) handleRestoreKVStore(w http.ResponseWriter, r *http.Request) { + span, r := tracing.ExtractFromHTTPRequest(r, "RestoreHandler.handleRestoreKVStore") + defer span.Finish() + + ctx := r.Context() + + if err := h.RestoreService.RestoreKVStore(ctx, r.Body); err != nil { + h.HandleHTTPError(ctx, err, w) + return + } +} + func (h *RestoreHandler) handleRestoreBucket(w http.ResponseWriter, r *http.Request) { span, r := tracing.ExtractFromHTTPRequest(r, "RestoreHandler.handleRestoreBucket") defer span.Finish() @@ -122,6 +136,37 @@ type RestoreService struct { InsecureSkipVerify bool } +func (s *RestoreService) RestoreKVStore(ctx context.Context, r io.Reader) error { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + u, err := NewURL(s.Addr, restoreKVPath) + if err != nil { + return err + } + + req, err := http.NewRequest(http.MethodPost, u.String(), r) + if err != nil { + return err + } + SetToken(s.Token, req) + req = req.WithContext(ctx) + + hc := NewClient(u.Scheme, s.InsecureSkipVerify) + hc.Timeout = httpClientTimeout + resp, err := hc.Do(req) + if err != nil 
{ + return err + } + defer resp.Body.Close() + + if err := CheckError(resp); err != nil { + return err + } + + return nil +} + func (s *RestoreService) RestoreBucket(ctx context.Context, id influxdb.ID, dbi []byte) (map[uint64]uint64, error) { span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() diff --git a/inmem/kv.go b/inmem/kv.go index ebf17352c0..3e80d866b2 100644 --- a/inmem/kv.go +++ b/inmem/kv.go @@ -88,6 +88,10 @@ func (s *KVStore) Backup(ctx context.Context, w io.Writer) error { panic("not implemented") } +func (s *KVStore) Restore(ctx context.Context, r io.Reader) error { + panic("not implemented") +} + // Flush removes all data from the buckets. Used for testing. func (s *KVStore) Flush(ctx context.Context) { s.mu.Lock() diff --git a/kv/service_test.go b/kv/service_test.go index a31718e196..82acb7bfe0 100644 --- a/kv/service_test.go +++ b/kv/service_test.go @@ -26,6 +26,10 @@ func (s mockStore) Backup(ctx context.Context, w io.Writer) error { return nil } +func (s mockStore) Restore(ctx context.Context, r io.Reader) error { + return nil +} + func TestNewService(t *testing.T) { s := kv.NewService(zaptest.NewLogger(t), mockStore{}) diff --git a/kv/store.go b/kv/store.go index b31e21542b..9a699f7999 100644 --- a/kv/store.go +++ b/kv/store.go @@ -51,6 +51,8 @@ type Store interface { Update(context.Context, func(Tx) error) error // Backup copies all K:Vs to a writer, file format determined by implementation. Backup(ctx context.Context, w io.Writer) error + // Restore replaces the underlying data file with the data from r. + Restore(ctx context.Context, r io.Reader) error } // Tx is a transaction in the store. diff --git a/mock/kv.go b/mock/kv.go index 2798a70afc..eac0765259 100644 --- a/mock/kv.go +++ b/mock/kv.go @@ -11,9 +11,10 @@ var _ (kv.Store) = (*Store)(nil) // Store is a mock kv.Store type Store struct { - ViewFn func(func(kv.Tx) error) error - UpdateFn func(func(kv.Tx) error) error - BackupFn func(ctx context.Context, w io.Writer) error + ViewFn func(func(kv.Tx) error) error + UpdateFn func(func(kv.Tx) error) error + BackupFn func(ctx context.Context, w io.Writer) error + RestoreFn func(ctx context.Context, r io.Reader) error } // View opens up a transaction that will not write to any data. Implementing interfaces @@ -31,6 +32,10 @@ func (s *Store) Backup(ctx context.Context, w io.Writer) error { return s.BackupFn(ctx, w) } +func (s *Store) Restore(ctx context.Context, r io.Reader) error { + return s.RestoreFn(ctx, r) +} + var _ (kv.Tx) = (*Tx)(nil) // Tx is mock of a kv.Tx. 
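
Since kv.Store now requires Restore(ctx, r), every implementation in the tree gains
either a real version or a stub, and the mock store follows suit with a RestoreFn
hook. A minimal sketch of wiring it up in a test (the closure body is illustrative,
not taken from this series; assumes the usual context, io/ioutil, strings imports
and a *testing.T):

    store := &mock.Store{
        RestoreFn: func(ctx context.Context, r io.Reader) error {
            _, err := ioutil.ReadAll(r) // consume the uploaded snapshot
            return err
        },
    }
    if err := store.Restore(context.Background(), strings.NewReader("snapshot")); err != nil {
        t.Fatal(err)
    }
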
diff --git a/storage/bucket_service.go b/storage/bucket_service.go index df9335638d..8f86ced624 100644 --- a/storage/bucket_service.go +++ b/storage/bucket_service.go @@ -33,7 +33,6 @@ func NewBucketService(log *zap.Logger, s influxdb.BucketService, engine EngineSc BucketService: s, log: log, engine: engine, - log: logger, } } diff --git a/storage/engine.go b/storage/engine.go index 4bb91ca443..18ecebaeaa 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -80,6 +80,7 @@ type MetaClient interface { ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error Backup(ctx context.Context, w io.Writer) error + Restore(ctx context.Context, r io.Reader) error Data() meta.Data SetData(data *meta.Data) error } @@ -337,6 +338,45 @@ func (e *Engine) BackupShard(ctx context.Context, w io.Writer, shardID uint64, s return e.tsdbStore.BackupShard(shardID, since, w) } +func (e *Engine) RestoreKVStore(ctx context.Context, r io.Reader) error { + span, _ := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + e.mu.RLock() + defer e.mu.RUnlock() + + if e.closing == nil { + return ErrEngineClosed + } + + // Replace KV store data and remove all existing shard data. + if err := e.metaClient.Restore(ctx, r); err != nil { + return err + } else if err := e.tsdbStore.DeleteShards(); err != nil { + return err + } + + // Create new shards based on the restored KV data. + data := e.metaClient.Data() + for _, dbi := range data.Databases { + for _, rpi := range dbi.RetentionPolicies { + for _, sgi := range rpi.ShardGroups { + if sgi.Deleted() { + continue + } + + for _, sh := range sgi.Shards { + if err := e.tsdbStore.CreateShard(dbi.Name, rpi.Name, sh.ID, true); err != nil { + return err + } + } + } + } + } + + return nil +} + func (e *Engine) RestoreBucket(ctx context.Context, id influxdb.ID, buf []byte) (map[uint64]uint64, error) { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() diff --git a/tsdb/store.go b/tsdb/store.go index 2db74fcc7c..646f2c6e17 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -696,6 +696,16 @@ func (s *Store) SetShardEnabled(shardID uint64, enabled bool) error { return nil } +// DeleteShards removes all shards from disk. +func (s *Store) DeleteShards() error { + for _, id := range s.ShardIDs() { + if err := s.DeleteShard(id); err != nil { + return err + } + } + return nil +} + // DeleteShard removes a shard from disk. 
func (s *Store) DeleteShard(shardID uint64) error { sh := s.Shard(shardID) diff --git a/v1/services/meta/client.go b/v1/services/meta/client.go index 2d4466c3de..0e46ebdcc3 100644 --- a/v1/services/meta/client.go +++ b/v1/services/meta/client.go @@ -1040,6 +1040,13 @@ func (c *Client) Backup(ctx context.Context, w io.Writer) error { return c.store.Backup(ctx, w) } +func (c *Client) Restore(ctx context.Context, r io.Reader) error { + if err := c.store.Restore(ctx, r); err != nil { + return err + } + return c.Load() +} + type uint64Slice []uint64 func (a uint64Slice) Len() int { return len(a) } diff --git a/v1/services/meta/filestore/kv.go b/v1/services/meta/filestore/kv.go index 60613249c2..bfa31521f2 100644 --- a/v1/services/meta/filestore/kv.go +++ b/v1/services/meta/filestore/kv.go @@ -36,6 +36,10 @@ func (s *KVStore) Backup(ctx context.Context, w io.Writer) error { panic("not implemented") } +func (s *KVStore) Restore(ctx context.Context, r io.Reader) error { + panic("not implemented") +} + // Tx is an in memory transaction. // TODO: make transactions actually transactional type Tx struct { From 00b58deab0c07b02485f1c154907c89f26b9624b Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Thu, 5 Nov 2020 10:10:32 -0700 Subject: [PATCH 7/7] fix: pr review changes --- backup.go | 5 +---- go.sum | 1 - 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/backup.go b/backup.go index 7a15169113..71f3b08ab6 100644 --- a/backup.go +++ b/backup.go @@ -32,14 +32,11 @@ type RestoreService interface { } // Manifest lists the KV and shard file information contained in the backup. -// If Limited is false, the manifest contains a full backup, otherwise -// it is a partial backup. type Manifest struct { KV ManifestKVEntry `json:"kv"` Files []ManifestEntry `json:"files"` - // If limited is true, then one (or all) of the following fields will be set - + // These fields are only set if filtering options are set on the CLI. OrganizationID string `json:"organizationID,omitempty"` BucketID string `json:"bucketID,omitempty"` } diff --git a/go.sum b/go.sum index e02ebe6cf8..b1fddc8b24 100644 --- a/go.sum +++ b/go.sum @@ -328,7 +328,6 @@ github.com/influxdata/flux v0.93.0 h1:PHx2zMkknjwjsNOH040lDahoilbuuRTUCV7Ownrrdk github.com/influxdata/flux v0.93.0/go.mod h1:9csju6RUyFbwxcIR0Nyr8Z+fh2O4axq0zJE6DGHg1Cc= github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 h1:WQsmW0fXO4ZE/lFGIE84G6rIV5SJN3P3sjIXAP1a8eU= github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69/go.mod h1:pwymjR6SrP3gD3pRj9RJwdl1j5s3doEEV8gS4X9qSzA= -github.com/influxdata/influxdb v1.8.3 h1:WEypI1BQFTT4teLM+1qkEcvUi0dAvopAI/ir0vAiBg8= github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 h1:CFx+pP90q/qg3spoiZjf8donE4WpAdjeJfPOcoNqkWo= github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo= github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e h1:/o3vQtpWJhvnIbXley4/jwzzqNeigJK9z+LZcJZ9zfM=
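
A rough end-to-end sketch of the commands this series adds (paths and bucket names
are placeholders; flag spellings follow the cobra definitions above, and both
commands require an operator token now that the handlers check
influxdb.OperPermissions()):

    # Back up the KV store and all shard data into a local directory.
    influx backup ./backup-20201105

    # Partial restore: recreate one bucket (and its organization, if missing)
    # from the backup without replacing anything else on the server.
    influx restore --bucket telegraf --input ./backup-20201105

    # Full restore: replace the server's metadata and shard data wholesale.
    influx restore --full --input ./backup-20201105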