feat: support inverted index for array (#33452) (#34053)

pr: https://github.com/milvus-io/milvus/pull/33184
pr: https://github.com/milvus-io/milvus/pull/33452
pr: https://github.com/milvus-io/milvus/pull/33633
issue: https://github.com/milvus-io/milvus/issues/27704
Co-authored-by: xiaocai2333 <cai.zhang@zilliz.com>

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
Signed-off-by: longjiquan <jiquan.long@zilliz.com>
Co-authored-by: cai.zhang <cai.zhang@zilliz.com>
pull/34093/head
Jiquan Long 2024-06-24 10:50:03 +08:00 committed by GitHub
parent 630a726f35
commit 22e6807e9a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
54 changed files with 1979 additions and 559 deletions

View File

@ -51,6 +51,15 @@ class Schema {
return field_id;
}
FieldId
AddDebugArrayField(const std::string& name, DataType element_type) {
    // Auto-assign the next debug field id, mirroring AddDebugField, then
    // register an ARRAY field whose elements have `element_type`.
    auto assigned_id = FieldId(debug_id++);
    this->AddField(
        FieldName(name), assigned_id, DataType::ARRAY, element_type);
    return assigned_id;
}
// auto gen field_id for convenience
FieldId
AddDebugField(const std::string& name,

View File

@ -280,6 +280,22 @@ class SegmentExpr : public Expr {
return result;
}
template <typename T, typename FUNC, typename... ValTypes>
void
ProcessIndexChunksV2(FUNC func, ValTypes... values) {
typedef std::
conditional_t<std::is_same_v<T, std::string_view>, std::string, T>
IndexInnerType;
using Index = index::ScalarIndex<IndexInnerType>;
for (size_t i = current_index_chunk_; i < num_index_chunk_; i++) {
const Index& index =
segment_->chunk_scalar_index<IndexInnerType>(field_id_, i);
auto* index_ptr = const_cast<Index*>(&index);
func(index_ptr, values...);
}
}
template <typename T>
bool
CanUseIndex(OpType op) const {

View File

@ -23,7 +23,14 @@ namespace exec {
void
PhyJsonContainsFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
switch (expr_->column_.data_type_) {
case DataType::ARRAY:
case DataType::ARRAY: {
if (is_index_mode_) {
result = EvalArrayContainsForIndexSegment();
} else {
result = EvalJsonContainsForDataSegment();
}
break;
}
case DataType::JSON: {
if (is_index_mode_) {
PanicInfo(
@ -94,7 +101,6 @@ PhyJsonContainsFilterExpr::EvalJsonContainsForDataSegment() {
return ExecJsonContainsWithDiffType();
}
}
break;
}
case proto::plan::JSONContainsExpr_JSONOp_ContainsAll: {
if (IsArrayDataType(data_type)) {
@ -145,7 +151,6 @@ PhyJsonContainsFilterExpr::EvalJsonContainsForDataSegment() {
return ExecJsonContainsAllWithDiffType();
}
}
break;
}
default:
PanicInfo(ExprInvalid,
@ -748,5 +753,92 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffType() {
return res_vec;
}
VectorPtr
PhyJsonContainsFilterExpr::EvalArrayContainsForIndexSegment() {
    // Dispatch to the typed implementation based on the element type of
    // the array column; VARCHAR and STRING both map to std::string.
    switch (expr_->column_.element_type_) {
        case DataType::BOOL:
            return ExecArrayContainsForIndexSegmentImpl<bool>();
        case DataType::INT8:
            return ExecArrayContainsForIndexSegmentImpl<int8_t>();
        case DataType::INT16:
            return ExecArrayContainsForIndexSegmentImpl<int16_t>();
        case DataType::INT32:
            return ExecArrayContainsForIndexSegmentImpl<int32_t>();
        case DataType::INT64:
            return ExecArrayContainsForIndexSegmentImpl<int64_t>();
        case DataType::FLOAT:
            return ExecArrayContainsForIndexSegmentImpl<float>();
        case DataType::DOUBLE:
            return ExecArrayContainsForIndexSegmentImpl<double>();
        case DataType::VARCHAR:
        case DataType::STRING:
            return ExecArrayContainsForIndexSegmentImpl<std::string>();
        default:
            PanicInfo(DataTypeInvalid,
                      fmt::format("unsupported data type for "
                                  "ExecArrayContainsForIndexSegmentImpl: {}",
                                  expr_->column_.element_type_));
    }
}
// Evaluate json_contains / contains_any / contains_all over an ARRAY column
// using the scalar (inverted) index instead of scanning raw data.
// Returns a ColumnVector bitmap of size GetNextBatchSize(), or nullptr when
// the batch is exhausted.
template <typename ExprValueType>
VectorPtr
PhyJsonContainsFilterExpr::ExecArrayContainsForIndexSegmentImpl() {
    // string_view values are materialized as std::string, since the scalar
    // index is keyed by std::string.
    typedef std::conditional_t<std::is_same_v<ExprValueType, std::string_view>,
                               std::string,
                               ExprValueType>
        GetType;
    using Index = index::ScalarIndex<GetType>;
    auto real_batch_size = GetNextBatchSize();
    if (real_batch_size == 0) {
        return nullptr;
    }
    // Deduplicate the requested elements before querying the index.
    std::unordered_set<GetType> elements;
    for (auto const& element : expr_->vals_) {
        elements.insert(GetValueFromProto<GetType>(element));
    }
    boost::container::vector<GetType> elems(elements.begin(), elements.end());
    auto execute_sub_batch =
        [this](Index* index_ptr,
               const boost::container::vector<GetType>& vals) {
            switch (expr_->op_) {
                // Contains / ContainsAny: a row matches if its array holds
                // any of the requested values — a single multi-term In().
                case proto::plan::JSONContainsExpr_JSONOp_Contains:
                case proto::plan::JSONContainsExpr_JSONOp_ContainsAny: {
                    return index_ptr->In(vals.size(), vals.data());
                }
                // ContainsAll: intersect the per-value hit bitmaps.
                case proto::plan::JSONContainsExpr_JSONOp_ContainsAll: {
                    TargetBitmap result(index_ptr->Count());
                    result.set();
                    for (size_t i = 0; i < vals.size(); i++) {
                        auto sub = index_ptr->In(1, &vals[i]);
                        result &= sub;
                    }
                    return result;
                }
                default:
                    PanicInfo(
                        ExprInvalid,
                        "unsupported array contains type {}",
                        proto::plan::JSONContainsExpr_JSONOp_Name(expr_->op_));
            }
        };
    // The framework slices the accumulated index result into batches.
    auto res = ProcessIndexChunks<GetType>(execute_sub_batch, elems);
    AssertInfo(res.size() == real_batch_size,
               "internal error: expr processed rows {} not equal "
               "expect batch size {}",
               res.size(),
               real_batch_size);
    return std::make_shared<ColumnVector>(std::move(res));
}
} //namespace exec
} // namespace milvus

View File

@ -80,6 +80,13 @@ class PhyJsonContainsFilterExpr : public SegmentExpr {
VectorPtr
ExecJsonContainsWithDiffType();
VectorPtr
EvalArrayContainsForIndexSegment();
template <typename ExprValueType>
VectorPtr
ExecArrayContainsForIndexSegmentImpl();
private:
std::shared_ptr<const milvus::expr::JsonContainsExpr> expr_;
};

View File

@ -20,6 +20,66 @@
namespace milvus {
namespace exec {
// Generic case: for non-Array value types there is no index-accelerated
// path, so fall back to the brute-force array scan. The proto::plan::Array
// specialization below handles the indexed case.
template <typename T>
VectorPtr
PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArrayForIndex() {
    return ExecRangeVisitorImplArray<T>();
}
// Index-accelerated whole-array comparison. Only Equal / NotEqual can use
// the term index; every other operator falls back to the brute-force scan.
template <>
VectorPtr
PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArrayForIndex<
    proto::plan::Array>() {
    switch (expr_->op_type_) {
        case proto::plan::Equal:
        case proto::plan::NotEqual: {
            // Whether the equality result must be inverted (NotEqual).
            const bool reverse = expr_->op_type_ == proto::plan::NotEqual;
            switch (expr_->column_.element_type_) {
                case DataType::BOOL:
                    return ExecArrayEqualForIndex<bool>(reverse);
                case DataType::INT8:
                    return ExecArrayEqualForIndex<int8_t>(reverse);
                case DataType::INT16:
                    return ExecArrayEqualForIndex<int16_t>(reverse);
                case DataType::INT32:
                    return ExecArrayEqualForIndex<int32_t>(reverse);
                case DataType::INT64:
                    return ExecArrayEqualForIndex<int64_t>(reverse);
                case DataType::FLOAT:
                case DataType::DOUBLE:
                    // Index probes are not exact on floating point numbers;
                    // fall back to the brute-force comparison.
                    return ExecRangeVisitorImplArray<proto::plan::Array>();
                case DataType::VARCHAR:
                    if (segment_->type() == SegmentType::Growing) {
                        // Growing segments store raw data as std::string.
                        return ExecArrayEqualForIndex<std::string>(reverse);
                    } else {
                        // Sealed segments expose std::string_view.
                        return ExecArrayEqualForIndex<std::string_view>(reverse);
                    }
                default:
                    PanicInfo(DataTypeInvalid,
                              "unsupported element type when execute array "
                              "equal for index: {}",
                              expr_->column_.element_type_);
            }
        }
        default:
            return ExecRangeVisitorImplArray<proto::plan::Array>();
    }
}
void
PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
switch (expr_->column_.data_type_) {
@ -99,7 +159,13 @@ PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
result = ExecRangeVisitorImplArray<std::string>();
break;
case proto::plan::GenericValue::ValCase::kArrayVal:
result = ExecRangeVisitorImplArray<proto::plan::Array>();
if (is_index_mode_) {
result = ExecRangeVisitorImplArrayForIndex<
proto::plan::Array>();
} else {
result =
ExecRangeVisitorImplArray<proto::plan::Array>();
}
break;
default:
PanicInfo(
@ -196,6 +262,104 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray() {
return res_vec;
}
// Whole-array (Not)Equal using the inverted index on array elements.
//
// Strategy: rows equal to `val` must contain every distinct element of
// `val`, so the index narrows the candidate set; an exact raw-data
// comparison then decides each candidate. `reverse` selects NotEqual.
//
// Fixes over the previous version:
//  1. NotEqual: rows NOT in `candidates` are missing at least one element
//     of `val` and therefore are definitely != val — their bits must be 1.
//     The old code left them 0, returning wrong NotEqual results. We now
//     pre-set the bitmap when `reverse` is true.
//  2. `tmp_candidates` was reused after std::move, whose post-move state
//     is unspecified; it is now cleared explicitly.
template <typename T>
VectorPtr
PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(bool reverse) {
    // string_view data is looked up through the std::string-keyed index.
    typedef std::
        conditional_t<std::is_same_v<T, std::string_view>, std::string, T>
            IndexInnerType;
    using Index = index::ScalarIndex<IndexInnerType>;
    auto real_batch_size = GetNextBatchSize();
    if (real_batch_size == 0) {
        return nullptr;
    }

    // The full array literal to compare against.
    auto val = GetValueFromProto<proto::plan::Array>(expr_->val_);
    if (val.array_size() == 0) {
        // rollback to bruteforce. no candidates will be filtered out via index.
        return ExecRangeVisitorImplArray<proto::plan::Array>();
    }

    // cache the result to suit the framework.
    auto batch_res =
        ProcessIndexChunks<IndexInnerType>([this, &val, reverse](Index* _) {
            // Deduplicate the literal's elements; each distinct element
            // drives one round of index probing below.
            boost::container::vector<IndexInnerType> elems;
            for (auto const& element : val.array()) {
                auto e = GetValueFromProto<IndexInnerType>(element);
                if (std::find(elems.begin(), elems.end(), e) == elems.end()) {
                    elems.push_back(e);
                }
            }

            // Fetch the raw ArrayView for a global row offset.
            auto size_per_chunk = segment_->size_per_chunk();
            auto retrieve = [size_per_chunk, this](int64_t offset) -> auto {
                auto chunk_idx = offset / size_per_chunk;
                auto chunk_offset = offset % size_per_chunk;
                const auto& chunk =
                    segment_->template chunk_data<milvus::ArrayView>(
                        field_id_, chunk_idx);
                return chunk.data() + chunk_offset;
            };

            // Exact comparison against raw data; XOR implements NotEqual.
            auto filter = [&retrieve, &val, reverse](size_t offset) -> bool {
                auto data_ptr = retrieve(offset);
                return data_ptr->is_same_array(val) ^ reverse;
            };

            // Candidate rows: intersection of the per-element hit sets.
            std::unordered_set<size_t> candidates;
            std::unordered_set<size_t> tmp_candidates;
            auto first_callback = [&candidates](size_t offset) -> void {
                candidates.insert(offset);
            };
            auto callback = [&candidates,
                             &tmp_candidates](size_t offset) -> void {
                if (candidates.find(offset) != candidates.end()) {
                    tmp_candidates.insert(offset);
                }
            };
            auto execute_sub_batch =
                [](Index* index_ptr,
                   const IndexInnerType& val,
                   const std::function<void(size_t /* offset */)>& callback) {
                    index_ptr->InApplyCallback(1, &val, callback);
                };

            // Probe one element per round, shrinking the candidate set.
            for (size_t idx = 0; idx < elems.size(); idx++) {
                if (idx == 0) {
                    ProcessIndexChunksV2<IndexInnerType>(
                        execute_sub_batch, elems[idx], first_callback);
                } else {
                    ProcessIndexChunksV2<IndexInnerType>(
                        execute_sub_batch, elems[idx], callback);
                    candidates = std::move(tmp_candidates);
                    // A moved-from set has unspecified contents; make it
                    // explicitly empty before the next round reuses it.
                    tmp_candidates.clear();
                }
                // Early-out once candidates are sparse (<1% of rows); the
                // set stays a superset of equal rows, which is sufficient.
                if (candidates.size() * 100 < active_count_) {
                    break;
                }
            }

            TargetBitmap res(active_count_);
            if (reverse) {
                // Non-candidates are definitely != val: start all-set, then
                // let the post-filter decide the candidates.
                res.set();
            }
            // run post-filter. The filter will only be executed once in the framework.
            for (const auto& candidate : candidates) {
                res[candidate] = filter(candidate);
            }
            return res;
        });
    AssertInfo(batch_res.size() == real_batch_size,
               "internal error: expr processed rows {} not equal "
               "expect batch size {}",
               batch_res.size(),
               real_batch_size);

    // return the result.
    return std::make_shared<ColumnVector>(std::move(batch_res));
}
template <typename ExprValueType>
VectorPtr
PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson() {

View File

@ -310,6 +310,14 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr {
VectorPtr
ExecRangeVisitorImplArray();
template <typename T>
VectorPtr
ExecRangeVisitorImplArrayForIndex();
template <typename T>
VectorPtr
ExecArrayEqualForIndex(bool reverse);
// Check overflow and cache result for performance
template <typename T>
ColumnVectorPtr

View File

@ -113,11 +113,13 @@ IsMaterializedViewSupported(const DataType& data_type) {
struct ColumnInfo {
FieldId field_id_;
DataType data_type_;
DataType element_type_;
std::vector<std::string> nested_path_;
ColumnInfo(const proto::plan::ColumnInfo& column_info)
: field_id_(column_info.field_id()),
data_type_(static_cast<DataType>(column_info.data_type())),
element_type_(static_cast<DataType>(column_info.element_type())),
nested_path_(column_info.nested_path().begin(),
column_info.nested_path().end()) {
}
@ -127,6 +129,7 @@ struct ColumnInfo {
std::vector<std::string> nested_path = {})
: field_id_(field_id),
data_type_(data_type),
element_type_(DataType::NONE),
nested_path_(std::move(nested_path)) {
}
@ -140,6 +143,10 @@ struct ColumnInfo {
return false;
}
if (element_type_ != other.element_type_) {
return false;
}
for (int i = 0; i < nested_path_.size(); ++i) {
if (nested_path_[i] != other.nested_path_[i]) {
return false;
@ -151,10 +158,12 @@ struct ColumnInfo {
std::string
ToString() const {
return fmt::format("[FieldId:{}, data_type:{}, nested_path:{}]",
std::to_string(field_id_.get()),
data_type_,
milvus::Join<std::string>(nested_path_, ","));
return fmt::format(
"[FieldId:{}, data_type:{}, element_type:{}, nested_path:{}]",
std::to_string(field_id_.get()),
data_type_,
element_type_,
milvus::Join<std::string>(nested_path_, ","));
}
};

View File

@ -34,13 +34,9 @@ template <typename T>
ScalarIndexPtr<T>
IndexFactory::CreateScalarIndex(
const IndexType& index_type,
const storage::FileManagerContext& file_manager_context,
DataType d_type) {
const storage::FileManagerContext& file_manager_context) {
if (index_type == INVERTED_INDEX_TYPE) {
TantivyConfig cfg;
cfg.data_type_ = d_type;
return std::make_unique<InvertedIndexTantivy<T>>(cfg,
file_manager_context);
return std::make_unique<InvertedIndexTantivy<T>>(file_manager_context);
}
return CreateScalarIndexSort<T>(file_manager_context);
}
@ -56,14 +52,11 @@ template <>
ScalarIndexPtr<std::string>
IndexFactory::CreateScalarIndex<std::string>(
const IndexType& index_type,
const storage::FileManagerContext& file_manager_context,
DataType d_type) {
const storage::FileManagerContext& file_manager_context) {
#if defined(__linux__) || defined(__APPLE__)
if (index_type == INVERTED_INDEX_TYPE) {
TantivyConfig cfg;
cfg.data_type_ = d_type;
return std::make_unique<InvertedIndexTantivy<std::string>>(
cfg, file_manager_context);
file_manager_context);
}
return CreateStringIndexMarisa(file_manager_context);
#else
@ -76,13 +69,10 @@ ScalarIndexPtr<T>
IndexFactory::CreateScalarIndex(
const IndexType& index_type,
const storage::FileManagerContext& file_manager_context,
std::shared_ptr<milvus_storage::Space> space,
DataType d_type) {
std::shared_ptr<milvus_storage::Space> space) {
if (index_type == INVERTED_INDEX_TYPE) {
TantivyConfig cfg;
cfg.data_type_ = d_type;
return std::make_unique<InvertedIndexTantivy<T>>(
cfg, file_manager_context, space);
return std::make_unique<InvertedIndexTantivy<T>>(file_manager_context,
space);
}
return CreateScalarIndexSort<T>(file_manager_context, space);
}
@ -92,14 +82,11 @@ ScalarIndexPtr<std::string>
IndexFactory::CreateScalarIndex<std::string>(
const IndexType& index_type,
const storage::FileManagerContext& file_manager_context,
std::shared_ptr<milvus_storage::Space> space,
DataType d_type) {
std::shared_ptr<milvus_storage::Space> space) {
#if defined(__linux__) || defined(__APPLE__)
if (index_type == INVERTED_INDEX_TYPE) {
TantivyConfig cfg;
cfg.data_type_ = d_type;
return std::make_unique<InvertedIndexTantivy<std::string>>(
cfg, file_manager_context, space);
file_manager_context, space);
}
return CreateStringIndexMarisa(file_manager_context, space);
#else
@ -132,41 +119,32 @@ IndexFactory::CreateIndex(
}
IndexBasePtr
IndexFactory::CreateScalarIndex(
const CreateIndexInfo& create_index_info,
IndexFactory::CreatePrimitiveScalarIndex(
DataType data_type,
IndexType index_type,
const storage::FileManagerContext& file_manager_context) {
auto data_type = create_index_info.field_type;
auto index_type = create_index_info.index_type;
switch (data_type) {
// create scalar index
case DataType::BOOL:
return CreateScalarIndex<bool>(
index_type, file_manager_context, data_type);
return CreateScalarIndex<bool>(index_type, file_manager_context);
case DataType::INT8:
return CreateScalarIndex<int8_t>(
index_type, file_manager_context, data_type);
return CreateScalarIndex<int8_t>(index_type, file_manager_context);
case DataType::INT16:
return CreateScalarIndex<int16_t>(
index_type, file_manager_context, data_type);
return CreateScalarIndex<int16_t>(index_type, file_manager_context);
case DataType::INT32:
return CreateScalarIndex<int32_t>(
index_type, file_manager_context, data_type);
return CreateScalarIndex<int32_t>(index_type, file_manager_context);
case DataType::INT64:
return CreateScalarIndex<int64_t>(
index_type, file_manager_context, data_type);
return CreateScalarIndex<int64_t>(index_type, file_manager_context);
case DataType::FLOAT:
return CreateScalarIndex<float>(
index_type, file_manager_context, data_type);
return CreateScalarIndex<float>(index_type, file_manager_context);
case DataType::DOUBLE:
return CreateScalarIndex<double>(
index_type, file_manager_context, data_type);
return CreateScalarIndex<double>(index_type, file_manager_context);
// create string index
case DataType::STRING:
case DataType::VARCHAR:
return CreateScalarIndex<std::string>(
index_type, file_manager_context, data_type);
return CreateScalarIndex<std::string>(index_type,
file_manager_context);
default:
throw SegcoreError(
DataTypeInvalid,
@ -174,6 +152,24 @@ IndexFactory::CreateScalarIndex(
}
}
IndexBasePtr
IndexFactory::CreateScalarIndex(
const CreateIndexInfo& create_index_info,
const storage::FileManagerContext& file_manager_context) {
switch (create_index_info.field_type) {
case DataType::ARRAY:
return CreatePrimitiveScalarIndex(
static_cast<DataType>(
file_manager_context.fieldDataMeta.schema.element_type()),
create_index_info.index_type,
file_manager_context);
default:
return CreatePrimitiveScalarIndex(create_index_info.field_type,
create_index_info.index_type,
file_manager_context);
}
}
IndexBasePtr
IndexFactory::CreateVectorIndex(
const CreateIndexInfo& create_index_info,
@ -249,32 +245,25 @@ IndexFactory::CreateScalarIndex(const CreateIndexInfo& create_index_info,
switch (data_type) {
// create scalar index
case DataType::BOOL:
return CreateScalarIndex<bool>(
index_type, file_manager, space, data_type);
return CreateScalarIndex<bool>(index_type, file_manager, space);
case DataType::INT8:
return CreateScalarIndex<int8_t>(
index_type, file_manager, space, data_type);
return CreateScalarIndex<int8_t>(index_type, file_manager, space);
case DataType::INT16:
return CreateScalarIndex<int16_t>(
index_type, file_manager, space, data_type);
return CreateScalarIndex<int16_t>(index_type, file_manager, space);
case DataType::INT32:
return CreateScalarIndex<int32_t>(
index_type, file_manager, space, data_type);
return CreateScalarIndex<int32_t>(index_type, file_manager, space);
case DataType::INT64:
return CreateScalarIndex<int64_t>(
index_type, file_manager, space, data_type);
return CreateScalarIndex<int64_t>(index_type, file_manager, space);
case DataType::FLOAT:
return CreateScalarIndex<float>(
index_type, file_manager, space, data_type);
return CreateScalarIndex<float>(index_type, file_manager, space);
case DataType::DOUBLE:
return CreateScalarIndex<double>(
index_type, file_manager, space, data_type);
return CreateScalarIndex<double>(index_type, file_manager, space);
// create string index
case DataType::STRING:
case DataType::VARCHAR:
return CreateScalarIndex<std::string>(
index_type, file_manager, space, data_type);
index_type, file_manager, space);
default:
throw SegcoreError(
DataTypeInvalid,

View File

@ -65,6 +65,13 @@ class IndexFactory {
CreateVectorIndex(const CreateIndexInfo& create_index_info,
const storage::FileManagerContext& file_manager_context);
IndexBasePtr
CreatePrimitiveScalarIndex(
DataType data_type,
IndexType index_type,
const storage::FileManagerContext& file_manager_context =
storage::FileManagerContext());
IndexBasePtr
CreateScalarIndex(const CreateIndexInfo& create_index_info,
const storage::FileManagerContext& file_manager_context =
@ -89,15 +96,13 @@ class IndexFactory {
ScalarIndexPtr<T>
CreateScalarIndex(const IndexType& index_type,
const storage::FileManagerContext& file_manager =
storage::FileManagerContext(),
DataType d_type = DataType::NONE);
storage::FileManagerContext());
template <typename T>
ScalarIndexPtr<T>
CreateScalarIndex(const IndexType& index_type,
const storage::FileManagerContext& file_manager,
std::shared_ptr<milvus_storage::Space> space,
DataType d_type = DataType::NONE);
std::shared_ptr<milvus_storage::Space> space);
};
// template <>
@ -112,6 +117,5 @@ ScalarIndexPtr<std::string>
IndexFactory::CreateScalarIndex<std::string>(
const IndexType& index_type,
const storage::FileManagerContext& file_manager_context,
std::shared_ptr<milvus_storage::Space> space,
DataType d_type);
std::shared_ptr<milvus_storage::Space> space);
} // namespace milvus::index

View File

@ -23,12 +23,50 @@
#include "InvertedIndexTantivy.h"
namespace milvus::index {
// Map a milvus proto data type onto the tantivy column type used by the
// inverted index. All integer widths share tantivy's I64 column and both
// floating point widths share F64; anything unmapped panics.
inline TantivyDataType
get_tantivy_data_type(proto::schema::DataType data_type) {
    switch (data_type) {
        case proto::schema::DataType::Bool: {
            return TantivyDataType::Bool;
        }

        case proto::schema::DataType::Int8:
        case proto::schema::DataType::Int16:
        case proto::schema::DataType::Int32:
        case proto::schema::DataType::Int64: {
            return TantivyDataType::I64;
        }

        case proto::schema::DataType::Float:
        case proto::schema::DataType::Double: {
            return TantivyDataType::F64;
        }

        // String is accepted alongside VarChar for consistency with
        // InvertedIndexTantivy<T>::build_index, which handles both.
        case proto::schema::DataType::String:
        case proto::schema::DataType::VarChar: {
            return TantivyDataType::Keyword;
        }

        default:
            PanicInfo(ErrorCode::NotImplemented,
                      fmt::format("not implemented data type: {}", data_type));
    }
}
// Resolve the tantivy column type for a field schema. Array fields are
// indexed by their elements, so the element type decides; all other
// fields use their own data type.
inline TantivyDataType
get_tantivy_data_type(const proto::schema::FieldSchema& schema) {
    if (schema.data_type() == proto::schema::Array) {
        return get_tantivy_data_type(schema.element_type());
    }
    return get_tantivy_data_type(schema.data_type());
}
template <typename T>
InvertedIndexTantivy<T>::InvertedIndexTantivy(
const TantivyConfig& cfg,
const storage::FileManagerContext& ctx,
std::shared_ptr<milvus_storage::Space> space)
: cfg_(cfg), space_(space) {
: space_(space), schema_(ctx.fieldDataMeta.schema) {
mem_file_manager_ = std::make_shared<MemFileManager>(ctx, ctx.space_);
disk_file_manager_ = std::make_shared<DiskFileManager>(ctx, ctx.space_);
auto field =
@ -36,7 +74,7 @@ InvertedIndexTantivy<T>::InvertedIndexTantivy(
auto prefix = disk_file_manager_->GetLocalIndexObjectPrefix();
path_ = prefix;
boost::filesystem::create_directories(path_);
d_type_ = cfg_.to_tantivy_data_type();
d_type_ = get_tantivy_data_type(schema_);
if (tantivy_index_exist(path_.c_str())) {
LOG_INFO(
"index {} already exists, which should happen in loading progress",
@ -114,83 +152,7 @@ InvertedIndexTantivy<T>::Build(const Config& config) {
AssertInfo(insert_files.has_value(), "insert_files were empty");
auto field_datas =
mem_file_manager_->CacheRawDataToMemory(insert_files.value());
switch (cfg_.data_type_) {
case DataType::BOOL: {
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
wrapper_->add_data<bool>(static_cast<const bool*>(data->Data()),
n);
}
break;
}
case DataType::INT8: {
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
wrapper_->add_data<int8_t>(
static_cast<const int8_t*>(data->Data()), n);
}
break;
}
case DataType::INT16: {
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
wrapper_->add_data<int16_t>(
static_cast<const int16_t*>(data->Data()), n);
}
break;
}
case DataType::INT32: {
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
wrapper_->add_data<int32_t>(
static_cast<const int32_t*>(data->Data()), n);
}
break;
}
case DataType::INT64: {
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
wrapper_->add_data<int64_t>(
static_cast<const int64_t*>(data->Data()), n);
}
break;
}
case DataType::FLOAT: {
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
wrapper_->add_data<float>(
static_cast<const float*>(data->Data()), n);
}
break;
}
case DataType::DOUBLE: {
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
wrapper_->add_data<double>(
static_cast<const double*>(data->Data()), n);
}
break;
}
case DataType::VARCHAR: {
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
wrapper_->add_data<std::string>(
static_cast<const std::string*>(data->Data()), n);
}
break;
}
default:
PanicInfo(ErrorCode::NotImplemented,
fmt::format("todo: not supported, {}", cfg_.data_type_));
}
build_index(field_datas);
}
template <typename T>
@ -211,84 +173,7 @@ InvertedIndexTantivy<T>::BuildV2(const Config& config) {
field_data->FillFieldData(col_data);
field_datas.push_back(field_data);
}
switch (cfg_.data_type_) {
case DataType::BOOL: {
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
wrapper_->add_data<bool>(static_cast<const bool*>(data->Data()),
n);
}
break;
}
case DataType::INT8: {
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
wrapper_->add_data<int8_t>(
static_cast<const int8_t*>(data->Data()), n);
}
break;
}
case DataType::INT16: {
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
wrapper_->add_data<int16_t>(
static_cast<const int16_t*>(data->Data()), n);
}
break;
}
case DataType::INT32: {
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
wrapper_->add_data<int32_t>(
static_cast<const int32_t*>(data->Data()), n);
}
break;
}
case DataType::INT64: {
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
wrapper_->add_data<int64_t>(
static_cast<const int64_t*>(data->Data()), n);
}
break;
}
case DataType::FLOAT: {
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
wrapper_->add_data<float>(
static_cast<const float*>(data->Data()), n);
}
break;
}
case DataType::DOUBLE: {
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
wrapper_->add_data<double>(
static_cast<const double*>(data->Data()), n);
}
break;
}
case DataType::VARCHAR: {
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
wrapper_->add_data<std::string>(
static_cast<const std::string*>(data->Data()), n);
}
break;
}
default:
PanicInfo(ErrorCode::NotImplemented,
fmt::format("todo: not supported, {}", cfg_.data_type_));
}
build_index(field_datas);
}
template <typename T>
@ -319,6 +204,25 @@ apply_hits(TargetBitmap& bitset, const RustArrayWrapper& w, bool v) {
}
}
// For each hit offset reported by tantivy, set its bit to the verdict of
// the caller-supplied predicate (true keeps the hit, false drops it).
inline void
apply_hits_with_filter(TargetBitmap& bitset,
                       const RustArrayWrapper& w,
                       const std::function<bool(size_t /* offset */)>& filter) {
    const auto hit_count = w.array_.len;
    for (size_t i = 0; i < hit_count; ++i) {
        const auto offset = w.array_.array[i];
        bitset[offset] = filter(offset);
    }
}
// Invoke `callback` once for every hit offset reported by tantivy,
// without touching any bitmap; the caller accumulates results itself.
inline void
apply_hits_with_callback(
    const RustArrayWrapper& w,
    const std::function<void(size_t /* offset */)>& callback) {
    const auto hit_count = w.array_.len;
    for (size_t i = 0; i < hit_count; ++i) {
        callback(w.array_.array[i]);
    }
}
template <typename T>
const TargetBitmap
InvertedIndexTantivy<T>::In(size_t n, const T* values) {
@ -330,10 +234,33 @@ InvertedIndexTantivy<T>::In(size_t n, const T* values) {
return bitset;
}
// Like In(), but a hit only sets its bit if it also passes `filter`.
// Runs one term query per requested value and post-filters the hits.
template <typename T>
const TargetBitmap
InvertedIndexTantivy<T>::InApplyFilter(
    size_t n, const T* values, const std::function<bool(size_t)>& filter) {
    TargetBitmap bitset(Count());
    for (size_t i = 0; i < n; ++i) {
        auto hits = wrapper_->term_query(values[i]);
        apply_hits_with_filter(bitset, hits, filter);
    }
    return bitset;
}
// Streaming variant of In(): instead of materializing a bitmap, forward
// every hit offset of every term query to `callback`.
template <typename T>
void
InvertedIndexTantivy<T>::InApplyCallback(
    size_t n, const T* values, const std::function<void(size_t)>& callback) {
    for (size_t i = 0; i < n; ++i) {
        auto hits = wrapper_->term_query(values[i]);
        apply_hits_with_callback(hits, callback);
    }
}
template <typename T>
const TargetBitmap
InvertedIndexTantivy<T>::NotIn(size_t n, const T* values) {
TargetBitmap bitset(Count(), true);
TargetBitmap bitset(Count());
bitset.set();
for (size_t i = 0; i < n; ++i) {
auto array = wrapper_->term_query(values[i]);
apply_hits(bitset, array, false);
@ -425,25 +352,118 @@ void
InvertedIndexTantivy<T>::BuildWithRawData(size_t n,
const void* values,
const Config& config) {
if constexpr (!std::is_same_v<T, std::string>) {
PanicInfo(Unsupported,
"InvertedIndex.BuildWithRawData only support string");
if constexpr (std::is_same_v<bool, T>) {
schema_.set_data_type(proto::schema::DataType::Bool);
}
if constexpr (std::is_same_v<int8_t, T>) {
schema_.set_data_type(proto::schema::DataType::Int8);
}
if constexpr (std::is_same_v<int16_t, T>) {
schema_.set_data_type(proto::schema::DataType::Int16);
}
if constexpr (std::is_same_v<int32_t, T>) {
schema_.set_data_type(proto::schema::DataType::Int32);
}
if constexpr (std::is_same_v<int64_t, T>) {
schema_.set_data_type(proto::schema::DataType::Int64);
}
if constexpr (std::is_same_v<float, T>) {
schema_.set_data_type(proto::schema::DataType::Float);
}
if constexpr (std::is_same_v<double, T>) {
schema_.set_data_type(proto::schema::DataType::Double);
}
if constexpr (std::is_same_v<std::string, T>) {
schema_.set_data_type(proto::schema::DataType::VarChar);
}
boost::uuids::random_generator generator;
auto uuid = generator();
auto prefix = boost::uuids::to_string(uuid);
path_ = fmt::format("/tmp/{}", prefix);
boost::filesystem::create_directories(path_);
d_type_ = get_tantivy_data_type(schema_);
std::string field = "test_inverted_index";
wrapper_ = std::make_shared<TantivyIndexWrapper>(
field.c_str(), d_type_, path_.c_str());
if (config.find("is_array") != config.end()) {
// only used in ut.
auto arr = static_cast<const boost::container::vector<T>*>(values);
for (size_t i = 0; i < n; i++) {
wrapper_->template add_multi_data(arr[i].data(), arr[i].size());
}
} else {
boost::uuids::random_generator generator;
auto uuid = generator();
auto prefix = boost::uuids::to_string(uuid);
path_ = fmt::format("/tmp/{}", prefix);
boost::filesystem::create_directories(path_);
cfg_ = TantivyConfig{
.data_type_ = DataType::VARCHAR,
};
d_type_ = cfg_.to_tantivy_data_type();
std::string field = "test_inverted_index";
wrapper_ = std::make_shared<TantivyIndexWrapper>(
field.c_str(), d_type_, path_.c_str());
wrapper_->add_data<std::string>(static_cast<const std::string*>(values),
n);
finish();
wrapper_->add_data<T>(static_cast<const T*>(values), n);
}
finish();
}
// Feed raw field data into the tantivy wrapper. Dispatches on the schema's
// declared data type (not on T): primitive and string columns are added
// directly as T, ARRAY columns go through the per-row element path.
template <typename T>
void
InvertedIndexTantivy<T>::build_index(
    const std::vector<std::shared_ptr<FieldDataBase>>& field_datas) {
    switch (schema_.data_type()) {
        case proto::schema::DataType::Bool:
        case proto::schema::DataType::Int8:
        case proto::schema::DataType::Int16:
        case proto::schema::DataType::Int32:
        case proto::schema::DataType::Int64:
        case proto::schema::DataType::Float:
        case proto::schema::DataType::Double:
        case proto::schema::DataType::String:
        case proto::schema::DataType::VarChar: {
            // Contiguous scalar data: add each field-data batch in one call.
            for (const auto& data : field_datas) {
                auto n = data->get_num_rows();
                wrapper_->add_data<T>(static_cast<const T*>(data->Data()), n);
            }
            break;
        }
        case proto::schema::DataType::Array: {
            build_index_for_array(field_datas);
            break;
        }
        default:
            PanicInfo(ErrorCode::NotImplemented,
                      fmt::format("Inverted index not supported on {}",
                                  schema_.data_type()));
    }
}
// Index an ARRAY column: every element of every row's array becomes a term
// attributed to that row, so In(term) later yields rows whose array
// contains the term.
template <typename T>
void
InvertedIndexTantivy<T>::build_index_for_array(
    const std::vector<std::shared_ptr<FieldDataBase>>& field_datas) {
    for (const auto& data : field_datas) {
        auto n = data->get_num_rows();
        auto array_column = static_cast<const Array*>(data->Data());
        for (int64_t i = 0; i < n; i++) {
            // Checked in release builds too (was a debug-only `assert`):
            // a mismatched element type would make the reinterpret_cast
            // below read garbage.
            AssertInfo(array_column[i].get_element_type() ==
                           static_cast<DataType>(schema_.element_type()),
                       "array element type mismatch with schema element type");
            wrapper_->template add_multi_data(
                reinterpret_cast<const T*>(array_column[i].data()),
                array_column[i].length());
        }
    }
}
// std::string specialization: array elements are variable-length, so they
// cannot be reinterpreted in place; copy each row's elements into a
// temporary vector before handing them to tantivy.
template <>
void
InvertedIndexTantivy<std::string>::build_index_for_array(
    const std::vector<std::shared_ptr<FieldDataBase>>& field_datas) {
    for (const auto& data : field_datas) {
        auto n = data->get_num_rows();
        auto array_column = static_cast<const Array*>(data->Data());
        for (int64_t i = 0; i < n; i++) {
            // Checked in release builds too (was a debug-only `assert`):
            // element-type mismatch would corrupt the extracted strings.
            AssertInfo(array_column[i].get_element_type() ==
                           static_cast<DataType>(schema_.element_type()),
                       "array element type mismatch with schema element type");
            std::vector<std::string> output;
            for (int64_t j = 0; j < array_column[i].length(); j++) {
                output.push_back(
                    array_column[i].template get_data<std::string>(j));
            }
            wrapper_->template add_multi_data(output.data(), output.size());
        }
    }
}

View File

@ -18,7 +18,6 @@
#include "tantivy-binding.h"
#include "tantivy-wrapper.h"
#include "index/StringIndex.h"
#include "index/TantivyConfig.h"
#include "storage/space.h"
namespace milvus::index {
@ -36,13 +35,11 @@ class InvertedIndexTantivy : public ScalarIndex<T> {
InvertedIndexTantivy() = default;
explicit InvertedIndexTantivy(const TantivyConfig& cfg,
const storage::FileManagerContext& ctx)
: InvertedIndexTantivy(cfg, ctx, nullptr) {
explicit InvertedIndexTantivy(const storage::FileManagerContext& ctx)
: InvertedIndexTantivy(ctx, nullptr) {
}
explicit InvertedIndexTantivy(const TantivyConfig& cfg,
const storage::FileManagerContext& ctx,
explicit InvertedIndexTantivy(const storage::FileManagerContext& ctx,
std::shared_ptr<milvus_storage::Space> space);
~InvertedIndexTantivy();
@ -114,6 +111,18 @@ class InvertedIndexTantivy : public ScalarIndex<T> {
const TargetBitmap
In(size_t n, const T* values) override;
const TargetBitmap
InApplyFilter(
size_t n,
const T* values,
const std::function<bool(size_t /* offset */)>& filter) override;
void
InApplyCallback(
size_t n,
const T* values,
const std::function<void(size_t /* offset */)>& callback) override;
const TargetBitmap
NotIn(size_t n, const T* values) override;
@ -160,11 +169,18 @@ class InvertedIndexTantivy : public ScalarIndex<T> {
void
finish();
void
build_index(const std::vector<std::shared_ptr<FieldDataBase>>& field_datas);
void
build_index_for_array(
const std::vector<std::shared_ptr<FieldDataBase>>& field_datas);
private:
std::shared_ptr<TantivyIndexWrapper> wrapper_;
TantivyConfig cfg_;
TantivyDataType d_type_;
std::string path_;
proto::schema::FieldSchema schema_;
/*
* To avoid IO amplification, we use both mem file manager & disk file manager

View File

@ -50,6 +50,20 @@ class ScalarIndex : public IndexBase {
virtual const TargetBitmap
In(size_t n, const T* values) = 0;
virtual const TargetBitmap
InApplyFilter(size_t n,
const T* values,
const std::function<bool(size_t /* offset */)>& filter) {
PanicInfo(ErrorCode::Unsupported, "InApplyFilter is not implemented");
}
virtual void
InApplyCallback(size_t n,
const T* values,
const std::function<void(size_t /* offset */)>& callback) {
PanicInfo(ErrorCode::Unsupported, "InApplyCallback is not implemented");
}
virtual const TargetBitmap
NotIn(size_t n, const T* values) = 0;

View File

@ -1,51 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include "storage/Types.h"
#include "tantivy-binding.h"
namespace milvus::index {
// Maps a milvus scalar DataType onto the tantivy storage type used by the
// inverted-index binding. Unsupported types panic.
struct TantivyConfig {
    DataType data_type_;

    TantivyDataType
    to_tantivy_data_type() {
        switch (data_type_) {
            case DataType::BOOL:
                return TantivyDataType::Bool;

            // All integer widths are widened to tantivy's i64 column.
            case DataType::INT8:
            case DataType::INT16:
            case DataType::INT32:
            case DataType::INT64:
                return TantivyDataType::I64;

            // Both float widths are stored as f64.
            case DataType::FLOAT:
            case DataType::DOUBLE:
                return TantivyDataType::F64;

            case DataType::VARCHAR:
                return TantivyDataType::Keyword;

            default:
                PanicInfo(
                    ErrorCode::NotImplemented,
                    fmt::format("not implemented data type: {}", data_type_));
        }
    }
};
} // namespace milvus::index

View File

@ -60,6 +60,7 @@ class IndexFactory {
case DataType::DOUBLE:
case DataType::VARCHAR:
case DataType::STRING:
case DataType::ARRAY:
return CreateScalarIndex(type, config, context);
case DataType::VECTOR_FLOAT:

View File

@ -84,29 +84,95 @@ CreateIndexV0(enum CDataType dtype,
return status;
}
// Translates the protobuf storage configuration passed over cgo into the
// C++ StorageConfig consumed by the chunk manager.
// Fix: `cloud_provider` was assigned twice (duplicate line removed).
milvus::storage::StorageConfig
get_storage_config(const milvus::proto::indexcgo::StorageConfig& config) {
    auto storage_config = milvus::storage::StorageConfig();
    storage_config.address = std::string(config.address());
    storage_config.bucket_name = std::string(config.bucket_name());
    storage_config.access_key_id = std::string(config.access_keyid());
    storage_config.access_key_value = std::string(config.secret_access_key());
    storage_config.root_path = std::string(config.root_path());
    storage_config.storage_type = std::string(config.storage_type());
    storage_config.cloud_provider = std::string(config.cloud_provider());
    storage_config.iam_endpoint = std::string(config.iamendpoint());
    storage_config.useSSL = config.usessl();
    storage_config.sslCACert = config.sslcacert();
    storage_config.useIAM = config.useiam();
    storage_config.region = config.region();
    storage_config.useVirtualHost = config.use_virtual_host();
    storage_config.requestTimeoutMs = config.request_timeout_ms();
    return storage_config;
}
// Collects the optional-field infos into an OptFieldT map keyed by field id:
// (field name, data type, list of data paths). Paths from repeated entries
// with the same field id are appended to the existing record.
milvus::OptFieldT
get_opt_field(const ::google::protobuf::RepeatedPtrField<
              milvus::proto::indexcgo::OptionalFieldInfo>& field_infos) {
    milvus::OptFieldT opt_fields_map;
    for (const auto& info : field_infos) {
        const auto fid = info.fieldid();
        if (opt_fields_map.count(fid) == 0) {
            opt_fields_map[fid] = {
                info.field_name(),
                static_cast<milvus::DataType>(info.field_type()),
                {}};
        }
        auto& paths = std::get<2>(opt_fields_map[fid]);
        for (const auto& p : info.data_paths()) {
            paths.emplace_back(p);
        }
    }
    return opt_fields_map;
}
// Flattens index params, type params and the insert-file list from the
// BuildIndexInfo proto into a single Config map; optional fields are only
// attached when present.
milvus::Config
get_config(std::unique_ptr<milvus::proto::indexcgo::BuildIndexInfo>& info) {
    milvus::Config config;
    for (const auto& param : info->index_params()) {
        config[param.key()] = param.value();
    }
    for (const auto& param : info->type_params()) {
        config[param.key()] = param.value();
    }
    config["insert_files"] = info->insert_files();
    if (!info->opt_fields().empty()) {
        config["opt_fields"] = get_opt_field(info->opt_fields());
    }
    return config;
}
CStatus
CreateIndex(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
CreateIndex(CIndex* res_index,
const uint8_t* serialized_build_index_info,
const uint64_t len) {
try {
auto build_index_info = (BuildIndexInfo*)c_build_index_info;
auto field_type = build_index_info->field_type;
auto build_index_info =
std::make_unique<milvus::proto::indexcgo::BuildIndexInfo>();
auto res =
build_index_info->ParseFromArray(serialized_build_index_info, len);
AssertInfo(res, "Unmarshall build index info failed");
auto field_type =
static_cast<DataType>(build_index_info->field_schema().data_type());
milvus::index::CreateIndexInfo index_info;
index_info.field_type = build_index_info->field_type;
auto& config = build_index_info->config;
config["insert_files"] = build_index_info->insert_files;
if (build_index_info->opt_fields.size()) {
config["opt_fields"] = build_index_info->opt_fields;
}
index_info.field_type = field_type;
auto storage_config =
get_storage_config(build_index_info->storage_config());
auto config = get_config(build_index_info);
// get index type
auto index_type = milvus::index::GetValueFromConfig<std::string>(
config, "index_type");
AssertInfo(index_type.has_value(), "index type is empty");
index_info.index_type = index_type.value();
auto engine_version = build_index_info->index_engine_version;
auto engine_version = build_index_info->current_index_version();
index_info.index_engine_version = engine_version;
config[milvus::index::INDEX_ENGINE_VERSION] =
std::to_string(engine_version);
@ -121,24 +187,31 @@ CreateIndex(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
// init file manager
milvus::storage::FieldDataMeta field_meta{
build_index_info->collection_id,
build_index_info->partition_id,
build_index_info->segment_id,
build_index_info->field_id};
build_index_info->collectionid(),
build_index_info->partitionid(),
build_index_info->segmentid(),
build_index_info->field_schema().fieldid(),
build_index_info->field_schema()};
milvus::storage::IndexMeta index_meta{build_index_info->segment_id,
build_index_info->field_id,
build_index_info->index_build_id,
build_index_info->index_version};
auto chunk_manager = milvus::storage::CreateChunkManager(
build_index_info->storage_config);
milvus::storage::IndexMeta index_meta{
build_index_info->segmentid(),
build_index_info->field_schema().fieldid(),
build_index_info->buildid(),
build_index_info->index_version(),
"",
build_index_info->field_schema().name(),
field_type,
build_index_info->dim(),
};
auto chunk_manager =
milvus::storage::CreateChunkManager(storage_config);
milvus::storage::FileManagerContext fileManagerContext(
field_meta, index_meta, chunk_manager);
auto index =
milvus::indexbuilder::IndexFactory::GetInstance().CreateIndex(
build_index_info->field_type, config, fileManagerContext);
field_type, config, fileManagerContext);
index->Build();
*res_index = index.release();
auto status = CStatus();
@ -159,22 +232,32 @@ CreateIndex(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
}
CStatus
CreateIndexV2(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
CreateIndexV2(CIndex* res_index,
const uint8_t* serialized_build_index_info,
const uint64_t len) {
try {
auto build_index_info = (BuildIndexInfo*)c_build_index_info;
auto field_type = build_index_info->field_type;
milvus::index::CreateIndexInfo index_info;
index_info.field_type = build_index_info->field_type;
index_info.dim = build_index_info->dim;
auto build_index_info =
std::make_unique<milvus::proto::indexcgo::BuildIndexInfo>();
auto res =
build_index_info->ParseFromArray(serialized_build_index_info, len);
AssertInfo(res, "Unmarshall build index info failed");
auto field_type =
static_cast<DataType>(build_index_info->field_schema().data_type());
auto& config = build_index_info->config;
milvus::index::CreateIndexInfo index_info;
index_info.field_type = field_type;
index_info.dim = build_index_info->dim();
auto storage_config =
get_storage_config(build_index_info->storage_config());
auto config = get_config(build_index_info);
// get index type
auto index_type = milvus::index::GetValueFromConfig<std::string>(
config, "index_type");
AssertInfo(index_type.has_value(), "index type is empty");
index_info.index_type = index_type.value();
auto engine_version = build_index_info->index_engine_version;
auto engine_version = build_index_info->current_index_version();
index_info.index_engine_version = engine_version;
config[milvus::index::INDEX_ENGINE_VERSION] =
std::to_string(engine_version);
@ -188,39 +271,39 @@ CreateIndexV2(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
}
milvus::storage::FieldDataMeta field_meta{
build_index_info->collection_id,
build_index_info->partition_id,
build_index_info->segment_id,
build_index_info->field_id};
build_index_info->collectionid(),
build_index_info->partitionid(),
build_index_info->segmentid(),
build_index_info->field_schema().fieldid()};
milvus::storage::IndexMeta index_meta{
build_index_info->segment_id,
build_index_info->field_id,
build_index_info->index_build_id,
build_index_info->index_version,
build_index_info->field_name,
build_index_info->segmentid(),
build_index_info->field_schema().fieldid(),
build_index_info->buildid(),
build_index_info->index_version(),
"",
build_index_info->field_type,
build_index_info->dim,
build_index_info->field_schema().name(),
field_type,
build_index_info->dim(),
};
auto store_space = milvus_storage::Space::Open(
build_index_info->data_store_path,
build_index_info->store_path(),
milvus_storage::Options{nullptr,
build_index_info->data_store_version});
build_index_info->store_version()});
AssertInfo(store_space.ok() && store_space.has_value(),
"create space failed: {}",
store_space.status().ToString());
auto index_space = milvus_storage::Space::Open(
build_index_info->index_store_path,
build_index_info->index_store_path(),
milvus_storage::Options{.schema = store_space.value()->schema()});
AssertInfo(index_space.ok() && index_space.has_value(),
"create space failed: {}",
index_space.status().ToString());
LOG_INFO("init space success");
auto chunk_manager = milvus::storage::CreateChunkManager(
build_index_info->storage_config);
auto chunk_manager =
milvus::storage::CreateChunkManager(storage_config);
milvus::storage::FileManagerContext fileManagerContext(
field_meta,
index_meta,
@ -229,9 +312,9 @@ CreateIndexV2(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
auto index =
milvus::indexbuilder::IndexFactory::GetInstance().CreateIndex(
build_index_info->field_type,
build_index_info->field_name,
build_index_info->dim,
field_type,
build_index_info->field_schema().name(),
build_index_info->dim(),
config,
fileManagerContext,
std::move(store_space.value()));

View File

@ -28,7 +28,9 @@ CreateIndexV0(enum CDataType dtype,
CIndex* res_index);
CStatus
CreateIndex(CIndex* res_index, CBuildIndexInfo c_build_index_info);
CreateIndex(CIndex* res_index,
const uint8_t* serialized_build_index_info,
const uint64_t len);
CStatus
DeleteIndex(CIndex index);
@ -130,7 +132,9 @@ CStatus
SerializeIndexAndUpLoadV2(CIndex index, CBinarySet* c_binary_set);
CStatus
CreateIndexV2(CIndex* res_index, CBuildIndexInfo c_build_index_info);
CreateIndexV2(CIndex* res_index,
const uint8_t* serialized_build_index_info,
const uint64_t len);
CStatus
AppendIndexStorageInfo(CBuildIndexInfo c_build_index_info,

View File

@ -11,12 +11,10 @@
find_package(Protobuf REQUIRED)
file(GLOB_RECURSE milvus_proto_srcs
"${CMAKE_CURRENT_SOURCE_DIR}/*.cc")
add_library(milvus_proto STATIC
common.pb.cc
index_cgo_msg.pb.cc
plan.pb.cc
schema.pb.cc
segcore.pb.cc
${milvus_proto_srcs}
)
message(STATUS "milvus proto sources: " ${milvus_proto_srcs})

View File

@ -46,6 +46,7 @@ struct LoadIndexInfo {
std::string uri;
int64_t index_store_version;
IndexVersion index_engine_version;
proto::schema::FieldSchema schema;
};
} // namespace milvus::segcore

View File

@ -25,6 +25,7 @@
#include "storage/Util.h"
#include "storage/RemoteChunkManagerSingleton.h"
#include "storage/LocalChunkManagerSingleton.h"
#include "pb/cgo_msg.pb.h"
bool
IsLoadWithDisk(const char* index_type, int index_engine_version) {
@ -258,7 +259,8 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) {
load_index_info->collection_id,
load_index_info->partition_id,
load_index_info->segment_id,
load_index_info->field_id};
load_index_info->field_id,
load_index_info->schema};
milvus::storage::IndexMeta index_meta{load_index_info->segment_id,
load_index_info->field_id,
load_index_info->index_build_id,
@ -484,3 +486,50 @@ AppendStorageInfo(CLoadIndexInfo c_load_index_info,
load_index_info->uri = uri;
load_index_info->index_store_version = version;
}
// Deserializes a cgo LoadIndexInfo protobuf and copies every field into the
// C++ LoadIndexInfo struct referenced by the opaque handle.
// Returns Success, or UnexpectedError with a strdup'd message on failure.
// Fix: the ParseFromArray result was ignored, so a malformed buffer silently
// produced an empty LoadIndexInfo; now asserted like in CreateIndex.
CStatus
FinishLoadIndexInfo(CLoadIndexInfo c_load_index_info,
                    const uint8_t* serialized_load_index_info,
                    const uint64_t len) {
    try {
        auto info_proto = std::make_unique<milvus::proto::cgo::LoadIndexInfo>();
        auto res =
            info_proto->ParseFromArray(serialized_load_index_info, len);
        AssertInfo(res, "Unmarshall load index info failed");
        auto load_index_info =
            static_cast<milvus::segcore::LoadIndexInfo*>(c_load_index_info);
        // TODO: keep this since LoadIndexInfo is used by SegmentSealed.
        {
            load_index_info->collection_id = info_proto->collectionid();
            load_index_info->partition_id = info_proto->partitionid();
            load_index_info->segment_id = info_proto->segmentid();
            load_index_info->field_id = info_proto->field().fieldid();
            load_index_info->field_type =
                static_cast<milvus::DataType>(info_proto->field().data_type());
            load_index_info->enable_mmap = info_proto->enable_mmap();
            load_index_info->mmap_dir_path = info_proto->mmap_dir_path();
            load_index_info->index_id = info_proto->indexid();
            load_index_info->index_build_id = info_proto->index_buildid();
            load_index_info->index_version = info_proto->index_version();
            for (const auto& [k, v] : info_proto->index_params()) {
                load_index_info->index_params[k] = v;
            }
            load_index_info->index_files.assign(
                info_proto->index_files().begin(),
                info_proto->index_files().end());
            load_index_info->uri = info_proto->uri();
            load_index_info->index_store_version =
                info_proto->index_store_version();
            load_index_info->index_engine_version =
                info_proto->index_engine_version();
            load_index_info->schema = info_proto->field();
        }
        auto status = CStatus();
        status.error_code = milvus::Success;
        status.error_msg = "";
        return status;
    } catch (std::exception& e) {
        auto status = CStatus();
        status.error_code = milvus::UnexpectedError;
        status.error_msg = strdup(e.what());
        return status;
    }
}

View File

@ -76,6 +76,11 @@ void
AppendStorageInfo(CLoadIndexInfo c_load_index_info,
const char* uri,
int64_t version);
CStatus
FinishLoadIndexInfo(CLoadIndexInfo c_load_index_info,
const uint8_t* serialized_load_index_info,
const uint64_t len);
#ifdef __cplusplus
}
#endif

View File

@ -64,6 +64,7 @@ struct FieldDataMeta {
int64_t partition_id;
int64_t segment_id;
int64_t field_id;
proto::schema::FieldSchema schema;
};
enum CodecType {

View File

@ -71,3 +71,9 @@ target_link_libraries(bench_tantivy
boost_filesystem
dl
)
add_executable(ffi_demo ffi_demo.cpp)
target_link_libraries(ffi_demo
tantivy_binding
dl
)

View File

@ -0,0 +1,17 @@
#include <string>
#include <vector>
#include "tantivy-binding.h"
// Tiny FFI smoke test: hand a vector of C strings across the Rust boundary
// and let the binding print them.
int
main(int argc, char* argv[]) {
    std::vector<std::string> data{"data1", "data2", "data3"};
    std::vector<const char*> c_strs;
    c_strs.reserve(data.size());
    for (const auto& s : data) {
        c_strs.push_back(s.c_str());
    }
    print_vector_of_strings(c_strs.data(), c_strs.size());
    return 0;
}

View File

@ -97,6 +97,24 @@ void tantivy_index_add_bools(void *ptr, const bool *array, uintptr_t len);
void tantivy_index_add_keyword(void *ptr, const char *s);
void tantivy_index_add_multi_int8s(void *ptr, const int8_t *array, uintptr_t len);
void tantivy_index_add_multi_int16s(void *ptr, const int16_t *array, uintptr_t len);
void tantivy_index_add_multi_int32s(void *ptr, const int32_t *array, uintptr_t len);
void tantivy_index_add_multi_int64s(void *ptr, const int64_t *array, uintptr_t len);
void tantivy_index_add_multi_f32s(void *ptr, const float *array, uintptr_t len);
void tantivy_index_add_multi_f64s(void *ptr, const double *array, uintptr_t len);
void tantivy_index_add_multi_bools(void *ptr, const bool *array, uintptr_t len);
void tantivy_index_add_multi_keywords(void *ptr, const char *const *array, uintptr_t len);
bool tantivy_index_exist(const char *path);
void print_vector_of_strings(const char *const *ptr, uintptr_t len);
} // extern "C"

View File

@ -0,0 +1,14 @@
use std::{ffi::{c_char, CStr}, slice};
/// FFI demo entry point: prints every NUL-terminated C string in the given
/// array. `ptr` must reference `len` valid, NUL-terminated strings.
#[no_mangle]
pub extern "C" fn print_vector_of_strings(ptr: *const *const c_char, len: usize) {
    let strings = unsafe { slice::from_raw_parts(ptr, len) };
    for &raw in strings {
        let text = unsafe { CStr::from_ptr(raw) };
        println!("{}", text.to_str().unwrap());
    }
}

View File

@ -1,10 +1,11 @@
use futures::executor::block_on;
use std::ffi::CStr;
use libc::c_char;
use tantivy::schema::{Field, IndexRecordOption, Schema, TextFieldIndexing, TextOptions, INDEXED};
use tantivy::{doc, tokenizer, Index, IndexWriter, SingleSegmentIndexWriter};
use tantivy::{doc, tokenizer, Index, SingleSegmentIndexWriter, Document};
use crate::data_type::TantivyDataType;
use crate::index_writer;
use crate::log::init_log;
pub struct IndexWriterWrapper {
@ -98,7 +99,74 @@ impl IndexWriterWrapper {
.unwrap();
}
pub fn finish(mut self) {
pub fn add_multi_i8s(&mut self, datas: &[i8]) {
let mut document = Document::default();
for data in datas {
document.add_field_value(self.field, *data as i64);
}
self.index_writer.add_document(document).unwrap();
}
pub fn add_multi_i16s(&mut self, datas: &[i16]) {
let mut document = Document::default();
for data in datas {
document.add_field_value(self.field, *data as i64);
}
self.index_writer.add_document(document).unwrap();
}
pub fn add_multi_i32s(&mut self, datas: &[i32]) {
let mut document = Document::default();
for data in datas {
document.add_field_value(self.field, *data as i64);
}
self.index_writer.add_document(document).unwrap();
}
pub fn add_multi_i64s(&mut self, datas: &[i64]) {
let mut document = Document::default();
for data in datas {
document.add_field_value(self.field, *data);
}
self.index_writer.add_document(document).unwrap();
}
pub fn add_multi_f32s(&mut self, datas: &[f32]) {
let mut document = Document::default();
for data in datas {
document.add_field_value(self.field, *data as f64);
}
self.index_writer.add_document(document).unwrap();
}
pub fn add_multi_f64s(&mut self, datas: &[f64]) {
let mut document = Document::default();
for data in datas {
document.add_field_value(self.field, *data);
}
self.index_writer.add_document(document).unwrap();
}
pub fn add_multi_bools(&mut self, datas: &[bool]) {
let mut document = Document::default();
for data in datas {
document.add_field_value(self.field, *data);
}
self.index_writer.add_document(document).unwrap();
}
// Index one array row of keywords: all values land in the same field of a
// single tantivy document. Each pointer must be a valid NUL-terminated UTF-8
// C string.
pub fn add_multi_keywords(&mut self, datas: &[*const c_char]) {
    let mut document = Document::default();
    for &raw in datas {
        let keyword = unsafe { CStr::from_ptr(raw) };
        document.add_field_value(self.field, keyword.to_str().unwrap());
    }
    self.index_writer.add_document(document).unwrap();
}
pub fn finish(self) {
self.index_writer
.finalize()
.expect("failed to build inverted index");

View File

@ -122,3 +122,77 @@ pub extern "C" fn tantivy_index_add_keyword(ptr: *mut c_void, s: *const c_char)
let c_str = unsafe { CStr::from_ptr(s) };
unsafe { (*real).add_keyword(c_str.to_str().unwrap()) }
}
// --------------------------------------------- array ------------------------------------------
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_int8s(ptr: *mut c_void, array: *const i8, len: usize) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let arr = slice::from_raw_parts(array, len);
(*real).add_multi_i8s(arr)
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_int16s(ptr: *mut c_void, array: *const i16, len: usize) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let arr = slice::from_raw_parts(array, len) ;
(*real).add_multi_i16s(arr);
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_int32s(ptr: *mut c_void, array: *const i32, len: usize) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let arr = slice::from_raw_parts(array, len) ;
(*real).add_multi_i32s(arr);
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_int64s(ptr: *mut c_void, array: *const i64, len: usize) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let arr = slice::from_raw_parts(array, len) ;
(*real).add_multi_i64s(arr);
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_f32s(ptr: *mut c_void, array: *const f32, len: usize) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let arr = slice::from_raw_parts(array, len) ;
(*real).add_multi_f32s(arr);
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_f64s(ptr: *mut c_void, array: *const f64, len: usize) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let arr = slice::from_raw_parts(array, len) ;
(*real).add_multi_f64s(arr);
}
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_bools(ptr: *mut c_void, array: *const bool, len: usize) {
let real = ptr as *mut IndexWriterWrapper;
unsafe {
let arr = slice::from_raw_parts(array, len) ;
(*real).add_multi_bools(arr);
}
}
/// C ABI shim: reborrow the raw writer pointer and forward the keyword array
/// to `IndexWriterWrapper::add_multi_keywords`.
#[no_mangle]
pub extern "C" fn tantivy_index_add_multi_keywords(ptr: *mut c_void, array: *const *const c_char, len: usize) {
    let writer = ptr as *mut IndexWriterWrapper;
    unsafe {
        let keywords = slice::from_raw_parts(array, len);
        (*writer).add_multi_keywords(keywords)
    }
}

View File

@ -10,6 +10,7 @@ mod log;
mod util;
mod util_c;
mod vec_collector;
mod demo_c;
pub fn add(left: usize, right: usize) -> usize {
left + right

View File

@ -1,5 +1,7 @@
#include <sstream>
#include <fmt/format.h>
#include <set>
#include <iostream>
#include "tantivy-binding.h"
namespace milvus::tantivy {
@ -186,6 +188,60 @@ struct TantivyIndexWrapper {
typeid(T).name());
}
// Adds one array row (a multi-valued document) to the tantivy index,
// dispatching at compile time on the element type T.
// NOTE(review): the fallback throws a std::string (fmt::format result), not
// a std::exception subclass — kept as-is to match the sibling add_data.
template <typename T>
void
add_multi_data(const T* array, uintptr_t len) {
    assert(!finished_);

    if constexpr (std::is_same_v<T, bool>) {
        tantivy_index_add_multi_bools(writer_, array, len);
        return;
    }
    if constexpr (std::is_same_v<T, int8_t>) {
        tantivy_index_add_multi_int8s(writer_, array, len);
        return;
    }
    if constexpr (std::is_same_v<T, int16_t>) {
        tantivy_index_add_multi_int16s(writer_, array, len);
        return;
    }
    if constexpr (std::is_same_v<T, int32_t>) {
        tantivy_index_add_multi_int32s(writer_, array, len);
        return;
    }
    if constexpr (std::is_same_v<T, int64_t>) {
        tantivy_index_add_multi_int64s(writer_, array, len);
        return;
    }
    if constexpr (std::is_same_v<T, float>) {
        tantivy_index_add_multi_f32s(writer_, array, len);
        return;
    }
    if constexpr (std::is_same_v<T, double>) {
        tantivy_index_add_multi_f64s(writer_, array, len);
        return;
    }
    if constexpr (std::is_same_v<T, std::string>) {
        std::vector<const char*> views;
        // Fix: reserve up front so collecting the C-string views does not
        // reallocate.
        views.reserve(len);
        for (uintptr_t i = 0; i < len; i++) {
            views.push_back(array[i].c_str());
        }
        tantivy_index_add_multi_keywords(writer_, views.data(), len);
        return;
    }
    throw fmt::format(
        "InvertedIndex.add_multi_data: unsupported data type: {}",
        typeid(T).name());
}
inline void
finish() {
if (!finished_) {

View File

@ -200,6 +200,83 @@ test_32717() {
}
}
// Collects the hit offsets held in a RustArrayWrapper into a std::set for
// order-independent comparison against the reference posting list.
std::set<uint32_t>
to_set(const RustArrayWrapper& w) {
    std::set<uint32_t> s;
    s.insert(w.array_.array, w.array_.array + w.array_.len);
    return s;
}
// Builds the reference inverted index: maps each term to the set of row ids
// (offsets into vec_of_array) whose array contains that term.
template <typename T>
std::map<T, std::set<uint32_t>>
build_inverted_index(const std::vector<std::vector<T>>& vec_of_array) {
    std::map<T, std::set<uint32_t>> postings;
    uint32_t row_id = 0;
    for (const auto& row : vec_of_array) {
        for (const auto& term : row) {
            postings[term].insert(row_id);
        }
        ++row_id;
    }
    return postings;
}
void
test_array_int() {
using T = int64_t;
auto path = "/tmp/inverted-index/test-binding/";
boost::filesystem::remove_all(path);
boost::filesystem::create_directories(path);
auto w = TantivyIndexWrapper("test_field_name", guess_data_type<T>(), path);
std::vector<std::vector<T>> vec_of_array{
{10, 40, 50},
{20, 50},
{10, 50, 60},
};
for (const auto& arr : vec_of_array) {
w.add_multi_data(arr.data(), arr.size());
}
w.finish();
assert(w.count() == vec_of_array.size());
auto inverted_index = build_inverted_index(vec_of_array);
for (const auto& [term, posting_list] : inverted_index) {
auto hits = to_set(w.term_query(term));
assert(posting_list == hits);
}
}
void
test_array_string() {
using T = std::string;
auto path = "/tmp/inverted-index/test-binding/";
boost::filesystem::remove_all(path);
boost::filesystem::create_directories(path);
auto w =
TantivyIndexWrapper("test_field_name", TantivyDataType::Keyword, path);
std::vector<std::vector<T>> vec_of_array{
{"10", "40", "50"},
{"20", "50"},
{"10", "50", "60"},
};
for (const auto& arr : vec_of_array) {
w.add_multi_data(arr.data(), arr.size());
}
w.finish();
assert(w.count() == vec_of_array.size());
auto inverted_index = build_inverted_index(vec_of_array);
for (const auto& [term, posting_list] : inverted_index) {
auto hits = to_set(w.term_query(term));
assert(posting_list == hits);
}
}
int
main(int argc, char* argv[]) {
test_32717();
@ -216,5 +293,8 @@ main(int argc, char* argv[]) {
run<std::string>();
test_array_int();
test_array_string();
return 0;
}

View File

@ -66,6 +66,7 @@ set(MILVUS_TEST_FILES
test_group_by.cpp
test_regex_query_util.cpp
test_regex_query.cpp
test_array_inverted_index.cpp
)
if ( BUILD_DISK_ANN STREQUAL "ON" )

View File

@ -0,0 +1,297 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <gtest/gtest.h>
#include <regex>
#include "pb/plan.pb.h"
#include "index/InvertedIndexTantivy.h"
#include "common/Schema.h"
#include "segcore/SegmentSealedImpl.h"
#include "test_utils/DataGen.h"
#include "test_utils/GenExprProto.h"
#include "query/PlanProto.h"
#include "query/generated/ExecPlanNodeVisitor.h"
using namespace milvus;
using namespace milvus::query;
using namespace milvus::segcore;
// Builds the test schema: a 16-dim float vector, an int64 primary key, and a
// single "array" field whose element type is selected by the test parameter T.
template <typename T>
SchemaPtr
GenTestSchema() {
    auto schema = std::make_shared<Schema>();
    schema->AddDebugField(
        "fvec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2);
    auto pk = schema->AddDebugField("pk", DataType::INT64);
    schema->set_primary_field_id(pk);
    if constexpr (std::is_same_v<T, bool>) {
        schema->AddDebugArrayField("array", DataType::BOOL);
    } else if constexpr (std::is_same_v<T, int8_t>) {
        schema->AddDebugArrayField("array", DataType::INT8);
    } else if constexpr (std::is_same_v<T, int16_t>) {
        schema->AddDebugArrayField("array", DataType::INT16);
    } else if constexpr (std::is_same_v<T, int32_t>) {
        schema->AddDebugArrayField("array", DataType::INT32);
    } else if constexpr (std::is_same_v<T, int64_t>) {
        schema->AddDebugArrayField("array", DataType::INT64);
    } else if constexpr (std::is_same_v<T, float>) {
        schema->AddDebugArrayField("array", DataType::FLOAT);
    } else if constexpr (std::is_same_v<T, double>) {
        schema->AddDebugArrayField("array", DataType::DOUBLE);
    } else if constexpr (std::is_same_v<T, std::string>) {
        schema->AddDebugArrayField("array", DataType::VARCHAR);
    }
    return schema;
}
// Typed fixture: generates N_ rows of array data, keeps a plain C++ copy in
// vec_of_array_ as the brute-force reference, loads the rows into a sealed
// segment and builds an inverted index over the array field.
template <typename T>
class ArrayInvertedIndexTest : public ::testing::Test {
 public:
    void
    SetUp() override {
        schema_ = GenTestSchema<T>();
        seg_ = CreateSealedSegment(schema_);
        N_ = 3000;
        uint64_t seed = 19190504;
        auto raw_data = DataGen(schema_, N_, seed);
        // Proto array column of the "array" field; element payloads live in
        // type-specific sub-messages (bool_data / long_data / int_data / ...).
        auto array_col =
            raw_data.get_col(schema_->get_field_id(FieldName("array")))
                ->scalars()
                .array_data()
                .data();
        for (size_t i = 0; i < N_; i++) {
            boost::container::vector<T> array;
            if constexpr (std::is_same_v<T, bool>) {
                for (size_t j = 0; j < array_col[i].bool_data().data_size();
                     j++) {
                    array.push_back(array_col[i].bool_data().data(j));
                }
            } else if constexpr (std::is_same_v<T, int64_t>) {
                // int64 is checked before the generic integral branch because
                // it is stored in long_data rather than int_data.
                for (size_t j = 0; j < array_col[i].long_data().data_size();
                     j++) {
                    array.push_back(array_col[i].long_data().data(j));
                }
            } else if constexpr (std::is_integral_v<T>) {
                for (size_t j = 0; j < array_col[i].int_data().data_size();
                     j++) {
                    array.push_back(array_col[i].int_data().data(j));
                }
            } else if constexpr (std::is_floating_point_v<T>) {
                // NOTE(review): both float and double read float_data here —
                // confirm DataGen stores doubles there for DOUBLE arrays.
                for (size_t j = 0; j < array_col[i].float_data().data_size();
                     j++) {
                    array.push_back(array_col[i].float_data().data(j));
                }
            } else if constexpr (std::is_same_v<T, std::string>) {
                for (size_t j = 0; j < array_col[i].string_data().data_size();
                     j++) {
                    array.push_back(array_col[i].string_data().data(j));
                }
            }
            vec_of_array_.push_back(array);
        }
        SealedLoadFieldData(raw_data, *seg_);
        LoadInvertedIndex();
    }

    void
    TearDown() override {
    }

    // Builds an InvertedIndexTantivy over the reference arrays (is_array=true)
    // and attaches it to the sealed segment so expressions run in index mode.
    void
    LoadInvertedIndex() {
        auto index = std::make_unique<index::InvertedIndexTantivy<T>>();
        Config cfg;
        cfg["is_array"] = true;
        index->BuildWithRawData(N_, vec_of_array_.data(), cfg);
        LoadIndexInfo info{
            .field_id = schema_->get_field_id(FieldName("array")).get(),
            .index = std::move(index),
        };
        seg_->LoadIndex(info);
    }

 public:
    SchemaPtr schema_;       // test schema containing the array field
    SegmentSealedUPtr seg_;  // sealed segment under test
    int64_t N_;              // number of generated rows
    // Reference copy of the generated arrays, indexed by row offset.
    std::vector<boost::container::vector<T>> vec_of_array_;
};
TYPED_TEST_SUITE_P(ArrayInvertedIndexTest);
// ContainsAny over the elements of the first generated row: a row matches
// iff it shares at least one element with vec_of_array_[0]. The plan is
// executed through the loaded inverted index and compared row-by-row against
// a brute-force reference.
TYPED_TEST_P(ArrayInvertedIndexTest, ArrayContainsAny) {
    const auto& meta = this->schema_->operator[](FieldName("array"));
    auto column_info = test::GenColumnInfo(
        meta.get_id().get(),
        static_cast<proto::schema::DataType>(meta.get_data_type()),
        false,
        false,
        static_cast<proto::schema::DataType>(meta.get_element_type()));
    auto contains_expr = std::make_unique<proto::plan::JSONContainsExpr>();
    contains_expr->set_allocated_column_info(column_info);
    contains_expr->set_op(proto::plan::JSONContainsExpr_JSONOp::
                              JSONContainsExpr_JSONOp_ContainsAny);
    contains_expr->set_elements_same_type(true);
    // Use the first generated row as the query elements.
    for (const auto& elem : this->vec_of_array_[0]) {
        auto t = test::GenGenericValue(elem);
        contains_expr->mutable_elements()->AddAllocated(t);
    }
    auto expr = test::GenExpr();
    expr->set_allocated_json_contains_expr(contains_expr.release());
    auto parser = ProtoParser(*this->schema_);
    auto typed_expr = parser.ParseExprs(*expr);
    auto parsed =
        std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, typed_expr);
    auto segpromote = dynamic_cast<SegmentSealedImpl*>(this->seg_.get());
    query::ExecPlanNodeVisitor visitor(*segpromote, MAX_TIMESTAMP);
    BitsetType final;
    visitor.ExecuteExprNode(parsed, segpromote, this->N_, final);
    // Brute-force reference: does row `offset` intersect the query elements?
    std::unordered_set<TypeParam> elems(this->vec_of_array_[0].begin(),
                                        this->vec_of_array_[0].end());
    auto ref = [this, &elems](size_t offset) -> bool {
        std::unordered_set<TypeParam> row(this->vec_of_array_[offset].begin(),
                                          this->vec_of_array_[offset].end());
        for (const auto& elem : elems) {
            if (row.find(elem) != row.end()) {
                return true;
            }
        }
        return false;
    };
    ASSERT_EQ(final.size(), this->N_);
    for (size_t i = 0; i < this->N_; i++) {
        ASSERT_EQ(final[i], ref(i)) << "i: " << i << ", final[i]: " << final[i]
                                    << ", ref(i): " << ref(i);
    }
}
// Verifies that an inverted-index-backed ARRAY field evaluates
// `json_contains_all` correctly: a row matches only when its array contains
// every element of the probe set (the elements of row 0). Set semantics —
// duplicate elements are not counted.
TYPED_TEST_P(ArrayInvertedIndexTest, ArrayContainsAll) {
const auto& meta = this->schema_->operator[](FieldName("array"));
// Build the plan ColumnInfo for the ARRAY field, carrying the element type.
auto column_info = test::GenColumnInfo(
meta.get_id().get(),
static_cast<proto::schema::DataType>(meta.get_data_type()),
false,
false,
static_cast<proto::schema::DataType>(meta.get_element_type()));
auto contains_expr = std::make_unique<proto::plan::JSONContainsExpr>();
// set_allocated_* / AddAllocated transfer ownership into the message.
contains_expr->set_allocated_column_info(column_info);
contains_expr->set_op(proto::plan::JSONContainsExpr_JSONOp::
JSONContainsExpr_JSONOp_ContainsAll);
contains_expr->set_elements_same_type(true);
// The probe elements are the values of the first generated row.
for (const auto& elem : this->vec_of_array_[0]) {
auto t = test::GenGenericValue(elem);
contains_expr->mutable_elements()->AddAllocated(t);
}
auto expr = test::GenExpr();
expr->set_allocated_json_contains_expr(contains_expr.release());
// Parse the proto plan into a typed expression and wrap it in a filter node.
auto parser = ProtoParser(*this->schema_);
auto typed_expr = parser.ParseExprs(*expr);
auto parsed =
std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, typed_expr);
auto segpromote = dynamic_cast<SegmentSealedImpl*>(this->seg_.get());
query::ExecPlanNodeVisitor visitor(*segpromote, MAX_TIMESTAMP);
BitsetType final;
// Execute the filter; `final` receives one bit per row of the segment.
visitor.ExecuteExprNode(parsed, segpromote, this->N_, final);
std::unordered_set<TypeParam> elems(this->vec_of_array_[0].begin(),
this->vec_of_array_[0].end());
// Brute-force reference: row matches iff no probe element is missing.
auto ref = [this, &elems](size_t offset) -> bool {
std::unordered_set<TypeParam> row(this->vec_of_array_[offset].begin(),
this->vec_of_array_[offset].end());
for (const auto& elem : elems) {
if (row.find(elem) == row.end()) {
return false;
}
}
return true;
};
// The index-backed result must agree with the reference on every row.
ASSERT_EQ(final.size(), this->N_);
for (size_t i = 0; i < this->N_; i++) {
ASSERT_EQ(final[i], ref(i)) << "i: " << i << ", final[i]: " << final[i]
<< ", ref(i): " << ref(i);
}
}
// Verifies whole-array equality (UnaryRangeExpr with OpType::Equal) against
// the inverted index: a row matches only when its array is element-wise
// identical (same length, same order) to row 0's array.
TYPED_TEST_P(ArrayInvertedIndexTest, ArrayEqual) {
// Exact equality on floats is not meaningful; skip those instantiations.
if (std::is_floating_point_v<TypeParam>) {
GTEST_SKIP() << "not accurate to perform equal comparison on floating "
"point number";
}
const auto& meta = this->schema_->operator[](FieldName("array"));
// Build the plan ColumnInfo for the ARRAY field, carrying the element type.
auto column_info = test::GenColumnInfo(
meta.get_id().get(),
static_cast<proto::schema::DataType>(meta.get_data_type()),
false,
false,
static_cast<proto::schema::DataType>(meta.get_element_type()));
auto unary_range_expr = std::make_unique<proto::plan::UnaryRangeExpr>();
// set_allocated_* transfers ownership of the raw pointer into the message.
unary_range_expr->set_allocated_column_info(column_info);
unary_range_expr->set_op(proto::plan::OpType::Equal);
// The comparison value is an ArrayVal built from row 0's elements;
// ownership passes to unary_range_expr via set_allocated_value below.
auto arr = new proto::plan::GenericValue;
arr->mutable_array_val()->set_element_type(
static_cast<proto::schema::DataType>(meta.get_element_type()));
arr->mutable_array_val()->set_same_type(true);
for (const auto& elem : this->vec_of_array_[0]) {
auto e = test::GenGenericValue(elem);
arr->mutable_array_val()->mutable_array()->AddAllocated(e);
}
unary_range_expr->set_allocated_value(arr);
auto expr = test::GenExpr();
expr->set_allocated_unary_range_expr(unary_range_expr.release());
// Parse the proto plan into a typed expression and wrap it in a filter node.
auto parser = ProtoParser(*this->schema_);
auto typed_expr = parser.ParseExprs(*expr);
auto parsed =
std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, typed_expr);
auto segpromote = dynamic_cast<SegmentSealedImpl*>(this->seg_.get());
query::ExecPlanNodeVisitor visitor(*segpromote, MAX_TIMESTAMP);
BitsetType final;
// Execute the filter; `final` receives one bit per row of the segment.
visitor.ExecuteExprNode(parsed, segpromote, this->N_, final);
// Brute-force reference: length check first, then ordered element-wise
// comparison against row 0.
auto ref = [this](size_t offset) -> bool {
if (this->vec_of_array_[0].size() !=
this->vec_of_array_[offset].size()) {
return false;
}
auto size = this->vec_of_array_[0].size();
for (size_t i = 0; i < size; i++) {
if (this->vec_of_array_[0][i] != this->vec_of_array_[offset][i]) {
return false;
}
}
return true;
};
// The index-backed result must agree with the reference on every row.
ASSERT_EQ(final.size(), this->N_);
for (size_t i = 0; i < this->N_; i++) {
ASSERT_EQ(final[i], ref(i)) << "i: " << i << ", final[i]: " << final[i]
<< ", ref(i): " << ref(i);
}
}
// Every element type an ARRAY field can hold; the typed suite above runs
// once per type in this list.
using ElementType = testing::
    Types<bool, int8_t, int16_t, int32_t, int64_t, float, double, std::string>;
// Use REGISTER_TYPED_TEST_SUITE_P for consistency with TYPED_TEST_SUITE_P /
// INSTANTIATE_TYPED_TEST_SUITE_P used in this file;
// REGISTER_TYPED_TEST_CASE_P is the deprecated googletest spelling.
REGISTER_TYPED_TEST_SUITE_P(ArrayInvertedIndexTest,
                            ArrayContainsAny,
                            ArrayContainsAll,
                            ArrayEqual);
INSTANTIATE_TYPED_TEST_SUITE_P(Naive, ArrayInvertedIndexTest, ElementType);

View File

@ -23,7 +23,7 @@
using namespace milvus;
using namespace milvus::segcore;
using namespace milvus::proto::indexcgo;
using namespace milvus::proto;
using Param = std::pair<knowhere::IndexType, knowhere::MetricType>;

View File

@ -25,20 +25,25 @@
using namespace milvus;
// TODO: I would suggest that our all indexes use this test to simulate the real production environment.
namespace milvus::test {
auto
gen_field_meta(int64_t collection_id = 1,
int64_t partition_id = 2,
int64_t segment_id = 3,
int64_t field_id = 101) -> storage::FieldDataMeta {
return storage::FieldDataMeta{
int64_t field_id = 101,
DataType data_type = DataType::NONE,
DataType element_type = DataType::NONE)
-> storage::FieldDataMeta {
auto meta = storage::FieldDataMeta{
.collection_id = collection_id,
.partition_id = partition_id,
.segment_id = segment_id,
.field_id = field_id,
};
meta.schema.set_data_type(static_cast<proto::schema::DataType>(data_type));
meta.schema.set_element_type(
static_cast<proto::schema::DataType>(element_type));
return meta;
}
auto
@ -86,7 +91,7 @@ struct ChunkManagerWrapper {
};
} // namespace milvus::test
template <typename T, DataType dtype>
template <typename T, DataType dtype, DataType element_type = DataType::NONE>
void
test_run() {
int64_t collection_id = 1;
@ -96,8 +101,8 @@ test_run() {
int64_t index_build_id = 1000;
int64_t index_version = 10000;
auto field_meta =
test::gen_field_meta(collection_id, partition_id, segment_id, field_id);
auto field_meta = test::gen_field_meta(
collection_id, partition_id, segment_id, field_id, dtype, element_type);
auto index_meta = test::gen_index_meta(
segment_id, field_id, index_build_id, index_version);
@ -305,8 +310,12 @@ test_string() {
int64_t index_build_id = 1000;
int64_t index_version = 10000;
auto field_meta =
test::gen_field_meta(collection_id, partition_id, segment_id, field_id);
auto field_meta = test::gen_field_meta(collection_id,
partition_id,
segment_id,
field_id,
dtype,
DataType::NONE);
auto index_meta = test::gen_index_meta(
segment_id, field_id, index_build_id, index_version);

View File

@ -49,6 +49,14 @@ TYPED_TEST_P(TypedScalarIndexTest, Dummy) {
std::cout << milvus::GetDType<T>() << std::endl;
}
auto
GetTempFileManagerCtx(CDataType data_type) {
auto ctx = milvus::storage::FileManagerContext();
ctx.fieldDataMeta.schema.set_data_type(
static_cast<milvus::proto::schema::DataType>(data_type));
return ctx;
}
TYPED_TEST_P(TypedScalarIndexTest, Constructor) {
using T = TypeParam;
auto dtype = milvus::GetDType<T>();
@ -59,7 +67,7 @@ TYPED_TEST_P(TypedScalarIndexTest, Constructor) {
create_index_info.index_type = index_type;
auto index =
milvus::index::IndexFactory::GetInstance().CreateScalarIndex(
create_index_info);
create_index_info, GetTempFileManagerCtx(dtype));
}
}
@ -73,7 +81,7 @@ TYPED_TEST_P(TypedScalarIndexTest, Count) {
create_index_info.index_type = index_type;
auto index =
milvus::index::IndexFactory::GetInstance().CreateScalarIndex(
create_index_info);
create_index_info, GetTempFileManagerCtx(dtype));
auto scalar_index =
dynamic_cast<milvus::index::ScalarIndex<T>*>(index.get());
auto arr = GenSortedArr<T>(nb);
@ -92,7 +100,7 @@ TYPED_TEST_P(TypedScalarIndexTest, HasRawData) {
create_index_info.index_type = index_type;
auto index =
milvus::index::IndexFactory::GetInstance().CreateScalarIndex(
create_index_info);
create_index_info, GetTempFileManagerCtx(dtype));
auto scalar_index =
dynamic_cast<milvus::index::ScalarIndex<T>*>(index.get());
auto arr = GenSortedArr<T>(nb);
@ -112,7 +120,7 @@ TYPED_TEST_P(TypedScalarIndexTest, In) {
create_index_info.index_type = index_type;
auto index =
milvus::index::IndexFactory::GetInstance().CreateScalarIndex(
create_index_info);
create_index_info, GetTempFileManagerCtx(dtype));
auto scalar_index =
dynamic_cast<milvus::index::ScalarIndex<T>*>(index.get());
auto arr = GenSortedArr<T>(nb);
@ -131,7 +139,7 @@ TYPED_TEST_P(TypedScalarIndexTest, NotIn) {
create_index_info.index_type = index_type;
auto index =
milvus::index::IndexFactory::GetInstance().CreateScalarIndex(
create_index_info);
create_index_info, GetTempFileManagerCtx(dtype));
auto scalar_index =
dynamic_cast<milvus::index::ScalarIndex<T>*>(index.get());
auto arr = GenSortedArr<T>(nb);
@ -150,7 +158,7 @@ TYPED_TEST_P(TypedScalarIndexTest, Reverse) {
create_index_info.index_type = index_type;
auto index =
milvus::index::IndexFactory::GetInstance().CreateScalarIndex(
create_index_info);
create_index_info, GetTempFileManagerCtx(dtype));
auto scalar_index =
dynamic_cast<milvus::index::ScalarIndex<T>*>(index.get());
auto arr = GenSortedArr<T>(nb);
@ -169,7 +177,7 @@ TYPED_TEST_P(TypedScalarIndexTest, Range) {
create_index_info.index_type = index_type;
auto index =
milvus::index::IndexFactory::GetInstance().CreateScalarIndex(
create_index_info);
create_index_info, GetTempFileManagerCtx(dtype));
auto scalar_index =
dynamic_cast<milvus::index::ScalarIndex<T>*>(index.get());
auto arr = GenSortedArr<T>(nb);
@ -188,7 +196,7 @@ TYPED_TEST_P(TypedScalarIndexTest, Codec) {
create_index_info.index_type = index_type;
auto index =
milvus::index::IndexFactory::GetInstance().CreateScalarIndex(
create_index_info);
create_index_info, GetTempFileManagerCtx(dtype));
auto scalar_index =
dynamic_cast<milvus::index::ScalarIndex<T>*>(index.get());
auto arr = GenSortedArr<T>(nb);
@ -197,7 +205,7 @@ TYPED_TEST_P(TypedScalarIndexTest, Codec) {
auto binary_set = index->Serialize(nullptr);
auto copy_index =
milvus::index::IndexFactory::GetInstance().CreateScalarIndex(
create_index_info);
create_index_info, GetTempFileManagerCtx(dtype));
copy_index->Load(binary_set);
auto copy_scalar_index =
@ -368,6 +376,8 @@ TYPED_TEST_P(TypedScalarIndexTestV2, Base) {
auto space = TestSpace<T>(temp_path, vec_size, dataset, scalars);
milvus::storage::FileManagerContext file_manager_context(
{}, {.field_name = "scalar"}, chunk_manager, space);
file_manager_context.fieldDataMeta.schema.set_data_type(
static_cast<milvus::proto::schema::DataType>(dtype));
auto index =
milvus::index::IndexFactory::GetInstance().CreateScalarIndex(
create_index_info, file_manager_context, space);

View File

@ -480,8 +480,30 @@ inline GeneratedData DataGen(SchemaPtr schema,
}
break;
}
case DataType::INT8:
case DataType::INT16:
case DataType::INT8: {
for (int i = 0; i < N / repeat_count; i++) {
milvus::proto::schema::ScalarField field_data;
for (int j = 0; j < array_len; j++) {
field_data.mutable_int_data()->add_data(
static_cast<int8_t>(random()));
}
data[i] = field_data;
}
break;
}
case DataType::INT16: {
for (int i = 0; i < N / repeat_count; i++) {
milvus::proto::schema::ScalarField field_data;
for (int j = 0; j < array_len; j++) {
field_data.mutable_int_data()->add_data(
static_cast<int16_t>(random()));
}
data[i] = field_data;
}
break;
}
case DataType::INT32: {
for (int i = 0; i < N / repeat_count; i++) {
milvus::proto::schema::ScalarField field_data;

View File

@ -15,15 +15,18 @@
namespace milvus::test {
inline auto
GenColumnInfo(int64_t field_id,
proto::schema::DataType field_type,
bool auto_id,
bool is_pk) {
GenColumnInfo(
int64_t field_id,
proto::schema::DataType field_type,
bool auto_id,
bool is_pk,
proto::schema::DataType element_type = proto::schema::DataType::None) {
auto column_info = new proto::plan::ColumnInfo();
column_info->set_field_id(field_id);
column_info->set_data_type(field_type);
column_info->set_is_autoid(auto_id);
column_info->set_is_primary_key(is_pk);
column_info->set_element_type(element_type);
return column_info;
}

View File

@ -347,28 +347,29 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
}
}
var req *indexpb.CreateJobRequest
collectionInfo, err := ib.handler.GetCollection(ib.ctx, segment.GetCollectionID())
if err != nil {
log.Ctx(ib.ctx).Info("index builder get collection info failed", zap.Int64("collectionID", segment.GetCollectionID()), zap.Error(err))
return false
}
schema := collectionInfo.Schema
var field *schemapb.FieldSchema
for _, f := range schema.Fields {
if f.FieldID == fieldID {
field = f
break
}
}
dim, err := storage.GetDimFromParams(field.TypeParams)
if err != nil {
log.Ctx(ib.ctx).Warn("failed to get dim from field type params",
zap.String("field type", field.GetDataType().String()), zap.Error(err))
// don't return, maybe field is scalar field or sparseFloatVector
}
if Params.CommonCfg.EnableStorageV2.GetAsBool() {
collectionInfo, err := ib.handler.GetCollection(ib.ctx, segment.GetCollectionID())
if err != nil {
log.Info("index builder get collection info failed", zap.Int64("collectionID", segment.GetCollectionID()), zap.Error(err))
return false
}
schema := collectionInfo.Schema
var field *schemapb.FieldSchema
for _, f := range schema.Fields {
if f.FieldID == fieldID {
field = f
break
}
}
dim, err := storage.GetDimFromParams(field.TypeParams)
if err != nil {
return false
}
storePath, err := itypeutil.GetStorageURI(params.Params.CommonCfg.StorageScheme.GetValue(), params.Params.CommonCfg.StoragePathPrefix.GetValue(), segment.GetID())
if err != nil {
log.Ctx(ib.ctx).Warn("failed to get storage uri", zap.Error(err))
@ -402,6 +403,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
CurrentIndexVersion: ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
DataIds: binlogIDs,
OptionalScalarFields: optionalFields,
Field: field,
}
} else {
req = &indexpb.CreateJobRequest{
@ -420,6 +422,8 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
SegmentID: segment.GetID(),
FieldID: fieldID,
OptionalScalarFields: optionalFields,
Dim: int64(dim),
Field: field,
}
}

View File

@ -675,7 +675,30 @@ func TestIndexBuilder(t *testing.T) {
chunkManager := &mocks.ChunkManager{}
chunkManager.EXPECT().RootPath().Return("root")
ib := newIndexBuilder(ctx, mt, nodeManager, chunkManager, newIndexEngineVersionManager(), nil)
handler := NewNMockHandler(t)
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Name: "coll",
Fields: []*schemapb.FieldSchema{
{
FieldID: fieldID,
Name: "vec",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "128",
},
},
},
},
EnableDynamicField: false,
Properties: nil,
},
}, nil)
ib := newIndexBuilder(ctx, mt, nodeManager, chunkManager, newIndexEngineVersionManager(), handler)
assert.Equal(t, 6, len(ib.tasks))
assert.Equal(t, indexTaskInit, ib.tasks[buildID])
@ -741,6 +764,30 @@ func TestIndexBuilder_Error(t *testing.T) {
chunkManager := &mocks.ChunkManager{}
chunkManager.EXPECT().RootPath().Return("root")
handler := NewNMockHandler(t)
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Name: "coll",
Fields: []*schemapb.FieldSchema{
{
FieldID: fieldID,
Name: "vec",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "128",
},
},
},
},
EnableDynamicField: false,
Properties: nil,
},
}, nil)
ib := &indexBuilder{
ctx: context.Background(),
tasks: map[int64]indexTaskState{
@ -749,6 +796,7 @@ func TestIndexBuilder_Error(t *testing.T) {
meta: createMetaTable(ec),
chunkManager: chunkManager,
indexEngineVersionManager: newIndexEngineVersionManager(),
handler: handler,
}
t.Run("meta not exist", func(t *testing.T) {
@ -1414,9 +1462,32 @@ func TestVecIndexWithOptionalScalarField(t *testing.T) {
mt.collections[collID].Schema.Fields[1].DataType = schemapb.DataType_VarChar
}
handler := NewNMockHandler(t)
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Name: "coll",
Fields: []*schemapb.FieldSchema{
{
FieldID: fieldID,
Name: "vec",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "128",
},
},
},
},
EnableDynamicField: false,
Properties: nil,
},
}, nil)
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
ib := newIndexBuilder(ctx, &mt, nodeManager, cm, newIndexEngineVersionManager(), nil)
ib := newIndexBuilder(ctx, &mt, nodeManager, cm, newIndexEngineVersionManager(), handler)
t.Run("success to get opt field on startup", func(t *testing.T) {
ic.EXPECT().CreateJob(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(

View File

@ -55,6 +55,8 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
defer i.lifetime.Done()
log.Info("IndexNode building index ...",
zap.Int64("collectionID", req.GetCollectionID()),
zap.Int64("partitionID", req.GetPartitionID()),
zap.Int64("segmentID", req.GetSegmentID()),
zap.Int64("indexID", req.GetIndexID()),
zap.String("indexName", req.GetIndexName()),
zap.String("indexFilePrefix", req.GetIndexFilePrefix()),

View File

@ -18,7 +18,6 @@ package indexnode
import (
"context"
"encoding/json"
"fmt"
"runtime/debug"
"strconv"
@ -30,6 +29,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/indexcgopb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/indexcgowrapper"
@ -84,12 +84,21 @@ type indexBuildTaskV2 struct {
}
func (it *indexBuildTaskV2) parseParams(ctx context.Context) error {
it.collectionID = it.req.CollectionID
it.partitionID = it.req.PartitionID
it.segmentID = it.req.SegmentID
it.fieldType = it.req.FieldType
it.fieldID = it.req.FieldID
it.fieldName = it.req.FieldName
it.collectionID = it.req.GetCollectionID()
it.partitionID = it.req.GetPartitionID()
it.segmentID = it.req.GetSegmentID()
it.fieldType = it.req.GetFieldType()
if it.fieldType == schemapb.DataType_None {
it.fieldType = it.req.GetField().GetDataType()
}
it.fieldID = it.req.GetFieldID()
if it.fieldID == 0 {
it.fieldID = it.req.GetField().GetFieldID()
}
it.fieldName = it.req.GetFieldName()
if it.fieldName == "" {
it.fieldName = it.req.GetField().GetName()
}
return nil
}
@ -138,61 +147,66 @@ func (it *indexBuildTaskV2) BuildIndex(ctx context.Context) error {
}
}
var buildIndexInfo *indexcgowrapper.BuildIndexInfo
buildIndexInfo, err = indexcgowrapper.NewBuildIndexInfo(it.req.GetStorageConfig())
defer indexcgowrapper.DeleteBuildIndexInfo(buildIndexInfo)
if err != nil {
log.Ctx(ctx).Warn("create build index info failed", zap.Error(err))
return err
}
err = buildIndexInfo.AppendFieldMetaInfoV2(it.collectionID, it.partitionID, it.segmentID, it.fieldID, it.fieldType, it.fieldName, it.req.Dim)
if err != nil {
log.Ctx(ctx).Warn("append field meta failed", zap.Error(err))
return err
}
err = buildIndexInfo.AppendIndexMetaInfo(it.req.IndexID, it.req.BuildID, it.req.IndexVersion)
if err != nil {
log.Ctx(ctx).Warn("append index meta failed", zap.Error(err))
return err
}
err = buildIndexInfo.AppendBuildIndexParam(it.newIndexParams)
if err != nil {
log.Ctx(ctx).Warn("append index params failed", zap.Error(err))
return err
}
err = buildIndexInfo.AppendIndexStorageInfo(it.req.StorePath, it.req.IndexStorePath, it.req.StoreVersion)
if err != nil {
log.Ctx(ctx).Warn("append storage info failed", zap.Error(err))
return err
}
jsonIndexParams, err := json.Marshal(it.newIndexParams)
if err != nil {
log.Ctx(ctx).Error("failed to json marshal index params", zap.Error(err))
return err
}
log.Ctx(ctx).Info("index params are ready",
zap.Int64("buildID", it.BuildID),
zap.String("index params", string(jsonIndexParams)))
err = buildIndexInfo.AppendBuildTypeParam(it.newTypeParams)
if err != nil {
log.Ctx(ctx).Warn("append type params failed", zap.Error(err))
return err
storageConfig := &indexcgopb.StorageConfig{
Address: it.req.GetStorageConfig().GetAddress(),
AccessKeyID: it.req.GetStorageConfig().GetAccessKeyID(),
SecretAccessKey: it.req.GetStorageConfig().GetSecretAccessKey(),
UseSSL: it.req.GetStorageConfig().GetUseSSL(),
BucketName: it.req.GetStorageConfig().GetBucketName(),
RootPath: it.req.GetStorageConfig().GetRootPath(),
UseIAM: it.req.GetStorageConfig().GetUseIAM(),
IAMEndpoint: it.req.GetStorageConfig().GetIAMEndpoint(),
StorageType: it.req.GetStorageConfig().GetStorageType(),
UseVirtualHost: it.req.GetStorageConfig().GetUseVirtualHost(),
Region: it.req.GetStorageConfig().GetRegion(),
CloudProvider: it.req.GetStorageConfig().GetCloudProvider(),
RequestTimeoutMs: it.req.GetStorageConfig().GetRequestTimeoutMs(),
SslCACert: it.req.GetStorageConfig().GetSslCACert(),
}
optFields := make([]*indexcgopb.OptionalFieldInfo, 0, len(it.req.GetOptionalScalarFields()))
for _, optField := range it.req.GetOptionalScalarFields() {
if err := buildIndexInfo.AppendOptionalField(optField); err != nil {
log.Ctx(ctx).Warn("append optional field failed", zap.Error(err))
return err
optFields = append(optFields, &indexcgopb.OptionalFieldInfo{
FieldID: optField.GetFieldID(),
FieldName: optField.GetFieldName(),
FieldType: optField.GetFieldType(),
DataPaths: optField.GetDataPaths(),
})
}
it.currentIndexVersion = getCurrentIndexVersion(it.req.GetCurrentIndexVersion())
field := it.req.GetField()
if field == nil || field.GetDataType() == schemapb.DataType_None {
field = &schemapb.FieldSchema{
FieldID: it.fieldID,
Name: it.fieldName,
DataType: it.fieldType,
}
}
it.index, err = indexcgowrapper.CreateIndexV2(ctx, buildIndexInfo)
buildIndexParams := &indexcgopb.BuildIndexInfo{
ClusterID: it.ClusterID,
BuildID: it.BuildID,
CollectionID: it.collectionID,
PartitionID: it.partitionID,
SegmentID: it.segmentID,
IndexVersion: it.req.GetIndexVersion(),
CurrentIndexVersion: it.currentIndexVersion,
NumRows: it.req.GetNumRows(),
Dim: it.req.GetDim(),
IndexFilePrefix: it.req.GetIndexFilePrefix(),
InsertFiles: it.req.GetDataPaths(),
FieldSchema: field,
StorageConfig: storageConfig,
IndexParams: mapToKVPairs(it.newIndexParams),
TypeParams: mapToKVPairs(it.newTypeParams),
StorePath: it.req.GetStorePath(),
StoreVersion: it.req.GetStoreVersion(),
IndexStorePath: it.req.GetIndexStorePath(),
OptFields: optFields,
}
it.index, err = indexcgowrapper.CreateIndexV2(ctx, buildIndexParams)
if err != nil {
if it.index != nil && it.index.CleanLocalData() != nil {
log.Ctx(ctx).Error("failed to clean cached data on disk after build index failed",
@ -328,7 +342,7 @@ func (it *indexBuildTask) Prepare(ctx context.Context) error {
if len(it.req.DataPaths) == 0 {
for _, id := range it.req.GetDataIds() {
path := metautil.BuildInsertLogPath(it.req.GetStorageConfig().RootPath, it.req.GetCollectionID(), it.req.GetPartitionID(), it.req.GetSegmentID(), it.req.GetFieldID(), id)
path := metautil.BuildInsertLogPath(it.req.GetStorageConfig().RootPath, it.req.GetCollectionID(), it.req.GetPartitionID(), it.req.GetSegmentID(), it.req.GetField().GetFieldID(), id)
it.req.DataPaths = append(it.req.DataPaths, path)
}
}
@ -362,16 +376,10 @@ func (it *indexBuildTask) Prepare(ctx context.Context) error {
}
it.newTypeParams = typeParams
it.newIndexParams = indexParams
it.statistic.IndexParams = it.req.GetIndexParams()
// ugly codes to get dimension
if dimStr, ok := typeParams[common.DimKey]; ok {
var err error
it.statistic.Dim, err = strconv.ParseInt(dimStr, 10, 64)
if err != nil {
log.Ctx(ctx).Error("parse dimesion failed", zap.Error(err))
// ignore error
}
}
it.statistic.Dim = it.req.GetDim()
log.Ctx(ctx).Info("Successfully prepare indexBuildTask", zap.Int64("buildID", it.BuildID),
zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
return nil
@ -482,69 +490,65 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
}
}
var buildIndexInfo *indexcgowrapper.BuildIndexInfo
buildIndexInfo, err = indexcgowrapper.NewBuildIndexInfo(it.req.GetStorageConfig())
defer indexcgowrapper.DeleteBuildIndexInfo(buildIndexInfo)
if err != nil {
log.Ctx(ctx).Warn("create build index info failed", zap.Error(err))
return err
}
err = buildIndexInfo.AppendFieldMetaInfo(it.collectionID, it.partitionID, it.segmentID, it.fieldID, it.fieldType)
if err != nil {
log.Ctx(ctx).Warn("append field meta failed", zap.Error(err))
return err
storageConfig := &indexcgopb.StorageConfig{
Address: it.req.GetStorageConfig().GetAddress(),
AccessKeyID: it.req.GetStorageConfig().GetAccessKeyID(),
SecretAccessKey: it.req.GetStorageConfig().GetSecretAccessKey(),
UseSSL: it.req.GetStorageConfig().GetUseSSL(),
BucketName: it.req.GetStorageConfig().GetBucketName(),
RootPath: it.req.GetStorageConfig().GetRootPath(),
UseIAM: it.req.GetStorageConfig().GetUseIAM(),
IAMEndpoint: it.req.GetStorageConfig().GetIAMEndpoint(),
StorageType: it.req.GetStorageConfig().GetStorageType(),
UseVirtualHost: it.req.GetStorageConfig().GetUseVirtualHost(),
Region: it.req.GetStorageConfig().GetRegion(),
CloudProvider: it.req.GetStorageConfig().GetCloudProvider(),
RequestTimeoutMs: it.req.GetStorageConfig().GetRequestTimeoutMs(),
SslCACert: it.req.GetStorageConfig().GetSslCACert(),
}
err = buildIndexInfo.AppendIndexMetaInfo(it.req.IndexID, it.req.BuildID, it.req.IndexVersion)
if err != nil {
log.Ctx(ctx).Warn("append index meta failed", zap.Error(err))
return err
}
err = buildIndexInfo.AppendBuildIndexParam(it.newIndexParams)
if err != nil {
log.Ctx(ctx).Warn("append index params failed", zap.Error(err))
return err
}
jsonIndexParams, err := json.Marshal(it.newIndexParams)
if err != nil {
log.Ctx(ctx).Error("failed to json marshal index params", zap.Error(err))
return err
}
log.Ctx(ctx).Info("index params are ready",
zap.Int64("buildID", it.BuildID),
zap.String("index params", string(jsonIndexParams)))
err = buildIndexInfo.AppendBuildTypeParam(it.newTypeParams)
if err != nil {
log.Ctx(ctx).Warn("append type params failed", zap.Error(err))
return err
}
for _, path := range it.req.GetDataPaths() {
err = buildIndexInfo.AppendInsertFile(path)
if err != nil {
log.Ctx(ctx).Warn("append insert binlog path failed", zap.Error(err))
return err
}
optFields := make([]*indexcgopb.OptionalFieldInfo, 0, len(it.req.GetOptionalScalarFields()))
for _, optField := range it.req.GetOptionalScalarFields() {
optFields = append(optFields, &indexcgopb.OptionalFieldInfo{
FieldID: optField.GetFieldID(),
FieldName: optField.GetFieldName(),
FieldType: optField.GetFieldType(),
DataPaths: optField.GetDataPaths(),
})
}
it.currentIndexVersion = getCurrentIndexVersion(it.req.GetCurrentIndexVersion())
if err := buildIndexInfo.AppendIndexEngineVersion(it.currentIndexVersion); err != nil {
log.Ctx(ctx).Warn("append index engine version failed", zap.Error(err))
return err
}
for _, optField := range it.req.GetOptionalScalarFields() {
if err := buildIndexInfo.AppendOptionalField(optField); err != nil {
log.Ctx(ctx).Warn("append optional field failed", zap.Error(err))
return err
field := it.req.GetField()
if field == nil || field.GetDataType() == schemapb.DataType_None {
field = &schemapb.FieldSchema{
FieldID: it.fieldID,
Name: it.fieldName,
DataType: it.fieldType,
}
}
buildIndexParams := &indexcgopb.BuildIndexInfo{
ClusterID: it.ClusterID,
BuildID: it.BuildID,
CollectionID: it.collectionID,
PartitionID: it.partitionID,
SegmentID: it.segmentID,
IndexVersion: it.req.GetIndexVersion(),
CurrentIndexVersion: it.currentIndexVersion,
NumRows: it.req.GetNumRows(),
Dim: it.req.GetDim(),
IndexFilePrefix: it.req.GetIndexFilePrefix(),
InsertFiles: it.req.GetDataPaths(),
FieldSchema: field,
StorageConfig: storageConfig,
IndexParams: mapToKVPairs(it.newIndexParams),
TypeParams: mapToKVPairs(it.newTypeParams),
StorePath: it.req.GetStorePath(),
StoreVersion: it.req.GetStoreVersion(),
IndexStorePath: it.req.GetIndexStorePath(),
OptFields: optFields,
}
it.index, err = indexcgowrapper.CreateIndex(ctx, buildIndexInfo)
it.index, err = indexcgowrapper.CreateIndex(ctx, buildIndexParams)
if err != nil {
if it.index != nil && it.index.CleanLocalData() != nil {
log.Ctx(ctx).Error("failed to clean cached data on disk after build index failed",
@ -653,8 +657,6 @@ func (it *indexBuildTask) decodeBlobs(ctx context.Context, blobs []*storage.Blob
deserializeDur := it.tr.RecordSpan()
log.Ctx(ctx).Info("IndexNode deserialize data success",
zap.Int64("index id", it.req.IndexID),
zap.String("index name", it.req.IndexName),
zap.Int64("collectionID", it.collectionID),
zap.Int64("partitionID", it.partitionID),
zap.Int64("segmentID", it.segmentID),

View File

@ -283,12 +283,14 @@ func (suite *IndexBuildTaskV2Suite) TestBuildIndex() {
RootPath: "/tmp/milvus/data",
StorageType: "local",
},
CollectionID: 1,
PartitionID: 1,
SegmentID: 1,
FieldID: 3,
FieldName: "vec",
FieldType: schemapb.DataType_FloatVector,
CollectionID: 1,
PartitionID: 1,
SegmentID: 1,
Field: &schemapb.FieldSchema{
FieldID: 3,
Name: "vec",
DataType: schemapb.DataType_FloatVector,
},
StorePath: "file://" + suite.space.Path(),
StoreVersion: suite.space.GetCurrentVersion(),
IndexStorePath: "file://" + suite.space.Path(),

View File

@ -19,6 +19,7 @@ package indexnode
import (
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
)
@ -36,3 +37,14 @@ func estimateFieldDataSize(dim int64, numRows int64, dataType schemapb.DataType)
return 0, nil
}
}
// mapToKVPairs converts a plain string map into the repeated
// commonpb.KeyValuePair form used by proto messages. The order of the
// returned slice is unspecified, mirroring Go map iteration order.
func mapToKVPairs(m map[string]string) []*commonpb.KeyValuePair {
	pairs := make([]*commonpb.KeyValuePair, 0, len(m))
	for key, value := range m {
		pair := &commonpb.KeyValuePair{Key: key, Value: value}
		pairs = append(pairs, pair)
	}
	return pairs
}

View File

@ -0,0 +1,41 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package indexnode
import (
"testing"
"github.com/stretchr/testify/suite"
)
// utilSuite groups the unit tests for the indexnode helper functions.
type utilSuite struct {
	suite.Suite
}
// Test_mapToKVPairs checks that mapToKVPairs preserves every entry of the
// input map — the original assertion only compared the element count, which
// would pass even if keys or values were corrupted.
func (s *utilSuite) Test_mapToKVPairs() {
	indexParams := map[string]string{
		"index_type": "IVF_FLAT",
		"dim":        "128",
		"nlist":      "1024",
	}

	kvPairs := mapToKVPairs(indexParams)
	s.Equal(len(indexParams), len(kvPairs))

	// Rebuild a map from the pairs; it must equal the input regardless of
	// the (unspecified) slice order.
	actual := make(map[string]string, len(kvPairs))
	for _, kv := range kvPairs {
		actual[kv.GetKey()] = kv.GetValue()
	}
	s.Equal(indexParams, actual)
}
// Test_utilSuite is the `go test` entry point that runs utilSuite.
func Test_utilSuite(t *testing.T) {
	suite.Run(t, new(utilSuite))
}

View File

@ -0,0 +1,23 @@
syntax = "proto3";
package milvus.proto.cgo;
option go_package="github.com/milvus-io/milvus/internal/proto/cgopb";
import "schema.proto";
// LoadIndexInfo describes a built index to be loaded for one field of a
// segment, passed across the cgo boundary to segcore.
// NOTE(review): field number 4 is skipped — presumably retired; do not
// reuse it without confirming against the message's history.
message LoadIndexInfo {
int64 collectionID = 1;
int64 partitionID = 2;
int64 segmentID = 3;
// Full schema of the indexed field (type, element type, params).
schema.FieldSchema field = 5;
// When true, index files are memory-mapped from mmap_dir_path.
bool enable_mmap = 6;
string mmap_dir_path = 7;
int64 indexID = 8;
int64 index_buildID = 9;
int64 index_version = 10;
map<string, string> index_params = 11;
repeated string index_files = 12;
// Storage-v2 fields: object-store URI and index store version.
string uri = 13;
int64 index_store_version = 14;
int32 index_engine_version = 15;
}

View File

@ -4,6 +4,7 @@ package milvus.proto.indexcgo;
option go_package="github.com/milvus-io/milvus/internal/proto/indexcgopb";
import "common.proto";
import "schema.proto";
message TypeParams {
repeated common.KeyValuePair params = 1;
@ -30,3 +31,52 @@ message Binary {
message BinarySet {
repeated Binary datas = 1;
}
// Synchronously modify StorageConfig in index_coord.proto file
// StorageConfig tells the index build process how to reach object storage
// (address/credentials/bucket for S3-compatible stores, or a local path).
message StorageConfig {
string address = 1;
string access_keyID = 2;
string secret_access_key = 3;
bool useSSL = 4;
string bucket_name = 5;
string root_path = 6;
// When true, credentials come from IAM at IAMEndpoint instead of keys.
bool useIAM = 7;
string IAMEndpoint = 8;
string storage_type = 9;
bool use_virtual_host = 10;
string region = 11;
string cloud_provider = 12;
int64 request_timeout_ms = 13;
string sslCACert = 14;
}
// Synchronously modify OptionalFieldInfo in index_coord.proto file
// OptionalFieldInfo identifies an auxiliary scalar field (and its data
// files) supplied to the index builder. Per the note above, keep in sync
// with OptionalFieldInfo in index_coord.proto.
message OptionalFieldInfo {
    int64 fieldID = 1;
    string field_name = 2;
    // schemapb.DataType value of the field — TODO confirm enum source.
    int32 field_type = 3;
    // Binlog paths containing the field's data.
    repeated string data_paths = 4;
}
// BuildIndexInfo is the full description of one index-build task. It is
// marshaled on the Go side and passed as a serialized blob to the cgo
// CreateIndex / CreateIndexV2 entry points.
message BuildIndexInfo {
    string clusterID = 1;
    int64 buildID = 2;
    int64 collectionID = 3;
    int64 partitionID = 4;
    int64 segmentID = 5;
    int64 index_version = 6;
    int32 current_index_version = 7;
    int64 num_rows = 8;
    // Vector dimension; presumably unused for scalar indexes — verify.
    int64 dim = 9;
    string index_file_prefix = 10;
    // Binlog files holding the raw field data to index.
    repeated string insert_files = 11;
    // Field number 12 previously sketched as `repeated int64 data_ids`;
    // it now carries the field schema (needed e.g. for ARRAY element type).
    // repeated int64 data_ids = 12;
    schema.FieldSchema field_schema = 12;
    StorageConfig storage_config = 13;
    repeated common.KeyValuePair index_params = 14;
    repeated common.KeyValuePair type_params = 15;
    string store_path = 16;
    int64 store_version = 17;
    string index_store_path = 18;
    // Extra scalar fields made available to the builder (optional).
    repeated OptionalFieldInfo opt_fields = 19;
}

View File

@ -8,6 +8,7 @@ import "common.proto";
import "internal.proto";
import "milvus.proto";
import "schema.proto";
import "index_cgo_msg.proto";
service IndexCoord {
rpc GetComponentStates(milvus.GetComponentStatesRequest) returns (milvus.ComponentStates) {}
@ -226,6 +227,7 @@ message GetIndexBuildProgressResponse {
int64 pending_index_rows = 4;
}
// Synchronously modify StorageConfig in index_cgo_msg.proto file
message StorageConfig {
string address = 1;
string access_keyID = 2;
@ -243,6 +245,7 @@ message StorageConfig {
string sslCACert = 14;
}
// Synchronously modify OptionalFieldInfo in index_cgo_msg.proto file
message OptionalFieldInfo {
int64 fieldID = 1;
string field_name = 2;
@ -276,6 +279,7 @@ message CreateJobRequest {
int64 dim = 22;
repeated int64 data_ids = 23;
repeated OptionalFieldInfo optional_scalar_fields = 24;
schema.FieldSchema field = 25;
}
message QueryJobsRequest {

View File

@ -29,11 +29,13 @@ import (
"runtime"
"unsafe"
"github.com/golang/protobuf/proto"
"github.com/pingcap/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datacoord"
"github.com/milvus-io/milvus/internal/proto/cgopb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/pkg/common"
@ -245,3 +247,33 @@ func (li *LoadIndexInfo) appendIndexEngineVersion(ctx context.Context, indexEngi
return HandleCStatus(ctx, &status, "AppendIndexEngineVersion failed")
}
// finish serializes info and hands it to segcore to complete the load:
// first FinishLoadIndexInfo transfers the marshaled LoadIndexInfo into the
// C-side object, then AppendIndexV2/V3 performs the actual index load.
// The two C calls run on separate worker pools (dynamic vs. load) and each
// Submit(...).Await() blocks until its closure completes, so the shared
// `status` variable is never written concurrently.
func (li *LoadIndexInfo) finish(ctx context.Context, info *cgopb.LoadIndexInfo) error {
	marshaled, err := proto.Marshal(info)
	if err != nil {
		return err
	}
	var status C.CStatus
	// NOTE(review): &marshaled[0] panics if the marshaled blob is empty;
	// presumably info always has fields set — confirm against callers.
	_, _ = GetDynamicPool().Submit(func() (any, error) {
		status = C.FinishLoadIndexInfo(li.cLoadIndexInfo, (*C.uint8_t)(unsafe.Pointer(&marshaled[0])), (C.uint64_t)(len(marshaled)))
		return nil, nil
	}).Await()
	if err := HandleCStatus(ctx, &status, "FinishLoadIndexInfo failed"); err != nil {
		return err
	}
	_, _ = GetLoadPool().Submit(func() (any, error) {
		if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
			// Storage v2 path: load via the v3 cgo entry point.
			status = C.AppendIndexV3(li.cLoadIndexInfo)
		} else {
			traceCtx := ParseCTraceContext(ctx)
			status = C.AppendIndexV2(traceCtx.ctx, li.cLoadIndexInfo)
			// Keep the Go-side trace context alive until the C call returns.
			runtime.KeepAlive(traceCtx)
		}
		return nil, nil
	}).Await()
	return HandleCStatus(ctx, &status, "AppendIndex failed")
}

View File

@ -45,6 +45,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
milvus_storage "github.com/milvus-io/milvus-storage/go/storage"
"github.com/milvus-io/milvus-storage/go/storage/options"
"github.com/milvus-io/milvus/internal/proto/cgopb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
@ -56,6 +57,9 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/indexparams"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -1262,18 +1266,58 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn
return err
}
defer deleteLoadIndexInfo(loadIndexInfo)
schema, err := typeutil.CreateSchemaHelper(s.GetCollection().Schema())
if err != nil {
return err
}
fieldSchema, err := schema.GetFieldFromID(indexInfo.GetFieldID())
if err != nil {
return err
}
indexParams := funcutil.KeyValuePair2Map(indexInfo.IndexParams)
// as Knowhere reports error if encounter an unknown param, we need to delete it
delete(indexParams, common.MmapEnabledKey)
// some build params also exist in indexParams, which are useless during loading process
if indexParams["index_type"] == indexparamcheck.IndexDISKANN {
if err := indexparams.SetDiskIndexLoadParams(paramtable.Get(), indexParams, indexInfo.GetNumRows()); err != nil {
return err
}
}
if err := indexparams.AppendPrepareLoadParams(paramtable.Get(), indexParams); err != nil {
return err
}
indexInfoProto := &cgopb.LoadIndexInfo{
CollectionID: s.Collection(),
PartitionID: s.Partition(),
SegmentID: s.ID(),
Field: fieldSchema,
EnableMmap: isIndexMmapEnable(indexInfo),
MmapDirPath: paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue(),
IndexID: indexInfo.GetIndexID(),
IndexBuildID: indexInfo.GetBuildID(),
IndexVersion: indexInfo.GetIndexVersion(),
IndexParams: indexParams,
IndexFiles: indexInfo.GetIndexFilePaths(),
IndexEngineVersion: indexInfo.GetCurrentIndexVersion(),
IndexStoreVersion: indexInfo.GetIndexStoreVersion(),
}
if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.ID())
if err != nil {
return err
}
loadIndexInfo.appendStorageInfo(uri, indexInfo.IndexStoreVersion)
indexInfoProto.Uri = uri
}
newLoadIndexInfoSpan := tr.RecordSpan()
// 2.
err = loadIndexInfo.appendLoadIndexInfo(ctx, indexInfo, s.Collection(), s.Partition(), s.ID(), fieldType)
if err != nil {
if err := loadIndexInfo.finish(ctx, indexInfoProto); err != nil {
if loadIndexInfo.cleanLocalData(ctx) != nil {
log.Warn("failed to clean cached data on disk after append index failed",
zap.Int64("buildID", indexInfo.BuildID),

View File

@ -16,6 +16,7 @@ import (
"unsafe"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@ -94,9 +95,17 @@ func NewCgoIndex(dtype schemapb.DataType, typeParams, indexParams map[string]str
return index, nil
}
func CreateIndex(ctx context.Context, buildIndexInfo *BuildIndexInfo) (CodecIndex, error) {
func CreateIndex(ctx context.Context, buildIndexInfo *indexcgopb.BuildIndexInfo) (CodecIndex, error) {
buildIndexInfoBlob, err := proto.Marshal(buildIndexInfo)
if err != nil {
log.Ctx(ctx).Warn("marshal buildIndexInfo failed",
zap.String("clusterID", buildIndexInfo.GetClusterID()),
zap.Int64("buildID", buildIndexInfo.GetBuildID()),
zap.Error(err))
return nil, err
}
var indexPtr C.CIndex
status := C.CreateIndex(&indexPtr, buildIndexInfo.cBuildIndexInfo)
status := C.CreateIndex(&indexPtr, (*C.uint8_t)(unsafe.Pointer(&buildIndexInfoBlob[0])), (C.uint64_t)(len(buildIndexInfoBlob)))
if err := HandleCStatus(&status, "failed to create index"); err != nil {
return nil, err
}
@ -109,9 +118,17 @@ func CreateIndex(ctx context.Context, buildIndexInfo *BuildIndexInfo) (CodecInde
return index, nil
}
func CreateIndexV2(ctx context.Context, buildIndexInfo *BuildIndexInfo) (CodecIndex, error) {
func CreateIndexV2(ctx context.Context, buildIndexInfo *indexcgopb.BuildIndexInfo) (CodecIndex, error) {
buildIndexInfoBlob, err := proto.Marshal(buildIndexInfo)
if err != nil {
log.Ctx(ctx).Warn("marshal buildIndexInfo failed",
zap.String("clusterID", buildIndexInfo.GetClusterID()),
zap.Int64("buildID", buildIndexInfo.GetBuildID()),
zap.Error(err))
return nil, err
}
var indexPtr C.CIndex
status := C.CreateIndexV2(&indexPtr, buildIndexInfo.cBuildIndexInfo)
status := C.CreateIndexV2(&indexPtr, (*C.uint8_t)(unsafe.Pointer(&buildIndexInfoBlob[0])), (C.uint64_t)(len(buildIndexInfoBlob)))
if err := HandleCStatus(&status, "failed to create index"); err != nil {
return nil, err
}

View File

@ -17,7 +17,8 @@ func (c *INVERTEDChecker) CheckTrain(params map[string]string) error {
}
func (c *INVERTEDChecker) CheckValidDataType(dType schemapb.DataType) error {
if !typeutil.IsBoolType(dType) && !typeutil.IsArithmetic(dType) && !typeutil.IsStringType(dType) {
if !typeutil.IsBoolType(dType) && !typeutil.IsArithmetic(dType) && !typeutil.IsStringType(dType) &&
!typeutil.IsArrayType(dType) {
return fmt.Errorf("INVERTED are not supported on %s field", dType.String())
}
return nil

View File

@ -18,8 +18,8 @@ func Test_INVERTEDIndexChecker(t *testing.T) {
assert.NoError(t, c.CheckValidDataType(schemapb.DataType_Bool))
assert.NoError(t, c.CheckValidDataType(schemapb.DataType_Int64))
assert.NoError(t, c.CheckValidDataType(schemapb.DataType_Float))
assert.NoError(t, c.CheckValidDataType(schemapb.DataType_Array))
assert.Error(t, c.CheckValidDataType(schemapb.DataType_JSON))
assert.Error(t, c.CheckValidDataType(schemapb.DataType_Array))
assert.Error(t, c.CheckValidDataType(schemapb.DataType_FloatVector))
}

View File

@ -44,6 +44,7 @@ pushd ${PROTO_DIR}
mkdir -p etcdpb
mkdir -p indexcgopb
mkdir -p cgopb
mkdir -p internalpb
mkdir -p rootcoordpb
@ -62,6 +63,7 @@ protoc_opt="${PROTOC_BIN} --proto_path=${API_PROTO_DIR} --proto_path=."
${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./etcdpb etcd_meta.proto || { echo 'generate etcd_meta.proto failed'; exit 1; }
${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./indexcgopb index_cgo_msg.proto || { echo 'generate index_cgo_msg failed '; exit 1; }
${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./cgopb cgo_msg.proto || { echo 'generate cgo_msg failed '; exit 1; }
${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./rootcoordpb root_coord.proto || { echo 'generate root_coord.proto failed'; exit 1; }
${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./internalpb internal.proto || { echo 'generate internal.proto failed'; exit 1; }
${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./proxypb proxy.proto|| { echo 'generate proxy.proto failed'; exit 1; }
@ -78,6 +80,7 @@ ${protoc_opt} --cpp_out=$CPP_SRC_DIR/src/pb schema.proto|| { echo 'generate sche
${protoc_opt} --cpp_out=$CPP_SRC_DIR/src/pb common.proto|| { echo 'generate common.proto failed'; exit 1; }
${protoc_opt} --cpp_out=$CPP_SRC_DIR/src/pb segcore.proto|| { echo 'generate segcore.proto failed'; exit 1; }
${protoc_opt} --cpp_out=$CPP_SRC_DIR/src/pb index_cgo_msg.proto|| { echo 'generate index_cgo_msg.proto failed'; exit 1; }
${protoc_opt} --cpp_out=$CPP_SRC_DIR/src/pb cgo_msg.proto|| { echo 'generate cgo_msg.proto failed'; exit 1; }
${protoc_opt} --cpp_out=$CPP_SRC_DIR/src/pb plan.proto|| { echo 'generate plan.proto failed'; exit 1; }
popd

View File

@ -1313,10 +1313,7 @@ class TestIndexInvalid(TestcaseBase):
collection_w = self.init_collection_wrap(schema=schema)
# 2. create index
scalar_index_params = {"index_type": "INVERTED"}
collection_w.create_index(ct.default_int32_array_field_name, index_params=scalar_index_params,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1100,
ct.err_msg: "create index on Array field is not supported"})
collection_w.create_index(ct.default_int32_array_field_name, index_params=scalar_index_params)
@pytest.mark.tags(CaseLabel.L1)
def test_create_inverted_index_no_vector_index(self):