mirror of https://github.com/milvus-io/milvus.git
fix: avoid concurrent sync and segment tasks (#33087)
issue: https://github.com/milvus-io/milvus/issues/33089
---------
Signed-off-by: sunby <sunbingyi1992@gmail.com>
pull/33127/head
parent
d842efbff3
commit
bbf3a27472
|
@ -36,6 +36,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
task2 "github.com/milvus-io/milvus/internal/querycoordv2/task"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/etcd"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
|
@ -69,6 +70,7 @@ type CollectionObserverSuite struct {
|
|||
targetObserver *TargetObserver
|
||||
leaderObserver *LeaderObserver
|
||||
checkerController *checkers.CheckerController
|
||||
scheduler *task2.MockScheduler
|
||||
|
||||
// Test object
|
||||
ob *CollectionObserver
|
||||
|
@ -201,7 +203,10 @@ func (suite *CollectionObserverSuite) SetupTest() {
|
|||
suite.checkerController = &checkers.CheckerController{}
|
||||
|
||||
mockCluster := session.NewMockCluster(suite.T())
|
||||
suite.leaderObserver = NewLeaderObserver(suite.dist, suite.meta, suite.targetMgr, suite.broker, mockCluster, nodeMgr)
|
||||
suite.scheduler = task2.NewMockScheduler(suite.T())
|
||||
suite.scheduler.EXPECT().Sync(mock.Anything, mock.Anything).Return(true).Maybe()
|
||||
suite.scheduler.EXPECT().RemoveSync(mock.Anything, mock.Anything).Maybe()
|
||||
suite.leaderObserver = NewLeaderObserver(suite.dist, suite.meta, suite.targetMgr, suite.broker, mockCluster, nodeMgr, suite.scheduler)
|
||||
mockCluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
|
||||
|
||||
// Test object
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
task2 "github.com/milvus-io/milvus/internal/querycoordv2/task"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||
|
@ -44,6 +45,7 @@ type LeaderObserver struct {
|
|||
cluster session.Cluster
|
||||
nodeMgr *session.NodeManager
|
||||
|
||||
scheduler task2.Scheduler
|
||||
dispatcher *taskDispatcher[int64]
|
||||
|
||||
stopOnce sync.Once
|
||||
|
@ -126,7 +128,23 @@ func (o *LeaderObserver) observeCollection(ctx context.Context, collection int64
|
|||
|
||||
actions := o.findNeedLoadedSegments(leaderView, dists)
|
||||
actions = append(actions, o.findNeedRemovedSegments(leaderView, dists)...)
|
||||
o.sync(ctx, replica.GetID(), leaderView, actions)
|
||||
// Try to add a sync task to scheduler and block concurrent segment tasks to avoid inconsistent state
|
||||
executableActions := make([]*querypb.SyncAction, 0)
|
||||
for _, action := range actions {
|
||||
segmentID := action.SegmentID
|
||||
replicaID := replica.ID
|
||||
if ok := o.scheduler.Sync(segmentID, replicaID); !ok {
|
||||
continue
|
||||
}
|
||||
defer func() {
|
||||
o.scheduler.RemoveSync(segmentID, replicaID)
|
||||
}()
|
||||
executableActions = append(executableActions, action)
|
||||
}
|
||||
if len(executableActions) == 0 {
|
||||
continue
|
||||
}
|
||||
o.sync(ctx, replica.GetID(), leaderView, executableActions)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -273,14 +291,16 @@ func NewLeaderObserver(
|
|||
broker meta.Broker,
|
||||
cluster session.Cluster,
|
||||
nodeMgr *session.NodeManager,
|
||||
scheduler task2.Scheduler,
|
||||
) *LeaderObserver {
|
||||
ob := &LeaderObserver{
|
||||
dist: dist,
|
||||
meta: meta,
|
||||
target: targetMgr,
|
||||
broker: broker,
|
||||
cluster: cluster,
|
||||
nodeMgr: nodeMgr,
|
||||
dist: dist,
|
||||
meta: meta,
|
||||
target: targetMgr,
|
||||
broker: broker,
|
||||
cluster: cluster,
|
||||
nodeMgr: nodeMgr,
|
||||
scheduler: scheduler,
|
||||
}
|
||||
|
||||
dispatcher := newTaskDispatcher[int64](ob.observeCollection)
|
||||
|
|
|
@ -37,6 +37,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
task2 "github.com/milvus-io/milvus/internal/querycoordv2/task"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
||||
"github.com/milvus-io/milvus/pkg/util/etcd"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
|
@ -48,8 +49,9 @@ type LeaderObserverTestSuite struct {
|
|||
kv kv.MetaKv
|
||||
mockCluster *session.MockCluster
|
||||
|
||||
meta *meta.Meta
|
||||
broker *meta.MockBroker
|
||||
meta *meta.Meta
|
||||
broker *meta.MockBroker
|
||||
scheduler *task2.MockScheduler
|
||||
}
|
||||
|
||||
func (suite *LeaderObserverTestSuite) SetupSuite() {
|
||||
|
@ -83,7 +85,10 @@ func (suite *LeaderObserverTestSuite) SetupTest() {
|
|||
// }, nil).Maybe()
|
||||
distManager := meta.NewDistributionManager()
|
||||
targetManager := meta.NewTargetManager(suite.broker, suite.meta)
|
||||
suite.observer = NewLeaderObserver(distManager, suite.meta, targetManager, suite.broker, suite.mockCluster, nodeMgr)
|
||||
suite.scheduler = task2.NewMockScheduler(suite.T())
|
||||
suite.scheduler.EXPECT().Sync(mock.Anything, mock.Anything).Return(true).Maybe()
|
||||
suite.scheduler.EXPECT().RemoveSync(mock.Anything, mock.Anything).Maybe()
|
||||
suite.observer = NewLeaderObserver(distManager, suite.meta, targetManager, suite.broker, suite.mockCluster, nodeMgr, suite.scheduler)
|
||||
}
|
||||
|
||||
func (suite *LeaderObserverTestSuite) TearDownTest() {
|
||||
|
|
|
@ -373,6 +373,7 @@ func (s *Server) initObserver() {
|
|||
s.broker,
|
||||
s.cluster,
|
||||
s.nodeMgr,
|
||||
s.taskScheduler,
|
||||
)
|
||||
s.targetObserver = observers.NewTargetObserver(
|
||||
s.meta,
|
||||
|
|
|
@ -357,6 +357,40 @@ func (_c *MockScheduler_RemoveExecutor_Call) RunAndReturn(run func(int64)) *Mock
|
|||
return _c
|
||||
}
|
||||
|
||||
// RemoveSync provides a mock function with given fields: segmentID, replicaID
|
||||
func (_m *MockScheduler) RemoveSync(segmentID int64, replicaID int64) {
|
||||
_m.Called(segmentID, replicaID)
|
||||
}
|
||||
|
||||
// MockScheduler_RemoveSync_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveSync'
|
||||
type MockScheduler_RemoveSync_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// RemoveSync is a helper method to define mock.On call
|
||||
// - segmentID int64
|
||||
// - replicaID int64
|
||||
func (_e *MockScheduler_Expecter) RemoveSync(segmentID interface{}, replicaID interface{}) *MockScheduler_RemoveSync_Call {
|
||||
return &MockScheduler_RemoveSync_Call{Call: _e.mock.On("RemoveSync", segmentID, replicaID)}
|
||||
}
|
||||
|
||||
func (_c *MockScheduler_RemoveSync_Call) Run(run func(segmentID int64, replicaID int64)) *MockScheduler_RemoveSync_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(int64), args[1].(int64))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockScheduler_RemoveSync_Call) Return() *MockScheduler_RemoveSync_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockScheduler_RemoveSync_Call) RunAndReturn(run func(int64, int64)) *MockScheduler_RemoveSync_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// Start provides a mock function with given fields:
|
||||
func (_m *MockScheduler) Start() {
|
||||
_m.Called()
|
||||
|
@ -421,6 +455,49 @@ func (_c *MockScheduler_Stop_Call) RunAndReturn(run func()) *MockScheduler_Stop_
|
|||
return _c
|
||||
}
|
||||
|
||||
// Sync provides a mock function with given fields: segmentID, replicaID
|
||||
func (_m *MockScheduler) Sync(segmentID int64, replicaID int64) bool {
|
||||
ret := _m.Called(segmentID, replicaID)
|
||||
|
||||
var r0 bool
|
||||
if rf, ok := ret.Get(0).(func(int64, int64) bool); ok {
|
||||
r0 = rf(segmentID, replicaID)
|
||||
} else {
|
||||
r0 = ret.Get(0).(bool)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// MockScheduler_Sync_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Sync'
|
||||
type MockScheduler_Sync_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// Sync is a helper method to define mock.On call
|
||||
// - segmentID int64
|
||||
// - replicaID int64
|
||||
func (_e *MockScheduler_Expecter) Sync(segmentID interface{}, replicaID interface{}) *MockScheduler_Sync_Call {
|
||||
return &MockScheduler_Sync_Call{Call: _e.mock.On("Sync", segmentID, replicaID)}
|
||||
}
|
||||
|
||||
func (_c *MockScheduler_Sync_Call) Run(run func(segmentID int64, replicaID int64)) *MockScheduler_Sync_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(int64), args[1].(int64))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockScheduler_Sync_Call) Return(_a0 bool) *MockScheduler_Sync_Call {
|
||||
_c.Call.Return(_a0)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockScheduler_Sync_Call) RunAndReturn(run func(int64, int64) bool) *MockScheduler_Sync_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// NewMockScheduler creates a new instance of MockScheduler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
// The first argument is typically a *testing.T value.
|
||||
func NewMockScheduler(t interface {
|
||||
|
|
|
@ -137,6 +137,8 @@ type Scheduler interface {
|
|||
GetNodeChannelDelta(nodeID int64) int
|
||||
GetChannelTaskNum() int
|
||||
GetSegmentTaskNum() int
|
||||
Sync(segmentID, replicaID int64) bool
|
||||
RemoveSync(segmentID, replicaID int64)
|
||||
}
|
||||
|
||||
type taskScheduler struct {
|
||||
|
@ -157,6 +159,8 @@ type taskScheduler struct {
|
|||
channelTasks map[replicaChannelIndex]Task
|
||||
processQueue *taskQueue
|
||||
waitQueue *taskQueue
|
||||
|
||||
syncTasks map[replicaSegmentIndex]struct{}
|
||||
}
|
||||
|
||||
func NewScheduler(ctx context.Context,
|
||||
|
@ -188,6 +192,7 @@ func NewScheduler(ctx context.Context,
|
|||
channelTasks: make(map[replicaChannelIndex]Task),
|
||||
processQueue: newTaskQueue(),
|
||||
waitQueue: newTaskQueue(),
|
||||
syncTasks: make(map[replicaSegmentIndex]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -242,6 +247,35 @@ func (scheduler *taskScheduler) RemoveExecutor(nodeID int64) {
|
|||
}
|
||||
}
|
||||
|
||||
func (scheduler *taskScheduler) Sync(segmentID, replicaID int64) bool {
|
||||
scheduler.rwmutex.Lock()
|
||||
defer scheduler.rwmutex.Unlock()
|
||||
|
||||
t := replicaSegmentIndex{
|
||||
ReplicaID: replicaID,
|
||||
SegmentID: segmentID,
|
||||
IsGrowing: false,
|
||||
}
|
||||
if _, ok := scheduler.segmentTasks[t]; ok {
|
||||
return false
|
||||
}
|
||||
|
||||
scheduler.syncTasks[t] = struct{}{}
|
||||
return true
|
||||
}
|
||||
|
||||
func (scheduler *taskScheduler) RemoveSync(segmentID, replicaID int64) {
|
||||
scheduler.rwmutex.Lock()
|
||||
defer scheduler.rwmutex.Unlock()
|
||||
|
||||
t := replicaSegmentIndex{
|
||||
ReplicaID: replicaID,
|
||||
SegmentID: segmentID,
|
||||
IsGrowing: false,
|
||||
}
|
||||
delete(scheduler.syncTasks, t)
|
||||
}
|
||||
|
||||
func (scheduler *taskScheduler) Add(task Task) error {
|
||||
scheduler.rwmutex.Lock()
|
||||
defer scheduler.rwmutex.Unlock()
|
||||
|
@ -311,6 +345,9 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
|
|||
switch task := task.(type) {
|
||||
case *SegmentTask:
|
||||
index := NewReplicaSegmentIndex(task)
|
||||
if _, ok := scheduler.syncTasks[index]; ok {
|
||||
return merr.WrapErrServiceInternal("task with the same segment exists")
|
||||
}
|
||||
if old, ok := scheduler.segmentTasks[index]; ok {
|
||||
if task.Priority() > old.Priority() {
|
||||
log.Info("replace old task, the new one with higher priority",
|
||||
|
|
Loading…
Reference in New Issue