mirror of https://github.com/milvus-io/milvus.git
set blacklist a NULL pointer if no item deleted (#3827)
* set blacklist a nullptr is no item deleted Signed-off-by: shengjun.li <shengjun.li@zilliz.com> * fix multi thread access bitset Signed-off-by: shengjun.li <shengjun.li@zilliz.com>pull/3915/head
parent
50d5b9f179
commit
7692a6d437
12
CHANGELOG.md
12
CHANGELOG.md
|
@ -2,7 +2,17 @@
|
|||
|
||||
Please mark all change in change log and use the issue from GitHub
|
||||
|
||||
# Milvus 0.10.3 (TBD)
|
||||
# Milvus 0.10.4 (TBD)
|
||||
## Bug
|
||||
|
||||
## Feature
|
||||
|
||||
## Improvement
|
||||
- \#3775 Improve search performance in the case that no item deleted
|
||||
|
||||
## Task
|
||||
|
||||
# Milvus 0.10.3 (2020-9-21)
|
||||
## Bug
|
||||
- \#3536 Release search task in time to avoid excessive memory usage
|
||||
- \#3656 Fix to check search params 'nprobe' of BIN_IVF_FLAT
|
||||
|
|
|
@ -568,6 +568,10 @@ DBImpl::ReLoadSegmentsDeletedDocs(const std::string& collection_id, const std::v
|
|||
segment::DeletedDocsPtr delete_docs = std::make_shared<segment::DeletedDocs>();
|
||||
segment_reader.LoadDeletedDocs(delete_docs);
|
||||
auto& docs_offsets = delete_docs->GetDeletedDocs();
|
||||
if (docs_offsets.empty()) {
|
||||
LOG_ENGINE_DEBUG_ << "delete_docs is empty";
|
||||
continue;
|
||||
}
|
||||
|
||||
faiss::ConcurrentBitsetPtr blacklist = index->GetBlacklist();
|
||||
if (nullptr == blacklist) {
|
||||
|
@ -579,9 +583,7 @@ DBImpl::ReLoadSegmentsDeletedDocs(const std::string& collection_id, const std::v
|
|||
}
|
||||
|
||||
for (auto& i : docs_offsets) {
|
||||
if (!blacklist->test(i)) {
|
||||
blacklist->set(i);
|
||||
}
|
||||
blacklist->set(i);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -115,12 +115,12 @@ class ExecutionEngine {
|
|||
|
||||
virtual Status
|
||||
GetVectorByID(const int64_t id, uint8_t* vector, bool hybrid) = 0;
|
||||
#endif
|
||||
|
||||
virtual Status
|
||||
ExecBinaryQuery(query::GeneralQueryPtr general_query, faiss::ConcurrentBitsetPtr bitset,
|
||||
std::unordered_map<std::string, DataType>& attr_type, uint64_t& nq, uint64_t& topk,
|
||||
std::vector<float>& distances, std::vector<int64_t>& labels) = 0;
|
||||
#endif
|
||||
|
||||
virtual Status
|
||||
Search(int64_t n, const float* data, int64_t k, const milvus::json& extra_params, float* distances, int64_t* labels,
|
||||
|
|
|
@ -438,9 +438,12 @@ ExecutionEngineImpl::Load(bool to_cache) {
|
|||
|
||||
vector_count_ = count;
|
||||
|
||||
faiss::ConcurrentBitsetPtr concurrent_bitset_ptr = std::make_shared<faiss::ConcurrentBitset>(count);
|
||||
for (auto& offset : deleted_docs) {
|
||||
concurrent_bitset_ptr->set(offset);
|
||||
faiss::ConcurrentBitsetPtr concurrent_bitset_ptr = nullptr;
|
||||
if (!deleted_docs.empty()) {
|
||||
concurrent_bitset_ptr = std::make_shared<faiss::ConcurrentBitset>(count);
|
||||
for (auto& offset : deleted_docs) {
|
||||
concurrent_bitset_ptr->set(offset);
|
||||
}
|
||||
}
|
||||
|
||||
auto dataset = knowhere::GenDataset(count, this->dim_, vectors_data.data());
|
||||
|
@ -488,11 +491,13 @@ ExecutionEngineImpl::Load(bool to_cache) {
|
|||
}
|
||||
auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs();
|
||||
|
||||
faiss::ConcurrentBitsetPtr concurrent_bitset_ptr =
|
||||
std::make_shared<faiss::ConcurrentBitset>(index_->Count());
|
||||
for (auto& offset : deleted_docs) {
|
||||
if (!concurrent_bitset_ptr->test(offset)) {
|
||||
concurrent_bitset_ptr->set(offset);
|
||||
faiss::ConcurrentBitsetPtr concurrent_bitset_ptr = nullptr;
|
||||
if (!deleted_docs.empty()) {
|
||||
concurrent_bitset_ptr = std::make_shared<faiss::ConcurrentBitset>(index_->Count());
|
||||
for (auto& offset : deleted_docs) {
|
||||
if (!concurrent_bitset_ptr->test(offset)) {
|
||||
concurrent_bitset_ptr->set(offset);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -754,6 +759,7 @@ MapAndCopyResult(const knowhere::DatasetPtr& dataset, const std::vector<milvus::
|
|||
free(res_dist);
|
||||
}
|
||||
|
||||
#if 0
|
||||
template <typename T>
|
||||
void
|
||||
ProcessRangeQuery(std::vector<T> data, T value, query::CompareOperator type, faiss::ConcurrentBitsetPtr& bitset) {
|
||||
|
@ -1106,62 +1112,11 @@ ExecutionEngineImpl::ExecBinaryQuery(milvus::query::GeneralQueryPtr general_quer
|
|||
}
|
||||
return Status::OK();
|
||||
}
|
||||
#endif
|
||||
|
||||
Status
|
||||
ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, const milvus::json& extra_params, float* distances,
|
||||
int64_t* labels, bool hybrid) {
|
||||
#if 0
|
||||
if (index_type_ == EngineType::FAISS_IVFSQ8H) {
|
||||
if (!hybrid) {
|
||||
const std::string key = location_ + ".quantizer";
|
||||
std::vector<uint64_t> gpus = scheduler::get_gpu_pool();
|
||||
|
||||
const int64_t NOT_FOUND = -1;
|
||||
int64_t device_id = NOT_FOUND;
|
||||
|
||||
// cache hit
|
||||
{
|
||||
knowhere::QuantizerPtr quantizer = nullptr;
|
||||
|
||||
for (auto& gpu : gpus) {
|
||||
auto cache = cache::GpuCacheMgr::GetInstance(gpu);
|
||||
if (auto cached_quantizer = cache->GetIndex(key)) {
|
||||
device_id = gpu;
|
||||
quantizer = std::static_pointer_cast<CachedQuantizer>(cached_quantizer)->Data();
|
||||
}
|
||||
}
|
||||
|
||||
if (device_id != NOT_FOUND) {
|
||||
// cache hit
|
||||
milvus::json quantizer_conf{{knowhere::meta::DEVICEID : device_id}, {"mode" : 2}};
|
||||
auto new_index = index_->LoadData(quantizer, config);
|
||||
index_ = new_index;
|
||||
}
|
||||
}
|
||||
|
||||
if (device_id == NOT_FOUND) {
|
||||
// cache miss
|
||||
std::vector<int64_t> all_free_mem;
|
||||
for (auto& gpu : gpus) {
|
||||
auto cache = cache::GpuCacheMgr::GetInstance(gpu);
|
||||
auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
|
||||
all_free_mem.push_back(free_mem);
|
||||
}
|
||||
|
||||
auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
|
||||
auto best_index = std::distance(all_free_mem.begin(), max_e);
|
||||
device_id = gpus[best_index];
|
||||
|
||||
auto pair = index_->CopyToGpuWithQuantizer(device_id);
|
||||
index_ = pair.first;
|
||||
|
||||
// cache
|
||||
auto cached_quantizer = std::make_shared<CachedQuantizer>(pair.second);
|
||||
cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(key, cached_quantizer);
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
TimeRecorder rc(LogOut("[%s][%ld] ExecutionEngineImpl::Search float", "search", 0));
|
||||
|
||||
if (index_ == nullptr) {
|
||||
|
|
|
@ -69,12 +69,12 @@ class ExecutionEngineImpl : public ExecutionEngine {
|
|||
|
||||
Status
|
||||
GetVectorByID(const int64_t id, uint8_t* vector, bool hybrid) override;
|
||||
#endif
|
||||
|
||||
Status
|
||||
ExecBinaryQuery(query::GeneralQueryPtr general_query, faiss::ConcurrentBitsetPtr bitset,
|
||||
std::unordered_map<std::string, DataType>& attr_type, uint64_t& nq, uint64_t& topk,
|
||||
std::vector<float>& distances, std::vector<int64_t>& labels) override;
|
||||
#endif
|
||||
|
||||
Status
|
||||
Search(int64_t n, const float* data, int64_t k, const milvus::json& extra_params, float* distances, int64_t* labels,
|
||||
|
|
|
@ -278,9 +278,15 @@ MemTable::ApplyDeletes() {
|
|||
auto index = std::static_pointer_cast<knowhere::VecIndex>(data_obj_ptr);
|
||||
if (index != nullptr) {
|
||||
faiss::ConcurrentBitsetPtr blacklist = index->GetBlacklist();
|
||||
if (blacklist != nullptr) {
|
||||
if (blacklist == nullptr) {
|
||||
// to update and set the blacklist
|
||||
blacklist = std::make_shared<faiss::ConcurrentBitset>(index->Count());
|
||||
indexes.emplace_back(index);
|
||||
blacklists.emplace_back(blacklist);
|
||||
} else {
|
||||
// just to update the blacklist
|
||||
indexes.emplace_back(nullptr);
|
||||
blacklists.emplace_back(blacklist);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -306,7 +312,6 @@ MemTable::ApplyDeletes() {
|
|||
|
||||
rec.RecordSection("Sorting " + std::to_string(ids_to_check.size()) + " ids");
|
||||
|
||||
size_t delete_count = 0;
|
||||
auto find_diff = std::chrono::duration<double>::zero();
|
||||
auto set_diff = std::chrono::duration<double>::zero();
|
||||
|
||||
|
@ -321,18 +326,11 @@ MemTable::ApplyDeletes() {
|
|||
if (found) {
|
||||
auto set_start = std::chrono::high_resolution_clock::now();
|
||||
|
||||
delete_count++;
|
||||
|
||||
deleted_docs->AddDeletedDoc(i);
|
||||
|
||||
if (id_bloom_filter_ptr->Check(uids[i])) {
|
||||
id_bloom_filter_ptr->Remove(uids[i]);
|
||||
}
|
||||
id_bloom_filter_ptr->Remove(uids[i]);
|
||||
|
||||
for (auto& blacklist : blacklists) {
|
||||
if (!blacklist->test(i)) {
|
||||
blacklist->set(i);
|
||||
}
|
||||
blacklist->set(i);
|
||||
}
|
||||
|
||||
auto set_end = std::chrono::high_resolution_clock::now();
|
||||
|
@ -346,8 +344,15 @@ MemTable::ApplyDeletes() {
|
|||
|
||||
rec.RecordSection("Find uids and set deleted docs and bloom filter");
|
||||
|
||||
if (deleted_docs->GetSize() == 0) {
|
||||
LOG_ENGINE_DEBUG_ << "deleted_docs does not need to be updated";
|
||||
continue;
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < indexes.size(); ++i) {
|
||||
indexes[i]->SetBlacklist(blacklists[i]);
|
||||
if (indexes[i]) {
|
||||
indexes[i]->SetBlacklist(blacklists[i]);
|
||||
}
|
||||
}
|
||||
|
||||
segment::Segment tmp_segment;
|
||||
|
@ -372,7 +377,7 @@ MemTable::ApplyDeletes() {
|
|||
segment_file.file_type_ == meta::SegmentSchema::TO_INDEX ||
|
||||
segment_file.file_type_ == meta::SegmentSchema::INDEX ||
|
||||
segment_file.file_type_ == meta::SegmentSchema::BACKUP) {
|
||||
segment_file.row_count_ -= delete_count;
|
||||
segment_file.row_count_ -= deleted_docs->GetSize();
|
||||
files_to_update.emplace_back(segment_file);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -145,7 +145,7 @@ BinaryIDMAP::QueryImpl(int64_t n, const uint8_t* data, int64_t k, float* distanc
|
|||
flat_index->metric_type = GetMetricType(config[Metric::TYPE].get<std::string>());
|
||||
|
||||
int32_t* i_distances = reinterpret_cast<int32_t*>(distances);
|
||||
index_->search(n, (uint8_t*)data, k, i_distances, labels, bitset_);
|
||||
flat_index->search(n, (uint8_t*)data, k, i_distances, labels, GetBlacklist());
|
||||
|
||||
// if hamming, it need transform int32 to float
|
||||
if (flat_index->metric_type == faiss::METRIC_Hamming) {
|
||||
|
|
|
@ -208,7 +208,7 @@ BinaryIVF::QueryImpl(int64_t n, const uint8_t* data, int64_t k, float* distances
|
|||
|
||||
stdclock::time_point before = stdclock::now();
|
||||
int32_t* i_distances = reinterpret_cast<int32_t*>(distances);
|
||||
index_->search(n, (uint8_t*)data, k, i_distances, labels, bitset_);
|
||||
index_->search(n, (uint8_t*)data, k, i_distances, labels, GetBlacklist());
|
||||
|
||||
stdclock::time_point after = stdclock::now();
|
||||
double search_cost = (std::chrono::duration<double, std::micro>(after - before)).count();
|
||||
|
|
|
@ -225,7 +225,7 @@ IDMAP::QueryImpl(int64_t n, const float* data, int64_t k, float* distances, int6
|
|||
auto default_type = flat_index->metric_type;
|
||||
if (config.contains(Metric::TYPE))
|
||||
flat_index->metric_type = GetMetricType(config[Metric::TYPE].get<std::string>());
|
||||
index_->search(n, (float*)data, k, distances, labels, bitset_);
|
||||
flat_index->search(n, (float*)data, k, distances, labels, GetBlacklist());
|
||||
flat_index->metric_type = default_type;
|
||||
}
|
||||
|
||||
|
|
|
@ -329,7 +329,7 @@ IVF::QueryImpl(int64_t n, const float* data, int64_t k, float* distances, int64_
|
|||
} else {
|
||||
ivf_index->parallel_mode = 0;
|
||||
}
|
||||
ivf_index->search(n, (float*)data, k, distances, labels, bitset_);
|
||||
ivf_index->search(n, (float*)data, k, distances, labels, GetBlacklist());
|
||||
stdclock::time_point after = stdclock::now();
|
||||
double search_cost = (std::chrono::duration<double, std::micro>(after - before)).count();
|
||||
LOG_KNOWHERE_DEBUG_ << "IVF search cost: " << search_cost
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
|
||||
#include <faiss/utils/ConcurrentBitset.h>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
|
@ -83,11 +84,13 @@ class VecIndex : public Index {
|
|||
|
||||
faiss::ConcurrentBitsetPtr
|
||||
GetBlacklist() {
|
||||
std::unique_lock<std::mutex> lck(mutex_);
|
||||
return bitset_;
|
||||
}
|
||||
|
||||
void
|
||||
SetBlacklist(faiss::ConcurrentBitsetPtr bitset_ptr) {
|
||||
std::unique_lock<std::mutex> lck(mutex_);
|
||||
bitset_ = std::move(bitset_ptr);
|
||||
}
|
||||
|
||||
|
@ -104,11 +107,8 @@ class VecIndex : public Index {
|
|||
|
||||
size_t
|
||||
BlacklistSize() {
|
||||
if (bitset_) {
|
||||
return bitset_->size() * sizeof(uint8_t);
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
std::unique_lock<std::mutex> lck(mutex_);
|
||||
return bitset_ ? bitset_->size() : 0;
|
||||
}
|
||||
|
||||
size_t
|
||||
|
@ -141,9 +141,13 @@ class VecIndex : public Index {
|
|||
protected:
|
||||
IndexType index_type_ = "";
|
||||
IndexMode index_mode_ = IndexMode::MODE_CPU;
|
||||
faiss::ConcurrentBitsetPtr bitset_ = nullptr;
|
||||
std::vector<IDType> uids_;
|
||||
int64_t index_size_ = -1;
|
||||
|
||||
private:
|
||||
// multi thread may access bitset_
|
||||
std::mutex mutex_;
|
||||
faiss::ConcurrentBitsetPtr bitset_ = nullptr;
|
||||
};
|
||||
|
||||
using VecIndexPtr = std::shared_ptr<VecIndex>;
|
||||
|
|
|
@ -111,7 +111,7 @@ GPUIDMAP::QueryImpl(int64_t n, const float* data, int64_t k, float* distances, i
|
|||
auto default_type = flat_index->metric_type;
|
||||
if (config.contains(Metric::TYPE))
|
||||
flat_index->metric_type = GetMetricType(config[Metric::TYPE].get<std::string>());
|
||||
index_->search(n, (float*)data, k, distances, labels, bitset_);
|
||||
flat_index->search(n, (float*)data, k, distances, labels, GetBlacklist());
|
||||
flat_index->metric_type = default_type;
|
||||
}
|
||||
|
||||
|
|
|
@ -149,7 +149,8 @@ GPUIVF::QueryImpl(int64_t n, const float* data, int64_t k, float* distances, int
|
|||
int64_t dim = device_index->d;
|
||||
for (int64_t i = 0; i < n; i += block_size) {
|
||||
int64_t search_size = (n - i > block_size) ? block_size : (n - i);
|
||||
device_index->search(search_size, (float*)data + i * dim, k, distances + i * k, labels + i * k, bitset_);
|
||||
device_index->search(search_size, (float*)data + i * dim, k, distances + i * k, labels + i * k,
|
||||
GetBlacklist());
|
||||
}
|
||||
} else {
|
||||
KNOWHERE_THROW_MSG("Not a GpuIndexIVF type.");
|
||||
|
|
|
@ -281,6 +281,7 @@ XSearchTask::Execute() {
|
|||
// step 2: search
|
||||
bool hybrid = std::dynamic_pointer_cast<SpecResLabel>(label_)->IsHybrid();
|
||||
Status s;
|
||||
#if 0
|
||||
if (general_query != nullptr) {
|
||||
std::unordered_map<std::string, engine::DataType> types;
|
||||
auto attr_type = search_job->attr_type();
|
||||
|
@ -313,6 +314,7 @@ XSearchTask::Execute() {
|
|||
search_job->SearchDone(index_id_);
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
if (!vectors.float_data_.empty()) {
|
||||
s = index_engine_->Search(nq, vectors.float_data_.data(), topk, extra_params, output_distance.data(),
|
||||
output_ids.data(), hybrid);
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
#if 0
|
||||
#include <fiu-control.h>
|
||||
#include <fiu-local.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
@ -477,3 +478,4 @@ TEST_F(DBTest, HYBRID_INVALID_TEST) {
|
|||
ASSERT_FALSE(stat.ok());
|
||||
fiu_disable("read_attrs_internal_open_file_fail");
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -1193,6 +1193,7 @@ TEST_F(RpcHandlerTest, CMD_TEST) {
|
|||
handler->Cmd(&context, &command, &reply);
|
||||
}
|
||||
|
||||
#if 0
|
||||
TEST_F(RpcHandlerTest, HYBRID_TEST) {
|
||||
::grpc::ServerContext context;
|
||||
milvus::grpc::Mapping mapping;
|
||||
|
@ -1447,6 +1448,7 @@ TEST_F(RpcHandlerTest, HYBRID_INVALID_TEST) {
|
|||
handler->HybridSearch(&context, &search_param, &topk_query_result);
|
||||
fiu_disable("SearchRequest.OnExecute.throw_std_exception");
|
||||
}
|
||||
#endif
|
||||
|
||||
//////////////////////////////////////////////////////////////////////
|
||||
namespace {
|
||||
|
|
|
@ -561,6 +561,7 @@ TEST_F(WebControllerTest, CREATE_COLLECTION) {
|
|||
ASSERT_EQ(OStatus::CODE_400.code, response->getStatusCode());
|
||||
}
|
||||
|
||||
#if 0
|
||||
TEST_F(WebControllerTest, HYBRID_TEST) {
|
||||
nlohmann::json create_json;
|
||||
create_json["collection_name"] = "test_hybrid";
|
||||
|
@ -646,6 +647,7 @@ TEST_F(WebControllerTest, HYBRID_TEST) {
|
|||
ASSERT_TRUE(result0_json.is_array());
|
||||
ASSERT_EQ(topk, result0_json.size());
|
||||
}
|
||||
#endif
|
||||
|
||||
TEST_F(WebControllerTest, GET_COLLECTION_META) {
|
||||
OString collection_name = "web_test_create_collection" + OString(RandomName().c_str());
|
||||
|
|
Loading…
Reference in New Issue