fix: [2.5]Add bypass thread pool mode to avoid growing indexes blocking insert/load (#41013)

issue: #40825 
related: #41012
pr: #41012

Signed-off-by: xianliang.li <xianliang.li@zilliz.com>
pull/41071/head
foxspy 2025-04-02 16:58:25 +08:00 committed by GitHub
parent 0d3bd3131c
commit a906466d8f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 171 additions and 23 deletions

View File

@ -297,9 +297,11 @@ IndexFactory::ScalarIndexLoadResource(
IndexBasePtr
IndexFactory::CreateIndex(
const CreateIndexInfo& create_index_info,
const storage::FileManagerContext& file_manager_context) {
const storage::FileManagerContext& file_manager_context,
bool use_build_pool) {
if (IsVectorDataType(create_index_info.field_type)) {
return CreateVectorIndex(create_index_info, file_manager_context);
return CreateVectorIndex(
create_index_info, file_manager_context, use_build_pool);
}
return CreateScalarIndex(create_index_info, file_manager_context);
@ -436,7 +438,8 @@ IndexFactory::CreateScalarIndex(
IndexBasePtr
IndexFactory::CreateVectorIndex(
const CreateIndexInfo& create_index_info,
const storage::FileManagerContext& file_manager_context) {
const storage::FileManagerContext& file_manager_context,
bool use_knowhere_build_pool) {
auto index_type = create_index_info.index_type;
auto metric_type = create_index_info.metric_type;
auto version = create_index_info.index_engine_version;
@ -475,19 +478,35 @@ IndexFactory::CreateVectorIndex(
case DataType::VECTOR_FLOAT:
case DataType::VECTOR_SPARSE_FLOAT: {
return std::make_unique<VectorMemIndex<float>>(
index_type, metric_type, version, file_manager_context);
index_type,
metric_type,
version,
use_knowhere_build_pool,
file_manager_context);
}
case DataType::VECTOR_BINARY: {
return std::make_unique<VectorMemIndex<bin1>>(
index_type, metric_type, version, file_manager_context);
index_type,
metric_type,
version,
use_knowhere_build_pool,
file_manager_context);
}
case DataType::VECTOR_FLOAT16: {
return std::make_unique<VectorMemIndex<float16>>(
index_type, metric_type, version, file_manager_context);
index_type,
metric_type,
version,
use_knowhere_build_pool,
file_manager_context);
}
case DataType::VECTOR_BFLOAT16: {
return std::make_unique<VectorMemIndex<bfloat16>>(
index_type, metric_type, version, file_manager_context);
index_type,
metric_type,
version,
use_knowhere_build_pool,
file_manager_context);
}
default:
PanicInfo(

View File

@ -77,11 +77,13 @@ class IndexFactory {
IndexBasePtr
CreateIndex(const CreateIndexInfo& create_index_info,
const storage::FileManagerContext& file_manager_context);
const storage::FileManagerContext& file_manager_context,
bool use_build_pool = true);
IndexBasePtr
CreateVectorIndex(const CreateIndexInfo& create_index_info,
const storage::FileManagerContext& file_manager_context);
const storage::FileManagerContext& file_manager_context,
bool use_knowhere_build_pool_ = true);
// For base types like int, float, double, string, etc
IndexBasePtr

View File

@ -60,8 +60,10 @@ VectorMemIndex<T>::VectorMemIndex(
const IndexType& index_type,
const MetricType& metric_type,
const IndexVersion& version,
bool use_knowhere_build_pool,
const storage::FileManagerContext& file_manager_context)
: VectorIndex(index_type, metric_type) {
: VectorIndex(index_type, metric_type),
use_knowhere_build_pool_(use_knowhere_build_pool) {
CheckMetricTypeSupport<T>(metric_type);
AssertInfo(!is_unsupported(index_type, metric_type),
index_type + " doesn't support metric: " + metric_type);
@ -88,8 +90,10 @@ template <typename T>
VectorMemIndex<T>::VectorMemIndex(const IndexType& index_type,
const MetricType& metric_type,
const IndexVersion& version,
bool use_knowhere_build_pool,
const knowhere::ViewDataOp view_data)
: VectorIndex(index_type, metric_type) {
: VectorIndex(index_type, metric_type),
use_knowhere_build_pool_(use_knowhere_build_pool) {
CheckMetricTypeSupport<T>(metric_type);
AssertInfo(!is_unsupported(index_type, metric_type),
index_type + " doesn't support metric: " + metric_type);
@ -292,7 +296,7 @@ VectorMemIndex<T>::BuildWithDataset(const DatasetPtr& dataset,
SetDim(dataset->GetDim());
knowhere::TimeRecorder rc("BuildWithoutIds", 1);
auto stat = index_.Build(dataset, index_config);
auto stat = index_.Build(dataset, index_config, use_knowhere_build_pool_);
if (stat != knowhere::Status::success)
PanicInfo(ErrorCode::IndexBuildError,
"failed to build index, " + KnowhereStatusString(stat));
@ -395,7 +399,7 @@ VectorMemIndex<T>::AddWithDataset(const DatasetPtr& dataset,
index_config.update(config);
knowhere::TimeRecorder rc("AddWithDataset", 1);
auto stat = index_.Add(dataset, index_config);
auto stat = index_.Add(dataset, index_config, use_knowhere_build_pool_);
if (stat != knowhere::Status::success)
PanicInfo(ErrorCode::IndexBuildError,
"failed to append index, " + KnowhereStatusString(stat));

View File

@ -28,7 +28,9 @@
#include "index/IndexInfo.h"
namespace milvus::index {
// TODO : growing index should be isolated from VectorMemIndex
// For general index, it should not suppport AddWithDataset etc.
// For growing index, it should suppport AddWithDataset etc.
template <typename T>
class VectorMemIndex : public VectorIndex {
public:
@ -36,6 +38,7 @@ class VectorMemIndex : public VectorIndex {
const IndexType& index_type,
const MetricType& metric_type,
const IndexVersion& version,
bool use_knowhere_build_pool = true,
const storage::FileManagerContext& file_manager_context =
storage::FileManagerContext());
@ -43,6 +46,7 @@ class VectorMemIndex : public VectorIndex {
VectorMemIndex(const IndexType& index_type,
const MetricType& metric_type,
const IndexVersion& version,
bool use_knowhere_build_pool,
const knowhere::ViewDataOp view_data);
BinarySet
@ -106,6 +110,7 @@ class VectorMemIndex : public VectorIndex {
std::shared_ptr<storage::MemFileManagerImpl> file_manager_;
CreateIndexInfo create_index_info_;
bool use_knowhere_build_pool_;
};
template <typename T>

View File

@ -2065,6 +2065,7 @@ ChunkedSegmentSealedImpl::generate_interim_index(const FieldId field_id) {
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber(),
false,
view_data);
} else if (field_meta.get_data_type() == DataType::VECTOR_FLOAT16) {
vec_index =
@ -2072,6 +2073,7 @@ ChunkedSegmentSealedImpl::generate_interim_index(const FieldId field_id) {
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber(),
false,
view_data);
} else if (field_meta.get_data_type() ==
DataType::VECTOR_BFLOAT16) {
@ -2080,13 +2082,15 @@ ChunkedSegmentSealedImpl::generate_interim_index(const FieldId field_id) {
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber(),
false,
view_data);
}
} else {
vec_index = std::make_unique<index::VectorMemIndex<float>>(
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber());
knowhere::Version::GetCurrentVersion().VersionNumber(),
false);
}
if (vec_index == nullptr) {
LOG_INFO("fail to generate intermin index, invalid data type.");

View File

@ -49,7 +49,8 @@ VectorFieldIndexing::recreate_index(DataType data_type,
index_ = std::make_unique<index::VectorMemIndex<float>>(
config_->GetIndexType(),
config_->GetMetricType(),
knowhere::Version::GetCurrentVersion().VersionNumber());
knowhere::Version::GetCurrentVersion().VersionNumber(),
false);
} else if (data_type == DataType::VECTOR_FLOAT) {
auto concurrent_fp32_vec =
reinterpret_cast<const ConcurrentVector<FloatVector>*>(
@ -65,6 +66,7 @@ VectorFieldIndexing::recreate_index(DataType data_type,
config_->GetIndexType(),
config_->GetMetricType(),
knowhere::Version::GetCurrentVersion().VersionNumber(),
false,
view_data);
} else if (data_type == DataType::VECTOR_FLOAT16) {
auto concurrent_fp16_vec =
@ -81,6 +83,7 @@ VectorFieldIndexing::recreate_index(DataType data_type,
config_->GetIndexType(),
config_->GetMetricType(),
knowhere::Version::GetCurrentVersion().VersionNumber(),
false,
view_data);
} else if (data_type == DataType::VECTOR_BFLOAT16) {
auto concurrent_bf16_vec =
@ -97,6 +100,7 @@ VectorFieldIndexing::recreate_index(DataType data_type,
config_->GetIndexType(),
config_->GetMetricType(),
knowhere::Version::GetCurrentVersion().VersionNumber(),
false,
view_data);
}
}
@ -126,17 +130,20 @@ VectorFieldIndexing::BuildIndexRange(int64_t ack_beg,
indexing = std::make_unique<index::VectorMemIndex<float>>(
knowhere::IndexEnum::INDEX_FAISS_IVFFLAT,
knowhere::metric::L2,
knowhere::Version::GetCurrentVersion().VersionNumber());
knowhere::Version::GetCurrentVersion().VersionNumber(),
false);
} else if (field_meta_.get_data_type() == DataType::VECTOR_FLOAT16) {
indexing = std::make_unique<index::VectorMemIndex<float16>>(
knowhere::IndexEnum::INDEX_FAISS_IVFFLAT,
knowhere::metric::L2,
knowhere::Version::GetCurrentVersion().VersionNumber());
knowhere::Version::GetCurrentVersion().VersionNumber(),
false);
} else {
indexing = std::make_unique<index::VectorMemIndex<bfloat16>>(
knowhere::IndexEnum::INDEX_FAISS_IVFFLAT,
knowhere::metric::L2,
knowhere::Version::GetCurrentVersion().VersionNumber());
knowhere::Version::GetCurrentVersion().VersionNumber(),
false);
}
auto dataset = knowhere::GenDataSet(
vec_base->get_size_per_chunk(), dim, chunk_data);

View File

@ -1966,6 +1966,7 @@ SegmentSealedImpl::generate_interim_index(const FieldId field_id) {
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber(),
false,
view_data);
} else if (field_meta.get_data_type() == DataType::VECTOR_FLOAT16) {
data_size = dim * sizeof(knowhere::fp16);
@ -1980,6 +1981,7 @@ SegmentSealedImpl::generate_interim_index(const FieldId field_id) {
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber(),
false,
view_data);
} else if (field_meta.get_data_type() ==
DataType::VECTOR_BFLOAT16) {
@ -1995,13 +1997,15 @@ SegmentSealedImpl::generate_interim_index(const FieldId field_id) {
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber(),
false,
view_data);
}
} else {
vec_index = std::make_unique<index::VectorMemIndex<float>>(
field_binlog_config->GetIndexType(),
index_metric,
knowhere::Version::GetCurrentVersion().VersionNumber());
knowhere::Version::GetCurrentVersion().VersionNumber(),
false);
}
if (vec_index == nullptr) {
LOG_INFO("fail to generate intermin index, invalid data type.");

View File

@ -14,7 +14,7 @@
# Update KNOWHERE_VERSION for the first occurrence
milvus_add_pkg_config("knowhere")
set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
set( KNOWHERE_VERSION v2.5.4 )
set( KNOWHERE_VERSION v2.5.5 )
set( GIT_REPOSITORY "https://github.com/zilliztech/knowhere.git")
message(STATUS "Knowhere repo: ${GIT_REPOSITORY}")
message(STATUS "Knowhere version: ${KNOWHERE_VERSION}")

View File

@ -19,6 +19,8 @@
#include "segcore/SegmentGrowing.h"
#include "segcore/SegmentGrowingImpl.h"
#include "test_utils/DataGen.h"
#include "index/IndexFactory.h"
#include "test_utils/indexbuilder_test_utils.h"
using namespace milvus;
using namespace milvus::segcore;
@ -292,6 +294,97 @@ TEST_P(GrowingIndexTest, Correctness) {
}
}
TEST_P(GrowingIndexTest, AddWithoutBuildPool) {
constexpr int N = 1024;
constexpr int TOPK = 100;
constexpr int dim = 128;
constexpr int add_cont = 5;
milvus::index::CreateIndexInfo create_index_info;
create_index_info.field_type = data_type;
create_index_info.metric_type = metric_type;
create_index_info.index_type = index_type;
create_index_info.index_engine_version =
knowhere::Version::GetCurrentVersion().VersionNumber();
auto schema = std::make_shared<Schema>();
auto pk = schema->AddDebugField("pk", DataType::INT64);
auto random = schema->AddDebugField("random", DataType::DOUBLE);
auto vec = schema->AddDebugField("embeddings", data_type, 128, metric_type);
schema->set_primary_field_id(pk);
auto dataset = DataGen(schema, N);
auto build_config = generate_build_conf(index_type, metric_type);
if (data_type == DataType::VECTOR_FLOAT) {
auto index = std::make_unique<milvus::index::VectorMemIndex<float>>(
index_type,
metric_type,
knowhere::Version::GetCurrentVersion().VersionNumber(),
false,
milvus::storage::FileManagerContext());
auto float_data = dataset.get_col<float>(vec);
index->BuildWithDataset(knowhere::GenDataSet(N, dim, float_data.data()),
build_config);
for (int i = 0; i < add_cont; i++) {
index->AddWithDataset(
knowhere::GenDataSet(N, dim, float_data.data()), build_config);
}
EXPECT_EQ(index->Count(), (add_cont + 1) * N);
} else if (data_type == DataType::VECTOR_FLOAT16) {
auto index = std::make_unique<milvus::index::VectorMemIndex<float16>>(
index_type,
metric_type,
knowhere::Version::GetCurrentVersion().VersionNumber(),
false,
milvus::storage::FileManagerContext());
auto float16_data = dataset.get_col<float16>(vec);
index->BuildWithDataset(
knowhere::GenDataSet(N, dim, float16_data.data()), build_config);
for (int i = 0; i < add_cont; i++) {
index->AddWithDataset(
knowhere::GenDataSet(N, dim, float16_data.data()),
build_config);
}
EXPECT_EQ(index->Count(), (add_cont + 1) * N);
} else if (data_type == DataType::VECTOR_BFLOAT16) {
auto index = std::make_unique<milvus::index::VectorMemIndex<bfloat16>>(
index_type,
metric_type,
knowhere::Version::GetCurrentVersion().VersionNumber(),
false,
milvus::storage::FileManagerContext());
auto bfloat16_data = dataset.get_col<bfloat16>(vec);
index->BuildWithDataset(
knowhere::GenDataSet(N, dim, bfloat16_data.data()), build_config);
for (int i = 0; i < add_cont; i++) {
index->AddWithDataset(
knowhere::GenDataSet(N, dim, bfloat16_data.data()),
build_config);
}
EXPECT_EQ(index->Count(), (add_cont + 1) * N);
} else if (is_sparse) {
auto index = std::make_unique<milvus::index::VectorMemIndex<float>>(
index_type,
metric_type,
knowhere::Version::GetCurrentVersion().VersionNumber(),
false,
milvus::storage::FileManagerContext());
auto sparse_data =
dataset.get_col<knowhere::sparse::SparseRow<float>>(vec);
index->BuildWithDataset(
knowhere::GenDataSet(N, dim, sparse_data.data()), build_config);
for (int i = 0; i < add_cont; i++) {
index->AddWithDataset(
knowhere::GenDataSet(N, dim, sparse_data.data()), build_config);
}
EXPECT_EQ(index->Count(), (add_cont + 1) * N);
} else {
throw std::invalid_argument("Unsupported data type");
}
}
TEST_P(GrowingIndexTest, MissIndexMeta) {
auto schema = std::make_shared<Schema>();
auto pk = schema->AddDebugField("pk", DataType::INT64);

View File

@ -1277,7 +1277,8 @@ inline std::unique_ptr<milvus::index::VectorIndex>
GenVecIndexing(int64_t N,
int64_t dim,
const float* vec,
const char* index_type) {
const char* index_type,
bool use_knowhere_build_pool = true) {
auto conf =
knowhere::Json{{knowhere::meta::METRIC_TYPE, knowhere::metric::L2},
{knowhere::meta::DIM, std::to_string(dim)},
@ -1296,6 +1297,7 @@ GenVecIndexing(int64_t N,
index_type,
knowhere::metric::L2,
knowhere::Version::GetCurrentVersion().VersionNumber(),
use_knowhere_build_pool,
file_manager_context);
indexing->BuildWithDataset(database, conf);
auto create_index_result = indexing->Upload();

View File

@ -28,6 +28,7 @@
#include "knowhere/comp/index_param.h"
#include "pb/index_cgo_msg.pb.h"
#include "storage/Types.h"
#include "knowhere/comp/index_param.h"
constexpr int64_t DIM = 16;
constexpr int64_t NQ = 10;
@ -59,7 +60,8 @@ generate_build_conf(const milvus::IndexType& index_type,
{knowhere::indexparam::M, "4"},
{knowhere::indexparam::NBITS, "8"},
};
} else if (index_type == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT) {
} else if (index_type == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT ||
index_type == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT_CC) {
return knowhere::Json{
{knowhere::meta::METRIC_TYPE, metric_type},
{knowhere::meta::DIM, std::to_string(DIM)},
@ -113,6 +115,12 @@ generate_build_conf(const milvus::IndexType& index_type,
{knowhere::meta::METRIC_TYPE, metric_type},
{knowhere::indexparam::DROP_RATIO_BUILD, "0.1"},
};
} else if (index_type == knowhere::IndexEnum::INDEX_FAISS_SCANN ||
index_type == knowhere::IndexEnum::INDEX_FAISS_SCANN_DVR) {
return knowhere::Json{
{knowhere::meta::METRIC_TYPE, metric_type},
{knowhere::meta::DIM, std::to_string(DIM)},
};
}
return knowhere::Json();
}