diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 8ad0fde42a..4ceed03d2d 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -4501,6 +4501,43 @@ func (dt *deleteTask) OnEnqueue() error { return nil } +func (dt *deleteTask) getPChanStats() (map[pChan]pChanStatistics, error) { + ret := make(map[pChan]pChanStatistics) + + channels, err := dt.getChannels() + if err != nil { + return ret, err + } + + beginTs := dt.BeginTs() + endTs := dt.EndTs() + + for _, channel := range channels { + ret[channel] = pChanStatistics{ + minTs: beginTs, + maxTs: endTs, + } + } + return ret, nil +} + +func (dt *deleteTask) getChannels() ([]pChan, error) { + collID, err := globalMetaCache.GetCollectionID(dt.ctx, dt.CollectionName) + if err != nil { + return nil, err + } + var channels []pChan + channels, err = dt.chMgr.getChannels(collID) + if err != nil { + err = dt.chMgr.createDMLMsgStream(collID) + if err != nil { + return nil, err + } + channels, err = dt.chMgr.getChannels(collID) + } + return channels, err +} + func getPrimaryKeysFromExpr(schema *schemapb.CollectionSchema, expr string) (res []int64, err error) { if len(expr) == 0 { log.Warn("empty expr") diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go index 9e44395126..9a25445340 100644 --- a/internal/proxy/task_scheduler.go +++ b/internal/proxy/task_scheduler.go @@ -239,7 +239,10 @@ func (queue *dmTaskQueue) Enqueue(t task) error { if err != nil { return err } - _ = queue.addPChanStats(t) + err = queue.addPChanStats(t) + if err != nil { + return err + } return nil }