From bd3a8ed3cf029dc455f2f73e479b84d8cb13355e Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Mon, 11 Oct 2021 07:48:55 +0800 Subject: [PATCH] Support delete in proxy (#9588) Signed-off-by: yudong.cai --- internal/proxy/impl.go | 23 +- internal/proxy/task.go | 151 +++++++++- internal/proxy/task_test.go | 532 +++++++++++++++++------------------- 3 files changed, 411 insertions(+), 295 deletions(-) diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index bb99cd3ab5..2f9f0312fe 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -1365,10 +1365,27 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) }, nil } + deleteReq := &milvuspb.DeleteRequest{ + DbName: request.DbName, + CollectionName: request.CollectionName, + PartitionName: request.PartitionName, + Expr: request.Expr, + } + dt := &deleteTask{ - ctx: ctx, - Condition: NewTaskCondition(ctx), - DeleteRequest: request, + ctx: ctx, + Condition: NewTaskCondition(ctx), + req: deleteReq, + DeleteRequest: &internalpb.DeleteRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Delete, + SourceID: Params.ProxyID, + }, + CollectionName: request.CollectionName, + PartitionName: request.PartitionName, + }, + chMgr: node.chMgr, + chTicker: node.chTicker, } log.Debug("Delete request enqueue", diff --git a/internal/proxy/task.go b/internal/proxy/task.go index e0a4e07bf7..22c5a4f34a 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -4591,9 +4591,14 @@ func (rpt *releasePartitionsTask) PostExecute(ctx context.Context) error { type deleteTask struct { Condition - *milvuspb.DeleteRequest - ctx context.Context - result *milvuspb.MutationResult + *internalpb.DeleteRequest + ctx context.Context + req *milvuspb.DeleteRequest + result *milvuspb.MutationResult + chMgr channelsMgr + chTicker channelsTimeTicker + vChannels []vChan + pChannels []pChan } func (dt *deleteTask) TraceCtx() context.Context { @@ -4629,31 +4634,155 @@ func (dt *deleteTask) SetTs(ts Timestamp) { } func (dt *deleteTask) OnEnqueue() error { - dt.Base = &commonpb.MsgBase{} + dt.DeleteRequest.Base = &commonpb.MsgBase{} return nil } +func getPrimaryKeysFromExpr(schema *schemapb.CollectionSchema, expr string) (res []int64, err error) { + if len(expr) == 0 { + log.Warn("empty expr") + return + } + + plan, err := CreateExprPlan(schema, expr) + if err != nil { + return res, fmt.Errorf("failed to create expr plan, expr = %s", expr) + } + + // delete request only support expr "id in [a, b]" + termExpr, ok := plan.Node.(*planpb.PlanNode_Predicates).Predicates.Expr.(*planpb.Expr_TermExpr) + if !ok { + return res, fmt.Errorf("invalid plan node type") + } + + for _, v := range termExpr.TermExpr.Values { + res = append(res, v.GetInt64Val()) + } + + return res, nil +} + func (dt *deleteTask) PreExecute(ctx context.Context) error { dt.Base.MsgType = commonpb.MsgType_Delete dt.Base.SourceID = Params.ProxyID - collName := dt.CollectionName + dt.result = &milvuspb.MutationResult{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + IDs: &schemapb.IDs{ + IdField: nil, + }, + Timestamp: dt.BeginTs(), + } + + collName := dt.req.CollectionName if err := ValidateCollectionName(collName); err != nil { + log.Error("Invalid collection name", zap.String("collectionName", collName)) + return err + } + collID, err := globalMetaCache.GetCollectionID(ctx, collName) + if err != nil { + log.Debug("Failed to get collection id", zap.String("collectionName", collName)) + return err + } + dt.DeleteRequest.CollectionID = collID + + if len(dt.req.PartitionName) > 0 { + partName := dt.req.PartitionName + if err := ValidatePartitionTag(partName, true); err != nil { + log.Error("Invalid partition name", zap.String("partitionName", partName)) + return err + } + partID, err := globalMetaCache.GetPartitionID(ctx, collName, partName) + if err != nil { + log.Debug("Failed to get partition id", zap.String("collectionName", collName), zap.String("partitionName", partName)) + return err + } + dt.DeleteRequest.PartitionID = partID + } + + schema, err := globalMetaCache.GetCollectionSchema(ctx, dt.req.CollectionName) + if err != nil { + log.Error("Failed to get collection schema", zap.String("collectionName", dt.req.CollectionName)) return err } - partitionName := dt.PartitionName - // partitionName is accepted, means delete from whole collection - if partitionName != "" { - if err := ValidatePartitionTag(partitionName, true); err != nil { - return err - } + primaryKeys, err := getPrimaryKeysFromExpr(schema, dt.req.Expr) + if err != nil { + log.Error("Failed to get primary keys from expr", zap.Error(err)) + return err } + log.Debug("get primary keys from expr", zap.Any("primary keys", dt.DeleteRequest.PrimaryKeys)) + dt.DeleteRequest.PrimaryKeys = primaryKeys + + // set result + dt.result.IDs.IdField = &schemapb.IDs_IntId{ + IntId: &schemapb.LongArray{ + Data: primaryKeys, + }, + } + dt.result.DeleteCnt = int64(len(primaryKeys)) + + dt.DeleteRequest.Timestamp = dt.BeginTs() return nil } func (dt *deleteTask) Execute(ctx context.Context) (err error) { + sp, ctx := trace.StartSpanFromContextWithOperationName(dt.ctx, "Proxy-Delete-Execute") + defer sp.Finish() + + var tsMsg msgstream.TsMsg = &msgstream.DeleteMsg{ + DeleteRequest: *dt.DeleteRequest, + BaseMsg: msgstream.BaseMsg{ + Ctx: ctx, + HashValues: []uint32{uint32(Params.ProxyID)}, + BeginTimestamp: dt.BeginTs(), + EndTimestamp: dt.EndTs(), + }, + } + msgPack := msgstream.MsgPack{ + BeginTs: dt.BeginTs(), + EndTs: dt.EndTs(), + Msgs: make([]msgstream.TsMsg, 1), + } + msgPack.Msgs[0] = tsMsg + + //collID := dt.DeleteRequest.CollectionID + //stream, err := dt.chMgr.getDMLStream(collID) + //if err != nil { + // err = dt.chMgr.createDMLMsgStream(collID) + // if err != nil { + // dt.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError + // dt.result.Status.Reason = err.Error() + // return err + // } + // channels, err := dt.chMgr.getChannels(collID) + // if err == nil { + // for _, pchan := range channels { + // err := dt.chTicker.addPChan(pchan) + // if err != nil { + // log.Warn("failed to add pchan to channels time ticker", + // zap.Error(err), + // zap.String("pchan", pchan)) + // } + // } + // } + // stream, err = dt.chMgr.getDMLStream(collID) + // if err != nil { + // dt.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError + // dt.result.Status.Reason = err.Error() + // return err + // } + //} + // + //err = stream.Produce(&msgPack) + //if err != nil { + // dt.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError + // dt.result.Status.Reason = err.Error() + // return err + //} return nil } diff --git a/internal/proxy/task_test.go b/internal/proxy/task_test.go index 05e9dff3b3..a092dd1b05 100644 --- a/internal/proxy/task_test.go +++ b/internal/proxy/task_test.go @@ -3020,7 +3020,7 @@ func TestQueryTask_all(t *testing.T) { wg.Wait() } -func TestInsertTask_all(t *testing.T) { +func TestTask_all(t *testing.T) { var err error Params.Init() @@ -3036,7 +3036,7 @@ func TestInsertTask_all(t *testing.T) { assert.NoError(t, err) shardsNum := int32(2) - prefix := "TestQueryTask_all" + prefix := "TestTask_all" dbName := "" collectionName := prefix + funcutil.GenRandomStr() partitionName := prefix + funcutil.GenRandomStr() @@ -3051,42 +3051,44 @@ func TestInsertTask_all(t *testing.T) { dim := 128 nb := 10 - schema := constructCollectionSchemaWithAllType( - boolField, int32Field, int64Field, floatField, doubleField, - floatVecField, binaryVecField, dim, collectionName) - marshaledSchema, err := proto.Marshal(schema) - assert.NoError(t, err) + t.Run("create collection", func(t *testing.T) { + schema := constructCollectionSchemaWithAllType( + boolField, int32Field, int64Field, floatField, doubleField, + floatVecField, binaryVecField, dim, collectionName) + marshaledSchema, err := proto.Marshal(schema) + assert.NoError(t, err) - createColT := &createCollectionTask{ - Condition: NewTaskCondition(ctx), - CreateCollectionRequest: &milvuspb.CreateCollectionRequest{ - Base: nil, + createColT := &createCollectionTask{ + Condition: NewTaskCondition(ctx), + CreateCollectionRequest: &milvuspb.CreateCollectionRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + Schema: marshaledSchema, + ShardsNum: shardsNum, + }, + ctx: ctx, + rootCoord: rc, + result: nil, + schema: nil, + } + + assert.NoError(t, createColT.OnEnqueue()) + assert.NoError(t, createColT.PreExecute(ctx)) + assert.NoError(t, createColT.Execute(ctx)) + assert.NoError(t, createColT.PostExecute(ctx)) + + _, _ = rc.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_CreatePartition, + MsgID: 0, + Timestamp: 0, + SourceID: Params.ProxyID, + }, DbName: dbName, CollectionName: collectionName, - Schema: marshaledSchema, - ShardsNum: shardsNum, - }, - ctx: ctx, - rootCoord: rc, - result: nil, - schema: nil, - } - - assert.NoError(t, createColT.OnEnqueue()) - assert.NoError(t, createColT.PreExecute(ctx)) - assert.NoError(t, createColT.Execute(ctx)) - assert.NoError(t, createColT.PostExecute(ctx)) - - _, _ = rc.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_CreatePartition, - MsgID: 0, - Timestamp: 0, - SourceID: Params.ProxyID, - }, - DbName: dbName, - CollectionName: collectionName, - PartitionName: partitionName, + PartitionName: partitionName, + }) }) collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName) @@ -3122,267 +3124,235 @@ func TestInsertTask_all(t *testing.T) { _ = segAllocator.Start() defer segAllocator.Close() - hash := generateHashKeys(nb) - task := &insertTask{ - BaseInsertTask: BaseInsertTask{ - BaseMsg: msgstream.BaseMsg{ - HashValues: hash, + t.Run("insert", func(t *testing.T) { + hash := generateHashKeys(nb) + task := &insertTask{ + BaseInsertTask: BaseInsertTask{ + BaseMsg: msgstream.BaseMsg{ + HashValues: hash, + }, + InsertRequest: internalpb.InsertRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Insert, + MsgID: 0, + }, + CollectionName: collectionName, + PartitionName: partitionName, + }, }, - InsertRequest: internalpb.InsertRequest{ + req: &milvuspb.InsertRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Insert, - MsgID: 0, + MsgType: commonpb.MsgType_Insert, + MsgID: 0, + Timestamp: 0, + SourceID: Params.ProxyID, + }, + DbName: dbName, + CollectionName: collectionName, + PartitionName: partitionName, + FieldsData: make([]*schemapb.FieldData, fieldsLen), + HashKeys: hash, + NumRows: uint32(nb), + }, + Condition: NewTaskCondition(ctx), + ctx: ctx, + result: &milvuspb.MutationResult{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + IDs: nil, + SuccIndex: nil, + ErrIndex: nil, + Acknowledged: false, + InsertCnt: 0, + DeleteCnt: 0, + UpsertCnt: 0, + Timestamp: 0, + }, + rowIDAllocator: idAllocator, + segIDAssigner: segAllocator, + chMgr: chMgr, + chTicker: ticker, + vChannels: nil, + pChannels: nil, + schema: nil, + } + + task.req.FieldsData[0] = &schemapb.FieldData{ + Type: schemapb.DataType_Bool, + FieldName: boolField, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_BoolData{ + BoolData: &schemapb.BoolArray{ + Data: generateBoolArray(nb), + }, + }, + }, + }, + FieldId: common.StartOfUserFieldID + 0, + } + + task.req.FieldsData[1] = &schemapb.FieldData{ + Type: schemapb.DataType_Int32, + FieldName: int32Field, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: generateInt32Array(nb), + }, + }, + }, + }, + FieldId: common.StartOfUserFieldID + 1, + } + + task.req.FieldsData[2] = &schemapb.FieldData{ + Type: schemapb.DataType_Int64, + FieldName: int64Field, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{ + Data: generateInt64Array(nb), + }, + }, + }, + }, + FieldId: common.StartOfUserFieldID + 2, + } + + task.req.FieldsData[3] = &schemapb.FieldData{ + Type: schemapb.DataType_Float, + FieldName: floatField, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_FloatData{ + FloatData: &schemapb.FloatArray{ + Data: generateFloat32Array(nb), + }, + }, + }, + }, + FieldId: common.StartOfUserFieldID + 3, + } + + task.req.FieldsData[4] = &schemapb.FieldData{ + Type: schemapb.DataType_Double, + FieldName: doubleField, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_DoubleData{ + DoubleData: &schemapb.DoubleArray{ + Data: generateFloat64Array(nb), + }, + }, + }, + }, + FieldId: common.StartOfUserFieldID + 4, + } + + task.req.FieldsData[5] = &schemapb.FieldData{ + Type: schemapb.DataType_FloatVector, + FieldName: doubleField, + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: int64(dim), + Data: &schemapb.VectorField_FloatVector{ + FloatVector: &schemapb.FloatArray{ + Data: generateFloatVectors(nb, dim), + }, + }, + }, + }, + FieldId: common.StartOfUserFieldID + 5, + } + + task.req.FieldsData[6] = &schemapb.FieldData{ + Type: schemapb.DataType_BinaryVector, + FieldName: doubleField, + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: int64(dim), + Data: &schemapb.VectorField_BinaryVector{ + BinaryVector: generateBinaryVectors(nb, dim), + }, + }, + }, + FieldId: common.StartOfUserFieldID + 6, + } + + assert.NoError(t, task.OnEnqueue()) + assert.NoError(t, task.PreExecute(ctx)) + assert.NoError(t, task.Execute(ctx)) + assert.NoError(t, task.PostExecute(ctx)) + }) + + t.Run("delete", func(t *testing.T) { + task := &deleteTask{ + Condition: NewTaskCondition(ctx), + DeleteRequest: &internalpb.DeleteRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Delete, + MsgID: 0, + Timestamp: 0, + SourceID: Params.ProxyID, }, CollectionName: collectionName, PartitionName: partitionName, }, - }, - req: &milvuspb.InsertRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Insert, - MsgID: 0, - Timestamp: 0, - SourceID: Params.ProxyID, - }, - DbName: dbName, - CollectionName: collectionName, - PartitionName: partitionName, - FieldsData: make([]*schemapb.FieldData, fieldsLen), - HashKeys: hash, - NumRows: uint32(nb), - }, - Condition: NewTaskCondition(ctx), - ctx: ctx, - result: &milvuspb.MutationResult{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, - IDs: nil, - SuccIndex: nil, - ErrIndex: nil, - Acknowledged: false, - InsertCnt: 0, - DeleteCnt: 0, - UpsertCnt: 0, - Timestamp: 0, - }, - rowIDAllocator: idAllocator, - segIDAssigner: segAllocator, - chMgr: chMgr, - chTicker: ticker, - vChannels: nil, - pChannels: nil, - schema: nil, - } - - task.req.FieldsData[0] = &schemapb.FieldData{ - Type: schemapb.DataType_Bool, - FieldName: boolField, - Field: &schemapb.FieldData_Scalars{ - Scalars: &schemapb.ScalarField{ - Data: &schemapb.ScalarField_BoolData{ - BoolData: &schemapb.BoolArray{ - Data: generateBoolArray(nb), - }, + req: &milvuspb.DeleteRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Delete, + MsgID: 0, + Timestamp: 0, + SourceID: Params.ProxyID, }, + DbName: dbName, + CollectionName: collectionName, + PartitionName: partitionName, + Expr: "int64 in [0, 1]", }, - }, - FieldId: common.StartOfUserFieldID + 0, - } - - task.req.FieldsData[1] = &schemapb.FieldData{ - Type: schemapb.DataType_Int32, - FieldName: int32Field, - Field: &schemapb.FieldData_Scalars{ - Scalars: &schemapb.ScalarField{ - Data: &schemapb.ScalarField_IntData{ - IntData: &schemapb.IntArray{ - Data: generateInt32Array(nb), - }, + ctx: ctx, + result: &milvuspb.MutationResult{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", }, + IDs: nil, + SuccIndex: nil, + ErrIndex: nil, + Acknowledged: false, + InsertCnt: 0, + DeleteCnt: 0, + UpsertCnt: 0, + Timestamp: 0, }, - }, - FieldId: common.StartOfUserFieldID + 1, - } + chMgr: chMgr, + chTicker: ticker, + } - task.req.FieldsData[2] = &schemapb.FieldData{ - Type: schemapb.DataType_Int64, - FieldName: int64Field, - Field: &schemapb.FieldData_Scalars{ - Scalars: &schemapb.ScalarField{ - Data: &schemapb.ScalarField_LongData{ - LongData: &schemapb.LongArray{ - Data: generateInt64Array(nb), - }, - }, - }, - }, - FieldId: common.StartOfUserFieldID + 2, - } + assert.NoError(t, task.OnEnqueue()) + assert.NotNil(t, task.TraceCtx()) - task.req.FieldsData[3] = &schemapb.FieldData{ - Type: schemapb.DataType_Float, - FieldName: floatField, - Field: &schemapb.FieldData_Scalars{ - Scalars: &schemapb.ScalarField{ - Data: &schemapb.ScalarField_FloatData{ - FloatData: &schemapb.FloatArray{ - Data: generateFloat32Array(nb), - }, - }, - }, - }, - FieldId: common.StartOfUserFieldID + 3, - } + id := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) + task.SetID(id) + assert.Equal(t, id, task.ID()) - task.req.FieldsData[4] = &schemapb.FieldData{ - Type: schemapb.DataType_Double, - FieldName: doubleField, - Field: &schemapb.FieldData_Scalars{ - Scalars: &schemapb.ScalarField{ - Data: &schemapb.ScalarField_DoubleData{ - DoubleData: &schemapb.DoubleArray{ - Data: generateFloat64Array(nb), - }, - }, - }, - }, - FieldId: common.StartOfUserFieldID + 4, - } + task.Base.MsgType = commonpb.MsgType_Delete + assert.Equal(t, commonpb.MsgType_Delete, task.Type()) - task.req.FieldsData[5] = &schemapb.FieldData{ - Type: schemapb.DataType_FloatVector, - FieldName: doubleField, - Field: &schemapb.FieldData_Vectors{ - Vectors: &schemapb.VectorField{ - Dim: int64(dim), - Data: &schemapb.VectorField_FloatVector{ - FloatVector: &schemapb.FloatArray{ - Data: generateFloatVectors(nb, dim), - }, - }, - }, - }, - FieldId: common.StartOfUserFieldID + 5, - } + ts := Timestamp(time.Now().UnixNano()) + task.SetTs(ts) + assert.Equal(t, ts, task.BeginTs()) + assert.Equal(t, ts, task.EndTs()) - task.req.FieldsData[6] = &schemapb.FieldData{ - Type: schemapb.DataType_BinaryVector, - FieldName: doubleField, - Field: &schemapb.FieldData_Vectors{ - Vectors: &schemapb.VectorField{ - Dim: int64(dim), - Data: &schemapb.VectorField_BinaryVector{ - BinaryVector: generateBinaryVectors(nb, dim), - }, - }, - }, - FieldId: common.StartOfUserFieldID + 6, - } - - assert.NoError(t, task.OnEnqueue()) - assert.NoError(t, task.PreExecute(ctx)) - assert.NoError(t, task.Execute(ctx)) - assert.NoError(t, task.PostExecute(ctx)) -} - -func TestDeleteTask_all(t *testing.T) { - Params.Init() - - ctx := context.Background() - - prefix := "TestDeleteTask_all" - dbName := "" - collectionName := prefix + funcutil.GenRandomStr() - partitionName := prefix + funcutil.GenRandomStr() - - task := &deleteTask{ - Condition: NewTaskCondition(ctx), - DeleteRequest: &milvuspb.DeleteRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Delete, - MsgID: 0, - Timestamp: 0, - SourceID: 0, - }, - DbName: dbName, - CollectionName: collectionName, - PartitionName: partitionName, - Expr: "", - }, - ctx: ctx, - result: &milvuspb.MutationResult{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, - IDs: nil, - SuccIndex: nil, - ErrIndex: nil, - Acknowledged: false, - InsertCnt: 0, - DeleteCnt: 0, - UpsertCnt: 0, - Timestamp: 0, - }, - } - - assert.NoError(t, task.OnEnqueue()) - - assert.NotNil(t, task.TraceCtx()) - - id := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) - task.SetID(id) - assert.Equal(t, id, task.ID()) - - task.Base.MsgType = commonpb.MsgType_Delete - assert.Equal(t, commonpb.MsgType_Delete, task.Type()) - - ts := Timestamp(time.Now().UnixNano()) - task.SetTs(ts) - assert.Equal(t, ts, task.BeginTs()) - assert.Equal(t, ts, task.EndTs()) - - assert.NoError(t, task.PreExecute(ctx)) - assert.NoError(t, task.Execute(ctx)) - assert.NoError(t, task.PostExecute(ctx)) -} - -func TestDeleteTask_PreExecute(t *testing.T) { - Params.Init() - - ctx := context.Background() - - prefix := "TestDeleteTask_all" - dbName := "" - collectionName := prefix + funcutil.GenRandomStr() - partitionName := prefix + funcutil.GenRandomStr() - - task := &deleteTask{ - DeleteRequest: &milvuspb.DeleteRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Delete, - MsgID: 0, - Timestamp: 0, - SourceID: 0, - }, - DbName: dbName, - CollectionName: collectionName, - PartitionName: partitionName, - Expr: "", - }, - } - - assert.NoError(t, task.PreExecute(ctx)) - - task.DeleteRequest.CollectionName = "" // empty - assert.Error(t, task.PreExecute(ctx)) - task.DeleteRequest.CollectionName = collectionName - - task.DeleteRequest.PartitionName = "" // empty - assert.NoError(t, task.PreExecute(ctx)) - task.DeleteRequest.PartitionName = partitionName + assert.NoError(t, task.PreExecute(ctx)) + assert.NoError(t, task.Execute(ctx)) + assert.NoError(t, task.PostExecute(ctx)) + }) } func TestCreateAlias_all(t *testing.T) {