mirror of https://github.com/milvus-io/milvus.git
fix bug caused by bloomfilter file not release (#3173)
* fix bug caused by bloomfilter file not release Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com> * fix bug caused by apply delete Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com> * cancel annotation Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com> Co-authored-by: shengjun.li <shengjun.li@zilliz.com>pull/3177/head
parent
237e909e7c
commit
f9e8c16e29
|
@ -167,7 +167,6 @@ DBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) {
|
|||
|
||||
auto ctx = context;
|
||||
|
||||
LOG_SERVER_DEBUG_ << "check id auto gen";
|
||||
// default id is auto-generated
|
||||
auto params = ctx.collection->GetParams();
|
||||
if (params.find(PARAM_UID_AUTOGEN) == params.end()) {
|
||||
|
@ -175,7 +174,6 @@ DBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) {
|
|||
ctx.collection->SetParams(params);
|
||||
}
|
||||
|
||||
LOG_SERVER_DEBUG_ << "check uid existence";
|
||||
// check uid existence
|
||||
snapshot::FieldPtr uid_field;
|
||||
for (auto& pair : ctx.fields_schema) {
|
||||
|
@ -185,7 +183,6 @@ DBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) {
|
|||
}
|
||||
}
|
||||
|
||||
LOG_SERVER_DEBUG_ << "add uid field";
|
||||
// add uid field if not specified
|
||||
if (uid_field == nullptr) {
|
||||
uid_field = std::make_shared<snapshot::Field>(FIELD_UID, 0, DataType::INT64);
|
||||
|
@ -198,9 +195,7 @@ DBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) {
|
|||
0, 0, ELEMENT_DELETED_DOCS, milvus::engine::FieldElementType::FET_DELETED_DOCS);
|
||||
ctx.fields_schema[uid_field] = {bloom_filter_element, delete_doc_element};
|
||||
|
||||
LOG_SERVER_DEBUG_ << "Create Collection Operation";
|
||||
auto op = std::make_shared<snapshot::CreateCollectionOperation>(ctx);
|
||||
LOG_SERVER_DEBUG_ << "Create Collection Operation end";
|
||||
return op->Push();
|
||||
}
|
||||
|
||||
|
|
|
@ -183,22 +183,25 @@ MemCollection::ApplyDeletes() {
|
|||
auto seg_visitor = engine::SegmentVisitor::Build(ss, segment->GetID());
|
||||
segment::SegmentReaderPtr segment_reader =
|
||||
std::make_shared<segment::SegmentReader>(options_.meta_.path_, seg_visitor);
|
||||
segment::IdBloomFilterPtr pre_bloom_filter;
|
||||
STATUS_CHECK(segment_reader->LoadBloomFilter(pre_bloom_filter));
|
||||
std::vector<engine::id_t> uids;
|
||||
STATUS_CHECK(segment_reader->LoadUids(uids));
|
||||
|
||||
// Step 1: Check delete_id in mem
|
||||
std::vector<id_t> delete_ids;
|
||||
for (auto& id : doc_ids_to_delete_) {
|
||||
if (pre_bloom_filter->Check(id)) {
|
||||
delete_ids.push_back(id);
|
||||
{
|
||||
segment::IdBloomFilterPtr pre_bloom_filter;
|
||||
STATUS_CHECK(segment_reader->LoadBloomFilter(pre_bloom_filter));
|
||||
for (auto& id : doc_ids_to_delete_) {
|
||||
if (pre_bloom_filter->Check(id)) {
|
||||
delete_ids.push_back(id);
|
||||
}
|
||||
}
|
||||
|
||||
if (delete_ids.empty()) {
|
||||
return Status::OK();
|
||||
}
|
||||
}
|
||||
|
||||
if (delete_ids.empty()) {
|
||||
return Status::OK();
|
||||
}
|
||||
std::vector<engine::id_t> uids;
|
||||
STATUS_CHECK(segment_reader->LoadUids(uids));
|
||||
|
||||
// Step 2: Load previous delete_id and merge into 'delete_ids'
|
||||
segment::DeletedDocsPtr prev_del_docs;
|
||||
|
@ -274,24 +277,27 @@ MemCollection::ApplyDeletes() {
|
|||
snapshot::GetResPath<snapshot::SegmentFile>(collection_root_path, bloom_filter_file);
|
||||
|
||||
// Step 5: Write to file
|
||||
segment::IdBloomFilterPtr bloom_filter;
|
||||
STATUS_CHECK(segment_writer->CreateBloomFilter(bloom_filter_file_path, bloom_filter));
|
||||
auto delete_docs = std::make_shared<segment::DeletedDocs>();
|
||||
|
||||
for (size_t i = 0; i < uids.size(); i++) {
|
||||
if (std::binary_search(ids_to_check.begin(), ids_to_check.end(), uids[i])) {
|
||||
delete_docs->AddDeletedDoc(i);
|
||||
} else {
|
||||
bloom_filter->Add(uids[i]);
|
||||
{
|
||||
segment::IdBloomFilterPtr bloom_filter;
|
||||
STATUS_CHECK(segment_writer->CreateBloomFilter(bloom_filter_file_path, bloom_filter));
|
||||
auto delete_docs = std::make_shared<segment::DeletedDocs>();
|
||||
std::vector<id_t> uids;
|
||||
STATUS_CHECK(segment_reader->LoadUids(uids));
|
||||
for (size_t i = 0; i < uids.size(); i++) {
|
||||
if (std::binary_search(ids_to_check.begin(), ids_to_check.end(), uids[i])) {
|
||||
delete_docs->AddDeletedDoc(i);
|
||||
} else {
|
||||
bloom_filter->Add(uids[i]);
|
||||
}
|
||||
}
|
||||
|
||||
STATUS_CHECK(
|
||||
segments_op->CommitRowCountDelta(segment->GetID(), delete_docs->GetCount() - pre_del_ids.size(), true));
|
||||
|
||||
STATUS_CHECK(segment_writer->WriteDeletedDocs(del_docs_path, delete_docs));
|
||||
STATUS_CHECK(segment_writer->WriteBloomFilter(bloom_filter_file_path, bloom_filter));
|
||||
}
|
||||
|
||||
STATUS_CHECK(
|
||||
segments_op->CommitRowCountDelta(segment->GetID(), delete_docs->GetCount() - pre_del_ids.size(), true));
|
||||
|
||||
STATUS_CHECK(segment_writer->WriteDeletedDocs(del_docs_path, delete_docs));
|
||||
STATUS_CHECK(segment_writer->WriteBloomFilter(bloom_filter_file_path, bloom_filter));
|
||||
|
||||
delete_file->SetSize(CommonUtil::GetFileSize(del_docs_path + codec::DeletedDocsFormat::FilePostfix()));
|
||||
bloom_filter_file->SetSize(
|
||||
CommonUtil::GetFileSize(bloom_filter_file_path + codec::IdBloomFilterFormat::FilePostfix()));
|
||||
|
|
|
@ -473,6 +473,7 @@ SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
|
|||
|
||||
if (id_bloom_filter_ptr) {
|
||||
segment_ptr_->SetBloomFilter(id_bloom_filter_ptr);
|
||||
// TODO: disable cache for solving bloom filter ptr problem
|
||||
cache::CpuCacheMgr::GetInstance().InsertItem(file_path, id_bloom_filter_ptr); // put into cache
|
||||
}
|
||||
} catch (std::exception& e) {
|
||||
|
|
|
@ -60,13 +60,8 @@ CreateCollectionReq::OnExecute() {
|
|||
|
||||
// step 2: create snapshot collection context
|
||||
engine::snapshot::CreateCollectionContext create_collection_context;
|
||||
LOG_SERVER_DEBUG_ << "make collection_schema";
|
||||
auto collection_schema = std::make_shared<engine::snapshot::Collection>(collection_name_, extra_params_);
|
||||
if (collection_schema == nullptr) {
|
||||
LOG_SERVER_DEBUG_ << "collection_schema null";
|
||||
}
|
||||
|
||||
LOG_SERVER_DEBUG_ << "create_collection_context";
|
||||
create_collection_context.collection = collection_schema;
|
||||
for (auto& field_kv : fields_) {
|
||||
auto& field_name = field_kv.first;
|
||||
|
@ -83,7 +78,6 @@ CreateCollectionReq::OnExecute() {
|
|||
index_name = index_params["name"];
|
||||
}
|
||||
|
||||
LOG_SERVER_DEBUG_ << "checkout Default_UID_NAME";
|
||||
// validate id field
|
||||
if (field_name == engine::FIELD_UID) {
|
||||
if (field_type != engine::DataType::INT64) {
|
||||
|
@ -110,11 +104,9 @@ CreateCollectionReq::OnExecute() {
|
|||
}
|
||||
|
||||
// step 3: create collection
|
||||
LOG_SERVER_FATAL_ << "create collection";
|
||||
status = DBWrapper::DB()->CreateCollection(create_collection_context);
|
||||
fiu_do_on("CreateCollectionReq.OnExecute.invalid_db_execute",
|
||||
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
|
||||
LOG_SERVER_FATAL_ << "create collection end";
|
||||
if (!status.ok()) {
|
||||
// collection could exist
|
||||
if (status.code() == DB_ALREADY_EXIST) {
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
#include <unistd.h>
|
||||
#include <errno.h>
|
||||
|
||||
#include <iostream>
|
||||
|
||||
#include "murmur.h"
|
||||
#include "dablooms.h"
|
||||
|
||||
|
@ -312,6 +314,10 @@ int counting_bloom_check(counting_bloom_t *bloom, const char *s, size_t len)
|
|||
|
||||
int free_scaling_bloom(scaling_bloom_t *bloom)
|
||||
{
|
||||
if(close(bloom->fd) == -1) {
|
||||
std::cerr << " Close fd " << bloom->fd << "Failed: " << strerror(errno) << std::endl;
|
||||
}
|
||||
|
||||
int i;
|
||||
for (i = bloom->num_blooms - 1; i >= 0; i--) {
|
||||
free(bloom->blooms[i]->hashes);
|
||||
|
|
|
@ -1026,6 +1026,7 @@ class TestSearchDSL(object):
|
|||
return request.param
|
||||
|
||||
# TODO
|
||||
@pytest.mark.level(2)
|
||||
def test_query_term_wrong_format(self, connect, collection, get_invalid_term):
|
||||
'''
|
||||
method: build query with wrong format term
|
||||
|
@ -1089,6 +1090,7 @@ class TestSearchDSL(object):
|
|||
return request.param
|
||||
|
||||
# TODO
|
||||
@pytest.mark.level(2)
|
||||
def test_query_range_wrong_format(self, connect, collection, get_invalid_range):
|
||||
'''
|
||||
method: build query with wrong format range
|
||||
|
|
Loading…
Reference in New Issue