diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index c620aa1c1b..1defa7fc8c 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -79,6 +79,11 @@ type ShardDelegator interface { SyncTargetVersion(newVersion int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition) GetTargetVersion() int64 + // manage exclude segments + AddExcludedSegments(excludeInfo map[int64]uint64) + VerifyExcludedSegments(segmentID int64, ts uint64) bool + TryCleanExcludedSegments(ts uint64) + // control Serviceable() bool Start() @@ -121,6 +126,11 @@ type shardDelegator struct { queryHook optimizers.QueryHook partitionStats map[UniqueID]*storage.PartitionStatsSnapshot chunkManager storage.ChunkManager + + excludedSegments *ExcludedSegments + // cause growing segment meta has been stored in segmentManager/distribution/pkOracle/excludeSegments + // in order to make add/remove growing be atomic, need lock before modify these meta info + growingSegmentLock sync.RWMutex } // getLogger returns the zap logger with pre-defined shard attributes. 
@@ -836,26 +846,29 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni sizePerBlock := paramtable.Get().QueryNodeCfg.DeleteBufferBlockSize.GetAsInt64() log.Info("Init delete cache with list delete buffer", zap.Int64("sizePerBlock", sizePerBlock), zap.Time("startTime", tsoutil.PhysicalTime(startTs))) + excludedSegments := NewExcludedSegments(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.GetAsDuration(time.Second)) + sd := &shardDelegator{ - collectionID: collectionID, - replicaID: replicaID, - vchannelName: channel, - version: version, - collection: collection, - segmentManager: manager.Segment, - workerManager: workerManager, - lifetime: lifetime.NewLifetime(lifetime.Initializing), - distribution: NewDistribution(), - level0Deletions: make(map[int64]*storage.DeleteData), - deleteBuffer: deletebuffer.NewListDeleteBuffer[*deletebuffer.Item](startTs, sizePerBlock), - pkOracle: pkoracle.NewPkOracle(), - tsafeManager: tsafeManager, - latestTsafe: atomic.NewUint64(startTs), - loader: loader, - factory: factory, - queryHook: queryHook, - chunkManager: chunkManager, - partitionStats: make(map[UniqueID]*storage.PartitionStatsSnapshot), + collectionID: collectionID, + replicaID: replicaID, + vchannelName: channel, + version: version, + collection: collection, + segmentManager: manager.Segment, + workerManager: workerManager, + lifetime: lifetime.NewLifetime(lifetime.Initializing), + distribution: NewDistribution(), + level0Deletions: make(map[int64]*storage.DeleteData), + deleteBuffer: deletebuffer.NewListDeleteBuffer[*deletebuffer.Item](startTs, sizePerBlock), + pkOracle: pkoracle.NewPkOracle(), + tsafeManager: tsafeManager, + latestTsafe: atomic.NewUint64(startTs), + loader: loader, + factory: factory, + queryHook: queryHook, + chunkManager: chunkManager, + partitionStats: make(map[UniqueID]*storage.PartitionStatsSnapshot), + excludedSegments: excludedSegments, } m := sync.Mutex{} sd.tsCond = sync.NewCond(&m) diff --git 
a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index c63717dcdf..79e01d9f8e 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -86,6 +86,7 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) { log := sd.getLogger(context.Background()) for segmentID, insertData := range insertRecords { growing := sd.segmentManager.GetGrowing(segmentID) + newGrowingSegment := false if growing == nil { var err error // TODO: It's a wired implementation that growing segment have load info. @@ -111,6 +112,7 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) { zap.Error(err)) panic(err) } + newGrowingSegment = true } err := growing.Insert(context.Background(), insertData.RowIDs, insertData.Timestamps, insertData.InsertRecord) @@ -136,17 +138,29 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) { ).Add(float64(len(insertData.RowIDs))) growing.UpdateBloomFilter(insertData.PrimaryKeys) - if !sd.pkOracle.Exists(growing, paramtable.GetNodeID()) { - // register created growing segment after insert, avoid to add empty growing to delegator - sd.pkOracle.Register(growing, paramtable.GetNodeID()) - sd.segmentManager.Put(segments.SegmentTypeGrowing, growing) - sd.addGrowing(SegmentEntry{ - NodeID: paramtable.GetNodeID(), - SegmentID: segmentID, - PartitionID: insertData.PartitionID, - Version: 0, - TargetVersion: initialTargetVersion, - }) + if newGrowingSegment { + sd.growingSegmentLock.Lock() + // check whether segment has been excluded + if ok := sd.VerifyExcludedSegments(segmentID, typeutil.MaxTimestamp); !ok { + log.Warn("try to insert data into released segment, skip it", zap.Int64("segmentID", segmentID)) + sd.growingSegmentLock.Unlock() + growing.Release() + continue + } + + if !sd.pkOracle.Exists(growing, paramtable.GetNodeID()) { + // register created growing segment after insert, 
avoid to add empty growing to delegator
+				sd.pkOracle.Register(growing, paramtable.GetNodeID())
+				sd.segmentManager.Put(segments.SegmentTypeGrowing, growing)
+				sd.addGrowing(SegmentEntry{
+					NodeID:        paramtable.GetNodeID(),
+					SegmentID:     segmentID,
+					PartitionID:   insertData.PartitionID,
+					Version:       0,
+					TargetVersion: initialTargetVersion,
+				})
+			}
+			sd.growingSegmentLock.Unlock()
 		}
 
 		log.Debug("insert into growing segment",
@@ -819,6 +833,16 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele
 		sealed = lo.Map(req.GetSegmentIDs(), convertSealed)
 	}
 
+	if len(growing) > 0 {
+		sd.growingSegmentLock.Lock()
+	}
+	// when we try to release a segment, add it to pipeline's exclude list first
+	// in case of consumed it's growing segment again
+	droppedInfos := lo.SliceToMap(req.GetSegmentIDs(), func(id int64) (int64, uint64) {
+		return id, typeutil.MaxTimestamp
+	})
+	sd.AddExcludedSegments(droppedInfos)
+
 	signal := sd.distribution.RemoveDistributions(sealed, growing)
 	// wait cleared signal
 	<-signal
@@ -836,22 +860,29 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele
 		)
 	}
 
+	var releaseErr error
 	if !force {
 		worker, err := sd.workerManager.GetWorker(ctx, targetNodeID)
 		if err != nil {
-			log.Warn("delegator failed to find worker",
-				zap.Error(err),
-			)
-			return err
-		}
-		req.Base.TargetID = targetNodeID
-		err = worker.ReleaseSegments(ctx, req)
-		if err != nil {
-			log.Warn("worker failed to release segments",
-				zap.Error(err),
-			)
+			log.Warn("delegator failed to find worker", zap.Error(err))
+			// worker is nil on lookup failure; skip the release call instead of
+			// dereferencing it, and still fall through to the unlock below
+			releaseErr = err
+		} else {
+			req.Base.TargetID = targetNodeID
+			err = worker.ReleaseSegments(ctx, req)
+			if err != nil {
+				log.Warn("worker failed to release segments", zap.Error(err))
+				releaseErr = err
+			}
 		}
-		return err
+	}
+	if len(growing) > 0 {
+		sd.growingSegmentLock.Unlock()
+	}
+
+	if releaseErr != nil {
+		return releaseErr
 	}
 
 	if hasLevel0 {
@@ -907,3 +938,17 @@ func (sd *shardDelegator) SyncTargetVersion(newVersion int64, growingInTarget []
 
 func (sd *shardDelegator) GetTargetVersion() int64 {
 	return sd.distribution.getTargetVersion()
 }
+
+func (sd *shardDelegator) AddExcludedSegments(excludeInfo map[int64]uint64) { + sd.excludedSegments.Insert(excludeInfo) +} + +func (sd *shardDelegator) VerifyExcludedSegments(segmentID int64, ts uint64) bool { + return sd.excludedSegments.Verify(segmentID, ts) +} + +func (sd *shardDelegator) TryCleanExcludedSegments(ts uint64) { + if sd.excludedSegments.ShouldClean() { + sd.excludedSegments.CleanInvalid(ts) + } +} diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index 51b71f21f3..8754d53c50 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -21,6 +21,7 @@ import ( "path" "strconv" "testing" + "time" bloom "github.com/bits-and-blooms/bloom/v3" "github.com/cockroachdb/errors" @@ -69,6 +70,11 @@ type DelegatorDataSuite struct { func (s *DelegatorDataSuite) SetupSuite() { paramtable.Init() paramtable.SetNodeID(1) + paramtable.Get().Save(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.Key, "1") +} + +func (s *DelegatorDataSuite) TearDownSuite() { + paramtable.Get().Reset(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.Key) } func (s *DelegatorDataSuite) SetupTest() { @@ -1147,6 +1153,20 @@ func (s *DelegatorDataSuite) TestReadDeleteFromMsgstream() { s.Equal(2, len(result.Pks)) } +func (s *DelegatorDataSuite) TestDelegatorData_ExcludeSegments() { + s.delegator.AddExcludedSegments(map[int64]uint64{ + 1: 3, + }) + + s.False(s.delegator.VerifyExcludedSegments(1, 1)) + s.True(s.delegator.VerifyExcludedSegments(1, 5)) + + time.Sleep(time.Second * 1) + s.delegator.TryCleanExcludedSegments(4) + s.True(s.delegator.VerifyExcludedSegments(1, 1)) + s.True(s.delegator.VerifyExcludedSegments(1, 5)) +} + func TestDelegatorDataSuite(t *testing.T) { suite.Run(t, new(DelegatorDataSuite)) } diff --git a/internal/querynodev2/delegator/exclude_info.go b/internal/querynodev2/delegator/exclude_info.go new file mode 100644 index 
0000000000..72d0354e34 --- /dev/null +++ b/internal/querynodev2/delegator/exclude_info.go @@ -0,0 +1,88 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package delegator + +import ( + "sync" + "time" + + "go.uber.org/atomic" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/log" +) + +type ExcludedSegments struct { + mu sync.RWMutex + segments map[int64]uint64 // segmentID -> Excluded TS + lastClean atomic.Time + cleanInterval time.Duration +} + +func NewExcludedSegments(cleanInterval time.Duration) *ExcludedSegments { + return &ExcludedSegments{ + segments: make(map[int64]uint64), + cleanInterval: cleanInterval, + } +} + +func (s *ExcludedSegments) Insert(excludeInfo map[int64]uint64) { + s.mu.Lock() + defer s.mu.Unlock() + + for segmentID, ts := range excludeInfo { + log.Debug("add exclude info", + zap.Int64("segmentID", segmentID), + zap.Uint64("ts", ts), + ) + s.segments[segmentID] = ts + } +} + +// return false if segment has been excluded +func (s *ExcludedSegments) Verify(segmentID int64, ts uint64) bool { + s.mu.RLock() + defer s.mu.RUnlock() + if excludeTs, ok := s.segments[segmentID]; ok && ts <= excludeTs { + return false + } + return true +} + +func (s *ExcludedSegments) CleanInvalid(ts uint64) { + 
s.mu.Lock() + defer s.mu.Unlock() + + invalidExcludedInfos := []int64{} + for segmentsID, excludeTs := range s.segments { + if excludeTs < ts { + invalidExcludedInfos = append(invalidExcludedInfos, segmentsID) + } + } + + for _, segmentID := range invalidExcludedInfos { + delete(s.segments, segmentID) + log.Info("remove segment from exclude info", zap.Int64("segmentID", segmentID)) + } + s.lastClean.Store(time.Now()) +} + +func (s *ExcludedSegments) ShouldClean() bool { + s.mu.RLock() + defer s.mu.RUnlock() + return time.Since(s.lastClean.Load()) > s.cleanInterval +} diff --git a/internal/querynodev2/delegator/exclude_info_test.go b/internal/querynodev2/delegator/exclude_info_test.go new file mode 100644 index 0000000000..b04231cbd2 --- /dev/null +++ b/internal/querynodev2/delegator/exclude_info_test.go @@ -0,0 +1,56 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package delegator
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/suite"
+)
+
+type ExcludedInfoSuite struct {
+	suite.Suite
+
+	excludedSegments *ExcludedSegments
+}
+
+func (s *ExcludedInfoSuite) SetupSuite() {
+	s.excludedSegments = NewExcludedSegments(1 * time.Second)
+}
+
+func (s *ExcludedInfoSuite) TestBasic() {
+	s.excludedSegments.Insert(map[int64]uint64{
+		1: 3,
+	})
+
+	s.False(s.excludedSegments.Verify(1, 1))
+	s.True(s.excludedSegments.Verify(1, 4))
+
+	time.Sleep(1 * time.Second)
+
+	s.True(s.excludedSegments.ShouldClean())
+	s.excludedSegments.CleanInvalid(5)
+	s.Len(s.excludedSegments.segments, 0)
+
+	s.True(s.excludedSegments.Verify(1, 1))
+	s.True(s.excludedSegments.Verify(1, 4))
+}
+
+func TestExcludedInfoSuite(t *testing.T) {
+	suite.Run(t, new(ExcludedInfoSuite))
+}
diff --git a/internal/querynodev2/delegator/mock_delegator.go b/internal/querynodev2/delegator/mock_delegator.go
index ae0191b443..383dc5bc78 100644
--- a/internal/querynodev2/delegator/mock_delegator.go
+++ b/internal/querynodev2/delegator/mock_delegator.go
@@ -253,6 +253,39 @@ func (_c *MockShardDelegator_GetTargetVersion_Call) RunAndReturn(run func() int6
 	return _c
 }
 
+// AddExcludedSegments provides a mock function with given fields: excludeInfo
+func (_m *MockShardDelegator) AddExcludedSegments(excludeInfo map[int64]uint64) {
+	_m.Called(excludeInfo)
+}
+
+// MockShardDelegator_AddExcludedSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddExcludedSegments'
+type MockShardDelegator_AddExcludedSegments_Call struct {
+	*mock.Call
+}
+
+// AddExcludedSegments is a helper method to define mock.On call
+//   - excludeInfo map[int64]uint64
+func (_e *MockShardDelegator_Expecter) AddExcludedSegments(excludeInfo interface{}) *MockShardDelegator_AddExcludedSegments_Call {
+	return &MockShardDelegator_AddExcludedSegments_Call{Call: _e.mock.On("AddExcludedSegments", excludeInfo)}
+}
+
+func (_c 
*MockShardDelegator_AddExcludedSegments_Call) Run(run func(excludeInfo map[int64]uint64)) *MockShardDelegator_AddExcludedSegments_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(map[int64]uint64)) + }) + return _c +} + +func (_c *MockShardDelegator_AddExcludedSegments_Call) Return() *MockShardDelegator_AddExcludedSegments_Call { + _c.Call.Return() + return _c +} + +func (_c *MockShardDelegator_AddExcludedSegments_Call) RunAndReturn(run func(map[int64]uint64)) *MockShardDelegator_AddExcludedSegments_Call { + _c.Call.Return(run) + return _c +} + // LoadGrowing provides a mock function with given fields: ctx, infos, version func (_m *MockShardDelegator) LoadGrowing(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error { ret := _m.Called(ctx, infos, version) @@ -763,6 +796,82 @@ func (_c *MockShardDelegator_SyncTargetVersion_Call) RunAndReturn(run func(int64 return _c } +// TryCleanExcludedSegments provides a mock function with given fields: ts +func (_m *MockShardDelegator) TryCleanExcludedSegments(ts uint64) { + _m.Called(ts) +} + +// MockShardDelegator_TryCleanExcludedSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TryCleanExcludedSegments' +type MockShardDelegator_TryCleanExcludedSegments_Call struct { + *mock.Call +} + +// TryCleanExcludedSegments is a helper method to define mock.On call +// - ts uint64 +func (_e *MockShardDelegator_Expecter) TryCleanExcludedSegments(ts interface{}) *MockShardDelegator_TryCleanExcludedSegments_Call { + return &MockShardDelegator_TryCleanExcludedSegments_Call{Call: _e.mock.On("TryCleanExcludedSegments", ts)} +} + +func (_c *MockShardDelegator_TryCleanExcludedSegments_Call) Run(run func(ts uint64)) *MockShardDelegator_TryCleanExcludedSegments_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(uint64)) + }) + return _c +} + +func (_c *MockShardDelegator_TryCleanExcludedSegments_Call) Return() 
*MockShardDelegator_TryCleanExcludedSegments_Call { + _c.Call.Return() + return _c +} + +func (_c *MockShardDelegator_TryCleanExcludedSegments_Call) RunAndReturn(run func(uint64)) *MockShardDelegator_TryCleanExcludedSegments_Call { + _c.Call.Return(run) + return _c +} + +// VerifyExcludedSegments provides a mock function with given fields: segmentID, ts +func (_m *MockShardDelegator) VerifyExcludedSegments(segmentID int64, ts uint64) bool { + ret := _m.Called(segmentID, ts) + + var r0 bool + if rf, ok := ret.Get(0).(func(int64, uint64) bool); ok { + r0 = rf(segmentID, ts) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockShardDelegator_VerifyExcludedSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'VerifyExcludedSegments' +type MockShardDelegator_VerifyExcludedSegments_Call struct { + *mock.Call +} + +// VerifyExcludedSegments is a helper method to define mock.On call +// - segmentID int64 +// - ts uint64 +func (_e *MockShardDelegator_Expecter) VerifyExcludedSegments(segmentID interface{}, ts interface{}) *MockShardDelegator_VerifyExcludedSegments_Call { + return &MockShardDelegator_VerifyExcludedSegments_Call{Call: _e.mock.On("VerifyExcludedSegments", segmentID, ts)} +} + +func (_c *MockShardDelegator_VerifyExcludedSegments_Call) Run(run func(segmentID int64, ts uint64)) *MockShardDelegator_VerifyExcludedSegments_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(uint64)) + }) + return _c +} + +func (_c *MockShardDelegator_VerifyExcludedSegments_Call) Return(_a0 bool) *MockShardDelegator_VerifyExcludedSegments_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockShardDelegator_VerifyExcludedSegments_Call) RunAndReturn(run func(int64, uint64) bool) *MockShardDelegator_VerifyExcludedSegments_Call { + _c.Call.Return(run) + return _c +} + // Version provides a mock function with given fields: func (_m *MockShardDelegator) Version() int64 { ret := _m.Called() diff 
--git a/internal/querynodev2/pipeline/filter_node.go b/internal/querynodev2/pipeline/filter_node.go index 5fb6f18b30..d13e2bc5a0 100644 --- a/internal/querynodev2/pipeline/filter_node.go +++ b/internal/querynodev2/pipeline/filter_node.go @@ -23,6 +23,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/querynodev2/delegator" base "github.com/milvus-io/milvus/internal/util/pipeline" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" @@ -37,10 +38,11 @@ type filterNode struct { *BaseNode collectionID UniqueID manager *DataManager - excludedSegments *ExcludedSegments channel string InsertMsgPolicys []InsertMsgFilter DeleteMsgPolicys []DeleteMsgFilter + + delegator delegator.ShardDelegator } func (fNode *filterNode) Operate(in Msg) Msg { @@ -95,9 +97,7 @@ func (fNode *filterNode) Operate(in Msg) Msg { out.append(msg) } } - if fNode.excludedSegments.ShouldClean() { - fNode.excludedSegments.CleanInvalid(streamMsgPack.EndTs) - } + fNode.delegator.TryCleanExcludedSegments(streamMsgPack.EndTs) metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Inc() return out } @@ -115,6 +115,14 @@ func (fNode *filterNode) filtrate(c *Collection, msg msgstream.TsMsg) error { } } + // check segment whether excluded + ok := fNode.delegator.VerifyExcludedSegments(insertMsg.SegmentID, insertMsg.EndTimestamp) + if !ok { + m := fmt.Sprintf("Segment excluded, id: %d", insertMsg.GetSegmentID()) + return merr.WrapErrSegmentLack(insertMsg.GetSegmentID(), m) + } + return nil + case commonpb.MsgType_Delete: deleteMsg := msg.(*msgstream.DeleteMsg) metrics.QueryNodeConsumeCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Add(float64(deleteMsg.Size())) @@ -134,20 +142,19 @@ func newFilterNode( collectionID int64, channel string, manager *DataManager, - excludedSegments *ExcludedSegments, + delegator 
delegator.ShardDelegator, maxQueueLength int32, ) *filterNode { return &filterNode{ - BaseNode: base.NewBaseNode(fmt.Sprintf("FilterNode-%s", channel), maxQueueLength), - collectionID: collectionID, - manager: manager, - channel: channel, - excludedSegments: excludedSegments, + BaseNode: base.NewBaseNode(fmt.Sprintf("FilterNode-%s", channel), maxQueueLength), + collectionID: collectionID, + manager: manager, + channel: channel, + delegator: delegator, InsertMsgPolicys: []InsertMsgFilter{ InsertNotAligned, InsertEmpty, InsertOutOfTarget, - InsertExcluded, }, DeleteMsgPolicys: []DeleteMsgFilter{ DeleteNotAligned, diff --git a/internal/querynodev2/pipeline/filter_node_test.go b/internal/querynodev2/pipeline/filter_node_test.go index b25d8d6930..001ca4cef2 100644 --- a/internal/querynodev2/pipeline/filter_node_test.go +++ b/internal/querynodev2/pipeline/filter_node_test.go @@ -18,12 +18,13 @@ package pipeline import ( "testing" - "time" "github.com/samber/lo" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querynodev2/delegator" "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -38,7 +39,6 @@ type FilterNodeSuite struct { channel string validSegmentIDs []int64 - excludedSegments *ExcludedSegments excludedSegmentIDs []int64 insertSegmentIDs []int64 deleteSegmentSum int @@ -47,6 +47,8 @@ type FilterNodeSuite struct { // mocks manager *segments.Manager + + delegator *delegator.MockShardDelegator } func (suite *FilterNodeSuite) SetupSuite() { @@ -61,13 +63,7 @@ func (suite *FilterNodeSuite) SetupSuite() { suite.deleteSegmentSum = 4 suite.errSegmentID = 7 - // init excludedSegment - suite.excludedSegments = NewExcludedSegments(0 * time.Second) - excludeInfo := map[int64]uint64{} - for _, id := range suite.excludedSegmentIDs { - excludeInfo[id] = 1 - 
} - suite.excludedSegments.Insert(excludeInfo) + suite.delegator = delegator.NewMockShardDelegator(suite.T()) } // test filter node with collection load collection @@ -91,7 +87,11 @@ func (suite *FilterNodeSuite) TestWithLoadCollection() { Segment: mockSegmentManager, } - node := newFilterNode(suite.collectionID, suite.channel, suite.manager, suite.excludedSegments, 8) + suite.delegator.EXPECT().VerifyExcludedSegments(mock.Anything, mock.Anything).RunAndReturn(func(segmentID int64, ts uint64) bool { + return !(lo.Contains(suite.excludedSegmentIDs, segmentID) && ts <= 1) + }) + suite.delegator.EXPECT().TryCleanExcludedSegments(mock.Anything) + node := newFilterNode(suite.collectionID, suite.channel, suite.manager, suite.delegator, 8) in := suite.buildMsgPack() out := node.Operate(in) @@ -124,7 +124,11 @@ func (suite *FilterNodeSuite) TestWithLoadPartation() { Segment: mockSegmentManager, } - node := newFilterNode(suite.collectionID, suite.channel, suite.manager, suite.excludedSegments, 8) + suite.delegator.EXPECT().VerifyExcludedSegments(mock.Anything, mock.Anything).RunAndReturn(func(segmentID int64, ts uint64) bool { + return !(lo.Contains(suite.excludedSegmentIDs, segmentID) && ts <= 1) + }) + suite.delegator.EXPECT().TryCleanExcludedSegments(mock.Anything) + node := newFilterNode(suite.collectionID, suite.channel, suite.manager, suite.delegator, 8) in := suite.buildMsgPack() out := node.Operate(in) diff --git a/internal/querynodev2/pipeline/filter_policy.go b/internal/querynodev2/pipeline/filter_policy.go index ec2b83f38a..90cf6b9ffb 100644 --- a/internal/querynodev2/pipeline/filter_policy.go +++ b/internal/querynodev2/pipeline/filter_policy.go @@ -17,14 +17,6 @@ package pipeline import ( - "fmt" - "sync" - "time" - - "go.uber.org/atomic" - "go.uber.org/zap" - - "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" ) @@ -59,15 +51,6 @@ func InsertOutOfTarget(n *filterNode, c *Collection, msg *InsertMsg) error { return nil } -func 
InsertExcluded(n *filterNode, c *Collection, msg *InsertMsg) error { - ok := n.excludedSegments.Verify(msg.SegmentID, msg.EndTimestamp) - if !ok { - m := fmt.Sprintf("Segment excluded, id: %d", msg.GetSegmentID()) - return merr.WrapErrSegmentLack(msg.GetSegmentID(), m) - } - return nil -} - func DeleteNotAligned(n *filterNode, c *Collection, msg *DeleteMsg) error { err := msg.CheckAligned() if err != nil { @@ -91,60 +74,3 @@ func DeleteOutOfTarget(n *filterNode, c *Collection, msg *DeleteMsg) error { // all growing will be in-memory to support dynamic partition load/release return nil } - -type ExcludedSegments struct { - mu sync.RWMutex - segments map[int64]uint64 // segmentID -> Excluded TS - lastClean atomic.Time - cleanInterval time.Duration -} - -func NewExcludedSegments(cleanInterval time.Duration) *ExcludedSegments { - return &ExcludedSegments{ - segments: make(map[int64]uint64), - cleanInterval: cleanInterval, - } -} - -func (s *ExcludedSegments) Insert(excludeInfo map[int64]uint64) { - s.mu.Lock() - defer s.mu.Unlock() - - for segmentID, ts := range excludeInfo { - log.Debug("add exclude info", - zap.Int64("segmentID", segmentID), - zap.Uint64("ts", ts), - ) - s.segments[segmentID] = ts - } -} - -func (s *ExcludedSegments) Verify(segmentID int64, ts uint64) bool { - s.mu.RLock() - defer s.mu.RUnlock() - if excludeTs, ok := s.segments[segmentID]; ok && ts <= excludeTs { - return false - } - return true -} - -func (s *ExcludedSegments) CleanInvalid(ts uint64) { - s.mu.Lock() - defer s.mu.Unlock() - - invalidExcludedInfos := []int64{} - for segmentsID, excludeTs := range s.segments { - if excludeTs < ts { - invalidExcludedInfos = append(invalidExcludedInfos, segmentsID) - } - } - - for _, segmentID := range invalidExcludedInfos { - delete(s.segments, segmentID) - } - s.lastClean.Store(time.Now()) -} - -func (s *ExcludedSegments) ShouldClean() bool { - return time.Since(s.lastClean.Load()) > s.cleanInterval -} diff --git 
a/internal/querynodev2/pipeline/pipeline.go b/internal/querynodev2/pipeline/pipeline.go index 995adec414..dc2cd70780 100644 --- a/internal/querynodev2/pipeline/pipeline.go +++ b/internal/querynodev2/pipeline/pipeline.go @@ -17,8 +17,6 @@ package pipeline import ( - "time" - "github.com/milvus-io/milvus/internal/querynodev2/delegator" base "github.com/milvus-io/milvus/internal/util/pipeline" "github.com/milvus-io/milvus/pkg/metrics" @@ -29,18 +27,12 @@ import ( // pipeline used for querynode type Pipeline interface { base.StreamPipeline - ExcludedSegments(info map[int64]uint64) } type pipeline struct { base.StreamPipeline - excludedSegments *ExcludedSegments - collectionID UniqueID -} - -func (p *pipeline) ExcludedSegments(excludeInfo map[int64]uint64) { //(segInfos ...*datapb.SegmentInfo) { - p.excludedSegments.Insert(excludeInfo) + collectionID UniqueID } func (p *pipeline) Close() { @@ -57,15 +49,13 @@ func NewPipeLine( delegator delegator.ShardDelegator, ) (Pipeline, error) { pipelineQueueLength := paramtable.Get().QueryNodeCfg.FlowGraphMaxQueueLength.GetAsInt32() - excludedSegments := NewExcludedSegments(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.GetAsDuration(time.Second)) p := &pipeline{ - collectionID: collectionID, - excludedSegments: excludedSegments, - StreamPipeline: base.NewPipelineWithStream(dispatcher, nodeCtxTtInterval, enableTtChecker, channel), + collectionID: collectionID, + StreamPipeline: base.NewPipelineWithStream(dispatcher, nodeCtxTtInterval, enableTtChecker, channel), } - filterNode := newFilterNode(collectionID, channel, manager, excludedSegments, pipelineQueueLength) + filterNode := newFilterNode(collectionID, channel, manager, delegator, pipelineQueueLength) insertNode := newInsertNode(collectionID, channel, manager, delegator, pipelineQueueLength) deleteNode := newDeleteNode(collectionID, channel, manager, tSafeManager, delegator, pipelineQueueLength) p.Add(filterNode, insertNode, deleteNode) diff --git 
a/internal/querynodev2/pipeline/pipeline_test.go b/internal/querynodev2/pipeline/pipeline_test.go index 4d7aa36cf4..ecad79b9ee 100644 --- a/internal/querynodev2/pipeline/pipeline_test.go +++ b/internal/querynodev2/pipeline/pipeline_test.go @@ -119,6 +119,10 @@ func (suite *PipelineTestSuite) TestBasic() { suite.msgDispatcher.EXPECT().Deregister(suite.channel) // mock delegator + suite.delegator.EXPECT().AddExcludedSegments(mock.Anything).Maybe() + suite.delegator.EXPECT().VerifyExcludedSegments(mock.Anything, mock.Anything).Return(true).Maybe() + suite.delegator.EXPECT().TryCleanExcludedSegments(mock.Anything).Maybe() + suite.delegator.EXPECT().ProcessInsert(mock.Anything).Run( func(insertRecords map[int64]*delegator.InsertData) { for segmentID := range insertRecords { diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index d782d89c15..1ea9b779a7 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -291,17 +291,17 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm info := req.GetSegmentInfos()[id] return id, info.GetDmlPosition().GetTimestamp() }) - pipeline.ExcludedSegments(growingInfo) + delegator.AddExcludedSegments(growingInfo) flushedInfo := lo.SliceToMap(channel.GetFlushedSegmentIds(), func(id int64) (int64, uint64) { return id, typeutil.MaxTimestamp }) - pipeline.ExcludedSegments(flushedInfo) + delegator.AddExcludedSegments(flushedInfo) for _, channelInfo := range req.GetInfos() { droppedInfos := lo.SliceToMap(channelInfo.GetDroppedSegmentIds(), func(id int64) (int64, uint64) { return id, typeutil.MaxTimestamp }) - pipeline.ExcludedSegments(droppedInfos) + delegator.AddExcludedSegments(droppedInfos) } err = loadL0Segments(ctx, delegator, req) @@ -545,16 +545,6 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.Release return merr.Status(err), nil } - // when we try to release a segment, add it to pipeline's exclude list first - // in 
case of consumed it's growing segment again - pipeline := node.pipelineManager.Get(req.GetShard()) - if pipeline != nil { - droppedInfos := lo.SliceToMap(req.GetSegmentIDs(), func(id int64) (int64, uint64) { - return id, typeutil.MaxTimestamp - }) - pipeline.ExcludedSegments(droppedInfos) - } - req.NeedTransfer = false err := delegator.ReleaseSegments(ctx, req, false) if err != nil { @@ -1320,14 +1310,10 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi }) case querypb.SyncType_UpdateVersion: log.Info("sync action", zap.Int64("TargetVersion", action.GetTargetVersion())) - pipeline := node.pipelineManager.Get(req.GetChannel()) - if pipeline != nil { - droppedInfos := lo.SliceToMap(action.GetDroppedInTarget(), func(id int64) (int64, uint64) { - return id, typeutil.MaxTimestamp - }) - - pipeline.ExcludedSegments(droppedInfos) - } + droppedInfos := lo.SliceToMap(action.GetDroppedInTarget(), func(id int64) (int64, uint64) { + return id, typeutil.MaxTimestamp + }) + shardDelegator.AddExcludedSegments(droppedInfos) shardDelegator.SyncTargetVersion(action.GetTargetVersion(), action.GetGrowingInTarget(), action.GetSealedInTarget(), action.GetDroppedInTarget(), action.GetCheckpoint()) default: diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index 120ff98bca..f8ecfb104a 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -883,8 +883,10 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() { suite.node.delegators.Insert(suite.vchannel, delegator) defer suite.node.delegators.GetAndRemove(suite.vchannel) - delegator.EXPECT().LoadSegments(mock.Anything, mock.AnythingOfType("*querypb.LoadSegmentsRequest")). 
- Return(nil) + delegator.EXPECT().AddExcludedSegments(mock.Anything).Maybe() + delegator.EXPECT().VerifyExcludedSegments(mock.Anything, mock.Anything).Return(true).Maybe() + delegator.EXPECT().TryCleanExcludedSegments(mock.Anything).Maybe() + delegator.EXPECT().LoadSegments(mock.Anything, mock.AnythingOfType("*querypb.LoadSegmentsRequest")).Return(nil) // data schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, false) req := &querypb.LoadSegmentsRequest{ @@ -932,6 +934,9 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() { delegator := &delegator.MockShardDelegator{} suite.node.delegators.Insert(suite.vchannel, delegator) defer suite.node.delegators.GetAndRemove(suite.vchannel) + delegator.EXPECT().AddExcludedSegments(mock.Anything).Maybe() + delegator.EXPECT().VerifyExcludedSegments(mock.Anything, mock.Anything).Return(true).Maybe() + delegator.EXPECT().TryCleanExcludedSegments(mock.Anything).Maybe() delegator.EXPECT().LoadSegments(mock.Anything, mock.AnythingOfType("*querypb.LoadSegmentsRequest")). Return(errors.New("mocked error")) // data @@ -1091,6 +1096,9 @@ func (suite *ServiceSuite) TestReleaseSegments_Transfer() { suite.node.delegators.Insert(suite.vchannel, delegator) defer suite.node.delegators.GetAndRemove(suite.vchannel) + delegator.EXPECT().AddExcludedSegments(mock.Anything).Maybe() + delegator.EXPECT().VerifyExcludedSegments(mock.Anything, mock.Anything).Return(true).Maybe() + delegator.EXPECT().TryCleanExcludedSegments(mock.Anything).Maybe() delegator.EXPECT().ReleaseSegments(mock.Anything, mock.AnythingOfType("*querypb.ReleaseSegmentsRequest"), false). Return(errors.New("mocked error"))