Fix bug for batch delete files on gcp of minio (#23052)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
hotfix-2.2.3
cai.zhang 2023-03-28 16:30:21 +08:00 committed by GitHub
parent 3484851511
commit c60addc877
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 29 additions and 4 deletions

View File

@ -176,6 +176,8 @@ func (gc *garbageCollector) recycleSegIndexesMeta() {
if meta.IsDeleted || gc.metaTable.IsIndexDeleted(meta.CollectionID, meta.IndexID) {
if meta.NodeID != 0 {
// wait for releasing reference lock
log.Ctx(gc.ctx).Warn("nodeID is not zero, wait for releasing reference lock",
zap.Int64("buildID", meta.BuildID), zap.Int64("segID", meta.SegmentID), zap.Int64("nodeID", meta.NodeID))
continue
}
if err := gc.metaTable.RemoveSegmentIndex(meta.CollectionID, meta.PartitionID, meta.SegmentID, meta.BuildID); err != nil {

View File

@ -26,6 +26,8 @@ import (
"strings"
"time"
"golang.org/x/sync/errgroup"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"go.uber.org/zap"
@ -353,10 +355,31 @@ func (mcm *MinioChunkManager) MultiRemove(ctx context.Context, keys []string) er
// RemoveWithPrefix removes all objects with the same prefix @prefix from minio.
func (mcm *MinioChunkManager) RemoveWithPrefix(ctx context.Context, prefix string) error {
objects := mcm.listMinioObjects(ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: prefix, Recursive: true})
for rErr := range mcm.removeMinioObjects(ctx, mcm.bucketName, objects, minio.RemoveObjectsOptions{GovernanceBypass: false}) {
if rErr.Err != nil {
log.Warn("failed to remove objects", zap.String("prefix", prefix), zap.Error(rErr.Err))
return rErr.Err
i := 0
maxGoroutine := 100
removeKeys := make([]string, 0, len(objects))
for object := range objects {
if object.Err != nil {
return object.Err
}
removeKeys = append(removeKeys, object.Key)
}
for i < len(removeKeys) {
runningGroup, groupCtx := errgroup.WithContext(ctx)
for j := 0; j < maxGoroutine && i < len(removeKeys); j++ {
key := removeKeys[i]
runningGroup.Go(func() error {
err := mcm.removeMinioObject(groupCtx, mcm.bucketName, key, minio.RemoveObjectOptions{})
if err != nil {
log.Warn("failed to remove object", zap.String("path", key), zap.Error(err))
return err
}
return nil
})
i++
}
if err := runningGroup.Wait(); err != nil {
return err
}
}
return nil