Merge pull request #6303 from influxdata/cn/ingester2

feat: Add an ingester2 command behind the rpc_write feature flag
kodiakhq[bot] 2022-12-02 13:46:30 +00:00 committed by GitHub
commit 0e21f3dcfe
14 changed files with 439 additions and 97 deletions

Cargo.lock

@@ -2292,6 +2292,7 @@ dependencies = [
"ioxd_compactor",
"ioxd_garbage_collector",
"ioxd_ingester",
"ioxd_ingester2",
"ioxd_querier",
"ioxd_router",
"ioxd_test",
@@ -2782,6 +2783,24 @@ dependencies = [
"write_buffer",
]
[[package]]
name = "ioxd_ingester2"
version = "0.1.0"
dependencies = [
"async-trait",
"clap_blocks",
"hyper",
"ingester2",
"iox_catalog",
"ioxd_common",
"metric",
"thiserror",
"tokio",
"tokio-util",
"trace",
"workspace-hack",
]
[[package]]
name = "ioxd_querier"
version = "0.1.0"


@@ -35,6 +35,7 @@ members = [
"ioxd_common",
"ioxd_compactor",
"ioxd_ingester",
"ioxd_ingester2",
"ioxd_garbage_collector",
"ioxd_querier",
"ioxd_router",


@@ -0,0 +1,32 @@
//! CLI config for the ingester using the RPC write path
use std::path::PathBuf;
/// CLI config for the ingester using the RPC write path
#[derive(Debug, Clone, clap::Parser)]
#[allow(missing_copy_implementations)]
pub struct Ingester2Config {
/// Where this ingester instance should store its write-ahead log files. Each ingester instance
/// must have its own directory.
#[clap(long = "wal-directory", env = "INFLUXDB_IOX_WAL_DIRECTORY", action)]
pub wal_directory: PathBuf,
/// The number of seconds between WAL file rotations.
#[clap(
long = "wal-rotation-period-seconds",
env = "INFLUXDB_IOX_WAL_ROTATION_PERIOD_SECONDS",
default_value = "300",
action
)]
pub wal_rotation_period_seconds: u64,
/// Sets how many queries the ingester will handle simultaneously before
/// rejecting further incoming requests.
#[clap(
long = "concurrent-query-limit",
env = "INFLUXDB_IOX_CONCURRENT_QUERY_LIMIT",
default_value = "20",
action
)]
pub concurrent_query_limit: usize,
}
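
For reference, a minimal sketch of how these flags might be exercised with clap (assuming the `derive` and `env` clap features the workspace already uses); the struct below is a reduced, hypothetical stand-in for `Ingester2Config`, not the real type:

```rust
use std::{path::PathBuf, time::Duration};

use clap::Parser;

// Reduced stand-in for clap_blocks::ingester2::Ingester2Config.
#[derive(Debug, Parser)]
struct Ingester2Config {
    #[clap(long = "wal-directory", env = "INFLUXDB_IOX_WAL_DIRECTORY")]
    wal_directory: PathBuf,

    #[clap(
        long = "wal-rotation-period-seconds",
        env = "INFLUXDB_IOX_WAL_ROTATION_PERIOD_SECONDS",
        default_value = "300"
    )]
    wal_rotation_period_seconds: u64,

    #[clap(
        long = "concurrent-query-limit",
        env = "INFLUXDB_IOX_CONCURRENT_QUERY_LIMIT",
        default_value = "20"
    )]
    concurrent_query_limit: usize,
}

fn main() {
    // Only the WAL directory is required; the other flags fall back to their
    // defaults (assuming the corresponding env vars are unset).
    let config =
        Ingester2Config::parse_from(["ingester2", "--wal-directory", "/var/lib/iox/wal"]);

    // The rotation period is declared in seconds and converted to a Duration
    // where the ingester is constructed (see ioxd_ingester2 further down).
    assert_eq!(
        Duration::from_secs(config.wal_rotation_period_seconds),
        Duration::from_secs(300)
    );
    assert_eq!(config.concurrent_query_limit, 20);
}
```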


@@ -15,9 +15,11 @@
pub mod catalog_dsn;
pub mod compactor;
pub mod ingester;
pub mod ingester2;
pub mod object_store;
pub mod querier;
pub mod router;
pub mod router_rpc_write;
pub mod run_config;
pub mod socket_addr;
pub mod write_buffer;


@@ -0,0 +1,70 @@
//! CLI config for the router using the RPC write path
/// CLI config for the router using the RPC write path
#[derive(Debug, Clone, clap::Parser)]
#[allow(missing_copy_implementations)]
pub struct RouterRpcWriteConfig {
/// The maximum number of simultaneous requests the HTTP server is
/// configured to accept.
///
/// This number of requests, multiplied by the maximum request body size the
/// HTTP server is configured with gives the rough amount of memory a HTTP
/// server will use to buffer request bodies in memory.
///
/// A default maximum of 200 requests, multiplied by the default 10MiB
/// maximum for HTTP request bodies == ~2GiB.
#[clap(
long = "max-http-requests",
env = "INFLUXDB_IOX_MAX_HTTP_REQUESTS",
default_value = "200",
action
)]
pub http_request_limit: usize,
/// gRPC address for the router to talk with the ingesters. For
/// example:
///
/// "http://127.0.0.1:8083"
///
/// or
///
/// "http://10.10.10.1:8083,http://10.10.10.2:8083"
///
/// for multiple addresses.
#[clap(
long = "ingester-addresses",
env = "INFLUXDB_IOX_INGESTER_ADDRESSES",
required = true
)]
pub ingester_addresses: Vec<String>,
/// Write buffer topic/database that should be used.
// This isn't really relevant to the RPC write path and will be removed eventually.
#[clap(
long = "write-buffer-topic",
env = "INFLUXDB_IOX_WRITE_BUFFER_TOPIC",
default_value = "iox-shared",
action
)]
pub topic: String,
/// Query pool name to dispatch writes to.
// This isn't really relevant to the RPC write path and will be removed eventually.
#[clap(
long = "query-pool",
env = "INFLUXDB_IOX_QUERY_POOL_NAME",
default_value = "iox-shared",
action
)]
pub query_pool_name: String,
/// Retention period to use when auto-creating namespaces.
/// For infinite retention, leave this unset and it will default to `None`.
/// Setting it to zero will not make it infinite.
#[clap(
long = "new-namespace-retention-hours",
env = "INFLUXDB_IOX_NEW_NAMESPACE_RETENTION_HOURS",
action
)]
pub new_namespace_retention_hours: Option<u64>,
}
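
The retention flag is worth a note: leaving it unset means no retention period is stored (infinite retention), while `0` means zero hours, not infinite. A minimal sketch of the hours-to-nanoseconds conversion the router applies when auto-creating namespaces, mirroring the `new_namespace_retention_hours.map(...)` call further down in this diff:

```rust
/// Convert the optional `--new-namespace-retention-hours` value into the
/// nanosecond retention period handed to the catalog; `None` stays `None`
/// (infinite retention).
fn retention_period_ns(new_namespace_retention_hours: Option<u64>) -> Option<i64> {
    new_namespace_retention_hours.map(|hours| hours as i64 * 60 * 60 * 1_000_000_000)
}

fn main() {
    assert_eq!(retention_period_ns(None), None); // unset => infinite retention
    assert_eq!(retention_period_ns(Some(0)), Some(0)); // zero, not infinite
    assert_eq!(retention_period_ns(Some(24)), Some(86_400_000_000_000)); // 24h in ns
}
```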


@@ -21,6 +21,7 @@ iox_catalog = { path = "../iox_catalog" }
ioxd_common = { path = "../ioxd_common"}
ioxd_compactor = { path = "../ioxd_compactor"}
ioxd_ingester = { path = "../ioxd_ingester"}
ioxd_ingester2 = { path = "../ioxd_ingester2"}
ioxd_garbage_collector = { path = "../ioxd_garbage_collector" }
ioxd_querier = { path = "../ioxd_querier"}
ioxd_router = { path = "../ioxd_router"}


@@ -0,0 +1,80 @@
//! Command line options for running an ingester for a router using the RPC write path to talk to.
use super::main;
use crate::process_info::setup_metric_registry;
use clap_blocks::{
catalog_dsn::CatalogDsnConfig, ingester2::Ingester2Config, run_config::RunConfig,
};
use ioxd_common::{
server_type::{CommonServerState, CommonServerStateError},
Service,
};
use ioxd_ingester2::create_ingester_server_type;
use observability_deps::tracing::*;
use std::sync::Arc;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
#[error("Run: {0}")]
Run(#[from] main::Error),
#[error("Invalid config: {0}")]
InvalidConfig(#[from] CommonServerStateError),
#[error("error initializing ingester2: {0}")]
Ingester(#[from] ioxd_ingester2::Error),
#[error("Catalog DSN error: {0}")]
CatalogDsn(#[from] clap_blocks::catalog_dsn::Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, clap::Parser)]
#[clap(
name = "run",
about = "Runs in ingester mode",
long_about = "Run the IOx ingester server.\n\nThe configuration options below can be \
set either with the command line flags or with the specified environment \
variable. If there is a file named '.env' in the current working directory, \
it is sourced before loading the configuration.
Configuration is loaded from the following sources (highest precedence first):
- command line arguments
- user set environment variables
- .env file contents
- pre-configured default values"
)]
pub struct Config {
#[clap(flatten)]
pub(crate) run_config: RunConfig,
#[clap(flatten)]
pub(crate) catalog_dsn: CatalogDsnConfig,
#[clap(flatten)]
pub(crate) ingester_config: Ingester2Config,
}
pub async fn command(config: Config) -> Result<()> {
let common_state = CommonServerState::from_config(config.run_config.clone())?;
let metric_registry = setup_metric_registry();
let catalog = config
.catalog_dsn
.get_catalog("ingester", Arc::clone(&metric_registry))
.await?;
let server_type = create_ingester_server_type(
&common_state,
catalog,
Arc::clone(&metric_registry),
&config.ingester_config,
)
.await?;
info!("starting ingester2");
let services = vec![Service::create(server_type, common_state.run_config())];
Ok(main::main(common_state, services, metric_registry).await?)
}


@@ -5,6 +5,8 @@ pub(crate) mod all_in_one;
mod compactor;
mod garbage_collector;
mod ingester;
#[cfg(feature = "rpc_write")]
mod ingester2;
mod main;
mod querier;
mod router;
@@ -34,6 +36,10 @@ pub enum Error {
#[snafu(display("Error in ingester subcommand: {}", source))]
IngesterError { source: ingester::Error },
#[cfg(feature = "rpc_write")]
#[snafu(display("Error in ingester2 subcommand: {}", source))]
Ingester2Error { source: ingester2::Error },
#[snafu(display("Error in all in one subcommand: {}", source))]
AllInOneError { source: all_in_one::Error },
@@ -64,6 +70,8 @@ impl Config {
#[cfg(feature = "rpc_write")]
Some(Command::RouterRpcWrite(config)) => config.run_config.logging_config(),
Some(Command::Ingester(config)) => config.run_config.logging_config(),
#[cfg(feature = "rpc_write")]
Some(Command::Ingester2(config)) => config.run_config.logging_config(),
Some(Command::AllInOne(config)) => &config.logging_config,
Some(Command::Test(config)) => config.run_config.logging_config(),
}
@@ -88,6 +96,10 @@ enum Command {
/// Run the server in ingester mode
Ingester(ingester::Config),
/// Run the server in ingester2 mode
#[cfg(feature = "rpc_write")]
Ingester2(ingester2::Config),
/// Run the server in "all in one" mode (Default)
AllInOne(all_in_one::Config),
@@ -116,6 +128,10 @@ pub async fn command(config: Config) -> Result<()> {
.await
.context(RouterRpcWriteSnafu),
Some(Command::Ingester(config)) => ingester::command(config).await.context(IngesterSnafu),
#[cfg(feature = "rpc_write")]
Some(Command::Ingester2(config)) => {
ingester2::command(config).await.context(Ingester2Snafu)
}
Some(Command::AllInOne(config)) => all_in_one::command(config).await.context(AllInOneSnafu),
Some(Command::Test(config)) => test::command(config).await.context(TestSnafu),
}


@@ -2,7 +2,8 @@
use super::main;
use crate::process_info::setup_metric_registry;
use clap_blocks::{
catalog_dsn::CatalogDsnConfig, object_store::make_object_store, run_config::RunConfig,
catalog_dsn::CatalogDsnConfig, object_store::make_object_store,
router_rpc_write::RouterRpcWriteConfig, run_config::RunConfig,
};
use iox_time::{SystemProvider, TimeProvider};
use ioxd_common::{
@@ -58,69 +59,8 @@ pub struct Config {
#[clap(flatten)]
pub(crate) catalog_dsn: CatalogDsnConfig,
/// The maximum number of simultaneous requests the HTTP server is
/// configured to accept.
///
/// This number of requests, multiplied by the maximum request body size the
/// HTTP server is configured with gives the rough amount of memory a HTTP
/// server will use to buffer request bodies in memory.
///
/// A default maximum of 200 requests, multiplied by the default 10MiB
/// maximum for HTTP request bodies == ~2GiB.
#[clap(
long = "max-http-requests",
env = "INFLUXDB_IOX_MAX_HTTP_REQUESTS",
default_value = "200",
action
)]
pub(crate) http_request_limit: usize,
/// gRPC address for the router to talk with the ingesters. For
/// example:
///
/// "http://127.0.0.1:8083"
///
/// or
///
/// "http://10.10.10.1:8083,http://10.10.10.2:8083"
///
/// for multiple addresses.
#[clap(
long = "ingester-addresses",
env = "INFLUXDB_IOX_INGESTER_ADDRESSES",
required = true
)]
pub(crate) ingester_addresses: Vec<String>,
/// Write buffer topic/database that should be used.
// This isn't really relevant to the RPC write path and will be removed eventually.
#[clap(
long = "write-buffer-topic",
env = "INFLUXDB_IOX_WRITE_BUFFER_TOPIC",
default_value = "iox-shared",
action
)]
pub(crate) topic: String,
/// Query pool name to dispatch writes to.
// This isn't really relevant to the RPC write path and will be removed eventually.
#[clap(
long = "query-pool",
env = "INFLUXDB_IOX_QUERY_POOL_NAME",
default_value = "iox-shared",
action
)]
pub(crate) query_pool_name: String,
/// Retention period to use when auto-creating namespaces.
/// For infinite retention, leave this unset and it will default to `None`.
/// Setting it to zero will not make it infinite.
#[clap(
long = "new-namespace-retention-hours",
env = "INFLUXDB_IOX_NEW_NAMESPACE_RETENTION_HOURS",
action
)]
pub(crate) new_namespace_retention_hours: Option<u64>,
#[clap(flatten)]
pub(crate) router_config: RouterRpcWriteConfig,
}
pub async fn command(config: Config) -> Result<()> {
@@ -147,11 +87,7 @@ pub async fn command(config: Config) -> Result<()> {
Arc::clone(&metrics),
catalog,
object_store,
config.http_request_limit,
&config.ingester_addresses,
config.topic.as_ref(),
config.query_pool_name.as_ref(),
config.new_namespace_retention_hours,
&config.router_config,
)
.await?;


@@ -44,10 +44,7 @@ pub trait IngesterRpcInterface: Send + Sync + std::fmt::Debug {
/// Acquire an opaque handle to the Ingester's [`CatalogService`] RPC
/// handler implementation.
fn catalog_service(
&self,
catalog: Arc<dyn Catalog>,
) -> CatalogServiceServer<Self::CatalogHandler>;
fn catalog_service(&self) -> CatalogServiceServer<Self::CatalogHandler>;
/// Acquire an opaque handle to the Ingester's [`WriteService`] RPC
/// handler implementation.
@@ -59,7 +56,6 @@ pub trait IngesterRpcInterface: Send + Sync + std::fmt::Debug {
fn query_service(
&self,
max_simultaneous_requests: usize,
metrics: &metric::Registry,
) -> FlightServiceServer<Self::FlightHandler>;
}
@@ -195,7 +191,7 @@ pub async fn new(
namespace_name_provider,
table_name_provider,
partition_provider,
metrics,
Arc::clone(&metrics),
));
// TODO: start hot-partition persist task before replaying the WAL
@@ -235,7 +231,7 @@ pub async fn new(
));
Ok(IngesterGuard {
rpc: GrpcDelegate::new(Arc::new(write_path), buffer, timestamp),
rpc: GrpcDelegate::new(Arc::new(write_path), buffer, timestamp, catalog, metrics),
rotation_task: handle,
})
}


@@ -32,6 +32,8 @@ pub(crate) struct GrpcDelegate<D, Q> {
dml_sink: Arc<D>,
query_exec: Arc<Q>,
timestamp: Arc<TimestampOracle>,
catalog: Arc<dyn Catalog>,
metrics: Arc<metric::Registry>,
}
impl<D, Q> GrpcDelegate<D, Q>
@@ -44,11 +46,15 @@ where
dml_sink: Arc<D>,
query_exec: Arc<Q>,
timestamp: Arc<TimestampOracle>,
catalog: Arc<dyn Catalog>,
metrics: Arc<metric::Registry>,
) -> Self {
Self {
dml_sink,
query_exec,
timestamp,
catalog,
metrics,
}
}
}
@@ -67,11 +73,8 @@ where
/// Acquire a [`CatalogService`] gRPC service implementation.
///
/// [`CatalogService`]: generated_types::influxdata::iox::catalog::v1::catalog_service_server::CatalogService.
fn catalog_service(
&self,
catalog: Arc<dyn Catalog>,
) -> CatalogServiceServer<Self::CatalogHandler> {
CatalogServiceServer::new(CatalogService::new(catalog))
fn catalog_service(&self) -> CatalogServiceServer<Self::CatalogHandler> {
CatalogServiceServer::new(CatalogService::new(Arc::clone(&self.catalog)))
}
/// Return a [`WriteService`] gRPC implementation.
@@ -90,12 +93,11 @@ where
fn query_service(
&self,
max_simultaneous_requests: usize,
metrics: &metric::Registry,
) -> FlightServiceServer<Self::FlightHandler> {
FlightServiceServer::new(query::FlightService::new(
Arc::clone(&self.query_exec),
max_simultaneous_requests,
metrics,
&self.metrics,
))
}
}
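
The shape of this change: `GrpcDelegate` now captures the catalog and metric registry once at construction, so `catalog_service` and `query_service` no longer need them threaded through each call. A self-contained sketch of that ownership pattern (the types below are simplified, hypothetical stand-ins, not the real IOx services):

```rust
use std::sync::Arc;

// Simplified stand-ins for the real catalog and metric registry types.
trait Catalog: Send + Sync {}
struct MemCatalog;
impl Catalog for MemCatalog {}
struct Registry;

// The delegate owns Arc'd handles to its dependencies...
struct GrpcDelegate {
    catalog: Arc<dyn Catalog>,
    metrics: Arc<Registry>,
}

impl GrpcDelegate {
    fn new(catalog: Arc<dyn Catalog>, metrics: Arc<Registry>) -> Self {
        Self { catalog, metrics }
    }

    // ...so service constructors only borrow `self` and clone what they need,
    // instead of taking the catalog/metrics as extra arguments.
    fn catalog_service(&self) -> Arc<dyn Catalog> {
        Arc::clone(&self.catalog)
    }

    fn query_service(&self, max_simultaneous_requests: usize) -> (Arc<Registry>, usize) {
        (Arc::clone(&self.metrics), max_simultaneous_requests)
    }
}

fn main() {
    let delegate = GrpcDelegate::new(Arc::new(MemCatalog), Arc::new(Registry));
    let _catalog_handle = delegate.catalog_service();
    let (_metrics_handle, limit) = delegate.query_service(20);
    assert_eq!(limit, 20);
}
```

This matches the argument-free `catalog_service()` and single-argument `query_service(...)` calls in ioxd_ingester2/src/lib.rs later in this diff.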

ioxd_ingester2/Cargo.toml

@@ -0,0 +1,20 @@
[package]
name = "ioxd_ingester2"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
[dependencies] # In alphabetical order
async-trait = "0.1"
clap_blocks = { path = "../clap_blocks" }
hyper = "0.14"
ingester2 = { path = "../ingester2" }
iox_catalog = { path = "../iox_catalog" }
ioxd_common = { path = "../ioxd_common" }
metric = { path = "../metric" }
thiserror = "1.0.37"
tokio = { version = "1.22", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-util = { version = "0.7.4" }
trace = { path = "../trace" }
workspace-hack = { path = "../workspace-hack"}

ioxd_ingester2/src/lib.rs

@@ -0,0 +1,162 @@
use async_trait::async_trait;
use clap_blocks::ingester2::Ingester2Config;
use hyper::{Body, Request, Response};
use ingester2::{IngesterGuard, IngesterRpcInterface};
use iox_catalog::interface::Catalog;
use ioxd_common::{
add_service,
http::error::{HttpApiError, HttpApiErrorCode, HttpApiErrorSource},
rpc::RpcBuilderInput,
serve_builder,
server_type::{CommonServerState, RpcError, ServerType},
setup_builder,
};
use metric::Registry;
use std::{
fmt::{Debug, Display},
sync::Arc,
time::Duration,
};
use thiserror::Error;
use tokio_util::sync::CancellationToken;
use trace::TraceCollector;
#[derive(Debug, Error)]
pub enum Error {
#[error("error initializing ingester2: {0}")]
Ingester(#[from] ingester2::InitError),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
struct IngesterServerType<I: IngesterRpcInterface> {
server: IngesterGuard<I>,
shutdown: CancellationToken,
metrics: Arc<Registry>,
trace_collector: Option<Arc<dyn TraceCollector>>,
max_simultaneous_queries: usize,
}
impl<I: IngesterRpcInterface> IngesterServerType<I> {
pub fn new(
server: IngesterGuard<I>,
metrics: Arc<Registry>,
common_state: &CommonServerState,
max_simultaneous_queries: usize,
) -> Self {
Self {
server,
shutdown: CancellationToken::new(),
metrics,
trace_collector: common_state.trace_collector(),
max_simultaneous_queries,
}
}
}
impl<I: IngesterRpcInterface> std::fmt::Debug for IngesterServerType<I> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Ingester2")
}
}
#[async_trait]
impl<I: IngesterRpcInterface + Sync + Send + Debug + 'static> ServerType for IngesterServerType<I> {
/// Return the [`metric::Registry`] used by the ingester.
fn metric_registry(&self) -> Arc<Registry> {
Arc::clone(&self.metrics)
}
/// Returns the trace collector for ingester traces.
fn trace_collector(&self) -> Option<Arc<dyn TraceCollector>> {
self.trace_collector.as_ref().map(Arc::clone)
}
/// Just return "not found".
async fn route_http_request(
&self,
_req: Request<Body>,
) -> Result<Response<Body>, Box<dyn HttpApiErrorSource>> {
Err(Box::new(IoxHttpError::NotFound))
}
/// Configure the gRPC services.
async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError> {
let builder = setup_builder!(builder_input, self);
add_service!(builder, self.server.rpc().catalog_service());
add_service!(builder, self.server.rpc().write_service());
add_service!(
builder,
self.server
.rpc()
.query_service(self.max_simultaneous_queries)
);
serve_builder!(builder);
Ok(())
}
async fn join(self: Arc<Self>) {
self.shutdown.cancelled().await;
}
fn shutdown(&self) {
self.shutdown.cancel();
}
}
/// Simple error struct, we're not really providing an HTTP interface for the ingester.
#[derive(Debug)]
pub enum IoxHttpError {
NotFound,
}
impl IoxHttpError {
fn status_code(&self) -> HttpApiErrorCode {
match self {
IoxHttpError::NotFound => HttpApiErrorCode::NotFound,
}
}
}
impl Display for IoxHttpError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for IoxHttpError {}
impl HttpApiErrorSource for IoxHttpError {
fn to_http_api_error(&self) -> HttpApiError {
HttpApiError::new(self.status_code(), self.to_string())
}
}
const PERSIST_BACKGROUND_FETCH_TIME: Duration = Duration::from_secs(30);
/// Instantiate an ingester server type
pub async fn create_ingester_server_type(
common_state: &CommonServerState,
catalog: Arc<dyn Catalog>,
metrics: Arc<Registry>,
ingester_config: &Ingester2Config,
) -> Result<Arc<dyn ServerType>> {
let grpc = ingester2::new(
catalog,
Arc::clone(&metrics),
PERSIST_BACKGROUND_FETCH_TIME,
ingester_config.wal_directory.clone(),
Duration::from_secs(ingester_config.wal_rotation_period_seconds),
)
.await?;
Ok(Arc::new(IngesterServerType::new(
grpc,
metrics,
common_state,
ingester_config.concurrent_query_limit,
)))
}


@@ -1,5 +1,7 @@
use async_trait::async_trait;
use clap_blocks::{router::RouterConfig, write_buffer::WriteBufferConfig};
use clap_blocks::{
router::RouterConfig, router_rpc_write::RouterRpcWriteConfig, write_buffer::WriteBufferConfig,
};
use data_types::{NamespaceName, PartitionTemplate, TemplatePart};
use hashbrown::HashMap;
use hyper::{Body, Request, Response};
@@ -244,24 +246,20 @@ impl HttpApiErrorSource for IoxHttpErrorAdaptor {
// NOTE!!! This needs to be kept in sync with `create_router_server_type` until the
// switch to the RPC write path/ingester2 is complete! See the numbered sections that annotate
// where these two functions line up and where they diverge.
#[allow(clippy::too_many_arguments)] // Some of these arguments should go away soon.
pub async fn create_router_grpc_write_server_type(
common_state: &CommonServerState,
metrics: Arc<metric::Registry>,
catalog: Arc<dyn Catalog>,
object_store: Arc<DynObjectStore>,
request_limit: usize,
ingester_addresses: &[String],
topic: &str,
query_pool_name: &str,
new_namespace_retention_hours: Option<u64>,
router_config: &RouterRpcWriteConfig,
) -> Result<Arc<dyn ServerType>> {
// 1. START: Different Setup Per Router Path: this part is only relevant to using RPC write
// path and should not be added to `create_router_server_type`.
// Initialise the DML handler that sends writes to the ingester using the RPC write path.
let rpc_writer = RpcWrite::new(RoundRobin::new(
ingester_addresses
router_config
.ingester_addresses
.iter()
.map(|ingester_addr| write_service_client(ingester_addr)),
));
@@ -336,16 +334,21 @@ pub async fn create_router_grpc_write_server_type(
let mut txn = catalog.start_transaction().await?;
let topic_id = txn
.topics()
.get_by_name(topic)
.get_by_name(&router_config.topic)
.await?
.map(|v| v.id)
.unwrap_or_else(|| panic!("no topic named {} in catalog", topic));
.unwrap_or_else(|| panic!("no topic named {} in catalog", router_config.topic));
let query_id = txn
.query_pools()
.create_or_get(query_pool_name)
.create_or_get(&router_config.query_pool_name)
.await
.map(|v| v.id)
.unwrap_or_else(|e| panic!("failed to upsert query pool {} in catalog: {}", topic, e));
.unwrap_or_else(|e| {
panic!(
"failed to upsert query pool {} in catalog: {}",
router_config.topic, e
)
});
txn.commit().await?;
let namespace_resolver = NamespaceAutocreation::new(
@@ -354,7 +357,9 @@ pub async fn create_router_grpc_write_server_type(
Arc::clone(&catalog),
topic_id,
query_id,
new_namespace_retention_hours.map(|hours| hours as i64 * 60 * 60 * 1_000_000_000),
router_config
.new_namespace_retention_hours
.map(|hours| hours as i64 * 60 * 60 * 1_000_000_000),
);
//
////////////////////////////////////////////////////////////////////////////
@@ -389,7 +394,7 @@ pub async fn create_router_grpc_write_server_type(
// 4. START: Initialize the HTTP API delegate, this is the same in both router paths
let http = HttpDelegate::new(
common_state.run_config().max_http_request_size,
request_limit,
router_config.http_request_limit,
namespace_resolver,
handler_stack,
&metrics,