fix: Return error when startup Delete/AddNode fail (#33193) (#33258)

See also: #33151, #33149
pr: #33193

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/33271/head
XuanYang-cn 2024-05-22 14:49:40 +08:00 committed by GitHub
parent 4b8680894f
commit e5ca1f7c45
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 84 additions and 9 deletions

View File

@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -130,16 +131,19 @@ func (m *ChannelManagerImplV2) Startup(ctx context.Context, legacyNodes, allNode
oNodes := m.store.GetNodes()
m.mu.Unlock()
// Add new online nodes to the cluster.
offLines, newOnLines := lo.Difference(oNodes, allNodes)
lo.ForEach(newOnLines, func(nodeID int64, _ int) {
m.AddNode(nodeID)
})
// Delete offlines from the cluster
lo.ForEach(offLines, func(nodeID int64, _ int) {
m.DeleteNode(nodeID)
})
for _, nodeID := range offLines {
if err := m.DeleteNode(nodeID); err != nil {
return err
}
}
// Add new online nodes to the cluster.
for _, nodeID := range newOnLines {
if err := m.AddNode(nodeID); err != nil {
return err
}
}
m.mu.Lock()
nodeChannels := m.store.GetNodeChannelsBy(
@ -653,7 +657,10 @@ func (m *ChannelManagerImplV2) Check(ctx context.Context, nodeID int64, info *da
)
resp, err := m.subCluster.CheckChannelOperationProgress(ctx, nodeID, info)
if err != nil {
log.Warn("Fail to check channel operation progress")
log.Warn("Fail to check channel operation progress", zap.Error(err))
if errors.Is(err, merr.ErrNodeNotFound) {
return false, true
}
return false, false
}
log.Info("Got channel operation progress",

View File

@ -21,6 +21,7 @@ import (
"fmt"
"testing"
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
@ -31,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/kv/predicates"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -446,6 +448,29 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
s.checkAssignment(m, 1, "ch1", Watching)
s.checkAssignment(m, 1, "ch2", Watching)
})
s.Run("advance watching channels check ErrNodeNotFound", func() {
chNodes := map[string]int64{
"ch1": 1,
"ch2": 1,
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToWatch)
s.checkAssignment(m, 1, "ch2", ToWatch)
m.AdvanceChannelState(ctx)
s.checkAssignment(m, 1, "ch1", Watching)
s.checkAssignment(m, 1, "ch2", Watching)
s.mockCluster.EXPECT().CheckChannelOperationProgress(mock.Anything, mock.Anything, mock.Anything).
Return(nil, merr.WrapErrNodeNotFound(1)).Twice()
m.AdvanceChannelState(ctx)
s.checkAssignment(m, 1, "ch1", Standby)
s.checkAssignment(m, 1, "ch2", Standby)
})
s.Run("advance watching channels check watch success", func() {
chNodes := map[string]int64{
"ch1": 1,
@ -517,6 +542,28 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
s.checkAssignment(m, 1, "ch1", Releasing)
s.checkAssignment(m, 1, "ch2", Releasing)
})
s.Run("advance releasing channels check ErrNodeNotFound", func() {
chNodes := map[string]int64{
"ch1": 1,
"ch2": 1,
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToRelease)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToRelease)
s.checkAssignment(m, 1, "ch2", ToRelease)
m.AdvanceChannelState(ctx)
s.checkAssignment(m, 1, "ch1", Releasing)
s.checkAssignment(m, 1, "ch2", Releasing)
s.mockCluster.EXPECT().CheckChannelOperationProgress(mock.Anything, mock.Anything, mock.Anything).
Return(nil, merr.WrapErrNodeNotFound(1)).Twice()
m.AdvanceChannelState(ctx)
s.checkAssignment(m, 1, "ch1", Standby)
s.checkAssignment(m, 1, "ch2", Standby)
})
s.Run("advance releasing channels check release success", func() {
chNodes := map[string]int64{
"ch1": 1,
@ -659,5 +706,26 @@ func (s *ChannelManagerSuite) TestStartup() {
s.checkAssignment(m, 2, "ch3", ToWatch)
}
func (s *ChannelManagerSuite) TestStartupRootCoordFailed() {
chNodes := map[string]int64{
"ch1": 1,
"ch2": 1,
"ch3": 1,
"ch4": bufferID,
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
s.mockAlloc = NewNMockAllocator(s.T())
s.mockAlloc.EXPECT().allocID(mock.Anything).Return(0, errors.New("mock rootcoord failure"))
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
err = m.Startup(context.TODO(), nil, []int64{2})
s.Error(err)
err = m.Startup(context.TODO(), nil, []int64{1, 2})
s.Error(err)
}
func (s *ChannelManagerSuite) TestCheckLoop() {}
func (s *ChannelManagerSuite) TestGet() {}