diff --git a/internal/datanode/flow_graph_write_node.go b/internal/datanode/flow_graph_write_node.go index 517f3ed98b..657db18a9a 100644 --- a/internal/datanode/flow_graph_write_node.go +++ b/internal/datanode/flow_graph_write_node.go @@ -104,6 +104,10 @@ func (wNode *writeNode) Operate(in []Msg) []Msg { wNode.wbManager.DropChannel(wNode.channelName) } + if len(fgMsg.dropPartitions) > 0 { + wNode.wbManager.DropPartitions(wNode.channelName, fgMsg.dropPartitions) + } + // send delete msg to DeleteNode return []Msg{&res} } diff --git a/internal/datanode/metacache/actions.go b/internal/datanode/metacache/actions.go index 20d18f4acd..8f6d57b42b 100644 --- a/internal/datanode/metacache/actions.go +++ b/internal/datanode/metacache/actions.go @@ -67,6 +67,13 @@ func WithPartitionID(partitionID int64) SegmentFilter { }) } +func WithPartitionIDs(partitionIDs []int64) SegmentFilter { + return SegmentFilterFunc(func(info *SegmentInfo) bool { + idSet := typeutil.NewSet(partitionIDs...) + return idSet.Contain(info.partitionID) + }) +} + func WithSegmentState(states ...commonpb.SegmentState) SegmentFilter { set := typeutil.NewSet(states...) return SegmentFilterFunc(func(info *SegmentInfo) bool { diff --git a/internal/datanode/syncmgr/task.go b/internal/datanode/syncmgr/task.go index a2a6af3233..7137411cbf 100644 --- a/internal/datanode/syncmgr/task.go +++ b/internal/datanode/syncmgr/task.go @@ -136,6 +136,10 @@ func (t *SyncTask) Run() (err error) { var has bool t.segment, has = t.metacache.GetSegmentByID(t.segmentID) if !has { + if t.isDrop { + log.Info("segment dropped, discard sync task") + return nil + } log.Warn("failed to sync data, segment not found in metacache") err := merr.WrapErrSegmentNotFound(t.segmentID) return err @@ -198,15 +202,15 @@ func (t *SyncTask) Run() (err error) { } actions := []metacache.SegmentAction{metacache.FinishSyncing(t.batchSize)} - switch { - case t.isDrop: - actions = append(actions, metacache.UpdateState(commonpb.SegmentState_Dropped)) - case t.isFlush: + if t.isFlush { actions = append(actions, metacache.UpdateState(commonpb.SegmentState_Flushed)) } - t.metacache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(t.segment.SegmentID())) + if t.isDrop { + t.metacache.RemoveSegments(metacache.WithSegmentIDs(t.segment.SegmentID())) + } + log.Info("task done", zap.Float64("flushedSize", totalSize)) if !t.isFlush { diff --git a/internal/datanode/syncmgr/task_test.go b/internal/datanode/syncmgr/task_test.go index d03aa278d7..1f1c91abe9 100644 --- a/internal/datanode/syncmgr/task_test.go +++ b/internal/datanode/syncmgr/task_test.go @@ -244,6 +244,7 @@ func (s *SyncTaskSuite) TestRunNormal() { }) s.Run("with_delta_data", func() { + s.metacache.EXPECT().RemoveSegments(mock.Anything, mock.Anything).Return(nil).Once() task := s.getSuiteSyncTask() task.WithTimeRange(50, 100) task.WithMetaWriter(BrokerMetaWriter(s.broker, 1)) diff --git a/internal/datanode/writebuffer/manager.go b/internal/datanode/writebuffer/manager.go index b0191f7a9a..eea7070a5e 100644 --- a/internal/datanode/writebuffer/manager.go +++ b/internal/datanode/writebuffer/manager.go @@ -31,6 +31,7 @@ type BufferManager interface { RemoveChannel(channel string) // DropChannel remove write buffer and perform drop. DropChannel(channel string) + DropPartitions(channel string, partitionIDs []int64) // BufferData put data into channel write buffer. BufferData(channel string, insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error // GetCheckpoint returns checkpoint for provided channel. @@ -259,3 +260,16 @@ func (m *bufferManager) DropChannel(channel string) { buf.Close(true) } + +func (m *bufferManager) DropPartitions(channel string, partitionIDs []int64) { + m.mut.RLock() + buf, ok := m.buffers[channel] + m.mut.RUnlock() + + if !ok { + log.Warn("failed to drop partition, channel not maintained in manager", zap.String("channel", channel), zap.Int64s("partitionIDs", partitionIDs)) + return + } + + buf.DropPartitions(partitionIDs) +} diff --git a/internal/datanode/writebuffer/manager_test.go b/internal/datanode/writebuffer/manager_test.go index f8c2d7d1ff..61b3b693a2 100644 --- a/internal/datanode/writebuffer/manager_test.go +++ b/internal/datanode/writebuffer/manager_test.go @@ -194,6 +194,27 @@ func (s *ManagerSuite) TestRemoveChannel() { }) } +func (s *ManagerSuite) TestDropPartitions() { + manager := s.manager + + s.Run("drop_not_exist", func() { + s.NotPanics(func() { + manager.DropPartitions("not_exist_channel", nil) + }) + }) + + s.Run("drop_partitions", func() { + wb := NewMockWriteBuffer(s.T()) + wb.EXPECT().DropPartitions(mock.Anything).Return() + + manager.mut.Lock() + manager.buffers[s.channelName] = wb + manager.mut.Unlock() + + manager.DropPartitions(s.channelName, []int64{1}) + }) +} + func (s *ManagerSuite) TestMemoryCheck() { manager := s.manager param := paramtable.Get() diff --git a/internal/datanode/writebuffer/mock_mananger.go b/internal/datanode/writebuffer/mock_mananger.go index 084b4bec77..0410fb8f1c 100644 --- a/internal/datanode/writebuffer/mock_mananger.go +++ b/internal/datanode/writebuffer/mock_mananger.go @@ -105,6 +105,40 @@ func (_c *MockBufferManager_DropChannel_Call) RunAndReturn(run func(string)) *Mo return _c } +// DropPartitions provides a mock function with given fields: channel, partitionIDs +func (_m *MockBufferManager) DropPartitions(channel string, partitionIDs []int64) { + _m.Called(channel, partitionIDs) +} + +// MockBufferManager_DropPartitions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropPartitions' +type MockBufferManager_DropPartitions_Call struct { + *mock.Call +} + +// DropPartitions is a helper method to define mock.On call +// - channel string +// - partitionIDs []int64 +func (_e *MockBufferManager_Expecter) DropPartitions(channel interface{}, partitionIDs interface{}) *MockBufferManager_DropPartitions_Call { + return &MockBufferManager_DropPartitions_Call{Call: _e.mock.On("DropPartitions", channel, partitionIDs)} +} + +func (_c *MockBufferManager_DropPartitions_Call) Run(run func(channel string, partitionIDs []int64)) *MockBufferManager_DropPartitions_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].([]int64)) + }) + return _c +} + +func (_c *MockBufferManager_DropPartitions_Call) Return() *MockBufferManager_DropPartitions_Call { + _c.Call.Return() + return _c +} + +func (_c *MockBufferManager_DropPartitions_Call) RunAndReturn(run func(string, []int64)) *MockBufferManager_DropPartitions_Call { + _c.Call.Return(run) + return _c +} + // FlushChannel provides a mock function with given fields: ctx, channel, flushTs func (_m *MockBufferManager) FlushChannel(ctx context.Context, channel string, flushTs uint64) error { ret := _m.Called(ctx, channel, flushTs) diff --git a/internal/datanode/writebuffer/mock_write_buffer.go b/internal/datanode/writebuffer/mock_write_buffer.go index 2273a01396..033c71f56a 100644 --- a/internal/datanode/writebuffer/mock_write_buffer.go +++ b/internal/datanode/writebuffer/mock_write_buffer.go @@ -102,6 +102,39 @@ func (_c *MockWriteBuffer_Close_Call) RunAndReturn(run func(bool)) *MockWriteBuf return _c } +// DropPartitions provides a mock function with given fields: partitionIDs +func (_m *MockWriteBuffer) DropPartitions(partitionIDs []int64) { + _m.Called(partitionIDs) +} + +// MockWriteBuffer_DropPartitions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropPartitions' +type MockWriteBuffer_DropPartitions_Call struct { + *mock.Call +} + +// DropPartitions is a helper method to define mock.On call +// - partitionIDs []int64 +func (_e *MockWriteBuffer_Expecter) DropPartitions(partitionIDs interface{}) *MockWriteBuffer_DropPartitions_Call { + return &MockWriteBuffer_DropPartitions_Call{Call: _e.mock.On("DropPartitions", partitionIDs)} +} + +func (_c *MockWriteBuffer_DropPartitions_Call) Run(run func(partitionIDs []int64)) *MockWriteBuffer_DropPartitions_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].([]int64)) + }) + return _c +} + +func (_c *MockWriteBuffer_DropPartitions_Call) Return() *MockWriteBuffer_DropPartitions_Call { + _c.Call.Return() + return _c +} + +func (_c *MockWriteBuffer_DropPartitions_Call) RunAndReturn(run func([]int64)) *MockWriteBuffer_DropPartitions_Call { + _c.Call.Return(run) + return _c +} + // EvictBuffer provides a mock function with given fields: policies func (_m *MockWriteBuffer) EvictBuffer(policies ...SyncPolicy) { _va := make([]interface{}, len(policies)) diff --git a/internal/datanode/writebuffer/options.go b/internal/datanode/writebuffer/options.go index 5dd65ae5ec..c6ad5ebd92 100644 --- a/internal/datanode/writebuffer/options.go +++ b/internal/datanode/writebuffer/options.go @@ -42,6 +42,7 @@ func defaultWBOption(metacache metacache.MetaCache) *writeBufferOption { GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)), GetCompactedSegmentsPolicy(metacache), GetSealedSegmentsPolicy(metacache), + GetDroppedSegmentPolicy(metacache), }, } } diff --git a/internal/datanode/writebuffer/sync_policy.go b/internal/datanode/writebuffer/sync_policy.go index ac40f1c068..1e5169a3a3 100644 --- a/internal/datanode/writebuffer/sync_policy.go +++ b/internal/datanode/writebuffer/sync_policy.go @@ -39,6 +39,14 @@ func wrapSelectSegmentFuncPolicy(fn SelectSegmentFunc, reason string) SelectSegm } } +func GetDroppedSegmentPolicy(meta metacache.MetaCache) SyncPolicy { + return wrapSelectSegmentFuncPolicy( + func(buffers []*segmentBuffer, _ typeutil.Timestamp) []int64 { + ids := meta.GetSegmentIDsBy(metacache.WithSegmentState(commonpb.SegmentState_Dropped)) + return ids + }, "segment dropped") +} + func GetFullBufferPolicy() SyncPolicy { return wrapSelectSegmentFuncPolicy( func(buffers []*segmentBuffer, _ typeutil.Timestamp) []int64 { diff --git a/internal/datanode/writebuffer/sync_policy_test.go b/internal/datanode/writebuffer/sync_policy_test.go index e155145816..d87bc553ce 100644 --- a/internal/datanode/writebuffer/sync_policy_test.go +++ b/internal/datanode/writebuffer/sync_policy_test.go @@ -73,6 +73,15 @@ func (s *SyncPolicySuite) TestSyncStalePolicy() { s.Equal(0, len(ids), "") } +func (s *SyncPolicySuite) TestSyncDroppedPolicy() { + metacache := metacache.NewMockMetaCache(s.T()) + policy := GetDroppedSegmentPolicy(metacache) + ids := []int64{1, 2, 3} + metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return(ids) + result := policy.SelectSegments([]*segmentBuffer{}, tsoutil.ComposeTSByTime(time.Now(), 0)) + s.ElementsMatch(ids, result) +} + func (s *SyncPolicySuite) TestSealedSegmentsPolicy() { metacache := metacache.NewMockMetaCache(s.T()) policy := GetSealedSegmentsPolicy(metacache) diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go index 8456fb7ac2..9f0f93f304 100644 --- a/internal/datanode/writebuffer/write_buffer.go +++ b/internal/datanode/writebuffer/write_buffer.go @@ -46,6 +46,8 @@ type WriteBuffer interface { GetFlushTimestamp() uint64 // SealSegments is the method to perform `Sync` operation with provided options. SealSegments(ctx context.Context, segmentIDs []int64) error + // DropPartitions mark segments as Dropped of the partition + DropPartitions(partitionIDs []int64) // GetCheckpoint returns current channel checkpoint. // If there are any non-empty segment buffer, returns the earliest buffer start position. // Otherwise, returns latest buffered checkpoint. @@ -168,6 +170,13 @@ func (wb *writeBufferBase) SealSegments(ctx context.Context, segmentIDs []int64) return wb.sealSegments(ctx, segmentIDs) } +func (wb *writeBufferBase) DropPartitions(partitionIDs []int64) { + wb.mut.RLock() + defer wb.mut.RUnlock() + + wb.dropPartitions(partitionIDs) +} + func (wb *writeBufferBase) SetFlushTimestamp(flushTs uint64) { wb.flushTimestamp.Store(flushTs) } @@ -302,6 +311,14 @@ func (wb *writeBufferBase) sealSegments(_ context.Context, segmentIDs []int64) e return nil } +func (wb *writeBufferBase) dropPartitions(partitionIDs []int64) { + // mark segment dropped if partition was dropped + segIDs := wb.metaCache.GetSegmentIDsBy(metacache.WithPartitionIDs(partitionIDs)) + wb.metaCache.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Dropped), + metacache.WithSegmentIDs(segIDs...), + ) +} + func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) []*conc.Future[struct{}] { log := log.Ctx(ctx) result := make([]*conc.Future[struct{}], 0, len(segmentIDs)) @@ -549,6 +566,10 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (sy pack.WithFlush() } + if segmentInfo.State() == commonpb.SegmentState_Dropped { + pack.WithDrop() + } + metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Sub(totalMemSize) return wb.serializer.EncodeBuffer(ctx, pack) diff --git a/internal/datanode/writebuffer/write_buffer_test.go b/internal/datanode/writebuffer/write_buffer_test.go index 879c8b3fea..22b11ead94 100644 --- a/internal/datanode/writebuffer/write_buffer_test.go +++ b/internal/datanode/writebuffer/write_buffer_test.go @@ -369,6 +369,21 @@ func (s *WriteBufferSuite) TestEvictBuffer() { }) } +func (s *WriteBufferSuite) TestDropPartitions() { + wb, err := newWriteBufferBase(s.channelName, s.metacache, s.storageCache, s.syncMgr, &writeBufferOption{ + pkStatsFactory: func(vchannel *datapb.SegmentInfo) *metacache.BloomFilterSet { + return metacache.NewBloomFilterSet() + }, + }) + s.Require().NoError(err) + + segIDs := []int64{1, 2, 3} + s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return(segIDs).Once() + s.metacache.EXPECT().UpdateSegments(mock.AnythingOfType("metacache.SegmentAction"), metacache.WithSegmentIDs(segIDs...)).Return().Once() + + wb.dropPartitions([]int64{100, 101}) +} + func TestWriteBufferBase(t *testing.T) { suite.Run(t, new(WriteBufferSuite)) }