mirror of https://github.com/milvus-io/milvus.git
parent
2214397646
commit
84318f251b
|
@ -1,38 +0,0 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "query/BinaryQuery.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace search {
|
||||
class Task;
|
||||
|
||||
using TaskPtr = std::shared_ptr<Task>;
|
||||
} // namespace search
|
||||
|
||||
namespace context {
|
||||
|
||||
struct HybridSearchContext {
|
||||
query::GeneralQueryPtr general_query_;
|
||||
std::vector<::milvus::search::TaskPtr> tasks_;
|
||||
};
|
||||
|
||||
using HybridSearchContextPtr = std::shared_ptr<HybridSearchContext>;
|
||||
|
||||
} // namespace context
|
||||
} // namespace milvus
|
|
@ -18,7 +18,6 @@
|
|||
|
||||
#include "Options.h"
|
||||
#include "Types.h"
|
||||
#include "context/HybridSearchContext.h"
|
||||
#include "meta/Meta.h"
|
||||
#include "query/GeneralQuery.h"
|
||||
#include "server/context/Context.h"
|
||||
|
@ -159,17 +158,6 @@ class DB {
|
|||
|
||||
virtual Status
|
||||
DropAll() = 0;
|
||||
|
||||
virtual Status
|
||||
CreateHybridCollection(meta::CollectionSchema& collection_schema, meta::hybrid::FieldsSchema& fields_schema) = 0;
|
||||
|
||||
virtual Status
|
||||
DescribeHybridCollection(meta::CollectionSchema& collection_schema, meta::hybrid::FieldsSchema& fields_schema) = 0;
|
||||
|
||||
virtual Status
|
||||
InsertEntities(const std::string& collection_id, const std::string& partition_tag,
|
||||
const std::vector<std::string>& field_names, Entity& entity,
|
||||
std::unordered_map<std::string, meta::hybrid::DataType>& field_types) = 0;
|
||||
}; // DB
|
||||
|
||||
using DBPtr = std::shared_ptr<DB>;
|
||||
|
|
|
@ -245,29 +245,6 @@ DBImpl::CreateCollection(meta::CollectionSchema& collection_schema) {
|
|||
return status;
|
||||
}
|
||||
|
||||
Status
|
||||
DBImpl::CreateHybridCollection(meta::CollectionSchema& collection_schema, meta::hybrid::FieldsSchema& fields_schema) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
meta::CollectionSchema temp_schema = collection_schema;
|
||||
temp_schema.index_file_size_ *= MB;
|
||||
|
||||
return meta_ptr_->CreateHybridCollection(temp_schema, fields_schema);
|
||||
}
|
||||
|
||||
Status
|
||||
DBImpl::DescribeHybridCollection(meta::CollectionSchema& collection_schema,
|
||||
milvus::engine::meta::hybrid::FieldsSchema& fields_schema) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
auto stat = meta_ptr_->DescribeHybridCollection(collection_schema, fields_schema);
|
||||
return stat;
|
||||
}
|
||||
|
||||
Status
|
||||
DBImpl::DropCollection(const std::string& collection_id) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
|
@ -924,101 +901,6 @@ CopyToAttr(std::vector<uint8_t>& record, uint64_t row_num, const std::vector<std
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
DBImpl::InsertEntities(const std::string& collection_id, const std::string& partition_tag,
|
||||
const std::vector<std::string>& field_names, Entity& entity,
|
||||
std::unordered_map<std::string, meta::hybrid::DataType>& attr_types) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
// Generate id
|
||||
if (entity.id_array_.empty()) {
|
||||
SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
|
||||
Status status = id_generator.GetNextIDNumbers(entity.entity_count_, entity.id_array_);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
}
|
||||
|
||||
Status status;
|
||||
std::unordered_map<std::string, std::vector<uint8_t>> attr_data;
|
||||
std::unordered_map<std::string, uint64_t> attr_nbytes;
|
||||
std::unordered_map<std::string, uint64_t> attr_data_size;
|
||||
status = CopyToAttr(entity.attr_value_, entity.entity_count_, field_names, attr_types, attr_data, attr_nbytes,
|
||||
attr_data_size);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
||||
wal::MXLogRecord record;
|
||||
record.lsn = 0;
|
||||
record.collection_id = collection_id;
|
||||
record.partition_tag = partition_tag;
|
||||
record.ids = entity.id_array_.data();
|
||||
record.length = entity.entity_count_;
|
||||
|
||||
auto vector_it = entity.vector_data_.begin();
|
||||
if (vector_it->second.binary_data_.empty()) {
|
||||
record.type = wal::MXLogType::Entity;
|
||||
record.data = vector_it->second.float_data_.data();
|
||||
record.data_size = vector_it->second.float_data_.size() * sizeof(float);
|
||||
} else {
|
||||
// record.type = wal::MXLogType::InsertBinary;
|
||||
// record.data = entities.vector_data_[0].binary_data_.data();
|
||||
// record.length = entities.vector_data_[0].binary_data_.size() * sizeof(uint8_t);
|
||||
}
|
||||
|
||||
status = ExecWalRecord(record);
|
||||
|
||||
#if 0
|
||||
if (options_.wal_enable_) {
|
||||
std::string target_collection_name;
|
||||
status = GetPartitionByTag(collection_id, partition_tag, target_collection_name);
|
||||
if (!status.ok()) {
|
||||
LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Get partition fail: %s", "insert", 0, status.message().c_str());
|
||||
return status;
|
||||
}
|
||||
|
||||
auto vector_it = entity.vector_data_.begin();
|
||||
if (!vector_it->second.binary_data_.empty()) {
|
||||
wal_mgr_->InsertEntities(collection_id, partition_tag, entity.id_array_, vector_it->second.binary_data_,
|
||||
attr_nbytes, attr_data);
|
||||
} else if (!vector_it->second.float_data_.empty()) {
|
||||
wal_mgr_->InsertEntities(collection_id, partition_tag, entity.id_array_, vector_it->second.float_data_,
|
||||
attr_nbytes, attr_data);
|
||||
}
|
||||
swn_wal_.Notify();
|
||||
} else {
|
||||
// insert entities: collection_name is field id
|
||||
wal::MXLogRecord record;
|
||||
record.lsn = 0;
|
||||
record.collection_id = collection_id;
|
||||
record.partition_tag = partition_tag;
|
||||
record.ids = entity.id_array_.data();
|
||||
record.length = entity.entity_count_;
|
||||
|
||||
auto vector_it = entity.vector_data_.begin();
|
||||
if (vector_it->second.binary_data_.empty()) {
|
||||
record.type = wal::MXLogType::Entity;
|
||||
record.data = vector_it->second.float_data_.data();
|
||||
record.data_size = vector_it->second.float_data_.size() * sizeof(float);
|
||||
record.attr_data = attr_data;
|
||||
record.attr_nbytes = attr_nbytes;
|
||||
record.attr_data_size = attr_data_size;
|
||||
} else {
|
||||
// record.type = wal::MXLogType::InsertBinary;
|
||||
// record.data = entities.vector_data_[0].binary_data_.data();
|
||||
// record.length = entities.vector_data_[0].binary_data_.size() * sizeof(uint8_t);
|
||||
}
|
||||
|
||||
status = ExecWalRecord(record);
|
||||
}
|
||||
#endif
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
Status
|
||||
DBImpl::DeleteVectors(const std::string& collection_id, const std::string& partition_tag, IDNumbers vector_ids) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
|
@ -2026,102 +1908,6 @@ DBImpl::StartMergeTask(const std::set<std::string>& merge_collection_ids, bool f
|
|||
// LOG_ENGINE_DEBUG_ << "End StartMergeTask";
|
||||
}
|
||||
|
||||
// Status
|
||||
// DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& files_holder) {
|
||||
// // const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
|
||||
//
|
||||
// LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id;
|
||||
//
|
||||
// // step 1: create table file
|
||||
// meta::SegmentSchema table_file;
|
||||
// table_file.collection_id_ = collection_id;
|
||||
// table_file.file_type_ = meta::SegmentSchema::NEW_MERGE;
|
||||
// Status status = meta_ptr_->CreateHybridCollectionFile(table_file);
|
||||
//
|
||||
// if (!status.ok()) {
|
||||
// LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString();
|
||||
// return status;
|
||||
// }
|
||||
//
|
||||
// // step 2: merge files
|
||||
// /*
|
||||
// ExecutionEnginePtr index =
|
||||
// EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
|
||||
// (MetricType)table_file.metric_type_, table_file.nlist_);
|
||||
//*/
|
||||
// meta::SegmentsSchema updated;
|
||||
//
|
||||
// std::string new_segment_dir;
|
||||
// utils::GetParentPath(table_file.location_, new_segment_dir);
|
||||
// auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);
|
||||
//
|
||||
// // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal
|
||||
// milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
|
||||
// for (auto& file : files) {
|
||||
// server::CollectMergeFilesMetrics metrics;
|
||||
// std::string segment_dir_to_merge;
|
||||
// utils::GetParentPath(file.location_, segment_dir_to_merge);
|
||||
// segment_writer_ptr->Merge(segment_dir_to_merge, table_file.file_id_);
|
||||
//
|
||||
// files_holder.UnmarkFile(file);
|
||||
//
|
||||
// auto file_schema = file;
|
||||
// file_schema.file_type_ = meta::SegmentSchema::TO_DELETE;
|
||||
// updated.push_back(file_schema);
|
||||
// int64_t size = segment_writer_ptr->Size();
|
||||
// if (size >= file_schema.index_file_size_) {
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // step 3: serialize to disk
|
||||
// try {
|
||||
// status = segment_writer_ptr->Serialize();
|
||||
// fiu_do_on("DBImpl.MergeFiles.Serialize_ThrowException", throw std::exception());
|
||||
// fiu_do_on("DBImpl.MergeFiles.Serialize_ErrorStatus", status = Status(DB_ERROR, ""));
|
||||
// } catch (std::exception& ex) {
|
||||
// std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
|
||||
// LOG_ENGINE_ERROR_ << msg;
|
||||
// status = Status(DB_ERROR, msg);
|
||||
// }
|
||||
//
|
||||
// if (!status.ok()) {
|
||||
// LOG_ENGINE_ERROR_ << "Failed to persist merged segment: " << new_segment_dir << ". Error: " <<
|
||||
// status.message();
|
||||
//
|
||||
// // if failed to serialize merge file to disk
|
||||
// // typical error: out of disk space, out of memory or permission denied
|
||||
// table_file.file_type_ = meta::SegmentSchema::TO_DELETE;
|
||||
// status = meta_ptr_->UpdateCollectionFile(table_file);
|
||||
// LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
|
||||
//
|
||||
// return status;
|
||||
// }
|
||||
//
|
||||
// // step 4: update table files state
|
||||
// // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size
|
||||
// // else set file type to RAW, no need to build index
|
||||
// if (!utils::IsRawIndexType(table_file.engine_type_)) {
|
||||
// table_file.file_type_ = (segment_writer_ptr->Size() >= (size_t)(table_file.index_file_size_))
|
||||
// ? meta::SegmentSchema::TO_INDEX
|
||||
// : meta::SegmentSchema::RAW;
|
||||
// } else {
|
||||
// table_file.file_type_ = meta::SegmentSchema::RAW;
|
||||
// }
|
||||
// table_file.file_size_ = segment_writer_ptr->Size();
|
||||
// table_file.row_count_ = segment_writer_ptr->VectorCount();
|
||||
// updated.push_back(table_file);
|
||||
// status = meta_ptr_->UpdateCollectionFiles(updated);
|
||||
// LOG_ENGINE_DEBUG_ << "New merged segment " << table_file.segment_id_ << " of size " << segment_writer_ptr->Size()
|
||||
// << " bytes";
|
||||
//
|
||||
// if (options_.insert_cache_immediately_) {
|
||||
// segment_writer_ptr->Cache();
|
||||
// }
|
||||
//
|
||||
// return status;
|
||||
//}
|
||||
|
||||
void
|
||||
DBImpl::BackgroundMerge(std::set<std::string> collection_ids, bool force_merge_all) {
|
||||
// LOG_ENGINE_TRACE_ << " Background merge thread start";
|
||||
|
|
|
@ -150,19 +150,6 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
|
|||
Status
|
||||
DropIndex(const std::string& collection_id) override;
|
||||
|
||||
Status
|
||||
CreateHybridCollection(meta::CollectionSchema& collection_schema,
|
||||
meta::hybrid::FieldsSchema& fields_schema) override;
|
||||
|
||||
Status
|
||||
DescribeHybridCollection(meta::CollectionSchema& collection_schema,
|
||||
meta::hybrid::FieldsSchema& fields_schema) override;
|
||||
|
||||
Status
|
||||
InsertEntities(const std::string& collection_name, const std::string& partition_tag,
|
||||
const std::vector<std::string>& field_names, engine::Entity& entity,
|
||||
std::unordered_map<std::string, meta::hybrid::DataType>& field_types) override;
|
||||
|
||||
Status
|
||||
QueryByIDs(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
|
||||
const std::vector<std::string>& partition_tags, uint64_t k, const milvus::json& extra_params,
|
||||
|
@ -228,9 +215,6 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
|
|||
void
|
||||
BackgroundMerge(std::set<std::string> collection_ids, bool force_merge_all);
|
||||
|
||||
// Status
|
||||
// MergeHybridFiles(const std::string& table_id, meta::FilesHolder& files_holder);
|
||||
|
||||
void
|
||||
StartBuildIndexTask();
|
||||
|
||||
|
|
|
@ -171,12 +171,6 @@ class Meta {
|
|||
|
||||
virtual Status
|
||||
GetGlobalLastLSN(uint64_t& lsn) = 0;
|
||||
|
||||
virtual Status
|
||||
CreateHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) = 0;
|
||||
|
||||
virtual Status
|
||||
DescribeHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) = 0;
|
||||
}; // MetaData
|
||||
|
||||
using MetaPtr = std::shared_ptr<Meta>;
|
||||
|
|
|
@ -2965,183 +2965,6 @@ MySQLMetaImpl::GetGlobalLastLSN(uint64_t& lsn) {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
MySQLMetaImpl::CreateHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) {
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
{
|
||||
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
|
||||
|
||||
bool is_null_connection = (connectionPtr == nullptr);
|
||||
fiu_do_on("MySQLMetaImpl.CreateCollection.null_connection", is_null_connection = true);
|
||||
fiu_do_on("MySQLMetaImpl.CreateCollection.throw_exception", throw std::exception(););
|
||||
if (is_null_connection) {
|
||||
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
|
||||
}
|
||||
|
||||
mysqlpp::Query statement = connectionPtr->query();
|
||||
|
||||
if (collection_schema.collection_id_.empty()) {
|
||||
NextCollectionId(collection_schema.collection_id_);
|
||||
} else {
|
||||
statement << "SELECT state FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
|
||||
<< collection_schema.collection_id_ << ";";
|
||||
|
||||
LOG_ENGINE_DEBUG_ << "CreateCollection: " << statement.str();
|
||||
|
||||
mysqlpp::StoreQueryResult res = statement.store();
|
||||
|
||||
if (res.num_rows() == 1) {
|
||||
int state = res[0]["state"];
|
||||
fiu_do_on("MySQLMetaImpl.CreateCollection.schema_TO_DELETE", state = CollectionSchema::TO_DELETE);
|
||||
if (CollectionSchema::TO_DELETE == state) {
|
||||
return Status(DB_ERROR,
|
||||
"Collection already exists and it is in delete state, please wait a second");
|
||||
} else {
|
||||
return Status(DB_ALREADY_EXIST, "Collection already exists");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
collection_schema.id_ = -1;
|
||||
collection_schema.created_on_ = utils::GetMicroSecTimeStamp();
|
||||
|
||||
std::string id = "NULL"; // auto-increment
|
||||
std::string& collection_id = collection_schema.collection_id_;
|
||||
std::string state = std::to_string(collection_schema.state_);
|
||||
std::string dimension = std::to_string(collection_schema.dimension_);
|
||||
std::string created_on = std::to_string(collection_schema.created_on_);
|
||||
std::string flag = std::to_string(collection_schema.flag_);
|
||||
std::string index_file_size = std::to_string(collection_schema.index_file_size_);
|
||||
std::string engine_type = std::to_string(collection_schema.engine_type_);
|
||||
std::string& index_params = collection_schema.index_params_;
|
||||
std::string metric_type = std::to_string(collection_schema.metric_type_);
|
||||
std::string& owner_collection = collection_schema.owner_collection_;
|
||||
std::string& partition_tag = collection_schema.partition_tag_;
|
||||
std::string& version = collection_schema.version_;
|
||||
std::string flush_lsn = std::to_string(collection_schema.flush_lsn_);
|
||||
|
||||
statement << "INSERT INTO " << META_TABLES << " VALUES(" << id << ", " << mysqlpp::quote << collection_id
|
||||
<< ", " << state << ", " << dimension << ", " << created_on << ", " << flag << ", "
|
||||
<< index_file_size << ", " << engine_type << ", " << mysqlpp::quote << index_params << ", "
|
||||
<< metric_type << ", " << mysqlpp::quote << owner_collection << ", " << mysqlpp::quote
|
||||
<< partition_tag << ", " << mysqlpp::quote << version << ", " << flush_lsn << ");";
|
||||
|
||||
LOG_ENGINE_DEBUG_ << "CreateHybridCollection: " << statement.str();
|
||||
|
||||
if (mysqlpp::SimpleResult res = statement.execute()) {
|
||||
collection_schema.id_ = res.insert_id(); // Might need to use SELECT LAST_INSERT_ID()?
|
||||
|
||||
// Consume all results to avoid "Commands out of sync" error
|
||||
} else {
|
||||
return HandleException("Failed to create collection", statement.error());
|
||||
}
|
||||
|
||||
for (auto schema : fields_schema.fields_schema_) {
|
||||
std::string id = "NULL";
|
||||
std::string collection_id = schema.collection_id_;
|
||||
std::string field_name = schema.field_name_;
|
||||
std::string field_type = std::to_string(schema.field_type_);
|
||||
std::string field_params = schema.field_params_;
|
||||
|
||||
statement << "INSERT INTO " << META_FIELDS << " VALUES(" << mysqlpp::quote << collection_id << ", "
|
||||
<< mysqlpp::quote << field_name << ", " << field_type << ", " << mysqlpp::quote << ", "
|
||||
<< field_params << ");";
|
||||
|
||||
LOG_ENGINE_DEBUG_ << "Create field: " << statement.str();
|
||||
|
||||
if (mysqlpp::SimpleResult field_res = statement.execute()) {
|
||||
// TODO(yukun): need field id?
|
||||
|
||||
} else {
|
||||
return HandleException("Failed to create field table", statement.error());
|
||||
}
|
||||
}
|
||||
} // Scoped Connection
|
||||
|
||||
LOG_ENGINE_DEBUG_ << "Successfully create hybrid collection: " << collection_schema.collection_id_;
|
||||
std::cout << collection_schema.collection_id_;
|
||||
return utils::CreateCollectionPath(options_, collection_schema.collection_id_);
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Failed to create collection", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
Status
|
||||
MySQLMetaImpl::DescribeHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) {
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
mysqlpp::StoreQueryResult res, field_res;
|
||||
{
|
||||
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
|
||||
|
||||
bool is_null_connection = (connectionPtr == nullptr);
|
||||
fiu_do_on("MySQLMetaImpl.DescribeCollection.null_connection", is_null_connection = true);
|
||||
fiu_do_on("MySQLMetaImpl.DescribeCollection.throw_exception", throw std::exception(););
|
||||
if (is_null_connection) {
|
||||
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
|
||||
}
|
||||
|
||||
mysqlpp::Query statement = connectionPtr->query();
|
||||
statement << "SELECT id, state, dimension, created_on, flag, index_file_size, engine_type, index_params"
|
||||
<< " , metric_type ,owner_table, partition_tag, version, flush_lsn"
|
||||
<< " FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
|
||||
<< collection_schema.collection_id_ << " AND state <> "
|
||||
<< std::to_string(CollectionSchema::TO_DELETE) << ";";
|
||||
|
||||
LOG_ENGINE_DEBUG_ << "DescribeHybridCollection: " << statement.str();
|
||||
|
||||
res = statement.store();
|
||||
|
||||
mysqlpp::Query field_statement = connectionPtr->query();
|
||||
field_statement << "SELECT collection_id, field_name, field_type, field_params"
|
||||
<< " FROM " << META_FIELDS << " WHERE collection_id = " << mysqlpp::quote
|
||||
<< collection_schema.collection_id_ << ";";
|
||||
|
||||
LOG_ENGINE_DEBUG_ << "Describe Collection Fields: " << field_statement.str();
|
||||
|
||||
field_res = field_statement.store();
|
||||
} // Scoped Connection
|
||||
|
||||
if (res.num_rows() == 1) {
|
||||
const mysqlpp::Row& resRow = res[0];
|
||||
collection_schema.id_ = resRow["id"]; // implicit conversion
|
||||
collection_schema.state_ = resRow["state"];
|
||||
collection_schema.dimension_ = resRow["dimension"];
|
||||
collection_schema.created_on_ = resRow["created_on"];
|
||||
collection_schema.flag_ = resRow["flag"];
|
||||
collection_schema.index_file_size_ = resRow["index_file_size"];
|
||||
collection_schema.engine_type_ = resRow["engine_type"];
|
||||
resRow["index_params"].to_string(collection_schema.index_params_);
|
||||
collection_schema.metric_type_ = resRow["metric_type"];
|
||||
resRow["owner_table"].to_string(collection_schema.owner_collection_);
|
||||
resRow["partition_tag"].to_string(collection_schema.partition_tag_);
|
||||
resRow["version"].to_string(collection_schema.version_);
|
||||
collection_schema.flush_lsn_ = resRow["flush_lsn"];
|
||||
} else {
|
||||
return Status(DB_NOT_FOUND, "Collection " + collection_schema.collection_id_ + " not found");
|
||||
}
|
||||
|
||||
auto num_row = field_res.num_rows();
|
||||
if (num_row >= 1) {
|
||||
fields_schema.fields_schema_.resize(num_row);
|
||||
for (uint64_t i = 0; i < num_row; ++i) {
|
||||
const mysqlpp::Row& resRow = field_res[i];
|
||||
resRow["collection_id"].to_string(fields_schema.fields_schema_[i].collection_id_);
|
||||
resRow["field_name"].to_string(fields_schema.fields_schema_[i].field_name_);
|
||||
fields_schema.fields_schema_[i].field_type_ = resRow["field_type"];
|
||||
resRow["field_params"].to_string(fields_schema.fields_schema_[i].field_params_);
|
||||
}
|
||||
} else {
|
||||
return Status(DB_NOT_FOUND, "Fields of " + collection_schema.collection_id_ + " not found");
|
||||
}
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Failed to describe collection", e.what());
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace meta
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
||||
|
|
|
@ -158,12 +158,6 @@ class MySQLMetaImpl : public Meta {
|
|||
Status
|
||||
GetGlobalLastLSN(uint64_t& lsn) override;
|
||||
|
||||
Status
|
||||
CreateHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) override;
|
||||
|
||||
Status
|
||||
DescribeHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) override;
|
||||
|
||||
private:
|
||||
Status
|
||||
NextFileId(std::string& file_id);
|
||||
|
|
|
@ -2569,162 +2569,6 @@ SqliteMetaImpl::GetGlobalLastLSN(uint64_t& lsn) {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::CreateHybridCollection(meta::CollectionSchema& collection_schema,
|
||||
meta::hybrid::FieldsSchema& fields_schema) {
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
|
||||
if (collection_schema.collection_id_ == "") {
|
||||
NextCollectionId(collection_schema.collection_id_);
|
||||
} else {
|
||||
fiu_do_on("SqliteMetaImpl.CreateCollection.throw_exception", throw std::exception());
|
||||
|
||||
std::string statement = "SELECT state FROM " + std::string(META_TABLES)
|
||||
+ " WHERE table_id = " + Quote(collection_schema.collection_id_) + ";";
|
||||
|
||||
AttrsMapList res;
|
||||
auto status = SqlQuery(statement, &res);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
||||
if (res.size() == 1) {
|
||||
int state = std::stoi(res[0]["state"]);
|
||||
fiu_do_on("MySQLMetaImpl.CreateCollection.schema_TO_DELETE", state = CollectionSchema::TO_DELETE);
|
||||
if (CollectionSchema::TO_DELETE == state) {
|
||||
return Status(DB_ERROR,
|
||||
"Collection already exists and it is in delete state, please wait a second");
|
||||
} else {
|
||||
return Status(DB_ALREADY_EXIST, "Collection already exists");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
collection_schema.id_ = -1;
|
||||
collection_schema.created_on_ = utils::GetMicroSecTimeStamp();
|
||||
|
||||
std::string id = "NULL"; // auto-increment
|
||||
std::string& collection_id = collection_schema.collection_id_;
|
||||
std::string state = std::to_string(collection_schema.state_);
|
||||
std::string dimension = std::to_string(collection_schema.dimension_);
|
||||
std::string created_on = std::to_string(collection_schema.created_on_);
|
||||
std::string flag = std::to_string(collection_schema.flag_);
|
||||
std::string index_file_size = std::to_string(collection_schema.index_file_size_);
|
||||
std::string engine_type = std::to_string(collection_schema.engine_type_);
|
||||
std::string& index_params = collection_schema.index_params_;
|
||||
std::string metric_type = std::to_string(collection_schema.metric_type_);
|
||||
std::string& owner_collection = collection_schema.owner_collection_;
|
||||
std::string& partition_tag = collection_schema.partition_tag_;
|
||||
std::string& version = collection_schema.version_;
|
||||
std::string flush_lsn = std::to_string(collection_schema.flush_lsn_);
|
||||
|
||||
std::string statement = "INSERT INTO " + std::string(META_TABLES)
|
||||
+ " VALUES(" + id + ", " + Quote(collection_id) + ", " + state + ", " + dimension + ", "
|
||||
+ created_on + ", " + flag + ", " + index_file_size + ", " + engine_type + ", "
|
||||
+ Quote(index_params) + ", " + metric_type + ", " + Quote(owner_collection) + ", "
|
||||
+ Quote(partition_tag) + ", " + Quote(version) + ", " + flush_lsn + ");";
|
||||
|
||||
auto status = SqlTransaction({statement});
|
||||
if (!status.ok()) {
|
||||
return HandleException("Encounter exception when create collection", status.message().c_str());
|
||||
}
|
||||
collection_schema.id_ = sqlite3_last_insert_rowid(db_);
|
||||
|
||||
LOG_ENGINE_DEBUG_ << "Successfully create collection collection: " << collection_schema.collection_id_;
|
||||
|
||||
for (auto schema : fields_schema.fields_schema_) {
|
||||
std::string id = "NULL";
|
||||
std::string collection_id = schema.collection_id_;
|
||||
std::string field_name = schema.field_name_;
|
||||
std::string field_type = std::to_string(schema.field_type_);
|
||||
std::string field_params = schema.field_params_;
|
||||
|
||||
statement = "INSERT INTO " + std::string(META_FIELDS) + " VALUES(" + Quote(collection_id) + ", "
|
||||
+ Quote(field_name) + ", " + field_type + ", " + Quote(field_params) + ");";
|
||||
|
||||
status = SqlTransaction({statement});
|
||||
if (!status.ok()) {
|
||||
return HandleException("Failed to create field table", status.message().c_str());
|
||||
}
|
||||
}
|
||||
|
||||
LOG_ENGINE_DEBUG_ << "Successfully create hybrid collection: " << collection_schema.collection_id_;
|
||||
return utils::CreateCollectionPath(options_, collection_schema.collection_id_);
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when create collection", e.what());
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::DescribeHybridCollection(milvus::engine::meta::CollectionSchema& collection_schema,
|
||||
milvus::engine::meta::hybrid::FieldsSchema& fields_schema) {
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
fiu_do_on("SqliteMetaImpl.DescriCollection.throw_exception", throw std::exception());
|
||||
|
||||
std::string statement = "SELECT id, state, dimension, created_on, flag, index_file_size, engine_type,"
|
||||
" index_params, metric_type ,owner_table, partition_tag, version, flush_lsn"
|
||||
" FROM " + std::string(META_TABLES)
|
||||
+ " WHERE table_id = " + Quote(collection_schema.collection_id_)
|
||||
+ " AND state <> " + std::to_string(CollectionSchema::TO_DELETE) + ";";
|
||||
|
||||
AttrsMapList res;
|
||||
auto status = SqlQuery(statement, &res);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
||||
if (res.size() == 1) {
|
||||
auto& resRow = res[0];
|
||||
collection_schema.id_ = std::stoul(resRow["id"]);
|
||||
collection_schema.state_ = std::stoi(resRow["state"]);
|
||||
collection_schema.dimension_ = std::stoi(resRow["dimension"]);
|
||||
collection_schema.created_on_ = std::stol(resRow["created_on"]);
|
||||
collection_schema.flag_ = std::stol(resRow["flag"]);
|
||||
collection_schema.index_file_size_ = std::stol(resRow["index_file_size"]);
|
||||
collection_schema.engine_type_ = std::stoi(resRow["engine_type"]);
|
||||
collection_schema.index_params_ = resRow["index_params"];
|
||||
collection_schema.metric_type_ = std::stoi(resRow["metric_type"]);
|
||||
collection_schema.owner_collection_ = resRow["owner_table"];
|
||||
collection_schema.partition_tag_ = resRow["partition_tag"];
|
||||
collection_schema.version_ = resRow["version"];
|
||||
collection_schema.flush_lsn_ = std::stoul(resRow["flush_lsn"]);
|
||||
} else {
|
||||
return Status(DB_NOT_FOUND, "Collection " + collection_schema.collection_id_ + " not found");
|
||||
}
|
||||
|
||||
statement = "SELECT collection_id, field_name, field_type, field_params FROM " + std::string(META_FIELDS)
|
||||
+ " WHERE collection_id = " + Quote(collection_schema.collection_id_) + ";";
|
||||
|
||||
AttrsMapList field_res;
|
||||
status = SqlQuery(statement, &field_res);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
||||
auto num_row = field_res.size();
|
||||
if (num_row >= 1) {
|
||||
fields_schema.fields_schema_.resize(num_row);
|
||||
for (uint64_t i = 0; i < num_row; ++i) {
|
||||
auto& resRow = field_res[i];
|
||||
fields_schema.fields_schema_[i].collection_id_ = resRow["collection_id"];
|
||||
fields_schema.fields_schema_[i].field_name_ = resRow["field_name"];
|
||||
fields_schema.fields_schema_[i].field_type_ = std::stoi(resRow["field_type"]);
|
||||
fields_schema.fields_schema_[i].field_params_ = resRow["field_params"];
|
||||
}
|
||||
} else {
|
||||
return Status(DB_NOT_FOUND, "Fields of " + collection_schema.collection_id_ + " not found");
|
||||
}
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when describe collection", e.what());
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace meta
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
||||
|
|
|
@ -160,12 +160,6 @@ class SqliteMetaImpl : public Meta {
|
|||
Status
|
||||
GetGlobalLastLSN(uint64_t& lsn) override;
|
||||
|
||||
Status
|
||||
CreateHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) override;
|
||||
|
||||
Status
|
||||
DescribeHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) override;
|
||||
|
||||
private:
|
||||
Status
|
||||
NextFileId(std::string& file_id);
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "context/HybridSearchContext.h"
|
||||
#include "query/BooleanQuery.h"
|
||||
#include "server/delivery/request/BaseRequest.h"
|
||||
#include "utils/Status.h"
|
||||
|
|
|
@ -39,7 +39,6 @@ RequestGroup(BaseRequest::RequestType type) {
|
|||
{BaseRequest::kDeleteByID, DDL_DML_REQUEST_GROUP},
|
||||
{BaseRequest::kGetVectorByID, INFO_REQUEST_GROUP},
|
||||
{BaseRequest::kGetVectorIDs, INFO_REQUEST_GROUP},
|
||||
{BaseRequest::kInsertEntity, DDL_DML_REQUEST_GROUP},
|
||||
|
||||
// collection operations
|
||||
{BaseRequest::kShowCollections, INFO_REQUEST_GROUP},
|
||||
|
@ -51,8 +50,6 @@ RequestGroup(BaseRequest::RequestType type) {
|
|||
{BaseRequest::kDropCollection, DDL_DML_REQUEST_GROUP},
|
||||
{BaseRequest::kPreloadCollection, DQL_REQUEST_GROUP},
|
||||
{BaseRequest::kReleaseCollection, DQL_REQUEST_GROUP},
|
||||
{BaseRequest::kCreateHybridCollection, DDL_DML_REQUEST_GROUP},
|
||||
{BaseRequest::kDescribeHybridCollection, INFO_REQUEST_GROUP},
|
||||
{BaseRequest::kReloadSegments, DQL_REQUEST_GROUP},
|
||||
|
||||
// partition operations
|
||||
|
@ -69,7 +66,6 @@ RequestGroup(BaseRequest::RequestType type) {
|
|||
{BaseRequest::kSearchByID, DQL_REQUEST_GROUP},
|
||||
{BaseRequest::kSearch, DQL_REQUEST_GROUP},
|
||||
{BaseRequest::kSearchCombine, DQL_REQUEST_GROUP},
|
||||
{BaseRequest::kHybridSearch, DQL_REQUEST_GROUP},
|
||||
};
|
||||
|
||||
auto iter = s_map_type_group.find(type);
|
||||
|
|
|
@ -117,7 +117,6 @@ class BaseRequest {
|
|||
kDeleteByID,
|
||||
kGetVectorByID,
|
||||
kGetVectorIDs,
|
||||
kInsertEntity,
|
||||
|
||||
// collection operations
|
||||
kShowCollections = 300,
|
||||
|
@ -128,9 +127,6 @@ class BaseRequest {
|
|||
kShowCollectionInfo,
|
||||
kDropCollection,
|
||||
kPreloadCollection,
|
||||
kCreateHybridCollection,
|
||||
kHasHybridCollection,
|
||||
kDescribeHybridCollection,
|
||||
kReloadSegments,
|
||||
kReleaseCollection,
|
||||
|
||||
|
@ -148,7 +144,6 @@ class BaseRequest {
|
|||
kSearchByID = 600,
|
||||
kSearch,
|
||||
kSearchCombine,
|
||||
kHybridSearch,
|
||||
};
|
||||
|
||||
protected:
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "context/HybridSearchContext.h"
|
||||
#include "query/BinaryQuery.h"
|
||||
#include "server/context/ConnectionContext.h"
|
||||
#include "tracing/TextMapCarrier.h"
|
||||
|
@ -78,13 +77,9 @@ ErrorMap(ErrorCode code) {
|
|||
std::string
|
||||
RequestMap(BaseRequest::RequestType request_type) {
|
||||
static const std::unordered_map<BaseRequest::RequestType, std::string> request_map = {
|
||||
{BaseRequest::kInsert, "Insert"},
|
||||
{BaseRequest::kCreateIndex, "CreateIndex"},
|
||||
{BaseRequest::kSearch, "Search"},
|
||||
{BaseRequest::kSearchByID, "SearchByID"},
|
||||
{BaseRequest::kHybridSearch, "HybridSearch"},
|
||||
{BaseRequest::kFlush, "Flush"},
|
||||
{BaseRequest::kCompact, "Compact"},
|
||||
{BaseRequest::kInsert, "Insert"}, {BaseRequest::kCreateIndex, "CreateIndex"},
|
||||
{BaseRequest::kSearch, "Search"}, {BaseRequest::kSearchByID, "SearchByID"},
|
||||
{BaseRequest::kFlush, "Flush"}, {BaseRequest::kCompact, "Compact"},
|
||||
};
|
||||
|
||||
if (request_map.find(request_type) != request_map.end()) {
|
||||
|
|
|
@ -394,34 +394,6 @@ TEST_F(MetaTest, COLLECTION_FILE_TEST) {
|
|||
ASSERT_EQ(table_file.file_type_, new_file_type);
|
||||
}
|
||||
|
||||
TEST_F(MetaTest, HYBRID_COLLECTION_TEST) {
|
||||
auto collection_id = "meta_test_hybrid";
|
||||
|
||||
milvus::engine::meta::CollectionSchema collection;
|
||||
collection.collection_id_ = collection_id;
|
||||
collection.dimension_ = 128;
|
||||
milvus::engine::meta::hybrid::FieldsSchema fields_schema;
|
||||
fields_schema.fields_schema_.resize(2);
|
||||
fields_schema.fields_schema_[0].collection_id_ = collection_id;
|
||||
fields_schema.fields_schema_[0].field_name_ = "field_0";
|
||||
fields_schema.fields_schema_[0].field_type_ = (int32_t)milvus::engine::meta::hybrid::DataType::INT64;
|
||||
fields_schema.fields_schema_[0].field_params_ = "";
|
||||
|
||||
fields_schema.fields_schema_[1].collection_id_ = collection_id;
|
||||
fields_schema.fields_schema_[1].field_name_ = "field_1";
|
||||
fields_schema.fields_schema_[1].field_type_ = (int32_t)milvus::engine::meta::hybrid::DataType::VECTOR;
|
||||
fields_schema.fields_schema_[1].field_params_ = "";
|
||||
|
||||
auto status = impl_->CreateHybridCollection(collection, fields_schema);
|
||||
ASSERT_TRUE(status.ok());
|
||||
milvus::engine::meta::CollectionSchema describe_collection;
|
||||
milvus::engine::meta::hybrid::FieldsSchema describe_fields;
|
||||
describe_collection.collection_id_ = collection_id;
|
||||
status = impl_->DescribeHybridCollection(describe_collection, describe_fields);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(describe_fields.fields_schema_.size(), 2);
|
||||
}
|
||||
|
||||
TEST_F(MetaTest, COLLECTION_FILE_ROW_COUNT_TEST) {
|
||||
auto collection_id = "row_count_test_table";
|
||||
|
||||
|
|
|
@ -117,34 +117,6 @@ TEST_F(MySqlMetaTest, COLLECTION_TEST) {
|
|||
ASSERT_TRUE(status.ok());
|
||||
}
|
||||
|
||||
TEST_F(MySqlMetaTest, HYBRID_COLLECTION_TEST) {
|
||||
auto collection_id = "meta_test_hybrid";
|
||||
|
||||
milvus::engine::meta::CollectionSchema collection;
|
||||
collection.collection_id_ = collection_id;
|
||||
collection.dimension_ = 128;
|
||||
milvus::engine::meta::hybrid::FieldsSchema fields_schema;
|
||||
fields_schema.fields_schema_.resize(2);
|
||||
fields_schema.fields_schema_[0].collection_id_ = collection_id;
|
||||
fields_schema.fields_schema_[0].field_name_ = "field_0";
|
||||
fields_schema.fields_schema_[0].field_type_ = (int32_t)milvus::engine::meta::hybrid::DataType::INT64;
|
||||
fields_schema.fields_schema_[0].field_params_ = "";
|
||||
|
||||
fields_schema.fields_schema_[1].collection_id_ = collection_id;
|
||||
fields_schema.fields_schema_[1].field_name_ = "field_1";
|
||||
fields_schema.fields_schema_[1].field_type_ = (int32_t)milvus::engine::meta::hybrid::DataType::VECTOR;
|
||||
fields_schema.fields_schema_[1].field_params_ = "";
|
||||
|
||||
auto status = impl_->CreateHybridCollection(collection, fields_schema);
|
||||
ASSERT_TRUE(status.ok());
|
||||
milvus::engine::meta::CollectionSchema describe_collection;
|
||||
milvus::engine::meta::hybrid::FieldsSchema describe_fields;
|
||||
describe_collection.collection_id_ = collection_id;
|
||||
status = impl_->DescribeHybridCollection(describe_collection, describe_fields);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(describe_fields.fields_schema_.size(), 2);
|
||||
}
|
||||
|
||||
TEST_F(MySqlMetaTest, COLLECTION_FILE_TEST) {
|
||||
auto collection_id = "meta_test_table";
|
||||
fiu_init(0);
|
||||
|
|
|
@ -1194,263 +1194,6 @@ TEST_F(RpcHandlerTest, CMD_TEST) {
|
|||
handler->Cmd(&context, &command, &reply);
|
||||
}
|
||||
|
||||
#if 0
|
||||
TEST_F(RpcHandlerTest, HYBRID_TEST) {
|
||||
::grpc::ServerContext context;
|
||||
milvus::grpc::Mapping mapping;
|
||||
milvus::grpc::Status response;
|
||||
|
||||
uint64_t row_num = 1000;
|
||||
uint64_t dimension = 128;
|
||||
|
||||
// Create Hybrid Collection
|
||||
mapping.set_collection_name("test_hybrid");
|
||||
auto field_0 = mapping.add_fields();
|
||||
field_0->set_name("field_0");
|
||||
field_0->mutable_type()->set_data_type(::milvus::grpc::DataType::INT64);
|
||||
|
||||
auto field_1 = mapping.add_fields();
|
||||
field_1->mutable_type()->mutable_vector_param()->set_dimension(128);
|
||||
field_1->set_name("field_1");
|
||||
|
||||
milvus::json json_param = {{"metric_type", 1}, {"engine_type", 1}};
|
||||
auto extra_param = field_1->add_extra_params();
|
||||
extra_param->set_key("params");
|
||||
extra_param->set_value(json_param.dump());
|
||||
|
||||
handler->CreateHybridCollection(&context, &mapping, &response);
|
||||
|
||||
// Insert Entities
|
||||
milvus::grpc::HInsertParam insert_param;
|
||||
milvus::grpc::HEntityIDs entity_ids;
|
||||
insert_param.set_collection_name("test_hybrid");
|
||||
|
||||
auto entity = insert_param.mutable_entities();
|
||||
auto field_name_0 = entity->add_field_names();
|
||||
*field_name_0 = "field_0";
|
||||
auto field_name_1 = entity->add_field_names();
|
||||
*field_name_1 = "field_1";
|
||||
|
||||
entity->set_row_num(row_num);
|
||||
std::vector<int64_t> field_value(row_num, 0);
|
||||
for (uint64_t i = 0; i < row_num; i++) {
|
||||
field_value[i] = i;
|
||||
}
|
||||
entity->set_attr_records(field_value.data(), row_num * sizeof(int64_t));
|
||||
|
||||
std::vector<std::vector<float>> vector_field;
|
||||
vector_field.resize(row_num);
|
||||
for (uint64_t i = 0; i < row_num; ++i) {
|
||||
vector_field[i].resize(dimension);
|
||||
for (uint64_t j = 0; j < dimension; ++j) {
|
||||
vector_field[i][j] = (float)((i + 10) / (j + 20));
|
||||
}
|
||||
}
|
||||
auto vector_record = entity->add_result_values();
|
||||
for (uint64_t i = 0; i < row_num; ++i) {
|
||||
auto record = vector_record->mutable_vector_value()->add_value();
|
||||
auto vector_data = record->mutable_float_data();
|
||||
vector_data->Resize(static_cast<int>(vector_field[i].size()), 0.0);
|
||||
memcpy(vector_data->mutable_data(), vector_field[i].data(), vector_field[i].size() * sizeof(float));
|
||||
}
|
||||
handler->InsertEntity(&context, &insert_param, &entity_ids);
|
||||
ASSERT_EQ(entity_ids.entity_id_array_size(), row_num);
|
||||
|
||||
uint64_t nq = 10;
|
||||
uint64_t topk = 10;
|
||||
milvus::grpc::HSearchParam search_param;
|
||||
auto general_query = search_param.mutable_general_query();
|
||||
auto boolean_query_1 = general_query->mutable_boolean_query();
|
||||
boolean_query_1->set_occur(milvus::grpc::Occur::MUST);
|
||||
auto general_query_1 = boolean_query_1->add_general_query();
|
||||
auto boolean_query_2 = general_query_1->mutable_boolean_query();
|
||||
auto term_query = boolean_query_2->add_general_query()->mutable_term_query();
|
||||
term_query->set_field_name("field_0");
|
||||
std::vector<int64_t> term_value(nq, 0);
|
||||
for (uint64_t i = 0; i < nq; ++i) {
|
||||
term_value[i] = i + nq;
|
||||
}
|
||||
term_query->set_value_num(nq);
|
||||
term_query->set_values(term_value.data(), nq * sizeof(int64_t));
|
||||
|
||||
auto range_query = boolean_query_2->add_general_query()->mutable_range_query();
|
||||
range_query->set_field_name("field_0");
|
||||
auto comp1 = range_query->add_operand();
|
||||
comp1->set_operator_(::milvus::grpc::CompareOperator::GTE);
|
||||
comp1->set_operand("0");
|
||||
auto comp2 = range_query->add_operand();
|
||||
comp2->set_operator_(::milvus::grpc::CompareOperator::LTE);
|
||||
comp2->set_operand("100000");
|
||||
|
||||
auto vector_query = boolean_query_2->add_general_query()->mutable_vector_query();
|
||||
vector_query->set_field_name("field_1");
|
||||
vector_query->set_topk(topk);
|
||||
vector_query->set_query_boost(2);
|
||||
std::vector<std::vector<float>> query_vector;
|
||||
query_vector.resize(nq);
|
||||
for (uint64_t i = 0; i < nq; ++i) {
|
||||
query_vector[i].resize(dimension);
|
||||
for (uint64_t j = 0; j < dimension; ++j) {
|
||||
query_vector[i][j] = (float)((j + 1) / (i + dimension));
|
||||
}
|
||||
}
|
||||
for (auto record : query_vector) {
|
||||
auto row_record = vector_query->add_records();
|
||||
CopyRowRecord(row_record, record);
|
||||
}
|
||||
auto extra_param_1 = vector_query->add_extra_params();
|
||||
extra_param_1->set_key("params");
|
||||
milvus::json param = {{"nprobe", 16}};
|
||||
extra_param_1->set_value(param.dump());
|
||||
|
||||
search_param.set_collection_name("test_hybrid");
|
||||
auto search_extra_param = search_param.add_extra_params();
|
||||
search_extra_param->set_key("params");
|
||||
search_extra_param->set_value("");
|
||||
|
||||
milvus::grpc::TopKQueryResult topk_query_result;
|
||||
handler->HybridSearch(&context, &search_param, &topk_query_result);
|
||||
}
|
||||
|
||||
TEST_F(RpcHandlerTest, HYBRID_INVALID_TEST) {
|
||||
fiu_init(0);
|
||||
|
||||
::grpc::ServerContext context;
|
||||
milvus::grpc::Mapping mapping;
|
||||
milvus::grpc::Status response;
|
||||
|
||||
uint64_t row_num = 1000;
|
||||
uint64_t dimension = 128;
|
||||
|
||||
// Create Hybrid Collection
|
||||
mapping.set_collection_name("test_hybrid");
|
||||
auto field_0 = mapping.add_fields();
|
||||
field_0->set_name("field_0");
|
||||
field_0->mutable_type()->set_data_type(::milvus::grpc::DataType::INT64);
|
||||
|
||||
auto field_1 = mapping.add_fields();
|
||||
field_1->mutable_type()->mutable_vector_param()->set_dimension(128);
|
||||
field_1->set_name("field_1");
|
||||
|
||||
milvus::json json_param = {{"metric_type", 1}, {"engine_type", 1}};
|
||||
auto extra_param = field_1->add_extra_params();
|
||||
extra_param->set_key("params");
|
||||
extra_param->set_value(json_param.dump());
|
||||
|
||||
fiu_enable("CreateHybridCollectionRequest.OnExecute.invalid_collection_name", 1, NULL, 0);
|
||||
handler->CreateHybridCollection(&context, &mapping, &response);
|
||||
fiu_disable("CreateHybridCollectionRequest.OnExecute.invalid_collection_name");
|
||||
|
||||
fiu_enable("CreateHybridCollectionRequest.OnExecute.invalid_db_execute", 1, NULL, 0);
|
||||
handler->CreateHybridCollection(&context, &mapping, &response);
|
||||
fiu_disable("CreateHybridCollectionRequest.OnExecute.invalid_db_execute");
|
||||
|
||||
handler->CreateHybridCollection(&context, &mapping, &response);
|
||||
milvus::grpc::CollectionName grpc_collection_name;
|
||||
grpc_collection_name.set_collection_name("test_hybrid");
|
||||
fiu_enable("DescribeHybridCollectionRequest.OnExecute.invalid_db_execute", 1, NULL, 0);
|
||||
handler->DescribeHybridCollection(&context, &grpc_collection_name, &mapping);
|
||||
fiu_disable("DescribeHybridCollectionRequest.OnExecute.invalid_db_execute");
|
||||
handler->DescribeHybridCollection(&context, &grpc_collection_name, &mapping);
|
||||
|
||||
// Insert Entities
|
||||
milvus::grpc::HInsertParam insert_param;
|
||||
milvus::grpc::HEntityIDs entity_ids;
|
||||
insert_param.set_collection_name("test_hybrid");
|
||||
|
||||
auto entity = insert_param.mutable_entities();
|
||||
auto field_name_0 = entity->add_field_names();
|
||||
*field_name_0 = "field_0";
|
||||
auto field_name_1 = entity->add_field_names();
|
||||
*field_name_1 = "field_1";
|
||||
|
||||
entity->set_row_num(row_num);
|
||||
std::vector<int64_t> field_value(row_num, 0);
|
||||
for (uint64_t i = 0; i < row_num; i++) {
|
||||
field_value[i] = i;
|
||||
}
|
||||
entity->set_attr_records(field_value.data(), row_num * sizeof(int64_t));
|
||||
|
||||
std::vector<std::vector<float>> vector_field;
|
||||
vector_field.resize(row_num);
|
||||
for (uint64_t i = 0; i < row_num; ++i) {
|
||||
vector_field[i].resize(dimension);
|
||||
for (uint64_t j = 0; j < dimension; ++j) {
|
||||
vector_field[i][j] = (float)((i + 10) / (j + 20));
|
||||
}
|
||||
}
|
||||
auto vector_record = entity->add_result_values();
|
||||
for (uint64_t i = 0; i < row_num; ++i) {
|
||||
auto record = vector_record->mutable_vector_value()->add_value();
|
||||
auto vector_data = record->mutable_float_data();
|
||||
vector_data->Resize(static_cast<int>(vector_field[i].size()), 0.0);
|
||||
memcpy(vector_data->mutable_data(), vector_field[i].data(), vector_field[i].size() * sizeof(float));
|
||||
}
|
||||
|
||||
fiu_enable("InsertEntityRequest.OnExecute.throw_std_exception", 1, NULL, 0);
|
||||
handler->InsertEntity(&context, &insert_param, &entity_ids);
|
||||
fiu_disable("InsertEntityRequest.OnExecute.throw_std_exception");
|
||||
handler->InsertEntity(&context, &insert_param, &entity_ids);
|
||||
|
||||
uint64_t nq = 10;
|
||||
uint64_t topk = 10;
|
||||
milvus::grpc::HSearchParam search_param;
|
||||
auto general_query = search_param.mutable_general_query();
|
||||
auto boolean_query_1 = general_query->mutable_boolean_query();
|
||||
boolean_query_1->set_occur(milvus::grpc::Occur::MUST);
|
||||
auto general_query_1 = boolean_query_1->add_general_query();
|
||||
auto boolean_query_2 = general_query_1->mutable_boolean_query();
|
||||
auto term_query = boolean_query_2->add_general_query()->mutable_term_query();
|
||||
term_query->set_field_name("field_0");
|
||||
std::vector<int64_t> term_value(nq, 0);
|
||||
for (uint64_t i = 0; i < nq; ++i) {
|
||||
term_value[i] = i + nq;
|
||||
}
|
||||
term_query->set_value_num(nq);
|
||||
term_query->set_values(term_value.data(), nq * sizeof(int64_t));
|
||||
|
||||
auto range_query = boolean_query_2->add_general_query()->mutable_range_query();
|
||||
range_query->set_field_name("field_0");
|
||||
auto comp1 = range_query->add_operand();
|
||||
comp1->set_operator_(::milvus::grpc::CompareOperator::GTE);
|
||||
comp1->set_operand("0");
|
||||
auto comp2 = range_query->add_operand();
|
||||
comp2->set_operator_(::milvus::grpc::CompareOperator::LTE);
|
||||
comp2->set_operand("100000");
|
||||
|
||||
auto vector_query = boolean_query_2->add_general_query()->mutable_vector_query();
|
||||
vector_query->set_field_name("field_1");
|
||||
vector_query->set_topk(topk);
|
||||
vector_query->set_query_boost(2);
|
||||
std::vector<std::vector<float>> query_vector;
|
||||
query_vector.resize(nq);
|
||||
for (uint64_t i = 0; i < nq; ++i) {
|
||||
query_vector[i].resize(dimension);
|
||||
for (uint64_t j = 0; j < dimension; ++j) {
|
||||
query_vector[i][j] = (float)((j + 1) / (i + dimension));
|
||||
}
|
||||
}
|
||||
for (auto record : query_vector) {
|
||||
auto row_record = vector_query->add_records();
|
||||
CopyRowRecord(row_record, record);
|
||||
}
|
||||
auto extra_param_1 = vector_query->add_extra_params();
|
||||
extra_param_1->set_key("params");
|
||||
milvus::json param = {{"nprobe", 16}};
|
||||
extra_param_1->set_value(param.dump());
|
||||
|
||||
search_param.set_collection_name("test_hybrid");
|
||||
auto search_extra_param = search_param.add_extra_params();
|
||||
search_extra_param->set_key("params");
|
||||
search_extra_param->set_value("");
|
||||
|
||||
milvus::grpc::TopKQueryResult topk_query_result;
|
||||
fiu_enable("SearchRequest.OnExecute.throw_std_exception", 1, NULL, 0);
|
||||
handler->HybridSearch(&context, &search_param, &topk_query_result);
|
||||
fiu_disable("SearchRequest.OnExecute.throw_std_exception");
|
||||
}
|
||||
#endif
|
||||
|
||||
//////////////////////////////////////////////////////////////////////
|
||||
namespace {
|
||||
class DummyRequest : public milvus::server::BaseRequest {
|
||||
|
|
|
@ -296,11 +296,6 @@ class TestClient : public oatpp::web::client::ApiClient {
|
|||
|
||||
API_CALL("PUT", "/system/{op}", op, PATH(String, cmd_str, "op"), BODY_STRING(String, body))
|
||||
|
||||
API_CALL("POST", "/hybrid_collections", createHybridCollection, BODY_STRING(String, body_str))
|
||||
|
||||
API_CALL("POST", "/hybrid_collections/{collection_name}/entities", InsertEntity,
|
||||
PATH(String, collection_name), BODY_STRING(String, body))
|
||||
|
||||
#include OATPP_CODEGEN_END(ApiClient)
|
||||
};
|
||||
|
||||
|
@ -561,94 +556,6 @@ TEST_F(WebControllerTest, CREATE_COLLECTION) {
|
|||
ASSERT_EQ(OStatus::CODE_400.code, response->getStatusCode());
|
||||
}
|
||||
|
||||
#if 0
|
||||
TEST_F(WebControllerTest, HYBRID_TEST) {
|
||||
nlohmann::json create_json;
|
||||
create_json["collection_name"] = "test_hybrid";
|
||||
nlohmann::json field_json_0, field_json_1;
|
||||
field_json_0["field_name"] = "field_0";
|
||||
field_json_0["field_type"] = "int64";
|
||||
field_json_0["extra_params"] = "";
|
||||
|
||||
field_json_1["field_name"] = "field_1";
|
||||
field_json_1["field_type"] = "vector";
|
||||
nlohmann::json extra_params;
|
||||
extra_params["dimension"] = 128;
|
||||
field_json_1["extra_params"] = extra_params;
|
||||
|
||||
create_json["fields"].push_back(field_json_0);
|
||||
create_json["fields"].push_back(field_json_1);
|
||||
|
||||
auto response = client_ptr->createHybridCollection(create_json.dump().c_str());
|
||||
ASSERT_EQ(OStatus::CODE_201.code, response->getStatusCode());
|
||||
auto result_dto = response->readBodyToDto<milvus::server::web::StatusDto>(object_mapper.get());
|
||||
ASSERT_EQ(milvus::server::web::StatusCode::SUCCESS, result_dto->code->getValue()) << result_dto->message->std_str();
|
||||
|
||||
int64_t dimension = 128;
|
||||
int64_t row_num = 1000;
|
||||
nlohmann::json insert_json;
|
||||
insert_json["partition_tag"] = "";
|
||||
nlohmann::json entity_0, entity_1;
|
||||
entity_0["field_name"] = "field_0";
|
||||
entity_0["field_value"] = RandomAttrRecordsJson(row_num);
|
||||
entity_1["field_name"] = "field_1";
|
||||
entity_1["field_value"] = RandomRecordsJson(dimension, row_num);
|
||||
|
||||
insert_json["entity"].push_back(entity_0);
|
||||
insert_json["entity"].push_back(entity_1);
|
||||
insert_json["row_num"] = row_num;
|
||||
|
||||
OString collection_name = "test_hybrid";
|
||||
response = client_ptr->InsertEntity(collection_name, insert_json.dump().c_str(), conncetion_ptr);
|
||||
ASSERT_EQ(OStatus::CODE_201.code, response->getStatusCode());
|
||||
auto vector_dto = response->readBodyToDto<milvus::server::web::VectorIdsDto>(object_mapper.get());
|
||||
ASSERT_EQ(row_num, vector_dto->ids->count());
|
||||
|
||||
auto status = FlushCollection(client_ptr, conncetion_ptr, collection_name);
|
||||
ASSERT_TRUE(status.ok()) << status.message();
|
||||
|
||||
// TODO(yukun): when hybrid operation is added to wal, the sleep() can be deleted
|
||||
sleep(2);
|
||||
|
||||
int64_t nq = 10;
|
||||
int64_t topk = 100;
|
||||
nlohmann::json query_json, bool_json, term_json, range_json, vector_json;
|
||||
term_json["term"]["field_name"] = "field_0";
|
||||
term_json["term"]["values"] = RandomAttrRecordsJson(nq);
|
||||
bool_json["must"].push_back(term_json);
|
||||
|
||||
range_json["range"]["field_name"] = "field_0";
|
||||
nlohmann::json comp_json;
|
||||
comp_json["gte"] = "0";
|
||||
comp_json["lte"] = "100000";
|
||||
range_json["range"]["values"] = comp_json;
|
||||
bool_json["must"].push_back(range_json);
|
||||
|
||||
vector_json["vector"]["field_name"] = "field_1";
|
||||
vector_json["vector"]["topk"] = topk;
|
||||
vector_json["vector"]["nq"] = nq;
|
||||
vector_json["vector"]["values"] = RandomRecordsJson(128, nq);
|
||||
bool_json["must"].push_back(vector_json);
|
||||
|
||||
query_json["query"]["bool"] = bool_json;
|
||||
|
||||
response = client_ptr->vectorsOp(collection_name, query_json.dump().c_str(), conncetion_ptr);
|
||||
ASSERT_EQ(OStatus::CODE_200.code, response->getStatusCode());
|
||||
|
||||
auto result_json = nlohmann::json::parse(response->readBodyToString()->std_str());
|
||||
ASSERT_TRUE(result_json.contains("num"));
|
||||
ASSERT_TRUE(result_json["num"].is_number());
|
||||
ASSERT_EQ(nq, result_json["num"].get<int64_t>());
|
||||
|
||||
ASSERT_TRUE(result_json.contains("result"));
|
||||
ASSERT_TRUE(result_json["result"].is_array());
|
||||
|
||||
auto result0_json = result_json["result"][0];
|
||||
ASSERT_TRUE(result0_json.is_array());
|
||||
ASSERT_EQ(topk, result0_json.size());
|
||||
}
|
||||
#endif
|
||||
|
||||
TEST_F(WebControllerTest, GET_COLLECTION_META) {
|
||||
OString collection_name = "web_test_create_collection" + OString(RandomName().c_str());
|
||||
GenCollection(client_ptr, conncetion_ptr, collection_name, 10, 10, "L2");
|
||||
|
|
Loading…
Reference in New Issue