mirror of https://github.com/milvus-io/milvus.git
* update Signed-off-by: Zhiru Zhu <zzhu@fandm.edu> * update Signed-off-by: Zhiru Zhu <zzhu@fandm.edu> * update Signed-off-by: Zhiru Zhu <zzhu@fandm.edu>pull/1674/head
parent
51cfc8978a
commit
3de34d3831
|
@ -20,10 +20,10 @@ Please mark all change in change log and use the issue from GitHub
|
|||
- \#1547 Rename storage/file to storage/disk and rename classes
|
||||
- \#1548 Move store/Directory to storage/Operation and add FSHandler
|
||||
- \#1649 Fix Milvus crash on old CPU
|
||||
- \#1619 Improve compact performance
|
||||
|
||||
## Task
|
||||
|
||||
|
||||
# Milvus 0.7.0 (2020-03-11)
|
||||
|
||||
## Bug
|
||||
|
|
|
@ -32,6 +32,9 @@ class DeletedDocsFormat {
|
|||
|
||||
virtual void
|
||||
write(const storage::FSHandlerPtr& fs_ptr, const segment::DeletedDocsPtr& deleted_docs) = 0;
|
||||
|
||||
virtual void
|
||||
readSize(const storage::FSHandlerPtr& fs_ptr, size_t& size) = 0;
|
||||
};
|
||||
|
||||
using DeletedDocsFormatPtr = std::shared_ptr<DeletedDocsFormat>;
|
||||
|
|
|
@ -146,5 +146,35 @@ DefaultDeletedDocsFormat::write(const storage::FSHandlerPtr& fs_ptr, const segme
|
|||
boost::filesystem::rename(temp_path, del_file_path);
|
||||
}
|
||||
|
||||
void
|
||||
DefaultDeletedDocsFormat::readSize(const storage::FSHandlerPtr& fs_ptr, size_t& size) {
|
||||
const std::lock_guard<std::mutex> lock(mutex_);
|
||||
|
||||
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
|
||||
const std::string del_file_path = dir_path + "/" + deleted_docs_filename_;
|
||||
|
||||
int del_fd = open(del_file_path.c_str(), O_RDONLY, 00664);
|
||||
if (del_fd == -1) {
|
||||
std::string err_msg = "Failed to open file: " + del_file_path + ", error: " + std::strerror(errno);
|
||||
ENGINE_LOG_ERROR << err_msg;
|
||||
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
|
||||
}
|
||||
|
||||
size_t num_bytes;
|
||||
if (::read(del_fd, &num_bytes, sizeof(size_t)) == -1) {
|
||||
std::string err_msg = "Failed to read from file: " + del_file_path + ", error: " + std::strerror(errno);
|
||||
ENGINE_LOG_ERROR << err_msg;
|
||||
throw Exception(SERVER_WRITE_ERROR, err_msg);
|
||||
}
|
||||
|
||||
size = num_bytes / sizeof(segment::offset_t);
|
||||
|
||||
if (::close(del_fd) == -1) {
|
||||
std::string err_msg = "Failed to close file: " + del_file_path + ", error: " + std::strerror(errno);
|
||||
ENGINE_LOG_ERROR << err_msg;
|
||||
throw Exception(SERVER_WRITE_ERROR, err_msg);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace codec
|
||||
} // namespace milvus
|
||||
|
|
|
@ -35,6 +35,9 @@ class DefaultDeletedDocsFormat : public DeletedDocsFormat {
|
|||
void
|
||||
write(const storage::FSHandlerPtr& fs_ptr, const segment::DeletedDocsPtr& deleted_docs) override;
|
||||
|
||||
void
|
||||
readSize(const storage::FSHandlerPtr& fs_ptr, size_t& size) override;
|
||||
|
||||
// No copy and move
|
||||
DefaultDeletedDocsFormat(const DefaultDeletedDocsFormat&) = delete;
|
||||
DefaultDeletedDocsFormat(DefaultDeletedDocsFormat&&) = delete;
|
||||
|
|
|
@ -688,7 +688,7 @@ DBImpl::Compact(const std::string& table_id) {
|
|||
OngoingFileChecker::GetInstance().MarkOngoingFiles(files_to_compact);
|
||||
|
||||
Status compact_status;
|
||||
for (meta::TableFilesSchema::iterator iter = files_to_compact.begin(); iter != files_to_compact.end();) {
|
||||
for (auto iter = files_to_compact.begin(); iter != files_to_compact.end();) {
|
||||
meta::TableFileSchema file = *iter;
|
||||
iter = files_to_compact.erase(iter);
|
||||
|
||||
|
@ -697,17 +697,15 @@ DBImpl::Compact(const std::string& table_id) {
|
|||
utils::GetParentPath(file.location_, segment_dir);
|
||||
|
||||
segment::SegmentReader segment_reader(segment_dir);
|
||||
segment::DeletedDocsPtr deleted_docs;
|
||||
status = segment_reader.LoadDeletedDocs(deleted_docs);
|
||||
size_t deleted_docs_size;
|
||||
status = segment_reader.ReadDeletedDocsSize(deleted_docs_size);
|
||||
if (!status.ok()) {
|
||||
std::string msg = "Failed to load deleted_docs from " + segment_dir;
|
||||
ENGINE_LOG_ERROR << msg;
|
||||
OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
|
||||
continue; // skip this file and try compact next one
|
||||
}
|
||||
|
||||
meta::TableFilesSchema files_to_update;
|
||||
if (deleted_docs->GetSize() != 0) {
|
||||
if (deleted_docs_size != 0) {
|
||||
compact_status = CompactFile(table_id, file, files_to_update);
|
||||
|
||||
if (!compact_status.ok()) {
|
||||
|
|
|
@ -119,5 +119,18 @@ SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SegmentReader::ReadDeletedDocsSize(size_t& size) {
|
||||
codec::DefaultCodec default_codec;
|
||||
try {
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
default_codec.GetDeletedDocsFormat()->readSize(fs_ptr_, size);
|
||||
} catch (std::exception& e) {
|
||||
std::string err_msg = "Failed to read deleted docs size: " + std::string(e.what());
|
||||
ENGINE_LOG_ERROR << err_msg;
|
||||
return Status(DB_ERROR, err_msg);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
} // namespace segment
|
||||
} // namespace milvus
|
||||
|
|
|
@ -54,6 +54,9 @@ class SegmentReader {
|
|||
Status
|
||||
GetSegment(SegmentPtr& segment_ptr);
|
||||
|
||||
Status
|
||||
ReadDeletedDocsSize(size_t& size);
|
||||
|
||||
private:
|
||||
storage::FSHandlerPtr fs_ptr_;
|
||||
SegmentPtr segment_ptr_;
|
||||
|
|
Loading…
Reference in New Issue