mirror of https://github.com/milvus-io/milvus.git
Add delete task channel stats interface (#12250)
Signed-off-by: godchen <qingxiang.chen@zilliz.com>pull/12236/head
parent
6c4c0ef6b5
commit
927f72d588
|
@ -4501,6 +4501,43 @@ func (dt *deleteTask) OnEnqueue() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (dt *deleteTask) getPChanStats() (map[pChan]pChanStatistics, error) {
|
||||
ret := make(map[pChan]pChanStatistics)
|
||||
|
||||
channels, err := dt.getChannels()
|
||||
if err != nil {
|
||||
return ret, err
|
||||
}
|
||||
|
||||
beginTs := dt.BeginTs()
|
||||
endTs := dt.EndTs()
|
||||
|
||||
for _, channel := range channels {
|
||||
ret[channel] = pChanStatistics{
|
||||
minTs: beginTs,
|
||||
maxTs: endTs,
|
||||
}
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (dt *deleteTask) getChannels() ([]pChan, error) {
|
||||
collID, err := globalMetaCache.GetCollectionID(dt.ctx, dt.CollectionName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var channels []pChan
|
||||
channels, err = dt.chMgr.getChannels(collID)
|
||||
if err != nil {
|
||||
err = dt.chMgr.createDMLMsgStream(collID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
channels, err = dt.chMgr.getChannels(collID)
|
||||
}
|
||||
return channels, err
|
||||
}
|
||||
|
||||
func getPrimaryKeysFromExpr(schema *schemapb.CollectionSchema, expr string) (res []int64, err error) {
|
||||
if len(expr) == 0 {
|
||||
log.Warn("empty expr")
|
||||
|
|
|
@ -239,7 +239,10 @@ func (queue *dmTaskQueue) Enqueue(t task) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_ = queue.addPChanStats(t)
|
||||
err = queue.addPChanStats(t)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue