enhance: make json path index support term filter ()

issue: 

---------

Signed-off-by: sunby <sunbingyi1992@gmail.com>
pull/40316/head
Bingyi Sun 2025-03-04 11:56:02 +08:00 committed by GitHub
parent f2ea4d6370
commit 7040ba1c12
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 134 additions and 56 deletions

View File

@ -237,7 +237,11 @@ PhyTermFilterExpr::ExecVisitorImplTemplateJson(OffsetVector* input) {
if (expr_->is_in_field_) {
return ExecTermJsonVariableInField<ValueType>(input);
} else {
return ExecTermJsonFieldInVariable<ValueType>(input);
if (is_index_mode_) {
return ExecVisitorImplForIndex<ValueType>(input);
} else {
return ExecTermJsonFieldInVariable<ValueType>(input);
}
}
}

View File

@ -63,7 +63,9 @@ class PhyTermFilterExpr : public SegmentExpr {
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
DataType::NONE,
expr->vals_.size() == 0
? DataType::NONE
: FromValCase(expr->vals_[0].val_case()),
active_count,
batch_size),
expr_(expr),

View File

@ -58,7 +58,8 @@ InitRemoteArrowFileSystemSingleton(CStorageConfig c_storage_config) {
conf.useIAM = c_storage_config.useIAM;
conf.useVirtualHost = c_storage_config.useVirtualHost;
conf.requestTimeoutMs = c_storage_config.requestTimeoutMs;
conf.gcp_credential_json = std::string(c_storage_config.gcp_credential_json);
conf.gcp_credential_json =
std::string(c_storage_config.gcp_credential_json);
conf.use_custom_part_upload = c_storage_config.use_custom_part_upload;
milvus_storage::ArrowFileSystemSingleton::GetInstance().Init(conf);

View File

@ -34,7 +34,6 @@ InitRemoteArrowFileSystemSingleton(CStorageConfig c_storage_config);
void
CleanRemoteArrowFileSystemSingleton();
#ifdef __cplusplus
}
#endif

View File

@ -21,27 +21,33 @@ using VecVecInt = std::vector<std::vector<int>>;
extern "C" {
CColumnGroups NewCColumnGroups() {
auto vv = std::make_unique<VecVecInt>();
return vv.release();
CColumnGroups
NewCColumnGroups() {
auto vv = std::make_unique<VecVecInt>();
return vv.release();
}
void AddCColumnGroup(CColumnGroups cgs, int* group, int group_size) {
if (!cgs || !group)
return;
void
AddCColumnGroup(CColumnGroups cgs, int* group, int group_size) {
if (!cgs || !group)
return;
auto vv = static_cast<VecVecInt*>(cgs);
std::vector<int> new_group(group, group + group_size);
vv->emplace_back(std::move(new_group));
auto vv = static_cast<VecVecInt*>(cgs);
std::vector<int> new_group(group, group + group_size);
vv->emplace_back(std::move(new_group));
}
int CColumnGroupsSize(CColumnGroups cgs) {
if (!cgs)
return 0;
int
CColumnGroupsSize(CColumnGroups cgs) {
if (!cgs)
return 0;
auto vv = static_cast<VecVecInt*>(cgs);
return static_cast<int>(vv->size());
auto vv = static_cast<VecVecInt*>(cgs);
return static_cast<int>(vv->size());
}
void FreeCColumnGroups(CColumnGroups cgs) { delete static_cast<VecVecInt*>(cgs); }
void
FreeCColumnGroups(CColumnGroups cgs) {
delete static_cast<VecVecInt*>(cgs);
}
}

View File

@ -22,13 +22,17 @@ extern "C" {
typedef void* CColumnGroups;
CColumnGroups NewCColumnGroups();
CColumnGroups
NewCColumnGroups();
void AddCColumnGroup(CColumnGroups cgs, int* group, int group_size);
void
AddCColumnGroup(CColumnGroups cgs, int* group, int group_size);
int CColumnGroupsSize(CColumnGroups cgs);
int
CColumnGroupsSize(CColumnGroups cgs);
void FreeCColumnGroups(CColumnGroups cgs);
void
FreeCColumnGroups(CColumnGroups cgs);
#ifdef __cplusplus
}

View File

@ -25,7 +25,6 @@
#include "common/EasyAssert.h"
#include "common/type_c.h"
CStatus
NewPackedReader(char** paths,
int64_t num_paths,
@ -34,9 +33,11 @@ NewPackedReader(char** paths,
CPackedReader* c_packed_reader) {
try {
auto truePaths = std::vector<std::string>(paths, paths + num_paths);
auto trueFs = milvus_storage::ArrowFileSystemSingleton::GetInstance().GetArrowFileSystem();
auto trueFs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
if (!trueFs) {
return milvus::FailureCStatus(milvus::ErrorCode::FileReadFailed, "Failed to get filesystem");
return milvus::FailureCStatus(milvus::ErrorCode::FileReadFailed,
"Failed to get filesystem");
}
auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
std::set<int> needed_columns;
@ -63,7 +64,8 @@ ReadNext(CPackedReader c_packed_reader,
std::shared_ptr<arrow::RecordBatch> record_batch;
auto status = packed_reader->ReadNext(&record_batch);
if (!status.ok()) {
return milvus::FailureCStatus(milvus::ErrorCode::FileReadFailed, status.ToString());
return milvus::FailureCStatus(milvus::ErrorCode::FileReadFailed,
status.ToString());
}
if (record_batch == nullptr) {
// end of file
@ -75,7 +77,8 @@ ReadNext(CPackedReader c_packed_reader,
auto status = arrow::ExportRecordBatch(
*record_batch, arr.get(), schema.get());
if (!status.ok()) {
return milvus::FailureCStatus(milvus::ErrorCode::FileReadFailed, status.ToString());
return milvus::FailureCStatus(milvus::ErrorCode::FileReadFailed,
status.ToString());
}
*out_array = arr.release();
*out_schema = schema.release();

View File

@ -39,14 +39,16 @@ NewPackedWriter(struct ArrowSchema* schema,
conf.part_size = part_upload_size;
auto trueFs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
.GetArrowFileSystem();
if (!trueFs) {
return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed, "Failed to get filesystem");
return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed,
"Failed to get filesystem");
}
auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
auto columnGroups = *static_cast<std::vector<std::vector<int>>*>(column_groups);
auto columnGroups =
*static_cast<std::vector<std::vector<int>>*>(column_groups);
auto writer = std::make_unique<milvus_storage::PackedRecordBatchWriter>(
trueFs, truePaths, trueSchema, conf, columnGroups, buffer_size);
@ -70,7 +72,8 @@ WriteRecordBatch(CPackedWriter c_packed_writer,
arrow::ImportRecordBatch(array, schema).ValueOrDie();
auto status = packed_writer->Write(record_batch);
if (!status.ok()) {
return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed, status.ToString());
return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed,
status.ToString());
}
return milvus::SuccessCStatus();
} catch (std::exception& e) {
@ -87,7 +90,8 @@ CloseWriter(CPackedWriter c_packed_writer) {
auto status = packed_writer->Close();
delete packed_writer;
if (!status.ok()) {
return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed, status.ToString());
return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed,
status.ToString());
}
return milvus::SuccessCStatus();
} catch (std::exception& e) {

View File

@ -19,29 +19,28 @@
#include <cstring>
#include "segcore/column_groups_c.h"
TEST(CColumnGroups, TestCColumnGroups) {
CColumnGroups cgs = NewCColumnGroups();
int group1[] = {2, 4, 5};
int group2[] = {0, 1};
int group3[] = {3, 6, 7, 8};
CColumnGroups cgs = NewCColumnGroups();
int group1[] = {2, 4, 5};
int group2[] = {0, 1};
int group3[] = {3, 6, 7, 8};
int* test_groups[] = {group1, group2, group3};
int group_sizes[] = {3, 2, 4};
int* test_groups[] = {group1, group2, group3};
int group_sizes[] = {3, 2, 4};
for (int i = 0; i < 3; i++) {
AddCColumnGroup(cgs, test_groups[i], group_sizes[i]);
}
ASSERT_EQ(CColumnGroupsSize(cgs), 3);
auto vv = static_cast<std::vector<std::vector<int>>*>(cgs);
for (int i = 0; i < 3; i++) {
ASSERT_EQ(vv->at(i).size(), group_sizes[i]);
for (int j = 0; j < group_sizes[i]; j++) {
EXPECT_EQ(vv->at(i)[j], test_groups[i][j]);
for (int i = 0; i < 3; i++) {
AddCColumnGroup(cgs, test_groups[i], group_sizes[i]);
}
}
FreeCColumnGroups(cgs);
ASSERT_EQ(CColumnGroupsSize(cgs), 3);
auto vv = static_cast<std::vector<std::vector<int>>*>(cgs);
for (int i = 0; i < 3; i++) {
ASSERT_EQ(vv->at(i).size(), group_sizes[i]);
for (int j = 0; j < group_sizes[i]; j++) {
EXPECT_EQ(vv->at(i)[j], test_groups[i][j]);
}
}
FreeCColumnGroups(cgs);
}

View File

@ -17,6 +17,7 @@
#include <memory>
#include <regex>
#include <string>
#include <string_view>
#include <type_traits>
#include <vector>
#include <chrono>
@ -16125,4 +16126,52 @@ TYPED_TEST(JsonIndexTestFixture, TestJsonIndexUnaryExpr) {
std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, unary_expr);
final = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP);
EXPECT_EQ(final.count(), N);
auto term_expr = std::make_shared<expr::TermFilterExpr>(
expr::ColumnInfo(json_fid, DataType::JSON, {this->json_path.substr(1)}),
std::vector<proto::plan::GenericValue>(),
false);
plan =
std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, term_expr);
final = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP);
EXPECT_EQ(final.count(), 0);
using DT = std::conditional_t<
std::is_same_v<typename TestFixture::DataType, std::string>,
std::string_view,
typename TestFixture::DataType>;
std::vector<proto::plan::GenericValue> vals;
int expect_count = 10;
if constexpr (std::is_same_v<DT, bool>) {
proto::plan::GenericValue val;
val.set_bool_val(true);
vals.push_back(val);
val.set_bool_val(false);
vals.push_back(val);
expect_count = N;
} else {
for (int i = 0; i < expect_count; ++i) {
proto::plan::GenericValue val;
auto v = jsons[i].at<DT>(this->json_path).value();
if constexpr (std::is_same_v<DT, int64_t>) {
val.set_int64_val(v);
} else if constexpr (std::is_same_v<DT, double>) {
val.set_float_val(v);
} else if constexpr (std::is_same_v<DT, std::string_view>) {
val.set_string_val(std::string(v));
} else if constexpr (std::is_same_v<DT, bool>) {
val.set_bool_val(i % 2 == 0);
}
vals.push_back(val);
}
}
term_expr = std::make_shared<expr::TermFilterExpr>(
expr::ColumnInfo(json_fid, DataType::JSON, {this->json_path.substr(1)}),
vals,
false);
plan =
std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, term_expr);
final = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP);
EXPECT_EQ(final.count(), expect_count);
}

View File

@ -36,7 +36,7 @@ TEST(CPackedTest, PackedWriterAndReader) {
auto status = builder->AppendValues(test_data.begin(), test_data.end());
ASSERT_TRUE(status.ok());
auto res = builder->Finish();
ASSERT_TRUE(res.ok());
ASSERT_TRUE(res.ok());
std::shared_ptr<arrow::Array> array = res.ValueOrDie();
auto schema = arrow::schema({arrow::field("int64", arrow::int64())});
@ -57,7 +57,13 @@ TEST(CPackedTest, PackedWriterAndReader) {
auto c_status = InitLocalArrowFileSystemSingleton(path);
EXPECT_EQ(c_status.error_code, 0);
CPackedWriter c_packed_writer = nullptr;
c_status = NewPackedWriter(&c_write_schema, buffer_size, paths, 1, part_upload_size, cgs, &c_packed_writer);
c_status = NewPackedWriter(&c_write_schema,
buffer_size,
paths,
1,
part_upload_size,
cgs,
&c_packed_writer);
EXPECT_EQ(c_status.error_code, 0);
EXPECT_NE(c_packed_writer, nullptr);
@ -74,7 +80,8 @@ TEST(CPackedTest, PackedWriterAndReader) {
struct ArrowSchema c_read_schema;
ASSERT_TRUE(arrow::ExportSchema(*schema, &c_read_schema).ok());
CPackedReader c_packed_reader = nullptr;
c_status = NewPackedReader(paths, 1, &c_read_schema, buffer_size, &c_packed_reader);
c_status = NewPackedReader(
paths, 1, &c_read_schema, buffer_size, &c_packed_reader);
EXPECT_EQ(c_status.error_code, 0);
EXPECT_NE(c_packed_reader, nullptr);