From 3d40a3886fa80e1364ff2633f17a830f4ceedf34 Mon Sep 17 00:00:00 2001 From: groot Date: Fri, 21 Aug 2020 11:14:29 +0800 Subject: [PATCH] prepare change memmanager for wal (#3376) Signed-off-by: groot --- core/src/db/DBImpl.cpp | 10 +++-- core/src/db/insert/MemCollection.cpp | 15 +------ core/src/db/insert/MemCollection.h | 12 +---- core/src/db/insert/MemManager.h | 6 +-- core/src/db/insert/MemManagerImpl.cpp | 28 +++--------- core/src/db/insert/MemManagerImpl.h | 11 ++--- core/src/db/insert/MemSegment.cpp | 4 +- core/src/db/insert/MemSegment.h | 5 +-- core/src/db/insert/VectorSource.cpp | 2 +- core/src/db/insert/VectorSource.h | 13 +++--- core/src/db/wal/WalManager.cpp | 65 +++++++++++++++++---------- core/unittest/db/test_wal.cpp | 2 +- 12 files changed, 80 insertions(+), 93 deletions(-) diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index c4f60b198b..ea01cf7724 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -206,7 +206,8 @@ DBImpl::DropCollection(const std::string& collection_name) { auto& snapshots = snapshot::Snapshots::GetInstance(); STATUS_CHECK(snapshots.GetSnapshot(ss, collection_name)); - mem_mgr_->EraseMem(ss->GetCollectionId()); // not allow insert + // erase insert buffer of this collection + mem_mgr_->EraseMem(ss->GetCollectionId()); return snapshots.DropCollection(ss->GetCollectionId(), std::numeric_limits::max()); } @@ -291,8 +292,11 @@ DBImpl::DropPartition(const std::string& collection_name, const std::string& par snapshot::ScopedSnapshotT ss; STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name)); - // SS TODO: Is below step needed? Or How to implement it? - /* mem_mgr_->EraseMem(partition_name); */ + // erase insert buffer of this partition + auto partition = ss->GetPartition(partition_name); + if (partition != nullptr) { + mem_mgr_->EraseMem(ss->GetCollectionId(), partition->GetID()); + } snapshot::PartitionContext context; context.name = partition_name; diff --git a/core/src/db/insert/MemCollection.cpp b/core/src/db/insert/MemCollection.cpp index 94b49f59c7..beafa86db7 100644 --- a/core/src/db/insert/MemCollection.cpp +++ b/core/src/db/insert/MemCollection.cpp @@ -106,7 +106,7 @@ MemCollection::EraseMem(int64_t partition_id) { } Status -MemCollection::Serialize(uint64_t wal_lsn) { +MemCollection::Serialize() { TimeRecorder recorder("MemCollection::Serialize collection " + std::to_string(collection_id_)); if (!doc_ids_to_delete_.empty()) { @@ -132,7 +132,7 @@ MemCollection::Serialize(uint64_t wal_lsn) { for (auto& partition_segments : mem_segments_) { MemSegmentList& segments = partition_segments.second; for (auto& segment : segments) { - auto status = segment->Serialize(wal_lsn); + auto status = segment->Serialize(); if (!status.ok()) { return status; } @@ -171,7 +171,6 @@ MemCollection::ApplyDeletes() { STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_id_)); snapshot::OperationContext context; - context.lsn = lsn_; auto segments_op = std::make_shared(context, ss); int64_t segment_iterated = 0; @@ -308,15 +307,5 @@ MemCollection::ApplyDeletes() { return segments_op->Push(); } -uint64_t -MemCollection::GetLSN() { - return lsn_; -} - -void -MemCollection::SetLSN(uint64_t lsn) { - lsn_ = lsn; -} - } // namespace engine } // namespace milvus diff --git a/core/src/db/insert/MemCollection.h b/core/src/db/insert/MemCollection.h index 98680ab06a..6b746eef38 100644 --- a/core/src/db/insert/MemCollection.h +++ b/core/src/db/insert/MemCollection.h @@ -46,7 +46,7 @@ class MemCollection { EraseMem(int64_t partition_id); Status - Serialize(uint64_t wal_lsn); + Serialize(); int64_t GetCollectionId() const; @@ -54,12 +54,6 @@ class MemCollection { size_t GetCurrentMem(); - uint64_t - GetLSN(); - - void - SetLSN(uint64_t lsn); - private: Status ApplyDeletes(); @@ -74,9 +68,7 @@ class MemCollection { std::mutex mutex_; std::set doc_ids_to_delete_; - - std::atomic lsn_; -}; // SSMemCollection +}; using MemCollectionPtr = std::shared_ptr; diff --git a/core/src/db/insert/MemManager.h b/core/src/db/insert/MemManager.h index 606d5afdb2..3e48607672 100644 --- a/core/src/db/insert/MemManager.h +++ b/core/src/db/insert/MemManager.h @@ -27,10 +27,10 @@ namespace engine { class MemManager { public: virtual Status - InsertEntities(int64_t collection_id, int64_t partition_id, const DataChunkPtr& chunk, uint64_t lsn) = 0; + InsertEntities(int64_t collection_id, int64_t partition_id, const DataChunkPtr& chunk, idx_t op_id) = 0; virtual Status - DeleteEntities(int64_t collection_id, const std::vector& entity_ids, uint64_t lsn) = 0; + DeleteEntities(int64_t collection_id, const std::vector& entity_ids, idx_t op_id) = 0; virtual Status Flush(int64_t collection_id) = 0; @@ -52,7 +52,7 @@ class MemManager { virtual size_t GetCurrentMem() = 0; -}; // MemManagerAbstract +}; using MemManagerPtr = std::shared_ptr; diff --git a/core/src/db/insert/MemManagerImpl.cpp b/core/src/db/insert/MemManagerImpl.cpp index 81d9880330..bbed1030c7 100644 --- a/core/src/db/insert/MemManagerImpl.cpp +++ b/core/src/db/insert/MemManagerImpl.cpp @@ -36,15 +36,15 @@ MemManagerImpl::GetMemByCollection(int64_t collection_id) { } Status -MemManagerImpl::InsertEntities(int64_t collection_id, int64_t partition_id, const DataChunkPtr& chunk, uint64_t lsn) { +MemManagerImpl::InsertEntities(int64_t collection_id, int64_t partition_id, const DataChunkPtr& chunk, idx_t op_id) { auto status = ValidateChunk(collection_id, chunk); if (!status.ok()) { return status; } - VectorSourcePtr source = std::make_shared(chunk); + VectorSourcePtr source = std::make_shared(chunk, op_id); std::unique_lock lock(mutex_); - return InsertEntitiesNoLock(collection_id, partition_id, source, lsn); + return InsertEntitiesNoLock(collection_id, partition_id, source); } Status @@ -141,21 +141,18 @@ MemManagerImpl::ValidateChunk(int64_t collection_id, const DataChunkPtr& chunk) Status MemManagerImpl::InsertEntitiesNoLock(int64_t collection_id, int64_t partition_id, - const milvus::engine::VectorSourcePtr& source, uint64_t lsn) { + const milvus::engine::VectorSourcePtr& source) { MemCollectionPtr mem = GetMemByCollection(collection_id); - mem->SetLSN(lsn); auto status = mem->Add(partition_id, source); return status; } Status -MemManagerImpl::DeleteEntities(int64_t collection_id, const std::vector& entity_ids, uint64_t lsn) { +MemManagerImpl::DeleteEntities(int64_t collection_id, const std::vector& entity_ids, idx_t op_id) { std::unique_lock lock(mutex_); MemCollectionPtr mem = GetMemByCollection(collection_id); - mem->SetLSN(lsn); - auto status = mem->Delete(entity_ids); if (!status.ok()) { return status; @@ -188,10 +185,9 @@ MemManagerImpl::InternalFlush(std::set& collection_ids) { } std::unique_lock lock(serialization_mtx_); - auto max_lsn = GetMaxLSN(temp_immutable_list); for (auto& mem : temp_immutable_list) { LOG_ENGINE_DEBUG_ << "Flushing collection: " << mem->GetCollectionId(); - auto status = mem->Serialize(max_lsn); + auto status = mem->Serialize(); if (!status.ok()) { LOG_ENGINE_ERROR_ << "Flush collection " << mem->GetCollectionId() << " failed"; return status; @@ -294,17 +290,5 @@ MemManagerImpl::GetCurrentMem() { return GetCurrentMutableMem() + GetCurrentImmutableMem(); } -uint64_t -MemManagerImpl::GetMaxLSN(const MemList& collections) { - uint64_t max_lsn = 0; - for (auto& collection : collections) { - auto cur_lsn = collection->GetLSN(); - if (collection->GetLSN() > max_lsn) { - max_lsn = cur_lsn; - } - } - return max_lsn; -} - } // namespace engine } // namespace milvus diff --git a/core/src/db/insert/MemManagerImpl.h b/core/src/db/insert/MemManagerImpl.h index 0106e70c92..1b420540ea 100644 --- a/core/src/db/insert/MemManagerImpl.h +++ b/core/src/db/insert/MemManagerImpl.h @@ -39,10 +39,10 @@ class MemManagerImpl : public MemManager { ~MemManagerImpl() = default; Status - InsertEntities(int64_t collection_id, int64_t partition_id, const DataChunkPtr& chunk, uint64_t lsn) override; + InsertEntities(int64_t collection_id, int64_t partition_id, const DataChunkPtr& chunk, idx_t op_id) override; Status - DeleteEntities(int64_t collection_id, const std::vector& entity_ids, uint64_t lsn) override; + DeleteEntities(int64_t collection_id, const std::vector& entity_ids, idx_t op_id) override; Status Flush(int64_t collection_id) override; @@ -73,7 +73,7 @@ class MemManagerImpl : public MemManager { ValidateChunk(int64_t collection_id, const DataChunkPtr& chunk); Status - InsertEntitiesNoLock(int64_t collection_id, int64_t partition_id, const VectorSourcePtr& source, uint64_t lsn); + InsertEntitiesNoLock(int64_t collection_id, int64_t partition_id, const VectorSourcePtr& source); Status ToImmutable(); @@ -81,9 +81,6 @@ class MemManagerImpl : public MemManager { Status ToImmutable(int64_t collection_id); - uint64_t - GetMaxLSN(const MemList& collections); - Status InternalFlush(std::set& collection_ids); @@ -94,7 +91,7 @@ class MemManagerImpl : public MemManager { DBOptions options_; std::mutex mutex_; std::mutex serialization_mtx_; -}; // NewMemManager +}; } // namespace engine } // namespace milvus diff --git a/core/src/db/insert/MemSegment.cpp b/core/src/db/insert/MemSegment.cpp index dab8c404f4..137a7d4dfb 100644 --- a/core/src/db/insert/MemSegment.cpp +++ b/core/src/db/insert/MemSegment.cpp @@ -242,7 +242,7 @@ MemSegment::IsFull() { } Status -MemSegment::Serialize(uint64_t wal_lsn) { +MemSegment::Serialize() { int64_t size = GetCurrentMem(); server::CollectSerializeMetrics metrics(size); @@ -260,7 +260,7 @@ MemSegment::Serialize(uint64_t wal_lsn) { STATUS_CHECK(operation_->CommitRowCount(segment_writer_ptr_->RowCount())); STATUS_CHECK(operation_->Push()); - LOG_ENGINE_DEBUG_ << "New segment " << segment_->GetID() << " serialized, lsn = " << wal_lsn; + LOG_ENGINE_DEBUG_ << "New segment " << segment_->GetID() << " serialized"; return Status::OK(); } diff --git a/core/src/db/insert/MemSegment.h b/core/src/db/insert/MemSegment.h index 1229a0d4e5..c5201e77c4 100644 --- a/core/src/db/insert/MemSegment.h +++ b/core/src/db/insert/MemSegment.h @@ -51,7 +51,7 @@ class MemSegment { IsFull(); Status - Serialize(uint64_t wal_lsn); + Serialize(); int64_t GetSegmentId() const; @@ -69,9 +69,8 @@ class MemSegment { DBOptions options_; int64_t current_mem_; - // ExecutionEnginePtr execution_engine_; segment::SegmentWriterPtr segment_writer_ptr_; -}; // SSMemTableFile +}; using MemSegmentPtr = std::shared_ptr; diff --git a/core/src/db/insert/VectorSource.cpp b/core/src/db/insert/VectorSource.cpp index 65465ba9e4..5ee34907c2 100644 --- a/core/src/db/insert/VectorSource.cpp +++ b/core/src/db/insert/VectorSource.cpp @@ -21,7 +21,7 @@ namespace milvus { namespace engine { -VectorSource::VectorSource(const DataChunkPtr& chunk) : chunk_(chunk) { +VectorSource::VectorSource(const DataChunkPtr& chunk, idx_t op_id) : chunk_(chunk), op_id_(op_id) { } Status diff --git a/core/src/db/insert/VectorSource.h b/core/src/db/insert/VectorSource.h index 44285fcccb..bc66b52dca 100644 --- a/core/src/db/insert/VectorSource.h +++ b/core/src/db/insert/VectorSource.h @@ -25,11 +25,9 @@ namespace milvus { namespace engine { -// TODO(zhiru): this class needs to be refactored once attributes are added - class VectorSource { public: - explicit VectorSource(const DataChunkPtr& chunk); + explicit VectorSource(const DataChunkPtr& chunk, idx_t op_id); Status Add(const segment::SegmentWriterPtr& segment_writer_ptr, const int64_t& num_attrs_to_add, int64_t& num_attrs_added); @@ -37,11 +35,16 @@ class VectorSource { bool AllAdded(); + idx_t + OperationID() const { + return op_id_; + } + private: DataChunkPtr chunk_; - + idx_t op_id_ = 0; int64_t current_num_added_ = 0; -}; // SSVectorSource +}; using VectorSourcePtr = std::shared_ptr; diff --git a/core/src/db/wal/WalManager.cpp b/core/src/db/wal/WalManager.cpp index 9d94e53c41..1f23b8a5e7 100644 --- a/core/src/db/wal/WalManager.cpp +++ b/core/src/db/wal/WalManager.cpp @@ -11,6 +11,9 @@ #include "db/wal/WalManager.h" #include "db/Utils.h" +#include "db/snapshot/ResourceHelper.h" +#include "db/snapshot/ResourceTypes.h" +#include "db/snapshot/Snapshots.h" #include "db/wal/WalOperationCodec.h" #include "utils/CommonUtil.h" @@ -67,13 +70,15 @@ Status WalManager::DropCollection(const std::string& collection_name) { // write a placeholder file 'del' under collection folder, let cleanup thread remove this folder std::string path = ConstructFilePath(collection_name, WAL_DEL_FILE_NAME); - WalFile file; - file.OpenFile(path, WalFile::OVER_WRITE); - bool del = true; - file.Write(&del); + if (!path.empty()) { + WalFile file; + file.OpenFile(path, WalFile::OVER_WRITE); + bool del = true; + file.Write(&del); - AddCleanupTask(collection_name); - StartCleanupThread(); + AddCleanupTask(collection_name); + StartCleanupThread(); + } return Status::OK(); } @@ -124,9 +129,11 @@ WalManager::OperationDone(const std::string& collection_name, idx_t op_id) { // write max op id to disk std::string path = ConstructFilePath(collection_name, WAL_MAX_OP_FILE_NAME); - WalFile file; - file.OpenFile(path, WalFile::OVER_WRITE); - file.Write(&op_id); + if (!path.empty()) { + WalFile file; + file.OpenFile(path, WalFile::OVER_WRITE); + file.Write(&op_id); + } } } @@ -256,9 +263,9 @@ WalManager::RecordInsertOperation(const InsertEntityOperationPtr& operation, con DataChunkPtr& chunk = chunks[i]; int64_t chunk_size = utils::GetSizeOfChunk(chunk); - { - // open wal file - std::string path = ConstructFilePath(operation->collection_name_, std::to_string(op_id)); + // open wal file + std::string path = ConstructFilePath(operation->collection_name_, std::to_string(op_id)); + if (!path.empty()) { std::lock_guard lock(file_map_mutex_); WalFilePtr file = file_map_[operation->collection_name_]; if (file == nullptr) { @@ -309,9 +316,9 @@ WalManager::RecordDeleteOperation(const DeleteEntityOperationPtr& operation, con idx_t op_id = id_gen_.GetNextIDNumber(); int64_t append_size = operation->entity_ids_.size() * sizeof(idx_t); - { - // open wal file - std::string path = ConstructFilePath(operation->collection_name_, std::to_string(op_id)); + // open wal file + std::string path = ConstructFilePath(operation->collection_name_, std::to_string(op_id)); + if (!path.empty()) { std::lock_guard lock(file_map_mutex_); WalFilePtr file = file_map_[operation->collection_name_]; if (file == nullptr) { @@ -339,17 +346,29 @@ WalManager::RecordDeleteOperation(const DeleteEntityOperationPtr& operation, con std::string WalManager::ConstructFilePath(const std::string& collection_name, const std::string& file_name) { - std::experimental::filesystem::path full_path(wal_path_); - std::experimental::filesystem::create_directory(full_path); - full_path.append(collection_name); - std::experimental::filesystem::create_directory(full_path); + // use snapshot to construct wal path + // typically, the wal file path is like: /xxx/xxx/wal/C_1/xxxxxxxxxx + // if the snapshot not work, use collection name to construct path + snapshot::ScopedSnapshotT ss; + auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name); + if (status.ok() && ss->GetCollection() != nullptr) { + std::string col_path = snapshot::GetResPath(wal_path_, ss->GetCollection()); - if (!file_name.empty()) { + std::experimental::filesystem::path full_path(col_path); + std::experimental::filesystem::create_directory(full_path); full_path.append(file_name); - } - std::string path(full_path.c_str()); - return path; + std::string path(full_path.c_str()); + return path; + } else { + std::experimental::filesystem::path full_path(wal_path_); + full_path.append(collection_name); + std::experimental::filesystem::create_directory(full_path); + full_path.append(file_name); + + std::string path(full_path.c_str()); + return path; + } } void diff --git a/core/unittest/db/test_wal.cpp b/core/unittest/db/test_wal.cpp index fe2ec881c6..54f96f4a4f 100644 --- a/core/unittest/db/test_wal.cpp +++ b/core/unittest/db/test_wal.cpp @@ -436,4 +436,4 @@ TEST_F(WalTest, WalManagerTest) { WalManager::GetInstance().Recovery(db_2); ASSERT_EQ(db_2->InsertCount(), insert_count); ASSERT_EQ(db_2->DeleteCount(), delete_count); -} \ No newline at end of file +}