update collection target after observer start (#27774)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/27812/head
wei liu 2023-10-19 21:52:10 +08:00 committed by GitHub
parent 9dd369dd99
commit 55e5f80e24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 19 deletions

View File

@ -41,6 +41,8 @@ type targetUpdateRequest struct {
ReadyNotifier chan struct{}
}
type initRequest struct{}
type TargetObserver struct {
cancel context.CancelFunc
wg sync.WaitGroup
@ -49,6 +51,7 @@ type TargetObserver struct {
distMgr *meta.DistributionManager
broker meta.Broker
initChan chan initRequest
manualCheck chan checkRequest
nextTargetLastUpdate map[int64]time.Time
updateChan chan targetUpdateRequest
@ -68,6 +71,7 @@ func NewTargetObserver(meta *meta.Meta, targetMgr *meta.TargetManager, distMgr *
nextTargetLastUpdate: make(map[int64]time.Time),
updateChan: make(chan targetUpdateRequest),
readyNotifiers: make(map[int64][]chan struct{}),
initChan: make(chan initRequest),
}
}
@ -77,6 +81,9 @@ func (ob *TargetObserver) Start() {
ob.wg.Add(1)
go ob.schedule(ctx)
// after target observer start, update target for all collection
ob.initChan <- initRequest{}
}
func (ob *TargetObserver) Stop() {
@ -100,9 +107,16 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
log.Info("Close target observer")
return
case <-ob.initChan:
for _, collectionID := range ob.meta.GetAll() {
ob.init(collectionID)
}
case <-ticker.C:
ob.clean()
ob.tryUpdateTarget()
for _, collectionID := range ob.meta.GetAll() {
ob.check(collectionID)
}
case req := <-ob.manualCheck:
ob.check(req.CollectionID)
@ -119,11 +133,6 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
}
req.Notifier <- err
// Manually trigger the observer,
// to avoid waiting for a long time (10s)
ob.clean()
ob.tryUpdateTarget()
}
}
}
@ -166,6 +175,18 @@ func (ob *TargetObserver) check(collectionID int64) {
}
}
func (ob *TargetObserver) init(collectionID int64) {
// pull next target first if not exist
if !ob.targetMgr.IsNextTargetExist(collectionID) {
ob.updateNextTarget(collectionID)
}
// try to update current target if all segment/channel are ready
if ob.shouldUpdateCurrentTarget(collectionID) {
ob.updateCurrentTarget(collectionID)
}
}
// UpdateNextTarget updates the next target,
// returns a channel which will be closed when the next target is ready,
// or returns error if failed to pull target
@ -191,28 +212,19 @@ func (ob *TargetObserver) ReleaseCollection(collectionID int64) {
delete(ob.readyNotifiers, collectionID)
}
func (ob *TargetObserver) tryUpdateTarget() {
collections := ob.meta.GetAll()
for _, collectionID := range collections {
ob.check(collectionID)
}
collectionSet := typeutil.NewUniqueSet(collections...)
func (ob *TargetObserver) clean() {
collectionSet := typeutil.NewUniqueSet(ob.meta.GetAll()...)
// for collection which has been removed from target, try to clear nextTargetLastUpdate
for collection := range ob.nextTargetLastUpdate {
if !collectionSet.Contain(collection) {
delete(ob.nextTargetLastUpdate, collection)
}
}
}
func (ob *TargetObserver) clean() {
collections := typeutil.NewSet(ob.meta.GetAll()...)
ob.mut.Lock()
defer ob.mut.Unlock()
for collectionID, notifiers := range ob.readyNotifiers {
if !collections.Contain(collectionID) {
if !collectionSet.Contain(collectionID) {
for i := range notifiers {
close(notifiers[i])
}

View File

@ -83,7 +83,6 @@ func (suite *TargetObserverSuite) SetupTest() {
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
suite.distMgr = meta.NewDistributionManager()
suite.observer = NewTargetObserver(suite.meta, suite.targetMgr, suite.distMgr, suite.broker)
suite.observer.Start()
suite.collectionID = int64(1000)
suite.partitionID = int64(100)
@ -122,6 +121,7 @@ func (suite *TargetObserverSuite) SetupTest() {
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(suite.nextTargetChannels, suite.nextTargetSegments, nil)
suite.observer.Start()
}
func (suite *TargetObserverSuite) TestTriggerUpdateTarget() {