From 546080dcdd38a42addd13bc11bb0162449a74b86 Mon Sep 17 00:00:00 2001 From: yah01 Date: Fri, 21 Apr 2023 11:46:32 +0800 Subject: [PATCH] Support to retrieve json (#23563) Signed-off-by: yah01 --- internal/core/src/common/Column.h | 32 +++---- internal/core/src/common/Types.h | 2 + internal/core/src/common/Utils.h | 83 +++++++++-------- internal/core/src/common/VectorTrait.h | 3 +- internal/core/src/query/SearchOnSealed.cpp | 1 + .../query/visitors/ExecPlanNodeVisitor.cpp | 1 + .../core/src/segcore/ConcurrentVector.cpp | 89 ++++++++++--------- internal/core/src/segcore/InsertRecord.h | 12 ++- .../core/src/segcore/SegmentGrowingImpl.cpp | 21 ++++- .../core/src/segcore/SegmentGrowingImpl.h | 2 +- .../core/src/segcore/SegmentInterface.cpp | 1 - .../core/src/segcore/SegmentSealedImpl.cpp | 29 ++++-- internal/core/src/segcore/SegmentSealedImpl.h | 2 +- internal/core/src/segcore/Utils.cpp | 59 ++++++------ internal/core/src/segcore/Utils.h | 2 +- internal/core/unittest/test_sealed.cpp | 21 +++-- internal/core/unittest/test_string_expr.cpp | 16 ++-- internal/core/unittest/test_utils/DataGen.h | 14 ++- .../querynodev2/segments/retrieve_test.go | 3 +- internal/querynodev2/segments/segment_test.go | 3 +- internal/storage/payload.go | 4 - 21 files changed, 233 insertions(+), 167 deletions(-) diff --git a/internal/core/src/common/Column.h b/internal/core/src/common/Column.h index f5a3660e83..cb577ddf22 100644 --- a/internal/core/src/common/Column.h +++ b/internal/core/src/common/Column.h @@ -24,6 +24,7 @@ #include "common/FieldMeta.h" #include "common/LoadInfo.h" #include "common/Span.h" +#include "common/Types.h" #include "common/Utils.h" #include "exceptions/EasyAssert.h" #include "fmt/core.h" @@ -32,8 +33,6 @@ namespace milvus::segcore { -#define FIELD_DATA(info, field) (info->scalars().field##_data().data()) - struct Entry { char* data; uint32_t length; @@ -113,11 +112,11 @@ class VariableColumn : public ColumnBase { const FieldMeta& field_meta, const LoadFieldDataInfo& info, Ctor&& ctor) { - auto begin = info.field_data->scalars().string_data().data().begin(); - auto end = info.field_data->scalars().string_data().data().end(); - if constexpr (std::is_same_v) { - begin = info.field_data->scalars().json_data().data().begin(); - end = info.field_data->scalars().json_data().data().end(); + auto begin = FIELD_DATA(info.field_data, string).begin(); + auto end = FIELD_DATA(info.field_data, string).end(); + if constexpr (std::is_same_v) { + begin = FIELD_DATA(info.field_data, json).begin(); + end = FIELD_DATA(info.field_data, json).end(); } indices_.reserve(info.row_count); @@ -155,6 +154,13 @@ class VariableColumn : public ColumnBase { return views_[i]; } + std::string_view + raw_at(const int i) const { + size_t len = (i == indices_.size() - 1) ? size_ - indices_.back() + : indices_[i + 1] - indices_[i]; + return std::string_view(data_ + indices_[i], len); + } + protected: template void @@ -166,18 +172,6 @@ class VariableColumn : public ColumnBase { } views_.emplace_back( ctor(data_ + indices_.back(), size_ - indices_.back())); - - // as we stores the json objects entirely in memory, - // the raw data is not needed anymore - if constexpr (std::is_same_v) { - if (munmap(data_, size_)) { - AssertInfo( - true, - fmt::format( - "failed to unmap json field after deserialized, err={}", - strerror(errno))); - } - } } private: diff --git a/internal/core/src/common/Types.h b/internal/core/src/common/Types.h index c359baf20c..10fc0a345e 100644 --- a/internal/core/src/common/Types.h +++ b/internal/core/src/common/Types.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -78,6 +79,7 @@ using VectorArray = proto::schema::VectorField; using IdArray = proto::schema::IDs; using InsertData = proto::segcore::InsertRecord; using PkType = std::variant; +using Json = nlohmann::json; inline bool IsPrimaryKeyDataType(DataType data_type) { diff --git a/internal/core/src/common/Utils.h b/internal/core/src/common/Utils.h index 8f3ae1068d..8a0cb28d3c 100644 --- a/internal/core/src/common/Utils.h +++ b/internal/core/src/common/Utils.h @@ -34,6 +34,12 @@ #include "knowhere/expected.h" namespace milvus { +#define FIELD_DATA(data_array, type) \ + (data_array->scalars().type##_data().data()) + +#define VEC_FIELD_DATA(data_array, type) \ + (data_array->vectors().type##_vector().data()) + inline DatasetPtr GenDataset(const int64_t nb, const int64_t dim, const void* xb) { return knowhere::GenDataSet(nb, dim, xb); @@ -196,14 +202,14 @@ GetDataSize(const FieldMeta& field, size_t row_count, const DataArray* data) { case DataType::VARCHAR: case DataType::STRING: { ssize_t size{}; - for (auto& data : data->scalars().string_data().data()) { + for (auto& data : FIELD_DATA(data, string)) { size += data.size(); } return size; } case DataType::JSON: { ssize_t size{}; - for (auto& data : data->scalars().json_data().data()) { + for (auto& data : FIELD_DATA(data, json)) { size += data.size(); } return size; @@ -225,50 +231,44 @@ FillField(DataType data_type, auto data = info.field_data; switch (data_type) { case DataType::BOOL: { - return memcpy(dst, data->scalars().bool_data().data().data(), size); + return memcpy(dst, FIELD_DATA(data, bool).data(), size); } case DataType::INT8: { - auto src_data = data->scalars().int_data().data(); + auto src_data = FIELD_DATA(data, int); std::vector data_raw(src_data.size()); std::copy_n(src_data.data(), src_data.size(), data_raw.data()); return memcpy(dst, data_raw.data(), size); } case DataType::INT16: { - auto src_data = data->scalars().int_data().data(); + auto src_data = FIELD_DATA(data, int); std::vector data_raw(src_data.size()); std::copy_n(src_data.data(), src_data.size(), data_raw.data()); return memcpy(dst, data_raw.data(), size); } case DataType::INT32: { - return memcpy(dst, data->scalars().int_data().data().data(), size); + return memcpy(dst, FIELD_DATA(data, int).data(), size); } case DataType::INT64: { - return memcpy(dst, data->scalars().long_data().data().data(), size); + return memcpy(dst, FIELD_DATA(data, long).data(), size); } case DataType::FLOAT: { - return memcpy( - dst, data->scalars().float_data().data().data(), size); + return memcpy(dst, FIELD_DATA(data, float).data(), size); } case DataType::DOUBLE: { - return memcpy( - dst, data->scalars().double_data().data().data(), size); + return memcpy(dst, FIELD_DATA(data, double).data(), size); } case DataType::VARCHAR: { char* dest = reinterpret_cast(dst); - auto begin = data->scalars().string_data().data().begin(); - auto end = data->scalars().string_data().data().end(); - - while (begin != end) { - memcpy(dest, begin->data(), begin->size()); - dest += begin->size(); - begin++; + for (auto& data : FIELD_DATA(data, string)) { + memcpy(dest, data.data(), data.size()); + dest += data.size(); } return dst; } case DataType::JSON: { char* dest = reinterpret_cast(dst); - for (auto& data : data->scalars().json_data().data()) { + for (auto& data : FIELD_DATA(data, json)) { memcpy(dest, data.data(), data.size()); dest += data.size(); } @@ -276,11 +276,10 @@ FillField(DataType data_type, } case DataType::VECTOR_FLOAT: - return memcpy( - dst, data->vectors().float_vector().data().data(), size); + return memcpy(dst, VEC_FIELD_DATA(data, float).data(), size); case DataType::VECTOR_BINARY: - return memcpy(dst, data->vectors().binary_vector().data(), size); + return memcpy(dst, VEC_FIELD_DATA(data, binary), size); default: { PanicInfo("unsupported"); @@ -292,53 +291,59 @@ inline ssize_t WriteFieldData(int fd, DataType data_type, const DataArray* data, size_t size) { switch (data_type) { case DataType::BOOL: { - return write(fd, data->scalars().bool_data().data().data(), size); + return write(fd, FIELD_DATA(data, bool).data(), size); } case DataType::INT8: { - auto src_data = data->scalars().int_data().data(); + auto src_data = FIELD_DATA(data, int); std::vector data_raw(src_data.size()); std::copy_n(src_data.data(), src_data.size(), data_raw.data()); return write(fd, data_raw.data(), size); } case DataType::INT16: { - auto src_data = data->scalars().int_data().data(); + auto src_data = FIELD_DATA(data, int); std::vector data_raw(src_data.size()); std::copy_n(src_data.data(), src_data.size(), data_raw.data()); return write(fd, data_raw.data(), size); } case DataType::INT32: { - return write(fd, data->scalars().int_data().data().data(), size); + return write(fd, FIELD_DATA(data, int).data(), size); } case DataType::INT64: { - return write(fd, data->scalars().long_data().data().data(), size); + return write(fd, FIELD_DATA(data, long).data(), size); } case DataType::FLOAT: { - return write(fd, data->scalars().float_data().data().data(), size); + return write(fd, FIELD_DATA(data, float).data(), size); } case DataType::DOUBLE: { - return write(fd, data->scalars().double_data().data().data(), size); + return write(fd, FIELD_DATA(data, double).data(), size); } case DataType::VARCHAR: { - auto begin = data->scalars().string_data().data().begin(); - auto end = data->scalars().string_data().data().end(); - ssize_t total_written{0}; - while (begin != end) { - ssize_t written = write(fd, begin->data(), begin->size()); - if (written < begin->size()) { + for (auto& str : FIELD_DATA(data, string)) { + ssize_t written = write(fd, str.data(), str.size()); + if (written < str.size()) { + break; + } + total_written += written; + } + return total_written; + } + case DataType::JSON: { + ssize_t total_written{0}; + for (auto& json : FIELD_DATA(data, json)) { + ssize_t written = write(fd, json.data(), json.size()); + if (written < json.size()) { break; } total_written += written; - begin++; } return total_written; } case DataType::VECTOR_FLOAT: - return write( - fd, data->vectors().float_vector().data().data(), size); + return write(fd, VEC_FIELD_DATA(data, float).data(), size); case DataType::VECTOR_BINARY: - return write(fd, data->vectors().binary_vector().data(), size); + return write(fd, VEC_FIELD_DATA(data, binary), size); default: { PanicInfo("unsupported"); diff --git a/internal/core/src/common/VectorTrait.h b/internal/core/src/common/VectorTrait.h index 4c40a68aad..bd8805bcad 100644 --- a/internal/core/src/common/VectorTrait.h +++ b/internal/core/src/common/VectorTrait.h @@ -17,6 +17,7 @@ #pragma once #include "Types.h" #include +#include namespace milvus { @@ -51,7 +52,7 @@ constexpr bool IsVector = std::is_base_of_v; template constexpr bool IsScalar = std::is_fundamental_v || std::is_same_v || - std::is_same_v; + std::is_same_v || std::is_same_v; template struct EmbeddedTypeImpl; diff --git a/internal/core/src/query/SearchOnSealed.cpp b/internal/core/src/query/SearchOnSealed.cpp index adc4001809..22350f5a24 100644 --- a/internal/core/src/query/SearchOnSealed.cpp +++ b/internal/core/src/query/SearchOnSealed.cpp @@ -10,6 +10,7 @@ // or implied. See the License for the specific language governing permissions and limitations under the License #include +#include #include "common/QueryInfo.h" #include "query/SearchBruteForce.h" diff --git a/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp b/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp index ae9ee3aa71..29bcd596c7 100644 --- a/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp +++ b/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp @@ -108,6 +108,7 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) { segment->mask_with_timestamps(*bitset_holder, timestamp_); segment->mask_with_delete(*bitset_holder, active_count, timestamp_); + // if bitset_holder is all 1's, we got empty result if (bitset_holder->all()) { search_result_opt_ = diff --git a/internal/core/src/segcore/ConcurrentVector.cpp b/internal/core/src/segcore/ConcurrentVector.cpp index 18921955d6..b1ff0944bb 100644 --- a/internal/core/src/segcore/ConcurrentVector.cpp +++ b/internal/core/src/segcore/ConcurrentVector.cpp @@ -10,6 +10,9 @@ // or implied. See the License for the specific language governing permissions and limitations under the License #include "segcore/ConcurrentVector.h" +#include "common/Types.h" +#include "common/Utils.h" +#include "nlohmann/json.hpp" namespace milvus::segcore { @@ -21,12 +24,11 @@ VectorBase::set_data_raw(ssize_t element_offset, if (field_meta.is_vector()) { if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) { return set_data_raw(element_offset, - data->vectors().float_vector().data().data(), + VEC_FIELD_DATA(data, float).data(), element_count); } else if (field_meta.get_data_type() == DataType::VECTOR_BINARY) { - return set_data_raw(element_offset, - data->vectors().binary_vector().data(), - element_count); + return set_data_raw( + element_offset, VEC_FIELD_DATA(data, binary), element_count); } else { PanicInfo("unsupported"); } @@ -34,52 +36,49 @@ VectorBase::set_data_raw(ssize_t element_offset, switch (field_meta.get_data_type()) { case DataType::BOOL: { - return set_data_raw(element_offset, - data->scalars().bool_data().data().data(), - element_count); + return set_data_raw( + element_offset, FIELD_DATA(data, bool).data(), element_count); } case DataType::INT8: { - auto& src_data = data->scalars().int_data().data(); + auto& src_data = FIELD_DATA(data, int); std::vector data_raw(src_data.size()); std::copy_n(src_data.data(), src_data.size(), data_raw.data()); return set_data_raw(element_offset, data_raw.data(), element_count); } case DataType::INT16: { - auto& src_data = data->scalars().int_data().data(); + auto& src_data = FIELD_DATA(data, int); std::vector data_raw(src_data.size()); std::copy_n(src_data.data(), src_data.size(), data_raw.data()); return set_data_raw(element_offset, data_raw.data(), element_count); } case DataType::INT32: { - return set_data_raw(element_offset, - data->scalars().int_data().data().data(), - element_count); + return set_data_raw( + element_offset, FIELD_DATA(data, int).data(), element_count); } case DataType::INT64: { - return set_data_raw(element_offset, - data->scalars().long_data().data().data(), - element_count); + return set_data_raw( + element_offset, FIELD_DATA(data, long).data(), element_count); } case DataType::FLOAT: { - return set_data_raw(element_offset, - data->scalars().float_data().data().data(), - element_count); + return set_data_raw( + element_offset, FIELD_DATA(data, float).data(), element_count); } case DataType::DOUBLE: { - return set_data_raw(element_offset, - data->scalars().double_data().data().data(), - element_count); + return set_data_raw( + element_offset, FIELD_DATA(data, double).data(), element_count); } case DataType::VARCHAR: { - auto begin = data->scalars().string_data().data().begin(); - auto end = data->scalars().string_data().data().end(); - std::vector data_raw(begin, end); + auto& field_data = FIELD_DATA(data, string); + std::vector data_raw(field_data.begin(), + field_data.end()); return set_data_raw(element_offset, data_raw.data(), element_count); } case DataType::JSON: { - auto begin = data->scalars().json_data().data().begin(); - auto end = data->scalars().json_data().data().end(); - std::vector data_raw(begin, end); + auto json_data = FIELD_DATA(data, json); + std::vector data_raw(json_data.size()); + for (auto& json_bytes : json_data) { + data_raw.emplace_back(Json::parse(json_bytes)); + } return set_data_raw(element_offset, data_raw.data(), element_count); } default: { @@ -95,11 +94,10 @@ VectorBase::fill_chunk_data(ssize_t element_count, const FieldMeta& field_meta) { if (field_meta.is_vector()) { if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) { - return fill_chunk_data(data->vectors().float_vector().data().data(), + return fill_chunk_data(VEC_FIELD_DATA(data, float).data(), element_count); } else if (field_meta.get_data_type() == DataType::VECTOR_BINARY) { - return fill_chunk_data(data->vectors().binary_vector().data(), - element_count); + return fill_chunk_data(VEC_FIELD_DATA(data, binary), element_count); } else { PanicInfo("unsupported"); } @@ -107,45 +105,56 @@ VectorBase::fill_chunk_data(ssize_t element_count, switch (field_meta.get_data_type()) { case DataType::BOOL: { - return fill_chunk_data(data->scalars().bool_data().data().data(), + return fill_chunk_data(FIELD_DATA(data, bool).data(), element_count); } case DataType::INT8: { - auto& src_data = data->scalars().int_data().data(); + auto& src_data = FIELD_DATA(data, int); std::vector data_raw(src_data.size()); std::copy_n(src_data.data(), src_data.size(), data_raw.data()); return fill_chunk_data(data_raw.data(), element_count); } case DataType::INT16: { - auto& src_data = data->scalars().int_data().data(); + auto& src_data = FIELD_DATA(data, int); std::vector data_raw(src_data.size()); std::copy_n(src_data.data(), src_data.size(), data_raw.data()); return fill_chunk_data(data_raw.data(), element_count); } case DataType::INT32: { - return fill_chunk_data(data->scalars().int_data().data().data(), - element_count); + return fill_chunk_data(FIELD_DATA(data, int).data(), element_count); } case DataType::INT64: { - return fill_chunk_data(data->scalars().long_data().data().data(), + return fill_chunk_data(FIELD_DATA(data, long).data(), element_count); } case DataType::FLOAT: { - return fill_chunk_data(data->scalars().float_data().data().data(), + return fill_chunk_data(FIELD_DATA(data, float).data(), element_count); } case DataType::DOUBLE: { - return fill_chunk_data(data->scalars().double_data().data().data(), + return fill_chunk_data(FIELD_DATA(data, double).data(), element_count); } case DataType::VARCHAR: { auto vec = static_cast*>(this); - auto count = data->scalars().string_data().data().size(); + auto count = FIELD_DATA(data, string).size(); vec->grow_on_demand(count); auto& chunk = vec->get_chunk(0); size_t index = 0; - for (auto& str : data->scalars().string_data().data()) { + for (auto& str : FIELD_DATA(data, string)) { + chunk[index++] = str; + } + return; + } + case DataType::JSON: { + auto vec = static_cast*>(this); + auto count = FIELD_DATA(data, json).size(); + vec->grow_on_demand(count); + auto& chunk = vec->get_chunk(0); + + size_t index = 0; + for (auto& str : FIELD_DATA(data, json)) { chunk[index++] = str; } return; diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index 9f32a21430..405e6ae643 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -21,6 +21,7 @@ #include "TimestampIndex.h" #include "common/Schema.h" +#include "common/Types.h" #include "segcore/AckResponder.h" #include "segcore/ConcurrentVector.h" #include "segcore/Record.h" @@ -217,12 +218,15 @@ struct InsertRecord { size_per_chunk); break; } - case DataType::JSON: - case DataType::ARRAY: { - this->append_field_data(field_id, - size_per_chunk); + case DataType::JSON: { + this->append_field_data(field_id, size_per_chunk); break; } + // case DataType::ARRAY: { + // this->append_field_data(field_id, + // size_per_chunk); + // break; + // } default: { PanicInfo("unsupported"); } diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index e18291e083..dafd4a20fb 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -14,8 +14,11 @@ #include #include #include +#include #include "common/Consts.h" +#include "common/Types.h" +#include "nlohmann/json.hpp" #include "query/PlanNode.h" #include "query/SearchOnSealed.h" #include "segcore/SegmentGrowingImpl.h" @@ -298,6 +301,12 @@ SegmentGrowingImpl::bulk_subscript(FieldId field_id, *vec_ptr, seg_offsets, count, output.data()); return CreateScalarDataArrayFrom(output.data(), count, field_meta); } + case DataType::JSON: { + FixedVector output(count); + bulk_subscript_impl( + *vec_ptr, seg_offsets, count, output.data()); + return CreateScalarDataArrayFrom(output.data(), count, field_meta); + } default: { PanicInfo("unsupported type"); } @@ -327,21 +336,25 @@ SegmentGrowingImpl::bulk_subscript_impl(int64_t element_sizeof, } } -template +template void SegmentGrowingImpl::bulk_subscript_impl(const VectorBase& vec_raw, const int64_t* seg_offsets, int64_t count, void* output_raw) const { - static_assert(IsScalar); - auto vec_ptr = dynamic_cast*>(&vec_raw); + static_assert(IsScalar); + auto vec_ptr = dynamic_cast*>(&vec_raw); AssertInfo(vec_ptr, "Pointer of vec_raw is nullptr"); auto& vec = *vec_ptr; auto output = reinterpret_cast(output_raw); for (int64_t i = 0; i < count; ++i) { auto offset = seg_offsets[i]; if (offset != INVALID_SEG_OFFSET) { - output[i] = vec[offset]; + if constexpr (std::is_same_v) { + output[i] = vec[offset].dump(); + } else { + output[i] = vec[offset]; + } } } } diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 033a833aa7..5665a9d7dd 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -144,7 +144,7 @@ class SegmentGrowingImpl : public SegmentGrowing { get_active_count(Timestamp ts) const override; // for scalar vectors - template + template void bulk_subscript_impl(const VectorBase& vec_raw, const int64_t* seg_offsets, diff --git a/internal/core/src/segcore/SegmentInterface.cpp b/internal/core/src/segcore/SegmentInterface.cpp index ba99745609..0184af0e5c 100644 --- a/internal/core/src/segcore/SegmentInterface.cpp +++ b/internal/core/src/segcore/SegmentInterface.cpp @@ -117,7 +117,6 @@ SegmentInternalInterface::Retrieve(const query::RetrievePlan* plan, auto data = reinterpret_cast(output.data()); auto obj = scalar_array->mutable_long_data(); obj->mutable_data()->Add(data, data + size); - fields_data->AddAllocated(data_array.release()); continue; } diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index d4f6425221..7ec0626a22 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -24,6 +24,7 @@ #include "common/Consts.h" #include "common/FieldMeta.h" #include "common/Types.h" +#include "log/Log.h" #include "nlohmann/json.hpp" #include "query/ScalarIndex.h" #include "query/SearchBruteForce.h" @@ -192,7 +193,7 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) { SystemProperty::Instance().GetSystemFieldType(field_id); if (system_field_type == SystemFieldType::Timestamp) { auto timestamps = reinterpret_cast( - info.field_data->scalars().long_data().data().data()); + FIELD_DATA(info.field_data, long).data()); TimestampIndex index; auto min_slice_length = size < 4096 ? 1 : 4096; @@ -211,7 +212,7 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) { AssertInfo(system_field_type == SystemFieldType::RowId, "System field type of id column is not RowId"); auto row_ids = reinterpret_cast( - info.field_data->scalars().long_data().data().data()); + FIELD_DATA(info.field_data, long).data()); // write data under lock std::unique_lock lck(mutex_); AssertInfo(insert_record_.row_ids_.empty(), "already exists"); @@ -247,15 +248,15 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) { break; } case milvus::DataType::JSON: { - column = std::make_unique>( + column = std::make_unique>( get_segment_id(), field_meta, info, [](const char* data, size_t len) { if (len > 0) { - return nlohmann::json::parse(data, data + len); + return Json::parse(data, data + len); } - return nlohmann::json{}; + return Json{}; }); } default: { @@ -420,6 +421,7 @@ SegmentSealedImpl::mask_with_delete(BitsetType& bitset, if (del_barrier == 0) { return; } + auto bitmap_holder = get_deleted_bitmap( del_barrier, ins_barrier, deleted_record_, insert_record_, timestamp); if (!bitmap_holder || !bitmap_holder->bitmap_ptr) { @@ -598,18 +600,18 @@ SegmentSealedImpl::bulk_subscript_impl(const void* src_raw, } } -template +template void SegmentSealedImpl::bulk_subscript_impl(const ColumnBase* column, const int64_t* seg_offsets, int64_t count, void* dst_raw) { - auto field = reinterpret_cast*>(column); + auto field = reinterpret_cast*>(column); auto dst = reinterpret_cast(dst_raw); for (int64_t i = 0; i < count; ++i) { auto offset = seg_offsets[i]; if (offset != INVALID_SEG_OFFSET) { - dst[i] = std::move(T((*field)[offset])); + dst[i] = std::move(T(field->raw_at(offset))); } } } @@ -685,6 +687,17 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id, output.data(), count, field_meta); } + case DataType::JSON: { + FixedVector output(count); + bulk_subscript_impl( + variable_fields_.at(field_id).get(), + seg_offsets, + count, + output.data()); + return CreateScalarDataArrayFrom( + output.data(), count, field_meta); + } + default: PanicInfo( fmt::format("unsupported data type: {}", diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index 965c7f053f..e9732d7752 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -138,7 +138,7 @@ class SegmentSealedImpl : public SegmentSealed { int64_t count, void* dst_raw); - template + template static void bulk_subscript_impl(const ColumnBase* field, const int64_t* seg_offsets, diff --git a/internal/core/src/segcore/Utils.cpp b/internal/core/src/segcore/Utils.cpp index 89cfcc57e3..fb7cac7215 100644 --- a/internal/core/src/segcore/Utils.cpp +++ b/internal/core/src/segcore/Utils.cpp @@ -17,7 +17,7 @@ namespace milvus::segcore { void ParsePksFromFieldData(std::vector& pks, const DataArray& data) { - switch (DataType(data.type())) { + switch (static_cast(data.type())) { case DataType::INT64: { auto source_data = reinterpret_cast( data.scalars().long_data().data().data()); @@ -78,14 +78,14 @@ CreateScalarDataArray(int64_t count, const FieldMeta& field_meta) { auto data_type = field_meta.get_data_type(); auto data_array = std::make_unique(); data_array->set_field_id(field_meta.get_id().get()); - data_array->set_type( - milvus::proto::schema::DataType(field_meta.get_data_type())); + data_array->set_type(static_cast( + field_meta.get_data_type())); auto scalar_array = data_array->mutable_scalars(); switch (data_type) { case DataType::BOOL: { auto obj = scalar_array->mutable_bool_data(); - obj->mutable_data()->Resize(count, 0); + obj->mutable_data()->Resize(count, false); break; } case DataType::INT8: { @@ -121,8 +121,9 @@ CreateScalarDataArray(int64_t count, const FieldMeta& field_meta) { case DataType::VARCHAR: { auto obj = scalar_array->mutable_string_data(); obj->mutable_data()->Reserve(count); - for (auto i = 0; i < count; i++) + for (auto i = 0; i < count; i++) { *(obj->mutable_data()->Add()) = std::string(); + } break; } default: { @@ -138,8 +139,8 @@ CreateVectorDataArray(int64_t count, const FieldMeta& field_meta) { auto data_type = field_meta.get_data_type(); auto data_array = std::make_unique(); data_array->set_field_id(field_meta.get_id().get()); - data_array->set_type( - milvus::proto::schema::DataType(field_meta.get_data_type())); + data_array->set_type(static_cast( + field_meta.get_data_type())); auto vector_array = data_array->mutable_vectors(); auto dim = field_meta.get_dim(); @@ -173,8 +174,8 @@ CreateScalarDataArrayFrom(const void* data_raw, auto data_type = field_meta.get_data_type(); auto data_array = std::make_unique(); data_array->set_field_id(field_meta.get_id().get()); - data_array->set_type( - milvus::proto::schema::DataType(field_meta.get_data_type())); + data_array->set_type(static_cast( + field_meta.get_data_type())); auto scalar_array = data_array->mutable_scalars(); switch (data_type) { @@ -223,8 +224,17 @@ CreateScalarDataArrayFrom(const void* data_raw, case DataType::VARCHAR: { auto data = reinterpret_cast(data_raw); auto obj = scalar_array->mutable_string_data(); - for (auto i = 0; i < count; i++) + for (auto i = 0; i < count; i++) { *(obj->mutable_data()->Add()) = data[i]; + } + break; + } + case DataType::JSON: { + auto data = reinterpret_cast(data_raw); + auto obj = scalar_array->mutable_json_data(); + for (auto i = 0; i < count; i++) { + *(obj->mutable_data()->Add()) = data[i]; + } break; } default: { @@ -242,8 +252,8 @@ CreateVectorDataArrayFrom(const void* data_raw, auto data_type = field_meta.get_data_type(); auto data_array = std::make_unique(); data_array->set_field_id(field_meta.get_id().get()); - data_array->set_type( - milvus::proto::schema::DataType(field_meta.get_data_type())); + data_array->set_type(static_cast( + field_meta.get_data_type())); auto vector_array = data_array->mutable_vectors(); auto dim = field_meta.get_dim(); @@ -293,8 +303,8 @@ MergeDataArray( auto data_type = field_meta.get_data_type(); auto data_array = std::make_unique(); data_array->set_field_id(field_meta.get_id().get()); - data_array->set_type( - milvus::proto::schema::DataType(field_meta.get_data_type())); + data_array->set_type(static_cast( + field_meta.get_data_type())); for (auto& result_pair : result_offsets) { auto src_field_data = @@ -307,8 +317,7 @@ MergeDataArray( auto dim = field_meta.get_dim(); vector_array->set_dim(dim); if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) { - auto data = - src_field_data->vectors().float_vector().data().data(); + auto data = VEC_FIELD_DATA(src_field_data, float).data(); auto obj = vector_array->mutable_float_vector(); obj->mutable_data()->Add(data + src_offset * dim, data + (src_offset + 1) * dim); @@ -317,7 +326,7 @@ MergeDataArray( dim % 8 == 0, "Binary vector field dimension is not a multiple of 8"); auto num_bytes = dim / 8; - auto data = src_field_data->vectors().binary_vector().data(); + auto data = VEC_FIELD_DATA(src_field_data, binary); auto obj = vector_array->mutable_binary_vector(); obj->assign(data + src_offset * num_bytes, num_bytes); } else { @@ -329,7 +338,7 @@ MergeDataArray( auto scalar_array = data_array->mutable_scalars(); switch (data_type) { case DataType::BOOL: { - auto data = src_field_data->scalars().bool_data().data().data(); + auto data = FIELD_DATA(src_field_data, bool).data(); auto obj = scalar_array->mutable_bool_data(); *(obj->mutable_data()->Add()) = data[src_offset]; continue; @@ -337,27 +346,25 @@ MergeDataArray( case DataType::INT8: case DataType::INT16: case DataType::INT32: { - auto data = src_field_data->scalars().int_data().data().data(); + auto data = FIELD_DATA(src_field_data, int).data(); auto obj = scalar_array->mutable_int_data(); *(obj->mutable_data()->Add()) = data[src_offset]; continue; } case DataType::INT64: { - auto data = src_field_data->scalars().long_data().data().data(); + auto data = FIELD_DATA(src_field_data, long).data(); auto obj = scalar_array->mutable_long_data(); *(obj->mutable_data()->Add()) = data[src_offset]; continue; } case DataType::FLOAT: { - auto data = - src_field_data->scalars().float_data().data().data(); + auto data = FIELD_DATA(src_field_data, float).data(); auto obj = scalar_array->mutable_float_data(); *(obj->mutable_data()->Add()) = data[src_offset]; continue; } case DataType::DOUBLE: { - auto data = - src_field_data->scalars().double_data().data().data(); + auto data = FIELD_DATA(src_field_data, double).data(); auto obj = scalar_array->mutable_double_data(); *(obj->mutable_data()->Add()) = data[src_offset]; continue; @@ -386,8 +393,8 @@ ReverseDataFromIndex(const index::IndexBase* index, auto data_type = field_meta.get_data_type(); auto data_array = std::make_unique(); data_array->set_field_id(field_meta.get_id().get()); - data_array->set_type( - milvus::proto::schema::DataType(field_meta.get_data_type())); + data_array->set_type(static_cast( + field_meta.get_data_type())); auto scalar_array = data_array->mutable_scalars(); switch (data_type) { diff --git a/internal/core/src/segcore/Utils.h b/internal/core/src/segcore/Utils.h index 5c6f0cb123..2a1869fd22 100644 --- a/internal/core/src/segcore/Utils.h +++ b/internal/core/src/segcore/Utils.h @@ -13,7 +13,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index d597b4ed08..bd5fb4f178 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -12,6 +12,7 @@ #include #include +#include "common/Types.h" #include "segcore/SegmentSealedImpl.h" #include "test_utils/DataGen.h" #include "index/IndexFactory.h" @@ -384,6 +385,7 @@ TEST(Sealed, LoadFieldData) { schema->AddDebugField("int8", DataType::INT8); schema->AddDebugField("int16", DataType::INT16); schema->AddDebugField("float", DataType::FLOAT); + schema->AddDebugField("json", DataType::JSON); schema->set_primary_field_id(counter_id); auto dataset = DataGen(schema, N); @@ -480,6 +482,7 @@ TEST(Sealed, LoadFieldDataMmap) { schema->AddDebugField("int8", DataType::INT8); schema->AddDebugField("int16", DataType::INT16); schema->AddDebugField("float", DataType::FLOAT); + schema->AddDebugField("json", DataType::JSON); schema->set_primary_field_id(counter_id); auto dataset = DataGen(schema, N); @@ -764,7 +767,8 @@ TEST(Sealed, OverlapDelete) { auto N = 10; auto metric_type = knowhere::metric::L2; auto schema = std::make_shared(); - auto fakevec_id = schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, dim, metric_type); + auto fakevec_id = schema->AddDebugField( + "fakevec", DataType::VECTOR_FLOAT, dim, metric_type); auto counter_id = schema->AddDebugField("counter", DataType::INT64); auto double_id = schema->AddDebugField("double", DataType::DOUBLE); auto nothing_id = schema->AddDebugField("nothing", DataType::INT32); @@ -807,7 +811,8 @@ TEST(Sealed, OverlapDelete) { auto plan = CreatePlan(*schema, dsl); auto num_queries = 5; auto ph_group_raw = CreatePlaceholderGroup(num_queries, 16, 1024); - auto ph_group = ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString()); + auto ph_group = + ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString()); ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group.get(), time)); @@ -822,7 +827,8 @@ TEST(Sealed, OverlapDelete) { LoadDeletedRecordInfo info = {timestamps.data(), ids.get(), row_count}; segment->LoadDeletedRecord(info); ASSERT_EQ(segment->get_deleted_count(), pks.size()) - << "deleted_count=" << segment->get_deleted_count() << " pks_count=" << pks.size() << std::endl; + << "deleted_count=" << segment->get_deleted_count() + << " pks_count=" << pks.size() << std::endl; // Load overlapping delete records row_count += 3; @@ -830,16 +836,19 @@ TEST(Sealed, OverlapDelete) { auto new_ids = std::make_unique(); new_ids->mutable_int_id()->mutable_data()->Add(pks.begin(), pks.end()); timestamps.insert(timestamps.end(), {11, 11, 11}); - LoadDeletedRecordInfo overlap_info = {timestamps.data(), new_ids.get(), row_count}; + LoadDeletedRecordInfo overlap_info = { + timestamps.data(), new_ids.get(), row_count}; segment->LoadDeletedRecord(overlap_info); BitsetType bitset(N, false); // NOTE: need to change delete timestamp, so not to hit the cache ASSERT_EQ(segment->get_deleted_count(), pks.size()) - << "deleted_count=" << segment->get_deleted_count() << " pks_count=" << pks.size() << std::endl; + << "deleted_count=" << segment->get_deleted_count() + << " pks_count=" << pks.size() << std::endl; segment->mask_with_delete(bitset, 10, 12); ASSERT_EQ(bitset.count(), pks.size()) - << "bitset_count=" << bitset.count() << " pks_count=" << pks.size() << std::endl; + << "bitset_count=" << bitset.count() << " pks_count=" << pks.size() + << std::endl; } auto diff --git a/internal/core/unittest/test_string_expr.cpp b/internal/core/unittest/test_string_expr.cpp index 43b34abe21..782ea3cbb2 100644 --- a/internal/core/unittest/test_string_expr.cpp +++ b/internal/core/unittest/test_string_expr.cpp @@ -294,8 +294,8 @@ TEST(StringExpr, Term) { for (int iter = 0; iter < num_iters; ++iter) { auto raw_data = DataGen(schema, N, iter); auto new_str_col = raw_data.get_col(str_meta.get_id()); - auto begin = new_str_col->scalars().string_data().data().begin(); - auto end = new_str_col->scalars().string_data().data().end(); + auto begin = FIELD_DATA(new_str_col, string).begin(); + auto end = FIELD_DATA(new_str_col, string).end(); str_col.insert(str_col.end(), begin, end); seg->PreInsert(N); seg->Insert(iter * N, @@ -396,8 +396,8 @@ TEST(StringExpr, Compare) { auto reserve_col = [&, raw_data](const FieldMeta& field_meta, std::vector& str_col) { auto new_str_col = raw_data.get_col(field_meta.get_id()); - auto begin = new_str_col->scalars().string_data().data().begin(); - auto end = new_str_col->scalars().string_data().data().end(); + auto begin = FIELD_DATA(new_str_col, string).begin(); + auto end = FIELD_DATA(new_str_col, string).end(); str_col.insert(str_col.end(), begin, end); }; @@ -495,8 +495,8 @@ TEST(StringExpr, UnaryRange) { for (int iter = 0; iter < num_iters; ++iter) { auto raw_data = DataGen(schema, N, iter); auto new_str_col = raw_data.get_col(str_meta.get_id()); - auto begin = new_str_col->scalars().string_data().data().begin(); - auto end = new_str_col->scalars().string_data().data().end(); + auto begin = FIELD_DATA(new_str_col, string).begin(); + auto end = FIELD_DATA(new_str_col, string).end(); str_col.insert(str_col.end(), begin, end); seg->PreInsert(N); seg->Insert(iter * N, @@ -599,8 +599,8 @@ TEST(StringExpr, BinaryRange) { for (int iter = 0; iter < num_iters; ++iter) { auto raw_data = DataGen(schema, N, iter); auto new_str_col = raw_data.get_col(str_meta.get_id()); - auto begin = new_str_col->scalars().string_data().data().begin(); - auto end = new_str_col->scalars().string_data().data().end(); + auto begin = FIELD_DATA(new_str_col, string).begin(); + auto end = FIELD_DATA(new_str_col, string).end(); str_col.insert(str_col.end(), begin, end); seg->PreInsert(N); seg->Insert(iter * N, diff --git a/internal/core/unittest/test_utils/DataGen.h b/internal/core/unittest/test_utils/DataGen.h index f4f8dbb62f..26344bf357 100644 --- a/internal/core/unittest/test_utils/DataGen.h +++ b/internal/core/unittest/test_utils/DataGen.h @@ -303,6 +303,17 @@ DataGen(SchemaPtr schema, insert_cols(data, N, field_meta); break; } + case DataType::JSON: { + vector data(N); + for (int i = 0; i < N / repeat_count; i++) { + auto str = R"({"key":)" + std::to_string(er()) + "}"; + for (int j = 0; j < repeat_count; j++) { + data[i * repeat_count + j] = str; + } + } + insert_cols(data, N, field_meta); + break; + } default: { throw std::runtime_error("unimplemented"); } @@ -517,8 +528,7 @@ GenVecIndexing(int64_t N, int64_t dim, const float* vec) { {knowhere::meta::DEVICE_ID, 0}}; auto database = knowhere::GenDataSet(N, dim, vec); auto indexing = std::make_unique( - knowhere::IndexEnum::INDEX_FAISS_IVFFLAT, - knowhere::metric::L2); + knowhere::IndexEnum::INDEX_FAISS_IVFFLAT, knowhere::metric::L2); indexing->BuildWithDataset(database, conf); return indexing; } diff --git a/internal/querynodev2/segments/retrieve_test.go b/internal/querynodev2/segments/retrieve_test.go index f429e374c4..f16a3d34ab 100644 --- a/internal/querynodev2/segments/retrieve_test.go +++ b/internal/querynodev2/segments/retrieve_test.go @@ -103,7 +103,8 @@ func (suite *RetrieveSuite) SetupTest() { suite.Require().NoError(err) insertRecord, err = storage.TransferInsertMsgToInsertRecord(suite.collection.Schema(), insertMsg) suite.Require().NoError(err) - suite.growing.Insert(insertMsg.RowIDs, insertMsg.Timestamps, insertRecord) + err = suite.growing.Insert(insertMsg.RowIDs, insertMsg.Timestamps, insertRecord) + suite.Require().NoError(err) suite.manager.Segment.Put(SegmentTypeSealed, suite.sealed) suite.manager.Segment.Put(SegmentTypeGrowing, suite.growing) diff --git a/internal/querynodev2/segments/segment_test.go b/internal/querynodev2/segments/segment_test.go index 068584ecf5..a7c50cd323 100644 --- a/internal/querynodev2/segments/segment_test.go +++ b/internal/querynodev2/segments/segment_test.go @@ -83,7 +83,8 @@ func (suite *SegmentSuite) SetupTest() { suite.Require().NoError(err) insertRecord, err = storage.TransferInsertMsgToInsertRecord(suite.collection.Schema(), insertMsg) suite.Require().NoError(err) - suite.growing.Insert(insertMsg.RowIDs, insertMsg.Timestamps, insertRecord) + err = suite.growing.Insert(insertMsg.RowIDs, insertMsg.Timestamps, insertRecord) + suite.Require().NoError(err) suite.manager.Segment.Put(SegmentTypeSealed, suite.sealed) suite.manager.Segment.Put(SegmentTypeGrowing, suite.growing) diff --git a/internal/storage/payload.go b/internal/storage/payload.go index 37f5fe079b..6876bfb1fe 100644 --- a/internal/storage/payload.go +++ b/internal/storage/payload.go @@ -30,7 +30,6 @@ import ( "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" - "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/schemapb" @@ -330,9 +329,6 @@ func (w *PayloadWriter) AddOneJSONToPayload(msg []byte) error { length := len(bytes) cmsg := (*C.uint8_t)(unsafe.Pointer(&bytes[0])) clength := C.int(length) - // defer C.free(unsafe.Pointer(cmsg)) - - log.Debug("yah01", zap.String("jsonBytes", string(bytes))) status := C.AddOneJSONToPayload(w.payloadWriterPtr, cmsg, clength) return HandleCStatus(&status, "AddOneJSONToPayload failed")