mirror of https://github.com/milvus-io/milvus.git
80 lines
2.7 KiB
Go
80 lines
2.7 KiB
Go
package flusherimpl
|
|
|
|
import (
|
|
"context"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus/internal/flushcommon/pipeline"
|
|
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
|
|
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
|
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
|
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message/adaptor"
|
|
)
|
|
|
|
// newDataSyncServiceWrapper creates a new data sync service wrapper.
|
|
func newDataSyncServiceWrapper(
|
|
channelName string,
|
|
input chan<- *msgstream.MsgPack,
|
|
ds *pipeline.DataSyncService,
|
|
channelCheckpointTimeTick uint64,
|
|
) *dataSyncServiceWrapper {
|
|
handler := adaptor.NewBaseMsgPackAdaptorHandler()
|
|
return &dataSyncServiceWrapper{
|
|
channelName: channelName,
|
|
input: input,
|
|
handler: handler,
|
|
ds: ds,
|
|
channelCheckpointTimeTick: channelCheckpointTimeTick,
|
|
}
|
|
}
|
|
|
|
// dataSyncServiceWrapper wraps DataSyncService and related input channel.
|
|
type dataSyncServiceWrapper struct {
|
|
channelName string
|
|
channelCheckpointTimeTick uint64
|
|
input chan<- *msgstream.MsgPack
|
|
handler *adaptor.BaseMsgPackAdaptorHandler
|
|
ds *pipeline.DataSyncService
|
|
}
|
|
|
|
// Start starts the data sync service.
|
|
func (ds *dataSyncServiceWrapper) Start() {
|
|
ds.ds.Start()
|
|
}
|
|
|
|
// HandleMessage handles the incoming message.
|
|
func (ds *dataSyncServiceWrapper) HandleMessage(ctx context.Context, msg message.ImmutableMessage) error {
|
|
ds.handler.GenerateMsgPack(msg)
|
|
for ds.handler.PendingMsgPack.Len() > 0 {
|
|
next := ds.handler.PendingMsgPack.Next()
|
|
nextTsMsg := msgstream.MustBuildMsgPackFromConsumeMsgPack(next, adaptor.UnmashalerDispatcher)
|
|
|
|
// filter out the message less than vchannel level checkpoint.
|
|
if nextTsMsg.EndTs < ds.channelCheckpointTimeTick {
|
|
ds.handler.Logger.Debug("skip the message less than vchannel checkpoint",
|
|
zap.Uint64("timestamp", nextTsMsg.EndTs),
|
|
zap.Uint64("checkpoint", ds.channelCheckpointTimeTick),
|
|
)
|
|
ds.handler.PendingMsgPack.UnsafeAdvance()
|
|
continue
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case ds.input <- nextTsMsg:
|
|
// The input channel will never get stuck because the data sync service will consume the message continuously.
|
|
ds.handler.PendingMsgPack.UnsafeAdvance()
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Close close the input channel and gracefully close the data sync service.
|
|
func (ds *dataSyncServiceWrapper) Close() {
|
|
// The input channel should be closed first, otherwise the flowgraph in datasync service will be blocked.
|
|
close(ds.input)
|
|
ds.ds.GracefullyClose()
|
|
resource.Resource().WriteBufferManager().RemoveChannel(ds.channelName)
|
|
}
|