Add incremental backups.

This commit adds incremental backup support. Snapshotting from the server
now creates a full backup if one does not already exist, and numbered
incremental backups on each subsequent run.

For example, if you ran:

  $ influxd backup /tmp/snapshot

Then you'll see a full snapshot in /tmp/snapshot. If you run the same
command again then an incremental snapshot will be created at
/tmp/snapshot.0. Running it again will create /tmp/snapshot.1.
pull/2053/head
Ben Johnson 2015-03-24 15:57:03 -06:00
parent 29cb550d95
commit 2401e69f58
7 changed files with 471 additions and 24 deletions

View File

@ -1,6 +1,8 @@
package main
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io"
@ -8,6 +10,8 @@ import (
"net/http"
"net/url"
"os"
"github.com/influxdb/influxdb"
)
// BackupSuffix is a suffix added to the backup while it's in-process.
@ -41,13 +45,25 @@ func (cmd *BackupCommand) Run(args ...string) error {
return err
}
// TODO: Check highest index from local version.
// Retrieve snapshot from local file.
ss, err := influxdb.ReadFileSnapshot(path)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("read file snapshot: %s", err)
}
// Determine temporary path to download to.
tmppath := path + BackupSuffix
// Calculate path of next backup file.
// This uses the path if it doesn't exist.
// Otherwise it appends an autoincrementing number.
path, err = cmd.nextPath(path)
if err != nil {
return fmt.Errorf("next path: %s", err)
}
// Retrieve snapshot.
if err := cmd.download(u, tmppath); err != nil {
if err := cmd.download(u, ss, tmppath); err != nil {
return fmt.Errorf("download: %s", err)
}
@ -89,8 +105,28 @@ func (cmd *BackupCommand) parseFlags(args []string) (url.URL, string, error) {
return *u, path, nil
}
// nextPath returns the next backup file path to write to.
// If nothing exists at path yet, path itself is used (the full backup).
// Otherwise an autoincrementing numeric suffix (".0", ".1", ...) is
// appended until an unused filename is found.
func (cmd *BackupCommand) nextPath(path string) (string, error) {
	// Use base path if it doesn't exist.
	if _, err := os.Stat(path); os.IsNotExist(err) {
		return path, nil
	} else if err != nil {
		return "", err
	}

	// Otherwise iterate through incremental suffixes until one is available.
	for i := 0; ; i++ {
		// Keep the path out of the format string so a path containing
		// fmt verbs (e.g. "%d") cannot corrupt the generated filename.
		s := fmt.Sprintf("%s.%d", path, i)
		if _, err := os.Stat(s); os.IsNotExist(err) {
			return s, nil
		} else if err != nil {
			return "", err
		}
	}
}
// download downloads a snapshot from a host to a given path.
func (cmd *BackupCommand) download(u url.URL, path string) error {
func (cmd *BackupCommand) download(u url.URL, ss *influxdb.Snapshot, path string) error {
// Create local file to write to.
f, err := os.Create(path)
if err != nil {
@ -98,9 +134,23 @@ func (cmd *BackupCommand) download(u url.URL, path string) error {
}
defer f.Close()
// Fetch the archive from the server.
// Encode snapshot.
var buf bytes.Buffer
if ss != nil {
if err := json.NewEncoder(&buf).Encode(ss); err != nil {
return fmt.Errorf("encode snapshot: %s", err)
}
}
// Create request with existing snapshot as the body.
u.Path = "/snapshot"
resp, err := http.Get(u.String())
req, err := http.NewRequest("GET", u.String(), &buf)
if err != nil {
return fmt.Errorf("new request: %s", err)
}
// Fetch the archive from the server.
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("get: %s", err)
}

View File

@ -32,16 +32,26 @@ func TestBackupCommand(t *testing.T) {
}))
defer s.Close()
// Execute the backup against the mock server.
// Create a temp path and remove incremental backups at the end.
path := tempfile()
defer os.Remove(path)
defer os.Remove(path)
defer os.Remove(path)
// Execute the backup against the mock server.
for i := 0; i < 3; i++ {
if err := NewBackupCommand().Run("-host", s.URL, path); err != nil {
t.Fatal(err)
}
}
// Verify snapshot was written to path.
// Verify snapshot and two incremental snapshots were written.
if _, err := os.Stat(path); err != nil {
t.Fatalf("snapshot not found: %s", err)
} else if _, err = os.Stat(path + ".0"); err != nil {
t.Fatalf("incremental snapshot(0) not found: %s", err)
} else if _, err = os.Stat(path + ".1"); err != nil {
t.Fatalf("incremental snapshot(1) not found: %s", err)
}
}

View File

@ -53,22 +53,21 @@ func (cmd *RestoreCommand) Run(args ...string) error {
return fmt.Errorf("remove data dir: %s", err)
}
// Open snapshot file.
f, err := os.Open(path)
// Open snapshot file and all incremental backups.
ssr, files, err := influxdb.OpenFileSnapshotsReader(path)
if err != nil {
return fmt.Errorf("open: %s", err)
}
defer f.Close()
defer closeAll(files)
// Create reader and extract manifest.
sr := influxdb.NewSnapshotReader(f)
ss, err := sr.Snapshot()
// Extract manifest.
ss, err := ssr.Snapshot()
if err != nil {
return fmt.Errorf("snapshot: %s", err)
}
// Unpack snapshot files into data directory.
if err := cmd.unpack(config.DataDir(), sr); err != nil {
if err := cmd.unpack(config.DataDir(), ssr); err != nil {
return fmt.Errorf("unpack: %s", err)
}
@ -110,8 +109,14 @@ func (cmd *RestoreCommand) parseFlags(args []string) (*Config, string, error) {
return config, path, nil
}
func closeAll(a []io.Closer) {
for _, c := range a {
_ = c.Close()
}
}
// unpack expands the files in the snapshot archive into a directory.
func (cmd *RestoreCommand) unpack(path string, sr *influxdb.SnapshotReader) error {
func (cmd *RestoreCommand) unpack(path string, ssr *influxdb.SnapshotsReader) error {
// Create root directory.
if err := os.MkdirAll(path, 0777); err != nil {
return fmt.Errorf("mkdir: err=%s", err)
@ -120,7 +125,7 @@ func (cmd *RestoreCommand) unpack(path string, sr *influxdb.SnapshotReader) erro
// Loop over files and extract.
for {
// Read entry header.
sf, err := sr.Next()
sf, err := ssr.Next()
if err == io.EOF {
break
} else if err != nil {
@ -141,7 +146,7 @@ func (cmd *RestoreCommand) unpack(path string, sr *influxdb.SnapshotReader) erro
defer f.Close()
// Copy contents from reader.
if _, err := io.CopyN(f, sr, sf.Size); err != nil {
if _, err := io.CopyN(f, ssr, sf.Size); err != nil {
return fmt.Errorf("copy: entry=%s, err=%s", sf.Name, err)
}

View File

@ -778,6 +778,13 @@ type SnapshotHandler struct {
}
func (h *SnapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Read in previous snapshot from request body.
var prev influxdb.Snapshot
if err := json.NewDecoder(r.Body).Decode(&prev); err != nil && err != io.EOF {
httpError(w, "error reading previous snapshot: "+err.Error(), false, http.StatusBadRequest)
return
}
// Retrieve a snapshot from the server.
sw, err := h.CreateSnapshotWriter()
if err != nil {
@ -786,7 +793,8 @@ func (h *SnapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
defer sw.Close()
// TODO: Subtract existing snapshot from writer.
// Subtract existing snapshot from writer.
sw.Snapshot = sw.Snapshot.Diff(&prev)
// Write to response.
if _, err := sw.WriteTo(w); err != nil {

View File

@ -1627,17 +1627,26 @@ func TestSnapshotHandler(t *testing.T) {
h.CreateSnapshotWriter = func() (*influxdb.SnapshotWriter, error) {
return &influxdb.SnapshotWriter{
Snapshot: &influxdb.Snapshot{
Files: []influxdb.SnapshotFile{{Name: "meta", Size: 5, Index: 12}},
Files: []influxdb.SnapshotFile{
{Name: "meta", Size: 5, Index: 12},
{Name: "shards/1", Size: 6, Index: 15},
},
},
FileWriters: map[string]influxdb.SnapshotFileWriter{
"meta": influxdb.NopWriteToCloser(bytes.NewBufferString("55555")),
"shards/1": influxdb.NopWriteToCloser(bytes.NewBufferString("666666")),
},
}, nil
}
// Execute handler.
// Execute handler with an existing snapshot to diff.
// The "shards/1" has a higher index in the diff so it won't be included in the snapshot.
w := httptest.NewRecorder()
h.ServeHTTP(w, nil)
r, _ := http.NewRequest(
"GET", "http://localhost/snapshot",
strings.NewReader(`{"files":[{"name":"meta","index":10},{"name":"shards/1","index":20}]}`),
)
h.ServeHTTP(w, r)
// Verify status code is successful and the snapshot was written.
if w.Code != http.StatusOK {

View File

@ -5,8 +5,10 @@ import (
"encoding/json"
"fmt"
"io"
"os"
"path"
"path/filepath"
"sort"
"time"
"github.com/boltdb/bolt"
@ -54,9 +56,45 @@ loop:
diff.Files = append(diff.Files, a)
}
// Sort files.
sort.Sort(SnapshotFiles(diff.Files))
return diff
}
// Merge returns a Snapshot that combines s with other.
// Only the newest file between the two snapshots is returned.
func (s *Snapshot) Merge(other *Snapshot) *Snapshot {
	// Start from a detached copy of s's file list.
	merged := &Snapshot{Files: append([]SnapshotFile(nil), s.Files...)}

	for _, f := range other.Files {
		// Locate the first file in the copy with a matching name.
		pos := -1
		for i := range merged.Files {
			if merged.Files[i].Name == f.Name {
				pos = i
				break
			}
		}

		switch {
		case pos == -1:
			// File only exists in other; take it as-is.
			merged.Files = append(merged.Files, f)
		case f.Index > merged.Files[pos].Index:
			// other has a strictly newer version; replace ours.
			// On equal indices the copy from s is kept.
			merged.Files[pos] = f
		}
	}

	// Keep files in canonical name order.
	sort.Sort(SnapshotFiles(merged.Files))
	return merged
}
// SnapshotFile represents a single file in a Snapshot.
type SnapshotFile struct {
Name string `json:"name"` // filename
@ -64,6 +102,13 @@ type SnapshotFile struct {
Index uint64 `json:"index"` // highest index applied
}
// SnapshotFiles represents a sortable list of snapshot files.
// Sorting is by filename only; sort.Sort on this type produces the
// canonical order used when writing and merging snapshots.
type SnapshotFiles []SnapshotFile

// sort.Interface implementation ordering files by ascending name.
func (p SnapshotFiles) Len() int { return len(p) }
func (p SnapshotFiles) Less(i, j int) bool { return p[i].Name < p[j].Name }
func (p SnapshotFiles) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// SnapshotReader reads a snapshot from a Reader.
// This type is not safe for concurrent use.
type SnapshotReader struct {
@ -147,6 +192,215 @@ func (sr *SnapshotReader) Read(b []byte) (n int, err error) {
return sr.tr.Read(b)
}
// SnapshotsReader reads from a collection of snapshots.
// Only files with the highest index are read from the reader.
// This type is not safe for concurrent use.
type SnapshotsReader struct {
	readers []*SnapshotReader // underlying snapshot readers
	files   []*SnapshotFile   // current file for each reader; nil once exhausted or consumed
	snapshot *Snapshot        // combined snapshot from all readers (built lazily, then cached)
	index    int              // index of file in snapshot to read; starts at -1
	curr     *SnapshotReader  // current reader that Read() delegates to
}
// NewSnapshotsReader returns a new SnapshotsReader reading from a list of readers.
func NewSnapshotsReader(readers ...io.Reader) *SnapshotsReader {
	ssr := &SnapshotsReader{
		readers: make([]*SnapshotReader, 0, len(readers)),
		files:   make([]*SnapshotFile, len(readers)),
		index:   -1, // Next() pre-increments, so begin just before the first file.
	}
	for _, r := range readers {
		ssr.readers = append(ssr.readers, NewSnapshotReader(r))
	}
	return ssr
}
// Snapshot returns the combined snapshot from all readers.
// The result is computed once and cached for later calls.
func (ssr *SnapshotsReader) Snapshot() (*Snapshot, error) {
	// Serve the cached snapshot if it has already been built.
	if ssr.snapshot != nil {
		return ssr.snapshot, nil
	}

	// Fold each reader's snapshot into a combined one, newest files winning.
	combined := &Snapshot{}
	for i, r := range ssr.readers {
		ss, err := r.Snapshot()
		if err != nil {
			return nil, fmt.Errorf("snapshot: idx=%d, err=%s", i, err)
		}
		combined = combined.Merge(ss)
	}

	// Cache and return.
	ssr.snapshot = combined
	return combined, nil
}
// Next returns the next file in the reader, walking the merged
// snapshot's file list in sorted name order. The file's content is then
// available via Read, served by whichever underlying reader holds the
// newest (highest-index) copy. Returns io.EOF after the last file.
func (ssr *SnapshotsReader) Next() (SnapshotFile, error) {
	ss, err := ssr.Snapshot()
	if err != nil {
		return SnapshotFile{}, fmt.Errorf("snapshot: %s", err)
	}

	// Return EOF if there are no more files in snapshot.
	if ssr.index == len(ss.Files)-1 {
		ssr.curr = nil
		return SnapshotFile{}, io.EOF
	}

	// Queue up next files.
	if err := ssr.nextFiles(); err != nil {
		return SnapshotFile{}, fmt.Errorf("next files: %s", err)
	}

	// Increment the file index.
	ssr.index++
	sf := ss.Files[ssr.index]

	// Find the matching reader. Clear other readers.
	var sr *SnapshotReader
	for i, f := range ssr.files {
		if f == nil || f.Name != sf.Name {
			continue
		}

		// Set reader to the first match (same name, size, and index).
		if sr == nil && *f == sf {
			sr = ssr.readers[i]
		}
		// Consume this entry regardless so stale copies are skipped.
		ssr.files[i] = nil
	}

	// Return an error if file doesn't match.
	// This shouldn't happen unless the underlying snapshot is altered.
	if sr == nil {
		return SnapshotFile{}, fmt.Errorf("snapshot file not found in readers: %s", sf.Name)
	}

	// Set current reader.
	ssr.curr = sr

	// Return file.
	return sf, nil
}
// nextFiles queues up a next file for all readers.
// Readers that already have a buffered file, or that are exhausted,
// are left untouched.
func (ssr *SnapshotsReader) nextFiles() error {
	for i, sr := range ssr.readers {
		if ssr.files[i] != nil {
			continue // a file is already buffered for this reader
		}

		sf, err := sr.Next()
		switch {
		case err == io.EOF:
			continue // reader exhausted; slot stays nil
		case err != nil:
			return fmt.Errorf("next: reader=%d, err=%s", i, err)
		}

		// Buffer the file for this reader.
		ssr.files[i] = &sf
	}
	return nil
}
// nextIndex returns the position of the reader whose buffered file
// should be consumed next: the file with the lexicographically smallest
// name, with name ties broken by the highest file index (newest copy).
// Returns -1 if all readers are at EOF.
//
// NOTE(review): this helper appears to be unused — Next() walks the
// merged snapshot's sorted file list instead. Confirm before removing.
func (ssr *SnapshotsReader) nextIndex() int {
	// Scan buffered files for the smallest name; for equal names prefer
	// the copy with the higher file index.
	index := -1
	for i, f := range ssr.files {
		if f == nil {
			continue
		} else if index == -1 {
			index = i
		} else if f.Name < ssr.files[index].Name {
			index = i
		} else if f.Name == ssr.files[index].Name && f.Index > ssr.files[index].Index {
			index = i
		}
	}
	return index
}
// Read reads the current entry in the reader.
// Before the first Next() or after the final file it reports io.EOF.
func (ssr *SnapshotsReader) Read(b []byte) (int, error) {
	if r := ssr.curr; r != nil {
		return r.Read(b)
	}
	return 0, io.EOF
}
// OpenFileSnapshotsReader returns a SnapshotsReader based on the path of the base snapshot.
// Returns the underlying files which need to be closed separately.
func OpenFileSnapshotsReader(path string) (*SnapshotsReader, []io.Closer, error) {
	var readers []io.Reader
	var closers []io.Closer
	if err := func() error {
		// Open original snapshot file.
		f, err := os.Open(path)
		if os.IsNotExist(err) {
			// Return the raw error so callers can check os.IsNotExist.
			return err
		} else if err != nil {
			return fmt.Errorf("open snapshot: %s", err)
		}
		readers = append(readers, f)
		closers = append(closers, f)

		// Open all incremental snapshots (path.0, path.1, ...) until a gap.
		for i := 0; ; i++ {
			filename := path + fmt.Sprintf(".%d", i)
			f, err := os.Open(filename)
			if os.IsNotExist(err) {
				break
			} else if err != nil {
				return fmt.Errorf("open incremental snapshot: file=%s, err=%s", filename, err)
			}
			readers = append(readers, f)
			closers = append(closers, f)
		}

		return nil
	}(); err != nil {
		// Release any files opened before the failure.
		closeAll(closers)
		return nil, nil, err
	}

	// Return the closers so the caller can release the file handles.
	// Previously nil was returned here, which made every caller's
	// "defer closeAll(files)" a no-op and leaked the open files.
	return NewSnapshotsReader(readers...), closers, nil
}
// ReadFileSnapshot returns a Snapshot for a given base snapshot path.
// This snapshot merges all incremental backup snapshots as well.
func ReadFileSnapshot(path string) (*Snapshot, error) {
	// Open a multi-snapshot reader over the base and incremental files.
	ssr, files, err := OpenFileSnapshotsReader(path)
	switch {
	case os.IsNotExist(err):
		// Pass through unwrapped so callers can detect a missing snapshot.
		return nil, err
	case err != nil:
		return nil, fmt.Errorf("open file snapshots reader: %s", err)
	}
	defer closeAll(files)

	// Extract the merged snapshot manifest.
	ss, err := ssr.Snapshot()
	if err != nil {
		return nil, fmt.Errorf("snapshot: %s", err)
	}
	return ss, nil
}
func closeAll(a []io.Closer) {
for _, c := range a {
_ = c.Close()
}
}
// SnapshotWriter writes a snapshot and the underlying files to disk as a tar archive.
type SnapshotWriter struct {
// The snapshot to write from.
@ -198,6 +452,10 @@ func (sw *SnapshotWriter) WriteTo(w io.Writer) (n int64, err error) {
// Close any file writers that aren't required.
sw.closeUnusedWriters()
// Sort snapshot files.
// This is required for combining multiple snapshots together.
sort.Sort(SnapshotFiles(sw.Snapshot.Files))
// Begin writing a tar file to the output.
tw := tar.NewWriter(w)
defer tw.Close()

View File

@ -76,6 +76,44 @@ func TestSnapshot_Diff(t *testing.T) {
}
}
// Ensure a snapshot can be merged so that the newest files from the two snapshots are returned.
func TestSnapshot_Merge(t *testing.T) {
	// Table-driven: each case merges s with other and compares the full
	// structure against result. The "keep" comments note which side wins.
	for i, tt := range []struct {
		s      *influxdb.Snapshot // base snapshot
		other  *influxdb.Snapshot // snapshot merged into the base
		result *influxdb.Snapshot // expected merged output, sorted by name
	}{
		// 0. Mixed higher, lower, equal indices.
		{
			s: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{
				{Name: "a", Size: 10, Index: 1},
				{Name: "b", Size: 10, Index: 10}, // keep: same, first
				{Name: "c", Size: 10, Index: 21}, // keep: higher
				{Name: "e", Size: 10, Index: 15}, // keep: higher
			}},
			other: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{
				{Name: "a", Size: 20, Index: 2}, // keep: higher
				{Name: "b", Size: 20, Index: 10},
				{Name: "c", Size: 20, Index: 11},
				{Name: "d", Size: 20, Index: 14}, // keep: new
				{Name: "e", Size: 20, Index: 12},
			}},
			result: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{
				{Name: "a", Size: 20, Index: 2},
				{Name: "b", Size: 10, Index: 10},
				{Name: "c", Size: 10, Index: 21},
				{Name: "d", Size: 20, Index: 14},
				{Name: "e", Size: 10, Index: 15},
			}},
		},
	} {
		result := tt.s.Merge(tt.other)
		if !reflect.DeepEqual(tt.result, result) {
			t.Errorf("%d. mismatch:\n\nexp=%#v\n\ngot=%#v", i, tt.result, result)
		}
	}
}
// Ensure a snapshot writer can write a set of files to an archive
func TestSnapshotWriter(t *testing.T) {
// Create a new writer with a snapshot and file writers.
@ -163,6 +201,75 @@ func TestSnapshotWriter_CloseUnused(t *testing.T) {
}
}
// Ensure a SnapshotsReader can read from multiple snapshots.
// Two archives are written: the second holds a newer "meta" (index 20 vs 12)
// plus "shards/2", so the merged stream should yield the newer meta, then
// shards/1 from the first archive, then shards/2 from the second.
func TestSnapshotsReader(t *testing.T) {
	var sw *influxdb.SnapshotWriter
	bufs := make([]bytes.Buffer, 2) // one tar archive per snapshot

	// Snapshot #1: older meta plus shards/1.
	sw = influxdb.NewSnapshotWriter()
	sw.Snapshot.Files = []influxdb.SnapshotFile{
		{Name: "meta", Size: 3, Index: 12},
		{Name: "shards/1", Size: 5, Index: 15},
	}
	sw.FileWriters["meta"] = &bufCloser{Buffer: *bytes.NewBufferString("foo")}
	sw.FileWriters["shards/1"] = &bufCloser{Buffer: *bytes.NewBufferString("55555")}
	if _, err := sw.WriteTo(&bufs[0]); err != nil {
		t.Fatal(err)
	} else if err = sw.Close(); err != nil {
		t.Fatal(err)
	}

	// Snapshot #2: newer meta plus shards/2.
	sw = influxdb.NewSnapshotWriter()
	sw.Snapshot.Files = []influxdb.SnapshotFile{
		{Name: "meta", Size: 3, Index: 20},
		{Name: "shards/2", Size: 6, Index: 30},
	}
	sw.FileWriters["meta"] = &bufCloser{Buffer: *bytes.NewBufferString("bar")}
	sw.FileWriters["shards/2"] = &bufCloser{Buffer: *bytes.NewBufferString("666666")}
	if _, err := sw.WriteTo(&bufs[1]); err != nil {
		t.Fatal(err)
	} else if err = sw.Close(); err != nil {
		t.Fatal(err)
	}

	// Read and merge snapshots.
	ssr := influxdb.NewSnapshotsReader(&bufs[0], &bufs[1])

	// Next should be the second meta file (higher index wins).
	if f, err := ssr.Next(); err != nil {
		t.Fatalf("unexpected error(meta): %s", err)
	} else if !reflect.DeepEqual(f, influxdb.SnapshotFile{Name: "meta", Size: 3, Index: 20}) {
		t.Fatalf("file mismatch(meta): %#v", f)
	} else if b := MustReadAll(ssr); string(b) != `bar` {
		t.Fatalf("unexpected file(meta): %s", b)
	}

	// Next should be shards/1.
	if f, err := ssr.Next(); err != nil {
		t.Fatalf("unexpected error(shards/1): %s", err)
	} else if !reflect.DeepEqual(f, influxdb.SnapshotFile{Name: "shards/1", Size: 5, Index: 15}) {
		t.Fatalf("file mismatch(shards/1): %#v", f)
	} else if b := MustReadAll(ssr); string(b) != `55555` {
		t.Fatalf("unexpected file(shards/1): %s", b)
	}

	// Next should be shards/2.
	if f, err := ssr.Next(); err != nil {
		t.Fatalf("unexpected error(shards/2): %s", err)
	} else if !reflect.DeepEqual(f, influxdb.SnapshotFile{Name: "shards/2", Size: 6, Index: 30}) {
		t.Fatalf("file mismatch(shards/2): %#v", f)
	} else if b := MustReadAll(ssr); string(b) != `666666` {
		t.Fatalf("unexpected file(shards/2): %s", b)
	}

	// Check for end of snapshot.
	if _, err := ssr.Next(); err != io.EOF {
		t.Fatalf("expected EOF: %s", err)
	}
}
// bufCloser adds a Close() method to a bytes.Buffer
type bufCloser struct {
bytes.Buffer