Add index meta

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/4973/head^2
bigsheeper 2020-09-21 15:40:56 +08:00 committed by yefu.chen
parent a95f6843aa
commit 99d0a69e6a
11 changed files with 264 additions and 36 deletions

View File

@ -1,20 +1,123 @@
#include "Collection.h"
#include "pb/master.pb.h"
//using Collection = masterpb::Collection;
#include "pb/message.pb.h"
#include <google/protobuf/text_format.h>
#include <knowhere/index/vector_index/adapter/VectorAdapter.h>
namespace milvus::dog_segment {
Collection::Collection(std::string &collection_name, std::string &schema):
collection_name_(collection_name), schema_json_(schema) {
parse();
}
void
Collection::set_index() {}
Collection::AddIndex(const grpc::IndexParam& index_param) {
    // Registers one index, described by `index_param`, in this collection's
    // index meta (`index_`).
    //
    // Keys recognised in index_param.extra_params():
    //   "index_type" - index type string            (default: INDEX_FAISS_IVFPQ)
    //   "index_mode" - "0" => MODE_CPU, "1" => MODE_GPU; any other integer throws
    //   "params"     - JSON text parsed into the knowhere index config
    // Unknown keys are ignored. When "params" is absent, a default config is
    // built from the dim of the last VECTOR_FLOAT field found in the schema.
    //
    // Throws std::runtime_error when the index meta has not been created, when
    // the index mode is illegal, or when a default config is required but the
    // schema has no VECTOR_FLOAT field with a non-zero dim. Exceptions raised
    // by std::stoi or the JSON parser on malformed input propagate unchanged.
    const auto& index_name = index_param.index_name();
    const auto& field_name = index_param.field_name();
    assert(!index_name.empty());
    assert(!field_name.empty());

    // index_ is only created by CreateIndex(); fail loudly instead of
    // dereferencing a null pointer at the AddEntry call below.
    if (!index_) {
        throw std::runtime_error("Index meta is not initialized, cannot add index.");
    }

    auto index_type = knowhere::IndexEnum::INDEX_FAISS_IVFPQ;
    auto index_mode = knowhere::IndexMode::MODE_CPU;
    knowhere::Config index_conf;

    bool found_index_type = false;
    bool found_index_mode = false;
    bool found_index_conf = false;

    // Bind by reference: copying the repeated field would deep-copy every entry.
    const auto& extra_params = index_param.extra_params();
    for (const auto& extra_param : extra_params) {
        const auto& key = extra_param.key();
        if (key == "index_type") {
            // If index_type is a pointer type, it now refers to storage owned
            // by index_param, which outlives this call.
            index_type = extra_param.value().data();
            found_index_type = true;
        } else if (key == "index_mode") {
            const auto index_mode_int = std::stoi(extra_param.value());
            if (index_mode_int == 1) {
                index_mode = knowhere::IndexMode::MODE_GPU;
            } else if (index_mode_int != 0) {
                throw std::runtime_error("Illegal index mode, only 0 or 1 is supported.");
            }
            // 0 keeps whatever mode is currently selected (MODE_CPU by default).
            found_index_mode = true;
        } else if (key == "params") {
            index_conf = nlohmann::json::parse(extra_param.value());
            found_index_conf = true;
        }
    }

    if (!found_index_type) {
        std::cout << "WARN: Not specify index type, use default index type: INDEX_FAISS_IVFPQ" << std::endl;
    }
    if (!found_index_mode) {
        std::cout << "WARN: Not specify index mode, use default index mode: MODE_CPU" << std::endl;
    }
    if (!found_index_conf) {
        // The last VECTOR_FLOAT field in the schema decides the dimension.
        int dim = 0;
        for (const auto& field : schema_->get_fields()) {
            if (field.get_data_type() == DataType::VECTOR_FLOAT) {
                dim = field.get_dim();
            }
        }
        // An assert would vanish under NDEBUG and let dim == 0 reach the config.
        if (dim == 0) {
            throw std::runtime_error(
                "No VECTOR_FLOAT field with non-zero dim in schema, cannot build default index config.");
        }

        index_conf = milvus::knowhere::Config{
            {knowhere::meta::DIM, dim},
            {knowhere::IndexParams::nlist, 100},
            {knowhere::IndexParams::nprobe, 4},
            {knowhere::IndexParams::m, 4},
            {knowhere::IndexParams::nbits, 8},
            {knowhere::Metric::TYPE, milvus::knowhere::Metric::L2},
            {knowhere::meta::DEVICEID, 0},
        };
        std::cout << "WARN: Not specify index config, use default index config" << std::endl;
    }

    index_->AddEntry(index_name, field_name, index_type, index_mode, index_conf);
}
// Rebuilds this collection's index meta from `index_config`, the protobuf
// text-format serialization of a masterpb::Collection.
//
// - Empty string: the index meta is dropped (index_ becomes nullptr).
// - Unparsable string: an error is logged and the current index meta is left
//   untouched, because the partially filled message cannot be trusted.
// - Otherwise: a fresh IndexMeta bound to schema_ replaces the old one and
//   every entry of collection.indexes() is registered through AddIndex().
//
// Exceptions thrown by AddIndex() propagate to the caller.
void
Collection::CreateIndex(std::string &index_config) {
    if (index_config.empty()) {
        index_ = nullptr;
        std::cout << "null index config when create index" << std::endl;
        return;
    }

    masterpb::Collection collection;
    if (!google::protobuf::TextFormat::ParseFromString(index_config, &collection)) {
        std::cerr << "unmarshal index string failed" << std::endl;
        return;
    }

    index_ = std::make_shared<IndexMeta>(schema_);
    for (const auto& index : collection.indexes()) {
        std::cout << "add index, index name =" << index.index_name()
                  << ", field_name = " << index.field_name()
                  << std::endl;
        AddIndex(index);
    }
}
void
Collection::parse() {
if(schema_json_ == "") {
if(schema_json_.empty()) {
auto schema = std::make_shared<Schema>();
schema->AddField("fakevec", DataType::VECTOR_FLOAT, 16);
schema->AddField("age", DataType::INT32);
@ -27,12 +130,12 @@ Collection::parse() {
if (!suc) {
std::cerr << "unmarshal failed" << std::endl;
std::cerr << "unmarshal schema string failed" << std::endl;
}
auto schema = std::make_shared<Schema>();
for (const milvus::grpc::FieldMeta & child: collection.schema().field_metas()){
std::cout<<"add Field, name :" << child.field_name() << ", datatype :" << child.type() << ", dim :" << int(child.dim()) << std::endl;
schema->AddField(std::string_view(child.field_name()), DataType {child.type()}, int(child.dim()));
std::cout<<"add Field, name :" << child.field_name() << ", datatype :" << child.type() << ", dim :" << int(child.dim()) << std::endl;
schema->AddField(std::string_view(child.field_name()), DataType {child.type()}, int(child.dim()));
}
/*
schema->AddField("fakevec", DataType::VECTOR_FLOAT, 16);

View File

@ -1,5 +1,6 @@
#pragma once
#include <src/pb/message.pb.h>
#include "dog_segment/Partition.h"
#include "SegmentDefs.h"
@ -9,10 +10,10 @@ class Collection {
public:
explicit Collection(std::string &collection_name, std::string &schema);
// TODO: set index
void set_index();
void AddIndex(const grpc::IndexParam &index_param);
void CreateIndex(std::string &index_config);
// TODO: config to schema
void parse();
public:
@ -20,13 +21,16 @@ public:
return schema_;
}
IndexMetaPtr& get_index() {
return index_;
}
std::string& get_collection_name() {
return collection_name_;
}
private:
// TODO: add Index ptr
// IndexPtr index_ = nullptr;
IndexMetaPtr index_;
std::string collection_name_;
std::string schema_json_;
SchemaPtr schema_;

View File

@ -2,7 +2,7 @@
namespace milvus::dog_segment {
Partition::Partition(std::string& partition_name, SchemaPtr& schema):
partition_name_(partition_name), schema_(schema) {}
// Builds a partition whose partition_name_, schema_ and index_ members are
// initialised from the corresponding arguments; the body does nothing else.
Partition::Partition(std::string& partition_name, SchemaPtr& schema, IndexMetaPtr& index):
partition_name_(partition_name), schema_(schema), index_(index) {}
}

View File

@ -6,13 +6,17 @@ namespace milvus::dog_segment {
class Partition {
public:
explicit Partition(std::string& partition_name, SchemaPtr& schema);
explicit Partition(std::string& partition_name, SchemaPtr& schema, IndexMetaPtr& index);
public:
SchemaPtr& get_schema() {
return schema_;
}
IndexMetaPtr& get_index() {
return index_;
}
std::string& get_partition_name() {
return partition_name_;
}
@ -20,6 +24,7 @@ public:
private:
std::string partition_name_;
SchemaPtr schema_;
IndexMetaPtr index_;
};
using PartitionPtr = std::unique_ptr<Partition>;

View File

@ -21,3 +21,9 @@ DeleteCollection(CCollection collection) {
std::cout << "delete collection " << col->get_collection_name() << std::endl;
delete col;
}
// C entry point: rebuilds the index meta of `c_collection` from `index_string`
// (forwarded to Collection::CreateIndex).
//
// A null `c_collection` is rejected with a log line. A null `index_string` is
// treated like an empty config, which CreateIndex handles by dropping the
// index meta.
//
// NOTE(review): CreateIndex -> AddIndex can throw (e.g. std::runtime_error on
// an illegal index mode); nothing here stops that exception from leaving this
// C-callable function — consider catching and reporting it at this boundary.
void UpdateIndexs(CCollection c_collection, const char *index_string) {
    auto c = (milvus::dog_segment::Collection*)c_collection;
    if (c == nullptr) {
        std::cerr << "UpdateIndexs: null collection handle" << std::endl;
        return;
    }
    // std::string(nullptr) is undefined behaviour, so map null to "".
    std::string s(index_string != nullptr ? index_string : "");
    c->CreateIndex(s);
}

View File

@ -10,6 +10,8 @@ NewCollection(const char* collection_name, const char* schema_conf);
void
DeleteCollection(CCollection collection);
void UpdateIndexs(CCollection c_collection, const char *index_string);
#ifdef __cplusplus
}
#endif

View File

@ -10,7 +10,9 @@ NewPartition(CCollection collection, const char* partition_name) {
auto schema = c->get_schema();
auto partition = std::make_unique<milvus::dog_segment::Partition>(name, schema);
auto index = c->get_index();
auto partition = std::make_unique<milvus::dog_segment::Partition>(name, schema, index);
// TODO: delete print
std::cout << "create partition " << name << std::endl;

View File

@ -12,7 +12,6 @@ CSegmentBase
NewSegment(CPartition partition, unsigned long segment_id) {
auto p = (milvus::dog_segment::Partition*)partition;
// TODO: remove hard code null index ptr
auto segment = milvus::dog_segment::CreateSegment(p->get_schema());
// TODO: delete print

View File

@ -15,6 +15,7 @@
#include <google/protobuf/wire_format.h>
// @@protoc_insertion_point(includes)
#include <google/protobuf/port_def.inc>
extern PROTOBUF_INTERNAL_EXPORT_message_2eproto ::PROTOBUF_NAMESPACE_ID::internal::SCCInfo<2> scc_info_IndexParam_message_2eproto;
extern PROTOBUF_INTERNAL_EXPORT_message_2eproto ::PROTOBUF_NAMESPACE_ID::internal::SCCInfo<1> scc_info_Schema_message_2eproto;
namespace masterpb {
class CollectionDefaultTypeInternal {
@ -41,9 +42,10 @@ static void InitDefaultsscc_info_Collection_master_2eproto() {
::masterpb::Collection::InitAsDefaultInstance();
}
::PROTOBUF_NAMESPACE_ID::internal::SCCInfo<1> scc_info_Collection_master_2eproto =
{{ATOMIC_VAR_INIT(::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase::kUninitialized), 1, InitDefaultsscc_info_Collection_master_2eproto}, {
&scc_info_Schema_message_2eproto.base,}};
::PROTOBUF_NAMESPACE_ID::internal::SCCInfo<2> scc_info_Collection_master_2eproto =
{{ATOMIC_VAR_INIT(::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase::kUninitialized), 2, InitDefaultsscc_info_Collection_master_2eproto}, {
&scc_info_Schema_message_2eproto.base,
&scc_info_IndexParam_message_2eproto.base,}};
static void InitDefaultsscc_info_Segment_master_2eproto() {
GOOGLE_PROTOBUF_VERIFY_VERSION;
@ -89,6 +91,7 @@ const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_master_2eproto::offsets[] PROT
PROTOBUF_FIELD_OFFSET(::masterpb::Collection, create_time_),
PROTOBUF_FIELD_OFFSET(::masterpb::Collection, segment_ids_),
PROTOBUF_FIELD_OFFSET(::masterpb::Collection, partition_tags_),
PROTOBUF_FIELD_OFFSET(::masterpb::Collection, indexes_),
~0u, // no _has_bits_
PROTOBUF_FIELD_OFFSET(::masterpb::Segment, _internal_metadata_),
~0u, // no _extensions_
@ -113,8 +116,8 @@ const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_master_2eproto::offsets[] PROT
};
static const ::PROTOBUF_NAMESPACE_ID::internal::MigrationSchema schemas[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = {
{ 0, -1, sizeof(::masterpb::Collection)},
{ 11, -1, sizeof(::masterpb::Segment)},
{ 24, -1, sizeof(::masterpb::SegmentStat)},
{ 12, -1, sizeof(::masterpb::Segment)},
{ 25, -1, sizeof(::masterpb::SegmentStat)},
};
static ::PROTOBUF_NAMESPACE_ID::Message const * const file_default_instances[] = {
@ -125,19 +128,22 @@ static ::PROTOBUF_NAMESPACE_ID::Message const * const file_default_instances[] =
const char descriptor_table_protodef_master_2eproto[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) =
"\n\014master.proto\022\010masterpb\032\rmessage.proto\""
"\215\001\n\nCollection\022\n\n\002id\030\001 \001(\004\022\014\n\004name\030\002 \001(\t"
"\267\001\n\nCollection\022\n\n\002id\030\001 \001(\004\022\014\n\004name\030\002 \001(\t"
"\022#\n\006schema\030\003 \001(\0132\023.milvus.grpc.Schema\022\023\n"
"\013create_time\030\004 \001(\004\022\023\n\013segment_ids\030\005 \003(\004\022"
"\026\n\016partition_tags\030\006 \003(\t\"\301\001\n\007Segment\022\022\n\ns"
"egment_id\030\001 \001(\004\022\025\n\rcollection_id\030\002 \001(\004\022\025"
"\n\rpartition_tag\030\003 \001(\t\022\025\n\rchannel_start\030\004"
" \001(\005\022\023\n\013channel_end\030\005 \001(\005\022\026\n\016open_timest"
"amp\030\006 \001(\004\022\027\n\017close_timestamp\030\007 \001(\004\022\027\n\017co"
"llection_name\030\010 \001(\t\"K\n\013SegmentStat\022\022\n\nse"
"gment_id\030\001 \001(\004\022\023\n\013memory_size\030\002 \001(\004\022\023\n\013m"
"emory_rate\030\003 \001(\0022I\n\006Master\022\?\n\020CreateColl"
"ection\022\024.milvus.grpc.Mapping\032\023.milvus.gr"
"pc.Status\"\000B\010Z\006masterb\006proto3"
"\026\n\016partition_tags\030\006 \003(\t\022(\n\007indexes\030\007 \003(\013"
"2\027.milvus.grpc.IndexParam\"\301\001\n\007Segment\022\022\n"
"\nsegment_id\030\001 \001(\004\022\025\n\rcollection_id\030\002 \001(\004"
"\022\025\n\rpartition_tag\030\003 \001(\t\022\025\n\rchannel_start"
"\030\004 \001(\005\022\023\n\013channel_end\030\005 \001(\005\022\026\n\016open_time"
"stamp\030\006 \001(\004\022\027\n\017close_timestamp\030\007 \001(\004\022\027\n\017"
"collection_name\030\010 \001(\t\"K\n\013SegmentStat\022\022\n\n"
"segment_id\030\001 \001(\004\022\023\n\013memory_size\030\002 \001(\004\022\023\n"
"\013memory_rate\030\003 \001(\0022\210\001\n\006Master\022\?\n\020CreateC"
"ollection\022\024.milvus.grpc.Mapping\032\023.milvus"
".grpc.Status\"\000\022=\n\013CreateIndex\022\027.milvus.g"
"rpc.IndexParam\032\023.milvus.grpc.Status\"\000B\010Z"
"\006masterb\006proto3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_master_2eproto_deps[1] = {
&::descriptor_table_message_2eproto,
@ -150,7 +156,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_mas
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_master_2eproto_once;
static bool descriptor_table_master_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_master_2eproto = {
&descriptor_table_master_2eproto_initialized, descriptor_table_protodef_master_2eproto, "master.proto", 549,
&descriptor_table_master_2eproto_initialized, descriptor_table_protodef_master_2eproto, "master.proto", 655,
&descriptor_table_master_2eproto_once, descriptor_table_master_2eproto_sccs, descriptor_table_master_2eproto_deps, 3, 1,
schemas, file_default_instances, TableStruct_master_2eproto::offsets,
file_level_metadata_master_2eproto, 3, file_level_enum_descriptors_master_2eproto, file_level_service_descriptors_master_2eproto,
@ -181,6 +187,9 @@ void Collection::clear_schema() {
}
schema_ = nullptr;
}
// Empties the repeated `indexes` field by clearing its backing container.
void Collection::clear_indexes() {
indexes_.Clear();
}
Collection::Collection()
: ::PROTOBUF_NAMESPACE_ID::Message(), _internal_metadata_(nullptr) {
SharedCtor();
@ -190,7 +199,8 @@ Collection::Collection(const Collection& from)
: ::PROTOBUF_NAMESPACE_ID::Message(),
_internal_metadata_(nullptr),
segment_ids_(from.segment_ids_),
partition_tags_(from.partition_tags_) {
partition_tags_(from.partition_tags_),
indexes_(from.indexes_) {
_internal_metadata_.MergeFrom(from._internal_metadata_);
name_.UnsafeSetDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited());
if (!from.name().empty()) {
@ -242,6 +252,7 @@ void Collection::Clear() {
segment_ids_.Clear();
partition_tags_.Clear();
indexes_.Clear();
name_.ClearToEmptyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited());
if (GetArenaNoVirtual() == nullptr && schema_ != nullptr) {
delete schema_;
@ -311,6 +322,18 @@ const char* Collection::_InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID:
} while (::PROTOBUF_NAMESPACE_ID::internal::UnalignedLoad<::PROTOBUF_NAMESPACE_ID::uint8>(ptr) == 50);
} else goto handle_unusual;
continue;
// repeated .milvus.grpc.IndexParam indexes = 7;
case 7:
if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 58)) {
ptr -= 1;
do {
ptr += 1;
ptr = ctx->ParseMessage(add_indexes(), ptr);
CHK_(ptr);
if (!ctx->DataAvailable(ptr)) break;
} while (::PROTOBUF_NAMESPACE_ID::internal::UnalignedLoad<::PROTOBUF_NAMESPACE_ID::uint8>(ptr) == 58);
} else goto handle_unusual;
continue;
default: {
handle_unusual:
if ((tag & 7) == 4 || tag == 0) {
@ -425,6 +448,17 @@ bool Collection::MergePartialFromCodedStream(
break;
}
// repeated .milvus.grpc.IndexParam indexes = 7;
case 7: {
if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (58 & 0xFF)) {
DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadMessage(
input, add_indexes()));
} else {
goto handle_unusual;
}
break;
}
default: {
handle_unusual:
if (tag == 0) {
@ -499,6 +533,15 @@ void Collection::SerializeWithCachedSizes(
6, this->partition_tags(i), output);
}
// repeated .milvus.grpc.IndexParam indexes = 7;
for (unsigned int i = 0,
n = static_cast<unsigned int>(this->indexes_size()); i < n; i++) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteMessageMaybeToArray(
7,
this->indexes(static_cast<int>(i)),
output);
}
if (_internal_metadata_.have_unknown_fields()) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFields(
_internal_metadata_.unknown_fields(), output);
@ -563,6 +606,14 @@ void Collection::SerializeWithCachedSizes(
WriteStringToArray(6, this->partition_tags(i), target);
}
// repeated .milvus.grpc.IndexParam indexes = 7;
for (unsigned int i = 0,
n = static_cast<unsigned int>(this->indexes_size()); i < n; i++) {
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::
InternalWriteMessageToArray(
7, this->indexes(static_cast<int>(i)), target);
}
if (_internal_metadata_.have_unknown_fields()) {
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFieldsToArray(
_internal_metadata_.unknown_fields(), target);
@ -607,6 +658,17 @@ size_t Collection::ByteSizeLong() const {
this->partition_tags(i));
}
// repeated .milvus.grpc.IndexParam indexes = 7;
{
unsigned int count = static_cast<unsigned int>(this->indexes_size());
total_size += 1UL * count;
for (unsigned int i = 0; i < count; i++) {
total_size +=
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::MessageSize(
this->indexes(static_cast<int>(i)));
}
}
// string name = 2;
if (this->name().size() > 0) {
total_size += 1 +
@ -664,6 +726,7 @@ void Collection::MergeFrom(const Collection& from) {
segment_ids_.MergeFrom(from.segment_ids_);
partition_tags_.MergeFrom(from.partition_tags_);
indexes_.MergeFrom(from.indexes_);
if (from.name().size() > 0) {
name_.AssignWithDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), from.name_);
@ -702,6 +765,7 @@ void Collection::InternalSwap(Collection* other) {
_internal_metadata_.Swap(&other->_internal_metadata_);
segment_ids_.InternalSwap(&other->segment_ids_);
partition_tags_.InternalSwap(CastToBase(&other->partition_tags_));
CastToBase(&indexes_)->InternalSwap(CastToBase(&other->indexes_));
name_.Swap(&other->name_, &::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(),
GetArenaNoVirtual());
swap(schema_, other->schema_);

View File

@ -190,6 +190,7 @@ class Collection :
enum : int {
kSegmentIdsFieldNumber = 5,
kPartitionTagsFieldNumber = 6,
kIndexesFieldNumber = 7,
kNameFieldNumber = 2,
kSchemaFieldNumber = 3,
kIdFieldNumber = 1,
@ -223,6 +224,17 @@ class Collection :
const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField<std::string>& partition_tags() const;
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField<std::string>* mutable_partition_tags();
// repeated .milvus.grpc.IndexParam indexes = 7;
int indexes_size() const;
void clear_indexes();
::milvus::grpc::IndexParam* mutable_indexes(int index);
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::grpc::IndexParam >*
mutable_indexes();
const ::milvus::grpc::IndexParam& indexes(int index) const;
::milvus::grpc::IndexParam* add_indexes();
const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::grpc::IndexParam >&
indexes() const;
// string name = 2;
void clear_name();
const std::string& name() const;
@ -260,6 +272,7 @@ class Collection :
::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::uint64 > segment_ids_;
mutable std::atomic<int> _segment_ids_cached_byte_size_;
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField<std::string> partition_tags_;
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::grpc::IndexParam > indexes_;
::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr name_;
::milvus::grpc::Schema* schema_;
::PROTOBUF_NAMESPACE_ID::uint64 id_;
@ -834,6 +847,33 @@ Collection::mutable_partition_tags() {
return &partition_tags_;
}
// repeated .milvus.grpc.IndexParam indexes = 7;
// Returns the number of entries currently held in the repeated `indexes` field.
inline int Collection::indexes_size() const {
return indexes_.size();
}
// Returns a mutable pointer to the `indexes` entry at position `index`.
inline ::milvus::grpc::IndexParam* Collection::mutable_indexes(int index) {
// @@protoc_insertion_point(field_mutable:masterpb.Collection.indexes)
return indexes_.Mutable(index);
}
// Returns a mutable pointer to the whole repeated `indexes` container.
inline ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::grpc::IndexParam >*
Collection::mutable_indexes() {
// @@protoc_insertion_point(field_mutable_list:masterpb.Collection.indexes)
return &indexes_;
}
// Returns a read-only reference to the `indexes` entry at position `index`.
inline const ::milvus::grpc::IndexParam& Collection::indexes(int index) const {
// @@protoc_insertion_point(field_get:masterpb.Collection.indexes)
return indexes_.Get(index);
}
// Appends a new entry to the repeated `indexes` field and returns a pointer to
// it so the caller can fill it in.
inline ::milvus::grpc::IndexParam* Collection::add_indexes() {
// @@protoc_insertion_point(field_add:masterpb.Collection.indexes)
return indexes_.Add();
}
// Returns a read-only reference to the whole repeated `indexes` container.
inline const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::grpc::IndexParam >&
Collection::indexes() const {
// @@protoc_insertion_point(field_list:masterpb.Collection.indexes)
return indexes_;
}
// -------------------------------------------------------------------
// Segment

View File

@ -24,8 +24,10 @@ import (
const SegmentLifetime = 20000
const (
SegmentOpened = 0
SegmentClosed = 1
SegmentOpened = 0
SegmentClosed = 1
SegmentIndexing = 2
SegmentIndexed = 3
)
type Segment struct {
@ -33,6 +35,7 @@ type Segment struct {
SegmentId int64
SegmentCloseTime uint64
LastMemSize uint64
SegmentStatus int
}
func (s *Segment) GetStatus() int {