fix azure ListObjects (#27931)

Signed-off-by: PowderLi <min.li@zilliz.com>
pull/27992/head
PowderLi 2023-11-01 11:34:14 +08:00 committed by GitHub
parent 2aa330146a
commit 0252871d30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 188 additions and 72 deletions

View File

@ -47,7 +47,7 @@ jobs:
- name: Setup Go environment
uses: actions/setup-go@v2.2.0
with:
go-version: '1.20'
go-version: '~1.20.7'
- name: Mac Cache Go Mod Volumes
uses: actions/cache@v3
with:

View File

@ -2007,8 +2007,10 @@ func TestSearchTask_Requery(t *testing.T) {
err := qt.Requery()
assert.NoError(t, err)
assert.Len(t, qt.result.Results.FieldsData, 2)
assert.Equal(t, pkField, qt.result.Results.FieldsData[0].GetFieldName())
assert.Equal(t, vecField, qt.result.Results.FieldsData[1].GetFieldName())
for _, field := range qt.result.Results.FieldsData {
fieldName := field.GetFieldName()
assert.Contains(t, []string{pkField, vecField}, fieldName)
}
})
t.Run("Test no primary key", func(t *testing.T) {

View File

@ -117,21 +117,43 @@ func (AzureObjectStorage *AzureObjectStorage) StatObject(ctx context.Context, bu
return *info.ContentLength, nil
}
func (AzureObjectStorage *AzureObjectStorage) ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]time.Time, error) {
pager := AzureObjectStorage.Client.NewContainerClient(bucketName).NewListBlobsFlatPager(&azblob.ListBlobsFlatOptions{
Prefix: &prefix,
})
objects := map[string]time.Time{}
if pager.More() {
pageResp, err := pager.NextPage(context.Background())
if err != nil {
return nil, checkObjectStorageError(prefix, err)
func (AzureObjectStorage *AzureObjectStorage) ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []time.Time, error) {
var objectsKeys []string
var modTimes []time.Time
if recursive {
pager := AzureObjectStorage.Client.NewContainerClient(bucketName).NewListBlobsFlatPager(&azblob.ListBlobsFlatOptions{
Prefix: &prefix,
})
if pager.More() {
pageResp, err := pager.NextPage(context.Background())
if err != nil {
return []string{}, []time.Time{}, checkObjectStorageError(prefix, err)
}
for _, blob := range pageResp.Segment.BlobItems {
objectsKeys = append(objectsKeys, *blob.Name)
modTimes = append(modTimes, *blob.Properties.LastModified)
}
}
for _, blob := range pageResp.Segment.BlobItems {
objects[*blob.Name] = *blob.Properties.LastModified
} else {
pager := AzureObjectStorage.Client.NewContainerClient(bucketName).NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{
Prefix: &prefix,
})
if pager.More() {
pageResp, err := pager.NextPage(context.Background())
if err != nil {
return []string{}, []time.Time{}, checkObjectStorageError(prefix, err)
}
for _, blob := range pageResp.Segment.BlobItems {
objectsKeys = append(objectsKeys, *blob.Name)
modTimes = append(modTimes, *blob.Properties.LastModified)
}
for _, blob := range pageResp.Segment.BlobPrefixes {
objectsKeys = append(objectsKeys, *blob.Name)
modTimes = append(modTimes, time.Now())
}
}
}
return objects, nil
return objectsKeys, modTimes, nil
}
func (AzureObjectStorage *AzureObjectStorage) RemoveObject(ctx context.Context, bucketName, objectName string) error {

View File

@ -19,6 +19,7 @@ package storage
import (
"bytes"
"context"
"fmt"
"io"
"os"
"testing"
@ -127,10 +128,10 @@ func TestAzureObjectStorage(t *testing.T) {
for _, test := range loadWithPrefixTests {
t.Run(test.description, func(t *testing.T) {
gotk, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, false)
gotk, _, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, false)
assert.NoError(t, err)
assert.Equal(t, len(test.expectedValue), len(gotk))
for key := range gotk {
for _, key := range gotk {
err := testCM.RemoveObject(ctx, config.bucketName, key)
assert.NoError(t, err)
}
@ -138,6 +139,58 @@ func TestAzureObjectStorage(t *testing.T) {
}
})
t.Run("test list", func(t *testing.T) {
testCM, err := newAzureObjectStorageWithConfig(ctx, &config)
assert.Equal(t, err, nil)
defer testCM.DeleteContainer(ctx, config.bucketName, &azblob.DeleteContainerOptions{})
prepareTests := []struct {
valid bool
key string
value []byte
}{
{false, "abc/", []byte("123")},
{true, "abc/d", []byte("1234")},
{false, "abc/d/e", []byte("12345")},
{true, "abc/e/d", []byte("12354")},
{true, "key_/1/1", []byte("111")},
{true, "key_/1/2", []byte("222")},
{false, "key_/1/2/3", []byte("333")},
{true, "key_/2/3", []byte("333")},
}
for _, test := range prepareTests {
err := testCM.PutObject(ctx, config.bucketName, test.key, bytes.NewReader(test.value), int64(len(test.value)))
require.Nil(t, err)
if !test.valid {
err := testCM.RemoveObject(ctx, config.bucketName, test.key)
require.Nil(t, err)
}
}
insertWithPrefixTests := []struct {
recursive bool
prefix string
expectedValue []string
}{
{true, "abc/", []string{"abc/d", "abc/e/d"}},
{true, "key_/", []string{"key_/1/1", "key_/1/2", "key_/2/3"}},
{false, "abc/", []string{"abc/d", "abc/e/"}},
{false, "key_/", []string{"key_/1/", "key_/2/"}},
}
for _, test := range insertWithPrefixTests {
t.Run(fmt.Sprintf("prefix: %s, recursive: %t", test.prefix, test.recursive), func(t *testing.T) {
gotk, _, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, test.recursive)
assert.NoError(t, err)
assert.Equal(t, len(test.expectedValue), len(gotk))
for _, key := range gotk {
assert.Contains(t, test.expectedValue, key)
}
})
}
})
t.Run("test useIAM", func(t *testing.T) {
var err error
config.useIAM = true

View File

@ -17,6 +17,7 @@
package storage
import (
"container/list"
"context"
"fmt"
"io"
@ -170,20 +171,43 @@ func (minioObjectStorage *MinioObjectStorage) StatObject(ctx context.Context, bu
return info.Size, checkObjectStorageError(objectName, err)
}
func (minioObjectStorage *MinioObjectStorage) ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]time.Time, error) {
res := minioObjectStorage.Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{
Prefix: prefix,
Recursive: recursive,
})
func (minioObjectStorage *MinioObjectStorage) ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []time.Time, error) {
var objectsKeys []string
var modTimes []time.Time
tasks := list.New()
tasks.PushBack(prefix)
for tasks.Len() > 0 {
e := tasks.Front()
pre := e.Value.(string)
tasks.Remove(e)
objects := map[string]time.Time{}
for object := range res {
if !recursive && object.Err != nil {
return map[string]time.Time{}, object.Err
res := minioObjectStorage.Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{
Prefix: pre,
Recursive: false,
})
objects := map[string]time.Time{}
for object := range res {
if object.Err != nil {
log.Warn("failed to list with prefix", zap.String("bucket", bucketName), zap.String("prefix", prefix), zap.Error(object.Err))
return []string{}, []time.Time{}, object.Err
}
objects[object.Key] = object.LastModified
}
for object, lastModified := range objects {
// with tailing "/", object is a "directory"
if strings.HasSuffix(object, "/") && recursive {
// enqueue when recursive is true
if object != pre {
tasks.PushBack(object)
}
continue
}
objectsKeys = append(objectsKeys, object)
modTimes = append(modTimes, lastModified)
}
objects[object.Key] = object.LastModified
}
return objects, nil
return objectsKeys, modTimes, nil
}
func (minioObjectStorage *MinioObjectStorage) RemoveObject(ctx context.Context, bucketName, objectName string) error {

View File

@ -19,6 +19,7 @@ package storage
import (
"bytes"
"context"
"fmt"
"io"
"testing"
@ -131,10 +132,10 @@ func TestMinioObjectStorage(t *testing.T) {
for _, test := range loadWithPrefixTests {
t.Run(test.description, func(t *testing.T) {
gotk, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, false)
gotk, _, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, false)
assert.NoError(t, err)
assert.Equal(t, len(test.expectedValue), len(gotk))
for key := range gotk {
for _, key := range gotk {
err := testCM.RemoveObject(ctx, config.bucketName, key)
assert.NoError(t, err)
}
@ -142,6 +143,54 @@ func TestMinioObjectStorage(t *testing.T) {
}
})
t.Run("test list", func(t *testing.T) {
testCM, err := newMinioObjectStorageWithConfig(ctx, &config)
assert.Equal(t, err, nil)
defer testCM.RemoveBucket(ctx, config.bucketName)
prepareTests := []struct {
valid bool
key string
value []byte
}{
{false, "abc/", []byte("123")},
{true, "abc/d", []byte("1234")},
{false, "abc/d/e", []byte("12345")},
{true, "abc/e/d", []byte("12354")},
{true, "key_/1/1", []byte("111")},
{true, "key_/1/2", []byte("222")},
{false, "key_/1/2/3", []byte("333")},
{true, "key_/2/3", []byte("333")},
}
for _, test := range prepareTests {
err := testCM.PutObject(ctx, config.bucketName, test.key, bytes.NewReader(test.value), int64(len(test.value)))
require.Equal(t, test.valid, err == nil)
}
insertWithPrefixTests := []struct {
recursive bool
prefix string
expectedValue []string
}{
{true, "abc/", []string{"abc/d", "abc/e/d"}},
{true, "key_/", []string{"key_/1/1", "key_/1/2", "key_/2/3"}},
{false, "abc/", []string{"abc/d", "abc/e/"}},
{false, "key_/", []string{"key_/1/", "key_/2/"}},
}
for _, test := range insertWithPrefixTests {
t.Run(fmt.Sprintf("prefix: %s, recursive: %t", test.prefix, test.recursive), func(t *testing.T) {
gotk, _, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, test.recursive)
assert.NoError(t, err)
assert.Equal(t, len(test.expectedValue), len(gotk))
for _, key := range gotk {
assert.Contains(t, test.expectedValue, key)
}
})
}
})
t.Run("test useIAM", func(t *testing.T) {
var err error
config.useIAM = true

View File

@ -18,7 +18,6 @@ package storage
import (
"bytes"
"container/list"
"context"
"io"
"strings"
@ -50,7 +49,7 @@ type ObjectStorage interface {
GetObject(ctx context.Context, bucketName, objectName string, offset int64, size int64) (FileReader, error)
PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error
StatObject(ctx context.Context, bucketName, objectName string) (int64, error)
ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]time.Time, error)
ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []time.Time, error)
RemoveObject(ctx context.Context, bucketName, objectName string) error
}
@ -270,14 +269,10 @@ func (mcm *RemoteChunkManager) MultiRemove(ctx context.Context, keys []string) e
// RemoveWithPrefix removes all objects with the same prefix @prefix from minio.
func (mcm *RemoteChunkManager) RemoveWithPrefix(ctx context.Context, prefix string) error {
objects, err := mcm.listObjects(ctx, mcm.bucketName, prefix, true)
removeKeys, _, err := mcm.listObjects(ctx, mcm.bucketName, prefix, true)
if err != nil {
return err
}
removeKeys := make([]string, 0)
for key := range objects {
removeKeys = append(removeKeys, key)
}
i := 0
maxGoroutine := 10
for i < len(removeKeys) {
@ -312,38 +307,9 @@ func (mcm *RemoteChunkManager) ListWithPrefix(ctx context.Context, prefix string
// recursive = true may timeout during the recursive browsing the objects.
// See also: https://github.com/milvus-io/milvus/issues/19095
var objectsKeys []string
var modTimes []time.Time
tasks := list.New()
tasks.PushBack(prefix)
for tasks.Len() > 0 {
e := tasks.Front()
pre := e.Value.(string)
tasks.Remove(e)
// TODO add concurrent call if performance matters
// only return current level per call
objects, err := mcm.listObjects(ctx, mcm.bucketName, pre, false)
if err != nil {
return nil, nil, err
}
for object, lastModified := range objects {
// with tailing "/", object is a "directory"
if strings.HasSuffix(object, "/") && recursive {
// enqueue when recursive is true
if object != pre {
tasks.PushBack(object)
}
continue
}
objectsKeys = append(objectsKeys, object)
modTimes = append(modTimes, lastModified)
}
}
return objectsKeys, modTimes, nil
// TODO add concurrent call if performance matters
// only return current level per call
return mcm.listObjects(ctx, mcm.bucketName, prefix, recursive)
}
func (mcm *RemoteChunkManager) getObject(ctx context.Context, bucketName, objectName string,
@ -396,10 +362,10 @@ func (mcm *RemoteChunkManager) getObjectSize(ctx context.Context, bucketName, ob
return info, err
}
func (mcm *RemoteChunkManager) listObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]time.Time, error) {
func (mcm *RemoteChunkManager) listObjects(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []time.Time, error) {
start := timerecord.NewTimeRecorder("listObjects")
res, err := mcm.client.ListObjects(ctx, bucketName, prefix, recursive)
blobNames, lastModifiedTime, err := mcm.client.ListObjects(ctx, bucketName, prefix, recursive)
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataListLabel, metrics.TotalLabel).Inc()
if err == nil {
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataListLabel).
@ -409,7 +375,7 @@ func (mcm *RemoteChunkManager) listObjects(ctx context.Context, bucketName strin
log.Warn("failed to list with prefix", zap.String("bucket", mcm.bucketName), zap.String("prefix", prefix), zap.Error(err))
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataListLabel, metrics.FailLabel).Inc()
}
return res, err
return blobNames, lastModifiedTime, err
}
func (mcm *RemoteChunkManager) removeObject(ctx context.Context, bucketName, objectName string) error {