mirror of https://github.com/milvus-io/milvus.git

enhance: support sparse cardinal hnsw index (#33656)

issue: #29419
Signed-off-by: Buqian Zheng <zhengbuqian@gmail.com>

pull/33766/head
parent 1697706ac0
commit 47b04ea167
@@ -213,6 +213,10 @@ IndexFactory::CreateVectorIndex(
                 return std::make_unique<VectorDiskAnnIndex<bin1>>(
                     index_type, metric_type, version, file_manager_context);
             }
+            case DataType::VECTOR_SPARSE_FLOAT: {
+                return std::make_unique<VectorDiskAnnIndex<float>>(
+                    index_type, metric_type, version, file_manager_context);
+            }
             default:
                 throw SegcoreError(
                     DataTypeInvalid,
@@ -328,6 +332,14 @@ IndexFactory::CreateVectorIndex(
                     space,
                     file_manager_context);
             }
+            case DataType::VECTOR_SPARSE_FLOAT: {
+                return std::make_unique<VectorDiskAnnIndex<float>>(
+                    index_type,
+                    metric_type,
+                    version,
+                    space,
+                    file_manager_context);
+            }
             default:
                 throw SegcoreError(
                     DataTypeInvalid,
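The two hunks above extend IndexFactory::CreateVectorIndex so that VECTOR_SPARSE_FLOAT also gets a disk-based vector index, reusing the float-element VectorDiskAnnIndex wrapper. The following is a hypothetical, self-contained sketch of that dispatch pattern; every name except the DataType values is invented for illustration and none of it is the real milvus factory.

// Hypothetical, simplified sketch of a data-type dispatch like the one the
// hunks above extend; not the real milvus factory.
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

enum class DataType { VECTOR_FLOAT, VECTOR_BINARY, VECTOR_SPARSE_FLOAT };

struct DiskIndexBase {
    virtual ~DiskIndexBase() = default;
};

// The element type decides how raw vector data is interpreted when loaded.
template <typename ElemT>
struct DiskAnnIndexSketch : DiskIndexBase {
    explicit DiskAnnIndexSketch(std::string metric)
        : metric_type(std::move(metric)) {
    }
    std::string metric_type;
};

std::unique_ptr<DiskIndexBase>
CreateDiskVectorIndexSketch(DataType data_type,
                            const std::string& metric_type) {
    switch (data_type) {
        case DataType::VECTOR_FLOAT:
            return std::make_unique<DiskAnnIndexSketch<float>>(metric_type);
        case DataType::VECTOR_BINARY:
            return std::make_unique<DiskAnnIndexSketch<uint8_t>>(metric_type);
        case DataType::VECTOR_SPARSE_FLOAT:
            // New case: sparse float vectors also get a disk index; the
            // element type stays float because sparse values are f32.
            return std::make_unique<DiskAnnIndexSketch<float>>(metric_type);
        default:
            throw std::invalid_argument("unsupported vector data type");
    }
}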
@@ -32,10 +32,12 @@ VectorFieldIndexing::VectorFieldIndexing(const FieldMeta& field_meta,
     : FieldIndexing(field_meta, segcore_config),
       built_(false),
       sync_with_index_(false),
-      config_(std::make_unique<VecIndexConfig>(segment_max_row_count,
-                                               field_index_meta,
-                                               segcore_config,
-                                               SegmentType::Growing)) {
+      config_(std::make_unique<VecIndexConfig>(
+          segment_max_row_count,
+          field_index_meta,
+          segcore_config,
+          SegmentType::Growing,
+          IsSparseFloatVectorDataType(field_meta.get_data_type()))) {
     recreate_index();
 }
 
@@ -16,8 +16,11 @@ namespace milvus::segcore {
 VecIndexConfig::VecIndexConfig(const int64_t max_index_row_cout,
                                const FieldIndexMeta& index_meta_,
                                const SegcoreConfig& config,
-                               const SegmentType& segment_type)
-    : max_index_row_count_(max_index_row_cout), config_(config) {
+                               const SegmentType& segment_type,
+                               const bool is_sparse)
+    : max_index_row_count_(max_index_row_cout),
+      config_(config),
+      is_sparse_(is_sparse) {
     origin_index_type_ = index_meta_.GetIndexType();
     metric_type_ = index_meta_.GeMetricType();
     // Currently for dense vector index, if the segment is growing, we use IVFCC
@@ -29,11 +32,15 @@ VecIndexConfig::VecIndexConfig(const int64_t max_index_row_cout,
     // But for sparse vector index(INDEX_SPARSE_INVERTED_INDEX and
     // INDEX_SPARSE_WAND), those index themselves can be used as the temp index
     // type, so we can avoid the extra step of "releast temp and load".
+    // When using HNSW(cardinal) for sparse, we use INDEX_SPARSE_INVERTED_INDEX
+    // as the growing index.
 
     if (origin_index_type_ ==
             knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX ||
         origin_index_type_ == knowhere::IndexEnum::INDEX_SPARSE_WAND) {
         index_type_ = origin_index_type_;
+    } else if (is_sparse_) {
+        index_type_ = knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX;
     } else {
         index_type_ = support_index_types.at(segment_type);
     }
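The constructor change above picks the index type that backs a growing segment: sparse inverted/WAND indexes are used directly, any other sparse index (such as the new cardinal sparse HNSW) temporarily grows on INDEX_SPARSE_INVERTED_INDEX, and dense vectors keep the per-segment-type default. Below is a standalone sketch of that selection; the string constants mirror knowhere::IndexEnum values, and the dense default is passed in by the caller because the real lookup table is not shown in this diff.

// Standalone sketch of the growing/temp index-type selection shown above.
#include <string>

const std::string kSparseInvertedIndex = "SPARSE_INVERTED_INDEX";
const std::string kSparseWand = "SPARSE_WAND";

std::string
SelectGrowingIndexType(const std::string& origin_index_type,
                       bool is_sparse,
                       const std::string& dense_default) {
    if (origin_index_type == kSparseInvertedIndex ||
        origin_index_type == kSparseWand) {
        // These sparse indexes can double as the temp/growing index directly.
        return origin_index_type;
    }
    if (is_sparse) {
        // Any other sparse index, e.g. sparse HNSW via cardinal, grows on a
        // sparse inverted index until the real index is built for the segment.
        return kSparseInvertedIndex;
    }
    // Dense vectors keep the per-segment-type default (an IVF*_CC variant for
    // growing segments, per the comment in the hunk above).
    return dense_default;
}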
@@ -58,9 +65,7 @@ VecIndexConfig::GetBuildThreshold() const noexcept {
     // For sparse, do not impose a threshold and start using index with any
     // number of rows. Unlike dense vector index, growing sparse vector index
     // does not require a minimum number of rows to train.
-    if (origin_index_type_ ==
-            knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX ||
-        origin_index_type_ == knowhere::IndexEnum::INDEX_SPARSE_WAND) {
+    if (is_sparse_) {
         return 0;
     }
     assert(VecIndexConfig::index_build_ratio.count(index_type_));
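With the is_sparse_ flag, GetBuildThreshold lets a sparse growing index be used from the very first row, while dense interim indexes still wait until a fraction of the segment's max row count has accumulated. A hedged sketch of that gating follows; the ratio table and its values are placeholders, not milvus' index_build_ratio.

// Sketch of the build-threshold gating with an assumed ratio table; the real
// values live in VecIndexConfig::index_build_ratio, which is not shown here.
#include <cstdint>
#include <map>
#include <string>

int64_t
GetBuildThresholdSketch(bool is_sparse,
                        const std::string& index_type,
                        int64_t max_index_row_count) {
    // Sparse growing indexes need no training rows, so index from row 0.
    if (is_sparse) {
        return 0;
    }
    // Dense interim indexes wait until a fraction of the segment has arrived;
    // 0.1 is an illustrative placeholder ratio.
    static const std::map<std::string, double> build_ratio = {
        {"IVF_FLAT_CC", 0.1},
    };
    auto it = build_ratio.find(index_type);
    if (it == build_ratio.end()) {
        // The real code asserts the index type is known; fall back here.
        return max_index_row_count;
    }
    return static_cast<int64_t>(max_index_row_count * it->second);
}

The caller side mirrors SegmentSealedImpl::generate_interim_index further down: when row_count falls below this threshold, the interim index is skipped.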
@@ -44,7 +44,8 @@ class VecIndexConfig {
     VecIndexConfig(const int64_t max_index_row_count,
                    const FieldIndexMeta& index_meta_,
                    const SegcoreConfig& config,
-                   const SegmentType& segment_type);
+                   const SegmentType& segment_type,
+                   const bool is_sparse);
 
     int64_t
     GetBuildThreshold() const noexcept;
@@ -72,6 +73,8 @@ class VecIndexConfig {
 
     knowhere::MetricType metric_type_;
 
+    bool is_sparse_;
+
     knowhere::Json build_params_;
 
     knowhere::Json search_params_;
@@ -1593,7 +1593,8 @@ SegmentSealedImpl::generate_interim_index(const FieldId field_id) {
         new VecIndexConfig(row_count,
                            field_index_meta,
                            segcore_config_,
-                           SegmentType::Sealed));
+                           SegmentType::Sealed,
+                           is_sparse));
     if (row_count < field_binlog_config->GetBuildThreshold()) {
         return false;
     }
@@ -453,10 +453,18 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
 
     auto local_chunk_manager =
         LocalChunkManagerSingleton::GetInstance().GetChunkManager();
-    auto local_data_path = storage::GenFieldRawDataPathPrefix(
-                               local_chunk_manager, segment_id, field_id) +
-                           "raw_data";
-    local_chunk_manager->CreateFile(local_data_path);
+    std::string local_data_path;
+    bool file_created = false;
+
+    auto init_file_info = [&](milvus::DataType dt) {
+        local_data_path = storage::GenFieldRawDataPathPrefix(
+                              local_chunk_manager, segment_id, field_id) +
+                          "raw_data";
+        if (dt == milvus::DataType::VECTOR_SPARSE_FLOAT) {
+            local_data_path += ".sparse_u32_f32";
+        }
+        local_chunk_manager->CreateFile(local_data_path);
+    };
 
     // get batch raw data from s3 and write batch data to disk file
     // TODO: load and write of different batches at the same time
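The hunk above defers creating the local cache file until the first batch arrives, because the file name should encode the element layout (a .sparse_u32_f32 suffix for sparse float vectors). A small sketch of that naming decision follows; the path helper is a placeholder, not the real GenFieldRawDataPathPrefix.

// Sketch of the deferred file naming: the suffix is only known after the
// first batch reveals the field's data type.
#include <cstdint>
#include <string>

enum class DataType { VECTOR_FLOAT, VECTOR_SPARSE_FLOAT };

std::string
RawDataPathSketch(int64_t segment_id, int64_t field_id) {
    // Placeholder for the real path-prefix helper.
    return "/tmp/raw_data/" + std::to_string(segment_id) + "/" +
           std::to_string(field_id) + "/raw_data";
}

std::string
InitLocalRawDataPath(int64_t segment_id, int64_t field_id, DataType dt) {
    auto path = RawDataPathSketch(segment_id, field_id);
    if (dt == DataType::VECTOR_SPARSE_FLOAT) {
        // The suffix documents the on-disk row layout: u32 indices, f32 values.
        path += ".sparse_u32_f32";
    }
    return path;
}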
@@ -474,17 +482,50 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
         for (int i = 0; i < batch_size; ++i) {
             auto field_data = field_datas[i].get()->GetFieldData();
             num_rows += uint32_t(field_data->get_num_rows());
-            AssertInfo(dim == 0 || dim == field_data->get_dim(),
-                       "inconsistent dim value in multi binlogs!");
-            dim = field_data->get_dim();
-
-            auto data_size =
-                field_data->get_num_rows() * dim * sizeof(DataType);
-            local_chunk_manager->Write(local_data_path,
-                                       write_offset,
-                                       const_cast<void*>(field_data->Data()),
-                                       data_size);
-            write_offset += data_size;
+            auto data_type = field_data->get_data_type();
+            if (!file_created) {
+                init_file_info(data_type);
+                file_created = true;
+            }
+            if (data_type == milvus::DataType::VECTOR_SPARSE_FLOAT) {
+                dim = std::max(
+                    dim,
+                    (uint32_t)(
+                        std::dynamic_pointer_cast<FieldData<SparseFloatVector>>(
+                            field_data)
+                            ->Dim()));
+                auto sparse_rows =
+                    static_cast<const knowhere::sparse::SparseRow<float>*>(
+                        field_data->Data());
+                for (size_t i = 0; i < field_data->Length(); ++i) {
+                    auto row = sparse_rows[i];
+                    auto row_byte_size = row.data_byte_size();
+                    uint32_t nnz = row.size();
+                    local_chunk_manager->Write(local_data_path,
+                                               write_offset,
+                                               const_cast<uint32_t*>(&nnz),
+                                               sizeof(nnz));
+                    write_offset += sizeof(nnz);
+                    local_chunk_manager->Write(local_data_path,
+                                               write_offset,
+                                               row.data(),
+                                               row_byte_size);
+                    write_offset += row_byte_size;
+                }
+            } else {
+                AssertInfo(dim == 0 || dim == field_data->get_dim(),
+                           "inconsistent dim value in multi binlogs!");
+                dim = field_data->get_dim();
+
+                auto data_size =
+                    field_data->get_num_rows() * dim * sizeof(DataType);
+                local_chunk_manager->Write(
+                    local_data_path,
+                    write_offset,
+                    const_cast<void*>(field_data->Data()),
+                    data_size);
+                write_offset += data_size;
+            }
         }
     };
 
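The sparse branch above streams each row as a uint32 non-zero count followed by the row's packed (index, value) pairs, which is what the .sparse_u32_f32 suffix describes. Below is a self-contained sketch of writing and reading one row in that assumed layout, with a plain vector of pairs standing in for knowhere::sparse::SparseRow<float>; the function names are illustrative only.

// Sketch of the assumed per-row layout of the sparse raw-data cache file:
//   [uint32 nnz][nnz x (uint32 index, float value)]
#include <cstdint>
#include <cstdio>
#include <utility>
#include <vector>

using SparseRowVec = std::vector<std::pair<uint32_t, float>>;

void
WriteSparseRow(std::FILE* out, const SparseRowVec& row) {
    uint32_t nnz = static_cast<uint32_t>(row.size());
    std::fwrite(&nnz, sizeof(nnz), 1, out);
    for (const auto& [idx, val] : row) {
        std::fwrite(&idx, sizeof(idx), 1, out);  // u32 dimension index
        std::fwrite(&val, sizeof(val), 1, out);  // f32 value
    }
}

SparseRowVec
ReadSparseRow(std::FILE* in) {
    uint32_t nnz = 0;
    if (std::fread(&nnz, sizeof(nnz), 1, in) != 1) {
        return {};
    }
    SparseRowVec row(nnz);
    for (auto& [idx, val] : row) {
        if (std::fread(&idx, sizeof(idx), 1, in) != 1 ||
            std::fread(&val, sizeof(val), 1, in) != 1) {
            return {};  // truncated row
        }
    }
    return row;
}

Writing the pairs one field at a time is equivalent to the diff's single write of row.data() after the nnz prefix, assuming the packed (u32, f32) pair layout holds.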