mirror of https://github.com/milvus-io/milvus.git
enhance: improve rootcoord task scheduling policy (#37352)
- issue: #30301 Signed-off-by: SimFG <bang.fu@zilliz.com>pull/37106/head
parent
c83b93946e
commit
f1dd55e0c0
2
go.sum
2
go.sum
|
@ -627,8 +627,6 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
|
|||
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
|
||||
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
|
||||
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240930043709-0c23514e4c34 h1:Fwxpg98128gfWRbQ1A3PMP9o2IfYZk7RSEy8rcoCWDA=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240930043709-0c23514e4c34/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241025031121-4d5c88b00cf7 h1:HwAitQk+V59QdYUwwVVYHTujd4QZrebg2Cc2hmcjhAg=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241025031121-4d5c88b00cf7/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
|
||||
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
|
||||
|
|
|
@ -43,3 +43,11 @@ func (t *alterAliasTask) Execute(ctx context.Context) error {
|
|||
// alter alias is atomic enough.
|
||||
return t.core.meta.AlterAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.Req.GetCollectionName(), t.GetTs())
|
||||
}
|
||||
|
||||
func (t *alterAliasTask) GetLockerKey() LockerKey {
|
||||
return NewLockerKeyChain(
|
||||
NewClusterLockerKey(false),
|
||||
NewDatabaseLockerKey(t.Req.GetDbName(), false),
|
||||
NewCollectionLockerKey(t.Req.GetCollectionName(), true),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -124,3 +124,12 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error {
|
|||
|
||||
return redoTask.Execute(ctx)
|
||||
}
|
||||
|
||||
func (a *alterCollectionTask) GetLockerKey() LockerKey {
|
||||
collectionName := a.core.getRealCollectionName(a.ctx, a.Req.GetDbName(), a.Req.GetCollectionName())
|
||||
return NewLockerKeyChain(
|
||||
NewClusterLockerKey(false),
|
||||
NewDatabaseLockerKey(a.Req.GetDbName(), false),
|
||||
NewCollectionLockerKey(collectionName, true),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -118,6 +118,13 @@ func (a *alterDatabaseTask) Execute(ctx context.Context) error {
|
|||
return redoTask.Execute(ctx)
|
||||
}
|
||||
|
||||
func (a *alterDatabaseTask) GetLockerKey() LockerKey {
|
||||
return NewLockerKeyChain(
|
||||
NewClusterLockerKey(false),
|
||||
NewDatabaseLockerKey(a.Req.GetDbName(), true),
|
||||
)
|
||||
}
|
||||
|
||||
func MergeProperties(oldProps []*commonpb.KeyValuePair, updatedProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair {
|
||||
props := make(map[string]string)
|
||||
for _, prop := range oldProps {
|
||||
|
|
|
@ -39,3 +39,11 @@ func (t *createAliasTask) Execute(ctx context.Context) error {
|
|||
// create alias is atomic enough.
|
||||
return t.core.meta.CreateAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.Req.GetCollectionName(), t.GetTs())
|
||||
}
|
||||
|
||||
func (t *createAliasTask) GetLockerKey() LockerKey {
|
||||
return NewLockerKeyChain(
|
||||
NewClusterLockerKey(false),
|
||||
NewDatabaseLockerKey(t.Req.GetDbName(), false),
|
||||
NewCollectionLockerKey(t.Req.GetCollectionName(), true),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -658,3 +658,10 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
|
|||
|
||||
return undoTask.Execute(ctx)
|
||||
}
|
||||
|
||||
func (t *createCollectionTask) GetLockerKey() LockerKey {
|
||||
return NewLockerKeyChain(
|
||||
NewClusterLockerKey(false),
|
||||
NewDatabaseLockerKey(t.Req.GetDbName(), true),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -53,3 +53,7 @@ func (t *createDatabaseTask) Execute(ctx context.Context) error {
|
|||
db := model.NewDatabase(t.dbID, t.Req.GetDbName(), etcdpb.DatabaseState_DatabaseCreated, t.Req.GetProperties())
|
||||
return t.core.meta.CreateDatabase(ctx, db, t.GetTs())
|
||||
}
|
||||
|
||||
func (t *createDatabaseTask) GetLockerKey() LockerKey {
|
||||
return NewLockerKeyChain(NewClusterLockerKey(true))
|
||||
}
|
||||
|
|
|
@ -128,3 +128,12 @@ func (t *createPartitionTask) Execute(ctx context.Context) error {
|
|||
|
||||
return undoTask.Execute(ctx)
|
||||
}
|
||||
|
||||
func (t *createPartitionTask) GetLockerKey() LockerKey {
|
||||
collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName())
|
||||
return NewLockerKeyChain(
|
||||
NewClusterLockerKey(false),
|
||||
NewDatabaseLockerKey(t.Req.GetDbName(), false),
|
||||
NewCollectionLockerKey(collectionName, true),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -53,3 +53,12 @@ func (t *describeCollectionTask) Execute(ctx context.Context) (err error) {
|
|||
t.Rsp = convertModelToDesc(coll, aliases, db.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *describeCollectionTask) GetLockerKey() LockerKey {
|
||||
collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName())
|
||||
return NewLockerKeyChain(
|
||||
NewClusterLockerKey(false),
|
||||
NewDatabaseLockerKey(t.Req.GetDbName(), false),
|
||||
NewCollectionLockerKey(collectionName, false),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -55,3 +55,10 @@ func (t *describeDBTask) Execute(ctx context.Context) (err error) {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *describeDBTask) GetLockerKey() LockerKey {
|
||||
return NewLockerKeyChain(
|
||||
NewClusterLockerKey(false),
|
||||
NewDatabaseLockerKey(t.Req.GetDbName(), false),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -43,3 +43,12 @@ func (t *dropAliasTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
return t.core.meta.DropAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.GetTs())
|
||||
}
|
||||
|
||||
func (t *dropAliasTask) GetLockerKey() LockerKey {
|
||||
collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetAlias())
|
||||
return NewLockerKeyChain(
|
||||
NewClusterLockerKey(false),
|
||||
NewDatabaseLockerKey(t.Req.GetDbName(), false),
|
||||
NewCollectionLockerKey(collectionName, true),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -119,3 +119,7 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error {
|
|||
|
||||
return redoTask.Execute(ctx)
|
||||
}
|
||||
|
||||
func (t *dropCollectionTask) GetLockerKey() LockerKey {
|
||||
return NewLockerKeyChain(NewClusterLockerKey(false), NewDatabaseLockerKey(t.Req.GetDbName(), true))
|
||||
}
|
||||
|
|
|
@ -47,7 +47,7 @@ func (t *dropDatabaseTask) Execute(ctx context.Context) error {
|
|||
databaseName: dbName,
|
||||
ts: ts,
|
||||
})
|
||||
redoTask.AddAsyncStep(&expireCacheStep{
|
||||
redoTask.AddSyncStep(&expireCacheStep{
|
||||
baseStep: baseStep{core: t.core},
|
||||
dbName: dbName,
|
||||
ts: ts,
|
||||
|
@ -60,3 +60,7 @@ func (t *dropDatabaseTask) Execute(ctx context.Context) error {
|
|||
})
|
||||
return redoTask.Execute(ctx)
|
||||
}
|
||||
|
||||
func (t *dropDatabaseTask) GetLockerKey() LockerKey {
|
||||
return NewLockerKeyChain(NewClusterLockerKey(true))
|
||||
}
|
||||
|
|
|
@ -111,3 +111,12 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error {
|
|||
|
||||
return redoTask.Execute(ctx)
|
||||
}
|
||||
|
||||
func (t *dropPartitionTask) GetLockerKey() LockerKey {
|
||||
collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName())
|
||||
return NewLockerKeyChain(
|
||||
NewClusterLockerKey(false),
|
||||
NewDatabaseLockerKey(t.Req.GetDbName(), false),
|
||||
NewCollectionLockerKey(collectionName, true),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -47,3 +47,10 @@ func (t *hasCollectionTask) Execute(ctx context.Context) error {
|
|||
t.Rsp.Value = err == nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *hasCollectionTask) GetLockerKey() LockerKey {
|
||||
return NewLockerKeyChain(
|
||||
NewClusterLockerKey(false),
|
||||
NewDatabaseLockerKey(t.Req.GetDbName(), false),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -57,3 +57,12 @@ func (t *hasPartitionTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *hasPartitionTask) GetLockerKey() LockerKey {
|
||||
collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName())
|
||||
return NewLockerKeyChain(
|
||||
NewClusterLockerKey(false),
|
||||
NewDatabaseLockerKey(t.Req.GetDbName(), false),
|
||||
NewCollectionLockerKey(collectionName, false),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -124,3 +124,7 @@ func (t *listDatabaseTask) Execute(ctx context.Context) error {
|
|||
t.Resp.CreatedTimestamp = createdTimes
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *listDatabaseTask) GetLockerKey() LockerKey {
|
||||
return NewLockerKeyChain(NewClusterLockerKey(false))
|
||||
}
|
||||
|
|
|
@ -42,3 +42,9 @@ func (t *renameCollectionTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
return t.core.meta.RenameCollection(ctx, t.Req.GetDbName(), t.Req.GetOldName(), t.Req.GetNewDBName(), t.Req.GetNewName(), t.GetTs())
|
||||
}
|
||||
|
||||
func (t *renameCollectionTask) GetLockerKey() LockerKey {
|
||||
return NewLockerKeyChain(
|
||||
NewClusterLockerKey(true),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -1118,6 +1118,15 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ
|
|||
return t.Rsp, nil
|
||||
}
|
||||
|
||||
// getRealCollectionName get origin collection name to avoid the alias name
|
||||
func (c *Core) getRealCollectionName(ctx context.Context, db, collection string) string {
|
||||
realName, err := c.meta.DescribeAlias(ctx, db, collection, 0)
|
||||
if err != nil {
|
||||
return collection
|
||||
}
|
||||
return realName
|
||||
}
|
||||
|
||||
func (c *Core) describeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest, allowUnavailable bool) (*model.Collection, error) {
|
||||
ts := getTravelTs(in)
|
||||
if in.GetCollectionName() != "" {
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/allocator"
|
||||
"github.com/milvus-io/milvus/internal/tso"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
)
|
||||
|
||||
type IScheduler interface {
|
||||
|
@ -48,21 +49,34 @@ type scheduler struct {
|
|||
|
||||
lock sync.Mutex
|
||||
|
||||
minDdlTs atomic.Uint64
|
||||
minDdlTs atomic.Uint64
|
||||
clusterLock *lock.KeyLock[string]
|
||||
databaseLock *lock.KeyLock[string]
|
||||
collectionLock *lock.KeyLock[string]
|
||||
lockMapping map[LockLevel]*lock.KeyLock[string]
|
||||
}
|
||||
|
||||
func newScheduler(ctx context.Context, idAllocator allocator.Interface, tsoAllocator tso.Allocator) *scheduler {
|
||||
ctx1, cancel := context.WithCancel(ctx)
|
||||
// TODO
|
||||
n := 1024 * 10
|
||||
return &scheduler{
|
||||
ctx: ctx1,
|
||||
cancel: cancel,
|
||||
idAllocator: idAllocator,
|
||||
tsoAllocator: tsoAllocator,
|
||||
taskChan: make(chan task, n),
|
||||
minDdlTs: *atomic.NewUint64(0),
|
||||
s := &scheduler{
|
||||
ctx: ctx1,
|
||||
cancel: cancel,
|
||||
idAllocator: idAllocator,
|
||||
tsoAllocator: tsoAllocator,
|
||||
taskChan: make(chan task, n),
|
||||
minDdlTs: *atomic.NewUint64(0),
|
||||
clusterLock: lock.NewKeyLock[string](),
|
||||
databaseLock: lock.NewKeyLock[string](),
|
||||
collectionLock: lock.NewKeyLock[string](),
|
||||
}
|
||||
s.lockMapping = map[LockLevel]*lock.KeyLock[string]{
|
||||
ClusterLock: s.clusterLock,
|
||||
DatabaseLock: s.databaseLock,
|
||||
CollectionLock: s.collectionLock,
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *scheduler) Start() {
|
||||
|
@ -147,6 +161,13 @@ func (s *scheduler) enqueue(task task) {
|
|||
}
|
||||
|
||||
func (s *scheduler) AddTask(task task) error {
|
||||
if Params.RootCoordCfg.UseLockScheduler.GetAsBool() {
|
||||
lockKey := task.GetLockerKey()
|
||||
if lockKey != nil {
|
||||
return s.executeTaskWithLock(task, lockKey)
|
||||
}
|
||||
}
|
||||
|
||||
// make sure that setting ts and enqueue is atomic.
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
@ -168,3 +189,25 @@ func (s *scheduler) GetMinDdlTs() Timestamp {
|
|||
func (s *scheduler) setMinDdlTs(ts Timestamp) {
|
||||
s.minDdlTs.Store(ts)
|
||||
}
|
||||
|
||||
func (s *scheduler) executeTaskWithLock(task task, lockerKey LockerKey) error {
|
||||
if lockerKey == nil {
|
||||
if err := s.setID(task); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.setTs(task); err != nil {
|
||||
return err
|
||||
}
|
||||
s.execute(task)
|
||||
return nil
|
||||
}
|
||||
taskLock := s.lockMapping[lockerKey.Level()]
|
||||
if lockerKey.IsWLock() {
|
||||
taskLock.Lock(lockerKey.LockKey())
|
||||
defer taskLock.Unlock(lockerKey.LockKey())
|
||||
} else {
|
||||
taskLock.RLock(lockerKey.LockKey())
|
||||
defer taskLock.RUnlock(lockerKey.LockKey())
|
||||
}
|
||||
return s.executeTaskWithLock(task, lockerKey.Next())
|
||||
}
|
||||
|
|
|
@ -20,13 +20,19 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/allocator"
|
||||
mocktso "github.com/milvus-io/milvus/internal/tso/mocks"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
)
|
||||
|
||||
|
@ -242,3 +248,90 @@ func Test_scheduler_updateDdlMinTsLoop(t *testing.T) {
|
|||
s.Stop()
|
||||
})
|
||||
}
|
||||
|
||||
type WithLockKeyTask struct {
|
||||
baseTask
|
||||
lockKey LockerKey
|
||||
workDuration time.Duration
|
||||
newTime time.Time
|
||||
name string
|
||||
}
|
||||
|
||||
func NewWithLockKeyTask(lockKey LockerKey, duration time.Duration, name string) *WithLockKeyTask {
|
||||
task := &WithLockKeyTask{
|
||||
baseTask: newBaseTask(context.Background(), nil),
|
||||
lockKey: lockKey,
|
||||
workDuration: duration,
|
||||
newTime: time.Now(),
|
||||
name: name,
|
||||
}
|
||||
return task
|
||||
}
|
||||
|
||||
func (t *WithLockKeyTask) GetLockerKey() LockerKey {
|
||||
return t.lockKey
|
||||
}
|
||||
|
||||
func (t *WithLockKeyTask) Execute(ctx context.Context) error {
|
||||
log.Info("execute task", zap.String("name", t.name), zap.Duration("duration", time.Since(t.newTime)))
|
||||
time.Sleep(t.workDuration)
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestExecuteTaskWithLock(t *testing.T) {
|
||||
paramtable.Init()
|
||||
Params.Save(Params.RootCoordCfg.UseLockScheduler.Key, "true")
|
||||
defer Params.Reset(Params.RootCoordCfg.UseLockScheduler.Key)
|
||||
idMock := allocator.NewMockAllocator(t)
|
||||
tsMock := mocktso.NewAllocator(t)
|
||||
idMock.EXPECT().AllocOne().Return(1000, nil)
|
||||
tsMock.EXPECT().GenerateTSO(mock.Anything).Return(10000, nil)
|
||||
s := newScheduler(context.Background(), idMock, tsMock)
|
||||
w := &sync.WaitGroup{}
|
||||
w.Add(4)
|
||||
{
|
||||
go func() {
|
||||
defer w.Done()
|
||||
time.Sleep(1500 * time.Millisecond)
|
||||
lockKey := NewLockerKeyChain(NewClusterLockerKey(false), NewDatabaseLockerKey("test", false))
|
||||
t1 := NewWithLockKeyTask(lockKey, time.Second*2, "t1-1")
|
||||
err := s.AddTask(t1)
|
||||
assert.NoError(t, err)
|
||||
}()
|
||||
}
|
||||
{
|
||||
go func() {
|
||||
defer w.Done()
|
||||
time.Sleep(1500 * time.Millisecond)
|
||||
lockKey := NewLockerKeyChain(NewClusterLockerKey(false), NewDatabaseLockerKey("test", false))
|
||||
t1 := NewWithLockKeyTask(lockKey, time.Second*3, "t1-2")
|
||||
err := s.AddTask(t1)
|
||||
assert.NoError(t, err)
|
||||
}()
|
||||
}
|
||||
{
|
||||
go func() {
|
||||
defer w.Done()
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
lockKey := NewLockerKeyChain(NewClusterLockerKey(false), NewDatabaseLockerKey("test", true))
|
||||
t2 := NewWithLockKeyTask(lockKey, time.Second*2, "t2")
|
||||
err := s.AddTask(t2)
|
||||
assert.NoError(t, err)
|
||||
}()
|
||||
}
|
||||
{
|
||||
go func() {
|
||||
defer w.Done()
|
||||
lockKey := NewLockerKeyChain(NewClusterLockerKey(true))
|
||||
t3 := NewWithLockKeyTask(lockKey, time.Second, "t3")
|
||||
err := s.AddTask(t3)
|
||||
assert.NoError(t, err)
|
||||
}()
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
w.Wait()
|
||||
delta := time.Since(startTime)
|
||||
assert.True(t, delta > 6*time.Second)
|
||||
assert.True(t, delta < 8*time.Second)
|
||||
}
|
||||
|
|
|
@ -147,3 +147,7 @@ func (t *showCollectionTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *showCollectionTask) GetLockerKey() LockerKey {
|
||||
return NewLockerKeyChain(NewClusterLockerKey(false), NewDatabaseLockerKey(t.Req.GetDbName(), false))
|
||||
}
|
||||
|
|
|
@ -67,3 +67,12 @@ func (t *showPartitionTask) Execute(ctx context.Context) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *showPartitionTask) GetLockerKey() LockerKey {
|
||||
collectionName := t.core.getRealCollectionName(t.ctx, t.Req.GetDbName(), t.Req.GetCollectionName())
|
||||
return NewLockerKeyChain(
|
||||
NewClusterLockerKey(false),
|
||||
NewDatabaseLockerKey(t.Req.GetDbName(), false),
|
||||
NewCollectionLockerKey(collectionName, false),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -20,9 +20,27 @@ import (
|
|||
"context"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
||||
)
|
||||
|
||||
type LockLevel int
|
||||
|
||||
const (
|
||||
ClusterLock LockLevel = iota
|
||||
DatabaseLock
|
||||
CollectionLock
|
||||
)
|
||||
|
||||
type LockerKey interface {
|
||||
LockKey() string
|
||||
Level() LockLevel
|
||||
IsWLock() bool
|
||||
Next() LockerKey
|
||||
}
|
||||
|
||||
type task interface {
|
||||
GetCtx() context.Context
|
||||
SetCtx(context.Context)
|
||||
|
@ -35,6 +53,7 @@ type task interface {
|
|||
WaitToFinish() error
|
||||
NotifyDone(err error)
|
||||
SetInQueueDuration()
|
||||
GetLockerKey() LockerKey
|
||||
}
|
||||
|
||||
type baseTask struct {
|
||||
|
@ -101,3 +120,74 @@ func (b *baseTask) NotifyDone(err error) {
|
|||
func (b *baseTask) SetInQueueDuration() {
|
||||
b.queueDur = b.tr.ElapseSpan()
|
||||
}
|
||||
|
||||
func (b *baseTask) GetLockerKey() LockerKey {
|
||||
return nil
|
||||
}
|
||||
|
||||
type taskLockerKey struct {
|
||||
key string
|
||||
rw bool
|
||||
level LockLevel
|
||||
next LockerKey
|
||||
}
|
||||
|
||||
func (t *taskLockerKey) LockKey() string {
|
||||
return t.key
|
||||
}
|
||||
|
||||
func (t *taskLockerKey) Level() LockLevel {
|
||||
return t.level
|
||||
}
|
||||
|
||||
func (t *taskLockerKey) IsWLock() bool {
|
||||
return t.rw
|
||||
}
|
||||
|
||||
func (t *taskLockerKey) Next() LockerKey {
|
||||
return t.next
|
||||
}
|
||||
|
||||
func NewClusterLockerKey(rw bool) LockerKey {
|
||||
return &taskLockerKey{
|
||||
key: "$",
|
||||
rw: rw,
|
||||
level: ClusterLock,
|
||||
}
|
||||
}
|
||||
|
||||
func NewDatabaseLockerKey(db string, rw bool) LockerKey {
|
||||
return &taskLockerKey{
|
||||
key: db,
|
||||
rw: rw,
|
||||
level: DatabaseLock,
|
||||
}
|
||||
}
|
||||
|
||||
func NewCollectionLockerKey(collection string, rw bool) LockerKey {
|
||||
return &taskLockerKey{
|
||||
key: collection,
|
||||
rw: rw,
|
||||
level: CollectionLock,
|
||||
}
|
||||
}
|
||||
|
||||
func NewLockerKeyChain(lockerKeys ...LockerKey) LockerKey {
|
||||
log.Info("NewLockerKeyChain", zap.Any("lockerKeys", len(lockerKeys)))
|
||||
if len(lockerKeys) == 0 {
|
||||
return nil
|
||||
}
|
||||
if lockerKeys[0] == nil || lockerKeys[0].Level() != ClusterLock {
|
||||
log.Warn("Invalid locker key chain", zap.Stack("stack"))
|
||||
return nil
|
||||
}
|
||||
|
||||
for i := 0; i < len(lockerKeys)-1; i++ {
|
||||
if lockerKeys[i] == nil || lockerKeys[i].Level() >= lockerKeys[i+1].Level() {
|
||||
log.Warn("Invalid locker key chain", zap.Stack("stack"))
|
||||
return nil
|
||||
}
|
||||
lockerKeys[i].(*taskLockerKey).next = lockerKeys[i+1]
|
||||
}
|
||||
return lockerKeys[0]
|
||||
}
|
||||
|
|
|
@ -0,0 +1,341 @@
|
|||
/*
|
||||
* Licensed to the LF AI & Data foundation under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package rootcoord
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
|
||||
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
|
||||
)
|
||||
|
||||
func TestLockerKey(t *testing.T) {
|
||||
clusterLock := NewClusterLockerKey(true)
|
||||
assert.Equal(t, clusterLock.IsWLock(), true)
|
||||
assert.Equal(t, clusterLock.Level(), ClusterLock)
|
||||
assert.Equal(t, clusterLock.LockKey(), "$")
|
||||
|
||||
dbLock := NewDatabaseLockerKey("foo", true)
|
||||
assert.Equal(t, dbLock.IsWLock(), true)
|
||||
assert.Equal(t, dbLock.Level(), DatabaseLock)
|
||||
assert.Equal(t, dbLock.LockKey(), "foo")
|
||||
|
||||
collectionLock := NewCollectionLockerKey("foo", true)
|
||||
assert.Equal(t, collectionLock.IsWLock(), true)
|
||||
assert.Equal(t, collectionLock.Level(), CollectionLock)
|
||||
assert.Equal(t, collectionLock.LockKey(), "foo")
|
||||
|
||||
{
|
||||
lockerChain := NewLockerKeyChain(nil)
|
||||
assert.Nil(t, lockerChain)
|
||||
}
|
||||
|
||||
{
|
||||
lockerChain := NewLockerKeyChain(dbLock)
|
||||
assert.Nil(t, lockerChain)
|
||||
}
|
||||
|
||||
{
|
||||
lockerChain := NewLockerKeyChain(clusterLock, collectionLock, dbLock)
|
||||
assert.Nil(t, lockerChain)
|
||||
}
|
||||
|
||||
{
|
||||
lockerChain := NewLockerKeyChain(clusterLock, dbLock, collectionLock)
|
||||
assert.NotNil(t, lockerChain)
|
||||
assert.Equal(t, lockerChain.Next(), dbLock)
|
||||
assert.Equal(t, lockerChain.Next().Next(), collectionLock)
|
||||
}
|
||||
}
|
||||
|
||||
func GetLockerKeyString(k LockerKey) string {
|
||||
key := k.LockKey()
|
||||
level := k.Level()
|
||||
wLock := k.IsWLock()
|
||||
if k.Next() == nil {
|
||||
return fmt.Sprintf("%s-%d-%t", key, level, wLock)
|
||||
}
|
||||
return fmt.Sprintf("%s-%d-%t|%s", key, level, wLock, GetLockerKeyString(k.Next()))
|
||||
}
|
||||
|
||||
func TestGetLockerKey(t *testing.T) {
|
||||
t.Run("alter alias task locker key", func(t *testing.T) {
|
||||
tt := &alterAliasTask{
|
||||
Req: &milvuspb.AlterAliasRequest{
|
||||
DbName: "foo",
|
||||
CollectionName: "bar",
|
||||
},
|
||||
}
|
||||
key := tt.GetLockerKey()
|
||||
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|bar-2-true")
|
||||
})
|
||||
t.Run("alter collection task locker key", func(t *testing.T) {
|
||||
metaMock := mockrootcoord.NewIMetaTable(t)
|
||||
metaMock.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||
RunAndReturn(func(ctx context.Context, s string, s2 string, u uint64) (string, error) {
|
||||
return s2, nil
|
||||
})
|
||||
c := &Core{
|
||||
meta: metaMock,
|
||||
}
|
||||
tt := &alterCollectionTask{
|
||||
baseTask: baseTask{
|
||||
core: c,
|
||||
},
|
||||
Req: &milvuspb.AlterCollectionRequest{
|
||||
DbName: "foo",
|
||||
CollectionName: "bar",
|
||||
},
|
||||
}
|
||||
key := tt.GetLockerKey()
|
||||
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|bar-2-true")
|
||||
})
|
||||
t.Run("alter database task locker key", func(t *testing.T) {
|
||||
tt := &alterDatabaseTask{
|
||||
Req: &rootcoordpb.AlterDatabaseRequest{
|
||||
DbName: "foo",
|
||||
},
|
||||
}
|
||||
key := tt.GetLockerKey()
|
||||
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-true")
|
||||
})
|
||||
t.Run("create alias task locker key", func(t *testing.T) {
|
||||
metaMock := mockrootcoord.NewIMetaTable(t)
|
||||
c := &Core{
|
||||
meta: metaMock,
|
||||
}
|
||||
tt := &createAliasTask{
|
||||
baseTask: baseTask{
|
||||
core: c,
|
||||
},
|
||||
Req: &milvuspb.CreateAliasRequest{
|
||||
DbName: "foo",
|
||||
CollectionName: "bar",
|
||||
},
|
||||
}
|
||||
key := tt.GetLockerKey()
|
||||
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|bar-2-true")
|
||||
})
|
||||
t.Run("create collection task locker key", func(t *testing.T) {
|
||||
tt := &createCollectionTask{
|
||||
Req: &milvuspb.CreateCollectionRequest{
|
||||
DbName: "foo",
|
||||
CollectionName: "bar",
|
||||
},
|
||||
}
|
||||
key := tt.GetLockerKey()
|
||||
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-true")
|
||||
})
|
||||
t.Run("create database task locker key", func(t *testing.T) {
|
||||
tt := &createDatabaseTask{
|
||||
Req: &milvuspb.CreateDatabaseRequest{
|
||||
DbName: "foo",
|
||||
},
|
||||
}
|
||||
key := tt.GetLockerKey()
|
||||
assert.Equal(t, GetLockerKeyString(key), "$-0-true")
|
||||
})
|
||||
t.Run("create partition task locker key", func(t *testing.T) {
|
||||
metaMock := mockrootcoord.NewIMetaTable(t)
|
||||
metaMock.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||
RunAndReturn(func(ctx context.Context, s string, s2 string, u uint64) (string, error) {
|
||||
return "real" + s2, nil
|
||||
})
|
||||
c := &Core{
|
||||
meta: metaMock,
|
||||
}
|
||||
tt := &createPartitionTask{
|
||||
baseTask: baseTask{core: c},
|
||||
Req: &milvuspb.CreatePartitionRequest{
|
||||
DbName: "foo",
|
||||
CollectionName: "bar",
|
||||
PartitionName: "baz",
|
||||
},
|
||||
}
|
||||
key := tt.GetLockerKey()
|
||||
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|realbar-2-true")
|
||||
})
|
||||
t.Run("describe collection task locker key", func(t *testing.T) {
|
||||
metaMock := mockrootcoord.NewIMetaTable(t)
|
||||
metaMock.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||
RunAndReturn(func(ctx context.Context, s string, s2 string, u uint64) (string, error) {
|
||||
return "", errors.New("not found")
|
||||
})
|
||||
c := &Core{
|
||||
meta: metaMock,
|
||||
}
|
||||
tt := &describeCollectionTask{
|
||||
baseTask: baseTask{core: c},
|
||||
Req: &milvuspb.DescribeCollectionRequest{
|
||||
DbName: "foo",
|
||||
CollectionName: "bar",
|
||||
},
|
||||
}
|
||||
key := tt.GetLockerKey()
|
||||
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|bar-2-false")
|
||||
})
|
||||
t.Run("describe database task locker key", func(t *testing.T) {
|
||||
tt := &describeDBTask{
|
||||
Req: &rootcoordpb.DescribeDatabaseRequest{
|
||||
DbName: "foo",
|
||||
},
|
||||
}
|
||||
key := tt.GetLockerKey()
|
||||
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false")
|
||||
})
|
||||
t.Run("drop alias task locker key", func(t *testing.T) {
|
||||
metaMock := mockrootcoord.NewIMetaTable(t)
|
||||
metaMock.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||
RunAndReturn(func(ctx context.Context, s string, s2 string, u uint64) (string, error) {
|
||||
return "real" + s2, nil
|
||||
})
|
||||
c := &Core{
|
||||
meta: metaMock,
|
||||
}
|
||||
tt := &dropAliasTask{
|
||||
baseTask: baseTask{core: c},
|
||||
Req: &milvuspb.DropAliasRequest{
|
||||
DbName: "foo",
|
||||
Alias: "bar",
|
||||
},
|
||||
}
|
||||
key := tt.GetLockerKey()
|
||||
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|realbar-2-true")
|
||||
})
|
||||
t.Run("drop collection task locker key", func(t *testing.T) {
|
||||
tt := &dropCollectionTask{
|
||||
Req: &milvuspb.DropCollectionRequest{
|
||||
DbName: "foo",
|
||||
CollectionName: "bar",
|
||||
},
|
||||
}
|
||||
key := tt.GetLockerKey()
|
||||
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-true")
|
||||
})
|
||||
t.Run("drop database task locker key", func(t *testing.T) {
|
||||
tt := &dropDatabaseTask{
|
||||
Req: &milvuspb.DropDatabaseRequest{
|
||||
DbName: "foo",
|
||||
},
|
||||
}
|
||||
key := tt.GetLockerKey()
|
||||
assert.Equal(t, GetLockerKeyString(key), "$-0-true")
|
||||
})
|
||||
t.Run("drop partition task locker key", func(t *testing.T) {
|
||||
metaMock := mockrootcoord.NewIMetaTable(t)
|
||||
metaMock.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||
RunAndReturn(func(ctx context.Context, s string, s2 string, u uint64) (string, error) {
|
||||
return "real" + s2, nil
|
||||
})
|
||||
c := &Core{
|
||||
meta: metaMock,
|
||||
}
|
||||
tt := &dropPartitionTask{
|
||||
baseTask: baseTask{core: c},
|
||||
Req: &milvuspb.DropPartitionRequest{
|
||||
DbName: "foo",
|
||||
CollectionName: "bar",
|
||||
PartitionName: "baz",
|
||||
},
|
||||
}
|
||||
key := tt.GetLockerKey()
|
||||
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|realbar-2-true")
|
||||
})
|
||||
t.Run("has collection task locker key", func(t *testing.T) {
|
||||
tt := &hasCollectionTask{
|
||||
Req: &milvuspb.HasCollectionRequest{
|
||||
DbName: "foo",
|
||||
CollectionName: "bar",
|
||||
},
|
||||
}
|
||||
key := tt.GetLockerKey()
|
||||
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false")
|
||||
})
|
||||
t.Run("has partition task locker key", func(t *testing.T) {
|
||||
metaMock := mockrootcoord.NewIMetaTable(t)
|
||||
metaMock.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||
RunAndReturn(func(ctx context.Context, s string, s2 string, u uint64) (string, error) {
|
||||
return "real" + s2, nil
|
||||
})
|
||||
c := &Core{
|
||||
meta: metaMock,
|
||||
}
|
||||
tt := &hasPartitionTask{
|
||||
baseTask: baseTask{core: c},
|
||||
Req: &milvuspb.HasPartitionRequest{
|
||||
DbName: "foo",
|
||||
CollectionName: "bar",
|
||||
PartitionName: "baz",
|
||||
},
|
||||
}
|
||||
key := tt.GetLockerKey()
|
||||
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|realbar-2-false")
|
||||
})
|
||||
t.Run("list db task locker key", func(t *testing.T) {
|
||||
tt := &listDatabaseTask{}
|
||||
key := tt.GetLockerKey()
|
||||
assert.Equal(t, GetLockerKeyString(key), "$-0-false")
|
||||
})
|
||||
t.Run("rename collection task locker key", func(t *testing.T) {
|
||||
tt := &renameCollectionTask{
|
||||
Req: &milvuspb.RenameCollectionRequest{
|
||||
DbName: "foo",
|
||||
OldName: "bar",
|
||||
NewName: "baz",
|
||||
},
|
||||
}
|
||||
key := tt.GetLockerKey()
|
||||
assert.Equal(t, GetLockerKeyString(key), "$-0-true")
|
||||
})
|
||||
t.Run("show collection task locker key", func(t *testing.T) {
|
||||
tt := &showCollectionTask{
|
||||
Req: &milvuspb.ShowCollectionsRequest{
|
||||
DbName: "foo",
|
||||
},
|
||||
}
|
||||
key := tt.GetLockerKey()
|
||||
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false")
|
||||
})
|
||||
t.Run("show partition task locker key", func(t *testing.T) {
|
||||
metaMock := mockrootcoord.NewIMetaTable(t)
|
||||
metaMock.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||
RunAndReturn(func(ctx context.Context, s string, s2 string, u uint64) (string, error) {
|
||||
return "real" + s2, nil
|
||||
})
|
||||
c := &Core{
|
||||
meta: metaMock,
|
||||
}
|
||||
tt := &showPartitionTask{
|
||||
baseTask: baseTask{core: c},
|
||||
Req: &milvuspb.ShowPartitionsRequest{
|
||||
DbName: "foo",
|
||||
CollectionName: "bar",
|
||||
},
|
||||
}
|
||||
key := tt.GetLockerKey()
|
||||
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|realbar-2-false")
|
||||
})
|
||||
}
|
|
@ -1117,6 +1117,7 @@ type rootCoordConfig struct {
|
|||
MaxDatabaseNum ParamItem `refreshable:"false"`
|
||||
MaxGeneralCapacity ParamItem `refreshable:"true"`
|
||||
GracefulStopTimeout ParamItem `refreshable:"true"`
|
||||
UseLockScheduler ParamItem `refreshable:"true"`
|
||||
}
|
||||
|
||||
func (p *rootCoordConfig) init(base *BaseTable) {
|
||||
|
@ -1191,6 +1192,15 @@ Segments with smaller size than this parameter will not be indexed, and will be
|
|||
Export: true,
|
||||
}
|
||||
p.GracefulStopTimeout.Init(base.mgr)
|
||||
|
||||
p.UseLockScheduler = ParamItem{
|
||||
Key: "rootCoord.useLockScheduler",
|
||||
Version: "2.4.15",
|
||||
DefaultValue: "false",
|
||||
Doc: "use lock to schedule the task",
|
||||
Export: false,
|
||||
}
|
||||
p.UseLockScheduler.Init(base.mgr)
|
||||
}
|
||||
|
||||
// /////////////////////////////////////////////////////////////////////////////
|
||||
|
|
Loading…
Reference in New Issue