prepare for wal (#3297)

* prepare for wal

Signed-off-by: groot <yihua.mo@zilliz.com>

* prepare for wal

Signed-off-by: groot <yihua.mo@zilliz.com>
groot 2020-08-18 14:07:35 +08:00 committed by GitHub
parent 2f6920f840
commit aa2fe7263d
26 changed files with 376 additions and 2130 deletions


@ -91,16 +91,19 @@ class DB {
virtual Status
DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) = 0;
// op_id is for wal machinery, this id will be used in MemManager
virtual Status
Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) = 0;
Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk,
id_t op_id = 0) = 0;
virtual Status
GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
const std::vector<std::string>& field_names, std::vector<bool>& valid_row,
DataChunkPtr& data_chunk) = 0;
// op_id is for wal machinery, this id will be used in MemManager
virtual Status
DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) = 0;
DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id = 0) = 0;
virtual Status
ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) = 0;
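The new op_id parameter defaults to 0 for direct callers of Insert and DeleteEntityByID; the wal layer is expected to pass a real id so that MemManager can later acknowledge the operation. The forwarding below is only a sketch of how a wal proxy might thread the id through; the helper RecordInsertToWal and the exact flow are assumptions, not code from this commit.

Status
WalProxy::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk,
                 id_t op_id) {
    // hypothetical: persist the operation to the wal first and obtain a new id for it
    id_t new_op_id = 0;
    auto status = RecordInsertToWal(collection_name, partition_name, data_chunk, new_op_id);
    if (!status.ok()) {
        return status;
    }
    // forward to the wrapped DB; DBImpl hands the id to MemManager (see DBImpl::Insert below)
    return db_->Insert(collection_name, partition_name, data_chunk, new_op_id);
}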


@ -11,8 +11,8 @@
#include "db/DBFactory.h"
#include "db/DBImpl.h"
#include "db/transcript/Transcript.h"
#include "db/wal/WriteAheadLog.h"
#include "db/transcript/TranscriptProxy.h"
#include "db/wal/WalProxy.h"
namespace milvus {
namespace engine {
@ -23,12 +23,12 @@ DBFactory::BuildDB(const DBOptions& options) {
// need wal? wal must be after db
if (options.wal_enable_) {
db = std::make_shared<WriteAheadLog>(db, options);
db = std::make_shared<WalProxy>(db, options);
}
// need transcript? transcript must be after wal
if (options.transcript_enable_) {
db = std::make_shared<Transcript>(db, options);
db = std::make_shared<TranscriptProxy>(db, options);
}
return db;
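With both options enabled the proxies nest around the concrete DBImpl: WalProxy wraps DBImpl, and TranscriptProxy wraps WalProxy, so a client call travels TranscriptProxy -> WalProxy -> DBImpl. The forwarding shape below is inferred from the wrapping order above and the TranscriptProxy hunk further down in this commit.

// Call flow with both proxies enabled:
//   client -> TranscriptProxy::Insert -> WalProxy::Insert -> DBImpl::Insert -> MemManager
// Each proxy derives from DBProxy and forwards to the db_ it wraps, e.g.:
Status
TranscriptProxy::Insert(const std::string& collection_name, const std::string& partition_name,
                        DataChunkPtr& data_chunk, id_t op_id) {
    return db_->Insert(collection_name, partition_name, data_chunk);  // as in the hunk further down
}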


@ -412,7 +412,8 @@ DBImpl::DescribeIndex(const std::string& collection_name, const std::string& fie
}
Status
DBImpl::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) {
DBImpl::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk,
id_t op_id) {
CHECK_INITIALIZED;
if (data_chunk == nullptr) {
@ -473,7 +474,7 @@ DBImpl::Insert(const std::string& collection_name, const std::string& partition_
int64_t collection_id = ss->GetCollectionId();
int64_t partition_id = part->GetID();
auto status = mem_mgr_->InsertEntities(collection_id, partition_id, data_chunk, 0);
auto status = mem_mgr_->InsertEntities(collection_id, partition_id, data_chunk, op_id);
if (!status.ok()) {
return status;
}
@ -509,7 +510,7 @@ DBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_ar
}
Status
DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) {
DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) {
CHECK_INITIALIZED;
snapshot::ScopedSnapshotT ss;
@ -519,7 +520,7 @@ DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNum
return status;
}
status = mem_mgr_->DeleteEntities(ss->GetCollectionId(), entity_ids, 0);
status = mem_mgr_->DeleteEntities(ss->GetCollectionId(), entity_ids, op_id);
return status;
}


@ -86,7 +86,8 @@ class DBImpl : public DB, public ConfigObserver {
DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) override;
Status
Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) override;
Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk,
id_t op_id) override;
Status
GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
@ -94,7 +95,7 @@ class DBImpl : public DB, public ConfigObserver {
DataChunkPtr& data_chunk) override;
Status
DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) override;
DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) override;
Status
Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result) override;


@ -121,9 +121,10 @@ DBProxy::DescribeIndex(const std::string& collection_name, const std::string& fi
}
Status
DBProxy::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) {
DBProxy::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk,
id_t op_id) {
DB_CHECK
return db_->Insert(collection_name, partition_name, data_chunk);
return db_->Insert(collection_name, partition_name, data_chunk, op_id);
}
Status
@ -135,9 +136,9 @@ DBProxy::GetEntityByID(const std::string& collection_name, const IDNumbers& id_a
}
Status
DBProxy::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) {
DBProxy::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) {
DB_CHECK
return db_->DeleteEntityByID(collection_name, entity_ids);
return db_->DeleteEntityByID(collection_name, entity_ids, op_id);
}
Status


@ -75,7 +75,8 @@ class DBProxy : public DB {
DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) override;
Status
Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) override;
Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk,
id_t op_id) override;
Status
GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
@ -83,7 +84,7 @@ class DBProxy : public DB {
DataChunkPtr& data_chunk) override;
Status
DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) override;
DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) override;
Status
ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) override;


@ -57,6 +57,7 @@ class SafeIDGenerator : public IDGenerator {
return instance;
}
SafeIDGenerator() = default;
~SafeIDGenerator() override = default;
id_t
@ -66,8 +67,6 @@ class SafeIDGenerator : public IDGenerator {
GetNextIDNumbers(size_t n, IDNumbers& ids) override;
private:
SafeIDGenerator() = default;
Status
NextIDNumbers(size_t n, IDNumbers& ids);


@ -146,6 +146,29 @@ GetIDFromChunk(const engine::DataChunkPtr& chunk, engine::IDNumbers& ids) {
}
}
int64_t
GetSizeOfChunk(const engine::DataChunkPtr& chunk) {
if (chunk == nullptr) {
return 0;
}
int64_t total_size = 0;
for (auto& pair : chunk->fixed_fields_) {
if (pair.second == nullptr) {
continue;
}
total_size += pair.second->Size();
}
for (auto& pair : chunk->variable_fields_) {
if (pair.second == nullptr) {
continue;
}
total_size += pair.second->Size();
}
return total_size;
}
} // namespace utils
} // namespace engine
} // namespace milvus
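GetSizeOfChunk sums the byte sizes of every fixed-length and variable-length field buffer in a chunk, skipping null entries. Later in this commit the new WalManager compares that size against the configured insert buffer size; a small illustration of such a check (variable names here are for illustration only):

// illustration: decide whether a chunk fits into the configured insert buffer
int64_t insert_buffer_size = config.cache.insert_buffer_size();  // as read by the new WalManager
int64_t chunk_size = utils::GetSizeOfChunk(data_chunk);
if (chunk_size > insert_buffer_size) {
    // the chunk is too large to handle as a single unit and would need to be split
}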


@ -55,6 +55,9 @@ SendExitSignal();
void
GetIDFromChunk(const engine::DataChunkPtr& chunk, engine::IDNumbers& ids);
int64_t
GetSizeOfChunk(const engine::DataChunkPtr& chunk);
} // namespace utils
} // namespace engine
} // namespace milvus


@ -9,7 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/transcript/Transcript.h"
#include "db/transcript/TranscriptProxy.h"
#include "db/transcript/ScriptCodec.h"
#include "db/transcript/ScriptReplay.h"
#include "utils/CommonUtil.h"
@ -20,7 +20,7 @@
namespace milvus {
namespace engine {
Transcript::Transcript(const DBPtr& db, const DBOptions& options) : DBProxy(db, options) {
TranscriptProxy::TranscriptProxy(const DBPtr& db, const DBOptions& options) : DBProxy(db, options) {
// db must be implemented
if (db == nullptr) {
throw Exception(DB_ERROR, "null pointer");
@ -28,7 +28,7 @@ Transcript::Transcript(const DBPtr& db, const DBOptions& options) : DBProxy(db,
}
Status
Transcript::Start() {
TranscriptProxy::Start() {
// let service start
auto status = db_->Start();
if (!status.ok()) {
@ -58,127 +58,130 @@ Transcript::Start() {
}
Status
Transcript::Stop() {
TranscriptProxy::Stop() {
return db_->Stop();
}
Status
Transcript::CreateCollection(const snapshot::CreateCollectionContext& context) {
TranscriptProxy::CreateCollection(const snapshot::CreateCollectionContext& context) {
return db_->CreateCollection(context);
}
Status
Transcript::DropCollection(const std::string& name) {
TranscriptProxy::DropCollection(const std::string& name) {
return db_->DropCollection(name);
}
Status
Transcript::HasCollection(const std::string& collection_name, bool& has_or_not) {
TranscriptProxy::HasCollection(const std::string& collection_name, bool& has_or_not) {
return db_->HasCollection(collection_name, has_or_not);
}
Status
Transcript::ListCollections(std::vector<std::string>& names) {
TranscriptProxy::ListCollections(std::vector<std::string>& names) {
return db_->ListCollections(names);
}
Status
Transcript::GetCollectionInfo(const std::string& collection_name, snapshot::CollectionPtr& collection,
snapshot::FieldElementMappings& fields_schema) {
TranscriptProxy::GetCollectionInfo(const std::string& collection_name, snapshot::CollectionPtr& collection,
snapshot::FieldElementMappings& fields_schema) {
return db_->GetCollectionInfo(collection_name, collection, fields_schema);
}
Status
Transcript::GetCollectionStats(const std::string& collection_name, milvus::json& collection_stats) {
TranscriptProxy::GetCollectionStats(const std::string& collection_name, milvus::json& collection_stats) {
return db_->GetCollectionStats(collection_name, collection_stats);
}
Status
Transcript::CountEntities(const std::string& collection_name, int64_t& row_count) {
TranscriptProxy::CountEntities(const std::string& collection_name, int64_t& row_count) {
return db_->CountEntities(collection_name, row_count);
}
Status
Transcript::CreatePartition(const std::string& collection_name, const std::string& partition_name) {
TranscriptProxy::CreatePartition(const std::string& collection_name, const std::string& partition_name) {
return db_->CreatePartition(collection_name, partition_name);
}
Status
Transcript::DropPartition(const std::string& collection_name, const std::string& partition_name) {
TranscriptProxy::DropPartition(const std::string& collection_name, const std::string& partition_name) {
return db_->DropPartition(collection_name, partition_name);
}
Status
Transcript::HasPartition(const std::string& collection_name, const std::string& partition_tag, bool& exist) {
TranscriptProxy::HasPartition(const std::string& collection_name, const std::string& partition_tag, bool& exist) {
return db_->HasPartition(collection_name, partition_tag, exist);
}
Status
Transcript::ListPartitions(const std::string& collection_name, std::vector<std::string>& partition_names) {
TranscriptProxy::ListPartitions(const std::string& collection_name, std::vector<std::string>& partition_names) {
return db_->ListPartitions(collection_name, partition_names);
}
Status
Transcript::CreateIndex(const server::ContextPtr& context, const std::string& collection_name,
const std::string& field_name, const CollectionIndex& index) {
TranscriptProxy::CreateIndex(const server::ContextPtr& context, const std::string& collection_name,
const std::string& field_name, const CollectionIndex& index) {
return db_->CreateIndex(context, collection_name, field_name, index);
}
Status
Transcript::DropIndex(const std::string& collection_name, const std::string& field_name) {
TranscriptProxy::DropIndex(const std::string& collection_name, const std::string& field_name) {
return db_->DropIndex(collection_name, field_name);
}
Status
Transcript::DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) {
TranscriptProxy::DescribeIndex(const std::string& collection_name, const std::string& field_name,
CollectionIndex& index) {
return db_->DescribeIndex(collection_name, field_name, index);
}
Status
Transcript::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) {
TranscriptProxy::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk,
id_t op_id) {
return db_->Insert(collection_name, partition_name, data_chunk);
}
Status
Transcript::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
const std::vector<std::string>& field_names, std::vector<bool>& valid_row,
DataChunkPtr& data_chunk) {
TranscriptProxy::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
const std::vector<std::string>& field_names, std::vector<bool>& valid_row,
DataChunkPtr& data_chunk) {
return db_->GetEntityByID(collection_name, id_array, field_names, valid_row, data_chunk);
}
Status
Transcript::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) {
TranscriptProxy::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) {
return db_->DeleteEntityByID(collection_name, entity_ids);
}
Status
Transcript::ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) {
TranscriptProxy::ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) {
return db_->ListIDInSegment(collection_name, segment_id, entity_ids);
}
Status
Transcript::Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result) {
TranscriptProxy::Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr,
engine::QueryResultPtr& result) {
return db_->Query(context, query_ptr, result);
}
Status
Transcript::LoadCollection(const server::ContextPtr& context, const std::string& collection_name,
const std::vector<std::string>& field_names, bool force) {
TranscriptProxy::LoadCollection(const server::ContextPtr& context, const std::string& collection_name,
const std::vector<std::string>& field_names, bool force) {
return db_->LoadCollection(context, collection_name, field_names, force);
}
Status
Transcript::Flush(const std::string& collection_name) {
TranscriptProxy::Flush(const std::string& collection_name) {
return db_->Flush(collection_name);
}
Status
Transcript::Flush() {
TranscriptProxy::Flush() {
return db_->Flush();
}
Status
Transcript::Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold) {
TranscriptProxy::Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold) {
return db_->Compact(context, collection_name, threshold);
}


@ -20,9 +20,9 @@
namespace milvus {
namespace engine {
class Transcript : public DBProxy {
class TranscriptProxy : public DBProxy {
public:
Transcript(const DBPtr& db, const DBOptions& options);
TranscriptProxy(const DBPtr& db, const DBOptions& options);
Status
Start() override;
@ -75,7 +75,8 @@ class Transcript : public DBProxy {
DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) override;
Status
Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) override;
Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk,
id_t op_id) override;
Status
GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
@ -83,7 +84,7 @@ class Transcript : public DBProxy {
DataChunkPtr& data_chunk) override;
Status
DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) override;
DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) override;
Status
ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) override;


@ -1,679 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/wal/WalBuffer.h"
#include <cstring>
#include <utility>
#include <vector>
#include "db/wal/WalDefinations.h"
#include "utils/Log.h"
namespace milvus {
namespace engine {
namespace wal {
inline std::string
ToFileName(int32_t file_no) {
return std::to_string(file_no) + ".wal";
}
inline void
BuildLsn(uint32_t file_no, uint32_t offset, uint64_t& lsn) {
lsn = (uint64_t)file_no << 32 | offset;
}
inline void
ParserLsn(uint64_t lsn, uint32_t& file_no, uint32_t& offset) {
file_no = uint32_t(lsn >> 32);
offset = uint32_t(lsn & LSN_OFFSET_MASK);
}
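For reference, the removed MXLogBuffer encoded an LSN as a single 64-bit value: the wal file number in the high 32 bits and the byte offset within that file in the low 32 bits. A quick worked example of the round trip:

uint64_t lsn = 0;
BuildLsn(3, 4096, lsn);           // lsn == (3ULL << 32) | 4096 == 0x0000000300001000
uint32_t file_no = 0, offset = 0;
ParserLsn(lsn, file_no, offset);  // file_no == 3, offset == 4096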
MXLogBuffer::MXLogBuffer(const std::string& mxlog_path, const uint32_t buffer_size)
: mxlog_buffer_size_(buffer_size * UNIT_MB), mxlog_writer_(mxlog_path) {
}
MXLogBuffer::~MXLogBuffer() {
}
/**
* Initialize reader/writer positions from [start_lsn, end_lsn], allocate the two buffers,
* and reload any unconsumed data from the existing wal files.
*/
bool
MXLogBuffer::Init(uint64_t start_lsn, uint64_t end_lsn) {
LOG_WAL_DEBUG_ << "start_lsn " << start_lsn << " end_lsn " << end_lsn;
ParserLsn(start_lsn, mxlog_buffer_reader_.file_no, mxlog_buffer_reader_.buf_offset);
ParserLsn(end_lsn, mxlog_buffer_writer_.file_no, mxlog_buffer_writer_.buf_offset);
if (start_lsn == end_lsn) {
// no data needs recovery, start a new file_no
if (mxlog_buffer_writer_.buf_offset != 0) {
mxlog_buffer_writer_.file_no++;
mxlog_buffer_writer_.buf_offset = 0;
mxlog_buffer_reader_.file_no++;
mxlog_buffer_reader_.buf_offset = 0;
}
} else {
// to check whether buffer_size is enough
MXLogFileHandler file_handler(mxlog_writer_.GetFilePath());
uint32_t buffer_size_need = 0;
for (auto i = mxlog_buffer_reader_.file_no; i < mxlog_buffer_writer_.file_no; i++) {
file_handler.SetFileName(ToFileName(i));
auto file_size = file_handler.GetFileSize();
if (file_size == 0) {
LOG_WAL_ERROR_ << "bad wal file " << i;
return false;
}
if (file_size > buffer_size_need) {
buffer_size_need = file_size;
}
}
if (mxlog_buffer_writer_.buf_offset > buffer_size_need) {
buffer_size_need = mxlog_buffer_writer_.buf_offset;
}
if (buffer_size_need > mxlog_buffer_size_) {
mxlog_buffer_size_ = buffer_size_need;
LOG_WAL_INFO_ << "recovery will need more buffer, buffer size changed " << mxlog_buffer_size_;
}
}
buf_[0] = BufferPtr(new char[mxlog_buffer_size_]);
buf_[1] = BufferPtr(new char[mxlog_buffer_size_]);
if (mxlog_buffer_reader_.file_no == mxlog_buffer_writer_.file_no) {
// read-write buffer
mxlog_buffer_reader_.buf_idx = 0;
mxlog_buffer_writer_.buf_idx = 0;
mxlog_writer_.SetFileName(ToFileName(mxlog_buffer_writer_.file_no));
if (mxlog_buffer_writer_.buf_offset == 0) {
mxlog_writer_.SetFileOpenMode("w");
} else {
mxlog_writer_.SetFileOpenMode("r+");
if (!mxlog_writer_.FileExists()) {
LOG_WAL_ERROR_ << "wal file not exist " << mxlog_buffer_writer_.file_no;
return false;
}
auto read_offset = mxlog_buffer_reader_.buf_offset;
auto read_size = mxlog_buffer_writer_.buf_offset - mxlog_buffer_reader_.buf_offset;
if (!mxlog_writer_.Load(buf_[0].get() + read_offset, read_offset, read_size)) {
LOG_WAL_ERROR_ << "load wal file error " << read_offset << " " << read_size;
return false;
}
}
} else {
// read buffer
mxlog_buffer_reader_.buf_idx = 0;
MXLogFileHandler file_handler(mxlog_writer_.GetFilePath());
file_handler.SetFileName(ToFileName(mxlog_buffer_reader_.file_no));
file_handler.SetFileOpenMode("r");
auto read_offset = mxlog_buffer_reader_.buf_offset;
auto read_size = file_handler.Load(buf_[0].get() + read_offset, read_offset);
mxlog_buffer_reader_.max_offset = read_size + read_offset;
file_handler.CloseFile();
// write buffer
mxlog_buffer_writer_.buf_idx = 1;
mxlog_writer_.SetFileName(ToFileName(mxlog_buffer_writer_.file_no));
mxlog_writer_.SetFileOpenMode("r+");
if (!mxlog_writer_.FileExists()) {
LOG_WAL_ERROR_ << "wal file not exist " << mxlog_buffer_writer_.file_no;
return false;
}
if (!mxlog_writer_.Load(buf_[1].get(), 0, mxlog_buffer_writer_.buf_offset)) {
LOG_WAL_ERROR_ << "load wal file error " << mxlog_buffer_writer_.file_no;
return false;
}
}
SetFileNoFrom(mxlog_buffer_reader_.file_no);
return true;
}
void
MXLogBuffer::Reset(uint64_t lsn) {
LOG_WAL_DEBUG_ << "reset lsn " << lsn;
buf_[0] = BufferPtr(new char[mxlog_buffer_size_]);
buf_[1] = BufferPtr(new char[mxlog_buffer_size_]);
ParserLsn(lsn, mxlog_buffer_writer_.file_no, mxlog_buffer_writer_.buf_offset);
if (mxlog_buffer_writer_.buf_offset != 0) {
mxlog_buffer_writer_.file_no++;
mxlog_buffer_writer_.buf_offset = 0;
}
mxlog_buffer_writer_.buf_idx = 0;
memcpy(&mxlog_buffer_reader_, &mxlog_buffer_writer_, sizeof(MXLogBufferHandler));
mxlog_writer_.CloseFile();
mxlog_writer_.SetFileName(ToFileName(mxlog_buffer_writer_.file_no));
mxlog_writer_.SetFileOpenMode("w");
SetFileNoFrom(mxlog_buffer_reader_.file_no);
}
uint32_t
MXLogBuffer::GetBufferSize() {
return mxlog_buffer_size_;
}
// buffer writer cares about surplus space of buffer
uint32_t
MXLogBuffer::SurplusSpace() {
return mxlog_buffer_size_ - mxlog_buffer_writer_.buf_offset;
}
uint32_t
MXLogBuffer::RecordSize(const MXLogRecord& record) {
return SizeOfMXLogRecordHeader + (uint32_t)record.collection_id.size() + (uint32_t)record.partition_tag.size() +
record.length * (uint32_t)sizeof(id_t) + record.data_size;
}
uint32_t
MXLogBuffer::EntityRecordSize(const milvus::engine::wal::MXLogRecord& record, uint32_t attr_num,
std::vector<uint32_t>& field_name_size) {
uint32_t attr_header_size = 0;
attr_header_size += sizeof(uint32_t);
attr_header_size += attr_num * sizeof(uint64_t) * 3;
uint32_t name_sizes = 0;
for (auto field_name : record.field_names) {
field_name_size.emplace_back(field_name.size());
name_sizes += field_name.size();
}
uint64_t attr_size = 0;
auto attr_it = record.attr_data_size.begin();
for (; attr_it != record.attr_data_size.end(); attr_it++) {
attr_size += attr_it->second;
}
return RecordSize(record) + name_sizes + attr_size + attr_header_size;
}
ErrorCode
MXLogBuffer::Append(MXLogRecord& record) {
uint32_t record_size = RecordSize(record);
if (SurplusSpace() < record_size) {
// writer buffer has no space, switch wal file and write to a new buffer
std::unique_lock<std::mutex> lck(mutex_);
if (mxlog_buffer_writer_.buf_idx == mxlog_buffer_reader_.buf_idx) {
// switch writer buffer
mxlog_buffer_reader_.max_offset = mxlog_buffer_writer_.buf_offset;
mxlog_buffer_writer_.buf_idx ^= 1;
}
mxlog_buffer_writer_.file_no++;
mxlog_buffer_writer_.buf_offset = 0;
lck.unlock();
// Reborn means close old wal file and open new wal file
if (!mxlog_writer_.ReBorn(ToFileName(mxlog_buffer_writer_.file_no), "w")) {
LOG_WAL_ERROR_ << "ReBorn wal file error " << mxlog_buffer_writer_.file_no;
return WAL_FILE_ERROR;
}
}
// point to the offset of current record in wal file
char* current_write_buf = buf_[mxlog_buffer_writer_.buf_idx].get();
uint32_t current_write_offset = mxlog_buffer_writer_.buf_offset;
MXLogRecordHeader head;
BuildLsn(mxlog_buffer_writer_.file_no, mxlog_buffer_writer_.buf_offset + (uint32_t)record_size, head.mxl_lsn);
head.mxl_type = (uint8_t)record.type;
head.collection_id_size = (uint16_t)record.collection_id.size();
head.partition_tag_size = (uint16_t)record.partition_tag.size();
head.vector_num = record.length;
head.data_size = record.data_size;
memcpy(current_write_buf + current_write_offset, &head, SizeOfMXLogRecordHeader);
current_write_offset += SizeOfMXLogRecordHeader;
if (!record.collection_id.empty()) {
memcpy(current_write_buf + current_write_offset, record.collection_id.data(), record.collection_id.size());
current_write_offset += record.collection_id.size();
}
if (!record.partition_tag.empty()) {
memcpy(current_write_buf + current_write_offset, record.partition_tag.data(), record.partition_tag.size());
current_write_offset += record.partition_tag.size();
}
if (record.ids != nullptr && record.length > 0) {
memcpy(current_write_buf + current_write_offset, record.ids, record.length * sizeof(id_t));
current_write_offset += record.length * sizeof(id_t);
}
if (record.data != nullptr && record.data_size > 0) {
memcpy(current_write_buf + current_write_offset, record.data, record.data_size);
current_write_offset += record.data_size;
}
bool write_rst = mxlog_writer_.Write(current_write_buf + mxlog_buffer_writer_.buf_offset, record_size);
if (!write_rst) {
LOG_WAL_ERROR_ << "write wal file error";
return WAL_FILE_ERROR;
}
mxlog_buffer_writer_.buf_offset = current_write_offset;
record.lsn = head.mxl_lsn;
return WAL_SUCCESS;
}
ErrorCode
MXLogBuffer::AppendEntity(milvus::engine::wal::MXLogRecord& record) {
std::vector<uint32_t> field_name_size;
MXLogAttrRecordHeader attr_header;
attr_header.attr_num = 0;
for (auto name : record.field_names) {
attr_header.attr_num++;
attr_header.field_name_size.emplace_back(name.size());
attr_header.attr_size.emplace_back(record.attr_data_size.at(name));
attr_header.attr_nbytes.emplace_back(record.attr_nbytes.at(name));
}
uint32_t record_size = EntityRecordSize(record, attr_header.attr_num, field_name_size);
if (SurplusSpace() < record_size) {
// writer buffer has no space, switch wal file and write to a new buffer
std::unique_lock<std::mutex> lck(mutex_);
if (mxlog_buffer_writer_.buf_idx == mxlog_buffer_reader_.buf_idx) {
// switch writer buffer
mxlog_buffer_reader_.max_offset = mxlog_buffer_writer_.buf_offset;
mxlog_buffer_writer_.buf_idx ^= 1;
}
mxlog_buffer_writer_.file_no++;
mxlog_buffer_writer_.buf_offset = 0;
lck.unlock();
// Reborn means close old wal file and open new wal file
if (!mxlog_writer_.ReBorn(ToFileName(mxlog_buffer_writer_.file_no), "w")) {
LOG_WAL_ERROR_ << "ReBorn wal file error " << mxlog_buffer_writer_.file_no;
return WAL_FILE_ERROR;
}
}
// point to the offset of current record in wal file
char* current_write_buf = buf_[mxlog_buffer_writer_.buf_idx].get();
uint32_t current_write_offset = mxlog_buffer_writer_.buf_offset;
MXLogRecordHeader head;
BuildLsn(mxlog_buffer_writer_.file_no, mxlog_buffer_writer_.buf_offset + (uint32_t)record_size, head.mxl_lsn);
head.mxl_type = (uint8_t)record.type;
head.collection_id_size = (uint16_t)record.collection_id.size();
head.partition_tag_size = (uint16_t)record.partition_tag.size();
head.vector_num = record.length;
head.data_size = record.data_size;
memcpy(current_write_buf + current_write_offset, &head, SizeOfMXLogRecordHeader);
current_write_offset += SizeOfMXLogRecordHeader;
memcpy(current_write_buf + current_write_offset, &attr_header.attr_num, sizeof(int32_t));
current_write_offset += sizeof(int32_t);
memcpy(current_write_buf + current_write_offset, attr_header.field_name_size.data(),
sizeof(int64_t) * attr_header.attr_num);
current_write_offset += sizeof(int64_t) * attr_header.attr_num;
memcpy(current_write_buf + current_write_offset, attr_header.attr_size.data(),
sizeof(int64_t) * attr_header.attr_num);
current_write_offset += sizeof(int64_t) * attr_header.attr_num;
memcpy(current_write_buf + current_write_offset, attr_header.attr_nbytes.data(),
sizeof(int64_t) * attr_header.attr_num);
current_write_offset += sizeof(int64_t) * attr_header.attr_num;
if (!record.collection_id.empty()) {
memcpy(current_write_buf + current_write_offset, record.collection_id.data(), record.collection_id.size());
current_write_offset += record.collection_id.size();
}
if (!record.partition_tag.empty()) {
memcpy(current_write_buf + current_write_offset, record.partition_tag.data(), record.partition_tag.size());
current_write_offset += record.partition_tag.size();
}
if (record.ids != nullptr && record.length > 0) {
memcpy(current_write_buf + current_write_offset, record.ids, record.length * sizeof(id_t));
current_write_offset += record.length * sizeof(id_t);
}
if (record.data != nullptr && record.data_size > 0) {
memcpy(current_write_buf + current_write_offset, record.data, record.data_size);
current_write_offset += record.data_size;
}
// Assign attr names
for (auto name : record.field_names) {
if (name.size() > 0) {
memcpy(current_write_buf + current_write_offset, name.data(), name.size());
current_write_offset += name.size();
}
}
// Assign attr values
for (auto name : record.field_names) {
if (record.attr_data_size.at(name) != 0) {
memcpy(current_write_buf + current_write_offset, record.attr_data.at(name).data(),
record.attr_data_size.at(name));
current_write_offset += record.attr_data_size.at(name);
}
}
bool write_rst = mxlog_writer_.Write(current_write_buf + mxlog_buffer_writer_.buf_offset, record_size);
if (!write_rst) {
LOG_WAL_ERROR_ << "write wal file error";
return WAL_FILE_ERROR;
}
mxlog_buffer_writer_.buf_offset = current_write_offset;
record.lsn = head.mxl_lsn;
return WAL_SUCCESS;
}
ErrorCode
MXLogBuffer::Next(const uint64_t last_applied_lsn, MXLogRecord& record) {
// init output
record.type = MXLogType::None;
// reader has caught up to writer, no next record to read
if (GetReadLsn() >= last_applied_lsn) {
return WAL_SUCCESS;
}
// otherwise there must be a next record, either in the buffer or in a wal file
bool need_load_new = false;
std::unique_lock<std::mutex> lck(mutex_);
if (mxlog_buffer_reader_.file_no != mxlog_buffer_writer_.file_no) {
if (mxlog_buffer_reader_.buf_offset == mxlog_buffer_reader_.max_offset) { // last record
mxlog_buffer_reader_.file_no++;
mxlog_buffer_reader_.buf_offset = 0;
need_load_new = (mxlog_buffer_reader_.file_no != mxlog_buffer_writer_.file_no);
if (!need_load_new) {
// read reach write buffer
mxlog_buffer_reader_.buf_idx = mxlog_buffer_writer_.buf_idx;
}
}
}
lck.unlock();
if (need_load_new) {
MXLogFileHandler mxlog_reader(mxlog_writer_.GetFilePath());
mxlog_reader.SetFileName(ToFileName(mxlog_buffer_reader_.file_no));
mxlog_reader.SetFileOpenMode("r");
uint32_t file_size = mxlog_reader.Load(buf_[mxlog_buffer_reader_.buf_idx].get(), 0);
if (file_size == 0) {
LOG_WAL_ERROR_ << "load wal file error " << mxlog_buffer_reader_.file_no;
return WAL_FILE_ERROR;
}
mxlog_buffer_reader_.max_offset = file_size;
}
char* current_read_buf = buf_[mxlog_buffer_reader_.buf_idx].get();
uint64_t current_read_offset = mxlog_buffer_reader_.buf_offset;
MXLogRecordHeader* head = (MXLogRecordHeader*)(current_read_buf + current_read_offset);
record.type = (MXLogType)head->mxl_type;
record.lsn = head->mxl_lsn;
record.length = head->vector_num;
record.data_size = head->data_size;
current_read_offset += SizeOfMXLogRecordHeader;
if (head->collection_id_size != 0) {
record.collection_id.assign(current_read_buf + current_read_offset, head->collection_id_size);
current_read_offset += head->collection_id_size;
} else {
record.collection_id = "";
}
if (head->partition_tag_size != 0) {
record.partition_tag.assign(current_read_buf + current_read_offset, head->partition_tag_size);
current_read_offset += head->partition_tag_size;
} else {
record.partition_tag = "";
}
if (head->vector_num != 0) {
record.ids = (id_t*)(current_read_buf + current_read_offset);
current_read_offset += head->vector_num * sizeof(id_t);
} else {
record.ids = nullptr;
}
if (record.data_size != 0) {
record.data = current_read_buf + current_read_offset;
} else {
record.data = nullptr;
}
mxlog_buffer_reader_.buf_offset = uint32_t(head->mxl_lsn & LSN_OFFSET_MASK);
return WAL_SUCCESS;
}
ErrorCode
MXLogBuffer::NextEntity(const uint64_t last_applied_lsn, milvus::engine::wal::MXLogRecord& record) {
// init output
record.type = MXLogType::None;
// reader has caught up to writer, no next record to read
if (GetReadLsn() >= last_applied_lsn) {
return WAL_SUCCESS;
}
// otherwise there must be a next record, either in the buffer or in a wal file
bool need_load_new = false;
std::unique_lock<std::mutex> lck(mutex_);
if (mxlog_buffer_reader_.file_no != mxlog_buffer_writer_.file_no) {
if (mxlog_buffer_reader_.buf_offset == mxlog_buffer_reader_.max_offset) { // last record
mxlog_buffer_reader_.file_no++;
mxlog_buffer_reader_.buf_offset = 0;
need_load_new = (mxlog_buffer_reader_.file_no != mxlog_buffer_writer_.file_no);
if (!need_load_new) {
// read reach write buffer
mxlog_buffer_reader_.buf_idx = mxlog_buffer_writer_.buf_idx;
}
}
}
lck.unlock();
if (need_load_new) {
MXLogFileHandler mxlog_reader(mxlog_writer_.GetFilePath());
mxlog_reader.SetFileName(ToFileName(mxlog_buffer_reader_.file_no));
mxlog_reader.SetFileOpenMode("r");
uint32_t file_size = mxlog_reader.Load(buf_[mxlog_buffer_reader_.buf_idx].get(), 0);
if (file_size == 0) {
LOG_WAL_ERROR_ << "load wal file error " << mxlog_buffer_reader_.file_no;
return WAL_FILE_ERROR;
}
mxlog_buffer_reader_.max_offset = file_size;
}
char* current_read_buf = buf_[mxlog_buffer_reader_.buf_idx].get();
uint64_t current_read_offset = mxlog_buffer_reader_.buf_offset;
MXLogRecordHeader* head = (MXLogRecordHeader*)(current_read_buf + current_read_offset);
record.type = (MXLogType)head->mxl_type;
record.lsn = head->mxl_lsn;
record.length = head->vector_num;
record.data_size = head->data_size;
current_read_offset += SizeOfMXLogRecordHeader;
MXLogAttrRecordHeader attr_head;
memcpy(&attr_head.attr_num, current_read_buf + current_read_offset, sizeof(uint32_t));
current_read_offset += sizeof(uint32_t);
attr_head.attr_size.resize(attr_head.attr_num);
attr_head.field_name_size.resize(attr_head.attr_num);
attr_head.attr_nbytes.resize(attr_head.attr_num);
memcpy(attr_head.field_name_size.data(), current_read_buf + current_read_offset,
sizeof(uint64_t) * attr_head.attr_num);
current_read_offset += sizeof(uint64_t) * attr_head.attr_num;
memcpy(attr_head.attr_size.data(), current_read_buf + current_read_offset, sizeof(uint64_t) * attr_head.attr_num);
current_read_offset += sizeof(uint64_t) * attr_head.attr_num;
memcpy(attr_head.attr_nbytes.data(), current_read_buf + current_read_offset, sizeof(uint64_t) * attr_head.attr_num);
current_read_offset += sizeof(uint64_t) * attr_head.attr_num;
if (head->collection_id_size != 0) {
record.collection_id.assign(current_read_buf + current_read_offset, head->collection_id_size);
current_read_offset += head->collection_id_size;
} else {
record.collection_id = "";
}
if (head->partition_tag_size != 0) {
record.partition_tag.assign(current_read_buf + current_read_offset, head->partition_tag_size);
current_read_offset += head->partition_tag_size;
} else {
record.partition_tag = "";
}
if (head->vector_num != 0) {
record.ids = (id_t*)(current_read_buf + current_read_offset);
current_read_offset += head->vector_num * sizeof(id_t);
} else {
record.ids = nullptr;
}
if (record.data_size != 0) {
record.data = current_read_buf + current_read_offset;
current_read_offset += record.data_size;
} else {
record.data = nullptr;
}
// Read field names
auto attr_num = attr_head.attr_num;
record.field_names.clear();
if (attr_num > 0) {
for (auto size : attr_head.field_name_size) {
if (size != 0) {
std::string name;
name.assign(current_read_buf + current_read_offset, size);
record.field_names.emplace_back(name);
current_read_offset += size;
} else {
record.field_names.emplace_back("");
}
}
}
// Read attributes data
record.attr_data.clear();
record.attr_data_size.clear();
record.attr_nbytes.clear();
if (attr_num > 0) {
for (uint64_t i = 0; i < attr_num; ++i) {
auto attr_size = attr_head.attr_size[i];
record.attr_data_size.insert(std::make_pair(record.field_names[i], attr_size));
record.attr_nbytes.insert(std::make_pair(record.field_names[i], attr_head.attr_nbytes[i]));
std::vector<uint8_t> data(attr_size);
memcpy(data.data(), current_read_buf + current_read_offset, attr_size);
record.attr_data.insert(std::make_pair(record.field_names[i], data));
current_read_offset += attr_size;
}
}
mxlog_buffer_reader_.buf_offset = uint32_t(head->mxl_lsn & LSN_OFFSET_MASK);
return WAL_SUCCESS;
}
uint64_t
MXLogBuffer::GetReadLsn() {
uint64_t read_lsn;
BuildLsn(mxlog_buffer_reader_.file_no, mxlog_buffer_reader_.buf_offset, read_lsn);
return read_lsn;
}
bool
MXLogBuffer::ResetWriteLsn(uint64_t lsn) {
LOG_WAL_INFO_ << "reset write lsn " << lsn;
uint32_t old_file_no = mxlog_buffer_writer_.file_no;
ParserLsn(lsn, mxlog_buffer_writer_.file_no, mxlog_buffer_writer_.buf_offset);
if (old_file_no == mxlog_buffer_writer_.file_no) {
LOG_WAL_DEBUG_ << "file No. is not changed";
return true;
}
std::unique_lock<std::mutex> lck(mutex_);
if (mxlog_buffer_writer_.file_no == mxlog_buffer_reader_.file_no) {
mxlog_buffer_writer_.buf_idx = mxlog_buffer_reader_.buf_idx;
LOG_WAL_DEBUG_ << "file No. is the same as reader";
return true;
}
lck.unlock();
if (!mxlog_writer_.ReBorn(ToFileName(mxlog_buffer_writer_.file_no), "r+")) {
LOG_WAL_ERROR_ << "reborn file error " << mxlog_buffer_writer_.file_no;
return false;
}
if (!mxlog_writer_.Load(buf_[mxlog_buffer_writer_.buf_idx].get(), 0, mxlog_buffer_writer_.buf_offset)) {
LOG_WAL_ERROR_ << "load file error";
return false;
}
return true;
}
void
MXLogBuffer::SetFileNoFrom(uint32_t file_no) {
file_no_from_ = file_no;
if (file_no > 0) {
// remove the files whose No. are less than file_no
MXLogFileHandler file_handler(mxlog_writer_.GetFilePath());
do {
file_handler.SetFileName(ToFileName(--file_no));
if (!file_handler.FileExists()) {
break;
}
LOG_WAL_INFO_ << "Delete wal file " << file_no;
file_handler.DeleteFile();
} while (file_no > 0);
}
}
void
MXLogBuffer::RemoveOldFiles(uint64_t flushed_lsn) {
uint32_t file_no;
uint32_t offset;
ParserLsn(flushed_lsn, file_no, offset);
if (file_no_from_ < file_no) {
MXLogFileHandler file_handler(mxlog_writer_.GetFilePath());
do {
file_handler.SetFileName(ToFileName(file_no_from_));
LOG_WAL_INFO_ << "Delete wal file " << file_no_from_;
file_handler.DeleteFile();
} while (++file_no_from_ < file_no);
}
}
} // namespace wal
} // namespace engine
} // namespace milvus


@ -1,126 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <atomic>
#include <memory>
#include <mutex>
#include <string>
#include <vector>
#include "WalDefinations.h"
#include "WalFileHandler.h"
#include "WalMetaHandler.h"
#include "utils/Error.h"
namespace milvus {
namespace engine {
namespace wal {
#pragma pack(push)
#pragma pack(1)
struct MXLogRecordHeader {
uint64_t mxl_lsn; // log sequence number (high 32 bits: file No. inc by 1, low 32 bits: offset in file, max 4GB)
uint8_t mxl_type; // record type, insert/delete/update/flush...
uint16_t collection_id_size;
uint16_t partition_tag_size;
uint32_t vector_num;
uint32_t data_size;
};
const uint32_t SizeOfMXLogRecordHeader = sizeof(MXLogRecordHeader);
struct MXLogAttrRecordHeader {
uint32_t attr_num;
std::vector<uint64_t> field_name_size;
std::vector<uint64_t> attr_size;
std::vector<uint64_t> attr_nbytes;
};
#pragma pack(pop)
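Because the header is declared inside #pragma pack(1), no padding is inserted, so SizeOfMXLogRecordHeader works out to 8 + 1 + 2 + 2 + 4 + 4 = 21 bytes; every record in a wal file begins with exactly that many header bytes, followed by the variable-length payload. A sanity check one could add (assuming the compiler honors the packing):

static_assert(SizeOfMXLogRecordHeader == 21, "packed MXLogRecordHeader is expected to be 21 bytes");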
struct MXLogBufferHandler {
uint32_t max_offset;
uint32_t file_no;
uint32_t buf_offset;
uint8_t buf_idx;
};
using BufferPtr = std::shared_ptr<char[]>;
class MXLogBuffer {
public:
MXLogBuffer(const std::string& mxlog_path, const uint32_t buffer_size);
~MXLogBuffer();
bool
Init(uint64_t read_lsn, uint64_t write_lsn);
// ignore all old wal file
void
Reset(uint64_t lsn);
// Note: record.lsn will be set internally
ErrorCode
Append(MXLogRecord& record);
ErrorCode
AppendEntity(MXLogRecord& record);
ErrorCode
Next(const uint64_t last_applied_lsn, MXLogRecord& record);
ErrorCode
NextEntity(const uint64_t last_applied_lsn, MXLogRecord& record);
uint64_t
GetReadLsn();
bool
ResetWriteLsn(uint64_t lsn);
void
SetFileNoFrom(uint32_t file_no);
void
RemoveOldFiles(uint64_t flushed_lsn);
uint32_t
GetBufferSize();
uint32_t
SurplusSpace();
private:
uint32_t
RecordSize(const MXLogRecord& record);
uint32_t
EntityRecordSize(const milvus::engine::wal::MXLogRecord& record, uint32_t attr_num,
std::vector<uint32_t>& field_name_size);
private:
uint32_t mxlog_buffer_size_; // from config
BufferPtr buf_[2];
std::mutex mutex_;
uint32_t file_no_from_;
MXLogBufferHandler mxlog_buffer_reader_;
MXLogBufferHandler mxlog_buffer_writer_;
MXLogFileHandler mxlog_writer_;
};
using MXLogBufferPtr = std::shared_ptr<MXLogBuffer>;
} // namespace wal
} // namespace engine
} // namespace milvus


@ -1,59 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "db/Types.h"
#include "segment/Segment.h"
namespace milvus {
namespace engine {
namespace wal {
#define UNIT_MB (1024 * 1024)
#define UNIT_B 1
#define LSN_OFFSET_MASK 0x00000000ffffffff
enum class MXLogType { None, InsertBinary, InsertVector, Delete, Update, Flush, Entity };
struct MXLogRecord {
uint64_t lsn;
MXLogType type;
std::string collection_id;
std::string partition_tag;
uint32_t length;
const id_t* ids;
uint32_t data_size;
const void* data;
std::vector<std::string> field_names; // will be removed
// std::vector<uint32_t> attrs_size;
// std::vector<const void* > attrs_data;
std::unordered_map<std::string, uint64_t> attr_nbytes; // will be removed
std::unordered_map<std::string, uint64_t> attr_data_size; // will be removed
std::unordered_map<std::string, std::vector<uint8_t>> attr_data; // will be removed
engine::DataChunkPtr data_chunk; // for hybrid data transfer
};
struct MXLogConfiguration {
bool recovery_error_ignore;
uint32_t buffer_size;
std::string mxlog_path;
};
} // namespace wal
} // namespace engine
} // namespace milvus


@ -1,146 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/wal/WalFileHandler.h"
#include "utils/Log.h"
#include <sys/stat.h>
#include <unistd.h>
namespace milvus {
namespace engine {
namespace wal {
MXLogFileHandler::MXLogFileHandler(const std::string& mxlog_path) : file_path_(mxlog_path), p_file_(nullptr) {
}
MXLogFileHandler::~MXLogFileHandler() {
CloseFile();
}
bool
MXLogFileHandler::OpenFile() {
if (p_file_ == nullptr) {
p_file_ = fopen((file_path_ + file_name_).c_str(), file_mode_.c_str());
}
return (p_file_ != nullptr);
}
uint32_t
MXLogFileHandler::Load(char* buf, uint32_t data_offset) {
uint32_t read_size = 0;
if (OpenFile()) {
uint32_t file_size = GetFileSize();
if (file_size > data_offset) {
read_size = file_size - data_offset;
fseek(p_file_, data_offset, SEEK_SET);
auto ret = fread(buf, 1, read_size, p_file_);
if (ret != read_size) {
LOG_WAL_ERROR_ << LogOut("MXLogFileHandler::Load error, expect read %d but read %d", read_size, ret);
}
}
}
return read_size;
}
bool
MXLogFileHandler::Load(char* buf, uint32_t data_offset, uint32_t data_size) {
if (OpenFile() && data_size != 0) {
auto file_size = GetFileSize();
if ((file_size < data_offset) || (file_size - data_offset < data_size)) {
return false;
}
fseek(p_file_, data_offset, SEEK_SET);
size_t ret = fread(buf, 1, data_size, p_file_);
if (ret != data_size) {
LOG_WAL_ERROR_ << LogOut("MXLogFileHandler::Load error, expect read %d but read %d", data_size, ret);
}
}
return true;
}
bool
MXLogFileHandler::Write(char* buf, uint32_t data_size, bool is_sync) {
uint32_t written_size = 0;
if (OpenFile() && data_size != 0) {
written_size = fwrite(buf, 1, data_size, p_file_);
fflush(p_file_);
}
return (written_size == data_size);
}
bool
MXLogFileHandler::ReBorn(const std::string& file_name, const std::string& open_mode) {
CloseFile();
SetFileName(file_name);
SetFileOpenMode(open_mode);
return OpenFile();
}
bool
MXLogFileHandler::CloseFile() {
if (p_file_ != nullptr) {
fclose(p_file_);
p_file_ = nullptr;
}
return true;
}
std::string
MXLogFileHandler::GetFilePath() {
return file_path_;
}
std::string
MXLogFileHandler::GetFileName() {
return file_name_;
}
uint32_t
MXLogFileHandler::GetFileSize() {
struct stat statbuf;
if (0 == stat((file_path_ + file_name_).c_str(), &statbuf)) {
return (uint32_t)statbuf.st_size;
}
return 0;
}
void
MXLogFileHandler::DeleteFile() {
remove((file_path_ + file_name_).c_str());
file_name_ = "";
}
bool
MXLogFileHandler::FileExists() {
return access((file_path_ + file_name_).c_str(), 0) != -1;
}
void
MXLogFileHandler::SetFileOpenMode(const std::string& open_mode) {
file_mode_ = open_mode;
}
void
MXLogFileHandler::SetFileName(const std::string& file_name) {
file_name_ = file_name;
}
void
MXLogFileHandler::SetFilePath(const std::string& file_path) {
file_path_ = file_path;
}
} // namespace wal
} // namespace engine
} // namespace milvus


@ -1,65 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <string>
#include "WalDefinations.h"
namespace milvus {
namespace engine {
namespace wal {
class MXLogFileHandler {
public:
explicit MXLogFileHandler(const std::string& mxlog_path);
~MXLogFileHandler();
std::string
GetFilePath();
std::string
GetFileName();
bool
OpenFile();
bool
CloseFile();
uint32_t
Load(char* buf, uint32_t data_offset);
bool
Load(char* buf, uint32_t data_offset, uint32_t data_size);
bool
Write(char* buf, uint32_t data_size, bool is_sync = false);
bool
ReBorn(const std::string& file_name, const std::string& open_mode);
uint32_t
GetFileSize();
void
SetFileOpenMode(const std::string& open_mode);
void
SetFilePath(const std::string& file_path);
void
SetFileName(const std::string& file_name);
void
DeleteFile();
bool
FileExists();
private:
std::string file_path_;
std::string file_name_;
std::string file_mode_;
FILE* p_file_;
};
} // namespace wal
} // namespace engine
} // namespace milvus


@ -10,693 +10,77 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/wal/WalManager.h"
#include <unistd.h>
#include <algorithm>
#include <memory>
#include <unordered_map>
#include "config/ServerConfig.h"
#include "db/snapshot/Snapshots.h"
#include "utils/CommonUtil.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "db/Utils.h"
namespace milvus {
namespace engine {
namespace wal {
WalManager::WalManager(const MXLogConfiguration& config) {
__glibcxx_assert(config.buffer_size <= milvus::server::CONFIG_WAL_BUFFER_SIZE_MAX / 2);
__glibcxx_assert(config.buffer_size >= milvus::server::CONFIG_WAL_BUFFER_SIZE_MIN / 2);
mxlog_config_.recovery_error_ignore = config.recovery_error_ignore;
mxlog_config_.buffer_size = config.buffer_size;
mxlog_config_.mxlog_path = config.mxlog_path;
// check the path end with '/'
if (mxlog_config_.mxlog_path.back() != '/') {
mxlog_config_.mxlog_path += '/';
}
// check path exist
auto status = CommonUtil::CreateDirectory(mxlog_config_.mxlog_path);
if (!status.ok()) {
std::string msg = "failed to create wal directory " + mxlog_config_.mxlog_path;
LOG_ENGINE_ERROR_ << msg;
throw Exception(WAL_PATH_ERROR, msg);
}
WalManager::WalManager() {
wal_path_ = config.wal.path();
wal_buffer_size_ = config.wal.buffer_size();
insert_buffer_size_ = config.cache.insert_buffer_size();
}
WalManager::~WalManager() {
WalManager&
WalManager::GetInstance() {
static WalManager s_mgr;
return s_mgr;
}
ErrorCode
WalManager::Init() {
uint64_t applied_lsn = 0;
p_meta_handler_ = std::make_shared<MXLogMetaHandler>(mxlog_config_.mxlog_path);
if (p_meta_handler_ != nullptr) {
p_meta_handler_->GetMXLogInternalMeta(applied_lsn);
Status
WalManager::RecordOperation(const WalOperationPtr& operation, const DBPtr& db) {
if (operation == nullptr) {
return Status(DB_ERROR, "Wal operation is null pointer");
}
uint64_t recovery_start = 0;
std::vector<std::string> collection_names;
auto status = snapshot::Snapshots::GetInstance().GetCollectionNames(collection_names);
if (!status.ok()) {
return WAL_META_ERROR;
}
if (!collection_names.empty()) {
u_int64_t min_flushed_lsn = ~(u_int64_t)0;
u_int64_t max_flushed_lsn = 0;
auto update_limit_lsn = [&](u_int64_t lsn) {
if (min_flushed_lsn > lsn) {
min_flushed_lsn = lsn;
}
if (max_flushed_lsn < lsn) {
max_flushed_lsn = lsn;
}
};
for (auto& col_name : collection_names) {
auto& collection = collections_[col_name];
auto& default_part = collection[""];
snapshot::ScopedSnapshotT ss;
status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, col_name);
if (!status.ok()) {
return WAL_META_ERROR;
}
default_part.flush_lsn = ss->GetMaxLsn();
update_limit_lsn(default_part.flush_lsn);
std::vector<std::string> partition_names = ss->GetPartitionNames();
for (auto& part_name : partition_names) {
auto& partition = collection[part_name];
snapshot::PartitionPtr ss_part = ss->GetPartition(part_name);
if (ss_part == nullptr) {
return WAL_META_ERROR;
}
partition.flush_lsn = ss_part->GetLsn();
update_limit_lsn(partition.flush_lsn);
}
}
if (applied_lsn < max_flushed_lsn) {
// a new WAL folder?
applied_lsn = max_flushed_lsn;
}
if (recovery_start < min_flushed_lsn) {
// not flush all yet
recovery_start = min_flushed_lsn;
}
for (auto& col : collections_) {
for (auto& part : col.second) {
part.second.wal_lsn = applied_lsn;
}
}
}
// all tables were dropped, or is this a new wal path?
if (applied_lsn < recovery_start) {
applied_lsn = recovery_start;
}
ErrorCode error_code = WAL_ERROR;
p_buffer_ = std::make_shared<MXLogBuffer>(mxlog_config_.mxlog_path, mxlog_config_.buffer_size);
if (p_buffer_ != nullptr) {
if (p_buffer_->Init(recovery_start, applied_lsn)) {
error_code = WAL_SUCCESS;
} else if (mxlog_config_.recovery_error_ignore) {
p_buffer_->Reset(applied_lsn);
error_code = WAL_SUCCESS;
} else {
error_code = WAL_FILE_ERROR;
}
}
// buffer size may have changed
mxlog_config_.buffer_size = p_buffer_->GetBufferSize();
last_applied_lsn_ = applied_lsn;
return error_code;
}
ErrorCode
WalManager::GetNextRecovery(MXLogRecord& record) {
ErrorCode error_code = WAL_SUCCESS;
while (true) {
error_code = p_buffer_->Next(last_applied_lsn_, record);
if (error_code != WAL_SUCCESS) {
if (mxlog_config_.recovery_error_ignore) {
// reset and break recovery
p_buffer_->Reset(last_applied_lsn_);
record.type = MXLogType::None;
error_code = WAL_SUCCESS;
}
Status status;
switch (operation->Type()) {
case WalOperationType::INSERT_ENTITY: {
InsertEntityOperationPtr op = std::static_pointer_cast<InsertEntityOperation>(operation);
status = RecordInsertOperation(op, db);
break;
}
if (record.type == MXLogType::None) {
case WalOperationType::DELETE_ENTITY: {
DeleteEntityOperationPtr op = std::static_pointer_cast<DeleteEntityOperation>(operation);
status = RecordDeleteOperation(op, db);
break;
}
// background thread has not started.
// so, needn't lock here.
auto it_col = collections_.find(record.collection_id);
if (it_col != collections_.end()) {
auto it_part = it_col->second.find(record.partition_tag);
if (it_part->second.flush_lsn < record.lsn) {
break;
}
}
}
// print the log only when record.type != MXLogType::None
if (record.type != MXLogType::None) {
LOG_WAL_INFO_ << "record type " << (int32_t)record.type << " record lsn " << record.lsn << " error code "
<< error_code;
}
return error_code;
}
ErrorCode
WalManager::GetNextEntityRecovery(milvus::engine::wal::MXLogRecord& record) {
ErrorCode error_code = WAL_SUCCESS;
while (true) {
error_code = p_buffer_->NextEntity(last_applied_lsn_, record);
if (error_code != WAL_SUCCESS) {
if (mxlog_config_.recovery_error_ignore) {
// reset and break recovery
p_buffer_->Reset(last_applied_lsn_);
record.type = MXLogType::None;
error_code = WAL_SUCCESS;
}
default:
break;
}
if (record.type == MXLogType::None) {
break;
}
// background thread has not started.
// so, needn't lock here.
auto it_col = collections_.find(record.collection_id);
if (it_col != collections_.end()) {
auto it_part = it_col->second.find(record.partition_tag);
if (it_part->second.flush_lsn < record.lsn) {
break;
}
}
}
// print the log only when record.type != MXLogType::None
if (record.type != MXLogType::None) {
LOG_WAL_INFO_ << "record type " << (int32_t)record.type << " record lsn " << record.lsn << " error code "
<< error_code;
}
return error_code;
return Status::OK();
}
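In the replacement design, WalManager is a singleton that works on whole operations instead of raw log buffers: a caller builds a WalOperation, passes it to RecordOperation() together with the target DB, and the operation is later acknowledged through OperationDone(). The calling side is not part of this hunk, so the sketch below is an assumption about how a wal proxy might use it; only data_chunk_ and the RecordOperation/OperationDone signatures appear in this commit.

// hypothetical caller, e.g. inside WalProxy::Insert
auto op = std::make_shared<InsertEntityOperation>();
op->data_chunk_ = data_chunk;   // data_chunk_ is read by RecordInsertOperation in this commit
// collection/partition members of the operation are assumed, not shown in this hunk
auto status = WalManager::GetInstance().RecordOperation(op, db_);
if (!status.ok()) {
    LOG_WAL_ERROR_ << "failed to record insert operation: " << status.message();
}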
ErrorCode
WalManager::GetNextRecord(MXLogRecord& record) {
auto check_flush = [&]() -> bool {
std::lock_guard<std::mutex> lck(mutex_);
if (flush_info_.IsValid()) {
if (p_buffer_->GetReadLsn() >= flush_info_.lsn_) {
// can exec flush requirement
record.type = MXLogType::Flush;
record.collection_id = flush_info_.collection_id_;
record.lsn = flush_info_.lsn_;
flush_info_.Clear();
Status
WalManager::RecordInsertOperation(const InsertEntityOperationPtr& operation, const DBPtr& db) {
std::vector<DataChunkPtr> chunks;
SplitChunk(operation->data_chunk_, chunks);
LOG_WAL_INFO_ << "record flush collection " << record.collection_id << " lsn " << record.lsn;
return true;
}
}
return false;
};
if (check_flush()) {
return WAL_SUCCESS;
}
ErrorCode error_code = WAL_SUCCESS;
while (WAL_SUCCESS == p_buffer_->Next(last_applied_lsn_, record)) {
if (record.type == MXLogType::None) {
if (check_flush()) {
return WAL_SUCCESS;
}
break;
}
std::lock_guard<std::mutex> lck(mutex_);
auto it_col = collections_.find(record.collection_id);
if (it_col != collections_.end()) {
auto it_part = it_col->second.find(record.partition_tag);
if (it_part->second.flush_lsn < record.lsn) {
break;
}
}
}
if (record.type != MXLogType::None) {
LOG_WAL_INFO_ << "record type " << (int32_t)record.type << " collection " << record.collection_id << " lsn "
<< record.lsn;
}
return error_code;
return Status::OK();
}
ErrorCode
WalManager::GetNextEntityRecord(milvus::engine::wal::MXLogRecord& record) {
auto check_flush = [&]() -> bool {
std::lock_guard<std::mutex> lck(mutex_);
if (flush_info_.IsValid()) {
if (p_buffer_->GetReadLsn() >= flush_info_.lsn_) {
// can exec flush requirement
record.type = MXLogType::Flush;
record.collection_id = flush_info_.collection_id_;
record.lsn = flush_info_.lsn_;
flush_info_.Clear();
LOG_WAL_INFO_ << "record flush collection " << record.collection_id << " lsn " << record.lsn;
return true;
}
}
return false;
};
if (check_flush()) {
return WAL_SUCCESS;
}
ErrorCode error_code = WAL_SUCCESS;
while (WAL_SUCCESS == p_buffer_->NextEntity(last_applied_lsn_, record)) {
if (record.type == MXLogType::None) {
if (check_flush()) {
return WAL_SUCCESS;
}
break;
}
std::lock_guard<std::mutex> lck(mutex_);
auto it_col = collections_.find(record.collection_id);
if (it_col != collections_.end()) {
auto it_part = it_col->second.find(record.partition_tag);
if (it_part->second.flush_lsn < record.lsn) {
break;
}
}
}
LOG_WAL_INFO_ << "record type " << (int32_t)record.type << " collection " << record.collection_id << " lsn "
<< record.lsn;
return error_code;
}
uint64_t
WalManager::CreateCollection(const std::string& collection_id) {
LOG_WAL_INFO_ << "create collection " << collection_id << " " << last_applied_lsn_;
std::lock_guard<std::mutex> lck(mutex_);
uint64_t applied_lsn = last_applied_lsn_;
collections_[collection_id][""] = {applied_lsn, applied_lsn};
return applied_lsn;
}
uint64_t
WalManager::CreatePartition(const std::string& collection_id, const std::string& partition_tag) {
LOG_WAL_INFO_ << "create collection " << collection_id << " " << partition_tag << " " << last_applied_lsn_;
std::lock_guard<std::mutex> lck(mutex_);
uint64_t applied_lsn = last_applied_lsn_;
collections_[collection_id][partition_tag] = {applied_lsn, applied_lsn};
return applied_lsn;
}
uint64_t
WalManager::CreateHybridCollection(const std::string& collection_id) {
LOG_WAL_INFO_ << "create hybrid collection " << collection_id << " " << last_applied_lsn_;
std::lock_guard<std::mutex> lck(mutex_);
uint64_t applied_lsn = last_applied_lsn_;
collections_[collection_id][""] = {applied_lsn, applied_lsn};
return applied_lsn;
}
void
WalManager::DropCollection(const std::string& collection_id) {
LOG_WAL_INFO_ << "drop collection " << collection_id;
std::lock_guard<std::mutex> lck(mutex_);
collections_.erase(collection_id);
}
void
WalManager::DropPartition(const std::string& collection_id, const std::string& partition_tag) {
LOG_WAL_INFO_ << collection_id << " drop partition " << partition_tag;
std::lock_guard<std::mutex> lck(mutex_);
auto it = collections_.find(collection_id);
if (it != collections_.end()) {
it->second.erase(partition_tag);
}
}
void
WalManager::CollectionFlushed(const std::string& collection_id, uint64_t lsn) {
std::unique_lock<std::mutex> lck(mutex_);
if (collection_id.empty()) {
// all collections
for (auto& col : collections_) {
for (auto& part : col.second) {
part.second.flush_lsn = lsn;
}
}
    } else {
        // one collection
        auto it_col = collections_.find(collection_id);
        if (it_col != collections_.end()) {
            for (auto& part : it_col->second) {
                part.second.flush_lsn = lsn;
            }
        }
    }
    lck.unlock();
    LOG_WAL_INFO_ << collection_id << " is flushed by lsn " << lsn;
}
Status
WalManager::SplitChunk(const DataChunkPtr& chunk, std::vector<DataChunkPtr>& chunks) {
    int64_t chunk_size = utils::GetSizeOfChunk(chunk);
    if (chunk_size > insert_buffer_size_) {
        // TODO: split the oversized chunk into pieces no larger than insert_buffer_size_
    } else {
        chunks.push_back(chunk);
    }
    return Status::OK();
}
void
WalManager::PartitionFlushed(const std::string& collection_id, const std::string& partition_tag, uint64_t lsn) {
std::unique_lock<std::mutex> lck(mutex_);
auto it_col = collections_.find(collection_id);
if (it_col != collections_.end()) {
auto it_part = it_col->second.find(partition_tag);
if (it_part != it_col->second.end()) {
it_part->second.flush_lsn = lsn;
}
}
lck.unlock();
LOG_WAL_INFO_ << collection_id << " " << partition_tag << " is flushed by lsn " << lsn;
}
Status
WalManager::RecordDeleteOperation(const DeleteEntityOperationPtr& operation, const DBPtr& db) {
    return Status::OK();
}
void
WalManager::CollectionUpdated(const std::string& collection_id, uint64_t lsn) {
std::unique_lock<std::mutex> lck(mutex_);
auto it_col = collections_.find(collection_id);
if (it_col != collections_.end()) {
for (auto& part : it_col->second) {
part.second.wal_lsn = lsn;
}
}
lck.unlock();
}
Status
WalManager::OperationDone(id_t op_id) {
    return Status::OK();
}
void
WalManager::PartitionUpdated(const std::string& collection_id, const std::string& partition_tag, uint64_t lsn) {
std::unique_lock<std::mutex> lck(mutex_);
auto it_col = collections_.find(collection_id);
if (it_col != collections_.end()) {
auto it_part = it_col->second.find(partition_tag);
if (it_part != it_col->second.end()) {
it_part->second.wal_lsn = lsn;
}
}
lck.unlock();
}
template <typename T>
bool
WalManager::Insert(const std::string& collection_id, const std::string& partition_tag, const IDNumbers& vector_ids,
const std::vector<T>& vectors) {
MXLogType log_type;
if (std::is_same<T, float>::value) {
log_type = MXLogType::InsertVector;
} else if (std::is_same<T, uint8_t>::value) {
log_type = MXLogType::InsertBinary;
} else {
return false;
}
size_t vector_num = vector_ids.size();
if (vector_num == 0) {
        LOG_WAL_ERROR_ << LogOut("[%s][%ld] The id array is empty.", "insert", 0);
return false;
}
size_t dim = vectors.size() / vector_num;
size_t unit_size = dim * sizeof(T) + sizeof(id_t);
size_t head_size = SizeOfMXLogRecordHeader + collection_id.length() + partition_tag.length();
MXLogRecord record;
record.type = log_type;
record.collection_id = collection_id;
record.partition_tag = partition_tag;
uint64_t new_lsn = 0;
for (size_t i = 0; i < vector_num; i += record.length) {
size_t surplus_space = p_buffer_->SurplusSpace();
size_t max_rcd_num = 0;
if (surplus_space >= head_size + unit_size) {
max_rcd_num = (surplus_space - head_size) / unit_size;
} else {
max_rcd_num = (mxlog_config_.buffer_size - head_size) / unit_size;
}
if (max_rcd_num == 0) {
LOG_WAL_ERROR_ << LogOut("[%s][%ld]", "insert", 0) << "Wal buffer size is too small "
<< mxlog_config_.buffer_size << " unit " << unit_size;
return false;
}
record.length = std::min(vector_num - i, max_rcd_num);
record.ids = vector_ids.data() + i;
record.data_size = record.length * dim * sizeof(T);
record.data = vectors.data() + i * dim;
auto error_code = p_buffer_->Append(record);
if (error_code != WAL_SUCCESS) {
p_buffer_->ResetWriteLsn(last_applied_lsn_);
return false;
}
new_lsn = record.lsn;
}
last_applied_lsn_ = new_lsn;
PartitionUpdated(collection_id, partition_tag, new_lsn);
LOG_WAL_INFO_ << LogOut("[%s][%ld]", "insert", 0) << collection_id << " insert in part " << partition_tag
<< " with lsn " << new_lsn;
return p_meta_handler_->SetMXLogInternalMeta(new_lsn);
}
template <typename T>
bool
WalManager::InsertEntities(const std::string& collection_id, const std::string& partition_tag,
const milvus::engine::IDNumbers& entity_ids, const std::vector<T>& vectors,
const std::unordered_map<std::string, uint64_t>& attr_nbytes,
const std::unordered_map<std::string, std::vector<uint8_t>>& attrs) {
    MXLogType log_type;
    if (std::is_same<T, float>::value) {
        log_type = MXLogType::InsertVector;
    } else if (std::is_same<T, uint8_t>::value) {
        log_type = MXLogType::InsertBinary;
    } else {
        return false;
    }
size_t entity_num = entity_ids.size();
if (entity_num == 0) {
        LOG_WAL_ERROR_ << LogOut("[%s][%ld] The id array is empty.", "insert", 0);
return false;
}
size_t dim = vectors.size() / entity_num;
MXLogRecord record;
size_t attr_unit_size = 0;
auto attr_it = attr_nbytes.begin();
for (; attr_it != attr_nbytes.end(); attr_it++) {
record.field_names.emplace_back(attr_it->first);
attr_unit_size += attr_it->second;
}
size_t unit_size = dim * sizeof(T) + sizeof(id_t) + attr_unit_size;
size_t head_size = SizeOfMXLogRecordHeader + collection_id.length() + partition_tag.length();
// TODO(yukun): field_name put into MXLogRecord??
record.type = log_type;
record.collection_id = collection_id;
record.partition_tag = partition_tag;
record.attr_nbytes = attr_nbytes;
uint64_t new_lsn = 0;
for (size_t i = 0; i < entity_num; i += record.length) {
size_t surplus_space = p_buffer_->SurplusSpace();
size_t max_rcd_num = 0;
if (surplus_space >= head_size + unit_size) {
max_rcd_num = (surplus_space - head_size) / unit_size;
} else {
max_rcd_num = (mxlog_config_.buffer_size - head_size) / unit_size;
}
if (max_rcd_num == 0) {
LOG_WAL_ERROR_ << LogOut("[%s][%ld]", "insert", 0) << "Wal buffer size is too small "
<< mxlog_config_.buffer_size << " unit " << unit_size;
return false;
}
size_t length = std::min(entity_num - i, max_rcd_num);
record.length = length;
record.ids = entity_ids.data() + i;
record.data_size = record.length * dim * sizeof(T);
record.data = vectors.data() + i * dim;
record.attr_data.clear();
record.attr_data_size.clear();
for (auto field_name : record.field_names) {
size_t attr_size = length * attr_nbytes.at(field_name);
record.attr_data_size.insert(std::make_pair(field_name, attr_size));
std::vector<uint8_t> attr_data(attr_size, 0);
memcpy(attr_data.data(), attrs.at(field_name).data() + i * attr_nbytes.at(field_name), attr_size);
record.attr_data.insert(std::make_pair(field_name, attr_data));
}
auto error_code = p_buffer_->AppendEntity(record);
if (error_code != WAL_SUCCESS) {
p_buffer_->ResetWriteLsn(last_applied_lsn_);
return false;
}
new_lsn = record.lsn;
}
last_applied_lsn_ = new_lsn;
PartitionUpdated(collection_id, partition_tag, new_lsn);
LOG_WAL_INFO_ << LogOut("[%s][%ld]", "insert", 0) << collection_id << " insert in part " << partition_tag
<< " with lsn " << new_lsn;
return p_meta_handler_->SetMXLogInternalMeta(new_lsn);
}
bool
WalManager::DeleteById(const std::string& collection_id, const IDNumbers& entity_ids) {
size_t entity_num = entity_ids.size();
if (entity_num == 0) {
        LOG_WAL_ERROR_ << "The id array is empty.";
return false;
}
size_t unit_size = sizeof(id_t);
size_t head_size = SizeOfMXLogRecordHeader + collection_id.length();
MXLogRecord record;
record.type = MXLogType::Delete;
record.collection_id = collection_id;
record.partition_tag = "";
uint64_t new_lsn = 0;
for (size_t i = 0; i < entity_num; i += record.length) {
size_t surplus_space = p_buffer_->SurplusSpace();
size_t max_rcd_num = 0;
if (surplus_space >= head_size + unit_size) {
max_rcd_num = (surplus_space - head_size) / unit_size;
} else {
max_rcd_num = (mxlog_config_.buffer_size - head_size) / unit_size;
}
record.length = std::min(entity_num - i, max_rcd_num);
record.ids = entity_ids.data() + i;
record.data_size = 0;
record.data = nullptr;
auto error_code = p_buffer_->AppendEntity(record);
if (error_code != WAL_SUCCESS) {
p_buffer_->ResetWriteLsn(last_applied_lsn_);
return false;
}
new_lsn = record.lsn;
}
last_applied_lsn_ = new_lsn;
CollectionUpdated(collection_id, new_lsn);
LOG_WAL_INFO_ << collection_id << " delete rows by id, lsn " << new_lsn;
return p_meta_handler_->SetMXLogInternalMeta(new_lsn);
}
uint64_t
WalManager::Flush(const std::string& collection_id) {
std::lock_guard<std::mutex> lck(mutex_);
// At most one flush requirement is waiting at any time.
// Otherwise, flush_info_ should be modified to a list.
__glibcxx_assert(!flush_info_.IsValid());
uint64_t lsn = 0;
if (collection_id.empty()) {
// flush all tables
for (auto& col : collections_) {
for (auto& part : col.second) {
if (part.second.wal_lsn > part.second.flush_lsn) {
lsn = last_applied_lsn_;
break;
}
}
}
} else {
// flush one collection
auto it_col = collections_.find(collection_id);
if (it_col != collections_.end()) {
for (auto& part : it_col->second) {
auto wal_lsn = part.second.wal_lsn;
auto flush_lsn = part.second.flush_lsn;
if (wal_lsn > flush_lsn && wal_lsn > lsn) {
lsn = wal_lsn;
}
}
}
}
if (lsn != 0) {
flush_info_.collection_id_ = collection_id;
flush_info_.lsn_ = lsn;
}
    LOG_WAL_INFO_ << collection_id << " flush requested, lsn " << lsn;
return lsn;
}
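// Illustrative sketch only (not taken from the implementation): the flush handshake implied by
// Flush() and GetNextRecord(), assuming "demo_collection" exists and a background thread is
// draining the wal buffer.
//
//   uint64_t lsn = WalManager::GetInstance().Flush("demo_collection");
//   if (lsn != 0) {
//       // The background thread keeps calling GetNextRecord(); once the buffer read lsn
//       // reaches `lsn` it receives a record with type == MXLogType::Flush, performs the
//       // real flush, then reports back through CollectionFlushed("demo_collection", lsn).
//   }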
void
WalManager::RemoveOldFiles(uint64_t flushed_lsn) {
if (p_buffer_ != nullptr) {
p_buffer_->RemoveOldFiles(flushed_lsn);
}
}
template bool
WalManager::Insert<float>(const std::string& collection_id, const std::string& partition_tag,
const IDNumbers& vector_ids, const std::vector<float>& vectors);
template bool
WalManager::Insert<uint8_t>(const std::string& collection_id, const std::string& partition_tag,
const IDNumbers& vector_ids, const std::vector<uint8_t>& vectors);
template bool
WalManager::InsertEntities<float>(const std::string& collection_id, const std::string& partition_tag,
const milvus::engine::IDNumbers& entity_ids, const std::vector<float>& vectors,
const std::unordered_map<std::string, uint64_t>& attr_nbytes,
const std::unordered_map<std::string, std::vector<uint8_t>>& attrs);
template bool
WalManager::InsertEntities<uint8_t>(const std::string& collection_id, const std::string& partition_tag,
const milvus::engine::IDNumbers& entity_ids, const std::vector<uint8_t>& vectors,
const std::unordered_map<std::string, uint64_t>& attr_nbytes,
const std::unordered_map<std::string, std::vector<uint8_t>>& attrs);
} // namespace wal
} // namespace engine
} // namespace milvus
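The batch-sizing arithmetic shared by Insert, InsertEntities and DeleteById above is easy to sanity-check in isolation. Below is a minimal, self-contained sketch of the same max_rcd_num computation; the buffer size, dimension, header size and id width are illustrative assumptions, not values taken from the configuration.

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>

int main() {
    // Assumed numbers for illustration only.
    const size_t buffer_size = 32 * 1024 * 1024;                     // wal buffer capacity
    const size_t surplus_space = 1 * 1024 * 1024;                    // room left in the current buffer
    const size_t dim = 128;                                          // vector dimension
    const size_t head_size = 64;                                     // record header + name lengths
    const size_t unit_size = dim * sizeof(float) + sizeof(int64_t);  // one vector plus one id

    // Same decision as the loops above: fill the current buffer if at least one
    // record fits behind the header, otherwise size the batch for a fresh buffer.
    size_t max_rcd_num = 0;
    if (surplus_space >= head_size + unit_size) {
        max_rcd_num = (surplus_space - head_size) / unit_size;
    } else {
        max_rcd_num = (buffer_size - head_size) / unit_size;
    }

    const size_t vector_num = 100000;                                // rows the caller wants to write
    const size_t first_batch = std::min(vector_num, max_rcd_num);    // length of the first wal record
    std::cout << "unit_size=" << unit_size << " max_rcd_num=" << max_rcd_num
              << " first_batch=" << first_batch << std::endl;
    return 0;
}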

View File

@ -11,219 +11,53 @@
#pragma once
#include <atomic>
#include <map>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "db/DB.h"
#include "db/IDGenerator.h"
#include "db/Types.h"
#include "db/wal/WalOperation.h"
#include "utils/Status.h"
#include "WalBuffer.h"
#include "WalDefinations.h"
#include "WalFileHandler.h"
#include "WalMetaHandler.h"
#include "utils/Error.h"
namespace milvus {
namespace engine {
namespace wal {
class WalManager {
public:
explicit WalManager(const MXLogConfiguration& config);
~WalManager();
WalManager();
ErrorCode
Init();
/*
* Get next recovery
* @param record[out]: record
* @retval error_code
*/
ErrorCode
GetNextRecovery(MXLogRecord& record);
ErrorCode
GetNextEntityRecovery(MXLogRecord& record);
/*
* Get next record
* @param record[out]: record
* @retval error_code
*/
ErrorCode
GetNextRecord(MXLogRecord& record);
ErrorCode
GetNextEntityRecord(MXLogRecord& record);
/*
* Create collection
* @param collection_id: collection id
* @retval lsn
*/
uint64_t
CreateCollection(const std::string& collection_id);
/*
* Create partition
* @param collection_id: collection id
* @param partition_tag: partition tag
* @retval lsn
*/
uint64_t
CreatePartition(const std::string& collection_id, const std::string& partition_tag);
/*
* Create hybrid collection
* @param collection_id: collection id
* @retval lsn
*/
uint64_t
CreateHybridCollection(const std::string& collection_id);
/*
* Drop collection
* @param collection_id: collection id
* @retval none
*/
void
DropCollection(const std::string& collection_id);
/*
* Drop partition
* @param collection_id: collection id
* @param partition_tag: partition tag
* @retval none
*/
void
DropPartition(const std::string& collection_id, const std::string& partition_tag);
/*
* Collection is flushed (update flushed_lsn)
* @param collection_id: collection id
* @param lsn: flushed lsn
*/
void
CollectionFlushed(const std::string& collection_id, uint64_t lsn);
/*
* Partition is flushed (update flushed_lsn)
* @param collection_id: collection id
* @param partition_tag: partition_tag
* @param lsn: flushed lsn
*/
void
PartitionFlushed(const std::string& collection_id, const std::string& partition_tag, uint64_t lsn);
/*
* Collection is updated (update wal_lsn)
* @param collection_id: collection id
     * @param lsn: wal lsn
*/
void
CollectionUpdated(const std::string& collection_id, uint64_t lsn);
/*
* Partition is updated (update wal_lsn)
* @param collection_id: collection id
* @param partition_tag: partition_tag
     * @param lsn: wal lsn
*/
void
PartitionUpdated(const std::string& collection_id, const std::string& partition_tag, uint64_t lsn);
/*
* Insert
* @param collection_id: collection id
     * @param partition_tag: partition tag
* @param vector_ids: vector ids
* @param vectors: vectors
*/
template <typename T>
bool
Insert(const std::string& collection_id, const std::string& partition_tag, const IDNumbers& vector_ids,
const std::vector<T>& vectors);
/*
* Insert
* @param collection_id: collection id
* @param partition_tag: partition tag
     * @param entity_ids: entity ids
     * @param vectors: vectors
     * @param attr_nbytes: attribute sizes in bytes
     * @param attrs: attribute data
*/
template <typename T>
bool
InsertEntities(const std::string& collection_id, const std::string& partition_tag,
const milvus::engine::IDNumbers& entity_ids, const std::vector<T>& vectors,
const std::unordered_map<std::string, uint64_t>& attr_nbytes,
const std::unordered_map<std::string, std::vector<uint8_t>>& attrs);
/*
     * Delete entities by id
* @param collection_id: collection id
* @param vector_ids: vector ids
*/
bool
DeleteById(const std::string& collection_id, const IDNumbers& vector_ids);
/*
* Get flush lsn
* @param collection_id: collection id (empty means all tables)
* @retval if there is something not flushed, return lsn;
* else, return 0
*/
uint64_t
Flush(const std::string& collection_id = "");
static WalManager&
GetInstance();
void
RemoveOldFiles(uint64_t flushed_lsn);
    void
    SetWalPath(const std::string& path) {
wal_path_ = path;
}
Status
RecordOperation(const WalOperationPtr& operation, const DBPtr& db);
Status
OperationDone(id_t op_id);
private:
WalManager
operator=(WalManager&);
Status
RecordInsertOperation(const InsertEntityOperationPtr& operation, const DBPtr& db);
MXLogConfiguration mxlog_config_;
Status
RecordDeleteOperation(const DeleteEntityOperationPtr& operation, const DBPtr& db);
MXLogBufferPtr p_buffer_;
MXLogMetaHandlerPtr p_meta_handler_;
Status
SplitChunk(const DataChunkPtr& chunk, std::vector<DataChunkPtr>& chunks);
struct TableLsn {
uint64_t flush_lsn;
uint64_t wal_lsn;
};
std::mutex mutex_;
std::map<std::string, std::map<std::string, TableLsn>> collections_;
std::atomic<uint64_t> last_applied_lsn_;
private:
SafeIDGenerator id_gen_;
// if multi-thread call Flush(), use list
struct FlushInfo {
std::string collection_id_;
uint64_t lsn_ = 0;
bool
IsValid() {
return (lsn_ != 0);
}
void
Clear() {
lsn_ = 0;
}
};
FlushInfo flush_info_;
std::string wal_path_;
int64_t wal_buffer_size_ = 0;
int64_t insert_buffer_size_ = 0;
};
extern template bool
WalManager::Insert<float>(const std::string& collection_id, const std::string& partition_tag,
const IDNumbers& vector_ids, const std::vector<float>& vectors);
extern template bool
WalManager::Insert<uint8_t>(const std::string& collection_id, const std::string& partition_tag,
const IDNumbers& vector_ids, const std::vector<uint8_t>& vectors);
} // namespace wal
} // namespace engine
} // namespace milvus
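The new RecordOperation / OperationDone / SetWalPath entry points above outline how callers are expected to hand work to the wal layer. The following is a minimal usage sketch, not code from this commit: the collection name, the ./wal path and the placement of the OperationDone call are assumptions.

#include <memory>
#include "db/wal/WalManager.h"
#include "db/wal/WalOperation.h"

milvus::Status
WriteThroughWal(const milvus::engine::DBPtr& db, milvus::engine::DataChunkPtr& chunk) {
    using milvus::engine::wal::WalManager;

    WalManager::GetInstance().SetWalPath("./wal");  // assumed path

    auto op = std::make_shared<milvus::engine::InsertEntityOperation>();
    op->collection_name_ = "demo_collection";       // hypothetical collection
    op->partition_name = "";                        // default partition
    op->data_chunk_ = chunk;

    // Persist the operation; the wal machinery later replays it into the db.
    auto status = WalManager::GetInstance().RecordOperation(op, db);
    if (status.ok()) {
        // Acknowledge once the data has been applied (e.g. by MemManager).
        status = WalManager::GetInstance().OperationDone(op->ID());
    }
    return status;
}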

View File

@ -1,72 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/wal/WalMetaHandler.h"
#include <cstring>
namespace milvus {
namespace engine {
namespace wal {
const char* WAL_META_FILE_NAME = "mxlog.meta";
MXLogMetaHandler::MXLogMetaHandler(const std::string& internal_meta_file_path) {
std::string file_full_path = internal_meta_file_path + WAL_META_FILE_NAME;
wal_meta_fp_ = fopen(file_full_path.c_str(), "r+");
if (wal_meta_fp_ == nullptr) {
wal_meta_fp_ = fopen(file_full_path.c_str(), "w");
} else {
uint64_t all_wal_lsn[3] = {0, 0, 0};
auto rt_val = fread(&all_wal_lsn, sizeof(all_wal_lsn), 1, wal_meta_fp_);
if (rt_val == 1) {
if (all_wal_lsn[2] == all_wal_lsn[1]) {
latest_wal_lsn_ = all_wal_lsn[2];
} else {
latest_wal_lsn_ = all_wal_lsn[0];
}
}
}
}
MXLogMetaHandler::~MXLogMetaHandler() {
if (wal_meta_fp_ != nullptr) {
fclose(wal_meta_fp_);
wal_meta_fp_ = nullptr;
}
}
bool
MXLogMetaHandler::GetMXLogInternalMeta(uint64_t& wal_lsn) {
wal_lsn = latest_wal_lsn_;
return true;
}
bool
MXLogMetaHandler::SetMXLogInternalMeta(uint64_t wal_lsn) {
if (wal_meta_fp_ != nullptr) {
uint64_t all_wal_lsn[3] = {latest_wal_lsn_, wal_lsn, wal_lsn};
fseek(wal_meta_fp_, 0, SEEK_SET);
auto rt_val = fwrite(&all_wal_lsn, sizeof(all_wal_lsn), 1, wal_meta_fp_);
if (rt_val == 1) {
fflush(wal_meta_fp_);
latest_wal_lsn_ = wal_lsn;
return true;
}
}
return false;
}
} // namespace wal
} // namespace engine
} // namespace milvus
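The meta handler removed above protects against torn writes by storing three LSN slots per update and comparing the last two when the file is read back. A minimal sketch of that recovery rule, detached from the file I/O; the concrete numbers are illustrative:

#include <cstdint>
#include <iostream>

// Same rule as MXLogMetaHandler: a complete write leaves slots 1 and 2 equal, so a
// mismatch means the last write was torn and slot 0 (the previous value) is trusted.
uint64_t
RecoverLsn(const uint64_t all_wal_lsn[3]) {
    return (all_wal_lsn[2] == all_wal_lsn[1]) ? all_wal_lsn[2] : all_wal_lsn[0];
}

int main() {
    const uint64_t complete_write[3] = {100, 200, 200};  // {previous, new, new}: write finished
    const uint64_t torn_write[3] = {100, 200, 0};        // crashed before the last slot landed
    std::cout << RecoverLsn(complete_write) << std::endl;  // 200
    std::cout << RecoverLsn(torn_write) << std::endl;      // 100
    return 0;
}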

View File

@ -0,0 +1,30 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/wal/WalOperation.h"
namespace milvus {
namespace engine {
///////////////////////////////////////////////////////////////////////////////////////////////////
WalOperation::WalOperation(WalOperationType type) : type_(type) {
}
///////////////////////////////////////////////////////////////////////////////////////////////////
InsertEntityOperation::InsertEntityOperation() : WalOperation(WalOperationType::INSERT_ENTITY) {
}
///////////////////////////////////////////////////////////////////////////////////////////////////
DeleteEntityOperation::DeleteEntityOperation() : WalOperation(WalOperationType::DELETE_ENTITY) {
}
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,81 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include "db/Types.h"
#include <memory>
#include <string>
namespace milvus {
namespace engine {
///////////////////////////////////////////////////////////////////////////////////////////////////
enum WalOperationType {
INVALID = 0,
INSERT_ENTITY = 1,
DELETE_ENTITY = 2,
};
///////////////////////////////////////////////////////////////////////////////////////////////////
class WalOperation {
public:
explicit WalOperation(WalOperationType type);
void
SetID(id_t id) {
id_ = id;
}
id_t
ID() const {
return id_;
}
WalOperationType
Type() const {
return type_;
}
protected:
id_t id_ = 0;
WalOperationType type_ = WalOperationType::INVALID;
};
using WalOperationPtr = std::shared_ptr<WalOperation>;
///////////////////////////////////////////////////////////////////////////////////////////////////
class InsertEntityOperation : public WalOperation {
public:
InsertEntityOperation();
public:
std::string collection_name_;
std::string partition_name;
DataChunkPtr data_chunk_;
};
using InsertEntityOperationPtr = std::shared_ptr<InsertEntityOperation>;
///////////////////////////////////////////////////////////////////////////////////////////////////
class DeleteEntityOperation : public WalOperation {
public:
DeleteEntityOperation();
public:
std::string collection_name_;
engine::IDNumbers entity_ids_;
};
using DeleteEntityOperationPtr = std::shared_ptr<DeleteEntityOperation>;
} // namespace engine
} // namespace milvus
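A consumer of this hierarchy presumably branches on Type() and downcasts to reach the per-type fields; the body of WalManager::RecordOperation is not shown in this diff, so the dispatcher below is only a sketch, and the handler comments stand in for the real record/serialize logic.

#include <memory>
#include "db/wal/WalOperation.h"
#include "utils/Error.h"
#include "utils/Status.h"

// Hypothetical dispatcher over WalOperationType (not the actual WalManager code).
milvus::Status
DispatchOperation(const milvus::engine::WalOperationPtr& operation) {
    using namespace milvus::engine;
    if (operation == nullptr) {
        return milvus::Status(milvus::DB_ERROR, "null wal operation");
    }
    switch (operation->Type()) {
        case WalOperationType::INSERT_ENTITY: {
            auto op = std::static_pointer_cast<InsertEntityOperation>(operation);
            // consume op->collection_name_, op->partition_name, op->data_chunk_ here
            return milvus::Status::OK();
        }
        case WalOperationType::DELETE_ENTITY: {
            auto op = std::static_pointer_cast<DeleteEntityOperation>(operation);
            // consume op->collection_name_, op->entity_ids_ here
            return milvus::Status::OK();
        }
        default:
            return milvus::Status(milvus::DB_ERROR, "unsupported wal operation type");
    }
}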

View File

@ -0,0 +1,28 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/wal/WalOperationCodec.h"
namespace milvus {
namespace engine {
Status
WalOperationCodec::SerializeOperation(const std::string& path, const InsertEntityOperationPtr& operation) {
return Status::OK();
}
Status
WalOperationCodec::SerializeOperation(const std::string& path, const DeleteEntityOperationPtr& operation) {
return Status::OK();
}
} // namespace engine
} // namespace milvus
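Both SerializeOperation overloads are still stubs in this commit. Purely as a hypothetical illustration of what an append-only record could look like, the sketch below writes a delete operation with a simple length-prefixed layout; the field order, the layout itself and the WriteDeleteRecord name are invented here and do not come from Milvus.

#include <cstdint>
#include <cstdio>
#include <string>
#include <vector>

// Hypothetical on-disk layout for a delete record (not the Milvus format):
// [uint32 type][uint64 op_id][uint32 name_len][name bytes][uint64 id_count][id_count * int64 ids]
bool
WriteDeleteRecord(const std::string& path, uint64_t op_id, const std::string& collection_name,
                  const std::vector<int64_t>& entity_ids) {
    FILE* fp = fopen(path.c_str(), "ab");
    if (fp == nullptr) {
        return false;
    }
    uint32_t type = 2;  // mirrors WalOperationType::DELETE_ENTITY
    uint32_t name_len = static_cast<uint32_t>(collection_name.size());
    uint64_t id_count = entity_ids.size();
    bool ok = fwrite(&type, sizeof(type), 1, fp) == 1 && fwrite(&op_id, sizeof(op_id), 1, fp) == 1 &&
              fwrite(&name_len, sizeof(name_len), 1, fp) == 1 &&
              fwrite(collection_name.data(), 1, name_len, fp) == name_len &&
              fwrite(&id_count, sizeof(id_count), 1, fp) == 1 &&
              fwrite(entity_ids.data(), sizeof(int64_t), id_count, fp) == id_count;
    fclose(fp);
    return ok;
}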

View File

@ -11,39 +11,22 @@
#pragma once
#include <memory>
#include <string>
#include <unordered_map>
#include "db/Types.h"
#include "db/meta/MetaFactory.h"
#include "db/wal/WalDefinations.h"
#include "db/wal/WalFileHandler.h"
#include "db/wal/WalOperation.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
namespace wal {
extern const char* WAL_META_FILE_NAME;
class MXLogMetaHandler {
 public:
    explicit MXLogMetaHandler(const std::string& internal_meta_file_path);
    ~MXLogMetaHandler();
    bool
    GetMXLogInternalMeta(uint64_t& wal_lsn);
    bool
    SetMXLogInternalMeta(uint64_t wal_lsn);
 private:
    FILE* wal_meta_fp_;
    uint64_t latest_wal_lsn_ = 0;
};
using MXLogMetaHandlerPtr = std::shared_ptr<MXLogMetaHandler>;
class WalOperationCodec {
 public:
    static Status
    SerializeOperation(const std::string& path, const InsertEntityOperationPtr& operation);
    static Status
    SerializeOperation(const std::string& path, const DeleteEntityOperationPtr& operation);
};
} // namespace wal
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,63 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/wal/WalProxy.h"
#include "config/ServerConfig.h"
#include "db/wal/WalManager.h"
#include "db/wal/WalOperation.h"
#include "utils/Exception.h"
namespace milvus {
namespace engine {
WalProxy::WalProxy(const DBPtr& db, const DBOptions& options) : DBProxy(db, options) {
    // a valid db implementation must be provided
if (db == nullptr) {
throw Exception(DB_ERROR, "null pointer");
}
}
Status
WalProxy::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk,
id_t op_id) {
// write operation into disk
InsertEntityOperationPtr op = std::make_shared<InsertEntityOperation>();
op->collection_name_ = collection_name;
op->partition_name = partition_name;
op->data_chunk_ = data_chunk;
return WalManager::GetInstance().RecordOperation(op, db_);
}
Status
WalProxy::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) {
// write operation into disk
DeleteEntityOperationPtr op = std::make_shared<DeleteEntityOperation>();
op->collection_name_ = collection_name;
op->entity_ids_ = entity_ids;
return WalManager::GetInstance().RecordOperation(op, db_);
}
Status
WalProxy::Flush(const std::string& collection_name) {
auto status = db_->Flush(collection_name);
return status;
}
Status
WalProxy::Flush() {
auto status = db_->Flush();
return status;
}
} // namespace engine
} // namespace milvus

View File

@ -20,15 +20,16 @@
namespace milvus {
namespace engine {
class WriteAheadLog : public DBProxy {
class WalProxy : public DBProxy {
public:
WriteAheadLog(const DBPtr& db, const DBOptions& options);
WalProxy(const DBPtr& db, const DBOptions& options);
Status
Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) override;
Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk,
id_t op_id) override;
Status
DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) override;
DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) override;
Status
Flush(const std::string& collection_name) override;

View File

@ -1,47 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/wal/WriteAheadLog.h"
#include "utils/Exception.h"
namespace milvus {
namespace engine {
WriteAheadLog::WriteAheadLog(const DBPtr& db, const DBOptions& options) : DBProxy(db, options) {
    // a valid db implementation must be provided
if (db == nullptr) {
throw Exception(DB_ERROR, "null pointer");
}
}
Status
WriteAheadLog::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) {
return db_->Insert(collection_name, partition_name, data_chunk);
}
Status
WriteAheadLog::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) {
return db_->DeleteEntityByID(collection_name, entity_ids);
}
Status
WriteAheadLog::Flush(const std::string& collection_name) {
return db_->Flush(collection_name);
}
Status
WriteAheadLog::Flush() {
return db_->Flush();
}
} // namespace engine
} // namespace milvus