Clean segments as releasing collection (#17932)

Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/18021/head
yah01 2022-07-01 19:16:25 +08:00 committed by GitHub
parent f33b090819
commit 27eca2881a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 54 additions and 16 deletions

View File

@ -182,6 +182,11 @@ func (m *MetaReplica) reloadFromKV() error {
if err := m.segmentsInfo.loadSegments(); err != nil {
return err
}
for id, segment := range m.segmentsInfo.segmentIDMap {
if _, ok := m.collectionInfos[segment.CollectionID]; !ok {
delete(m.segmentsInfo.segmentIDMap, id)
}
}
deltaChannelKeys, deltaChannelValues, err := m.getKvClient().LoadWithPrefix(deltaChannelMetaPrefix)
if err != nil {
@ -524,6 +529,14 @@ func (m *MetaReplica) releaseCollection(collectionID UniqueID) error {
m.replicas.Remove(collection.ReplicaIds...)
m.segmentsInfo.mu.Lock()
for id, segment := range m.segmentsInfo.segmentIDMap {
if segment.CollectionID == collectionID {
delete(m.segmentsInfo.segmentIDMap, id)
}
}
m.segmentsInfo.mu.Unlock()
return nil
}
@ -1182,6 +1195,9 @@ func removeCollectionMeta(collectionID UniqueID, replicas []UniqueID, kv kv.Meta
prefixes = append(prefixes, replicaPrefix)
}
prefixes = append(prefixes,
fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, collectionID))
return kv.MultiRemoveWithPrefix(prefixes)
}

View File

@ -319,7 +319,8 @@ func TestReloadMetaFromKV(t *testing.T) {
kvs[collectionKey] = string(collectionBlobs)
segmentInfo := &querypb.SegmentInfo{
SegmentID: defaultSegmentID,
SegmentID: defaultSegmentID,
CollectionID: defaultCollectionID,
}
segmentBlobs, err := proto.Marshal(segmentInfo)
assert.Nil(t, err)

View File

@ -1913,7 +1913,14 @@ func (lbt *loadBalanceTask) processNodeDownLoadBalance(ctx context.Context) erro
recoveredCollectionIDs.Insert(watchInfo.CollectionID)
}
log.Debug("loadBalanceTask: ready to process segments and channels on offline node",
zap.Int64("taskID", lbt.getTaskID()),
zap.Int("segmentNum", len(segments)),
zap.Int("dmChannelNum", len(dmChannels)))
if len(segments) == 0 && len(dmChannels) == 0 {
log.Debug("loadBalanceTask: no segment/channel on this offline node, skip it",
zap.Int64("taskID", lbt.getTaskID()))
continue
}
@ -1922,7 +1929,7 @@ func (lbt *loadBalanceTask) processNodeDownLoadBalance(ctx context.Context) erro
watchDmChannelReqs := make([]*querypb.WatchDmChannelsRequest, 0)
collectionInfo, err := lbt.meta.getCollectionInfoByID(collectionID)
if err != nil {
log.Error("loadBalanceTask: get collectionInfo from meta failed", zap.Int64("collectionID", collectionID), zap.Error(err))
log.Error("loadBalanceTask: get collectionInfo from meta failed", zap.Int64("taskID", lbt.getTaskID()), zap.Int64("collectionID", collectionID), zap.Error(err))
lbt.setResultInfo(err)
return err
}
@ -1934,25 +1941,25 @@ func (lbt *loadBalanceTask) processNodeDownLoadBalance(ctx context.Context) erro
if collectionInfo.LoadType == querypb.LoadType_LoadCollection {
toRecoverPartitionIDs, err = lbt.broker.showPartitionIDs(ctx, collectionID)
if err != nil {
log.Error("loadBalanceTask: show collection's partitionIDs failed", zap.Int64("collectionID", collectionID), zap.Error(err))
log.Error("loadBalanceTask: show collection's partitionIDs failed", zap.Int64("taskID", lbt.getTaskID()), zap.Int64("collectionID", collectionID), zap.Error(err))
lbt.setResultInfo(err)
panic(err)
}
} else {
toRecoverPartitionIDs = collectionInfo.PartitionIDs
}
log.Info("loadBalanceTask: get collection's all partitionIDs", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", toRecoverPartitionIDs))
log.Info("loadBalanceTask: get collection's all partitionIDs", zap.Int64("taskID", lbt.getTaskID()), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", toRecoverPartitionIDs))
replica, err := lbt.getReplica(nodeID, collectionID)
if err != nil {
// getReplica maybe failed, it will cause the balanceTask execute infinitely
log.Warn("loadBalanceTask: get replica failed", zap.Int64("collectionID", collectionID), zap.Int64("nodeId", nodeID))
log.Warn("loadBalanceTask: get replica failed", zap.Int64("taskID", lbt.getTaskID()), zap.Int64("collectionID", collectionID), zap.Int64("nodeId", nodeID))
continue
}
for _, partitionID := range toRecoverPartitionIDs {
vChannelInfos, binlogs, err := lbt.broker.getRecoveryInfo(lbt.ctx, collectionID, partitionID)
if err != nil {
log.Error("loadBalanceTask: getRecoveryInfo failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err))
log.Error("loadBalanceTask: getRecoveryInfo failed", zap.Int64("taskID", lbt.getTaskID()), zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err))
lbt.setResultInfo(err)
panic(err)
}
@ -1986,7 +1993,7 @@ func (lbt *loadBalanceTask) processNodeDownLoadBalance(ctx context.Context) erro
for _, info := range vChannelInfos {
deltaChannel, err := generateWatchDeltaChannelInfo(info)
if err != nil {
log.Error("loadBalanceTask: generateWatchDeltaChannelInfo failed", zap.Int64("collectionID", collectionID), zap.String("channelName", info.ChannelName), zap.Error(err))
log.Error("loadBalanceTask: generateWatchDeltaChannelInfo failed", zap.Int64("taskID", lbt.getTaskID()), zap.Int64("collectionID", collectionID), zap.String("channelName", info.ChannelName), zap.Error(err))
lbt.setResultInfo(err)
panic(err)
}
@ -1999,7 +2006,7 @@ func (lbt *loadBalanceTask) processNodeDownLoadBalance(ctx context.Context) erro
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
err = lbt.meta.setDeltaChannel(collectionID, mergedDeltaChannel)
if err != nil {
log.Error("loadBalanceTask: set delta channel info meta failed", zap.Int64("collectionID", collectionID), zap.Error(err))
log.Error("loadBalanceTask: set delta channel info meta failed", zap.Int64("taskID", lbt.getTaskID()), zap.Int64("collectionID", collectionID), zap.Error(err))
lbt.setResultInfo(err)
panic(err)
}
@ -2038,7 +2045,7 @@ func (lbt *loadBalanceTask) processNodeDownLoadBalance(ctx context.Context) erro
tasks, err := assignInternalTask(ctx, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, watchDmChannelReqs, false, lbt.SourceNodeIDs, lbt.DstNodeIDs, replica.GetReplicaID(), lbt.broker)
if err != nil {
log.Error("loadBalanceTask: assign child task failed", zap.Int64("sourceNodeID", nodeID))
log.Error("loadBalanceTask: assign child task failed", zap.Int64("taskID", lbt.getTaskID()), zap.Int64("sourceNodeID", nodeID))
lbt.setResultInfo(err)
return err
}
@ -2047,9 +2054,9 @@ func (lbt *loadBalanceTask) processNodeDownLoadBalance(ctx context.Context) erro
}
for _, internalTask := range internalTasks {
lbt.addChildTask(internalTask)
log.Info("loadBalanceTask: add a childTask", zap.String("task type", internalTask.msgType().String()), zap.Any("task", internalTask))
log.Info("loadBalanceTask: add a childTask", zap.Int64("taskID", lbt.getTaskID()), zap.String("task type", internalTask.msgType().String()), zap.Any("task", internalTask))
}
log.Info("loadBalanceTask: assign child task done", zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs))
log.Info("loadBalanceTask: assign child task done", zap.Int64("taskID", lbt.getTaskID()), zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs))
return nil
}
@ -2250,7 +2257,8 @@ func (lbt *loadBalanceTask) postExecute(context.Context) error {
zap.Int32("trigger type", int32(lbt.triggerCondition)),
zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs),
zap.Any("balanceReason", lbt.BalanceReason),
zap.Int64("taskID", lbt.getTaskID()))
zap.Int64("taskID", lbt.getTaskID()),
zap.Int("childTaskNum", len(lbt.childTasks)))
return nil
}
@ -2285,16 +2293,20 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
}
log.Debug("removing offline nodes from replicas and segments...",
zap.Int("replicaNum", len(replicas)),
zap.Int("segmentNum", len(segments)),
zap.Int64("triggerTaskID", lbt.getTaskID()),
)
zap.Int("replicaNum", len(replicas)),
zap.Int("segmentNum", len(segments)))
wg := errgroup.Group{}
// Remove offline nodes from replica
for replicaID := range replicas {
replicaID := replicaID
wg.Go(func() error {
log.Debug("remove offline nodes from replica",
zap.Int64("taskID", lbt.taskID),
zap.Int64("replicaID", replicaID),
zap.Int64s("offlineNodes", lbt.SourceNodeIDs))
return lbt.meta.applyReplicaBalancePlan(
NewRemoveBalancePlan(replicaID, lbt.SourceNodeIDs...))
})

View File

@ -136,6 +136,9 @@ func syncReplicaSegments(ctx context.Context, meta Meta, cluster Cluster, replic
for _, shard := range replica.ShardReplicas {
if len(shards) > 0 && !isInShards(shard.DmChannelName, shards) {
log.Debug("skip this shard",
zap.Int64("replicaID", replicaID),
zap.String("shard", shard.DmChannelName))
continue
}
@ -155,6 +158,12 @@ func syncReplicaSegments(ctx context.Context, meta Meta, cluster Cluster, replic
for i, j := 0, 0; i < len(segments); i = j {
node := getNodeInReplica(replica, segments[i].NodeIds)
if node < 0 {
log.Warn("syncReplicaSegments: no segment node in replica",
zap.Int64("replicaID", replicaID),
zap.Any("segment", segments[i]))
}
partition := segments[i].PartitionID
j++
@ -207,7 +216,7 @@ func getNodeInReplica(replica *milvuspb.ReplicaInfo, nodes []UniqueID) UniqueID
}
}
return 0
return -1
}
func removeFromSlice(origin []UniqueID, del ...UniqueID) []UniqueID {