mirror of https://github.com/milvus-io/milvus.git
fix: delete by expr failed at retry progress (#35241)
issue: #35240 delete by expr shard the same err object between channels, so if one channel's query failed, it will fail all channel, which will break channel level retry policy, and make delete operation failed. Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/35274/head
parent
4b553b0333
commit
c22b0a636e
|
@ -249,7 +249,6 @@ type deleteRunner struct {
|
|||
ts uint64
|
||||
lb LBPolicy
|
||||
count atomic.Int64
|
||||
err error
|
||||
|
||||
// task queue
|
||||
queue *dmTaskQueue
|
||||
|
@ -441,7 +440,11 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe
|
|||
}
|
||||
|
||||
taskCh := make(chan *deleteTask, 256)
|
||||
go dr.receiveQueryResult(ctx, client, taskCh, partitionIDs)
|
||||
var receiveErr error
|
||||
go func() {
|
||||
receiveErr = dr.receiveQueryResult(ctx, client, taskCh, partitionIDs)
|
||||
close(taskCh)
|
||||
}()
|
||||
var allQueryCnt int64
|
||||
// wait all task finish
|
||||
for task := range taskCh {
|
||||
|
@ -454,51 +457,43 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe
|
|||
}
|
||||
|
||||
// query or produce task failed
|
||||
if dr.err != nil {
|
||||
return dr.err
|
||||
if receiveErr != nil {
|
||||
return receiveErr
|
||||
}
|
||||
dr.allQueryCnt.Add(allQueryCnt)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask, partitionIDs []int64) {
|
||||
defer func() {
|
||||
close(taskCh)
|
||||
}()
|
||||
|
||||
func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask, partitionIDs []int64) error {
|
||||
for {
|
||||
result, err := client.Recv()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
log.Debug("query stream for delete finished", zap.Int64("msgID", dr.msgID))
|
||||
return
|
||||
return nil
|
||||
}
|
||||
dr.err = err
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
err = merr.Error(result.GetStatus())
|
||||
if err != nil {
|
||||
dr.err = err
|
||||
log.Warn("query stream for delete get error status", zap.Int64("msgID", dr.msgID), zap.Error(err))
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
if dr.limiter != nil {
|
||||
err := dr.limiter.Alloc(ctx, dr.dbID, map[int64][]int64{dr.collectionID: partitionIDs}, internalpb.RateType_DMLDelete, proto.Size(result.GetIds()))
|
||||
if err != nil {
|
||||
dr.err = err
|
||||
log.Warn("query stream for delete failed because rate limiter", zap.Int64("msgID", dr.msgID), zap.Error(err))
|
||||
return
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
task, err := dr.produce(ctx, result.GetIds())
|
||||
if err != nil {
|
||||
dr.err = err
|
||||
log.Warn("produce delete task failed", zap.Error(err))
|
||||
return
|
||||
return err
|
||||
}
|
||||
task.allQueryCnt = result.GetAllRetrieveCount()
|
||||
|
||||
|
|
Loading…
Reference in New Issue