fix: Dynamic release partition may fail search/query. (#37049) (#37099)

issue: #33550
pr: #37049
cause: wrong impl of UpdateCollectionNextTarget. If ReleaseCollection and
UpdateCollectionNextTarget happen at the same time, the released
partition's segment list may be added to the target again, and the delegator
will be marked as unserviceable due to the lack of segments.

This PR fixes the impl of UpdateCollectionNextTarget.
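For illustration only, a minimal Go sketch of the race with made-up types (targetManager, seg, pull); the real QueryCoord types differ. The old flow snapshotted the partition list in one critical section, dropped the lock during the recovery-info RPC, and installed the target in a second critical section, so a release that ran in between was overwritten. The fixed flow pulls first, then re-reads the partition list and installs the filtered target under a single lock:

// race_sketch.go -- hedged illustration with hypothetical types; not Milvus code.
package racesketch

import "sync"

type seg struct{ ID, PartitionID int64 }

type targetManager struct {
	mu         sync.Mutex
	partitions map[int64][]int64 // collectionID -> loaded partition IDs
	next       map[int64][]seg   // collectionID -> next-target segment list
}

// Old shape: the partition list is snapshotted in one critical section, but
// the target is installed in a later one; a release in between is lost.
func (m *targetManager) updateNextTargetRacy(collID int64, pull func(parts []int64) []seg) {
	m.mu.Lock()
	parts := m.partitions[collID] // snapshot taken here...
	m.mu.Unlock()

	target := pull(parts) // ...a release may run while the RPC is in flight...

	m.mu.Lock()
	m.next[collID] = target // ...and segments of released partitions land here.
	m.mu.Unlock()
}

// Fixed shape: pull with no lock held, then re-read the partition list and
// filter + install the target inside a single critical section.
func (m *targetManager) updateNextTargetFixed(collID int64, pull func() []seg) {
	pulled := pull()

	m.mu.Lock()
	defer m.mu.Unlock()
	loaded := make(map[int64]bool, len(m.partitions[collID]))
	for _, p := range m.partitions[collID] { // partition list re-read under the lock
		loaded[p] = true
	}
	target := make([]seg, 0, len(pulled))
	for _, s := range pulled {
		if loaded[s.PartitionID] { // drop segments of released partitions
			target = append(target, s)
		}
	}
	m.next[collID] = target
}

The rewritten UpdateCollectionNextTarget in the diff below follows the fixed shape: GetRecoveryInfoV2 runs before the lock, and the partition filter (partitionSet.Contain) runs under the same lock that writes mgr.next.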

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/37132/head
wei liu 2024-10-24 18:01:30 +08:00 committed by GitHub
parent 3db137f4ad
commit 59b2563029
3 changed files with 40 additions and 305 deletions


@@ -606,164 +606,6 @@ func (_c *MockTargetManager_IsNextTargetExist_Call) RunAndReturn(run func(int64)
 	return _c
 }
 
-// PullNextTargetV1 provides a mock function with given fields: broker, collectionID, chosenPartitionIDs
-func (_m *MockTargetManager) PullNextTargetV1(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error) {
-	_va := make([]interface{}, len(chosenPartitionIDs))
-	for _i := range chosenPartitionIDs {
-		_va[_i] = chosenPartitionIDs[_i]
-	}
-	var _ca []interface{}
-	_ca = append(_ca, broker, collectionID)
-	_ca = append(_ca, _va...)
-	ret := _m.Called(_ca...)
-
-	var r0 map[int64]*datapb.SegmentInfo
-	var r1 map[string]*DmChannel
-	var r2 error
-	if rf, ok := ret.Get(0).(func(Broker, int64, ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)); ok {
-		return rf(broker, collectionID, chosenPartitionIDs...)
-	}
-	if rf, ok := ret.Get(0).(func(Broker, int64, ...int64) map[int64]*datapb.SegmentInfo); ok {
-		r0 = rf(broker, collectionID, chosenPartitionIDs...)
-	} else {
-		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(map[int64]*datapb.SegmentInfo)
-		}
-	}
-	if rf, ok := ret.Get(1).(func(Broker, int64, ...int64) map[string]*DmChannel); ok {
-		r1 = rf(broker, collectionID, chosenPartitionIDs...)
-	} else {
-		if ret.Get(1) != nil {
-			r1 = ret.Get(1).(map[string]*DmChannel)
-		}
-	}
-	if rf, ok := ret.Get(2).(func(Broker, int64, ...int64) error); ok {
-		r2 = rf(broker, collectionID, chosenPartitionIDs...)
-	} else {
-		r2 = ret.Error(2)
-	}
-
-	return r0, r1, r2
-}
-
-// MockTargetManager_PullNextTargetV1_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PullNextTargetV1'
-type MockTargetManager_PullNextTargetV1_Call struct {
-	*mock.Call
-}
-
-// PullNextTargetV1 is a helper method to define mock.On call
-//   - broker Broker
-//   - collectionID int64
-//   - chosenPartitionIDs ...int64
-func (_e *MockTargetManager_Expecter) PullNextTargetV1(broker interface{}, collectionID interface{}, chosenPartitionIDs ...interface{}) *MockTargetManager_PullNextTargetV1_Call {
-	return &MockTargetManager_PullNextTargetV1_Call{Call: _e.mock.On("PullNextTargetV1",
-		append([]interface{}{broker, collectionID}, chosenPartitionIDs...)...)}
-}
-
-func (_c *MockTargetManager_PullNextTargetV1_Call) Run(run func(broker Broker, collectionID int64, chosenPartitionIDs ...int64)) *MockTargetManager_PullNextTargetV1_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		variadicArgs := make([]int64, len(args)-2)
-		for i, a := range args[2:] {
-			if a != nil {
-				variadicArgs[i] = a.(int64)
-			}
-		}
-		run(args[0].(Broker), args[1].(int64), variadicArgs...)
-	})
-	return _c
-}
-
-func (_c *MockTargetManager_PullNextTargetV1_Call) Return(_a0 map[int64]*datapb.SegmentInfo, _a1 map[string]*DmChannel, _a2 error) *MockTargetManager_PullNextTargetV1_Call {
-	_c.Call.Return(_a0, _a1, _a2)
-	return _c
-}
-
-func (_c *MockTargetManager_PullNextTargetV1_Call) RunAndReturn(run func(Broker, int64, ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)) *MockTargetManager_PullNextTargetV1_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
-// PullNextTargetV2 provides a mock function with given fields: broker, collectionID, chosenPartitionIDs
-func (_m *MockTargetManager) PullNextTargetV2(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error) {
-	_va := make([]interface{}, len(chosenPartitionIDs))
-	for _i := range chosenPartitionIDs {
-		_va[_i] = chosenPartitionIDs[_i]
-	}
-	var _ca []interface{}
-	_ca = append(_ca, broker, collectionID)
-	_ca = append(_ca, _va...)
-	ret := _m.Called(_ca...)
-
-	var r0 map[int64]*datapb.SegmentInfo
-	var r1 map[string]*DmChannel
-	var r2 error
-	if rf, ok := ret.Get(0).(func(Broker, int64, ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)); ok {
-		return rf(broker, collectionID, chosenPartitionIDs...)
-	}
-	if rf, ok := ret.Get(0).(func(Broker, int64, ...int64) map[int64]*datapb.SegmentInfo); ok {
-		r0 = rf(broker, collectionID, chosenPartitionIDs...)
-	} else {
-		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(map[int64]*datapb.SegmentInfo)
-		}
-	}
-	if rf, ok := ret.Get(1).(func(Broker, int64, ...int64) map[string]*DmChannel); ok {
-		r1 = rf(broker, collectionID, chosenPartitionIDs...)
-	} else {
-		if ret.Get(1) != nil {
-			r1 = ret.Get(1).(map[string]*DmChannel)
-		}
-	}
-	if rf, ok := ret.Get(2).(func(Broker, int64, ...int64) error); ok {
-		r2 = rf(broker, collectionID, chosenPartitionIDs...)
-	} else {
-		r2 = ret.Error(2)
-	}
-
-	return r0, r1, r2
-}
-
-// MockTargetManager_PullNextTargetV2_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PullNextTargetV2'
-type MockTargetManager_PullNextTargetV2_Call struct {
-	*mock.Call
-}
-
-// PullNextTargetV2 is a helper method to define mock.On call
-//   - broker Broker
-//   - collectionID int64
-//   - chosenPartitionIDs ...int64
-func (_e *MockTargetManager_Expecter) PullNextTargetV2(broker interface{}, collectionID interface{}, chosenPartitionIDs ...interface{}) *MockTargetManager_PullNextTargetV2_Call {
-	return &MockTargetManager_PullNextTargetV2_Call{Call: _e.mock.On("PullNextTargetV2",
-		append([]interface{}{broker, collectionID}, chosenPartitionIDs...)...)}
-}
-
-func (_c *MockTargetManager_PullNextTargetV2_Call) Run(run func(broker Broker, collectionID int64, chosenPartitionIDs ...int64)) *MockTargetManager_PullNextTargetV2_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		variadicArgs := make([]int64, len(args)-2)
-		for i, a := range args[2:] {
-			if a != nil {
-				variadicArgs[i] = a.(int64)
-			}
-		}
-		run(args[0].(Broker), args[1].(int64), variadicArgs...)
-	})
-	return _c
-}
-
-func (_c *MockTargetManager_PullNextTargetV2_Call) Return(_a0 map[int64]*datapb.SegmentInfo, _a1 map[string]*DmChannel, _a2 error) *MockTargetManager_PullNextTargetV2_Call {
-	_c.Call.Return(_a0, _a1, _a2)
-	return _c
-}
-
-func (_c *MockTargetManager_PullNextTargetV2_Call) RunAndReturn(run func(Broker, int64, ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)) *MockTargetManager_PullNextTargetV2_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
 // Recover provides a mock function with given fields: catalog
 func (_m *MockTargetManager) Recover(catalog metastore.QueryCoordCatalog) error {
 	ret := _m.Called(catalog)
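Context for the deletion above: these helpers follow mockery's expecter pattern, and a test would have stubbed the removed methods roughly as in the hedged sketch below. It assumes the mockery-generated NewMockTargetManager constructor and EXPECT() accessor, which this hunk does not show, and the datapb import path is likewise an assumption.

// mock_usage_sketch_test.go -- hedged example, not taken from the PR.
package meta

import (
	"testing"

	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"

	"github.com/milvus-io/milvus/internal/proto/datapb"
)

func TestPullNextTargetV2Stub(t *testing.T) {
	tm := NewMockTargetManager(t)
	// Variadic partition IDs are passed as individual interface{} arguments,
	// matching the append([]interface{}{broker, collectionID}, ...) call
	// inside the generated expecter above.
	tm.EXPECT().
		PullNextTargetV2(mock.Anything, int64(100), int64(1), int64(2)).
		Return(map[int64]*datapb.SegmentInfo{}, map[string]*DmChannel{}, nil)

	segments, channels, err := tm.PullNextTargetV2(nil, 100, 1, 2)
	require.NoError(t, err)
	require.Empty(t, segments)
	require.Empty(t, channels)
}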


@@ -22,7 +22,6 @@ import (
 	"runtime"
 	"sync"
 
-	"github.com/cockroachdb/errors"
 	"github.com/samber/lo"
 	"go.uber.org/zap"
@@ -34,7 +33,6 @@ import (
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/util/conc"
-	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/retry"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
@@ -53,8 +51,6 @@ const (
 type TargetManagerInterface interface {
 	UpdateCollectionCurrentTarget(collectionID int64) bool
 	UpdateCollectionNextTarget(collectionID int64) error
-	PullNextTargetV1(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)
-	PullNextTargetV2(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)
 	RemoveCollection(collectionID int64)
 	RemovePartition(collectionID int64, partitionIDs ...int64)
 	GetGrowingSegmentsByCollection(collectionID int64, scope TargetScope) typeutil.UniqueSet
@@ -140,150 +136,70 @@ func (mgr *TargetManager) UpdateCollectionCurrentTarget(collectionID int64) bool
 // WARN: DO NOT call this method for an existing collection as target observer running, or it will lead to a double-update,
 // which may make the current target not available
 func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error {
-	mgr.rwMutex.Lock()
-	partitions := mgr.meta.GetPartitionsByCollection(collectionID)
-	partitionIDs := lo.Map(partitions, func(partition *Partition, i int) int64 {
-		return partition.PartitionID
-	})
-	allocatedTarget := NewCollectionTarget(nil, nil, partitionIDs)
-	mgr.rwMutex.Unlock()
-
-	log := log.With(zap.Int64("collectionID", collectionID),
-		zap.Int64s("PartitionIDs", partitionIDs))
-	segments, channels, err := mgr.PullNextTargetV2(mgr.broker, collectionID, partitionIDs...)
-	if err != nil {
-		log.Warn("failed to get next targets for collection", zap.Error(err))
-		return err
-	}
-
-	if len(segments) == 0 && len(channels) == 0 {
-		log.Debug("skip empty next targets for collection")
-		return nil
-	}
-	allocatedTarget.segments = segments
-	allocatedTarget.dmChannels = channels
-
-	mgr.rwMutex.Lock()
-	defer mgr.rwMutex.Unlock()
-	mgr.next.updateCollectionTarget(collectionID, allocatedTarget)
-	log.Debug("finish to update next targets for collection",
-		zap.Int64s("segments", allocatedTarget.GetAllSegmentIDs()),
-		zap.Strings("channels", allocatedTarget.GetAllDmChannelNames()))
-	return nil
-}
-
-func (mgr *TargetManager) PullNextTargetV1(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error) {
-	if len(chosenPartitionIDs) == 0 {
-		return nil, nil, nil
-	}
-
-	channelInfos := make(map[string][]*datapb.VchannelInfo)
-	segments := make(map[int64]*datapb.SegmentInfo, 0)
-	dmChannels := make(map[string]*DmChannel)
-
-	fullPartitions, err := broker.GetPartitions(context.Background(), collectionID)
-	if err != nil {
-		return nil, nil, err
-	}
-
-	// we should pull `channel targets` from all partitions because QueryNodes need to load
-	// the complete growing segments. And we should pull `segments targets` only from the chosen partitions.
-	for _, partitionID := range fullPartitions {
-		log.Debug("get recovery info...",
-			zap.Int64("collectionID", collectionID),
-			zap.Int64("partitionID", partitionID))
-		vChannelInfos, binlogs, err := broker.GetRecoveryInfo(context.TODO(), collectionID, partitionID)
-		if err != nil {
-			return nil, nil, err
-		}
-
-		for _, info := range vChannelInfos {
-			channelInfos[info.GetChannelName()] = append(channelInfos[info.GetChannelName()], info)
-		}
-
-		if !lo.Contains(chosenPartitionIDs, partitionID) {
-			continue
-		}
-		for _, binlog := range binlogs {
-			segments[binlog.GetSegmentID()] = &datapb.SegmentInfo{
-				ID:            binlog.GetSegmentID(),
-				CollectionID:  collectionID,
-				PartitionID:   partitionID,
-				InsertChannel: binlog.GetInsertChannel(),
-				NumOfRows:     binlog.GetNumOfRows(),
-				Binlogs:       binlog.GetFieldBinlogs(),
-				Statslogs:     binlog.GetStatslogs(),
-				Deltalogs:     binlog.GetDeltalogs(),
-			}
-		}
-	}
-
-	for _, infos := range channelInfos {
-		merged := mgr.mergeDmChannelInfo(infos)
-		dmChannels[merged.GetChannelName()] = merged
-	}
-
-	return segments, dmChannels, nil
-}
-
-func (mgr *TargetManager) PullNextTargetV2(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error) {
-	log.Debug("start to pull next targets for collection",
-		zap.Int64("collectionID", collectionID),
-		zap.Int64s("chosenPartitionIDs", chosenPartitionIDs))
-
-	if len(chosenPartitionIDs) == 0 {
-		return nil, nil, nil
-	}
-
-	channelInfos := make(map[string][]*datapb.VchannelInfo)
-	segments := make(map[int64]*datapb.SegmentInfo, 0)
-	dmChannels := make(map[string]*DmChannel)
-
-	getRecoveryInfo := func() error {
-		var err error
-		vChannelInfos, segmentInfos, err := broker.GetRecoveryInfoV2(context.TODO(), collectionID)
-		if err != nil {
-			// if meet rpc error, for compatibility with previous versions, try pull next target v1
-			if errors.Is(err, merr.ErrServiceUnimplemented) {
-				segments, dmChannels, err = mgr.PullNextTargetV1(broker, collectionID, chosenPartitionIDs...)
-				return err
-			}
-			return err
-		}
-
-		for _, info := range vChannelInfos {
-			channelInfos[info.GetChannelName()] = append(channelInfos[info.GetChannelName()], info)
-			for _, segmentID := range info.GetLevelZeroSegmentIds() {
-				segments[segmentID] = &datapb.SegmentInfo{
-					ID:            segmentID,
-					CollectionID:  collectionID,
-					InsertChannel: info.GetChannelName(),
-					State:         commonpb.SegmentState_Flushed,
-					Level:         datapb.SegmentLevel_L0,
-				}
-			}
-		}
-
-		partitionSet := typeutil.NewUniqueSet(chosenPartitionIDs...)
-		for _, segmentInfo := range segmentInfos {
-			if partitionSet.Contain(segmentInfo.GetPartitionID()) || segmentInfo.GetPartitionID() == common.AllPartitionsID {
-				segments[segmentInfo.GetID()] = segmentInfo
-			}
-		}
-
-		for _, infos := range channelInfos {
-			merged := mgr.mergeDmChannelInfo(infos)
-			dmChannels[merged.GetChannelName()] = merged
-		}
-		return nil
-	}
-
-	err := retry.Do(context.TODO(), getRecoveryInfo, retry.Attempts(10))
-	if err != nil {
-		return nil, nil, err
-	}
-
-	return segments, dmChannels, nil
-}
+	var vChannelInfos []*datapb.VchannelInfo
+	var segmentInfos []*datapb.SegmentInfo
+	err := retry.Handle(context.TODO(), func() (bool, error) {
+		var err error
+		vChannelInfos, segmentInfos, err = mgr.broker.GetRecoveryInfoV2(context.TODO(), collectionID)
+		if err != nil {
+			return true, err
+		}
+		return false, nil
+	}, retry.Attempts(10))
+	if err != nil {
+		log.Warn("failed to get next targets for collection",
+			zap.Int64("collectionID", collectionID), zap.Error(err))
+		return err
+	}
+
+	mgr.rwMutex.Lock()
+	defer mgr.rwMutex.Unlock()
+	partitions := mgr.meta.GetPartitionsByCollection(collectionID)
+	partitionIDs := lo.Map(partitions, func(partition *Partition, i int) int64 {
+		return partition.PartitionID
+	})
+
+	channelInfos := make(map[string][]*datapb.VchannelInfo)
+	segments := make(map[int64]*datapb.SegmentInfo, 0)
+	dmChannels := make(map[string]*DmChannel)
+	for _, info := range vChannelInfos {
+		channelInfos[info.GetChannelName()] = append(channelInfos[info.GetChannelName()], info)
+		for _, segmentID := range info.GetLevelZeroSegmentIds() {
+			segments[segmentID] = &datapb.SegmentInfo{
+				ID:            segmentID,
+				CollectionID:  collectionID,
+				InsertChannel: info.GetChannelName(),
+				State:         commonpb.SegmentState_Flushed,
+				Level:         datapb.SegmentLevel_L0,
+			}
+		}
+	}
+
+	partitionSet := typeutil.NewUniqueSet(partitionIDs...)
+	for _, segmentInfo := range segmentInfos {
+		if partitionSet.Contain(segmentInfo.GetPartitionID()) || segmentInfo.GetPartitionID() == common.AllPartitionsID {
+			segments[segmentInfo.GetID()] = segmentInfo
+		}
+	}
+
+	for _, infos := range channelInfos {
+		merged := mgr.mergeDmChannelInfo(infos)
+		dmChannels[merged.GetChannelName()] = merged
+	}
+
+	if len(segments) == 0 && len(dmChannels) == 0 {
+		log.Debug("skip empty next targets for collection",
+			zap.Int64("collectionID", collectionID), zap.Int64s("PartitionIDs", partitionIDs))
+		return nil
+	}
+
+	mgr.next.updateCollectionTarget(collectionID, NewCollectionTarget(segments, dmChannels, partitionIDs))
+	log.Debug("finish to update next targets for collection",
+		zap.Int64("collectionID", collectionID),
+		zap.Int64s("PartitionIDs", partitionIDs))
+	return nil
+}
 
 func (mgr *TargetManager) mergeDmChannelInfo(infos []*datapb.VchannelInfo) *DmChannel {
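A note on retry.Handle as used in the new code: judging from the call site above, the closure returns (retryable, error) and the helper re-invokes it up to retry.Attempts times. Below is a minimal stand-in with those semantics, inferred from the call site rather than copied from pkg/util/retry, which also supports backoff options:

// retry_sketch.go -- hedged stand-in for a Handle-style retry helper.
package retrysketch

import (
	"context"
	"time"
)

func handle(ctx context.Context, fn func() (retryable bool, err error), maxAttempts int) error {
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		retryable, err := fn()
		if err == nil {
			return nil // success, stop retrying
		}
		lastErr = err
		if !retryable {
			return err // the caller marked this error as final
		}
		select {
		case <-ctx.Done():
			return ctx.Err() // give up if the context is cancelled
		case <-time.After(100 * time.Millisecond): // fixed pause; the real helper backs off
		}
	}
	return lastErr
}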


@@ -24,8 +24,6 @@ import (
 	"github.com/samber/lo"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
 
 	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
 	"github.com/milvus-io/milvus/internal/metastore"
@@ -36,7 +34,6 @@ import (
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
 	"github.com/milvus-io/milvus/pkg/kv"
 	"github.com/milvus-io/milvus/pkg/util/etcd"
-	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -224,17 +221,6 @@ func (suite *TargetManagerSuite) TestUpdateNextTarget() {
 		},
 	}
 
-	nextTargetBinlogs := []*datapb.SegmentBinlogs{
-		{
-			SegmentID:     11,
-			InsertChannel: "channel-1",
-		},
-		{
-			SegmentID:     12,
-			InsertChannel: "channel-2",
-		},
-	}
-
 	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(nextTargetChannels, nextTargetSegments, nil)
 	suite.mgr.UpdateCollectionNextTarget(collectionID)
 	suite.assertSegments([]int64{11, 12}, suite.mgr.GetSealedSegmentsByCollection(collectionID, NextTarget))
@@ -242,20 +228,11 @@ func (suite *TargetManagerSuite) TestUpdateNextTarget() {
 	suite.assertSegments([]int64{}, suite.mgr.GetSealedSegmentsByCollection(collectionID, CurrentTarget))
 	suite.assertChannels([]string{}, suite.mgr.GetDmChannelsByCollection(collectionID, CurrentTarget))
 
 	suite.broker.ExpectedCalls = nil
-	// test getRecoveryInfoV2 failed , then back to getRecoveryInfo succeed
-	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(
-		nil, nil, merr.WrapErrServiceUnimplemented(status.Errorf(codes.Unimplemented, "fake not found")))
-	suite.broker.EXPECT().GetPartitions(mock.Anything, mock.Anything).Return([]int64{1}, nil)
-	suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, collectionID, int64(1)).Return(nextTargetChannels, nextTargetBinlogs, nil)
-	err := suite.mgr.UpdateCollectionNextTarget(collectionID)
-	suite.NoError(err)
-
-	suite.broker.ExpectedCalls = nil
 	// test getRecoveryInfoV2 failed , then retry getRecoveryInfoV2 succeed
 	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(nil, nil, errors.New("fake error")).Times(1)
 	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(nextTargetChannels, nextTargetSegments, nil)
-	err = suite.mgr.UpdateCollectionNextTarget(collectionID)
+	err := suite.mgr.UpdateCollectionNextTarget(collectionID)
 	suite.NoError(err)
 	err = suite.mgr.UpdateCollectionNextTarget(collectionID)
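The retained retry test stacks two expectations for the same method: testify/mock consumes the `.Times(1)` expectation first, then falls through to the open-ended one, which models fail-once-then-succeed. A generic, self-contained illustration of that pattern (hypothetical Fetcher interface, not Milvus code):

// retry_pattern_sketch_test.go -- hedged illustration of the fail-then-succeed
// expectation pattern used in the test above.
package retrytest

import (
	"errors"
	"testing"

	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"
)

// Fetcher is a made-up dependency, standing in for the broker.
type Fetcher interface {
	Fetch() (string, error)
}

type mockFetcher struct{ mock.Mock }

func (m *mockFetcher) Fetch() (string, error) {
	args := m.Called()
	return args.String(0), args.Error(1)
}

func TestFailThenSucceed(t *testing.T) {
	f := new(mockFetcher)
	// The first expectation is exhausted after one call; the second then
	// satisfies the retry, mirroring the two GetRecoveryInfoV2 expectations.
	f.On("Fetch").Return("", errors.New("fake error")).Times(1)
	f.On("Fetch").Return("ok", nil)

	_, err := f.Fetch()
	require.Error(t, err) // first attempt fails
	out, err := f.Fetch()
	require.NoError(t, err) // retry succeeds
	require.Equal(t, "ok", out)
}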