Merge pull request #3506 from influxdata/pd/ingester-server

feat: Add scaffolding for ingester server

commit 5eb2e8b7fe
@@ -1726,7 +1726,9 @@ dependencies = [
  "influxdb_iox_client",
  "influxdb_line_protocol",
  "influxdb_storage_client",
+ "ingester",
  "internal_types",
+ "iox_catalog",
  "iox_object_store",
  "itertools",
  "job_registry",
@@ -1858,10 +1860,13 @@ name = "ingester"
 version = "0.1.0"
 dependencies = [
  "arrow",
+ "hyper",
  "iox_catalog",
+ "metric",
  "mutable_batch",
  "parking_lot",
  "snafu",
+ "thiserror",
  "uuid",
  "workspace-hack",
 ]
@@ -14,7 +14,9 @@ dml = { path = "../dml" }
 generated_types = { path = "../generated_types" }
 influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format", "write_lp"] }
 influxdb_line_protocol = { path = "../influxdb_line_protocol" }
+ingester = { path = "../ingester" }
 internal_types = { path = "../internal_types" }
+iox_catalog = { path = "../iox_catalog" }
 iox_object_store = { path = "../iox_object_store" }
 job_registry = { path = "../job_registry" }
 logfmt = { path = "../logfmt" }
@@ -0,0 +1,121 @@
+//! Implementation of command line option for running ingester
+
+use std::sync::Arc;
+
+use crate::{
+    clap_blocks::run_config::RunConfig,
+    influxdb_ioxd::{
+        self,
+        server_type::{
+            common_state::{CommonServerState, CommonServerStateError},
+            ingester::IngesterServerType,
+        },
+    },
+};
+use ingester::handler::IngestHandlerImpl;
+use ingester::server::grpc::GrpcDelegate;
+use ingester::server::{http::HttpDelegate, IngesterServer};
+use iox_catalog::interface::{Catalog, KafkaPartition};
+use iox_catalog::postgres::PostgresCatalog;
+use observability_deps::tracing::*;
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("Run: {0}")]
+    Run(#[from] influxdb_ioxd::Error),
+
+    #[error("Cannot setup server: {0}")]
+    Setup(#[from] crate::influxdb_ioxd::server_type::database::setup::Error),
+
+    #[error("Invalid config: {0}")]
+    InvalidConfig(#[from] CommonServerStateError),
+
+    #[error("Catalog error: {0}")]
+    Catalog(#[from] iox_catalog::interface::Error),
+
+    #[error("Kafka topic {0} not found in the catalog")]
+    KafkaTopicNotFound(String),
+}
+
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+
+#[derive(Debug, clap::Parser)]
+#[clap(
+    name = "run",
+    about = "Runs in ingester mode",
+    long_about = "Run the IOx ingester server.\n\nThe configuration options below can be \
+set either with the command line flags or with the specified environment \
+variable. If there is a file named '.env' in the current working directory, \
+it is sourced before loading the configuration.
+
+Configuration is loaded from the following sources (highest precedence first):
+        - command line arguments
+        - user set environment variables
+        - .env file contents
+        - pre-configured default values"
+)]
+pub struct Config {
+    #[clap(flatten)]
+    pub(crate) run_config: RunConfig,
+
+    /// Postgres connection string
+    #[clap(env = "INFLUXDB_IOX_CATALOG_DNS")]
+    pub catalog_dsn: String,
+
+    /// Kafka connection string
+    #[clap(env = "INFLUXDB_IOX_KAFKA_CONNECTION")]
+    pub kafka_connection: String,
+
+    /// Kafka topic name
+    #[clap(env = "INFLUXDB_IOX_KAFKA_TOPIC")]
+    pub kafka_topic: String,
+
+    /// Kafka partition number to start (inclusive) range with
+    #[clap(env = "INFLUXDB_IOX_KAFKA_PARTITION_RANGE_START")]
+    pub kafka_partition_range_start: i32,
+
+    /// Kafka partition number to end (inclusive) range with
+    #[clap(env = "INFLUXDB_IOX_KAFKA_PARTITION_RANGE_END")]
+    pub kafka_partition_range_end: i32,
+}
+
+pub async fn command(config: Config) -> Result<()> {
+    let common_state = CommonServerState::from_config(config.run_config.clone())?;
+
+    let catalog: Arc<dyn Catalog> = Arc::new(
+        PostgresCatalog::connect(
+            "ingester",
+            iox_catalog::postgres::SCHEMA_NAME,
+            &config.catalog_dsn,
+        )
+        .await?,
+    );
+
+    let kafka_topic = match catalog
+        .kafka_topics()
+        .get_by_name(&config.kafka_topic)
+        .await?
+    {
+        Some(k) => k,
+        None => return Err(Error::KafkaTopicNotFound(config.kafka_topic)),
+    };
+    let kafka_partitions: Vec<_> = (config.kafka_partition_range_start
+        ..config.kafka_partition_range_end)
+        .map(KafkaPartition::new)
+        .collect();
+
+    let ingest_handler = Arc::new(IngestHandlerImpl::new(
+        kafka_topic,
+        kafka_partitions,
+        catalog,
+    ));
+    let http = HttpDelegate::new(Arc::clone(&ingest_handler));
+    let grpc = GrpcDelegate::new(ingest_handler);
+
+    let ingester = IngesterServer::new(http, grpc);
+    let server_type = Arc::new(IngesterServerType::new(ingester, &common_state));
+
+    info!("starting ingester");
+
+    Ok(influxdb_ioxd::main(common_state, server_type).await?)
+}
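For readers unfamiliar with how the `Config` struct above resolves its values, here is a minimal sketch (not part of this PR) of the same clap v3 derive pattern, assuming the "derive" and "env" features are enabled; the `ExampleConfig` name, the `long` flag names, and the `main` wrapper are illustrative only.

// Minimal sketch, assuming clap 3 with the "derive" and "env" features.
use clap::Parser;

#[derive(Debug, Parser)]
struct ExampleConfig {
    /// Resolved from the flag, or from the environment variable if the flag
    /// is absent; command line arguments take precedence over the environment.
    #[clap(long = "catalog-dsn", env = "INFLUXDB_IOX_CATALOG_DNS")]
    catalog_dsn: String,

    /// Parsed as an i32, like the partition range fields in the PR's Config.
    #[clap(long = "kafka-partition-range-start", env = "INFLUXDB_IOX_KAFKA_PARTITION_RANGE_START")]
    kafka_partition_range_start: i32,
}

fn main() {
    // Parses argv and falls back to the environment, mirroring the precedence
    // order described in the subcommand's long_about text.
    let config = ExampleConfig::parse();
    println!("{:?}", config);
}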
@@ -3,6 +3,7 @@ use snafu::{ResultExt, Snafu};
 use crate::clap_blocks::run_config::RunConfig;

 pub mod database;
+pub mod ingester;
 pub mod router;
 pub mod router2;
 pub mod test;
@@ -19,6 +20,9 @@ pub enum Error {
     #[snafu(display("Error in router2 subcommand: {}", source))]
     Router2Error { source: router2::Error },

+    #[snafu(display("Error in ingester subcommand: {}", source))]
+    IngesterError { source: ingester::Error },
+
     #[snafu(display("Error in test subcommand: {}", source))]
     TestError { source: test::Error },
 }
@@ -43,6 +47,7 @@ impl Config {
             Some(Command::Database(config)) => &config.run_config,
             Some(Command::Router(config)) => &config.run_config,
             Some(Command::Router2(config)) => &config.run_config,
+            Some(Command::Ingester(config)) => &config.run_config,
             Some(Command::Test(config)) => &config.run_config,
         }
     }
@@ -59,6 +64,9 @@ enum Command {
     /// Run the server in router2 mode
     Router2(router2::Config),

+    /// Run the server in ingester mode
+    Ingester(ingester::Config),
+
     /// Run the server in test mode
     Test(test::Config),
 }
@@ -76,6 +84,7 @@ pub async fn command(config: Config) -> Result<()> {
         Some(Command::Database(config)) => database::command(config).await.context(DatabaseSnafu),
         Some(Command::Router(config)) => router::command(config).await.context(RouterSnafu),
         Some(Command::Router2(config)) => router2::command(config).await.context(Router2Snafu),
+        Some(Command::Ingester(config)) => ingester::command(config).await.context(IngesterSnafu),
         Some(Command::Test(config)) => test::command(config).await.context(TestSnafu),
     }
 }
@@ -0,0 +1,100 @@
+use std::{
+    fmt::{Debug, Display},
+    sync::Arc,
+};
+
+use async_trait::async_trait;
+use hyper::{Body, Request, Response};
+use ingester::server::IngesterServer;
+use metric::Registry;
+use tokio_util::sync::CancellationToken;
+use trace::TraceCollector;
+
+use crate::influxdb_ioxd::{
+    http::error::{HttpApiError, HttpApiErrorSource},
+    rpc::RpcBuilderInput,
+    server_type::{common_state::CommonServerState, RpcError, ServerType},
+};
+use ingester::handler::IngestHandler;
+
+#[derive(Debug)]
+pub struct IngesterServerType<I: IngestHandler> {
+    server: IngesterServer<I>,
+    shutdown: CancellationToken,
+    trace_collector: Option<Arc<dyn TraceCollector>>,
+}
+
+impl<I: IngestHandler> IngesterServerType<I> {
+    pub fn new(server: IngesterServer<I>, common_state: &CommonServerState) -> Self {
+        Self {
+            server,
+            shutdown: CancellationToken::new(),
+            trace_collector: common_state.trace_collector(),
+        }
+    }
+}
+
+#[async_trait]
+impl<I: IngestHandler + Sync + Send + Debug + 'static> ServerType for IngesterServerType<I> {
+    type RouteError = IoxHttpErrorAdaptor;
+
+    /// Return the [`metric::Registry`] used by the router.
+    fn metric_registry(&self) -> Arc<Registry> {
+        self.server.metric_registry()
+    }
+
+    /// Returns the trace collector for router traces.
+    fn trace_collector(&self) -> Option<Arc<dyn TraceCollector>> {
+        self.trace_collector.as_ref().map(Arc::clone)
+    }
+
+    /// Dispatches `req` to the router [`HttpDelegate`] delegate.
+    ///
+    /// [`HttpDelegate`]: router2::server::http::HttpDelegate
+    async fn route_http_request(
+        &self,
+        _req: Request<Body>,
+    ) -> Result<Response<Body>, Self::RouteError> {
+        unimplemented!();
+    }
+
+    /// Registers the services exposed by the router [`GrpcDelegate`] delegate.
+    ///
+    /// [`GrpcDelegate`]: router2::server::grpc::GrpcDelegate
+    async fn server_grpc(self: Arc<Self>, _builder_input: RpcBuilderInput) -> Result<(), RpcError> {
+        unimplemented!()
+        // let builder = setup_builder!(builder_input, self);
+        // add_service!(builder, self.server.grpc().write_service());
+        // serve_builder!(builder);
+        //
+        // Ok(())
+    }
+
+    async fn join(self: Arc<Self>) {
+        self.shutdown.cancelled().await;
+    }
+
+    fn shutdown(&self) {
+        self.shutdown.cancel();
+    }
+}
+
+/// This adaptor converts the `ingester` http error type into a type that
+/// satisfies the requirements of influxdb_ioxd's runner framework, keeping the
+/// two decoupled.
+#[derive(Debug)]
+pub struct IoxHttpErrorAdaptor(router2::server::http::Error);
+
+impl Display for IoxHttpErrorAdaptor {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        Display::fmt(&self.0, f)
+    }
+}
+
+impl std::error::Error for IoxHttpErrorAdaptor {}
+
+impl HttpApiErrorSource for IoxHttpErrorAdaptor {
+    fn to_http_api_error(&self) -> HttpApiError {
+        HttpApiError::new(self.0.as_status_code(), self.to_string())
+    }
+}
@@ -10,6 +10,7 @@ use crate::influxdb_ioxd::{http::error::HttpApiErrorSource, rpc::RpcBuilderInput

 pub mod common_state;
 pub mod database;
+pub mod ingester;
 pub mod router;
 pub mod router2;
 pub mod test;
@@ -6,9 +6,12 @@ edition = "2021"

 [dependencies]
 arrow = { version = "7.0", features = ["prettyprint"] }
+hyper = "0.14"
 iox_catalog = { path = "../iox_catalog" }
+metric = { path = "../metric" }
 mutable_batch = { path = "../mutable_batch"}
 parking_lot = "0.11.2"
 snafu = "0.7"
+thiserror = "1.0"
 uuid = { version = "0.8", features = ["v4"] }
 workspace-hack = { path = "../workspace-hack"}
@@ -5,10 +5,10 @@ use arrow::record_batch::RecordBatch;
 use std::{collections::BTreeMap, sync::Arc};
 use uuid::Uuid;

-use crate::server::IngesterServer;
+use crate::handler::IngestHandlerImpl;
 use iox_catalog::interface::{
-    Catalog, KafkaPartition, KafkaTopicId, NamespaceId, PartitionId, SequenceNumber, SequencerId,
-    TableId, Tombstone,
+    KafkaPartition, KafkaTopicId, NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId,
+    Tombstone,
 };
 use mutable_batch::MutableBatch;
 use parking_lot::RwLock;
@@ -54,7 +54,7 @@ pub struct Sequencers {

 impl Sequencers {
     /// One time initialize Sequencers of this Ingester
-    pub async fn initialize<T: Catalog>(ingester: &IngesterServer<'_, T>) -> Result<Self> {
+    pub async fn initialize(ingester: &IngestHandlerImpl) -> Result<Self> {
         // Get sequencer ids from the catalog
         let sequencer_repro = ingester.iox_catalog.sequencers();
         let mut sequencers = BTreeMap::default();
@@ -0,0 +1,62 @@
+//! Ingest handler
+
+use std::sync::Arc;
+
+use iox_catalog::interface::{Catalog, KafkaPartition, KafkaTopic, KafkaTopicId};
+use std::fmt::Formatter;
+
+/// The [`IngestHandler`] handles all ingest from kafka, persistence and queries
+pub trait IngestHandler {}
+
+/// Implementation of the `IngestHandler` trait to ingest from kafka and manage persistence and answer queries
+pub struct IngestHandlerImpl {
+    /// Kafka Topic assigned to this ingester
+    kafka_topic: KafkaTopic,
+    /// Kafka Partitions (Shards) assigned to this Ingester
+    kafka_partitions: Vec<KafkaPartition>,
+    /// Catalog of this ingester
+    pub iox_catalog: Arc<dyn Catalog>,
+}
+
+impl std::fmt::Debug for IngestHandlerImpl {
+    fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
+        todo!()
+    }
+}
+
+impl IngestHandlerImpl {
+    /// Initialize the Ingester
+    pub fn new(
+        topic: KafkaTopic,
+        shard_ids: Vec<KafkaPartition>,
+        catalog: Arc<dyn Catalog>,
+    ) -> Self {
+        Self {
+            kafka_topic: topic,
+            kafka_partitions: shard_ids,
+            iox_catalog: catalog,
+        }
+    }
+
+    /// Return a kafka topic
+    pub fn get_topic(&self) -> KafkaTopic {
+        self.kafka_topic.clone()
+    }
+
+    /// Return a kafka topic id
+    pub fn get_topic_id(&self) -> KafkaTopicId {
+        self.kafka_topic.id
+    }
+
+    /// Return a kafka topic name
+    pub fn get_topic_name(&self) -> String {
+        self.kafka_topic.name.clone()
+    }
+
+    /// Return Kafka Partitions
+    pub fn get_kafka_partitions(&self) -> Vec<KafkaPartition> {
+        self.kafka_partitions.clone()
+    }
+}
+
+impl IngestHandler for IngestHandlerImpl {}
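One consequence of the (currently empty) `IngestHandler` trait above is that the HTTP and gRPC delegates are generic over the handler, so tests can substitute a stub that needs neither Kafka nor the catalog. A self-contained sketch of that pattern follows; `StubIngestHandler` and the simplified `HttpDelegate` are illustrative stand-ins, not code from this PR.

use std::sync::Arc;

// Stand-ins mirroring the shapes introduced in this PR, simplified for the sketch.
pub trait IngestHandler {}

#[derive(Debug, Default)]
pub struct StubIngestHandler;

impl IngestHandler for StubIngestHandler {}

#[derive(Debug)]
pub struct HttpDelegate<I: IngestHandler> {
    #[allow(dead_code)]
    ingest_handler: Arc<I>,
}

impl<I: IngestHandler> HttpDelegate<I> {
    pub fn new(ingest_handler: Arc<I>) -> Self {
        Self { ingest_handler }
    }
}

fn main() {
    // A test can wire the delegate to the stub instead of IngestHandlerImpl,
    // exercising routing logic without Kafka or Postgres.
    let _http = HttpDelegate::new(Arc::new(StubIngestHandler));
}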
@@ -14,4 +14,5 @@

 #[allow(dead_code)]
 pub mod data;
+pub mod handler;
 pub mod server;
@@ -1,54 +1,49 @@
-//! Ingester Server
-//!
+//! Ingester server entrypoint.

 use std::sync::Arc;

-use iox_catalog::interface::{Catalog, KafkaPartition, KafkaTopic, KafkaTopicId};
+use self::{grpc::GrpcDelegate, http::HttpDelegate};
+use crate::handler::IngestHandler;
+use std::fmt::Debug;

-/// The [`IngesterServer`] manages the lifecycle and contains all state for
-/// an `ingester` server instance.
-pub struct IngesterServer<'a, T>
-where
-    T: Catalog,
-{
-    /// Kafka Topic assigned to this ingester
-    kafka_topic: KafkaTopic,
-    /// Kafka Partitions (Shards) assigned to this INgester
-    kafka_partitions: Vec<KafkaPartition>,
-    /// Catalog of this ingester
-    pub iox_catalog: &'a Arc<T>,
+pub mod grpc;
+pub mod http;
+
+/// The [`IngesterServer`] manages the lifecycle and contains all state for a
+/// `ingester` server instance.
+#[derive(Debug, Default)]
+pub struct IngesterServer<I: IngestHandler> {
+    metrics: Arc<metric::Registry>,
+
+    http: HttpDelegate<I>,
+    grpc: GrpcDelegate<I>,
 }

-impl<'a, T> IngesterServer<'a, T>
-where
-    T: Catalog,
-{
-    /// Initialize the Ingester
-    pub fn new(topic: KafkaTopic, shard_ids: Vec<KafkaPartition>, catalog: &'a Arc<T>) -> Self {
+impl<I: IngestHandler> IngesterServer<I> {
+    /// Initialise a new [`IngesterServer`] using the provided HTTP and gRPC
+    /// handlers.
+    pub fn new(http: HttpDelegate<I>, grpc: GrpcDelegate<I>) -> Self {
         Self {
-            kafka_topic: topic,
-            kafka_partitions: shard_ids,
-            iox_catalog: catalog,
+            metrics: Default::default(),
+            http,
+            grpc,
         }
     }

-    /// Return a kafka topic
-    pub fn get_topic(&self) -> KafkaTopic {
-        self.kafka_topic.clone()
+    /// Return the [`metric::Registry`] used by the router.
+    pub fn metric_registry(&self) -> Arc<metric::Registry> {
+        Arc::clone(&self.metrics)
     }
+}

-    /// Return a kafka topic id
-    pub fn get_topic_id(&self) -> KafkaTopicId {
-        self.kafka_topic.id
-    }
-
-    /// Return a kafka topic name
-    pub fn get_topic_name(&self) -> String {
-        self.kafka_topic.name.clone()
-    }
-
-    /// Return Kafka Partitions
-    pub fn get_kafka_partitions(&self) -> Vec<KafkaPartition> {
-        self.kafka_partitions.clone()
+impl<I: IngestHandler + Debug> IngesterServer<I> {
+    /// Get a reference to the router http delegate.
+    pub fn http(&self) -> &HttpDelegate<I> {
+        &self.http
+    }
+
+    /// Get a reference to the router grpc delegate.
+    pub fn grpc(&self) -> &GrpcDelegate<I> {
+        &self.grpc
     }
 }
@@ -0,0 +1,20 @@
+//! gRPC service implementations for `ingester`.
+
+use crate::handler::IngestHandler;
+use std::sync::Arc;
+
+/// This type is responsible for managing all gRPC services exposed by
+/// `ingester`.
+#[derive(Debug, Default)]
+pub struct GrpcDelegate<I: IngestHandler> {
+    #[allow(dead_code)]
+    ingest_handler: Arc<I>,
+}
+
+impl<I: IngestHandler> GrpcDelegate<I> {
+    /// Initialise a new [`GrpcDelegate`] passing valid requests to the
+    /// specified `ingest_handler`.
+    pub fn new(ingest_handler: Arc<I>) -> Self {
+        Self { ingest_handler }
+    }
+}
@@ -0,0 +1,50 @@
+//! HTTP service implementations for `ingester`.
+
+use crate::handler::IngestHandler;
+use hyper::{Body, Request, Response, StatusCode};
+use std::sync::Arc;
+use thiserror::Error;
+
+/// Errors returned by the `router2` HTTP request handler.
+#[derive(Debug, Error, Copy, Clone)]
+pub enum Error {
+    /// The requested path has no registered handler.
+    #[error("not found")]
+    NotFound,
+}
+
+impl Error {
+    /// Convert the error into an appropriate [`StatusCode`] to be returned to
+    /// the end user.
+    pub fn as_status_code(&self) -> StatusCode {
+        match self {
+            Error::NotFound => StatusCode::NOT_FOUND,
+        }
+    }
+}
+
+/// This type is responsible for servicing requests to the `ingester` HTTP
+/// endpoint.
+///
+/// Requests to some paths may be handled externally by the caller - the IOx
+/// server runner framework takes care of implementing the health endpoint,
+/// metrics, pprof, etc.
+#[derive(Debug, Default)]
+pub struct HttpDelegate<I: IngestHandler> {
+    #[allow(dead_code)]
+    ingest_handler: Arc<I>,
+}
+
+impl<I: IngestHandler> HttpDelegate<I> {
+    /// Initialise a new [`HttpDelegate`] passing valid requests to the
+    /// specified `ingest_handler`.
+    pub fn new(ingest_handler: Arc<I>) -> Self {
+        Self { ingest_handler }
+    }
+
+    /// Routes `req` to the appropriate handler, if any, returning the handler
+    /// response.
+    pub fn route(&self, _req: Request<Body>) -> Result<Response<Body>, Error> {
+        unimplemented!()
+    }
+}
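The `as_status_code` method above is what lets the server runner turn an ingester HTTP error into a response without depending on ingester internals (see the `IoxHttpErrorAdaptor` earlier in this diff). Below is a rough sketch of that conversion, assuming hyper 0.14; the `error_response` helper and the local copy of `Error` are illustrative only, not part of the PR.

use hyper::{Body, Response, StatusCode};

// Local copy of the ingester HTTP error shape, for the sketch only.
#[derive(Debug)]
enum Error {
    NotFound,
}

impl Error {
    fn as_status_code(&self) -> StatusCode {
        match self {
            Error::NotFound => StatusCode::NOT_FOUND,
        }
    }
}

// Illustrative helper: map the domain error to a status code plus a
// plain-text body, roughly what the adaptor in influxdb_ioxd ends up doing.
fn error_response(err: &Error) -> Response<Body> {
    Response::builder()
        .status(err.as_status_code())
        .body(Body::from(format!("{:?}", err)))
        .expect("building a response from static parts cannot fail")
}

fn main() {
    let resp = error_response(&Error::NotFound);
    assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}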
@@ -16,8 +16,8 @@ use uuid::Uuid;
 const MAX_CONNECTIONS: u32 = 5;
 const CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
 const IDLE_TIMEOUT: Duration = Duration::from_secs(500);
-#[allow(dead_code)]
-const SCHEMA_NAME: &str = "iox_catalog";
+/// the default schema name to use in Postgres
+pub const SCHEMA_NAME: &str = "iox_catalog";

 /// In-memory catalog that implements the `RepoCollection` and individual repo traits.
 #[derive(Debug)]