Fix force release segment logic (#19471)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

congqixia 2022-09-27 19:22:54 +08:00 committed by GitHub
parent 73463d030d
commit 384e7c3169
6 changed files with 92 additions and 41 deletions


@ -1221,6 +1221,8 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
}
func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx).With(zap.Int64("collectionID", req.GetCollectionID()), zap.String("channel", req.GetChannel()))
log.Debug("SyncDistribution received")
shardCluster, ok := node.ShardClusterService.getShardCluster(req.GetChannel())
if !ok {
return &commonpb.Status{
@ -1229,9 +1231,13 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
}, nil
}
for _, action := range req.GetActions() {
log.Debug("sync action", zap.String("Action", action.GetType().String()), zap.Int64("segmentID", action.SegmentID))
switch action.GetType() {
case querypb.SyncType_Remove:
shardCluster.forceRemoveSegment(action.GetSegmentID())
shardCluster.ReleaseSegments(ctx, &querypb.ReleaseSegmentsRequest{
SegmentIDs: []UniqueID{action.GetSegmentID()},
Scope: querypb.DataScope_Historical,
}, true)
case querypb.SyncType_Set:
shardCluster.SyncSegments([]*querypb.ReplicaSegmentsInfo{
{NodeId: action.GetNodeID(), PartitionId: action.GetPartitionID(), SegmentIds: []int64{action.GetSegmentID()}},
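
For orientation, a minimal caller-side sketch (not part of this commit) of the Remove action handled above; it assumes the same package and fixtures as the test below, so the IDs and the node handle are placeholders. The point of the change is that a Remove action now performs a forced release on the shard cluster instead of only deleting the in-memory segment entry.

	// sketch only: issuing a Remove sync action, mirroring the fields
	// exercised by TestSyncDistribution below.
	req := &querypb.SyncDistributionRequest{
		CollectionID: defaultCollectionID,
		Channel:      defaultDMLChannel,
		Actions: []*querypb.SyncAction{{
			Type:        querypb.SyncType_Remove,
			PartitionID: defaultPartitionID,
			SegmentID:   defaultSegmentID,
			NodeID:      99, // node the segment was believed to be loaded on
		}},
	}
	status, err := node.SyncDistribution(ctx, req)
	// expect ErrorCode_Success and the segment gone from the shard cluster
	_, _ = status, err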


@ -835,6 +835,25 @@ func TestSyncDistribution(t *testing.T) {
assert.Equal(t, defaultPartitionID, segment.partitionID)
assert.Equal(t, segmentStateLoaded, segment.state)
resp, err = node.SyncDistribution(ctx, &querypb.SyncDistributionRequest{
CollectionID: defaultCollectionID,
Channel: defaultDMLChannel,
Actions: []*querypb.SyncAction{
{
Type: querypb.SyncType_Remove,
PartitionID: defaultPartitionID,
SegmentID: defaultSegmentID,
NodeID: 99,
},
},
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
cs, ok = node.ShardClusterService.getShardCluster(defaultDMLChannel)
require.True(t, ok)
_, ok = cs.getSegment(defaultSegmentID)
require.False(t, ok)
})
}


@ -4,7 +4,9 @@ import (
"context"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/querypb"
"go.uber.org/zap"
)
func (node *QueryNode) TransferLoad(ctx context.Context, req *querypb.LoadSegmentsRequest) (*commonpb.Status, error) {
@ -15,6 +17,7 @@ func (node *QueryNode) TransferLoad(ctx context.Context, req *querypb.LoadSegmen
shard := req.GetInfos()[0].GetInsertChannel()
shardCluster, ok := node.ShardClusterService.getShardCluster(shard)
if !ok {
log.Warn("TransferLoad failed to find shard cluster", zap.String("shard", shard))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NotShardLeader,
Reason: "shard cluster not found, the leader may have changed",
@ -22,8 +25,9 @@ func (node *QueryNode) TransferLoad(ctx context.Context, req *querypb.LoadSegmen
}
req.NeedTransfer = false
err := shardCluster.loadSegments(ctx, req)
err := shardCluster.LoadSegments(ctx, req)
if err != nil {
log.Warn("shard cluster failed to load segments", zap.String("shard", shard), zap.Error(err))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
@ -38,6 +42,7 @@ func (node *QueryNode) TransferLoad(ctx context.Context, req *querypb.LoadSegmen
func (node *QueryNode) TransferRelease(ctx context.Context, req *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
shardCluster, ok := node.ShardClusterService.getShardCluster(req.GetShard())
if !ok {
log.Warn("TransferLoad failed to find shard cluster", zap.String("shard", req.GetShard()))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NotShardLeader,
Reason: "shard cluster not found, the leader may have changed",
@ -45,8 +50,9 @@ func (node *QueryNode) TransferRelease(ctx context.Context, req *querypb.Release
}
req.NeedTransfer = false
err := shardCluster.releaseSegments(ctx, req)
err := shardCluster.ReleaseSegments(ctx, req, false)
if err != nil {
log.Warn("shard cluster failed to release segments", zap.String("shard", req.GetShard()), zap.Error(err))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),


@ -387,7 +387,7 @@ func (node *QueryNode) watchChangeInfo() {
return
}
// if watch loop returns due to event canceled, the querynode is not functional anymore
log.Panic("querynoe3 is not functional for event canceled", zap.Error(err))
log.Panic("querynode is not functional for event canceled", zap.Error(err))
return
}
for _, event := range resp.Events {


@ -188,6 +188,7 @@ func NewShardCluster(collectionID int64, replicaID int64, vchannelName string,
}
func (sc *ShardCluster) Close() {
log := sc.getLogger()
log.Info("Close shard cluster")
sc.closeOnce.Do(func() {
sc.updateShardClusterState(unavailable)
@ -202,6 +203,12 @@ func (sc *ShardCluster) Close() {
})
}
func (sc *ShardCluster) getLogger() *zap.Logger {
return log.With(zap.Int64("collectionID", sc.collectionID),
zap.String("channel", sc.vchannelName),
zap.Int64("replicaID", sc.replicaID))
}
// serviceable returns whether shard cluster could provide query service.
func (sc *ShardCluster) serviceable() bool {
// all segment in loaded state
@ -217,6 +224,7 @@ func (sc *ShardCluster) serviceable() bool {
// addNode add a node into cluster
func (sc *ShardCluster) addNode(evt nodeEvent) {
log := sc.getLogger()
log.Info("ShardCluster add node", zap.Int64("nodeID", evt.nodeID))
sc.mut.Lock()
defer sc.mut.Unlock()
@ -243,6 +251,7 @@ func (sc *ShardCluster) addNode(evt nodeEvent) {
// removeNode handles node offline and setup related segments
func (sc *ShardCluster) removeNode(evt nodeEvent) {
log := sc.getLogger()
log.Info("ShardCluster remove node", zap.Int64("nodeID", evt.nodeID))
sc.mut.Lock()
defer sc.mut.Unlock()
@ -268,6 +277,7 @@ func (sc *ShardCluster) removeNode(evt nodeEvent) {
// updateSegment apply segment change to shard cluster
func (sc *ShardCluster) updateSegment(evt shardSegmentInfo) {
log := sc.getLogger()
log.Info("ShardCluster update segment", zap.Int64("nodeID", evt.nodeID), zap.Int64("segmentID", evt.segmentID), zap.Int32("state", int32(evt.state)))
// notify handoff wait online if any
defer func() {
@ -295,7 +305,7 @@ func (sc *ShardCluster) updateSegment(evt shardSegmentInfo) {
// SyncSegments synchronize segment distribution in batch
func (sc *ShardCluster) SyncSegments(distribution []*querypb.ReplicaSegmentsInfo, state segmentState) {
log := log.With(zap.Int64("collectionID", sc.collectionID), zap.String("vchannel", sc.vchannelName), zap.Int64("replicaID", sc.replicaID))
log := sc.getLogger()
log.Info("ShardCluster sync segments", zap.Any("replica segments", distribution), zap.Int32("state", int32(state)))
sc.mut.Lock()
@ -350,6 +360,7 @@ func (sc *ShardCluster) SyncSegments(distribution []*querypb.ReplicaSegmentsInfo
// Loading | OK | OK | NodeID check
// Loaded | OK | OK | legacy pending
func (sc *ShardCluster) transferSegment(old shardSegmentInfo, evt shardSegmentInfo) {
log := sc.getLogger()
switch old.state {
case segmentStateOffline: // safe to update nodeID and state
old.nodeID = evt.nodeID
@ -383,6 +394,7 @@ func (sc *ShardCluster) transferSegment(old shardSegmentInfo, evt shardSegmentIn
// removeSegment removes segment from cluster
// should only applied in hand-off or load balance procedure
func (sc *ShardCluster) removeSegment(evt shardSegmentInfo) {
log := sc.getLogger()
log.Info("ShardCluster remove segment", zap.Int64("nodeID", evt.nodeID), zap.Int64("segmentID", evt.segmentID), zap.Int32("state", int32(evt.state)))
sc.mut.Lock()
@ -403,15 +415,6 @@ func (sc *ShardCluster) removeSegment(evt shardSegmentInfo) {
sc.healthCheck()
}
func (sc *ShardCluster) forceRemoveSegment(segmentID int64) {
log := log.With(zap.String("shard", sc.vchannelName), zap.Int64("segmentID", segmentID))
log.Info("remove segment")
sc.mut.Lock()
defer sc.mut.Unlock()
delete(sc.segments, segmentID)
}
// init list all nodes and segment states and start watching
func (sc *ShardCluster) init() {
// list nodes
@ -462,14 +465,14 @@ func (sc *ShardCluster) selectNodeInReplica(nodeIDs []int64) (int64, bool) {
}
func (sc *ShardCluster) updateShardClusterState(state shardClusterState) {
log := sc.getLogger()
old := sc.state.Load()
sc.state.Store(int32(state))
pc, _, _, _ := runtime.Caller(1)
callerName := runtime.FuncForPC(pc).Name()
log.Info("Shard Cluster update state", zap.Int64("collectionID", sc.collectionID),
zap.Int64("replicaID", sc.replicaID), zap.String("channel", sc.vchannelName),
log.Info("Shard Cluster update state",
zap.Int32("old state", old), zap.Int32("new state", int32(state)),
zap.String("caller", callerName))
}
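
Aside: updateShardClusterState above records which function changed the state via runtime.Caller. A small, self-contained sketch of that standard-library pattern (not tied to this commit):

	package main

	import (
		"fmt"
		"runtime"
	)

	// whoCalledMe reports the fully qualified name of its immediate caller,
	// the same mechanism updateShardClusterState uses for its "caller" field.
	func whoCalledMe() string {
		pc, _, _, ok := runtime.Caller(1) // skip = 1 -> the caller of whoCalledMe
		if !ok {
			return "unknown"
		}
		return runtime.FuncForPC(pc).Name()
	}

	func main() {
		fmt.Println(whoCalledMe()) // prints "main.main"
	}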
@ -488,11 +491,12 @@ func (sc *ShardCluster) healthCheck() {
// watchNodes handles node events.
func (sc *ShardCluster) watchNodes(evtCh <-chan nodeEvent) {
log := sc.getLogger()
for {
select {
case evt, ok := <-evtCh:
if !ok {
log.Warn("ShardCluster node channel closed", zap.Int64("collectionID", sc.collectionID), zap.Int64("replicaID", sc.replicaID))
log.Warn("ShardCluster node channel closed")
return
}
switch evt.eventType {
@ -502,7 +506,7 @@ func (sc *ShardCluster) watchNodes(evtCh <-chan nodeEvent) {
sc.removeNode(evt)
}
case <-sc.closeCh:
log.Info("ShardCluster watchNode quit", zap.Int64("collectionID", sc.collectionID), zap.Int64("replicaID", sc.replicaID), zap.String("vchannelName", sc.vchannelName))
log.Info("ShardCluster watchNode quit")
return
}
}
@ -510,11 +514,12 @@ func (sc *ShardCluster) watchNodes(evtCh <-chan nodeEvent) {
// watchSegments handles segment events.
func (sc *ShardCluster) watchSegments(evtCh <-chan segmentEvent) {
log := sc.getLogger()
for {
select {
case evt, ok := <-evtCh:
if !ok {
log.Warn("ShardCluster segment channel closed", zap.Int64("collectionID", sc.collectionID), zap.Int64("replicaID", sc.replicaID))
log.Warn("ShardCluster segment channel closed")
return
}
info, ok := sc.pickNode(evt)
@ -530,7 +535,7 @@ func (sc *ShardCluster) watchSegments(evtCh <-chan segmentEvent) {
sc.removeSegment(info)
}
case <-sc.closeCh:
log.Info("ShardCluster watchSegments quit", zap.Int64("collectionID", sc.collectionID), zap.Int64("replicaID", sc.replicaID), zap.String("vchannelName", sc.vchannelName))
log.Info("ShardCluster watchSegments quit")
return
}
}
@ -590,6 +595,7 @@ func (sc *ShardCluster) finishUsage(versionID int64) {
// HandoffSegments processes the handoff/load balance segments update procedure.
func (sc *ShardCluster) HandoffSegments(info *querypb.SegmentChangeInfo) error {
log := sc.getLogger()
// wait until all OnlineSegments are loaded
onlineSegmentIDs := make([]int64, 0, len(info.OnlineSegments))
onlineSegments := make([]shardSegmentInfo, 0, len(info.OnlineSegments))
@ -640,7 +646,7 @@ func (sc *ShardCluster) HandoffSegments(info *querypb.SegmentChangeInfo) error {
for nodeID, segmentIDs := range removes {
node, ok := sc.getNode(nodeID)
if !ok {
log.Warn("node not in cluster", zap.Int64("nodeID", nodeID), zap.Int64("collectionID", sc.collectionID), zap.String("vchannel", sc.vchannelName))
log.Warn("node not in cluster", zap.Int64("nodeID", nodeID))
errs = append(errs, fmt.Errorf("node not in cluster nodeID %d", nodeID))
continue
}
@ -667,12 +673,13 @@ func (sc *ShardCluster) HandoffSegments(info *querypb.SegmentChangeInfo) error {
return nil
}
func (sc *ShardCluster) loadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error {
// LoadSegments loads segments with shardCluster.
// shard cluster shall try to load segments on the follower, then update the allocation.
func (sc *ShardCluster) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error {
log := sc.getLogger()
// add common log fields
log := log.With(zap.Int64("collectionID", sc.collectionID),
zap.Int64("replicaID", sc.replicaID),
zap.String("vchannel", sc.vchannelName),
zap.Int64("dstNodeID", req.GetDstNodeID()))
log = log.With(zap.Int64("dstNodeID", req.GetDstNodeID()))
segmentIDs := make([]int64, 0, len(req.Infos))
for _, info := range req.Infos {
segmentIDs = append(segmentIDs, info.SegmentID)
@ -722,14 +729,17 @@ func (sc *ShardCluster) loadSegments(ctx context.Context, req *querypb.LoadSegme
return nil
}
func (sc *ShardCluster) releaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest) error {
// ReleaseSegments releases segments via ShardCluster.
// ShardCluster will wait until all on-going searches are finished, update the current version,
// then release the segments through the follower.
func (sc *ShardCluster) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest, force bool) error {
log := sc.getLogger()
// add common log fields
log := log.With(zap.Int64("collectionID", sc.collectionID),
zap.Int64("replicaID", sc.replicaID),
zap.String("vchannel", sc.vchannelName),
zap.Int64s("segmentIDs", req.GetSegmentIDs()),
zap.String("scope", req.GetScope().String()))
log = log.With(zap.Int64s("segmentIDs", req.GetSegmentIDs()),
zap.String("scope", req.GetScope().String()),
zap.Bool("force", force))
offlineSegments := make(typeutil.UniqueSet)
if req.Scope != querypb.DataScope_Streaming {
offlineSegments.Insert(req.GetSegmentIDs()...)
@ -744,7 +754,7 @@ func (sc *ShardCluster) releaseSegments(ctx context.Context, req *querypb.Releas
var allocations SegmentsStatus
if sc.currentVersion != nil {
allocations = sc.currentVersion.segments.Clone(func(segmentID UniqueID, nodeID UniqueID) bool {
return nodeID == req.NodeID && offlineSegments.Contain(segmentID)
return (nodeID == req.NodeID || force) && offlineSegments.Contain(segmentID)
})
}
@ -755,16 +765,24 @@ func (sc *ShardCluster) releaseSegments(ctx context.Context, req *querypb.Releas
version := NewShardClusterVersion(versionID, allocations, sc.currentVersion)
sc.versions.Store(versionID, version)
// currentVersion shall be not nil
if sc.currentVersion != nil {
// wait for last version search done
<-sc.currentVersion.Expire()
lastVersionID = sc.currentVersion.versionID
// force release means the current distribution has errors
if !force {
// currentVersion shall be not nil
if sc.currentVersion != nil {
// wait for last version search done
<-sc.currentVersion.Expire()
lastVersionID = sc.currentVersion.versionID
}
}
// set current version to new one
sc.currentVersion = version
// force release skips the release call
if force {
return
}
// try to release segments from nodes
node, ok := sc.getNode(req.GetNodeID())
if !ok {
@ -793,7 +811,7 @@ func (sc *ShardCluster) releaseSegments(ctx context.Context, req *querypb.Releas
info, ok := sc.segments[segmentID]
if ok {
// otherwise, segment is on another node, do nothing
if info.nodeID == req.NodeID {
if force || info.nodeID == req.NodeID {
delete(sc.segments, segmentID)
}
}
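
To make the behavioral change explicit, a standalone sketch (placeholder names, not the real SegmentsStatus API) of the allocation filter above: with force set, a segment listed in the release request is dropped from the new version no matter which node the current allocation records it on; without force it must match req.NodeID.

	package main

	import "fmt"

	// shouldOffline isolates the clone predicate used when building the new
	// shard cluster version during ReleaseSegments.
	func shouldOffline(force bool, reqNodeID, allocNodeID int64, offline map[int64]struct{}, segmentID int64) bool {
		_, requested := offline[segmentID]
		return (allocNodeID == reqNodeID || force) && requested
	}

	func main() {
		offline := map[int64]struct{}{2: {}}
		fmt.Println(shouldOffline(false, 99, 1, offline, 2)) // false: node mismatch blocks the removal
		fmt.Println(shouldOffline(true, 99, 1, offline, 2))  // true: force overrides the node check
	}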


@ -2462,6 +2462,7 @@ func (suite *ShardClusterSuite) TestReleaseSegments() {
scope querypb.DataScope
expectError bool
force bool
}
cases := []TestCase{
@ -2471,6 +2472,7 @@ func (suite *ShardClusterSuite) TestReleaseSegments() {
nodeID: 2,
scope: querypb.DataScope_All,
expectError: false,
force: false,
},
}
@ -2479,11 +2481,11 @@ func (suite *ShardClusterSuite) TestReleaseSegments() {
suite.TearDownTest()
suite.SetupTest()
err := suite.sc.releaseSegments(context.Background(), &querypb.ReleaseSegmentsRequest{
err := suite.sc.ReleaseSegments(context.Background(), &querypb.ReleaseSegmentsRequest{
NodeID: test.nodeID,
SegmentIDs: test.segmentIDs,
Scope: test.scope,
})
}, test.force)
if test.expectError {
suite.Error(err)
} else {