mirror of https://github.com/milvus-io/milvus.git
fix: fix growing index data race and properly handle build error (#31170)
issue: https://github.com/milvus-io/milvus/issues/31169

This also handles index build errors properly by re-creating a new index, so that nothing is left over from the previous failed build attempt.

Signed-off-by: Buqian Zheng <zhengbuqian@gmail.com>

pull/31247/head
parent 57a5e44b88
commit 7fc3094a42
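The fix has two parts, both visible in the hunks below: a failed growing-index build now discards the partially built index by constructing a fresh one (recreate_index), and the read paths re-check the index state under chunk_mutex_ before falling back to the raw chunk data. A minimal, self-contained sketch of the first idea, with hypothetical names (GrowingIndex, FieldIndexingSketch) standing in for the actual Milvus classes:

#include <iostream>
#include <memory>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for a growing vector index; not the Milvus type.
struct GrowingIndex {
    std::vector<float> rows;
    void build(const std::vector<float>& batch) {
        if (batch.empty()) {
            throw std::runtime_error("empty batch");  // simulated build failure
        }
        rows.insert(rows.end(), batch.begin(), batch.end());
    }
};

class FieldIndexingSketch {
 public:
    FieldIndexingSketch() { recreate_index(); }

    // On a build error, throw the half-built index away and start clean,
    // so nothing from the failed attempt leaks into later builds.
    void append(const std::vector<float>& batch) {
        try {
            index_->build(batch);
        } catch (const std::exception& e) {
            std::cerr << "growing index build error: " << e.what() << '\n';
            recreate_index();
            indexed_rows_ = 0;
            return;
        }
        indexed_rows_ += batch.size();
    }

    size_t indexed_rows() const { return indexed_rows_; }

 private:
    void recreate_index() { index_ = std::make_unique<GrowingIndex>(); }

    std::unique_ptr<GrowingIndex> index_;
    size_t indexed_rows_ = 0;
};

int main() {
    FieldIndexingSketch indexing;
    indexing.append({1.0f, 2.0f});  // succeeds
    indexing.append({});            // fails: index recreated, counter reset
    indexing.append({3.0f});        // continues against a clean index
    std::cout << "indexed rows: " << indexing.indexed_rows() << '\n';  // prints 1
}

In the actual diff, the sparse append additionally resets index_cur_ to 0 in its catch block, while the dense append only skips the fetch_add.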
@@ -36,6 +36,11 @@ VectorFieldIndexing::VectorFieldIndexing(const FieldMeta& field_meta,
                                               field_index_meta,
                                               segcore_config,
                                               SegmentType::Growing)) {
+    recreate_index();
+}
+
+void
+VectorFieldIndexing::recreate_index() {
     index_ = std::make_unique<index::VectorMemIndex<float>>(
         config_->GetIndexType(),
         config_->GetMetricType(),

@@ -128,6 +133,8 @@ VectorFieldIndexing::AppendSegmentIndexSparse(int64_t reserved_offset,
         }
     } catch (SegcoreError& error) {
         LOG_ERROR("growing sparse index build error: {}", error.what());
+        recreate_index();
+        index_cur_ = 0;
         return;
     }
     index_cur_.fetch_add(rows);

@@ -204,6 +211,7 @@ VectorFieldIndexing::AppendSegmentIndexDense(int64_t reserved_offset,
         index_->BuildWithDataset(dataset, conf);
     } catch (SegcoreError& error) {
         LOG_ERROR("growing index build error: {}", error.what());
+        recreate_index();
         return;
     }
     index_cur_.fetch_add(vec_num);

@@ -232,6 +232,8 @@ class VectorFieldIndexing : public FieldIndexing {
     get_search_params(const SearchInfo& searchInfo) const;

  private:
+    void
+    recreate_index();
     // current number of rows in index.
     std::atomic<idx_t> index_cur_ = 0;
     // whether the growing index has been built.

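The header change declares recreate_index and keeps index_cur_ as a std::atomic counter of the rows currently in the index, so it can be advanced by the append path and read elsewhere without extra locking; the append functions above only advance it after a build succeeds. A small illustrative sketch of that publish-after-success counter; the class and member names are hypothetical, and the explicit memory orderings are the sketch's choice (the diff itself uses the defaults):

#include <atomic>
#include <cstdint>

// Illustrative only: a row counter advanced once a batch has been fully
// built into the index, so readers never observe a half-built batch.
class PublishedRowCount {
 public:
    // Indexing thread: call only after the batch build succeeded.
    void publish(int64_t rows) {
        count_.fetch_add(rows, std::memory_order_release);
    }

    // Reader threads: pairs with the release above.
    int64_t visible() const {
        return count_.load(std::memory_order_acquire);
    }

 private:
    std::atomic<int64_t> count_{0};
};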
@@ -627,22 +627,35 @@ SegmentGrowingImpl::bulk_subscript_sparse_float_vector_impl(
     milvus::proto::schema::SparseFloatArray* output) const {
     AssertInfo(HasRawData(field_id.get()), "Growing segment loss raw data");

-    // if index has finished building index, grab from index
+    // if index has finished building, grab from index without any
+    // synchronization operations.
     if (indexing_record_.SyncDataWithIndex(field_id)) {
         indexing_record_.GetDataFromIndex(
             field_id, seg_offsets, count, 0, output);
         return;
     }
-    // else copy from raw data
-    std::lock_guard<std::shared_mutex> guard(chunk_mutex_);
-    SparseRowsToProto(
-        [&](size_t i) {
-            auto offset = seg_offsets[i];
-            return offset != INVALID_SEG_OFFSET ? vec_raw->get_element(offset)
-                                                : nullptr;
-        },
-        count,
-        output);
+    {
+        std::lock_guard<std::shared_mutex> guard(chunk_mutex_);
+        // check again after lock to make sure: if index has finished building
+        // after the above check but before we grabbed the lock, we should grab
+        // from index as the data in chunk may have been removed in
+        // try_remove_chunks.
+        if (!indexing_record_.SyncDataWithIndex(field_id)) {
+            // copy from raw data
+            SparseRowsToProto(
+                [&](size_t i) {
+                    auto offset = seg_offsets[i];
+                    return offset != INVALID_SEG_OFFSET
+                               ? vec_raw->get_element(offset)
+                               : nullptr;
+                },
+                count,
+                output);
+            return;
+        }
+        // else: release lock and copy from index
+    }
+    indexing_record_.GetDataFromIndex(field_id, seg_offsets, count, 0, output);
 }

 template <typename S, typename T>

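Both bulk_subscript paths now share the same double-checked shape: a lock-free fast path when the index already holds all the data, then a second SyncDataWithIndex check after acquiring chunk_mutex_, because try_remove_chunks may drop the raw chunks between the first check and the lock. A generic sketch of that shape, with a plain std::atomic<bool> flag and callables standing in for the Milvus pieces (hypothetical helper, not the actual API):

#include <atomic>
#include <mutex>
#include <shared_mutex>

// Hypothetical helper: read from the index when it is in sync (no lock),
// otherwise read from the raw chunks under the chunk lock, re-checking the
// sync flag once the lock is held.
template <typename ReadFromIndex, typename ReadFromChunks>
void double_checked_read(const std::atomic<bool>& in_sync,
                         std::shared_mutex& chunk_mutex,
                         ReadFromIndex read_from_index,
                         ReadFromChunks read_from_chunks) {
    // Fast path: the index has everything, no synchronization needed.
    if (in_sync.load()) {
        read_from_index();
        return;
    }
    {
        std::lock_guard<std::shared_mutex> guard(chunk_mutex);
        // The index may have caught up (and the chunks may have been freed)
        // between the check above and acquiring the lock, so check again.
        if (!in_sync.load()) {
            read_from_chunks();
            return;
        }
        // else: fall through, release the lock, and read from the index.
    }
    read_from_index();
}

The sparse function above is exactly this shape with read_from_index mapped to GetDataFromIndex and read_from_chunks mapped to SparseRowsToProto; the dense overload below follows the same pattern with a memcpy loop.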
@@ -675,25 +688,38 @@ SegmentGrowingImpl::bulk_subscript_impl(FieldId field_id,

     // HasRawData interface guarantees that data can be fetched from growing segment
     AssertInfo(HasRawData(field_id.get()), "Growing segment loss raw data");
-    // when data is in sync with index
+
+    // if index has finished building, grab from index without any
+    // synchronization operations.
     if (indexing_record_.SyncDataWithIndex(field_id)) {
         indexing_record_.GetDataFromIndex(
             field_id, seg_offsets, count, element_sizeof, output_raw);
         return;
     }
-    // else copy from chunk
-    std::lock_guard<std::shared_mutex> guard(chunk_mutex_);
-    auto output_base = reinterpret_cast<char*>(output_raw);
-    for (int i = 0; i < count; ++i) {
-        auto dst = output_base + i * element_sizeof;
-        auto offset = seg_offsets[i];
-        if (offset == INVALID_SEG_OFFSET) {
-            memset(dst, 0, element_sizeof);
-        } else {
-            auto src = (const uint8_t*)vec.get_element(offset);
-            memcpy(dst, src, element_sizeof);
-        }
-    }
+    {
+        std::lock_guard<std::shared_mutex> guard(chunk_mutex_);
+        // check again after lock to make sure: if index has finished building
+        // after the above check but before we grabbed the lock, we should grab
+        // from index as the data in chunk may have been removed in
+        // try_remove_chunks.
+        if (!indexing_record_.SyncDataWithIndex(field_id)) {
+            auto output_base = reinterpret_cast<char*>(output_raw);
+            for (int i = 0; i < count; ++i) {
+                auto dst = output_base + i * element_sizeof;
+                auto offset = seg_offsets[i];
+                if (offset == INVALID_SEG_OFFSET) {
+                    memset(dst, 0, element_sizeof);
+                } else {
+                    auto src = (const uint8_t*)vec.get_element(offset);
+                    memcpy(dst, src, element_sizeof);
+                }
+            }
+            return;
+        }
+        // else: release lock and copy from index
+    }
+    indexing_record_.GetDataFromIndex(
+        field_id, seg_offsets, count, element_sizeof, output_raw);
 }

 template <typename S, typename T>

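For context on the race the second check closes: try_remove_chunks frees the raw chunk data once the index has caught up with the inserted rows. Assuming (as a sketch of the interleaving, not a quote of the implementation) that the removal happens under the same chunk_mutex_, the sequence that could break a reader without the re-check looks like this:

// Hypothetical interleaving, reader thread vs. indexing thread:
//
//   reader:  SyncDataWithIndex(field) == false  -> takes the slow (chunk) path
//   indexer: finishes building and marks the field as in sync with the index
//   indexer: try_remove_chunks() frees the raw chunk data under chunk_mutex_
//   reader:  acquires chunk_mutex_ and reads chunk data that is already gone
//
// With the re-check under the lock, the reader notices the field is now in
// sync and falls back to GetDataFromIndex instead of touching removed chunks.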