Fix handoff isn't processed correctly (#17738)

Signed-off-by: yah01 <yang.cen@zilliz.com>
Co-authored-by: Congqi Xia <congqi.xia@zilliz.com>

Co-authored-by: Congqi Xia <congqi.xia@zilliz.com>
pull/17761/head
yah01 2022-06-27 22:08:17 +08:00 committed by GitHub
parent ef0fc37ad7
commit 797218a8ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 74 additions and 142 deletions

View File

@ -602,38 +602,38 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
for _, info := range onlineInfos {
segmentID := info.SegmentID
onlineInfo := proto.Clone(info).(*querypb.SegmentInfo)
// LoadBalance case
// A node loads the segment, and the other one offloads
offlineInfo, err := m.getSegmentInfoByID(segmentID)
if err == nil {
if err == nil && offlineInfo.SegmentState == commonpb.SegmentState_Sealed {
// if the offline segment state is growing, it will not impact the global sealed segments
if offlineInfo.SegmentState == commonpb.SegmentState_Sealed {
offlineNodes := diffSlice(offlineInfo.NodeIds, info.NodeIds...)
onlineInfo.NodeIds = diffSlice(info.NodeIds, offlineInfo.NodeIds...)
offlineInfo.NodeIds = diffSlice(offlineInfo.NodeIds, info.NodeIds...)
for _, node := range offlineNodes {
segmentsChangeInfo.Infos = append(segmentsChangeInfo.Infos,
&querypb.SegmentChangeInfo{
OfflineNodeID: node,
OfflineSegments: []*querypb.SegmentInfo{offlineInfo},
})
}
}
segmentsChangeInfo.Infos = append(segmentsChangeInfo.Infos,
&querypb.SegmentChangeInfo{
OnlineSegments: []*querypb.SegmentInfo{onlineInfo},
OfflineSegments: []*querypb.SegmentInfo{offlineInfo},
})
}
// Handoff case
// generate offline segment change info if the loaded segment is compacted from other sealed segments
compactChangeInfo := &querypb.SegmentChangeInfo{}
for _, compactionSegmentID := range info.CompactionFrom {
compactionSegmentInfo, err := m.getSegmentInfoByID(compactionSegmentID)
if err == nil && compactionSegmentInfo.SegmentState == commonpb.SegmentState_Sealed {
for _, node := range compactionSegmentInfo.NodeIds {
segmentsChangeInfo.Infos = append(segmentsChangeInfo.Infos, &querypb.SegmentChangeInfo{
OfflineNodeID: node,
OfflineSegments: []*querypb.SegmentInfo{compactionSegmentInfo},
})
}
segmentsCompactionFrom = append(segmentsCompactionFrom, compactionSegmentInfo)
offlineInfo, err := m.getSegmentInfoByID(compactionSegmentID)
if err == nil && offlineInfo.SegmentState == commonpb.SegmentState_Sealed {
compactChangeInfo.OfflineSegments = append(compactChangeInfo.OfflineSegments, offlineInfo)
segmentsCompactionFrom = append(segmentsCompactionFrom, offlineInfo)
} else {
return nil, fmt.Errorf("saveGlobalSealedSegInfos: the compacted segment %d has not been loaded into memory", compactionSegmentID)
}
}
compactChangeInfo.OnlineSegments = append(compactChangeInfo.OnlineSegments, onlineInfo)
segmentsChangeInfo.Infos = append(segmentsChangeInfo.Infos, compactChangeInfo)
}
col2SegmentChangeInfos[collectionID] = segmentsChangeInfo
}

View File

@ -372,8 +372,6 @@ func (node *QueryNode) handleSealedSegmentsChangeInfo(info *querypb.SealedSegmen
log.Warn("failed to validate vchannel for SegmentChangeInfo", zap.Error(err))
continue
}
// ignore segments that are online and offline in the same QueryNode
filterDuplicateChangeInfo(line)
node.ShardClusterService.HandoffVChannelSegments(vchannel, line)
}
@ -405,41 +403,3 @@ func validateChangeChannel(info *querypb.SegmentChangeInfo) (string, error) {
return channelName, nil
}
// filterDuplicateChangeInfo filters out duplicated sealed segments which are both online and offline (Fix issue#17347)
func filterDuplicateChangeInfo(line *querypb.SegmentChangeInfo) {
// Dedup is only meaningful when the same QueryNode appears as both the
// online and the offline side of the change; for different nodes the two
// lists describe distinct replicas and nothing is removed.
if line.OnlineNodeID == line.OfflineNodeID {
// Collect the IDs of sealed segments that appear in BOTH lists
// (pairwise scan; only the sealed/sealed combination counts).
dupSegmentIDs := make(map[UniqueID]struct{})
for _, onlineSegment := range line.OnlineSegments {
for _, offlineSegment := range line.OfflineSegments {
if onlineSegment.SegmentID == offlineSegment.SegmentID && onlineSegment.SegmentState == segmentTypeSealed && offlineSegment.SegmentState == segmentTypeSealed {
dupSegmentIDs[onlineSegment.SegmentID] = struct{}{}
}
}
}
// Fast path: no duplicates, leave the change info untouched.
if len(dupSegmentIDs) == 0 {
return
}
// Materialize the set as a slice purely for structured logging below.
var dupSegmentIDsList []UniqueID
for sid := range dupSegmentIDs {
dupSegmentIDsList = append(dupSegmentIDsList, sid)
}
log.Warn("Found sealed segments are that are online and offline.", zap.Int64s("SegmentIDs", dupSegmentIDsList))
// Rebuild the online list without the duplicated segments.
var filteredOnlineSegments []*querypb.SegmentInfo
for _, onlineSegment := range line.OnlineSegments {
if _, ok := dupSegmentIDs[onlineSegment.SegmentID]; !ok {
filteredOnlineSegments = append(filteredOnlineSegments, onlineSegment)
}
}
line.OnlineSegments = filteredOnlineSegments
// Rebuild the offline list without the duplicated segments (mutates the
// argument in place; callers see the filtered lists).
var filteredOfflineSegments []*querypb.SegmentInfo
for _, offlineSegment := range line.OfflineSegments {
if _, ok := dupSegmentIDs[offlineSegment.SegmentID]; !ok {
filteredOfflineSegments = append(filteredOfflineSegments, offlineSegment)
}
}
line.OfflineSegments = filteredOfflineSegments
}
}

View File

@ -413,74 +413,6 @@ func TestQueryNode_validateChangeChannel(t *testing.T) {
}
}
// TestQueryNode_filterDuplicateChangeInfo verifies that filterDuplicateChangeInfo
// removes a sealed segment listed as both online and offline only when the
// online and offline node IDs are the same.
func TestQueryNode_filterDuplicateChangeInfo(t *testing.T) {
// Same node (233) on both sides with the same sealed segment:
// the duplicate must be filtered out of both lists.
t.Run("dup change info", func(t *testing.T) {
info := &querypb.SegmentChangeInfo{
OnlineNodeID: 233,
OnlineSegments: []*querypb.SegmentInfo{
{
SegmentID: 23333,
SegmentState: segmentTypeSealed,
},
},
OfflineNodeID: 233,
OfflineSegments: []*querypb.SegmentInfo{
{
SegmentID: 23333,
SegmentState: segmentTypeSealed,
},
},
}
filterDuplicateChangeInfo(info)
assert.Equal(t, 0, len(info.OnlineSegments))
assert.Equal(t, 0, len(info.OfflineSegments))
})
// Different nodes (233 vs 234): nothing is filtered even though the
// segment ID and state match on both sides.
t.Run("normal change info1", func(t *testing.T) {
info := &querypb.SegmentChangeInfo{
OnlineNodeID: 233,
OnlineSegments: []*querypb.SegmentInfo{
{
SegmentID: 23333,
SegmentState: segmentTypeSealed,
},
},
OfflineNodeID: 234,
OfflineSegments: []*querypb.SegmentInfo{
{
SegmentID: 23333,
SegmentState: segmentTypeSealed,
},
},
}
filterDuplicateChangeInfo(info)
assert.Equal(t, 1, len(info.OnlineSegments))
assert.Equal(t, 1, len(info.OfflineSegments))
})
// NOTE(review): this subtest is byte-identical to "normal change info1" —
// presumably it was meant to vary the segment IDs or states; confirm the
// original intent before relying on it for extra coverage.
t.Run("normal change info2", func(t *testing.T) {
info := &querypb.SegmentChangeInfo{
OnlineNodeID: 233,
OnlineSegments: []*querypb.SegmentInfo{
{
SegmentID: 23333,
SegmentState: segmentTypeSealed,
},
},
OfflineNodeID: 234,
OfflineSegments: []*querypb.SegmentInfo{
{
SegmentID: 23333,
SegmentState: segmentTypeSealed,
},
},
}
filterDuplicateChangeInfo(info)
assert.Equal(t, 1, len(info.OnlineSegments))
assert.Equal(t, 1, len(info.OfflineSegments))
})
}
func TestQueryNode_handleSealedSegmentsChangeInfo(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

View File

@ -394,20 +394,31 @@ func (sc *ShardCluster) init() {
sc.healthCheck()
}
// pickNode selects node id in cluster
// pickNode selects node in the cluster
func (sc *ShardCluster) pickNode(evt segmentEvent) (shardSegmentInfo, bool) {
for _, nodeID := range evt.nodeIDs {
nodeID, has := sc.selectNodeInReplica(evt.nodeIDs)
if has { // assume one segment shall exist once in one replica
return shardSegmentInfo{
segmentID: evt.segmentID,
partitionID: evt.partitionID,
nodeID: nodeID,
state: evt.state,
}, true
}
return shardSegmentInfo{}, false
}
// selectNodeInReplica returns first node id inside the shard cluster replica.
// if there is no nodeID found, returns 0.
func (sc *ShardCluster) selectNodeInReplica(nodeIDs []int64) (int64, bool) {
for _, nodeID := range nodeIDs {
_, has := sc.getNode(nodeID)
if has { // assume one segment shall exist once in one replica
return shardSegmentInfo{
segmentID: evt.segmentID,
partitionID: evt.partitionID,
nodeID: nodeID,
state: evt.state,
}, true
if has {
return nodeID, true
}
}
return shardSegmentInfo{}, false
return 0, false
}
// healthCheck iterates all segments to check whether the cluster could provide service.
@ -454,6 +465,8 @@ func (sc *ShardCluster) watchSegments(evtCh <-chan segmentEvent) {
}
info, ok := sc.pickNode(evt)
if !ok {
log.Info("No node of event is in cluster, skip to process it",
zap.Int64s("nodes", evt.nodeIDs))
continue
}
switch evt.eventType {
@ -531,8 +544,12 @@ func (sc *ShardCluster) HandoffSegments(info *querypb.SegmentChangeInfo) error {
if seg.GetCollectionID() != sc.collectionID || seg.GetDmChannel() != sc.vchannelName {
continue
}
nodeID, has := sc.selectNodeInReplica(seg.NodeIds)
if !has {
continue
}
onlineSegments = append(onlineSegments, shardSegmentInfo{
nodeID: seg.GetNodeID(),
nodeID: nodeID,
segmentID: seg.GetSegmentID(),
})
onlineSegmentIDs = append(onlineSegmentIDs, seg.GetSegmentID())
@ -550,9 +567,13 @@ func (sc *ShardCluster) HandoffSegments(info *querypb.SegmentChangeInfo) error {
if seg.GetCollectionID() != sc.collectionID || seg.GetDmChannel() != sc.vchannelName {
continue
}
sc.removeSegment(shardSegmentInfo{segmentID: seg.GetSegmentID(), nodeID: seg.GetNodeID()})
nodeID, has := sc.selectNodeInReplica(seg.NodeIds)
if !has {
continue
}
sc.removeSegment(shardSegmentInfo{segmentID: seg.GetSegmentID(), nodeID: nodeID})
removes[seg.GetNodeID()] = append(removes[seg.GetNodeID()], seg.SegmentID)
removes[nodeID] = append(removes[nodeID], seg.SegmentID)
}
var errs errorutil.ErrorList

View File

@ -2,6 +2,7 @@ package querynode
import (
"context"
"errors"
"testing"
"github.com/milvus-io/milvus/internal/proto/querypb"
@ -113,6 +114,24 @@ func TestShardClusterService_HandoffVChannelSegments(t *testing.T) {
})
t.Run("error case", func(t *testing.T) {
mqn := &mockShardQueryNode{}
nodeEvents := []nodeEvent{
{
nodeID: 3,
nodeAddr: "addr_3",
},
}
sc := NewShardCluster(defaultCollectionID, defaultReplicaID, defaultDMLChannel,
&mockNodeDetector{initNodes: nodeEvents}, &mockSegmentDetector{}, func(nodeID int64, addr string) shardQueryNode {
return mqn
})
defer sc.Close()
mqn.releaseSegmentsErr = errors.New("mocked error")
// set mocked shard cluster
clusterService.clusters.Store(defaultDMLChannel, sc)
assert.NotPanics(t, func() {
err = clusterService.HandoffVChannelSegments(defaultDMLChannel, &querypb.SegmentChangeInfo{
OfflineSegments: []*querypb.SegmentInfo{

View File

@ -1986,7 +1986,7 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
},
})
assert.Error(t, err)
assert.NoError(t, err)
})
t.Run("release failed", func(t *testing.T) {