2023-10-30 18:30:16 +00:00
|
|
|
package syncmgr
|
|
|
|
|
|
|
|
import (
|
2023-11-23 09:26:24 +00:00
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
2023-10-30 18:30:16 +00:00
|
|
|
"github.com/milvus-io/milvus/pkg/util/conc"
|
|
|
|
"github.com/milvus-io/milvus/pkg/util/lock"
|
|
|
|
)
|
|
|
|
|
2024-06-23 13:12:01 +00:00
|
|
|
//go:generate mockery --name=Task --structname=MockTask --output=./ --filename=mock_task.go --with-expecter --inpackage
|
2023-10-30 18:30:16 +00:00
|
|
|
type Task interface {
|
2023-11-23 09:26:24 +00:00
|
|
|
SegmentID() int64
|
|
|
|
Checkpoint() *msgpb.MsgPosition
|
|
|
|
StartPosition() *msgpb.MsgPosition
|
|
|
|
ChannelName() string
|
2023-10-30 18:30:16 +00:00
|
|
|
Run() error
|
2024-05-09 02:09:30 +00:00
|
|
|
HandleError(error)
|
2023-10-30 18:30:16 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type keyLockDispatcher[K comparable] struct {
|
|
|
|
keyLock *lock.KeyLock[K]
|
2024-05-09 02:09:30 +00:00
|
|
|
workerPool *conc.Pool[struct{}]
|
2023-10-30 18:30:16 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func newKeyLockDispatcher[K comparable](maxParallel int) *keyLockDispatcher[K] {
|
2023-12-15 01:58:43 +00:00
|
|
|
dispatcher := &keyLockDispatcher[K]{
|
2024-05-09 02:09:30 +00:00
|
|
|
workerPool: conc.NewPool[struct{}](maxParallel, conc.WithPreAlloc(false)),
|
2023-10-30 18:30:16 +00:00
|
|
|
keyLock: lock.NewKeyLock[K](),
|
|
|
|
}
|
2023-12-15 01:58:43 +00:00
|
|
|
return dispatcher
|
2023-10-30 18:30:16 +00:00
|
|
|
}
|
|
|
|
|
2024-05-09 02:09:30 +00:00
|
|
|
func (d *keyLockDispatcher[K]) Submit(key K, t Task, callbacks ...func(error) error) *conc.Future[struct{}] {
|
2023-10-30 18:30:16 +00:00
|
|
|
d.keyLock.Lock(key)
|
|
|
|
|
2024-05-09 02:09:30 +00:00
|
|
|
return d.workerPool.Submit(func() (struct{}, error) {
|
2023-11-15 07:24:18 +00:00
|
|
|
defer d.keyLock.Unlock(key)
|
2023-10-30 18:30:16 +00:00
|
|
|
err := t.Run()
|
2023-11-15 07:24:18 +00:00
|
|
|
|
|
|
|
for _, callback := range callbacks {
|
2024-05-09 02:09:30 +00:00
|
|
|
err = callback(err)
|
2023-11-15 07:24:18 +00:00
|
|
|
}
|
2023-12-22 06:20:43 +00:00
|
|
|
|
2024-05-09 02:09:30 +00:00
|
|
|
return struct{}{}, err
|
2023-10-30 18:30:16 +00:00
|
|
|
})
|
|
|
|
}
|