mirror of https://github.com/milvus-io/milvus.git
Fix SegmentChangeInfo is not set properly (#17701)
Signed-off-by: yah01 <yang.cen@zilliz.com>
Branch: pull/17719/head
parent 16075d358b
commit b4f21259ef
@@ -405,11 +405,16 @@ func (c *queryNodeCluster) GetSegmentInfoByID(ctx context.Context, segmentID Uni
 		return nil, err
 	}
 
+	if len(segmentInfo.NodeIds) == 0 {
+		return nil, fmt.Errorf("GetSegmentInfoByID: no node loaded the segment %v", segmentID)
+	}
+
+	node := segmentInfo.NodeIds[0]
 	c.RLock()
-	targetNode, ok := c.nodes[segmentInfo.NodeID]
+	targetNode, ok := c.nodes[node]
 	c.RUnlock()
 	if !ok {
-		return nil, fmt.Errorf("updateSegmentInfo: can't find query node by nodeID, nodeID = %d", segmentInfo.NodeID)
+		return nil, fmt.Errorf("GetSegmentInfoByID: can't find query node by nodeID, nodeID=%v", node)
 	}
 
 	res, err := targetNode.getSegmentInfo(ctx, &querypb.GetSegmentInfoRequest{
@@ -429,7 +434,7 @@ func (c *queryNodeCluster) GetSegmentInfoByID(ctx context.Context, segmentID Uni
 		}
 	}
 
-	return nil, fmt.Errorf("updateSegmentInfo: can't find segment %d on QueryNode %d", segmentID, segmentInfo.NodeID)
+	return nil, fmt.Errorf("GetSegmentInfoByID: can't find segment %d on QueryNode %d", segmentID, node)
 }
 
 func (c *queryNodeCluster) GetSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error) {
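Not part of the commit: a minimal standalone sketch of the guard-then-pick pattern the hunk introduces (only read NodeIds[0] after checking the list is non-empty). The segmentInfo type and firstServingNode helper below are hypothetical stand-ins, not the querypb types.

package main

import (
    "errors"
    "fmt"
)

// UniqueID mirrors the int64 alias used by the query coordinator.
type UniqueID = int64

// segmentInfo is a simplified stand-in for querypb.SegmentInfo.
type segmentInfo struct {
    SegmentID UniqueID
    NodeIds   []UniqueID
}

// firstServingNode is a hypothetical helper: it returns the first node that
// loaded the segment, or an error when no node serves it, matching the
// empty-check added in GetSegmentInfoByID.
func firstServingNode(info *segmentInfo) (UniqueID, error) {
    if len(info.NodeIds) == 0 {
        return 0, errors.New("no node loaded the segment")
    }
    return info.NodeIds[0], nil
}

func main() {
    info := &segmentInfo{SegmentID: 1, NodeIds: []UniqueID{7, 9}}
    node, err := firstServingNode(info)
    fmt.Println(node, err) // 7 <nil>

    empty := &segmentInfo{SegmentID: 2}
    _, err = firstServingNode(empty)
    fmt.Println(err) // no node loaded the segment
}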
@@ -20,6 +20,25 @@ type balancePlan struct {
 	targetReplica UniqueID
 }
 
+// NewAddBalancePlan creates plan for adding nodes into dest replica
+func NewAddBalancePlan(dest UniqueID, nodes ...UniqueID) *balancePlan {
+	return NewMoveBalancePlan(invalidReplicaID, dest, nodes...)
+}
+
+// NewRemoveBalancePlan creates plan for removing nodes from src replica
+func NewRemoveBalancePlan(src UniqueID, nodes ...UniqueID) *balancePlan {
+	return NewMoveBalancePlan(src, invalidReplicaID, nodes...)
+}
+
+// NewMoveBalancePlan creates plan for moving nodes from src replica into dest replica
+func NewMoveBalancePlan(src, dest UniqueID, nodes ...UniqueID) *balancePlan {
+	return &balancePlan{
+		nodes:         nodes,
+		sourceReplica: src,
+		targetReplica: dest,
+	}
+}
+
 type replicaBalancer struct {
 	meta    Meta
 	cluster Cluster
@@ -72,11 +91,8 @@ func (b *replicaBalancer) AddNode(nodeID int64) ([]*balancePlan, error) {
 			return replicaAvailableMemory[replicai] < replicaAvailableMemory[replicaj]
 		})
 
-		ret = append(ret, &balancePlan{
-			nodes:         []UniqueID{nodeID},
-			sourceReplica: invalidReplicaID,
-			targetReplica: replicas[0].GetReplicaID(),
-		})
+		ret = append(ret,
+			NewAddBalancePlan(replicas[0].GetReplicaID(), nodeID))
 	}
 	return ret, nil
 }
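Not part of the commit: a standalone sketch showing how the three constructors compose, with NewMoveBalancePlan as the single underlying builder. The local type and the invalidReplicaID value (-1) are simplified assumptions, not the package's actual definitions.

package main

import "fmt"

type UniqueID = int64

// invalidReplicaID marks the side of the plan that has no replica involved (assumed value).
const invalidReplicaID UniqueID = -1

type balancePlan struct {
    nodes         []UniqueID
    sourceReplica UniqueID
    targetReplica UniqueID
}

func NewMoveBalancePlan(src, dest UniqueID, nodes ...UniqueID) *balancePlan {
    return &balancePlan{nodes: nodes, sourceReplica: src, targetReplica: dest}
}

func NewAddBalancePlan(dest UniqueID, nodes ...UniqueID) *balancePlan {
    return NewMoveBalancePlan(invalidReplicaID, dest, nodes...)
}

func NewRemoveBalancePlan(src UniqueID, nodes ...UniqueID) *balancePlan {
    return NewMoveBalancePlan(src, invalidReplicaID, nodes...)
}

func main() {
    add := NewAddBalancePlan(100, 1)       // node 1 joins replica 100
    remove := NewRemoveBalancePlan(100, 1) // node 1 leaves replica 100
    fmt.Printf("%+v\n%+v\n", *add, *remove)
}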
@@ -599,32 +599,36 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
 		},
 		Infos: []*querypb.SegmentChangeInfo{},
 	}
 
 	for _, info := range onlineInfos {
 		segmentID := info.SegmentID
 		onlineNodeID := info.NodeID
 		changeInfo := &querypb.SegmentChangeInfo{
 			OnlineNodeID:   onlineNodeID,
 			OnlineSegments: []*querypb.SegmentInfo{info},
 		}
 
 		offlineInfo, err := m.getSegmentInfoByID(segmentID)
 		if err == nil {
-			offlineNodeID := offlineInfo.NodeID
-			// if the offline segment state is growing, it will not impact the global sealed segments
-			if offlineInfo.SegmentState == commonpb.SegmentState_Sealed {
-				changeInfo.OfflineNodeID = offlineNodeID
-				changeInfo.OfflineSegments = []*querypb.SegmentInfo{offlineInfo}
-			}
+			offlineNodes := diffSlice(offlineInfo.NodeIds, info.NodeIds...)
+			for _, node := range offlineNodes {
+				segmentsChangeInfo.Infos = append(segmentsChangeInfo.Infos,
+					&querypb.SegmentChangeInfo{
+						OfflineNodeID:   node,
+						OfflineSegments: []*querypb.SegmentInfo{offlineInfo},
+					})
+			}
 		}
 		segmentsChangeInfo.Infos = append(segmentsChangeInfo.Infos, changeInfo)
 
 		// generate offline segment change info if the loaded segment is compacted from other sealed segments
 		for _, compactionSegmentID := range info.CompactionFrom {
 			compactionSegmentInfo, err := m.getSegmentInfoByID(compactionSegmentID)
 			if err == nil && compactionSegmentInfo.SegmentState == commonpb.SegmentState_Sealed {
-				segmentsChangeInfo.Infos = append(segmentsChangeInfo.Infos, &querypb.SegmentChangeInfo{
-					OfflineNodeID:   compactionSegmentInfo.NodeID,
-					OfflineSegments: []*querypb.SegmentInfo{compactionSegmentInfo},
-				})
+				for _, node := range compactionSegmentInfo.NodeIds {
+					segmentsChangeInfo.Infos = append(segmentsChangeInfo.Infos, &querypb.SegmentChangeInfo{
+						OfflineNodeID:   node,
+						OfflineSegments: []*querypb.SegmentInfo{compactionSegmentInfo},
+					})
+				}
 				segmentsCompactionFrom = append(segmentsCompactionFrom, compactionSegmentInfo)
 			} else {
 				return nil, fmt.Errorf("saveGlobalSealedSegInfos: the compacted segment %d has not been loaded into memory", compactionSegmentID)
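The rewritten loop emits one SegmentChangeInfo per node that previously served the sealed segment but no longer does, rather than a single change info keyed by the old scalar NodeID. Below is a standalone sketch of that fan-out under simplified, stand-in types (the real code uses diffSlice and querypb.SegmentChangeInfo).

package main

import "fmt"

type UniqueID = int64

// changeInfo is a simplified stand-in for querypb.SegmentChangeInfo.
type changeInfo struct {
    SegmentID     UniqueID
    OfflineNodeID UniqueID
}

func main() {
    segmentID := UniqueID(11)
    oldNodes := []UniqueID{1, 2, 3} // nodes that served the sealed segment before the update
    newNodes := []UniqueID{2, 4}    // nodes that serve it after the update

    // Equivalent of diffSlice(oldNodes, newNodes...): nodes that went offline for this segment.
    stillServing := make(map[UniqueID]bool, len(newNodes))
    for _, n := range newNodes {
        stillServing[n] = true
    }

    // One change info per offline node, instead of a single one keyed by a scalar NodeID.
    var infos []changeInfo
    for _, n := range oldNodes {
        if !stillServing[n] {
            infos = append(infos, changeInfo{SegmentID: segmentID, OfflineNodeID: n})
        }
    }
    fmt.Printf("%+v\n", infos) // [{SegmentID:11 OfflineNodeID:1} {SegmentID:11 OfflineNodeID:3}]
}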
@@ -633,29 +637,6 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
 		}
 		col2SegmentChangeInfos[collectionID] = segmentsChangeInfo
 	}
-	queryChannelInfosMap := make(map[UniqueID]*querypb.QueryChannelInfo)
-	for collectionID, infos := range saves {
-		// TODO silverxia change QueryChannelInfo struct to simplifed one
-		// queryChannelInfo contains the GlobalSealedSegment list
-		queryChannelInfo := m.getQueryChannelInfoByID(collectionID)
-
-		// merge save segment info and existing GlobalSealedSegments
-		seg2Info := make(map[UniqueID]*querypb.SegmentInfo)
-		for _, segmentInfo := range queryChannelInfo.GlobalSealedSegments {
-			segmentID := segmentInfo.SegmentID
-			seg2Info[segmentID] = segmentInfo
-		}
-		for _, segmentInfo := range infos {
-			segmentID := segmentInfo.SegmentID
-			seg2Info[segmentID] = segmentInfo
-		}
-
-		globalSealedSegmentInfos := make([]*querypb.SegmentInfo, len(seg2Info))
-		for _, info := range seg2Info {
-			globalSealedSegmentInfos = append(globalSealedSegmentInfos, info)
-		}
-		queryChannelInfo.GlobalSealedSegments = globalSealedSegmentInfos
-	}
 
 	// save segmentInfo to etcd
 	for _, infos := range saves {
@@ -693,13 +674,6 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
 		panic(err)
 	}
 
-	// Write back to cache
-	m.channelMu.Lock()
-	for collectionID, channelInfo := range queryChannelInfosMap {
-		m.queryChannelInfos[collectionID] = channelInfo
-	}
-	m.channelMu.Unlock()
-
 	return col2SegmentChangeInfos, nil
 }
 
@@ -716,13 +690,14 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio
 		Infos: []*querypb.SegmentChangeInfo{},
 	}
 	for _, info := range removes {
-		offlineNodeID := info.NodeID
-		changeInfo := &querypb.SegmentChangeInfo{
-			OfflineNodeID:   offlineNodeID,
-			OfflineSegments: []*querypb.SegmentInfo{info},
-		}
-		segmentChangeInfos.Infos = append(segmentChangeInfos.Infos, changeInfo)
+		for _, node := range info.NodeIds {
+			segmentChangeInfos.Infos = append(segmentChangeInfos.Infos,
+				&querypb.SegmentChangeInfo{
+					OfflineNodeID:   node,
+					OfflineSegments: []*querypb.SegmentInfo{info},
+				})
+		}
 	}
 
 	// produce sealedSegmentChangeInfos to query channel
@@ -1334,6 +1309,33 @@ func getShardNodes(collectionID UniqueID, meta Meta) map[string]map[UniqueID]str
 	return shardNodes
 }
 
+// addNode2Segment adds node into segment,
+// the old one within the same replica will be replaced
+func addNode2Segment(meta Meta, node UniqueID, replicas []*milvuspb.ReplicaInfo, segment *querypb.SegmentInfo) {
+	for _, oldNode := range segment.NodeIds {
+		isInReplica := false
+		for _, replica := range replicas {
+			if nodeIncluded(oldNode, replica.NodeIds) {
+				// new node is in the same replica, replace the old ones
+				if nodeIncluded(node, replica.NodeIds) {
+					break
+				}
+
+				// The old node is not the offline one
+				isInReplica = true
+				break
+			}
+		}
+
+		if !isInReplica {
+			segment.NodeIds = removeFromSlice(segment.NodeIds, oldNode)
+			break
+		}
+	}
+
+	segment.NodeIds = append(segment.NodeIds, node)
+}
+
 // getDataSegmentInfosByIDs return the SegmentInfo details according to the given ids through RPC to datacoord
 func (m *MetaReplica) getDataSegmentInfosByIDs(segmentIds []int64) ([]*datapb.SegmentInfo, error) {
 	var segmentInfos []*datapb.SegmentInfo
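addNode2Segment replaces an existing node assignment only when it sits in the same replica as the incoming node. The sketch below is a simplified, standalone take on that rule: unlike the real function, which also drops a node found in no replica at all, it only covers the common case, and it uses stand-in types and a local nodeIncluded helper.

package main

import "fmt"

type UniqueID = int64

type replicaInfo struct {
    ReplicaID UniqueID
    NodeIds   []UniqueID
}

type segmentInfo struct {
    SegmentID UniqueID
    NodeIds   []UniqueID
}

// nodeIncluded reports whether nodeID is present in nodes (stand-in for the real helper).
func nodeIncluded(nodeID UniqueID, nodes []UniqueID) bool {
    for _, n := range nodes {
        if n == nodeID {
            return true
        }
    }
    return false
}

// addNode2Segment (simplified): drop at most one old node that shares a replica
// with the incoming node, then append the incoming node.
func addNode2Segment(node UniqueID, replicas []*replicaInfo, segment *segmentInfo) {
    for i, oldNode := range segment.NodeIds {
        sameReplica := false
        for _, replica := range replicas {
            if nodeIncluded(oldNode, replica.NodeIds) && nodeIncluded(node, replica.NodeIds) {
                sameReplica = true
                break
            }
        }
        if sameReplica {
            segment.NodeIds = append(segment.NodeIds[:i], segment.NodeIds[i+1:]...)
            break
        }
    }
    segment.NodeIds = append(segment.NodeIds, node)
}

func main() {
    replicas := []*replicaInfo{
        {ReplicaID: 100, NodeIds: []UniqueID{1, 2}},
        {ReplicaID: 101, NodeIds: []UniqueID{3, 4}},
    }
    seg := &segmentInfo{SegmentID: 11, NodeIds: []UniqueID{1, 3}}

    addNode2Segment(2, replicas, seg) // node 2 shares replica 100 with node 1
    fmt.Println(seg.NodeIds)          // [3 2]: node 1 was replaced, node 3 kept
}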
@@ -281,7 +281,7 @@ func (qs *queryNodeServerMock) GetMetrics(ctx context.Context, req *milvuspb.Get
 
 	totalMemUsage := uint64(0)
 	for _, info := range qs.segmentInfos {
-		if info.NodeID == qs.queryNodeID {
+		if nodeIncluded(qs.queryNodeID, info.NodeIds) {
 			totalMemUsage += uint64(info.MemSize)
 		}
 	}
@@ -275,7 +275,7 @@ func TestSealedSegmentChangeAfterQueryNodeStop(t *testing.T) {
 		segmentInfos := queryCoord.meta.showSegmentInfos(defaultCollectionID, nil)
 		recoverDone := true
 		for _, info := range segmentInfos {
-			if info.NodeID != queryNode2.queryNodeID {
+			if !nodeIncluded(queryNode2.queryNodeID, info.NodeIds) {
 				recoverDone = false
 				break
 			}
@@ -2309,38 +2309,36 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
 		for replicaID := range replicas {
 			replicaID := replicaID
 			wg.Go(func() error {
-				return lbt.meta.applyReplicaBalancePlan(&balancePlan{
-					nodes:         lbt.SourceNodeIDs,
-					sourceReplica: replicaID,
-				})
+				return lbt.meta.applyReplicaBalancePlan(
+					NewRemoveBalancePlan(replicaID, lbt.SourceNodeIDs...))
 			})
 		}
 	}
 
 	// Remove offline nodes from segment
-	for _, segment := range segments {
-		segment := segment
-		wg.Go(func() error {
-			segment.NodeID = -1
-			segment.NodeIds = removeFromSlice(segment.NodeIds, lbt.SourceNodeIDs...)
+	// for _, segment := range segments {
+	// 	segment := segment
+	// 	wg.Go(func() error {
+	// 		segment.NodeID = -1
+	// 		segment.NodeIds = removeFromSlice(segment.NodeIds, lbt.SourceNodeIDs...)
 
-			err := lbt.meta.saveSegmentInfo(segment)
-			if err != nil {
-				log.Error("failed to remove offline nodes from segment info",
-					zap.Int64("segmentID", segment.SegmentID),
-					zap.Error(err))
+	// 		err := lbt.meta.saveSegmentInfo(segment)
+	// 		if err != nil {
+	// 			log.Error("failed to remove offline nodes from segment info",
+	// 				zap.Int64("segmentID", segment.SegmentID),
+	// 				zap.Error(err))
 
-				return err
-			}
+	// 			return err
+	// 		}
 
-			log.Info("remove offline nodes from segment",
-				zap.Int64("taskID", lbt.getTaskID()),
-				zap.Int64("segmentID", segment.GetSegmentID()),
-				zap.Int64s("nodeIds", segment.GetNodeIds()))
+	// 		log.Info("remove offline nodes from segment",
+	// 			zap.Int64("taskID", lbt.getTaskID()),
+	// 			zap.Int64("segmentID", segment.GetSegmentID()),
+	// 			zap.Int64s("nodeIds", segment.GetNodeIds()))
 
-			return nil
-		})
-	}
+	// 		return nil
+	// 	})
+	// }
 
 	// Remove offline nodes from dmChannels
 	for _, dmChannel := range dmChannels {
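The replica clean-up above launches one goroutine per replica through wg.Go and collects the first error. The diff does not show wg's declaration; assuming it behaves like golang.org/x/sync/errgroup.Group, the fan-out pattern looks roughly like this sketch, with applyReplicaBalancePlan replaced by a stub.

package main

import (
    "fmt"

    "golang.org/x/sync/errgroup"
)

type UniqueID = int64

// applyRemovePlan is a stub standing in for meta.applyReplicaBalancePlan(NewRemoveBalancePlan(...)).
func applyRemovePlan(replicaID UniqueID, nodes ...UniqueID) error {
    fmt.Printf("remove nodes %v from replica %d\n", nodes, replicaID)
    return nil
}

func main() {
    offlineNodes := []UniqueID{5}
    replicas := []UniqueID{100, 101}

    var wg errgroup.Group
    for _, replicaID := range replicas {
        replicaID := replicaID // capture the loop variable for the closure
        wg.Go(func() error {
            return applyRemovePlan(replicaID, offlineNodes...)
        })
    }
    if err := wg.Wait(); err != nil {
        fmt.Println("balance failed:", err)
    }
}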
@@ -659,14 +659,6 @@ func (scheduler *TaskScheduler) scheduleLoop() {
 			// if triggerCondition == NodeDown, loadSegment and watchDmchannel request will keep reschedule until the success
 			// the node info has been deleted after assgining child task to triggerTask
 			// so it is necessary to update the meta of segment and dmchannel, or some data may be lost in meta
-
-			// triggerTask may be LoadCollection, LoadPartitions, LoadBalance
-			if triggerTask.getResultInfo().ErrorCode == commonpb.ErrorCode_Success || triggerTask.getTriggerCondition() == querypb.TriggerCondition_NodeDown {
-				err = updateSegmentInfoFromTask(scheduler.ctx, triggerTask, scheduler.meta)
-				if err != nil {
-					triggerTask.setResultInfo(err)
-				}
-			}
 			resultInfo := triggerTask.getResultInfo()
 			if resultInfo.ErrorCode != commonpb.ErrorCode_Success {
 				if !alreadyNotify {
@@ -695,6 +687,14 @@ func (scheduler *TaskScheduler) scheduleLoop() {
 				}
 			}
 
+			// triggerTask may be LoadCollection, LoadPartitions, LoadBalance, Handoff
+			if triggerTask.getResultInfo().ErrorCode == commonpb.ErrorCode_Success || triggerTask.getTriggerCondition() == querypb.TriggerCondition_NodeDown {
+				err = updateSegmentInfoFromTask(scheduler.ctx, triggerTask, scheduler.meta)
+				if err != nil {
+					triggerTask.setResultInfo(err)
+				}
+			}
+
 			err = removeTaskFromKVFn(triggerTask)
 			if err != nil {
 				log.Error("scheduleLoop: error when remove trigger and internal tasks from etcd", zap.Int64("triggerTaskID", triggerTask.getTaskID()), zap.Error(err))
@@ -963,7 +963,7 @@ func updateSegmentInfoFromTask(ctx context.Context, triggerTask task, meta Meta)
 				if err != nil {
 					segment = &querypb.SegmentInfo{
 						SegmentID:    segmentID,
-						CollectionID: loadInfo.CollectionID,
+						CollectionID: collectionID,
 						PartitionID:  loadInfo.PartitionID,
 						DmChannel:    loadInfo.InsertChannel,
 						SegmentState: commonpb.SegmentState_Sealed,
@@ -975,10 +975,12 @@ func updateSegmentInfoFromTask(ctx context.Context, triggerTask task, meta Meta)
 					}
 				}
 				segment.ReplicaIds = append(segment.ReplicaIds, req.ReplicaID)
+				segment.ReplicaIds = removeFromSlice(segment.GetReplicaIds())
 
-				segment.NodeIds = append(segment.NodeIds, dstNodeID)
-				segment.NodeID = dstNodeID
-				segment.ReplicaIds = uniqueSlice(segment.GetReplicaIds())
+				replicas, err := meta.getReplicasByCollectionID(collectionID)
+				if err != nil {
+					return err
+				}
+				addNode2Segment(meta, dstNodeID, replicas, segment)
 
 				segments[segmentID] = segment
@@ -1171,7 +1171,7 @@ func TestLoadBalanceAndReschedulSegmentTaskAfterNodeDown(t *testing.T) {
 
 		segmentInfos := queryCoord.meta.getSegmentInfosByNode(node3.queryNodeID)
 		for _, segmentInfo := range segmentInfos {
-			if segmentInfo.NodeID == node3.queryNodeID {
+			if nodeIncluded(node3.queryNodeID, segmentInfo.NodeIds) {
 				break
 			}
 		}
@@ -209,6 +209,15 @@ func uniqueSlice(origin []UniqueID) []UniqueID {
 	return set.Collect()
 }
 
+// diffSlice returns a slice containing items in src but not in diff
+func diffSlice(src []UniqueID, diff ...UniqueID) []UniqueID {
+	set := make(typeutil.UniqueSet, len(src))
+	set.Insert(src...)
+	set.Remove(diff...)
+
+	return set.Collect()
+}
+
 func getReplicaAvailableMemory(cluster Cluster, replica *milvuspb.ReplicaInfo) uint64 {
 	availableMemory := uint64(0)
 	nodes := getNodeInfos(cluster, replica.NodeIds)
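diffSlice round-trips through typeutil.UniqueSet, so the result is deduplicated and its order is unspecified; the sibling removeFromSlice helper (not shown in this diff) presumably behaves the same way when called with no removal arguments, which would explain its use on ReplicaIds in the scheduler hunk above. A standalone sketch with a map-based stand-in for UniqueSet:

package main

import "fmt"

type UniqueID = int64

// uniqueSet is a simplified stand-in for typeutil.UniqueSet.
type uniqueSet map[UniqueID]struct{}

func (s uniqueSet) Insert(ids ...UniqueID) {
    for _, id := range ids {
        s[id] = struct{}{}
    }
}

func (s uniqueSet) Remove(ids ...UniqueID) {
    for _, id := range ids {
        delete(s, id)
    }
}

func (s uniqueSet) Collect() []UniqueID {
    out := make([]UniqueID, 0, len(s))
    for id := range s {
        out = append(out, id)
    }
    return out
}

// diffSlice returns the items in src that are not in diff, mirroring the helper above.
func diffSlice(src []UniqueID, diff ...UniqueID) []UniqueID {
    set := make(uniqueSet, len(src))
    set.Insert(src...)
    set.Remove(diff...)
    return set.Collect()
}

func main() {
    old := []UniqueID{1, 2, 3, 3} // nodes that used to serve a segment
    now := []UniqueID{2, 4}       // nodes serving it now

    // Order is unspecified because the helper goes through a set; duplicates in src collapse.
    fmt.Println(diffSlice(old, now...)) // e.g. [1 3]
}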