From bd2008c893e15ec5e6f4f2f231073ef475dfcd02 Mon Sep 17 00:00:00 2001 From: Tiger Kaovilai Date: Wed, 26 Jun 2024 23:55:13 -0400 Subject: [PATCH 1/4] Make pkg/install/Deployment podTemplateOptions bool functions accept bool param Signed-off-by: Tiger Kaovilai --- pkg/install/deployment.go | 20 ++++++++++---------- pkg/install/deployment_test.go | 4 ++-- pkg/install/resources.go | 10 +++++----- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/pkg/install/deployment.go b/pkg/install/deployment.go index a8c2a131a..9ac46ece8 100644 --- a/pkg/install/deployment.go +++ b/pkg/install/deployment.go @@ -95,9 +95,9 @@ func WithSecret(secretPresent bool) podTemplateOption { } } -func WithRestoreOnly() podTemplateOption { +func WithRestoreOnly(b bool) podTemplateOption { return func(c *podTemplateConfig) { - c.restoreOnly = true + c.restoreOnly = b } } @@ -143,21 +143,21 @@ func WithUploaderType(t string) podTemplateOption { } } -func WithDefaultVolumesToFsBackup() podTemplateOption { +func WithDefaultVolumesToFsBackup(b bool) podTemplateOption { return func(c *podTemplateConfig) { - c.defaultVolumesToFsBackup = true + c.defaultVolumesToFsBackup = b } } -func WithDefaultSnapshotMoveData() podTemplateOption { +func WithDefaultSnapshotMoveData(b bool) podTemplateOption { return func(c *podTemplateConfig) { - c.defaultSnapshotMoveData = true + c.defaultSnapshotMoveData = b } } -func WithDisableInformerCache() podTemplateOption { +func WithDisableInformerCache(b bool) podTemplateOption { return func(c *podTemplateConfig) { - c.disableInformerCache = true + c.disableInformerCache = b } } @@ -167,9 +167,9 @@ func WithServiceAccountName(sa string) podTemplateOption { } } -func WithPrivilegedNodeAgent() podTemplateOption { +func WithPrivilegedNodeAgent(b bool) podTemplateOption { return func(c *podTemplateConfig) { - c.privilegedNodeAgent = true + c.privilegedNodeAgent = b } } diff --git a/pkg/install/deployment_test.go b/pkg/install/deployment_test.go index 04d301c01..426a53df6 100644 --- a/pkg/install/deployment_test.go +++ b/pkg/install/deployment_test.go @@ -31,7 +31,7 @@ func TestDeployment(t *testing.T) { assert.Equal(t, "velero", deploy.ObjectMeta.Namespace) - deploy = Deployment("velero", WithRestoreOnly()) + deploy = Deployment("velero", WithRestoreOnly(true)) assert.Equal(t, "--restore-only", deploy.Spec.Template.Spec.Containers[0].Args[1]) deploy = Deployment("velero", WithEnvFromSecretKey("my-var", "my-secret", "my-key")) @@ -67,7 +67,7 @@ func TestDeployment(t *testing.T) { deploy = Deployment("velero", WithServiceAccountName("test-sa")) assert.Equal(t, "test-sa", deploy.Spec.Template.Spec.ServiceAccountName) - deploy = Deployment("velero", WithDisableInformerCache()) + deploy = Deployment("velero", WithDisableInformerCache(true)) assert.Len(t, deploy.Spec.Template.Spec.Containers[0].Args, 2) assert.Equal(t, "--disable-informer-cache=true", deploy.Spec.Template.Spec.Containers[0].Args[1]) diff --git a/pkg/install/resources.go b/pkg/install/resources.go index 171fc2ece..752bc025b 100644 --- a/pkg/install/resources.go +++ b/pkg/install/resources.go @@ -358,7 +358,7 @@ func AllResources(o *VeleroOptions) *unstructured.UnstructuredList { } if o.RestoreOnly { - deployOpts = append(deployOpts, WithRestoreOnly()) + deployOpts = append(deployOpts, WithRestoreOnly(true)) } if len(o.Plugins) > 0 { @@ -366,15 +366,15 @@ func AllResources(o *VeleroOptions) *unstructured.UnstructuredList { } if o.DefaultVolumesToFsBackup { - deployOpts = append(deployOpts, WithDefaultVolumesToFsBackup()) + 
deployOpts = append(deployOpts, WithDefaultVolumesToFsBackup(true)) } if o.DefaultSnapshotMoveData { - deployOpts = append(deployOpts, WithDefaultSnapshotMoveData()) + deployOpts = append(deployOpts, WithDefaultSnapshotMoveData(true)) } if o.DisableInformerCache { - deployOpts = append(deployOpts, WithDisableInformerCache()) + deployOpts = append(deployOpts, WithDisableInformerCache(true)) } deploy := Deployment(o.Namespace, deployOpts...) @@ -396,7 +396,7 @@ func AllResources(o *VeleroOptions) *unstructured.UnstructuredList { dsOpts = append(dsOpts, WithFeatures(o.Features)) } if o.PrivilegedNodeAgent { - dsOpts = append(dsOpts, WithPrivilegedNodeAgent()) + dsOpts = append(dsOpts, WithPrivilegedNodeAgent(true)) } ds := DaemonSet(o.Namespace, dsOpts...) if err := appendUnstructured(resources, ds); err != nil { From e862b976a4a9775237d7e45c06701474dc09b78f Mon Sep 17 00:00:00 2001 From: Xun Jiang Date: Tue, 23 Jul 2024 14:39:01 +0800 Subject: [PATCH 2/4] Use labels instead of regex to filter E2E test cases. Signed-off-by: Xun Jiang --- .github/workflows/e2e-test-kind.yaml | 35 ++-- .github/workflows/pr-codespell.yml | 2 +- test/Makefile | 135 ++++++++-------- test/e2e/README.md | 78 ++++----- .../api-group/enable_api_group_versions.go | 2 +- test/e2e/e2e_suite_test.go | 153 ++++++++++++------ test/perf/e2e_suite_test.go | 9 +- 7 files changed, 240 insertions(+), 174 deletions(-) diff --git a/.github/workflows/e2e-test-kind.yaml b/.github/workflows/e2e-test-kind.yaml index 57d499823..9a379738a 100644 --- a/.github/workflows/e2e-test-kind.yaml +++ b/.github/workflows/e2e-test-kind.yaml @@ -67,16 +67,12 @@ jobs: - 1.27.10 - 1.28.6 - 1.29.1 - focus: - # tests to focus on, use `|` to concatenate multiple regexes to run on the same job - # ordered according to e2e_suite_test.go order - - Basic\]\[ClusterResource - - ResourceFiltering - - ResourceModifier|Backups|PrivilegesMgmt\]\[SSR - - Schedule\]\[OrderedResources - - NamespaceMapping\]\[Single\]\[Restic|NamespaceMapping\]\[Multiple\]\[Restic - - Basic\]\[Nodeport - - Basic\]\[StorageClass + labels: + # labels are used to filter running E2E cases + - Basic && (ClusterResource || NodePort || StorageClass) + - ResourceFiltering && !Restic + - ResourceModifier || (Backups && BackupsSync) || PrivilegesMgmt || OrderedResources + - (NamespaceMapping && Single && Restic) || (NamespaceMapping && Multiple && Restic) fail-fast: false steps: - name: Set up Go @@ -128,13 +124,18 @@ jobs: curl -LO https://dl.k8s.io/release/v${{ matrix.k8s }}/bin/linux/amd64/kubectl sudo install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl - GOPATH=~/go CLOUD_PROVIDER=kind \ - OBJECT_STORE_PROVIDER=aws BSL_CONFIG=region=minio,s3ForcePathStyle="true",s3Url=http://$(hostname -i):9000 \ - CREDS_FILE=/tmp/credential BSL_BUCKET=bucket \ - ADDITIONAL_OBJECT_STORE_PROVIDER=aws ADDITIONAL_BSL_CONFIG=region=minio,s3ForcePathStyle="true",s3Url=http://$(hostname -i):9000 \ - ADDITIONAL_CREDS_FILE=/tmp/credential ADDITIONAL_BSL_BUCKET=additional-bucket \ - GINKGO_FOCUS='${{ matrix.focus }}' VELERO_IMAGE=velero:pr-test \ - GINKGO_SKIP='SKIP_KIND|pv-backup|Restic|Snapshot|LongTime' \ + GOPATH=~/go \ + CLOUD_PROVIDER=kind \ + OBJECT_STORE_PROVIDER=aws \ + BSL_CONFIG=region=minio,s3ForcePathStyle="true",s3Url=http://$(hostname -i):9000 \ + CREDS_FILE=/tmp/credential \ + BSL_BUCKET=bucket \ + ADDITIONAL_OBJECT_STORE_PROVIDER=aws \ + ADDITIONAL_BSL_CONFIG=region=minio,s3ForcePathStyle="true",s3Url=http://$(hostname -i):9000 \ + ADDITIONAL_CREDS_FILE=/tmp/credential \ + 
ADDITIONAL_BSL_BUCKET=additional-bucket \ + VELERO_IMAGE=velero:pr-test \ + GINKGO_LABELS="${{ matrix.labels }}" \ make -C test/ run-e2e timeout-minutes: 30 - name: Upload debug bundle diff --git a/.github/workflows/pr-codespell.yml b/.github/workflows/pr-codespell.yml index ca733318c..0d3138e40 100644 --- a/.github/workflows/pr-codespell.yml +++ b/.github/workflows/pr-codespell.yml @@ -15,7 +15,7 @@ jobs: with: # ignore the config/.../crd.go file as it's generated binary data that is edited elswhere. skip: .git,*.png,*.jpg,*.woff,*.ttf,*.gif,*.ico,./config/crd/v1beta1/crds/crds.go,./config/crd/v1/crds/crds.go,./config/crd/v2alpha1/crds/crds.go,./go.sum,./LICENSE - ignore_words_list: iam,aks,ist,bridget,ue,shouldnot,atleast,notin,sme + ignore_words_list: iam,aks,ist,bridget,ue,shouldnot,atleast,notin,sme,optin check_filenames: true check_hidden: true diff --git a/test/Makefile b/test/Makefile index fbc0e08e3..bdf87183b 100644 --- a/test/Makefile +++ b/test/Makefile @@ -47,10 +47,9 @@ TOOLS_BIN_DIR := $(TOOLS_DIR)/$(BIN_DIR) GINKGO := $(GOBIN)/ginkgo KUSTOMIZE := $(TOOLS_BIN_DIR)/kustomize OUTPUT_DIR := _output/$(GOOS)/$(GOARCH)/bin -GINKGO_FOCUS ?= -GINKGO_SKIP ?= -SKIP_STR := $(foreach var, $(subst ., ,$(GINKGO_SKIP)),-skip "$(var)") -FOCUS_STR := $(foreach var, $(subst ., ,$(GINKGO_FOCUS)),-focus "$(var)") +# Please reference to this document for Ginkgo label spec format. +# https://onsi.github.io/ginkgo/#spec-labels +GINKGO_LABELS ?= VELERO_CLI ?=$$(pwd)/../_output/bin/$(GOOS)/$(GOARCH)/velero VELERO_IMAGE ?= velero/velero:main PLUGINS ?= @@ -129,26 +128,26 @@ VELERO_POD_CPU_REQUEST ?= 2 VELERO_POD_MEM_REQUEST ?= 2Gi POD_VOLUME_OPERATION_TIMEOUT ?= 6h -COMMON_ARGS := -velerocli=$(VELERO_CLI) \ - -velero-image=$(VELERO_IMAGE) \ - -plugins=$(PLUGINS) \ - -velero-version=$(VERSION) \ - -restore-helper-image=$(RESTORE_HELPER_IMAGE) \ - -velero-namespace=$(VELERO_NAMESPACE) \ - -credentials-file=$(CREDS_FILE) \ - -bucket=$(BSL_BUCKET) \ - -prefix=$(BSL_PREFIX) \ - -bsl-config=$(BSL_CONFIG) \ - -vsl-config=$(VSL_CONFIG) \ - -cloud-provider=$(CLOUD_PROVIDER) \ - -object-store-provider="$(OBJECT_STORE_PROVIDER)" \ - -features=$(FEATURES) \ - -install-velero=$(INSTALL_VELERO) \ - -registry-credential-file=$(REGISTRY_CREDENTIAL_FILE) \ - -debug-e2e-test=$(DEBUG_E2E_TEST) \ - -velero-server-debug-mode=$(VELERO_SERVER_DEBUG_MODE) \ - -uploader-type=$(UPLOADER_TYPE) \ - -debug-velero-pod-restart=$(DEBUG_VELERO_POD_RESTART) +COMMON_ARGS := --velerocli=$(VELERO_CLI) \ + --velero-image=$(VELERO_IMAGE) \ + --plugins=$(PLUGINS) \ + --velero-version=$(VERSION) \ + --restore-helper-image=$(RESTORE_HELPER_IMAGE) \ + --velero-namespace=$(VELERO_NAMESPACE) \ + --credentials-file=$(CREDS_FILE) \ + --bucket=$(BSL_BUCKET) \ + --prefix=$(BSL_PREFIX) \ + --bsl-config=$(BSL_CONFIG) \ + --vsl-config=$(VSL_CONFIG) \ + --cloud-provider=$(CLOUD_PROVIDER) \ + --object-store-provider="$(OBJECT_STORE_PROVIDER)" \ + --features=$(FEATURES) \ + --install-velero=$(INSTALL_VELERO) \ + --registry-credential-file=$(REGISTRY_CREDENTIAL_FILE) \ + --debug-e2e-test=$(DEBUG_E2E_TEST) \ + --velero-server-debug-mode=$(VELERO_SERVER_DEBUG_MODE) \ + --uploader-type=$(UPLOADER_TYPE) \ + --debug-velero-pod-restart=$(DEBUG_VELERO_POD_RESTART) # Make sure ginkgo is in $GOBIN .PHONY:ginkgo @@ -166,31 +165,36 @@ run-e2e: ginkgo (echo "Bucket to store the backups from E2E tests is required, please re-run with BSL_BUCKET="; exit 1 ) @[ "${CLOUD_PROVIDER}" ] && echo "Using cloud provider ${CLOUD_PROVIDER}" || \ (echo "Cloud provider for target 
cloud/plugin provider is required, please rerun with CLOUD_PROVIDER="; exit 1) - @$(GINKGO) run -v $(FOCUS_STR) $(SKIP_STR) --junit-report report.xml ./e2e -- $(COMMON_ARGS) \ - -upgrade-from-velero-cli=$(UPGRADE_FROM_VELERO_CLI) \ - -upgrade-from-velero-version=$(UPGRADE_FROM_VELERO_VERSION) \ - -migrate-from-velero-cli=$(MIGRATE_FROM_VELERO_CLI) \ - -migrate-from-velero-version=$(MIGRATE_FROM_VELERO_VERSION) \ - -additional-bsl-plugins=$(ADDITIONAL_BSL_PLUGINS) \ - -additional-bsl-object-store-provider="$(ADDITIONAL_OBJECT_STORE_PROVIDER)" \ - -additional-bsl-credentials-file=$(ADDITIONAL_CREDS_FILE) \ - -additional-bsl-bucket=$(ADDITIONAL_BSL_BUCKET) \ - -additional-bsl-prefix=$(ADDITIONAL_BSL_PREFIX) \ - -additional-bsl-config=$(ADDITIONAL_BSL_CONFIG) \ - -default-cluster-context=$(DEFAULT_CLUSTER) \ - -standby-cluster-context=$(STANDBY_CLUSTER) \ - -snapshot-move-data=$(SNAPSHOT_MOVE_DATA) \ - -data-mover-plugin=$(DATA_MOVER_PLUGIN) \ - -standby-cluster-cloud-provider=$(STANDBY_CLUSTER_CLOUD_PROVIDER) \ - -standby-cluster-plugins=$(STANDBY_CLUSTER_PLUGINS) \ - -standby-cluster-object-store-provider=$(STANDBY_CLUSTER_OBJECT_STORE_PROVIDER) \ - -default-cluster-name=$(DEFAULT_CLUSTER_NAME) \ - -standby-cluster-name=$(STANDBY_CLUSTER_NAME) \ - -eks-policy-arn=$(EKS_POLICY_ARN) \ - -default-cls-service-account-name=$(DEFAULT_CLS_SERVICE_ACCOUNT_NAME) \ - -standby-cls-service-account-name=$(STANDBY_CLS_SERVICE_ACCOUNT_NAME) - -kibishii-directory=$(KIBISHII_DIRECTORY) \ - -disable-informer-cache=$(DISABLE_INFORMER_CACHE) + @$(GINKGO) run \ + -v \ + --junit-report report.xml \ + --label-filter="$(GINKGO_LABELS)" \ + ./e2e \ + -- $(COMMON_ARGS) \ + --upgrade-from-velero-cli=$(UPGRADE_FROM_VELERO_CLI) \ + --upgrade-from-velero-version=$(UPGRADE_FROM_VELERO_VERSION) \ + --migrate-from-velero-cli=$(MIGRATE_FROM_VELERO_CLI) \ + --migrate-from-velero-version=$(MIGRATE_FROM_VELERO_VERSION) \ + --additional-bsl-plugins=$(ADDITIONAL_BSL_PLUGINS) \ + --additional-bsl-object-store-provider="$(ADDITIONAL_OBJECT_STORE_PROVIDER)" \ + --additional-bsl-credentials-file=$(ADDITIONAL_CREDS_FILE) \ + --additional-bsl-bucket=$(ADDITIONAL_BSL_BUCKET) \ + --additional-bsl-prefix=$(ADDITIONAL_BSL_PREFIX) \ + --additional-bsl-config=$(ADDITIONAL_BSL_CONFIG) \ + --default-cluster-context=$(DEFAULT_CLUSTER) \ + --standby-cluster-context=$(STANDBY_CLUSTER) \ + --snapshot-move-data=$(SNAPSHOT_MOVE_DATA) \ + --data-mover-plugin=$(DATA_MOVER_PLUGIN) \ + --standby-cluster-cloud-provider=$(STANDBY_CLUSTER_CLOUD_PROVIDER) \ + --standby-cluster-plugins=$(STANDBY_CLUSTER_PLUGINS) \ + --standby-cluster-object-store-provider=$(STANDBY_CLUSTER_OBJECT_STORE_PROVIDER) \ + --default-cluster-name=$(DEFAULT_CLUSTER_NAME) \ + --standby-cluster-name=$(STANDBY_CLUSTER_NAME) \ + --eks-policy-arn=$(EKS_POLICY_ARN) \ + --default-cls-service-account-name=$(DEFAULT_CLS_SERVICE_ACCOUNT_NAME) \ + --standby-cls-service-account-name=$(STANDBY_CLS_SERVICE_ACCOUNT_NAME) + --kibishii-directory=$(KIBISHII_DIRECTORY) \ + --disable-informer-cache=$(DISABLE_INFORMER_CACHE) .PHONY: run-perf run-perf: ginkgo @@ -200,20 +204,25 @@ run-perf: ginkgo (echo "Bucket to store the backups from E2E tests is required, please re-run with BSL_BUCKET="; exit 1 ) @[ "${CLOUD_PROVIDER}" ] && echo "Using cloud provider ${CLOUD_PROVIDER}" || \ (echo "Cloud provider for target cloud/plugin provider is required, please rerun with CLOUD_PROVIDER="; exit 1) - @$(GINKGO) run -v $(FOCUS_STR) $(SKIP_STR) --junit-report report.xml ./perf -- $(COMMON_ARGS) \ - 
-nfs-server-path=$(NFS_SERVER_PATH) \ - -test-case-describe=$(TEST_CASE_DESCRIBE) \ - -backup-for-restore=$(BACKUP_FOR_RESTORE) \ - -delete-cluster-resource=$(Delete_Cluster_Resource) \ - -node-agent-pod-cpu-limit=$(NODE_AGENT_POD_CPU_LIMIT) \ - -node-agent-pod-mem-limit=$(NODE_AGENT_POD_MEM_LIMIT) \ - -node-agent-pod-cpu-request=$(NODE_AGENT_POD_CPU_REQUEST) \ - -node-agent-pod-mem-request=$(NODE_AGENT_POD_MEM_REQUEST) \ - -velero-pod-cpu-limit=$(VELERO_POD_CPU_LIMIT) \ - -velero-pod-mem-limit=$(VELERO_POD_MEM_LIMIT) \ - -velero-pod-cpu-request=$(VELERO_POD_CPU_REQUEST) \ - -velero-pod-mem-request=$(VELERO_POD_MEM_REQUEST) \ - -pod-volume-operation-timeout=$(POD_VOLUME_OPERATION_TIMEOUT) + @$(GINKGO) run \ + -v \ + --junit-report report.xml \ + --label-filter="$(GINKGO_LABELS)" \ + ./perf \ + -- $(COMMON_ARGS) \ + --nfs-server-path=$(NFS_SERVER_PATH) \ + --test-case-describe=$(TEST_CASE_DESCRIBE) \ + --backup-for-restore=$(BACKUP_FOR_RESTORE) \ + --delete-cluster-resource=$(Delete_Cluster_Resource) \ + --node-agent-pod-cpu-limit=$(NODE_AGENT_POD_CPU_LIMIT) \ + --node-agent-pod-mem-limit=$(NODE_AGENT_POD_MEM_LIMIT) \ + --node-agent-pod-cpu-request=$(NODE_AGENT_POD_CPU_REQUEST) \ + --node-agent-pod-mem-request=$(NODE_AGENT_POD_MEM_REQUEST) \ + --velero-pod-cpu-limit=$(VELERO_POD_CPU_LIMIT) \ + --velero-pod-mem-limit=$(VELERO_POD_MEM_LIMIT) \ + --velero-pod-cpu-request=$(VELERO_POD_CPU_REQUEST) \ + --velero-pod-mem-request=$(VELERO_POD_MEM_REQUEST) \ + --pod-volume-operation-timeout=$(POD_VOLUME_OPERATION_TIMEOUT) build: ginkgo mkdir -p $(OUTPUT_DIR) diff --git a/test/e2e/README.md b/test/e2e/README.md index b1d825999..4da989f8e 100644 --- a/test/e2e/README.md +++ b/test/e2e/README.md @@ -157,9 +157,9 @@ Basic examples: BSL_CONFIG="resourceGroup=$AZURE_BACKUP_RESOURCE_GROUP,storageAccount=$AZURE_STORAGE_ACCOUNT_ID,subscriptionId=$AZURE_BACKUP_SUBSCRIPTION_ID" BSL_BUCKET= CREDS_FILE=/path/to/azure-creds CLOUD_PROVIDER=azure make test-e2e ``` Please refer to `velero-plugin-for-microsoft-azure` documentation for instruction to [set up permissions for Velero](https://github.com/vmware-tanzu/velero-plugin-for-microsoft-azure#set-permissions-for-velero) and to [set up azure storage account and blob container](https://github.com/vmware-tanzu/velero-plugin-for-microsoft-azure#setup-azure-storage-account-and-blob-container) -1. Run Ginko-focused Restore Multi-API Groups tests using Minio as the backup storage location: +1. Run Multi-API group and version tests using MinIO as the backup storage location: ```bash - BSL_CONFIG="region=minio,s3ForcePathStyle=\"true\",s3Url=:9000" BSL_PREFIX= BSL_BUCKET= CREDS_FILE= CLOUD_PROVIDER=kind OBJECT_STORE_PROVIDER=aws VELERO_NAMESPACE="velero" GINKGO_FOCUS="API group versions" make test-e2e + BSL_CONFIG="region=minio,s3ForcePathStyle=\"true\",s3Url=:9000" BSL_PREFIX= BSL_BUCKET= CREDS_FILE= CLOUD_PROVIDER=kind OBJECT_STORE_PROVIDER=aws VELERO_NAMESPACE="velero" GINKGO_LABELS="APIGroup && APIVersion" make test-e2e ``` 1. Run Velero tests in a kind cluster with AWS (or Minio) as the storage provider and use Microsoft Azure as the storage provider for an additional Backup Storage Location: ```bash @@ -208,60 +208,66 @@ Migration examples: ``` -1. Datamover tests: +1. Data mover tests: - The example shows all essential `make` variables for a Datamover test which is migrate from a AKS cluster to a EKS cluster. + The example shows all essential `make` variables for a data mover test which is migrate from a AKS cluster to a EKS cluster. 
Note: STANDBY_CLUSTER_CLOUD_PROVIDER and STANDBY_CLUSTER_OBJECT_STORE_PROVIDER are essential here; they identify the plugins to be installed on the standby cluster. Since the DEFAULT cluster's provider differs from the STANDBY cluster's, the plugins to install differ as well.
    ```bash
+    CLOUD_PROVIDER=azure \
+    DEFAULT_CLUSTER= \
+    STANDBY_CLUSTER= \
+    FEATURES=EnableCSI \
+    OBJECT_STORE_PROVIDER=aws \
+    CREDS_FILE= \
+    BSL_CONFIG=region= \
+    BSL_BUCKET= \
+    BSL_PREFIX= \
+    VSL_CONFIG=region= \
+    SNAPSHOT_MOVE_DATA=true \
+    STANDBY_CLUSTER_CLOUD_PROVIDER=aws \
+    STANDBY_CLUSTER_OBJECT_STORE_PROVIDER=aws \
+    GINKGO_LABELS="Migration" \
    make test-e2e
-    CLOUD_PROVIDER=azure \
-    DEFAULT_CLUSTER= \
-    STANDBY_CLUSTER= \
-    FEATURES=EnableCSI \
-    OBJECT_STORE_PROVIDER=aws \
-    CREDS_FILE= \
-    BSL_CONFIG=region= \
-    BSL_BUCKET= \
-    BSL_PREFIX= \
-    VSL_CONFIG=region= \
-    SNAPSHOT_MOVE_DATA=true \
-    STANDBY_CLUSTER_CLOUD_PROVIDER=aws \
-    STANDBY_CLUSTER_OBJECT_STORE_PROVIDER=aws \
-    GINKGO_FOCUS=Migration
    ```
 
 ## Filtering tests
 
-Velero E2E tests uses [Ginkgo](https://onsi.github.io/ginkgo/) testing framework which allows a subset of the tests to be run using the [`-focus` and `-skip`](https://onsi.github.io/ginkgo/#focused-specs) flags to ginkgo.
+In release-1.15, Velero bumps the [Ginkgo](https://onsi.github.io/ginkgo/) version to [v2](https://onsi.github.io/ginkgo/MIGRATING_TO_V2).
+Velero E2E tests now use [labels](https://onsi.github.io/ginkgo/#spec-labels) to filter cases instead of the [`-focus` and `-skip`](https://onsi.github.io/ginkgo/#focused-specs) parameters.
 
-For filtering tests, using `make` variables `GINKGO_FOCUS` and `GINKGO_SKIP` :
-1. `GINKGO_FOCUS`: Dot-separated list of labels to be included for Ginkgo description-based filtering. Optional. The `-focus` flag is passed to ginkgo using the `GINKGO_FOCUS` `make` variable. This can be used to focus on specific tests.
-1. `GINKGO_SKIP`: Dot-separated list of labels to be excluded for Ginkgo description-based filtering.Optional. The `-skip ` flag is passed to ginkgo using the `GINKGO_SKIP` `make` variable. This can be used to skip specific tests.
+Both the `make run-e2e` and `make run-perf` targets support the `GINKGO_LABELS` parameter for filtering test cases.
+
+`GINKGO_LABELS` is passed to the `ginkgo run` CLI as its [`--label-filter`](https://onsi.github.io/ginkgo/#spec-labels) parameter.
 
-
-`GINKGO_FOCUS`/`GINKGO_SKIP` can be interpreted into multiple `-focus`/`-skip ` describe in [Description-Based Filtering](https://onsi.github.io/ginkgo/#description-based-filtering:~:text=Description%2DBased%20Filtering) by dot-separated format for test execution management please refer to examples below.:
-
-
-For example, E2E tests can be run with specific cases to be included and/or excluded using the commands below:
+### Examples
+E2E tests can be run with specific cases to be included and/or excluded using the commands below:
1. Run Velero tests with specific cases to be included:
    ```bash
+    GINKGO_LABELS="Basic && Restic" \
+    CLOUD_PROVIDER=aws \
+    BSL_BUCKET=example-bucket \
+    CREDS_FILE=/path/to/aws-creds \
    make test-e2e \
-    GINKGO_FOCUS =Basic\][\Restic \
-    CLOUD_PROVIDER=aws BSL_BUCKET= BSL_PREFIX= CREDS_FILE=/path/to/aws-creds
    ```
 
-   In this example, only case `[Basic][Restic]` is included.
+   In this example, only cases that have both the `Basic` and `Restic` labels are included.
 
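   For orientation, here is a minimal sketch (an editorial aside, not part of this patch) of how a Ginkgo v2 spec declares the labels that a `GINKGO_LABELS` expression such as `Basic && Restic` is matched against; the suite and spec names are hypothetical:

    ```go
    package e2e_test

    import (
    	"testing"

    	. "github.com/onsi/ginkgo/v2"
    	"github.com/onsi/gomega"
    )

    // The Label decorator attaches labels to the container; --label-filter
    // expressions are evaluated against this label set.
    var _ = Describe("Velero backup with Restic", Label("Basic", "Restic"), func() {
    	It("backs up and restores a sample namespace", func() {
    		// test body elided
    	})
    })

    func TestE2E(t *testing.T) {
    	gomega.RegisterFailHandler(Fail)
    	RunSpecs(t, "Example E2E Suite")
    }
    ```

1. 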
Run Velero tests with specific cases to be excluded: ```bash - make test-e2e \ - GINKGO_SKIP=Scale.Schedule.TTL.Upgrade\]\[Restic.Migration\][\Restic \ - CLOUD_PROVIDER=aws BSL_BUCKET= BSL_PREFIX= CREDS_FILE=/path/to/aws-creds + GINKGO_LABELS="!Scale || !Schedule || !TTL || !(Upgrade && Restic) || !(Migration && Restic)" \ + CLOUD_PROVIDER=aws \ + BSL_BUCKET=example-bucket \ + CREDS_FILE=/path/to/aws-creds \ + make test-e2e ``` - In this example, case `Scale`, `Schedule`, `TTL`, `[Upgrade][Restic]` and `[Migration][Restic]` will be skipped. - - + In this example, cases are labelled as + * `Scale` + * `Schedule` + * `TTL` + * `Upgrade` and `Restic` + * `Migration` and `Restic` + will be skipped. ## Full Tests execution diff --git a/test/e2e/basic/api-group/enable_api_group_versions.go b/test/e2e/basic/api-group/enable_api_group_versions.go index 4fc17ed97..7f67dbf6e 100644 --- a/test/e2e/basic/api-group/enable_api_group_versions.go +++ b/test/e2e/basic/api-group/enable_api_group_versions.go @@ -53,7 +53,7 @@ type apiGropuVersionsTest struct { want map[string]map[string]string } -func APIGropuVersionsTest() { +func APIGroupVersionsTest() { var ( group string err error diff --git a/test/e2e/e2e_suite_test.go b/test/e2e/e2e_suite_test.go index 45b093b13..0d22510bc 100644 --- a/test/e2e/e2e_suite_test.go +++ b/test/e2e/e2e_suite_test.go @@ -112,79 +112,126 @@ func init() { // caused by no expected snapshot found. If we use retain as reclaim policy, then this label can be ignored, all test // cases can be executed as expected successful result. -var _ = Describe("[APIGroup][APIVersion][SKIP_KIND] Velero tests with various CRD API group versions", APIGropuVersionsTest) -var _ = Describe("[APIGroup][APIExtensions][SKIP_KIND] CRD of apiextentions v1beta1 should be B/R successfully from cluster(k8s version < 1.22) to cluster(k8s version >= 1.22)", APIExtensionsVersionsTest) +var _ = Describe("Velero tests with various CRD API group versions", + Label("APIGroup", "APIVersion", "SKIP_KIND"), APIGroupVersionsTest) +var _ = Describe("CRD of apiextentions v1beta1 should be B/R successfully from cluster(k8s version < 1.22) to cluster(k8s version >= 1.22)", + Label("APIGroup", "APIExtensions", "SKIP_KIND"), APIExtensionsVersionsTest) -// Test backup and restore of Kibishi using restic -var _ = Describe("[Basic][Restic] Velero tests on cluster using the plugin provider for object storage and Restic for volume backups", BackupRestoreWithRestic) +// Test backup and restore of Kibishii using restic +var _ = Describe("Velero tests on cluster using the plugin provider for object storage and Restic for volume backups", + Label("Basic", "Restic"), BackupRestoreWithRestic) -var _ = Describe("[Basic][Snapshot][SkipVanillaZfs] Velero tests on cluster using the plugin provider for object storage and snapshots for volume backups", BackupRestoreWithSnapshots) +var _ = Describe("Velero tests on cluster using the plugin provider for object storage and snapshots for volume backups", + Label("Basic", "Snapshot", "SkipVanillaZfs"), BackupRestoreWithSnapshots) -var _ = Describe("[Basic][Snapshot][RetainPV] Velero tests on cluster using the plugin provider for object storage and snapshots for volume backups", BackupRestoreRetainedPVWithSnapshots) +var _ = Describe("Velero tests on cluster using the plugin provider for object storage and snapshots for volume backups", + Label("Basic", "Snapshot", "RetainPV"), BackupRestoreRetainedPVWithSnapshots) -var _ = Describe("[Basic][Restic][RetainPV] Velero tests on cluster using the 
plugin provider for object storage and snapshots for volume backups", BackupRestoreRetainedPVWithRestic) +var _ = Describe("Velero tests on cluster using the plugin provider for object storage and snapshots for volume backups", + Label("Basic", "Restic", "RetainPV"), BackupRestoreRetainedPVWithRestic) -var _ = Describe("[Basic][ClusterResource] Backup/restore of cluster resources", ResourcesCheckTest) +var _ = Describe("Backup/restore of cluster resources", + Label("Basic", "ClusterResource"), ResourcesCheckTest) -var _ = Describe("[Scale][LongTime] Backup/restore of 2500 namespaces", MultiNSBackupRestore) +var _ = Describe("Service NodePort reservation during restore is configurable", + Label("Basic", "NodePort"), NodePortTest) -// Upgrade test by Kibishi using restic -var _ = Describe("[Upgrade][Restic] Velero upgrade tests on cluster using the plugin provider for object storage and Restic for volume backups", BackupUpgradeRestoreWithRestic) -var _ = Describe("[Upgrade][Snapshot][SkipVanillaZfs] Velero upgrade tests on cluster using the plugin provider for object storage and snapshots for volume backups", BackupUpgradeRestoreWithSnapshots) +var _ = Describe("Storage class of persistent volumes and persistent volume claims can be changed during restores", + Label("Basic", "StorageClass"), StorageClasssChangingTest) + +var _ = Describe("Node selectors of persistent volume claims can be changed during restores", + Label("Basic", "SelectedNode", "SKIP_KIND"), PVCSelectedNodeChangingTest) + +var _ = Describe("Backup/restore of 2500 namespaces", + Label("Scale", "LongTime"), MultiNSBackupRestore) + +// Upgrade test by Kibishii using Restic +var _ = Describe("Velero upgrade tests on cluster using the plugin provider for object storage and Restic for volume backups", + Label("Upgrade", "Restic"), BackupUpgradeRestoreWithRestic) +var _ = Describe("Velero upgrade tests on cluster using the plugin provider for object storage and snapshots for volume backups", + Label("Upgrade", "Snapshot", "SkipVanillaZfs"), BackupUpgradeRestoreWithSnapshots) // test filter objects by namespace, type, or labels when backup or restore. 
-var _ = Describe("[ResourceFiltering][ExcludeFromBackup] Resources with the label velero.io/exclude-from-backup=true are not included in backup", ExcludeFromBackupTest) -var _ = Describe("[ResourceFiltering][ExcludeNamespaces][Backup] Velero test on exclude namespace from the cluster backup", BackupWithExcludeNamespaces) -var _ = Describe("[ResourceFiltering][ExcludeNamespaces][Restore] Velero test on exclude namespace from the cluster restore", RestoreWithExcludeNamespaces) -var _ = Describe("[ResourceFiltering][ExcludeResources][Backup] Velero test on exclude resources from the cluster backup", BackupWithExcludeResources) -var _ = Describe("[ResourceFiltering][ExcludeResources][Restore] Velero test on exclude resources from the cluster restore", RestoreWithExcludeResources) -var _ = Describe("[ResourceFiltering][IncludeNamespaces][Backup] Velero test on include namespace from the cluster backup", BackupWithIncludeNamespaces) -var _ = Describe("[ResourceFiltering][IncludeNamespaces][Restore] Velero test on include namespace from the cluster restore", RestoreWithIncludeNamespaces) -var _ = Describe("[ResourceFiltering][IncludeResources][Backup] Velero test on include resources from the cluster backup", BackupWithIncludeResources) -var _ = Describe("[ResourceFiltering][IncludeResources][Restore] Velero test on include resources from the cluster restore", RestoreWithIncludeResources) -var _ = Describe("[ResourceFiltering][LabelSelector] Velero test on backup include resources matching the label selector", BackupWithLabelSelector) -var _ = Describe("[ResourceFiltering][ResourcePolicies][Restic] Velero test on skip backup of volume by resource policies", ResourcePoliciesTest) +var _ = Describe("Resources with the label velero.io/exclude-from-backup=true are not included in backup", + Label("ResourceFiltering", "ExcludeFromBackup"), ExcludeFromBackupTest) +var _ = Describe("Velero test on exclude namespace from the cluster backup", + Label("ResourceFiltering", "ExcludeNamespaces", "Backup"), BackupWithExcludeNamespaces) +var _ = Describe("Velero test on exclude namespace from the cluster restore", + Label("ResourceFiltering", "ExcludeNamespaces", "Restore"), RestoreWithExcludeNamespaces) +var _ = Describe("Velero test on exclude resources from the cluster backup", + Label("ResourceFiltering", "ExcludeResources", "Backup"), BackupWithExcludeResources) +var _ = Describe("Velero test on exclude resources from the cluster restore", + Label("ResourceFiltering", "ExcludeResources", "Restore"), RestoreWithExcludeResources) +var _ = Describe("Velero test on include namespace from the cluster backup", + Label("ResourceFiltering", "IncludeNamespaces", "Backup"), BackupWithIncludeNamespaces) +var _ = Describe("Velero test on include namespace from the cluster restore", + Label("ResourceFiltering", "IncludeNamespaces", "Restore"), RestoreWithIncludeNamespaces) +var _ = Describe("Velero test on include resources from the cluster backup", + Label("ResourceFiltering", "IncludeResources", "Backup"), BackupWithIncludeResources) +var _ = Describe("Velero test on include resources from the cluster restore", + Label("ResourceFiltering", "IncludeResources", "Restore"), RestoreWithIncludeResources) +var _ = Describe("Velero test on backup include resources matching the label selector", + Label("ResourceFiltering", "LabelSelector"), BackupWithLabelSelector) +var _ = Describe("Velero test on skip backup of volume by resource policies", + Label("ResourceFiltering", "ResourcePolicies", "Restic"), ResourcePoliciesTest) 
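+
+// Editorial note: with the labels above, a workflow filter such as
+// "ResourceFiltering && !Restic" (used in e2e-test-kind.yaml) runs all of
+// the filtering cases except ResourcePoliciesTest, which also carries the
+// "Restic" label.
+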
// backup VolumeInfo test -var _ = Describe("[BackupVolumeInfo][SkippedVolume]", SkippedVolumeInfoTest) -var _ = Describe("[BackupVolumeInfo][FilesystemUpload]", FilesystemUploadVolumeInfoTest) -var _ = Describe("[BackupVolumeInfo][CSIDataMover]", CSIDataMoverVolumeInfoTest) -var _ = Describe("[BackupVolumeInfo][CSISnapshot]", CSISnapshotVolumeInfoTest) -var _ = Describe("[BackupVolumeInfo][NativeSnapshot]", NativeSnapshotVolumeInfoTest) +var _ = Describe("", Label("BackupVolumeInfo", "SkippedVolume"), SkippedVolumeInfoTest) +var _ = Describe("", Label("BackupVolumeInfo", "FilesystemUpload"), FilesystemUploadVolumeInfoTest) +var _ = Describe("", Label("BackupVolumeInfo", "CSIDataMover"), CSIDataMoverVolumeInfoTest) +var _ = Describe("", Label("BackupVolumeInfo", "CSISnapshot"), CSISnapshotVolumeInfoTest) +var _ = Describe("", Label("BackupVolumeInfo", "NativeSnapshot"), NativeSnapshotVolumeInfoTest) -var _ = Describe("[ResourceModifier][Restore] Velero test on resource modifiers from the cluster restore", ResourceModifiersTest) +var _ = Describe("Velero test on resource modifiers from the cluster restore", + Label("ResourceModifier", "Restore"), ResourceModifiersTest) -var _ = Describe("[Backups][Deletion][Restic] Velero tests of Restic backup deletion", BackupDeletionWithRestic) -var _ = Describe("[Backups][Deletion][Snapshot][SkipVanillaZfs] Velero tests of snapshot backup deletion", BackupDeletionWithSnapshots) -var _ = Describe("[Backups][TTL][LongTime][Snapshot][SkipVanillaZfs] Local backups and restic repos will be deleted once the corresponding backup storage location is deleted", TTLTest) -var _ = Describe("[Backups][BackupsSync] Backups in object storage are synced to a new Velero and deleted backups in object storage are synced to be deleted in Velero", BackupsSyncTest) +var _ = Describe("Velero tests of Restic backup deletion", + Label("Backups", "Deletion", "Restic"), BackupDeletionWithRestic) +var _ = Describe("Velero tests of snapshot backup deletion", + Label("Backups", "Deletion", "Snapshot", "SkipVanillaZfs"), BackupDeletionWithSnapshots) +var _ = Describe("Local backups and Restic repos will be deleted once the corresponding backup storage location is deleted", + Label("Backups", "TTL", "LongTime", "Snapshot", "SkipVanillaZfs"), TTLTest) +var _ = Describe("Backups in object storage are synced to a new Velero and deleted backups in object storage are synced to be deleted in Velero", + Label("Backups", "BackupsSync"), BackupsSyncTest) -var _ = Describe("[Schedule][BR][Pause][LongTime] Backup will be created periodly by schedule defined by a Cron expression", ScheduleBackupTest) -var _ = Describe("[Schedule][OrderedResources] Backup resources should follow the specific order in schedule", ScheduleOrderedResources) -var _ = Describe("[Schedule][BackupCreation][SKIP_KIND] Schedule controller wouldn't create a new backup when it still has pending or InProgress backup", ScheduleBackupCreationTest) +var _ = Describe("Backup will be created periodically by schedule defined by a Cron expression", + Label("Schedule", "BR", "Pause", "LongTime"), ScheduleBackupTest) +var _ = Describe("Backup resources should follow the specific order in schedule", + Label("Schedule", "OrderedResources"), ScheduleOrderedResources) +var _ = Describe("Schedule controller wouldn't create a new backup when it still has pending or InProgress backup", + Label("Schedule", "BackupCreation", "SKIP_KIND"), ScheduleBackupCreationTest) -var _ = Describe("[PrivilegesMgmt][SSR] Velero test on ssr object when 
controller namespace mix-ups", SSRTest) +var _ = Describe("Velero test on ssr object when controller namespace mix-ups", + Label("PrivilegesMgmt", "SSR"), SSRTest) -var _ = Describe("[BSL][Deletion][Snapshot][SkipVanillaZfs] Local backups will be deleted once the corresponding backup storage location is deleted", BslDeletionWithSnapshots) -var _ = Describe("[BSL][Deletion][Restic] Local backups and restic repos will be deleted once the corresponding backup storage location is deleted", BslDeletionWithRestic) +var _ = Describe("Local backups will be deleted once the corresponding backup storage location is deleted", + Label("BSL", "Deletion", "Snapshot", "SkipVanillaZfs"), BslDeletionWithSnapshots) +var _ = Describe("Local backups and Restic repos will be deleted once the corresponding backup storage location is deleted", + Label("BSL", "Deletion", "Restic"), BslDeletionWithRestic) -var _ = Describe("[Migration][Restic] Migrate resources between clusters by Restic", MigrationWithRestic) -var _ = Describe("[Migration][Snapshot][SkipVanillaZfs] Migrate resources between clusters by snapshot", MigrationWithSnapshots) +var _ = Describe("Migrate resources between clusters by Restic", + Label("Migration", "Restic"), MigrationWithRestic) +var _ = Describe("Migrate resources between clusters by snapshot", + Label("Migration", "Snapshot", "SkipVanillaZfs"), MigrationWithSnapshots) -var _ = Describe("[NamespaceMapping][Single][Restic] Backup resources should follow the specific order in schedule", OneNamespaceMappingResticTest) -var _ = Describe("[NamespaceMapping][Multiple][Restic] Backup resources should follow the specific order in schedule", MultiNamespacesMappingResticTest) -var _ = Describe("[NamespaceMapping][Single][Snapshot][SkipVanillaZfs] Backup resources should follow the specific order in schedule", OneNamespaceMappingSnapshotTest) -var _ = Describe("[NamespaceMapping][Multiple][Snapshot]SkipVanillaZfs] Backup resources should follow the specific order in schedule", MultiNamespacesMappingSnapshotTest) +var _ = Describe("Backup resources should follow the specific order in schedule", + Label("NamespaceMapping", "Single", "Restic"), OneNamespaceMappingResticTest) +var _ = Describe("Backup resources should follow the specific order in schedule", + Label("NamespaceMapping", "Multiple", "Restic"), MultiNamespacesMappingResticTest) +var _ = Describe("Backup resources should follow the specific order in schedule", + Label("NamespaceMapping", "Single", "Snapshot", "SkipVanillaZfs"), OneNamespaceMappingSnapshotTest) +var _ = Describe("Backup resources should follow the specific order in schedule", + Label("NamespaceMapping", "Multiple", "Snapshot", "SkipVanillaZfs"), MultiNamespacesMappingSnapshotTest) -var _ = Describe("Backup resources should follow the specific order in schedule", Label("pv-backup", "Opt-In"), OptInPVBackupTest) -var _ = Describe("Backup resources should follow the specific order in schedule", Label("pv-backup", "Opt-Out"), OptOutPVBackupTest) +var _ = Describe("Backup resources should follow the specific order in schedule", + Label("PVBackup", "OptIn"), OptInPVBackupTest) +var _ = Describe("Backup resources should follow the specific order in schedule", + Label("PVBackup", "OptOut"), OptOutPVBackupTest) -var _ = Describe("[Basic][Nodeport] Service nodeport reservation during restore is configurable", NodePortTest) -var _ = Describe("[Basic][StorageClass] Storage class of persistent volumes and persistent volume claims can be changed during restores", 
StorageClasssChangingTest) -var _ = Describe("[Basic][SelectedNode][SKIP_KIND] Node selectors of persistent volume claims can be changed during restores", PVCSelectedNodeChangingTest) - -var _ = Describe("[UploaderConfig][ParallelFilesUpload] Velero test on parallel files upload", ParallelFilesUploadTest) -var _ = Describe("[UploaderConfig][ParallelFilesDownload] Velero test on parallel files download", ParallelFilesDownloadTest) +var _ = Describe("Velero test on parallel files upload", + Label("UploaderConfig", "ParallelFilesUpload"), ParallelFilesUploadTest) +var _ = Describe("Velero test on parallel files download", + Label("UploaderConfig", "ParallelFilesDownload"), ParallelFilesDownloadTest) func GetKubeconfigContext() error { var err error diff --git a/test/perf/e2e_suite_test.go b/test/perf/e2e_suite_test.go index a2aa85d63..e047749c2 100644 --- a/test/perf/e2e_suite_test.go +++ b/test/perf/e2e_suite_test.go @@ -94,11 +94,14 @@ func initConfig() error { return nil } -var _ = Describe("[PerformanceTest][BackupAndRestore] Velero test on both backup and restore resources", test.TestFunc(&basic.BasicTest{})) +var _ = Describe("Velero test on both backup and restore resources", + Label("PerformanceTest", "BackupAndRestore"), test.TestFunc(&basic.BasicTest{})) -var _ = Describe("[PerformanceTest][Backup] Velero test on only backup resources", test.TestFunc(&backup.BackupTest{})) +var _ = Describe("Velero test on only backup resources", + Label("PerformanceTest", "Backup"), test.TestFunc(&backup.BackupTest{})) -var _ = Describe("[PerformanceTest][Restore] Velero test on only restore resources", test.TestFunc(&restore.RestoreTest{})) +var _ = Describe("Velero test on only restore resources", + Label("PerformanceTest", "Restore"), test.TestFunc(&restore.RestoreTest{})) func TestE2e(t *testing.T) { flag.Parse() From 71c75d6dcba69ccaecde4132117f45cba5b68fe0 Mon Sep 17 00:00:00 2001 From: Xun Jiang Date: Wed, 24 Jul 2024 15:34:25 +0800 Subject: [PATCH 3/4] Set the Ginkgo timeout to 5 hours. Signed-off-by: Xun Jiang --- test/Makefile | 2 ++ test/e2e/README.md | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/test/Makefile b/test/Makefile index bdf87183b..ff62230b3 100644 --- a/test/Makefile +++ b/test/Makefile @@ -169,6 +169,7 @@ run-e2e: ginkgo -v \ --junit-report report.xml \ --label-filter="$(GINKGO_LABELS)" \ + --timeout=5h \ ./e2e \ -- $(COMMON_ARGS) \ --upgrade-from-velero-cli=$(UPGRADE_FROM_VELERO_CLI) \ @@ -208,6 +209,7 @@ run-perf: ginkgo -v \ --junit-report report.xml \ --label-filter="$(GINKGO_LABELS)" \ + --timeout=5h \ ./perf \ -- $(COMMON_ARGS) \ --nfs-server-path=$(NFS_SERVER_PATH) \ diff --git a/test/e2e/README.md b/test/e2e/README.md index 4da989f8e..1afb113fb 100644 --- a/test/e2e/README.md +++ b/test/e2e/README.md @@ -255,7 +255,7 @@ E2E tests can be run with specific cases to be included and/or excluded using th 1. 
Run Velero tests with specific cases to be excluded: ```bash - GINKGO_LABELS="!Scale || !Schedule || !TTL || !(Upgrade && Restic) || !(Migration && Restic)" \ + GINKGO_LABELS="!(Scale || Schedule || TTL || (Upgrade && Restic) || (Migration && Restic))" \ CLOUD_PROVIDER=aws \ BSL_BUCKET=example-bucket \ CREDS_FILE=/path/to/aws-creds \ From faa704d9091814c989d04336b396b91b5ca2e3e9 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Thu, 11 Jul 2024 14:15:28 +0800 Subject: [PATCH 4/4] data mover ms watcher Signed-off-by: Lyndon-Li --- changelogs/unreleased/7999-Lyndon-Li | 1 + pkg/cmd/cli/nodeagent/server.go | 3 + pkg/datapath/manager.go | 20 + pkg/datapath/manager_test.go | 36 +- pkg/datapath/micro_service_watcher.go | 437 +++++++++++++++ pkg/datapath/micro_service_watcher_test.go | 603 +++++++++++++++++++++ pkg/datapath/types.go | 4 +- pkg/exposer/csi_snapshot_test.go | 2 +- pkg/exposer/generic_restore_test.go | 2 +- pkg/test/test_logger.go | 12 + pkg/util/kube/pod.go | 72 ++- pkg/util/kube/pod_test.go | 275 ++++++++++ pkg/util/logging/default_logger.go | 16 +- pkg/util/logging/default_logger_test.go | 2 +- pkg/util/logging/log_merge_hook.go | 113 ++++ pkg/util/logging/log_merge_hook_test.go | 185 +++++++ 16 files changed, 1772 insertions(+), 11 deletions(-) create mode 100644 changelogs/unreleased/7999-Lyndon-Li create mode 100644 pkg/datapath/micro_service_watcher.go create mode 100644 pkg/datapath/micro_service_watcher_test.go create mode 100644 pkg/util/logging/log_merge_hook.go create mode 100644 pkg/util/logging/log_merge_hook_test.go diff --git a/changelogs/unreleased/7999-Lyndon-Li b/changelogs/unreleased/7999-Lyndon-Li new file mode 100644 index 000000000..87a719248 --- /dev/null +++ b/changelogs/unreleased/7999-Lyndon-Li @@ -0,0 +1 @@ +Data mover ms watcher according to design #7576 \ No newline at end of file diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 2748569f3..3a18ab862 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -189,6 +189,9 @@ func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, confi &velerov2alpha1api.DataDownload{}: { Field: fields.Set{"metadata.namespace": factory.Namespace()}.AsSelector(), }, + &v1.Event{}: { + Field: fields.Set{"metadata.namespace": factory.Namespace()}.AsSelector(), + }, }, } mgr, err := ctrl.NewManager(clientConfig, ctrl.Options{ diff --git a/pkg/datapath/manager.go b/pkg/datapath/manager.go index df60f165b..0b790a5cc 100644 --- a/pkg/datapath/manager.go +++ b/pkg/datapath/manager.go @@ -22,11 +22,14 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" + "k8s.io/client-go/kubernetes" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" ) var ConcurrentLimitExceed error = errors.New("Concurrent number exceeds") var FSBRCreator = newFileSystemBR +var MicroServiceBRWatcherCreator = newMicroServiceBRWatcher type Manager struct { cocurrentNum int @@ -56,6 +59,23 @@ func (m *Manager) CreateFileSystemBR(jobName string, requestorType string, ctx c return m.tracker[jobName], nil } +// CreateMicroServiceBRWatcher creates a new micro service watcher instance +func (m *Manager) CreateMicroServiceBRWatcher(ctx context.Context, client client.Client, kubeClient kubernetes.Interface, mgr manager.Manager, taskType string, + taskName string, namespace string, podName string, containerName string, associatedObject string, callbacks Callbacks, resume bool, log logrus.FieldLogger) (AsyncBR, error) { + m.trackerLock.Lock() 
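+	// Editorial note: resume=true reattaches to a data path that was already
+	// admitted before a node-agent restart, so the concurrency limit below is
+	// enforced only for brand-new tasks.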
+ defer m.trackerLock.Unlock() + + if !resume { + if len(m.tracker) >= m.cocurrentNum { + return nil, ConcurrentLimitExceed + } + } + + m.tracker[taskName] = MicroServiceBRWatcherCreator(client, kubeClient, mgr, taskType, taskName, namespace, podName, containerName, associatedObject, callbacks, log) + + return m.tracker[taskName], nil +} + // RemoveAsyncBR removes a file system backup/restore data path instance func (m *Manager) RemoveAsyncBR(jobName string) { m.trackerLock.Lock() diff --git a/pkg/datapath/manager_test.go b/pkg/datapath/manager_test.go index fda574400..0db605134 100644 --- a/pkg/datapath/manager_test.go +++ b/pkg/datapath/manager_test.go @@ -23,7 +23,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestManager(t *testing.T) { +func TestCreateFileSystemBR(t *testing.T) { m := NewManager(2) async_job_1, err := m.CreateFileSystemBR("job-1", "test", context.TODO(), nil, "velero", Callbacks{}, nil) @@ -50,3 +50,37 @@ func TestManager(t *testing.T) { ret = m.GetAsyncBR("job-1") assert.Nil(t, ret) } + +func TestCreateMicroServiceBRWatcher(t *testing.T) { + m := NewManager(2) + + async_job_1, err := m.CreateMicroServiceBRWatcher(context.TODO(), nil, nil, nil, "test", "job-1", "velero", "pod-1", "container", "du-1", Callbacks{}, false, nil) + assert.NoError(t, err) + + _, err = m.CreateMicroServiceBRWatcher(context.TODO(), nil, nil, nil, "test", "job-2", "velero", "pod-2", "container", "du-2", Callbacks{}, false, nil) + assert.NoError(t, err) + + _, err = m.CreateMicroServiceBRWatcher(context.TODO(), nil, nil, nil, "test", "job-3", "velero", "pod-3", "container", "du-3", Callbacks{}, false, nil) + assert.Equal(t, ConcurrentLimitExceed, err) + + async_job_4, err := m.CreateMicroServiceBRWatcher(context.TODO(), nil, nil, nil, "test", "job-4", "velero", "pod-4", "container", "du-4", Callbacks{}, true, nil) + assert.NoError(t, err) + + ret := m.GetAsyncBR("job-0") + assert.Nil(t, ret) + + ret = m.GetAsyncBR("job-1") + assert.Equal(t, async_job_1, ret) + + ret = m.GetAsyncBR("job-4") + assert.Equal(t, async_job_4, ret) + + m.RemoveAsyncBR("job-0") + assert.Len(t, m.tracker, 3) + + m.RemoveAsyncBR("job-1") + assert.Len(t, m.tracker, 2) + + ret = m.GetAsyncBR("job-1") + assert.Nil(t, ret) +} diff --git a/pkg/datapath/micro_service_watcher.go b/pkg/datapath/micro_service_watcher.go new file mode 100644 index 000000000..a5826459a --- /dev/null +++ b/pkg/datapath/micro_service_watcher.go @@ -0,0 +1,437 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package datapath + +import ( + "context" + "encoding/json" + "os" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/vmware-tanzu/velero/pkg/uploader" + "github.com/vmware-tanzu/velero/pkg/util/kube" + + ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/manager" + + "github.com/vmware-tanzu/velero/pkg/util/logging" +) + +const ( + TaskTypeBackup = "backup" + TaskTypeRestore = "restore" + + ErrCancelled = "data path is canceled" + + EventReasonStarted = "Data-Path-Started" + EventReasonCompleted = "Data-Path-Completed" + EventReasonFailed = "Data-Path-Failed" + EventReasonCancelled = "Data-Path-Canceled" + EventReasonProgress = "Data-Path-Progress" +) + +type microServiceBRWatcher struct { + ctx context.Context + cancel context.CancelFunc + log logrus.FieldLogger + client client.Client + kubeClient kubernetes.Interface + mgr manager.Manager + namespace string + callbacks Callbacks + taskName string + taskType string + thisPod string + thisContainer string + associatedObject string + eventCh chan *v1.Event + podCh chan *v1.Pod + startedFromEvent bool + terminatedFromEvent bool + wgWatcher sync.WaitGroup + eventInformer ctrlcache.Informer + podInformer ctrlcache.Informer + eventHandler cache.ResourceEventHandlerRegistration + podHandler cache.ResourceEventHandlerRegistration +} + +func newMicroServiceBRWatcher(client client.Client, kubeClient kubernetes.Interface, mgr manager.Manager, taskType string, taskName string, namespace string, + podName string, containerName string, associatedObject string, callbacks Callbacks, log logrus.FieldLogger) AsyncBR { + ms := µServiceBRWatcher{ + mgr: mgr, + client: client, + kubeClient: kubeClient, + namespace: namespace, + callbacks: callbacks, + taskType: taskType, + taskName: taskName, + thisPod: podName, + thisContainer: containerName, + associatedObject: associatedObject, + eventCh: make(chan *v1.Event, 10), + podCh: make(chan *v1.Pod, 2), + wgWatcher: sync.WaitGroup{}, + log: log, + } + + return ms +} + +func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) error { + succeeded := false + + eventInformer, err := ms.mgr.GetCache().GetInformer(ctx, &v1.Event{}) + if err != nil { + return errors.Wrap(err, "error getting event informer") + } + + podInformer, err := ms.mgr.GetCache().GetInformer(ctx, &v1.Pod{}) + if err != nil { + return errors.Wrap(err, "error getting pod informer") + } + + eventHandler, err := eventInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + evt := obj.(*v1.Event) + if evt.InvolvedObject.Namespace != ms.namespace || evt.InvolvedObject.Name != ms.associatedObject { + return + } + + ms.log.Infof("Pushed adding event %s/%s, message %s for object %v", evt.Namespace, evt.Name, evt.Message, evt.InvolvedObject) + + ms.eventCh <- evt + }, + UpdateFunc: func(_, obj interface{}) { + evt := obj.(*v1.Event) + if evt.InvolvedObject.Namespace != ms.namespace || evt.InvolvedObject.Name != ms.associatedObject { + return + } + + ms.log.Infof("Pushed updating event %s/%s, message %s for object %v", evt.Namespace, evt.Name, evt.Message, evt.InvolvedObject) + + ms.eventCh <- evt + }, + }, + ) + + if err != nil { + return errors.Wrap(err, "error registering event handler") + } + + defer func() { + if !succeeded { + if err := 
eventInformer.RemoveEventHandler(eventHandler); err != nil { + ms.log.WithError(err).Warn("Failed to remove event handler") + } + } + }() + + podHandler, err := podInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(_, obj interface{}) { + pod := obj.(*v1.Pod) + if pod.Namespace != ms.namespace || pod.Name != ms.thisPod { + return + } + + if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed { + ms.podCh <- pod + } + }, + }, + ) + + if err != nil { + return errors.Wrap(err, "error registering pod handler") + } + + defer func() { + if !succeeded { + if err := podInformer.RemoveEventHandler(podHandler); err != nil { + ms.log.WithError(err).Warn("Failed to remove pod handler") + } + } + }() + + ms.log.WithFields( + logrus.Fields{ + "taskType": ms.taskType, + "taskName": ms.taskName, + "thisPod": ms.thisPod, + }).Info("MicroServiceBR is initialized") + + ms.eventInformer = eventInformer + ms.podInformer = podInformer + ms.eventHandler = eventHandler + ms.podHandler = podHandler + + ms.ctx, ms.cancel = context.WithCancel(ctx) + + succeeded = true + + return nil +} + +func (ms *microServiceBRWatcher) Close(ctx context.Context) { + if ms.cancel != nil { + ms.cancel() + ms.cancel = nil + } + + ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("Closing MicroServiceBR") + + ms.wgWatcher.Wait() + + if ms.eventInformer != nil && ms.eventHandler != nil { + if err := ms.eventInformer.RemoveEventHandler(ms.eventHandler); err != nil { + ms.log.WithError(err).Warn("Failed to remove event handler") + } + } + + if ms.podInformer != nil && ms.podHandler != nil { + if err := ms.podInformer.RemoveEventHandler(ms.podHandler); err != nil { + ms.log.WithError(err).Warn("Failed to remove pod handler") + } + } + + ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is closed") +} + +func (ms *microServiceBRWatcher) StartBackup(source AccessPoint, uploaderConfig map[string]string, param interface{}) error { + ms.log.Infof("Start watching backup ms for source %v", source) + + if err := ms.reEnsureThisPod(); err != nil { + return err + } + + ms.startWatch() + + return nil +} + +func (ms *microServiceBRWatcher) StartRestore(snapshotID string, target AccessPoint, uploaderConfigs map[string]string) error { + ms.log.Infof("Start watching restore ms to target %v, from snapshot %s", target, snapshotID) + + if err := ms.reEnsureThisPod(); err != nil { + return err + } + + ms.startWatch() + + return nil +} + +func (ms *microServiceBRWatcher) reEnsureThisPod() error { + thisPod := &v1.Pod{} + if err := ms.client.Get(ms.ctx, types.NamespacedName{ + Namespace: ms.namespace, + Name: ms.thisPod, + }, thisPod); err != nil { + return errors.Wrapf(err, "error getting this pod %s", ms.thisPod) + } + + if thisPod.Status.Phase == v1.PodSucceeded || thisPod.Status.Phase == v1.PodFailed { + ms.podCh <- thisPod + ms.log.WithField("this pod", ms.thisPod).Infof("This pod comes to terminital status %s before watch start", thisPod.Status.Phase) + } + + return nil +} + +var funcGetPodTerminationMessage = kube.GetPodContainerTerminateMessage +var funcRedirectLog = redirectDataMoverLogs +var funcGetResultFromMessage = getResultFromMessage +var funcGetProgressFromMessage = getProgressFromMessage + +var eventWaitTimeout time.Duration = time.Minute + +func (ms *microServiceBRWatcher) startWatch() { + ms.wgWatcher.Add(1) + + go func() { + ms.log.Info("Start watching data path pod") + + var lastPod *v1.Pod + + watchLoop: + for 
{ + select { + case <-ms.ctx.Done(): + break watchLoop + case pod := <-ms.podCh: + lastPod = pod + break watchLoop + case evt := <-ms.eventCh: + ms.onEvent(evt) + } + } + + if lastPod == nil { + ms.log.Warn("Data path pod watch loop is canceled") + ms.wgWatcher.Done() + return + } + + epilogLoop: + for !ms.startedFromEvent || !ms.terminatedFromEvent { + select { + case <-time.After(eventWaitTimeout): + break epilogLoop + case evt := <-ms.eventCh: + ms.onEvent(evt) + } + } + + terminateMessage := funcGetPodTerminationMessage(lastPod, ms.thisContainer) + + logger := ms.log.WithField("data path pod", lastPod.Name) + + logger.Infof("Finish waiting data path pod, phase %s, message %s", lastPod.Status.Phase, terminateMessage) + + if !ms.startedFromEvent { + logger.Warn("VGDP seems not started") + } + + if ms.startedFromEvent && !ms.terminatedFromEvent { + logger.Warn("VGDP started but termination event is not received") + } + + logger.Info("Recording data path pod logs") + + if err := funcRedirectLog(ms.ctx, ms.kubeClient, ms.namespace, lastPod.Name, ms.thisContainer, ms.log); err != nil { + logger.WithError(err).Warn("Failed to collect data mover logs") + } + + logger.Info("Calling callback on data path pod termination") + + if lastPod.Status.Phase == v1.PodSucceeded { + ms.callbacks.OnCompleted(ms.ctx, ms.namespace, ms.taskName, funcGetResultFromMessage(ms.taskType, terminateMessage, ms.log)) + } else { + if terminateMessage == ErrCancelled { + ms.callbacks.OnCancelled(ms.ctx, ms.namespace, ms.taskName) + } else { + ms.callbacks.OnFailed(ms.ctx, ms.namespace, ms.taskName, errors.New(terminateMessage)) + } + } + + logger.Info("Complete callback on data path pod termination") + + ms.wgWatcher.Done() + }() +} + +func (ms *microServiceBRWatcher) onEvent(evt *v1.Event) { + switch evt.Reason { + case EventReasonStarted: + ms.startedFromEvent = true + ms.log.Infof("Received data path start message %s", evt.Message) + case EventReasonProgress: + ms.callbacks.OnProgress(ms.ctx, ms.namespace, ms.taskName, funcGetProgressFromMessage(evt.Message, ms.log)) + case EventReasonCompleted: + ms.log.Infof("Received data path completed message %v", funcGetResultFromMessage(ms.taskType, evt.Message, ms.log)) + ms.terminatedFromEvent = true + case EventReasonCancelled: + ms.log.Infof("Received data path canceled message %s", evt.Message) + ms.terminatedFromEvent = true + case EventReasonFailed: + ms.log.Infof("Received data path failed message %s", evt.Message) + ms.terminatedFromEvent = true + default: + ms.log.Debugf("Received event for data mover %s.[reason %s, message %s]", ms.taskName, evt.Reason, evt.Message) + } +} + +func getResultFromMessage(taskType string, message string, logger logrus.FieldLogger) Result { + result := Result{} + + if taskType == TaskTypeBackup { + backupResult := BackupResult{} + err := json.Unmarshal([]byte(message), &backupResult) + if err != nil { + logger.WithError(err).Errorf("Failed to unmarshal result message %s", message) + } else { + result.Backup = backupResult + } + } else { + restoreResult := RestoreResult{} + err := json.Unmarshal([]byte(message), &restoreResult) + if err != nil { + logger.WithError(err).Errorf("Failed to unmarshal result message %s", message) + } else { + result.Restore = restoreResult + } + } + + return result +} + +func getProgressFromMessage(message string, logger logrus.FieldLogger) *uploader.Progress { + progress := &uploader.Progress{} + err := json.Unmarshal([]byte(message), progress) + if err != nil { + logger.WithError(err).Debugf("Failed to 
+	}
+
+	return progress
+}
+
+func (ms *microServiceBRWatcher) Cancel() {
+	ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is canceled")
+}
+
+var funcCreateTemp = os.CreateTemp
+var funcCollectPodLogs = kube.CollectPodLogs
+
+func redirectDataMoverLogs(ctx context.Context, kubeClient kubernetes.Interface, namespace string, thisPod string, thisContainer string, logger logrus.FieldLogger) error {
+	logger.Infof("Starting to collect data mover pod log for %s", thisPod)
+
+	logFile, err := funcCreateTemp("", "")
+	if err != nil {
+		return errors.Wrap(err, "error to create temp file for data mover pod log")
+	}
+
+	defer logFile.Close()
+
+	logFileName := logFile.Name()
+	logger.Infof("Created log file %s", logFileName)
+
+	err = funcCollectPodLogs(ctx, kubeClient.CoreV1(), thisPod, namespace, thisContainer, logFile)
+	if err != nil {
+		return errors.Wrapf(err, "error to collect logs to %s for data mover pod %s", logFileName, thisPod)
+	}
+
+	// close the file now so its content is fully on disk before the merge hook
+	// opens it for reading; the deferred Close above is then harmless
+	logFile.Close()
+
+	logger.Infof("Redirecting to log file %s", logFileName)
+
+	hookLogger := logger.WithField(logging.LogSourceKey, logFileName)
+	hookLogger.Logln(logging.ListeningLevel, logging.ListeningMessage)
+
+	logger.Infof("Completed collecting data mover pod log for %s", thisPod)
+
+	return nil
+}
diff --git a/pkg/datapath/micro_service_watcher_test.go b/pkg/datapath/micro_service_watcher_test.go
new file mode 100644
index 000000000..f10f6b331
--- /dev/null
+++ b/pkg/datapath/micro_service_watcher_test.go
@@ -0,0 +1,603 @@
+/*
+Copyright The Velero Contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package datapath
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"os"
+	"path"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/sirupsen/logrus"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/kubernetes"
+	kubeclientfake "k8s.io/client-go/kubernetes/fake"
+	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client/fake"
+
+	"github.com/vmware-tanzu/velero/pkg/builder"
+	velerotest "github.com/vmware-tanzu/velero/pkg/test"
+	"github.com/vmware-tanzu/velero/pkg/uploader"
+	"github.com/vmware-tanzu/velero/pkg/util/logging"
+)
+
+func TestReEnsureThisPod(t *testing.T) {
+	tests := []struct {
+		name          string
+		namespace     string
+		thisPod       string
+		kubeClientObj []runtime.Object
+		expectChan    bool
+		expectErr     string
+	}{
+		{
+			name:      "get pod error",
+			thisPod:   "fak-pod-1",
+			expectErr: "error getting this pod fak-pod-1: pods \"fak-pod-1\" not found",
+		},
+		{
+			name:      "get pod not in terminated state",
+			namespace: "velero",
+			thisPod:   "fake-pod-1",
+			kubeClientObj: []runtime.Object{
+				builder.ForPod("velero", "fake-pod-1").Phase(v1.PodRunning).Result(),
+			},
+		},
+		{
+			name:      "get pod succeed state",
+			namespace: "velero",
+			thisPod:   "fake-pod-1",
+			kubeClientObj: []runtime.Object{
+				builder.ForPod("velero", "fake-pod-1").Phase(v1.PodSucceeded).Result(),
+			},
+			expectChan: true,
+		},
+		{
+			name:      "get pod failed state",
+			namespace: "velero",
+			thisPod:   "fake-pod-1",
+			kubeClientObj: []runtime.Object{
+				builder.ForPod("velero", "fake-pod-1").Phase(v1.PodFailed).Result(),
+			},
+			expectChan: true,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			scheme := runtime.NewScheme()
+			require.NoError(t, v1.AddToScheme(scheme))
+			fakeClientBuilder := fake.NewClientBuilder()
+			fakeClientBuilder = fakeClientBuilder.WithScheme(scheme)
+
+			fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build()
+
+			ms := &microServiceBRWatcher{
+				namespace: test.namespace,
+				thisPod:   test.thisPod,
+				client:    fakeClient,
+				podCh:     make(chan *v1.Pod, 2),
+				log:       velerotest.NewLogger(),
+			}
+
+			err := ms.reEnsureThisPod()
+			if test.expectErr != "" {
+				assert.EqualError(t, err, test.expectErr)
+			} else {
+				assert.NoError(t, err)
+				if test.expectChan {
+					assert.Len(t, ms.podCh, 1)
+					pod := <-ms.podCh
+					assert.Equal(t, test.thisPod, pod.Name)
+				}
+			}
+		})
+	}
+}
+
+type startWatchFake struct {
+	terminationMessage string
+	redirectErr        error
+	complete           bool
+	failed             bool
+	canceled           bool
+	progress           int
+}
+
+func (sw *startWatchFake) getPodContainerTerminateMessage(pod *v1.Pod, container string) string {
+	return sw.terminationMessage
+}
+
+func (sw *startWatchFake) redirectDataMoverLogs(ctx context.Context, kubeClient kubernetes.Interface, namespace string, thisPod string, thisContainer string, logger logrus.FieldLogger) error {
+	return sw.redirectErr
+}
+
+func (sw *startWatchFake) getResultFromMessage(_ string, _ string, _ logrus.FieldLogger) Result {
+	return Result{}
+}
+
+func (sw *startWatchFake) OnCompleted(ctx context.Context, namespace string, task string, result Result) {
+	sw.complete = true
+}
+
+func (sw *startWatchFake) OnFailed(ctx context.Context, namespace string, task string, err error) {
+	sw.failed = true
+}
+
+func (sw *startWatchFake) OnCancelled(ctx context.Context, namespace string, task string) {
+	sw.canceled = true
+}
+
+func (sw *startWatchFake) OnProgress(ctx context.Context, namespace string, task string, progress *uploader.Progress) {
+	sw.progress++
+}
+
+type insertEvent struct {
+	event *v1.Event
+	after time.Duration // how long to wait before the event is sent
+	delay time.Duration // how long to wait after the event is sent
+}
+
+func TestStartWatch(t *testing.T) {
+	tests := []struct {
+		name                 string
+		namespace            string
+		thisPod              string
+		thisContainer        string
+		terminationMessage   string
+		redirectLogErr       error
+		insertPod            *v1.Pod
+		insertEventsBefore   []insertEvent
+		insertEventsAfter    []insertEvent
+		ctxCancel            bool
+		expectStartEvent     bool
+		expectTerminateEvent bool
+		expectComplete       bool
+		expectCancel         bool
+		expectFail           bool
+		expectProgress       int
+	}{
+		{
+			name:          "exit from ctx",
+			thisPod:       "fak-pod-1",
+			thisContainer: "fake-container-1",
+			ctxCancel:     true,
+		},
+		{
+			name:          "completed with rational sequence",
+			thisPod:       "fak-pod-1",
+			thisContainer: "fake-container-1",
+			insertPod:     builder.ForPod("velero", "fake-pod-1").Phase(v1.PodSucceeded).Result(),
+			insertEventsBefore: []insertEvent{
+				{
+					event: &v1.Event{Reason: EventReasonStarted},
+				},
+				{
+					event: &v1.Event{Reason: EventReasonCompleted},
+					delay: time.Second,
+				},
+			},
+			expectStartEvent:     true,
+			expectTerminateEvent: true,
+			expectComplete:       true,
+		},
+		{
+			name:          "completed",
+			thisPod:       "fak-pod-1",
+			thisContainer: "fake-container-1",
+			insertPod:     builder.ForPod("velero", "fake-pod-1").Phase(v1.PodSucceeded).Result(),
+			insertEventsBefore: []insertEvent{
+				{
+					event: &v1.Event{Reason: EventReasonStarted},
+				},
+				{
+					event: &v1.Event{Reason: EventReasonCompleted},
+				},
+			},
+			expectStartEvent:     true,
+			expectTerminateEvent: true,
+			expectComplete:       true,
+		},
+		{
+			name:          "completed with redirect error",
+			thisPod:       "fak-pod-1",
+			thisContainer: "fake-container-1",
+			insertPod:     builder.ForPod("velero", "fake-pod-1").Phase(v1.PodSucceeded).Result(),
+			insertEventsBefore: []insertEvent{
+				{
+					event: &v1.Event{Reason: EventReasonStarted},
+				},
+				{
+					event: &v1.Event{Reason: EventReasonCompleted},
+				},
+			},
+			redirectLogErr:       errors.New("fake-error"),
+			expectStartEvent:     true,
+			expectTerminateEvent: true,
+			expectComplete:       true,
+		},
+		{
+			name:          "complete but terminated event not received in time",
+			thisPod:       "fak-pod-1",
+			thisContainer: "fake-container-1",
+			insertPod:     builder.ForPod("velero", "fake-pod-1").Phase(v1.PodSucceeded).Result(),
+			insertEventsBefore: []insertEvent{
+				{
+					event: &v1.Event{Reason: EventReasonStarted},
+				},
+			},
+			insertEventsAfter: []insertEvent{
+				{
+					event: &v1.Event{Reason: EventReasonStarted},
+					after: time.Second * 6,
+				},
+			},
+			expectStartEvent: true,
+			expectComplete:   true,
+		},
+		{
+			name:          "complete but terminated event not received immediately",
+			thisPod:       "fak-pod-1",
+			thisContainer: "fake-container-1",
+			insertPod:     builder.ForPod("velero", "fake-pod-1").Phase(v1.PodSucceeded).Result(),
+			insertEventsBefore: []insertEvent{
+				{
+					event: &v1.Event{Reason: EventReasonStarted},
+				},
+			},
+			insertEventsAfter: []insertEvent{
+				{
+					event: &v1.Event{Reason: EventReasonCompleted},
+					after: time.Second,
+				},
+			},
+			expectStartEvent:     true,
+			expectTerminateEvent: true,
+			expectComplete:       true,
+		},
+		{
+			name:          "completed with progress",
+			thisPod:       "fak-pod-1",
+			thisContainer: "fake-container-1",
+			insertPod:     builder.ForPod("velero", "fake-pod-1").Phase(v1.PodSucceeded).Result(),
+			insertEventsBefore: []insertEvent{
+				{
+					event: &v1.Event{Reason: EventReasonStarted},
+				},
+				{
+					event: &v1.Event{Reason: EventReasonProgress, Message: "fake-progress-1"},
+				},
+				{
+					event: &v1.Event{Reason: EventReasonProgress, Message: "fake-progress-2"},
+				},
+				{
+					event: &v1.Event{Reason: EventReasonCompleted},
+					delay: time.Second,
+				},
+			},
+			expectStartEvent:     true,
+			expectTerminateEvent: true,
+			expectComplete:       true,
+			expectProgress:       2,
+		},
+		{
+			name:          "failed",
+			thisPod:       "fak-pod-1",
+			thisContainer: "fake-container-1",
+			insertPod:     builder.ForPod("velero", "fake-pod-1").Phase(v1.PodFailed).Result(),
+			insertEventsBefore: []insertEvent{
+				{
+					event: &v1.Event{Reason: EventReasonStarted},
+				},
+				{
+					event: &v1.Event{Reason: EventReasonCancelled},
+				},
+			},
+			terminationMessage:   "fake-termination-message-1",
+			expectStartEvent:     true,
+			expectTerminateEvent: true,
+			expectFail:           true,
+		},
+		{
+			name:               "pod crash",
+			thisPod:            "fak-pod-1",
+			thisContainer:      "fake-container-1",
+			insertPod:          builder.ForPod("velero", "fake-pod-1").Phase(v1.PodFailed).Result(),
+			terminationMessage: "fake-termination-message-2",
+			expectFail:         true,
+		},
+		{
+			name:          "canceled",
+			thisPod:       "fak-pod-1",
+			thisContainer: "fake-container-1",
+			insertPod:     builder.ForPod("velero", "fake-pod-1").Phase(v1.PodFailed).Result(),
+			insertEventsBefore: []insertEvent{
+				{
+					event: &v1.Event{Reason: EventReasonStarted},
+				},
+				{
+					event: &v1.Event{Reason: EventReasonCancelled},
+				},
+			},
+			terminationMessage:   ErrCancelled,
+			expectStartEvent:     true,
+			expectTerminateEvent: true,
+			expectCancel:         true,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx, cancel := context.WithCancel(context.Background())
+			eventWaitTimeout = time.Second * 5
+
+			sw := startWatchFake{
+				terminationMessage: test.terminationMessage,
+				redirectErr:        test.redirectLogErr,
+			}
+			funcGetPodTerminationMessage = sw.getPodContainerTerminateMessage
+			funcRedirectLog = sw.redirectDataMoverLogs
+			funcGetResultFromMessage = sw.getResultFromMessage
+
+			ms := &microServiceBRWatcher{
+				ctx:           ctx,
+				namespace:     test.namespace,
+				thisPod:       test.thisPod,
+				thisContainer: test.thisContainer,
+				podCh:         make(chan *v1.Pod, 2),
+				eventCh:       make(chan *v1.Event, 10),
+				log:           velerotest.NewLogger(),
+				callbacks: Callbacks{
+					OnCompleted: sw.OnCompleted,
+					OnFailed:    sw.OnFailed,
+					OnCancelled: sw.OnCancelled,
+					OnProgress:  sw.OnProgress,
+				},
+			}
+
+			ms.startWatch()
+
+			if test.ctxCancel {
+				cancel()
+			}
+
+			for _, ev := range test.insertEventsBefore {
+				if ev.after != 0 {
+					time.Sleep(ev.after)
+				}
+
+				ms.eventCh <- ev.event
+
+				if ev.delay != 0 {
+					time.Sleep(ev.delay)
+				}
+			}
+
+			if test.insertPod != nil {
+				ms.podCh <- test.insertPod
+			}
+
+			for _, ev := range test.insertEventsAfter {
+				if ev.after != 0 {
+					time.Sleep(ev.after)
+				}
+
+				ms.eventCh <- ev.event
+
+				if ev.delay != 0 {
+					time.Sleep(ev.delay)
+				}
+			}
+
+			ms.wgWatcher.Wait()
+
+			assert.Equal(t, test.expectStartEvent, ms.startedFromEvent)
+			assert.Equal(t, test.expectTerminateEvent, ms.terminatedFromEvent)
+			assert.Equal(t, test.expectComplete, sw.complete)
+			assert.Equal(t, test.expectCancel, sw.canceled)
+			assert.Equal(t, test.expectFail, sw.failed)
+			assert.Equal(t, test.expectProgress, sw.progress)
+
+			cancel()
+		})
+	}
+}
+
+func TestGetResultFromMessage(t *testing.T) {
+	tests := []struct {
+		name         string
+		taskType     string
+		message      string
+		expectResult Result
+	}{
+		{
+			name:         "error to unmarshal backup result",
+			taskType:     TaskTypeBackup,
+			message:      "fake-message",
+			expectResult: Result{},
+		},
+		{
+			name:         "error to unmarshal restore result",
+			taskType:     TaskTypeRestore,
+			message:      "fake-message",
+			expectResult: Result{},
+		},
+		{
+			name:     "succeed to unmarshal backup result",
+			taskType: TaskTypeBackup,
+			message:  "{\"snapshotID\":\"fake-snapshot-id\",\"emptySnapshot\":true,\"source\":{\"byPath\":\"fake-path-1\",\"volumeMode\":\"Block\"}}",
+			expectResult: Result{
+				Backup: BackupResult{
+					SnapshotID:    "fake-snapshot-id",
+					EmptySnapshot: true,
+					Source: AccessPoint{
+						ByPath:  "fake-path-1",
+						VolMode: uploader.PersistentVolumeBlock,
+					},
+				},
+			},
+		},
+		{
+			name:     "succeed to unmarshal restore result",
+			taskType: TaskTypeRestore,
+			message:  "{\"target\":{\"byPath\":\"fake-path-2\",\"volumeMode\":\"Filesystem\"}}",
+			expectResult: Result{
+				Restore: RestoreResult{
+					Target: AccessPoint{
+						ByPath:  "fake-path-2",
+						VolMode: uploader.PersistentVolumeFilesystem,
+					},
+				},
+			},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			result := getResultFromMessage(test.taskType, test.message, velerotest.NewLogger())
+			assert.Equal(t, test.expectResult, result)
+		})
+	}
+}
+
+func TestGetProgressFromMessage(t *testing.T) {
+	tests := []struct {
+		name           string
+		message        string
+		expectProgress uploader.Progress
+	}{
+		{
+			name:           "error to unmarshal progress",
+			message:        "fake-message",
+			expectProgress: uploader.Progress{},
+		},
+		{
+			name:    "succeed to unmarshal progress",
+			message: "{\"totalBytes\":1000,\"doneBytes\":200}",
+			expectProgress: uploader.Progress{
+				TotalBytes: 1000,
+				BytesDone:  200,
+			},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			progress := getProgressFromMessage(test.message, velerotest.NewLogger())
+			assert.Equal(t, test.expectProgress, *progress)
+		})
+	}
+}
+
+type redirectFake struct {
+	logFile       *os.File
+	createTempErr error
+	getPodLogErr  error
+	logMessage    string
+}
+
+func (rf *redirectFake) fakeCreateTempFile(_ string, _ string) (*os.File, error) {
+	if rf.createTempErr != nil {
+		return nil, rf.createTempErr
+	}
+
+	return rf.logFile, nil
+}
+
+func (rf *redirectFake) fakeCollectPodLogs(_ context.Context, _ corev1client.CoreV1Interface, _ string, _ string, _ string, output io.Writer) error {
+	if rf.getPodLogErr != nil {
+		return rf.getPodLogErr
+	}
+
+	_, err := output.Write([]byte(rf.logMessage))
+
+	return err
+}
+
+func TestRedirectDataMoverLogs(t *testing.T) {
+	logFileName := path.Join(os.TempDir(), "test-logger-file.log")
+
+	var buffer string
+
+	tests := []struct {
+		name          string
+		thisPod       string
+		logMessage    string
+		logger        logrus.FieldLogger
+		createTempErr error
+		collectLogErr error
+		expectErr     string
+	}{
+		{
+			name:          "error to create temp file",
+			thisPod:       "fake-pod",
+			createTempErr: errors.New("fake-create-temp-error"),
+			logger:        velerotest.NewLogger(),
+			expectErr:     "error to create temp file for data mover pod log: fake-create-temp-error",
+		},
+		{
+			name:          "error to collect pod log",
+			thisPod:       "fake-pod",
+			collectLogErr: errors.New("fake-collect-log-error"),
+			logger:        velerotest.NewLogger(),
+			expectErr:     fmt.Sprintf("error to collect logs to %s for data mover pod fake-pod: fake-collect-log-error", logFileName),
+		},
+		{
+			name:       "succeed",
+			thisPod:    "fake-pod",
+			logMessage: "fake-log-message-01\nfake-log-message-02\nfake-log-message-03\n",
+			logger:     velerotest.NewSingleLoggerWithHooks(&buffer, logging.DefaultHooks(true)),
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			buffer = ""
+
+			logFile, err := os.Create(logFileName)
+			require.NoError(t, err)
+
+			rf := redirectFake{
+				logFile:       logFile,
+				createTempErr: test.createTempErr,
+				getPodLogErr:  test.collectLogErr,
+				logMessage:    test.logMessage,
+			}
+
+			funcCreateTemp = rf.fakeCreateTempFile
+			
funcCollectPodLogs = rf.fakeCollectPodLogs + + fakeKubeClient := kubeclientfake.NewSimpleClientset() + + err = redirectDataMoverLogs(context.Background(), fakeKubeClient, "", test.thisPod, "", test.logger) + if test.expectErr != "" { + assert.EqualError(t, err, test.expectErr) + } else { + assert.NoError(t, err) + + assert.True(t, strings.Contains(buffer, test.logMessage)) + } + }) + } +} diff --git a/pkg/datapath/types.go b/pkg/datapath/types.go index c98cd284a..a2fac3ed5 100644 --- a/pkg/datapath/types.go +++ b/pkg/datapath/types.go @@ -50,8 +50,8 @@ type Callbacks struct { // AccessPoint represents an access point that has been exposed to a data path instance type AccessPoint struct { - ByPath string - VolMode uploader.PersistentVolumeMode + ByPath string `json:"byPath"` + VolMode uploader.PersistentVolumeMode `json:"volumeMode"` } // AsyncBR is the interface for asynchronous data path methods diff --git a/pkg/exposer/csi_snapshot_test.go b/pkg/exposer/csi_snapshot_test.go index cc0a895e1..44c29b1d5 100644 --- a/pkg/exposer/csi_snapshot_test.go +++ b/pkg/exposer/csi_snapshot_test.go @@ -672,7 +672,7 @@ func TestPeekExpose(t *testing.T) { kubeClientObj: []runtime.Object{ backupPodUrecoverable, }, - err: "Pod is in abnormal state Failed", + err: "Pod is in abnormal state [Failed], message []", }, { name: "succeed", diff --git a/pkg/exposer/generic_restore_test.go b/pkg/exposer/generic_restore_test.go index 6080f8b97..9108ba228 100644 --- a/pkg/exposer/generic_restore_test.go +++ b/pkg/exposer/generic_restore_test.go @@ -456,7 +456,7 @@ func TestRestorePeekExpose(t *testing.T) { kubeClientObj: []runtime.Object{ restorePodUrecoverable, }, - err: "Pod is in abnormal state Failed", + err: "Pod is in abnormal state [Failed], message []", }, { name: "succeed", diff --git a/pkg/test/test_logger.go b/pkg/test/test_logger.go index b890fd5da..65dc8422a 100644 --- a/pkg/test/test_logger.go +++ b/pkg/test/test_logger.go @@ -50,3 +50,15 @@ func NewSingleLogger(buffer *string) logrus.FieldLogger { logger.Level = logrus.TraceLevel return logrus.NewEntry(logger) } + +func NewSingleLoggerWithHooks(buffer *string, hooks []logrus.Hook) logrus.FieldLogger { + logger := logrus.New() + logger.Out = &singleLogRecorder{buffer: buffer} + logger.Level = logrus.TraceLevel + + for _, hook := range hooks { + logger.Hooks.Add(hook) + } + + return logrus.NewEntry(logger) +} diff --git a/pkg/util/kube/pod.go b/pkg/util/kube/pod.go index 857fe9420..9def5d514 100644 --- a/pkg/util/kube/pod.go +++ b/pkg/util/kube/pod.go @@ -18,6 +18,7 @@ package kube import ( "context" "fmt" + "io" "time" "github.com/pkg/errors" @@ -117,8 +118,9 @@ func EnsureDeletePod(ctx context.Context, podGetter corev1client.CoreV1Interface func IsPodUnrecoverable(pod *corev1api.Pod, log logrus.FieldLogger) (bool, string) { // Check the Phase field if pod.Status.Phase == corev1api.PodFailed || pod.Status.Phase == corev1api.PodUnknown { - log.Warnf("Pod is in abnormal state %s", pod.Status.Phase) - return true, fmt.Sprintf("Pod is in abnormal state %s", pod.Status.Phase) + message := GetPodTerminateMessage(pod) + log.Warnf("Pod is in abnormal state %s, message [%s]", pod.Status.Phase, message) + return true, fmt.Sprintf("Pod is in abnormal state [%s], message [%s]", pod.Status.Phase, message) } // removed "Unschedulable" check since unschedulable condition isn't always permanent @@ -133,3 +135,69 @@ func IsPodUnrecoverable(pod *corev1api.Pod, log logrus.FieldLogger) (bool, strin } return false, "" } + +// GetPodContainerTerminateMessage returns the 
terminate message for a specific container of a pod
+func GetPodContainerTerminateMessage(pod *corev1api.Pod, container string) string {
+	message := ""
+	for _, containerStatus := range pod.Status.ContainerStatuses {
+		if containerStatus.Name == container {
+			if containerStatus.State.Terminated != nil {
+				message = containerStatus.State.Terminated.Message
+			}
+			break
+		}
+	}
+
+	return message
+}
+
+// GetPodTerminateMessage returns the terminate message for all containers of a pod
+func GetPodTerminateMessage(pod *corev1api.Pod) string {
+	message := ""
+	for _, containerStatus := range pod.Status.ContainerStatuses {
+		if containerStatus.State.Terminated != nil {
+			if containerStatus.State.Terminated.Message != "" {
+				message += containerStatus.State.Terminated.Message + "/"
+			}
+		}
+	}
+
+	return message
+}
+
+func getPodLogReader(ctx context.Context, podGetter corev1client.CoreV1Interface, pod string, namespace string, logOptions *corev1api.PodLogOptions) (io.ReadCloser, error) {
+	request := podGetter.Pods(namespace).GetLogs(pod, logOptions)
+	return request.Stream(ctx)
+}
+
+var podLogReaderGetter = getPodLogReader
+
+// CollectPodLogs collects logs of the specified container of a pod and writes them to the output
+func CollectPodLogs(ctx context.Context, podGetter corev1client.CoreV1Interface, pod string, namespace string, container string, output io.Writer) error {
+	logIndicator := fmt.Sprintf("***************************begin pod logs[%s/%s]***************************\n", pod, container)
+
+	if _, err := output.Write([]byte(logIndicator)); err != nil {
+		return errors.Wrap(err, "error to write begin pod log indicator")
+	}
+
+	logOptions := &corev1api.PodLogOptions{
+		Container: container,
+	}
+
+	if input, err := podLogReaderGetter(ctx, podGetter, pod, namespace, logOptions); err != nil {
+		logIndicator = fmt.Sprintf("No present log retrieved, err: %v\n", err)
+	} else {
+		if _, err := io.Copy(output, input); err != nil {
+			return errors.Wrap(err, "error to copy input")
+		}
+
+		logIndicator = ""
+	}
+
+	logIndicator += fmt.Sprintf("***************************end pod logs[%s/%s]***************************\n", pod, container)
+	if _, err := output.Write([]byte(logIndicator)); err != nil {
+		return errors.Wrap(err, "error to write end pod log indicator")
+	}
+
+	return nil
+}
diff --git a/pkg/util/kube/pod_test.go b/pkg/util/kube/pod_test.go
index f1cdac043..7ccb22578 100644
--- a/pkg/util/kube/pod_test.go
+++ b/pkg/util/kube/pod_test.go
@@ -18,6 +18,8 @@ package kube
 
 import (
 	"context"
+	"io"
+	"strings"
 	"testing"
 	"time"
 
@@ -32,6 +34,8 @@ import (
 	clientTesting "k8s.io/client-go/testing"
 
 	velerotest "github.com/vmware-tanzu/velero/pkg/test"
+
+	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
 )
 
 func TestEnsureDeletePod(t *testing.T) {
@@ -422,3 +426,274 @@ func TestIsPodUnrecoverable(t *testing.T) {
 		})
 	}
 }
+
+func TestGetPodTerminateMessage(t *testing.T) {
+	tests := []struct {
+		name    string
+		pod     *corev1api.Pod
+		message string
+	}{
+		{
+			name: "empty message when no container status",
+			pod: &corev1api.Pod{
+				Status: corev1api.PodStatus{
+					Phase: corev1api.PodFailed,
+				},
+			},
+		},
+		{
+			name: "empty message when no termination status",
+			pod: &corev1api.Pod{
+				Status: corev1api.PodStatus{
+					ContainerStatuses: []corev1api.ContainerStatus{
+						{Name: "container-1", State: corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{Reason: "ImagePullBackOff"}}},
+					},
+				},
+			},
+		},
+		{
+			name: "empty message when no termination message",
+			pod: &corev1api.Pod{
+				Status: 
corev1api.PodStatus{ + ContainerStatuses: []corev1api.ContainerStatus{ + {Name: "container-1", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Reason: "fake-reason"}}}, + }, + }, + }, + }, + { + name: "with termination message", + pod: &corev1api.Pod{ + Status: corev1api.PodStatus{ + ContainerStatuses: []corev1api.ContainerStatus{ + {Name: "container-1", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-1"}}}, + {Name: "container-2", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-2"}}}, + {Name: "container-3", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-3"}}}, + }, + }, + }, + message: "message-1/message-2/message-3/", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + message := GetPodTerminateMessage(test.pod) + assert.Equal(t, test.message, message) + }) + } +} + +func TestGetPodContainerTerminateMessage(t *testing.T) { + tests := []struct { + name string + pod *corev1api.Pod + container string + message string + }{ + { + name: "empty message when no container status", + pod: &corev1api.Pod{ + Status: corev1api.PodStatus{ + Phase: corev1api.PodFailed, + }, + }, + }, + { + name: "empty message when no termination status", + pod: &corev1api.Pod{ + Status: corev1api.PodStatus{ + ContainerStatuses: []corev1api.ContainerStatus{ + {Name: "container-1", State: corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{Reason: "ImagePullBackOff"}}}, + }, + }, + }, + container: "container-1", + }, + { + name: "empty message when no termination message", + pod: &corev1api.Pod{ + Status: corev1api.PodStatus{ + ContainerStatuses: []corev1api.ContainerStatus{ + {Name: "container-1", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Reason: "fake-reason"}}}, + }, + }, + }, + container: "container-1", + }, + { + name: "not matched container name", + pod: &corev1api.Pod{ + Status: corev1api.PodStatus{ + ContainerStatuses: []corev1api.ContainerStatus{ + {Name: "container-1", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-1"}}}, + {Name: "container-2", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-2"}}}, + {Name: "container-3", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-3"}}}, + }, + }, + }, + container: "container-0", + }, + { + name: "with termination message", + pod: &corev1api.Pod{ + Status: corev1api.PodStatus{ + ContainerStatuses: []corev1api.ContainerStatus{ + {Name: "container-1", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-1"}}}, + {Name: "container-2", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-2"}}}, + {Name: "container-3", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-3"}}}, + }, + }, + }, + container: "container-2", + message: "message-2", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + message := GetPodContainerTerminateMessage(test.pod, test.container) + assert.Equal(t, test.message, message) + }) + } +} + +type fakePodLog struct { + getError error + readError error + beginWriteError error + endWriteError error + writeError error + logMessage string + outputMessage string + readPos int +} + 
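+// GetPodLogReader stands in for getPodLogReader in these tests: it returns the
+// fake itself as the pod log stream (or getError), so a single fakePodLog value
+// controls both the log source and, via Write below, the destination writer.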
+func (fp *fakePodLog) GetPodLogReader(ctx context.Context, podGetter corev1client.CoreV1Interface, pod string, namespace string, logOptions *corev1api.PodLogOptions) (io.ReadCloser, error) { + if fp.getError != nil { + return nil, fp.getError + } + + return fp, nil +} + +func (fp *fakePodLog) Read(p []byte) (n int, err error) { + if fp.readError != nil { + return -1, fp.readError + } + + if fp.readPos == len(fp.logMessage) { + return 0, io.EOF + } + + copy(p, []byte(fp.logMessage)) + fp.readPos += len(fp.logMessage) + + return len(fp.logMessage), nil +} + +func (fp *fakePodLog) Close() error { + return nil +} + +func (fp *fakePodLog) Write(p []byte) (n int, err error) { + message := string(p) + if strings.Contains(message, "begin pod logs") { + if fp.beginWriteError != nil { + return -1, fp.beginWriteError + } + } else if strings.Contains(message, "end pod logs") { + if fp.endWriteError != nil { + return -1, fp.endWriteError + } + } else { + if fp.writeError != nil { + return -1, fp.writeError + } + } + + fp.outputMessage += message + + return len(message), nil +} + +func TestCollectPodLogs(t *testing.T) { + tests := []struct { + name string + pod string + container string + getError error + readError error + beginWriteError error + endWriteError error + writeError error + readMessage string + message string + expectErr string + }{ + { + name: "error to write begin indicator", + beginWriteError: errors.New("fake-write-error-01"), + expectErr: "error to write begin pod log indicator: fake-write-error-01", + }, + { + name: "error to get log", + pod: "fake-pod", + container: "fake-container", + getError: errors.New("fake-get-error"), + message: "***************************begin pod logs[fake-pod/fake-container]***************************\nNo present log retrieved, err: fake-get-error\n***************************end pod logs[fake-pod/fake-container]***************************\n", + }, + { + name: "error to read pod log", + pod: "fake-pod", + container: "fake-container", + readError: errors.New("fake-read-error"), + expectErr: "error to copy input: fake-read-error", + }, + { + name: "error to write pod log", + pod: "fake-pod", + container: "fake-container", + writeError: errors.New("fake-write-error-03"), + readMessage: "fake pod message 01\n fake pod message 02\n fake pod message 03\n", + expectErr: "error to copy input: fake-write-error-03", + }, + { + name: "error to write end indicator", + pod: "fake-pod", + container: "fake-container", + endWriteError: errors.New("fake-write-error-02"), + readMessage: "fake pod message 01\n fake pod message 02\n fake pod message 03\n", + expectErr: "error to write end pod log indicator: fake-write-error-02", + }, + { + name: "succeed", + pod: "fake-pod", + container: "fake-container", + readMessage: "fake pod message 01\n fake pod message 02\n fake pod message 03\n", + message: "***************************begin pod logs[fake-pod/fake-container]***************************\nfake pod message 01\n fake pod message 02\n fake pod message 03\n***************************end pod logs[fake-pod/fake-container]***************************\n", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fp := &fakePodLog{ + getError: test.getError, + readError: test.readError, + beginWriteError: test.beginWriteError, + endWriteError: test.endWriteError, + writeError: test.writeError, + logMessage: test.readMessage, + } + podLogReaderGetter = fp.GetPodLogReader + + err := CollectPodLogs(context.Background(), nil, test.pod, "", test.container, fp) 
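+			// fp serves as both the log source and the output writer here, so the
+			// combined stream can be verified against fp.outputMessage below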
+			if test.expectErr != "" {
+				assert.EqualError(t, err, test.expectErr)
+			} else {
+				assert.NoError(t, err)
+				assert.Equal(t, test.message, fp.outputMessage)
+			}
+		})
+	}
+}
diff --git a/pkg/util/logging/default_logger.go b/pkg/util/logging/default_logger.go
index f1c22f80c..b374c3d84 100644
--- a/pkg/util/logging/default_logger.go
+++ b/pkg/util/logging/default_logger.go
@@ -24,16 +24,26 @@ import (
 
 // DefaultHooks returns a slice of the default
 // logrus hooks to be used by a logger.
-func DefaultHooks() []logrus.Hook {
-	return []logrus.Hook{
+func DefaultHooks(merge bool) []logrus.Hook {
+	hooks := []logrus.Hook{
 		&LogLocationHook{},
 		&ErrorLocationHook{},
 	}
+
+	if merge {
+		hooks = append(hooks, &MergeHook{})
+	}
+
+	return hooks
 }
 
 // DefaultLogger returns a Logger with the default properties
 // and hooks. The desired output format is passed as a LogFormat Enum.
 func DefaultLogger(level logrus.Level, format Format) *logrus.Logger {
+	return createLogger(level, format, false)
+}
+
+func createLogger(level logrus.Level, format Format, merge bool) *logrus.Logger {
 	logger := logrus.New()
 
 	if format == FormatJSON {
@@ -62,7 +72,7 @@ func DefaultLogger(level logrus.Level, format Format) *logrus.Logger {
 
 	logger.Level = level
 
-	for _, hook := range DefaultHooks() {
+	for _, hook := range DefaultHooks(merge) {
 		logger.Hooks.Add(hook)
 	}
 
diff --git a/pkg/util/logging/default_logger_test.go b/pkg/util/logging/default_logger_test.go
index 7aab50496..10f690757 100644
--- a/pkg/util/logging/default_logger_test.go
+++ b/pkg/util/logging/default_logger_test.go
@@ -34,7 +34,7 @@ func TestDefaultLogger(t *testing.T) {
 		assert.Equal(t, os.Stdout, logger.Out)
 
 		for _, level := range logrus.AllLevels {
-			assert.Equal(t, DefaultHooks(), logger.Hooks[level])
+			assert.Equal(t, DefaultHooks(false), logger.Hooks[level])
 		}
 	}
 }
diff --git a/pkg/util/logging/log_merge_hook.go b/pkg/util/logging/log_merge_hook.go
new file mode 100644
index 000000000..b993cb38a
--- /dev/null
+++ b/pkg/util/logging/log_merge_hook.go
@@ -0,0 +1,113 @@
+/*
+Copyright The Velero Contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package logging
+
+import (
+	"bytes"
+	"io"
+	"os"
+
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+)
+
+const (
+	ListeningLevel   = logrus.ErrorLevel
+	ListeningMessage = "merge-log-57847fd0-0c7c-48e3-b5f7-984b293d8376"
+	LogSourceKey     = "log-source"
+)
+
+// MergeHook is used to redirect a batch of logs to another logger atomically.
+// It listens for a log entry carrying the ListeningMessage; once that entry is
+// hit, it swaps the logger's output to a hookWriter, which copies the logs from
+// the file named by the entry's LogSourceKey field into the original output.
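+//
+// A minimal usage sketch (this mirrors redirectDataMoverLogs in pkg/datapath;
+// the file name is illustrative):
+//
+//	hookLogger := logger.WithField(logging.LogSourceKey, "/tmp/task.log")
+//	hookLogger.Logln(logging.ListeningLevel, logging.ListeningMessage)
+//
+// The write carrying ListeningMessage triggers the merge, and the hookWriter
+// restores the original output once the source file has been copied.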
+type MergeHook struct { +} + +type hookWriter struct { + orgWriter io.Writer + source string + logger *logrus.Logger +} + +func newHookWriter(orgWriter io.Writer, source string, logger *logrus.Logger) io.Writer { + return &hookWriter{ + orgWriter: orgWriter, + source: source, + logger: logger, + } +} + +func (h *MergeHook) Levels() []logrus.Level { + return []logrus.Level{ListeningLevel} +} + +func (h *MergeHook) Fire(entry *logrus.Entry) error { + if entry.Message != ListeningMessage { + return nil + } + + source, exist := entry.Data[LogSourceKey] + if !exist { + return nil + } + + entry.Logger.SetOutput(newHookWriter(entry.Logger.Out, source.(string), entry.Logger)) + + return nil +} + +func (w *hookWriter) Write(p []byte) (n int, err error) { + if !bytes.Contains(p, []byte(ListeningMessage)) { + return w.orgWriter.Write(p) + } + + defer func() { + w.logger.Out = w.orgWriter + }() + + sourceFile, err := os.OpenFile(w.source, os.O_RDONLY, 0400) + if err != nil { + return 0, err + } + defer sourceFile.Close() + + total := 0 + + buffer := make([]byte, 2048) + for { + read, err := sourceFile.Read(buffer) + if err == io.EOF { + return total, nil + } + + if err != nil { + return total, errors.Wrapf(err, "error to read source file %s at pos %v", w.source, total) + } + + written, err := w.orgWriter.Write(buffer[0:read]) + if err != nil { + return total, errors.Wrapf(err, "error to write log at pos %v", total) + } + + if written != read { + return total, errors.Errorf("error to write log at pos %v, read %v but written %v", total, read, written) + } + + total += read + } +} diff --git a/pkg/util/logging/log_merge_hook_test.go b/pkg/util/logging/log_merge_hook_test.go new file mode 100644 index 000000000..d103152b8 --- /dev/null +++ b/pkg/util/logging/log_merge_hook_test.go @@ -0,0 +1,185 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package logging + +import ( + "fmt" + "os" + "testing" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMergeHook_Fire(t *testing.T) { + tests := []struct { + name string + entry logrus.Entry + expectHook bool + }{ + { + name: "normal message", + entry: logrus.Entry{ + Level: logrus.ErrorLevel, + Message: "fake-message", + }, + expectHook: false, + }, + { + name: "normal source", + entry: logrus.Entry{ + Level: logrus.ErrorLevel, + Message: ListeningMessage, + Data: logrus.Fields{"fake-key": "fake-value"}, + }, + expectHook: false, + }, + { + name: "hook hit", + entry: logrus.Entry{ + Level: logrus.ErrorLevel, + Message: ListeningMessage, + Data: logrus.Fields{LogSourceKey: "any-value"}, + Logger: &logrus.Logger{}, + }, + expectHook: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + hook := &MergeHook{} + // method under test + err := hook.Fire(&test.entry) + + assert.NoError(t, err) + + if test.expectHook { + assert.NotNil(t, test.entry.Logger.Out.(*hookWriter)) + } + }) + } +} + +type fakeWriter struct { + p []byte + writeError error + writtenLen int +} + +func (fw *fakeWriter) Write(p []byte) (n int, err error) { + if fw.writeError != nil || fw.writtenLen != -1 { + return fw.writtenLen, fw.writeError + } + + fw.p = append(fw.p, p...) + + return len(p), nil +} + +func TestMergeHook_Write(t *testing.T) { + sourceFile, err := os.CreateTemp("", "") + require.NoError(t, err) + + logMessage := "fake-message-1\nfake-message-2" + _, err = sourceFile.WriteString(logMessage) + require.NoError(t, err) + + tests := []struct { + name string + content []byte + source string + writeErr error + writtenLen int + expectError string + needRollBackHook bool + }{ + { + name: "normal message", + content: []byte("fake-message"), + writtenLen: -1, + }, + { + name: "failed to open source file", + content: []byte(ListeningMessage), + source: "non-exist", + needRollBackHook: true, + expectError: "open non-exist: no such file or directory", + }, + { + name: "write error", + content: []byte(ListeningMessage), + source: sourceFile.Name(), + writeErr: errors.New("fake-error"), + expectError: "error to write log at pos 0: fake-error", + needRollBackHook: true, + }, + { + name: "write len mismatch", + content: []byte(ListeningMessage), + source: sourceFile.Name(), + writtenLen: 100, + expectError: fmt.Sprintf("error to write log at pos 0, read %v but written 100", len(logMessage)), + needRollBackHook: true, + }, + { + name: "success", + content: []byte(ListeningMessage), + source: sourceFile.Name(), + writtenLen: -1, + needRollBackHook: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + writer := hookWriter{ + orgWriter: &fakeWriter{ + writeError: test.writeErr, + writtenLen: test.writtenLen, + }, + source: test.source, + logger: &logrus.Logger{}, + } + + n, err := writer.Write(test.content) + + if test.expectError == "" { + assert.NoError(t, err) + + expectStr := string(test.content) + if expectStr == ListeningMessage { + expectStr = logMessage + } + + assert.Len(t, expectStr, n) + + fakeWriter := writer.orgWriter.(*fakeWriter) + writtenStr := string(fakeWriter.p) + assert.Equal(t, writtenStr, expectStr) + } else { + assert.EqualError(t, err, test.expectError) + } + + if test.needRollBackHook { + assert.Equal(t, writer.logger.Out, writer.orgWriter) + } + }) + } +}
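+
+// Note: the cases above drive hookWriter.Write directly; the end-to-end flow
+// (Fire swapping the logger output, then Write merging the source file back)
+// is exercised by TestRedirectDataMoverLogs in pkg/datapath.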