mirror of https://github.com/milvus-io/milvus.git
Update partition name validation in DeleteTask (#9560)
Signed-off-by: Xiangyu Wang <xiangyu.wang@zilliz.com>pull/9560/merge
parent
5f17f8200b
commit
75716e0deb
|
@ -1353,6 +1353,12 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
|
|||
}
|
||||
|
||||
func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) {
|
||||
sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Delete")
|
||||
defer sp.Finish()
|
||||
traceID, _, _ := trace.InfoFromSpan(sp)
|
||||
log.Info("Delete request begin", zap.String("traceID", traceID))
|
||||
defer log.Info("Delete request end", zap.String("traceID", traceID))
|
||||
|
||||
if !node.checkHealthy() {
|
||||
return &milvuspb.MutationResult{
|
||||
Status: unhealthyStatus(),
|
||||
|
@ -1365,14 +1371,16 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
|
|||
DeleteRequest: request,
|
||||
}
|
||||
|
||||
log.Debug("Delete enqueue",
|
||||
log.Debug("Delete request enqueue",
|
||||
zap.String("role", Params.RoleName),
|
||||
zap.String("db", request.DbName),
|
||||
zap.String("collection", request.CollectionName),
|
||||
zap.String("partition", request.PartitionName),
|
||||
zap.String("expr", request.Expr))
|
||||
err := node.sched.dmQueue.Enqueue(dt)
|
||||
if err != nil {
|
||||
|
||||
// MsgID will be set by Enqueue()
|
||||
if err := node.sched.dmQueue.Enqueue(dt); err != nil {
|
||||
log.Error("Failed to enqueue delete task: "+err.Error(), zap.String("traceID", traceID))
|
||||
return &milvuspb.MutationResult{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
|
@ -1381,7 +1389,7 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
|
|||
}, nil
|
||||
}
|
||||
|
||||
log.Debug("Delete",
|
||||
log.Debug("Delete request detail",
|
||||
zap.String("role", Params.RoleName),
|
||||
zap.Int64("msgID", dt.Base.MsgID),
|
||||
zap.Uint64("timestamp", dt.Base.Timestamp),
|
||||
|
@ -1389,20 +1397,9 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
|
|||
zap.String("collection", request.CollectionName),
|
||||
zap.String("partition", request.PartitionName),
|
||||
zap.String("expr", request.Expr))
|
||||
defer func() {
|
||||
log.Debug("Delete Done",
|
||||
zap.Error(err),
|
||||
zap.String("role", Params.RoleName),
|
||||
zap.Int64("msgID", dt.Base.MsgID),
|
||||
zap.Uint64("timestamp", dt.Base.Timestamp),
|
||||
zap.String("db", request.DbName),
|
||||
zap.String("collection", request.CollectionName),
|
||||
zap.String("partition", request.PartitionName),
|
||||
zap.String("expr", request.Expr))
|
||||
}()
|
||||
|
||||
err = dt.WaitToFinish()
|
||||
if err != nil {
|
||||
if err := dt.WaitToFinish(); err != nil {
|
||||
log.Error("Failed to execute delete task in task scheduler: "+err.Error(), zap.String("traceID", traceID))
|
||||
return &milvuspb.MutationResult{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
|
|
|
@ -4642,9 +4642,12 @@ func (dt *deleteTask) PreExecute(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
partitionTag := dt.PartitionName
|
||||
if err := ValidatePartitionTag(partitionTag, true); err != nil {
|
||||
return err
|
||||
partitionName := dt.PartitionName
|
||||
// partitionName is accepted, means delete from whole collection
|
||||
if partitionName != "" {
|
||||
if err := ValidatePartitionTag(partitionName, true); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -425,6 +425,7 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) {
|
|||
"ID": t.ID(),
|
||||
})
|
||||
defer span.Finish()
|
||||
traceID, _, _ := trace.InfoFromSpan(span)
|
||||
|
||||
span.LogFields(oplog.Int64("scheduler process AddActiveTask", t.ID()))
|
||||
q.AddActiveTask(t)
|
||||
|
@ -442,6 +443,8 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) {
|
|||
}()
|
||||
if err != nil {
|
||||
trace.LogError(span, err)
|
||||
log.Error("Failed to pre-execute task: "+err.Error(),
|
||||
zap.String("traceID", traceID))
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -449,11 +452,20 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) {
|
|||
err = t.Execute(ctx)
|
||||
if err != nil {
|
||||
trace.LogError(span, err)
|
||||
log.Error("Failed to execute task: "+err.Error(),
|
||||
zap.String("traceID", traceID))
|
||||
return
|
||||
}
|
||||
|
||||
span.LogFields(oplog.Int64("scheduler process PostExecute", t.ID()))
|
||||
err = t.PostExecute(ctx)
|
||||
|
||||
if err != nil {
|
||||
trace.LogError(span, err)
|
||||
log.Error("Failed to post-execute task: "+err.Error(),
|
||||
zap.String("traceID", traceID))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (sched *taskScheduler) definitionLoop() {
|
||||
|
|
|
@ -3381,7 +3381,7 @@ func TestDeleteTask_PreExecute(t *testing.T) {
|
|||
task.DeleteRequest.CollectionName = collectionName
|
||||
|
||||
task.DeleteRequest.PartitionName = "" // empty
|
||||
assert.Error(t, task.PreExecute(ctx))
|
||||
assert.NoError(t, task.PreExecute(ctx))
|
||||
task.DeleteRequest.PartitionName = partitionName
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue