Introduce simdjson (#23644)

Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/23720/head
yah01 2023-04-26 10:30:34 +08:00 committed by GitHub
parent 796f9355ee
commit 60fdd7e4f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 213 additions and 84 deletions

View File

@ -30,6 +30,7 @@ target_link_libraries(milvus_common
milvus_log
yaml-cpp
boost_bitset_ext
simdjson
${CONAN_LIBS}
)

View File

@ -75,22 +75,22 @@ class ColumnBase {
uint64_t size_{0};
};
class FixedColumn : public ColumnBase {
class Column : public ColumnBase {
public:
FixedColumn(int64_t segment_id,
const FieldMeta& field_meta,
const LoadFieldDataInfo& info) {
Column(int64_t segment_id,
const FieldMeta& field_meta,
const LoadFieldDataInfo& info) {
data_ = static_cast<char*>(CreateMap(segment_id, field_meta, info));
size_ = field_meta.get_sizeof() * info.row_count;
row_count_ = info.row_count;
}
FixedColumn(FixedColumn&& column) noexcept
Column(Column&& column) noexcept
: ColumnBase(std::move(column)), row_count_(column.row_count_) {
column.row_count_ = 0;
}
~FixedColumn() override = default;
~Column() override = default;
SpanBase
span() const override {
@ -107,11 +107,9 @@ class VariableColumn : public ColumnBase {
using ViewType =
std::conditional_t<std::is_same_v<T, std::string>, std::string_view, T>;
template <typename Ctor>
VariableColumn(int64_t segment_id,
const FieldMeta& field_meta,
const LoadFieldDataInfo& info,
Ctor&& ctor) {
const LoadFieldDataInfo& info) {
auto begin = FIELD_DATA(info.field_data, string).begin();
auto end = FIELD_DATA(info.field_data, string).end();
if constexpr (std::is_same_v<T, Json>) {
@ -127,7 +125,7 @@ class VariableColumn : public ColumnBase {
}
data_ = static_cast<char*>(CreateMap(segment_id, field_meta, info));
construct_views(std::forward<Ctor>(ctor));
construct_views();
}
VariableColumn(VariableColumn&& field) noexcept
@ -162,16 +160,14 @@ class VariableColumn : public ColumnBase {
}
protected:
template <typename Ctor>
void
construct_views(Ctor ctor) {
construct_views() {
views_.reserve(indices_.size());
for (size_t i = 0; i < indices_.size() - 1; i++) {
views_.emplace_back(
ctor(data_ + indices_[i], indices_[i + 1] - indices_[i]));
views_.emplace_back(data_ + indices_[i],
indices_[i + 1] - indices_[i]);
}
views_.emplace_back(
ctor(data_ + indices_.back(), size_ - indices_.back()));
views_.emplace_back(data_ + indices_.back(), size_ - indices_.back());
}
private:

View File

@ -0,0 +1,107 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <cstddef>
#include <optional>
#include <string_view>
#include "exceptions/EasyAssert.h"
#include "simdjson.h"
#include "fmt/core.h"
namespace milvus {
class Json {
public:
Json() = default;
explicit Json(simdjson::padded_string data) : own_data_(std::move(data)) {
data_ = own_data_.value();
}
explicit Json(simdjson::padded_string_view data) : data_(data) {
}
Json(const char* data, size_t len, size_t cap) : data_(data, len) {
AssertInfo(len + simdjson::SIMDJSON_PADDING <= cap,
fmt::format("create json without enough memory size for "
"SIMD, len={}, cap={}",
len,
cap));
}
// WARN: this is used for fast non-copy construction,
// MUST make sure that the data points to a memory that
// with size at least len + SIMDJSON_PADDING
Json(const char* data, size_t len) : data_(data, len) {
}
Json(Json&& json) = default;
Json&
operator=(const Json& json) {
if (json.own_data_.has_value()) {
own_data_ = simdjson::padded_string(
json.own_data_.value().data(), json.own_data_.value().length());
}
data_ = json.data_;
return *this;
}
operator std::string_view() const {
return data_;
}
void
parse(simdjson::padded_string_view data) {
data_ = data;
}
auto
doc() const {
thread_local simdjson::ondemand::parser parser;
// it's always safe to add the padding,
// as we have allocated the memory with this padding
auto doc =
parser.iterate(data_, data_.size() + simdjson::SIMDJSON_PADDING);
return doc.get_object();
}
auto
operator[](const std::string_view field) const {
return doc()[field];
}
auto
at_pointer(const std::string_view pointer) const {
return doc().at_pointer(pointer);
}
std::string_view
data() const {
return data_;
}
private:
std::optional<simdjson::padded_string>
own_data_; // this could be empty, then the Json will be just s view on bytes
simdjson::padded_string_view data_;
};
} // namespace milvus

View File

@ -18,15 +18,16 @@
#include <tbb/concurrent_unordered_map.h>
#include <tbb/concurrent_unordered_set.h>
#include <nlohmann/json.hpp>
#include <NamedType/named_type.hpp>
#include <boost/align/aligned_allocator.hpp>
#include <boost/container/vector.hpp>
#include <boost/dynamic_bitset.hpp>
#include <limits>
#include <memory>
#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <variant>
@ -35,10 +36,11 @@
#include "knowhere/binaryset.h"
#include "knowhere/comp/index_param.h"
#include "knowhere/dataset.h"
#include "nlohmann/json.hpp"
#include "simdjson.h"
#include "pb/plan.pb.h"
#include "pb/schema.pb.h"
#include "pb/segcore.pb.h"
#include "Json.h"
namespace milvus {
@ -79,7 +81,6 @@ using VectorArray = proto::schema::VectorField;
using IdArray = proto::schema::IDs;
using InsertData = proto::segcore::InsertRecord;
using PkType = std::variant<std::monostate, int64_t, std::string>;
using Json = nlohmann::json;
inline bool
IsPrimaryKeyDataType(DataType data_type) {
@ -153,5 +154,4 @@ struct LargeType {
int64_t x, y, z;
};
static_assert(std::is_same_v<LargeType&, Parameter<LargeType>>);
} // namespace milvus

View File

@ -32,6 +32,7 @@
#include "exceptions/EasyAssert.h"
#include "knowhere/dataset.h"
#include "knowhere/expected.h"
#include "simdjson.h"
namespace milvus {
#define FIELD_DATA(data_array, type) \
@ -373,6 +374,11 @@ CreateMap(int64_t segment_id,
// macOS doesn't support MAP_POPULATE
mmap_flags |= MAP_POPULATE;
#endif
// simdjson requires a padding following the json data
size_t padding = field_meta.get_data_type() == DataType::JSON
? simdjson::SIMDJSON_PADDING
: 0;
// Allocate memory
if (info.mmap_dir_path == nullptr) {
auto data_type = field_meta.get_data_type();
@ -383,7 +389,7 @@ CreateMap(int64_t segment_id,
// Use anon mapping so we are able to free these memory with munmap only
void* map = mmap(nullptr,
data_size,
data_size + padding,
PROT_READ | PROT_WRITE,
mmap_flags | MAP_ANON,
-1,
@ -428,7 +434,7 @@ CreateMap(int64_t segment_id,
return nullptr;
}
auto map = mmap(nullptr, written, PROT_READ, mmap_flags, fd, 0);
auto map = mmap(nullptr, written + padding, PROT_READ, mmap_flags, fd, 0);
AssertInfo(map != MAP_FAILED,
fmt::format("failed to create map for data file {}, err: {}",
filepath.c_str(),

View File

@ -197,7 +197,7 @@ ExecExprVisitor::ExecRangeVisitorImpl(FieldId field_id,
auto x = data[index];
result[index] = element_func(x);
}
AssertInfo(result.size() == this_size, "");
results.emplace_back(std::move(result));
}
auto final_result = Assemble(results);

View File

@ -13,6 +13,7 @@
#include "common/Types.h"
#include "common/Utils.h"
#include "nlohmann/json.hpp"
#include "simdjson.h"
namespace milvus::segcore {
@ -74,10 +75,10 @@ VectorBase::set_data_raw(ssize_t element_offset,
return set_data_raw(element_offset, data_raw.data(), element_count);
}
case DataType::JSON: {
auto json_data = FIELD_DATA(data, json);
auto& json_data = FIELD_DATA(data, json);
std::vector<Json> data_raw(json_data.size());
for (auto& json_bytes : json_data) {
data_raw.emplace_back(Json::parse(json_bytes));
data_raw.emplace_back(simdjson::padded_string(json_bytes));
}
return set_data_raw(element_offset, data_raw.data(), element_count);
}
@ -155,7 +156,7 @@ VectorBase::fill_chunk_data(ssize_t element_count,
size_t index = 0;
for (auto& str : FIELD_DATA(data, json)) {
chunk[index++] = str;
chunk[index++] = Json(simdjson::padded_string(str));
}
return;
}

View File

@ -364,11 +364,7 @@ SegmentGrowingImpl::bulk_subscript_impl(const VectorBase& vec_raw,
for (int64_t i = 0; i < count; ++i) {
auto offset = seg_offsets[i];
if (offset != INVALID_SEG_OFFSET) {
if constexpr (std::is_same_v<S, Json>) {
output[i] = vec[offset].dump();
} else {
output[i] = vec[offset];
}
output[i] = vec[offset];
}
}
}

View File

@ -20,12 +20,12 @@
#include <string_view>
#include "Utils.h"
#include "Types.h"
#include "common/Column.h"
#include "common/Consts.h"
#include "common/FieldMeta.h"
#include "common/Types.h"
#include "log/Log.h"
#include "nlohmann/json.hpp"
#include "query/ScalarIndex.h"
#include "query/SearchBruteForce.h"
#include "query/SearchOnSealed.h"
@ -240,25 +240,12 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) {
case milvus::DataType::STRING:
case milvus::DataType::VARCHAR: {
column = std::make_unique<VariableColumn<std::string>>(
get_segment_id(),
field_meta,
info,
[](const char* data, size_t len) {
return std::string_view(data, len);
});
get_segment_id(), field_meta, info);
break;
}
case milvus::DataType::JSON: {
column = std::make_unique<VariableColumn<Json>>(
get_segment_id(),
field_meta,
info,
[](const char* data, size_t len) {
if (len > 0) {
return Json::parse(data, data + len);
}
return Json{};
});
get_segment_id(), field_meta, info);
}
default: {
}
@ -267,7 +254,7 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) {
std::unique_lock lck(mutex_);
variable_fields_.emplace(field_id, std::move(column));
} else {
auto column = FixedColumn(get_segment_id(), field_meta, info);
auto column = Column(get_segment_id(), field_meta, info);
size = column.size();
std::unique_lock lck(mutex_);
fixed_fields_.emplace(field_id, std::move(column));
@ -728,7 +715,7 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id,
default:
PanicInfo(
fmt::format("unsupported data type: {}",
fmt::format("718 unsupported data type: {}",
datatype_name(field_meta.get_data_type())));
}
}

View File

@ -235,7 +235,7 @@ class SegmentSealedImpl : public SegmentSealed {
SchemaPtr schema_;
int64_t id_;
std::unordered_map<FieldId, FixedColumn> fixed_fields_;
std::unordered_map<FieldId, Column> fixed_fields_;
std::unordered_map<FieldId, std::unique_ptr<ColumnBase>> variable_fields_;
};

View File

@ -10,7 +10,9 @@
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "segcore/Utils.h"
#include <string>
#include "common/Utils.h"
#include "index/ScalarIndex.h"
namespace milvus::segcore {
@ -126,6 +128,13 @@ CreateScalarDataArray(int64_t count, const FieldMeta& field_meta) {
}
break;
}
case DataType::JSON: {
auto obj = scalar_array->mutable_json_data();
obj->mutable_data()->Reserve(count);
for (int i = 0; i < count; i++) {
*(obj->mutable_data()->Add()) = std::string();
}
}
default: {
PanicInfo("unsupported datatype");
}
@ -341,7 +350,7 @@ MergeDataArray(
auto data = FIELD_DATA(src_field_data, bool).data();
auto obj = scalar_array->mutable_bool_data();
*(obj->mutable_data()->Add()) = data[src_offset];
continue;
break;
}
case DataType::INT8:
case DataType::INT16:
@ -349,34 +358,40 @@ MergeDataArray(
auto data = FIELD_DATA(src_field_data, int).data();
auto obj = scalar_array->mutable_int_data();
*(obj->mutable_data()->Add()) = data[src_offset];
continue;
break;
}
case DataType::INT64: {
auto data = FIELD_DATA(src_field_data, long).data();
auto obj = scalar_array->mutable_long_data();
*(obj->mutable_data()->Add()) = data[src_offset];
continue;
break;
}
case DataType::FLOAT: {
auto data = FIELD_DATA(src_field_data, float).data();
auto obj = scalar_array->mutable_float_data();
*(obj->mutable_data()->Add()) = data[src_offset];
continue;
break;
}
case DataType::DOUBLE: {
auto data = FIELD_DATA(src_field_data, double).data();
auto obj = scalar_array->mutable_double_data();
*(obj->mutable_data()->Add()) = data[src_offset];
continue;
break;
}
case DataType::VARCHAR: {
auto& data = src_field_data->scalars().string_data();
auto& data = FIELD_DATA(src_field_data, string);
auto obj = scalar_array->mutable_string_data();
*(obj->mutable_data()->Add()) = data.data(src_offset);
continue;
*(obj->mutable_data()->Add()) = data[src_offset];
break;
}
case DataType::JSON: {
auto& data = FIELD_DATA(src_field_data, json);
auto obj = scalar_array->mutable_json_data();
*(obj->mutable_data()->Add()) = data[src_offset];
break;
}
default: {
PanicInfo("unsupported datatype");
PanicInfo(fmt::format("unsupported data type {}", data_type));
}
}
}

View File

@ -42,7 +42,9 @@ add_subdirectory(knowhere)
add_subdirectory(boost_ext)
add_subdirectory(rocksdb)
add_subdirectory(simdjson)
if (LINUX)
add_subdirectory(jemalloc)
endif()

View File

@ -0,0 +1,19 @@
#-------------------------------------------------------------------------------
# Copyright (C) 2019-2020 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
#-------------------------------------------------------------------------------
FetchContent_Declare(
simdjson
GIT_REPOSITORY https://github.com/simdjson/simdjson.git
GIT_TAG v3.1.7
)
FetchContent_MakeAvailable(simdjson)

View File

@ -433,10 +433,9 @@ TEST(Expr, TestTerm) {
}
TEST(Expr, TestSimpleDsl) {
using namespace milvus::query;
using namespace milvus::segcore;
auto vec_dsl = Json::parse(R"({
auto vec_dsl = query::Json::parse(R"({
"vector": {
"fakevec": {
"metric_type": "L2",
@ -459,47 +458,47 @@ TEST(Expr, TestSimpleDsl) {
terms.push_back(i);
}
}
Json s;
query::Json s;
s["term"]["age"]["values"] = terms;
return s;
};
// std::cout << get_item(0).dump(-2);
// std::cout << vec_dsl.dump(-2);
std::vector<std::tuple<Json, std::function<bool(int)>>> testcases;
std::vector<std::tuple<query::Json, std::function<bool(int)>>> testcases;
{
Json dsl;
dsl["must"] = Json::array(
query::Json dsl;
dsl["must"] = query::Json::array(
{vec_dsl, get_item(0), get_item(1), get_item(2, 0), get_item(3)});
testcases.emplace_back(
dsl, [](int64_t x) { return (x & 0b1111) == 0b1011; });
}
{
Json dsl;
Json sub_dsl;
sub_dsl["must"] = Json::array(
query::Json dsl;
query::Json sub_dsl;
sub_dsl["must"] = query::Json::array(
{get_item(0), get_item(1), get_item(2, 0), get_item(3)});
dsl["must"] = Json::array({sub_dsl, vec_dsl});
dsl["must"] = query::Json::array({sub_dsl, vec_dsl});
testcases.emplace_back(
dsl, [](int64_t x) { return (x & 0b1111) == 0b1011; });
}
{
Json dsl;
Json sub_dsl;
sub_dsl["should"] = Json::array(
query::Json dsl;
query::Json sub_dsl;
sub_dsl["should"] = query::Json::array(
{get_item(0), get_item(1), get_item(2, 0), get_item(3)});
dsl["must"] = Json::array({sub_dsl, vec_dsl});
dsl["must"] = query::Json::array({sub_dsl, vec_dsl});
testcases.emplace_back(
dsl, [](int64_t x) { return !!((x & 0b1111) ^ 0b0100); });
}
{
Json dsl;
Json sub_dsl;
sub_dsl["must_not"] = Json::array(
query::Json dsl;
query::Json sub_dsl;
sub_dsl["must_not"] = query::Json::array(
{get_item(0), get_item(1), get_item(2, 0), get_item(3)});
dsl["must"] = Json::array({sub_dsl, vec_dsl});
dsl["must"] = query::Json::array({sub_dsl, vec_dsl});
testcases.emplace_back(
dsl, [](int64_t x) { return (x & 0b1111) != 0b1011; });
}
@ -526,13 +525,13 @@ TEST(Expr, TestSimpleDsl) {
}
auto seg_promote = dynamic_cast<SegmentGrowingImpl*>(seg.get());
ExecExprVisitor visitor(
query::ExecExprVisitor visitor(
*seg_promote, seg_promote->get_row_count(), MAX_TIMESTAMP);
for (auto [clause, ref_func] : testcases) {
Json dsl;
query::Json dsl;
dsl["bool"] = clause;
// std::cout << dsl.dump(2);
auto plan = CreatePlan(*schema, dsl.dump());
auto plan = query::CreatePlan(*schema, dsl.dump());
auto final = visitor.call_child(*plan->plan_node_->predicate_.value());
EXPECT_EQ(final.size(), N * num_iters);

View File

@ -194,7 +194,7 @@ TEST(Query, ExecWithPredicateLoader) {
auto sr = segment->Search(plan.get(), ph_group.get(), time);
int topk = 5;
Json json = SearchResultToJson(*sr);
query::Json json = SearchResultToJson(*sr);
#ifdef __linux__
auto ref = json::parse(R"(
[
@ -278,7 +278,7 @@ TEST(Query, ExecWithPredicateSmallN) {
auto sr = segment->Search(plan.get(), ph_group.get(), time);
int topk = 5;
Json json = SearchResultToJson(*sr);
query::Json json = SearchResultToJson(*sr);
std::cout << json.dump(2);
}
@ -338,7 +338,7 @@ TEST(Query, ExecWithPredicate) {
auto sr = segment->Search(plan.get(), ph_group.get(), time);
int topk = 5;
Json json = SearchResultToJson(*sr);
query::Json json = SearchResultToJson(*sr);
#ifdef __linux__
auto ref = json::parse(R"(
[
@ -874,7 +874,7 @@ TEST(Query, ExecWithPredicateBinary) {
auto sr = segment->Search(plan.get(), ph_group.get(), time);
int topk = 5;
Json json = SearchResultToJson(*sr);
query::Json json = SearchResultToJson(*sr);
std::cout << json.dump(2);
// ASSERT_EQ(json.dump(2), ref.dump(2));
}