mirror of https://github.com/milvus-io/milvus.git
Caiyd 1655 gpu ivfflat delete (#1767)
* update sdk Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * update segment interfaces Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * update some interfaces Signed-off-by: yudong.cai <yudong.cai@zilliz.com>pull/1773/head
parent
792538734b
commit
207b854c3f
|
@ -644,13 +644,13 @@ ExecutionEngineImpl::BuildIndex(const std::string& location, EngineType engine_t
|
|||
knowhere::GenDatasetWithIds(Count(), Dimension(), from_index->GetRawVectors(), from_index->GetRawIds());
|
||||
to_index->BuildAll(dataset, conf);
|
||||
uids = from_index->GetUids();
|
||||
from_index->GetBlacklist(blacklist);
|
||||
blacklist = from_index->GetBlacklist();
|
||||
} else if (bin_from_index) {
|
||||
auto dataset = knowhere::GenDatasetWithIds(Count(), Dimension(), bin_from_index->GetRawVectors(),
|
||||
bin_from_index->GetRawIds());
|
||||
to_index->BuildAll(dataset, conf);
|
||||
uids = bin_from_index->GetUids();
|
||||
bin_from_index->GetBlacklist(blacklist);
|
||||
blacklist = bin_from_index->GetBlacklist();
|
||||
}
|
||||
|
||||
#ifdef MILVUS_GPU_VERSION
|
||||
|
|
|
@ -247,9 +247,8 @@ MemTable::ApplyDeletes() {
|
|||
for (auto& file : segment_files) {
|
||||
auto data_obj_ptr = cache::CpuCacheMgr::GetInstance()->GetIndex(file.location_);
|
||||
auto index = std::static_pointer_cast<knowhere::VecIndex>(data_obj_ptr);
|
||||
faiss::ConcurrentBitsetPtr blacklist = nullptr;
|
||||
if (index != nullptr) {
|
||||
index->GetBlacklist(blacklist);
|
||||
faiss::ConcurrentBitsetPtr blacklist = index->GetBlacklist();
|
||||
if (blacklist != nullptr) {
|
||||
indexes.emplace_back(index);
|
||||
blacklists.emplace_back(blacklist);
|
||||
|
|
|
@ -143,8 +143,7 @@ IndexHNSW::Query(const DatasetPtr& dataset_ptr, const Config& config) {
|
|||
using P = std::pair<float, int64_t>;
|
||||
auto compare = [](const P& v1, const P& v2) { return v1.first < v2.first; };
|
||||
|
||||
faiss::ConcurrentBitsetPtr blacklist = nullptr;
|
||||
GetBlacklist(blacklist);
|
||||
faiss::ConcurrentBitsetPtr blacklist = GetBlacklist();
|
||||
#pragma omp parallel for
|
||||
for (unsigned int i = 0; i < rows; ++i) {
|
||||
std::vector<P> ret;
|
||||
|
|
|
@ -78,9 +78,9 @@ class VecIndex : public Index {
|
|||
return nullptr;
|
||||
}
|
||||
|
||||
void
|
||||
GetBlacklist(faiss::ConcurrentBitsetPtr& bitset_ptr) {
|
||||
bitset_ptr = bitset_;
|
||||
faiss::ConcurrentBitsetPtr
|
||||
GetBlacklist() {
|
||||
return bitset_;
|
||||
}
|
||||
|
||||
void
|
||||
|
|
|
@ -24,13 +24,21 @@ namespace milvus {
|
|||
namespace knowhere {
|
||||
namespace cloner {
|
||||
|
||||
void
|
||||
CopyIndexData(const VecIndexPtr& dst_index, const VecIndexPtr& src_index) {
|
||||
/* do real copy */
|
||||
auto uids = src_index->GetUids();
|
||||
dst_index->SetUids(uids);
|
||||
|
||||
dst_index->SetBlacklist(src_index->GetBlacklist());
|
||||
dst_index->SetIndexSize(src_index->IndexSize());
|
||||
}
|
||||
|
||||
VecIndexPtr
|
||||
CopyGpuToCpu(const VecIndexPtr& index, const Config& config) {
|
||||
if (auto device_index = std::dynamic_pointer_cast<GPUIndex>(index)) {
|
||||
VecIndexPtr result = device_index->CopyGpuToCpu(config);
|
||||
auto uids = index->GetUids();
|
||||
result->SetUids(uids);
|
||||
result->SetIndexSize(index->IndexSize());
|
||||
CopyIndexData(result, index);
|
||||
return result;
|
||||
} else {
|
||||
KNOWHERE_THROW_MSG("index type is not gpuindex");
|
||||
|
@ -40,23 +48,11 @@ CopyGpuToCpu(const VecIndexPtr& index, const Config& config) {
|
|||
VecIndexPtr
|
||||
CopyCpuToGpu(const VecIndexPtr& index, const int64_t device_id, const Config& config) {
|
||||
VecIndexPtr result;
|
||||
auto uids = index->GetUids();
|
||||
int64_t index_size = index->IndexSize();
|
||||
if (auto device_index = std::dynamic_pointer_cast<IVFSQHybrid>(index)) {
|
||||
result = device_index->CopyCpuToGpu(device_id, config);
|
||||
result->SetUids(uids);
|
||||
result->SetIndexSize(index_size);
|
||||
return result;
|
||||
}
|
||||
|
||||
if (auto device_index = std::dynamic_pointer_cast<GPUIndex>(index)) {
|
||||
} else if (auto device_index = std::dynamic_pointer_cast<GPUIndex>(index)) {
|
||||
result = device_index->CopyGpuToGpu(device_id, config);
|
||||
result->SetUids(uids);
|
||||
result->SetIndexSize(index_size);
|
||||
return result;
|
||||
}
|
||||
|
||||
if (auto cpu_index = std::dynamic_pointer_cast<IVFSQ>(index)) {
|
||||
} else if (auto cpu_index = std::dynamic_pointer_cast<IVFSQ>(index)) {
|
||||
result = cpu_index->CopyCpuToGpu(device_id, config);
|
||||
} else if (auto cpu_index = std::dynamic_pointer_cast<IVFPQ>(index)) {
|
||||
result = cpu_index->CopyCpuToGpu(device_id, config);
|
||||
|
@ -68,8 +64,7 @@ CopyCpuToGpu(const VecIndexPtr& index, const int64_t device_id, const Config& co
|
|||
KNOWHERE_THROW_MSG("this index type not support transfer to gpu");
|
||||
}
|
||||
|
||||
result->SetUids(uids);
|
||||
result->SetIndexSize(index_size);
|
||||
CopyIndexData(result, index);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
|
||||
#include <faiss/Index.h>
|
||||
#include <faiss/gpu/utils/MemorySpace.h>
|
||||
#include <faiss/utils/ConcurrentBitset.h>
|
||||
|
||||
namespace faiss { namespace gpu {
|
||||
|
||||
|
@ -124,7 +125,7 @@ private:
|
|||
int k,
|
||||
float* outDistancesData,
|
||||
Index::idx_t* outIndicesData,
|
||||
ConcurrentBitsetPtr bitset = nullptr) const;
|
||||
ConcurrentBitsetPtr bitset = nullptr) const;
|
||||
|
||||
/// Calls searchImpl_ for a single page of GPU-resident data,
|
||||
/// handling paging of the data and copies from the CPU
|
||||
|
|
|
@ -221,10 +221,9 @@ GpuIndexFlat::searchImpl_(int n,
|
|||
auto bitsetDevice = toDevice<uint8_t, 1>(resources_, device_, nullptr, stream, {0});
|
||||
data_->query(queries, bitsetDevice, k, outDistances, outIntLabels, true);
|
||||
} else {
|
||||
auto bitsetData = bitset->bitset();
|
||||
auto bitsetDevice = toDevice<uint8_t, 1>(resources_, device_,
|
||||
const_cast<uint8_t*>(bitsetData), stream,
|
||||
{(int) bitset->size()});
|
||||
const_cast<uint8_t*>(bitset->data()), stream,
|
||||
{(int) bitset->size()});
|
||||
data_->query(queries, bitsetDevice, k, outDistances, outIntLabels, true);
|
||||
}
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
|
||||
#include <vector>
|
||||
#include <faiss/gpu/GpuIndex.h>
|
||||
#include <faiss/utils/ConcurrentBitset.h>
|
||||
|
||||
namespace faiss {
|
||||
|
||||
|
|
|
@ -245,9 +245,15 @@ GpuIndexIVFFlat::searchImpl_(int n,
|
|||
static_assert(sizeof(long) == sizeof(Index::idx_t), "size mismatch");
|
||||
Tensor<long, 2, true> outLabels(const_cast<long*>(labels), {n, k});
|
||||
|
||||
auto bitsetDevice = toDevice<uint8_t, 1>(resources_, device_, nullptr, stream, {0});
|
||||
|
||||
index_->query(queries, bitsetDevice, nprobe, k, outDistances, outLabels);
|
||||
if (!bitset) {
|
||||
auto bitsetDevice = toDevice<uint8_t, 1>(resources_, device_, nullptr, stream, {0});
|
||||
index_->query(queries, bitsetDevice, nprobe, k, outDistances, outLabels);
|
||||
} else {
|
||||
auto bitsetDevice = toDevice<uint8_t, 1>(resources_, device_,
|
||||
const_cast<uint8_t*>(bitset->data()), stream,
|
||||
{(int) bitset->size()});
|
||||
index_->query(queries, bitsetDevice, nprobe, k, outDistances, outLabels);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
#pragma once
|
||||
|
||||
#include <faiss/gpu/GpuIndexIVF.h>
|
||||
#include <faiss/utils/ConcurrentBitset.h>
|
||||
|
||||
namespace faiss { struct IndexIVFFlat; }
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
#pragma once
|
||||
|
||||
#include <faiss/gpu/GpuIndexIVF.h>
|
||||
#include <faiss/utils/ConcurrentBitset.h>
|
||||
#include <vector>
|
||||
|
||||
namespace faiss { struct IndexIVFPQ; }
|
||||
|
|
|
@ -48,7 +48,7 @@ ConcurrentBitset::size() {
|
|||
}
|
||||
|
||||
const uint8_t*
|
||||
ConcurrentBitset::bitset() {
|
||||
ConcurrentBitset::data() {
|
||||
return reinterpret_cast<const uint8_t*>(bitset_.data());
|
||||
}
|
||||
|
||||
|
|
|
@ -49,7 +49,7 @@ class ConcurrentBitset {
|
|||
size();
|
||||
|
||||
const uint8_t*
|
||||
bitset();
|
||||
data();
|
||||
|
||||
private:
|
||||
size_t capacity_;
|
||||
|
|
|
@ -247,13 +247,14 @@ SegmentWriter::Merge(const std::string& dir_to_merge, const std::string& name) {
|
|||
size_t
|
||||
SegmentWriter::Size() {
|
||||
// TODO(zhiru): switch to actual directory size
|
||||
size_t ret = segment_ptr_->vectors_ptr_->Size();
|
||||
size_t vectors_size = segment_ptr_->vectors_ptr_->VectorsSize();
|
||||
size_t uids_size = segment_ptr_->vectors_ptr_->UidsSize();
|
||||
/*
|
||||
if (segment_ptr_->id_bloom_filter_ptr_) {
|
||||
ret += segment_ptr_->id_bloom_filter_ptr_->Size();
|
||||
}
|
||||
*/
|
||||
return ret;
|
||||
return (vectors_size * sizeof(uint8_t) + uids_size * sizeof(doc_id_t));
|
||||
}
|
||||
|
||||
size_t
|
||||
|
|
|
@ -141,8 +141,13 @@ Vectors::GetCodeLength() const {
|
|||
}
|
||||
|
||||
size_t
|
||||
Vectors::Size() {
|
||||
return data_.size() + uids_.size() * sizeof(doc_id_t);
|
||||
Vectors::VectorsSize() {
|
||||
return data_.size();
|
||||
}
|
||||
|
||||
size_t
|
||||
Vectors::UidsSize() {
|
||||
return uids_.size();
|
||||
}
|
||||
|
||||
void
|
||||
|
|
|
@ -63,7 +63,10 @@ class Vectors {
|
|||
Erase(std::vector<int32_t>& offsets);
|
||||
|
||||
size_t
|
||||
Size();
|
||||
VectorsSize();
|
||||
|
||||
size_t
|
||||
UidsSize();
|
||||
|
||||
void
|
||||
Clear();
|
||||
|
|
|
@ -30,7 +30,7 @@ constexpr int64_t BATCH_ENTITY_COUNT = 100000;
|
|||
constexpr int64_t NQ = 5;
|
||||
constexpr int64_t TOP_K = 10;
|
||||
constexpr int64_t NPROBE = 32;
|
||||
constexpr int64_t SEARCH_TARGET = 5000; // change this value, result is different
|
||||
constexpr int64_t SEARCH_TARGET = BATCH_ENTITY_COUNT / 2; // change this value, result is different
|
||||
constexpr int64_t ADD_ENTITY_LOOP = 5;
|
||||
constexpr milvus::IndexType INDEX_TYPE = milvus::IndexType::IVFSQ8;
|
||||
constexpr int32_t NLIST = 16384;
|
||||
|
@ -180,10 +180,18 @@ ClientTest::CreateIndex(const std::string& collection_name, milvus::IndexType ty
|
|||
|
||||
void
|
||||
ClientTest::PreloadCollection(const std::string& collection_name) {
|
||||
milvus_sdk::TimeRecorder rc("Preload");
|
||||
milvus::Status stat = conn_->PreloadCollection(collection_name);
|
||||
std::cout << "PreloadCollection function call status: " << stat.message() << std::endl;
|
||||
}
|
||||
|
||||
void
|
||||
ClientTest::CompactCollection(const std::string& collection_name) {
|
||||
milvus_sdk::TimeRecorder rc("Compact");
|
||||
milvus::Status stat = conn_->CompactCollection(collection_name);
|
||||
std::cout << "CompactCollection function call status: " << stat.message() << std::endl;
|
||||
}
|
||||
|
||||
void
|
||||
ClientTest::DeleteByIds(const std::string& collection_name, const std::vector<int64_t>& id_array) {
|
||||
std::cout << "Delete entity: ";
|
||||
|
@ -200,13 +208,6 @@ ClientTest::DeleteByIds(const std::string& collection_name, const std::vector<in
|
|||
stat = conn_->FlushCollection(collection_name);
|
||||
std::cout << "FlushCollection function call status: " << stat.message() << std::endl;
|
||||
}
|
||||
|
||||
{
|
||||
// compact table
|
||||
milvus_sdk::TimeRecorder rc1("Compact");
|
||||
stat = conn_->CompactCollection(collection_name);
|
||||
std::cout << "CompactCollection function call status: " << stat.message() << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -255,6 +256,7 @@ ClientTest::Test() {
|
|||
|
||||
std::vector<int64_t> delete_ids = {search_id_array_[0], search_id_array_[1]};
|
||||
DeleteByIds(collection_name, delete_ids);
|
||||
CompactCollection(collection_name);
|
||||
SearchEntities(collection_name, TOP_K, NPROBE); // this line get two search error since we delete two entities
|
||||
|
||||
DropIndex(collection_name);
|
||||
|
|
|
@ -66,6 +66,9 @@ class ClientTest {
|
|||
void
|
||||
PreloadCollection(const std::string&);
|
||||
|
||||
void
|
||||
CompactCollection(const std::string&);
|
||||
|
||||
void
|
||||
DeleteByIds(const std::string&, const std::vector<int64_t>& id_array);
|
||||
|
||||
|
|
Loading…
Reference in New Issue