mirror of https://github.com/milvus-io/milvus.git
enhance: delete should inc ProxyFunctionCall metric failed label when run failed instead abandon (#29621)
Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>pull/30083/head
parent
57bd3e2181
commit
04e21564b6
|
@ -2424,7 +2424,7 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
|
|||
if err := dr.Run(ctx); err != nil {
|
||||
log.Error("Failed to enqueue delete task: " + err.Error())
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
|
||||
metrics.AbandonLabel).Inc()
|
||||
metrics.FailLabel).Inc()
|
||||
|
||||
return &milvuspb.MutationResult{
|
||||
Status: merr.Status(err),
|
||||
|
|
|
@ -19,6 +19,7 @@ package proxy
|
|||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -32,6 +33,8 @@ import (
|
|||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/allocator"
|
||||
"github.com/milvus-io/milvus/internal/mocks"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/proxypb"
|
||||
|
@ -39,6 +42,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
|
||||
"github.com/milvus-io/milvus/internal/util/dependency"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
|
||||
|
@ -1120,6 +1124,83 @@ func TestProxy_AllocTimestamp(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestProxy_Delete(t *testing.T) {
|
||||
collectionName := "test_delete"
|
||||
collectionID := int64(111)
|
||||
partitionName := "default"
|
||||
partitionID := int64(222)
|
||||
channels := []string{"test_vchannel"}
|
||||
dbName := "test_1"
|
||||
collSchema := &schemapb.CollectionSchema{
|
||||
Name: collectionName,
|
||||
Description: "",
|
||||
AutoID: false,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: common.StartOfUserFieldID,
|
||||
Name: "pk",
|
||||
IsPrimaryKey: true,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
FieldID: common.StartOfUserFieldID + 1,
|
||||
Name: "non_pk",
|
||||
IsPrimaryKey: false,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
},
|
||||
}
|
||||
schema := newSchemaInfo(collSchema)
|
||||
paramtable.Init()
|
||||
|
||||
t.Run("delete run failed", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
chMgr := NewMockChannelsMgr(t)
|
||||
|
||||
req := &milvuspb.DeleteRequest{
|
||||
CollectionName: collectionName,
|
||||
DbName: dbName,
|
||||
PartitionName: partitionName,
|
||||
Expr: "pk in [1, 2, 3]",
|
||||
}
|
||||
cache := NewMockCache(t)
|
||||
cache.On("GetCollectionID",
|
||||
mock.Anything, // context.Context
|
||||
mock.AnythingOfType("string"),
|
||||
mock.AnythingOfType("string"),
|
||||
).Return(collectionID, nil)
|
||||
cache.On("GetCollectionSchema",
|
||||
mock.Anything, // context.Context
|
||||
mock.AnythingOfType("string"),
|
||||
mock.AnythingOfType("string"),
|
||||
).Return(schema, nil)
|
||||
cache.On("GetPartitionID",
|
||||
mock.Anything, // context.Context
|
||||
mock.AnythingOfType("string"),
|
||||
mock.AnythingOfType("string"),
|
||||
mock.AnythingOfType("string"),
|
||||
).Return(partitionID, nil)
|
||||
chMgr.On("getVChannels", mock.Anything).Return(channels, nil)
|
||||
chMgr.On("getChannels", mock.Anything).Return(nil, fmt.Errorf("mock error"))
|
||||
globalMetaCache = cache
|
||||
rc := mocks.NewMockRootCoordClient(t)
|
||||
tsoAllocator := &mockTsoAllocator{}
|
||||
idAllocator, err := allocator.NewIDAllocator(ctx, rc, 0)
|
||||
assert.NoError(t, err)
|
||||
|
||||
queue, err := newTaskScheduler(ctx, tsoAllocator, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
node := &Proxy{chMgr: chMgr, rowIDAllocator: idAllocator, sched: queue}
|
||||
node.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||
resp, err := node.Delete(ctx, req)
|
||||
assert.NoError(t, err)
|
||||
assert.Error(t, merr.Error(resp.GetStatus()))
|
||||
})
|
||||
}
|
||||
|
||||
func TestProxy_ReplicateMessage(t *testing.T) {
|
||||
paramtable.Init()
|
||||
defer paramtable.Get().Save(paramtable.Get().CommonCfg.TTMsgEnabled.Key, "true")
|
||||
|
|
Loading…
Reference in New Issue