enhance: pass partition key scalar info if enable for vector mem index (#39123)

issue: #34332

---------

Signed-off-by: chasingegg <chao.gao@zilliz.com>
pull/39338/head
Gao 2025-01-16 14:33:03 +08:00 committed by GitHub
parent 1f6fd54146
commit 75d7978a18
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 175 additions and 62 deletions

View File

@ -496,7 +496,7 @@ struct TypeTraits<DataType::INT32> {
template <>
struct TypeTraits<DataType::INT64> {
using NativeType = int32_t;
using NativeType = int64_t;
static constexpr DataType TypeKind = DataType::INT64;
static constexpr bool IsPrimitiveType = true;
static constexpr bool IsFixedWidth = true;

View File

@ -285,6 +285,13 @@ VectorMemIndex<T>::Build(const Config& config) {
auto field_datas =
file_manager_->CacheRawDataToMemory(insert_files.value());
auto opt_fields = GetValueFromConfig<OptFieldT>(config, VEC_OPT_FIELDS);
std::unordered_map<int64_t, std::vector<std::vector<uint32_t>>> scalar_info;
if (opt_fields.has_value() && index_.IsAdditionalScalarSupported() &&
config.value("partition_key_isolation", false)) {
scalar_info = file_manager_->CacheOptFieldToMemory(opt_fields.value());
}
Config build_config;
build_config.update(config);
build_config.erase("insert_files");
@ -312,6 +319,9 @@ VectorMemIndex<T>::Build(const Config& config) {
field_datas.clear();
auto dataset = GenDataset(total_num_rows, dim, buf.get());
if (!scalar_info.empty()) {
dataset->Set(knowhere::meta::SCALAR_INFO, std::move(scalar_info));
}
BuildWithDataset(dataset, build_config);
} else {
// sparse
@ -342,6 +352,9 @@ VectorMemIndex<T>::Build(const Config& config) {
}
auto dataset = GenDataset(total_rows, dim, vec.data());
dataset->SetIsSparse(true);
if (!scalar_info.empty()) {
dataset->Set(knowhere::meta::SCALAR_INFO, std::move(scalar_info));
}
BuildWithDataset(dataset, build_config);
}
}

View File

@ -56,7 +56,9 @@ ProtoParser::PlanNodeFromProto(const planpb::PlanNode& plan_node_proto) {
// currently, iterative filter does not support range search
if (!search_info.search_params_.contains(RADIUS)) {
if (query_info_proto.hints() != "") {
if (query_info_proto.hints() == ITERATIVE_FILTER) {
if (query_info_proto.hints() == "disable") {
search_info.iterative_filter_execution = false;
} else if (query_info_proto.hints() == ITERATIVE_FILTER) {
search_info.iterative_filter_execution = true;
} else {
// check if hints is valid
@ -64,9 +66,7 @@ ProtoParser::PlanNodeFromProto(const planpb::PlanNode& plan_node_proto) {
"hints: {} not supported",
query_info_proto.hints());
}
}
if (!search_info.iterative_filter_execution &&
search_info.search_params_.contains(HINTS)) {
} else if (search_info.search_params_.contains(HINTS)) {
if (search_info.search_params_[HINTS] == ITERATIVE_FILTER) {
search_info.iterative_filter_execution = true;
} else {

View File

@ -359,16 +359,6 @@ DiskFileManagerImpl::CacheTextLogToDisk(
}
}
void
SortByPath(std::vector<std::string>& paths) {
std::sort(paths.begin(),
paths.end(),
[](const std::string& a, const std::string& b) {
return std::stol(a.substr(a.find_last_of("/") + 1)) <
std::stol(b.substr(b.find_last_of("/") + 1));
});
}
template <typename DataType>
std::string
DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
@ -480,20 +470,6 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
return local_data_path;
}
template <typename T, typename = void>
struct has_native_type : std::false_type {};
template <typename T>
struct has_native_type<T, std::void_t<typename T::NativeType>>
: std::true_type {};
template <DataType T>
using DataTypeNativeOrVoid =
typename std::conditional<has_native_type<TypeTraits<T>>::value,
typename TypeTraits<T>::NativeType,
void>::type;
template <DataType T>
using DataTypeToOffsetMap =
std::unordered_map<DataTypeNativeOrVoid<T>, int64_t>;
template <DataType T>
bool
WriteOptFieldIvfDataImpl(
@ -515,7 +491,7 @@ WriteOptFieldIvfDataImpl(
}
// Do not write to disk if there is only one value
if (mp.size() == 1) {
if (mp.size() <= 1) {
return false;
}
@ -626,23 +602,10 @@ DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) {
local_chunk_manager, segment_id, vec_field_id) +
std::string(VEC_OPT_FIELDS);
local_chunk_manager->CreateFile(local_data_path);
std::vector<FieldDataPtr> field_datas;
std::vector<std::string> batch_files;
uint64_t write_offset = 0;
WriteOptFieldsIvfMeta(
local_chunk_manager, local_data_path, num_of_fields, write_offset);
auto FetchRawData = [&]() {
auto fds = GetObjectData(rcm_.get(), batch_files);
for (size_t i = 0; i < batch_files.size(); ++i) {
auto data = fds[i].get()->GetFieldData();
field_datas.emplace_back(data);
}
};
auto parallel_degree =
uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
std::unordered_set<int64_t> actual_field_ids;
for (auto& [field_id, tup] : fields_map) {
const auto& field_type = std::get<1>(tup);
@ -652,19 +615,10 @@ DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) {
return "";
}
std::vector<FieldDataPtr>().swap(field_datas);
SortByPath(field_paths);
std::vector<FieldDataPtr> field_datas =
FetchFieldData(rcm_.get(), field_paths);
for (auto& file : field_paths) {
if (batch_files.size() >= parallel_degree) {
FetchRawData();
batch_files.clear();
}
batch_files.emplace_back(file);
}
if (batch_files.size() > 0) {
FetchRawData();
}
if (WriteOptFieldIvfData(field_type,
field_id,
local_chunk_manager,

View File

@ -126,12 +126,7 @@ MemFileManagerImpl::LoadIndexToMemory(
std::vector<FieldDataPtr>
MemFileManagerImpl::CacheRawDataToMemory(
std::vector<std::string> remote_files) {
std::sort(remote_files.begin(),
remote_files.end(),
[](const std::string& a, const std::string& b) {
return std::stol(a.substr(a.find_last_of("/") + 1)) <
std::stol(b.substr(b.find_last_of("/") + 1));
});
SortByPath(remote_files);
auto parallel_degree =
uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
@ -161,6 +156,90 @@ MemFileManagerImpl::CacheRawDataToMemory(
return field_datas;
}
template <DataType T>
std::vector<std::vector<uint32_t>>
GetOptFieldIvfDataImpl(const std::vector<FieldDataPtr>& field_datas) {
using FieldDataT = DataTypeNativeOrVoid<T>;
std::unordered_map<FieldDataT, std::vector<uint32_t>> mp;
uint32_t offset = 0;
for (const auto& field_data : field_datas) {
for (int64_t i = 0; i < field_data->get_num_rows(); ++i) {
auto val =
*reinterpret_cast<const FieldDataT*>(field_data->RawValue(i));
mp[val].push_back(offset++);
}
}
// opt field data is not used if there is only one value
if (mp.size() <= 1) {
return {};
}
std::vector<std::vector<uint32_t>> scalar_info;
scalar_info.reserve(mp.size());
for (auto& [field_id, tup] : mp) {
scalar_info.emplace_back(std::move(tup));
}
LOG_INFO("Get opt fields with {} categories", scalar_info.size());
return scalar_info;
}
std::vector<std::vector<uint32_t>>
GetOptFieldIvfData(const DataType& dt,
const std::vector<FieldDataPtr>& field_datas) {
switch (dt) {
case DataType::BOOL:
return GetOptFieldIvfDataImpl<DataType::BOOL>(field_datas);
case DataType::INT8:
return GetOptFieldIvfDataImpl<DataType::INT8>(field_datas);
case DataType::INT16:
return GetOptFieldIvfDataImpl<DataType::INT16>(field_datas);
case DataType::INT32:
return GetOptFieldIvfDataImpl<DataType::INT32>(field_datas);
case DataType::INT64:
return GetOptFieldIvfDataImpl<DataType::INT64>(field_datas);
case DataType::FLOAT:
return GetOptFieldIvfDataImpl<DataType::FLOAT>(field_datas);
case DataType::DOUBLE:
return GetOptFieldIvfDataImpl<DataType::DOUBLE>(field_datas);
case DataType::STRING:
return GetOptFieldIvfDataImpl<DataType::STRING>(field_datas);
case DataType::VARCHAR:
return GetOptFieldIvfDataImpl<DataType::VARCHAR>(field_datas);
default:
LOG_WARN("Unsupported data type in optional scalar field: ", dt);
return {};
}
return {};
}
std::unordered_map<int64_t, std::vector<std::vector<uint32_t>>>
MemFileManagerImpl::CacheOptFieldToMemory(OptFieldT& fields_map) {
const uint32_t num_of_fields = fields_map.size();
if (0 == num_of_fields) {
return {};
} else if (num_of_fields > 1) {
PanicInfo(
ErrorCode::NotImplemented,
"vector index build with multiple fields is not supported yet");
}
std::unordered_map<int64_t, std::vector<std::vector<uint32_t>>> res;
for (auto& [field_id, tup] : fields_map) {
const auto& field_type = std::get<1>(tup);
auto& field_paths = std::get<2>(tup);
if (0 == field_paths.size()) {
LOG_WARN("optional field {} has no data", field_id);
return {};
}
SortByPath(field_paths);
std::vector<FieldDataPtr> field_datas =
FetchFieldData(rcm_.get(), field_paths);
res[field_id] = GetOptFieldIvfData(field_type, field_datas);
}
return res;
}
std::optional<bool>
MemFileManagerImpl::IsExisted(const std::string& filename) noexcept {
// TODO: implement this interface

View File

@ -21,6 +21,7 @@
#include <string>
#include <vector>
#include <memory>
#include <unordered_map>
#include "storage/IndexData.h"
#include "storage/FileManager.h"
@ -69,6 +70,9 @@ class MemFileManagerImpl : public FileManagerImpl {
return added_total_mem_size_;
}
std::unordered_map<int64_t, std::vector<std::vector<uint32_t>>>
CacheOptFieldToMemory(OptFieldT& fields_map);
private:
// remote file path
std::map<std::string, int64_t> remote_paths_to_size_;

View File

@ -865,4 +865,32 @@ MergeFieldData(std::vector<FieldDataPtr>& data_array) {
return merged_data;
}
std::vector<FieldDataPtr>
FetchFieldData(ChunkManager* cm, const std::vector<std::string>& remote_files) {
std::vector<FieldDataPtr> field_datas;
std::vector<std::string> batch_files;
auto FetchRawData = [&]() {
auto fds = GetObjectData(cm, batch_files);
for (size_t i = 0; i < batch_files.size(); ++i) {
auto data = fds[i].get()->GetFieldData();
field_datas.emplace_back(data);
}
};
auto parallel_degree =
uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
for (auto& file : remote_files) {
if (batch_files.size() >= parallel_degree) {
FetchRawData();
batch_files.clear();
}
batch_files.emplace_back(file);
}
if (batch_files.size() > 0) {
FetchRawData();
}
return field_datas;
}
} // namespace milvus::storage

View File

@ -163,4 +163,31 @@ CollectFieldDataChannel(FieldDataChannelPtr& channel);
FieldDataPtr
MergeFieldData(std::vector<FieldDataPtr>& data_array);
template <typename T, typename = void>
struct has_native_type : std::false_type {};
template <typename T>
struct has_native_type<T, std::void_t<typename T::NativeType>>
: std::true_type {};
template <DataType T>
using DataTypeNativeOrVoid =
typename std::conditional<has_native_type<TypeTraits<T>>::value,
typename TypeTraits<T>::NativeType,
void>::type;
template <DataType T>
using DataTypeToOffsetMap =
std::unordered_map<DataTypeNativeOrVoid<T>, int64_t>;
std::vector<FieldDataPtr>
FetchFieldData(ChunkManager* cm, const std::vector<std::string>& batch_files);
inline void
SortByPath(std::vector<std::string>& paths) {
std::sort(paths.begin(),
paths.end(),
[](const std::string& a, const std::string& b) {
return std::stol(a.substr(a.find_last_of("/") + 1)) <
std::stol(b.substr(b.find_last_of("/") + 1));
});
}
} // namespace milvus::storage

View File

@ -371,7 +371,7 @@ func (t *createCollectionTask) PreExecute(ctx context.Context) error {
return err
}
hasPartitionKey := hasParitionKeyModeField(t.schema)
hasPartitionKey := hasPartitionKeyModeField(t.schema)
if _, err := validatePartitionKeyIsolation(ctx, t.CollectionName, hasPartitionKey, t.GetProperties()...); err != nil {
return err
}

View File

@ -342,6 +342,8 @@ func setQueryInfoIfMvEnable(queryInfo *planpb.QueryInfo, t *searchTask, plan *pl
if err != nil {
return err
}
// force set hints to disable
queryInfo.Hints = "disable"
}
queryInfo.MaterializedViewInvolved = true
} else {

View File

@ -3461,6 +3461,7 @@ func (s *MaterializedViewTestSuite) TestMvNotEnabledWithNoPartitionKey() {
s.NoError(err)
s.NotZero(len(task.queryInfos))
s.Equal(false, task.queryInfos[0].MaterializedViewInvolved)
s.Equal("", task.queryInfos[0].Hints)
}
func (s *MaterializedViewTestSuite) TestMvNotEnabledWithPartitionKey() {
@ -3477,6 +3478,7 @@ func (s *MaterializedViewTestSuite) TestMvNotEnabledWithPartitionKey() {
s.NoError(err)
s.NotZero(len(task.queryInfos))
s.Equal(false, task.queryInfos[0].MaterializedViewInvolved)
s.Equal("", task.queryInfos[0].Hints)
}
func (s *MaterializedViewTestSuite) TestMvEnabledNoPartitionKey() {
@ -3490,6 +3492,7 @@ func (s *MaterializedViewTestSuite) TestMvEnabledNoPartitionKey() {
s.NoError(err)
s.NotZero(len(task.queryInfos))
s.Equal(false, task.queryInfos[0].MaterializedViewInvolved)
s.Equal("", task.queryInfos[0].Hints)
}
func (s *MaterializedViewTestSuite) TestMvEnabledPartitionKeyOnInt64() {
@ -3506,6 +3509,7 @@ func (s *MaterializedViewTestSuite) TestMvEnabledPartitionKeyOnInt64() {
s.NoError(err)
s.NotZero(len(task.queryInfos))
s.Equal(true, task.queryInfos[0].MaterializedViewInvolved)
s.Equal("disable", task.queryInfos[0].Hints)
}
func (s *MaterializedViewTestSuite) TestMvEnabledPartitionKeyOnVarChar() {
@ -3522,6 +3526,7 @@ func (s *MaterializedViewTestSuite) TestMvEnabledPartitionKeyOnVarChar() {
s.NoError(err)
s.NotZero(len(task.queryInfos))
s.Equal(true, task.queryInfos[0].MaterializedViewInvolved)
s.Equal("disable", task.queryInfos[0].Hints)
}
func (s *MaterializedViewTestSuite) TestMvEnabledPartitionKeyOnVarCharWithIsolation() {
@ -3540,6 +3545,7 @@ func (s *MaterializedViewTestSuite) TestMvEnabledPartitionKeyOnVarCharWithIsolat
s.NoError(err)
s.NotZero(len(task.queryInfos))
s.Equal(true, task.queryInfos[0].MaterializedViewInvolved)
s.Equal("disable", task.queryInfos[0].Hints)
}
}

View File

@ -1784,7 +1784,7 @@ func isPartitionKeyMode(ctx context.Context, dbName string, colName string) (boo
return false, nil
}
func hasParitionKeyModeField(schema *schemapb.CollectionSchema) bool {
func hasPartitionKeyModeField(schema *schemapb.CollectionSchema) bool {
for _, fieldSchema := range schema.GetFields() {
if fieldSchema.IsPartitionKey {
return true