mirror of https://github.com/milvus-io/milvus.git
Remove blacklist index from VecIndex (#4239)
Signed-off-by: shengjun.li <shengjun.li@zilliz.com>
pull/4263/head
parent
3c181d8f74
commit
c71a16fb8d
|
@ -29,6 +29,7 @@ Please mark all changes in change log and use the issue from GitHub
|
|||
- \#4200 The scalar field information of search result is wrong after some entities deleted
|
||||
- \#4205 Restful API error: When the specified partition is retrieved, the partition_tag is not valid
|
||||
- \#4226 The Prometheus performance metrics sent by Milvus 0.11.0 is not right
|
||||
- \#4242 Fix query hang up at unittest case
|
||||
|
||||
## Feature
|
||||
- \#4163 Update C++ sdk search interface
|
||||
|
|
|
@ -98,6 +98,10 @@ fi
|
|||
|
||||
# run unittest
|
||||
for test in `ls ${UNITTEST_DIR}`; do
|
||||
if [[ ${test} == *".log" ]] || [[ ${test} == *".info" ]]; then
|
||||
echo "skip file ${test}"
|
||||
continue
|
||||
fi
|
||||
echo $test " running..."
|
||||
# run unittest
|
||||
${UNITTEST_DIR}/${test}
|
||||
|
|
|
@ -5,10 +5,7 @@ conf/log_config.conf
|
|||
src/config.h
|
||||
src/version.h
|
||||
lcov_out/
|
||||
base.info
|
||||
output.info
|
||||
output_new.info
|
||||
server.info
|
||||
*.info
|
||||
*.pyc
|
||||
src/grpc/python_gen.h
|
||||
src/grpc/python/
|
||||
|
|
|
@ -27,6 +27,10 @@ fi
|
|||
|
||||
# run unittest
|
||||
for test in `ls ${UNITTEST_DIR}`; do
|
||||
if [[ ${test} == *".log" ]] || [[ ${test} == *".info" ]]; then
|
||||
echo "skip file ${test}"
|
||||
continue
|
||||
fi
|
||||
echo $test "running..."
|
||||
# run unittest
|
||||
./${UNITTEST_DIR}/${test}
|
||||
|
|
|
@ -28,6 +28,7 @@ set( DB_META_FILES ${DB_META_MAIN_FILES}
|
|||
|
||||
aux_source_directory( ${MILVUS_ENGINE_SRC}/index/archive WRAPPER_FILES )
|
||||
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/server/context SERVER_CONTEXT_FILES)
|
||||
|
||||
aux_source_directory( ${MILVUS_THIRDPARTY_SRC}/easyloggingpp THIRDPARTY_EASYLOGGINGPP_FILES )
|
||||
aux_source_directory( ${MILVUS_THIRDPARTY_SRC}/nlohmann THIRDPARTY_NLOHMANN_FILES )
|
||||
|
@ -46,6 +47,7 @@ set( ENGINE_FILES ${DB_MAIN_FILES}
|
|||
${DB_TRANSCRIPT_FILES}
|
||||
${THIRDPARTY_FILES}
|
||||
${WRAPPER_FILES}
|
||||
${SERVER_CONTEXT_FILES}
|
||||
)
|
||||
|
||||
|
||||
|
@ -123,7 +125,6 @@ endif ()
|
|||
# **************************** Link Libraries with milvus engine ****************************
|
||||
target_link_libraries( milvus_engine
|
||||
PUBLIC knowhere
|
||||
server
|
||||
segment
|
||||
cache
|
||||
codecs
|
||||
|
|
|
@ -160,7 +160,7 @@ struct AttrsData {
|
|||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
struct QueryResult {
|
||||
uint64_t row_num_;
|
||||
uint64_t row_num_ = 0;
|
||||
engine::ResultIds result_ids_;
|
||||
engine::ResultDistances result_distances_;
|
||||
engine::DataChunkPtr data_chunk_;
|
||||
|
|
|
@ -45,8 +45,7 @@
|
|||
#include "knowhere/index/vector_index/helpers/Cloner.h"
|
||||
#endif
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
namespace milvus::engine {
|
||||
|
||||
ExecutionEngineImpl::ExecutionEngineImpl(const std::string& dir_root, const SegmentVisitorPtr& segment_visitor)
|
||||
: gpu_enable_(config.gpu.enable()) {
|
||||
|
@ -292,33 +291,17 @@ ExecutionEngineImpl::Search(ExecutionEngineContext& context) {
|
|||
segment::DeletedDocsPtr deleted_docs_ptr;
|
||||
segment_reader_->LoadDeletedDocs(deleted_docs_ptr);
|
||||
if (deleted_docs_ptr) {
|
||||
faiss::ConcurrentBitsetPtr del_bitset = std::make_shared<faiss::ConcurrentBitset>(entity_count_);
|
||||
auto& del_docs = deleted_docs_ptr->GetDeletedDocs();
|
||||
for (auto& offset : del_docs) {
|
||||
del_bitset->set(offset);
|
||||
}
|
||||
if (bitset != nullptr) {
|
||||
auto del_bitset = deleted_docs_ptr->GetBlacklist();
|
||||
if (bitset != nullptr && del_bitset != nullptr) {
|
||||
filter_list = (*bitset) | (*del_bitset);
|
||||
} else {
|
||||
filter_list = del_bitset;
|
||||
filter_list = (bitset != nullptr) ? bitset : del_bitset;
|
||||
}
|
||||
} else {
|
||||
filter_list = bitset;
|
||||
}
|
||||
|
||||
// TODO(yhz): The black list is obtain from deleted docs above,
|
||||
// there is no need to get blacklist from index.
|
||||
// list = vec_index->GetBlacklist();
|
||||
// if (list != nullptr) {
|
||||
// if (filter_list != nullptr) {
|
||||
// list = (*list) | (*filter_list);
|
||||
// }
|
||||
// } else {
|
||||
// if (filter_list != nullptr) {
|
||||
// list = filter_list;
|
||||
// }
|
||||
// }
|
||||
|
||||
auto& vector_param = context.query_ptr_->vectors.at(vector_placeholder);
|
||||
if (!vector_param->query_vector.float_data.empty()) {
|
||||
vector_param->nq = vector_param->query_vector.float_data.size() / vec_index->Dim();
|
||||
|
@ -790,18 +773,15 @@ ExecutionEngineImpl::BuildKnowhereIndex(const std::string& field_name, const Col
|
|||
LOG_ENGINE_DEBUG_ << "Index config: " << conf.dump();
|
||||
|
||||
std::shared_ptr<std::vector<idx_t>> uids;
|
||||
ConCurrentBitsetPtr blacklist;
|
||||
knowhere::DatasetPtr dataset;
|
||||
if (from_index) {
|
||||
dataset =
|
||||
knowhere::GenDatasetWithIds(row_count, dimension, from_index->GetRawVectors(), from_index->GetRawIds());
|
||||
uids = from_index->GetUids();
|
||||
blacklist = from_index->GetBlacklist();
|
||||
} else if (bin_from_index) {
|
||||
dataset = knowhere::GenDatasetWithIds(row_count, dimension, bin_from_index->GetRawVectors(),
|
||||
bin_from_index->GetRawIds());
|
||||
uids = bin_from_index->GetUids();
|
||||
blacklist = bin_from_index->GetBlacklist();
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -820,10 +800,8 @@ ExecutionEngineImpl::BuildKnowhereIndex(const std::string& field_name, const Col
|
|||
#endif
|
||||
|
||||
new_index->SetUids(uids);
|
||||
new_index->SetBlacklist(blacklist);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
||||
} // namespace milvus::engine
|
||||
|
|
|
@ -84,16 +84,6 @@ class VecIndex : public Index {
|
|||
}
|
||||
#endif
|
||||
|
||||
faiss::ConcurrentBitsetPtr
|
||||
GetBlacklist() {
|
||||
return bitset_;
|
||||
}
|
||||
|
||||
void
|
||||
SetBlacklist(faiss::ConcurrentBitsetPtr bitset_ptr) {
|
||||
bitset_ = std::move(bitset_ptr);
|
||||
}
|
||||
|
||||
std::shared_ptr<std::vector<IDType>>
|
||||
GetUids() const {
|
||||
return uids_;
|
||||
|
@ -104,11 +94,6 @@ class VecIndex : public Index {
|
|||
uids_ = uids;
|
||||
}
|
||||
|
||||
size_t
|
||||
BlacklistSize() {
|
||||
return bitset_ ? bitset_->size() * sizeof(uint8_t) : 0;
|
||||
}
|
||||
|
||||
size_t
|
||||
UidsSize() {
|
||||
return uids_ ? uids_->size() * sizeof(IDType) : 0;
|
||||
|
@ -133,7 +118,7 @@ class VecIndex : public Index {
|
|||
|
||||
int64_t
|
||||
Size() override {
|
||||
return BlacklistSize() + UidsSize() + IndexSize();
|
||||
return UidsSize() + IndexSize();
|
||||
}
|
||||
|
||||
protected:
|
||||
|
@ -141,9 +126,6 @@ class VecIndex : public Index {
|
|||
IndexMode index_mode_ = IndexMode::MODE_CPU;
|
||||
std::shared_ptr<std::vector<IDType>> uids_ = nullptr;
|
||||
int64_t index_size_ = -1;
|
||||
|
||||
private:
|
||||
faiss::ConcurrentBitsetPtr bitset_ = nullptr;
|
||||
};
|
||||
|
||||
using VecIndexPtr = std::shared_ptr<VecIndex>;
|
||||
|
|
|
@ -28,7 +28,6 @@ namespace cloner {
|
|||
void
|
||||
CopyIndexData(const VecIndexPtr& dst_index, const VecIndexPtr& src_index) {
|
||||
dst_index->SetUids(src_index->GetUids());
|
||||
dst_index->SetBlacklist(src_index->GetBlacklist());
|
||||
dst_index->SetIndexSize(src_index->IndexSize());
|
||||
}
|
||||
|
||||
|
|
|
@ -294,8 +294,7 @@ main() {
|
|||
|
||||
printf("----------------search xq with delete------------\n");
|
||||
{ // search xq with delete
|
||||
index.SetBlacklist(bitset);
|
||||
auto res = index.Query(query_dataset, query_conf);
|
||||
auto res = index.Query(query_dataset, query_conf, bitset);
|
||||
auto I = res->Get<int64_t*>(milvus::knowhere::meta::IDS);
|
||||
|
||||
printf("I=\n");
|
||||
|
|
|
@ -271,8 +271,7 @@ main() {
|
|||
|
||||
printf("----------------search xq with delete------------\n");
|
||||
{ // search xq with delete
|
||||
index.SetBlacklist(bitset);
|
||||
auto res = index.Query(query_dataset, query_conf);
|
||||
auto res = index.Query(query_dataset, query_conf, bitset);
|
||||
auto I = res->Get<int64_t*>(milvus::knowhere::meta::IDS);
|
||||
|
||||
printf("I=\n");
|
||||
|
|
|
@ -11,8 +11,8 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include "MetricBase.h"
|
||||
#include "db/Types.h"
|
||||
#include "metrics/MetricBase.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace server {
|
||||
|
@ -87,7 +87,7 @@ class CollectQueryMetrics : CollectMetricsBase {
|
|||
explicit CollectQueryMetrics(size_t nq) : nq_(nq) {
|
||||
}
|
||||
|
||||
~CollectQueryMetrics() {
|
||||
~CollectQueryMetrics() override {
|
||||
if (nq_ > 0) {
|
||||
auto total_time = TimeFromBegine();
|
||||
for (size_t i = 0; i < nq_; ++i) {
|
||||
|
|
|
@ -70,7 +70,7 @@ struct RangeQuery {
|
|||
using RangeQueryPtr = std::shared_ptr<RangeQuery>;
|
||||
|
||||
struct VectorRecord {
|
||||
size_t vector_count;
|
||||
size_t vector_count = 0;
|
||||
std::vector<float> float_data;
|
||||
std::vector<uint8_t> binary_data;
|
||||
};
|
||||
|
@ -78,10 +78,10 @@ struct VectorRecord {
|
|||
struct VectorQuery {
|
||||
std::string field_name;
|
||||
milvus::json extra_params = {};
|
||||
int64_t topk;
|
||||
int64_t nq;
|
||||
int64_t topk = 0;
|
||||
int64_t nq = 0;
|
||||
std::string metric_type = "";
|
||||
float boost;
|
||||
float boost = 0.0f;
|
||||
VectorRecord query_vector;
|
||||
};
|
||||
using VectorQueryPtr = std::shared_ptr<VectorQuery>;
|
||||
|
|
|
@ -38,6 +38,20 @@ DeletedDocs::GetDeletedDocs() const {
|
|||
// DeletedDocs::GetName() const {
|
||||
// return name_;
|
||||
//}
|
||||
const faiss::ConcurrentBitsetPtr
|
||||
DeletedDocs::GetBlacklist() const {
|
||||
return bitset_;
|
||||
}
|
||||
|
||||
void
|
||||
DeletedDocs::GenBlacklist(size_t size) {
|
||||
if (size > 0) {
|
||||
bitset_ = std::make_shared<faiss::ConcurrentBitset>(size);
|
||||
for (auto& offset : deleted_doc_offsets_) {
|
||||
bitset_->set(offset);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
size_t
|
||||
DeletedDocs::GetCount() const {
|
||||
|
|
|
@ -23,8 +23,7 @@
|
|||
#include "cache/DataObj.h"
|
||||
#include "db/Types.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace segment {
|
||||
namespace milvus::segment {
|
||||
|
||||
class DeletedDocs : public cache::DataObj {
|
||||
public:
|
||||
|
@ -42,6 +41,12 @@ class DeletedDocs : public cache::DataObj {
|
|||
// const std::string&
|
||||
// GetName() const;
|
||||
|
||||
const faiss::ConcurrentBitsetPtr
|
||||
GetBlacklist() const;
|
||||
|
||||
void
|
||||
GenBlacklist(size_t size);
|
||||
|
||||
size_t
|
||||
GetCount() const;
|
||||
|
||||
|
@ -62,11 +67,10 @@ class DeletedDocs : public cache::DataObj {
|
|||
|
||||
private:
|
||||
std::vector<engine::offset_t> deleted_doc_offsets_;
|
||||
// faiss::ConcurrentBitsetPtr bitset_;
|
||||
faiss::ConcurrentBitsetPtr bitset_;
|
||||
// const std::string name_ = "deleted_docs";
|
||||
};
|
||||
|
||||
using DeletedDocsPtr = std::shared_ptr<DeletedDocs>;
|
||||
|
||||
} // namespace segment
|
||||
} // namespace milvus
|
||||
} // namespace milvus::segment
|
||||
|
|
|
@ -161,6 +161,10 @@ SegmentReader::LoadField(const std::string& field_name, engine::BinaryDataPtr& r
|
|||
}
|
||||
|
||||
auto raw_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW);
|
||||
if (raw_visitor->GetFile() == nullptr) {
|
||||
std::string emsg = "File of field " + field_name + " is not found";
|
||||
return Status(DB_FILE_NOT_FOUND, emsg);
|
||||
}
|
||||
std::string file_path =
|
||||
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, raw_visitor->GetFile());
|
||||
|
||||
|
@ -390,20 +394,7 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
|
|||
return Status(DB_ERROR, "Field is not vector type");
|
||||
}
|
||||
|
||||
// load deleted doc
|
||||
int64_t row_count = GetRowCount();
|
||||
faiss::ConcurrentBitsetPtr concurrent_bitset_ptr = nullptr;
|
||||
segment::DeletedDocsPtr deleted_docs_ptr;
|
||||
LoadDeletedDocs(deleted_docs_ptr);
|
||||
if (deleted_docs_ptr != nullptr) {
|
||||
auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs();
|
||||
if (!deleted_docs.empty()) {
|
||||
concurrent_bitset_ptr = std::make_shared<faiss::ConcurrentBitset>(row_count);
|
||||
for (auto& offset : deleted_docs) {
|
||||
concurrent_bitset_ptr->set(offset);
|
||||
}
|
||||
}
|
||||
}
|
||||
recorder.RecordSection("prepare");
|
||||
|
||||
knowhere::BinarySet index_data;
|
||||
|
@ -448,7 +439,6 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
|
|||
index_ptr->Train(knowhere::DatasetPtr(), conf);
|
||||
index_ptr->AddWithoutIds(dataset, conf);
|
||||
index_ptr->SetUids(uids_ptr);
|
||||
index_ptr->SetBlacklist(concurrent_bitset_ptr);
|
||||
segment_ptr_->SetVectorIndex(field_name, index_ptr);
|
||||
|
||||
cache::CpuCacheMgr::GetInstance().InsertItem(temp_index_path, index_ptr);
|
||||
|
@ -508,7 +498,6 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
|
|||
STATUS_CHECK(LoadUids(*uids_ptr));
|
||||
|
||||
index_ptr->SetUids(uids_ptr);
|
||||
index_ptr->SetBlacklist(concurrent_bitset_ptr);
|
||||
segment_ptr_->SetVectorIndex(field_name, index_ptr);
|
||||
|
||||
cache::CpuCacheMgr::GetInstance().InsertItem(index_file_path, index_ptr); // put into cache
|
||||
|
@ -679,6 +668,11 @@ SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) {
|
|||
if (data_obj == nullptr) {
|
||||
auto& ss_codec = codec::Codec::instance();
|
||||
STATUS_CHECK(ss_codec.GetDeletedDocsFormat()->Read(fs_ptr_, file_path, deleted_docs_ptr));
|
||||
auto id = segment_visitor_->GetSegment()->GetID();
|
||||
auto sc = segment_visitor_->GetSnapshot()->GetSegmentCommitBySegmentId(id);
|
||||
if (sc) {
|
||||
deleted_docs_ptr->GenBlacklist(sc->GetRowCount());
|
||||
}
|
||||
cache::CpuCacheMgr::GetInstance().InsertItem(file_path, deleted_docs_ptr); // put into cache
|
||||
} else {
|
||||
deleted_docs_ptr = std::static_pointer_cast<segment::DeletedDocs>(data_obj);
|
||||
|
|
|
@ -21,7 +21,7 @@ set( STORAGE_FILES ${STORAGE_MAIN_FILES}
|
|||
add_library( storage STATIC )
|
||||
target_sources( storage PRIVATE ${STORAGE_FILES} )
|
||||
|
||||
set ( LINK_LIBRARY fiu log )
|
||||
set ( LINK_LIBRARY stdc++fs fiu log )
|
||||
|
||||
if ( MILVUS_WITH_AWS )
|
||||
list( APPEND LINK_LIBRARY
|
||||
|
|
|
@ -112,6 +112,7 @@ constexpr ErrorCode DB_BLOOM_FILTER_ERROR = ToDbErrorCode(9);
|
|||
constexpr ErrorCode DB_PARTITION_NOT_FOUND = ToDbErrorCode(10);
|
||||
constexpr ErrorCode DB_OUT_OF_STORAGE = ToDbErrorCode(11);
|
||||
constexpr ErrorCode DB_META_QUERY_FAILED = ToDbErrorCode(12);
|
||||
constexpr ErrorCode DB_FILE_NOT_FOUND = ToDbErrorCode(13);
|
||||
|
||||
// knowhere error code
|
||||
constexpr ErrorCode KNOWHERE_ERROR = ToKnowhereErrorCode(1);
|
||||
|
|
|
@ -52,7 +52,7 @@ set( UNITTEST_LIBS sqlite
|
|||
gmock
|
||||
gtest_main
|
||||
gmock_main
|
||||
libstdc++fs.a
|
||||
stdc++fs
|
||||
pthread
|
||||
gfortran
|
||||
opentracing::opentracing
|
||||
|
|
|
@ -24,16 +24,9 @@ set( TEST_FILES ${CMAKE_CURRENT_SOURCE_DIR}/utils.cpp
|
|||
add_executable( test_db
|
||||
${SCHEDULER_FILES}
|
||||
${TEST_FILES}
|
||||
# ${grpc_server_files}
|
||||
# ${grpc_service_files}
|
||||
# ${web_server_files}
|
||||
# ${server_delivery_files}
|
||||
# ${server_files}
|
||||
# ${server_init_files}
|
||||
)
|
||||
|
||||
target_link_libraries( test_db ${UNITTEST_LIBS}
|
||||
server
|
||||
milvus_engine
|
||||
metrics
|
||||
value
|
||||
|
|
|
@ -13,12 +13,12 @@
|
|||
#include <fiu/fiu-local.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <src/cache/CpuCacheMgr.h>
|
||||
#include <algorithm>
|
||||
#include <experimental/filesystem>
|
||||
#include <set>
|
||||
#include <string>
|
||||
|
||||
#include "cache/CpuCacheMgr.h"
|
||||
#include "db/SnapshotUtils.h"
|
||||
#include "db/SnapshotVisitor.h"
|
||||
#include "db/merge/MergeAdaptiveStrategy.h"
|
||||
|
@ -233,6 +233,7 @@ BuildQueryPtr(const std::string& collection_name, int64_t n, int64_t topk, std::
|
|||
for (int64_t j = 0; j < COLLECTION_DIM; j++) vector_record.float_data[COLLECTION_DIM * i + j] = drand48();
|
||||
vector_record.float_data[COLLECTION_DIM * i] += i / 2000.;
|
||||
}
|
||||
vector_record.vector_count = n;
|
||||
vector_query->query_vector = vector_record;
|
||||
vector_query->metric_type = "L2";
|
||||
vector_query->extra_params = {{"nprobe", 1024}};
|
||||
|
|
|
@ -27,13 +27,9 @@ add_executable(test_storage
|
|||
target_link_libraries(test_storage
|
||||
${UNITTEST_LIBS}
|
||||
stdc++
|
||||
server
|
||||
milvus_engine
|
||||
metrics
|
||||
storage
|
||||
value
|
||||
utils
|
||||
tracing
|
||||
query
|
||||
log
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in New Issue