mirror of https://github.com/milvus-io/milvus.git
parent
38aa53f523
commit
9658367a3c
|
@ -115,6 +115,9 @@ linters-settings:
|
|||
- 'Reason:\s+\w+\.Error\(\)'
|
||||
- 'errors.New\((.+)\.GetReason\(\)\)'
|
||||
- 'commonpb\.Status\{[\s\n]*ErrorCode:[\s\n]*.+[\s\S\n]*?\}'
|
||||
- 'os\.Open\(.+\)'
|
||||
- 'os\.ReadFile\(.+\)'
|
||||
- 'os\.WriteFile\(.+\)'
|
||||
#- 'fmt\.Print.*' WIP
|
||||
|
||||
issues:
|
||||
|
|
|
@ -3,7 +3,6 @@ package backend
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
@ -22,6 +21,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
|
@ -432,7 +432,7 @@ func (b etcd210) Backup(meta *meta.Meta, backupFile string) error {
|
|||
return err
|
||||
}
|
||||
console.Warning(fmt.Sprintf("backup to: %s", backupFile))
|
||||
return os.WriteFile(backupFile, backup, 0o600)
|
||||
return storage.WriteFile(backupFile, backup, 0o600)
|
||||
}
|
||||
|
||||
func (b etcd210) BackupV2(file string) error {
|
||||
|
@ -487,11 +487,11 @@ func (b etcd210) BackupV2(file string) error {
|
|||
}
|
||||
|
||||
console.Warning(fmt.Sprintf("backup to: %s", file))
|
||||
return os.WriteFile(file, backup, 0o600)
|
||||
return storage.WriteFile(file, backup, 0o600)
|
||||
}
|
||||
|
||||
func (b etcd210) Restore(backupFile string) error {
|
||||
backup, err := os.ReadFile(backupFile)
|
||||
backup, err := storage.ReadFile(backupFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -368,7 +368,7 @@ func (c *ChannelMeta) retryableLoadError(err error) bool {
|
|||
case errors.Is(err, merr.ErrParameterInvalid):
|
||||
// statslog corrupted
|
||||
return false
|
||||
case errors.Is(err, storage.ErrNoSuchKey):
|
||||
case errors.Is(err, merr.ErrIoKeyNotFound):
|
||||
// statslog not found
|
||||
return false
|
||||
default:
|
||||
|
|
|
@ -1183,7 +1183,7 @@ func (s *ChannelMetaMockSuite) TestAddSegment_SkipBFLoad() {
|
|||
s.cm.EXPECT().MultiRead(mock.Anything, []string{"rootPath/stats/1/0/100/10001"}).
|
||||
Run(func(_ context.Context, _ []string) {
|
||||
<-ch
|
||||
}).Return(nil, storage.WrapErrNoSuchKey("rootPath/stats/1/0/100/10001"))
|
||||
}).Return(nil, merr.WrapErrIoKeyNotFound("rootPath/stats/1/0/100/10001"))
|
||||
|
||||
err := s.channel.addSegment(
|
||||
context.TODO(),
|
||||
|
|
|
@ -58,6 +58,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/proxypb"
|
||||
"github.com/milvus-io/milvus/internal/proxy"
|
||||
"github.com/milvus-io/milvus/internal/proxy/accesslog"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/internal/util/componentutil"
|
||||
"github.com/milvus-io/milvus/internal/util/dependency"
|
||||
|
@ -259,7 +260,7 @@ func (s *Server) startExternalGrpc(grpcPort int, errChan chan error) {
|
|||
}
|
||||
|
||||
certPool := x509.NewCertPool()
|
||||
rootBuf, err := os.ReadFile(Params.CaPemPath.GetValue())
|
||||
rootBuf, err := storage.ReadFile(Params.CaPemPath.GetValue())
|
||||
if err != nil {
|
||||
log.Warn("failed read ca pem", zap.Error(err))
|
||||
errChan <- err
|
||||
|
@ -480,7 +481,7 @@ func (s *Server) init() error {
|
|||
}
|
||||
|
||||
certPool := x509.NewCertPool()
|
||||
rootBuf, err := os.ReadFile(Params.CaPemPath.GetValue())
|
||||
rootBuf, err := storage.ReadFile(Params.CaPemPath.GetValue())
|
||||
if err != nil {
|
||||
log.Error("failed read ca pem", zap.Error(err))
|
||||
return err
|
||||
|
|
|
@ -18,7 +18,6 @@ package storage
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"time"
|
||||
|
@ -31,6 +30,7 @@ import (
|
|||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/retry"
|
||||
)
|
||||
|
||||
|
@ -63,7 +63,7 @@ func newAzureObjectStorageWithConfig(ctx context.Context, c *config) (*AzureObje
|
|||
return nil, err
|
||||
}
|
||||
if c.bucketName == "" {
|
||||
return nil, fmt.Errorf("invalid bucket name")
|
||||
return nil, merr.WrapErrParameterInvalidMsg("invalid empty bucket name")
|
||||
}
|
||||
// check valid in first query
|
||||
checkBucketFn := func() error {
|
||||
|
@ -99,20 +99,20 @@ func (AzureObjectStorage *AzureObjectStorage) GetObject(ctx context.Context, buc
|
|||
}
|
||||
object, err := AzureObjectStorage.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).DownloadStream(ctx, &opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, checkObjectStorageError(objectName, err)
|
||||
}
|
||||
return object.Body, nil
|
||||
}
|
||||
|
||||
func (AzureObjectStorage *AzureObjectStorage) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error {
|
||||
_, err := AzureObjectStorage.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).UploadStream(ctx, reader, &azblob.UploadStreamOptions{})
|
||||
return err
|
||||
return checkObjectStorageError(objectName, err)
|
||||
}
|
||||
|
||||
func (AzureObjectStorage *AzureObjectStorage) StatObject(ctx context.Context, bucketName, objectName string) (int64, error) {
|
||||
info, err := AzureObjectStorage.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).GetProperties(ctx, &blob.GetPropertiesOptions{})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return 0, checkObjectStorageError(objectName, err)
|
||||
}
|
||||
return *info.ContentLength, nil
|
||||
}
|
||||
|
@ -125,7 +125,7 @@ func (AzureObjectStorage *AzureObjectStorage) ListObjects(ctx context.Context, b
|
|||
if pager.More() {
|
||||
pageResp, err := pager.NextPage(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, checkObjectStorageError(prefix, err)
|
||||
}
|
||||
for _, blob := range pageResp.Segment.BlobItems {
|
||||
objects[*blob.Name] = *blob.Properties.LastModified
|
||||
|
@ -136,5 +136,5 @@ func (AzureObjectStorage *AzureObjectStorage) ListObjects(ctx context.Context, b
|
|||
|
||||
func (AzureObjectStorage *AzureObjectStorage) RemoveObject(ctx context.Context, bucketName, objectName string) error {
|
||||
_, err := AzureObjectStorage.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).Delete(ctx, &blob.DeleteOptions{})
|
||||
return err
|
||||
return checkObjectStorageError(objectName, err)
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@ package storage
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
|
@ -65,22 +64,14 @@ func (lcm *LocalChunkManager) Path(ctx context.Context, filePath string) (string
|
|||
}
|
||||
|
||||
if !exist {
|
||||
return "", fmt.Errorf("local file cannot be found with filePath: %s", filePath)
|
||||
return "", merr.WrapErrIoKeyNotFound(filePath)
|
||||
}
|
||||
|
||||
return filePath, nil
|
||||
}
|
||||
|
||||
func (lcm *LocalChunkManager) Reader(ctx context.Context, filePath string) (FileReader, error) {
|
||||
exist, err := lcm.Exist(ctx, filePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exist {
|
||||
return nil, errors.New("local file cannot be found with filePath:" + filePath)
|
||||
}
|
||||
|
||||
return os.Open(filePath)
|
||||
return Open(filePath)
|
||||
}
|
||||
|
||||
// Write writes the data to local storage.
|
||||
|
@ -93,10 +84,10 @@ func (lcm *LocalChunkManager) Write(ctx context.Context, filePath string, conten
|
|||
if !exist {
|
||||
err := os.MkdirAll(dir, os.ModePerm)
|
||||
if err != nil {
|
||||
return err
|
||||
return merr.WrapErrIoFailed(filePath, err)
|
||||
}
|
||||
}
|
||||
return os.WriteFile(filePath, content, os.ModePerm)
|
||||
return WriteFile(filePath, content, os.ModePerm)
|
||||
}
|
||||
|
||||
// MultiWrite writes the data to local storage.
|
||||
|
@ -118,22 +109,14 @@ func (lcm *LocalChunkManager) Exist(ctx context.Context, filePath string) (bool,
|
|||
if os.IsNotExist(err) {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
return false, merr.WrapErrIoFailed(filePath, err)
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Read reads the local storage data if exists.
|
||||
func (lcm *LocalChunkManager) Read(ctx context.Context, filePath string) ([]byte, error) {
|
||||
exist, err := lcm.Exist(ctx, filePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exist {
|
||||
return nil, fmt.Errorf("file not exist: %s", filePath)
|
||||
}
|
||||
|
||||
return os.ReadFile(filePath)
|
||||
return ReadFile(filePath)
|
||||
}
|
||||
|
||||
// MultiRead reads the local storage data if exists.
|
||||
|
@ -205,26 +188,36 @@ func (lcm *LocalChunkManager) ReadAt(ctx context.Context, filePath string, off i
|
|||
return nil, io.EOF
|
||||
}
|
||||
|
||||
file, err := os.Open(path.Clean(filePath))
|
||||
file, err := Open(path.Clean(filePath))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
res := make([]byte, length)
|
||||
if _, err := file.ReadAt(res, off); err != nil {
|
||||
return nil, err
|
||||
_, err = file.ReadAt(res, off)
|
||||
if err != nil {
|
||||
return nil, merr.WrapErrIoFailed(filePath, err)
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (lcm *LocalChunkManager) Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error) {
|
||||
return mmap.Open(path.Clean(filePath))
|
||||
reader, err := mmap.Open(path.Clean(filePath))
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
return nil, merr.WrapErrIoKeyNotFound(filePath, err.Error())
|
||||
}
|
||||
|
||||
return reader, merr.WrapErrIoFailed(filePath, err)
|
||||
}
|
||||
|
||||
func (lcm *LocalChunkManager) Size(ctx context.Context, filePath string) (int64, error) {
|
||||
fi, err := os.Stat(filePath)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
return 0, merr.WrapErrIoKeyNotFound(filePath, err.Error())
|
||||
}
|
||||
return 0, merr.WrapErrIoFailed(filePath, err)
|
||||
}
|
||||
// get the size
|
||||
size := fi.Size()
|
||||
|
@ -232,28 +225,17 @@ func (lcm *LocalChunkManager) Size(ctx context.Context, filePath string) (int64,
|
|||
}
|
||||
|
||||
func (lcm *LocalChunkManager) Remove(ctx context.Context, filePath string) error {
|
||||
exist, err := lcm.Exist(ctx, filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if exist {
|
||||
err := os.RemoveAll(filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
err := os.RemoveAll(filePath)
|
||||
return merr.WrapErrIoFailed(filePath, err)
|
||||
}
|
||||
|
||||
func (lcm *LocalChunkManager) MultiRemove(ctx context.Context, filePaths []string) error {
|
||||
var el error
|
||||
errors := make([]error, 0, len(filePaths))
|
||||
for _, filePath := range filePaths {
|
||||
err := lcm.Remove(ctx, filePath)
|
||||
if err != nil {
|
||||
el = merr.Combine(err, errors.Wrapf(err, "failed to remove %s", filePath))
|
||||
}
|
||||
errors = append(errors, err)
|
||||
}
|
||||
return el
|
||||
return merr.Combine(errors...)
|
||||
}
|
||||
|
||||
func (lcm *LocalChunkManager) RemoveWithPrefix(ctx context.Context, prefix string) error {
|
||||
|
@ -261,8 +243,8 @@ func (lcm *LocalChunkManager) RemoveWithPrefix(ctx context.Context, prefix strin
|
|||
// MultiRemove() will delete all these files. This is a danger behavior, empty prefix is not allowed.
|
||||
if len(prefix) == 0 {
|
||||
errMsg := "empty prefix is not allowed for ChunkManager remove operation"
|
||||
log.Error(errMsg)
|
||||
return errors.New(errMsg)
|
||||
log.Warn(errMsg)
|
||||
return merr.WrapErrParameterInvalidMsg(errMsg)
|
||||
}
|
||||
|
||||
filePaths, _, err := lcm.ListWithPrefix(ctx, prefix, true)
|
||||
|
@ -276,8 +258,14 @@ func (lcm *LocalChunkManager) RemoveWithPrefix(ctx context.Context, prefix strin
|
|||
func (lcm *LocalChunkManager) getModTime(filepath string) (time.Time, error) {
|
||||
fi, err := os.Stat(filepath)
|
||||
if err != nil {
|
||||
log.Error("stat fileinfo error", zap.String("relative filepath", filepath))
|
||||
return time.Time{}, err
|
||||
log.Warn("stat fileinfo error",
|
||||
zap.String("filepath", filepath),
|
||||
zap.Error(err),
|
||||
)
|
||||
if os.IsNotExist(err) {
|
||||
return time.Time{}, merr.WrapErrIoKeyNotFound(filepath)
|
||||
}
|
||||
return time.Time{}, merr.WrapErrIoFailed(filepath, err)
|
||||
}
|
||||
|
||||
return fi.ModTime(), nil
|
||||
|
|
|
@ -20,7 +20,6 @@ import (
|
|||
"bytes"
|
||||
"container/list"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
|
@ -37,18 +36,6 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
||||
)
|
||||
|
||||
const NoSuchKey = "NoSuchKey"
|
||||
|
||||
var ErrNoSuchKey = errors.New(NoSuchKey)
|
||||
|
||||
func WrapErrNoSuchKey(key string) error {
|
||||
return fmt.Errorf("%w(key=%s)", ErrNoSuchKey, key)
|
||||
}
|
||||
|
||||
func IsErrNoSuchKey(err error) bool {
|
||||
return strings.HasPrefix(err.Error(), NoSuchKey)
|
||||
}
|
||||
|
||||
var CheckBucketRetryAttempts uint = 20
|
||||
|
||||
// MinioChunkManager is responsible for read and write data stored in minio.
|
||||
|
@ -112,7 +99,7 @@ func (mcm *MinioChunkManager) Path(ctx context.Context, filePath string) (string
|
|||
return "", err
|
||||
}
|
||||
if !exist {
|
||||
return "", errors.New("minio file manage cannot be found with filePath:" + filePath)
|
||||
return "", merr.WrapErrIoKeyNotFound(filePath)
|
||||
}
|
||||
return filePath, nil
|
||||
}
|
||||
|
@ -152,25 +139,26 @@ func (mcm *MinioChunkManager) Write(ctx context.Context, filePath string, conten
|
|||
// MultiWrite saves multiple objects, the path is the key of @kvs.
|
||||
// The object value is the value of @kvs.
|
||||
func (mcm *MinioChunkManager) MultiWrite(ctx context.Context, kvs map[string][]byte) error {
|
||||
var el error
|
||||
errors := make([]error, 0, len(kvs))
|
||||
for key, value := range kvs {
|
||||
err := mcm.Write(ctx, key, value)
|
||||
if err != nil {
|
||||
el = merr.Combine(el, errors.Wrapf(err, "failed to write %s", key))
|
||||
}
|
||||
errors = append(errors, err)
|
||||
}
|
||||
return el
|
||||
return merr.Combine(errors...)
|
||||
}
|
||||
|
||||
// Exist checks whether chunk is saved to minio storage.
|
||||
func (mcm *MinioChunkManager) Exist(ctx context.Context, filePath string) (bool, error) {
|
||||
_, err := mcm.statMinioObject(ctx, mcm.bucketName, filePath, minio.StatObjectOptions{})
|
||||
if err != nil {
|
||||
errResponse := minio.ToErrorResponse(err)
|
||||
if errResponse.Code == "NoSuchKey" {
|
||||
if errors.Is(err, merr.ErrIoKeyNotFound) {
|
||||
return false, nil
|
||||
}
|
||||
log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
|
||||
log.Warn("failed to stat object",
|
||||
zap.String("bucket", mcm.bucketName),
|
||||
zap.String("path", filePath),
|
||||
zap.Error(err),
|
||||
)
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
|
@ -188,31 +176,22 @@ func (mcm *MinioChunkManager) Read(ctx context.Context, filePath string) ([]byte
|
|||
// Prefetch object data
|
||||
var empty []byte
|
||||
_, err = object.Read(empty)
|
||||
err = checkObjectStorageError(filePath, err)
|
||||
if err != nil {
|
||||
errResponse := minio.ToErrorResponse(err)
|
||||
if errResponse.Code == "NoSuchKey" {
|
||||
return nil, WrapErrNoSuchKey(filePath)
|
||||
}
|
||||
log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
objectInfo, err := object.Stat()
|
||||
err = checkObjectStorageError(filePath, err)
|
||||
if err != nil {
|
||||
log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
|
||||
errResponse := minio.ToErrorResponse(err)
|
||||
if errResponse.Code == "NoSuchKey" {
|
||||
return nil, WrapErrNoSuchKey(filePath)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
data, err := Read(object, objectInfo.Size)
|
||||
err = checkObjectStorageError(filePath, err)
|
||||
if err != nil {
|
||||
errResponse := minio.ToErrorResponse(err)
|
||||
if errResponse.Code == "NoSuchKey" {
|
||||
return nil, WrapErrNoSuchKey(filePath)
|
||||
}
|
||||
log.Warn("failed to read object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
@ -221,17 +200,17 @@ func (mcm *MinioChunkManager) Read(ctx context.Context, filePath string) ([]byte
|
|||
}
|
||||
|
||||
func (mcm *MinioChunkManager) MultiRead(ctx context.Context, keys []string) ([][]byte, error) {
|
||||
var el error
|
||||
errors := make([]error, 0)
|
||||
var objectsValues [][]byte
|
||||
for _, key := range keys {
|
||||
objectValue, err := mcm.Read(ctx, key)
|
||||
if err != nil {
|
||||
el = merr.Combine(el, errors.Wrapf(err, "failed to read %s", key))
|
||||
errors = append(errors, err)
|
||||
}
|
||||
objectsValues = append(objectsValues, objectValue)
|
||||
}
|
||||
|
||||
return objectsValues, el
|
||||
return objectsValues, merr.Combine(errors...)
|
||||
}
|
||||
|
||||
func (mcm *MinioChunkManager) ReadWithPrefix(ctx context.Context, prefix string) ([]string, [][]byte, error) {
|
||||
|
@ -248,7 +227,7 @@ func (mcm *MinioChunkManager) ReadWithPrefix(ctx context.Context, prefix string)
|
|||
}
|
||||
|
||||
func (mcm *MinioChunkManager) Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error) {
|
||||
return nil, errors.New("this method has not been implemented")
|
||||
return nil, merr.WrapErrServiceInternal("mmap not supported for MinIO chunk manager")
|
||||
}
|
||||
|
||||
// ReadAt reads specific position data of minio storage if exists.
|
||||
|
@ -261,7 +240,7 @@ func (mcm *MinioChunkManager) ReadAt(ctx context.Context, filePath string, off i
|
|||
err := opts.SetRange(off, off+length-1)
|
||||
if err != nil {
|
||||
log.Warn("failed to set range", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
|
||||
return nil, err
|
||||
return nil, merr.WrapErrParameterInvalidMsg("invalid range while reading %s: %v", filePath, err)
|
||||
}
|
||||
|
||||
object, err := mcm.getMinioObject(ctx, mcm.bucketName, filePath, opts)
|
||||
|
@ -273,10 +252,7 @@ func (mcm *MinioChunkManager) ReadAt(ctx context.Context, filePath string, off i
|
|||
|
||||
data, err := Read(object, length)
|
||||
if err != nil {
|
||||
errResponse := minio.ToErrorResponse(err)
|
||||
if errResponse.Code == "NoSuchKey" {
|
||||
return nil, WrapErrNoSuchKey(filePath)
|
||||
}
|
||||
err = checkObjectStorageError(filePath, err)
|
||||
log.Warn("failed to read object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
@ -411,14 +387,18 @@ func (mcm *MinioChunkManager) getMinioObject(ctx context.Context, bucketName, ob
|
|||
|
||||
reader, err := mcm.Client.GetObject(ctx, bucketName, objectName, opts)
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataGetLabel, metrics.TotalLabel).Inc()
|
||||
if err == nil && reader != nil {
|
||||
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataGetLabel).Observe(float64(start.ElapseSpan().Milliseconds()))
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataGetLabel, metrics.SuccessLabel).Inc()
|
||||
} else {
|
||||
if err != nil {
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataGetLabel, metrics.FailLabel).Inc()
|
||||
return nil, checkObjectStorageError(objectName, err)
|
||||
}
|
||||
if reader == nil {
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataGetLabel, metrics.FailLabel).Inc()
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return reader, err
|
||||
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataGetLabel).Observe(float64(start.ElapseSpan().Milliseconds()))
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataGetLabel, metrics.SuccessLabel).Inc()
|
||||
return reader, nil
|
||||
}
|
||||
|
||||
func (mcm *MinioChunkManager) putMinioObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64,
|
||||
|
@ -428,14 +408,14 @@ func (mcm *MinioChunkManager) putMinioObject(ctx context.Context, bucketName, ob
|
|||
|
||||
info, err := mcm.Client.PutObject(ctx, bucketName, objectName, reader, objectSize, opts)
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataPutLabel, metrics.TotalLabel).Inc()
|
||||
if err == nil {
|
||||
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataPutLabel).Observe(float64(start.ElapseSpan().Milliseconds()))
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.MetaPutLabel, metrics.SuccessLabel).Inc()
|
||||
} else {
|
||||
if err != nil {
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.MetaPutLabel, metrics.FailLabel).Inc()
|
||||
return info, checkObjectStorageError(objectName, err)
|
||||
}
|
||||
|
||||
return info, err
|
||||
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataPutLabel).Observe(float64(start.ElapseSpan().Milliseconds()))
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.MetaPutLabel, metrics.SuccessLabel).Inc()
|
||||
return info, nil
|
||||
}
|
||||
|
||||
func (mcm *MinioChunkManager) statMinioObject(ctx context.Context, bucketName, objectName string,
|
||||
|
@ -445,14 +425,15 @@ func (mcm *MinioChunkManager) statMinioObject(ctx context.Context, bucketName, o
|
|||
|
||||
info, err := mcm.Client.StatObject(ctx, bucketName, objectName, opts)
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataStatLabel, metrics.TotalLabel).Inc()
|
||||
if err == nil {
|
||||
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataStatLabel).Observe(float64(start.ElapseSpan().Milliseconds()))
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataStatLabel, metrics.SuccessLabel).Inc()
|
||||
} else {
|
||||
if err != nil {
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataStatLabel, metrics.FailLabel).Inc()
|
||||
err = checkObjectStorageError(objectName, err)
|
||||
return info, err
|
||||
}
|
||||
|
||||
return info, err
|
||||
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataStatLabel).Observe(float64(start.ElapseSpan().Milliseconds()))
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataStatLabel, metrics.SuccessLabel).Inc()
|
||||
return info, nil
|
||||
}
|
||||
|
||||
func (mcm *MinioChunkManager) listMinioObjects(ctx context.Context, bucketName string,
|
||||
|
@ -475,12 +456,12 @@ func (mcm *MinioChunkManager) removeMinioObject(ctx context.Context, bucketName,
|
|||
|
||||
err := mcm.Client.RemoveObject(ctx, bucketName, objectName, opts)
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataRemoveLabel, metrics.TotalLabel).Inc()
|
||||
if err == nil {
|
||||
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataRemoveLabel).Observe(float64(start.ElapseSpan().Milliseconds()))
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataRemoveLabel, metrics.SuccessLabel).Inc()
|
||||
} else {
|
||||
if err != nil {
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataRemoveLabel, metrics.FailLabel).Inc()
|
||||
return checkObjectStorageError(objectName, err)
|
||||
}
|
||||
|
||||
return err
|
||||
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataRemoveLabel).Observe(float64(start.ElapseSpan().Milliseconds()))
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataRemoveLabel, metrics.SuccessLabel).Inc()
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -27,6 +27,8 @@ import (
|
|||
"github.com/cockroachdb/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
)
|
||||
|
||||
// TODO: NewMinioChunkManager is deprecated. Rewrite this unittest.
|
||||
|
@ -516,11 +518,11 @@ func TestMinIOCM(t *testing.T) {
|
|||
|
||||
_, err = testCM.Read(ctx, key)
|
||||
assert.Error(t, err)
|
||||
assert.True(t, errors.Is(err, ErrNoSuchKey))
|
||||
assert.True(t, errors.Is(err, merr.ErrIoKeyNotFound))
|
||||
|
||||
_, err = testCM.ReadAt(ctx, key, 100, 1)
|
||||
assert.Error(t, err)
|
||||
assert.True(t, errors.Is(err, ErrNoSuchKey))
|
||||
assert.True(t, errors.Is(err, merr.ErrIoKeyNotFound))
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -150,24 +150,24 @@ func (minioObjectStorage *MinioObjectStorage) GetObject(ctx context.Context, buc
|
|||
err := opts.SetRange(offset, offset+size-1)
|
||||
if err != nil {
|
||||
log.Warn("failed to set range", zap.String("bucket", bucketName), zap.String("path", objectName), zap.Error(err))
|
||||
return nil, err
|
||||
return nil, checkObjectStorageError(objectName, err)
|
||||
}
|
||||
}
|
||||
object, err := minioObjectStorage.Client.GetObject(ctx, bucketName, objectName, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, checkObjectStorageError(objectName, err)
|
||||
}
|
||||
return object, nil
|
||||
}
|
||||
|
||||
func (minioObjectStorage *MinioObjectStorage) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error {
|
||||
_, err := minioObjectStorage.Client.PutObject(ctx, bucketName, objectName, reader, objectSize, minio.PutObjectOptions{})
|
||||
return err
|
||||
return checkObjectStorageError(objectName, err)
|
||||
}
|
||||
|
||||
func (minioObjectStorage *MinioObjectStorage) StatObject(ctx context.Context, bucketName, objectName string) (int64, error) {
|
||||
info, err := minioObjectStorage.Client.StatObject(ctx, bucketName, objectName, minio.StatObjectOptions{})
|
||||
return info.Size, err
|
||||
return info.Size, checkObjectStorageError(objectName, err)
|
||||
}
|
||||
|
||||
func (minioObjectStorage *MinioObjectStorage) ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) (map[string]time.Time, error) {
|
||||
|
|
|
@ -151,7 +151,7 @@ func (mcm *RemoteChunkManager) MultiWrite(ctx context.Context, kvs map[string][]
|
|||
func (mcm *RemoteChunkManager) Exist(ctx context.Context, filePath string) (bool, error) {
|
||||
_, err := mcm.getObjectSize(ctx, mcm.bucketName, filePath)
|
||||
if err != nil {
|
||||
if IsErrNoSuchKey(err) {
|
||||
if errors.Is(err, merr.ErrIoKeyNotFound) {
|
||||
return false, nil
|
||||
}
|
||||
log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
|
||||
|
@ -172,11 +172,8 @@ func (mcm *RemoteChunkManager) Read(ctx context.Context, filePath string) ([]byt
|
|||
// Prefetch object data
|
||||
var empty []byte
|
||||
_, err = object.Read(empty)
|
||||
err = checkObjectStorageError(filePath, err)
|
||||
if err != nil {
|
||||
errResponse := minio.ToErrorResponse(err)
|
||||
if errResponse.Code == "NoSuchKey" {
|
||||
return nil, WrapErrNoSuchKey(filePath)
|
||||
}
|
||||
log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
@ -186,11 +183,8 @@ func (mcm *RemoteChunkManager) Read(ctx context.Context, filePath string) ([]byt
|
|||
return nil, err
|
||||
}
|
||||
data, err := Read(object, size)
|
||||
err = checkObjectStorageError(filePath, err)
|
||||
if err != nil {
|
||||
errResponse := minio.ToErrorResponse(err)
|
||||
if errResponse.Code == "NoSuchKey" {
|
||||
return nil, WrapErrNoSuchKey(filePath)
|
||||
}
|
||||
log.Warn("failed to read object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
@ -243,11 +237,8 @@ func (mcm *RemoteChunkManager) ReadAt(ctx context.Context, filePath string, off
|
|||
defer object.Close()
|
||||
|
||||
data, err := Read(object, length)
|
||||
err = checkObjectStorageError(filePath, err)
|
||||
if err != nil {
|
||||
errResponse := minio.ToErrorResponse(err)
|
||||
if errResponse.Code == "NoSuchKey" {
|
||||
return nil, WrapErrNoSuchKey(filePath)
|
||||
}
|
||||
log.Warn("failed to read object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
@ -363,23 +354,13 @@ func (mcm *RemoteChunkManager) getObject(ctx context.Context, bucketName, object
|
|||
reader, err := mcm.client.GetObject(ctx, bucketName, objectName, offset, size)
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataGetLabel, metrics.TotalLabel).Inc()
|
||||
if err == nil && reader != nil {
|
||||
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataGetLabel).Observe(float64(start.ElapseSpan().Milliseconds()))
|
||||
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataGetLabel).
|
||||
Observe(float64(start.ElapseSpan().Milliseconds()))
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataGetLabel, metrics.SuccessLabel).Inc()
|
||||
} else {
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataGetLabel, metrics.FailLabel).Inc()
|
||||
}
|
||||
|
||||
switch err := err.(type) {
|
||||
case *azcore.ResponseError:
|
||||
if err.ErrorCode == string(bloberror.BlobNotFound) {
|
||||
return nil, WrapErrNoSuchKey(objectName)
|
||||
}
|
||||
case minio.ErrorResponse:
|
||||
if err.Code == "NoSuchKey" {
|
||||
return nil, WrapErrNoSuchKey(objectName)
|
||||
}
|
||||
}
|
||||
|
||||
return reader, err
|
||||
}
|
||||
|
||||
|
@ -389,7 +370,8 @@ func (mcm *RemoteChunkManager) putObject(ctx context.Context, bucketName, object
|
|||
err := mcm.client.PutObject(ctx, bucketName, objectName, reader, objectSize)
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataPutLabel, metrics.TotalLabel).Inc()
|
||||
if err == nil {
|
||||
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataPutLabel).Observe(float64(start.ElapseSpan().Milliseconds()))
|
||||
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataPutLabel).
|
||||
Observe(float64(start.ElapseSpan().Milliseconds()))
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.MetaPutLabel, metrics.SuccessLabel).Inc()
|
||||
} else {
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.MetaPutLabel, metrics.FailLabel).Inc()
|
||||
|
@ -404,23 +386,13 @@ func (mcm *RemoteChunkManager) getObjectSize(ctx context.Context, bucketName, ob
|
|||
info, err := mcm.client.StatObject(ctx, bucketName, objectName)
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataStatLabel, metrics.TotalLabel).Inc()
|
||||
if err == nil {
|
||||
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataStatLabel).Observe(float64(start.ElapseSpan().Milliseconds()))
|
||||
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataStatLabel).
|
||||
Observe(float64(start.ElapseSpan().Milliseconds()))
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataStatLabel, metrics.SuccessLabel).Inc()
|
||||
} else {
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataStatLabel, metrics.FailLabel).Inc()
|
||||
}
|
||||
|
||||
switch err := err.(type) {
|
||||
case *azcore.ResponseError:
|
||||
if err.ErrorCode == string(bloberror.BlobNotFound) {
|
||||
return info, WrapErrNoSuchKey(objectName)
|
||||
}
|
||||
case minio.ErrorResponse:
|
||||
if err.Code == "NoSuchKey" {
|
||||
return info, WrapErrNoSuchKey(objectName)
|
||||
}
|
||||
}
|
||||
|
||||
return info, err
|
||||
}
|
||||
|
||||
|
@ -430,7 +402,8 @@ func (mcm *RemoteChunkManager) listObjects(ctx context.Context, bucketName strin
|
|||
res, err := mcm.client.ListObjects(ctx, bucketName, prefix, recursive)
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataListLabel, metrics.TotalLabel).Inc()
|
||||
if err == nil {
|
||||
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataListLabel).Observe(float64(start.ElapseSpan().Milliseconds()))
|
||||
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataListLabel).
|
||||
Observe(float64(start.ElapseSpan().Milliseconds()))
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataListLabel, metrics.SuccessLabel).Inc()
|
||||
} else {
|
||||
log.Warn("failed to list with prefix", zap.String("bucket", mcm.bucketName), zap.String("prefix", prefix), zap.Error(err))
|
||||
|
@ -445,7 +418,8 @@ func (mcm *RemoteChunkManager) removeObject(ctx context.Context, bucketName, obj
|
|||
err := mcm.client.RemoveObject(ctx, bucketName, objectName)
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataRemoveLabel, metrics.TotalLabel).Inc()
|
||||
if err == nil {
|
||||
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataRemoveLabel).Observe(float64(start.ElapseSpan().Milliseconds()))
|
||||
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataRemoveLabel).
|
||||
Observe(float64(start.ElapseSpan().Milliseconds()))
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataRemoveLabel, metrics.SuccessLabel).Inc()
|
||||
} else {
|
||||
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataRemoveLabel, metrics.FailLabel).Inc()
|
||||
|
@ -453,3 +427,23 @@ func (mcm *RemoteChunkManager) removeObject(ctx context.Context, bucketName, obj
|
|||
|
||||
return err
|
||||
}
|
||||
|
||||
func checkObjectStorageError(fileName string, err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch err := err.(type) {
|
||||
case *azcore.ResponseError:
|
||||
if err.ErrorCode == string(bloberror.BlobNotFound) {
|
||||
return merr.WrapErrIoKeyNotFound(fileName, err.Error())
|
||||
}
|
||||
return merr.WrapErrIoFailed(fileName, err)
|
||||
case minio.ErrorResponse:
|
||||
if err.Code == "NoSuchKey" {
|
||||
return merr.WrapErrIoKeyNotFound(fileName, err.Error())
|
||||
}
|
||||
return merr.WrapErrIoFailed(fileName, err)
|
||||
}
|
||||
return merr.WrapErrIoFailed(fileName, err)
|
||||
}
|
||||
|
|
|
@ -24,6 +24,8 @@ import (
|
|||
"github.com/cockroachdb/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
)
|
||||
|
||||
// TODO: NewRemoteChunkManager is deprecated. Rewrite this unittest.
|
||||
|
@ -504,7 +506,7 @@ func TestMinioChunkManager(t *testing.T) {
|
|||
|
||||
_, err = testCM.Read(ctx, key)
|
||||
assert.Error(t, err)
|
||||
assert.True(t, errors.Is(err, ErrNoSuchKey))
|
||||
assert.True(t, errors.Is(err, merr.ErrIoKeyNotFound))
|
||||
|
||||
file, err := testCM.Reader(ctx, key)
|
||||
assert.NoError(t, err) // todo
|
||||
|
@ -512,7 +514,7 @@ func TestMinioChunkManager(t *testing.T) {
|
|||
|
||||
_, err = testCM.ReadAt(ctx, key, 100, 1)
|
||||
assert.Error(t, err)
|
||||
assert.True(t, errors.Is(err, ErrNoSuchKey))
|
||||
assert.True(t, errors.Is(err, merr.ErrIoKeyNotFound))
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -958,14 +960,14 @@ func TestAzureChunkManager(t *testing.T) {
|
|||
|
||||
_, err = testCM.Read(ctx, key)
|
||||
assert.Error(t, err)
|
||||
assert.True(t, errors.Is(err, ErrNoSuchKey))
|
||||
assert.True(t, errors.Is(err, merr.ErrIoKeyNotFound))
|
||||
|
||||
_, err = testCM.Reader(ctx, key)
|
||||
assert.Error(t, err)
|
||||
assert.True(t, errors.Is(err, ErrNoSuchKey))
|
||||
assert.True(t, errors.Is(err, merr.ErrIoKeyNotFound))
|
||||
|
||||
_, err = testCM.ReadAt(ctx, key, 100, 1)
|
||||
assert.Error(t, err)
|
||||
assert.True(t, errors.Is(err, ErrNoSuchKey))
|
||||
assert.True(t, errors.Is(err, merr.ErrIoKeyNotFound))
|
||||
})
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ import (
|
|||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
"sort"
|
||||
"strconv"
|
||||
|
||||
|
@ -34,11 +36,51 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// Open opens file as os.Open works,
|
||||
// also converts the os errors to Milvus errors
|
||||
func Open(filepath string) (*os.File, error) {
|
||||
// NOLINT
|
||||
reader, err := os.Open(filepath)
|
||||
if os.IsNotExist(err) {
|
||||
return nil, merr.WrapErrIoKeyNotFound(filepath)
|
||||
} else if err != nil {
|
||||
return nil, merr.WrapErrIoFailed(filepath, err)
|
||||
}
|
||||
|
||||
return reader, nil
|
||||
}
|
||||
|
||||
// ReadFile reads file as os.ReadFile works,
|
||||
// also converts the os errors to Milvus errors
|
||||
func ReadFile(filepath string) ([]byte, error) {
|
||||
// NOLINT
|
||||
data, err := os.ReadFile(filepath)
|
||||
if os.IsNotExist(err) {
|
||||
return nil, merr.WrapErrIoKeyNotFound(filepath)
|
||||
} else if err != nil {
|
||||
return nil, merr.WrapErrIoFailed(filepath, err)
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// WriteFile writes file as os.WriteFile works,
|
||||
// also converts the os errors to Milvus errors
|
||||
func WriteFile(filepath string, data []byte, perm fs.FileMode) error {
|
||||
// NOLINT
|
||||
err := os.WriteFile(filepath, data, perm)
|
||||
if err != nil {
|
||||
return merr.WrapErrIoFailed(filepath, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func checkTsField(data *InsertData) bool {
|
||||
tsData, ok := data.Data[common.TimeStampField]
|
||||
if !ok {
|
||||
|
|
|
@ -36,6 +36,7 @@ import (
|
|||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
|
@ -1131,7 +1132,7 @@ func GetSessions(pid int) []string {
|
|||
return []string{}
|
||||
}
|
||||
|
||||
v, err := os.ReadFile(fileFullName)
|
||||
v, err := storage.ReadFile(fileFullName)
|
||||
if err != nil {
|
||||
log.Warn("read server info file path failed", zap.String("filePath", fileFullName), zap.Error(err))
|
||||
return []string{}
|
||||
|
|
|
@ -18,6 +18,7 @@ package merr
|
|||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
|
@ -121,7 +122,7 @@ func (s *ErrSuite) TestWrap() {
|
|||
|
||||
// IO related
|
||||
s.ErrorIs(WrapErrIoKeyNotFound("test_key", "failed to read"), ErrIoKeyNotFound)
|
||||
s.ErrorIs(WrapErrIoFailed("test_key", "failed to read"), ErrIoFailed)
|
||||
s.ErrorIs(WrapErrIoFailed("test_key", os.ErrClosed), ErrIoFailed)
|
||||
|
||||
// Parameter related
|
||||
s.ErrorIs(WrapErrParameterInvalid(8, 1, "failed to create"), ErrParameterInvalid)
|
||||
|
@ -180,7 +181,7 @@ func (s *ErrSuite) TestCombineOnlyNil() {
|
|||
}
|
||||
|
||||
func (s *ErrSuite) TestCombineCode() {
|
||||
err := Combine(WrapErrIoFailed("test"), WrapErrCollectionNotFound(1))
|
||||
err := Combine(WrapErrPartitionNotFound(10), WrapErrCollectionNotFound(1))
|
||||
s.Equal(Code(ErrCollectionNotFound), Code(err))
|
||||
}
|
||||
|
||||
|
|
|
@ -648,11 +648,11 @@ func WrapErrIoKeyNotFound(key string, msg ...string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func WrapErrIoFailed(key string, msg ...string) error {
|
||||
err := errors.Wrapf(ErrIoFailed, "key=%s", key)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
func WrapErrIoFailed(key string, err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
err = errors.Wrapf(ErrIoFailed, "key=%s: %v", key, err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue