fix: delete may be lost when lru cache is enabled, some fields should be reset on ReleaseData (#32012)

issue: #30361

- Deletes may be lost when a segment is not in data-loaded status in the lru
cache; skip the pk filtering to fix it (see the sketch after this list).

- `stats_` and `variable_fields_avg_size_` should be reset when
`ReleaseData` is called.

- Remove the repeated load-delta-log operation in the lru path.
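
A minimal, self-contained sketch of the failure mode, using hypothetical names (the
real guard is the `insert_record_.empty_pks()` check in `SegmentSealedImpl::Delete`,
shown in the diff below): when only the segment meta is loaded, the set of loaded pks
is empty, so filtering incoming deletes by "pk exists in the segment" would discard
every delete.

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <unordered_set>
#include <vector>

int main() {
    std::unordered_set<int64_t> loaded_pks;       // empty: data evicted by the lru cache
    std::vector<int64_t> delete_pks = {1, 2, 3};  // incoming delete request

    // Only filter against the loaded pks when insert data is actually present;
    // otherwise keep every delete so it can still apply once data is loaded back.
    if (!loaded_pks.empty()) {
        delete_pks.erase(
            std::remove_if(delete_pks.begin(), delete_pks.end(),
                           [&](int64_t pk) { return loaded_pks.count(pk) == 0; }),
            delete_pks.end());
    }
    std::cout << "deletes kept: " << delete_pks.size() << "\n";  // prints 3, not 0
}
```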

---------

Signed-off-by: chyezh <chyezh@outlook.com>
chyezh 2024-04-16 11:17:20 +08:00 committed by GitHub
parent 55d894bd5e
commit e19d17076f
9 changed files with 121 additions and 69 deletions

@@ -182,12 +182,12 @@ GetDataTypeName(DataType data_type) {
}
inline size_t
CalcPksSize(const std::vector<PkType>& pks) {
CalcPksSize(const PkType* data, size_t n) {
size_t size = 0;
for (auto& pk : pks) {
size += sizeof(pk);
if (std::holds_alternative<std::string>(pk)) {
size += std::get<std::string>(pk).size();
for (size_t i = 0; i < n; ++i) {
size += sizeof(data[i]);
if (std::holds_alternative<std::string>(data[i])) {
size += std::get<std::string>(data[i]).size();
}
}
return size;

@@ -105,6 +105,8 @@ struct DeletedRecord {
pks_.set_data_raw(n, pks.data() + divide_point, size);
timestamps_.set_data_raw(n, timestamps + divide_point, size);
n_ += size;
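// account for both the timestamps and the pk payload just appended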
mem_size_ += sizeof(Timestamp) * size +
CalcPksSize(pks.data() + divide_point, size);
}
const ConcurrentVector<Timestamp>&
@@ -122,12 +124,18 @@ struct DeletedRecord {
return n_.load();
}
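// estimated bytes held by the delete records (timestamps plus pk payload)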
size_t
mem_size() const {
return mem_size_.load();
}
private:
std::shared_ptr<TmpBitmap> lru_;
std::shared_mutex shared_mutex_;
std::shared_mutex buffer_mutex_;
std::atomic<int64_t> n_ = 0;
std::atomic<int64_t> mem_size_ = 0;
ConcurrentVector<Timestamp> timestamps_;
ConcurrentVector<PkType> pks_;
};

@@ -395,7 +395,6 @@ SegmentGrowingImpl::Delete(int64_t reserved_begin,
// step 2: fill delete record
deleted_record_.push(sort_pks, sort_timestamps.data());
stats_.mem_size += size * sizeof(Timestamp) + CalcPksSize(sort_pks);
return SegcoreError::success();
}
@@ -417,8 +416,6 @@ SegmentGrowingImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
// step 2: fill pks and timestamps
deleted_record_.push(pks, timestamps);
stats_.mem_size += info.row_count * sizeof(Timestamp) + CalcPksSize(pks);
}
SpanBase

@@ -130,6 +130,11 @@ class SegmentGrowingImpl : public SegmentGrowing {
try_remove_chunks(FieldId fieldId);
public:
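// reported memory now also covers the delete buffer via DeletedRecord::mem_size()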
size_t
GetMemoryUsageInBytes() const override {
return stats_.mem_size.load() + deleted_record_.mem_size();
}
int64_t
get_row_count() const override {
return insert_record_.ack_responder_.GetAck();
@@ -305,6 +310,8 @@ class SegmentGrowingImpl : public SegmentGrowing {
mutable DeletedRecord deleted_record_;
int64_t id_;
SegmentStats stats_{};
};
const static IndexMetaPtr empty_index_meta =

@@ -69,10 +69,8 @@ class SegmentInterface {
Timestamp timestamp,
int64_t limit_size) const = 0;
size_t
GetMemoryUsageInBytes() const {
return stats_.mem_size;
};
virtual size_t
GetMemoryUsageInBytes() const = 0;
virtual int64_t
get_row_count() const = 0;
@@ -120,9 +118,6 @@ class SegmentInterface {
virtual bool
HasRawData(int64_t field_id) const = 0;
protected:
SegmentStats stats_{};
};
// internal API for DSL calculation

@@ -627,8 +627,6 @@ SegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
// step 2: fill pks and timestamps
deleted_record_.push(pks, timestamps);
stats_.mem_size += sizeof(Timestamp) * info.row_count + CalcPksSize(pks);
}
void
@@ -1125,15 +1123,20 @@ SegmentSealedImpl::bulk_subscript_impl(int64_t element_sizeof,
void
SegmentSealedImpl::ClearData() {
field_data_ready_bitset_.reset();
index_ready_bitset_.reset();
binlog_index_bitset_.reset();
system_ready_count_ = 0;
num_rows_ = std::nullopt;
scalar_indexings_.clear();
vector_indexings_.clear();
insert_record_.clear();
fields_.clear();
{
std::unique_lock lck(mutex_);
field_data_ready_bitset_.reset();
index_ready_bitset_.reset();
binlog_index_bitset_.reset();
system_ready_count_ = 0;
num_rows_ = std::nullopt;
scalar_indexings_.clear();
vector_indexings_.clear();
insert_record_.clear();
fields_.clear();
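// also reset the per-field average sizes and the memory stats so a reloaded segment starts clean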
variable_fields_avg_size_.clear();
stats_.mem_size = 0;
}
auto cc = storage::ChunkCacheSingleton::GetInstance().GetChunkCache();
if (cc == nullptr) {
return;
@@ -1459,14 +1462,18 @@ SegmentSealedImpl::Delete(int64_t reserved_offset, // deprecated
for (int i = 0; i < size; i++) {
ordering[i] = std::make_tuple(timestamps_raw[i], pks[i]);
}
auto end =
std::remove_if(ordering.begin(),
ordering.end(),
[&](const std::tuple<Timestamp, PkType>& record) {
return !insert_record_.contain(std::get<1>(record));
});
size = end - ordering.begin();
ordering.resize(size);
// If insert_record_ is empty (the Go side may have loaded only the meta, not the data,
// for the lru cache), filtering would drop the deletes; skip the filtering to avoid losing them.
if (!insert_record_.empty_pks()) {
auto end = std::remove_if(
ordering.begin(),
ordering.end(),
[&](const std::tuple<Timestamp, PkType>& record) {
return !insert_record_.contain(std::get<1>(record));
});
size = end - ordering.begin();
ordering.resize(size);
}
if (size == 0) {
return SegcoreError::success();
}
@@ -1483,9 +1490,6 @@ SegmentSealedImpl::Delete(int64_t reserved_offset, // deprecated
}
deleted_record_.push(sort_pks, sort_timestamps.data());
stats_.mem_size +=
sizeof(Timestamp) * sort_pks.size() + CalcPksSize(sort_pks);
return SegcoreError::success();
}

@@ -89,6 +89,11 @@ class SegmentSealedImpl : public SegmentSealed {
GetFieldDataType(FieldId fieldId) const override;
public:
size_t
GetMemoryUsageInBytes() const override {
return stats_.mem_size.load() + deleted_record_.mem_size();
}
int64_t
get_row_count() const override;
@@ -302,6 +307,8 @@ class SegmentSealedImpl : public SegmentSealed {
SegcoreConfig segcore_config_;
std::unordered_map<FieldId, std::unique_ptr<VecIndexConfig>>
vec_binlog_config_;
SegmentStats stats_{};
};
inline SegmentSealedUPtr

@@ -1229,23 +1229,51 @@ TEST(Sealed, BF_Overflow) {
}
TEST(Sealed, DeleteCount) {
auto schema = std::make_shared<Schema>();
auto pk = schema->AddDebugField("pk", DataType::INT64);
schema->set_primary_field_id(pk);
auto segment = CreateSealedSegment(schema);
{
auto schema = std::make_shared<Schema>();
auto pk = schema->AddDebugField("pk", DataType::INT64);
schema->set_primary_field_id(pk);
auto segment = CreateSealedSegment(schema);
int64_t c = 10;
auto offset = segment->get_deleted_count();
ASSERT_EQ(offset, 0);
int64_t c = 10;
auto offset = segment->get_deleted_count();
ASSERT_EQ(offset, 0);
Timestamp begin_ts = 100;
auto tss = GenTss(c, begin_ts);
auto pks = GenPKs(c, 0);
auto status = segment->Delete(offset, c, pks.get(), tss.data());
ASSERT_TRUE(status.ok());
Timestamp begin_ts = 100;
auto tss = GenTss(c, begin_ts);
auto pks = GenPKs(c, 0);
auto status = segment->Delete(offset, c, pks.get(), tss.data());
ASSERT_TRUE(status.ok());
auto cnt = segment->get_deleted_count();
ASSERT_EQ(cnt, 0);
// deletes shouldn't be filtered out for an empty (not-loaded) segment.
auto cnt = segment->get_deleted_count();
ASSERT_EQ(cnt, 10);
}
{
auto schema = std::make_shared<Schema>();
auto pk = schema->AddDebugField("pk", DataType::INT64);
schema->set_primary_field_id(pk);
auto segment = CreateSealedSegment(schema);
int64_t c = 10;
auto dataset = DataGen(schema, c);
auto pks = dataset.get_col<int64_t>(pk);
SealedLoadFieldData(dataset, *segment);
auto offset = segment->get_deleted_count();
ASSERT_EQ(offset, 0);
auto iter = std::max_element(pks.begin(), pks.end());
auto delete_pks = GenPKs(c, *iter);
Timestamp begin_ts = 100;
auto tss = GenTss(c, begin_ts);
auto status = segment->Delete(offset, c, delete_pks.get(), tss.data());
ASSERT_TRUE(status.ok());
// 9 of the 10 deletes should be filtered out, leaving 1.
auto cnt = segment->get_deleted_count();
ASSERT_EQ(cnt, 1);
}
}
TEST(Sealed, RealCount) {

@@ -654,35 +654,39 @@ func (loader *segmentLoader) Load(ctx context.Context,
newSegments.Insert(loadInfo.GetSegmentID(), segment)
}
loadSegmentFunc := func(idx int) error {
loadSegmentFunc := func(idx int) (err error) {
loadInfo := infos[idx]
partitionID := loadInfo.PartitionID
segmentID := loadInfo.SegmentID
segment, _ := newSegments.Get(segmentID)
logger := log.With(zap.Int64("partitionID", partitionID),
zap.Int64("segmentID", segmentID),
zap.String("segmentType", loadInfo.GetLevel().String()))
metrics.QueryNodeLoadSegmentConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), "LoadSegment").Inc()
defer metrics.QueryNodeLoadSegmentConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), "LoadSegment").Dec()
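// the named return value err lets the deferred func below log failure and completion in one place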
defer func() {
metrics.QueryNodeLoadSegmentConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), "LoadSegment").Dec()
if err != nil {
logger.Warn("load segment failed when load data into memory", zap.Error(err))
}
logger.Info("load segment done")
}()
tr := timerecord.NewTimeRecorder("loadDurationPerSegment")
logger.Info("load segment...")
var err error
if loadInfo.GetLevel() == datapb.SegmentLevel_L0 {
err = loader.LoadDeltaLogs(ctx, segment, loadInfo.GetDeltalogs())
} else {
err = loader.LoadSegment(ctx, segment.(*LocalSegment), loadInfo, loadStatus)
// L0 segments have no index or data to load
if loadInfo.GetLevel() != datapb.SegmentLevel_L0 {
if err = loader.LoadSegment(ctx, segment.(*LocalSegment), loadInfo, loadStatus); err != nil {
return errors.Wrap(err, "At LoadSegment")
}
}
if err != nil {
log.Warn("load segment failed when load data into memory",
zap.Int64("partitionID", partitionID),
zap.Int64("segmentID", segmentID),
zap.Error(err),
)
return err
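// delta logs are now loaded here for every segment level, so LoadSegment no longer loads them itself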
if err = loader.LoadDeltaLogs(ctx, segment, loadInfo.GetDeltalogs()); err != nil {
return errors.Wrap(err, "At LoadDeltaLogs")
}
loader.manager.Segment.Put(segmentType, segment)
newSegments.GetAndRemove(segmentID)
loaded.Insert(segmentID, segment)
log.Info("load segment done", zap.Int64("segmentID", segmentID))
loader.notifyLoadFinish(loadInfo)
metrics.QueryNodeLoadSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))
@@ -1096,8 +1100,7 @@ func (loader *segmentLoader) LoadSegment(ctx context.Context,
strconv.FormatInt(int64(len(segment.Indexes())), 10),
).Add(float64(loadInfo.GetNumOfRows()))
log.Info("loading delta...")
return loader.LoadDeltaLogs(ctx, segment, loadInfo.Deltalogs)
return nil
}
func (loader *segmentLoader) filterPKStatsBinlogs(fieldBinlogs []*datapb.FieldBinlog, pkFieldID int64) ([]string, storage.StatsLogType) {
@@ -1279,7 +1282,10 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment,
defer sp.End()
log := log.Ctx(ctx).With(
zap.Int64("segmentID", segment.ID()),
zap.Int("deltaNum", len(deltaLogs)),
)
log.Info("loading delta...")
dCodec := storage.DeleteCodec{}
var blobs []*storage.Blob
var futures []*conc.Future[any]