diff --git a/internal/querynodev2/pipeline/delete_node.go b/internal/querynodev2/pipeline/delete_node.go index a408f98f91..04773b4eaa 100644 --- a/internal/querynodev2/pipeline/delete_node.go +++ b/internal/querynodev2/pipeline/delete_node.go @@ -66,14 +66,13 @@ func (dNode *deleteNode) Operate(in Msg) Msg { metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Dec() nodeMsg := in.(*deleteNodeMsg) - // partition id = > DeleteData - deleteDatas := make(map[UniqueID]*delegator.DeleteData) + if len(nodeMsg.deleteMsgs) > 0 { + // partition id = > DeleteData + deleteDatas := make(map[UniqueID]*delegator.DeleteData) - for _, msg := range nodeMsg.deleteMsgs { - dNode.addDeleteData(deleteDatas, msg) - } - - if len(deleteDatas) > 0 { + for _, msg := range nodeMsg.deleteMsgs { + dNode.addDeleteData(deleteDatas, msg) + } // do Delete, use ts range max as ts dNode.delegator.ProcessDelete(lo.Values(deleteDatas), nodeMsg.timeRange.timestampMax) } diff --git a/internal/querynodev2/pipeline/insert_node.go b/internal/querynodev2/pipeline/insert_node.go index b2b56c8cc5..6ae9368501 100644 --- a/internal/querynodev2/pipeline/insert_node.go +++ b/internal/querynodev2/pipeline/insert_node.go @@ -90,24 +90,26 @@ func (iNode *insertNode) Operate(in Msg) Msg { metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Dec() nodeMsg := in.(*insertNodeMsg) - sort.Slice(nodeMsg.insertMsgs, func(i, j int) bool { - return nodeMsg.insertMsgs[i].BeginTs() < nodeMsg.insertMsgs[j].BeginTs() - }) + if len(nodeMsg.insertMsgs) > 0 { + sort.Slice(nodeMsg.insertMsgs, func(i, j int) bool { + return nodeMsg.insertMsgs[i].BeginTs() < nodeMsg.insertMsgs[j].BeginTs() + }) - insertDatas := make(map[UniqueID]*delegator.InsertData) - collection := iNode.manager.Collection.Get(iNode.collectionID) - if collection == nil { - log.Error("insertNode with collection not exist", zap.Int64("collection", iNode.collectionID)) - panic("insertNode with collection not exist") + insertDatas := make(map[UniqueID]*delegator.InsertData) + collection := iNode.manager.Collection.Get(iNode.collectionID) + if collection == nil { + log.Error("insertNode with collection not exist", zap.Int64("collection", iNode.collectionID)) + panic("insertNode with collection not exist") + } + + // get InsertData and merge datas of same segment + for _, msg := range nodeMsg.insertMsgs { + iNode.addInsertData(insertDatas, msg, collection) + } + + iNode.delegator.ProcessInsert(insertDatas) } - // get InsertData and merge datas of same segment - for _, msg := range nodeMsg.insertMsgs { - iNode.addInsertData(insertDatas, msg, collection) - } - - iNode.delegator.ProcessInsert(insertDatas) - metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Inc() return &deleteNodeMsg{