fix: [2.4] Sync dropped segment for dropped partition (#33332)

See also: #33330
pr: #33331

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2024-05-27 17:57:43 +08:00 committed by GitHub
parent cbeb6c169d
commit 07b995fea4
13 changed files with 177 additions and 5 deletions
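
Summary of the change: when a flowgraph message carries a non-empty dropPartitions list, the write node now forwards the partition IDs to the write-buffer manager. The channel's write buffer looks up every buffered segment of those partitions via the new WithPartitionIDs filter and marks them as Dropped in the metacache. A new GetDroppedSegmentPolicy sync policy then selects all Dropped segments on the next sync pass, and the sync task syncs their remaining data, removes them from the metacache, and tolerates segments that have already disappeared.

The following is a small, self-contained sketch of that mechanism for orientation only; the types and names are simplified stand-ins, not the actual Milvus APIs (see the hunks below for the real code):

package main

import "fmt"

type segState int

const (
    stateGrowing segState = iota
    stateDropped
)

type segment struct {
    id          int64
    partitionID int64
    state       segState
}

// metaCache is a stand-in for the datanode metacache, keyed by segment ID.
type metaCache map[int64]*segment

// dropPartitions mirrors the idea of writeBufferBase.dropPartitions:
// mark every segment belonging to a dropped partition as Dropped.
func (mc metaCache) dropPartitions(partitionIDs []int64) {
    dropped := make(map[int64]struct{}, len(partitionIDs))
    for _, id := range partitionIDs {
        dropped[id] = struct{}{}
    }
    for _, seg := range mc {
        if _, ok := dropped[seg.partitionID]; ok {
            seg.state = stateDropped
        }
    }
}

// droppedSegmentPolicy mirrors the idea of GetDroppedSegmentPolicy:
// select every segment already marked Dropped for the next sync pass.
func (mc metaCache) droppedSegmentPolicy() []int64 {
    var ids []int64
    for _, seg := range mc {
        if seg.state == stateDropped {
            ids = append(ids, seg.id)
        }
    }
    return ids
}

func main() {
    mc := metaCache{
        1: {id: 1, partitionID: 100},
        2: {id: 2, partitionID: 100},
        3: {id: 3, partitionID: 200},
    }
    mc.dropPartitions([]int64{100})
    // Prints the IDs of segments 1 and 2 (map order may vary).
    fmt.Println("segments to sync as dropped:", mc.droppedSegmentPolicy())
}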


@@ -104,6 +104,10 @@ func (wNode *writeNode) Operate(in []Msg) []Msg {
        wNode.wbManager.DropChannel(wNode.channelName)
    }
    if len(fgMsg.dropPartitions) > 0 {
        wNode.wbManager.DropPartitions(wNode.channelName, fgMsg.dropPartitions)
    }
    // send delete msg to DeleteNode
    return []Msg{&res}
}


@@ -67,6 +67,13 @@ func WithPartitionID(partitionID int64) SegmentFilter {
    })
}

func WithPartitionIDs(partitionIDs []int64) SegmentFilter {
    return SegmentFilterFunc(func(info *SegmentInfo) bool {
        idSet := typeutil.NewSet(partitionIDs...)
        return idSet.Contain(info.partitionID)
    })
}

func WithSegmentState(states ...commonpb.SegmentState) SegmentFilter {
    set := typeutil.NewSet(states...)
    return SegmentFilterFunc(func(info *SegmentInfo) bool {
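
A side note on the filter above, not a change made in this patch: typeutil.NewSet is built inside the closure, so the set is rebuilt for every segment the filter is applied to. A possible variant that hoists the set out of the closure, mirroring how WithSegmentState below does it, would look roughly like this:

func WithPartitionIDs(partitionIDs []int64) SegmentFilter {
    idSet := typeutil.NewSet(partitionIDs...)
    return SegmentFilterFunc(func(info *SegmentInfo) bool {
        return idSet.Contain(info.partitionID)
    })
}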


@@ -136,6 +136,10 @@ func (t *SyncTask) Run() (err error) {
    var has bool
    t.segment, has = t.metacache.GetSegmentByID(t.segmentID)
    if !has {
        if t.isDrop {
            log.Info("segment dropped, discard sync task")
            return nil
        }
        log.Warn("failed to sync data, segment not found in metacache")
        err := merr.WrapErrSegmentNotFound(t.segmentID)
        return err
@@ -198,15 +202,15 @@
    }
    actions := []metacache.SegmentAction{metacache.FinishSyncing(t.batchSize)}
-   switch {
-   case t.isDrop:
-       actions = append(actions, metacache.UpdateState(commonpb.SegmentState_Dropped))
-   case t.isFlush:
+   if t.isFlush {
        actions = append(actions, metacache.UpdateState(commonpb.SegmentState_Flushed))
    }
    t.metacache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(t.segment.SegmentID()))
+   if t.isDrop {
+       t.metacache.RemoveSegments(metacache.WithSegmentIDs(t.segment.SegmentID()))
+   }
    log.Info("task done", zap.Float64("flushedSize", totalSize))
    if !t.isFlush {


@@ -244,6 +244,7 @@ func (s *SyncTaskSuite) TestRunNormal() {
    })
    s.Run("with_delta_data", func() {
        s.metacache.EXPECT().RemoveSegments(mock.Anything, mock.Anything).Return(nil).Once()
        task := s.getSuiteSyncTask()
        task.WithTimeRange(50, 100)
        task.WithMetaWriter(BrokerMetaWriter(s.broker, 1))


@@ -31,6 +31,7 @@ type BufferManager interface {
    RemoveChannel(channel string)
    // DropChannel remove write buffer and perform drop.
    DropChannel(channel string)
    DropPartitions(channel string, partitionIDs []int64)
    // BufferData put data into channel write buffer.
    BufferData(channel string, insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error
    // GetCheckpoint returns checkpoint for provided channel.
@@ -259,3 +260,16 @@ func (m *bufferManager) DropChannel(channel string) {
    buf.Close(true)
}

func (m *bufferManager) DropPartitions(channel string, partitionIDs []int64) {
    m.mut.RLock()
    buf, ok := m.buffers[channel]
    m.mut.RUnlock()
    if !ok {
        log.Warn("failed to drop partition, channel not maintained in manager", zap.String("channel", channel), zap.Int64s("partitionIDs", partitionIDs))
        return
    }
    buf.DropPartitions(partitionIDs)
}
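
For context, the only caller of the new BufferManager.DropPartitions in this patch is the flowgraph write node (first hunk above). The manager holds its read lock only for the buffer lookup; the actual marking happens later under the write buffer's own lock (see the write_buffer.go hunk further down). The caller side, as it appears earlier in this diff:

if len(fgMsg.dropPartitions) > 0 {
    wNode.wbManager.DropPartitions(wNode.channelName, fgMsg.dropPartitions)
}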


@@ -194,6 +194,27 @@ func (s *ManagerSuite) TestRemoveChannel() {
    })
}

func (s *ManagerSuite) TestDropPartitions() {
    manager := s.manager
    s.Run("drop_not_exist", func() {
        s.NotPanics(func() {
            manager.DropPartitions("not_exist_channel", nil)
        })
    })
    s.Run("drop_partitions", func() {
        wb := NewMockWriteBuffer(s.T())
        wb.EXPECT().DropPartitions(mock.Anything).Return()
        manager.mut.Lock()
        manager.buffers[s.channelName] = wb
        manager.mut.Unlock()
        manager.DropPartitions(s.channelName, []int64{1})
    })
}

func (s *ManagerSuite) TestMemoryCheck() {
    manager := s.manager
    param := paramtable.Get()


@@ -105,6 +105,40 @@ func (_c *MockBufferManager_DropChannel_Call) RunAndReturn(run func(string)) *Mo
    return _c
}

// DropPartitions provides a mock function with given fields: channel, partitionIDs
func (_m *MockBufferManager) DropPartitions(channel string, partitionIDs []int64) {
    _m.Called(channel, partitionIDs)
}

// MockBufferManager_DropPartitions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropPartitions'
type MockBufferManager_DropPartitions_Call struct {
    *mock.Call
}

// DropPartitions is a helper method to define mock.On call
// - channel string
// - partitionIDs []int64
func (_e *MockBufferManager_Expecter) DropPartitions(channel interface{}, partitionIDs interface{}) *MockBufferManager_DropPartitions_Call {
    return &MockBufferManager_DropPartitions_Call{Call: _e.mock.On("DropPartitions", channel, partitionIDs)}
}

func (_c *MockBufferManager_DropPartitions_Call) Run(run func(channel string, partitionIDs []int64)) *MockBufferManager_DropPartitions_Call {
    _c.Call.Run(func(args mock.Arguments) {
        run(args[0].(string), args[1].([]int64))
    })
    return _c
}

func (_c *MockBufferManager_DropPartitions_Call) Return() *MockBufferManager_DropPartitions_Call {
    _c.Call.Return()
    return _c
}

func (_c *MockBufferManager_DropPartitions_Call) RunAndReturn(run func(string, []int64)) *MockBufferManager_DropPartitions_Call {
    _c.Call.Return(run)
    return _c
}

// FlushChannel provides a mock function with given fields: ctx, channel, flushTs
func (_m *MockBufferManager) FlushChannel(ctx context.Context, channel string, flushTs uint64) error {
    ret := _m.Called(ctx, channel, flushTs)


@@ -102,6 +102,39 @@ func (_c *MockWriteBuffer_Close_Call) RunAndReturn(run func(bool)) *MockWriteBuf
    return _c
}

// DropPartitions provides a mock function with given fields: partitionIDs
func (_m *MockWriteBuffer) DropPartitions(partitionIDs []int64) {
    _m.Called(partitionIDs)
}

// MockWriteBuffer_DropPartitions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropPartitions'
type MockWriteBuffer_DropPartitions_Call struct {
    *mock.Call
}

// DropPartitions is a helper method to define mock.On call
// - partitionIDs []int64
func (_e *MockWriteBuffer_Expecter) DropPartitions(partitionIDs interface{}) *MockWriteBuffer_DropPartitions_Call {
    return &MockWriteBuffer_DropPartitions_Call{Call: _e.mock.On("DropPartitions", partitionIDs)}
}

func (_c *MockWriteBuffer_DropPartitions_Call) Run(run func(partitionIDs []int64)) *MockWriteBuffer_DropPartitions_Call {
    _c.Call.Run(func(args mock.Arguments) {
        run(args[0].([]int64))
    })
    return _c
}

func (_c *MockWriteBuffer_DropPartitions_Call) Return() *MockWriteBuffer_DropPartitions_Call {
    _c.Call.Return()
    return _c
}

func (_c *MockWriteBuffer_DropPartitions_Call) RunAndReturn(run func([]int64)) *MockWriteBuffer_DropPartitions_Call {
    _c.Call.Return(run)
    return _c
}

// EvictBuffer provides a mock function with given fields: policies
func (_m *MockWriteBuffer) EvictBuffer(policies ...SyncPolicy) {
    _va := make([]interface{}, len(policies))


@@ -42,6 +42,7 @@ func defaultWBOption(metacache metacache.MetaCache) *writeBufferOption {
            GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
            GetCompactedSegmentsPolicy(metacache),
            GetSealedSegmentsPolicy(metacache),
            GetDroppedSegmentPolicy(metacache),
        },
    }
}


@@ -39,6 +39,14 @@ func wrapSelectSegmentFuncPolicy(fn SelectSegmentFunc, reason string) SelectSegm
    }
}

func GetDroppedSegmentPolicy(meta metacache.MetaCache) SyncPolicy {
    return wrapSelectSegmentFuncPolicy(
        func(buffers []*segmentBuffer, _ typeutil.Timestamp) []int64 {
            ids := meta.GetSegmentIDsBy(metacache.WithSegmentState(commonpb.SegmentState_Dropped))
            return ids
        }, "segment dropped")
}

func GetFullBufferPolicy() SyncPolicy {
    return wrapSelectSegmentFuncPolicy(
        func(buffers []*segmentBuffer, _ typeutil.Timestamp) []int64 {


@@ -73,6 +73,15 @@ func (s *SyncPolicySuite) TestSyncStalePolicy() {
    s.Equal(0, len(ids), "")
}

func (s *SyncPolicySuite) TestSyncDroppedPolicy() {
    metacache := metacache.NewMockMetaCache(s.T())
    policy := GetDroppedSegmentPolicy(metacache)
    ids := []int64{1, 2, 3}
    metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return(ids)
    result := policy.SelectSegments([]*segmentBuffer{}, tsoutil.ComposeTSByTime(time.Now(), 0))
    s.ElementsMatch(ids, result)
}

func (s *SyncPolicySuite) TestSealedSegmentsPolicy() {
    metacache := metacache.NewMockMetaCache(s.T())
    policy := GetSealedSegmentsPolicy(metacache)


@@ -46,6 +46,8 @@ type WriteBuffer interface {
    GetFlushTimestamp() uint64
    // SealSegments is the method to perform `Sync` operation with provided options.
    SealSegments(ctx context.Context, segmentIDs []int64) error
    // DropPartitions marks the segments belonging to the given partitions as Dropped.
    DropPartitions(partitionIDs []int64)
    // GetCheckpoint returns current channel checkpoint.
    // If there are any non-empty segment buffer, returns the earliest buffer start position.
    // Otherwise, returns latest buffered checkpoint.
@@ -168,6 +170,13 @@ func (wb *writeBufferBase) SealSegments(ctx context.Context, segmentIDs []int64)
    return wb.sealSegments(ctx, segmentIDs)
}

func (wb *writeBufferBase) DropPartitions(partitionIDs []int64) {
    wb.mut.RLock()
    defer wb.mut.RUnlock()
    wb.dropPartitions(partitionIDs)
}

func (wb *writeBufferBase) SetFlushTimestamp(flushTs uint64) {
    wb.flushTimestamp.Store(flushTs)
}
@@ -302,6 +311,14 @@ func (wb *writeBufferBase) sealSegments(_ context.Context, segmentIDs []int64) e
    return nil
}

func (wb *writeBufferBase) dropPartitions(partitionIDs []int64) {
    // mark segments as dropped if their partition was dropped
    segIDs := wb.metaCache.GetSegmentIDsBy(metacache.WithPartitionIDs(partitionIDs))
    wb.metaCache.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Dropped),
        metacache.WithSegmentIDs(segIDs...),
    )
}

func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) []*conc.Future[struct{}] {
    log := log.Ctx(ctx)
    result := make([]*conc.Future[struct{}], 0, len(segmentIDs))
@@ -549,6 +566,10 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (sy
        pack.WithFlush()
    }
    if segmentInfo.State() == commonpb.SegmentState_Dropped {
        pack.WithDrop()
    }
    metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Sub(totalMemSize)
    return wb.serializer.EncodeBuffer(ctx, pack)


@@ -369,6 +369,21 @@ func (s *WriteBufferSuite) TestEvictBuffer() {
    })
}

func (s *WriteBufferSuite) TestDropPartitions() {
    wb, err := newWriteBufferBase(s.channelName, s.metacache, s.storageCache, s.syncMgr, &writeBufferOption{
        pkStatsFactory: func(vchannel *datapb.SegmentInfo) *metacache.BloomFilterSet {
            return metacache.NewBloomFilterSet()
        },
    })
    s.Require().NoError(err)
    segIDs := []int64{1, 2, 3}
    s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return(segIDs).Once()
    s.metacache.EXPECT().UpdateSegments(mock.AnythingOfType("metacache.SegmentAction"), metacache.WithSegmentIDs(segIDs...)).Return().Once()
    wb.dropPartitions([]int64{100, 101})
}

func TestWriteBufferBase(t *testing.T) {
    suite.Run(t, new(WriteBufferSuite))
}