enhance: retry the read when S3 returns an unexpected EOF error (#30861)

/kind improvement
issue: #30877

Signed-off-by: SimFG <bang.fu@zilliz.com>
pull/30883/head
SimFG 2024-02-28 16:28:53 +08:00 committed by GitHub
parent a115b731ed
commit 229fc4f755
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 120 additions and 50 deletions

View File

@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
@ -166,38 +167,46 @@ func (mcm *MinioChunkManager) Exist(ctx context.Context, filePath string) (bool,
// Read reads the minio storage data if exists.
func (mcm *MinioChunkManager) Read(ctx context.Context, filePath string) ([]byte, error) {
start := time.Now()
object, err := mcm.getMinioObject(ctx, mcm.bucketName, filePath, minio.GetObjectOptions{})
if err != nil {
log.Warn("failed to get object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return nil, err
}
defer object.Close()
var data []byte
err := retry.Do(ctx, func() error {
start := time.Now()
object, err := mcm.getMinioObject(ctx, mcm.bucketName, filePath, minio.GetObjectOptions{})
if err != nil {
log.Warn("failed to get object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return err
}
defer object.Close()
// Prefetch object data
var empty []byte
_, err = object.Read(empty)
err = checkObjectStorageError(filePath, err)
// Prefetch object data
var empty []byte
_, err = object.Read(empty)
err = checkObjectStorageError(filePath, err)
if err != nil {
log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err))
return err
}
objectInfo, err := object.Stat()
err = checkObjectStorageError(filePath, err)
if err != nil {
log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return err
}
data, err = Read(object, objectInfo.Size)
err = checkObjectStorageError(filePath, err)
if err != nil {
log.Warn("failed to read object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return err
}
metrics.PersistentDataKvSize.WithLabelValues(metrics.DataGetLabel).Observe(float64(objectInfo.Size))
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataGetLabel).Observe(float64(time.Since(start).Milliseconds()))
return nil
}, retry.Attempts(3), retry.RetryErr(merr.IsRetryableErr))
if err != nil {
log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err))
return nil, err
}
objectInfo, err := object.Stat()
err = checkObjectStorageError(filePath, err)
if err != nil {
log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return nil, err
}
data, err := Read(object, objectInfo.Size)
err = checkObjectStorageError(filePath, err)
if err != nil {
log.Warn("failed to read object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return nil, err
}
metrics.PersistentDataKvSize.WithLabelValues(metrics.DataGetLabel).Observe(float64(objectInfo.Size))
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataGetLabel).Observe(float64(time.Since(start).Milliseconds()))
return data, nil
}

View File

@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
@ -161,33 +162,41 @@ func (mcm *RemoteChunkManager) Exist(ctx context.Context, filePath string) (bool
// Read reads the minio storage data if exists.
func (mcm *RemoteChunkManager) Read(ctx context.Context, filePath string) ([]byte, error) {
object, err := mcm.getObject(ctx, mcm.bucketName, filePath, int64(0), int64(0))
if err != nil {
log.Warn("failed to get object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return nil, err
}
defer object.Close()
var data []byte
err := retry.Do(ctx, func() error {
object, err := mcm.getObject(ctx, mcm.bucketName, filePath, int64(0), int64(0))
if err != nil {
log.Warn("failed to get object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return err
}
defer object.Close()
// Prefetch object data
var empty []byte
_, err = object.Read(empty)
err = checkObjectStorageError(filePath, err)
// Prefetch object data
var empty []byte
_, err = object.Read(empty)
err = checkObjectStorageError(filePath, err)
if err != nil {
log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err))
return err
}
size, err := mcm.getObjectSize(ctx, mcm.bucketName, filePath)
if err != nil {
log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return err
}
data, err = Read(object, size)
err = checkObjectStorageError(filePath, err)
if err != nil {
log.Warn("failed to read object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return err
}
metrics.PersistentDataKvSize.WithLabelValues(metrics.DataGetLabel).Observe(float64(size))
return nil
}, retry.Attempts(3), retry.RetryErr(merr.IsRetryableErr))
if err != nil {
log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err))
return nil, err
}
size, err := mcm.getObjectSize(ctx, mcm.bucketName, filePath)
if err != nil {
log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return nil, err
}
data, err := Read(object, size)
err = checkObjectStorageError(filePath, err)
if err != nil {
log.Warn("failed to read object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return nil, err
}
metrics.PersistentDataKvSize.WithLabelValues(metrics.DataGetLabel).Observe(float64(size))
return data, nil
}
@ -407,5 +416,8 @@ func checkObjectStorageError(fileName string, err error) error {
}
return merr.WrapErrIoFailed(fileName, err)
}
if err == io.ErrUnexpectedEOF {
return merr.WrapErrIoUnexpectEOF(fileName, err)
}
return merr.WrapErrIoFailed(fileName, err)
}

View File

@ -99,6 +99,7 @@ var (
// IO related
ErrIoKeyNotFound = newMilvusError("key not found", 1000, false)
ErrIoFailed = newMilvusError("IO failed", 1001, false)
ErrIoUnexpectEOF = newMilvusError("unexpected EOF", 1002, true)
// Parameter related
ErrParameterInvalid = newMilvusError("invalid parameter", 1100, false)

View File

@ -124,6 +124,7 @@ func (s *ErrSuite) TestWrap() {
// IO related
s.ErrorIs(WrapErrIoKeyNotFound("test_key", "failed to read"), ErrIoKeyNotFound)
s.ErrorIs(WrapErrIoFailed("test_key", os.ErrClosed), ErrIoFailed)
s.ErrorIs(WrapErrIoUnexpectEOF("test_key", os.ErrClosed), ErrIoUnexpectEOF)
// Parameter related
s.ErrorIs(WrapErrParameterInvalid(8, 1, "failed to create"), ErrParameterInvalid)

View File

@ -774,6 +774,13 @@ func WrapErrIoFailedReason(reason string, msg ...string) error {
return err
}
func WrapErrIoUnexpectEOF(key string, err error) error {
if err == nil {
return nil
}
return wrapFieldsWithDesc(ErrIoUnexpectEOF, err.Error(), value("key", key))
}
// Parameter related
func WrapErrParameterInvalid[T any](expected, actual T, msg ...string) error {
err := wrapFields(ErrParameterInvalid,

View File

@ -17,6 +17,7 @@ type config struct {
attempts uint
sleep time.Duration
maxSleepTime time.Duration
isRetryErr func(err error) bool
}
func newDefaultConfig() *config {
@ -59,3 +60,9 @@ func MaxSleepTime(maxSleepTime time.Duration) Option {
}
}
}
// RetryErr returns an Option that installs isRetryErr as the config's
// retry predicate.
//
// Do consults the predicate after a failed attempt: when it is set and
// reports false for the returned error, Do returns that error right away
// rather than going on to another attempt.
func RetryErr(isRetryErr func(err error) bool) Option {
	apply := func(cfg *config) {
		cfg.isRetryErr = isRetryErr
	}
	return apply
}

View File

@ -52,6 +52,9 @@ func Do(ctx context.Context, fn func() error, opts ...Option) error {
}
return err
}
if c.isRetryErr != nil && !c.isRetryErr(err) {
return err
}
deadline, ok := ctx.Deadline()
if ok && time.Until(deadline) < c.sleep {

View File

@ -152,3 +152,33 @@ func TestWrap(t *testing.T) {
assert.True(t, errors.Is(err2, merr.ErrSegmentNotFound))
assert.False(t, IsRecoverable(err2))
}
// TestRetryErrorParam verifies how the RetryErr option influences Do:
// an error rejected by the predicate ends the call after one attempt, while
// an error accepted by the predicate is retried up to the attempt limit.
func TestRetryErrorParam(t *testing.T) {
	ctx := context.Background()

	// Case 1: the predicate reports the error as not retryable, so the
	// callback must run exactly once.
	{
		errNoRetry := errors.New("mock not retry error")
		calls := 0
		rejectMock := func(err error) bool {
			return err != errNoRetry
		}
		work := func() error {
			calls++
			return errNoRetry
		}

		err := Do(ctx, work, RetryErr(rejectMock))
		assert.Error(t, err)
		assert.Equal(t, 1, calls)
	}

	// Case 2: the predicate reports the error as retryable, so the callback
	// must run for every one of the three configured attempts.
	{
		errRetry := errors.New("mock retry error")
		calls := 0
		acceptMock := func(err error) bool {
			return err == errRetry
		}
		work := func() error {
			calls++
			return errRetry
		}

		err := Do(ctx, work, Attempts(3), RetryErr(acceptMock))
		assert.Error(t, err)
		assert.Equal(t, 3, calls)
	}
}