mirror of https://github.com/milvus-io/milvus.git
Fix memory leak in indexnode
Signed-off-by: dragondriver <jiquan.long@zilliz.com>
pull/4973/head^2
parent
f0c16a228e
commit
68518fec38
|
@ -214,8 +214,9 @@ IndexWrapper::StoreRawData(const knowhere::DatasetPtr& dataset) {
|
|||
|
||||
/*
|
||||
* brief Return serialized binary set
|
||||
* TODO: use a more efficient method to manage memory, consider std::vector later
|
||||
*/
|
||||
milvus::indexbuilder::IndexWrapper::Binary
|
||||
std::unique_ptr<IndexWrapper::Binary>
|
||||
IndexWrapper::Serialize() {
|
||||
auto binarySet = index_->Serialize(config_);
|
||||
auto index_type = get_index_type();
|
||||
|
@ -238,10 +239,11 @@ IndexWrapper::Serialize() {
|
|||
auto ok = ret.SerializeToString(&serialized_data);
|
||||
Assert(ok);
|
||||
|
||||
auto data = new char[serialized_data.length()];
|
||||
memcpy(data, serialized_data.c_str(), serialized_data.length());
|
||||
auto binary = std::make_unique<IndexWrapper::Binary>();
|
||||
binary->data.resize(serialized_data.length());
|
||||
memcpy(binary->data.data(), serialized_data.c_str(), serialized_data.length());
|
||||
|
||||
return {data, static_cast<int32_t>(serialized_data.length())};
|
||||
return binary;
|
||||
}
|
||||
|
||||
void
|
||||
|
|
|
@ -29,11 +29,10 @@ class IndexWrapper {
|
|||
BuildWithoutIds(const knowhere::DatasetPtr& dataset);
|
||||
|
||||
struct Binary {
|
||||
char* data;
|
||||
int32_t size;
|
||||
std::vector<char> data;
|
||||
};
|
||||
|
||||
Binary
|
||||
std::unique_ptr<Binary>
|
||||
Serialize();
|
||||
|
||||
void
|
||||
|
|
|
@ -85,13 +85,12 @@ BuildBinaryVecIndexWithoutIds(CIndex index, int64_t data_size, const uint8_t* ve
|
|||
}
|
||||
|
||||
CStatus
|
||||
SerializeToSlicedBuffer(CIndex index, int32_t* buffer_size, char** res_buffer) {
|
||||
SerializeToSlicedBuffer(CIndex index, CBinary* c_binary) {
|
||||
auto status = CStatus();
|
||||
try {
|
||||
auto cIndex = (milvus::indexbuilder::IndexWrapper*)index;
|
||||
auto binary = cIndex->Serialize();
|
||||
*buffer_size = binary.size;
|
||||
*res_buffer = binary.data;
|
||||
*c_binary = binary.release();
|
||||
status.error_code = Success;
|
||||
status.error_msg = "";
|
||||
} catch (std::exception& e) {
|
||||
|
@ -101,6 +100,25 @@ SerializeToSlicedBuffer(CIndex index, int32_t* buffer_size, char** res_buffer) {
|
|||
return status;
|
||||
}
|
||||
|
||||
// Returns the number of bytes held by the serialized Binary behind the opaque
// c_binary handle (the size of its `data` container, converted to int64_t).
// The handle is cast to IndexWrapper::Binary* and dereferenced without any
// check, so callers must pass a valid, non-null handle.
int64_t
|
||||
GetCBinarySize(CBinary c_binary) {
|
||||
auto cBinary = (milvus::indexbuilder::IndexWrapper::Binary*)c_binary;
|
||||
return cBinary->data.size();
|
||||
}
|
||||
|
||||
// Copies the entire contents of the Binary behind c_binary into `data`.
// Note: the memory of data is allocated outside (by the caller). Exactly
// cBinary->data.size() bytes are written -- the same value GetCBinarySize
// returns -- so the destination must be at least that large.
// Neither c_binary nor data is validated here.
|
||||
void
|
||||
GetCBinaryData(CBinary c_binary, void* data) {
|
||||
auto cBinary = (milvus::indexbuilder::IndexWrapper::Binary*)c_binary;
|
||||
memcpy(data, cBinary->data.data(), cBinary->data.size());
|
||||
}
|
||||
|
||||
// Destroys the IndexWrapper::Binary behind the opaque c_binary handle with
// `delete`. The handle must not be used after this call.
// NOTE(review): SerializeToSlicedBuffer hands out such a handle via
// unique_ptr::release(), so this looks like the matching release function --
// confirm that every caller pairs the two.
void
|
||||
DeleteCBinary(CBinary c_binary) {
|
||||
auto cBinary = (milvus::indexbuilder::IndexWrapper::Binary*)c_binary;
|
||||
// `delete` on a null pointer is a no-op, so a null handle is tolerated here.
delete cBinary;
|
||||
}
|
||||
|
||||
CStatus
|
||||
LoadFromSlicedBuffer(CIndex index, const char* serialized_sliced_blob_buffer, int32_t size) {
|
||||
auto status = CStatus();
|
||||
|
@ -265,3 +283,8 @@ DeleteIndexQueryResult(CIndexQueryResult res) {
|
|||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
// Frees a character array with `delete[]`.
// Because array delete is used, `array` must have been allocated with
// `new char[...]` (or be null); passing memory from malloc or a non-array
// `new` is undefined behavior.
void
|
||||
DeleteByteArray(const char* array) {
|
||||
delete[] array;
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ extern "C" {
|
|||
|
||||
typedef void* CIndex;
|
||||
typedef void* CIndexQueryResult;
|
||||
typedef void* CBinary;
|
||||
|
||||
// TODO: how could we pass map between go and c++ more efficiently?
|
||||
// Solution: using protobuf instead of json, this way significantly increase programming efficiency
|
||||
|
@ -49,7 +50,17 @@ CStatus
|
|||
BuildBinaryVecIndexWithoutIds(CIndex index, int64_t data_size, const uint8_t* vectors);
|
||||
|
||||
CStatus
|
||||
SerializeToSlicedBuffer(CIndex index, int32_t* buffer_size, char** res_buffer);
|
||||
SerializeToSlicedBuffer(CIndex index, CBinary* c_binary);
|
||||
|
||||
// Returns the number of bytes stored in the binary behind c_binary.
int64_t
|
||||
GetCBinarySize(CBinary c_binary);
|
||||
|
||||
// Copies the binary's bytes into `data`.
// Note: the memory of data is allocated outside (by the caller) and must hold
// at least GetCBinarySize(c_binary) bytes.
|
||||
void
|
||||
GetCBinaryData(CBinary c_binary, void* data);
|
||||
|
||||
// Deletes the binary object behind c_binary; the handle is invalid afterwards.
void
|
||||
DeleteCBinary(CBinary c_binary);
|
||||
|
||||
CStatus
|
||||
LoadFromSlicedBuffer(CIndex index, const char* serialized_sliced_blob_buffer, int32_t size);
|
||||
|
@ -92,6 +103,9 @@ GetDistancesOfQueryResult(CIndexQueryResult res, float* distances);
|
|||
CStatus
|
||||
DeleteIndexQueryResult(CIndexQueryResult res);
|
||||
|
||||
// Frees `array` with delete[]; it must have been allocated with new[].
void
|
||||
DeleteByteArray(const char* array);
|
||||
|
||||
#ifdef __cplusplus
|
||||
};
|
||||
#endif
|
||||
|
|
|
@ -24,19 +24,20 @@ add_executable(all_tests
|
|||
${MILVUS_TEST_FILES}
|
||||
)
|
||||
|
||||
set(INDEX_BUILDER_TEST_FILES
|
||||
test_index_wrapper.cpp)
|
||||
add_executable(index_builder_test
|
||||
${INDEX_BUILDER_TEST_FILES}
|
||||
)
|
||||
target_link_libraries(index_builder_test
|
||||
gtest
|
||||
gtest_main
|
||||
milvus_segcore
|
||||
milvus_indexbuilder
|
||||
log
|
||||
pthread
|
||||
)
|
||||
# check if memory leak exists in index builder
|
||||
# set(INDEX_BUILDER_TEST_FILES
|
||||
# test_index_wrapper.cpp)
|
||||
# add_executable(index_builder_test
|
||||
# ${INDEX_BUILDER_TEST_FILES}
|
||||
# )
|
||||
# target_link_libraries(index_builder_test
|
||||
# gtest
|
||||
# gtest_main
|
||||
# milvus_segcore
|
||||
# milvus_indexbuilder
|
||||
# log
|
||||
# pthread
|
||||
# )
|
||||
|
||||
target_link_libraries(all_tests
|
||||
gtest
|
||||
|
|
|
@ -556,7 +556,7 @@ TEST(IVFFLATNMWrapper, Codec) {
|
|||
auto binary = index->Serialize();
|
||||
auto copy_index =
|
||||
std::make_unique<milvus::indexbuilder::IndexWrapper>(type_params_str.c_str(), index_params_str.c_str());
|
||||
ASSERT_NO_THROW(copy_index->Load(binary.data, binary.size));
|
||||
ASSERT_NO_THROW(copy_index->Load(binary->data.data(), binary->data.size()));
|
||||
ASSERT_EQ(copy_index->dim(), copy_index->dim());
|
||||
auto copy_binary = copy_index->Serialize();
|
||||
}
|
||||
|
@ -657,13 +657,13 @@ TEST_P(IndexWrapperTest, Codec) {
|
|||
auto binary = index->Serialize();
|
||||
auto copy_index =
|
||||
std::make_unique<milvus::indexbuilder::IndexWrapper>(type_params_str.c_str(), index_params_str.c_str());
|
||||
ASSERT_NO_THROW(copy_index->Load(binary.data, binary.size));
|
||||
ASSERT_NO_THROW(copy_index->Load(binary->data.data(), binary->data.size()));
|
||||
ASSERT_EQ(copy_index->dim(), copy_index->dim());
|
||||
auto copy_binary = copy_index->Serialize();
|
||||
if (!milvus::indexbuilder::is_in_nm_list(index_type)) {
|
||||
// binary may be not same due to uncertain internal map order
|
||||
ASSERT_EQ(binary.size, copy_binary.size);
|
||||
ASSERT_EQ(strcmp(binary.data, copy_binary.data), 0);
|
||||
ASSERT_EQ(binary->data.size(), copy_binary->data.size());
|
||||
ASSERT_EQ(binary->data, copy_binary->data);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -130,14 +130,10 @@ type CIndex struct {
|
|||
}
|
||||
|
||||
func (index *CIndex) Serialize() ([]*Blob, error) {
|
||||
/*
|
||||
CStatus
|
||||
SerializeToSlicedBuffer(CIndex index, int32_t* buffer_size, char** res_buffer);
|
||||
*/
|
||||
var cBinary C.CBinary
|
||||
defer C.DeleteCBinary(cBinary)
|
||||
|
||||
var cDumpedSlicedBuffer *C.char
|
||||
var bufferSize int32
|
||||
status := C.SerializeToSlicedBuffer(index.indexPtr, (*C.int32_t)(unsafe.Pointer(&bufferSize)), &cDumpedSlicedBuffer)
|
||||
status := C.SerializeToSlicedBuffer(index.indexPtr, &cBinary)
|
||||
errorCode := status.error_code
|
||||
if errorCode != 0 {
|
||||
errorMsg := C.GoString(status.error_msg)
|
||||
|
@ -145,11 +141,12 @@ func (index *CIndex) Serialize() ([]*Blob, error) {
|
|||
return nil, errors.New("SerializeToSlicedBuffer failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
|
||||
}
|
||||
|
||||
defer C.free(unsafe.Pointer(cDumpedSlicedBuffer))
|
||||
binarySize := C.GetCBinarySize(cBinary)
|
||||
binaryData := make([]byte, binarySize)
|
||||
C.GetCBinaryData(cBinary, unsafe.Pointer(&binaryData[0]))
|
||||
|
||||
dumpedSlicedBuffer := C.GoBytes(unsafe.Pointer(cDumpedSlicedBuffer), (C.int32_t)(bufferSize))
|
||||
var blobs indexcgopb.BinarySet
|
||||
err := proto.Unmarshal(dumpedSlicedBuffer, &blobs)
|
||||
err := proto.Unmarshal(binaryData, &blobs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -195,6 +195,12 @@ func (it *IndexBuildTask) Execute() error {
|
|||
fmt.Println("NewCIndex err:", err.Error())
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
err = it.index.Delete()
|
||||
if err != nil {
|
||||
log.Print("CIndexDelete Failed")
|
||||
}
|
||||
}()
|
||||
|
||||
getKeyByPathNaive := func(path string) string {
|
||||
// splitElements := strings.Split(path, "/")
|
||||
|
@ -301,9 +307,9 @@ func (it *IndexBuildTask) Execute() error {
|
|||
it.savePaths = append(it.savePaths, savePath)
|
||||
}
|
||||
}
|
||||
err = it.index.Delete()
|
||||
if err != nil {
|
||||
log.Print("CIndexDelete Failed")
|
||||
}
|
||||
// err = it.index.Delete()
|
||||
// if err != nil {
|
||||
// log.Print("CIndexDelete Failed")
|
||||
// }
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue