influxdb/services/snapshotter/client.go

214 lines
5.2 KiB
Go

package snapshotter
import (
"archive/tar"
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"path/filepath"
"strconv"
"strings"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tcp"
)
// Client provides an API for the snapshotter service.
type Client struct {
host string
}
// NewClient returns a new *Client.
func NewClient(host string) *Client {
return &Client{host: host}
}
// UpdateMeta takes a request object, writes a Base64 encoding to the tcp connection, and then sends the request to the snapshotter service.
// returns a mapping of the uploaded metadata shardID's to actual shardID's on the destination system.
func (c *Client) UpdateMeta(req *Request, upStream io.Reader) (map[uint64]uint64, error) {
var err error
// Connect to snapshotter service.
conn, err := tcp.Dial("tcp", c.host, MuxHeader)
if err != nil {
return nil, err
}
defer conn.Close()
if _, err := conn.Write([]byte{byte(req.Type)}); err != nil {
return nil, err
}
if err := json.NewEncoder(conn).Encode(req); err != nil {
return nil, fmt.Errorf("encode snapshot request: %s", err)
}
if n, err := io.Copy(conn, upStream); (err != nil && err != io.EOF) || n != req.UploadSize {
return nil, fmt.Errorf("error uploading file: err=%v, n=%d, uploadSize: %d", err, n, req.UploadSize)
}
resp, err := io.ReadAll(conn)
if err != nil || len(resp) == 0 {
return nil, fmt.Errorf("updating metadata on influxd service failed: err=%v, n=%d", err, len(resp))
}
if len(resp) < 16 {
return nil, fmt.Errorf("response too short to be a metadata update response: %d", len(resp))
}
header, npairs, err := decodeUintPair(resp[:16])
if err != nil {
return nil, err
}
if npairs == 0 {
return nil, fmt.Errorf("DB metadata not changed. database may already exist")
}
pairs := resp[16:]
if header != BackupMagicHeader {
return nil, fmt.Errorf("Response did not contain the proper header tag")
}
if uint64(len(pairs)) != npairs*16 {
return nil, fmt.Errorf("expected an even number of integer pairs in update meta repsonse")
}
shardIDMap := make(map[uint64]uint64)
for i := 0; i < int(npairs); i++ {
offset := i * 16
k, v, err := decodeUintPair(pairs[offset : offset+16])
if err != nil {
return nil, err
}
shardIDMap[k] = v
}
return shardIDMap, nil
}
func decodeUintPair(bits []byte) (uint64, uint64, error) {
if len(bits) != 16 {
return 0, 0, errors.New("slice must have exactly 16 bytes")
}
v1 := binary.BigEndian.Uint64(bits[:8])
v2 := binary.BigEndian.Uint64(bits[8:16])
return v1, v2, nil
}
// UploadShard writes a snapshot of the data to disk at the location specified by destinationDatabase and restoreRetention
func (c *Client) UploadShard(shardID, newShardID uint64, destinationDatabase, restoreRetention string, tr *tar.Reader) error {
conn, err := tcp.Dial("tcp", c.host, MuxHeader)
if err != nil {
return err
}
defer conn.Close()
var shardBytes [9]byte
shardBytes[0] = byte(RequestShardUpdate)
binary.BigEndian.PutUint64(shardBytes[1:], newShardID)
if _, err := conn.Write(shardBytes[:]); err != nil {
return err
}
tw := tar.NewWriter(conn)
defer tw.Close()
for {
hdr, err := tr.Next()
if err == io.EOF {
break
} else if err != nil {
return err
}
names := strings.Split(filepath.FromSlash(hdr.Name), string(filepath.Separator))
if len(names) < 4 {
return fmt.Errorf("error parsing file name from shard tarfile: %s", hdr.Name)
}
if destinationDatabase == "" {
destinationDatabase = names[0]
}
if restoreRetention == "" {
restoreRetention = names[1]
}
filepathArgs := []string{destinationDatabase, restoreRetention, strconv.FormatUint(newShardID, 10)}
filepathArgs = append(filepathArgs, names[3:]...)
hdr.Name = filepath.ToSlash(filepath.Join(filepathArgs...))
if err := tw.WriteHeader(hdr); err != nil {
return err
}
if _, err := io.Copy(tw, tr); err != nil {
return err
}
}
return nil
}
// MetastoreBackup returns a snapshot of the meta store.
func (c *Client) MetastoreBackup() (*meta.Data, error) {
req := &Request{
Type: RequestMetastoreBackup,
}
b, err := c.doRequest(req)
if err != nil {
return nil, err
}
// Check the magic.
magic := binary.BigEndian.Uint64(b[:8])
if magic != BackupMagicHeader {
return nil, errors.New("invalid metadata received")
}
i := 8
// Size of the meta store bytes.
length := int(binary.BigEndian.Uint64(b[i : i+8]))
i += 8
metaBytes := b[i : i+length]
// Unpack meta data.
var data meta.Data
if err := data.UnmarshalBinary(metaBytes); err != nil {
return nil, fmt.Errorf("unmarshal: %s", err)
}
return &data, nil
}
// doRequest sends a request to the snapshotter service and returns the result.
func (c *Client) doRequest(req *Request) ([]byte, error) {
// Connect to snapshotter service.
conn, err := tcp.Dial("tcp", c.host, MuxHeader)
if err != nil {
return nil, err
}
defer conn.Close()
// Write the request
_, err = conn.Write([]byte{byte(req.Type)})
if err != nil {
return nil, err
}
if err := json.NewEncoder(conn).Encode(req); err != nil {
return nil, fmt.Errorf("encode snapshot request: %s", err)
}
// Read snapshot from the connection
var buf bytes.Buffer
_, err = io.Copy(&buf, conn)
return buf.Bytes(), err
}