mirror of https://github.com/milvus-io/milvus.git
enhance: optimize some cache to reduce memory usage (#33534)
#33533
Signed-off-by: luzhang <luzhang@zilliz.com>
Co-authored-by: luzhang <luzhang@zilliz.com>
parent 2422084a29
commit c6f8a73bb2
@@ -459,9 +459,7 @@ class VariableColumn : public ColumnBase {
     std::string_view
     RawAt(const int i) const {
-        size_t len = (i == indices_.size() - 1) ? size_ - indices_.back()
-                                                : indices_[i + 1] - indices_[i];
-        return std::string_view(data_ + indices_[i], len);
+        return std::string_view(views_[i]);
     }

     void
@@ -502,6 +500,9 @@ class VariableColumn : public ColumnBase {
         }

+        ConstructViews();
+        // Not need indices_ after
+        indices_.clear();
     }

 protected:
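Note: the two VariableColumn hunks above replace per-call length arithmetic in RawAt with a lookup into views_ that ConstructViews() builds once, after which indices_ is cleared so only one per-row array stays resident. A minimal standalone sketch of that pattern (simplified types and names, not the actual Milvus class):

#include <cstdint>
#include <string>
#include <string_view>
#include <vector>

// Simplified sketch: concatenated row payloads plus per-row start offsets.
struct VarColumnSketch {
    std::string data_;                     // concatenated row data
    std::vector<uint64_t> indices_;        // start offset of each row
    std::vector<std::string_view> views_;  // cached view per row

    // Build one string_view per row, then drop indices_: the views
    // already encode both start pointer and length.
    void
    Seal() {
        views_.reserve(indices_.size());
        for (size_t i = 0; i < indices_.size(); ++i) {
            uint64_t begin = indices_[i];
            uint64_t end =
                (i + 1 < indices_.size()) ? indices_[i + 1] : data_.size();
            views_.emplace_back(data_.data() + begin, end - begin);
        }
        indices_.clear();  // not needed after views are built
    }

    // RawAt becomes a plain lookup instead of recomputing the length.
    std::string_view
    RawAt(size_t i) const {
        return views_[i];
    }
};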
@@ -212,7 +212,8 @@ class OffsetOrderedArray : public OffsetMap {
             PanicInfo(Unsupported,
                       "OffsetOrderedArray could not insert after seal");
         }
-        array_.push_back(std::make_pair(std::get<T>(pk), offset));
+        array_.push_back(
+            std::make_pair(std::get<T>(pk), static_cast<int32_t>(offset)));
     }

     void
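The push_back above narrows the per-PK row offset from int64_t to int32_t via static_cast, which assumes a segment's offsets always fit in 31 bits. A hedged sketch of a checked narrowing helper (hypothetical name, not part of the patch) that would make that assumption explicit:

#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical helper: narrow a 64-bit row offset to 32 bits, asserting
// that the value fits (per-segment row counts stay well below 2^31).
inline int32_t
narrow_offset(int64_t offset) {
    assert(offset >= 0 && offset <= std::numeric_limits<int32_t>::max());
    return static_cast<int32_t>(offset);
}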
@@ -285,13 +286,13 @@ class OffsetOrderedArray : public OffsetMap {

  private:
     bool is_sealed = false;
-    std::vector<std::pair<T, int64_t>> array_;
+    std::vector<std::pair<T, int32_t>> array_;
 };

 template <bool is_sealed = false>
 struct InsertRecord {
     InsertRecord(const Schema& schema, int64_t size_per_chunk)
-        : row_ids_(size_per_chunk), timestamps_(size_per_chunk) {
+        : timestamps_(size_per_chunk) {
         std::optional<FieldId> pk_field_id = schema.get_primary_field_id();

         for (auto& field : schema) {
@@ -590,10 +591,8 @@ struct InsertRecord {
     void
     clear() {
         timestamps_.clear();
-        row_ids_.clear();
         reserved = 0;
         ack_responder_.clear();
-        timestamp_index_ = TimestampIndex();
         pk2offset_->clear();
         fields_data_.clear();
     }
@@ -605,15 +604,11 @@ struct InsertRecord {

  public:
     ConcurrentVector<Timestamp> timestamps_;
-    ConcurrentVector<idx_t> row_ids_;

     // used for preInsert of growing segment
     std::atomic<int64_t> reserved = 0;
     AckResponder ack_responder_;

-    // used for timestamps index of sealed segment
-    TimestampIndex timestamp_index_;
-
     // pks to row offset
     std::unique_ptr<OffsetMap> pk2offset_;
@@ -110,7 +110,6 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
     // step 3: fill into Segment.ConcurrentVector
     insert_record_.timestamps_.set_data_raw(
         reserved_offset, timestamps_raw, num_rows);
-    insert_record_.row_ids_.set_data_raw(reserved_offset, row_ids, num_rows);

     // update the mem size of timestamps and row IDs
     stats_.mem_size += num_rows * (sizeof(Timestamp) + sizeof(idx_t));
@@ -224,7 +223,6 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
         }

         if (field_id == RowFieldID) {
-            insert_record_.row_ids_.set_data_raw(reserved_offset, field_data);
             continue;
         }

@@ -313,7 +311,6 @@ SegmentGrowingImpl::LoadFieldDataV2(const LoadFieldDataInfo& infos) {
         }

         if (field_id == RowFieldID) {
-            insert_record_.row_ids_.set_data_raw(reserved_offset, field_data);
             continue;
         }

@@ -766,10 +763,8 @@ SegmentGrowingImpl::bulk_subscript(SystemFieldType system_type,
                                          static_cast<Timestamp*>(output));
             break;
         case SystemFieldType::RowId:
-            bulk_subscript_impl<int64_t>(&this->insert_record_.row_ids_,
-                                         seg_offsets,
-                                         count,
-                                         static_cast<int64_t*>(output));
+            PanicInfo(ErrorCode::Unsupported,
+                      "RowId retrieve is not supported");
             break;
         default:
             PanicInfo(DataTypeInvalid, "unknown subscript fields");
@@ -235,6 +235,7 @@ class SegmentInternalInterface : public SegmentInterface {
     virtual int64_t
     num_chunk_data(FieldId field_id) const = 0;

+    // bitset 1 means not hit. 0 means hit.
     virtual void
     mask_with_timestamps(BitsetType& bitset_chunk,
                          Timestamp timestamp) const = 0;
@@ -23,6 +23,7 @@
 #include <string_view>
 #include <unordered_map>
 #include <vector>
+#include <boost/iterator/counting_iterator.hpp>

 #include "Utils.h"
 #include "Types.h"
@@ -348,35 +349,15 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
                 offset += row_count;
             }

-            TimestampIndex index;
-            auto min_slice_length = num_rows < 4096 ? 1 : 4096;
-            auto meta = GenerateFakeSlices(
-                timestamps.data(), num_rows, min_slice_length);
-            index.set_length_meta(std::move(meta));
-            // todo ::opt to avoid copy timestamps from field data
-            index.build_with(timestamps.data(), num_rows);
-
-            // use special index
             std::unique_lock lck(mutex_);
             AssertInfo(insert_record_.timestamps_.empty(), "already exists");
             insert_record_.timestamps_.fill_chunk_data(field_data);
-            insert_record_.timestamp_index_ = std::move(index);
             AssertInfo(insert_record_.timestamps_.num_chunk() == 1,
                        "num chunk not equal to 1 for sealed segment");
             stats_.mem_size += sizeof(Timestamp) * data.row_count;
         } else {
             AssertInfo(system_field_type == SystemFieldType::RowId,
                        "System field type of id column is not RowId");
-
-            auto field_data = storage::CollectFieldDataChannel(data.channel);
-
-            // write data under lock
-            std::unique_lock lck(mutex_);
-            AssertInfo(insert_record_.row_ids_.empty(), "already exists");
-            insert_record_.row_ids_.fill_chunk_data(field_data);
-            AssertInfo(insert_record_.row_ids_.num_chunk() == 1,
-                       "num chunk not equal to 1 for sealed segment");
-            stats_.mem_size += sizeof(idx_t) * data.row_count;
         }
         ++system_ready_count_;
     } else {
@@ -925,9 +906,7 @@ SegmentSealedImpl::DropFieldData(const FieldId field_id) {

         std::unique_lock lck(mutex_);
         --system_ready_count_;
-        if (system_field_type == SystemFieldType::RowId) {
-            insert_record_.row_ids_.clear();
-        } else if (system_field_type == SystemFieldType::Timestamp) {
+        if (system_field_type == SystemFieldType::Timestamp) {
             insert_record_.timestamps_.clear();
         }
         lck.unlock();
@@ -1042,13 +1021,7 @@ SegmentSealedImpl::bulk_subscript(SystemFieldType system_type,
                                 static_cast<Timestamp*>(output));
             break;
         case SystemFieldType::RowId:
-            AssertInfo(insert_record_.row_ids_.num_chunk() == 1,
-                       "num chunk of rowID not equal to 1 for sealed segment");
-            bulk_subscript_impl<int64_t>(
-                this->insert_record_.row_ids_.get_chunk_data(0),
-                seg_offsets,
-                count,
-                static_cast<int64_t*>(output));
+            PanicInfo(ErrorCode::Unsupported, "RowId retrieve not supported");
             break;
         default:
             PanicInfo(DataTypeInvalid,
@@ -1512,12 +1485,6 @@ SegmentSealedImpl::debug() const {
 void
 SegmentSealedImpl::LoadSegmentMeta(
     const proto::segcore::LoadSegmentMeta& segment_meta) {
-    std::unique_lock lck(mutex_);
-    std::vector<int64_t> slice_lengths;
-    for (auto& info : segment_meta.metas()) {
-        slice_lengths.push_back(info.row_count());
-    }
-    insert_record_.timestamp_index_.set_length_meta(std::move(slice_lengths));
+    PanicInfo(NotImplemented, "unimplemented");
 }

@@ -1529,33 +1496,17 @@ SegmentSealedImpl::get_active_count(Timestamp ts) const {

 void
 SegmentSealedImpl::mask_with_timestamps(BitsetType& bitset_chunk,
-                                        Timestamp timestamp) const {
-    // TODO change the
-    AssertInfo(insert_record_.timestamps_.num_chunk() == 1,
-               "num chunk not equal to 1 for sealed segment");
-    const auto& timestamps_data = insert_record_.timestamps_.get_chunk(0);
-    AssertInfo(timestamps_data.size() == get_row_count(),
-               fmt::format("Timestamp size not equal to row count: {}, {}",
-                           timestamps_data.size(),
-                           get_row_count()));
-    auto range = insert_record_.timestamp_index_.get_active_range(timestamp);
-
-    // range == (size_, size_) and size_ is this->timestamps_.size().
-    // it means these data are all useful, we don't need to update bitset_chunk.
-    // It can be thought of as an OR operation with another bitmask that is all 0s, but it is not necessary to do so.
-    if (range.first == range.second && range.first == timestamps_data.size()) {
-        // just skip
-        return;
-    }
-    // range == (0, 0). it means these data can not be used, directly set bitset_chunk to all 1s.
-    // It can be thought of as an OR operation with another bitmask that is all 1s.
-    if (range.first == range.second && range.first == 0) {
-        bitset_chunk.set();
-        return;
-    }
-    auto mask = TimestampIndex::GenerateBitset(
-        timestamp, range, timestamps_data.data(), timestamps_data.size());
-    bitset_chunk |= mask;
+                                        Timestamp ts) const {
+    auto row_count = this->get_row_count();
+    auto& ts_vec = this->insert_record_.timestamps_;
+    auto iter = std::upper_bound(
+        boost::make_counting_iterator(static_cast<int64_t>(0)),
+        boost::make_counting_iterator(row_count),
+        ts,
+        [&](Timestamp ts, int64_t index) { return ts < ts_vec[index]; });
+    for (size_t i = *iter; i < row_count; ++i) {
+        bitset_chunk.set(i);
+    }
 }

 bool
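The rewritten mask_with_timestamps drops the TimestampIndex range lookup: it relies on the sealed segment's timestamps being stored in ascending order, so a single std::upper_bound over row indices finds the first row newer than the query timestamp, and every later row is marked as not hit. A minimal standalone sketch of the same idea over a plain sorted vector (hypothetical helper, not the segcore API):

#include <algorithm>
#include <cstdint>
#include <vector>

using Timestamp = uint64_t;

// Hypothetical helper mirroring the new masking logic: given timestamps
// sorted ascending, mark every row whose timestamp is greater than `ts`
// as "not hit" (true) in the returned mask.
std::vector<bool>
mask_rows_after(const std::vector<Timestamp>& timestamps, Timestamp ts) {
    std::vector<bool> mask(timestamps.size(), false);
    // First row strictly newer than the query timestamp.
    auto first_newer =
        std::upper_bound(timestamps.begin(), timestamps.end(), ts);
    for (auto it = first_newer; it != timestamps.end(); ++it) {
        mask[static_cast<size_t>(it - timestamps.begin())] = true;
    }
    return mask;
}

Because only a sorted suffix of rows ever needs masking, no per-slice length metadata is required, which appears to be why the GenerateFakeSlices/set_length_meta construction and LoadSegmentMeta are removed above.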