Merge pull request #19864 from influxdata/backup-restore

pull/19913/head
Ben Johnson 2020-11-05 15:39:04 -07:00 committed by GitHub
commit a849bfdef3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1421 additions and 591 deletions

View File

@ -3,6 +3,7 @@ package authorizer
import ( import (
"context" "context"
"io" "io"
"time"
"github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/tracing" "github.com/influxdata/influxdb/v2/kit/tracing"
@ -23,26 +24,22 @@ func NewBackupService(s influxdb.BackupService) *BackupService {
} }
} }
func (b BackupService) CreateBackup(ctx context.Context) (int, []string, error) { func (b BackupService) BackupKVStore(ctx context.Context, w io.Writer) error {
span, ctx := tracing.StartSpanFromContext(ctx) span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish() defer span.Finish()
if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil { if err := IsAllowedAll(ctx, influxdb.OperPermissions()); err != nil {
return 0, nil, err
}
return b.s.CreateBackup(ctx)
}
func (b BackupService) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
if err := IsAllowedAll(ctx, influxdb.ReadAllPermissions()); err != nil {
return err return err
} }
return b.s.FetchBackupFile(ctx, backupID, backupFile, w) return b.s.BackupKVStore(ctx, w)
} }
func (b BackupService) InternalBackupPath(backupID int) string { func (b BackupService) BackupShard(ctx context.Context, w io.Writer, shardID uint64, since time.Time) error {
return b.s.InternalBackupPath(backupID) span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
if err := IsAllowedAll(ctx, influxdb.OperPermissions()); err != nil {
return err
}
return b.s.BackupShard(ctx, w, shardID, since)
} }

54
authorizer/restore.go Normal file
View File

@ -0,0 +1,54 @@
package authorizer
import (
"context"
"io"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/tracing"
)
// Compile-time check that *RestoreService satisfies influxdb.RestoreService.
var _ influxdb.RestoreService = (*RestoreService)(nil)

// RestoreService wraps an influxdb.RestoreService and authorizes actions
// against it appropriately.
type RestoreService struct {
	s influxdb.RestoreService // underlying service that performs the actual restore work
}
// NewRestoreService returns a RestoreService that authorizes each call
// before delegating it to s.
func NewRestoreService(s influxdb.RestoreService) *RestoreService {
	svc := RestoreService{s: s}
	return &svc
}
// RestoreKVStore requires operator permissions before delegating the
// metadata-store restore to the wrapped service.
func (b RestoreService) RestoreKVStore(ctx context.Context, r io.Reader) error {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	err := IsAllowedAll(ctx, influxdb.OperPermissions())
	if err != nil {
		return err
	}
	return b.s.RestoreKVStore(ctx, r)
}
// RestoreBucket requires operator permissions before delegating the
// bucket metadata restore to the wrapped service.
func (b RestoreService) RestoreBucket(ctx context.Context, id influxdb.ID, dbi []byte) (shardIDMap map[uint64]uint64, err error) {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	if err = IsAllowedAll(ctx, influxdb.OperPermissions()); err != nil {
		return nil, err
	}
	return b.s.RestoreBucket(ctx, id, dbi)
}
// RestoreShard requires operator permissions before streaming shard data
// to the wrapped service.
func (b RestoreService) RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	err := IsAllowedAll(ctx, influxdb.OperPermissions())
	if err != nil {
		return err
	}
	return b.s.RestoreShard(ctx, shardID, r)
}

View File

@ -3,21 +3,67 @@ package influxdb
import ( import (
"context" "context"
"io" "io"
"time"
)
const (
BackupFilenamePattern = "20060102T150405Z"
) )
// BackupService represents the data backup functions of InfluxDB. // BackupService represents the data backup functions of InfluxDB.
type BackupService interface { type BackupService interface {
// CreateBackup creates a local copy (hard links) of the TSM data for all orgs and buckets. // BackupKVStore creates a live backup copy of the metadata database.
// The return values are used to download each backup file. BackupKVStore(ctx context.Context, w io.Writer) error
CreateBackup(context.Context) (backupID int, backupFiles []string, err error)
// FetchBackupFile downloads one backup file, data or metadata. // BackupShard downloads a backup file for a single shard.
FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error BackupShard(ctx context.Context, w io.Writer, shardID uint64, since time.Time) error
// InternalBackupPath is a utility to determine the on-disk location of a backup fileset.
InternalBackupPath(backupID int) string
} }
// KVBackupService represents the meta data backup functions of InfluxDB. // RestoreService represents the data restore functions of InfluxDB.
type KVBackupService interface { type RestoreService interface {
// Backup creates a live backup copy of the metadata database. // RestoreKVStore restores & replaces metadata database.
Backup(ctx context.Context, w io.Writer) error RestoreKVStore(ctx context.Context, r io.Reader) error
// RestoreBucket restores the storage metadata for a single bucket.
RestoreBucket(ctx context.Context, id ID, rpiData []byte) (shardIDMap map[uint64]uint64, err error)
// RestoreShard uploads a backup file for a single shard.
RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error
}
// Manifest lists the KV and shard file information contained in the backup.
// It is written once per backup invocation and read back by the restore
// command to discover which files make up the backup set.
type Manifest struct {
	KV    ManifestKVEntry `json:"kv"`    // the bolt metadata snapshot
	Files []ManifestEntry `json:"files"` // one entry per backed-up shard archive

	// These fields are only set if filtering options are set on the CLI.
	OrganizationID string `json:"organizationID,omitempty"`
	BucketID       string `json:"bucketID,omitempty"`
}
// ManifestEntry contains the data information for a backed up shard.
type ManifestEntry struct {
	OrganizationID   string    `json:"organizationID"`
	OrganizationName string    `json:"organizationName"`
	BucketID         string    `json:"bucketID"`
	BucketName       string    `json:"bucketName"`
	ShardID          uint64    `json:"shardID"`
	FileName         string    `json:"fileName"` // shard archive name, relative to the backup directory
	Size             int64     `json:"size"`     // archive size in bytes
	LastModified     time.Time `json:"lastModified"`
}
// ManifestKVEntry contains the KV store information for a backup.
type ManifestKVEntry struct {
	FileName string `json:"fileName"` // bolt snapshot file name, relative to the backup directory
	Size     int64  `json:"size"`     // snapshot size in bytes
}
// Size returns the size of the manifest.
func (m *Manifest) Size() int64 {
n := m.KV.Size
for _, f := range m.Files {
n += f.Size
}
return n
} }

View File

@ -8,10 +8,12 @@ import (
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"sync"
"time" "time"
"github.com/influxdata/influxdb/v2/kit/tracing" "github.com/influxdata/influxdb/v2/kit/tracing"
"github.com/influxdata/influxdb/v2/kv" "github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/pkg/fs"
bolt "go.etcd.io/bbolt" bolt "go.etcd.io/bbolt"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -22,6 +24,7 @@ var _ kv.SchemaStore = (*KVStore)(nil)
// KVStore is a kv.Store backed by boltdb. // KVStore is a kv.Store backed by boltdb.
type KVStore struct { type KVStore struct {
path string path string
mu sync.RWMutex
db *bolt.DB db *bolt.DB
log *zap.Logger log *zap.Logger
@ -53,6 +56,11 @@ func NewKVStore(log *zap.Logger, path string, opts ...KVOption) *KVStore {
return store return store
} }
// tempPath returns the location of the scratch file that Restore() fills
// before atomically swapping it into place over the live database.
func (s *KVStore) tempPath() string {
	return fmt.Sprintf("%s.tmp", s.path)
}
// Open creates boltDB file it doesn't exists and opens it otherwise. // Open creates boltDB file it doesn't exists and opens it otherwise.
func (s *KVStore) Open(ctx context.Context) error { func (s *KVStore) Open(ctx context.Context) error {
span, _ := tracing.StartSpanFromContext(ctx) span, _ := tracing.StartSpanFromContext(ctx)
@ -67,30 +75,46 @@ func (s *KVStore) Open(ctx context.Context) error {
return err return err
} }
// Remove any temporary file created during a failed restore.
if err := os.Remove(s.tempPath()); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("unable to remove boltdb partial restore file: %w", err)
}
// Open database file. // Open database file.
db, err := bolt.Open(s.path, 0600, &bolt.Options{Timeout: 1 * time.Second}) if err := s.openDB(); err != nil {
if err != nil {
return fmt.Errorf("unable to open boltdb file %v", err) return fmt.Errorf("unable to open boltdb file %v", err)
} }
s.db = db
db.NoSync = s.noSync
s.log.Info("Resources opened", zap.String("path", s.path)) s.log.Info("Resources opened", zap.String("path", s.path))
return nil return nil
} }
// openDB opens the bolt file at s.path, stores the handle on s.db, and
// applies the store's NoSync setting.
func (s *KVStore) openDB() error {
	var err error
	s.db, err = bolt.Open(s.path, 0600, &bolt.Options{Timeout: 1 * time.Second})
	if err != nil {
		return fmt.Errorf("unable to open boltdb file %v", err)
	}
	s.db.NoSync = s.noSync
	return nil
}
// Close the connection to the bolt database // Close the connection to the bolt database
func (s *KVStore) Close() error { func (s *KVStore) Close() error {
if s.db != nil { if db := s.DB(); db != nil {
return s.db.Close() return db.Close()
} }
return nil return nil
} }
// DB returns the current bolt handle under the store's read lock so that
// callers never observe the handle mid-swap during Restore.
func (s *KVStore) DB() *bolt.DB {
	s.mu.RLock()
	db := s.db
	s.mu.RUnlock()
	return db
}
// Flush removes all bolt keys within each bucket. // Flush removes all bolt keys within each bucket.
func (s *KVStore) Flush(ctx context.Context) { func (s *KVStore) Flush(ctx context.Context) {
_ = s.db.Update( _ = s.DB().Update(
func(tx *bolt.Tx) error { func(tx *bolt.Tx) error {
return tx.ForEach(func(name []byte, b *bolt.Bucket) error { return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
s.cleanBucket(tx, b) s.cleanBucket(tx, b)
@ -117,6 +141,8 @@ func (s *KVStore) cleanBucket(tx *bolt.Tx, b *bolt.Bucket) {
// WithDB sets the boltdb on the store. // WithDB sets the boltdb on the store.
func (s *KVStore) WithDB(db *bolt.DB) { func (s *KVStore) WithDB(db *bolt.DB) {
s.mu.Lock()
defer s.mu.Unlock()
s.db = db s.db = db
} }
@ -125,7 +151,7 @@ func (s *KVStore) View(ctx context.Context, fn func(tx kv.Tx) error) error {
span, ctx := tracing.StartSpanFromContext(ctx) span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish() defer span.Finish()
return s.db.View(func(tx *bolt.Tx) error { return s.DB().View(func(tx *bolt.Tx) error {
return fn(&Tx{ return fn(&Tx{
tx: tx, tx: tx,
ctx: ctx, ctx: ctx,
@ -138,7 +164,7 @@ func (s *KVStore) Update(ctx context.Context, fn func(tx kv.Tx) error) error {
span, ctx := tracing.StartSpanFromContext(ctx) span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish() defer span.Finish()
return s.db.Update(func(tx *bolt.Tx) error { return s.DB().Update(func(tx *bolt.Tx) error {
return fn(&Tx{ return fn(&Tx{
tx: tx, tx: tx,
ctx: ctx, ctx: ctx,
@ -149,7 +175,7 @@ func (s *KVStore) Update(ctx context.Context, fn func(tx kv.Tx) error) error {
// CreateBucket creates a bucket in the underlying boltdb store if it // CreateBucket creates a bucket in the underlying boltdb store if it
// does not already exist // does not already exist
func (s *KVStore) CreateBucket(ctx context.Context, name []byte) error { func (s *KVStore) CreateBucket(ctx context.Context, name []byte) error {
return s.db.Update(func(tx *bolt.Tx) error { return s.DB().Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(name) _, err := tx.CreateBucketIfNotExists(name)
return err return err
}) })
@ -158,7 +184,7 @@ func (s *KVStore) CreateBucket(ctx context.Context, name []byte) error {
// DeleteBucket creates a bucket in the underlying boltdb store if it // DeleteBucket creates a bucket in the underlying boltdb store if it
// does not already exist // does not already exist
func (s *KVStore) DeleteBucket(ctx context.Context, name []byte) error { func (s *KVStore) DeleteBucket(ctx context.Context, name []byte) error {
return s.db.Update(func(tx *bolt.Tx) error { return s.DB().Update(func(tx *bolt.Tx) error {
if err := tx.DeleteBucket(name); err != nil && !errors.Is(err, bolt.ErrBucketNotFound) { if err := tx.DeleteBucket(name); err != nil && !errors.Is(err, bolt.ErrBucketNotFound) {
return err return err
} }
@ -172,12 +198,51 @@ func (s *KVStore) Backup(ctx context.Context, w io.Writer) error {
span, _ := tracing.StartSpanFromContext(ctx) span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish() defer span.Finish()
return s.db.View(func(tx *bolt.Tx) error { return s.DB().View(func(tx *bolt.Tx) error {
_, err := tx.WriteTo(w) _, err := tx.WriteTo(w)
return err return err
}) })
} }
// Restore replaces the underlying database with the data from r.
//
// The snapshot is first streamed to a sibling .tmp file and fsynced; only
// then is the store locked, the old bolt handle closed, the temp file
// renamed over the live path, and the database reopened. On any failure
// the temporary file is removed and the error is returned.
func (s *KVStore) Restore(ctx context.Context, r io.Reader) error {
	if err := func() error {
		// Write to a temp file first so a partial download can never
		// clobber the live database file.
		f, err := os.Create(s.tempPath())
		if err != nil {
			return err
		}
		// Deferred Close covers the early error returns; calling Close
		// twice after the explicit Close below is harmless.
		defer f.Close()

		if _, err := io.Copy(f, r); err != nil {
			return err
		} else if err := f.Sync(); err != nil {
			return err
		} else if err := f.Close(); err != nil {
			return err
		}

		// Swap and reopen under lock.
		// Holding mu blocks DB()/WithDB callers while the handle is invalid.
		s.mu.Lock()
		defer s.mu.Unlock()

		if err := s.db.Close(); err != nil {
			return err
		}

		// Atomically swap temporary file with current DB file.
		// NOTE(review): atomicity depends on fs.RenameFileWithReplacement,
		// whose semantics are platform-specific — confirm against that helper.
		if err := fs.RenameFileWithReplacement(s.tempPath(), s.path); err != nil {
			return err
		}

		// Reopen with new database file.
		return s.openDB()
	}(); err != nil {
		os.Remove(s.tempPath()) // clean up on error
		return err
	}
	return nil
}
// Tx is a light wrapper around a boltdb transaction. It implements kv.Tx. // Tx is a light wrapper around a boltdb transaction. It implements kv.Tx.
type Tx struct { type Tx struct {
tx *bolt.Tx tx *bolt.Tx

View File

@ -39,6 +39,12 @@ type Bucket struct {
CRUDLog CRUDLog
} }
// Clone returns a new Bucket whose fields are shallow-copied from b;
// any reference-typed fields are shared with the original.
func (b *Bucket) Clone() *Bucket {
	dup := *b
	return &dup
}
// BucketType differentiates system buckets from user buckets. // BucketType differentiates system buckets from user buckets.
type BucketType int type BucketType int

View File

@ -1,97 +1,335 @@
package main package main
import ( import (
"compress/gzip"
"context" "context"
"encoding/json"
"fmt" "fmt"
"io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"time"
"github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/bolt" "github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/http" "github.com/influxdata/influxdb/v2/http"
"github.com/influxdata/influxdb/v2/kv"
influxlogger "github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/v1/services/meta"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"go.uber.org/multierr" "go.uber.org/zap"
) )
func cmdBackup(f *globalFlags, opt genericCLIOpts) *cobra.Command { func cmdBackup(f *globalFlags, opts genericCLIOpts) *cobra.Command {
cmd := opt.newCmd("backup", backupF, false) return newCmdBackupBuilder(f, opts).cmdBackup()
cmd.Short = "Backup the data in InfluxDB"
cmd.Long = fmt.Sprintf(
`Backs up data and meta data for the running InfluxDB instance.
Downloaded files are written to the directory indicated by --path.
The target directory, and any parent directories, are created automatically.
Data file have extension .tsm; meta data is written to %s in the same directory.`,
bolt.DefaultFilename)
f.registerFlags(cmd)
opts := flagOpts{
{
DestP: &backupFlags.Path,
Flag: "path",
Short: 'p',
EnvVar: "PATH",
Desc: "directory path to write backup files to",
Required: true,
},
} }
opts.mustRegister(cmd)
// cmdBackupBuilder collects the flags, services, and intermediate state
// used while executing the `influx backup` command.
type cmdBackupBuilder struct {
	genericCLIOpts
	*globalFlags

	bucketID   string       // optional: restrict backup to this bucket ID
	bucketName string       // optional: restrict backup to this bucket name
	org        organization // optional organization filter (id or name)
	path       string       // output directory for all backup files

	manifest influxdb.Manifest // accumulated index of every file written
	baseName string            // timestamp-derived prefix shared by all output files

	backupService *http.BackupService
	kvStore       *bolt.KVStore
	kvService     *kv.Service
	metaClient    *meta.Client

	logger *zap.Logger
}
// newCmdBackupBuilder constructs a backup command builder from the shared
// CLI options and global flags.
func newCmdBackupBuilder(f *globalFlags, opts genericCLIOpts) *cmdBackupBuilder {
	b := cmdBackupBuilder{
		genericCLIOpts: opts,
		globalFlags:    f,
	}
	return &b
}
func (b *cmdBackupBuilder) cmdBackup() *cobra.Command {
cmd := b.newCmd("backup", b.backupRunE)
b.org.register(cmd, true)
cmd.Flags().StringVar(&b.bucketID, "bucket-id", "", "The ID of the bucket to backup")
cmd.Flags().StringVarP(&b.bucketName, "bucket", "b", "", "The name of the bucket to backup")
cmd.Use = "backup [flags] path"
cmd.Args = func(cmd *cobra.Command, args []string) error {
if len(args) == 0 {
return fmt.Errorf("must specify output path")
} else if len(args) > 1 {
return fmt.Errorf("too many args specified")
}
b.path = args[0]
return nil
}
cmd.Short = "Backup database"
cmd.Long = `
Backs up InfluxDB to a directory.
Examples:
# backup all data
influx backup /path/to/backup
`
return cmd return cmd
} }
var backupFlags struct { func (b *cmdBackupBuilder) manifestPath() string {
Path string return fmt.Sprintf("%s.manifest", b.baseName)
} }
func newBackupService() (influxdb.BackupService, error) { func (b *cmdBackupBuilder) kvPath() string {
ac := flags.config() return fmt.Sprintf("%s.bolt", b.baseName)
return &http.BackupService{
Addr: ac.Host,
Token: ac.Token,
}, nil
} }
func backupF(cmd *cobra.Command, args []string) error { func (b *cmdBackupBuilder) shardPath(id uint64) string {
return fmt.Sprintf("%s.s%d", b.baseName, id) + ".tar.gz"
}
func (b *cmdBackupBuilder) backupRunE(cmd *cobra.Command, args []string) (err error) {
ctx := context.Background() ctx := context.Background()
if backupFlags.Path == "" { // Create top level logger
return fmt.Errorf("must specify path") logconf := influxlogger.NewConfig()
} if b.logger, err = logconf.New(os.Stdout); err != nil {
err := os.MkdirAll(backupFlags.Path, 0777)
if err != nil && !os.IsExist(err) {
return err return err
} }
backupService, err := newBackupService() // Determine a base
if err != nil { b.baseName = time.Now().UTC().Format(influxdb.BackupFilenamePattern)
// Ensure directory exists.
if err := os.MkdirAll(b.path, 0777); err != nil {
return err return err
} }
id, backupFilenames, err := backupService.CreateBackup(ctx) ac := flags.config()
if err != nil { b.backupService = &http.BackupService{
Addr: ac.Host,
Token: ac.Token,
}
// Back up Bolt database to file.
if err := b.backupKVStore(ctx); err != nil {
return err return err
} }
fmt.Printf("Backup ID %d contains %d files\n", id, len(backupFilenames)) // Open bolt DB.
boltClient := bolt.NewClient(b.logger)
for _, backupFilename := range backupFilenames { boltClient.Path = filepath.Join(b.path, b.kvPath())
dest := filepath.Join(backupFlags.Path, backupFilename) if err := boltClient.Open(ctx); err != nil {
w, err := os.OpenFile(dest, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil {
return err return err
} }
err = backupService.FetchBackupFile(ctx, id, backupFilename, w) defer boltClient.Close()
if err != nil {
return multierr.Append(fmt.Errorf("error fetching file %s: %v", backupFilename, err), w.Close()) // Open meta store so we can iterate over meta data.
} b.kvStore = bolt.NewKVStore(b.logger, filepath.Join(b.path, b.kvPath()))
if err = w.Close(); err != nil { b.kvStore.WithDB(boltClient.DB())
b.kvService = kv.NewService(b.logger, b.kvStore, kv.ServiceConfig{})
b.metaClient = meta.NewClient(meta.NewConfig(), b.kvStore)
if err := b.metaClient.Open(); err != nil {
return err return err
} }
// Filter through organizations & buckets to backup appropriate shards.
if err := b.backupOrganizations(ctx); err != nil {
return err
} }
fmt.Printf("Backup complete") if err := b.writeManifest(ctx); err != nil {
return err
}
b.logger.Info("Backup complete")
return nil return nil
} }
// backupKVStore streams the bolt KV file to a file at path.
// The resulting file name and size are recorded in the manifest's KV entry.
func (b *cmdBackupBuilder) backupKVStore(ctx context.Context) error {
	path := filepath.Join(b.path, b.kvPath())
	b.logger.Info("Backing up KV store", zap.String("path", b.kvPath()))

	// Open writer to output file.
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	// Deferred Close covers early error returns; it is harmless after the
	// explicit Close below succeeds.
	defer f.Close()

	// Stream bolt file from server, sync, and ensure file closes correctly.
	if err := b.backupService.BackupKVStore(ctx, f); err != nil {
		return err
	} else if err := f.Sync(); err != nil {
		return err
	} else if err := f.Close(); err != nil {
		return err
	}

	// Lookup file size.
	fi, err := os.Stat(path)
	if err != nil {
		return err
	}

	b.manifest.KV = influxdb.ManifestKVEntry{
		FileName: b.kvPath(),
		Size:     fi.Size(),
	}
	return nil
}
// backupOrganizations resolves the organization filter from the CLI flags
// and backs up the buckets of every matching organization (all of them
// when no filter was given).
func (b *cmdBackupBuilder) backupOrganizations(ctx context.Context) error {
	// Translate the CLI org flags into a lookup filter.
	var filter influxdb.OrganizationFilter
	switch {
	case b.org.id != "":
		id, err := influxdb.IDFromString(b.org.id)
		if err != nil {
			return err
		}
		filter.ID = id
	case b.org.name != "":
		filter.Name = &b.org.name
	}

	// Retrieve a list of all matching organizations.
	orgs, _, err := b.kvService.FindOrganizations(ctx, filter)
	if err != nil {
		return err
	}

	// Back up buckets in each matching organization.
	for _, org := range orgs {
		b.logger.Info("Backing up organization", zap.String("id", org.ID.String()), zap.String("name", org.Name))
		if err := b.backupBuckets(ctx, org); err != nil {
			return err
		}
	}
	return nil
}
// backupBuckets backs up the shards of every bucket in org that matches
// the CLI bucket filters (all buckets when no filter was given).
func (b *cmdBackupBuilder) backupBuckets(ctx context.Context, org *influxdb.Organization) (err error) {
	// Build a filter if bucket ID or bucket name were specified.
	var filter influxdb.BucketFilter
	filter.OrganizationID = &org.ID
	if b.bucketID != "" {
		if filter.ID, err = influxdb.IDFromString(b.bucketID); err != nil {
			return err
		}
	} else if b.bucketName != "" {
		filter.Name = &b.bucketName
	}

	// Retrieve a list of all matching buckets in the organization.
	buckets, _, err := b.kvService.FindBuckets(ctx, filter)
	if err != nil {
		return err
	}

	// Back up shards in each matching bucket.
	for _, bkt := range buckets {
		if err := b.backupBucket(ctx, org, bkt); err != nil {
			return err
		}
	}
	return nil
}
// backupBucket backs up every live shard belonging to bkt by walking the
// bucket's retention policies and shard groups in the local meta snapshot.
func (b *cmdBackupBuilder) backupBucket(ctx context.Context, org *influxdb.Organization, bkt *influxdb.Bucket) (err error) {
	b.logger.Info("Backing up bucket", zap.String("id", bkt.ID.String()), zap.String("name", bkt.Name))

	// Lookup matching database from the meta store.
	// The meta database is keyed by the bucket's ID string.
	dbi := b.metaClient.Database(bkt.ID.String())
	if dbi == nil {
		return fmt.Errorf("bucket database not found: %s", bkt.ID.String())
	}

	// Iterate over and backup each shard.
	for _, rpi := range dbi.RetentionPolicies {
		for _, sg := range rpi.ShardGroups {
			// Skip shard groups that have been marked deleted.
			if sg.Deleted() {
				continue
			}
			for _, sh := range sg.Shards {
				// A shard may disappear between reading the meta snapshot and
				// requesting its data; treat "not found" as a benign race.
				if err := b.backupShard(ctx, org, bkt, rpi.Name, sh.ID); influxdb.ErrorCode(err) == influxdb.ENotFound {
					b.logger.Warn("Shard removed during backup", zap.Uint64("shard_id", sh.ID))
					continue
				} else if err != nil {
					return err
				}
			}
		}
	}
	return nil
}
// backupShard streams a tar of TSM data for shard.
// The stream is gzip-compressed on the client side and written to
// <baseName>.s<shardID>.tar.gz; the file is then recorded in the manifest.
func (b *cmdBackupBuilder) backupShard(ctx context.Context, org *influxdb.Organization, bkt *influxdb.Bucket, policy string, shardID uint64) error {
	path := filepath.Join(b.path, b.shardPath(shardID))
	b.logger.Info("Backing up shard", zap.Uint64("id", shardID), zap.String("path", b.shardPath(shardID)))

	// Open writer to output file.
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	// Deferred Close covers early error returns; harmless after the
	// explicit Close below succeeds.
	defer f.Close()

	// Wrap file writer with a gzip writer.
	gw := gzip.NewWriter(f)
	defer gw.Close()

	// Stream file from server, sync, and ensure file closes correctly.
	// The gzip writer must be closed (flushing its footer) before the
	// file is synced and closed.
	if err := b.backupService.BackupShard(ctx, gw, shardID, time.Time{}); err != nil {
		return err
	} else if err := gw.Close(); err != nil {
		return err
	} else if err := f.Sync(); err != nil {
		return err
	} else if err := f.Close(); err != nil {
		return err
	}

	// Determine file size.
	fi, err := os.Stat(path)
	if err != nil {
		return err
	}

	// Update manifest.
	b.manifest.Files = append(b.manifest.Files, influxdb.ManifestEntry{
		OrganizationID:   org.ID.String(),
		OrganizationName: org.Name,
		BucketID:         bkt.ID.String(),
		BucketName:       bkt.Name,
		ShardID:          shardID,
		FileName:         b.shardPath(shardID),
		Size:             fi.Size(),
		LastModified:     fi.ModTime().UTC(),
	})
	return nil
}
// writeManifest writes the manifest file out.
// It is serialized as indented JSON (with a trailing newline) so it is
// both human-readable and parseable by the restore command.
func (b *cmdBackupBuilder) writeManifest(ctx context.Context) error {
	path := filepath.Join(b.path, b.manifestPath())
	b.logger.Info("Writing manifest", zap.String("path", b.manifestPath()))

	buf, err := json.MarshalIndent(b.manifest, "", "  ")
	if err != nil {
		return fmt.Errorf("create manifest: %w", err)
	}
	buf = append(buf, '\n')
	return ioutil.WriteFile(path, buf, 0600)
}
// newCmd constructs a cobra command wired with the builder's shared print
// options and the global flags.
func (b *cmdBackupBuilder) newCmd(use string, runE func(*cobra.Command, []string) error) *cobra.Command {
	c := b.genericCLIOpts.newCmd(use, runE, true)
	b.genericCLIOpts.registerPrintOptions(c)
	b.globalFlags.registerFlags(c)
	return c
}

View File

@ -328,6 +328,7 @@ func influxCmd(opts ...genericCLIOptFn) *cobra.Command {
cmdOrganization, cmdOrganization,
cmdPing, cmdPing,
cmdQuery, cmdQuery,
cmdRestore,
cmdSecret, cmdSecret,
cmdSetup, cmdSetup,
cmdStack, cmdStack,

399
cmd/influx/restore.go Normal file
View File

@ -0,0 +1,399 @@
package main
import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strings"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/http"
"github.com/influxdata/influxdb/v2/kv"
influxlogger "github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/v1/services/meta"
"github.com/spf13/cobra"
"go.uber.org/zap"
)
// cmdRestore creates the `influx restore` cobra command.
func cmdRestore(f *globalFlags, opts genericCLIOpts) *cobra.Command {
	b := newCmdRestoreBuilder(f, opts)
	return b.cmdRestore()
}
// cmdRestoreBuilder collects the flags, services, and intermediate state
// used while executing the `influx restore` command.
type cmdRestoreBuilder struct {
	genericCLIOpts
	*globalFlags

	full          bool         // replace the server's metadata & all data instead of merging
	bucketID      string       // optional: restrict restore to this bucket ID
	bucketName    string       // optional: restrict restore to this bucket name
	newBucketName string       // optional: rename the restored bucket
	newOrgName    string       // optional: rename the restored organization
	org           organization // optional source organization filter (id or name)
	path          string       // local backup directory to read from

	kvEntry      *influxdb.ManifestKVEntry         // KV snapshot described by the newest manifest
	shardEntries map[uint64]*influxdb.ManifestEntry // shard files indexed from the manifests

	orgService     *http.OrganizationService
	bucketService  *http.BucketService
	restoreService *http.RestoreService
	kvService      *kv.Service
	metaClient     *meta.Client

	logger *zap.Logger
}
// newCmdRestoreBuilder initializes a restore command builder with an empty
// shard-entry index.
func newCmdRestoreBuilder(f *globalFlags, opts genericCLIOpts) *cmdRestoreBuilder {
	b := cmdRestoreBuilder{
		genericCLIOpts: opts,
		globalFlags:    f,
		shardEntries:   map[uint64]*influxdb.ManifestEntry{},
	}
	return &b
}
// cmdRestore builds the cobra command definition for `influx restore`:
// its flags, usage text, and positional-argument validation.
func (b *cmdRestoreBuilder) cmdRestore() *cobra.Command {
	cmd := b.newCmd("restore", b.restoreRunE)
	b.org.register(cmd, true)
	cmd.Flags().BoolVar(&b.full, "full", false, "Fully restore and replace all data on server")
	cmd.Flags().StringVar(&b.bucketID, "bucket-id", "", "The ID of the bucket to restore")
	cmd.Flags().StringVarP(&b.bucketName, "bucket", "b", "", "The name of the bucket to restore")
	cmd.Flags().StringVar(&b.newBucketName, "new-bucket", "", "The name of the bucket to restore to")
	cmd.Flags().StringVar(&b.newOrgName, "new-org", "", "The name of the organization to restore to")
	// NOTE(review): the Args handler below also assigns b.path from the
	// required positional argument, so this flag's value is always
	// overwritten — confirm whether --input is still needed.
	cmd.Flags().StringVar(&b.path, "input", "", "Local backup data path (required)")
	cmd.Use = "restore [flags] path"
	// Require exactly one positional argument: the backup directory.
	cmd.Args = func(cmd *cobra.Command, args []string) error {
		if len(args) == 0 {
			return fmt.Errorf("must specify path to backup directory")
		} else if len(args) > 1 {
			return fmt.Errorf("too many args specified")
		}
		b.path = args[0]
		return nil
	}
	cmd.Short = "Restores a backup directory to InfluxDB."
	cmd.Long = `
Restore influxdb.
Examples:
	# restore all data
	influx restore /path/to/restore
`
	return cmd
}
// restoreRunE is the cobra entry point for `influx restore`. It validates
// the rename flags, loads the local backup manifests, wires up the HTTP
// services, then dispatches to either a full or partial restore.
func (b *cmdRestoreBuilder) restoreRunE(cmd *cobra.Command, args []string) (err error) {
	ctx := context.Background()

	// Create top level logger
	logconf := influxlogger.NewConfig()
	if b.logger, err = logconf.New(os.Stdout); err != nil {
		return err
	}

	// Ensure org/bucket filters are set if a new org/bucket name is specified.
	// Renaming without a source filter would be ambiguous.
	if b.newOrgName != "" && b.org.id == "" && b.org.name == "" {
		return fmt.Errorf("must specify source org id or name when renaming restored org")
	} else if b.newBucketName != "" && b.bucketID == "" && b.bucketName == "" {
		return fmt.Errorf("must specify source bucket id or name when renaming restored bucket")
	}

	// Read in set of KV data & shard data to restore.
	// kvEntry is only populated when at least one manifest was found.
	if err := b.loadIncremental(); err != nil {
		return fmt.Errorf("restore failed while processing manifest files: %s", err.Error())
	} else if b.kvEntry == nil {
		return fmt.Errorf("no manifest files found in: %s", b.path)
	}

	// Build the server-side restore client from the active CLI config.
	ac := flags.config()
	b.restoreService = &http.RestoreService{
		Addr:  ac.Host,
		Token: ac.Token,
	}

	client, err := newHTTPClient()
	if err != nil {
		return err
	}
	b.orgService = &http.OrganizationService{Client: client}
	b.bucketService = &http.BucketService{Client: client}

	// --full replaces everything; the default merges into existing data.
	if !b.full {
		return b.restorePartial(ctx)
	}
	return b.restoreFull(ctx)
}
// restoreFull completely replaces the bolt metadata file and restores all shard data.
func (b *cmdRestoreBuilder) restoreFull(ctx context.Context) error {
	// Replace the server's KV store first.
	if err := b.restoreKVStore(ctx); err != nil {
		return err
	}

	// Then stream every shard listed in the manifests back to the server.
	for _, entry := range b.shardEntries {
		if err := b.restoreShard(ctx, entry.ShardID, entry); err != nil {
			return err
		}
	}
	return nil
}
// restoreKVStore uploads the backed-up bolt file to the server, replacing
// its metadata store wholesale.
func (b *cmdRestoreBuilder) restoreKVStore(ctx context.Context) error {
	src, err := os.Open(filepath.Join(b.path, b.kvEntry.FileName))
	if err != nil {
		return err
	}
	defer src.Close()

	if err = b.restoreService.RestoreKVStore(ctx, src); err != nil {
		return err
	}
	b.logger.Info("Full metadata restored.")
	return nil
}
// restorePartial restores shard data to a server without deleting existing data.
// Organizations & buckets are created as needed. Cannot overwrite an existing bucket.
func (b *cmdRestoreBuilder) restorePartial(ctx context.Context) (err error) {
	// Open bolt DB.
	// The backup's bolt file is opened locally for reading; the server's
	// own metadata store is never touched directly here.
	boltClient := bolt.NewClient(b.logger)
	boltClient.Path = filepath.Join(b.path, b.kvEntry.FileName)
	if err := boltClient.Open(ctx); err != nil {
		return err
	}
	defer boltClient.Close()

	// Open meta store so we can iterate over meta data.
	kvStore := bolt.NewKVStore(b.logger, boltClient.Path)
	kvStore.WithDB(boltClient.DB())
	b.kvService = kv.NewService(b.logger, kvStore, kv.ServiceConfig{})

	b.metaClient = meta.NewClient(meta.NewConfig(), kvStore)
	if err := b.metaClient.Open(); err != nil {
		return err
	}

	// Filter through organizations & buckets to restore appropriate shards.
	if err := b.restoreOrganizations(ctx); err != nil {
		return err
	}

	b.logger.Info("Restore complete")
	return nil
}
// restoreOrganizations resolves the organization filter from the CLI flags
// and restores every matching organization found in the local backup.
func (b *cmdRestoreBuilder) restoreOrganizations(ctx context.Context) error {
	// Translate the CLI org flags into a lookup filter.
	var filter influxdb.OrganizationFilter
	switch {
	case b.org.id != "":
		id, err := influxdb.IDFromString(b.org.id)
		if err != nil {
			return err
		}
		filter.ID = id
	case b.org.name != "":
		filter.Name = &b.org.name
	}

	// Retrieve a list of all matching organizations.
	orgs, _, err := b.kvService.FindOrganizations(ctx, filter)
	if err != nil {
		return err
	}

	// Restore matching organizations.
	for _, org := range orgs {
		if err := b.restoreOrganization(ctx, org); err != nil {
			return err
		}
	}
	return nil
}
// restoreOrganization ensures org (optionally renamed via --new-org) exists
// on the target server, then restores the org's matching buckets from the
// local backup.
func (b *cmdRestoreBuilder) restoreOrganization(ctx context.Context, org *influxdb.Organization) (err error) {
	b.logger.Info("Restoring organization", zap.String("id", org.ID.String()), zap.String("name", org.Name))

	// Work on a copy so the backup's org record is left untouched.
	newOrg := *org
	if b.newOrgName != "" {
		newOrg.Name = b.newOrgName
	}

	// Create organization on server, if it doesn't already exist.
	if o, err := b.orgService.FindOrganization(ctx, influxdb.OrganizationFilter{Name: &newOrg.Name}); influxdb.ErrorCode(err) == influxdb.ENotFound {
		if err := b.orgService.CreateOrganization(ctx, &newOrg); err != nil {
			return fmt.Errorf("cannot create organization: %w", err)
		}
	} else if err != nil {
		// NOTE(review): %#v prints the error's internal representation;
		// %v or %w is likely intended here — confirm.
		return fmt.Errorf("cannot find existing organization: %#v", err)
	} else {
		// Organization already exists; adopt its server-side ID.
		newOrg.ID = o.ID
	}

	// Build a filter if bucket ID or bucket name were specified.
	var filter influxdb.BucketFilter
	filter.OrganizationID = &org.ID // match on backup's org ID
	if b.bucketID != "" {
		if filter.ID, err = influxdb.IDFromString(b.bucketID); err != nil {
			return err
		}
	} else if b.bucketName != "" {
		filter.Name = &b.bucketName
	}

	// Retrieve a list of all buckets for the organization in the local backup.
	buckets, _, err := b.kvService.FindBuckets(ctx, filter)
	if err != nil {
		return err
	}

	// Restore each matching bucket.
	for _, bkt := range buckets {
		// Skip internal buckets.
		if strings.HasPrefix(bkt.Name, "_") {
			continue
		}

		// Re-parent a clone of the bucket under the (possibly new) org ID.
		bkt = bkt.Clone()
		bkt.OrgID = newOrg.ID
		if err := b.restoreBucket(ctx, bkt); err != nil {
			return err
		}
	}
	return nil
}
// restoreBucket re-creates a single bucket on the target server (optionally
// renamed via --new-bucket) and restores all of the bucket's shard data from
// the local backup. It cannot overwrite an existing bucket.
func (b *cmdRestoreBuilder) restoreBucket(ctx context.Context, bkt *influxdb.Bucket) (err error) {
	b.logger.Info("Restoring bucket", zap.String("id", bkt.ID.String()), zap.String("name", bkt.Name))

	// Create bucket on server, applying the optional rename.
	newBucket := *bkt
	if b.newBucketName != "" {
		newBucket.Name = b.newBucketName
	}
	if err := b.bucketService.CreateBucket(ctx, &newBucket); err != nil {
		return fmt.Errorf("cannot create bucket: %w", err)
	}

	// Lookup matching database from the meta store.
	// Search using bucket ID from backup.
	dbi := b.metaClient.Database(bkt.ID.String())
	if dbi == nil {
		return fmt.Errorf("bucket database not found: %s", bkt.ID.String())
	}

	// Serialize to protobufs.
	buf, err := dbi.MarshalBinary()
	if err != nil {
		return fmt.Errorf("cannot marshal database info: %w", err)
	}

	// Upload the bucket's shard metadata; the server returns a mapping from
	// the backup's shard IDs to the newly created server-side shard IDs.
	shardIDMap, err := b.restoreService.RestoreBucket(ctx, newBucket.ID, buf)
	if err != nil {
		return fmt.Errorf("cannot restore bucket: %w", err)
	}

	// Restore each shard for the bucket.
	for _, file := range b.shardEntries {
		if bkt.ID.String() != file.BucketID {
			continue
		}

		// Skip if shard metadata was not imported.
		// FIX: previously this returned nil, which silently aborted the
		// restore of the bucket's remaining shards; skip just this file.
		newID, ok := shardIDMap[file.ShardID]
		if !ok {
			b.logger.Warn("Meta info not found, skipping file", zap.Uint64("shard", file.ShardID), zap.String("bucket_id", file.BucketID), zap.String("filename", file.FileName))
			continue
		}

		if err := b.restoreShard(ctx, newID, file); err != nil {
			return err
		}
	}
	return nil
}
// restoreShard streams one gzipped shard backup file from disk into the
// server-side shard identified by newShardID.
func (b *cmdRestoreBuilder) restoreShard(ctx context.Context, newShardID uint64, file *influxdb.ManifestEntry) error {
	b.logger.Info("Restoring shard live from backup", zap.Uint64("shard", newShardID), zap.String("filename", file.FileName))

	// Open the backup file on disk.
	src, err := os.Open(filepath.Join(b.path, file.FileName))
	if err != nil {
		return err
	}
	defer src.Close()

	// Shard backups are stored gzip-compressed; decompress while streaming.
	zr, err := gzip.NewReader(src)
	if err != nil {
		return err
	}
	defer zr.Close()

	return b.restoreService.RestoreShard(ctx, newShardID, zr)
}
// loadIncremental loads multiple manifest files from a given directory.
//
// Manifests are processed in reverse lexicographic (newest-first) order.
// The KV entry is taken from the newest manifest, while for each shard the
// entry with the most recent LastModified whose data file still exists on
// disk wins. Returns nil without error when no manifests are present.
func (b *cmdRestoreBuilder) loadIncremental() error {
	// Read all manifest files from path, sort in descending time.
	manifests, err := filepath.Glob(filepath.Join(b.path, "*.manifest"))
	if err != nil {
		return err
	} else if len(manifests) == 0 {
		return nil
	}
	sort.Sort(sort.Reverse(sort.StringSlice(manifests)))

	b.shardEntries = make(map[uint64]*influxdb.ManifestEntry)
	for _, filename := range manifests {
		// Skip file if it is a directory.
		if fi, err := os.Stat(filename); err != nil {
			return err
		} else if fi.IsDir() {
			continue
		}

		// Read manifest file for backup.
		var manifest influxdb.Manifest
		if buf, err := ioutil.ReadFile(filename); err != nil {
			return err
		} else if err := json.Unmarshal(buf, &manifest); err != nil {
			// Wrap with %w so the underlying JSON error stays inspectable.
			return fmt.Errorf("read manifest: %w", err)
		}

		// Save latest KV entry (manifests are newest-first, so first wins).
		if b.kvEntry == nil {
			b.kvEntry = &manifest.KV
		}

		// Load most recent backup per shard.
		for i := range manifest.Files {
			sh := manifest.Files[i] // per-iteration copy: &sh below must be stable
			if _, err := os.Stat(filepath.Join(b.path, sh.FileName)); err != nil {
				continue // backing data file missing; ignore this entry
			}

			entry := b.shardEntries[sh.ShardID]
			if entry == nil || sh.LastModified.After(entry.LastModified) {
				b.shardEntries[sh.ShardID] = &sh
			}
		}
	}
	return nil
}
// newCmd builds a cobra command wired with the shared print options and
// the global influxd flags.
func (b *cmdRestoreBuilder) newCmd(use string, runE func(*cobra.Command, []string) error) *cobra.Command {
	c := b.genericCLIOpts.newCmd(use, runE, true)
	b.genericCLIOpts.registerPrintOptions(c)
	b.globalFlags.registerFlags(c)
	return c
}

View File

@ -27,6 +27,7 @@ type Engine interface {
storage.EngineSchema storage.EngineSchema
prom.PrometheusCollector prom.PrometheusCollector
influxdb.BackupService influxdb.BackupService
influxdb.RestoreService
SeriesCardinality(orgID, bucketID influxdb.ID) int64 SeriesCardinality(orgID, bucketID influxdb.ID) int64
@ -157,16 +158,24 @@ func (t *TemporaryEngine) Flush(ctx context.Context) {
} }
} }
func (t *TemporaryEngine) CreateBackup(ctx context.Context) (int, []string, error) { func (t *TemporaryEngine) BackupKVStore(ctx context.Context, w io.Writer) error {
return t.engine.CreateBackup(ctx) return t.engine.BackupKVStore(ctx, w)
} }
func (t *TemporaryEngine) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error { func (t *TemporaryEngine) RestoreKVStore(ctx context.Context, r io.Reader) error {
return t.engine.FetchBackupFile(ctx, backupID, backupFile, w) return t.engine.RestoreKVStore(ctx, r)
} }
func (t *TemporaryEngine) InternalBackupPath(backupID int) string { func (t *TemporaryEngine) RestoreBucket(ctx context.Context, id influxdb.ID, dbi []byte) (map[uint64]uint64, error) {
return t.engine.InternalBackupPath(backupID) return t.engine.RestoreBucket(ctx, id, dbi)
}
func (t *TemporaryEngine) BackupShard(ctx context.Context, w io.Writer, shardID uint64, since time.Time) error {
return t.engine.BackupShard(ctx, w, shardID, since)
}
func (t *TemporaryEngine) RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error {
return t.engine.RestoreShard(ctx, shardID, r)
} }
func (t *TemporaryEngine) TSDBStore() storage.TSDBStore { func (t *TemporaryEngine) TSDBStore() storage.TSDBStore {

View File

@ -857,6 +857,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
deleteService platform.DeleteService = m.engine deleteService platform.DeleteService = m.engine
pointsWriter storage.PointsWriter = m.engine pointsWriter storage.PointsWriter = m.engine
backupService platform.BackupService = m.engine backupService platform.BackupService = m.engine
restoreService platform.RestoreService = m.engine
) )
deps, err := influxdb.NewDependencies( deps, err := influxdb.NewDependencies(
@ -1208,7 +1209,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
}, },
DeleteService: deleteService, DeleteService: deleteService,
BackupService: backupService, BackupService: backupService,
KVBackupService: m.kvService, RestoreService: restoreService,
AuthorizationService: authSvc, AuthorizationService: authSvc,
AuthorizerV1: authorizerV1, AuthorizerV1: authorizerV1,
AlgoWProxy: &http.NoopProxyHandler{}, AlgoWProxy: &http.NoopProxyHandler{},

View File

@ -1,304 +0,0 @@
package restore
import (
"fmt"
"io"
"os"
"path/filepath"
"strings"
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/internal/fs"
"github.com/influxdata/influxdb/v2/kit/cli"
"github.com/spf13/cobra"
)
// Command is the root "influxd restore" cobra command. It takes no
// positional arguments; all configuration comes from the flags bound in
// this package's init function.
var Command = &cobra.Command{
	Use:   "restore",
	Short: "Restore data and metadata from a backup",
	Long: `
This command restores data and metadata from a backup fileset.
Any existing metadata and data will be temporarily moved while restore runs
and deleted after restore completes.
Rebuilding the index and series file uses default options as in
"influxd inspect build-tsi" with the given target engine path.
For additional performance options, run restore with "-rebuild-index false"
and build-tsi afterwards.
NOTES:
* The influxd server should not be running when using the restore tool
as it replaces all data and metadata.
`,
	Args: cobra.ExactArgs(0),
	RunE: restoreE,
}
// flags holds the command-line options for "influxd restore".
var flags struct {
	boltPath   string // path to the target boltdb database
	enginePath string // path to the target persistent engine files
	credPath   string // path to the target credentials file
	backupPath string // path to the backup fileset being restored
	rebuildTSI bool   // rebuild the TSI index after restoring TSM files
}
// init resolves default paths under the user's influx directory and binds
// the restore command's flags.
func init() {
	dir, err := fs.InfluxDir()
	if err != nil {
		panic(fmt.Errorf("failed to determine influx directory: %s", err))
	}

	// Keep flags in declaration order in --help output.
	Command.Flags().SortFlags = false
	pfs := Command.PersistentFlags()
	pfs.SortFlags = false

	opts := []cli.Opt{
		{
			DestP:   &flags.boltPath,
			Flag:    "bolt-path",
			Default: filepath.Join(dir, bolt.DefaultFilename),
			Desc:    "path to target boltdb database",
		},
		{
			DestP:   &flags.enginePath,
			Flag:    "engine-path",
			Default: filepath.Join(dir, "engine"),
			Desc:    "path to target persistent engine files",
		},
		{
			DestP:   &flags.credPath,
			Flag:    "credentials-path",
			Default: filepath.Join(dir, fs.DefaultTokenFile),
			Desc:    "path to target credentials file",
		},
		{
			DestP:   &flags.backupPath,
			Flag:    "backup-path",
			Default: "",
			Desc:    "path to backup files",
		},
		{
			DestP:   &flags.rebuildTSI,
			Flag:    "rebuild-index",
			Default: true,
			Desc:    "if true, rebuild the TSI index and series file based on the given engine path (equivalent to influxd inspect build-tsi)",
		},
	}
	cli.BindOptions(Command, opts)
}
// restoreE is the RunE entry point for the restore command. It replaces the
// server's bolt database, credentials file, and engine TSM data with the
// contents of the backup fileset at --backup-path.
//
// Sequence: move existing state aside (to *.tmp locations), copy backup data
// into place, optionally rebuild the TSI index, then delete the moved-aside
// copies. NOTE(review): if a restore step fails after the moves, the old
// data remains in the ".tmp" locations and is not automatically rolled back.
func restoreE(cmd *cobra.Command, args []string) error {
	if flags.backupPath == "" {
		return fmt.Errorf("no backup path given")
	}

	// Move existing state out of the way first so it can be recovered
	// manually if the restore fails partway through.
	if err := moveBolt(); err != nil {
		return fmt.Errorf("failed to move existing bolt file: %v", err)
	}
	if err := moveCredentials(); err != nil {
		return fmt.Errorf("failed to move existing credentials file: %v", err)
	}
	if err := moveEngine(); err != nil {
		return fmt.Errorf("failed to move existing engine data: %v", err)
	}

	// Copy backup contents into the target locations.
	if err := restoreBolt(); err != nil {
		return fmt.Errorf("failed to restore bolt file: %v", err)
	}
	if err := restoreCred(); err != nil {
		return fmt.Errorf("failed to restore credentials file: %v", err)
	}
	if err := restoreEngine(); err != nil {
		return fmt.Errorf("failed to restore all TSM files: %v", err)
	}

	if flags.rebuildTSI {
		// FIXME: Implement rebuildTSI
		panic("not implemented")
		//sFilePath := filepath.Join(flags.enginePath, storage.DefaultSeriesFileDirectoryName)
		//indexPath := filepath.Join(flags.enginePath, storage.DefaultIndexDirectoryName)
		//rebuild := inspect.NewBuildTSICommand()
		//rebuild.SetArgs([]string{"--sfile-path", sFilePath, "--tsi-path", indexPath})
		//rebuild.Execute()
	}

	// Cleanup: delete the moved-aside copies of the previous state.
	if err := removeTmpBolt(); err != nil {
		return fmt.Errorf("restore completed, but failed to cleanup temporary bolt file: %v", err)
	}
	if err := removeTmpCred(); err != nil {
		return fmt.Errorf("restore completed, but failed to cleanup temporary credentials file: %v", err)
	}
	if err := removeTmpEngine(); err != nil {
		return fmt.Errorf("restore completed, but failed to cleanup temporary engine data: %v", err)
	}
	return nil
}
// moveBolt moves the existing bolt database aside to "<bolt-path>.tmp" so
// the backup copy can take its place. A missing bolt file is not an error.
func moveBolt() error {
	_, err := os.Stat(flags.boltPath)
	if os.IsNotExist(err) {
		return nil // nothing to move
	}
	if err != nil {
		return err
	}

	// Clear any stale temporary copy from a previous run before renaming.
	if err := removeTmpBolt(); err != nil {
		return err
	}
	return os.Rename(flags.boltPath, flags.boltPath+".tmp")
}
// moveCredentials moves the existing credentials file aside to
// "<credentials-path>.tmp" so the backup copy can take its place.
// A missing credentials file is not an error.
func moveCredentials() error {
	_, err := os.Stat(flags.credPath)
	if os.IsNotExist(err) {
		return nil // nothing to move
	}
	if err != nil {
		return err
	}

	// Clear any stale temporary copy from a previous run before renaming.
	if err := removeTmpCred(); err != nil {
		return err
	}
	return os.Rename(flags.credPath, flags.credPath+".tmp")
}
// moveEngine moves the existing engine directory aside to the temporary
// engine path and recreates an empty engine directory in its place.
// A missing engine directory is not an error.
func moveEngine() error {
	_, err := os.Stat(flags.enginePath)
	if os.IsNotExist(err) {
		return nil // no engine data yet
	}
	if err != nil {
		return err
	}

	// Clear any stale temporary copy from a previous run before renaming.
	if err := removeTmpEngine(); err != nil {
		return err
	}
	if err := os.Rename(flags.enginePath, tmpEnginePath()); err != nil {
		return err
	}
	return os.MkdirAll(flags.enginePath, 0777)
}
// tmpEnginePath returns the temporary location used to hold the previous
// engine directory during a restore.
//
// NOTE(review): this appends "tmp" to the PARENT directory of the engine
// path (e.g. engine path "/var/lib/influxdbv2/engine" yields
// "/var/lib/influxdbv2tmp"), not "<engine-path>.tmp" — confirm this is
// intentional before changing it.
func tmpEnginePath() string {
	return filepath.Dir(flags.enginePath) + "tmp"
}
// removeTmpBolt deletes the temporary bolt file left over from a previous
// restore, if any.
func removeTmpBolt() error {
	return removeIfExists(flags.boltPath + ".tmp")
}

// removeTmpEngine deletes the temporary engine directory left over from a
// previous restore, if any.
func removeTmpEngine() error {
	return removeIfExists(tmpEnginePath())
}

// removeTmpCred deletes the temporary credentials file left over from a
// previous restore, if any.
func removeTmpCred() error {
	return removeIfExists(flags.credPath + ".tmp")
}
func removeIfExists(path string) error {
if _, err := os.Stat(path); os.IsNotExist(err) {
return nil
} else if err != nil {
return err
} else {
return os.RemoveAll(path)
}
}
// restoreBolt copies the bolt database from the backup fileset into the
// configured target path.
func restoreBolt() error {
	src := filepath.Join(flags.backupPath, bolt.DefaultFilename)
	if err := restoreFile(src, flags.boltPath, "bolt"); err != nil {
		return err
	}
	fmt.Printf("Restored Bolt to %s from %s\n", flags.boltPath, src)
	return nil
}
// restoreEngine copies all TSM files found anywhere under the backup path
// into the target engine's data directory and reports how many were copied.
func restoreEngine() error {
	dataDir := filepath.Join(flags.enginePath, "/data")
	if err := os.MkdirAll(dataDir, 0777); err != nil {
		return err
	}

	count := 0
	err := filepath.Walk(flags.backupPath, func(path string, info os.FileInfo, err error) error {
		// BUG FIX: the walk error was previously ignored, which silently
		// masked unreadable directories/files during the restore.
		if err != nil {
			return err
		}
		// Note: matches ".tsm" anywhere in the path, preserving the original
		// (permissive) selection behavior.
		if !strings.Contains(path, ".tsm") {
			return nil
		}

		f, err := os.OpenFile(path, os.O_RDONLY, 0666)
		if err != nil {
			return fmt.Errorf("error opening TSM file: %w", err)
		}
		defer f.Close()

		tsmPath := filepath.Join(dataDir, filepath.Base(path))
		w, err := os.OpenFile(tsmPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
		if err != nil {
			return err
		}
		defer w.Close()

		if _, err := io.Copy(w, f); err != nil {
			return err
		}
		count++
		return nil
	})
	fmt.Printf("Restored %d TSM files to %v\n", count, dataDir)
	return err
}
func restoreFile(backup string, target string, filetype string) error {
f, err := os.Open(backup)
if err != nil {
return fmt.Errorf("no %s file in backup: %v", filetype, err)
}
defer f.Close()
if err := os.MkdirAll(filepath.Dir(target), 0777); err != nil {
return err
}
w, err := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil && !os.IsNotExist(err) {
return err
}
defer w.Close()
_, err = io.Copy(w, f)
return err
}
// restoreCred copies the credentials file from the backup fileset, if one is
// present. A backup without a credentials file is not an error.
func restoreCred() error {
	src := filepath.Join(flags.backupPath, fs.DefaultTokenFile)

	_, err := os.Stat(src)
	if os.IsNotExist(err) {
		fmt.Printf("No credentials file found in backup, skipping.\n")
		return nil
	}
	if err != nil {
		return err
	}

	if err := restoreFile(src, flags.credPath, "credentials"); err != nil {
		return err
	}
	fmt.Printf("Restored credentials to %s from %s\n", flags.credPath, src)
	return nil
}

View File

@ -58,7 +58,7 @@ type APIBackend struct {
PointsWriter storage.PointsWriter PointsWriter storage.PointsWriter
DeleteService influxdb.DeleteService DeleteService influxdb.DeleteService
BackupService influxdb.BackupService BackupService influxdb.BackupService
KVBackupService influxdb.KVBackupService RestoreService influxdb.RestoreService
AuthorizationService influxdb.AuthorizationService AuthorizationService influxdb.AuthorizationService
AuthorizerV1 influxdb.AuthorizerV1 AuthorizerV1 influxdb.AuthorizerV1
OnboardingService influxdb.OnboardingService OnboardingService influxdb.OnboardingService
@ -193,6 +193,10 @@ func NewAPIHandler(b *APIBackend, opts ...APIHandlerOptFn) *APIHandler {
backupBackend.BackupService = authorizer.NewBackupService(backupBackend.BackupService) backupBackend.BackupService = authorizer.NewBackupService(backupBackend.BackupService)
h.Mount(prefixBackup, NewBackupHandler(backupBackend)) h.Mount(prefixBackup, NewBackupHandler(backupBackend))
restoreBackend := NewRestoreBackend(b)
restoreBackend.RestoreService = authorizer.NewRestoreService(restoreBackend.RestoreService)
h.Mount(prefixRestore, NewRestoreHandler(restoreBackend))
h.Mount(dbrp.PrefixDBRP, dbrp.NewHTTPHandler(b.Logger, b.DBRPService, b.OrganizationService)) h.Mount(dbrp.PrefixDBRP, dbrp.NewHTTPHandler(b.Logger, b.DBRPService, b.OrganizationService))
writeBackend := NewWriteBackend(b.Logger.With(zap.String("handler", "write")), b) writeBackend := NewWriteBackend(b.Logger.With(zap.String("handler", "write")), b)
@ -234,6 +238,7 @@ var apiLinks = map[string]interface{}{
"analyze": "/api/v2/query/analyze", "analyze": "/api/v2/query/analyze",
"suggestions": "/api/v2/query/suggestions", "suggestions": "/api/v2/query/suggestions",
}, },
"restore": "/api/v2/restore",
"setup": "/api/v2/setup", "setup": "/api/v2/setup",
"signin": "/api/v2/signin", "signin": "/api/v2/signin",
"signout": "/api/v2/signout", "signout": "/api/v2/signout",

View File

@ -2,23 +2,17 @@ package http
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"net/http" "net/http"
"os" "net/url"
"path"
"path/filepath"
"strconv" "strconv"
"time" "time"
"github.com/influxdata/httprouter" "github.com/influxdata/httprouter"
"github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/bolt" // "github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/internal/fs"
"github.com/influxdata/influxdb/v2/kit/tracing" "github.com/influxdata/influxdb/v2/kit/tracing"
"go.uber.org/multierr"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -28,7 +22,6 @@ type BackupBackend struct {
influxdb.HTTPErrorHandler influxdb.HTTPErrorHandler
BackupService influxdb.BackupService BackupService influxdb.BackupService
KVBackupService influxdb.KVBackupService
} }
// NewBackupBackend returns a new instance of BackupBackend. // NewBackupBackend returns a new instance of BackupBackend.
@ -38,7 +31,6 @@ func NewBackupBackend(b *APIBackend) *BackupBackend {
HTTPErrorHandler: b.HTTPErrorHandler, HTTPErrorHandler: b.HTTPErrorHandler,
BackupService: b.BackupService, BackupService: b.BackupService,
KVBackupService: b.KVBackupService,
} }
} }
@ -49,22 +41,16 @@ type BackupHandler struct {
Logger *zap.Logger Logger *zap.Logger
BackupService influxdb.BackupService BackupService influxdb.BackupService
KVBackupService influxdb.KVBackupService
} }
const ( const (
prefixBackup = "/api/v2/backup" prefixBackup = "/api/v2/backup"
backupIDParamName = "backup_id" backupKVStorePath = prefixBackup + "/kv"
backupFileParamName = "backup_file" backupShardPath = prefixBackup + "/shards/:shardID"
backupFilePath = prefixBackup + "/:" + backupIDParamName + "/file/:" + backupFileParamName
httpClientTimeout = time.Hour httpClientTimeout = time.Hour
) )
func composeBackupFilePath(backupID int, backupFile string) string {
return path.Join(prefixBackup, fmt.Sprint(backupID), "file", fmt.Sprint(backupFile))
}
// NewBackupHandler creates a new handler at /api/v2/backup to receive backup requests. // NewBackupHandler creates a new handler at /api/v2/backup to receive backup requests.
func NewBackupHandler(b *BackupBackend) *BackupHandler { func NewBackupHandler(b *BackupBackend) *BackupHandler {
h := &BackupHandler{ h := &BackupHandler{
@ -72,107 +58,48 @@ func NewBackupHandler(b *BackupBackend) *BackupHandler {
Router: NewRouter(b.HTTPErrorHandler), Router: NewRouter(b.HTTPErrorHandler),
Logger: b.Logger, Logger: b.Logger,
BackupService: b.BackupService, BackupService: b.BackupService,
KVBackupService: b.KVBackupService,
} }
h.HandlerFunc(http.MethodPost, prefixBackup, h.handleCreate) h.HandlerFunc(http.MethodGet, backupKVStorePath, h.handleBackupKVStore)
h.HandlerFunc(http.MethodGet, backupFilePath, h.handleFetchFile) h.HandlerFunc(http.MethodGet, backupShardPath, h.handleBackupShard)
return h return h
} }
type backup struct { func (h *BackupHandler) handleBackupKVStore(w http.ResponseWriter, r *http.Request) {
ID int `json:"id,omitempty"` span, r := tracing.ExtractFromHTTPRequest(r, "BackupHandler.handleBackupKVStore")
Files []string `json:"files,omitempty"`
}
func (h *BackupHandler) handleCreate(w http.ResponseWriter, r *http.Request) {
span, r := tracing.ExtractFromHTTPRequest(r, "BackupHandler.handleCreate")
defer span.Finish() defer span.Finish()
ctx := r.Context() ctx := r.Context()
id, files, err := h.BackupService.CreateBackup(ctx) if err := h.BackupService.BackupKVStore(ctx, w); err != nil {
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
internalBackupPath := h.BackupService.InternalBackupPath(id)
boltPath := filepath.Join(internalBackupPath, bolt.DefaultFilename)
boltFile, err := os.OpenFile(boltPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0660)
if err != nil {
err = multierr.Append(err, os.RemoveAll(internalBackupPath))
h.HandleHTTPError(ctx, err, w)
return
}
if err = h.KVBackupService.Backup(ctx, boltFile); err != nil {
err = multierr.Append(err, os.RemoveAll(internalBackupPath))
h.HandleHTTPError(ctx, err, w)
return
}
files = append(files, bolt.DefaultFilename)
credsExist, err := h.backupCredentials(internalBackupPath)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
if credsExist {
files = append(files, fs.DefaultConfigsFile)
}
b := backup{
ID: id,
Files: files,
}
if err = json.NewEncoder(w).Encode(&b); err != nil {
err = multierr.Append(err, os.RemoveAll(internalBackupPath))
h.HandleHTTPError(ctx, err, w) h.HandleHTTPError(ctx, err, w)
return return
} }
} }
func (h *BackupHandler) backupCredentials(internalBackupPath string) (bool, error) { func (h *BackupHandler) handleBackupShard(w http.ResponseWriter, r *http.Request) {
credBackupPath := filepath.Join(internalBackupPath, fs.DefaultConfigsFile) span, r := tracing.ExtractFromHTTPRequest(r, "BackupHandler.handleBackupShard")
credPath, err := defaultConfigsPath()
if err != nil {
return false, err
}
token, err := ioutil.ReadFile(credPath)
if err != nil && !os.IsNotExist(err) {
return false, err
} else if os.IsNotExist(err) {
return false, nil
}
if err := ioutil.WriteFile(credBackupPath, []byte(token), 0600); err != nil {
return false, err
}
return true, nil
}
func (h *BackupHandler) handleFetchFile(w http.ResponseWriter, r *http.Request) {
span, r := tracing.ExtractFromHTTPRequest(r, "BackupHandler.handleFetchFile")
defer span.Finish() defer span.Finish()
ctx := r.Context() ctx := r.Context()
params := httprouter.ParamsFromContext(ctx) params := httprouter.ParamsFromContext(ctx)
backupID, err := strconv.Atoi(params.ByName("backup_id")) shardID, err := strconv.ParseUint(params.ByName("shardID"), 10, 64)
if err != nil { if err != nil {
h.HandleHTTPError(ctx, err, w) h.HandleHTTPError(ctx, err, w)
return return
} }
backupFile := params.ByName("backup_file")
if err = h.BackupService.FetchBackupFile(ctx, backupID, backupFile, w); err != nil { var since time.Time
if s := r.URL.Query().Get("since"); s != "" {
if since, err = time.ParseInLocation(time.RFC3339, s, time.UTC); err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
}
if err := h.BackupService.BackupShard(ctx, w, shardID, since); err != nil {
h.HandleHTTPError(ctx, err, w) h.HandleHTTPError(ctx, err, w)
return return
} }
@ -185,47 +112,11 @@ type BackupService struct {
InsecureSkipVerify bool InsecureSkipVerify bool
} }
func (s *BackupService) CreateBackup(ctx context.Context) (int, []string, error) { func (s *BackupService) BackupKVStore(ctx context.Context, w io.Writer) error {
span, ctx := tracing.StartSpanFromContext(ctx) span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish() defer span.Finish()
u, err := NewURL(s.Addr, prefixBackup) u, err := NewURL(s.Addr, prefixBackup+"/kv")
if err != nil {
return 0, nil, err
}
req, err := http.NewRequest(http.MethodPost, u.String(), nil)
if err != nil {
return 0, nil, err
}
SetToken(s.Token, req)
req = req.WithContext(ctx)
hc := NewClient(u.Scheme, s.InsecureSkipVerify)
hc.Timeout = httpClientTimeout
resp, err := hc.Do(req)
if err != nil {
return 0, nil, err
}
defer resp.Body.Close()
if err := CheckError(resp); err != nil {
return 0, nil, err
}
var b backup
if err = json.NewDecoder(resp.Body).Decode(&b); err != nil {
return 0, nil, err
}
return b.ID, b.Files, nil
}
func (s *BackupService) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
u, err := NewURL(s.Addr, composeBackupFilePath(backupID, backupFile))
if err != nil { if err != nil {
return err return err
} }
@ -249,22 +140,45 @@ func (s *BackupService) FetchBackupFile(ctx context.Context, backupID int, backu
return err return err
} }
_, err = io.Copy(w, resp.Body) if _, err := io.Copy(w, resp.Body); err != nil {
return err
}
return resp.Body.Close()
}
func (s *BackupService) BackupShard(ctx context.Context, w io.Writer, shardID uint64, since time.Time) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
u, err := NewURL(s.Addr, fmt.Sprintf(prefixBackup+"/shards/%d", shardID))
if err != nil { if err != nil {
return err return err
} }
if !since.IsZero() {
return nil u.RawQuery = (url.Values{"since": {since.UTC().Format(time.RFC3339)}}).Encode()
} }
func defaultConfigsPath() (string, error) { req, err := http.NewRequest(http.MethodGet, u.String(), nil)
dir, err := fs.InfluxDir()
if err != nil { if err != nil {
return "", err return err
} }
return filepath.Join(dir, fs.DefaultConfigsFile), nil SetToken(s.Token, req)
req = req.WithContext(ctx)
hc := NewClient(u.Scheme, s.InsecureSkipVerify)
hc.Timeout = httpClientTimeout
resp, err := hc.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if err := CheckError(resp); err != nil {
return err
} }
func (s *BackupService) InternalBackupPath(backupID int) string { if _, err := io.Copy(w, resp.Body); err != nil {
panic("internal method not implemented here") return err
}
return resp.Body.Close()
} }

234
http/restore_service.go Normal file
View File

@ -0,0 +1,234 @@
package http
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strconv"
"github.com/influxdata/httprouter"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/tracing"
"go.uber.org/zap"
)
// RestoreBackend is all services and associated parameters required to construct the RestoreHandler.
type RestoreBackend struct {
	Logger *zap.Logger // handler-scoped logger (tagged "handler"="restore")
	influxdb.HTTPErrorHandler
	RestoreService influxdb.RestoreService // service that performs the actual restore operations
}
// NewRestoreBackend returns a new instance of RestoreBackend.
func NewRestoreBackend(b *APIBackend) *RestoreBackend {
	backend := &RestoreBackend{
		HTTPErrorHandler: b.HTTPErrorHandler,
		Logger:           b.Logger.With(zap.String("handler", "restore")),
		RestoreService:   b.RestoreService,
	}
	return backend
}
// RestoreHandler is http handler for restore service. It serves the POST
// endpoints mounted under /api/v2/restore.
type RestoreHandler struct {
	*httprouter.Router
	influxdb.HTTPErrorHandler
	Logger *zap.Logger

	RestoreService influxdb.RestoreService
}
// Route paths served by RestoreHandler; all accept POST requests.
const (
	prefixRestore     = "/api/v2/restore"
	restoreKVPath     = prefixRestore + "/kv"                // replace the KV store
	restoreBucketPath = prefixRestore + "/buckets/:bucketID" // restore a bucket's metadata
	restoreShardPath  = prefixRestore + "/shards/:shardID"   // restore one shard's data
)
// NewRestoreHandler creates a new handler at /api/v2/restore to receive restore requests.
func NewRestoreHandler(b *RestoreBackend) *RestoreHandler {
	handler := &RestoreHandler{
		HTTPErrorHandler: b.HTTPErrorHandler,
		Router:           NewRouter(b.HTTPErrorHandler),
		Logger:           b.Logger,
		RestoreService:   b.RestoreService,
	}

	// All restore endpoints receive their payloads via POST.
	handler.HandlerFunc(http.MethodPost, restoreKVPath, handler.handleRestoreKVStore)
	handler.HandlerFunc(http.MethodPost, restoreBucketPath, handler.handleRestoreBucket)
	handler.HandlerFunc(http.MethodPost, restoreShardPath, handler.handleRestoreShard)

	return handler
}
// handleRestoreKVStore replaces the server's KV store with the contents of
// the request body.
func (h *RestoreHandler) handleRestoreKVStore(w http.ResponseWriter, r *http.Request) {
	span, r := tracing.ExtractFromHTTPRequest(r, "RestoreHandler.handleRestoreKVStore")
	defer span.Finish()
	ctx := r.Context()

	err := h.RestoreService.RestoreKVStore(ctx, r.Body)
	if err != nil {
		h.HandleHTTPError(ctx, err, w)
	}
}
// handleRestoreBucket restores a bucket's metadata. The request body carries
// the serialized database info; the response body is a JSON map of old shard
// IDs to newly created shard IDs.
func (h *RestoreHandler) handleRestoreBucket(w http.ResponseWriter, r *http.Request) {
	span, r := tracing.ExtractFromHTTPRequest(r, "RestoreHandler.handleRestoreBucket")
	defer span.Finish()
	ctx := r.Context()

	// Read bucket ID from the URL path parameter.
	bucketID, err := decodeIDFromCtx(r.Context(), "bucketID")
	if err != nil {
		h.HandleHTTPError(ctx, err, w)
		return
	}

	// Read serialized DBI data from the request body.
	dbBytes, err := ioutil.ReadAll(r.Body)
	if err != nil {
		h.HandleHTTPError(ctx, err, w)
		return
	}

	shardIDMap, err := h.RestoreService.RestoreBucket(ctx, bucketID, dbBytes)
	if err != nil {
		h.HandleHTTPError(ctx, err, w)
		return
	}

	if err := json.NewEncoder(w).Encode(shardIDMap); err != nil {
		h.HandleHTTPError(ctx, err, w)
	}
}
// handleRestoreShard streams the request body into the shard identified by
// the :shardID path parameter.
func (h *RestoreHandler) handleRestoreShard(w http.ResponseWriter, r *http.Request) {
	span, r := tracing.ExtractFromHTTPRequest(r, "RestoreHandler.handleRestoreShard")
	defer span.Finish()
	ctx := r.Context()

	shardID, err := strconv.ParseUint(httprouter.ParamsFromContext(ctx).ByName("shardID"), 10, 64)
	if err != nil {
		h.HandleHTTPError(ctx, err, w)
		return
	}

	if err := h.RestoreService.RestoreShard(ctx, shardID, r.Body); err != nil {
		h.HandleHTTPError(ctx, err, w)
	}
}
// RestoreService is the client implementation of influxdb.RestoreService.
// It issues HTTP requests against the restore endpoints of a remote influxd.
type RestoreService struct {
	Addr               string // base address of the target server
	Token              string // API token attached to every request
	InsecureSkipVerify bool   // skip TLS certificate verification on the client
}
// RestoreKVStore streams r as the body of a POST to the server's
// /api/v2/restore/kv endpoint, replacing the remote KV store.
func (s *RestoreService) RestoreKVStore(ctx context.Context, r io.Reader) error {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	target, err := NewURL(s.Addr, restoreKVPath)
	if err != nil {
		return err
	}

	req, err := http.NewRequest(http.MethodPost, target.String(), r)
	if err != nil {
		return err
	}
	SetToken(s.Token, req)
	req = req.WithContext(ctx)

	client := NewClient(target.Scheme, s.InsecureSkipVerify)
	client.Timeout = httpClientTimeout
	res, err := client.Do(req)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	return CheckError(res)
}
// RestoreBucket POSTs serialized database info for bucket id to the server
// and returns the server's mapping of old shard IDs to new shard IDs.
func (s *RestoreService) RestoreBucket(ctx context.Context, id influxdb.ID, dbi []byte) (map[uint64]uint64, error) {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	target, err := NewURL(s.Addr, prefixRestore+fmt.Sprintf("/buckets/%s", id.String()))
	if err != nil {
		return nil, err
	}

	req, err := http.NewRequest(http.MethodPost, target.String(), bytes.NewReader(dbi))
	if err != nil {
		return nil, err
	}
	SetToken(s.Token, req)
	req = req.WithContext(ctx)

	client := NewClient(target.Scheme, s.InsecureSkipVerify)
	client.Timeout = httpClientTimeout
	res, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if err := CheckError(res); err != nil {
		return nil, err
	}

	// Decode the old->new shard ID mapping from the response body.
	idMap := make(map[uint64]uint64)
	if err := json.NewDecoder(res.Body).Decode(&idMap); err != nil {
		return nil, err
	}
	return idMap, nil
}
// RestoreShard streams r as the body of a POST to the server's
// /api/v2/restore/shards/:shardID endpoint, restoring shard shardID.
func (s *RestoreService) RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	target, err := NewURL(s.Addr, fmt.Sprintf(prefixRestore+"/shards/%d", shardID))
	if err != nil {
		return err
	}

	req, err := http.NewRequest(http.MethodPost, target.String(), r)
	if err != nil {
		return err
	}
	SetToken(s.Token, req)
	req = req.WithContext(ctx)

	client := NewClient(target.Scheme, s.InsecureSkipVerify)
	client.Timeout = httpClientTimeout
	res, err := client.Do(req)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	return CheckError(res)
}

View File

@ -88,6 +88,10 @@ func (s *KVStore) Backup(ctx context.Context, w io.Writer) error {
panic("not implemented") panic("not implemented")
} }
func (s *KVStore) Restore(ctx context.Context, r io.Reader) error {
panic("not implemented")
}
// Flush removes all data from the buckets. Used for testing. // Flush removes all data from the buckets. Used for testing.
func (s *KVStore) Flush(ctx context.Context) { func (s *KVStore) Flush(ctx context.Context) {
s.mu.Lock() s.mu.Lock()

View File

@ -26,6 +26,10 @@ func (s mockStore) Backup(ctx context.Context, w io.Writer) error {
return nil return nil
} }
func (s mockStore) Restore(ctx context.Context, r io.Reader) error {
return nil
}
func TestNewService(t *testing.T) { func TestNewService(t *testing.T) {
s := kv.NewService(zaptest.NewLogger(t), mockStore{}) s := kv.NewService(zaptest.NewLogger(t), mockStore{})

View File

@ -51,6 +51,8 @@ type Store interface {
Update(context.Context, func(Tx) error) error Update(context.Context, func(Tx) error) error
// Backup copies all K:Vs to a writer, file format determined by implementation. // Backup copies all K:Vs to a writer, file format determined by implementation.
Backup(ctx context.Context, w io.Writer) error Backup(ctx context.Context, w io.Writer) error
// Restore replaces the underlying data file with the data from r.
Restore(ctx context.Context, r io.Reader) error
} }
// Tx is a transaction in the store. // Tx is a transaction in the store.

View File

@ -14,6 +14,7 @@ type Store struct {
ViewFn func(func(kv.Tx) error) error ViewFn func(func(kv.Tx) error) error
UpdateFn func(func(kv.Tx) error) error UpdateFn func(func(kv.Tx) error) error
BackupFn func(ctx context.Context, w io.Writer) error BackupFn func(ctx context.Context, w io.Writer) error
RestoreFn func(ctx context.Context, r io.Reader) error
} }
// View opens up a transaction that will not write to any data. Implementing interfaces // View opens up a transaction that will not write to any data. Implementing interfaces
@ -31,6 +32,10 @@ func (s *Store) Backup(ctx context.Context, w io.Writer) error {
return s.BackupFn(ctx, w) return s.BackupFn(ctx, w)
} }
func (s *Store) Restore(ctx context.Context, r io.Reader) error {
return s.RestoreFn(ctx, r)
}
var _ (kv.Tx) = (*Tx)(nil) var _ (kv.Tx) = (*Tx)(nil)
// Tx is mock of a kv.Tx. // Tx is mock of a kv.Tx.

View File

@ -79,6 +79,10 @@ type MetaClient interface {
RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error) RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error
Backup(ctx context.Context, w io.Writer) error
Restore(ctx context.Context, r io.Reader) error
Data() meta.Data
SetData(data *meta.Data) error
} }
type TSDBStore interface { type TSDBStore interface {
@ -306,41 +310,148 @@ func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID
return ErrNotImplemented return ErrNotImplemented
} }
// CreateBackup creates a "snapshot" of all TSM data in the Engine. func (e *Engine) BackupKVStore(ctx context.Context, w io.Writer) error {
// 1) Snapshot the cache to ensure the backup includes all data written before now.
// 2) Create hard links to all TSM files, in a new directory within the engine root directory.
// 3) Return a unique backup ID (invalid after the process terminates) and list of files.
//
// TODO - do we need this?
//
func (e *Engine) CreateBackup(ctx context.Context) (int, []string, error) {
span, _ := tracing.StartSpanFromContext(ctx) span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish() defer span.Finish()
e.mu.RLock()
defer e.mu.RUnlock()
if e.closing == nil { if e.closing == nil {
return 0, nil, ErrEngineClosed return ErrEngineClosed
} }
return 0, nil, nil return e.metaClient.Backup(ctx, w)
}
func (e *Engine) BackupShard(ctx context.Context, w io.Writer, shardID uint64, since time.Time) error {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
e.mu.RLock()
defer e.mu.RUnlock()
if e.closing == nil {
return ErrEngineClosed
}
return e.tsdbStore.BackupShard(shardID, since, w)
}
func (e *Engine) RestoreKVStore(ctx context.Context, r io.Reader) error {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
e.mu.RLock()
defer e.mu.RUnlock()
if e.closing == nil {
return ErrEngineClosed
}
// Replace KV store data and remove all existing shard data.
if err := e.metaClient.Restore(ctx, r); err != nil {
return err
} else if err := e.tsdbStore.DeleteShards(); err != nil {
return err
}
// Create new shards based on the restored KV data.
data := e.metaClient.Data()
for _, dbi := range data.Databases {
for _, rpi := range dbi.RetentionPolicies {
for _, sgi := range rpi.ShardGroups {
if sgi.Deleted() {
continue
}
for _, sh := range sgi.Shards {
if err := e.tsdbStore.CreateShard(dbi.Name, rpi.Name, sh.ID, true); err != nil {
return err
}
}
}
}
} }
// FetchBackupFile writes a given backup file to the provided writer.
// After a successful write, the internal copy is removed.
func (e *Engine) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error {
// TODO - need?
return nil return nil
} }
// InternalBackupPath provides the internal, full path directory name of the backup. func (e *Engine) RestoreBucket(ctx context.Context, id influxdb.ID, buf []byte) (map[uint64]uint64, error) {
// This should not be exposed via API. span, _ := tracing.StartSpanFromContext(ctx)
func (e *Engine) InternalBackupPath(backupID int) string { defer span.Finish()
e.mu.RLock() e.mu.RLock()
defer e.mu.RUnlock() defer e.mu.RUnlock()
if e.closing == nil { if e.closing == nil {
return "" return nil, ErrEngineClosed
} }
// TODO - need?
return "" var newDBI meta.DatabaseInfo
if err := newDBI.UnmarshalBinary(buf); err != nil {
return nil, err
}
data := e.metaClient.Data()
dbi := data.Database(id.String())
if dbi == nil {
return nil, fmt.Errorf("bucket dbi for %q not found during restore", newDBI.Name)
} else if len(newDBI.RetentionPolicies) != 1 {
return nil, fmt.Errorf("bucket must have 1 retention policy; attempting to restore %d retention policies", len(newDBI.RetentionPolicies))
}
dbi.RetentionPolicies = newDBI.RetentionPolicies
dbi.ContinuousQueries = newDBI.ContinuousQueries
// Generate shard ID mapping.
shardIDMap := make(map[uint64]uint64)
rpi := newDBI.RetentionPolicies[0]
for j, sgi := range rpi.ShardGroups {
data.MaxShardGroupID++
rpi.ShardGroups[j].ID = data.MaxShardGroupID
for k := range sgi.Shards {
data.MaxShardID++
shardIDMap[sgi.Shards[k].ID] = data.MaxShardID
sgi.Shards[k].ID = data.MaxShardID
sgi.Shards[k].Owners = []meta.ShardOwner{}
}
}
// Update data.
if err := e.metaClient.SetData(&data); err != nil {
return nil, err
}
// Create shards.
for _, sgi := range rpi.ShardGroups {
if sgi.Deleted() {
continue
}
for _, sh := range sgi.Shards {
if err := e.tsdbStore.CreateShard(dbi.Name, rpi.Name, sh.ID, true); err != nil {
return nil, err
}
}
}
return shardIDMap, nil
}
func (e *Engine) RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
e.mu.RLock()
defer e.mu.RUnlock()
if e.closing == nil {
return ErrEngineClosed
}
return e.tsdbStore.RestoreShard(shardID, r)
} }
// SeriesCardinality returns the number of series in the engine. // SeriesCardinality returns the number of series in the engine.

View File

@ -1199,15 +1199,7 @@ func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string, as
return "", nil return "", nil
} }
nativeFileName := filepath.FromSlash(hdr.Name) filename := filepath.Base(filepath.FromSlash(hdr.Name))
// Skip file if it does not have a matching prefix.
if !strings.HasPrefix(nativeFileName, shardRelativePath) {
return "", nil
}
filename, err := filepath.Rel(shardRelativePath, nativeFileName)
if err != nil {
return "", err
}
// If this is a directory entry (usually just `index` for tsi), create it an move on. // If this is a directory entry (usually just `index` for tsi), create it an move on.
if hdr.Typeflag == tar.TypeDir { if hdr.Typeflag == tar.TypeDir {

View File

@ -17,6 +17,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/influxql/query" "github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/models"
@ -695,6 +696,16 @@ func (s *Store) SetShardEnabled(shardID uint64, enabled bool) error {
return nil return nil
} }
// DeleteShards removes all shards from disk.
func (s *Store) DeleteShards() error {
for _, id := range s.ShardIDs() {
if err := s.DeleteShard(id); err != nil {
return err
}
}
return nil
}
// DeleteShard removes a shard from disk. // DeleteShard removes a shard from disk.
func (s *Store) DeleteShard(shardID uint64) error { func (s *Store) DeleteShard(shardID uint64) error {
sh := s.Shard(shardID) sh := s.Shard(shardID)
@ -1192,7 +1203,10 @@ func (s *Store) MeasurementsSketches(database string) (estimator.Sketch, estimat
func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error { func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error {
shard := s.Shard(id) shard := s.Shard(id)
if shard == nil { if shard == nil {
return fmt.Errorf("shard %d doesn't exist on this server", id) return &influxdb.Error{
Code: influxdb.ENotFound,
Msg: fmt.Sprintf("shard %d not found", id),
}
} }
path, err := relativePath(s.path, shard.path) path, err := relativePath(s.path, shard.path)
@ -1206,7 +1220,10 @@ func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error {
func (s *Store) ExportShard(id uint64, start time.Time, end time.Time, w io.Writer) error { func (s *Store) ExportShard(id uint64, start time.Time, end time.Time, w io.Writer) error {
shard := s.Shard(id) shard := s.Shard(id)
if shard == nil { if shard == nil {
return fmt.Errorf("shard %d doesn't exist on this server", id) return &influxdb.Error{
Code: influxdb.ENotFound,
Msg: fmt.Sprintf("shard %d not found", id),
}
} }
path, err := relativePath(s.path, shard.path) path, err := relativePath(s.path, shard.path)

View File

@ -1036,6 +1036,17 @@ func (c *Client) Load() error {
}) })
} }
func (c *Client) Backup(ctx context.Context, w io.Writer) error {
return c.store.Backup(ctx, w)
}
func (c *Client) Restore(ctx context.Context, r io.Reader) error {
if err := c.store.Restore(ctx, r); err != nil {
return err
}
return c.Load()
}
type uint64Slice []uint64 type uint64Slice []uint64
func (a uint64Slice) Len() int { return len(a) } func (a uint64Slice) Len() int { return len(a) }

View File

@ -994,6 +994,21 @@ func (di DatabaseInfo) clone() DatabaseInfo {
return other return other
} }
// MarshalBinary encodes dbi to a binary format.
func (dbi *DatabaseInfo) MarshalBinary() ([]byte, error) {
return proto.Marshal(dbi.marshal())
}
// UnmarshalBinary decodes dbi from a binary format.
func (dbi *DatabaseInfo) UnmarshalBinary(data []byte) error {
var pb internal.DatabaseInfo
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
dbi.unmarshal(&pb)
return nil
}
// marshal serializes to a protobuf representation. // marshal serializes to a protobuf representation.
func (di DatabaseInfo) marshal() *internal.DatabaseInfo { func (di DatabaseInfo) marshal() *internal.DatabaseInfo {
pb := &internal.DatabaseInfo{} pb := &internal.DatabaseInfo{}

View File

@ -36,6 +36,10 @@ func (s *KVStore) Backup(ctx context.Context, w io.Writer) error {
panic("not implemented") panic("not implemented")
} }
func (s *KVStore) Restore(ctx context.Context, r io.Reader) error {
panic("not implemented")
}
// Tx is an in memory transaction. // Tx is an in memory transaction.
// TODO: make transactions actually transactional // TODO: make transactions actually transactional
type Tx struct { type Tx struct {