diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 9fbf6ccdec..8583da73ef 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -231,7 +231,7 @@ queryNode: # And this value should be a number greater than 1 and less than 32. chunkRows: 1024 # The number of vectors in a chunk. growing: # growing a vector index for growing segment to accelerate search - enableIndex: false + enableIndex: true nlist: 128 # growing segment index nlist nprobe: 16 # nprobe to search growing segment, based on your accuracy requirement, must smaller than nlist loadMemoryUsageFactor: 3 # The multiply factor of calculating the memory usage while loading segments diff --git a/internal/core/src/query/SearchOnGrowing.cpp b/internal/core/src/query/SearchOnGrowing.cpp index 8782890fa5..1419a7525f 100644 --- a/internal/core/src/query/SearchOnGrowing.cpp +++ b/internal/core/src/query/SearchOnGrowing.cpp @@ -100,6 +100,8 @@ SearchOnGrowing(const segcore::SegmentGrowingImpl& segment, results.unity_topK_ = topk; results.total_nq_ = num_queries; } else { + std::shared_lock read_chunk_mutex( + segment.get_chunk_mutex()); int32_t current_chunk_id = 0; // step 3: brute force search where small indexing is unavailable auto vec_ptr = record.get_field_data_base(vecfield_id); diff --git a/internal/core/src/segcore/FieldIndexing.cpp b/internal/core/src/segcore/FieldIndexing.cpp index 836bf6d369..cf636d127d 100644 --- a/internal/core/src/segcore/FieldIndexing.cpp +++ b/internal/core/src/segcore/FieldIndexing.cpp @@ -28,7 +28,10 @@ VectorFieldIndexing::VectorFieldIndexing(const FieldMeta& field_meta, : FieldIndexing(field_meta, segcore_config), config_(std::make_unique( segment_max_row_count, field_index_meta, segcore_config)), + build(false), sync_with_index(false) { + index_ = std::make_unique(config_->GetIndexType(), + config_->GetMetricType()); } void @@ -87,7 +90,7 @@ VectorFieldIndexing::AppendSegmentIndex(int64_t reserved_offset, auto per_chunk = source->get_size_per_chunk(); //append vector [vector_id_beg, vector_id_end] into index //build index [vector_id_beg, build_threshold) when index not exist - if (!index_.get()) { + if (!build) { idx_t vector_id_beg = index_cur_.load(); idx_t vector_id_end = get_build_threshold() - 1; auto chunk_id_beg = vector_id_beg / per_chunk; @@ -122,13 +125,17 @@ VectorFieldIndexing::AppendSegmentIndex(int64_t reserved_offset, } auto dataset = knowhere::GenDataSet(vec_num, dim, data_addr); dataset->SetIsOwner(false); - auto indexing = std::make_unique( - config_->GetIndexType(), config_->GetMetricType()); - indexing->BuildWithDataset(dataset, conf); + try { + index_->BuildWithDataset(dataset, conf); + } catch (SegcoreError& error) { + LOG_SEGCORE_ERROR_ << " growing index build error : " + << error.what(); + return; + } index_cur_.fetch_add(vec_num); - index_ = std::move(indexing); + build = true; } - //append rest data when index exist + //append rest data when index has built idx_t vector_id_beg = index_cur_.load(); idx_t vector_id_end = reserved_offset + size - 1; auto chunk_id_beg = vector_id_beg / per_chunk; @@ -188,6 +195,11 @@ VectorFieldIndexing::sync_data_with_index() const { return sync_with_index.load(); } +bool +VectorFieldIndexing::has_raw_data() const { + return index_->HasRawData(); +} + template void ScalarFieldIndexing::BuildIndexRange(int64_t ack_beg, diff --git a/internal/core/src/segcore/FieldIndexing.h b/internal/core/src/segcore/FieldIndexing.h index 10752f432f..4917db2fbf 100644 --- a/internal/core/src/segcore/FieldIndexing.h +++ b/internal/core/src/segcore/FieldIndexing.h @@ -66,6 +66,11 @@ class FieldIndexing { virtual bool sync_data_with_index() const = 0; + virtual bool + has_raw_data() const { + return true; + } + const FieldMeta& get_field_meta() { return field_meta_; @@ -192,6 +197,9 @@ class VectorFieldIndexing : public FieldIndexing { bool sync_data_with_index() const override; + bool + has_raw_data() const override; + idx_t get_index_cursor() override; @@ -203,6 +211,7 @@ class VectorFieldIndexing : public FieldIndexing { private: std::atomic index_cur_ = 0; + std::atomic build; std::atomic sync_with_index; std::unique_ptr config_; std::unique_ptr index_; @@ -323,6 +332,16 @@ class IndexingRecord { } return false; } + + bool + HasRawData(FieldId fieldId) const { + if (is_in(fieldId)) { + const FieldIndexing& indexing = get_field_indexing(fieldId); + return indexing.has_raw_data(); + } + return false; + } + // concurrent int64_t get_finished_ack() const { diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 5f031c7627..a9364758da 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -53,6 +53,21 @@ SegmentGrowingImpl::mask_with_delete(BitsetType& bitset, bitset |= delete_bitset; } +void +SegmentGrowingImpl::try_remove_chunks(FieldId fieldId) { + //remove the chunk data to reduce memory consumption + if (indexing_record_.SyncDataWithIndex(fieldId)) { + auto vec_data_base = + dynamic_cast*>( + insert_record_.get_field_data_base(fieldId)); + if (vec_data_base && vec_data_base->num_chunk() > 0 && + chunk_mutex_.try_lock()) { + vec_data_base->clear(); + chunk_mutex_.unlock(); + } + } +} + void SegmentGrowingImpl::Insert(int64_t reserved_offset, int64_t size, @@ -89,6 +104,7 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset, &insert_data->fields_data(data_offset), field_meta); } + //insert vector data into index if (segcore_config_.get_enable_growing_segment_index()) { indexing_record_.AppendingIndex( reserved_offset, @@ -97,6 +113,7 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset, &insert_data->fields_data(data_offset), insert_record_); } + try_remove_chunks(field_id); } // step 4: set pks to offset @@ -174,6 +191,7 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) { offset += row_count; } } + try_remove_chunks(field_id); if (field_id == primary_field_id) { insert_record_.insert_pks(field_datas); @@ -392,10 +410,7 @@ SegmentGrowingImpl::bulk_subscript_impl(FieldId field_id, auto& vec = *vec_ptr; std::vector empty(element_sizeof, 0); - if (indexing_record_.SyncDataWithIndex(field_id)) { - indexing_record_.GetDataFromIndex( - field_id, seg_offsets, count, element_sizeof, output_raw); - } else { + auto copy_from_chunk = [&]() { auto output_base = reinterpret_cast(output_raw); for (int i = 0; i < count; ++i) { auto dst = output_base + i * element_sizeof; @@ -406,7 +421,20 @@ SegmentGrowingImpl::bulk_subscript_impl(FieldId field_id, : (const uint8_t*)vec.get_element(offset)); memcpy(dst, src, element_sizeof); } + }; + //HasRawData interface guarantees that data can be fetched from growing segment + if (HasRawData(field_id.get())) { + //When data sync with index + if (indexing_record_.SyncDataWithIndex(field_id)) { + indexing_record_.GetDataFromIndex( + field_id, seg_offsets, count, element_sizeof, output_raw); + } else { + //Else copy from chunk + std::lock_guard guard(chunk_mutex_); + copy_from_chunk(); + } } + AssertInfo(HasRawData(field_id.get()), "Growing segment loss raw data"); } template diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index ddaa357816..d7775949bd 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -89,6 +89,11 @@ class SegmentGrowingImpl : public SegmentGrowing { return deleted_record_; } + std::shared_mutex& + get_chunk_mutex() const { + return chunk_mutex_; + } + const SealedIndexingRecord& get_sealed_indexing_record() const { return sealed_indexing_record_; @@ -124,6 +129,9 @@ class SegmentGrowingImpl : public SegmentGrowing { return segcore_config_.get_chunk_rows(); } + void + try_remove_chunks(FieldId fieldId); + public: int64_t get_row_count() const override { @@ -228,6 +236,12 @@ class SegmentGrowingImpl : public SegmentGrowing { bool HasRawData(int64_t field_id) const override { + //growing index hold raw data when + // 1. growing index enabled and it holds raw data + // 2. growing index disabled then raw data held by chunk + if (indexing_record_.is_in(FieldId(field_id))) { + return indexing_record_.HasRawData(FieldId(field_id)); + } return true; } @@ -255,6 +269,8 @@ class SegmentGrowingImpl : public SegmentGrowing { // inserted fields data and row_ids, timestamps InsertRecord insert_record_; + mutable std::shared_mutex chunk_mutex_; + // deleted pks mutable DeletedRecord deleted_record_; diff --git a/internal/core/thirdparty/knowhere/CMakeLists.txt b/internal/core/thirdparty/knowhere/CMakeLists.txt index 8dd74db784..34fdb6cc55 100644 --- a/internal/core/thirdparty/knowhere/CMakeLists.txt +++ b/internal/core/thirdparty/knowhere/CMakeLists.txt @@ -11,7 +11,7 @@ # or implied. See the License for the specific language governing permissions and limitations under the License. #------------------------------------------------------------------------------- -set( KNOWHERE_VERSION 4eea3c1 ) +set( KNOWHERE_VERSION 37d764a) message(STATUS "Building knowhere-${KNOWHERE_SOURCE_VER} from source") message(STATUS ${CMAKE_BUILD_TYPE}) diff --git a/internal/core/unittest/test_growing_index.cpp b/internal/core/unittest/test_growing_index.cpp index 801b6957a5..9e5fab5a21 100644 --- a/internal/core/unittest/test_growing_index.cpp +++ b/internal/core/unittest/test_growing_index.cpp @@ -42,6 +42,7 @@ TEST(GrowingIndex, Correctness) { IndexMetaPtr metaPtr = std::make_shared(226985, std::move(filedMap)); auto segment = CreateGrowingSegment(schema, metaPtr); + auto segmentImplPtr = dynamic_cast(segment.get()); // std::string dsl = R"({ // "bool": { @@ -86,6 +87,17 @@ TEST(GrowingIndex, Correctness) { dataset.row_ids_.data(), dataset.timestamps_.data(), dataset.raw_); + auto filed_data = segmentImplPtr->get_insert_record() + .get_field_data(vec); + + auto inserted = (i + 1) * per_batch; + //once index built, chunk data will be removed + if (i < 2) { + EXPECT_EQ(filed_data->num_chunk(), + upper_div(inserted, filed_data->get_size_per_chunk())); + } else { + EXPECT_EQ(filed_data->num_chunk(), 0); + } auto plan = milvus::query::CreateSearchPlanByExpr( *schema, plan_str.data(), plan_str.size()); @@ -102,16 +114,37 @@ TEST(GrowingIndex, Correctness) { } } -TEST(GrowingIndex, GetVector) { +using Param = const char*; + +class GrowingIndexGetVectorTest : public ::testing::TestWithParam { + void + SetUp() override { + auto param = GetParam(); + metricType = param; + } + + protected: + const char* metricType; +}; + +INSTANTIATE_TEST_CASE_P(IndexTypeParameters, + GrowingIndexGetVectorTest, + ::testing::Values(knowhere::metric::L2, + knowhere::metric::COSINE, + knowhere::metric::IP)); + +TEST_P(GrowingIndexGetVectorTest, GetVector) { auto schema = std::make_shared(); auto pk = schema->AddDebugField("pk", DataType::INT64); auto random = schema->AddDebugField("random", DataType::DOUBLE); auto vec = schema->AddDebugField( - "embeddings", DataType::VECTOR_FLOAT, 128, knowhere::metric::L2); + "embeddings", DataType::VECTOR_FLOAT, 128, metricType); schema->set_primary_field_id(pk); std::map index_params = { - {"index_type", "IVF_FLAT"}, {"metric_type", "L2"}, {"nlist", "128"}}; + {"index_type", "IVF_FLAT"}, + {"metric_type", metricType}, + {"nlist", "128"}}; std::map type_params = {{"dim", "128"}}; FieldIndexMeta fieldIndexMeta( vec, std::move(index_params), std::move(type_params)); diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 9229ac7b1a..23111655ab 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -330,7 +330,7 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, int64(8192), chunkRows) enableGrowingIndex := Params.EnableGrowingSegmentIndex.GetAsBool() - assert.Equal(t, false, enableGrowingIndex) + assert.Equal(t, true, enableGrowingIndex) params.Save("queryNode.segcore.growing.enableIndex", "true") enableGrowingIndex = Params.EnableGrowingSegmentIndex.GetAsBool()