remove SnapshotService, replace with direct BlockStore usage
Signed-off-by: Steve Kriss <steve@heptio.com>pull/715/head
parent
430ec2451a
commit
1c26fbde32
|
@ -55,7 +55,7 @@ type kubernetesBackupper struct {
|
|||
discoveryHelper discovery.Helper
|
||||
podCommandExecutor podexec.PodCommandExecutor
|
||||
groupBackupperFactory groupBackupperFactory
|
||||
snapshotService cloudprovider.SnapshotService
|
||||
blockStore cloudprovider.BlockStore
|
||||
resticBackupperFactory restic.BackupperFactory
|
||||
resticTimeout time.Duration
|
||||
}
|
||||
|
@ -93,7 +93,7 @@ func NewKubernetesBackupper(
|
|||
discoveryHelper discovery.Helper,
|
||||
dynamicFactory client.DynamicFactory,
|
||||
podCommandExecutor podexec.PodCommandExecutor,
|
||||
snapshotService cloudprovider.SnapshotService,
|
||||
blockStore cloudprovider.BlockStore,
|
||||
resticBackupperFactory restic.BackupperFactory,
|
||||
resticTimeout time.Duration,
|
||||
) (Backupper, error) {
|
||||
|
@ -102,7 +102,7 @@ func NewKubernetesBackupper(
|
|||
dynamicFactory: dynamicFactory,
|
||||
podCommandExecutor: podCommandExecutor,
|
||||
groupBackupperFactory: &defaultGroupBackupperFactory{},
|
||||
snapshotService: snapshotService,
|
||||
blockStore: blockStore,
|
||||
resticBackupperFactory: resticBackupperFactory,
|
||||
resticTimeout: resticTimeout,
|
||||
}, nil
|
||||
|
@ -276,7 +276,7 @@ func (kb *kubernetesBackupper) Backup(logger logrus.FieldLogger, backup *api.Bac
|
|||
kb.podCommandExecutor,
|
||||
tw,
|
||||
resourceHooks,
|
||||
kb.snapshotService,
|
||||
kb.blockStore,
|
||||
resticBackupper,
|
||||
newPVCSnapshotTracker(),
|
||||
)
|
||||
|
|
|
@ -652,7 +652,7 @@ func (f *mockGroupBackupperFactory) newGroupBackupper(
|
|||
podCommandExecutor podexec.PodCommandExecutor,
|
||||
tarWriter tarWriter,
|
||||
resourceHooks []resourceHook,
|
||||
snapshotService cloudprovider.SnapshotService,
|
||||
blockStore cloudprovider.BlockStore,
|
||||
resticBackupper restic.Backupper,
|
||||
resticSnapshotTracker *pvcSnapshotTracker,
|
||||
) groupBackupper {
|
||||
|
@ -669,7 +669,7 @@ func (f *mockGroupBackupperFactory) newGroupBackupper(
|
|||
podCommandExecutor,
|
||||
tarWriter,
|
||||
resourceHooks,
|
||||
snapshotService,
|
||||
blockStore,
|
||||
resticBackupper,
|
||||
resticSnapshotTracker,
|
||||
)
|
||||
|
|
|
@ -49,7 +49,7 @@ type groupBackupperFactory interface {
|
|||
podCommandExecutor podexec.PodCommandExecutor,
|
||||
tarWriter tarWriter,
|
||||
resourceHooks []resourceHook,
|
||||
snapshotService cloudprovider.SnapshotService,
|
||||
blockStore cloudprovider.BlockStore,
|
||||
resticBackupper restic.Backupper,
|
||||
resticSnapshotTracker *pvcSnapshotTracker,
|
||||
) groupBackupper
|
||||
|
@ -69,7 +69,7 @@ func (f *defaultGroupBackupperFactory) newGroupBackupper(
|
|||
podCommandExecutor podexec.PodCommandExecutor,
|
||||
tarWriter tarWriter,
|
||||
resourceHooks []resourceHook,
|
||||
snapshotService cloudprovider.SnapshotService,
|
||||
blockStore cloudprovider.BlockStore,
|
||||
resticBackupper restic.Backupper,
|
||||
resticSnapshotTracker *pvcSnapshotTracker,
|
||||
) groupBackupper {
|
||||
|
@ -86,7 +86,7 @@ func (f *defaultGroupBackupperFactory) newGroupBackupper(
|
|||
podCommandExecutor: podCommandExecutor,
|
||||
tarWriter: tarWriter,
|
||||
resourceHooks: resourceHooks,
|
||||
snapshotService: snapshotService,
|
||||
blockStore: blockStore,
|
||||
resticBackupper: resticBackupper,
|
||||
resticSnapshotTracker: resticSnapshotTracker,
|
||||
resourceBackupperFactory: &defaultResourceBackupperFactory{},
|
||||
|
@ -109,7 +109,7 @@ type defaultGroupBackupper struct {
|
|||
podCommandExecutor podexec.PodCommandExecutor
|
||||
tarWriter tarWriter
|
||||
resourceHooks []resourceHook
|
||||
snapshotService cloudprovider.SnapshotService
|
||||
blockStore cloudprovider.BlockStore
|
||||
resticBackupper restic.Backupper
|
||||
resticSnapshotTracker *pvcSnapshotTracker
|
||||
resourceBackupperFactory resourceBackupperFactory
|
||||
|
@ -133,7 +133,7 @@ func (gb *defaultGroupBackupper) backupGroup(group *metav1.APIResourceList) erro
|
|||
gb.podCommandExecutor,
|
||||
gb.tarWriter,
|
||||
gb.resourceHooks,
|
||||
gb.snapshotService,
|
||||
gb.blockStore,
|
||||
gb.resticBackupper,
|
||||
gb.resticSnapshotTracker,
|
||||
)
|
||||
|
|
|
@ -161,7 +161,7 @@ func (rbf *mockResourceBackupperFactory) newResourceBackupper(
|
|||
podCommandExecutor podexec.PodCommandExecutor,
|
||||
tarWriter tarWriter,
|
||||
resourceHooks []resourceHook,
|
||||
snapshotService cloudprovider.SnapshotService,
|
||||
blockStore cloudprovider.BlockStore,
|
||||
resticBackupper restic.Backupper,
|
||||
resticSnapshotTracker *pvcSnapshotTracker,
|
||||
) resourceBackupper {
|
||||
|
@ -178,7 +178,7 @@ func (rbf *mockResourceBackupperFactory) newResourceBackupper(
|
|||
podCommandExecutor,
|
||||
tarWriter,
|
||||
resourceHooks,
|
||||
snapshotService,
|
||||
blockStore,
|
||||
resticBackupper,
|
||||
resticSnapshotTracker,
|
||||
)
|
||||
|
|
|
@ -54,7 +54,7 @@ type itemBackupperFactory interface {
|
|||
resourceHooks []resourceHook,
|
||||
dynamicFactory client.DynamicFactory,
|
||||
discoveryHelper discovery.Helper,
|
||||
snapshotService cloudprovider.SnapshotService,
|
||||
blockStore cloudprovider.BlockStore,
|
||||
resticBackupper restic.Backupper,
|
||||
resticSnapshotTracker *pvcSnapshotTracker,
|
||||
) ItemBackupper
|
||||
|
@ -72,7 +72,7 @@ func (f *defaultItemBackupperFactory) newItemBackupper(
|
|||
resourceHooks []resourceHook,
|
||||
dynamicFactory client.DynamicFactory,
|
||||
discoveryHelper discovery.Helper,
|
||||
snapshotService cloudprovider.SnapshotService,
|
||||
blockStore cloudprovider.BlockStore,
|
||||
resticBackupper restic.Backupper,
|
||||
resticSnapshotTracker *pvcSnapshotTracker,
|
||||
) ItemBackupper {
|
||||
|
@ -86,7 +86,7 @@ func (f *defaultItemBackupperFactory) newItemBackupper(
|
|||
resourceHooks: resourceHooks,
|
||||
dynamicFactory: dynamicFactory,
|
||||
discoveryHelper: discoveryHelper,
|
||||
snapshotService: snapshotService,
|
||||
blockStore: blockStore,
|
||||
itemHookHandler: &defaultItemHookHandler{
|
||||
podCommandExecutor: podCommandExecutor,
|
||||
},
|
||||
|
@ -114,7 +114,7 @@ type defaultItemBackupper struct {
|
|||
resourceHooks []resourceHook
|
||||
dynamicFactory client.DynamicFactory
|
||||
discoveryHelper discovery.Helper
|
||||
snapshotService cloudprovider.SnapshotService
|
||||
blockStore cloudprovider.BlockStore
|
||||
resticBackupper restic.Backupper
|
||||
resticSnapshotTracker *pvcSnapshotTracker
|
||||
|
||||
|
@ -219,7 +219,7 @@ func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtim
|
|||
obj = updatedObj
|
||||
|
||||
if groupResource == kuberesource.PersistentVolumes {
|
||||
if ib.snapshotService == nil {
|
||||
if ib.blockStore == nil {
|
||||
log.Debug("Skipping Persistent Volume snapshot because they're not enabled.")
|
||||
} else if err := ib.takePVSnapshot(obj, ib.backup, log); err != nil {
|
||||
backupErrs = append(backupErrs, err)
|
||||
|
@ -399,7 +399,7 @@ func (ib *defaultItemBackupper) takePVSnapshot(obj runtime.Unstructured, backup
|
|||
log.Infof("label %q is not present on PersistentVolume", zoneLabel)
|
||||
}
|
||||
|
||||
volumeID, err := ib.snapshotService.GetVolumeID(obj)
|
||||
volumeID, err := ib.blockStore.GetVolumeID(obj)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error getting volume ID for PersistentVolume")
|
||||
}
|
||||
|
@ -416,14 +416,14 @@ func (ib *defaultItemBackupper) takePVSnapshot(obj runtime.Unstructured, backup
|
|||
}
|
||||
|
||||
log.Info("Snapshotting PersistentVolume")
|
||||
snapshotID, err := ib.snapshotService.CreateSnapshot(volumeID, pvFailureDomainZone, tags)
|
||||
snapshotID, err := ib.blockStore.CreateSnapshot(volumeID, pvFailureDomainZone, tags)
|
||||
if err != nil {
|
||||
// log+error on purpose - log goes to the per-backup log file, error goes to the backup
|
||||
log.WithError(err).Error("error creating snapshot")
|
||||
return errors.WithMessage(err, "error creating snapshot")
|
||||
}
|
||||
|
||||
volumeType, iops, err := ib.snapshotService.GetVolumeInfo(volumeID, pvFailureDomainZone)
|
||||
volumeType, iops, err := ib.blockStore.GetVolumeInfo(volumeID, pvFailureDomainZone)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("error getting volume info")
|
||||
return errors.WithMessage(err, "error getting volume info")
|
||||
|
|
|
@ -277,7 +277,7 @@ func TestBackupItemNoSkips(t *testing.T) {
|
|||
additionalItemError: errors.New("foo"),
|
||||
},
|
||||
{
|
||||
name: "takePVSnapshot is not invoked for PVs when snapshotService == nil",
|
||||
name: "takePVSnapshot is not invoked for PVs when blockStore == nil",
|
||||
namespaceIncludesExcludes: collections.NewIncludesExcludes().Includes("*"),
|
||||
item: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv", "labels": {"failure-domain.beta.kubernetes.io/zone": "us-east-1c"}}, "spec": {"awsElasticBlockStore": {"volumeID": "aws://us-east-1c/vol-abc123"}}}`,
|
||||
expectError: false,
|
||||
|
@ -286,7 +286,7 @@ func TestBackupItemNoSkips(t *testing.T) {
|
|||
groupResource: "persistentvolumes",
|
||||
},
|
||||
{
|
||||
name: "takePVSnapshot is invoked for PVs when snapshotService != nil",
|
||||
name: "takePVSnapshot is invoked for PVs when blockStore != nil",
|
||||
namespaceIncludesExcludes: collections.NewIncludesExcludes().Includes("*"),
|
||||
item: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv", "labels": {"failure-domain.beta.kubernetes.io/zone": "us-east-1c"}}, "spec": {"awsElasticBlockStore": {"volumeID": "aws://us-east-1c/vol-abc123"}}}`,
|
||||
expectError: false,
|
||||
|
@ -305,7 +305,7 @@ func TestBackupItemNoSkips(t *testing.T) {
|
|||
expectExcluded: false,
|
||||
expectedTarHeaderName: "resources/persistentvolumes/cluster/mypv.json",
|
||||
groupResource: "persistentvolumes",
|
||||
// empty snapshottableVolumes causes a snapshotService to be created, but no
|
||||
// empty snapshottableVolumes causes a blockStore to be created, but no
|
||||
// snapshots are expected to be taken.
|
||||
snapshottableVolumes: map[string]api.VolumeBackupInfo{},
|
||||
trackedPVCs: sets.NewString(key("pvc-ns", "pvc"), key("another-pvc-ns", "another-pvc")),
|
||||
|
@ -419,14 +419,14 @@ func TestBackupItemNoSkips(t *testing.T) {
|
|||
newPVCSnapshotTracker(),
|
||||
).(*defaultItemBackupper)
|
||||
|
||||
var snapshotService *arktest.FakeSnapshotService
|
||||
var blockStore *arktest.FakeBlockStore
|
||||
if test.snapshottableVolumes != nil {
|
||||
snapshotService = &arktest.FakeSnapshotService{
|
||||
blockStore = &arktest.FakeBlockStore{
|
||||
SnapshottableVolumes: test.snapshottableVolumes,
|
||||
VolumeID: "vol-abc123",
|
||||
Error: test.snapshotError,
|
||||
}
|
||||
b.snapshotService = snapshotService
|
||||
b.blockStore = blockStore
|
||||
}
|
||||
|
||||
if test.trackedPVCs != nil {
|
||||
|
@ -514,7 +514,7 @@ func TestBackupItemNoSkips(t *testing.T) {
|
|||
}
|
||||
|
||||
if test.snapshottableVolumes != nil {
|
||||
require.Equal(t, len(test.snapshottableVolumes), len(snapshotService.SnapshotsTaken))
|
||||
require.Equal(t, len(test.snapshottableVolumes), len(blockStore.SnapshotsTaken))
|
||||
|
||||
var expectedBackups []api.VolumeBackupInfo
|
||||
for _, vbi := range test.snapshottableVolumes {
|
||||
|
@ -719,12 +719,12 @@ func TestTakePVSnapshot(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
snapshotService := &arktest.FakeSnapshotService{
|
||||
blockStore := &arktest.FakeBlockStore{
|
||||
SnapshottableVolumes: test.volumeInfo,
|
||||
VolumeID: test.expectedVolumeID,
|
||||
}
|
||||
|
||||
ib := &defaultItemBackupper{snapshotService: snapshotService}
|
||||
ib := &defaultItemBackupper{blockStore: blockStore}
|
||||
|
||||
pv, err := arktest.GetAsMap(test.pv)
|
||||
if err != nil {
|
||||
|
@ -754,12 +754,12 @@ func TestTakePVSnapshot(t *testing.T) {
|
|||
}
|
||||
|
||||
// we should have one snapshot taken exactly
|
||||
require.Equal(t, test.expectedSnapshotsTaken, snapshotService.SnapshotsTaken.Len())
|
||||
require.Equal(t, test.expectedSnapshotsTaken, blockStore.SnapshotsTaken.Len())
|
||||
|
||||
if test.expectedSnapshotsTaken > 0 {
|
||||
// the snapshotID should be the one in the entry in snapshotService.SnapshottableVolumes
|
||||
// the snapshotID should be the one in the entry in blockStore.SnapshottableVolumes
|
||||
// for the volume we ran the test for
|
||||
snapshotID, _ := snapshotService.SnapshotsTaken.PopAny()
|
||||
snapshotID, _ := blockStore.SnapshotsTaken.PopAny()
|
||||
|
||||
expectedVolumeBackups["mypv"] = &v1.VolumeBackupInfo{
|
||||
SnapshotID: snapshotID,
|
||||
|
|
|
@ -51,7 +51,7 @@ type resourceBackupperFactory interface {
|
|||
podCommandExecutor podexec.PodCommandExecutor,
|
||||
tarWriter tarWriter,
|
||||
resourceHooks []resourceHook,
|
||||
snapshotService cloudprovider.SnapshotService,
|
||||
blockStore cloudprovider.BlockStore,
|
||||
resticBackupper restic.Backupper,
|
||||
resticSnapshotTracker *pvcSnapshotTracker,
|
||||
) resourceBackupper
|
||||
|
@ -72,7 +72,7 @@ func (f *defaultResourceBackupperFactory) newResourceBackupper(
|
|||
podCommandExecutor podexec.PodCommandExecutor,
|
||||
tarWriter tarWriter,
|
||||
resourceHooks []resourceHook,
|
||||
snapshotService cloudprovider.SnapshotService,
|
||||
blockStore cloudprovider.BlockStore,
|
||||
resticBackupper restic.Backupper,
|
||||
resticSnapshotTracker *pvcSnapshotTracker,
|
||||
) resourceBackupper {
|
||||
|
@ -89,7 +89,7 @@ func (f *defaultResourceBackupperFactory) newResourceBackupper(
|
|||
podCommandExecutor: podCommandExecutor,
|
||||
tarWriter: tarWriter,
|
||||
resourceHooks: resourceHooks,
|
||||
snapshotService: snapshotService,
|
||||
blockStore: blockStore,
|
||||
resticBackupper: resticBackupper,
|
||||
resticSnapshotTracker: resticSnapshotTracker,
|
||||
itemBackupperFactory: &defaultItemBackupperFactory{},
|
||||
|
@ -113,7 +113,7 @@ type defaultResourceBackupper struct {
|
|||
podCommandExecutor podexec.PodCommandExecutor
|
||||
tarWriter tarWriter
|
||||
resourceHooks []resourceHook
|
||||
snapshotService cloudprovider.SnapshotService
|
||||
blockStore cloudprovider.BlockStore
|
||||
resticBackupper restic.Backupper
|
||||
resticSnapshotTracker *pvcSnapshotTracker
|
||||
itemBackupperFactory itemBackupperFactory
|
||||
|
@ -189,7 +189,7 @@ func (rb *defaultResourceBackupper) backupResource(
|
|||
rb.resourceHooks,
|
||||
rb.dynamicFactory,
|
||||
rb.discoveryHelper,
|
||||
rb.snapshotService,
|
||||
rb.blockStore,
|
||||
rb.resticBackupper,
|
||||
rb.resticSnapshotTracker,
|
||||
)
|
||||
|
|
|
@ -544,7 +544,6 @@ func TestBackupResourceOnlyIncludesSpecifiedNamespaces(t *testing.T) {
|
|||
dynamicFactory: dynamicFactory,
|
||||
discoveryHelper: discoveryHelper,
|
||||
itemHookHandler: itemHookHandler,
|
||||
snapshotService: nil,
|
||||
}
|
||||
|
||||
itemBackupperFactory.On("newItemBackupper",
|
||||
|
@ -690,7 +689,7 @@ func (ibf *mockItemBackupperFactory) newItemBackupper(
|
|||
resourceHooks []resourceHook,
|
||||
dynamicFactory client.DynamicFactory,
|
||||
discoveryHelper discovery.Helper,
|
||||
snapshotService cloudprovider.SnapshotService,
|
||||
blockStore cloudprovider.BlockStore,
|
||||
resticBackupper restic.Backupper,
|
||||
resticSnapshotTracker *pvcSnapshotTracker,
|
||||
) ItemBackupper {
|
||||
|
@ -705,7 +704,7 @@ func (ibf *mockItemBackupperFactory) newItemBackupper(
|
|||
resourceHooks,
|
||||
dynamicFactory,
|
||||
discoveryHelper,
|
||||
snapshotService,
|
||||
blockStore,
|
||||
resticBackupper,
|
||||
resticSnapshotTracker,
|
||||
)
|
||||
|
|
|
@ -141,15 +141,6 @@ func (b *blockStore) GetVolumeInfo(volumeID, volumeAZ string) (string, *int64, e
|
|||
return volumeType, iops, nil
|
||||
}
|
||||
|
||||
func (b *blockStore) IsVolumeReady(volumeID, volumeAZ string) (ready bool, err error) {
|
||||
volumeInfo, err := b.describeVolume(volumeID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return *volumeInfo.State == ec2.VolumeStateAvailable, nil
|
||||
}
|
||||
|
||||
func (b *blockStore) describeVolume(volumeID string) (*ec2.Volume, error) {
|
||||
req := &ec2.DescribeVolumesInput{
|
||||
VolumeIds: []*string{&volumeID},
|
||||
|
|
|
@ -183,19 +183,6 @@ func (b *blockStore) GetVolumeInfo(volumeID, volumeAZ string) (string, *int64, e
|
|||
return string(res.AccountType), nil, nil
|
||||
}
|
||||
|
||||
func (b *blockStore) IsVolumeReady(volumeID, volumeAZ string) (ready bool, err error) {
|
||||
res, err := b.disks.Get(b.resourceGroup, volumeID)
|
||||
if err != nil {
|
||||
return false, errors.WithStack(err)
|
||||
}
|
||||
|
||||
if res.ProvisioningState == nil {
|
||||
return false, errors.New("nil ProvisioningState returned from Get call")
|
||||
}
|
||||
|
||||
return *res.ProvisioningState == "Succeeded", nil
|
||||
}
|
||||
|
||||
func (b *blockStore) CreateSnapshot(volumeID, volumeAZ string, tags map[string]string) (string, error) {
|
||||
// Lookup disk info for its Location
|
||||
diskInfo, err := b.disks.Get(b.resourceGroup, volumeID)
|
||||
|
|
|
@ -137,16 +137,6 @@ func (b *blockStore) GetVolumeInfo(volumeID, volumeAZ string) (string, *int64, e
|
|||
return res.Type, nil, nil
|
||||
}
|
||||
|
||||
func (b *blockStore) IsVolumeReady(volumeID, volumeAZ string) (ready bool, err error) {
|
||||
disk, err := b.gce.Disks.Get(b.project, volumeAZ, volumeID).Do()
|
||||
if err != nil {
|
||||
return false, errors.WithStack(err)
|
||||
}
|
||||
|
||||
// TODO can we consider a disk ready while it's in the RESTORING state?
|
||||
return disk.Status == "READY", nil
|
||||
}
|
||||
|
||||
func (b *blockStore) CreateSnapshot(volumeID, volumeAZ string, tags map[string]string) (string, error) {
|
||||
// snapshot names must adhere to RFC1035 and be 1-63 characters
|
||||
// long
|
||||
|
|
|
@ -146,27 +146,6 @@ func (_m *BlockStore) Init(config map[string]string) error {
|
|||
return r0
|
||||
}
|
||||
|
||||
// IsVolumeReady provides a mock function with given fields: volumeID, volumeAZ
|
||||
func (_m *BlockStore) IsVolumeReady(volumeID string, volumeAZ string) (bool, error) {
|
||||
ret := _m.Called(volumeID, volumeAZ)
|
||||
|
||||
var r0 bool
|
||||
if rf, ok := ret.Get(0).(func(string, string) bool); ok {
|
||||
r0 = rf(volumeID, volumeAZ)
|
||||
} else {
|
||||
r0 = ret.Get(0).(bool)
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(string, string) error); ok {
|
||||
r1 = rf(volumeID, volumeAZ)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// SetVolumeID provides a mock function with given fields: pv, volumeID
|
||||
func (_m *BlockStore) SetVolumeID(pv runtime.Unstructured, volumeID string) (runtime.Unstructured, error) {
|
||||
ret := _m.Called(pv, volumeID)
|
||||
|
|
|
@ -1,113 +0,0 @@
|
|||
/*
|
||||
Copyright 2017 the Heptio Ark contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cloudprovider
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
)
|
||||
|
||||
// SnapshotService exposes Ark-specific operations for snapshotting and restoring block
|
||||
// volumes.
|
||||
type SnapshotService interface {
|
||||
// CreateSnapshot triggers a snapshot for the specified cloud volume and tags it with metadata.
|
||||
// it returns the cloud snapshot ID, or an error if a problem is encountered triggering the snapshot via
|
||||
// the cloud API.
|
||||
CreateSnapshot(volumeID, volumeAZ string, tags map[string]string) (string, error)
|
||||
|
||||
// CreateVolumeFromSnapshot triggers a restore operation to create a new cloud volume from the specified
|
||||
// snapshot and volume characteristics. Returns the cloud volume ID, or an error if a problem is
|
||||
// encountered triggering the restore via the cloud API.
|
||||
CreateVolumeFromSnapshot(snapshotID, volumeType, volumeAZ string, iops *int64) (string, error)
|
||||
|
||||
// DeleteSnapshot triggers a deletion of the specified Ark snapshot via the cloud API. It returns an
|
||||
// error if a problem is encountered triggering the deletion via the cloud API.
|
||||
DeleteSnapshot(snapshotID string) error
|
||||
|
||||
// GetVolumeInfo gets the type and IOPS (if applicable) from the cloud API.
|
||||
GetVolumeInfo(volumeID, volumeAZ string) (string, *int64, error)
|
||||
|
||||
// GetVolumeID returns the cloud provider specific identifier for the PersistentVolume.
|
||||
GetVolumeID(pv runtime.Unstructured) (string, error)
|
||||
|
||||
// SetVolumeID sets the cloud provider specific identifier for the PersistentVolume.
|
||||
SetVolumeID(pv runtime.Unstructured, volumeID string) (runtime.Unstructured, error)
|
||||
}
|
||||
|
||||
const (
|
||||
volumeCreateWaitTimeout = 30 * time.Second
|
||||
volumeCreatePollInterval = 1 * time.Second
|
||||
)
|
||||
|
||||
type snapshotService struct {
|
||||
blockStore BlockStore
|
||||
}
|
||||
|
||||
var _ SnapshotService = &snapshotService{}
|
||||
|
||||
// NewSnapshotService creates a snapshot service using the provided block store
|
||||
func NewSnapshotService(blockStore BlockStore) SnapshotService {
|
||||
return &snapshotService{
|
||||
blockStore: blockStore,
|
||||
}
|
||||
}
|
||||
|
||||
func (sr *snapshotService) CreateVolumeFromSnapshot(snapshotID string, volumeType string, volumeAZ string, iops *int64) (string, error) {
|
||||
volumeID, err := sr.blockStore.CreateVolumeFromSnapshot(snapshotID, volumeType, volumeAZ, iops)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// wait for volume to be ready (up to a maximum time limit)
|
||||
ticker := time.NewTicker(volumeCreatePollInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
timeout := time.NewTimer(volumeCreateWaitTimeout)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-timeout.C:
|
||||
return "", errors.Errorf("timeout reached waiting for volume %v to be ready", volumeID)
|
||||
case <-ticker.C:
|
||||
if ready, err := sr.blockStore.IsVolumeReady(volumeID, volumeAZ); err == nil && ready {
|
||||
return volumeID, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (sr *snapshotService) CreateSnapshot(volumeID, volumeAZ string, tags map[string]string) (string, error) {
|
||||
return sr.blockStore.CreateSnapshot(volumeID, volumeAZ, tags)
|
||||
}
|
||||
|
||||
func (sr *snapshotService) DeleteSnapshot(snapshotID string) error {
|
||||
return sr.blockStore.DeleteSnapshot(snapshotID)
|
||||
}
|
||||
|
||||
func (sr *snapshotService) GetVolumeInfo(volumeID, volumeAZ string) (string, *int64, error) {
|
||||
return sr.blockStore.GetVolumeInfo(volumeID, volumeAZ)
|
||||
}
|
||||
|
||||
func (sr *snapshotService) GetVolumeID(pv runtime.Unstructured) (string, error) {
|
||||
return sr.blockStore.GetVolumeID(pv)
|
||||
}
|
||||
|
||||
func (sr *snapshotService) SetVolumeID(pv runtime.Unstructured, volumeID string) (runtime.Unstructured, error) {
|
||||
return sr.blockStore.SetVolumeID(pv, volumeID)
|
||||
}
|
|
@ -80,9 +80,6 @@ type BlockStore interface {
|
|||
// the specified block volume in the given availability zone.
|
||||
GetVolumeInfo(volumeID, volumeAZ string) (string, *int64, error)
|
||||
|
||||
// IsVolumeReady returns whether the specified volume is ready to be used.
|
||||
IsVolumeReady(volumeID, volumeAZ string) (ready bool, err error)
|
||||
|
||||
// CreateSnapshot creates a snapshot of the specified block volume, and applies the provided
|
||||
// set of tags to the snapshot.
|
||||
CreateSnapshot(volumeID, volumeAZ string, tags map[string]string) (snapshotID string, err error)
|
||||
|
|
|
@ -149,7 +149,7 @@ type server struct {
|
|||
kubeClient kubernetes.Interface
|
||||
arkClient clientset.Interface
|
||||
objectStore cloudprovider.ObjectStore
|
||||
snapshotService cloudprovider.SnapshotService
|
||||
blockStore cloudprovider.BlockStore
|
||||
discoveryClient discovery.DiscoveryInterface
|
||||
discoveryHelper arkdiscovery.Helper
|
||||
dynamicClient dynamic.Interface
|
||||
|
@ -255,8 +255,15 @@ func (s *server) run() error {
|
|||
}
|
||||
s.objectStore = objectStore
|
||||
|
||||
if err := s.initSnapshotService(config); err != nil {
|
||||
return err
|
||||
if config.PersistentVolumeProvider == nil {
|
||||
s.logger.Info("PersistentVolumeProvider config not provided, volume snapshots and restores are disabled")
|
||||
} else {
|
||||
s.logger.Info("Configuring cloud provider for snapshot service")
|
||||
blockStore, err := getBlockStore(*config.PersistentVolumeProvider, s.pluginManager)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.blockStore = blockStore
|
||||
}
|
||||
|
||||
if config.BackupStorageProvider.ResticLocation != "" {
|
||||
|
@ -464,21 +471,6 @@ func (s *server) watchConfig(config *api.Config) {
|
|||
})
|
||||
}
|
||||
|
||||
func (s *server) initSnapshotService(config *api.Config) error {
|
||||
if config.PersistentVolumeProvider == nil {
|
||||
s.logger.Info("PersistentVolumeProvider config not provided, volume snapshots and restores are disabled")
|
||||
return nil
|
||||
}
|
||||
|
||||
s.logger.Info("Configuring cloud provider for snapshot service")
|
||||
blockStore, err := getBlockStore(*config.PersistentVolumeProvider, s.pluginManager)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.snapshotService = cloudprovider.NewSnapshotService(blockStore)
|
||||
return nil
|
||||
}
|
||||
|
||||
func getObjectStore(cloudConfig api.CloudProviderConfig, manager plugin.Manager) (cloudprovider.ObjectStore, error) {
|
||||
if cloudConfig.Name == "" {
|
||||
return nil, errors.New("object storage provider name must not be empty")
|
||||
|
@ -620,7 +612,7 @@ func (s *server) runControllers(config *api.Config) error {
|
|||
s.discoveryHelper,
|
||||
client.NewDynamicFactory(s.dynamicClient),
|
||||
podexec.NewPodCommandExecutor(s.kubeClientConfig, s.kubeClient.CoreV1().RESTClient()),
|
||||
s.snapshotService,
|
||||
s.blockStore,
|
||||
s.resticManager,
|
||||
config.PodVolumeOperationTimeout.Duration,
|
||||
)
|
||||
|
@ -632,7 +624,7 @@ func (s *server) runControllers(config *api.Config) error {
|
|||
backupper,
|
||||
config.BackupStorageProvider.CloudProviderConfig,
|
||||
config.BackupStorageProvider.Bucket,
|
||||
s.snapshotService != nil,
|
||||
s.blockStore != nil,
|
||||
s.logger,
|
||||
s.logLevel,
|
||||
s.pluginRegistry,
|
||||
|
@ -678,7 +670,7 @@ func (s *server) runControllers(config *api.Config) error {
|
|||
s.sharedInformerFactory.Ark().V1().DeleteBackupRequests(),
|
||||
s.arkClient.ArkV1(), // deleteBackupRequestClient
|
||||
s.arkClient.ArkV1(), // backupClient
|
||||
s.snapshotService,
|
||||
s.blockStore,
|
||||
s.objectStore,
|
||||
config.BackupStorageProvider.Bucket,
|
||||
s.sharedInformerFactory.Ark().V1().Restores(),
|
||||
|
@ -698,7 +690,7 @@ func (s *server) runControllers(config *api.Config) error {
|
|||
restorer, err := restore.NewKubernetesRestorer(
|
||||
s.discoveryHelper,
|
||||
client.NewDynamicFactory(s.dynamicClient),
|
||||
s.snapshotService,
|
||||
s.blockStore,
|
||||
config.ResourcePriorities,
|
||||
s.arkClient.ArkV1(),
|
||||
s.kubeClient.CoreV1().Namespaces(),
|
||||
|
@ -717,7 +709,7 @@ func (s *server) runControllers(config *api.Config) error {
|
|||
config.BackupStorageProvider.CloudProviderConfig,
|
||||
config.BackupStorageProvider.Bucket,
|
||||
s.sharedInformerFactory.Ark().V1().Backups(),
|
||||
s.snapshotService != nil,
|
||||
s.blockStore != nil,
|
||||
s.logger,
|
||||
s.logLevel,
|
||||
s.pluginRegistry,
|
||||
|
|
|
@ -49,7 +49,7 @@ type backupDeletionController struct {
|
|||
deleteBackupRequestClient arkv1client.DeleteBackupRequestsGetter
|
||||
deleteBackupRequestLister listers.DeleteBackupRequestLister
|
||||
backupClient arkv1client.BackupsGetter
|
||||
snapshotService cloudprovider.SnapshotService
|
||||
blockStore cloudprovider.BlockStore
|
||||
objectStore cloudprovider.ObjectStore
|
||||
bucket string
|
||||
restoreLister listers.RestoreLister
|
||||
|
@ -69,7 +69,7 @@ func NewBackupDeletionController(
|
|||
deleteBackupRequestInformer informers.DeleteBackupRequestInformer,
|
||||
deleteBackupRequestClient arkv1client.DeleteBackupRequestsGetter,
|
||||
backupClient arkv1client.BackupsGetter,
|
||||
snapshotService cloudprovider.SnapshotService,
|
||||
blockStore cloudprovider.BlockStore,
|
||||
objectStore cloudprovider.ObjectStore,
|
||||
bucket string,
|
||||
restoreInformer informers.RestoreInformer,
|
||||
|
@ -83,7 +83,7 @@ func NewBackupDeletionController(
|
|||
deleteBackupRequestClient: deleteBackupRequestClient,
|
||||
deleteBackupRequestLister: deleteBackupRequestInformer.Lister(),
|
||||
backupClient: backupClient,
|
||||
snapshotService: snapshotService,
|
||||
blockStore: blockStore,
|
||||
objectStore: objectStore,
|
||||
bucket: bucket,
|
||||
restoreLister: restoreInformer.Lister(),
|
||||
|
@ -220,7 +220,7 @@ func (c *backupDeletionController) processRequest(req *v1.DeleteBackupRequest) e
|
|||
|
||||
// If the backup includes snapshots but we don't currently have a PVProvider, we don't
|
||||
// want to orphan the snapshots so skip deletion.
|
||||
if c.snapshotService == nil && len(backup.Status.VolumeBackups) > 0 {
|
||||
if c.blockStore == nil && len(backup.Status.VolumeBackups) > 0 {
|
||||
req, err = c.patchDeleteBackupRequest(req, func(r *v1.DeleteBackupRequest) {
|
||||
r.Status.Phase = v1.DeleteBackupRequestPhaseProcessed
|
||||
r.Status.Errors = []string{"unable to delete backup because it includes PV snapshots and Ark is not configured with a PersistentVolumeProvider"}
|
||||
|
@ -244,7 +244,7 @@ func (c *backupDeletionController) processRequest(req *v1.DeleteBackupRequest) e
|
|||
log.Info("Removing PV snapshots")
|
||||
for _, volumeBackup := range backup.Status.VolumeBackups {
|
||||
log.WithField("snapshotID", volumeBackup.SnapshotID).Info("Removing snapshot associated with backup")
|
||||
if err := c.snapshotService.DeleteSnapshot(volumeBackup.SnapshotID); err != nil {
|
||||
if err := c.blockStore.DeleteSnapshot(volumeBackup.SnapshotID); err != nil {
|
||||
errs = append(errs, errors.Wrapf(err, "error deleting snapshot %s", volumeBackup.SnapshotID).Error())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,7 +48,7 @@ func TestBackupDeletionControllerProcessQueueItem(t *testing.T) {
|
|||
sharedInformers.Ark().V1().DeleteBackupRequests(),
|
||||
client.ArkV1(), // deleteBackupRequestClient
|
||||
client.ArkV1(), // backupClient
|
||||
nil, // snapshotService
|
||||
nil, // blockStore
|
||||
nil, // backupService
|
||||
"bucket",
|
||||
sharedInformers.Ark().V1().Restores(),
|
||||
|
@ -108,7 +108,7 @@ func TestBackupDeletionControllerProcessQueueItem(t *testing.T) {
|
|||
type backupDeletionControllerTestData struct {
|
||||
client *fake.Clientset
|
||||
sharedInformers informers.SharedInformerFactory
|
||||
snapshotService *arktest.FakeSnapshotService
|
||||
blockStore *arktest.FakeBlockStore
|
||||
controller *backupDeletionController
|
||||
req *v1.DeleteBackupRequest
|
||||
}
|
||||
|
@ -116,19 +116,19 @@ type backupDeletionControllerTestData struct {
|
|||
func setupBackupDeletionControllerTest(objects ...runtime.Object) *backupDeletionControllerTestData {
|
||||
client := fake.NewSimpleClientset(objects...)
|
||||
sharedInformers := informers.NewSharedInformerFactory(client, 0)
|
||||
snapshotService := &arktest.FakeSnapshotService{SnapshotsTaken: sets.NewString()}
|
||||
blockStore := &arktest.FakeBlockStore{SnapshotsTaken: sets.NewString()}
|
||||
req := pkgbackup.NewDeleteBackupRequest("foo", "uid")
|
||||
|
||||
data := &backupDeletionControllerTestData{
|
||||
client: client,
|
||||
sharedInformers: sharedInformers,
|
||||
snapshotService: snapshotService,
|
||||
blockStore: blockStore,
|
||||
controller: NewBackupDeletionController(
|
||||
arktest.NewLogger(),
|
||||
sharedInformers.Ark().V1().DeleteBackupRequests(),
|
||||
client.ArkV1(), // deleteBackupRequestClient
|
||||
client.ArkV1(), // backupClient
|
||||
snapshotService,
|
||||
blockStore,
|
||||
nil, // objectStore
|
||||
"bucket",
|
||||
sharedInformers.Ark().V1().Restores(),
|
||||
|
@ -305,9 +305,9 @@ func TestBackupDeletionControllerProcessRequest(t *testing.T) {
|
|||
assert.Equal(t, expectedActions, td.client.Actions())
|
||||
})
|
||||
|
||||
t.Run("no snapshot service, backup has snapshots", func(t *testing.T) {
|
||||
t.Run("no block store, backup has snapshots", func(t *testing.T) {
|
||||
td := setupBackupDeletionControllerTest()
|
||||
td.controller.snapshotService = nil
|
||||
td.controller.blockStore = nil
|
||||
|
||||
td.client.PrependReactor("get", "backups", func(action core.Action) (bool, runtime.Object, error) {
|
||||
backup := arktest.NewTestBackup().WithName("backup-1").WithSnapshot("pv-1", "snap-1").Backup
|
||||
|
@ -364,7 +364,7 @@ func TestBackupDeletionControllerProcessRequest(t *testing.T) {
|
|||
td.client.PrependReactor("get", "backups", func(action core.Action) (bool, runtime.Object, error) {
|
||||
return true, backup, nil
|
||||
})
|
||||
td.snapshotService.SnapshotsTaken.Insert("snap-1")
|
||||
td.blockStore.SnapshotsTaken.Insert("snap-1")
|
||||
|
||||
td.client.PrependReactor("patch", "deletebackuprequests", func(action core.Action) (bool, runtime.Object, error) {
|
||||
return true, td.req, nil
|
||||
|
@ -438,7 +438,7 @@ func TestBackupDeletionControllerProcessRequest(t *testing.T) {
|
|||
arktest.CompareActions(t, expectedActions, td.client.Actions())
|
||||
|
||||
// Make sure snapshot was deleted
|
||||
assert.Equal(t, 0, td.snapshotService.SnapshotsTaken.Len())
|
||||
assert.Equal(t, 0, td.blockStore.SnapshotsTaken.Len())
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -560,7 +560,7 @@ func TestBackupDeletionControllerDeleteExpiredRequests(t *testing.T) {
|
|||
sharedInformers.Ark().V1().DeleteBackupRequests(),
|
||||
client.ArkV1(), // deleteBackupRequestClient
|
||||
client.ArkV1(), // backupClient
|
||||
nil, // snapshotService
|
||||
nil, // blockStore
|
||||
nil, // backupService
|
||||
"bucket",
|
||||
sharedInformers.Ark().V1().Restores(),
|
||||
|
|
|
@ -117,16 +117,6 @@ func (c *BlockStoreGRPCClient) GetVolumeInfo(volumeID, volumeAZ string) (string,
|
|||
return res.VolumeType, iops, nil
|
||||
}
|
||||
|
||||
// IsVolumeReady returns whether the specified volume is ready to be used.
|
||||
func (c *BlockStoreGRPCClient) IsVolumeReady(volumeID, volumeAZ string) (bool, error) {
|
||||
res, err := c.grpcClient.IsVolumeReady(context.Background(), &proto.IsVolumeReadyRequest{Plugin: c.plugin, VolumeID: volumeID, VolumeAZ: volumeAZ})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return res.Ready, nil
|
||||
}
|
||||
|
||||
// CreateSnapshot creates a snapshot of the specified block volume, and applies the provided
|
||||
// set of tags to the snapshot.
|
||||
func (c *BlockStoreGRPCClient) CreateSnapshot(volumeID, volumeAZ string, tags map[string]string) (string, error) {
|
||||
|
@ -292,21 +282,6 @@ func (s *BlockStoreGRPCServer) GetVolumeInfo(ctx context.Context, req *proto.Get
|
|||
return res, nil
|
||||
}
|
||||
|
||||
// IsVolumeReady returns whether the specified volume is ready to be used.
|
||||
func (s *BlockStoreGRPCServer) IsVolumeReady(ctx context.Context, req *proto.IsVolumeReadyRequest) (*proto.IsVolumeReadyResponse, error) {
|
||||
impl, err := s.getImpl(req.Plugin)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ready, err := impl.IsVolumeReady(req.VolumeID, req.VolumeAZ)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &proto.IsVolumeReadyResponse{Ready: ready}, nil
|
||||
}
|
||||
|
||||
// CreateSnapshot creates a snapshot of the specified block volume, and applies the provided
|
||||
// set of tags to the snapshot.
|
||||
func (s *BlockStoreGRPCServer) CreateSnapshot(ctx context.Context, req *proto.CreateSnapshotRequest) (*proto.CreateSnapshotResponse, error) {
|
||||
|
|
|
@ -20,8 +20,6 @@ It has these top-level messages:
|
|||
CreateVolumeResponse
|
||||
GetVolumeInfoRequest
|
||||
GetVolumeInfoResponse
|
||||
IsVolumeReadyRequest
|
||||
IsVolumeReadyResponse
|
||||
CreateSnapshotRequest
|
||||
CreateSnapshotResponse
|
||||
DeleteSnapshotRequest
|
||||
|
|
|
@ -137,54 +137,6 @@ func (m *GetVolumeInfoResponse) GetIops() int64 {
|
|||
return 0
|
||||
}
|
||||
|
||||
type IsVolumeReadyRequest struct {
|
||||
Plugin string `protobuf:"bytes,1,opt,name=plugin" json:"plugin,omitempty"`
|
||||
VolumeID string `protobuf:"bytes,2,opt,name=volumeID" json:"volumeID,omitempty"`
|
||||
VolumeAZ string `protobuf:"bytes,3,opt,name=volumeAZ" json:"volumeAZ,omitempty"`
|
||||
}
|
||||
|
||||
func (m *IsVolumeReadyRequest) Reset() { *m = IsVolumeReadyRequest{} }
|
||||
func (m *IsVolumeReadyRequest) String() string { return proto.CompactTextString(m) }
|
||||
func (*IsVolumeReadyRequest) ProtoMessage() {}
|
||||
func (*IsVolumeReadyRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{4} }
|
||||
|
||||
func (m *IsVolumeReadyRequest) GetPlugin() string {
|
||||
if m != nil {
|
||||
return m.Plugin
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *IsVolumeReadyRequest) GetVolumeID() string {
|
||||
if m != nil {
|
||||
return m.VolumeID
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *IsVolumeReadyRequest) GetVolumeAZ() string {
|
||||
if m != nil {
|
||||
return m.VolumeAZ
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type IsVolumeReadyResponse struct {
|
||||
Ready bool `protobuf:"varint,1,opt,name=ready" json:"ready,omitempty"`
|
||||
}
|
||||
|
||||
func (m *IsVolumeReadyResponse) Reset() { *m = IsVolumeReadyResponse{} }
|
||||
func (m *IsVolumeReadyResponse) String() string { return proto.CompactTextString(m) }
|
||||
func (*IsVolumeReadyResponse) ProtoMessage() {}
|
||||
func (*IsVolumeReadyResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{5} }
|
||||
|
||||
func (m *IsVolumeReadyResponse) GetReady() bool {
|
||||
if m != nil {
|
||||
return m.Ready
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type CreateSnapshotRequest struct {
|
||||
Plugin string `protobuf:"bytes,1,opt,name=plugin" json:"plugin,omitempty"`
|
||||
VolumeID string `protobuf:"bytes,2,opt,name=volumeID" json:"volumeID,omitempty"`
|
||||
|
@ -358,8 +310,6 @@ func init() {
|
|||
proto.RegisterType((*CreateVolumeResponse)(nil), "generated.CreateVolumeResponse")
|
||||
proto.RegisterType((*GetVolumeInfoRequest)(nil), "generated.GetVolumeInfoRequest")
|
||||
proto.RegisterType((*GetVolumeInfoResponse)(nil), "generated.GetVolumeInfoResponse")
|
||||
proto.RegisterType((*IsVolumeReadyRequest)(nil), "generated.IsVolumeReadyRequest")
|
||||
proto.RegisterType((*IsVolumeReadyResponse)(nil), "generated.IsVolumeReadyResponse")
|
||||
proto.RegisterType((*CreateSnapshotRequest)(nil), "generated.CreateSnapshotRequest")
|
||||
proto.RegisterType((*CreateSnapshotResponse)(nil), "generated.CreateSnapshotResponse")
|
||||
proto.RegisterType((*DeleteSnapshotRequest)(nil), "generated.DeleteSnapshotRequest")
|
||||
|
@ -383,7 +333,6 @@ type BlockStoreClient interface {
|
|||
Init(ctx context.Context, in *InitRequest, opts ...grpc.CallOption) (*Empty, error)
|
||||
CreateVolumeFromSnapshot(ctx context.Context, in *CreateVolumeRequest, opts ...grpc.CallOption) (*CreateVolumeResponse, error)
|
||||
GetVolumeInfo(ctx context.Context, in *GetVolumeInfoRequest, opts ...grpc.CallOption) (*GetVolumeInfoResponse, error)
|
||||
IsVolumeReady(ctx context.Context, in *IsVolumeReadyRequest, opts ...grpc.CallOption) (*IsVolumeReadyResponse, error)
|
||||
CreateSnapshot(ctx context.Context, in *CreateSnapshotRequest, opts ...grpc.CallOption) (*CreateSnapshotResponse, error)
|
||||
DeleteSnapshot(ctx context.Context, in *DeleteSnapshotRequest, opts ...grpc.CallOption) (*Empty, error)
|
||||
GetVolumeID(ctx context.Context, in *GetVolumeIDRequest, opts ...grpc.CallOption) (*GetVolumeIDResponse, error)
|
||||
|
@ -425,15 +374,6 @@ func (c *blockStoreClient) GetVolumeInfo(ctx context.Context, in *GetVolumeInfoR
|
|||
return out, nil
|
||||
}
|
||||
|
||||
func (c *blockStoreClient) IsVolumeReady(ctx context.Context, in *IsVolumeReadyRequest, opts ...grpc.CallOption) (*IsVolumeReadyResponse, error) {
|
||||
out := new(IsVolumeReadyResponse)
|
||||
err := grpc.Invoke(ctx, "/generated.BlockStore/IsVolumeReady", in, out, c.cc, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *blockStoreClient) CreateSnapshot(ctx context.Context, in *CreateSnapshotRequest, opts ...grpc.CallOption) (*CreateSnapshotResponse, error) {
|
||||
out := new(CreateSnapshotResponse)
|
||||
err := grpc.Invoke(ctx, "/generated.BlockStore/CreateSnapshot", in, out, c.cc, opts...)
|
||||
|
@ -476,7 +416,6 @@ type BlockStoreServer interface {
|
|||
Init(context.Context, *InitRequest) (*Empty, error)
|
||||
CreateVolumeFromSnapshot(context.Context, *CreateVolumeRequest) (*CreateVolumeResponse, error)
|
||||
GetVolumeInfo(context.Context, *GetVolumeInfoRequest) (*GetVolumeInfoResponse, error)
|
||||
IsVolumeReady(context.Context, *IsVolumeReadyRequest) (*IsVolumeReadyResponse, error)
|
||||
CreateSnapshot(context.Context, *CreateSnapshotRequest) (*CreateSnapshotResponse, error)
|
||||
DeleteSnapshot(context.Context, *DeleteSnapshotRequest) (*Empty, error)
|
||||
GetVolumeID(context.Context, *GetVolumeIDRequest) (*GetVolumeIDResponse, error)
|
||||
|
@ -541,24 +480,6 @@ func _BlockStore_GetVolumeInfo_Handler(srv interface{}, ctx context.Context, dec
|
|||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _BlockStore_IsVolumeReady_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(IsVolumeReadyRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(BlockStoreServer).IsVolumeReady(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/generated.BlockStore/IsVolumeReady",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(BlockStoreServer).IsVolumeReady(ctx, req.(*IsVolumeReadyRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _BlockStore_CreateSnapshot_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(CreateSnapshotRequest)
|
||||
if err := dec(in); err != nil {
|
||||
|
@ -647,10 +568,6 @@ var _BlockStore_serviceDesc = grpc.ServiceDesc{
|
|||
MethodName: "GetVolumeInfo",
|
||||
Handler: _BlockStore_GetVolumeInfo_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "IsVolumeReady",
|
||||
Handler: _BlockStore_IsVolumeReady_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "CreateSnapshot",
|
||||
Handler: _BlockStore_CreateSnapshot_Handler,
|
||||
|
|
|
@ -26,16 +26,6 @@ message GetVolumeInfoResponse {
|
|||
int64 iops = 2;
|
||||
}
|
||||
|
||||
message IsVolumeReadyRequest {
|
||||
string plugin = 1;
|
||||
string volumeID = 2;
|
||||
string volumeAZ = 3;
|
||||
}
|
||||
|
||||
message IsVolumeReadyResponse {
|
||||
bool ready = 1;
|
||||
}
|
||||
|
||||
message CreateSnapshotRequest {
|
||||
string plugin = 1;
|
||||
string volumeID = 2;
|
||||
|
@ -75,7 +65,6 @@ service BlockStore {
|
|||
rpc Init(InitRequest) returns (Empty);
|
||||
rpc CreateVolumeFromSnapshot(CreateVolumeRequest) returns (CreateVolumeResponse);
|
||||
rpc GetVolumeInfo(GetVolumeInfoRequest) returns (GetVolumeInfoResponse);
|
||||
rpc IsVolumeReady(IsVolumeReadyRequest) returns (IsVolumeReadyResponse);
|
||||
rpc CreateSnapshot(CreateSnapshotRequest) returns (CreateSnapshotResponse);
|
||||
rpc DeleteSnapshot(DeleteSnapshotRequest) returns (Empty);
|
||||
rpc GetVolumeID(GetVolumeIDRequest) returns (GetVolumeIDResponse);
|
||||
|
|
|
@ -139,15 +139,6 @@ func (r *restartableBlockStore) GetVolumeInfo(volumeID string, volumeAZ string)
|
|||
return delegate.GetVolumeInfo(volumeID, volumeAZ)
|
||||
}
|
||||
|
||||
// IsVolumeReady restarts the plugin's process if needed, then delegates the call.
|
||||
func (r *restartableBlockStore) IsVolumeReady(volumeID string, volumeAZ string) (ready bool, err error) {
|
||||
delegate, err := r.getDelegate()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return delegate.IsVolumeReady(volumeID, volumeAZ)
|
||||
}
|
||||
|
||||
// CreateSnapshot restarts the plugin's process if needed, then delegates the call.
|
||||
func (r *restartableBlockStore) CreateSnapshot(volumeID string, volumeAZ string, tags map[string]string) (snapshotID string, err error) {
|
||||
delegate, err := r.getDelegate()
|
||||
|
|
|
@ -229,12 +229,6 @@ func TestRestartableBlockStoreDelegatedFunctions(t *testing.T) {
|
|||
expectedErrorOutputs: []interface{}{"", (*int64)(nil), errors.Errorf("reset error")},
|
||||
expectedDelegateOutputs: []interface{}{"volumeType", to.Int64Ptr(10000), errors.Errorf("delegate error")},
|
||||
},
|
||||
restartableDelegateTest{
|
||||
function: "IsVolumeReady",
|
||||
inputs: []interface{}{"volumeID", "volumeAZ"},
|
||||
expectedErrorOutputs: []interface{}{false, errors.Errorf("reset error")},
|
||||
expectedDelegateOutputs: []interface{}{true, errors.Errorf("delegate error")},
|
||||
},
|
||||
restartableDelegateTest{
|
||||
function: "CreateSnapshot",
|
||||
inputs: []interface{}{"volumeID", "volumeAZ", map[string]string{"a": "b"}},
|
||||
|
|
|
@ -73,7 +73,7 @@ type kindString string
|
|||
type kubernetesRestorer struct {
|
||||
discoveryHelper discovery.Helper
|
||||
dynamicFactory client.DynamicFactory
|
||||
snapshotService cloudprovider.SnapshotService
|
||||
blockStore cloudprovider.BlockStore
|
||||
backupClient arkv1client.BackupsGetter
|
||||
namespaceClient corev1.NamespaceInterface
|
||||
resticRestorerFactory restic.RestorerFactory
|
||||
|
@ -145,7 +145,7 @@ func prioritizeResources(helper discovery.Helper, priorities []string, includedR
|
|||
func NewKubernetesRestorer(
|
||||
discoveryHelper discovery.Helper,
|
||||
dynamicFactory client.DynamicFactory,
|
||||
snapshotService cloudprovider.SnapshotService,
|
||||
blockStore cloudprovider.BlockStore,
|
||||
resourcePriorities []string,
|
||||
backupClient arkv1client.BackupsGetter,
|
||||
namespaceClient corev1.NamespaceInterface,
|
||||
|
@ -156,7 +156,7 @@ func NewKubernetesRestorer(
|
|||
return &kubernetesRestorer{
|
||||
discoveryHelper: discoveryHelper,
|
||||
dynamicFactory: dynamicFactory,
|
||||
snapshotService: snapshotService,
|
||||
blockStore: blockStore,
|
||||
backupClient: backupClient,
|
||||
namespaceClient: namespaceClient,
|
||||
resticRestorerFactory: resticRestorerFactory,
|
||||
|
@ -224,7 +224,7 @@ func (kr *kubernetesRestorer) Restore(log logrus.FieldLogger, restore *api.Resto
|
|||
snapshotVolumes: backup.Spec.SnapshotVolumes,
|
||||
restorePVs: restore.Spec.RestorePVs,
|
||||
volumeBackups: backup.Status.VolumeBackups,
|
||||
snapshotService: kr.snapshotService,
|
||||
blockStore: kr.blockStore,
|
||||
}
|
||||
|
||||
restoreCtx := &context{
|
||||
|
@ -238,7 +238,7 @@ func (kr *kubernetesRestorer) Restore(log logrus.FieldLogger, restore *api.Resto
|
|||
fileSystem: kr.fileSystem,
|
||||
namespaceClient: kr.namespaceClient,
|
||||
actions: resolvedActions,
|
||||
snapshotService: kr.snapshotService,
|
||||
blockStore: kr.blockStore,
|
||||
resticRestorer: resticRestorer,
|
||||
pvsToProvision: sets.NewString(),
|
||||
pvRestorer: pvRestorer,
|
||||
|
@ -319,7 +319,7 @@ type context struct {
|
|||
fileSystem filesystem.Interface
|
||||
namespaceClient corev1.NamespaceInterface
|
||||
actions []resolvedAction
|
||||
snapshotService cloudprovider.SnapshotService
|
||||
blockStore cloudprovider.BlockStore
|
||||
resticRestorer restic.Restorer
|
||||
globalWaitGroup arksync.ErrorGroup
|
||||
resourceWaitGroup sync.WaitGroup
|
||||
|
@ -901,7 +901,7 @@ type pvRestorer struct {
|
|||
snapshotVolumes *bool
|
||||
restorePVs *bool
|
||||
volumeBackups map[string]*api.VolumeBackupInfo
|
||||
snapshotService cloudprovider.SnapshotService
|
||||
blockStore cloudprovider.BlockStore
|
||||
}
|
||||
|
||||
func (r *pvRestorer) executePVAction(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
|
||||
|
@ -937,7 +937,7 @@ func (r *pvRestorer) executePVAction(obj *unstructured.Unstructured) (*unstructu
|
|||
|
||||
// Past this point, we expect to be doing a restore
|
||||
|
||||
if r.snapshotService == nil {
|
||||
if r.blockStore == nil {
|
||||
return nil, errors.New("you must configure a persistentVolumeProvider to restore PersistentVolumes from snapshots")
|
||||
}
|
||||
|
||||
|
@ -949,13 +949,13 @@ func (r *pvRestorer) executePVAction(obj *unstructured.Unstructured) (*unstructu
|
|||
)
|
||||
|
||||
log.Info("restoring persistent volume from snapshot")
|
||||
volumeID, err := r.snapshotService.CreateVolumeFromSnapshot(backupInfo.SnapshotID, backupInfo.Type, backupInfo.AvailabilityZone, backupInfo.Iops)
|
||||
volumeID, err := r.blockStore.CreateVolumeFromSnapshot(backupInfo.SnapshotID, backupInfo.Type, backupInfo.AvailabilityZone, backupInfo.Iops)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log.Info("successfully restored persistent volume from snapshot")
|
||||
|
||||
updated1, err := r.snapshotService.SetVolumeID(obj, volumeID)
|
||||
updated1, err := r.blockStore.SetVolumeID(obj, volumeID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -1193,7 +1193,7 @@ func TestExecutePVAction(t *testing.T) {
|
|||
restore *api.Restore
|
||||
backup *api.Backup
|
||||
volumeMap map[api.VolumeBackupInfo]string
|
||||
noSnapshotService bool
|
||||
noBlockStore bool
|
||||
expectedErr bool
|
||||
expectedRes *unstructured.Unstructured
|
||||
volumeID string
|
||||
|
@ -1261,36 +1261,36 @@ func TestExecutePVAction(t *testing.T) {
|
|||
expectedRes: NewTestUnstructured().WithName("pv-1").WithSpec("xyz").Unstructured,
|
||||
},
|
||||
{
|
||||
name: "restoring, snapshotService=nil, backup has at least 1 snapshot -> error",
|
||||
obj: NewTestUnstructured().WithName("pv-1").WithSpecField("awsElasticBlockStore", make(map[string]interface{})).Unstructured,
|
||||
restore: arktest.NewDefaultTestRestore().Restore,
|
||||
backup: &api.Backup{Status: api.BackupStatus{VolumeBackups: map[string]*api.VolumeBackupInfo{"pv-1": {SnapshotID: "snap-1"}}}},
|
||||
volumeMap: map[api.VolumeBackupInfo]string{{SnapshotID: "snap-1"}: "volume-1"},
|
||||
volumeID: "volume-1",
|
||||
noSnapshotService: true,
|
||||
expectedErr: true,
|
||||
expectedRes: NewTestUnstructured().WithName("pv-1").WithSpecField("awsElasticBlockStore", make(map[string]interface{})).Unstructured,
|
||||
name: "restoring, blockStore=nil, backup has at least 1 snapshot -> error",
|
||||
obj: NewTestUnstructured().WithName("pv-1").WithSpecField("awsElasticBlockStore", make(map[string]interface{})).Unstructured,
|
||||
restore: arktest.NewDefaultTestRestore().Restore,
|
||||
backup: &api.Backup{Status: api.BackupStatus{VolumeBackups: map[string]*api.VolumeBackupInfo{"pv-1": {SnapshotID: "snap-1"}}}},
|
||||
volumeMap: map[api.VolumeBackupInfo]string{{SnapshotID: "snap-1"}: "volume-1"},
|
||||
volumeID: "volume-1",
|
||||
noBlockStore: true,
|
||||
expectedErr: true,
|
||||
expectedRes: NewTestUnstructured().WithName("pv-1").WithSpecField("awsElasticBlockStore", make(map[string]interface{})).Unstructured,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
var (
|
||||
snapshotService cloudprovider.SnapshotService
|
||||
fakeSnapshotService *arktest.FakeSnapshotService
|
||||
blockStore cloudprovider.BlockStore
|
||||
fakeBlockStore *arktest.FakeBlockStore
|
||||
)
|
||||
if !test.noSnapshotService {
|
||||
fakeSnapshotService = &arktest.FakeSnapshotService{
|
||||
if !test.noBlockStore {
|
||||
fakeBlockStore = &arktest.FakeBlockStore{
|
||||
RestorableVolumes: test.volumeMap,
|
||||
VolumeID: test.volumeID,
|
||||
}
|
||||
snapshotService = fakeSnapshotService
|
||||
blockStore = fakeBlockStore
|
||||
}
|
||||
|
||||
r := &pvRestorer{
|
||||
logger: arktest.NewLogger(),
|
||||
restorePVs: test.restore.Spec.RestorePVs,
|
||||
snapshotService: snapshotService,
|
||||
logger: arktest.NewLogger(),
|
||||
restorePVs: test.restore.Spec.RestorePVs,
|
||||
blockStore: blockStore,
|
||||
}
|
||||
if test.backup != nil {
|
||||
r.snapshotVolumes = test.backup.Spec.SnapshotVolumes
|
||||
|
@ -1306,9 +1306,9 @@ func TestExecutePVAction(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
if test.expectSetVolumeID {
|
||||
assert.Equal(t, test.volumeID, fakeSnapshotService.VolumeIDSet)
|
||||
assert.Equal(t, test.volumeID, fakeBlockStore.VolumeIDSet)
|
||||
} else {
|
||||
assert.Equal(t, "", fakeSnapshotService.VolumeIDSet)
|
||||
assert.Equal(t, "", fakeBlockStore.VolumeIDSet)
|
||||
}
|
||||
assert.Equal(t, test.expectedRes, res)
|
||||
})
|
||||
|
|
|
@ -25,7 +25,7 @@ import (
|
|||
api "github.com/heptio/ark/pkg/apis/ark/v1"
|
||||
)
|
||||
|
||||
type FakeSnapshotService struct {
|
||||
type FakeBlockStore struct {
|
||||
// SnapshotID->VolumeID
|
||||
SnapshotsTaken sets.String
|
||||
|
||||
|
@ -41,26 +41,30 @@ type FakeSnapshotService struct {
|
|||
Error error
|
||||
}
|
||||
|
||||
func (s *FakeSnapshotService) CreateSnapshot(volumeID, volumeAZ string, tags map[string]string) (string, error) {
|
||||
if s.Error != nil {
|
||||
return "", s.Error
|
||||
func (bs *FakeBlockStore) Init(config map[string]string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (bs *FakeBlockStore) CreateSnapshot(volumeID, volumeAZ string, tags map[string]string) (string, error) {
|
||||
if bs.Error != nil {
|
||||
return "", bs.Error
|
||||
}
|
||||
|
||||
if _, exists := s.SnapshottableVolumes[volumeID]; !exists {
|
||||
if _, exists := bs.SnapshottableVolumes[volumeID]; !exists {
|
||||
return "", errors.New("snapshottable volume not found")
|
||||
}
|
||||
|
||||
if s.SnapshotsTaken == nil {
|
||||
s.SnapshotsTaken = sets.NewString()
|
||||
if bs.SnapshotsTaken == nil {
|
||||
bs.SnapshotsTaken = sets.NewString()
|
||||
}
|
||||
s.SnapshotsTaken.Insert(s.SnapshottableVolumes[volumeID].SnapshotID)
|
||||
bs.SnapshotsTaken.Insert(bs.SnapshottableVolumes[volumeID].SnapshotID)
|
||||
|
||||
return s.SnapshottableVolumes[volumeID].SnapshotID, nil
|
||||
return bs.SnapshottableVolumes[volumeID].SnapshotID, nil
|
||||
}
|
||||
|
||||
func (s *FakeSnapshotService) CreateVolumeFromSnapshot(snapshotID, volumeType, volumeAZ string, iops *int64) (string, error) {
|
||||
if s.Error != nil {
|
||||
return "", s.Error
|
||||
func (bs *FakeBlockStore) CreateVolumeFromSnapshot(snapshotID, volumeType, volumeAZ string, iops *int64) (string, error) {
|
||||
if bs.Error != nil {
|
||||
return "", bs.Error
|
||||
}
|
||||
|
||||
key := api.VolumeBackupInfo{
|
||||
|
@ -70,44 +74,44 @@ func (s *FakeSnapshotService) CreateVolumeFromSnapshot(snapshotID, volumeType, v
|
|||
AvailabilityZone: volumeAZ,
|
||||
}
|
||||
|
||||
return s.RestorableVolumes[key], nil
|
||||
return bs.RestorableVolumes[key], nil
|
||||
}
|
||||
|
||||
func (s *FakeSnapshotService) DeleteSnapshot(snapshotID string) error {
|
||||
if s.Error != nil {
|
||||
return s.Error
|
||||
func (bs *FakeBlockStore) DeleteSnapshot(snapshotID string) error {
|
||||
if bs.Error != nil {
|
||||
return bs.Error
|
||||
}
|
||||
|
||||
if !s.SnapshotsTaken.Has(snapshotID) {
|
||||
if !bs.SnapshotsTaken.Has(snapshotID) {
|
||||
return errors.New("snapshot not found")
|
||||
}
|
||||
|
||||
s.SnapshotsTaken.Delete(snapshotID)
|
||||
bs.SnapshotsTaken.Delete(snapshotID)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *FakeSnapshotService) GetVolumeInfo(volumeID, volumeAZ string) (string, *int64, error) {
|
||||
if s.Error != nil {
|
||||
return "", nil, s.Error
|
||||
func (bs *FakeBlockStore) GetVolumeInfo(volumeID, volumeAZ string) (string, *int64, error) {
|
||||
if bs.Error != nil {
|
||||
return "", nil, bs.Error
|
||||
}
|
||||
|
||||
if volumeInfo, exists := s.SnapshottableVolumes[volumeID]; !exists {
|
||||
if volumeInfo, exists := bs.SnapshottableVolumes[volumeID]; !exists {
|
||||
return "", nil, errors.New("VolumeID not found")
|
||||
} else {
|
||||
return volumeInfo.Type, volumeInfo.Iops, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *FakeSnapshotService) GetVolumeID(pv runtime.Unstructured) (string, error) {
|
||||
if s.Error != nil {
|
||||
return "", s.Error
|
||||
func (bs *FakeBlockStore) GetVolumeID(pv runtime.Unstructured) (string, error) {
|
||||
if bs.Error != nil {
|
||||
return "", bs.Error
|
||||
}
|
||||
|
||||
return s.VolumeID, nil
|
||||
return bs.VolumeID, nil
|
||||
}
|
||||
|
||||
func (s *FakeSnapshotService) SetVolumeID(pv runtime.Unstructured, volumeID string) (runtime.Unstructured, error) {
|
||||
s.VolumeIDSet = volumeID
|
||||
return pv, s.Error
|
||||
func (bs *FakeBlockStore) SetVolumeID(pv runtime.Unstructured, volumeID string) (runtime.Unstructured, error) {
|
||||
bs.VolumeIDSet = volumeID
|
||||
return pv, bs.Error
|
||||
}
|
Loading…
Reference in New Issue