mirror of https://github.com/milvus-io/milvus.git
parent
e962a8ba31
commit
247f117096
|
@ -860,14 +860,13 @@ std::vector<SegOffset>
|
|||
SegmentSealedImpl::search_ids(const BitsetType& bitset,
|
||||
Timestamp timestamp) const {
|
||||
std::vector<SegOffset> dst_offset;
|
||||
for (int i = bitset.find_first(); i < bitset.size();
|
||||
i = bitset.find_next(i)) {
|
||||
if (i == BitsetType::npos) {
|
||||
for (int offset = bitset.find_first(); offset < bitset.size();
|
||||
offset = bitset.find_next(offset)) {
|
||||
if (offset == BitsetType::npos) {
|
||||
return dst_offset;
|
||||
}
|
||||
auto offset = SegOffset(i);
|
||||
if (insert_record_.timestamps_[offset.get()] <= timestamp) {
|
||||
dst_offset.push_back(offset);
|
||||
if (insert_record_.timestamps_[offset] <= timestamp) {
|
||||
dst_offset.emplace_back(offset);
|
||||
}
|
||||
}
|
||||
return dst_offset;
|
||||
|
@ -877,11 +876,10 @@ std::vector<SegOffset>
|
|||
SegmentSealedImpl::search_ids(const BitsetView& bitset,
|
||||
Timestamp timestamp) const {
|
||||
std::vector<SegOffset> dst_offset;
|
||||
for (int i = 0; i < bitset.size(); i++) {
|
||||
if (!bitset.test(i)) {
|
||||
auto offset = SegOffset(i);
|
||||
if (insert_record_.timestamps_[offset.get()] <= timestamp) {
|
||||
dst_offset.push_back(offset);
|
||||
for (int offset = 0; offset < bitset.size(); offset++) {
|
||||
if (!bitset.test(offset)) {
|
||||
if (insert_record_.timestamps_[offset] <= timestamp) {
|
||||
dst_offset.emplace_back(offset);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -896,7 +894,7 @@ SegmentSealedImpl::search_ids(const BitsetView& bitset,
|
|||
for (auto& offset : offsets) {
|
||||
if (!bitset.test(offset)) {
|
||||
if (insert_record_.timestamps_[offset] <= timestamp) {
|
||||
dst_offset.push_back(SegOffset(offset));
|
||||
dst_offset.emplace_back(offset);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ namespace milvus::storage {
|
|||
class DataCodec {
|
||||
public:
|
||||
explicit DataCodec(FieldDataPtr data, CodecType type)
|
||||
: field_data_(data), codec_type_(type) {
|
||||
: field_data_(std::move(data)), codec_type_(type) {
|
||||
}
|
||||
|
||||
virtual ~DataCodec() = default;
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include <algorithm>
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <mutex>
|
||||
#include <utility>
|
||||
|
||||
#include "common/Common.h"
|
||||
#include "common/Slice.h"
|
||||
|
@ -63,9 +64,9 @@ using WriteLock = std::lock_guard<std::shared_mutex>;
|
|||
namespace milvus::storage {
|
||||
|
||||
DiskFileManagerImpl::DiskFileManagerImpl(const FieldDataMeta& field_meta,
|
||||
const IndexMeta& index_meta,
|
||||
IndexMeta index_meta,
|
||||
const StorageConfig& storage_config)
|
||||
: field_meta_(field_meta), index_meta_(index_meta) {
|
||||
: field_meta_(field_meta), index_meta_(std::move(index_meta)) {
|
||||
remote_root_path_ = storage_config.remote_root_path;
|
||||
rcm_ = std::make_unique<MinioChunkManager>(storage_config);
|
||||
}
|
||||
|
@ -86,7 +87,7 @@ EncodeAndUploadIndexSlice(RemoteChunkManager* remote_chunk_manager,
|
|||
const std::string& file,
|
||||
int64_t offset,
|
||||
int64_t batch_size,
|
||||
IndexMeta index_meta,
|
||||
const IndexMeta& index_meta,
|
||||
FieldDataMeta field_meta,
|
||||
std::string object_key) {
|
||||
auto& local_chunk_manager = LocalChunkManager::GetInstance();
|
||||
|
@ -104,7 +105,7 @@ EncodeAndUploadIndexSlice(RemoteChunkManager* remote_chunk_manager,
|
|||
auto serialized_index_size = serialized_index_data.size();
|
||||
remote_chunk_manager->Write(
|
||||
object_key, serialized_index_data.data(), serialized_index_size);
|
||||
return std::pair<std::string, size_t>(object_key, serialized_index_size);
|
||||
return std::make_pair(std::move(object_key), serialized_index_size);
|
||||
}
|
||||
|
||||
bool
|
||||
|
@ -191,7 +192,8 @@ DiskFileManagerImpl::AddBatchIndexFiles(
|
|||
}
|
||||
|
||||
void
|
||||
DiskFileManagerImpl::CacheIndexToDisk(std::vector<std::string> remote_files) {
|
||||
DiskFileManagerImpl::CacheIndexToDisk(
|
||||
const std::vector<std::string>& remote_files) {
|
||||
auto& local_chunk_manager = LocalChunkManager::GetInstance();
|
||||
|
||||
std::map<std::string, std::vector<int>> index_slices;
|
||||
|
@ -214,20 +216,19 @@ DiskFileManagerImpl::CacheIndexToDisk(std::vector<std::string> remote_files) {
|
|||
auto prefix = slices.first;
|
||||
auto local_index_file_name =
|
||||
GetLocalIndexObjectPrefix() +
|
||||
prefix.substr(prefix.find_last_of("/") + 1);
|
||||
prefix.substr(prefix.find_last_of('/') + 1);
|
||||
local_chunk_manager.CreateFile(local_index_file_name);
|
||||
int64_t offset = 0;
|
||||
std::vector<std::string> batch_remote_files;
|
||||
uint64_t max_parallel_degree = INT_MAX;
|
||||
for (auto iter = slices.second.begin(); iter != slices.second.end();
|
||||
iter++) {
|
||||
for (int& iter : slices.second) {
|
||||
if (batch_remote_files.size() == max_parallel_degree) {
|
||||
auto next_offset = CacheBatchIndexFilesToDisk(
|
||||
batch_remote_files, local_index_file_name, offset);
|
||||
offset = next_offset;
|
||||
batch_remote_files.clear();
|
||||
}
|
||||
auto origin_file = prefix + "_" + std::to_string(*iter);
|
||||
auto origin_file = prefix + "_" + std::to_string(iter);
|
||||
if (batch_remote_files.size() == 0) {
|
||||
// Use first file size as average size to estimate
|
||||
max_parallel_degree = EstimateParallelDegree(origin_file);
|
||||
|
@ -246,7 +247,7 @@ DiskFileManagerImpl::CacheIndexToDisk(std::vector<std::string> remote_files) {
|
|||
|
||||
std::unique_ptr<DataCodec>
|
||||
DownloadAndDecodeRemoteIndexfile(RemoteChunkManager* remote_chunk_manager,
|
||||
std::string file) {
|
||||
const std::string& file) {
|
||||
auto fileSize = remote_chunk_manager->Size(file);
|
||||
auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[fileSize]);
|
||||
remote_chunk_manager->Read(file, buf.get(), fileSize);
|
||||
|
|
|
@ -34,7 +34,7 @@ namespace milvus::storage {
|
|||
class DiskFileManagerImpl : public FileManagerImpl {
|
||||
public:
|
||||
explicit DiskFileManagerImpl(const FieldDataMeta& field_meta,
|
||||
const IndexMeta& index_meta,
|
||||
IndexMeta index_meta,
|
||||
const StorageConfig& storage_config);
|
||||
|
||||
virtual ~DiskFileManagerImpl();
|
||||
|
@ -77,13 +77,14 @@ class DiskFileManagerImpl : public FileManagerImpl {
|
|||
}
|
||||
|
||||
std::string
|
||||
GenerateRemoteIndexFile(std::string file_name, int64_t slice_num) const {
|
||||
GenerateRemoteIndexFile(const std::string& file_name,
|
||||
int64_t slice_num) const {
|
||||
return GetRemoteIndexObjectPrefix() + "/" + file_name + "_" +
|
||||
std::to_string(slice_num);
|
||||
}
|
||||
|
||||
void
|
||||
CacheIndexToDisk(std::vector<std::string> remote_files);
|
||||
CacheIndexToDisk(const std::vector<std::string>& remote_files);
|
||||
|
||||
uint64_t
|
||||
CacheBatchIndexFilesToDisk(const std::vector<std::string>& remote_files,
|
||||
|
|
|
@ -34,8 +34,7 @@ struct EventHeader {
|
|||
int32_t event_length_;
|
||||
int32_t next_position_;
|
||||
|
||||
EventHeader() {
|
||||
}
|
||||
EventHeader() = default;
|
||||
explicit EventHeader(BinlogReaderPtr reader);
|
||||
|
||||
std::vector<uint8_t>
|
||||
|
@ -51,8 +50,7 @@ struct DescriptorEventDataFixPart {
|
|||
Timestamp end_timestamp;
|
||||
milvus::proto::schema::DataType data_type;
|
||||
|
||||
DescriptorEventDataFixPart() {
|
||||
}
|
||||
DescriptorEventDataFixPart() = default;
|
||||
explicit DescriptorEventDataFixPart(BinlogReaderPtr reader);
|
||||
|
||||
std::vector<uint8_t>
|
||||
|
@ -66,8 +64,7 @@ struct DescriptorEventData {
|
|||
std::unordered_map<std::string, std::string> extras;
|
||||
std::vector<uint8_t> post_header_lengths;
|
||||
|
||||
DescriptorEventData() {
|
||||
}
|
||||
DescriptorEventData() = default;
|
||||
explicit DescriptorEventData(BinlogReaderPtr reader);
|
||||
|
||||
std::vector<uint8_t>
|
||||
|
@ -79,8 +76,7 @@ struct BaseEventData {
|
|||
Timestamp end_timestamp;
|
||||
FieldDataPtr field_data;
|
||||
|
||||
BaseEventData() {
|
||||
}
|
||||
BaseEventData() = default;
|
||||
explicit BaseEventData(BinlogReaderPtr reader,
|
||||
int event_length,
|
||||
DataType data_type);
|
||||
|
@ -93,8 +89,7 @@ struct DescriptorEvent {
|
|||
EventHeader event_header;
|
||||
DescriptorEventData event_data;
|
||||
|
||||
DescriptorEvent() {
|
||||
}
|
||||
DescriptorEvent() = default;
|
||||
explicit DescriptorEvent(BinlogReaderPtr reader);
|
||||
|
||||
std::vector<uint8_t>
|
||||
|
@ -105,8 +100,7 @@ struct BaseEvent {
|
|||
EventHeader event_header;
|
||||
BaseEventData event_data;
|
||||
|
||||
BaseEvent() {
|
||||
}
|
||||
BaseEvent() = default;
|
||||
explicit BaseEvent(BinlogReaderPtr reader, DataType data_type);
|
||||
|
||||
std::vector<uint8_t>
|
||||
|
@ -149,8 +143,7 @@ struct LocalIndexEvent {
|
|||
uint32_t degree;
|
||||
FieldDataPtr field_data;
|
||||
|
||||
LocalIndexEvent() {
|
||||
}
|
||||
LocalIndexEvent() = default;
|
||||
explicit LocalIndexEvent(BinlogReaderPtr reader);
|
||||
|
||||
std::vector<uint8_t>
|
||||
|
|
|
@ -23,7 +23,7 @@ namespace milvus::storage {
|
|||
PayloadReader::PayloadReader(std::shared_ptr<PayloadInputStream> input,
|
||||
DataType data_type)
|
||||
: column_type_(data_type) {
|
||||
init(input);
|
||||
init(std::move(input));
|
||||
}
|
||||
|
||||
PayloadReader::PayloadReader(const uint8_t* data,
|
||||
|
|
Loading…
Reference in New Issue