mirror of https://github.com/milvus-io/milvus.git
(db/snapshot) snapshot integrate (#2770)
* code opt Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * add some APIs for SSDBImpl Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * partially add GetVectorById Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * snapshot opt Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * fix typo Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * update GetVectorByID framework Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * rename GetResFiles to GetResPath Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * update GetVectorByIdSegmentHandler Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * add GetEntityByID Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * update DataType Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * update ParamField Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * update GetEntityByIDSegmentHandler Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * support INT8 and INT16 in GetEntityById Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * add GetIDsInSegment Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * update FieldType Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * add InsertEntities and DeleteEntities Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * add HybridQuery Signed-off-by: yudong.cai <yudong.cai@zilliz.com>pull/2777/head
parent
5964adef49
commit
b7d9c2a4db
|
@ -11,12 +11,20 @@
|
|||
|
||||
#include "db/SSDBImpl.h"
|
||||
#include "cache/CpuCacheMgr.h"
|
||||
#include "db/IDGenerator.h"
|
||||
#include "db/snapshot/CompoundOperations.h"
|
||||
#include "db/snapshot/ResourceHelper.h"
|
||||
#include "db/snapshot/ResourceTypes.h"
|
||||
#include "db/snapshot/Snapshots.h"
|
||||
#include "knowhere/index/vector_index/helpers/BuilderSuspend.h"
|
||||
#include "metrics/Metrics.h"
|
||||
#include "metrics/SystemInfo.h"
|
||||
#include "scheduler/Definition.h"
|
||||
#include "scheduler/SchedInst.h"
|
||||
#include "segment/SegmentReader.h"
|
||||
#include "utils/Exception.h"
|
||||
#include "utils/StringHelpFunctions.h"
|
||||
#include "utils/TimeRecorder.h"
|
||||
#include "wal/WalDefinations.h"
|
||||
|
||||
#include <fiu-local.h>
|
||||
|
@ -81,7 +89,7 @@ SSDBImpl::Start() {
|
|||
}
|
||||
|
||||
// recovery
|
||||
while (1) {
|
||||
while (true) {
|
||||
wal::MXLogRecord record;
|
||||
auto error_code = wal_mgr_->GetNextRecovery(record);
|
||||
if (error_code != WAL_SUCCESS) {
|
||||
|
@ -330,7 +338,7 @@ SSDBImpl::PreloadCollection(const server::ContextPtr& context, const std::string
|
|||
Status
|
||||
SSDBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
|
||||
const std::vector<std::string>& field_names, std::vector<VectorsData>& vector_data,
|
||||
std::vector<meta::hybrid::DataType>& attr_type, std::vector<AttrsData>& attr_data) {
|
||||
/*std::vector<meta::hybrid::DataType>& attr_type,*/ std::vector<AttrsData>& attr_data) {
|
||||
CHECK_INITIALIZED;
|
||||
|
||||
snapshot::ScopedSnapshotT ss;
|
||||
|
@ -341,15 +349,380 @@ SSDBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_
|
|||
STATUS_CHECK(handler->GetStatus());
|
||||
|
||||
vector_data = std::move(handler->vector_data_);
|
||||
attr_type = std::move(handler->attr_type_);
|
||||
// attr_type = std::move(handler->attr_type_);
|
||||
attr_data = std::move(handler->attr_data_);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
CopyToAttr(const std::vector<uint8_t>& record, int64_t row_num, const std::vector<std::string>& field_names,
|
||||
std::unordered_map<std::string, meta::hybrid::DataType>& attr_types,
|
||||
std::unordered_map<std::string, std::vector<uint8_t>>& attr_datas,
|
||||
std::unordered_map<std::string, uint64_t>& attr_nbytes,
|
||||
std::unordered_map<std::string, uint64_t>& attr_data_size) {
|
||||
int64_t offset = 0;
|
||||
for (auto name : field_names) {
|
||||
switch (attr_types.at(name)) {
|
||||
case meta::hybrid::DataType::INT8: {
|
||||
std::vector<uint8_t> data;
|
||||
data.resize(row_num * sizeof(int8_t));
|
||||
|
||||
std::vector<int64_t> attr_value(row_num, 0);
|
||||
memcpy(attr_value.data(), record.data() + offset, row_num * sizeof(int64_t));
|
||||
|
||||
std::vector<int8_t> raw_value(row_num, 0);
|
||||
for (uint64_t i = 0; i < row_num; ++i) {
|
||||
raw_value[i] = attr_value[i];
|
||||
}
|
||||
|
||||
memcpy(data.data(), raw_value.data(), row_num * sizeof(int8_t));
|
||||
attr_datas.insert(std::make_pair(name, data));
|
||||
|
||||
attr_nbytes.insert(std::make_pair(name, sizeof(int8_t)));
|
||||
attr_data_size.insert(std::make_pair(name, row_num * sizeof(int8_t)));
|
||||
offset += row_num * sizeof(int64_t);
|
||||
break;
|
||||
}
|
||||
case meta::hybrid::DataType::INT16: {
|
||||
std::vector<uint8_t> data;
|
||||
data.resize(row_num * sizeof(int16_t));
|
||||
|
||||
std::vector<int64_t> attr_value(row_num, 0);
|
||||
memcpy(attr_value.data(), record.data() + offset, row_num * sizeof(int64_t));
|
||||
|
||||
std::vector<int16_t> raw_value(row_num, 0);
|
||||
for (uint64_t i = 0; i < row_num; ++i) {
|
||||
raw_value[i] = attr_value[i];
|
||||
}
|
||||
|
||||
memcpy(data.data(), raw_value.data(), row_num * sizeof(int16_t));
|
||||
attr_datas.insert(std::make_pair(name, data));
|
||||
|
||||
attr_nbytes.insert(std::make_pair(name, sizeof(int16_t)));
|
||||
attr_data_size.insert(std::make_pair(name, row_num * sizeof(int16_t)));
|
||||
offset += row_num * sizeof(int64_t);
|
||||
break;
|
||||
}
|
||||
case meta::hybrid::DataType::INT32: {
|
||||
std::vector<uint8_t> data;
|
||||
data.resize(row_num * sizeof(int32_t));
|
||||
|
||||
std::vector<int64_t> attr_value(row_num, 0);
|
||||
memcpy(attr_value.data(), record.data() + offset, row_num * sizeof(int64_t));
|
||||
|
||||
std::vector<int32_t> raw_value(row_num, 0);
|
||||
for (uint64_t i = 0; i < row_num; ++i) {
|
||||
raw_value[i] = attr_value[i];
|
||||
}
|
||||
|
||||
memcpy(data.data(), raw_value.data(), row_num * sizeof(int32_t));
|
||||
attr_datas.insert(std::make_pair(name, data));
|
||||
|
||||
attr_nbytes.insert(std::make_pair(name, sizeof(int32_t)));
|
||||
attr_data_size.insert(std::make_pair(name, row_num * sizeof(int32_t)));
|
||||
offset += row_num * sizeof(int64_t);
|
||||
break;
|
||||
}
|
||||
case meta::hybrid::DataType::INT64: {
|
||||
std::vector<uint8_t> data;
|
||||
data.resize(row_num * sizeof(int64_t));
|
||||
memcpy(data.data(), record.data() + offset, row_num * sizeof(int64_t));
|
||||
attr_datas.insert(std::make_pair(name, data));
|
||||
|
||||
std::vector<int64_t> test_data(row_num);
|
||||
memcpy(test_data.data(), record.data(), row_num * sizeof(int64_t));
|
||||
|
||||
attr_nbytes.insert(std::make_pair(name, sizeof(int64_t)));
|
||||
attr_data_size.insert(std::make_pair(name, row_num * sizeof(int64_t)));
|
||||
offset += row_num * sizeof(int64_t);
|
||||
break;
|
||||
}
|
||||
case meta::hybrid::DataType::FLOAT: {
|
||||
std::vector<uint8_t> data;
|
||||
data.resize(row_num * sizeof(float));
|
||||
|
||||
std::vector<double> attr_value(row_num, 0);
|
||||
memcpy(attr_value.data(), record.data() + offset, row_num * sizeof(double));
|
||||
|
||||
std::vector<float> raw_value(row_num, 0);
|
||||
for (uint64_t i = 0; i < row_num; ++i) {
|
||||
raw_value[i] = attr_value[i];
|
||||
}
|
||||
|
||||
memcpy(data.data(), raw_value.data(), row_num * sizeof(float));
|
||||
attr_datas.insert(std::make_pair(name, data));
|
||||
|
||||
attr_nbytes.insert(std::make_pair(name, sizeof(float)));
|
||||
attr_data_size.insert(std::make_pair(name, row_num * sizeof(float)));
|
||||
offset += row_num * sizeof(double);
|
||||
break;
|
||||
}
|
||||
case meta::hybrid::DataType::DOUBLE: {
|
||||
std::vector<uint8_t> data;
|
||||
data.resize(row_num * sizeof(double));
|
||||
memcpy(data.data(), record.data() + offset, row_num * sizeof(double));
|
||||
attr_datas.insert(std::make_pair(name, data));
|
||||
|
||||
attr_nbytes.insert(std::make_pair(name, sizeof(double)));
|
||||
attr_data_size.insert(std::make_pair(name, row_num * sizeof(double)));
|
||||
offset += row_num * sizeof(double);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SSDBImpl::InsertEntities(const std::string& collection_name, const std::string& partition_name,
|
||||
const std::vector<std::string>& field_names, Entity& entity,
|
||||
std::unordered_map<std::string, meta::hybrid::DataType>& attr_types) {
|
||||
CHECK_INITIALIZED;
|
||||
|
||||
snapshot::ScopedSnapshotT ss;
|
||||
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
|
||||
|
||||
auto partition_ptr = ss->GetPartition(partition_name);
|
||||
if (partition_ptr == nullptr) {
|
||||
return Status(DB_NOT_FOUND, "Fail to get partition " + partition_name);
|
||||
}
|
||||
|
||||
/* Generate id */
|
||||
if (entity.id_array_.empty()) {
|
||||
SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
|
||||
STATUS_CHECK(id_generator.GetNextIDNumbers(entity.entity_count_, entity.id_array_));
|
||||
}
|
||||
|
||||
std::unordered_map<std::string, std::vector<uint8_t>> attr_data;
|
||||
std::unordered_map<std::string, uint64_t> attr_nbytes;
|
||||
std::unordered_map<std::string, uint64_t> attr_data_size;
|
||||
STATUS_CHECK(CopyToAttr(entity.attr_value_, entity.entity_count_, field_names, attr_types, attr_data, attr_nbytes,
|
||||
attr_data_size));
|
||||
|
||||
if (options_.wal_enable_) {
|
||||
auto vector_it = entity.vector_data_.begin();
|
||||
if (!vector_it->second.binary_data_.empty()) {
|
||||
wal_mgr_->InsertEntities(collection_name, partition_name, entity.id_array_, vector_it->second.binary_data_,
|
||||
attr_nbytes, attr_data);
|
||||
} else if (!vector_it->second.float_data_.empty()) {
|
||||
wal_mgr_->InsertEntities(collection_name, partition_name, entity.id_array_, vector_it->second.float_data_,
|
||||
attr_nbytes, attr_data);
|
||||
}
|
||||
swn_wal_.Notify();
|
||||
} else {
|
||||
// insert entities: collection_name is field id
|
||||
wal::MXLogRecord record;
|
||||
record.lsn = 0;
|
||||
record.collection_id = collection_name;
|
||||
record.partition_tag = partition_name;
|
||||
record.ids = entity.id_array_.data();
|
||||
record.length = entity.entity_count_;
|
||||
record.attr_data = attr_data;
|
||||
record.attr_nbytes = attr_nbytes;
|
||||
record.attr_data_size = attr_data_size;
|
||||
|
||||
auto vector_it = entity.vector_data_.begin();
|
||||
if (vector_it->second.binary_data_.empty()) {
|
||||
record.type = wal::MXLogType::InsertVector;
|
||||
record.data = vector_it->second.float_data_.data();
|
||||
record.data_size = vector_it->second.float_data_.size() * sizeof(float);
|
||||
} else {
|
||||
record.type = wal::MXLogType::InsertBinary;
|
||||
record.data = vector_it->second.binary_data_.data();
|
||||
record.data_size = vector_it->second.binary_data_.size() * sizeof(uint8_t);
|
||||
}
|
||||
|
||||
STATUS_CHECK(ExecWalRecord(record));
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SSDBImpl::DeleteEntities(const std::string& collection_id, engine::IDNumbers entity_ids) {
|
||||
CHECK_INITIALIZED;
|
||||
|
||||
Status status;
|
||||
if (options_.wal_enable_) {
|
||||
wal_mgr_->DeleteById(collection_id, entity_ids);
|
||||
swn_wal_.Notify();
|
||||
} else {
|
||||
wal::MXLogRecord record;
|
||||
record.lsn = 0; // need to get from meta ?
|
||||
record.type = wal::MXLogType::Delete;
|
||||
record.collection_id = collection_id;
|
||||
record.ids = entity_ids.data();
|
||||
record.length = entity_ids.size();
|
||||
|
||||
status = ExecWalRecord(record);
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
Status
|
||||
SSDBImpl::GetPartitionsByTags(const std::string& collection_name, const std::vector<std::string>& partition_patterns,
|
||||
std::set<std::string>& partition_names) {
|
||||
snapshot::ScopedSnapshotT ss;
|
||||
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
|
||||
|
||||
std::vector<std::string> partition_array;
|
||||
STATUS_CHECK(ShowPartitions(collection_name, partition_array));
|
||||
|
||||
for (auto& pattern : partition_patterns) {
|
||||
if (pattern == milvus::engine::DEFAULT_PARTITON_TAG) {
|
||||
partition_names.insert(collection_name);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
for (auto& p_name : partition_array) {
|
||||
if (StringHelpFunctions::IsRegexMatch(p_name, pattern)) {
|
||||
partition_names.insert(p_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (partition_names.empty()) {
|
||||
return Status(DB_PARTITION_NOT_FOUND, "Specified partition does not exist");
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SSDBImpl::HybridQuery(const server::ContextPtr& context, const std::string& collection_id,
|
||||
const std::vector<std::string>& partition_patterns, query::GeneralQueryPtr general_query,
|
||||
query::QueryPtr query_ptr, std::vector<std::string>& field_names,
|
||||
std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type,
|
||||
engine::QueryResult& result) {
|
||||
CHECK_INITIALIZED;
|
||||
|
||||
auto query_ctx = context->Child("Query");
|
||||
|
||||
Status status;
|
||||
meta::FilesHolder files_holder;
|
||||
#if 0
|
||||
if (partition_patterns.empty()) {
|
||||
// no partition tag specified, means search in whole table
|
||||
// get all table files from parent table
|
||||
STATUS_CHECK(meta_ptr_->FilesToSearch(collection_id, files_holder));
|
||||
|
||||
std::vector<meta::CollectionSchema> partition_array;
|
||||
STATUS_CHECK(meta_ptr_->ShowPartitions(collection_id, partition_array));
|
||||
|
||||
for (auto& schema : partition_array) {
|
||||
status = meta_ptr_->FilesToSearch(schema.collection_id_, files_holder);
|
||||
if (!status.ok()) {
|
||||
return Status(DB_ERROR, "get files to search failed in HybridQuery");
|
||||
}
|
||||
}
|
||||
|
||||
if (files_holder.HoldFiles().empty()) {
|
||||
return Status::OK(); // no files to search
|
||||
}
|
||||
} else {
|
||||
// get files from specified partitions
|
||||
std::set<std::string> partition_name_array;
|
||||
GetPartitionsByTags(collection_id, partition_patterns, partition_name_array);
|
||||
|
||||
for (auto& partition_name : partition_name_array) {
|
||||
status = meta_ptr_->FilesToSearch(partition_name, files_holder);
|
||||
if (!status.ok()) {
|
||||
return Status(DB_ERROR, "get files to search failed in HybridQuery");
|
||||
}
|
||||
}
|
||||
|
||||
if (files_holder.HoldFiles().empty()) {
|
||||
return Status::OK();
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info before query
|
||||
status = HybridQueryAsync(query_ctx, collection_id, files_holder, general_query, query_ptr, field_names, attr_type,
|
||||
result);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info after query
|
||||
|
||||
query_ctx->GetTraceContext()->GetSpan()->Finish();
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
// Internal APIs
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
Status
|
||||
SSDBImpl::HybridQueryAsync(const server::ContextPtr& context, const std::string& collection_name,
|
||||
meta::FilesHolder& files_holder, query::GeneralQueryPtr general_query,
|
||||
query::QueryPtr query_ptr, std::vector<std::string>& field_names,
|
||||
std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type,
|
||||
engine::QueryResult& result) {
|
||||
auto query_async_ctx = context->Child("Query Async");
|
||||
|
||||
TimeRecorder rc("");
|
||||
|
||||
// step 1: construct search job
|
||||
VectorsData vectors;
|
||||
milvus::engine::meta::SegmentsSchema& files = files_holder.HoldFiles();
|
||||
LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, index file count: %ld", files_holder.HoldFiles().size());
|
||||
scheduler::SearchJobPtr job =
|
||||
std::make_shared<scheduler::SearchJob>(query_async_ctx, general_query, query_ptr, attr_type, vectors);
|
||||
for (auto& file : files) {
|
||||
scheduler::SegmentSchemaPtr file_ptr = std::make_shared<meta::SegmentSchema>(file);
|
||||
job->AddIndexFile(file_ptr);
|
||||
}
|
||||
|
||||
// step 2: put search job to scheduler and wait result
|
||||
scheduler::JobMgrInst::GetInstance()->Put(job);
|
||||
job->WaitResult();
|
||||
|
||||
files_holder.ReleaseFiles();
|
||||
if (!job->GetStatus().ok()) {
|
||||
return job->GetStatus();
|
||||
}
|
||||
|
||||
// step 3: construct results
|
||||
result.row_num_ = job->vector_count();
|
||||
result.result_ids_ = job->GetResultIds();
|
||||
result.result_distances_ = job->GetResultDistances();
|
||||
|
||||
// step 4: get entities by result ids
|
||||
auto status = GetEntityByID(collection_name, result.result_ids_, field_names, result.vectors_, result.attrs_);
|
||||
if (!status.ok()) {
|
||||
query_async_ctx->GetTraceContext()->GetSpan()->Finish();
|
||||
return status;
|
||||
}
|
||||
|
||||
// step 5: filter entities by field names
|
||||
// std::vector<engine::AttrsData> filter_attrs;
|
||||
// for (auto attr : result.attrs_) {
|
||||
// AttrsData attrs_data;
|
||||
// attrs_data.attr_type_ = attr.attr_type_;
|
||||
// attrs_data.attr_count_ = attr.attr_count_;
|
||||
// attrs_data.id_array_ = attr.id_array_;
|
||||
// for (auto& name : field_names) {
|
||||
// if (attr.attr_data_.find(name) != attr.attr_data_.end()) {
|
||||
// attrs_data.attr_data_.insert(std::make_pair(name, attr.attr_data_.at(name)));
|
||||
// }
|
||||
// }
|
||||
// filter_attrs.emplace_back(attrs_data);
|
||||
// }
|
||||
//
|
||||
// result.attrs_ = filter_attrs;
|
||||
|
||||
rc.ElapseFromBegin("Engine query totally cost");
|
||||
|
||||
query_async_ctx->GetTraceContext()->GetSpan()->Finish();
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void
|
||||
SSDBImpl::InternalFlush(const std::string& collection_id) {
|
||||
wal::MXLogRecord record;
|
||||
|
@ -601,5 +974,23 @@ SSDBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
void
|
||||
SSDBImpl::SuspendIfFirst() {
|
||||
std::lock_guard<std::mutex> lock(suspend_build_mutex_);
|
||||
if (++live_search_num_ == 1) {
|
||||
LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_;
|
||||
knowhere::BuilderSuspend();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
SSDBImpl::ResumeIfLast() {
|
||||
std::lock_guard<std::mutex> lock(suspend_build_mutex_);
|
||||
if (--live_search_num_ == 0) {
|
||||
LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_;
|
||||
knowhere::BuildResume();
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
||||
|
|
|
@ -85,9 +85,35 @@ class SSDBImpl {
|
|||
Status
|
||||
GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
|
||||
const std::vector<std::string>& field_names, std::vector<engine::VectorsData>& vector_data,
|
||||
std::vector<meta::hybrid::DataType>& attr_type, std::vector<engine::AttrsData>& attr_data);
|
||||
/*std::vector<meta::hybrid::DataType>& attr_type,*/ std::vector<engine::AttrsData>& attr_data);
|
||||
|
||||
Status
|
||||
InsertEntities(const std::string& collection_name, const std::string& partition_name,
|
||||
const std::vector<std::string>& field_names, Entity& entity,
|
||||
std::unordered_map<std::string, meta::hybrid::DataType>& attr_types);
|
||||
|
||||
Status
|
||||
DeleteEntities(const std::string& collection_id, engine::IDNumbers entity_ids);
|
||||
|
||||
Status
|
||||
HybridQuery(const server::ContextPtr& context, const std::string& collection_name,
|
||||
const std::vector<std::string>& partition_patterns, query::GeneralQueryPtr general_query,
|
||||
query::QueryPtr query_ptr, std::vector<std::string>& field_names,
|
||||
std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type,
|
||||
engine::QueryResult& result);
|
||||
|
||||
private:
|
||||
Status
|
||||
GetPartitionsByTags(const std::string& collection_name, const std::vector<std::string>& partition_patterns,
|
||||
std::set<std::string>& partition_names);
|
||||
|
||||
Status
|
||||
HybridQueryAsync(const server::ContextPtr& context, const std::string& collection_name,
|
||||
meta::FilesHolder& files_holder, query::GeneralQueryPtr general_query, query::QueryPtr query_ptr,
|
||||
std::vector<std::string>& field_names,
|
||||
std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type,
|
||||
engine::QueryResult& result);
|
||||
|
||||
void
|
||||
InternalFlush(const std::string& collection_id = "");
|
||||
|
||||
|
@ -127,6 +153,12 @@ class SSDBImpl {
|
|||
Status
|
||||
ExecWalRecord(const wal::MXLogRecord& record);
|
||||
|
||||
void
|
||||
SuspendIfFirst();
|
||||
|
||||
void
|
||||
ResumeIfLast();
|
||||
|
||||
private:
|
||||
DBOptions options_;
|
||||
std::atomic<bool> initialized_;
|
||||
|
@ -155,6 +187,9 @@ class SSDBImpl {
|
|||
ThreadPool index_thread_pool_;
|
||||
std::mutex index_result_mutex_;
|
||||
std::list<std::future<void>> index_thread_results_;
|
||||
|
||||
int64_t live_search_num_ = 0;
|
||||
std::mutex suspend_build_mutex_;
|
||||
}; // SSDBImpl
|
||||
|
||||
} // namespace engine
|
||||
|
|
|
@ -30,7 +30,8 @@ LoadVectorFieldElementHandler::LoadVectorFieldElementHandler(const std::shared_p
|
|||
|
||||
Status
|
||||
LoadVectorFieldElementHandler::Handle(const snapshot::FieldElementPtr& field_element) {
|
||||
if (field_->GetFtype() != snapshot::FieldType::VECTOR) {
|
||||
if (field_->GetFtype() != snapshot::FieldType::VECTOR_FLOAT &&
|
||||
field_->GetFtype() != snapshot::FieldType::VECTOR_BINARY) {
|
||||
return Status(DB_ERROR, "Should be VECTOR field");
|
||||
}
|
||||
if (field_->GetID() != field_element->GetFieldId()) {
|
||||
|
@ -47,7 +48,8 @@ LoadVectorFieldHandler::LoadVectorFieldHandler(const std::shared_ptr<server::Con
|
|||
|
||||
Status
|
||||
LoadVectorFieldHandler::Handle(const snapshot::FieldPtr& field) {
|
||||
if (field->GetFtype() != snapshot::FieldType::VECTOR) {
|
||||
if (field->GetFtype() != snapshot::FieldType::VECTOR_FLOAT &&
|
||||
field->GetFtype() != snapshot::FieldType::VECTOR_BINARY) {
|
||||
return Status::OK();
|
||||
}
|
||||
if (context_ && context_->IsConnectionBroken()) {
|
||||
|
@ -106,6 +108,11 @@ GetEntityByIdSegmentHandler::GetEntityByIdSegmentHandler(const std::shared_ptr<m
|
|||
engine::snapshot::ScopedSnapshotT ss, const IDNumbers& ids,
|
||||
const std::vector<std::string>& field_names)
|
||||
: BaseT(ss), context_(context), ids_(ids), field_names_(field_names), vector_data_(), attr_type_(), attr_data_() {
|
||||
for (auto& field_name : field_names_) {
|
||||
auto field_ptr = ss_->GetField(field_name);
|
||||
auto field_type = field_ptr->GetFtype();
|
||||
attr_type_.push_back((meta::hybrid::DataType)field_type);
|
||||
}
|
||||
}
|
||||
|
||||
Status
|
||||
|
@ -151,20 +158,24 @@ GetEntityByIdSegmentHandler::Handle(const snapshot::SegmentPtr& segment) {
|
|||
}
|
||||
|
||||
std::unordered_map<std::string, std::vector<uint8_t>> raw_attrs;
|
||||
for (auto& field_name : field_names_) {
|
||||
for (size_t i = 0; i < field_names_.size(); i++) {
|
||||
auto& field_name = field_names_[i];
|
||||
auto field_ptr = ss_->GetField(field_name);
|
||||
auto field_params = field_ptr->GetParams();
|
||||
auto dim = field_params[knowhere::meta::DIM].get<int64_t>();
|
||||
auto field_type = field_ptr->GetFtype();
|
||||
|
||||
if (field_ptr->GetFtype() == (int64_t)meta::hybrid::DataType::VECTOR_BINARY) {
|
||||
auto field_type = attr_type_[i];
|
||||
|
||||
if (field_type == meta::hybrid::DataType::VECTOR_BINARY) {
|
||||
auto field_params = field_ptr->GetParams();
|
||||
auto dim = field_params[knowhere::meta::DIM].get<int64_t>();
|
||||
size_t vector_size = dim / 8;
|
||||
std::vector<uint8_t> raw_vector;
|
||||
STATUS_CHECK(segment_reader.LoadVectors(offset * vector_size, vector_size, raw_vector));
|
||||
|
||||
vector_ref.vector_count_ = 1;
|
||||
vector_ref.binary_data_.swap(raw_vector);
|
||||
} else if (field_ptr->GetFtype() == (int64_t)meta::hybrid::DataType::VECTOR_FLOAT) {
|
||||
} else if (field_type == meta::hybrid::DataType::VECTOR_FLOAT) {
|
||||
auto field_params = field_ptr->GetParams();
|
||||
auto dim = field_params[knowhere::meta::DIM].get<int64_t>();
|
||||
size_t vector_size = dim * sizeof(float);
|
||||
std::vector<uint8_t> raw_vector;
|
||||
STATUS_CHECK(segment_reader.LoadVectors(offset * vector_size, vector_size, raw_vector));
|
||||
|
@ -177,30 +188,20 @@ GetEntityByIdSegmentHandler::Handle(const snapshot::SegmentPtr& segment) {
|
|||
} else {
|
||||
size_t num_bytes;
|
||||
switch (field_type) {
|
||||
case (int64_t)meta::hybrid::DataType::INT8: {
|
||||
num_bytes = sizeof(int8_t);
|
||||
case meta::hybrid::DataType::INT8:
|
||||
num_bytes = 1;
|
||||
break;
|
||||
}
|
||||
case (int64_t)meta::hybrid::DataType::INT16: {
|
||||
num_bytes = sizeof(int16_t);
|
||||
case meta::hybrid::DataType::INT16:
|
||||
num_bytes = 2;
|
||||
break;
|
||||
}
|
||||
case (int64_t)meta::hybrid::DataType::INT32: {
|
||||
num_bytes = sizeof(int32_t);
|
||||
case meta::hybrid::DataType::INT32:
|
||||
case meta::hybrid::DataType::FLOAT:
|
||||
num_bytes = 4;
|
||||
break;
|
||||
}
|
||||
case (int64_t)meta::hybrid::DataType::INT64: {
|
||||
num_bytes = sizeof(int64_t);
|
||||
case meta::hybrid::DataType::INT64:
|
||||
case meta::hybrid::DataType::DOUBLE:
|
||||
num_bytes = 8;
|
||||
break;
|
||||
}
|
||||
case (int64_t)meta::hybrid::DataType::FLOAT: {
|
||||
num_bytes = sizeof(float);
|
||||
break;
|
||||
}
|
||||
case (int64_t)meta::hybrid::DataType::DOUBLE: {
|
||||
num_bytes = sizeof(double);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
std::string msg = "Field type of " + field_name + " not supported";
|
||||
return Status(DB_ERROR, msg);
|
||||
|
|
|
@ -101,7 +101,7 @@ using Table2FileRef = std::map<std::string, File2RefCount>;
|
|||
|
||||
namespace hybrid {
|
||||
|
||||
enum class DataType {
|
||||
enum DataType {
|
||||
NONE = 0,
|
||||
BOOL = 1,
|
||||
INT8 = 2,
|
||||
|
|
|
@ -15,6 +15,8 @@
|
|||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "db/meta/MetaTypes.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
namespace snapshot {
|
||||
|
@ -27,7 +29,7 @@ using LSN_TYPE = int64_t;
|
|||
using SIZE_TYPE = uint64_t;
|
||||
using MappingT = std::set<ID_TYPE>;
|
||||
|
||||
enum FieldType { VECTOR, INT32 };
|
||||
using FieldType = meta::hybrid::DataType;
|
||||
enum FieldElementType { RAW, IVFSQ8 };
|
||||
|
||||
using IDS_TYPE = std::vector<ID_TYPE>;
|
||||
|
|
|
@ -289,7 +289,7 @@ class Store {
|
|||
std::stringstream fname;
|
||||
fname << "f_" << fi << "_" << std::get<Index<Field::MapT, MockResourcesT>::value>(ids_) + 1;
|
||||
FieldPtr field;
|
||||
Field temp_f(fname.str(), fi, FieldType::VECTOR);
|
||||
Field temp_f(fname.str(), fi, FieldType::VECTOR_FLOAT);
|
||||
temp_f.Activate();
|
||||
CreateResource<Field>(std::move(temp_f), field);
|
||||
all_records.push_back(field);
|
||||
|
|
|
@ -175,7 +175,7 @@ CreateCollection(const std::string& collection_name, const LSN_TYPE& lsn) {
|
|||
auto collection_schema = std::make_shared<Collection>(collection_name);
|
||||
context.collection = collection_schema;
|
||||
auto vector_field = std::make_shared<Field>("vector", 0,
|
||||
milvus::engine::snapshot::FieldType::VECTOR);
|
||||
milvus::engine::snapshot::FieldType::VECTOR_FLOAT);
|
||||
auto vector_field_element = std::make_shared<FieldElement>(0, 0, "ivfsq8",
|
||||
milvus::engine::snapshot::FieldElementType::IVFSQ8);
|
||||
auto int_field = std::make_shared<Field>("int", 0,
|
||||
|
|
Loading…
Reference in New Issue