diff --git a/core/src/db/DB.h b/core/src/db/DB.h index 9bc86d794b..397c6ce22a 100644 --- a/core/src/db/DB.h +++ b/core/src/db/DB.h @@ -91,16 +91,19 @@ class DB { virtual Status DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) = 0; + // op_id is for wal machinery, this id will be used in MemManager virtual Status - Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) = 0; + Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, + id_t op_id = 0) = 0; virtual Status GetEntityByID(const std::string& collection_name, const IDNumbers& id_array, const std::vector& field_names, std::vector& valid_row, DataChunkPtr& data_chunk) = 0; + // op_id is for wal machinery, this id will be used in MemManager virtual Status - DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) = 0; + DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id = 0) = 0; virtual Status ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) = 0; diff --git a/core/src/db/DBFactory.cpp b/core/src/db/DBFactory.cpp index 3ce37a1ab9..1a86dcc132 100644 --- a/core/src/db/DBFactory.cpp +++ b/core/src/db/DBFactory.cpp @@ -11,8 +11,8 @@ #include "db/DBFactory.h" #include "db/DBImpl.h" -#include "db/transcript/Transcript.h" -#include "db/wal/WriteAheadLog.h" +#include "db/transcript/TranscriptProxy.h" +#include "db/wal/WalProxy.h" namespace milvus { namespace engine { @@ -23,12 +23,12 @@ DBFactory::BuildDB(const DBOptions& options) { // need wal? wal must be after db if (options.wal_enable_) { - db = std::make_shared(db, options); + db = std::make_shared(db, options); } // need transcript? transcript must be after wal if (options.transcript_enable_) { - db = std::make_shared(db, options); + db = std::make_shared(db, options); } return db; diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index 19605bdffd..8e104e1784 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -412,7 +412,8 @@ DBImpl::DescribeIndex(const std::string& collection_name, const std::string& fie } Status -DBImpl::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) { +DBImpl::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, + id_t op_id) { CHECK_INITIALIZED; if (data_chunk == nullptr) { @@ -473,7 +474,7 @@ DBImpl::Insert(const std::string& collection_name, const std::string& partition_ int64_t collection_id = ss->GetCollectionId(); int64_t partition_id = part->GetID(); - auto status = mem_mgr_->InsertEntities(collection_id, partition_id, data_chunk, 0); + auto status = mem_mgr_->InsertEntities(collection_id, partition_id, data_chunk, op_id); if (!status.ok()) { return status; } @@ -509,7 +510,7 @@ DBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_ar } Status -DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) { +DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) { CHECK_INITIALIZED; snapshot::ScopedSnapshotT ss; @@ -519,7 +520,7 @@ DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNum return status; } - status = mem_mgr_->DeleteEntities(ss->GetCollectionId(), entity_ids, 0); + status = mem_mgr_->DeleteEntities(ss->GetCollectionId(), entity_ids, op_id); return status; } diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index e98669f12c..228dd5eb52 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -86,7 +86,8 @@ class DBImpl : public DB, public ConfigObserver { DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) override; Status - Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) override; + Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, + id_t op_id) override; Status GetEntityByID(const std::string& collection_name, const IDNumbers& id_array, @@ -94,7 +95,7 @@ class DBImpl : public DB, public ConfigObserver { DataChunkPtr& data_chunk) override; Status - DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) override; + DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) override; Status Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result) override; diff --git a/core/src/db/DBProxy.cpp b/core/src/db/DBProxy.cpp index 3fabc7fa33..179a8fef58 100644 --- a/core/src/db/DBProxy.cpp +++ b/core/src/db/DBProxy.cpp @@ -121,9 +121,10 @@ DBProxy::DescribeIndex(const std::string& collection_name, const std::string& fi } Status -DBProxy::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) { +DBProxy::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, + id_t op_id) { DB_CHECK - return db_->Insert(collection_name, partition_name, data_chunk); + return db_->Insert(collection_name, partition_name, data_chunk, op_id); } Status @@ -135,9 +136,9 @@ DBProxy::GetEntityByID(const std::string& collection_name, const IDNumbers& id_a } Status -DBProxy::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) { +DBProxy::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) { DB_CHECK - return db_->DeleteEntityByID(collection_name, entity_ids); + return db_->DeleteEntityByID(collection_name, entity_ids, op_id); } Status diff --git a/core/src/db/DBProxy.h b/core/src/db/DBProxy.h index e86aee680c..ae9d0deede 100644 --- a/core/src/db/DBProxy.h +++ b/core/src/db/DBProxy.h @@ -75,7 +75,8 @@ class DBProxy : public DB { DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) override; Status - Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) override; + Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, + id_t op_id) override; Status GetEntityByID(const std::string& collection_name, const IDNumbers& id_array, @@ -83,7 +84,7 @@ class DBProxy : public DB { DataChunkPtr& data_chunk) override; Status - DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) override; + DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) override; Status ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) override; diff --git a/core/src/db/IDGenerator.h b/core/src/db/IDGenerator.h index b063303b4c..c44f27872a 100644 --- a/core/src/db/IDGenerator.h +++ b/core/src/db/IDGenerator.h @@ -57,6 +57,7 @@ class SafeIDGenerator : public IDGenerator { return instance; } + SafeIDGenerator() = default; ~SafeIDGenerator() override = default; id_t @@ -66,8 +67,6 @@ class SafeIDGenerator : public IDGenerator { GetNextIDNumbers(size_t n, IDNumbers& ids) override; private: - SafeIDGenerator() = default; - Status NextIDNumbers(size_t n, IDNumbers& ids); diff --git a/core/src/db/Utils.cpp b/core/src/db/Utils.cpp index 731d385726..77508e3563 100644 --- a/core/src/db/Utils.cpp +++ b/core/src/db/Utils.cpp @@ -146,6 +146,29 @@ GetIDFromChunk(const engine::DataChunkPtr& chunk, engine::IDNumbers& ids) { } } +int64_t +GetSizeOfChunk(const engine::DataChunkPtr& chunk) { + if (chunk == nullptr) { + return 0; + } + + int64_t total_size = 0; + for (auto& pair : chunk->fixed_fields_) { + if (pair.second == nullptr) { + continue; + } + total_size += pair.second->Size(); + } + for (auto& pair : chunk->variable_fields_) { + if (pair.second == nullptr) { + continue; + } + total_size += pair.second->Size(); + } + + return total_size; +} + } // namespace utils } // namespace engine } // namespace milvus diff --git a/core/src/db/Utils.h b/core/src/db/Utils.h index 2b0914444c..395b756309 100644 --- a/core/src/db/Utils.h +++ b/core/src/db/Utils.h @@ -55,6 +55,9 @@ SendExitSignal(); void GetIDFromChunk(const engine::DataChunkPtr& chunk, engine::IDNumbers& ids); +int64_t +GetSizeOfChunk(const engine::DataChunkPtr& chunk); + } // namespace utils } // namespace engine } // namespace milvus diff --git a/core/src/db/transcript/Transcript.cpp b/core/src/db/transcript/TranscriptProxy.cpp similarity index 54% rename from core/src/db/transcript/Transcript.cpp rename to core/src/db/transcript/TranscriptProxy.cpp index b86e74f053..96869496aa 100644 --- a/core/src/db/transcript/Transcript.cpp +++ b/core/src/db/transcript/TranscriptProxy.cpp @@ -9,7 +9,7 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. -#include "db/transcript/Transcript.h" +#include "db/transcript/TranscriptProxy.h" #include "db/transcript/ScriptCodec.h" #include "db/transcript/ScriptReplay.h" #include "utils/CommonUtil.h" @@ -20,7 +20,7 @@ namespace milvus { namespace engine { -Transcript::Transcript(const DBPtr& db, const DBOptions& options) : DBProxy(db, options) { +TranscriptProxy::TranscriptProxy(const DBPtr& db, const DBOptions& options) : DBProxy(db, options) { // db must implemented if (db == nullptr) { throw Exception(DB_ERROR, "null pointer"); @@ -28,7 +28,7 @@ Transcript::Transcript(const DBPtr& db, const DBOptions& options) : DBProxy(db, } Status -Transcript::Start() { +TranscriptProxy::Start() { // let service start auto status = db_->Start(); if (!status.ok()) { @@ -58,127 +58,130 @@ Transcript::Start() { } Status -Transcript::Stop() { +TranscriptProxy::Stop() { return db_->Stop(); } Status -Transcript::CreateCollection(const snapshot::CreateCollectionContext& context) { +TranscriptProxy::CreateCollection(const snapshot::CreateCollectionContext& context) { return db_->CreateCollection(context); } Status -Transcript::DropCollection(const std::string& name) { +TranscriptProxy::DropCollection(const std::string& name) { return db_->DropCollection(name); } Status -Transcript::HasCollection(const std::string& collection_name, bool& has_or_not) { +TranscriptProxy::HasCollection(const std::string& collection_name, bool& has_or_not) { return db_->HasCollection(collection_name, has_or_not); } Status -Transcript::ListCollections(std::vector& names) { +TranscriptProxy::ListCollections(std::vector& names) { return db_->ListCollections(names); } Status -Transcript::GetCollectionInfo(const std::string& collection_name, snapshot::CollectionPtr& collection, - snapshot::FieldElementMappings& fields_schema) { +TranscriptProxy::GetCollectionInfo(const std::string& collection_name, snapshot::CollectionPtr& collection, + snapshot::FieldElementMappings& fields_schema) { return db_->GetCollectionInfo(collection_name, collection, fields_schema); } Status -Transcript::GetCollectionStats(const std::string& collection_name, milvus::json& collection_stats) { +TranscriptProxy::GetCollectionStats(const std::string& collection_name, milvus::json& collection_stats) { return db_->GetCollectionStats(collection_name, collection_stats); } Status -Transcript::CountEntities(const std::string& collection_name, int64_t& row_count) { +TranscriptProxy::CountEntities(const std::string& collection_name, int64_t& row_count) { return db_->CountEntities(collection_name, row_count); } Status -Transcript::CreatePartition(const std::string& collection_name, const std::string& partition_name) { +TranscriptProxy::CreatePartition(const std::string& collection_name, const std::string& partition_name) { return db_->CreatePartition(collection_name, partition_name); } Status -Transcript::DropPartition(const std::string& collection_name, const std::string& partition_name) { +TranscriptProxy::DropPartition(const std::string& collection_name, const std::string& partition_name) { return db_->DropPartition(collection_name, partition_name); } Status -Transcript::HasPartition(const std::string& collection_name, const std::string& partition_tag, bool& exist) { +TranscriptProxy::HasPartition(const std::string& collection_name, const std::string& partition_tag, bool& exist) { return db_->HasPartition(collection_name, partition_tag, exist); } Status -Transcript::ListPartitions(const std::string& collection_name, std::vector& partition_names) { +TranscriptProxy::ListPartitions(const std::string& collection_name, std::vector& partition_names) { return db_->ListPartitions(collection_name, partition_names); } Status -Transcript::CreateIndex(const server::ContextPtr& context, const std::string& collection_name, - const std::string& field_name, const CollectionIndex& index) { +TranscriptProxy::CreateIndex(const server::ContextPtr& context, const std::string& collection_name, + const std::string& field_name, const CollectionIndex& index) { return db_->CreateIndex(context, collection_name, field_name, index); } Status -Transcript::DropIndex(const std::string& collection_name, const std::string& field_name) { +TranscriptProxy::DropIndex(const std::string& collection_name, const std::string& field_name) { return db_->DropIndex(collection_name, field_name); } Status -Transcript::DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) { +TranscriptProxy::DescribeIndex(const std::string& collection_name, const std::string& field_name, + CollectionIndex& index) { return db_->DescribeIndex(collection_name, field_name, index); } Status -Transcript::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) { +TranscriptProxy::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, + id_t op_id) { return db_->Insert(collection_name, partition_name, data_chunk); } Status -Transcript::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array, - const std::vector& field_names, std::vector& valid_row, - DataChunkPtr& data_chunk) { +TranscriptProxy::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array, + const std::vector& field_names, std::vector& valid_row, + DataChunkPtr& data_chunk) { return db_->GetEntityByID(collection_name, id_array, field_names, valid_row, data_chunk); } Status -Transcript::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) { +TranscriptProxy::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) { return db_->DeleteEntityByID(collection_name, entity_ids); } Status -Transcript::ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) { +TranscriptProxy::ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) { return db_->ListIDInSegment(collection_name, segment_id, entity_ids); } Status -Transcript::Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result) { +TranscriptProxy::Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, + engine::QueryResultPtr& result) { return db_->Query(context, query_ptr, result); } Status -Transcript::LoadCollection(const server::ContextPtr& context, const std::string& collection_name, - const std::vector& field_names, bool force) { +TranscriptProxy::LoadCollection(const server::ContextPtr& context, const std::string& collection_name, + const std::vector& field_names, bool force) { return db_->LoadCollection(context, collection_name, field_names, force); } Status -Transcript::Flush(const std::string& collection_name) { +TranscriptProxy::Flush(const std::string& collection_name) { return db_->Flush(collection_name); } Status -Transcript::Flush() { +TranscriptProxy::Flush() { return db_->Flush(); } Status -Transcript::Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold) { +TranscriptProxy::Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold) { return db_->Compact(context, collection_name, threshold); } diff --git a/core/src/db/transcript/Transcript.h b/core/src/db/transcript/TranscriptProxy.h similarity index 93% rename from core/src/db/transcript/Transcript.h rename to core/src/db/transcript/TranscriptProxy.h index 3c499be233..19e6743abe 100644 --- a/core/src/db/transcript/Transcript.h +++ b/core/src/db/transcript/TranscriptProxy.h @@ -20,9 +20,9 @@ namespace milvus { namespace engine { -class Transcript : public DBProxy { +class TranscriptProxy : public DBProxy { public: - Transcript(const DBPtr& db, const DBOptions& options); + TranscriptProxy(const DBPtr& db, const DBOptions& options); Status Start() override; @@ -75,7 +75,8 @@ class Transcript : public DBProxy { DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) override; Status - Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) override; + Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, + id_t op_id) override; Status GetEntityByID(const std::string& collection_name, const IDNumbers& id_array, @@ -83,7 +84,7 @@ class Transcript : public DBProxy { DataChunkPtr& data_chunk) override; Status - DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) override; + DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) override; Status ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) override; diff --git a/core/src/db/wal/WalBuffer.cpp b/core/src/db/wal/WalBuffer.cpp deleted file mode 100644 index dbc975a7ec..0000000000 --- a/core/src/db/wal/WalBuffer.cpp +++ /dev/null @@ -1,679 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -#include "db/wal/WalBuffer.h" - -#include -#include -#include - -#include "db/wal/WalDefinations.h" -#include "utils/Log.h" - -namespace milvus { -namespace engine { -namespace wal { - -inline std::string -ToFileName(int32_t file_no) { - return std::to_string(file_no) + ".wal"; -} - -inline void -BuildLsn(uint32_t file_no, uint32_t offset, uint64_t& lsn) { - lsn = (uint64_t)file_no << 32 | offset; -} - -inline void -ParserLsn(uint64_t lsn, uint32_t& file_no, uint32_t& offset) { - file_no = uint32_t(lsn >> 32); - offset = uint32_t(lsn & LSN_OFFSET_MASK); -} - -MXLogBuffer::MXLogBuffer(const std::string& mxlog_path, const uint32_t buffer_size) - : mxlog_buffer_size_(buffer_size * UNIT_MB), mxlog_writer_(mxlog_path) { -} - -MXLogBuffer::~MXLogBuffer() { -} - -/** - * alloc space for buffers - * @param buffer_size - * @return - */ -bool -MXLogBuffer::Init(uint64_t start_lsn, uint64_t end_lsn) { - LOG_WAL_DEBUG_ << "start_lsn " << start_lsn << " end_lsn " << end_lsn; - - ParserLsn(start_lsn, mxlog_buffer_reader_.file_no, mxlog_buffer_reader_.buf_offset); - ParserLsn(end_lsn, mxlog_buffer_writer_.file_no, mxlog_buffer_writer_.buf_offset); - - if (start_lsn == end_lsn) { - // no data need recovery, start a new file_no - if (mxlog_buffer_writer_.buf_offset != 0) { - mxlog_buffer_writer_.file_no++; - mxlog_buffer_writer_.buf_offset = 0; - mxlog_buffer_reader_.file_no++; - mxlog_buffer_reader_.buf_offset = 0; - } - } else { - // to check whether buffer_size is enough - MXLogFileHandler file_handler(mxlog_writer_.GetFilePath()); - - uint32_t buffer_size_need = 0; - for (auto i = mxlog_buffer_reader_.file_no; i < mxlog_buffer_writer_.file_no; i++) { - file_handler.SetFileName(ToFileName(i)); - auto file_size = file_handler.GetFileSize(); - if (file_size == 0) { - LOG_WAL_ERROR_ << "bad wal file " << i; - return false; - } - if (file_size > buffer_size_need) { - buffer_size_need = file_size; - } - } - if (mxlog_buffer_writer_.buf_offset > buffer_size_need) { - buffer_size_need = mxlog_buffer_writer_.buf_offset; - } - - if (buffer_size_need > mxlog_buffer_size_) { - mxlog_buffer_size_ = buffer_size_need; - LOG_WAL_INFO_ << "recovery will need more buffer, buffer size changed " << mxlog_buffer_size_; - } - } - - buf_[0] = BufferPtr(new char[mxlog_buffer_size_]); - buf_[1] = BufferPtr(new char[mxlog_buffer_size_]); - - if (mxlog_buffer_reader_.file_no == mxlog_buffer_writer_.file_no) { - // read-write buffer - mxlog_buffer_reader_.buf_idx = 0; - mxlog_buffer_writer_.buf_idx = 0; - - mxlog_writer_.SetFileName(ToFileName(mxlog_buffer_writer_.file_no)); - if (mxlog_buffer_writer_.buf_offset == 0) { - mxlog_writer_.SetFileOpenMode("w"); - - } else { - mxlog_writer_.SetFileOpenMode("r+"); - if (!mxlog_writer_.FileExists()) { - LOG_WAL_ERROR_ << "wal file not exist " << mxlog_buffer_writer_.file_no; - return false; - } - - auto read_offset = mxlog_buffer_reader_.buf_offset; - auto read_size = mxlog_buffer_writer_.buf_offset - mxlog_buffer_reader_.buf_offset; - if (!mxlog_writer_.Load(buf_[0].get() + read_offset, read_offset, read_size)) { - LOG_WAL_ERROR_ << "load wal file error " << read_offset << " " << read_size; - return false; - } - } - - } else { - // read buffer - mxlog_buffer_reader_.buf_idx = 0; - - MXLogFileHandler file_handler(mxlog_writer_.GetFilePath()); - file_handler.SetFileName(ToFileName(mxlog_buffer_reader_.file_no)); - file_handler.SetFileOpenMode("r"); - - auto read_offset = mxlog_buffer_reader_.buf_offset; - auto read_size = file_handler.Load(buf_[0].get() + read_offset, read_offset); - mxlog_buffer_reader_.max_offset = read_size + read_offset; - file_handler.CloseFile(); - - // write buffer - mxlog_buffer_writer_.buf_idx = 1; - - mxlog_writer_.SetFileName(ToFileName(mxlog_buffer_writer_.file_no)); - mxlog_writer_.SetFileOpenMode("r+"); - if (!mxlog_writer_.FileExists()) { - LOG_WAL_ERROR_ << "wal file not exist " << mxlog_buffer_writer_.file_no; - return false; - } - if (!mxlog_writer_.Load(buf_[1].get(), 0, mxlog_buffer_writer_.buf_offset)) { - LOG_WAL_ERROR_ << "load wal file error " << mxlog_buffer_writer_.file_no; - return false; - } - } - - SetFileNoFrom(mxlog_buffer_reader_.file_no); - - return true; -} - -void -MXLogBuffer::Reset(uint64_t lsn) { - LOG_WAL_DEBUG_ << "reset lsn " << lsn; - - buf_[0] = BufferPtr(new char[mxlog_buffer_size_]); - buf_[1] = BufferPtr(new char[mxlog_buffer_size_]); - - ParserLsn(lsn, mxlog_buffer_writer_.file_no, mxlog_buffer_writer_.buf_offset); - if (mxlog_buffer_writer_.buf_offset != 0) { - mxlog_buffer_writer_.file_no++; - mxlog_buffer_writer_.buf_offset = 0; - } - mxlog_buffer_writer_.buf_idx = 0; - - memcpy(&mxlog_buffer_reader_, &mxlog_buffer_writer_, sizeof(MXLogBufferHandler)); - - mxlog_writer_.CloseFile(); - mxlog_writer_.SetFileName(ToFileName(mxlog_buffer_writer_.file_no)); - mxlog_writer_.SetFileOpenMode("w"); - - SetFileNoFrom(mxlog_buffer_reader_.file_no); -} - -uint32_t -MXLogBuffer::GetBufferSize() { - return mxlog_buffer_size_; -} - -// buffer writer cares about surplus space of buffer -uint32_t -MXLogBuffer::SurplusSpace() { - return mxlog_buffer_size_ - mxlog_buffer_writer_.buf_offset; -} - -uint32_t -MXLogBuffer::RecordSize(const MXLogRecord& record) { - return SizeOfMXLogRecordHeader + (uint32_t)record.collection_id.size() + (uint32_t)record.partition_tag.size() + - record.length * (uint32_t)sizeof(id_t) + record.data_size; -} - -uint32_t -MXLogBuffer::EntityRecordSize(const milvus::engine::wal::MXLogRecord& record, uint32_t attr_num, - std::vector& field_name_size) { - uint32_t attr_header_size = 0; - attr_header_size += sizeof(uint32_t); - attr_header_size += attr_num * sizeof(uint64_t) * 3; - - uint32_t name_sizes = 0; - for (auto field_name : record.field_names) { - field_name_size.emplace_back(field_name.size()); - name_sizes += field_name.size(); - } - - uint64_t attr_size = 0; - auto attr_it = record.attr_data_size.begin(); - for (; attr_it != record.attr_data_size.end(); attr_it++) { - attr_size += attr_it->second; - } - - return RecordSize(record) + name_sizes + attr_size + attr_header_size; -} - -ErrorCode -MXLogBuffer::Append(MXLogRecord& record) { - uint32_t record_size = RecordSize(record); - if (SurplusSpace() < record_size) { - // writer buffer has no space, switch wal file and write to a new buffer - std::unique_lock lck(mutex_); - if (mxlog_buffer_writer_.buf_idx == mxlog_buffer_reader_.buf_idx) { - // swith writer buffer - mxlog_buffer_reader_.max_offset = mxlog_buffer_writer_.buf_offset; - mxlog_buffer_writer_.buf_idx ^= 1; - } - mxlog_buffer_writer_.file_no++; - mxlog_buffer_writer_.buf_offset = 0; - lck.unlock(); - - // Reborn means close old wal file and open new wal file - if (!mxlog_writer_.ReBorn(ToFileName(mxlog_buffer_writer_.file_no), "w")) { - LOG_WAL_ERROR_ << "ReBorn wal file error " << mxlog_buffer_writer_.file_no; - return WAL_FILE_ERROR; - } - } - - // point to the offset of current record in wal file - char* current_write_buf = buf_[mxlog_buffer_writer_.buf_idx].get(); - uint32_t current_write_offset = mxlog_buffer_writer_.buf_offset; - - MXLogRecordHeader head; - BuildLsn(mxlog_buffer_writer_.file_no, mxlog_buffer_writer_.buf_offset + (uint32_t)record_size, head.mxl_lsn); - head.mxl_type = (uint8_t)record.type; - head.collection_id_size = (uint16_t)record.collection_id.size(); - head.partition_tag_size = (uint16_t)record.partition_tag.size(); - head.vector_num = record.length; - head.data_size = record.data_size; - - memcpy(current_write_buf + current_write_offset, &head, SizeOfMXLogRecordHeader); - current_write_offset += SizeOfMXLogRecordHeader; - - if (!record.collection_id.empty()) { - memcpy(current_write_buf + current_write_offset, record.collection_id.data(), record.collection_id.size()); - current_write_offset += record.collection_id.size(); - } - - if (!record.partition_tag.empty()) { - memcpy(current_write_buf + current_write_offset, record.partition_tag.data(), record.partition_tag.size()); - current_write_offset += record.partition_tag.size(); - } - if (record.ids != nullptr && record.length > 0) { - memcpy(current_write_buf + current_write_offset, record.ids, record.length * sizeof(id_t)); - current_write_offset += record.length * sizeof(id_t); - } - - if (record.data != nullptr && record.data_size > 0) { - memcpy(current_write_buf + current_write_offset, record.data, record.data_size); - current_write_offset += record.data_size; - } - - bool write_rst = mxlog_writer_.Write(current_write_buf + mxlog_buffer_writer_.buf_offset, record_size); - if (!write_rst) { - LOG_WAL_ERROR_ << "write wal file error"; - return WAL_FILE_ERROR; - } - - mxlog_buffer_writer_.buf_offset = current_write_offset; - - record.lsn = head.mxl_lsn; - return WAL_SUCCESS; -} - -ErrorCode -MXLogBuffer::AppendEntity(milvus::engine::wal::MXLogRecord& record) { - std::vector field_name_size; - MXLogAttrRecordHeader attr_header; - attr_header.attr_num = 0; - for (auto name : record.field_names) { - attr_header.attr_num++; - attr_header.field_name_size.emplace_back(name.size()); - attr_header.attr_size.emplace_back(record.attr_data_size.at(name)); - attr_header.attr_nbytes.emplace_back(record.attr_nbytes.at(name)); - } - - uint32_t record_size = EntityRecordSize(record, attr_header.attr_num, field_name_size); - if (SurplusSpace() < record_size) { - // writer buffer has no space, switch wal file and write to a new buffer - std::unique_lock lck(mutex_); - if (mxlog_buffer_writer_.buf_idx == mxlog_buffer_reader_.buf_idx) { - // swith writer buffer - mxlog_buffer_reader_.max_offset = mxlog_buffer_writer_.buf_offset; - mxlog_buffer_writer_.buf_idx ^= 1; - } - mxlog_buffer_writer_.file_no++; - mxlog_buffer_writer_.buf_offset = 0; - lck.unlock(); - - // Reborn means close old wal file and open new wal file - if (!mxlog_writer_.ReBorn(ToFileName(mxlog_buffer_writer_.file_no), "w")) { - LOG_WAL_ERROR_ << "ReBorn wal file error " << mxlog_buffer_writer_.file_no; - return WAL_FILE_ERROR; - } - } - - // point to the offset of current record in wal file - char* current_write_buf = buf_[mxlog_buffer_writer_.buf_idx].get(); - uint32_t current_write_offset = mxlog_buffer_writer_.buf_offset; - - MXLogRecordHeader head; - BuildLsn(mxlog_buffer_writer_.file_no, mxlog_buffer_writer_.buf_offset + (uint32_t)record_size, head.mxl_lsn); - head.mxl_type = (uint8_t)record.type; - head.collection_id_size = (uint16_t)record.collection_id.size(); - head.partition_tag_size = (uint16_t)record.partition_tag.size(); - head.vector_num = record.length; - head.data_size = record.data_size; - - memcpy(current_write_buf + current_write_offset, &head, SizeOfMXLogRecordHeader); - current_write_offset += SizeOfMXLogRecordHeader; - - memcpy(current_write_buf + current_write_offset, &attr_header.attr_num, sizeof(int32_t)); - current_write_offset += sizeof(int32_t); - - memcpy(current_write_buf + current_write_offset, attr_header.field_name_size.data(), - sizeof(int64_t) * attr_header.attr_num); - current_write_offset += sizeof(int64_t) * attr_header.attr_num; - - memcpy(current_write_buf + current_write_offset, attr_header.attr_size.data(), - sizeof(int64_t) * attr_header.attr_num); - current_write_offset += sizeof(int64_t) * attr_header.attr_num; - - memcpy(current_write_buf + current_write_offset, attr_header.attr_nbytes.data(), - sizeof(int64_t) * attr_header.attr_num); - current_write_offset += sizeof(int64_t) * attr_header.attr_num; - - if (!record.collection_id.empty()) { - memcpy(current_write_buf + current_write_offset, record.collection_id.data(), record.collection_id.size()); - current_write_offset += record.collection_id.size(); - } - - if (!record.partition_tag.empty()) { - memcpy(current_write_buf + current_write_offset, record.partition_tag.data(), record.partition_tag.size()); - current_write_offset += record.partition_tag.size(); - } - if (record.ids != nullptr && record.length > 0) { - memcpy(current_write_buf + current_write_offset, record.ids, record.length * sizeof(id_t)); - current_write_offset += record.length * sizeof(id_t); - } - - if (record.data != nullptr && record.data_size > 0) { - memcpy(current_write_buf + current_write_offset, record.data, record.data_size); - current_write_offset += record.data_size; - } - - // Assign attr names - for (auto name : record.field_names) { - if (name.size() > 0) { - memcpy(current_write_buf + current_write_offset, name.data(), name.size()); - current_write_offset += name.size(); - } - } - - // Assign attr values - for (auto name : record.field_names) { - if (record.attr_data_size.at(name) != 0) { - memcpy(current_write_buf + current_write_offset, record.attr_data.at(name).data(), - record.attr_data_size.at(name)); - current_write_offset += record.attr_data_size.at(name); - } - } - - bool write_rst = mxlog_writer_.Write(current_write_buf + mxlog_buffer_writer_.buf_offset, record_size); - if (!write_rst) { - LOG_WAL_ERROR_ << "write wal file error"; - return WAL_FILE_ERROR; - } - - mxlog_buffer_writer_.buf_offset = current_write_offset; - - record.lsn = head.mxl_lsn; - return WAL_SUCCESS; -} - -ErrorCode -MXLogBuffer::Next(const uint64_t last_applied_lsn, MXLogRecord& record) { - // init output - record.type = MXLogType::None; - - // reader catch up to writer, no next record, read fail - if (GetReadLsn() >= last_applied_lsn) { - return WAL_SUCCESS; - } - - // otherwise, it means there must exists next record, in buffer or wal log - bool need_load_new = false; - std::unique_lock lck(mutex_); - if (mxlog_buffer_reader_.file_no != mxlog_buffer_writer_.file_no) { - if (mxlog_buffer_reader_.buf_offset == mxlog_buffer_reader_.max_offset) { // last record - mxlog_buffer_reader_.file_no++; - mxlog_buffer_reader_.buf_offset = 0; - need_load_new = (mxlog_buffer_reader_.file_no != mxlog_buffer_writer_.file_no); - if (!need_load_new) { - // read reach write buffer - mxlog_buffer_reader_.buf_idx = mxlog_buffer_writer_.buf_idx; - } - } - } - lck.unlock(); - - if (need_load_new) { - MXLogFileHandler mxlog_reader(mxlog_writer_.GetFilePath()); - mxlog_reader.SetFileName(ToFileName(mxlog_buffer_reader_.file_no)); - mxlog_reader.SetFileOpenMode("r"); - uint32_t file_size = mxlog_reader.Load(buf_[mxlog_buffer_reader_.buf_idx].get(), 0); - if (file_size == 0) { - LOG_WAL_ERROR_ << "load wal file error " << mxlog_buffer_reader_.file_no; - return WAL_FILE_ERROR; - } - mxlog_buffer_reader_.max_offset = file_size; - } - - char* current_read_buf = buf_[mxlog_buffer_reader_.buf_idx].get(); - uint64_t current_read_offset = mxlog_buffer_reader_.buf_offset; - - MXLogRecordHeader* head = (MXLogRecordHeader*)(current_read_buf + current_read_offset); - record.type = (MXLogType)head->mxl_type; - record.lsn = head->mxl_lsn; - record.length = head->vector_num; - record.data_size = head->data_size; - - current_read_offset += SizeOfMXLogRecordHeader; - - if (head->collection_id_size != 0) { - record.collection_id.assign(current_read_buf + current_read_offset, head->collection_id_size); - current_read_offset += head->collection_id_size; - } else { - record.collection_id = ""; - } - - if (head->partition_tag_size != 0) { - record.partition_tag.assign(current_read_buf + current_read_offset, head->partition_tag_size); - current_read_offset += head->partition_tag_size; - } else { - record.partition_tag = ""; - } - - if (head->vector_num != 0) { - record.ids = (id_t*)(current_read_buf + current_read_offset); - current_read_offset += head->vector_num * sizeof(id_t); - } else { - record.ids = nullptr; - } - - if (record.data_size != 0) { - record.data = current_read_buf + current_read_offset; - } else { - record.data = nullptr; - } - - mxlog_buffer_reader_.buf_offset = uint32_t(head->mxl_lsn & LSN_OFFSET_MASK); - return WAL_SUCCESS; -} - -ErrorCode -MXLogBuffer::NextEntity(const uint64_t last_applied_lsn, milvus::engine::wal::MXLogRecord& record) { - // init output - record.type = MXLogType::None; - - // reader catch up to writer, no next record, read fail - if (GetReadLsn() >= last_applied_lsn) { - return WAL_SUCCESS; - } - - // otherwise, it means there must exists next record, in buffer or wal log - bool need_load_new = false; - std::unique_lock lck(mutex_); - if (mxlog_buffer_reader_.file_no != mxlog_buffer_writer_.file_no) { - if (mxlog_buffer_reader_.buf_offset == mxlog_buffer_reader_.max_offset) { // last record - mxlog_buffer_reader_.file_no++; - mxlog_buffer_reader_.buf_offset = 0; - need_load_new = (mxlog_buffer_reader_.file_no != mxlog_buffer_writer_.file_no); - if (!need_load_new) { - // read reach write buffer - mxlog_buffer_reader_.buf_idx = mxlog_buffer_writer_.buf_idx; - } - } - } - lck.unlock(); - - if (need_load_new) { - MXLogFileHandler mxlog_reader(mxlog_writer_.GetFilePath()); - mxlog_reader.SetFileName(ToFileName(mxlog_buffer_reader_.file_no)); - mxlog_reader.SetFileOpenMode("r"); - uint32_t file_size = mxlog_reader.Load(buf_[mxlog_buffer_reader_.buf_idx].get(), 0); - if (file_size == 0) { - LOG_WAL_ERROR_ << "load wal file error " << mxlog_buffer_reader_.file_no; - return WAL_FILE_ERROR; - } - mxlog_buffer_reader_.max_offset = file_size; - } - - char* current_read_buf = buf_[mxlog_buffer_reader_.buf_idx].get(); - uint64_t current_read_offset = mxlog_buffer_reader_.buf_offset; - - MXLogRecordHeader* head = (MXLogRecordHeader*)(current_read_buf + current_read_offset); - - record.type = (MXLogType)head->mxl_type; - record.lsn = head->mxl_lsn; - record.length = head->vector_num; - record.data_size = head->data_size; - - current_read_offset += SizeOfMXLogRecordHeader; - - MXLogAttrRecordHeader attr_head; - - memcpy(&attr_head.attr_num, current_read_buf + current_read_offset, sizeof(uint32_t)); - current_read_offset += sizeof(uint32_t); - - attr_head.attr_size.resize(attr_head.attr_num); - attr_head.field_name_size.resize(attr_head.attr_num); - attr_head.attr_nbytes.resize(attr_head.attr_num); - memcpy(attr_head.field_name_size.data(), current_read_buf + current_read_offset, - sizeof(uint64_t) * attr_head.attr_num); - current_read_offset += sizeof(uint64_t) * attr_head.attr_num; - - memcpy(attr_head.attr_size.data(), current_read_buf + current_read_offset, sizeof(uint64_t) * attr_head.attr_num); - current_read_offset += sizeof(uint64_t) * attr_head.attr_num; - - memcpy(attr_head.attr_nbytes.data(), current_read_buf + current_read_offset, sizeof(uint64_t) * attr_head.attr_num); - current_read_offset += sizeof(uint64_t) * attr_head.attr_num; - - if (head->collection_id_size != 0) { - record.collection_id.assign(current_read_buf + current_read_offset, head->collection_id_size); - current_read_offset += head->collection_id_size; - } else { - record.collection_id = ""; - } - - if (head->partition_tag_size != 0) { - record.partition_tag.assign(current_read_buf + current_read_offset, head->partition_tag_size); - current_read_offset += head->partition_tag_size; - } else { - record.partition_tag = ""; - } - - if (head->vector_num != 0) { - record.ids = (id_t*)(current_read_buf + current_read_offset); - current_read_offset += head->vector_num * sizeof(id_t); - } else { - record.ids = nullptr; - } - - if (record.data_size != 0) { - record.data = current_read_buf + current_read_offset; - current_read_offset += record.data_size; - } else { - record.data = nullptr; - } - - // Read field names - auto attr_num = attr_head.attr_num; - record.field_names.clear(); - if (attr_num > 0) { - for (auto size : attr_head.field_name_size) { - if (size != 0) { - std::string name; - name.assign(current_read_buf + current_read_offset, size); - record.field_names.emplace_back(name); - current_read_offset += size; - } else { - record.field_names.emplace_back(""); - } - } - } - - // Read attributes data - record.attr_data.clear(); - record.attr_data_size.clear(); - record.attr_nbytes.clear(); - if (attr_num > 0) { - for (uint64_t i = 0; i < attr_num; ++i) { - auto attr_size = attr_head.attr_size[i]; - record.attr_data_size.insert(std::make_pair(record.field_names[i], attr_size)); - record.attr_nbytes.insert(std::make_pair(record.field_names[i], attr_head.attr_nbytes[i])); - std::vector data(attr_size); - memcpy(data.data(), current_read_buf + current_read_offset, attr_size); - record.attr_data.insert(std::make_pair(record.field_names[i], data)); - current_read_offset += attr_size; - } - } - - mxlog_buffer_reader_.buf_offset = uint32_t(head->mxl_lsn & LSN_OFFSET_MASK); - return WAL_SUCCESS; -} - -uint64_t -MXLogBuffer::GetReadLsn() { - uint64_t read_lsn; - BuildLsn(mxlog_buffer_reader_.file_no, mxlog_buffer_reader_.buf_offset, read_lsn); - return read_lsn; -} - -bool -MXLogBuffer::ResetWriteLsn(uint64_t lsn) { - LOG_WAL_INFO_ << "reset write lsn " << lsn; - - uint32_t old_file_no = mxlog_buffer_writer_.file_no; - ParserLsn(lsn, mxlog_buffer_writer_.file_no, mxlog_buffer_writer_.buf_offset); - if (old_file_no == mxlog_buffer_writer_.file_no) { - LOG_WAL_DEBUG_ << "file No. is not changed"; - return true; - } - - std::unique_lock lck(mutex_); - if (mxlog_buffer_writer_.file_no == mxlog_buffer_reader_.file_no) { - mxlog_buffer_writer_.buf_idx = mxlog_buffer_reader_.buf_idx; - LOG_WAL_DEBUG_ << "file No. is the same as reader"; - return true; - } - lck.unlock(); - - if (!mxlog_writer_.ReBorn(ToFileName(mxlog_buffer_writer_.file_no), "r+")) { - LOG_WAL_ERROR_ << "reborn file error " << mxlog_buffer_writer_.file_no; - return false; - } - if (!mxlog_writer_.Load(buf_[mxlog_buffer_writer_.buf_idx].get(), 0, mxlog_buffer_writer_.buf_offset)) { - LOG_WAL_ERROR_ << "load file error"; - return false; - } - - return true; -} - -void -MXLogBuffer::SetFileNoFrom(uint32_t file_no) { - file_no_from_ = file_no; - - if (file_no > 0) { - // remove the files whose No. are less than file_no - MXLogFileHandler file_handler(mxlog_writer_.GetFilePath()); - do { - file_handler.SetFileName(ToFileName(--file_no)); - if (!file_handler.FileExists()) { - break; - } - LOG_WAL_INFO_ << "Delete wal file " << file_no; - file_handler.DeleteFile(); - } while (file_no > 0); - } -} - -void -MXLogBuffer::RemoveOldFiles(uint64_t flushed_lsn) { - uint32_t file_no; - uint32_t offset; - ParserLsn(flushed_lsn, file_no, offset); - if (file_no_from_ < file_no) { - MXLogFileHandler file_handler(mxlog_writer_.GetFilePath()); - do { - file_handler.SetFileName(ToFileName(file_no_from_)); - LOG_WAL_INFO_ << "Delete wal file " << file_no_from_; - file_handler.DeleteFile(); - } while (++file_no_from_ < file_no); - } -} - -} // namespace wal -} // namespace engine -} // namespace milvus diff --git a/core/src/db/wal/WalBuffer.h b/core/src/db/wal/WalBuffer.h deleted file mode 100644 index 9e421b84a2..0000000000 --- a/core/src/db/wal/WalBuffer.h +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -#pragma once - -#include -#include -#include -#include -#include - -#include "WalDefinations.h" -#include "WalFileHandler.h" -#include "WalMetaHandler.h" -#include "utils/Error.h" - -namespace milvus { -namespace engine { -namespace wal { - -#pragma pack(push) -#pragma pack(1) - -struct MXLogRecordHeader { - uint64_t mxl_lsn; // log sequence number (high 32 bits: file No. inc by 1, low 32 bits: offset in file, max 4GB) - uint8_t mxl_type; // record type, insert/delete/update/flush... - uint16_t collection_id_size; - uint16_t partition_tag_size; - uint32_t vector_num; - uint32_t data_size; -}; - -const uint32_t SizeOfMXLogRecordHeader = sizeof(MXLogRecordHeader); - -struct MXLogAttrRecordHeader { - uint32_t attr_num; - std::vector field_name_size; - std::vector attr_size; - std::vector attr_nbytes; -}; - -#pragma pack(pop) - -struct MXLogBufferHandler { - uint32_t max_offset; - uint32_t file_no; - uint32_t buf_offset; - uint8_t buf_idx; -}; - -using BufferPtr = std::shared_ptr; - -class MXLogBuffer { - public: - MXLogBuffer(const std::string& mxlog_path, const uint32_t buffer_size); - ~MXLogBuffer(); - - bool - Init(uint64_t read_lsn, uint64_t write_lsn); - - // ignore all old wal file - void - Reset(uint64_t lsn); - - // Note: record.lsn will be set inner - ErrorCode - Append(MXLogRecord& record); - - ErrorCode - AppendEntity(MXLogRecord& record); - - ErrorCode - Next(const uint64_t last_applied_lsn, MXLogRecord& record); - - ErrorCode - NextEntity(const uint64_t last_applied_lsn, MXLogRecord& record); - - uint64_t - GetReadLsn(); - - bool - ResetWriteLsn(uint64_t lsn); - - void - SetFileNoFrom(uint32_t file_no); - - void - RemoveOldFiles(uint64_t flushed_lsn); - - uint32_t - GetBufferSize(); - - uint32_t - SurplusSpace(); - - private: - uint32_t - RecordSize(const MXLogRecord& record); - - uint32_t - EntityRecordSize(const milvus::engine::wal::MXLogRecord& record, uint32_t attr_num, - std::vector& field_name_size); - - private: - uint32_t mxlog_buffer_size_; // from config - BufferPtr buf_[2]; - std::mutex mutex_; - uint32_t file_no_from_; - MXLogBufferHandler mxlog_buffer_reader_; - MXLogBufferHandler mxlog_buffer_writer_; - MXLogFileHandler mxlog_writer_; -}; - -using MXLogBufferPtr = std::shared_ptr; - -} // namespace wal -} // namespace engine -} // namespace milvus diff --git a/core/src/db/wal/WalDefinations.h b/core/src/db/wal/WalDefinations.h deleted file mode 100644 index 0cf3bdbf45..0000000000 --- a/core/src/db/wal/WalDefinations.h +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -#pragma once - -#include -#include -#include -#include - -#include "db/Types.h" -#include "segment/Segment.h" - -namespace milvus { -namespace engine { -namespace wal { - -#define UNIT_MB (1024 * 1024) -#define UNIT_B 1 -#define LSN_OFFSET_MASK 0x00000000ffffffff - -enum class MXLogType { None, InsertBinary, InsertVector, Delete, Update, Flush, Entity }; - -struct MXLogRecord { - uint64_t lsn; - MXLogType type; - std::string collection_id; - std::string partition_tag; - uint32_t length; - const id_t* ids; - uint32_t data_size; - const void* data; - std::vector field_names; // will be removed - // std::vector attrs_size; - // std::vector attrs_data; - std::unordered_map attr_nbytes; // will be removed - std::unordered_map attr_data_size; // will be removed - std::unordered_map> attr_data; // will be removed - - engine::DataChunkPtr data_chunk; // for hybird data transfer -}; - -struct MXLogConfiguration { - bool recovery_error_ignore; - uint32_t buffer_size; - std::string mxlog_path; -}; - -} // namespace wal -} // namespace engine -} // namespace milvus diff --git a/core/src/db/wal/WalFileHandler.cpp b/core/src/db/wal/WalFileHandler.cpp deleted file mode 100644 index ecc7d1fc2a..0000000000 --- a/core/src/db/wal/WalFileHandler.cpp +++ /dev/null @@ -1,146 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -#include "db/wal/WalFileHandler.h" -#include "utils/Log.h" - -#include -#include - -namespace milvus { -namespace engine { -namespace wal { - -MXLogFileHandler::MXLogFileHandler(const std::string& mxlog_path) : file_path_(mxlog_path), p_file_(nullptr) { -} - -MXLogFileHandler::~MXLogFileHandler() { - CloseFile(); -} - -bool -MXLogFileHandler::OpenFile() { - if (p_file_ == nullptr) { - p_file_ = fopen((file_path_ + file_name_).c_str(), file_mode_.c_str()); - } - return (p_file_ != nullptr); -} - -uint32_t -MXLogFileHandler::Load(char* buf, uint32_t data_offset) { - uint32_t read_size = 0; - if (OpenFile()) { - uint32_t file_size = GetFileSize(); - if (file_size > data_offset) { - read_size = file_size - data_offset; - fseek(p_file_, data_offset, SEEK_SET); - auto ret = fread(buf, 1, read_size, p_file_); - if (ret != read_size) { - LOG_WAL_ERROR_ << LogOut("MXLogFileHandler::Load error, expect read %d but read %d", read_size, ret); - } - } - } - return read_size; -} - -bool -MXLogFileHandler::Load(char* buf, uint32_t data_offset, uint32_t data_size) { - if (OpenFile() && data_size != 0) { - auto file_size = GetFileSize(); - if ((file_size < data_offset) || (file_size - data_offset < data_size)) { - return false; - } - - fseek(p_file_, data_offset, SEEK_SET); - size_t ret = fread(buf, 1, data_size, p_file_); - if (ret != data_size) { - LOG_WAL_ERROR_ << LogOut("MXLogFileHandler::Load error, expect read %d but read %d", data_size, ret); - } - } - return true; -} - -bool -MXLogFileHandler::Write(char* buf, uint32_t data_size, bool is_sync) { - uint32_t written_size = 0; - if (OpenFile() && data_size != 0) { - written_size = fwrite(buf, 1, data_size, p_file_); - fflush(p_file_); - } - return (written_size == data_size); -} - -bool -MXLogFileHandler::ReBorn(const std::string& file_name, const std::string& open_mode) { - CloseFile(); - SetFileName(file_name); - SetFileOpenMode(open_mode); - return OpenFile(); -} - -bool -MXLogFileHandler::CloseFile() { - if (p_file_ != nullptr) { - fclose(p_file_); - p_file_ = nullptr; - } - return true; -} - -std::string -MXLogFileHandler::GetFilePath() { - return file_path_; -} - -std::string -MXLogFileHandler::GetFileName() { - return file_name_; -} - -uint32_t -MXLogFileHandler::GetFileSize() { - struct stat statbuf; - if (0 == stat((file_path_ + file_name_).c_str(), &statbuf)) { - return (uint32_t)statbuf.st_size; - } - - return 0; -} - -void -MXLogFileHandler::DeleteFile() { - remove((file_path_ + file_name_).c_str()); - file_name_ = ""; -} - -bool -MXLogFileHandler::FileExists() { - return access((file_path_ + file_name_).c_str(), 0) != -1; -} - -void -MXLogFileHandler::SetFileOpenMode(const std::string& open_mode) { - file_mode_ = open_mode; -} - -void -MXLogFileHandler::SetFileName(const std::string& file_name) { - file_name_ = file_name; -} - -void -MXLogFileHandler::SetFilePath(const std::string& file_path) { - file_path_ = file_path; -} - -} // namespace wal -} // namespace engine -} // namespace milvus diff --git a/core/src/db/wal/WalFileHandler.h b/core/src/db/wal/WalFileHandler.h deleted file mode 100644 index 5eb2add2e7..0000000000 --- a/core/src/db/wal/WalFileHandler.h +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -#pragma once - -#include - -#include "WalDefinations.h" - -namespace milvus { -namespace engine { -namespace wal { - -class MXLogFileHandler { - public: - explicit MXLogFileHandler(const std::string& mxlog_path); - ~MXLogFileHandler(); - - std::string - GetFilePath(); - std::string - GetFileName(); - bool - OpenFile(); - bool - CloseFile(); - uint32_t - Load(char* buf, uint32_t data_offset); - bool - Load(char* buf, uint32_t data_offset, uint32_t data_size); - bool - Write(char* buf, uint32_t data_size, bool is_sync = false); - bool - ReBorn(const std::string& file_name, const std::string& open_mode); - uint32_t - GetFileSize(); - void - SetFileOpenMode(const std::string& open_mode); - void - SetFilePath(const std::string& file_path); - void - SetFileName(const std::string& file_name); - void - DeleteFile(); - bool - FileExists(); - - private: - std::string file_path_; - std::string file_name_; - std::string file_mode_; - FILE* p_file_; -}; - -} // namespace wal -} // namespace engine -} // namespace milvus diff --git a/core/src/db/wal/WalManager.cpp b/core/src/db/wal/WalManager.cpp index 9663556c22..c3c91264e0 100644 --- a/core/src/db/wal/WalManager.cpp +++ b/core/src/db/wal/WalManager.cpp @@ -10,693 +10,77 @@ // or implied. See the License for the specific language governing permissions and limitations under the License. #include "db/wal/WalManager.h" - -#include - -#include -#include -#include - #include "config/ServerConfig.h" -#include "db/snapshot/Snapshots.h" -#include "utils/CommonUtil.h" -#include "utils/Exception.h" -#include "utils/Log.h" +#include "db/Utils.h" namespace milvus { namespace engine { -namespace wal { -WalManager::WalManager(const MXLogConfiguration& config) { - __glibcxx_assert(config.buffer_size <= milvus::server::CONFIG_WAL_BUFFER_SIZE_MAX / 2); - __glibcxx_assert(config.buffer_size >= milvus::server::CONFIG_WAL_BUFFER_SIZE_MIN / 2); - - mxlog_config_.recovery_error_ignore = config.recovery_error_ignore; - mxlog_config_.buffer_size = config.buffer_size; - mxlog_config_.mxlog_path = config.mxlog_path; - - // check the path end with '/' - if (mxlog_config_.mxlog_path.back() != '/') { - mxlog_config_.mxlog_path += '/'; - } - // check path exist - auto status = CommonUtil::CreateDirectory(mxlog_config_.mxlog_path); - if (!status.ok()) { - std::string msg = "failed to create wal directory " + mxlog_config_.mxlog_path; - LOG_ENGINE_ERROR_ << msg; - throw Exception(WAL_PATH_ERROR, msg); - } +WalManager::WalManager() { + wal_path_ = config.wal.path(); + wal_buffer_size_ = config.wal.buffer_size(); + insert_buffer_size_ = config.cache.insert_buffer_size(); } -WalManager::~WalManager() { +WalManager& +WalManager::GetInstance() { + static WalManager s_mgr; + return s_mgr; } -ErrorCode -WalManager::Init() { - uint64_t applied_lsn = 0; - p_meta_handler_ = std::make_shared(mxlog_config_.mxlog_path); - if (p_meta_handler_ != nullptr) { - p_meta_handler_->GetMXLogInternalMeta(applied_lsn); +Status +WalManager::RecordOperation(const WalOperationPtr& operation, const DBPtr& db) { + if (operation == nullptr) { + return Status(DB_ERROR, "Wal operation is null pointer"); } - uint64_t recovery_start = 0; - std::vector collection_names; - auto status = snapshot::Snapshots::GetInstance().GetCollectionNames(collection_names); - if (!status.ok()) { - return WAL_META_ERROR; - } - - if (!collection_names.empty()) { - u_int64_t min_flushed_lsn = ~(u_int64_t)0; - u_int64_t max_flushed_lsn = 0; - auto update_limit_lsn = [&](u_int64_t lsn) { - if (min_flushed_lsn > lsn) { - min_flushed_lsn = lsn; - } - if (max_flushed_lsn < lsn) { - max_flushed_lsn = lsn; - } - }; - - for (auto& col_name : collection_names) { - auto& collection = collections_[col_name]; - auto& default_part = collection[""]; - - snapshot::ScopedSnapshotT ss; - status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, col_name); - if (!status.ok()) { - return WAL_META_ERROR; - } - - default_part.flush_lsn = ss->GetMaxLsn(); - update_limit_lsn(default_part.flush_lsn); - - std::vector partition_names = ss->GetPartitionNames(); - for (auto& part_name : partition_names) { - auto& partition = collection[part_name]; - - snapshot::PartitionPtr ss_part = ss->GetPartition(part_name); - if (ss_part == nullptr) { - return WAL_META_ERROR; - } - - partition.flush_lsn = ss_part->GetLsn(); - update_limit_lsn(partition.flush_lsn); - } - } - - if (applied_lsn < max_flushed_lsn) { - // a new WAL folder? - applied_lsn = max_flushed_lsn; - } - if (recovery_start < min_flushed_lsn) { - // not flush all yet - recovery_start = min_flushed_lsn; - } - - for (auto& col : collections_) { - for (auto& part : col.second) { - part.second.wal_lsn = applied_lsn; - } - } - } - - // all tables are droped and a new wal path? - if (applied_lsn < recovery_start) { - applied_lsn = recovery_start; - } - - ErrorCode error_code = WAL_ERROR; - p_buffer_ = std::make_shared(mxlog_config_.mxlog_path, mxlog_config_.buffer_size); - if (p_buffer_ != nullptr) { - if (p_buffer_->Init(recovery_start, applied_lsn)) { - error_code = WAL_SUCCESS; - } else if (mxlog_config_.recovery_error_ignore) { - p_buffer_->Reset(applied_lsn); - error_code = WAL_SUCCESS; - } else { - error_code = WAL_FILE_ERROR; - } - } - - // buffer size may changed - mxlog_config_.buffer_size = p_buffer_->GetBufferSize(); - - last_applied_lsn_ = applied_lsn; - return error_code; -} - -ErrorCode -WalManager::GetNextRecovery(MXLogRecord& record) { - ErrorCode error_code = WAL_SUCCESS; - while (true) { - error_code = p_buffer_->Next(last_applied_lsn_, record); - if (error_code != WAL_SUCCESS) { - if (mxlog_config_.recovery_error_ignore) { - // reset and break recovery - p_buffer_->Reset(last_applied_lsn_); - - record.type = MXLogType::None; - error_code = WAL_SUCCESS; - } + Status status; + switch (operation->Type()) { + case WalOperationType::INSERT_ENTITY: { + InsertEntityOperationPtr op = std::static_pointer_cast(operation); + status = RecordInsertOperation(op, db); break; } - if (record.type == MXLogType::None) { + case WalOperationType::DELETE_ENTITY: { + DeleteEntityOperationPtr op = std::static_pointer_cast(operation); + status = RecordDeleteOperation(op, db); break; } - - // background thread has not started. - // so, needn't lock here. - auto it_col = collections_.find(record.collection_id); - if (it_col != collections_.end()) { - auto it_part = it_col->second.find(record.partition_tag); - if (it_part->second.flush_lsn < record.lsn) { - break; - } - } - } - - // print the log only when record.type != MXLogType::None - if (record.type != MXLogType::None) { - LOG_WAL_INFO_ << "record type " << (int32_t)record.type << " record lsn " << record.lsn << " error code " - << error_code; - } - - return error_code; -} - -ErrorCode -WalManager::GetNextEntityRecovery(milvus::engine::wal::MXLogRecord& record) { - ErrorCode error_code = WAL_SUCCESS; - while (true) { - error_code = p_buffer_->NextEntity(last_applied_lsn_, record); - if (error_code != WAL_SUCCESS) { - if (mxlog_config_.recovery_error_ignore) { - // reset and break recovery - p_buffer_->Reset(last_applied_lsn_); - - record.type = MXLogType::None; - error_code = WAL_SUCCESS; - } + default: break; - } - if (record.type == MXLogType::None) { - break; - } - - // background thread has not started. - // so, needn't lock here. - auto it_col = collections_.find(record.collection_id); - if (it_col != collections_.end()) { - auto it_part = it_col->second.find(record.partition_tag); - if (it_part->second.flush_lsn < record.lsn) { - break; - } - } } - // print the log only when record.type != MXLogType::None - if (record.type != MXLogType::None) { - LOG_WAL_INFO_ << "record type " << (int32_t)record.type << " record lsn " << record.lsn << " error code " - << error_code; - } - - return error_code; + return Status::OK(); } -ErrorCode -WalManager::GetNextRecord(MXLogRecord& record) { - auto check_flush = [&]() -> bool { - std::lock_guard lck(mutex_); - if (flush_info_.IsValid()) { - if (p_buffer_->GetReadLsn() >= flush_info_.lsn_) { - // can exec flush requirement - record.type = MXLogType::Flush; - record.collection_id = flush_info_.collection_id_; - record.lsn = flush_info_.lsn_; - flush_info_.Clear(); +Status +WalManager::RecordInsertOperation(const InsertEntityOperationPtr& operation, const DBPtr& db) { + std::vector chunks; + SplitChunk(operation->data_chunk_, chunks); - LOG_WAL_INFO_ << "record flush collection " << record.collection_id << " lsn " << record.lsn; - return true; - } - } - return false; - }; - - if (check_flush()) { - return WAL_SUCCESS; - } - - ErrorCode error_code = WAL_SUCCESS; - while (WAL_SUCCESS == p_buffer_->Next(last_applied_lsn_, record)) { - if (record.type == MXLogType::None) { - if (check_flush()) { - return WAL_SUCCESS; - } - break; - } - - std::lock_guard lck(mutex_); - auto it_col = collections_.find(record.collection_id); - if (it_col != collections_.end()) { - auto it_part = it_col->second.find(record.partition_tag); - if (it_part->second.flush_lsn < record.lsn) { - break; - } - } - } - - if (record.type != MXLogType::None) { - LOG_WAL_INFO_ << "record type " << (int32_t)record.type << " collection " << record.collection_id << " lsn " - << record.lsn; - } - return error_code; + return Status::OK(); } -ErrorCode -WalManager::GetNextEntityRecord(milvus::engine::wal::MXLogRecord& record) { - auto check_flush = [&]() -> bool { - std::lock_guard lck(mutex_); - if (flush_info_.IsValid()) { - if (p_buffer_->GetReadLsn() >= flush_info_.lsn_) { - // can exec flush requirement - record.type = MXLogType::Flush; - record.collection_id = flush_info_.collection_id_; - record.lsn = flush_info_.lsn_; - flush_info_.Clear(); - - LOG_WAL_INFO_ << "record flush collection " << record.collection_id << " lsn " << record.lsn; - return true; - } - } - return false; - }; - - if (check_flush()) { - return WAL_SUCCESS; - } - - ErrorCode error_code = WAL_SUCCESS; - while (WAL_SUCCESS == p_buffer_->NextEntity(last_applied_lsn_, record)) { - if (record.type == MXLogType::None) { - if (check_flush()) { - return WAL_SUCCESS; - } - break; - } - - std::lock_guard lck(mutex_); - auto it_col = collections_.find(record.collection_id); - if (it_col != collections_.end()) { - auto it_part = it_col->second.find(record.partition_tag); - if (it_part->second.flush_lsn < record.lsn) { - break; - } - } - } - - LOG_WAL_INFO_ << "record type " << (int32_t)record.type << " collection " << record.collection_id << " lsn " - << record.lsn; - return error_code; -} - -uint64_t -WalManager::CreateCollection(const std::string& collection_id) { - LOG_WAL_INFO_ << "create collection " << collection_id << " " << last_applied_lsn_; - std::lock_guard lck(mutex_); - uint64_t applied_lsn = last_applied_lsn_; - collections_[collection_id][""] = {applied_lsn, applied_lsn}; - return applied_lsn; -} - -uint64_t -WalManager::CreatePartition(const std::string& collection_id, const std::string& partition_tag) { - LOG_WAL_INFO_ << "create collection " << collection_id << " " << partition_tag << " " << last_applied_lsn_; - std::lock_guard lck(mutex_); - uint64_t applied_lsn = last_applied_lsn_; - collections_[collection_id][partition_tag] = {applied_lsn, applied_lsn}; - return applied_lsn; -} - -uint64_t -WalManager::CreateHybridCollection(const std::string& collection_id) { - LOG_WAL_INFO_ << "create hybrid collection " << collection_id << " " << last_applied_lsn_; - std::lock_guard lck(mutex_); - uint64_t applied_lsn = last_applied_lsn_; - collections_[collection_id][""] = {applied_lsn, applied_lsn}; - return applied_lsn; -} - -void -WalManager::DropCollection(const std::string& collection_id) { - LOG_WAL_INFO_ << "drop collection " << collection_id; - std::lock_guard lck(mutex_); - collections_.erase(collection_id); -} - -void -WalManager::DropPartition(const std::string& collection_id, const std::string& partition_tag) { - LOG_WAL_INFO_ << collection_id << " drop partition " << partition_tag; - std::lock_guard lck(mutex_); - auto it = collections_.find(collection_id); - if (it != collections_.end()) { - it->second.erase(partition_tag); - } -} - -void -WalManager::CollectionFlushed(const std::string& collection_id, uint64_t lsn) { - std::unique_lock lck(mutex_); - if (collection_id.empty()) { - // all collections - for (auto& col : collections_) { - for (auto& part : col.second) { - part.second.flush_lsn = lsn; - } - } - +Status +WalManager::SplitChunk(const DataChunkPtr& chunk, std::vector& chunks) { + int64_t chunk_size = utils::GetSizeOfChunk(chunk); + if (chunk_size > insert_buffer_size_) { } else { - // one collection - auto it_col = collections_.find(collection_id); - if (it_col != collections_.end()) { - for (auto& part : it_col->second) { - part.second.flush_lsn = lsn; - } - } + chunks.push_back(chunk); } - lck.unlock(); - LOG_WAL_INFO_ << collection_id << " is flushed by lsn " << lsn; + return Status::OK(); } -void -WalManager::PartitionFlushed(const std::string& collection_id, const std::string& partition_tag, uint64_t lsn) { - std::unique_lock lck(mutex_); - auto it_col = collections_.find(collection_id); - if (it_col != collections_.end()) { - auto it_part = it_col->second.find(partition_tag); - if (it_part != it_col->second.end()) { - it_part->second.flush_lsn = lsn; - } - } - lck.unlock(); - - LOG_WAL_INFO_ << collection_id << " " << partition_tag << " is flushed by lsn " << lsn; +Status +WalManager::RecordDeleteOperation(const DeleteEntityOperationPtr& operation, const DBPtr& db) { + return Status::OK(); } -void -WalManager::CollectionUpdated(const std::string& collection_id, uint64_t lsn) { - std::unique_lock lck(mutex_); - auto it_col = collections_.find(collection_id); - if (it_col != collections_.end()) { - for (auto& part : it_col->second) { - part.second.wal_lsn = lsn; - } - } - lck.unlock(); +Status +WalManager::OperationDone(id_t op_id) { + return Status::OK(); } -void -WalManager::PartitionUpdated(const std::string& collection_id, const std::string& partition_tag, uint64_t lsn) { - std::unique_lock lck(mutex_); - auto it_col = collections_.find(collection_id); - if (it_col != collections_.end()) { - auto it_part = it_col->second.find(partition_tag); - if (it_part != it_col->second.end()) { - it_part->second.wal_lsn = lsn; - } - } - lck.unlock(); -} - -template -bool -WalManager::Insert(const std::string& collection_id, const std::string& partition_tag, const IDNumbers& vector_ids, - const std::vector& vectors) { - MXLogType log_type; - if (std::is_same::value) { - log_type = MXLogType::InsertVector; - } else if (std::is_same::value) { - log_type = MXLogType::InsertBinary; - } else { - return false; - } - - size_t vector_num = vector_ids.size(); - if (vector_num == 0) { - LOG_WAL_ERROR_ << LogOut("[%s][%ld] The ids is empty.", "insert", 0); - return false; - } - size_t dim = vectors.size() / vector_num; - size_t unit_size = dim * sizeof(T) + sizeof(id_t); - size_t head_size = SizeOfMXLogRecordHeader + collection_id.length() + partition_tag.length(); - - MXLogRecord record; - record.type = log_type; - record.collection_id = collection_id; - record.partition_tag = partition_tag; - - uint64_t new_lsn = 0; - for (size_t i = 0; i < vector_num; i += record.length) { - size_t surplus_space = p_buffer_->SurplusSpace(); - size_t max_rcd_num = 0; - if (surplus_space >= head_size + unit_size) { - max_rcd_num = (surplus_space - head_size) / unit_size; - } else { - max_rcd_num = (mxlog_config_.buffer_size - head_size) / unit_size; - } - if (max_rcd_num == 0) { - LOG_WAL_ERROR_ << LogOut("[%s][%ld]", "insert", 0) << "Wal buffer size is too small " - << mxlog_config_.buffer_size << " unit " << unit_size; - return false; - } - - record.length = std::min(vector_num - i, max_rcd_num); - record.ids = vector_ids.data() + i; - record.data_size = record.length * dim * sizeof(T); - record.data = vectors.data() + i * dim; - - auto error_code = p_buffer_->Append(record); - if (error_code != WAL_SUCCESS) { - p_buffer_->ResetWriteLsn(last_applied_lsn_); - return false; - } - new_lsn = record.lsn; - } - - last_applied_lsn_ = new_lsn; - PartitionUpdated(collection_id, partition_tag, new_lsn); - - LOG_WAL_INFO_ << LogOut("[%s][%ld]", "insert", 0) << collection_id << " insert in part " << partition_tag - << " with lsn " << new_lsn; - - return p_meta_handler_->SetMXLogInternalMeta(new_lsn); -} - -template -bool -WalManager::InsertEntities(const std::string& collection_id, const std::string& partition_tag, - const milvus::engine::IDNumbers& entity_ids, const std::vector& vectors, - const std::unordered_map& attr_nbytes, - const std::unordered_map>& attrs) { - MXLogType log_type; - if (std::is_same::value) { - log_type = MXLogType::InsertVector; - } else if (std::is_same::value) { - log_type = MXLogType::InsertBinary; - } - - size_t entity_num = entity_ids.size(); - if (entity_num == 0) { - LOG_WAL_ERROR_ << LogOut("[%s][%ld] The ids is empty.", "insert", 0); - return false; - } - size_t dim = vectors.size() / entity_num; - - MXLogRecord record; - - size_t attr_unit_size = 0; - auto attr_it = attr_nbytes.begin(); - for (; attr_it != attr_nbytes.end(); attr_it++) { - record.field_names.emplace_back(attr_it->first); - attr_unit_size += attr_it->second; - } - - size_t unit_size = dim * sizeof(T) + sizeof(id_t) + attr_unit_size; - size_t head_size = SizeOfMXLogRecordHeader + collection_id.length() + partition_tag.length(); - - // TODO(yukun): field_name put into MXLogRecord??? - - record.type = log_type; - record.collection_id = collection_id; - record.partition_tag = partition_tag; - record.attr_nbytes = attr_nbytes; - - uint64_t new_lsn = 0; - for (size_t i = 0; i < entity_num; i += record.length) { - size_t surplus_space = p_buffer_->SurplusSpace(); - size_t max_rcd_num = 0; - if (surplus_space >= head_size + unit_size) { - max_rcd_num = (surplus_space - head_size) / unit_size; - } else { - max_rcd_num = (mxlog_config_.buffer_size - head_size) / unit_size; - } - if (max_rcd_num == 0) { - LOG_WAL_ERROR_ << LogOut("[%s][%ld]", "insert", 0) << "Wal buffer size is too small " - << mxlog_config_.buffer_size << " unit " << unit_size; - return false; - } - - size_t length = std::min(entity_num - i, max_rcd_num); - record.length = length; - record.ids = entity_ids.data() + i; - record.data_size = record.length * dim * sizeof(T); - record.data = vectors.data() + i * dim; - - record.attr_data.clear(); - record.attr_data_size.clear(); - for (auto field_name : record.field_names) { - size_t attr_size = length * attr_nbytes.at(field_name); - record.attr_data_size.insert(std::make_pair(field_name, attr_size)); - std::vector attr_data(attr_size, 0); - memcpy(attr_data.data(), attrs.at(field_name).data() + i * attr_nbytes.at(field_name), attr_size); - record.attr_data.insert(std::make_pair(field_name, attr_data)); - } - - auto error_code = p_buffer_->AppendEntity(record); - if (error_code != WAL_SUCCESS) { - p_buffer_->ResetWriteLsn(last_applied_lsn_); - return false; - } - new_lsn = record.lsn; - } - - last_applied_lsn_ = new_lsn; - PartitionUpdated(collection_id, partition_tag, new_lsn); - - LOG_WAL_INFO_ << LogOut("[%s][%ld]", "insert", 0) << collection_id << " insert in part " << partition_tag - << " with lsn " << new_lsn; - - return p_meta_handler_->SetMXLogInternalMeta(new_lsn); -} - -bool -WalManager::DeleteById(const std::string& collection_id, const IDNumbers& entity_ids) { - size_t entity_num = entity_ids.size(); - if (entity_num == 0) { - LOG_WAL_ERROR_ << "The ids is empty."; - return false; - } - - size_t unit_size = sizeof(id_t); - size_t head_size = SizeOfMXLogRecordHeader + collection_id.length(); - - MXLogRecord record; - record.type = MXLogType::Delete; - record.collection_id = collection_id; - record.partition_tag = ""; - - uint64_t new_lsn = 0; - for (size_t i = 0; i < entity_num; i += record.length) { - size_t surplus_space = p_buffer_->SurplusSpace(); - size_t max_rcd_num = 0; - if (surplus_space >= head_size + unit_size) { - max_rcd_num = (surplus_space - head_size) / unit_size; - } else { - max_rcd_num = (mxlog_config_.buffer_size - head_size) / unit_size; - } - - record.length = std::min(entity_num - i, max_rcd_num); - record.ids = entity_ids.data() + i; - record.data_size = 0; - record.data = nullptr; - - auto error_code = p_buffer_->AppendEntity(record); - if (error_code != WAL_SUCCESS) { - p_buffer_->ResetWriteLsn(last_applied_lsn_); - return false; - } - new_lsn = record.lsn; - } - - last_applied_lsn_ = new_lsn; - CollectionUpdated(collection_id, new_lsn); - - LOG_WAL_INFO_ << collection_id << " delete rows by id, lsn " << new_lsn; - - return p_meta_handler_->SetMXLogInternalMeta(new_lsn); -} - -uint64_t -WalManager::Flush(const std::string& collection_id) { - std::lock_guard lck(mutex_); - // At most one flush requirement is waiting at any time. - // Otherwise, flush_info_ should be modified to a list. - __glibcxx_assert(!flush_info_.IsValid()); - - uint64_t lsn = 0; - if (collection_id.empty()) { - // flush all tables - for (auto& col : collections_) { - for (auto& part : col.second) { - if (part.second.wal_lsn > part.second.flush_lsn) { - lsn = last_applied_lsn_; - break; - } - } - } - - } else { - // flush one collection - auto it_col = collections_.find(collection_id); - if (it_col != collections_.end()) { - for (auto& part : it_col->second) { - auto wal_lsn = part.second.wal_lsn; - auto flush_lsn = part.second.flush_lsn; - if (wal_lsn > flush_lsn && wal_lsn > lsn) { - lsn = wal_lsn; - } - } - } - } - - if (lsn != 0) { - flush_info_.collection_id_ = collection_id; - flush_info_.lsn_ = lsn; - } - - LOG_WAL_INFO_ << collection_id << " want to be flush, lsn " << lsn; - - return lsn; -} - -void -WalManager::RemoveOldFiles(uint64_t flushed_lsn) { - if (p_buffer_ != nullptr) { - p_buffer_->RemoveOldFiles(flushed_lsn); - } -} - -template bool -WalManager::Insert(const std::string& collection_id, const std::string& partition_tag, - const IDNumbers& vector_ids, const std::vector& vectors); - -template bool -WalManager::Insert(const std::string& collection_id, const std::string& partition_tag, - const IDNumbers& vector_ids, const std::vector& vectors); - -template bool -WalManager::InsertEntities(const std::string& collection_id, const std::string& partition_tag, - const milvus::engine::IDNumbers& entity_ids, const std::vector& vectors, - const std::unordered_map& attr_nbytes, - const std::unordered_map>& attrs); - -template bool -WalManager::InsertEntities(const std::string& collection_id, const std::string& partition_tag, - const milvus::engine::IDNumbers& entity_ids, const std::vector& vectors, - const std::unordered_map& attr_nbytes, - const std::unordered_map>& attrs); - -} // namespace wal } // namespace engine } // namespace milvus diff --git a/core/src/db/wal/WalManager.h b/core/src/db/wal/WalManager.h index 88138e7884..a1dfe91d15 100644 --- a/core/src/db/wal/WalManager.h +++ b/core/src/db/wal/WalManager.h @@ -11,219 +11,53 @@ #pragma once -#include -#include -#include -#include -#include -#include +#include "db/DB.h" +#include "db/IDGenerator.h" +#include "db/Types.h" +#include "db/wal/WalOperation.h" +#include "utils/Status.h" -#include "WalBuffer.h" -#include "WalDefinations.h" -#include "WalFileHandler.h" -#include "WalMetaHandler.h" -#include "utils/Error.h" +#include +#include namespace milvus { namespace engine { -namespace wal { class WalManager { public: - explicit WalManager(const MXLogConfiguration& config); - ~WalManager(); + WalManager(); - ErrorCode - Init(); - - /* - * Get next recovery - * @param record[out]: record - * @retval error_code - */ - ErrorCode - GetNextRecovery(MXLogRecord& record); - - ErrorCode - GetNextEntityRecovery(MXLogRecord& record); - - /* - * Get next record - * @param record[out]: record - * @retval error_code - */ - ErrorCode - GetNextRecord(MXLogRecord& record); - - ErrorCode - GetNextEntityRecord(MXLogRecord& record); - - /* - * Create collection - * @param collection_id: collection id - * @retval lsn - */ - uint64_t - CreateCollection(const std::string& collection_id); - - /* - * Create partition - * @param collection_id: collection id - * @param partition_tag: partition tag - * @retval lsn - */ - uint64_t - CreatePartition(const std::string& collection_id, const std::string& partition_tag); - - /* - * Create hybrid collection - * @param collection_id: collection id - * @retval lsn - */ - uint64_t - CreateHybridCollection(const std::string& collection_id); - - /* - * Drop collection - * @param collection_id: collection id - * @retval none - */ - void - DropCollection(const std::string& collection_id); - - /* - * Drop partition - * @param collection_id: collection id - * @param partition_tag: partition tag - * @retval none - */ - void - DropPartition(const std::string& collection_id, const std::string& partition_tag); - - /* - * Collection is flushed (update flushed_lsn) - * @param collection_id: collection id - * @param lsn: flushed lsn - */ - void - CollectionFlushed(const std::string& collection_id, uint64_t lsn); - - /* - * Partition is flushed (update flushed_lsn) - * @param collection_id: collection id - * @param partition_tag: partition_tag - * @param lsn: flushed lsn - */ - void - PartitionFlushed(const std::string& collection_id, const std::string& partition_tag, uint64_t lsn); - - /* - * Collection is updated (update wal_lsn) - * @param collection_id: collection id - * @param partition_tag: partition_tag - * @param lsn: flushed lsn - */ - void - CollectionUpdated(const std::string& collection_id, uint64_t lsn); - - /* - * Partition is updated (update wal_lsn) - * @param collection_id: collection id - * @param partition_tag: partition_tag - * @param lsn: flushed lsn - */ - void - PartitionUpdated(const std::string& collection_id, const std::string& partition_tag, uint64_t lsn); - - /* - * Insert - * @param collection_id: collection id - * @param collection_id: partition tag - * @param vector_ids: vector ids - * @param vectors: vectors - */ - template - bool - Insert(const std::string& collection_id, const std::string& partition_tag, const IDNumbers& vector_ids, - const std::vector& vectors); - - /* - * Insert - * @param collection_id: collection id - * @param partition_tag: partition tag - * @param vector_ids: vector ids - * @param vectors: vectors - * @param attrs: attributes - */ - template - bool - InsertEntities(const std::string& collection_id, const std::string& partition_tag, - const milvus::engine::IDNumbers& entity_ids, const std::vector& vectors, - const std::unordered_map& attr_nbytes, - const std::unordered_map>& attrs); - - /* - * Insert - * @param collection_id: collection id - * @param vector_ids: vector ids - */ - bool - DeleteById(const std::string& collection_id, const IDNumbers& vector_ids); - - /* - * Get flush lsn - * @param collection_id: collection id (empty means all tables) - * @retval if there is something not flushed, return lsn; - * else, return 0 - */ - uint64_t - Flush(const std::string& collection_id = ""); + static WalManager& + GetInstance(); void - RemoveOldFiles(uint64_t flushed_lsn); + SetWalPath(const std::string& path) { + wal_path_ = path; + } + + Status + RecordOperation(const WalOperationPtr& operation, const DBPtr& db); + + Status + OperationDone(id_t op_id); private: - WalManager - operator=(WalManager&); + Status + RecordInsertOperation(const InsertEntityOperationPtr& operation, const DBPtr& db); - MXLogConfiguration mxlog_config_; + Status + RecordDeleteOperation(const DeleteEntityOperationPtr& operation, const DBPtr& db); - MXLogBufferPtr p_buffer_; - MXLogMetaHandlerPtr p_meta_handler_; + Status + SplitChunk(const DataChunkPtr& chunk, std::vector& chunks); - struct TableLsn { - uint64_t flush_lsn; - uint64_t wal_lsn; - }; - std::mutex mutex_; - std::map> collections_; - std::atomic last_applied_lsn_; + private: + SafeIDGenerator id_gen_; - // if multi-thread call Flush(), use list - struct FlushInfo { - std::string collection_id_; - uint64_t lsn_ = 0; - - bool - IsValid() { - return (lsn_ != 0); - } - void - Clear() { - lsn_ = 0; - } - }; - FlushInfo flush_info_; + std::string wal_path_; + int64_t wal_buffer_size_ = 0; + int64_t insert_buffer_size_ = 0; }; -extern template bool -WalManager::Insert(const std::string& collection_id, const std::string& partition_tag, - const IDNumbers& vector_ids, const std::vector& vectors); - -extern template bool -WalManager::Insert(const std::string& collection_id, const std::string& partition_tag, - const IDNumbers& vector_ids, const std::vector& vectors); - -} // namespace wal } // namespace engine } // namespace milvus diff --git a/core/src/db/wal/WalMetaHandler.cpp b/core/src/db/wal/WalMetaHandler.cpp deleted file mode 100644 index d6a3483c77..0000000000 --- a/core/src/db/wal/WalMetaHandler.cpp +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -#include "db/wal/WalMetaHandler.h" - -#include - -namespace milvus { -namespace engine { -namespace wal { - -const char* WAL_META_FILE_NAME = "mxlog.meta"; - -MXLogMetaHandler::MXLogMetaHandler(const std::string& internal_meta_file_path) { - std::string file_full_path = internal_meta_file_path + WAL_META_FILE_NAME; - - wal_meta_fp_ = fopen(file_full_path.c_str(), "r+"); - if (wal_meta_fp_ == nullptr) { - wal_meta_fp_ = fopen(file_full_path.c_str(), "w"); - - } else { - uint64_t all_wal_lsn[3] = {0, 0, 0}; - auto rt_val = fread(&all_wal_lsn, sizeof(all_wal_lsn), 1, wal_meta_fp_); - if (rt_val == 1) { - if (all_wal_lsn[2] == all_wal_lsn[1]) { - latest_wal_lsn_ = all_wal_lsn[2]; - } else { - latest_wal_lsn_ = all_wal_lsn[0]; - } - } - } -} - -MXLogMetaHandler::~MXLogMetaHandler() { - if (wal_meta_fp_ != nullptr) { - fclose(wal_meta_fp_); - wal_meta_fp_ = nullptr; - } -} - -bool -MXLogMetaHandler::GetMXLogInternalMeta(uint64_t& wal_lsn) { - wal_lsn = latest_wal_lsn_; - return true; -} - -bool -MXLogMetaHandler::SetMXLogInternalMeta(uint64_t wal_lsn) { - if (wal_meta_fp_ != nullptr) { - uint64_t all_wal_lsn[3] = {latest_wal_lsn_, wal_lsn, wal_lsn}; - fseek(wal_meta_fp_, 0, SEEK_SET); - auto rt_val = fwrite(&all_wal_lsn, sizeof(all_wal_lsn), 1, wal_meta_fp_); - if (rt_val == 1) { - fflush(wal_meta_fp_); - latest_wal_lsn_ = wal_lsn; - return true; - } - } - return false; -} - -} // namespace wal -} // namespace engine -} // namespace milvus diff --git a/core/src/db/wal/WalOperation.cpp b/core/src/db/wal/WalOperation.cpp new file mode 100644 index 0000000000..0ea3e9baa5 --- /dev/null +++ b/core/src/db/wal/WalOperation.cpp @@ -0,0 +1,30 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include "db/wal/WalOperation.h" + +namespace milvus { +namespace engine { + +/////////////////////////////////////////////////////////////////////////////////////////////////// +WalOperation::WalOperation(WalOperationType type) : type_(type) { +} + +/////////////////////////////////////////////////////////////////////////////////////////////////// +InsertEntityOperation::InsertEntityOperation() : WalOperation(WalOperationType::INSERT_ENTITY) { +} + +/////////////////////////////////////////////////////////////////////////////////////////////////// +DeleteEntityOperation::DeleteEntityOperation() : WalOperation(WalOperationType::DELETE_ENTITY) { +} + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/wal/WalOperation.h b/core/src/db/wal/WalOperation.h new file mode 100644 index 0000000000..f34e5af846 --- /dev/null +++ b/core/src/db/wal/WalOperation.h @@ -0,0 +1,81 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include "db/Types.h" + +#include +#include + +namespace milvus { +namespace engine { + +/////////////////////////////////////////////////////////////////////////////////////////////////// +enum WalOperationType { + INVALID = 0, + INSERT_ENTITY = 1, + DELETE_ENTITY = 2, +}; + +/////////////////////////////////////////////////////////////////////////////////////////////////// +class WalOperation { + public: + explicit WalOperation(WalOperationType type); + + void + SetID(id_t id) { + id_ = id; + } + id_t + ID() const { + return id_; + } + + WalOperationType + Type() const { + return type_; + } + + protected: + id_t id_ = 0; + WalOperationType type_ = WalOperationType::INVALID; +}; + +using WalOperationPtr = std::shared_ptr; + +/////////////////////////////////////////////////////////////////////////////////////////////////// +class InsertEntityOperation : public WalOperation { + public: + InsertEntityOperation(); + + public: + std::string collection_name_; + std::string partition_name; + DataChunkPtr data_chunk_; +}; + +using InsertEntityOperationPtr = std::shared_ptr; + +/////////////////////////////////////////////////////////////////////////////////////////////////// +class DeleteEntityOperation : public WalOperation { + public: + DeleteEntityOperation(); + + public: + std::string collection_name_; + engine::IDNumbers entity_ids_; +}; + +using DeleteEntityOperationPtr = std::shared_ptr; + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/wal/WalOperationCodec.cpp b/core/src/db/wal/WalOperationCodec.cpp new file mode 100644 index 0000000000..cd37b47c8e --- /dev/null +++ b/core/src/db/wal/WalOperationCodec.cpp @@ -0,0 +1,28 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include "db/wal/WalOperationCodec.h" + +namespace milvus { +namespace engine { + +Status +WalOperationCodec::SerializeOperation(const std::string& path, const InsertEntityOperationPtr& operation) { + return Status::OK(); +} + +Status +WalOperationCodec::SerializeOperation(const std::string& path, const DeleteEntityOperationPtr& operation) { + return Status::OK(); +} + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/wal/WalMetaHandler.h b/core/src/db/wal/WalOperationCodec.h similarity index 54% rename from core/src/db/wal/WalMetaHandler.h rename to core/src/db/wal/WalOperationCodec.h index 6d1a9d4b9b..a9d6bd5a1c 100644 --- a/core/src/db/wal/WalMetaHandler.h +++ b/core/src/db/wal/WalOperationCodec.h @@ -11,39 +11,22 @@ #pragma once -#include #include -#include -#include "db/Types.h" -#include "db/meta/MetaFactory.h" -#include "db/wal/WalDefinations.h" -#include "db/wal/WalFileHandler.h" +#include "db/wal/WalOperation.h" +#include "utils/Status.h" namespace milvus { namespace engine { -namespace wal { -extern const char* WAL_META_FILE_NAME; - -class MXLogMetaHandler { +class WalOperationCodec { public: - explicit MXLogMetaHandler(const std::string& internal_meta_file_path); - ~MXLogMetaHandler(); + static Status + SerializeOperation(const std::string& path, const InsertEntityOperationPtr& operation); - bool - GetMXLogInternalMeta(uint64_t& wal_lsn); - - bool - SetMXLogInternalMeta(uint64_t wal_lsn); - - private: - FILE* wal_meta_fp_; - uint64_t latest_wal_lsn_ = 0; + static Status + SerializeOperation(const std::string& path, const DeleteEntityOperationPtr& operation); }; -using MXLogMetaHandlerPtr = std::shared_ptr; - -} // namespace wal } // namespace engine } // namespace milvus diff --git a/core/src/db/wal/WalProxy.cpp b/core/src/db/wal/WalProxy.cpp new file mode 100644 index 0000000000..7dda24e846 --- /dev/null +++ b/core/src/db/wal/WalProxy.cpp @@ -0,0 +1,63 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include "db/wal/WalProxy.h" +#include "config/ServerConfig.h" +#include "db/wal/WalManager.h" +#include "db/wal/WalOperation.h" +#include "utils/Exception.h" + +namespace milvus { +namespace engine { + +WalProxy::WalProxy(const DBPtr& db, const DBOptions& options) : DBProxy(db, options) { + // db must implemented + if (db == nullptr) { + throw Exception(DB_ERROR, "null pointer"); + } +} + +Status +WalProxy::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, + id_t op_id) { + // write operation into disk + InsertEntityOperationPtr op = std::make_shared(); + op->collection_name_ = collection_name; + op->partition_name = partition_name; + op->data_chunk_ = data_chunk; + + return WalManager::GetInstance().RecordOperation(op, db_); +} + +Status +WalProxy::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) { + // write operation into disk + DeleteEntityOperationPtr op = std::make_shared(); + op->collection_name_ = collection_name; + op->entity_ids_ = entity_ids; + + return WalManager::GetInstance().RecordOperation(op, db_); +} + +Status +WalProxy::Flush(const std::string& collection_name) { + auto status = db_->Flush(collection_name); + return status; +} + +Status +WalProxy::Flush() { + auto status = db_->Flush(); + return status; +} + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/wal/WriteAheadLog.h b/core/src/db/wal/WalProxy.h similarity index 82% rename from core/src/db/wal/WriteAheadLog.h rename to core/src/db/wal/WalProxy.h index 1851d44789..6fe9c31a4d 100644 --- a/core/src/db/wal/WriteAheadLog.h +++ b/core/src/db/wal/WalProxy.h @@ -20,15 +20,16 @@ namespace milvus { namespace engine { -class WriteAheadLog : public DBProxy { +class WalProxy : public DBProxy { public: - WriteAheadLog(const DBPtr& db, const DBOptions& options); + WalProxy(const DBPtr& db, const DBOptions& options); Status - Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) override; + Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, + id_t op_id) override; Status - DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) override; + DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) override; Status Flush(const std::string& collection_name) override; diff --git a/core/src/db/wal/WriteAheadLog.cpp b/core/src/db/wal/WriteAheadLog.cpp deleted file mode 100644 index a84319dcde..0000000000 --- a/core/src/db/wal/WriteAheadLog.cpp +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -#include "db/wal/WriteAheadLog.h" - -#include "utils/Exception.h" - -namespace milvus { -namespace engine { - -WriteAheadLog::WriteAheadLog(const DBPtr& db, const DBOptions& options) : DBProxy(db, options) { - // db must implemented - if (db == nullptr) { - throw Exception(DB_ERROR, "null pointer"); - } -} - -Status -WriteAheadLog::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) { - return db_->Insert(collection_name, partition_name, data_chunk); -} - -Status -WriteAheadLog::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) { - return db_->DeleteEntityByID(collection_name, entity_ids); -} - -Status -WriteAheadLog::Flush(const std::string& collection_name) { - return db_->Flush(collection_name); -} - -Status -WriteAheadLog::Flush() { - return db_->Flush(); -} - -} // namespace engine -} // namespace milvus