enhance: the estimate method when loading the collection (#36307)

- issue: #36530

---------

Signed-off-by: SimFG <bang.fu@zilliz.com>
Signed-off-by: xianliang.li <xianliang.li@zilliz.com>
Co-authored-by: xianliang.li <xianliang.li@zilliz.com>
pull/36726/head
SimFG 2024-10-09 17:35:19 +08:00 committed by GitHub
parent c84bdfa766
commit 130a923dec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 677 additions and 103 deletions

View File

@ -681,4 +681,4 @@ struct fmt::formatter<milvus::OpType> : formatter<string_view> {
}
return formatter<string_view>::format(name, ctx);
}
};
};

View File

@ -0,0 +1,33 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#ifdef __cplusplus
extern "C" {
#endif
typedef struct LoadResourceRequest {
float max_memory_cost; //memory cost (GB) during loading
float max_disk_cost; // disk cost (GB) during loading
float final_memory_cost; // final memory (GB) cost after loading
float final_disk_cost; // final disk cost (GB) after loading
bool has_raw_data; // the filed contains raw data or not
} LoadResourceRequest;
#ifdef __cplusplus
}
#endif

View File

@ -28,6 +28,7 @@
#include "index/BoolIndex.h"
#include "index/InvertedIndexTantivy.h"
#include "index/HybridScalarIndex.h"
#include "knowhere/comp/knowhere_check.h"
namespace milvus::index {
@ -78,6 +79,209 @@ IndexFactory::CreatePrimitiveScalarIndex<std::string>(
#endif
}
LoadResourceRequest
IndexFactory::IndexLoadResource(
DataType field_type,
IndexVersion index_version,
float index_size,
std::map<std::string, std::string>& index_params,
bool mmap_enable) {
if (milvus::IsVectorDataType(field_type)) {
return VecIndexLoadResource(
field_type, index_version, index_size, index_params, mmap_enable);
} else {
return ScalarIndexLoadResource(
field_type, index_version, index_size, index_params, mmap_enable);
}
}
LoadResourceRequest
IndexFactory::VecIndexLoadResource(
DataType field_type,
IndexVersion index_version,
float index_size,
std::map<std::string, std::string>& index_params,
bool mmap_enable) {
auto config = milvus::index::ParseConfigFromIndexParams(index_params);
AssertInfo(index_params.find("index_type") != index_params.end(),
"index type is empty");
std::string index_type = index_params.at("index_type");
bool mmaped = false;
if (mmap_enable &&
knowhere::KnowhereCheck::SupportMmapIndexTypeCheck(index_type)) {
config["enable_mmap"] = true;
mmaped = true;
}
knowhere::expected<knowhere::Resource> resource;
float index_size_gb = index_size * 1.0 / 1024.0 / 1024.0 / 1024.0;
float download_buffer_size_gb =
DEFAULT_FIELD_MAX_MEMORY_LIMIT * 1.0 / 1024.0 / 1024.0 / 1024.0;
bool has_raw_data = false;
switch (field_type) {
case milvus::DataType::VECTOR_BINARY:
resource = knowhere::IndexStaticFaced<
knowhere::bin1>::EstimateLoadResource(index_type,
index_version,
index_size_gb,
config);
has_raw_data =
knowhere::IndexStaticFaced<knowhere::bin1>::HasRawData(
index_type, index_version, config);
break;
case milvus::DataType::VECTOR_FLOAT:
resource = knowhere::IndexStaticFaced<
knowhere::fp32>::EstimateLoadResource(index_type,
index_version,
index_size_gb,
config);
has_raw_data =
knowhere::IndexStaticFaced<knowhere::fp32>::HasRawData(
index_type, index_version, config);
break;
case milvus::DataType::VECTOR_FLOAT16:
resource = knowhere::IndexStaticFaced<
knowhere::fp16>::EstimateLoadResource(index_type,
index_version,
index_size_gb,
config);
has_raw_data =
knowhere::IndexStaticFaced<knowhere::fp16>::HasRawData(
index_type, index_version, config);
break;
case milvus::DataType::VECTOR_BFLOAT16:
resource = knowhere::IndexStaticFaced<
knowhere::bf16>::EstimateLoadResource(index_type,
index_version,
index_size_gb,
config);
has_raw_data =
knowhere::IndexStaticFaced<knowhere::bf16>::HasRawData(
index_type, index_version, config);
break;
case milvus::DataType::VECTOR_SPARSE_FLOAT:
resource = knowhere::IndexStaticFaced<
knowhere::fp32>::EstimateLoadResource(index_type,
index_version,
index_size_gb,
config);
has_raw_data =
knowhere::IndexStaticFaced<knowhere::fp32>::HasRawData(
index_type, index_version, config);
break;
default:
PanicInfo(
milvus::DataTypeInvalid,
fmt::format(
"invalid data type to estimate index load resource: {}",
field_type));
}
LoadResourceRequest request{};
request.has_raw_data = has_raw_data;
request.final_disk_cost = resource.value().diskCost;
request.final_memory_cost = resource.value().memoryCost;
if (knowhere::UseDiskLoad(index_type, index_version) || mmaped) {
request.max_disk_cost = resource.value().diskCost;
request.max_memory_cost =
std::max(resource.value().memoryCost, download_buffer_size_gb);
} else {
request.max_disk_cost = 0;
request.max_memory_cost = 2 * resource.value().memoryCost;
}
return request;
}
LoadResourceRequest
IndexFactory::ScalarIndexLoadResource(
DataType field_type,
IndexVersion index_version,
float index_size,
std::map<std::string, std::string>& index_params,
bool mmap_enable) {
auto config = milvus::index::ParseConfigFromIndexParams(index_params);
AssertInfo(index_params.find("index_type") != index_params.end(),
"index type is empty");
std::string index_type = index_params.at("index_type");
knowhere::expected<knowhere::Resource> resource;
float index_size_gb = index_size * 1.0 / 1024.0 / 1024.0 / 1024.0;
LoadResourceRequest request{};
request.has_raw_data = false;
if (index_type == milvus::index::ASCENDING_SORT) {
request.final_memory_cost = index_size_gb;
request.final_disk_cost = 0;
request.max_memory_cost = 2 * index_size_gb;
request.max_disk_cost = 0;
request.has_raw_data = true;
} else if (index_type == milvus::index::MARISA_TRIE ||
index_type == milvus::index::MARISA_TRIE_UPPER) {
if (mmap_enable) {
request.final_memory_cost = 0;
request.final_disk_cost = index_size_gb;
request.max_memory_cost = index_size_gb;
request.max_disk_cost = index_size_gb;
} else {
request.final_memory_cost = index_size_gb;
request.final_disk_cost = 0;
request.max_memory_cost = 2 * index_size_gb;
request.max_disk_cost = 0;
}
request.has_raw_data = true;
} else if (index_type == milvus::index::INVERTED_INDEX_TYPE) {
if (mmap_enable) {
request.final_memory_cost = 0;
request.final_disk_cost = index_size_gb;
request.max_memory_cost = index_size_gb;
request.max_disk_cost = index_size_gb;
} else {
request.final_memory_cost = index_size_gb;
request.final_disk_cost = 0;
request.max_memory_cost = 2 * index_size_gb;
request.max_disk_cost = 0;
}
request.has_raw_data = false;
} else if (index_type == milvus::index::BITMAP_INDEX_TYPE) {
if (mmap_enable) {
request.final_memory_cost = 0;
request.final_disk_cost = index_size_gb;
request.max_memory_cost = index_size_gb;
request.max_disk_cost = index_size_gb;
} else {
request.final_memory_cost = index_size_gb;
request.final_disk_cost = 0;
request.max_memory_cost = 2 * index_size_gb;
request.max_disk_cost = 0;
}
if (field_type == milvus::DataType::ARRAY) {
request.has_raw_data = false;
} else {
request.has_raw_data = true;
}
} else if (index_type == milvus::index::HYBRID_INDEX_TYPE) {
request.final_memory_cost = index_size_gb;
request.final_disk_cost = index_size_gb;
request.max_memory_cost = 2 * index_size_gb;
request.max_disk_cost = index_size_gb;
request.has_raw_data = false;
} else {
PanicInfo(milvus::UnexpectedError,
fmt::format("invalid index type to estimate scalar index "
"load resource: {}",
index_type));
}
return request;
}
IndexBasePtr
IndexFactory::CreateIndex(
const CreateIndexInfo& create_index_info,

View File

@ -32,6 +32,7 @@
#include "index/ScalarIndexSort.h"
#include "index/StringIndexMarisa.h"
#include "index/BoolIndex.h"
#include "segcore/load_index_c.h"
namespace milvus::index {
@ -51,6 +52,27 @@ class IndexFactory {
return instance;
}
LoadResourceRequest
IndexLoadResource(DataType field_type,
IndexVersion index_version,
float index_size,
std::map<std::string, std::string>& index_params,
bool mmap_enable);
LoadResourceRequest
VecIndexLoadResource(DataType field_type,
IndexVersion index_version,
float index_size,
std::map<std::string, std::string>& index_params,
bool mmap_enable);
LoadResourceRequest
ScalarIndexLoadResource(DataType field_type,
IndexVersion index_version,
float index_size,
std::map<std::string, std::string>& index_params,
bool mmap_enable);
IndexBasePtr
CreateIndex(const CreateIndexInfo& create_index_info,
const storage::FileManagerContext& file_manager_context);

View File

@ -42,6 +42,7 @@ constexpr const char* METRIC_TYPE = "metric_type";
// scalar index type
constexpr const char* ASCENDING_SORT = "STL_SORT";
constexpr const char* MARISA_TRIE = "Trie";
constexpr const char* MARISA_TRIE_UPPER = "TRIE";
constexpr const char* INVERTED_INDEX_TYPE = "INVERTED";
constexpr const char* BITMAP_INDEX_TYPE = "BITMAP";
constexpr const char* HYBRID_INDEX_TYPE = "HYBRID";

View File

@ -203,7 +203,7 @@ StringIndexMarisa::LoadWithoutAssemble(const BinarySet& set,
}
file.Seek(0, SEEK_SET);
if (config.contains(ENABLE_MMAP)) {
if (config.contains(MMAP_FILE_PATH)) {
trie_.mmap(file_name.c_str());
} else {
trie_.read(file.Descriptor());

View File

@ -1927,6 +1927,7 @@ SegmentSealedImpl::generate_interim_index(const FieldId field_id) {
return false;
}
// check data type
// TODO: QianYa when add other data type, please check the SupportInterimIndexDataType method in the go code
if (field_meta.get_data_type() != DataType::VECTOR_FLOAT &&
!is_sparse) {
return false;

View File

@ -47,6 +47,7 @@ struct LoadIndexInfo {
int64_t index_store_version;
IndexVersion index_engine_version;
proto::schema::FieldSchema schema;
int64_t index_size;
};
} // namespace milvus::segcore

View File

@ -26,6 +26,8 @@
#include "storage/RemoteChunkManagerSingleton.h"
#include "storage/LocalChunkManagerSingleton.h"
#include "pb/cgo_msg.pb.h"
#include "knowhere/index/index_static.h"
#include "knowhere/comp/knowhere_check.h"
bool
IsLoadWithDisk(const char* index_type, int index_engine_version) {
@ -204,6 +206,35 @@ appendScalarIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) {
}
}
LoadResourceRequest
EstimateLoadIndexResource(CLoadIndexInfo c_load_index_info) {
try {
auto load_index_info =
(milvus::segcore::LoadIndexInfo*)c_load_index_info;
auto field_type = load_index_info->field_type;
auto& index_params = load_index_info->index_params;
bool find_index_type =
index_params.count("index_type") > 0 ? true : false;
AssertInfo(find_index_type == true,
"Can't find index type in index_params");
LoadResourceRequest request =
milvus::index::IndexFactory::GetInstance().IndexLoadResource(
field_type,
load_index_info->index_engine_version,
load_index_info->index_size,
index_params,
load_index_info->enable_mmap);
return request;
} catch (std::exception& e) {
PanicInfo(milvus::UnexpectedError,
fmt::format("failed to estimate index load resource, "
"encounter exception : {}",
e.what()));
return LoadResourceRequest{0, 0, 0, 0, false};
}
}
CStatus
AppendIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) {
auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info;
@ -288,6 +319,7 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) {
std::to_string(load_index_info->segment_id) /
std::to_string(load_index_info->field_id);
config[milvus::index::ENABLE_MMAP] = "true";
config[milvus::index::MMAP_FILE_PATH] = filepath.string();
}
@ -450,6 +482,7 @@ FinishLoadIndexInfo(CLoadIndexInfo c_load_index_info,
load_index_info->index_engine_version =
info_proto->index_engine_version();
load_index_info->schema = info_proto->field();
load_index_info->index_size = info_proto->index_file_size();
}
auto status = CStatus();
status.error_code = milvus::Success;

View File

@ -17,6 +17,7 @@ extern "C" {
#include <stdint.h>
#include <stdlib.h>
#include "common/resource_c.h"
#include "common/binary_set_c.h"
#include "common/type_c.h"
#include "segcore/collection_c.h"
@ -47,6 +48,9 @@ AppendFieldInfo(CLoadIndexInfo c_load_index_info,
bool enable_mmap,
const char* mmap_dir_path);
LoadResourceRequest
EstimateLoadIndexResource(CLoadIndexInfo c_load_index_info);
CStatus
AppendIndexInfo(CLoadIndexInfo c_load_index_info,
int64_t index_id,

View File

@ -41,6 +41,7 @@ set(MILVUS_TEST_FILES
test_concurrent_vector.cpp
test_c_stream_reduce.cpp
test_c_tokenizer.cpp
test_loading.cpp
test_data_codec.cpp
test_disk_file_manager_test.cpp
test_exec.cpp

View File

@ -0,0 +1,208 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <arrow/record_batch.h>
#include <gtest/gtest.h>
#include <iostream>
#include <memory>
#include <random>
#include <string>
#include <vector>
#include <map>
#include "segcore/Types.h"
#include "knowhere/version.h"
#include "knowhere/comp/index_param.h"
#include "segcore/load_index_c.h"
using Param =
std::pair<std::map<std::string, std::string>, LoadResourceRequest>;
class IndexLoadTest : public ::testing::TestWithParam<Param> {
protected:
void
SetUp() override {
auto param = GetParam();
index_params = param.first;
ASSERT_TRUE(index_params.find("index_type") != index_params.end());
index_type = index_params["index_type"];
enable_mmap = index_params.find("mmap") != index_params.end() &&
index_params["mmap"] == "true";
std::string field_type = index_params["field_type"];
ASSERT_TRUE(field_type.size() > 0);
if (field_type == "vector_float") {
data_type = milvus::DataType::VECTOR_FLOAT;
} else if (field_type == "vector_bf16") {
data_type = milvus::DataType::VECTOR_BFLOAT16;
} else if (field_type == "vector_fp16") {
data_type = milvus::DataType::VECTOR_FLOAT16;
} else if (field_type == "vector_binary") {
data_type = milvus::DataType::VECTOR_BINARY;
} else if (field_type == "vector_sparse_float") {
data_type = milvus::DataType::VECTOR_SPARSE_FLOAT;
} else if (field_type == "array") {
data_type = milvus::DataType::ARRAY;
} else {
data_type = milvus::DataType::STRING;
}
expected = param.second;
}
void
TearDown() override {
}
protected:
std::string index_type;
std::map<std::string, std::string> index_params;
bool enable_mmap;
milvus::DataType data_type;
LoadResourceRequest expected;
};
INSTANTIATE_TEST_SUITE_P(
IndexTypeLoadInfo,
IndexLoadTest,
::testing::Values(
std::pair<std::map<std::string, std::string>, LoadResourceRequest>(
{{"index_type", "HNSW"},
{"metric_type", "L2"},
{"efConstrcution", "300"},
{"M", "30"},
{"mmap", "false"},
{"field_type", "vector_float"}},
{2.0f, 0.0f, 1.0f, 0.0f, true}),
std::pair<std::map<std::string, std::string>, LoadResourceRequest>(
{{"index_type", "HNSW"},
{"metric_type", "L2"},
{"efConstrcution", "300"},
{"M", "30"},
{"mmap", "true"},
{"field_type", "vector_float"}},
{0.125f, 1.0f, 0.0f, 1.0f, true}),
std::pair<std::map<std::string, std::string>, LoadResourceRequest>(
{{"index_type", "HNSW"},
{"metric_type", "L2"},
{"efConstrcution", "300"},
{"M", "30"},
{"mmap", "false"},
{"field_type", "vector_bf16"}},
{2.0f, 0.0f, 1.0f, 0.0f, true}),
std::pair<std::map<std::string, std::string>, LoadResourceRequest>(
{{"index_type", "HNSW"},
{"metric_type", "L2"},
{"efConstrcution", "300"},
{"M", "30"},
{"mmap", "true"},
{"field_type", "vector_fp16"}},
{0.125f, 1.0f, 0.0f, 1.0f, true}),
std::pair<std::map<std::string, std::string>, LoadResourceRequest>(
{{"index_type", "IVFFLAT"},
{"metric_type", "L2"},
{"nlist", "1024"},
{"mmap", "false"},
{"field_type", "vector_float"}},
{2.0f, 0.0f, 1.0f, 0.0f, true}),
std::pair<std::map<std::string, std::string>, LoadResourceRequest>(
{{"index_type", "IVFSQ"},
{"metric_type", "L2"},
{"nlist", "1024"},
{"mmap", "false"},
{"field_type", "vector_float"}},
{2.0f, 0.0f, 1.0f, 0.0f, false}),
#ifdef BUILD_DISK_ANN
std::pair<std::map<std::string, std::string>, LoadResourceRequest>(
{{"index_type", "DISKANN"},
{"metric_type", "L2"},
{"nlist", "1024"},
{"mmap", "false"},
{"field_type", "vector_float"}},
{0.25f, 1.0f, 0.25f, 1.0f, true}),
std::pair<std::map<std::string, std::string>, LoadResourceRequest>(
{{"index_type", "DISKANN"},
{"metric_type", "IP"},
{"nlist", "1024"},
{"mmap", "false"},
{"field_type", "vector_float"}},
{0.25f, 1.0f, 0.25f, 1.0f, false}),
#endif
std::pair<std::map<std::string, std::string>, LoadResourceRequest>(
{{"index_type", "STL_SORT"},
{"mmap", "false"},
{"field_type", "string"}},
{2.0f, 0.0f, 1.0f, 0.0f, true}),
std::pair<std::map<std::string, std::string>, LoadResourceRequest>(
{{"index_type", "TRIE"},
{"mmap", "false"},
{"field_type", "string"}},
{2.0f, 0.0f, 1.0f, 0.0f, true}),
std::pair<std::map<std::string, std::string>, LoadResourceRequest>(
{{"index_type", "TRIE"},
{"mmap", "true"},
{"field_type", "string"}},
{1.0f, 1.0f, 0.0f, 1.0f, true}),
std::pair<std::map<std::string, std::string>, LoadResourceRequest>(
{{"index_type", "INVERTED"},
{"mmap", "false"},
{"field_type", "string"}},
{2.0f, 0.0f, 1.0f, 0.0f, false}),
std::pair<std::map<std::string, std::string>, LoadResourceRequest>(
{{"index_type", "INVERTED"},
{"mmap", "true"},
{"field_type", "string"}},
{1.0f, 1.0f, 0.0f, 1.0f, false}),
std::pair<std::map<std::string, std::string>, LoadResourceRequest>(
{{"index_type", "BITMAP"},
{"mmap", "false"},
{"field_type", "string"}},
{2.0f, 0.0f, 1.0f, 0.0f, true}),
std::pair<std::map<std::string, std::string>, LoadResourceRequest>(
{{"index_type", "BITMAP"},
{"mmap", "true"},
{"field_type", "array"}},
{1.0f, 1.0f, 0.0f, 1.0f, false}),
std::pair<std::map<std::string, std::string>, LoadResourceRequest>(
{{"index_type", "HYBRID"},
{"mmap", "true"},
{"field_type", "string"}},
{2.0f, 1.0f, 1.0f, 1.0f, false})));
TEST_P(IndexLoadTest, ResourceEstimate) {
milvus::segcore::LoadIndexInfo loadIndexInfo;
loadIndexInfo.collection_id = 1;
loadIndexInfo.partition_id = 2;
loadIndexInfo.segment_id = 3;
loadIndexInfo.field_id = 4;
loadIndexInfo.field_type = data_type;
loadIndexInfo.enable_mmap = enable_mmap;
loadIndexInfo.mmap_dir_path = "/tmp/mmap";
loadIndexInfo.index_id = 5;
loadIndexInfo.index_build_id = 6;
loadIndexInfo.index_version = 1;
loadIndexInfo.index_params = index_params;
loadIndexInfo.index_files = {"/tmp/index/1"};
loadIndexInfo.index = nullptr;
loadIndexInfo.uri = "";
loadIndexInfo.index_store_version = 1;
loadIndexInfo.index_engine_version =
knowhere::Version::GetCurrentVersion().VersionNumber();
loadIndexInfo.index_size = 1024 * 1024 * 1024; // 1G index size
LoadResourceRequest request = EstimateLoadIndexResource(&loadIndexInfo);
ASSERT_EQ(request.has_raw_data, expected.has_raw_data);
ASSERT_EQ(request.final_memory_cost, expected.final_memory_cost);
ASSERT_EQ(request.final_disk_cost, expected.final_disk_cost);
ASSERT_EQ(request.max_memory_cost, expected.max_memory_cost);
ASSERT_EQ(request.max_disk_cost, expected.max_disk_cost);
}

View File

@ -24,8 +24,6 @@ import (
"testing"
"time"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
@ -41,6 +39,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
)
func TestClusteringCompactionTaskSuite(t *testing.T) {

View File

@ -1807,7 +1807,6 @@ func (s *segMetricMutation) addNewSeg(state commonpb.SegmentState, level datapb.
}
if _, ok := s.stateChange[level.String()][state.String()]; !ok {
s.stateChange[level.String()][state.String()] = make(map[string]int)
}
s.stateChange[level.String()][state.String()][getSortStatus(isSorted)] += 1

View File

@ -20,4 +20,5 @@ message LoadIndexInfo {
string uri = 13;
int64 index_store_version = 14;
int32 index_engine_version = 15;
int64 index_file_size = 16;
}

View File

@ -173,7 +173,7 @@ func (li *LoadIndexInfo) appendIndexEngineVersion(ctx context.Context, indexEngi
return HandleCStatus(ctx, &status, "AppendIndexEngineVersion failed")
}
func (li *LoadIndexInfo) finish(ctx context.Context, info *cgopb.LoadIndexInfo) error {
func (li *LoadIndexInfo) appendLoadIndexInfo(ctx context.Context, info *cgopb.LoadIndexInfo) error {
marshaled, err := proto.Marshal(info)
if err != nil {
return err
@ -185,10 +185,11 @@ func (li *LoadIndexInfo) finish(ctx context.Context, info *cgopb.LoadIndexInfo)
return nil, nil
}).Await()
if err := HandleCStatus(ctx, &status, "FinishLoadIndexInfo failed"); err != nil {
return err
}
return HandleCStatus(ctx, &status, "FinishLoadIndexInfo failed")
}
func (li *LoadIndexInfo) loadIndex(ctx context.Context) error {
var status C.CStatus
_, _ = GetLoadPool().Submit(func() (any, error) {
traceCtx := ParseCTraceContext(ctx)
status = C.AppendIndexV2(traceCtx.ctx, li.cLoadIndexInfo)

View File

@ -897,10 +897,10 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun
log.Info("start loading field data for field")
loadFieldDataInfo, err := newLoadFieldDataInfo(ctx)
defer deleteFieldDataInfo(loadFieldDataInfo)
if err != nil {
return err
}
defer deleteFieldDataInfo(loadFieldDataInfo)
err = loadFieldDataInfo.appendLoadFieldInfo(ctx, fieldID, rowCount)
if err != nil {
@ -1074,26 +1074,12 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del
return nil
}
func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIndexInfo, fieldType schemapb.DataType) error {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
zap.Int64("fieldID", indexInfo.GetFieldID()),
zap.Int64("indexID", indexInfo.GetIndexID()),
)
old := s.GetIndex(indexInfo.GetFieldID())
// the index loaded
if old != nil && old.IndexInfo.GetIndexID() == indexInfo.GetIndexID() && old.IsLoaded {
log.Warn("index already loaded")
return nil
}
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadIndex-%d-%d", s.ID(), indexInfo.GetFieldID()))
defer sp.End()
tr := timerecord.NewTimeRecorder("loadIndex")
func GetCLoadInfoWithFunc(ctx context.Context,
fieldSchema *schemapb.FieldSchema,
s *querypb.SegmentLoadInfo,
indexInfo *querypb.FieldIndexInfo,
f func(c *LoadIndexInfo) error,
) error {
// 1.
loadIndexInfo, err := newLoadIndexInfo(ctx)
if err != nil {
@ -1101,15 +1087,6 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn
}
defer deleteLoadIndexInfo(loadIndexInfo)
schema, err := typeutil.CreateSchemaHelper(s.GetCollection().Schema())
if err != nil {
return err
}
fieldSchema, err := schema.GetFieldFromID(indexInfo.GetFieldID())
if err != nil {
return err
}
indexParams := funcutil.KeyValuePair2Map(indexInfo.IndexParams)
// as Knowhere reports error if encounter an unknown param, we need to delete it
delete(indexParams, common.MmapEnabledKey)
@ -1133,9 +1110,9 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn
enableMmap := isIndexMmapEnable(fieldSchema, indexInfo)
indexInfoProto := &cgopb.LoadIndexInfo{
CollectionID: s.Collection(),
PartitionID: s.Partition(),
SegmentID: s.ID(),
CollectionID: s.GetCollectionID(),
PartitionID: s.GetPartitionID(),
SegmentID: s.GetSegmentID(),
Field: fieldSchema,
EnableMmap: enableMmap,
MmapDirPath: paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue(),
@ -1146,46 +1123,100 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn
IndexFiles: indexInfo.GetIndexFilePaths(),
IndexEngineVersion: indexInfo.GetCurrentIndexVersion(),
IndexStoreVersion: indexInfo.GetIndexStoreVersion(),
IndexFileSize: indexInfo.GetIndexSize(),
}
newLoadIndexInfoSpan := tr.RecordSpan()
// 2.
if err := loadIndexInfo.finish(ctx, indexInfoProto); err != nil {
if loadIndexInfo.cleanLocalData(ctx) != nil {
log.Warn("failed to clean cached data on disk after append index failed",
zap.Int64("buildID", indexInfo.BuildID),
zap.Int64("index version", indexInfo.IndexVersion))
}
if err := loadIndexInfo.appendLoadIndexInfo(ctx, indexInfoProto); err != nil {
log.Warn("fail to append load index info", zap.Error(err))
return err
}
if s.Type() != SegmentTypeSealed {
errMsg := fmt.Sprintln("updateSegmentIndex failed, illegal segment type ", s.segmentType, "segmentID = ", s.ID())
return errors.New(errMsg)
}
appendLoadIndexInfoSpan := tr.RecordSpan()
return f(loadIndexInfo)
}
// 3.
err = s.UpdateIndexInfo(ctx, indexInfo, loadIndexInfo)
if err != nil {
return err
}
updateIndexInfoSpan := tr.RecordSpan()
if !typeutil.IsVectorType(fieldType) || s.HasRawData(indexInfo.GetFieldID()) {
func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIndexInfo, fieldType schemapb.DataType) error {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
zap.Int64("fieldID", indexInfo.GetFieldID()),
zap.Int64("indexID", indexInfo.GetIndexID()),
)
old := s.GetIndex(indexInfo.GetFieldID())
// the index loaded
if old != nil && old.IndexInfo.GetIndexID() == indexInfo.GetIndexID() && old.IsLoaded {
log.Warn("index already loaded")
return nil
}
// 4.
mmapChunkCache := paramtable.Get().QueryNodeCfg.MmapChunkCache.GetAsBool()
s.WarmupChunkCache(ctx, indexInfo.GetFieldID(), mmapChunkCache)
warmupChunkCacheSpan := tr.RecordSpan()
log.Info("Finish loading index",
zap.Duration("newLoadIndexInfoSpan", newLoadIndexInfoSpan),
zap.Duration("appendLoadIndexInfoSpan", appendLoadIndexInfoSpan),
zap.Duration("updateIndexInfoSpan", updateIndexInfoSpan),
zap.Duration("warmupChunkCacheSpan", warmupChunkCacheSpan),
)
return nil
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadIndex-%d-%d", s.ID(), indexInfo.GetFieldID()))
defer sp.End()
tr := timerecord.NewTimeRecorder("loadIndex")
schemaHelper, err := typeutil.CreateSchemaHelper(s.GetCollection().Schema())
if err != nil {
return err
}
fieldSchema, err := schemaHelper.GetFieldFromID(indexInfo.GetFieldID())
if err != nil {
return err
}
return s.innerLoadIndex(ctx, fieldSchema, indexInfo, tr, fieldType)
}
func (s *LocalSegment) innerLoadIndex(ctx context.Context,
fieldSchema *schemapb.FieldSchema,
indexInfo *querypb.FieldIndexInfo,
tr *timerecord.TimeRecorder,
fieldType schemapb.DataType,
) error {
err := GetCLoadInfoWithFunc(ctx, fieldSchema,
s.LoadInfo(), indexInfo, func(loadIndexInfo *LoadIndexInfo) error {
newLoadIndexInfoSpan := tr.RecordSpan()
if err := loadIndexInfo.loadIndex(ctx); err != nil {
if loadIndexInfo.cleanLocalData(ctx) != nil {
log.Warn("failed to clean cached data on disk after append index failed",
zap.Int64("buildID", indexInfo.BuildID),
zap.Int64("index version", indexInfo.IndexVersion))
}
return err
}
if s.Type() != SegmentTypeSealed {
errMsg := fmt.Sprintln("updateSegmentIndex failed, illegal segment type ", s.segmentType, "segmentID = ", s.ID())
return errors.New(errMsg)
}
appendLoadIndexInfoSpan := tr.RecordSpan()
// 3.
err := s.UpdateIndexInfo(ctx, indexInfo, loadIndexInfo)
if err != nil {
return err
}
updateIndexInfoSpan := tr.RecordSpan()
if !typeutil.IsVectorType(fieldType) || s.HasRawData(indexInfo.GetFieldID()) {
return nil
}
// 4.
mmapChunkCache := paramtable.Get().QueryNodeCfg.MmapChunkCache.GetAsBool()
s.WarmupChunkCache(ctx, indexInfo.GetFieldID(), mmapChunkCache)
warmupChunkCacheSpan := tr.RecordSpan()
log.Info("Finish loading index",
zap.Duration("newLoadIndexInfoSpan", newLoadIndexInfoSpan),
zap.Duration("appendLoadIndexInfoSpan", appendLoadIndexInfoSpan),
zap.Duration("updateIndexInfoSpan", updateIndexInfoSpan),
zap.Duration("warmupChunkCacheSpan", warmupChunkCacheSpan),
)
return nil
})
if err != nil {
log.Warn("load index failed", zap.Error(err))
}
return err
}
func (s *LocalSegment) LoadTextIndex(ctx context.Context, textLogs *datapb.TextIndexStats, schemaHelper *typeutil.SchemaHelper) error {

View File

@ -49,6 +49,7 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/contextutil"
"github.com/milvus-io/milvus/pkg/util/funcutil"
@ -88,6 +89,24 @@ type Loader interface {
) error
}
type ResourceEstimate struct {
MaxMemoryCost uint64
MaxDiskCost uint64
FinalMemoryCost uint64
FinalDiskCost uint64
HasRawData bool
}
func GetResourceEstimate(estimate *C.LoadResourceRequest) ResourceEstimate {
return ResourceEstimate{
MaxMemoryCost: uint64(float64(estimate.max_memory_cost) * util.GB),
MaxDiskCost: uint64(float64(estimate.max_disk_cost) * util.GB),
FinalMemoryCost: uint64(float64(estimate.final_memory_cost) * util.GB),
FinalDiskCost: uint64(float64(estimate.final_disk_cost) * util.GB),
HasRawData: bool(estimate.has_raw_data),
}
}
type requestResourceResult struct {
Resource LoadResource
CommittedResource LoadResource
@ -1283,6 +1302,9 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn
log.Warn("failed to create schema helper", zap.String("name", schema.GetName()), zap.Error(err))
return nil, err
}
calculateDataSizeCount := 0
ctx := context.Background()
for _, fieldBinlog := range loadInfo.BinlogPaths {
fieldID := fieldBinlog.FieldID
var mmapEnabled bool
@ -1293,43 +1315,48 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn
return nil, err
}
binlogSize := uint64(getBinlogDataMemorySize(fieldBinlog))
shouldCalculateDataSize := false
if fieldIndexInfo, ok := fieldID2IndexInfo[fieldID]; ok {
mmapEnabled = isIndexMmapEnable(fieldSchema, fieldIndexInfo)
neededMemSize, neededDiskSize, err := getIndexAttrCache().GetIndexResourceUsage(fieldIndexInfo, multiplyFactor.memoryIndexUsageFactor, fieldBinlog)
var estimateResult ResourceEstimate
err := GetCLoadInfoWithFunc(ctx, fieldSchema, loadInfo, fieldIndexInfo, func(c *LoadIndexInfo) error {
loadResourceRequest := C.EstimateLoadIndexResource(c.cLoadIndexInfo)
estimateResult = GetResourceEstimate(&loadResourceRequest)
return nil
})
if err != nil {
return nil, errors.Wrapf(err, "failed to get index size collection %d, segment %d, indexBuildID %d",
return nil, errors.Wrapf(err, "failed to estimate resource usage of index, collection %d, segment %d, indexBuildID %d",
loadInfo.GetCollectionID(),
loadInfo.GetSegmentID(),
fieldIndexInfo.GetBuildID())
}
indexMemorySize += neededMemSize
if mmapEnabled {
segmentDiskSize += neededMemSize + neededDiskSize
} else {
segmentDiskSize += neededDiskSize
}
if !hasRawData(fieldIndexInfo) {
dataMmapEnable := isDataMmapEnable(fieldSchema)
segmentMemorySize += binlogSize
if dataMmapEnable {
segmentDiskSize += uint64(getBinlogDataDiskSize(fieldBinlog))
} else {
segmentMemorySize += binlogSize
}
indexMemorySize += estimateResult.MaxMemoryCost
segmentDiskSize += estimateResult.MaxDiskCost
if !estimateResult.HasRawData {
shouldCalculateDataSize = true
}
} else {
shouldCalculateDataSize = true
}
if shouldCalculateDataSize {
calculateDataSizeCount += 1
mmapEnabled = isDataMmapEnable(fieldSchema)
segmentMemorySize += binlogSize
if mmapEnabled {
segmentDiskSize += uint64(getBinlogDataDiskSize(fieldBinlog))
} else {
if multiplyFactor.enableTempSegmentIndex {
if !mmapEnabled || common.IsSystemField(fieldSchema.GetFieldID()) {
segmentMemorySize += binlogSize
if multiplyFactor.enableTempSegmentIndex && SupportInterimIndexDataType(fieldSchema.GetDataType()) {
segmentMemorySize += uint64(float64(binlogSize) * multiplyFactor.tempSegmentIndexFactor)
}
if DoubleMemorySystemField(fieldSchema.GetFieldID()) || DoubleMemoryDataType(fieldSchema.GetDataType()) {
segmentMemorySize += binlogSize
}
} else {
segmentDiskSize += uint64(getBinlogDataDiskSize(fieldBinlog))
}
}
if mmapEnabled {
mmapFieldCount++
}
@ -1340,9 +1367,6 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn
segmentMemorySize += uint64(getBinlogDataMemorySize(fieldBinlog))
}
// binlog & statslog use general load factor
segmentMemorySize = uint64(float64(segmentMemorySize) * multiplyFactor.memoryUsageFactor)
// get size of delete data
for _, fieldBinlog := range loadInfo.Deltalogs {
// MemorySize of filedBinlog is the actual size in memory, so the expansionFactor
@ -1365,6 +1389,21 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn
}, nil
}
func DoubleMemoryDataType(dataType schemapb.DataType) bool {
return dataType == schemapb.DataType_String ||
dataType == schemapb.DataType_VarChar ||
dataType == schemapb.DataType_JSON
}
func DoubleMemorySystemField(fieldID int64) bool {
return fieldID == common.TimeStampField
}
func SupportInterimIndexDataType(dataType schemapb.DataType) bool {
return dataType == schemapb.DataType_FloatVector ||
dataType == schemapb.DataType_SparseFloatVector
}
func (loader *segmentLoader) getFieldType(collectionID, fieldID int64) (schemapb.DataType, error) {
collection := loader.manager.Collection.Get(collectionID)
if collection == nil {

View File

@ -284,8 +284,3 @@ func isDataMmapEnable(fieldSchema *schemapb.FieldSchema) bool {
}
return params.Params.QueryNodeCfg.MmapScalarField.GetAsBool()
}
func hasRawData(indexInfo *querypb.FieldIndexInfo) bool {
log.Warn("hasRawData is not implemented, please check it", zap.Int64("field_id", indexInfo.FieldID))
return true
}

View File

@ -73,6 +73,7 @@ const (
RoleConfigPrivilege = "privilege"
MaxEtcdTxnNum = 128
GB = 1024 * 1024 * 1024
)
const (