Optimize large memory usage of InsertRecord (#19197 #19245) (#19421)

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>

pull/19476/head
aoiasd 2022-09-27 11:50:56 +08:00 committed by GitHub
parent 2753e05457
commit 6e9c8f7cf3
11 changed files with 364 additions and 191 deletions

View File

@@ -72,9 +72,6 @@ using VectorArray = proto::schema::VectorField;
using IdArray = proto::schema::IDs;
using InsertData = proto::segcore::InsertRecord;
using PkType = std::variant<std::monostate, int64_t, std::string>;
// tbb::concurrent_unordered_multimap equal_range is too slow when a key is repeated many times
// using Pk2OffsetType = tbb::concurrent_unordered_multimap<PkType, int64_t, std::hash<PkType>>;
using Pk2OffsetType = std::unordered_map<PkType, std::vector<int64_t>, std::hash<PkType>>;
inline bool
IsPrimaryKeyDataType(DataType data_type) {
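The comment kept above the new typedef records the motivation: with heavily repeated primary keys, tbb::concurrent_unordered_multimap's equal_range must walk duplicate entries, while a map from key to a vector of offsets returns every offset for a pk in a single probe. A minimal standalone sketch of the new access pattern (simplified; the real segcore types are not reused here):

#include <cstdint>
#include <string>
#include <unordered_map>
#include <variant>
#include <vector>

using PkType = std::variant<std::monostate, int64_t, std::string>;
using Pk2OffsetType = std::unordered_map<PkType, std::vector<int64_t>, std::hash<PkType>>;

int main() {
    Pk2OffsetType pk2offset;
    pk2offset[PkType(int64_t(42))].push_back(0);  // the same pk inserted twice
    pk2offset[PkType(int64_t(42))].push_back(7);
    auto iter = pk2offset.find(PkType(int64_t(42)));
    // iter->second is {0, 7}: all offsets for pk 42 from one hash lookup
    return iter != pk2offset.end() ? 0 : 1;
}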

View File

@@ -75,29 +75,6 @@ VectorFieldIndexing::get_search_params(int top_K) const {
return base_params;
}
void
IndexingRecord::UpdateResourceAck(int64_t chunk_ack, const InsertRecord& record) {
if (resource_ack_ >= chunk_ack) {
return;
}
std::unique_lock lck(mutex_);
int64_t old_ack = resource_ack_;
if (old_ack >= chunk_ack) {
return;
}
resource_ack_ = chunk_ack;
lck.unlock();
// std::thread([this, old_ack, chunk_ack, &record] {
for (auto& [field_offset, entry] : field_indexings_) {
auto vec_base = record.get_field_data_base(field_offset);
entry->BuildIndexRange(old_ack, chunk_ack, vec_base);
}
finished_ack_.AddSegment(old_ack, chunk_ack);
// }).detach();
}
template <typename T>
void
ScalarFieldIndexing<T>::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBase* vec_base) {
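The definition is deleted from the .cpp file because, in the header shown next, UpdateResourceAck becomes a member template over is_sealed, and a template body hidden in a translation unit is not visible at its instantiation points. A minimal sketch of that constraint, with hypothetical names standing in for IndexingRecord and InsertRecord<is_sealed>:

#include <cstdint>

template <bool is_sealed>
struct Record {  // hypothetical stand-in for InsertRecord<is_sealed>
    int64_t rows = 0;
};

struct Tracker {  // hypothetical stand-in for IndexingRecord
    // Must be defined in the header: other translation units instantiate
    // update<true> and update<false> and need the body visible.
    template <bool is_sealed>
    void
    update(const Record<is_sealed>& r) {
        last_ = r.rows;
    }
    int64_t last_ = 0;
};

int main() {
    Tracker t;
    t.update(Record<false>{5});
    t.update(Record<true>{9});
    return t.last_ == 9 ? 0 : 1;
}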

View File

@@ -138,8 +138,29 @@ class IndexingRecord {
}
// concurrent, reentrant
template <bool is_sealed>
void
UpdateResourceAck(int64_t chunk_ack, const InsertRecord& record);
UpdateResourceAck(int64_t chunk_ack, const InsertRecord<is_sealed>& record) {
if (resource_ack_ >= chunk_ack) {
return;
}
std::unique_lock lck(mutex_);
int64_t old_ack = resource_ack_;
if (old_ack >= chunk_ack) {
return;
}
resource_ack_ = chunk_ack;
lck.unlock();
// std::thread([this, old_ack, chunk_ack, &record] {
for (auto& [field_offset, entry] : field_indexings_) {
auto vec_base = record.get_field_data_base(field_offset);
entry->BuildIndexRange(old_ack, chunk_ack, vec_base);
}
finished_ack_.AddSegment(old_ack, chunk_ack);
// }).detach();
}
// concurrent
int64_t
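The relocated body keeps the original double-checked pattern: resource_ack_ is tested once before locking so the frequent no-op call stays cheap, re-tested under the mutex to serialize concurrent updaters, and the lock is released before BuildIndexRange so only the ack bump itself is serialized. A stripped-down sketch of the same shape; it uses an atomic counter for the unlocked pre-check (the original reads the plain field) and a placeholder build step:

#include <atomic>
#include <cstdint>
#include <mutex>

class AckTracker {
 public:
    void
    UpdateResourceAck(int64_t chunk_ack) {
        if (resource_ack_.load(std::memory_order_relaxed) >= chunk_ack) {
            return;  // unlocked fast path for the common no-op case
        }
        std::unique_lock lck(mutex_);
        int64_t old_ack = resource_ack_.load(std::memory_order_relaxed);
        if (old_ack >= chunk_ack) {
            return;  // another thread advanced the ack first
        }
        resource_ack_.store(chunk_ack, std::memory_order_relaxed);
        lck.unlock();  // the build work below runs outside the lock
        BuildRange(old_ack, chunk_ack);
    }

 private:
    void
    BuildRange(int64_t /*beg*/, int64_t /*end*/) {}  // placeholder build step

    std::mutex mutex_;
    std::atomic<int64_t> resource_ack_{0};
};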

View File

@@ -10,64 +10,3 @@
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "InsertRecord.h"
namespace milvus::segcore {
InsertRecord::InsertRecord(const Schema& schema, int64_t size_per_chunk)
: row_ids_(size_per_chunk), timestamps_(size_per_chunk) {
for (auto& field : schema) {
auto field_id = field.first;
auto& field_meta = field.second;
if (field_meta.is_vector()) {
if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) {
this->append_field_data<FloatVector>(field_id, field_meta.get_dim(), size_per_chunk);
continue;
} else if (field_meta.get_data_type() == DataType::VECTOR_BINARY) {
this->append_field_data<BinaryVector>(field_id, field_meta.get_dim(), size_per_chunk);
continue;
} else {
PanicInfo("unsupported");
}
}
switch (field_meta.get_data_type()) {
case DataType::BOOL: {
this->append_field_data<bool>(field_id, size_per_chunk);
break;
}
case DataType::INT8: {
this->append_field_data<int8_t>(field_id, size_per_chunk);
break;
}
case DataType::INT16: {
this->append_field_data<int16_t>(field_id, size_per_chunk);
break;
}
case DataType::INT32: {
this->append_field_data<int32_t>(field_id, size_per_chunk);
break;
}
case DataType::INT64: {
this->append_field_data<int64_t>(field_id, size_per_chunk);
break;
}
case DataType::FLOAT: {
this->append_field_data<float>(field_id, size_per_chunk);
break;
}
case DataType::DOUBLE: {
this->append_field_data<double>(field_id, size_per_chunk);
break;
}
case DataType::VARCHAR: {
this->append_field_data<std::string>(field_id, size_per_chunk);
break;
}
default: {
PanicInfo("unsupported");
}
}
}
}
} // namespace milvus::segcore

View File

@@ -13,6 +13,8 @@
#include <memory>
#include <vector>
#include <string>
#include <algorithm>
#include <unordered_map>
#include <utility>
@@ -24,6 +26,103 @@
namespace milvus::segcore {
class OffsetMap {
public:
virtual ~OffsetMap() = default;
virtual std::vector<int64_t>
find(const PkType pk) const = 0;
virtual void
insert(const PkType pk, int64_t offset) = 0;
virtual void
seal() = 0;
virtual bool
empty() const = 0;
};
template <typename T>
class OffsetHashMap : public OffsetMap {
public:
std::vector<int64_t>
find(const PkType pk) const {
auto offset_vector = map_.find(std::get<T>(pk));
return offset_vector != map_.end() ? offset_vector->second : std::vector<int64_t>();
}
void
insert(const PkType pk, int64_t offset) {
map_[std::get<T>(pk)].emplace_back(offset);
}
void
seal() {
PanicInfo("OffsetHashMap used for growing segment could not be sealed.");
}
bool
empty() const {
return map_.empty();
}
private:
std::unordered_map<T, std::vector<int64_t>> map_;
};
template <typename T>
class OffsetOrderedArray : public OffsetMap {
public:
std::vector<int64_t>
find(const PkType pk) const {
int left = 0, right = array_.size() - 1;
T target = std::get<T>(pk);
if (!is_sealed)
PanicInfo("OffsetOrderedArray could not search before seal");
while (left < right) {
int mid = (left + right) >> 1;
if (array_[mid].first < target)
left = mid + 1;
else
right = mid;
}
std::vector<int64_t> offset_vector;
for (int offset_id = right; offset_id < array_.size(); offset_id++) {
if (offset_id < 0 || array_[offset_id].first != target)
break;
offset_vector.push_back(array_[offset_id].second);
}
return offset_vector;
}
void
insert(const PkType pk, int64_t offset) {
if (is_sealed)
PanicInfo("OffsetOrderedArray could not insert after seal");
array_.push_back(std::make_pair(std::get<T>(pk), offset));
}
void
seal() {
sort(array_.begin(), array_.end());
is_sealed = true;
}
bool
empty() const {
return array_.empty();
}
private:
bool is_sealed = false;
std::vector<std::pair<T, int64_t>> array_;
};
template <bool is_sealed = false>
struct InsertRecord {
ConcurrentVector<Timestamp> timestamps_;
ConcurrentVector<idx_t> row_ids_;
@@ -36,23 +135,96 @@ struct InsertRecord {
TimestampIndex timestamp_index_;
// pks to row offset
Pk2OffsetType pk2offset_;
std::unique_ptr<OffsetMap> pk2offset_;
explicit InsertRecord(const Schema& schema, int64_t size_per_chunk);
InsertRecord(const Schema& schema, int64_t size_per_chunk) : row_ids_(size_per_chunk), timestamps_(size_per_chunk) {
std::optional<FieldId> pk_field_id = schema.get_primary_field_id();
for (auto& field : schema) {
auto field_id = field.first;
auto& field_meta = field.second;
if (pk2offset_ == nullptr && pk_field_id.has_value() && pk_field_id.value() == field_id) {
switch (field_meta.get_data_type()) {
case DataType::INT64: {
if (is_sealed)
pk2offset_ = std::make_unique<OffsetOrderedArray<int64_t>>();
else
pk2offset_ = std::make_unique<OffsetHashMap<int64_t>>();
break;
}
case DataType::VARCHAR: {
if (is_sealed)
pk2offset_ = std::make_unique<OffsetOrderedArray<std::string>>();
else
pk2offset_ = std::make_unique<OffsetHashMap<std::string>>();
break;
}
default: {
PanicInfo("unsupported pk type");
}
}
}
if (field_meta.is_vector()) {
if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) {
this->append_field_data<FloatVector>(field_id, field_meta.get_dim(), size_per_chunk);
continue;
} else if (field_meta.get_data_type() == DataType::VECTOR_BINARY) {
this->append_field_data<BinaryVector>(field_id, field_meta.get_dim(), size_per_chunk);
continue;
} else {
PanicInfo("unsupported");
}
}
switch (field_meta.get_data_type()) {
case DataType::BOOL: {
this->append_field_data<bool>(field_id, size_per_chunk);
break;
}
case DataType::INT8: {
this->append_field_data<int8_t>(field_id, size_per_chunk);
break;
}
case DataType::INT16: {
this->append_field_data<int16_t>(field_id, size_per_chunk);
break;
}
case DataType::INT32: {
this->append_field_data<int32_t>(field_id, size_per_chunk);
break;
}
case DataType::INT64: {
this->append_field_data<int64_t>(field_id, size_per_chunk);
break;
}
case DataType::FLOAT: {
this->append_field_data<float>(field_id, size_per_chunk);
break;
}
case DataType::DOUBLE: {
this->append_field_data<double>(field_id, size_per_chunk);
break;
}
case DataType::VARCHAR: {
this->append_field_data<std::string>(field_id, size_per_chunk);
break;
}
default: {
PanicInfo("unsupported");
}
}
}
}
std::vector<SegOffset>
search_pk(const PkType pk, Timestamp timestamp) const {
std::shared_lock lck(shared_mutex_);
std::vector<SegOffset> res_offsets;
auto offset_iter = pk2offset_.find(pk);
if (offset_iter != pk2offset_.end()) {
for (auto offset : offset_iter->second) {
if (timestamps_[offset] <= timestamp) {
res_offsets.push_back(SegOffset(offset));
}
auto offset_iter = pk2offset_->find(pk);
for (auto offset : offset_iter) {
if (timestamps_[offset] <= timestamp) {
res_offsets.push_back(SegOffset(offset));
}
}
return res_offsets;
}
@@ -60,28 +232,30 @@ struct InsertRecord {
search_pk(const PkType pk, int64_t insert_barrier) const {
std::shared_lock lck(shared_mutex_);
std::vector<SegOffset> res_offsets;
auto offset_iter = pk2offset_.find(pk);
if (offset_iter != pk2offset_.end()) {
for (auto offset : offset_iter->second) {
if (offset < insert_barrier) {
res_offsets.push_back(SegOffset(offset));
}
auto offset_iter = pk2offset_->find(pk);
for (auto offset : offset_iter) {
if (offset <= insert_barrier) {
res_offsets.push_back(SegOffset(offset));
}
}
return res_offsets;
}
void
insert_pk(const PkType pk, int64_t offset) {
std::lock_guard lck(shared_mutex_);
pk2offset_[pk].emplace_back(offset);
pk2offset_->insert(pk, offset);
}
bool
empty_pks() const {
std::shared_lock lck(shared_mutex_);
return pk2offset_.empty();
return pk2offset_->empty();
}
void
seal_pks() {
pk2offset_->seal();
}
// get field data without knowing the type
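The two OffsetMap implementations above trade memory for mutability: the growing path keeps one std::vector per key inside a hash map and accepts interleaved reads and writes, while the sealed path packs every (pk, offset) pair into a single flat vector, sorts it once in seal(), and serves find() with a binary search for the first match plus a forward scan over equal keys. A self-contained sketch of the sealed-side behavior, simplified to int64_t keys and not reusing the real classes:

#include <algorithm>
#include <cstdint>
#include <limits>
#include <utility>
#include <vector>

class SealedOffsets {  // simplified analogue of OffsetOrderedArray<int64_t>
 public:
    void
    insert(int64_t pk, int64_t offset) {
        array_.emplace_back(pk, offset);  // cheap append while loading
    }
    void
    seal() {
        std::sort(array_.begin(), array_.end());  // one sort, then read-only
        sealed_ = true;
    }
    std::vector<int64_t>
    find(int64_t pk) const {
        std::vector<int64_t> out;
        if (!sealed_) {
            return out;
        }
        auto iter = std::lower_bound(
            array_.begin(), array_.end(),
            std::pair<int64_t, int64_t>(pk, std::numeric_limits<int64_t>::min()));
        for (; iter != array_.end() && iter->first == pk; ++iter) {
            out.push_back(iter->second);  // forward scan over equal keys
        }
        return out;
    }

 private:
    bool sealed_ = false;
    std::vector<std::pair<int64_t, int64_t>> array_;
};

int main() {
    SealedOffsets map;
    map.insert(3, 10);
    map.insert(1, 11);
    map.insert(3, 12);
    map.seal();
    return map.find(3).size() == 2 ? 0 : 1;  // offsets 10 and 12
}

One flat sorted vector avoids the per-key vector allocations that dominate the hash-map layout when a sealed segment holds millions of distinct pks, which appears to be the memory saving this commit targets.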

View File

@@ -70,7 +70,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
debug() const override;
public:
const InsertRecord&
const InsertRecord<>&
get_insert_record() const {
return insert_record_;
}
@@ -226,7 +226,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
SealedIndexingRecord sealed_indexing_record_; // not used
// inserted fields data and row_ids, timestamps
InsertRecord insert_record_;
InsertRecord<false> insert_record_;
// deleted pks
mutable DeletedRecord deleted_record_;

View File

@@ -142,6 +142,7 @@ SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
for (int i = 0; i < row_count; ++i) {
insert_record_.insert_pk(int64_index->Reverse_Lookup(i), i);
}
insert_record_.seal_pks();
break;
}
case DataType::VARCHAR: {
@@ -149,6 +150,7 @@ SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
for (int i = 0; i < row_count; ++i) {
insert_record_.insert_pk(string_index->Reverse_Lookup(i), i);
}
insert_record_.seal_pks();
break;
}
default: {
@@ -232,6 +234,7 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) {
for (int i = 0; i < size; ++i) {
insert_record_.insert_pk(pks[i], i);
}
insert_record_.seal_pks();
}
set_bit(field_data_ready_bitset_, field_id, true);
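Each sealed load path follows the protocol OffsetOrderedArray requires: every pk is inserted while the structure is still append-only, then seal_pks() runs exactly once, after the last insert and before any search_pk call. A small sketch of that call order, with a hypothetical stand-in record and placeholder pk data:

#include <cstdint>
#include <utility>
#include <vector>

struct MiniSealedRecord {  // exposes only the two calls used by the load path
    void
    insert_pk(int64_t pk, int64_t offset) {
        pks_.emplace_back(pk, offset);
    }
    void
    seal_pks() {
        sealed_ = true;  // the real implementation sorts here
    }
    bool sealed_ = false;
    std::vector<std::pair<int64_t, int64_t>> pks_;
};

int main() {
    MiniSealedRecord record;
    std::vector<int64_t> pks = {7, 3, 9};  // e.g. values from Reverse_Lookup
    for (int64_t i = 0; i < static_cast<int64_t>(pks.size()); ++i) {
        record.insert_pk(pks[i], i);  // all inserts first
    }
    record.seal_pks();  // then seal exactly once
    return record.sealed_ ? 0 : 1;
}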

View File

@@ -190,7 +190,7 @@ class SegmentSealedImpl : public SegmentSealed {
SealedIndexingRecord vector_indexings_;
// inserted fields data and row_ids, timestamps
InsertRecord insert_record_;
InsertRecord<true> insert_record_;
// deleted pks
mutable DeletedRecord deleted_record_;

View File

@@ -366,84 +366,4 @@ ReverseDataFromIndex(const knowhere::Index* index,
return data_array;
}
// insert_barrier means the number of rows of insert data in a segment
// del_barrier means that if the pk of the insert data is in delete record[0 : del_barrier],
// then the data corresponding to this pk may be ignored when searching/querying;
// per func get_barrier, all ts in delete record[0 : del_barrier] < query_timestamp
// assume old insert record pks = [5, 2, 4, 1, 3, 8, 7, 6]
// assume old delete record pks = [2, 4, 3, 8, 5], old delete record ts = [100, 100, 150, 200, 400]
// if delete_barrier = 3 and query time = 180, then insert records with pks in [2, 4, 3] will be deleted
// then the old bitmap = [0, 1, 1, 0, 1, 0, 0, 0]
std::shared_ptr<DeletedRecord::TmpBitmap>
get_deleted_bitmap(int64_t del_barrier,
int64_t insert_barrier,
DeletedRecord& delete_record,
const InsertRecord& insert_record,
Timestamp query_timestamp) {
// if insert_barrier and del_barrier have not changed, use cache data directly
bool hit_cache = false;
int64_t old_del_barrier = 0;
auto current = delete_record.clone_lru_entry(insert_barrier, del_barrier, old_del_barrier, hit_cache);
if (hit_cache) {
return current;
}
auto bitmap = current->bitmap_ptr;
int64_t start, end;
if (del_barrier < old_del_barrier) {
// in this case, ts of delete record[current_del_barrier : old_del_barrier] > query_timestamp
// so these deletion records do not take effect in query/search
// so bitmap corresponding to those pks in delete record[current_del_barrier:old_del_barrier] will be reset to 0
// for example, current_del_barrier = 2, query_time = 120, the bitmap will be reset to [0, 1, 1, 0, 0, 0, 0, 0]
start = del_barrier;
end = old_del_barrier;
} else {
// the cache is not enough, so update bitmap using new pks in delete record[old_del_barrier:current_del_barrier]
// for example, current_del_barrier = 4, query_time = 300, bitmap will be updated to [0, 1, 1, 0, 1, 1, 0, 0]
start = old_del_barrier;
end = del_barrier;
}
// Avoid invalid calculations when there are a lot of repeated delete pks
std::unordered_map<PkType, Timestamp> delete_timestamps;
for (auto del_index = start; del_index < end; ++del_index) {
auto pk = delete_record.pks_[del_index];
auto timestamp = delete_record.timestamps_[del_index];
delete_timestamps[pk] = timestamp > delete_timestamps[pk] ? timestamp : delete_timestamps[pk];
}
for (auto iter = delete_timestamps.begin(); iter != delete_timestamps.end(); iter++) {
auto pk = iter->first;
auto delete_timestamp = iter->second;
auto segOffsets = insert_record.search_pk(pk, insert_barrier);
for (auto offset : segOffsets) {
int64_t insert_row_offset = offset.get();
// for now, insert_barrier == insert count of segment, so this Assert will always work
AssertInfo(insert_row_offset < insert_barrier, "Timestamp offset is larger than insert barrier");
// insert after delete with same pk, the delete will not take effect on this insert record
// and reset bitmap to 0
if (insert_record.timestamps_[insert_row_offset] > delete_timestamp) {
bitmap->reset(insert_row_offset);
continue;
}
// the deletion record does not take effect in search/query
// and reset bitmap to 0
if (delete_timestamp > query_timestamp) {
bitmap->reset(insert_row_offset);
continue;
}
// insert data corresponding to the insert_row_offset will be ignored in search/query
bitmap->set(insert_row_offset);
}
}
delete_record.insert_lru_entry(current);
return current;
}
} // namespace milvus::segcore

View File

@@ -9,6 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <unordered_map>
#include <exception>
#include <memory>
#include <stdexcept>
@@ -48,12 +49,77 @@ CreateDataArrayFrom(const void* data_raw, int64_t count, const FieldMeta& field_
std::unique_ptr<DataArray>
MergeDataArray(std::vector<std::pair<milvus::SearchResult*, int64_t>>& result_offsets, const FieldMeta& field_meta);
template <bool is_sealed>
std::shared_ptr<DeletedRecord::TmpBitmap>
get_deleted_bitmap(int64_t del_barrier,
int64_t insert_barrier,
DeletedRecord& delete_record,
const InsertRecord& insert_record,
Timestamp query_timestamp);
const InsertRecord<is_sealed>& insert_record,
Timestamp query_timestamp) {
// if insert_barrier and del_barrier have not changed, use cache data directly
bool hit_cache = false;
int64_t old_del_barrier = 0;
auto current = delete_record.clone_lru_entry(insert_barrier, del_barrier, old_del_barrier, hit_cache);
if (hit_cache) {
return current;
}
auto bitmap = current->bitmap_ptr;
int64_t start, end;
if (del_barrier < old_del_barrier) {
// in this case, ts of delete record[current_del_barrier : old_del_barrier] > query_timestamp
// so these deletion records do not take effect in query/search
// so bitmap corresponding to those pks in delete record[current_del_barrier:old_del_barrier] will be reset to 0
// for example, current_del_barrier = 2, query_time = 120, the bitmap will be reset to [0, 1, 1, 0, 0, 0, 0, 0]
start = del_barrier;
end = old_del_barrier;
} else {
// the cache is not enough, so update bitmap using new pks in delete record[old_del_barrier:current_del_barrier]
// for example, current_del_barrier = 4, query_time = 300, bitmap will be updated to [0, 1, 1, 0, 1, 1, 0, 0]
start = old_del_barrier;
end = del_barrier;
}
// Avoid invalid calculations when there are a lot of repeated delete pks
std::unordered_map<PkType, Timestamp> delete_timestamps;
for (auto del_index = start; del_index < end; ++del_index) {
auto pk = delete_record.pks_[del_index];
auto timestamp = delete_record.timestamps_[del_index];
delete_timestamps[pk] = timestamp > delete_timestamps[pk] ? timestamp : delete_timestamps[pk];
}
for (auto iter = delete_timestamps.begin(); iter != delete_timestamps.end(); iter++) {
auto pk = iter->first;
auto delete_timestamp = iter->second;
auto segOffsets = insert_record.search_pk(pk, insert_barrier);
for (auto offset : segOffsets) {
int64_t insert_row_offset = offset.get();
// for now, insert_barrier == insert count of segment, so this Assert will always work
AssertInfo(insert_row_offset < insert_barrier, "Timestamp offset is larger than insert barrier");
// insert after delete with same pk, the delete will not take effect on this insert record
// and reset bitmap to 0
if (insert_record.timestamps_[insert_row_offset] > delete_timestamp) {
bitmap->reset(insert_row_offset);
continue;
}
// the deletion record does not take effect in search/query
// and reset bitmap to 0
if (delete_timestamp > query_timestamp) {
bitmap->reset(insert_row_offset);
continue;
}
// insert data corresponding to the insert_row_offset will be ignored in search/query
bitmap->set(insert_row_offset);
}
}
delete_record.insert_lru_entry(current);
return current;
}
std::unique_ptr<DataArray>
ReverseDataFromIndex(const knowhere::Index* index,

View File

@@ -12,6 +12,7 @@
#include <gtest/gtest.h>
#include <random>
#include <string>
#include <iostream>
#include "segcore/SegmentGrowingImpl.h"
#include "test_utils/DataGen.h"
@@ -79,3 +80,78 @@ TEST(SegmentCoreTest, SmallIndex) {
schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2);
schema->AddDebugField("age", DataType::INT32);
}
TEST(InsertRecordTest, growing_int64_t) {
using namespace milvus::segcore;
auto schema = std::make_shared<Schema>();
schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2);
auto i64_fid = schema->AddDebugField("age", DataType::INT64);
schema->set_primary_field_id(i64_fid);
auto record = milvus::segcore::InsertRecord<false>(*schema, int64_t(32));
const int N = 100000;
for (int i = 1; i <= N; i++)
record.insert_pk(PkType(int64_t(i)), int64_t(i));
for (int i = 1; i <= N; i++) {
std::vector<SegOffset> offset = record.search_pk(PkType(int64_t(i)), int64_t(N + 1));
ASSERT_EQ(offset[0].get(), int64_t(i));
}
}
TEST(InsertRecordTest, growing_string) {
using namespace milvus::segcore;
auto schema = std::make_shared<Schema>();
schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2);
auto i64_fid = schema->AddDebugField("name", DataType::VARCHAR);
schema->set_primary_field_id(i64_fid);
auto record = milvus::segcore::InsertRecord<false>(*schema, int64_t(32));
const int N = 100000;
for (int i = 1; i <= N; i++)
record.insert_pk(PkType(std::to_string(i)), int64_t(i));
for (int i = 1; i <= N; i++) {
std::vector<SegOffset> offset = record.search_pk(std::to_string(i), int64_t(N + 1));
ASSERT_EQ(offset[0].get(), int64_t(i));
}
}
TEST(InsertRecordTest, sealed_int64_t) {
using namespace milvus::segcore;
auto schema = std::make_shared<Schema>();
schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2);
auto i64_fid = schema->AddDebugField("age", DataType::INT64);
schema->set_primary_field_id(i64_fid);
auto record = milvus::segcore::InsertRecord<true>(*schema, int64_t(32));
const int N = 100000;
for (int i = N; i >= 1; i--)
record.insert_pk(PkType(int64_t(i)), int64_t(i));
record.seal_pks();
for (int i = 1; i <= N; i++) {
std::vector<SegOffset> offset = record.search_pk(PkType(int64_t(i)), int64_t(N + 1));
ASSERT_EQ(offset[0].get(), int64_t(i));
}
}
TEST(InsertRecordTest, sealed_string) {
using namespace milvus::segcore;
auto schema = std::make_shared<Schema>();
schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2);
auto i64_fid = schema->AddDebugField("name", DataType::VARCHAR);
schema->set_primary_field_id(i64_fid);
auto record = milvus::segcore::InsertRecord<true>(*schema, int64_t(32));
const int N = 100000;
for (int i = 1; i <= N; i++)
record.insert_pk(PkType(std::to_string(i)), int64_t(i));
record.seal_pks();
for (int i = 1; i <= N; i++) {
std::vector<SegOffset> offset = record.search_pk(std::to_string(i), int64_t(N + 1));
ASSERT_EQ(offset[0].get(), int64_t(i));
}
}