fix: use the object heap to keep the min ddl ts order (#39118)

issue: #39002

Signed-off-by: SimFG <bang.fu@zilliz.com>
pull/39165/head
SimFG 2025-01-10 18:16:58 +08:00 committed by GitHub
parent 826b726c86
commit 357eaf0d71
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 256 additions and 11 deletions

View File

@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type IScheduler interface {
@ -46,6 +47,7 @@ type scheduler struct {
tsoAllocator tso.Allocator
taskChan chan task
taskHeap typeutil.Heap[task]
lock sync.Mutex
@ -56,16 +58,22 @@ type scheduler struct {
lockMapping map[LockLevel]*lock.KeyLock[string]
}
func GetTaskHeapOrder(t task) Timestamp {
return t.GetTs()
}
func newScheduler(ctx context.Context, idAllocator allocator.Interface, tsoAllocator tso.Allocator) *scheduler {
ctx1, cancel := context.WithCancel(ctx)
// TODO
n := 1024 * 10
taskArr := make([]task, 0)
s := &scheduler{
ctx: ctx1,
cancel: cancel,
idAllocator: idAllocator,
tsoAllocator: tsoAllocator,
taskChan: make(chan task, n),
taskHeap: typeutil.NewObjectArrayBasedMinimumHeap[task, Timestamp](taskArr, GetTaskHeapOrder),
minDdlTs: *atomic.NewUint64(0),
clusterLock: lock.NewKeyLock[string](),
databaseLock: lock.NewKeyLock[string](),
@ -93,7 +101,7 @@ func (s *scheduler) Stop() {
}
func (s *scheduler) execute(task task) {
defer s.setMinDdlTs(task.GetTs()) // we should update ts, whatever task succeeds or not.
defer s.setMinDdlTs() // we should update ts, whatever task succeeds or not.
task.SetInQueueDuration()
if err := task.Prepare(task.GetCtx()); err != nil {
task.NotifyDone(err)
@ -153,6 +161,7 @@ func (s *scheduler) setTs(task task) error {
return err
}
task.SetTs(ts)
s.taskHeap.Push(task)
return nil
}
@ -186,8 +195,14 @@ func (s *scheduler) GetMinDdlTs() Timestamp {
return s.minDdlTs.Load()
}
func (s *scheduler) setMinDdlTs(ts Timestamp) {
s.minDdlTs.Store(ts)
func (s *scheduler) setMinDdlTs() {
s.lock.Lock()
defer s.lock.Unlock()
for s.taskHeap.Len() > 0 && s.taskHeap.Peek().IsFinished() {
t := s.taskHeap.Pop()
s.minDdlTs.Store(t.GetTs())
}
}
func (s *scheduler) executeTaskWithLock(task task, lockerKey LockerKey) error {
@ -195,9 +210,12 @@ func (s *scheduler) executeTaskWithLock(task task, lockerKey LockerKey) error {
if err := s.setID(task); err != nil {
return err
}
s.lock.Lock()
if err := s.setTs(task); err != nil {
s.lock.Unlock()
return err
}
s.lock.Unlock()
s.execute(task)
return nil
}

View File

@ -82,6 +82,29 @@ func newMockNormalTask() *mockNormalTask {
return task
}
type mockLockerKeyTask struct {
baseTask
lockerKey string
rw bool
}
func (m *mockLockerKeyTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(m.lockerKey, m.rw),
)
}
func newMockLockerKeyTask(lockerKey string, rw bool) *mockLockerKeyTask {
task := &mockLockerKeyTask{
baseTask: newBaseTask(context.Background(), nil),
lockerKey: lockerKey,
rw: rw,
}
task.SetCtx(context.Background())
return task
}
func Test_scheduler_Start_Stop(t *testing.T) {
idAlloc := newMockIDAllocator()
tsoAlloc := newMockTsoAllocator()
@ -247,6 +270,87 @@ func Test_scheduler_updateDdlMinTsLoop(t *testing.T) {
assert.Zero(t, s.GetMinDdlTs())
s.Stop()
})
t.Run("concurrent task schedule", func(t *testing.T) {
idAlloc := newMockIDAllocator()
tsoAlloc := newMockTsoAllocator()
tso := atomic.NewUint64(100)
idAlloc.AllocOneF = func() (UniqueID, error) {
return 100, nil
}
tsoAlloc.GenerateTSOF = func(count uint32) (uint64, error) {
got := tso.Inc()
return got, nil
}
ctx := context.Background()
s := newScheduler(ctx, idAlloc, tsoAlloc)
paramtable.Init()
paramtable.Get().Save(Params.ProxyCfg.TimeTickInterval.Key, "1")
s.Start()
for i := 0; i < 100; i++ {
if s.GetMinDdlTs() > Timestamp(100) {
break
}
assert.True(t, i < 100)
time.Sleep(time.Millisecond)
}
w := &sync.WaitGroup{}
w.Add(5)
// locker key rw true
lockerKey := "hello"
go func() {
defer w.Done()
n := 200
for i := 0; i < n; i++ {
task := newMockLockerKeyTask(lockerKey, true)
err := s.AddTask(task)
assert.NoError(t, err)
}
}()
// locker key rw false
go func() {
defer w.Done()
n := 200
for i := 0; i < n; i++ {
task := newMockLockerKeyTask(lockerKey, false)
err := s.AddTask(task)
assert.NoError(t, err)
}
}()
go func() {
defer w.Done()
n := 200
for i := 0; i < n; i++ {
task := newMockLockerKeyTask(lockerKey, false)
err := s.AddTask(task)
assert.NoError(t, err)
}
}()
go func() {
defer w.Done()
n := 200
for i := 0; i < n; i++ {
task := newMockNormalTask()
err := s.AddTask(task)
assert.NoError(t, err)
}
}()
lastMin := s.GetMinDdlTs()
go func() {
defer w.Done()
current := s.GetMinDdlTs()
assert.True(t, current >= lastMin)
lastMin = current
time.Sleep(time.Millisecond * 100)
}()
w.Wait()
})
}
type WithLockKeyTask struct {

View File

@ -21,6 +21,7 @@ import (
"fmt"
"time"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/log"
@ -53,16 +54,18 @@ type task interface {
Execute(ctx context.Context) error
WaitToFinish() error
NotifyDone(err error)
IsFinished() bool
SetInQueueDuration()
GetLockerKey() LockerKey
}
type baseTask struct {
ctx context.Context
core *Core
done chan error
ts Timestamp
id UniqueID
ctx context.Context
core *Core
done chan error
isFinished *atomic.Bool
ts Timestamp
id UniqueID
tr *timerecord.TimeRecorder
queueDur time.Duration
@ -70,9 +73,10 @@ type baseTask struct {
func newBaseTask(ctx context.Context, core *Core) baseTask {
b := baseTask{
core: core,
done: make(chan error, 1),
tr: timerecord.NewTimeRecorderWithTrace(ctx, "new task"),
core: core,
done: make(chan error, 1),
tr: timerecord.NewTimeRecorderWithTrace(ctx, "new task"),
isFinished: atomic.NewBool(false),
}
b.SetCtx(ctx)
return b
@ -116,12 +120,17 @@ func (b *baseTask) WaitToFinish() error {
func (b *baseTask) NotifyDone(err error) {
b.done <- err
b.isFinished.Store(true)
}
func (b *baseTask) SetInQueueDuration() {
b.queueDur = b.tr.ElapseSpan()
}
func (b *baseTask) IsFinished() bool {
return b.isFinished.Load()
}
func (b *baseTask) GetLockerKey() LockerKey {
return nil
}

View File

@ -69,6 +69,39 @@ func (h *heapArray[E]) Peek() interface{} {
return (*h)[0]
}
type objectHeapArray[O any, E constraints.Ordered] struct {
objects []O
getOrderFunc func(O) E
}
func (h *objectHeapArray[O, E]) Len() int {
return len(h.objects)
}
func (h *objectHeapArray[O, E]) Less(i, j int) bool {
return h.getOrderFunc(h.objects[i]) < h.getOrderFunc(h.objects[j])
}
func (h *objectHeapArray[O, E]) Swap(i, j int) {
h.objects[i], h.objects[j] = h.objects[j], h.objects[i]
}
func (h *objectHeapArray[O, E]) Push(x interface{}) {
h.objects = append(h.objects, x.(O))
}
func (h *objectHeapArray[O, E]) Pop() interface{} {
old := h.objects
n := len(old)
x := old[n-1]
h.objects = old[0 : n-1]
return x
}
func (h *objectHeapArray[O, E]) Peek() interface{} {
return h.objects[0]
}
// reverseOrderedInterface is a heap base interface that reverses the order of the elements.
type reverseOrderedInterface[E constraints.Ordered] struct {
HeapInterface
@ -107,6 +140,37 @@ func NewArrayBasedMinimumHeap[E constraints.Ordered](initial []E) Heap[E] {
}
}
func NewObjectArrayBasedMaximumHeap[O any, E constraints.Ordered](initial []O, getOrderFunc func(O) E) Heap[O] {
if initial == nil {
initial = make([]O, 0)
}
ha := &objectHeapArray[O, E]{
objects: initial,
getOrderFunc: getOrderFunc,
}
reverse := reverseOrderedInterface[E]{
HeapInterface: ha,
}
heap.Init(reverse)
return &heapImpl[O, reverseOrderedInterface[E]]{
inner: reverse,
}
}
func NewObjectArrayBasedMinimumHeap[O any, E constraints.Ordered](initial []O, getOrderFunc func(O) E) Heap[O] {
if initial == nil {
initial = make([]O, 0)
}
ha := &objectHeapArray[O, E]{
objects: initial,
getOrderFunc: getOrderFunc,
}
heap.Init(ha)
return &heapImpl[O, *objectHeapArray[O, E]]{
inner: ha,
}
}
// heapImpl is a min-heap of E.
type heapImpl[E any, H HeapInterface] struct {
inner H

View File

@ -39,3 +39,53 @@ func TestMaximumHeap(t *testing.T) {
assert.Equal(t, i, heap.Pop())
}
}
type FooHeapObject struct {
value int
}
func GetFooHeapObjectOrderFunc(obj *FooHeapObject) int {
return obj.value
}
func TestMinimumObjectHeap(t *testing.T) {
h := []*FooHeapObject{
{value: 4},
{value: 5},
{value: 2},
}
heap := NewObjectArrayBasedMinimumHeap(h, GetFooHeapObjectOrderFunc)
assert.Equal(t, 2, heap.Peek().value)
assert.Equal(t, 3, heap.Len())
heap.Push(&FooHeapObject{value: 3})
assert.Equal(t, 2, heap.Peek().value)
assert.Equal(t, 4, heap.Len())
heap.Push(&FooHeapObject{value: 1})
assert.Equal(t, 1, heap.Peek().value)
assert.Equal(t, 5, heap.Len())
for i := 1; i <= 5; i++ {
assert.Equal(t, i, heap.Peek().value)
assert.Equal(t, i, heap.Pop().value)
}
}
func TestMaximumObjectHeap(t *testing.T) {
h := []*FooHeapObject{
{value: 4},
{value: 1},
{value: 2},
}
heap := NewObjectArrayBasedMaximumHeap(h, GetFooHeapObjectOrderFunc)
assert.Equal(t, 4, heap.Peek().value)
assert.Equal(t, 3, heap.Len())
heap.Push(&FooHeapObject{value: 3})
assert.Equal(t, 4, heap.Peek().value)
assert.Equal(t, 4, heap.Len())
heap.Push(&FooHeapObject{value: 5})
assert.Equal(t, 5, heap.Peek().value)
assert.Equal(t, 5, heap.Len())
for i := 5; i >= 1; i-- {
assert.Equal(t, i, heap.Peek().value)
assert.Equal(t, i, heap.Pop().value)
}
}