mirror of https://github.com/milvus-io/milvus.git
Fix segOffset greater than insert barrier when marking delete (#17444)
Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/17501/head
parent
2b298e4230
commit
36ad989590
|
@ -55,10 +55,13 @@ FloatSearch(const segcore::SegmentGrowingImpl& segment,
|
|||
AssertInfo(vec_ptr->get_size_per_chunk() == field_indexing.get_size_per_chunk(),
|
||||
"[FloatSearch]Chunk size of vector not equal to chunk size of field index");
|
||||
|
||||
auto size_per_chunk = field_indexing.get_size_per_chunk();
|
||||
for (int chunk_id = current_chunk_id; chunk_id < max_indexed_id; ++chunk_id) {
|
||||
auto size_per_chunk = field_indexing.get_size_per_chunk();
|
||||
auto indexing = field_indexing.get_chunk_indexing(chunk_id);
|
||||
if ((chunk_id + 1) * size_per_chunk > ins_barrier) {
|
||||
break;
|
||||
}
|
||||
|
||||
auto indexing = field_indexing.get_chunk_indexing(chunk_id);
|
||||
auto sub_view = bitset.subview(chunk_id * size_per_chunk, size_per_chunk);
|
||||
auto sub_qr = SearchOnIndex(search_dataset, *indexing, search_conf, sub_view);
|
||||
|
||||
|
@ -70,8 +73,8 @@ FloatSearch(const segcore::SegmentGrowingImpl& segment,
|
|||
}
|
||||
|
||||
final_qr.merge(sub_qr);
|
||||
current_chunk_id++;
|
||||
}
|
||||
current_chunk_id = max_indexed_id;
|
||||
}
|
||||
|
||||
// step 3: brute force search where small indexing is unavailable
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
#include <memory>
|
||||
#include <vector>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
|
||||
#include "common/Schema.h"
|
||||
#include "segcore/AckResponder.h"
|
||||
|
@ -34,8 +35,49 @@ struct InsertRecord {
|
|||
// used for timestamps index of sealed segment
|
||||
TimestampIndex timestamp_index_;
|
||||
|
||||
// pks to row offset
|
||||
Pk2OffsetType pk2offset_;
|
||||
|
||||
explicit InsertRecord(const Schema& schema, int64_t size_per_chunk);
|
||||
|
||||
std::vector<SegOffset>
|
||||
search_pk(const PkType pk, Timestamp timestamp) const {
|
||||
std::vector<SegOffset> res_offsets;
|
||||
auto [iter_b, iter_e] = pk2offset_.equal_range(pk);
|
||||
for (auto iter = iter_b; iter != iter_e; ++iter) {
|
||||
auto offset = SegOffset(iter->second);
|
||||
if (timestamps_[offset.get()] <= timestamp) {
|
||||
res_offsets.push_back(offset);
|
||||
}
|
||||
}
|
||||
|
||||
return res_offsets;
|
||||
}
|
||||
|
||||
std::vector<SegOffset>
|
||||
search_pk(const PkType pk, int64_t insert_barrier) const {
|
||||
std::vector<SegOffset> res_offsets;
|
||||
auto [iter_b, iter_e] = pk2offset_.equal_range(pk);
|
||||
for (auto iter = iter_b; iter != iter_e; ++iter) {
|
||||
auto offset = SegOffset(iter->second);
|
||||
if (offset.get() < insert_barrier) {
|
||||
res_offsets.push_back(offset);
|
||||
}
|
||||
}
|
||||
|
||||
return res_offsets;
|
||||
}
|
||||
|
||||
void
|
||||
insert_pk(const PkType pk, int64_t offset) {
|
||||
pk2offset_.insert(std::make_pair(pk, offset));
|
||||
}
|
||||
|
||||
bool
|
||||
empty_pks() const {
|
||||
return pk2offset_.empty();
|
||||
}
|
||||
|
||||
// get field data without knowing the type
|
||||
VectorBase*
|
||||
get_field_data_base(FieldId field_id) const {
|
||||
|
|
|
@ -44,8 +44,7 @@ SegmentGrowingImpl::mask_with_delete(BitsetType& bitset, int64_t ins_barrier, Ti
|
|||
if (del_barrier == 0) {
|
||||
return;
|
||||
}
|
||||
auto bitmap_holder =
|
||||
get_deleted_bitmap(del_barrier, ins_barrier, deleted_record_, insert_record_, pk2offset_, timestamp);
|
||||
auto bitmap_holder = get_deleted_bitmap(del_barrier, ins_barrier, deleted_record_, insert_record_, timestamp);
|
||||
if (!bitmap_holder || !bitmap_holder->bitmap_ptr) {
|
||||
return;
|
||||
}
|
||||
|
@ -91,7 +90,7 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
|
|||
std::vector<PkType> pks(size);
|
||||
ParsePksFromFieldData(pks, insert_data->fields_data(field_id_to_offset[field_id]));
|
||||
for (int i = 0; i < size; ++i) {
|
||||
pk2offset_.insert(std::make_pair(pks[i], reserved_offset + i));
|
||||
insert_record_.insert_pk(pks[i], reserved_offset + i);
|
||||
}
|
||||
|
||||
// step 5: update small indexes
|
||||
|
@ -359,25 +358,22 @@ SegmentGrowingImpl::search_ids(const IdArray& id_array, Timestamp timestamp) con
|
|||
auto res_id_arr = std::make_unique<IdArray>();
|
||||
std::vector<SegOffset> res_offsets;
|
||||
for (auto pk : pks) {
|
||||
auto [iter_b, iter_e] = pk2offset_.equal_range(pk);
|
||||
for (auto iter = iter_b; iter != iter_e; ++iter) {
|
||||
auto offset = SegOffset(iter->second);
|
||||
if (insert_record_.timestamps_[offset.get()] <= timestamp) {
|
||||
switch (data_type) {
|
||||
case DataType::INT64: {
|
||||
res_id_arr->mutable_int_id()->add_data(std::get<int64_t>(pk));
|
||||
break;
|
||||
}
|
||||
case DataType::VARCHAR: {
|
||||
res_id_arr->mutable_str_id()->add_data(std::get<std::string>(pk));
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
PanicInfo("unsupported type");
|
||||
}
|
||||
auto segOffsets = insert_record_.search_pk(pk, timestamp);
|
||||
for (auto offset : segOffsets) {
|
||||
switch (data_type) {
|
||||
case DataType::INT64: {
|
||||
res_id_arr->mutable_int_id()->add_data(std::get<int64_t>(pk));
|
||||
break;
|
||||
}
|
||||
case DataType::VARCHAR: {
|
||||
res_id_arr->mutable_str_id()->add_data(std::get<std::string>(pk));
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
PanicInfo("unsupported type");
|
||||
}
|
||||
res_offsets.push_back(offset);
|
||||
}
|
||||
res_offsets.push_back(offset);
|
||||
}
|
||||
}
|
||||
return {std::move(res_id_arr), std::move(res_offsets)};
|
||||
|
|
|
@ -227,8 +227,6 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
|||
// deleted pks
|
||||
mutable DeletedRecord deleted_record_;
|
||||
|
||||
// pks to row offset
|
||||
Pk2OffsetType pk2offset_;
|
||||
int64_t id_;
|
||||
|
||||
private:
|
||||
|
|
|
@ -135,19 +135,19 @@ SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
|
|||
// reverse pk from scalar index and set pks to offset
|
||||
if (schema_->get_primary_field_id() == field_id) {
|
||||
AssertInfo(field_id.get() != -1, "Primary key is -1");
|
||||
AssertInfo(pk2offset_.empty(), "already exists");
|
||||
AssertInfo(insert_record_.empty_pks(), "already exists");
|
||||
switch (field_meta.get_data_type()) {
|
||||
case DataType::INT64: {
|
||||
auto int64_index = std::dynamic_pointer_cast<scalar::ScalarIndex<int64_t>>(info.index);
|
||||
for (int i = 0; i < row_count; ++i) {
|
||||
pk2offset_.insert(std::make_pair(int64_index->Reverse_Lookup(i), i));
|
||||
insert_record_.insert_pk(int64_index->Reverse_Lookup(i), i);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case DataType::VARCHAR: {
|
||||
auto string_index = std::dynamic_pointer_cast<scalar::ScalarIndex<std::string>>(info.index);
|
||||
for (int i = 0; i < row_count; ++i) {
|
||||
pk2offset_.insert(std::make_pair(string_index->Reverse_Lookup(i), i));
|
||||
insert_record_.insert_pk(string_index->Reverse_Lookup(i), i);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -226,11 +226,11 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) {
|
|||
// set pks to offset
|
||||
if (schema_->get_primary_field_id() == field_id) {
|
||||
AssertInfo(field_id.get() != -1, "Primary key is -1");
|
||||
AssertInfo(pk2offset_.empty(), "already exists");
|
||||
AssertInfo(insert_record_.empty_pks(), "already exists");
|
||||
std::vector<PkType> pks(size);
|
||||
ParsePksFromFieldData(pks, *info.field_data);
|
||||
for (int i = 0; i < size; ++i) {
|
||||
pk2offset_.insert(std::make_pair(pks[i], i));
|
||||
insert_record_.insert_pk(pks[i], i);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -333,8 +333,7 @@ SegmentSealedImpl::mask_with_delete(BitsetType& bitset, int64_t ins_barrier, Tim
|
|||
if (del_barrier == 0) {
|
||||
return;
|
||||
}
|
||||
auto bitmap_holder =
|
||||
get_deleted_bitmap(del_barrier, ins_barrier, deleted_record_, insert_record_, pk2offset_, timestamp);
|
||||
auto bitmap_holder = get_deleted_bitmap(del_barrier, ins_barrier, deleted_record_, insert_record_, timestamp);
|
||||
if (!bitmap_holder || !bitmap_holder->bitmap_ptr) {
|
||||
return;
|
||||
}
|
||||
|
@ -668,25 +667,22 @@ SegmentSealedImpl::search_ids(const IdArray& id_array, Timestamp timestamp) cons
|
|||
auto res_id_arr = std::make_unique<IdArray>();
|
||||
std::vector<SegOffset> res_offsets;
|
||||
for (auto pk : pks) {
|
||||
auto [iter_b, iter_e] = pk2offset_.equal_range(pk);
|
||||
for (auto iter = iter_b; iter != iter_e; ++iter) {
|
||||
auto offset = SegOffset(iter->second);
|
||||
if (insert_record_.timestamps_[offset.get()] <= timestamp) {
|
||||
switch (data_type) {
|
||||
case DataType::INT64: {
|
||||
res_id_arr->mutable_int_id()->add_data(std::get<int64_t>(pk));
|
||||
break;
|
||||
}
|
||||
case DataType::VARCHAR: {
|
||||
res_id_arr->mutable_str_id()->add_data(std::get<std::string>(pk));
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
PanicInfo("unsupported type");
|
||||
}
|
||||
auto segOffsets = insert_record_.search_pk(pk, timestamp);
|
||||
for (auto offset : segOffsets) {
|
||||
switch (data_type) {
|
||||
case DataType::INT64: {
|
||||
res_id_arr->mutable_int_id()->add_data(std::get<int64_t>(pk));
|
||||
break;
|
||||
}
|
||||
case DataType::VARCHAR: {
|
||||
res_id_arr->mutable_str_id()->add_data(std::get<std::string>(pk));
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
PanicInfo("unsupported type");
|
||||
}
|
||||
res_offsets.push_back(offset);
|
||||
}
|
||||
res_offsets.push_back(offset);
|
||||
}
|
||||
}
|
||||
return {std::move(res_id_arr), std::move(res_offsets)};
|
||||
|
|
|
@ -190,9 +190,6 @@ class SegmentSealedImpl : public SegmentSealed {
|
|||
// deleted pks
|
||||
mutable DeletedRecord deleted_record_;
|
||||
|
||||
// pks to row offset
|
||||
Pk2OffsetType pk2offset_;
|
||||
|
||||
SchemaPtr schema_;
|
||||
int64_t id_;
|
||||
};
|
||||
|
|
|
@ -379,7 +379,6 @@ get_deleted_bitmap(int64_t del_barrier,
|
|||
int64_t insert_barrier,
|
||||
DeletedRecord& delete_record,
|
||||
const InsertRecord& insert_record,
|
||||
const Pk2OffsetType& pk2offset,
|
||||
Timestamp query_timestamp) {
|
||||
auto old = delete_record.get_lru_entry();
|
||||
// if insert_barrier and del_barrier have not changed, use cache data directly
|
||||
|
@ -412,9 +411,9 @@ get_deleted_bitmap(int64_t del_barrier,
|
|||
// get pk in delete logs
|
||||
auto pk = delete_record.pks_[del_index];
|
||||
// find insert data which has same pk
|
||||
auto [iter_b, iter_e] = pk2offset.equal_range(pk);
|
||||
for (auto iter = iter_b; iter != iter_e; ++iter) {
|
||||
auto insert_row_offset = iter->second;
|
||||
auto segOffsets = insert_record.search_pk(pk, insert_barrier);
|
||||
for (auto offset : segOffsets) {
|
||||
int64_t insert_row_offset = offset.get();
|
||||
// for now, insert_barrier == insert count of segment, so this Assert will always work
|
||||
AssertInfo(insert_row_offset < insert_barrier, "Timestamp offset is larger than insert barrier");
|
||||
|
||||
|
|
|
@ -87,7 +87,6 @@ get_deleted_bitmap(int64_t del_barrier,
|
|||
int64_t insert_barrier,
|
||||
DeletedRecord& delete_record,
|
||||
const InsertRecord& insert_record,
|
||||
const Pk2OffsetType& pk2offset,
|
||||
Timestamp query_timestamp);
|
||||
|
||||
std::unique_ptr<DataArray>
|
||||
|
|
|
@ -66,7 +66,6 @@ TEST(Util, GetDeleteBitmap) {
|
|||
schema->set_primary_field_id(i64_fid);
|
||||
auto N = 10;
|
||||
|
||||
Pk2OffsetType pk2offset;
|
||||
InsertRecord insert_record(*schema, N);
|
||||
DeletedRecord delete_record;
|
||||
|
||||
|
@ -76,7 +75,7 @@ TEST(Util, GetDeleteBitmap) {
|
|||
for (int i = 0; i < N; ++i) {
|
||||
age_data[i] = 1;
|
||||
tss[i] = i + 1;
|
||||
pk2offset.insert(std::make_pair(1, i));
|
||||
insert_record.insert_pk(1, i);
|
||||
}
|
||||
auto insert_offset = insert_record.reserved.fetch_add(N);
|
||||
insert_record.timestamps_.fill_chunk_data(tss.data(), N);
|
||||
|
@ -95,8 +94,7 @@ TEST(Util, GetDeleteBitmap) {
|
|||
auto query_timestamp = tss[N - 1];
|
||||
auto del_barrier = get_barrier(delete_record, query_timestamp);
|
||||
auto insert_barrier = get_barrier(insert_record, query_timestamp);
|
||||
auto res_bitmap =
|
||||
get_deleted_bitmap(del_barrier, insert_barrier, delete_record, insert_record, pk2offset, query_timestamp);
|
||||
auto res_bitmap = get_deleted_bitmap(del_barrier, insert_barrier, delete_record, insert_record, query_timestamp);
|
||||
ASSERT_EQ(res_bitmap->bitmap_ptr->count(), 0);
|
||||
|
||||
// test case insert repeated pk1 (ts = {1 ... N}) -> delete pk1 (ts = N) -> query (ts = N)
|
||||
|
@ -108,13 +106,12 @@ TEST(Util, GetDeleteBitmap) {
|
|||
delete_record.ack_responder_.AddSegment(offset, offset + 1);
|
||||
|
||||
del_barrier = get_barrier(delete_record, query_timestamp);
|
||||
res_bitmap =
|
||||
get_deleted_bitmap(del_barrier, insert_barrier, delete_record, insert_record, pk2offset, query_timestamp);
|
||||
res_bitmap = get_deleted_bitmap(del_barrier, insert_barrier, delete_record, insert_record, query_timestamp);
|
||||
ASSERT_EQ(res_bitmap->bitmap_ptr->count(), N);
|
||||
|
||||
// test case insert repeated pk1 (ts = {1 ... N}) -> delete pk1 (ts = N) -> query (ts = N/2)
|
||||
query_timestamp = tss[N - 1] / 2;
|
||||
del_barrier = get_barrier(delete_record, query_timestamp);
|
||||
res_bitmap = get_deleted_bitmap(del_barrier, N, delete_record, insert_record, pk2offset, query_timestamp);
|
||||
res_bitmap = get_deleted_bitmap(del_barrier, N, delete_record, insert_record, query_timestamp);
|
||||
ASSERT_EQ(res_bitmap->bitmap_ptr->count(), 0);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue