From 8780d65b6602d63fa7d5d431888af70b4ba739ef Mon Sep 17 00:00:00 2001
From: "yihao.dai"
Date: Mon, 22 Jan 2024 14:36:55 +0800
Subject: [PATCH] fix: Use channel cp as the dml&start position for import segments (#30107)

This PR discontinues the subscription to the MQ and, instead, employs the
channel checkpoint as the DML and start position for the import segments.

issue: https://github.com/milvus-io/milvus/issues/30106

Signed-off-by: bigsheeper
---
 internal/datacoord/server_test.go           | 16 ++++++
 internal/datacoord/services.go              | 19 +++++---
 internal/datanode/data_sync_service_test.go | 16 ------
 internal/datanode/services.go               | 54 +++++----------------
 internal/datanode/services_test.go          |  1 -
 internal/proto/data_coord.proto             |  2 +-
 6 files changed, 42 insertions(+), 66 deletions(-)

diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go
index 351b1e535e..36fcd28b7c 100644
--- a/internal/datacoord/server_test.go
+++ b/internal/datacoord/server_test.go
@@ -3207,6 +3207,12 @@ func TestDataCoord_SaveImportSegment(t *testing.T) {
 		assert.NoError(t, err)
 		err = svr.channelManager.Watch(context.TODO(), &channelMeta{Name: "ch1", CollectionID: 100})
 		assert.NoError(t, err)
+		err = svr.meta.UpdateChannelCheckpoint("ch1", &msgpb.MsgPosition{
+			ChannelName: "ch1",
+			MsgID:       []byte{0, 0, 0, 0, 0, 0, 0, 0},
+			Timestamp:   1000,
+		})
+		assert.NoError(t, err)
 
 		status, err := svr.SaveImportSegment(context.TODO(), &datapb.SaveImportSegmentRequest{
 			SegmentId: 100,
@@ -3230,6 +3236,16 @@ func TestDataCoord_SaveImportSegment(t *testing.T) {
 						SegmentID: 100,
 					},
 				},
+				CheckPoints: []*datapb.CheckPoint{
+					{
+						SegmentID: 100,
+						Position: &msgpb.MsgPosition{
+							ChannelName: "ch1",
+							Timestamp:   1,
+						},
+						NumOfRows: int64(1),
+					},
+				},
 			},
 		})
 		assert.NoError(t, err)
diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go
index d73dda8762..09ec15aab4 100644
--- a/internal/datacoord/services.go
+++ b/internal/datacoord/services.go
@@ -1528,16 +1528,16 @@ func getDiff(base, remove []int64) []int64 {
 func (s *Server) SaveImportSegment(ctx context.Context, req *datapb.SaveImportSegmentRequest) (*commonpb.Status, error) {
 	log := log.Ctx(ctx).With(
 		zap.Int64("collectionID", req.GetCollectionId()),
-	)
-	log.Info("DataCoord putting segment to the right DataNode and saving binlog path",
 		zap.Int64("segmentID", req.GetSegmentId()),
 		zap.Int64("partitionID", req.GetPartitionId()),
 		zap.String("channelName", req.GetChannelName()),
-		zap.Int64("# of rows", req.GetRowNum()))
+		zap.Int64("# of rows", req.GetRowNum()),
+	)
+	log.Info("DataCoord putting segment to the right DataNode and saving binlog path")
 	if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
 		return merr.Status(err), nil
 	}
-	resp, err := s.cluster.AddImportSegment(ctx,
+	_, err := s.cluster.AddImportSegment(ctx,
 		&datapb.AddImportSegmentRequest{
 			Base: commonpbutil.NewMsgBase(
 				commonpbutil.WithTimeStamp(req.GetBase().GetTimestamp()),
@@ -1554,8 +1554,15 @@ func (s *Server) SaveImportSegment(ctx context.Context, req *datapb.SaveImportSe
 		return merr.Status(err), nil
 	}
 
-	// Fill in start position message ID.
-	req.SaveBinlogPathReq.StartPositions[0].StartPosition.MsgID = resp.GetChannelPos()
+	// Fill in position message ID by channel checkpoint.
+	channelCP := s.meta.GetChannelCheckpoint(req.GetChannelName())
+	if channelCP == nil {
+		log.Warn("SaveImportSegment get nil channel checkpoint")
+		return merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("nil checkpoint when saving import segment, segmentID=%d, channel=%s",
+			req.GetSegmentId(), req.GetChannelName()))), nil
+	}
+	req.SaveBinlogPathReq.StartPositions[0].StartPosition.MsgID = channelCP.GetMsgID()
+	req.SaveBinlogPathReq.CheckPoints[0].Position.MsgID = channelCP.GetMsgID()
 
 	// Start saving bin log paths.
 	rsp, err := s.SaveBinlogPaths(context.Background(), req.GetSaveBinlogPathReq())
diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go
index 566bed3e0d..f04ab8a646 100644
--- a/internal/datanode/data_sync_service_test.go
+++ b/internal/datanode/data_sync_service_test.go
@@ -24,7 +24,6 @@ import (
 	"math"
 	"math/rand"
 	"testing"
-	"time"
 
 	"github.com/samber/lo"
 	"github.com/stretchr/testify/assert"
@@ -249,21 +248,6 @@ func TestBytesReader(t *testing.T) {
 	assert.Equal(t, int8(100), dataInt8)
 }
 
-func TestGetChannelLatestMsgID(t *testing.T) {
-	delay := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
-	ctx, cancel := context.WithDeadline(context.Background(), delay)
-	defer cancel()
-	node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64)
-
-	dmlChannelName := "fake-by-dev-rootcoord-dml-channel_12345v0"
-
-	insertStream, _ := node.factory.NewMsgStream(ctx)
-	insertStream.AsProducer([]string{dmlChannelName})
-	id, err := node.getChannelLatestMsgID(ctx, dmlChannelName, 0)
-	assert.NoError(t, err)
-	assert.NotNil(t, id)
-}
-
 func TestGetChannelWithTickler(t *testing.T) {
 	channelName := "by-dev-rootcoord-dml-0"
 	info := getWatchInfoByOpID(100, channelName, datapb.ChannelWatchState_ToWatch)
diff --git a/internal/datanode/services.go b/internal/datanode/services.go
index accbb04c34..2c7b1a01d9 100644
--- a/internal/datanode/services.go
+++ b/internal/datanode/services.go
@@ -45,9 +45,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
-	"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
 	"github.com/milvus-io/milvus/pkg/util/commonpbutil"
-	"github.com/milvus-io/milvus/pkg/util/funcutil"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/metautil"
 	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
@@ -562,19 +560,6 @@ func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImpor
 			},
 		}, nil
 	}
-	// Get the current dml channel position ID, that will be used in segments start positions and end positions.
-	var posID []byte
-	err = retry.Do(ctx, func() error {
-		id, innerError := node.getChannelLatestMsgID(context.Background(), req.GetChannelName(), req.GetSegmentId())
-		posID = id
-		return innerError
-	}, retry.Attempts(30))
-
-	if err != nil {
-		return &datapb.AddImportSegmentResponse{
-			Status: merr.Status(err),
-		}, nil
-	}
 	// Add the new segment to the channel.
 	if len(ds.metacache.GetSegmentIDsBy(metacache.WithSegmentIDs(req.GetSegmentId()), metacache.WithSegmentState(commonpb.SegmentState_Flushed))) == 0 {
 		log.Info("adding a new segment to channel", logFields...)
@@ -607,12 +592,10 @@ func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImpor
 			Statslogs: req.GetStatsLog(),
 			StartPosition: &msgpb.MsgPosition{
 				ChannelName: req.GetChannelName(),
-				MsgID:       posID,
 				Timestamp:   req.GetBase().GetTimestamp(),
 			},
 			DmlPosition: &msgpb.MsgPosition{
 				ChannelName: req.GetChannelName(),
-				MsgID:       posID,
 				Timestamp:   req.GetBase().GetTimestamp(),
 			},
 		}, func(info *datapb.SegmentInfo) *metacache.BloomFilterSet {
@@ -622,33 +605,10 @@ func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImpor
 	}
 
 	return &datapb.AddImportSegmentResponse{
-		Status:     merr.Success(),
-		ChannelPos: posID,
+		Status: merr.Success(),
 	}, nil
 }
 
-func (node *DataNode) getChannelLatestMsgID(ctx context.Context, channelName string, segmentID int64) ([]byte, error) {
-	pChannelName := funcutil.ToPhysicalChannel(channelName)
-	dmlStream, err := node.factory.NewMsgStream(ctx)
-	if err != nil {
-		return nil, err
-	}
-	defer dmlStream.Close()
-
-	subName := fmt.Sprintf("datanode-%d-%s-%d", paramtable.GetNodeID(), channelName, segmentID)
-	log.Debug("dataSyncService register consumer for getChannelLatestMsgID",
-		zap.String("pChannelName", pChannelName),
-		zap.String("subscription", subName),
-	)
-	dmlStream.AsConsumer(ctx, []string{pChannelName}, subName, mqwrapper.SubscriptionPositionUnknown)
-	id, err := dmlStream.GetLatestMsgID(pChannelName)
-	if err != nil {
-		log.Error("fail to GetLatestMsgID", zap.String("pChannelName", pChannelName), zap.Error(err))
-		return nil, err
-	}
-	return id.Serialize(), nil
-}
-
 func assignSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest) importutil.AssignSegmentFunc {
 	return func(shardID int, partID int64) (int64, string, error) {
 		chNames := req.GetImportTask().GetChannelNames()
@@ -791,6 +751,16 @@ func saveSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest, res *rootcoo
 						SegmentID: segmentID,
 					},
 				},
+				CheckPoints: []*datapb.CheckPoint{
+					{
+						SegmentID: segmentID,
+						Position: &msgpb.MsgPosition{
+							ChannelName: targetChName,
+							Timestamp:   ts,
+						},
+						NumOfRows: rowCount,
+					},
+				},
 				Importing: true,
 			},
 		})
@@ -802,7 +772,7 @@ func saveSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest, res *rootcoo
 			return err
 		}
 		return nil
-	})
+	}, retry.Attempts(60)) // about 3min
 	if err != nil {
 		log.Warn("failed to save import segment", zap.Error(err))
 		return err
diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go
index 67b5af1e05..554ff8f99f 100644
--- a/internal/datanode/services_test.go
+++ b/internal/datanode/services_test.go
@@ -756,7 +756,6 @@ func (s *DataNodeServicesSuite) TestAddImportSegment() {
 		s.Assert().NoError(err)
 		s.Assert().True(merr.Ok(resp.GetStatus()))
 		s.Assert().Equal("", resp.GetStatus().GetReason())
-		s.Assert().NotEqual(nil, resp.GetChannelPos())
 
 		getFlowGraphServiceAttempts = 3
 		resp, err = s.node.AddImportSegment(context.WithValue(s.ctx, ctxKey{}, ""), &datapb.AddImportSegmentRequest{
diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto
index 2ca14da53b..da63ccf410 100644
--- a/internal/proto/data_coord.proto
+++ b/internal/proto/data_coord.proto
@@ -679,7 +679,7 @@ message AddImportSegmentRequest {
 
 message AddImportSegmentResponse {
   common.Status status = 1;
-  bytes channel_pos = 2;
+  bytes channel_pos = 2; // deprecated
 }
 
 message SaveImportSegmentRequest {
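
Editor's note: for readers skimming the patch, the following is a minimal stand-alone sketch (plain Go, not the Milvus API) of the idea the change implements: instead of having the DataNode subscribe to the message queue to fetch the latest MsgID, DataCoord reuses the channel checkpoint it already tracks and stamps its MsgID onto the import segment's start position and checkpoint. The names Position, checkpointStore, and fillImportPositions are hypothetical simplifications of msgpb.MsgPosition and the DataCoord meta, introduced only for illustration.

package main

import "fmt"

// Position is a simplified stand-in for msgpb.MsgPosition.
type Position struct {
	ChannelName string
	MsgID       []byte
	Timestamp   uint64
}

// checkpointStore stands in for the DataCoord meta that keeps one checkpoint per channel.
type checkpointStore map[string]*Position

// fillImportPositions copies the channel checkpoint's MsgID into the import
// segment's start position and checkpoint, mirroring what SaveImportSegment
// does after this patch. Without a checkpoint the segment cannot be
// positioned, so the call fails, matching the new nil-checkpoint guard.
func fillImportPositions(cps checkpointStore, channel string, start, checkpoint *Position) error {
	cp, ok := cps[channel]
	if !ok || cp == nil {
		return fmt.Errorf("nil checkpoint for channel %s", channel)
	}
	start.MsgID = cp.MsgID
	checkpoint.MsgID = cp.MsgID
	return nil
}

func main() {
	// The coordinator already tracks a checkpoint for "ch1"; no MQ subscription is needed.
	cps := checkpointStore{
		"ch1": {ChannelName: "ch1", MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0}, Timestamp: 1000},
	}
	start := &Position{ChannelName: "ch1", Timestamp: 1}
	checkpoint := &Position{ChannelName: "ch1", Timestamp: 1}
	if err := fillImportPositions(cps, "ch1", start, checkpoint); err != nil {
		panic(err)
	}
	fmt.Printf("start MsgID=%v, checkpoint MsgID=%v\n", start.MsgID, checkpoint.MsgID)
}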