fix: [cherry-pick] azure ListObjects (#27931) (#28913)

issue: #27932
master pr: #27931
2.3 pr: #28894

Signed-off-by: PowderLi <min.li@zilliz.com>
pull/28936/head
PowderLi 2023-12-03 19:00:32 +08:00 committed by GitHub
parent 0b6235a1f5
commit 628df9741c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 207 additions and 71 deletions

View File

@ -2021,8 +2021,10 @@ func TestSearchTask_Requery(t *testing.T) {
err := qt.Requery()
assert.NoError(t, err)
assert.Len(t, qt.result.Results.FieldsData, 2)
assert.Equal(t, pkField, qt.result.Results.FieldsData[0].GetFieldName())
assert.Equal(t, vecField, qt.result.Results.FieldsData[1].GetFieldName())
for _, field := range qt.result.Results.FieldsData {
fieldName := field.GetFieldName()
assert.Contains(t, []string{pkField, vecField}, fieldName)
}
})
t.Run("Test no primary key", func(t *testing.T) {

View File

@ -117,21 +117,43 @@ func (AzureObjectStorage *AzureObjectStorage) StatObject(ctx context.Context, bu
return *info.ContentLength, nil
}
func (AzureObjectStorage *AzureObjectStorage) ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]time.Time, error) {
pager := AzureObjectStorage.Client.NewContainerClient(bucketName).NewListBlobsFlatPager(&azblob.ListBlobsFlatOptions{
Prefix: &prefix,
})
objects := map[string]time.Time{}
if pager.More() {
pageResp, err := pager.NextPage(context.Background())
if err != nil {
return nil, err
func (AzureObjectStorage *AzureObjectStorage) ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []time.Time, error) {
var objectsKeys []string
var modTimes []time.Time
if recursive {
pager := AzureObjectStorage.Client.NewContainerClient(bucketName).NewListBlobsFlatPager(&azblob.ListBlobsFlatOptions{
Prefix: &prefix,
})
if pager.More() {
pageResp, err := pager.NextPage(context.Background())
if err != nil {
return []string{}, []time.Time{}, checkObjectStorageError(prefix, err)
}
for _, blob := range pageResp.Segment.BlobItems {
objectsKeys = append(objectsKeys, *blob.Name)
modTimes = append(modTimes, *blob.Properties.LastModified)
}
}
for _, blob := range pageResp.Segment.BlobItems {
objects[*blob.Name] = *blob.Properties.LastModified
} else {
pager := AzureObjectStorage.Client.NewContainerClient(bucketName).NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{
Prefix: &prefix,
})
if pager.More() {
pageResp, err := pager.NextPage(context.Background())
if err != nil {
return []string{}, []time.Time{}, checkObjectStorageError(prefix, err)
}
for _, blob := range pageResp.Segment.BlobItems {
objectsKeys = append(objectsKeys, *blob.Name)
modTimes = append(modTimes, *blob.Properties.LastModified)
}
for _, blob := range pageResp.Segment.BlobPrefixes {
objectsKeys = append(objectsKeys, *blob.Name)
modTimes = append(modTimes, time.Now())
}
}
}
return objects, nil
return objectsKeys, modTimes, nil
}
func (AzureObjectStorage *AzureObjectStorage) RemoveObject(ctx context.Context, bucketName, objectName string) error {

View File

@ -19,6 +19,7 @@ package storage
import (
"bytes"
"context"
"fmt"
"io"
"os"
"testing"
@ -127,10 +128,10 @@ func TestAzureObjectStorage(t *testing.T) {
for _, test := range loadWithPrefixTests {
t.Run(test.description, func(t *testing.T) {
gotk, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, false)
gotk, _, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, false)
assert.NoError(t, err)
assert.Equal(t, len(test.expectedValue), len(gotk))
for key := range gotk {
for _, key := range gotk {
err := testCM.RemoveObject(ctx, config.bucketName, key)
assert.NoError(t, err)
}
@ -138,6 +139,58 @@ func TestAzureObjectStorage(t *testing.T) {
}
})
t.Run("test list", func(t *testing.T) {
testCM, err := newAzureObjectStorageWithConfig(ctx, &config)
assert.Equal(t, err, nil)
defer testCM.DeleteContainer(ctx, config.bucketName, &azblob.DeleteContainerOptions{})
prepareTests := []struct {
valid bool
key string
value []byte
}{
{false, "abc/", []byte("123")},
{true, "abc/d", []byte("1234")},
{false, "abc/d/e", []byte("12345")},
{true, "abc/e/d", []byte("12354")},
{true, "key_/1/1", []byte("111")},
{true, "key_/1/2", []byte("222")},
{false, "key_/1/2/3", []byte("333")},
{true, "key_/2/3", []byte("333")},
}
for _, test := range prepareTests {
err := testCM.PutObject(ctx, config.bucketName, test.key, bytes.NewReader(test.value), int64(len(test.value)))
require.Nil(t, err)
if !test.valid {
err := testCM.RemoveObject(ctx, config.bucketName, test.key)
require.Nil(t, err)
}
}
insertWithPrefixTests := []struct {
recursive bool
prefix string
expectedValue []string
}{
{true, "abc/", []string{"abc/d", "abc/e/d"}},
{true, "key_/", []string{"key_/1/1", "key_/1/2", "key_/2/3"}},
{false, "abc/", []string{"abc/d", "abc/e/"}},
{false, "key_/", []string{"key_/1/", "key_/2/"}},
}
for _, test := range insertWithPrefixTests {
t.Run(fmt.Sprintf("prefix: %s, recursive: %t", test.prefix, test.recursive), func(t *testing.T) {
gotk, _, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, test.recursive)
assert.NoError(t, err)
assert.Equal(t, len(test.expectedValue), len(gotk))
for _, key := range gotk {
assert.Contains(t, test.expectedValue, key)
}
})
}
})
t.Run("test useIAM", func(t *testing.T) {
var err error
config.useIAM = true

View File

@ -17,6 +17,7 @@
package storage
import (
"container/list"
"context"
"fmt"
"io"
@ -170,20 +171,43 @@ func (minioObjectStorage *MinioObjectStorage) StatObject(ctx context.Context, bu
return info.Size, err
}
func (minioObjectStorage *MinioObjectStorage) ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]time.Time, error) {
res := minioObjectStorage.Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{
Prefix: prefix,
Recursive: recursive,
})
func (minioObjectStorage *MinioObjectStorage) ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []time.Time, error) {
var objectsKeys []string
var modTimes []time.Time
tasks := list.New()
tasks.PushBack(prefix)
for tasks.Len() > 0 {
e := tasks.Front()
pre := e.Value.(string)
tasks.Remove(e)
objects := map[string]time.Time{}
for object := range res {
if !recursive && object.Err != nil {
return map[string]time.Time{}, object.Err
res := minioObjectStorage.Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{
Prefix: pre,
Recursive: false,
})
objects := map[string]time.Time{}
for object := range res {
if object.Err != nil {
log.Warn("failed to list with prefix", zap.String("bucket", bucketName), zap.String("prefix", prefix), zap.Error(object.Err))
return []string{}, []time.Time{}, object.Err
}
objects[object.Key] = object.LastModified
}
for object, lastModified := range objects {
// with tailing "/", object is a "directory"
if strings.HasSuffix(object, "/") && recursive {
// enqueue when recursive is true
if object != pre {
tasks.PushBack(object)
}
continue
}
objectsKeys = append(objectsKeys, object)
modTimes = append(modTimes, lastModified)
}
objects[object.Key] = object.LastModified
}
return objects, nil
return objectsKeys, modTimes, nil
}
func (minioObjectStorage *MinioObjectStorage) RemoveObject(ctx context.Context, bucketName, objectName string) error {

View File

@ -19,6 +19,7 @@ package storage
import (
"bytes"
"context"
"fmt"
"io"
"testing"
@ -131,10 +132,10 @@ func TestMinioObjectStorage(t *testing.T) {
for _, test := range loadWithPrefixTests {
t.Run(test.description, func(t *testing.T) {
gotk, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, false)
gotk, _, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, false)
assert.NoError(t, err)
assert.Equal(t, len(test.expectedValue), len(gotk))
for key := range gotk {
for _, key := range gotk {
err := testCM.RemoveObject(ctx, config.bucketName, key)
assert.NoError(t, err)
}
@ -142,6 +143,54 @@ func TestMinioObjectStorage(t *testing.T) {
}
})
t.Run("test list", func(t *testing.T) {
testCM, err := newMinioObjectStorageWithConfig(ctx, &config)
assert.Equal(t, err, nil)
defer testCM.RemoveBucket(ctx, config.bucketName)
prepareTests := []struct {
valid bool
key string
value []byte
}{
{false, "abc/", []byte("123")},
{true, "abc/d", []byte("1234")},
{false, "abc/d/e", []byte("12345")},
{true, "abc/e/d", []byte("12354")},
{true, "key_/1/1", []byte("111")},
{true, "key_/1/2", []byte("222")},
{false, "key_/1/2/3", []byte("333")},
{true, "key_/2/3", []byte("333")},
}
for _, test := range prepareTests {
err := testCM.PutObject(ctx, config.bucketName, test.key, bytes.NewReader(test.value), int64(len(test.value)))
require.Equal(t, test.valid, err == nil)
}
insertWithPrefixTests := []struct {
recursive bool
prefix string
expectedValue []string
}{
{true, "abc/", []string{"abc/d", "abc/e/d"}},
{true, "key_/", []string{"key_/1/1", "key_/1/2", "key_/2/3"}},
{false, "abc/", []string{"abc/d", "abc/e/"}},
{false, "key_/", []string{"key_/1/", "key_/2/"}},
}
for _, test := range insertWithPrefixTests {
t.Run(fmt.Sprintf("prefix: %s, recursive: %t", test.prefix, test.recursive), func(t *testing.T) {
gotk, _, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, test.recursive)
assert.NoError(t, err)
assert.Equal(t, len(test.expectedValue), len(gotk))
for _, key := range gotk {
assert.Contains(t, test.expectedValue, key)
}
})
}
})
t.Run("test useIAM", func(t *testing.T) {
var err error
config.useIAM = true

View File

@ -18,7 +18,6 @@ package storage
import (
"bytes"
"container/list"
"context"
"io"
"strings"
@ -50,7 +49,7 @@ type ObjectStorage interface {
GetObject(ctx context.Context, bucketName, objectName string, offset int64, size int64) (FileReader, error)
PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error
StatObject(ctx context.Context, bucketName, objectName string) (int64, error)
ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]time.Time, error)
ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []time.Time, error)
RemoveObject(ctx context.Context, bucketName, objectName string) error
}
@ -279,14 +278,10 @@ func (mcm *RemoteChunkManager) MultiRemove(ctx context.Context, keys []string) e
// RemoveWithPrefix removes all objects with the same prefix @prefix from minio.
func (mcm *RemoteChunkManager) RemoveWithPrefix(ctx context.Context, prefix string) error {
objects, err := mcm.listObjects(ctx, mcm.bucketName, prefix, true)
removeKeys, _, err := mcm.listObjects(ctx, mcm.bucketName, prefix, true)
if err != nil {
return err
}
removeKeys := make([]string, 0)
for key := range objects {
removeKeys = append(removeKeys, key)
}
i := 0
maxGoroutine := 10
for i < len(removeKeys) {
@ -321,38 +316,9 @@ func (mcm *RemoteChunkManager) ListWithPrefix(ctx context.Context, prefix string
// recursive = true may timeout during the recursive browsing the objects.
// See also: https://github.com/milvus-io/milvus/issues/19095
var objectsKeys []string
var modTimes []time.Time
tasks := list.New()
tasks.PushBack(prefix)
for tasks.Len() > 0 {
e := tasks.Front()
pre := e.Value.(string)
tasks.Remove(e)
// TODO add concurrent call if performance matters
// only return current level per call
objects, err := mcm.listObjects(ctx, mcm.bucketName, pre, false)
if err != nil {
return nil, nil, err
}
for object, lastModified := range objects {
// with tailing "/", object is a "directory"
if strings.HasSuffix(object, "/") && recursive {
// enqueue when recursive is true
if object != pre {
tasks.PushBack(object)
}
continue
}
objectsKeys = append(objectsKeys, object)
modTimes = append(modTimes, lastModified)
}
}
return objectsKeys, modTimes, nil
// TODO add concurrent call if performance matters
// only return current level per call
return mcm.listObjects(ctx, mcm.bucketName, prefix, recursive)
}
func (mcm *RemoteChunkManager) getObject(ctx context.Context, bucketName, objectName string,
@ -424,10 +390,10 @@ func (mcm *RemoteChunkManager) getObjectSize(ctx context.Context, bucketName, ob
return info, err
}
func (mcm *RemoteChunkManager) listObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]time.Time, error) {
func (mcm *RemoteChunkManager) listObjects(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []time.Time, error) {
start := timerecord.NewTimeRecorder("listObjects")
res, err := mcm.client.ListObjects(ctx, bucketName, prefix, recursive)
blobNames, lastModifiedTime, err := mcm.client.ListObjects(ctx, bucketName, prefix, recursive)
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataListLabel, metrics.TotalLabel).Inc()
if err == nil {
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataListLabel).Observe(float64(start.ElapseSpan().Milliseconds()))
@ -436,7 +402,7 @@ func (mcm *RemoteChunkManager) listObjects(ctx context.Context, bucketName strin
log.Warn("failed to list with prefix", zap.String("bucket", mcm.bucketName), zap.String("prefix", prefix), zap.Error(err))
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataListLabel, metrics.FailLabel).Inc()
}
return res, err
return blobNames, lastModifiedTime, err
}
func (mcm *RemoteChunkManager) removeObject(ctx context.Context, bucketName, objectName string) error {
@ -453,3 +419,23 @@ func (mcm *RemoteChunkManager) removeObject(ctx context.Context, bucketName, obj
return err
}
func checkObjectStorageError(fileName string, err error) error {
if err == nil {
return nil
}
switch err := err.(type) {
case *azcore.ResponseError:
if err.ErrorCode == string(bloberror.BlobNotFound) {
return merr.WrapErrIoKeyNotFound(fileName, err.Error())
}
return merr.WrapErrIoFailed(fileName, err.Error())
case minio.ErrorResponse:
if err.Code == "NoSuchKey" {
return merr.WrapErrIoKeyNotFound(fileName, err.Error())
}
return merr.WrapErrIoFailed(fileName, err.Error())
}
return merr.WrapErrIoFailed(fileName, err.Error())
}