mirror of https://github.com/milvus-io/milvus.git

Update compact (#1522)

Signed-off-by: youny626 <zzhu@fandm.edu>
Signed-off-by: Zhiru Zhu <zzhu@fandm.edu>

parent 19b8565748
commit b43e0fbada
CHANGELOG.md:

@@ -35,6 +35,7 @@ Please mark all change in change log and use the issue from GitHub
 - \#1504 Avoid possible race condition between delete and search
 - \#1510 Add set interfaces for WAL configurations
 - \#1511 Fix big integer cannot pass to server correctly
+- \#1518 Table count did not match after deleting vectors and compact

 ## Feature

 - \#216 Add CLI to get server info
DBImpl.cpp:

@@ -635,7 +635,6 @@ DBImpl::Flush() {

 Status
 DBImpl::Compact(const std::string& table_id) {
     // TODO: WAL???
     if (!initialized_.load(std::memory_order_acquire)) {
         return SHUTDOWN_ERROR;
     }
@@ -657,10 +656,16 @@ DBImpl::Compact(const std::string& table_id) {
         }
     }

+    ENGINE_LOG_DEBUG << "Before compacting, wait for build index thread to finish...";
+
+    WaitBuildIndexFinish();
+
+    std::lock_guard<std::mutex> index_lock(index_result_mutex_);
+    const std::lock_guard<std::mutex> merge_lock(flush_merge_compact_mutex_);
+
     ENGINE_LOG_DEBUG << "Compacting table: " << table_id;
-
-    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
+
     /*
     // Save table index
     TableIndex table_index;
     status = DescribeIndex(table_id, table_index);
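The hunk above changes Compact's synchronization: it first drains the background index builder, then holds both the index-result and flush/merge/compact mutexes for the rest of the call. A minimal standalone sketch of that pattern follows; the names and the future-based wait are illustrative stand-ins, not the actual DBImpl members.

```cpp
#include <future>
#include <iostream>
#include <mutex>
#include <vector>

std::mutex index_result_mutex;         // stand-in for DBImpl::index_result_mutex_
std::mutex flush_merge_compact_mutex;  // stand-in for DBImpl::flush_merge_compact_mutex_

// Stand-in for WaitBuildIndexFinish(): block until every outstanding
// index-build task has completed.
void WaitBuildIndexFinish(std::vector<std::future<void>>& build_tasks) {
    for (auto& task : build_tasks) {
        task.wait();
    }
}

void Compact(std::vector<std::future<void>>& build_tasks) {
    // 1. Drain the index builder first, so no file changes state to/from
    //    TO_INDEX while we are picking files to compact.
    WaitBuildIndexFinish(build_tasks);

    // 2. Hold both mutexes for the whole operation; taking them in one
    //    fixed order avoids deadlock with other paths using the same pair.
    std::lock_guard<std::mutex> index_lock(index_result_mutex);
    std::lock_guard<std::mutex> merge_lock(flush_merge_compact_mutex);

    std::cout << "compacting under both locks\n";
}

int main() {
    std::vector<std::future<void>> build_tasks;
    build_tasks.emplace_back(std::async(std::launch::async, [] { /* build an index */ }));
    Compact(build_tasks);
}
```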
@@ -679,9 +684,10 @@ DBImpl::Compact(const std::string& table_id) {
     if (!status.ok()) {
         return status;
     }
     */
     // Get files to compact from meta.
-    std::vector<int> file_types{meta::TableFileSchema::FILE_TYPE::RAW, meta::TableFileSchema::FILE_TYPE::TO_INDEX};
+    std::vector<int> file_types{meta::TableFileSchema::FILE_TYPE::RAW, meta::TableFileSchema::FILE_TYPE::TO_INDEX,
+                                meta::TableFileSchema::FILE_TYPE::BACKUP};
     meta::TableFilesSchema files_to_compact;
     status = meta_ptr_->FilesByType(table_id, file_types, files_to_compact);
     if (!status.ok()) {
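With BACKUP added to the filter, segments parked in backup state are no longer skipped, so their deleted rows also get purged. A tiny sketch of this kind of state filter; the enum and the FilesByType shape mirror the call above but are simplified stand-ins:

```cpp
#include <algorithm>
#include <iostream>
#include <vector>

// Illustrative subset of the table-file states referenced above.
enum FileType { RAW, TO_INDEX, BACKUP, INDEX, TO_DELETE };

struct TableFile {
    int id;
    FileType type;
};

// Keep only files whose state appears in `wanted` -- the shape of
// meta_ptr_->FilesByType(table_id, file_types, files_to_compact).
std::vector<TableFile> FilesByType(const std::vector<TableFile>& all,
                                   const std::vector<FileType>& wanted) {
    std::vector<TableFile> picked;
    std::copy_if(all.begin(), all.end(), std::back_inserter(picked),
                 [&](const TableFile& f) {
                     return std::find(wanted.begin(), wanted.end(), f.type) != wanted.end();
                 });
    return picked;
}

int main() {
    std::vector<TableFile> files{{1, RAW}, {2, INDEX}, {3, BACKUP}};
    // Before the fix the filter was {RAW, TO_INDEX} -- file 3 would be missed.
    auto to_compact = FilesByType(files, {RAW, TO_INDEX, BACKUP});
    std::cout << to_compact.size() << " files to compact\n";  // prints 2
}
```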
@@ -697,12 +703,30 @@ DBImpl::Compact(const std::string& table_id) {
     meta::TableFilesSchema files_to_update;
     Status compact_status;
     for (auto& file : files_to_compact) {
-        compact_status = CompactFile(table_id, file, files_to_update);
+        // Check if the segment needs compacting
+        std::string segment_dir;
+        utils::GetParentPath(file.location_, segment_dir);
+
+        segment::SegmentReader segment_reader(segment_dir);
+        segment::DeletedDocsPtr deleted_docs;
+        status = segment_reader.LoadDeletedDocs(deleted_docs);
+        if (!status.ok()) {
+            std::string msg = "Failed to load deleted_docs from " + segment_dir;
+            ENGINE_LOG_ERROR << msg;
+            return Status(DB_ERROR, msg);
+        }
+
+        if (deleted_docs->GetSize() != 0) {
+            compact_status = CompactFile(table_id, file, files_to_update);

-        if (!compact_status.ok()) {
-            ENGINE_LOG_ERROR << "Compact failed for file " << file.file_id_ << ": " << compact_status.message();
-            break;
+            if (!compact_status.ok()) {
+                ENGINE_LOG_ERROR << "Compact failed for segment " << file.segment_id_ << ": "
+                                 << compact_status.message();
+                break;
+            }
+        } else {
+            ENGINE_LOG_ERROR << "Segment " << file.segment_id_ << " has no deleted data. No need to compact";
         }
     }

     if (compact_status.ok()) {
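The new loop body is the fast path of the fix: load each segment's deleted-docs list and only compact segments that actually have deletions, while a load failure aborts the whole pass. A compilable sketch of that gate, with a stub in place of segment::SegmentReader::LoadDeletedDocs:

```cpp
#include <iostream>
#include <string>
#include <vector>

// Stub for the per-segment deleted-docs list.
struct DeletedDocs {
    std::vector<int64_t> offsets;
    size_t GetSize() const { return offsets.size(); }
};

// Hypothetical stand-in for SegmentReader::LoadDeletedDocs(); a real
// implementation would read the deleted-docs file from the segment directory.
bool LoadDeletedDocs(const std::string& segment_dir, DeletedDocs& docs) {
    (void)segment_dir;
    return true;  // pretend the read succeeded; docs stays empty here
}

bool CompactSegments(const std::vector<std::string>& segment_dirs) {
    for (const auto& dir : segment_dirs) {
        DeletedDocs docs;
        if (!LoadDeletedDocs(dir, docs)) {
            std::cerr << "Failed to load deleted_docs from " << dir << "\n";
            return false;  // hard error: the hunk returns DB_ERROR here
        }
        if (docs.GetSize() == 0) {
            // No deletions: rewriting the segment would only burn I/O.
            std::cout << dir << " has no deleted data, skipping\n";
            continue;
        }
        // ... merge the segment's live rows into a new file ...
    }
    return true;
}

int main() {
    return CompactSegments({"/tmp/segment_0", "/tmp/segment_1"}) ? 0 : 1;
}
```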
@@ -711,6 +735,7 @@ DBImpl::Compact(const std::string& table_id) {

+    ENGINE_LOG_ERROR << "Updating meta after compaction...";

     /*
     // Drop index again, in case some files were in the index building process during compacting
     status = DropIndex(table_id);
     if (!status.ok()) {
@@ -722,6 +747,7 @@ DBImpl::Compact(const std::string& table_id) {
     if (!status.ok()) {
         return status;
     }
     */

     status = meta_ptr_->UpdateTableFiles(files_to_update);
     if (!status.ok()) {
@@ -753,7 +779,6 @@ DBImpl::CompactFile(const std::string& table_id, const meta::TableFileSchema& fi
     }

     // Compact (merge) file to the newly created table file
-    meta::TableFilesSchema updated;

     std::string new_segment_dir;
     utils::GetParentPath(compacted_file.location_, new_segment_dir);
@@ -765,10 +790,6 @@ DBImpl::CompactFile(const std::string& table_id, const meta::TableFileSchema& fi
     ENGINE_LOG_DEBUG << "Compacting begin...";
     segment_writer_ptr->Merge(segment_dir_to_merge, compacted_file.file_id_);

-    auto file_to_compact = file;
-    file_to_compact.file_type_ = meta::TableFileSchema::TO_DELETE;
-    updated.emplace_back(file_to_compact);
-
     // Serialize
     ENGINE_LOG_DEBUG << "Serializing compacted segment...";
     status = segment_writer_ptr->Serialize();
@@ -800,15 +821,23 @@ DBImpl::CompactFile(const std::string& table_id, const meta::TableFileSchema& fi
         compacted_file.file_type_ = meta::TableFileSchema::TO_DELETE;
     }

-    updated.emplace_back(compacted_file);
+    files_to_update.emplace_back(compacted_file);

-    for (auto& f : updated) {
+    // Set all files in segment to TO_DELETE
+    auto& segment_id = file.segment_id_;
+    meta::TableFilesSchema segment_files;
+    status = meta_ptr_->GetTableFilesBySegmentId(segment_id, segment_files);
+    if (!status.ok()) {
+        return status;
+    }
+    for (auto& f : segment_files) {
         f.file_type_ = meta::TableFileSchema::FILE_TYPE::TO_DELETE;
         files_to_update.emplace_back(f);
     }

     ENGINE_LOG_DEBUG << "Compacted segment " << compacted_file.segment_id_ << " from "
-                     << std::to_string(file_to_compact.file_size_) << " bytes to "
-                     << std::to_string(compacted_file.file_size_) << " bytes";
+                     << std::to_string(file.file_size_) << " bytes to " << std::to_string(compacted_file.file_size_)
+                     << " bytes";

     if (options_.insert_cache_immediately_) {
         segment_writer_ptr->Cache();
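This is the heart of the #1518 fix: previously only the files CompactFile itself had touched (the local `updated` list) were marked TO_DELETE, leaving sibling files of the segment alive and inflating the row count. Now every file returned by GetTableFilesBySegmentId is retired and queued into the caller's `files_to_update` batch. A self-contained sketch of that idea, with the meta store reduced to a map and all names illustrative:

```cpp
#include <iostream>
#include <map>
#include <string>
#include <vector>

enum FileType { RAW, TO_DELETE };

struct TableFile {
    std::string file_id;
    std::string segment_id;
    FileType type = RAW;
};

// Toy meta store: segment id -> all files belonging to that segment
// (the role of meta_ptr_->GetTableFilesBySegmentId above).
using MetaStore = std::map<std::string, std::vector<TableFile>>;

// Retire the WHOLE old segment after a merge, not just the single file
// that was merged (the pre-fix behavior), and stage everything for one
// UpdateTableFiles()-style batch.
std::vector<TableFile> StageCompactionUpdate(MetaStore& meta,
                                             const std::string& old_segment_id,
                                             const TableFile& compacted_file) {
    std::vector<TableFile> files_to_update;
    files_to_update.push_back(compacted_file);  // the merged result

    for (auto& f : meta[old_segment_id]) {
        f.type = TO_DELETE;            // every sibling file is retired
        files_to_update.push_back(f);
    }
    return files_to_update;  // applied in one meta update
}

int main() {
    MetaStore meta{{"seg_0", {{"f1", "seg_0"}, {"f2", "seg_0"}}}};
    TableFile merged{"f3", "seg_1"};
    auto updates = StageCompactionUpdate(meta, "seg_0", merged);
    std::cout << updates.size() << " meta rows to update\n";  // prints 3
}
```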
@@ -1365,7 +1394,7 @@ DBImpl::StartMergeTask() {

 Status
 DBImpl::MergeFiles(const std::string& table_id, const meta::TableFilesSchema& files) {
-    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
+    // const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

     ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;
@@ -1455,7 +1484,7 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::TableFilesSchema& fi

 Status
 DBImpl::BackgroundMergeFiles(const std::string& table_id) {
-    // const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
+    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

     meta::TableFilesSchema raw_files;
     auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
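These two hunks move the flush/merge/compact guard up one level: MergeFiles no longer locks, its caller BackgroundMergeFiles does. Combined with Compact now holding the same mutex for its entire run, merge and compact stay mutually exclusive while the mutex is acquired once per background pass instead of once per MergeFiles call. A simplified sketch of the scoping (not the full Milvus call graph):

```cpp
#include <mutex>
#include <string>

std::mutex flush_merge_compact_mutex;  // stand-in for the DBImpl member

// After the change: MergeFiles assumes its caller already holds the lock.
void MergeFiles(const std::string& table_id) {
    // ... pick mergeable files for table_id and merge them ...
    (void)table_id;
}

void BackgroundMergeFiles(const std::string& table_id) {
    // One guard covering the meta lookup and every MergeFiles call for
    // this table, so Compact (which takes the same mutex) cannot run
    // concurrently with any part of the merge pass.
    std::lock_guard<std::mutex> lock(flush_merge_compact_mutex);
    MergeFiles(table_id);
}

int main() {
    BackgroundMergeFiles("my_table");
}
```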
MemTable.cpp:

@@ -337,8 +337,6 @@ MemTable::ApplyDeletes() {
                      << " s";

     // Update table file row count
     start = std::chrono::high_resolution_clock::now();

     auto& segment_id = table_file.segment_id_;
     meta::TableFilesSchema segment_files;
     status = meta_->GetTableFilesBySegmentId(segment_id, segment_files);
@@ -354,11 +352,7 @@ MemTable::ApplyDeletes() {
         }
     }

     end = std::chrono::high_resolution_clock::now();
     diff = end - start;

     status = meta_->UpdateTableFiles(table_files_to_update);
     ENGINE_LOG_DEBUG << "Updated meta in table: " << table_id_ << " in " << diff.count() << " s";

     if (!status.ok()) {
         std::string err_msg = "Failed to apply deletes: " + status.ToString();
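As the surviving lines show, ApplyDeletes brackets each phase with std::chrono timestamps and logs the elapsed seconds after the meta update. A minimal sketch of that measurement pattern:

```cpp
#include <chrono>
#include <iostream>
#include <thread>

int main() {
    // Same shape as the timing in MemTable::ApplyDeletes: capture start,
    // do the work, subtract, log the duration in seconds as a double.
    auto start = std::chrono::high_resolution_clock::now();

    std::this_thread::sleep_for(std::chrono::milliseconds(50));  // the "work"

    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> diff = end - start;
    std::cout << "Updated meta in " << diff.count() << " s\n";
}
```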
SegmentReader.cpp:

@@ -47,7 +47,7 @@ SegmentReader::Load() {
         default_codec.GetVectorsFormat()->read(directory_ptr_, segment_ptr_->vectors_ptr_);
         default_codec.GetDeletedDocsFormat()->read(directory_ptr_, segment_ptr_->deleted_docs_ptr_);
     } catch (std::exception& e) {
-        return Status(SERVER_WRITE_ERROR, e.what());
+        return Status(DB_ERROR, e.what());
     }
     return Status::OK();
 }
@@ -59,9 +59,9 @@ SegmentReader::LoadVectors(off_t offset, size_t num_bytes, std::vector<uint8_t>&
         directory_ptr_->Create();
         default_codec.GetVectorsFormat()->read_vectors(directory_ptr_, offset, num_bytes, raw_vectors);
     } catch (std::exception& e) {
-        std::string err_msg = "Failed to load raw vectors. " + std::string(e.what());
+        std::string err_msg = "Failed to load raw vectors: " + std::string(e.what());
         ENGINE_LOG_ERROR << err_msg;
-        return Status(SERVER_WRITE_ERROR, err_msg);
+        return Status(DB_ERROR, err_msg);
     }
     return Status::OK();
 }
@@ -73,9 +73,9 @@ SegmentReader::LoadUids(std::vector<doc_id_t>& uids) {
         directory_ptr_->Create();
         default_codec.GetVectorsFormat()->read_uids(directory_ptr_, uids);
     } catch (std::exception& e) {
-        std::string err_msg = "Failed to load uids. " + std::string(e.what());
+        std::string err_msg = "Failed to load uids: " + std::string(e.what());
         ENGINE_LOG_ERROR << err_msg;
-        return Status(SERVER_WRITE_ERROR, err_msg);
+        return Status(DB_ERROR, err_msg);
     }
     return Status::OK();
 }
@@ -93,9 +93,9 @@ SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
         directory_ptr_->Create();
         default_codec.GetIdBloomFilterFormat()->read(directory_ptr_, id_bloom_filter_ptr);
     } catch (std::exception& e) {
-        std::string err_msg = "Failed to load bloom filter. " + std::string(e.what());
+        std::string err_msg = "Failed to load bloom filter: " + std::string(e.what());
         ENGINE_LOG_ERROR << err_msg;
-        return Status(SERVER_WRITE_ERROR, err_msg);
+        return Status(DB_ERROR, err_msg);
     }
     return Status::OK();
 }
@@ -107,9 +107,9 @@ SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) {
         directory_ptr_->Create();
         default_codec.GetDeletedDocsFormat()->read(directory_ptr_, deleted_docs_ptr);
     } catch (std::exception& e) {
-        std::string err_msg = "Failed to load deleted docs. " + std::string(e.what());
+        std::string err_msg = "Failed to load deleted docs: " + std::string(e.what());
         ENGINE_LOG_ERROR << err_msg;
-        return Status(SERVER_WRITE_ERROR, err_msg);
+        return Status(DB_ERROR, err_msg);
     }
     return Status::OK();
 }
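All five SegmentReader loaders now report failures as DB_ERROR instead of SERVER_WRITE_ERROR (these are read paths, so a write error code was misleading), and the messages switch from "." to ": " before the exception text. Since the five catch blocks are otherwise identical, they could be factored into one helper; a sketch under that assumption, with simplified stand-ins for milvus::Status and the logger:

```cpp
#include <functional>
#include <iostream>
#include <stdexcept>
#include <string>

// Simplified stand-ins for milvus::Status and its error codes.
enum ErrorCode { OK = 0, DB_ERROR = 1 };
struct Status {
    ErrorCode code = OK;
    std::string message;
    bool ok() const { return code == OK; }
};

// One helper instead of five near-identical try/catch blocks: run a codec
// read and map any exception to DB_ERROR with a "Failed to load X: why"
// message, logging it once (ENGINE_LOG_ERROR in the real code).
Status TryLoad(const std::string& what, const std::function<void()>& read) {
    try {
        read();
    } catch (const std::exception& e) {
        Status s{DB_ERROR, "Failed to load " + what + ": " + e.what()};
        std::cerr << s.message << "\n";
        return s;
    }
    return Status{};
}

int main() {
    Status s = TryLoad("deleted docs", [] { throw std::runtime_error("corrupt file"); });
    std::cout << (s.ok() ? "ok" : s.message) << "\n";
}
```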
unittest:

@@ -634,6 +634,10 @@ TEST_F(CompactTest, compact_with_index) {
     stat = db_->Compact(GetTableName());
     ASSERT_TRUE(stat.ok());

+    stat = db_->GetTableRowCount(GetTableName(), row_count);
+    ASSERT_TRUE(stat.ok());
+    ASSERT_EQ(row_count, nb - ids_to_delete.size());
+
     milvus::engine::TableIndex table_index;
     stat = db_->DescribeIndex(GetTableName(), table_index);
     ASSERT_TRUE(stat.ok());