Fix bug and add meta wrapper

Signed-off-by: shengjh <1572099106@qq.com>
pull/4973/head^2
shengjh 2020-09-19 17:10:23 +08:00 committed by yefu.chen
parent 6bc7e6d372
commit 8c260e8112
67 changed files with 39645 additions and 374 deletions

View File

@ -11,12 +11,12 @@
master:
address: localhost
port: 6000
port: 53100
etcd:
address: localhost
port: 2379
rootpath: suvlim
rootpath: by-dev/
segthreshold: 10000
timesync:

View File

@ -1,20 +0,0 @@
project(sulvim_core)
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
cmake_minimum_required(VERSION 3.16)
set( CMAKE_CXX_STANDARD 17 )
set( CMAKE_CXX_STANDARD_REQUIRED on )
set (CMAKE_MODULE_PATH "${CMAKE_MODULE_PATH};${CMAKE_CURRENT_SOURCE_DIR}/cmake")
include_directories(src)
add_subdirectory(src)
add_subdirectory(unittest)
install(
DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/dog_segment/
DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/include
FILES_MATCHING PATTERN "*_c.h"
)
install(FILES ${CMAKE_BINARY_DIR}/src/dog_segment/libmilvus_dog_segment.so
DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/lib)

View File

@ -73,6 +73,9 @@ BuildIndex(CSegmentBase c_segment);
bool
IsOpened(CSegmentBase c_segment);
long int
GetMemoryUsageInBytes(CSegmentBase c_segment);
//////////////////////////////////////////////////////////////////
long int

View File

@ -1,4 +0,0 @@
add_subdirectory(utils)
add_subdirectory(dog_segment)
#add_subdirectory(index)
add_subdirectory(query)

View File

@ -1,3 +1,5 @@
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/../pb PB_SRC_FILES)
set(DOG_SEGMENT_FILES
SegmentNaive.cpp
IndexMeta.cpp
@ -7,10 +9,13 @@ set(DOG_SEGMENT_FILES
collection_c.cpp
partition_c.cpp
segment_c.cpp
${PB_SRC_FILES}
)
add_library(milvus_dog_segment SHARED
${DOG_SEGMENT_FILES}
)
#add_dependencies( segment sqlite mysqlpp )
target_link_libraries(milvus_dog_segment tbb utils pthread knowhere log)
target_link_libraries(milvus_dog_segment tbb utils pthread knowhere log libprotobuf)

View File

@ -1,5 +1,7 @@
#include "Collection.h"
#include "pb/master.pb.h"
//using Collection = masterpb::Collection;
#include <google/protobuf/text_format.h>
namespace milvus::dog_segment {
Collection::Collection(std::string &collection_name, std::string &schema):
@ -12,10 +14,32 @@ Collection::set_index() {}
void
Collection::parse() {
if(schema_json_ == "") {
auto schema = std::make_shared<Schema>();
schema->AddField("fakevec", DataType::VECTOR_FLOAT, 16);
schema->AddField("age", DataType::INT32);
schema_ = schema;
return;
}
masterpb::Collection collection;
auto suc = google::protobuf::TextFormat::ParseFromString(schema_json_, &collection);
if (!suc) {
std::cerr << "unmarshal failed" << std::endl;
}
auto schema = std::make_shared<Schema>();
for (const milvus::grpc::FieldMeta & child: collection.schema().field_metas()){
std::cout<<"add Field, name :" << child.field_name() << std::endl;
schema->AddField(std::string_view(child.field_name()), DataType {child.type()}, int(child.dim()));
}
/*
schema->AddField("fakevec", DataType::VECTOR_FLOAT, 16);
schema->AddField("age", DataType::INT32);
*/
schema_ = schema;
}
}

View File

@ -177,6 +177,13 @@ IsOpened(CSegmentBase c_segment) {
return status == milvus::dog_segment::SegmentBase::SegmentState::Open;
}
long int
GetMemoryUsageInBytes(CSegmentBase c_segment) {
auto segment = (milvus::dog_segment::SegmentBase*)c_segment;
auto mem_size = segment->GetMemoryUsageInBytes();
return mem_size;
}
//////////////////////////////////////////////////////////////////
long int

View File

@ -73,6 +73,9 @@ BuildIndex(CSegmentBase c_segment);
bool
IsOpened(CSegmentBase c_segment);
long int
GetMemoryUsageInBytes(CSegmentBase c_segment);
//////////////////////////////////////////////////////////////////
long int

1654
core/src/pb/master.pb.cc Normal file

File diff suppressed because it is too large Load Diff

1088
core/src/pb/master.pb.h Normal file

File diff suppressed because it is too large Load Diff

19734
core/src/pb/message.pb.cc Normal file

File diff suppressed because it is too large Load Diff

13531
core/src/pb/message.pb.h Normal file

File diff suppressed because it is too large Load Diff

View File

@ -41,6 +41,7 @@ using ResultIds = std::vector<idx_t>;
using ResultDistances = std::vector<distance_t>;
///////////////////////////////////////////////////////////////////////////////////////////////////
enum class DataType {
NONE = 0,
BOOL = 1,

View File

@ -45,4 +45,6 @@ endif()
# ****************************** Thirdparty opentracing ***************************************
if ( MILVUS_WITH_OPENTRACING )
add_subdirectory( opentracing )
endif()
endif()
add_subdirectory( protobuf )

66
core/thirdparty/protobuf/CMakeLists.txt vendored Normal file
View File

@ -0,0 +1,66 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
if ( DEFINED ENV{MILVUS_GTEST_URL} )
set( GTEST_SOURCE_URL "$ENV{MILVUS_GTEST_URL}" )
else()
set( GTEST_SOURCE_URL
"https://github.com/protocolbuffers/protobuf/releases/download/v3.9.0/protobuf-cpp-3.9.0.zip")
endif()
message( STATUS "Building protobuf-cpp-3.9.0 from source" )
FetchContent_Declare(
protobuf
URL ${GTEST_SOURCE_URL}
URL_MD5 "9562b27cc6ac5ebd087f201f1310c885"
DOWNLOAD_DIR ${MILVUS_BINARY_DIR}/3rdparty_download/download
SOURCE_DIR ${CMAKE_CURRENT_BINARY_DIR}/protobuf-src
BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR}/protobuf-build
)
if ( NOT protobuf_POPULATED )
FetchContent_Populate( protobuf )
add_subdirectory(${protobuf_SOURCE_DIR}/cmake
${protobuf_BINARY_DIR}
EXCLUDE_FROM_ALL)
endif()
set(_PROTOBUF_LIBRARY_NAME "libprotobuf")
set(gRPC_PROTOBUF_PROVIDER "module" CACHE STRING "Provider of protobuf library")
set_property(CACHE gRPC_PROTOBUF_PROVIDER PROPERTY STRINGS "module" "package")
# Building the protobuf tests require gmock what is not part of a standard protobuf checkout.
# Disable them unless they are explicitly requested from the cmake command line (when we assume
# gmock is downloaded to the right location inside protobuf).
if(NOT protobuf_BUILD_TESTS)
set(protobuf_BUILD_TESTS OFF CACHE BOOL "Build protobuf tests")
endif()
# Disable building protobuf with zlib. Building protobuf with zlib breaks
# the build if zlib is not installed on the system.
if(NOT protobuf_WITH_ZLIB)
set(protobuf_WITH_ZLIB OFF CACHE BOOL "Build protobuf with zlib.")
endif()
if(NOT PROTOBUF_ROOT_DIR)
#set(PROTOBUF_ROOT_DIR ${CMAKE_CURRENT_SOURCE_DIR}/third_party/protobuf)
set(PROTOBUF_ROOT_DIR ${protobuf_SOURCE_DIR})
endif()

View File

@ -3,4 +3,5 @@ GTEST_VERSION=1.8.1
YAMLCPP_VERSION=0.6.3
ZLIB_VERSION=v1.2.11
OPENTRACING_VERSION=v1.5.1
PROTOBUF_VERSION=3.9.0
# vim: set filetype=sh:

View File

@ -247,6 +247,50 @@ TEST(CApiTest, CloseTest) {
}
TEST(CApiTest, GetMemoryUsageInBytesTest) {
auto collection_name = "collection0";
auto schema_tmp_conf = "null_schema";
auto collection = NewCollection(collection_name, schema_tmp_conf);
auto partition_name = "partition0";
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
std::vector<int64_t> uids;
int N = 10000;
std::default_random_engine e(67);
for (int i = 0; i < N; ++i) {
uids.push_back(100000 + i);
timestamps.push_back(0);
// append vec
float vec[16];
for (auto &x: vec) {
x = e() % 2000 * 0.001 - 1.0;
}
raw_data.insert(raw_data.end(), (const char *) std::begin(vec), (const char *) std::end(vec));
int age = e() % 100;
raw_data.insert(raw_data.end(), (const char *) &age, ((const char *) &age) + sizeof(age));
}
auto line_sizeof = (sizeof(int) + sizeof(float) * 16);
auto offset = PreInsert(segment, N);
auto res = Insert(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), (int) line_sizeof, N);
assert(res == 0);
auto memory_usage_size = GetMemoryUsageInBytes(segment);
assert(memory_usage_size == 1898459);
DeleteCollection(collection);
DeletePartition(partition);
DeleteSegment(segment);
}
namespace {
auto generate_data(int N) {
std::vector<char> raw_data;

View File

@ -3,7 +3,7 @@ package common
import "time"
const (
PULSAR_URL = "pulsar://localhost:16650"
PULSAR_URL = "pulsar://localhost:6650"
PULSAR_MONITER_INTERVAL = 1 * time.Second
PULSAR_TOPIC = "monitor-topic"
ETCD_ROOT_PATH = "by-dev"

View File

@ -34,6 +34,11 @@ func NewEtcdKVBase(client *clientv3.Client, rootPath string) *EtcdKVBase {
}
}
func (kv *EtcdKVBase) Close(){
kv.client.Close()
}
func (kv *EtcdKVBase) LoadWithPrefix(key string) ( []string, []string) {
key = path.Join(kv.rootPath, key)
println("in loadWithPrefix,", key)

View File

@ -184,7 +184,6 @@ func CollectionController(ch chan *messagepb.Mapping) {
if err != nil {
log.Fatal(err)
}
}
}

View File

@ -80,7 +80,7 @@ set( SERVER_LIBS server )
add_executable( milvus_server ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp
)
target_include_directories(server PUBLIC ${PROJECT_BINARY_DIR}/thirdparty/pulsar-client-cpp/pulsar-client-cpp-src/pulsar-client-cpp/include)
target_include_directories(server PRIVATE ${PROJECT_BINARY_DIR}/thirdparty)
target_link_libraries( milvus_server PRIVATE ${SERVER_LIBS} )

View File

@ -82,7 +82,7 @@ ConfigMgr::ConfigMgr() {
{"network.port", CreateIntegerConfig("network.port", false, 0, 65535, &config.network.port.value,
19530, nullptr, nullptr)},
/* pulsar */
{"pulsar.address", CreateStringConfig("pulsar.address", false, &config.pulsar.address.value,
"localhost", nullptr, nullptr)},
@ -94,6 +94,17 @@ ConfigMgr::ConfigMgr() {
{"master.port", CreateIntegerConfig("master.port", false, 0, 65535, &config.master.port.value,
6000, nullptr, nullptr)},
/* etcd */
{"etcd.address", CreateStringConfig("etcd.address", false, &config.etcd.address.value, "localhost", nullptr,
nullptr)},
{"etcd.port", CreateIntegerConfig("etcd.port", false, 0, 65535, &config.etcd.port.value,
6000,nullptr, nullptr)},
{"etcd.rootpath", CreateStringConfig("etcd.rootpath", false, &config.etcd.rootpath.value, "by-dev", nullptr,
nullptr)},
/* time sync */
{"timesync.interval", CreateIntegerConfig("timesync.interval", false, 0, std::numeric_limits<int64_t >::max(), &config.timesync.interval.value, 10,
nullptr, nullptr)},
/* log */
{"logs.level", CreateStringConfig("logs.level", false, &config.logs.level.value, "debug", nullptr, nullptr)},
@ -146,6 +157,9 @@ ConfigMgr::Load(const std::string& path) {
// auto proxy_yaml = yaml["porxy"];
auto other_yaml = YAML::Node{};
other_yaml["pulsar"] = yaml["pulsar"];
other_yaml["master"] = yaml["master"];
other_yaml["etcd"] = yaml["etcd"];
other_yaml["timesync"] = yaml["timesync"];
Flatten(yaml["proxy"], flattened, "");
Flatten(other_yaml, flattened, "");
// Flatten(yaml["proxy"], flattened, "");

View File

@ -82,6 +82,15 @@ struct ServerConfig {
Integer port{6000};
}master;
struct Etcd{
String address{"localhost"};
Integer port{2379};
String rootpath{"by-dev"};
}etcd;
struct TimeSync{
Integer interval{10};
}timesync;
struct Engine {
Integer build_index_threshold{4096};

View File

@ -66,5 +66,66 @@ Watch::Service::~Service() {
}
static const char* KV_method_names[] = {
"/etcdserverpb.KV/Range",
};
std::unique_ptr< KV::Stub> KV::NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options) {
(void)options;
std::unique_ptr< KV::Stub> stub(new KV::Stub(channel));
return stub;
}
KV::Stub::Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel)
: channel_(channel), rpcmethod_Range_(KV_method_names[0], ::grpc::internal::RpcMethod::NORMAL_RPC, channel)
{}
::grpc::Status KV::Stub::Range(::grpc::ClientContext* context, const ::etcdserverpb::RangeRequest& request, ::etcdserverpb::RangeResponse* response) {
return ::grpc::internal::BlockingUnaryCall(channel_.get(), rpcmethod_Range_, context, request, response);
}
void KV::Stub::experimental_async::Range(::grpc::ClientContext* context, const ::etcdserverpb::RangeRequest* request, ::etcdserverpb::RangeResponse* response, std::function<void(::grpc::Status)> f) {
::grpc_impl::internal::CallbackUnaryCall(stub_->channel_.get(), stub_->rpcmethod_Range_, context, request, response, std::move(f));
}
void KV::Stub::experimental_async::Range(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::etcdserverpb::RangeResponse* response, std::function<void(::grpc::Status)> f) {
::grpc_impl::internal::CallbackUnaryCall(stub_->channel_.get(), stub_->rpcmethod_Range_, context, request, response, std::move(f));
}
void KV::Stub::experimental_async::Range(::grpc::ClientContext* context, const ::etcdserverpb::RangeRequest* request, ::etcdserverpb::RangeResponse* response, ::grpc::experimental::ClientUnaryReactor* reactor) {
::grpc_impl::internal::ClientCallbackUnaryFactory::Create(stub_->channel_.get(), stub_->rpcmethod_Range_, context, request, response, reactor);
}
void KV::Stub::experimental_async::Range(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::etcdserverpb::RangeResponse* response, ::grpc::experimental::ClientUnaryReactor* reactor) {
::grpc_impl::internal::ClientCallbackUnaryFactory::Create(stub_->channel_.get(), stub_->rpcmethod_Range_, context, request, response, reactor);
}
::grpc::ClientAsyncResponseReader< ::etcdserverpb::RangeResponse>* KV::Stub::AsyncRangeRaw(::grpc::ClientContext* context, const ::etcdserverpb::RangeRequest& request, ::grpc::CompletionQueue* cq) {
return ::grpc_impl::internal::ClientAsyncResponseReaderFactory< ::etcdserverpb::RangeResponse>::Create(channel_.get(), cq, rpcmethod_Range_, context, request, true);
}
::grpc::ClientAsyncResponseReader< ::etcdserverpb::RangeResponse>* KV::Stub::PrepareAsyncRangeRaw(::grpc::ClientContext* context, const ::etcdserverpb::RangeRequest& request, ::grpc::CompletionQueue* cq) {
return ::grpc_impl::internal::ClientAsyncResponseReaderFactory< ::etcdserverpb::RangeResponse>::Create(channel_.get(), cq, rpcmethod_Range_, context, request, false);
}
KV::Service::Service() {
AddMethod(new ::grpc::internal::RpcServiceMethod(
KV_method_names[0],
::grpc::internal::RpcMethod::NORMAL_RPC,
new ::grpc::internal::RpcMethodHandler< KV::Service, ::etcdserverpb::RangeRequest, ::etcdserverpb::RangeResponse>(
std::mem_fn(&KV::Service::Range), this)));
}
KV::Service::~Service() {
}
::grpc::Status KV::Service::Range(::grpc::ServerContext* context, const ::etcdserverpb::RangeRequest* request, ::etcdserverpb::RangeResponse* response) {
(void) context;
(void) request;
(void) response;
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
} // namespace etcdserverpb

View File

@ -229,6 +229,217 @@ class Watch final {
typedef Service StreamedService;
};
class KV final {
public:
static constexpr char const* service_full_name() {
return "etcdserverpb.KV";
}
class StubInterface {
public:
virtual ~StubInterface() {}
// Range gets the keys in the range from the key-value store.
virtual ::grpc::Status Range(::grpc::ClientContext* context, const ::etcdserverpb::RangeRequest& request, ::etcdserverpb::RangeResponse* response) = 0;
std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::etcdserverpb::RangeResponse>> AsyncRange(::grpc::ClientContext* context, const ::etcdserverpb::RangeRequest& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::etcdserverpb::RangeResponse>>(AsyncRangeRaw(context, request, cq));
}
std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::etcdserverpb::RangeResponse>> PrepareAsyncRange(::grpc::ClientContext* context, const ::etcdserverpb::RangeRequest& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::etcdserverpb::RangeResponse>>(PrepareAsyncRangeRaw(context, request, cq));
}
class experimental_async_interface {
public:
virtual ~experimental_async_interface() {}
// Range gets the keys in the range from the key-value store.
virtual void Range(::grpc::ClientContext* context, const ::etcdserverpb::RangeRequest* request, ::etcdserverpb::RangeResponse* response, std::function<void(::grpc::Status)>) = 0;
virtual void Range(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::etcdserverpb::RangeResponse* response, std::function<void(::grpc::Status)>) = 0;
virtual void Range(::grpc::ClientContext* context, const ::etcdserverpb::RangeRequest* request, ::etcdserverpb::RangeResponse* response, ::grpc::experimental::ClientUnaryReactor* reactor) = 0;
virtual void Range(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::etcdserverpb::RangeResponse* response, ::grpc::experimental::ClientUnaryReactor* reactor) = 0;
};
virtual class experimental_async_interface* experimental_async() { return nullptr; }
private:
virtual ::grpc::ClientAsyncResponseReaderInterface< ::etcdserverpb::RangeResponse>* AsyncRangeRaw(::grpc::ClientContext* context, const ::etcdserverpb::RangeRequest& request, ::grpc::CompletionQueue* cq) = 0;
virtual ::grpc::ClientAsyncResponseReaderInterface< ::etcdserverpb::RangeResponse>* PrepareAsyncRangeRaw(::grpc::ClientContext* context, const ::etcdserverpb::RangeRequest& request, ::grpc::CompletionQueue* cq) = 0;
};
class Stub final : public StubInterface {
public:
Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);
::grpc::Status Range(::grpc::ClientContext* context, const ::etcdserverpb::RangeRequest& request, ::etcdserverpb::RangeResponse* response) override;
std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::etcdserverpb::RangeResponse>> AsyncRange(::grpc::ClientContext* context, const ::etcdserverpb::RangeRequest& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::etcdserverpb::RangeResponse>>(AsyncRangeRaw(context, request, cq));
}
std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::etcdserverpb::RangeResponse>> PrepareAsyncRange(::grpc::ClientContext* context, const ::etcdserverpb::RangeRequest& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::etcdserverpb::RangeResponse>>(PrepareAsyncRangeRaw(context, request, cq));
}
class experimental_async final :
public StubInterface::experimental_async_interface {
public:
void Range(::grpc::ClientContext* context, const ::etcdserverpb::RangeRequest* request, ::etcdserverpb::RangeResponse* response, std::function<void(::grpc::Status)>) override;
void Range(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::etcdserverpb::RangeResponse* response, std::function<void(::grpc::Status)>) override;
void Range(::grpc::ClientContext* context, const ::etcdserverpb::RangeRequest* request, ::etcdserverpb::RangeResponse* response, ::grpc::experimental::ClientUnaryReactor* reactor) override;
void Range(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::etcdserverpb::RangeResponse* response, ::grpc::experimental::ClientUnaryReactor* reactor) override;
private:
friend class Stub;
explicit experimental_async(Stub* stub): stub_(stub) { }
Stub* stub() { return stub_; }
Stub* stub_;
};
class experimental_async_interface* experimental_async() override { return &async_stub_; }
private:
std::shared_ptr< ::grpc::ChannelInterface> channel_;
class experimental_async async_stub_{this};
::grpc::ClientAsyncResponseReader< ::etcdserverpb::RangeResponse>* AsyncRangeRaw(::grpc::ClientContext* context, const ::etcdserverpb::RangeRequest& request, ::grpc::CompletionQueue* cq) override;
::grpc::ClientAsyncResponseReader< ::etcdserverpb::RangeResponse>* PrepareAsyncRangeRaw(::grpc::ClientContext* context, const ::etcdserverpb::RangeRequest& request, ::grpc::CompletionQueue* cq) override;
const ::grpc::internal::RpcMethod rpcmethod_Range_;
};
static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions());
class Service : public ::grpc::Service {
public:
Service();
virtual ~Service();
// Range gets the keys in the range from the key-value store.
virtual ::grpc::Status Range(::grpc::ServerContext* context, const ::etcdserverpb::RangeRequest* request, ::etcdserverpb::RangeResponse* response);
};
template <class BaseClass>
class WithAsyncMethod_Range : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
WithAsyncMethod_Range() {
::grpc::Service::MarkMethodAsync(0);
}
~WithAsyncMethod_Range() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status Range(::grpc::ServerContext* /*context*/, const ::etcdserverpb::RangeRequest* /*request*/, ::etcdserverpb::RangeResponse* /*response*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
void RequestRange(::grpc::ServerContext* context, ::etcdserverpb::RangeRequest* request, ::grpc::ServerAsyncResponseWriter< ::etcdserverpb::RangeResponse>* response, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) {
::grpc::Service::RequestAsyncUnary(0, context, request, response, new_call_cq, notification_cq, tag);
}
};
typedef WithAsyncMethod_Range<Service > AsyncService;
template <class BaseClass>
class ExperimentalWithCallbackMethod_Range : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
ExperimentalWithCallbackMethod_Range() {
::grpc::Service::experimental().MarkMethodCallback(0,
new ::grpc_impl::internal::CallbackUnaryHandler< ::etcdserverpb::RangeRequest, ::etcdserverpb::RangeResponse>(
[this](::grpc::ServerContext* context,
const ::etcdserverpb::RangeRequest* request,
::etcdserverpb::RangeResponse* response,
::grpc::experimental::ServerCallbackRpcController* controller) {
return this->Range(context, request, response, controller);
}));
}
void SetMessageAllocatorFor_Range(
::grpc::experimental::MessageAllocator< ::etcdserverpb::RangeRequest, ::etcdserverpb::RangeResponse>* allocator) {
static_cast<::grpc_impl::internal::CallbackUnaryHandler< ::etcdserverpb::RangeRequest, ::etcdserverpb::RangeResponse>*>(
::grpc::Service::experimental().GetHandler(0))
->SetMessageAllocator(allocator);
}
~ExperimentalWithCallbackMethod_Range() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status Range(::grpc::ServerContext* /*context*/, const ::etcdserverpb::RangeRequest* /*request*/, ::etcdserverpb::RangeResponse* /*response*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
virtual void Range(::grpc::ServerContext* /*context*/, const ::etcdserverpb::RangeRequest* /*request*/, ::etcdserverpb::RangeResponse* /*response*/, ::grpc::experimental::ServerCallbackRpcController* controller) { controller->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); }
};
typedef ExperimentalWithCallbackMethod_Range<Service > ExperimentalCallbackService;
template <class BaseClass>
class WithGenericMethod_Range : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
WithGenericMethod_Range() {
::grpc::Service::MarkMethodGeneric(0);
}
~WithGenericMethod_Range() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status Range(::grpc::ServerContext* /*context*/, const ::etcdserverpb::RangeRequest* /*request*/, ::etcdserverpb::RangeResponse* /*response*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
};
template <class BaseClass>
class WithRawMethod_Range : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
WithRawMethod_Range() {
::grpc::Service::MarkMethodRaw(0);
}
~WithRawMethod_Range() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status Range(::grpc::ServerContext* /*context*/, const ::etcdserverpb::RangeRequest* /*request*/, ::etcdserverpb::RangeResponse* /*response*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
void RequestRange(::grpc::ServerContext* context, ::grpc::ByteBuffer* request, ::grpc::ServerAsyncResponseWriter< ::grpc::ByteBuffer>* response, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) {
::grpc::Service::RequestAsyncUnary(0, context, request, response, new_call_cq, notification_cq, tag);
}
};
template <class BaseClass>
class ExperimentalWithRawCallbackMethod_Range : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
ExperimentalWithRawCallbackMethod_Range() {
::grpc::Service::experimental().MarkMethodRawCallback(0,
new ::grpc_impl::internal::CallbackUnaryHandler< ::grpc::ByteBuffer, ::grpc::ByteBuffer>(
[this](::grpc::ServerContext* context,
const ::grpc::ByteBuffer* request,
::grpc::ByteBuffer* response,
::grpc::experimental::ServerCallbackRpcController* controller) {
this->Range(context, request, response, controller);
}));
}
~ExperimentalWithRawCallbackMethod_Range() override {
BaseClassMustBeDerivedFromService(this);
}
// disable synchronous version of this method
::grpc::Status Range(::grpc::ServerContext* /*context*/, const ::etcdserverpb::RangeRequest* /*request*/, ::etcdserverpb::RangeResponse* /*response*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
virtual void Range(::grpc::ServerContext* /*context*/, const ::grpc::ByteBuffer* /*request*/, ::grpc::ByteBuffer* /*response*/, ::grpc::experimental::ServerCallbackRpcController* controller) { controller->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); }
};
template <class BaseClass>
class WithStreamedUnaryMethod_Range : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service* /*service*/) {}
public:
WithStreamedUnaryMethod_Range() {
::grpc::Service::MarkMethodStreamed(0,
new ::grpc::internal::StreamedUnaryHandler< ::etcdserverpb::RangeRequest, ::etcdserverpb::RangeResponse>(std::bind(&WithStreamedUnaryMethod_Range<BaseClass>::StreamedRange, this, std::placeholders::_1, std::placeholders::_2)));
}
~WithStreamedUnaryMethod_Range() override {
BaseClassMustBeDerivedFromService(this);
}
// disable regular version of this method
::grpc::Status Range(::grpc::ServerContext* /*context*/, const ::etcdserverpb::RangeRequest* /*request*/, ::etcdserverpb::RangeResponse* /*response*/) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
// replace default version of method with streamed unary
virtual ::grpc::Status StreamedRange(::grpc::ServerContext* context, ::grpc::ServerUnaryStreamer< ::etcdserverpb::RangeRequest,::etcdserverpb::RangeResponse>* server_unary_streamer) = 0;
};
typedef WithStreamedUnaryMethod_Range<Service > StreamedUnaryService;
typedef Service SplitStreamedService;
typedef WithStreamedUnaryMethod_Range<Service > StreamedService;
};
} // namespace etcdserverpb

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -101,6 +101,7 @@ const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_master_2eproto::offsets[] PROT
PROTOBUF_FIELD_OFFSET(::masterpb::Segment, channel_end_),
PROTOBUF_FIELD_OFFSET(::masterpb::Segment, open_timestamp_),
PROTOBUF_FIELD_OFFSET(::masterpb::Segment, close_timestamp_),
PROTOBUF_FIELD_OFFSET(::masterpb::Segment, collection_name_),
~0u, // no _has_bits_
PROTOBUF_FIELD_OFFSET(::masterpb::SegmentStat, _internal_metadata_),
~0u, // no _extensions_
@ -113,7 +114,7 @@ const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_master_2eproto::offsets[] PROT
static const ::PROTOBUF_NAMESPACE_ID::internal::MigrationSchema schemas[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = {
{ 0, -1, sizeof(::masterpb::Collection)},
{ 11, -1, sizeof(::masterpb::Segment)},
{ 23, -1, sizeof(::masterpb::SegmentStat)},
{ 24, -1, sizeof(::masterpb::SegmentStat)},
};
static ::PROTOBUF_NAMESPACE_ID::Message const * const file_default_instances[] = {
@ -127,16 +128,16 @@ const char descriptor_table_protodef_master_2eproto[] PROTOBUF_SECTION_VARIABLE(
"\215\001\n\nCollection\022\n\n\002id\030\001 \001(\004\022\014\n\004name\030\002 \001(\t"
"\022#\n\006schema\030\003 \001(\0132\023.milvus.grpc.Schema\022\023\n"
"\013create_time\030\004 \001(\004\022\023\n\013segment_ids\030\005 \003(\004\022"
"\026\n\016partition_tags\030\006 \003(\t\"\250\001\n\007Segment\022\022\n\ns"
"\026\n\016partition_tags\030\006 \003(\t\"\301\001\n\007Segment\022\022\n\ns"
"egment_id\030\001 \001(\004\022\025\n\rcollection_id\030\002 \001(\004\022\025"
"\n\rpartition_tag\030\003 \001(\t\022\025\n\rchannel_start\030\004"
" \001(\005\022\023\n\013channel_end\030\005 \001(\005\022\026\n\016open_timest"
"amp\030\006 \001(\004\022\027\n\017close_timestamp\030\007 \001(\004\"K\n\013Se"
"gmentStat\022\022\n\nsegment_id\030\001 \001(\004\022\023\n\013memory_"
"size\030\002 \001(\004\022\023\n\013memory_rate\030\003 \001(\0022I\n\006Maste"
"r\022\?\n\020CreateCollection\022\024.milvus.grpc.Mapp"
"ing\032\023.milvus.grpc.Status\"\000B\010Z\006masterb\006pr"
"oto3"
"amp\030\006 \001(\004\022\027\n\017close_timestamp\030\007 \001(\004\022\027\n\017co"
"llection_name\030\010 \001(\t\"K\n\013SegmentStat\022\022\n\nse"
"gment_id\030\001 \001(\004\022\023\n\013memory_size\030\002 \001(\004\022\023\n\013m"
"emory_rate\030\003 \001(\0022I\n\006Master\022\?\n\020CreateColl"
"ection\022\024.milvus.grpc.Mapping\032\023.milvus.gr"
"pc.Status\"\000B\010Z\006masterb\006proto3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_master_2eproto_deps[1] = {
&::descriptor_table_message_2eproto,
@ -149,7 +150,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_mas
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_master_2eproto_once;
static bool descriptor_table_master_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_master_2eproto = {
&descriptor_table_master_2eproto_initialized, descriptor_table_protodef_master_2eproto, "master.proto", 524,
&descriptor_table_master_2eproto_initialized, descriptor_table_protodef_master_2eproto, "master.proto", 549,
&descriptor_table_master_2eproto_once, descriptor_table_master_2eproto_sccs, descriptor_table_master_2eproto_deps, 3, 1,
schemas, file_default_instances, TableStruct_master_2eproto::offsets,
file_level_metadata_master_2eproto, 3, file_level_enum_descriptors_master_2eproto, file_level_service_descriptors_master_2eproto,
@ -734,6 +735,10 @@ Segment::Segment(const Segment& from)
if (!from.partition_tag().empty()) {
partition_tag_.AssignWithDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), from.partition_tag_);
}
collection_name_.UnsafeSetDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited());
if (!from.collection_name().empty()) {
collection_name_.AssignWithDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), from.collection_name_);
}
::memcpy(&segment_id_, &from.segment_id_,
static_cast<size_t>(reinterpret_cast<char*>(&close_timestamp_) -
reinterpret_cast<char*>(&segment_id_)) + sizeof(close_timestamp_));
@ -743,6 +748,7 @@ Segment::Segment(const Segment& from)
void Segment::SharedCtor() {
::PROTOBUF_NAMESPACE_ID::internal::InitSCC(&scc_info_Segment_master_2eproto.base);
partition_tag_.UnsafeSetDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited());
collection_name_.UnsafeSetDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited());
::memset(&segment_id_, 0, static_cast<size_t>(
reinterpret_cast<char*>(&close_timestamp_) -
reinterpret_cast<char*>(&segment_id_)) + sizeof(close_timestamp_));
@ -755,6 +761,7 @@ Segment::~Segment() {
void Segment::SharedDtor() {
partition_tag_.DestroyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited());
collection_name_.DestroyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited());
}
void Segment::SetCachedSize(int size) const {
@ -773,6 +780,7 @@ void Segment::Clear() {
(void) cached_has_bits;
partition_tag_.ClearToEmptyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited());
collection_name_.ClearToEmptyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited());
::memset(&segment_id_, 0, static_cast<size_t>(
reinterpret_cast<char*>(&close_timestamp_) -
reinterpret_cast<char*>(&segment_id_)) + sizeof(close_timestamp_));
@ -836,6 +844,13 @@ const char* Segment::_InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::in
CHK_(ptr);
} else goto handle_unusual;
continue;
// string collection_name = 8;
case 8:
if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 66)) {
ptr = ::PROTOBUF_NAMESPACE_ID::internal::InlineGreedyStringParserUTF8(mutable_collection_name(), ptr, ctx, "masterpb.Segment.collection_name");
CHK_(ptr);
} else goto handle_unusual;
continue;
default: {
handle_unusual:
if ((tag & 7) == 4 || tag == 0) {
@ -959,6 +974,21 @@ bool Segment::MergePartialFromCodedStream(
break;
}
// string collection_name = 8;
case 8: {
if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (66 & 0xFF)) {
DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadString(
input, this->mutable_collection_name()));
DO_(::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::VerifyUtf8String(
this->collection_name().data(), static_cast<int>(this->collection_name().length()),
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::PARSE,
"masterpb.Segment.collection_name"));
} else {
goto handle_unusual;
}
break;
}
default: {
handle_unusual:
if (tag == 0) {
@ -1026,6 +1056,16 @@ void Segment::SerializeWithCachedSizes(
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteUInt64(7, this->close_timestamp(), output);
}
// string collection_name = 8;
if (this->collection_name().size() > 0) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::VerifyUtf8String(
this->collection_name().data(), static_cast<int>(this->collection_name().length()),
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::SERIALIZE,
"masterpb.Segment.collection_name");
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteStringMaybeAliased(
8, this->collection_name(), output);
}
if (_internal_metadata_.have_unknown_fields()) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFields(
_internal_metadata_.unknown_fields(), output);
@ -1080,6 +1120,17 @@ void Segment::SerializeWithCachedSizes(
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteUInt64ToArray(7, this->close_timestamp(), target);
}
// string collection_name = 8;
if (this->collection_name().size() > 0) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::VerifyUtf8String(
this->collection_name().data(), static_cast<int>(this->collection_name().length()),
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::SERIALIZE,
"masterpb.Segment.collection_name");
target =
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteStringToArray(
8, this->collection_name(), target);
}
if (_internal_metadata_.have_unknown_fields()) {
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFieldsToArray(
_internal_metadata_.unknown_fields(), target);
@ -1108,6 +1159,13 @@ size_t Segment::ByteSizeLong() const {
this->partition_tag());
}
// string collection_name = 8;
if (this->collection_name().size() > 0) {
total_size += 1 +
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::StringSize(
this->collection_name());
}
// uint64 segment_id = 1;
if (this->segment_id() != 0) {
total_size += 1 +
@ -1181,6 +1239,10 @@ void Segment::MergeFrom(const Segment& from) {
partition_tag_.AssignWithDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), from.partition_tag_);
}
if (from.collection_name().size() > 0) {
collection_name_.AssignWithDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), from.collection_name_);
}
if (from.segment_id() != 0) {
set_segment_id(from.segment_id());
}
@ -1224,6 +1286,8 @@ void Segment::InternalSwap(Segment* other) {
_internal_metadata_.Swap(&other->_internal_metadata_);
partition_tag_.Swap(&other->partition_tag_, &::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(),
GetArenaNoVirtual());
collection_name_.Swap(&other->collection_name_, &::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(),
GetArenaNoVirtual());
swap(segment_id_, other->segment_id_);
swap(collection_id_, other->collection_id_);
swap(channel_start_, other->channel_start_);

View File

@ -383,6 +383,7 @@ class Segment :
enum : int {
kPartitionTagFieldNumber = 3,
kCollectionNameFieldNumber = 8,
kSegmentIdFieldNumber = 1,
kCollectionIdFieldNumber = 2,
kChannelStartFieldNumber = 4,
@ -401,6 +402,17 @@ class Segment :
std::string* release_partition_tag();
void set_allocated_partition_tag(std::string* partition_tag);
// string collection_name = 8;
void clear_collection_name();
const std::string& collection_name() const;
void set_collection_name(const std::string& value);
void set_collection_name(std::string&& value);
void set_collection_name(const char* value);
void set_collection_name(const char* value, size_t size);
std::string* mutable_collection_name();
std::string* release_collection_name();
void set_allocated_collection_name(std::string* collection_name);
// uint64 segment_id = 1;
void clear_segment_id();
::PROTOBUF_NAMESPACE_ID::uint64 segment_id() const;
@ -437,6 +449,7 @@ class Segment :
::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_;
::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr partition_tag_;
::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr collection_name_;
::PROTOBUF_NAMESPACE_ID::uint64 segment_id_;
::PROTOBUF_NAMESPACE_ID::uint64 collection_id_;
::PROTOBUF_NAMESPACE_ID::int32 channel_start_;
@ -960,6 +973,57 @@ inline void Segment::set_close_timestamp(::PROTOBUF_NAMESPACE_ID::uint64 value)
// @@protoc_insertion_point(field_set:masterpb.Segment.close_timestamp)
}
// string collection_name = 8;
inline void Segment::clear_collection_name() {
collection_name_.ClearToEmptyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited());
}
inline const std::string& Segment::collection_name() const {
// @@protoc_insertion_point(field_get:masterpb.Segment.collection_name)
return collection_name_.GetNoArena();
}
inline void Segment::set_collection_name(const std::string& value) {
collection_name_.SetNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), value);
// @@protoc_insertion_point(field_set:masterpb.Segment.collection_name)
}
inline void Segment::set_collection_name(std::string&& value) {
collection_name_.SetNoArena(
&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), ::std::move(value));
// @@protoc_insertion_point(field_set_rvalue:masterpb.Segment.collection_name)
}
inline void Segment::set_collection_name(const char* value) {
GOOGLE_DCHECK(value != nullptr);
collection_name_.SetNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), ::std::string(value));
// @@protoc_insertion_point(field_set_char:masterpb.Segment.collection_name)
}
inline void Segment::set_collection_name(const char* value, size_t size) {
collection_name_.SetNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(),
::std::string(reinterpret_cast<const char*>(value), size));
// @@protoc_insertion_point(field_set_pointer:masterpb.Segment.collection_name)
}
inline std::string* Segment::mutable_collection_name() {
// @@protoc_insertion_point(field_mutable:masterpb.Segment.collection_name)
return collection_name_.MutableNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited());
}
inline std::string* Segment::release_collection_name() {
// @@protoc_insertion_point(field_release:masterpb.Segment.collection_name)
return collection_name_.ReleaseNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited());
}
inline void Segment::set_allocated_collection_name(std::string* collection_name) {
if (collection_name != nullptr) {
} else {
}
collection_name_.SetAllocatedNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), collection_name);
// @@protoc_insertion_point(field_set_allocated:masterpb.Segment.collection_name)
}
// -------------------------------------------------------------------
// SegmentStat

View File

@ -11,6 +11,90 @@ service Watch {
}
}
service KV {
// Range gets the keys in the range from the key-value store.
rpc Range(RangeRequest) returns (RangeResponse) {
}
}
message RangeRequest {
enum SortOrder {
NONE = 0; // default, no sorting
ASCEND = 1; // lowest target value first
DESCEND = 2; // highest target value first
}
enum SortTarget {
KEY = 0;
VERSION = 1;
CREATE = 2;
MOD = 3;
VALUE = 4;
}
// key is the first key for the range. If range_end is not given, the request only looks up key.
bytes key = 1;
// range_end is the upper bound on the requested range [key, range_end).
// If range_end is '\0', the range is all keys >= key.
// If range_end is key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"),
// then the range request gets all keys prefixed with key.
// If both key and range_end are '\0', then the range request returns all keys.
bytes range_end = 2;
// limit is a limit on the number of keys returned for the request. When limit is set to 0,
// it is treated as no limit.
int64 limit = 3;
// revision is the point-in-time of the key-value store to use for the range.
// If revision is less or equal to zero, the range is over the newest key-value store.
// If the revision has been compacted, ErrCompacted is returned as a response.
int64 revision = 4;
// sort_order is the order for returned sorted results.
SortOrder sort_order = 5;
// sort_target is the key-value field to use for sorting.
SortTarget sort_target = 6;
// serializable sets the range request to use serializable member-local reads.
// Range requests are linearizable by default; linearizable requests have higher
// latency and lower throughput than serializable requests but reflect the current
// consensus of the cluster. For better performance, in exchange for possible stale reads,
// a serializable range request is served locally without needing to reach consensus
// with other nodes in the cluster.
bool serializable = 7;
// keys_only when set returns only the keys and not the values.
bool keys_only = 8;
// count_only when set returns only the count of the keys in the range.
bool count_only = 9;
// min_mod_revision is the lower bound for returned key mod revisions; all keys with
// lesser mod revisions will be filtered away.
int64 min_mod_revision = 10;
// max_mod_revision is the upper bound for returned key mod revisions; all keys with
// greater mod revisions will be filtered away.
int64 max_mod_revision = 11;
// min_create_revision is the lower bound for returned key create revisions; all keys with
// lesser create revisions will be filtered away.
int64 min_create_revision = 12;
// max_create_revision is the upper bound for returned key create revisions; all keys with
// greater create revisions will be filtered away.
int64 max_create_revision = 13;
}
message RangeResponse {
ResponseHeader header = 1;
// kvs is the list of key-value pairs matched by the range request.
// kvs is empty when count is requested.
repeated KeyValue kvs = 2;
// more indicates if there are more keys to return in the requested range.
bool more = 3;
// count is set to the number of keys within the range when requested.
int64 count = 4;
}
message WatchRequest {
// request_union is a request to either create a new watcher or cancel an existing watcher.
oneof request_union {

View File

@ -2,6 +2,7 @@
#include "pulsar/Result.h"
#include "PartitionPolicy.h"
#include "utils/CommonUtil.h"
#include "M3_hash.h"
#include "config/ServerConfig.h"
#include <omp.h>
#include <numeric>
@ -11,7 +12,10 @@ namespace milvus::message_client {
std::map<int64_t, std::vector<std::shared_ptr<grpc::QueryResult>>> total_results;
MsgClientV2::MsgClientV2(int64_t client_id, const std::string &service_url, const uint32_t mut_parallelism, const pulsar::ClientConfiguration &config)
MsgClientV2::MsgClientV2(int64_t client_id,
const std::string &service_url,
const uint32_t mut_parallelism,
const pulsar::ClientConfiguration &config)
: client_id_(client_id), service_url_(service_url), mut_parallelism_(mut_parallelism) {}
Status MsgClientV2::Init(const std::string &insert_delete,
@ -48,13 +52,13 @@ Status MsgClientV2::Init(const std::string &insert_delete,
}
int64_t GetQueryNodeNum() {
return config.query_node_num();
return config.query_node_num();
}
Status
Aggregation(std::vector<std::shared_ptr<grpc::QueryResult>> results, milvus::grpc::QueryResult* result) {
Aggregation(std::vector<std::shared_ptr<grpc::QueryResult>> results, milvus::grpc::QueryResult *result) {
if (results.empty()) {
return Status(DB_ERROR, "The result is null!");
return Status(DB_ERROR, "The result is null!");
}
std::vector<float> all_scores;
@ -64,99 +68,101 @@ Aggregation(std::vector<std::shared_ptr<grpc::QueryResult>> results, milvus::grp
std::vector<grpc::RowData> all_row_data;
std::vector<grpc::KeyValuePair> all_kv_pairs;
grpc::Status status;
int row_num = 0;
grpc::Status status;
int row_num = 0;
for (auto & result_per_node : results) {
if (result_per_node->status().error_code() != grpc::ErrorCode::SUCCESS){
for (auto &result_per_node : results) {
if (result_per_node->status().error_code() != grpc::ErrorCode::SUCCESS) {
// if (one_node_res->status().error_code() != grpc::ErrorCode::SUCCESS ||
// one_node_res->entities().status().error_code() != grpc::ErrorCode::SUCCESS) {
return Status(DB_ERROR, "QueryNode return wrong status!");
}
for (int j = 0; j < result_per_node->distances_size(); j++) {
all_scores.push_back(result_per_node->scores()[j]);
all_distance.push_back(result_per_node->distances()[j]);
return Status(DB_ERROR, "QueryNode return wrong status!");
}
for (int j = 0; j < result_per_node->distances_size(); j++) {
all_scores.push_back(result_per_node->scores()[j]);
all_distance.push_back(result_per_node->distances()[j]);
// all_kv_pairs.push_back(result_per_node->extra_params()[j]);
}
for (int k = 0; k < result_per_node->entities().ids_size(); ++k) {
all_entities_ids.push_back(result_per_node->entities().ids(k));
}
for (int k = 0; k < result_per_node->entities().ids_size(); ++k) {
all_entities_ids.push_back(result_per_node->entities().ids(k));
// all_valid_row.push_back(result_per_node->entities().valid_row(k));
// all_row_data.push_back(result_per_node->entities().rows_data(k));
}
if (result_per_node->row_num() > row_num){
row_num = result_per_node->row_num();
}
status = result_per_node->status();
}
if (result_per_node->row_num() > row_num) {
row_num = result_per_node->row_num();
}
status = result_per_node->status();
}
std::vector<int> index(all_distance.size());
std::vector<int> index(all_distance.size());
iota(index.begin(), index.end(), 0);
iota(index.begin(), index.end(), 0);
std::stable_sort(index.begin(), index.end(),
[&all_distance](size_t i1, size_t i2) {return all_distance[i1] > all_distance[i2];});
std::stable_sort(index.begin(), index.end(),
[&all_distance](size_t i1, size_t i2) { return all_distance[i1] > all_distance[i2]; });
grpc::Entities result_entities;
grpc::Entities result_entities;
for (int m = 0; m < result->row_num(); ++m) {
result->add_scores(all_scores[index[m]]);
result->add_distances(all_distance[index[m]]);
for (int m = 0; m < result->row_num(); ++m) {
result->add_scores(all_scores[index[m]]);
result->add_distances(all_distance[index[m]]);
// result->add_extra_params();
// result->mutable_extra_params(m)->CopyFrom(all_kv_pairs[index[m]]);
result_entities.add_ids(all_entities_ids[index[m]]);
result_entities.add_ids(all_entities_ids[index[m]]);
// result_entities.add_valid_row(all_valid_row[index[m]]);
// result_entities.add_rows_data();
// result_entities.mutable_rows_data(m)->CopyFrom(all_row_data[index[m]]);
}
}
result_entities.mutable_status()->CopyFrom(status);
result_entities.mutable_status()->CopyFrom(status);
result->set_row_num(row_num);
result->mutable_entities()->CopyFrom(result_entities);
result->set_query_id(results[0]->query_id());
result->set_row_num(row_num);
result->mutable_entities()->CopyFrom(result_entities);
result->set_query_id(results[0]->query_id());
// result->set_client_id(results[0]->client_id());
return Status::OK();
}
Status MsgClientV2::GetQueryResult(int64_t query_id, milvus::grpc::QueryResult* result) {
Status MsgClientV2::GetQueryResult(int64_t query_id, milvus::grpc::QueryResult *result) {
int64_t query_node_num = GetQueryNodeNum();
int64_t query_node_num = GetQueryNodeNum();
auto t1 = std::chrono::high_resolution_clock::now();
auto t1 = std::chrono::high_resolution_clock::now();
while (true) {
auto received_result = total_results[query_id];
if (received_result.size() == query_node_num) {
break;
}
Message msg;
consumer_->receive(msg);
grpc::QueryResult search_res_msg;
auto status = search_res_msg.ParseFromString(msg.getDataAsString());
if (status) {
auto message = std::make_shared<grpc::QueryResult>(search_res_msg);
total_results[message->query_id()].push_back(message);
consumer_->acknowledge(msg);
} else {
return Status(DB_ERROR, "can't parse message which from pulsar!");
}
while (true) {
auto received_result = total_results[query_id];
if (received_result.size() == query_node_num) {
break;
}
auto status = Aggregation(total_results[query_id], result);
Message msg;
consumer_->receive(msg);
return status;
grpc::QueryResult search_res_msg;
auto status = search_res_msg.ParseFromString(msg.getDataAsString());
if (status) {
auto message = std::make_shared<grpc::QueryResult>(search_res_msg);
total_results[message->query_id()].push_back(message);
consumer_->acknowledge(msg);
} else {
return Status(DB_ERROR, "can't parse message which from pulsar!");
}
}
auto status = Aggregation(total_results[query_id], result);
return status;
}
Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, uint64_t timestamp) {
Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request,
uint64_t timestamp,
const std::function<uint64_t(const std::string &collection_name,
uint64_t channel_id,
uint64_t timestamp)> &segment_id) {
// may have retry policy?
auto row_count = request.rows_data_size();
// TODO: Get the segment from master
int64_t segment = 0;
auto stats = std::vector<pulsar::Result>(ParallelNum);
auto stats = std::vector<Status>(ParallelNum);
#pragma omp parallel for default(none), shared(row_count, request, timestamp, segment, stats), num_threads(ParallelNum)
#pragma omp parallel for default(none), shared(row_count, request, timestamp, stats, segment_id), num_threads(ParallelNum)
for (auto i = 0; i < row_count; i++) {
milvus::grpc::InsertOrDeleteMsg mut_msg;
int this_thread = omp_get_thread_num();
@ -166,34 +172,48 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, uin
mut_msg.set_timestamp(timestamp);
mut_msg.set_collection_name(request.collection_name());
mut_msg.set_partition_tag(request.partition_tag());
mut_msg.set_segment_id(segment);
mut_msg.mutable_rows_data()->CopyFrom(request.rows_data(i));
mut_msg.mutable_extra_params()->CopyFrom(request.extra_params());
uint64_t uid = request.entity_id_array(i);
auto channel_id = makeHash(&uid, sizeof(uint64_t));
try {
mut_msg.set_segment_id(segment_id(request.collection_name(), channel_id, timestamp));
printf("%ld \n", mut_msg.segment_id());
mut_msg.mutable_rows_data()->CopyFrom(request.rows_data(i));
mut_msg.mutable_extra_params()->CopyFrom(request.extra_params());
auto result = paralle_mut_producers_[this_thread]->send(mut_msg);
if (result != pulsar::ResultOk) {
stats[this_thread] = result;
auto result = paralle_mut_producers_[this_thread]->send(mut_msg);
if (result != pulsar::ResultOk) {
stats[this_thread] = Status(DB_ERROR, pulsar::strResult(result));
}
}
catch (const std::exception &e) {
stats[this_thread] = Status(DB_ERROR, "Meta error");
}
}
for (auto &stat : stats) {
if (stat != pulsar::ResultOk) {
return Status(DB_ERROR, pulsar::strResult(stat));
if (!stat.ok()) {
return stat;
}
}
return Status::OK();
}
Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request, uint64_t timestamp) {
Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request,
uint64_t timestamp,
const std::function<uint64_t(const std::string &collection_name,
uint64_t channel_id,
uint64_t timestamp)> &segment_id) {
auto stats = std::vector<pulsar::Result>(ParallelNum);
#pragma omp parallel for default(none), shared( request, timestamp, stats), num_threads(ParallelNum)
#pragma omp parallel for default(none), shared( request, timestamp, stats, segment_id), num_threads(ParallelNum)
for (auto i = 0; i < request.id_array_size(); i++) {
milvus::grpc::InsertOrDeleteMsg mut_msg;
mut_msg.set_op(milvus::grpc::OpType::DELETE);
mut_msg.set_uid(GetUniqueQId());
mut_msg.set_client_id(client_id_);
mut_msg.set_uid(request.id_array(i));
mut_msg.set_collection_name(request.collection_name());
mut_msg.set_timestamp(timestamp);
uint64_t uid = request.id_array(i);
auto channel_id = makeHash(&uid, sizeof(uint64_t));
mut_msg.set_segment_id(segment_id(request.collection_name(), channel_id, timestamp));
int this_thread = omp_get_thread_num();
auto result = paralle_mut_producers_[this_thread]->send(mut_msg);
@ -210,51 +230,51 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request,
}
Status MsgClientV2::SendQueryMessage(const milvus::grpc::SearchParam &request, uint64_t timestamp, int64_t &query_id) {
milvus::grpc::SearchMsg search_msg;
milvus::grpc::SearchMsg search_msg;
query_id = GetUniqueQId();
search_msg.set_collection_name(request.collection_name());
search_msg.set_uid(query_id);
//TODO: get client id from master
search_msg.set_client_id(1);
search_msg.set_timestamp(timestamp);
search_msg.set_dsl(request.dsl());
query_id = GetUniqueQId();
search_msg.set_collection_name(request.collection_name());
search_msg.set_uid(query_id);
//TODO: get client id from master
search_msg.set_client_id(1);
search_msg.set_timestamp(timestamp);
search_msg.set_dsl(request.dsl());
milvus::grpc::VectorRowRecord vector_row_recode;
std::vector<float> vectors_records;
std::string binary_data;
for (int i = 0; i < request.vector_param_size(); ++i) {
search_msg.add_json(request.vector_param(i).json());
for (int j = 0; j < request.vector_param(i).row_record().records_size(); ++j) {
for (int k = 0; k < request.vector_param(i).row_record().records(j).float_data_size(); ++k) {
vector_row_recode.add_float_data(request.vector_param(i).row_record().records(j).float_data(k));
}
binary_data.append(request.vector_param(i).row_record().records(j).binary_data());
}
milvus::grpc::VectorRowRecord vector_row_recode;
std::vector<float> vectors_records;
std::string binary_data;
for (int i = 0; i < request.vector_param_size(); ++i) {
search_msg.add_json(request.vector_param(i).json());
for (int j = 0; j < request.vector_param(i).row_record().records_size(); ++j) {
for (int k = 0; k < request.vector_param(i).row_record().records(j).float_data_size(); ++k) {
vector_row_recode.add_float_data(request.vector_param(i).row_record().records(j).float_data(k));
}
binary_data.append(request.vector_param(i).row_record().records(j).binary_data());
}
vector_row_recode.set_binary_data(binary_data);
}
vector_row_recode.set_binary_data(binary_data);
search_msg.mutable_records()->CopyFrom(vector_row_recode);
search_msg.mutable_records()->CopyFrom(vector_row_recode);
for (int m = 0; m < request.partition_tag_size(); ++m) {
search_msg.add_partition_tag(request.partition_tag(m));
}
for (int m = 0; m < request.partition_tag_size(); ++m) {
search_msg.add_partition_tag(request.partition_tag(m));
}
for (int l = 0; l < request.extra_params_size(); ++l) {
search_msg.mutable_extra_params(l)->CopyFrom(request.extra_params(l));
}
for (int l = 0; l < request.extra_params_size(); ++l) {
search_msg.mutable_extra_params(l)->CopyFrom(request.extra_params(l));
}
auto result = search_producer_->send(search_msg);
if (result != pulsar::Result::ResultOk) {
return Status(DB_ERROR, pulsar::strResult(result));
}
auto result = search_producer_->send(search_msg);
if (result != pulsar::Result::ResultOk) {
return Status(DB_ERROR, pulsar::strResult(result));
}
return Status::OK();
return Status::OK();
}
MsgClientV2::~MsgClientV2() {
// insert_delete_producer_->close();
for (auto& producer: paralle_mut_producers_){
for (auto &producer: paralle_mut_producers_) {
producer->close();
}
search_producer_->close();

View File

@ -24,9 +24,13 @@ class MsgClientV2 {
const std::string &search_result);
// unpackage batch insert or delete request, and delivery message to pulsar per row
Status SendMutMessage(const milvus::grpc::InsertParam &request, uint64_t timestamp);
Status SendMutMessage(const milvus::grpc::InsertParam &request, uint64_t timestamp, const std::function<uint64_t (const std::string &collection_name,
uint64_t channel_id,
uint64_t timestam)>&);
Status SendMutMessage(const milvus::grpc::DeleteByIDParam &request, uint64_t timestamp);
Status SendMutMessage(const milvus::grpc::DeleteByIDParam &request, uint64_t timestamp, const std::function<uint64_t(const std::string &collection_name,
uint64_t channel_id,
uint64_t timestam) >&);
//
Status SendQueryMessage(const milvus::grpc::SearchParam &request, uint64_t timestamp, int64_t &query_id);

View File

@ -2,8 +2,11 @@ include_directories(${PROJECT_BINARY_DIR}/thirdparty/grpc/grpc-src/third_party/p
include_directories(${PROJECT_BINARY_DIR}/thirdparty/grpc/grpc-src/include)
add_subdirectory( etcd_watcher )
aux_source_directory( ./master master_src)
add_library(meta ${master_src}
aux_source_directory( ./master master_src)
aux_source_directory(./etcd_client etcd_src)
add_library(meta
${master_src}
${etcd_src}
./etcd_watcher/Watcher.cpp
${PROJECT_SOURCE_DIR}/src/grpc/etcd.pb.cc
${PROJECT_SOURCE_DIR}/src/grpc/etcd.grpc.pb.cc

View File

@ -0,0 +1,23 @@
#include "Etcd_client.h"
#include "grpc++/grpc++.h"
namespace milvus{
namespace master{
EtcdClient::EtcdClient(const std::string &addr) {
auto channel = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
stub_ = etcdserverpb::KV::NewStub(channel);
}
Status
EtcdClient::Range(const etcdserverpb::RangeRequest& request, etcdserverpb::RangeResponse& response){
::grpc::ClientContext context;
auto status = stub_->Range(&context, request, &response);
if (!status.ok()){
return Status(DB_ERROR, status.error_message());
}
return Status::OK();
}
}
}

View File

@ -0,0 +1,18 @@
#include "grpc/etcd.grpc.pb.h"
#include "utils/Status.h"
namespace milvus {
namespace master {
class EtcdClient {
public:
explicit EtcdClient(const std::string &addr);
Status
Range(const etcdserverpb::RangeRequest& request, etcdserverpb::RangeResponse& response);
private:
std::unique_ptr<etcdserverpb::KV::Stub> stub_;
};
}
}

View File

@ -30,10 +30,10 @@ class AsyncWatchAction {
void CancelWatch();
private:
// Status status;
grpc::ClientContext context_;
grpc::CompletionQueue cq_;
::grpc::ClientContext context_;
::grpc::CompletionQueue cq_;
etcdserverpb::WatchResponse reply_;
std::unique_ptr<grpc::ClientAsyncReaderWriter<etcdserverpb::WatchRequest, etcdserverpb::WatchResponse>> stream_;
std::unique_ptr<::grpc::ClientAsyncReaderWriter<etcdserverpb::WatchRequest, etcdserverpb::WatchResponse>> stream_;
std::atomic<bool> cancled_ = false;
};
}

View File

@ -20,7 +20,7 @@ Status GrpcClient::CreateCollection(const milvus::grpc::Mapping &mapping) {
::grpc::Status grpc_status = stub_->CreateCollection(&context, mapping, &response);
if (!grpc_status.ok()) {
std::cerr << "CreateHybridCollection gRPC failed!" << std::endl;
std::cerr << "CreateHybridCollection gRPC failed!" << grpc_status.error_message() << std::endl;
return Status(grpc_status.error_code(), grpc_status.error_message());
}

View File

@ -1,20 +1,159 @@
#include "MetaWrapper.h"
#include "config/ServerConfig.h"
namespace milvus{
#include "nlohmann/json.hpp"
#include <mutex>
#include <google/protobuf/text_format.h>
using Collection = masterpb::Collection;
using Schema = milvus::grpc::Schema;
using SegmentInfo = masterpb::Segment;
using JSON = nlohmann::json;
namespace milvus {
namespace server {
MetaWrapper& MetaWrapper::GetInstance() {
namespace {
void ParseSegmentInfo(const std::string &json_str, SegmentInfo &segment_info) {
auto json = JSON::parse(json_str);
segment_info.set_segment_id(json["segment_id"].get<uint64_t>());
segment_info.set_partition_tag(json["partition_tag"].get<std::string>());
segment_info.set_channel_start(json["channel_start"].get<int32_t>());
segment_info.set_channel_end(json["channel_end"].get<int32_t>());
segment_info.set_open_timestamp(json["open_timestamp"].get<uint64_t>());
segment_info.set_close_timestamp(json["close_timestamp"].get<uint64_t>());
segment_info.set_collection_id(json["collection_id"].get<uint64_t>());
segment_info.set_collection_name(json["collection_name"].get<std::string>());
}
void ParseCollectionSchema(const std::string &json_str, Collection &collection) {
auto json = JSON::parse(json_str);
auto proto_str = json["grpc_marshal_string"].get<std::string>();
auto suc = google::protobuf::TextFormat::ParseFromString(proto_str, &collection);
if (!suc) {
std::cerr << "unmarshal failed" << std::endl;
}
}
}
bool MetaWrapper::IsCollectionMetaKey(const std::string &key) {
return key.rfind(collection_path_, 0) == 0;
}
bool MetaWrapper::IsSegmentMetaKey(const std::string &key) {
return key.rfind(segment_path_, 0) == 0;
}
MetaWrapper &MetaWrapper::GetInstance() {
static MetaWrapper wrapper;
return wrapper;
}
Status MetaWrapper::Init() {
auto addr = config.master.address() + ":" + std::to_string(config.master.port());
client_ = std::make_shared<milvus::master::GrpcClient>(addr);
etcd_root_path_ = config.etcd.rootpath();
segment_path_ = etcd_root_path_ + "segment/";
collection_path_ = etcd_root_path_ + "collection/";
auto master_addr = config.master.address() + ":" + std::to_string(config.master.port());
master_client_ = std::make_shared<milvus::master::GrpcClient>(master_addr);
auto etcd_addr = config.etcd.address() + ":" + std::to_string(config.etcd.port());
etcd_client_ = std::make_shared<milvus::master::EtcdClient>(etcd_addr);
// init etcd watcher
auto f = [&](const etcdserverpb::WatchResponse &res) {
UpdateMeta(res);
};
watcher_ = std::make_shared<milvus::master::Watcher>(etcd_addr, segment_path_, f, true);
SyncMeta();
}
std::shared_ptr<milvus::master::GrpcClient> MetaWrapper::MetaClient() {
return client_;
return master_client_;
}
void MetaWrapper::UpdateMeta(const etcdserverpb::WatchResponse &res) {
for (auto &event: res.events()) {
auto &event_key = event.kv().key();
auto &event_value = event.kv().value();
if (event.type() == etcdserverpb::Event_EventType::Event_EventType_PUT) {
if (event_key.rfind(segment_path_, 0) == 0) {
// segment info
SegmentInfo segment_info;
ParseSegmentInfo(event_value, segment_info);
std::unique_lock lock(mutex_);
segment_infos_[segment_info.segment_id()] = segment_info;
lock.unlock();
} else {
// table scheme
Collection collection;
ParseCollectionSchema(event_value, collection);
std::unique_lock lock(mutex_);
schemas_[collection.name()] = collection;
lock.unlock();
}
}
// TODO: Delete event type
}
}
uint64_t MetaWrapper::AskSegmentId(const std::string &collection_name, uint64_t channel_id, uint64_t timestamp) {
// TODO: may using some multi index data structure to speed up search
// index timestamp: no-unique, seems close timestamp is enough
// index collection_name: no-unique
// index channel_id: must satisfy channel_start <= channel_id < channel_end
std::shared_lock lock(mutex_);
for (auto &item: segment_infos_) {
auto &segment_info = item.second;
uint64_t open_ts = segment_info.open_timestamp();
uint64_t close_ts = segment_info.close_timestamp();
if (channel_id >= segment_info.channel_start() && channel_id < segment_info.channel_end()
&& timestamp >= open_ts << 18 && timestamp < close_ts << 18
&& segment_info.collection_name() == collection_name) {
return segment_info.segment_id();
}
}
throw std::runtime_error("Can't find eligible segment");
}
const Schema &MetaWrapper::AskCollectionSchema(const std::string &collection_name) {
std::shared_lock lock(mutex_);
if (schemas_.find(collection_name) != schemas_.end()) {
return schemas_[collection_name].schema();
}
throw std::runtime_error("Collection " + collection_name + " not existed");
}
Status MetaWrapper::SyncMeta() {
::etcdserverpb::RangeRequest request;
request.set_key(etcd_root_path_);
std::string range_end(etcd_root_path_);
int ascii = (int) range_end[range_end.length() - 1];
range_end.back() = ascii + 1;
request.set_range_end(range_end);
::etcdserverpb::RangeResponse response;
auto status = etcd_client_->Range(request, response);
if (status.ok()) {
for (auto &kv : response.kvs()) {
if (IsCollectionMetaKey(kv.key())) {
Collection collection;
ParseCollectionSchema(kv.value(), collection);
std::unique_lock lock(mutex_);
schemas_[collection.name()] = collection;
lock.unlock();
} else {
assert(IsSegmentMetaKey(kv.key()));
SegmentInfo segment_info;
ParseSegmentInfo(kv.value(), segment_info);
std::unique_lock lock(mutex_);
segment_infos_[segment_info.segment_id()] = segment_info;
lock.unlock();
}
}
}
return status;
}
}

View File

@ -1,12 +1,19 @@
#include "utils/Status.h"
#include "meta/master/GrpcClient.h"
#include "grpc/message.pb.h"
#include "grpc/master.pb.h"
#include "meta/etcd_watcher/Watcher.h"
#include "meta/etcd_client/Etcd_client.h"
#include "config/ServerConfig.h"
namespace milvus{
namespace server{
#include <shared_mutex>
namespace milvus {
namespace server {
class MetaWrapper {
public:
static MetaWrapper&
static MetaWrapper &
GetInstance();
Status
@ -15,10 +22,34 @@ class MetaWrapper {
std::shared_ptr<milvus::master::GrpcClient>
MetaClient();
uint64_t
AskSegmentId(const std::string &collection_name, uint64_t channel_id, uint64_t timestamp);
const milvus::grpc::Schema &
AskCollectionSchema(const std::string &collection_name);
Status
SyncMeta();
private:
std::shared_ptr<milvus::master::GrpcClient> client_;
bool IsCollectionMetaKey(const std::string &key);
bool IsSegmentMetaKey(const std::string &key);
void UpdateMeta(const etcdserverpb::WatchResponse &res);
private:
std::shared_ptr<milvus::master::GrpcClient> master_client_;
std::shared_ptr<milvus::master::EtcdClient> etcd_client_;
std::unordered_map<std::string, masterpb::Collection> schemas_;
std::unordered_map<uint64_t, masterpb::Segment> segment_infos_;
std::shared_ptr<milvus::master::Watcher> watcher_;
std::shared_mutex mutex_;
std::string etcd_root_path_;
std::string segment_path_;
std::string collection_path_;
};
}
}

View File

@ -43,9 +43,8 @@ namespace milvus {
namespace server {
Status
ReqHandler::CreateCollection(const ContextPtr& context, const std::string& collection_name, FieldsType& fields,
milvus::json& json_param) {
BaseReqPtr req_ptr = CreateCollectionReq::Create(context, collection_name, fields, json_param);
ReqHandler::CreateCollection(const ContextPtr& context, const ::milvus::grpc::Mapping *request) {
BaseReqPtr req_ptr = CreateCollectionReq::Create(context, request);
ReqScheduler::ExecReq(req_ptr);
return req_ptr->status();
}
@ -72,9 +71,8 @@ ReqHandler::ListCollections(const ContextPtr& context, std::vector<std::string>&
}
Status
ReqHandler::GetCollectionInfo(const ContextPtr& context, const std::string& collection_name,
CollectionSchema& collection_schema) {
BaseReqPtr req_ptr = GetCollectionInfoReq::Create(context, collection_name, collection_schema);
ReqHandler::GetCollectionInfo(const ContextPtr& context, const ::milvus::grpc::CollectionName *request, ::milvus::grpc::Mapping& response) {
BaseReqPtr req_ptr = GetCollectionInfoReq::Create(context, request, response);
ReqScheduler::ExecReq(req_ptr);
return req_ptr->status();
}

View File

@ -31,8 +31,7 @@ class ReqHandler {
ReqHandler() = default;
Status
CreateCollection(const ContextPtr& context, const std::string& collection_name, FieldsType& fields,
milvus::json& json_params);
CreateCollection(const ContextPtr& context, const ::milvus::grpc::Mapping *request);
Status
DropCollection(const ContextPtr& context, const std::string& collection_name);
@ -44,8 +43,7 @@ class ReqHandler {
ListCollections(const ContextPtr& context, std::vector<std::string>& collections);
Status
GetCollectionInfo(const ContextPtr& context, const std::string& collection_name,
CollectionSchema& collection_schema);
GetCollectionInfo(const ContextPtr& context, const ::milvus::grpc::CollectionName *request, ::milvus::grpc::Mapping& respons);
Status
GetCollectionStats(const ContextPtr& context, const std::string& collection_name, std::string& collection_stats);

View File

@ -160,13 +160,10 @@ ReqScheduler::PutToQueue(const BaseReqPtr& req_ptr) {
int64_t ReqScheduler::GetLatestDeliveredReqTime() {
std::lock_guard lock(time_syc_mtx_);
if (sending_){
return latest_req_time_;
if (!sending_){
latest_req_time_ = TSOracle::GetInstance().GetTimeStamp();
}
auto ts = TSOracle::GetInstance().GetTimeStamp();
latest_req_time_ = ts;
assert(ts != 0);
return ts;
return latest_req_time_;
}
void ReqScheduler::UpdateLatestDeliveredReqTime(int64_t time) {

View File

@ -13,32 +13,30 @@
// #include "db/Utils.h"
#include "server/ValidationUtil.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "server/MetaWrapper.h"
#include <set>
namespace milvus {
namespace server {
CreateCollectionReq::CreateCollectionReq(const ContextPtr& context, const std::string& collection_name,
FieldsType& fields, milvus::json& extra_params)
CreateCollectionReq::CreateCollectionReq(const ContextPtr &context, const ::milvus::grpc::Mapping *request)
: BaseReq(context, ReqType::kCreateCollection),
collection_name_(collection_name),
fields_(fields),
extra_params_(extra_params) {
request_(request) {
}
BaseReqPtr
CreateCollectionReq::Create(const ContextPtr& context, const std::string& collection_name, FieldsType& fields,
milvus::json& extra_params) {
return std::shared_ptr<BaseReq>(new CreateCollectionReq(context, collection_name, fields, extra_params));
CreateCollectionReq::Create(const ContextPtr &context, const ::milvus::grpc::Mapping *request) {
return std::shared_ptr<BaseReq>(new CreateCollectionReq(context, request));
}
Status
CreateCollectionReq::OnExecute() {
return Status::OK();
auto status = MetaWrapper::GetInstance().MetaClient()->CreateCollection(*request_);
if (status.ok()){
status = MetaWrapper::GetInstance().SyncMeta();
}
return status;
}
} // namespace server

View File

@ -22,21 +22,18 @@ namespace server {
class CreateCollectionReq : public BaseReq {
public:
static BaseReqPtr
Create(const ContextPtr& context, const std::string& collection_name, FieldsType& fields,
milvus::json& extra_params);
static BaseReqPtr
Create(const ContextPtr &context, const ::milvus::grpc::Mapping *request);
protected:
CreateCollectionReq(const ContextPtr& context, const std::string& collection_name, FieldsType& fields,
milvus::json& extra_params);
CreateCollectionReq(const ContextPtr &context, const ::milvus::grpc::Mapping *request);
Status
OnExecute() override;
Status
OnExecute() override;
private:
const std::string collection_name_;
std::unordered_map<std::string, FieldSchema> fields_;
milvus::json extra_params_;
const std::string collection_name_;
const ::milvus::grpc::Mapping *request_;
};
} // namespace server

View File

@ -18,6 +18,7 @@
#include "server/delivery/request/DeleteEntityByIDReq.h"
#include "src/server/delivery/ReqScheduler.h"
#include "server/MessageWrapper.h"
#include "server/MetaWrapper.h"
#include <memory>
#include <string>
@ -43,7 +44,12 @@ DeleteEntityByIDReq::Create(const ContextPtr& context, const ::milvus::grpc::Del
Status
DeleteEntityByIDReq::OnExecute() {
auto &msg_client = MessageWrapper::GetInstance().MessageClient();
Status status = msg_client->SendMutMessage(*request_, timestamp_);
auto segment_id = [](const std::string &collection_name,
uint64_t channel_id,
uint64_t timestamp) {
return MetaWrapper::GetInstance().AskSegmentId(collection_name, channel_id, timestamp);
};
Status status = msg_client->SendMutMessage(*request_, timestamp_, segment_id);
return status;
}

View File

@ -16,30 +16,36 @@
// #include "server/web_impl/Constants.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "server/MetaWrapper.h"
#include <utility>
namespace milvus {
namespace server {
GetCollectionInfoReq::GetCollectionInfoReq(const ContextPtr& context, const std::string& collection_name,
CollectionSchema& collection_schema)
GetCollectionInfoReq::GetCollectionInfoReq(const ContextPtr& context, const ::milvus::grpc::CollectionName *request, ::milvus::grpc::Mapping& response)
: BaseReq(context, ReqType::kGetCollectionInfo),
collection_name_(collection_name),
collection_schema_(collection_schema) {
collection_name_(request->collection_name()),
collection_schema_(response) {
}
BaseReqPtr
GetCollectionInfoReq::Create(const ContextPtr& context, const std::string& collection_name,
CollectionSchema& collection_schema) {
return std::shared_ptr<BaseReq>(new GetCollectionInfoReq(context, collection_name, collection_schema));
GetCollectionInfoReq::Create(const ContextPtr& context, const ::milvus::grpc::CollectionName *request, ::milvus::grpc::Mapping& response) {
return std::shared_ptr<BaseReq>(new GetCollectionInfoReq(context, request, response));
}
Status
GetCollectionInfoReq::OnExecute() {
try {
auto schema = MetaWrapper::GetInstance().AskCollectionSchema(collection_name_);
collection_schema_.mutable_schema()->CopyFrom(schema);
collection_schema_.set_collection_name(collection_schema_.collection_name());
return Status::OK();
}
catch (const std::exception& e){
return Status{DB_ERROR, e.what()};
}
}
} // namespace server

View File

@ -21,19 +21,18 @@ namespace server {
class GetCollectionInfoReq : public BaseReq {
public:
static BaseReqPtr
Create(const ContextPtr& context, const std::string& collection_name, CollectionSchema& collection_schema);
static BaseReqPtr
Create(const ContextPtr& context, const ::milvus::grpc::CollectionName *request, ::milvus::grpc::Mapping& response);
protected:
GetCollectionInfoReq(const ContextPtr& context, const std::string& collection_name,
CollectionSchema& collection_schema);
GetCollectionInfoReq(const ContextPtr& context, const ::milvus::grpc::CollectionName *request, ::milvus::grpc::Mapping& response);
Status
OnExecute() override;
Status
OnExecute() override;
private:
const std::string collection_name_;
CollectionSchema& collection_schema_;
const std::string collection_name_;
::milvus::grpc::Mapping &collection_schema_;
};
} // namespace server

View File

@ -16,6 +16,7 @@
#include "utils/TimeRecorder.h"
#include "server/delivery/ReqScheduler.h"
#include "server/MessageWrapper.h"
#include "server/MetaWrapper.h"
#include <memory>
#include <string>
@ -44,7 +45,13 @@ Status
InsertReq::OnExecute() {
LOG_SERVER_INFO_ << LogOut("[%s][%ld] ", "insert", 0) << "Execute InsertReq.";
auto &msg_client = MessageWrapper::GetInstance().MessageClient();
Status status = msg_client->SendMutMessage(*insert_param_, timestamp_);
auto segment_id = [](const std::string &collection_name,
uint64_t channel_id,
uint64_t timestamp) {
return MetaWrapper::GetInstance().AskSegmentId(collection_name, channel_id, timestamp);
};
Status status;
status = msg_client->SendMutMessage(*insert_param_, timestamp_, segment_id);
return status;
}

View File

@ -36,7 +36,7 @@ class InsertReq : public BaseReq {
OnPostExecute() override ;
private:
const ::milvus::grpc::InsertParam *insert_param_;
const grpc::InsertParam *insert_param_;
};
} // namespace server

View File

@ -341,7 +341,7 @@ GrpcRequestHandler::CreateCollection(::grpc::ServerContext *context, const ::mil
CHECK_NULLPTR_RETURN(request);
LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__);
Status status = MetaWrapper::GetInstance().MetaClient()->CreateCollection(*request);
Status status = req_handler_.CreateCollection(GetContext(context), request);
LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__);
SET_RESPONSE(response, status, context)
@ -468,6 +468,9 @@ GrpcRequestHandler::DescribeCollection(::grpc::ServerContext *context, const ::m
::milvus::grpc::Mapping *response) {
LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__);
CHECK_NULLPTR_RETURN(request);
Status status = req_handler_.GetCollectionInfo(GetContext(context), request, *response);
SET_RESPONSE(response->mutable_status(), status, context)
return ::grpc::Status::OK;
}
@ -697,6 +700,17 @@ GrpcRequestHandler::Insert(::grpc::ServerContext *context, const ::milvus::grpc:
return ::grpc::Status::OK;
}
// check if collection exist, using `HasCollection` after.
try {
MetaWrapper::GetInstance().AskCollectionSchema(request->collection_name());
}
catch (const std::exception& e){
// means collection not exit
SET_RESPONSE(response->mutable_status(), Status(SERVER_COLLECTION_NOT_EXIST, "Collection not exist " + request->collection_name()), context)
return ::grpc::Status::OK;
}
// generate uid for entities
//if (request->entity_id_array_size() == 0) {
// auto ids = std::vector<int64_t >(request->rows_data_size());
@ -858,8 +872,8 @@ GrpcRequestHandler::Search(::grpc::ServerContext *context, const ::milvus::grpc:
//TODO: check if the request is legal
BaseReqPtr req_ptr = SearchReq::Create(GetContext(context), request, response);
ReqScheduler::ExecReq(req_ptr);
BaseReqPtr req_ptr = SearchReq::Create(GetContext(context), request, response);
ReqScheduler::ExecReq(req_ptr);
return ::grpc::Status::OK;
}

View File

@ -105,7 +105,7 @@ GrpcServer::StartService() {
int client_id = 0;
std::string pulsar_server_addr
(std::string{"pulsar://"} + config.pulsar.address() + ":" + std::to_string(config.pulsar.port()));
timesync::TimeSync syc(client_id,GetMessageTimeSyncTime, 400, pulsar_server_addr, "TimeSync");
timesync::TimeSync syc(client_id,GetMessageTimeSyncTime, config.timesync.interval(), pulsar_server_addr, "TimeSync");
// Add gRPC interceptor

View File

@ -64,7 +64,7 @@ add_custom_target(generate_suvlim_pb_grpc ALL DEPENDS protoc grpc_cpp_plugin)
add_custom_command(TARGET generate_suvlim_pb_grpc
POST_BUILD
COMMAND echo "${PROTOC_EXCUTABLE}"
COMMAND bash "${PROTO_GEN_SCRIPTS_DIR}/generate_go.sh" -p "${PROTOC_EXCUTABLE}"
# COMMAND bash "${PROTO_GEN_SCRIPTS_DIR}/generate_go.sh" -p "${PROTOC_EXCUTABLE}"
COMMAND bash "${PROTO_GEN_SCRIPTS_DIR}/generate_cpp.sh" -p "${PROTOC_EXCUTABLE}" -g "${GRPC_CPP_PLUGIN_EXCUTABLE}"
COMMAND ${PROTOC_EXCUTABLE} -I "${PROTO_PATH}/proto" --grpc_out "${PROTO_PATH}" --cpp_out "${PROTO_PATH}"
--plugin=protoc-gen-grpc="${GRPC_CPP_PLUGIN_EXCUTABLE}"

View File

@ -20,20 +20,6 @@ endif ()
message(STATUS "Building pulsar-client-cpp-${PULSAR_CLIENT_CPP_VERSION} from source")
#include(ExternalProject)
#ExternalProject_Add(
# pulsar
# URL ${PULSAR_URL}
# PREFIX ${pulsar_ROOT}
# CONFIGURE_COMMAND cd ${pulsar_ROOT}/src/pulsar/pulsar-client-cpp && cmake -DBUILD_TESTS=OFF -DCMAKE_INSTALL_PREFIX=${pulsar_ROOT}/build .
# BUILD_COMMAND cd ${pulsar_ROOT}/src/pulsar/pulsar-client-cpp && cmake .
# BUILD_IN_SOURCE true
# INSTALL_COMMAND cd ${pulsar_ROOT}/src/pulsar/pulsar-client-cpp && make install
#)
FetchContent_Declare(
pulsar
URL ${PULSAR_URL}

View File

@ -23,4 +23,4 @@ func main2() {
wg.Add(1)
reader.StartQueryNode2()
wg.Wait()
}
}

View File

@ -14,10 +14,10 @@ package reader
import "C"
type Collection struct {
CollectionPtr C.CCollection
CollectionPtr C.CCollection
CollectionName string
CollectionID uint64
Partitions []*Partition
CollectionID uint64
Partitions []*Partition
}
func (c *Collection) NewPartition(partitionName string) *Partition {

View File

@ -3,16 +3,17 @@ package reader
import (
"encoding/binary"
"fmt"
msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/stretchr/testify/assert"
"math"
"testing"
msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/stretchr/testify/assert"
)
func TestIndex_BuildIndex(t *testing.T) {
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var collection = node.NewCollection(0, "collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)

View File

@ -3,36 +3,36 @@ package reader
import (
"context"
"fmt"
"github.com/czs007/suvlim/pkg/master/mock"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/czs007/suvlim/conf"
"github.com/czs007/suvlim/pkg/master/kv"
"github.com/czs007/suvlim/pkg/master/mock"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
)
const (
CollectonPrefix = "/collection/"
SegmentPrefix = "/segment/"
SegmentPrefix = "/segment/"
)
func GetCollectionObjId(key string) string {
prefix := conf.Config.Etcd.Rootpath + CollectonPrefix
prefix := conf.Config.Etcd.Rootpath + CollectonPrefix
return strings.TrimPrefix(key, prefix)
}
func GetSegmentObjId(key string) string {
prefix := conf.Config.Etcd.Rootpath + SegmentPrefix
prefix := conf.Config.Etcd.Rootpath + SegmentPrefix
return strings.TrimPrefix(key, prefix)
}
func isCollectionObj(key string) bool {
prefix := conf.Config.Etcd.Rootpath + CollectonPrefix
prefix := conf.Config.Etcd.Rootpath + CollectonPrefix
prefix = strings.TrimSpace(prefix)
println("prefix is :$", prefix)
index := strings.Index(key, prefix)
@ -41,31 +41,31 @@ func isCollectionObj(key string) bool {
}
func isSegmentObj(key string) bool {
prefix := conf.Config.Etcd.Rootpath + SegmentPrefix
prefix := conf.Config.Etcd.Rootpath + SegmentPrefix
prefix = strings.TrimSpace(prefix)
index := strings.Index(key, prefix)
return index == 0
}
func printCollectionStruct(obj *mock.Collection){
func printCollectionStruct(obj *mock.Collection) {
v := reflect.ValueOf(obj)
v = reflect.Indirect(v)
typeOfS := v.Type()
for i := 0; i< v.NumField(); i++ {
if typeOfS.Field(i).Name == "GrpcMarshalString"{
for i := 0; i < v.NumField(); i++ {
if typeOfS.Field(i).Name == "GrpcMarshalString" {
continue
}
fmt.Printf("Field: %s\tValue: %v\n", typeOfS.Field(i).Name, v.Field(i).Interface())
}
}
func printSegmentStruct(obj *mock.Segment){
func printSegmentStruct(obj *mock.Segment) {
v := reflect.ValueOf(obj)
v = reflect.Indirect(v)
typeOfS := v.Type()
for i := 0; i< v.NumField(); i++ {
for i := 0; i < v.NumField(); i++ {
fmt.Printf("Field: %s\tValue: %v\n", typeOfS.Field(i).Name, v.Field(i).Interface())
}
}
@ -78,6 +78,10 @@ func (node *QueryNode) processCollectionCreate(id string, value string) {
println(err.Error())
}
printCollectionStruct(collection)
newCollection := node.NewCollection(collection.ID, collection.Name, collection.GrpcMarshalString)
for _, partitionTag := range collection.PartitionTags {
newCollection.NewPartition(partitionTag)
}
}
func (node *QueryNode) processSegmentCreate(id string, value string) {
@ -88,17 +92,25 @@ func (node *QueryNode) processSegmentCreate(id string, value string) {
println(err.Error())
}
printSegmentStruct(segment)
collection := node.GetCollectionByID(segment.CollectionID)
if collection != nil {
partition := collection.GetPartitionByName(segment.PartitionTag)
if partition != nil {
partition.NewSegment(int64(segment.SegmentID)) // todo change all to uint64
}
}
// segment.CollectionName
}
func (node *QueryNode) processCreate(key string, msg string) {
println("process create", key, ":", msg)
if isCollectionObj(key){
if isCollectionObj(key) {
objID := GetCollectionObjId(key)
node.processCollectionCreate(objID, msg)
}else if isSegmentObj(key){
} else if isSegmentObj(key) {
objID := GetSegmentObjId(key)
node.processSegmentCreate(objID, msg)
}else {
} else {
println("can not process create msg:", key)
}
}
@ -112,6 +124,10 @@ func (node *QueryNode) processSegmentModify(id string, value string) {
println(err.Error())
}
printSegmentStruct(segment)
seg, err := node.GetSegmentBySegmentID(int64(segment.SegmentID)) // todo change to uint64
if seg != nil {
seg.SegmentCloseTime = segment.CloseTimeStamp
}
}
func (node *QueryNode) processCollectionModify(id string, value string) {
@ -124,37 +140,37 @@ func (node *QueryNode) processCollectionModify(id string, value string) {
printCollectionStruct(collection)
}
func (node *QueryNode) processModify(key string, msg string){
func (node *QueryNode) processModify(key string, msg string) {
println("process modify")
if isCollectionObj(key){
if isCollectionObj(key) {
objID := GetCollectionObjId(key)
node.processCollectionModify(objID, msg)
}else if isSegmentObj(key){
} else if isSegmentObj(key) {
objID := GetSegmentObjId(key)
node.processSegmentModify(objID, msg)
}else {
} else {
println("can not process modify msg:", key)
}
}
func (node *QueryNode) processSegmentDelete(id string){
func (node *QueryNode) processSegmentDelete(id string) {
println("Delete segment: ", id)
}
func (node *QueryNode) processCollectionDelete(id string){
func (node *QueryNode) processCollectionDelete(id string) {
println("Delete collection: ", id)
}
func (node *QueryNode) processDelete(key string){
func (node *QueryNode) processDelete(key string) {
println("process delete")
if isCollectionObj(key){
if isCollectionObj(key) {
objID := GetCollectionObjId(key)
node.processCollectionDelete(objID)
}else if isSegmentObj(key){
} else if isSegmentObj(key) {
objID := GetSegmentObjId(key)
node.processSegmentDelete(objID)
}else {
} else {
println("can not process delete msg:", key)
}
}
@ -164,6 +180,8 @@ func (node *QueryNode) processResp(resp clientv3.WatchResponse) error {
if err != nil {
return err
}
println("processResp!!!!!\n")
for _, ev := range resp.Events {
if ev.IsCreate() {
key := string(ev.Kv.Key)
@ -185,7 +203,7 @@ func (node *QueryNode) processResp(resp clientv3.WatchResponse) error {
func (node *QueryNode) loadCollections() error {
keys, values := node.kvBase.LoadWithPrefix(CollectonPrefix)
for i:= range keys{
for i := range keys {
objID := GetCollectionObjId(keys[i])
node.processCollectionCreate(objID, values[i])
}
@ -193,7 +211,7 @@ func (node *QueryNode) loadCollections() error {
}
func (node *QueryNode) loadSegments() error {
keys, values := node.kvBase.LoadWithPrefix(SegmentPrefix)
for i:= range keys{
for i := range keys {
objID := GetSegmentObjId(keys[i])
node.processSegmentCreate(objID, values[i])
}
@ -210,7 +228,7 @@ func (node *QueryNode) InitFromMeta() error {
Endpoints: []string{etcdAddr},
DialTimeout: 5 * time.Second,
})
defer cli.Close()
//defer cli.Close()
node.kvBase = kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath)
node.loadCollections()
node.loadSegments()
@ -230,4 +248,4 @@ func (node *QueryNode) RunMetaService(ctx context.Context, wg *sync.WaitGroup) {
node.processResp(resp)
}
}
}
}

View File

@ -14,8 +14,8 @@ package reader
import "C"
type Partition struct {
PartitionPtr C.CPartition
PartitionName string
PartitionPtr C.CPartition
PartitionName string
OpenedSegments []*Segment
ClosedSegments []*Segment
}

View File

@ -102,6 +102,7 @@ func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
func (node *QueryNode) Close() {
node.messageClient.Close()
node.kvBase.Close()
}
func CreateQueryNode(queryNodeId uint64, timeSync uint64, mc *message_client.MessageClient) *QueryNode {
@ -156,12 +157,12 @@ func (node *QueryNode) QueryNodeDataInit() {
node.insertData = insertData
}
func (node *QueryNode) NewCollection(collectionName string, schemaConfig string) *Collection {
func (node *QueryNode) NewCollection(collectionID uint64, collectionName string, schemaConfig string) *Collection {
cName := C.CString(collectionName)
cSchema := C.CString(schemaConfig)
collection := C.NewCollection(cName, cSchema)
var newCollection = &Collection{CollectionPtr: collection, CollectionName: collectionName}
var newCollection = &Collection{CollectionPtr: collection, CollectionName: collectionName, CollectionID: collectionID}
node.Collections = append(node.Collections, newCollection)
return newCollection
@ -184,7 +185,7 @@ func (node *QueryNode) PrepareBatchMsg() []int {
func (node *QueryNode) InitQueryNodeCollection() {
// TODO: remove hard code, add collection creation request
// TODO: error handle
var newCollection = node.NewCollection("collection1", "fakeSchema")
var newCollection = node.NewCollection(0, "collection1", "fakeSchema")
var newPartition = newCollection.NewPartition("partition1")
// TODO: add segment id
var segment = newPartition.NewSegment(0)

View File

@ -1,26 +1,27 @@
package reader
import (
"testing"
masterPb "github.com/czs007/suvlim/pkg/master/grpc/master"
msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
"testing"
)
func TestResult_PublishSearchResult(t *testing.T) {
// Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var collection = node.NewCollection(0, "collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
node.SegmentsMap[0] = segment
// TODO: start pulsar server
const N = 10
var entityIDs = msgPb.Entities {
var entityIDs = msgPb.Entities{
Ids: make([]int64, N),
}
var result = msgPb.QueryResult {
Entities: &entityIDs,
var result = msgPb.QueryResult{
Entities: &entityIDs,
Distances: make([]float32, N),
}
for i := 0; i < N; i++ {
@ -33,7 +34,7 @@ func TestResult_PublishSearchResult(t *testing.T) {
func TestResult_PublishFailedSearchResult(t *testing.T) {
// Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var collection = node.NewCollection(0, "collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
node.SegmentsMap[0] = segment
@ -45,7 +46,7 @@ func TestResult_PublishFailedSearchResult(t *testing.T) {
func TestResult_PublicStatistic(t *testing.T) {
// Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var collection = node.NewCollection(0, "collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
node.SegmentsMap[0] = segment

View File

@ -77,12 +77,18 @@ func (s *Segment) Close() error {
}
// Build index after closing segment
s.buildIndex()
go s.buildIndex()
return nil
}
func (s *Segment) GetMemSize() uint64 {
return 100000
/*C.GetMemoryUsageInBytes
long int
GetMemoryUsageInBytes(CSegmentBase c_segment);
*/
var memoryUsageInBytes = C.GetMemoryUsageInBytes(s.SegmentPtr)
return uint64(memoryUsageInBytes)
}
////////////////////////////////////////////////////////////////////////////

View File

@ -7,7 +7,7 @@ import (
func TestSegmentManagement_SegmentsManagement(t *testing.T) {
// Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var collection = node.NewCollection(0, "collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
node.SegmentsMap[0] = segment
@ -19,7 +19,7 @@ func TestSegmentManagement_SegmentsManagement(t *testing.T) {
func TestSegmentManagement_SegmentService(t *testing.T) {
// Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var collection = node.NewCollection(0, "collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
node.SegmentsMap[0] = segment
@ -31,7 +31,7 @@ func TestSegmentManagement_SegmentService(t *testing.T) {
func TestSegmentManagement_SegmentStatistic(t *testing.T) {
// Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var collection = node.NewCollection(0, "collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
node.SegmentsMap[0] = segment
@ -43,7 +43,7 @@ func TestSegmentManagement_SegmentStatistic(t *testing.T) {
func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
// Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var collection = node.NewCollection(0, "collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
node.SegmentsMap[0] = segment

View File

@ -3,16 +3,17 @@ package reader
import (
"encoding/binary"
"fmt"
msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/stretchr/testify/assert"
"math"
"testing"
msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/stretchr/testify/assert"
)
func TestSegment_ConstructorAndDestructor(t *testing.T) {
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var collection = node.NewCollection(0, "collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -25,7 +26,7 @@ func TestSegment_ConstructorAndDestructor(t *testing.T) {
func TestSegment_SegmentInsert(t *testing.T) {
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var collection = node.NewCollection(0, "collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -70,7 +71,7 @@ func TestSegment_SegmentInsert(t *testing.T) {
func TestSegment_SegmentDelete(t *testing.T) {
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var collection = node.NewCollection(0, "collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -95,7 +96,7 @@ func TestSegment_SegmentDelete(t *testing.T) {
func TestSegment_SegmentSearch(t *testing.T) {
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var collection = node.NewCollection(0, "collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -121,7 +122,7 @@ func TestSegment_SegmentSearch(t *testing.T) {
var records [][]byte
for i := 0; i < N; i++ {
ids = append(ids, int64(i))
timestamps = append(timestamps, uint64(i + 1))
timestamps = append(timestamps, uint64(i+1))
records = append(records, rawData)
}
@ -136,10 +137,10 @@ func TestSegment_SegmentSearch(t *testing.T) {
// 6. Do search
var queryJson = "{\"field_name\":\"fakevec\",\"num_queries\":1,\"topK\":10}"
var queryRawData = make([]float32, 0)
for i := 0; i < 16; i ++ {
for i := 0; i < 16; i++ {
queryRawData = append(queryRawData, float32(i))
}
var vectorRecord = msgPb.VectorRowRecord {
var vectorRecord = msgPb.VectorRowRecord{
FloatData: queryRawData,
}
var searchRes, searchErr = segment.SegmentSearch(queryJson, timestamps[N/2], &vectorRecord)
@ -155,7 +156,7 @@ func TestSegment_SegmentSearch(t *testing.T) {
func TestSegment_SegmentPreInsert(t *testing.T) {
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var collection = node.NewCollection(0, "collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -172,7 +173,7 @@ func TestSegment_SegmentPreInsert(t *testing.T) {
func TestSegment_SegmentPreDelete(t *testing.T) {
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var collection = node.NewCollection(0, "collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -191,7 +192,7 @@ func TestSegment_SegmentPreDelete(t *testing.T) {
func TestSegment_GetStatus(t *testing.T) {
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var collection = node.NewCollection(0, "collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -208,7 +209,7 @@ func TestSegment_GetStatus(t *testing.T) {
func TestSegment_Close(t *testing.T) {
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var collection = node.NewCollection(0, "collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -225,7 +226,7 @@ func TestSegment_Close(t *testing.T) {
func TestSegment_GetRowCount(t *testing.T) {
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var collection = node.NewCollection(0, "collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -274,7 +275,7 @@ func TestSegment_GetRowCount(t *testing.T) {
func TestSegment_GetDeletedCount(t *testing.T) {
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var collection = node.NewCollection(0, "collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -300,3 +301,52 @@ func TestSegment_GetDeletedCount(t *testing.T) {
collection.DeletePartition(partition)
node.DeleteCollection(collection)
}
func TestSegment_GetMemSize(t *testing.T) {
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection(0, "collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
// 2. Create ids and timestamps
ids := []int64{1, 2, 3}
timestamps := []uint64{0, 0, 0}
// 3. Create records, use schema below:
// schema_tmp->AddField("fakeVec", DataType::VECTOR_FLOAT, 16);
// schema_tmp->AddField("age", DataType::INT32);
const DIM = 16
const N = 3
var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
var rawData []byte
for _, ele := range vec {
buf := make([]byte, 4)
binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
rawData = append(rawData, buf...)
}
bs := make([]byte, 4)
binary.LittleEndian.PutUint32(bs, 1)
rawData = append(rawData, bs...)
var records [][]byte
for i := 0; i < N; i++ {
records = append(records, rawData)
}
// 4. Do PreInsert
var offset = segment.SegmentPreInsert(N)
assert.GreaterOrEqual(t, offset, int64(0))
// 5. Do Insert
var err = segment.SegmentInsert(offset, &ids, &timestamps, &records)
assert.NoError(t, err)
// 6. Get memory usage in bytes
var memSize = segment.GetMemSize()
assert.Equal(t, memSize, uint64(1048714))
// 7. Destruct node, collection, and segment
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
}

View File

@ -29,6 +29,16 @@ func (node *QueryNode) GetKey2Segments() (*[]int64, *[]uint64, *[]int64) {
return &entityIDs, &timestamps, &segmentIDs
}
func (node *QueryNode) GetCollectionByID(collectionID uint64) *Collection {
for _, collection := range node.Collections {
if collection.CollectionID == collectionID {
return collection
}
}
return nil
}
func (node *QueryNode) GetCollectionByCollectionName(collectionName string) (*Collection, error) {
for _, collection := range node.Collections {
if collection.CollectionName == collectionName {
@ -48,3 +58,13 @@ func (node *QueryNode) GetSegmentBySegmentID(segmentID int64) (*Segment, error)
return targetSegment, nil
}
func (c *Collection) GetPartitionByName(partitionName string) (partition *Partition) {
for _, partition := range c.Partitions {
if partition.PartitionName == partitionName {
return partition
}
}
return nil
// TODO: remove from c.Partitions
}

View File

@ -1,8 +1,9 @@
package reader
import (
"github.com/stretchr/testify/assert"
"testing"
"github.com/stretchr/testify/assert"
)
func TestUtilFunctions_GetKey2Segments(t *testing.T) {
@ -12,18 +13,21 @@ func TestUtilFunctions_GetKey2Segments(t *testing.T) {
func TestUtilFunctions_GetCollectionByCollectionName(t *testing.T) {
// 1. Construct node, and collections
node := NewQueryNode(0, 0)
var _ = node.NewCollection("collection0", "fake schema")
var _ = node.NewCollection(0, "collection0", "fake schema")
// 2. Get collection by collectionName
var c0, err = node.GetCollectionByCollectionName("collection0")
assert.NoError(t, err)
assert.Equal(t, c0.CollectionName, "collection0")
c0 = node.GetCollectionByID(0)
assert.NotNil(t, c0)
assert.Equal(t, c0.CollectionID, 0)
}
func TestUtilFunctions_GetSegmentBySegmentID(t *testing.T) {
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection("collection0", "fake schema")
var collection = node.NewCollection(0, "collection0", "fake schema")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
node.SegmentsMap[0] = segment

View File

@ -14,4 +14,8 @@ install(TARGETS insert DESTINATION test)
add_executable(delete delete.cpp)
target_link_libraries(delete milvus_sdk pthread)
install(TARGETS delete DESTINATION test)
install(TARGETS delete DESTINATION test)
add_executable(create CreateCollection.cpp)
target_link_libraries(create milvus_sdk pthread)
install(TARGETS create DESTINATION test)

View File

@ -0,0 +1,46 @@
#include <Status.h>
#include <Field.h>
#include <MilvusApi.h>
#include <interface/ConnectionImpl.h>
int main() {
auto client = milvus::ConnectionImpl();
milvus::ConnectParam connect_param;
connect_param.ip_address = "localhost";
connect_param.port = "19530";
client.Connect(connect_param);
milvus::Status stat;
const std::string collectin_name = "collection0";
// Create
milvus::FieldPtr field_ptr1 = std::make_shared<milvus::Field>();
milvus::FieldPtr field_ptr2 = std::make_shared<milvus::Field>();
milvus::FieldPtr field_ptr3 = std::make_shared<milvus::Field>();
milvus::FieldPtr field_ptr4 = std::make_shared<milvus::Field>();
field_ptr1->field_name = "field_1";
field_ptr1->field_type = milvus::DataType::INT64;
field_ptr2->field_name = "field_2";
field_ptr2->field_type = milvus::DataType::FLOAT;
field_ptr3->field_name = "field_3";
field_ptr3->field_type = milvus::DataType::INT32;
field_ptr4->field_name = "field_vec";
field_ptr4->field_type = milvus::DataType::VECTOR_FLOAT;
milvus::Mapping mapping = {collectin_name, {field_ptr1, field_ptr2, field_ptr3, field_ptr4}};
stat = client.CreateCollection(mapping, "test_extra_params");
// Get Collection info
milvus::Mapping map;
client.GetCollectionInfo(collectin_name, map);
for (auto &f : map.fields) {
std::cout << f->field_name << ":" << int(f->field_type) << ":" << f->dim << "DIM" << std::endl;
}
}