Add 'ark backup logs' command for retrieval
Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>pull/40/head
parent
9848a7a55b
commit
03dde45c09
|
@ -72,6 +72,21 @@ spec:
|
|||
plural: configs
|
||||
kind: Config
|
||||
|
||||
---
|
||||
apiVersion: apiextensions.k8s.io/v1beta1
|
||||
kind: CustomResourceDefinition
|
||||
metadata:
|
||||
name: downloadrequests.ark.heptio.com
|
||||
labels:
|
||||
component: ark
|
||||
spec:
|
||||
group: ark.heptio.com
|
||||
version: v1
|
||||
scope: Namespaced
|
||||
names:
|
||||
plural: downloadrequests
|
||||
kind: DownloadRequest
|
||||
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Namespace
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
/*
|
||||
Copyright 2017 Heptio Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package v1
|
||||
|
||||
import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
// DownloadRequestSpec is the specification for a download request.
|
||||
type DownloadRequestSpec struct {
|
||||
// Target is what to download (e.g. logs for a backup).
|
||||
Target DownloadTarget `json:"target"`
|
||||
}
|
||||
|
||||
// DownloadTargetKind represents what type of file to download.
|
||||
type DownloadTargetKind string
|
||||
|
||||
const (
|
||||
DownloadTargetKindBackupLog DownloadTargetKind = "BackupLog"
|
||||
)
|
||||
|
||||
// DownloadTarget is the specification for what kind of file to download, and the name of the
|
||||
// resource with which it's associated.
|
||||
type DownloadTarget struct {
|
||||
// Kind is the type of file to download.
|
||||
Kind DownloadTargetKind `json:"kind"`
|
||||
// Name is the name of the kubernetes resource with which the file is associated.
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
// DownloadRequestPhase represents the lifecycle phase of a DownloadRequest.
|
||||
type DownloadRequestPhase string
|
||||
|
||||
const (
|
||||
// DownloadRequestPhaseNew means the DownloadRequest has not been processed by the
|
||||
// DownloadRequestController yet.
|
||||
DownloadRequestPhaseNew DownloadRequestPhase = "New"
|
||||
// DownloadRequestPhaseProcessed means the DownloadRequest has been processed by the
|
||||
// DownloadRequestController.
|
||||
DownloadRequestPhaseProcessed DownloadRequestPhase = "Processed"
|
||||
)
|
||||
|
||||
// DownloadRequestStatus is the current status of a DownloadRequest.
|
||||
type DownloadRequestStatus struct {
|
||||
// Phase is the current state of the DownloadRequest.
|
||||
Phase DownloadRequestPhase `json:"phase"`
|
||||
// DownloadURL contains the pre-signed URL for the target file.
|
||||
DownloadURL string `json:"downloadURL"`
|
||||
// Expiration is when this DownloadRequest expires and can be deleted by the system.
|
||||
Expiration metav1.Time `json:"expiration"`
|
||||
}
|
||||
|
||||
// +genclient=true
|
||||
|
||||
// DownloadRequest is a request to download an artifact from backup object storage, such as a backup
|
||||
// log file.
|
||||
type DownloadRequest struct {
|
||||
metav1.TypeMeta `json:",inline"`
|
||||
metav1.ObjectMeta `json:"metadata"`
|
||||
|
||||
Spec DownloadRequestSpec `json:"spec"`
|
||||
Status DownloadRequestStatus `json:"status,omitempty"`
|
||||
}
|
||||
|
||||
// DownloadRequestList is a list of DownloadRequests.
|
||||
type DownloadRequestList struct {
|
||||
metav1.TypeMeta `json:",inline"`
|
||||
metav1.ListMeta `json:"metadata"`
|
||||
Items []DownloadRequest `json:"items"`
|
||||
}
|
|
@ -51,6 +51,8 @@ func addKnownTypes(scheme *runtime.Scheme) error {
|
|||
&RestoreList{},
|
||||
&Config{},
|
||||
&ConfigList{},
|
||||
&DownloadRequest{},
|
||||
&DownloadRequestList{},
|
||||
)
|
||||
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
|
||||
return nil
|
||||
|
|
|
@ -19,10 +19,12 @@ package aws
|
|||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/endpoints"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/golang/glog"
|
||||
|
||||
"github.com/heptio/ark/pkg/cloudprovider"
|
||||
)
|
||||
|
@ -131,3 +133,13 @@ func (op *objectStorageAdapter) DeleteObject(bucket string, key string) error {
|
|||
|
||||
return err
|
||||
}
|
||||
|
||||
func (op *objectStorageAdapter) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) {
|
||||
glog.V(4).Infof("CreateSignedURL: bucket=%s, key=%s, ttl=%d", bucket, key, ttl)
|
||||
req, _ := op.s3.GetObjectRequest(&s3.GetObjectInput{
|
||||
Bucket: aws.String(bucket),
|
||||
Key: aws.String(key),
|
||||
})
|
||||
|
||||
return req.Presign(ttl)
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-sdk-for-go/storage"
|
||||
|
||||
|
@ -134,6 +135,22 @@ func (op *objectStorageAdapter) DeleteObject(bucket string, key string) error {
|
|||
return blob.Delete(nil)
|
||||
}
|
||||
|
||||
const sasURIReadPermission = "r"
|
||||
|
||||
func (op *objectStorageAdapter) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) {
|
||||
container, err := getContainerReference(op.blobClient, bucket)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
blob, err := getBlobReference(container, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return blob.GetSASURI(time.Now().Add(ttl), sasURIReadPermission)
|
||||
}
|
||||
|
||||
func getContainerReference(blobClient *storage.BlobStorageClient, bucket string) (*storage.Container, error) {
|
||||
container := blobClient.GetContainerReference(bucket)
|
||||
if container == nil {
|
||||
|
|
|
@ -53,6 +53,10 @@ type BackupService interface {
|
|||
|
||||
// GetBackup gets the specified api.Backup from the given bucket in object storage.
|
||||
GetBackup(bucket, name string) (*api.Backup, error)
|
||||
|
||||
// CreateBackupLogSignedURL creates a pre-signed URL that can be used to download a backup's log
|
||||
// file from object storage. The URL expires after ttl.
|
||||
CreateBackupLogSignedURL(bucket, backupName string, ttl time.Duration) (string, error)
|
||||
}
|
||||
|
||||
// BackupGetter knows how to list backups in object storage.
|
||||
|
@ -67,6 +71,18 @@ const (
|
|||
logFileFormatString = "%s/%s.log.gz"
|
||||
)
|
||||
|
||||
func getMetadataKey(backup string) string {
|
||||
return fmt.Sprintf(metadataFileFormatString, backup)
|
||||
}
|
||||
|
||||
func getBackupKey(backup string) string {
|
||||
return fmt.Sprintf(backupFileFormatString, backup, backup)
|
||||
}
|
||||
|
||||
func getBackupLogKey(backup string) string {
|
||||
return fmt.Sprintf(logFileFormatString, backup, backup)
|
||||
}
|
||||
|
||||
type backupService struct {
|
||||
objectStorage ObjectStorageAdapter
|
||||
decoder runtime.Decoder
|
||||
|
@ -85,15 +101,14 @@ func NewBackupService(objectStorage ObjectStorageAdapter) BackupService {
|
|||
|
||||
func (br *backupService) UploadBackup(bucket, backupName string, metadata, backup, log io.ReadSeeker) error {
|
||||
// upload metadata file
|
||||
metadataKey := fmt.Sprintf(metadataFileFormatString, backupName)
|
||||
metadataKey := getMetadataKey(backupName)
|
||||
if err := br.objectStorage.PutObject(bucket, metadataKey, metadata); err != nil {
|
||||
// failure to upload metadata file is a hard-stop
|
||||
return err
|
||||
}
|
||||
|
||||
// upload tar file
|
||||
backupKey := fmt.Sprintf(backupFileFormatString, backupName, backupName)
|
||||
if err := br.objectStorage.PutObject(bucket, backupKey, backup); err != nil {
|
||||
if err := br.objectStorage.PutObject(bucket, getBackupKey(backupName), backup); err != nil {
|
||||
// try to delete the metadata file since the data upload failed
|
||||
deleteErr := br.objectStorage.DeleteObject(bucket, metadataKey)
|
||||
|
||||
|
@ -102,7 +117,7 @@ func (br *backupService) UploadBackup(bucket, backupName string, metadata, backu
|
|||
|
||||
// uploading log file is best-effort; if it fails, we log the error but call the overall upload a
|
||||
// success
|
||||
logKey := fmt.Sprintf(logFileFormatString, backupName, backupName)
|
||||
logKey := getBackupLogKey(backupName)
|
||||
if err := br.objectStorage.PutObject(bucket, logKey, log); err != nil {
|
||||
glog.Errorf("error uploading %s/%s: %v", bucket, logKey, err)
|
||||
}
|
||||
|
@ -111,7 +126,7 @@ func (br *backupService) UploadBackup(bucket, backupName string, metadata, backu
|
|||
}
|
||||
|
||||
func (br *backupService) DownloadBackup(bucket, backupName string) (io.ReadCloser, error) {
|
||||
return br.objectStorage.GetObject(bucket, fmt.Sprintf(backupFileFormatString, backupName, backupName))
|
||||
return br.objectStorage.GetObject(bucket, getBackupKey(backupName))
|
||||
}
|
||||
|
||||
func (br *backupService) GetAllBackups(bucket string) ([]*api.Backup, error) {
|
||||
|
@ -166,17 +181,21 @@ func (br *backupService) GetBackup(bucket, name string) (*api.Backup, error) {
|
|||
}
|
||||
|
||||
func (br *backupService) DeleteBackupFile(bucket, backupName string) error {
|
||||
key := fmt.Sprintf(backupFileFormatString, backupName, backupName)
|
||||
key := getBackupKey(backupName)
|
||||
glog.V(4).Infof("Trying to delete bucket=%s, key=%s", bucket, key)
|
||||
return br.objectStorage.DeleteObject(bucket, key)
|
||||
}
|
||||
|
||||
func (br *backupService) DeleteBackupMetadataFile(bucket, backupName string) error {
|
||||
key := fmt.Sprintf(metadataFileFormatString, backupName)
|
||||
key := getMetadataKey(backupName)
|
||||
glog.V(4).Infof("Trying to delete bucket=%s, key=%s", bucket, key)
|
||||
return br.objectStorage.DeleteObject(bucket, key)
|
||||
}
|
||||
|
||||
func (br *backupService) CreateBackupLogSignedURL(bucket, backupName string, ttl time.Duration) (string, error) {
|
||||
return br.objectStorage.CreateSignedURL(bucket, getBackupLogKey(backupName), ttl)
|
||||
}
|
||||
|
||||
// cachedBackupService wraps a real backup service with a cache for getting cloud backups.
|
||||
type cachedBackupService struct {
|
||||
BackupService
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"io/ioutil"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
|
@ -465,3 +466,7 @@ func (os *fakeObjectStorage) DeleteObject(bucket string, key string) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (os *fakeObjectStorage) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
|
|
@ -17,23 +17,29 @@ limitations under the License.
|
|||
package gcp
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"golang.org/x/oauth2"
|
||||
"golang.org/x/oauth2/google"
|
||||
// TODO switch to using newstorage
|
||||
newstorage "cloud.google.com/go/storage"
|
||||
storage "google.golang.org/api/storage/v1"
|
||||
|
||||
"github.com/heptio/ark/pkg/cloudprovider"
|
||||
)
|
||||
|
||||
type objectStorageAdapter struct {
|
||||
gcs *storage.Service
|
||||
gcs *storage.Service
|
||||
googleAccessID string
|
||||
privateKey []byte
|
||||
}
|
||||
|
||||
var _ cloudprovider.ObjectStorageAdapter = &objectStorageAdapter{}
|
||||
|
||||
func NewObjectStorageAdapter() (cloudprovider.ObjectStorageAdapter, error) {
|
||||
func NewObjectStorageAdapter(googleAccessID string, privateKey []byte) (cloudprovider.ObjectStorageAdapter, error) {
|
||||
client, err := google.DefaultClient(oauth2.NoContext, storage.DevstorageReadWriteScope)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -45,7 +51,9 @@ func NewObjectStorageAdapter() (cloudprovider.ObjectStorageAdapter, error) {
|
|||
}
|
||||
|
||||
return &objectStorageAdapter{
|
||||
gcs: gcs,
|
||||
gcs: gcs,
|
||||
googleAccessID: googleAccessID,
|
||||
privateKey: privateKey,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -87,3 +95,19 @@ func (op *objectStorageAdapter) ListCommonPrefixes(bucket string, delimiter stri
|
|||
func (op *objectStorageAdapter) DeleteObject(bucket string, key string) error {
|
||||
return op.gcs.Objects.Delete(bucket, key).Do()
|
||||
}
|
||||
|
||||
func (op *objectStorageAdapter) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) {
|
||||
if op.googleAccessID == "" {
|
||||
return "", errors.New("unable to create a pre-signed URL - make sure GOOGLE_APPLICATION_CREDENTIALS points to a valid GCE service account file (missing email address)")
|
||||
}
|
||||
if len(op.privateKey) == 0 {
|
||||
return "", errors.New("unable to create a pre-signed URL - make sure GOOGLE_APPLICATION_CREDENTIALS points to a valid GCE service account file (missing private key)")
|
||||
}
|
||||
|
||||
return newstorage.SignedURL(bucket, key, &newstorage.SignedURLOptions{
|
||||
GoogleAccessID: op.googleAccessID,
|
||||
PrivateKey: op.privateKey,
|
||||
Method: "GET",
|
||||
Expires: time.Now().Add(ttl),
|
||||
})
|
||||
}
|
||||
|
|
|
@ -16,7 +16,10 @@ limitations under the License.
|
|||
|
||||
package cloudprovider
|
||||
|
||||
import "io"
|
||||
import (
|
||||
"io"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ObjectStorageAdapter exposes basic object-storage operations required
|
||||
// by Ark.
|
||||
|
@ -37,6 +40,9 @@ type ObjectStorageAdapter interface {
|
|||
// DeleteObject removes object with the specified key from the given
|
||||
// bucket.
|
||||
DeleteObject(bucket string, key string) error
|
||||
|
||||
// CreateSignedURL creates a pre-signed URL for the given bucket and key that expires after ttl.
|
||||
CreateSignedURL(bucket, key string, ttl time.Duration) (string, error)
|
||||
}
|
||||
|
||||
// BlockStorageAdapter exposes basic block-storage operations required
|
||||
|
|
|
@ -32,6 +32,7 @@ func NewCommand(f client.Factory) *cobra.Command {
|
|||
c.AddCommand(
|
||||
NewCreateCommand(f),
|
||||
NewGetCommand(f),
|
||||
NewLogsCommand(f),
|
||||
|
||||
// Will implement describe later
|
||||
// NewDescribeCommand(f),
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
Copyright 2017 Heptio Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package backup
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"github.com/heptio/ark/pkg/apis/ark/v1"
|
||||
"github.com/heptio/ark/pkg/client"
|
||||
"github.com/heptio/ark/pkg/cmd"
|
||||
"github.com/heptio/ark/pkg/cmd/util/downloadrequest"
|
||||
)
|
||||
|
||||
func NewLogsCommand(f client.Factory) *cobra.Command {
|
||||
timeout := time.Minute
|
||||
|
||||
c := &cobra.Command{
|
||||
Use: "logs BACKUP",
|
||||
Short: "Get backup logs",
|
||||
Run: func(c *cobra.Command, args []string) {
|
||||
if len(args) != 1 {
|
||||
err := errors.New("backup name is required")
|
||||
cmd.CheckError(err)
|
||||
}
|
||||
|
||||
arkClient, err := f.Client()
|
||||
cmd.CheckError(err)
|
||||
|
||||
err = downloadrequest.Stream(arkClient.ArkV1(), args[0], v1.DownloadTargetKindBackupLog, os.Stdout, timeout)
|
||||
cmd.CheckError(err)
|
||||
},
|
||||
}
|
||||
|
||||
c.Flags().DurationVar(&timeout, "timeout", timeout, "how long to wait to receive logs")
|
||||
|
||||
return c
|
||||
}
|
|
@ -19,10 +19,14 @@ package server
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/oauth2/google"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
|
@ -305,7 +309,27 @@ func getObjectStorageProvider(cloudConfig api.CloudProviderConfig, field string)
|
|||
cloudConfig.AWS.KMSKeyID,
|
||||
cloudConfig.AWS.S3ForcePathStyle)
|
||||
case cloudConfig.GCP != nil:
|
||||
objectStorage, err = gcp.NewObjectStorageAdapter()
|
||||
var email string
|
||||
var privateKey []byte
|
||||
|
||||
credentialsFile := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
|
||||
if credentialsFile != "" {
|
||||
// Get the email and private key from the credentials file so we can pre-sign download URLs
|
||||
creds, err := ioutil.ReadFile(credentialsFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
jwtConfig, err := google.JWTConfigFromJSON(creds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
email = jwtConfig.Email
|
||||
privateKey = jwtConfig.PrivateKey
|
||||
} else {
|
||||
glog.Warning("GOOGLE_APPLICATION_CREDENTIALS is undefined; some features such as downloading log files will not work")
|
||||
}
|
||||
|
||||
objectStorage, err = gcp.NewObjectStorageAdapter(email, privateKey)
|
||||
case cloudConfig.Azure != nil:
|
||||
objectStorage, err = azure.NewObjectStorageAdapter()
|
||||
}
|
||||
|
@ -465,6 +489,19 @@ func (s *server) runControllers(config *api.Config) error {
|
|||
wg.Done()
|
||||
}()
|
||||
|
||||
downloadRequestController := controller.NewDownloadRequestController(
|
||||
s.arkClient.ArkV1(),
|
||||
s.sharedInformerFactory.Ark().V1().DownloadRequests(),
|
||||
s.sharedInformerFactory.Ark().V1().Backups(),
|
||||
s.backupService,
|
||||
config.BackupStorageProvider.Bucket,
|
||||
)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
downloadRequestController.Run(ctx, 1)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
// SHARED INFORMERS HAVE TO BE STARTED AFTER ALL CONTROLLERS
|
||||
go s.sharedInformerFactory.Start(ctx.Done())
|
||||
|
||||
|
|
|
@ -0,0 +1,135 @@
|
|||
/*
|
||||
Copyright 2017 Heptio Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package downloadrequest
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
|
||||
"github.com/heptio/ark/pkg/apis/ark/v1"
|
||||
arkclientv1 "github.com/heptio/ark/pkg/generated/clientset/typed/ark/v1"
|
||||
)
|
||||
|
||||
func Stream(client arkclientv1.DownloadRequestsGetter, name string, kind v1.DownloadTargetKind, w io.Writer, timeout time.Duration) error {
|
||||
req := &v1.DownloadRequest{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: v1.DefaultNamespace,
|
||||
Name: fmt.Sprintf("%s-%s", name, time.Now().Format("20060102150405")),
|
||||
},
|
||||
Spec: v1.DownloadRequestSpec{
|
||||
Target: v1.DownloadTarget{
|
||||
Kind: kind,
|
||||
Name: name,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
req, err := client.DownloadRequests(v1.DefaultNamespace).Create(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer client.DownloadRequests(v1.DefaultNamespace).Delete(req.Name, nil)
|
||||
|
||||
listOptions := metav1.ListOptions{
|
||||
//TODO: once kube-apiserver http://issue.k8s.io/51046 is fixed, uncomment
|
||||
//FieldSelector: "metadata.name=" + req.Name
|
||||
ResourceVersion: req.ResourceVersion,
|
||||
}
|
||||
watcher, err := client.DownloadRequests(v1.DefaultNamespace).Watch(listOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer watcher.Stop()
|
||||
|
||||
expired := time.NewTimer(timeout)
|
||||
defer expired.Stop()
|
||||
|
||||
Loop:
|
||||
for {
|
||||
select {
|
||||
case <-expired.C:
|
||||
return errors.New("timed out waiting for download URL")
|
||||
case e := <-watcher.ResultChan():
|
||||
updated, ok := e.Object.(*v1.DownloadRequest)
|
||||
if !ok {
|
||||
return fmt.Errorf("unexpected type %T", e.Object)
|
||||
}
|
||||
|
||||
if updated.Name != req.Name {
|
||||
continue
|
||||
}
|
||||
|
||||
switch e.Type {
|
||||
case watch.Deleted:
|
||||
errors.New("download request was unexpectedly deleted")
|
||||
case watch.Modified:
|
||||
if updated.Status.DownloadURL != "" {
|
||||
req = updated
|
||||
break Loop
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if req.Status.DownloadURL == "" {
|
||||
return errors.New("file not found")
|
||||
}
|
||||
|
||||
httpClient := new(http.Client)
|
||||
|
||||
httpReq, err := http.NewRequest("GET", req.Status.DownloadURL, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Manually set this header so the net/http library does not automatically try to decompress. We
|
||||
// need to handle this manually because it's not currently possible to set the MIME type for the
|
||||
// pre-signed URLs for GCP or Azure.
|
||||
httpReq.Header.Set("Accept-Encoding", "gzip")
|
||||
|
||||
resp, err := httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("request failed; unable to decode response body: %v", err)
|
||||
}
|
||||
|
||||
return fmt.Errorf("request failed: %v", string(body))
|
||||
}
|
||||
|
||||
gzipReader, err := gzip.NewReader(resp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer gzipReader.Close()
|
||||
|
||||
_, err = io.Copy(w, gzipReader)
|
||||
return err
|
||||
}
|
|
@ -0,0 +1,224 @@
|
|||
/*
|
||||
Copyright 2017 Heptio Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package downloadrequest
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/heptio/ark/pkg/apis/ark/v1"
|
||||
"github.com/heptio/ark/pkg/generated/clientset/fake"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
core "k8s.io/client-go/testing"
|
||||
)
|
||||
|
||||
func TestStream(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
kind v1.DownloadTargetKind
|
||||
timeout time.Duration
|
||||
createError error
|
||||
watchError error
|
||||
watchAdds []runtime.Object
|
||||
watchModifies []runtime.Object
|
||||
watchDeletes []runtime.Object
|
||||
updateWithURL bool
|
||||
statusCode int
|
||||
body string
|
||||
deleteError error
|
||||
expectedError string
|
||||
}{
|
||||
{
|
||||
name: "error creating req",
|
||||
createError: errors.New("foo"),
|
||||
kind: v1.DownloadTargetKindBackupLog,
|
||||
expectedError: "foo",
|
||||
},
|
||||
{
|
||||
name: "error creating watch",
|
||||
watchError: errors.New("bar"),
|
||||
kind: v1.DownloadTargetKindBackupLog,
|
||||
expectedError: "bar",
|
||||
},
|
||||
{
|
||||
name: "timed out",
|
||||
kind: v1.DownloadTargetKindBackupLog,
|
||||
timeout: time.Millisecond,
|
||||
expectedError: "timed out waiting for download URL",
|
||||
},
|
||||
{
|
||||
name: "unexpected watch type",
|
||||
kind: v1.DownloadTargetKindBackupLog,
|
||||
watchAdds: []runtime.Object{&v1.Backup{}},
|
||||
expectedError: "unexpected type *v1.Backup",
|
||||
},
|
||||
{
|
||||
name: "other requests added/updated/deleted first",
|
||||
kind: v1.DownloadTargetKindBackupLog,
|
||||
watchAdds: []runtime.Object{
|
||||
newDownloadRequest("foo").DownloadRequest,
|
||||
},
|
||||
watchModifies: []runtime.Object{
|
||||
newDownloadRequest("foo").DownloadRequest,
|
||||
},
|
||||
watchDeletes: []runtime.Object{
|
||||
newDownloadRequest("foo").DownloadRequest,
|
||||
},
|
||||
updateWithURL: true,
|
||||
statusCode: http.StatusOK,
|
||||
body: "download body",
|
||||
},
|
||||
{
|
||||
name: "http error",
|
||||
kind: v1.DownloadTargetKindBackupLog,
|
||||
updateWithURL: true,
|
||||
statusCode: http.StatusInternalServerError,
|
||||
body: "some error",
|
||||
expectedError: "request failed: some error",
|
||||
},
|
||||
}
|
||||
|
||||
const testTimeout = 30 * time.Second
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
client := fake.NewSimpleClientset()
|
||||
|
||||
created := make(chan *v1.DownloadRequest, 1)
|
||||
client.PrependReactor("create", "downloadrequests", func(action core.Action) (bool, runtime.Object, error) {
|
||||
createAction := action.(core.CreateAction)
|
||||
created <- createAction.GetObject().(*v1.DownloadRequest)
|
||||
return true, createAction.GetObject(), test.createError
|
||||
})
|
||||
|
||||
fakeWatch := watch.NewFake()
|
||||
client.PrependWatchReactor("downloadrequests", core.DefaultWatchReactor(fakeWatch, test.watchError))
|
||||
|
||||
deleted := make(chan string, 1)
|
||||
client.PrependReactor("delete", "downloadrequests", func(action core.Action) (bool, runtime.Object, error) {
|
||||
deleteAction := action.(core.DeleteAction)
|
||||
deleted <- deleteAction.GetName()
|
||||
return true, nil, test.deleteError
|
||||
})
|
||||
|
||||
timeout := test.timeout
|
||||
if timeout == 0 {
|
||||
timeout = testTimeout
|
||||
}
|
||||
|
||||
var server *httptest.Server
|
||||
var url string
|
||||
if test.updateWithURL {
|
||||
server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
w.WriteHeader(test.statusCode)
|
||||
if test.statusCode == http.StatusOK {
|
||||
gzipWriter := gzip.NewWriter(w)
|
||||
fmt.Fprintf(gzipWriter, test.body)
|
||||
gzipWriter.Close()
|
||||
return
|
||||
}
|
||||
fmt.Fprintf(w, test.body)
|
||||
}))
|
||||
defer server.Close()
|
||||
url = server.URL
|
||||
}
|
||||
|
||||
output := new(bytes.Buffer)
|
||||
errCh := make(chan error)
|
||||
go func() {
|
||||
err := Stream(client.ArkV1(), "name", test.kind, output, timeout)
|
||||
errCh <- err
|
||||
}()
|
||||
|
||||
for i := range test.watchAdds {
|
||||
fakeWatch.Add(test.watchAdds[i])
|
||||
}
|
||||
for i := range test.watchModifies {
|
||||
fakeWatch.Modify(test.watchModifies[i])
|
||||
}
|
||||
for i := range test.watchDeletes {
|
||||
fakeWatch.Delete(test.watchDeletes[i])
|
||||
}
|
||||
|
||||
var createdName string
|
||||
if test.updateWithURL {
|
||||
select {
|
||||
case r := <-created:
|
||||
createdName = r.Name
|
||||
r.Status.DownloadURL = url
|
||||
fakeWatch.Modify(r)
|
||||
case <-time.After(testTimeout):
|
||||
t.Fatalf("created object not received")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
var err error
|
||||
select {
|
||||
case err = <-errCh:
|
||||
case <-time.After(testTimeout):
|
||||
t.Fatal("test timed out")
|
||||
}
|
||||
|
||||
if test.expectedError != "" {
|
||||
require.EqualError(t, err, test.expectedError)
|
||||
return
|
||||
}
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
if test.statusCode != http.StatusOK {
|
||||
assert.EqualError(t, err, "request failed: "+test.body)
|
||||
return
|
||||
}
|
||||
|
||||
assert.Equal(t, test.body, output.String())
|
||||
|
||||
select {
|
||||
case name := <-deleted:
|
||||
assert.Equal(t, createdName, name)
|
||||
default:
|
||||
t.Fatal("download request was not deleted")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type downloadRequest struct {
|
||||
*v1.DownloadRequest
|
||||
}
|
||||
|
||||
func newDownloadRequest(name string) *downloadRequest {
|
||||
return &downloadRequest{
|
||||
DownloadRequest: &v1.DownloadRequest{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: v1.DefaultNamespace,
|
||||
Name: name,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
|
@ -0,0 +1,285 @@
|
|||
/*
|
||||
Copyright 2017 Heptio Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
|
||||
"github.com/heptio/ark/pkg/apis/ark/v1"
|
||||
"github.com/heptio/ark/pkg/cloudprovider"
|
||||
"github.com/heptio/ark/pkg/generated/clientset/scheme"
|
||||
arkv1client "github.com/heptio/ark/pkg/generated/clientset/typed/ark/v1"
|
||||
informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
|
||||
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
|
||||
)
|
||||
|
||||
type downloadRequestController struct {
|
||||
downloadRequestClient arkv1client.DownloadRequestsGetter
|
||||
downloadRequestLister listers.DownloadRequestLister
|
||||
downloadRequestListerSynced cache.InformerSynced
|
||||
backupLister listers.BackupLister
|
||||
backupListerSynced cache.InformerSynced
|
||||
|
||||
backupService cloudprovider.BackupService
|
||||
bucket string
|
||||
|
||||
syncHandler func(key string) error
|
||||
queue workqueue.RateLimitingInterface
|
||||
|
||||
clock clock.Clock
|
||||
}
|
||||
|
||||
// NewDownloadRequestController creates a new DownloadRequestController.
|
||||
func NewDownloadRequestController(
|
||||
downloadRequestClient arkv1client.DownloadRequestsGetter,
|
||||
downloadRequestInformer informers.DownloadRequestInformer,
|
||||
backupInformer informers.BackupInformer,
|
||||
backupService cloudprovider.BackupService,
|
||||
bucket string,
|
||||
) Interface {
|
||||
c := &downloadRequestController{
|
||||
downloadRequestClient: downloadRequestClient,
|
||||
downloadRequestLister: downloadRequestInformer.Lister(),
|
||||
downloadRequestListerSynced: downloadRequestInformer.Informer().HasSynced,
|
||||
backupLister: backupInformer.Lister(),
|
||||
backupListerSynced: backupInformer.Informer().HasSynced,
|
||||
|
||||
backupService: backupService,
|
||||
bucket: bucket,
|
||||
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "backup"),
|
||||
|
||||
clock: &clock.RealClock{},
|
||||
}
|
||||
|
||||
c.syncHandler = c.processDownloadRequest
|
||||
|
||||
downloadRequestInformer.Informer().AddEventHandler(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
key, err := cache.MetaNamespaceKeyFunc(obj)
|
||||
if err != nil {
|
||||
runtime.HandleError(fmt.Errorf("error creating queue key for %#v: %v", obj, err))
|
||||
return
|
||||
}
|
||||
c.queue.Add(key)
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
// Run is a blocking function that runs the specified number of worker goroutines
|
||||
// to process items in the work queue. It will return when it receives on the
|
||||
// ctx.Done() channel.
|
||||
func (c *downloadRequestController) Run(ctx context.Context, numWorkers int) error {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
defer func() {
|
||||
glog.Infof("Waiting for workers to finish their work")
|
||||
|
||||
c.queue.ShutDown()
|
||||
|
||||
// We have to wait here in the deferred function instead of at the bottom of the function body
|
||||
// because we have to shut down the queue in order for the workers to shut down gracefully, and
|
||||
// we want to shut down the queue via defer and not at the end of the body.
|
||||
wg.Wait()
|
||||
|
||||
glog.Infof("All workers have finished")
|
||||
}()
|
||||
|
||||
glog.Info("Starting DownloadRequestController")
|
||||
defer glog.Infof("Shutting down DownloadRequestController")
|
||||
|
||||
glog.Info("Waiting for caches to sync")
|
||||
if !cache.WaitForCacheSync(ctx.Done(), c.downloadRequestListerSynced, c.backupListerSynced) {
|
||||
return errors.New("timed out waiting for caches to sync")
|
||||
}
|
||||
glog.Info("Caches are synced")
|
||||
|
||||
wg.Add(numWorkers)
|
||||
for i := 0; i < numWorkers; i++ {
|
||||
go func() {
|
||||
wait.Until(c.runWorker, time.Second, ctx.Done())
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
wait.Until(c.resync, time.Minute, ctx.Done())
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
<-ctx.Done()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// runWorker runs a worker until the controller's queue indicates it's time to shut down.
|
||||
func (c *downloadRequestController) runWorker() {
|
||||
// continually take items off the queue (waits if it's
|
||||
// empty) until we get a shutdown signal from the queue
|
||||
for c.processNextWorkItem() {
|
||||
}
|
||||
}
|
||||
|
||||
// processNextWorkItem processes a single item from the queue.
|
||||
func (c *downloadRequestController) processNextWorkItem() bool {
|
||||
key, quit := c.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
// always call done on this item, since if it fails we'll add
|
||||
// it back with rate-limiting below
|
||||
defer c.queue.Done(key)
|
||||
|
||||
err := c.syncHandler(key.(string))
|
||||
if err == nil {
|
||||
// If you had no error, tell the queue to stop tracking history for your key. This will reset
|
||||
// things like failure counts for per-item rate limiting.
|
||||
c.queue.Forget(key)
|
||||
return true
|
||||
}
|
||||
|
||||
glog.Errorf("syncHandler error: %v", err)
|
||||
// we had an error processing the item so add it back
|
||||
// into the queue for re-processing with rate-limiting
|
||||
c.queue.AddRateLimited(key)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// processDownloadRequest is the default per-item sync handler. It generates a pre-signed URL for
|
||||
// a new DownloadRequest or deletes the DownloadRequest if it has expired.
|
||||
func (c *downloadRequestController) processDownloadRequest(key string) error {
|
||||
glog.V(4).Infof("processDownloadRequest for key %q", key)
|
||||
ns, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
if err != nil {
|
||||
glog.V(4).Infof("error splitting key %q: %v", key, err)
|
||||
return err
|
||||
}
|
||||
|
||||
downloadRequest, err := c.downloadRequestLister.DownloadRequests(ns).Get(name)
|
||||
if apierrors.IsNotFound(err) {
|
||||
glog.V(4).Infof("unable to find DownloadRequest %q", key)
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
glog.V(4).Infof("error getting DownloadRequest %q: %v", key, err)
|
||||
return err
|
||||
}
|
||||
|
||||
switch downloadRequest.Status.Phase {
|
||||
case "", v1.DownloadRequestPhaseNew:
|
||||
return c.generatePreSignedURL(downloadRequest)
|
||||
case v1.DownloadRequestPhaseProcessed:
|
||||
return c.deleteIfExpired(downloadRequest)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
const signedURLTTL = 10 * time.Minute
|
||||
|
||||
// generatePreSignedURL generates a pre-signed URL for downloadRequest, changes the phase to
|
||||
// Processed, and persists the changes to storage.
|
||||
func (c *downloadRequestController) generatePreSignedURL(downloadRequest *v1.DownloadRequest) error {
|
||||
switch downloadRequest.Spec.Target.Kind {
|
||||
case v1.DownloadTargetKindBackupLog:
|
||||
update, err := cloneDownloadRequest(downloadRequest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
update.Status.DownloadURL, err = c.backupService.CreateBackupLogSignedURL(c.bucket, update.Spec.Target.Name, signedURLTTL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
update.Status.Phase = v1.DownloadRequestPhaseProcessed
|
||||
update.Status.Expiration = metav1.NewTime(c.clock.Now().Add(signedURLTTL))
|
||||
|
||||
_, err = c.downloadRequestClient.DownloadRequests(update.Namespace).Update(update)
|
||||
return err
|
||||
}
|
||||
|
||||
return fmt.Errorf("unsupported download target kind %q", downloadRequest.Spec.Target.Kind)
|
||||
}
|
||||
|
||||
// deleteIfExpired deletes downloadRequest if it has expired.
|
||||
func (c *downloadRequestController) deleteIfExpired(downloadRequest *v1.DownloadRequest) error {
|
||||
glog.V(4).Infof("checking for expiration of %s/%s", downloadRequest.Namespace, downloadRequest.Name)
|
||||
if downloadRequest.Status.Expiration.Time.Before(c.clock.Now()) {
|
||||
glog.V(4).Infof("%s/%s has not expired", downloadRequest.Namespace, downloadRequest.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
glog.V(4).Infof("%s/%s has expired - deleting", downloadRequest.Namespace, downloadRequest.Name)
|
||||
return c.downloadRequestClient.DownloadRequests(downloadRequest.Namespace).Delete(downloadRequest.Name, nil)
|
||||
}
|
||||
|
||||
// resync requeues all the DownloadRequests in the lister's cache. This is mostly to handle deleting
|
||||
// any expired requests that were not deleted as part of the normal client flow for whatever reason.
|
||||
func (c *downloadRequestController) resync() {
|
||||
list, err := c.downloadRequestLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
runtime.HandleError(fmt.Errorf("error listing download requests: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
for _, dr := range list {
|
||||
key, err := cache.MetaNamespaceKeyFunc(dr)
|
||||
if err != nil {
|
||||
runtime.HandleError(fmt.Errorf("error generating key for download request %#v: %v", dr, err))
|
||||
continue
|
||||
}
|
||||
|
||||
c.queue.Add(key)
|
||||
}
|
||||
}
|
||||
|
||||
// cloneDownloadRequest makes a deep copy of in.
|
||||
func cloneDownloadRequest(in *v1.DownloadRequest) (*v1.DownloadRequest, error) {
|
||||
clone, err := scheme.Scheme.DeepCopy(in)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out, ok := clone.(*v1.DownloadRequest)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected type: %T", clone)
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
|
@ -19,6 +19,7 @@ package controller
|
|||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
|
@ -516,6 +517,7 @@ func TestGarbageCollectPicksUpBackupUponExpiration(t *testing.T) {
|
|||
assert.Equal(0, len(snapshotService.SnapshotsTaken), "snapshots should have been garbage-collected.")
|
||||
}
|
||||
|
||||
// TODO remove this and use util/test mock instead
|
||||
type fakeBackupService struct {
|
||||
backupMetadataByBucket map[string][]*api.Backup
|
||||
backupsByBucket map[string][]*api.Backup
|
||||
|
@ -554,6 +556,10 @@ func (s *fakeBackupService) DownloadBackup(bucket, name string) (io.ReadCloser,
|
|||
return ioutil.NopCloser(bytes.NewReader([]byte("hello world"))), nil
|
||||
}
|
||||
|
||||
func (s *fakeBackupService) DownloadBackupLogs(bucket, name string) (io.ReadCloser, error) {
|
||||
return ioutil.NopCloser(bytes.NewReader([]byte("hello world in a log"))), nil
|
||||
}
|
||||
|
||||
func (s *fakeBackupService) DeleteBackupMetadataFile(bucket, backupName string) error {
|
||||
backups, found := s.backupMetadataByBucket[bucket]
|
||||
if !found {
|
||||
|
@ -599,3 +605,7 @@ func (s *fakeBackupService) DeleteBackupFile(bucket, backupName string) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *fakeBackupService) CreateBackupLogSignedURL(bucket, backupName string, ttl time.Duration) (string, error) {
|
||||
return fmt.Sprintf("http://some.server/%s/%s/%d", bucket, backupName, ttl), nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
/*
|
||||
Copyright 2017 Heptio Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Code generated by mockery v1.0.0
|
||||
package test
|
||||
|
||||
import io "io"
|
||||
import mock "github.com/stretchr/testify/mock"
|
||||
import v1 "github.com/heptio/ark/pkg/apis/ark/v1"
|
||||
|
||||
// BackupService is an autogenerated mock type for the BackupService type
|
||||
type BackupService struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// DeleteBackup provides a mock function with given fields: bucket, backupName
|
||||
func (_m *BackupService) DeleteBackup(bucket string, backupName string) error {
|
||||
ret := _m.Called(bucket, backupName)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(string, string) error); ok {
|
||||
r0 = rf(bucket, backupName)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// DownloadBackup provides a mock function with given fields: bucket, name
|
||||
func (_m *BackupService) DownloadBackup(bucket string, name string) (io.ReadCloser, error) {
|
||||
ret := _m.Called(bucket, name)
|
||||
|
||||
var r0 io.ReadCloser
|
||||
if rf, ok := ret.Get(0).(func(string, string) io.ReadCloser); ok {
|
||||
r0 = rf(bucket, name)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(io.ReadCloser)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(string, string) error); ok {
|
||||
r1 = rf(bucket, name)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// GetAllBackups provides a mock function with given fields: bucket
|
||||
func (_m *BackupService) GetAllBackups(bucket string) ([]*v1.Backup, error) {
|
||||
ret := _m.Called(bucket)
|
||||
|
||||
var r0 []*v1.Backup
|
||||
if rf, ok := ret.Get(0).(func(string) []*v1.Backup); ok {
|
||||
r0 = rf(bucket)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).([]*v1.Backup)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(string) error); ok {
|
||||
r1 = rf(bucket)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// UploadBackup provides a mock function with given fields: bucket, name, metadata, backup, log
|
||||
func (_m *BackupService) UploadBackup(bucket string, name string, metadata io.ReadSeeker, backup io.ReadSeeker, log io.ReadSeeker) error {
|
||||
ret := _m.Called(bucket, name, metadata, backup, log)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(string, string, io.ReadSeeker, io.ReadSeeker, io.ReadSeeker) error); ok {
|
||||
r0 = rf(bucket, name, metadata, backup, log)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
|
@ -0,0 +1,100 @@
|
|||
/*
|
||||
Copyright 2017 Heptio Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Code generated by mockery v1.0.0
|
||||
package test
|
||||
|
||||
import io "io"
|
||||
import mock "github.com/stretchr/testify/mock"
|
||||
|
||||
// ObjectStorageAdapter is an autogenerated mock type for the ObjectStorageAdapter type
|
||||
type ObjectStorageAdapter struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// DeleteObject provides a mock function with given fields: bucket, key
|
||||
func (_m *ObjectStorageAdapter) DeleteObject(bucket string, key string) error {
|
||||
ret := _m.Called(bucket, key)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(string, string) error); ok {
|
||||
r0 = rf(bucket, key)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// GetObject provides a mock function with given fields: bucket, key
|
||||
func (_m *ObjectStorageAdapter) GetObject(bucket string, key string) (io.ReadCloser, error) {
|
||||
ret := _m.Called(bucket, key)
|
||||
|
||||
var r0 io.ReadCloser
|
||||
if rf, ok := ret.Get(0).(func(string, string) io.ReadCloser); ok {
|
||||
r0 = rf(bucket, key)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(io.ReadCloser)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(string, string) error); ok {
|
||||
r1 = rf(bucket, key)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// ListCommonPrefixes provides a mock function with given fields: bucket, delimiter
|
||||
func (_m *ObjectStorageAdapter) ListCommonPrefixes(bucket string, delimiter string) ([]string, error) {
|
||||
ret := _m.Called(bucket, delimiter)
|
||||
|
||||
var r0 []string
|
||||
if rf, ok := ret.Get(0).(func(string, string) []string); ok {
|
||||
r0 = rf(bucket, delimiter)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).([]string)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(string, string) error); ok {
|
||||
r1 = rf(bucket, delimiter)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// PutObject provides a mock function with given fields: bucket, key, body
|
||||
func (_m *ObjectStorageAdapter) PutObject(bucket string, key string, body io.ReadSeeker) error {
|
||||
ret := _m.Called(bucket, key, body)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(string, string, io.ReadSeeker) error); ok {
|
||||
r0 = rf(bucket, key, body)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
Loading…
Reference in New Issue