// Patch summary (reviewer notes only; the diff text that follows is unchanged).
//
// What the diff shows:
// - segcore/InsertRecord.h: InsertRecord gains the member pk2offset_ ("pks to row offset") and four helpers:
//     search_pk(pk, timestamp)      - collects the offsets stored for pk whose timestamps_ entry is <= timestamp
//     search_pk(pk, insert_barrier) - collects the offsets stored for pk that are < insert_barrier
//     insert_pk(pk, offset)         - adds one (pk, offset) pair to pk2offset_
//     empty_pks()                   - returns pk2offset_.empty()
// - segcore/SegmentGrowingImpl.{h,cpp} and segcore/SegmentSealedImpl.{h,cpp}: the pk2offset_ member is removed;
//   Insert, LoadScalarIndex, LoadFieldData, search_ids and mask_with_delete now go through insert_record_.
// - segcore/Utils.{h,cpp}: get_deleted_bitmap drops its pk2offset parameter and calls
//   insert_record.search_pk(pk, insert_barrier) instead of iterating equal_range itself.
// - query/SearchOnGrowing.cpp (FloatSearch): size_per_chunk is read once before the chunk loop; the loop breaks
//   as soon as (chunk_id + 1) * size_per_chunk > ins_barrier; current_chunk_id is incremented once per searched
//   chunk instead of being set to max_indexed_id after the loop.
// - unittest/test_utils.cpp (Util.GetDeleteBitmap): the test fills insert_record via insert_pk and no longer
//   passes a separate pk2offset map to get_deleted_bitmap.
//
// NOTE(review): the two search_pk overloads differ only in the type of the second parameter (Timestamp vs
//   int64_t). Timestamp's definition is not visible here; if it is an unsigned 64-bit integer, a call whose
//   argument is some other integer type (e.g. a plain int) may be ambiguous - confirm.
// NOTE(review): search_pk and insert_pk take PkType by value. std::get(pk) in search_ids suggests PkType is a
//   variant, presumably with a string alternative for VARCHAR keys - verify; const PkType& may avoid copies.
// NOTE(review): in get_deleted_bitmap the AssertInfo(insert_row_offset < insert_barrier, ...) is kept, but the
//   offsets now come from search_pk(pk, insert_barrier), which already filters on offset.get() < insert_barrier.
// NOTE(review): in both search_ids bodies the timestamp filter now lives inside search_pk; the switch on
//   data_type still runs once per returned offset.
// NOTE(review): presumably the step-3 brute-force search consumes current_chunk_id for the chunks skipped by
//   the new break - that code is outside this diff, confirm.
// NOTE(review): "#include" with no header name and std::vector / std::get / std::make_unique /
//   std::dynamic_pointer_cast without template arguments look like angle-bracket contents lost in extraction -
//   TODO confirm against the original patch before applying.
diff --git a/internal/core/src/query/SearchOnGrowing.cpp b/internal/core/src/query/SearchOnGrowing.cpp index ce13c2a75a..9e4096886b 100644 --- a/internal/core/src/query/SearchOnGrowing.cpp +++ b/internal/core/src/query/SearchOnGrowing.cpp @@ -55,10 +55,13 @@ FloatSearch(const segcore::SegmentGrowingImpl& segment, AssertInfo(vec_ptr->get_size_per_chunk() == field_indexing.get_size_per_chunk(), "[FloatSearch]Chunk size of vector not equal to chunk size of field index"); + auto size_per_chunk = field_indexing.get_size_per_chunk(); for (int chunk_id = current_chunk_id; chunk_id < max_indexed_id; ++chunk_id) { - auto size_per_chunk = field_indexing.get_size_per_chunk(); - auto indexing = field_indexing.get_chunk_indexing(chunk_id); + if ((chunk_id + 1) * size_per_chunk > ins_barrier) { + break; + } + auto indexing = field_indexing.get_chunk_indexing(chunk_id); auto sub_view = bitset.subview(chunk_id * size_per_chunk, size_per_chunk); auto sub_qr = SearchOnIndex(search_dataset, *indexing, search_conf, sub_view); @@ -70,8 +73,8 @@ FloatSearch(const segcore::SegmentGrowingImpl& segment, } final_qr.merge(sub_qr); + current_chunk_id++; } - current_chunk_id = max_indexed_id; } // step 3: brute force search where small indexing is unavailable diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index f32f05414f..e74b73db87 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -14,6 +14,7 @@ #include #include #include +#include #include "common/Schema.h" #include "segcore/AckResponder.h" @@ -34,8 +35,49 @@ struct InsertRecord { // used for timestamps index of sealed segment TimestampIndex timestamp_index_; + // pks to row offset + Pk2OffsetType pk2offset_; + explicit InsertRecord(const Schema& schema, int64_t size_per_chunk); + std::vector + search_pk(const PkType pk, Timestamp timestamp) const { + std::vector res_offsets; + auto [iter_b, iter_e] = pk2offset_.equal_range(pk); + for 
(auto iter = iter_b; iter != iter_e; ++iter) { + auto offset = SegOffset(iter->second); + if (timestamps_[offset.get()] <= timestamp) { + res_offsets.push_back(offset); + } + } + + return res_offsets; + } + + std::vector + search_pk(const PkType pk, int64_t insert_barrier) const { + std::vector res_offsets; + auto [iter_b, iter_e] = pk2offset_.equal_range(pk); + for (auto iter = iter_b; iter != iter_e; ++iter) { + auto offset = SegOffset(iter->second); + if (offset.get() < insert_barrier) { + res_offsets.push_back(offset); + } + } + + return res_offsets; + } + + void + insert_pk(const PkType pk, int64_t offset) { + pk2offset_.insert(std::make_pair(pk, offset)); + } + + bool + empty_pks() const { + return pk2offset_.empty(); + } + // get field data without knowing the type VectorBase* get_field_data_base(FieldId field_id) const { diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index c4d770acae..ba956b3293 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -44,8 +44,7 @@ SegmentGrowingImpl::mask_with_delete(BitsetType& bitset, int64_t ins_barrier, Ti if (del_barrier == 0) { return; } - auto bitmap_holder = - get_deleted_bitmap(del_barrier, ins_barrier, deleted_record_, insert_record_, pk2offset_, timestamp); + auto bitmap_holder = get_deleted_bitmap(del_barrier, ins_barrier, deleted_record_, insert_record_, timestamp); if (!bitmap_holder || !bitmap_holder->bitmap_ptr) { return; } @@ -91,7 +90,7 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset, std::vector pks(size); ParsePksFromFieldData(pks, insert_data->fields_data(field_id_to_offset[field_id])); for (int i = 0; i < size; ++i) { - pk2offset_.insert(std::make_pair(pks[i], reserved_offset + i)); + insert_record_.insert_pk(pks[i], reserved_offset + i); } // step 5: update small indexes @@ -359,25 +358,22 @@ SegmentGrowingImpl::search_ids(const IdArray& id_array, Timestamp 
timestamp) con auto res_id_arr = std::make_unique(); std::vector res_offsets; for (auto pk : pks) { - auto [iter_b, iter_e] = pk2offset_.equal_range(pk); - for (auto iter = iter_b; iter != iter_e; ++iter) { - auto offset = SegOffset(iter->second); - if (insert_record_.timestamps_[offset.get()] <= timestamp) { - switch (data_type) { - case DataType::INT64: { - res_id_arr->mutable_int_id()->add_data(std::get(pk)); - break; - } - case DataType::VARCHAR: { - res_id_arr->mutable_str_id()->add_data(std::get(pk)); - break; - } - default: { - PanicInfo("unsupported type"); - } + auto segOffsets = insert_record_.search_pk(pk, timestamp); + for (auto offset : segOffsets) { + switch (data_type) { + case DataType::INT64: { + res_id_arr->mutable_int_id()->add_data(std::get(pk)); + break; + } + case DataType::VARCHAR: { + res_id_arr->mutable_str_id()->add_data(std::get(pk)); + break; + } + default: { + PanicInfo("unsupported type"); } - res_offsets.push_back(offset); } + res_offsets.push_back(offset); } } return {std::move(res_id_arr), std::move(res_offsets)}; diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 88d050155a..0b5ad587c5 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -227,8 +227,6 @@ class SegmentGrowingImpl : public SegmentGrowing { // deleted pks mutable DeletedRecord deleted_record_; - // pks to row offset - Pk2OffsetType pk2offset_; int64_t id_; private: diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 6f8ebd902f..bc82bada41 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -135,19 +135,19 @@ SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) { // reverse pk from scalar index and set pks to offset if (schema_->get_primary_field_id() == field_id) { AssertInfo(field_id.get() != -1, "Primary 
key is -1"); - AssertInfo(pk2offset_.empty(), "already exists"); + AssertInfo(insert_record_.empty_pks(), "already exists"); switch (field_meta.get_data_type()) { case DataType::INT64: { auto int64_index = std::dynamic_pointer_cast>(info.index); for (int i = 0; i < row_count; ++i) { - pk2offset_.insert(std::make_pair(int64_index->Reverse_Lookup(i), i)); + insert_record_.insert_pk(int64_index->Reverse_Lookup(i), i); } break; } case DataType::VARCHAR: { auto string_index = std::dynamic_pointer_cast>(info.index); for (int i = 0; i < row_count; ++i) { - pk2offset_.insert(std::make_pair(string_index->Reverse_Lookup(i), i)); + insert_record_.insert_pk(string_index->Reverse_Lookup(i), i); } break; } @@ -226,11 +226,11 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) { // set pks to offset if (schema_->get_primary_field_id() == field_id) { AssertInfo(field_id.get() != -1, "Primary key is -1"); - AssertInfo(pk2offset_.empty(), "already exists"); + AssertInfo(insert_record_.empty_pks(), "already exists"); std::vector pks(size); ParsePksFromFieldData(pks, *info.field_data); for (int i = 0; i < size; ++i) { - pk2offset_.insert(std::make_pair(pks[i], i)); + insert_record_.insert_pk(pks[i], i); } } @@ -333,8 +333,7 @@ SegmentSealedImpl::mask_with_delete(BitsetType& bitset, int64_t ins_barrier, Tim if (del_barrier == 0) { return; } - auto bitmap_holder = - get_deleted_bitmap(del_barrier, ins_barrier, deleted_record_, insert_record_, pk2offset_, timestamp); + auto bitmap_holder = get_deleted_bitmap(del_barrier, ins_barrier, deleted_record_, insert_record_, timestamp); if (!bitmap_holder || !bitmap_holder->bitmap_ptr) { return; } @@ -668,25 +667,22 @@ SegmentSealedImpl::search_ids(const IdArray& id_array, Timestamp timestamp) cons auto res_id_arr = std::make_unique(); std::vector res_offsets; for (auto pk : pks) { - auto [iter_b, iter_e] = pk2offset_.equal_range(pk); - for (auto iter = iter_b; iter != iter_e; ++iter) { - auto offset = SegOffset(iter->second); - if 
(insert_record_.timestamps_[offset.get()] <= timestamp) { - switch (data_type) { - case DataType::INT64: { - res_id_arr->mutable_int_id()->add_data(std::get(pk)); - break; - } - case DataType::VARCHAR: { - res_id_arr->mutable_str_id()->add_data(std::get(pk)); - break; - } - default: { - PanicInfo("unsupported type"); - } + auto segOffsets = insert_record_.search_pk(pk, timestamp); + for (auto offset : segOffsets) { + switch (data_type) { + case DataType::INT64: { + res_id_arr->mutable_int_id()->add_data(std::get(pk)); + break; + } + case DataType::VARCHAR: { + res_id_arr->mutable_str_id()->add_data(std::get(pk)); + break; + } + default: { + PanicInfo("unsupported type"); } - res_offsets.push_back(offset); } + res_offsets.push_back(offset); } } return {std::move(res_id_arr), std::move(res_offsets)}; diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index f97d7034cc..32b4ed7b26 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -190,9 +190,6 @@ class SegmentSealedImpl : public SegmentSealed { // deleted pks mutable DeletedRecord deleted_record_; - // pks to row offset - Pk2OffsetType pk2offset_; - SchemaPtr schema_; int64_t id_; }; diff --git a/internal/core/src/segcore/Utils.cpp b/internal/core/src/segcore/Utils.cpp index 7ab3cc2a42..560233a8d8 100644 --- a/internal/core/src/segcore/Utils.cpp +++ b/internal/core/src/segcore/Utils.cpp @@ -379,7 +379,6 @@ get_deleted_bitmap(int64_t del_barrier, int64_t insert_barrier, DeletedRecord& delete_record, const InsertRecord& insert_record, - const Pk2OffsetType& pk2offset, Timestamp query_timestamp) { auto old = delete_record.get_lru_entry(); // if insert_barrier and del_barrier have not changed, use cache data directly @@ -412,9 +411,9 @@ get_deleted_bitmap(int64_t del_barrier, // get pk in delete logs auto pk = delete_record.pks_[del_index]; // find insert data which has same pk - auto [iter_b, iter_e] = 
pk2offset.equal_range(pk); - for (auto iter = iter_b; iter != iter_e; ++iter) { - auto insert_row_offset = iter->second; + auto segOffsets = insert_record.search_pk(pk, insert_barrier); + for (auto offset : segOffsets) { + int64_t insert_row_offset = offset.get(); // for now, insert_barrier == insert count of segment, so this Assert will always work AssertInfo(insert_row_offset < insert_barrier, "Timestamp offset is larger than insert barrier"); diff --git a/internal/core/src/segcore/Utils.h b/internal/core/src/segcore/Utils.h index 2f628ed5a6..895944f755 100644 --- a/internal/core/src/segcore/Utils.h +++ b/internal/core/src/segcore/Utils.h @@ -87,7 +87,6 @@ get_deleted_bitmap(int64_t del_barrier, int64_t insert_barrier, DeletedRecord& delete_record, const InsertRecord& insert_record, - const Pk2OffsetType& pk2offset, Timestamp query_timestamp); std::unique_ptr diff --git a/internal/core/unittest/test_utils.cpp b/internal/core/unittest/test_utils.cpp index 88cb52de11..c687e0ad1f 100644 --- a/internal/core/unittest/test_utils.cpp +++ b/internal/core/unittest/test_utils.cpp @@ -66,7 +66,6 @@ TEST(Util, GetDeleteBitmap) { schema->set_primary_field_id(i64_fid); auto N = 10; - Pk2OffsetType pk2offset; InsertRecord insert_record(*schema, N); DeletedRecord delete_record; @@ -76,7 +75,7 @@ TEST(Util, GetDeleteBitmap) { for (int i = 0; i < N; ++i) { age_data[i] = 1; tss[i] = i + 1; - pk2offset.insert(std::make_pair(1, i)); + insert_record.insert_pk(1, i); } auto insert_offset = insert_record.reserved.fetch_add(N); insert_record.timestamps_.fill_chunk_data(tss.data(), N); @@ -95,8 +94,7 @@ TEST(Util, GetDeleteBitmap) { auto query_timestamp = tss[N - 1]; auto del_barrier = get_barrier(delete_record, query_timestamp); auto insert_barrier = get_barrier(insert_record, query_timestamp); - auto res_bitmap = - get_deleted_bitmap(del_barrier, insert_barrier, delete_record, insert_record, pk2offset, query_timestamp); + auto res_bitmap = get_deleted_bitmap(del_barrier, insert_barrier, 
delete_record, insert_record, query_timestamp); ASSERT_EQ(res_bitmap->bitmap_ptr->count(), 0); // test case insert repeated pk1 (ts = {1 ... N}) -> delete pk1 (ts = N) -> query (ts = N) @@ -108,13 +106,12 @@ TEST(Util, GetDeleteBitmap) { delete_record.ack_responder_.AddSegment(offset, offset + 1); del_barrier = get_barrier(delete_record, query_timestamp); - res_bitmap = - get_deleted_bitmap(del_barrier, insert_barrier, delete_record, insert_record, pk2offset, query_timestamp); + res_bitmap = get_deleted_bitmap(del_barrier, insert_barrier, delete_record, insert_record, query_timestamp); ASSERT_EQ(res_bitmap->bitmap_ptr->count(), N); // test case insert repeated pk1 (ts = {1 ... N}) -> delete pk1 (ts = N) -> query (ts = N/2) query_timestamp = tss[N - 1] / 2; del_barrier = get_barrier(delete_record, query_timestamp); - res_bitmap = get_deleted_bitmap(del_barrier, N, delete_record, insert_record, pk2offset, query_timestamp); + res_bitmap = get_deleted_bitmap(del_barrier, N, delete_record, insert_record, query_timestamp); ASSERT_EQ(res_bitmap->bitmap_ptr->count(), 0); }