refactor: Extract common, OG database and router out of influxdb_ioxd ()

* refactor: Extract common, OG database and router out of influxdb_ioxd

* chore: Run cargo hakari tasks

Co-authored-by: CircleCI[bot] <circleci@influxdata.com>
pull/24376/head
Andrew Lamb 2022-03-29 09:07:19 -04:00 committed by GitHub
parent 99b6be9c2b
commit 4ca52e5ae0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
54 changed files with 703 additions and 516 deletions

174
Cargo.lock generated
View File

@ -2147,6 +2147,9 @@ dependencies = [
"iox_catalog",
"iox_object_store",
"iox_tests",
"ioxd_common",
"ioxd_database",
"ioxd_router",
"itertools",
"job_registry",
"libc",
@ -2242,76 +2245,36 @@ name = "influxdb_ioxd"
version = "0.1.0"
dependencies = [
"ansi_term",
"arrow",
"arrow-flight",
"arrow_util",
"assert_cmd",
"async-trait",
"base64 0.13.0",
"byteorder",
"bytes",
"chrono",
"clap 3.1.6",
"clap_blocks",
"comfy-table",
"compactor",
"csv",
"data_types",
"data_types2",
"db",
"dml",
"flate2",
"futures",
"generated_types",
"hashbrown 0.12.0",
"heappy",
"hex",
"http",
"hyper",
"influxdb_iox_client",
"influxdb_line_protocol",
"influxdb_storage_client",
"ingester",
"internal_types",
"iox_catalog",
"iox_object_store",
"itertools",
"job_registry",
"libc",
"log",
"logfmt",
"ioxd_common",
"metric",
"metric_exporters",
"mutable_batch",
"mutable_batch_lp",
"mutable_batch_pb",
"mutable_buffer",
"num_cpus",
"object_store",
"observability_deps",
"once_cell",
"panic_logging",
"parking_lot 0.12.0",
"parquet",
"parquet_catalog",
"parquet_file",
"pin-project",
"pprof 0.7.0",
"predicate",
"predicates",
"prost",
"querier",
"query",
"rand",
"read_buffer",
"regex",
"reqwest",
"router",
"router2",
"schema",
"serde",
"serde_json",
"serde_urlencoded",
"server",
"service_common",
"service_grpc_flight",
@ -2330,13 +2293,9 @@ dependencies = [
"tonic",
"tonic-health",
"tonic-reflection",
"tower",
"trace",
"trace_exporters",
"trace_http",
"tracker",
"trogging",
"uuid",
"workspace-hack",
"write_buffer",
]
@ -2602,6 +2561,131 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "ioxd_common"
version = "0.1.0"
dependencies = [
"async-trait",
"bytes",
"chrono",
"clap 3.1.6",
"clap_blocks",
"data_types",
"dml",
"flate2",
"futures",
"hashbrown 0.12.0",
"http",
"hyper",
"log",
"metric",
"metric_exporters",
"mutable_batch_lp",
"observability_deps",
"parking_lot 0.12.0",
"predicate",
"prost",
"reqwest",
"serde",
"serde_json",
"serde_urlencoded",
"snafu",
"tokio",
"tokio-stream",
"tokio-util 0.7.1",
"tonic",
"tonic-health",
"tower",
"trace",
"trace_exporters",
"trace_http",
"workspace-hack",
]
[[package]]
name = "ioxd_database"
version = "0.1.0"
dependencies = [
"arrow",
"arrow-flight",
"arrow_util",
"async-trait",
"bytes",
"clap 3.1.6",
"clap_blocks",
"data_types",
"db",
"dml",
"futures",
"generated_types",
"http",
"hyper",
"influxdb_iox_client",
"influxdb_storage_client",
"ioxd_common",
"job_registry",
"metric",
"mutable_batch_pb",
"object_store",
"observability_deps",
"prost",
"query",
"reqwest",
"schema",
"serde",
"serde_json",
"serde_urlencoded",
"server",
"service_common",
"service_grpc_flight",
"service_grpc_influxrpc",
"service_grpc_testing",
"snafu",
"test_helpers",
"tokio",
"tokio-stream",
"tokio-util 0.7.1",
"tonic",
"tonic-health",
"tonic-reflection",
"trace",
"trace_exporters",
"trace_http",
"tracker",
"uuid",
"workspace-hack",
]
[[package]]
name = "ioxd_router"
version = "0.1.0"
dependencies = [
"async-trait",
"data_types",
"dml",
"generated_types",
"http",
"hyper",
"ioxd_common",
"metric",
"mutable_batch_pb",
"regex",
"reqwest",
"router",
"service_grpc_testing",
"snafu",
"time 0.1.0",
"tokio",
"tokio-stream",
"tokio-util 0.7.1",
"tonic",
"tonic-health",
"tonic-reflection",
"trace",
"trace_http",
"workspace-hack",
]
[[package]]
name = "ipnet"
version = "2.4.0"

View File

@ -31,6 +31,9 @@ members = [
"iox_gitops_adapter",
"iox_object_store",
"iox_tests",
"ioxd_common",
"ioxd_database",
"ioxd_router",
"job_registry",
"lifecycle",
"logfmt",

View File

@ -24,6 +24,9 @@ internal_types = { path = "../internal_types" }
influxrpc_parser = { path = "../influxrpc_parser"}
iox_catalog = { path = "../iox_catalog" }
iox_object_store = { path = "../iox_object_store" }
ioxd_common = { path = "../ioxd_common"}
ioxd_database = { path = "../ioxd_database"}
ioxd_router = { path = "../ioxd_router"}
job_registry = { path = "../job_registry" }
logfmt = { path = "../logfmt" }
metric = { path = "../metric" }
@ -107,7 +110,6 @@ workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
# Workspace dependencies, in alphabetical order
arrow_util = { path = "../arrow_util" }
influxdb_storage_client = { path = "../influxdb_storage_client" }
iox_tests = { path = "../iox_tests" }
test_helpers = { path = "../test_helpers" }
test_helpers_end_to_end_ng = { path = "../test_helpers_end_to_end_ng" }

View File

@ -9,14 +9,13 @@ use clap_blocks::{
socket_addr::SocketAddr,
write_buffer::WriteBufferConfig,
};
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use influxdb_ioxd::{
self,
server_type::{
common_state::{CommonServerState, CommonServerStateError},
compactor::create_compactor_server_type,
ingester::create_ingester_server_type,
querier::create_querier_server_type,
router2::create_router2_server_type,
compactor::create_compactor_server_type, ingester::create_ingester_server_type,
querier::create_querier_server_type, router2::create_router2_server_type,
},
Service,
};

View File

@ -10,14 +10,8 @@ use time::SystemProvider;
use clap_blocks::{
catalog_dsn::CatalogDsnConfig, compactor::CompactorConfig, run_config::RunConfig,
};
use influxdb_ioxd::{
self,
server_type::{
common_state::{CommonServerState, CommonServerStateError},
compactor::create_compactor_server_type,
},
Service,
};
use influxdb_ioxd::{self, server_type::compactor::create_compactor_server_type, Service};
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
#[derive(Debug, Error)]
pub enum Error {

View File

@ -4,17 +4,13 @@ use std::sync::Arc;
use clap_blocks::run_config::RunConfig;
use data_types::boolean_flag::BooleanFlag;
use influxdb_ioxd::{
self,
server_type::{
common_state::{CommonServerState, CommonServerStateError},
database::{
setup::{make_application, make_server},
DatabaseServerType,
},
},
Service,
use influxdb_ioxd::{self, Service};
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use ioxd_database::{
setup::{make_application, make_server},
DatabaseServerType,
};
use thiserror::Error;
#[derive(Debug, Error)]
@ -23,7 +19,7 @@ pub enum Error {
Run(#[from] influxdb_ioxd::Error),
#[error("Cannot setup server: {0}")]
Setup(#[from] influxdb_ioxd::server_type::database::setup::Error),
Setup(#[from] ioxd_database::setup::Error),
#[error("Invalid config: {0}")]
InvalidConfig(#[from] CommonServerStateError),

View File

@ -4,14 +4,8 @@ use clap_blocks::{
catalog_dsn::CatalogDsnConfig, ingester::IngesterConfig, run_config::RunConfig,
write_buffer::WriteBufferConfig,
};
use influxdb_ioxd::{
self,
server_type::{
common_state::{CommonServerState, CommonServerStateError},
ingester::create_ingester_server_type,
},
Service,
};
use influxdb_ioxd::{self, server_type::ingester::create_ingester_server_type, Service};
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use object_store::{instrumentation::ObjectStoreMetrics, DynObjectStore, ObjectStoreImpl};
use observability_deps::tracing::*;
use query::exec::Executor;

View File

@ -8,14 +8,8 @@ use thiserror::Error;
use time::SystemProvider;
use clap_blocks::{catalog_dsn::CatalogDsnConfig, run_config::RunConfig};
use influxdb_ioxd::{
self,
server_type::{
common_state::{CommonServerState, CommonServerStateError},
querier::create_querier_server_type,
},
Service,
};
use influxdb_ioxd::{self, server_type::querier::create_querier_server_type, Service};
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
#[derive(Debug, Error)]
pub enum Error {

View File

@ -7,14 +7,9 @@ use clap_blocks::run_config::RunConfig;
use data_types::router::Router as RouterConfig;
use generated_types::{google::FieldViolation, influxdata::iox::router::v1::RouterConfigFile};
use influxdb_ioxd::{
self,
server_type::{
common_state::{CommonServerState, CommonServerStateError},
router::RouterServerType,
},
Service,
};
use influxdb_ioxd::{self, Service};
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use ioxd_router::RouterServerType;
use observability_deps::tracing::warn;
use router::{resolver::RemoteTemplate, server::RouterServer};
use thiserror::Error;
@ -26,7 +21,7 @@ pub enum Error {
Run(#[from] influxdb_ioxd::Error),
#[error("Cannot setup server: {0}")]
Setup(#[from] influxdb_ioxd::server_type::database::setup::Error),
Setup(#[from] ioxd_database::setup::Error),
#[error("Invalid config: {0}")]
InvalidConfig(#[from] CommonServerStateError),

View File

@ -5,14 +5,8 @@ use std::sync::Arc;
use clap_blocks::{
catalog_dsn::CatalogDsnConfig, run_config::RunConfig, write_buffer::WriteBufferConfig,
};
use influxdb_ioxd::{
self,
server_type::{
common_state::{CommonServerState, CommonServerStateError},
router2::create_router2_server_type,
},
Service,
};
use influxdb_ioxd::{self, server_type::router2::create_router2_server_type, Service};
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use observability_deps::tracing::*;
use thiserror::Error;

View File

@ -5,12 +5,10 @@ use std::sync::Arc;
use clap_blocks::run_config::RunConfig;
use influxdb_ioxd::{
self,
server_type::{
common_state::{CommonServerState, CommonServerStateError},
test::{TestAction, TestServerType},
},
server_type::test::{TestAction, TestServerType},
Service,
};
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use metric::Registry;
use thiserror::Error;

View File

@ -8,35 +8,20 @@ edition = "2021"
# Workspace dependencies, in alphabetical order
clap_blocks = { path = "../clap_blocks" }
compactor = { path = "../compactor" }
data_types = { path = "../data_types" }
data_types2 = { path = "../data_types2" }
db = { path = "../db" }
dml = { path = "../dml" }
generated_types = { path = "../generated_types" }
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format", "write_lp"] }
influxdb_storage_client = { path = "../influxdb_storage_client" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
ingester = { path = "../ingester" }
internal_types = { path = "../internal_types" }
iox_catalog = { path = "../iox_catalog" }
iox_object_store = { path = "../iox_object_store" }
job_registry = { path = "../job_registry" }
logfmt = { path = "../logfmt" }
ioxd_common = { path = "../ioxd_common" }
metric = { path = "../metric" }
metric_exporters = { path = "../metric_exporters" }
mutable_batch = { path = "../mutable_batch" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
mutable_batch_pb = { path = "../mutable_batch_pb" }
mutable_buffer = { path = "../mutable_buffer" }
object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" }
panic_logging = { path = "../panic_logging" }
parquet_catalog = { path = "../parquet_catalog" }
parquet_file = { path = "../parquet_file" }
predicate = { path = "../predicate" }
querier = { path = "../querier" }
query = { path = "../query" }
read_buffer = { path = "../read_buffer" }
router = { path = "../router" }
router2 = { path = "../router2" }
server = { path = "../server" }
@ -48,40 +33,19 @@ time = { path = "../time" }
trace = { path = "../trace" }
trace_exporters = { path = "../trace_exporters" }
trace_http = { path = "../trace_http" }
tracker = { path = "../tracker" }
trogging = { path = "../trogging", default-features = false, features = ["clap"] }
write_buffer = { path = "../write_buffer" }
# Crates.io dependencies, in alphabetical order
ansi_term = "0.12"
arrow = { version = "11", features = ["prettyprint"] }
arrow-flight = "11"
async-trait = "0.1"
byteorder = "1.3.4"
bytes = "1.0"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "env"] }
# used by arrow/datafusion anyway
comfy-table = { version = "5.0", default-features = false }
csv = "1.1"
flate2 = "1.0"
futures = "0.3"
hashbrown = "0.12"
http = "0.2.0"
hyper = "0.14"
itertools = "0.10.1"
libc = { version = "0.2" }
log = "0.4"
num_cpus = "1.13.0"
once_cell = { version = "1.10.0", features = ["parking_lot"] }
parking_lot = "0.12"
parquet = "11"
pin-project = "1.0"
pprof = { version = "0.7", default-features = false, features = ["flamegraph", "prost-codec"], optional = true }
prost = "0.9"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.79"
serde_urlencoded = "0.7.0"
snafu = "0.7"
thiserror = "1.0.30"
tikv-jemalloc-ctl = { version = "0.4.0", optional = true }
@ -91,8 +55,6 @@ tokio-util = { version = "0.7.1" }
tonic = "0.6"
tonic-health = "0.5.0"
tonic-reflection = "0.3.0"
tower = "0.4"
uuid = { version = "0.8", features = ["v4"] }
# jemalloc-sys with unprefixed_malloc_on_supported_platforms feature and heappy are mutually exclusive
tikv-jemalloc-sys = { version = "0.4.0", optional = true, features = ["unprefixed_malloc_on_supported_platforms"] }
heappy = { git = "https://github.com/mkmik/heappy", rev = "1770cd0cde556d121e7f017538ddda0e1778126a", features = ["enable_heap_profiler", "jemalloc_shim", "measure_free"], optional = true }
@ -102,16 +64,9 @@ workspace-hack = { path = "../workspace-hack"}
# Workspace dependencies, in alphabetical order
arrow_util = { path = "../arrow_util" }
test_helpers = { path = "../test_helpers" }
schema = { path = "../schema" }
# Crates.io dependencies, in alphabetical order
assert_cmd = "2.0.2"
base64 = "0.13"
hex = "0.4.2"
predicates = "2.1.0"
rand = "0.8.3"
regex = "1"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
tempfile = "3.1.0"
[features]

View File

@ -1,78 +1,36 @@
use crate::server_type::{common_state::CommonServerState, ServerType};
use clap_blocks::run_config::RunConfig;
use futures::{future::FusedFuture, pin_mut, FutureExt};
use hyper::server::conn::AddrIncoming;
use ioxd_common::{
grpc_listener, http_listener, serve,
server_type::{CommonServerState, ServerType},
};
use observability_deps::tracing::{error, info};
use panic_logging::SendPanicsToTracing;
use snafu::{ResultExt, Snafu};
use std::{net::SocketAddr, sync::Arc};
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use trace_http::ctx::TraceHeaderParser;
mod http;
#[cfg(all(not(feature = "heappy"), feature = "jemalloc_replacing_malloc"))]
mod jemalloc;
pub(crate) mod rpc;
pub mod server_type;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Unable to bind to listen for HTTP requests on {}: {}", addr, source))]
StartListeningHttp {
addr: SocketAddr,
source: hyper::Error,
},
#[snafu(display("Unable to bind to listen for gRPC requests on {}: {}", addr, source))]
StartListeningGrpc {
addr: SocketAddr,
source: std::io::Error,
},
#[snafu(display("Error serving HTTP: {}", source))]
ServingHttp { source: hyper::Error },
#[snafu(display("Error serving RPC: {}", source))]
ServingRpc { source: server_type::RpcError },
#[snafu(display("{}", source))]
Wrapper { source: ioxd_common::Error },
#[snafu(display("Error joining server task: {}", source))]
Joining { source: tokio::task::JoinError },
#[snafu(display("Early Http shutdown"))]
LostHttp,
#[snafu(display("Early RPC shutdown"))]
LostRpc,
#[snafu(display("Early server shutdown"))]
LostServer,
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// On unix platforms we want to intercept SIGINT and SIGTERM
/// This method returns if either are signalled
#[cfg(unix)]
async fn wait_for_signal() {
use tokio::signal::unix::{signal, SignalKind};
let mut term = signal(SignalKind::terminate()).expect("failed to register signal handler");
let mut int = signal(SignalKind::interrupt()).expect("failed to register signal handler");
tokio::select! {
_ = term.recv() => info!("Received SIGTERM"),
_ = int.recv() => info!("Received SIGINT"),
impl From<ioxd_common::Error> for Error {
fn from(source: ioxd_common::Error) -> Self {
Self::Wrapper { source }
}
}
#[cfg(windows)]
/// ctrl_c is the cross-platform way to intercept the equivalent of SIGINT
/// This method returns if this occurs
async fn wait_for_signal() {
let _ = tokio::signal::ctrl_c().await;
}
#[cfg(all(not(feature = "heappy"), not(feature = "jemalloc_replacing_malloc")))]
fn build_malloc_conf() -> String {
"system".to_string()
@ -244,170 +202,3 @@ pub async fn main(common_state: CommonServerState, services: Vec<Service>) -> Re
Ok(())
}
pub async fn grpc_listener(addr: SocketAddr) -> Result<tokio::net::TcpListener> {
let listener = tokio::net::TcpListener::bind(addr)
.await
.context(StartListeningGrpcSnafu { addr })?;
match listener.local_addr() {
Ok(local_addr) => info!(%local_addr, "bound gRPC listener"),
Err(_) => info!(%addr, "bound gRPC listener"),
}
Ok(listener)
}
pub async fn http_listener(addr: SocketAddr) -> Result<AddrIncoming> {
let listener = AddrIncoming::bind(&addr).context(StartListeningHttpSnafu { addr })?;
info!(bind_addr=%listener.local_addr(), "bound HTTP listener");
Ok(listener)
}
/// Instantiates the gRPC and optional HTTP listeners and returns a
/// Future that completes when these listeners, the Server, Databases,
/// etc... have all exited or the frontend_shutdown token is called.
async fn serve(
common_state: CommonServerState,
frontend_shutdown: CancellationToken,
grpc_listener: tokio::net::TcpListener,
http_listener: Option<AddrIncoming>,
server_type: Arc<dyn ServerType>,
) -> Result<()> {
let trace_header_parser = TraceHeaderParser::new()
.with_jaeger_trace_context_header_name(
&common_state
.run_config()
.tracing_config()
.traces_jaeger_trace_context_header_name,
)
.with_jaeger_debug_name(
&common_state
.run_config()
.tracing_config()
.traces_jaeger_debug_name,
);
// Construct and start up gRPC server
let grpc_server = rpc::serve(
grpc_listener,
Arc::clone(&server_type),
trace_header_parser.clone(),
frontend_shutdown.clone(),
)
.fuse();
info!("gRPC server listening");
let captured_server_type = Arc::clone(&server_type);
let captured_shutdown = frontend_shutdown.clone();
let http_server = async move {
if let Some(http_listener) = http_listener {
http::serve(
http_listener,
captured_server_type,
captured_shutdown,
trace_header_parser,
)
.await?
} else {
// don't resolve otherwise will cause server to shutdown
captured_shutdown.cancelled().await
}
Ok(())
}
.fuse();
info!("HTTP server listening");
// Purposefully use log not tokio-tracing to ensure correctly hooked up
log::info!("InfluxDB IOx server ready");
// Get IOx background worker join handle
let server_handle = Arc::clone(&server_type).join().fuse();
// Shutdown signal
let signal = wait_for_signal().fuse();
// There are two different select macros - tokio::select and futures::select
//
// tokio::select takes ownership of the passed future "moving" it into the
// select block. This works well when not running select inside a loop, or
// when using a future that can be dropped and recreated, often the case
// with tokio's futures e.g. `channel.recv()`
//
// futures::select is more flexible as it doesn't take ownership of the provided
// future. However, to safely provide this it imposes some additional
// requirements
//
// All passed futures must implement FusedFuture - it is IB to poll a future
// that has returned Poll::Ready(_). A FusedFuture has an is_terminated()
// method that indicates if it is safe to poll - e.g. false if it has
// returned Poll::Ready(_). futures::select uses this to implement its
// functionality. futures::FutureExt adds a fuse() method that
// wraps an arbitrary future and makes it a FusedFuture
//
// The additional requirement of futures::select is that if the future passed
// outlives the select block, it must be Unpin or already Pinned
// pin_mut constructs a Pin<&mut T> from a T by preventing moving the T
// from the current stack frame and constructing a Pin<&mut T> to it
pin_mut!(signal);
pin_mut!(server_handle);
pin_mut!(grpc_server);
pin_mut!(http_server);
// Return the first error encountered
let mut res = Ok(());
// Graceful shutdown can be triggered by sending SIGINT or SIGTERM to the
// process, or by a background task exiting - most likely with an error
//
// Graceful shutdown should then proceed in the following order
// 1. Stop accepting new HTTP and gRPC requests and drain existing connections
// 2. Trigger shutdown of internal background workers loops
//
// This is important to ensure background tasks, such as polling the tracker
// registry, don't exit before HTTP and gRPC requests dependent on them
while !grpc_server.is_terminated() && !http_server.is_terminated() {
futures::select! {
_ = signal => info!("Shutdown requested"),
_ = server_handle => {
error!("server worker shutdown prematurely");
res = res.and(Err(Error::LostServer));
},
result = grpc_server => match result {
Ok(_) if frontend_shutdown.is_cancelled() => info!("gRPC server shutdown"),
Ok(_) => {
error!("Early gRPC server exit");
res = res.and(Err(Error::LostRpc));
}
Err(error) => {
error!(%error, "gRPC server error");
res = res.and(Err(Error::ServingRpc{source: error}));
}
},
result = http_server => match result {
Ok(_) if frontend_shutdown.is_cancelled() => info!("HTTP server shutdown"),
Ok(_) => {
error!("Early HTTP server exit");
res = res.and(Err(Error::LostHttp));
}
Err(error) => {
error!(%error, "HTTP server error");
res = res.and(Err(Error::ServingHttp{source: error}));
}
},
}
frontend_shutdown.cancel()
}
info!("frontend shutdown completed");
server_type.shutdown();
if !server_handle.is_terminated() {
server_handle.await;
}
info!("backend shutdown completed");
res
}

View File

@ -17,12 +17,15 @@ use query::exec::Executor;
use time::TimeProvider;
use trace::TraceCollector;
use crate::{
http::error::{HttpApiError, HttpApiErrorCode, HttpApiErrorSource},
rpc::{add_service, serve_builder, setup_builder, RpcBuilderInput},
server_type::{common_state::CommonServerState, RpcError, ServerType},
};
use clap_blocks::compactor::CompactorConfig;
use ioxd_common::{
add_service,
http::error::{HttpApiError, HttpApiErrorCode, HttpApiErrorSource},
rpc::RpcBuilderInput,
serve_builder,
server_type::{CommonServerState, RpcError, ServerType},
setup_builder,
};
use thiserror::Error;
#[derive(Debug, Error)]

View File

@ -20,12 +20,15 @@ use object_store::DynObjectStore;
use query::exec::Executor;
use trace::TraceCollector;
use crate::{
http::error::{HttpApiError, HttpApiErrorCode, HttpApiErrorSource},
rpc::{add_service, serve_builder, setup_builder, RpcBuilderInput},
server_type::{common_state::CommonServerState, RpcError, ServerType},
};
use ingester::handler::IngestHandler;
use ioxd_common::{
add_service,
http::error::{HttpApiError, HttpApiErrorCode, HttpApiErrorSource},
rpc::RpcBuilderInput,
serve_builder,
server_type::{CommonServerState, RpcError, ServerType},
setup_builder,
};
use thiserror::Error;
use time::SystemProvider;

View File

@ -1,69 +1,5 @@
use std::sync::Arc;
use async_trait::async_trait;
use hyper::{Body, Request, Response};
use metric::Registry;
use snafu::Snafu;
use trace::TraceCollector;
use crate::{http::error::HttpApiErrorSource, rpc::RpcBuilderInput};
pub mod common_state;
pub mod compactor;
pub mod database;
pub mod ingester;
pub mod querier;
pub mod router;
pub mod router2;
pub mod test;
#[derive(Debug, Snafu)]
pub enum RpcError {
#[snafu(display("gRPC transport error: {}{}", source, details))]
TransportError {
source: tonic::transport::Error,
details: String,
},
}
// Custom impl to include underlying source (not included in tonic
// transport error)
impl From<tonic::transport::Error> for RpcError {
fn from(source: tonic::transport::Error) -> Self {
use std::error::Error;
let details = source
.source()
.map(|e| format!(" ({})", e))
.unwrap_or_else(|| "".to_string());
Self::TransportError { source, details }
}
}
#[async_trait]
pub trait ServerType: std::fmt::Debug + Send + Sync + 'static {
/// Metric registry associated with the server.
fn metric_registry(&self) -> Arc<Registry>;
/// Trace collector associated with the server, if any.
fn trace_collector(&self) -> Option<Arc<dyn TraceCollector>>;
/// Route given HTTP request.
///
/// Note that this is only called if none of the shared, common routes (e.g. `/health`) match.
async fn route_http_request(
&self,
req: Request<Body>,
) -> Result<Response<Body>, Box<dyn HttpApiErrorSource>>;
/// Construct and serve gRPC subsystem.
async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError>;
/// Join shutdown worker.
///
/// This MUST NOT exit before `shutdown` is called, otherwise the server is deemed to be dead and the process will exit.
async fn join(self: Arc<Self>);
/// Shutdown background worker.
fn shutdown(&self);
}

View File

@ -13,10 +13,13 @@ use query::exec::Executor;
use time::TimeProvider;
use trace::TraceCollector;
use crate::{
use ioxd_common::{
add_service,
http::error::{HttpApiError, HttpApiErrorCode, HttpApiErrorSource},
rpc::{add_service, serve_builder, setup_builder, RpcBuilderInput},
server_type::{common_state::CommonServerState, RpcError, ServerType},
rpc::RpcBuilderInput,
serve_builder,
server_type::{CommonServerState, RpcError, ServerType},
setup_builder,
};
mod rpc;

View File

@ -28,10 +28,13 @@ use trace::TraceCollector;
use thiserror::Error;
use crate::{
use ioxd_common::{
add_service,
http::error::{HttpApiError, HttpApiErrorSource},
rpc::{add_service, serve_builder, setup_builder, RpcBuilderInput},
server_type::{common_state::CommonServerState, RpcError, ServerType},
rpc::RpcBuilderInput,
serve_builder,
server_type::{CommonServerState, RpcError, ServerType},
setup_builder,
};
#[derive(Debug, Error)]

View File

@ -1,17 +1,18 @@
use std::sync::Arc;
use crate::{
http::error::{HttpApiError, HttpApiErrorExt, HttpApiErrorSource},
rpc::{serve_builder, setup_builder, RpcBuilderInput},
};
use async_trait::async_trait;
use hyper::{Body, Method, Request, Response};
use ioxd_common::{
http::error::{HttpApiError, HttpApiErrorExt, HttpApiErrorSource},
rpc::RpcBuilderInput,
serve_builder, setup_builder,
};
use metric::Registry;
use snafu::Snafu;
use tokio_util::sync::CancellationToken;
use trace::TraceCollector;
use super::{RpcError, ServerType};
use ioxd_common::server_type::{RpcError, ServerType};
#[derive(Debug, Snafu)]
pub enum ApplicationError {

51
ioxd_common/Cargo.toml Normal file
View File

@ -0,0 +1,51 @@
[package]
name = "ioxd_common"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Workspace dependencies, in alphabetical order
clap_blocks = { path = "../clap_blocks" }
data_types = { path = "../data_types" }
dml = { path = "../dml" }
metric = { path = "../metric" }
observability_deps = { path = "../observability_deps" }
predicate = { path = "../predicate" }
metric_exporters = { path = "../metric_exporters" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
trace = { path = "../trace" }
trace_exporters = { path = "../trace_exporters" }
trace_http = { path = "../trace_http" }
# Crates.io dependencies, in alphabetical order
async-trait = "0.1"
bytes = "1.0"
clap = { version = "3", features = ["derive", "env"] }
chrono = { version = "0.4", default-features = false }
flate2 = "1.0"
futures = "0.3"
hashbrown = "0.12"
http = "0.2.0"
hyper = "0.14"
log = "0.4"
parking_lot = "0.12"
prost = "0.9"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.79"
serde_urlencoded = "0.7.0"
snafu = "0.7"
tokio = { version = "1.17", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-util = { version = "0.7.0" }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.6"
tonic-health = "0.5.0"
tower = "0.4"
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
# Workspace dependencies, in alphabetical order
# Crates.io dependencies, in alphabetical order

View File

@ -410,7 +410,6 @@ struct WriteInfo {
precision: Precision,
}
#[cfg(test)]
pub mod test_utils {
use http::{header::CONTENT_ENCODING, StatusCode};
use reqwest::Client;

View File

@ -28,7 +28,6 @@ pub mod error;
pub mod metrics;
pub mod utils;
#[cfg(test)]
pub mod test_utils;
#[allow(clippy::large_enum_variant)]

232
ioxd_common/src/lib.rs Normal file
View File

@ -0,0 +1,232 @@
pub mod http;
pub mod rpc;
pub mod server_type;
use crate::server_type::{CommonServerState, ServerType};
use futures::{future::FusedFuture, pin_mut, FutureExt};
use hyper::server::conn::AddrIncoming;
use observability_deps::tracing::{error, info};
use snafu::{ResultExt, Snafu};
use std::{net::SocketAddr, sync::Arc};
use tokio_util::sync::CancellationToken;
use trace_http::ctx::TraceHeaderParser;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Unable to bind to listen for HTTP requests on {}: {}", addr, source))]
StartListeningHttp {
addr: SocketAddr,
source: hyper::Error,
},
#[snafu(display("Unable to bind to listen for gRPC requests on {}: {}", addr, source))]
StartListeningGrpc {
addr: SocketAddr,
source: std::io::Error,
},
#[snafu(display("Error serving HTTP: {}", source))]
ServingHttp { source: hyper::Error },
#[snafu(display("Error serving RPC: {}", source))]
ServingRpc { source: server_type::RpcError },
#[snafu(display("Early Http shutdown"))]
LostHttp,
#[snafu(display("Early RPC shutdown"))]
LostRpc,
#[snafu(display("Early server shutdown"))]
LostServer,
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// On unix platforms we want to intercept SIGINT and SIGTERM
/// This method returns if either are signalled
#[cfg(unix)]
pub async fn wait_for_signal() {
use tokio::signal::unix::{signal, SignalKind};
let mut term = signal(SignalKind::terminate()).expect("failed to register signal handler");
let mut int = signal(SignalKind::interrupt()).expect("failed to register signal handler");
tokio::select! {
_ = term.recv() => info!("Received SIGTERM"),
_ = int.recv() => info!("Received SIGINT"),
}
}
#[cfg(windows)]
/// ctrl_c is the cross-platform way to intercept the equivalent of SIGINT
/// This method returns if this occurs
pub async fn wait_for_signal() {
let _ = tokio::signal::ctrl_c().await;
}
pub async fn grpc_listener(addr: SocketAddr) -> Result<tokio::net::TcpListener> {
let listener = tokio::net::TcpListener::bind(addr)
.await
.context(StartListeningGrpcSnafu { addr })?;
match listener.local_addr() {
Ok(local_addr) => info!(%local_addr, "bound gRPC listener"),
Err(_) => info!(%addr, "bound gRPC listener"),
}
Ok(listener)
}
pub async fn http_listener(addr: SocketAddr) -> Result<AddrIncoming> {
let listener = AddrIncoming::bind(&addr).context(StartListeningHttpSnafu { addr })?;
info!(bind_addr=%listener.local_addr(), "bound HTTP listener");
Ok(listener)
}
/// Instantiates the gRPC and optional HTTP listeners and returns a
/// Future that completes when these listeners, the Server, Databases,
/// etc... have all exited or the frontend_shutdown token is called.
pub async fn serve(
common_state: CommonServerState,
frontend_shutdown: CancellationToken,
grpc_listener: tokio::net::TcpListener,
http_listener: Option<AddrIncoming>,
server_type: Arc<dyn ServerType>,
) -> Result<()> {
let trace_header_parser = TraceHeaderParser::new()
.with_jaeger_trace_context_header_name(
&common_state
.run_config()
.tracing_config()
.traces_jaeger_trace_context_header_name,
)
.with_jaeger_debug_name(
&common_state
.run_config()
.tracing_config()
.traces_jaeger_debug_name,
);
// Construct and start up gRPC server
let grpc_server = rpc::serve(
grpc_listener,
Arc::clone(&server_type),
trace_header_parser.clone(),
frontend_shutdown.clone(),
)
.fuse();
info!("gRPC server listening");
let captured_server_type = Arc::clone(&server_type);
let captured_shutdown = frontend_shutdown.clone();
let http_server = async move {
if let Some(http_listener) = http_listener {
http::serve(
http_listener,
captured_server_type,
captured_shutdown,
trace_header_parser,
)
.await?
} else {
// don't resolve otherwise will cause server to shutdown
captured_shutdown.cancelled().await
}
Ok(())
}
.fuse();
info!("HTTP server listening");
// Purposefully use log not tokio-tracing to ensure correctly hooked up
log::info!("InfluxDB IOx server ready");
// Get IOx background worker join handle
let server_handle = Arc::clone(&server_type).join().fuse();
// Shutdown signal
let signal = wait_for_signal().fuse();
// There are two different select macros - tokio::select and futures::select
//
// tokio::select takes ownership of the passed future "moving" it into the
// select block. This works well when not running select inside a loop, or
// when using a future that can be dropped and recreated, often the case
// with tokio's futures e.g. `channel.recv()`
//
// futures::select is more flexible as it doesn't take ownership of the provided
// future. However, to safely provide this it imposes some additional
// requirements
//
// All passed futures must implement FusedFuture - it is IB to poll a future
// that has returned Poll::Ready(_). A FusedFuture has an is_terminated()
// method that indicates if it is safe to poll - e.g. false if it has
// returned Poll::Ready(_). futures::select uses this to implement its
// functionality. futures::FutureExt adds a fuse() method that
// wraps an arbitrary future and makes it a FusedFuture
//
// The additional requirement of futures::select is that if the future passed
// outlives the select block, it must be Unpin or already Pinned
// pin_mut constructs a Pin<&mut T> from a T by preventing moving the T
// from the current stack frame and constructing a Pin<&mut T> to it
pin_mut!(signal);
pin_mut!(server_handle);
pin_mut!(grpc_server);
pin_mut!(http_server);
// Return the first error encountered
let mut res = Ok(());
// Graceful shutdown can be triggered by sending SIGINT or SIGTERM to the
// process, or by a background task exiting - most likely with an error
//
// Graceful shutdown should then proceed in the following order
// 1. Stop accepting new HTTP and gRPC requests and drain existing connections
// 2. Trigger shutdown of internal background workers loops
//
// This is important to ensure background tasks, such as polling the tracker
// registry, don't exit before HTTP and gRPC requests dependent on them
while !grpc_server.is_terminated() && !http_server.is_terminated() {
futures::select! {
_ = signal => info!("Shutdown requested"),
_ = server_handle => {
error!("server worker shutdown prematurely");
res = res.and(Err(Error::LostServer));
},
result = grpc_server => match result {
Ok(_) if frontend_shutdown.is_cancelled() => info!("gRPC server shutdown"),
Ok(_) => {
error!("Early gRPC server exit");
res = res.and(Err(Error::LostRpc));
}
Err(error) => {
error!(%error, "gRPC server error");
res = res.and(Err(Error::ServingRpc{source: error}));
}
},
result = http_server => match result {
Ok(_) if frontend_shutdown.is_cancelled() => info!("HTTP server shutdown"),
Ok(_) => {
error!("Early HTTP server exit");
res = res.and(Err(Error::LostHttp));
}
Err(error) => {
error!(%error, "HTTP server error");
res = res.and(Err(Error::ServingHttp{source: error}));
}
},
}
frontend_shutdown.cancel()
}
info!("frontend shutdown completed");
server_type.shutdown();
if !server_handle.is_terminated() {
server_handle.await;
}
info!("backend shutdown completed");
res
}

View File

@ -30,6 +30,7 @@ pub struct RpcBuilder<T> {
/// Adds a gRPC service to the builder, and registers it with the
/// health reporter
#[macro_export]
macro_rules! add_service {
($builder:ident, $svc:expr) => {
let $builder = {
@ -66,19 +67,15 @@ macro_rules! add_service {
};
}
pub(crate) use add_service;
/// Creates a [`RpcBuilder`] from [`RpcBuilderInput`].
///
/// The resulting builder can be used w/ [`add_service`]. After adding all services it should
/// be used w/ [`serve_builder`].
#[macro_export]
macro_rules! setup_builder {
($input:ident, $server_type:ident) => {{
#[allow(unused_imports)]
use $crate::{
rpc::{add_service, RpcBuilder},
server_type::ServerType,
};
use ioxd_common::{add_service, rpc::RpcBuilder, server_type::ServerType};
let RpcBuilderInput {
socket,
@ -115,9 +112,8 @@ macro_rules! setup_builder {
}};
}
pub(crate) use setup_builder;
/// Serve a server constructed using [`RpcBuilder`].
#[macro_export]
macro_rules! serve_builder {
($builder:ident) => {{
use tokio_stream::wrappers::TcpListenerStream;
@ -137,8 +133,6 @@ macro_rules! serve_builder {
}};
}
pub(crate) use serve_builder;
/// Instantiate a server listening on the specified address
/// implementing the IOx, Storage, and Flight gRPC interfaces, the
/// underlying hyper server instance. Resolves when the server has

View File

@ -0,0 +1,64 @@
mod common_state;
use std::sync::Arc;
use async_trait::async_trait;
use hyper::{Body, Request, Response};
use metric::Registry;
use snafu::Snafu;
use trace::TraceCollector;
pub use common_state::{CommonServerState, CommonServerStateError};
use crate::{http::error::HttpApiErrorSource, rpc::RpcBuilderInput};
#[derive(Debug, Snafu)]
pub enum RpcError {
#[snafu(display("gRPC transport error: {}{}", source, details))]
TransportError {
source: tonic::transport::Error,
details: String,
},
}
// Custom impl to include underlying source (not included in tonic
// transport error)
impl From<tonic::transport::Error> for RpcError {
fn from(source: tonic::transport::Error) -> Self {
use std::error::Error;
let details = source
.source()
.map(|e| format!(" ({})", e))
.unwrap_or_else(|| "".to_string());
Self::TransportError { source, details }
}
}
#[async_trait]
pub trait ServerType: std::fmt::Debug + Send + Sync + 'static {
/// Metric registry associated with the server.
fn metric_registry(&self) -> Arc<Registry>;
/// Trace collector associated with the server, if any.
fn trace_collector(&self) -> Option<Arc<dyn TraceCollector>>;
/// Route given HTTP request.
///
/// Note that this is only called if none of the shared, common routes (e.g. `/health`) match.
async fn route_http_request(
&self,
req: Request<Body>,
) -> Result<Response<Body>, Box<dyn HttpApiErrorSource>>;
/// Construct and serve gRPC subsystem.
async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError>;
/// Join shutdown worker.
///
/// This MUST NOT exit before `shutdown` is called, otherwise the server is deemed to be dead and the process will exit.
async fn join(self: Arc<Self>);
/// Shutdown background worker.
fn shutdown(&self);
}

View File

@ -28,7 +28,6 @@ impl CommonServerState {
})
}
#[cfg(test)]
pub fn for_testing() -> Self {
use clap::Parser;

65
ioxd_database/Cargo.toml Normal file
View File

@ -0,0 +1,65 @@
[package]
name = "ioxd_database"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Workspace dependencies, in alphabetical order
clap_blocks = { path = "../clap_blocks" }
data_types = { path = "../data_types" }
db = { path = "../db" }
dml = { path = "../dml" }
generated_types = { path = "../generated_types" }
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format", "write_lp"] }
ioxd_common = { path = "../ioxd_common" }
job_registry = { path = "../job_registry" }
metric = { path = "../metric" }
mutable_batch_pb = { path = "../mutable_batch_pb" }
object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" }
query = { path = "../query" }
server = { path = "../server" }
service_common = { path = "../service_common" }
service_grpc_flight = { path = "../service_grpc_flight" }
service_grpc_influxrpc = { path = "../service_grpc_influxrpc" }
service_grpc_testing = { path = "../service_grpc_testing" }
trace = { path = "../trace" }
trace_http = { path = "../trace_http" }
tracker = { path = "../tracker" }
uuid = { version = "0.8", features = ["v4"] }
# Crates.io dependencies, in alphabetical order
arrow-flight = "11"
async-trait = "0.1"
bytes = "1.0"
futures = "0.3"
http = "0.2.0"
hyper = "0.14"
prost = "0.9"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.79"
serde_urlencoded = "0.7.0"
snafu = "0.7"
tokio = { version = "1.17", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7.0" }
tonic = "0.6"
tonic-health = "0.5.0"
tonic-reflection = "0.3.0"
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
# Workspace dependencies, in alphabetical order
arrow_util = { path = "../arrow_util" }
influxdb_storage_client = { path = "../influxdb_storage_client" }
test_helpers = { path = "../test_helpers" }
trace_exporters = { path = "../trace_exporters" }
schema = { path = "../schema" }
# Crates.io dependencies, in alphabetical order
arrow = "11"
clap = { version = "3", features = ["derive", "env"] }
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }

View File

@ -24,12 +24,12 @@ use observability_deps::tracing::{debug, error};
use serde::Deserialize;
use snafu::{OptionExt, ResultExt, Snafu};
use crate::http::{
use dml::DmlOperation;
use ioxd_common::http::{
dml::{HttpDrivenDml, InnerDmlError, RequestOrResponse},
error::{HttpApiError, HttpApiErrorExt, HttpApiErrorSource},
metrics::LineProtocolMetrics,
};
use dml::DmlOperation;
use service_common::planner::Planner;
use std::{
fmt::Debug,
@ -113,7 +113,7 @@ pub enum ApplicationError {
#[snafu(display("Cannot perform DML operation: {}", source))]
DmlError {
source: crate::http::dml::HttpDmlError,
source: ioxd_common::http::dml::HttpDmlError,
},
}
@ -282,27 +282,24 @@ async fn query(
#[cfg(test)]
mod tests {
use super::*;
use crate::http::dml::test_utils::assert_write_precision;
use crate::{
http::{
dml::test_utils::{
assert_delete_bad_request, assert_delete_unknown_database,
assert_delete_unknown_table, assert_gzip_write, assert_write, assert_write_metrics,
assert_write_to_invalid_database,
},
test_utils::{
assert_health, assert_metrics, assert_tracing, check_response, get_content_type,
TestServer,
},
},
server_type::common_state::CommonServerState,
};
use arrow::record_batch::RecordBatch;
use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName};
use db::Db;
use dml::DmlWrite;
use http::StatusCode;
use ioxd_common::http::{
dml::test_utils::{
assert_delete_bad_request, assert_delete_unknown_database, assert_delete_unknown_table,
assert_gzip_write, assert_write, assert_write_metrics, assert_write_precision,
assert_write_to_invalid_database,
},
test_utils::{
assert_health, assert_metrics, assert_tracing, check_response, get_content_type,
TestServer,
},
};
use ioxd_common::server_type::CommonServerState;
use object_store::ObjectStoreImpl;
use reqwest::Client;
use schema::selection::Selection;

View File

@ -1,11 +1,11 @@
use crate::{
http::{error::HttpApiErrorSource, metrics::LineProtocolMetrics},
rpc::RpcBuilderInput,
server_type::{RpcError, ServerType},
};
use async_trait::async_trait;
use futures::{future::FusedFuture, FutureExt};
use hyper::{Body, Request, Response};
use ioxd_common::{
http::{error::HttpApiErrorSource, metrics::LineProtocolMetrics},
rpc::RpcBuilderInput,
server_type::{CommonServerState, RpcError, ServerType},
};
use metric::Registry;
use observability_deps::tracing::{error, info};
use server::{ApplicationState, Server};
@ -19,7 +19,6 @@ mod rpc;
pub mod setup;
pub use self::http::ApplicationError;
use super::common_state::CommonServerState;
#[derive(Debug)]
pub struct DatabaseServerType {
@ -109,10 +108,9 @@ impl ServerType for DatabaseServerType {
mod tests {
use clap_blocks::run_config::RunConfig;
use crate::{
grpc_listener, http_listener, serve,
server_type::database::setup::{make_application, make_server},
};
use ioxd_common::{grpc_listener, http_listener, serve};
use crate::setup::{make_application, make_server};
use super::*;
use ::http::{header::HeaderName, HeaderValue};
@ -204,7 +202,7 @@ mod tests {
run_config: RunConfig,
application: Arc<ApplicationState>,
server: Arc<Server>,
) -> Result<(), crate::Error> {
) -> Result<(), ioxd_common::Error> {
let grpc_listener = grpc_listener(run_config.grpc_bind_address.into())
.await
.unwrap();
@ -393,7 +391,7 @@ mod tests {
async fn tracing_server<T: TraceCollector + 'static>(
collector: &Arc<T>,
) -> (SocketAddr, Arc<Server>, JoinHandle<crate::Result<()>>) {
) -> (SocketAddr, Arc<Server>, JoinHandle<ioxd_common::Result<()>>) {
// Create a server and wait for it to initialize
let (application, server, run_config) = TestServerBuilder::new()
.with_server_id(Some(23))
@ -503,7 +501,7 @@ mod tests {
// shutdown server early
server.shutdown();
let res = join.await.unwrap();
assert_error!(res, crate::Error::LostServer);
assert_error!(res, ioxd_common::Error::LostServer);
}
/// Ensure that query is fully executed.
@ -574,7 +572,7 @@ mod tests {
// early shutdown
server.shutdown();
let res = join.await.unwrap();
assert_error!(res, crate::Error::LostServer);
assert_error!(res, ioxd_common::Error::LostServer);
// Check generated traces
@ -645,7 +643,7 @@ mod tests {
// early shutdown
server.shutdown();
let res = join.await.unwrap();
assert_error!(res, crate::Error::LostServer);
assert_error!(res, ioxd_common::Error::LostServer);
let span = receiver.recv().await.unwrap();
assert_eq!(span.ctx.trace_id.get(), 0x34f8495);

View File

@ -1,8 +1,8 @@
use std::sync::Arc;
use crate::{
rpc::{add_service, serve_builder, setup_builder, RpcBuilderInput},
server_type::{database::DatabaseServerType, RpcError},
use crate::DatabaseServerType;
use ioxd_common::{
add_service, rpc::RpcBuilderInput, serve_builder, server_type::RpcError, setup_builder,
};
mod delete;

View File

@ -8,7 +8,7 @@ use server::{ApplicationState, Server, ServerConfig};
use snafu::{ResultExt, Snafu};
use trace::TraceCollector;
use crate::server_type::database::config::ServerConfigFile;
use crate::config::ServerConfigFile;
use clap_blocks::object_store::{check_object_store, warn_about_inmem_store};
#[derive(Debug, Snafu)]

42
ioxd_router/Cargo.toml Normal file
View File

@ -0,0 +1,42 @@
[package]
name = "ioxd_router"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Workspace dependencies, in alphabetical order
data_types = { path = "../data_types" }
dml = { path = "../dml" }
generated_types = { path = "../generated_types" }
ioxd_common = { path = "../ioxd_common" }
metric = { path = "../metric" }
mutable_batch_pb = { path = "../mutable_batch_pb" }
router = { path = "../router" }
service_grpc_testing = { path = "../service_grpc_testing" }
time = { path = "../time" }
trace = { path = "../trace" }
trace_http = { path = "../trace_http" }
# Crates.io dependencies, in alphabetical order
async-trait = "0.1"
http = "0.2.0"
hyper = "0.14"
snafu = "0.7"
tonic = "0.6"
tonic-health = "0.5.0"
tonic-reflection = "0.3.0"
tokio = { version = "1.17", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7.0" }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
# Workspace dependencies, in alphabetical order
# Crates.io dependencies, in alphabetical order
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
regex = "1"

View File

@ -6,7 +6,7 @@ use dml::DmlOperation;
use hyper::{Body, Method, Request, Response};
use snafu::{ResultExt, Snafu};
use crate::http::{
use ioxd_common::http::{
dml::{HttpDrivenDml, InnerDmlError, RequestOrResponse},
error::{HttpApiError, HttpApiErrorExt, HttpApiErrorSource},
metrics::LineProtocolMetrics,
@ -21,7 +21,7 @@ pub enum ApplicationError {
#[snafu(display("Cannot write data: {}", source))]
WriteError {
source: crate::http::dml::HttpDmlError,
source: ioxd_common::http::dml::HttpDmlError,
},
}
@ -98,7 +98,7 @@ mod tests {
use time::SystemProvider;
use trace::RingBufferTraceCollector;
use crate::{
use ioxd_common::{
http::{
dml::test_utils::{
assert_delete_bad_request, assert_delete_unknown_database, assert_gzip_write,
@ -109,7 +109,7 @@ mod tests {
assert_health, assert_metrics, assert_tracing, check_response, TestServer,
},
},
server_type::common_state::CommonServerState,
server_type::CommonServerState,
};
use super::*;

View File

@ -7,10 +7,10 @@ use router::server::RouterServer;
use tokio_util::sync::CancellationToken;
use trace::TraceCollector;
use crate::{
use ioxd_common::{
http::{error::HttpApiErrorSource, metrics::LineProtocolMetrics},
rpc::RpcBuilderInput,
server_type::{common_state::CommonServerState, RpcError, ServerType},
server_type::{CommonServerState, RpcError, ServerType},
};
mod http;

View File

@ -1,8 +1,7 @@
use std::sync::Arc;
use crate::{
rpc::{add_service, serve_builder, setup_builder, RpcBuilderInput},
server_type::RpcError,
use ioxd_common::{
add_service, rpc::RpcBuilderInput, serve_builder, server_type::RpcError, setup_builder,
};
use super::RouterServerType;