enhance: [hotfix] Reject compaction task with growing segments (#28953)

Cherry-pick from 2.3
pr: #28927
See also #28924
The compaction task generated before the DataNode finishes the SaveBinlogPaths gRPC call may contain segments that are still in the Growing state.
The DataNode shall verify that every non-LevelZero segment is flushed before submitting the compaction task to the executor.
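
As an illustration only, the sketch below shows the shape of this check. It is a self-contained stand-in, not the patch itself: `SegmentType`, `segmentLookup`, and `validateCompactionSegments` are hypothetical names, whereas the real handler looks segments up via `channel.getSegment` and returns `merr.WrapErrSegmentNotFound`.

```go
package main

import (
	"errors"
	"fmt"
)

// SegmentType is a simplified stand-in for the DataNode segment type enum.
type SegmentType int

const (
	SegmentTypeGrowing SegmentType = iota
	SegmentTypeNormal
	SegmentTypeFlushed
)

var errSegmentNotFound = errors.New("segment not found")

// segmentLookup stands in for looking a segment up in the channel meta.
type segmentLookup func(segmentID int64) (SegmentType, bool)

// validateCompactionSegments rejects a compaction plan if any referenced
// segment is missing or not yet Flushed (e.g. still Growing because the
// SaveBinlogPaths call has not finished).
func validateCompactionSegments(segmentIDs []int64, lookup segmentLookup) error {
	for _, id := range segmentIDs {
		segType, ok := lookup(id)
		if !ok || segType != SegmentTypeFlushed {
			return fmt.Errorf("segment %d in flushed state not found: %w", id, errSegmentNotFound)
		}
	}
	return nil
}

func main() {
	segments := map[int64]SegmentType{
		100: SegmentTypeFlushed,
		200: SegmentTypeFlushed,
		400: SegmentTypeGrowing, // SaveBinlogPaths not finished yet
	}
	lookup := func(id int64) (SegmentType, bool) {
		t, ok := segments[id]
		return t, ok
	}

	// A plan referencing the growing segment 400 is rejected before it
	// ever reaches the compaction executor.
	if err := validateCompactionSegments([]int64{100, 400}, lookup); err != nil {
		fmt.Println("reject compaction plan:", err)
	}
}
```

Rejecting the plan at the RPC boundary keeps half-flushed segments from ever reaching the compaction executor.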

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/28972/head
congqixia 2023-12-05 09:56:33 +08:00 committed by GitHub
parent cacc018ece
commit a0c5b67d18
5 changed files with 144 additions and 24 deletions
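
The diffs below also change `ChannelMeta.mergeFlushedSegments` to return an error, which `SyncSegments` now checks, so a request referencing an unflushed segment is surfaced as an error instead of being silently skipped. A rough sketch of that control flow follows, using placeholder types (`channelMeta`, `segmentState`, `syncSegments`) rather than the real Milvus ones.

```go
package main

import "fmt"

// segmentState is a simplified placeholder for the DataNode segment type enum.
type segmentState int

const (
	stateGrowing segmentState = iota
	stateFlushed
)

// channelMeta is a placeholder for the real ChannelMeta; it only keeps the
// state of each segment.
type channelMeta struct {
	segments map[int64]segmentState
}

// mergeFlushedSegments now returns an error instead of proceeding silently
// when a compacted-from segment is still unflushed.
func (c *channelMeta) mergeFlushedSegments(compactedFrom []int64) error {
	var invalid []int64
	for _, id := range compactedFrom {
		state, ok := c.segments[id]
		if !ok {
			// missing or already-compacted segments are collected and
			// handled separately (handling elided in this sketch)
			invalid = append(invalid, id)
			continue
		}
		if state != stateFlushed {
			return fmt.Errorf("segment %d in flush state not found", id)
		}
	}
	// ... handling of invalid segments and the actual merge elided ...
	return nil
}

// syncSegments mirrors how the RPC handler surfaces the new error to the caller.
func syncSegments(c *channelMeta, planID int64, compactedFrom []int64) error {
	if err := c.mergeFlushedSegments(compactedFrom); err != nil {
		return fmt.Errorf("mergeFlushedSegments fail for plan %d: %w", planID, err)
	}
	// injectDone / success status elided
	return nil
}

func main() {
	c := &channelMeta{segments: map[int64]segmentState{
		100: stateFlushed,
		400: stateGrowing,
	}}
	if err := syncSegments(c, 1000, []int64{100, 400}); err != nil {
		fmt.Println("SyncSegments rejected:", err)
	}
}
```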


@@ -75,7 +75,7 @@ type Channel interface {
listNewSegmentsStartPositions() []*datapb.SegmentStartPosition
transferNewSegments(segmentIDs []UniqueID)
updateSegmentPKRange(segID UniqueID, ids storage.FieldData)
mergeFlushedSegments(ctx context.Context, seg *Segment, planID UniqueID, compactedFrom []UniqueID)
mergeFlushedSegments(ctx context.Context, seg *Segment, planID UniqueID, compactedFrom []UniqueID) error
listCompactedSegmentIDs() map[UniqueID][]UniqueID
listSegmentIDsToSync(ts Timestamp) []UniqueID
@@ -575,6 +575,10 @@ func (c *ChannelMeta) hasSegment(segID UniqueID, countFlushed bool) bool {
c.segMu.RLock()
defer c.segMu.RUnlock()
return c.hasSegmentInternal(segID, countFlushed)
}
func (c *ChannelMeta) hasSegmentInternal(segID UniqueID, countFlushed bool) bool {
seg, ok := c.segments[segID]
if !ok {
return false
@@ -677,7 +681,7 @@ func (c *ChannelMeta) getCollectionSchema(collID UniqueID, ts Timestamp) (*schem
return c.collSchema, nil
}
func (c *ChannelMeta) mergeFlushedSegments(ctx context.Context, seg *Segment, planID UniqueID, compactedFrom []UniqueID) {
func (c *ChannelMeta) mergeFlushedSegments(ctx context.Context, seg *Segment, planID UniqueID, compactedFrom []UniqueID) error {
log := log.Ctx(ctx).With(
zap.Int64("segmentID", seg.segmentID),
zap.Int64("collectionID", seg.collectionID),
@@ -686,11 +690,28 @@ func (c *ChannelMeta) mergeFlushedSegments(ctx context.Context, seg *Segment, pl
zap.Int64("planID", planID),
zap.String("channelName", c.channelName))
c.segMu.Lock()
defer c.segMu.Unlock()
var inValidSegments []UniqueID
for _, ID := range compactedFrom {
// no such segments in channel or the segments are unflushed.
if !c.hasSegment(ID, true) || c.hasSegment(ID, false) {
inValidSegments = append(inValidSegments, ID)
for _, segID := range compactedFrom {
seg, ok := c.segments[segID]
if !ok {
inValidSegments = append(inValidSegments, segID)
continue
}
// compacted
if !seg.isValid() {
inValidSegments = append(inValidSegments, segID)
continue
}
if seg.notFlushed() {
log.Warn("segment is not flushed, skip mergeFlushedSegments",
zap.Int64("segmentID", segID),
zap.String("segType", seg.getType().String()),
)
return merr.WrapErrSegmentNotFound(segID, "segment in flush state not found")
}
}
@@ -700,8 +721,6 @@ func (c *ChannelMeta) mergeFlushedSegments(ctx context.Context, seg *Segment, pl
}
log.Info("merge flushed segments")
c.segMu.Lock()
defer c.segMu.Unlock()
for _, ID := range compactedFrom {
// the existence of the segments has already been checked
@@ -718,6 +737,8 @@ func (c *ChannelMeta) mergeFlushedSegments(ctx context.Context, seg *Segment, pl
seg.setType(datapb.SegmentType_Flushed)
c.segments[seg.segmentID] = seg
}
return nil
}
// for tests only


@@ -679,7 +679,7 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) {
collectionID: 1,
numRows: 15,
}},
{"segment exists but not flushed", true, true, []UniqueID{1, 4}, []UniqueID{1}, &Segment{
{"segment exists but not flushed", true, false, []UniqueID{1, 4}, []UniqueID{1}, &Segment{
segmentID: 3,
collectionID: 1,
numRows: 15,


@@ -849,6 +849,11 @@ func dropVirtualChannelFunc(dsService *dataSyncService, opts ...retry.Option) fl
func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMetaFunc {
return func(pack *segmentFlushPack) {
log := log.Ctx(context.Background()).With(
zap.Int64("segmentID", pack.segmentID),
zap.Int64("collectionID", dsService.collectionID),
zap.String("vchannel", dsService.vchannelName),
)
if pack.err != nil {
log.Error("flush pack with error, DataNode quit now", zap.Error(pack.err))
// TODO silverxia change to graceful stop datanode
@@ -883,13 +888,11 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
log.Info("SaveBinlogPath",
zap.Int64("SegmentID", pack.segmentID),
zap.Int64("CollectionID", dsService.collectionID),
zap.Any("startPos", startPos),
zap.Any("checkPoints", checkPoints),
zap.Int("Length of Field2BinlogPaths", len(fieldInsert)),
zap.Int("Length of Field2Stats", len(fieldStats)),
zap.Int("Length of Field2Deltalogs", len(deltaInfos[0].GetBinlogs())),
zap.String("vChannelName", dsService.vchannelName),
)
req := &datapb.SaveBinlogPathsRequest{
@@ -916,16 +919,14 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
// Segment not found during stale segment flush. Segment might get compacted already.
// Stop retry and still proceed to the end, ignoring this error.
if !pack.flushed && errors.Is(err, merr.ErrSegmentNotFound) {
log.Warn("stale segment not found, could be compacted",
zap.Int64("segmentID", pack.segmentID))
log.Warn("stale segment not found, could be compacted")
log.Warn("failed to SaveBinlogPaths",
zap.Int64("segmentID", pack.segmentID),
zap.Error(err))
return nil
}
// meta error: the datanode is handling a virtual channel that does not belong here
if errors.IsAny(err, merr.ErrSegmentNotFound, merr.ErrChannelNotFound) {
log.Warn("meta error found, skip sync and start to drop virtual channel", zap.String("channel", dsService.vchannelName))
log.Warn("meta error found, skip sync and start to drop virtual channel")
return nil
}
@@ -940,7 +941,6 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
}, opts...)
if err != nil {
log.Warn("failed to SaveBinlogPaths",
zap.Int64("segmentID", pack.segmentID),
zap.Error(err))
// TODO change to graceful stop
panic(err)
@@ -960,6 +960,7 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
segment := dsService.channel.getSegment(req.GetSegmentID())
dsService.channel.updateSingleSegmentMemorySize(req.GetSegmentID())
segment.setSyncing(false)
// dsService.channel.saveBinlogPath(fieldStats)
log.Info("successfully save binlog")
}
}


@@ -275,6 +275,16 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
return merr.Status(merr.WrapErrChannelNotFound(req.GetChannel(), "channel is dropping")), nil
}
for _, segment := range req.GetSegmentBinlogs() {
segmentInfo := ds.channel.getSegment(segment.GetSegmentID())
if segmentInfo == nil || segmentInfo.getType() != datapb.SegmentType_Flushed {
log.Warn("compaction plan contains segment which is not flushed or missing",
zap.Int64("segmentID", segment.GetSegmentID()),
)
return merr.Status(merr.WrapErrSegmentNotFound(segment.GetSegmentID(), "segment in flushed state not found")), nil
}
}
binlogIO := &binlogIO{node.chunkManager, ds.idAllocator}
task := newCompactionTask(
node.ctx,
@@ -331,10 +341,12 @@ func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.Compac
// SyncSegments called by DataCoord, sync the compacted segments' meta between DC and DN
func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegmentsRequest) (*commonpb.Status, error) {
log.Ctx(ctx).Info("DataNode receives SyncSegments",
log := log.Ctx(ctx).With(
zap.Int64("planID", req.GetPlanID()),
zap.Int64("target segmentID", req.GetCompactedTo()),
zap.Int64s("compacted from", req.GetCompactedFrom()),
zap.Int64("targetSegmentID", req.GetCompactedTo()),
zap.Int64s("compactedFrom", req.GetCompactedFrom()),
)
log.Info("DataNode receives SyncSegments",
zap.Int64("numOfRows", req.GetNumOfRows()),
)
@@ -356,21 +368,24 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
)
for _, fromSegment := range req.GetCompactedFrom() {
log := log.With(
zap.Int64("segment", fromSegment),
)
channel, err = node.flowgraphManager.getChannel(fromSegment)
if err != nil {
log.Ctx(ctx).Warn("fail to get the channel", zap.Int64("segment", fromSegment), zap.Error(err))
log.Warn("fail to get the channel", zap.Error(err))
continue
}
ds, ok = node.flowgraphManager.getFlowgraphService(channel.getChannelName(fromSegment))
if !ok {
log.Ctx(ctx).Warn("fail to find flow graph service", zap.Int64("segment", fromSegment))
log.Warn("fail to find flow graph service")
continue
}
oneSegment = fromSegment
break
}
if oneSegment == 0 {
log.Ctx(ctx).Warn("no valid segment, maybe the request is a retry")
log.Warn("no valid segment, maybe the request is a retry")
return merr.Success(), nil
}
@@ -390,7 +405,12 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
ds.fg.Blockall()
defer ds.fg.Unblock()
channel.mergeFlushedSegments(ctx, targetSeg, req.GetPlanID(), req.GetCompactedFrom())
err = channel.mergeFlushedSegments(ctx, targetSeg, req.GetPlanID(), req.GetCompactedFrom())
if err != nil {
log.Warn("mergeFlushedSegments fail", zap.Error(err))
return merr.Status(err), nil
}
node.compactionExecutor.injectDone(req.GetPlanID(), true)
return merr.Success(), nil
}


@@ -658,6 +658,84 @@ func (s *DataNodeServicesSuite) TestAddImportSegment() {
})
}
func (s *DataNodeServicesSuite) TestCompaction() {
chanName := "fake-by-dev-rootcoord-dml-test-syncsegments-1"
err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: chanName,
UnflushedSegmentIds: []int64{},
FlushedSegmentIds: []int64{100, 200, 300},
}, nil, genTestTickler())
s.Require().NoError(err)
fg, ok := s.node.flowgraphManager.getFlowgraphService(chanName)
s.Assert().True(ok)
s1 := Segment{segmentID: 100, collectionID: 1}
s2 := Segment{segmentID: 200, collectionID: 1}
s3 := Segment{segmentID: 300, collectionID: 1}
s4 := Segment{segmentID: 400, collectionID: 1}
s1.setType(datapb.SegmentType_Flushed)
s2.setType(datapb.SegmentType_Flushed)
s3.setType(datapb.SegmentType_Flushed)
s4.setType(datapb.SegmentType_Normal)
fg.channel.(*ChannelMeta).segments = map[UniqueID]*Segment{
s1.segmentID: &s1,
s2.segmentID: &s2,
s3.segmentID: &s3,
s4.segmentID: &s4,
}
s.Run("datanode_not_serviceable", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node := &DataNode{}
node.SetSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}})
node.stateCode.Store(commonpb.StateCode_Abnormal)
resp, err := node.Compaction(ctx, &datapb.CompactionPlan{
PlanID: 1000,
})
s.NoError(err)
s.False(merr.Ok(resp))
})
s.Run("channel_not_match", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node := s.node
resp, err := node.Compaction(ctx, &datapb.CompactionPlan{
PlanID: 1000,
Channel: chanName + "another",
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{SegmentID: s1.segmentID},
},
})
s.NoError(err)
s.False(merr.Ok(resp))
})
s.Run("plan_contains_growing", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node := s.node
resp, err := node.Compaction(ctx, &datapb.CompactionPlan{
PlanID: 1000,
Channel: chanName,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{SegmentID: s4.segmentID},
},
})
s.NoError(err)
s.False(merr.Ok(resp))
})
}
func (s *DataNodeServicesSuite) TestSyncSegments() {
chanName := "fake-by-dev-rootcoord-dml-test-syncsegments-1"