diff --git a/core/src/db/SSDBImpl.cpp b/core/src/db/SSDBImpl.cpp index 87d0c86ffa..5566244b31 100644 --- a/core/src/db/SSDBImpl.cpp +++ b/core/src/db/SSDBImpl.cpp @@ -12,6 +12,7 @@ #include "db/SSDBImpl.h" #include "cache/CpuCacheMgr.h" #include "db/IDGenerator.h" +#include "db/merge/MergeManagerFactory.h" #include "db/snapshot/CompoundOperations.h" #include "db/snapshot/ResourceHelper.h" #include "db/snapshot/ResourceTypes.h" @@ -51,6 +52,7 @@ static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown SSDBImpl::SSDBImpl(const DBOptions& options) : options_(options), initialized_(false), merge_thread_pool_(1, 1), index_thread_pool_(1, 1) { mem_mgr_ = MemManagerFactory::SSBuild(options_); + merge_mgr_ptr_ = MergeManagerFactory::SSBuild(options_); if (options_.wal_enable_) { wal::MXLogConfiguration mxlog_config; diff --git a/core/src/db/merge/MergeManagerFactory.cpp b/core/src/db/merge/MergeManagerFactory.cpp index 4f15281e16..aa3186c999 100644 --- a/core/src/db/merge/MergeManagerFactory.cpp +++ b/core/src/db/merge/MergeManagerFactory.cpp @@ -11,6 +11,7 @@ #include "db/merge/MergeManagerFactory.h" #include "db/merge/MergeManagerImpl.h" +#include "db/merge/SSMergeManagerImpl.h" #include "utils/Exception.h" #include "utils/Log.h" @@ -22,5 +23,10 @@ MergeManagerFactory::Build(const meta::MetaPtr& meta_ptr, const DBOptions& optio return std::make_shared(meta_ptr, options, MergeStrategyType::LAYERED); } +MergeManagerPtr +MergeManagerFactory::SSBuild(const DBOptions& options) { + return std::make_shared(options, MergeStrategyType::SIMPLE); +} + } // namespace engine } // namespace milvus diff --git a/core/src/db/merge/MergeManagerFactory.h b/core/src/db/merge/MergeManagerFactory.h index 533a321161..b7a072aa14 100644 --- a/core/src/db/merge/MergeManagerFactory.h +++ b/core/src/db/merge/MergeManagerFactory.h @@ -23,6 +23,9 @@ class MergeManagerFactory { public: static MergeManagerPtr Build(const meta::MetaPtr& meta_ptr, const DBOptions& options); + + static MergeManagerPtr + SSBuild(const DBOptions& options); }; } // namespace engine diff --git a/core/src/db/merge/SSMergeManagerImpl.cpp b/core/src/db/merge/SSMergeManagerImpl.cpp new file mode 100644 index 0000000000..f46a70761f --- /dev/null +++ b/core/src/db/merge/SSMergeManagerImpl.cpp @@ -0,0 +1,98 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include "db/merge/SSMergeManagerImpl.h" +#include "db/merge/SSMergeSimpleStrategy.h" +#include "db/merge/SSMergeTask.h" +#include "db/snapshot/Snapshots.h" +#include "utils/Exception.h" +#include "utils/Log.h" + +#include + +namespace milvus { +namespace engine { + +SSMergeManagerImpl::SSMergeManagerImpl(const DBOptions& options, MergeStrategyType type) + : options_(options), strategy_type_(type) { + UseStrategy(type); +} + +Status +SSMergeManagerImpl::UseStrategy(MergeStrategyType type) { + switch (type) { + case MergeStrategyType::SIMPLE: { + strategy_ = std::make_shared(); + break; + } + case MergeStrategyType::LAYERED: + case MergeStrategyType::ADAPTIVE: + default: { + std::string msg = "Unsupported merge strategy type: " + std::to_string((int32_t)type); + LOG_ENGINE_ERROR_ << msg; + throw Exception(DB_ERROR, msg); + } + } + strategy_type_ = type; + + return Status::OK(); +} + +Status +SSMergeManagerImpl::MergeFiles(const std::string& collection_name) { + if (strategy_ == nullptr) { + std::string msg = "No merge strategy specified"; + LOG_ENGINE_ERROR_ << msg; + return Status(DB_ERROR, msg); + } + + int64_t row_count_per_segment = DEFAULT_ROW_COUNT_PER_SEGMENT; + while (true) { + snapshot::ScopedSnapshotT latest_ss; + STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name)); + + Partition2SegmentsMap part2seg; + auto& segments = latest_ss->GetResources(); + for (auto& kv : segments) { + auto segment_commit = latest_ss->GetSegmentCommitBySegmentId(kv.second->GetID()); + part2seg[kv.second->GetPartitionId()].push_back(kv.second->GetID()); + } + + Partition2SegmentsMap::iterator it; + for (it = part2seg.begin(); it != part2seg.end();) { + if (it->second.size() <= 1) { + part2seg.erase(it++); + } + } + + if (part2seg.empty()) { + break; + } + + SegmentGroups segment_groups; + auto status = strategy_->RegroupSegments(latest_ss, part2seg, segment_groups); + if (!status.ok()) { + LOG_ENGINE_ERROR_ << "Failed to regroup segments for: " << collection_name + << ", continue to merge all files into one"; + return status; + } + + for (auto& segments : segment_groups) { + SSMergeTask task(options_, latest_ss, segments); + task.Execute(); + } + } + + return Status::OK(); +} + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/merge/SSMergeManagerImpl.h b/core/src/db/merge/SSMergeManagerImpl.h new file mode 100644 index 0000000000..bd900b044f --- /dev/null +++ b/core/src/db/merge/SSMergeManagerImpl.h @@ -0,0 +1,53 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "db/merge/MergeManager.h" +#include "db/merge/SSMergeStrategy.h" +#include "utils/Status.h" + +namespace milvus { +namespace engine { + +class SSMergeManagerImpl : public MergeManager { + public: + SSMergeManagerImpl(const DBOptions& options, MergeStrategyType type); + + MergeStrategyType + Strategy() const override { + return strategy_type_; + } + + Status + UseStrategy(MergeStrategyType type) override; + + Status + MergeFiles(const std::string& collection_name) override; + + private: + DBOptions options_; + + MergeStrategyType strategy_type_ = MergeStrategyType::SIMPLE; + SSMergeStrategyPtr strategy_; +}; // MergeManagerImpl + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/merge/SSMergeSimpleStrategy.cpp b/core/src/db/merge/SSMergeSimpleStrategy.cpp new file mode 100644 index 0000000000..ff8f97f711 --- /dev/null +++ b/core/src/db/merge/SSMergeSimpleStrategy.cpp @@ -0,0 +1,58 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include "db/merge/SSMergeSimpleStrategy.h" +#include "db/snapshot/Snapshots.h" +#include "utils/Log.h" + +namespace milvus { +namespace engine { + +const char* ROW_COUNT_PER_SEGMENT = "row_count_per_segment"; + +Status +SSMergeSimpleStrategy::RegroupSegments(const snapshot::ScopedSnapshotT& ss, const Partition2SegmentsMap& part2segment, + SegmentGroups& groups) { + auto collection = ss->GetCollection(); + + int64_t row_count_per_segment = DEFAULT_ROW_COUNT_PER_SEGMENT; + const json params = collection->GetParams(); + if (params.find(ROW_COUNT_PER_SEGMENT) != params.end()) { + row_count_per_segment = params[ROW_COUNT_PER_SEGMENT]; + } + + for (auto& kv : part2segment) { + snapshot::IDS_TYPE ids; + int64_t row_count_sum = 0; + for (auto& id : kv.second) { + auto segment_commit = ss->GetSegmentCommitBySegmentId(id); + if (segment_commit == nullptr) { + continue; // maybe stale + } + + ids.push_back(id); + row_count_sum += segment_commit->GetRowCount(); + if (row_count_sum >= row_count_per_segment) { + if (ids.size() >= 2) { + groups.push_back(ids); + } + ids.clear(); + row_count_sum = 0; + continue; + } + } + } + + return Status::OK(); +} + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/merge/SSMergeSimpleStrategy.h b/core/src/db/merge/SSMergeSimpleStrategy.h new file mode 100644 index 0000000000..217e46e44f --- /dev/null +++ b/core/src/db/merge/SSMergeSimpleStrategy.h @@ -0,0 +1,31 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include +#include + +#include "db/merge/SSMergeStrategy.h" +#include "utils/Status.h" + +namespace milvus { +namespace engine { + +class SSMergeSimpleStrategy : public SSMergeStrategy { + public: + Status + RegroupSegments(const snapshot::ScopedSnapshotT& ss, const Partition2SegmentsMap& part2segment, + SegmentGroups& groups) override; +}; // MergeSimpleStrategy + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/merge/SSMergeStrategy.h b/core/src/db/merge/SSMergeStrategy.h new file mode 100644 index 0000000000..badadf7b73 --- /dev/null +++ b/core/src/db/merge/SSMergeStrategy.h @@ -0,0 +1,43 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include "db/Types.h" +#include "db/snapshot/ResourceTypes.h" +#include "db/snapshot/Snapshot.h" +#include "utils/Status.h" + +namespace milvus { +namespace engine { + +const int64_t DEFAULT_ROW_COUNT_PER_SEGMENT = 500000; + +using Partition2SegmentsMap = std::map; +using SegmentGroups = std::vector; + +class SSMergeStrategy { + public: + virtual Status + RegroupSegments(const snapshot::ScopedSnapshotT& ss, const Partition2SegmentsMap& part2segment, + SegmentGroups& groups) = 0; +}; // MergeStrategy + +using SSMergeStrategyPtr = std::shared_ptr; + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/merge/SSMergeTask.cpp b/core/src/db/merge/SSMergeTask.cpp new file mode 100644 index 0000000000..6397fb7897 --- /dev/null +++ b/core/src/db/merge/SSMergeTask.cpp @@ -0,0 +1,76 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include "db/merge/SSMergeTask.h" +#include "db/Utils.h" +#include "db/snapshot/CompoundOperations.h" +#include "db/snapshot/Operations.h" +#include "db/snapshot/Snapshots.h" +#include "metrics/Metrics.h" +#include "segment/SegmentReader.h" +#include "segment/SegmentWriter.h" +#include "utils/Log.h" + +#include +#include + +namespace milvus { +namespace engine { + +SSMergeTask::SSMergeTask(const DBOptions& options, const snapshot::ScopedSnapshotT& ss, + const snapshot::IDS_TYPE& segments) + : options_(options), snapshot_(ss), segments_(segments) { +} + +Status +SSMergeTask::Execute() { + if (segments_.size() <= 1) { + return Status::OK(); + } + + snapshot::OperationContext context; + for (auto& id : segments_) { + auto seg = snapshot_->GetResource(id); + if (!seg) { + return Status(DB_ERROR, "snapshot segment is null"); + } + + context.stale_segments.push_back(seg); + if (!context.prev_partition) { + snapshot::PartitionPtr partition = snapshot_->GetResource(seg->GetPartitionId()); + context.prev_partition = partition; + } + } + + auto op = std::make_shared(context, snapshot_); + snapshot::SegmentPtr new_seg; + auto status = op->CommitNewSegment(new_seg); + if (!status.ok()) { + return status; + } + + // TODO: merge each field, each field create a new SegmentFile + snapshot::SegmentFileContext sf_context; + sf_context.field_name = "vector"; + sf_context.field_element_name = "ivfsq8"; + sf_context.segment_id = 1; + sf_context.partition_id = 1; + sf_context.segment_id = new_seg->GetID(); + snapshot::SegmentFilePtr seg_file; + status = op->CommitNewSegmentFile(sf_context, seg_file); + + status = op->Push(); + + return status; +} + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/merge/SSMergeTask.h b/core/src/db/merge/SSMergeTask.h new file mode 100644 index 0000000000..bfb22214cc --- /dev/null +++ b/core/src/db/merge/SSMergeTask.h @@ -0,0 +1,39 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include + +#include "db/merge/MergeManager.h" +#include "db/meta/MetaTypes.h" +#include "db/snapshot/ResourceTypes.h" +#include "db/snapshot/Snapshot.h" +#include "utils/Status.h" + +namespace milvus { +namespace engine { + +class SSMergeTask { + public: + SSMergeTask(const DBOptions& options, const snapshot::ScopedSnapshotT& ss, const snapshot::IDS_TYPE& segments); + + Status + Execute(); + + private: + DBOptions options_; + snapshot::ScopedSnapshotT snapshot_; + snapshot::IDS_TYPE segments_; +}; // SSMergeTask + +} // namespace engine +} // namespace milvus