diff --git a/Cargo.lock b/Cargo.lock index ebdeda30da..04d8a545b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1726,7 +1726,9 @@ dependencies = [ "influxdb_iox_client", "influxdb_line_protocol", "influxdb_storage_client", + "ingester", "internal_types", + "iox_catalog", "iox_object_store", "itertools", "job_registry", @@ -1861,13 +1863,16 @@ dependencies = [ "arrow_util", "data_types", "datafusion 0.1.0", + "hyper", "iox_catalog", + "metric", "mutable_batch", "parking_lot", "predicate", "query", "schema", "snafu", + "thiserror", "tokio", "uuid", "workspace-hack", @@ -3216,6 +3221,7 @@ dependencies = [ "observability_deps", "ordered-float 2.10.0", "regex", + "regex-syntax", "schema", "serde_json", "snafu", @@ -3760,6 +3766,27 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "rskafka" +version = "0.1.0" +source = "git+https://github.com/influxdata/rskafka.git?rev=63cf0a541b864d5376b2f96537a5bbb610d0e4bc#63cf0a541b864d5376b2f96537a5bbb610d0e4bc" +dependencies = [ + "async-trait", + "bytes", + "crc", + "futures", + "parking_lot", + "pin-project-lite", + "rand", + "rustls", + "thiserror", + "time 0.3.5", + "tokio", + "tokio-rustls", + "tracing", + "varint-rs", +] + [[package]] name = "rusoto_core" version = "0.47.0" @@ -4653,6 +4680,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "time" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41effe7cfa8af36f439fac33861b66b049edc6f9a32331e2312660529c1c24ad" +dependencies = [ + "libc", +] + [[package]] name = "tinytemplate" version = "1.2.1" @@ -5132,6 +5168,12 @@ dependencies = [ "getrandom", ] +[[package]] +name = "varint-rs" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23" + [[package]] name = "vcpkg" version = "0.2.15" @@ -5454,6 +5496,7 @@ dependencies = [ "pin-project", "prost", "rdkafka", + "rskafka", "tempfile", "time 0.1.0", "tokio", diff --git a/data_types/src/delete_predicate.rs b/data_types/src/delete_predicate.rs index 89c92af3fa..33f30b81d8 100644 --- a/data_types/src/delete_predicate.rs +++ b/data_types/src/delete_predicate.rs @@ -169,7 +169,7 @@ mod tests { #[test] fn test_expr_to_sql_no_expressions() { let pred = DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + range: TimestampRange::new(1, 2), exprs: vec![], }; assert_eq!(&pred.expr_sql_string(), ""); @@ -178,7 +178,7 @@ mod tests { #[test] fn test_expr_to_sql_operators() { let pred = DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + range: TimestampRange::new(1, 2), exprs: vec![ DeleteExpr { column: String::from("col1"), @@ -198,7 +198,7 @@ mod tests { #[test] fn test_expr_to_sql_column_escape() { let pred = DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + range: TimestampRange::new(1, 2), exprs: vec![ DeleteExpr { column: String::from("col 1"), @@ -226,7 +226,7 @@ mod tests { #[test] fn test_expr_to_sql_bool() { let pred = DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + range: TimestampRange::new(1, 2), exprs: vec![ DeleteExpr { column: String::from("col1"), @@ -246,7 +246,7 @@ mod tests { #[test] fn test_expr_to_sql_i64() { let pred = DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + range: TimestampRange::new(1, 2), exprs: vec![ DeleteExpr { column: String::from("col1"), @@ -284,7 +284,7 @@ mod tests { #[test] fn test_expr_to_sql_f64() { let pred = DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + 
range: TimestampRange::new(1, 2), exprs: vec![ DeleteExpr { column: String::from("col1"), @@ -327,7 +327,7 @@ mod tests { #[test] fn test_expr_to_sql_string() { let pred = DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + range: TimestampRange::new(1, 2), exprs: vec![ DeleteExpr { column: String::from("col1"), diff --git a/data_types/src/timestamp.rs b/data_types/src/timestamp.rs index 71a8e14832..b6a1ecd542 100644 --- a/data_types/src/timestamp.rs +++ b/data_types/src/timestamp.rs @@ -36,15 +36,17 @@ pub const MAX_NANO_TIME: i64 = i64::MAX - 1; /// ``` #[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Copy, Debug, Hash)] pub struct TimestampRange { - /// Start defines the inclusive lower bound. - pub start: i64, - /// End defines the exclusive upper bound. - pub end: i64, + /// Start defines the inclusive lower bound. Minimum value is [MIN_NANO_TIME] + start: i64, + /// End defines the exclusive upper bound. Maximum value is [MAX_NANO_TIME] + end: i64, } impl TimestampRange { pub fn new(start: i64, end: i64) -> Self { debug_assert!(end >= start); + let start = start.max(MIN_NANO_TIME); + let end = end.min(MAX_NANO_TIME); Self { start, end } } @@ -59,6 +61,16 @@ impl TimestampRange { pub fn contains_opt(&self, v: Option) -> bool { Some(true) == v.map(|ts| self.contains(ts)) } + + /// Return the timestamp range's end. + pub fn end(&self) -> i64 { + self.end + } + + /// Return the timestamp range's start. + pub fn start(&self) -> i64 { + self.start + } } /// Specifies a min/max timestamp value. @@ -93,6 +105,40 @@ impl TimestampMinMax { mod tests { use super::*; + #[test] + fn test_timestamp_nano_min_max() { + let cases = vec![ + ( + "MIN/MAX Nanos", + TimestampRange::new(MIN_NANO_TIME, MAX_NANO_TIME), + ), + ("MIN/MAX i64", TimestampRange::new(i64::MIN, i64::MAX)), + ]; + + for (name, range) in cases { + println!("case: {}", name); + assert!(!range.contains(i64::MIN)); + assert!(range.contains(MIN_NANO_TIME)); + assert!(range.contains(MIN_NANO_TIME + 1)); + assert!(range.contains(MAX_NANO_TIME - 1)); + assert!(!range.contains(MAX_NANO_TIME)); + assert!(!range.contains(i64::MAX)); + } + } + + #[test] + fn test_timestamp_i64_min_max_offset() { + let range = TimestampRange::new(MIN_NANO_TIME + 1, MAX_NANO_TIME - 1); + + assert!(!range.contains(i64::MIN)); + assert!(!range.contains(MIN_NANO_TIME)); + assert!(range.contains(MIN_NANO_TIME + 1)); + assert!(range.contains(MAX_NANO_TIME - 2)); + assert!(!range.contains(MAX_NANO_TIME - 1)); + assert!(!range.contains(MAX_NANO_TIME)); + assert!(!range.contains(i64::MAX)); + } + #[test] fn test_timestamp_range_contains() { let range = TimestampRange::new(100, 200); diff --git a/db/src/catalog/chunk.rs b/db/src/catalog/chunk.rs index dddfbf1f02..cf5314681a 100644 --- a/db/src/catalog/chunk.rs +++ b/db/src/catalog/chunk.rs @@ -1170,7 +1170,7 @@ mod tests { // Build delete predicate and expected output let del_pred1 = Arc::new(DeletePredicate { - range: TimestampRange { start: 0, end: 100 }, + range: TimestampRange::new(0, 100), exprs: vec![DeleteExpr::new( "city".to_string(), data_types::delete_predicate::Op::Eq, @@ -1192,7 +1192,7 @@ mod tests { // let add more delete predicate = simulate second delete // Build delete predicate and expected output let del_pred2 = Arc::new(DeletePredicate { - range: TimestampRange { start: 20, end: 50 }, + range: TimestampRange::new(20, 50), exprs: vec![DeleteExpr::new( "cost".to_string(), data_types::delete_predicate::Op::Ne, diff --git a/db/src/lifecycle/compact.rs b/db/src/lifecycle/compact.rs index 
2587cd4e07..e750019277 100644 --- a/db/src/lifecycle/compact.rs +++ b/db/src/lifecycle/compact.rs @@ -255,10 +255,7 @@ mod tests { // Cannot simply use empty predicate (#2687) let predicate = Arc::new(DeletePredicate { - range: TimestampRange { - start: 0, - end: 1_000, - }, + range: TimestampRange::new(0, 1_000), exprs: vec![], }); @@ -288,10 +285,7 @@ mod tests { write_lp(db.as_ref(), "cpu foo=3 20"); write_lp(db.as_ref(), "cpu foo=4 20"); - let range = TimestampRange { - start: 0, - end: 1_000, - }; + let range = TimestampRange::new(0, 1_000); let pred1 = Arc::new(DeletePredicate { range, exprs: vec![DeleteExpr::new("foo".to_string(), Op::Eq, Scalar::I64(1))], diff --git a/db/src/lifecycle/compact_object_store.rs b/db/src/lifecycle/compact_object_store.rs index ab37e808d9..eb7c565956 100644 --- a/db/src/lifecycle/compact_object_store.rs +++ b/db/src/lifecycle/compact_object_store.rs @@ -811,7 +811,7 @@ mod tests { // Create 3 delete predicates that will delete all cookies in 3 different deletes let pred1 = Arc::new(DeletePredicate { - range: TimestampRange { start: 0, end: 11 }, + range: TimestampRange::new(0, 11), exprs: vec![DeleteExpr::new( "tag1".to_string(), data_types::delete_predicate::Op::Eq, @@ -819,7 +819,7 @@ mod tests { )], }); let pred2 = Arc::new(DeletePredicate { - range: TimestampRange { start: 12, end: 21 }, + range: TimestampRange::new(12, 21), exprs: vec![DeleteExpr::new( "tag1".to_string(), data_types::delete_predicate::Op::Eq, @@ -827,7 +827,7 @@ mod tests { )], }); let pred3 = Arc::new(DeletePredicate { - range: TimestampRange { start: 22, end: 31 }, + range: TimestampRange::new(22, 31), exprs: vec![DeleteExpr::new( "tag1".to_string(), data_types::delete_predicate::Op::Eq, @@ -976,7 +976,7 @@ mod tests { // Delete all let predicate = Arc::new(DeletePredicate { - range: TimestampRange { start: 0, end: 30 }, + range: TimestampRange::new(0, 30), exprs: vec![], }); db.delete("cpu", predicate).unwrap(); diff --git a/db/src/lifecycle/persist.rs b/db/src/lifecycle/persist.rs index 263d8ba044..fcec26241c 100644 --- a/db/src/lifecycle/persist.rs +++ b/db/src/lifecycle/persist.rs @@ -324,7 +324,7 @@ mod tests { // Delete first row let predicate = Arc::new(DeletePredicate { - range: TimestampRange { start: 0, end: 20 }, + range: TimestampRange::new(0, 20), exprs: vec![], }); @@ -388,10 +388,7 @@ mod tests { // Delete everything let predicate = Arc::new(DeletePredicate { - range: TimestampRange { - start: 0, - end: 1_000, - }, + range: TimestampRange::new(0, 1_000), exprs: vec![], }); @@ -430,10 +427,7 @@ mod tests { // Cannot simply use empty predicate (#2687) let predicate = Arc::new(DeletePredicate { - range: TimestampRange { - start: 0, - end: 1_000, - }, + range: TimestampRange::new(0, 1_000), exprs: vec![], }); @@ -494,10 +488,7 @@ mod tests { write_lp(db.as_ref(), "cpu foo=3 20"); write_lp(db.as_ref(), "cpu foo=4 20"); - let range = TimestampRange { - start: 0, - end: 1_000, - }; + let range = TimestampRange::new(0, 1_000); let pred1 = Arc::new(DeletePredicate { range, exprs: vec![DeleteExpr::new("foo".to_string(), Op::Eq, Scalar::I64(1))], diff --git a/db/src/pred.rs b/db/src/pred.rs index 482f6f56c5..0b8f456a2f 100644 --- a/db/src/pred.rs +++ b/db/src/pred.rs @@ -37,7 +37,7 @@ pub fn to_read_buffer_predicate(predicate: &Predicate) -> Result { - read_buffer::Predicate::with_time_range(&exprs, range.start, range.end) + read_buffer::Predicate::with_time_range(&exprs, range.start(), range.end()) } None => read_buffer::Predicate::new(exprs), }) diff --git 
a/db/src/replay.rs b/db/src/replay.rs index b26e77bd47..672c9dd869 100644 --- a/db/src/replay.rs +++ b/db/src/replay.rs @@ -2653,7 +2653,7 @@ mod tests { sequence_number: 1, table_name: None, predicate: DeletePredicate { - range: TimestampRange { start: 0, end: 20 }, + range: TimestampRange::new(0, 20), exprs: vec![], }, }]), @@ -2707,7 +2707,7 @@ mod tests { sequence_number: 1, table_name: None, predicate: DeletePredicate { - range: TimestampRange { start: 0, end: 11 }, + range: TimestampRange::new(0, 11), exprs: vec![], }, }]), @@ -2817,7 +2817,7 @@ mod tests { sequence_number: 2, table_name: Some("table_1"), predicate: DeletePredicate { - range: TimestampRange { start: 19, end: 21 }, + range: TimestampRange::new(19, 21), exprs: vec![], }, }]), diff --git a/dml/src/lib.rs b/dml/src/lib.rs index d789c7acc0..ca4b260c8a 100644 --- a/dml/src/lib.rs +++ b/dml/src/lib.rs @@ -593,7 +593,7 @@ mod tests { let meta = DmlMeta::unsequenced(None); let delete = DmlDelete::new( DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + range: TimestampRange::new(1, 2), exprs: vec![], }, None, @@ -615,7 +615,7 @@ mod tests { let meta = DmlMeta::unsequenced(None); let delete = DmlDelete::new( DeletePredicate { - range: TimestampRange { start: 3, end: 4 }, + range: TimestampRange::new(3, 4), exprs: vec![], }, Some(NonEmptyString::new("some_foo").unwrap()), @@ -630,7 +630,7 @@ mod tests { let meta = DmlMeta::unsequenced(None); let delete = DmlDelete::new( DeletePredicate { - range: TimestampRange { start: 5, end: 6 }, + range: TimestampRange::new(5, 6), exprs: vec![], }, Some(NonEmptyString::new("bar").unwrap()), diff --git a/generated_types/src/delete_predicate.rs b/generated_types/src/delete_predicate.rs index 09e5e04329..2fa7b2126d 100644 --- a/generated_types/src/delete_predicate.rs +++ b/generated_types/src/delete_predicate.rs @@ -20,8 +20,8 @@ impl From for proto::Predicate { fn from(predicate: DeletePredicate) -> Self { proto::Predicate { range: Some(proto::TimestampRange { - start: predicate.range.start, - end: predicate.range.end, + start: predicate.range.start(), + end: predicate.range.end(), }), exprs: predicate.exprs.into_iter().map(Into::into).collect(), } @@ -35,10 +35,7 @@ impl TryFrom for DeletePredicate { let range = value.range.unwrap_field("range")?; Ok(Self { - range: TimestampRange { - start: range.start, - end: range.end, - }, + range: TimestampRange::new(range.start, range.end), exprs: value.exprs.repeated("exprs")?, }) } diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index 252ec87a0d..9cca8f74c8 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -14,7 +14,9 @@ dml = { path = "../dml" } generated_types = { path = "../generated_types" } influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format", "write_lp"] } influxdb_line_protocol = { path = "../influxdb_line_protocol" } +ingester = { path = "../ingester" } internal_types = { path = "../internal_types" } +iox_catalog = { path = "../iox_catalog" } iox_object_store = { path = "../iox_object_store" } job_registry = { path = "../job_registry" } logfmt = { path = "../logfmt" } diff --git a/influxdb_iox/src/commands/run/ingester.rs b/influxdb_iox/src/commands/run/ingester.rs new file mode 100644 index 0000000000..57ff80b6d6 --- /dev/null +++ b/influxdb_iox/src/commands/run/ingester.rs @@ -0,0 +1,121 @@ +//! 
Implementation of command line option for running ingester + +use std::sync::Arc; + +use crate::{ + clap_blocks::run_config::RunConfig, + influxdb_ioxd::{ + self, + server_type::{ + common_state::{CommonServerState, CommonServerStateError}, + ingester::IngesterServerType, + }, + }, +}; +use ingester::handler::IngestHandlerImpl; +use ingester::server::grpc::GrpcDelegate; +use ingester::server::{http::HttpDelegate, IngesterServer}; +use iox_catalog::interface::{Catalog, KafkaPartition}; +use iox_catalog::postgres::PostgresCatalog; +use observability_deps::tracing::*; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + #[error("Run: {0}")] + Run(#[from] influxdb_ioxd::Error), + + #[error("Cannot setup server: {0}")] + Setup(#[from] crate::influxdb_ioxd::server_type::database::setup::Error), + + #[error("Invalid config: {0}")] + InvalidConfig(#[from] CommonServerStateError), + + #[error("Catalog error: {0}")] + Catalog(#[from] iox_catalog::interface::Error), + + #[error("Kafka topic {0} not found in the catalog")] + KafkaTopicNotFound(String), +} + +pub type Result = std::result::Result; + +#[derive(Debug, clap::Parser)] +#[clap( + name = "run", + about = "Runs in ingester mode", + long_about = "Run the IOx ingester server.\n\nThe configuration options below can be \ + set either with the command line flags or with the specified environment \ + variable. If there is a file named '.env' in the current working directory, \ + it is sourced before loading the configuration. +Configuration is loaded from the following sources (highest precedence first): + - command line arguments + - user set environment variables + - .env file contents + - pre-configured default values" +)] +pub struct Config { + #[clap(flatten)] + pub(crate) run_config: RunConfig, + + /// Postgres connection string + #[clap(env = "INFLUXDB_IOX_CATALOG_DNS")] + pub catalog_dsn: String, + + /// Kafka connection string + #[clap(env = "INFLUXDB_IOX_KAFKA_CONNECTION")] + pub kafka_connection: String, + + /// Kafka topic name + #[clap(env = "INFLUXDB_IOX_KAFKA_TOPIC")] + pub kafka_topic: String, + + /// Kafka partition number to start (inclusive) range with + #[clap(env = "INFLUXDB_IOX_KAFKA_PARTITION_RANGE_START")] + pub kafka_partition_range_start: i32, + + /// Kafka partition number to end (inclusive) range with + #[clap(env = "INFLUXDB_IOX_KAFKA_PARTITION_RANGE_END")] + pub kafka_partition_range_end: i32, +} + +pub async fn command(config: Config) -> Result<()> { + let common_state = CommonServerState::from_config(config.run_config.clone())?; + + let catalog: Arc = Arc::new( + PostgresCatalog::connect( + "ingester", + iox_catalog::postgres::SCHEMA_NAME, + &config.catalog_dsn, + ) + .await?, + ); + + let kafka_topic = match catalog + .kafka_topics() + .get_by_name(&config.kafka_topic) + .await? + { + Some(k) => k, + None => return Err(Error::KafkaTopicNotFound(config.kafka_topic)), + }; + let kafka_partitions: Vec<_> = (config.kafka_partition_range_start + ..config.kafka_partition_range_end) + .map(KafkaPartition::new) + .collect(); + + let ingest_handler = Arc::new(IngestHandlerImpl::new( + kafka_topic, + kafka_partitions, + catalog, + )); + let http = HttpDelegate::new(Arc::clone(&ingest_handler)); + let grpc = GrpcDelegate::new(ingest_handler); + + let ingester = IngesterServer::new(http, grpc); + let server_type = Arc::new(IngesterServerType::new(ingester, &common_state)); + + info!("starting ingester"); + + Ok(influxdb_ioxd::main(common_state, server_type).await?) 
+} diff --git a/influxdb_iox/src/commands/run/mod.rs b/influxdb_iox/src/commands/run/mod.rs index 6533663f0f..68882ce6d9 100644 --- a/influxdb_iox/src/commands/run/mod.rs +++ b/influxdb_iox/src/commands/run/mod.rs @@ -3,6 +3,7 @@ use snafu::{ResultExt, Snafu}; use crate::clap_blocks::run_config::RunConfig; pub mod database; +pub mod ingester; pub mod router; pub mod router2; pub mod test; @@ -19,6 +20,9 @@ pub enum Error { #[snafu(display("Error in router2 subcommand: {}", source))] Router2Error { source: router2::Error }, + #[snafu(display("Error in ingester subcommand: {}", source))] + IngesterError { source: ingester::Error }, + #[snafu(display("Error in test subcommand: {}", source))] TestError { source: test::Error }, } @@ -43,6 +47,7 @@ impl Config { Some(Command::Database(config)) => &config.run_config, Some(Command::Router(config)) => &config.run_config, Some(Command::Router2(config)) => &config.run_config, + Some(Command::Ingester(config)) => &config.run_config, Some(Command::Test(config)) => &config.run_config, } } @@ -59,6 +64,9 @@ enum Command { /// Run the server in router2 mode Router2(router2::Config), + /// Run the server in ingester mode + Ingester(ingester::Config), + /// Run the server in test mode Test(test::Config), } @@ -76,6 +84,7 @@ pub async fn command(config: Config) -> Result<()> { Some(Command::Database(config)) => database::command(config).await.context(DatabaseSnafu), Some(Command::Router(config)) => router::command(config).await.context(RouterSnafu), Some(Command::Router2(config)) => router2::command(config).await.context(Router2Snafu), + Some(Command::Ingester(config)) => ingester::command(config).await.context(IngesterSnafu), Some(Command::Test(config)) => test::command(config).await.context(TestSnafu), } } diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/ingester.rs b/influxdb_iox/src/influxdb_ioxd/server_type/ingester.rs new file mode 100644 index 0000000000..ecb9bd7ad7 --- /dev/null +++ b/influxdb_iox/src/influxdb_ioxd/server_type/ingester.rs @@ -0,0 +1,100 @@ +use std::{ + fmt::{Debug, Display}, + sync::Arc, +}; + +use async_trait::async_trait; +use hyper::{Body, Request, Response}; +use ingester::server::IngesterServer; +use metric::Registry; +use tokio_util::sync::CancellationToken; +use trace::TraceCollector; + +use crate::influxdb_ioxd::{ + http::error::{HttpApiError, HttpApiErrorSource}, + rpc::RpcBuilderInput, + server_type::{common_state::CommonServerState, RpcError, ServerType}, +}; +use ingester::handler::IngestHandler; + +#[derive(Debug)] +pub struct IngesterServerType { + server: IngesterServer, + shutdown: CancellationToken, + trace_collector: Option>, +} + +impl IngesterServerType { + pub fn new(server: IngesterServer, common_state: &CommonServerState) -> Self { + Self { + server, + shutdown: CancellationToken::new(), + trace_collector: common_state.trace_collector(), + } + } +} + +#[async_trait] +impl ServerType for IngesterServerType { + type RouteError = IoxHttpErrorAdaptor; + + /// Return the [`metric::Registry`] used by the router. + fn metric_registry(&self) -> Arc { + self.server.metric_registry() + } + + /// Returns the trace collector for router traces. + fn trace_collector(&self) -> Option> { + self.trace_collector.as_ref().map(Arc::clone) + } + + /// Dispatches `req` to the router [`HttpDelegate`] delegate. 
+ /// + /// [`HttpDelegate`]: router2::server::http::HttpDelegate + async fn route_http_request( + &self, + _req: Request, + ) -> Result, Self::RouteError> { + unimplemented!(); + } + + /// Registers the services exposed by the router [`GrpcDelegate`] delegate. + /// + /// [`GrpcDelegate`]: router2::server::grpc::GrpcDelegate + async fn server_grpc(self: Arc, _builder_input: RpcBuilderInput) -> Result<(), RpcError> { + unimplemented!() + // let builder = setup_builder!(builder_input, self); + // add_service!(builder, self.server.grpc().write_service()); + // serve_builder!(builder); + // + // Ok(()) + } + + async fn join(self: Arc) { + self.shutdown.cancelled().await; + } + + fn shutdown(&self) { + self.shutdown.cancel(); + } +} + +/// This adaptor converts the `ingester` http error type into a type that +/// satisfies the requirements of influxdb_ioxd's runner framework, keeping the +/// two decoupled. +#[derive(Debug)] +pub struct IoxHttpErrorAdaptor(router2::server::http::Error); + +impl Display for IoxHttpErrorAdaptor { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Display::fmt(&self.0, f) + } +} + +impl std::error::Error for IoxHttpErrorAdaptor {} + +impl HttpApiErrorSource for IoxHttpErrorAdaptor { + fn to_http_api_error(&self) -> HttpApiError { + HttpApiError::new(self.0.as_status_code(), self.to_string()) + } +} diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/mod.rs b/influxdb_iox/src/influxdb_ioxd/server_type/mod.rs index ade35bd93c..59da6bf504 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/mod.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/mod.rs @@ -10,6 +10,7 @@ use crate::influxdb_ioxd::{http::error::HttpApiErrorSource, rpc::RpcBuilderInput pub mod common_state; pub mod database; +pub mod ingester; pub mod router; pub mod router2; pub mod test; diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/router/http.rs b/influxdb_iox/src/influxdb_ioxd/server_type/router/http.rs index b9a2716f5a..d27afba29a 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/router/http.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/router/http.rs @@ -177,7 +177,7 @@ mod tests { check_response("delete", response, StatusCode::NO_CONTENT, Some("")).await; let predicate = DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + range: TimestampRange::new(1, 2), exprs: vec![DeleteExpr { column: String::from("foo"), op: data_types::delete_predicate::Op::Eq, diff --git a/influxdb_iox/tests/end_to_end_cases/delete_api.rs b/influxdb_iox/tests/end_to_end_cases/delete_api.rs index 765216a063..a0348c3ea0 100644 --- a/influxdb_iox/tests/end_to_end_cases/delete_api.rs +++ b/influxdb_iox/tests/end_to_end_cases/delete_api.rs @@ -66,10 +66,7 @@ async fn test_delete_on_database() { // Delete some data let table = "cpu"; let pred = DeletePredicate { - range: TimestampRange { - start: 100, - end: 120, - }, + range: TimestampRange::new(100, 120), exprs: vec![DeleteExpr { column: String::from("region"), op: data_types::delete_predicate::Op::Eq, @@ -169,10 +166,7 @@ pub async fn test_delete_on_router() { let table = "cpu"; let pred = DeletePredicate { - range: TimestampRange { - start: 100, - end: 120, - }, + range: TimestampRange::new(100, 120), exprs: vec![DeleteExpr { column: String::from("region"), op: data_types::delete_predicate::Op::Eq, diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml index 901ad78cef..a988285b1f 100644 --- a/ingester/Cargo.toml +++ b/ingester/Cargo.toml @@ -9,7 +9,9 @@ arrow = { version = "7.0", features = ["prettyprint"] } 
arrow_util = { path = "../arrow_util" } datafusion = { path = "../datafusion" } data_types = { path = "../data_types" } +hyper = "0.14" iox_catalog = { path = "../iox_catalog" } +metric = { path = "../metric" } mutable_batch = { path = "../mutable_batch"} parking_lot = "0.11.2" predicate = { path = "../predicate" } @@ -17,5 +19,6 @@ query = { path = "../query" } schema = { path = "../schema" } snafu = "0.7" tokio = { version = "1.13", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] } +thiserror = "1.0" uuid = { version = "0.8", features = ["v4"] } workspace-hack = { path = "../workspace-hack"} diff --git a/ingester/src/data.rs b/ingester/src/data.rs index c682c23b39..e3e30641e2 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -6,10 +6,10 @@ use data_types::delete_predicate::DeletePredicate; use std::{collections::BTreeMap, sync::Arc}; use uuid::Uuid; -use crate::server::IngesterServer; +use crate::handler::IngestHandlerImpl; use iox_catalog::interface::{ - KafkaPartition, KafkaTopicId, NamespaceId, PartitionId, RepoCollection, SequenceNumber, - SequencerId, TableId, Tombstone, + KafkaPartition, KafkaTopicId, NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId, + Tombstone, }; use mutable_batch::MutableBatch; use parking_lot::RwLock; @@ -55,11 +55,9 @@ pub struct Sequencers { impl Sequencers { /// One time initialize Sequencers of this Ingester - pub async fn initialize( - ingester: &IngesterServer<'_, T>, - ) -> Result { + pub async fn initialize(ingester: &IngestHandlerImpl) -> Result { // Get sequencer ids from the catalog - let sequencer_repro = ingester.iox_catalog.sequencer(); + let sequencer_repro = ingester.iox_catalog.sequencers(); let mut sequencers = BTreeMap::default(); let topic = ingester.get_topic(); for shard in ingester.get_kafka_partitions() { diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs new file mode 100644 index 0000000000..58d9c23188 --- /dev/null +++ b/ingester/src/handler.rs @@ -0,0 +1,62 @@ +//! 
Ingest handler + +use std::sync::Arc; + +use iox_catalog::interface::{Catalog, KafkaPartition, KafkaTopic, KafkaTopicId}; +use std::fmt::Formatter; + +/// The [`IngestHandler`] handles all ingest from kafka, persistence and queries +pub trait IngestHandler {} + +/// Implementation of the `IngestHandler` trait to ingest from kafka and manage persistence and answer queries +pub struct IngestHandlerImpl { + /// Kafka Topic assigned to this ingester + kafka_topic: KafkaTopic, + /// Kafka Partitions (Shards) assigned to this INgester + kafka_partitions: Vec, + /// Catalog of this ingester + pub iox_catalog: Arc, +} + +impl std::fmt::Debug for IngestHandlerImpl { + fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result { + todo!() + } +} + +impl IngestHandlerImpl { + /// Initialize the Ingester + pub fn new( + topic: KafkaTopic, + shard_ids: Vec, + catalog: Arc, + ) -> Self { + Self { + kafka_topic: topic, + kafka_partitions: shard_ids, + iox_catalog: catalog, + } + } + + /// Return a kafka topic + pub fn get_topic(&self) -> KafkaTopic { + self.kafka_topic.clone() + } + + /// Return a kafka topic id + pub fn get_topic_id(&self) -> KafkaTopicId { + self.kafka_topic.id + } + + /// Return a kafka topic name + pub fn get_topic_name(&self) -> String { + self.kafka_topic.name.clone() + } + + /// Return Kafka Partitions + pub fn get_kafka_partitions(&self) -> Vec { + self.kafka_partitions.clone() + } +} + +impl IngestHandler for IngestHandlerImpl {} diff --git a/ingester/src/lib.rs b/ingester/src/lib.rs index 097e3242e0..68fd09a53d 100644 --- a/ingester/src/lib.rs +++ b/ingester/src/lib.rs @@ -16,4 +16,5 @@ pub mod compact; pub mod data; pub mod query; +pub mod handler; pub mod server; diff --git a/ingester/src/server.rs b/ingester/src/server.rs index 11ce6dc553..324b8cbf5b 100644 --- a/ingester/src/server.rs +++ b/ingester/src/server.rs @@ -1,54 +1,49 @@ -//! Ingester Server -//! +//! Ingester server entrypoint. use std::sync::Arc; -use iox_catalog::interface::{KafkaPartition, KafkaTopic, KafkaTopicId, RepoCollection}; +use self::{grpc::GrpcDelegate, http::HttpDelegate}; +use crate::handler::IngestHandler; +use std::fmt::Debug; -/// The [`IngesterServer`] manages the lifecycle and contains all state for -/// an `ingester` server instance. -pub struct IngesterServer<'a, T> -where - T: RepoCollection + Send + Sync, -{ - /// Kafka Topic assigned to this ingester - kafka_topic: KafkaTopic, - /// Kafka Partitions (Shards) assigned to this INgester - kafka_partitions: Vec, - /// Catalog of this ingester - pub iox_catalog: &'a Arc, +pub mod grpc; +pub mod http; + +/// The [`IngesterServer`] manages the lifecycle and contains all state for a +/// `ingester` server instance. +#[derive(Debug, Default)] +pub struct IngesterServer { + metrics: Arc, + + http: HttpDelegate, + grpc: GrpcDelegate, } -impl<'a, T> IngesterServer<'a, T> -where - T: RepoCollection + Send + Sync, -{ - /// Initialize the Ingester - pub fn new(topic: KafkaTopic, shard_ids: Vec, catalog: &'a Arc) -> Self { +impl IngesterServer { + /// Initialise a new [`IngesterServer`] using the provided HTTP and gRPC + /// handlers. 
+ pub fn new(http: HttpDelegate, grpc: GrpcDelegate) -> Self { Self { - kafka_topic: topic, - kafka_partitions: shard_ids, - iox_catalog: catalog, + metrics: Default::default(), + http, + grpc, } } - /// Return a kafka topic - pub fn get_topic(&self) -> KafkaTopic { - self.kafka_topic.clone() - } - - /// Return a kafka topic id - pub fn get_topic_id(&self) -> KafkaTopicId { - self.kafka_topic.id - } - - /// Return a kafka topic name - pub fn get_topic_name(&self) -> String { - self.kafka_topic.name.clone() - } - - /// Return Kafka Partitions - pub fn get_kafka_partitions(&self) -> Vec { - self.kafka_partitions.clone() + /// Return the [`metric::Registry`] used by the router. + pub fn metric_registry(&self) -> Arc { + Arc::clone(&self.metrics) + } +} + +impl IngesterServer { + /// Get a reference to the router http delegate. + pub fn http(&self) -> &HttpDelegate { + &self.http + } + + /// Get a reference to the router grpc delegate. + pub fn grpc(&self) -> &GrpcDelegate { + &self.grpc } } diff --git a/ingester/src/server/grpc.rs b/ingester/src/server/grpc.rs new file mode 100644 index 0000000000..09f460ed8e --- /dev/null +++ b/ingester/src/server/grpc.rs @@ -0,0 +1,20 @@ +//! gRPC service implementations for `ingester`. + +use crate::handler::IngestHandler; +use std::sync::Arc; + +/// This type is responsible for managing all gRPC services exposed by +/// `ingester`. +#[derive(Debug, Default)] +pub struct GrpcDelegate { + #[allow(dead_code)] + ingest_handler: Arc, +} + +impl GrpcDelegate { + /// Initialise a new [`GrpcDelegate`] passing valid requests to the + /// specified `ingest_handler`. + pub fn new(ingest_handler: Arc) -> Self { + Self { ingest_handler } + } +} diff --git a/ingester/src/server/http.rs b/ingester/src/server/http.rs new file mode 100644 index 0000000000..09a4d26fa1 --- /dev/null +++ b/ingester/src/server/http.rs @@ -0,0 +1,50 @@ +//! HTTP service implementations for `ingester`. + +use crate::handler::IngestHandler; +use hyper::{Body, Request, Response, StatusCode}; +use std::sync::Arc; +use thiserror::Error; + +/// Errors returned by the `router2` HTTP request handler. +#[derive(Debug, Error, Copy, Clone)] +pub enum Error { + /// The requested path has no registered handler. + #[error("not found")] + NotFound, +} + +impl Error { + /// Convert the error into an appropriate [`StatusCode`] to be returned to + /// the end user. + pub fn as_status_code(&self) -> StatusCode { + match self { + Error::NotFound => StatusCode::NOT_FOUND, + } + } +} + +/// This type is responsible for servicing requests to the `ingester` HTTP +/// endpoint. +/// +/// Requests to some paths may be handled externally by the caller - the IOx +/// server runner framework takes care of implementing the heath endpoint, +/// metrics, pprof, etc. +#[derive(Debug, Default)] +pub struct HttpDelegate { + #[allow(dead_code)] + ingest_handler: Arc, +} + +impl HttpDelegate { + /// Initialise a new [`HttpDelegate`] passing valid requests to the + /// specified `ingest_handler`. + pub fn new(ingest_handler: Arc) -> Self { + Self { ingest_handler } + } + + /// Routes `req` to the appropriate handler, if any, returning the handler + /// response. 
+ pub fn route(&self, _req: Request) -> Result, Error> { + unimplemented!() + } +} diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index d72e91a4ee..01431ae969 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -6,7 +6,6 @@ use snafu::{OptionExt, Snafu}; use std::collections::BTreeMap; use std::convert::TryFrom; use std::fmt::Formatter; -use std::sync::Arc; use uuid::Uuid; #[derive(Debug, Snafu)] @@ -247,32 +246,32 @@ impl std::fmt::Display for ParquetFileId { } } -/// Container that can return repos for each of the catalog data types. +/// Trait that contains methods for working with the catalog #[async_trait] -pub trait RepoCollection { +pub trait Catalog: Send + Sync { /// repo for kafka topics - fn kafka_topic(&self) -> Arc; + fn kafka_topics(&self) -> &dyn KafkaTopicRepo; /// repo fo rquery pools - fn query_pool(&self) -> Arc; + fn query_pools(&self) -> &dyn QueryPoolRepo; /// repo for namespaces - fn namespace(&self) -> Arc; + fn namespaces(&self) -> &dyn NamespaceRepo; /// repo for tables - fn table(&self) -> Arc; + fn tables(&self) -> &dyn TableRepo; /// repo for columns - fn column(&self) -> Arc; + fn columns(&self) -> &dyn ColumnRepo; /// repo for sequencers - fn sequencer(&self) -> Arc; + fn sequencers(&self) -> &dyn SequencerRepo; /// repo for partitions - fn partition(&self) -> Arc; + fn partitions(&self) -> &dyn PartitionRepo; /// repo for tombstones - fn tombstone(&self) -> Arc; + fn tombstones(&self) -> &dyn TombstoneRepo; /// repo for parquet_files - fn parquet_file(&self) -> Arc; + fn parquet_files(&self) -> &dyn ParquetFileRepo; } /// Functions for working with Kafka topics in the catalog. #[async_trait] -pub trait KafkaTopicRepo { +pub trait KafkaTopicRepo: Send + Sync { /// Creates the kafka topic in the catalog or gets the existing record by name. async fn create_or_get(&self, name: &str) -> Result; @@ -282,14 +281,14 @@ pub trait KafkaTopicRepo { /// Functions for working with query pools in the catalog. #[async_trait] -pub trait QueryPoolRepo { +pub trait QueryPoolRepo: Send + Sync { /// Creates the query pool in the catalog or gets the existing record by name. async fn create_or_get(&self, name: &str) -> Result; } /// Functions for working with namespaces in the catalog #[async_trait] -pub trait NamespaceRepo { +pub trait NamespaceRepo: Send + Sync { /// Creates the namespace in the catalog. If one by the same name already exists, an /// error is returned. async fn create( @@ -306,7 +305,7 @@ pub trait NamespaceRepo { /// Functions for working with tables in the catalog #[async_trait] -pub trait TableRepo { +pub trait TableRepo: Send + Sync { /// Creates the table in the catalog or get the existing record by name. async fn create_or_get(&self, name: &str, namespace_id: NamespaceId) -> Result; @@ -316,7 +315,7 @@ pub trait TableRepo { /// Functions for working with columns in the catalog #[async_trait] -pub trait ColumnRepo { +pub trait ColumnRepo: Send + Sync { /// Creates the column in the catalog or returns the existing column. Will return a /// `Error::ColumnTypeMismatch` if the existing column type doesn't match the type /// the caller is attempting to create. 
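The hunks above replace the old `RepoCollection` container, whose accessors returned `Arc<dyn ...Repo>` handles, with a `Catalog: Send + Sync` trait whose accessors hand out plain `&dyn ...Repo` borrows. A minimal caller-side sketch of the new shape, assuming only the trait signatures shown in this file; the function name and topic string are illustrative, not taken from the diff:

    use iox_catalog::interface::{Catalog, KafkaTopic, Result};

    // Repos are borrowed from the catalog handle rather than cloned out as Arcs.
    async fn ensure_topic(catalog: &dyn Catalog) -> Result<KafkaTopic> {
        // `kafka_topics()` returns `&dyn KafkaTopicRepo`; its `create_or_get` is
        // async (via #[async_trait]) and creates or fetches the named topic.
        catalog.kafka_topics().create_or_get("iox-shared").await
    }
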
@@ -333,7 +332,7 @@ pub trait ColumnRepo { /// Functions for working with sequencers in the catalog #[async_trait] -pub trait SequencerRepo { +pub trait SequencerRepo: Send + Sync { /// create a sequencer record for the kafka topic and partition or return the existing record async fn create_or_get( &self, @@ -358,7 +357,7 @@ pub trait SequencerRepo { /// Functions for working with IOx partitions in the catalog. Note that these are how /// IOx splits up data within a database, which is differenet than Kafka partitions. #[async_trait] -pub trait PartitionRepo { +pub trait PartitionRepo: Send + Sync { /// create or get a partition record for the given partition key, sequencer and table async fn create_or_get( &self, @@ -373,7 +372,7 @@ pub trait PartitionRepo { /// Functions for working with tombstones in the catalog #[async_trait] -pub trait TombstoneRepo { +pub trait TombstoneRepo: Send + Sync { /// create or get a tombstone async fn create_or_get( &self, @@ -397,7 +396,7 @@ pub trait TombstoneRepo { /// Functions for working with parquet file pointers in the catalog #[async_trait] -pub trait ParquetFileRepo { +pub trait ParquetFileRepo: Send + Sync { /// create the parquet file #[allow(clippy::too_many_arguments)] async fn create( @@ -519,22 +518,19 @@ impl NamespaceSchema { } /// Gets the namespace schema including all tables and columns. -pub async fn get_schema_by_name( +pub async fn get_schema_by_name( name: &str, - repo: &T, + catalog: &dyn Catalog, ) -> Result> { - let namespace_repo = repo.namespace(); - let table_repo = repo.table(); - let column_repo = repo.column(); - - let namespace = namespace_repo + let namespace = catalog + .namespaces() .get_by_name(name) .await? .context(NamespaceNotFoundSnafu { name })?; // get the columns first just in case someone else is creating schema while we're doing this. 
- let columns = column_repo.list_by_namespace_id(namespace.id).await?; - let tables = table_repo.list_by_namespace_id(namespace.id).await?; + let columns = catalog.columns().list_by_namespace_id(namespace.id).await?; + let tables = catalog.tables().list_by_namespace_id(namespace.id).await?; let mut namespace = NamespaceSchema::new( namespace.id, @@ -813,25 +809,22 @@ pub struct ParquetFile { pub(crate) mod test_helpers { use super::*; use futures::{stream::FuturesOrdered, StreamExt}; + use std::sync::Arc; - pub(crate) async fn test_repo(new_repo: F) - where - T: RepoCollection + Send + Sync, - F: Fn() -> T + Send + Sync, - { - test_kafka_topic(&new_repo()).await; - test_query_pool(&new_repo()).await; - test_namespace(&new_repo()).await; - test_table(&new_repo()).await; - test_column(&new_repo()).await; - test_sequencer(&new_repo()).await; - test_partition(&new_repo()).await; - test_tombstone(&new_repo()).await; - test_parquet_file(&new_repo()).await; + pub(crate) async fn test_catalog(catalog: Arc) { + test_kafka_topic(Arc::clone(&catalog)).await; + test_query_pool(Arc::clone(&catalog)).await; + test_namespace(Arc::clone(&catalog)).await; + test_table(Arc::clone(&catalog)).await; + test_column(Arc::clone(&catalog)).await; + test_sequencer(Arc::clone(&catalog)).await; + test_partition(Arc::clone(&catalog)).await; + test_tombstone(Arc::clone(&catalog)).await; + test_parquet_file(Arc::clone(&catalog)).await; } - async fn test_kafka_topic(repo: &T) { - let kafka_repo = repo.kafka_topic(); + async fn test_kafka_topic(catalog: Arc) { + let kafka_repo = catalog.kafka_topics(); let k = kafka_repo.create_or_get("foo").await.unwrap(); assert!(k.id > KafkaTopicId::new(0)); assert_eq!(k.name, "foo"); @@ -843,8 +836,8 @@ pub(crate) mod test_helpers { assert!(k3.is_none()); } - async fn test_query_pool(repo: &T) { - let query_repo = repo.query_pool(); + async fn test_query_pool(catalog: Arc) { + let query_repo = catalog.query_pools(); let q = query_repo.create_or_get("foo").await.unwrap(); assert!(q.id > QueryPoolId::new(0)); assert_eq!(q.name, "foo"); @@ -852,10 +845,10 @@ pub(crate) mod test_helpers { assert_eq!(q, q2); } - async fn test_namespace(repo: &T) { - let namespace_repo = repo.namespace(); - let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); - let pool = repo.query_pool().create_or_get("foo").await.unwrap(); + async fn test_namespace(catalog: Arc) { + let namespace_repo = catalog.namespaces(); + let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = catalog.query_pools().create_or_get("foo").await.unwrap(); let namespace_name = "test_namespace"; let namespace = namespace_repo @@ -881,53 +874,75 @@ pub(crate) mod test_helpers { assert_eq!(namespace, found); } - async fn test_table(repo: &T) { - let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); - let pool = repo.query_pool().create_or_get("foo").await.unwrap(); - let namespace = repo - .namespace() + async fn test_table(catalog: Arc) { + let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = catalog.query_pools().create_or_get("foo").await.unwrap(); + let namespace = catalog + .namespaces() .create("namespace_table_test", "inf", kafka.id, pool.id) .await .unwrap(); // test we can create or get a table - let table_repo = repo.table(); - let t = table_repo + let t = catalog + .tables() .create_or_get("test_table", namespace.id) .await .unwrap(); - let tt = table_repo + let tt = catalog + .tables() .create_or_get("test_table", namespace.id) .await 
.unwrap(); assert!(t.id > TableId::new(0)); assert_eq!(t, tt); - let tables = table_repo.list_by_namespace_id(namespace.id).await.unwrap(); + let tables = catalog + .tables() + .list_by_namespace_id(namespace.id) + .await + .unwrap(); assert_eq!(vec![t], tables); + + // test we can create a table of the same name in a different namespace + let namespace2 = catalog + .namespaces() + .create("two", "inf", kafka.id, pool.id) + .await + .unwrap(); + assert_ne!(namespace, namespace2); + let test_table = catalog + .tables() + .create_or_get("test_table", namespace2.id) + .await + .unwrap(); + assert_ne!(tt, test_table); + assert_eq!(test_table.namespace_id, namespace2.id) } - async fn test_column(repo: &T) { - let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); - let pool = repo.query_pool().create_or_get("foo").await.unwrap(); - let namespace = repo - .namespace() + async fn test_column(catalog: Arc) { + let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = catalog.query_pools().create_or_get("foo").await.unwrap(); + let namespace = catalog + .namespaces() .create("namespace_column_test", "inf", kafka.id, pool.id) .await .unwrap(); - let table = repo - .table() + let table = catalog + .tables() .create_or_get("test_table", namespace.id) .await .unwrap(); + assert_eq!(table.namespace_id, namespace.id); // test we can create or get a column - let column_repo = repo.column(); - let c = column_repo + let c = catalog + .columns() .create_or_get("column_test", table.id, ColumnType::Tag) .await .unwrap(); - let cc = column_repo + let cc = catalog + .columns() .create_or_get("column_test", table.id, ColumnType::Tag) .await .unwrap(); @@ -935,7 +950,8 @@ pub(crate) mod test_helpers { assert_eq!(c, cc); // test that attempting to create an already defined column of a different type returns error - let err = column_repo + let err = catalog + .columns() .create_or_get("column_test", table.id, ColumnType::U64) .await .expect_err("should error with wrong column type"); @@ -949,35 +965,40 @@ pub(crate) mod test_helpers { )); // test that we can create a column of the same name under a different table - let table2 = repo - .table() + let table2 = catalog + .tables() .create_or_get("test_table_2", namespace.id) .await .unwrap(); - let ccc = column_repo + let ccc = catalog + .columns() .create_or_get("column_test", table2.id, ColumnType::U64) .await .unwrap(); assert_ne!(c, ccc); - let columns = column_repo + let columns = catalog + .columns() .list_by_namespace_id(namespace.id) .await .unwrap(); assert_eq!(vec![c, ccc], columns); } - async fn test_sequencer(repo: &T) { - let kafka = repo - .kafka_topic() + async fn test_sequencer(catalog: Arc) { + let kafka = catalog + .kafka_topics() .create_or_get("sequencer_test") .await .unwrap(); - let sequencer_repo = repo.sequencer(); // Create 10 sequencers let created = (1..=10) - .map(|partition| sequencer_repo.create_or_get(&kafka, KafkaPartition::new(partition))) + .map(|partition| { + catalog + .sequencers() + .create_or_get(&kafka, KafkaPartition::new(partition)) + }) .collect::>() .map(|v| { let v = v.expect("failed to create sequencer"); @@ -987,7 +1008,8 @@ pub(crate) mod test_helpers { .await; // List them and assert they match - let listed = sequencer_repo + let listed = catalog + .sequencers() .list_by_kafka_topic(&kafka) .await .expect("failed to list sequencers") @@ -999,7 +1021,8 @@ pub(crate) mod test_helpers { // get by the sequencer id and partition let kafka_partition = KafkaPartition::new(1); - let 
sequencer = sequencer_repo + let sequencer = catalog + .sequencers() .get_by_topic_id_and_partition(kafka.id, kafka_partition) .await .unwrap() @@ -1007,42 +1030,45 @@ pub(crate) mod test_helpers { assert_eq!(kafka.id, sequencer.kafka_topic_id); assert_eq!(kafka_partition, sequencer.kafka_partition); - let sequencer = sequencer_repo + let sequencer = catalog + .sequencers() .get_by_topic_id_and_partition(kafka.id, KafkaPartition::new(523)) .await .unwrap(); assert!(sequencer.is_none()); } - async fn test_partition(repo: &T) { - let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); - let pool = repo.query_pool().create_or_get("foo").await.unwrap(); - let namespace = repo - .namespace() + async fn test_partition(catalog: Arc) { + let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = catalog.query_pools().create_or_get("foo").await.unwrap(); + let namespace = catalog + .namespaces() .create("namespace_partition_test", "inf", kafka.id, pool.id) .await .unwrap(); - let table = repo - .table() + let table = catalog + .tables() .create_or_get("test_table", namespace.id) .await .unwrap(); - let sequencer = repo - .sequencer() + let sequencer = catalog + .sequencers() .create_or_get(&kafka, KafkaPartition::new(1)) .await .unwrap(); - let other_sequencer = repo - .sequencer() + let other_sequencer = catalog + .sequencers() .create_or_get(&kafka, KafkaPartition::new(2)) .await .unwrap(); - let partition_repo = repo.partition(); - let created = ["foo", "bar"] .iter() - .map(|key| partition_repo.create_or_get(key, sequencer.id, table.id)) + .map(|key| { + catalog + .partitions() + .create_or_get(key, sequencer.id, table.id) + }) .collect::>() .map(|v| { let v = v.expect("failed to create partition"); @@ -1050,13 +1076,15 @@ pub(crate) mod test_helpers { }) .collect::>() .await; - let _ = partition_repo + let _ = catalog + .partitions() .create_or_get("asdf", other_sequencer.id, table.id) .await .unwrap(); // List them and assert they match - let listed = partition_repo + let listed = catalog + .partitions() .list_by_sequencer(sequencer.id) .await .expect("failed to list partitions") @@ -1067,34 +1095,34 @@ pub(crate) mod test_helpers { assert_eq!(created, listed); } - async fn test_tombstone(repo: &T) { - let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); - let pool = repo.query_pool().create_or_get("foo").await.unwrap(); - let namespace = repo - .namespace() + async fn test_tombstone(catalog: Arc) { + let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = catalog.query_pools().create_or_get("foo").await.unwrap(); + let namespace = catalog + .namespaces() .create("namespace_tombstone_test", "inf", kafka.id, pool.id) .await .unwrap(); - let table = repo - .table() + let table = catalog + .tables() .create_or_get("test_table", namespace.id) .await .unwrap(); - let other_table = repo - .table() + let other_table = catalog + .tables() .create_or_get("other", namespace.id) .await .unwrap(); - let sequencer = repo - .sequencer() + let sequencer = catalog + .sequencers() .create_or_get(&kafka, KafkaPartition::new(1)) .await .unwrap(); - let tombstone_repo = repo.tombstone(); let min_time = Timestamp::new(1); let max_time = Timestamp::new(10); - let t1 = tombstone_repo + let t1 = catalog + .tombstones() .create_or_get( table.id, sequencer.id, @@ -1111,7 +1139,8 @@ pub(crate) mod test_helpers { assert_eq!(t1.min_time, min_time); assert_eq!(t1.max_time, max_time); assert_eq!(t1.serialized_predicate, "whatevs"); - let t2 = 
tombstone_repo + let t2 = catalog + .tombstones() .create_or_get( other_table.id, sequencer.id, @@ -1122,7 +1151,8 @@ pub(crate) mod test_helpers { ) .await .unwrap(); - let t3 = tombstone_repo + let t3 = catalog + .tombstones() .create_or_get( table.id, sequencer.id, @@ -1134,43 +1164,44 @@ pub(crate) mod test_helpers { .await .unwrap(); - let listed = tombstone_repo + let listed = catalog + .tombstones() .list_tombstones_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1)) .await .unwrap(); assert_eq!(vec![t2, t3], listed); } - async fn test_parquet_file(repo: &T) { - let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); - let pool = repo.query_pool().create_or_get("foo").await.unwrap(); - let namespace = repo - .namespace() + async fn test_parquet_file(catalog: Arc) { + let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = catalog.query_pools().create_or_get("foo").await.unwrap(); + let namespace = catalog + .namespaces() .create("namespace_parquet_file_test", "inf", kafka.id, pool.id) .await .unwrap(); - let table = repo - .table() + let table = catalog + .tables() .create_or_get("test_table", namespace.id) .await .unwrap(); - let other_table = repo - .table() + let other_table = catalog + .tables() .create_or_get("other", namespace.id) .await .unwrap(); - let sequencer = repo - .sequencer() + let sequencer = catalog + .sequencers() .create_or_get(&kafka, KafkaPartition::new(1)) .await .unwrap(); - let partition = repo - .partition() + let partition = catalog + .partitions() .create_or_get("one", sequencer.id, table.id) .await .unwrap(); - let other_partition = repo - .partition() + let other_partition = catalog + .partitions() .create_or_get("one", sequencer.id, other_table.id) .await .unwrap(); @@ -1178,7 +1209,7 @@ pub(crate) mod test_helpers { let min_time = Timestamp::new(1); let max_time = Timestamp::new(10); - let parquet_repo = repo.parquet_file(); + let parquet_repo = catalog.parquet_files(); let parquet_file = parquet_repo .create( sequencer.id, diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index a23de38af4..3c5c5f2356 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -12,8 +12,8 @@ )] use crate::interface::{ - column_type_from_field, ColumnSchema, ColumnType, Error, KafkaPartition, KafkaTopic, - NamespaceSchema, QueryPool, RepoCollection, Result, Sequencer, SequencerId, TableId, + column_type_from_field, Catalog, ColumnSchema, ColumnType, Error, KafkaPartition, KafkaTopic, + NamespaceSchema, QueryPool, Result, Sequencer, SequencerId, TableId, }; use futures::{stream::FuturesOrdered, StreamExt}; use influxdb_line_protocol::ParsedLine; @@ -36,10 +36,10 @@ pub mod postgres; /// If another writer attempts to create a column of the same name with a different /// type at the same time and beats this caller to it, an error will be returned. If another /// writer adds the same schema before this one, then this will load that schema here. 
-pub async fn validate_or_insert_schema( +pub async fn validate_or_insert_schema( lines: Vec>, schema: &NamespaceSchema, - repo: &T, + catalog: &dyn Catalog, ) -> Result> { // table name to table_id let mut new_tables: BTreeMap = BTreeMap::new(); @@ -66,8 +66,8 @@ pub async fn validate_or_insert_schema( None => { let entry = new_columns.entry(table.id).or_default(); if entry.get(key.as_str()).is_none() { - let column_repo = repo.column(); - let column = column_repo + let column = catalog + .columns() .create_or_get(key.as_str(), table.id, ColumnType::Tag) .await?; entry.insert( @@ -97,8 +97,8 @@ pub async fn validate_or_insert_schema( let entry = new_columns.entry(table.id).or_default(); if entry.get(key.as_str()).is_none() { let data_type = column_type_from_field(value); - let column_repo = repo.column(); - let column = column_repo + let column = catalog + .columns() .create_or_get(key.as_str(), table.id, data_type) .await?; entry.insert( @@ -113,15 +113,16 @@ pub async fn validate_or_insert_schema( } } None => { - let table_repo = repo.table(); - let new_table = table_repo.create_or_get(table_name, schema.id).await?; + let new_table = catalog + .tables() + .create_or_get(table_name, schema.id) + .await?; let new_table_columns = new_columns.entry(new_table.id).or_default(); - let column_repo = repo.column(); - if let Some(tagset) = &line.series.tag_set { for (key, _) in tagset { - let new_column = column_repo + let new_column = catalog + .columns() .create_or_get(key.as_str(), new_table.id, ColumnType::Tag) .await?; new_table_columns.insert( @@ -135,7 +136,8 @@ pub async fn validate_or_insert_schema( } for (key, value) in &line.field_set { let data_type = column_type_from_field(value); - let new_column = column_repo + let new_column = catalog + .columns() .create_or_get(key.as_str(), new_table.id, data_type) .await?; new_table_columns.insert( @@ -146,7 +148,8 @@ pub async fn validate_or_insert_schema( }, ); } - let time_column = column_repo + let time_column = catalog + .columns() .create_or_get(TIME_COLUMN, new_table.id, ColumnType::Time) .await?; new_table_columns.insert( @@ -173,19 +176,25 @@ pub async fn validate_or_insert_schema( /// Creates or gets records in the catalog for the shared kafka topic, query pool, and sequencers for /// each of the partitions. 
-pub async fn create_or_get_default_records( +pub async fn create_or_get_default_records( kafka_partition_count: i32, - repo: &T, + catalog: &dyn Catalog, ) -> Result<(KafkaTopic, QueryPool, BTreeMap)> { - let kafka_repo = repo.kafka_topic(); - let query_repo = repo.query_pool(); - let sequencer_repo = repo.sequencer(); - - let kafka_topic = kafka_repo.create_or_get(SHARED_KAFKA_TOPIC).await?; - let query_pool = query_repo.create_or_get(SHARED_QUERY_POOL).await?; + let kafka_topic = catalog + .kafka_topics() + .create_or_get(SHARED_KAFKA_TOPIC) + .await?; + let query_pool = catalog + .query_pools() + .create_or_get(SHARED_QUERY_POOL) + .await?; let sequencers = (1..=kafka_partition_count) - .map(|partition| sequencer_repo.create_or_get(&kafka_topic, KafkaPartition::new(partition))) + .map(|partition| { + catalog + .sequencers() + .create_or_get(&kafka_topic, KafkaPartition::new(partition)) + }) .collect::>() .map(|v| { let v = v.expect("failed to create sequencer"); @@ -207,13 +216,13 @@ mod tests { #[tokio::test] async fn test_validate_or_insert_schema() { - let repo = Arc::new(MemCatalog::new()); + let repo = MemCatalog::new(); let (kafka_topic, query_pool, _) = create_or_get_default_records(2, &repo).await.unwrap(); let namespace_name = "validate_schema"; // now test with a new namespace let namespace = repo - .namespace() + .namespaces() .create(namespace_name, "inf", kafka_topic.id, query_pool.id) .await .unwrap(); diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index c4cf0333b1..b5e634ea81 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -2,16 +2,16 @@ //! used for testing or for an IOx designed to run without catalog persistence. use crate::interface::{ - Column, ColumnId, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId, - KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile, ParquetFileId, - ParquetFileRepo, Partition, PartitionId, PartitionRepo, QueryPool, QueryPoolId, QueryPoolRepo, - RepoCollection, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId, + Catalog, Column, ColumnId, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, + KafkaTopicId, KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile, + ParquetFileId, ParquetFileRepo, Partition, PartitionId, PartitionRepo, QueryPool, QueryPoolId, + QueryPoolRepo, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId, TableRepo, Timestamp, Tombstone, TombstoneId, TombstoneRepo, }; use async_trait::async_trait; use std::convert::TryFrom; use std::fmt::Formatter; -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; use uuid::Uuid; /// In-memory catalog that implements the `RepoCollection` and individual repo traits from @@ -48,41 +48,41 @@ struct MemCollections { parquet_files: Vec, } -impl RepoCollection for Arc { - fn kafka_topic(&self) -> Arc { - Self::clone(self) as Arc +impl Catalog for MemCatalog { + fn kafka_topics(&self) -> &dyn KafkaTopicRepo { + self } - fn query_pool(&self) -> Arc { - Self::clone(self) as Arc + fn query_pools(&self) -> &dyn QueryPoolRepo { + self } - fn namespace(&self) -> Arc { - Self::clone(self) as Arc + fn namespaces(&self) -> &dyn NamespaceRepo { + self } - fn table(&self) -> Arc { - Self::clone(self) as Arc + fn tables(&self) -> &dyn TableRepo { + self } - fn column(&self) -> Arc { - Self::clone(self) as Arc + fn columns(&self) -> &dyn ColumnRepo { + self } - fn sequencer(&self) -> Arc { - Self::clone(self) as Arc + fn sequencers(&self) -> &dyn SequencerRepo { + 
self } - fn partition(&self) -> Arc { - Self::clone(self) as Arc + fn partitions(&self) -> &dyn PartitionRepo { + self } - fn tombstone(&self) -> Arc { - Self::clone(self) as Arc + fn tombstones(&self) -> &dyn TombstoneRepo { + self } - fn parquet_file(&self) -> Arc { - Self::clone(self) as Arc + fn parquet_files(&self) -> &dyn ParquetFileRepo { + self } } @@ -180,7 +180,11 @@ impl TableRepo for MemCatalog { async fn create_or_get(&self, name: &str, namespace_id: NamespaceId) -> Result
{ let mut collections = self.collections.lock().expect("mutex poisoned"); - let table = match collections.tables.iter().find(|t| t.name == name) { + let table = match collections + .tables + .iter() + .find(|t| t.name == name && t.namespace_id == namespace_id) + { Some(t) => t, None => { let table = Table { @@ -250,18 +254,20 @@ impl ColumnRepo for MemCatalog { } async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result> { - let mut columns = vec![]; - let collections = self.collections.lock().expect("mutex poisoned"); - for t in collections + + let table_ids: Vec<_> = collections .tables .iter() .filter(|t| t.namespace_id == namespace_id) - { - for c in collections.columns.iter().filter(|c| c.table_id == t.id) { - columns.push(c.clone()); - } - } + .map(|t| t.id) + .collect(); + let columns: Vec<_> = collections + .columns + .iter() + .filter(|c| table_ids.contains(&c.table_id)) + .cloned() + .collect(); Ok(columns) } @@ -488,11 +494,10 @@ impl ParquetFileRepo for MemCatalog { #[cfg(test)] mod tests { use super::*; + use std::sync::Arc; #[tokio::test] - async fn test_mem_repo() { - let f = || Arc::new(MemCatalog::new()); - - crate::interface::test_helpers::test_repo(f).await; + async fn test_catalog() { + crate::interface::test_helpers::test_catalog(Arc::new(MemCatalog::new())).await; } } diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 2b052a9738..d790500427 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -1,24 +1,23 @@ //! A Postgres backed implementation of the Catalog use crate::interface::{ - Column, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId, + Catalog, Column, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId, KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile, ParquetFileId, ParquetFileRepo, Partition, PartitionId, PartitionRepo, QueryPool, QueryPoolId, QueryPoolRepo, - RepoCollection, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId, - TableRepo, Timestamp, Tombstone, TombstoneRepo, + Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId, TableRepo, + Timestamp, Tombstone, TombstoneRepo, }; use async_trait::async_trait; use observability_deps::tracing::info; use sqlx::{postgres::PgPoolOptions, Executor, Pool, Postgres}; -use std::sync::Arc; use std::time::Duration; use uuid::Uuid; const MAX_CONNECTIONS: u32 = 5; const CONNECT_TIMEOUT: Duration = Duration::from_secs(2); const IDLE_TIMEOUT: Duration = Duration::from_secs(500); -#[allow(dead_code)] -const SCHEMA_NAME: &str = "iox_catalog"; +/// the default schema name to use in Postgres +pub const SCHEMA_NAME: &str = "iox_catalog"; /// In-memory catalog that implements the `RepoCollection` and individual repo traits.
#[derive(Debug)] @@ -62,41 +61,41 @@ impl PostgresCatalog { } } -impl RepoCollection for Arc { - fn kafka_topic(&self) -> Arc { - Self::clone(self) as Arc +impl Catalog for PostgresCatalog { + fn kafka_topics(&self) -> &dyn KafkaTopicRepo { + self } - fn query_pool(&self) -> Arc { - Self::clone(self) as Arc + fn query_pools(&self) -> &dyn QueryPoolRepo { + self } - fn namespace(&self) -> Arc { - Self::clone(self) as Arc + fn namespaces(&self) -> &dyn NamespaceRepo { + self } - fn table(&self) -> Arc { - Self::clone(self) as Arc + fn tables(&self) -> &dyn TableRepo { + self } - fn column(&self) -> Arc { - Self::clone(self) as Arc + fn columns(&self) -> &dyn ColumnRepo { + self } - fn sequencer(&self) -> Arc { - Self::clone(self) as Arc + fn sequencers(&self) -> &dyn SequencerRepo { + self } - fn partition(&self) -> Arc { - Self::clone(self) as Arc + fn partitions(&self) -> &dyn PartitionRepo { + self } - fn tombstone(&self) -> Arc { - Self::clone(self) as Arc + fn tombstones(&self) -> &dyn TombstoneRepo { + self } - fn parquet_file(&self) -> Arc { - Self::clone(self) as Arc + fn parquet_files(&self) -> &dyn ParquetFileRepo { + self } } @@ -586,6 +585,7 @@ fn is_fk_violation(e: &sqlx::Error) -> bool { mod tests { use super::*; use std::env; + use std::sync::Arc; // Helper macro to skip tests if TEST_INTEGRATION and the AWS environment variables are not set. macro_rules! maybe_skip_integration { @@ -624,17 +624,15 @@ mod tests { }}; } - async fn setup_db() -> Arc { + async fn setup_db() -> PostgresCatalog { let dsn = std::env::var("DATABASE_URL").unwrap(); - Arc::new( - PostgresCatalog::connect("test", SCHEMA_NAME, &dsn) - .await - .unwrap(), - ) + PostgresCatalog::connect("test", SCHEMA_NAME, &dsn) + .await + .unwrap() } #[tokio::test] - async fn test_repo() { + async fn test_catalog() { // If running an integration test on your laptop, this requires that you have Postgres // running and that you've done the sqlx migrations. See the README in this crate for // info to set it up. @@ -642,10 +640,9 @@ mod tests { let postgres = setup_db().await; clear_schema(&postgres.pool).await; + let postgres: Arc = Arc::new(postgres); - let f = || Arc::clone(&postgres); - - crate::interface::test_helpers::test_repo(f).await; + crate::interface::test_helpers::test_catalog(postgres).await; } async fn clear_schema(pool: &Pool) { diff --git a/parquet_catalog/src/test_helpers.rs b/parquet_catalog/src/test_helpers.rs index 6e1ae4ab73..2c49105e7d 100644 --- a/parquet_catalog/src/test_helpers.rs +++ b/parquet_catalog/src/test_helpers.rs @@ -545,7 +545,7 @@ fn get_sorted_keys<'a>( /// Helper to create a simple delete predicate. 
pub fn create_delete_predicate(value: i64) -> Arc { Arc::new(DeletePredicate { - range: TimestampRange { start: 11, end: 22 }, + range: TimestampRange::new(11, 22), exprs: vec![DeleteExpr::new( "foo".to_string(), Op::Eq, diff --git a/predicate/Cargo.toml b/predicate/Cargo.toml index c1c638b1f5..52c2cc093e 100644 --- a/predicate/Cargo.toml +++ b/predicate/Cargo.toml @@ -13,6 +13,7 @@ schema = { path = "../schema" } observability_deps = { path = "../observability_deps" } ordered-float = "2" regex = "1" +regex-syntax = "0.6.25" serde_json = "1.0.72" snafu = "0.7" sqlparser = "0.13.0" diff --git a/predicate/src/delete_predicate.rs b/predicate/src/delete_predicate.rs index 30dcf905da..d142a54e90 100644 --- a/predicate/src/delete_predicate.rs +++ b/predicate/src/delete_predicate.rs @@ -94,10 +94,7 @@ pub fn parse_delete_predicate( let delete_exprs = parse_predicate(predicate)?; Ok(DeletePredicate { - range: TimestampRange { - start: start_time, - end: stop_time, - }, + range: TimestampRange::new(start_time, stop_time), exprs: delete_exprs, }) } @@ -551,8 +548,8 @@ mod tests { let pred = r#"cost != 100"#; let result = parse_delete_predicate(start, stop, pred).unwrap(); - assert_eq!(result.range.start, 0); - assert_eq!(result.range.end, 200); + assert_eq!(result.range.start(), 0); + assert_eq!(result.range.end(), 200); let expected = vec![DeleteExpr::new( "cost".to_string(), diff --git a/predicate/src/predicate.rs b/predicate/src/predicate.rs index a31b57f3bb..6e4a7d98c4 100644 --- a/predicate/src/predicate.rs +++ b/predicate/src/predicate.rs @@ -124,7 +124,7 @@ impl Predicate { /// `range.start <= time and time < range.end` fn make_timestamp_predicate_expr(&self) -> Option { self.range - .map(|range| make_range_expr(range.start, range.end, TIME_COLUMN_NAME)) + .map(|range| make_range_expr(range.start(), range.end(), TIME_COLUMN_NAME)) } /// Returns true if ths predicate evaluates to true for all rows @@ -184,8 +184,8 @@ impl Predicate { // time_expr = NOT(start <= time_range <= end) // Equivalent to: (time < start OR time > end) let time_expr = col(TIME_COLUMN_NAME) - .lt(lit_timestamp_nano(range.start)) - .or(col(TIME_COLUMN_NAME).gt(lit_timestamp_nano(range.end))); + .lt(lit_timestamp_nano(range.start())) + .or(col(TIME_COLUMN_NAME).gt(lit_timestamp_nano(range.end()))); match expr { None => expr = Some(time_expr), @@ -215,7 +215,7 @@ impl Predicate { /// existing storage engine pub fn clear_timestamp_if_max_range(mut self) -> Self { self.range = self.range.take().and_then(|range| { - if range.start <= MIN_NANO_TIME && range.end >= MAX_NANO_TIME { + if range.start() <= MIN_NANO_TIME && range.end() >= MAX_NANO_TIME { None } else { Some(range) @@ -254,7 +254,7 @@ impl fmt::Display for Predicate { if let Some(range) = &self.range { // TODO: could be nice to show this as actual timestamps (not just numbers)? 
- write!(f, " range: [{} - {}]", range.start, range.end)?; + write!(f, " range: [{} - {}]", range.start(), range.end())?; } if !self.exprs.is_empty() { @@ -313,7 +313,7 @@ impl PredicateBuilder { "Unexpected re-definition of timestamp range" ); - self.inner.range = Some(TimestampRange { start, end }); + self.inner.range = Some(TimestampRange::new(start, end)); self } diff --git a/predicate/src/regex.rs b/predicate/src/regex.rs index dae111bfcb..37e0c6a327 100644 --- a/predicate/src/regex.rs +++ b/predicate/src/regex.rs @@ -30,6 +30,11 @@ pub fn regex_match_expr(input: Expr, pattern: String, matches: bool) -> Expr { // N.B., this function does not utilise the Arrow regexp compute kernel because // in order to act as a filter it needs to return a boolean array of comparison // results, not an array of strings as the regex compute kernel does. + + // Attempt to make the pattern compatible with what is accepted by + // the golang regexp library which is different than Rust's regexp + let pattern = clean_non_meta_escapes(pattern); + let func = move |args: &[ArrayRef]| { assert_eq!(args.len(), 1); // only works over a single column at a time. @@ -72,6 +77,79 @@ pub fn regex_match_expr(input: Expr, pattern: String, matches: bool) -> Expr { udf.call(vec![input]) } +fn is_valid_character_after_escape(c: char) -> bool { + // same list as https://docs.rs/regex-syntax/0.6.25/src/regex_syntax/ast/parse.rs.html#1445-1538 + match c { + '0'..='7' => true, + '8'..='9' => true, + 'x' | 'u' | 'U' => true, + 'p' | 'P' => true, + 'd' | 's' | 'w' | 'D' | 'S' | 'W' => true, + _ => regex_syntax::is_meta_character(c), + } +} + +/// Removes all `/` patterns that the rust regex library would reject +/// and rewrites them to their unescaped form. +/// +/// For example, `\:` is rewritten to `:` as `\:` is not a valid +/// escape sequence in the `regexp` crate but is valid in golang's +/// regexp implementation. +/// +/// This is done for compatibility purposes so that the regular +/// expression matching in Rust more closely follows the matching in +/// golang, used by the influx storage rpc. 
+/// +/// See for more details +fn clean_non_meta_escapes(pattern: String) -> String { + if pattern.is_empty() { + return pattern; + } + + #[derive(Debug, Copy, Clone)] + enum SlashState { + No, + Single, + Double, + } + + let mut next_state = SlashState::No; + + let next_chars = pattern + .chars() + .map(Some) + .skip(1) + .chain(std::iter::once(None)); + + // emit char based on previous + let new_pattern: String = pattern + .chars() + .zip(next_chars) + .filter_map(|(c, next_char)| { + let cur_state = next_state; + next_state = match (c, cur_state) { + ('\\', SlashState::No) => SlashState::Single, + ('\\', SlashState::Single) => SlashState::Double, + ('\\', SlashState::Double) => SlashState::Single, + _ => SlashState::No, + }; + + // Decide to emit `c` or not + match (cur_state, c, next_char) { + (SlashState::No, '\\', Some(next_char)) + | (SlashState::Double, '\\', Some(next_char)) + if !is_valid_character_after_escape(next_char) => + { + None + } + _ => Some(c), + } + }) + .collect(); + + new_pattern +} + #[cfg(test)] mod test { use arrow::{ @@ -88,6 +166,8 @@ mod test { }; use std::sync::Arc; + use super::clean_non_meta_escapes; + #[tokio::test] async fn regex_match_expr() { let cases = vec![ @@ -220,4 +300,37 @@ mod test { .map(|s| s.to_owned()) .collect()) } + + #[test] + fn test_clean_non_meta_escapes() { + let cases = vec![ + ("", ""), + (r#"\"#, r#"\"#), + (r#"\\"#, r#"\\"#), + // : is not a special meta character + (r#"\:"#, r#":"#), + // . is a special meta character + (r#"\."#, r#"\."#), + (r#"foo\"#, r#"foo\"#), + (r#"foo\\"#, r#"foo\\"#), + (r#"foo\:"#, r#"foo:"#), + (r#"foo\xff"#, r#"foo\xff"#), + (r#"fo\\o"#, r#"fo\\o"#), + (r#"fo\:o"#, r#"fo:o"#), + (r#"fo\:o\x123"#, r#"fo:o\x123"#), + (r#"fo\:o\x123\:"#, r#"fo:o\x123:"#), + (r#"foo\\\:bar"#, r#"foo\\:bar"#), + (r#"foo\\\:bar\\\:"#, r#"foo\\:bar\\:"#), + ("foo", "foo"), + ]; + + for (pattern, expected) in cases { + let cleaned_pattern = clean_non_meta_escapes(pattern.to_string()); + assert_eq!( + cleaned_pattern, expected, + "Expected '{}' to be cleaned to '{}', got '{}'", + pattern, expected, cleaned_pattern + ); + } + } } diff --git a/query_tests/src/influxrpc/read_filter.rs b/query_tests/src/influxrpc/read_filter.rs index 740cda4ebb..ea0a8aecb0 100644 --- a/query_tests/src/influxrpc/read_filter.rs +++ b/query_tests/src/influxrpc/read_filter.rs @@ -62,10 +62,7 @@ impl DbSetup for TwoMeasurementsMultiSeriesWithDelete { // 2 rows of h2o with timestamp 200 and 350 will be deleted let delete_table_name = "h2o"; let pred = DeletePredicate { - range: TimestampRange { - start: 120, - end: 250, - }, + range: TimestampRange::new(120, 250), exprs: vec![], }; @@ -104,10 +101,7 @@ impl DbSetup for TwoMeasurementsMultiSeriesWithDeleteAll { // pred: delete from h20 where 100 <= time <= 360 let delete_table_name = "h2o"; let pred = DeletePredicate { - range: TimestampRange { - start: 100, - end: 360, - }, + range: TimestampRange::new(100, 360), exprs: vec![], }; @@ -483,7 +477,7 @@ async fn test_read_filter_data_pred_using_regex_match() { .build(); let expected_results = vec![ - "Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [200], values: [90.0]", + "Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [200], values: [90.0]", ]; run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await; @@ -514,7 +508,7 @@ async fn test_read_filter_data_pred_using_regex_match_with_delete() { .build(); let expected_results = vec![ - "Series 
tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [350], values: [90.0]", + "Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [350], values: [90.0]", ]; run_read_filter_test_case( TwoMeasurementsMultiSeriesWithDelete {}, @@ -542,14 +536,59 @@ async fn test_read_filter_data_pred_using_regex_not_match() { .build(); let expected_results = vec![ - "Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [250], values: [72.4]", - "Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n FloatPoints timestamps: [250], values: [51.0]", - "Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [250], values: [53.4]", + "Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [250], values: [72.4]", + "Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n FloatPoints timestamps: [250], values: [51.0]", + "Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [250], values: [53.4]", ]; run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await; } +#[tokio::test] +async fn test_read_filter_data_pred_regex_escape() { + let predicate = PredicateBuilder::default() + // Came from InfluxQL like `SELECT value FROM db0.rp0.status_code WHERE url =~ /https\:\/\/influxdb\.com/`, + .build_regex_match_expr("url", r#"https\://influxdb\.com"#) + .build(); + + // expect one series with influxdb.com + let expected_results = vec![ + "Series tags={_measurement=status_code, url=https://influxdb.com, _field=value}\n FloatPoints timestamps: [1527018816000000000], values: [418.0]", + ]; + + run_read_filter_test_case(MeasurementStatusCode {}, predicate, expected_results).await; +} + +pub struct MeasurementStatusCode {} +#[async_trait] +impl DbSetup for MeasurementStatusCode { + async fn make(&self) -> Vec { + let partition_key = "2018-05-22T19"; + + let lp = vec![ + "status_code,url=http://www.example.com value=404 1527018806000000000", + "status_code,url=https://influxdb.com value=418 1527018816000000000", + ]; + + all_scenarios_for_one_chunk(vec![], vec![], lp, "status_code", partition_key).await + } +} + +#[tokio::test] +async fn test_read_filter_data_pred_not_match_regex_escape() { + let predicate = PredicateBuilder::default() + // Came from InfluxQL like `SELECT value FROM db0.rp0.status_code WHERE url !~ /https\:\/\/influxdb\.com/`, + .build_regex_not_match_expr("url", r#"https\://influxdb\.com"#) + .build(); + + // expect one series with influxdb.com + let expected_results = vec![ + "Series tags={_measurement=status_code, url=http://www.example.com, _field=value}\n FloatPoints timestamps: [1527018806000000000], values: [404.0]", + ]; + + run_read_filter_test_case(MeasurementStatusCode {}, predicate, expected_results).await; +} + #[tokio::test] async fn test_read_filter_data_pred_unsupported_in_scan() { test_helpers::maybe_start_logging(); @@ -649,10 +688,7 @@ impl DbSetup for MeasurementsSortableTagsWithDelete { // 1 rows of h2o with timestamp 250 will be deleted let delete_table_name = "h2o"; let pred = DeletePredicate { - range: TimestampRange { - start: 120, - end: 350, - }, + range: TimestampRange::new(120, 350), exprs: vec![DeleteExpr::new( "state".to_string(), data_types::delete_predicate::Op::Eq, diff --git a/query_tests/src/influxrpc/read_group.rs b/query_tests/src/influxrpc/read_group.rs index 6d245512b6..bade4fcbca 100644 --- 
a/query_tests/src/influxrpc/read_group.rs +++ b/query_tests/src/influxrpc/read_group.rs @@ -86,7 +86,7 @@ impl DbSetup for OneMeasurementNoTagsWithDelete { // 1 row of m0 with timestamp 1 let delete_table_name = "m0"; let pred = DeletePredicate { - range: TimestampRange { start: 1, end: 1 }, + range: TimestampRange::new(1, 1), exprs: vec![DeleteExpr::new( "foo".to_string(), data_types::delete_predicate::Op::Eq, @@ -118,7 +118,7 @@ impl DbSetup for OneMeasurementNoTagsWithDeleteAllWithAndWithoutChunk { // pred: delete from m0 where 1 <= time <= 2 let delete_table_name = "m0"; let pred = DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + range: TimestampRange::new(1, 2), exprs: vec![], }; diff --git a/query_tests/src/influxrpc/read_window_aggregate.rs b/query_tests/src/influxrpc/read_window_aggregate.rs index 24f8115c4f..150d58a70b 100644 --- a/query_tests/src/influxrpc/read_window_aggregate.rs +++ b/query_tests/src/influxrpc/read_window_aggregate.rs @@ -315,10 +315,7 @@ impl DbSetup for MeasurementForDefect2697WithDelete { // 1 row of m0 with timestamp 1609459201000000022 (section=2b bar=1.2) let delete_table_name = "mm"; let pred = DeletePredicate { - range: TimestampRange { - start: 1609459201000000022, - end: 1609459201000000022, - }, + range: TimestampRange::new(1609459201000000022, 1609459201000000022), exprs: vec![], }; @@ -347,10 +344,7 @@ impl DbSetup for MeasurementForDefect2697WithDeleteAll { // pred: delete from mm where 1 <= time <= 1609459201000000031 let delete_table_name = "mm"; let pred = DeletePredicate { - range: TimestampRange { - start: 1, - end: 1609459201000000031, - }, + range: TimestampRange::new(1, 1609459201000000031), exprs: vec![], }; diff --git a/query_tests/src/scenarios.rs b/query_tests/src/scenarios.rs index 1c356cdc6b..4875fcde07 100644 --- a/query_tests/src/scenarios.rs +++ b/query_tests/src/scenarios.rs @@ -279,10 +279,7 @@ impl DbSetup for OneMeasurementManyNullTagsWithDelete { // 3 rows of h2o & NY state will be deleted let delete_table_name = "h2o"; let pred = DeletePredicate { - range: TimestampRange { - start: 400, - end: 602, - }, + range: TimestampRange::new(400, 602), exprs: vec![DeleteExpr::new( "state".to_string(), data_types::delete_predicate::Op::Eq, @@ -322,10 +319,7 @@ impl DbSetup for OneMeasurementManyNullTagsWithDeleteAll { // all rows of h2o will be deleted let delete_table_name = "h2o"; let pred = DeletePredicate { - range: TimestampRange { - start: 100, - end: 602, - }, + range: TimestampRange::new(100, 602), exprs: vec![], }; @@ -412,10 +406,7 @@ impl DbSetup for TwoMeasurementsWithDelete { // delete 1 row from cpu with timestamp 150 let table_name = "cpu"; let pred = DeletePredicate { - range: TimestampRange { - start: 120, - end: 160, - }, + range: TimestampRange::new(120, 160), exprs: vec![DeleteExpr::new( "region".to_string(), data_types::delete_predicate::Op::Eq, @@ -447,10 +438,7 @@ impl DbSetup for TwoMeasurementsWithDeleteAll { // which will delete second row of the cpu let table_name = "cpu"; let pred1 = DeletePredicate { - range: TimestampRange { - start: 120, - end: 160, - }, + range: TimestampRange::new(120, 160), exprs: vec![DeleteExpr::new( "region".to_string(), data_types::delete_predicate::Op::Eq, @@ -460,7 +448,7 @@ impl DbSetup for TwoMeasurementsWithDeleteAll { // delete the first row of the cpu let pred2 = DeletePredicate { - range: TimestampRange { start: 0, end: 110 }, + range: TimestampRange::new(0, 110), exprs: vec![], }; @@ -885,10 +873,7 @@ impl DbSetup for OneMeasurementManyFieldsWithDelete { 
// field4 no longer available let delete_table_name = "h2o"; let pred = DeletePredicate { - range: TimestampRange { - start: 1000, - end: 1100, - }, + range: TimestampRange::new(1000, 1100), exprs: vec![], }; @@ -950,10 +935,7 @@ impl DbSetup for EndToEndTestWithDelete { // 1 rows of h2o with timestamp 250 will be deleted let delete_table_name = "swap"; let pred = DeletePredicate { - range: TimestampRange { - start: 6000, - end: 6000, - }, + range: TimestampRange::new(6000, 6000), exprs: vec![DeleteExpr::new( "name".to_string(), data_types::delete_predicate::Op::Eq, diff --git a/query_tests/src/scenarios/delete.rs b/query_tests/src/scenarios/delete.rs index e321ce11bb..cdd8f85849 100644 --- a/query_tests/src/scenarios/delete.rs +++ b/query_tests/src/scenarios/delete.rs @@ -31,7 +31,7 @@ impl DbSetup for OneDeleteSimpleExprOneChunkDeleteAll { // delete predicate let pred = DeletePredicate { - range: TimestampRange { start: 10, end: 20 }, + range: TimestampRange::new(10, 20), exprs: vec![], }; @@ -54,7 +54,7 @@ impl DbSetup for OneDeleteSimpleExprOneChunk { // delete predicate let pred = DeletePredicate { - range: TimestampRange { start: 0, end: 15 }, + range: TimestampRange::new(0, 15), exprs: vec![DeleteExpr::new( "bar".to_string(), data_types::delete_predicate::Op::Eq, @@ -106,7 +106,7 @@ impl DbSetup for OneDeleteMultiExprsOneChunk { ]; // delete predicate let pred = DeletePredicate { - range: TimestampRange { start: 0, end: 30 }, + range: TimestampRange::new(0, 30), exprs: vec![ DeleteExpr::new( "bar".to_string(), @@ -149,7 +149,7 @@ impl DbSetup for TwoDeletesMultiExprsOneChunk { // delete predicate // pred1: delete from cpu where 0 <= time <= 32 and bar = 1 and foo = 'me' let pred1 = DeletePredicate { - range: TimestampRange { start: 0, end: 32 }, + range: TimestampRange::new(0, 32), exprs: vec![ DeleteExpr::new( "bar".to_string(), @@ -166,7 +166,7 @@ impl DbSetup for TwoDeletesMultiExprsOneChunk { // pred2: delete from cpu where 10 <= time <= 40 and bar != 1 let pred2 = DeletePredicate { - range: TimestampRange { start: 10, end: 40 }, + range: TimestampRange::new(10, 40), exprs: vec![DeleteExpr::new( "bar".to_string(), data_types::delete_predicate::Op::Ne, @@ -206,7 +206,7 @@ impl DbSetup for ThreeDeleteThreeChunks { ]; // delete predicate on chunk 1 let pred1 = DeletePredicate { - range: TimestampRange { start: 0, end: 30 }, + range: TimestampRange::new(0, 30), exprs: vec![ DeleteExpr::new( "bar".to_string(), @@ -230,7 +230,7 @@ impl DbSetup for ThreeDeleteThreeChunks { ]; // delete predicate on chunk 1 & chunk 2 let pred2 = DeletePredicate { - range: TimestampRange { start: 20, end: 45 }, + range: TimestampRange::new(20, 45), exprs: vec![DeleteExpr::new( "foo".to_string(), data_types::delete_predicate::Op::Eq, @@ -247,7 +247,7 @@ impl DbSetup for ThreeDeleteThreeChunks { ]; // delete predicate on chunk 3 let pred3 = DeletePredicate { - range: TimestampRange { start: 75, end: 95 }, + range: TimestampRange::new(75, 95), exprs: vec![DeleteExpr::new( "bar".to_string(), data_types::delete_predicate::Op::Ne, diff --git a/router/src/router.rs b/router/src/router.rs index 5ed0f9083c..fe02e90559 100644 --- a/router/src/router.rs +++ b/router/src/router.rs @@ -410,7 +410,7 @@ mod tests { ); let delete = DmlOperation::Delete(DmlDelete::new( DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + range: TimestampRange::new(1, 2), exprs: vec![], }, Some(NonEmptyString::new("foo_foo").unwrap()), diff --git a/server/tests/delete.rs b/server/tests/delete.rs index 7e2e9cde6b..e5e788a0a9 
100644 --- a/server/tests/delete.rs +++ b/server/tests/delete.rs @@ -101,10 +101,7 @@ async fn delete_predicate_preservation() { // ==================== do: delete ==================== let pred = Arc::new(DeletePredicate { - range: TimestampRange { - start: 0, - end: 1_000, - }, + range: TimestampRange::new(0, 1_000), exprs: vec![DeleteExpr::new( "selector".to_string(), data_types::delete_predicate::Op::Eq, diff --git a/server/tests/write_buffer_delete.rs b/server/tests/write_buffer_delete.rs index 380b9cc395..4680c581db 100644 --- a/server/tests/write_buffer_delete.rs +++ b/server/tests/write_buffer_delete.rs @@ -179,7 +179,7 @@ async fn write_buffer_deletes() { fixture .delete(DmlDelete::new( DeletePredicate { - range: TimestampRange { start: 0, end: 20 }, + range: TimestampRange::new(0, 20), exprs: vec![DeleteExpr { column: "x".to_string(), op: Op::Eq, diff --git a/trace/src/span.rs b/trace/src/span.rs index 249af64ee9..9282c4040c 100644 --- a/trace/src/span.rs +++ b/trace/src/span.rs @@ -72,6 +72,11 @@ impl Span { pub fn child(&self, name: impl Into>) -> Self { self.ctx.child(name) } + + /// Link this span to another context. + pub fn link(&mut self, other: &SpanContext) { + self.ctx.links.push((other.trace_id, other.span_id)); + } } #[derive(Debug, Clone)] @@ -126,7 +131,7 @@ pub struct SpanRecorder { span: Option, } -impl<'a> SpanRecorder { +impl SpanRecorder { pub fn new(mut span: Option) -> Self { if let Some(span) = span.as_mut() { span.start = Some(Utc::now()); @@ -177,9 +182,16 @@ impl<'a> SpanRecorder { pub fn span(&self) -> Option<&Span> { self.span.as_ref() } + + /// Link this span to another context. + pub fn link(&mut self, other: &SpanContext) { + if let Some(span) = self.span.as_mut() { + span.link(other); + } + } } -impl<'a> Drop for SpanRecorder { +impl Drop for SpanRecorder { fn drop(&mut self) { if let Some(mut span) = self.span.take() { let now = Utc::now(); diff --git a/write_buffer/Cargo.toml b/write_buffer/Cargo.toml index c079dfd1e9..7165f4712b 100644 --- a/write_buffer/Cargo.toml +++ b/write_buffer/Cargo.toml @@ -20,6 +20,7 @@ parking_lot = "0.11.2" pin-project = "1.0" prost = "0.9" rdkafka = { version = "0.28.0", optional = true } +rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="63cf0a541b864d5376b2f96537a5bbb610d0e4bc" } time = { path = "../time" } tokio = { version = "1.13", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] } tokio-util = "0.6.9" diff --git a/write_buffer/src/config.rs b/write_buffer/src/config.rs index 7baaee6166..768a312ccc 100644 --- a/write_buffer/src/config.rs +++ b/write_buffer/src/config.rs @@ -5,6 +5,7 @@ use crate::{ MockBufferForReading, MockBufferForReadingThatAlwaysErrors, MockBufferForWriting, MockBufferForWritingThatAlwaysErrors, MockBufferSharedState, }, + rskafka::{RSKafkaConsumer, RSKafkaProducer}, }; use data_types::{server_id::ServerId, write_buffer::WriteBufferConnection}; use parking_lot::RwLock; @@ -16,12 +17,6 @@ use std::{ use time::TimeProvider; use trace::TraceCollector; -#[derive(Debug)] -pub enum WriteBufferConfig { - Writing(Arc), - Reading(Arc>>), -} - #[derive(Debug, Clone)] enum Mock { Normal(MockBufferSharedState), @@ -121,6 +116,16 @@ impl WriteBufferConfigFactory { Arc::new(mock_buffer) as _ } }, + "rskafka" => { + let rskafa_buffer = RSKafkaProducer::new( + cfg.connection.clone(), + db_name.to_owned(), + cfg.creation_config.as_ref(), + Arc::clone(&self.time_provider), + ) + .await?; + Arc::new(rskafa_buffer) as _ + } other => { return Err(format!("Unknown write buffer 
type: {}", other).into()); } @@ -196,6 +201,16 @@ impl WriteBufferConfigFactory { Box::new(mock_buffer) as _ } }, + "rskafka" => { + let rskafka_buffer = RSKafkaConsumer::new( + cfg.connection.clone(), + db_name.to_owned(), + cfg.creation_config.as_ref(), + trace_collector.map(Arc::clone), + ) + .await?; + Box::new(rskafka_buffer) as _ + } other => { return Err(format!("Unknown write buffer type: {}", other).into()); } @@ -245,7 +260,10 @@ impl WriteBufferConfigFactory { #[cfg(test)] mod tests { use super::*; - use crate::{core::test_utils::random_topic_name, mock::MockBufferSharedState}; + use crate::{ + core::test_utils::random_topic_name, maybe_skip_kafka_integration, + mock::MockBufferSharedState, + }; use data_types::{write_buffer::WriteBufferCreationConfig, DatabaseName}; use std::{convert::TryFrom, num::NonZeroU32}; use tempfile::TempDir; @@ -446,10 +464,49 @@ mod tests { WriteBufferConfigFactory::new(time, registry) } + #[tokio::test] + async fn test_writing_rskafka() { + let conn = maybe_skip_kafka_integration!(); + let factory = factory(); + let db_name = DatabaseName::try_from(random_topic_name()).unwrap(); + let cfg = WriteBufferConnection { + type_: "rskafka".to_string(), + connection: conn, + creation_config: Some(WriteBufferCreationConfig::default()), + ..Default::default() + }; + + let conn = factory + .new_config_write(db_name.as_str(), &cfg) + .await + .unwrap(); + assert_eq!(conn.type_name(), "rskafka"); + } + + #[tokio::test] + async fn test_reading_rskafka() { + let conn = maybe_skip_kafka_integration!(); + let factory = factory(); + let server_id = ServerId::try_from(1).unwrap(); + + let db_name = DatabaseName::try_from(random_topic_name()).unwrap(); + let cfg = WriteBufferConnection { + type_: "rskafka".to_string(), + connection: conn, + creation_config: Some(WriteBufferCreationConfig::default()), + ..Default::default() + }; + + let conn = factory + .new_config_read(server_id, db_name.as_str(), None, &cfg) + .await + .unwrap(); + assert_eq!(conn.type_name(), "rskafka"); + } + #[cfg(feature = "kafka")] mod kafka { use super::*; - use crate::maybe_skip_kafka_integration; #[tokio::test] async fn test_writing_kafka() { diff --git a/write_buffer/src/core.rs b/write_buffer/src/core.rs index f59c0fb8da..71970dbf00 100644 --- a/write_buffer/src/core.rs +++ b/write_buffer/src/core.rs @@ -758,4 +758,51 @@ pub mod test_utils { { set.iter().next().cloned().map(|k| set.take(&k)).flatten() } + + /// Get the testing Kafka connection string or return current scope. + /// + /// If `TEST_INTEGRATION` and `KAFKA_CONNECT` are set, return the Kafka connection URL to the + /// caller. + /// + /// If `TEST_INTEGRATION` is set but `KAFKA_CONNECT` is not set, fail the tests and provide + /// guidance for setting `KAFKA_CONNECTION`. + /// + /// If `TEST_INTEGRATION` is not set, skip the calling test by returning early. + #[macro_export] + macro_rules! maybe_skip_kafka_integration { + () => {{ + use std::env; + dotenv::dotenv().ok(); + + match ( + env::var("TEST_INTEGRATION").is_ok(), + env::var("KAFKA_CONNECT").ok(), + ) { + (true, Some(kafka_connection)) => kafka_connection, + (true, None) => { + panic!( + "TEST_INTEGRATION is set which requires running integration tests, but \ + KAFKA_CONNECT is not set. Please run Kafka, perhaps by using the command \ + `docker-compose -f docker/ci-kafka-docker-compose.yml up kafka`, then \ + set KAFKA_CONNECT to the host and port where Kafka is accessible. 
If \ + running the `docker-compose` command and the Rust tests on the host, the \ + value for `KAFKA_CONNECT` should be `localhost:9093`. If running the Rust \ + tests in another container in the `docker-compose` network as on CI, \ + `KAFKA_CONNECT` should be `kafka:9092`." + ) + } + (false, Some(_)) => { + eprintln!("skipping Kafka integration tests - set TEST_INTEGRATION to run"); + return; + } + (false, None) => { + eprintln!( + "skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT to \ + run" + ); + return; + } + } + }}; + } } diff --git a/write_buffer/src/kafka.rs b/write_buffer/src/kafka.rs index e417276556..9f6b037b04 100644 --- a/write_buffer/src/kafka.rs +++ b/write_buffer/src/kafka.rs @@ -703,53 +703,6 @@ pub mod test_utils { use rdkafka::admin::{AdminOptions, AlterConfig, ResourceSpecifier}; use std::{collections::BTreeMap, time::Duration}; - /// Get the testing Kafka connection string or return current scope. - /// - /// If `TEST_INTEGRATION` and `KAFKA_CONNECT` are set, return the Kafka connection URL to the - /// caller. - /// - /// If `TEST_INTEGRATION` is set but `KAFKA_CONNECT` is not set, fail the tests and provide - /// guidance for setting `KAFKA_CONNECTION`. - /// - /// If `TEST_INTEGRATION` is not set, skip the calling test by returning early. - #[macro_export] - macro_rules! maybe_skip_kafka_integration { - () => {{ - use std::env; - dotenv::dotenv().ok(); - - match ( - env::var("TEST_INTEGRATION").is_ok(), - env::var("KAFKA_CONNECT").ok(), - ) { - (true, Some(kafka_connection)) => kafka_connection, - (true, None) => { - panic!( - "TEST_INTEGRATION is set which requires running integration tests, but \ - KAFKA_CONNECT is not set. Please run Kafka, perhaps by using the command \ - `docker-compose -f docker/ci-kafka-docker-compose.yml up kafka`, then \ - set KAFKA_CONNECT to the host and port where Kafka is accessible. If \ - running the `docker-compose` command and the Rust tests on the host, the \ - value for `KAFKA_CONNECT` should be `localhost:9093`. If running the Rust \ - tests in another container in the `docker-compose` network as on CI, \ - `KAFKA_CONNECT` should be `kafka:9092`." 
- ) - } - (false, Some(_)) => { - eprintln!("skipping Kafka integration tests - set TEST_INTEGRATION to run"); - return; - } - (false, None) => { - eprintln!( - "skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT to \ - run" - ); - return; - } - } - }}; - } - /// Create topic creation config that is ideal for testing and works with [`purge_kafka_topic`] pub fn kafka_sequencer_options() -> BTreeMap { BTreeMap::from([ diff --git a/write_buffer/src/lib.rs b/write_buffer/src/lib.rs index aaf97776fe..dde917c645 100644 --- a/write_buffer/src/lib.rs +++ b/write_buffer/src/lib.rs @@ -17,3 +17,5 @@ pub mod file; pub mod kafka; pub mod mock; + +pub mod rskafka; diff --git a/write_buffer/src/rskafka.rs b/write_buffer/src/rskafka.rs new file mode 100644 index 0000000000..e031a0f516 --- /dev/null +++ b/write_buffer/src/rskafka.rs @@ -0,0 +1,436 @@ +use std::{ + collections::{BTreeMap, BTreeSet}, + sync::{ + atomic::{AtomicI64, Ordering}, + Arc, + }, +}; + +use async_trait::async_trait; +use data_types::{sequence::Sequence, write_buffer::WriteBufferCreationConfig}; +use dml::{DmlMeta, DmlOperation}; +use futures::{FutureExt, StreamExt}; +use observability_deps::tracing::debug; +use rskafka::{ + client::{ + consumer::StreamConsumerBuilder, + error::{Error as RSKafkaError, ProtocolError}, + partition::PartitionClient, + ClientBuilder, + }, + record::Record, +}; +use time::{Time, TimeProvider}; +use trace::TraceCollector; + +use crate::{ + codec::{ContentType, IoxHeaders}, + core::{ + FetchHighWatermark, FetchHighWatermarkFut, WriteBufferError, WriteBufferReading, + WriteBufferWriting, WriteStream, + }, +}; + +type Result = std::result::Result; + +#[derive(Debug)] +pub struct RSKafkaProducer { + database_name: String, + time_provider: Arc, + // TODO: batched writes + partition_clients: BTreeMap, +} + +impl RSKafkaProducer { + pub async fn new( + conn: String, + database_name: String, + creation_config: Option<&WriteBufferCreationConfig>, + time_provider: Arc, + ) -> Result { + let partition_clients = setup_topic(conn, database_name.clone(), creation_config).await?; + + Ok(Self { + database_name, + time_provider, + partition_clients, + }) + } +} + +#[async_trait] +impl WriteBufferWriting for RSKafkaProducer { + fn sequencer_ids(&self) -> BTreeSet { + self.partition_clients.keys().copied().collect() + } + + async fn store_operation( + &self, + sequencer_id: u32, + operation: &DmlOperation, + ) -> Result { + let partition_client = self + .partition_clients + .get(&sequencer_id) + .ok_or_else::(|| { + format!("Unknown partition: {}", sequencer_id).into() + })?; + + // truncate milliseconds from timestamps because that's what Kafka supports + let now = operation + .meta() + .producer_ts() + .unwrap_or_else(|| self.time_provider.now()); + + let timestamp_millis = now.date_time().timestamp_millis(); + let timestamp = Time::from_timestamp_millis(timestamp_millis); + + let headers = IoxHeaders::new( + ContentType::Protobuf, + operation.meta().span_context().cloned(), + ); + + let mut buf = Vec::new(); + crate::codec::encode_operation(&self.database_name, operation, &mut buf)?; + let buf_len = buf.len(); + + let record = Record { + key: Default::default(), + value: buf, + headers: headers + .headers() + .map(|(k, v)| (k.to_owned(), v.as_bytes().to_vec())) + .collect(), + timestamp: rskafka::time::OffsetDateTime::from_unix_timestamp_nanos( + timestamp_millis as i128 * 1_000_000, + )?, + }; + + let kafka_write_size = record.approximate_size(); + + debug!(db_name=%self.database_name, 
partition=sequencer_id, size=buf_len, "writing to kafka"); + + let offsets = partition_client.produce(vec![record]).await?; + let offset = offsets[0]; + + debug!(db_name=%self.database_name, %offset, partition=sequencer_id, size=buf_len, "wrote to kafka"); + + Ok(DmlMeta::sequenced( + Sequence::new(sequencer_id, offset.try_into()?), + timestamp, + operation.meta().span_context().cloned(), + kafka_write_size, + )) + } + + fn type_name(&self) -> &'static str { + "rskafka" + } +} + +#[derive(Debug)] +struct ConsumerPartition { + partition_client: Arc, + next_offset: Arc, +} + +#[derive(Debug)] +pub struct RSKafkaConsumer { + partitions: BTreeMap, + trace_collector: Option>, +} + +impl RSKafkaConsumer { + pub async fn new( + conn: String, + database_name: String, + creation_config: Option<&WriteBufferCreationConfig>, + trace_collector: Option>, + ) -> Result { + let partition_clients = setup_topic(conn, database_name.clone(), creation_config).await?; + + let partitions = partition_clients + .into_iter() + .map(|(partition_id, partition_client)| { + let partition_client = Arc::new(partition_client); + let next_offset = Arc::new(AtomicI64::new(0)); + + ( + partition_id, + ConsumerPartition { + partition_client, + next_offset, + }, + ) + }) + .collect(); + + Ok(Self { + partitions, + trace_collector, + }) + } +} + +#[async_trait] +impl WriteBufferReading for RSKafkaConsumer { + fn streams(&mut self) -> BTreeMap> { + let mut streams = BTreeMap::new(); + + for (sequencer_id, partition) in &self.partitions { + let trace_collector = self.trace_collector.clone(); + let next_offset = Arc::clone(&partition.next_offset); + let stream = StreamConsumerBuilder::new( + Arc::clone(&partition.partition_client), + next_offset.load(Ordering::SeqCst), + ) + .with_max_wait_ms(100) + .build(); + let stream = stream.map(move |res| { + let (record, _watermark) = res?; + + let kafka_read_size = record.record.approximate_size(); + + let headers = + IoxHeaders::from_headers(record.record.headers, trace_collector.as_ref())?; + + let sequence = Sequence { + id: *sequencer_id, + number: record.offset.try_into()?, + }; + + let timestamp_millis = + i64::try_from(record.record.timestamp.unix_timestamp_nanos() / 1_000_000)?; + let timestamp = Time::from_timestamp_millis_opt(timestamp_millis) + .ok_or_else::(|| { + format!( + "Cannot parse timestamp for milliseconds: {}", + timestamp_millis + ) + .into() + })?; + + next_offset.store(record.offset + 1, Ordering::SeqCst); + + crate::codec::decode( + &record.record.value, + headers, + sequence, + timestamp, + kafka_read_size, + ) + }); + let stream = stream.boxed(); + + let partition_client = Arc::clone(&partition.partition_client); + let fetch_high_watermark = move || { + let partition_client = Arc::clone(&partition_client); + let fut = async move { + let watermark = partition_client.get_high_watermark().await?; + u64::try_from(watermark).map_err(|e| Box::new(e) as WriteBufferError) + }; + + fut.boxed() as FetchHighWatermarkFut<'_> + }; + let fetch_high_watermark = Box::new(fetch_high_watermark) as FetchHighWatermark<'_>; + + streams.insert( + *sequencer_id, + WriteStream { + stream, + fetch_high_watermark, + }, + ); + } + + streams + } + + async fn seek( + &mut self, + sequencer_id: u32, + sequence_number: u64, + ) -> Result<(), WriteBufferError> { + let partition = self + .partitions + .get_mut(&sequencer_id) + .ok_or_else::(|| { + format!("Unknown partition: {}", sequencer_id).into() + })?; + + let offset = i64::try_from(sequence_number)?; + partition.next_offset.store(offset, 
Ordering::SeqCst); + + Ok(()) + } + + fn type_name(&self) -> &'static str { + "rskafka" + } +} + +async fn setup_topic( + conn: String, + database_name: String, + creation_config: Option<&WriteBufferCreationConfig>, +) -> Result> { + let client = ClientBuilder::new(vec![conn]).build().await?; + let controller_client = client.controller_client().await?; + + loop { + // check if topic already exists + let topics = client.list_topics().await?; + if let Some(topic) = topics.into_iter().find(|t| t.name == database_name) { + let mut partition_clients = BTreeMap::new(); + for partition in topic.partitions { + let c = client.partition_client(&database_name, partition).await?; + let partition = u32::try_from(partition)?; + partition_clients.insert(partition, c); + } + return Ok(partition_clients); + } + + // create topic + if let Some(creation_config) = creation_config { + match controller_client + .create_topic(&database_name, creation_config.n_sequencers.get() as i32, 1) + .await + { + Ok(_) => {} + // race condition between check and creation action, that's OK + Err(RSKafkaError::ServerError(ProtocolError::TopicAlreadyExists, _)) => {} + Err(e) => { + return Err(e.into()); + } + } + } else { + return Err("no partitions found and auto-creation not requested" + .to_string() + .into()); + } + } +} + +#[cfg(test)] +mod tests { + use std::num::NonZeroU32; + + use futures::{stream::FuturesUnordered, TryStreamExt}; + use trace::RingBufferTraceCollector; + + use crate::{ + core::test_utils::{perform_generic_tests, random_topic_name, TestAdapter, TestContext}, + maybe_skip_kafka_integration, + }; + + use super::*; + + struct RSKafkaTestAdapter { + conn: String, + } + + impl RSKafkaTestAdapter { + fn new(conn: String) -> Self { + Self { conn } + } + } + + #[async_trait] + impl TestAdapter for RSKafkaTestAdapter { + type Context = RSKafkaTestContext; + + async fn new_context_with_time( + &self, + n_sequencers: NonZeroU32, + time_provider: Arc, + ) -> Self::Context { + RSKafkaTestContext { + conn: self.conn.clone(), + database_name: random_topic_name(), + n_sequencers, + time_provider, + } + } + } + + struct RSKafkaTestContext { + conn: String, + database_name: String, + n_sequencers: NonZeroU32, + time_provider: Arc, + } + + impl RSKafkaTestContext { + fn creation_config(&self, value: bool) -> Option { + value.then(|| WriteBufferCreationConfig { + n_sequencers: self.n_sequencers, + ..Default::default() + }) + } + } + + #[async_trait] + impl TestContext for RSKafkaTestContext { + type Writing = RSKafkaProducer; + + type Reading = RSKafkaConsumer; + + async fn writing(&self, creation_config: bool) -> Result { + RSKafkaProducer::new( + self.conn.clone(), + self.database_name.clone(), + self.creation_config(creation_config).as_ref(), + Arc::clone(&self.time_provider), + ) + .await + } + + async fn reading(&self, creation_config: bool) -> Result { + let collector: Arc = Arc::new(RingBufferTraceCollector::new(5)); + + RSKafkaConsumer::new( + self.conn.clone(), + self.database_name.clone(), + self.creation_config(creation_config).as_ref(), + Some(collector), + ) + .await + } + } + + #[tokio::test] + async fn test_generic() { + let conn = maybe_skip_kafka_integration!(); + + perform_generic_tests(RSKafkaTestAdapter::new(conn)).await; + } + + #[tokio::test] + async fn test_setup_topic_race() { + let conn = maybe_skip_kafka_integration!(); + let topic_name = random_topic_name(); + let n_partitions = NonZeroU32::new(2).unwrap(); + + let mut jobs: FuturesUnordered<_> = (0..10) + .map(|_| { + let conn = conn.clone(); + 
let topic_name = topic_name.clone(); + + tokio::spawn(async move { + setup_topic( + conn, + topic_name, + Some(&WriteBufferCreationConfig { + n_sequencers: n_partitions, + ..Default::default() + }), + ) + .await + .unwrap(); + }) + }) + .collect(); + + while jobs.try_next().await.unwrap().is_some() {} + } +}
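
A few illustrative sketches of the APIs introduced above follow; they are sketches under stated assumptions, not part of the patch itself.

The catalog refactor replaces the `RepoCollection` accessors that handed out `Arc`-wrapped repositories with an object-safe `Catalog` trait whose methods return plain `&dyn ...Repo` borrows. A minimal usage sketch, assuming the module paths visible in this diff (`iox_catalog::interface`, `iox_catalog::mem`) and illustrative names such as "example_topic":

use iox_catalog::interface::{Catalog, Result};
use iox_catalog::mem::MemCatalog;

async fn catalog_usage_sketch() -> Result<()> {
    // The in-memory implementation is enough to exercise the trait-object API.
    let catalog = MemCatalog::new();

    // Accessors such as kafka_topics()/query_pools()/namespaces() borrow the
    // repositories straight from the catalog instead of cloning Arcs.
    let topic = catalog.kafka_topics().create_or_get("example_topic").await?;
    let pool = catalog.query_pools().create_or_get("example_pool").await?;

    // Same call shape as the test_validate_or_insert_schema test above.
    let _namespace = catalog
        .namespaces()
        .create("example_ns", "inf", topic.id, pool.id)
        .await?;

    Ok(())
}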
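
The regex handling in `predicate::regex` now rewrites Go-style escapes before building the match UDF. `clean_non_meta_escapes` is private, so the sketch below only illustrates the behaviour documented in its test table, and the `is_err` assertion assumes the regex / regex-syntax versions pinned by this change (regex-syntax 0.6), which reject escapes of non-meta characters such as `\:`:

fn regex_escape_cleaning_sketch() {
    // `.` is a meta character, so `\.` must stay escaped; `:` is not, so the
    // Go-style `\:` has to be rewritten to a bare `:` before compilation.
    assert!(regex_syntax::is_meta_character('.'));
    assert!(!regex_syntax::is_meta_character(':'));

    // The raw pattern coming from InfluxQL (`url =~ /https\:\/\/influxdb\.com/`)
    // is rejected by the Rust regex crate as pinned in this change...
    assert!(regex::Regex::new(r#"https\://influxdb\.com"#).is_err());

    // ...while the cleaned form (what clean_non_meta_escapes produces, per the
    // test table above) compiles and matches as the Go implementation would.
    let re = regex::Regex::new(r#"https://influxdb\.com"#).unwrap();
    assert!(re.is_match("https://influxdb.com"));
}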
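
Finally, the rskafka-backed write buffer is selected by the `type_: "rskafka"` field of a `WriteBufferConnection`, as the new factory tests show. The sketch below constructs the producer and consumer directly, using a placeholder broker address and database name, and assuming `time::SystemProvider` as the `TimeProvider` implementation (that type is not shown in this diff):

use std::sync::Arc;

use data_types::write_buffer::WriteBufferCreationConfig;
use write_buffer::core::{WriteBufferError, WriteBufferReading, WriteBufferWriting};
use write_buffer::rskafka::{RSKafkaConsumer, RSKafkaProducer};

async fn rskafka_sketch() -> Result<(), WriteBufferError> {
    // Placeholders: a reachable broker and a topic name; the topic doubles as
    // the IOx database name.
    let conn = "localhost:9093".to_string();
    let db_name = "example_db".to_string();
    // Asking for auto-creation lets setup_topic() create the topic with the
    // configured number of partitions (= sequencers) if it does not exist yet.
    let creation = WriteBufferCreationConfig::default();

    let producer = RSKafkaProducer::new(
        conn.clone(),
        db_name.clone(),
        Some(&creation),
        Arc::new(time::SystemProvider::new()), // assumed TimeProvider impl
    )
    .await?;
    assert_eq!(producer.type_name(), "rskafka");

    // No trace collector wired up in this sketch.
    let mut consumer = RSKafkaConsumer::new(conn, db_name, Some(&creation), None).await?;
    // One WriteStream per Kafka partition, keyed by sequencer id.
    let _streams = consumer.streams();

    Ok(())
}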