kopia lib

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
pull/5233/head
Lyndon-Li 2022-08-19 13:47:00 +08:00
parent 1ba7b3de4f
commit 3769cd218a
18 changed files with 2616 additions and 172 deletions

View File

@ -0,0 +1,2 @@
Add changes for Kopia Integration: Kopia Lib - method implementation
Add changes to write Kopia Repository logs to Velero log

View File

@ -93,126 +93,140 @@ Velero by default uses the Unified Repository for all kinds of data movement, it
## The Unified Repository Interface
Below are the definitions of the Unified Repository Interface. All the functions are synchronization functions.
```
///BackupRepoService is used to initialize, open or maintain a backup repository
// BackupRepoService is used to initialize, open or maintain a backup repository
type BackupRepoService interface {
///Create a backup repository or connect to an existing backup repository
///repoOption: option to the backup repository and the underlying backup storage
///createNew: indicates whether to create a new or connect to an existing backup repository
///result: the backup repository specific output that could be used to open the backup repository later
Init(ctx context.Context, repoOption RepoOptions, createNew bool) error
///Open an backup repository that has been created/connected
///repoOption: options to open the backup repository and the underlying storage
Open(ctx context.Context, repoOption RepoOptions) (BackupRepo, error)
///Periodically called to maintain the backup repository to eliminate redundant data and improve performance
///repoOption: options to maintain the backup repository
Maintain(ctx context.Context, repoOption RepoOptions) error
// Init creates a backup repository or connect to an existing backup repository.
// repoOption: option to the backup repository and the underlying backup storage.
// createNew: indicates whether to create a new or connect to an existing backup repository.
Init(ctx context.Context, repoOption RepoOptions, createNew bool) error
// Open opens an backup repository that has been created/connected.
// repoOption: options to open the backup repository and the underlying storage.
Open(ctx context.Context, repoOption RepoOptions) (BackupRepo, error)
// Maintain is periodically called to maintain the backup repository to eliminate redundant data.
// repoOption: options to maintain the backup repository.
Maintain(ctx context.Context, repoOption RepoOptions) error
// DefaultMaintenanceFrequency returns the defgault frequency of maintenance, callers refer this
// frequency to maintain the backup repository to get the best maintenance performance
DefaultMaintenanceFrequency() time.Duration
}
///BackupRepo provides the access to the backup repository
// BackupRepo provides the access to the backup repository
type BackupRepo interface {
///Open an existing object for read
///id: the object's unified identifier
OpenObject(ctx context.Context, id ID) (ObjectReader, error)
///Get a manifest data
GetManifest(ctx context.Context, id ID, mani *RepoManifest) error
///Get one or more manifest data that match the given labels
FindManifests(ctx context.Context, filter ManifestFilter) ([]*ManifestEntryMetadata, error)
///Create a new object and return the object's writer interface
///return: A unified identifier of the object on success
NewObjectWriter(ctx context.Context, opt ObjectWriteOptions) ObjectWriter
///Save a manifest object
PutManifest(ctx context.Context, mani RepoManifest) (ID, error)
///Delete a manifest object
DeleteManifest(ctx context.Context, id ID) error
///Flush all the backup repository data
Flush(ctx context.Context) error
///Get the local time of the backup repository. It may be different from the time of the caller
Time() time.Time
///Close the backup repository
Close(ctx context.Context) error
}
// OpenObject opens an existing object for read.
// id: the object's unified identifier.
OpenObject(ctx context.Context, id ID) (ObjectReader, error)
// GetManifest gets a manifest data from the backup repository.
GetManifest(ctx context.Context, id ID, mani *RepoManifest) error
// FindManifests gets one or more manifest data that match the given labels
FindManifests(ctx context.Context, filter ManifestFilter) ([]*ManifestEntryMetadata, error)
// NewObjectWriter creates a new object and return the object's writer interface.
// return: A unified identifier of the object on success.
NewObjectWriter(ctx context.Context, opt ObjectWriteOptions) ObjectWriter
// PutManifest saves a manifest object into the backup repository.
PutManifest(ctx context.Context, mani RepoManifest) (ID, error)
// DeleteManifest deletes a manifest object from the backup repository.
DeleteManifest(ctx context.Context, id ID) error
// Flush flushes all the backup repository data
Flush(ctx context.Context) error
// Time returns the local time of the backup repository. It may be different from the time of the caller
Time() time.Time
// Close closes the backup repository
Close(ctx context.Context) error
type ObjectReader interface {
io.ReadCloser
io.Seeker
///Length returns the logical size of the object
Length() int64
io.ReadCloser
io.Seeker
// Length returns the logical size of the object
Length() int64
}
type ObjectWriter interface {
io.WriteCloser
///For some cases, i.e. block incremental, the object is not written sequentially
io.Seeker
// Periodically called to preserve the state of data written to the repo so far
// Return a unified identifier that represent the current state
// An empty ID could be returned on success if the backup repository doesn't support this
Checkpoint() (ID, error)
io.WriteCloser
///Wait for the completion of the object write
///Result returns the object's unified identifier after the write completes
Result() (ID, error)
}
// Seeker is used in the cases that the object is not written sequentially
io.Seeker
// Checkpoint is periodically called to preserve the state of data written to the repo so far.
// Checkpoint returns a unified identifier that represent the current state.
// An empty ID could be returned on success if the backup repository doesn't support this.
Checkpoint() (ID, error)
// Result waits for the completion of the object write.
// Result returns the object's unified identifier after the write completes.
Result() (ID, error)
}
```
Some data structure & constants used by the interfaces:
```
```
type RepoOptions struct {
///A repository specific string to identify a backup storage, i.e., "s3", "filesystem"
StorageType string
///Backup repository password, if any
RepoPassword string
///A custom path to save the repository's configuration, if any
ConfigFilePath string
///Other repository specific options
GeneralOptions map[string]string
///Storage specific options
StorageOptions map[string]string
// StorageType is a repository specific string to identify a backup storage, i.e., "s3", "filesystem"
StorageType string
// RepoPassword is the backup repository's password, if any
RepoPassword string
// ConfigFilePath is a custom path to save the repository's configuration, if any
ConfigFilePath string
// GeneralOptions takes other repository specific options
GeneralOptions map[string]string
// StorageOptions takes storage specific options
StorageOptions map[string]string
// Description is a description of the backup repository/backup repository operation.
// It is for logging/debugging purpose only and doesn't control any behavior of the backup repository.
Description string
}
///ObjectWriteOptions defines the options when creating an object for write
// ObjectWriteOptions defines the options when creating an object for write
type ObjectWriteOptions struct {
FullPath string ///Full logical path of the object
Description string ///A description of the object, could be empty
Prefix ID ///A prefix of the name used to save the object
AccessMode int ///OBJECT_DATA_ACCESS_*
BackupMode int ///OBJECT_DATA_BACKUP_*
FullPath string // Full logical path of the object
DataType int // OBJECT_DATA_TYPE_*
Description string // A description of the object, could be empty
Prefix ID // A prefix of the name used to save the object
AccessMode int // OBJECT_DATA_ACCESS_*
BackupMode int // OBJECT_DATA_BACKUP_*
}
const (
///Below consts defines the access mode when creating an object for write
OBJECT_DATA_ACCESS_MODE_UNKNOWN int = 0
OBJECT_DATA_ACCESS_MODE_FILE int = 1
OBJECT_DATA_ACCESS_MODE_BLOCK int = 2
// Below consts descrbe the data type of one object.
// Metadata: This type describes how the data is organized.
// For a file system backup, the Metadata describes a Dir or File.
// For a block backup, the Metadata describes a Disk and its incremental link.
ObjectDataTypeUnknown int = 0
ObjectDataTypeMetadata int = 1
ObjectDataTypeData int = 2
OBJECT_DATA_BACKUP_MODE_UNKNOWN int = 0
OBJECT_DATA_BACKUP_MODE_FULL int = 1
OBJECT_DATA_BACKUP_MODE_INC int = 2
// Below consts defines the access mode when creating an object for write
ObjectDataAccessModeUnknown int = 0
ObjectDataAccessModeFile int = 1
ObjectDataAccessModeBlock int = 2
ObjectDataBackupModeUnknown int = 0
ObjectDataBackupModeFull int = 1
ObjectDataBackupModeInc int = 2
)
///ManifestEntryMetadata is the metadata describing one manifest data
// ManifestEntryMetadata is the metadata describing one manifest data
type ManifestEntryMetadata struct {
ID ID ///The ID of the manifest data
Length int32 ///The data size of the manifest data
Labels map[string]string ///Labels saved together with the manifest data
ModTime time.Time ///Modified time of the manifest data
ID ID // The ID of the manifest data
Length int32 // The data size of the manifest data
Labels map[string]string // Labels saved together with the manifest data
ModTime time.Time // Modified time of the manifest data
}
type RepoManifest struct {
Payload interface{} ///The user data of manifest
Metadata *ManifestEntryMetadata ///The metadata data of manifest
Payload interface{} // The user data of manifest
Metadata *ManifestEntryMetadata // The metadata data of manifest
}
type ManifestFilter struct {

1
go.mod
View File

@ -76,6 +76,7 @@ require (
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/go-logr/logr v0.4.0 // indirect
github.com/go-logr/zapr v0.4.0 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.4.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect

2
go.sum
View File

@ -323,6 +323,7 @@ github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6Wezm
github.com/gobwas/ws v1.1.0/go.mod h1:nzvNcVha5eUziGrbxFCo6qFIojQHjJV5cLYIbezhfL0=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.0.6/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE=
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
@ -552,6 +553,7 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubernetes-csi/external-snapshotter/client/v4 v4.2.0 h1:nHHjmvjitIiyPlUHk/ofpgvBcNcawJLtf4PYHORLjAA=
github.com/kubernetes-csi/external-snapshotter/client/v4 v4.2.0/go.mod h1:YBCo4DoEeDndqvAn6eeu0vWM7QdXmHEeI9cFWplmBys=
github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=

View File

@ -18,6 +18,7 @@ package provider
import (
"context"
"time"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)
@ -30,27 +31,27 @@ type RepoParam struct {
// Provider defines the methods to manipulate a backup repository
type Provider interface {
//InitRepo is to initialize a repository from a new storage place
// InitRepo is to initialize a repository from a new storage place
InitRepo(ctx context.Context, param RepoParam) error
//ConnectToRepo is to establish the connection to a
//storage place that a repository is already initialized
// ConnectToRepo is to establish the connection to a
// storage place that a repository is already initialized
ConnectToRepo(ctx context.Context, param RepoParam) error
//PrepareRepo is a combination of InitRepo and ConnectToRepo,
//it may do initializing + connecting, connecting only if the repository
//is already initialized, or do nothing if the repository is already connected
// PrepareRepo is a combination of InitRepo and ConnectToRepo,
// it may do initializing + connecting, connecting only if the repository
// is already initialized, or do nothing if the repository is already connected
PrepareRepo(ctx context.Context, param RepoParam) error
//PruneRepo does a full prune/maintenance of the repository
// PruneRepo does a full prune/maintenance of the repository
PruneRepo(ctx context.Context, param RepoParam) error
//PruneRepoQuick does a quick prune/maintenance of the repository if available
PruneRepoQuick(ctx context.Context, param RepoParam) error
//EnsureUnlockRepo esures to remove any stale file locks in the storage
// EnsureUnlockRepo esures to remove any stale file locks in the storage
EnsureUnlockRepo(ctx context.Context, param RepoParam) error
//Forget is to delete a snapshot from the repository
// Forget is to delete a snapshot from the repository
Forget(ctx context.Context, snapshotID string, param RepoParam) error
// DefaultMaintenanceFrequency returns the default frequency to run maintenance
DefaultMaintenanceFrequency(ctx context.Context, param RepoParam) time.Duration
}

View File

@ -18,6 +18,7 @@ package provider
import (
"context"
"time"
"github.com/sirupsen/logrus"
@ -55,11 +56,6 @@ func (r *resticRepositoryProvider) PruneRepo(ctx context.Context, param RepoPara
return r.svc.PruneRepo(param.BackupLocation, param.BackupRepo)
}
func (r *resticRepositoryProvider) PruneRepoQuick(ctx context.Context, param RepoParam) error {
// restic doesn't support this operation
return nil
}
func (r *resticRepositoryProvider) EnsureUnlockRepo(ctx context.Context, param RepoParam) error {
return r.svc.UnlockRepo(param.BackupLocation, param.BackupRepo)
}
@ -67,3 +63,7 @@ func (r *resticRepositoryProvider) EnsureUnlockRepo(ctx context.Context, param R
func (r *resticRepositoryProvider) Forget(ctx context.Context, snapshotID string, param RepoParam) error {
return r.svc.Forget(param.BackupLocation, param.BackupRepo, snapshotID)
}
func (r *resticRepositoryProvider) DefaultMaintenanceFrequency(ctx context.Context, param RepoParam) time.Duration {
return r.svc.DefaultMaintenanceFrequency()
}

View File

@ -21,6 +21,7 @@ import (
"fmt"
"path"
"strings"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@ -59,9 +60,8 @@ var funcTable = localFuncTable{
}
const (
repoOpDescFullMaintain = "full maintenance"
repoOpDescQuickMaintain = "quick maintenance"
repoOpDescForget = "forget"
repoOpDescMaintain = "repo maintenance"
repoOpDescForget = "forget"
repoConnectDesc = "unfied repo"
)
@ -70,7 +70,7 @@ const (
func NewUnifiedRepoProvider(
credentialGetter credentials.CredentialGetter,
log logrus.FieldLogger,
) (Provider, error) {
) Provider {
repo := unifiedRepoProvider{
credentialGetter: credentialGetter,
log: log,
@ -78,22 +78,21 @@ func NewUnifiedRepoProvider(
repo.repoService = createRepoService(log)
log.Debug("Finished create unified repo service")
return &repo, nil
return &repo
}
func (urp *unifiedRepoProvider) InitRepo(ctx context.Context, param RepoParam) error {
log := urp.log.WithFields(logrus.Fields{
"BSL name": param.BackupLocation.Name,
"BSL UID": param.BackupLocation.UID,
"BSL name": param.BackupLocation.Name,
"repo name": param.BackupRepo.Name,
"repo UID": param.BackupRepo.UID,
})
log.Debug("Start to init repo")
repoOption, err := udmrepo.NewRepoOptions(
udmrepo.WithPassword(urp, param),
udmrepo.WithConfigFile(urp.workPath, string(param.BackupLocation.UID)),
udmrepo.WithConfigFile(urp.workPath, string(param.BackupRepo.UID)),
udmrepo.WithGenOptions(
map[string]string{
udmrepo.GenOptionOwnerName: udmrepo.GetRepoUser(),
@ -120,15 +119,16 @@ func (urp *unifiedRepoProvider) InitRepo(ctx context.Context, param RepoParam) e
func (urp *unifiedRepoProvider) ConnectToRepo(ctx context.Context, param RepoParam) error {
log := urp.log.WithFields(logrus.Fields{
"BSL name": param.BackupLocation.Name,
"BSL UID": param.BackupLocation.UID,
"BSL name": param.BackupLocation.Name,
"repo name": param.BackupRepo.Name,
"repo UID": param.BackupRepo.UID,
})
log.Debug("Start to connect repo")
repoOption, err := udmrepo.NewRepoOptions(
udmrepo.WithPassword(urp, param),
udmrepo.WithConfigFile(urp.workPath, string(param.BackupLocation.UID)),
udmrepo.WithConfigFile(urp.workPath, string(param.BackupRepo.UID)),
udmrepo.WithGenOptions(
map[string]string{
udmrepo.GenOptionOwnerName: udmrepo.GetRepoUser(),
@ -155,15 +155,16 @@ func (urp *unifiedRepoProvider) ConnectToRepo(ctx context.Context, param RepoPar
func (urp *unifiedRepoProvider) PrepareRepo(ctx context.Context, param RepoParam) error {
log := urp.log.WithFields(logrus.Fields{
"BSL name": param.BackupLocation.Name,
"BSL UID": param.BackupLocation.UID,
"BSL name": param.BackupLocation.Name,
"repo name": param.BackupRepo.Name,
"repo UID": param.BackupRepo.UID,
})
log.Debug("Start to prepare repo")
repoOption, err := udmrepo.NewRepoOptions(
udmrepo.WithPassword(urp, param),
udmrepo.WithConfigFile(urp.workPath, string(param.BackupLocation.UID)),
udmrepo.WithConfigFile(urp.workPath, string(param.BackupRepo.UID)),
udmrepo.WithGenOptions(
map[string]string{
udmrepo.GenOptionOwnerName: udmrepo.GetRepoUser(),
@ -196,21 +197,17 @@ func (urp *unifiedRepoProvider) PrepareRepo(ctx context.Context, param RepoParam
func (urp *unifiedRepoProvider) PruneRepo(ctx context.Context, param RepoParam) error {
log := urp.log.WithFields(logrus.Fields{
"BSL name": param.BackupLocation.Name,
"BSL UID": param.BackupLocation.UID,
"BSL name": param.BackupLocation.Name,
"repo name": param.BackupRepo.Name,
"repo UID": param.BackupRepo.UID,
})
log.Debug("Start to prune repo")
repoOption, err := udmrepo.NewRepoOptions(
udmrepo.WithPassword(urp, param),
udmrepo.WithConfigFile(urp.workPath, string(param.BackupLocation.UID)),
udmrepo.WithGenOptions(
map[string]string{
udmrepo.GenOptionMaintainMode: udmrepo.GenOptionMaintainFull,
},
),
udmrepo.WithDescription(repoOpDescFullMaintain),
udmrepo.WithConfigFile(urp.workPath, string(param.BackupRepo.UID)),
udmrepo.WithDescription(repoOpDescMaintain),
)
if err != nil {
@ -227,39 +224,6 @@ func (urp *unifiedRepoProvider) PruneRepo(ctx context.Context, param RepoParam)
return nil
}
func (urp *unifiedRepoProvider) PruneRepoQuick(ctx context.Context, param RepoParam) error {
log := urp.log.WithFields(logrus.Fields{
"BSL name": param.BackupLocation.Name,
"BSL UID": param.BackupLocation.UID,
})
log.Debug("Start to prune repo quick")
repoOption, err := udmrepo.NewRepoOptions(
udmrepo.WithPassword(urp, param),
udmrepo.WithConfigFile(urp.workPath, string(param.BackupLocation.UID)),
udmrepo.WithGenOptions(
map[string]string{
udmrepo.GenOptionMaintainMode: udmrepo.GenOptionMaintainQuick,
},
),
udmrepo.WithDescription(repoOpDescQuickMaintain),
)
if err != nil {
return errors.Wrap(err, "error to get repo options")
}
err = urp.repoService.Maintain(ctx, *repoOption)
if err != nil {
return errors.Wrap(err, "error to prune backup repo quick")
}
log.Debug("Prune repo quick complete")
return nil
}
func (urp *unifiedRepoProvider) EnsureUnlockRepo(ctx context.Context, param RepoParam) error {
return nil
}
@ -267,7 +231,8 @@ func (urp *unifiedRepoProvider) EnsureUnlockRepo(ctx context.Context, param Repo
func (urp *unifiedRepoProvider) Forget(ctx context.Context, snapshotID string, param RepoParam) error {
log := urp.log.WithFields(logrus.Fields{
"BSL name": param.BackupLocation.Name,
"BSL UID": param.BackupLocation.UID,
"repo name": param.BackupRepo.Name,
"repo UID": param.BackupRepo.UID,
"snapshotID": snapshotID,
})
@ -275,7 +240,7 @@ func (urp *unifiedRepoProvider) Forget(ctx context.Context, snapshotID string, p
repoOption, err := udmrepo.NewRepoOptions(
udmrepo.WithPassword(urp, param),
udmrepo.WithConfigFile(urp.workPath, string(param.BackupLocation.UID)),
udmrepo.WithConfigFile(urp.workPath, string(param.BackupRepo.UID)),
udmrepo.WithDescription(repoOpDescForget),
)
@ -305,6 +270,10 @@ func (urp *unifiedRepoProvider) Forget(ctx context.Context, snapshotID string, p
return nil
}
func (urp *unifiedRepoProvider) DefaultMaintenanceFrequency(ctx context.Context, param RepoParam) time.Duration {
return urp.repoService.DefaultMaintenanceFrequency()
}
func (urp *unifiedRepoProvider) GetPassword(param interface{}) (string, error) {
repoParam, ok := param.(RepoParam)
if !ok {

View File

@ -775,6 +775,7 @@ func TestForget(t *testing.T) {
err := urp.Forget(context.Background(), "", RepoParam{
BackupLocation: &velerov1api.BackupStorageLocation{},
BackupRepo: &velerov1api.BackupRepository{},
})
if tc.expectedErr == "" {

View File

@ -18,6 +18,7 @@ package restic
import (
"os"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@ -71,6 +72,10 @@ func (r *RepositoryService) Forget(bsl *velerov1api.BackupStorageLocation, repo
return r.exec(restic.ForgetCommand(repo.Spec.ResticIdentifier, snapshotID), bsl)
}
func (r *RepositoryService) DefaultMaintenanceFrequency() time.Duration {
return restic.DefaultMaintenanceFrequency
}
func (r *RepositoryService) exec(cmd *restic.Command, bsl *velerov1api.BackupStorageLocation) error {
file, err := r.credentialsFileStore.Path(repokey.RepoKeySelector())
if err != nil {

View File

@ -0,0 +1,542 @@
// Code generated by mockery v2.14.0. DO NOT EDIT.
package mocks
import (
blob "github.com/kopia/kopia/repo/blob"
content "github.com/kopia/kopia/repo/content"
context "context"
index "github.com/kopia/kopia/repo/content/index"
manifest "github.com/kopia/kopia/repo/manifest"
mock "github.com/stretchr/testify/mock"
object "github.com/kopia/kopia/repo/object"
repo "github.com/kopia/kopia/repo"
throttling "github.com/kopia/kopia/repo/blob/throttling"
time "time"
)
// DirectRepository is an autogenerated mock type for the DirectRepository type
type DirectRepository struct {
mock.Mock
}
// AlsoLogToContentLog provides a mock function with given fields: ctx
func (_m *DirectRepository) AlsoLogToContentLog(ctx context.Context) context.Context {
ret := _m.Called(ctx)
var r0 context.Context
if rf, ok := ret.Get(0).(func(context.Context) context.Context); ok {
r0 = rf(ctx)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(context.Context)
}
}
return r0
}
// BlobCfg provides a mock function with given fields:
func (_m *DirectRepository) BlobCfg() content.BlobCfgBlob {
ret := _m.Called()
var r0 content.BlobCfgBlob
if rf, ok := ret.Get(0).(func() content.BlobCfgBlob); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(content.BlobCfgBlob)
}
return r0
}
// BlobReader provides a mock function with given fields:
func (_m *DirectRepository) BlobReader() blob.Reader {
ret := _m.Called()
var r0 blob.Reader
if rf, ok := ret.Get(0).(func() blob.Reader); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(blob.Reader)
}
}
return r0
}
// BlobVolume provides a mock function with given fields:
func (_m *DirectRepository) BlobVolume() blob.Volume {
ret := _m.Called()
var r0 blob.Volume
if rf, ok := ret.Get(0).(func() blob.Volume); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(blob.Volume)
}
}
return r0
}
// ClientOptions provides a mock function with given fields:
func (_m *DirectRepository) ClientOptions() repo.ClientOptions {
ret := _m.Called()
var r0 repo.ClientOptions
if rf, ok := ret.Get(0).(func() repo.ClientOptions); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(repo.ClientOptions)
}
return r0
}
// Close provides a mock function with given fields: ctx
func (_m *DirectRepository) Close(ctx context.Context) error {
ret := _m.Called(ctx)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}
return r0
}
// ConfigFilename provides a mock function with given fields:
func (_m *DirectRepository) ConfigFilename() string {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// ContentInfo provides a mock function with given fields: ctx, contentID
func (_m *DirectRepository) ContentInfo(ctx context.Context, contentID index.ID) (index.Info, error) {
ret := _m.Called(ctx, contentID)
var r0 index.Info
if rf, ok := ret.Get(0).(func(context.Context, index.ID) index.Info); ok {
r0 = rf(ctx, contentID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(index.Info)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, index.ID) error); ok {
r1 = rf(ctx, contentID)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ContentReader provides a mock function with given fields:
func (_m *DirectRepository) ContentReader() content.Reader {
ret := _m.Called()
var r0 content.Reader
if rf, ok := ret.Get(0).(func() content.Reader); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(content.Reader)
}
}
return r0
}
// Crypter provides a mock function with given fields:
func (_m *DirectRepository) Crypter() *content.Crypter {
ret := _m.Called()
var r0 *content.Crypter
if rf, ok := ret.Get(0).(func() *content.Crypter); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*content.Crypter)
}
}
return r0
}
// DeriveKey provides a mock function with given fields: purpose, keyLength
func (_m *DirectRepository) DeriveKey(purpose []byte, keyLength int) []byte {
ret := _m.Called(purpose, keyLength)
var r0 []byte
if rf, ok := ret.Get(0).(func([]byte, int) []byte); ok {
r0 = rf(purpose, keyLength)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]byte)
}
}
return r0
}
// DisableIndexRefresh provides a mock function with given fields:
func (_m *DirectRepository) DisableIndexRefresh() {
_m.Called()
}
// FindManifests provides a mock function with given fields: ctx, labels
func (_m *DirectRepository) FindManifests(ctx context.Context, labels map[string]string) ([]*manifest.EntryMetadata, error) {
ret := _m.Called(ctx, labels)
var r0 []*manifest.EntryMetadata
if rf, ok := ret.Get(0).(func(context.Context, map[string]string) []*manifest.EntryMetadata); ok {
r0 = rf(ctx, labels)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*manifest.EntryMetadata)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, map[string]string) error); ok {
r1 = rf(ctx, labels)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetManifest provides a mock function with given fields: ctx, id, data
func (_m *DirectRepository) GetManifest(ctx context.Context, id manifest.ID, data interface{}) (*manifest.EntryMetadata, error) {
ret := _m.Called(ctx, id, data)
var r0 *manifest.EntryMetadata
if rf, ok := ret.Get(0).(func(context.Context, manifest.ID, interface{}) *manifest.EntryMetadata); ok {
r0 = rf(ctx, id, data)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*manifest.EntryMetadata)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, manifest.ID, interface{}) error); ok {
r1 = rf(ctx, id, data)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// IndexBlobs provides a mock function with given fields: ctx, includeInactive
func (_m *DirectRepository) IndexBlobs(ctx context.Context, includeInactive bool) ([]content.IndexBlobInfo, error) {
ret := _m.Called(ctx, includeInactive)
var r0 []content.IndexBlobInfo
if rf, ok := ret.Get(0).(func(context.Context, bool) []content.IndexBlobInfo); ok {
r0 = rf(ctx, includeInactive)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]content.IndexBlobInfo)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, bool) error); ok {
r1 = rf(ctx, includeInactive)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// NewDirectWriter provides a mock function with given fields: ctx, opt
func (_m *DirectRepository) NewDirectWriter(ctx context.Context, opt repo.WriteSessionOptions) (context.Context, repo.DirectRepositoryWriter, error) {
ret := _m.Called(ctx, opt)
var r0 context.Context
if rf, ok := ret.Get(0).(func(context.Context, repo.WriteSessionOptions) context.Context); ok {
r0 = rf(ctx, opt)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(context.Context)
}
}
var r1 repo.DirectRepositoryWriter
if rf, ok := ret.Get(1).(func(context.Context, repo.WriteSessionOptions) repo.DirectRepositoryWriter); ok {
r1 = rf(ctx, opt)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(repo.DirectRepositoryWriter)
}
}
var r2 error
if rf, ok := ret.Get(2).(func(context.Context, repo.WriteSessionOptions) error); ok {
r2 = rf(ctx, opt)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// NewWriter provides a mock function with given fields: ctx, opt
func (_m *DirectRepository) NewWriter(ctx context.Context, opt repo.WriteSessionOptions) (context.Context, repo.RepositoryWriter, error) {
ret := _m.Called(ctx, opt)
var r0 context.Context
if rf, ok := ret.Get(0).(func(context.Context, repo.WriteSessionOptions) context.Context); ok {
r0 = rf(ctx, opt)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(context.Context)
}
}
var r1 repo.RepositoryWriter
if rf, ok := ret.Get(1).(func(context.Context, repo.WriteSessionOptions) repo.RepositoryWriter); ok {
r1 = rf(ctx, opt)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(repo.RepositoryWriter)
}
}
var r2 error
if rf, ok := ret.Get(2).(func(context.Context, repo.WriteSessionOptions) error); ok {
r2 = rf(ctx, opt)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// ObjectFormat provides a mock function with given fields:
func (_m *DirectRepository) ObjectFormat() object.Format {
ret := _m.Called()
var r0 object.Format
if rf, ok := ret.Get(0).(func() object.Format); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(object.Format)
}
return r0
}
// OpenObject provides a mock function with given fields: ctx, id
func (_m *DirectRepository) OpenObject(ctx context.Context, id object.ID) (object.Reader, error) {
ret := _m.Called(ctx, id)
var r0 object.Reader
if rf, ok := ret.Get(0).(func(context.Context, object.ID) object.Reader); ok {
r0 = rf(ctx, id)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(object.Reader)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, object.ID) error); ok {
r1 = rf(ctx, id)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// PrefetchContents provides a mock function with given fields: ctx, contentIDs, hint
func (_m *DirectRepository) PrefetchContents(ctx context.Context, contentIDs []index.ID, hint string) []index.ID {
ret := _m.Called(ctx, contentIDs, hint)
var r0 []index.ID
if rf, ok := ret.Get(0).(func(context.Context, []index.ID, string) []index.ID); ok {
r0 = rf(ctx, contentIDs, hint)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]index.ID)
}
}
return r0
}
// PrefetchObjects provides a mock function with given fields: ctx, objectIDs, hint
func (_m *DirectRepository) PrefetchObjects(ctx context.Context, objectIDs []object.ID, hint string) ([]index.ID, error) {
ret := _m.Called(ctx, objectIDs, hint)
var r0 []index.ID
if rf, ok := ret.Get(0).(func(context.Context, []object.ID, string) []index.ID); ok {
r0 = rf(ctx, objectIDs, hint)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]index.ID)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, []object.ID, string) error); ok {
r1 = rf(ctx, objectIDs, hint)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Refresh provides a mock function with given fields: ctx
func (_m *DirectRepository) Refresh(ctx context.Context) error {
ret := _m.Called(ctx)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}
return r0
}
// Throttler provides a mock function with given fields:
func (_m *DirectRepository) Throttler() throttling.SettableThrottler {
ret := _m.Called()
var r0 throttling.SettableThrottler
if rf, ok := ret.Get(0).(func() throttling.SettableThrottler); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(throttling.SettableThrottler)
}
}
return r0
}
// Time provides a mock function with given fields:
func (_m *DirectRepository) Time() time.Time {
ret := _m.Called()
var r0 time.Time
if rf, ok := ret.Get(0).(func() time.Time); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(time.Time)
}
return r0
}
// Token provides a mock function with given fields: password
func (_m *DirectRepository) Token(password string) (string, error) {
ret := _m.Called(password)
var r0 string
if rf, ok := ret.Get(0).(func(string) string); ok {
r0 = rf(password)
} else {
r0 = ret.Get(0).(string)
}
var r1 error
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(password)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// UniqueID provides a mock function with given fields:
func (_m *DirectRepository) UniqueID() []byte {
ret := _m.Called()
var r0 []byte
if rf, ok := ret.Get(0).(func() []byte); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]byte)
}
}
return r0
}
// UpdateDescription provides a mock function with given fields: d
func (_m *DirectRepository) UpdateDescription(d string) {
_m.Called(d)
}
// VerifyObject provides a mock function with given fields: ctx, id
func (_m *DirectRepository) VerifyObject(ctx context.Context, id object.ID) ([]index.ID, error) {
ret := _m.Called(ctx, id)
var r0 []index.ID
if rf, ok := ret.Get(0).(func(context.Context, object.ID) []index.ID); ok {
r0 = rf(ctx, id)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]index.ID)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, object.ID) error); ok {
r1 = rf(ctx, id)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
type mockConstructorTestingTNewDirectRepository interface {
mock.TestingT
Cleanup(func())
}
// NewDirectRepository creates a new instance of DirectRepository. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewDirectRepository(t mockConstructorTestingTNewDirectRepository) *DirectRepository {
mock := &DirectRepository{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -0,0 +1,718 @@
// Code generated by mockery v2.14.0. DO NOT EDIT.
package mocks
import (
blob "github.com/kopia/kopia/repo/blob"
content "github.com/kopia/kopia/repo/content"
context "context"
index "github.com/kopia/kopia/repo/content/index"
manifest "github.com/kopia/kopia/repo/manifest"
mock "github.com/stretchr/testify/mock"
object "github.com/kopia/kopia/repo/object"
repo "github.com/kopia/kopia/repo"
throttling "github.com/kopia/kopia/repo/blob/throttling"
time "time"
)
// DirectRepositoryWriter is an autogenerated mock type for the DirectRepositoryWriter type
type DirectRepositoryWriter struct {
mock.Mock
}
// AlsoLogToContentLog provides a mock function with given fields: ctx
func (_m *DirectRepositoryWriter) AlsoLogToContentLog(ctx context.Context) context.Context {
ret := _m.Called(ctx)
var r0 context.Context
if rf, ok := ret.Get(0).(func(context.Context) context.Context); ok {
r0 = rf(ctx)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(context.Context)
}
}
return r0
}
// BlobCfg provides a mock function with given fields:
func (_m *DirectRepositoryWriter) BlobCfg() content.BlobCfgBlob {
ret := _m.Called()
var r0 content.BlobCfgBlob
if rf, ok := ret.Get(0).(func() content.BlobCfgBlob); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(content.BlobCfgBlob)
}
return r0
}
// BlobReader provides a mock function with given fields:
func (_m *DirectRepositoryWriter) BlobReader() blob.Reader {
ret := _m.Called()
var r0 blob.Reader
if rf, ok := ret.Get(0).(func() blob.Reader); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(blob.Reader)
}
}
return r0
}
// BlobStorage provides a mock function with given fields:
func (_m *DirectRepositoryWriter) BlobStorage() blob.Storage {
ret := _m.Called()
var r0 blob.Storage
if rf, ok := ret.Get(0).(func() blob.Storage); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(blob.Storage)
}
}
return r0
}
// BlobVolume provides a mock function with given fields:
func (_m *DirectRepositoryWriter) BlobVolume() blob.Volume {
ret := _m.Called()
var r0 blob.Volume
if rf, ok := ret.Get(0).(func() blob.Volume); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(blob.Volume)
}
}
return r0
}
// ChangePassword provides a mock function with given fields: ctx, newPassword
func (_m *DirectRepositoryWriter) ChangePassword(ctx context.Context, newPassword string) error {
ret := _m.Called(ctx, newPassword)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string) error); ok {
r0 = rf(ctx, newPassword)
} else {
r0 = ret.Error(0)
}
return r0
}
// ClientOptions provides a mock function with given fields:
func (_m *DirectRepositoryWriter) ClientOptions() repo.ClientOptions {
ret := _m.Called()
var r0 repo.ClientOptions
if rf, ok := ret.Get(0).(func() repo.ClientOptions); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(repo.ClientOptions)
}
return r0
}
// Close provides a mock function with given fields: ctx
func (_m *DirectRepositoryWriter) Close(ctx context.Context) error {
ret := _m.Called(ctx)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}
return r0
}
// CommitUpgrade provides a mock function with given fields: ctx
func (_m *DirectRepositoryWriter) CommitUpgrade(ctx context.Context) error {
ret := _m.Called(ctx)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}
return r0
}
// ConfigFilename provides a mock function with given fields:
func (_m *DirectRepositoryWriter) ConfigFilename() string {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// ContentInfo provides a mock function with given fields: ctx, contentID
func (_m *DirectRepositoryWriter) ContentInfo(ctx context.Context, contentID index.ID) (index.Info, error) {
ret := _m.Called(ctx, contentID)
var r0 index.Info
if rf, ok := ret.Get(0).(func(context.Context, index.ID) index.Info); ok {
r0 = rf(ctx, contentID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(index.Info)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, index.ID) error); ok {
r1 = rf(ctx, contentID)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ContentManager provides a mock function with given fields:
func (_m *DirectRepositoryWriter) ContentManager() *content.WriteManager {
ret := _m.Called()
var r0 *content.WriteManager
if rf, ok := ret.Get(0).(func() *content.WriteManager); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*content.WriteManager)
}
}
return r0
}
// ContentReader provides a mock function with given fields:
func (_m *DirectRepositoryWriter) ContentReader() content.Reader {
ret := _m.Called()
var r0 content.Reader
if rf, ok := ret.Get(0).(func() content.Reader); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(content.Reader)
}
}
return r0
}
// Crypter provides a mock function with given fields:
func (_m *DirectRepositoryWriter) Crypter() *content.Crypter {
ret := _m.Called()
var r0 *content.Crypter
if rf, ok := ret.Get(0).(func() *content.Crypter); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*content.Crypter)
}
}
return r0
}
// DeleteManifest provides a mock function with given fields: ctx, id
func (_m *DirectRepositoryWriter) DeleteManifest(ctx context.Context, id manifest.ID) error {
ret := _m.Called(ctx, id)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, manifest.ID) error); ok {
r0 = rf(ctx, id)
} else {
r0 = ret.Error(0)
}
return r0
}
// DeriveKey provides a mock function with given fields: purpose, keyLength
func (_m *DirectRepositoryWriter) DeriveKey(purpose []byte, keyLength int) []byte {
ret := _m.Called(purpose, keyLength)
var r0 []byte
if rf, ok := ret.Get(0).(func([]byte, int) []byte); ok {
r0 = rf(purpose, keyLength)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]byte)
}
}
return r0
}
// DisableIndexRefresh provides a mock function with given fields:
func (_m *DirectRepositoryWriter) DisableIndexRefresh() {
_m.Called()
}
// FindManifests provides a mock function with given fields: ctx, labels
func (_m *DirectRepositoryWriter) FindManifests(ctx context.Context, labels map[string]string) ([]*manifest.EntryMetadata, error) {
ret := _m.Called(ctx, labels)
var r0 []*manifest.EntryMetadata
if rf, ok := ret.Get(0).(func(context.Context, map[string]string) []*manifest.EntryMetadata); ok {
r0 = rf(ctx, labels)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*manifest.EntryMetadata)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, map[string]string) error); ok {
r1 = rf(ctx, labels)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Flush provides a mock function with given fields: ctx
func (_m *DirectRepositoryWriter) Flush(ctx context.Context) error {
ret := _m.Called(ctx)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}
return r0
}
// GetManifest provides a mock function with given fields: ctx, id, data
func (_m *DirectRepositoryWriter) GetManifest(ctx context.Context, id manifest.ID, data interface{}) (*manifest.EntryMetadata, error) {
ret := _m.Called(ctx, id, data)
var r0 *manifest.EntryMetadata
if rf, ok := ret.Get(0).(func(context.Context, manifest.ID, interface{}) *manifest.EntryMetadata); ok {
r0 = rf(ctx, id, data)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*manifest.EntryMetadata)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, manifest.ID, interface{}) error); ok {
r1 = rf(ctx, id, data)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// IndexBlobs provides a mock function with given fields: ctx, includeInactive
func (_m *DirectRepositoryWriter) IndexBlobs(ctx context.Context, includeInactive bool) ([]content.IndexBlobInfo, error) {
ret := _m.Called(ctx, includeInactive)
var r0 []content.IndexBlobInfo
if rf, ok := ret.Get(0).(func(context.Context, bool) []content.IndexBlobInfo); ok {
r0 = rf(ctx, includeInactive)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]content.IndexBlobInfo)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, bool) error); ok {
r1 = rf(ctx, includeInactive)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// NewDirectWriter provides a mock function with given fields: ctx, opt
func (_m *DirectRepositoryWriter) NewDirectWriter(ctx context.Context, opt repo.WriteSessionOptions) (context.Context, repo.DirectRepositoryWriter, error) {
ret := _m.Called(ctx, opt)
var r0 context.Context
if rf, ok := ret.Get(0).(func(context.Context, repo.WriteSessionOptions) context.Context); ok {
r0 = rf(ctx, opt)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(context.Context)
}
}
var r1 repo.DirectRepositoryWriter
if rf, ok := ret.Get(1).(func(context.Context, repo.WriteSessionOptions) repo.DirectRepositoryWriter); ok {
r1 = rf(ctx, opt)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(repo.DirectRepositoryWriter)
}
}
var r2 error
if rf, ok := ret.Get(2).(func(context.Context, repo.WriteSessionOptions) error); ok {
r2 = rf(ctx, opt)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// NewObjectWriter provides a mock function with given fields: ctx, opt
func (_m *DirectRepositoryWriter) NewObjectWriter(ctx context.Context, opt object.WriterOptions) object.Writer {
ret := _m.Called(ctx, opt)
var r0 object.Writer
if rf, ok := ret.Get(0).(func(context.Context, object.WriterOptions) object.Writer); ok {
r0 = rf(ctx, opt)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(object.Writer)
}
}
return r0
}
// NewWriter provides a mock function with given fields: ctx, opt
func (_m *DirectRepositoryWriter) NewWriter(ctx context.Context, opt repo.WriteSessionOptions) (context.Context, repo.RepositoryWriter, error) {
ret := _m.Called(ctx, opt)
var r0 context.Context
if rf, ok := ret.Get(0).(func(context.Context, repo.WriteSessionOptions) context.Context); ok {
r0 = rf(ctx, opt)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(context.Context)
}
}
var r1 repo.RepositoryWriter
if rf, ok := ret.Get(1).(func(context.Context, repo.WriteSessionOptions) repo.RepositoryWriter); ok {
r1 = rf(ctx, opt)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(repo.RepositoryWriter)
}
}
var r2 error
if rf, ok := ret.Get(2).(func(context.Context, repo.WriteSessionOptions) error); ok {
r2 = rf(ctx, opt)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// ObjectFormat provides a mock function with given fields:
func (_m *DirectRepositoryWriter) ObjectFormat() object.Format {
ret := _m.Called()
var r0 object.Format
if rf, ok := ret.Get(0).(func() object.Format); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(object.Format)
}
return r0
}
// OpenObject provides a mock function with given fields: ctx, id
func (_m *DirectRepositoryWriter) OpenObject(ctx context.Context, id object.ID) (object.Reader, error) {
ret := _m.Called(ctx, id)
var r0 object.Reader
if rf, ok := ret.Get(0).(func(context.Context, object.ID) object.Reader); ok {
r0 = rf(ctx, id)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(object.Reader)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, object.ID) error); ok {
r1 = rf(ctx, id)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// PrefetchContents provides a mock function with given fields: ctx, contentIDs, hint
func (_m *DirectRepositoryWriter) PrefetchContents(ctx context.Context, contentIDs []index.ID, hint string) []index.ID {
ret := _m.Called(ctx, contentIDs, hint)
var r0 []index.ID
if rf, ok := ret.Get(0).(func(context.Context, []index.ID, string) []index.ID); ok {
r0 = rf(ctx, contentIDs, hint)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]index.ID)
}
}
return r0
}
// PrefetchObjects provides a mock function with given fields: ctx, objectIDs, hint
func (_m *DirectRepositoryWriter) PrefetchObjects(ctx context.Context, objectIDs []object.ID, hint string) ([]index.ID, error) {
ret := _m.Called(ctx, objectIDs, hint)
var r0 []index.ID
if rf, ok := ret.Get(0).(func(context.Context, []object.ID, string) []index.ID); ok {
r0 = rf(ctx, objectIDs, hint)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]index.ID)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, []object.ID, string) error); ok {
r1 = rf(ctx, objectIDs, hint)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// PutManifest provides a mock function with given fields: ctx, labels, payload
func (_m *DirectRepositoryWriter) PutManifest(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error) {
ret := _m.Called(ctx, labels, payload)
var r0 manifest.ID
if rf, ok := ret.Get(0).(func(context.Context, map[string]string, interface{}) manifest.ID); ok {
r0 = rf(ctx, labels, payload)
} else {
r0 = ret.Get(0).(manifest.ID)
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, map[string]string, interface{}) error); ok {
r1 = rf(ctx, labels, payload)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Refresh provides a mock function with given fields: ctx
func (_m *DirectRepositoryWriter) Refresh(ctx context.Context) error {
ret := _m.Called(ctx)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}
return r0
}
// RollbackUpgrade provides a mock function with given fields: ctx
func (_m *DirectRepositoryWriter) RollbackUpgrade(ctx context.Context) error {
ret := _m.Called(ctx)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}
return r0
}
// SetParameters provides a mock function with given fields: ctx, m, blobcfg
func (_m *DirectRepositoryWriter) SetParameters(ctx context.Context, m content.MutableParameters, blobcfg content.BlobCfgBlob) error {
ret := _m.Called(ctx, m, blobcfg)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, content.MutableParameters, content.BlobCfgBlob) error); ok {
r0 = rf(ctx, m, blobcfg)
} else {
r0 = ret.Error(0)
}
return r0
}
// SetUpgradeLockIntent provides a mock function with given fields: ctx, l
func (_m *DirectRepositoryWriter) SetUpgradeLockIntent(ctx context.Context, l content.UpgradeLock) (*content.UpgradeLock, error) {
ret := _m.Called(ctx, l)
var r0 *content.UpgradeLock
if rf, ok := ret.Get(0).(func(context.Context, content.UpgradeLock) *content.UpgradeLock); ok {
r0 = rf(ctx, l)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*content.UpgradeLock)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, content.UpgradeLock) error); ok {
r1 = rf(ctx, l)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Throttler provides a mock function with given fields:
func (_m *DirectRepositoryWriter) Throttler() throttling.SettableThrottler {
ret := _m.Called()
var r0 throttling.SettableThrottler
if rf, ok := ret.Get(0).(func() throttling.SettableThrottler); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(throttling.SettableThrottler)
}
}
return r0
}
// Time provides a mock function with given fields:
func (_m *DirectRepositoryWriter) Time() time.Time {
ret := _m.Called()
var r0 time.Time
if rf, ok := ret.Get(0).(func() time.Time); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(time.Time)
}
return r0
}
// Token provides a mock function with given fields: password
func (_m *DirectRepositoryWriter) Token(password string) (string, error) {
ret := _m.Called(password)
var r0 string
if rf, ok := ret.Get(0).(func(string) string); ok {
r0 = rf(password)
} else {
r0 = ret.Get(0).(string)
}
var r1 error
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(password)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// UniqueID provides a mock function with given fields:
func (_m *DirectRepositoryWriter) UniqueID() []byte {
ret := _m.Called()
var r0 []byte
if rf, ok := ret.Get(0).(func() []byte); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]byte)
}
}
return r0
}
// UpdateDescription provides a mock function with given fields: d
func (_m *DirectRepositoryWriter) UpdateDescription(d string) {
_m.Called(d)
}
// VerifyObject provides a mock function with given fields: ctx, id
func (_m *DirectRepositoryWriter) VerifyObject(ctx context.Context, id object.ID) ([]index.ID, error) {
ret := _m.Called(ctx, id)
var r0 []index.ID
if rf, ok := ret.Get(0).(func(context.Context, object.ID) []index.ID); ok {
r0 = rf(ctx, id)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]index.ID)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, object.ID) error); ok {
r1 = rf(ctx, id)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
type mockConstructorTestingTNewDirectRepositoryWriter interface {
mock.TestingT
Cleanup(func())
}
// NewDirectRepositoryWriter creates a new instance of DirectRepositoryWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewDirectRepositoryWriter(t mockConstructorTestingTNewDirectRepositoryWriter) *DirectRepositoryWriter {
mock := &DirectRepositoryWriter{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -0,0 +1,587 @@
/*
Copyright the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kopialib
import (
"context"
"os"
"strings"
"sync/atomic"
"time"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content/index"
"github.com/kopia/kopia/repo/maintenance"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/snapshot/snapshotmaintenance"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo"
"github.com/vmware-tanzu/velero/pkg/util/logging"
)
type kopiaRepoService struct {
logger logrus.FieldLogger
}
type kopiaRepository struct {
rawRepo repo.Repository
rawWriter repo.RepositoryWriter
description string
uploaded int64
openTime time.Time
throttle logThrottle
logger logrus.FieldLogger
}
type kopiaMaintenance struct {
mode maintenance.Mode
startTime time.Time
uploaded int64
throttle logThrottle
logger logrus.FieldLogger
}
type logThrottle struct {
lastTime int64
interval time.Duration
}
type kopiaObjectReader struct {
rawReader object.Reader
}
type kopiaObjectWriter struct {
rawWriter object.Writer
}
const (
defaultLogInterval = time.Duration(time.Second * 10)
defaultMaintainCheckPeriod = time.Hour
overwriteFullMaintainInterval = time.Duration(0)
overwriteQuickMaintainInterval = time.Duration(0)
)
var kopiaRepoOpen = repo.Open
// NewKopiaRepoService creates an instance of BackupRepoService implemented by Kopia
func NewKopiaRepoService(logger logrus.FieldLogger) udmrepo.BackupRepoService {
ks := &kopiaRepoService{
logger: logger,
}
return ks
}
func (ks *kopiaRepoService) Init(ctx context.Context, repoOption udmrepo.RepoOptions, createNew bool) error {
repoCtx := logging.SetupKopiaLog(ctx, ks.logger)
if createNew {
if err := CreateBackupRepo(repoCtx, repoOption); err != nil {
return err
}
return writeInitParameters(repoCtx, repoOption, ks.logger)
} else {
return ConnectBackupRepo(repoCtx, repoOption)
}
}
func (ks *kopiaRepoService) Open(ctx context.Context, repoOption udmrepo.RepoOptions) (udmrepo.BackupRepo, error) {
repoConfig := repoOption.ConfigFilePath
if repoConfig == "" {
return nil, errors.New("invalid config file path")
}
if _, err := os.Stat(repoConfig); os.IsNotExist(err) {
return nil, errors.Wrapf(err, "repo config %s doesn't exist", repoConfig)
}
repoCtx := logging.SetupKopiaLog(ctx, ks.logger)
r, err := openKopiaRepo(repoCtx, repoConfig, repoOption.RepoPassword)
if err != nil {
return nil, err
}
kr := kopiaRepository{
rawRepo: r,
openTime: time.Now(),
description: repoOption.Description,
throttle: logThrottle{
interval: defaultLogInterval,
},
logger: ks.logger,
}
_, kr.rawWriter, err = r.NewWriter(repoCtx, repo.WriteSessionOptions{
Purpose: repoOption.Description,
OnUpload: kr.updateProgress,
})
if err != nil {
if e := r.Close(repoCtx); e != nil {
ks.logger.WithError(e).Error("Failed to close raw repository on error")
}
return nil, errors.Wrap(err, "error to create repo writer")
}
return &kr, nil
}
func (ks *kopiaRepoService) Maintain(ctx context.Context, repoOption udmrepo.RepoOptions) error {
repoConfig := repoOption.ConfigFilePath
if repoConfig == "" {
return errors.New("invalid config file path")
}
if _, err := os.Stat(repoConfig); os.IsNotExist(err) {
return errors.Wrapf(err, "repo config %s doesn't exist", repoConfig)
}
repoCtx := logging.SetupKopiaLog(ctx, ks.logger)
r, err := openKopiaRepo(repoCtx, repoConfig, repoOption.RepoPassword)
if err != nil {
return err
}
defer func() {
c := r.Close(repoCtx)
if c != nil {
ks.logger.WithError(c).Error("Failed to close repo")
}
}()
km := kopiaMaintenance{
mode: maintenance.ModeAuto,
startTime: time.Now(),
throttle: logThrottle{
interval: defaultLogInterval,
},
logger: ks.logger,
}
if mode, exist := repoOption.GeneralOptions[udmrepo.GenOptionMaintainMode]; exist {
if strings.EqualFold(mode, udmrepo.GenOptionMaintainFull) {
km.mode = maintenance.ModeFull
} else if strings.EqualFold(mode, udmrepo.GenOptionMaintainQuick) {
km.mode = maintenance.ModeQuick
}
}
err = repo.DirectWriteSession(repoCtx, r.(repo.DirectRepository), repo.WriteSessionOptions{
Purpose: "UdmRepoMaintenance",
OnUpload: km.maintainProgress,
}, func(ctx context.Context, dw repo.DirectRepositoryWriter) error {
return km.runMaintenance(ctx, dw)
})
if err != nil {
return errors.Wrap(err, "error to maintain repo")
}
return nil
}
func (ks *kopiaRepoService) DefaultMaintenanceFrequency() time.Duration {
return defaultMaintainCheckPeriod
}
func (km *kopiaMaintenance) runMaintenance(ctx context.Context, rep repo.DirectRepositoryWriter) error {
err := snapshotmaintenance.Run(logging.SetupKopiaLog(ctx, km.logger), rep, km.mode, false, maintenance.SafetyFull)
if err != nil {
return errors.Wrapf(err, "error to run maintenance under mode %s", km.mode)
}
return nil
}
// maintainProgress is called when the repository writes a piece of blob data to the storage during the maintenance
func (km *kopiaMaintenance) maintainProgress(uploaded int64) {
total := atomic.AddInt64(&km.uploaded, uploaded)
if km.throttle.shouldLog() {
km.logger.WithFields(
logrus.Fields{
"Start Time": km.startTime.Format(time.RFC3339Nano),
"Current": time.Now().Format(time.RFC3339Nano),
},
).Debugf("Repo maintenance uploaded %d bytes.", total)
}
}
func (kr *kopiaRepository) OpenObject(ctx context.Context, id udmrepo.ID) (udmrepo.ObjectReader, error) {
if kr.rawRepo == nil {
return nil, errors.New("repo is closed or not open")
}
reader, err := kr.rawRepo.OpenObject(logging.SetupKopiaLog(ctx, kr.logger), object.ID(id))
if err != nil {
return nil, errors.Wrap(err, "error to open object")
}
return &kopiaObjectReader{
rawReader: reader,
}, nil
}
func (kr *kopiaRepository) GetManifest(ctx context.Context, id udmrepo.ID, mani *udmrepo.RepoManifest) error {
if kr.rawRepo == nil {
return errors.New("repo is closed or not open")
}
metadata, err := kr.rawRepo.GetManifest(logging.SetupKopiaLog(ctx, kr.logger), manifest.ID(id), mani.Payload)
if err != nil {
return errors.Wrap(err, "error to get manifest")
}
mani.Metadata = getManifestEntryFromKopia(metadata)
return nil
}
func (kr *kopiaRepository) FindManifests(ctx context.Context, filter udmrepo.ManifestFilter) ([]*udmrepo.ManifestEntryMetadata, error) {
if kr.rawRepo == nil {
return nil, errors.New("repo is closed or not open")
}
metadata, err := kr.rawRepo.FindManifests(logging.SetupKopiaLog(ctx, kr.logger), filter.Labels)
if err != nil {
return nil, errors.Wrap(err, "error to find manifests")
}
return getManifestEntriesFromKopia(metadata), nil
}
func (kr *kopiaRepository) Time() time.Time {
if kr.rawRepo == nil {
return time.Time{}
}
return kr.rawRepo.Time()
}
func (kr *kopiaRepository) Close(ctx context.Context) error {
if kr.rawWriter != nil {
err := kr.rawWriter.Close(logging.SetupKopiaLog(ctx, kr.logger))
if err != nil {
return errors.Wrap(err, "error to close repo writer")
}
kr.rawWriter = nil
}
if kr.rawRepo != nil {
err := kr.rawRepo.Close(logging.SetupKopiaLog(ctx, kr.logger))
if err != nil {
return errors.Wrap(err, "error to close repo")
}
kr.rawRepo = nil
}
return nil
}
func (kr *kopiaRepository) NewObjectWriter(ctx context.Context, opt udmrepo.ObjectWriteOptions) udmrepo.ObjectWriter {
if kr.rawWriter == nil {
return nil
}
writer := kr.rawWriter.NewObjectWriter(logging.SetupKopiaLog(ctx, kr.logger), object.WriterOptions{
Description: opt.Description,
Prefix: index.ID(opt.Prefix),
AsyncWrites: getAsyncWrites(),
Compressor: getCompressorForObject(opt),
})
if writer == nil {
return nil
}
return &kopiaObjectWriter{
rawWriter: writer,
}
}
func (kr *kopiaRepository) PutManifest(ctx context.Context, manifest udmrepo.RepoManifest) (udmrepo.ID, error) {
if kr.rawWriter == nil {
return "", errors.New("repo writer is closed or not open")
}
id, err := kr.rawWriter.PutManifest(logging.SetupKopiaLog(ctx, kr.logger), manifest.Metadata.Labels, manifest.Payload)
if err != nil {
return "", errors.Wrap(err, "error to put manifest")
}
return udmrepo.ID(id), nil
}
func (kr *kopiaRepository) DeleteManifest(ctx context.Context, id udmrepo.ID) error {
if kr.rawWriter == nil {
return errors.New("repo writer is closed or not open")
}
err := kr.rawWriter.DeleteManifest(logging.SetupKopiaLog(ctx, kr.logger), manifest.ID(id))
if err != nil {
return errors.Wrap(err, "error to delete manifest")
}
return nil
}
func (kr *kopiaRepository) Flush(ctx context.Context) error {
if kr.rawWriter == nil {
return errors.New("repo writer is closed or not open")
}
err := kr.rawWriter.Flush(logging.SetupKopiaLog(ctx, kr.logger))
if err != nil {
return errors.Wrap(err, "error to flush repo")
}
return nil
}
// updateProgress is called when the repository writes a piece of blob data to the storage during data write
func (kr *kopiaRepository) updateProgress(uploaded int64) {
total := atomic.AddInt64(&kr.uploaded, uploaded)
if kr.throttle.shouldLog() {
kr.logger.WithFields(
logrus.Fields{
"Description": kr.description,
"Open Time": kr.openTime.Format(time.RFC3339Nano),
"Current": time.Now().Format(time.RFC3339Nano),
},
).Debugf("Repo uploaded %d bytes.", total)
}
}
func (kor *kopiaObjectReader) Read(p []byte) (int, error) {
if kor.rawReader == nil {
return 0, errors.New("object reader is closed or not open")
}
n, err := kor.rawReader.Read(p)
if err != nil {
return 0, errors.Wrap(err, "error to read object")
}
return n, nil
}
func (kor *kopiaObjectReader) Seek(offset int64, whence int) (int64, error) {
if kor.rawReader == nil {
return -1, errors.New("object reader is closed or not open")
}
p, err := kor.rawReader.Seek(offset, whence)
if err != nil {
return -1, errors.Wrap(err, "error to seek object")
}
return p, nil
}
func (kor *kopiaObjectReader) Close() error {
if kor.rawReader == nil {
return nil
}
err := kor.rawReader.Close()
if err != nil {
return errors.Wrap(err, "error to close object reader")
}
kor.rawReader = nil
return nil
}
func (kor *kopiaObjectReader) Length() int64 {
if kor.rawReader == nil {
return -1
}
return kor.rawReader.Length()
}
func (kow *kopiaObjectWriter) Write(p []byte) (int, error) {
if kow.rawWriter == nil {
return 0, errors.New("object writer is closed or not open")
}
n, err := kow.rawWriter.Write(p)
if err != nil {
return 0, errors.Wrap(err, "error to write object")
}
return n, nil
}
func (kow *kopiaObjectWriter) Seek(offset int64, whence int) (int64, error) {
return -1, errors.New("not supported")
}
func (kow *kopiaObjectWriter) Checkpoint() (udmrepo.ID, error) {
if kow.rawWriter == nil {
return udmrepo.ID(""), errors.New("object writer is closed or not open")
}
id, err := kow.rawWriter.Checkpoint()
if err != nil {
return udmrepo.ID(""), errors.Wrap(err, "error to checkpoint object")
}
return udmrepo.ID(id), nil
}
func (kow *kopiaObjectWriter) Result() (udmrepo.ID, error) {
if kow.rawWriter == nil {
return udmrepo.ID(""), errors.New("object writer is closed or not open")
}
id, err := kow.rawWriter.Result()
if err != nil {
return udmrepo.ID(""), errors.Wrap(err, "error to wait object")
}
return udmrepo.ID(id), nil
}
func (kow *kopiaObjectWriter) Close() error {
if kow.rawWriter == nil {
return nil
}
err := kow.rawWriter.Close()
if err != nil {
return errors.Wrap(err, "error to close object writer")
}
kow.rawWriter = nil
return nil
}
// getAsyncWrites returns the number of async writes, at present, we don't support async writes
func getAsyncWrites() int {
return 0
}
// getCompressorForObject returns the compressor for an object, at present, we don't support compression
func getCompressorForObject(opt udmrepo.ObjectWriteOptions) compression.Name {
return ""
}
func getManifestEntryFromKopia(kMani *manifest.EntryMetadata) *udmrepo.ManifestEntryMetadata {
return &udmrepo.ManifestEntryMetadata{
ID: udmrepo.ID(kMani.ID),
Labels: kMani.Labels,
Length: int32(kMani.Length),
ModTime: kMani.ModTime,
}
}
func getManifestEntriesFromKopia(kMani []*manifest.EntryMetadata) []*udmrepo.ManifestEntryMetadata {
var ret []*udmrepo.ManifestEntryMetadata
for _, entry := range kMani {
ret = append(ret, &udmrepo.ManifestEntryMetadata{
ID: udmrepo.ID(entry.ID),
Labels: entry.Labels,
Length: int32(entry.Length),
ModTime: entry.ModTime,
})
}
return ret
}
func (lt *logThrottle) shouldLog() bool {
nextOutputTime := atomic.LoadInt64((*int64)(&lt.lastTime))
if nowNano := time.Now().UnixNano(); nowNano > nextOutputTime {
if atomic.CompareAndSwapInt64((*int64)(&lt.lastTime), nextOutputTime, nowNano+lt.interval.Nanoseconds()) {
return true
}
}
return false
}
func openKopiaRepo(ctx context.Context, configFile string, password string) (repo.Repository, error) {
r, err := kopiaRepoOpen(ctx, configFile, password, &repo.Options{})
if os.IsNotExist(err) {
return nil, errors.Wrap(err, "error to open repo, repo doesn't exist")
}
if err != nil {
return nil, errors.Wrap(err, "error to open repo")
}
return r, nil
}
func writeInitParameters(ctx context.Context, repoOption udmrepo.RepoOptions, logger logrus.FieldLogger) error {
r, err := openKopiaRepo(ctx, repoOption.ConfigFilePath, repoOption.RepoPassword)
if err != nil {
return err
}
defer func() {
c := r.Close(ctx)
if c != nil {
logger.WithError(c).Error("Failed to close repo")
}
}()
err = repo.WriteSession(ctx, r, repo.WriteSessionOptions{
Purpose: "set init parameters",
}, func(ctx context.Context, w repo.RepositoryWriter) error {
p := maintenance.DefaultParams()
if overwriteFullMaintainInterval != time.Duration(0) {
logger.Infof("Full maintenance interval change from %v to %v", p.FullCycle.Interval, overwriteFullMaintainInterval)
p.FullCycle.Interval = overwriteFullMaintainInterval
}
if overwriteQuickMaintainInterval != time.Duration(0) {
logger.Infof("Quick maintenance interval change from %v to %v", p.QuickCycle.Interval, overwriteQuickMaintainInterval)
p.QuickCycle.Interval = overwriteQuickMaintainInterval
}
p.Owner = r.ClientOptions().UsernameAtHost()
if err := maintenance.SetParams(ctx, w, &p); err != nil {
return errors.Wrap(err, "error to set maintenance params")
}
return nil
})
if err != nil {
return errors.Wrap(err, "error to init write repo parameters")
}
return nil
}

View File

@ -0,0 +1,406 @@
/*
Copyright the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kopialib
import (
"context"
"os"
"testing"
"time"
"github.com/kopia/kopia/repo"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo"
repomocks "github.com/vmware-tanzu/velero/pkg/repository/udmrepo/kopialib/backend/mocks"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
)
func TestOpen(t *testing.T) {
var directRpo *repomocks.DirectRepository
testCases := []struct {
name string
repoOptions udmrepo.RepoOptions
returnRepo *repomocks.DirectRepository
repoOpen func(context.Context, string, string, *repo.Options) (repo.Repository, error)
newWriterError error
expectedErr string
expected *kopiaRepository
}{
{
name: "invalid config file",
expectedErr: "invalid config file path",
},
{
name: "config file doesn't exist",
repoOptions: udmrepo.RepoOptions{
ConfigFilePath: "fake-file",
},
expectedErr: "repo config fake-file doesn't exist: stat fake-file: no such file or directory",
},
{
name: "repo open fail, repo not exist",
repoOptions: udmrepo.RepoOptions{
ConfigFilePath: "/tmp",
},
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
return nil, os.ErrNotExist
},
expectedErr: "error to open repo, repo doesn't exist: file does not exist",
},
{
name: "repo open fail, other error",
repoOptions: udmrepo.RepoOptions{
ConfigFilePath: "/tmp",
},
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
return nil, errors.New("fake-repo-open-error")
},
expectedErr: "error to open repo: fake-repo-open-error",
},
{
name: "create repository writer fail",
repoOptions: udmrepo.RepoOptions{
ConfigFilePath: "/tmp",
},
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
return directRpo, nil
},
returnRepo: new(repomocks.DirectRepository),
newWriterError: errors.New("fake-new-writer-error"),
expectedErr: "error to create repo writer: fake-new-writer-error",
},
{
name: "create repository success",
repoOptions: udmrepo.RepoOptions{
ConfigFilePath: "/tmp",
Description: "fake-description",
},
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
return directRpo, nil
},
returnRepo: new(repomocks.DirectRepository),
expected: &kopiaRepository{
description: "fake-description",
throttle: logThrottle{
interval: defaultLogInterval,
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
logger := velerotest.NewLogger()
service := kopiaRepoService{
logger: logger,
}
if tc.repoOpen != nil {
kopiaRepoOpen = tc.repoOpen
}
if tc.returnRepo != nil {
directRpo = tc.returnRepo
}
if tc.returnRepo != nil {
tc.returnRepo.On("NewWriter", mock.Anything, mock.Anything).Return(nil, nil, tc.newWriterError)
tc.returnRepo.On("Close", mock.Anything).Return(nil)
}
repo, err := service.Open(context.Background(), tc.repoOptions)
if repo != nil {
require.Equal(t, tc.expected.description, repo.(*kopiaRepository).description)
require.Equal(t, tc.expected.throttle.interval, repo.(*kopiaRepository).throttle.interval)
require.Equal(t, repo.(*kopiaRepository).logger, logger)
}
if tc.expectedErr == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, tc.expectedErr)
}
})
}
}
func TestMaintain(t *testing.T) {
var directRpo *repomocks.DirectRepository
testCases := []struct {
name string
repoOptions udmrepo.RepoOptions
returnRepo *repomocks.DirectRepository
returnRepoWriter *repomocks.DirectRepositoryWriter
repoOpen func(context.Context, string, string, *repo.Options) (repo.Repository, error)
newRepoWriterError error
findManifestError error
expectedErr string
}{
{
name: "invalid config file",
expectedErr: "invalid config file path",
},
{
name: "config file doesn't exist",
repoOptions: udmrepo.RepoOptions{
ConfigFilePath: "fake-file",
},
expectedErr: "repo config fake-file doesn't exist: stat fake-file: no such file or directory",
},
{
name: "repo open fail, repo not exist",
repoOptions: udmrepo.RepoOptions{
ConfigFilePath: "/tmp",
GeneralOptions: map[string]string{},
},
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
return nil, os.ErrNotExist
},
expectedErr: "error to open repo, repo doesn't exist: file does not exist",
},
{
name: "repo open fail, other error",
repoOptions: udmrepo.RepoOptions{
ConfigFilePath: "/tmp",
GeneralOptions: map[string]string{},
},
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
return nil, errors.New("fake-repo-open-error")
},
expectedErr: "error to open repo: fake-repo-open-error",
},
{
name: "write session fail",
repoOptions: udmrepo.RepoOptions{
ConfigFilePath: "/tmp",
GeneralOptions: map[string]string{},
},
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
return directRpo, nil
},
returnRepo: new(repomocks.DirectRepository),
newRepoWriterError: errors.New("fake-new-direct-writer-error"),
expectedErr: "error to maintain repo: unable to create direct writer: fake-new-direct-writer-error",
},
{
name: "maintain fail",
repoOptions: udmrepo.RepoOptions{
ConfigFilePath: "/tmp",
GeneralOptions: map[string]string{},
},
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
return directRpo, nil
},
returnRepo: new(repomocks.DirectRepository),
returnRepoWriter: new(repomocks.DirectRepositoryWriter),
findManifestError: errors.New("fake-find-manifest-error"),
expectedErr: "error to maintain repo: error to run maintenance under mode auto: unable to get maintenance params: error looking for maintenance manifest: fake-find-manifest-error",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
logger := velerotest.NewLogger()
ctx := context.Background()
service := kopiaRepoService{
logger: logger,
}
if tc.repoOpen != nil {
kopiaRepoOpen = tc.repoOpen
}
if tc.returnRepo != nil {
directRpo = tc.returnRepo
}
if tc.returnRepo != nil {
tc.returnRepo.On("NewDirectWriter", mock.Anything, mock.Anything).Return(ctx, tc.returnRepoWriter, tc.newRepoWriterError)
tc.returnRepo.On("Close", mock.Anything).Return(nil)
}
if tc.returnRepoWriter != nil {
tc.returnRepoWriter.On("DisableIndexRefresh").Return()
tc.returnRepoWriter.On("AlsoLogToContentLog", mock.Anything).Return(nil)
tc.returnRepoWriter.On("Close", mock.Anything).Return(nil)
tc.returnRepoWriter.On("FindManifests", mock.Anything, mock.Anything).Return(nil, tc.findManifestError)
}
err := service.Maintain(ctx, tc.repoOptions)
if tc.expectedErr == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, tc.expectedErr)
}
})
}
}
func TestWriteInitParameters(t *testing.T) {
var directRpo *repomocks.DirectRepository
testCases := []struct {
name string
repoOptions udmrepo.RepoOptions
returnRepo *repomocks.DirectRepository
returnRepoWriter *repomocks.DirectRepositoryWriter
repoOpen func(context.Context, string, string, *repo.Options) (repo.Repository, error)
newRepoWriterError error
findManifestError error
expectedErr string
}{
{
name: "repo open fail, repo not exist",
repoOptions: udmrepo.RepoOptions{
ConfigFilePath: "/tmp",
GeneralOptions: map[string]string{},
},
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
return nil, os.ErrNotExist
},
expectedErr: "error to open repo, repo doesn't exist: file does not exist",
},
{
name: "repo open fail, other error",
repoOptions: udmrepo.RepoOptions{
ConfigFilePath: "/tmp",
GeneralOptions: map[string]string{},
},
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
return nil, errors.New("fake-repo-open-error")
},
expectedErr: "error to open repo: fake-repo-open-error",
},
{
name: "write session fail",
repoOptions: udmrepo.RepoOptions{
ConfigFilePath: "/tmp",
GeneralOptions: map[string]string{},
},
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
return directRpo, nil
},
returnRepo: new(repomocks.DirectRepository),
newRepoWriterError: errors.New("fake-new-writer-error"),
expectedErr: "error to init write repo parameters: unable to create writer: fake-new-writer-error",
},
{
name: "set repo param fail",
repoOptions: udmrepo.RepoOptions{
ConfigFilePath: "/tmp",
GeneralOptions: map[string]string{},
},
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
return directRpo, nil
},
returnRepo: new(repomocks.DirectRepository),
returnRepoWriter: new(repomocks.DirectRepositoryWriter),
findManifestError: errors.New("fake-find-manifest-error"),
expectedErr: "error to init write repo parameters: error to set maintenance params: error looking for maintenance manifest: fake-find-manifest-error",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
logger := velerotest.NewLogger()
ctx := context.Background()
if tc.repoOpen != nil {
kopiaRepoOpen = tc.repoOpen
}
if tc.returnRepo != nil {
directRpo = tc.returnRepo
}
if tc.returnRepo != nil {
tc.returnRepo.On("NewWriter", mock.Anything, mock.Anything).Return(ctx, tc.returnRepoWriter, tc.newRepoWriterError)
tc.returnRepo.On("ClientOptions").Return(repo.ClientOptions{})
tc.returnRepo.On("Close", mock.Anything).Return(nil)
}
if tc.returnRepoWriter != nil {
tc.returnRepoWriter.On("Close", mock.Anything).Return(nil)
tc.returnRepoWriter.On("FindManifests", mock.Anything, mock.Anything).Return(nil, tc.findManifestError)
}
err := writeInitParameters(ctx, tc.repoOptions, logger)
if tc.expectedErr == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, tc.expectedErr)
}
})
}
}
func TestShouldLog(t *testing.T) {
testCases := []struct {
name string
lastTime int64
interval time.Duration
retValue bool
}{
{
name: "first time",
retValue: true,
},
{
name: "not run",
lastTime: time.Now().Add(time.Hour).UnixNano(),
interval: time.Second * 10,
},
{
name: "not first time, run",
lastTime: time.Now().Add(-time.Hour).UnixNano(),
interval: time.Second * 10,
retValue: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
lt := logThrottle{
lastTime: tc.lastTime,
interval: tc.interval,
}
before := lt.lastTime
nw := time.Now()
s := lt.shouldLog()
require.Equal(t, s, tc.retValue)
if s {
require.GreaterOrEqual(t, lt.lastTime-nw.UnixNano(), lt.interval)
} else {
require.Equal(t, lt.lastTime, before)
}
})
}
}

View File

@ -4,8 +4,10 @@ package mocks
import (
context "context"
time "time"
mock "github.com/stretchr/testify/mock"
udmrepo "github.com/vmware-tanzu/velero/pkg/repository/udmrepo"
)
@ -14,6 +16,20 @@ type BackupRepoService struct {
mock.Mock
}
// DefaultMaintenanceFrequency provides a mock function with given fields:
func (_m *BackupRepoService) DefaultMaintenanceFrequency() time.Duration {
ret := _m.Called()
var r0 time.Duration
if rf, ok := ret.Get(0).(func() time.Duration); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(time.Duration)
}
return r0
}
// Init provides a mock function with given fields: ctx, repoOption, createNew
func (_m *BackupRepoService) Init(ctx context.Context, repoOption udmrepo.RepoOptions, createNew bool) error {
ret := _m.Called(ctx, repoOption, createNew)

View File

@ -84,6 +84,10 @@ type BackupRepoService interface {
// Maintain is periodically called to maintain the backup repository to eliminate redundant data.
// repoOption: options to maintain the backup repository.
Maintain(ctx context.Context, repoOption RepoOptions) error
// DefaultMaintenanceFrequency returns the defgault frequency of maintenance, callers refer this
// frequency to maintain the backup repository to get the best maintenance performance
DefaultMaintenanceFrequency() time.Duration
}
// BackupRepo provides the access to the backup repository

View File

@ -20,10 +20,10 @@ import (
"github.com/sirupsen/logrus"
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo"
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo/kopialib"
)
// Create creates an instance of BackupRepoService
func Create(logger logrus.FieldLogger) udmrepo.BackupRepoService {
///TODO: create from kopiaLib
return nil
return kopialib.NewKopiaRepoService(logger)
}

View File

@ -0,0 +1,90 @@
/*
Copyright the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logging
import (
"context"
"github.com/kopia/kopia/repo/logging"
"github.com/sirupsen/logrus"
)
type kopiaLog struct {
module string
logger logrus.FieldLogger
}
// SetupKopiaLog sets the Kopia log handler to the specific context, Kopia modules
// call the logger in the context to write logs
func SetupKopiaLog(ctx context.Context, logger logrus.FieldLogger) context.Context {
return logging.WithLogger(ctx, func(module string) logging.Logger {
return &kopiaLog{
module: module,
logger: logger,
}
})
}
func (kl *kopiaLog) Debugf(msg string, args ...interface{}) {
logger := kl.logger.WithField("logSource", kl.getLogSource())
logger.Debugf(msg, args...)
}
func (kl *kopiaLog) Debugw(msg string, keyValuePairs ...interface{}) {
logger := kl.logger.WithField("logSource", kl.getLogSource())
logger.WithFields(getLogFields(keyValuePairs...)).Debug(msg)
}
func (kl *kopiaLog) Infof(msg string, args ...interface{}) {
logger := kl.logger.WithField("logSource", kl.getLogSource())
logger.Infof(msg, args...)
}
func (kl *kopiaLog) Warnf(msg string, args ...interface{}) {
logger := kl.logger.WithField("logSource", kl.getLogSource())
logger.Warnf(msg, args...)
}
// We see Kopia generates error logs for some normal cases or non-critical
// cases. So Kopia's error logs are regarded as warning logs so that they don't
// affect Velero's workflow.
func (kl *kopiaLog) Errorf(msg string, args ...interface{}) {
logger := kl.logger.WithFields(logrus.Fields{
"logSource": kl.getLogSource(),
"sublevel": "error",
})
logger.Warnf(msg, args...)
}
func (kl *kopiaLog) getLogSource() string {
return "kopia/" + kl.module
}
func getLogFields(keyValuePairs ...interface{}) map[string]interface{} {
m := map[string]interface{}{}
for i := 0; i+1 < len(keyValuePairs); i += 2 {
s, ok := keyValuePairs[i].(string)
if !ok {
s = "non-string-key"
}
m[s] = keyValuePairs[i+1]
}
return m
}

View File

@ -0,0 +1,86 @@
/*
Copyright the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logging
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestGetLogFields(t *testing.T) {
testCases := []struct {
name string
pairs []interface{}
expected map[string]interface{}
}{
{
name: "normal",
pairs: []interface{}{
"fake-key1",
"fake-value1",
"fake-key2",
10,
"fake-key3",
struct{ v int }{v: 10},
},
expected: map[string]interface{}{
"fake-key1": "fake-value1",
"fake-key2": 10,
"fake-key3": struct{ v int }{v: 10},
},
},
{
name: "non string key",
pairs: []interface{}{
"fake-key1",
"fake-value1",
10,
10,
"fake-key3",
struct{ v int }{v: 10},
},
expected: map[string]interface{}{
"fake-key1": "fake-value1",
"non-string-key": 10,
"fake-key3": struct{ v int }{v: 10},
},
},
{
name: "missing value",
pairs: []interface{}{
"fake-key1",
"fake-value1",
"fake-key2",
10,
"fake-key3",
},
expected: map[string]interface{}{
"fake-key1": "fake-value1",
"fake-key2": 10,
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
m := getLogFields(tc.pairs...)
require.Equal(t, tc.expected, m)
})
}
}