milvus/pkg/streaming/proto/streaming.proto

507 lines
18 KiB
Protocol Buffer

syntax = "proto3";
package milvus.proto.streaming;
option go_package = "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb";
import "messages.proto";
import "milvus.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";
//
// Common
//
// PChannelInfo is the information of a pchannel info, should only keep the
// basic info of a pchannel. It's used in many rpc and meta, so keep it simple.
message PChannelInfo {
string name = 1; // channel name
int64 term = 2; // A monotonic increasing term, every time the channel is
// recovered or moved to another streamingnode, the term
// will increase by meta server.
}
// PChannelAssignmentLog is the log of meta information of a pchannel, should
// only keep the data that is necessary to persistent.
message PChannelAssignmentLog {
int64 term = 1; // term when server assigned.
StreamingNodeInfo node =
2; // streaming node that the channel is assigned to.
}
// PChannelMetaState
enum PChannelMetaState {
PCHANNEL_META_STATE_UNKNOWN = 0; // should never used.
PCHANNEL_META_STATE_UNINITIALIZED =
1; // channel is uninitialized, never assgined to any streaming node.
PCHANNEL_META_STATE_ASSIGNING =
2; // new term is allocated, but not determined to be assgined.
PCHANNEL_META_STATE_ASSIGNED =
3; // channel is assigned to a streaming node.
PCHANNEL_META_STATE_UNAVAILABLE =
4; // channel is unavailable at this term.
}
// PChannelMeta is the meta information of a pchannel, should only keep the data
// that is necessary to persistent. It's only used in meta, so do not use it in
// rpc.
message PChannelMeta {
PChannelInfo channel = 1; // keep the meta info that current assigned to.
StreamingNodeInfo node = 2; // nil if channel is not uninitialized.
PChannelMetaState state = 3; // state of the channel.
repeated PChannelAssignmentLog histories =
4; // keep the meta info assignment log that used to be assigned to.
}
// VersionPair is the version pair of global and local.
message VersionPair {
int64 global = 1;
int64 local = 2;
}
// BroadcastTaskState is the state of the broadcast task.
enum BroadcastTaskState {
BROADCAST_TASK_STATE_UNKNOWN = 0; // should never used.
BROADCAST_TASK_STATE_PENDING = 1; // task is pending.
BROADCAST_TASK_STATE_DONE = 2; // task is done, the message is broadcasted, and the persisted task can be cleared.
}
// BroadcastTask is the task to broadcast the message.
message BroadcastTask {
int64 task_id = 1; // task id.
messages.Message message = 2; // message to be broadcast.
BroadcastTaskState state = 3; // state of the task.
}
//
// Milvus Service
//
service StreamingNodeStateService {
rpc GetComponentStates(milvus.GetComponentStatesRequest)
returns (milvus.ComponentStates) {}
}
//
// StreamingCoordBroadcastService
//
// StreamingCoordBroadcastService is the broadcast service for streaming coord.
service StreamingCoordBroadcastService {
// Broadcast receives broadcast messages from other component and make sure that the message is broadcast to all wal.
// It performs an atomic broadcast to all wal, achieve eventual consistency.
rpc Broadcast(BroadcastRequest) returns (BroadcastResponse) {}
}
// BroadcastRequest is the request of the Broadcast RPC.
message BroadcastRequest {
messages.Message message = 1; // message to be broadcast.
}
// BroadcastResponse is the response of the Broadcast RPC.
message BroadcastResponse {
map<string,ProduceMessageResponseResult> results = 1;
}
//
// StreamingCoordAssignmentService
//
// StreamingCoordAssignmentService is the global log management service.
// Server: log coord. Running on every log node.
// Client: all log publish/consuming node.
service StreamingCoordAssignmentService {
// AssignmentDiscover is used to discover all log nodes managed by the
// streamingcoord. Channel assignment information will be pushed to client
// by stream.
rpc AssignmentDiscover(stream AssignmentDiscoverRequest)
returns (stream AssignmentDiscoverResponse) {}
}
// AssignmentDiscoverRequest is the request of Discovery
message AssignmentDiscoverRequest {
oneof command {
ReportAssignmentErrorRequest report_error =
1; // report streaming error, trigger reassign right now.
CloseAssignmentDiscoverRequest close = 2; // close the stream.
}
}
// ReportAssignmentErrorRequest is the request to report assignment error
// happens.
message ReportAssignmentErrorRequest {
PChannelInfo pchannel = 1; // channel
StreamingError err = 2; // error happend on log node
}
// CloseAssignmentDiscoverRequest is the request to close the stream.
message CloseAssignmentDiscoverRequest {}
// AssignmentDiscoverResponse is the response of Discovery
message AssignmentDiscoverResponse {
oneof response {
FullStreamingNodeAssignmentWithVersion full_assignment =
1; // all assignment info.
// TODO: may be support partial assignment info in future.
CloseAssignmentDiscoverResponse close = 2;
}
}
// FullStreamingNodeAssignmentWithVersion is the full assignment info of a log
// node with version.
message FullStreamingNodeAssignmentWithVersion {
VersionPair version = 1;
repeated StreamingNodeAssignment assignments = 2;
}
message CloseAssignmentDiscoverResponse {}
// StreamingNodeInfo is the information of a streaming node.
message StreamingNodeInfo {
int64 server_id = 1;
string address = 2;
}
// StreamingNodeAssignment is the assignment info of a streaming node.
message StreamingNodeAssignment {
StreamingNodeInfo node = 1;
repeated PChannelInfo channels = 2;
}
// DeliverPolicy is the policy to deliver message.
message DeliverPolicy {
oneof policy {
google.protobuf.Empty all = 1; // deliver all messages.
google.protobuf.Empty latest = 2; // deliver the latest message.
messages.MessageID start_from =
3; // deliver message from this message id. [startFrom, ...]
messages.MessageID start_after =
4; // deliver message after this message id. (startAfter, ...]
}
}
// DeliverFilter is the filter to deliver message.
message DeliverFilter {
oneof filter {
DeliverFilterTimeTickGT time_tick_gt = 1;
DeliverFilterTimeTickGTE time_tick_gte = 2;
DeliverFilterMessageType message_type = 3;
}
}
// DeliverFilterTimeTickGT is the filter to deliver message with time tick
// greater than this value.
message DeliverFilterTimeTickGT {
uint64 time_tick =
1; // deliver message with time tick greater than this value.
}
// DeliverFilterTimeTickGTE is the filter to deliver message with time tick
// greater than or equal to this value.
message DeliverFilterTimeTickGTE {
uint64 time_tick = 1; // deliver message with time tick greater than or
// equal to this value.
}
message DeliverFilterMessageType {
// deliver message with message type.
repeated messages.MessageType message_types = 1;
}
// StreamingCode is the error code for log internal component.
enum StreamingCode {
STREAMING_CODE_OK = 0;
STREAMING_CODE_CHANNEL_NOT_EXIST = 1; // channel not exist
STREAMING_CODE_CHANNEL_FENCED = 2; // channel is fenced
STREAMING_CODE_ON_SHUTDOWN = 3; // component is on shutdown
STREAMING_CODE_INVALID_REQUEST_SEQ = 4; // invalid request sequence
STREAMING_CODE_UNMATCHED_CHANNEL_TERM = 5; // unmatched channel term
STREAMING_CODE_IGNORED_OPERATION = 6; // ignored operation
STREAMING_CODE_INNER = 7; // underlying service failure.
STREAMING_CODE_INVAILD_ARGUMENT = 8; // invalid argument
STREAMING_CODE_TRANSACTION_EXPIRED = 9; // transaction expired
STREAMING_CODE_INVALID_TRANSACTION_STATE = 10; // invalid transaction state
STREAMING_CODE_UNRECOVERABLE = 11; // unrecoverable error
STREAMING_CODE_UNKNOWN = 999; // unknown error
}
// StreamingError is the error type for log internal component.
message StreamingError {
StreamingCode code = 1;
string cause = 2;
}
//
// StreamingNodeHandlerService
//
// StreamingNodeHandlerService is the service to handle log messages.
// All handler operation will be blocked until the channel is ready read or
// write on that log node. Server: all log node. Running on every log node.
// Client: all log produce or consuming node.
service StreamingNodeHandlerService {
// Produce is a bi-directional streaming RPC to send messages to a channel.
// All messages sent to a channel will be assigned a unique messageID.
// The messageID is used to identify the message in the channel.
// The messageID isn't promised to be monotonous increasing with the
// sequence of responsing. Error: If channel isn't assign to this log node,
// the RPC will return error CHANNEL_NOT_EXIST. If channel is moving away to
// other log node, the RPC will return error CHANNEL_FENCED.
rpc Produce(stream ProduceRequest) returns (stream ProduceResponse) {};
// Consume is a server streaming RPC to receive messages from a channel.
// All message after given startMessageID and excluding will be sent to the
// client by stream. If no more message in the channel, the stream will be
// blocked until new message coming. Error: If channel isn't assign to this
// log node, the RPC will return error CHANNEL_NOT_EXIST. If channel is
// moving away to other log node, the RPC will return error CHANNEL_FENCED.
rpc Consume(stream ConsumeRequest) returns (stream ConsumeResponse) {};
}
// ProduceRequest is the request of the Produce RPC.
// Channel name will be passthrough in the header of stream bu not in the
// request body.
message ProduceRequest {
oneof request {
ProduceMessageRequest produce = 1;
CloseProducerRequest close = 2;
}
}
// CreateProducerRequest is the request of the CreateProducer RPC.
// CreateProducerRequest is passed in the header of stream.
message CreateProducerRequest {
PChannelInfo pchannel = 1;
}
// ProduceMessageRequest is the request of the Produce RPC.
message ProduceMessageRequest {
int64 request_id = 1; // request id for reply.
messages.Message message = 2; // message to be sent.
}
// CloseProducerRequest is the request of the CloseProducer RPC.
// After CloseProducerRequest is requested, no more ProduceRequest can be sent.
message CloseProducerRequest {}
// ProduceResponse is the response of the Produce RPC.
message ProduceResponse {
oneof response {
CreateProducerResponse create = 1;
ProduceMessageResponse produce = 2;
CloseProducerResponse close = 3;
}
}
// CreateProducerResponse is the result of the CreateProducer RPC.
message CreateProducerResponse {
string wal_name = 1; // wal name at server side.
int64 producer_server_id = 2; // A unique producer server id on streamingnode
// for this producer in streamingnode lifetime.
// Is used to identify the producer in streamingnode for other unary grpc
// call at producer level.
}
// ProduceMessageResponse is the response of the ProduceMessage RPC.
message ProduceMessageResponse {
int64 request_id = 1;
oneof response {
ProduceMessageResponseResult result = 2;
StreamingError error = 3;
}
}
// ProduceMessageResponseResult is the result of the produce message streaming
// RPC.
message ProduceMessageResponseResult {
messages.MessageID id = 1; // the offset of the message in the channel.
uint64 timetick = 2; // the timetick of that message sent.
messages.TxnContext txnContext = 3; // the txn context of the message.
google.protobuf.Any extra = 4; // the extra message.
}
// CloseProducerResponse is the result of the CloseProducer RPC.
message CloseProducerResponse {}
// ConsumeRequest is the request of the Consume RPC.
// Add more control block in future.
message ConsumeRequest {
oneof request {
CreateVChannelConsumerRequest create_vchannel_consumer = 1;
CreateVChannelConsumersRequest create_vchannel_consumers =
2; // Create multiple vchannel consumers, used for recovery in future.
CloseVChannelConsumerRequest close_vchannel = 3;
CloseConsumerRequest close = 4;
}
}
// CloseConsumerRequest is the request of the CloseConsumer RPC.
// After CloseConsumerRequest is requested, no more ConsumeRequest can be sent.
message CloseConsumerRequest {}
// CreateConsumerRequest is the request of the CreateConsumer RPC.
// CreateConsumerRequest is passed in the header of stream.
message CreateConsumerRequest {
PChannelInfo pchannel = 1;
}
message CreateVChannelConsumersRequest {
repeated CreateVChannelConsumerRequest create_vchannels = 1;
}
// CreateVChannelConsumerRequest is the request of the CreateVChannelConsumer
// RPC. It's used to create a new vchannel consumer at server side.
message CreateVChannelConsumerRequest {
string vchannel = 1;
DeliverPolicy deliver_policy = 2; // deliver policy.
repeated DeliverFilter deliver_filters = 3; // deliver filter.
}
// ConsumeMessageRequest is the request of the Consume RPC.
message CreateVChannelConsumersResponse {
repeated CreateVChannelConsumerResponse create_vchannels = 1;
}
// CreateVChannelConsumerResponse is the response of the CreateVChannelConsumer
// RPC.
message CreateVChannelConsumerResponse {
oneof response {
int64 consumer_id = 1;
StreamingError error = 2;
}
}
// CloseVChannelConsumerRequest is the request of the CloseVChannelConsumer RPC.
message CloseVChannelConsumerRequest {
int64 consumer_id = 1;
}
// CloseVChannelConsumerResponse is the response of the CloseVChannelConsumer
// RPC.
message CloseVChannelConsumerResponse {
int64 consumer_id = 1;
}
// ConsumeResponse is the reponse of the Consume RPC.
message ConsumeResponse {
oneof response {
CreateConsumerResponse create = 1;
ConsumeMessageReponse consume = 2;
CreateVChannelConsumerResponse create_vchannel = 3;
CreateVChannelConsumersResponse create_vchannels = 4;
CloseVChannelConsumerResponse close_vchannel = 5;
CloseConsumerResponse close = 6;
}
}
message CreateConsumerResponse {
string wal_name = 1; // wal name at server side.
// A unique consumer id on streamingnode for this
// consumer in streamingnode lifetime.
int64 consumer_server_id = 2;
}
message ConsumeMessageReponse {
int64 consumer_id = 1;
messages.ImmutableMessage message = 2;
}
message CloseConsumerResponse {}
//
// StreamingNodeManagerService
//
// StreamingNodeManagerService is the log manage operation on log node.
// Server: all log node. Running on every log node.
// Client: log coord. There should be only one client globally to call this
// service on all streamingnode.
service StreamingNodeManagerService {
// Assign is a unary RPC to assign a channel on a log node.
// Block until the channel assignd is ready to read or write on the log
// node. Error: If the channel already exists, return error with code
// CHANNEL_EXIST.
rpc Assign(StreamingNodeManagerAssignRequest)
returns (StreamingNodeManagerAssignResponse) {};
// Remove is unary RPC to remove a channel on a log node.
// Data of the channel on flying would be sent or flused as much as
// possible. Block until the resource of channel is released on the log
// node. New incoming request of handler of this channel will be rejected
// with special error. Error: If the channel does not exist, return error
// with code CHANNEL_NOT_EXIST.
rpc Remove(StreamingNodeManagerRemoveRequest)
returns (StreamingNodeManagerRemoveResponse) {};
// rpc CollectStatus() ...
// CollectStatus is unary RPC to collect all avaliable channel info and load
// balance info on a log node. Used to recover channel info on log coord,
// collect balance info and health check.
rpc CollectStatus(StreamingNodeManagerCollectStatusRequest)
returns (StreamingNodeManagerCollectStatusResponse) {};
}
// StreamingManagerAssignRequest is the request message of Assign RPC.
message StreamingNodeManagerAssignRequest {
PChannelInfo pchannel = 1;
}
message StreamingNodeManagerAssignResponse {}
message StreamingNodeManagerRemoveRequest {
PChannelInfo pchannel = 1;
}
message StreamingNodeManagerRemoveResponse {}
message StreamingNodeManagerCollectStatusRequest {}
message StreamingNodeBalanceAttributes {
// TODO: traffic of pchannel or other things.
}
message StreamingNodeManagerCollectStatusResponse {
StreamingNodeBalanceAttributes balance_attributes = 1;
}
///
/// SegmentAssignment
///
// SegmentAssignmentMeta is the stat of segment assignment.
// These meta is only used to recover status at streaming node segment
// assignment, don't use it outside.
// Used to storage the segment assignment stat
// at meta-store. The WAL use it to determine when to make the segment sealed.
message SegmentAssignmentMeta {
int64 collection_id = 1;
int64 partition_id = 2;
int64 segment_id = 3;
string vchannel = 4;
SegmentAssignmentState state = 5;
SegmentAssignmentStat stat = 6;
}
// SegmentAssignmentState is the state of segment assignment.
// The state machine can be described as following:
// 1. PENDING -> GROWING -> SEALED -> FLUSHED
enum SegmentAssignmentState {
SEGMENT_ASSIGNMENT_STATE_UNKNOWN = 0; // should never used.
SEGMENT_ASSIGNMENT_STATE_PENDING = 1;
SEGMENT_ASSIGNMENT_STATE_GROWING = 2;
SEGMENT_ASSIGNMENT_STATE_SEALED = 3;
SEGMENT_ASSIGNMENT_STATE_FLUSHED = 4; // can never be seen, because it's
// removed physically when enter FLUSHED.
}
// SegmentAssignmentStat is the stat of segment assignment.
message SegmentAssignmentStat {
uint64 max_binary_size = 1;
uint64 inserted_rows = 2;
uint64 inserted_binary_size = 3;
int64 create_timestamp_nanoseconds = 4;
int64 last_modified_timestamp_nanoseconds = 5;
uint64 binlog_counter = 6;
uint64 create_segment_time_tick = 7; // The timetick of create segment message in wal.
}