mirror of https://github.com/milvus-io/milvus.git
Add new method called EntireRowCount to get count of uid (#4365)
* Fix may logout memory leak Signed-off-by: yinghao.zou <yinghao.zou@zilliz.com> * Add new method called EntireRowCount to get count of uid Signed-off-by: yinghao.zou <yinghao.zou@zilliz.com> Signed-off-by: shengjun.li <shengjun.li@zilliz.com>pull/4485/head
parent
9345db2dd4
commit
5ca3a65eca
|
@ -49,16 +49,26 @@ MultiSegmentsOperation::DoExecute(StorePtr store) {
|
|||
std::map<ID_TYPE, SegmentCommit::VecT> new_segment_commits;
|
||||
for (auto& iter : new_segments_) {
|
||||
for (auto& new_segment : iter.second) {
|
||||
size_t row_count = new_segment_counts_[new_segment->GetID()];
|
||||
OperationContext context;
|
||||
context.new_segment = new_segment;
|
||||
// TODO(yhz): Why here get adjusted ss
|
||||
context.new_segment_files = context_.new_segment_file_map[new_segment->GetID()];
|
||||
// Set segment file row count.
|
||||
for (auto& file : context.new_segment_files) {
|
||||
if (file->GetFEtype() == engine::FieldElementType::FET_RAW) {
|
||||
file->SetRowCount(row_count);
|
||||
auto sf_ctx_p =
|
||||
ResourceContextBuilder<SegmentFile>(meta::oUpdate).AddAttr(RowCountField::Name).CreatePtr();
|
||||
AddStepWithLsn(*file, context.lsn, sf_ctx_p);
|
||||
}
|
||||
}
|
||||
auto sc_op = SegmentCommitOperation(context, GetAdjustedSS());
|
||||
STATUS_CHECK(sc_op(store));
|
||||
SegmentCommit::Ptr sc;
|
||||
STATUS_CHECK(sc_op.GetResource(sc));
|
||||
|
||||
sc->SetRowCount(new_segment_counts_[new_segment->GetID()]);
|
||||
sc->SetRowCount(row_count);
|
||||
|
||||
if (new_segment_commits.find(new_segment->GetPartitionId()) == new_segment_commits.end()) {
|
||||
new_segment_commits[new_segment->GetPartitionId()] = SegmentCommit::VecT();
|
||||
|
|
|
@ -27,7 +27,7 @@ LogOut(const char* pattern, ...) {
|
|||
|
||||
va_list vl;
|
||||
va_start(vl, pattern);
|
||||
vsnprintf(str_p.get(), len, pattern, vl); // NOLINT
|
||||
vsnprintf(str_p.get(), len - 1, pattern, vl); // NOLINT
|
||||
va_end(vl);
|
||||
|
||||
return std::string(str_p.get());
|
||||
|
|
|
@ -28,7 +28,7 @@ LogOut(const char* pattern, ...) {
|
|||
|
||||
va_list vl;
|
||||
va_start(vl, pattern);
|
||||
vsnprintf(str_p.get(), len, pattern, vl); // NOLINT
|
||||
vsnprintf(str_p.get(), len - 1, pattern, vl); // NOLINT
|
||||
va_end(vl);
|
||||
|
||||
return std::string(str_p.get());
|
||||
|
|
|
@ -675,12 +675,8 @@ SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) {
|
|||
if (data_obj == nullptr) {
|
||||
auto& ss_codec = codec::Codec::instance();
|
||||
STATUS_CHECK(ss_codec.GetDeletedDocsFormat()->Read(fs_ptr_, file_path, deleted_docs_ptr));
|
||||
auto id = segment_visitor_->GetSegment()->GetID();
|
||||
auto sc = segment_visitor_->GetSnapshot()->GetSegmentCommitBySegmentId(id);
|
||||
// The black list size must be equal to total entity count containing deleted count
|
||||
// and segment row count.
|
||||
if (sc != nullptr && deleted_docs_ptr != nullptr) {
|
||||
deleted_docs_ptr->GenBlacklist(sc->GetRowCount() + deleted_docs_ptr->GetCount());
|
||||
if (deleted_docs_ptr != nullptr) {
|
||||
deleted_docs_ptr->GenBlacklist(GetEntireRowCount());
|
||||
}
|
||||
cache::CpuCacheMgr::GetInstance().InsertItem(file_path, deleted_docs_ptr); // put into cache
|
||||
} else {
|
||||
|
@ -791,6 +787,30 @@ SegmentReader::GetRowCount() {
|
|||
return count;
|
||||
}
|
||||
|
||||
int64_t
|
||||
SegmentReader::GetEntireRowCount() const {
|
||||
if (segment_visitor_ == nullptr) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::FIELD_UID);
|
||||
if (uid_field_visitor == nullptr) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
auto uid_element_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW);
|
||||
if (uid_element_visitor == nullptr) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
auto uid_file = uid_element_visitor->GetFile();
|
||||
if (uid_file == nullptr) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return uid_file->GetRowCount();
|
||||
}
|
||||
|
||||
Status
|
||||
SegmentReader::ClearCache() {
|
||||
TimeRecorderAuto recorder("SegmentReader::ClearCache");
|
||||
|
|
|
@ -104,6 +104,10 @@ class SegmentReader {
|
|||
int64_t
|
||||
GetRowCount();
|
||||
|
||||
// get the entire row count, which value is equal to count of uid.
|
||||
int64_t
|
||||
GetEntireRowCount() const;
|
||||
|
||||
// clear cache from cache manager, use this method for segment merge/compact and collection/partition drop
|
||||
Status
|
||||
ClearCache();
|
||||
|
|
|
@ -31,6 +31,7 @@
|
|||
#include "db/utils.h"
|
||||
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
|
||||
#include "segment/Segment.h"
|
||||
#include "segment/SegmentReader.h"
|
||||
|
||||
using SegmentVisitor = milvus::engine::SegmentVisitor;
|
||||
using InActiveResourcesGCEvent = milvus::engine::snapshot::InActiveResourcesGCEvent;
|
||||
|
@ -1356,6 +1357,57 @@ TEST_F(DBTest, FetchTest2) {
|
|||
}
|
||||
}
|
||||
|
||||
TEST_F(DBTest, SegmentEntireRowCountTest) {
|
||||
/*
|
||||
* Test the function of 'GetEntireRowCount()'.
|
||||
* The expected behavior is that before and after deleted, the return value
|
||||
* of 'GetEntireRowCount()' should always be equal to SegmentRowCount and
|
||||
* DeletedRowCount.
|
||||
*/
|
||||
std::string collection_name = "test_collection_delete_";
|
||||
CreateCollection2(db_, collection_name, false);
|
||||
size_t count = 100;
|
||||
|
||||
milvus::engine::IDNumbers entity_ids;
|
||||
milvus::engine::DataChunkPtr data_chunk;
|
||||
BuildEntities(count, 0, data_chunk, true);
|
||||
auto status = db_->Insert(collection_name, "", data_chunk);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
milvus::engine::utils::GetIDFromChunk(data_chunk, entity_ids);
|
||||
|
||||
status = db_->Flush();
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
milvus::json stats;
|
||||
status = db_->GetCollectionStats(collection_name, stats);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
auto seg = stats["partitions"][0]["segments"][0];
|
||||
std::string path = seg["files"][0]["path"];
|
||||
int64_t segment_id = seg["id"];
|
||||
|
||||
ScopedSnapshotT ss;
|
||||
status = Snapshots::GetInstance().GetSnapshot(ss, collection_name);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
auto visitor = SegmentVisitor::Build(ss, segment_id);
|
||||
milvus::segment::SegmentReader segment_reader(path, visitor);
|
||||
ASSERT_EQ(segment_reader.GetEntireRowCount(), count);
|
||||
|
||||
milvus::engine::IDNumbers delete_ids = {entity_ids[0], entity_ids[1]};
|
||||
status = db_->DeleteEntityByID(collection_name, delete_ids);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
status = db_->Flush();
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
ScopedSnapshotT nss;
|
||||
status = Snapshots::GetInstance().GetSnapshot(nss, collection_name);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
auto nvisitor = SegmentVisitor::Build(nss, segment_id);
|
||||
milvus::segment::SegmentReader segment_nreader(path, nvisitor);
|
||||
ASSERT_EQ(segment_nreader.GetEntireRowCount(), count);
|
||||
}
|
||||
|
||||
TEST_F(DBTest, DeleteEntitiesTest) {
|
||||
std::string collection_name = "test_collection_delete_";
|
||||
CreateCollection2(db_, collection_name, false);
|
||||
|
|
Loading…
Reference in New Issue