fix: Reject compaction task with growing segments (#28925)

See also #28924
A compaction task generated before the DataNode finishes the `SaveBinlogPath`
gRPC call can contain segments which are still in the Growing state. The DataNode
shall verify each non-level-zero segment before submitting the compaction task
to the executor.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/28962/head
congqixia 2023-12-04 19:06:40 +08:00 committed by GitHub
parent e79c3eaa90
commit 393b1f943c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 129 additions and 0 deletions

View File

@ -235,6 +235,7 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
// Compaction handles compaction request from DataCoord
// returns status as long as compaction task enqueued or invalid
func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error) {
log := log.Ctx(ctx).With(zap.Int64("planID", req.GetPlanID()))
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
log.Warn("DataNode.Compaction failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.Error(err))
return merr.Status(err), nil
@ -251,6 +252,20 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
return merr.Status(merr.WrapErrChannelNotFound(req.GetChannel(), "channel is dropping")), nil
}
meta := ds.metacache
for _, segment := range req.GetSegmentBinlogs() {
if segment.GetLevel() == datapb.SegmentLevel_L0 {
continue
}
_, ok := meta.GetSegmentByID(segment.GetSegmentID(), metacache.WithSegmentState(commonpb.SegmentState_Flushed))
if !ok {
log.Warn("compaction plan contains segment which is not flushed",
zap.Int64("segmentID", segment.GetSegmentID()),
)
return merr.Status(merr.WrapErrSegmentNotFound(segment.GetSegmentID(), "segment with flushed state not found")), nil
}
}
var task compactor
switch req.GetType() {
case datapb.CompactionType_Level0DeleteCompaction:

View File

@ -212,6 +212,120 @@ func (s *DataNodeServicesSuite) TestGetCompactionState() {
})
}
// TestCompaction exercises the rejection paths of DataNode.Compaction.
//
// It starts a flowgraph for a test channel, adds two segments to that
// flowgraph's metacache, and then checks that Compaction returns a nil error
// together with a non-OK status in each of these situations:
//   - the node is in the Abnormal state,
//   - the plan names a channel with no registered flowgraph,
//   - the channel is marked as dropped in the compaction executor,
//   - the plan references a non-L0 segment that is not in Flushed state.
func (s *DataNodeServicesSuite) TestCompaction() {
	var (
		channel          = "by-dev-rootcoord-dml_0_100v0"
		flushedSegmentID = int64(100)
		growingSegmentID = int64(101)
	)

	collSchema := &schemapb.CollectionSchema{
		Name: "test_collection",
		Fields: []*schemapb.FieldSchema{
			{FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64},
			{FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64},
			{FieldID: common.StartOfUserFieldID, DataType: schemapb.DataType_Int64, IsPrimaryKey: true, Name: "pk"},
			{FieldID: common.StartOfUserFieldID + 1, DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
				{Key: common.DimKey, Value: "128"},
			}},
		},
	}

	// Bring up a flowgraph for the channel so Compaction can resolve it.
	vchannel := &datapb.VchannelInfo{
		CollectionID:        1,
		ChannelName:         channel,
		UnflushedSegmentIds: []int64{},
		FlushedSegmentIds:   []int64{},
	}
	s.Require().NoError(s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, vchannel, collSchema, genTestTickler()))

	fgSvc, found := s.node.flowgraphManager.GetFlowgraphService(channel)
	s.Require().True(found)

	// A mocked metacache is registered with the write buffer manager; the
	// segments themselves go into the flowgraph's own metacache below.
	mockCache := metacache.NewMockMetaCache(s.T())
	mockCache.EXPECT().Collection().Return(1).Maybe()
	mockCache.EXPECT().Schema().Return(collSchema).Maybe()
	s.node.writeBufferManager.Register(channel, mockCache, nil)

	// NOTE(review): neither segment sets an explicit State; the growing-segment
	// case below presumably relies on the metacache not treating such a
	// segment as Flushed — confirm against metacache.AddSegment.
	emptyBloomFilter := func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet {
		return metacache.NewBloomFilterSet()
	}
	for _, segID := range []int64{flushedSegmentID, growingSegmentID} {
		fgSvc.metacache.AddSegment(&datapb.SegmentInfo{
			ID:            segID,
			CollectionID:  1,
			PartitionID:   2,
			StartPosition: &msgpb.MsgPosition{},
		}, emptyBloomFilter)
	}

	// assertRejected submits plan to node and expects a nil error with a
	// non-OK status.
	assertRejected := func(node *DataNode, plan *datapb.CompactionPlan) {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		status, err := node.Compaction(ctx, plan)
		s.NoError(err)
		s.False(merr.Ok(status))
	}

	s.Run("service_not_ready", func() {
		abnormal := &DataNode{}
		abnormal.UpdateStateCode(commonpb.StateCode_Abnormal)

		assertRejected(abnormal, &datapb.CompactionPlan{
			PlanID:  1000,
			Channel: channel,
		})
	})

	s.Run("channel_not_match", func() {
		assertRejected(s.node, &datapb.CompactionPlan{
			PlanID:  1000,
			Channel: channel + "other",
		})
	})

	s.Run("channel_dropped", func() {
		dropped := s.node.compactionExecutor.dropped
		dropped.Insert(channel)
		// Undo the marker so later subtests see the channel as live again.
		defer dropped.Remove(channel)

		assertRejected(s.node, &datapb.CompactionPlan{
			PlanID:  1000,
			Channel: channel,
		})
	})

	s.Run("compact_growing_segment", func() {
		assertRejected(s.node, &datapb.CompactionPlan{
			PlanID:  1000,
			Channel: channel,
			SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
				// The L0 entry is skipped by the flushed-state check.
				{SegmentID: 102, Level: datapb.SegmentLevel_L0},
				{SegmentID: growingSegmentID, Level: datapb.SegmentLevel_L1},
			},
		})
	})
}
func (s *DataNodeServicesSuite) TestFlushSegments() {
dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-FlushSegments"
schema := &schemapb.CollectionSchema{