mirror of https://github.com/milvus-io/milvus.git
Add Delete func in Segcore SegmentSealed (#10501)
Signed-off-by: fishpenguin <kun.yu@zilliz.com>pull/10525/head
parent
e18debb3cf
commit
7acb48919b
|
@ -60,11 +60,11 @@ class SegmentGrowing : public SegmentInternalInterface {
|
|||
const Timestamp* timestamps,
|
||||
const ColumnBasedRawData& values) = 0;
|
||||
|
||||
virtual int64_t
|
||||
PreDelete(int64_t size) = 0;
|
||||
// virtual int64_t
|
||||
// PreDelete(int64_t size) = 0;
|
||||
|
||||
virtual Status
|
||||
Delete(int64_t reserved_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) = 0;
|
||||
// virtual Status
|
||||
// Delete(int64_t reserved_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) = 0;
|
||||
|
||||
public:
|
||||
virtual ssize_t
|
||||
|
|
|
@ -52,6 +52,12 @@ class SegmentInterface {
|
|||
virtual const Schema&
|
||||
get_schema() const = 0;
|
||||
|
||||
virtual int64_t
|
||||
PreDelete(int64_t size) = 0;
|
||||
|
||||
virtual Status
|
||||
Delete(int64_t reserved_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) = 0;
|
||||
|
||||
virtual ~SegmentInterface() = default;
|
||||
|
||||
protected:
|
||||
|
|
|
@ -36,8 +36,6 @@ class SegmentSealed : public SegmentInternalInterface {
|
|||
HasIndex(FieldId field_id) const = 0;
|
||||
virtual bool
|
||||
HasFieldData(FieldId field_id) const = 0;
|
||||
virtual void
|
||||
Delete(int64_t row_count, const int64_t* uids_raw, const Timestamp* timestamps_raw) = 0;
|
||||
};
|
||||
|
||||
using SegmentSealedPtr = std::unique_ptr<SegmentSealed>;
|
||||
|
|
|
@ -26,6 +26,12 @@ get_bit(const boost::dynamic_bitset<>& bitset, FieldOffset field_offset) {
|
|||
return bitset[field_offset.get()];
|
||||
}
|
||||
|
||||
int64_t
|
||||
SegmentSealedImpl::PreDelete(int64_t size) {
|
||||
auto reserved_begin = deleted_record_.reserved.fetch_add(size);
|
||||
return reserved_begin;
|
||||
}
|
||||
|
||||
void
|
||||
SegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) {
|
||||
// NOTE: lock only when data is ready to avoid starvation
|
||||
|
@ -170,6 +176,7 @@ SegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
|
|||
deleted_record_.uids_.set_data(0, primary_keys, size);
|
||||
deleted_record_.timestamps_.set_data(0, timestamps, size);
|
||||
deleted_record_.ack_responder_.AddSegment(0, size);
|
||||
deleted_record_.reserved.fetch_add(size);
|
||||
deleted_record_.record_size_ = size;
|
||||
}
|
||||
|
||||
|
@ -246,7 +253,7 @@ SegmentSealedImpl::get_deleted_bitmap(int64_t del_barrier,
|
|||
// Sealed segment only has one chunk with chunk_id 0
|
||||
auto span = deleted_record_.uids_.get_span_base(0);
|
||||
auto uids_ptr = reinterpret_cast<const idx_t*>(span.data());
|
||||
auto del_size = deleted_record_.record_size_;
|
||||
auto del_size = deleted_record_.reserved.load();
|
||||
std::vector<idx_t> ids(del_size);
|
||||
std::copy_n(uids_ptr, del_size, ids.data());
|
||||
|
||||
|
@ -542,8 +549,11 @@ SegmentSealedImpl::search_ids(const IdArray& id_array, Timestamp timestamp) cons
|
|||
return primary_key_index_->do_search_ids(id_array);
|
||||
}
|
||||
|
||||
void
|
||||
SegmentSealedImpl::Delete(int64_t row_count, const int64_t* uids_raw, const Timestamp* timestamps_raw) {
|
||||
Status
|
||||
SegmentSealedImpl::Delete(int64_t reserved_offset,
|
||||
int64_t row_count,
|
||||
const int64_t* uids_raw,
|
||||
const Timestamp* timestamps_raw) {
|
||||
std::vector<std::tuple<Timestamp, idx_t>> ordering(row_count);
|
||||
for (int i = 0; i < row_count; i++) {
|
||||
ordering[i] = std::make_tuple(timestamps_raw[i], uids_raw[i]);
|
||||
|
@ -558,10 +568,10 @@ SegmentSealedImpl::Delete(int64_t row_count, const int64_t* uids_raw, const Time
|
|||
src_uids[i] = uid;
|
||||
}
|
||||
auto current_size = deleted_record_.record_size_;
|
||||
deleted_record_.timestamps_.set_data(current_size, src_timestamps.data(), row_count);
|
||||
deleted_record_.uids_.set_data(current_size, src_uids.data(), row_count);
|
||||
deleted_record_.ack_responder_.AddSegment(current_size, row_count);
|
||||
return;
|
||||
deleted_record_.timestamps_.set_data(reserved_offset, src_timestamps.data(), row_count);
|
||||
deleted_record_.uids_.set_data(reserved_offset, src_uids.data(), row_count);
|
||||
deleted_record_.ack_responder_.AddSegment(reserved_offset, row_count);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
std::vector<SegOffset>
|
||||
|
|
|
@ -78,6 +78,12 @@ class SegmentSealedImpl : public SegmentSealed {
|
|||
std::string
|
||||
debug() const override;
|
||||
|
||||
int64_t
|
||||
PreDelete(int64_t size) override;
|
||||
|
||||
Status
|
||||
Delete(int64_t reserved_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) override;
|
||||
|
||||
protected:
|
||||
// blob and row_count
|
||||
SpanBase
|
||||
|
@ -154,9 +160,6 @@ class SegmentSealedImpl : public SegmentSealed {
|
|||
std::vector<SegOffset>
|
||||
search_ids(const boost::dynamic_bitset<>& view, Timestamp timestamp) const override;
|
||||
|
||||
void
|
||||
Delete(int64_t row_count, const int64_t* uids_raw, const Timestamp* timestamps_raw) override;
|
||||
|
||||
// virtual void
|
||||
// build_index_if_primary_key(FieldId field_id);
|
||||
|
||||
|
|
|
@ -151,7 +151,7 @@ Delete(CSegmentInterface c_segment,
|
|||
int64_t size,
|
||||
const int64_t* row_ids,
|
||||
const uint64_t* timestamps) {
|
||||
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
|
||||
auto segment = (milvus::segcore::SegmentInterface*)c_segment;
|
||||
|
||||
try {
|
||||
auto res = segment->Delete(reserved_offset, size, row_ids, timestamps);
|
||||
|
@ -163,7 +163,7 @@ Delete(CSegmentInterface c_segment,
|
|||
|
||||
int64_t
|
||||
PreDelete(CSegmentInterface c_segment, int64_t size) {
|
||||
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
|
||||
auto segment = (milvus::segcore::SegmentInterface*)c_segment;
|
||||
|
||||
return segment->PreDelete(size);
|
||||
}
|
||||
|
|
|
@ -384,5 +384,8 @@ TEST(Sealed, Delete) {
|
|||
int64_t new_count = 3;
|
||||
std::vector<idx_t> new_pks{6, 7, 8};
|
||||
std::vector<idx_t> new_timestamps{10, 10, 10};
|
||||
segment->Delete(new_count, reinterpret_cast<const int64_t*>(new_pks.data()), reinterpret_cast<const Timestamp*>(new_timestamps.data()));
|
||||
auto reserved_offset = segment->PreDelete(new_count);
|
||||
ASSERT_EQ(reserved_offset, row_count);
|
||||
segment->Delete(reserved_offset, new_count, reinterpret_cast<const int64_t*>(new_pks.data()),
|
||||
reinterpret_cast<const Timestamp*>(new_timestamps.data()));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue