214 lines
5.2 KiB
Go
214 lines
5.2 KiB
Go
package snapshotter
|
|
|
|
import (
|
|
"archive/tar"
|
|
"bytes"
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/influxdata/influxdb/services/meta"
|
|
"github.com/influxdata/influxdb/tcp"
|
|
)
|
|
|
|
// Client provides an API for the snapshotter service.
|
|
type Client struct {
|
|
host string
|
|
}
|
|
|
|
// NewClient returns a new *Client.
|
|
func NewClient(host string) *Client {
|
|
return &Client{host: host}
|
|
}
|
|
|
|
// UpdateMeta takes a request object, writes a Base64 encoding to the tcp connection, and then sends the request to the snapshotter service.
|
|
// returns a mapping of the uploaded metadata shardID's to actual shardID's on the destination system.
|
|
func (c *Client) UpdateMeta(req *Request, upStream io.Reader) (map[uint64]uint64, error) {
|
|
var err error
|
|
|
|
// Connect to snapshotter service.
|
|
conn, err := tcp.Dial("tcp", c.host, MuxHeader)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer conn.Close()
|
|
|
|
if _, err := conn.Write([]byte{byte(req.Type)}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := json.NewEncoder(conn).Encode(req); err != nil {
|
|
return nil, fmt.Errorf("encode snapshot request: %s", err)
|
|
}
|
|
|
|
if n, err := io.Copy(conn, upStream); (err != nil && err != io.EOF) || n != req.UploadSize {
|
|
return nil, fmt.Errorf("error uploading file: err=%v, n=%d, uploadSize: %d", err, n, req.UploadSize)
|
|
}
|
|
|
|
resp, err := io.ReadAll(conn)
|
|
if err != nil || len(resp) == 0 {
|
|
return nil, fmt.Errorf("updating metadata on influxd service failed: err=%v, n=%d", err, len(resp))
|
|
}
|
|
|
|
if len(resp) < 16 {
|
|
return nil, fmt.Errorf("response too short to be a metadata update response: %d", len(resp))
|
|
}
|
|
header, npairs, err := decodeUintPair(resp[:16])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if npairs == 0 {
|
|
return nil, fmt.Errorf("DB metadata not changed. database may already exist")
|
|
}
|
|
|
|
pairs := resp[16:]
|
|
|
|
if header != BackupMagicHeader {
|
|
return nil, fmt.Errorf("Response did not contain the proper header tag")
|
|
}
|
|
|
|
if uint64(len(pairs)) != npairs*16 {
|
|
return nil, fmt.Errorf("expected an even number of integer pairs in update meta repsonse")
|
|
}
|
|
|
|
shardIDMap := make(map[uint64]uint64)
|
|
for i := 0; i < int(npairs); i++ {
|
|
offset := i * 16
|
|
k, v, err := decodeUintPair(pairs[offset : offset+16])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
shardIDMap[k] = v
|
|
}
|
|
|
|
return shardIDMap, nil
|
|
}
|
|
|
|
func decodeUintPair(bits []byte) (uint64, uint64, error) {
|
|
if len(bits) != 16 {
|
|
return 0, 0, errors.New("slice must have exactly 16 bytes")
|
|
}
|
|
v1 := binary.BigEndian.Uint64(bits[:8])
|
|
v2 := binary.BigEndian.Uint64(bits[8:16])
|
|
return v1, v2, nil
|
|
}
|
|
|
|
// UploadShard writes a snapshot of the data to disk at the location specified by destinationDatabase and restoreRetention
|
|
func (c *Client) UploadShard(shardID, newShardID uint64, destinationDatabase, restoreRetention string, tr *tar.Reader) error {
|
|
conn, err := tcp.Dial("tcp", c.host, MuxHeader)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer conn.Close()
|
|
|
|
var shardBytes [9]byte
|
|
shardBytes[0] = byte(RequestShardUpdate)
|
|
binary.BigEndian.PutUint64(shardBytes[1:], newShardID)
|
|
if _, err := conn.Write(shardBytes[:]); err != nil {
|
|
return err
|
|
}
|
|
|
|
tw := tar.NewWriter(conn)
|
|
defer tw.Close()
|
|
|
|
for {
|
|
hdr, err := tr.Next()
|
|
if err == io.EOF {
|
|
break
|
|
} else if err != nil {
|
|
return err
|
|
}
|
|
|
|
names := strings.Split(filepath.FromSlash(hdr.Name), string(filepath.Separator))
|
|
|
|
if len(names) < 4 {
|
|
return fmt.Errorf("error parsing file name from shard tarfile: %s", hdr.Name)
|
|
}
|
|
|
|
if destinationDatabase == "" {
|
|
destinationDatabase = names[0]
|
|
}
|
|
|
|
if restoreRetention == "" {
|
|
restoreRetention = names[1]
|
|
}
|
|
|
|
filepathArgs := []string{destinationDatabase, restoreRetention, strconv.FormatUint(newShardID, 10)}
|
|
filepathArgs = append(filepathArgs, names[3:]...)
|
|
hdr.Name = filepath.ToSlash(filepath.Join(filepathArgs...))
|
|
|
|
if err := tw.WriteHeader(hdr); err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := io.Copy(tw, tr); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// MetastoreBackup returns a snapshot of the meta store.
|
|
func (c *Client) MetastoreBackup() (*meta.Data, error) {
|
|
req := &Request{
|
|
Type: RequestMetastoreBackup,
|
|
}
|
|
|
|
b, err := c.doRequest(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Check the magic.
|
|
magic := binary.BigEndian.Uint64(b[:8])
|
|
if magic != BackupMagicHeader {
|
|
return nil, errors.New("invalid metadata received")
|
|
}
|
|
i := 8
|
|
|
|
// Size of the meta store bytes.
|
|
length := int(binary.BigEndian.Uint64(b[i : i+8]))
|
|
i += 8
|
|
metaBytes := b[i : i+length]
|
|
|
|
// Unpack meta data.
|
|
var data meta.Data
|
|
if err := data.UnmarshalBinary(metaBytes); err != nil {
|
|
return nil, fmt.Errorf("unmarshal: %s", err)
|
|
}
|
|
|
|
return &data, nil
|
|
}
|
|
|
|
// doRequest sends a request to the snapshotter service and returns the result.
|
|
func (c *Client) doRequest(req *Request) ([]byte, error) {
|
|
// Connect to snapshotter service.
|
|
conn, err := tcp.Dial("tcp", c.host, MuxHeader)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer conn.Close()
|
|
|
|
// Write the request
|
|
_, err = conn.Write([]byte{byte(req.Type)})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := json.NewEncoder(conn).Encode(req); err != nil {
|
|
return nil, fmt.Errorf("encode snapshot request: %s", err)
|
|
}
|
|
|
|
// Read snapshot from the connection
|
|
var buf bytes.Buffer
|
|
_, err = io.Copy(&buf, conn)
|
|
|
|
return buf.Bytes(), err
|
|
}
|