/*
Copyright The Velero Contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package datapath

import (
	"context"
	"encoding/json"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	corev1api "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/manager"

	"github.com/vmware-tanzu/velero/pkg/uploader"
	"github.com/vmware-tanzu/velero/pkg/util/kube"
	"github.com/vmware-tanzu/velero/pkg/util/logging"
)

const (
	TaskTypeBackup  = "backup"
	TaskTypeRestore = "restore"

	ErrCancelled = "data path is canceled"

	EventReasonStarted    = "Data-Path-Started"
	EventReasonCompleted  = "Data-Path-Completed"
	EventReasonFailed     = "Data-Path-Failed"
	EventReasonCancelled  = "Data-Path-Canceled"
	EventReasonProgress   = "Data-Path-Progress"
	EventReasonCancelling = "Data-Path-Canceling"
	EventReasonStopped    = "Data-Path-Stopped"
)
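
// microServiceBRWatcher watches the data path pod that runs the VGDP (Velero
// Generic Data Path) as a separate micro service: it forwards the pod's
// progress events through Callbacks while the pod runs, and derives the final
// completed/failed/canceled outcome from the pod's terminal phase and its
// container termination message.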
type microServiceBRWatcher struct {
	ctx                 context.Context
	cancel              context.CancelFunc
	log                 logrus.FieldLogger
	client              client.Client
	kubeClient          kubernetes.Interface
	mgr                 manager.Manager
	namespace           string
	callbacks           Callbacks
	taskName            string
	taskType            string
	thisPod             string
	thisContainer       string
	associatedObject    string
	eventCh             chan *corev1api.Event
	podCh               chan *corev1api.Pod
	startedFromEvent    bool
	terminatedFromEvent bool
	wgWatcher           sync.WaitGroup
	eventInformer       ctrlcache.Informer
	podInformer         ctrlcache.Informer
	eventHandler        cache.ResourceEventHandlerRegistration
	podHandler          cache.ResourceEventHandlerRegistration
	watcherLock         sync.Mutex
}
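
// newMicroServiceBRWatcher returns an AsyncBR that tracks the given data path
// pod through the informer cache owned by mgr. A minimal usage sketch follows;
// the argument values are illustrative, not taken from a real call site:
//
//	asyncBR := newMicroServiceBRWatcher(cli, kubeCli, mgr, TaskTypeBackup,
//		"task-1", "velero", "task-1-pod", "task-1-container", "task-1", callbacks, log)
//	if err := asyncBR.Init(ctx, nil); err != nil {
//		// handle the error
//	}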
func newMicroServiceBRWatcher(client client.Client, kubeClient kubernetes.Interface, mgr manager.Manager, taskType string, taskName string, namespace string,
	podName string, containerName string, associatedObject string, callbacks Callbacks, log logrus.FieldLogger) AsyncBR {
	ms := &microServiceBRWatcher{
		mgr:              mgr,
		client:           client,
		kubeClient:       kubeClient,
		namespace:        namespace,
		callbacks:        callbacks,
		taskType:         taskType,
		taskName:         taskName,
		thisPod:          podName,
		thisContainer:    containerName,
		associatedObject: associatedObject,
		eventCh:          make(chan *corev1api.Event, 10),
		podCh:            make(chan *corev1api.Pod, 2),
		wgWatcher:        sync.WaitGroup{},
		log:              log,
	}

	return ms
}
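
// Init registers handlers on the shared event and pod informers so that
// events for the associated object and terminal-phase updates of the data
// path pod are funneled into eventCh and podCh. reEnsureThisPod covers the
// window before the handlers were registered, in case the pod has already
// terminated.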
func (ms *microServiceBRWatcher) Init(ctx context.Context, param any) error {
	eventInformer, err := ms.mgr.GetCache().GetInformer(ctx, &corev1api.Event{})
	if err != nil {
		return errors.Wrap(err, "error getting event informer")
	}

	podInformer, err := ms.mgr.GetCache().GetInformer(ctx, &corev1api.Pod{})
	if err != nil {
		return errors.Wrap(err, "error getting pod informer")
	}

	eventHandler, err := eventInformer.AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj any) {
				evt := obj.(*corev1api.Event)
				if evt.InvolvedObject.Namespace != ms.namespace || evt.InvolvedObject.Name != ms.associatedObject {
					return
				}

				ms.eventCh <- evt
			},
			UpdateFunc: func(_, obj any) {
				evt := obj.(*corev1api.Event)
				if evt.InvolvedObject.Namespace != ms.namespace || evt.InvolvedObject.Name != ms.associatedObject {
					return
				}

				ms.eventCh <- evt
			},
		},
	)
	if err != nil {
		return errors.Wrap(err, "error registering event handler")
	}

	podHandler, err := podInformer.AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			UpdateFunc: func(_, obj any) {
				pod := obj.(*corev1api.Pod)
				if pod.Namespace != ms.namespace || pod.Name != ms.thisPod {
					return
				}

				if pod.Status.Phase == corev1api.PodSucceeded || pod.Status.Phase == corev1api.PodFailed {
					ms.podCh <- pod
				}
			},
		},
	)
	if err != nil {
		return errors.Wrap(err, "error registering pod handler")
	}

	if err := ms.reEnsureThisPod(ctx); err != nil {
		return err
	}

	ms.eventInformer = eventInformer
	ms.podInformer = podInformer
	ms.eventHandler = eventHandler
	ms.podHandler = podHandler

	ms.ctx, ms.cancel = context.WithCancel(ctx)

	ms.log.WithFields(
		logrus.Fields{
			"taskType": ms.taskType,
			"taskName": ms.taskName,
			"thisPod":  ms.thisPod,
		}).Info("MicroServiceBR is initialized")

	return nil
}
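
// Close cancels the watch context, waits for the watcher goroutine to finish,
// and unregisters the informer handlers.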
func (ms *microServiceBRWatcher) Close(ctx context.Context) {
	if ms.cancel != nil {
		ms.cancel()
	}

	ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("Closing MicroServiceBR")

	ms.wgWatcher.Wait()

	ms.close()

	ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is closed")
}
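
// close removes the event and pod handlers from their informers. It is
// guarded by watcherLock because it is called from both Close and the watcher
// goroutine's defer.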
func (ms *microServiceBRWatcher) close() {
	ms.watcherLock.Lock()
	defer ms.watcherLock.Unlock()

	if ms.eventHandler != nil {
		if err := ms.eventInformer.RemoveEventHandler(ms.eventHandler); err != nil {
			ms.log.WithError(err).Warn("Failed to remove event handler")
		}

		ms.eventHandler = nil
	}

	if ms.podHandler != nil {
		if err := ms.podInformer.RemoveEventHandler(ms.podHandler); err != nil {
			ms.log.WithError(err).Warn("Failed to remove pod handler")
		}

		ms.podHandler = nil
	}
}
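
// StartBackup starts watching the backup micro service pod; the data movement
// itself is driven by that pod, not by this watcher.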
func (ms *microServiceBRWatcher) StartBackup(source AccessPoint, uploaderConfig map[string]string, param any) error {
	ms.log.Infof("Start watching backup micro service for source %v", source.ByPath)

	ms.startWatch()

	return nil
}
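
// StartRestore starts watching the restore micro service pod for the given
// snapshot.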
func (ms *microServiceBRWatcher) StartRestore(snapshotID string, target AccessPoint, uploaderConfigs map[string]string) error {
	ms.log.Infof("Start watching restore micro service to target %s, from snapshot %s", target.ByPath, snapshotID)

	ms.startWatch()

	return nil
}
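
// reEnsureThisPod fetches the data path pod directly and, if it has already
// reached a terminal phase, pushes it into podCh, so a pod that terminated
// before the informer handlers were registered is not missed.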
func (ms *microServiceBRWatcher) reEnsureThisPod(ctx context.Context) error {
	thisPod := &corev1api.Pod{}
	if err := ms.client.Get(ctx, types.NamespacedName{
		Namespace: ms.namespace,
		Name:      ms.thisPod,
	}, thisPod); err != nil {
		return errors.Wrapf(err, "error getting this pod %s", ms.thisPod)
	}

	if thisPod.Status.Phase == corev1api.PodSucceeded || thisPod.Status.Phase == corev1api.PodFailed {
		ms.podCh <- thisPod
		ms.log.WithField("this pod", ms.thisPod).Infof("This pod reached terminal status %s before the watch started", thisPod.Status.Phase)
	}

	return nil
}
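
// Indirection points that unit tests can override.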
var funcGetPodTerminationMessage = kube.GetPodContainerTerminateMessage
var funcRedirectLog = redirectDataMoverLogs
var funcGetResultFromMessage = getResultFromMessage
var funcGetProgressFromMessage = getProgressFromMessage

var eventWaitTimeout = time.Minute
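
// startWatch launches the watcher goroutine. Its main loop forwards data path
// events until the pod reaches a terminal phase or the context is canceled;
// an epilog loop then keeps draining events for up to eventWaitTimeout so
// late start/stop events are not lost. Afterwards it records the pod logs,
// derives the result from the container termination message, and invokes the
// matching callback.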
func (ms *microServiceBRWatcher) startWatch() {
	ms.wgWatcher.Add(1)

	go func() {
		ms.log.Info("Start watching data path pod")

		defer func() {
			ms.close()
			ms.wgWatcher.Done()
		}()

		var lastPod *corev1api.Pod

	watchLoop:
		for {
			select {
			case <-ms.ctx.Done():
				break watchLoop
			case pod := <-ms.podCh:
				lastPod = pod
				break watchLoop
			case evt := <-ms.eventCh:
				ms.onEvent(evt)
			}
		}

		if lastPod == nil {
			ms.log.Warn("Watch loop is canceled while waiting for the data path pod")
			return
		}

	epilogLoop:
		for !ms.startedFromEvent || !ms.terminatedFromEvent {
			select {
			case <-ms.ctx.Done():
				ms.log.Warn("Watch loop is canceled while waiting for the final event")
				return
			case <-time.After(eventWaitTimeout):
				break epilogLoop
			case evt := <-ms.eventCh:
				ms.onEvent(evt)
			}
		}

		terminateMessage := funcGetPodTerminationMessage(lastPod, ms.thisContainer)

		logger := ms.log.WithField("data path pod", lastPod.Name)

		logger.Infof("Finished waiting for data path pod, phase %s, message %s", lastPod.Status.Phase, terminateMessage)

		if !ms.startedFromEvent {
			logger.Warn("VGDP seems not to have started")
		}

		if ms.startedFromEvent && !ms.terminatedFromEvent {
			logger.Warn("VGDP started but the termination event was not received")
		}

		logger.Info("Recording data path pod logs")

		if err := funcRedirectLog(ms.ctx, ms.kubeClient, ms.namespace, lastPod.Name, ms.thisContainer, ms.log); err != nil {
			logger.WithError(err).Warn("Failed to collect data mover logs")
		}

		logger.Info("Calling callback on data path pod termination")

		if lastPod.Status.Phase == corev1api.PodSucceeded {
			result := funcGetResultFromMessage(ms.taskType, terminateMessage, ms.log)
			ms.callbacks.OnProgress(ms.ctx, ms.namespace, ms.taskName, getCompletionProgressFromResult(ms.taskType, result))
			ms.callbacks.OnCompleted(ms.ctx, ms.namespace, ms.taskName, result)
		} else {
			if strings.HasSuffix(terminateMessage, ErrCancelled) {
				ms.callbacks.OnCancelled(ms.ctx, ms.namespace, ms.taskName)
			} else {
				ms.callbacks.OnFailed(ms.ctx, ms.namespace, ms.taskName, errors.New(terminateMessage))
			}
		}

		logger.Info("Completed callback on data path pod termination")
	}()
}
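
// onEvent handles a single event from the data path pod, keyed by reason:
// it records the start/stop markers and forwards progress to the callbacks.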
func (ms *microServiceBRWatcher) onEvent(evt *corev1api.Event) {
	switch evt.Reason {
	case EventReasonStarted:
		ms.startedFromEvent = true
		ms.log.Infof("Received data path start message: %s", evt.Message)
	case EventReasonProgress:
		ms.callbacks.OnProgress(ms.ctx, ms.namespace, ms.taskName, funcGetProgressFromMessage(evt.Message, ms.log))
	case EventReasonCompleted:
		ms.log.Infof("Received data path completed message: %v", funcGetResultFromMessage(ms.taskType, evt.Message, ms.log))
	case EventReasonCancelled:
		ms.log.Infof("Received data path canceled message: %s", evt.Message)
	case EventReasonFailed:
		ms.log.Infof("Received data path failed message: %s", evt.Message)
	case EventReasonCancelling:
		ms.log.Infof("Received data path canceling message: %s", evt.Message)
	case EventReasonStopped:
		ms.terminatedFromEvent = true
		ms.log.Infof("Received data path stop message: %s", evt.Message)
	default:
		ms.log.Infof("Received event for data path %s, reason: %s, message: %s", ms.taskName, evt.Reason, evt.Message)
	}
}
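
// getResultFromMessage decodes a termination message into a Result: the JSON
// encoding of BackupResult for TaskTypeBackup, of RestoreResult otherwise. On
// decode failure the corresponding field is left as its zero value.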
func getResultFromMessage(taskType string, message string, logger logrus.FieldLogger) Result {
	result := Result{}

	if taskType == TaskTypeBackup {
		backupResult := BackupResult{}
		err := json.Unmarshal([]byte(message), &backupResult)
		if err != nil {
			logger.WithError(err).Errorf("Failed to unmarshal result message %s", message)
		} else {
			result.Backup = backupResult
		}
	} else {
		restoreResult := RestoreResult{}
		err := json.Unmarshal([]byte(message), &restoreResult)
		if err != nil {
			logger.WithError(err).Errorf("Failed to unmarshal result message %s", message)
		} else {
			result.Restore = restoreResult
		}
	}

	return result
}
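
// getProgressFromMessage decodes a progress event message into an
// uploader.Progress; on decode failure it returns a zero-valued progress
// rather than aborting the watch.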
func getProgressFromMessage(message string, logger logrus.FieldLogger) *uploader.Progress {
	progress := &uploader.Progress{}
	err := json.Unmarshal([]byte(message), progress)
	if err != nil {
		logger.WithError(err).Debugf("Failed to unmarshal progress message %s", message)
	}

	return progress
}
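
// getCompletionProgressFromResult synthesizes the final progress update for a
// completed task, with BytesDone equal to TotalBytes.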
func getCompletionProgressFromResult(taskType string, result Result) *uploader.Progress {
	progress := &uploader.Progress{}
	if taskType == TaskTypeBackup {
		progress.BytesDone = result.Backup.TotalBytes
		progress.TotalBytes = result.Backup.TotalBytes
	} else {
		progress.BytesDone = result.Restore.TotalBytes
		progress.TotalBytes = result.Restore.TotalBytes
	}

	return progress
}
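
// Cancel only logs the request: cancellation is signaled to the data path pod
// out of band, and this watcher observes the outcome through the pod's
// termination message.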
func (ms *microServiceBRWatcher) Cancel() {
	ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is canceled")
}
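
// Indirection points that unit tests can override.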
var funcCreateTemp = os.CreateTemp
var funcCollectPodLogs = kube.CollectPodLogs
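
// redirectDataMoverLogs collects the data path pod's container log into a
// temp file, then emits a marker line that carries the file's path, so the
// logging hook can merge the collected log into the current logger's output.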
func redirectDataMoverLogs(ctx context.Context, kubeClient kubernetes.Interface, namespace string, thisPod string, thisContainer string, logger logrus.FieldLogger) error {
	logger.Infof("Starting to collect data mover pod log for %s", thisPod)

	logFile, err := funcCreateTemp("", "")
	if err != nil {
		return errors.Wrap(err, "error creating temp file for data mover pod log")
	}

	defer logFile.Close()

	logFileName := logFile.Name()
	logger.Infof("Created log file %s", logFileName)

	err = funcCollectPodLogs(ctx, kubeClient.CoreV1(), thisPod, namespace, thisContainer, logFile)
	if err != nil {
		return errors.Wrapf(err, "error collecting logs to %s for data mover pod %s", logFileName, thisPod)
	}

	// Close the file before the logging hook reads it by name (the deferred
	// Close is then redundant).
	logFile.Close()

	logger.Infof("Redirecting to log file %s", logFileName)

	hookLogger := logger.WithField(logging.LogSourceKey, logFileName)
	hookLogger.Logln(logging.ListeningLevel, logging.ListeningMessage)

	logger.Infof("Completed collecting data mover pod log for %s", thisPod)

	return nil
}