From 75716e0deb3245a7552703b9add40b849da095bf Mon Sep 17 00:00:00 2001 From: Xiangyu Wang Date: Sat, 9 Oct 2021 22:50:39 +0800 Subject: [PATCH] Update partition name validation in DeleteTask (#9560) Signed-off-by: Xiangyu Wang --- internal/proxy/impl.go | 31 ++++++++++++++----------------- internal/proxy/task.go | 9 ++++++--- internal/proxy/task_scheduler.go | 12 ++++++++++++ internal/proxy/task_test.go | 2 +- 4 files changed, 33 insertions(+), 21 deletions(-) diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 2bd43e4774..bb99cd3ab5 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -1353,6 +1353,12 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) } func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) { + sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Delete") + defer sp.Finish() + traceID, _, _ := trace.InfoFromSpan(sp) + log.Info("Delete request begin", zap.String("traceID", traceID)) + defer log.Info("Delete request end", zap.String("traceID", traceID)) + if !node.checkHealthy() { return &milvuspb.MutationResult{ Status: unhealthyStatus(), @@ -1365,14 +1371,16 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) DeleteRequest: request, } - log.Debug("Delete enqueue", + log.Debug("Delete request enqueue", zap.String("role", Params.RoleName), zap.String("db", request.DbName), zap.String("collection", request.CollectionName), zap.String("partition", request.PartitionName), zap.String("expr", request.Expr)) - err := node.sched.dmQueue.Enqueue(dt) - if err != nil { + + // MsgID will be set by Enqueue() + if err := node.sched.dmQueue.Enqueue(dt); err != nil { + log.Error("Failed to enqueue delete task: "+err.Error(), zap.String("traceID", traceID)) return &milvuspb.MutationResult{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -1381,7 +1389,7 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) }, nil } - log.Debug("Delete", + log.Debug("Delete request detail", zap.String("role", Params.RoleName), zap.Int64("msgID", dt.Base.MsgID), zap.Uint64("timestamp", dt.Base.Timestamp), @@ -1389,20 +1397,9 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) zap.String("collection", request.CollectionName), zap.String("partition", request.PartitionName), zap.String("expr", request.Expr)) - defer func() { - log.Debug("Delete Done", - zap.Error(err), - zap.String("role", Params.RoleName), - zap.Int64("msgID", dt.Base.MsgID), - zap.Uint64("timestamp", dt.Base.Timestamp), - zap.String("db", request.DbName), - zap.String("collection", request.CollectionName), - zap.String("partition", request.PartitionName), - zap.String("expr", request.Expr)) - }() - err = dt.WaitToFinish() - if err != nil { + if err := dt.WaitToFinish(); err != nil { + log.Error("Failed to execute delete task in task scheduler: "+err.Error(), zap.String("traceID", traceID)) return &milvuspb.MutationResult{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, diff --git a/internal/proxy/task.go b/internal/proxy/task.go index d0d345d954..e0a4e07bf7 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -4642,9 +4642,12 @@ func (dt *deleteTask) PreExecute(ctx context.Context) error { return err } - partitionTag := dt.PartitionName - if err := ValidatePartitionTag(partitionTag, true); err != nil { - return err + partitionName := dt.PartitionName + // partitionName is accepted, means delete from whole collection + if partitionName != "" { + if err := ValidatePartitionTag(partitionName, true); err != nil { + return err + } } return nil diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go index fde0de8fab..d6ef329662 100644 --- a/internal/proxy/task_scheduler.go +++ b/internal/proxy/task_scheduler.go @@ -425,6 +425,7 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) { "ID": t.ID(), }) defer span.Finish() + traceID, _, _ := trace.InfoFromSpan(span) span.LogFields(oplog.Int64("scheduler process AddActiveTask", t.ID())) q.AddActiveTask(t) @@ -442,6 +443,8 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) { }() if err != nil { trace.LogError(span, err) + log.Error("Failed to pre-execute task: "+err.Error(), + zap.String("traceID", traceID)) return } @@ -449,11 +452,20 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) { err = t.Execute(ctx) if err != nil { trace.LogError(span, err) + log.Error("Failed to execute task: "+err.Error(), + zap.String("traceID", traceID)) return } span.LogFields(oplog.Int64("scheduler process PostExecute", t.ID())) err = t.PostExecute(ctx) + + if err != nil { + trace.LogError(span, err) + log.Error("Failed to post-execute task: "+err.Error(), + zap.String("traceID", traceID)) + return + } } func (sched *taskScheduler) definitionLoop() { diff --git a/internal/proxy/task_test.go b/internal/proxy/task_test.go index 149fb72b05..05e9dff3b3 100644 --- a/internal/proxy/task_test.go +++ b/internal/proxy/task_test.go @@ -3381,7 +3381,7 @@ func TestDeleteTask_PreExecute(t *testing.T) { task.DeleteRequest.CollectionName = collectionName task.DeleteRequest.PartitionName = "" // empty - assert.Error(t, task.PreExecute(ctx)) + assert.NoError(t, task.PreExecute(ctx)) task.DeleteRequest.PartitionName = partitionName }