232 lines
5.4 KiB
Go
232 lines
5.4 KiB
Go
package backup_util
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"sync/atomic"
|
|
|
|
internal "github.com/influxdata/influxdb/cmd/influxd/backup_util/internal"
|
|
errors2 "github.com/influxdata/influxdb/pkg/errors"
|
|
"github.com/influxdata/influxdb/services/snapshotter"
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
//go:generate protoc --go_out=./internal internal/backup_util.proto
|
|
|
|
const (
	// Suffix is appended to the name of a backup while that backup is
	// still in progress.
	Suffix = ".pending"

	// Metafile is the base name used for metastore backups.
	Metafile = "meta"

	// BackupFilePattern is the leading part of a backup file name. Complete
	// names follow the scheme <database>.<retention>.<shardID>.<increment>;
	// this format string covers the first three components.
	BackupFilePattern = "%s.%s.%05d"

	// PortableFileNamePattern is written in the form of a Go reference-time
	// layout (compact date, "T", compact time, literal "Z").
	// NOTE(review): presumably used to timestamp portable backup file
	// names; confirm against callers.
	PortableFileNamePattern = "20060102T150405Z"
)
|
|
|
|
// PortablePacker pairs an opaque byte payload with a maximum node ID. Its
// MarshalBinary and UnmarshalBinary methods convert it to and from the
// protobuf message internal.PortableData.
type PortablePacker struct {
	// Data is the raw payload carried by the packer.
	Data []byte
	// MaxNodeID is stored alongside Data in the encoded message.
	MaxNodeID uint64
}
|
|
|
|
func (ep PortablePacker) MarshalBinary() ([]byte, error) {
|
|
ed := internal.PortableData{Data: ep.Data, MaxNodeID: &ep.MaxNodeID}
|
|
return proto.Marshal(&ed)
|
|
}
|
|
|
|
func (ep *PortablePacker) UnmarshalBinary(data []byte) error {
|
|
var pb internal.PortableData
|
|
if err := proto.Unmarshal(data, &pb); err != nil {
|
|
return err
|
|
}
|
|
|
|
ep.Data = pb.GetData()
|
|
ep.MaxNodeID = pb.GetMaxNodeID()
|
|
return nil
|
|
}
|
|
|
|
func GetMetaBytes(fname string) (_ []byte, retErr error) {
|
|
f, err := os.Open(fname)
|
|
if err != nil {
|
|
return []byte{}, err
|
|
}
|
|
defer errors2.Capture(&retErr, f.Close)()
|
|
|
|
var buf bytes.Buffer
|
|
if _, err := io.Copy(&buf, f); err != nil {
|
|
return []byte{}, fmt.Errorf("copy: %s", err)
|
|
}
|
|
|
|
b := buf.Bytes()
|
|
var i int
|
|
|
|
// Make sure the file is actually a meta store backup file
|
|
magic := binary.BigEndian.Uint64(b[:8])
|
|
if magic != snapshotter.BackupMagicHeader {
|
|
return []byte{}, fmt.Errorf("invalid metadata file")
|
|
}
|
|
i += 8
|
|
|
|
// Size of the meta store bytes
|
|
length := int(binary.BigEndian.Uint64(b[i : i+8]))
|
|
i += 8
|
|
metaBytes := b[i : i+length]
|
|
|
|
return metaBytes, nil
|
|
}
|
|
|
|
// Manifest lists the meta and shard file information contained in the backup.
|
|
// If Limited is false, the manifest contains a full backup, otherwise
|
|
// it is a partial backup.
|
|
type Manifest struct {
|
|
Meta MetaEntry `json:"meta"`
|
|
Limited bool `json:"limited"`
|
|
Files []Entry `json:"files"`
|
|
|
|
// If limited is true, then one (or all) of the following fields will be set
|
|
|
|
Database string `json:"database,omitempty"`
|
|
Policy string `json:"policy,omitempty"`
|
|
ShardID uint64 `json:"shard_id,omitempty"`
|
|
}
|
|
|
|
// Entry holds the information recorded for a single backed-up shard file.
type Entry struct {
	Database     string `json:"database"`
	Policy       string `json:"policy"`
	ShardID      uint64 `json:"shardID"`
	FileName     string `json:"fileName"`
	Size         int64  `json:"size"`
	LastModified int64  `json:"lastModified"`
}

// SizeOrZero returns the entry's Size. It is safe to call on a nil *Entry,
// in which case it returns 0.
func (e *Entry) SizeOrZero() int64 {
	if e != nil {
		return e.Size
	}
	return 0
}
|
|
|
|
// MetaEntry records the meta store file that belongs to a backup: the
// file's name and its size.
type MetaEntry struct {
	FileName string `json:"fileName"`
	Size     int64  `json:"size"`
}
|
|
|
|
// Size returns the size of the manifest.
|
|
func (m *Manifest) Size() int64 {
|
|
if m == nil {
|
|
return 0
|
|
}
|
|
|
|
size := m.Meta.Size
|
|
|
|
for _, f := range m.Files {
|
|
size += f.Size
|
|
}
|
|
return size
|
|
}
|
|
|
|
func (manifest *Manifest) Save(filename string) error {
|
|
b, err := json.MarshalIndent(manifest, "", " ")
|
|
if err != nil {
|
|
return fmt.Errorf("create manifest: %v", err)
|
|
}
|
|
|
|
return os.WriteFile(filename, b, 0600)
|
|
}
|
|
|
|
// LoadIncremental loads multiple manifest files from a given directory.
|
|
func LoadIncremental(dir string) (*MetaEntry, map[uint64]*Entry, error) {
|
|
manifests, err := filepath.Glob(filepath.Join(dir, "*.manifest"))
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
shards := make(map[uint64]*Entry)
|
|
|
|
if len(manifests) == 0 {
|
|
return nil, shards, nil
|
|
}
|
|
|
|
sort.Sort(sort.Reverse(sort.StringSlice(manifests)))
|
|
var metaEntry MetaEntry
|
|
|
|
for _, fileName := range manifests {
|
|
fi, err := os.Stat(fileName)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
if fi.IsDir() {
|
|
continue
|
|
}
|
|
|
|
f, err := os.Open(fileName)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
var manifest Manifest
|
|
err = json.NewDecoder(f).Decode(&manifest)
|
|
f.Close()
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("read manifest: %v", err)
|
|
}
|
|
|
|
// sorted (descending) above, so first manifest is most recent
|
|
if metaEntry.FileName == "" {
|
|
metaEntry = manifest.Meta
|
|
}
|
|
|
|
for i := range manifest.Files {
|
|
sh := manifest.Files[i]
|
|
if _, err := os.Stat(filepath.Join(dir, sh.FileName)); err != nil {
|
|
continue
|
|
}
|
|
|
|
e := shards[sh.ShardID]
|
|
if e == nil || sh.LastModified > e.LastModified {
|
|
shards[sh.ShardID] = &sh
|
|
}
|
|
}
|
|
}
|
|
|
|
return &metaEntry, shards, nil
|
|
}
|
|
|
|
// CountingWriter wraps an io.Writer and keeps a running total of the bytes
// the wrapped writer reports as written.
type CountingWriter struct {
	io.Writer
	// Total is the number of bytes transferred so far. The methods of this
	// type update and read it through sync/atomic.
	Total int64
}

// Write forwards p to the embedded Writer, adds the number of bytes that
// writer reports to Total, and returns the writer's results unchanged. The
// count is added even when the underlying write returns an error.
func (w *CountingWriter) Write(p []byte) (n int, err error) {
	written, writeErr := w.Writer.Write(p)
	atomic.AddInt64(&w.Total, int64(written))
	return written, writeErr
}

// BytesWritten returns the current value of Total using an atomic load.
func (w *CountingWriter) BytesWritten() int64 {
	return atomic.LoadInt64(&w.Total)
}
|
|
|
|
// DBRetentionAndShardFromPath splits a shard-relative path into its three
// components: database name, retention policy name and shard ID, in that
// order. The path is split on the OS path separator and must consist of
// exactly three elements; otherwise an error is returned and the three
// strings are empty.
func DBRetentionAndShardFromPath(path string) (db, retention, shard string, err error) {
	parts := strings.Split(path, string(filepath.Separator))
	if len(parts) == 3 {
		return parts[0], parts[1], parts[2], nil
	}

	return "", "", "", fmt.Errorf("expected database, retention policy, and shard id in path: %s", path)
}
|