mirror of https://github.com/milvus-io/milvus.git
redefine SegmentWriter (#2853)
* redefine SegmentWriter Signed-off-by: yhmo <yihua.mo@zilliz.com> * redefine SegmentWriter Signed-off-by: yhmo <yihua.mo@zilliz.com> * refine code Signed-off-by: yhmo <yihua.mo@zilliz.com>pull/2854/head
parent
a8033a5954
commit
7bd60398a2
|
@ -0,0 +1,67 @@
|
|||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "codecs/snapshot/SSBlockFormat.h"
|
||||
|
||||
#include <fcntl.h>
|
||||
#include <unistd.h>
|
||||
#include <algorithm>
|
||||
#include <memory>
|
||||
|
||||
#include <boost/filesystem.hpp>
|
||||
|
||||
#include "utils/Exception.h"
|
||||
#include "utils/Log.h"
|
||||
#include "utils/TimeRecorder.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace codec {
|
||||
|
||||
void
|
||||
SSBlockFormat::read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, std::vector<uint8_t>& raw) {
|
||||
if (!fs_ptr->reader_ptr_->open(file_path.c_str())) {
|
||||
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
|
||||
LOG_ENGINE_ERROR_ << err_msg;
|
||||
throw Exception(SERVER_CANNOT_OPEN_FILE, err_msg);
|
||||
}
|
||||
|
||||
size_t num_bytes;
|
||||
fs_ptr->reader_ptr_->read(&num_bytes, sizeof(size_t));
|
||||
|
||||
raw.resize(num_bytes);
|
||||
fs_ptr->reader_ptr_->read(raw.data(), num_bytes);
|
||||
|
||||
fs_ptr->reader_ptr_->close();
|
||||
}
|
||||
|
||||
void
|
||||
SSBlockFormat::write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
|
||||
const std::vector<uint8_t>& raw) {
|
||||
if (!fs_ptr->writer_ptr_->open(file_path.c_str())) {
|
||||
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
|
||||
LOG_ENGINE_ERROR_ << err_msg;
|
||||
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
|
||||
}
|
||||
|
||||
size_t num_bytes = raw.size();
|
||||
fs_ptr->writer_ptr_->write(&num_bytes, sizeof(size_t));
|
||||
fs_ptr->writer_ptr_->write((void*)raw.data(), num_bytes);
|
||||
fs_ptr->writer_ptr_->close();
|
||||
}
|
||||
|
||||
} // namespace codec
|
||||
} // namespace milvus
|
|
@ -0,0 +1,53 @@
|
|||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "knowhere/common/BinarySet.h"
|
||||
#include "storage/FSHandler.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace codec {
|
||||
|
||||
class SSBlockFormat {
|
||||
public:
|
||||
SSBlockFormat() = default;
|
||||
|
||||
void
|
||||
read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, std::vector<uint8_t>& raw);
|
||||
|
||||
void
|
||||
write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const std::vector<uint8_t>& raw);
|
||||
|
||||
// No copy and move
|
||||
SSBlockFormat(const SSBlockFormat&) = delete;
|
||||
SSBlockFormat(SSBlockFormat&&) = delete;
|
||||
|
||||
SSBlockFormat&
|
||||
operator=(const SSBlockFormat&) = delete;
|
||||
SSBlockFormat&
|
||||
operator=(SSBlockFormat&&) = delete;
|
||||
};
|
||||
|
||||
using SSBlockFormatPtr = std::shared_ptr<SSBlockFormat>;
|
||||
|
||||
} // namespace codec
|
||||
} // namespace milvus
|
|
@ -36,6 +36,7 @@ SSCodec::instance() {
|
|||
}
|
||||
|
||||
SSCodec::SSCodec() {
|
||||
block_format_ptr_ = std::make_shared<SSBlockFormat>();
|
||||
vectors_format_ptr_ = std::make_shared<SSVectorsFormat>();
|
||||
attrs_format_ptr_ = std::make_shared<SSAttrsFormat>();
|
||||
vector_index_format_ptr_ = std::make_shared<SSVectorIndexFormat>();
|
||||
|
@ -45,6 +46,11 @@ SSCodec::SSCodec() {
|
|||
vector_compress_format_ptr_ = std::make_shared<SSVectorCompressFormat>();
|
||||
}
|
||||
|
||||
SSBlockFormatPtr
|
||||
SSCodec::GetBlockFormat() {
|
||||
return block_format_ptr_;
|
||||
}
|
||||
|
||||
SSVectorsFormatPtr
|
||||
SSCodec::GetVectorsFormat() {
|
||||
return vectors_format_ptr_;
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
#include "codecs/snapshot/SSAttrsFormat.h"
|
||||
#include "codecs/snapshot/SSAttrsIndexFormat.h"
|
||||
#include "codecs/snapshot/SSBlockFormat.h"
|
||||
#include "codecs/snapshot/SSDeletedDocsFormat.h"
|
||||
#include "codecs/snapshot/SSIdBloomFilterFormat.h"
|
||||
#include "codecs/snapshot/SSVectorCompressFormat.h"
|
||||
|
@ -33,6 +34,9 @@ class SSCodec {
|
|||
static SSCodec&
|
||||
instance();
|
||||
|
||||
SSBlockFormatPtr
|
||||
GetBlockFormat();
|
||||
|
||||
SSVectorsFormatPtr
|
||||
GetVectorsFormat();
|
||||
|
||||
|
@ -58,6 +62,7 @@ class SSCodec {
|
|||
SSCodec();
|
||||
|
||||
private:
|
||||
SSBlockFormatPtr block_format_ptr_;
|
||||
SSVectorsFormatPtr vectors_format_ptr_;
|
||||
SSAttrsFormatPtr attrs_format_ptr_;
|
||||
SSVectorIndexFormatPtr vector_index_format_ptr_;
|
||||
|
|
|
@ -1059,14 +1059,15 @@ SSDBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
|
|||
// construct chunk data
|
||||
DataChunkPtr chunk = std::make_shared<DataChunk>();
|
||||
chunk->count_ = record.length;
|
||||
chunk->fields_data_ = record.attr_data;
|
||||
chunk->fixed_fields_ = record.attr_data;
|
||||
std::vector<uint8_t> uid_data;
|
||||
uid_data.resize(record.length * sizeof(int64_t));
|
||||
memcpy(uid_data.data(), record.ids, record.length * sizeof(int64_t));
|
||||
chunk->fixed_fields_.insert(std::make_pair(engine::DEFAULT_UID_NAME, uid_data));
|
||||
std::vector<uint8_t> vector_data;
|
||||
vector_data.resize(record.data_size);
|
||||
memcpy(vector_data.data(), record.data, record.data_size);
|
||||
chunk->fields_data_.insert(std::make_pair(VECTOR_FIELD, vector_data));
|
||||
chunk->fixed_fields_.insert(std::make_pair(VECTOR_FIELD, vector_data));
|
||||
|
||||
status = mem_mgr_->InsertEntities(collection_id, partition_id, chunk, record.lsn);
|
||||
force_flush_if_mem_full();
|
||||
|
|
|
@ -18,22 +18,14 @@
|
|||
#include <vector>
|
||||
|
||||
#include "db/Types.h"
|
||||
#include "segment/Segment.h"
|
||||
#include "utils/Status.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
extern const char* ENTITY_ID_FIELD;
|
||||
extern const char* VECTOR_DIMENSION_PARAM;
|
||||
extern const char* VECTOR_FIELD;
|
||||
|
||||
struct DataChunk {
|
||||
uint64_t count_ = 0;
|
||||
std::unordered_map<std::string, std::vector<uint8_t>> fields_data_;
|
||||
};
|
||||
|
||||
using DataChunkPtr = std::shared_ptr<DataChunk>;
|
||||
|
||||
class SSMemManager {
|
||||
public:
|
||||
virtual Status
|
||||
|
|
|
@ -22,9 +22,7 @@
|
|||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
const char* ENTITY_ID_FIELD = "id"; // hard code
|
||||
const char* VECTOR_DIMENSION_PARAM = "dimension"; // hard code
|
||||
const char* VECTOR_FIELD = "vector"; // hard code
|
||||
const char* VECTOR_FIELD = "vector"; // hard code
|
||||
|
||||
SSMemCollectionPtr
|
||||
SSMemManagerImpl::GetMemByTable(int64_t collection_id, int64_t partition_id) {
|
||||
|
@ -81,8 +79,8 @@ SSMemManagerImpl::ValidateChunk(int64_t collection_id, int64_t partition_id, con
|
|||
|
||||
std::vector<std::string> field_names = ss->GetFieldNames();
|
||||
for (auto& name : field_names) {
|
||||
auto iter = chunk->fields_data_.find(name);
|
||||
if (iter == chunk->fields_data_.end()) {
|
||||
auto iter = chunk->fixed_fields_.find(name);
|
||||
if (iter == chunk->fixed_fields_.end()) {
|
||||
std::string err_msg = "Missed chunk field: " + name;
|
||||
LOG_ENGINE_ERROR_ << err_msg;
|
||||
return Status(DB_ERROR, err_msg);
|
||||
|
|
|
@ -33,12 +33,7 @@ namespace engine {
|
|||
SSMemSegment::SSMemSegment(int64_t collection_id, int64_t partition_id, const DBOptions& options)
|
||||
: collection_id_(collection_id), partition_id_(partition_id), options_(options) {
|
||||
current_mem_ = 0;
|
||||
auto status = CreateSegment();
|
||||
if (status.ok()) {
|
||||
std::string directory;
|
||||
utils::CreatePath(segment_.get(), options_, directory);
|
||||
segment_writer_ptr_ = std::make_shared<segment::SegmentWriter>(directory);
|
||||
}
|
||||
CreateSegment();
|
||||
|
||||
SetIdentity("SSMemSegment");
|
||||
AddCacheInsertDataListener();
|
||||
|
@ -54,6 +49,7 @@ SSMemSegment::CreateSegment() {
|
|||
return status;
|
||||
}
|
||||
|
||||
// create segment
|
||||
snapshot::OperationContext context;
|
||||
context.prev_partition = ss->GetResource<snapshot::Partition>(partition_id_);
|
||||
operation_ = std::make_shared<snapshot::NewSegmentOperation>(context, ss);
|
||||
|
@ -64,7 +60,58 @@ SSMemSegment::CreateSegment() {
|
|||
return status;
|
||||
}
|
||||
|
||||
return status;
|
||||
// create segment raw files (placeholder)
|
||||
auto names = ss->GetFieldNames();
|
||||
for (auto& name : names) {
|
||||
snapshot::SegmentFileContext sf_context;
|
||||
sf_context.collection_id = collection_id_;
|
||||
sf_context.partition_id = partition_id_;
|
||||
sf_context.segment_id = segment_->GetID();
|
||||
sf_context.field_name = name;
|
||||
sf_context.field_element_name = engine::DEFAULT_RAW_DATA_NAME;
|
||||
|
||||
snapshot::SegmentFilePtr seg_file;
|
||||
status = operation_->CommitNewSegmentFile(sf_context, seg_file);
|
||||
if (!status.ok()) {
|
||||
std::string err_msg = "SSMemSegment::CreateSegment failed: " + status.ToString();
|
||||
LOG_ENGINE_ERROR_ << err_msg;
|
||||
return status;
|
||||
}
|
||||
}
|
||||
|
||||
// create deleted_doc and bloom_filter files (placeholder)
|
||||
{
|
||||
snapshot::SegmentFileContext sf_context;
|
||||
sf_context.collection_id = collection_id_;
|
||||
sf_context.partition_id = partition_id_;
|
||||
sf_context.segment_id = segment_->GetID();
|
||||
sf_context.field_name = engine::DEFAULT_UID_NAME;
|
||||
sf_context.field_element_name = engine::DEFAULT_DELETED_DOCS_NAME;
|
||||
|
||||
snapshot::SegmentFilePtr delete_doc_file, bloom_filter_file;
|
||||
status = operation_->CommitNewSegmentFile(sf_context, delete_doc_file);
|
||||
if (!status.ok()) {
|
||||
std::string err_msg = "SSMemSegment::CreateSegment failed: " + status.ToString();
|
||||
LOG_ENGINE_ERROR_ << err_msg;
|
||||
return status;
|
||||
}
|
||||
|
||||
sf_context.field_element_name = engine::DEFAULT_BLOOM_FILTER_NAME;
|
||||
status = operation_->CommitNewSegmentFile(sf_context, bloom_filter_file);
|
||||
if (!status.ok()) {
|
||||
std::string err_msg = "SSMemSegment::CreateSegment failed: " + status.ToString();
|
||||
LOG_ENGINE_ERROR_ << err_msg;
|
||||
return status;
|
||||
}
|
||||
}
|
||||
|
||||
auto ctx = operation_->GetContext();
|
||||
auto visitor = SegmentVisitor::Build(ss, ctx.new_segment, ctx.new_segment_files);
|
||||
|
||||
// create segment writer
|
||||
segment_writer_ptr_ = std::make_shared<segment::SSSegmentWriter>(visitor);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
|
@ -155,14 +202,22 @@ SSMemSegment::Add(const SSVectorSourcePtr& source) {
|
|||
|
||||
Status
|
||||
SSMemSegment::Delete(segment::doc_id_t doc_id) {
|
||||
segment::SegmentPtr segment_ptr;
|
||||
engine::SegmentPtr segment_ptr;
|
||||
segment_writer_ptr_->GetSegment(segment_ptr);
|
||||
|
||||
// Check wither the doc_id is present, if yes, delete it's corresponding buffer
|
||||
auto uids = segment_ptr->vectors_ptr_->GetUids();
|
||||
auto found = std::find(uids.begin(), uids.end(), doc_id);
|
||||
if (found != uids.end()) {
|
||||
auto offset = std::distance(uids.begin(), found);
|
||||
segment_ptr->vectors_ptr_->Erase(offset);
|
||||
engine::FIXED_FIELD_DATA raw_data;
|
||||
auto status = segment_ptr->GetFixedFieldData(engine::DEFAULT_UID_NAME, raw_data);
|
||||
if (!status.ok()) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
int64_t* uids = reinterpret_cast<int64_t*>(raw_data.data());
|
||||
int64_t row_count = segment_ptr->GetRowCount();
|
||||
for (int64_t i = 0; i < row_count; i++) {
|
||||
if (doc_id == uids[i]) {
|
||||
segment_ptr->DeleteEntity(i);
|
||||
}
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
|
@ -170,37 +225,31 @@ SSMemSegment::Delete(segment::doc_id_t doc_id) {
|
|||
|
||||
Status
|
||||
SSMemSegment::Delete(const std::vector<segment::doc_id_t>& doc_ids) {
|
||||
segment::SegmentPtr segment_ptr;
|
||||
engine::SegmentPtr segment_ptr;
|
||||
segment_writer_ptr_->GetSegment(segment_ptr);
|
||||
|
||||
// Check wither the doc_id is present, if yes, delete it's corresponding buffer
|
||||
|
||||
std::vector<segment::doc_id_t> temp;
|
||||
temp.resize(doc_ids.size());
|
||||
memcpy(temp.data(), doc_ids.data(), doc_ids.size() * sizeof(segment::doc_id_t));
|
||||
|
||||
std::sort(temp.begin(), temp.end());
|
||||
|
||||
auto uids = segment_ptr->vectors_ptr_->GetUids();
|
||||
engine::FIXED_FIELD_DATA raw_data;
|
||||
auto status = segment_ptr->GetFixedFieldData(engine::DEFAULT_UID_NAME, raw_data);
|
||||
if (!status.ok()) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
int64_t* uids = reinterpret_cast<int64_t*>(raw_data.data());
|
||||
int64_t row_count = segment_ptr->GetRowCount();
|
||||
size_t deleted = 0;
|
||||
size_t loop = uids.size();
|
||||
for (size_t i = 0; i < loop; ++i) {
|
||||
for (int64_t i = 0; i < row_count; ++i) {
|
||||
if (std::binary_search(temp.begin(), temp.end(), uids[i])) {
|
||||
segment_ptr->vectors_ptr_->Erase(i - deleted);
|
||||
segment_ptr->DeleteEntity(i - deleted);
|
||||
++deleted;
|
||||
}
|
||||
}
|
||||
/*
|
||||
for (auto& doc_id : doc_ids) {
|
||||
auto found = std::find(uids.begin(), uids.end(), doc_id);
|
||||
if (found != uids.end()) {
|
||||
auto offset = std::distance(uids.begin(), found);
|
||||
segment_ptr->vectors_ptr_->Erase(offset);
|
||||
uids = segment_ptr->vectors_ptr_->GetUids();
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
@ -240,7 +289,6 @@ SSMemSegment::Serialize(uint64_t wal_lsn) {
|
|||
snapshot::SegmentFilePtr seg_file;
|
||||
auto status = operation_->CommitNewSegmentFile(sf_context, seg_file);
|
||||
|
||||
segment_writer_ptr_->SetSegmentName(std::to_string(segment_->GetID()));
|
||||
status = segment_writer_ptr_->Serialize();
|
||||
if (!status.ok()) {
|
||||
LOG_ENGINE_ERROR_ << "Failed to serialize segment: " << segment_->GetID();
|
||||
|
@ -248,7 +296,7 @@ SSMemSegment::Serialize(uint64_t wal_lsn) {
|
|||
}
|
||||
|
||||
seg_file->SetSize(segment_writer_ptr_->Size());
|
||||
seg_file->SetRowCount(segment_writer_ptr_->VectorCount());
|
||||
seg_file->SetRowCount(segment_writer_ptr_->RowCount());
|
||||
|
||||
status = operation_->Push();
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
#include "db/insert/SSVectorSource.h"
|
||||
#include "db/snapshot/CompoundOperations.h"
|
||||
#include "db/snapshot/Resources.h"
|
||||
#include "segment/SegmentWriter.h"
|
||||
#include "segment/SSSegmentWriter.h"
|
||||
#include "utils/Status.h"
|
||||
|
||||
namespace milvus {
|
||||
|
@ -78,7 +78,7 @@ class SSMemSegment : public server::CacheConfigHandler {
|
|||
size_t current_mem_;
|
||||
|
||||
// ExecutionEnginePtr execution_engine_;
|
||||
segment::SegmentWriterPtr segment_writer_ptr_;
|
||||
segment::SSSegmentWriterPtr segment_writer_ptr_;
|
||||
}; // SSMemTableFile
|
||||
|
||||
using SSMemSegmentPtr = std::shared_ptr<SSMemSegment>;
|
||||
|
|
|
@ -27,7 +27,7 @@ SSVectorSource::SSVectorSource(const DataChunkPtr& chunk) : chunk_(chunk) {
|
|||
}
|
||||
|
||||
Status
|
||||
SSVectorSource::Add(const milvus::segment::SegmentWriterPtr& segment_writer_ptr, const size_t& num_entities_to_add,
|
||||
SSVectorSource::Add(const segment::SSSegmentWriterPtr& segment_writer_ptr, const size_t& num_entities_to_add,
|
||||
size_t& num_entities_added) {
|
||||
// TODO: n = vectors_.vector_count_;???
|
||||
uint64_t n = chunk_->count_;
|
||||
|
|
|
@ -19,7 +19,8 @@
|
|||
#include "db/IDGenerator.h"
|
||||
#include "db/engine/ExecutionEngine.h"
|
||||
#include "db/insert/SSMemManager.h"
|
||||
#include "segment/SegmentWriter.h"
|
||||
#include "segment/SSSegmentWriter.h"
|
||||
#include "segment/Segment.h"
|
||||
#include "utils/Status.h"
|
||||
|
||||
namespace milvus {
|
||||
|
@ -32,7 +33,7 @@ class SSVectorSource {
|
|||
explicit SSVectorSource(const DataChunkPtr& chunk);
|
||||
|
||||
Status
|
||||
Add(const segment::SegmentWriterPtr& segment_writer_ptr, const size_t& num_attrs_to_add, size_t& num_attrs_added);
|
||||
Add(const segment::SSSegmentWriterPtr& segment_writer_ptr, const size_t& num_attrs_to_add, size_t& num_attrs_added);
|
||||
|
||||
bool
|
||||
AllAdded();
|
||||
|
|
|
@ -36,6 +36,11 @@ namespace milvus {
|
|||
namespace segment {
|
||||
|
||||
SSSegmentWriter::SSSegmentWriter(const engine::SegmentVisitorPtr& segment_visitor) : segment_visitor_(segment_visitor) {
|
||||
Initialize();
|
||||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::Initialize() {
|
||||
auto& segment_ptr = segment_visitor_->GetSegment();
|
||||
std::string directory = engine::snapshot::GetResPath<engine::snapshot::Segment>(segment_ptr);
|
||||
|
||||
|
@ -43,118 +48,32 @@ SSSegmentWriter::SSSegmentWriter(const engine::SegmentVisitorPtr& segment_visito
|
|||
storage::IOWriterPtr writer_ptr = std::make_shared<storage::DiskIOWriter>();
|
||||
storage::OperationPtr operation_ptr = std::make_shared<storage::DiskOperation>(directory);
|
||||
fs_ptr_ = std::make_shared<storage::FSHandler>(reader_ptr, writer_ptr, operation_ptr);
|
||||
segment_ptr_ = std::make_shared<Segment>();
|
||||
}
|
||||
segment_ptr_ = std::make_shared<engine::Segment>();
|
||||
|
||||
Status
|
||||
SSSegmentWriter::AddVectors(const std::string& name, const std::vector<uint8_t>& data,
|
||||
const std::vector<doc_id_t>& uids) {
|
||||
segment_ptr_->vectors_ptr_->AddData(data);
|
||||
segment_ptr_->vectors_ptr_->AddUids(uids);
|
||||
segment_ptr_->vectors_ptr_->SetName(name);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::AddVectors(const std::string& name, const uint8_t* data, uint64_t size,
|
||||
const std::vector<doc_id_t>& uids) {
|
||||
segment_ptr_->vectors_ptr_->AddData(data, size);
|
||||
segment_ptr_->vectors_ptr_->AddUids(uids);
|
||||
segment_ptr_->vectors_ptr_->SetName(name);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::AddAttrs(const std::string& name, const std::unordered_map<std::string, uint64_t>& attr_nbytes,
|
||||
const std::unordered_map<std::string, std::vector<uint8_t>>& attr_data,
|
||||
const std::vector<doc_id_t>& uids) {
|
||||
auto attr_data_it = attr_data.begin();
|
||||
auto attrs = segment_ptr_->attrs_ptr_->attrs;
|
||||
for (; attr_data_it != attr_data.end(); ++attr_data_it) {
|
||||
AttrPtr attr = std::make_shared<Attr>(attr_data_it->second, attr_nbytes.at(attr_data_it->first), uids,
|
||||
attr_data_it->first);
|
||||
segment_ptr_->attrs_ptr_->attrs.insert(std::make_pair(attr_data_it->first, attr));
|
||||
|
||||
// if (attrs.find(attr_data_it->first) != attrs.end()) {
|
||||
// segment_ptr_->attrs_ptr_->attrs.at(attr_data_it->first)
|
||||
// ->AddAttr(attr_data_it->second, attr_nbytes.at(attr_data_it->first));
|
||||
// segment_ptr_->attrs_ptr_->attrs.at(attr_data_it->first)->AddUids(uids);
|
||||
// } else {
|
||||
// AttrPtr attr = std::make_shared<Attr>(attr_data_it->second, attr_nbytes.at(attr_data_it->first),
|
||||
// uids,
|
||||
// attr_data_it->first);
|
||||
// segment_ptr_->attrs_ptr_->attrs.insert(std::make_pair(attr_data_it->first, attr));
|
||||
// }
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::SetAttrsIndex(const std::unordered_map<std::string, knowhere::IndexPtr>& attr_indexes,
|
||||
const std::unordered_map<std::string, int64_t>& attr_sizes,
|
||||
const std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type) {
|
||||
auto attrs_index = std::make_shared<AttrsIndex>();
|
||||
auto attr_it = attr_indexes.begin();
|
||||
for (; attr_it != attr_indexes.end(); attr_it++) {
|
||||
auto attr_index = std::make_shared<AttrIndex>();
|
||||
attr_index->SetFieldName(attr_it->first);
|
||||
attr_index->SetDataType(attr_type.at(attr_it->first));
|
||||
attr_index->SetAttrIndex(attr_it->second);
|
||||
attrs_index->attr_indexes.insert(std::make_pair(attr_it->first, attr_index));
|
||||
}
|
||||
segment_ptr_->attrs_index_ptr_ = attrs_index;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::SetVectorIndex(const milvus::knowhere::VecIndexPtr& index) {
|
||||
segment_ptr_->vector_index_ptr_->SetVectorIndex(index);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::Serialize() {
|
||||
auto& field_visitors_map = segment_visitor_->GetFieldVisitors();
|
||||
auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::DEFAULT_UID_NAME);
|
||||
|
||||
/* write UID's raw data */
|
||||
auto uid_raw_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_RAW);
|
||||
std::string uid_raw_path = engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(uid_raw_visitor->GetFile());
|
||||
STATUS_CHECK(WriteUids(uid_raw_path, segment_ptr_->vectors_ptr_->GetUids()));
|
||||
|
||||
/* write UID's deleted docs */
|
||||
auto uid_del_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS);
|
||||
std::string uid_del_path = engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(uid_del_visitor->GetFile());
|
||||
STATUS_CHECK(WriteDeletedDocs(uid_del_path, segment_ptr_->deleted_docs_ptr_));
|
||||
|
||||
/* write UID's bloom filter */
|
||||
auto uid_blf_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_BLOOM_FILTER);
|
||||
std::string uid_blf_path = engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(uid_blf_visitor->GetFile());
|
||||
STATUS_CHECK(WriteBloomFilter(uid_blf_path, segment_ptr_->id_bloom_filter_ptr_));
|
||||
|
||||
/* write other data */
|
||||
for (auto& f_kv : field_visitors_map) {
|
||||
auto& field_visitor = f_kv.second;
|
||||
auto& field = field_visitor->GetField();
|
||||
for (auto& file_kv : field_visitor->GetElementVistors()) {
|
||||
auto& field_element_visitor = file_kv.second;
|
||||
|
||||
auto& segment_file = field_element_visitor->GetFile();
|
||||
if (segment_file == nullptr) {
|
||||
continue;
|
||||
}
|
||||
auto file_path = engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(segment_file);
|
||||
auto& field_element = field_element_visitor->GetElement();
|
||||
|
||||
if ((field->GetFtype() == engine::FieldType::VECTOR_FLOAT ||
|
||||
field->GetFtype() == engine::FieldType::VECTOR_BINARY) &&
|
||||
field_element->GetFtype() == engine::FieldElementType::FET_RAW) {
|
||||
STATUS_CHECK(WriteVectors(file_path, segment_ptr_->vectors_ptr_->GetData()));
|
||||
const engine::SegmentVisitor::IdMapT& field_map = segment_visitor_->GetFieldVisitors();
|
||||
for (auto& iter : field_map) {
|
||||
const engine::snapshot::FieldPtr& field = iter.second->GetField();
|
||||
std::string name = field->GetName();
|
||||
engine::FIELD_TYPE ftype = static_cast<engine::FIELD_TYPE>(field->GetFtype());
|
||||
if (ftype == engine::FIELD_TYPE::VECTOR || ftype == engine::FIELD_TYPE::VECTOR ||
|
||||
ftype == engine::FIELD_TYPE::VECTOR) {
|
||||
json params = field->GetParams();
|
||||
if (params.find(engine::VECTOR_DIMENSION_PARAM) == params.end()) {
|
||||
std::string msg = "Vector field params must contain: dimension";
|
||||
LOG_SERVER_ERROR_ << msg;
|
||||
return Status(DB_ERROR, msg);
|
||||
}
|
||||
|
||||
/* SS TODO: write attr data ? */
|
||||
uint64_t field_width = 0;
|
||||
uint64_t dimension = params[engine::VECTOR_DIMENSION_PARAM];
|
||||
if (ftype == engine::FIELD_TYPE::VECTOR_BINARY) {
|
||||
field_width += (dimension / 8);
|
||||
} else {
|
||||
field_width += (dimension * sizeof(float));
|
||||
}
|
||||
segment_ptr_->AddField(name, ftype, field_width);
|
||||
} else {
|
||||
segment_ptr_->AddField(name, ftype);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -162,11 +81,51 @@ SSSegmentWriter::Serialize() {
|
|||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::WriteUids(const std::string& file_path, const std::vector<doc_id_t>& uids) {
|
||||
SSSegmentWriter::AddChunk(const engine::DataChunkPtr& chunk_ptr) {
|
||||
return segment_ptr_->AddChunk(chunk_ptr);
|
||||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::AddChunk(const engine::DataChunkPtr& chunk_ptr, uint64_t from, uint64_t to) {
|
||||
return segment_ptr_->AddChunk(chunk_ptr, from, to);
|
||||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::Serialize() {
|
||||
auto& field_visitors_map = segment_visitor_->GetFieldVisitors();
|
||||
auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::DEFAULT_UID_NAME);
|
||||
|
||||
/* write fields raw data */
|
||||
for (auto& iter : field_visitors_map) {
|
||||
const engine::snapshot::FieldPtr& field = iter.second->GetField();
|
||||
std::string name = field->GetName();
|
||||
engine::FIXED_FIELD_DATA raw_data;
|
||||
segment_ptr_->GetFixedFieldData(name, raw_data);
|
||||
|
||||
auto element_visitor = iter.second->GetElementVisitor(engine::FieldElementType::FET_RAW);
|
||||
std::string file_path = engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(element_visitor->GetFile());
|
||||
STATUS_CHECK(WriteField(file_path, raw_data));
|
||||
}
|
||||
|
||||
/* write UID's deleted docs */
|
||||
auto uid_del_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS);
|
||||
std::string uid_del_path = engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(uid_del_visitor->GetFile());
|
||||
STATUS_CHECK(WriteDeletedDocs(uid_del_path, segment_ptr_->GetDeletedDocs()));
|
||||
|
||||
/* write UID's bloom filter */
|
||||
auto uid_blf_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_BLOOM_FILTER);
|
||||
std::string uid_blf_path = engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(uid_blf_visitor->GetFile());
|
||||
STATUS_CHECK(WriteBloomFilter(uid_blf_path, segment_ptr_->GetBloomFilter()));
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::WriteField(const std::string& file_path, const engine::FIXED_FIELD_DATA& raw) {
|
||||
try {
|
||||
auto& ss_codec = codec::SSCodec::instance();
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
ss_codec.GetVectorsFormat()->write_uids(fs_ptr_, file_path, uids);
|
||||
ss_codec.GetBlockFormat()->write(fs_ptr_, file_path, raw);
|
||||
} catch (std::exception& e) {
|
||||
std::string err_msg = "Failed to write vectors: " + std::string(e.what());
|
||||
LOG_ENGINE_ERROR_ << err_msg;
|
||||
|
@ -177,97 +136,32 @@ SSSegmentWriter::WriteUids(const std::string& file_path, const std::vector<doc_i
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::WriteVectors(const std::string& file_path, const std::vector<uint8_t>& raw_vectors) {
|
||||
try {
|
||||
auto& ss_codec = codec::SSCodec::instance();
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
ss_codec.GetVectorsFormat()->write_vectors(fs_ptr_, file_path, raw_vectors);
|
||||
} catch (std::exception& e) {
|
||||
std::string err_msg = "Failed to write vectors: " + std::string(e.what());
|
||||
LOG_ENGINE_ERROR_ << err_msg;
|
||||
|
||||
engine::utils::SendExitSignal();
|
||||
return Status(SERVER_WRITE_ERROR, err_msg);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::WriteAttrs() {
|
||||
try {
|
||||
auto& ss_codec = codec::SSCodec::instance();
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
ss_codec.GetAttrsFormat()->write(fs_ptr_, segment_ptr_->attrs_ptr_);
|
||||
} catch (std::exception& e) {
|
||||
std::string err_msg = "Failed to write vectors: " + std::string(e.what());
|
||||
LOG_ENGINE_ERROR_ << err_msg;
|
||||
|
||||
engine::utils::SendExitSignal();
|
||||
return Status(SERVER_WRITE_ERROR, err_msg);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::WriteVectorIndex(const std::string& location) {
|
||||
if (location.empty()) {
|
||||
return Status(SERVER_WRITE_ERROR, "Invalid parameter of WriteVectorIndex");
|
||||
}
|
||||
|
||||
try {
|
||||
auto& ss_codec = codec::SSCodec::instance();
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
ss_codec.GetVectorIndexFormat()->write(fs_ptr_, location, segment_ptr_->vector_index_ptr_);
|
||||
} catch (std::exception& e) {
|
||||
std::string err_msg = "Failed to write vector index: " + std::string(e.what());
|
||||
LOG_ENGINE_ERROR_ << err_msg;
|
||||
|
||||
engine::utils::SendExitSignal();
|
||||
return Status(SERVER_WRITE_ERROR, err_msg);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::WriteAttrsIndex() {
|
||||
try {
|
||||
auto& ss_codec = codec::SSCodec::instance();
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
ss_codec.GetAttrsIndexFormat()->write(fs_ptr_, segment_ptr_->attrs_index_ptr_);
|
||||
} catch (std::exception& e) {
|
||||
std::string err_msg = "Failed to write vector index: " + std::string(e.what());
|
||||
LOG_ENGINE_ERROR_ << err_msg;
|
||||
|
||||
engine::utils::SendExitSignal();
|
||||
return Status(SERVER_WRITE_ERROR, err_msg);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::WriteBloomFilter(const std::string& file_path) {
|
||||
try {
|
||||
auto& ss_codec = codec::SSCodec::instance();
|
||||
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
|
||||
TimeRecorder recorder("SSSegmentWriter::WriteBloomFilter");
|
||||
|
||||
ss_codec.GetIdBloomFilterFormat()->create(fs_ptr_, segment_ptr_->id_bloom_filter_ptr_);
|
||||
engine::FIXED_FIELD_DATA uid_data;
|
||||
auto status = segment_ptr_->GetFixedFieldData(engine::DEFAULT_UID_NAME, uid_data);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
||||
segment::IdBloomFilterPtr bloom_filter_ptr;
|
||||
ss_codec.GetIdBloomFilterFormat()->create(fs_ptr_, bloom_filter_ptr);
|
||||
|
||||
recorder.RecordSection("Initializing bloom filter");
|
||||
|
||||
auto& uids = segment_ptr_->vectors_ptr_->GetUids();
|
||||
for (auto& uid : uids) {
|
||||
segment_ptr_->id_bloom_filter_ptr_->Add(uid);
|
||||
int64_t* uids = (int64_t*)(uid_data.data());
|
||||
int64_t row_count = segment_ptr_->GetRowCount();
|
||||
for (uint64_t i = 0; i < row_count; i++) {
|
||||
bloom_filter_ptr->Add(uids[i]);
|
||||
}
|
||||
segment_ptr_->SetBloomFilter(bloom_filter_ptr);
|
||||
|
||||
recorder.RecordSection("Adding " + std::to_string(uids.size()) + " ids to bloom filter");
|
||||
|
||||
ss_codec.GetIdBloomFilterFormat()->write(fs_ptr_, file_path, segment_ptr_->id_bloom_filter_ptr_);
|
||||
|
||||
recorder.RecordSection("Writing bloom filter");
|
||||
recorder.RecordSection("Adding " + std::to_string(row_count) + " ids to bloom filter");
|
||||
} catch (std::exception& e) {
|
||||
std::string err_msg = "Failed to write vectors: " + std::string(e.what());
|
||||
LOG_ENGINE_ERROR_ << err_msg;
|
||||
|
@ -275,23 +169,37 @@ SSSegmentWriter::WriteBloomFilter(const std::string& file_path) {
|
|||
engine::utils::SendExitSignal();
|
||||
return Status(SERVER_WRITE_ERROR, err_msg);
|
||||
}
|
||||
|
||||
return WriteBloomFilter(file_path, segment_ptr_->GetBloomFilter());
|
||||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::WriteBloomFilter(const std::string& file_path, const IdBloomFilterPtr& id_bloom_filter_ptr) {
|
||||
try {
|
||||
TimeRecorder recorder("SSSegmentWriter::WriteBloomFilter");
|
||||
auto& ss_codec = codec::SSCodec::instance();
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
ss_codec.GetIdBloomFilterFormat()->write(fs_ptr_, file_path, id_bloom_filter_ptr);
|
||||
recorder.RecordSection("finish writing bloom filter");
|
||||
} catch (std::exception& e) {
|
||||
std::string err_msg = "Failed to write bloom filter: " + std::string(e.what());
|
||||
LOG_ENGINE_ERROR_ << err_msg;
|
||||
|
||||
engine::utils::SendExitSignal();
|
||||
return Status(SERVER_WRITE_ERROR, err_msg);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::WriteDeletedDocs(const std::string& file_path) {
|
||||
try {
|
||||
auto& ss_codec = codec::SSCodec::instance();
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
DeletedDocsPtr deleted_docs_ptr = std::make_shared<DeletedDocs>();
|
||||
ss_codec.GetDeletedDocsFormat()->write(fs_ptr_, file_path, deleted_docs_ptr);
|
||||
} catch (std::exception& e) {
|
||||
std::string err_msg = "Failed to write deleted docs: " + std::string(e.what());
|
||||
LOG_ENGINE_ERROR_ << err_msg;
|
||||
|
||||
engine::utils::SendExitSignal();
|
||||
return Status(SERVER_WRITE_ERROR, err_msg);
|
||||
DeletedDocsPtr deleted_docs_ptr = std::make_shared<DeletedDocs>();
|
||||
auto status = WriteDeletedDocs(file_path, deleted_docs_ptr);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
||||
segment_ptr_->SetDeletedDocs(deleted_docs_ptr);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
@ -312,35 +220,13 @@ SSSegmentWriter::WriteDeletedDocs(const std::string& file_path, const DeletedDoc
|
|||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::WriteBloomFilter(const std::string& file_path, const IdBloomFilterPtr& id_bloom_filter_ptr) {
|
||||
try {
|
||||
auto& ss_codec = codec::SSCodec::instance();
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
ss_codec.GetIdBloomFilterFormat()->write(fs_ptr_, file_path, id_bloom_filter_ptr);
|
||||
} catch (std::exception& e) {
|
||||
std::string err_msg = "Failed to write bloom filter: " + std::string(e.what());
|
||||
LOG_ENGINE_ERROR_ << err_msg;
|
||||
|
||||
engine::utils::SendExitSignal();
|
||||
return Status(SERVER_WRITE_ERROR, err_msg);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::Cache() {
|
||||
// TODO(zhiru)
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::GetSegment(SegmentPtr& segment_ptr) {
|
||||
SSSegmentWriter::GetSegment(engine::SegmentPtr& segment_ptr) {
|
||||
segment_ptr = segment_ptr_;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::Merge(const std::string& dir_to_merge, const std::string& name) {
|
||||
SSSegmentWriter::Merge(const SSSegmentReaderPtr& segment_to_merge) {
|
||||
// if (dir_to_merge == fs_ptr_->operation_ptr_->GetDirectory()) {
|
||||
// return Status(DB_ERROR, "Cannot Merge Self");
|
||||
// }
|
||||
|
@ -404,25 +290,37 @@ SSSegmentWriter::Merge(const std::string& dir_to_merge, const std::string& name)
|
|||
|
||||
size_t
|
||||
SSSegmentWriter::Size() {
|
||||
// TODO(zhiru): switch to actual directory size
|
||||
size_t vectors_size = segment_ptr_->vectors_ptr_->VectorsSize();
|
||||
size_t uids_size = segment_ptr_->vectors_ptr_->UidsSize();
|
||||
/*
|
||||
if (segment_ptr_->id_bloom_filter_ptr_) {
|
||||
ret += segment_ptr_->id_bloom_filter_ptr_->Size();
|
||||
}
|
||||
*/
|
||||
return (vectors_size * sizeof(uint8_t) + uids_size * sizeof(doc_id_t));
|
||||
return 0;
|
||||
}
|
||||
|
||||
size_t
|
||||
SSSegmentWriter::VectorCount() {
|
||||
return segment_ptr_->vectors_ptr_->GetCount();
|
||||
SSSegmentWriter::RowCount() {
|
||||
return segment_ptr_->GetRowCount();
|
||||
}
|
||||
|
||||
void
|
||||
SSSegmentWriter::SetSegmentName(const std::string& name) {
|
||||
segment_ptr_->vectors_ptr_->SetName(name);
|
||||
Status
|
||||
SSSegmentWriter::SetVectorIndex(const std::string& field_name, const milvus::knowhere::VecIndexPtr& index) {
|
||||
return segment_ptr_->SetVectorIndex(field_name, index);
|
||||
}
|
||||
|
||||
Status
|
||||
SSSegmentWriter::WriteVectorIndex(const std::string& field_name, const std::string& file_path) {
|
||||
try {
|
||||
knowhere::VecIndexPtr index;
|
||||
segment_ptr_->GetVectorIndex(field_name, index);
|
||||
segment::VectorIndexPtr index_ptr = std::make_shared<segment::VectorIndex>(index);
|
||||
|
||||
auto& ss_codec = codec::SSCodec::instance();
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
ss_codec.GetVectorIndexFormat()->write(fs_ptr_, file_path, index_ptr);
|
||||
} catch (std::exception& e) {
|
||||
std::string err_msg = "Failed to write vector index: " + std::string(e.what());
|
||||
LOG_ENGINE_ERROR_ << err_msg;
|
||||
|
||||
engine::utils::SendExitSignal();
|
||||
return Status(SERVER_WRITE_ERROR, err_msg);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace segment
|
||||
|
|
|
@ -24,7 +24,8 @@
|
|||
#include <vector>
|
||||
|
||||
#include "db/SnapshotVisitor.h"
|
||||
#include "segment/Types.h"
|
||||
#include "segment/SSSegmentReader.h"
|
||||
#include "segment/Segment.h"
|
||||
#include "storage/FSHandler.h"
|
||||
#include "utils/Status.h"
|
||||
|
||||
|
@ -36,22 +37,10 @@ class SSSegmentWriter {
|
|||
explicit SSSegmentWriter(const engine::SegmentVisitorPtr& segment_visitor);
|
||||
|
||||
Status
|
||||
AddVectors(const std::string& name, const std::vector<uint8_t>& data, const std::vector<doc_id_t>& uids);
|
||||
AddChunk(const engine::DataChunkPtr& chunk_ptr);
|
||||
|
||||
Status
|
||||
AddVectors(const std::string& name, const uint8_t* data, uint64_t size, const std::vector<doc_id_t>& uids);
|
||||
|
||||
Status
|
||||
AddAttrs(const std::string& name, const std::unordered_map<std::string, uint64_t>& attr_nbytes,
|
||||
const std::unordered_map<std::string, std::vector<uint8_t>>& attr_data, const std::vector<doc_id_t>& uids);
|
||||
|
||||
Status
|
||||
SetVectorIndex(const knowhere::VecIndexPtr& index);
|
||||
|
||||
Status
|
||||
SetAttrsIndex(const std::unordered_map<std::string, knowhere::IndexPtr>& attr_index,
|
||||
const std::unordered_map<std::string, int64_t>& attr_nbytes,
|
||||
const std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type);
|
||||
AddChunk(const engine::DataChunkPtr& chunk_ptr, uint64_t from, uint64_t to);
|
||||
|
||||
Status
|
||||
WriteBloomFilter(const std::string& file_path, const IdBloomFilterPtr& bloom_filter_ptr);
|
||||
|
@ -63,38 +52,29 @@ class SSSegmentWriter {
|
|||
Serialize();
|
||||
|
||||
Status
|
||||
Cache();
|
||||
GetSegment(engine::SegmentPtr& segment_ptr);
|
||||
|
||||
Status
|
||||
GetSegment(SegmentPtr& segment_ptr);
|
||||
|
||||
Status
|
||||
Merge(const std::string& segment_dir_to_merge, const std::string& name);
|
||||
Merge(const SSSegmentReaderPtr& segment_to_merge);
|
||||
|
||||
size_t
|
||||
Size();
|
||||
|
||||
size_t
|
||||
VectorCount();
|
||||
RowCount();
|
||||
|
||||
Status
|
||||
WriteVectorIndex(const std::string& location);
|
||||
SetVectorIndex(const std::string& field_name, const knowhere::VecIndexPtr& index);
|
||||
|
||||
Status
|
||||
WriteAttrsIndex();
|
||||
|
||||
void
|
||||
SetSegmentName(const std::string& name);
|
||||
WriteVectorIndex(const std::string& field_name, const std::string& file_path);
|
||||
|
||||
private:
|
||||
Status
|
||||
WriteUids(const std::string& file_path, const std::vector<doc_id_t>& uids);
|
||||
Initialize();
|
||||
|
||||
Status
|
||||
WriteVectors(const std::string& file_path, const std::vector<uint8_t>& raw_vectors);
|
||||
|
||||
Status
|
||||
WriteAttrs();
|
||||
WriteField(const std::string& file_path, const engine::FIXED_FIELD_DATA& raw);
|
||||
|
||||
Status
|
||||
WriteBloomFilter(const std::string& file_path);
|
||||
|
@ -105,7 +85,7 @@ class SSSegmentWriter {
|
|||
private:
|
||||
engine::SegmentVisitorPtr segment_visitor_;
|
||||
storage::FSHandlerPtr fs_ptr_;
|
||||
SegmentPtr segment_ptr_;
|
||||
engine::SegmentPtr segment_ptr_;
|
||||
};
|
||||
|
||||
using SSSegmentWriterPtr = std::shared_ptr<SSSegmentWriter>;
|
||||
|
|
|
@ -0,0 +1,198 @@
|
|||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "segment/Segment.h"
|
||||
#include "utils/Log.h"
|
||||
|
||||
#include <utility>
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
const char* VECTOR_DIMENSION_PARAM = "dimension"; // hard code
|
||||
|
||||
Status
|
||||
Segment::AddField(const std::string& field_name, FIELD_TYPE field_type, uint64_t field_width) {
|
||||
if (field_types_.find(field_name) != field_types_.end()) {
|
||||
return Status(DB_ERROR, "duplicate field: " + field_name);
|
||||
}
|
||||
|
||||
uint64_t real_field_width = 0;
|
||||
switch (field_type) {
|
||||
case FIELD_TYPE::BOOL:
|
||||
real_field_width = sizeof(bool);
|
||||
break;
|
||||
case FIELD_TYPE::DOUBLE:
|
||||
real_field_width = sizeof(double);
|
||||
break;
|
||||
case FIELD_TYPE::FLOAT:
|
||||
real_field_width = sizeof(float);
|
||||
break;
|
||||
case FIELD_TYPE::INT8:
|
||||
real_field_width = sizeof(uint8_t);
|
||||
break;
|
||||
case FIELD_TYPE::INT16:
|
||||
real_field_width = sizeof(uint16_t);
|
||||
break;
|
||||
case FIELD_TYPE::INT32:
|
||||
real_field_width = sizeof(uint32_t);
|
||||
break;
|
||||
case FIELD_TYPE::UID:
|
||||
case FIELD_TYPE::INT64:
|
||||
real_field_width = sizeof(uint64_t);
|
||||
break;
|
||||
case FIELD_TYPE::VECTOR:
|
||||
case FIELD_TYPE::VECTOR_FLOAT:
|
||||
case FIELD_TYPE::VECTOR_BINARY: {
|
||||
if (field_width == 0) {
|
||||
std::string msg = "vecor field dimension required: " + field_name;
|
||||
LOG_SERVER_ERROR_ << msg;
|
||||
return Status(DB_ERROR, msg);
|
||||
}
|
||||
|
||||
real_field_width = field_width;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
field_types_.insert(std::make_pair(field_name, field_type));
|
||||
fixed_fields_width_.insert(std::make_pair(field_name, real_field_width));
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
Segment::AddChunk(const DataChunkPtr& chunk_ptr) {
|
||||
if (chunk_ptr == nullptr || chunk_ptr->count_ == 0) {
|
||||
return Status(DB_ERROR, "invalid input");
|
||||
}
|
||||
|
||||
return AddChunk(chunk_ptr, 0, chunk_ptr->count_);
|
||||
}
|
||||
|
||||
Status
|
||||
Segment::AddChunk(const DataChunkPtr& chunk_ptr, uint64_t from, uint64_t to) {
|
||||
if (chunk_ptr == nullptr || from > chunk_ptr->count_ || to > chunk_ptr->count_ || from >= to) {
|
||||
return Status(DB_ERROR, "invalid input");
|
||||
}
|
||||
|
||||
// check input
|
||||
for (auto& iter : chunk_ptr->fixed_fields_) {
|
||||
auto width_iter = fixed_fields_width_.find(iter.first);
|
||||
if (width_iter == fixed_fields_width_.end()) {
|
||||
return Status(DB_ERROR, "field not yet defined: " + iter.first);
|
||||
}
|
||||
|
||||
if (iter.second.size() != width_iter->second * chunk_ptr->count_) {
|
||||
return Status(DB_ERROR, "illegal field: " + iter.first);
|
||||
}
|
||||
}
|
||||
|
||||
// consume
|
||||
uint64_t add_count = to - from;
|
||||
for (auto& width_iter : fixed_fields_width_) {
|
||||
auto input = chunk_ptr->fixed_fields_.find(width_iter.first);
|
||||
auto& data = fixed_fields_[width_iter.first];
|
||||
size_t origin_bytes = data.size();
|
||||
uint64_t add_bytes = add_count * width_iter.second;
|
||||
uint64_t previous_bytes = row_count_ * width_iter.second;
|
||||
uint64_t target_bytes = previous_bytes + add_bytes;
|
||||
data.resize(target_bytes);
|
||||
if (input == chunk_ptr->fixed_fields_.end()) {
|
||||
// this field is not provided, complicate by 0
|
||||
memset(data.data() + origin_bytes, 0, target_bytes - origin_bytes);
|
||||
} else {
|
||||
// complicate by 0
|
||||
if (origin_bytes < previous_bytes) {
|
||||
memset(data.data() + origin_bytes, 0, previous_bytes - origin_bytes);
|
||||
}
|
||||
// copy input into this field
|
||||
memcpy(data.data() + previous_bytes, input->second.data() + from * width_iter.second, add_bytes);
|
||||
}
|
||||
}
|
||||
|
||||
row_count_ += add_count;
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
Segment::DeleteEntity(int32_t offset) {
|
||||
for (auto& pair : fixed_fields_) {
|
||||
uint64_t width = fixed_fields_width_[pair.first];
|
||||
if (width != 0) {
|
||||
auto step = offset * width;
|
||||
FIXED_FIELD_DATA& data = pair.second;
|
||||
data.erase(data.begin() + step, data.begin() + step + width);
|
||||
}
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
Segment::GetFieldType(const std::string& field_name, FIELD_TYPE& type) {
|
||||
auto iter = field_types_.find(field_name);
|
||||
if (iter == field_types_.end()) {
|
||||
return Status(DB_ERROR, "invalid field name: " + field_name);
|
||||
}
|
||||
|
||||
type = iter->second;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
Segment::GetFixedFieldWidth(const std::string& field_name, uint64_t width) {
|
||||
auto iter = fixed_fields_width_.find(field_name);
|
||||
if (iter == fixed_fields_width_.end()) {
|
||||
return Status(DB_ERROR, "invalid field name: " + field_name);
|
||||
}
|
||||
|
||||
width = iter->second;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
Segment::GetFixedFieldData(const std::string& field_name, FIXED_FIELD_DATA& data) {
|
||||
auto iter = fixed_fields_.find(field_name);
|
||||
if (iter == fixed_fields_.end()) {
|
||||
return Status(DB_ERROR, "invalid field name: " + field_name);
|
||||
}
|
||||
|
||||
data = iter->second;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
Segment::GetVectorIndex(const std::string& field_name, knowhere::VecIndexPtr& index) {
|
||||
auto iter = vector_indice_.find(field_name);
|
||||
if (iter == vector_indice_.end()) {
|
||||
return Status(DB_ERROR, "invalid field name: " + field_name);
|
||||
}
|
||||
|
||||
index = iter->second;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
Segment::SetVectorIndex(const std::string& field_name, const knowhere::VecIndexPtr& index) {
|
||||
vector_indice_[field_name] = index;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
|
@ -0,0 +1,140 @@
|
|||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "db/Types.h"
|
||||
#include "db/meta/MetaTypes.h"
|
||||
#include "segment/DeletedDocs.h"
|
||||
#include "segment/IdBloomFilter.h"
|
||||
#include "segment/VectorIndex.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
extern const char* VECTOR_DIMENSION_PARAM;
|
||||
|
||||
using FIELD_TYPE = engine::meta::hybrid::DataType;
|
||||
using FIELD_TYPE_MAP = std::unordered_map<std::string, engine::meta::hybrid::DataType>;
|
||||
using FIELD_WIDTH_MAP = std::unordered_map<std::string, uint64_t>;
|
||||
using FIXED_FIELD_DATA = std::vector<uint8_t>;
|
||||
using FIXEDX_FIELD_MAP = std::unordered_map<std::string, FIXED_FIELD_DATA>;
|
||||
using VARIABLE_FIELD_DATA = std::vector<std::string>;
|
||||
using VARIABLE_FIELD_MAP = std::unordered_map<std::string, VARIABLE_FIELD_DATA>;
|
||||
using VECTOR_INDEX_MAP = std::unordered_map<std::string, knowhere::VecIndexPtr>;
|
||||
|
||||
struct DataChunk {
|
||||
uint64_t count_ = 0;
|
||||
FIXEDX_FIELD_MAP fixed_fields_;
|
||||
VARIABLE_FIELD_MAP variable_fields_;
|
||||
};
|
||||
|
||||
using DataChunkPtr = std::shared_ptr<DataChunk>;
|
||||
|
||||
class Segment {
|
||||
public:
|
||||
Status
|
||||
AddField(const std::string& field_name, FIELD_TYPE field_type, uint64_t field_width = 0);
|
||||
|
||||
Status
|
||||
AddChunk(const DataChunkPtr& chunk_ptr);
|
||||
|
||||
Status
|
||||
AddChunk(const DataChunkPtr& chunk_ptr, uint64_t from, uint64_t to);
|
||||
|
||||
Status
|
||||
DeleteEntity(int32_t offset);
|
||||
|
||||
Status
|
||||
GetFieldType(const std::string& field_name, FIELD_TYPE& type);
|
||||
|
||||
Status
|
||||
GetFixedFieldWidth(const std::string& field_name, uint64_t width);
|
||||
|
||||
Status
|
||||
GetFixedFieldData(const std::string& field_name, FIXED_FIELD_DATA& data);
|
||||
|
||||
Status
|
||||
GetVectorIndex(const std::string& field_name, knowhere::VecIndexPtr& index);
|
||||
|
||||
Status
|
||||
SetVectorIndex(const std::string& field_name, const knowhere::VecIndexPtr& index);
|
||||
|
||||
FIELD_TYPE_MAP&
|
||||
GetFieldTypes() {
|
||||
return field_types_;
|
||||
}
|
||||
FIXEDX_FIELD_MAP&
|
||||
GetFixedFields() {
|
||||
return fixed_fields_;
|
||||
}
|
||||
VARIABLE_FIELD_MAP&
|
||||
GetVariableFields() {
|
||||
return variable_fields_;
|
||||
}
|
||||
VECTOR_INDEX_MAP&
|
||||
GetVectorIndice() {
|
||||
return vector_indice_;
|
||||
}
|
||||
|
||||
int64_t
|
||||
GetRowCount() const {
|
||||
return row_count_;
|
||||
}
|
||||
|
||||
segment::DeletedDocsPtr
|
||||
GetDeletedDocs() const {
|
||||
return deleted_docs_ptr_;
|
||||
}
|
||||
|
||||
void
|
||||
SetDeletedDocs(const segment::DeletedDocsPtr& ptr) {
|
||||
deleted_docs_ptr_ = ptr;
|
||||
}
|
||||
|
||||
segment::IdBloomFilterPtr
|
||||
GetBloomFilter() const {
|
||||
return id_bloom_filter_ptr_;
|
||||
}
|
||||
|
||||
void
|
||||
SetBloomFilter(const segment::IdBloomFilterPtr& ptr) {
|
||||
id_bloom_filter_ptr_ = ptr;
|
||||
}
|
||||
|
||||
private:
|
||||
FIELD_TYPE_MAP field_types_;
|
||||
FIELD_WIDTH_MAP fixed_fields_width_;
|
||||
FIXEDX_FIELD_MAP fixed_fields_;
|
||||
VARIABLE_FIELD_MAP variable_fields_;
|
||||
VECTOR_INDEX_MAP vector_indice_;
|
||||
|
||||
uint64_t row_count_ = 0;
|
||||
|
||||
segment::DeletedDocsPtr deleted_docs_ptr_ = nullptr;
|
||||
segment::IdBloomFilterPtr id_bloom_filter_ptr_ = nullptr;
|
||||
};
|
||||
|
||||
using SegmentPtr = std::shared_ptr<Segment>;
|
||||
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
|
@ -92,8 +92,8 @@ TEST_F(SSSegmentTest, SegmentTest) {
|
|||
auto visitor = SegmentVisitor::Build(ss, segment->GetID());
|
||||
milvus::segment::SSSegmentWriter segment_writer(visitor);
|
||||
|
||||
status = segment_writer.AddVectors("test", raw_vectors, raw_uids);
|
||||
ASSERT_TRUE(status.ok());
|
||||
// status = segment_writer.AddVectors("test", raw_vectors, raw_uids);
|
||||
// ASSERT_TRUE(status.ok());
|
||||
|
||||
// status = segment_writer.Serialize();
|
||||
ASSERT_TRUE(status.ok());
|
||||
|
|
Loading…
Reference in New Issue