Support delete in SegmentSealed (#10181)

Signed-off-by: fishpenguin <kun.yu@zilliz.com>
pull/10429/head
yukun 2021-10-22 13:11:11 +08:00 committed by GitHub
parent 5a88ce29b0
commit fe1927c05d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 269 additions and 3 deletions

View File

@ -29,3 +29,9 @@ struct LoadFieldDataInfo {
const void* blob = nullptr;
int64_t row_count = -1;
};
// C++-side descriptor for a batch of pre-existing deletions to load into a
// sealed segment (mirrors CLoadDeletedRecordInfo from the C API).
// The pointers are borrowed views over caller-owned arrays; this struct
// takes no ownership.
struct LoadDeletedRecordInfo {
// Array of deletion timestamps, one per deleted row (element type resolved
// by the consumer via reinterpret_cast to const Timestamp*).
const void* timestamps = nullptr;
// Array of deleted primary keys, one per deleted row (cast to const idx_t*
// by the consumer).
const void* primary_keys = nullptr;
// Number of entries in both arrays; -1 means "not populated".
int64_t row_count = -1;
};

View File

@ -47,6 +47,12 @@ typedef struct CLoadFieldDataInfo {
int64_t row_count;
} CLoadFieldDataInfo;
// C-API mirror of LoadDeletedRecordInfo, passed across the CGo boundary.
// Field order and types must stay in sync with the C++ struct, since the
// wrapper copies fields positionally.
typedef struct CLoadDeletedRecordInfo {
// Deletion timestamps array (caller-owned).
void* timestamps;
// Deleted primary keys array (caller-owned).
void* primary_keys;
// Number of entries in both arrays.
int64_t row_count;
} CLoadDeletedRecordInfo;
typedef struct CProtoResult {
CStatus status;
CProto proto;

View File

@ -60,6 +60,7 @@ struct DeletedRecord {
AckResponder ack_responder_;
ConcurrentVector<Timestamp> timestamps_;
ConcurrentVector<idx_t> uids_;
int64_t record_size_ = 0;
private:
std::shared_ptr<TmpBitmap> lru_;

View File

@ -47,6 +47,33 @@ ScalarIndexVector::do_search_ids(const IdArray& ids) const {
}
return {std::move(res_ids), std::move(dst_offsets)};
}
std::pair<std::vector<idx_t>, std::vector<SegOffset>>
ScalarIndexVector::do_search_ids(const std::vector<idx_t>& ids) const {
std::vector<SegOffset> dst_offsets;
std::vector<idx_t> dst_ids;
for (auto id : ids) {
using Pair = std::pair<T, SegOffset>;
auto [iter_beg, iter_end] =
std::equal_range(mapping_.begin(), mapping_.end(), std::make_pair(id, SegOffset(0)),
[](const Pair& left, const Pair& right) { return left.first < right.first; });
if (iter_beg == iter_end) {
// no data
continue;
}
// TODO: for repeated key, decide the final offset with Timestamp
// no repeated key, simplified logic
AssertInfo(iter_beg + 1 == iter_end, "There are no repeated keys in more than one results");
auto [entry_id, entry_offset] = *iter_beg;
dst_ids.push_back(entry_id);
dst_offsets.push_back(entry_offset);
}
return {std::move(dst_ids), std::move(dst_offsets)};
}
void
ScalarIndexVector::append_data(const ScalarIndexVector::T* ids, int64_t count, SegOffset base) {
for (int64_t i = 0; i < count; ++i) {

View File

@ -25,6 +25,8 @@ class ScalarIndexBase {
public:
virtual std::pair<std::unique_ptr<IdArray>, std::vector<SegOffset>>
do_search_ids(const IdArray& ids) const = 0;
virtual std::pair<std::vector<idx_t>, std::vector<SegOffset>>
do_search_ids(const std::vector<idx_t>& ids) const = 0;
virtual ~ScalarIndexBase() = default;
virtual std::string
debug() const = 0;
@ -44,6 +46,9 @@ class ScalarIndexVector : public ScalarIndexBase {
std::pair<std::unique_ptr<IdArray>, std::vector<SegOffset>>
do_search_ids(const IdArray& ids) const override;
std::pair<std::vector<idx_t>, std::vector<SegOffset>>
do_search_ids(const std::vector<idx_t>& ids) const override;
std::string
debug() const override {
std::string dbg_str;

View File

@ -27,6 +27,8 @@ class SegmentSealed : public SegmentInternalInterface {
virtual void
LoadFieldData(const LoadFieldDataInfo& info) = 0;
virtual void
LoadDeletedRecord(const LoadDeletedRecordInfo& info) = 0;
virtual void
DropIndex(const FieldId field_id) = 0;
virtual void
DropFieldData(const FieldId field_id) = 0;

View File

@ -158,6 +158,21 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) {
}
}
void
SegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
AssertInfo(info.row_count > 0, "The row count of deleted record is 0");
AssertInfo(info.primary_keys, "Deleted primary keys is null");
AssertInfo(info.timestamps, "Deleted timestamps is null");
auto primary_keys = reinterpret_cast<const idx_t*>(info.primary_keys);
auto timestamps = reinterpret_cast<const Timestamp*>(info.timestamps);
int64_t size = info.row_count;
deleted_record_.uids_.set_data(0, primary_keys, size);
deleted_record_.timestamps_.set_data(0, timestamps, size);
deleted_record_.ack_responder_.AddSegment(0, size);
deleted_record_.record_size_ = size;
}
int64_t
SegmentSealedImpl::num_chunk_index(FieldOffset field_offset) const {
return 1;
@ -212,10 +227,74 @@ SegmentSealedImpl::get_schema() const {
return *schema_;
}
// Build a bitmap marking rows deleted no later than `query_timestamp`.
//
// The bitmap is cached in deleted_record_'s LRU: if the cached entry is
// already sized to `insert_barrier` rows and at the requested `del_barrier`
// it is returned as-is; otherwise it is cloned and incrementally patched
// with the deletions between the old and new barriers.
//
// `force` is currently unused; it is kept so the signature stays stable for
// callers (default false at the declaration).
std::shared_ptr<DeletedRecord::TmpBitmap>
SegmentSealedImpl::get_deleted_bitmap(int64_t del_barrier,
                                      Timestamp query_timestamp,
                                      int64_t insert_barrier,
                                      bool force) const {
    auto old = deleted_record_.get_lru_entry();
    if (old->bitmap_ptr->count() == insert_barrier) {
        if (old->del_barrier == del_barrier) {
            // Cache hit: already sized and at the requested barrier.
            return old;
        }
    }
    auto current = old->clone(insert_barrier);
    current->del_barrier = del_barrier;
    auto bitmap = current->bitmap_ptr;
    // Sealed segment only has one chunk with chunk_id 0
    auto span = deleted_record_.uids_.get_span_base(0);
    auto uids_ptr = reinterpret_cast<const idx_t*>(span.data());
    auto del_size = deleted_record_.record_size_;
    std::vector<idx_t> ids(del_size);
    std::copy_n(uids_ptr, del_size, ids.data());
    // Map deleted primary keys to segment offsets via the pk index.
    // NOTE(review): do_search_ids skips pks that are absent from the index,
    // so seg_offsets can be shorter than del_size; indexing it by del_index
    // below assumes every deleted pk was found — TODO confirm.
    auto [uids, seg_offsets] = primary_key_index_->do_search_ids(ids);
    if (del_barrier < old->del_barrier) {
        // Barrier moved backwards: undo deletions in [del_barrier, old).
        for (auto del_index = del_barrier; del_index < old->del_barrier; ++del_index) {
            int64_t the_offset = seg_offsets[del_index].get();
            // Bug fix: offset 0 is a valid segment offset; the original
            // `the_offset > 0` check aborted on a delete of the first row.
            AssertInfo(the_offset >= 0, "Seg offset is invalid");
            if (deleted_record_.timestamps_[del_index] < query_timestamp) {
                bitmap->clear(the_offset);
            }
        }
        return current;
    } else {
        // Barrier moved forwards: apply deletions in [old, del_barrier).
        for (auto del_index = old->del_barrier; del_index < del_barrier; ++del_index) {
            int64_t the_offset = seg_offsets[del_index].get();
            AssertInfo(the_offset >= 0, "Seg offset is invalid");
            if (deleted_record_.timestamps_[del_index] < query_timestamp) {
                bitmap->set(the_offset);
            }
        }
        this->deleted_record_.insert_lru_entry(current);
    }
    return current;
}
// Combine the caller-supplied filter bitset with this segment's deleted-row
// bitmap as of `timestamp`.
// NOTE: the early `return bitset;` right after the TODO disables the whole
// merge below — everything past it is intentionally dead code until the
// delete-filtering path is switched on.
BitsetView
SegmentSealedImpl::get_filtered_bitmap(const BitsetView& bitset, int64_t ins_barrier, Timestamp timestamp) const {
// TODO(yukun)
return bitset;
// --- dead code below: intended implementation, kept for the follow-up ---
// No deletions visible at this timestamp: nothing to merge.
auto del_barrier = get_barrier(get_deleted_record(), timestamp);
if (del_barrier == 0) {
return bitset;
}
auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier);
if (bitmap_holder == nullptr) {
return bitset;
}
AssertInfo(bitmap_holder, "bitmap_holder is null");
auto deleted_bitmap = bitmap_holder->bitmap_ptr;
// Empty incoming filter: the deleted bitmap alone is the result.
if (bitset.size() == 0) {
return BitsetView(deleted_bitmap);
}
AssertInfo(deleted_bitmap->count() == bitset.size(), "Deleted bitmap count not equal to filtered bitmap count");
// OR the two masks: a row is excluded if it is deleted OR filtered out.
auto filtered_bitmap = std::make_shared<faiss::ConcurrentBitset>(bitset.size(), bitset.data());
auto final_bitmap = (*deleted_bitmap.get()) | (*filtered_bitmap.get());
return BitsetView(final_bitmap);
}
void
@ -463,6 +542,27 @@ SegmentSealedImpl::search_ids(const IdArray& id_array, Timestamp timestamp) cons
return primary_key_index_->do_search_ids(id_array);
}
void
SegmentSealedImpl::Delete(int64_t row_count, const int64_t* uids_raw, const Timestamp* timestamps_raw) {
std::vector<std::tuple<Timestamp, idx_t>> ordering(row_count);
for (int i = 0; i < row_count; i++) {
ordering[i] = std::make_tuple(timestamps_raw[i], uids_raw[i]);
}
std::sort(ordering.begin(), ordering.end());
std::vector<idx_t> src_uids(row_count);
std::vector<Timestamp> src_timestamps(row_count);
for (int i = 0; i < row_count; i++) {
auto [t, uid] = ordering[i];
src_timestamps[i] = t;
src_uids[i] = uid;
}
deleted_record_.timestamps_.set_data(0, src_timestamps.data(), row_count);
deleted_record_.uids_.set_data(0, src_uids.data(), row_count);
deleted_record_.ack_responder_.AddSegment(0, row_count);
return;
}
std::vector<SegOffset>
SegmentSealedImpl::search_ids(const boost::dynamic_bitset<>& bitset, Timestamp timestamp) const {
std::vector<SegOffset> dst_offset;

View File

@ -10,9 +10,15 @@
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <tbb/concurrent_priority_queue.h>
#include <tbb/concurrent_unordered_map.h>
#include <tbb/concurrent_vector.h>
#include <segcore/TimestampIndex.h>
#include "segcore/SegmentSealed.h"
#include "ConcurrentVector.h"
#include "SealedIndexingRecord.h"
#include "segcore/DeletedRecord.h"
#include "ScalarIndex.h"
#include <deque>
#include <map>
@ -30,6 +36,8 @@ class SegmentSealedImpl : public SegmentSealed {
void
LoadFieldData(const LoadFieldDataInfo& info) override;
void
LoadDeletedRecord(const LoadDeletedRecordInfo& info) override;
void
LoadSegmentMeta(const milvus::proto::segcore::LoadSegmentMeta& segment_meta) override;
void
DropIndex(const FieldId field_id) override;
@ -50,6 +58,12 @@ class SegmentSealedImpl : public SegmentSealed {
const Schema&
get_schema() const override;
std::shared_ptr<DeletedRecord::TmpBitmap>
get_deleted_bitmap(int64_t del_barrier,
Timestamp query_timestamp,
int64_t insert_barrier,
bool force = false) const;
public:
int64_t
num_chunk_index(FieldOffset field_offset) const override;
@ -126,12 +140,20 @@ class SegmentSealedImpl : public SegmentSealed {
return system_ready_count_ == 2;
}
// Read-only accessor for this segment's delete log; used by callers such as
// get_filtered_bitmap to compute the delete barrier for a timestamp.
const DeletedRecord&
get_deleted_record() const {
return deleted_record_;
}
std::pair<std::unique_ptr<IdArray>, std::vector<SegOffset>>
search_ids(const IdArray& id_array, Timestamp timestamp) const override;
std::vector<SegOffset>
search_ids(const boost::dynamic_bitset<>& view, Timestamp timestamp) const override;
void
Delete(int64_t row_count, const int64_t* uids_raw, const Timestamp* timestamps_raw);
// virtual void
// build_index_if_primary_key(FieldId field_id);
@ -151,6 +173,7 @@ class SegmentSealedImpl : public SegmentSealed {
std::unique_ptr<ScalarIndexBase> primary_key_index_;
std::vector<aligned_vector<char>> fields_data_;
mutable DeletedRecord deleted_record_;
SealedIndexingRecord vecindexs_;
aligned_vector<idx_t> row_ids_;

View File

@ -184,6 +184,21 @@ LoadFieldData(CSegmentInterface c_segment, CLoadFieldDataInfo load_field_data_in
}
}
// C-API entry point: forward a CLoadDeletedRecordInfo batch to the sealed
// segment's LoadDeletedRecord. Returns SuccessCStatus on success, or a
// FailureCStatus(UnexpectedError) carrying the exception message.
CStatus
LoadDeletedRecord(CSegmentInterface c_segment, CLoadDeletedRecordInfo deleted_record_info) {
    try {
        auto segment_interface = reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
        // Deleted-record loading is only defined for sealed segments, so the
        // downcast must succeed.
        auto segment = dynamic_cast<milvus::segcore::SegmentSealed*>(segment_interface);
        AssertInfo(segment != nullptr, "segment conversion failed");
        auto load_info = LoadDeletedRecordInfo{deleted_record_info.timestamps, deleted_record_info.primary_keys,
                                               deleted_record_info.row_count};
        segment->LoadDeletedRecord(load_info);
        return milvus::SuccessCStatus();
    } catch (const std::exception& e) {  // catch by const& (C++ best practice)
        return milvus::FailureCStatus(UnexpectedError, e.what());
    }
}
CStatus
UpdateSealedSegmentIndex(CSegmentInterface c_segment, CLoadIndexInfo c_load_index_info) {
try {

View File

@ -313,4 +313,71 @@ TEST(Sealed, LoadFieldData) {
]
])");
ASSERT_EQ(std_json.dump(-2), json.dump(-2));
}
}
// End-to-end check of the sealed-segment delete path:
//   1. Search before any field data is loaded must throw.
//   2. LoadDeletedRecord with 5 pks/timestamps succeeds.
//   3. get_filtered_bitmap returns a bitset sized to the segment row count
//      (actual delete filtering is still a TODO in the implementation, so
//      only the size is asserted here).
// Statement order matters: the ASSERT_ANY_THROW must run before
// SealedLoader populates the segment.
TEST(Sealed, Delete) {
auto dim = 16;
auto topK = 5;
auto N = 10;
auto metric_type = MetricType::METRIC_L2;
auto schema = std::make_shared<Schema>();
auto fakevec_id = schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, dim, metric_type);
auto counter_id = schema->AddDebugField("counter", DataType::INT64);
auto double_id = schema->AddDebugField("double", DataType::DOUBLE);
auto nothing_id = schema->AddDebugField("nothing", DataType::INT32);
auto dataset = DataGen(schema, N);
auto fakevec = dataset.get_col<float>(0);
auto segment = CreateSealedSegment(schema);
// DSL: range filter on "double" plus a 5-NN vector query on "fakevec".
std::string dsl = R"({
"bool": {
"must": [
{
"range": {
"double": {
"GE": -1,
"LT": 1
}
}
},
{
"vector": {
"fakevec": {
"metric_type": "L2",
"params": {
"nprobe": 10
},
"query": "$0",
"topk": 5,
"round_decimal": 3
}
}
}
]
}
})";
Timestamp time = 1000000;
auto plan = CreatePlan(*schema, dsl);
auto num_queries = 5;
auto ph_group_raw = CreatePlaceholderGroup(num_queries, 16, 1024);
auto ph_group = ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString());
// Searching an empty sealed segment is an error.
ASSERT_ANY_THROW(segment->Search(plan.get(), *ph_group, time));
SealedLoader(dataset, *segment);
// Load 5 deletions (pks 1..5, all at timestamp 10).
int64_t row_count = 5;
std::vector<idx_t> pks{1, 2, 3, 4, 5};
std::vector<Timestamp> timestamps{10, 10, 10, 10, 10};
LoadDeletedRecordInfo info = {timestamps.data(), pks.data(), row_count};
segment->LoadDeletedRecord(info);
// Query the filtered bitmap at timestamp 11 (after the deletions).
std::vector<uint8_t> tmp_block{0, 0};
auto view = BitsetView(tmp_block.data(), 10);
auto bitset = segment->get_filtered_bitmap(view, 10, 11);
ASSERT_EQ(bitset.size(), N);
}

View File

@ -789,6 +789,20 @@ func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int, data interfa
return nil
}
// LoadDeletedRecord validates that this segment can accept a batch of
// deleted primary keys. It currently only checks the segcore pointer and
// the segment type; the actual call into segcore is not implemented yet.
// TODO(review): wire primaryKeys through to the C API's LoadDeletedRecord.
func (s *Segment) LoadDeletedRecord(primaryKeys []IntPrimaryKey) error {
	s.segPtrMu.RLock()
	defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock
	if s.segmentPtr == nil {
		return errors.New("null seg core pointer")
	}
	if s.segmentType != segmentTypeSealed {
		// Bug fix: the message previously named segmentLoadFieldData,
		// a copy-paste from the neighboring loader.
		errMsg := fmt.Sprintln("LoadDeletedRecord failed, illegal segment type ", s.segmentType, "segmentID = ", s.ID())
		return errors.New(errMsg)
	}
	return nil
}
func (s *Segment) dropFieldData(fieldID int64) error {
/*
CStatus