Support using mmap to load data (#22052)

Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/22503/head
yah01 2023-03-01 18:07:49 +08:00 committed by GitHub
parent fa5c33975e
commit 7478e44911
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 709 additions and 145 deletions

View File

@ -26,7 +26,7 @@
namespace milvus {
inline int
inline size_t
datatype_sizeof(DataType data_type, int dim = 1) {
switch (data_type) {
case DataType::BOOL:
@ -103,6 +103,17 @@ datatype_is_string(DataType datatype) {
}
}
// Returns true for variable-length scalar types, whose per-row payload size
// depends on the row's content rather than on a fixed element width.
inline bool
datatype_is_variable(DataType datatype) {
    return datatype == DataType::VARCHAR || datatype == DataType::STRING;
}
inline bool
datatype_is_integer(DataType datatype) {
switch (datatype) {
@ -200,7 +211,7 @@ class FieldMeta {
return type_;
}
int64_t
size_t
get_sizeof() const {
if (is_vector()) {
return datatype_sizeof(type_, get_dim());

View File

@ -18,8 +18,8 @@
#include <map>
#include <string>
#include "Types.h"
#include "Types.h"
#include "common/CDataType.h"
// NOTE: field_id can be system field
@ -29,11 +29,8 @@ struct LoadFieldDataInfo {
int64_t field_id;
// const void* blob = nullptr;
const milvus::DataArray* field_data;
int64_t row_count = -1;
// ~LoadFieldDataInfo() {
// delete field_data;
// }
int64_t row_count{-1};
const char* mmap_dir_path{nullptr};
};
struct LoadDeletedRecordInfo {

View File

@ -17,8 +17,8 @@
#pragma once
#include <cassert>
#include <type_traits>
#include <string>
#include <type_traits>
#include "Types.h"
#include "VectorTrait.h"

View File

@ -11,11 +11,18 @@
#pragma once
#include <fcntl.h>
#include <fmt/core.h>
#include <google/protobuf/text_format.h>
#include <sys/mman.h>
#include <filesystem>
#include <string>
#include <string_view>
#include "common/Consts.h"
#include "common/FieldMeta.h"
#include "common/LoadInfo.h"
#include "config/ConfigChunkManager.h"
#include "exceptions/EasyAssert.h"
#include "knowhere/dataset.h"
@ -58,8 +65,8 @@ GetDatasetLims(const DatasetPtr& dataset) {
}
inline bool
PrefixMatch(const std::string& str, const std::string& prefix) {
auto ret = strncmp(str.c_str(), prefix.c_str(), prefix.length());
PrefixMatch(const std::string_view str, const std::string_view prefix) {
auto ret = strncmp(str.data(), prefix.data(), prefix.length());
if (ret != 0) {
return false;
}
@ -79,13 +86,13 @@ GenResultDataset(const int64_t nq, const int64_t topk, const int64_t* ids, const
}
inline bool
PostfixMatch(const std::string& str, const std::string& postfix) {
PostfixMatch(const std::string_view str, const std::string& postfix) {
if (postfix.length() > str.length()) {
return false;
}
int offset = str.length() - postfix.length();
auto ret = strncmp(str.c_str() + offset, postfix.c_str(), postfix.length());
auto ret = strncmp(str.data() + offset, postfix.c_str(), postfix.length());
if (ret != 0) {
return false;
}
@ -166,4 +173,210 @@ MatchKnowhereError(knowhere::Status status) {
}
}
// Computes the total byte size of a field's payload for `row_count` rows.
// Fixed-width types use the element size from the field meta; variable-length
// string types sum the actual length of every string in `data`.
inline size_t
GetDataSize(const FieldMeta& field, size_t row_count, const DataArray* data) {
    const auto data_type = field.get_data_type();
    if (!datatype_is_variable(data_type)) {
        // Fixed-width: every row occupies the same number of bytes.
        return field.get_sizeof() * row_count;
    }
    switch (data_type) {
        case DataType::VARCHAR:
        case DataType::STRING: {
            // Sum the byte length of every string payload.
            ssize_t total{0};
            for (const auto& str : data->scalars().string_data().data()) {
                total += str.size();
            }
            return total;
        }
        default:
            PanicInfo(fmt::format("not supported data type {}", datatype_name(data_type)));
    }
}
// Copies a field's payload out of the protobuf DataArray in `info` into the
// caller-provided buffer `dst`, which must hold at least `size` bytes.
// Returns `dst` (memcpy-style). Panics on unsupported data types.
inline void*
FillField(DataType data_type, size_t size, const LoadFieldDataInfo& info, void* dst) {
    auto data = info.field_data;
    switch (data_type) {
        case DataType::BOOL: {
            return memcpy(dst, data->scalars().bool_data().data().data(), size);
        }
        case DataType::INT8: {
            // Protobuf stores small integers as int32; narrow them into a
            // temporary int8 buffer so the copied layout matches the field
            // width. NOTE(review): the temporary vector allocates per call —
            // consider narrowing directly into dst.
            auto src_data = data->scalars().int_data().data();
            std::vector<int8_t> data_raw(src_data.size());
            std::copy_n(src_data.data(), src_data.size(), data_raw.data());
            return memcpy(dst, data_raw.data(), size);
        }
        case DataType::INT16: {
            // Same narrowing dance as INT8, but to int16.
            auto src_data = data->scalars().int_data().data();
            std::vector<int16_t> data_raw(src_data.size());
            std::copy_n(src_data.data(), src_data.size(), data_raw.data());
            return memcpy(dst, data_raw.data(), size);
        }
        case DataType::INT32: {
            return memcpy(dst, data->scalars().int_data().data().data(), size);
        }
        case DataType::INT64: {
            return memcpy(dst, data->scalars().long_data().data().data(), size);
        }
        case DataType::FLOAT: {
            return memcpy(dst, data->scalars().float_data().data().data(), size);
        }
        case DataType::DOUBLE: {
            return memcpy(dst, data->scalars().double_data().data().data(), size);
        }
        case DataType::VARCHAR: {
            // Variable-length strings are packed back-to-back with no
            // separators; `size` is assumed to equal the sum of all string
            // lengths (see GetDataSize).
            char* dest = reinterpret_cast<char*>(dst);
            auto begin = data->scalars().string_data().data().begin();
            auto end = data->scalars().string_data().data().end();
            while (begin != end) {
                memcpy(dest, begin->data(), begin->size());
                dest += begin->size();
                begin++;
            }
            return dst;
        }
        case DataType::VECTOR_FLOAT:
            return memcpy(dst, data->vectors().float_vector().data().data(), size);
        case DataType::VECTOR_BINARY:
            return memcpy(dst, data->vectors().binary_vector().data(), size);
        default: {
            PanicInfo("unsupported");
        }
    }
}
// Serializes a field's payload from the protobuf DataArray to the file
// descriptor `fd`. For fixed-width types `size` is the exact byte count to
// write; for VARCHAR each string is written back-to-back and `size` is
// ignored. Returns the number of bytes written, or -1 on I/O error
// (matching write(2) semantics).
inline ssize_t
WriteFieldData(int fd, DataType data_type, const DataArray* data, size_t size) {
    switch (data_type) {
        case DataType::BOOL: {
            return write(fd, data->scalars().bool_data().data().data(), size);
        }
        case DataType::INT8: {
            // Protobuf stores small ints as int32; narrow into an int8 buffer
            // so the on-disk layout matches the field width.
            auto src_data = data->scalars().int_data().data();
            std::vector<int8_t> data_raw(src_data.size());
            std::copy_n(src_data.data(), src_data.size(), data_raw.data());
            return write(fd, data_raw.data(), size);
        }
        case DataType::INT16: {
            auto src_data = data->scalars().int_data().data();
            std::vector<int16_t> data_raw(src_data.size());
            std::copy_n(src_data.data(), src_data.size(), data_raw.data());
            return write(fd, data_raw.data(), size);
        }
        case DataType::INT32: {
            return write(fd, data->scalars().int_data().data().data(), size);
        }
        case DataType::INT64: {
            return write(fd, data->scalars().long_data().data().data(), size);
        }
        case DataType::FLOAT: {
            return write(fd, data->scalars().float_data().data().data(), size);
        }
        case DataType::DOUBLE: {
            return write(fd, data->scalars().double_data().data().data(), size);
        }
        case DataType::VARCHAR: {
            ssize_t total_written{0};
            for (const auto& str : data->scalars().string_data().data()) {
                const ssize_t written = write(fd, str.data(), str.size());
                // BUGFIX: compare in the signed domain. The original compared
                // the ssize_t result directly against str.size() (size_t), so
                // a -1 error return converted to SIZE_MAX, the error was never
                // detected, and -1 was folded into total_written.
                if (written < 0) {
                    return -1;  // propagate the I/O error to the caller
                }
                total_written += written;
                if (static_cast<size_t>(written) < str.size()) {
                    break;  // short write: report the bytes actually persisted
                }
            }
            return total_written;
        }
        case DataType::VECTOR_FLOAT:
            return write(fd, data->vectors().float_vector().data().data(), size);
        case DataType::VECTOR_BINARY:
            return write(fd, data->vectors().binary_vector().data(), size);
        default: {
            PanicInfo("unsupported");
        }
    }
}
// CreateMap creates a memory mapping for a field's data:
// if mmap is enabled (info.mmap_dir_path set), the field data is written to a
// backing file which is then mapped read-only; otherwise an anonymous
// read-write mapping is allocated and filled in memory.
// Either way the result can be released with munmap() alone.
inline void*
CreateMap(int64_t segment_id, const FieldMeta& field_meta, const LoadFieldDataInfo& info) {
    // BUGFIX: the original used a mutable function-local `static int` and
    // OR-ed MAP_POPULATE into it on every call — a needless (and racy)
    // runtime mutation. The flags are a compile-time constant.
#ifdef MAP_POPULATE
    // Pre-fault all pages so first access doesn't stall on page faults.
    static constexpr int mmap_flags = MAP_PRIVATE | MAP_POPULATE;
#else
    // macOS doesn't support MAP_POPULATE; pages are touched manually below.
    static constexpr int mmap_flags = MAP_PRIVATE;
#endif

    // In-memory path: no mmap directory configured.
    if (info.mmap_dir_path == nullptr) {
        auto data_type = field_meta.get_data_type();
        auto data_size = GetDataSize(field_meta, info.row_count, info.field_data);
        if (data_size == 0)
            return nullptr;
        // Use an anonymous mapping so we are able to free this memory with munmap only
        void* map = mmap(NULL, data_size, PROT_READ | PROT_WRITE, mmap_flags | MAP_ANON, -1, 0);
        AssertInfo(map != MAP_FAILED, fmt::format("failed to create anon map, err: {}", strerror(errno)));
        FillField(data_type, data_size, info, map);
        return map;
    }

    // File-backed path: {mmap_dir_path}/{segment_id}/{field_id}
    auto filepath =
        std::filesystem::path(info.mmap_dir_path) / std::to_string(segment_id) / std::to_string(info.field_id);
    auto dir = filepath.parent_path();
    std::filesystem::create_directories(dir);

    int fd = open(filepath.c_str(), O_CREAT | O_TRUNC | O_RDWR, S_IRUSR | S_IWUSR);
    AssertInfo(fd != -1, fmt::format("failed to create mmap file {}", filepath.c_str()));

    auto data_type = field_meta.get_data_type();
    size_t size = field_meta.get_sizeof() * info.row_count;
    auto written = WriteFieldData(fd, data_type, info.field_data, size);
    // Fixed-width fields must be written in full; for variable-length fields
    // `size` over-estimates, so any non-error write count is accepted.
    // (Parenthesized the && — the original relied on operator precedence.)
    AssertInfo(written == size || (written != -1 && datatype_is_variable(field_meta.get_data_type())),
               fmt::format("failed to write data file {}, written {} but total {}, err: {}", filepath.c_str(), written,
                           size, strerror(errno)));
    int ok = fsync(fd);
    AssertInfo(ok == 0, fmt::format("failed to fsync mmap data file {}, err: {}", filepath.c_str(), strerror(errno)));

    // Empty field: nothing to map.
    // BUGFIX: the original returned here without unlinking the file or
    // closing the fd, leaking the descriptor and leaving a stale empty file.
    if (written == 0) {
        unlink(filepath.c_str());
        close(fd);
        return nullptr;
    }

    auto map = mmap(NULL, written, PROT_READ, mmap_flags, fd, 0);
    AssertInfo(map != MAP_FAILED,
               fmt::format("failed to create map for data file {}, err: {}", filepath.c_str(), strerror(errno)));

#ifndef MAP_POPULATE
    // Manually touch one byte per page to populate the mapping.
    const size_t PAGE_SIZE = 4 << 10;  // 4KiB
    char* begin = (char*)map;
    char* end = begin + written;
    for (char* page = begin; page < end; page += PAGE_SIZE) {
        [[maybe_unused]] char value = page[0];
    }
#endif

    // Unlink the data file now; the kernel keeps the backing pages alive
    // until the mapping is released, so the file is auto-removed once it is
    // no longer needed.
    ok = unlink(filepath.c_str());
    AssertInfo(ok == 0, fmt::format("failed to unlink mmap data file {}, err: {}", filepath.c_str(), strerror(errno)));
    ok = close(fd);
    AssertInfo(ok == 0, fmt::format("failed to close data file {}, err: {}", filepath.c_str(), strerror(errno)));

    return map;
}
} // namespace milvus

View File

@ -49,7 +49,8 @@ template <typename T>
constexpr bool IsVector = std::is_base_of_v<VectorTrait, T>;
template <typename T>
constexpr bool IsScalar = std::is_fundamental_v<T> || std::is_same_v<T, std::string>;
constexpr bool IsScalar =
std::is_fundamental_v<T> || std::is_same_v<T, std::string> || std::is_same_v<T, std::string_view>;
template <typename T, typename Enabled = void>
struct EmbeddedTypeImpl;

View File

@ -74,6 +74,9 @@ typedef struct CLoadFieldDataInfo {
const uint8_t* blob;
uint64_t blob_size;
int64_t row_count;
// Set null to disable mmap,
// mmap file path will be {mmap_dir_path}/{segment_id}/{field_id}
const char* mmap_dir_path;
} CLoadFieldDataInfo;
typedef struct CLoadDeletedRecordInfo {

View File

@ -16,13 +16,14 @@
#pragma once
#include <boost/dynamic_bitset.hpp>
#include <map>
#include <memory>
#include <string>
#include <boost/dynamic_bitset.hpp>
#include "index/Index.h"
#include "common/Types.h"
#include "exceptions/EasyAssert.h"
#include "index/Index.h"
namespace milvus::index {

View File

@ -16,13 +16,15 @@
#pragma once
#include "index/ScalarIndex.h"
#include <string>
#include <memory>
#include <vector>
#include "index/Meta.h"
#include <pb/schema.pb.h>
#include <memory>
#include <string>
#include <vector>
#include "index/Meta.h"
#include "index/ScalarIndex.h"
namespace milvus::index {
class StringIndex : public ScalarIndex<std::string> {

View File

@ -37,7 +37,7 @@ class StringIndexSort : public ScalarIndexSort<std::string> {
}
const TargetBitmapPtr
PrefixMatch(std::string prefix) {
PrefixMatch(std::string_view prefix) {
auto data = GetData();
TargetBitmapPtr bitset = std::make_unique<TargetBitmap>(data.size());
for (size_t i = 0; i < data.size(); i++) {

View File

@ -9,15 +9,16 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "PlanProto.h"
#include <google/protobuf/text_format.h>
#include <string>
#include "ExprImpl.h"
#include "PlanProto.h"
#include "common/VectorTrait.h"
#include "generated/ExtractInfoExprVisitor.h"
#include "generated/ExtractInfoPlanNodeVisitor.h"
#include "common/VectorTrait.h"
namespace milvus::query {
namespace planpb = milvus::proto::plan;

View File

@ -71,7 +71,7 @@ SearchOnSealedIndex(const Schema& schema,
void
SearchOnSealed(const Schema& schema,
const segcore::InsertRecord<true>& record,
const void* vec_data,
const SearchInfo& search_info,
const void* query_data,
int64_t num_queries,
@ -83,11 +83,9 @@ SearchOnSealed(const Schema& schema,
query::dataset::SearchDataset dataset{search_info.metric_type_, num_queries, search_info.topk_,
search_info.round_decimal_, field.get_dim(), query_data};
auto vec_data = record.get_field_data_base(field_id);
AssertInfo(vec_data->num_chunk() == 1, "num chunk not equal to 1 for sealed segment");
auto chunk_data = vec_data->get_chunk_data(0);
CheckBruteForceSearchParam(field, search_info);
auto sub_qr = BruteForceSearch(dataset, chunk_data, row_count, search_info.search_params_, bitset);
auto sub_qr = BruteForceSearch(dataset, vec_data, row_count, search_info.search_params_, bitset);
result.distances_ = std::move(sub_qr.mutable_distances());
result.seg_offsets_ = std::move(sub_qr.mutable_seg_offsets());

View File

@ -29,7 +29,7 @@ SearchOnSealedIndex(const Schema& schema,
void
SearchOnSealed(const Schema& schema,
const segcore::InsertRecord<true>& record,
const void* vec_data,
const SearchInfo& search_info,
const void* query_data,
int64_t num_queries,

View File

@ -35,4 +35,17 @@ Match<std::string>(const std::string& str, const std::string& val, OpType op) {
PanicInfo("not supported");
}
}
// Specialization for std::string_view operands (sealed-segment data path):
// dispatches to the shared prefix/postfix helpers; any other operator is
// unsupported for string matching.
template <>
inline bool
Match<std::string_view>(const std::string_view& str, const std::string& val, OpType op) {
    if (op == OpType::PrefixMatch) {
        return PrefixMatch(str, val);
    }
    if (op == OpType::PostfixMatch) {
        return PostfixMatch(str, val);
    }
    PanicInfo("not supported");
}
} // namespace milvus::query

View File

@ -9,17 +9,18 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "query/generated/ExecExprVisitor.h"
#include <boost/variant.hpp>
#include <deque>
#include <optional>
#include <unordered_set>
#include <utility>
#include <boost/variant.hpp>
#include "query/ExprImpl.h"
#include "query/generated/ExecExprVisitor.h"
#include "segcore/SegmentGrowingImpl.h"
#include "query/Utils.h"
#include "query/Relational.h"
#include "query/Utils.h"
#include "segcore/SegmentGrowingImpl.h"
namespace milvus::query {
// THIS CONTAINS EXTRA BODY FOR VISITOR
@ -158,9 +159,10 @@ ExecExprVisitor::ExecRangeVisitorImpl(FieldId field_id, IndexFunc index_func, El
auto num_chunk = upper_div(row_count_, size_per_chunk);
std::deque<BitsetType> results;
using Index = index::ScalarIndex<T>;
typedef std::conditional_t<std::is_same_v<T, std::string_view>, std::string, T> IndexInnerType;
using Index = index::ScalarIndex<IndexInnerType>;
for (auto chunk_id = 0; chunk_id < indexing_barrier; ++chunk_id) {
const Index& indexing = segment_.chunk_scalar_index<T>(field_id, chunk_id);
const Index& indexing = segment_.chunk_scalar_index<IndexInnerType>(field_id, chunk_id);
// NOTE: knowhere is not const-ready
// This is a dirty workaround
auto data = index_func(const_cast<Index*>(&indexing));
@ -173,7 +175,8 @@ ExecExprVisitor::ExecRangeVisitorImpl(FieldId field_id, IndexFunc index_func, El
auto chunk = segment_.chunk_data<T>(field_id, chunk_id);
const T* data = chunk.data();
for (int index = 0; index < this_size; ++index) {
result[index] = element_func(data[index]);
auto x = data[index];
result[index] = element_func(x);
}
AssertInfo(result.size() == this_size, "");
results.emplace_back(std::move(result));
@ -215,9 +218,10 @@ ExecExprVisitor::ExecDataRangeVisitorImpl(FieldId field_id, IndexFunc index_func
// if sealed segment has loaded scalar index for this field, then index_barrier = 1 and data_barrier = 0
// in this case, sealed segment execute expr plan using scalar index
using Index = index::ScalarIndex<T>;
typedef std::conditional_t<std::is_same_v<T, std::string_view>, std::string, T> IndexInnerType;
using Index = index::ScalarIndex<IndexInnerType>;
for (auto chunk_id = data_barrier; chunk_id < indexing_barrier; ++chunk_id) {
auto& indexing = segment_.chunk_scalar_index<T>(field_id, chunk_id);
auto& indexing = segment_.chunk_scalar_index<IndexInnerType>(field_id, chunk_id);
auto this_size = const_cast<Index*>(&indexing)->Count();
BitsetType result(this_size);
for (int offset = 0; offset < this_size; ++offset) {
@ -236,10 +240,12 @@ ExecExprVisitor::ExecDataRangeVisitorImpl(FieldId field_id, IndexFunc index_func
template <typename T>
auto
ExecExprVisitor::ExecUnaryRangeVisitorDispatcher(UnaryRangeExpr& expr_raw) -> BitsetType {
auto& expr = static_cast<UnaryRangeExprImpl<T>&>(expr_raw);
using Index = index::ScalarIndex<T>;
typedef std::conditional_t<std::is_same_v<T, std::string_view>, std::string, T> IndexInnerType;
using Index = index::ScalarIndex<IndexInnerType>;
auto& expr = static_cast<UnaryRangeExprImpl<IndexInnerType>&>(expr_raw);
auto op = expr.op_type_;
auto val = expr.value_;
auto val = IndexInnerType(expr.value_);
switch (op) {
case OpType::Equal: {
auto index_func = [val](Index* index) { return index->In(1, &val); };
@ -412,12 +418,14 @@ ExecExprVisitor::ExecBinaryArithOpEvalRangeVisitorDispatcher(BinaryArithOpEvalRa
template <typename T>
auto
ExecExprVisitor::ExecBinaryRangeVisitorDispatcher(BinaryRangeExpr& expr_raw) -> BitsetType {
auto& expr = static_cast<BinaryRangeExprImpl<T>&>(expr_raw);
using Index = index::ScalarIndex<T>;
typedef std::conditional_t<std::is_same_v<T, std::string_view>, std::string, T> IndexInnerType;
using Index = index::ScalarIndex<IndexInnerType>;
auto& expr = static_cast<BinaryRangeExprImpl<IndexInnerType>&>(expr_raw);
bool lower_inclusive = expr.lower_inclusive_;
bool upper_inclusive = expr.upper_inclusive_;
T val1 = expr.lower_value_;
T val2 = expr.upper_value_;
IndexInnerType val1 = IndexInnerType(expr.lower_value_);
IndexInnerType val2 = IndexInnerType(expr.upper_value_);
auto index_func = [=](Index* index) { return index->Range(val1, lower_inclusive, val2, upper_inclusive); };
if (lower_inclusive && upper_inclusive) {
@ -472,7 +480,11 @@ ExecExprVisitor::visit(UnaryRangeExpr& expr) {
break;
}
case DataType::VARCHAR: {
res = ExecUnaryRangeVisitorDispatcher<std::string>(expr);
if (segment_.type() == SegmentType::Growing) {
res = ExecUnaryRangeVisitorDispatcher<std::string>(expr);
} else {
res = ExecUnaryRangeVisitorDispatcher<std::string_view>(expr);
}
break;
}
default:
@ -556,7 +568,11 @@ ExecExprVisitor::visit(BinaryRangeExpr& expr) {
break;
}
case DataType::VARCHAR: {
res = ExecBinaryRangeVisitorDispatcher<std::string>(expr);
if (segment_.type() == SegmentType::Growing) {
res = ExecBinaryRangeVisitorDispatcher<std::string>(expr);
} else {
res = ExecBinaryRangeVisitorDispatcher<std::string_view>(expr);
}
break;
}
default:
@ -676,8 +692,13 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op) -> BitsetTy
}
case DataType::VARCHAR: {
if (chunk_id < data_barrier) {
auto chunk_data = segment_.chunk_data<std::string>(field_id, chunk_id).data();
return [chunk_data](int i) -> const number { return chunk_data[i]; };
if (segment_.type() == SegmentType::Growing) {
auto chunk_data = segment_.chunk_data<std::string>(field_id, chunk_id).data();
return [chunk_data](int i) -> const number { return chunk_data[i]; };
} else {
auto chunk_data = segment_.chunk_data<std::string_view>(field_id, chunk_id).data();
return [chunk_data](int i) -> const number { return std::string(chunk_data[i]); };
}
} else {
// for case, sealed segment has loaded index for scalar field instead of raw data
auto& indexing = segment_.chunk_scalar_index<std::string>(field_id, chunk_id);
@ -808,12 +829,19 @@ ExecExprVisitor::ExecTermVisitorImpl<std::string>(TermExpr& expr_raw) -> BitsetT
return ExecTermVisitorImplTemplate<std::string>(expr_raw);
}
template <>
auto
ExecExprVisitor::ExecTermVisitorImpl<std::string_view>(TermExpr& expr_raw) -> BitsetType {
return ExecTermVisitorImplTemplate<std::string_view>(expr_raw);
}
template <typename T>
auto
ExecExprVisitor::ExecTermVisitorImplTemplate(TermExpr& expr_raw) -> BitsetType {
auto& expr = static_cast<TermExprImpl<T>&>(expr_raw);
using Index = index::ScalarIndex<T>;
const auto& terms = expr.terms_;
typedef std::conditional_t<std::is_same_v<T, std::string_view>, std::string, T> IndexInnerType;
using Index = index::ScalarIndex<IndexInnerType>;
auto& expr = static_cast<TermExprImpl<IndexInnerType>&>(expr_raw);
const std::vector<IndexInnerType> terms(expr.terms_.begin(), expr.terms_.end());
auto n = terms.size();
std::unordered_set<T> term_set(expr.terms_.begin(), expr.terms_.end());
@ -894,7 +922,11 @@ ExecExprVisitor::visit(TermExpr& expr) {
break;
}
case DataType::VARCHAR: {
res = ExecTermVisitorImpl<std::string>(expr);
if (segment_.type() == SegmentType::Growing) {
res = ExecTermVisitorImpl<std::string>(expr);
} else {
res = ExecTermVisitorImpl<std::string_view>(expr);
}
break;
}
default:

View File

@ -9,12 +9,13 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "query/generated/ExecPlanNodeVisitor.h"
#include <utility>
#include "query/PlanImpl.h"
#include "query/generated/ExecPlanNodeVisitor.h"
#include "query/generated/ExecExprVisitor.h"
#include "query/SubSearchResult.h"
#include "query/generated/ExecExprVisitor.h"
#include "segcore/SegmentGrowing.h"
#include "utils/Json.h"

View File

@ -11,18 +11,19 @@
#pragma once
#include <memory>
#include <vector>
#include <string>
#include <algorithm>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "TimestampIndex.h"
#include "common/Schema.h"
#include "easylogging++.h"
#include "segcore/AckResponder.h"
#include "segcore/ConcurrentVector.h"
#include "segcore/Record.h"
#include "TimestampIndex.h"
namespace milvus::segcore {
@ -310,8 +311,8 @@ struct InsertRecord {
private:
// std::vector<std::unique_ptr<VectorBase>> fields_data_;
std::unordered_map<FieldId, std::unique_ptr<VectorBase>> fields_data_;
mutable std::shared_mutex shared_mutex_;
std::unordered_map<FieldId, std::unique_ptr<VectorBase>> fields_data_{};
mutable std::shared_mutex shared_mutex_{};
};
} // namespace milvus::segcore

View File

@ -38,6 +38,11 @@ class SegmentGrowing : public SegmentInternalInterface {
const Timestamp* timestamps,
const InsertData* insert_data) = 0;
// Identifies this segment as Growing (mutable, insert-accepting); query
// visitors branch on this to pick std::string vs std::string_view code paths.
virtual SegmentType
type() const override {
    return SegmentType::Growing;
}
// virtual int64_t
// PreDelete(int64_t size) = 0;

View File

@ -10,11 +10,13 @@
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "SegmentInterface.h"
#include <cstdint>
#include "Utils.h"
#include "common/SystemProperty.h"
#include "common/Types.h"
#include "query/generated/ExecPlanNodeVisitor.h"
#include "Utils.h"
namespace milvus::segcore {
@ -72,7 +74,6 @@ SegmentInternalInterface::Retrieve(const query::RetrievePlan* plan, Timestamp ti
query::ExecPlanNodeVisitor visitor(*this, timestamp);
auto retrieve_results = visitor.get_retrieve_result(*plan->plan_node_);
retrieve_results.segment_ = (void*)this;
results->mutable_offset()->Add(retrieve_results.result_offsets_.begin(), retrieve_results.result_offsets_.end());
auto fields_data = results->mutable_fields_data();

View File

@ -80,6 +80,9 @@ class SegmentInterface {
virtual int64_t
get_segment_id() const = 0;
virtual SegmentType
type() const = 0;
};
// internal API for DSL calculation

View File

@ -33,6 +33,11 @@ class SegmentSealed : public SegmentInternalInterface {
DropIndex(const FieldId field_id) = 0;
virtual void
DropFieldData(const FieldId field_id) = 0;
// Identifies this segment as Sealed (immutable); query visitors branch on
// this to pick std::string vs std::string_view code paths.
virtual SegmentType
type() const override {
    return SegmentType::Sealed;
}
};
using SegmentSealedPtr = std::unique_ptr<SegmentSealed>;

View File

@ -11,6 +11,11 @@
#include "SegmentSealedImpl.h"
#include <fcntl.h>
#include <fmt/core.h>
#include <filesystem>
#include "Utils.h"
#include "common/Consts.h"
#include "common/FieldMeta.h"
@ -195,20 +200,28 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) {
// Don't allow raw data and index exist at the same time
AssertInfo(!get_bit(index_ready_bitset_, field_id), "field data can't be loaded when indexing exists");
auto field_data = insert_record_.get_field_data_base(field_id);
AssertInfo(field_data->empty(), "already exists");
// insert data to insertRecord
field_data->fill_chunk_data(size, info.field_data, field_meta);
AssertInfo(field_data->num_chunk() == 1, "num chunk not equal to 1 for sealed segment");
void* field_data = nullptr;
size_t size = 0;
if (datatype_is_variable(data_type)) {
VariableField field(get_segment_id(), field_meta, info);
size = field.size();
field_data = reinterpret_cast<void*>(field.data());
variable_fields_.emplace(field_id, std::move(field));
} else {
field_data = CreateMap(get_segment_id(), field_meta, info);
fixed_fields_[field_id] = field_data;
size = field_meta.get_sizeof() * info.row_count;
}
// set pks to offset
if (schema_->get_primary_field_id() == field_id) {
AssertInfo(field_id.get() != -1, "Primary key is -1");
AssertInfo(insert_record_.empty_pks(), "already exists");
std::vector<PkType> pks(size);
std::vector<PkType> pks(info.row_count);
ParsePksFromFieldData(pks, *info.field_data);
for (int i = 0; i < size; ++i) {
for (int i = 0; i < info.row_count; ++i) {
insert_record_.insert_pk(pks[i], i);
}
insert_record_.seal_pks();
@ -216,6 +229,7 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) {
set_bit(field_data_ready_bitset_, field_id, true);
}
std::unique_lock lck(mutex_);
update_row_count(info.row_count);
}
@ -253,9 +267,7 @@ SegmentSealedImpl::num_chunk_index(FieldId field_id) const {
int64_t
SegmentSealedImpl::num_chunk_data(FieldId field_id) const {
auto field_data = insert_record_.get_field_data_base(field_id);
AssertInfo(field_data != nullptr, "null field data ptr");
return field_data->num_chunk();
return get_bit(field_data_ready_bitset_, field_id) ? 1 : 0;
}
int64_t
@ -275,6 +287,14 @@ SegmentSealedImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const {
"Can't get bitset element at " + std::to_string(field_id.get()));
auto& field_meta = schema_->operator[](field_id);
auto element_sizeof = field_meta.get_sizeof();
if (auto it = fixed_fields_.find(field_id); it != fixed_fields_.end()) {
auto field_data = it->second;
return SpanBase(field_data, get_row_count(), element_sizeof);
}
if (auto it = variable_fields_.find(field_id); it != variable_fields_.end()) {
auto& field = it->second;
return SpanBase(field.views().data(), field.views().size(), sizeof(std::string_view));
}
auto field_data = insert_record_.get_field_data_base(field_id);
AssertInfo(field_data->num_chunk() == 1, "num chunk not equal to 1 for sealed segment");
return field_data->get_span_base(0);
@ -349,8 +369,8 @@ SegmentSealedImpl::vector_search(SearchInfo& search_info,
"Field Data is not loaded: " + std::to_string(field_id.get()));
AssertInfo(row_count_opt_.has_value(), "Can't get row count value");
auto row_count = row_count_opt_.value();
query::SearchOnSealed(*schema_, insert_record_, search_info, query_data, query_count, row_count, bitset,
output);
auto vec_data = fixed_fields_.at(field_id);
query::SearchOnSealed(*schema_, vec_data, search_info, query_data, query_count, row_count, bitset, output);
}
}
@ -458,6 +478,22 @@ SegmentSealedImpl::bulk_subscript_impl(const void* src_raw, const int64_t* seg_o
}
}
// Gathers `count` rows of a variable-length field into `dst_raw`, treated as
// an array of T (e.g. std::string), following the given segment offsets.
// Rows whose offset is INVALID_SEG_OFFSET are skipped and keep their
// default-constructed value.
template <typename T>
void
SegmentSealedImpl::bulk_subscript_impl(const VariableField& field,
                                       const int64_t* seg_offsets,
                                       int64_t count,
                                       void* dst_raw) {
    auto dst = reinterpret_cast<T*>(dst_raw);
    for (int64_t i = 0; i < count; ++i) {
        auto offset = seg_offsets[i];
        if (offset != INVALID_SEG_OFFSET) {
            auto entry = field[offset];
            // FIX: T(...) is already a prvalue; the original wrapped it in
            // std::move(), which is redundant and pessimizing (clang-tidy
            // performance-move-const-arg).
            dst[i] = T(entry.data(), entry.row_count());
        }
    }
}
// for vector
void
SegmentSealedImpl::bulk_subscript_impl(
@ -506,10 +542,22 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id, const int64_t* seg_offsets,
}
Assert(get_bit(field_data_ready_bitset_, field_id));
auto field_data = insert_record_.get_field_data_base(field_id);
AssertInfo(field_data->num_chunk() == 1, std::string("num chunk not equal to 1 for sealed segment, num_chunk: ") +
std::to_string(field_data->num_chunk()));
auto src_vec = field_data->get_chunk_data(0);
if (datatype_is_variable(field_meta.get_data_type())) {
switch (field_meta.get_data_type()) {
case DataType::VARCHAR:
case DataType::STRING: {
FixedVector<std::string> output(count);
bulk_subscript_impl<std::string>(variable_fields_.at(field_id), seg_offsets, count, output.data());
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
}
default:
PanicInfo(fmt::format("unsupported data type: {}", datatype_name(field_meta.get_data_type())));
}
}
auto src_vec = fixed_fields_.at(field_id);
switch (field_meta.get_data_type()) {
case DataType::BOOL: {
FixedVector<bool> output(count);
@ -546,11 +594,6 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id, const int64_t* seg_offsets,
bulk_subscript_impl<double>(src_vec, seg_offsets, count, output.data());
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
}
case DataType::VARCHAR: {
FixedVector<std::string> output(count);
bulk_subscript_impl<std::string>(src_vec, seg_offsets, count, output.data());
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
}
case DataType::VECTOR_FLOAT:
case DataType::VECTOR_BINARY: {

View File

@ -11,15 +11,16 @@
#pragma once
#include <tbb/concurrent_priority_queue.h>
#include <tbb/concurrent_vector.h>
#include <deque>
#include <unordered_map>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include <tbb/concurrent_priority_queue.h>
#include <tbb/concurrent_vector.h>
#include "ConcurrentVector.h"
#include "DeletedRecord.h"
@ -27,13 +28,24 @@
#include "SealedIndexingRecord.h"
#include "SegmentSealed.h"
#include "TimestampIndex.h"
#include "VariableField.h"
#include "index/ScalarIndex.h"
#include "sys/mman.h"
namespace milvus::segcore {
class SegmentSealedImpl : public SegmentSealed {
public:
explicit SegmentSealedImpl(SchemaPtr schema, int64_t segment_id);
// Releases the mmap-backed buffers owned by fixed-width fields (created via
// CreateMap). Variable-length fields release their own mappings in
// ~VariableField.
~SegmentSealedImpl() {
    for (auto& [field_id, data] : fixed_fields_) {
        // NOTE(review): this copies the FieldMeta; `data_type` below is also
        // unused — consider a const reference and dropping the local.
        auto field_meta = schema_->operator[](field_id);
        auto data_type = field_meta.get_data_type();
        // Mapped length is recomputed as element size * row count — assumes it
        // matches the size passed to mmap at load time; TODO confirm.
        if (munmap(data, field_meta.get_sizeof() * get_row_count())) {
            // NOTE(review): AssertInfo(true, ...) can never fire, so a failed
            // munmap is silently ignored here — confirm whether this should
            // log instead (asserting on `false` would risk throwing in a
            // destructor).
            AssertInfo(true, "failed to unmap field " + std::to_string(field_id.get()) + " err=" + strerror(errno));
        }
    }
}
void
LoadIndex(const LoadIndexInfo& info) override;
void
@ -122,6 +134,10 @@ class SegmentSealedImpl : public SegmentSealed {
static void
bulk_subscript_impl(const void* src_raw, const int64_t* seg_offsets, int64_t count, void* dst_raw);
template <typename T>
static void
bulk_subscript_impl(const VariableField& field, const int64_t* seg_offsets, int64_t count, void* dst_raw);
static void
bulk_subscript_impl(
int64_t element_sizeof, const void* src_raw, const int64_t* seg_offsets, int64_t count, void* dst_raw);
@ -131,11 +147,11 @@ class SegmentSealedImpl : public SegmentSealed {
void
update_row_count(int64_t row_count) {
if (row_count_opt_.has_value()) {
AssertInfo(row_count_opt_.value() == row_count, "load data has different row count from other columns");
} else {
row_count_opt_ = row_count;
}
// if (row_count_opt_.has_value()) {
// AssertInfo(row_count_opt_.value() == row_count, "load data has different row count from other columns");
// } else {
row_count_opt_ = row_count;
// }
}
void
@ -200,6 +216,8 @@ class SegmentSealedImpl : public SegmentSealed {
SchemaPtr schema_;
int64_t id_;
std::unordered_map<FieldId, void*> fixed_fields_;
std::unordered_map<FieldId, VariableField> variable_fields_;
};
inline SegmentSealedPtr

View File

@ -0,0 +1,89 @@
#pragma once
#include <sys/mman.h>
#include <string_view>
#include <vector>
#include "common/LoadInfo.h"
namespace milvus::segcore {
// A raw (pointer, byte-length) view into a variable-length data buffer.
// NOTE(review): not referenced by the code visible here — confirm it is used
// elsewhere or remove it.
struct Entry {
    char* data;
    uint32_t length;
};
// Used for string/varchar field only: owns an mmap-backed buffer holding all
// row payloads back-to-back, plus a per-row offset table and a string_view
// index compatible with the Span-based access path.
// TODO(yah01): make this generic
class VariableField {
 public:
    // Builds the offset table from the protobuf string data, then maps (or
    // anonymously allocates) the backing buffer via CreateMap and constructs
    // one string_view per row.
    explicit VariableField(int64_t segment_id, const FieldMeta& field_meta, const LoadFieldDataInfo& info) {
        auto begin = info.field_data->scalars().string_data().data().begin();
        auto end = info.field_data->scalars().string_data().data().end();

        indices_.reserve(info.row_count);
        while (begin != end) {
            indices_.push_back(size_);
            size_ += begin->size();
            begin++;
        }

        data_ = (char*)CreateMap(segment_id, field_meta, info);
        construct_views();
    }

    VariableField(VariableField&& field)
        : indices_(std::move(field.indices_)), size_(field.size_), data_(field.data_), views_(std::move(field.views_)) {
        // Transfer mapping ownership; the moved-from object must not munmap.
        field.data_ = nullptr;
    }

    // Mapping ownership is unique; copying would double-munmap.
    // (These were already implicitly deleted by the user-declared move
    // constructor — spelled out for clarity.)
    VariableField(const VariableField&) = delete;
    VariableField&
    operator=(const VariableField&) = delete;

    ~VariableField() {
        if (data_ != MAP_FAILED && data_ != nullptr) {
            if (munmap(data_, size_)) {
                // NOTE(review): AssertInfo(true, ...) never fires; munmap
                // failures are effectively ignored (kept as-is — throwing in
                // a destructor would be worse). Confirm intended behavior.
                AssertInfo(true, std::string("failed to unmap variable field err=") + strerror(errno));
            }
        }
    }

    // Raw pointer to the packed payload buffer.
    char*
    data() {
        return data_;
    }

    // Per-row views over the buffer; size() of the vector equals row count.
    const std::vector<std::string_view>&
    views() const {
        return views_;
    }

    // Total payload size in bytes.
    size_t
    size() const {
        return size_;
    }

    // Returns the i-th row's payload as a Span<char>; the last row extends to
    // the end of the buffer.
    Span<char>
    operator[](const int i) const {
        uint64_t next = (i + 1 == indices_.size()) ? size_ : indices_[i + 1];
        uint64_t offset = indices_[i];
        return Span<char>(data_ + offset, uint32_t(next - offset));
    }

 protected:
    // Builds one string_view per row over the mapped buffer.
    void
    construct_views() {
        // BUGFIX: guard the empty case — the original evaluated
        // indices_.size() - 1 (size_t underflow → huge loop bound) and called
        // indices_.back() on an empty vector when the field had zero rows.
        if (indices_.empty()) {
            return;
        }
        views_.reserve(indices_.size());
        for (size_t i = 0; i + 1 < indices_.size(); i++) {
            views_.emplace_back(data_ + indices_[i], indices_[i + 1] - indices_[i]);
        }
        views_.emplace_back(data_ + indices_.back(), size_ - indices_.back());
    }

 private:
    std::vector<uint64_t> indices_{};  // byte offset of each row's payload
    uint64_t size_{0};                 // total payload bytes
    char* data_{nullptr};              // mmap-backed buffer (owned)
    // Compatible with current Span type
    std::vector<std::string_view> views_{};
};
} // namespace milvus::segcore

View File

@ -9,18 +9,18 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "segcore/segment_c.h"
#include "common/CGoHelper.h"
#include "common/LoadInfo.h"
#include "common/Types.h"
#include "common/type_c.h"
#include "google/protobuf/text_format.h"
#include "index/IndexInfo.h"
#include "log/Log.h"
#include "segcore/Collection.h"
#include "segcore/SegmentGrowingImpl.h"
#include "segcore/SegmentSealedImpl.h"
#include "segcore/segment_c.h"
#include "index/IndexInfo.h"
#include "google/protobuf/text_format.h"
////////////////////////////// common interfaces //////////////////////////////
CSegmentInterface
@ -206,8 +206,8 @@ LoadFieldData(CSegmentInterface c_segment, CLoadFieldDataInfo load_field_data_in
auto field_data = std::make_unique<milvus::DataArray>();
auto suc = field_data->ParseFromArray(load_field_data_info.blob, load_field_data_info.blob_size);
AssertInfo(suc, "unmarshal field data string failed");
auto load_info =
LoadFieldDataInfo{load_field_data_info.field_id, field_data.get(), load_field_data_info.row_count};
auto load_info = LoadFieldDataInfo{load_field_data_info.field_id, field_data.get(),
load_field_data_info.row_count, load_field_data_info.mmap_dir_path};
segment->LoadFieldData(load_info);
return milvus::SuccessCStatus();
} catch (std::exception& e) {

View File

@ -9,27 +9,27 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <boost/format.hpp>
#include <chrono>
#include <google/protobuf/text_format.h>
#include <gtest/gtest.h>
#include <boost/format.hpp>
#include <chrono>
#include <iostream>
#include <random>
#include <string>
#include <unordered_set>
#include "knowhere/comp/index_param.h"
#include "common/LoadInfo.h"
#include "index/IndexFactory.h"
#include "knowhere/comp/index_param.h"
#include "pb/plan.pb.h"
#include "query/ExprImpl.h"
#include "segcore/Collection.h"
#include "segcore/reduce_c.h"
#include "segcore/Reduce.h"
#include "segcore/reduce_c.h"
#include "test_utils/DataGen.h"
#include "index/IndexFactory.h"
#include "test_utils/indexbuilder_test_utils.h"
#include "test_utils/PbHelper.h"
#include "test_utils/indexbuilder_test_utils.h"
namespace chrono = std::chrono;

View File

@ -13,17 +13,18 @@
#include <segcore/ConcurrentVector.h>
#include "common/Types.h"
#include "common/Span.h"
#include "common/VectorTrait.h"
TEST(Common, Span) {
using namespace milvus;
using namespace milvus::segcore;
Span<float> s1(nullptr, 100);
Span<FloatVector> s2(nullptr, 10, 16 * sizeof(float));
Span<milvus::FloatVector> s2(nullptr, 10, 16 * sizeof(float));
SpanBase b1 = s1;
SpanBase b2 = s2;
auto r1 = static_cast<Span<float>>(b1);
auto r2 = static_cast<Span<FloatVector>>(b2);
auto r2 = static_cast<Span<milvus::FloatVector>>(b2);
ASSERT_EQ(r1.row_count(), 100);
ASSERT_EQ(r2.row_count(), 10);
ASSERT_EQ(r2.element_sizeof(), 16 * sizeof(float));

View File

@ -352,6 +352,9 @@ TEST(Sealed, LoadFieldData) {
auto double_id = schema->AddDebugField("double", DataType::DOUBLE);
auto nothing_id = schema->AddDebugField("nothing", DataType::INT32);
auto str_id = schema->AddDebugField("str", DataType::VARCHAR);
schema->AddDebugField("int8", DataType::INT8);
schema->AddDebugField("int16", DataType::INT16);
schema->AddDebugField("float", DataType::FLOAT);
schema->set_primary_field_id(counter_id);
auto dataset = DataGen(schema, N);
@ -398,7 +401,6 @@ TEST(Sealed, LoadFieldData) {
ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group.get(), time));
SealedLoadFieldData(dataset, *segment);
segment->DropFieldData(nothing_id);
segment->Search(plan.get(), ph_group.get(), time);
segment->DropFieldData(fakevec_id);
@ -415,7 +417,101 @@ TEST(Sealed, LoadFieldData) {
ASSERT_EQ(segment->num_chunk_index(str_id), 0);
auto chunk_span1 = segment->chunk_data<int64_t>(counter_id, 0);
auto chunk_span2 = segment->chunk_data<double>(double_id, 0);
auto chunk_span3 = segment->chunk_data<std::string>(str_id, 0);
auto chunk_span3 = segment->chunk_data<std::string_view>(str_id, 0);
auto ref1 = dataset.get_col<int64_t>(counter_id);
auto ref2 = dataset.get_col<double>(double_id);
auto ref3 = dataset.get_col(str_id)->scalars().string_data().data();
for (int i = 0; i < N; ++i) {
ASSERT_EQ(chunk_span1[i], ref1[i]);
ASSERT_EQ(chunk_span2[i], ref2[i]);
ASSERT_EQ(chunk_span3[i], ref3[i]);
}
auto sr = segment->Search(plan.get(), ph_group.get(), time);
auto json = SearchResultToJson(*sr);
std::cout << json.dump(1);
segment->DropIndex(fakevec_id);
ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group.get(), time));
}
TEST(Sealed, LoadFieldDataMmap) {
auto dim = 16;
auto topK = 5;
auto N = ROW_COUNT;
auto metric_type = knowhere::metric::L2;
auto schema = std::make_shared<Schema>();
auto fakevec_id = schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, dim, metric_type);
auto counter_id = schema->AddDebugField("counter", DataType::INT64);
auto double_id = schema->AddDebugField("double", DataType::DOUBLE);
auto nothing_id = schema->AddDebugField("nothing", DataType::INT32);
auto str_id = schema->AddDebugField("str", DataType::VARCHAR);
schema->AddDebugField("int8", DataType::INT8);
schema->AddDebugField("int16", DataType::INT16);
schema->AddDebugField("float", DataType::FLOAT);
schema->set_primary_field_id(counter_id);
auto dataset = DataGen(schema, N);
auto fakevec = dataset.get_col<float>(fakevec_id);
auto indexing = GenVecIndexing(N, dim, fakevec.data());
auto segment = CreateSealedSegment(schema);
std::string dsl = R"({
"bool": {
"must": [
{
"range": {
"double": {
"GE": -1,
"LT": 1
}
}
},
{
"vector": {
"fakevec": {
"metric_type": "L2",
"params": {
"nprobe": 10
},
"query": "$0",
"topk": 5,
"round_decimal": 3
}
}
}
]
}
})";
Timestamp time = 1000000;
auto plan = CreatePlan(*schema, dsl);
auto num_queries = 5;
auto ph_group_raw = CreatePlaceholderGroup(num_queries, 16, 1024);
auto ph_group = ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString());
ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group.get(), time));
SealedLoadFieldData(dataset, *segment, {}, true);
segment->Search(plan.get(), ph_group.get(), time);
segment->DropFieldData(fakevec_id);
ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group.get(), time));
LoadIndexInfo vec_info;
vec_info.field_id = fakevec_id.get();
vec_info.index = std::move(indexing);
vec_info.index_params["metric_type"] = knowhere::metric::L2;
segment->LoadIndex(vec_info);
ASSERT_EQ(segment->num_chunk(), 1);
ASSERT_EQ(segment->num_chunk_index(double_id), 0);
ASSERT_EQ(segment->num_chunk_index(str_id), 0);
auto chunk_span1 = segment->chunk_data<int64_t>(counter_id, 0);
auto chunk_span2 = segment->chunk_data<double>(double_id, 0);
auto chunk_span3 = segment->chunk_data<std::string_view>(str_id, 0);
auto ref1 = dataset.get_col<int64_t>(counter_id);
auto ref2 = dataset.get_col<double>(double_id);
auto ref3 = dataset.get_col(str_id)->scalars().string_data().data();
@ -431,36 +527,6 @@ TEST(Sealed, LoadFieldData) {
segment->DropIndex(fakevec_id);
ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group.get(), time));
// segment->LoadIndex(vec_info);
// auto sr2 = segment->Search(plan.get(), ph_group.get(), time);
// auto json2 = SearchResultToJson(*sr);
// ASSERT_EQ(json.dump(-2), json2.dump(-2));
// segment->DropFieldData(double_id);
// ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group.get(), time));
// #ifdef __linux__
// auto std_json = Json::parse(R"(
//[
// [
// ["982->0.000000", "25315->4.742000", "57893->4.758000", "48201->6.075000", "53853->6.223000"],
// ["41772->10.111000", "74859->11.790000", "79777->11.842000", "3785->11.983000", "35888->12.193000"],
// ["59251->2.543000", "65551->4.454000", "72204->5.332000", "96905->5.479000", "87833->5.765000"],
// ["59219->5.458000", "21995->6.078000", "97922->6.764000", "25710->7.158000", "14048->7.294000"],
// ["66353->5.696000", "30664->5.881000", "41087->5.917000", "10393->6.633000", "90215->7.202000"]
// ]
//])");
// #else // for mac
// auto std_json = Json::parse(R"(
//[
// [
// ["982->0.000000", "31864->4.270000", "18916->4.651000", "71547->5.125000", "86706->5.991000"],
// ["96984->4.192000", "65514->6.011000", "89328->6.138000", "80284->6.526000", "68218->6.563000"],
// ["30119->2.464000", "82365->4.725000", "74834->5.009000", "79995->5.725000", "33359->5.816000"],
// ["99625->6.129000", "86582->6.900000", "85934->7.792000", "60450->8.087000", "19257->8.530000"],
// ["37759->3.581000", "31292->5.780000", "98124->6.216000", "63535->6.439000", "11707->6.553000"]
// ]
//])");
// #endif
// ASSERT_EQ(std_json.dump(-2), json.dump(-2));
}
TEST(Sealed, LoadScalarIndex) {

View File

@ -425,7 +425,10 @@ SearchResultToJson(const SearchResult& sr) {
};
inline void
SealedLoadFieldData(const GeneratedData& dataset, SegmentSealed& seg, const std::set<int64_t>& exclude_fields = {}) {
SealedLoadFieldData(const GeneratedData& dataset,
SegmentSealed& seg,
const std::set<int64_t>& exclude_fields = {},
bool with_mmap = false) {
auto row_count = dataset.row_ids_.size();
{
LoadFieldDataInfo info;
@ -451,6 +454,9 @@ SealedLoadFieldData(const GeneratedData& dataset, SegmentSealed& seg, const std:
continue;
}
LoadFieldDataInfo info;
if (with_mmap) {
info.mmap_dir_path = "./data/mmap-test";
}
info.field_id = field_data.field_id();
info.row_count = row_count;
info.field_data = &field_data;

View File

@ -295,6 +295,7 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
indexBlobs,
)
if err != nil {
log.Warn("failed to serialize index", zap.Error(err))
return err
}
encodeIndexFileDur := it.tr.Record("index codec serialize done")

View File

@ -780,6 +780,7 @@ func (s *Segment) segmentDelete(offset int64, entityIDs []primaryKey, timestamps
}
// -------------------------------------------------------------------------------------- interfaces for sealed segment
func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int64, data *schemapb.FieldData) error {
/*
CStatus
@ -800,11 +801,18 @@ func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int64, data *sche
return err
}
var mmapDirPath *C.char = nil
path := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()
if len(path) > 0 {
mmapDirPath = C.CString(path)
defer C.free(unsafe.Pointer(mmapDirPath))
}
loadInfo := C.CLoadFieldDataInfo{
field_id: C.int64_t(fieldID),
blob: (*C.uint8_t)(unsafe.Pointer(&dataBlob[0])),
blob_size: C.uint64_t(len(dataBlob)),
row_count: C.int64_t(rowCount),
field_id: C.int64_t(fieldID),
blob: (*C.uint8_t)(unsafe.Pointer(&dataBlob[0])),
blob_size: C.uint64_t(len(dataBlob)),
row_count: C.int64_t(rowCount),
mmap_dir_path: mmapDirPath,
}
status := C.LoadFieldData(s.segmentPtr, loadInfo)

View File

@ -41,6 +41,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
func TestSegmentLoader_loadSegment(t *testing.T) {
@ -82,6 +83,40 @@ func TestSegmentLoader_loadSegment(t *testing.T) {
assert.NoError(t, err)
})
t.Run("test load segment with mmap", func(t *testing.T) {
key := paramtable.Get().QueryNodeCfg.MmapDirPath.Key
paramtable.Get().Save(key, "/tmp/mmap-test")
defer paramtable.Get().Reset(key)
node, err := genSimpleQueryNode(ctx)
require.NoError(t, err)
defer node.Stop()
node.metaReplica.removeSegment(defaultSegmentID, segmentTypeSealed)
loader := node.loader
assert.NotNil(t, loader)
req := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadSegments,
MsgID: rand.Int63(),
},
DstNodeID: 0,
Schema: schema,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: defaultSegmentID,
PartitionID: defaultPartitionID,
CollectionID: defaultCollectionID,
BinlogPaths: fieldBinlog,
Statslogs: statsLog,
},
},
}
_, err = loader.LoadSegment(ctx, req, segmentTypeSealed)
assert.NoError(t, err)
})
t.Run("test load segment error due to partial success", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)

View File

@ -1305,6 +1305,7 @@ type queryNodeConfig struct {
// cache limit
CacheEnabled ParamItem `refreshable:"false"`
CacheMemoryLimit ParamItem `refreshable:"false"`
MmapDirPath ParamItem `refreshable:"false"`
GroupEnabled ParamItem `refreshable:"true"`
MaxReceiveChanSize ParamItem `refreshable:"false"`
@ -1449,6 +1450,14 @@ func (p *queryNodeConfig) init(base *BaseTable) {
}
p.CacheEnabled.Init(base.mgr)
p.MmapDirPath = ParamItem{
Key: "queryNode.mmapDirPath",
Version: "2.3.0",
DefaultValue: "",
Doc: "The folder that storing data files for mmap, setting to a path will enable Milvus to load data with mmap",
}
p.MmapDirPath.Init(base.mgr)
p.GroupEnabled = ParamItem{
Key: "queryNode.grouping.enabled",
Version: "2.0.0",