mirror of https://github.com/milvus-io/milvus.git
Improve check of purge memory and change purge place (#18271)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/18306/head
parent
bb2ad23e55
commit
73aef14820
|
@ -43,6 +43,7 @@ ParseMallocInfo() {
|
|||
size_t buffer_size;
|
||||
FILE* stream;
|
||||
stream = open_memstream(&mem_buffer, &buffer_size);
|
||||
AssertInfo(stream, "null stream file when open_memstream");
|
||||
// malloc_info(0, stdout);
|
||||
|
||||
/*
|
||||
|
@ -55,13 +56,15 @@ ParseMallocInfo() {
|
|||
* <https://sourceware.org/glibc/wiki/MallocInternals>
|
||||
* <https://code.woboq.org/userspace/glibc/malloc/malloc.c.html#5378>
|
||||
*/
|
||||
malloc_info(0, stream);
|
||||
auto ret = malloc_info(0, stream);
|
||||
AssertInfo(ret == 0, "malloc_info failed");
|
||||
fflush(stream);
|
||||
|
||||
rapidxml::xml_document<> doc; // character type defaults to char
|
||||
doc.parse<0>(mem_buffer); // 0 means default parse flags
|
||||
|
||||
rapidxml::xml_node<>* malloc_root_node = doc.first_node();
|
||||
AssertInfo(malloc_root_node, "null malloc_root_node detected when ParseMallocInfo");
|
||||
auto total_fast_node = malloc_root_node->first_node()->next_sibling("total");
|
||||
AssertInfo(total_fast_node, "null total_fast_node detected when ParseMallocInfo");
|
||||
auto total_fast_size = std::stoul(total_fast_node->first_attribute("size")->value());
|
||||
|
|
|
@ -137,7 +137,10 @@ func getSearchResultDataBlob(cSearchResultDataBlobs searchResultDataBlobs, blobI
|
|||
|
||||
func deleteSearchResultDataBlobs(cSearchResultDataBlobs searchResultDataBlobs) {
|
||||
C.DeleteSearchResultDataBlobs(cSearchResultDataBlobs)
|
||||
// try to do a purgeMemory operation after DeleteSearchResultDataBlobs
|
||||
}
|
||||
|
||||
func purgeMemoryAfterReduce() {
|
||||
// try to do a purgeMemory operation after reduce
|
||||
usedMem := metricsinfo.GetUsedMemoryCount()
|
||||
if usedMem == 0 {
|
||||
log.Error("Get 0 usedMemory when deleteSearchResultDataBlobs, which is unexpected")
|
||||
|
|
|
@ -120,6 +120,7 @@ func (s *searchTask) searchOnStreaming() error {
|
|||
zap.Int64("collectionID", s.CollectionID), zap.Error(sErr))
|
||||
return sErr
|
||||
}
|
||||
defer purgeMemoryAfterReduce()
|
||||
defer deleteSearchResults(partResults)
|
||||
return s.reduceResults(searchReq, partResults)
|
||||
}
|
||||
|
@ -155,6 +156,7 @@ func (s *searchTask) searchOnHistorical() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer purgeMemoryAfterReduce()
|
||||
defer deleteSearchResults(partResults)
|
||||
return s.reduceResults(searchReq, partResults)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue