enhance: Remove useless ops when there is no write (#34767)

Related to #33235

THe querynode pipeline will make map & call ProcessInsert when there is
no write messages. So querynodes will have high CPU usage even when
there is no workload.

This PR check msg length before composing data struct and calling method

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/34773/head^2
congqixia 2024-07-19 14:31:42 +08:00 committed by GitHub
parent 8e64bf929c
commit 2ac7164c39
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 23 additions and 22 deletions

View File

@ -66,14 +66,13 @@ func (dNode *deleteNode) Operate(in Msg) Msg {
metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Dec()
nodeMsg := in.(*deleteNodeMsg)
// partition id = > DeleteData
deleteDatas := make(map[UniqueID]*delegator.DeleteData)
if len(nodeMsg.deleteMsgs) > 0 {
// partition id = > DeleteData
deleteDatas := make(map[UniqueID]*delegator.DeleteData)
for _, msg := range nodeMsg.deleteMsgs {
dNode.addDeleteData(deleteDatas, msg)
}
if len(deleteDatas) > 0 {
for _, msg := range nodeMsg.deleteMsgs {
dNode.addDeleteData(deleteDatas, msg)
}
// do Delete, use ts range max as ts
dNode.delegator.ProcessDelete(lo.Values(deleteDatas), nodeMsg.timeRange.timestampMax)
}

View File

@ -90,24 +90,26 @@ func (iNode *insertNode) Operate(in Msg) Msg {
metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Dec()
nodeMsg := in.(*insertNodeMsg)
sort.Slice(nodeMsg.insertMsgs, func(i, j int) bool {
return nodeMsg.insertMsgs[i].BeginTs() < nodeMsg.insertMsgs[j].BeginTs()
})
if len(nodeMsg.insertMsgs) > 0 {
sort.Slice(nodeMsg.insertMsgs, func(i, j int) bool {
return nodeMsg.insertMsgs[i].BeginTs() < nodeMsg.insertMsgs[j].BeginTs()
})
insertDatas := make(map[UniqueID]*delegator.InsertData)
collection := iNode.manager.Collection.Get(iNode.collectionID)
if collection == nil {
log.Error("insertNode with collection not exist", zap.Int64("collection", iNode.collectionID))
panic("insertNode with collection not exist")
insertDatas := make(map[UniqueID]*delegator.InsertData)
collection := iNode.manager.Collection.Get(iNode.collectionID)
if collection == nil {
log.Error("insertNode with collection not exist", zap.Int64("collection", iNode.collectionID))
panic("insertNode with collection not exist")
}
// get InsertData and merge datas of same segment
for _, msg := range nodeMsg.insertMsgs {
iNode.addInsertData(insertDatas, msg, collection)
}
iNode.delegator.ProcessInsert(insertDatas)
}
// get InsertData and merge datas of same segment
for _, msg := range nodeMsg.insertMsgs {
iNode.addInsertData(insertDatas, msg, collection)
}
iNode.delegator.ProcessInsert(insertDatas)
metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Inc()
return &deleteNodeMsg{