From 7acb48919b213415ade02009a04326373b54d0e4 Mon Sep 17 00:00:00 2001
From: yukun
Date: Sun, 24 Oct 2021 14:19:10 +0800
Subject: [PATCH] Add Delete func in Segcore SegmentSealed (#10501)

Signed-off-by: fishpenguin
---
 internal/core/src/segcore/SegmentGrowing.h    |  8 +++----
 internal/core/src/segcore/SegmentInterface.h  |  6 +++++
 internal/core/src/segcore/SegmentSealed.h     |  2 --
 .../core/src/segcore/SegmentSealedImpl.cpp    | 24 +++++++++++++------
 internal/core/src/segcore/SegmentSealedImpl.h |  9 ++++---
 internal/core/src/segcore/segment_c.cpp       |  4 ++--
 internal/core/unittest/test_sealed.cpp        |  5 +++-
 7 files changed, 39 insertions(+), 19 deletions(-)

diff --git a/internal/core/src/segcore/SegmentGrowing.h b/internal/core/src/segcore/SegmentGrowing.h
index 7207a660ec..4b8e950e3a 100644
--- a/internal/core/src/segcore/SegmentGrowing.h
+++ b/internal/core/src/segcore/SegmentGrowing.h
@@ -60,11 +60,11 @@ class SegmentGrowing : public SegmentInternalInterface {
            const Timestamp* timestamps,
            const ColumnBasedRawData& values) = 0;
 
-    virtual int64_t
-    PreDelete(int64_t size) = 0;
+    // virtual int64_t
+    // PreDelete(int64_t size) = 0;
 
-    virtual Status
-    Delete(int64_t reserved_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) = 0;
+    // virtual Status
+    // Delete(int64_t reserved_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) = 0;
 
  public:
     virtual ssize_t
diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h
index e2af536410..b3c18fc227 100644
--- a/internal/core/src/segcore/SegmentInterface.h
+++ b/internal/core/src/segcore/SegmentInterface.h
@@ -52,6 +52,12 @@ class SegmentInterface {
     virtual const Schema&
     get_schema() const = 0;
 
+    virtual int64_t
+    PreDelete(int64_t size) = 0;
+
+    virtual Status
+    Delete(int64_t reserved_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) = 0;
+
     virtual ~SegmentInterface() = default;
 
  protected:
diff --git a/internal/core/src/segcore/SegmentSealed.h b/internal/core/src/segcore/SegmentSealed.h
index 11916f881c..12a3b23a52 100644
--- a/internal/core/src/segcore/SegmentSealed.h
+++ b/internal/core/src/segcore/SegmentSealed.h
@@ -36,8 +36,6 @@ class SegmentSealed : public SegmentInternalInterface {
     HasIndex(FieldId field_id) const = 0;
     virtual bool
     HasFieldData(FieldId field_id) const = 0;
-    virtual void
-    Delete(int64_t row_count, const int64_t* uids_raw, const Timestamp* timestamps_raw) = 0;
 };
 
 using SegmentSealedPtr = std::unique_ptr;
diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp
index 1c8377b454..337a985d72 100644
--- a/internal/core/src/segcore/SegmentSealedImpl.cpp
+++ b/internal/core/src/segcore/SegmentSealedImpl.cpp
@@ -26,6 +26,12 @@ get_bit(const boost::dynamic_bitset<>& bitset, FieldOffset field_offset) {
     return bitset[field_offset.get()];
 }
 
+int64_t
+SegmentSealedImpl::PreDelete(int64_t size) {
+    auto reserved_begin = deleted_record_.reserved.fetch_add(size);
+    return reserved_begin;
+}
+
 void
 SegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) {
     // NOTE: lock only when data is ready to avoid starvation
@@ -170,6 +176,7 @@ SegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
     deleted_record_.uids_.set_data(0, primary_keys, size);
     deleted_record_.timestamps_.set_data(0, timestamps, size);
     deleted_record_.ack_responder_.AddSegment(0, size);
+    deleted_record_.reserved.fetch_add(size);
     deleted_record_.record_size_ = size;
 }
 
@@ -246,7 +253,7 @@ SegmentSealedImpl::get_deleted_bitmap(int64_t del_barrier,
     // Sealed segment only has one chunk with chunk_id 0
     auto span = deleted_record_.uids_.get_span_base(0);
     auto uids_ptr = reinterpret_cast(span.data());
-    auto del_size = deleted_record_.record_size_;
+    auto del_size = deleted_record_.reserved.load();
     std::vector ids(del_size);
     std::copy_n(uids_ptr, del_size, ids.data());
 
@@ -542,8 +549,11 @@ SegmentSealedImpl::search_ids(const IdArray& id_array, Timestamp timestamp) cons
     return primary_key_index_->do_search_ids(id_array);
 }
 
-void
-SegmentSealedImpl::Delete(int64_t row_count, const int64_t* uids_raw, const Timestamp* timestamps_raw) {
+Status
+SegmentSealedImpl::Delete(int64_t reserved_offset,
+                          int64_t row_count,
+                          const int64_t* uids_raw,
+                          const Timestamp* timestamps_raw) {
     std::vector> ordering(row_count);
     for (int i = 0; i < row_count; i++) {
         ordering[i] = std::make_tuple(timestamps_raw[i], uids_raw[i]);
@@ -558,10 +568,10 @@ SegmentSealedImpl::Delete(int64_t row_count, const int64_t* uids_raw, const Time
         src_uids[i] = uid;
     }
     auto current_size = deleted_record_.record_size_;
-    deleted_record_.timestamps_.set_data(current_size, src_timestamps.data(), row_count);
-    deleted_record_.uids_.set_data(current_size, src_uids.data(), row_count);
-    deleted_record_.ack_responder_.AddSegment(current_size, row_count);
-    return;
+    deleted_record_.timestamps_.set_data(reserved_offset, src_timestamps.data(), row_count);
+    deleted_record_.uids_.set_data(reserved_offset, src_uids.data(), row_count);
+    deleted_record_.ack_responder_.AddSegment(reserved_offset, row_count);
+    return Status::OK();
 }
 
 std::vector
diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h
index 9db3e0affd..f5c01d03c8 100644
--- a/internal/core/src/segcore/SegmentSealedImpl.h
+++ b/internal/core/src/segcore/SegmentSealedImpl.h
@@ -78,6 +78,12 @@ class SegmentSealedImpl : public SegmentSealed {
     std::string
     debug() const override;
 
+    int64_t
+    PreDelete(int64_t size) override;
+
+    Status
+    Delete(int64_t reserved_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) override;
+
  protected:
     // blob and row_count
     SpanBase
@@ -154,9 +160,6 @@ class SegmentSealedImpl : public SegmentSealed {
     std::vector
     search_ids(const boost::dynamic_bitset<>& view, Timestamp timestamp) const override;
 
-    void
-    Delete(int64_t row_count, const int64_t* uids_raw, const Timestamp* timestamps_raw) override;
-
     // virtual void
     // build_index_if_primary_key(FieldId field_id);
 
diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp
index 20d9f0bd5b..85fe691de4 100644
--- a/internal/core/src/segcore/segment_c.cpp
+++ b/internal/core/src/segcore/segment_c.cpp
@@ -151,7 +151,7 @@ Delete(CSegmentInterface c_segment,
        int64_t size,
        const int64_t* row_ids,
        const uint64_t* timestamps) {
-    auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
+    auto segment = (milvus::segcore::SegmentInterface*)c_segment;
 
     try {
         auto res = segment->Delete(reserved_offset, size, row_ids, timestamps);
@@ -163,7 +163,7 @@ Delete(CSegmentInterface c_segment,
 
 int64_t
 PreDelete(CSegmentInterface c_segment, int64_t size) {
-    auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
+    auto segment = (milvus::segcore::SegmentInterface*)c_segment;
 
     return segment->PreDelete(size);
 }
diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp
index 253f4d3368..268d852111 100644
--- a/internal/core/unittest/test_sealed.cpp
+++ b/internal/core/unittest/test_sealed.cpp
@@ -384,5 +384,8 @@ TEST(Sealed, Delete) {
     int64_t new_count = 3;
     std::vector new_pks{6, 7, 8};
     std::vector new_timestamps{10, 10, 10};
-    segment->Delete(new_count, reinterpret_cast(new_pks.data()), reinterpret_cast(new_timestamps.data()));
+    auto reserved_offset = segment->PreDelete(new_count);
+    ASSERT_EQ(reserved_offset, row_count);
+    segment->Delete(reserved_offset, new_count, reinterpret_cast(new_pks.data()),
+                    reinterpret_cast(new_timestamps.data()));
 }