enhance: Simplify querynode tsafe & reduce goroutine number (#38416)

Related to #37630

The TSafe manager is too complex for the current implementation, and each
delegator needs a dedicated goroutine waiting for tsafe update events.

Tsafe updates can instead be performed inside the pipeline. This PR removes
the tsafe manager and simplifies the entire tsafe update logic.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2024-12-13 10:56:43 +08:00 committed by GitHub
parent 6ffc57c8dc
commit 10460ed3f8
22 changed files with 144 additions and 585 deletions
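
For orientation, a minimal self-contained sketch of the simplified flow assembled from the hunks below (illustrative stdlib-only Go, not the Milvus code; the real delegator uses go.uber.org/atomic, lifetime management, and context handling that are elided here). The pipeline's delete node pushes each time tick straight into the delegator, which broadcasts to goroutines parked in waitTSafe:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// delegator is a toy stand-in for shardDelegator: just the tsafe fields.
type delegator struct {
	mu          sync.Mutex
	cond        *sync.Cond
	latestTsafe atomic.Uint64
}

func newDelegator(startTs uint64) *delegator {
	d := &delegator{}
	d.cond = sync.NewCond(&d.mu)
	d.latestTsafe.Store(startTs)
	return d
}

// UpdateTSafe is what the pipeline's delete node now calls on every time
// tick, replacing the old tsafe.Manager.Set + watcher-goroutine round trip.
func (d *delegator) UpdateTSafe(ts uint64) {
	d.cond.L.Lock()
	if ts > d.latestTsafe.Load() {
		d.latestTsafe.Store(ts)
		d.cond.Broadcast()
	}
	d.cond.L.Unlock()
}

// waitTSafe blocks a search/query until the serviceable timestamp catches up.
func (d *delegator) waitTSafe(guaranteeTs uint64) uint64 {
	d.cond.L.Lock()
	defer d.cond.L.Unlock()
	for d.latestTsafe.Load() < guaranteeTs {
		d.cond.Wait() // woken by UpdateTSafe's Broadcast
	}
	return d.latestTsafe.Load()
}

func main() {
	d := newDelegator(100)
	done := make(chan struct{})
	go func() {
		fmt.Println("serviceable at ts", d.waitTSafe(200))
		close(done)
	}()
	d.UpdateTSafe(200) // what deleteNode.Operate now does directly
	<-done
}
```

With the update pushed synchronously from the pipeline, the per-delegator watchTSafe goroutine and the whole tsafe.Manager/listener machinery can be deleted outright, which is where the 585 removed lines come from.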


@ -41,7 +41,6 @@ import (
"github.com/milvus-io/milvus/internal/querynodev2/delegator/deletebuffer"
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/function"
"github.com/milvus-io/milvus/internal/util/reduce"
@ -91,6 +90,10 @@ type ShardDelegator interface {
VerifyExcludedSegments(segmentID int64, ts uint64) bool
TryCleanExcludedSegments(ts uint64)
// tsafe
UpdateTSafe(ts uint64)
GetTSafe() uint64
// control
Serviceable() bool
Start()
@ -117,7 +120,6 @@ type shardDelegator struct {
idfOracle IDFOracle
segmentManager segments.SegmentManager
tsafeManager tsafe.Manager
pkOracle pkoracle.PkOracle
level0Mut sync.RWMutex
// stream delete buffer
@ -790,36 +792,9 @@ func (sd *shardDelegator) waitTSafe(ctx context.Context, ts uint64) (uint64, err
}
}
// watchTSafe is the worker function to update serviceable timestamp.
func (sd *shardDelegator) watchTSafe() {
defer sd.lifetime.Done()
listener := sd.tsafeManager.WatchChannel(sd.vchannelName)
sd.updateTSafe()
log := sd.getLogger(context.Background())
for {
select {
case _, ok := <-listener.On():
if !ok {
// listener close
log.Warn("tsafe listener closed")
return
}
sd.updateTSafe()
case <-sd.lifetime.CloseCh():
log.Info("updateTSafe quit")
// shard delegator closed
return
}
}
}
// updateTSafe read current tsafe value from tsafeManager.
func (sd *shardDelegator) updateTSafe() {
func (sd *shardDelegator) UpdateTSafe(tsafe uint64) {
sd.tsCond.L.Lock()
tsafe, err := sd.tsafeManager.Get(sd.vchannelName)
if err != nil {
log.Warn("tsafeManager failed to get lastest", zap.Error(err))
}
if tsafe > sd.latestTsafe.Load() {
sd.latestTsafe.Store(tsafe)
sd.tsCond.Broadcast()
@ -827,6 +802,10 @@ func (sd *shardDelegator) updateTSafe() {
sd.tsCond.L.Unlock()
}
func (sd *shardDelegator) GetTSafe() uint64 {
return sd.latestTsafe.Load()
}
// Close closes the delegator.
func (sd *shardDelegator) Close() {
sd.lifetime.SetState(lifetime.Stopped)
@ -888,7 +867,7 @@ func (sd *shardDelegator) loadPartitionStats(ctx context.Context, partStatsVersi
// NewShardDelegator creates a new ShardDelegator instance with all fields initialized.
func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID UniqueID, channel string, version int64,
workerManager cluster.Manager, manager *segments.Manager, tsafeManager tsafe.Manager, loader segments.Loader,
workerManager cluster.Manager, manager *segments.Manager, loader segments.Loader,
factory msgstream.Factory, startTs uint64, queryHook optimizers.QueryHook, chunkManager storage.ChunkManager,
) (ShardDelegator, error) {
log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID),
@ -924,7 +903,6 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
deleteBuffer: deletebuffer.NewListDeleteBuffer[*deletebuffer.Item](startTs, sizePerBlock,
[]string{fmt.Sprint(paramtable.GetNodeID()), channel}),
pkOracle: pkoracle.NewPkOracle(),
tsafeManager: tsafeManager,
latestTsafe: atomic.NewUint64(startTs),
loader: loader,
factory: factory,
@ -957,9 +935,6 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
m := sync.Mutex{}
sd.tsCond = sync.NewCond(&m)
if sd.lifetime.Add(lifetime.NotStopped) == nil {
go sd.watchTSafe()
}
log.Info("finish build new shardDelegator")
return sd, nil
}
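
The Broadcast above is what wakes callers of waitTSafe, whose body is untouched by this diff (only its signature appears in the hunk header). One subtlety the pattern has to handle: sync.Cond.Wait cannot be interrupted by a context, so a ctx-aware wait is typically built by parking the Wait in a helper goroutine and selecting on its completion. A hedged sketch of that common pattern, extending the toy delegator from the overview above (add "context" to its imports; this is not the actual Milvus implementation):

```go
// waitTSafeCtx is a hypothetical ctx-aware wrapper around the cond wait.
// Production code must also guarantee a final Broadcast on shutdown so the
// helper goroutine can exit instead of parking forever.
func (d *delegator) waitTSafeCtx(ctx context.Context, guaranteeTs uint64) (uint64, error) {
	ready := make(chan struct{})
	go func() {
		d.cond.L.Lock()
		for d.latestTsafe.Load() < guaranteeTs {
			d.cond.Wait()
		}
		d.cond.L.Unlock()
		close(ready)
	}()
	select {
	case <-ready:
		return d.latestTsafe.Load(), nil
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}
```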


@ -45,7 +45,6 @@ import (
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/bloomfilter"
"github.com/milvus-io/milvus/internal/util/function"
@ -70,7 +69,6 @@ type DelegatorDataSuite struct {
version int64
workerManager *cluster.MockManager
manager *segments.Manager
tsafeManager tsafe.Manager
loader *segments.MockLoader
mq *msgstream.MockMsgStream
channel metautil.Channel
@ -192,7 +190,7 @@ func (s *DelegatorDataSuite) genCollectionWithFunction() {
}},
}, nil, nil)
delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{
delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.loader, &msgstream.MockMqFactory{
NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) {
return s.mq, nil
},
@ -204,7 +202,6 @@ func (s *DelegatorDataSuite) genCollectionWithFunction() {
func (s *DelegatorDataSuite) SetupTest() {
s.workerManager = &cluster.MockManager{}
s.manager = segments.NewManager()
s.tsafeManager = tsafe.NewTSafeReplica()
s.loader = &segments.MockLoader{}
// init schema
@ -213,7 +210,7 @@ func (s *DelegatorDataSuite) SetupTest() {
s.rootPath = s.Suite.T().Name()
chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), s.rootPath)
s.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(context.Background())
delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{
delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.loader, &msgstream.MockMqFactory{
NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) {
return s.mq, nil
},
@ -829,7 +826,6 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
s.version,
s.workerManager,
s.manager,
s.tsafeManager,
s.loader,
&msgstream.MockMqFactory{
NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) {


@ -19,7 +19,6 @@ package delegator
import (
"context"
"io"
"sync"
"testing"
"time"
@ -29,7 +28,6 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
@ -39,13 +37,11 @@ import (
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/streamrpc"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -61,7 +57,6 @@ type DelegatorSuite struct {
version int64
workerManager *cluster.MockManager
manager *segments.Manager
tsafeManager tsafe.Manager
loader *segments.MockLoader
mq *msgstream.MockMsgStream
@ -85,7 +80,6 @@ func (s *DelegatorSuite) SetupTest() {
s.version = 2000
s.workerManager = &cluster.MockManager{}
s.manager = segments.NewManager()
s.tsafeManager = tsafe.NewTSafeReplica()
s.loader = &segments.MockLoader{}
s.loader.EXPECT().
Load(mock.Anything, s.collectionID, segments.SegmentTypeGrowing, int64(0), mock.Anything).
@ -165,7 +159,7 @@ func (s *DelegatorSuite) SetupTest() {
var err error
// s.delegator, err = NewShardDelegator(s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader)
s.delegator, err = NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{
s.delegator, err = NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.loader, &msgstream.MockMqFactory{
NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) {
return s.mq, nil
},
@ -204,7 +198,7 @@ func (s *DelegatorSuite) TestCreateDelegatorWithFunction() {
}},
}, nil, nil)
_, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{
_, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, manager, s.loader, &msgstream.MockMqFactory{
NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) {
return s.mq, nil
},
@ -247,7 +241,7 @@ func (s *DelegatorSuite) TestCreateDelegatorWithFunction() {
}},
}, nil, nil)
_, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{
_, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, manager, s.loader, &msgstream.MockMqFactory{
NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) {
return s.mq, nil
},
@ -1199,75 +1193,6 @@ func TestDelegatorSuite(t *testing.T) {
suite.Run(t, new(DelegatorSuite))
}
func TestDelegatorWatchTsafe(t *testing.T) {
channelName := "default_dml_channel"
tsafeManager := tsafe.NewTSafeReplica()
tsafeManager.Add(context.Background(), channelName, 100)
sd := &shardDelegator{
tsafeManager: tsafeManager,
vchannelName: channelName,
lifetime: lifetime.NewLifetime(lifetime.Initializing),
latestTsafe: atomic.NewUint64(0),
}
defer sd.Close()
m := sync.Mutex{}
sd.tsCond = sync.NewCond(&m)
if sd.lifetime.Add(lifetime.NotStopped) == nil {
go sd.watchTSafe()
}
err := tsafeManager.Set(channelName, 200)
require.NoError(t, err)
assert.Eventually(t, func() bool {
return sd.latestTsafe.Load() == 200
}, time.Second*10, time.Millisecond*10)
}
func TestDelegatorTSafeListenerClosed(t *testing.T) {
channelName := "default_dml_channel"
tsafeManager := tsafe.NewTSafeReplica()
tsafeManager.Add(context.Background(), channelName, 100)
sd := &shardDelegator{
tsafeManager: tsafeManager,
vchannelName: channelName,
lifetime: lifetime.NewLifetime(lifetime.Initializing),
latestTsafe: atomic.NewUint64(0),
}
defer sd.Close()
m := sync.Mutex{}
sd.tsCond = sync.NewCond(&m)
signal := make(chan struct{})
if sd.lifetime.Add(lifetime.NotStopped) == nil {
go func() {
sd.watchTSafe()
close(signal)
}()
}
select {
case <-signal:
assert.FailNow(t, "watchTsafe quit unexpectedly")
case <-time.After(time.Millisecond * 10):
}
tsafeManager.Remove(context.Background(), channelName)
select {
case <-signal:
case <-time.After(time.Second):
assert.FailNow(t, "watchTsafe still working after listener closed")
}
sd.Close()
assert.Equal(t, sd.Serviceable(), false)
assert.Equal(t, sd.Stopped(), true)
}
func TestDelegatorSearchBM25InvalidMetricType(t *testing.T) {
paramtable.Init()
searchReq := &querypb.SearchRequest{


@ -34,7 +34,6 @@ import (
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
@ -54,7 +53,6 @@ type StreamingForwardSuite struct {
version int64
workerManager *cluster.MockManager
manager *segments.Manager
tsafeManager tsafe.Manager
loader *segments.MockLoader
mq *msgstream.MockMsgStream
@ -76,7 +74,6 @@ func (s *StreamingForwardSuite) SetupTest() {
s.version = 2000
s.workerManager = &cluster.MockManager{}
s.manager = segments.NewManager()
s.tsafeManager = tsafe.NewTSafeReplica()
s.loader = &segments.MockLoader{}
s.loader.EXPECT().
Load(mock.Anything, s.collectionID, segments.SegmentTypeGrowing, int64(0), mock.Anything).
@ -154,7 +151,7 @@ func (s *StreamingForwardSuite) SetupTest() {
chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), s.rootPath)
s.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(context.Background())
delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{
delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.loader, &msgstream.MockMqFactory{
NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) {
return s.mq, nil
},
@ -286,7 +283,6 @@ type GrowingMergeL0Suite struct {
schema *schemapb.CollectionSchema
workerManager *cluster.MockManager
manager *segments.Manager
tsafeManager tsafe.Manager
loader *segments.MockLoader
mq *msgstream.MockMsgStream
@ -308,7 +304,6 @@ func (s *GrowingMergeL0Suite) SetupTest() {
s.version = 2000
s.workerManager = &cluster.MockManager{}
s.manager = segments.NewManager()
s.tsafeManager = tsafe.NewTSafeReplica()
s.loader = &segments.MockLoader{}
s.loader.EXPECT().
Load(mock.Anything, s.collectionID, segments.SegmentTypeGrowing, int64(0), mock.Anything).
@ -387,7 +382,7 @@ func (s *GrowingMergeL0Suite) SetupTest() {
chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), s.rootPath)
s.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(context.Background())
delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{
delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.loader, &msgstream.MockMqFactory{
NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) {
return s.mq, nil
},


@ -360,6 +360,51 @@ func (_c *MockShardDelegator_GetStatistics_Call) RunAndReturn(run func(context.C
return _c
}
// GetTSafe provides a mock function with given fields:
func (_m *MockShardDelegator) GetTSafe() uint64 {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for GetTSafe")
}
var r0 uint64
if rf, ok := ret.Get(0).(func() uint64); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(uint64)
}
return r0
}
// MockShardDelegator_GetTSafe_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTSafe'
type MockShardDelegator_GetTSafe_Call struct {
*mock.Call
}
// GetTSafe is a helper method to define mock.On call
func (_e *MockShardDelegator_Expecter) GetTSafe() *MockShardDelegator_GetTSafe_Call {
return &MockShardDelegator_GetTSafe_Call{Call: _e.mock.On("GetTSafe")}
}
func (_c *MockShardDelegator_GetTSafe_Call) Run(run func()) *MockShardDelegator_GetTSafe_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockShardDelegator_GetTSafe_Call) Return(_a0 uint64) *MockShardDelegator_GetTSafe_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockShardDelegator_GetTSafe_Call) RunAndReturn(run func() uint64) *MockShardDelegator_GetTSafe_Call {
_c.Call.Return(run)
return _c
}
// GetTargetVersion provides a mock function with given fields:
func (_m *MockShardDelegator) GetTargetVersion() int64 {
ret := _m.Called()
@ -1011,6 +1056,39 @@ func (_c *MockShardDelegator_TryCleanExcludedSegments_Call) RunAndReturn(run fun
return _c
}
// UpdateTSafe provides a mock function with given fields: ts
func (_m *MockShardDelegator) UpdateTSafe(ts uint64) {
_m.Called(ts)
}
// MockShardDelegator_UpdateTSafe_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateTSafe'
type MockShardDelegator_UpdateTSafe_Call struct {
*mock.Call
}
// UpdateTSafe is a helper method to define mock.On call
// - ts uint64
func (_e *MockShardDelegator_Expecter) UpdateTSafe(ts interface{}) *MockShardDelegator_UpdateTSafe_Call {
return &MockShardDelegator_UpdateTSafe_Call{Call: _e.mock.On("UpdateTSafe", ts)}
}
func (_c *MockShardDelegator_UpdateTSafe_Call) Run(run func(ts uint64)) *MockShardDelegator_UpdateTSafe_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(uint64))
})
return _c
}
func (_c *MockShardDelegator_UpdateTSafe_Call) Return() *MockShardDelegator_UpdateTSafe_Call {
_c.Call.Return()
return _c
}
func (_c *MockShardDelegator_UpdateTSafe_Call) RunAndReturn(run func(uint64)) *MockShardDelegator_UpdateTSafe_Call {
_c.Call.Return(run)
return _c
}
// VerifyExcludedSegments provides a mock function with given fields: segmentID, ts
func (_m *MockShardDelegator) VerifyExcludedSegments(segmentID int64, ts uint64) bool {
ret := _m.Called(segmentID, ts)


@ -19,6 +19,7 @@ package querynodev2
import (
"context"
"fmt"
"math"
"github.com/samber/lo"
"go.uber.org/zap"
@ -60,7 +61,17 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
return nil, err
}
minTsafeChannel, minTsafe := node.tSafeManager.Min()
minTsafeChannel := ""
minTsafe := uint64(math.MaxUint64)
node.delegators.Range(func(channel string, delegator delegator.ShardDelegator) bool {
tsafe := delegator.GetTSafe()
if tsafe < minTsafe {
minTsafeChannel = channel
minTsafe = tsafe
}
return true
})
collections := node.manager.Collection.ListWithName()
nodeID := fmt.Sprint(node.GetNodeID())


@ -17,7 +17,6 @@
package querynodev2
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
@ -28,7 +27,6 @@ import (
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
"github.com/milvus-io/milvus/internal/querynodev2/pipeline"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -40,10 +38,9 @@ func TestGetPipelineJSON(t *testing.T) {
paramtable.Init()
ch := "ch"
tSafeManager := tsafe.NewTSafeReplica()
tSafeManager.Add(context.Background(), ch, 0)
delegators := typeutil.NewConcurrentMap[string, delegator.ShardDelegator]()
d := delegator.NewMockShardDelegator(t)
d.EXPECT().GetTSafe().Return(0)
delegators.Insert(ch, d)
msgDispatcher := msgdispatcher.NewMockClient(t)
@ -55,7 +52,7 @@ func TestGetPipelineJSON(t *testing.T) {
Segment: segmentManager,
}
pipelineManager := pipeline.NewManager(manager, tSafeManager, msgDispatcher, delegators)
pipelineManager := pipeline.NewManager(manager, msgDispatcher, delegators)
_, err := pipelineManager.Add(1, ch)
assert.NoError(t, err)


@ -35,9 +35,8 @@ type deleteNode struct {
collectionID UniqueID
channel string
manager *DataManager
tSafeManager TSafeManager
delegator delegator.ShardDelegator
manager *DataManager
delegator delegator.ShardDelegator
}
// addDeleteData find the segment of delete column in DeleteMsg and save in deleteData
@ -78,17 +77,13 @@ func (dNode *deleteNode) Operate(in Msg) Msg {
}
// update tSafe
err := dNode.tSafeManager.Set(dNode.channel, nodeMsg.timeRange.timestampMax)
if err != nil {
// should not happen, QueryNode should addTSafe before start pipeline
panic(fmt.Errorf("serviceTimeNode setTSafe timeout, collectionID = %d, err = %s", dNode.collectionID, err))
}
dNode.delegator.UpdateTSafe(nodeMsg.timeRange.timestampMax)
return nil
}
func newDeleteNode(
collectionID UniqueID, channel string,
manager *DataManager, tSafeManager TSafeManager, delegator delegator.ShardDelegator,
manager *DataManager, delegator delegator.ShardDelegator,
maxQueueLength int32,
) *deleteNode {
return &deleteNode{
@ -96,7 +91,6 @@ func newDeleteNode(
collectionID: collectionID,
channel: channel,
manager: manager,
tSafeManager: tSafeManager,
delegator: delegator,
}
}


@ -17,7 +17,6 @@
package pipeline
import (
"context"
"testing"
"github.com/samber/lo"
@ -26,7 +25,6 @@ import (
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -40,8 +38,6 @@ type DeleteNodeSuite struct {
channel string
timeRange TimeRange
// dependency
tSafeManager TSafeManager
// mocks
manager *segments.Manager
delegator *delegator.MockShardDelegator
@ -93,18 +89,13 @@ func (suite *DeleteNodeSuite) TestBasic() {
}
})
// init dependency
suite.tSafeManager = tsafe.NewTSafeReplica()
suite.tSafeManager.Add(context.Background(), suite.channel, 0)
// build delete node and data
node := newDeleteNode(suite.collectionID, suite.channel, suite.manager, suite.tSafeManager, suite.delegator, 8)
node := newDeleteNode(suite.collectionID, suite.channel, suite.manager, suite.delegator, 8)
in := suite.buildDeleteNodeMsg()
suite.delegator.EXPECT().UpdateTSafe(in.timeRange.timestampMax).Return()
// run
out := node.Operate(in)
suite.Nil(out)
// check tsafe
tt, err := suite.tSafeManager.Get(suite.channel)
suite.NoError(err)
suite.Equal(suite.timeRange.timestampMax, tt)
}
func TestDeleteNode(t *testing.T) {


@ -50,9 +50,8 @@ type manager struct {
dataManager *DataManager
delegators *typeutil.ConcurrentMap[string, delegator.ShardDelegator]
tSafeManager TSafeManager
dispatcher msgdispatcher.Client
mu sync.RWMutex
dispatcher msgdispatcher.Client
mu sync.RWMutex
}
func (m *manager) Num() int {
@ -86,7 +85,7 @@ func (m *manager) Add(collectionID UniqueID, channel string) (Pipeline, error) {
return nil, merr.WrapErrChannelNotFound(channel, "delegator not found")
}
newPipeLine, err := NewPipeLine(collectionID, channel, m.dataManager, m.tSafeManager, m.dispatcher, delegator)
newPipeLine, err := NewPipeLine(collectionID, channel, m.dataManager, m.dispatcher, delegator)
if err != nil {
return nil, merr.WrapErrServiceUnavailable(err.Error(), "failed to create new pipeline")
}
@ -164,23 +163,22 @@ func (m *manager) GetChannelStats() []*metricsinfo.Channel {
ret := make([]*metricsinfo.Channel, 0, len(m.channel2Pipeline))
for ch, p := range m.channel2Pipeline {
tt, err := m.tSafeManager.Get(ch)
if err != nil {
log.Warn("get tSafe failed", zap.String("channel", ch), zap.Error(err))
delegator, ok := m.delegators.Get(ch)
if ok {
tt := delegator.GetTSafe()
ret = append(ret, &metricsinfo.Channel{
Name: ch,
WatchState: p.Status(),
LatestTimeTick: tsoutil.PhysicalTimeFormat(tt),
NodeID: paramtable.GetNodeID(),
CollectionID: p.GetCollectionID(),
})
}
ret = append(ret, &metricsinfo.Channel{
Name: ch,
WatchState: p.Status(),
LatestTimeTick: tsoutil.PhysicalTimeFormat(tt),
NodeID: paramtable.GetNodeID(),
CollectionID: p.GetCollectionID(),
})
}
return ret
}
func NewManager(dataManager *DataManager,
tSafeManager TSafeManager,
dispatcher msgdispatcher.Client,
delegators *typeutil.ConcurrentMap[string, delegator.ShardDelegator],
) Manager {
@ -188,7 +186,6 @@ func NewManager(dataManager *DataManager,
channel2Pipeline: make(map[string]Pipeline),
dataManager: dataManager,
delegators: delegators,
tSafeManager: tSafeManager,
dispatcher: dispatcher,
}
}


@ -26,7 +26,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/pkg/mq/common"
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
@ -40,8 +39,7 @@ type PipelineManagerTestSuite struct {
collectionID int64
channel string
// dependencies
tSafeManager TSafeManager
delegators *typeutil.ConcurrentMap[string, delegator.ShardDelegator]
delegators *typeutil.ConcurrentMap[string, delegator.ShardDelegator]
// mocks
segmentManager *segments.MockSegmentManager
@ -59,9 +57,6 @@ func (suite *PipelineManagerTestSuite) SetupSuite() {
func (suite *PipelineManagerTestSuite) SetupTest() {
paramtable.Init()
// init dependency
// init tsafeManager
suite.tSafeManager = tsafe.NewTSafeReplica()
suite.tSafeManager.Add(context.Background(), suite.channel, 0)
suite.delegators = typeutil.NewConcurrentMap[string, delegator.ShardDelegator]()
// init mock
@ -88,7 +83,7 @@ func (suite *PipelineManagerTestSuite) TestBasic() {
Collection: suite.collectionManager,
Segment: suite.segmentManager,
}
pipelineManager := NewManager(manager, suite.tSafeManager, suite.msgDispatcher, suite.delegators)
pipelineManager := NewManager(manager, suite.msgDispatcher, suite.delegators)
defer pipelineManager.Close()
// Add pipeline


@ -48,7 +48,6 @@ func NewPipeLine(
collectionID UniqueID,
channel string,
manager *DataManager,
tSafeManager TSafeManager,
dispatcher msgdispatcher.Client,
delegator delegator.ShardDelegator,
) (Pipeline, error) {
@ -67,7 +66,7 @@ func NewPipeLine(
}
insertNode := newInsertNode(collectionID, channel, manager, delegator, pipelineQueueLength)
deleteNode := newDeleteNode(collectionID, channel, manager, tSafeManager, delegator, pipelineQueueLength)
deleteNode := newDeleteNode(collectionID, channel, manager, delegator, pipelineQueueLength)
// skip add embedding node when collection has no function.
if embeddingNode != nil {


@ -30,7 +30,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/pkg/mq/common"
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
@ -47,9 +46,6 @@ type PipelineTestSuite struct {
insertSegmentIDs []int64
deletePKs []int64
// dependencies
tSafeManager TSafeManager
// mocks
segmentManager *segments.MockSegmentManager
collectionManager *segments.MockCollectionManager
@ -99,11 +95,6 @@ func (suite *PipelineTestSuite) SetupTest() {
suite.delegator = delegator.NewMockShardDelegator(suite.T())
// init mq dispatcher
suite.msgDispatcher = msgdispatcher.NewMockClient(suite.T())
// init dependency
// init tsafeManager
suite.tSafeManager = tsafe.NewTSafeReplica()
suite.tSafeManager.Add(context.Background(), suite.channel, 0)
}
func (suite *PipelineTestSuite) TestBasic() {
@ -139,12 +130,13 @@ func (suite *PipelineTestSuite) TestBasic() {
}
}
})
// build pipeline
manager := &segments.Manager{
Collection: suite.collectionManager,
Segment: suite.segmentManager,
}
pipeline, err := NewPipeLine(suite.collectionID, suite.channel, manager, suite.tSafeManager, suite.msgDispatcher, suite.delegator)
pipeline, err := NewPipeLine(suite.collectionID, suite.channel, manager, suite.msgDispatcher, suite.delegator)
suite.NoError(err)
// Init Consumer
@ -155,20 +147,10 @@ func (suite *PipelineTestSuite) TestBasic() {
suite.NoError(err)
defer pipeline.Close()
// watch tsafe manager
listener := suite.tSafeManager.WatchChannel(suite.channel)
// build input msg
in := suite.buildMsgPack(schema)
suite.delegator.EXPECT().UpdateTSafe(in.EndTs).Return()
suite.msgChan <- in
// wait pipeline work
<-listener.On()
// check tsafe
tsafe, err := suite.tSafeManager.Get(suite.channel)
suite.NoError(err)
suite.Equal(in.EndTs, tsafe)
}
func TestQueryNodePipeline(t *testing.T) {


@ -22,7 +22,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/internal/storage"
base "github.com/milvus-io/milvus/internal/util/pipeline"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
@ -51,8 +50,6 @@ type (
DataManager = segments.Manager
Segment = segments.Segment
TSafeManager = tsafe.Manager
BaseNode = base.BaseNode
Msg = base.Msg
)

View File

@ -52,7 +52,6 @@ import (
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
"github.com/milvus-io/milvus/internal/querynodev2/pipeline"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/internal/registry"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
@ -103,7 +102,6 @@ type QueryNode struct {
// internal components
manager *segments.Manager
clusterManager cluster.Manager
tSafeManager tsafe.Manager
pipelineManager pipeline.Manager
subscribingChannels *typeutil.ConcurrentSet[string]
unsubscribingChannels *typeutil.ConcurrentSet[string]
@ -153,7 +151,6 @@ func NewQueryNode(ctx context.Context, factory dependency.Factory) *QueryNode {
metricsRequest: metricsinfo.NewMetricsRequest(),
}
node.tSafeManager = tsafe.NewTSafeReplica()
expr.Register("querynode", node)
return node
}
@ -375,7 +372,7 @@ func (node *QueryNode) Init() error {
node.manager.SetLoader(node.loader)
node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.QueryNodeRole, node.GetNodeID())
// init pipeline manager
node.pipelineManager = pipeline.NewManager(node.manager, node.tSafeManager, node.dispClient, node.delegators)
node.pipelineManager = pipeline.NewManager(node.manager, node.dispClient, node.delegators)
err = node.InitSegcore()
if err != nil {


@ -254,7 +254,6 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
req.GetVersion(),
node.clusterManager,
node.manager,
node.tSafeManager,
node.loader,
node.factory,
channel.GetSeekPosition().GetTimestamp(),
@ -273,12 +272,12 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
}()
// create tSafe
node.tSafeManager.Add(ctx, channel.ChannelName, channel.GetSeekPosition().GetTimestamp())
defer func() {
if err != nil {
node.tSafeManager.Remove(ctx, channel.ChannelName)
}
}()
// node.tSafeManager.Add(ctx, channel.ChannelName, channel.GetSeekPosition().GetTimestamp())
// defer func() {
// if err != nil {
// node.tSafeManager.Remove(ctx, channel.ChannelName)
// }
// }()
pipeline, err := node.pipelineManager.Add(req.GetCollectionID(), channel.GetChannelName())
if err != nil {
@ -369,7 +368,7 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC
node.pipelineManager.Remove(req.GetChannelName())
node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithType(segments.SegmentTypeGrowing))
_, sealed := node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithLevel(datapb.SegmentLevel_L0))
node.tSafeManager.Remove(ctx, req.GetChannelName())
// node.tSafeManager.Remove(ctx, req.GetChannelName())
node.manager.Collection.Unref(req.GetCollectionID(), uint32(1+sealed))
}


@ -1787,12 +1787,14 @@ func (suite *ServiceSuite) TestGetMetric_Normal() {
sd1 := delegator.NewMockShardDelegator(suite.T())
sd1.EXPECT().Collection().Return(100)
sd1.EXPECT().GetDeleteBufferSize().Return(10, 1000)
sd1.EXPECT().GetTSafe().Return(100)
sd1.EXPECT().Close().Maybe()
suite.node.delegators.Insert("qn_unitest_dml_0_100v0", sd1)
defer suite.node.delegators.GetAndRemove("qn_unitest_dml_0_100v0")
sd2 := delegator.NewMockShardDelegator(suite.T())
sd2.EXPECT().Collection().Return(100)
sd2.EXPECT().GetTSafe().Return(200)
sd2.EXPECT().GetDeleteBufferSize().Return(10, 1000)
sd2.EXPECT().Close().Maybe()
suite.node.delegators.Insert("qn_unitest_dml_1_100v1", sd2)


@ -1,9 +0,0 @@
reviewers:
- aoiasd
- bigsheeper
- congqixia
- yah01
approvers:
- maintainers


@ -1,64 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsafe
type Listener interface {
On() <-chan struct{}
Close()
}
type listener struct {
tsafe *tSafeManager
channel string
ch chan struct{}
}
func (l *listener) On() <-chan struct{} {
return l.ch
}
func (l *listener) Close() {
l.tsafe.mu.Lock()
defer l.tsafe.mu.Unlock()
l.close()
}
// close remove the listener from the tSafeReplica without lock
func (l *listener) close() {
for i, listen := range l.tsafe.listeners[l.channel] {
if l == listen {
close(l.ch)
l.tsafe.listeners[l.channel] = append(l.tsafe.listeners[l.channel][:i], l.tsafe.listeners[l.channel][i+1:]...)
break
}
}
}
func (l *listener) nonBlockingNotify() {
select {
case l.ch <- struct{}{}:
default:
}
}
func newListener(tsafe *tSafeManager, channel string) *listener {
return &listener{
tsafe: tsafe,
channel: channel,
ch: make(chan struct{}, 1),
}
}


@ -1,149 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsafe
import (
"context"
"fmt"
"sync"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
. "github.com/milvus-io/milvus/pkg/util/typeutil"
)
// Manager is the interface for tsafe manager.
type Manager interface {
Get(vChannel string) (Timestamp, error)
Set(vChannel string, timestamp Timestamp) error
Add(ctx context.Context, vChannel string, timestamp Timestamp)
Remove(ctx context.Context, vChannel string)
Watch() Listener
WatchChannel(channel string) Listener
Min() (string, Timestamp)
}
// tSafeManager implements `Manager` interface.
type tSafeManager struct {
mu sync.Mutex // guards tSafes
tSafes map[string]*tSafe // map[DMLChannel]*tSafe
listeners map[string][]*listener // map[DMLChannel][]*listener, key "" means all channels.
}
func (t *tSafeManager) Watch() Listener {
return t.WatchChannel("")
}
func (t *tSafeManager) WatchChannel(channel string) Listener {
t.mu.Lock()
defer t.mu.Unlock()
l := newListener(t, channel)
t.listeners[channel] = append(t.listeners[channel], l)
return l
}
func (t *tSafeManager) Add(ctx context.Context, vChannel string, timestamp uint64) {
ts, _ := tsoutil.ParseTS(timestamp)
t.mu.Lock()
defer t.mu.Unlock()
if _, ok := t.tSafes[vChannel]; !ok {
t.tSafes[vChannel] = newTSafe(vChannel, timestamp)
}
log.Ctx(ctx).Info("add tSafe done",
zap.String("channel", vChannel), zap.Time("timestamp", ts))
}
func (t *tSafeManager) Get(vChannel string) (Timestamp, error) {
t.mu.Lock()
defer t.mu.Unlock()
ts, err := t.get(vChannel)
if err != nil {
return 0, err
}
return ts.get(), nil
}
func (t *tSafeManager) Set(vChannel string, timestamp Timestamp) error {
t.mu.Lock()
defer t.mu.Unlock()
ts, err := t.get(vChannel)
if err != nil {
return fmt.Errorf("set tSafe failed, err = %w", err)
}
ts.set(timestamp)
t.notifyAll(vChannel)
return nil
}
func (t *tSafeManager) Remove(ctx context.Context, vChannel string) {
t.mu.Lock()
defer t.mu.Unlock()
tsafe, ok := t.tSafes[vChannel]
if ok {
tsafe.close()
}
for _, l := range t.listeners[vChannel] {
l.close()
}
delete(t.tSafes, vChannel)
delete(t.listeners, vChannel)
log.Ctx(ctx).Info("remove tSafe replica",
zap.String("vChannel", vChannel))
}
func (t *tSafeManager) Min() (string, Timestamp) {
t.mu.Lock()
defer t.mu.Unlock()
var minChannel string
minTt := MaxTimestamp
for channel, tsafe := range t.tSafes {
t := tsafe.get()
if t < minTt && t != 0 {
minChannel = channel
minTt = t
}
}
return minChannel, minTt
}
func (t *tSafeManager) get(vChannel string) (*tSafe, error) {
if _, ok := t.tSafes[vChannel]; !ok {
return nil, fmt.Errorf("cannot found tSafer, vChannel = %s", vChannel)
}
return t.tSafes[vChannel], nil
}
// since notifyAll called by setTSafe, no need to lock
func (t *tSafeManager) notifyAll(channel string) {
for _, l := range t.listeners[""] {
l.nonBlockingNotify()
}
for _, l := range t.listeners[channel] {
l.nonBlockingNotify()
}
}
func NewTSafeReplica() Manager {
replica := &tSafeManager{
tSafes: make(map[string]*tSafe),
listeners: make(map[string][]*listener),
}
return replica
}


@ -1,55 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsafe
import (
"go.uber.org/atomic"
. "github.com/milvus-io/milvus/pkg/util/typeutil"
)
type tSafe struct {
channel string
tSafe atomic.Uint64
closed atomic.Bool
}
func (ts *tSafe) valid() bool {
return !ts.closed.Load()
}
func (ts *tSafe) close() {
ts.closed.Store(true)
}
func (ts *tSafe) get() Timestamp {
return ts.tSafe.Load()
}
func (ts *tSafe) set(t Timestamp) {
ts.tSafe.Store(t)
}
func newTSafe(channel string, timestamp uint64) *tSafe {
ts := &tSafe{
channel: channel,
}
ts.tSafe.Store(timestamp)
ts.closed.Store(false)
return ts
}


@ -1,94 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsafe
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/suite"
. "github.com/milvus-io/milvus/pkg/util/typeutil"
)
type TSafeTestSuite struct {
suite.Suite
tSafeReplica Manager
channel string
time Timestamp
}
func (suite *TSafeTestSuite) SetupSuite() {
suite.channel = "test-channel"
suite.time = uint64(time.Now().Unix())
}
func (suite *TSafeTestSuite) SetupTest() {
suite.tSafeReplica = NewTSafeReplica()
}
// test Basic use of TSafeReplica
func (suite *TSafeTestSuite) TestBasic() {
suite.tSafeReplica.Add(context.Background(), suite.channel, ZeroTimestamp)
t, err := suite.tSafeReplica.Get(suite.channel)
suite.NoError(err)
suite.Equal(ZeroTimestamp, t)
// Add listener
globalWatcher := suite.tSafeReplica.WatchChannel(suite.channel)
channelWatcher := suite.tSafeReplica.Watch()
defer globalWatcher.Close()
defer channelWatcher.Close()
// Test Set tSafe
suite.tSafeReplica.Set(suite.channel, suite.time)
t, err = suite.tSafeReplica.Get(suite.channel)
suite.NoError(err)
suite.Equal(suite.time, t)
// Test listener
select {
case <-globalWatcher.On():
default:
suite.Fail("global watcher should be triggered")
}
select {
case <-channelWatcher.On():
default:
suite.Fail("channel watcher should be triggered")
}
}
func (suite *TSafeTestSuite) TestRemoveAndInvalid() {
suite.tSafeReplica.Add(context.Background(), suite.channel, ZeroTimestamp)
t, err := suite.tSafeReplica.Get(suite.channel)
suite.NoError(err)
suite.Equal(ZeroTimestamp, t)
suite.tSafeReplica.Remove(context.Background(), suite.channel)
_, err = suite.tSafeReplica.Get(suite.channel)
suite.Error(err)
err = suite.tSafeReplica.Set(suite.channel, suite.time)
suite.Error(err)
}
func TestTSafe(t *testing.T) {
suite.Run(t, new(TSafeTestSuite))
}