Support to retrieve json (#23563)

Signed-off-by: yah01 <yang.cen@zilliz.com>
yah01 2023-04-21 11:46:32 +08:00 committed by GitHub
parent 03ec804e68
commit 546080dcdd
21 changed files with 233 additions and 167 deletions

View File

@ -24,6 +24,7 @@
#include "common/FieldMeta.h"
#include "common/LoadInfo.h"
#include "common/Span.h"
#include "common/Types.h"
#include "common/Utils.h"
#include "exceptions/EasyAssert.h"
#include "fmt/core.h"
@ -32,8 +33,6 @@
namespace milvus::segcore {
#define FIELD_DATA(info, field) (info->scalars().field##_data().data())
struct Entry {
char* data;
uint32_t length;
@ -113,11 +112,11 @@ class VariableColumn : public ColumnBase {
const FieldMeta& field_meta,
const LoadFieldDataInfo& info,
Ctor&& ctor) {
auto begin = info.field_data->scalars().string_data().data().begin();
auto end = info.field_data->scalars().string_data().data().end();
if constexpr (std::is_same_v<T, nlohmann::json>) {
begin = info.field_data->scalars().json_data().data().begin();
end = info.field_data->scalars().json_data().data().end();
auto begin = FIELD_DATA(info.field_data, string).begin();
auto end = FIELD_DATA(info.field_data, string).end();
if constexpr (std::is_same_v<T, Json>) {
begin = FIELD_DATA(info.field_data, json).begin();
end = FIELD_DATA(info.field_data, json).end();
}
indices_.reserve(info.row_count);
@ -155,6 +154,13 @@ class VariableColumn : public ColumnBase {
return views_[i];
}
std::string_view
raw_at(const int i) const {
size_t len = (i == indices_.size() - 1) ? size_ - indices_.back()
: indices_[i + 1] - indices_[i];
return std::string_view(data_ + indices_[i], len);
}
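// Note: indices_[i] is the byte offset where element i begins in data_, so
// element i ends where element i + 1 starts, and the last element runs to
// size_. E.g. with indices_ = {0, 5, 12} and size_ = 20, raw_at(1) views
// bytes [5, 12) and raw_at(2) views bytes [12, 20).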
protected:
template <typename Ctor>
void
@ -166,18 +172,6 @@ class VariableColumn : public ColumnBase {
}
views_.emplace_back(
ctor(data_ + indices_.back(), size_ - indices_.back()));
// as we stores the json objects entirely in memory,
// the raw data is not needed anymore
if constexpr (std::is_same_v<T, nlohmann::json>) {
if (munmap(data_, size_)) {
AssertInfo(
true,
fmt::format(
"failed to unmap json field after deserialized, err={}",
strerror(errno)));
}
}
}
private:

View File

@ -19,6 +19,7 @@
#include <tbb/concurrent_unordered_map.h>
#include <tbb/concurrent_unordered_set.h>
#include <nlohmann/json.hpp>
#include <NamedType/named_type.hpp>
#include <boost/align/aligned_allocator.hpp>
#include <boost/container/vector.hpp>
@ -78,6 +79,7 @@ using VectorArray = proto::schema::VectorField;
using IdArray = proto::schema::IDs;
using InsertData = proto::segcore::InsertRecord;
using PkType = std::variant<std::monostate, int64_t, std::string>;
using Json = nlohmann::json;
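// Json values parse from and serialize back to text, e.g.:
//   auto j = Json::parse(R"({"key":1})");
//   std::string s = j.dump();  // s == R"({"key":1})"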
inline bool
IsPrimaryKeyDataType(DataType data_type) {

View File

@ -34,6 +34,12 @@
#include "knowhere/expected.h"
namespace milvus {
#define FIELD_DATA(data_array, type) \
(data_array->scalars().type##_data().data())
#define VEC_FIELD_DATA(data_array, type) \
(data_array->vectors().type##_vector().data())
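// The macros expand to the protobuf accessors they wrap, e.g.:
//   FIELD_DATA(data, json)      -> (data->scalars().json_data().data())
//   VEC_FIELD_DATA(data, float) -> (data->vectors().float_vector().data())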
inline DatasetPtr
GenDataset(const int64_t nb, const int64_t dim, const void* xb) {
return knowhere::GenDataSet(nb, dim, xb);
@ -196,14 +202,14 @@ GetDataSize(const FieldMeta& field, size_t row_count, const DataArray* data) {
case DataType::VARCHAR:
case DataType::STRING: {
ssize_t size{};
for (auto& data : data->scalars().string_data().data()) {
for (auto& data : FIELD_DATA(data, string)) {
size += data.size();
}
return size;
}
case DataType::JSON: {
ssize_t size{};
for (auto& data : data->scalars().json_data().data()) {
for (auto& data : FIELD_DATA(data, json)) {
size += data.size();
}
return size;
@ -225,50 +231,44 @@ FillField(DataType data_type,
auto data = info.field_data;
switch (data_type) {
case DataType::BOOL: {
return memcpy(dst, data->scalars().bool_data().data().data(), size);
return memcpy(dst, FIELD_DATA(data, bool).data(), size);
}
case DataType::INT8: {
auto src_data = data->scalars().int_data().data();
auto src_data = FIELD_DATA(data, int);
std::vector<int8_t> data_raw(src_data.size());
std::copy_n(src_data.data(), src_data.size(), data_raw.data());
return memcpy(dst, data_raw.data(), size);
}
case DataType::INT16: {
auto src_data = data->scalars().int_data().data();
auto src_data = FIELD_DATA(data, int);
std::vector<int16_t> data_raw(src_data.size());
std::copy_n(src_data.data(), src_data.size(), data_raw.data());
return memcpy(dst, data_raw.data(), size);
}
case DataType::INT32: {
return memcpy(dst, data->scalars().int_data().data().data(), size);
return memcpy(dst, FIELD_DATA(data, int).data(), size);
}
case DataType::INT64: {
return memcpy(dst, data->scalars().long_data().data().data(), size);
return memcpy(dst, FIELD_DATA(data, long).data(), size);
}
case DataType::FLOAT: {
return memcpy(
dst, data->scalars().float_data().data().data(), size);
return memcpy(dst, FIELD_DATA(data, float).data(), size);
}
case DataType::DOUBLE: {
return memcpy(
dst, data->scalars().double_data().data().data(), size);
return memcpy(dst, FIELD_DATA(data, double).data(), size);
}
case DataType::VARCHAR: {
char* dest = reinterpret_cast<char*>(dst);
auto begin = data->scalars().string_data().data().begin();
auto end = data->scalars().string_data().data().end();
while (begin != end) {
memcpy(dest, begin->data(), begin->size());
dest += begin->size();
begin++;
for (auto& data : FIELD_DATA(data, string)) {
memcpy(dest, data.data(), data.size());
dest += data.size();
}
return dst;
}
case DataType::JSON: {
char* dest = reinterpret_cast<char*>(dst);
for (auto& data : data->scalars().json_data().data()) {
for (auto& data : FIELD_DATA(data, json)) {
memcpy(dest, data.data(), data.size());
dest += data.size();
}
@ -276,11 +276,10 @@ FillField(DataType data_type,
}
case DataType::VECTOR_FLOAT:
return memcpy(
dst, data->vectors().float_vector().data().data(), size);
return memcpy(dst, VEC_FIELD_DATA(data, float).data(), size);
case DataType::VECTOR_BINARY:
return memcpy(dst, data->vectors().binary_vector().data(), size);
return memcpy(dst, VEC_FIELD_DATA(data, binary), size);
default: {
PanicInfo("unsupported");
@ -292,53 +291,59 @@ inline ssize_t
WriteFieldData(int fd, DataType data_type, const DataArray* data, size_t size) {
switch (data_type) {
case DataType::BOOL: {
return write(fd, data->scalars().bool_data().data().data(), size);
return write(fd, FIELD_DATA(data, bool).data(), size);
}
case DataType::INT8: {
auto src_data = data->scalars().int_data().data();
auto src_data = FIELD_DATA(data, int);
std::vector<int8_t> data_raw(src_data.size());
std::copy_n(src_data.data(), src_data.size(), data_raw.data());
return write(fd, data_raw.data(), size);
}
case DataType::INT16: {
auto src_data = data->scalars().int_data().data();
auto src_data = FIELD_DATA(data, int);
std::vector<int16_t> data_raw(src_data.size());
std::copy_n(src_data.data(), src_data.size(), data_raw.data());
return write(fd, data_raw.data(), size);
}
case DataType::INT32: {
return write(fd, data->scalars().int_data().data().data(), size);
return write(fd, FIELD_DATA(data, int).data(), size);
}
case DataType::INT64: {
return write(fd, data->scalars().long_data().data().data(), size);
return write(fd, FIELD_DATA(data, long).data(), size);
}
case DataType::FLOAT: {
return write(fd, data->scalars().float_data().data().data(), size);
return write(fd, FIELD_DATA(data, float).data(), size);
}
case DataType::DOUBLE: {
return write(fd, data->scalars().double_data().data().data(), size);
return write(fd, FIELD_DATA(data, double).data(), size);
}
case DataType::VARCHAR: {
auto begin = data->scalars().string_data().data().begin();
auto end = data->scalars().string_data().data().end();
ssize_t total_written{0};
while (begin != end) {
ssize_t written = write(fd, begin->data(), begin->size());
if (written < begin->size()) {
for (auto& str : FIELD_DATA(data, string)) {
ssize_t written = write(fd, str.data(), str.size());
if (written < str.size()) {
break;
}
total_written += written;
}
return total_written;
}
case DataType::JSON: {
ssize_t total_written{0};
for (auto& json : FIELD_DATA(data, json)) {
ssize_t written = write(fd, json.data(), json.size());
if (written < json.size()) {
break;
}
total_written += written;
}
return total_written;
}
case DataType::VECTOR_FLOAT:
return write(
fd, data->vectors().float_vector().data().data(), size);
return write(fd, VEC_FIELD_DATA(data, float).data(), size);
case DataType::VECTOR_BINARY:
return write(fd, data->vectors().binary_vector().data(), size);
return write(fd, VEC_FIELD_DATA(data, binary), size);
default: {
PanicInfo("unsupported");

View File

@ -17,6 +17,7 @@
#pragma once
#include "Types.h"
#include <string>
#include <type_traits>
namespace milvus {
@ -51,7 +52,7 @@ constexpr bool IsVector = std::is_base_of_v<VectorTrait, T>;
template <typename T>
constexpr bool IsScalar =
std::is_fundamental_v<T> || std::is_same_v<T, std::string> ||
std::is_same_v<T, std::string_view>;
std::is_same_v<T, Json> || std::is_same_v<T, std::string_view>;
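// With Json included in IsScalar, JSON fields can flow through the scalar
// code paths used elsewhere in this change (ConcurrentVector<Json>,
// VariableColumn<Json>, scalar bulk_subscript).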
template <typename T, typename Enabled = void>
struct EmbeddedTypeImpl;

View File

@ -10,6 +10,7 @@
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <cmath>
#include <string>
#include "common/QueryInfo.h"
#include "query/SearchBruteForce.h"

View File

@ -108,6 +108,7 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) {
segment->mask_with_timestamps(*bitset_holder, timestamp_);
segment->mask_with_delete(*bitset_holder, active_count, timestamp_);
// if bitset_holder is all 1's, we get an empty result
if (bitset_holder->all()) {
search_result_opt_ =

View File

@ -10,6 +10,9 @@
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "segcore/ConcurrentVector.h"
#include "common/Types.h"
#include "common/Utils.h"
#include "nlohmann/json.hpp"
namespace milvus::segcore {
@ -21,12 +24,11 @@ VectorBase::set_data_raw(ssize_t element_offset,
if (field_meta.is_vector()) {
if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) {
return set_data_raw(element_offset,
data->vectors().float_vector().data().data(),
VEC_FIELD_DATA(data, float).data(),
element_count);
} else if (field_meta.get_data_type() == DataType::VECTOR_BINARY) {
return set_data_raw(element_offset,
data->vectors().binary_vector().data(),
element_count);
return set_data_raw(
element_offset, VEC_FIELD_DATA(data, binary), element_count);
} else {
PanicInfo("unsupported");
}
@ -34,52 +36,49 @@ VectorBase::set_data_raw(ssize_t element_offset,
switch (field_meta.get_data_type()) {
case DataType::BOOL: {
return set_data_raw(element_offset,
data->scalars().bool_data().data().data(),
element_count);
return set_data_raw(
element_offset, FIELD_DATA(data, bool).data(), element_count);
}
case DataType::INT8: {
auto& src_data = data->scalars().int_data().data();
auto& src_data = FIELD_DATA(data, int);
std::vector<int8_t> data_raw(src_data.size());
std::copy_n(src_data.data(), src_data.size(), data_raw.data());
return set_data_raw(element_offset, data_raw.data(), element_count);
}
case DataType::INT16: {
auto& src_data = data->scalars().int_data().data();
auto& src_data = FIELD_DATA(data, int);
std::vector<int16_t> data_raw(src_data.size());
std::copy_n(src_data.data(), src_data.size(), data_raw.data());
return set_data_raw(element_offset, data_raw.data(), element_count);
}
case DataType::INT32: {
return set_data_raw(element_offset,
data->scalars().int_data().data().data(),
element_count);
return set_data_raw(
element_offset, FIELD_DATA(data, int).data(), element_count);
}
case DataType::INT64: {
return set_data_raw(element_offset,
data->scalars().long_data().data().data(),
element_count);
return set_data_raw(
element_offset, FIELD_DATA(data, long).data(), element_count);
}
case DataType::FLOAT: {
return set_data_raw(element_offset,
data->scalars().float_data().data().data(),
element_count);
return set_data_raw(
element_offset, FIELD_DATA(data, float).data(), element_count);
}
case DataType::DOUBLE: {
return set_data_raw(element_offset,
data->scalars().double_data().data().data(),
element_count);
return set_data_raw(
element_offset, FIELD_DATA(data, double).data(), element_count);
}
case DataType::VARCHAR: {
auto begin = data->scalars().string_data().data().begin();
auto end = data->scalars().string_data().data().end();
std::vector<std::string> data_raw(begin, end);
auto& field_data = FIELD_DATA(data, string);
std::vector<std::string> data_raw(field_data.begin(),
field_data.end());
return set_data_raw(element_offset, data_raw.data(), element_count);
}
case DataType::JSON: {
auto begin = data->scalars().json_data().data().begin();
auto end = data->scalars().json_data().data().end();
std::vector<std::string> data_raw(begin, end);
auto& json_data = FIELD_DATA(data, json);
std::vector<Json> data_raw;
// reserve, not size-construct: a sized vector would leave json_data.size()
// empty Json entries ahead of the parsed ones appended below
data_raw.reserve(json_data.size());
for (auto& json_bytes : json_data) {
data_raw.emplace_back(Json::parse(json_bytes));
}
return set_data_raw(element_offset, data_raw.data(), element_count);
}
default: {
@ -95,11 +94,10 @@ VectorBase::fill_chunk_data(ssize_t element_count,
const FieldMeta& field_meta) {
if (field_meta.is_vector()) {
if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) {
return fill_chunk_data(data->vectors().float_vector().data().data(),
return fill_chunk_data(VEC_FIELD_DATA(data, float).data(),
element_count);
} else if (field_meta.get_data_type() == DataType::VECTOR_BINARY) {
return fill_chunk_data(data->vectors().binary_vector().data(),
element_count);
return fill_chunk_data(VEC_FIELD_DATA(data, binary), element_count);
} else {
PanicInfo("unsupported");
}
@ -107,45 +105,56 @@ VectorBase::fill_chunk_data(ssize_t element_count,
switch (field_meta.get_data_type()) {
case DataType::BOOL: {
return fill_chunk_data(data->scalars().bool_data().data().data(),
return fill_chunk_data(FIELD_DATA(data, bool).data(),
element_count);
}
case DataType::INT8: {
auto& src_data = data->scalars().int_data().data();
auto& src_data = FIELD_DATA(data, int);
std::vector<int8_t> data_raw(src_data.size());
std::copy_n(src_data.data(), src_data.size(), data_raw.data());
return fill_chunk_data(data_raw.data(), element_count);
}
case DataType::INT16: {
auto& src_data = data->scalars().int_data().data();
auto& src_data = FIELD_DATA(data, int);
std::vector<int16_t> data_raw(src_data.size());
std::copy_n(src_data.data(), src_data.size(), data_raw.data());
return fill_chunk_data(data_raw.data(), element_count);
}
case DataType::INT32: {
return fill_chunk_data(data->scalars().int_data().data().data(),
element_count);
return fill_chunk_data(FIELD_DATA(data, int).data(), element_count);
}
case DataType::INT64: {
return fill_chunk_data(data->scalars().long_data().data().data(),
return fill_chunk_data(FIELD_DATA(data, long).data(),
element_count);
}
case DataType::FLOAT: {
return fill_chunk_data(data->scalars().float_data().data().data(),
return fill_chunk_data(FIELD_DATA(data, float).data(),
element_count);
}
case DataType::DOUBLE: {
return fill_chunk_data(data->scalars().double_data().data().data(),
return fill_chunk_data(FIELD_DATA(data, double).data(),
element_count);
}
case DataType::VARCHAR: {
auto vec = static_cast<ConcurrentVector<std::string>*>(this);
auto count = data->scalars().string_data().data().size();
auto count = FIELD_DATA(data, string).size();
vec->grow_on_demand(count);
auto& chunk = vec->get_chunk(0);
size_t index = 0;
for (auto& str : data->scalars().string_data().data()) {
for (auto& str : FIELD_DATA(data, string)) {
chunk[index++] = str;
}
return;
}
case DataType::JSON: {
auto vec = static_cast<ConcurrentVector<Json>*>(this);
auto count = FIELD_DATA(data, json).size();
vec->grow_on_demand(count);
auto& chunk = vec->get_chunk(0);
size_t index = 0;
for (auto& json_bytes : FIELD_DATA(data, json)) {
// parse the serialized bytes; assigning the string directly would store a
// JSON string value rather than the parsed object
chunk[index++] = Json::parse(json_bytes);
}
return;
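Taken together with set_data_raw above, the growing path parses JSON once on ingest and keeps the parsed objects in memory; retrieval then re-serializes them. A minimal sketch of that round trip (names from this diff):

Json stored = Json::parse(R"({"key":1})");  // ingest side
std::string out = stored.dump();            // retrieval side, see bulk_subscript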

View File

@ -21,6 +21,7 @@
#include "TimestampIndex.h"
#include "common/Schema.h"
#include "common/Types.h"
#include "segcore/AckResponder.h"
#include "segcore/ConcurrentVector.h"
#include "segcore/Record.h"
@ -217,12 +218,15 @@ struct InsertRecord {
size_per_chunk);
break;
}
case DataType::JSON:
case DataType::ARRAY: {
this->append_field_data<std::string>(field_id,
size_per_chunk);
case DataType::JSON: {
this->append_field_data<Json>(field_id, size_per_chunk);
break;
}
// case DataType::ARRAY: {
// this->append_field_data<std::string>(field_id,
// size_per_chunk);
// break;
// }
default: {
PanicInfo("unsupported");
}

View File

@ -14,8 +14,11 @@
#include <queue>
#include <thread>
#include <boost/iterator/counting_iterator.hpp>
#include <type_traits>
#include "common/Consts.h"
#include "common/Types.h"
#include "nlohmann/json.hpp"
#include "query/PlanNode.h"
#include "query/SearchOnSealed.h"
#include "segcore/SegmentGrowingImpl.h"
@ -298,6 +301,12 @@ SegmentGrowingImpl::bulk_subscript(FieldId field_id,
*vec_ptr, seg_offsets, count, output.data());
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
}
case DataType::JSON: {
FixedVector<std::string> output(count);
bulk_subscript_impl<Json, std::string>(
*vec_ptr, seg_offsets, count, output.data());
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
}
default: {
PanicInfo("unsupported type");
}
@ -327,23 +336,27 @@ SegmentGrowingImpl::bulk_subscript_impl(int64_t element_sizeof,
}
}
template <typename T>
template <typename S, typename T>
void
SegmentGrowingImpl::bulk_subscript_impl(const VectorBase& vec_raw,
const int64_t* seg_offsets,
int64_t count,
void* output_raw) const {
static_assert(IsScalar<T>);
auto vec_ptr = dynamic_cast<const ConcurrentVector<T>*>(&vec_raw);
static_assert(IsScalar<S>);
auto vec_ptr = dynamic_cast<const ConcurrentVector<S>*>(&vec_raw);
AssertInfo(vec_ptr, "Pointer of vec_raw is nullptr");
auto& vec = *vec_ptr;
auto output = reinterpret_cast<T*>(output_raw);
for (int64_t i = 0; i < count; ++i) {
auto offset = seg_offsets[i];
if (offset != INVALID_SEG_OFFSET) {
if constexpr (std::is_same_v<S, Json>) {
output[i] = vec[offset].dump();
} else {
output[i] = vec[offset];
}
}
}
}
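// The <S, T> split lets the stored type differ from the emitted type:
// bulk_subscript_impl<Json, std::string> reads a ConcurrentVector<Json> and
// writes dump()-serialized strings, while plain scalars instantiate with
// S == T and copy values through unchanged.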
void

View File

@ -144,7 +144,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
get_active_count(Timestamp ts) const override;
// for scalar vectors
template <typename T>
template <typename S, typename T = S>
void
bulk_subscript_impl(const VectorBase& vec_raw,
const int64_t* seg_offsets,

View File

@ -117,7 +117,6 @@ SegmentInternalInterface::Retrieve(const query::RetrievePlan* plan,
auto data = reinterpret_cast<const int64_t*>(output.data());
auto obj = scalar_array->mutable_long_data();
obj->mutable_data()->Add(data, data + size);
fields_data->AddAllocated(data_array.release());
continue;
}

View File

@ -24,6 +24,7 @@
#include "common/Consts.h"
#include "common/FieldMeta.h"
#include "common/Types.h"
#include "log/Log.h"
#include "nlohmann/json.hpp"
#include "query/ScalarIndex.h"
#include "query/SearchBruteForce.h"
@ -192,7 +193,7 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) {
SystemProperty::Instance().GetSystemFieldType(field_id);
if (system_field_type == SystemFieldType::Timestamp) {
auto timestamps = reinterpret_cast<const Timestamp*>(
info.field_data->scalars().long_data().data().data());
FIELD_DATA(info.field_data, long).data());
TimestampIndex index;
auto min_slice_length = size < 4096 ? 1 : 4096;
@ -211,7 +212,7 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) {
AssertInfo(system_field_type == SystemFieldType::RowId,
"System field type of id column is not RowId");
auto row_ids = reinterpret_cast<const idx_t*>(
info.field_data->scalars().long_data().data().data());
FIELD_DATA(info.field_data, long).data());
// write data under lock
std::unique_lock lck(mutex_);
AssertInfo(insert_record_.row_ids_.empty(), "already exists");
@ -247,15 +248,15 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) {
break;
}
case milvus::DataType::JSON: {
column = std::make_unique<VariableColumn<nlohmann::json>>(
column = std::make_unique<VariableColumn<Json>>(
get_segment_id(),
field_meta,
info,
[](const char* data, size_t len) {
if (len > 0) {
return nlohmann::json::parse(data, data + len);
return Json::parse(data, data + len);
}
return nlohmann::json{};
return Json{};
});
break;
}
default: {
@ -420,6 +421,7 @@ SegmentSealedImpl::mask_with_delete(BitsetType& bitset,
if (del_barrier == 0) {
return;
}
auto bitmap_holder = get_deleted_bitmap(
del_barrier, ins_barrier, deleted_record_, insert_record_, timestamp);
if (!bitmap_holder || !bitmap_holder->bitmap_ptr) {
@ -598,18 +600,18 @@ SegmentSealedImpl::bulk_subscript_impl(const void* src_raw,
}
}
template <typename T>
template <typename S, typename T>
void
SegmentSealedImpl::bulk_subscript_impl(const ColumnBase* column,
const int64_t* seg_offsets,
int64_t count,
void* dst_raw) {
auto field = reinterpret_cast<const VariableColumn<T>*>(column);
auto field = reinterpret_cast<const VariableColumn<S>*>(column);
auto dst = reinterpret_cast<T*>(dst_raw);
for (int64_t i = 0; i < count; ++i) {
auto offset = seg_offsets[i];
if (offset != INVALID_SEG_OFFSET) {
dst[i] = std::move(T((*field)[offset]));
dst[i] = std::move(T(field->raw_at(offset)));
}
}
}
@ -685,6 +687,17 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id,
output.data(), count, field_meta);
}
case DataType::JSON: {
FixedVector<std::string> output(count);
bulk_subscript_impl<Json, std::string>(
variable_fields_.at(field_id).get(),
seg_offsets,
count,
output.data());
return CreateScalarDataArrayFrom(
output.data(), count, field_meta);
}
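// Unlike the growing path, the sealed column still holds the original
// serialized bytes (the munmap of the raw JSON region was removed above),
// so raw_at(offset) returns them without a parse-and-dump round trip.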
default:
PanicInfo(
fmt::format("unsupported data type: {}",

View File

@ -138,7 +138,7 @@ class SegmentSealedImpl : public SegmentSealed {
int64_t count,
void* dst_raw);
template <typename T>
template <typename S, typename T = S>
static void
bulk_subscript_impl(const ColumnBase* field,
const int64_t* seg_offsets,

View File

@ -17,7 +17,7 @@ namespace milvus::segcore {
void
ParsePksFromFieldData(std::vector<PkType>& pks, const DataArray& data) {
switch (DataType(data.type())) {
switch (static_cast<DataType>(data.type())) {
case DataType::INT64: {
auto source_data = reinterpret_cast<const int64_t*>(
data.scalars().long_data().data().data());
@ -78,14 +78,14 @@ CreateScalarDataArray(int64_t count, const FieldMeta& field_meta) {
auto data_type = field_meta.get_data_type();
auto data_array = std::make_unique<DataArray>();
data_array->set_field_id(field_meta.get_id().get());
data_array->set_type(
milvus::proto::schema::DataType(field_meta.get_data_type()));
data_array->set_type(static_cast<milvus::proto::schema::DataType>(
field_meta.get_data_type()));
auto scalar_array = data_array->mutable_scalars();
switch (data_type) {
case DataType::BOOL: {
auto obj = scalar_array->mutable_bool_data();
obj->mutable_data()->Resize(count, 0);
obj->mutable_data()->Resize(count, false);
break;
}
case DataType::INT8: {
@ -121,8 +121,9 @@ CreateScalarDataArray(int64_t count, const FieldMeta& field_meta) {
case DataType::VARCHAR: {
auto obj = scalar_array->mutable_string_data();
obj->mutable_data()->Reserve(count);
for (auto i = 0; i < count; i++)
for (auto i = 0; i < count; i++) {
*(obj->mutable_data()->Add()) = std::string();
}
break;
}
default: {
@ -138,8 +139,8 @@ CreateVectorDataArray(int64_t count, const FieldMeta& field_meta) {
auto data_type = field_meta.get_data_type();
auto data_array = std::make_unique<DataArray>();
data_array->set_field_id(field_meta.get_id().get());
data_array->set_type(
milvus::proto::schema::DataType(field_meta.get_data_type()));
data_array->set_type(static_cast<milvus::proto::schema::DataType>(
field_meta.get_data_type()));
auto vector_array = data_array->mutable_vectors();
auto dim = field_meta.get_dim();
@ -173,8 +174,8 @@ CreateScalarDataArrayFrom(const void* data_raw,
auto data_type = field_meta.get_data_type();
auto data_array = std::make_unique<DataArray>();
data_array->set_field_id(field_meta.get_id().get());
data_array->set_type(
milvus::proto::schema::DataType(field_meta.get_data_type()));
data_array->set_type(static_cast<milvus::proto::schema::DataType>(
field_meta.get_data_type()));
auto scalar_array = data_array->mutable_scalars();
switch (data_type) {
@ -223,8 +224,17 @@ CreateScalarDataArrayFrom(const void* data_raw,
case DataType::VARCHAR: {
auto data = reinterpret_cast<const std::string*>(data_raw);
auto obj = scalar_array->mutable_string_data();
for (auto i = 0; i < count; i++)
for (auto i = 0; i < count; i++) {
*(obj->mutable_data()->Add()) = data[i];
}
break;
}
case DataType::JSON: {
auto data = reinterpret_cast<const std::string*>(data_raw);
auto obj = scalar_array->mutable_json_data();
for (auto i = 0; i < count; i++) {
*(obj->mutable_data()->Add()) = data[i];
}
break;
}
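// As with VARCHAR above, JSON rows arrive here as serialized std::string
// values and are emitted through the proto's json_data field.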
default: {
@ -242,8 +252,8 @@ CreateVectorDataArrayFrom(const void* data_raw,
auto data_type = field_meta.get_data_type();
auto data_array = std::make_unique<DataArray>();
data_array->set_field_id(field_meta.get_id().get());
data_array->set_type(
milvus::proto::schema::DataType(field_meta.get_data_type()));
data_array->set_type(static_cast<milvus::proto::schema::DataType>(
field_meta.get_data_type()));
auto vector_array = data_array->mutable_vectors();
auto dim = field_meta.get_dim();
@ -293,8 +303,8 @@ MergeDataArray(
auto data_type = field_meta.get_data_type();
auto data_array = std::make_unique<DataArray>();
data_array->set_field_id(field_meta.get_id().get());
data_array->set_type(
milvus::proto::schema::DataType(field_meta.get_data_type()));
data_array->set_type(static_cast<milvus::proto::schema::DataType>(
field_meta.get_data_type()));
for (auto& result_pair : result_offsets) {
auto src_field_data =
@ -307,8 +317,7 @@ MergeDataArray(
auto dim = field_meta.get_dim();
vector_array->set_dim(dim);
if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) {
auto data =
src_field_data->vectors().float_vector().data().data();
auto data = VEC_FIELD_DATA(src_field_data, float).data();
auto obj = vector_array->mutable_float_vector();
obj->mutable_data()->Add(data + src_offset * dim,
data + (src_offset + 1) * dim);
@ -317,7 +326,7 @@ MergeDataArray(
dim % 8 == 0,
"Binary vector field dimension is not a multiple of 8");
auto num_bytes = dim / 8;
auto data = src_field_data->vectors().binary_vector().data();
auto data = VEC_FIELD_DATA(src_field_data, binary);
auto obj = vector_array->mutable_binary_vector();
obj->assign(data + src_offset * num_bytes, num_bytes);
} else {
@ -329,7 +338,7 @@ MergeDataArray(
auto scalar_array = data_array->mutable_scalars();
switch (data_type) {
case DataType::BOOL: {
auto data = src_field_data->scalars().bool_data().data().data();
auto data = FIELD_DATA(src_field_data, bool).data();
auto obj = scalar_array->mutable_bool_data();
*(obj->mutable_data()->Add()) = data[src_offset];
continue;
@ -337,27 +346,25 @@ MergeDataArray(
case DataType::INT8:
case DataType::INT16:
case DataType::INT32: {
auto data = src_field_data->scalars().int_data().data().data();
auto data = FIELD_DATA(src_field_data, int).data();
auto obj = scalar_array->mutable_int_data();
*(obj->mutable_data()->Add()) = data[src_offset];
continue;
}
case DataType::INT64: {
auto data = src_field_data->scalars().long_data().data().data();
auto data = FIELD_DATA(src_field_data, long).data();
auto obj = scalar_array->mutable_long_data();
*(obj->mutable_data()->Add()) = data[src_offset];
continue;
}
case DataType::FLOAT: {
auto data =
src_field_data->scalars().float_data().data().data();
auto data = FIELD_DATA(src_field_data, float).data();
auto obj = scalar_array->mutable_float_data();
*(obj->mutable_data()->Add()) = data[src_offset];
continue;
}
case DataType::DOUBLE: {
auto data =
src_field_data->scalars().double_data().data().data();
auto data = FIELD_DATA(src_field_data, double).data();
auto obj = scalar_array->mutable_double_data();
*(obj->mutable_data()->Add()) = data[src_offset];
continue;
@ -386,8 +393,8 @@ ReverseDataFromIndex(const index::IndexBase* index,
auto data_type = field_meta.get_data_type();
auto data_array = std::make_unique<DataArray>();
data_array->set_field_id(field_meta.get_id().get());
data_array->set_type(
milvus::proto::schema::DataType(field_meta.get_data_type()));
data_array->set_type(static_cast<milvus::proto::schema::DataType>(
field_meta.get_data_type()));
auto scalar_array = data_array->mutable_scalars();
switch (data_type) {

View File

@ -13,7 +13,7 @@
#include <exception>
#include <memory>
#include <stdexcept>
#include <stdlib.h>
#include <cstdlib>
#include <string>
#include <utility>
#include <vector>

View File

@ -12,6 +12,7 @@
#include <gtest/gtest.h>
#include <boost/format.hpp>
#include "common/Types.h"
#include "segcore/SegmentSealedImpl.h"
#include "test_utils/DataGen.h"
#include "index/IndexFactory.h"
@ -384,6 +385,7 @@ TEST(Sealed, LoadFieldData) {
schema->AddDebugField("int8", DataType::INT8);
schema->AddDebugField("int16", DataType::INT16);
schema->AddDebugField("float", DataType::FLOAT);
schema->AddDebugField("json", DataType::JSON);
schema->set_primary_field_id(counter_id);
auto dataset = DataGen(schema, N);
@ -480,6 +482,7 @@ TEST(Sealed, LoadFieldDataMmap) {
schema->AddDebugField("int8", DataType::INT8);
schema->AddDebugField("int16", DataType::INT16);
schema->AddDebugField("float", DataType::FLOAT);
schema->AddDebugField("json", DataType::JSON);
schema->set_primary_field_id(counter_id);
auto dataset = DataGen(schema, N);
@ -764,7 +767,8 @@ TEST(Sealed, OverlapDelete) {
auto N = 10;
auto metric_type = knowhere::metric::L2;
auto schema = std::make_shared<Schema>();
auto fakevec_id = schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, dim, metric_type);
auto fakevec_id = schema->AddDebugField(
"fakevec", DataType::VECTOR_FLOAT, dim, metric_type);
auto counter_id = schema->AddDebugField("counter", DataType::INT64);
auto double_id = schema->AddDebugField("double", DataType::DOUBLE);
auto nothing_id = schema->AddDebugField("nothing", DataType::INT32);
@ -807,7 +811,8 @@ TEST(Sealed, OverlapDelete) {
auto plan = CreatePlan(*schema, dsl);
auto num_queries = 5;
auto ph_group_raw = CreatePlaceholderGroup(num_queries, 16, 1024);
auto ph_group = ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString());
auto ph_group =
ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString());
ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group.get(), time));
@ -822,7 +827,8 @@ TEST(Sealed, OverlapDelete) {
LoadDeletedRecordInfo info = {timestamps.data(), ids.get(), row_count};
segment->LoadDeletedRecord(info);
ASSERT_EQ(segment->get_deleted_count(), pks.size())
<< "deleted_count=" << segment->get_deleted_count() << " pks_count=" << pks.size() << std::endl;
<< "deleted_count=" << segment->get_deleted_count()
<< " pks_count=" << pks.size() << std::endl;
// Load overlapping delete records
row_count += 3;
@ -830,16 +836,19 @@ TEST(Sealed, OverlapDelete) {
auto new_ids = std::make_unique<IdArray>();
new_ids->mutable_int_id()->mutable_data()->Add(pks.begin(), pks.end());
timestamps.insert(timestamps.end(), {11, 11, 11});
LoadDeletedRecordInfo overlap_info = {timestamps.data(), new_ids.get(), row_count};
LoadDeletedRecordInfo overlap_info = {
timestamps.data(), new_ids.get(), row_count};
segment->LoadDeletedRecord(overlap_info);
BitsetType bitset(N, false);
// NOTE: need to change the delete timestamp so as not to hit the cache
ASSERT_EQ(segment->get_deleted_count(), pks.size())
<< "deleted_count=" << segment->get_deleted_count() << " pks_count=" << pks.size() << std::endl;
<< "deleted_count=" << segment->get_deleted_count()
<< " pks_count=" << pks.size() << std::endl;
segment->mask_with_delete(bitset, 10, 12);
ASSERT_EQ(bitset.count(), pks.size())
<< "bitset_count=" << bitset.count() << " pks_count=" << pks.size() << std::endl;
<< "bitset_count=" << bitset.count() << " pks_count=" << pks.size()
<< std::endl;
}
auto

View File

@ -294,8 +294,8 @@ TEST(StringExpr, Term) {
for (int iter = 0; iter < num_iters; ++iter) {
auto raw_data = DataGen(schema, N, iter);
auto new_str_col = raw_data.get_col(str_meta.get_id());
auto begin = new_str_col->scalars().string_data().data().begin();
auto end = new_str_col->scalars().string_data().data().end();
auto begin = FIELD_DATA(new_str_col, string).begin();
auto end = FIELD_DATA(new_str_col, string).end();
str_col.insert(str_col.end(), begin, end);
seg->PreInsert(N);
seg->Insert(iter * N,
@ -396,8 +396,8 @@ TEST(StringExpr, Compare) {
auto reserve_col = [&, raw_data](const FieldMeta& field_meta,
std::vector<std::string>& str_col) {
auto new_str_col = raw_data.get_col(field_meta.get_id());
auto begin = new_str_col->scalars().string_data().data().begin();
auto end = new_str_col->scalars().string_data().data().end();
auto begin = FIELD_DATA(new_str_col, string).begin();
auto end = FIELD_DATA(new_str_col, string).end();
str_col.insert(str_col.end(), begin, end);
};
@ -495,8 +495,8 @@ TEST(StringExpr, UnaryRange) {
for (int iter = 0; iter < num_iters; ++iter) {
auto raw_data = DataGen(schema, N, iter);
auto new_str_col = raw_data.get_col(str_meta.get_id());
auto begin = new_str_col->scalars().string_data().data().begin();
auto end = new_str_col->scalars().string_data().data().end();
auto begin = FIELD_DATA(new_str_col, string).begin();
auto end = FIELD_DATA(new_str_col, string).end();
str_col.insert(str_col.end(), begin, end);
seg->PreInsert(N);
seg->Insert(iter * N,
@ -599,8 +599,8 @@ TEST(StringExpr, BinaryRange) {
for (int iter = 0; iter < num_iters; ++iter) {
auto raw_data = DataGen(schema, N, iter);
auto new_str_col = raw_data.get_col(str_meta.get_id());
auto begin = new_str_col->scalars().string_data().data().begin();
auto end = new_str_col->scalars().string_data().data().end();
auto begin = FIELD_DATA(new_str_col, string).begin();
auto end = FIELD_DATA(new_str_col, string).end();
str_col.insert(str_col.end(), begin, end);
seg->PreInsert(N);
seg->Insert(iter * N,

View File

@ -303,6 +303,17 @@ DataGen(SchemaPtr schema,
insert_cols(data, N, field_meta);
break;
}
case DataType::JSON: {
vector<std::string> data(N);
for (int i = 0; i < N / repeat_count; i++) {
auto str = R"({"key":)" + std::to_string(er()) + "}";
for (int j = 0; j < repeat_count; j++) {
data[i * repeat_count + j] = str;
}
}
insert_cols(data, N, field_meta);
break;
}
default: {
throw std::runtime_error("unimplemented");
}
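DataGen repeats each generated JSON object repeat_count times, so distinct values map to runs of identical rows; for example (values illustrative):

// repeat_count = 2, N = 4  ->  {"key":7}, {"key":7}, {"key":9}, {"key":9}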
@ -517,8 +528,7 @@ GenVecIndexing(int64_t N, int64_t dim, const float* vec) {
{knowhere::meta::DEVICE_ID, 0}};
auto database = knowhere::GenDataSet(N, dim, vec);
auto indexing = std::make_unique<index::VectorMemNMIndex>(
knowhere::IndexEnum::INDEX_FAISS_IVFFLAT,
knowhere::metric::L2);
knowhere::IndexEnum::INDEX_FAISS_IVFFLAT, knowhere::metric::L2);
indexing->BuildWithDataset(database, conf);
return indexing;
}

View File

@ -103,7 +103,8 @@ func (suite *RetrieveSuite) SetupTest() {
suite.Require().NoError(err)
insertRecord, err = storage.TransferInsertMsgToInsertRecord(suite.collection.Schema(), insertMsg)
suite.Require().NoError(err)
suite.growing.Insert(insertMsg.RowIDs, insertMsg.Timestamps, insertRecord)
err = suite.growing.Insert(insertMsg.RowIDs, insertMsg.Timestamps, insertRecord)
suite.Require().NoError(err)
suite.manager.Segment.Put(SegmentTypeSealed, suite.sealed)
suite.manager.Segment.Put(SegmentTypeGrowing, suite.growing)

View File

@ -83,7 +83,8 @@ func (suite *SegmentSuite) SetupTest() {
suite.Require().NoError(err)
insertRecord, err = storage.TransferInsertMsgToInsertRecord(suite.collection.Schema(), insertMsg)
suite.Require().NoError(err)
suite.growing.Insert(insertMsg.RowIDs, insertMsg.Timestamps, insertRecord)
err = suite.growing.Insert(insertMsg.RowIDs, insertMsg.Timestamps, insertRecord)
suite.Require().NoError(err)
suite.manager.Segment.Put(SegmentTypeSealed, suite.sealed)
suite.manager.Segment.Put(SegmentTypeGrowing, suite.growing)

View File

@ -30,7 +30,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
@ -330,9 +329,6 @@ func (w *PayloadWriter) AddOneJSONToPayload(msg []byte) error {
length := len(bytes)
cmsg := (*C.uint8_t)(unsafe.Pointer(&bytes[0]))
clength := C.int(length)
// defer C.free(unsafe.Pointer(cmsg))
log.Debug("yah01", zap.String("jsonBytes", string(bytes)))
status := C.AddOneJSONToPayload(w.payloadWriterPtr, cmsg, clength)
return HandleCStatus(&status, "AddOneJSONToPayload failed")