mirror of https://github.com/milvus-io/milvus.git

commit a76e3b2813 (parent 7c7003bff6, branch pull/29512/head)

Refine delete by expression to prevent the proxy DML task scheduler from hanging (#29340)

Related issue: https://github.com/milvus-io/milvus/issues/29146
Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
go.mod (2 changed lines)

@@ -23,7 +23,7 @@ require (
 	github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
 	github.com/klauspost/compress v1.16.7
 	github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
-	github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c
+	github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231220103033-abd0d12ba669
 	github.com/minio/minio-go/v7 v7.0.61
 	github.com/prometheus/client_golang v1.14.0
 	github.com/prometheus/client_model v0.3.0
go.sum (4 changed lines)

@@ -583,8 +583,8 @@ github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/le
 github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
 github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
 github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
-github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c h1:Wbc2IZt/13+B5jc8JPU/dOxGYy+1jeOsChVgcza+qgw=
-github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
+github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231220103033-abd0d12ba669 h1:yUtc+pVKVhmmnwTY9iyV8+EmhrNjZ74Hxm3y5QKCNyg=
+github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231220103033-abd0d12ba669/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
 github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092 h1:UYJ7JB+QlMOoFHNdd8mUa3/lV63t9dnBX7ILXmEEWPY=
 github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho=
 github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
New file (142 added lines): a mockery-generated MockAllocator for the allocator Interface.

@@ -0,0 +1,142 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.

package allocator

import mock "github.com/stretchr/testify/mock"

// MockAllocator is an autogenerated mock type for the Interface type
type MockAllocator struct {
    mock.Mock
}

type MockAllocator_Expecter struct {
    mock *mock.Mock
}

func (_m *MockAllocator) EXPECT() *MockAllocator_Expecter {
    return &MockAllocator_Expecter{mock: &_m.Mock}
}

// Alloc provides a mock function with given fields: count
func (_m *MockAllocator) Alloc(count uint32) (int64, int64, error) {
    ret := _m.Called(count)

    var r0 int64
    var r1 int64
    var r2 error
    if rf, ok := ret.Get(0).(func(uint32) (int64, int64, error)); ok {
        return rf(count)
    }
    if rf, ok := ret.Get(0).(func(uint32) int64); ok {
        r0 = rf(count)
    } else {
        r0 = ret.Get(0).(int64)
    }

    if rf, ok := ret.Get(1).(func(uint32) int64); ok {
        r1 = rf(count)
    } else {
        r1 = ret.Get(1).(int64)
    }

    if rf, ok := ret.Get(2).(func(uint32) error); ok {
        r2 = rf(count)
    } else {
        r2 = ret.Error(2)
    }

    return r0, r1, r2
}

// MockAllocator_Alloc_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Alloc'
type MockAllocator_Alloc_Call struct {
    *mock.Call
}

// Alloc is a helper method to define mock.On call
//   - count uint32
func (_e *MockAllocator_Expecter) Alloc(count interface{}) *MockAllocator_Alloc_Call {
    return &MockAllocator_Alloc_Call{Call: _e.mock.On("Alloc", count)}
}

func (_c *MockAllocator_Alloc_Call) Run(run func(count uint32)) *MockAllocator_Alloc_Call {
    _c.Call.Run(func(args mock.Arguments) {
        run(args[0].(uint32))
    })
    return _c
}

func (_c *MockAllocator_Alloc_Call) Return(_a0 int64, _a1 int64, _a2 error) *MockAllocator_Alloc_Call {
    _c.Call.Return(_a0, _a1, _a2)
    return _c
}

func (_c *MockAllocator_Alloc_Call) RunAndReturn(run func(uint32) (int64, int64, error)) *MockAllocator_Alloc_Call {
    _c.Call.Return(run)
    return _c
}

// AllocOne provides a mock function with given fields:
func (_m *MockAllocator) AllocOne() (int64, error) {
    ret := _m.Called()

    var r0 int64
    var r1 error
    if rf, ok := ret.Get(0).(func() (int64, error)); ok {
        return rf()
    }
    if rf, ok := ret.Get(0).(func() int64); ok {
        r0 = rf()
    } else {
        r0 = ret.Get(0).(int64)
    }

    if rf, ok := ret.Get(1).(func() error); ok {
        r1 = rf()
    } else {
        r1 = ret.Error(1)
    }

    return r0, r1
}

// MockAllocator_AllocOne_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocOne'
type MockAllocator_AllocOne_Call struct {
    *mock.Call
}

// AllocOne is a helper method to define mock.On call
func (_e *MockAllocator_Expecter) AllocOne() *MockAllocator_AllocOne_Call {
    return &MockAllocator_AllocOne_Call{Call: _e.mock.On("AllocOne")}
}

func (_c *MockAllocator_AllocOne_Call) Run(run func()) *MockAllocator_AllocOne_Call {
    _c.Call.Run(func(args mock.Arguments) {
        run()
    })
    return _c
}

func (_c *MockAllocator_AllocOne_Call) Return(_a0 int64, _a1 error) *MockAllocator_AllocOne_Call {
    _c.Call.Return(_a0, _a1)
    return _c
}

func (_c *MockAllocator_AllocOne_Call) RunAndReturn(run func() (int64, error)) *MockAllocator_AllocOne_Call {
    _c.Call.Return(run)
    return _c
}

// NewMockAllocator creates a new instance of MockAllocator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockAllocator(t interface {
    mock.TestingT
    Cleanup(func())
}) *MockAllocator {
    mock := &MockAllocator{}
    mock.Mock.Test(t)

    t.Cleanup(func() { mock.AssertExpectations(t) })

    return mock
}
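For orientation, a minimal sketch of how a test might drive this generated mock. The test name, values, and the use of testify's assert package are illustrative assumptions; only NewMockAllocator, EXPECT().Alloc, and EXPECT().AllocOne come from the generated code above.

package allocator

import (
    "testing"

    "github.com/stretchr/testify/assert"
)

// TestMockAllocatorSketch (hypothetical) stubs Alloc/AllocOne, calls them, and
// relies on the t.Cleanup hook registered by NewMockAllocator to assert that
// every declared expectation was met.
func TestMockAllocatorSketch(t *testing.T) {
    alloc := NewMockAllocator(t)

    // Alloc(count) is stubbed to return an ID range plus a nil error.
    alloc.EXPECT().Alloc(uint32(10)).Return(int64(100), int64(110), nil)
    // AllocOne() is stubbed to return a single ID.
    alloc.EXPECT().AllocOne().Return(int64(111), nil)

    begin, end, err := alloc.Alloc(10)
    assert.NoError(t, err)
    assert.Equal(t, int64(100), begin)
    assert.Equal(t, int64(110), end)

    one, err := alloc.AllocOne()
    assert.NoError(t, err)
    assert.Equal(t, int64(111), one)
}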
@@ -2397,20 +2397,19 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
 	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
 		metrics.TotalLabel).Inc()
-	dt := &deleteTask{
-		ctx:         ctx,
-		Condition:   NewTaskCondition(ctx),
-		req:         request,
-		idAllocator: node.rowIDAllocator,
-		chMgr:       node.chMgr,
-		chTicker:    node.chTicker,
-		lb:          node.lbPolicy,
+	dr := &deleteRunner{
+		req:             request,
+		idAllocator:     node.rowIDAllocator,
+		tsoAllocatorIns: node.tsoAllocator,
+		chMgr:           node.chMgr,
+		chTicker:        node.chTicker,
+		queue:           node.sched.dmQueue,
+		lb:              node.lbPolicy,
 	}

-	log.Debug("Enqueue delete request in Proxy")
-
-	// MsgID will be set by Enqueue()
-	if err := node.sched.dmQueue.Enqueue(dt); err != nil {
+	log.Debug("init delete runner in Proxy")
+	if err := dr.Init(ctx); err != nil {
 		log.Error("Failed to enqueue delete task: " + err.Error())
 		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
 			metrics.AbandonLabel).Inc()

@@ -2420,25 +2419,26 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
 		}, nil
 	}

-	log.Debug("Detail of delete request in Proxy")
+	log.Debug("Run delete in Proxy")

-	if err := dt.WaitToFinish(); err != nil {
-		log.Error("Failed to execute delete task in task scheduler: " + err.Error())
+	if err := dr.Run(ctx); err != nil {
+		log.Error("Failed to enqueue delete task: " + err.Error())
 		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
-			metrics.FailLabel).Inc()
+			metrics.AbandonLabel).Inc()

 		return &milvuspb.MutationResult{
 			Status: merr.Status(err),
 		}, nil
 	}

-	receiveSize := proto.Size(dt.req)
+	receiveSize := proto.Size(dr.req)
 	rateCol.Add(internalpb.RateType_DMLDelete.String(), float64(receiveSize))

 	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
 		metrics.SuccessLabel).Inc()
 	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
 	metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
-	return dt.result, nil
+	return dr.result, nil
 }

 // Upsert upsert records into collection.
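The proxy-side change above is the core of the fix: the old code enqueued one large deleteTask into node.sched.dmQueue and waited for it, so a delete whose expression required a long streaming query could tie up the DML task scheduler; the new code runs a deleteRunner outside the queue and only enqueues small, produce-only tasks per result batch. The following self-contained Go sketch (not Milvus code; the toy queue and all names are assumptions) illustrates that general pattern of keeping long-running work out of a shared task queue.

package main

import (
    "fmt"
    "time"
)

// A toy single-worker task queue: every enqueued job runs on one goroutine,
// so one long-running job delays every job queued behind it.
type queue struct{ jobs chan func() }

func newQueue() *queue {
    q := &queue{jobs: make(chan func(), 16)}
    go func() {
        for job := range q.jobs {
            job()
        }
    }()
    return q
}

func (q *queue) enqueue(job func()) { q.jobs <- job }

func main() {
    q := newQueue()

    // Anti-pattern (roughly what the old deleteTask did): run the slow,
    // streaming part of the work *inside* the queued job, blocking the worker:
    //   q.enqueue(func() { streamAllResultsAndDelete() }) // hypothetical helper
    //
    // Pattern used by the refactor: do the slow streaming outside the queue
    // and enqueue only a small, quick job per batch of results.
    batches := []string{"batch-1", "batch-2", "batch-3"} // stand-in for streamed query results
    done := make(chan struct{})
    for _, b := range batches {
        b := b
        time.Sleep(10 * time.Millisecond) // pretend to wait for the next streamed batch
        q.enqueue(func() {                // short-lived job, comparable to one produce-only deleteTask
            fmt.Println("produced delete messages for", b)
            done <- struct{}{}
        })
    }
    for range batches {
        <-done
    }
}

With this split, the shared queue only ever sees short-lived jobs, so other DML requests queued behind a complex delete are not starved while the query stream is being drained.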
@@ -8,6 +8,7 @@ import (
 	"github.com/cockroachdb/errors"
 	"github.com/golang/protobuf/proto"
 	"go.opentelemetry.io/otel"
+	"go.uber.org/atomic"
 	"go.uber.org/zap"

 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"

@@ -37,8 +38,7 @@ type deleteTask struct {
 	ctx context.Context
 	tr  *timerecord.TimeRecorder

-	req    *milvuspb.DeleteRequest
-	result *milvuspb.MutationResult
+	req *milvuspb.DeleteRequest

 	// channel
 	chMgr channelsMgr

@@ -46,17 +46,20 @@ type deleteTask struct {
 	pChannels []pChan
 	vChannels []vChan

-	idAllocator *allocator.IDAllocator
-	lb          LBPolicy
+	idAllocator allocator.Interface

 	// delete info
 	schema           *schemapb.CollectionSchema
-	ts               Timestamp
-	msgID            UniqueID
+	primaryKeys      *schemapb.IDs
 	collectionID     UniqueID
 	partitionID      UniqueID
-	count            int
 	partitionKeyMode bool
+
+	// set by scheduler
+	ts    Timestamp
+	msgID UniqueID
+
+	// result
+	count int64
 }

 func (dt *deleteTask) TraceCtx() context.Context {
@ -112,115 +115,14 @@ func (dt *deleteTask) getChannels() []pChan {
|
|||
return dt.pChannels
|
||||
}
|
||||
|
||||
func getExpr(plan *planpb.PlanNode) (bool, *planpb.Expr_TermExpr) {
|
||||
// simple delete request need expr with "pk in [a, b]"
|
||||
termExpr, ok := plan.Node.(*planpb.PlanNode_Query).Query.Predicates.Expr.(*planpb.Expr_TermExpr)
|
||||
if !ok {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if !termExpr.TermExpr.GetColumnInfo().GetIsPrimaryKey() {
|
||||
return false, nil
|
||||
}
|
||||
return true, termExpr
|
||||
}
|
||||
|
||||
func getPrimaryKeysFromExpr(schema *schemapb.CollectionSchema, termExpr *planpb.Expr_TermExpr) (res *schemapb.IDs, rowNum int64, err error) {
|
||||
res = &schemapb.IDs{}
|
||||
rowNum = int64(len(termExpr.TermExpr.Values))
|
||||
switch termExpr.TermExpr.ColumnInfo.GetDataType() {
|
||||
case schemapb.DataType_Int64:
|
||||
ids := make([]int64, 0)
|
||||
for _, v := range termExpr.TermExpr.Values {
|
||||
ids = append(ids, v.GetInt64Val())
|
||||
}
|
||||
res.IdField = &schemapb.IDs_IntId{
|
||||
IntId: &schemapb.LongArray{
|
||||
Data: ids,
|
||||
},
|
||||
}
|
||||
case schemapb.DataType_VarChar:
|
||||
ids := make([]string, 0)
|
||||
for _, v := range termExpr.TermExpr.Values {
|
||||
ids = append(ids, v.GetStringVal())
|
||||
}
|
||||
res.IdField = &schemapb.IDs_StrId{
|
||||
StrId: &schemapb.StringArray{
|
||||
Data: ids,
|
||||
},
|
||||
}
|
||||
default:
|
||||
return res, 0, fmt.Errorf("invalid field data type specifyed in delete expr")
|
||||
}
|
||||
|
||||
return res, rowNum, nil
|
||||
}
|
||||
|
||||
func (dt *deleteTask) PreExecute(ctx context.Context) error {
|
||||
dt.result = &milvuspb.MutationResult{
|
||||
Status: merr.Success(),
|
||||
IDs: &schemapb.IDs{
|
||||
IdField: nil,
|
||||
},
|
||||
Timestamp: dt.BeginTs(),
|
||||
}
|
||||
|
||||
log := log.Ctx(ctx)
|
||||
collName := dt.req.GetCollectionName()
|
||||
if err := validateCollectionName(collName); err != nil {
|
||||
return ErrWithLog(log, "Invalid collection name", err)
|
||||
}
|
||||
collID, err := globalMetaCache.GetCollectionID(ctx, dt.req.GetDbName(), collName)
|
||||
if err != nil {
|
||||
return ErrWithLog(log, "Failed to get collection id", err)
|
||||
}
|
||||
dt.collectionID = collID
|
||||
|
||||
dt.partitionKeyMode, err = isPartitionKeyMode(ctx, dt.req.GetDbName(), dt.req.GetCollectionName())
|
||||
if err != nil {
|
||||
return ErrWithLog(log, "Failed to get partition key mode", err)
|
||||
}
|
||||
if dt.partitionKeyMode && len(dt.req.PartitionName) != 0 {
|
||||
return errors.New("not support manually specifying the partition names if partition key mode is used")
|
||||
}
|
||||
|
||||
// If partitionName is not empty, partitionID will be set.
|
||||
if len(dt.req.PartitionName) > 0 {
|
||||
partName := dt.req.GetPartitionName()
|
||||
if err := validatePartitionTag(partName, true); err != nil {
|
||||
return ErrWithLog(log, "Invalid partition name", err)
|
||||
}
|
||||
partID, err := globalMetaCache.GetPartitionID(ctx, dt.req.GetDbName(), collName, partName)
|
||||
if err != nil {
|
||||
return ErrWithLog(log, "Failed to get partition id", err)
|
||||
}
|
||||
dt.partitionID = partID
|
||||
} else {
|
||||
dt.partitionID = common.InvalidPartitionID
|
||||
}
|
||||
|
||||
schema, err := globalMetaCache.GetCollectionSchema(ctx, dt.req.GetDbName(), collName)
|
||||
if err != nil {
|
||||
return ErrWithLog(log, "Failed to get collection schema", err)
|
||||
}
|
||||
dt.schema = schema
|
||||
|
||||
// hash primary keys to channels
|
||||
channelNames, err := dt.chMgr.getVChannels(dt.collectionID)
|
||||
if err != nil {
|
||||
return ErrWithLog(log, "Failed to get primary keys from expr", err)
|
||||
}
|
||||
dt.vChannels = channelNames
|
||||
|
||||
log.Debug("pre delete done", zap.Int64("collection_id", dt.collectionID))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dt *deleteTask) Execute(ctx context.Context) (err error) {
|
||||
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Delete-Execute")
|
||||
defer sp.End()
|
||||
log := log.Ctx(ctx)
|
||||
// log := log.Ctx(ctx)
|
||||
|
||||
if len(dt.req.GetExpr()) == 0 {
|
||||
return merr.WrapErrParameterInvalid("valid expr", "empty expr", "invalid expression")
|
||||
|
@ -232,162 +134,7 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) {
|
|||
return err
|
||||
}
|
||||
|
||||
plan, err := planparserv2.CreateRetrievePlan(dt.schema, dt.req.Expr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create expr plan, expr = %s", dt.req.GetExpr())
|
||||
}
|
||||
|
||||
isSimple, termExp := getExpr(plan)
|
||||
if isSimple {
|
||||
// if could get delete.primaryKeys from delete expr
|
||||
err := dt.simpleDelete(ctx, termExp, stream)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// if get complex delete expr
|
||||
// need query from querynode before delete
|
||||
err = dt.complexDelete(ctx, plan, stream)
|
||||
if err != nil {
|
||||
log.Warn("complex delete failed,but delete some data", zap.Int("count", dt.count), zap.String("expr", dt.req.GetExpr()))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dt *deleteTask) PostExecute(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dt *deleteTask) getStreamingQueryAndDelteFunc(stream msgstream.MsgStream, plan *planpb.PlanNode) executeFunc {
|
||||
return func(ctx context.Context, nodeID int64, qn types.QueryNodeClient, channelIDs ...string) error {
|
||||
var partitionIDs []int64
|
||||
|
||||
// optimize query when partitionKey on
|
||||
if dt.partitionKeyMode {
|
||||
expr, err := ParseExprFromPlan(plan)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
partitionKeys := ParsePartitionKeys(expr)
|
||||
hashedPartitionNames, err := assignPartitionKeys(ctx, dt.req.GetDbName(), dt.req.GetCollectionName(), partitionKeys)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
partitionIDs, err = getPartitionIDs(ctx, dt.req.GetDbName(), dt.req.GetCollectionName(), hashedPartitionNames)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else if dt.partitionID != common.InvalidFieldID {
|
||||
partitionIDs = []int64{dt.partitionID}
|
||||
}
|
||||
|
||||
log := log.Ctx(ctx).With(
|
||||
zap.Int64("collectionID", dt.collectionID),
|
||||
zap.Int64s("partitionIDs", partitionIDs),
|
||||
zap.Strings("channels", channelIDs),
|
||||
zap.Int64("nodeID", nodeID))
|
||||
// set plan
|
||||
_, outputFieldIDs := translatePkOutputFields(dt.schema)
|
||||
outputFieldIDs = append(outputFieldIDs, common.TimeStampField)
|
||||
plan.OutputFieldIds = outputFieldIDs
|
||||
log.Debug("start query for delete")
|
||||
|
||||
serializedPlan, err := proto.Marshal(plan)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
queryReq := &querypb.QueryRequest{
|
||||
Req: &internalpb.RetrieveRequest{
|
||||
Base: commonpbutil.NewMsgBase(
|
||||
commonpbutil.WithMsgType(commonpb.MsgType_Retrieve),
|
||||
commonpbutil.WithMsgID(dt.msgID),
|
||||
commonpbutil.WithSourceID(paramtable.GetNodeID()),
|
||||
commonpbutil.WithTargetID(nodeID),
|
||||
),
|
||||
MvccTimestamp: dt.ts,
|
||||
ReqID: paramtable.GetNodeID(),
|
||||
DbID: 0, // TODO
|
||||
CollectionID: dt.collectionID,
|
||||
PartitionIDs: partitionIDs,
|
||||
SerializedExprPlan: serializedPlan,
|
||||
OutputFieldsId: outputFieldIDs,
|
||||
GuaranteeTimestamp: parseGuaranteeTsFromConsistency(dt.ts, dt.ts, commonpb.ConsistencyLevel_Bounded),
|
||||
},
|
||||
DmlChannels: channelIDs,
|
||||
Scope: querypb.DataScope_All,
|
||||
}
|
||||
|
||||
rc := timerecord.NewTimeRecorder("QueryStreamDelete")
|
||||
client, err := qn.QueryStream(ctx, queryReq)
|
||||
if err != nil {
|
||||
log.Warn("query stream for delete create failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
result, err := client.Recv()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
log.Debug("query stream for delete finished", zap.Int64("msgID", dt.msgID), zap.Duration("duration", rc.ElapseSpan()))
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
err = merr.Error(result.GetStatus())
|
||||
if err != nil {
|
||||
log.Warn("query stream for delete get error status", zap.Int64("msgID", dt.msgID), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
err = dt.produce(ctx, stream, result.GetIds())
|
||||
if err != nil {
|
||||
log.Warn("query stream for delete produce result failed", zap.Int64("msgID", dt.msgID), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (dt *deleteTask) complexDelete(ctx context.Context, plan *planpb.PlanNode, stream msgstream.MsgStream) error {
|
||||
err := dt.lb.Execute(ctx, CollectionWorkLoad{
|
||||
db: dt.req.GetDbName(),
|
||||
collectionName: dt.req.GetCollectionName(),
|
||||
collectionID: dt.collectionID,
|
||||
nq: 1,
|
||||
exec: dt.getStreamingQueryAndDelteFunc(stream, plan),
|
||||
})
|
||||
if err != nil {
|
||||
log.Warn("fail to get or create dml stream", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dt *deleteTask) simpleDelete(ctx context.Context, termExp *planpb.Expr_TermExpr, stream msgstream.MsgStream) error {
|
||||
primaryKeys, numRow, err := getPrimaryKeysFromExpr(dt.schema, termExp)
|
||||
if err != nil {
|
||||
log.Info("Failed to get primary keys from expr", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("get primary keys from expr",
|
||||
zap.Int64("len of primary keys", numRow),
|
||||
zap.Int64("collectionID", dt.collectionID),
|
||||
zap.Int64("partitionID", dt.partitionID))
|
||||
err = dt.produce(ctx, stream, primaryKeys)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dt *deleteTask) produce(ctx context.Context, stream msgstream.MsgStream, primaryKeys *schemapb.IDs) error {
|
||||
hashValues := typeutil.HashPK2Channels(primaryKeys, dt.vChannels)
|
||||
hashValues := typeutil.HashPK2Channels(dt.primaryKeys, dt.vChannels)
|
||||
// repack delete msg by dmChannel
|
||||
result := make(map[uint32]msgstream.TsMsg)
|
||||
numRows := int64(0)
|
||||
|
@ -406,7 +153,7 @@ func (dt *deleteTask) produce(ctx context.Context, stream msgstream.MsgStream, p
|
|||
curMsg.HashValues = append(curMsg.HashValues, hashValues[index])
|
||||
curMsg.Timestamps = append(curMsg.Timestamps, dt.ts)
|
||||
|
||||
typeutil.AppendIDs(curMsg.PrimaryKeys, primaryKeys, index)
|
||||
typeutil.AppendIDs(curMsg.PrimaryKeys, dt.primaryKeys, index)
|
||||
curMsg.NumRows++
|
||||
numRows++
|
||||
}
|
||||
|
@ -430,11 +177,15 @@ func (dt *deleteTask) produce(ctx context.Context, stream msgstream.MsgStream, p
|
|||
zap.Int64("taskID", dt.ID()),
|
||||
zap.Duration("prepare duration", dt.tr.RecordSpan()))
|
||||
|
||||
err := stream.Produce(msgPack)
|
||||
err = stream.Produce(msgPack)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dt.result.DeleteCnt += numRows
|
||||
dt.count += numRows
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dt *deleteTask) PostExecute(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -465,3 +216,395 @@ func (dt *deleteTask) newDeleteMsg(ctx context.Context) (*msgstream.DeleteMsg, e
        DeleteRequest: sliceRequest,
    }, nil
}

type deleteRunner struct {
    req    *milvuspb.DeleteRequest
    result *milvuspb.MutationResult

    // channel
    chMgr     channelsMgr
    chTicker  channelsTimeTicker
    vChannels []vChan

    idAllocator     allocator.Interface
    tsoAllocatorIns tsoAllocator

    // delete info
    schema           *schemapb.CollectionSchema
    collectionID     UniqueID
    partitionID      UniqueID
    partitionKeyMode bool

    // for query
    msgID int64
    ts    uint64
    lb    LBPolicy
    count atomic.Int64
    err   error

    // task queue
    queue *dmTaskQueue
}

func (dr *deleteRunner) Init(ctx context.Context) error {
    log := log.Ctx(ctx)
    var err error

    collName := dr.req.GetCollectionName()
    if err := validateCollectionName(collName); err != nil {
        return ErrWithLog(log, "Invalid collection name", err)
    }
    dr.collectionID, err = globalMetaCache.GetCollectionID(ctx, dr.req.GetDbName(), collName)
    if err != nil {
        return ErrWithLog(log, "Failed to get collection id", err)
    }

    dr.schema, err = globalMetaCache.GetCollectionSchema(ctx, dr.req.GetDbName(), collName)
    if err != nil {
        return ErrWithLog(log, "Failed to get collection schema", err)
    }

    dr.partitionKeyMode = hasParitionKeyModeField(dr.schema)
    // get prititionIDs of delete
    dr.partitionID = common.InvalidPartitionID
    if len(dr.req.PartitionName) > 0 {
        if dr.partitionKeyMode {
            return errors.New("not support manually specifying the partition names if partition key mode is used")
        }

        partName := dr.req.GetPartitionName()
        if err := validatePartitionTag(partName, true); err != nil {
            return ErrWithLog(log, "Invalid partition name", err)
        }
        partID, err := globalMetaCache.GetPartitionID(ctx, dr.req.GetDbName(), collName, partName)
        if err != nil {
            return ErrWithLog(log, "Failed to get partition id", err)
        }
        dr.partitionID = partID
    }

    // hash primary keys to channels
    channelNames, err := dr.chMgr.getVChannels(dr.collectionID)
    if err != nil {
        return ErrWithLog(log, "Failed to get primary keys from expr", err)
    }
    dr.vChannels = channelNames

    dr.result = &milvuspb.MutationResult{
        Status: merr.Success(),
        IDs: &schemapb.IDs{
            IdField: nil,
        },
    }
    return nil
}

func (dr *deleteRunner) Run(ctx context.Context) error {
    plan, err := planparserv2.CreateRetrievePlan(dr.schema, dr.req.Expr)
    if err != nil {
        return fmt.Errorf("failed to create expr plan, expr = %s", dr.req.GetExpr())
    }

    isSimple, pk, numRow := getPrimaryKeysFromPlan(dr.schema, plan)
    if isSimple {
        // if could get delete.primaryKeys from delete expr
        err := dr.simpleDelete(ctx, pk, numRow)
        if err != nil {
            return err
        }
    } else {
        // if get complex delete expr
        // need query from querynode before delete
        err = dr.complexDelete(ctx, plan)
        if err != nil {
            log.Warn("complex delete failed,but delete some data", zap.Int64("count", dr.result.DeleteCnt), zap.String("expr", dr.req.GetExpr()))
            return err
        }
    }
    return nil
}

func (dr *deleteRunner) produce(ctx context.Context, primaryKeys *schemapb.IDs) (*deleteTask, error) {
    task := &deleteTask{
        ctx:              ctx,
        Condition:        NewTaskCondition(ctx),
        req:              dr.req,
        idAllocator:      dr.idAllocator,
        chMgr:            dr.chMgr,
        chTicker:         dr.chTicker,
        collectionID:     dr.collectionID,
        partitionID:      dr.partitionID,
        partitionKeyMode: dr.partitionKeyMode,
        vChannels:        dr.vChannels,
        primaryKeys:      primaryKeys,
    }

    if err := dr.queue.Enqueue(task); err != nil {
        log.Error("Failed to enqueue delete task: " + err.Error())
        return nil, err
    }

    return task, nil
}

// getStreamingQueryAndDelteFunc return query function used by LBPolicy
// make sure it concurrent safe
func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) executeFunc {
    return func(ctx context.Context, nodeID int64, qn types.QueryNodeClient, channelIDs ...string) error {
        var partitionIDs []int64

        // optimize query when partitionKey on
        if dr.partitionKeyMode {
            expr, err := ParseExprFromPlan(plan)
            if err != nil {
                return err
            }
            partitionKeys := ParsePartitionKeys(expr)
            hashedPartitionNames, err := assignPartitionKeys(ctx, dr.req.GetDbName(), dr.req.GetCollectionName(), partitionKeys)
            if err != nil {
                return err
            }
            partitionIDs, err = getPartitionIDs(ctx, dr.req.GetDbName(), dr.req.GetCollectionName(), hashedPartitionNames)
            if err != nil {
                return err
            }
        } else if dr.partitionID != common.InvalidFieldID {
            partitionIDs = []int64{dr.partitionID}
        }

        log := log.Ctx(ctx).With(
            zap.Int64("collectionID", dr.collectionID),
            zap.Int64s("partitionIDs", partitionIDs),
            zap.Strings("channels", channelIDs),
            zap.Int64("nodeID", nodeID))

        // set plan
        _, outputFieldIDs := translatePkOutputFields(dr.schema)
        outputFieldIDs = append(outputFieldIDs, common.TimeStampField)
        plan.OutputFieldIds = outputFieldIDs

        serializedPlan, err := proto.Marshal(plan)
        if err != nil {
            return err
        }

        queryReq := &querypb.QueryRequest{
            Req: &internalpb.RetrieveRequest{
                Base: commonpbutil.NewMsgBase(
                    commonpbutil.WithMsgType(commonpb.MsgType_Retrieve),
                    commonpbutil.WithMsgID(dr.msgID),
                    commonpbutil.WithSourceID(paramtable.GetNodeID()),
                    commonpbutil.WithTargetID(nodeID),
                ),
                MvccTimestamp:      dr.ts,
                ReqID:              paramtable.GetNodeID(),
                DbID:               0, // TODO
                CollectionID:       dr.collectionID,
                PartitionIDs:       partitionIDs,
                SerializedExprPlan: serializedPlan,
                OutputFieldsId:     outputFieldIDs,
                GuaranteeTimestamp: parseGuaranteeTsFromConsistency(dr.ts, dr.ts, dr.req.GetConsistencyLevel()),
            },
            DmlChannels: channelIDs,
            Scope:       querypb.DataScope_All,
        }

        ctx, cancel := context.WithCancel(ctx)
        defer cancel()
        log.Debug("start query for delete", zap.Int64("msgID", dr.msgID))
        client, err := qn.QueryStream(ctx, queryReq)
        if err != nil {
            log.Warn("query stream for delete create failed", zap.Error(err))
            return err
        }

        taskCh := make(chan *deleteTask, 256)
        go dr.receiveQueryResult(ctx, client, taskCh)
        // wait all task finish
        for task := range taskCh {
            err := task.WaitToFinish()
            if err != nil {
                return err
            }
            dr.count.Add(task.count)
        }

        // query or produce task failed
        if dr.err != nil {
            return dr.err
        }
        return nil
    }
}

func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask) {
    defer func() {
        close(taskCh)
    }()

    for {
        result, err := client.Recv()
        if err != nil {
            if err == io.EOF {
                log.Debug("query stream for delete finished", zap.Int64("msgID", dr.msgID))
                return
            }
            dr.err = err
            return
        }

        err = merr.Error(result.GetStatus())
        if err != nil {
            dr.err = err
            log.Warn("query stream for delete get error status", zap.Int64("msgID", dr.msgID), zap.Error(err))
            return
        }

        task, err := dr.produce(ctx, result.GetIds())
        if err != nil {
            dr.err = err
            log.Warn("produce delete task failed", zap.Error(err))
            return
        }

        taskCh <- task
    }
}
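receiveQueryResult and the loop inside getStreamingQueryAndDelteFunc form a small producer/consumer pipeline: the goroutine turns each streamed result batch into a delete task and sends it on taskCh, the caller drains the channel, and dr.err is only read after the channel has been closed, which is what makes that unsynchronized field safe to read. A stripped-down, self-contained sketch of the same pattern (generic Go, no Milvus types; every name below is illustrative):

package main

import (
    "errors"
    "fmt"
)

// runner mimics the shape of the pipeline above: err is written only by the
// producer goroutine and read by the consumer only after close(ch), so the
// channel close provides the necessary happens-before ordering.
type runner struct {
    err error
}

func (r *runner) produceAll(inputs []int, ch chan<- int) {
    defer close(ch) // guarantees the consumer's range loop terminates
    for _, in := range inputs {
        if in < 0 {
            r.err = errors.New("bad input") // remember the failure and stop producing
            return
        }
        ch <- in * 10 // stand-in for "enqueue one deleteTask per result batch"
    }
}

func (r *runner) run(inputs []int) (int, error) {
    ch := make(chan int, 4)
    go r.produceAll(inputs, ch)

    total := 0
    for v := range ch { // "wait all task finish"
        total += v
    }
    if r.err != nil { // producer failed somewhere along the way
        return total, r.err
    }
    return total, nil
}

func main() {
    r := &runner{}
    total, err := r.run([]int{1, 2, 3})
    fmt.Println(total, err) // 60 <nil>
}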
func (dr *deleteRunner) complexDelete(ctx context.Context, plan *planpb.PlanNode) error {
    rc := timerecord.NewTimeRecorder("QueryStreamDelete")
    var err error

    dr.msgID, err = dr.idAllocator.AllocOne()
    if err != nil {
        return err
    }

    dr.ts, err = dr.tsoAllocatorIns.AllocOne(ctx)
    if err != nil {
        return err
    }

    err = dr.lb.Execute(ctx, CollectionWorkLoad{
        db:             dr.req.GetDbName(),
        collectionName: dr.req.GetCollectionName(),
        collectionID:   dr.collectionID,
        nq:             1,
        exec:           dr.getStreamingQueryAndDelteFunc(plan),
    })
    dr.result.DeleteCnt = dr.count.Load()
    if err != nil {
        log.Warn("fail to execute complex delete",
            zap.Int64("deleteCnt", dr.result.GetDeleteCnt()),
            zap.Duration("interval", rc.ElapseSpan()),
            zap.Error(err))
        return err
    }

    log.Info("complex delete finished", zap.Int64("deleteCnt", dr.result.GetDeleteCnt()), zap.Duration("interval", rc.ElapseSpan()))
    return nil
}

func (dr *deleteRunner) simpleDelete(ctx context.Context, pk *schemapb.IDs, numRow int64) error {
    log.Debug("get primary keys from expr",
        zap.Int64("len of primary keys", numRow),
        zap.Int64("collectionID", dr.collectionID),
        zap.Int64("partitionID", dr.partitionID))

    task, err := dr.produce(ctx, pk)
    if err != nil {
        log.Warn("produce delete task failed")
        return err
    }

    err = task.WaitToFinish()
    if err == nil {
        dr.result.DeleteCnt = task.count
    }
    return err
}

func getPrimaryKeysFromPlan(schema *schemapb.CollectionSchema, plan *planpb.PlanNode) (bool, *schemapb.IDs, int64) {
    // simple delete request need expr with "pk in [a, b]"
    termExpr, ok := plan.Node.(*planpb.PlanNode_Query).Query.Predicates.Expr.(*planpb.Expr_TermExpr)
    if ok {
        if !termExpr.TermExpr.GetColumnInfo().GetIsPrimaryKey() {
            return false, nil, 0
        }

        ids, rowNum, err := getPrimaryKeysFromTermExpr(schema, termExpr)
        if err != nil {
            return false, nil, 0
        }
        return true, ids, rowNum
    }

    // simple delete if expr with "pk == a"
    unaryRangeExpr, ok := plan.Node.(*planpb.PlanNode_Query).Query.Predicates.Expr.(*planpb.Expr_UnaryRangeExpr)
    if ok {
        if unaryRangeExpr.UnaryRangeExpr.GetOp() != planpb.OpType_Equal || !unaryRangeExpr.UnaryRangeExpr.GetColumnInfo().GetIsPrimaryKey() {
            return false, nil, 0
        }

        ids, err := getPrimaryKeysFromUnaryRangeExpr(schema, unaryRangeExpr)
        if err != nil {
            return false, nil, 0
        }
        return true, ids, 1
    }

    return false, nil, 0
}

func getPrimaryKeysFromUnaryRangeExpr(schema *schemapb.CollectionSchema, unaryRangeExpr *planpb.Expr_UnaryRangeExpr) (res *schemapb.IDs, err error) {
    res = &schemapb.IDs{}
    switch unaryRangeExpr.UnaryRangeExpr.GetColumnInfo().GetDataType() {
    case schemapb.DataType_Int64:
        res.IdField = &schemapb.IDs_IntId{
            IntId: &schemapb.LongArray{
                Data: []int64{unaryRangeExpr.UnaryRangeExpr.GetValue().GetInt64Val()},
            },
        }
    case schemapb.DataType_VarChar:
        res.IdField = &schemapb.IDs_StrId{
            StrId: &schemapb.StringArray{
                Data: []string{unaryRangeExpr.UnaryRangeExpr.GetValue().GetStringVal()},
            },
        }
    default:
        return res, fmt.Errorf("invalid field data type specifyed in simple delete expr")
    }

    return res, nil
}

func getPrimaryKeysFromTermExpr(schema *schemapb.CollectionSchema, termExpr *planpb.Expr_TermExpr) (res *schemapb.IDs, rowNum int64, err error) {
    res = &schemapb.IDs{}
    rowNum = int64(len(termExpr.TermExpr.Values))
    switch termExpr.TermExpr.ColumnInfo.GetDataType() {
    case schemapb.DataType_Int64:
        ids := make([]int64, 0)
        for _, v := range termExpr.TermExpr.Values {
            ids = append(ids, v.GetInt64Val())
        }
        res.IdField = &schemapb.IDs_IntId{
            IntId: &schemapb.LongArray{
                Data: ids,
            },
        }
    case schemapb.DataType_VarChar:
        ids := make([]string, 0)
        for _, v := range termExpr.TermExpr.Values {
            ids = append(ids, v.GetStringVal())
        }
        res.IdField = &schemapb.IDs_StrId{
            StrId: &schemapb.StringArray{
                Data: ids,
            },
        }
    default:
        return res, 0, fmt.Errorf("invalid field data type specifyed in simple delete expr")
    }

    return res, rowNum, nil
}
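As a quick reference for what getPrimaryKeysFromPlan treats as a "simple" delete, here is a toy, self-contained model of the rule (the field names and expressions are illustrative assumptions; it does not call the Milvus plan parser):

package main

import "fmt"

// classify mirrors the decision implemented by getPrimaryKeysFromPlan: only
// "pk in [...]" (a term expression on the primary key) and "pk == value"
// (equality on the primary key) avoid the QueryNode round-trip.
type filter struct {
    field   string
    op      string // "in", "==", ">", ...
    numVals int    // number of values in an "in" list
}

func classify(f filter, pkField string) (simple bool, rows int64) {
    if f.field != pkField {
        return false, 0
    }
    switch f.op {
    case "in":
        return true, int64(f.numVals)
    case "==":
        return true, 1
    default:
        return false, 0
    }
}

func main() {
    for _, f := range []filter{
        {field: "pk", op: "in", numVals: 3},  // simple: IDs come straight from the expression
        {field: "pk", op: "=="},              // simple: a single ID
        {field: "pk", op: ">"},               // complex: needs the streaming query first
        {field: "age", op: "in", numVals: 2}, // complex: not the primary key
    } {
        simple, rows := classify(f, "pk")
        fmt.Printf("%v -> simple=%v rows=%d\n", f, simple, rows)
    }
}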
(One file's diff is suppressed here because it is too large.)
@ -1671,19 +1671,13 @@ func TestTask_Int64PrimaryKey(t *testing.T) {
|
|||
},
|
||||
idAllocator: idAllocator,
|
||||
ctx: ctx,
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: merr.Success(),
|
||||
IDs: nil,
|
||||
SuccIndex: nil,
|
||||
ErrIndex: nil,
|
||||
Acknowledged: false,
|
||||
InsertCnt: 0,
|
||||
DeleteCnt: 0,
|
||||
UpsertCnt: 0,
|
||||
Timestamp: 0,
|
||||
primaryKeys: &schemapb.IDs{
|
||||
IdField: &schemapb.IDs_IntId{IntId: &schemapb.LongArray{Data: []int64{0, 1}}},
|
||||
},
|
||||
chMgr: chMgr,
|
||||
chTicker: ticker,
|
||||
chMgr: chMgr,
|
||||
chTicker: ticker,
|
||||
collectionID: collectionID,
|
||||
vChannels: []string{"test-ch"},
|
||||
}
|
||||
|
||||
assert.NoError(t, task.OnEnqueue())
|
||||
|
@ -1703,51 +1697,6 @@ func TestTask_Int64PrimaryKey(t *testing.T) {
|
|||
assert.NoError(t, task.Execute(ctx))
|
||||
assert.NoError(t, task.PostExecute(ctx))
|
||||
})
|
||||
|
||||
t.Run("complex delete", func(t *testing.T) {
|
||||
lb := NewMockLBPolicy(t)
|
||||
task := &deleteTask{
|
||||
Condition: NewTaskCondition(ctx),
|
||||
lb: lb,
|
||||
req: &milvuspb.DeleteRequest{
|
||||
CollectionName: collectionName,
|
||||
PartitionName: partitionName,
|
||||
Expr: "int64 < 2",
|
||||
},
|
||||
idAllocator: idAllocator,
|
||||
ctx: ctx,
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: merr.Success(),
|
||||
IDs: nil,
|
||||
SuccIndex: nil,
|
||||
ErrIndex: nil,
|
||||
Acknowledged: false,
|
||||
InsertCnt: 0,
|
||||
DeleteCnt: 0,
|
||||
UpsertCnt: 0,
|
||||
Timestamp: 0,
|
||||
},
|
||||
chMgr: chMgr,
|
||||
chTicker: ticker,
|
||||
}
|
||||
lb.EXPECT().Execute(mock.Anything, mock.Anything).Return(nil)
|
||||
assert.NoError(t, task.OnEnqueue())
|
||||
assert.NotNil(t, task.TraceCtx())
|
||||
|
||||
id := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
|
||||
task.SetID(id)
|
||||
assert.Equal(t, id, task.ID())
|
||||
assert.Equal(t, commonpb.MsgType_Delete, task.Type())
|
||||
|
||||
ts := Timestamp(time.Now().UnixNano())
|
||||
task.SetTs(ts)
|
||||
assert.Equal(t, ts, task.BeginTs())
|
||||
assert.Equal(t, ts, task.EndTs())
|
||||
|
||||
assert.NoError(t, task.PreExecute(ctx))
|
||||
assert.NoError(t, task.Execute(ctx))
|
||||
assert.NoError(t, task.PostExecute(ctx))
|
||||
})
|
||||
}
|
||||
|
||||
func TestTask_VarCharPrimaryKey(t *testing.T) {
|
||||
|
@ -2003,19 +1952,13 @@ func TestTask_VarCharPrimaryKey(t *testing.T) {
|
|||
},
|
||||
idAllocator: idAllocator,
|
||||
ctx: ctx,
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: merr.Success(),
|
||||
IDs: nil,
|
||||
SuccIndex: nil,
|
||||
ErrIndex: nil,
|
||||
Acknowledged: false,
|
||||
InsertCnt: 0,
|
||||
DeleteCnt: 0,
|
||||
UpsertCnt: 0,
|
||||
Timestamp: 0,
|
||||
chMgr: chMgr,
|
||||
chTicker: ticker,
|
||||
vChannels: []string{"test-channel"},
|
||||
primaryKeys: &schemapb.IDs{
|
||||
IdField: &schemapb.IDs_StrId{StrId: &schemapb.StringArray{Data: []string{"milvus", "test"}}},
|
||||
},
|
||||
chMgr: chMgr,
|
||||
chTicker: ticker,
|
||||
collectionID: collectionID,
|
||||
}
|
||||
|
||||
assert.NoError(t, task.OnEnqueue())
|
||||
|
@ -3432,24 +3375,15 @@ func TestPartitionKey(t *testing.T) {
|
|||
Expr: "int64_field in [0, 1]",
|
||||
},
|
||||
ctx: ctx,
|
||||
result: &milvuspb.MutationResult{
|
||||
Status: merr.Success(),
|
||||
IDs: nil,
|
||||
SuccIndex: nil,
|
||||
ErrIndex: nil,
|
||||
Acknowledged: false,
|
||||
InsertCnt: 0,
|
||||
DeleteCnt: 0,
|
||||
UpsertCnt: 0,
|
||||
Timestamp: 0,
|
||||
primaryKeys: &schemapb.IDs{
|
||||
IdField: &schemapb.IDs_IntId{IntId: &schemapb.LongArray{Data: []int64{0, 1}}},
|
||||
},
|
||||
idAllocator: idAllocator,
|
||||
chMgr: chMgr,
|
||||
chTicker: ticker,
|
||||
idAllocator: idAllocator,
|
||||
chMgr: chMgr,
|
||||
chTicker: ticker,
|
||||
collectionID: collectionID,
|
||||
vChannels: []string{"test-channel"},
|
||||
}
|
||||
// don't support specify partition name if use partition key
|
||||
dt.req.PartitionName = partitionNames[0]
|
||||
assert.Error(t, dt.PreExecute(ctx))
|
||||
|
||||
dt.req.PartitionName = ""
|
||||
assert.NoError(t, dt.PreExecute(ctx))
|
||||
|
|
|
@@ -220,7 +220,6 @@ func validatePartitionTag(partitionTag string, strictCheck bool) error {
 		msg := invalidMsg + "Partition name should not be empty."
 		return errors.New(msg)
 	}

 	if len(partitionTag) > Params.ProxyCfg.MaxNameLength.GetAsInt() {
 		msg := invalidMsg + "The length of a partition name must be less than " + Params.ProxyCfg.MaxNameLength.GetValue() + " characters."
 		return errors.New(msg)

@@ -1367,6 +1366,15 @@ func isPartitionKeyMode(ctx context.Context, dbName string, colName string) (boo
 	return false, nil
 }

+func hasParitionKeyModeField(schema *schemapb.CollectionSchema) bool {
+	for _, fieldSchema := range schema.GetFields() {
+		if fieldSchema.IsPartitionKey {
+			return true
+		}
+	}
+	return false
+}
+
 // getDefaultPartitionNames only used in partition key mode
 func getDefaultPartitionsInPartitionKeyMode(ctx context.Context, dbName string, collectionName string) ([]string, error) {
 	partitions, err := globalMetaCache.GetPartitions(ctx, dbName, collectionName)
@@ -101,12 +101,14 @@ func retrieveOnSegmentsWithStream(ctx context.Context, segments []Segment, segTy
 			return
 		}

-		if err = svr.Send(&internalpb.RetrieveResults{
-			Status:     merr.Success(),
-			Ids:        result.GetIds(),
-			FieldsData: result.GetFieldsData(),
-		}); err != nil {
-			errs[i] = err
+		if len(result.GetOffset()) != 0 {
+			if err = svr.Send(&internalpb.RetrieveResults{
+				Status:     merr.Success(),
+				Ids:        result.GetIds(),
+				FieldsData: result.GetFieldsData(),
+			}); err != nil {
+				errs[i] = err
+			}
 		}

 		errs[i] = nil
go.mod (second module)

@@ -14,7 +14,7 @@ require (
 	github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
 	github.com/klauspost/compress v1.16.5
 	github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76
-	github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c
+	github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231220103033-abd0d12ba669
 	github.com/nats-io/nats-server/v2 v2.9.17
 	github.com/nats-io/nats.go v1.24.0
 	github.com/panjf2000/ants/v2 v2.7.2
go.sum (second module)

@@ -479,8 +479,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr
 github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
 github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
 github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
-github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c h1:Wbc2IZt/13+B5jc8JPU/dOxGYy+1jeOsChVgcza+qgw=
-github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
+github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231220103033-abd0d12ba669 h1:yUtc+pVKVhmmnwTY9iyV8+EmhrNjZ74Hxm3y5QKCNyg=
+github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231220103033-abd0d12ba669/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
 github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
 github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
 github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=