Update README for script

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
pull/4973/head^2
cai.zhang 2020-11-23 16:40:42 +08:00 committed by yefu.chen
parent 76001b29b6
commit 71beaadaf4
17 changed files with 181 additions and 191 deletions

View File

@ -8,7 +8,6 @@ set(MILVUS_QUERY_SRCS
visitors/ShowExprVisitor.cpp
visitors/ExecExprVisitor.cpp
Plan.cpp
Search.cpp
)
add_library(milvus_query ${MILVUS_QUERY_SRCS})
target_link_libraries(milvus_query milvus_proto)

View File

@ -1,107 +0,0 @@
#include "Search.h"
#include <knowhere/index/vector_index/adapter/VectorAdapter.h>
#include <knowhere/index/vector_index/VecIndexFactory.h>
#include "segcore/Reduce.h"
#include <faiss/utils/distances.h>
#include "utils/tools.h"
namespace milvus::query {
static faiss::ConcurrentBitsetPtr
create_bitmap_view(std::optional<const BitmapSimple*> bitmaps_opt, int64_t chunk_id) {
if (!bitmaps_opt.has_value()) {
return nullptr;
}
auto& bitmaps = *bitmaps_opt.value();
auto& src_vec = bitmaps.at(chunk_id);
auto dst = std::make_shared<faiss::ConcurrentBitset>(src_vec.size());
boost::to_block_range(src_vec, dst->mutable_data());
return dst;
}
using namespace segcore;
Status
QueryBruteForceImpl(const SegmentSmallIndex& segment,
const query::QueryInfo& info,
const float* query_data,
int64_t num_queries,
Timestamp timestamp,
std::optional<const BitmapSimple*> bitmaps_opt,
QueryResult& results) {
auto& record = segment.get_insert_record();
auto& schema = segment.get_schema();
auto& indexing_record = segment.get_indexing_record();
// step 1: binary search to find the barrier of the snapshot
auto ins_barrier = get_barrier(record, timestamp);
auto max_chunk = upper_div(ins_barrier, DefaultElementPerChunk);
// auto del_barrier = get_barrier(deleted_record_, timestamp);
#if 0
auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier);
Assert(bitmap_holder);
auto bitmap = bitmap_holder->bitmap_ptr;
#endif
// step 2.1: get meta
// step 2.2: get which vector field to search
auto vecfield_offset_opt = schema.get_offset(info.field_id_);
Assert(vecfield_offset_opt.has_value());
auto vecfield_offset = vecfield_offset_opt.value();
auto& field = schema[vecfield_offset];
auto vec_ptr = record.get_vec_entity<float>(vecfield_offset);
Assert(field.get_data_type() == DataType::VECTOR_FLOAT);
auto dim = field.get_dim();
auto topK = info.topK_;
auto total_count = topK * num_queries;
// TODO: optimize
// step 3: small indexing search
std::vector<int64_t> final_uids(total_count, -1);
std::vector<float> final_dis(total_count, std::numeric_limits<float>::max());
auto max_indexed_id = indexing_record.get_finished_ack();
const auto& indexing_entry = indexing_record.get_indexing(vecfield_offset);
auto search_conf = indexing_entry.get_search_conf(topK);
for (int chunk_id = 0; chunk_id < max_indexed_id; ++chunk_id) {
auto indexing = indexing_entry.get_indexing(chunk_id);
auto src_data = vec_ptr->get_chunk(chunk_id).data();
auto dataset = knowhere::GenDataset(num_queries, dim, src_data);
auto bitmap_view = create_bitmap_view(bitmaps_opt, chunk_id);
auto ans = indexing->Query(dataset, search_conf, bitmap_view);
auto dis = ans->Get<float*>(milvus::knowhere::meta::DISTANCE);
auto uids = ans->Get<int64_t*>(milvus::knowhere::meta::IDS);
merge_into(num_queries, topK, final_dis.data(), final_uids.data(), dis, uids);
}
// step 4: brute force search where small indexing is unavailable
for (int chunk_id = max_indexed_id; chunk_id < max_chunk; ++chunk_id) {
std::vector<int64_t> buf_uids(total_count, -1);
std::vector<float> buf_dis(total_count, std::numeric_limits<float>::max());
faiss::float_maxheap_array_t buf = {(size_t)num_queries, (size_t)topK, buf_uids.data(), buf_dis.data()};
auto src_data = vec_ptr->get_chunk(chunk_id).data();
auto nsize =
chunk_id != max_chunk - 1 ? DefaultElementPerChunk : ins_barrier - chunk_id * DefaultElementPerChunk;
auto bitmap_view = create_bitmap_view(bitmaps_opt, chunk_id);
faiss::knn_L2sqr(query_data, src_data, dim, num_queries, nsize, &buf, bitmap_view);
merge_into(num_queries, topK, final_dis.data(), final_uids.data(), buf_dis.data(), buf_uids.data());
}
// step 5: convert offset to uids
for (auto& id : final_uids) {
if (id == -1) {
continue;
}
id = record.uids_[id];
}
results.result_ids_ = std::move(final_uids);
results.result_distances_ = std::move(final_dis);
results.topK_ = topK;
results.num_queries_ = num_queries;
return Status::OK();
}
} // namespace milvus::query

View File

@ -1,19 +0,0 @@
#pragma once
#include <optional>
#include "segcore/SegmentSmallIndex.h"
#include <boost/dynamic_bitset.hpp>
namespace milvus::query {
using BitmapChunk = boost::dynamic_bitset<>;
using BitmapSimple = std::deque<BitmapChunk>;
// note: c++17 don't support optional ref
Status
QueryBruteForceImpl(const segcore::SegmentSmallIndex& segment,
const QueryInfo& info,
const float* query_data,
int64_t num_queries,
Timestamp timestamp,
std::optional<const BitmapSimple*> bitmap_opt,
segcore::QueryResult& results);
} // namespace milvus::query

View File

@ -4,7 +4,6 @@
#include "segcore/SegmentSmallIndex.h"
#include <optional>
#include "query/ExprImpl.h"
#include "boost/dynamic_bitset.hpp"
#include "ExprVisitor.h"
namespace milvus::query {
@ -23,7 +22,7 @@ class ExecExprVisitor : ExprVisitor {
visit(RangeExpr& expr) override;
public:
using RetType = std::deque<boost::dynamic_bitset<>>;
using RetType = std::vector<std::vector<bool>>;
explicit ExecExprVisitor(segcore::SegmentSmallIndex& segment) : segment_(segment) {
}
RetType

View File

@ -1,7 +1,6 @@
#include "segcore/SegmentSmallIndex.h"
#include <optional>
#include "query/ExprImpl.h"
#include "boost/dynamic_bitset.hpp"
#include "query/generated/ExecExprVisitor.h"
namespace milvus::query {
@ -11,7 +10,7 @@ namespace milvus::query {
namespace impl {
class ExecExprVisitor : ExprVisitor {
public:
using RetType = std::deque<boost::dynamic_bitset<>>;
using RetType = std::vector<std::vector<bool>>;
explicit ExecExprVisitor(segcore::SegmentSmallIndex& segment) : segment_(segment) {
}
RetType
@ -67,7 +66,7 @@ ExecExprVisitor::ExecRangeVisitorImpl(RangeExprImpl<T>& expr, Func func) -> RetT
auto& field_meta = schema[field_offset];
auto vec_ptr = records.get_scalar_entity<T>(field_offset);
auto& vec = *vec_ptr;
RetType results(vec.chunk_size());
std::vector<std::vector<bool>> results(vec.chunk_size());
for (auto chunk_id = 0; chunk_id < vec.chunk_size(); ++chunk_id) {
auto& result = results[chunk_id];
result.resize(segcore::DefaultElementPerChunk);

View File

@ -3,8 +3,6 @@
#include "segcore/SegmentBase.h"
#include "query/generated/ExecPlanNodeVisitor.h"
#include "segcore/SegmentSmallIndex.h"
#include "query/generated/ExecExprVisitor.h"
#include "query/Search.h"
namespace milvus::query {
@ -51,12 +49,7 @@ ExecPlanNodeVisitor::visit(FloatVectorANNS& node) {
auto& ph = placeholder_group_.at(0);
auto src_data = ph.get_blob<float>();
auto num_queries = ph.num_of_queries_;
if (node.predicate_.has_value()) {
auto bitmap = ExecExprVisitor(*segment).call_child(*node.predicate_.value());
auto ptr = &bitmap;
QueryBruteForceImpl(*segment, node.query_info_, src_data, num_queries, timestamp_, ptr, ret);
}
QueryBruteForceImpl(*segment, node.query_info_, src_data, num_queries, timestamp_, std::nullopt, ret);
segment->QueryBruteForceImpl(node.query_info_, src_data, num_queries, timestamp_, ret);
ret_ = ret;
}

View File

@ -37,5 +37,6 @@ class AckResponder {
std::shared_mutex mutex_;
std::set<int64_t> acks_ = {0};
std::atomic<int64_t> minimum_ = 0;
// std::atomic<int64_t> maximum_ = 0;
};
} // namespace milvus::segcore

View File

@ -4,7 +4,6 @@
#include "common/Schema.h"
#include "knowhere/index/vector_index/IndexIVF.h"
#include <memory>
#include "segcore/Record.h"
namespace milvus::segcore {

View File

@ -67,7 +67,7 @@ class IndexingRecord {
// concurrent
int64_t
get_finished_ack() const {
get_finished_ack() {
return finished_ack_.GetAck();
}

View File

@ -2,7 +2,6 @@
#include "common/Schema.h"
#include "ConcurrentVector.h"
#include "AckResponder.h"
#include "segcore/Record.h"
namespace milvus::segcore {
struct InsertRecord {

View File

@ -1,21 +0,0 @@
#pragma once
#include "common/Schema.h"
namespace milvus::segcore {
template <typename RecordType>
inline int64_t
get_barrier(const RecordType& record, Timestamp timestamp) {
auto& vec = record.timestamps_;
int64_t beg = 0;
int64_t end = record.ack_responder_.GetAck();
while (beg < end) {
auto mid = (beg + end) / 2;
if (vec[mid] < timestamp) {
beg = mid + 1;
} else {
end = mid;
}
}
return beg;
}
} // namespace milvus::segcore

View File

@ -52,7 +52,10 @@ class SegmentBase {
virtual Status
Delete(int64_t reserved_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) = 0;
public:
// query contains metadata of
virtual Status
QueryDeprecated(query::QueryDeprecatedPtr query, Timestamp timestamp, QueryResult& results) = 0;
virtual Status
Search(const query::Plan* Plan,
const query::PlaceholderGroup* placeholder_groups[],

View File

@ -219,6 +219,23 @@ SegmentNaive::Delete(int64_t reserved_begin, int64_t size, const int64_t* uids_r
// return Status::OK();
}
template <typename RecordType>
int64_t
get_barrier(const RecordType& record, Timestamp timestamp) {
auto& vec = record.timestamps_;
int64_t beg = 0;
int64_t end = record.ack_responder_.GetAck();
while (beg < end) {
auto mid = (beg + end) / 2;
if (vec[mid] < timestamp) {
beg = mid + 1;
} else {
end = mid;
}
}
return beg;
}
Status
SegmentNaive::QueryImpl(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& result) {
auto ins_barrier = get_barrier(record_, timestamp);

View File

@ -41,12 +41,10 @@ class SegmentNaive : public SegmentBase {
Status
Delete(int64_t reserverd_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) override;
private:
// NOTE: now deprecated, remains for further copy out
// query contains metadata of
Status
QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results);
QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results) override;
public:
Status
Search(const query::Plan* Plan,
const query::PlaceholderGroup* placeholder_groups[],

View File

@ -204,6 +204,137 @@ SegmentSmallIndex::Delete(int64_t reserved_begin,
// return Status::OK();
}
template <typename RecordType>
int64_t
get_barrier(const RecordType& record, Timestamp timestamp) {
auto& vec = record.timestamps_;
int64_t beg = 0;
int64_t end = record.ack_responder_.GetAck();
while (beg < end) {
auto mid = (beg + end) / 2;
if (vec[mid] < timestamp) {
beg = mid + 1;
} else {
end = mid;
}
}
return beg;
}
Status
SegmentSmallIndex::QueryBruteForceImpl(const query::QueryInfo& info,
const float* query_data,
int64_t num_queries,
Timestamp timestamp,
QueryResult& results) {
// step 1: binary search to find the barrier of the snapshot
auto ins_barrier = get_barrier(record_, timestamp);
// auto del_barrier = get_barrier(deleted_record_, timestamp);
#if 0
auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier);
Assert(bitmap_holder);
auto bitmap = bitmap_holder->bitmap_ptr;
#endif
// step 2.1: get meta
// step 2.2: get which vector field to search
auto vecfield_offset_opt = schema_->get_offset(info.field_id_);
Assert(vecfield_offset_opt.has_value());
auto vecfield_offset = vecfield_offset_opt.value();
Assert(vecfield_offset < record_.entity_vec_.size());
auto& field = schema_->operator[](vecfield_offset);
auto vec_ptr = std::static_pointer_cast<ConcurrentVector<float>>(record_.entity_vec_.at(vecfield_offset));
Assert(field.get_data_type() == DataType::VECTOR_FLOAT);
auto dim = field.get_dim();
auto topK = info.topK_;
auto total_count = topK * num_queries;
// TODO: optimize
// step 3: small indexing search
std::vector<int64_t> final_uids(total_count, -1);
std::vector<float> final_dis(total_count, std::numeric_limits<float>::max());
auto max_chunk = (ins_barrier + DefaultElementPerChunk - 1) / DefaultElementPerChunk;
auto max_indexed_id = indexing_record_.get_finished_ack();
const auto& indexing_entry = indexing_record_.get_indexing(vecfield_offset);
auto search_conf = indexing_entry.get_search_conf(topK);
for (int chunk_id = 0; chunk_id < max_indexed_id; ++chunk_id) {
auto indexing = indexing_entry.get_indexing(chunk_id);
auto src_data = vec_ptr->get_chunk(chunk_id).data();
auto dataset = knowhere::GenDataset(num_queries, dim, src_data);
auto ans = indexing->Query(dataset, search_conf, nullptr);
auto dis = ans->Get<float*>(milvus::knowhere::meta::DISTANCE);
auto uids = ans->Get<int64_t*>(milvus::knowhere::meta::IDS);
merge_into(num_queries, topK, final_dis.data(), final_uids.data(), dis, uids);
}
// step 4: brute force search where small indexing is unavailable
for (int chunk_id = max_indexed_id; chunk_id < max_chunk; ++chunk_id) {
std::vector<int64_t> buf_uids(total_count, -1);
std::vector<float> buf_dis(total_count, std::numeric_limits<float>::max());
faiss::float_maxheap_array_t buf = {(size_t)num_queries, (size_t)topK, buf_uids.data(), buf_dis.data()};
auto src_data = vec_ptr->get_chunk(chunk_id).data();
auto nsize =
chunk_id != max_chunk - 1 ? DefaultElementPerChunk : ins_barrier - chunk_id * DefaultElementPerChunk;
faiss::knn_L2sqr(query_data, src_data, dim, num_queries, nsize, &buf);
merge_into(num_queries, topK, final_dis.data(), final_uids.data(), buf_dis.data(), buf_uids.data());
}
// step 5: convert offset to uids
for (auto& id : final_uids) {
if (id == -1) {
continue;
}
id = record_.uids_[id];
}
results.result_ids_ = std::move(final_uids);
results.result_distances_ = std::move(final_dis);
results.topK_ = topK;
results.num_queries_ = num_queries;
// throw std::runtime_error("unimplemented");
return Status::OK();
}
Status
SegmentSmallIndex::QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& result) {
// TODO: enable delete
// TODO: enable index
// TODO: remove mock
if (query_info == nullptr) {
query_info = std::make_shared<query::QueryDeprecated>();
query_info->field_name = "fakevec";
query_info->topK = 10;
query_info->num_queries = 1;
auto dim = schema_->operator[]("fakevec").get_dim();
std::default_random_engine e(42);
std::uniform_real_distribution<> dis(0.0, 1.0);
query_info->query_raw_data.resize(query_info->num_queries * dim);
for (auto& x : query_info->query_raw_data) {
x = dis(e);
}
}
// TODO
query::QueryInfo info{
query_info->topK,
query_info->field_name,
"L2",
nlohmann::json{
{"nprobe", 10},
},
};
auto num_queries = query_info->num_queries;
return QueryBruteForceImpl(info, query_info->query_raw_data.data(), num_queries, timestamp, result);
}
Status
SegmentSmallIndex::Close() {
if (this->record_.reserved != this->record_.ack_responder_.GetAck()) {

View File

@ -65,6 +65,10 @@ class SegmentSmallIndex : public SegmentBase {
Status
Delete(int64_t reserverd_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) override;
// query contains metadata of
Status
QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results) override;
Status
Search(const query::Plan* Plan,
const query::PlaceholderGroup* placeholder_groups[],
@ -108,16 +112,6 @@ class SegmentSmallIndex : public SegmentBase {
return record_;
}
const IndexingRecord&
get_indexing_record() const {
return indexing_record_;
}
const DeletedRecord&
get_deleted_record() const {
return deleted_record_;
}
const Schema&
get_schema() const {
return *schema_;
@ -154,12 +148,12 @@ class SegmentSmallIndex : public SegmentBase {
// Status
// QueryBruteForceImpl(query::QueryPtr query, Timestamp timestamp, QueryResult& results);
// Status
// QueryBruteForceImpl(const query::QueryInfo& info,
// const float* query_data,
// int64_t num_queries,
// Timestamp timestamp,
// QueryResult& results);
Status
QueryBruteForceImpl(const query::QueryInfo& info,
const float* query_data,
int64_t num_queries,
Timestamp timestamp,
QueryResult& results);
template <typename Type>
knowhere::IndexPtr
@ -178,5 +172,4 @@ class SegmentSmallIndex : public SegmentBase {
// std::unordered_map<std::string, knowhere::IndexPtr> indexings_; // index_name => indexing
tbb::concurrent_unordered_multimap<idx_t, int64_t> uid2offset_;
};
} // namespace milvus::segcore

View File

@ -34,11 +34,7 @@
#### Generate the go files from proto file
```shell script
cd milvus-distributed
pwd_dir=`pwd`
export PATH=$PATH:$(go env GOPATH)/bin
export protoc=${pwd_dir}/cmake_build/thirdparty/protobuf/protobuf-build/protoc
./scripts/proto_gen_go.sh
make check-proto-product
```
#### Check code specifications
@ -53,6 +49,16 @@
make all
```
#### Install docker-compose
refer: https://docs.docker.com/compose/install/
```shell script
sudo curl -L "https://github.com/docker/compose/releases/download/1.27.4/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose
docker-compose --version
```
#### Start service
```shell script