mirror of https://github.com/milvus-io/milvus.git
parent dbdb9e15d8
commit 63ac43a3b8
@@ -24,7 +24,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/cockroachdb/errors"
 	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
 	clientv3 "go.etcd.io/etcd/client/v3"
 	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
@@ -50,6 +49,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/funcutil"
 	"github.com/milvus-io/milvus/pkg/util/interceptor"
 	"github.com/milvus-io/milvus/pkg/util/logutil"
+	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/retry"
 )
@@ -326,11 +326,8 @@ func (s *Server) WatchDmChannels(ctx context.Context, req *datapb.WatchDmChannel
 }
 
 func (s *Server) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
-	if s.datanode.GetStateCode() != commonpb.StateCode_Healthy {
-		return &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_UnexpectedError,
-			Reason:    "DataNode isn't healthy.",
-		}, errors.New("DataNode is not ready yet")
+	if err := merr.CheckHealthy(s.datanode.GetStateCode()); err != nil {
+		return merr.Status(err), nil
 	}
 	return s.datanode.FlushSegments(ctx, req)
 }
@@ -262,7 +262,7 @@ func Test_NewServer(t *testing.T) {
 			status: &commonpb.Status{},
 		}
 		states, err := server.FlushSegments(ctx, nil)
-		assert.Error(t, err)
+		assert.NoError(t, err)
 		assert.NotNil(t, states)
 	})
 
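Note on the DataNode hunks above: the hand-rolled status plus a non-nil Go error is replaced by the merr helpers, so a readiness failure now travels inside the returned status while the RPC itself succeeds. A minimal sketch of the round trip, assuming merr.CheckHealthy and merr.Status behave as they are used in this diff:

```go
package main

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus/pkg/util/merr"
)

// flushStatus mimics the new FlushSegments guard: the typed error is
// packed into the status instead of being returned as a transport error.
func flushStatus(code commonpb.StateCode) *commonpb.Status {
	if err := merr.CheckHealthy(code); err != nil {
		return merr.Status(err) // the RPC returns (status, nil)
	}
	return merr.Status(nil) // success status
}

func main() {
	fmt.Println(flushStatus(commonpb.StateCode_Abnormal).GetReason())
}
```

This is also why the test flips assert.Error to assert.NoError: the failure is now observed on the status, not on the returned error.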
@@ -868,19 +868,13 @@ func (s *Server) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasReq
 
 func (s *Server) DescribeAlias(ctx context.Context, request *milvuspb.DescribeAliasRequest) (*milvuspb.DescribeAliasResponse, error) {
 	return &milvuspb.DescribeAliasResponse{
-		Status: &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_UnexpectedError,
-			Reason:    "TODO: implement me",
-		},
+		Status: merr.Status(merr.WrapErrServiceUnavailable("DescribeAlias unimplemented")),
 	}, nil
 }
 
 func (s *Server) ListAliases(ctx context.Context, request *milvuspb.ListAliasesRequest) (*milvuspb.ListAliasesResponse, error) {
 	return &milvuspb.ListAliasesResponse{
-		Status: &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_UnexpectedError,
-			Reason:    "TODO: implement me",
-		},
+		Status: merr.Status(merr.WrapErrServiceUnavailable("ListAliases unimplemented")),
 	}, nil
 }
 
@@ -1070,19 +1064,13 @@ func (s *Server) ListResourceGroups(ctx context.Context, req *milvuspb.ListResou
 
 func (s *Server) ListIndexedSegment(ctx context.Context, req *federpb.ListIndexedSegmentRequest) (*federpb.ListIndexedSegmentResponse, error) {
 	return &federpb.ListIndexedSegmentResponse{
-		Status: &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_UnexpectedError,
-			Reason:    "not implemented",
-		},
+		Status: merr.Status(merr.WrapErrServiceUnavailable("ListIndexedSegment unimplemented")),
 	}, nil
 }
 
 func (s *Server) DescribeSegmentIndexData(ctx context.Context, req *federpb.DescribeSegmentIndexDataRequest) (*federpb.DescribeSegmentIndexDataResponse, error) {
 	return &federpb.DescribeSegmentIndexDataResponse{
-		Status: &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_UnexpectedError,
-			Reason:    "not implemented",
-		},
+		Status: merr.Status(merr.WrapErrServiceUnavailable("DescribeSegmentIndexData unimplemented")),
 	}, nil
 }
 
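The four unimplemented endpoints above switch from free-text "TODO"/"not implemented" reasons to merr.WrapErrServiceUnavailable, so callers can branch on a sentinel instead of parsing a string. A client-side sketch, assuming merr.Error reconstructs the typed error from a status the way the tests later in this diff do:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/errors"

	"github.com/milvus-io/milvus/pkg/util/merr"
)

func main() {
	// Server side: the wrapped error is encoded into the status.
	status := merr.Status(merr.WrapErrServiceUnavailable("DescribeAlias unimplemented"))

	// Client side: decode the status and compare against the sentinel.
	err := merr.Error(status)
	fmt.Println(errors.Is(err, merr.ErrServiceUnavailable)) // expected: true
}
```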
@@ -42,19 +42,20 @@ import (
 )
 
 func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest) (*commonpb.Status, error) {
+	log := log.Ctx(ctx).With(
+		zap.String("clusterID", req.GetClusterID()),
+		zap.Int64("indexBuildID", req.GetBuildID()),
+	)
+
 	if !i.lifetime.Add(commonpbutil.IsHealthy) {
 		stateCode := i.lifetime.GetState()
-		log.Ctx(ctx).Warn("index node not ready",
+		log.Warn("index node not ready",
 			zap.String("state", stateCode.String()),
-			zap.String("clusterID", req.GetClusterID()),
-			zap.Int64("indexBuildID", req.GetBuildID()),
 		)
 		return merr.Status(merr.WrapErrServiceNotReady(stateCode.String())), nil
 	}
 	defer i.lifetime.Done()
-	log.Ctx(ctx).Info("IndexNode building index ...",
-		zap.String("clusterID", req.GetClusterID()),
-		zap.Int64("indexBuildID", req.GetBuildID()),
+	log.Info("IndexNode building index ...",
 		zap.Int64("indexID", req.GetIndexID()),
 		zap.String("indexName", req.GetIndexName()),
 		zap.String("indexFilePrefix", req.GetIndexFilePrefix()),
@@ -77,26 +78,20 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
 		cancel: taskCancel,
 		state:  commonpb.IndexState_InProgress,
 	}); oldInfo != nil {
-		log.Ctx(ctx).Warn("duplicated index build task", zap.String("clusterID", req.GetClusterID()), zap.Int64("buildID", req.GetBuildID()))
+		err := merr.WrapErrIndexDuplicate(req.GetIndexName(), "building index task existed")
+		log.Warn("duplicated index build task", zap.Error(err))
 		metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel).Inc()
-		return &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_BuildIndexError,
-			Reason:    "duplicated index build task",
-		}, nil
+		return merr.Status(err), nil
 	}
 	cm, err := i.storageFactory.NewChunkManager(i.loopCtx, req.GetStorageConfig())
 	if err != nil {
-		log.Ctx(ctx).Error("create chunk manager failed", zap.String("bucket", req.GetStorageConfig().GetBucketName()),
+		log.Error("create chunk manager failed", zap.String("bucket", req.GetStorageConfig().GetBucketName()),
 			zap.String("accessKey", req.GetStorageConfig().GetAccessKeyID()),
-			zap.String("clusterID", req.GetClusterID()), zap.Int64("indexBuildID", req.GetBuildID()),
 			zap.Error(err),
 		)
 		i.deleteTaskInfos(ctx, []taskKey{{ClusterID: req.GetClusterID(), BuildID: req.GetBuildID()}})
 		metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel).Inc()
-		return &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_BuildIndexError,
-			Reason:    "create chunk manager failed, error: " + err.Error(),
-		}, nil
+		return merr.Status(err), nil
 	}
 	task := &indexBuildTask{
 		ident: fmt.Sprintf("%s/%d", req.ClusterID, req.BuildID),
@@ -113,15 +108,15 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
 	}
 	ret := merr.Status(nil)
 	if err := i.sched.IndexBuildQueue.Enqueue(task); err != nil {
-		log.Ctx(ctx).Warn("IndexNode failed to schedule", zap.Int64("indexBuildID", req.GetBuildID()),
-			zap.String("clusterID", req.GetClusterID()), zap.Error(err))
+		log.Warn("IndexNode failed to schedule",
+			zap.Error(err))
 		ret = merr.Status(err)
 		metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.FailLabel).Inc()
 		return ret, nil
 	}
 	metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel).Inc()
-	log.Ctx(ctx).Info("IndexNode successfully scheduled", zap.Int64("indexBuildID", req.GetBuildID()),
-		zap.String("clusterID", req.GetClusterID()), zap.String("indexName", req.GetIndexName()))
+	log.Info("IndexNode successfully scheduled",
+		zap.String("indexName", req.GetIndexName()))
 	return ret, nil
 }
@@ -253,7 +248,6 @@ func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequ
 				ErrorCode: commonpb.ErrorCode_UnexpectedError,
 				Reason:    msgIndexNodeIsUnhealthy(paramtable.GetNodeID()),
 			},
-			Response: "",
 		}, nil
 	}
 	defer i.lifetime.Done()
@@ -266,8 +260,7 @@ func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequ
 			zap.Error(err))
 
 		return &milvuspb.GetMetricsResponse{
-			Status:   merr.Status(err),
-			Response: "",
+			Status: merr.Status(err),
 		}, nil
 	}
 
@@ -289,10 +282,6 @@ func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequ
 		zap.String("metricType", metricType))
 
 	return &milvuspb.GetMetricsResponse{
-		Status: &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_UnexpectedError,
-			Reason:    metricsinfo.MsgUnimplementedMetric,
-		},
-		Response: "",
+		Status: merr.Status(merr.WrapErrMetricNotFound(metricType)),
 	}, nil
 }
@@ -92,8 +92,7 @@ func TestGetMetricsError(t *testing.T) {
 	}
 	resp, err = in.GetMetrics(ctx, unsupportedReq)
 	assert.NoError(t, err)
-	assert.Equal(t, resp.GetStatus().GetErrorCode(), commonpb.ErrorCode_UnexpectedError)
-	assert.Equal(t, resp.GetStatus().GetReason(), metricsinfo.MsgUnimplementedMetric)
+	assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrMetricNotFound)
 }
 
 func TestMockFieldData(t *testing.T) {
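A second pattern in the IndexNode hunks above is the contextual logger: log.Ctx(ctx).With binds clusterID and indexBuildID once, and every later call drops its duplicated fields. A standalone sketch of the same idea using zap directly (the field values here are made up):

```go
package main

import "go.uber.org/zap"

func main() {
	base, _ := zap.NewDevelopment()
	// Equivalent of log := log.Ctx(ctx).With(...) in CreateJob: derive a
	// child logger that attaches the shared fields to every record.
	log := base.With(
		zap.String("clusterID", "by-dev"),
		zap.Int64("indexBuildID", 42),
	)
	log.Warn("index node not ready", zap.String("state", "Abnormal"))
	log.Info("IndexNode building index ...", zap.Int64("indexID", 7))
}
```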
@@ -1,42 +0,0 @@
-// Licensed to the LF AI & Data foundation under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package proxy
-
-import (
-	"fmt"
-
-	"github.com/cockroachdb/errors"
-
-	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
-)
-
-// Keep this error temporarily
-// this error belongs to ErrServiceMemoryLimitExceeded
-// but in the error returned by querycoord,the collection id is given
-// which can not be thrown out
-// the error will be deleted after reaching an agreement on collection name and id in qn
-
-// ErrInsufficientMemory returns insufficient memory error.
-var ErrInsufficientMemory = errors.New("InsufficientMemoryToLoad")
-
-// InSufficientMemoryStatus returns insufficient memory status.
-func InSufficientMemoryStatus(collectionName string) *commonpb.Status {
-	return &commonpb.Status{
-		ErrorCode: commonpb.ErrorCode_InsufficientMemoryToLoad,
-		Reason:    fmt.Sprintf("deny to load, insufficient memory, please allocate more resources, collectionName: %s", collectionName),
-	}
-}
@@ -1,35 +0,0 @@
-// Licensed to the LF AI & Data foundation under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package proxy
-
-import (
-	"fmt"
-	"testing"
-
-	"github.com/cockroachdb/errors"
-	"github.com/stretchr/testify/assert"
-
-	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
-)
-
-func Test_ErrInsufficientMemory(t *testing.T) {
-	err := fmt.Errorf("%w, mock insufficient memory error", ErrInsufficientMemory)
-	assert.True(t, errors.Is(err, ErrInsufficientMemory))
-
-	status := InSufficientMemoryStatus("collection1")
-	assert.Equal(t, commonpb.ErrorCode_InsufficientMemoryToLoad, status.GetErrorCode())
-}
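The two deleted files drop the proxy-local ErrInsufficientMemory sentinel that the comment in them already marked as temporary; merr.ErrServiceMemoryLimitExceeded takes over. A sketch of the replacement, assuming WrapErrServiceMemoryLimitExceeded takes (used, limit) as in the TestProxy_GetLoadState change later in this diff:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/errors"

	"github.com/milvus-io/milvus/pkg/util/merr"
)

func main() {
	// Replacement for ErrInsufficientMemory / InSufficientMemoryStatus;
	// (110, 100) mirrors the mock usage in TestProxy_GetLoadState.
	err := merr.WrapErrServiceMemoryLimitExceeded(110, 100)
	status := merr.Status(err)

	// The sentinel survives the status round trip.
	fmt.Println(errors.Is(merr.Error(status), merr.ErrServiceMemoryLimitExceeded))
}
```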
@@ -1483,13 +1483,13 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get
 
 	getErrResponse := func(err error) *milvuspb.GetLoadingProgressResponse {
 		log.Warn("fail to get loading progress",
-			zap.String("collection_name", request.CollectionName),
-			zap.Strings("partition_name", request.PartitionNames),
+			zap.String("collectionName", request.CollectionName),
+			zap.Strings("partitionName", request.PartitionNames),
 			zap.Error(err))
 		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
-		if errors.Is(err, ErrInsufficientMemory) {
+		if errors.Is(err, merr.ErrServiceMemoryLimitExceeded) {
 			return &milvuspb.GetLoadingProgressResponse{
-				Status: InSufficientMemoryStatus(request.GetCollectionName()),
+				Status: merr.Status(err),
 			}
 		}
 		return &milvuspb.GetLoadingProgressResponse{
@@ -1574,14 +1574,6 @@ func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadSt
 		return getErrResponse(err), nil
 	}
 
-	// TODO(longjiquan): https://github.com/milvus-io/milvus/issues/21485, Remove `GetComponentStates` after error code
-	// is ready to distinguish case whether the querycoord is not healthy or the collection is not even loaded.
-	if statesResp, err := node.queryCoord.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}); err != nil {
-		return getErrResponse(err), nil
-	} else if statesResp.State == nil || statesResp.State.StateCode != commonpb.StateCode_Healthy {
-		return getErrResponse(fmt.Errorf("the querycoord server isn't healthy, state: %v", statesResp.State)), nil
-	}
-
 	successResponse := &milvuspb.GetLoadStateResponse{
 		Status: merr.Status(nil),
 	}
@@ -1615,24 +1607,30 @@ func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadSt
 	var progress int64
 	if len(request.GetPartitionNames()) == 0 {
-		if progress, _, err = getCollectionProgress(ctx, node.queryCoord, request.GetBase(), collectionID); err != nil {
-			if errors.Is(err, ErrInsufficientMemory) {
-				return &milvuspb.GetLoadStateResponse{
-					Status: InSufficientMemoryStatus(request.GetCollectionName()),
-				}, nil
-			}
-			successResponse.State = commonpb.LoadState_LoadStateNotLoad
-			return successResponse, nil
-		}
+		progress, _, err = getCollectionProgress(ctx, node.queryCoord, request.GetBase(), collectionID)
+		if err != nil {
+			if errors.Is(err, merr.ErrCollectionNotLoaded) {
+				successResponse.State = commonpb.LoadState_LoadStateNotLoad
+				return successResponse, nil
+			}
+			return &milvuspb.GetLoadStateResponse{
+				Status: merr.Status(err),
+			}, nil
+		}
 	} else {
-		if progress, _, err = getPartitionProgress(ctx, node.queryCoord, request.GetBase(),
-			request.GetPartitionNames(), request.GetCollectionName(), collectionID, request.GetDbName()); err != nil {
-			if errors.Is(err, ErrInsufficientMemory) {
-				return &milvuspb.GetLoadStateResponse{
-					Status: InSufficientMemoryStatus(request.GetCollectionName()),
-				}, nil
-			}
-			successResponse.State = commonpb.LoadState_LoadStateNotLoad
-			return successResponse, nil
-		}
+		progress, _, err = getPartitionProgress(ctx, node.queryCoord, request.GetBase(),
+			request.GetPartitionNames(), request.GetCollectionName(), collectionID, request.GetDbName())
+		if err != nil {
+			if errors.IsAny(err,
+				merr.ErrCollectionNotLoaded,
+				merr.ErrPartitionNotLoaded) {
+				successResponse.State = commonpb.LoadState_LoadStateNotLoad
+				return successResponse, nil
+			}
+			return &milvuspb.GetLoadStateResponse{
+				Status: merr.Status(err),
+			}, nil
+		}
 	}
 	if progress >= 100 {
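GetLoadState now tells "not loaded" apart from real failures by matching merr sentinels, using errors.IsAny from cockroachdb/errors to test several references in one call. A small standalone sketch (the sentinels here are stand-ins, not the merr ones):

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

var (
	errCollectionNotLoaded = errors.New("collection not loaded")
	errPartitionNotLoaded  = errors.New("partition not loaded")
)

func main() {
	err := errors.Wrap(errPartitionNotLoaded, "show partitions")
	// errors.IsAny is the multi-reference form of errors.Is.
	fmt.Println(errors.IsAny(err, errCollectionNotLoaded, errPartitionNotLoaded)) // true
}
```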
@@ -4242,18 +4242,8 @@ func TestProxy_GetLoadState(t *testing.T) {
 
 	{
 		qc := getQueryCoordClient()
-		qc.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
-			State: &milvuspb.ComponentInfo{
-				NodeID:    0,
-				Role:      typeutil.QueryCoordRole,
-				StateCode: commonpb.StateCode_Abnormal,
-				ExtraInfo: nil,
-			},
-			SubcomponentStates: nil,
-			Status:             merr.Status(nil),
-		}, nil)
 		qc.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{
-			Status:              &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+			Status:              merr.Status(merr.WrapErrServiceNotReady("initialization")),
 			CollectionIDs:       nil,
 			InMemoryPercentages: []int64{},
 		}, nil)
@@ -4261,27 +4251,17 @@ func TestProxy_GetLoadState(t *testing.T) {
 		proxy.stateCode.Store(commonpb.StateCode_Healthy)
 		stateResp, err := proxy.GetLoadState(context.Background(), &milvuspb.GetLoadStateRequest{CollectionName: "foo"})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, stateResp.GetStatus().GetErrorCode())
+		assert.ErrorIs(t, merr.Error(stateResp.GetStatus()), merr.ErrServiceNotReady)
 
 		progressResp, err := proxy.GetLoadingProgress(context.Background(), &milvuspb.GetLoadingProgressRequest{CollectionName: "foo"})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, progressResp.GetStatus().GetErrorCode())
+		assert.ErrorIs(t, merr.Error(progressResp.GetStatus()), merr.ErrServiceNotReady)
 	}
 
 	{
 		qc := getQueryCoordClient()
-		qc.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
-			State: &milvuspb.ComponentInfo{
-				NodeID:    0,
-				Role:      typeutil.QueryCoordRole,
-				StateCode: commonpb.StateCode_Healthy,
-				ExtraInfo: nil,
-			},
-			SubcomponentStates: nil,
-			Status:             merr.Status(nil),
-		}, nil)
-		qc.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(nil, errors.New("test"))
-		qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(nil, errors.New("test"))
+		qc.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(nil, merr.WrapErrCollectionNotLoaded("foo"))
+		qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(nil, merr.WrapErrPartitionNotLoaded("p1"))
 		proxy := &Proxy{queryCoord: qc}
 		proxy.stateCode.Store(commonpb.StateCode_Healthy)
 
@@ -4306,37 +4286,6 @@ func TestProxy_GetLoadState(t *testing.T) {
 		assert.Equal(t, int64(0), progressResp.Progress)
 	}
 
-	{
-		qc := getQueryCoordClient()
-		qc.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
-			State: &milvuspb.ComponentInfo{
-				NodeID:    0,
-				Role:      typeutil.QueryCoordRole,
-				StateCode: commonpb.StateCode_Healthy,
-				ExtraInfo: nil,
-			},
-			SubcomponentStates: nil,
-			Status:             merr.Status(nil),
-		}, nil)
-		qc.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{
-			Status:              &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
-			CollectionIDs:       nil,
-			InMemoryPercentages: []int64{},
-		}, nil)
-		qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(nil, errors.New("test"))
-		proxy := &Proxy{queryCoord: qc}
-		proxy.stateCode.Store(commonpb.StateCode_Healthy)
-
-		stateResp, err := proxy.GetLoadState(context.Background(), &milvuspb.GetLoadStateRequest{CollectionName: "foo"})
-		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_Success, stateResp.GetStatus().GetErrorCode())
-		assert.Equal(t, commonpb.LoadState_LoadStateNotLoad, stateResp.State)
-
-		progressResp, err := proxy.GetLoadingProgress(context.Background(), &milvuspb.GetLoadingProgressRequest{CollectionName: "foo"})
-		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, progressResp.GetStatus().GetErrorCode())
-	}
-
 	{
 		qc := getQueryCoordClient()
 		qc.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
@@ -4415,11 +4364,13 @@ func TestProxy_GetLoadState(t *testing.T) {
 			SubcomponentStates: nil,
 			Status:             merr.Status(nil),
 		}, nil)
+
+		mockErr := merr.WrapErrServiceMemoryLimitExceeded(110, 100)
 		qc.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{
-			Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_InsufficientMemoryToLoad},
+			Status: merr.Status(mockErr),
 		}, nil)
 		qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(&querypb.ShowPartitionsResponse{
-			Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_InsufficientMemoryToLoad},
+			Status: merr.Status(mockErr),
 		}, nil)
 		proxy := &Proxy{queryCoord: qc}
 		proxy.stateCode.Store(commonpb.StateCode_Healthy)
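The test changes above follow the same contract: the mocks return typed merr errors (or statuses built from them), and the assertions decode the status instead of comparing error codes and reason strings. A sketch of that assertion style, assuming merr.Error yields nil for a success status:

```go
package proxy_test

import (
	"testing"

	"github.com/stretchr/testify/assert"

	"github.com/milvus-io/milvus/pkg/util/merr"
)

func TestStatusRoundTrip(t *testing.T) {
	// Failure: the sentinel is recoverable from the status.
	status := merr.Status(merr.WrapErrServiceNotReady("initialization"))
	assert.ErrorIs(t, merr.Error(status), merr.ErrServiceNotReady)

	// Success: assumed to decode back to a nil error.
	assert.NoError(t, merr.Error(merr.Status(nil)))
}
```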
@@ -1241,32 +1241,22 @@ func getCollectionProgress(
 		CollectionIDs: []int64{collectionID},
 	})
 	if err != nil {
-		log.Warn("fail to show collections", zap.Int64("collection_id", collectionID), zap.Error(err))
+		log.Warn("fail to show collections",
+			zap.Int64("collectionID", collectionID),
+			zap.Error(err),
+		)
 		return
 	}
 
-	if resp.GetStatus().GetErrorCode() == commonpb.ErrorCode_InsufficientMemoryToLoad {
-		err = ErrInsufficientMemory
-		log.Warn("detected insufficientMemoryError when getCollectionProgress", zap.Int64("collection_id", collectionID), zap.String("reason", resp.GetStatus().GetReason()))
-		return
-	}
-
-	if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
-		err = merr.Error(resp.GetStatus())
-		log.Warn("fail to show collections", zap.Int64("collection_id", collectionID),
-			zap.String("reason", resp.GetStatus().GetReason()))
-		return
-	}
-
-	if len(resp.InMemoryPercentages) == 0 {
-		errMsg := "fail to show collections from the querycoord, no data"
-		err = errors.New(errMsg)
-		log.Warn(errMsg, zap.Int64("collection_id", collectionID))
+	err = merr.Error(resp.GetStatus())
+	if err != nil {
+		log.Warn("fail to show collections",
+			zap.Int64("collectionID", collectionID),
+			zap.Error(err))
 		return
 	}
 
 	loadProgress = resp.GetInMemoryPercentages()[0]
 
 	if len(resp.GetRefreshProgress()) > 0 { // Compatibility for new Proxy with old QueryCoord
 		refreshProgress = resp.GetRefreshProgress()[0]
 	}
@@ -1311,34 +1301,17 @@ func getPartitionProgress(
 			zap.Error(err))
 		return
 	}
-	if resp.GetStatus().GetErrorCode() == commonpb.ErrorCode_InsufficientMemoryToLoad {
-		err = ErrInsufficientMemory
-		log.Warn("detected insufficientMemoryError when getPartitionProgress",
-			zap.Int64("collection_id", collectionID),
-			zap.String("collection_name", collectionName),
-			zap.Strings("partition_names", partitionNames),
-			zap.String("reason", resp.GetStatus().GetReason()),
-		)
-		return
-	}
 
-	if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
-		err = merr.Error(resp.GetStatus())
+	err = merr.Error(resp.GetStatus())
+	if err != nil {
 		log.Warn("fail to show partitions",
-			zap.String("collection_name", collectionName),
-			zap.Strings("partition_names", partitionNames),
-			zap.String("reason", resp.GetStatus().GetReason()))
+			zap.String("collectionName", collectionName),
+			zap.Strings("partitionNames", partitionNames),
+			zap.Error(err))
 		return
 	}
 
-	if len(resp.InMemoryPercentages) != len(partitionIDs) {
-		errMsg := "fail to show partitions from the querycoord, invalid data num"
-		err = errors.New(errMsg)
-		log.Warn(errMsg, zap.Int64("collection_id", collectionID),
-			zap.String("collection_name", collectionName),
-			zap.Strings("partition_names", partitionNames))
-		return
-	}
 	for _, p := range resp.InMemoryPercentages {
 		loadProgress += p
 	}
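Both progress helpers collapse three hand-written checks (insufficient memory, non-success code, empty or mismatched payload) into one gate: err = merr.Error(resp.GetStatus()) is nil on success and the typed error otherwise, so every failure reaches the caller in a uniform shape. A sketch of that gate, under the same assumption:

```go
package main

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus/pkg/util/merr"
)

// checkStatus is the single gate the helpers adopt: success maps to nil,
// anything else maps back to an error callers can errors.Is against.
func checkStatus(status *commonpb.Status) error {
	return merr.Error(status)
}

func main() {
	bad := merr.Status(merr.WrapErrCollectionNotLoaded(int64(100)))
	fmt.Println(checkStatus(bad) != nil)              // true
	fmt.Println(checkStatus(merr.Status(nil)) == nil) // true
}
```

Note that the old empty-payload guard disappears with this change; the new code indexes GetInMemoryPercentages()[0] directly.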
@@ -104,7 +104,7 @@ func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectio
 			}, nil
 		}
 
-		err = fmt.Errorf("collection %d has not been loaded to memory or load failed", collectionID)
+		err = merr.WrapErrCollectionNotLoaded(collectionID)
 		log.Warn("show collection failed", zap.Error(err))
 		return &querypb.ShowCollectionsResponse{
 			Status: merr.Status(err),
@@ -162,10 +162,9 @@ func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions
 			}
 
-			msg := fmt.Sprintf("partition %d has not been loaded to memory or load failed", partitionID)
-			log.Warn(msg)
+			err = merr.WrapErrPartitionNotLoaded(partitionID)
+			log.Warn("show partitions failed", zap.Error(err))
 			return &querypb.ShowPartitionsResponse{
-				Status: merr.Status(errors.Wrap(err, msg)),
+				Status: merr.Status(err),
 			}, nil
 		}
 		percentages = append(percentages, int64(percentage))
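On the QueryCoord side, the not-loaded conditions are now produced with the same wrappers the proxy matches on, which is what lets the earlier GetLoadState hunks branch on sentinels across the RPC boundary. A compact end-to-end sketch:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/errors"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus/pkg/util/merr"
)

// QueryCoord side: pack the typed error into the response status.
func showCollectionsStatus(collectionID int64) *commonpb.Status {
	return merr.Status(merr.WrapErrCollectionNotLoaded(collectionID))
}

func main() {
	// Proxy side: recover the error and branch on the sentinel.
	err := merr.Error(showCollectionsStatus(100))
	fmt.Println(errors.Is(err, merr.ErrCollectionNotLoaded)) // expected: true
}
```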
@@ -419,21 +419,16 @@ func (m *importManager) isRowbased(files []string) (bool, error) {
 // importJob processes the import request, generates import tasks, sends these tasks to DataCoord, and returns
 // immediately.
 func (m *importManager) importJob(ctx context.Context, req *milvuspb.ImportRequest, cID int64, pID int64) *milvuspb.ImportResponse {
-	returnErrorFunc := func(reason string) *milvuspb.ImportResponse {
-		return &milvuspb.ImportResponse{
-			Status: &commonpb.Status{
-				ErrorCode: commonpb.ErrorCode_UnexpectedError,
-				Reason:    reason,
-			},
-		}
-	}
-
-	if req == nil || len(req.Files) == 0 {
-		return returnErrorFunc("import request is empty")
+	if len(req.GetFiles()) == 0 {
+		return &milvuspb.ImportResponse{
+			Status: merr.Status(merr.WrapErrParameterInvalidMsg("import request is empty")),
+		}
 	}
 
 	if m.callImportService == nil {
-		return returnErrorFunc("import service is not available")
+		return &milvuspb.ImportResponse{
+			Status: merr.Status(merr.WrapErrServiceUnavailable("import service unavailable")),
+		}
 	}
 
 	resp := &milvuspb.ImportResponse{
@@ -553,7 +548,9 @@ func (m *importManager) importJob(ctx context.Context, req *milvuspb.ImportReque
 		return nil
 	}()
 	if err != nil {
-		return returnErrorFunc(err.Error())
+		return &milvuspb.ImportResponse{
+			Status: merr.Status(err),
+		}
 	}
 	if sendOutTasksErr := m.sendOutTasks(ctx); sendOutTasksErr != nil {
 		log.Error("fail to send out tasks", zap.Error(sendOutTasksErr))
@@ -755,11 +752,8 @@ func (m *importManager) copyTaskInfo(input *datapb.ImportTaskInfo, output *milvu
 // getTaskState looks for task with the given ID and returns its import state.
 func (m *importManager) getTaskState(tID int64) *milvuspb.GetImportStateResponse {
 	resp := &milvuspb.GetImportStateResponse{
-		Status: &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_UnexpectedError,
-			Reason:    "import task id doesn't exist",
-		},
-		Infos: make([]*commonpb.KeyValuePair, 0),
+		Status: merr.Status(nil),
+		Infos:  make([]*commonpb.KeyValuePair, 0),
 	}
 	// (1) Search in pending tasks list.
 	found := false
@@ -786,24 +780,24 @@ func (m *importManager) getTaskState(tID int64) *milvuspb.GetImportStateResponse
 		return resp
 	}
 	// (3) Search in Etcd.
-	if v, err := m.taskStore.Load(BuildImportTaskKey(tID)); err == nil && v != "" {
-		ti := &datapb.ImportTaskInfo{}
-		if err := proto.Unmarshal([]byte(v), ti); err != nil {
-			log.Error("failed to unmarshal proto", zap.String("taskInfo", v), zap.Error(err))
-		} else {
-			m.copyTaskInfo(ti, resp)
-			found = true
-		}
-	} else {
-		log.Debug("get import task state failed", zap.Int64("taskID", tID))
-	}
-	if found {
-		log.Info("getting import task state", zap.Int64("task ID", tID), zap.Any("state", resp.State), zap.Int64s("segment", resp.SegmentIds))
-	}
-	return resp
-}
+	v, err := m.taskStore.Load(BuildImportTaskKey(tID))
+	if err != nil {
+		log.Warn("failed to load task info from Etcd",
+			zap.String("value", v),
+			zap.Error(err),
+		)
+		resp.Status = merr.Status(err)
+		return resp
+	}
+
+	ti := &datapb.ImportTaskInfo{}
+	if err := proto.Unmarshal([]byte(v), ti); err != nil {
+		log.Error("failed to unmarshal proto", zap.String("taskInfo", v), zap.Error(err))
+		resp.Status = merr.Status(err)
+		return resp
+	}
+
+	m.copyTaskInfo(ti, resp)
+	return resp
+}
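getTaskState above is straightened into guard clauses: resp starts from merr.Status(nil), success until proven otherwise, and each failure path overwrites the status and returns early. A sketch of the shape, with load as a hypothetical stand-in for the task-store lookup:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/errors"

	"github.com/milvus-io/milvus/pkg/util/merr"
)

// load is a hypothetical stand-in for m.taskStore.Load.
func load(key string) (string, error) {
	return "", errors.New("key not found")
}

func main() {
	status := merr.Status(nil) // success until proven otherwise
	if _, err := load("task/42"); err != nil {
		status = merr.Status(err) // a failure overwrites the status
	}
	fmt.Println(status.GetReason())
}
```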
@@ -1865,25 +1865,25 @@ func (c *Core) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvus
 		// Currently, Backup tool call import must with a partition name, each time restore a partition
 		if req.GetPartitionName() != "" {
 			if pID, err = c.meta.GetPartitionByName(cID, req.GetPartitionName(), typeutil.MaxTimestamp); err != nil {
-				log.Warn("failed to get partition ID from its name", zap.String("partition name", req.GetPartitionName()), zap.Error(err))
+				log.Warn("failed to get partition ID from its name", zap.String("partitionName", req.GetPartitionName()), zap.Error(err))
 				return &milvuspb.ImportResponse{
-					Status: merr.Status(merr.WrapBulkInsertPartitionNotFound(req.GetCollectionName(), req.GetPartitionName())),
+					Status: merr.Status(merr.WrapErrPartitionNotFound(req.GetPartitionName())),
 				}, nil
 			}
 		} else {
 			log.Info("partition name not specified when backup recovery",
 				zap.String("collectionName", req.GetCollectionName()))
 			return &milvuspb.ImportResponse{
-				Status: merr.Status(merr.WrapBadBulkInsertRequest("partition name not specified when backup")),
+				Status: merr.Status(merr.WrapErrParameterInvalidMsg("partition not specified")),
 			}, nil
 		}
 	} else {
 		if hasPartitionKey {
 			if req.GetPartitionName() != "" {
 				msg := "not allow to set partition name for collection with partition key"
-				log.Warn(msg, zap.String("collection name", req.GetCollectionName()))
+				log.Warn(msg, zap.String("collectionName", req.GetCollectionName()))
 				return &milvuspb.ImportResponse{
-					Status: merr.Status(merr.WrapBadBulkInsertRequest(msg)),
+					Status: merr.Status(merr.WrapErrParameterInvalidMsg(msg)),
 				}, nil
 			}
 		} else {
@@ -1895,7 +1895,7 @@ func (c *Core) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvus
 				zap.String("partition name", req.GetPartitionName()),
 				zap.Error(err))
 			return &milvuspb.ImportResponse{
-				Status: merr.Status(merr.WrapBulkInsertPartitionNotFound(req.GetCollectionName(), req.GetPartitionName())),
+				Status: merr.Status(merr.WrapErrPartitionNotFound(req.GetPartitionName())),
 			}, nil
 		}
 	}
@@ -1904,8 +1904,8 @@ func (c *Core) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvus
 	log.Info("RootCoord receive import request",
 		zap.String("collectionName", req.GetCollectionName()),
 		zap.Int64("collectionID", cID),
-		zap.String("partition name", req.GetPartitionName()),
-		zap.Strings("virtual channel names", req.GetChannelNames()),
+		zap.String("partitionName", req.GetPartitionName()),
+		zap.Strings("virtualChannelNames", req.GetChannelNames()),
 		zap.Int64("partitionID", pID),
 		zap.Int("# of files = ", len(req.GetFiles())),
 	)
@@ -1997,11 +1997,7 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (
 	// Upon receiving ReportImport request, update the related task's state in task store.
 	ti, err := c.importManager.updateTaskInfo(ir)
 	if err != nil {
-		return &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_UpdateImportTaskFailure,
-			Reason:    err.Error(),
-			Code:      merr.Code(err),
-		}, nil
+		return merr.Status(err), nil
 	}
 
 	// If task failed, send task to idle datanode
@@ -1057,7 +1057,7 @@ func TestCore_Import(t *testing.T) {
 			CollectionName: "a-good-name",
 		})
 		assert.NoError(t, err)
-		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrBulkInsertPartitionNotFound)
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrPartitionNotFound)
 	})
 
 	t.Run("normal case", func(t *testing.T) {
@@ -1101,7 +1101,7 @@ func TestCore_Import(t *testing.T) {
 			},
 		})
 		assert.NotNil(t, resp)
-		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrBadBulkInsertRequest)
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrParameterInvalid)
 	})
 
 	// Remove the following case after bulkinsert can support partition key
@@ -1159,7 +1159,7 @@ func TestCore_Import(t *testing.T) {
 			PartitionName: "p1",
 		})
 		assert.NoError(t, err)
-		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrBadBulkInsertRequest)
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrParameterInvalid)
 	})
 
 	t.Run("backup should set partition name", func(t *testing.T) {
@@ -1201,7 +1201,7 @@ func TestCore_Import(t *testing.T) {
 			},
 		})
 		assert.NoError(t, err)
-		assert.ErrorIs(t, merr.Error(resp1.GetStatus()), merr.ErrBadBulkInsertRequest)
+		assert.ErrorIs(t, merr.Error(resp1.GetStatus()), merr.ErrParameterInvalid)
 
 		meta.GetPartitionByNameFunc = func(collID UniqueID, partitionName string, ts Timestamp) (UniqueID, error) {
 			return common.InvalidPartitionID, fmt.Errorf("partition ID not found for partition name '%s'", partitionName)
@@ -1217,7 +1217,7 @@ func TestCore_Import(t *testing.T) {
 			},
 		})
 		assert.NoError(t, err)
-		assert.ErrorIs(t, merr.Error(resp2.GetStatus()), merr.ErrBulkInsertPartitionNotFound)
+		assert.ErrorIs(t, merr.Error(resp2.GetStatus()), merr.ErrPartitionNotFound)
 	})
 }
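ReportImport's hand-built status set ErrorCode, Reason, and the numeric Code separately; merr.Status(err) is assumed to populate those same fields from the error in one call, which is what makes the one-line replacement safe. A sketch of that equivalence:

```go
package main

import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/merr"
)

func main() {
	err := merr.WrapErrPartitionNotFound("p1")
	status := merr.Status(err)

	// Assumed: the status carries the error's numeric code and reason,
	// matching what the deleted struct literal filled in by hand.
	fmt.Println(status.GetCode() == merr.Code(err), status.GetReason())
}
```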
@@ -117,10 +117,6 @@ var (
 	ErrInvalidSearchResult = newMilvusError("fail to parse search result", 1805, false)
 	ErrCheckPrimaryKey     = newMilvusError("please check the primary key and its' type can only in [int, string]", 1806, false)
 
-	// bulkinsert related
-	ErrBadBulkInsertRequest        = newMilvusError("bad bulkinsert request", 1900, false)
-	ErrBulkInsertPartitionNotFound = newMilvusError("partition not found during bulkinsert", 1901, false)
-
 	// Segcore related
 	ErrSegcore = newMilvusError("segcore error", 2000, false)
 
@@ -133,10 +133,6 @@ func (s *ErrSuite) TestWrap() {
 
 	// field related
 	s.ErrorIs(WrapErrFieldNotFound("meta", "failed to get field"), ErrFieldNotFound)
-
-	// bulkinsert related
-	s.ErrorIs(WrapBadBulkInsertRequest("fail reason"), ErrBadBulkInsertRequest)
-	s.ErrorIs(WrapBulkInsertPartitionNotFound("hello_milvus", "notexist"), ErrBulkInsertPartitionNotFound)
 }
 
 func (s *ErrSuite) TestOldCode() {
@@ -639,11 +639,3 @@ func WrapErrFieldNotFound[T any](field T, msg ...string) error {
 func wrapWithField(err error, name string, value any) error {
 	return errors.Wrapf(err, "%s=%v", name, value)
 }
-
-func WrapBadBulkInsertRequest(msg ...string) error {
-	return errors.Wrap(ErrBadBulkInsertRequest, strings.Join(msg, "; "))
-}
-
-func WrapBulkInsertPartitionNotFound(collection any, partition any) error {
-	return errors.Wrapf(ErrBulkInsertPartitionNotFound, "collection=%s, partition=%s", collection, partition)
-}
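With the bulkinsert-specific entries gone from merr's table, former callers map onto the general sentinels, as the rootcoord hunks above already show. A migration sketch, assuming WrapErrParameterInvalidMsg formats its message onto ErrParameterInvalid:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/errors"

	"github.com/milvus-io/milvus/pkg/util/merr"
)

func main() {
	// WrapBadBulkInsertRequest(msg)         -> WrapErrParameterInvalidMsg(msg)
	// WrapBulkInsertPartitionNotFound(c, p) -> WrapErrPartitionNotFound(p)
	err1 := merr.WrapErrParameterInvalidMsg("partition name not specified when backup")
	err2 := merr.WrapErrPartitionNotFound("notexist")

	fmt.Println(errors.Is(err1, merr.ErrParameterInvalid))  // expected: true
	fmt.Println(errors.Is(err2, merr.ErrPartitionNotFound)) // expected: true
}
```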