Merge pull request #4486 from influxdata/jpg/command-cleanup

feat: Remove outdated management APIs / CLI commands
pull/24376/head
kodiakhq[bot] 2022-05-06 14:41:32 +00:00 committed by GitHub
commit ef78cb8552
36 changed files with 247 additions and 3902 deletions

View File

@ -568,7 +568,9 @@ workflows:
- docs-lint
- test
- test_heappy
- test_perf
# Temporarily disable until perf can be updated to use NG
# https://github.com/influxdata/influxdb_iox/issues/4485
# - test_perf
- build_dev
- doc
- workspace_hack_checks
@ -588,7 +590,9 @@ workflows:
- docs-lint
- test
- test_heappy
- test_perf
# Temporarily disable until perf can be updated to use NG
# https://github.com/influxdata/influxdb_iox/issues/4485
# - test_perf
- build_dev
- build_release
- doc

Cargo.lock generated
View File

@ -145,7 +145,7 @@ dependencies = [
"arrow",
"chrono",
"comfy-table",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"num-traits",
"rand",
"snafu",
@ -665,9 +665,9 @@ dependencies = [
[[package]]
name = "clap"
version = "3.1.15"
version = "3.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85a35a599b11c089a7f49105658d089b8f2cf0882993c17daf6de15285c2c35d"
checksum = "7c167e37342afc5f33fd87bbc870cedd020d2a6dffa05d45ccd9241fbdd146db"
dependencies = [
"atty",
"bitflags",
@ -684,7 +684,7 @@ dependencies = [
name = "clap_blocks"
version = "0.1.0"
dependencies = [
"clap 3.1.15",
"clap 3.1.12",
"data_types",
"futures",
"humantime",
@ -718,9 +718,9 @@ dependencies = [
[[package]]
name = "clap_lex"
version = "0.2.0"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a37c35f1112dad5e6e0b1adaff798507497a18fceeb30cceb3bae7d1427b9213"
checksum = "189ddd3b5d32a70b35e7686054371742a937b0d99128e76dde6340210e966669"
dependencies = [
"os_str_bytes",
]
@ -1206,7 +1206,7 @@ dependencies = [
"datafusion-expr",
"datafusion-physical-expr",
"futures",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"lazy_static",
"log",
"num_cpus",
@ -1271,7 +1271,7 @@ dependencies = [
"chrono",
"datafusion-common",
"datafusion-expr",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"lazy_static",
"md-5 0.10.1",
"ordered-float 3.0.0",
@ -1318,7 +1318,7 @@ dependencies = [
"datafusion_util",
"dml",
"futures",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"internal_types",
"iox_object_store",
"iox_time",
@ -1461,7 +1461,7 @@ version = "0.1.0"
dependencies = [
"arrow_util",
"data_types",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"iox_time",
"mutable_batch",
"mutable_batch_lp",
@ -1929,9 +1929,9 @@ dependencies = [
[[package]]
name = "hashbrown"
version = "0.12.1"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db0d4cf898abf0081f964436dc980e96670a0f36863e4b83aaacdb65c9d7ccc3"
checksum = "8c21d40587b92fa6a6c6e3c1bdbf87d75511db5672f9c93175574b3a00df1758"
dependencies = [
"ahash",
]
@ -2210,9 +2210,8 @@ dependencies = [
"byteorder",
"bytes",
"chrono",
"clap 3.1.15",
"clap 3.1.12",
"clap_blocks",
"comfy-table",
"compactor",
"console-subscriber",
"csv",
@ -2225,7 +2224,7 @@ dependencies = [
"flate2",
"futures",
"generated_types",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"heappy",
"http",
"humantime",
@ -2241,10 +2240,8 @@ dependencies = [
"iox_time",
"ioxd_common",
"ioxd_compactor",
"ioxd_database",
"ioxd_ingester",
"ioxd_querier",
"ioxd_router",
"ioxd_router2",
"ioxd_test",
"itertools",
@ -2526,7 +2523,7 @@ version = "0.1.0"
dependencies = [
"chrono",
"chrono-english",
"clap 3.1.15",
"clap 3.1.12",
"criterion",
"data_types",
"futures",
@ -2555,7 +2552,7 @@ dependencies = [
"assert_matches",
"async-trait",
"chrono",
"clap 3.1.15",
"clap 3.1.12",
"dotenv",
"futures",
"glob",
@ -2658,13 +2655,13 @@ dependencies = [
"async-trait",
"bytes",
"chrono",
"clap 3.1.15",
"clap 3.1.12",
"clap_blocks",
"data_types",
"dml",
"flate2",
"futures",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"http",
"hyper",
"log",
@ -2722,60 +2719,6 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "ioxd_database"
version = "0.1.0"
dependencies = [
"arrow",
"arrow-flight",
"arrow_util",
"async-trait",
"bytes",
"clap 3.1.15",
"clap_blocks",
"data_types",
"db",
"dml",
"futures",
"generated_types",
"http",
"hyper",
"influxdb_iox_client",
"influxdb_storage_client",
"ioxd_common",
"job_registry",
"metric",
"mutable_batch_pb",
"object_store",
"observability_deps",
"prost",
"query",
"reqwest",
"schema",
"serde",
"serde_json",
"serde_urlencoded",
"server",
"service_common",
"service_grpc_flight",
"service_grpc_influxrpc",
"service_grpc_testing",
"snafu",
"test_helpers",
"tokio",
"tokio-stream",
"tokio-util 0.7.1",
"tonic",
"tonic-health",
"tonic-reflection",
"trace",
"trace_exporters",
"trace_http",
"tracker",
"uuid 0.8.2",
"workspace-hack",
]
[[package]]
name = "ioxd_ingester"
version = "0.1.0"
@ -2838,36 +2781,6 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "ioxd_router"
version = "0.1.0"
dependencies = [
"async-trait",
"data_types",
"dml",
"generated_types",
"http",
"hyper",
"iox_time",
"ioxd_common",
"metric",
"mutable_batch_pb",
"regex",
"reqwest",
"router",
"service_grpc_testing",
"snafu",
"tokio",
"tokio-stream",
"tokio-util 0.7.1",
"tonic",
"tonic-health",
"tonic-reflection",
"trace",
"trace_http",
"workspace-hack",
]
[[package]]
name = "ioxd_router2"
version = "0.1.0"
@ -2877,7 +2790,7 @@ dependencies = [
"clap_blocks",
"data_types2",
"generated_types",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"hyper",
"iox_catalog",
"iox_time",
@ -2907,7 +2820,7 @@ name = "ioxd_test"
version = "0.1.0"
dependencies = [
"async-trait",
"clap 3.1.15",
"clap 3.1.12",
"generated_types",
"hyper",
"ioxd_common",
@ -3215,9 +3128,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.125"
version = "0.2.124"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5916d2ae698f6de9bfb891ad7a8d65c09d232dc58cc4ac433c7da3b2fd84bc2b"
checksum = "21a41fed9d98f27ab1c6d161da622a4fa35e8a54a8adc24bbf3ddd0ef70b0e50"
[[package]]
name = "libloading"
@ -3241,7 +3154,7 @@ version = "0.1.0"
dependencies = [
"data_types",
"futures",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"internal_types",
"iox_time",
"observability_deps",
@ -3275,9 +3188,9 @@ dependencies = [
[[package]]
name = "log"
version = "0.4.17"
version = "0.4.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8"
dependencies = [
"cfg-if",
]
@ -3369,9 +3282,9 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "memchr"
version = "2.5.0"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"
[[package]]
name = "memmap2"
@ -3506,7 +3419,7 @@ dependencies = [
"arrow_util",
"chrono",
"data_types",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"itertools",
"rand",
"schema",
@ -3521,7 +3434,7 @@ dependencies = [
"arrow_util",
"assert_matches",
"criterion",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"influxdb_line_protocol",
"mutable_batch",
"schema",
@ -3536,7 +3449,7 @@ dependencies = [
"arrow_util",
"dml",
"generated_types",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"mutable_batch",
"mutable_batch_lp",
"schema",
@ -3699,9 +3612,9 @@ dependencies = [
[[package]]
name = "num-integer"
version = "0.1.45"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9"
checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
dependencies = [
"autocfg",
"num-traits",
@ -4588,7 +4501,7 @@ dependencies = [
"datafusion_util",
"executor",
"futures",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"itertools",
"observability_deps",
"parking_lot 0.12.0",
@ -4796,7 +4709,7 @@ dependencies = [
"data_types",
"datafusion 0.1.0",
"either",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"itertools",
"metric",
"observability_deps",
@ -4938,7 +4851,7 @@ dependencies = [
"cache_loader_async",
"data_types",
"dml",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"influxdb_iox_client",
"iox_time",
"metric",
@ -4968,7 +4881,7 @@ dependencies = [
"flate2",
"futures",
"generated_types",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"hyper",
"influxdb_line_protocol",
"iox_catalog",
@ -5271,7 +5184,7 @@ name = "schema"
version = "0.1.0"
dependencies = [
"arrow",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"indexmap",
"itertools",
"snafu",
@ -5490,7 +5403,7 @@ dependencies = [
"futures",
"futures-util",
"generated_types",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"influxdb_line_protocol",
"internal_types",
"iox_object_store",
@ -6482,7 +6395,7 @@ version = "0.1.0"
dependencies = [
"async-trait",
"chrono",
"clap 3.1.15",
"clap 3.1.12",
"futures",
"observability_deps",
"snafu",
@ -6497,7 +6410,7 @@ name = "trace_http"
version = "0.1.0"
dependencies = [
"futures",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"http",
"http-body",
"itertools",
@ -6603,7 +6516,7 @@ name = "tracker"
version = "0.1.0"
dependencies = [
"futures",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"iox_time",
"lock_api",
"metric",
@ -6628,7 +6541,7 @@ dependencies = [
name = "trogging"
version = "0.1.0"
dependencies = [
"clap 3.1.15",
"clap 3.1.12",
"logfmt",
"observability_deps",
"regex",
@ -7076,7 +6989,7 @@ dependencies = [
"futures-util",
"getrandom",
"hashbrown 0.11.2",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"heck 0.4.0",
"hyper",
"hyper-rustls",
@ -7136,7 +7049,7 @@ dependencies = [
"dotenv",
"futures",
"generated_types",
"hashbrown 0.12.1",
"hashbrown 0.12.0",
"http",
"httparse",
"iox_time",

View File

@ -37,8 +37,6 @@ members = [
"ioxd_compactor",
"ioxd_ingester",
"ioxd_querier",
"ioxd_database",
"ioxd_router",
"ioxd_router2",
"ioxd_test",
"job_registry",

View File

@ -25,9 +25,7 @@ iox_catalog = { path = "../iox_catalog" }
iox_object_store = { path = "../iox_object_store" }
ioxd_common = { path = "../ioxd_common"}
ioxd_compactor = { path = "../ioxd_compactor"}
ioxd_database = { path = "../ioxd_database"}
ioxd_ingester = { path = "../ioxd_ingester"}
ioxd_router = { path = "../ioxd_router"}
ioxd_router2 = { path = "../ioxd_router2"}
ioxd_querier = { path = "../ioxd_querier"}
ioxd_test = { path = "../ioxd_test"}
@ -70,8 +68,6 @@ byteorder = "1.3.4"
bytes = "1.0"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "env"] }
# used by arrow/datafusion anyway
comfy-table = { version = "5.0", default-features = false }
console-subscriber = { version = "0.1.5", optional = true, features = ["parking_lot"] }
csv = "1.1"
dotenv = "0.15.0"

View File

@ -1,258 +0,0 @@
//! This module implements the `database` CLI command
use influxdb_iox_client::{
connection::Connection,
flight::{self, generated_types::ReadInfo},
format::QueryOutputFormat,
write,
};
use iox_time::TimeProvider;
use std::{fs::File, io::Read, num::NonZeroU64, path::PathBuf, str::FromStr, time::Duration};
use thiserror::Error;
use uuid::Uuid;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("Error reading file {:?}: {}", file_name, source)]
ReadingFile {
file_name: PathBuf,
source: std::io::Error,
},
#[error("Error formatting: {0}")]
FormattingError(#[from] influxdb_iox_client::format::Error),
#[error("Error querying: {0}")]
Query(#[from] influxdb_iox_client::flight::Error),
#[error("JSON Serialization error: {0}")]
Serde(#[from] serde_json::Error),
#[error("Client error: {0}")]
ClientError(#[from] influxdb_iox_client::error::Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Manage IOx databases
#[derive(Debug, clap::Parser)]
pub struct Config {
#[clap(subcommand)]
command: Command,
}
/// Create a new database
#[derive(Debug, clap::Parser)]
struct Create {
/// The name of the database
name: String,
/// Once the total amount of buffered data in memory reaches this size, start
/// dropping data from memory based on the drop_order
#[clap(long, default_value = "52428800")] // 52428800 = 50*1024*1024
buffer_size_soft: usize,
/// Once the amount of data in memory reaches this size, start
/// rejecting writes
#[clap(long, default_value = "104857600")] // 104857600 = 100*1024*1024
buffer_size_hard: usize,
/// Persists chunks to object storage.
#[clap(long = "skip-persist", parse(from_flag = std::ops::Not::not))]
persist: bool,
/// Do not allow writing new data to this database
#[clap(long)]
immutable: bool,
/// After how many transactions should IOx write a new checkpoint?
#[clap(long, default_value = "100", parse(try_from_str))]
catalog_transactions_until_checkpoint: NonZeroU64,
/// Prune catalog transactions older than the given age.
///
/// Keeping old transactions can be useful for debugging.
#[clap(long, default_value = "1d", parse(try_from_str = humantime::parse_duration))]
catalog_transaction_prune_age: Duration,
/// Once a partition hasn't received a write for this period of time,
/// it will be compacted and, if set, persisted. Writers will generally
/// have this amount of time to send late-arriving writes or to
/// account for their clock skew.
#[clap(long, default_value = "300")]
late_arrive_window_seconds: u32,
/// Maximum number of rows before triggering persistence
#[clap(long, default_value = "100000")]
persist_row_threshold: u64,
/// Maximum age of a write before triggering persistence
#[clap(long, default_value = "1800")]
persist_age_threshold_seconds: u32,
/// Maximum number of rows to buffer in a MUB chunk before compacting it
#[clap(long, default_value = "100000")]
mub_row_threshold: u64,
/// Use up to this amount of space in bytes for caching Parquet files. A
/// value of zero disables Parquet file caching.
#[clap(long, default_value = "0")]
parquet_cache_limit: u64,
}
/// Get list of databases
#[derive(Debug, clap::Parser)]
struct List {
/// Whether to list detailed information about the databases along with their names.
#[clap(long)]
detailed: bool,
}
/// Return configuration of specific database
#[derive(Debug, clap::Parser)]
struct Get {
/// The name of the database
name: String,
/// If false, returns values for all fields, with defaults filled
/// in. If true, only returns values which were explicitly set on
/// database creation or update
#[clap(long)]
omit_defaults: bool,
}
/// Write data into the specified database
#[derive(Debug, clap::Parser)]
struct Write {
/// The name of the database
name: String,
/// File with data to load. Currently supported formats are .lp
file_name: PathBuf,
}
/// Query the data with SQL
#[derive(Debug, clap::Parser)]
struct Query {
/// The name of the database
name: String,
/// The query to run, in SQL format
query: String,
/// Optional format ('pretty', 'json', or 'csv')
#[clap(short, long, default_value = "pretty")]
format: String,
}
/// Release a database from its current server owner
#[derive(Debug, clap::Parser)]
struct Release {
/// The name of the database to release
name: String,
/// Optionally, the UUID of the database to release. This must match the UUID of the current
/// database with the given name, or the release operation will result in an error.
#[clap(short, long)]
uuid: Option<Uuid>,
}
/// Claim an unowned database
#[derive(Debug, clap::Parser)]
struct Claim {
/// The UUID of the database to claim
uuid: Uuid,
/// Force this server to claim this database, even if it is
/// ostensibly owned by another server.
///
/// WARNING: ONLY do this if you are sure that no other servers
/// are writing to this database (for example, the data files have
/// been copied somewhere). If another server is currently writing
/// to this database, corruption will very likely occur
#[clap(long)]
force: bool,
}
/// Shutdown database
#[derive(Debug, clap::Parser)]
struct Shutdown {
/// The name of the database
name: String,
}
/// Restart database
#[derive(Debug, clap::Parser)]
struct Restart {
/// The name of the database
name: String,
/// Skip replay
#[clap(long)]
skip_replay: bool,
}
/// All possible subcommands for database
#[derive(Debug, clap::Parser)]
enum Command {
/// Write data into the specified database
Write(Write),
/// Query the data with SQL
Query(Query),
}
pub async fn command(connection: Connection, config: Config) -> Result<()> {
match config.command {
Command::Write(write) => {
let mut client = write::Client::new(connection);
let mut file = File::open(&write.file_name).map_err(|e| Error::ReadingFile {
file_name: write.file_name.clone(),
source: e,
})?;
let mut lp_data = String::new();
file.read_to_string(&mut lp_data)
.map_err(|e| Error::ReadingFile {
file_name: write.file_name.clone(),
source: e,
})?;
let default_time = iox_time::SystemProvider::new().now().timestamp_nanos();
let lines_written = client.write_lp(write.name, lp_data, default_time).await?;
println!("{} Lines OK", lines_written);
}
Command::Query(query) => {
let mut client = flight::Client::new(connection);
let Query {
name,
format,
query,
} = query;
let format = QueryOutputFormat::from_str(&format)?;
let mut query_results = client
.perform_query(ReadInfo {
namespace_name: name,
sql_query: query,
})
.await?;
// It might be nice to do some sort of streaming write
// rather than buffering the whole thing.
let mut batches = vec![];
while let Some(data) = query_results.next().await? {
batches.push(data);
}
let formatted_result = format.format(&batches)?;
println!("{}", formatted_result);
}
}
Ok(())
}

View File

@ -1,12 +1,20 @@
use snafu::{ResultExt, Snafu};
use futures::Future;
use influxdb_iox_client::connection::Connection;
use snafu::prelude::*;
mod dump_catalog;
mod print_cpu;
mod schema;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(context(false))]
#[snafu(display("Error in dump-catalog subcommand: {}", source))]
DumpCatalogError { source: dump_catalog::Error },
#[snafu(context(false))]
#[snafu(display("Error in schema subcommand: {}", source))]
SchemaError { source: schema::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -25,16 +33,23 @@ enum Command {
/// Prints what CPU features are used by the compiler by default.
PrintCpu,
Schema(schema::Config),
}
pub async fn command(config: Config) -> Result<()> {
pub async fn command<C, CFut>(connection: C, config: Config) -> Result<()>
where
C: Send + FnOnce() -> CFut,
CFut: Send + Future<Output = Connection>,
{
match config.command {
Command::DumpCatalog(dump_catalog) => dump_catalog::command(*dump_catalog)
.await
.context(DumpCatalogSnafu),
Command::PrintCpu => {
print_cpu::main();
Ok(())
Command::DumpCatalog(dump_catalog) => dump_catalog::command(*dump_catalog).await?,
Command::PrintCpu => print_cpu::main(),
Command::Schema(config) => {
let connection = connection().await;
schema::command(connection, config).await?
}
}
Ok(())
}
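
The rewritten `command` entry point above takes a connection factory (a closure returning a future) rather than an established `Connection`, so local-only subcommands such as `print-cpu` never dial the server while `schema` builds its connection on demand. A minimal, self-contained sketch of the pattern follows; `Connection`, `Config`, and `needs_connection` are illustrative stand-ins rather than the real IOx types, and a tokio runtime is assumed:

use std::future::Future;

// Illustrative stand-ins only; the real code uses
// influxdb_iox_client::connection::Connection and the clap-derived Config above.
struct Connection;
struct Config {
    needs_connection: bool,
}

// Accept a closure that builds the connection on demand, mirroring the
// `FnOnce() -> impl Future<Output = Connection>` bound introduced in this diff.
async fn command<C, CFut>(connection: C, config: Config)
where
    C: Send + FnOnce() -> CFut,
    CFut: Send + Future<Output = Connection>,
{
    if config.needs_connection {
        // Subcommands that talk to the server (e.g. `schema`) await the factory
        // and then issue their requests over the resulting connection.
        let _connection: Connection = connection().await;
    } else {
        // Local-only subcommands (e.g. `print-cpu`) never establish a connection.
    }
}

#[tokio::main]
async fn main() {
    // The caller hands over the factory as a closure; nothing is dialled until needed.
    command(|| async { Connection }, Config { needs_connection: false }).await;
}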

View File

@ -0,0 +1,83 @@
use influxdb_iox_client::{
connection::Connection,
flight::{self, generated_types::ReadInfo},
format::QueryOutputFormat,
};
use std::str::FromStr;
use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
// #[error("Error reading file {:?}: {}", file_name, source)]
// ReadingFile {
// file_name: PathBuf,
// source: std::io::Error,
// },
#[error("Error formatting: {0}")]
FormattingError(#[from] influxdb_iox_client::format::Error),
#[error("Error querying: {0}")]
Query(#[from] influxdb_iox_client::flight::Error),
// #[error("Error in chunk subcommand: {0}")]
// Chunk(#[from] chunk::Error),
// #[error("Error in partition subcommand: {0}")]
// Partition(#[from] partition::Error),
// #[error("JSON Serialization error: {0}")]
// Serde(#[from] serde_json::Error),
// #[error("Error in partition subcommand: {0}")]
// Catalog(#[from] recover::Error),
// #[error("Client error: {0}")]
// ClientError(#[from] influxdb_iox_client::error::Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Query the data with SQL
#[derive(Debug, clap::Parser)]
pub struct Config {
/// The name of the database
name: String,
/// The query to run, in SQL format
query: String,
/// Optional format ('pretty', 'json', or 'csv')
#[clap(short, long, default_value = "pretty")]
format: String,
}
pub async fn command(connection: Connection, config: Config) -> Result<()> {
let mut client = flight::Client::new(connection);
let Config {
name,
format,
query,
} = config;
let format = QueryOutputFormat::from_str(&format)?;
let mut query_results = client
.perform_query(ReadInfo {
namespace_name: name,
sql_query: query,
})
.await?;
// It might be nice to do some sort of streaming write
// rather than buffering the whole thing.
let mut batches = vec![];
while let Some(data) = query_results.next().await? {
batches.push(data);
}
let formatted_result = format.format(&batches)?;
println!("{}", formatted_result);
Ok(())
}

View File

@ -1,139 +0,0 @@
//! Implementation of command line option for running server
use std::sync::Arc;
use clap_blocks::run_config::RunConfig;
use data_types::boolean_flag::BooleanFlag;
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use ioxd_common::Service;
use ioxd_database::{
setup::{make_application, make_server},
DatabaseServerType,
};
use thiserror::Error;
use super::main;
#[derive(Debug, Error)]
pub enum Error {
#[error("Run: {0}")]
Run(#[from] main::Error),
#[error("Cannot setup server: {0}")]
Setup(#[from] ioxd_database::setup::Error),
#[error("Invalid config: {0}")]
InvalidConfig(#[from] CommonServerStateError),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, clap::Parser)]
#[clap(
name = "run",
about = "Runs in database mode",
long_about = "Run the IOx database server.\n\nThe configuration options below can be \
set either with the command line flags or with the specified environment \
variable. If there is a file named '.env' in the current working directory, \
it is sourced before loading the configuration.
Configuration is loaded from the following sources (highest precedence first):
- command line arguments
- user set environment variables
- .env file contents
- pre-configured default values"
)]
pub struct Config {
#[clap(flatten)]
pub(crate) run_config: RunConfig,
/// The number of threads to use for all worker pools.
///
/// IOx uses a pool with `--num-threads` threads *each* for
/// 1. Handling API requests
/// 2. Running queries.
/// 3. Reorganizing data (e.g. compacting chunks)
///
/// If not specified, defaults to the number of cores on the system
#[clap(long = "--num-worker-threads", env = "INFLUXDB_IOX_NUM_WORKER_THREADS")]
pub(crate) num_worker_threads: Option<usize>,
// TODO(marco): Remove once the database-run-mode (aka the `server` crate) cannot handle routing anymore and we're
// fully migrated to the new router code.
/// When IOx nodes need to talk to remote peers they consult an internal remote address
/// mapping. This mapping is populated via API calls. If the mapping doesn't produce
/// a result, this config entry allows a hostname to be generated from a template:
/// occurrences of the "{id}" substring will be replaced with the remote Server ID.
///
/// Example: http://node-{id}.ioxmydomain.com:8082
#[clap(long = "--remote-template", env = "INFLUXDB_IOX_REMOTE_TEMPLATE")]
pub remote_template: Option<String>,
/// Automatically wipe the preserved catalog on error
#[clap(
long = "--wipe-catalog-on-error",
env = "INFLUXDB_IOX_WIPE_CATALOG_ON_ERROR",
default_value = "no"
)]
pub wipe_catalog_on_error: BooleanFlag,
/// Skip replaying the write buffer and seek to high watermark instead.
#[clap(
long = "--skip-replay",
env = "INFLUXDB_IOX_SKIP_REPLAY",
default_value = "no"
)]
pub skip_replay_and_seek_instead: BooleanFlag,
/// Path to a configuration file to use for routing configuration; this will
/// disable dynamic configuration via `influxdata.iox.management.v1.ManagementService`
///
/// The config file should contain a JSON encoded `influxdata.iox.management.v1.ServerConfigFile`
#[clap(long = "--config-file", env = "INFLUXDB_IOX_CONFIG_FILE")]
pub config_file: Option<String>,
}
impl Config {
/// Get a reference to the config's run config.
pub fn run_config(&self) -> &RunConfig {
&self.run_config
}
/// Get a reference to the config's config file.
pub fn config_file(&self) -> Option<&String> {
self.config_file.as_ref()
}
}
pub async fn command(config: Config) -> Result<()> {
let common_state = CommonServerState::from_config(config.run_config.clone())?;
let application = make_application(
config.run_config(),
config.config_file().cloned(),
config.num_worker_threads,
common_state.trace_collector(),
)
.await?;
let app_server = make_server(
Arc::clone(&application),
config.wipe_catalog_on_error.into(),
config.skip_replay_and_seek_instead.into(),
config.run_config(),
)?;
let server_type = Arc::new(DatabaseServerType::new(
Arc::clone(&application),
Arc::clone(&app_server),
&common_state,
config.config_file.is_some(),
));
let services = vec![Service::create(server_type, common_state.run_config())];
Ok(main::main(
common_state,
services,
Arc::new(metric::Registry::default()),
)
.await?)
}

View File

@ -3,11 +3,9 @@ use trogging::cli::LoggingConfig;
mod all_in_one;
mod compactor;
mod database;
mod ingester;
mod main;
mod querier;
mod router;
mod router2;
mod test;
@ -17,15 +15,9 @@ pub enum Error {
#[snafu(display("Error in compactor subcommand: {}", source))]
CompactorError { source: compactor::Error },
#[snafu(display("Error in database subcommand: {}", source))]
DatabaseError { source: database::Error },
#[snafu(display("Error in querier subcommand: {}", source))]
QuerierError { source: querier::Error },
#[snafu(display("Error in router subcommand: {}", source))]
RouterError { source: router::Error },
#[snafu(display("Error in router2 subcommand: {}", source))]
Router2Error { source: router2::Error },
@ -43,27 +35,19 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, clap::Parser)]
pub struct Config {
// TODO(marco) remove this
/// Config for database mode, for backwards compatibility reasons.
#[clap(flatten)]
database_config: database::Config,
#[clap(subcommand)]
command: Option<Command>,
command: Command,
}
impl Config {
pub fn logging_config(&self) -> &LoggingConfig {
match &self.command {
None => self.database_config.run_config.logging_config(),
Some(Command::Compactor(config)) => config.run_config.logging_config(),
Some(Command::Database(config)) => config.run_config.logging_config(),
Some(Command::Querier(config)) => config.run_config.logging_config(),
Some(Command::Router(config)) => config.run_config.logging_config(),
Some(Command::Router2(config)) => config.run_config.logging_config(),
Some(Command::Ingester(config)) => config.run_config.logging_config(),
Some(Command::AllInOne(config)) => &config.logging_config,
Some(Command::Test(config)) => config.run_config.logging_config(),
Command::Compactor(config) => config.run_config.logging_config(),
Command::Querier(config) => config.run_config.logging_config(),
Command::Router2(config) => config.run_config.logging_config(),
Command::Ingester(config) => config.run_config.logging_config(),
Command::AllInOne(config) => &config.logging_config,
Command::Test(config) => config.run_config.logging_config(),
}
}
}
@ -73,15 +57,9 @@ enum Command {
/// Run the server in compactor mode
Compactor(compactor::Config),
/// Run the server in database mode (Deprecated)
Database(database::Config),
/// Run the server in querier mode
Querier(querier::Config),
/// Run the server in routing mode (Deprecated)
Router(router::Config),
/// Run the server in router2 mode
Router2(router2::Config),
@ -97,23 +75,11 @@ enum Command {
pub async fn command(config: Config) -> Result<()> {
match config.command {
None => {
println!(
"WARNING: Not specifying the run-mode is deprecated. Defaulting to 'database'."
);
database::command(config.database_config)
.await
.context(DatabaseSnafu)
}
Some(Command::Compactor(config)) => {
compactor::command(config).await.context(CompactorSnafu)
}
Some(Command::Database(config)) => database::command(config).await.context(DatabaseSnafu),
Some(Command::Querier(config)) => querier::command(config).await.context(QuerierSnafu),
Some(Command::Router(config)) => router::command(config).await.context(RouterSnafu),
Some(Command::Router2(config)) => router2::command(config).await.context(Router2Snafu),
Some(Command::Ingester(config)) => ingester::command(config).await.context(IngesterSnafu),
Some(Command::AllInOne(config)) => all_in_one::command(config).await.context(AllInOneSnafu),
Some(Command::Test(config)) => test::command(config).await.context(TestSnafu),
Command::Compactor(config) => compactor::command(config).await.context(CompactorSnafu),
Command::Querier(config) => querier::command(config).await.context(QuerierSnafu),
Command::Router2(config) => router2::command(config).await.context(Router2Snafu),
Command::Ingester(config) => ingester::command(config).await.context(IngesterSnafu),
Command::AllInOne(config) => all_in_one::command(config).await.context(AllInOneSnafu),
Command::Test(config) => test::command(config).await.context(TestSnafu),
}
}
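
With the deprecated database default removed, the run mode is now mandatory: switching the field from `Option<Command>` to `Command` makes clap reject `influxdb_iox run` with no subcommand instead of warning and falling back to database mode. A small self-contained sketch of that clap 3 behaviour, using illustrative variant names rather than the full IOx config:

use clap::Parser;

#[derive(Debug, Parser)]
struct Config {
    // A non-Option subcommand field is treated as required by clap.
    #[clap(subcommand)]
    command: Command,
}

#[derive(Debug, Parser)]
enum Command {
    Compactor,
    Querier,
    Router2,
    Ingester,
    AllInOne,
    Test,
}

fn main() {
    // Invoking `run` with no mode now produces a usage error instead of
    // defaulting to the removed `database` mode.
    match Config::try_parse_from(["run"]) {
        Ok(config) => println!("running {:?}", config.command),
        Err(e) => eprintln!("{}", e),
    }
}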

View File

@ -1,168 +0,0 @@
//! Implementation of command line option for running server
use hashbrown::HashMap;
use std::sync::Arc;
use clap_blocks::run_config::RunConfig;
use data_types::router::Router as RouterConfig;
use generated_types::{google::FieldViolation, influxdata::iox::router::v1::RouterConfigFile};
use iox_time::SystemProvider;
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use ioxd_common::Service;
use ioxd_router::RouterServerType;
use observability_deps::tracing::warn;
use router::{resolver::RemoteTemplate, server::RouterServer};
use thiserror::Error;
use super::main;
#[derive(Debug, Error)]
pub enum Error {
#[error("Run: {0}")]
Run(#[from] main::Error),
#[error("Cannot setup server: {0}")]
Setup(#[from] ioxd_database::setup::Error),
#[error("Invalid config: {0}")]
InvalidConfig(#[from] CommonServerStateError),
#[error("error reading config file: {0}")]
ReadConfig(#[from] std::io::Error),
#[error("error decoding config file: {0}")]
DecodeConfig(#[from] serde_json::Error),
#[error("invalid config for router \"{0}\" in config file: {1}")]
InvalidRouterConfig(String, FieldViolation),
#[error("invalid router template \"{0}\" in config file: {1}")]
InvalidRouterTemplate(String, FieldViolation),
#[error("router template \"{template}\" not found for router \"{name}\"")]
TemplateNotFound { name: String, template: String },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, clap::Parser)]
#[clap(
name = "run",
about = "Runs in router mode",
long_about = "Run the IOx router server.\n\nThe configuration options below can be \
set either with the command line flags or with the specified environment \
variable. If there is a file named '.env' in the current working directory, \
it is sourced before loading the configuration.
Configuration is loaded from the following sources (highest precedence first):
- command line arguments
- user set environment variables
- .env file contents
- pre-configured default values"
)]
pub struct Config {
#[clap(flatten)]
pub(crate) run_config: RunConfig,
/// When IOx nodes need to talk to remote peers they consult an internal remote address
/// mapping. This mapping is populated via API calls. If the mapping doesn't produce
/// a result, this config entry allows a hostname to be generated from a template:
/// occurrences of the "{id}" substring will be replaced with the remote Server ID.
///
/// Example: http://node-{id}.ioxmydomain.com:8082
#[clap(long = "--remote-template", env = "INFLUXDB_IOX_REMOTE_TEMPLATE")]
pub remote_template: Option<String>,
/// Path to a configuration file to use for routing configuration; this will
/// disable dynamic configuration via `influxdata.iox.router.v1.RouterService`
///
/// The config file should contain a JSON encoded `influxdata.iox.router.v1.RouterConfigFile`
#[clap(long = "--config-file", env = "INFLUXDB_IOX_CONFIG_FILE")]
pub config_file: Option<String>,
}
pub async fn command(config: Config) -> Result<()> {
let common_state = CommonServerState::from_config(config.run_config.clone())?;
let remote_template = config.remote_template.map(RemoteTemplate::new);
let time_provider = Arc::new(SystemProvider::new());
let router_server = Arc::new(
RouterServer::new(
remote_template,
common_state.trace_collector(),
time_provider,
)
.await,
);
let config_immutable = match config.config_file {
Some(file) => {
let data = tokio::fs::read(file).await?;
let config: RouterConfigFile = serde_json::from_slice(data.as_slice())?;
for router in config.routers {
let name = router.name.clone();
let config = router
.try_into()
.map_err(|e| Error::InvalidRouterConfig(name, e))?;
router_server.update_router(config);
}
let templates = config
.templates
.into_iter()
.map(|router| {
let name = router.name.clone();
match router.try_into() {
Ok(router) => Ok((name, router)),
Err(e) => Err(Error::InvalidRouterTemplate(name, e)),
}
})
.collect::<Result<HashMap<String, RouterConfig>>>()?;
for instance in config.instances {
match templates.get(&instance.template) {
Some(template) => {
router_server.update_router(RouterConfig {
name: instance.name,
..template.clone()
});
}
None => {
return Err(Error::TemplateNotFound {
name: instance.name,
template: instance.template,
})
}
}
}
true
}
None => false,
};
if let Some(id) = config.run_config.server_id_config().server_id {
router_server
.set_server_id(id)
.expect("server id already set");
} else {
warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID config or API before writing or querying data.");
}
let server_type = Arc::new(RouterServerType::new(
router_server,
&common_state,
config_immutable,
));
let services = vec![Service::create(server_type, common_state.run_config())];
Ok(main::main(
common_state,
services,
Arc::new(metric::Registry::default()),
)
.await?)
}

View File

@ -0,0 +1,52 @@
use influxdb_iox_client::{connection::Connection, write};
use iox_time::TimeProvider;
use std::{fs::File, io::Read, path::PathBuf};
use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("Error reading file {:?}: {}", file_name, source)]
ReadingFile {
file_name: PathBuf,
source: std::io::Error,
},
#[error("Client error: {0}")]
ClientError(#[from] influxdb_iox_client::error::Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Write data into the specified database
#[derive(Debug, clap::Parser)]
pub struct Config {
/// The name of the database
name: String,
/// File with data to load. Currently supported formats are .lp
file_name: PathBuf,
}
pub async fn command(connection: Connection, config: Config) -> Result<()> {
let mut client = write::Client::new(connection);
let mut file = File::open(&config.file_name).map_err(|e| Error::ReadingFile {
file_name: config.file_name.clone(),
source: e,
})?;
let mut lp_data = String::new();
file.read_to_string(&mut lp_data)
.map_err(|e| Error::ReadingFile {
file_name: config.file_name.clone(),
source: e,
})?;
let default_time = iox_time::SystemProvider::new().now().timestamp_nanos();
let lines_written = client.write_lp(config.name, lp_data, default_time).await?;
println!("{} Lines OK", lines_written);
Ok(())
}

View File

@ -25,14 +25,14 @@ use tokio::runtime::Runtime;
mod commands {
pub mod catalog;
pub mod database;
pub mod debug;
pub mod query;
pub mod remote;
pub mod run;
pub mod schema;
pub mod sql;
pub mod storage;
pub mod tracing;
pub mod write;
}
enum ReturnCode {
@ -146,9 +146,6 @@ struct Config {
#[derive(Debug, clap::Parser)]
enum Command {
/// Database-related commands
Database(commands::database::Config),
/// Run the InfluxDB IOx server
// Clippy recommended boxing this variant because it's much larger than the others
Run(Box<commands::run::Config>),
@ -156,9 +153,6 @@ enum Command {
/// Commands to run against remote IOx APIs
Remote(commands::remote::Config),
/// IOx schema configuration commands
Schema(commands::schema::Config),
/// Start IOx interactive SQL REPL loop
Sql(commands::sql::Config),
@ -170,6 +164,12 @@ enum Command {
/// Initiate a read request to the gRPC storage service.
Storage(commands::storage::Config),
/// Write data into the specified database
Write(commands::write::Config),
/// Query the data with SQL
Query(commands::query::Config),
}
fn main() -> Result<(), std::io::Error> {
@ -227,14 +227,6 @@ fn main() -> Result<(), std::io::Error> {
}
match config.command {
Command::Database(config) => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
let connection = connection().await;
if let Err(e) = commands::database::command(connection, config).await {
eprintln!("{}", e);
std::process::exit(ReturnCode::Failure as _)
}
}
Command::Remote(config) => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
let connection = connection().await;
@ -251,14 +243,6 @@ fn main() -> Result<(), std::io::Error> {
std::process::exit(ReturnCode::Failure as _)
}
}
Command::Schema(config) => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
let connection = connection().await;
if let Err(e) = commands::schema::command(connection, config).await {
eprintln!("{}", e);
std::process::exit(ReturnCode::Failure as _)
}
}
Command::Sql(config) => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
let connection = connection().await;
@ -284,7 +268,23 @@ fn main() -> Result<(), std::io::Error> {
}
Command::Debug(config) => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
if let Err(e) = commands::debug::command(config).await {
if let Err(e) = commands::debug::command(connection, config).await {
eprintln!("{}", e);
std::process::exit(ReturnCode::Failure as _)
}
}
Command::Write(config) => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
let connection = connection().await;
if let Err(e) = commands::write::command(connection, config).await {
eprintln!("{}", e);
std::process::exit(ReturnCode::Failure as _)
}
}
Command::Query(config) => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
let connection = connection().await;
if let Err(e) = commands::query::command(connection, config).await {
eprintln!("{}", e);
std::process::exit(ReturnCode::Failure as _)
}

View File

@ -76,6 +76,7 @@ async fn ingester_schema_cli() {
.unwrap()
.arg("-h")
.arg(&router_addr)
.arg("debug")
.arg("schema")
.arg("get")
.arg(state.cluster().namespace())

View File

@ -1,65 +0,0 @@
[package]
name = "ioxd_database"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Workspace dependencies, in alphabetical order
clap_blocks = { path = "../clap_blocks" }
data_types = { path = "../data_types" }
db = { path = "../db" }
dml = { path = "../dml" }
generated_types = { path = "../generated_types" }
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format", "write_lp"] }
ioxd_common = { path = "../ioxd_common" }
job_registry = { path = "../job_registry" }
metric = { path = "../metric" }
mutable_batch_pb = { path = "../mutable_batch_pb" }
object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" }
query = { path = "../query" }
server = { path = "../server" }
service_common = { path = "../service_common" }
service_grpc_flight = { path = "../service_grpc_flight" }
service_grpc_influxrpc = { path = "../service_grpc_influxrpc" }
service_grpc_testing = { path = "../service_grpc_testing" }
trace = { path = "../trace" }
trace_http = { path = "../trace_http" }
tracker = { path = "../tracker" }
uuid = { version = "0.8", features = ["v4"] }
# Crates.io dependencies, in alphabetical order
arrow-flight = "13"
async-trait = "0.1"
bytes = "1.0"
futures = "0.3"
http = "0.2.7"
hyper = "0.14"
prost = "0.10"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.81"
serde_urlencoded = "0.7.0"
snafu = "0.7"
tokio = { version = "1.18", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7.0" }
tonic = "0.7"
tonic-health = "0.6.0"
tonic-reflection = "0.4.0"
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
# Workspace dependencies, in alphabetical order
arrow_util = { path = "../arrow_util" }
influxdb_storage_client = { path = "../influxdb_storage_client" }
test_helpers = { path = "../test_helpers" }
trace_exporters = { path = "../trace_exporters" }
schema = { path = "../schema" }
# Crates.io dependencies, in alphabetical order
arrow = "13"
clap = { version = "3", features = ["derive", "env"] }
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }

View File

@ -1,134 +0,0 @@
use generated_types::influxdata::iox::management;
use async_trait::async_trait;
use data_types::server_id::ServerId;
use generated_types::google::FieldViolation;
use generated_types::influxdata::iox::management::v1::OwnerInfo;
use server::{
config::{ConfigProvider, Result as ConfigResult},
rules::ProvidedDatabaseRules,
};
use snafu::{OptionExt, ResultExt, Snafu};
use uuid::Uuid;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("error fetching server config from file: {}", source))]
FetchBytes { source: std::io::Error },
#[snafu(display("error decoding server config from file: {}", source))]
Decode { source: serde_json::Error },
#[snafu(display("invalid database config: {}", source))]
InvalidDatabaseConfig { source: FieldViolation },
#[snafu(display("database with UUID {} not found in config file", uuid))]
DatabaseNotFound { uuid: Uuid },
#[snafu(display("database rules \"{}\" not found in config file", rules))]
DatabaseRulesNotFound { rules: String },
#[snafu(display("invalid UUID in server config file: {}", source))]
InvalidUUID { source: uuid::Error },
#[snafu(display("config is immutable"))]
ImmutableConfig,
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
fn parse_uuid(uuid: &str) -> Result<Uuid> {
std::str::FromStr::from_str(uuid).context(InvalidUUIDSnafu)
}
/// A loader for [`ServerConfigFile`]
#[derive(Debug)]
pub struct ServerConfigFile {
path: String,
}
impl ServerConfigFile {
pub fn new(path: String) -> Self {
Self { path }
}
async fn load(&self) -> Result<management::v1::ServerConfigFile> {
let bytes = tokio::fs::read(&self.path).await.context(FetchBytesSnafu)?;
serde_json::from_slice(bytes.as_slice()).context(DecodeSnafu)
}
}
#[async_trait]
impl ConfigProvider for ServerConfigFile {
async fn fetch_server_config(&self, _server_id: ServerId) -> ConfigResult<Vec<(String, Uuid)>> {
let config = self.load().await?;
config
.databases
.into_iter()
.map(|config| Ok((config.name, parse_uuid(&config.uuid)?)))
.collect()
}
async fn store_server_config(
&self,
_server_id: ServerId,
_config: &[(String, Uuid)],
) -> ConfigResult<()> {
Err(Error::ImmutableConfig.into())
}
async fn fetch_rules(&self, uuid: Uuid) -> ConfigResult<ProvidedDatabaseRules> {
// We load the file each time to pick up changes
let server_config = self.load().await?;
let uuid_str = uuid.to_string();
// Lookup the database name and rules based on UUID
let config = server_config
.databases
.into_iter()
.find(|config| config.uuid == uuid_str)
.context(DatabaseNotFoundSnafu { uuid })?;
// Lookup the rules for this database
let rules = server_config
.rules
.into_iter()
.find(|r| r.name == config.rules)
.context(DatabaseRulesNotFoundSnafu {
rules: config.rules,
})?;
// Parse rules into [`ProvidedDatabaseRules`]
let rules = ProvidedDatabaseRules::new_rules(management::v1::DatabaseRules {
name: config.name,
..rules
})
.context(InvalidDatabaseConfigSnafu)?;
Ok(rules)
}
async fn store_rules(&self, _uuid: Uuid, _rules: &ProvidedDatabaseRules) -> ConfigResult<()> {
Err(Error::ImmutableConfig.into())
}
async fn fetch_owner_info(&self, server_id: ServerId, _uuid: Uuid) -> ConfigResult<OwnerInfo> {
Ok(OwnerInfo {
id: server_id.get_u32(),
location: "NONE".to_string(),
transactions: vec![],
})
}
async fn update_owner_info(
&self,
_server_id: Option<ServerId>,
_uuid: Uuid,
) -> ConfigResult<()> {
Err(Error::ImmutableConfig.into())
}
async fn create_owner_info(&self, _server_id: ServerId, _uuid: Uuid) -> ConfigResult<()> {
Err(Error::ImmutableConfig.into())
}
}

View File

@ -1,686 +0,0 @@
//! This module contains the HTTP api for InfluxDB IOx, including a
//! partial implementation of the /v2 HTTP api routes from InfluxDB
//! for compatibility.
//!
//! Note that these routes are designed to be just helpers for now,
//! and "close enough" to the real /v2 api to be able to test InfluxDB IOx
//! without needing to create and manage a mapping layer from name -->
//! id (this is done by other services in the influx cloud)
//!
//! Long term, we expect to create an IOx-specific API in terms of
//! database names and may remove this quasi /v2 API.
// Influx crates
use data_types::{names::OrgBucketMappingError, DatabaseName};
use influxdb_iox_client::format::QueryOutputFormat;
use query::{exec::ExecutionContextProvider, QueryDatabase};
use server::Error;
// External crates
use async_trait::async_trait;
use http::header::CONTENT_TYPE;
use hyper::{Body, Method, Request, Response};
use observability_deps::tracing::{debug, error};
use serde::Deserialize;
use snafu::{OptionExt, ResultExt, Snafu};
use dml::DmlOperation;
use ioxd_common::http::{
dml::{HttpDrivenDml, InnerDmlError, RequestOrResponse},
error::{HttpApiError, HttpApiErrorExt, HttpApiErrorSource},
metrics::LineProtocolMetrics,
};
use service_common::planner::Planner;
use std::{
fmt::Debug,
str::{self, FromStr},
sync::Arc,
};
use super::DatabaseServerType;
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Snafu)]
pub enum ApplicationError {
#[snafu(display("Internal error mapping org & bucket: {}", source))]
BucketMappingError { source: OrgBucketMappingError },
#[snafu(display("Internal error reading points from database {}: {}", db_name, source))]
Query {
db_name: String,
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display("Expected query string in request, but none was provided"))]
ExpectedQueryString {},
/// Error for when we could not parse the http query uri (e.g.
/// `?foo=bar&bar=baz`)
#[snafu(display("Invalid query string in HTTP URI '{}': {}", query_string, source))]
InvalidQueryString {
query_string: String,
source: serde_urlencoded::de::Error,
},
#[snafu(display("No handler for {:?} {}", method, path))]
RouteNotFound { method: Method, path: String },
#[snafu(display("Invalid database name: {}", source))]
DatabaseNameError {
source: data_types::DatabaseNameError,
},
#[snafu(display("Database {} not found", db_name))]
DatabaseNotFound { db_name: String },
#[snafu(display("Internal error creating HTTP response: {}", source))]
CreatingResponse { source: http::Error },
#[snafu(display("Invalid format '{}': : {}", format, source))]
ParsingFormat {
format: String,
source: influxdb_iox_client::format::Error,
},
#[snafu(display(
"Error formatting results of SQL query '{}' using '{:?}': {}",
q,
format,
source
))]
FormattingResult {
q: String,
format: QueryOutputFormat,
source: influxdb_iox_client::format::Error,
},
#[snafu(display("Error while planning query: {}", source))]
Planning {
source: service_common::planner::Error,
},
#[snafu(display("Server id not set"))]
ServerIdNotSet,
#[snafu(display("Server not initialized"))]
ServerNotInitialized,
#[snafu(display("Database {} not found", db_name))]
DatabaseNotInitialized { db_name: String },
#[snafu(display("Internal server error"))]
InternalServerError,
#[snafu(display("Cannot perform DML operation: {}", source))]
DmlError {
source: ioxd_common::http::dml::HttpDmlError,
},
}
type Result<T, E = ApplicationError> = std::result::Result<T, E>;
impl HttpApiErrorSource for ApplicationError {
fn to_http_api_error(&self) -> HttpApiError {
match self {
e @ Self::BucketMappingError { .. } => e.internal_error(),
e @ Self::Query { .. } => e.internal_error(),
e @ Self::ExpectedQueryString { .. } => e.invalid(),
e @ Self::InvalidQueryString { .. } => e.invalid(),
e @ Self::RouteNotFound { .. } => e.not_found(),
e @ Self::DatabaseNameError { .. } => e.invalid(),
e @ Self::DatabaseNotFound { .. } => e.not_found(),
e @ Self::CreatingResponse { .. } => e.internal_error(),
e @ Self::FormattingResult { .. } => e.internal_error(),
e @ Self::ParsingFormat { .. } => e.invalid(),
e @ Self::Planning { .. } => e.invalid(),
e @ Self::ServerIdNotSet => e.invalid(),
e @ Self::ServerNotInitialized => e.invalid(),
e @ Self::DatabaseNotInitialized { .. } => e.invalid(),
e @ Self::InternalServerError => e.internal_error(),
Self::DmlError { source } => source.to_http_api_error(),
}
}
}
impl From<server::Error> for ApplicationError {
fn from(e: Error) -> Self {
match e {
Error::IdNotSet => Self::ServerIdNotSet,
Error::ServerNotInitialized { .. } => Self::ServerNotInitialized,
Error::DatabaseNotInitialized { db_name } => Self::DatabaseNotInitialized { db_name },
Error::DatabaseNotFound { db_name } => Self::DatabaseNotFound { db_name },
Error::InvalidDatabaseName { source } => Self::DatabaseNameError { source },
e => {
error!(%e, "unexpected server error");
// Don't return potentially sensitive information in response
Self::InternalServerError
}
}
}
}
#[async_trait]
impl HttpDrivenDml for DatabaseServerType {
fn max_request_size(&self) -> usize {
self.max_request_size
}
fn lp_metrics(&self) -> Arc<LineProtocolMetrics> {
Arc::clone(&self.lp_metrics)
}
async fn write(
&self,
db_name: &DatabaseName<'_>,
op: DmlOperation,
) -> Result<(), InnerDmlError> {
let db = self
.server
.db(db_name)
.map_err(|_| InnerDmlError::DatabaseNotFound {
db_name: db_name.to_string(),
})?;
db.store_operation(&op)
.map_err(|e| InnerDmlError::UserError {
db_name: db_name.to_string(),
source: Box::new(e),
})
}
}
pub async fn route_request(
server_type: &DatabaseServerType,
req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> {
match server_type
.route_dml_http_request(req)
.await
.context(DmlSnafu)?
{
RequestOrResponse::Response(resp) => Ok(resp),
RequestOrResponse::Request(req) => {
let method = req.method().clone();
let uri = req.uri().clone();
match (method.clone(), uri.path()) {
(Method::GET, "/api/v3/query") => query(req, server_type).await,
(method, path) => Err(ApplicationError::RouteNotFound {
method,
path: path.to_string(),
}),
}
}
}
}
#[derive(Deserialize, Debug, PartialEq)]
/// Parsed URI Parameters of the request to the .../query endpoint
struct QueryParams {
#[serde(alias = "database")]
d: String,
#[serde(alias = "query")]
q: String,
#[serde(default = "default_format")]
format: String,
}
fn default_format() -> String {
QueryOutputFormat::default().to_string()
}
async fn query(
req: Request<Body>,
server_type: &DatabaseServerType,
) -> Result<Response<Body>, ApplicationError> {
let server = &server_type.server;
let uri_query = req.uri().query().context(ExpectedQueryStringSnafu {})?;
let QueryParams { d, q, format } =
serde_urlencoded::from_str(uri_query).context(InvalidQueryStringSnafu {
query_string: uri_query,
})?;
let format = QueryOutputFormat::from_str(&format).context(ParsingFormatSnafu { format })?;
let db_name = DatabaseName::new(&d).context(DatabaseNameSnafu)?;
debug!(uri = ?req.uri(), %q, ?format, %db_name, "running SQL query");
let db = server.db(&db_name)?;
let ctx = db.new_query_context(req.extensions().get().cloned());
let mut query_completed_token = db.record_query(&ctx, "sql", Box::new(q.clone()));
let physical_plan = Planner::new(&ctx).sql(&q).await.context(PlanningSnafu)?;
// TODO: stream read results out rather than rendering the
// whole thing in mem
let batches = ctx
.collect(physical_plan)
.await
.map_err(|e| Box::new(e) as _)
.context(QuerySnafu { db_name })?;
let results = format
.format(&batches)
.context(FormattingResultSnafu { q, format })?;
let body = Body::from(results.into_bytes());
let response = Response::builder()
.header(CONTENT_TYPE, format.content_type())
.body(body)
.context(CreatingResponseSnafu)?;
query_completed_token.set_success();
Ok(response)
}
#[cfg(test)]
mod tests {
use super::*;
use arrow::record_batch::RecordBatch;
use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName};
use db::Db;
use dml::DmlWrite;
use http::StatusCode;
use ioxd_common::http::{
dml::test_utils::{
assert_delete_bad_request, assert_delete_unknown_database, assert_delete_unknown_table,
assert_gzip_write, assert_write, assert_write_metrics, assert_write_precision,
assert_write_to_invalid_database,
},
test_utils::{
assert_health, assert_metrics, assert_tracing, check_response, get_content_type,
TestServer,
},
};
use ioxd_common::server_type::CommonServerState;
use object_store::ObjectStoreImpl;
use reqwest::Client;
use schema::selection::Selection;
use server::{rules::ProvidedDatabaseRules, ApplicationState, Server};
use std::convert::TryFrom;
use trace::RingBufferTraceCollector;
fn make_application() -> Arc<ApplicationState> {
Arc::new(ApplicationState::new(
Arc::new(ObjectStoreImpl::new_in_memory()),
None,
Some(Arc::new(RingBufferTraceCollector::new(5))),
None,
))
}
fn make_server(application: Arc<ApplicationState>) -> Arc<Server> {
Arc::new(Server::new(application, Default::default()))
}
#[tokio::test]
async fn test_health() {
assert_health(setup_server().await).await;
}
#[tokio::test]
async fn test_metrics() {
assert_metrics(setup_server().await).await;
}
#[tokio::test]
async fn test_tracing() {
assert_tracing(setup_server().await).await;
}
async fn assert_dbwrite(test_server: TestServer<DatabaseServerType>, write: DmlWrite) {
assert_dbwrites(test_server, &[write]).await
}
async fn assert_dbwrites(test_server: TestServer<DatabaseServerType>, writes: &[DmlWrite]) {
let expected_batches: Vec<_> = writes.iter().flat_map(|w| w.tables()).collect();
let table = expected_batches.first().unwrap().0;
assert!(expected_batches.iter().all(|(t, _)| *t == table));
let test_db = test_server
.server_type()
.server
.db(&DatabaseName::new("MyOrg_MyBucket").unwrap())
.expect("Database exists");
let actual_batches = run_query(test_db, &format!("select * from {}", table)).await;
let records: Vec<_> = expected_batches
.into_iter()
.map(|(_, batch)| batch.to_arrow(Selection::All).unwrap())
.collect();
let expected = arrow_util::display::pretty_format_batches(&records).unwrap();
let expected: Vec<_> = expected.split('\n').collect();
assert_batches_sorted_eq!(expected, &actual_batches);
}
#[tokio::test]
async fn test_write() {
let test_server = setup_server().await;
let write = assert_write(&test_server).await;
assert_dbwrite(test_server, write).await;
}
#[tokio::test]
async fn test_write_metrics() {
assert_write_metrics(setup_server().await, true).await;
}
#[tokio::test]
async fn test_write_precision() {
let test_server = setup_server().await;
let writes = assert_write_precision(&test_server).await;
assert_dbwrites(test_server, &writes).await;
}
#[tokio::test]
async fn test_gzip_write() {
let test_server = setup_server().await;
let write = assert_gzip_write(&test_server).await;
assert_dbwrite(test_server, write).await;
}
#[tokio::test]
async fn write_to_invalid_database() {
assert_write_to_invalid_database(setup_server().await).await;
}
#[tokio::test]
async fn test_delete() {
// Set up server
let test_server = setup_server().await;
// Set up client
let client = Client::new();
let bucket_name = "MyBucket";
let org_name = "MyOrg";
// Client requests delete something from an empty DB
let delete_line = r#"{"start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z", "predicate":"host=\"Orient.local\""}"#;
let response = client
.post(&format!(
"{}/api/v2/delete?bucket={}&org={}",
test_server.url(),
bucket_name,
org_name
))
.body(delete_line)
.send()
.await;
check_response("delete", response, StatusCode::NO_CONTENT, Some("")).await;
// Client writes data to the server
let lp_data = r#"h2o_temperature,location=santa_monica,state=CA surface_degrees=65.2,bottom_degrees=50.4 1617286224000000000
h2o_temperature,location=Boston,state=MA surface_degrees=47.5,bottom_degrees=35 1617286224000000123"#;
let response = client
.post(&format!(
"{}/api/v2/write?bucket={}&org={}",
test_server.url(),
bucket_name,
org_name
))
.body(lp_data)
.send()
.await;
check_response("write", response, StatusCode::NO_CONTENT, Some("")).await;
// Check that the data got into the right bucket
let test_db = test_server
.server_type()
.server
.db(&DatabaseName::new("MyOrg_MyBucket").unwrap())
.expect("Database exists");
let batches = run_query(
Arc::clone(&test_db),
"select * from h2o_temperature order by location",
)
.await;
let expected = vec![
"+----------------+--------------+-------+-----------------+--------------------------------+",
"| bottom_degrees | location | state | surface_degrees | time |",
"+----------------+--------------+-------+-----------------+--------------------------------+",
"| 35 | Boston | MA | 47.5 | 2021-04-01T14:10:24.000000123Z |",
"| 50.4 | santa_monica | CA | 65.2 | 2021-04-01T14:10:24Z |",
"+----------------+--------------+-------+-----------------+--------------------------------+",
];
assert_batches_eq!(expected, &batches);
// Now delete something
let delete_line = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"location=Boston"}"#;
let response = client
.post(&format!(
"{}/api/v2/delete?bucket={}&org={}",
test_server.url(),
bucket_name,
org_name
))
.body(delete_line)
.send()
.await;
check_response("delete", response, StatusCode::NO_CONTENT, Some("")).await;
// Query again; the deleted data should not be returned
let batches = run_query(test_db, "select * from h2o_temperature").await;
let expected = vec![
"+----------------+--------------+-------+-----------------+----------------------+",
"| bottom_degrees | location | state | surface_degrees | time |",
"+----------------+--------------+-------+-----------------+----------------------+",
"| 50.4 | santa_monica | CA | 65.2 | 2021-04-01T14:10:24Z |",
"+----------------+--------------+-------+-----------------+----------------------+",
];
assert_batches_eq!(expected, &batches);
}
#[tokio::test]
async fn test_delete_unknown_database() {
assert_delete_unknown_database(setup_server().await).await;
}
#[tokio::test]
async fn test_delete_unknown_table() {
assert_delete_unknown_table(setup_server().await).await;
}
#[tokio::test]
async fn test_delete_bad_request() {
assert_delete_bad_request(setup_server().await).await;
}
/// Sets up a test database with some data for testing the query endpoint;
/// returns a client for communicating with the server and the test server
/// itself
async fn setup_test_data() -> (Client, TestServer<DatabaseServerType>) {
let test_server = setup_server().await;
let client = Client::new();
let lp_data = "h2o_temperature,location=santa_monica,state=CA surface_degrees=65.2,bottom_degrees=50.4 1617286224000000000";
// send write data
let bucket_name = "MyBucket";
let org_name = "MyOrg";
let response = client
.post(&format!(
"{}/api/v2/write?bucket={}&org={}",
test_server.url(),
bucket_name,
org_name
))
.body(lp_data)
.send()
.await;
check_response("write", response, StatusCode::NO_CONTENT, Some("")).await;
(client, test_server)
}
#[tokio::test]
async fn test_query_pretty() {
let (client, test_server) = setup_test_data().await;
// send query data
let response = client
.get(&format!(
"{}/api/v3/query?d=MyOrg_MyBucket&q={}",
test_server.url(),
"select%20*%20from%20h2o_temperature"
))
.send()
.await;
assert_eq!(get_content_type(&response), "text/plain");
let expected = r#"+----------------+--------------+-------+-----------------+----------------------+
| bottom_degrees | location | state | surface_degrees | time |
+----------------+--------------+-------+-----------------+----------------------+
| 50.4 | santa_monica | CA | 65.2 | 2021-04-01T14:10:24Z |
+----------------+--------------+-------+-----------------+----------------------+"#;
check_response("query", response, StatusCode::OK, Some(expected)).await;
// same response is expected if we explicitly request 'format=pretty'
let response = client
.get(&format!(
"{}/api/v3/query?d=MyOrg_MyBucket&q={}&format=pretty",
test_server.url(),
"select%20*%20from%20h2o_temperature"
))
.send()
.await;
assert_eq!(get_content_type(&response), "text/plain");
check_response("query", response, StatusCode::OK, Some(expected)).await;
}
#[tokio::test]
async fn test_query_csv() {
let (client, test_server) = setup_test_data().await;
// send query data
let response = client
.get(&format!(
"{}/api/v3/query?d=MyOrg_MyBucket&q={}&format=csv",
test_server.url(),
"select%20*%20from%20h2o_temperature"
))
.send()
.await;
assert_eq!(get_content_type(&response), "text/csv");
let res = "bottom_degrees,location,state,surface_degrees,time\n\
50.4,santa_monica,CA,65.2,2021-04-01T14:10:24.000000000\n";
check_response("query", response, StatusCode::OK, Some(res)).await;
}
#[tokio::test]
async fn test_query_json() {
let (client, test_server) = setup_test_data().await;
// send a second line of data so the query returns multiple records
let lp_data =
"h2o_temperature,location=Boston,state=MA surface_degrees=50.2 1617286224000000000";
// send write data
let bucket_name = "MyBucket";
let org_name = "MyOrg";
let response = client
.post(&format!(
"{}/api/v2/write?bucket={}&org={}",
test_server.url(),
bucket_name,
org_name
))
.body(lp_data)
.send()
.await;
check_response("write", response, StatusCode::NO_CONTENT, Some("")).await;
// send query data
let response = client
.get(&format!(
"{}/api/v3/query?d=MyOrg_MyBucket&q={}&format=json",
test_server.url(),
"select%20*%20from%20h2o_temperature%20order%20by%20surface_degrees"
))
.send()
.await;
assert_eq!(get_content_type(&response), "application/json");
// Note that both json records are returned in a single JSON array
let res = r#"[{"location":"Boston","state":"MA","surface_degrees":50.2,"time":"2021-04-01 14:10:24"},{"bottom_degrees":50.4,"location":"santa_monica","state":"CA","surface_degrees":65.2,"time":"2021-04-01 14:10:24"}]"#;
check_response("query", response, StatusCode::OK, Some(res)).await;
}
#[tokio::test]
async fn test_query_invalid_name() {
let (client, test_server) = setup_test_data().await;
// send query data
let response = client
.get(&format!(
"{}/api/v3/query?d=&q={}",
test_server.url(),
"select%20*%20from%20h2o_temperature%20order%20by%20surface_degrees"
))
.send()
.await;
check_response(
"query",
response,
StatusCode::BAD_REQUEST,
Some(r#"{"code":"invalid","message":"Invalid database name: Database name length must be between 1 and 64 characters"}"#),
)
.await;
}
/// Run the specified SQL query and return the resulting record batches
async fn run_query(db: Arc<Db>, query: &str) -> Vec<RecordBatch> {
let ctx = db.new_query_context(None);
let physical_plan = Planner::new(&ctx).sql(query).await.unwrap();
ctx.collect(physical_plan).await.unwrap()
}
/// Return a test server with a `MyOrg_MyBucket` database created on it
async fn setup_server() -> TestServer<DatabaseServerType> {
let application = make_application();
let app_server = make_server(Arc::clone(&application));
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.wait_for_init().await.unwrap();
app_server
.create_database(make_rules("MyOrg_MyBucket"))
.await
.unwrap();
let server_type = DatabaseServerType::new(
application,
app_server,
&CommonServerState::for_testing(),
false,
);
TestServer::new(Arc::new(server_type))
}
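/// Create minimal valid [`ProvidedDatabaseRules`] for the given database name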
fn make_rules(db_name: impl Into<String>) -> ProvidedDatabaseRules {
let db_name = DatabaseName::new(db_name.into()).unwrap();
ProvidedDatabaseRules::new_rules(DatabaseRules::new(db_name).into())
.expect("Tests should create valid DatabaseRules")
}
}

View File

@ -1,105 +0,0 @@
use async_trait::async_trait;
use futures::{future::FusedFuture, FutureExt};
use hyper::{Body, Request, Response};
use ioxd_common::{
http::{error::HttpApiErrorSource, metrics::LineProtocolMetrics},
rpc::RpcBuilderInput,
server_type::{CommonServerState, RpcError, ServerType},
};
use metric::Registry;
use observability_deps::tracing::{error, info};
use server::{ApplicationState, Server};
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use trace::TraceCollector;
mod config;
mod http;
mod rpc;
pub mod setup;
pub use self::http::ApplicationError;
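/// `ServerType` implementation for the database server, routing HTTP requests
/// and gRPC services to the underlying [`Server`]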
#[derive(Debug)]
pub struct DatabaseServerType {
pub application: Arc<ApplicationState>,
pub server: Arc<Server>,
pub lp_metrics: Arc<LineProtocolMetrics>,
pub max_request_size: usize,
config_immutable: bool,
shutdown: CancellationToken,
}
impl DatabaseServerType {
pub fn new(
application: Arc<ApplicationState>,
server: Arc<Server>,
common_state: &CommonServerState,
config_immutable: bool,
) -> Self {
let lp_metrics = Arc::new(LineProtocolMetrics::new(
application.metric_registry().as_ref(),
));
Self {
application,
server,
lp_metrics,
config_immutable,
max_request_size: common_state.run_config().max_http_request_size,
shutdown: CancellationToken::new(),
}
}
}
#[async_trait]
impl ServerType for DatabaseServerType {
fn metric_registry(&self) -> Arc<Registry> {
Arc::clone(self.application.metric_registry())
}
fn trace_collector(&self) -> Option<Arc<dyn TraceCollector>> {
self.application.trace_collector().clone()
}
async fn route_http_request(
&self,
req: Request<Body>,
) -> Result<Response<Body>, Box<dyn HttpApiErrorSource>> {
self::http::route_request(self, req)
.await
.map_err(|e| Box::new(e) as _)
}
async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError> {
self::rpc::server_grpc(self, builder_input).await
}
async fn join(self: Arc<Self>) {
let server_worker = self.server.join().fuse();
futures::pin_mut!(server_worker);
futures::select! {
_ = server_worker => {},
_ = self.shutdown.cancelled().fuse() => {},
}
self.server.shutdown();
if !server_worker.is_terminated() {
match server_worker.await {
Ok(_) => info!("server worker shutdown"),
Err(error) => error!(%error, "server worker error"),
}
}
info!("server completed shutting down");
self.application.join().await;
info!("shared application state completed shutting down");
}
fn shutdown(&self) {
self.shutdown.cancel();
}
}

View File

@ -1,54 +0,0 @@
use std::sync::Arc;
use data_types::non_empty::NonEmptyString;
use data_types::DatabaseName;
use dml::{DmlDelete, DmlMeta, DmlOperation};
use generated_types::google::{FieldViolationExt, FromOptionalField, OptionalField};
use generated_types::influxdata::iox::delete::v1::*;
use server::Server;
use tonic::Response;
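/// Implementation of the delete gRPC service, turning delete requests into DML
/// delete operations on the target database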
struct DeleteService {
server: Arc<Server>,
}
use super::error::{default_dml_error_handler, default_server_error_handler};
#[tonic::async_trait]
impl delete_service_server::DeleteService for DeleteService {
async fn delete(
&self,
request: tonic::Request<DeleteRequest>,
) -> Result<tonic::Response<DeleteResponse>, tonic::Status> {
let span_ctx = request.extensions().get().cloned();
let DeleteRequest { payload } = request.into_inner();
let DeletePayload {
db_name,
table_name,
predicate,
} = payload.unwrap_field("payload")?;
let predicate = predicate.required("predicate")?;
let table_name = NonEmptyString::new(table_name);
let meta = DmlMeta::unsequenced(span_ctx);
let delete = DmlDelete::new(&db_name, predicate, table_name, meta);
// Validate that the database name is legit
let db_name = DatabaseName::new(db_name).scope("db_name")?;
let db = self
.server
.db(&db_name)
.map_err(default_server_error_handler)?;
db.store_operation(&DmlOperation::Delete(delete))
.map_err(default_dml_error_handler)?;
Ok(Response::new(DeleteResponse {}))
}
}
pub fn make_server(
server: Arc<Server>,
) -> delete_service_server::DeleteServiceServer<impl delete_service_server::DeleteService> {
delete_service_server::DeleteServiceServer::new(DeleteService { server })
}

View File

@ -1,56 +0,0 @@
use data_types::server_id::ServerId;
use generated_types::google::ResourceType;
use generated_types::{
google::{FieldViolation, NotFound},
influxdata::iox::deployment::v1::*,
};
use server::{Error, Server};
use std::{convert::TryFrom, sync::Arc};
use tonic::{Request, Response, Status};
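/// Implementation of the deployment gRPC service, exposing get and update of the server ID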
struct DeploymentService {
server: Arc<Server>,
}
use super::error::default_server_error_handler;
#[tonic::async_trait]
impl deployment_service_server::DeploymentService for DeploymentService {
async fn get_server_id(
&self,
_: Request<GetServerIdRequest>,
) -> Result<Response<GetServerIdResponse>, Status> {
match self.server.server_id() {
Some(id) => Ok(Response::new(GetServerIdResponse { id: id.get_u32() })),
None => return Err(NotFound::new(ResourceType::ServerId, Default::default()).into()),
}
}
async fn update_server_id(
&self,
request: Request<UpdateServerIdRequest>,
) -> Result<Response<UpdateServerIdResponse>, Status> {
let id =
ServerId::try_from(request.get_ref().id).map_err(|_| FieldViolation::required("id"))?;
match self.server.set_id(id) {
Ok(_) => Ok(Response::new(UpdateServerIdResponse {})),
Err(e @ Error::IdAlreadySet) => {
return Err(FieldViolation {
field: "id".to_string(),
description: e.to_string(),
}
.into())
}
Err(e) => Err(default_server_error_handler(e)),
}
}
}
pub fn make_server(
server: Arc<Server>,
) -> deployment_service_server::DeploymentServiceServer<
impl deployment_service_server::DeploymentService,
> {
deployment_service_server::DeploymentServiceServer::new(DeploymentService { server })
}

View File

@ -1,157 +0,0 @@
use generated_types::google::{
AlreadyExists, FieldViolation, InternalError, NotFound, PreconditionViolation, QuotaFailure,
ResourceType,
};
use observability_deps::tracing::error;
/// map common [`server::Error`] errors to the appropriate tonic Status
pub fn default_server_error_handler(error: server::Error) -> tonic::Status {
use server::{DatabaseNameFromRulesError, Error};
match error {
Error::IdNotSet => PreconditionViolation::ServerIdNotSet.into(),
Error::DatabaseNotInitialized { db_name } => PreconditionViolation::DatabaseInvalidState(
format!("Database ({}) is not yet initialized", db_name),
)
.into(),
Error::DatabaseAlreadyExists { db_name } => {
AlreadyExists::new(ResourceType::Database, db_name).into()
}
Error::ServerNotInitialized { server_id } => {
PreconditionViolation::ServerInvalidState(format!(
"Server ID is set ({}) but server is not yet initialized (e.g. DBs and remotes \
are not loaded). Server is not yet ready to read/write data.",
server_id
))
.into()
}
Error::DatabaseNotFound { db_name } => {
NotFound::new(ResourceType::Database, db_name).into()
}
Error::DatabaseUuidNotFound { uuid } => {
NotFound::new(ResourceType::DatabaseUuid, uuid.to_string()).into()
}
Error::InvalidDatabaseName { source } => FieldViolation {
field: "db_name".into(),
description: source.to_string(),
}
.into(),
Error::WipePreservedCatalog { source } => default_database_error_handler(source),
Error::DatabaseInit { source } => {
tonic::Status::invalid_argument(format!("Cannot initialize database: {}", source))
}
Error::DatabaseAlreadyOwnedByThisServer { uuid } => {
AlreadyExists::new(ResourceType::DatabaseUuid, uuid.to_string()).into()
}
Error::UuidMismatch { .. } | Error::CannotClaimDatabase { .. } => {
tonic::Status::invalid_argument(error.to_string())
}
Error::CouldNotGetDatabaseNameFromRules {
source: DatabaseNameFromRulesError::DatabaseRulesNotFound { uuid, .. },
} => NotFound::new(ResourceType::DatabaseUuid, uuid.to_string()).into(),
error => {
error!(?error, "Unexpected error");
InternalError {}.into()
}
}
}
/// map common [`catalog::Error`](db::catalog::Error) errors to the appropriate tonic Status
pub fn default_catalog_error_handler(error: db::catalog::Error) -> tonic::Status {
use db::catalog::Error;
match error {
Error::TableNotFound { table } => NotFound::new(ResourceType::Table, table).into(),
Error::PartitionNotFound { partition, table } => {
NotFound::new(ResourceType::Partition, format!("{}:{}", table, partition)).into()
}
Error::ChunkNotFound {
chunk_id,
partition,
table,
} => NotFound::new(
ResourceType::Chunk,
format!("{}:{}:{}", table, partition, chunk_id),
)
.into(),
}
}
/// map common [`database::Error`](server::database::Error) errors to the appropriate tonic Status
pub fn default_database_error_handler(error: server::database::Error) -> tonic::Status {
use server::database::Error;
match error {
Error::InvalidState { .. } => {
PreconditionViolation::DatabaseInvalidState(error.to_string()).into()
}
Error::RulesNotUpdateable { .. } => {
PreconditionViolation::DatabaseInvalidState(error.to_string()).into()
}
Error::WipePreservedCatalog { source, .. } => {
error!(%source, "Unexpected error while wiping catalog");
InternalError {}.into()
}
Error::InvalidStateForRebuild { .. } => {
PreconditionViolation::DatabaseInvalidState(error.to_string()).into()
}
Error::UnexpectedTransitionForRebuild { .. } => {
error!(%error, "Unexpected error during rebuild catalog");
InternalError {}.into()
}
Error::RebuildPreservedCatalog { source, .. } => {
error!(%source, "Unexpected error while rebuilding catalog");
InternalError {}.into()
}
Error::CannotPersistUpdatedRules { source } => {
error!(%source, "Unexpected error persisting database rules");
InternalError {}.into()
}
Error::SkipReplay { source, .. } => {
error!(%source, "Unexpected error skipping replay");
InternalError {}.into()
}
Error::CannotReleaseUnowned { .. } => tonic::Status::failed_precondition(error.to_string()),
Error::CannotRelease { source, .. } => {
error!(%source, "Unexpected error releasing database");
InternalError {}.into()
}
}
}
/// map common [`db::Error`](db::Error) errors to the appropriate tonic Status
pub fn default_db_error_handler(error: db::Error) -> tonic::Status {
use db::Error;
match error {
Error::LifecycleError { source } => PreconditionViolation::ChunkInvalidState(format!(
"Cannot perform operation due to wrong chunk lifecycle: {}",
source
))
.into(),
Error::CannotFlushPartition {
table_name,
partition_key,
} => PreconditionViolation::PartitionInvalidState(format!(
"Cannot persist partition because it cannot be flushed at the moment: {}:{}",
table_name, partition_key
))
.into(),
Error::CatalogError { source } => default_catalog_error_handler(source),
error => {
error!(?error, "Unexpected error");
InternalError {}.into()
}
}
}
/// map common [`db::DmlError`](db::DmlError) errors to the appropriate tonic Status
pub fn default_dml_error_handler(error: db::DmlError) -> tonic::Status {
use db::DmlError;
match error {
DmlError::HardLimitReached {} => QuotaFailure {
subject: "influxdata.com/iox/buffer".to_string(),
description: "hard buffer limit reached".to_string(),
}
.into(),
e => tonic::Status::invalid_argument(e.to_string()),
}
}

View File

@ -1,674 +0,0 @@
use data_types::error::ErrorLogger;
use data_types::{chunk_metadata::ChunkId, DatabaseName};
use generated_types::google::PreconditionViolation;
use generated_types::{
google::{FieldViolation, FieldViolationExt},
influxdata::iox::management::v1::{Error as ProtobufError, *},
};
use server::{rules::ProvidedDatabaseRules, ApplicationState, Server};
use std::{convert::TryFrom, sync::Arc};
use tonic::{Request, Response, Status};
use uuid::Uuid;
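/// Implementation of the management gRPC service for database, partition, and
/// chunk administration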
struct ManagementService {
application: Arc<ApplicationState>,
server: Arc<Server>,
config_immutable: bool,
}
use super::error::{
default_database_error_handler, default_db_error_handler, default_server_error_handler,
};
#[tonic::async_trait]
impl management_service_server::ManagementService for ManagementService {
async fn list_databases(
&self,
request: Request<ListDatabasesRequest>,
) -> Result<Response<ListDatabasesResponse>, Status> {
let ListDatabasesRequest { omit_defaults } = request.into_inner();
let rules = self
.server
.databases()
.map_err(default_server_error_handler)?
.into_iter()
.filter_map(|db| db.provided_rules())
.map(|rules| format_rules(rules, omit_defaults))
.collect::<Vec<_>>();
Ok(Response::new(ListDatabasesResponse { rules }))
}
async fn get_database(
&self,
request: Request<GetDatabaseRequest>,
) -> Result<Response<GetDatabaseResponse>, Status> {
let GetDatabaseRequest {
name,
omit_defaults,
} = request.into_inner();
let name = DatabaseName::new(name).scope("name")?;
let database = self
.server
.database(&name)
.map_err(default_server_error_handler)?;
let rules = database
.provided_rules()
.map(|rules| format_rules(rules, omit_defaults))
.ok_or_else(|| {
tonic::Status::unavailable(format!(
"Rules have not yet been loaded for database ({})",
name
))
})?;
Ok(Response::new(GetDatabaseResponse { rules: Some(rules) }))
}
async fn create_database(
&self,
request: Request<CreateDatabaseRequest>,
) -> Result<Response<CreateDatabaseResponse>, Status> {
if self.config_immutable {
return Err(PreconditionViolation::DatabaseConfigImmutable.into());
}
let rules: DatabaseRules = request
.into_inner()
.rules
.ok_or_else(|| FieldViolation::required("rules"))?;
let provided_rules =
ProvidedDatabaseRules::new_rules(rules).map_err(|e| e.scope("rules"))?;
let database = self
.server
.create_database(provided_rules)
.await
.map_err(default_server_error_handler)?;
Ok(Response::new(CreateDatabaseResponse {
uuid: database.uuid().as_bytes().to_vec(),
}))
}
async fn update_database(
&self,
request: Request<UpdateDatabaseRequest>,
) -> Result<Response<UpdateDatabaseResponse>, Status> {
if self.config_immutable {
return Err(PreconditionViolation::DatabaseConfigImmutable.into());
}
let rules: DatabaseRules = request
.into_inner()
.rules
.ok_or_else(|| FieldViolation::required("rules"))?;
let provided_rules =
ProvidedDatabaseRules::new_rules(rules).map_err(|e| e.scope("rules"))?;
let updated_rules = self
.server
.update_db_rules(provided_rules)
.await
.map_err(default_server_error_handler)?;
Ok(Response::new(UpdateDatabaseResponse {
rules: Some(updated_rules.rules().as_ref().clone().into()),
}))
}
async fn release_database(
&self,
request: Request<ReleaseDatabaseRequest>,
) -> Result<Response<ReleaseDatabaseResponse>, Status> {
if self.config_immutable {
return Err(PreconditionViolation::DatabaseConfigImmutable.into());
}
let ReleaseDatabaseRequest { db_name, uuid } = request.into_inner();
let db_name = DatabaseName::new(db_name).scope("db_name")?;
let uuid = if uuid.is_empty() {
None
} else {
Some(Uuid::from_slice(&uuid).scope("uuid")?)
};
let returned_uuid = self
.server
.release_database(&db_name, uuid)
.await
.map_err(default_server_error_handler)?;
Ok(Response::new(ReleaseDatabaseResponse {
uuid: returned_uuid.as_bytes().to_vec(),
}))
}
async fn claim_database(
&self,
request: Request<ClaimDatabaseRequest>,
) -> Result<Response<ClaimDatabaseResponse>, Status> {
if self.config_immutable {
return Err(PreconditionViolation::DatabaseConfigImmutable.into());
}
let ClaimDatabaseRequest { uuid, force } = request.into_inner();
let uuid = Uuid::from_slice(&uuid).scope("uuid")?;
let db_name = self
.server
.claim_database(uuid, force)
.await
.map_err(default_server_error_handler)?;
Ok(Response::new(ClaimDatabaseResponse {
db_name: db_name.to_string(),
}))
}
async fn list_chunks(
&self,
request: Request<ListChunksRequest>,
) -> Result<Response<ListChunksResponse>, Status> {
let db_name = DatabaseName::new(request.into_inner().db_name).scope("db_name")?;
let db = self
.server
.db(&db_name)
.map_err(default_server_error_handler)?;
let chunks: Vec<Chunk> = db
.chunk_summaries()
.into_iter()
.map(|summary| summary.into())
.collect();
Ok(Response::new(ListChunksResponse { chunks }))
}
async fn create_dummy_job(
&self,
request: Request<CreateDummyJobRequest>,
) -> Result<Response<CreateDummyJobResponse>, Status> {
let request = request.into_inner();
let tracker = self
.application
.job_registry()
.spawn_dummy_job(request.nanos, None);
let operation = Some(super::operations::encode_tracker(tracker)?);
Ok(Response::new(CreateDummyJobResponse { operation }))
}
async fn list_partitions(
&self,
request: Request<ListPartitionsRequest>,
) -> Result<Response<ListPartitionsResponse>, Status> {
let ListPartitionsRequest { db_name } = request.into_inner();
let db_name = DatabaseName::new(db_name).scope("db_name")?;
let db = self
.server
.db(&db_name)
.map_err(default_server_error_handler)?;
let partitions = db
.partition_addrs()
.into_iter()
.map(|addr| Partition {
table_name: addr.table_name.to_string(),
key: addr.partition_key.to_string(),
})
.collect::<Vec<_>>();
Ok(Response::new(ListPartitionsResponse { partitions }))
}
async fn get_partition(
&self,
request: Request<GetPartitionRequest>,
) -> Result<Response<GetPartitionResponse>, Status> {
let GetPartitionRequest {
db_name,
partition_key,
} = request.into_inner();
let db_name = DatabaseName::new(db_name).scope("db_name")?;
let db = self
.server
.db(&db_name)
.map_err(default_server_error_handler)?;
// TODO: get more actual partition details
let partition = db
.partition_addrs()
.iter()
.find(|addr| addr.partition_key.as_ref() == partition_key)
.map(|addr| Partition {
table_name: addr.table_name.to_string(),
key: addr.partition_key.to_string(),
});
Ok(Response::new(GetPartitionResponse { partition }))
}
async fn list_partition_chunks(
&self,
request: Request<ListPartitionChunksRequest>,
) -> Result<Response<ListPartitionChunksResponse>, Status> {
let ListPartitionChunksRequest {
db_name,
partition_key,
} = request.into_inner();
let db_name = DatabaseName::new(db_name).scope("db_name")?;
let db = self
.server
.db(&db_name)
.map_err(default_server_error_handler)?;
let chunks: Vec<Chunk> = db
.filtered_chunk_summaries(None, Some(&partition_key))
.into_iter()
.map(|summary| summary.into())
.collect();
Ok(Response::new(ListPartitionChunksResponse { chunks }))
}
async fn new_partition_chunk(
&self,
request: Request<NewPartitionChunkRequest>,
) -> Result<Response<NewPartitionChunkResponse>, Status> {
let NewPartitionChunkRequest {
db_name,
partition_key,
table_name,
} = request.into_inner();
let db_name = DatabaseName::new(db_name).scope("db_name")?;
let db = self
.server
.db(&db_name)
.map_err(default_server_error_handler)?;
db.rollover_partition(&table_name, &partition_key)
.await
.map_err(default_db_error_handler)?;
Ok(Response::new(NewPartitionChunkResponse {}))
}
async fn close_partition_chunk(
&self,
request: Request<ClosePartitionChunkRequest>,
) -> Result<Response<ClosePartitionChunkResponse>, Status> {
let ClosePartitionChunkRequest {
db_name,
partition_key,
table_name,
chunk_id,
} = request.into_inner();
// Validate that the database name is legit
let db_name = DatabaseName::new(db_name).scope("db_name")?;
let chunk_id = ChunkId::try_from(chunk_id).scope("chunk_id")?;
let tracker = self
.server
.close_chunk(&db_name, table_name, partition_key, chunk_id)
.map_err(default_server_error_handler)?;
let operation = Some(super::operations::encode_tracker(tracker)?);
Ok(Response::new(ClosePartitionChunkResponse { operation }))
}
async fn unload_partition_chunk(
&self,
request: tonic::Request<UnloadPartitionChunkRequest>,
) -> Result<tonic::Response<UnloadPartitionChunkResponse>, tonic::Status> {
let UnloadPartitionChunkRequest {
db_name,
partition_key,
table_name,
chunk_id,
} = request.into_inner();
// Validate that the database name is legit
let db_name = DatabaseName::new(db_name).scope("db_name")?;
let db = self
.server
.db(&db_name)
.map_err(default_server_error_handler)?;
let chunk_id = ChunkId::try_from(chunk_id).scope("chunk_id")?;
db.unload_read_buffer(&table_name, &partition_key, chunk_id)
.map_err(default_db_error_handler)?;
Ok(Response::new(UnloadPartitionChunkResponse {}))
}
async fn load_partition_chunk(
&self,
request: tonic::Request<LoadPartitionChunkRequest>,
) -> Result<tonic::Response<LoadPartitionChunkResponse>, tonic::Status> {
let LoadPartitionChunkRequest {
db_name,
partition_key,
table_name,
chunk_id,
} = request.into_inner();
// Validate that the database name is legit
let db_name = DatabaseName::new(db_name).scope("db_name")?;
let chunk_id = ChunkId::try_from(chunk_id).scope("chunk_id")?;
let db = self
.server
.db(&db_name)
.map_err(default_server_error_handler)?;
let tracker = db
.load_read_buffer(&table_name, &partition_key, chunk_id)
.map_err(default_db_error_handler)?;
let operation = Some(super::operations::encode_tracker(tracker)?);
Ok(Response::new(LoadPartitionChunkResponse { operation }))
}
async fn get_server_status(
&self,
_request: Request<GetServerStatusRequest>,
) -> Result<Response<GetServerStatusResponse>, Status> {
let initialized = self.server.initialized();
// Purposefully suppress the error from server::Databases as we don't want
// to return an error if the server is not initialized
let mut database_statuses: Vec<_> = self
.server
.databases()
.map(|databases| {
databases
.into_iter()
.map(|database| DatabaseStatus {
db_name: database.name().to_string(),
error: database.init_error().map(|e| ProtobufError {
message: e.to_string(),
}),
state: database.state_code().into(),
uuid: database.uuid().as_bytes().to_vec(),
})
.collect()
})
.unwrap_or_default();
// Sort output by database name to ensure a deterministic output order
database_statuses.sort_unstable_by(|a, b| a.db_name.cmp(&b.db_name));
Ok(Response::new(GetServerStatusResponse {
server_status: Some(ServerStatus {
initialized,
error: self.server.server_init_error().map(|e| ProtobufError {
message: e.to_string(),
}),
database_statuses,
}),
}))
}
async fn wipe_preserved_catalog(
&self,
request: Request<WipePreservedCatalogRequest>,
) -> Result<Response<WipePreservedCatalogResponse>, Status> {
let WipePreservedCatalogRequest { db_name } = request.into_inner();
// Validate that the database name is legit
let db_name = DatabaseName::new(db_name).scope("db_name")?;
let tracker = self
.server
.wipe_preserved_catalog(&db_name)
.await
.map_err(default_server_error_handler)?;
let operation = Some(super::operations::encode_tracker(tracker)?);
Ok(Response::new(WipePreservedCatalogResponse { operation }))
}
async fn rebuild_preserved_catalog(
&self,
request: Request<RebuildPreservedCatalogRequest>,
) -> Result<Response<RebuildPreservedCatalogResponse>, Status> {
let RebuildPreservedCatalogRequest { db_name, force } = request.into_inner();
// Validate that the database name is legit
let db_name = DatabaseName::new(db_name).scope("db_name")?;
let database = self
.server
.database(&db_name)
.map_err(default_server_error_handler)?;
let tracker = database
.rebuild_preserved_catalog(force)
.await
.map_err(default_database_error_handler)?;
let operation = Some(super::operations::encode_tracker(tracker)?);
Ok(Response::new(RebuildPreservedCatalogResponse { operation }))
}
async fn skip_replay(
&self,
request: Request<SkipReplayRequest>,
) -> Result<Response<SkipReplayResponse>, Status> {
let SkipReplayRequest { db_name } = request.into_inner();
// Validate that the database name is legit
let db_name = DatabaseName::new(db_name).scope("db_name")?;
let database = self
.server
.database(&db_name)
.map_err(default_server_error_handler)?;
database
.skip_replay()
.await
.map_err(default_database_error_handler)?;
Ok(Response::new(SkipReplayResponse {}))
}
async fn persist_partition(
&self,
request: tonic::Request<PersistPartitionRequest>,
) -> Result<tonic::Response<PersistPartitionResponse>, tonic::Status> {
let PersistPartitionRequest {
db_name,
partition_key,
table_name,
force,
} = request.into_inner();
// Validate that the database name is legit
let db_name = DatabaseName::new(db_name).scope("db_name")?;
let db = self
.server
.db(&db_name)
.map_err(default_server_error_handler)?;
db.persist_partition(&table_name, &partition_key, force)
.await
.map_err(default_db_error_handler)?;
Ok(Response::new(PersistPartitionResponse {}))
}
async fn drop_partition(
&self,
request: tonic::Request<DropPartitionRequest>,
) -> Result<tonic::Response<DropPartitionResponse>, tonic::Status> {
let DropPartitionRequest {
db_name,
partition_key,
table_name,
} = request.into_inner();
// Validate that the database name is legit
let db_name = DatabaseName::new(db_name).scope("db_name")?;
let db = self
.server
.db(&db_name)
.map_err(default_server_error_handler)?;
db.drop_partition(&table_name, &partition_key)
.await
.map_err(default_db_error_handler)?;
Ok(Response::new(DropPartitionResponse {}))
}
/// Compact all given object store chunks
async fn compact_object_store_chunks(
&self,
request: Request<CompactObjectStoreChunksRequest>,
) -> Result<Response<CompactObjectStoreChunksResponse>, Status> {
let CompactObjectStoreChunksRequest {
db_name,
partition_key,
table_name,
chunk_ids,
} = request.into_inner();
// Validate that the database name is legit
let db_name = DatabaseName::new(db_name).scope("db_name")?;
let db = self
.server
.db(&db_name)
.map_err(default_server_error_handler)?;
let mut chunk_id_ids = vec![];
for chunk_id in chunk_ids {
let chunk_id = ChunkId::try_from(chunk_id).scope("chunk_id")?;
chunk_id_ids.push(chunk_id);
}
let tracker = db
.compact_object_store_chunks(&table_name, &partition_key, chunk_id_ids)
.map_err(default_db_error_handler)?;
let operation = Some(super::operations::encode_tracker(tracker)?);
Ok(Response::new(CompactObjectStoreChunksResponse {
operation,
}))
}
/// Compact all object store chunks of the given partition
async fn compact_object_store_partition(
&self,
request: Request<CompactObjectStorePartitionRequest>,
) -> Result<Response<CompactObjectStorePartitionResponse>, Status> {
let CompactObjectStorePartitionRequest {
db_name,
partition_key,
table_name,
} = request.into_inner();
// Validate that the database name is legit
let db_name = DatabaseName::new(db_name).scope("db_name")?;
let db = self
.server
.db(&db_name)
.map_err(default_server_error_handler)?;
let tracker = db
.compact_object_store_partition(&table_name, &partition_key)
.map_err(default_db_error_handler)?;
let operation = Some(super::operations::encode_tracker(tracker)?);
Ok(Response::new(CompactObjectStorePartitionResponse {
operation,
}))
}
async fn shutdown_database(
&self,
request: Request<ShutdownDatabaseRequest>,
) -> Result<Response<ShutdownDatabaseResponse>, Status> {
let request = request.into_inner();
// Validate that the database name is legit
let db_name = DatabaseName::new(request.db_name).scope("db_name")?;
let database = self
.server
.database(&db_name)
.map_err(default_server_error_handler)?;
// Signal the database to shut down
database.shutdown();
// Ignore the error as the database has shut down regardless
let _ = database.join().await.log_if_error("shutdown database");
Ok(Response::new(ShutdownDatabaseResponse {}))
}
async fn restart_database(
&self,
request: Request<RestartDatabaseRequest>,
) -> Result<Response<RestartDatabaseResponse>, Status> {
let request = request.into_inner();
// Validate that the database name is legit
let db_name = DatabaseName::new(request.db_name).scope("db_name")?;
let database = self
.server
.database(&db_name)
.map_err(default_server_error_handler)?;
// restart database
database
.restart_with_options(request.skip_replay)
.await
.map_err(|source| {
tonic::Status::invalid_argument(format!("Cannot initialize database: {}", source))
})?;
Ok(Response::new(RestartDatabaseResponse {}))
}
}
/// Returns [`DatabaseRules`] formatted according to the `omit_defaults` flag. If `omit_defaults` is
/// true, returns the rules as originally provided by the user; otherwise returns the active rules
/// with all default values filled in.
fn format_rules(provided_rules: Arc<ProvidedDatabaseRules>, omit_defaults: bool) -> DatabaseRules {
if omit_defaults {
// return rules as originally provided by the user
provided_rules.original().clone()
} else {
// return the active rules (which have all default values filled in)
provided_rules.rules().as_ref().clone().into()
}
}
pub fn make_server(
application: Arc<ApplicationState>,
server: Arc<Server>,
config_immutable: bool,
) -> management_service_server::ManagementServiceServer<
impl management_service_server::ManagementService,
> {
management_service_server::ManagementServiceServer::new(ManagementService {
application,
server,
config_immutable,
})
}

View File

@ -1,60 +0,0 @@
use std::sync::Arc;
use crate::DatabaseServerType;
use ioxd_common::{
add_service, rpc::RpcBuilderInput, serve_builder, server_type::RpcError, setup_builder,
};
mod delete;
mod deployment;
mod error;
mod management;
mod operations;
mod query;
mod write_pb;
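/// Register and serve the gRPC services for the database server type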
pub async fn server_grpc(
server_type: Arc<DatabaseServerType>,
builder_input: RpcBuilderInput,
) -> Result<(), RpcError> {
let builder = setup_builder!(builder_input, server_type);
add_service!(
builder,
query::make_storage_server(Arc::clone(&server_type.server),)
);
add_service!(
builder,
query::make_flight_server(Arc::clone(&server_type.server))
);
add_service!(
builder,
delete::make_server(Arc::clone(&server_type.server))
);
add_service!(
builder,
write_pb::make_server(Arc::clone(&server_type.server))
);
// It is important that this is not behind a readiness check (as it is
// used to change the check!)
add_service!(
builder,
management::make_server(
Arc::clone(&server_type.application),
Arc::clone(&server_type.server),
server_type.config_immutable,
)
);
add_service!(
builder,
deployment::make_server(Arc::clone(&server_type.server),)
);
add_service!(
builder,
operations::make_server(Arc::clone(server_type.application.job_registry()))
);
serve_builder!(builder);
Ok(())
}

View File

@ -1,202 +0,0 @@
use bytes::BytesMut;
use data_types::job::Job;
use generated_types::{
google::{
longrunning::*,
protobuf::{Any, Empty},
rpc::Status,
FieldViolation, FieldViolationExt, InternalError, NotFound, ResourceType,
},
influxdata::iox::management::v1 as management,
protobuf_type_url,
};
use job_registry::JobRegistry;
use observability_deps::tracing::debug;
use prost::Message;
use std::{convert::TryInto, sync::Arc};
use tonic::Response;
use tracker::{TaskId, TaskResult, TaskStatus, TaskTracker};
/// Implementation of the operations service
struct OperationsService {
jobs: Arc<JobRegistry>,
}
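/// Convert a [`TaskTracker`] into a longrunning [`Operation`], encoding the job
/// metadata and mapping the task result onto a gRPC status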
pub fn encode_tracker(tracker: TaskTracker<Job>) -> Result<Operation, tonic::Status> {
let id = tracker.id();
let status = tracker.get_status();
let result = status.result();
let operation_metadata = match status {
TaskStatus::Creating => management::OperationMetadata {
job: Some(tracker.metadata().clone().into()),
..Default::default()
},
TaskStatus::Running {
total_count,
pending_count,
cpu_nanos,
} => management::OperationMetadata {
cpu_nanos: cpu_nanos as _,
total_count: total_count as _,
pending_count: pending_count as _,
job: Some(tracker.metadata().clone().into()),
..Default::default()
},
TaskStatus::Complete {
total_count,
success_count,
error_count,
cancelled_count,
dropped_count,
cpu_nanos,
wall_nanos,
} => management::OperationMetadata {
cpu_nanos: cpu_nanos as _,
total_count: total_count as _,
success_count: success_count as _,
error_count: error_count as _,
cancelled_count: cancelled_count as _,
dropped_count: dropped_count as _,
wall_nanos: wall_nanos as _,
job: Some(tracker.metadata().clone().into()),
..Default::default()
},
};
let mut buffer = BytesMut::new();
operation_metadata.encode(&mut buffer).map_err(|error| {
debug!(?error, "Unexpected error");
InternalError {}
})?;
let metadata = Any {
type_url: protobuf_type_url(management::OPERATION_METADATA),
value: buffer.freeze(),
};
let result = match result {
Some(TaskResult::Success) => Some(operation::Result::Response(Any {
type_url: "type.googleapis.com/google.protobuf.Empty".to_string(),
value: Default::default(),
})),
Some(TaskResult::Cancelled) => Some(operation::Result::Error(Status {
code: tonic::Code::Cancelled as _,
message: "Job cancelled".to_string(),
details: vec![],
})),
Some(TaskResult::Dropped) => Some(operation::Result::Error(Status {
code: tonic::Code::Internal as _,
message: "Job did not run to completion, possible panic".to_string(),
details: vec![],
})),
Some(TaskResult::Error) => Some(operation::Result::Error(Status {
code: tonic::Code::Internal as _,
message: "Job returned an error".to_string(),
details: vec![],
})),
None => None,
};
Ok(Operation {
name: id.to_string(),
metadata: Some(metadata),
done: result.is_some(),
result,
})
}
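/// Look up a task tracker by its ID string, returning a `NotFound` error if no
/// matching job exists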
fn get_tracker(jobs: &JobRegistry, tracker: String) -> Result<TaskTracker<Job>, tonic::Status> {
let tracker_id = tracker.parse::<TaskId>().map_err(|e| FieldViolation {
field: "name".to_string(),
description: e.to_string(),
})?;
let tracker = jobs
.get(tracker_id)
.ok_or_else(|| NotFound::new(ResourceType::Job, tracker))?;
Ok(tracker)
}
#[tonic::async_trait]
impl operations_server::Operations for OperationsService {
async fn list_operations(
&self,
_request: tonic::Request<ListOperationsRequest>,
) -> Result<tonic::Response<ListOperationsResponse>, tonic::Status> {
// TODO: Support pagination
let operations: Result<Vec<_>, _> = self
.jobs
.tracked()
.into_iter()
.map(encode_tracker)
.collect();
Ok(Response::new(ListOperationsResponse {
operations: operations?,
next_page_token: Default::default(),
}))
}
async fn get_operation(
&self,
request: tonic::Request<GetOperationRequest>,
) -> Result<tonic::Response<Operation>, tonic::Status> {
let request = request.into_inner();
let tracker = get_tracker(self.jobs.as_ref(), request.name)?;
Ok(Response::new(encode_tracker(tracker)?))
}
async fn delete_operation(
&self,
_: tonic::Request<DeleteOperationRequest>,
) -> Result<tonic::Response<Empty>, tonic::Status> {
Err(tonic::Status::unimplemented(
"IOx does not support operation deletion",
))
}
async fn cancel_operation(
&self,
request: tonic::Request<CancelOperationRequest>,
) -> Result<tonic::Response<Empty>, tonic::Status> {
let request = request.into_inner();
let tracker = get_tracker(self.jobs.as_ref(), request.name)?;
tracker.cancel();
Ok(Response::new(Empty {}))
}
async fn wait_operation(
&self,
request: tonic::Request<WaitOperationRequest>,
) -> Result<tonic::Response<Operation>, tonic::Status> {
// This should take into account the context deadline timeout
// Unfortunately these are currently stripped by tonic
// - https://github.com/hyperium/tonic/issues/75
let request = request.into_inner();
let tracker = get_tracker(self.jobs.as_ref(), request.name)?;
if let Some(timeout) = request.timeout {
let timeout = timeout.try_into().scope("timeout")?;
// Timeout is not an error so suppress it
let _ = tokio::time::timeout(timeout, tracker.join()).await;
} else {
tracker.join().await;
}
Ok(Response::new(encode_tracker(tracker)?))
}
}
/// Instantiate the operations service
pub fn make_server(
jobs: Arc<JobRegistry>,
) -> operations_server::OperationsServer<impl operations_server::Operations> {
operations_server::OperationsServer::new(OperationsService { jobs })
}

View File

@ -1,15 +0,0 @@
use std::sync::Arc;
use arrow_flight::flight_service_server::{
FlightService as Flight, FlightServiceServer as FlightServer,
};
use generated_types::storage_server::{Storage, StorageServer};
use server::Server;
pub fn make_flight_server(server: Arc<Server>) -> FlightServer<impl Flight> {
service_grpc_flight::make_server(server)
}
pub fn make_storage_server(server: Arc<Server>) -> StorageServer<impl Storage> {
service_grpc_influxrpc::make_server(server)
}

View File

@ -1,59 +0,0 @@
use data_types::DatabaseName;
use dml::{DmlMeta, DmlOperation, DmlWrite};
use generated_types::google::{FieldViolation, FieldViolationExt};
use generated_types::influxdata::pbdata::v1::*;
use server::Server;
use std::sync::Arc;
use super::error::{default_dml_error_handler, default_server_error_handler};
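/// Implementation of the protobuf write gRPC service, decoding `DatabaseBatch`
/// payloads into DML writes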
struct PBWriteService {
server: Arc<Server>,
}
#[tonic::async_trait]
impl write_service_server::WriteService for PBWriteService {
async fn write(
&self,
request: tonic::Request<WriteRequest>,
) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
let span_ctx = request.extensions().get().cloned();
let database_batch = request
.into_inner()
.database_batch
.ok_or_else(|| FieldViolation::required("database_batch"))?;
let tables =
mutable_batch_pb::decode::decode_database_batch(&database_batch).map_err(|e| {
FieldViolation {
field: "database_batch".into(),
description: format!("Invalid DatabaseBatch: {}", e),
}
})?;
let write = DmlWrite::new(
&database_batch.database_name,
tables,
DmlMeta::unsequenced(span_ctx),
);
let db_name = DatabaseName::new(&database_batch.database_name)
.scope("database_batch.database_name")?;
let db = self
.server
.db(&db_name)
.map_err(default_server_error_handler)?;
db.store_operation(&DmlOperation::Write(write))
.map_err(default_dml_error_handler)?;
Ok(tonic::Response::new(WriteResponse {}))
}
}
pub fn make_server(
server: Arc<Server>,
) -> write_service_server::WriteServiceServer<impl write_service_server::WriteService> {
write_service_server::WriteServiceServer::new(PBWriteService { server })
}

View File

@ -1,79 +0,0 @@
use std::sync::Arc;
use clap_blocks::run_config::RunConfig;
use object_store::ObjectStoreImpl;
use observability_deps::tracing::warn;
use server::config::ConfigProvider;
use server::{ApplicationState, Server, ServerConfig};
use snafu::{ResultExt, Snafu};
use trace::TraceCollector;
use crate::config::ServerConfigFile;
use clap_blocks::object_store::{check_object_store, warn_about_inmem_store};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Cannot parse object store config: {}", source))]
ObjectStoreParsing {
source: clap_blocks::object_store::ParseError,
},
#[snafu(display("Cannot check object store config: {}", source))]
ObjectStoreCheck {
source: clap_blocks::object_store::CheckError,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
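/// Construct the shared [`ApplicationState`], checking the configured object store
/// and wiring up the optional config file provider and trace collector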
pub async fn make_application(
run_config: &RunConfig,
config_file: Option<String>,
num_worker_threads: Option<usize>,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Result<Arc<ApplicationState>> {
let object_store_config = run_config.object_store_config();
warn_about_inmem_store(object_store_config);
let object_store =
ObjectStoreImpl::try_from(object_store_config).context(ObjectStoreParsingSnafu)?;
check_object_store(&object_store)
.await
.context(ObjectStoreCheckSnafu)?;
let object_storage = Arc::new(object_store);
let config_provider =
config_file.map(|path| Arc::new(ServerConfigFile::new(path)) as Arc<dyn ConfigProvider>);
Ok(Arc::new(ApplicationState::new(
object_storage,
num_worker_threads,
trace_collector,
config_provider,
)))
}
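/// Construct the database [`Server`], setting its ID immediately if one was
/// provided in the run config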
pub fn make_server(
application: Arc<ApplicationState>,
wipe_catalog_on_error: bool,
skip_replay_and_seek_instead: bool,
run_config: &RunConfig,
) -> Result<Arc<Server>> {
let server_config = ServerConfig {
wipe_catalog_on_error,
skip_replay_and_seek_instead,
};
let app_server = Arc::new(Server::new(application, server_config));
// if this ID isn't set the server won't be usable until this is set via an API
// call
if let Some(id) = run_config.server_id_config().server_id {
app_server.set_id(id).expect("server id already set");
} else {
warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID config or API before writing or querying data.");
}
Ok(app_server)
}

View File

@ -1,42 +0,0 @@
[package]
name = "ioxd_router"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Workspace dependencies, in alphabetical order
data_types = { path = "../data_types" }
dml = { path = "../dml" }
generated_types = { path = "../generated_types" }
ioxd_common = { path = "../ioxd_common" }
metric = { path = "../metric" }
mutable_batch_pb = { path = "../mutable_batch_pb" }
router = { path = "../router" }
service_grpc_testing = { path = "../service_grpc_testing" }
iox_time = { path = "../iox_time" }
trace = { path = "../trace" }
trace_http = { path = "../trace_http" }
# Crates.io dependencies, in alphabetical order
async-trait = "0.1"
http = "0.2.7"
hyper = "0.14"
snafu = "0.7"
tonic = "0.7"
tonic-health = "0.6.0"
tonic-reflection = "0.4.0"
tokio = { version = "1.18", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7.0" }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
# Workspace dependencies, in alphabetical order
# Crates.io dependencies, in alphabetical order
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
regex = "1"

View File

@ -1,286 +0,0 @@
use std::sync::Arc;
use async_trait::async_trait;
use data_types::DatabaseName;
use dml::DmlOperation;
use hyper::{Body, Method, Request, Response};
use snafu::{ResultExt, Snafu};
use ioxd_common::http::{
dml::{HttpDrivenDml, InnerDmlError, RequestOrResponse},
error::{HttpApiError, HttpApiErrorExt, HttpApiErrorSource},
metrics::LineProtocolMetrics,
};
use super::RouterServerType;
#[derive(Debug, Snafu)]
pub enum ApplicationError {
#[snafu(display("No handler for {:?} {}", method, path))]
RouteNotFound { method: Method, path: String },
#[snafu(display("Cannot write data: {}", source))]
WriteError {
source: ioxd_common::http::dml::HttpDmlError,
},
}
impl HttpApiErrorSource for ApplicationError {
fn to_http_api_error(&self) -> HttpApiError {
match self {
e @ Self::RouteNotFound { .. } => e.not_found(),
Self::WriteError { source } => source.to_http_api_error(),
}
}
}
#[async_trait]
impl HttpDrivenDml for RouterServerType {
fn max_request_size(&self) -> usize {
self.max_request_size
}
fn lp_metrics(&self) -> Arc<LineProtocolMetrics> {
Arc::clone(&self.lp_metrics)
}
async fn write(
&self,
db_name: &DatabaseName<'_>,
op: DmlOperation,
) -> Result<(), InnerDmlError> {
match self.server.router(db_name) {
Some(router) => router
.write(op)
.await
.map_err(|e| InnerDmlError::InternalError {
db_name: db_name.to_string(),
source: Box::new(e),
}),
None => Err(InnerDmlError::DatabaseNotFound {
db_name: db_name.to_string(),
}),
}
}
}
#[allow(clippy::match_single_binding)]
pub async fn route_request(
server_type: &RouterServerType,
req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> {
match server_type
.route_dml_http_request(req)
.await
.context(WriteSnafu)?
{
RequestOrResponse::Response(resp) => Ok(resp),
RequestOrResponse::Request(req) => Err(ApplicationError::RouteNotFound {
method: req.method().clone(),
path: req.uri().path().to_string(),
}),
}
}
#[cfg(test)]
mod tests {
use std::{collections::BTreeMap, sync::Arc};
use data_types::{
delete_predicate::{DeleteExpr, DeletePredicate},
server_id::ServerId,
timestamp::TimestampRange,
};
use dml::{DmlDelete, DmlMeta, DmlOperation};
use http::StatusCode;
use iox_time::SystemProvider;
use reqwest::Client;
use router::{grpc_client::MockClient, resolver::RemoteTemplate, server::RouterServer};
use trace::RingBufferTraceCollector;
use ioxd_common::{
http::{
dml::test_utils::{
assert_delete_bad_request, assert_delete_unknown_database, assert_gzip_write,
assert_write, assert_write_metrics, assert_write_precision,
assert_write_to_invalid_database,
},
test_utils::{
assert_health, assert_metrics, assert_tracing, check_response, TestServer,
},
},
server_type::CommonServerState,
};
use super::*;
#[tokio::test]
async fn test_health() {
assert_health(test_server().await).await;
}
#[tokio::test]
async fn test_metrics() {
assert_metrics(test_server().await).await;
}
#[tokio::test]
async fn test_tracing() {
assert_tracing(test_server().await).await;
}
#[tokio::test]
async fn test_write() {
let test_server = test_server().await;
let write = assert_write(&test_server).await;
assert_dbwrite(test_server, DmlOperation::Write(write)).await;
}
#[tokio::test]
async fn test_gzip_write() {
let test_server = test_server().await;
let write = assert_gzip_write(&test_server).await;
assert_dbwrite(test_server, DmlOperation::Write(write)).await;
}
#[tokio::test]
async fn test_write_metrics() {
assert_write_metrics(test_server().await, false).await;
}
#[tokio::test]
async fn test_write_precision() {
let test_server = test_server().await;
let writes = assert_write_precision(&test_server).await;
assert_dbwrites(test_server, writes.into_iter().map(DmlOperation::Write)).await;
}
#[tokio::test]
async fn test_write_to_invalid_database() {
assert_write_to_invalid_database(test_server().await).await;
}
#[tokio::test]
async fn test_delete() {
// Set up server
let test_server = test_server().await;
// Set up client
let client = Client::new();
let bucket_name = "MyBucket";
let org_name = "MyOrg";
// Client requests to delete data
let delete_line = r#"{"start":"1","stop":"2", "predicate":"foo=1"}"#;
let response = client
.post(&format!(
"{}/api/v2/delete?bucket={}&org={}",
test_server.url(),
bucket_name,
org_name
))
.body(delete_line)
.send()
.await;
check_response("delete", response, StatusCode::NO_CONTENT, Some("")).await;
let predicate = DeletePredicate {
range: TimestampRange::new(1, 2),
exprs: vec![DeleteExpr {
column: String::from("foo"),
op: data_types::delete_predicate::Op::Eq,
scalar: data_types::delete_predicate::Scalar::I64(1),
}],
};
let delete = DmlDelete::new(
"MyOrg_MyBucket",
predicate,
None,
DmlMeta::unsequenced(None),
);
assert_dbwrite(test_server, DmlOperation::Delete(delete)).await;
}
#[tokio::test]
async fn test_delete_unknown_database() {
assert_delete_unknown_database(test_server().await).await;
}
#[tokio::test]
async fn test_delete_bad_request() {
assert_delete_bad_request(test_server().await).await;
}
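/// Return a test router server configured with a `MyOrg_MyBucket` router that
/// shards all tables to a mock gRPC remote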
async fn test_server() -> TestServer<RouterServerType> {
use data_types::router::{
Matcher, MatcherToShard, Router, ShardConfig, ShardId, WriteSink, WriteSinkSet,
WriteSinkVariant,
};
use regex::Regex;
let common_state = CommonServerState::for_testing();
let time_provider = Arc::new(SystemProvider::new());
let server_id_1 = ServerId::try_from(1).unwrap();
let remote_template = RemoteTemplate::new("{id}");
let server = Arc::new(
RouterServer::for_testing(
Some(remote_template),
Some(Arc::new(RingBufferTraceCollector::new(1))),
time_provider,
None,
)
.await,
);
server.update_router(Router {
name: String::from("MyOrg_MyBucket"),
write_sharder: ShardConfig {
specific_targets: vec![MatcherToShard {
matcher: Matcher {
table_name_regex: Some(Regex::new(".*").unwrap()),
},
shard: ShardId::new(1),
}],
hash_ring: None,
},
write_sinks: BTreeMap::from([(
ShardId::new(1),
WriteSinkSet {
sinks: vec![WriteSink {
ignore_errors: false,
sink: WriteSinkVariant::GrpcRemote(server_id_1),
}],
},
)]),
query_sinks: Default::default(),
});
let server_type = Arc::new(RouterServerType::new(server, &common_state, false));
TestServer::new(server_type)
}
async fn assert_dbwrites(
test_server: TestServer<RouterServerType>,
writes: impl IntoIterator<Item = DmlOperation> + Send,
) {
let grpc_client = test_server
.server_type()
.server
.connection_pool()
.grpc_client("1")
.await
.unwrap();
let grpc_client = grpc_client.as_any().downcast_ref::<MockClient>().unwrap();
let operations: Vec<_> = writes
.into_iter()
.map(|o| (String::from("MyOrg_MyBucket"), o))
.collect();
grpc_client.assert_writes(&operations);
}
async fn assert_dbwrite(test_server: TestServer<RouterServerType>, write: DmlOperation) {
assert_dbwrites(test_server, std::iter::once(write)).await
}
}

View File

@ -1,78 +0,0 @@
use std::sync::Arc;
use async_trait::async_trait;
use hyper::{Body, Request, Response};
use metric::Registry;
use router::server::RouterServer;
use tokio_util::sync::CancellationToken;
use trace::TraceCollector;
use ioxd_common::{
http::{error::HttpApiErrorSource, metrics::LineProtocolMetrics},
rpc::RpcBuilderInput,
server_type::{CommonServerState, RpcError, ServerType},
};
mod http;
mod rpc;
pub use self::http::ApplicationError;
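/// `ServerType` implementation for the router, handling HTTP DML requests and
/// the router gRPC services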
#[derive(Debug)]
pub struct RouterServerType {
server: Arc<RouterServer>,
shutdown: CancellationToken,
max_request_size: usize,
lp_metrics: Arc<LineProtocolMetrics>,
config_immutable: bool,
}
impl RouterServerType {
pub fn new(
server: Arc<RouterServer>,
common_state: &CommonServerState,
config_immutable: bool,
) -> Self {
let lp_metrics = Arc::new(LineProtocolMetrics::new(server.metric_registry().as_ref()));
Self {
server,
shutdown: CancellationToken::new(),
max_request_size: common_state.run_config().max_http_request_size,
lp_metrics,
config_immutable,
}
}
}
#[async_trait]
impl ServerType for RouterServerType {
fn metric_registry(&self) -> Arc<Registry> {
Arc::clone(self.server.metric_registry())
}
fn trace_collector(&self) -> Option<Arc<dyn TraceCollector>> {
self.server.trace_collector().clone()
}
async fn route_http_request(
&self,
req: Request<Body>,
) -> Result<Response<Body>, Box<dyn HttpApiErrorSource>> {
self::http::route_request(self, req)
.await
.map_err(|e| Box::new(e) as _)
}
async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError> {
self::rpc::server_grpc(self, builder_input).await
}
async fn join(self: Arc<Self>) {
self.shutdown.cancelled().await;
}
fn shutdown(&self) {
self.shutdown.cancel();
}
}

View File

@ -1,51 +0,0 @@
use std::sync::Arc;
use data_types::non_empty::NonEmptyString;
use dml::{DmlDelete, DmlMeta, DmlOperation};
use generated_types::google::{FromOptionalField, NotFound, OptionalField, ResourceType};
use generated_types::influxdata::iox::delete::v1::*;
use router::server::RouterServer;
use tonic::Response;
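/// Implementation of the delete gRPC service for the router, forwarding delete
/// operations to the matching router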
struct DeleteService {
server: Arc<RouterServer>,
}
#[tonic::async_trait]
impl delete_service_server::DeleteService for DeleteService {
async fn delete(
&self,
request: tonic::Request<DeleteRequest>,
) -> Result<tonic::Response<DeleteResponse>, tonic::Status> {
let span_ctx = request.extensions().get().cloned();
let DeleteRequest { payload } = request.into_inner();
let DeletePayload {
db_name,
table_name,
predicate,
} = payload.unwrap_field("payload")?;
let predicate = predicate.required("predicate")?;
let table_name = NonEmptyString::new(table_name);
let meta = DmlMeta::unsequenced(span_ctx);
let op = DmlOperation::Delete(DmlDelete::new(&db_name, predicate, table_name, meta));
let router = self
.server
.router(&db_name)
.ok_or_else(|| NotFound::new(ResourceType::Router, db_name))?;
router
.write(op)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;
Ok(Response::new(DeleteResponse {}))
}
}
pub fn make_server(
server: Arc<RouterServer>,
) -> delete_service_server::DeleteServiceServer<impl delete_service_server::DeleteService> {
delete_service_server::DeleteServiceServer::new(DeleteService { server })
}

View File

@ -1,55 +0,0 @@
use data_types::server_id::ServerId;
use generated_types::google::ResourceType;
use generated_types::{
google::{FieldViolation, NotFound},
influxdata::iox::deployment::v1::*,
};
use router::server::RouterServer;
use std::{convert::TryFrom, sync::Arc};
use tonic::{Request, Response, Status};
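/// gRPC `DeploymentService` handler exposing get/update of the router's server ID.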
struct DeploymentService {
server: Arc<RouterServer>,
}
#[tonic::async_trait]
impl deployment_service_server::DeploymentService for DeploymentService {
async fn get_server_id(
&self,
_: Request<GetServerIdRequest>,
) -> Result<Response<GetServerIdResponse>, Status> {
match self.server.server_id() {
Some(id) => Ok(Response::new(GetServerIdResponse { id: id.get_u32() })),
None => return Err(NotFound::new(ResourceType::ServerId, Default::default()).into()),
}
}
async fn update_server_id(
&self,
request: Request<UpdateServerIdRequest>,
) -> Result<Response<UpdateServerIdResponse>, Status> {
use router::server::SetServerIdError;
let id =
ServerId::try_from(request.get_ref().id).map_err(|_| FieldViolation::required("id"))?;
match self.server.set_server_id(id) {
Ok(_) => Ok(Response::new(UpdateServerIdResponse {})),
Err(e @ SetServerIdError::AlreadySet { .. }) => {
return Err(FieldViolation {
field: "id".to_string(),
description: e.to_string(),
}
.into())
}
}
}
}
pub fn make_server(
server: Arc<RouterServer>,
) -> deployment_service_server::DeploymentServiceServer<
impl deployment_service_server::DeploymentService,
> {
deployment_service_server::DeploymentServiceServer::new(DeploymentService { server })
}
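For reference, a minimal client-side sketch of this API (assuming the standard tonic-generated deployment_service_client in generated_types and a placeholder endpoint address; neither is taken from this diff):

use generated_types::influxdata::iox::deployment::v1::{
    deployment_service_client::DeploymentServiceClient, GetServerIdRequest, UpdateServerIdRequest,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder endpoint; the real bind address depends on run-time configuration.
    let mut client = DeploymentServiceClient::connect("http://127.0.0.1:8082").await?;

    // Assign a server ID, then read it back.
    client.update_server_id(UpdateServerIdRequest { id: 42 }).await?;
    let id = client.get_server_id(GetServerIdRequest {}).await?.into_inner().id;
    println!("server id: {id}");
    Ok(())
}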

@@ -1,48 +0,0 @@
use std::sync::Arc;
use ioxd_common::{
add_service, rpc::RpcBuilderInput, serve_builder, server_type::RpcError, setup_builder,
};
use super::RouterServerType;
mod delete;
mod deployment;
mod remote;
mod router;
mod write_pb;
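/// Registers all router-specific gRPC services (deployment, remote, router
/// config, delete and protobuf write) on a shared tonic builder and serves
/// them until shutdown.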
pub async fn server_grpc(
server_type: Arc<RouterServerType>,
builder_input: RpcBuilderInput,
) -> Result<(), RpcError> {
let builder = setup_builder!(builder_input, server_type);
add_service!(
builder,
deployment::make_server(Arc::clone(&server_type.server),)
);
add_service!(
builder,
remote::make_server(Arc::clone(&server_type.server))
);
add_service!(
builder,
router::make_server(
Arc::clone(&server_type.server),
server_type.config_immutable
)
);
add_service!(
builder,
delete::make_server(Arc::clone(&server_type.server))
);
add_service!(
builder,
write_pb::make_server(Arc::clone(&server_type.server))
);
serve_builder!(builder);
Ok(())
}

@@ -1,72 +0,0 @@
use std::convert::TryFrom;
use std::sync::Arc;
use data_types::server_id::ServerId;
use generated_types::google::{FieldViolation, NotFound, ResourceType};
use generated_types::influxdata::iox::remote::v1::*;
use router::server::RouterServer;
use tonic::{Request, Response, Status};
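/// gRPC `RemoteService` handler for listing, updating and deleting the
/// server-ID-to-connection-string mappings held by the router's resolver.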
struct RemoteService {
server: Arc<RouterServer>,
}
#[tonic::async_trait]
impl remote_service_server::RemoteService for RemoteService {
async fn list_remotes(
&self,
_: Request<ListRemotesRequest>,
) -> Result<Response<ListRemotesResponse>, Status> {
let remotes = self
.server
.resolver()
.remotes()
.into_iter()
.map(|(id, connection_string)| Remote {
id: id.get_u32(),
connection_string,
})
.collect();
Ok(Response::new(ListRemotesResponse { remotes }))
}
async fn update_remote(
&self,
request: Request<UpdateRemoteRequest>,
) -> Result<Response<UpdateRemoteResponse>, Status> {
let remote = request
.into_inner()
.remote
.ok_or_else(|| FieldViolation::required("remote"))?;
let remote_id = ServerId::try_from(remote.id)
.map_err(|_| FieldViolation::required("id").scope("remote"))?;
self.server
.resolver()
.update_remote(remote_id, remote.connection_string);
Ok(Response::new(UpdateRemoteResponse {}))
}
async fn delete_remote(
&self,
request: Request<DeleteRemoteRequest>,
) -> Result<Response<DeleteRemoteResponse>, Status> {
let request = request.into_inner();
let remote_id =
ServerId::try_from(request.id).map_err(|_| FieldViolation::required("id"))?;
if self.server.resolver().delete_remote(remote_id) {
Ok(Response::new(DeleteRemoteResponse {}))
} else {
Err(NotFound::new(ResourceType::ServerId, remote_id.to_string()).into())
}
}
}
pub fn make_server(
server: Arc<RouterServer>,
) -> remote_service_server::RemoteServiceServer<impl remote_service_server::RemoteService> {
remote_service_server::RemoteServiceServer::new(RemoteService { server })
}

@@ -1,85 +0,0 @@
use std::sync::Arc;
use tonic::{Request, Response, Status};
use generated_types::google::PreconditionViolation;
use generated_types::{
google::{FromOptionalField, NotFound, ResourceType},
influxdata::iox::router::v1::*,
};
use router::server::RouterServer;
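/// gRPC `RouterService` handler for reading router configurations and, unless
/// the config is marked immutable, updating or deleting them.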
struct RouterService {
server: Arc<RouterServer>,
config_immutable: bool,
}
#[tonic::async_trait]
impl router_service_server::RouterService for RouterService {
async fn get_router(
&self,
request: Request<GetRouterRequest>,
) -> Result<Response<GetRouterResponse>, Status> {
let GetRouterRequest { router_name } = request.into_inner();
let router = self
.server
.router(&router_name)
.ok_or_else(|| NotFound::new(ResourceType::Router, router_name))?;
Ok(Response::new(GetRouterResponse {
router: Some(router.config().clone().into()),
}))
}
async fn list_routers(
&self,
_: Request<ListRoutersRequest>,
) -> Result<Response<ListRoutersResponse>, Status> {
Ok(Response::new(ListRoutersResponse {
routers: self
.server
.routers()
.into_iter()
.map(|router| router.config().clone().into())
.collect(),
}))
}
async fn update_router(
&self,
request: Request<UpdateRouterRequest>,
) -> Result<Response<UpdateRouterResponse>, Status> {
if self.config_immutable {
return Err(PreconditionViolation::RouterConfigImmutable.into());
}
use data_types::router::Router as RouterConfig;
let UpdateRouterRequest { router } = request.into_inner();
let cfg: RouterConfig = router.required("router")?;
self.server.update_router(cfg);
Ok(Response::new(UpdateRouterResponse {}))
}
async fn delete_router(
&self,
request: Request<DeleteRouterRequest>,
) -> Result<Response<DeleteRouterResponse>, Status> {
if self.config_immutable {
return Err(PreconditionViolation::RouterConfigImmutable.into());
}
let DeleteRouterRequest { router_name } = request.into_inner();
self.server.delete_router(&router_name);
Ok(Response::new(DeleteRouterResponse {}))
}
}
pub fn make_server(
server: Arc<RouterServer>,
config_immutable: bool,
) -> router_service_server::RouterServiceServer<impl router_service_server::RouterService> {
router_service_server::RouterServiceServer::new(RouterService {
server,
config_immutable,
})
}

@@ -1,55 +0,0 @@
use dml::{DmlMeta, DmlOperation, DmlWrite};
use generated_types::google::{FieldViolation, NotFound, ResourceType};
use generated_types::influxdata::pbdata::v1::*;
use router::server::RouterServer;
use std::sync::Arc;
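/// gRPC protobuf write handler: decodes a `DatabaseBatch` into mutable batches
/// and forwards the resulting `DmlWrite` to the router for the named database.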
struct PBWriteService {
server: Arc<RouterServer>,
}
#[tonic::async_trait]
impl write_service_server::WriteService for PBWriteService {
async fn write(
&self,
request: tonic::Request<WriteRequest>,
) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
let span_ctx = request.extensions().get().cloned();
let database_batch = request
.into_inner()
.database_batch
.ok_or_else(|| FieldViolation::required("database_batch"))?;
let tables =
mutable_batch_pb::decode::decode_database_batch(&database_batch).map_err(|e| {
FieldViolation {
field: "database_batch".into(),
description: format!("Invalid DatabaseBatch: {}", e),
}
})?;
let write = DmlOperation::Write(DmlWrite::new(
&database_batch.database_name,
tables,
DmlMeta::unsequenced(span_ctx),
));
let router = self
.server
.router(&database_batch.database_name)
.ok_or_else(|| NotFound::new(ResourceType::Router, database_batch.database_name))?;
router
.write(write)
.await
.map_err::<tonic::Status, _>(|e| tonic::Status::aborted(e.to_string()))?;
Ok(tonic::Response::new(WriteResponse {}))
}
}
pub fn make_server(
server: Arc<RouterServer>,
) -> write_service_server::WriteServiceServer<impl write_service_server::WriteService> {
write_service_server::WriteServiceServer::new(PBWriteService { server })
}
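A comparable client-side sketch for this protobuf write path (again assuming the tonic-generated write_service_client and a placeholder address; a real caller would populate table_batches, for example by encoding line protocol, rather than sending an empty batch):

use generated_types::influxdata::pbdata::v1::{
    write_service_client::WriteServiceClient, DatabaseBatch, WriteRequest,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder endpoint; not taken from this repository's configuration.
    let mut client = WriteServiceClient::connect("http://127.0.0.1:8082").await?;

    // Empty batch for illustration only; table_batches would normally carry
    // the encoded column data for the tables being written.
    let batch = DatabaseBatch {
        database_name: "MyOrg_MyBucket".to_string(),
        ..Default::default()
    };

    client
        .write(WriteRequest {
            database_batch: Some(batch),
            ..Default::default()
        })
        .await?;
    Ok(())
}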