fix: Use channel cp as the dml&start position for import segments (#30107)

This PR discontinuing the subscription to the mq and, instead, employing
the channel checkpoint as the DML and starting position for the import
segments.

issue: https://github.com/milvus-io/milvus/issues/30106

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/30166/head
yihao.dai 2024-01-22 14:36:55 +08:00 committed by GitHub
parent a81d2b4780
commit 8780d65b66
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 42 additions and 66 deletions

View File

@ -3207,6 +3207,12 @@ func TestDataCoord_SaveImportSegment(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
err = svr.channelManager.Watch(context.TODO(), &channelMeta{Name: "ch1", CollectionID: 100}) err = svr.channelManager.Watch(context.TODO(), &channelMeta{Name: "ch1", CollectionID: 100})
assert.NoError(t, err) assert.NoError(t, err)
err = svr.meta.UpdateChannelCheckpoint("ch1", &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0},
Timestamp: 1000,
})
assert.NoError(t, err)
status, err := svr.SaveImportSegment(context.TODO(), &datapb.SaveImportSegmentRequest{ status, err := svr.SaveImportSegment(context.TODO(), &datapb.SaveImportSegmentRequest{
SegmentId: 100, SegmentId: 100,
@ -3230,6 +3236,16 @@ func TestDataCoord_SaveImportSegment(t *testing.T) {
SegmentID: 100, SegmentID: 100,
}, },
}, },
CheckPoints: []*datapb.CheckPoint{
{
SegmentID: 100,
Position: &msgpb.MsgPosition{
ChannelName: "ch1",
Timestamp: 1,
},
NumOfRows: int64(1),
},
},
}, },
}) })
assert.NoError(t, err) assert.NoError(t, err)

View File

@ -1528,16 +1528,16 @@ func getDiff(base, remove []int64) []int64 {
func (s *Server) SaveImportSegment(ctx context.Context, req *datapb.SaveImportSegmentRequest) (*commonpb.Status, error) { func (s *Server) SaveImportSegment(ctx context.Context, req *datapb.SaveImportSegmentRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx).With( log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionId()), zap.Int64("collectionID", req.GetCollectionId()),
)
log.Info("DataCoord putting segment to the right DataNode and saving binlog path",
zap.Int64("segmentID", req.GetSegmentId()), zap.Int64("segmentID", req.GetSegmentId()),
zap.Int64("partitionID", req.GetPartitionId()), zap.Int64("partitionID", req.GetPartitionId()),
zap.String("channelName", req.GetChannelName()), zap.String("channelName", req.GetChannelName()),
zap.Int64("# of rows", req.GetRowNum())) zap.Int64("# of rows", req.GetRowNum()),
)
log.Info("DataCoord putting segment to the right DataNode and saving binlog path")
if err := merr.CheckHealthy(s.GetStateCode()); err != nil { if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
return merr.Status(err), nil return merr.Status(err), nil
} }
resp, err := s.cluster.AddImportSegment(ctx, _, err := s.cluster.AddImportSegment(ctx,
&datapb.AddImportSegmentRequest{ &datapb.AddImportSegmentRequest{
Base: commonpbutil.NewMsgBase( Base: commonpbutil.NewMsgBase(
commonpbutil.WithTimeStamp(req.GetBase().GetTimestamp()), commonpbutil.WithTimeStamp(req.GetBase().GetTimestamp()),
@ -1554,8 +1554,15 @@ func (s *Server) SaveImportSegment(ctx context.Context, req *datapb.SaveImportSe
return merr.Status(err), nil return merr.Status(err), nil
} }
// Fill in start position message ID. // Fill in position message ID by channel checkpoint.
req.SaveBinlogPathReq.StartPositions[0].StartPosition.MsgID = resp.GetChannelPos() channelCP := s.meta.GetChannelCheckpoint(req.GetChannelName())
if channelCP == nil {
log.Warn("SaveImportSegment get nil channel checkpoint")
return merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("nil checkpoint when saving import segment, segmentID=%d, channel=%s",
req.GetSegmentId(), req.GetChannelName()))), nil
}
req.SaveBinlogPathReq.StartPositions[0].StartPosition.MsgID = channelCP.GetMsgID()
req.SaveBinlogPathReq.CheckPoints[0].Position.MsgID = channelCP.GetMsgID()
// Start saving bin log paths. // Start saving bin log paths.
rsp, err := s.SaveBinlogPaths(context.Background(), req.GetSaveBinlogPathReq()) rsp, err := s.SaveBinlogPaths(context.Background(), req.GetSaveBinlogPathReq())

View File

@ -24,7 +24,6 @@ import (
"math" "math"
"math/rand" "math/rand"
"testing" "testing"
"time"
"github.com/samber/lo" "github.com/samber/lo"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -249,21 +248,6 @@ func TestBytesReader(t *testing.T) {
assert.Equal(t, int8(100), dataInt8) assert.Equal(t, int8(100), dataInt8)
} }
func TestGetChannelLatestMsgID(t *testing.T) {
delay := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), delay)
defer cancel()
node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64)
dmlChannelName := "fake-by-dev-rootcoord-dml-channel_12345v0"
insertStream, _ := node.factory.NewMsgStream(ctx)
insertStream.AsProducer([]string{dmlChannelName})
id, err := node.getChannelLatestMsgID(ctx, dmlChannelName, 0)
assert.NoError(t, err)
assert.NotNil(t, id)
}
func TestGetChannelWithTickler(t *testing.T) { func TestGetChannelWithTickler(t *testing.T) {
channelName := "by-dev-rootcoord-dml-0" channelName := "by-dev-rootcoord-dml-0"
info := getWatchInfoByOpID(100, channelName, datapb.ChannelWatchState_ToWatch) info := getWatchInfoByOpID(100, channelName, datapb.ChannelWatchState_ToWatch)

View File

@ -45,9 +45,7 @@ import (
"github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/metricsinfo"
@ -562,19 +560,6 @@ func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImpor
}, },
}, nil }, nil
} }
// Get the current dml channel position ID, that will be used in segments start positions and end positions.
var posID []byte
err = retry.Do(ctx, func() error {
id, innerError := node.getChannelLatestMsgID(context.Background(), req.GetChannelName(), req.GetSegmentId())
posID = id
return innerError
}, retry.Attempts(30))
if err != nil {
return &datapb.AddImportSegmentResponse{
Status: merr.Status(err),
}, nil
}
// Add the new segment to the channel. // Add the new segment to the channel.
if len(ds.metacache.GetSegmentIDsBy(metacache.WithSegmentIDs(req.GetSegmentId()), metacache.WithSegmentState(commonpb.SegmentState_Flushed))) == 0 { if len(ds.metacache.GetSegmentIDsBy(metacache.WithSegmentIDs(req.GetSegmentId()), metacache.WithSegmentState(commonpb.SegmentState_Flushed))) == 0 {
log.Info("adding a new segment to channel", logFields...) log.Info("adding a new segment to channel", logFields...)
@ -607,12 +592,10 @@ func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImpor
Statslogs: req.GetStatsLog(), Statslogs: req.GetStatsLog(),
StartPosition: &msgpb.MsgPosition{ StartPosition: &msgpb.MsgPosition{
ChannelName: req.GetChannelName(), ChannelName: req.GetChannelName(),
MsgID: posID,
Timestamp: req.GetBase().GetTimestamp(), Timestamp: req.GetBase().GetTimestamp(),
}, },
DmlPosition: &msgpb.MsgPosition{ DmlPosition: &msgpb.MsgPosition{
ChannelName: req.GetChannelName(), ChannelName: req.GetChannelName(),
MsgID: posID,
Timestamp: req.GetBase().GetTimestamp(), Timestamp: req.GetBase().GetTimestamp(),
}, },
}, func(info *datapb.SegmentInfo) *metacache.BloomFilterSet { }, func(info *datapb.SegmentInfo) *metacache.BloomFilterSet {
@ -622,33 +605,10 @@ func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImpor
} }
return &datapb.AddImportSegmentResponse{ return &datapb.AddImportSegmentResponse{
Status: merr.Success(), Status: merr.Success(),
ChannelPos: posID,
}, nil }, nil
} }
func (node *DataNode) getChannelLatestMsgID(ctx context.Context, channelName string, segmentID int64) ([]byte, error) {
pChannelName := funcutil.ToPhysicalChannel(channelName)
dmlStream, err := node.factory.NewMsgStream(ctx)
if err != nil {
return nil, err
}
defer dmlStream.Close()
subName := fmt.Sprintf("datanode-%d-%s-%d", paramtable.GetNodeID(), channelName, segmentID)
log.Debug("dataSyncService register consumer for getChannelLatestMsgID",
zap.String("pChannelName", pChannelName),
zap.String("subscription", subName),
)
dmlStream.AsConsumer(ctx, []string{pChannelName}, subName, mqwrapper.SubscriptionPositionUnknown)
id, err := dmlStream.GetLatestMsgID(pChannelName)
if err != nil {
log.Error("fail to GetLatestMsgID", zap.String("pChannelName", pChannelName), zap.Error(err))
return nil, err
}
return id.Serialize(), nil
}
func assignSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest) importutil.AssignSegmentFunc { func assignSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest) importutil.AssignSegmentFunc {
return func(shardID int, partID int64) (int64, string, error) { return func(shardID int, partID int64) (int64, string, error) {
chNames := req.GetImportTask().GetChannelNames() chNames := req.GetImportTask().GetChannelNames()
@ -791,6 +751,16 @@ func saveSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest, res *rootcoo
SegmentID: segmentID, SegmentID: segmentID,
}, },
}, },
CheckPoints: []*datapb.CheckPoint{
{
SegmentID: segmentID,
Position: &msgpb.MsgPosition{
ChannelName: targetChName,
Timestamp: ts,
},
NumOfRows: rowCount,
},
},
Importing: true, Importing: true,
}, },
}) })
@ -802,7 +772,7 @@ func saveSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest, res *rootcoo
return err return err
} }
return nil return nil
}) }, retry.Attempts(60)) // about 3min
if err != nil { if err != nil {
log.Warn("failed to save import segment", zap.Error(err)) log.Warn("failed to save import segment", zap.Error(err))
return err return err

View File

@ -756,7 +756,6 @@ func (s *DataNodeServicesSuite) TestAddImportSegment() {
s.Assert().NoError(err) s.Assert().NoError(err)
s.Assert().True(merr.Ok(resp.GetStatus())) s.Assert().True(merr.Ok(resp.GetStatus()))
s.Assert().Equal("", resp.GetStatus().GetReason()) s.Assert().Equal("", resp.GetStatus().GetReason())
s.Assert().NotEqual(nil, resp.GetChannelPos())
getFlowGraphServiceAttempts = 3 getFlowGraphServiceAttempts = 3
resp, err = s.node.AddImportSegment(context.WithValue(s.ctx, ctxKey{}, ""), &datapb.AddImportSegmentRequest{ resp, err = s.node.AddImportSegment(context.WithValue(s.ctx, ctxKey{}, ""), &datapb.AddImportSegmentRequest{

View File

@ -679,7 +679,7 @@ message AddImportSegmentRequest {
message AddImportSegmentResponse { message AddImportSegmentResponse {
common.Status status = 1; common.Status status = 1;
bytes channel_pos = 2; bytes channel_pos = 2; // deprecated
} }
message SaveImportSegmentRequest { message SaveImportSegmentRequest {