mirror of https://github.com/milvus-io/milvus.git
229 lines
8.2 KiB
Go
229 lines
8.2 KiB
Go
package flusherimpl
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/cockroachdb/errors"
|
|
"github.com/samber/lo"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
|
"github.com/milvus-io/milvus/internal/flushcommon/broker"
|
|
"github.com/milvus-io/milvus/internal/flushcommon/util"
|
|
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
|
|
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
|
|
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/recovery"
|
|
"github.com/milvus-io/milvus/pkg/v2/log"
|
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
|
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message/adaptor"
|
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/options"
|
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
|
|
)
|
|
|
|
// errChannelLifetimeUnrecoverable signals that a pchannel's lifetime cannot be
// recovered. Not referenced in this file's visible code — presumably consumed by
// sibling files in the flusherimpl package; verify before removing.
var errChannelLifetimeUnrecoverable = errors.New("channel lifetime unrecoverable")
|
|
|
|
// RecoverWALFlusherParam is the parameter for building wal flusher.
type RecoverWALFlusherParam struct {
	// ChannelInfo identifies the pchannel this flusher serves; also used for
	// logger tagging and metrics labels.
	ChannelInfo types.PChannelInfo
	// WAL is a future resolved once the underlying WAL is ready; the flusher
	// blocks on it at the start of Execute.
	WAL *syncutil.Future[wal.WAL]
	// RecoverySnapshot is the snapshot to recover from; it is passed to the
	// background Execute goroutine.
	RecoverySnapshot *recovery.RecoverySnapshot
	// RecoveryStorage persists checkpoints and observes messages; ownership is
	// transferred to the flusher, which closes it in Close.
	RecoveryStorage recovery.RecoveryStorage
}
|
|
|
|
// RecoverWALFlusher recovers the wal flusher.
|
|
func RecoverWALFlusher(param *RecoverWALFlusherParam) *WALFlusherImpl {
|
|
flusher := &WALFlusherImpl{
|
|
notifier: syncutil.NewAsyncTaskNotifier[struct{}](),
|
|
wal: param.WAL,
|
|
logger: resource.Resource().Logger().With(
|
|
log.FieldComponent("flusher"),
|
|
zap.String("pchannel", param.ChannelInfo.String())),
|
|
metrics: newFlusherMetrics(param.ChannelInfo),
|
|
RecoveryStorage: param.RecoveryStorage,
|
|
}
|
|
go flusher.Execute(param.RecoverySnapshot)
|
|
return flusher
|
|
}
|
|
|
|
// WALFlusherImpl consumes messages from the WAL and drives the flush pipeline
// for one pchannel. It embeds recovery.RecoveryStorage, so its methods are
// promoted onto the flusher.
type WALFlusherImpl struct {
	// notifier coordinates shutdown: Close cancels its context and blocks
	// until Execute calls Finish.
	notifier *syncutil.AsyncTaskNotifier[struct{}]
	// wal is resolved asynchronously; Execute waits for it before recovery.
	wal *syncutil.Future[wal.WAL]
	// flusherComponents is built during Execute's recovery phase; nil before that.
	flusherComponents *flusherComponents
	logger            *log.MLogger
	metrics           *flusherMetrics
	// Embedded so checkpoint updates and message observation are available
	// directly on the flusher; closed by Close.
	recovery.RecoveryStorage
}
|
|
|
|
// Execute starts the wal flusher.
// It blocks until the notifier is cancelled (via Close) or the scanner channel
// closes. The named return err is inspected by the deferred closure below to
// classify the exit; note that context.Canceled is treated as a normal
// shutdown, while any other non-nil error triggers DPanic.
func (impl *WALFlusherImpl) Execute(recoverSnapshot *recovery.RecoverySnapshot) (err error) {
	defer func() {
		// Finish must run regardless of outcome so Close's BlockUntilFinish returns.
		impl.notifier.Finish(struct{}{})
		if err == nil {
			impl.logger.Info("wal flusher stop")
			return
		}
		if !errors.Is(err, context.Canceled) {
			// Unexpected failure: DPanic panics in development builds.
			impl.logger.DPanic("wal flusher stop to executing with unexpected error", zap.Error(err))
			return
		}
		impl.logger.Warn("wal flusher is canceled before executing", zap.Error(err))
	}()

	impl.logger.Info("wal flusher start to recovery...")
	// Block until the WAL future resolves or shutdown is requested.
	l, err := impl.wal.GetWithContext(impl.notifier.Context())
	if err != nil {
		return errors.Wrap(err, "when get wal from future")
	}
	impl.logger.Info("wal ready for flusher recovery")

	var checkpoint message.MessageID
	impl.flusherComponents, checkpoint, err = impl.buildFlusherComponents(impl.notifier.Context(), l, recoverSnapshot)
	if err != nil {
		return errors.Wrap(err, "when build flusher components")
	}
	defer impl.flusherComponents.Close()

	scanner, err := impl.generateScanner(impl.notifier.Context(), impl.wal.Get(), checkpoint)
	if err != nil {
		return errors.Wrap(err, "when generate scanner")
	}
	defer scanner.Close()

	impl.logger.Info("wal flusher start to work")
	impl.metrics.IntoState(flusherStateInWorking)
	defer impl.metrics.IntoState(flusherStateOnClosing)

	// Main consume loop: dispatch every scanned message until shutdown.
	for {
		select {
		case <-impl.notifier.Context().Done():
			return nil
		case msg, ok := <-scanner.Chan():
			if !ok {
				impl.logger.Warn("wal flusher is closing for closed scanner channel, which is unexpected at graceful way")
				return nil
			}
			impl.metrics.ObserveMetrics(msg.TimeTick())
			if err := impl.dispatch(msg); err != nil {
				// The error is always context canceled.
				return nil
			}
		}
	}
}
|
|
|
|
// Close closes the wal flusher and release all related resources for it.
// The ordering matters: cancel first so Execute's loop exits, then wait for it
// to finish (which also closes the flusher components and scanner via its
// defers), and only then close the recovery storage and metrics.
func (impl *WALFlusherImpl) Close() {
	impl.notifier.Cancel()
	impl.notifier.BlockUntilFinish()

	impl.logger.Info("wal flusher start to close the recovery storage...")
	impl.RecoveryStorage.Close()
	impl.logger.Info("recovery storage closed")

	impl.metrics.Close()
}
|
|
|
|
// buildFlusherComponents builds the components of the flusher.
|
|
func (impl *WALFlusherImpl) buildFlusherComponents(ctx context.Context, l wal.WAL, snapshot *recovery.RecoverySnapshot) (*flusherComponents, message.MessageID, error) {
|
|
// Get all existed vchannels of the pchannel.
|
|
vchannels := lo.Keys(snapshot.VChannels)
|
|
impl.logger.Info("fetch vchannel done", zap.Int("vchannelNum", len(vchannels)))
|
|
|
|
// Get all the recovery info of the recoverable vchannels.
|
|
recoverInfos, checkpoint, err := impl.getRecoveryInfos(ctx, vchannels)
|
|
if err != nil {
|
|
impl.logger.Warn("get recovery info failed", zap.Error(err))
|
|
return nil, nil, err
|
|
}
|
|
impl.logger.Info("fetch recovery info done", zap.Int("recoveryInfoNum", len(recoverInfos)))
|
|
|
|
mixc, err := resource.Resource().MixCoordClient().GetWithContext(ctx)
|
|
if err != nil {
|
|
impl.logger.Warn("flusher recovery is canceled before data coord client ready", zap.Error(err))
|
|
return nil, nil, err
|
|
}
|
|
impl.logger.Info("data coord client ready")
|
|
|
|
// build all components.
|
|
broker := broker.NewCoordBroker(mixc, paramtable.GetNodeID())
|
|
chunkManager := resource.Resource().ChunkManager()
|
|
|
|
cpUpdater := util.NewChannelCheckpointUpdaterWithCallback(broker, func(mp *msgpb.MsgPosition) {
|
|
messageID := adaptor.MustGetMessageIDFromMQWrapperIDBytes(l.WALName(), mp.MsgID)
|
|
impl.RecoveryStorage.UpdateFlusherCheckpoint(mp.ChannelName, &recovery.WALCheckpoint{
|
|
MessageID: messageID,
|
|
TimeTick: mp.Timestamp,
|
|
Magic: recovery.RecoveryMagicStreamingInitialized,
|
|
})
|
|
})
|
|
go cpUpdater.Start()
|
|
|
|
fc := &flusherComponents{
|
|
wal: l,
|
|
broker: broker,
|
|
cpUpdater: cpUpdater,
|
|
chunkManager: chunkManager,
|
|
dataServices: make(map[string]*dataSyncServiceWrapper),
|
|
logger: impl.logger,
|
|
recoveryCheckPointTimeTick: snapshot.Checkpoint.TimeTick,
|
|
rs: impl.RecoveryStorage,
|
|
}
|
|
impl.logger.Info("flusher components intiailizing done")
|
|
if err := fc.recover(ctx, recoverInfos); err != nil {
|
|
impl.logger.Warn("flusher recovery is canceled before recovery done, recycle the resource", zap.Error(err))
|
|
fc.Close()
|
|
impl.logger.Info("flusher recycle the resource done")
|
|
return nil, nil, err
|
|
}
|
|
impl.logger.Info("flusher recovery done")
|
|
return fc, checkpoint, nil
|
|
}
|
|
|
|
// generateScanner create a new scanner for the wal.
|
|
func (impl *WALFlusherImpl) generateScanner(ctx context.Context, l wal.WAL, checkpoint message.MessageID) (wal.Scanner, error) {
|
|
handler := make(adaptor.ChanMessageHandler, 64)
|
|
readOpt := wal.ReadOption{
|
|
VChannel: "", // We need consume all message from wal.
|
|
MesasgeHandler: handler,
|
|
DeliverPolicy: options.DeliverPolicyAll(),
|
|
}
|
|
if checkpoint != nil {
|
|
impl.logger.Info("wal start to scan from minimum checkpoint", zap.Stringer("checkpointMessageID", checkpoint))
|
|
readOpt.DeliverPolicy = options.DeliverPolicyStartFrom(checkpoint)
|
|
} else {
|
|
impl.logger.Info("wal start to scan from the earliest checkpoint")
|
|
}
|
|
return l.Read(ctx, readOpt)
|
|
}
|
|
|
|
// dispatch dispatches the message to the related handler for flusher components.
// CAUTION: the deferred ObserveMessage call assigns to the named return err,
// so the value returned to the caller is ObserveMessage's error — it replaces
// whatever HandleMessage returned. The caller treats any error as context
// cancellation.
func (impl *WALFlusherImpl) dispatch(msg message.ImmutableMessage) (err error) {
	// TODO: We will merge the flusher into recovery storage in future.
	// Currently, flusher works as a separate component.
	defer func() {
		// Runs after the switch below, so the recovery storage observes the
		// message only after the data sync services have been adjusted.
		if err = impl.RecoveryStorage.ObserveMessage(impl.notifier.Context(), msg); err != nil {
			impl.logger.Warn("failed to observe message", zap.Error(err))
		}
	}()

	// Do the data sync service management here.
	switch msg.MessageType() {
	case message.MessageTypeCreateCollection:
		createCollectionMsg, err := message.AsImmutableCreateCollectionMessageV1(msg)
		if err != nil {
			// Type mismatch is a programmer bug; DPanic in dev, skip in prod.
			impl.logger.DPanic("the message type is not CreateCollectionMessage", zap.Error(err))
			return nil
		}
		impl.flusherComponents.WhenCreateCollection(createCollectionMsg)
	case message.MessageTypeDropCollection:
		// defer to remove the data sync service from the components.
		// TODO: Current drop collection message will be handled by the underlying data sync service.
		defer func() {
			impl.flusherComponents.WhenDropCollection(msg.VChannel())
		}()
	}
	return impl.flusherComponents.HandleMessage(impl.notifier.Context(), msg)
}
|