feat: Implement backup/restore CLI subcommands.

pull/19864/head
Ben Johnson 2020-10-29 16:43:02 -06:00
parent 89773ba30b
commit 23679c2375
17 changed files with 1129 additions and 571 deletions

View File

@ -23,26 +23,24 @@ func NewBackupService(s influxdb.BackupService) *BackupService {
}
}
func (b BackupService) CreateBackup(ctx context.Context) (int, []string, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil {
return 0, nil, err
}
return b.s.CreateBackup(ctx)
}
func (b BackupService) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error {
func (b BackupService) BackupKVStore(ctx context.Context, w io.Writer) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
// TODO(bbj): Correct permissions.
if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil {
return err
}
return b.s.FetchBackupFile(ctx, backupID, backupFile, w)
return b.s.BackupKVStore(ctx, w)
}
func (b BackupService) InternalBackupPath(backupID int) string {
return b.s.InternalBackupPath(backupID)
func (b BackupService) BackupShard(ctx context.Context, w io.Writer, shardID uint64) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
// TODO(bbj): Correct permissions.
if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil {
return err
}
return b.s.BackupShard(ctx, w, shardID)
}

46
authorizer/restore.go Normal file
View File

@ -0,0 +1,46 @@
package authorizer
import (
"context"
"io"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/tracing"
)
// Compile-time check that RestoreService satisfies influxdb.RestoreService.
var _ influxdb.RestoreService = (*RestoreService)(nil)

// RestoreService wraps a influxdb.RestoreService and authorizes actions
// against it appropriately.
type RestoreService struct {
	s influxdb.RestoreService // underlying service that performs the actual restore
}
// NewRestoreService constructs an instance of an authorizing restore service.
func NewRestoreService(s influxdb.RestoreService) *RestoreService {
	svc := &RestoreService{s: s}
	return svc
}
// RestoreBucket checks that the caller is allowed to read all resources and,
// if so, delegates to the wrapped service.
func (b RestoreService) RestoreBucket(ctx context.Context, id influxdb.ID, dbi []byte) (shardIDMap map[uint64]uint64, err error) {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	// TODO(bbj): Correct permissions.
	if permErr := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); permErr != nil {
		return nil, permErr
	}
	return b.s.RestoreBucket(ctx, id, dbi)
}
// RestoreShard checks that the caller is allowed to read all resources and,
// if so, delegates to the wrapped service.
func (b RestoreService) RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	// TODO(bbj): Correct permissions.
	if permErr := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); permErr != nil {
		return permErr
	}
	return b.s.RestoreShard(ctx, shardID, r)
}

View File

@ -3,21 +3,67 @@ package influxdb
import (
"context"
"io"
"time"
)
const (
BackupFilenamePattern = "20060102T150405Z"
)
// BackupService represents the data backup functions of InfluxDB.
type BackupService interface {
// CreateBackup creates a local copy (hard links) of the TSM data for all orgs and buckets.
// The return values are used to download each backup file.
CreateBackup(context.Context) (backupID int, backupFiles []string, err error)
// FetchBackupFile downloads one backup file, data or metadata.
FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error
// InternalBackupPath is a utility to determine the on-disk location of a backup fileset.
InternalBackupPath(backupID int) string
// BackupKVStore creates a live backup copy of the metadata database.
BackupKVStore(ctx context.Context, w io.Writer) error
// BackupShard downloads a backup file for a single shard.
BackupShard(ctx context.Context, w io.Writer, shardID uint64) error
}
// KVBackupService represents the meta data backup functions of InfluxDB.
type KVBackupService interface {
// Backup creates a live backup copy of the metadata database.
Backup(ctx context.Context, w io.Writer) error
// RestoreService represents the data restore functions of InfluxDB.
type RestoreService interface {
// RestoreKVStore restores the metadata database.
RestoreBucket(ctx context.Context, id ID, rpiData []byte) (shardIDMap map[uint64]uint64, err error)
// RestoreShard uploads a backup file for a single shard.
RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error
}
// Manifest lists the KV and shard file information contained in the backup.
// If Limited is false, the manifest contains a full backup, otherwise
// it is a partial backup.
type Manifest struct {
	KV    ManifestKVEntry `json:"kv"`
	Files []ManifestEntry `json:"files"`

	// If limited is true, then one (or all) of the following fields will be set
	OrganizationID string `json:"organizationID,omitempty"`
	BucketID       string `json:"bucketID,omitempty"`
}

// ManifestEntry contains the data information for a backed up shard.
type ManifestEntry struct {
	OrganizationID   string    `json:"organizationID"`
	OrganizationName string    `json:"organizationName"`
	BucketID         string    `json:"bucketID"`
	BucketName       string    `json:"bucketName"`
	ShardID          uint64    `json:"shardID"`
	FileName         string    `json:"fileName"`
	Size             int64     `json:"size"`
	LastModified     time.Time `json:"lastModified"`
}

// ManifestKVEntry contains the KV store information for a backup.
type ManifestKVEntry struct {
	FileName string `json:"fileName"`
	Size     int64  `json:"size"`
}

// Size returns the combined size, in bytes, of the KV snapshot and every
// shard file listed in the manifest.
func (m *Manifest) Size() int64 {
	total := m.KV.Size
	for i := range m.Files {
		total += m.Files[i].Size
	}
	return total
}

View File

@ -1,97 +1,330 @@
package main
import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"time"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/http"
"github.com/influxdata/influxdb/v2/kv"
influxlogger "github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/v1/services/meta"
"github.com/spf13/cobra"
"go.uber.org/multierr"
"go.uber.org/zap"
)
func cmdBackup(f *globalFlags, opt genericCLIOpts) *cobra.Command {
cmd := opt.newCmd("backup", backupF, false)
cmd.Short = "Backup the data in InfluxDB"
cmd.Long = fmt.Sprintf(
`Backs up data and meta data for the running InfluxDB instance.
Downloaded files are written to the directory indicated by --path.
The target directory, and any parent directories, are created automatically.
Data file have extension .tsm; meta data is written to %s in the same directory.`,
bolt.DefaultFilename)
func cmdBackup(f *globalFlags, opts genericCLIOpts) *cobra.Command {
return newCmdBackupBuilder(f, opts).cmdBackup()
}
f.registerFlags(cmd)
type backupSVCsFn func() (influxdb.BackupService, error)
opts := flagOpts{
{
DestP: &backupFlags.Path,
Flag: "path",
Short: 'p',
EnvVar: "PATH",
Desc: "directory path to write backup files to",
Required: true,
},
// cmdBackupBuilder builds and runs the `influx backup` command. It carries
// the CLI flag values plus the state shared between the backup steps.
type cmdBackupBuilder struct {
	genericCLIOpts
	*globalFlags

	bucketID   string // optional: restrict the backup to this bucket ID
	bucketName string // optional: restrict the backup to this bucket name
	org        organization
	outputPath string // target directory for the downloaded files (positional arg)

	manifest influxdb.Manifest // accumulated during the run; written out by writeManifest
	baseName string            // timestamp-derived prefix shared by all output file names

	backupService *http.BackupService
	kvStore       *bolt.KVStore
	kvService     *kv.Service
	metaClient    *meta.Client

	logger *zap.Logger
}
func newCmdBackupBuilder(f *globalFlags, opts genericCLIOpts) *cmdBackupBuilder {
return &cmdBackupBuilder{
genericCLIOpts: opts,
globalFlags: f,
}
opts.mustRegister(cmd)
}
// cmdBackup constructs the cobra command for `influx backup`, registering the
// org/bucket filter flags and validating that exactly one positional argument
// (the output path) is supplied.
func (b *cmdBackupBuilder) cmdBackup() *cobra.Command {
	cmd := b.newCmd("backup", b.backupRunE)
	b.org.register(cmd, true)
	cmd.Flags().StringVar(&b.bucketID, "bucket-id", "", "The ID of the bucket to backup")
	cmd.Flags().StringVarP(&b.bucketName, "bucket", "b", "", "The name of the bucket to backup")
	cmd.Use = "backup [flags] path"
	// Require exactly one positional arg and capture it as the output path.
	cmd.Args = func(cmd *cobra.Command, args []string) error {
		if len(args) == 0 {
			return fmt.Errorf("must specify output path")
		} else if len(args) > 1 {
			return fmt.Errorf("too many args specified")
		}
		b.outputPath = args[0]
		return nil
	}
	cmd.Short = "Backup database"
	cmd.Long = `
Backs up InfluxDB to a directory.
Examples:
# backup all data
influx backup /path/to/backup
`
	return cmd
}
var backupFlags struct {
Path string
// manifestPath returns the file name of the manifest for this backup run.
func (b *cmdBackupBuilder) manifestPath() string {
	return b.baseName + ".manifest"
}
func newBackupService() (influxdb.BackupService, error) {
ac := flags.config()
return &http.BackupService{
Addr: ac.Host,
Token: ac.Token,
}, nil
// kvPath returns the file name of the bolt KV snapshot for this backup run.
func (b *cmdBackupBuilder) kvPath() string {
	return b.baseName + ".bolt"
}
func backupF(cmd *cobra.Command, args []string) error {
// shardPath returns the file name of the gzipped shard archive for shard id,
// e.g. "20060102T150405Z.s12.tar.gz".
func (b *cmdBackupBuilder) shardPath(id uint64) string {
	// Build the whole name in one Sprintf instead of the previous
	// Sprintf-plus-string-concatenation.
	return fmt.Sprintf("%s.s%d.tar.gz", b.baseName, id)
}
func (b *cmdBackupBuilder) backupRunE(cmd *cobra.Command, args []string) (err error) {
ctx := context.Background()
if backupFlags.Path == "" {
return fmt.Errorf("must specify path")
}
err := os.MkdirAll(backupFlags.Path, 0777)
if err != nil && !os.IsExist(err) {
// Create top level logger
logconf := influxlogger.NewConfig()
if b.logger, err = logconf.New(os.Stdout); err != nil {
return err
}
backupService, err := newBackupService()
if err != nil {
// Determine a base
b.baseName = time.Now().UTC().Format(influxdb.BackupFilenamePattern)
// Ensure directory exists.
if err := os.MkdirAll(b.outputPath, 0777); err != nil {
return err
}
id, backupFilenames, err := backupService.CreateBackup(ctx)
if err != nil {
ac := flags.config()
b.backupService = &http.BackupService{
Addr: ac.Host,
Token: ac.Token,
}
// Back up Bolt database to file.
if err := b.backupKVStore(ctx); err != nil {
return err
}
fmt.Printf("Backup ID %d contains %d files\n", id, len(backupFilenames))
// Open bolt DB.
boltClient := bolt.NewClient(b.logger)
boltClient.Path = filepath.Join(b.outputPath, b.kvPath())
if err := boltClient.Open(ctx); err != nil {
return err
}
defer boltClient.Close()
for _, backupFilename := range backupFilenames {
dest := filepath.Join(backupFlags.Path, backupFilename)
w, err := os.OpenFile(dest, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil {
return err
}
err = backupService.FetchBackupFile(ctx, id, backupFilename, w)
if err != nil {
return multierr.Append(fmt.Errorf("error fetching file %s: %v", backupFilename, err), w.Close())
}
if err = w.Close(); err != nil {
return err
}
// Open meta store so we can iterate over meta data.
b.kvStore = bolt.NewKVStore(b.logger, filepath.Join(b.outputPath, b.kvPath()))
b.kvStore.WithDB(boltClient.DB())
b.kvService = kv.NewService(b.logger, b.kvStore, kv.ServiceConfig{})
b.metaClient = meta.NewClient(meta.NewConfig(), b.kvStore)
if err := b.metaClient.Open(); err != nil {
return err
}
fmt.Printf("Backup complete")
// Filter through organizations & buckets to backup appropriate shards.
if err := b.backupOrganizations(ctx); err != nil {
return err
}
if err := b.writeManifest(ctx); err != nil {
return err
}
b.logger.Info("Backup complete")
return nil
}
// backupKVStore streams the bolt KV file to a file at path.
// On success it records the snapshot's file name and size in the manifest's
// KV entry so restore can locate it.
func (b *cmdBackupBuilder) backupKVStore(ctx context.Context) error {
	path := filepath.Join(b.outputPath, b.kvPath())
	b.logger.Info("Backing up KV store", zap.String("path", b.kvPath()))

	// Open writer to output file.
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	// The deferred Close only matters on early-error returns; the success
	// path closes explicitly below so a close error can be reported.
	defer f.Close()

	// Stream bolt file from server, sync, and ensure file closes correctly.
	if err := b.backupService.BackupKVStore(ctx, f); err != nil {
		return err
	} else if err := f.Sync(); err != nil {
		return err
	} else if err := f.Close(); err != nil {
		return err
	}

	// Lookup file size (must happen after Sync/Close so the size is final).
	fi, err := os.Stat(path)
	if err != nil {
		return err
	}

	// Record the KV snapshot in the manifest.
	b.manifest.KV = influxdb.ManifestKVEntry{
		FileName: b.kvPath(),
		Size:     fi.Size(),
	}
	return nil
}
// backupOrganizations finds every organization matching the org filter flags
// and backs up the buckets of each one.
func (b *cmdBackupBuilder) backupOrganizations(ctx context.Context) (err error) {
	// Build a filter if org ID or org name were specified.
	var filter influxdb.OrganizationFilter
	switch {
	case b.org.id != "":
		filter.ID, err = influxdb.IDFromString(b.org.id)
		if err != nil {
			return err
		}
	case b.org.name != "":
		filter.Name = &b.org.name
	}

	// Retrieve a list of all matching organizations.
	orgs, _, err := b.kvService.FindOrganizations(ctx, filter)
	if err != nil {
		return err
	}

	// Back up buckets in each matching organization.
	for _, org := range orgs {
		b.logger.Info("Backing up organization", zap.String("id", org.ID.String()), zap.String("name", org.Name))
		if err := b.backupBuckets(ctx, org); err != nil {
			return err
		}
	}
	return nil
}
// backupBuckets finds every bucket in org matching the bucket filter flags
// and backs up each one.
func (b *cmdBackupBuilder) backupBuckets(ctx context.Context, org *influxdb.Organization) (err error) {
	// Build a filter if bucket ID or bucket name were specified.
	var filter influxdb.BucketFilter
	filter.OrganizationID = &org.ID
	if b.bucketID != "" {
		if filter.ID, err = influxdb.IDFromString(b.bucketID); err != nil {
			return err
		}
	} else if b.bucketName != "" {
		filter.Name = &b.bucketName
	}

	// Retrieve a list of all matching buckets.
	buckets, _, err := b.kvService.FindBuckets(ctx, filter)
	if err != nil {
		return err
	}

	// Back up shards in each matching bucket.
	for _, bkt := range buckets {
		if err := b.backupBucket(ctx, org, bkt); err != nil {
			return err
		}
	}
	return nil
}
// backupBucket backs up every shard of every retention policy belonging to
// the bucket. The database is looked up in the meta store by the bucket's
// ID string.
func (b *cmdBackupBuilder) backupBucket(ctx context.Context, org *influxdb.Organization, bkt *influxdb.Bucket) (err error) {
	b.logger.Info("Backing up bucket", zap.String("id", bkt.ID.String()), zap.String("name", bkt.Name))

	// Lookup matching database from the meta store.
	dbi := b.metaClient.Database(bkt.ID.String())
	if dbi == nil {
		return fmt.Errorf("bucket database not found: %s", bkt.ID.String())
	}

	// Iterate over and backup each shard: retention policy -> shard group -> shard.
	for _, rpi := range dbi.RetentionPolicies {
		for _, sg := range rpi.ShardGroups {
			for _, sh := range sg.Shards {
				if err := b.backupShard(ctx, org, bkt, rpi.Name, sh.ID); err != nil {
					return err
				}
			}
		}
	}
	return nil
}
// backupShard streams a tar of TSM data for shard.
// The stream is gzip-compressed client-side, and on success the file is
// recorded in the manifest with its size and modification time.
// NOTE(review): the policy parameter is not used in this body — confirm
// whether it should be recorded in the manifest entry.
func (b *cmdBackupBuilder) backupShard(ctx context.Context, org *influxdb.Organization, bkt *influxdb.Bucket, policy string, shardID uint64) error {
	path := filepath.Join(b.outputPath, b.shardPath(shardID))
	b.logger.Info("Backing up shard", zap.Uint64("id", shardID), zap.String("path", b.shardPath(shardID)))

	// Open writer to output file.
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	// Deferred closes cover early-error returns; the success path closes
	// explicitly below so close errors can be reported.
	defer f.Close()

	// Wrap file writer with a gzip writer.
	gw := gzip.NewWriter(f)
	defer gw.Close()

	// Stream file from server, sync, and ensure file closes correctly.
	// Order matters: the gzip writer is closed (flushing its trailer) before
	// the underlying file is synced and closed.
	if err := b.backupService.BackupShard(ctx, gw, shardID); err != nil {
		return err
	} else if err := gw.Close(); err != nil {
		return err
	} else if err := f.Sync(); err != nil {
		return err
	} else if err := f.Close(); err != nil {
		return err
	}

	// Determine file size (after Sync/Close so the size is final).
	fi, err := os.Stat(path)
	if err != nil {
		return err
	}

	// Update manifest.
	b.manifest.Files = append(b.manifest.Files, influxdb.ManifestEntry{
		OrganizationID:   org.ID.String(),
		OrganizationName: org.Name,
		BucketID:         bkt.ID.String(),
		BucketName:       bkt.Name,
		ShardID:          shardID,
		FileName:         b.shardPath(shardID),
		Size:             fi.Size(),
		LastModified:     fi.ModTime().UTC(),
	})
	return nil
}
// writeManifest serializes the accumulated manifest as indented JSON,
// appends a trailing newline, and writes it next to the backup files.
func (b *cmdBackupBuilder) writeManifest(ctx context.Context) error {
	out := filepath.Join(b.outputPath, b.manifestPath())
	b.logger.Info("Writing manifest", zap.String("path", b.manifestPath()))

	data, err := json.MarshalIndent(b.manifest, "", " ")
	if err != nil {
		return fmt.Errorf("create manifest: %w", err)
	}
	data = append(data, '\n')
	return ioutil.WriteFile(out, data, 0600)
}
// newCmd creates a cobra command for this builder, wired with the shared
// print options and global flags, using runE as its handler.
func (b *cmdBackupBuilder) newCmd(use string, runE func(*cobra.Command, []string) error) *cobra.Command {
	cmd := b.genericCLIOpts.newCmd(use, runE, true)
	b.genericCLIOpts.registerPrintOptions(cmd)
	b.globalFlags.registerFlags(cmd)
	return cmd
}

View File

@ -328,6 +328,7 @@ func influxCmd(opts ...genericCLIOptFn) *cobra.Command {
cmdOrganization,
cmdPing,
cmdQuery,
cmdRestore,
cmdSecret,
cmdSetup,
cmdStack,

347
cmd/influx/restore.go Normal file
View File

@ -0,0 +1,347 @@
package main
import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strings"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/http"
"github.com/influxdata/influxdb/v2/kv"
influxlogger "github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/v1/services/meta"
"github.com/spf13/cobra"
"go.uber.org/zap"
)
// cmdRestore returns the cobra command for `influx restore`.
func cmdRestore(f *globalFlags, opts genericCLIOpts) *cobra.Command {
	return newCmdRestoreBuilder(f, opts).cmdRestore()
}
// restoreSVCsFn constructs a restore service.
// NOTE(review): this type is not referenced anywhere in the visible code —
// confirm before removing.
type restoreSVCsFn func() (influxdb.RestoreService, error)

// cmdRestoreBuilder builds and runs the `influx restore` command. It carries
// the CLI flag values plus the state shared between the restore steps.
type cmdRestoreBuilder struct {
	genericCLIOpts
	*globalFlags

	bucketID      string // optional: restrict the restore to this bucket ID
	bucketName    string // optional: restrict the restore to this bucket name
	newBucketName string // optional: restore into a bucket with this name
	newOrgName    string // NOTE(review): registered as --new-org but not read in visible code
	org           organization
	shardID       uint64 // optional: restore only this shard
	path          string // backup directory (positional arg)

	kvEntry      *influxdb.ManifestKVEntry          // KV snapshot taken from the newest manifest
	shardEntries map[uint64]*influxdb.ManifestEntry // newest backup entry per shard ID

	orgService     *http.OrganizationService
	bucketService  *http.BucketService
	restoreService *http.RestoreService
	kvService      *kv.Service
	metaClient     *meta.Client

	logger *zap.Logger
}
// newCmdRestoreBuilder returns a restore-command builder with an initialized
// shard-entry map.
func newCmdRestoreBuilder(f *globalFlags, opts genericCLIOpts) *cmdRestoreBuilder {
	builder := &cmdRestoreBuilder{
		genericCLIOpts: opts,
		globalFlags:    f,
		shardEntries:   map[uint64]*influxdb.ManifestEntry{},
	}
	return builder
}
// cmdRestore constructs the cobra command for `influx restore`, registering
// the filter flags and validating the single positional backup-directory arg.
func (b *cmdRestoreBuilder) cmdRestore() *cobra.Command {
	cmd := b.newCmd("restore", b.restoreRunE)
	b.org.register(cmd, true)
	cmd.Flags().StringVar(&b.bucketID, "bucket-id", "", "The ID of the bucket to restore")
	cmd.Flags().StringVarP(&b.bucketName, "bucket", "b", "", "The name of the bucket to restore")
	cmd.Flags().StringVar(&b.newBucketName, "new-bucket", "", "The name of the bucket to restore to")
	cmd.Flags().StringVar(&b.newOrgName, "new-org", "", "The name of the organization to restore to")
	cmd.Flags().Uint64Var(&b.shardID, "shard-id", 0, "The shard to restore")
	// NOTE(review): --input and the positional arg both write b.path; since
	// the positional arg is required below, the flag value is always
	// overwritten — confirm whether --input should be removed.
	cmd.Flags().StringVar(&b.path, "input", "", "Local backup data path (required)")
	cmd.Use = "restore [flags] path"
	// Require exactly one positional arg and capture it as the backup path.
	cmd.Args = func(cmd *cobra.Command, args []string) error {
		if len(args) == 0 {
			return fmt.Errorf("must specify path to backup directory")
		} else if len(args) > 1 {
			return fmt.Errorf("too many args specified")
		}
		b.path = args[0]
		return nil
	}
	cmd.Short = "Restores a backup directory to InfluxDB."
	cmd.Long = `
Restore influxdb.
Examples:
# restore all data
influx restore /path/to/restore
`
	return cmd
}
// restoreRunE executes the restore: it loads the local manifests, connects to
// the target server, opens the backed-up KV/meta stores, and restores every
// matching organization and bucket.
func (b *cmdRestoreBuilder) restoreRunE(cmd *cobra.Command, args []string) (err error) {
	ctx := context.Background()

	// Create top level logger
	logconf := influxlogger.NewConfig()
	if b.logger, err = logconf.New(os.Stdout); err != nil {
		return err
	}

	// Read in set of KV data & shard data to restore.
	// Wrap with %w so callers can unwrap the cause; error strings are
	// lowercase with no trailing newline per Go convention (the previous
	// "No manifest files found in: %s\n" violated both).
	if err := b.loadIncremental(); err != nil {
		return fmt.Errorf("restore failed while processing manifest files: %w", err)
	} else if b.kvEntry == nil {
		return fmt.Errorf("no manifest files found in: %s", b.path)
	}

	ac := flags.config()
	b.restoreService = &http.RestoreService{
		Addr:  ac.Host,
		Token: ac.Token,
	}

	client, err := newHTTPClient()
	if err != nil {
		return err
	}
	b.orgService = &http.OrganizationService{Client: client}
	b.bucketService = &http.BucketService{Client: client}

	// Open bolt DB.
	boltClient := bolt.NewClient(b.logger)
	boltClient.Path = filepath.Join(b.path, b.kvEntry.FileName)
	if err := boltClient.Open(ctx); err != nil {
		return err
	}
	defer boltClient.Close()

	// Open meta store so we can iterate over meta data.
	kvStore := bolt.NewKVStore(b.logger, boltClient.Path)
	kvStore.WithDB(boltClient.DB())
	b.kvService = kv.NewService(b.logger, kvStore, kv.ServiceConfig{})
	b.metaClient = meta.NewClient(meta.NewConfig(), kvStore)
	if err := b.metaClient.Open(); err != nil {
		return err
	}

	// Filter through organizations & buckets to restore appropriate shards.
	if err := b.restoreOrganizations(ctx); err != nil {
		return err
	}

	b.logger.Info("Restore complete")
	return nil
}
// restoreOrganizations resolves the org filter flags and restores each
// matching organization from the backup.
func (b *cmdRestoreBuilder) restoreOrganizations(ctx context.Context) (err error) {
	// Build a filter if org ID or org name were specified.
	var filter influxdb.OrganizationFilter
	switch {
	case b.org.id != "":
		filter.ID, err = influxdb.IDFromString(b.org.id)
		if err != nil {
			return err
		}
	case b.org.name != "":
		filter.Name = &b.org.name
	}

	// Retrieve a list of all matching organizations.
	orgs, _, err := b.kvService.FindOrganizations(ctx, filter)
	if err != nil {
		return err
	}

	// Restore matching organizations.
	for _, org := range orgs {
		if err := b.restoreOrganization(ctx, org); err != nil {
			return err
		}
	}
	return nil
}
// restoreOrganization ensures org exists on the target server, then restores
// every bucket in the local backup matching the bucket filter flags, skipping
// internal ("_"-prefixed) buckets.
func (b *cmdRestoreBuilder) restoreOrganization(ctx context.Context, org *influxdb.Organization) (err error) {
	b.logger.Info("Restoring organization", zap.String("id", org.ID.String()), zap.String("name", org.Name))

	// Create organization on server, if it doesn't already exist.
	if a, _, err := b.orgService.FindOrganizations(ctx, influxdb.OrganizationFilter{Name: &org.Name}); err != nil {
		return fmt.Errorf("cannot find existing organization: %w", err)
	} else if len(a) == 0 {
		tmp := *org // copy so we don't lose our ID
		if err := b.orgService.CreateOrganization(ctx, &tmp); err != nil {
			return fmt.Errorf("cannot create organization: %w", err)
		}
	}

	// Build a filter if bucket ID or bucket name were specified.
	var filter influxdb.BucketFilter
	filter.OrganizationID = &org.ID
	if b.bucketID != "" {
		if filter.ID, err = influxdb.IDFromString(b.bucketID); err != nil {
			return err
		}
	} else if b.bucketName != "" {
		filter.Name = &b.bucketName
	}

	// Retrieve a list of all buckets for the organization in the local backup.
	buckets, _, err := b.kvService.FindBuckets(ctx, filter)
	if err != nil {
		return err
	}

	// Restore each matching bucket.
	for _, bkt := range buckets {
		// Skip internal buckets.
		if strings.HasPrefix(bkt.Name, "_") {
			continue
		}
		if err := b.restoreBucket(ctx, org, bkt); err != nil {
			return err
		}
	}
	return nil
}
// restoreBucket re-creates the bucket on the target server (optionally under
// a new name), restores its metadata to obtain the old-to-new shard ID
// mapping, then streams each matching shard backup file.
func (b *cmdRestoreBuilder) restoreBucket(ctx context.Context, org *influxdb.Organization, bkt *influxdb.Bucket) (err error) {
	b.logger.Info("Restoring bucket", zap.String("id", bkt.ID.String()), zap.String("name", bkt.Name))

	// Create bucket on server.
	newBucket := *bkt
	if b.newBucketName != "" {
		newBucket.Name = b.newBucketName
	}
	if err := b.bucketService.CreateBucket(ctx, &newBucket); err != nil {
		return fmt.Errorf("cannot create bucket: %w", err)
	}

	// Lookup matching database from the meta store.
	dbi := b.metaClient.Database(bkt.ID.String())
	if dbi == nil {
		return fmt.Errorf("bucket database not found: %s", bkt.ID.String())
	}

	// Serialize to protobufs.
	buf, err := dbi.MarshalBinary()
	if err != nil {
		return fmt.Errorf("cannot marshal database info: %w", err)
	}

	shardIDMap, err := b.restoreService.RestoreBucket(ctx, newBucket.ID, buf)
	if err != nil {
		return fmt.Errorf("cannot restore bucket: %w", err)
	}

	// Restore each shard for the bucket.
	for _, file := range b.shardEntries {
		if bkt.ID.String() != file.BucketID {
			continue
		} else if b.shardID != 0 && b.shardID != file.ShardID {
			continue
		}

		// Skip if shard metadata was not imported.
		// BUG FIX: this previously did `return nil`, silently aborting the
		// restore of every remaining shard in the bucket. A missing mapping
		// should only skip this one file, so continue instead.
		newID, ok := shardIDMap[file.ShardID]
		if !ok {
			b.logger.Warn("Meta info not found, skipping file", zap.Uint64("shard", file.ShardID), zap.String("bucket_id", file.BucketID), zap.String("filename", file.FileName))
			continue
		}

		if err := b.restoreShard(ctx, newID, file); err != nil {
			return err
		}
	}
	return nil
}
// restoreShard opens the gzipped shard backup file and streams its
// decompressed contents to the server's restore endpoint.
func (b *cmdRestoreBuilder) restoreShard(ctx context.Context, newShardID uint64, file *influxdb.ManifestEntry) error {
	b.logger.Info("Restoring shard live from backup", zap.Uint64("shard", newShardID), zap.String("filename", file.FileName))

	src, err := os.Open(filepath.Join(b.path, file.FileName))
	if err != nil {
		return err
	}
	defer src.Close()

	unzipped, err := gzip.NewReader(src)
	if err != nil {
		return err
	}
	defer unzipped.Close()

	return b.restoreService.RestoreShard(ctx, newShardID, unzipped)
}
// loadIncremental loads multiple manifest files from a given directory.
// It scans b.path for "*.manifest" files, takes the KV entry from the newest
// manifest, and for every shard keeps the newest entry (by LastModified)
// whose backing file still exists on disk.
func (b *cmdRestoreBuilder) loadIncremental() error {
	// Read all manifest files from path, sort in descending time.
	// Assumes manifest names begin with a sortable UTC timestamp, so reverse
	// lexical order is newest-first — TODO confirm if foreign manifests can
	// appear in the directory.
	manifests, err := filepath.Glob(filepath.Join(b.path, "*.manifest"))
	if err != nil {
		return err
	} else if len(manifests) == 0 {
		// Nothing found; the caller treats a nil kvEntry as "no manifests".
		return nil
	}
	sort.Sort(sort.Reverse(sort.StringSlice(manifests)))

	b.shardEntries = make(map[uint64]*influxdb.ManifestEntry)
	for _, filename := range manifests {
		// Skip file if it is a directory.
		if fi, err := os.Stat(filename); err != nil {
			return err
		} else if fi.IsDir() {
			continue
		}

		// Read manifest file for backup.
		var manifest influxdb.Manifest
		if buf, err := ioutil.ReadFile(filename); err != nil {
			return err
		} else if err := json.Unmarshal(buf, &manifest); err != nil {
			return fmt.Errorf("read manifest: %v", err)
		}

		// Save latest KV entry (manifests are iterated newest-first).
		if b.kvEntry == nil {
			b.kvEntry = &manifest.KV
		}

		// Load most recent backup per shard.
		for i := range manifest.Files {
			sh := manifest.Files[i] // fresh per-iteration copy, so &sh below is safe
			// Skip entries whose backing file no longer exists on disk.
			if _, err := os.Stat(filepath.Join(b.path, sh.FileName)); err != nil {
				continue
			}
			entry := b.shardEntries[sh.ShardID]
			if entry == nil || sh.LastModified.After(entry.LastModified) {
				b.shardEntries[sh.ShardID] = &sh
			}
		}
	}
	return nil
}
// newCmd creates a cobra command for this builder, wired with the shared
// print options and global flags, using runE as its handler.
func (b *cmdRestoreBuilder) newCmd(use string, runE func(*cobra.Command, []string) error) *cobra.Command {
	cmd := b.genericCLIOpts.newCmd(use, runE, true)
	b.genericCLIOpts.registerPrintOptions(cmd)
	b.globalFlags.registerFlags(cmd)
	return cmd
}

View File

@ -27,6 +27,7 @@ type Engine interface {
storage.EngineSchema
prom.PrometheusCollector
influxdb.BackupService
influxdb.RestoreService
SeriesCardinality(orgID, bucketID influxdb.ID) int64
@ -157,16 +158,20 @@ func (t *TemporaryEngine) Flush(ctx context.Context) {
}
}
func (t *TemporaryEngine) CreateBackup(ctx context.Context) (int, []string, error) {
return t.engine.CreateBackup(ctx)
func (t *TemporaryEngine) BackupKVStore(ctx context.Context, w io.Writer) error {
return t.engine.BackupKVStore(ctx, w)
}
func (t *TemporaryEngine) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error {
return t.engine.FetchBackupFile(ctx, backupID, backupFile, w)
func (t *TemporaryEngine) RestoreBucket(ctx context.Context, id influxdb.ID, dbi []byte) (map[uint64]uint64, error) {
return t.engine.RestoreBucket(ctx, id, dbi)
}
func (t *TemporaryEngine) InternalBackupPath(backupID int) string {
return t.engine.InternalBackupPath(backupID)
func (t *TemporaryEngine) BackupShard(ctx context.Context, w io.Writer, shardID uint64) error {
return t.engine.BackupShard(ctx, w, shardID)
}
func (t *TemporaryEngine) RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error {
return t.engine.RestoreShard(ctx, shardID, r)
}
func (t *TemporaryEngine) TSDBStore() storage.TSDBStore {

View File

@ -854,9 +854,10 @@ func (m *Launcher) run(ctx context.Context) (err error) {
m.reg.MustRegister(m.engine.PrometheusCollectors()...)
var (
deleteService platform.DeleteService = m.engine
pointsWriter storage.PointsWriter = m.engine
backupService platform.BackupService = m.engine
deleteService platform.DeleteService = m.engine
pointsWriter storage.PointsWriter = m.engine
backupService platform.BackupService = m.engine
restoreService platform.RestoreService = m.engine
)
deps, err := influxdb.NewDependencies(
@ -1208,7 +1209,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
},
DeleteService: deleteService,
BackupService: backupService,
KVBackupService: m.kvService,
RestoreService: restoreService,
AuthorizationService: authSvc,
AuthorizerV1: authorizerV1,
AlgoWProxy: &http.NoopProxyHandler{},

View File

@ -1,304 +0,0 @@
package restore
import (
"fmt"
"io"
"os"
"path/filepath"
"strings"
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/internal/fs"
"github.com/influxdata/influxdb/v2/kit/cli"
"github.com/spf13/cobra"
)
var Command = &cobra.Command{
Use: "restore",
Short: "Restore data and metadata from a backup",
Long: `
This command restores data and metadata from a backup fileset.
Any existing metadata and data will be temporarily moved while restore runs
and deleted after restore completes.
Rebuilding the index and series file uses default options as in
"influxd inspect build-tsi" with the given target engine path.
For additional performance options, run restore with "-rebuild-index false"
and build-tsi afterwards.
NOTES:
* The influxd server should not be running when using the restore tool
as it replaces all data and metadata.
`,
Args: cobra.ExactArgs(0),
RunE: restoreE,
}
var flags struct {
boltPath string
enginePath string
credPath string
backupPath string
rebuildTSI bool
}
func init() {
dir, err := fs.InfluxDir()
if err != nil {
panic(fmt.Errorf("failed to determine influx directory: %s", err))
}
Command.Flags().SortFlags = false
pfs := Command.PersistentFlags()
pfs.SortFlags = false
opts := []cli.Opt{
{
DestP: &flags.boltPath,
Flag: "bolt-path",
Default: filepath.Join(dir, bolt.DefaultFilename),
Desc: "path to target boltdb database",
},
{
DestP: &flags.enginePath,
Flag: "engine-path",
Default: filepath.Join(dir, "engine"),
Desc: "path to target persistent engine files",
},
{
DestP: &flags.credPath,
Flag: "credentials-path",
Default: filepath.Join(dir, fs.DefaultTokenFile),
Desc: "path to target credentials file",
},
{
DestP: &flags.backupPath,
Flag: "backup-path",
Default: "",
Desc: "path to backup files",
},
{
DestP: &flags.rebuildTSI,
Flag: "rebuild-index",
Default: true,
Desc: "if true, rebuild the TSI index and series file based on the given engine path (equivalent to influxd inspect build-tsi)",
},
}
cli.BindOptions(Command, opts)
}
func restoreE(cmd *cobra.Command, args []string) error {
if flags.backupPath == "" {
return fmt.Errorf("no backup path given")
}
if err := moveBolt(); err != nil {
return fmt.Errorf("failed to move existing bolt file: %v", err)
}
if err := moveCredentials(); err != nil {
return fmt.Errorf("failed to move existing credentials file: %v", err)
}
if err := moveEngine(); err != nil {
return fmt.Errorf("failed to move existing engine data: %v", err)
}
if err := restoreBolt(); err != nil {
return fmt.Errorf("failed to restore bolt file: %v", err)
}
if err := restoreCred(); err != nil {
return fmt.Errorf("failed to restore credentials file: %v", err)
}
if err := restoreEngine(); err != nil {
return fmt.Errorf("failed to restore all TSM files: %v", err)
}
if flags.rebuildTSI {
// FIXME: Implement rebuildTSI
panic("not implemented")
//sFilePath := filepath.Join(flags.enginePath, storage.DefaultSeriesFileDirectoryName)
//indexPath := filepath.Join(flags.enginePath, storage.DefaultIndexDirectoryName)
//rebuild := inspect.NewBuildTSICommand()
//rebuild.SetArgs([]string{"--sfile-path", sFilePath, "--tsi-path", indexPath})
//rebuild.Execute()
}
if err := removeTmpBolt(); err != nil {
return fmt.Errorf("restore completed, but failed to cleanup temporary bolt file: %v", err)
}
if err := removeTmpCred(); err != nil {
return fmt.Errorf("restore completed, but failed to cleanup temporary credentials file: %v", err)
}
if err := removeTmpEngine(); err != nil {
return fmt.Errorf("restore completed, but failed to cleanup temporary engine data: %v", err)
}
return nil
}
func moveBolt() error {
if _, err := os.Stat(flags.boltPath); os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
if err := removeTmpBolt(); err != nil {
return err
}
return os.Rename(flags.boltPath, flags.boltPath+".tmp")
}
func moveCredentials() error {
if _, err := os.Stat(flags.credPath); os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
if err := removeTmpCred(); err != nil {
return err
}
return os.Rename(flags.credPath, flags.credPath+".tmp")
}
func moveEngine() error {
if _, err := os.Stat(flags.enginePath); os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
if err := removeTmpEngine(); err != nil {
return err
}
if err := os.Rename(flags.enginePath, tmpEnginePath()); err != nil {
return err
}
return os.MkdirAll(flags.enginePath, 0777)
}
func tmpEnginePath() string {
return filepath.Dir(flags.enginePath) + "tmp"
}
func removeTmpBolt() error {
return removeIfExists(flags.boltPath + ".tmp")
}
func removeTmpEngine() error {
return removeIfExists(tmpEnginePath())
}
func removeTmpCred() error {
return removeIfExists(flags.credPath + ".tmp")
}
func removeIfExists(path string) error {
if _, err := os.Stat(path); os.IsNotExist(err) {
return nil
} else if err != nil {
return err
} else {
return os.RemoveAll(path)
}
}
// restoreBolt restores the KV metadata store by copying the bolt file from
// the backup directory into place at flags.boltPath, then reports the result
// to stdout.
func restoreBolt() error {
	backupBolt := filepath.Join(flags.backupPath, bolt.DefaultFilename)
	if err := restoreFile(backupBolt, flags.boltPath, "bolt"); err != nil {
		return err
	}
	fmt.Printf("Restored Bolt to %s from %s\n", flags.boltPath, backupBolt)
	return nil
}
// restoreEngine copies every TSM data file found under the backup directory
// into the engine's data directory, creating it if necessary, and reports how
// many files were restored.
func restoreEngine() error {
	dataDir := filepath.Join(flags.enginePath, "data")
	if err := os.MkdirAll(dataDir, 0777); err != nil {
		return err
	}

	count := 0
	err := filepath.Walk(flags.backupPath, func(path string, info os.FileInfo, err error) error {
		// Propagate walk errors; the original ignored this argument, which
		// silently skipped unreadable directories.
		if err != nil {
			return err
		}
		// Match only TSM data files by extension. The original used
		// strings.Contains(path, ".tsm"), which also matched directories and
		// any path merely containing ".tsm" (e.g. "foo.tsm.bak").
		if info.IsDir() || filepath.Ext(path) != ".tsm" {
			return nil
		}
		if err := copyTSMFile(path, filepath.Join(dataDir, filepath.Base(path))); err != nil {
			return err
		}
		count++
		return nil
	})
	if err != nil {
		// Do not report success counts for a failed walk (the original
		// printed the message unconditionally).
		return err
	}

	fmt.Printf("Restored %d TSM files to %v\n", count, dataDir)
	return nil
}

// copyTSMFile copies the TSM file at src to dst, creating or truncating dst.
func copyTSMFile(src, dst string) error {
	f, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("error opening TSM file: %v", err)
	}
	defer f.Close()

	w, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
	if err != nil {
		return err
	}
	if _, err := io.Copy(w, f); err != nil {
		w.Close()
		return err
	}
	// Surface Close errors: ignoring them can hide a truncated restore.
	return w.Close()
}
func restoreFile(backup string, target string, filetype string) error {
f, err := os.Open(backup)
if err != nil {
return fmt.Errorf("no %s file in backup: %v", filetype, err)
}
defer f.Close()
if err := os.MkdirAll(filepath.Dir(target), 0777); err != nil {
return err
}
w, err := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil && !os.IsNotExist(err) {
return err
}
defer w.Close()
_, err = io.Copy(w, f)
return err
}
// restoreCred restores the operator credentials (token) file from the backup
// into flags.credPath. Backups made without a credentials file are skipped
// with a notice rather than treated as an error.
func restoreCred() error {
	backupCred := filepath.Join(flags.backupPath, fs.DefaultTokenFile)
	_, err := os.Stat(backupCred)
	if os.IsNotExist(err) {
		fmt.Printf("No credentials file found in backup, skipping.\n")
		return nil
	} else if err != nil {
		return err
	}
	if err := restoreFile(backupCred, flags.credPath, "credentials"); err != nil {
		return err
	}
	fmt.Printf("Restored credentials to %s from %s\n", flags.credPath, backupCred)
	return nil
}

View File

@ -58,7 +58,7 @@ type APIBackend struct {
PointsWriter storage.PointsWriter
DeleteService influxdb.DeleteService
BackupService influxdb.BackupService
KVBackupService influxdb.KVBackupService
RestoreService influxdb.RestoreService
AuthorizationService influxdb.AuthorizationService
AuthorizerV1 influxdb.AuthorizerV1
OnboardingService influxdb.OnboardingService
@ -193,6 +193,10 @@ func NewAPIHandler(b *APIBackend, opts ...APIHandlerOptFn) *APIHandler {
backupBackend.BackupService = authorizer.NewBackupService(backupBackend.BackupService)
h.Mount(prefixBackup, NewBackupHandler(backupBackend))
restoreBackend := NewRestoreBackend(b)
restoreBackend.RestoreService = authorizer.NewRestoreService(restoreBackend.RestoreService)
h.Mount(prefixRestore, NewRestoreHandler(restoreBackend))
h.Mount(dbrp.PrefixDBRP, dbrp.NewHTTPHandler(b.Logger, b.DBRPService, b.OrganizationService))
writeBackend := NewWriteBackend(b.Logger.With(zap.String("handler", "write")), b)
@ -234,6 +238,7 @@ var apiLinks = map[string]interface{}{
"analyze": "/api/v2/query/analyze",
"suggestions": "/api/v2/query/suggestions",
},
"restore": "/api/v2/restore",
"setup": "/api/v2/setup",
"signin": "/api/v2/signin",
"signout": "/api/v2/signout",

View File

@ -2,23 +2,18 @@ package http
import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path"
"path/filepath"
"strconv"
"time"
"github.com/influxdata/httprouter"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/bolt"
// "github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/internal/fs"
"github.com/influxdata/influxdb/v2/kit/tracing"
"go.uber.org/multierr"
"go.uber.org/zap"
)
@ -27,8 +22,7 @@ type BackupBackend struct {
Logger *zap.Logger
influxdb.HTTPErrorHandler
BackupService influxdb.BackupService
KVBackupService influxdb.KVBackupService
BackupService influxdb.BackupService
}
// NewBackupBackend returns a new instance of BackupBackend.
@ -38,7 +32,6 @@ func NewBackupBackend(b *APIBackend) *BackupBackend {
HTTPErrorHandler: b.HTTPErrorHandler,
BackupService: b.BackupService,
KVBackupService: b.KVBackupService,
}
}
@ -48,23 +41,17 @@ type BackupHandler struct {
influxdb.HTTPErrorHandler
Logger *zap.Logger
BackupService influxdb.BackupService
KVBackupService influxdb.KVBackupService
BackupService influxdb.BackupService
}
const (
prefixBackup = "/api/v2/backup"
backupIDParamName = "backup_id"
backupFileParamName = "backup_file"
backupFilePath = prefixBackup + "/:" + backupIDParamName + "/file/:" + backupFileParamName
prefixBackup = "/api/v2/backup"
backupKVStorePath = prefixBackup + "/kv"
backupShardPath = prefixBackup + "/shards/:shardID"
httpClientTimeout = time.Hour
)
func composeBackupFilePath(backupID int, backupFile string) string {
return path.Join(prefixBackup, fmt.Sprint(backupID), "file", fmt.Sprint(backupFile))
}
// NewBackupHandler creates a new handler at /api/v2/backup to receive backup requests.
func NewBackupHandler(b *BackupBackend) *BackupHandler {
h := &BackupHandler{
@ -72,107 +59,40 @@ func NewBackupHandler(b *BackupBackend) *BackupHandler {
Router: NewRouter(b.HTTPErrorHandler),
Logger: b.Logger,
BackupService: b.BackupService,
KVBackupService: b.KVBackupService,
}
h.HandlerFunc(http.MethodPost, prefixBackup, h.handleCreate)
h.HandlerFunc(http.MethodGet, backupFilePath, h.handleFetchFile)
h.HandlerFunc(http.MethodGet, backupKVStorePath, h.handleBackupKVStore)
h.HandlerFunc(http.MethodGet, backupShardPath, h.handleBackupShard)
return h
}
type backup struct {
ID int `json:"id,omitempty"`
Files []string `json:"files,omitempty"`
}
func (h *BackupHandler) handleCreate(w http.ResponseWriter, r *http.Request) {
span, r := tracing.ExtractFromHTTPRequest(r, "BackupHandler.handleCreate")
func (h *BackupHandler) handleBackupKVStore(w http.ResponseWriter, r *http.Request) {
span, r := tracing.ExtractFromHTTPRequest(r, "BackupHandler.handleBackupKVStore")
defer span.Finish()
ctx := r.Context()
id, files, err := h.BackupService.CreateBackup(ctx)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
internalBackupPath := h.BackupService.InternalBackupPath(id)
boltPath := filepath.Join(internalBackupPath, bolt.DefaultFilename)
boltFile, err := os.OpenFile(boltPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0660)
if err != nil {
err = multierr.Append(err, os.RemoveAll(internalBackupPath))
h.HandleHTTPError(ctx, err, w)
return
}
if err = h.KVBackupService.Backup(ctx, boltFile); err != nil {
err = multierr.Append(err, os.RemoveAll(internalBackupPath))
h.HandleHTTPError(ctx, err, w)
return
}
files = append(files, bolt.DefaultFilename)
credsExist, err := h.backupCredentials(internalBackupPath)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
if credsExist {
files = append(files, fs.DefaultConfigsFile)
}
b := backup{
ID: id,
Files: files,
}
if err = json.NewEncoder(w).Encode(&b); err != nil {
err = multierr.Append(err, os.RemoveAll(internalBackupPath))
if err := h.BackupService.BackupKVStore(ctx, w); err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
}
func (h *BackupHandler) backupCredentials(internalBackupPath string) (bool, error) {
credBackupPath := filepath.Join(internalBackupPath, fs.DefaultConfigsFile)
credPath, err := defaultConfigsPath()
if err != nil {
return false, err
}
token, err := ioutil.ReadFile(credPath)
if err != nil && !os.IsNotExist(err) {
return false, err
} else if os.IsNotExist(err) {
return false, nil
}
if err := ioutil.WriteFile(credBackupPath, []byte(token), 0600); err != nil {
return false, err
}
return true, nil
}
func (h *BackupHandler) handleFetchFile(w http.ResponseWriter, r *http.Request) {
span, r := tracing.ExtractFromHTTPRequest(r, "BackupHandler.handleFetchFile")
func (h *BackupHandler) handleBackupShard(w http.ResponseWriter, r *http.Request) {
span, r := tracing.ExtractFromHTTPRequest(r, "BackupHandler.handleBackupShard")
defer span.Finish()
ctx := r.Context()
params := httprouter.ParamsFromContext(ctx)
backupID, err := strconv.Atoi(params.ByName("backup_id"))
shardID, err := strconv.ParseUint(params.ByName("shardID"), 10, 64)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
backupFile := params.ByName("backup_file")
if err = h.BackupService.FetchBackupFile(ctx, backupID, backupFile, w); err != nil {
if err := h.BackupService.BackupShard(ctx, w, shardID); err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
@ -185,47 +105,11 @@ type BackupService struct {
InsecureSkipVerify bool
}
func (s *BackupService) CreateBackup(ctx context.Context) (int, []string, error) {
func (s *BackupService) BackupKVStore(ctx context.Context, w io.Writer) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
u, err := NewURL(s.Addr, prefixBackup)
if err != nil {
return 0, nil, err
}
req, err := http.NewRequest(http.MethodPost, u.String(), nil)
if err != nil {
return 0, nil, err
}
SetToken(s.Token, req)
req = req.WithContext(ctx)
hc := NewClient(u.Scheme, s.InsecureSkipVerify)
hc.Timeout = httpClientTimeout
resp, err := hc.Do(req)
if err != nil {
return 0, nil, err
}
defer resp.Body.Close()
if err := CheckError(resp); err != nil {
return 0, nil, err
}
var b backup
if err = json.NewDecoder(resp.Body).Decode(&b); err != nil {
return 0, nil, err
}
return b.ID, b.Files, nil
}
func (s *BackupService) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
u, err := NewURL(s.Addr, composeBackupFilePath(backupID, backupFile))
u, err := NewURL(s.Addr, prefixBackup+"/kv")
if err != nil {
return err
}
@ -249,12 +133,44 @@ func (s *BackupService) FetchBackupFile(ctx context.Context, backupID int, backu
return err
}
_, err = io.Copy(w, resp.Body)
if _, err := io.Copy(w, resp.Body); err != nil {
return err
}
return resp.Body.Close()
}
func (s *BackupService) BackupShard(ctx context.Context, w io.Writer, shardID uint64) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
u, err := NewURL(s.Addr, fmt.Sprintf(prefixBackup+"/shards/%d", shardID))
if err != nil {
return err
}
return nil
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
return err
}
SetToken(s.Token, req)
req = req.WithContext(ctx)
hc := NewClient(u.Scheme, s.InsecureSkipVerify)
hc.Timeout = httpClientTimeout
resp, err := hc.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if err := CheckError(resp); err != nil {
return err
}
if _, err := io.Copy(w, resp.Body); err != nil {
return err
}
return resp.Body.Close()
}
func defaultConfigsPath() (string, error) {
@ -264,7 +180,3 @@ func defaultConfigsPath() (string, error) {
}
return filepath.Join(dir, fs.DefaultConfigsFile), nil
}
func (s *BackupService) InternalBackupPath(backupID int) string {
panic("internal method not implemented here")
}

189
http/restore_service.go Normal file
View File

@ -0,0 +1,189 @@
package http
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strconv"
"github.com/influxdata/httprouter"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/tracing"
"go.uber.org/zap"
)
// RestoreBackend is all services and associated parameters required to construct the RestoreHandler.
type RestoreBackend struct {
Logger *zap.Logger
influxdb.HTTPErrorHandler
RestoreService influxdb.RestoreService
}
// NewRestoreBackend returns a new instance of RestoreBackend.
func NewRestoreBackend(b *APIBackend) *RestoreBackend {
return &RestoreBackend{
Logger: b.Logger.With(zap.String("handler", "restore")),
HTTPErrorHandler: b.HTTPErrorHandler,
RestoreService: b.RestoreService,
}
}
// RestoreHandler is http handler for restore service.
type RestoreHandler struct {
*httprouter.Router
influxdb.HTTPErrorHandler
Logger *zap.Logger
RestoreService influxdb.RestoreService
}
const (
prefixRestore = "/api/v2/restore"
restoreBucketPath = prefixRestore + "/buckets/:bucketID"
restoreShardPath = prefixRestore + "/shards/:shardID"
)
// NewRestoreHandler creates a new handler at /api/v2/restore to receive restore requests.
func NewRestoreHandler(b *RestoreBackend) *RestoreHandler {
h := &RestoreHandler{
HTTPErrorHandler: b.HTTPErrorHandler,
Router: NewRouter(b.HTTPErrorHandler),
Logger: b.Logger,
RestoreService: b.RestoreService,
}
h.HandlerFunc(http.MethodPost, restoreBucketPath, h.handleRestoreBucket)
h.HandlerFunc(http.MethodPost, restoreShardPath, h.handleRestoreShard)
return h
}
// handleRestoreBucket handles POST /api/v2/restore/buckets/:bucketID. The
// request body is the binary-serialized bucket metadata captured at backup
// time; the response body is a JSON map of old shard IDs to newly allocated
// shard IDs produced by the restore service.
func (h *RestoreHandler) handleRestoreBucket(w http.ResponseWriter, r *http.Request) {
	span, r := tracing.ExtractFromHTTPRequest(r, "RestoreHandler.handleRestoreBucket")
	defer span.Finish()
	ctx := r.Context()

	// Read bucket ID.
	bucketID, err := decodeIDFromCtx(r.Context(), "bucketID")
	if err != nil {
		h.HandleHTTPError(ctx, err, w)
		return
	}

	// Read serialized DBI data.
	buf, err := ioutil.ReadAll(r.Body)
	if err != nil {
		h.HandleHTTPError(ctx, err, w)
		return
	}

	shardIDMap, err := h.RestoreService.RestoreBucket(ctx, bucketID, buf)
	if err != nil {
		h.HandleHTTPError(ctx, err, w)
		return
	}

	if err := json.NewEncoder(w).Encode(shardIDMap); err != nil {
		h.HandleHTTPError(ctx, err, w)
		return
	}
}
func (h *RestoreHandler) handleRestoreShard(w http.ResponseWriter, r *http.Request) {
span, r := tracing.ExtractFromHTTPRequest(r, "RestoreHandler.handleRestoreShard")
defer span.Finish()
ctx := r.Context()
params := httprouter.ParamsFromContext(ctx)
shardID, err := strconv.ParseUint(params.ByName("shardID"), 10, 64)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
if err := h.RestoreService.RestoreShard(ctx, shardID, r.Body); err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
}
// RestoreService is the client implementation of influxdb.RestoreService.
type RestoreService struct {
Addr string
Token string
InsecureSkipVerify bool
}
// RestoreBucket is the HTTP client implementation of
// influxdb.RestoreService.RestoreBucket: it POSTs the serialized bucket
// metadata (dbi) to /api/v2/restore/buckets/:bucketID on the remote server
// and decodes the returned old-to-new shard ID mapping.
func (s *RestoreService) RestoreBucket(ctx context.Context, id influxdb.ID, dbi []byte) (map[uint64]uint64, error) {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	u, err := NewURL(s.Addr, prefixRestore+fmt.Sprintf("/buckets/%s", id.String()))
	if err != nil {
		return nil, err
	}

	req, err := http.NewRequest(http.MethodPost, u.String(), bytes.NewReader(dbi))
	if err != nil {
		return nil, err
	}
	SetToken(s.Token, req)
	req = req.WithContext(ctx)

	hc := NewClient(u.Scheme, s.InsecureSkipVerify)
	// Restores can be large; use the long (one hour) client timeout.
	hc.Timeout = httpClientTimeout
	resp, err := hc.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if err := CheckError(resp); err != nil {
		return nil, err
	}

	// Decode the JSON map of original shard IDs to newly created shard IDs.
	shardIDMap := make(map[uint64]uint64)
	if err := json.NewDecoder(resp.Body).Decode(&shardIDMap); err != nil {
		return nil, err
	}
	return shardIDMap, nil
}
func (s *RestoreService) RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
u, err := NewURL(s.Addr, fmt.Sprintf(prefixRestore+"/shards/%d", shardID))
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodPost, u.String(), r)
if err != nil {
return err
}
SetToken(s.Token, req)
req = req.WithContext(ctx)
hc := NewClient(u.Scheme, s.InsecureSkipVerify)
hc.Timeout = httpClientTimeout
resp, err := hc.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if err := CheckError(resp); err != nil {
return err
}
return nil
}

View File

@ -33,6 +33,7 @@ func NewBucketService(log *zap.Logger, s influxdb.BucketService, engine EngineSc
BucketService: s,
log: log,
engine: engine,
log: logger,
}
}

View File

@ -79,6 +79,9 @@ type MetaClient interface {
RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error
Backup(ctx context.Context, w io.Writer) error
Data() meta.Data
SetData(data *meta.Data) error
}
type TSDBStore interface {
@ -306,41 +309,105 @@ func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID
return ErrNotImplemented
}
// CreateBackup creates a "snapshot" of all TSM data in the Engine.
// 1) Snapshot the cache to ensure the backup includes all data written before now.
// 2) Create hard links to all TSM files, in a new directory within the engine root directory.
// 3) Return a unique backup ID (invalid after the process terminates) and list of files.
//
// TODO - do we need this?
//
func (e *Engine) CreateBackup(ctx context.Context) (int, []string, error) {
func (e *Engine) BackupKVStore(ctx context.Context, w io.Writer) error {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
if e.closing == nil {
return 0, nil, ErrEngineClosed
}
return 0, nil, nil
}
// FetchBackupFile writes a given backup file to the provided writer.
// After a successful write, the internal copy is removed.
func (e *Engine) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error {
// TODO - need?
return nil
}
// InternalBackupPath provides the internal, full path directory name of the backup.
// This should not be exposed via API.
func (e *Engine) InternalBackupPath(backupID int) string {
e.mu.RLock()
defer e.mu.RUnlock()
if e.closing == nil {
return ""
return ErrEngineClosed
}
// TODO - need?
return ""
return e.metaClient.Backup(ctx, w)
}
func (e *Engine) BackupShard(ctx context.Context, w io.Writer, shardID uint64) error {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
e.mu.RLock()
defer e.mu.RUnlock()
if e.closing == nil {
return ErrEngineClosed
}
return e.tsdbStore.BackupShard(shardID, time.Time{}, w)
}
// RestoreBucket restores a bucket's metadata from a backup. buf is a
// binary-marshaled meta.DatabaseInfo captured at backup time; the bucket
// identified by id must already exist in the meta store. It returns a map
// from each backed-up shard ID to the freshly allocated shard ID, which the
// caller uses to route subsequent RestoreShard calls.
func (e *Engine) RestoreBucket(ctx context.Context, id influxdb.ID, buf []byte) (map[uint64]uint64, error) {
	span, _ := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	e.mu.RLock()
	defer e.mu.RUnlock()

	if e.closing == nil {
		return nil, ErrEngineClosed
	}

	// Decode the serialized DatabaseInfo from the backup payload.
	var newDBI meta.DatabaseInfo
	if err := newDBI.UnmarshalBinary(buf); err != nil {
		return nil, err
	}

	data := e.metaClient.Data()
	dbi := data.Database(id.String())
	if dbi == nil {
		return nil, fmt.Errorf("bucket dbi for %q not found during restore", newDBI.Name)
	} else if len(newDBI.RetentionPolicies) != 1 {
		// A 2.x bucket maps to exactly one retention policy; reject anything else.
		return nil, fmt.Errorf("bucket must have 1 retention policy; attempting to restore %d retention policies", len(newDBI.RetentionPolicies))
	}

	// Overwrite the live bucket's retention policies and continuous queries
	// with the backed-up versions.
	dbi.RetentionPolicies = newDBI.RetentionPolicies
	dbi.ContinuousQueries = newDBI.ContinuousQueries

	// Generate shard ID mapping. Every restored shard group and shard gets a
	// fresh ID from the meta store's monotonic counters so restored shards
	// cannot collide with existing ones. Owners are reset to empty —
	// presumably ownership is re-derived locally; confirm against
	// RestoreShard's consumers.
	shardIDMap := make(map[uint64]uint64)
	rpi := newDBI.RetentionPolicies[0]
	for j, sgi := range rpi.ShardGroups {
		data.MaxShardGroupID++
		rpi.ShardGroups[j].ID = data.MaxShardGroupID

		for k := range sgi.Shards {
			data.MaxShardID++
			shardIDMap[sgi.Shards[k].ID] = data.MaxShardID
			sgi.Shards[k].ID = data.MaxShardID
			sgi.Shards[k].Owners = []meta.ShardOwner{}
		}
	}

	// Update data.
	if err := e.metaClient.SetData(&data); err != nil {
		return nil, err
	}

	// Create shards. These start empty; RestoreShard later streams the TSM
	// data into them.
	for _, sgi := range rpi.ShardGroups {
		for _, sh := range sgi.Shards {
			if err := e.tsdbStore.CreateShard(dbi.Name, rpi.Name, sh.ID, true); err != nil {
				return nil, err
			}
		}
	}

	return shardIDMap, nil
}
func (e *Engine) RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
e.mu.RLock()
defer e.mu.RUnlock()
if e.closing == nil {
return ErrEngineClosed
}
return e.tsdbStore.RestoreShard(shardID, r)
}
// SeriesCardinality returns the number of series in the engine.

View File

@ -1199,15 +1199,7 @@ func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string, as
return "", nil
}
nativeFileName := filepath.FromSlash(hdr.Name)
// Skip file if it does not have a matching prefix.
if !strings.HasPrefix(nativeFileName, shardRelativePath) {
return "", nil
}
filename, err := filepath.Rel(shardRelativePath, nativeFileName)
if err != nil {
return "", err
}
filename := filepath.Base(filepath.FromSlash(hdr.Name))
// If this is a directory entry (usually just `index` for tsi), create it an move on.
if hdr.Typeflag == tar.TypeDir {

View File

@ -1036,6 +1036,10 @@ func (c *Client) Load() error {
})
}
func (c *Client) Backup(ctx context.Context, w io.Writer) error {
return c.store.Backup(ctx, w)
}
type uint64Slice []uint64
func (a uint64Slice) Len() int { return len(a) }

View File

@ -994,6 +994,21 @@ func (di DatabaseInfo) clone() DatabaseInfo {
return other
}
// MarshalBinary encodes dbi to a binary format.
func (dbi *DatabaseInfo) MarshalBinary() ([]byte, error) {
return proto.Marshal(dbi.marshal())
}
// UnmarshalBinary decodes dbi from a binary format.
func (dbi *DatabaseInfo) UnmarshalBinary(data []byte) error {
var pb internal.DatabaseInfo
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
dbi.unmarshal(&pb)
return nil
}
// marshal serializes to a protobuf representation.
func (di DatabaseInfo) marshal() *internal.DatabaseInfo {
pb := &internal.DatabaseInfo{}