mirror of https://github.com/milvus-io/milvus.git
Clear error message in the delete request (#26656)
Signed-off-by: SimFG <bang.fu@zilliz.com>pull/26712/head
parent
1b1bafaff1
commit
9311dc91ee
|
@ -164,39 +164,35 @@ func (dt *deleteTask) PreExecute(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
log := log.Ctx(ctx)
|
log := log.Ctx(ctx)
|
||||||
|
|
||||||
collName := dt.deleteMsg.CollectionName
|
collName := dt.deleteMsg.CollectionName
|
||||||
if err := validateCollectionName(collName); err != nil {
|
if err := validateCollectionName(collName); err != nil {
|
||||||
log.Warn("Invalid collection name", zap.Error(err))
|
return ErrWithLog(log, "Invalid collection name", err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
collID, err := globalMetaCache.GetCollectionID(ctx, dt.deleteMsg.GetDbName(), collName)
|
collID, err := globalMetaCache.GetCollectionID(ctx, dt.deleteMsg.GetDbName(), collName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Failed to get collection id", zap.Error(err))
|
return ErrWithLog(log, "Failed to get collection id", err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
dt.deleteMsg.CollectionID = collID
|
dt.deleteMsg.CollectionID = collID
|
||||||
dt.collectionID = collID
|
dt.collectionID = collID
|
||||||
|
|
||||||
partitionKeyMode, err := isPartitionKeyMode(ctx, dt.deleteMsg.GetDbName(), dt.deleteMsg.CollectionName)
|
partitionKeyMode, err := isPartitionKeyMode(ctx, dt.deleteMsg.GetDbName(), dt.deleteMsg.CollectionName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Failed to get partition key mode", zap.Error(err))
|
return ErrWithLog(log, "Failed to get partition key mode", err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
if partitionKeyMode && len(dt.deleteMsg.PartitionName) != 0 {
|
if partitionKeyMode && len(dt.deleteMsg.PartitionName) != 0 {
|
||||||
return errors.New("not support manually specifying the partition names if partition key mode is used")
|
return ErrWithLog(log, "", errors.New("not support manually specifying the partition names if partition key mode is used"))
|
||||||
}
|
}
|
||||||
|
|
||||||
// If partitionName is not empty, partitionID will be set.
|
// If partitionName is not empty, partitionID will be set.
|
||||||
if len(dt.deleteMsg.PartitionName) > 0 {
|
if len(dt.deleteMsg.PartitionName) > 0 {
|
||||||
partName := dt.deleteMsg.PartitionName
|
partName := dt.deleteMsg.PartitionName
|
||||||
if err := validatePartitionTag(partName, true); err != nil {
|
if err := validatePartitionTag(partName, true); err != nil {
|
||||||
log.Info("Invalid partition name", zap.Error(err))
|
return ErrWithLog(log, "Invalid partition name", err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
partID, err := globalMetaCache.GetPartitionID(ctx, dt.deleteMsg.GetDbName(), collName, partName)
|
partID, err := globalMetaCache.GetPartitionID(ctx, dt.deleteMsg.GetDbName(), collName, partName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Info("Failed to get partition id", zap.Error(err))
|
return ErrWithLog(log, "Failed to get partition id", err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
dt.deleteMsg.PartitionID = partID
|
dt.deleteMsg.PartitionID = partID
|
||||||
} else {
|
} else {
|
||||||
|
@ -205,16 +201,14 @@ func (dt *deleteTask) PreExecute(ctx context.Context) error {
|
||||||
|
|
||||||
schema, err := globalMetaCache.GetCollectionSchema(ctx, dt.deleteMsg.GetDbName(), collName)
|
schema, err := globalMetaCache.GetCollectionSchema(ctx, dt.deleteMsg.GetDbName(), collName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Info("Failed to get collection schema", zap.Error(err))
|
return ErrWithLog(log, "Failed to get collection schema", err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
dt.schema = schema
|
dt.schema = schema
|
||||||
|
|
||||||
// get delete.primaryKeys from delete expr
|
// get delete.primaryKeys from delete expr
|
||||||
primaryKeys, numRow, err := getPrimaryKeysFromExpr(schema, dt.deleteExpr)
|
primaryKeys, numRow, err := getPrimaryKeysFromExpr(schema, dt.deleteExpr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Info("Failed to get primary keys from expr", zap.Error(err))
|
return ErrWithLog(log, "Failed to get primary keys from expr", err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dt.deleteMsg.NumRows = numRow
|
dt.deleteMsg.NumRows = numRow
|
||||||
|
|
|
@ -5,6 +5,10 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||||
|
|
||||||
|
"github.com/cockroachdb/errors"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
|
|
||||||
|
@ -81,4 +85,159 @@ func TestDeleteTask(t *testing.T) {
|
||||||
resChannels = dt.getChannels()
|
resChannels = dt.getChannels()
|
||||||
assert.ElementsMatch(t, channels, resChannels)
|
assert.ElementsMatch(t, channels, resChannels)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("empty collection name", func(t *testing.T) {
|
||||||
|
dt := deleteTask{
|
||||||
|
deleteMsg: &BaseDeleteTask{
|
||||||
|
DeleteRequest: msgpb.DeleteRequest{
|
||||||
|
Base: &commonpb.MsgBase{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
assert.Error(t, dt.PreExecute(context.Background()))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("fail to get collection id", func(t *testing.T) {
|
||||||
|
dt := deleteTask{deleteMsg: &BaseDeleteTask{
|
||||||
|
DeleteRequest: msgpb.DeleteRequest{
|
||||||
|
Base: &commonpb.MsgBase{},
|
||||||
|
CollectionName: "foo",
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
cache := NewMockCache(t)
|
||||||
|
cache.On("GetCollectionID",
|
||||||
|
mock.Anything, // context.Context
|
||||||
|
mock.AnythingOfType("string"),
|
||||||
|
mock.AnythingOfType("string"),
|
||||||
|
).Return(int64(0), errors.New("mock GetCollectionID err"))
|
||||||
|
globalMetaCache = cache
|
||||||
|
assert.Error(t, dt.PreExecute(context.Background()))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("fail partition key mode", func(t *testing.T) {
|
||||||
|
dt := deleteTask{deleteMsg: &BaseDeleteTask{
|
||||||
|
DeleteRequest: msgpb.DeleteRequest{
|
||||||
|
Base: &commonpb.MsgBase{},
|
||||||
|
CollectionName: "foo",
|
||||||
|
DbName: "db_1",
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
cache := NewMockCache(t)
|
||||||
|
cache.On("GetCollectionID",
|
||||||
|
mock.Anything, // context.Context
|
||||||
|
mock.AnythingOfType("string"),
|
||||||
|
mock.AnythingOfType("string"),
|
||||||
|
).Return(int64(10000), nil)
|
||||||
|
cache.On("GetCollectionSchema",
|
||||||
|
mock.Anything, // context.Context
|
||||||
|
mock.AnythingOfType("string"),
|
||||||
|
mock.AnythingOfType("string"),
|
||||||
|
).Return(nil, errors.New("mock GetCollectionSchema err"))
|
||||||
|
|
||||||
|
globalMetaCache = cache
|
||||||
|
assert.Error(t, dt.PreExecute(context.Background()))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("invalid partition name", func(t *testing.T) {
|
||||||
|
dt := deleteTask{deleteMsg: &BaseDeleteTask{
|
||||||
|
DeleteRequest: msgpb.DeleteRequest{
|
||||||
|
Base: &commonpb.MsgBase{},
|
||||||
|
CollectionName: "foo",
|
||||||
|
DbName: "db_1",
|
||||||
|
PartitionName: "aaa",
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
cache := NewMockCache(t)
|
||||||
|
cache.On("GetCollectionID",
|
||||||
|
mock.Anything, // context.Context
|
||||||
|
mock.AnythingOfType("string"),
|
||||||
|
mock.AnythingOfType("string"),
|
||||||
|
).Return(int64(10000), nil)
|
||||||
|
cache.On("GetCollectionSchema",
|
||||||
|
mock.Anything, // context.Context
|
||||||
|
mock.AnythingOfType("string"),
|
||||||
|
mock.AnythingOfType("string"),
|
||||||
|
).Return(&schemapb.CollectionSchema{
|
||||||
|
Name: "test_delete",
|
||||||
|
Description: "",
|
||||||
|
AutoID: false,
|
||||||
|
Fields: []*schemapb.FieldSchema{
|
||||||
|
{
|
||||||
|
FieldID: common.StartOfUserFieldID,
|
||||||
|
Name: "pk",
|
||||||
|
IsPrimaryKey: true,
|
||||||
|
DataType: schemapb.DataType_Int64,
|
||||||
|
IsPartitionKey: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, nil)
|
||||||
|
|
||||||
|
globalMetaCache = cache
|
||||||
|
assert.Error(t, dt.PreExecute(context.Background()))
|
||||||
|
})
|
||||||
|
|
||||||
|
schema := &schemapb.CollectionSchema{
|
||||||
|
Name: "test_delete",
|
||||||
|
Description: "",
|
||||||
|
AutoID: false,
|
||||||
|
Fields: []*schemapb.FieldSchema{
|
||||||
|
{
|
||||||
|
FieldID: common.StartOfUserFieldID,
|
||||||
|
Name: "pk",
|
||||||
|
IsPrimaryKey: true,
|
||||||
|
DataType: schemapb.DataType_Int64,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
FieldID: common.StartOfUserFieldID + 1,
|
||||||
|
Name: "non_pk",
|
||||||
|
IsPrimaryKey: false,
|
||||||
|
DataType: schemapb.DataType_Int64,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("invalie partition", func(t *testing.T) {
|
||||||
|
dt := deleteTask{
|
||||||
|
deleteMsg: &BaseDeleteTask{
|
||||||
|
DeleteRequest: msgpb.DeleteRequest{
|
||||||
|
Base: &commonpb.MsgBase{},
|
||||||
|
CollectionName: "foo",
|
||||||
|
DbName: "db_1",
|
||||||
|
PartitionName: "_aaa",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
deleteExpr: "non_pk in [1, 2, 3]",
|
||||||
|
}
|
||||||
|
cache := NewMockCache(t)
|
||||||
|
cache.On("GetCollectionID",
|
||||||
|
mock.Anything, // context.Context
|
||||||
|
mock.AnythingOfType("string"),
|
||||||
|
mock.AnythingOfType("string"),
|
||||||
|
).Return(int64(10000), nil)
|
||||||
|
cache.On("GetCollectionSchema",
|
||||||
|
mock.Anything, // context.Context
|
||||||
|
mock.AnythingOfType("string"),
|
||||||
|
mock.AnythingOfType("string"),
|
||||||
|
).Return(schema, nil)
|
||||||
|
cache.On("GetPartitionID",
|
||||||
|
mock.Anything, // context.Context
|
||||||
|
mock.AnythingOfType("string"),
|
||||||
|
mock.AnythingOfType("string"),
|
||||||
|
mock.AnythingOfType("string"),
|
||||||
|
).Return(int64(0), errors.New("mock GetPartitionID err"))
|
||||||
|
|
||||||
|
globalMetaCache = cache
|
||||||
|
assert.Error(t, dt.PreExecute(context.Background()))
|
||||||
|
|
||||||
|
dt.deleteMsg.PartitionName = "aaa"
|
||||||
|
assert.Error(t, dt.PreExecute(context.Background()))
|
||||||
|
|
||||||
|
cache.On("GetPartitionID",
|
||||||
|
mock.Anything, // context.Context
|
||||||
|
mock.AnythingOfType("string"),
|
||||||
|
mock.AnythingOfType("string"),
|
||||||
|
mock.AnythingOfType("string"),
|
||||||
|
).Return(int64(100001), nil)
|
||||||
|
assert.Error(t, dt.PreExecute(context.Background()))
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -1393,6 +1393,16 @@ func memsetLoop[T any](v T, numRows int) []T {
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ErrWithLog(logger *log.MLogger, msg string, err error) error {
|
||||||
|
wrapErr := errors.Wrap(err, msg)
|
||||||
|
if logger != nil {
|
||||||
|
logger.Warn(msg, zap.Error(err))
|
||||||
|
return wrapErr
|
||||||
|
}
|
||||||
|
log.Warn(msg, zap.Error(err))
|
||||||
|
return wrapErr
|
||||||
|
}
|
||||||
|
|
||||||
func verifyDynamicFieldData(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg) error {
|
func verifyDynamicFieldData(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg) error {
|
||||||
for _, field := range insertMsg.FieldsData {
|
for _, field := range insertMsg.FieldsData {
|
||||||
if field.GetFieldName() == common.MetaFieldName {
|
if field.GetFieldName() == common.MetaFieldName {
|
||||||
|
|
|
@ -25,6 +25,8 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
|
|
||||||
"github.com/cockroachdb/errors"
|
"github.com/cockroachdb/errors"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
|
@ -1814,6 +1816,12 @@ func Test_GetPartitionProgressFailed(t *testing.T) {
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestErrWithLog(t *testing.T) {
|
||||||
|
err := errors.New("test")
|
||||||
|
assert.ErrorIs(t, ErrWithLog(nil, "foo", err), err)
|
||||||
|
assert.ErrorIs(t, ErrWithLog(log.Ctx(context.Background()), "foo", err), err)
|
||||||
|
}
|
||||||
|
|
||||||
func Test_CheckDynamicFieldData(t *testing.T) {
|
func Test_CheckDynamicFieldData(t *testing.T) {
|
||||||
t.Run("normal case", func(t *testing.T) {
|
t.Run("normal case", func(t *testing.T) {
|
||||||
jsonData := make([][]byte, 0)
|
jsonData := make([][]byte, 0)
|
||||||
|
|
Loading…
Reference in New Issue