Merge pull request #847 from influxdata/grpc-management-api-stubs

feat: gRPC management api stubs
pull/24376/head
kodiakhq[bot] 2021-02-24 14:30:05 +00:00 committed by GitHub
commit 7f96515968
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 427 additions and 31 deletions

View File

@ -71,7 +71,7 @@ jobs:
args: --workspace
lints:
name: Lints
name: Rust Lints
runs-on: ubuntu-latest
container:
image: quay.io/influxdb/rust:ci
@ -91,3 +91,13 @@ jobs:
with:
token: ${{ secrets.GITHUB_TOKEN }}
args: --all-targets --workspace -- -D warnings
protobuf:
name: Protobuf Lints
runs-on: ubuntu-latest
container:
image: bufbuild/buf
steps:
- uses: actions/checkout@v2
- name: Lint IOx protobuf
run: buf lint

17
buf.yaml Normal file
View File

@ -0,0 +1,17 @@
version: v1beta1
build:
roots:
- generated_types/protos/
excludes:
- generated_types/protos/com
- generated_types/protos/influxdata/platform
lint:
use:
- DEFAULT
- STYLE_DEFAULT
breaking:
use:
- WIRE
- WIRE_JSON

View File

@ -63,7 +63,7 @@ pub struct DatabaseRules {
/// If set to `true`, this server should answer queries from one or more of
/// of its local write buffer and any read-only partitions that it knows
/// about. In this case, results will be merged with any others from the
/// remote goups or read only partitions.
/// remote goups or read-only partitions.
#[serde(default)]
pub query_local: bool,
/// Set `primary_query_group` to a host group if remote servers should be
@ -269,7 +269,7 @@ pub struct WalBufferConfig {
/// still try to hold as much in memory as possible while remaining
/// below this threshold
pub buffer_size: u64,
/// WAL segments become read only after crossing over this size. Which means
/// WAL segments become read-only after crossing over this size. Which means
/// that segments will always be >= this size. When old segments are
/// dropped from of memory, at least this much space will be freed from
/// the buffer.
@ -300,7 +300,7 @@ pub struct WalBufferConfig {
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Copy)]
pub enum WalBufferRollover {
/// Drop the old segment even though it hasn't been persisted. This part of
/// the WAl will be lost on this server.
/// the WAL will be lost on this server.
DropOldSegment,
/// Drop the incoming write and fail silently. This favors making sure that
/// older WAL data will be backed up.

View File

@ -10,7 +10,7 @@ type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;
fn main() -> Result<()> {
let root = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let root = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("protos");
generate_grpc_types(&root)?;
generate_wal_types(&root)?;
@ -20,16 +20,25 @@ fn main() -> Result<()> {
/// Schema used with IOx specific gRPC requests
///
/// Creates `influxdata.platform.storage.rs` and
/// `com.github.influxdata.idpe.storage.read.rs`
/// Creates
/// - `influxdata.platform.storage.rs`
/// - `com.github.influxdata.idpe.storage.read.rs`
/// - `influxdata.iox.management.v1.rs`
fn generate_grpc_types(root: &Path) -> Result<()> {
let storage_path = root.join("influxdata/platform/storage");
let idpe_path = root.join("com/github/influxdata/idpe/storage/read");
let management_path = root.join("influxdata/iox/management/v1");
let proto_files = vec![
root.join("test.proto"),
root.join("predicate.proto"),
root.join("storage_common.proto"),
root.join("storage_common_idpe.proto"),
root.join("service.proto"),
root.join("source.proto"),
storage_path.join("test.proto"),
storage_path.join("predicate.proto"),
storage_path.join("storage_common.proto"),
storage_path.join("service.proto"),
storage_path.join("storage_common_idpe.proto"),
idpe_path.join("source.proto"),
management_path.join("base_types.proto"),
management_path.join("database_rules.proto"),
management_path.join("service.proto"),
];
// Tell cargo to recompile if any of these proto files are changed

View File

@ -0,0 +1,30 @@
syntax = "proto3";
package influxdata.iox.management.v1;
enum Order {
ORDER_UNSPECIFIED = 0;
ORDER_ASC = 1;
ORDER_DESC = 2;
}
enum Aggregate {
AGGREGATE_UNSPECIFIED = 0;
AGGREGATE_MIN = 1;
AGGREGATE_MAX = 2;
}
enum ColumnType {
COLUMN_TYPE_UNSPECIFIED = 0;
COLUMN_TYPE_I64 = 1;
COLUMN_TYPE_U64 = 2;
COLUMN_TYPE_F64 = 3;
COLUMN_TYPE_STRING = 4;
COLUMN_TYPE_BOOL = 5;
}
message HostGroup {
string id = 1;
// connection strings for remote hosts.
repeated string hosts = 2;
}

View File

@ -0,0 +1,248 @@
syntax = "proto3";
package influxdata.iox.management.v1;
import "google/protobuf/duration.proto";
import "google/protobuf/empty.proto";
import "influxdata/iox/management/v1/base_types.proto";
// `PartitionTemplate` is used to compute the partition key of each row that
// gets written. It can consist of the table name, a column name and its value,
// a formatted time, or a string column and regex captures of its value. For
// columns that do not appear in the input row, a blank value is output.
//
// The key is constructed in order of the template parts; thus ordering changes
// what partition key is generated.
message PartitionTemplate {
message Part {
message ColumnFormat {
string column = 1;
string format = 2;
}
oneof part {
google.protobuf.Empty table = 1;
string column = 2;
string time = 3;
ColumnFormat regex = 4;
ColumnFormat strf_time = 5;
}
}
repeated Part parts = 1;
}
message Matcher {
// A query predicate to filter rows
string predicate = 1;
// Restrict selection to a specific table or tables specified by a regex
oneof table_matcher {
google.protobuf.Empty all = 2;
string table = 3;
string regex = 4;
}
}
message ReplicationConfig {
// The set of host groups that data should be replicated to. Which host a
// write goes to within a host group is determined by consistent hashing of
// the partition key. We'd use this to create a host group per
// availability zone, so you might have 5 availability zones with 2
// hosts in each. Replication will ensure that N of those zones get a
// write. For each zone, only a single host needs to get the write.
// Replication is for ensuring a write exists across multiple hosts
// before returning success. Its purpose is to ensure write durability,
// rather than write availability for query (this is covered by
// subscriptions).
repeated string replications = 1;
// The minimum number of host groups to replicate a write to before success
// is returned. This can be overridden on a per request basis.
// Replication will continue to write to the other host groups in the
// background.
uint32 replication_count = 2;
// How long the replication queue can get before either rejecting writes or
// dropping missed writes. The queue is kept in memory on a
// per-database basis. A queue size of zero means it will only try to
// replicate synchronously and drop any failures.
uint64 replication_queue_max_size = 3;
}
message SubscriptionConfig {
message Subscription {
string name = 1;
string host_group_id = 2;
Matcher matcher = 3;
}
// `subscriptions` are used for query servers to get data via either push
// or pull as it arrives. They are separate from replication as they
// have a different purpose. They're for query servers or other clients
// that want to subscribe to some subset of data being written in. This
// could either be specific partitions, ranges of partitions, tables, or
// rows matching some predicate.
repeated Subscription subscriptions = 1;
}
message QueryConfig {
// If set to `true`, this server should answer queries from one or more of
// of its local write buffer and any read-only partitions that it knows
// about. In this case, results will be merged with any others from the
// remote goups or read-only partitions.
bool query_local = 1;
// Set `primary` to a host group if remote servers should be
// issued queries for this database. All hosts in the group should be
// queried with this server acting as the coordinator that merges
// results together.
string primary = 2;
// If a specific host in the primary group is unavailable,
// another host in the same position from a secondary group should be
// queried. For example, imagine we've partitioned the data in this DB into
// 4 partitions and we are replicating the data across 3 availability
// zones. We have 4 hosts in each of those AZs, thus they each have 1
// partition. We'd set the primary group to be the 4 hosts in the same
// AZ as this one, and the secondary groups as the hosts in the other 2 AZs.
repeated string secondaries = 3;
// Use `readOnlyPartitions` when a server should answer queries for
// partitions that come from object storage. This can be used to start
// up a new query server to handle queries by pointing it at a
// collection of partitions and then telling it to also pull
// data from the replication servers (writes that haven't been snapshotted
// into a partition).
repeated string read_only_partitions = 4;
}
message WalBufferConfig {
enum Rollover {
ROLLOVER_UNSPECIFIED = 0;
// Drop the old segment even though it hasn't been persisted. This part of
// the WAL will be lost on this server.
ROLLOVER_DROP_OLD_SEGMENT = 1;
// Drop the incoming write and fail silently. This favors making sure that
// older WAL data will be backed up.
ROLLOVER_DROP_INCOMING = 2;
// Reject the incoming write and return an error. The client may retry the
// request, which will succeed once the oldest segment has been
// persisted to object storage.
ROLLOVER_RETURN_ERROR = 3;
}
// The size the WAL buffer should be limited to. Once the buffer gets to
// this size it will drop old segments to remain below this size, but
// still try to hold as much in memory as possible while remaining
// below this threshold
uint64 buffer_size = 1;
// WAL segments become read-only after crossing over this size. Which means
// that segments will always be >= this size. When old segments are
// dropped from of memory, at least this much space will be freed from
// the buffer.
uint64 segment_size = 2;
// What should happen if a write comes in that would exceed the WAL buffer
// size and the oldest segment that could be dropped hasn't yet been
// persisted to object storage. If the oldest segment has been
// persisted, then it will be dropped from the buffer so that new writes
// can be accepted. This option is only for defining the behavior of what
// happens if that segment hasn't been persisted. If set to return an
// error, new writes will be rejected until the oldest segment has been
// persisted so that it can be cleared from memory. Alternatively, this
// can be set so that old segments are dropped even if they haven't been
// persisted. This setting is also useful for cases where persistence
// isn't being used and this is only for in-memory buffering.
Rollover buffer_rollover = 3;
// If set to true, buffer segments will be written to object storage.
bool persist_segments = 4;
// If set, segments will be rolled over after this period of time even
// if they haven't hit the size threshold. This allows them to be written
// out to object storage as they must be immutable first.
google.protobuf.Duration close_segment_after = 5;
}
message MutableBufferConfig {
message PartitionDropOrder {
message ColumnSort {
string column_name = 1;
ColumnType column_type = 2;
Aggregate column_value = 3;
}
// Sort partitions by this order. Last will be dropped first.
Order order = 1;
// Configure sort key
oneof sort {
// The last time the partition received a write.
google.protobuf.Empty last_write_time = 2;
// When the partition was opened in the mutable buffer.
google.protobuf.Empty created_at_time = 3;
// A column name, its expected type, and whether to use the min or max
// value. The ColumnType is necessary because the column can appear in
// any number of tables and be of a different type. This specifies that
// when sorting partitions, only columns with the given name and type
// should be used for the purposes of determining the partition order. If a
// partition doesn't have the given column in any way, the partition will
// appear at the beginning of the list with a null value where all
// partitions having null for that value will then be
// sorted by created_at_time desc. So if none of the partitions in the
// mutable buffer had this column with this type, then the partition
// that was created first would appear last in the list and thus be the
// first up to be dropped.
ColumnSort column = 4;
}
}
// The size the mutable buffer should be limited to. Once the buffer gets
// to this size it will drop partitions in the given order. If unable
// to drop partitions (because of later rules in this config) it will
// reject writes until it is able to drop partitions.
uint64 buffer_size = 1;
// If set, the mutable buffer will not drop partitions that have chunks
// that have not yet been persisted. Thus it will reject writes if it
// is over size and is unable to drop partitions. The default is to
// drop partitions in the sort order, regardless of whether they have
// unpersisted chunks or not. The WAL Buffer can be used to ensure
// persistence, but this may cause longer recovery times.
bool reject_if_not_persisted = 2;
// Configure order to drop partitions in
PartitionDropOrder partition_drop_order = 3;
// Attempt to persist partitions after they haven't received a write for
// this number of seconds. If not set, partitions won't be
// automatically persisted.
uint32 persist_after_cold_seconds = 4;
}
message DatabaseRules {
// The unencoded name of the database
string name = 1;
// Template that generates a partition key for each row inserted into the database
PartitionTemplate partition_template = 2;
// Synchronous replication configuration for this database
ReplicationConfig replication_config = 3;
// Asynchronous pull-based subscription configuration for this database
SubscriptionConfig subscription_config = 4;
// Query configuration for this database
QueryConfig query_config = 5;
// WAL configuration for this database
WalBufferConfig wal_buffer_config = 6;
// Mutable buffer configuration for this database
MutableBufferConfig mutable_buffer_config = 7;
}

View File

@ -0,0 +1,49 @@
syntax = "proto3";
package influxdata.iox.management.v1;
import "google/protobuf/empty.proto";
import "influxdata/iox/management/v1/database_rules.proto";
service ManagementService {
rpc GetWriterId(GetWriterIdRequest) returns (GetWriterIdResponse);
rpc UpdateWriterId(UpdateWriterIdRequest) returns (UpdateWriterIdResponse);
rpc ListDatabases(ListDatabasesRequest) returns (ListDatabasesResponse);
rpc GetDatabase(GetDatabaseRequest) returns (GetDatabaseResponse);
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse);
}
message GetWriterIdRequest {}
message GetWriterIdResponse {
uint32 id = 1;
}
message UpdateWriterIdRequest {
uint32 id = 1;
}
message UpdateWriterIdResponse {}
message ListDatabasesRequest {}
message ListDatabasesResponse {
repeated string names = 1;
}
message GetDatabaseRequest {
string name = 1;
}
message GetDatabaseResponse {
DatabaseRules rules = 1;
}
message CreateDatabaseRequest {
DatabaseRules rules = 1;
}
message CreateDatabaseResponse {}

View File

@ -8,9 +8,8 @@ syntax = "proto3";
package influxdata.platform.storage;
import "google/protobuf/empty.proto";
import "storage_common.proto";
import "storage_common_idpe.proto";
import "influxdata/platform/storage/storage_common.proto";
import "influxdata/platform/storage/storage_common_idpe.proto";
service Storage {
// ReadFilter performs a filter operation at storage

View File

@ -8,7 +8,7 @@ syntax = "proto3";
package influxdata.platform.storage;
import "google/protobuf/any.proto";
import "predicate.proto";
import "influxdata/platform/storage/predicate.proto";
message ReadFilterRequest {

View File

@ -10,8 +10,8 @@ syntax = "proto3";
package influxdata.platform.storage;
import "google/protobuf/any.proto";
import "predicate.proto";
import "storage_common.proto";
import "influxdata/platform/storage/predicate.proto";
import "influxdata/platform/storage/storage_common.proto";
message ReadSeriesCardinalityRequest {
google.protobuf.Any read_series_cardinality_source = 1;

View File

@ -9,21 +9,55 @@
clippy::clone_on_ref_ptr
)]
include!(concat!(env!("OUT_DIR"), "/influxdata.platform.storage.rs"));
include!(concat!(
env!("OUT_DIR"),
"/com.github.influxdata.idpe.storage.read.rs"
));
include!(concat!(env!("OUT_DIR"), "/wal_generated.rs"));
mod pb {
pub mod influxdata {
pub mod platform {
pub mod storage {
include!(concat!(env!("OUT_DIR"), "/influxdata.platform.storage.rs"));
// Can't implement `Default` because `prost::Message` implements `Default`
impl TimestampRange {
pub fn max() -> Self {
TimestampRange {
start: std::i64::MIN,
end: std::i64::MAX,
// Can't implement `Default` because `prost::Message` implements `Default`
impl TimestampRange {
pub fn max() -> Self {
TimestampRange {
start: std::i64::MIN,
end: std::i64::MAX,
}
}
}
}
}
pub mod iox {
pub mod management {
pub mod v1 {
include!(concat!(env!("OUT_DIR"), "/influxdata.iox.management.v1.rs"));
}
}
}
}
pub mod com {
pub mod github {
pub mod influxdata {
pub mod idpe {
pub mod storage {
pub mod read {
include!(concat!(
env!("OUT_DIR"),
"/com.github.influxdata.idpe.storage.read.rs"
));
}
}
}
}
}
}
}
include!(concat!(env!("OUT_DIR"), "/wal_generated.rs"));
pub use pb::com::github::influxdata::idpe::storage::read::*;
pub use pb::influxdata::platform::storage::*;
pub use google_types as google;
pub use pb::influxdata;