mirror of https://github.com/milvus-io/milvus.git
Clear useless segments info. (#12262)
Signed-off-by: godchen <qingxiang.chen@zilliz.com>pull/12274/head
parent
eb48013042
commit
88012acb86
|
@ -408,12 +408,10 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error {
|
|||
|
||||
if len(watchDeltaChannels) != len(recoveryInfo.Channels) {
|
||||
for _, info := range recoveryInfo.Channels {
|
||||
deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix)
|
||||
deltaChannel, err := generateWatchDeltaChannelInfo(info)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
deltaChannel := proto.Clone(info).(*datapb.VchannelInfo)
|
||||
deltaChannel.ChannelName = deltaChannelName
|
||||
watchDeltaChannels = append(watchDeltaChannels, deltaChannel)
|
||||
}
|
||||
}
|
||||
|
@ -775,12 +773,10 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error {
|
|||
|
||||
if len(watchDeltaChannels) != len(recoveryInfo.Channels) {
|
||||
for _, info := range recoveryInfo.Channels {
|
||||
deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix)
|
||||
deltaChannel, err := generateWatchDeltaChannelInfo(info)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
deltaChannel := proto.Clone(info).(*datapb.VchannelInfo)
|
||||
deltaChannel.ChannelName = deltaChannelName
|
||||
watchDeltaChannels = append(watchDeltaChannels, deltaChannel)
|
||||
}
|
||||
}
|
||||
|
@ -1578,12 +1574,10 @@ func (ht *handoffTask) execute(ctx context.Context) error {
|
|||
}
|
||||
if len(watchDeltaChannels) != len(recoveryInfo.Channels) {
|
||||
for _, info := range recoveryInfo.Channels {
|
||||
deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix)
|
||||
deltaChannel, err := generateWatchDeltaChannelInfo(info)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
deltaChannel := proto.Clone(info).(*datapb.VchannelInfo)
|
||||
deltaChannel.ChannelName = deltaChannelName
|
||||
watchDeltaChannels = append(watchDeltaChannels, deltaChannel)
|
||||
}
|
||||
}
|
||||
|
@ -1770,12 +1764,10 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
|
|||
|
||||
if len(watchDeltaChannels) != len(recoveryInfo.Channels) {
|
||||
for _, info := range recoveryInfo.Channels {
|
||||
deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix)
|
||||
deltaChannel, err := generateWatchDeltaChannelInfo(info)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
deltaChannel := proto.Clone(info).(*datapb.VchannelInfo)
|
||||
deltaChannel.ChannelName = deltaChannelName
|
||||
watchDeltaChannels = append(watchDeltaChannels, deltaChannel)
|
||||
}
|
||||
}
|
||||
|
@ -1975,12 +1967,10 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
|
|||
|
||||
if len(watchDeltaChannels) != len(recoveryInfo.Channels) {
|
||||
for _, info := range recoveryInfo.Channels {
|
||||
deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix)
|
||||
deltaChannel, err := generateWatchDeltaChannelInfo(info)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
deltaChannel := proto.Clone(info).(*datapb.VchannelInfo)
|
||||
deltaChannel.ChannelName = deltaChannelName
|
||||
watchDeltaChannels = append(watchDeltaChannels, deltaChannel)
|
||||
}
|
||||
}
|
||||
|
@ -2213,3 +2203,16 @@ func assignInternalTask(ctx context.Context,
|
|||
func getSizeOfLoadSegmentReq(req *querypb.LoadSegmentsRequest) int {
|
||||
return proto.Size(req)
|
||||
}
|
||||
|
||||
func generateWatchDeltaChannelInfo(info *datapb.VchannelInfo) (*datapb.VchannelInfo, error) {
|
||||
deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
deltaChannel := proto.Clone(info).(*datapb.VchannelInfo)
|
||||
deltaChannel.ChannelName = deltaChannelName
|
||||
deltaChannel.UnflushedSegments = nil
|
||||
deltaChannel.FlushedSegments = nil
|
||||
deltaChannel.DroppedSegments = nil
|
||||
return deltaChannel, nil
|
||||
}
|
||||
|
|
|
@ -853,9 +853,10 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("start release history pids", zap.Any("pids", pids))
|
||||
log.Debug("start release history pids", zap.Any("pids", pids), zap.Any("load type", hCol.getLoadType()))
|
||||
if len(pids) == 0 && hCol.getLoadType() == loadTypePartition {
|
||||
r.node.dataSyncService.removeCollectionDeltaFlowGraph(r.req.CollectionID)
|
||||
log.Debug("release delta channels", zap.Any("deltaChannels", hCol.getVDeltaChannels()))
|
||||
vChannels := hCol.getVDeltaChannels()
|
||||
for _, channel := range vChannels {
|
||||
log.Debug("Releasing tSafe in releasePartitionTask...",
|
||||
|
|
Loading…
Reference in New Issue