diff --git a/CHANGELOG.md b/CHANGELOG.md
index 48915da341..5b994e48ba 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -35,6 +35,7 @@ Please mark all change in change log and use the issue from GitHub
 - \#1504 Avoid possible race condition between delete and search
 - \#1510 Add set interfaces for WAL configurations
 - \#1511 Fix big integer cannot pass to server correctly
+- \#1518 Table count did not match after deleting vectors and compact
 
 ## Feature
 - \#216 Add CLI to get server info
diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp
index 1e515f0739..8ec8017bb0 100644
--- a/core/src/db/DBImpl.cpp
+++ b/core/src/db/DBImpl.cpp
@@ -635,7 +635,6 @@ DBImpl::Flush() {
 
 Status
 DBImpl::Compact(const std::string& table_id) {
-    // TODO: WAL???
     if (!initialized_.load(std::memory_order_acquire)) {
         return SHUTDOWN_ERROR;
     }
@@ -657,31 +656,38 @@ DBImpl::Compact(const std::string& table_id) {
         }
     }
 
+    ENGINE_LOG_DEBUG << "Before compacting, wait for build index thread to finish...";
+
+    WaitBuildIndexFinish();
+
+    std::lock_guard<std::mutex> index_lock(index_result_mutex_);
+    const std::lock_guard<std::mutex> merge_lock(flush_merge_compact_mutex_);
+
     ENGINE_LOG_DEBUG << "Compacting table: " << table_id;
 
-    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
+    /*
+    // Save table index
+    TableIndex table_index;
+    status = DescribeIndex(table_id, table_index);
+    if (!status.ok()) {
+        return status;
+    }
 
-    // Save table index
-    TableIndex table_index;
-    status = DescribeIndex(table_id, table_index);
-    if (!status.ok()) {
-        return status;
-    }
-
-    // Drop all index
-    status = DropIndex(table_id);
-    if (!status.ok()) {
-        return status;
-    }
-
-    // Then update table index to the previous index
-    status = UpdateTableIndexRecursively(table_id, table_index);
-    if (!status.ok()) {
-        return status;
-    }
+    // Drop all index
+    status = DropIndex(table_id);
+    if (!status.ok()) {
+        return status;
+    }
+    // Then update table index to the previous index
+    status = UpdateTableIndexRecursively(table_id, table_index);
+    if (!status.ok()) {
+        return status;
+    }
+    */
 
     // Get files to compact from meta.
-    std::vector<int> file_types{meta::TableFileSchema::FILE_TYPE::RAW, meta::TableFileSchema::FILE_TYPE::TO_INDEX};
+    std::vector<int> file_types{meta::TableFileSchema::FILE_TYPE::RAW, meta::TableFileSchema::FILE_TYPE::TO_INDEX,
+                                meta::TableFileSchema::FILE_TYPE::BACKUP};
     meta::TableFilesSchema files_to_compact;
     status = meta_ptr_->FilesByType(table_id, file_types, files_to_compact);
     if (!status.ok()) {
@@ -697,11 +703,29 @@ DBImpl::Compact(const std::string& table_id) {
     meta::TableFilesSchema files_to_update;
     Status compact_status;
     for (auto& file : files_to_compact) {
-        compact_status = CompactFile(table_id, file, files_to_update);
+        // Check if the segment needs compacting
+        std::string segment_dir;
+        utils::GetParentPath(file.location_, segment_dir);
 
-        if (!compact_status.ok()) {
-            ENGINE_LOG_ERROR << "Compact failed for file " << file.file_id_ << ": " << compact_status.message();
-            break;
+        segment::SegmentReader segment_reader(segment_dir);
+        segment::DeletedDocsPtr deleted_docs;
+        status = segment_reader.LoadDeletedDocs(deleted_docs);
+        if (!status.ok()) {
+            std::string msg = "Failed to load deleted_docs from " + segment_dir;
+            ENGINE_LOG_ERROR << msg;
+            return Status(DB_ERROR, msg);
+        }
+
+        if (deleted_docs->GetSize() != 0) {
+            compact_status = CompactFile(table_id, file, files_to_update);
+
+            if (!compact_status.ok()) {
+                ENGINE_LOG_ERROR << "Compact failed for segment " << file.segment_id_ << ": "
+                                 << compact_status.message();
+                break;
+            }
+        } else {
+            ENGINE_LOG_ERROR << "Segment " << file.segment_id_ << " has no deleted data. No need to compact";
         }
     }
 
@@ -711,6 +735,7 @@ DBImpl::Compact(const std::string& table_id) {
 
     ENGINE_LOG_ERROR << "Updating meta after compaction...";
 
+    /*
     // Drop index again, in case some files were in the index building process during compacting
     status = DropIndex(table_id);
     if (!status.ok()) {
@@ -722,6 +747,7 @@
     if (!status.ok()) {
         return status;
     }
+    */
 
     status = meta_ptr_->UpdateTableFiles(files_to_update);
     if (!status.ok()) {
@@ -753,7 +779,6 @@ DBImpl::CompactFile(const std::string& table_id, const meta::TableFileSchema& fi
     }
 
     // Compact (merge) file to the newly created table file
-    meta::TableFilesSchema updated;
     std::string new_segment_dir;
     utils::GetParentPath(compacted_file.location_, new_segment_dir);
 
@@ -765,10 +790,6 @@ DBImpl::CompactFile(const std::string& table_id, const meta::TableFileSchema& fi
     ENGINE_LOG_DEBUG << "Compacting begin...";
     segment_writer_ptr->Merge(segment_dir_to_merge, compacted_file.file_id_);
 
-    auto file_to_compact = file;
-    file_to_compact.file_type_ = meta::TableFileSchema::TO_DELETE;
-    updated.emplace_back(file_to_compact);
-
     // Serialize
     ENGINE_LOG_DEBUG << "Serializing compacted segment...";
     status = segment_writer_ptr->Serialize();
@@ -800,15 +821,23 @@ DBImpl::CompactFile(const std::string& table_id, const meta::TableFileSchema& fi
         compacted_file.file_type_ = meta::TableFileSchema::TO_DELETE;
     }
 
-    updated.emplace_back(compacted_file);
+    files_to_update.emplace_back(compacted_file);
 
-    for (auto& f : updated) {
+    // Set all files in segment to TO_DELETE
+    auto& segment_id = file.segment_id_;
+    meta::TableFilesSchema segment_files;
+    status = meta_ptr_->GetTableFilesBySegmentId(segment_id, segment_files);
+    if (!status.ok()) {
+        return status;
+    }
+    for (auto& f : segment_files) {
+        f.file_type_ = meta::TableFileSchema::FILE_TYPE::TO_DELETE;
         files_to_update.emplace_back(f);
     }
 
     ENGINE_LOG_DEBUG << "Compacted segment " << compacted_file.segment_id_ << " from "
-                     << std::to_string(file_to_compact.file_size_) << " bytes to "
-                     << std::to_string(compacted_file.file_size_) << " bytes";
+                     << std::to_string(file.file_size_) << " bytes to " << std::to_string(compacted_file.file_size_)
+                     << " bytes";
 
     if (options_.insert_cache_immediately_) {
         segment_writer_ptr->Cache();
@@ -1365,7 +1394,7 @@ DBImpl::StartMergeTask() {
 
 Status
 DBImpl::MergeFiles(const std::string& table_id, const meta::TableFilesSchema& files) {
-    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
+    // const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
 
     ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;
 
@@ -1455,7 +1484,7 @@
 
 Status
 DBImpl::BackgroundMergeFiles(const std::string& table_id) {
-    // const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
+    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
 
     meta::TableFilesSchema raw_files;
     auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
diff --git a/core/src/db/insert/MemTable.cpp b/core/src/db/insert/MemTable.cpp
index 7656683f56..1ca2a3e42d 100644
--- a/core/src/db/insert/MemTable.cpp
+++ b/core/src/db/insert/MemTable.cpp
@@ -337,8 +337,6 @@ MemTable::ApplyDeletes() {
                          << " s";
 
         // Update table file row count
-        start = std::chrono::high_resolution_clock::now();
-
         auto& segment_id = table_file.segment_id_;
         meta::TableFilesSchema segment_files;
         status = meta_->GetTableFilesBySegmentId(segment_id, segment_files);
@@ -354,11 +352,7 @@ MemTable::ApplyDeletes() {
             }
         }
 
-        end = std::chrono::high_resolution_clock::now();
-        diff = end - start;
-
         status = meta_->UpdateTableFiles(table_files_to_update);
-        ENGINE_LOG_DEBUG << "Updated meta in table: " << table_id_ << " in " << diff.count() << " s";
 
         if (!status.ok()) {
             std::string err_msg = "Failed to apply deletes: " + status.ToString();
diff --git a/core/src/segment/SegmentReader.cpp b/core/src/segment/SegmentReader.cpp
index f7e7c1af3b..c0983a89be 100644
--- a/core/src/segment/SegmentReader.cpp
+++ b/core/src/segment/SegmentReader.cpp
@@ -47,7 +47,7 @@ SegmentReader::Load() {
         default_codec.GetVectorsFormat()->read(directory_ptr_, segment_ptr_->vectors_ptr_);
         default_codec.GetDeletedDocsFormat()->read(directory_ptr_, segment_ptr_->deleted_docs_ptr_);
     } catch (std::exception& e) {
-        return Status(SERVER_WRITE_ERROR, e.what());
+        return Status(DB_ERROR, e.what());
     }
     return Status::OK();
 }
@@ -59,9 +59,9 @@ SegmentReader::LoadVectors(off_t offset, size_t num_bytes, std::vector<uint8_t>&
         directory_ptr_->Create();
         default_codec.GetVectorsFormat()->read_vectors(directory_ptr_, offset, num_bytes, raw_vectors);
     } catch (std::exception& e) {
-        std::string err_msg = "Failed to load raw vectors. " + std::string(e.what());
+        std::string err_msg = "Failed to load raw vectors: " + std::string(e.what());
         ENGINE_LOG_ERROR << err_msg;
-        return Status(SERVER_WRITE_ERROR, err_msg);
+        return Status(DB_ERROR, err_msg);
     }
     return Status::OK();
 }
@@ -73,9 +73,9 @@ SegmentReader::LoadUids(std::vector<doc_id_t>& uids) {
         directory_ptr_->Create();
         default_codec.GetVectorsFormat()->read_uids(directory_ptr_, uids);
     } catch (std::exception& e) {
-        std::string err_msg = "Failed to load uids. " + std::string(e.what());
+        std::string err_msg = "Failed to load uids: " + std::string(e.what());
         ENGINE_LOG_ERROR << err_msg;
-        return Status(SERVER_WRITE_ERROR, err_msg);
+        return Status(DB_ERROR, err_msg);
     }
     return Status::OK();
 }
@@ -93,9 +93,9 @@ SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
         directory_ptr_->Create();
         default_codec.GetIdBloomFilterFormat()->read(directory_ptr_, id_bloom_filter_ptr);
     } catch (std::exception& e) {
-        std::string err_msg = "Failed to load bloom filter. " + std::string(e.what());
+        std::string err_msg = "Failed to load bloom filter: " + std::string(e.what());
         ENGINE_LOG_ERROR << err_msg;
-        return Status(SERVER_WRITE_ERROR, err_msg);
+        return Status(DB_ERROR, err_msg);
     }
     return Status::OK();
 }
@@ -107,9 +107,9 @@ SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) {
         directory_ptr_->Create();
         default_codec.GetDeletedDocsFormat()->read(directory_ptr_, deleted_docs_ptr);
     } catch (std::exception& e) {
-        std::string err_msg = "Failed to load deleted docs. " + std::string(e.what());
+        std::string err_msg = "Failed to load deleted docs: " + std::string(e.what());
         ENGINE_LOG_ERROR << err_msg;
-        return Status(SERVER_WRITE_ERROR, err_msg);
+        return Status(DB_ERROR, err_msg);
     }
     return Status::OK();
 }
diff --git a/core/unittest/db/test_delete.cpp b/core/unittest/db/test_delete.cpp
index 806a82e936..26441de971 100644
--- a/core/unittest/db/test_delete.cpp
+++ b/core/unittest/db/test_delete.cpp
@@ -634,6 +634,10 @@ TEST_F(CompactTest, compact_with_index) {
     stat = db_->Compact(GetTableName());
     ASSERT_TRUE(stat.ok());
 
+    stat = db_->GetTableRowCount(GetTableName(), row_count);
+    ASSERT_TRUE(stat.ok());
+    ASSERT_EQ(row_count, nb - ids_to_delete.size());
+
     milvus::engine::TableIndex table_index;
     stat = db_->DescribeIndex(GetTableName(), table_index);
     ASSERT_TRUE(stat.ok());