From e19d17076f21183883fc8acd648e1a112f8c42fb Mon Sep 17 00:00:00 2001 From: chyezh Date: Tue, 16 Apr 2024 11:17:20 +0800 Subject: [PATCH] fix: delete may lost when enable lru cache, some field should be reset when ReleaseData (#32012) issue: #30361 - Delete may be lost when segment is not data-loaded status in lru cache. skip filtering to fix it. - `stats_` and `variable_fields_avg_size_` should be reset when `ReleaseData` - Remove repeat load delta log operation in lru. --------- Signed-off-by: chyezh --- internal/core/src/common/Types.h | 10 ++-- internal/core/src/segcore/DeletedRecord.h | 8 +++ .../core/src/segcore/SegmentGrowingImpl.cpp | 3 - .../core/src/segcore/SegmentGrowingImpl.h | 7 +++ internal/core/src/segcore/SegmentInterface.h | 9 +-- .../core/src/segcore/SegmentSealedImpl.cpp | 48 ++++++++-------- internal/core/src/segcore/SegmentSealedImpl.h | 7 +++ internal/core/unittest/test_sealed.cpp | 56 ++++++++++++++----- .../querynodev2/segments/segment_loader.go | 42 ++++++++------ 9 files changed, 121 insertions(+), 69 deletions(-) diff --git a/internal/core/src/common/Types.h b/internal/core/src/common/Types.h index f26f43e6e5..e22f1e230e 100644 --- a/internal/core/src/common/Types.h +++ b/internal/core/src/common/Types.h @@ -182,12 +182,12 @@ GetDataTypeName(DataType data_type) { } inline size_t -CalcPksSize(const std::vector& pks) { +CalcPksSize(const PkType* data, size_t n) { size_t size = 0; - for (auto& pk : pks) { - size += sizeof(pk); - if (std::holds_alternative(pk)) { - size += std::get(pk).size(); + for (size_t i = 0; i < n; ++i) { + size += sizeof(data[i]); + if (std::holds_alternative(data[i])) { + size += std::get(data[i]).size(); } } return size; diff --git a/internal/core/src/segcore/DeletedRecord.h b/internal/core/src/segcore/DeletedRecord.h index 93e56c81c3..f2f0e2d8a0 100644 --- a/internal/core/src/segcore/DeletedRecord.h +++ b/internal/core/src/segcore/DeletedRecord.h @@ -105,6 +105,8 @@ struct DeletedRecord { pks_.set_data_raw(n, pks.data() + divide_point, size); timestamps_.set_data_raw(n, timestamps + divide_point, size); n_ += size; + mem_size_ += sizeof(Timestamp) * size + + CalcPksSize(pks.data() + divide_point, size); } const ConcurrentVector& @@ -122,12 +124,18 @@ struct DeletedRecord { return n_.load(); } + size_t + mem_size() const { + return mem_size_.load(); + } + private: std::shared_ptr lru_; std::shared_mutex shared_mutex_; std::shared_mutex buffer_mutex_; std::atomic n_ = 0; + std::atomic mem_size_ = 0; ConcurrentVector timestamps_; ConcurrentVector pks_; }; diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index d1ab15e491..3d1f277c43 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -395,7 +395,6 @@ SegmentGrowingImpl::Delete(int64_t reserved_begin, // step 2: fill delete record deleted_record_.push(sort_pks, sort_timestamps.data()); - stats_.mem_size += size * sizeof(Timestamp) + CalcPksSize(sort_pks); return SegcoreError::success(); } @@ -417,8 +416,6 @@ SegmentGrowingImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) { // step 2: fill pks and timestamps deleted_record_.push(pks, timestamps); - - stats_.mem_size += info.row_count * sizeof(Timestamp) + CalcPksSize(pks); } SpanBase diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 903147f6b1..1cc308216b 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -130,6 +130,11 @@ class SegmentGrowingImpl : public SegmentGrowing { try_remove_chunks(FieldId fieldId); public: + size_t + GetMemoryUsageInBytes() const override { + return stats_.mem_size.load() + deleted_record_.mem_size(); + } + int64_t get_row_count() const override { return insert_record_.ack_responder_.GetAck(); @@ -305,6 +310,8 @@ class SegmentGrowingImpl : public SegmentGrowing { mutable DeletedRecord deleted_record_; int64_t id_; + + SegmentStats stats_{}; }; const static IndexMetaPtr empty_index_meta = diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index 030977fb15..0b24cecb83 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -69,10 +69,8 @@ class SegmentInterface { Timestamp timestamp, int64_t limit_size) const = 0; - size_t - GetMemoryUsageInBytes() const { - return stats_.mem_size; - }; + virtual size_t + GetMemoryUsageInBytes() const = 0; virtual int64_t get_row_count() const = 0; @@ -120,9 +118,6 @@ class SegmentInterface { virtual bool HasRawData(int64_t field_id) const = 0; - - protected: - SegmentStats stats_{}; }; // internal API for DSL calculation diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index cda939b582..18f4bfcee3 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -627,8 +627,6 @@ SegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) { // step 2: fill pks and timestamps deleted_record_.push(pks, timestamps); - - stats_.mem_size += sizeof(Timestamp) * info.row_count + CalcPksSize(pks); } void @@ -1125,15 +1123,20 @@ SegmentSealedImpl::bulk_subscript_impl(int64_t element_sizeof, void SegmentSealedImpl::ClearData() { - field_data_ready_bitset_.reset(); - index_ready_bitset_.reset(); - binlog_index_bitset_.reset(); - system_ready_count_ = 0; - num_rows_ = std::nullopt; - scalar_indexings_.clear(); - vector_indexings_.clear(); - insert_record_.clear(); - fields_.clear(); + { + std::unique_lock lck(mutex_); + field_data_ready_bitset_.reset(); + index_ready_bitset_.reset(); + binlog_index_bitset_.reset(); + system_ready_count_ = 0; + num_rows_ = std::nullopt; + scalar_indexings_.clear(); + vector_indexings_.clear(); + insert_record_.clear(); + fields_.clear(); + variable_fields_avg_size_.clear(); + stats_.mem_size = 0; + } auto cc = storage::ChunkCacheSingleton::GetInstance().GetChunkCache(); if (cc == nullptr) { return; @@ -1459,14 +1462,18 @@ SegmentSealedImpl::Delete(int64_t reserved_offset, // deprecated for (int i = 0; i < size; i++) { ordering[i] = std::make_tuple(timestamps_raw[i], pks[i]); } - auto end = - std::remove_if(ordering.begin(), - ordering.end(), - [&](const std::tuple& record) { - return !insert_record_.contain(std::get<1>(record)); - }); - size = end - ordering.begin(); - ordering.resize(size); + // if insert_record_ is empty (may be only-load meta but not data for lru-cache at go side), + // filtering may cause the deletion lost, skip the filtering to avoid it. + if (!insert_record_.empty_pks()) { + auto end = std::remove_if( + ordering.begin(), + ordering.end(), + [&](const std::tuple& record) { + return !insert_record_.contain(std::get<1>(record)); + }); + size = end - ordering.begin(); + ordering.resize(size); + } if (size == 0) { return SegcoreError::success(); } @@ -1483,9 +1490,6 @@ SegmentSealedImpl::Delete(int64_t reserved_offset, // deprecated } deleted_record_.push(sort_pks, sort_timestamps.data()); - - stats_.mem_size += - sizeof(Timestamp) * sort_pks.size() + CalcPksSize(sort_pks); return SegcoreError::success(); } diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index 88de1b674b..309a286040 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -89,6 +89,11 @@ class SegmentSealedImpl : public SegmentSealed { GetFieldDataType(FieldId fieldId) const override; public: + size_t + GetMemoryUsageInBytes() const override { + return stats_.mem_size.load() + deleted_record_.mem_size(); + } + int64_t get_row_count() const override; @@ -302,6 +307,8 @@ class SegmentSealedImpl : public SegmentSealed { SegcoreConfig segcore_config_; std::unordered_map> vec_binlog_config_; + + SegmentStats stats_{}; }; inline SegmentSealedUPtr diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index 42bc859f4c..1fed5034ce 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -1229,23 +1229,51 @@ TEST(Sealed, BF_Overflow) { } TEST(Sealed, DeleteCount) { - auto schema = std::make_shared(); - auto pk = schema->AddDebugField("pk", DataType::INT64); - schema->set_primary_field_id(pk); - auto segment = CreateSealedSegment(schema); + { + auto schema = std::make_shared(); + auto pk = schema->AddDebugField("pk", DataType::INT64); + schema->set_primary_field_id(pk); + auto segment = CreateSealedSegment(schema); - int64_t c = 10; - auto offset = segment->get_deleted_count(); - ASSERT_EQ(offset, 0); + int64_t c = 10; + auto offset = segment->get_deleted_count(); + ASSERT_EQ(offset, 0); - Timestamp begin_ts = 100; - auto tss = GenTss(c, begin_ts); - auto pks = GenPKs(c, 0); - auto status = segment->Delete(offset, c, pks.get(), tss.data()); - ASSERT_TRUE(status.ok()); + Timestamp begin_ts = 100; + auto tss = GenTss(c, begin_ts); + auto pks = GenPKs(c, 0); + auto status = segment->Delete(offset, c, pks.get(), tss.data()); + ASSERT_TRUE(status.ok()); - auto cnt = segment->get_deleted_count(); - ASSERT_EQ(cnt, 0); + // shouldn't be filtered for empty segment. + auto cnt = segment->get_deleted_count(); + ASSERT_EQ(cnt, 10); + } + { + auto schema = std::make_shared(); + auto pk = schema->AddDebugField("pk", DataType::INT64); + schema->set_primary_field_id(pk); + auto segment = CreateSealedSegment(schema); + + int64_t c = 10; + auto dataset = DataGen(schema, c); + auto pks = dataset.get_col(pk); + SealedLoadFieldData(dataset, *segment); + + auto offset = segment->get_deleted_count(); + ASSERT_EQ(offset, 0); + + auto iter = std::max_element(pks.begin(), pks.end()); + auto delete_pks = GenPKs(c, *iter); + Timestamp begin_ts = 100; + auto tss = GenTss(c, begin_ts); + auto status = segment->Delete(offset, c, delete_pks.get(), tss.data()); + ASSERT_TRUE(status.ok()); + + // 9 of element should be filtered. + auto cnt = segment->get_deleted_count(); + ASSERT_EQ(cnt, 1); + } } TEST(Sealed, RealCount) { diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 2e1a303a7d..1013118a17 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -654,35 +654,39 @@ func (loader *segmentLoader) Load(ctx context.Context, newSegments.Insert(loadInfo.GetSegmentID(), segment) } - loadSegmentFunc := func(idx int) error { + loadSegmentFunc := func(idx int) (err error) { loadInfo := infos[idx] partitionID := loadInfo.PartitionID segmentID := loadInfo.SegmentID segment, _ := newSegments.Get(segmentID) + logger := log.With(zap.Int64("partitionID", partitionID), + zap.Int64("segmentID", segmentID), + zap.String("segmentType", loadInfo.GetLevel().String())) metrics.QueryNodeLoadSegmentConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), "LoadSegment").Inc() - defer metrics.QueryNodeLoadSegmentConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), "LoadSegment").Dec() - + defer func() { + metrics.QueryNodeLoadSegmentConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), "LoadSegment").Dec() + if err != nil { + logger.Warn("load segment failed when load data into memory", zap.Error(err)) + } + logger.Info("load segment done") + }() tr := timerecord.NewTimeRecorder("loadDurationPerSegment") + logger.Info("load segment...") - var err error - if loadInfo.GetLevel() == datapb.SegmentLevel_L0 { - err = loader.LoadDeltaLogs(ctx, segment, loadInfo.GetDeltalogs()) - } else { - err = loader.LoadSegment(ctx, segment.(*LocalSegment), loadInfo, loadStatus) + // L0 segment has no index or data to be load + if loadInfo.GetLevel() != datapb.SegmentLevel_L0 { + if err = loader.LoadSegment(ctx, segment.(*LocalSegment), loadInfo, loadStatus); err != nil { + return errors.Wrap(err, "At LoadSegment") + } } - if err != nil { - log.Warn("load segment failed when load data into memory", - zap.Int64("partitionID", partitionID), - zap.Int64("segmentID", segmentID), - zap.Error(err), - ) - return err + if err = loader.LoadDeltaLogs(ctx, segment, loadInfo.GetDeltalogs()); err != nil { + return errors.Wrap(err, "At LoadDeltaLogs") } + loader.manager.Segment.Put(segmentType, segment) newSegments.GetAndRemove(segmentID) loaded.Insert(segmentID, segment) - log.Info("load segment done", zap.Int64("segmentID", segmentID)) loader.notifyLoadFinish(loadInfo) metrics.QueryNodeLoadSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds())) @@ -1096,8 +1100,7 @@ func (loader *segmentLoader) LoadSegment(ctx context.Context, strconv.FormatInt(int64(len(segment.Indexes())), 10), ).Add(float64(loadInfo.GetNumOfRows())) - log.Info("loading delta...") - return loader.LoadDeltaLogs(ctx, segment, loadInfo.Deltalogs) + return nil } func (loader *segmentLoader) filterPKStatsBinlogs(fieldBinlogs []*datapb.FieldBinlog, pkFieldID int64) ([]string, storage.StatsLogType) { @@ -1279,7 +1282,10 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment, defer sp.End() log := log.Ctx(ctx).With( zap.Int64("segmentID", segment.ID()), + zap.Int("deltaNum", len(deltaLogs)), ) + log.Info("loading delta...") + dCodec := storage.DeleteCodec{} var blobs []*storage.Blob var futures []*conc.Future[any]