From 384e7c316912d1e5fc48bbf60457aa39a60fb809 Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 27 Sep 2022 19:22:54 +0800 Subject: [PATCH] Fix force release segment logic (#19471) Signed-off-by: Congqi Xia Signed-off-by: Congqi Xia --- internal/querynode/impl.go | 8 ++- internal/querynode/impl_test.go | 19 +++++ internal/querynode/impl_utils.go | 10 ++- internal/querynode/query_node.go | 2 +- internal/querynode/shard_cluster.go | 88 ++++++++++++++---------- internal/querynode/shard_cluster_test.go | 6 +- 6 files changed, 92 insertions(+), 41 deletions(-) diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go index 239b016b37..fbe4940694 100644 --- a/internal/querynode/impl.go +++ b/internal/querynode/impl.go @@ -1221,6 +1221,8 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get } func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) { + log := log.Ctx(ctx).With(zap.Int64("collectionID", req.GetCollectionID()), zap.String("channel", req.GetChannel())) + log.Debug("SyncDistribution received") shardCluster, ok := node.ShardClusterService.getShardCluster(req.GetChannel()) if !ok { return &commonpb.Status{ @@ -1229,9 +1231,13 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi }, nil } for _, action := range req.GetActions() { + log.Debug("sync action", zap.String("Action", action.GetType().String()), zap.Int64("segmentID", action.SegmentID)) switch action.GetType() { case querypb.SyncType_Remove: - shardCluster.forceRemoveSegment(action.GetSegmentID()) + shardCluster.ReleaseSegments(ctx, &querypb.ReleaseSegmentsRequest{ + SegmentIDs: []UniqueID{action.GetSegmentID()}, + Scope: querypb.DataScope_Historical, + }, true) case querypb.SyncType_Set: shardCluster.SyncSegments([]*querypb.ReplicaSegmentsInfo{ {NodeId: action.GetNodeID(), PartitionId: action.GetPartitionID(), SegmentIds: []int64{action.GetSegmentID()}}, diff --git a/internal/querynode/impl_test.go b/internal/querynode/impl_test.go index aad8067a56..74d18f5de2 100644 --- a/internal/querynode/impl_test.go +++ b/internal/querynode/impl_test.go @@ -835,6 +835,25 @@ func TestSyncDistribution(t *testing.T) { assert.Equal(t, defaultPartitionID, segment.partitionID) assert.Equal(t, segmentStateLoaded, segment.state) + resp, err = node.SyncDistribution(ctx, &querypb.SyncDistributionRequest{ + CollectionID: defaultCollectionID, + Channel: defaultDMLChannel, + Actions: []*querypb.SyncAction{ + { + Type: querypb.SyncType_Remove, + PartitionID: defaultPartitionID, + SegmentID: defaultSegmentID, + NodeID: 99, + }, + }, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) + + cs, ok = node.ShardClusterService.getShardCluster(defaultDMLChannel) + require.True(t, ok) + _, ok = cs.getSegment(defaultSegmentID) + require.False(t, ok) }) } diff --git a/internal/querynode/impl_utils.go b/internal/querynode/impl_utils.go index cce55827ee..dec692e761 100644 --- a/internal/querynode/impl_utils.go +++ b/internal/querynode/impl_utils.go @@ -4,7 +4,9 @@ import ( "context" "github.com/milvus-io/milvus/api/commonpb" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/querypb" + "go.uber.org/zap" ) func (node *QueryNode) TransferLoad(ctx context.Context, req *querypb.LoadSegmentsRequest) (*commonpb.Status, error) { @@ -15,6 +17,7 @@ func (node *QueryNode) TransferLoad(ctx context.Context, req *querypb.LoadSegmen shard := 
req.GetInfos()[0].GetInsertChannel() shardCluster, ok := node.ShardClusterService.getShardCluster(shard) if !ok { + log.Warn("TransferLoad failed to find shard cluster", zap.String("shard", shard)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_NotShardLeader, Reason: "shard cluster not found, the leader may have changed", @@ -22,8 +25,9 @@ func (node *QueryNode) TransferLoad(ctx context.Context, req *querypb.LoadSegmen } req.NeedTransfer = false - err := shardCluster.loadSegments(ctx, req) + err := shardCluster.LoadSegments(ctx, req) if err != nil { + log.Warn("shard cluster failed to load segments", zap.String("shard", shard), zap.Error(err)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), @@ -38,6 +42,7 @@ func (node *QueryNode) TransferLoad(ctx context.Context, req *querypb.LoadSegmen func (node *QueryNode) TransferRelease(ctx context.Context, req *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error) { shardCluster, ok := node.ShardClusterService.getShardCluster(req.GetShard()) if !ok { + log.Warn("TransferLoad failed to find shard cluster", zap.String("shard", req.GetShard())) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_NotShardLeader, Reason: "shard cluster not found, the leader may have changed", @@ -45,8 +50,9 @@ func (node *QueryNode) TransferRelease(ctx context.Context, req *querypb.Release } req.NeedTransfer = false - err := shardCluster.releaseSegments(ctx, req) + err := shardCluster.ReleaseSegments(ctx, req, false) if err != nil { + log.Warn("shard cluster failed to release segments", zap.String("shard", req.GetShard()), zap.Error(err)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index b40cba7120..87f8592cdf 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -387,7 +387,7 @@ func (node *QueryNode) watchChangeInfo() { return } // if watch loop return due to event canceled, the datanode is not functional anymore - log.Panic("querynoe3 is not functional for event canceled", zap.Error(err)) + log.Panic("querynode is not functional for event canceled", zap.Error(err)) return } for _, event := range resp.Events { diff --git a/internal/querynode/shard_cluster.go b/internal/querynode/shard_cluster.go index 4c3213b23f..140070121a 100644 --- a/internal/querynode/shard_cluster.go +++ b/internal/querynode/shard_cluster.go @@ -188,6 +188,7 @@ func NewShardCluster(collectionID int64, replicaID int64, vchannelName string, } func (sc *ShardCluster) Close() { + log := sc.getLogger() log.Info("Close shard cluster") sc.closeOnce.Do(func() { sc.updateShardClusterState(unavailable) @@ -202,6 +203,12 @@ func (sc *ShardCluster) Close() { }) } +func (sc *ShardCluster) getLogger() *zap.Logger { + return log.With(zap.Int64("collectionID", sc.collectionID), + zap.String("channel", sc.vchannelName), + zap.Int64("replicaID", sc.replicaID)) +} + // serviceable returns whether shard cluster could provide query service. 
func (sc *ShardCluster) serviceable() bool { // all segment in loaded state @@ -217,6 +224,7 @@ func (sc *ShardCluster) serviceable() bool { // addNode add a node into cluster func (sc *ShardCluster) addNode(evt nodeEvent) { + log := sc.getLogger() log.Info("ShardCluster add node", zap.Int64("nodeID", evt.nodeID)) sc.mut.Lock() defer sc.mut.Unlock() @@ -243,6 +251,7 @@ func (sc *ShardCluster) addNode(evt nodeEvent) { // removeNode handles node offline and setup related segments func (sc *ShardCluster) removeNode(evt nodeEvent) { + log := sc.getLogger() log.Info("ShardCluster remove node", zap.Int64("nodeID", evt.nodeID)) sc.mut.Lock() defer sc.mut.Unlock() @@ -268,6 +277,7 @@ func (sc *ShardCluster) removeNode(evt nodeEvent) { // updateSegment apply segment change to shard cluster func (sc *ShardCluster) updateSegment(evt shardSegmentInfo) { + log := sc.getLogger() log.Info("ShardCluster update segment", zap.Int64("nodeID", evt.nodeID), zap.Int64("segmentID", evt.segmentID), zap.Int32("state", int32(evt.state))) // notify handoff wait online if any defer func() { @@ -295,7 +305,7 @@ func (sc *ShardCluster) updateSegment(evt shardSegmentInfo) { // SyncSegments synchronize segment distribution in batch func (sc *ShardCluster) SyncSegments(distribution []*querypb.ReplicaSegmentsInfo, state segmentState) { - log := log.With(zap.Int64("collectionID", sc.collectionID), zap.String("vchannel", sc.vchannelName), zap.Int64("replicaID", sc.replicaID)) + log := sc.getLogger() log.Info("ShardCluster sync segments", zap.Any("replica segments", distribution), zap.Int32("state", int32(state))) sc.mut.Lock() @@ -350,6 +360,7 @@ func (sc *ShardCluster) SyncSegments(distribution []*querypb.ReplicaSegmentsInfo // Loading | OK | OK | NodeID check // Loaded | OK | OK | legacy pending func (sc *ShardCluster) transferSegment(old shardSegmentInfo, evt shardSegmentInfo) { + log := sc.getLogger() switch old.state { case segmentStateOffline: // safe to update nodeID and state old.nodeID = evt.nodeID @@ -383,6 +394,7 @@ func (sc *ShardCluster) transferSegment(old shardSegmentInfo, evt shardSegmentIn // removeSegment removes segment from cluster // should only applied in hand-off or load balance procedure func (sc *ShardCluster) removeSegment(evt shardSegmentInfo) { + log := sc.getLogger() log.Info("ShardCluster remove segment", zap.Int64("nodeID", evt.nodeID), zap.Int64("segmentID", evt.segmentID), zap.Int32("state", int32(evt.state))) sc.mut.Lock() @@ -403,15 +415,6 @@ func (sc *ShardCluster) removeSegment(evt shardSegmentInfo) { sc.healthCheck() } -func (sc *ShardCluster) forceRemoveSegment(segmentID int64) { - log := log.With(zap.String("shard", sc.vchannelName), zap.Int64("segmentID", segmentID)) - log.Info("remove segment") - sc.mut.Lock() - defer sc.mut.Unlock() - - delete(sc.segments, segmentID) -} - // init list all nodes and semgent states ant start watching func (sc *ShardCluster) init() { // list nodes @@ -462,14 +465,14 @@ func (sc *ShardCluster) selectNodeInReplica(nodeIDs []int64) (int64, bool) { } func (sc *ShardCluster) updateShardClusterState(state shardClusterState) { + log := sc.getLogger() old := sc.state.Load() sc.state.Store(int32(state)) pc, _, _, _ := runtime.Caller(1) callerName := runtime.FuncForPC(pc).Name() - log.Info("Shard Cluster update state", zap.Int64("collectionID", sc.collectionID), - zap.Int64("replicaID", sc.replicaID), zap.String("channel", sc.vchannelName), + log.Info("Shard Cluster update state", zap.Int32("old state", old), zap.Int32("new state", int32(state)), 
zap.String("caller", callerName)) } @@ -488,11 +491,12 @@ func (sc *ShardCluster) healthCheck() { // watchNodes handles node events. func (sc *ShardCluster) watchNodes(evtCh <-chan nodeEvent) { + log := sc.getLogger() for { select { case evt, ok := <-evtCh: if !ok { - log.Warn("ShardCluster node channel closed", zap.Int64("collectionID", sc.collectionID), zap.Int64("replicaID", sc.replicaID)) + log.Warn("ShardCluster node channel closed") return } switch evt.eventType { @@ -502,7 +506,7 @@ func (sc *ShardCluster) watchNodes(evtCh <-chan nodeEvent) { sc.removeNode(evt) } case <-sc.closeCh: - log.Info("ShardCluster watchNode quit", zap.Int64("collectionID", sc.collectionID), zap.Int64("replicaID", sc.replicaID), zap.String("vchannelName", sc.vchannelName)) + log.Info("ShardCluster watchNode quit") return } } @@ -510,11 +514,12 @@ func (sc *ShardCluster) watchNodes(evtCh <-chan nodeEvent) { // watchSegments handles segment events. func (sc *ShardCluster) watchSegments(evtCh <-chan segmentEvent) { + log := sc.getLogger() for { select { case evt, ok := <-evtCh: if !ok { - log.Warn("ShardCluster segment channel closed", zap.Int64("collectionID", sc.collectionID), zap.Int64("replicaID", sc.replicaID)) + log.Warn("ShardCluster segment channel closed") return } info, ok := sc.pickNode(evt) @@ -530,7 +535,7 @@ func (sc *ShardCluster) watchSegments(evtCh <-chan segmentEvent) { sc.removeSegment(info) } case <-sc.closeCh: - log.Info("ShardCluster watchSegments quit", zap.Int64("collectionID", sc.collectionID), zap.Int64("replicaID", sc.replicaID), zap.String("vchannelName", sc.vchannelName)) + log.Info("ShardCluster watchSegments quit") return } } @@ -590,6 +595,7 @@ func (sc *ShardCluster) finishUsage(versionID int64) { // HandoffSegments processes the handoff/load balance segments update procedure. func (sc *ShardCluster) HandoffSegments(info *querypb.SegmentChangeInfo) error { + log := sc.getLogger() // wait for all OnlineSegment is loaded onlineSegmentIDs := make([]int64, 0, len(info.OnlineSegments)) onlineSegments := make([]shardSegmentInfo, 0, len(info.OnlineSegments)) @@ -640,7 +646,7 @@ func (sc *ShardCluster) HandoffSegments(info *querypb.SegmentChangeInfo) error { for nodeID, segmentIDs := range removes { node, ok := sc.getNode(nodeID) if !ok { - log.Warn("node not in cluster", zap.Int64("nodeID", nodeID), zap.Int64("collectionID", sc.collectionID), zap.String("vchannel", sc.vchannelName)) + log.Warn("node not in cluster", zap.Int64("nodeID", nodeID)) errs = append(errs, fmt.Errorf("node not in cluster nodeID %d", nodeID)) continue } @@ -667,12 +673,13 @@ func (sc *ShardCluster) HandoffSegments(info *querypb.SegmentChangeInfo) error { return nil } -func (sc *ShardCluster) loadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error { +// LoadSegments loads segments with shardCluster. +// shard cluster shall try to loadSegments in the follower then update the allocation. 
+func (sc *ShardCluster) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error { + log := sc.getLogger() // add common log fields - log := log.With(zap.Int64("collectionID", sc.collectionID), - zap.Int64("replicaID", sc.replicaID), - zap.String("vchannel", sc.vchannelName), - zap.Int64("dstNodeID", req.GetDstNodeID())) + log = log.With(zap.Int64("dstNodeID", req.GetDstNodeID())) + segmentIDs := make([]int64, 0, len(req.Infos)) for _, info := range req.Infos { segmentIDs = append(segmentIDs, info.SegmentID) @@ -722,14 +729,17 @@ func (sc *ShardCluster) loadSegments(ctx context.Context, req *querypb.LoadSegme return nil } -func (sc *ShardCluster) releaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest) error { +// ReleaseSegments releases segments via ShardCluster. +// ShardCluster will wait all on-going search until finished, update the current version, +// then release the segments through follower. +func (sc *ShardCluster) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest, force bool) error { + log := sc.getLogger() // add common log fields - log := log.With(zap.Int64("collectionID", sc.collectionID), - zap.Int64("replicaID", sc.replicaID), - zap.String("vchannel", sc.vchannelName), - zap.Int64s("segmentIDs", req.GetSegmentIDs()), - zap.String("scope", req.GetScope().String())) + log = log.With(zap.Int64s("segmentIDs", req.GetSegmentIDs()), + zap.String("scope", req.GetScope().String()), + zap.Bool("force", force)) + //shardCluster.forceRemoveSegment(action.GetSegmentID()) offlineSegments := make(typeutil.UniqueSet) if req.Scope != querypb.DataScope_Streaming { offlineSegments.Insert(req.GetSegmentIDs()...) @@ -744,7 +754,7 @@ func (sc *ShardCluster) releaseSegments(ctx context.Context, req *querypb.Releas var allocations SegmentsStatus if sc.currentVersion != nil { allocations = sc.currentVersion.segments.Clone(func(segmentID UniqueID, nodeID UniqueID) bool { - return nodeID == req.NodeID && offlineSegments.Contain(segmentID) + return (nodeID == req.NodeID || force) && offlineSegments.Contain(segmentID) }) } @@ -755,16 +765,24 @@ func (sc *ShardCluster) releaseSegments(ctx context.Context, req *querypb.Releas version := NewShardClusterVersion(versionID, allocations, sc.currentVersion) sc.versions.Store(versionID, version) - // currentVersion shall be not nil - if sc.currentVersion != nil { - // wait for last version search done - <-sc.currentVersion.Expire() - lastVersionID = sc.currentVersion.versionID + // force release means current distribution has error + if !force { + // currentVersion shall be not nil + if sc.currentVersion != nil { + // wait for last version search done + <-sc.currentVersion.Expire() + lastVersionID = sc.currentVersion.versionID + } } // set current version to new one sc.currentVersion = version + // force release skips the release call + if force { + return + } + // try to release segments from nodes node, ok := sc.getNode(req.GetNodeID()) if !ok { @@ -793,7 +811,7 @@ func (sc *ShardCluster) releaseSegments(ctx context.Context, req *querypb.Releas info, ok := sc.segments[segmentID] if ok { // otherwise, segment is on another node, do nothing - if info.nodeID == req.NodeID { + if force || info.nodeID == req.NodeID { delete(sc.segments, segmentID) } } diff --git a/internal/querynode/shard_cluster_test.go b/internal/querynode/shard_cluster_test.go index 672ff2c2f5..60a197c6bc 100644 --- a/internal/querynode/shard_cluster_test.go +++ b/internal/querynode/shard_cluster_test.go @@ -2462,6 +2462,7 @@ func 
(suite *ShardClusterSuite) TestReleaseSegments() { scope querypb.DataScope expectError bool + force bool } cases := []TestCase{ @@ -2471,6 +2472,7 @@ func (suite *ShardClusterSuite) TestReleaseSegments() { nodeID: 2, scope: querypb.DataScope_All, expectError: false, + force: false, }, } @@ -2479,11 +2481,11 @@ func (suite *ShardClusterSuite) TestReleaseSegments() { suite.TearDownTest() suite.SetupTest() - err := suite.sc.releaseSegments(context.Background(), &querypb.ReleaseSegmentsRequest{ + err := suite.sc.ReleaseSegments(context.Background(), &querypb.ReleaseSegmentsRequest{ NodeID: test.nodeID, SegmentIDs: test.segmentIDs, Scope: test.scope, - }) + }, test.force) if test.expectError { suite.Error(err) } else {
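
Note: the core behavioral change in this patch is the force flag threaded through ShardCluster.ReleaseSegments. When SyncDistribution handles a SyncType_Remove action it now goes through ReleaseSegments with force=true instead of forceRemoveSegment, so the allocation filter no longer requires the recorded owner node to match the request, the call skips waiting on the previous shard version and skips forwarding the release to the follower, and the segment entry is dropped from sc.segments regardless of owner. The snippet below is a minimal, self-contained sketch of just the allocation predicate from the diff ((nodeID == req.NodeID || force) && offlineSegments.Contain(segmentID)); shouldDrop and the map-based offline set are simplified stand-ins for illustration, not the actual ShardCluster types or typeutil.UniqueSet.

package main

import "fmt"

type UniqueID = int64

// shouldDrop mirrors the predicate passed to segments.Clone in ReleaseSegments:
// without force, only allocations owned by the requesting node are removed;
// with force, the owner check is bypassed so a stale distribution entry can
// always be cleaned up (this is how SyncType_Remove evicts a segment whose
// recorded node no longer matches reality).
func shouldDrop(nodeID, reqNodeID UniqueID, force bool, offline map[UniqueID]bool, segmentID UniqueID) bool {
	return (nodeID == reqNodeID || force) && offline[segmentID]
}

func main() {
	offline := map[UniqueID]bool{1: true}
	// segment 1 is recorded on node 2, but the release request targets node 99
	fmt.Println(shouldDrop(2, 99, false, offline, 1)) // false: a normal release leaves it alone
	fmt.Println(shouldDrop(2, 99, true, offline, 1))  // true: a force release drops it anyway
}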