mirror of https://github.com/milvus-io/milvus.git
remove useless mutex in knowhere indexes and refactor BuildAll interf… (#4487)
* remove useless mutex in knowhere indexes
  Signed-off-by: cmli <chengming.li@zilliz.com>
* refactor BuildAll interface 4 NSG and SPTAG
  Signed-off-by: shengjun.li <shengjun.li@zilliz.com>
* fix ut
  Signed-off-by: shengjun.li <shengjun.li@zilliz.com>

Co-authored-by: cmli <chengming.li@zilliz.com>
Co-authored-by: shengjun.li <shengjun.li@zilliz.com>

Branch: pull/4493/head
parent 607ec31fbc
commit 646baed233
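The caller-facing effect of the BuildAll refactor is easiest to see from the test changes at the bottom of this diff: NSG and SPTAG indexes are now built through a single BuildAll call instead of Train. A minimal sketch follows, assuming the knowhere IndexNSG.h include path and a default-constructible NSG (both assumptions for illustration; this code is not part of the patch):

// Hedged sketch, not part of this commit: building and querying an NSG index
// after the BuildAll refactor. The include path and the default NSG
// constructor are assumptions for illustration.
#include <memory>

#include "knowhere/index/vector_index/IndexNSG.h"

void
BuildNSGExample(const milvus::knowhere::DatasetPtr& base_dataset,
                const milvus::knowhere::Config& train_conf,
                const milvus::knowhere::DatasetPtr& query_dataset,
                const milvus::knowhere::Config& search_conf) {
    auto index = std::make_shared<milvus::knowhere::NSG>();

    // Before this commit the tests called index->Train(...), which built the
    // whole graph; now BuildAll is the single entry point for a full build,
    // and Train/AddWithoutIds are no longer the way to construct NSG.
    index->BuildAll(base_dataset, train_conf);

    auto result = index->Query(query_dataset, search_conf);
    (void)result;
}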
@@ -12,7 +12,6 @@
 #pragma once
 
 #include <memory>
-#include <mutex>
 
 #include "annoy/src/annoylib.h"
 #include "annoy/src/kissrandom.h"
@@ -28,13 +28,11 @@ BinaryIDMAP::Serialize(const Config& config) {
         KNOWHERE_THROW_MSG("index not initialize");
     }
 
-    std::lock_guard<std::mutex> lk(mutex_);
     return SerializeImpl(index_type_);
 }
 
 void
 BinaryIDMAP::Load(const BinarySet& index_binary) {
-    std::lock_guard<std::mutex> lk(mutex_);
     LoadImpl(index_binary, index_type_);
 }
@@ -82,7 +80,6 @@ BinaryIDMAP::AddWithoutIds(const DatasetPtr& dataset_ptr, const Config& config)
         KNOWHERE_THROW_MSG("index not initialize");
     }
 
-    std::lock_guard<std::mutex> lk(mutex_);
     GETTENSOR(dataset_ptr)
 
     index_->add(rows, (uint8_t*)p_data);
@@ -12,7 +12,6 @@
 #pragma once
 
 #include <memory>
-#include <mutex>
 #include <utility>
 #include <vector>
@@ -64,9 +63,6 @@ class BinaryIDMAP : public VecIndex, public FaissBaseBinaryIndex {
  protected:
     virtual void
     QueryImpl(int64_t n, const uint8_t* data, int64_t k, float* distances, int64_t* labels, const Config& config);
-
- protected:
-    std::mutex mutex_;
 };
 
 using BinaryIDMAPPtr = std::shared_ptr<BinaryIDMAP>;
@@ -32,13 +32,11 @@ BinaryIVF::Serialize(const Config& config) {
         KNOWHERE_THROW_MSG("index not initialize or trained");
     }
 
-    std::lock_guard<std::mutex> lk(mutex_);
     return SerializeImpl(index_type_);
 }
 
 void
 BinaryIVF::Load(const BinarySet& index_binary) {
-    std::lock_guard<std::mutex> lk(mutex_);
     LoadImpl(index_binary, index_type_);
 }
@@ -113,10 +111,19 @@ BinaryIVF::Train(const DatasetPtr& dataset_ptr, const Config& config) {
     auto index = std::make_shared<faiss::IndexBinaryIVF>(coarse_quantizer, dim, nlist, metric_type);
     index->own_fields = true;
     index->train(rows, static_cast<const uint8_t*>(p_data));
-    index->add(rows, static_cast<const uint8_t*>(p_data));
     index_ = index;
 }
 
+void
+BinaryIVF::AddWithoutIds(const milvus::knowhere::DatasetPtr& dataset_ptr, const milvus::knowhere::Config& config) {
+    if (!index_ || !index_->is_trained) {
+        KNOWHERE_THROW_MSG("index not initialize or trained");
+    }
+
+    GETTENSOR(dataset_ptr)
+    index_->add(rows, static_cast<const uint8_t*>(p_data));
+}
+
 #if 0
 DatasetPtr
 BinaryIVF::GetVectorById(const DatasetPtr& dataset_ptr, const Config& config) {
@@ -12,7 +12,6 @@
 #pragma once
 
 #include <memory>
-#include <mutex>
 #include <utility>
 #include <vector>
@@ -38,11 +37,6 @@ class BinaryIVF : public VecIndex, public FaissBaseBinaryIndex {
     BinarySet
     Serialize(const Config& config = Config()) override;
 
-    void
-    BuildAll(const DatasetPtr& dataset_ptr, const Config& config) override {
-        Train(dataset_ptr, config);
-    }
-
     void
     Load(const BinarySet& index_binary) override;
@@ -50,9 +44,7 @@ class BinaryIVF : public VecIndex, public FaissBaseBinaryIndex {
     Train(const DatasetPtr& dataset_ptr, const Config& config) override;
 
     void
-    AddWithoutIds(const DatasetPtr&, const Config&) override {
-        KNOWHERE_THROW_MSG("AddWithoutIds is not supported");
-    }
+    AddWithoutIds(const DatasetPtr&, const Config&) override;
 
     DatasetPtr
     Query(const DatasetPtr& dataset_ptr, const Config& config) override;
@@ -72,9 +64,6 @@ class BinaryIVF : public VecIndex, public FaissBaseBinaryIndex {
 
     virtual void
     QueryImpl(int64_t n, const uint8_t* data, int64_t k, float* distances, int64_t* labels, const Config& config);
-
- protected:
-    std::mutex mutex_;
 };
 
 using BinaryIVFIndexPtr = std::shared_ptr<BinaryIVF>;
@@ -100,8 +100,6 @@ IndexHNSW::AddWithoutIds(const DatasetPtr& dataset_ptr, const Config& config) {
         KNOWHERE_THROW_MSG("index not initialize");
     }
 
-    std::lock_guard<std::mutex> lk(mutex_);
-
     GETTENSOR(dataset_ptr)
 
     index_->addPoint(p_data, 0);
@@ -12,7 +12,6 @@
 #pragma once
 
 #include <memory>
-#include <mutex>
 
 #include "hnswlib/hnswlib.h"
@@ -54,7 +53,6 @@ class IndexHNSW : public VecIndex {
 
  private:
     bool normalize = false;
-    std::mutex mutex_;
     std::shared_ptr<hnswlib::HierarchicalNSW<float>> index_;
 };
@@ -41,13 +41,11 @@ IDMAP::Serialize(const Config& config) {
         KNOWHERE_THROW_MSG("index not initialize");
     }
 
-    std::lock_guard<std::mutex> lk(mutex_);
     return SerializeImpl(index_type_);
 }
 
 void
 IDMAP::Load(const BinarySet& binary_set) {
-    std::lock_guard<std::mutex> lk(mutex_);
     LoadImpl(binary_set, index_type_);
 }
@@ -65,7 +63,6 @@ IDMAP::AddWithoutIds(const DatasetPtr& dataset_ptr, const Config& config) {
         KNOWHERE_THROW_MSG("index not initialize");
     }
 
-    std::lock_guard<std::mutex> lk(mutex_);
     GETTENSOR(dataset_ptr)
     index_->add(rows, (float*)p_data);
 }
@@ -65,9 +65,6 @@ class IDMAP : public VecIndex, public FaissBaseIndex {
  protected:
     virtual void
     QueryImpl(int64_t, const float*, int64_t, float*, int64_t*, const Config&);
-
- protected:
-    std::mutex mutex_;
 };
 
 using IDMAPPtr = std::shared_ptr<IDMAP>;
@@ -53,13 +53,11 @@ IVF::Serialize(const Config& config) {
         KNOWHERE_THROW_MSG("index not initialize or trained");
     }
 
-    std::lock_guard<std::mutex> lk(mutex_);
     return SerializeImpl(index_type_);
 }
 
 void
 IVF::Load(const BinarySet& binary_set) {
-    std::lock_guard<std::mutex> lk(mutex_);
     LoadImpl(binary_set, index_type_);
 }
@@ -82,7 +80,6 @@ IVF::AddWithoutIds(const DatasetPtr& dataset_ptr, const Config& config) {
         KNOWHERE_THROW_MSG("index not initialize or trained");
     }
 
-    std::lock_guard<std::mutex> lk(mutex_);
     GETTENSOR(dataset_ptr)
     index_->add(rows, (float*)p_data);
 }
@@ -12,7 +12,6 @@
 #pragma once
 
 #include <memory>
-#include <mutex>
 #include <utility>
 #include <vector>
@@ -87,9 +86,6 @@ class IVF : public VecIndex, public FaissBaseIndex {
 
     void
     SealImpl() override;
-
- protected:
-    std::mutex mutex_;
 };
 
 using IVFPtr = std::shared_ptr<IVF>;
@@ -39,7 +39,6 @@ NSG::Serialize(const Config& config) {
 
     try {
         fiu_do_on("NSG.Serialize.throw_exception", throw std::exception());
-        std::lock_guard<std::mutex> lk(mutex_);
         impl::NsgIndex* index = index_.get();
 
         MemoryIOWriter writer;
@@ -58,7 +57,6 @@ void
 NSG::Load(const BinarySet& index_binary) {
     try {
         fiu_do_on("NSG.Load.throw_exception", throw std::exception());
-        std::lock_guard<std::mutex> lk(mutex_);
         auto binary = index_binary.GetByName("NSG");
 
         MemoryIOReader reader;
@@ -93,7 +91,6 @@ NSG::Query(const DatasetPtr& dataset_ptr, const Config& config) {
     s_params.search_length = config[IndexParams::search_length];
     s_params.k = config[meta::TOPK];
     {
-        std::lock_guard<std::mutex> lk(mutex_);
         index_->Search((float*)p_data, rows, dim, config[meta::TOPK].get<int64_t>(), p_dist, p_id, s_params,
                        blacklist);
     }
@@ -110,7 +107,7 @@ NSG::Query(const DatasetPtr& dataset_ptr, const Config& config) {
 }
 
 void
-NSG::Train(const DatasetPtr& dataset_ptr, const Config& config) {
+NSG::BuildAll(const DatasetPtr& dataset_ptr, const Config& config) {
     auto idmap = std::make_shared<IDMAP>();
     idmap->Train(dataset_ptr, config);
     idmap->AddWithoutIds(dataset_ptr, config);
@@ -41,16 +41,16 @@ class NSG : public VecIndex {
     Load(const BinarySet&) override;
 
     void
-    BuildAll(const DatasetPtr& dataset_ptr, const Config& config) override {
-        Train(dataset_ptr, config);
-    }
+    BuildAll(const DatasetPtr&, const Config&) override;
 
     void
-    Train(const DatasetPtr&, const Config&) override {
-        KNOWHERE_THROW_MSG("NSG not support build item dynamically, please invoke BuildAll interface.");
-    }
+    Train(const DatasetPtr&, const Config&) override;
 
     void
     AddWithoutIds(const DatasetPtr&, const Config&) override {
-        KNOWHERE_THROW_MSG("Addwithoutids is not supported");
+        KNOWHERE_THROW_MSG("Incremental index NSG is not supported");
     }
 
     DatasetPtr
@@ -66,7 +66,6 @@ class NSG : public VecIndex {
     UpdateIndexSize() override;
 
  private:
-    std::mutex mutex_;
     int64_t gpu_;
     std::shared_ptr<impl::NsgIndex> index_;
 };
@@ -116,7 +116,7 @@ CPUSPTAGRNG::Load(const BinarySet& binary_set) {
 }
 
 void
-CPUSPTAGRNG::Train(const DatasetPtr& origin, const Config& train_config) {
+CPUSPTAGRNG::BuildAll(const DatasetPtr& origin, const Config& train_config) {
     SetParameters(train_config);
 
     DatasetPtr dataset = origin;
@@ -34,16 +34,16 @@ class CPUSPTAGRNG : public VecIndex {
     Load(const BinarySet& index_array) override;
 
     void
-    BuildAll(const DatasetPtr& dataset_ptr, const Config& config) override {
-        Train(dataset_ptr, config);
-    }
+    BuildAll(const DatasetPtr&, const Config&) override;
 
     void
-    Train(const DatasetPtr& dataset_ptr, const Config& config) override {
-        KNOWHERE_THROW_MSG("SPTAGRNG not support build item dynamically, please invoke BuildAll interface.");
-    }
+    Train(const DatasetPtr& dataset_ptr, const Config& config) override;
 
     void
     AddWithoutIds(const DatasetPtr&, const Config&) override {
-        KNOWHERE_THROW_MSG("Incremental index is not supported");
+        KNOWHERE_THROW_MSG("Incremental index SPTAGRNG is not supported");
     }
 
     DatasetPtr
@@ -61,13 +61,13 @@ class VecIndex : public Index {
 
     faiss::ConcurrentBitsetPtr
     GetBlacklist() {
-        std::unique_lock<std::mutex> lck(mutex_);
+        std::unique_lock<std::mutex> lck(bitset_mutex_);
         return bitset_;
     }
 
     void
     SetBlacklist(faiss::ConcurrentBitsetPtr bitset_ptr) {
-        std::unique_lock<std::mutex> lck(mutex_);
+        std::unique_lock<std::mutex> lck(bitset_mutex_);
         bitset_ = std::move(bitset_ptr);
     }
@@ -94,7 +94,7 @@ class VecIndex : public Index {
 
     size_t
     BlacklistSize() {
-        std::unique_lock<std::mutex> lck(mutex_);
+        std::unique_lock<std::mutex> lck(bitset_mutex_);
         return bitset_ ? bitset_->size() : 0;
     }
@@ -133,7 +133,7 @@ class VecIndex : public Index {
 
  private:
     // multi thread may access bitset_
-    std::mutex mutex_;
+    std::mutex bitset_mutex_;
     faiss::ConcurrentBitsetPtr bitset_ = nullptr;
 };
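The rename above gives the query blacklist its own lock (bitset_mutex_), so concurrent readers and writers of the bitset no longer share a mutex with anything else in the index. A minimal usage sketch, assuming the IndexIDMAP.h include path and a capacity-taking faiss::ConcurrentBitset constructor (both assumptions for illustration; this code is not part of the patch):

// Hedged sketch, not part of this commit: concurrent blacklist access through
// the VecIndex accessors shown above. IDMAP, its header path, and the
// ConcurrentBitset(capacity) constructor are assumptions for illustration.
#include <memory>
#include <thread>

#include "knowhere/index/vector_index/IndexIDMAP.h"

void
BlacklistExample(const milvus::knowhere::IDMAPPtr& index, int64_t rows) {
    std::thread writer([&] {
        // Publish a new blacklist; guarded internally by bitset_mutex_.
        index->SetBlacklist(std::make_shared<faiss::ConcurrentBitset>(rows));
    });

    std::thread reader([&] {
        // Read the current blacklist concurrently; also guarded by bitset_mutex_.
        auto current = index->GetBlacklist();
        auto n = index->BlacklistSize();
        (void)current;
        (void)n;
    });

    writer.join();
    reader.join();
}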
@@ -39,12 +39,11 @@ FPGAIVFPQ::Train(const DatasetPtr& dataset_ptr, const Config& config) {
     index_->train(rows, (float*)p_data);
 }
 void
-FPGAIVFPQ::Add(const DatasetPtr& dataset_ptr, const Config& config) {
+FPGAIVFPQ::AddWithoutIds(const DatasetPtr& dataset_ptr, const Config& config) {
     if (!index_ || !index_->is_trained) {
         KNOWHERE_THROW_MSG("index not initialize or trained");
     }
     LOG_ENGINE_DEBUG_ << " fpga ivpq add. ";
-    std::lock_guard<std::mutex> lk(mutex_);
     // GET_TENSOR_DATA_ID(dataset_ptr)
     GETTENSOR(dataset_ptr)
     index_->add(rows, (float*)p_data);  // we not support add_with_id ,maybe support latter
@@ -52,7 +51,6 @@ FPGAIVFPQ::Add(const DatasetPtr& dataset_ptr, const Config& config) {
 
 void
 FPGAIVFPQ::CopyIndexToFpga() {
-    std::lock_guard<std::mutex> lk(mutex_);
     auto Fpga = Fpga::FpgaInst::GetInstance();
     auto ivf_index = static_cast<faiss::IndexIVFPQ*>(index_.get());
     ivf_index->make_direct_map();
@@ -63,8 +61,6 @@ FPGAIVFPQ::CopyIndexToFpga() {
 }
 void
 FPGAIVFPQ::QueryImpl(int64_t n, const float* data, int64_t k, float* distances, int64_t* labels, const Config& config) {
-    std::lock_guard<std::mutex> lk(mutex_);
-
     try {
         LOG_ENGINE_DEBUG_ << " run fpga search QueryImpl";
         auto params = GenParams(config);
@@ -35,7 +35,7 @@ class FPGAIVFPQ : public IVFPQ {
     Train(const DatasetPtr&, const Config&) override;
 
     void
-    Add(const DatasetPtr&, const Config&) override;
+    AddWithoutIds(const DatasetPtr&, const Config&) override;
 
     void
     CopyIndexToFpga();
@@ -31,8 +31,6 @@ namespace knowhere {
 
 VecIndexPtr
 GPUIDMAP::CopyGpuToCpu(const Config& config) {
-    std::lock_guard<std::mutex> lk(mutex_);
-
     faiss::Index* device_index = index_.get();
     faiss::Index* host_index = faiss::gpu::index_gpu_to_cpu(device_index);
@@ -64,8 +64,6 @@ GPUIVF::AddWithoutIds(const DatasetPtr& dataset_ptr, const Config& config) {
 
 VecIndexPtr
 GPUIVF::CopyGpuToCpu(const Config& config) {
-    std::lock_guard<std::mutex> lk(mutex_);
-
     if (auto device_idx = std::dynamic_pointer_cast<faiss::gpu::GpuIndexIVF>(index_)) {
         faiss::Index* device_index = index_.get();
         faiss::Index* host_index = faiss::gpu::index_gpu_to_cpu(device_index);
@@ -136,8 +134,6 @@ GPUIVF::LoadImpl(const BinarySet& binary_set, const IndexType& type) {
 
 void
 GPUIVF::QueryImpl(int64_t n, const float* data, int64_t k, float* distances, int64_t* labels, const Config& config) {
-    std::lock_guard<std::mutex> lk(mutex_);
-
     auto device_index = std::dynamic_pointer_cast<faiss::gpu::GpuIndexIVF>(index_);
     fiu_do_on("GPUIVF.search_impl.invald_index", device_index = nullptr);
     if (device_index) {
@@ -47,8 +47,6 @@ GPUIVFPQ::Train(const DatasetPtr& dataset_ptr, const Config& config) {
 
 VecIndexPtr
 GPUIVFPQ::CopyGpuToCpu(const Config& config) {
-    std::lock_guard<std::mutex> lk(mutex_);
-
     faiss::Index* device_index = index_.get();
     faiss::Index* host_index = faiss::gpu::index_gpu_to_cpu(device_index);
@@ -50,8 +50,6 @@ GPUIVFSQ::Train(const DatasetPtr& dataset_ptr, const Config& config) {
 
 VecIndexPtr
 GPUIVFSQ::CopyGpuToCpu(const Config& config) {
-    std::lock_guard<std::mutex> lk(mutex_);
-
     faiss::Index* device_index = index_.get();
     faiss::Index* host_index = faiss::gpu::index_gpu_to_cpu(device_index);
@@ -62,7 +62,6 @@ IVFSQHybrid::CopyGpuToCpu(const Config& config) {
     if (gpu_mode_ == 0) {
         return std::make_shared<IVFSQHybrid>(index_);
     }
-    std::lock_guard<std::mutex> lk(mutex_);
 
     faiss::Index* device_index = index_.get();
     faiss::Index* host_index = faiss::gpu::index_gpu_to_cpu(device_index);
@@ -9,8 +9,6 @@
 // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 // or implied. See the License for the specific language governing permissions and limitations under the License.
 
-#include <mutex>
-
 #include "knowhere/index/vector_index/helpers/SPTAGParameterMgr.h"
 
 namespace milvus {
@@ -139,7 +139,7 @@ TEST_F(NSGInterfaceTest, delete_test) {
     assert(!xb.empty());
 
     train_conf[milvus::knowhere::meta::DEVICEID] = DEVICE_GPU0;
-    index_->Train(base_dataset, train_conf);
+    index_->BuildAll(base_dataset, train_conf);
 
     auto result = index_->Query(query_dataset, search_conf);
     AssertAnns(result, nq, k);
@@ -92,7 +92,7 @@ TEST_P(SPTAGTest, sptag_basic) {
 TEST_P(SPTAGTest, sptag_serialize) {
     assert(!xb.empty());
 
-    index_->Train(base_dataset, conf);
+    index_->BuildAll(base_dataset, conf);
     // index_->Add(base_dataset, conf);
     auto binaryset = index_->Serialize();
     auto new_index = std::make_shared<milvus::knowhere::CPUSPTAGRNG>(IndexType);