influxdb/http/restore_service.go

396 lines
11 KiB
Go

package http
import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"time"
"github.com/influxdata/httprouter"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/authorizer"
context2 "github.com/influxdata/influxdb/v2/context"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
"github.com/influxdata/influxdb/v2/kit/tracing"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
"github.com/influxdata/influxdb/v2/v1/services/meta"
"go.uber.org/zap"
)
// RestoreBackend is all services and associated parameters required to construct the RestoreHandler.
type RestoreBackend struct {
Logger *zap.Logger
errors.HTTPErrorHandler
RestoreService influxdb.RestoreService
SqlBackupRestoreService influxdb.SqlBackupRestoreService
BucketService influxdb.BucketService
AuthorizationService influxdb.AuthorizationService
}
// NewRestoreBackend returns a new instance of RestoreBackend.
func NewRestoreBackend(b *APIBackend) *RestoreBackend {
return &RestoreBackend{
Logger: b.Logger.With(zap.String("handler", "restore")),
HTTPErrorHandler: b.HTTPErrorHandler,
RestoreService: b.RestoreService,
SqlBackupRestoreService: b.SqlBackupRestoreService,
BucketService: b.BucketService,
AuthorizationService: b.AuthorizationService,
}
}
// RestoreHandler is http handler for restore service.
type RestoreHandler struct {
*httprouter.Router
api *kithttp.API
errors.HTTPErrorHandler
Logger *zap.Logger
RestoreService influxdb.RestoreService
SqlBackupRestoreService influxdb.SqlBackupRestoreService
BucketService influxdb.BucketService
AuthorizationService influxdb.AuthorizationService
}
const (
prefixRestore = "/api/v2/restore"
restoreKVPath = prefixRestore + "/kv"
restoreSqlPath = prefixRestore + "/sql"
restoreShardPath = prefixRestore + "/shards/:shardID"
restoreBucketPath = prefixRestore + "/buckets/:bucketID" // Deprecated. Used by 2.0.x clients.
restoreBucketMetadataDeprecatedPath = prefixRestore + "/bucket-metadata" // Deprecated. Used by 2.1.0 of the CLI
restoreBucketMetadataPath = prefixRestore + "/bucketMetadata"
)
// NewRestoreHandler creates a new handler at /api/v2/restore to receive restore requests.
func NewRestoreHandler(b *RestoreBackend) *RestoreHandler {
h := &RestoreHandler{
HTTPErrorHandler: b.HTTPErrorHandler,
Router: NewRouter(b.HTTPErrorHandler),
Logger: b.Logger,
RestoreService: b.RestoreService,
SqlBackupRestoreService: b.SqlBackupRestoreService,
BucketService: b.BucketService,
AuthorizationService: b.AuthorizationService,
api: kithttp.NewAPI(kithttp.WithLog(b.Logger)),
}
h.HandlerFunc(http.MethodPost, restoreKVPath, h.handleRestoreKVStore)
h.HandlerFunc(http.MethodPost, restoreSqlPath, h.handleRestoreSqlStore)
h.HandlerFunc(http.MethodPost, restoreBucketPath, h.handleRestoreBucket)
h.HandlerFunc(http.MethodPost, restoreBucketMetadataDeprecatedPath, h.handleRestoreBucketMetadata)
h.HandlerFunc(http.MethodPost, restoreBucketMetadataPath, h.handleRestoreBucketMetadata)
h.HandlerFunc(http.MethodPost, restoreShardPath, h.handleRestoreShard)
return h
}
func (h *RestoreHandler) getOperatorToken(ctx context.Context) (influxdb.Authorization, error) {
// Get the token post-restore
auths, _, err := h.AuthorizationService.FindAuthorizations(ctx, influxdb.AuthorizationFilter{})
if err != nil {
return influxdb.Authorization{}, err
}
var operToken *influxdb.Authorization
for _, a := range auths {
authCtx := context.Background()
authCtx = context2.SetAuthorizer(authCtx, a)
if authorizer.IsAllowedAll(authCtx, influxdb.OperPermissions()) == nil {
operToken = a
break
}
}
if operToken == nil {
return influxdb.Authorization{}, fmt.Errorf("invalid backup without an operator token, consider editing the BoltDB in the backup with 'influxd recovery'")
}
return *operToken, nil
}
func (h *RestoreHandler) handleRestoreKVStore(w http.ResponseWriter, r *http.Request) {
span, r := tracing.ExtractFromHTTPRequest(r, "RestoreHandler.handleRestoreKVStore")
defer span.Finish()
ctx := r.Context()
var kvBytes io.Reader = r.Body
if r.Header.Get("Content-Encoding") == "gzip" {
gzr, err := gzip.NewReader(kvBytes)
if err != nil {
err = &errors.Error{
Code: errors.EInvalid,
Msg: "failed to decode gzip request body",
Err: err,
}
h.HandleHTTPError(ctx, err, w)
}
defer gzr.Close()
kvBytes = gzr
}
if err := h.RestoreService.RestoreKVStore(ctx, kvBytes); err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
// Get the token post-restore
operatorToken, err := h.getOperatorToken(ctx)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
// Return the new token to the caller so it can continue the restore
response := make(map[string]string)
response["token"] = operatorToken.Token
h.api.Respond(w, r, http.StatusOK, response)
}
func (h *RestoreHandler) handleRestoreSqlStore(w http.ResponseWriter, r *http.Request) {
span, r := tracing.ExtractFromHTTPRequest(r, "RestoreHandler.handleRestoreSqlStore")
defer span.Finish()
ctx := r.Context()
var sqlBytes io.Reader = r.Body
if r.Header.Get("Content-Encoding") == "gzip" {
gzr, err := gzip.NewReader(sqlBytes)
if err != nil {
err = &errors.Error{
Code: errors.EInvalid,
Msg: "failed to decode gzip request body",
Err: err,
}
h.HandleHTTPError(ctx, err, w)
}
defer gzr.Close()
sqlBytes = gzr
}
if err := h.SqlBackupRestoreService.RestoreSqlStore(ctx, sqlBytes); err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
}
func (h *RestoreHandler) handleRestoreBucket(w http.ResponseWriter, r *http.Request) {
span, r := tracing.ExtractFromHTTPRequest(r, "RestoreHandler.handleRestoreBucket")
defer span.Finish()
ctx := r.Context()
// Read bucket ID.
bucketID, err := decodeIDFromCtx(r.Context(), "bucketID")
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
// Read serialized DBI data.
buf, err := io.ReadAll(r.Body)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
shardIDMap, err := h.RestoreService.RestoreBucket(ctx, bucketID, buf)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
if err := json.NewEncoder(w).Encode(shardIDMap); err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
}
func (h *RestoreHandler) handleRestoreBucketMetadata(w http.ResponseWriter, r *http.Request) {
span, r := tracing.ExtractFromHTTPRequest(r, "RestoreHandler.handleRestoreBucketMetadata")
defer span.Finish()
ctx := r.Context()
var b influxdb.BucketMetadataManifest
if err := h.api.DecodeJSON(r.Body, &b); err != nil {
h.api.Err(w, r, err)
return
}
// Create the bucket - This will fail if the bucket already exists.
// TODO: Could we support restoring to an existing bucket?
var description string
if b.Description != nil {
description = *b.Description
}
var rp, sgd time.Duration
if len(b.RetentionPolicies) > 0 {
policy := b.RetentionPolicies[0]
rp = policy.Duration
sgd = policy.ShardGroupDuration
}
bkt := influxdb.Bucket{
OrgID: b.OrganizationID,
Name: b.BucketName,
Description: description,
RetentionPeriod: rp,
ShardGroupDuration: sgd,
}
if err := h.BucketService.CreateBucket(ctx, &bkt); err != nil {
h.api.Err(w, r, err)
return
}
// Restore shard-level metadata for the new bucket.
// TODO: It's silly to marshal the DBI into binary here only to unmarshal it again within
// the RestoreService, but it's the easiest way to share code with the 2.0.x restore API
// and avoid introducing a circular dependency on the `meta` package.
// When we reach a point where we feel comfortable deleting the 2.0.x endpoints, consider
// refactoring this to pass a struct directly instead of the marshalled bytes.
dbi := manifestToDbInfo(b)
rawDbi, err := dbi.MarshalBinary()
if err != nil {
h.api.Err(w, r, err)
return
}
shardIDMap, err := h.RestoreService.RestoreBucket(ctx, bkt.ID, rawDbi)
if err != nil {
h.Logger.Warn("Cleaning up after failed bucket-restore", zap.String("bucket_id", bkt.ID.String()))
if err2 := h.BucketService.DeleteBucket(ctx, bkt.ID); err2 != nil {
h.Logger.Error("Failed to clean up bucket after failed restore",
zap.String("bucket_id", bkt.ID.String()), zap.Error(err2))
}
h.api.Err(w, r, err)
return
}
res := influxdb.RestoredBucketMappings{
ID: bkt.ID,
Name: bkt.Name,
ShardMappings: make([]influxdb.RestoredShardMapping, 0, len(shardIDMap)),
}
for old, new := range shardIDMap {
res.ShardMappings = append(res.ShardMappings, influxdb.RestoredShardMapping{OldId: old, NewId: new})
}
h.api.Respond(w, r, http.StatusCreated, res)
}
func manifestToDbInfo(m influxdb.BucketMetadataManifest) meta.DatabaseInfo {
dbi := meta.DatabaseInfo{
Name: m.BucketName,
DefaultRetentionPolicy: m.DefaultRetentionPolicy,
RetentionPolicies: make([]meta.RetentionPolicyInfo, len(m.RetentionPolicies)),
}
for i, rp := range m.RetentionPolicies {
dbi.RetentionPolicies[i] = manifestToRpInfo(rp)
}
return dbi
}
func manifestToRpInfo(m influxdb.RetentionPolicyManifest) meta.RetentionPolicyInfo {
rpi := meta.RetentionPolicyInfo{
Name: m.Name,
ReplicaN: m.ReplicaN,
Duration: m.Duration,
ShardGroupDuration: m.ShardGroupDuration,
ShardGroups: make([]meta.ShardGroupInfo, len(m.ShardGroups)),
Subscriptions: make([]meta.SubscriptionInfo, len(m.Subscriptions)),
}
for i, sg := range m.ShardGroups {
rpi.ShardGroups[i] = manifestToSgInfo(sg)
}
for i, s := range m.Subscriptions {
rpi.Subscriptions[i] = meta.SubscriptionInfo{
Name: s.Name,
Mode: s.Mode,
Destinations: s.Destinations,
}
}
return rpi
}
func manifestToSgInfo(m influxdb.ShardGroupManifest) meta.ShardGroupInfo {
var delAt, truncAt time.Time
if m.DeletedAt != nil {
delAt = *m.DeletedAt
}
if m.TruncatedAt != nil {
truncAt = *m.TruncatedAt
}
sgi := meta.ShardGroupInfo{
ID: m.ID,
StartTime: m.StartTime,
EndTime: m.EndTime,
DeletedAt: delAt,
TruncatedAt: truncAt,
Shards: make([]meta.ShardInfo, len(m.Shards)),
}
for i, sh := range m.Shards {
sgi.Shards[i] = manifestToShardInfo(sh)
}
return sgi
}
func manifestToShardInfo(m influxdb.ShardManifest) meta.ShardInfo {
si := meta.ShardInfo{
ID: m.ID,
Owners: make([]meta.ShardOwner, len(m.ShardOwners)),
}
for i, so := range m.ShardOwners {
si.Owners[i] = meta.ShardOwner{NodeID: so.NodeID}
}
return si
}
func (h *RestoreHandler) handleRestoreShard(w http.ResponseWriter, r *http.Request) {
span, r := tracing.ExtractFromHTTPRequest(r, "RestoreHandler.handleRestoreShard")
defer span.Finish()
ctx := r.Context()
params := httprouter.ParamsFromContext(ctx)
shardID, err := strconv.ParseUint(params.ByName("shardID"), 10, 64)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
var tsmBytes io.Reader = r.Body
if r.Header.Get("Content-Encoding") == "gzip" {
gzr, err := gzip.NewReader(tsmBytes)
if err != nil {
err = &errors.Error{
Code: errors.EInvalid,
Msg: "failed to decode gzip request body",
Err: err,
}
h.HandleHTTPError(ctx, err, w)
}
defer gzr.Close()
tsmBytes = gzr
}
if err := h.RestoreService.RestoreShard(ctx, shardID, tsmBytes); err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
}