diff --git a/authorizer/backup.go b/authorizer/backup.go index 76c398fdc9..c19d6c937d 100644 --- a/authorizer/backup.go +++ b/authorizer/backup.go @@ -23,26 +23,24 @@ func NewBackupService(s influxdb.BackupService) *BackupService { } } -func (b BackupService) CreateBackup(ctx context.Context) (int, []string, error) { - span, ctx := tracing.StartSpanFromContext(ctx) - defer span.Finish() - - if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil { - return 0, nil, err - } - return b.s.CreateBackup(ctx) -} - -func (b BackupService) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error { +func (b BackupService) BackupKVStore(ctx context.Context, w io.Writer) error { span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() + // TODO(bbj): Correct permissions. if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil { return err } - return b.s.FetchBackupFile(ctx, backupID, backupFile, w) + return b.s.BackupKVStore(ctx, w) } -func (b BackupService) InternalBackupPath(backupID int) string { - return b.s.InternalBackupPath(backupID) +func (b BackupService) BackupShard(ctx context.Context, w io.Writer, shardID uint64) error { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + // TODO(bbj): Correct permissions. + if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil { + return err + } + return b.s.BackupShard(ctx, w, shardID) } diff --git a/authorizer/restore.go b/authorizer/restore.go new file mode 100644 index 0000000000..40cb421c43 --- /dev/null +++ b/authorizer/restore.go @@ -0,0 +1,46 @@ +package authorizer + +import ( + "context" + "io" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/tracing" +) + +var _ influxdb.RestoreService = (*RestoreService)(nil) + +// RestoreService wraps a influxdb.RestoreService and authorizes actions +// against it appropriately. 
+type RestoreService struct { + s influxdb.RestoreService +} + +// NewRestoreService constructs an instance of an authorizing restore service. +func NewRestoreService(s influxdb.RestoreService) *RestoreService { + return &RestoreService{ + s: s, + } +} + +func (b RestoreService) RestoreBucket(ctx context.Context, id influxdb.ID, dbi []byte) (shardIDMap map[uint64]uint64, err error) { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + // TODO(bbj): Correct permissions. + if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil { + return nil, err + } + return b.s.RestoreBucket(ctx, id, dbi) +} + +func (b RestoreService) RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + // TODO(bbj): Correct permissions. + if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil { + return err + } + return b.s.RestoreShard(ctx, shardID, r) +} diff --git a/backup.go b/backup.go index fe2ed03a71..8777542c9b 100644 --- a/backup.go +++ b/backup.go @@ -3,21 +3,67 @@ package influxdb import ( "context" "io" + "time" +) + +const ( + BackupFilenamePattern = "20060102T150405Z" ) // BackupService represents the data backup functions of InfluxDB. type BackupService interface { - // CreateBackup creates a local copy (hard links) of the TSM data for all orgs and buckets. - // The return values are used to download each backup file. - CreateBackup(context.Context) (backupID int, backupFiles []string, err error) - // FetchBackupFile downloads one backup file, data or metadata. - FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error - // InternalBackupPath is a utility to determine the on-disk location of a backup fileset. - InternalBackupPath(backupID int) string + // BackupKVStore creates a live backup copy of the metadata database. 
+ BackupKVStore(ctx context.Context, w io.Writer) error + + // BackupShard downloads a backup file for a single shard. + BackupShard(ctx context.Context, w io.Writer, shardID uint64) error } -// KVBackupService represents the meta data backup functions of InfluxDB. -type KVBackupService interface { - // Backup creates a live backup copy of the metadata database. - Backup(ctx context.Context, w io.Writer) error +// RestoreService represents the data restore functions of InfluxDB. +type RestoreService interface { + // RestoreBucket restores the metadata for a single bucket and returns a map from backed-up shard IDs to restored shard IDs. + RestoreBucket(ctx context.Context, id ID, rpiData []byte) (shardIDMap map[uint64]uint64, err error) + + // RestoreShard uploads a backup file for a single shard. + RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error +} + +// Manifest lists the KV and shard file information contained in the backup. +// If Limited is false, the manifest contains a full backup, otherwise +// it is a partial backup. +type Manifest struct { + KV ManifestKVEntry `json:"kv"` + Files []ManifestEntry `json:"files"` + + // If limited is true, then one (or all) of the following fields will be set + + OrganizationID string `json:"organizationID,omitempty"` + BucketID string `json:"bucketID,omitempty"` +} + +// ManifestEntry contains the data information for a backed up shard. +type ManifestEntry struct { + OrganizationID string `json:"organizationID"` + OrganizationName string `json:"organizationName"` + BucketID string `json:"bucketID"` + BucketName string `json:"bucketName"` + ShardID uint64 `json:"shardID"` + FileName string `json:"fileName"` + Size int64 `json:"size"` + LastModified time.Time `json:"lastModified"` +} + +// ManifestKVEntry contains the KV store information for a backup. +type ManifestKVEntry struct { + FileName string `json:"fileName"` + Size int64 `json:"size"` +} + +// Size returns the combined size of the KV file and all shard files listed in the manifest. 
+func (m *Manifest) Size() int64 { + n := m.KV.Size + for _, f := range m.Files { + n += f.Size + } + return n } diff --git a/cmd/influx/backup.go b/cmd/influx/backup.go index bee51275ed..0235dbc0e4 100644 --- a/cmd/influx/backup.go +++ b/cmd/influx/backup.go @@ -1,97 +1,330 @@ package main import ( + "compress/gzip" "context" + "encoding/json" "fmt" + "io/ioutil" "os" "path/filepath" + "time" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/bolt" "github.com/influxdata/influxdb/v2/http" + "github.com/influxdata/influxdb/v2/kv" + influxlogger "github.com/influxdata/influxdb/v2/logger" + "github.com/influxdata/influxdb/v2/v1/services/meta" "github.com/spf13/cobra" - "go.uber.org/multierr" + "go.uber.org/zap" ) -func cmdBackup(f *globalFlags, opt genericCLIOpts) *cobra.Command { - cmd := opt.newCmd("backup", backupF, false) - cmd.Short = "Backup the data in InfluxDB" - cmd.Long = fmt.Sprintf( - `Backs up data and meta data for the running InfluxDB instance. -Downloaded files are written to the directory indicated by --path. -The target directory, and any parent directories, are created automatically. 
-Data file have extension .tsm; meta data is written to %s in the same directory.`, - bolt.DefaultFilename) +func cmdBackup(f *globalFlags, opts genericCLIOpts) *cobra.Command { + return newCmdBackupBuilder(f, opts).cmdBackup() +} - f.registerFlags(cmd) +type backupSVCsFn func() (influxdb.BackupService, error) - opts := flagOpts{ - { - DestP: &backupFlags.Path, - Flag: "path", - Short: 'p', - EnvVar: "PATH", - Desc: "directory path to write backup files to", - Required: true, - }, +type cmdBackupBuilder struct { + genericCLIOpts + *globalFlags + + bucketID string + bucketName string + org organization + outputPath string + + manifest influxdb.Manifest + baseName string + + backupService *http.BackupService + kvStore *bolt.KVStore + kvService *kv.Service + metaClient *meta.Client + + logger *zap.Logger +} + +func newCmdBackupBuilder(f *globalFlags, opts genericCLIOpts) *cmdBackupBuilder { + return &cmdBackupBuilder{ + genericCLIOpts: opts, + globalFlags: f, } - opts.mustRegister(cmd) +} +func (b *cmdBackupBuilder) cmdBackup() *cobra.Command { + cmd := b.newCmd("backup", b.backupRunE) + b.org.register(cmd, true) + cmd.Flags().StringVar(&b.bucketID, "bucket-id", "", "The ID of the bucket to backup") + cmd.Flags().StringVarP(&b.bucketName, "bucket", "b", "", "The name of the bucket to backup") + cmd.Use = "backup [flags] path" + cmd.Args = func(cmd *cobra.Command, args []string) error { + if len(args) == 0 { + return fmt.Errorf("must specify output path") + } else if len(args) > 1 { + return fmt.Errorf("too many args specified") + } + b.outputPath = args[0] + return nil + } + cmd.Short = "Backup database" + cmd.Long = ` +Backs up InfluxDB to a directory. 
+ +Examples: + # backup all data + influx backup /path/to/backup +` return cmd } -var backupFlags struct { - Path string +func (b *cmdBackupBuilder) manifestPath() string { + return fmt.Sprintf("%s.manifest", b.baseName) } -func newBackupService() (influxdb.BackupService, error) { - ac := flags.config() - return &http.BackupService{ - Addr: ac.Host, - Token: ac.Token, - }, nil +func (b *cmdBackupBuilder) kvPath() string { + return fmt.Sprintf("%s.bolt", b.baseName) } -func backupF(cmd *cobra.Command, args []string) error { +func (b *cmdBackupBuilder) shardPath(id uint64) string { + return fmt.Sprintf("%s.s%d", b.baseName, id) + ".tar.gz" +} + +func (b *cmdBackupBuilder) backupRunE(cmd *cobra.Command, args []string) (err error) { ctx := context.Background() - if backupFlags.Path == "" { - return fmt.Errorf("must specify path") - } - - err := os.MkdirAll(backupFlags.Path, 0777) - if err != nil && !os.IsExist(err) { + // Create top level logger + logconf := influxlogger.NewConfig() + if b.logger, err = logconf.New(os.Stdout); err != nil { return err } - backupService, err := newBackupService() - if err != nil { + // Determine a base name for the backup files from the current UTC time. + b.baseName = time.Now().UTC().Format(influxdb.BackupFilenamePattern) + + // Ensure directory exists. + if err := os.MkdirAll(b.outputPath, 0777); err != nil { return err } - id, backupFilenames, err := backupService.CreateBackup(ctx) - if err != nil { + ac := flags.config() + b.backupService = &http.BackupService{ + Addr: ac.Host, + Token: ac.Token, + } + + // Back up Bolt database to file. + if err := b.backupKVStore(ctx); err != nil { return err } - fmt.Printf("Backup ID %d contains %d files\n", id, len(backupFilenames)) + // Open bolt DB. 
+ boltClient := bolt.NewClient(b.logger) + boltClient.Path = filepath.Join(b.outputPath, b.kvPath()) + if err := boltClient.Open(ctx); err != nil { + return err + } + defer boltClient.Close() - for _, backupFilename := range backupFilenames { - dest := filepath.Join(backupFlags.Path, backupFilename) - w, err := os.OpenFile(dest, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) - if err != nil { - return err - } - err = backupService.FetchBackupFile(ctx, id, backupFilename, w) - if err != nil { - return multierr.Append(fmt.Errorf("error fetching file %s: %v", backupFilename, err), w.Close()) - } - if err = w.Close(); err != nil { - return err - } + // Open meta store so we can iterate over meta data. + b.kvStore = bolt.NewKVStore(b.logger, filepath.Join(b.outputPath, b.kvPath())) + b.kvStore.WithDB(boltClient.DB()) + b.kvService = kv.NewService(b.logger, b.kvStore, kv.ServiceConfig{}) + + b.metaClient = meta.NewClient(meta.NewConfig(), b.kvStore) + if err := b.metaClient.Open(); err != nil { + return err } - fmt.Printf("Backup complete") + // Filter through organizations & buckets to backup appropriate shards. + if err := b.backupOrganizations(ctx); err != nil { + return err + } + + if err := b.writeManifest(ctx); err != nil { + return err + } + + b.logger.Info("Backup complete") return nil } + +// backupKVStore streams the bolt KV file to a file at path. +func (b *cmdBackupBuilder) backupKVStore(ctx context.Context) error { + path := filepath.Join(b.outputPath, b.kvPath()) + b.logger.Info("Backing up KV store", zap.String("path", b.kvPath())) + + // Open writer to output file. + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + + // Stream bolt file from server, sync, and ensure file closes correctly. + if err := b.backupService.BackupKVStore(ctx, f); err != nil { + return err + } else if err := f.Sync(); err != nil { + return err + } else if err := f.Close(); err != nil { + return err + } + + // Lookup file size. 
+ fi, err := os.Stat(path) + if err != nil { + return err + } + b.manifest.KV = influxdb.ManifestKVEntry{ + FileName: b.kvPath(), + Size: fi.Size(), + } + + return nil +} + +func (b *cmdBackupBuilder) backupOrganizations(ctx context.Context) (err error) { + // Build a filter if org ID or org name were specified. + var filter influxdb.OrganizationFilter + if b.org.id != "" { + if filter.ID, err = influxdb.IDFromString(b.org.id); err != nil { + return err + } + } else if b.org.name != "" { + filter.Name = &b.org.name + } + + // Retrieve a list of all matching organizations. + orgs, _, err := b.kvService.FindOrganizations(ctx, filter) + if err != nil { + return err + } + + // Back up buckets in each matching organization. + for _, org := range orgs { + b.logger.Info("Backing up organization", zap.String("id", org.ID.String()), zap.String("name", org.Name)) + if err := b.backupBuckets(ctx, org); err != nil { + return err + } + } + return nil +} + +func (b *cmdBackupBuilder) backupBuckets(ctx context.Context, org *influxdb.Organization) (err error) { + // Build a filter if bucket ID or bucket name were specified. + var filter influxdb.BucketFilter + filter.OrganizationID = &org.ID + if b.bucketID != "" { + if filter.ID, err = influxdb.IDFromString(b.bucketID); err != nil { + return err + } + } else if b.bucketName != "" { + filter.Name = &b.bucketName + } + + // Retrieve a list of all matching buckets in the organization. + buckets, _, err := b.kvService.FindBuckets(ctx, filter) + if err != nil { + return err + } + + // Back up shards in each matching bucket. + for _, bkt := range buckets { + if err := b.backupBucket(ctx, org, bkt); err != nil { + return err + } + } + return nil +} + +func (b *cmdBackupBuilder) backupBucket(ctx context.Context, org *influxdb.Organization, bkt *influxdb.Bucket) (err error) { + b.logger.Info("Backing up bucket", zap.String("id", bkt.ID.String()), zap.String("name", bkt.Name)) + + // Lookup matching database from the meta store. 
+ dbi := b.metaClient.Database(bkt.ID.String()) + if dbi == nil { + return fmt.Errorf("bucket database not found: %s", bkt.ID.String()) + } + + // Iterate over and backup each shard. + for _, rpi := range dbi.RetentionPolicies { + for _, sg := range rpi.ShardGroups { + for _, sh := range sg.Shards { + if err := b.backupShard(ctx, org, bkt, rpi.Name, sh.ID); err != nil { + return err + } + } + } + } + return nil +} + +// backupShard streams a tar of TSM data for shard. +func (b *cmdBackupBuilder) backupShard(ctx context.Context, org *influxdb.Organization, bkt *influxdb.Bucket, policy string, shardID uint64) error { + path := filepath.Join(b.outputPath, b.shardPath(shardID)) + b.logger.Info("Backing up shard", zap.Uint64("id", shardID), zap.String("path", b.shardPath(shardID))) + + // Open writer to output file. + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + + // Wrap file writer with a gzip writer. + gw := gzip.NewWriter(f) + defer gw.Close() + + // Stream file from server, sync, and ensure file closes correctly. + if err := b.backupService.BackupShard(ctx, gw, shardID); err != nil { + return err + } else if err := gw.Close(); err != nil { + return err + } else if err := f.Sync(); err != nil { + return err + } else if err := f.Close(); err != nil { + return err + } + + // Determine file size. + fi, err := os.Stat(path) + if err != nil { + return err + } + + // Update manifest. + b.manifest.Files = append(b.manifest.Files, influxdb.ManifestEntry{ + OrganizationID: org.ID.String(), + OrganizationName: org.Name, + BucketID: bkt.ID.String(), + BucketName: bkt.Name, + ShardID: shardID, + FileName: b.shardPath(shardID), + Size: fi.Size(), + LastModified: fi.ModTime().UTC(), + }) + + return nil +} + +// writeManifest writes the manifest file out. 
+func (b *cmdBackupBuilder) writeManifest(ctx context.Context) error { + path := filepath.Join(b.outputPath, b.manifestPath()) + b.logger.Info("Writing manifest", zap.String("path", b.manifestPath())) + + buf, err := json.MarshalIndent(b.manifest, "", " ") + if err != nil { + return fmt.Errorf("create manifest: %w", err) + } + buf = append(buf, '\n') + return ioutil.WriteFile(path, buf, 0600) +} + +func (b *cmdBackupBuilder) newCmd(use string, runE func(*cobra.Command, []string) error) *cobra.Command { + cmd := b.genericCLIOpts.newCmd(use, runE, true) + b.genericCLIOpts.registerPrintOptions(cmd) + b.globalFlags.registerFlags(cmd) + return cmd +} diff --git a/cmd/influx/main.go b/cmd/influx/main.go index bf2a5b2457..1bdcb9af57 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -328,6 +328,7 @@ func influxCmd(opts ...genericCLIOptFn) *cobra.Command { cmdOrganization, cmdPing, cmdQuery, + cmdRestore, cmdSecret, cmdSetup, cmdStack, diff --git a/cmd/influx/restore.go b/cmd/influx/restore.go new file mode 100644 index 0000000000..1ebdcfdc5e --- /dev/null +++ b/cmd/influx/restore.go @@ -0,0 +1,347 @@ +package main + +import ( + "compress/gzip" + "context" + "encoding/json" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sort" + "strings" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/bolt" + "github.com/influxdata/influxdb/v2/http" + "github.com/influxdata/influxdb/v2/kv" + influxlogger "github.com/influxdata/influxdb/v2/logger" + "github.com/influxdata/influxdb/v2/v1/services/meta" + "github.com/spf13/cobra" + "go.uber.org/zap" +) + +func cmdRestore(f *globalFlags, opts genericCLIOpts) *cobra.Command { + return newCmdRestoreBuilder(f, opts).cmdRestore() +} + +type restoreSVCsFn func() (influxdb.RestoreService, error) + +type cmdRestoreBuilder struct { + genericCLIOpts + *globalFlags + + bucketID string + bucketName string + newBucketName string + newOrgName string + org organization + shardID uint64 + path string + + kvEntry 
*influxdb.ManifestKVEntry + shardEntries map[uint64]*influxdb.ManifestEntry + + orgService *http.OrganizationService + bucketService *http.BucketService + restoreService *http.RestoreService + kvService *kv.Service + metaClient *meta.Client + + logger *zap.Logger +} + +func newCmdRestoreBuilder(f *globalFlags, opts genericCLIOpts) *cmdRestoreBuilder { + return &cmdRestoreBuilder{ + genericCLIOpts: opts, + globalFlags: f, + + shardEntries: make(map[uint64]*influxdb.ManifestEntry), + } +} + +func (b *cmdRestoreBuilder) cmdRestore() *cobra.Command { + cmd := b.newCmd("restore", b.restoreRunE) + b.org.register(cmd, true) + cmd.Flags().StringVar(&b.bucketID, "bucket-id", "", "The ID of the bucket to restore") + cmd.Flags().StringVarP(&b.bucketName, "bucket", "b", "", "The name of the bucket to restore") + cmd.Flags().StringVar(&b.newBucketName, "new-bucket", "", "The name of the bucket to restore to") + cmd.Flags().StringVar(&b.newOrgName, "new-org", "", "The name of the organization to restore to") + cmd.Flags().Uint64Var(&b.shardID, "shard-id", 0, "The shard to restore") + cmd.Flags().StringVar(&b.path, "input", "", "Local backup data path (required)") + cmd.Use = "restore [flags] path" + cmd.Args = func(cmd *cobra.Command, args []string) error { + if len(args) == 0 { + return fmt.Errorf("must specify path to backup directory") + } else if len(args) > 1 { + return fmt.Errorf("too many args specified") + } + b.path = args[0] + return nil + } + cmd.Short = "Restores a backup directory to InfluxDB." + cmd.Long = ` +Restore influxdb. + +Examples: + # restore all data + influx restore /path/to/restore +` + return cmd +} + +func (b *cmdRestoreBuilder) restoreRunE(cmd *cobra.Command, args []string) (err error) { + ctx := context.Background() + + // Create top level logger + logconf := influxlogger.NewConfig() + if b.logger, err = logconf.New(os.Stdout); err != nil { + return err + } + + // Read in set of KV data & shard data to restore. 
+ if err := b.loadIncremental(); err != nil { + return fmt.Errorf("restore failed while processing manifest files: %s", err.Error()) + } else if b.kvEntry == nil { + return fmt.Errorf("No manifest files found in: %s\n", b.path) + + } + + ac := flags.config() + b.restoreService = &http.RestoreService{ + Addr: ac.Host, + Token: ac.Token, + } + + client, err := newHTTPClient() + if err != nil { + return err + } + + b.orgService = &http.OrganizationService{Client: client} + b.bucketService = &http.BucketService{Client: client} + + // Open bolt DB. + boltClient := bolt.NewClient(b.logger) + boltClient.Path = filepath.Join(b.path, b.kvEntry.FileName) + if err := boltClient.Open(ctx); err != nil { + return err + } + defer boltClient.Close() + + // Open meta store so we can iterate over meta data. + kvStore := bolt.NewKVStore(b.logger, boltClient.Path) + kvStore.WithDB(boltClient.DB()) + b.kvService = kv.NewService(b.logger, kvStore, kv.ServiceConfig{}) + + b.metaClient = meta.NewClient(meta.NewConfig(), kvStore) + if err := b.metaClient.Open(); err != nil { + return err + } + + // Filter through organizations & buckets to restore appropriate shards. + if err := b.restoreOrganizations(ctx); err != nil { + return err + } + + b.logger.Info("Restore complete") + + return nil +} + +func (b *cmdRestoreBuilder) restoreOrganizations(ctx context.Context) (err error) { + // Build a filter if org ID or org name were specified. + var filter influxdb.OrganizationFilter + if b.org.id != "" { + if filter.ID, err = influxdb.IDFromString(b.org.id); err != nil { + return err + } + } else if b.org.name != "" { + filter.Name = &b.org.name + } + + // Retrieve a list of all matching organizations. + orgs, _, err := b.kvService.FindOrganizations(ctx, filter) + if err != nil { + return err + } + + // Restore matching organizations. 
+ for _, org := range orgs { + if err := b.restoreOrganization(ctx, org); err != nil { + return err + } + } + return nil +} + +func (b *cmdRestoreBuilder) restoreOrganization(ctx context.Context, org *influxdb.Organization) (err error) { + b.logger.Info("Restoring organization", zap.String("id", org.ID.String()), zap.String("name", org.Name)) + + // Create organization on server, if it doesn't already exist. + if a, _, err := b.orgService.FindOrganizations(ctx, influxdb.OrganizationFilter{Name: &org.Name}); err != nil { + return fmt.Errorf("cannot find existing organization: %w", err) + } else if len(a) == 0 { + tmp := *org // copy so we don't lose our ID + if err := b.orgService.CreateOrganization(ctx, &tmp); err != nil { + return fmt.Errorf("cannot create organization: %w", err) + } + } + + // Build a filter if bucket ID or bucket name were specified. + var filter influxdb.BucketFilter + filter.OrganizationID = &org.ID + if b.bucketID != "" { + if filter.ID, err = influxdb.IDFromString(b.bucketID); err != nil { + return err + } + } else if b.bucketName != "" { + filter.Name = &b.bucketName + } + + // Retrieve a list of all buckets for the organization in the local backup. + buckets, _, err := b.kvService.FindBuckets(ctx, filter) + if err != nil { + return err + } + + // Restore each matching bucket. + for _, bkt := range buckets { + // Skip internal buckets. + if strings.HasPrefix(bkt.Name, "_") { + continue + } + + if err := b.restoreBucket(ctx, org, bkt); err != nil { + return err + } + } + return nil +} + +func (b *cmdRestoreBuilder) restoreBucket(ctx context.Context, org *influxdb.Organization, bkt *influxdb.Bucket) (err error) { + b.logger.Info("Restoring bucket", zap.String("id", bkt.ID.String()), zap.String("name", bkt.Name)) + + // Create bucket on server. 
+ newBucket := *bkt + if b.newBucketName != "" { + newBucket.Name = b.newBucketName + } + if err := b.bucketService.CreateBucket(ctx, &newBucket); err != nil { + return fmt.Errorf("cannot create bucket: %w", err) + } + + // Lookup matching database from the meta store. + dbi := b.metaClient.Database(bkt.ID.String()) + if dbi == nil { + return fmt.Errorf("bucket database not found: %s", bkt.ID.String()) + } + + // Serialize to protobufs. + buf, err := dbi.MarshalBinary() + if err != nil { + return fmt.Errorf("cannot marshal database info: %w", err) + } + + shardIDMap, err := b.restoreService.RestoreBucket(ctx, newBucket.ID, buf) + if err != nil { + return fmt.Errorf("cannot restore bucket: %w", err) + } + + // Restore each shard for the bucket. + for _, file := range b.shardEntries { + if bkt.ID.String() != file.BucketID { + continue + } else if b.shardID != 0 && b.shardID != file.ShardID { + continue + } + + // Skip if shard metadata was not imported. + newID, ok := shardIDMap[file.ShardID] + if !ok { + b.logger.Warn("Meta info not found, skipping file", zap.Uint64("shard", file.ShardID), zap.String("bucket_id", file.BucketID), zap.String("filename", file.FileName)) + return nil + } + + if err := b.restoreShard(ctx, newID, file); err != nil { + return err + } + } + + return nil +} + +func (b *cmdRestoreBuilder) restoreShard(ctx context.Context, newShardID uint64, file *influxdb.ManifestEntry) error { + b.logger.Info("Restoring shard live from backup", zap.Uint64("shard", newShardID), zap.String("filename", file.FileName)) + + f, err := os.Open(filepath.Join(b.path, file.FileName)) + if err != nil { + return err + } + defer f.Close() + + gr, err := gzip.NewReader(f) + if err != nil { + return err + } + defer gr.Close() + + return b.restoreService.RestoreShard(ctx, newShardID, gr) +} + +// loadIncremental loads multiple manifest files from a given directory. 
+func (b *cmdRestoreBuilder) loadIncremental() error { + // Read all manifest files from path, sort in descending time. + manifests, err := filepath.Glob(filepath.Join(b.path, "*.manifest")) + if err != nil { + return err + } else if len(manifests) == 0 { + return nil + } + sort.Sort(sort.Reverse(sort.StringSlice(manifests))) + + b.shardEntries = make(map[uint64]*influxdb.ManifestEntry) + for _, filename := range manifests { + // Skip file if it is a directory. + if fi, err := os.Stat(filename); err != nil { + return err + } else if fi.IsDir() { + continue + } + + // Read manifest file for backup. + var manifest influxdb.Manifest + if buf, err := ioutil.ReadFile(filename); err != nil { + return err + } else if err := json.Unmarshal(buf, &manifest); err != nil { + return fmt.Errorf("read manifest: %v", err) + } + + // Save latest KV entry. + if b.kvEntry == nil { + b.kvEntry = &manifest.KV + } + + // Load most recent backup per shard. + for i := range manifest.Files { + sh := manifest.Files[i] + if _, err := os.Stat(filepath.Join(b.path, sh.FileName)); err != nil { + continue + } + + entry := b.shardEntries[sh.ShardID] + if entry == nil || sh.LastModified.After(entry.LastModified) { + b.shardEntries[sh.ShardID] = &sh + } + } + } + + return nil +} + +func (b *cmdRestoreBuilder) newCmd(use string, runE func(*cobra.Command, []string) error) *cobra.Command { + cmd := b.genericCLIOpts.newCmd(use, runE, true) + b.genericCLIOpts.registerPrintOptions(cmd) + b.globalFlags.registerFlags(cmd) + return cmd +} diff --git a/cmd/influxd/launcher/engine.go b/cmd/influxd/launcher/engine.go index 78c653989a..c12af95de7 100644 --- a/cmd/influxd/launcher/engine.go +++ b/cmd/influxd/launcher/engine.go @@ -27,6 +27,7 @@ type Engine interface { storage.EngineSchema prom.PrometheusCollector influxdb.BackupService + influxdb.RestoreService SeriesCardinality(orgID, bucketID influxdb.ID) int64 @@ -157,16 +158,20 @@ func (t *TemporaryEngine) Flush(ctx context.Context) { } } -func (t 
*TemporaryEngine) CreateBackup(ctx context.Context) (int, []string, error) { - return t.engine.CreateBackup(ctx) +func (t *TemporaryEngine) BackupKVStore(ctx context.Context, w io.Writer) error { + return t.engine.BackupKVStore(ctx, w) } -func (t *TemporaryEngine) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error { - return t.engine.FetchBackupFile(ctx, backupID, backupFile, w) +func (t *TemporaryEngine) RestoreBucket(ctx context.Context, id influxdb.ID, dbi []byte) (map[uint64]uint64, error) { + return t.engine.RestoreBucket(ctx, id, dbi) } -func (t *TemporaryEngine) InternalBackupPath(backupID int) string { - return t.engine.InternalBackupPath(backupID) +func (t *TemporaryEngine) BackupShard(ctx context.Context, w io.Writer, shardID uint64) error { + return t.engine.BackupShard(ctx, w, shardID) +} + +func (t *TemporaryEngine) RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error { + return t.engine.RestoreShard(ctx, shardID, r) } func (t *TemporaryEngine) TSDBStore() storage.TSDBStore { diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index a4408a2e63..a4cfa75579 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -854,9 +854,10 @@ func (m *Launcher) run(ctx context.Context) (err error) { m.reg.MustRegister(m.engine.PrometheusCollectors()...) 
var ( - deleteService platform.DeleteService = m.engine - pointsWriter storage.PointsWriter = m.engine - backupService platform.BackupService = m.engine + deleteService platform.DeleteService = m.engine + pointsWriter storage.PointsWriter = m.engine + backupService platform.BackupService = m.engine + restoreService platform.RestoreService = m.engine ) deps, err := influxdb.NewDependencies( @@ -1208,7 +1209,7 @@ func (m *Launcher) run(ctx context.Context) (err error) { }, DeleteService: deleteService, BackupService: backupService, - KVBackupService: m.kvService, + RestoreService: restoreService, AuthorizationService: authSvc, AuthorizerV1: authorizerV1, AlgoWProxy: &http.NoopProxyHandler{}, diff --git a/cmd/influxd/restore/command.go b/cmd/influxd/restore/command.go deleted file mode 100644 index 7617e54a0a..0000000000 --- a/cmd/influxd/restore/command.go +++ /dev/null @@ -1,304 +0,0 @@ -package restore - -import ( - "fmt" - "io" - "os" - "path/filepath" - "strings" - - "github.com/influxdata/influxdb/v2/bolt" - "github.com/influxdata/influxdb/v2/internal/fs" - "github.com/influxdata/influxdb/v2/kit/cli" - "github.com/spf13/cobra" -) - -var Command = &cobra.Command{ - Use: "restore", - Short: "Restore data and metadata from a backup", - Long: ` -This command restores data and metadata from a backup fileset. - -Any existing metadata and data will be temporarily moved while restore runs -and deleted after restore completes. - -Rebuilding the index and series file uses default options as in -"influxd inspect build-tsi" with the given target engine path. -For additional performance options, run restore with "-rebuild-index false" -and build-tsi afterwards. - -NOTES: - -* The influxd server should not be running when using the restore tool - as it replaces all data and metadata. 
-`, - Args: cobra.ExactArgs(0), - RunE: restoreE, -} - -var flags struct { - boltPath string - enginePath string - credPath string - backupPath string - rebuildTSI bool -} - -func init() { - dir, err := fs.InfluxDir() - if err != nil { - panic(fmt.Errorf("failed to determine influx directory: %s", err)) - } - - Command.Flags().SortFlags = false - - pfs := Command.PersistentFlags() - pfs.SortFlags = false - - opts := []cli.Opt{ - { - DestP: &flags.boltPath, - Flag: "bolt-path", - Default: filepath.Join(dir, bolt.DefaultFilename), - Desc: "path to target boltdb database", - }, - { - DestP: &flags.enginePath, - Flag: "engine-path", - Default: filepath.Join(dir, "engine"), - Desc: "path to target persistent engine files", - }, - { - DestP: &flags.credPath, - Flag: "credentials-path", - Default: filepath.Join(dir, fs.DefaultTokenFile), - Desc: "path to target credentials file", - }, - { - DestP: &flags.backupPath, - Flag: "backup-path", - Default: "", - Desc: "path to backup files", - }, - { - DestP: &flags.rebuildTSI, - Flag: "rebuild-index", - Default: true, - Desc: "if true, rebuild the TSI index and series file based on the given engine path (equivalent to influxd inspect build-tsi)", - }, - } - - cli.BindOptions(Command, opts) -} - -func restoreE(cmd *cobra.Command, args []string) error { - if flags.backupPath == "" { - return fmt.Errorf("no backup path given") - } - - if err := moveBolt(); err != nil { - return fmt.Errorf("failed to move existing bolt file: %v", err) - } - - if err := moveCredentials(); err != nil { - return fmt.Errorf("failed to move existing credentials file: %v", err) - } - - if err := moveEngine(); err != nil { - return fmt.Errorf("failed to move existing engine data: %v", err) - } - - if err := restoreBolt(); err != nil { - return fmt.Errorf("failed to restore bolt file: %v", err) - } - - if err := restoreCred(); err != nil { - return fmt.Errorf("failed to restore credentials file: %v", err) - } - - if err := restoreEngine(); err != nil { - 
return fmt.Errorf("failed to restore all TSM files: %v", err) - } - - if flags.rebuildTSI { - // FIXME: Implement rebuildTSI - panic("not implemented") - //sFilePath := filepath.Join(flags.enginePath, storage.DefaultSeriesFileDirectoryName) - //indexPath := filepath.Join(flags.enginePath, storage.DefaultIndexDirectoryName) - - //rebuild := inspect.NewBuildTSICommand() - //rebuild.SetArgs([]string{"--sfile-path", sFilePath, "--tsi-path", indexPath}) - //rebuild.Execute() - } - - if err := removeTmpBolt(); err != nil { - return fmt.Errorf("restore completed, but failed to cleanup temporary bolt file: %v", err) - } - - if err := removeTmpCred(); err != nil { - return fmt.Errorf("restore completed, but failed to cleanup temporary credentials file: %v", err) - } - - if err := removeTmpEngine(); err != nil { - return fmt.Errorf("restore completed, but failed to cleanup temporary engine data: %v", err) - } - - return nil -} - -func moveBolt() error { - if _, err := os.Stat(flags.boltPath); os.IsNotExist(err) { - return nil - } else if err != nil { - return err - } - - if err := removeTmpBolt(); err != nil { - return err - } - - return os.Rename(flags.boltPath, flags.boltPath+".tmp") -} - -func moveCredentials() error { - if _, err := os.Stat(flags.credPath); os.IsNotExist(err) { - return nil - } else if err != nil { - return err - } - - if err := removeTmpCred(); err != nil { - return err - } - - return os.Rename(flags.credPath, flags.credPath+".tmp") -} - -func moveEngine() error { - if _, err := os.Stat(flags.enginePath); os.IsNotExist(err) { - return nil - } else if err != nil { - return err - } - - if err := removeTmpEngine(); err != nil { - return err - } - - if err := os.Rename(flags.enginePath, tmpEnginePath()); err != nil { - return err - } - - return os.MkdirAll(flags.enginePath, 0777) -} - -func tmpEnginePath() string { - return filepath.Dir(flags.enginePath) + "tmp" -} - -func removeTmpBolt() error { - return removeIfExists(flags.boltPath + ".tmp") -} - -func 
removeTmpEngine() error { - return removeIfExists(tmpEnginePath()) -} - -func removeTmpCred() error { - return removeIfExists(flags.credPath + ".tmp") -} - -func removeIfExists(path string) error { - if _, err := os.Stat(path); os.IsNotExist(err) { - return nil - } else if err != nil { - return err - } else { - return os.RemoveAll(path) - } -} - -func restoreBolt() error { - backupBolt := filepath.Join(flags.backupPath, bolt.DefaultFilename) - - if err := restoreFile(backupBolt, flags.boltPath, "bolt"); err != nil { - return err - } - - fmt.Printf("Restored Bolt to %s from %s\n", flags.boltPath, backupBolt) - return nil -} - -func restoreEngine() error { - dataDir := filepath.Join(flags.enginePath, "/data") - if err := os.MkdirAll(dataDir, 0777); err != nil { - return err - } - - count := 0 - err := filepath.Walk(flags.backupPath, func(path string, info os.FileInfo, err error) error { - if strings.Contains(path, ".tsm") { - f, err := os.OpenFile(path, os.O_RDONLY, 0666) - if err != nil { - return fmt.Errorf("error opening TSM file: %v", err) - } - defer f.Close() - - tsmPath := filepath.Join(dataDir, filepath.Base(path)) - w, err := os.OpenFile(tsmPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) - if err != nil { - return err - } - defer w.Close() - - _, err = io.Copy(w, f) - if err != nil { - return err - } - count++ - return nil - } - return nil - }) - fmt.Printf("Restored %d TSM files to %v\n", count, dataDir) - return err -} - -func restoreFile(backup string, target string, filetype string) error { - f, err := os.Open(backup) - if err != nil { - return fmt.Errorf("no %s file in backup: %v", filetype, err) - } - defer f.Close() - - if err := os.MkdirAll(filepath.Dir(target), 0777); err != nil { - return err - } - w, err := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) - if err != nil && !os.IsNotExist(err) { - return err - } - defer w.Close() - - _, err = io.Copy(w, f) - return err -} - -func restoreCred() error { - backupCred := 
filepath.Join(flags.backupPath, fs.DefaultTokenFile) - - _, err := os.Stat(backupCred) - if os.IsNotExist(err) { - fmt.Printf("No credentials file found in backup, skipping.\n") - return nil - } else if err != nil { - return err - } - - if err := restoreFile(backupCred, flags.credPath, "credentials"); err != nil { - return err - } - - fmt.Printf("Restored credentials to %s from %s\n", flags.credPath, backupCred) - return nil -} diff --git a/http/api_handler.go b/http/api_handler.go index e880d101f9..d8d5376c11 100644 --- a/http/api_handler.go +++ b/http/api_handler.go @@ -58,7 +58,7 @@ type APIBackend struct { PointsWriter storage.PointsWriter DeleteService influxdb.DeleteService BackupService influxdb.BackupService - KVBackupService influxdb.KVBackupService + RestoreService influxdb.RestoreService AuthorizationService influxdb.AuthorizationService AuthorizerV1 influxdb.AuthorizerV1 OnboardingService influxdb.OnboardingService @@ -193,6 +193,10 @@ func NewAPIHandler(b *APIBackend, opts ...APIHandlerOptFn) *APIHandler { backupBackend.BackupService = authorizer.NewBackupService(backupBackend.BackupService) h.Mount(prefixBackup, NewBackupHandler(backupBackend)) + restoreBackend := NewRestoreBackend(b) + restoreBackend.RestoreService = authorizer.NewRestoreService(restoreBackend.RestoreService) + h.Mount(prefixRestore, NewRestoreHandler(restoreBackend)) + h.Mount(dbrp.PrefixDBRP, dbrp.NewHTTPHandler(b.Logger, b.DBRPService, b.OrganizationService)) writeBackend := NewWriteBackend(b.Logger.With(zap.String("handler", "write")), b) @@ -234,6 +238,7 @@ var apiLinks = map[string]interface{}{ "analyze": "/api/v2/query/analyze", "suggestions": "/api/v2/query/suggestions", }, + "restore": "/api/v2/restore", "setup": "/api/v2/setup", "signin": "/api/v2/signin", "signout": "/api/v2/signout", diff --git a/http/backup_service.go b/http/backup_service.go index f840c14dda..ddc2acbc06 100644 --- a/http/backup_service.go +++ b/http/backup_service.go @@ -2,23 +2,18 @@ package http 
import ( "context" - "encoding/json" "fmt" "io" - "io/ioutil" "net/http" - "os" - "path" "path/filepath" "strconv" "time" "github.com/influxdata/httprouter" "github.com/influxdata/influxdb/v2" - "github.com/influxdata/influxdb/v2/bolt" + // "github.com/influxdata/influxdb/v2/bolt" "github.com/influxdata/influxdb/v2/internal/fs" "github.com/influxdata/influxdb/v2/kit/tracing" - "go.uber.org/multierr" "go.uber.org/zap" ) @@ -27,8 +22,7 @@ type BackupBackend struct { Logger *zap.Logger influxdb.HTTPErrorHandler - BackupService influxdb.BackupService - KVBackupService influxdb.KVBackupService + BackupService influxdb.BackupService } // NewBackupBackend returns a new instance of BackupBackend. @@ -38,7 +32,6 @@ func NewBackupBackend(b *APIBackend) *BackupBackend { HTTPErrorHandler: b.HTTPErrorHandler, BackupService: b.BackupService, - KVBackupService: b.KVBackupService, } } @@ -48,23 +41,17 @@ type BackupHandler struct { influxdb.HTTPErrorHandler Logger *zap.Logger - BackupService influxdb.BackupService - KVBackupService influxdb.KVBackupService + BackupService influxdb.BackupService } const ( - prefixBackup = "/api/v2/backup" - backupIDParamName = "backup_id" - backupFileParamName = "backup_file" - backupFilePath = prefixBackup + "/:" + backupIDParamName + "/file/:" + backupFileParamName + prefixBackup = "/api/v2/backup" + backupKVStorePath = prefixBackup + "/kv" + backupShardPath = prefixBackup + "/shards/:shardID" httpClientTimeout = time.Hour ) -func composeBackupFilePath(backupID int, backupFile string) string { - return path.Join(prefixBackup, fmt.Sprint(backupID), "file", fmt.Sprint(backupFile)) -} - // NewBackupHandler creates a new handler at /api/v2/backup to receive backup requests. 
func NewBackupHandler(b *BackupBackend) *BackupHandler { h := &BackupHandler{ @@ -72,107 +59,40 @@ func NewBackupHandler(b *BackupBackend) *BackupHandler { Router: NewRouter(b.HTTPErrorHandler), Logger: b.Logger, BackupService: b.BackupService, - KVBackupService: b.KVBackupService, } - h.HandlerFunc(http.MethodPost, prefixBackup, h.handleCreate) - h.HandlerFunc(http.MethodGet, backupFilePath, h.handleFetchFile) + h.HandlerFunc(http.MethodGet, backupKVStorePath, h.handleBackupKVStore) + h.HandlerFunc(http.MethodGet, backupShardPath, h.handleBackupShard) return h } -type backup struct { - ID int `json:"id,omitempty"` - Files []string `json:"files,omitempty"` -} - -func (h *BackupHandler) handleCreate(w http.ResponseWriter, r *http.Request) { - span, r := tracing.ExtractFromHTTPRequest(r, "BackupHandler.handleCreate") +func (h *BackupHandler) handleBackupKVStore(w http.ResponseWriter, r *http.Request) { + span, r := tracing.ExtractFromHTTPRequest(r, "BackupHandler.handleBackupKVStore") defer span.Finish() ctx := r.Context() - id, files, err := h.BackupService.CreateBackup(ctx) - if err != nil { - h.HandleHTTPError(ctx, err, w) - return - } - - internalBackupPath := h.BackupService.InternalBackupPath(id) - - boltPath := filepath.Join(internalBackupPath, bolt.DefaultFilename) - boltFile, err := os.OpenFile(boltPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0660) - if err != nil { - err = multierr.Append(err, os.RemoveAll(internalBackupPath)) - h.HandleHTTPError(ctx, err, w) - return - } - - if err = h.KVBackupService.Backup(ctx, boltFile); err != nil { - err = multierr.Append(err, os.RemoveAll(internalBackupPath)) - h.HandleHTTPError(ctx, err, w) - return - } - - files = append(files, bolt.DefaultFilename) - - credsExist, err := h.backupCredentials(internalBackupPath) - - if err != nil { - h.HandleHTTPError(ctx, err, w) - return - } - - if credsExist { - files = append(files, fs.DefaultConfigsFile) - } - - b := backup{ - ID: id, - Files: files, - } - if err = 
json.NewEncoder(w).Encode(&b); err != nil { - err = multierr.Append(err, os.RemoveAll(internalBackupPath)) + if err := h.BackupService.BackupKVStore(ctx, w); err != nil { h.HandleHTTPError(ctx, err, w) return } } -func (h *BackupHandler) backupCredentials(internalBackupPath string) (bool, error) { - credBackupPath := filepath.Join(internalBackupPath, fs.DefaultConfigsFile) - - credPath, err := defaultConfigsPath() - if err != nil { - return false, err - } - token, err := ioutil.ReadFile(credPath) - if err != nil && !os.IsNotExist(err) { - return false, err - } else if os.IsNotExist(err) { - return false, nil - } - - if err := ioutil.WriteFile(credBackupPath, []byte(token), 0600); err != nil { - return false, err - } - return true, nil -} - -func (h *BackupHandler) handleFetchFile(w http.ResponseWriter, r *http.Request) { - span, r := tracing.ExtractFromHTTPRequest(r, "BackupHandler.handleFetchFile") +func (h *BackupHandler) handleBackupShard(w http.ResponseWriter, r *http.Request) { + span, r := tracing.ExtractFromHTTPRequest(r, "BackupHandler.handleBackupShard") defer span.Finish() ctx := r.Context() params := httprouter.ParamsFromContext(ctx) - backupID, err := strconv.Atoi(params.ByName("backup_id")) + shardID, err := strconv.ParseUint(params.ByName("shardID"), 10, 64) if err != nil { h.HandleHTTPError(ctx, err, w) return } - backupFile := params.ByName("backup_file") - if err = h.BackupService.FetchBackupFile(ctx, backupID, backupFile, w); err != nil { + if err := h.BackupService.BackupShard(ctx, w, shardID); err != nil { h.HandleHTTPError(ctx, err, w) return } @@ -185,47 +105,11 @@ type BackupService struct { InsecureSkipVerify bool } -func (s *BackupService) CreateBackup(ctx context.Context) (int, []string, error) { +func (s *BackupService) BackupKVStore(ctx context.Context, w io.Writer) error { span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() - u, err := NewURL(s.Addr, prefixBackup) - if err != nil { - return 0, nil, err - } - - req, err := 
http.NewRequest(http.MethodPost, u.String(), nil) - if err != nil { - return 0, nil, err - } - SetToken(s.Token, req) - req = req.WithContext(ctx) - - hc := NewClient(u.Scheme, s.InsecureSkipVerify) - hc.Timeout = httpClientTimeout - resp, err := hc.Do(req) - if err != nil { - return 0, nil, err - } - defer resp.Body.Close() - - if err := CheckError(resp); err != nil { - return 0, nil, err - } - - var b backup - if err = json.NewDecoder(resp.Body).Decode(&b); err != nil { - return 0, nil, err - } - - return b.ID, b.Files, nil -} - -func (s *BackupService) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error { - span, ctx := tracing.StartSpanFromContext(ctx) - defer span.Finish() - - u, err := NewURL(s.Addr, composeBackupFilePath(backupID, backupFile)) + u, err := NewURL(s.Addr, prefixBackup+"/kv") if err != nil { return err } @@ -249,12 +133,44 @@ func (s *BackupService) FetchBackupFile(ctx context.Context, backupID int, backu return err } - _, err = io.Copy(w, resp.Body) + if _, err := io.Copy(w, resp.Body); err != nil { + return err + } + return resp.Body.Close() +} + +func (s *BackupService) BackupShard(ctx context.Context, w io.Writer, shardID uint64) error { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + u, err := NewURL(s.Addr, fmt.Sprintf(prefixBackup+"/shards/%d", shardID)) if err != nil { return err } - return nil + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + return err + } + SetToken(s.Token, req) + req = req.WithContext(ctx) + + hc := NewClient(u.Scheme, s.InsecureSkipVerify) + hc.Timeout = httpClientTimeout + resp, err := hc.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if err := CheckError(resp); err != nil { + return err + } + + if _, err := io.Copy(w, resp.Body); err != nil { + return err + } + return resp.Body.Close() } func defaultConfigsPath() (string, error) { @@ -264,7 +180,3 @@ func defaultConfigsPath() (string, error) 
{ } return filepath.Join(dir, fs.DefaultConfigsFile), nil } - -func (s *BackupService) InternalBackupPath(backupID int) string { - panic("internal method not implemented here") -} diff --git a/http/restore_service.go b/http/restore_service.go new file mode 100644 index 0000000000..4ebbe91387 --- /dev/null +++ b/http/restore_service.go @@ -0,0 +1,189 @@ +package http + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "strconv" + + "github.com/influxdata/httprouter" + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/tracing" + "go.uber.org/zap" +) + +// RestoreBackend is all services and associated parameters required to construct the RestoreHandler. +type RestoreBackend struct { + Logger *zap.Logger + influxdb.HTTPErrorHandler + + RestoreService influxdb.RestoreService +} + +// NewRestoreBackend returns a new instance of RestoreBackend. +func NewRestoreBackend(b *APIBackend) *RestoreBackend { + return &RestoreBackend{ + Logger: b.Logger.With(zap.String("handler", "restore")), + + HTTPErrorHandler: b.HTTPErrorHandler, + RestoreService: b.RestoreService, + } +} + +// RestoreHandler is http handler for restore service. +type RestoreHandler struct { + *httprouter.Router + influxdb.HTTPErrorHandler + Logger *zap.Logger + + RestoreService influxdb.RestoreService +} + +const ( + prefixRestore = "/api/v2/restore" + restoreBucketPath = prefixRestore + "/buckets/:bucketID" + restoreShardPath = prefixRestore + "/shards/:shardID" +) + +// NewRestoreHandler creates a new handler at /api/v2/restore to receive restore requests. 
+func NewRestoreHandler(b *RestoreBackend) *RestoreHandler { + h := &RestoreHandler{ + HTTPErrorHandler: b.HTTPErrorHandler, + Router: NewRouter(b.HTTPErrorHandler), + Logger: b.Logger, + RestoreService: b.RestoreService, + } + + h.HandlerFunc(http.MethodPost, restoreBucketPath, h.handleRestoreBucket) + h.HandlerFunc(http.MethodPost, restoreShardPath, h.handleRestoreShard) + + return h +} + +func (h *RestoreHandler) handleRestoreBucket(w http.ResponseWriter, r *http.Request) { + span, r := tracing.ExtractFromHTTPRequest(r, "RestoreHandler.handleRestoreBucket") + defer span.Finish() + + ctx := r.Context() + + // Read bucket ID. + bucketID, err := decodeIDFromCtx(r.Context(), "bucketID") + if err != nil { + h.HandleHTTPError(ctx, err, w) + return + } + + // Read serialized DBI data. + buf, err := ioutil.ReadAll(r.Body) + if err != nil { + h.HandleHTTPError(ctx, err, w) + return + } + + shardIDMap, err := h.RestoreService.RestoreBucket(ctx, bucketID, buf) + if err != nil { + h.HandleHTTPError(ctx, err, w) + return + } + + if err := json.NewEncoder(w).Encode(shardIDMap); err != nil { + h.HandleHTTPError(ctx, err, w) + return + } +} + +func (h *RestoreHandler) handleRestoreShard(w http.ResponseWriter, r *http.Request) { + span, r := tracing.ExtractFromHTTPRequest(r, "RestoreHandler.handleRestoreShard") + defer span.Finish() + + ctx := r.Context() + + params := httprouter.ParamsFromContext(ctx) + shardID, err := strconv.ParseUint(params.ByName("shardID"), 10, 64) + if err != nil { + h.HandleHTTPError(ctx, err, w) + return + } + + if err := h.RestoreService.RestoreShard(ctx, shardID, r.Body); err != nil { + h.HandleHTTPError(ctx, err, w) + return + } +} + +// RestoreService is the client implementation of influxdb.RestoreService. 
+type RestoreService struct { + Addr string + Token string + InsecureSkipVerify bool +} + +func (s *RestoreService) RestoreBucket(ctx context.Context, id influxdb.ID, dbi []byte) (map[uint64]uint64, error) { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + u, err := NewURL(s.Addr, prefixRestore+fmt.Sprintf("/buckets/%s", id.String())) + if err != nil { + return nil, err + } + + req, err := http.NewRequest(http.MethodPost, u.String(), bytes.NewReader(dbi)) + if err != nil { + return nil, err + } + SetToken(s.Token, req) + req = req.WithContext(ctx) + + hc := NewClient(u.Scheme, s.InsecureSkipVerify) + hc.Timeout = httpClientTimeout + resp, err := hc.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if err := CheckError(resp); err != nil { + return nil, err + } + + shardIDMap := make(map[uint64]uint64) + if err := json.NewDecoder(resp.Body).Decode(&shardIDMap); err != nil { + return nil, err + } + return shardIDMap, nil +} + +func (s *RestoreService) RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + u, err := NewURL(s.Addr, fmt.Sprintf(prefixRestore+"/shards/%d", shardID)) + if err != nil { + return err + } + + req, err := http.NewRequest(http.MethodPost, u.String(), r) + if err != nil { + return err + } + SetToken(s.Token, req) + req = req.WithContext(ctx) + + hc := NewClient(u.Scheme, s.InsecureSkipVerify) + hc.Timeout = httpClientTimeout + resp, err := hc.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if err := CheckError(resp); err != nil { + return err + } + + return nil +} diff --git a/storage/bucket_service.go b/storage/bucket_service.go index 8f86ced624..df9335638d 100644 --- a/storage/bucket_service.go +++ b/storage/bucket_service.go @@ -33,6 +33,7 @@ func NewBucketService(log *zap.Logger, s influxdb.BucketService, engine EngineSc BucketService: s, log: log, engine: engine, + log: 
logger, } } diff --git a/storage/engine.go b/storage/engine.go index d9edcb536a..c90c603b2b 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -79,6 +79,9 @@ type MetaClient interface { RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error) ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error + Backup(ctx context.Context, w io.Writer) error + Data() meta.Data + SetData(data *meta.Data) error } type TSDBStore interface { @@ -306,41 +309,105 @@ func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID return ErrNotImplemented } -// CreateBackup creates a "snapshot" of all TSM data in the Engine. -// 1) Snapshot the cache to ensure the backup includes all data written before now. -// 2) Create hard links to all TSM files, in a new directory within the engine root directory. -// 3) Return a unique backup ID (invalid after the process terminates) and list of files. -// -// TODO - do we need this? -// -func (e *Engine) CreateBackup(ctx context.Context) (int, []string, error) { +func (e *Engine) BackupKVStore(ctx context.Context, w io.Writer) error { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() - if e.closing == nil { - return 0, nil, ErrEngineClosed - } - - return 0, nil, nil -} - -// FetchBackupFile writes a given backup file to the provided writer. -// After a successful write, the internal copy is removed. -func (e *Engine) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error { - // TODO - need? - return nil -} - -// InternalBackupPath provides the internal, full path directory name of the backup. -// This should not be exposed via API. -func (e *Engine) InternalBackupPath(backupID int) string { e.mu.RLock() defer e.mu.RUnlock() + if e.closing == nil { - return "" + return ErrEngineClosed } - // TODO - need? 
- return "" + + return e.metaClient.Backup(ctx, w) +} + +func (e *Engine) BackupShard(ctx context.Context, w io.Writer, shardID uint64) error { + span, _ := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + e.mu.RLock() + defer e.mu.RUnlock() + + if e.closing == nil { + return ErrEngineClosed + } + + return e.tsdbStore.BackupShard(shardID, time.Time{}, w) +} + +func (e *Engine) RestoreBucket(ctx context.Context, id influxdb.ID, buf []byte) (map[uint64]uint64, error) { + span, _ := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + e.mu.RLock() + defer e.mu.RUnlock() + + if e.closing == nil { + return nil, ErrEngineClosed + } + + var newDBI meta.DatabaseInfo + if err := newDBI.UnmarshalBinary(buf); err != nil { + return nil, err + } + + data := e.metaClient.Data() + dbi := data.Database(id.String()) + if dbi == nil { + return nil, fmt.Errorf("bucket dbi for %q not found during restore", newDBI.Name) + } else if len(newDBI.RetentionPolicies) != 1 { + return nil, fmt.Errorf("bucket must have 1 retention policy; attempting to restore %d retention policies", len(newDBI.RetentionPolicies)) + } + + dbi.RetentionPolicies = newDBI.RetentionPolicies + dbi.ContinuousQueries = newDBI.ContinuousQueries + + // Generate shard ID mapping. + shardIDMap := make(map[uint64]uint64) + rpi := newDBI.RetentionPolicies[0] + for j, sgi := range rpi.ShardGroups { + data.MaxShardGroupID++ + rpi.ShardGroups[j].ID = data.MaxShardGroupID + + for k := range sgi.Shards { + data.MaxShardID++ + shardIDMap[sgi.Shards[k].ID] = data.MaxShardID + sgi.Shards[k].ID = data.MaxShardID + sgi.Shards[k].Owners = []meta.ShardOwner{} + } + } + + // Update data. + if err := e.metaClient.SetData(&data); err != nil { + return nil, err + } + + // Create shards. 
+ for _, sgi := range rpi.ShardGroups { + for _, sh := range sgi.Shards { + if err := e.tsdbStore.CreateShard(dbi.Name, rpi.Name, sh.ID, true); err != nil { + return nil, err + } + } + } + + return shardIDMap, nil +} + +func (e *Engine) RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error { + span, _ := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + e.mu.RLock() + defer e.mu.RUnlock() + + if e.closing == nil { + return ErrEngineClosed + } + + return e.tsdbStore.RestoreShard(shardID, r) } // SeriesCardinality returns the number of series in the engine. diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index cd4472d601..93ae652e43 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -1199,15 +1199,7 @@ func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string, as return "", nil } - nativeFileName := filepath.FromSlash(hdr.Name) - // Skip file if it does not have a matching prefix. - if !strings.HasPrefix(nativeFileName, shardRelativePath) { - return "", nil - } - filename, err := filepath.Rel(shardRelativePath, nativeFileName) - if err != nil { - return "", err - } + filename := filepath.Base(filepath.FromSlash(hdr.Name)) // If this is a directory entry (usually just `index` for tsi), create it an move on. 
if hdr.Typeflag == tar.TypeDir { diff --git a/v1/services/meta/client.go b/v1/services/meta/client.go index fd1ec402e3..2d4466c3de 100644 --- a/v1/services/meta/client.go +++ b/v1/services/meta/client.go @@ -1036,6 +1036,10 @@ func (c *Client) Load() error { }) } +func (c *Client) Backup(ctx context.Context, w io.Writer) error { + return c.store.Backup(ctx, w) +} + type uint64Slice []uint64 func (a uint64Slice) Len() int { return len(a) } diff --git a/v1/services/meta/data.go b/v1/services/meta/data.go index fa2c8b6650..709dc32f99 100644 --- a/v1/services/meta/data.go +++ b/v1/services/meta/data.go @@ -994,6 +994,21 @@ func (di DatabaseInfo) clone() DatabaseInfo { return other } +// MarshalBinary encodes dbi to a binary format. +func (dbi *DatabaseInfo) MarshalBinary() ([]byte, error) { + return proto.Marshal(dbi.marshal()) +} + +// UnmarshalBinary decodes dbi from a binary format. +func (dbi *DatabaseInfo) UnmarshalBinary(data []byte) error { + var pb internal.DatabaseInfo + if err := proto.Unmarshal(data, &pb); err != nil { + return err + } + dbi.unmarshal(&pb) + return nil +} + // marshal serializes to a protobuf representation. func (di DatabaseInfo) marshal() *internal.DatabaseInfo { pb := &internal.DatabaseInfo{}