Merge pull request #7999 from Lyndon-Li/data-mover-ms-watcher-01

Data mover micro-service watcher
lyndon-li 2024-07-26 10:10:07 +08:00 committed by GitHub
commit 53b57f8bdf
16 changed files with 1772 additions and 11 deletions


@@ -0,0 +1 @@
Data mover ms watcher according to design #7576


@@ -189,6 +189,9 @@ func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, confi
&velerov2alpha1api.DataDownload{}: {
Field: fields.Set{"metadata.namespace": factory.Namespace()}.AsSelector(),
},
&v1.Event{}: {
Field: fields.Set{"metadata.namespace": factory.Namespace()}.AsSelector(),
},
},
}
mgr, err := ctrl.NewManager(clientConfig, ctrl.Options{

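The hunk above adds cluster Events to the node-agent's filtered informer cache, scoped to Velero's namespace the same way DataUpload/DataDownload already are. For reference, a minimal standalone sketch of this controller-runtime pattern; the function name and namespace value are illustrative, and it assumes a recent controller-runtime (v0.15+):

package example

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	ctrl "sigs.k8s.io/controller-runtime"
	ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// newScopedManager caches only the Events in the given namespace, so every
// informer obtained from mgr.GetCache() for &corev1.Event{} sees that
// filtered view, which is what the micro-service watcher below relies on.
func newScopedManager(namespace string) (ctrl.Manager, error) {
	return ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Cache: ctrlcache.Options{
			ByObject: map[client.Object]ctrlcache.ByObject{
				&corev1.Event{}: {
					Field: fields.Set{"metadata.namespace": namespace}.AsSelector(),
				},
			},
		},
	})
}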

@@ -22,11 +22,14 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
)
var ConcurrentLimitExceed error = errors.New("concurrent number exceeds the limit")
var FSBRCreator = newFileSystemBR
var MicroServiceBRWatcherCreator = newMicroServiceBRWatcher
type Manager struct {
cocurrentNum int
@@ -56,6 +59,23 @@ func (m *Manager) CreateFileSystemBR(jobName string, requestorType string, ctx c
return m.tracker[jobName], nil
}
// CreateMicroServiceBRWatcher creates a new micro service watcher instance
func (m *Manager) CreateMicroServiceBRWatcher(ctx context.Context, client client.Client, kubeClient kubernetes.Interface, mgr manager.Manager, taskType string,
taskName string, namespace string, podName string, containerName string, associatedObject string, callbacks Callbacks, resume bool, log logrus.FieldLogger) (AsyncBR, error) {
m.trackerLock.Lock()
defer m.trackerLock.Unlock()
if !resume {
if len(m.tracker) >= m.cocurrentNum {
return nil, ConcurrentLimitExceed
}
}
m.tracker[taskName] = MicroServiceBRWatcherCreator(client, kubeClient, mgr, taskType, taskName, namespace, podName, containerName, associatedObject, callbacks, log)
return m.tracker[taskName], nil
}
// RemoveAsyncBR removes a file system backup/restore data path instance
func (m *Manager) RemoveAsyncBR(jobName string) {
m.trackerLock.Lock()

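For context, a hedged caller-side sketch of the new constructor: taskName keys the instance in the tracker, associatedObject names the CR whose events the watcher consumes, and resume=true (used when re-attaching after a node-agent restart) bypasses the concurrency check so a resumed task is never rejected. All names here are illustrative, not this PR's actual controller code:

package example

import (
	"context"
	"errors"

	"github.com/sirupsen/logrus"
	"k8s.io/client-go/kubernetes"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/vmware-tanzu/velero/pkg/datapath"
)

// startWatcher shows the intended call pattern from a reconcile loop.
func startWatcher(ctx context.Context, m *datapath.Manager, crClient client.Client,
	kubeClient kubernetes.Interface, mgr ctrl.Manager, duName string, podName string,
	container string, callbacks datapath.Callbacks, resume bool, log logrus.FieldLogger) (datapath.AsyncBR, error) {
	asyncBR, err := m.CreateMicroServiceBRWatcher(ctx, crClient, kubeClient, mgr,
		datapath.TaskTypeBackup, duName, "velero", podName, container,
		duName, callbacks, resume, log)
	if errors.Is(err, datapath.ConcurrentLimitExceed) {
		// Over the limit: a caller would requeue and retry later rather than
		// fail the task; resume=true skips this check entirely.
		return nil, err
	}
	return asyncBR, err
}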

@@ -23,7 +23,7 @@ import (
"github.com/stretchr/testify/assert"
)
func TestManager(t *testing.T) {
func TestCreateFileSystemBR(t *testing.T) {
m := NewManager(2)
async_job_1, err := m.CreateFileSystemBR("job-1", "test", context.TODO(), nil, "velero", Callbacks{}, nil)
@@ -50,3 +50,37 @@ func TestManager(t *testing.T) {
ret = m.GetAsyncBR("job-1")
assert.Nil(t, ret)
}
func TestCreateMicroServiceBRWatcher(t *testing.T) {
m := NewManager(2)
async_job_1, err := m.CreateMicroServiceBRWatcher(context.TODO(), nil, nil, nil, "test", "job-1", "velero", "pod-1", "container", "du-1", Callbacks{}, false, nil)
assert.NoError(t, err)
_, err = m.CreateMicroServiceBRWatcher(context.TODO(), nil, nil, nil, "test", "job-2", "velero", "pod-2", "container", "du-2", Callbacks{}, false, nil)
assert.NoError(t, err)
_, err = m.CreateMicroServiceBRWatcher(context.TODO(), nil, nil, nil, "test", "job-3", "velero", "pod-3", "container", "du-3", Callbacks{}, false, nil)
assert.Equal(t, ConcurrentLimitExceed, err)
async_job_4, err := m.CreateMicroServiceBRWatcher(context.TODO(), nil, nil, nil, "test", "job-4", "velero", "pod-4", "container", "du-4", Callbacks{}, true, nil)
assert.NoError(t, err)
ret := m.GetAsyncBR("job-0")
assert.Nil(t, ret)
ret = m.GetAsyncBR("job-1")
assert.Equal(t, async_job_1, ret)
ret = m.GetAsyncBR("job-4")
assert.Equal(t, async_job_4, ret)
m.RemoveAsyncBR("job-0")
assert.Len(t, m.tracker, 3)
m.RemoveAsyncBR("job-1")
assert.Len(t, m.tracker, 2)
ret = m.GetAsyncBR("job-1")
assert.Nil(t, ret)
}


@@ -0,0 +1,437 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package datapath
import (
"context"
"encoding/json"
"os"
"sync"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/kube"
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/manager"
"github.com/vmware-tanzu/velero/pkg/util/logging"
)
const (
TaskTypeBackup = "backup"
TaskTypeRestore = "restore"
ErrCancelled = "data path is canceled"
EventReasonStarted = "Data-Path-Started"
EventReasonCompleted = "Data-Path-Completed"
EventReasonFailed = "Data-Path-Failed"
EventReasonCancelled = "Data-Path-Canceled"
EventReasonProgress = "Data-Path-Progress"
)
type microServiceBRWatcher struct {
ctx context.Context
cancel context.CancelFunc
log logrus.FieldLogger
client client.Client
kubeClient kubernetes.Interface
mgr manager.Manager
namespace string
callbacks Callbacks
taskName string
taskType string
thisPod string
thisContainer string
associatedObject string
eventCh chan *v1.Event
podCh chan *v1.Pod
startedFromEvent bool
terminatedFromEvent bool
wgWatcher sync.WaitGroup
eventInformer ctrlcache.Informer
podInformer ctrlcache.Informer
eventHandler cache.ResourceEventHandlerRegistration
podHandler cache.ResourceEventHandlerRegistration
}
func newMicroServiceBRWatcher(client client.Client, kubeClient kubernetes.Interface, mgr manager.Manager, taskType string, taskName string, namespace string,
podName string, containerName string, associatedObject string, callbacks Callbacks, log logrus.FieldLogger) AsyncBR {
ms := &microServiceBRWatcher{
mgr: mgr,
client: client,
kubeClient: kubeClient,
namespace: namespace,
callbacks: callbacks,
taskType: taskType,
taskName: taskName,
thisPod: podName,
thisContainer: containerName,
associatedObject: associatedObject,
eventCh: make(chan *v1.Event, 10),
podCh: make(chan *v1.Pod, 2),
wgWatcher: sync.WaitGroup{},
log: log,
}
return ms
}
func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) error {
succeeded := false
eventInformer, err := ms.mgr.GetCache().GetInformer(ctx, &v1.Event{})
if err != nil {
return errors.Wrap(err, "error getting event informer")
}
podInformer, err := ms.mgr.GetCache().GetInformer(ctx, &v1.Pod{})
if err != nil {
return errors.Wrap(err, "error getting pod informer")
}
eventHandler, err := eventInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
evt := obj.(*v1.Event)
if evt.InvolvedObject.Namespace != ms.namespace || evt.InvolvedObject.Name != ms.associatedObject {
return
}
ms.log.Infof("Pushed adding event %s/%s, message %s for object %v", evt.Namespace, evt.Name, evt.Message, evt.InvolvedObject)
ms.eventCh <- evt
},
UpdateFunc: func(_, obj interface{}) {
evt := obj.(*v1.Event)
if evt.InvolvedObject.Namespace != ms.namespace || evt.InvolvedObject.Name != ms.associatedObject {
return
}
ms.log.Infof("Pushed updating event %s/%s, message %s for object %v", evt.Namespace, evt.Name, evt.Message, evt.InvolvedObject)
ms.eventCh <- evt
},
},
)
if err != nil {
return errors.Wrap(err, "error registering event handler")
}
defer func() {
if !succeeded {
if err := eventInformer.RemoveEventHandler(eventHandler); err != nil {
ms.log.WithError(err).Warn("Failed to remove event handler")
}
}
}()
podHandler, err := podInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(_, obj interface{}) {
pod := obj.(*v1.Pod)
if pod.Namespace != ms.namespace || pod.Name != ms.thisPod {
return
}
if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
ms.podCh <- pod
}
},
},
)
if err != nil {
return errors.Wrap(err, "error registering pod handler")
}
defer func() {
if !succeeded {
if err := podInformer.RemoveEventHandler(podHandler); err != nil {
ms.log.WithError(err).Warn("Failed to remove pod handler")
}
}
}()
ms.log.WithFields(
logrus.Fields{
"taskType": ms.taskType,
"taskName": ms.taskName,
"thisPod": ms.thisPod,
}).Info("MicroServiceBR is initialized")
ms.eventInformer = eventInformer
ms.podInformer = podInformer
ms.eventHandler = eventHandler
ms.podHandler = podHandler
ms.ctx, ms.cancel = context.WithCancel(ctx)
succeeded = true
return nil
}
func (ms *microServiceBRWatcher) Close(ctx context.Context) {
if ms.cancel != nil {
ms.cancel()
ms.cancel = nil
}
ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("Closing MicroServiceBR")
ms.wgWatcher.Wait()
if ms.eventInformer != nil && ms.eventHandler != nil {
if err := ms.eventInformer.RemoveEventHandler(ms.eventHandler); err != nil {
ms.log.WithError(err).Warn("Failed to remove event handler")
}
}
if ms.podInformer != nil && ms.podHandler != nil {
if err := ms.podInformer.RemoveEventHandler(ms.podHandler); err != nil {
ms.log.WithError(err).Warn("Failed to remove pod handler")
}
}
ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is closed")
}
func (ms *microServiceBRWatcher) StartBackup(source AccessPoint, uploaderConfig map[string]string, param interface{}) error {
ms.log.Infof("Start watching backup ms for source %v", source)
if err := ms.reEnsureThisPod(); err != nil {
return err
}
ms.startWatch()
return nil
}
func (ms *microServiceBRWatcher) StartRestore(snapshotID string, target AccessPoint, uploaderConfigs map[string]string) error {
ms.log.Infof("Start watching restore ms to target %v, from snapshot %s", target, snapshotID)
if err := ms.reEnsureThisPod(); err != nil {
return err
}
ms.startWatch()
return nil
}
func (ms *microServiceBRWatcher) reEnsureThisPod() error {
thisPod := &v1.Pod{}
if err := ms.client.Get(ms.ctx, types.NamespacedName{
Namespace: ms.namespace,
Name: ms.thisPod,
}, thisPod); err != nil {
return errors.Wrapf(err, "error getting this pod %s", ms.thisPod)
}
if thisPod.Status.Phase == v1.PodSucceeded || thisPod.Status.Phase == v1.PodFailed {
ms.podCh <- thisPod
ms.log.WithField("this pod", ms.thisPod).Infof("This pod comes to terminital status %s before watch start", thisPod.Status.Phase)
}
return nil
}
var funcGetPodTerminationMessage = kube.GetPodContainerTerminateMessage
var funcRedirectLog = redirectDataMoverLogs
var funcGetResultFromMessage = getResultFromMessage
var funcGetProgressFromMessage = getProgressFromMessage
var eventWaitTimeout time.Duration = time.Minute
func (ms *microServiceBRWatcher) startWatch() {
ms.wgWatcher.Add(1)
go func() {
ms.log.Info("Start watching data path pod")
var lastPod *v1.Pod
watchLoop:
for {
select {
case <-ms.ctx.Done():
break watchLoop
case pod := <-ms.podCh:
lastPod = pod
break watchLoop
case evt := <-ms.eventCh:
ms.onEvent(evt)
}
}
if lastPod == nil {
ms.log.Warn("Data path pod watch loop is canceled")
ms.wgWatcher.Done()
return
}
epilogLoop:
for !ms.startedFromEvent || !ms.terminatedFromEvent {
select {
case <-time.After(eventWaitTimeout):
break epilogLoop
case evt := <-ms.eventCh:
ms.onEvent(evt)
}
}
terminateMessage := funcGetPodTerminationMessage(lastPod, ms.thisContainer)
logger := ms.log.WithField("data path pod", lastPod.Name)
logger.Infof("Finish waiting data path pod, phase %s, message %s", lastPod.Status.Phase, terminateMessage)
if !ms.startedFromEvent {
logger.Warn("VGDP seems not started")
}
if ms.startedFromEvent && !ms.terminatedFromEvent {
logger.Warn("VGDP started but termination event is not received")
}
logger.Info("Recording data path pod logs")
if err := funcRedirectLog(ms.ctx, ms.kubeClient, ms.namespace, lastPod.Name, ms.thisContainer, ms.log); err != nil {
logger.WithError(err).Warn("Failed to collect data mover logs")
}
logger.Info("Calling callback on data path pod termination")
if lastPod.Status.Phase == v1.PodSucceeded {
ms.callbacks.OnCompleted(ms.ctx, ms.namespace, ms.taskName, funcGetResultFromMessage(ms.taskType, terminateMessage, ms.log))
} else {
if terminateMessage == ErrCancelled {
ms.callbacks.OnCancelled(ms.ctx, ms.namespace, ms.taskName)
} else {
ms.callbacks.OnFailed(ms.ctx, ms.namespace, ms.taskName, errors.New(terminateMessage))
}
}
logger.Info("Complete callback on data path pod termination")
ms.wgWatcher.Done()
}()
}
func (ms *microServiceBRWatcher) onEvent(evt *v1.Event) {
switch evt.Reason {
case EventReasonStarted:
ms.startedFromEvent = true
ms.log.Infof("Received data path start message %s", evt.Message)
case EventReasonProgress:
ms.callbacks.OnProgress(ms.ctx, ms.namespace, ms.taskName, funcGetProgressFromMessage(evt.Message, ms.log))
case EventReasonCompleted:
ms.log.Infof("Received data path completed message %v", funcGetResultFromMessage(ms.taskType, evt.Message, ms.log))
ms.terminatedFromEvent = true
case EventReasonCancelled:
ms.log.Infof("Received data path canceled message %s", evt.Message)
ms.terminatedFromEvent = true
case EventReasonFailed:
ms.log.Infof("Received data path failed message %s", evt.Message)
ms.terminatedFromEvent = true
default:
ms.log.Debugf("Received event for data mover %s.[reason %s, message %s]", ms.taskName, evt.Reason, evt.Message)
}
}
func getResultFromMessage(taskType string, message string, logger logrus.FieldLogger) Result {
result := Result{}
if taskType == TaskTypeBackup {
backupResult := BackupResult{}
err := json.Unmarshal([]byte(message), &backupResult)
if err != nil {
logger.WithError(err).Errorf("Failed to unmarshal result message %s", message)
} else {
result.Backup = backupResult
}
} else {
restoreResult := RestoreResult{}
err := json.Unmarshal([]byte(message), &restoreResult)
if err != nil {
logger.WithError(err).Errorf("Failed to unmarshal result message %s", message)
} else {
result.Restore = restoreResult
}
}
return result
}
func getProgressFromMessage(message string, logger logrus.FieldLogger) *uploader.Progress {
progress := &uploader.Progress{}
err := json.Unmarshal([]byte(message), progress)
if err != nil {
logger.WithError(err).Debugf("Failed to unmarshal progress message %s", message)
}
return progress
}
func (ms *microServiceBRWatcher) Cancel() {
ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is canceled")
}
var funcCreateTemp = os.CreateTemp
var funcCollectPodLogs = kube.CollectPodLogs
func redirectDataMoverLogs(ctx context.Context, kubeClient kubernetes.Interface, namespace string, thisPod string, thisContainer string, logger logrus.FieldLogger) error {
logger.Infof("Starting to collect data mover pod log for %s", thisPod)
logFile, err := funcCreateTemp("", "")
if err != nil {
return errors.Wrap(err, "error to create temp file for data mover pod log")
}
defer logFile.Close()
logFileName := logFile.Name()
logger.Infof("Created log file %s", logFileName)
err = funcCollectPodLogs(ctx, kubeClient.CoreV1(), thisPod, namespace, thisContainer, logFile)
if err != nil {
return errors.Wrapf(err, "error to collect logs to %s for data mover pod %s", logFileName, thisPod)
}
logFile.Close()
logger.Infof("Redirecting to log file %s", logFileName)
hookLogger := logger.WithField(logging.LogSourceKey, logFileName)
hookLogger.Logln(logging.ListeningLevel, logging.ListeningMessage)
logger.Infof("Completed to collect data mover pod log for %s", thisPod)
return nil
}
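The watcher above is the consuming end of a small protocol: the data-mover pod reports lifecycle and progress through Kubernetes Events posted against the associated DataUpload/DataDownload, and leaves its final result as a JSON container termination message. A hedged sketch of the producing side follows; the recorder wiring is illustrative, not this PR's actual data-mover code, and it assumes the default terminationMessagePath:

package example

import (
	"encoding/json"
	"os"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"

	velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
	"github.com/vmware-tanzu/velero/pkg/datapath"
	"github.com/vmware-tanzu/velero/pkg/uploader"
)

// reportProgress posts a Data-Path-Progress event against the DataUpload; the
// message body is the JSON that getProgressFromMessage unmarshals, e.g.
// {"totalBytes":1000,"doneBytes":200}.
func reportProgress(recorder record.EventRecorder, du *velerov2alpha1api.DataUpload, p *uploader.Progress) {
	b, _ := json.Marshal(p)
	recorder.Event(du, corev1.EventTypeNormal, datapath.EventReasonProgress, string(b))
}

// writeTerminationMessage leaves the final BackupResult where
// funcGetPodTerminationMessage will read it back on the watcher side.
func writeTerminationMessage(result datapath.BackupResult) error {
	b, err := json.Marshal(result)
	if err != nil {
		return err
	}
	return os.WriteFile("/dev/termination-log", b, 0o644)
}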


@@ -0,0 +1,603 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package datapath
import (
"context"
"errors"
"fmt"
"io"
"os"
"path"
"strings"
"testing"
"time"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
kubeclientfake "k8s.io/client-go/kubernetes/fake"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"github.com/vmware-tanzu/velero/pkg/builder"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/logging"
)
func TestReEnsureThisPod(t *testing.T) {
tests := []struct {
name string
namespace string
thisPod string
kubeClientObj []runtime.Object
expectChan bool
expectErr string
}{
{
name: "get pod error",
thisPod: "fak-pod-1",
expectErr: "error getting this pod fak-pod-1: pods \"fak-pod-1\" not found",
},
{
name: "get pod not in terminated state",
namespace: "velero",
thisPod: "fake-pod-1",
kubeClientObj: []runtime.Object{
builder.ForPod("velero", "fake-pod-1").Phase(v1.PodRunning).Result(),
},
},
{
name: "get pod succeed state",
namespace: "velero",
thisPod: "fake-pod-1",
kubeClientObj: []runtime.Object{
builder.ForPod("velero", "fake-pod-1").Phase(v1.PodSucceeded).Result(),
},
expectChan: true,
},
{
name: "get pod failed state",
namespace: "velero",
thisPod: "fake-pod-1",
kubeClientObj: []runtime.Object{
builder.ForPod("velero", "fake-pod-1").Phase(v1.PodFailed).Result(),
},
expectChan: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
scheme := runtime.NewScheme()
v1.AddToScheme(scheme)
fakeClientBuilder := fake.NewClientBuilder()
fakeClientBuilder = fakeClientBuilder.WithScheme(scheme)
fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build()
ms := &microServiceBRWatcher{
namespace: test.namespace,
thisPod: test.thisPod,
client: fakeClient,
podCh: make(chan *v1.Pod, 2),
log: velerotest.NewLogger(),
}
err := ms.reEnsureThisPod()
if test.expectErr != "" {
assert.EqualError(t, err, test.expectErr)
} else {
if test.expectChan {
assert.Len(t, ms.podCh, 1)
pod := <-ms.podCh
assert.Equal(t, pod.Name, test.thisPod)
}
}
})
}
}
type startWatchFake struct {
terminationMessage string
redirectErr error
complete bool
failed bool
canceled bool
progress int
}
func (sw *startWatchFake) getPodContainerTerminateMessage(pod *v1.Pod, container string) string {
return sw.terminationMessage
}
func (sw *startWatchFake) redirectDataMoverLogs(ctx context.Context, kubeClient kubernetes.Interface, namespace string, thisPod string, thisContainer string, logger logrus.FieldLogger) error {
return sw.redirectErr
}
func (sw *startWatchFake) getResultFromMessage(_ string, _ string, _ logrus.FieldLogger) Result {
return Result{}
}
func (sw *startWatchFake) OnCompleted(ctx context.Context, namespace string, task string, result Result) {
sw.complete = true
}
func (sw *startWatchFake) OnFailed(ctx context.Context, namespace string, task string, err error) {
sw.failed = true
}
func (sw *startWatchFake) OnCancelled(ctx context.Context, namespace string, task string) {
sw.canceled = true
}
func (sw *startWatchFake) OnProgress(ctx context.Context, namespace string, task string, progress *uploader.Progress) {
sw.progress++
}
type insertEvent struct {
event *v1.Event
after time.Duration
delay time.Duration
}
func TestStartWatch(t *testing.T) {
tests := []struct {
name string
namespace string
thisPod string
thisContainer string
terminationMessage string
redirectLogErr error
insertPod *v1.Pod
insertEventsBefore []insertEvent
insertEventsAfter []insertEvent
ctxCancel bool
expectStartEvent bool
expectTerminateEvent bool
expectComplete bool
expectCancel bool
expectFail bool
expectProgress int
}{
{
name: "exit from ctx",
thisPod: "fak-pod-1",
thisContainer: "fake-container-1",
ctxCancel: true,
},
{
name: "completed with rational sequence",
thisPod: "fak-pod-1",
thisContainer: "fake-container-1",
insertPod: builder.ForPod("velero", "fake-pod-1").Phase(v1.PodSucceeded).Result(),
insertEventsBefore: []insertEvent{
{
event: &v1.Event{Reason: EventReasonStarted},
},
{
event: &v1.Event{Reason: EventReasonCompleted},
delay: time.Second,
},
},
expectStartEvent: true,
expectTerminateEvent: true,
expectComplete: true,
},
{
name: "completed",
thisPod: "fak-pod-1",
thisContainer: "fake-container-1",
insertPod: builder.ForPod("velero", "fake-pod-1").Phase(v1.PodSucceeded).Result(),
insertEventsBefore: []insertEvent{
{
event: &v1.Event{Reason: EventReasonStarted},
},
{
event: &v1.Event{Reason: EventReasonCompleted},
},
},
expectStartEvent: true,
expectTerminateEvent: true,
expectComplete: true,
},
{
name: "completed with redirect error",
thisPod: "fak-pod-1",
thisContainer: "fake-container-1",
insertPod: builder.ForPod("velero", "fake-pod-1").Phase(v1.PodSucceeded).Result(),
insertEventsBefore: []insertEvent{
{
event: &v1.Event{Reason: EventReasonStarted},
},
{
event: &v1.Event{Reason: EventReasonCompleted},
},
},
redirectLogErr: errors.New("fake-error"),
expectStartEvent: true,
expectTerminateEvent: true,
expectComplete: true,
},
{
name: "complete but terminated event not received in time",
thisPod: "fak-pod-1",
thisContainer: "fake-container-1",
insertPod: builder.ForPod("velero", "fake-pod-1").Phase(v1.PodSucceeded).Result(),
insertEventsBefore: []insertEvent{
{
event: &v1.Event{Reason: EventReasonStarted},
},
},
insertEventsAfter: []insertEvent{
{
event: &v1.Event{Reason: EventReasonStarted},
after: time.Second * 6,
},
},
expectStartEvent: true,
expectComplete: true,
},
{
name: "complete but terminated event not received immediately",
thisPod: "fak-pod-1",
thisContainer: "fake-container-1",
insertPod: builder.ForPod("velero", "fake-pod-1").Phase(v1.PodSucceeded).Result(),
insertEventsBefore: []insertEvent{
{
event: &v1.Event{Reason: EventReasonStarted},
},
},
insertEventsAfter: []insertEvent{
{
event: &v1.Event{Reason: EventReasonCompleted},
after: time.Second,
},
},
expectStartEvent: true,
expectTerminateEvent: true,
expectComplete: true,
},
{
name: "completed with progress",
thisPod: "fak-pod-1",
thisContainer: "fake-container-1",
insertPod: builder.ForPod("velero", "fake-pod-1").Phase(v1.PodSucceeded).Result(),
insertEventsBefore: []insertEvent{
{
event: &v1.Event{Reason: EventReasonStarted},
},
{
event: &v1.Event{Reason: EventReasonProgress, Message: "fake-progress-1"},
},
{
event: &v1.Event{Reason: EventReasonProgress, Message: "fake-progress-2"},
},
{
event: &v1.Event{Reason: EventReasonCompleted},
delay: time.Second,
},
},
expectStartEvent: true,
expectTerminateEvent: true,
expectComplete: true,
expectProgress: 2,
},
{
name: "failed",
thisPod: "fak-pod-1",
thisContainer: "fake-container-1",
insertPod: builder.ForPod("velero", "fake-pod-1").Phase(v1.PodFailed).Result(),
insertEventsBefore: []insertEvent{
{
event: &v1.Event{Reason: EventReasonStarted},
},
{
event: &v1.Event{Reason: EventReasonCancelled},
},
},
terminationMessage: "fake-termination-message-1",
expectStartEvent: true,
expectTerminateEvent: true,
expectFail: true,
},
{
name: "pod crash",
thisPod: "fak-pod-1",
thisContainer: "fake-container-1",
insertPod: builder.ForPod("velero", "fake-pod-1").Phase(v1.PodFailed).Result(),
terminationMessage: "fake-termination-message-2",
expectFail: true,
},
{
name: "canceled",
thisPod: "fak-pod-1",
thisContainer: "fake-container-1",
insertPod: builder.ForPod("velero", "fake-pod-1").Phase(v1.PodFailed).Result(),
insertEventsBefore: []insertEvent{
{
event: &v1.Event{Reason: EventReasonStarted},
},
{
event: &v1.Event{Reason: EventReasonCancelled},
},
},
terminationMessage: ErrCancelled,
expectStartEvent: true,
expectTerminateEvent: true,
expectCancel: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
eventWaitTimeout = time.Second * 5
sw := startWatchFake{
terminationMessage: test.terminationMessage,
redirectErr: test.redirectLogErr,
}
funcGetPodTerminationMessage = sw.getPodContainerTerminateMessage
funcRedirectLog = sw.redirectDataMoverLogs
funcGetResultFromMessage = sw.getResultFromMessage
ms := &microServiceBRWatcher{
ctx: ctx,
namespace: test.namespace,
thisPod: test.thisPod,
thisContainer: test.thisContainer,
podCh: make(chan *v1.Pod, 2),
eventCh: make(chan *v1.Event, 10),
log: velerotest.NewLogger(),
callbacks: Callbacks{
OnCompleted: sw.OnCompleted,
OnFailed: sw.OnFailed,
OnCancelled: sw.OnCancelled,
OnProgress: sw.OnProgress,
},
}
ms.startWatch()
if test.ctxCancel {
cancel()
}
for _, ev := range test.insertEventsBefore {
if ev.after != 0 {
time.Sleep(ev.after)
}
ms.eventCh <- ev.event
if ev.delay != 0 {
time.Sleep(ev.delay)
}
}
if test.insertPod != nil {
ms.podCh <- test.insertPod
}
for _, ev := range test.insertEventsAfter {
if ev.after != 0 {
time.Sleep(ev.after)
}
ms.eventCh <- ev.event
if ev.delay != 0 {
time.Sleep(ev.delay)
}
}
ms.wgWatcher.Wait()
assert.Equal(t, test.expectStartEvent, ms.startedFromEvent)
assert.Equal(t, test.expectTerminateEvent, ms.terminatedFromEvent)
assert.Equal(t, test.expectComplete, sw.complete)
assert.Equal(t, test.expectCancel, sw.canceled)
assert.Equal(t, test.expectFail, sw.failed)
assert.Equal(t, test.expectProgress, sw.progress)
cancel()
})
}
}
func TestGetResultFromMessage(t *testing.T) {
tests := []struct {
name string
taskType string
message string
expectResult Result
}{
{
name: "error to unmarshall backup result",
taskType: TaskTypeBackup,
message: "fake-message",
expectResult: Result{},
},
{
name: "error to unmarshall restore result",
taskType: TaskTypeRestore,
message: "fake-message",
expectResult: Result{},
},
{
name: "succeed to unmarshall backup result",
taskType: TaskTypeBackup,
message: "{\"snapshotID\":\"fake-snapshot-id\",\"emptySnapshot\":true,\"source\":{\"byPath\":\"fake-path-1\",\"volumeMode\":\"Block\"}}",
expectResult: Result{
Backup: BackupResult{
SnapshotID: "fake-snapshot-id",
EmptySnapshot: true,
Source: AccessPoint{
ByPath: "fake-path-1",
VolMode: uploader.PersistentVolumeBlock,
},
},
},
},
{
name: "succeed to unmarshall restore result",
taskType: TaskTypeRestore,
message: "{\"target\":{\"byPath\":\"fake-path-2\",\"volumeMode\":\"Filesystem\"}}",
expectResult: Result{
Restore: RestoreResult{
Target: AccessPoint{
ByPath: "fake-path-2",
VolMode: uploader.PersistentVolumeFilesystem,
},
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
result := getResultFromMessage(test.taskType, test.message, velerotest.NewLogger())
assert.Equal(t, test.expectResult, result)
})
}
}
func TestGetProgressFromMessage(t *testing.T) {
tests := []struct {
name string
message string
expectProgress uploader.Progress
}{
{
name: "error to unmarshall progress",
message: "fake-message",
expectProgress: uploader.Progress{},
},
{
name: "succeed to unmarshall progress",
message: "{\"totalBytes\":1000,\"doneBytes\":200}",
expectProgress: uploader.Progress{
TotalBytes: 1000,
BytesDone: 200,
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
progress := getProgressFromMessage(test.message, velerotest.NewLogger())
assert.Equal(t, test.expectProgress, *progress)
})
}
}
type redirectFake struct {
logFile *os.File
createTempErr error
getPodLogErr error
logMessage string
}
func (rf *redirectFake) fakeCreateTempFile(_ string, _ string) (*os.File, error) {
if rf.createTempErr != nil {
return nil, rf.createTempErr
}
return rf.logFile, nil
}
func (rf *redirectFake) fakeCollectPodLogs(_ context.Context, _ corev1client.CoreV1Interface, _ string, _ string, _ string, output io.Writer) error {
if rf.getPodLogErr != nil {
return rf.getPodLogErr
}
_, err := output.Write([]byte(rf.logMessage))
return err
}
func TestRedirectDataMoverLogs(t *testing.T) {
logFileName := path.Join(os.TempDir(), "test-logger-file.log")
var buffer string
tests := []struct {
name string
thisPod string
logMessage string
logger logrus.FieldLogger
createTempErr error
collectLogErr error
expectErr string
}{
{
name: "error to create temp file",
thisPod: "fake-pod",
createTempErr: errors.New("fake-create-temp-error"),
logger: velerotest.NewLogger(),
expectErr: "error to create temp file for data mover pod log: fake-create-temp-error",
},
{
name: "error to collect pod log",
thisPod: "fake-pod",
collectLogErr: errors.New("fake-collect-log-error"),
logger: velerotest.NewLogger(),
expectErr: fmt.Sprintf("error to collect logs to %s for data mover pod fake-pod: fake-collect-log-error", logFileName),
},
{
name: "succeed",
thisPod: "fake-pod",
logMessage: "fake-log-message-01\nfake-log-message-02\nfake-log-message-03\n",
logger: velerotest.NewSingleLoggerWithHooks(&buffer, logging.DefaultHooks(true)),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
buffer = ""
logFile, err := os.Create(logFileName)
require.NoError(t, err)
rf := redirectFake{
logFile: logFile,
createTempErr: test.createTempErr,
getPodLogErr: test.collectLogErr,
logMessage: test.logMessage,
}
funcCreateTemp = rf.fakeCreateTempFile
funcCollectPodLogs = rf.fakeCollectPodLogs
fakeKubeClient := kubeclientfake.NewSimpleClientset()
err = redirectDataMoverLogs(context.Background(), fakeKubeClient, "", test.thisPod, "", test.logger)
if test.expectErr != "" {
assert.EqualError(t, err, test.expectErr)
} else {
assert.NoError(t, err)
assert.True(t, strings.Contains(buffer, test.logMessage))
}
})
}
}


@@ -50,8 +50,8 @@ type Callbacks struct {
// AccessPoint represents an access point that has been exposed to a data path instance
type AccessPoint struct {
ByPath string
VolMode uploader.PersistentVolumeMode
ByPath string `json:"byPath"`
VolMode uploader.PersistentVolumeMode `json:"volumeMode"`
}
// AsyncBR is the interface for asynchronous data path methods

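The added tags pin down the wire format that crosses the pod boundary; a marshaled AccessPoint matches the fixtures in the watcher tests above. A quick check:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/vmware-tanzu/velero/pkg/datapath"
	"github.com/vmware-tanzu/velero/pkg/uploader"
)

func main() {
	ap := datapath.AccessPoint{ByPath: "fake-path-1", VolMode: uploader.PersistentVolumeBlock}
	b, _ := json.Marshal(ap)
	fmt.Println(string(b)) // {"byPath":"fake-path-1","volumeMode":"Block"}
}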

@@ -682,7 +682,7 @@ func TestPeekExpose(t *testing.T) {
kubeClientObj: []runtime.Object{
backupPodUrecoverable,
},
err: "Pod is in abnormal state Failed",
err: "Pod is in abnormal state [Failed], message []",
},
{
name: "succeed",


@@ -467,7 +467,7 @@ func TestRestorePeekExpose(t *testing.T) {
kubeClientObj: []runtime.Object{
restorePodUrecoverable,
},
err: "Pod is in abnormal state Failed",
err: "Pod is in abnormal state [Failed], message []",
},
{
name: "succeed",


@@ -50,3 +50,15 @@ func NewSingleLogger(buffer *string) logrus.FieldLogger {
logger.Level = logrus.TraceLevel
return logrus.NewEntry(logger)
}
func NewSingleLoggerWithHooks(buffer *string, hooks []logrus.Hook) logrus.FieldLogger {
logger := logrus.New()
logger.Out = &singleLogRecorder{buffer: buffer}
logger.Level = logrus.TraceLevel
for _, hook := range hooks {
logger.Hooks.Add(hook)
}
return logrus.NewEntry(logger)
}


@@ -18,6 +18,7 @@ package kube
import (
"context"
"fmt"
"io"
"time"
"github.com/pkg/errors"
@@ -117,8 +118,9 @@ func EnsureDeletePod(ctx context.Context, podGetter corev1client.CoreV1Interface
func IsPodUnrecoverable(pod *corev1api.Pod, log logrus.FieldLogger) (bool, string) {
// Check the Phase field
if pod.Status.Phase == corev1api.PodFailed || pod.Status.Phase == corev1api.PodUnknown {
log.Warnf("Pod is in abnormal state %s", pod.Status.Phase)
return true, fmt.Sprintf("Pod is in abnormal state %s", pod.Status.Phase)
message := GetPodTerminateMessage(pod)
log.Warnf("Pod is in abnormal state %s, message [%s]", pod.Status.Phase, message)
return true, fmt.Sprintf("Pod is in abnormal state [%s], message [%s]", pod.Status.Phase, message)
}
// removed "Unschedulable" check since unschedulable condition isn't always permanent
@@ -133,3 +135,69 @@ func IsPodUnrecoverable(pod *corev1api.Pod, log logrus.FieldLogger) (bool, strin
}
return false, ""
}
// GetPodContainerTerminateMessage returns the termination message of a specific container of a pod
func GetPodContainerTerminateMessage(pod *corev1api.Pod, container string) string {
message := ""
for _, containerStatus := range pod.Status.ContainerStatuses {
if containerStatus.Name == container {
if containerStatus.State.Terminated != nil {
message = containerStatus.State.Terminated.Message
}
break
}
}
return message
}
// GetPodTerminateMessage returns the termination messages of all containers of a pod, joined by "/"
func GetPodTerminateMessage(pod *corev1api.Pod) string {
message := ""
for _, containerStatus := range pod.Status.ContainerStatuses {
if containerStatus.State.Terminated != nil {
if containerStatus.State.Terminated.Message != "" {
message += containerStatus.State.Terminated.Message + "/"
}
}
}
return message
}
func getPodLogReader(ctx context.Context, podGetter corev1client.CoreV1Interface, pod string, namespace string, logOptions *corev1api.PodLogOptions) (io.ReadCloser, error) {
request := podGetter.Pods(namespace).GetLogs(pod, logOptions)
return request.Stream(ctx)
}
var podLogReaderGetter = getPodLogReader
// CollectPodLogs collects the logs of the specified container of a pod and writes them to the output
func CollectPodLogs(ctx context.Context, podGetter corev1client.CoreV1Interface, pod string, namespace string, container string, output io.Writer) error {
logIndicator := fmt.Sprintf("***************************begin pod logs[%s/%s]***************************\n", pod, container)
if _, err := output.Write([]byte(logIndicator)); err != nil {
return errors.Wrap(err, "error to write begin pod log indicator")
}
logOptions := &corev1api.PodLogOptions{
Container: container,
}
if input, err := podLogReaderGetter(ctx, podGetter, pod, namespace, logOptions); err != nil {
logIndicator = fmt.Sprintf("No present log retrieved, err: %v\n", err)
} else {
if _, err := io.Copy(output, input); err != nil {
return errors.Wrap(err, "error to copy input")
}
logIndicator = ""
}
logIndicator += fmt.Sprintf("***************************end pod logs[%s/%s]***************************\n", pod, container)
if _, err := output.Write([]byte(logIndicator)); err != nil {
return errors.Wrap(err, "error to write end pod log indicator")
}
return nil
}
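A minimal usage sketch for the new helper, collecting one container's logs into an in-memory buffer; the wrapper function and its names are illustrative:

package example

import (
	"bytes"
	"context"
	"fmt"

	"k8s.io/client-go/kubernetes"

	"github.com/vmware-tanzu/velero/pkg/util/kube"
)

// dumpContainerLogs returns the container's logs wrapped between the
// begin/end pod-log indicator lines that CollectPodLogs writes.
func dumpContainerLogs(ctx context.Context, client kubernetes.Interface, ns string, pod string, container string) (string, error) {
	buf := &bytes.Buffer{}
	if err := kube.CollectPodLogs(ctx, client.CoreV1(), pod, ns, container, buf); err != nil {
		return "", fmt.Errorf("collecting logs of %s/%s: %w", ns, pod, err)
	}
	return buf.String(), nil
}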


@@ -18,6 +18,8 @@ package kube
import (
"context"
"io"
"strings"
"testing"
"time"
@@ -32,6 +34,8 @@ import (
clientTesting "k8s.io/client-go/testing"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
)
func TestEnsureDeletePod(t *testing.T) {
@@ -422,3 +426,274 @@ func TestIsPodUnrecoverable(t *testing.T) {
})
}
}
func TestGetPodTerminateMessage(t *testing.T) {
tests := []struct {
name string
pod *corev1api.Pod
message string
}{
{
name: "empty message when no container status",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
Phase: corev1api.PodFailed,
},
},
},
{
name: "empty message when no termination status",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
ContainerStatuses: []corev1api.ContainerStatus{
{Name: "container-1", State: corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{Reason: "ImagePullBackOff"}}},
},
},
},
},
{
name: "empty message when no termination message",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
ContainerStatuses: []corev1api.ContainerStatus{
{Name: "container-1", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Reason: "fake-reason"}}},
},
},
},
},
{
name: "with termination message",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
ContainerStatuses: []corev1api.ContainerStatus{
{Name: "container-1", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-1"}}},
{Name: "container-2", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-2"}}},
{Name: "container-3", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-3"}}},
},
},
},
message: "message-1/message-2/message-3/",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
message := GetPodTerminateMessage(test.pod)
assert.Equal(t, test.message, message)
})
}
}
func TestGetPodContainerTerminateMessage(t *testing.T) {
tests := []struct {
name string
pod *corev1api.Pod
container string
message string
}{
{
name: "empty message when no container status",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
Phase: corev1api.PodFailed,
},
},
},
{
name: "empty message when no termination status",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
ContainerStatuses: []corev1api.ContainerStatus{
{Name: "container-1", State: corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{Reason: "ImagePullBackOff"}}},
},
},
},
container: "container-1",
},
{
name: "empty message when no termination message",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
ContainerStatuses: []corev1api.ContainerStatus{
{Name: "container-1", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Reason: "fake-reason"}}},
},
},
},
container: "container-1",
},
{
name: "not matched container name",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
ContainerStatuses: []corev1api.ContainerStatus{
{Name: "container-1", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-1"}}},
{Name: "container-2", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-2"}}},
{Name: "container-3", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-3"}}},
},
},
},
container: "container-0",
},
{
name: "with termination message",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
ContainerStatuses: []corev1api.ContainerStatus{
{Name: "container-1", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-1"}}},
{Name: "container-2", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-2"}}},
{Name: "container-3", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-3"}}},
},
},
},
container: "container-2",
message: "message-2",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
message := GetPodContainerTerminateMessage(test.pod, test.container)
assert.Equal(t, test.message, message)
})
}
}
type fakePodLog struct {
getError error
readError error
beginWriteError error
endWriteError error
writeError error
logMessage string
outputMessage string
readPos int
}
func (fp *fakePodLog) GetPodLogReader(ctx context.Context, podGetter corev1client.CoreV1Interface, pod string, namespace string, logOptions *corev1api.PodLogOptions) (io.ReadCloser, error) {
if fp.getError != nil {
return nil, fp.getError
}
return fp, nil
}
func (fp *fakePodLog) Read(p []byte) (n int, err error) {
if fp.readError != nil {
return -1, fp.readError
}
if fp.readPos == len(fp.logMessage) {
return 0, io.EOF
}
copy(p, []byte(fp.logMessage))
fp.readPos += len(fp.logMessage)
return len(fp.logMessage), nil
}
func (fp *fakePodLog) Close() error {
return nil
}
func (fp *fakePodLog) Write(p []byte) (n int, err error) {
message := string(p)
if strings.Contains(message, "begin pod logs") {
if fp.beginWriteError != nil {
return -1, fp.beginWriteError
}
} else if strings.Contains(message, "end pod logs") {
if fp.endWriteError != nil {
return -1, fp.endWriteError
}
} else {
if fp.writeError != nil {
return -1, fp.writeError
}
}
fp.outputMessage += message
return len(message), nil
}
func TestCollectPodLogs(t *testing.T) {
tests := []struct {
name string
pod string
container string
getError error
readError error
beginWriteError error
endWriteError error
writeError error
readMessage string
message string
expectErr string
}{
{
name: "error to write begin indicator",
beginWriteError: errors.New("fake-write-error-01"),
expectErr: "error to write begin pod log indicator: fake-write-error-01",
},
{
name: "error to get log",
pod: "fake-pod",
container: "fake-container",
getError: errors.New("fake-get-error"),
message: "***************************begin pod logs[fake-pod/fake-container]***************************\nNo present log retrieved, err: fake-get-error\n***************************end pod logs[fake-pod/fake-container]***************************\n",
},
{
name: "error to read pod log",
pod: "fake-pod",
container: "fake-container",
readError: errors.New("fake-read-error"),
expectErr: "error to copy input: fake-read-error",
},
{
name: "error to write pod log",
pod: "fake-pod",
container: "fake-container",
writeError: errors.New("fake-write-error-03"),
readMessage: "fake pod message 01\n fake pod message 02\n fake pod message 03\n",
expectErr: "error to copy input: fake-write-error-03",
},
{
name: "error to write end indicator",
pod: "fake-pod",
container: "fake-container",
endWriteError: errors.New("fake-write-error-02"),
readMessage: "fake pod message 01\n fake pod message 02\n fake pod message 03\n",
expectErr: "error to write end pod log indicator: fake-write-error-02",
},
{
name: "succeed",
pod: "fake-pod",
container: "fake-container",
readMessage: "fake pod message 01\n fake pod message 02\n fake pod message 03\n",
message: "***************************begin pod logs[fake-pod/fake-container]***************************\nfake pod message 01\n fake pod message 02\n fake pod message 03\n***************************end pod logs[fake-pod/fake-container]***************************\n",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fp := &fakePodLog{
getError: test.getError,
readError: test.readError,
beginWriteError: test.beginWriteError,
endWriteError: test.endWriteError,
writeError: test.writeError,
logMessage: test.readMessage,
}
podLogReaderGetter = fp.GetPodLogReader
err := CollectPodLogs(context.Background(), nil, test.pod, "", test.container, fp)
if test.expectErr != "" {
assert.EqualError(t, err, test.expectErr)
} else {
assert.NoError(t, err)
assert.Equal(t, fp.outputMessage, test.message)
}
})
}
}


@@ -24,16 +24,26 @@ import (
// DefaultHooks returns a slice of the default
// logrus hooks to be used by a logger.
func DefaultHooks() []logrus.Hook {
return []logrus.Hook{
func DefaultHooks(merge bool) []logrus.Hook {
hooks := []logrus.Hook{
&LogLocationHook{},
&ErrorLocationHook{},
}
if merge {
hooks = append(hooks, &MergeHook{})
}
return hooks
}
// DefaultLogger returns a Logger with the default properties
// and hooks. The desired output format is passed as a LogFormat Enum.
func DefaultLogger(level logrus.Level, format Format) *logrus.Logger {
return createLogger(level, format, false)
}
func createLogger(level logrus.Level, format Format, merge bool) *logrus.Logger {
logger := logrus.New()
if format == FormatJSON {
@@ -62,7 +72,7 @@ func DefaultLogger(level logrus.Level, format Format) *logrus.Logger {
logger.Level = level
for _, hook := range DefaultHooks() {
for _, hook := range DefaultHooks(merge) {
logger.Hooks.Add(hook)
}


@@ -34,7 +34,7 @@ func TestDefaultLogger(t *testing.T) {
assert.Equal(t, os.Stdout, logger.Out)
for _, level := range logrus.AllLevels {
assert.Equal(t, DefaultHooks(), logger.Hooks[level])
assert.Equal(t, DefaultHooks(false), logger.Hooks[level])
}
}
}


@@ -0,0 +1,113 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logging
import (
"bytes"
"io"
"os"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const (
ListeningLevel = logrus.ErrorLevel
ListeningMessage = "merge-log-57847fd0-0c7c-48e3-b5f7-984b293d8376"
LogSourceKey = "log-source"
)
// MergeHook is used to redirect a batch of logs to another logger atomically.
// It listens for a log entry that carries the ListeningMessage; once that message
// is hit, it swaps the logger's output to a hookWriter, which copies the logs from the
// file named by the LogSourceKey field into the original output.
type MergeHook struct {
}
type hookWriter struct {
orgWriter io.Writer
source string
logger *logrus.Logger
}
func newHookWriter(orgWriter io.Writer, source string, logger *logrus.Logger) io.Writer {
return &hookWriter{
orgWriter: orgWriter,
source: source,
logger: logger,
}
}
func (h *MergeHook) Levels() []logrus.Level {
return []logrus.Level{ListeningLevel}
}
func (h *MergeHook) Fire(entry *logrus.Entry) error {
if entry.Message != ListeningMessage {
return nil
}
source, exist := entry.Data[LogSourceKey]
if !exist {
return nil
}
entry.Logger.SetOutput(newHookWriter(entry.Logger.Out, source.(string), entry.Logger))
return nil
}
func (w *hookWriter) Write(p []byte) (n int, err error) {
if !bytes.Contains(p, []byte(ListeningMessage)) {
return w.orgWriter.Write(p)
}
defer func() {
w.logger.Out = w.orgWriter
}()
sourceFile, err := os.OpenFile(w.source, os.O_RDONLY, 0400)
if err != nil {
return 0, err
}
defer sourceFile.Close()
total := 0
buffer := make([]byte, 2048)
for {
read, err := sourceFile.Read(buffer)
if err == io.EOF {
return total, nil
}
if err != nil {
return total, errors.Wrapf(err, "error to read source file %s at pos %v", w.source, total)
}
written, err := w.orgWriter.Write(buffer[0:read])
if err != nil {
return total, errors.Wrapf(err, "error to write log at pos %v", total)
}
if written != read {
return total, errors.Errorf("error to write log at pos %v, read %v but written %v", total, read, written)
}
total += read
}
}
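Tying the pieces together, a hedged end-to-end sketch of the merge flow used by redirectDataMoverLogs above: build a logger with DefaultHooks(true) so the MergeHook is installed, then emit the listening message with the log-source field pointing at a file; the hook swaps in a hookWriter, which splices that file's contents into the logger's original output and then restores it. The wrapper function is illustrative:

package example

import (
	"os"

	"github.com/sirupsen/logrus"

	"github.com/vmware-tanzu/velero/pkg/util/logging"
)

func mergeFileIntoLogger(sourceFile string) {
	logger := logrus.New()
	logger.Out = os.Stdout
	for _, hook := range logging.DefaultHooks(true) { // true adds the MergeHook
		logger.Hooks.Add(hook)
	}
	// ListeningLevel/ListeningMessage trigger MergeHook.Fire, and the entry's
	// write then flows through hookWriter.Write, which copies sourceFile to
	// os.Stdout before putting the original output back.
	logger.WithField(logging.LogSourceKey, sourceFile).
		Logln(logging.ListeningLevel, logging.ListeningMessage)
}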


@@ -0,0 +1,185 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logging
import (
"fmt"
"os"
"testing"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMergeHook_Fire(t *testing.T) {
tests := []struct {
name string
entry logrus.Entry
expectHook bool
}{
{
name: "normal message",
entry: logrus.Entry{
Level: logrus.ErrorLevel,
Message: "fake-message",
},
expectHook: false,
},
{
name: "normal source",
entry: logrus.Entry{
Level: logrus.ErrorLevel,
Message: ListeningMessage,
Data: logrus.Fields{"fake-key": "fake-value"},
},
expectHook: false,
},
{
name: "hook hit",
entry: logrus.Entry{
Level: logrus.ErrorLevel,
Message: ListeningMessage,
Data: logrus.Fields{LogSourceKey: "any-value"},
Logger: &logrus.Logger{},
},
expectHook: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
hook := &MergeHook{}
// method under test
err := hook.Fire(&test.entry)
assert.NoError(t, err)
if test.expectHook {
assert.NotNil(t, test.entry.Logger.Out.(*hookWriter))
}
})
}
}
type fakeWriter struct {
p []byte
writeError error
writtenLen int
}
func (fw *fakeWriter) Write(p []byte) (n int, err error) {
if fw.writeError != nil || fw.writtenLen != -1 {
return fw.writtenLen, fw.writeError
}
fw.p = append(fw.p, p...)
return len(p), nil
}
func TestMergeHook_Write(t *testing.T) {
sourceFile, err := os.CreateTemp("", "")
require.NoError(t, err)
logMessage := "fake-message-1\nfake-message-2"
_, err = sourceFile.WriteString(logMessage)
require.NoError(t, err)
tests := []struct {
name string
content []byte
source string
writeErr error
writtenLen int
expectError string
needRollBackHook bool
}{
{
name: "normal message",
content: []byte("fake-message"),
writtenLen: -1,
},
{
name: "failed to open source file",
content: []byte(ListeningMessage),
source: "non-exist",
needRollBackHook: true,
expectError: "open non-exist: no such file or directory",
},
{
name: "write error",
content: []byte(ListeningMessage),
source: sourceFile.Name(),
writeErr: errors.New("fake-error"),
expectError: "error to write log at pos 0: fake-error",
needRollBackHook: true,
},
{
name: "write len mismatch",
content: []byte(ListeningMessage),
source: sourceFile.Name(),
writtenLen: 100,
expectError: fmt.Sprintf("error to write log at pos 0, read %v but written 100", len(logMessage)),
needRollBackHook: true,
},
{
name: "success",
content: []byte(ListeningMessage),
source: sourceFile.Name(),
writtenLen: -1,
needRollBackHook: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
writer := hookWriter{
orgWriter: &fakeWriter{
writeError: test.writeErr,
writtenLen: test.writtenLen,
},
source: test.source,
logger: &logrus.Logger{},
}
n, err := writer.Write(test.content)
if test.expectError == "" {
assert.NoError(t, err)
expectStr := string(test.content)
if expectStr == ListeningMessage {
expectStr = logMessage
}
assert.Len(t, expectStr, n)
fakeWriter := writer.orgWriter.(*fakeWriter)
writtenStr := string(fakeWriter.p)
assert.Equal(t, writtenStr, expectStr)
} else {
assert.EqualError(t, err, test.expectError)
}
if test.needRollBackHook {
assert.Equal(t, writer.logger.Out, writer.orgWriter)
}
})
}
}