From 76dd62a6c26c40ffc2325a32563433d5d5908740 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 20 Jan 2022 12:36:05 +0100 Subject: [PATCH 1/7] feat: RSKafka-driven write buffer --- Cargo.lock | 37 +++ write_buffer/Cargo.toml | 1 + write_buffer/src/config.rs | 73 +++++- write_buffer/src/core.rs | 47 ++++ write_buffer/src/kafka.rs | 47 ---- write_buffer/src/lib.rs | 2 + write_buffer/src/rskafka.rs | 436 ++++++++++++++++++++++++++++++++++++ 7 files changed, 588 insertions(+), 55 deletions(-) create mode 100644 write_buffer/src/rskafka.rs diff --git a/Cargo.lock b/Cargo.lock index 04a96ff666..252b5ad600 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3752,6 +3752,27 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "rskafka" +version = "0.1.0" +source = "git+https://github.com/influxdata/rskafka.git?rev=63cf0a541b864d5376b2f96537a5bbb610d0e4bc#63cf0a541b864d5376b2f96537a5bbb610d0e4bc" +dependencies = [ + "async-trait", + "bytes", + "crc", + "futures", + "parking_lot", + "pin-project-lite", + "rand", + "rustls", + "thiserror", + "time 0.3.5", + "tokio", + "tokio-rustls", + "tracing", + "varint-rs", +] + [[package]] name = "rusoto_core" version = "0.47.0" @@ -4645,6 +4666,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "time" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41effe7cfa8af36f439fac33861b66b049edc6f9a32331e2312660529c1c24ad" +dependencies = [ + "libc", +] + [[package]] name = "tinytemplate" version = "1.2.1" @@ -5124,6 +5154,12 @@ dependencies = [ "getrandom", ] +[[package]] +name = "varint-rs" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23" + [[package]] name = "vcpkg" version = "0.2.15" @@ -5446,6 +5482,7 @@ dependencies = [ "pin-project", "prost", "rdkafka", + "rskafka", "tempfile", "time 0.1.0", "tokio", diff --git a/write_buffer/Cargo.toml b/write_buffer/Cargo.toml index c079dfd1e9..7165f4712b 100644 --- a/write_buffer/Cargo.toml +++ b/write_buffer/Cargo.toml @@ -20,6 +20,7 @@ parking_lot = "0.11.2" pin-project = "1.0" prost = "0.9" rdkafka = { version = "0.28.0", optional = true } +rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="63cf0a541b864d5376b2f96537a5bbb610d0e4bc" } time = { path = "../time" } tokio = { version = "1.13", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] } tokio-util = "0.6.9" diff --git a/write_buffer/src/config.rs b/write_buffer/src/config.rs index 7baaee6166..768a312ccc 100644 --- a/write_buffer/src/config.rs +++ b/write_buffer/src/config.rs @@ -5,6 +5,7 @@ use crate::{ MockBufferForReading, MockBufferForReadingThatAlwaysErrors, MockBufferForWriting, MockBufferForWritingThatAlwaysErrors, MockBufferSharedState, }, + rskafka::{RSKafkaConsumer, RSKafkaProducer}, }; use data_types::{server_id::ServerId, write_buffer::WriteBufferConnection}; use parking_lot::RwLock; @@ -16,12 +17,6 @@ use std::{ use time::TimeProvider; use trace::TraceCollector; -#[derive(Debug)] -pub enum WriteBufferConfig { - Writing(Arc), - Reading(Arc>>), -} - #[derive(Debug, Clone)] enum Mock { Normal(MockBufferSharedState), @@ -121,6 +116,16 @@ impl WriteBufferConfigFactory { Arc::new(mock_buffer) as _ } }, + "rskafka" => { + let rskafa_buffer = RSKafkaProducer::new( + cfg.connection.clone(), + db_name.to_owned(), + cfg.creation_config.as_ref(), + Arc::clone(&self.time_provider), + ) + .await?; + Arc::new(rskafa_buffer) as _ + } other => { return Err(format!("Unknown write buffer type: {}", other).into()); } @@ -196,6 +201,16 @@ impl WriteBufferConfigFactory { Box::new(mock_buffer) as _ } }, + "rskafka" => { + let rskafka_buffer = RSKafkaConsumer::new( + cfg.connection.clone(), + db_name.to_owned(), + cfg.creation_config.as_ref(), + trace_collector.map(Arc::clone), + ) + .await?; + Box::new(rskafka_buffer) as _ + } other => { return Err(format!("Unknown write buffer type: {}", other).into()); } @@ -245,7 +260,10 @@ impl WriteBufferConfigFactory { #[cfg(test)] mod tests { use super::*; - use crate::{core::test_utils::random_topic_name, mock::MockBufferSharedState}; + use crate::{ + core::test_utils::random_topic_name, maybe_skip_kafka_integration, + mock::MockBufferSharedState, + }; use data_types::{write_buffer::WriteBufferCreationConfig, DatabaseName}; use std::{convert::TryFrom, num::NonZeroU32}; use tempfile::TempDir; @@ -446,10 +464,49 @@ mod tests { WriteBufferConfigFactory::new(time, registry) } + #[tokio::test] + async fn test_writing_rskafka() { + let conn = maybe_skip_kafka_integration!(); + let factory = factory(); + let db_name = DatabaseName::try_from(random_topic_name()).unwrap(); + let cfg = WriteBufferConnection { + type_: "rskafka".to_string(), + connection: conn, + creation_config: Some(WriteBufferCreationConfig::default()), + ..Default::default() + }; + + let conn = factory + .new_config_write(db_name.as_str(), &cfg) + .await + .unwrap(); + assert_eq!(conn.type_name(), "rskafka"); + } + + #[tokio::test] + async fn test_reading_rskafka() { + let conn = maybe_skip_kafka_integration!(); + let factory = factory(); + let server_id = ServerId::try_from(1).unwrap(); + + let db_name = DatabaseName::try_from(random_topic_name()).unwrap(); + let cfg = WriteBufferConnection { + type_: "rskafka".to_string(), + connection: conn, + creation_config: Some(WriteBufferCreationConfig::default()), + ..Default::default() + }; + + let conn = factory + .new_config_read(server_id, db_name.as_str(), None, &cfg) + .await + .unwrap(); + assert_eq!(conn.type_name(), "rskafka"); + } + #[cfg(feature = "kafka")] mod kafka { use super::*; - use crate::maybe_skip_kafka_integration; #[tokio::test] async fn test_writing_kafka() { diff --git a/write_buffer/src/core.rs b/write_buffer/src/core.rs index f59c0fb8da..71970dbf00 100644 --- a/write_buffer/src/core.rs +++ b/write_buffer/src/core.rs @@ -758,4 +758,51 @@ pub mod test_utils { { set.iter().next().cloned().map(|k| set.take(&k)).flatten() } + + /// Get the testing Kafka connection string or return current scope. + /// + /// If `TEST_INTEGRATION` and `KAFKA_CONNECT` are set, return the Kafka connection URL to the + /// caller. + /// + /// If `TEST_INTEGRATION` is set but `KAFKA_CONNECT` is not set, fail the tests and provide + /// guidance for setting `KAFKA_CONNECTION`. + /// + /// If `TEST_INTEGRATION` is not set, skip the calling test by returning early. + #[macro_export] + macro_rules! maybe_skip_kafka_integration { + () => {{ + use std::env; + dotenv::dotenv().ok(); + + match ( + env::var("TEST_INTEGRATION").is_ok(), + env::var("KAFKA_CONNECT").ok(), + ) { + (true, Some(kafka_connection)) => kafka_connection, + (true, None) => { + panic!( + "TEST_INTEGRATION is set which requires running integration tests, but \ + KAFKA_CONNECT is not set. Please run Kafka, perhaps by using the command \ + `docker-compose -f docker/ci-kafka-docker-compose.yml up kafka`, then \ + set KAFKA_CONNECT to the host and port where Kafka is accessible. If \ + running the `docker-compose` command and the Rust tests on the host, the \ + value for `KAFKA_CONNECT` should be `localhost:9093`. If running the Rust \ + tests in another container in the `docker-compose` network as on CI, \ + `KAFKA_CONNECT` should be `kafka:9092`." + ) + } + (false, Some(_)) => { + eprintln!("skipping Kafka integration tests - set TEST_INTEGRATION to run"); + return; + } + (false, None) => { + eprintln!( + "skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT to \ + run" + ); + return; + } + } + }}; + } } diff --git a/write_buffer/src/kafka.rs b/write_buffer/src/kafka.rs index e417276556..9f6b037b04 100644 --- a/write_buffer/src/kafka.rs +++ b/write_buffer/src/kafka.rs @@ -703,53 +703,6 @@ pub mod test_utils { use rdkafka::admin::{AdminOptions, AlterConfig, ResourceSpecifier}; use std::{collections::BTreeMap, time::Duration}; - /// Get the testing Kafka connection string or return current scope. - /// - /// If `TEST_INTEGRATION` and `KAFKA_CONNECT` are set, return the Kafka connection URL to the - /// caller. - /// - /// If `TEST_INTEGRATION` is set but `KAFKA_CONNECT` is not set, fail the tests and provide - /// guidance for setting `KAFKA_CONNECTION`. - /// - /// If `TEST_INTEGRATION` is not set, skip the calling test by returning early. - #[macro_export] - macro_rules! maybe_skip_kafka_integration { - () => {{ - use std::env; - dotenv::dotenv().ok(); - - match ( - env::var("TEST_INTEGRATION").is_ok(), - env::var("KAFKA_CONNECT").ok(), - ) { - (true, Some(kafka_connection)) => kafka_connection, - (true, None) => { - panic!( - "TEST_INTEGRATION is set which requires running integration tests, but \ - KAFKA_CONNECT is not set. Please run Kafka, perhaps by using the command \ - `docker-compose -f docker/ci-kafka-docker-compose.yml up kafka`, then \ - set KAFKA_CONNECT to the host and port where Kafka is accessible. If \ - running the `docker-compose` command and the Rust tests on the host, the \ - value for `KAFKA_CONNECT` should be `localhost:9093`. If running the Rust \ - tests in another container in the `docker-compose` network as on CI, \ - `KAFKA_CONNECT` should be `kafka:9092`." - ) - } - (false, Some(_)) => { - eprintln!("skipping Kafka integration tests - set TEST_INTEGRATION to run"); - return; - } - (false, None) => { - eprintln!( - "skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT to \ - run" - ); - return; - } - } - }}; - } - /// Create topic creation config that is ideal for testing and works with [`purge_kafka_topic`] pub fn kafka_sequencer_options() -> BTreeMap { BTreeMap::from([ diff --git a/write_buffer/src/lib.rs b/write_buffer/src/lib.rs index aaf97776fe..dde917c645 100644 --- a/write_buffer/src/lib.rs +++ b/write_buffer/src/lib.rs @@ -17,3 +17,5 @@ pub mod file; pub mod kafka; pub mod mock; + +pub mod rskafka; diff --git a/write_buffer/src/rskafka.rs b/write_buffer/src/rskafka.rs new file mode 100644 index 0000000000..e031a0f516 --- /dev/null +++ b/write_buffer/src/rskafka.rs @@ -0,0 +1,436 @@ +use std::{ + collections::{BTreeMap, BTreeSet}, + sync::{ + atomic::{AtomicI64, Ordering}, + Arc, + }, +}; + +use async_trait::async_trait; +use data_types::{sequence::Sequence, write_buffer::WriteBufferCreationConfig}; +use dml::{DmlMeta, DmlOperation}; +use futures::{FutureExt, StreamExt}; +use observability_deps::tracing::debug; +use rskafka::{ + client::{ + consumer::StreamConsumerBuilder, + error::{Error as RSKafkaError, ProtocolError}, + partition::PartitionClient, + ClientBuilder, + }, + record::Record, +}; +use time::{Time, TimeProvider}; +use trace::TraceCollector; + +use crate::{ + codec::{ContentType, IoxHeaders}, + core::{ + FetchHighWatermark, FetchHighWatermarkFut, WriteBufferError, WriteBufferReading, + WriteBufferWriting, WriteStream, + }, +}; + +type Result = std::result::Result; + +#[derive(Debug)] +pub struct RSKafkaProducer { + database_name: String, + time_provider: Arc, + // TODO: batched writes + partition_clients: BTreeMap, +} + +impl RSKafkaProducer { + pub async fn new( + conn: String, + database_name: String, + creation_config: Option<&WriteBufferCreationConfig>, + time_provider: Arc, + ) -> Result { + let partition_clients = setup_topic(conn, database_name.clone(), creation_config).await?; + + Ok(Self { + database_name, + time_provider, + partition_clients, + }) + } +} + +#[async_trait] +impl WriteBufferWriting for RSKafkaProducer { + fn sequencer_ids(&self) -> BTreeSet { + self.partition_clients.keys().copied().collect() + } + + async fn store_operation( + &self, + sequencer_id: u32, + operation: &DmlOperation, + ) -> Result { + let partition_client = self + .partition_clients + .get(&sequencer_id) + .ok_or_else::(|| { + format!("Unknown partition: {}", sequencer_id).into() + })?; + + // truncate milliseconds from timestamps because that's what Kafka supports + let now = operation + .meta() + .producer_ts() + .unwrap_or_else(|| self.time_provider.now()); + + let timestamp_millis = now.date_time().timestamp_millis(); + let timestamp = Time::from_timestamp_millis(timestamp_millis); + + let headers = IoxHeaders::new( + ContentType::Protobuf, + operation.meta().span_context().cloned(), + ); + + let mut buf = Vec::new(); + crate::codec::encode_operation(&self.database_name, operation, &mut buf)?; + let buf_len = buf.len(); + + let record = Record { + key: Default::default(), + value: buf, + headers: headers + .headers() + .map(|(k, v)| (k.to_owned(), v.as_bytes().to_vec())) + .collect(), + timestamp: rskafka::time::OffsetDateTime::from_unix_timestamp_nanos( + timestamp_millis as i128 * 1_000_000, + )?, + }; + + let kafka_write_size = record.approximate_size(); + + debug!(db_name=%self.database_name, partition=sequencer_id, size=buf_len, "writing to kafka"); + + let offsets = partition_client.produce(vec![record]).await?; + let offset = offsets[0]; + + debug!(db_name=%self.database_name, %offset, partition=sequencer_id, size=buf_len, "wrote to kafka"); + + Ok(DmlMeta::sequenced( + Sequence::new(sequencer_id, offset.try_into()?), + timestamp, + operation.meta().span_context().cloned(), + kafka_write_size, + )) + } + + fn type_name(&self) -> &'static str { + "rskafka" + } +} + +#[derive(Debug)] +struct ConsumerPartition { + partition_client: Arc, + next_offset: Arc, +} + +#[derive(Debug)] +pub struct RSKafkaConsumer { + partitions: BTreeMap, + trace_collector: Option>, +} + +impl RSKafkaConsumer { + pub async fn new( + conn: String, + database_name: String, + creation_config: Option<&WriteBufferCreationConfig>, + trace_collector: Option>, + ) -> Result { + let partition_clients = setup_topic(conn, database_name.clone(), creation_config).await?; + + let partitions = partition_clients + .into_iter() + .map(|(partition_id, partition_client)| { + let partition_client = Arc::new(partition_client); + let next_offset = Arc::new(AtomicI64::new(0)); + + ( + partition_id, + ConsumerPartition { + partition_client, + next_offset, + }, + ) + }) + .collect(); + + Ok(Self { + partitions, + trace_collector, + }) + } +} + +#[async_trait] +impl WriteBufferReading for RSKafkaConsumer { + fn streams(&mut self) -> BTreeMap> { + let mut streams = BTreeMap::new(); + + for (sequencer_id, partition) in &self.partitions { + let trace_collector = self.trace_collector.clone(); + let next_offset = Arc::clone(&partition.next_offset); + let stream = StreamConsumerBuilder::new( + Arc::clone(&partition.partition_client), + next_offset.load(Ordering::SeqCst), + ) + .with_max_wait_ms(100) + .build(); + let stream = stream.map(move |res| { + let (record, _watermark) = res?; + + let kafka_read_size = record.record.approximate_size(); + + let headers = + IoxHeaders::from_headers(record.record.headers, trace_collector.as_ref())?; + + let sequence = Sequence { + id: *sequencer_id, + number: record.offset.try_into()?, + }; + + let timestamp_millis = + i64::try_from(record.record.timestamp.unix_timestamp_nanos() / 1_000_000)?; + let timestamp = Time::from_timestamp_millis_opt(timestamp_millis) + .ok_or_else::(|| { + format!( + "Cannot parse timestamp for milliseconds: {}", + timestamp_millis + ) + .into() + })?; + + next_offset.store(record.offset + 1, Ordering::SeqCst); + + crate::codec::decode( + &record.record.value, + headers, + sequence, + timestamp, + kafka_read_size, + ) + }); + let stream = stream.boxed(); + + let partition_client = Arc::clone(&partition.partition_client); + let fetch_high_watermark = move || { + let partition_client = Arc::clone(&partition_client); + let fut = async move { + let watermark = partition_client.get_high_watermark().await?; + u64::try_from(watermark).map_err(|e| Box::new(e) as WriteBufferError) + }; + + fut.boxed() as FetchHighWatermarkFut<'_> + }; + let fetch_high_watermark = Box::new(fetch_high_watermark) as FetchHighWatermark<'_>; + + streams.insert( + *sequencer_id, + WriteStream { + stream, + fetch_high_watermark, + }, + ); + } + + streams + } + + async fn seek( + &mut self, + sequencer_id: u32, + sequence_number: u64, + ) -> Result<(), WriteBufferError> { + let partition = self + .partitions + .get_mut(&sequencer_id) + .ok_or_else::(|| { + format!("Unknown partition: {}", sequencer_id).into() + })?; + + let offset = i64::try_from(sequence_number)?; + partition.next_offset.store(offset, Ordering::SeqCst); + + Ok(()) + } + + fn type_name(&self) -> &'static str { + "rskafka" + } +} + +async fn setup_topic( + conn: String, + database_name: String, + creation_config: Option<&WriteBufferCreationConfig>, +) -> Result> { + let client = ClientBuilder::new(vec![conn]).build().await?; + let controller_client = client.controller_client().await?; + + loop { + // check if topic already exists + let topics = client.list_topics().await?; + if let Some(topic) = topics.into_iter().find(|t| t.name == database_name) { + let mut partition_clients = BTreeMap::new(); + for partition in topic.partitions { + let c = client.partition_client(&database_name, partition).await?; + let partition = u32::try_from(partition)?; + partition_clients.insert(partition, c); + } + return Ok(partition_clients); + } + + // create topic + if let Some(creation_config) = creation_config { + match controller_client + .create_topic(&database_name, creation_config.n_sequencers.get() as i32, 1) + .await + { + Ok(_) => {} + // race condition between check and creation action, that's OK + Err(RSKafkaError::ServerError(ProtocolError::TopicAlreadyExists, _)) => {} + Err(e) => { + return Err(e.into()); + } + } + } else { + return Err("no partitions found and auto-creation not requested" + .to_string() + .into()); + } + } +} + +#[cfg(test)] +mod tests { + use std::num::NonZeroU32; + + use futures::{stream::FuturesUnordered, TryStreamExt}; + use trace::RingBufferTraceCollector; + + use crate::{ + core::test_utils::{perform_generic_tests, random_topic_name, TestAdapter, TestContext}, + maybe_skip_kafka_integration, + }; + + use super::*; + + struct RSKafkaTestAdapter { + conn: String, + } + + impl RSKafkaTestAdapter { + fn new(conn: String) -> Self { + Self { conn } + } + } + + #[async_trait] + impl TestAdapter for RSKafkaTestAdapter { + type Context = RSKafkaTestContext; + + async fn new_context_with_time( + &self, + n_sequencers: NonZeroU32, + time_provider: Arc, + ) -> Self::Context { + RSKafkaTestContext { + conn: self.conn.clone(), + database_name: random_topic_name(), + n_sequencers, + time_provider, + } + } + } + + struct RSKafkaTestContext { + conn: String, + database_name: String, + n_sequencers: NonZeroU32, + time_provider: Arc, + } + + impl RSKafkaTestContext { + fn creation_config(&self, value: bool) -> Option { + value.then(|| WriteBufferCreationConfig { + n_sequencers: self.n_sequencers, + ..Default::default() + }) + } + } + + #[async_trait] + impl TestContext for RSKafkaTestContext { + type Writing = RSKafkaProducer; + + type Reading = RSKafkaConsumer; + + async fn writing(&self, creation_config: bool) -> Result { + RSKafkaProducer::new( + self.conn.clone(), + self.database_name.clone(), + self.creation_config(creation_config).as_ref(), + Arc::clone(&self.time_provider), + ) + .await + } + + async fn reading(&self, creation_config: bool) -> Result { + let collector: Arc = Arc::new(RingBufferTraceCollector::new(5)); + + RSKafkaConsumer::new( + self.conn.clone(), + self.database_name.clone(), + self.creation_config(creation_config).as_ref(), + Some(collector), + ) + .await + } + } + + #[tokio::test] + async fn test_generic() { + let conn = maybe_skip_kafka_integration!(); + + perform_generic_tests(RSKafkaTestAdapter::new(conn)).await; + } + + #[tokio::test] + async fn test_setup_topic_race() { + let conn = maybe_skip_kafka_integration!(); + let topic_name = random_topic_name(); + let n_partitions = NonZeroU32::new(2).unwrap(); + + let mut jobs: FuturesUnordered<_> = (0..10) + .map(|_| { + let conn = conn.clone(); + let topic_name = topic_name.clone(); + + tokio::spawn(async move { + setup_topic( + conn, + topic_name, + Some(&WriteBufferCreationConfig { + n_sequencers: n_partitions, + ..Default::default() + }), + ) + .await + .unwrap(); + }) + }) + .collect(); + + while jobs.try_next().await.unwrap().is_some() {} + } +} From 9c19cd6cc4f2af30e42c01f6e50b7111fd83603b Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 20 Jan 2022 11:08:00 -0500 Subject: [PATCH 2/7] fix: clamp start/end of TimestampRange to min/max valid timestamp values (#3487) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- data_types/src/delete_predicate.rs | 14 ++--- data_types/src/timestamp.rs | 54 +++++++++++++++++-- db/src/catalog/chunk.rs | 4 +- db/src/lifecycle/compact.rs | 10 +--- db/src/lifecycle/compact_object_store.rs | 8 +-- db/src/lifecycle/persist.rs | 17 ++---- db/src/pred.rs | 2 +- db/src/replay.rs | 6 +-- dml/src/lib.rs | 6 +-- generated_types/src/delete_predicate.rs | 9 ++-- .../influxdb_ioxd/server_type/router/http.rs | 2 +- .../tests/end_to_end_cases/delete_api.rs | 10 +--- parquet_catalog/src/test_helpers.rs | 2 +- predicate/src/delete_predicate.rs | 9 ++-- predicate/src/predicate.rs | 12 ++--- query_tests/src/influxrpc/read_filter.rs | 15 ++---- query_tests/src/influxrpc/read_group.rs | 4 +- .../src/influxrpc/read_window_aggregate.rs | 10 +--- query_tests/src/scenarios.rs | 32 +++-------- query_tests/src/scenarios/delete.rs | 16 +++--- router/src/router.rs | 2 +- server/tests/delete.rs | 5 +- server/tests/write_buffer_delete.rs | 2 +- 23 files changed, 117 insertions(+), 134 deletions(-) diff --git a/data_types/src/delete_predicate.rs b/data_types/src/delete_predicate.rs index 89c92af3fa..33f30b81d8 100644 --- a/data_types/src/delete_predicate.rs +++ b/data_types/src/delete_predicate.rs @@ -169,7 +169,7 @@ mod tests { #[test] fn test_expr_to_sql_no_expressions() { let pred = DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + range: TimestampRange::new(1, 2), exprs: vec![], }; assert_eq!(&pred.expr_sql_string(), ""); @@ -178,7 +178,7 @@ mod tests { #[test] fn test_expr_to_sql_operators() { let pred = DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + range: TimestampRange::new(1, 2), exprs: vec![ DeleteExpr { column: String::from("col1"), @@ -198,7 +198,7 @@ mod tests { #[test] fn test_expr_to_sql_column_escape() { let pred = DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + range: TimestampRange::new(1, 2), exprs: vec![ DeleteExpr { column: String::from("col 1"), @@ -226,7 +226,7 @@ mod tests { #[test] fn test_expr_to_sql_bool() { let pred = DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + range: TimestampRange::new(1, 2), exprs: vec![ DeleteExpr { column: String::from("col1"), @@ -246,7 +246,7 @@ mod tests { #[test] fn test_expr_to_sql_i64() { let pred = DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + range: TimestampRange::new(1, 2), exprs: vec![ DeleteExpr { column: String::from("col1"), @@ -284,7 +284,7 @@ mod tests { #[test] fn test_expr_to_sql_f64() { let pred = DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + range: TimestampRange::new(1, 2), exprs: vec![ DeleteExpr { column: String::from("col1"), @@ -327,7 +327,7 @@ mod tests { #[test] fn test_expr_to_sql_string() { let pred = DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + range: TimestampRange::new(1, 2), exprs: vec![ DeleteExpr { column: String::from("col1"), diff --git a/data_types/src/timestamp.rs b/data_types/src/timestamp.rs index 71a8e14832..b6a1ecd542 100644 --- a/data_types/src/timestamp.rs +++ b/data_types/src/timestamp.rs @@ -36,15 +36,17 @@ pub const MAX_NANO_TIME: i64 = i64::MAX - 1; /// ``` #[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Copy, Debug, Hash)] pub struct TimestampRange { - /// Start defines the inclusive lower bound. - pub start: i64, - /// End defines the exclusive upper bound. - pub end: i64, + /// Start defines the inclusive lower bound. Minimum value is [MIN_NANO_TIME] + start: i64, + /// End defines the exclusive upper bound. Maximum value is [MAX_NANO_TIME] + end: i64, } impl TimestampRange { pub fn new(start: i64, end: i64) -> Self { debug_assert!(end >= start); + let start = start.max(MIN_NANO_TIME); + let end = end.min(MAX_NANO_TIME); Self { start, end } } @@ -59,6 +61,16 @@ impl TimestampRange { pub fn contains_opt(&self, v: Option) -> bool { Some(true) == v.map(|ts| self.contains(ts)) } + + /// Return the timestamp range's end. + pub fn end(&self) -> i64 { + self.end + } + + /// Return the timestamp range's start. + pub fn start(&self) -> i64 { + self.start + } } /// Specifies a min/max timestamp value. @@ -93,6 +105,40 @@ impl TimestampMinMax { mod tests { use super::*; + #[test] + fn test_timestamp_nano_min_max() { + let cases = vec![ + ( + "MIN/MAX Nanos", + TimestampRange::new(MIN_NANO_TIME, MAX_NANO_TIME), + ), + ("MIN/MAX i64", TimestampRange::new(i64::MIN, i64::MAX)), + ]; + + for (name, range) in cases { + println!("case: {}", name); + assert!(!range.contains(i64::MIN)); + assert!(range.contains(MIN_NANO_TIME)); + assert!(range.contains(MIN_NANO_TIME + 1)); + assert!(range.contains(MAX_NANO_TIME - 1)); + assert!(!range.contains(MAX_NANO_TIME)); + assert!(!range.contains(i64::MAX)); + } + } + + #[test] + fn test_timestamp_i64_min_max_offset() { + let range = TimestampRange::new(MIN_NANO_TIME + 1, MAX_NANO_TIME - 1); + + assert!(!range.contains(i64::MIN)); + assert!(!range.contains(MIN_NANO_TIME)); + assert!(range.contains(MIN_NANO_TIME + 1)); + assert!(range.contains(MAX_NANO_TIME - 2)); + assert!(!range.contains(MAX_NANO_TIME - 1)); + assert!(!range.contains(MAX_NANO_TIME)); + assert!(!range.contains(i64::MAX)); + } + #[test] fn test_timestamp_range_contains() { let range = TimestampRange::new(100, 200); diff --git a/db/src/catalog/chunk.rs b/db/src/catalog/chunk.rs index dddfbf1f02..cf5314681a 100644 --- a/db/src/catalog/chunk.rs +++ b/db/src/catalog/chunk.rs @@ -1170,7 +1170,7 @@ mod tests { // Build delete predicate and expected output let del_pred1 = Arc::new(DeletePredicate { - range: TimestampRange { start: 0, end: 100 }, + range: TimestampRange::new(0, 100), exprs: vec![DeleteExpr::new( "city".to_string(), data_types::delete_predicate::Op::Eq, @@ -1192,7 +1192,7 @@ mod tests { // let add more delete predicate = simulate second delete // Build delete predicate and expected output let del_pred2 = Arc::new(DeletePredicate { - range: TimestampRange { start: 20, end: 50 }, + range: TimestampRange::new(20, 50), exprs: vec![DeleteExpr::new( "cost".to_string(), data_types::delete_predicate::Op::Ne, diff --git a/db/src/lifecycle/compact.rs b/db/src/lifecycle/compact.rs index 2587cd4e07..e750019277 100644 --- a/db/src/lifecycle/compact.rs +++ b/db/src/lifecycle/compact.rs @@ -255,10 +255,7 @@ mod tests { // Cannot simply use empty predicate (#2687) let predicate = Arc::new(DeletePredicate { - range: TimestampRange { - start: 0, - end: 1_000, - }, + range: TimestampRange::new(0, 1_000), exprs: vec![], }); @@ -288,10 +285,7 @@ mod tests { write_lp(db.as_ref(), "cpu foo=3 20"); write_lp(db.as_ref(), "cpu foo=4 20"); - let range = TimestampRange { - start: 0, - end: 1_000, - }; + let range = TimestampRange::new(0, 1_000); let pred1 = Arc::new(DeletePredicate { range, exprs: vec![DeleteExpr::new("foo".to_string(), Op::Eq, Scalar::I64(1))], diff --git a/db/src/lifecycle/compact_object_store.rs b/db/src/lifecycle/compact_object_store.rs index ab37e808d9..eb7c565956 100644 --- a/db/src/lifecycle/compact_object_store.rs +++ b/db/src/lifecycle/compact_object_store.rs @@ -811,7 +811,7 @@ mod tests { // Create 3 delete predicates that will delete all cookies in 3 different deletes let pred1 = Arc::new(DeletePredicate { - range: TimestampRange { start: 0, end: 11 }, + range: TimestampRange::new(0, 11), exprs: vec![DeleteExpr::new( "tag1".to_string(), data_types::delete_predicate::Op::Eq, @@ -819,7 +819,7 @@ mod tests { )], }); let pred2 = Arc::new(DeletePredicate { - range: TimestampRange { start: 12, end: 21 }, + range: TimestampRange::new(12, 21), exprs: vec![DeleteExpr::new( "tag1".to_string(), data_types::delete_predicate::Op::Eq, @@ -827,7 +827,7 @@ mod tests { )], }); let pred3 = Arc::new(DeletePredicate { - range: TimestampRange { start: 22, end: 31 }, + range: TimestampRange::new(22, 31), exprs: vec![DeleteExpr::new( "tag1".to_string(), data_types::delete_predicate::Op::Eq, @@ -976,7 +976,7 @@ mod tests { // Delete all let predicate = Arc::new(DeletePredicate { - range: TimestampRange { start: 0, end: 30 }, + range: TimestampRange::new(0, 30), exprs: vec![], }); db.delete("cpu", predicate).unwrap(); diff --git a/db/src/lifecycle/persist.rs b/db/src/lifecycle/persist.rs index 263d8ba044..fcec26241c 100644 --- a/db/src/lifecycle/persist.rs +++ b/db/src/lifecycle/persist.rs @@ -324,7 +324,7 @@ mod tests { // Delete first row let predicate = Arc::new(DeletePredicate { - range: TimestampRange { start: 0, end: 20 }, + range: TimestampRange::new(0, 20), exprs: vec![], }); @@ -388,10 +388,7 @@ mod tests { // Delete everything let predicate = Arc::new(DeletePredicate { - range: TimestampRange { - start: 0, - end: 1_000, - }, + range: TimestampRange::new(0, 1_000), exprs: vec![], }); @@ -430,10 +427,7 @@ mod tests { // Cannot simply use empty predicate (#2687) let predicate = Arc::new(DeletePredicate { - range: TimestampRange { - start: 0, - end: 1_000, - }, + range: TimestampRange::new(0, 1_000), exprs: vec![], }); @@ -494,10 +488,7 @@ mod tests { write_lp(db.as_ref(), "cpu foo=3 20"); write_lp(db.as_ref(), "cpu foo=4 20"); - let range = TimestampRange { - start: 0, - end: 1_000, - }; + let range = TimestampRange::new(0, 1_000); let pred1 = Arc::new(DeletePredicate { range, exprs: vec![DeleteExpr::new("foo".to_string(), Op::Eq, Scalar::I64(1))], diff --git a/db/src/pred.rs b/db/src/pred.rs index 482f6f56c5..0b8f456a2f 100644 --- a/db/src/pred.rs +++ b/db/src/pred.rs @@ -37,7 +37,7 @@ pub fn to_read_buffer_predicate(predicate: &Predicate) -> Result { - read_buffer::Predicate::with_time_range(&exprs, range.start, range.end) + read_buffer::Predicate::with_time_range(&exprs, range.start(), range.end()) } None => read_buffer::Predicate::new(exprs), }) diff --git a/db/src/replay.rs b/db/src/replay.rs index b26e77bd47..672c9dd869 100644 --- a/db/src/replay.rs +++ b/db/src/replay.rs @@ -2653,7 +2653,7 @@ mod tests { sequence_number: 1, table_name: None, predicate: DeletePredicate { - range: TimestampRange { start: 0, end: 20 }, + range: TimestampRange::new(0, 20), exprs: vec![], }, }]), @@ -2707,7 +2707,7 @@ mod tests { sequence_number: 1, table_name: None, predicate: DeletePredicate { - range: TimestampRange { start: 0, end: 11 }, + range: TimestampRange::new(0, 11), exprs: vec![], }, }]), @@ -2817,7 +2817,7 @@ mod tests { sequence_number: 2, table_name: Some("table_1"), predicate: DeletePredicate { - range: TimestampRange { start: 19, end: 21 }, + range: TimestampRange::new(19, 21), exprs: vec![], }, }]), diff --git a/dml/src/lib.rs b/dml/src/lib.rs index d789c7acc0..ca4b260c8a 100644 --- a/dml/src/lib.rs +++ b/dml/src/lib.rs @@ -593,7 +593,7 @@ mod tests { let meta = DmlMeta::unsequenced(None); let delete = DmlDelete::new( DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + range: TimestampRange::new(1, 2), exprs: vec![], }, None, @@ -615,7 +615,7 @@ mod tests { let meta = DmlMeta::unsequenced(None); let delete = DmlDelete::new( DeletePredicate { - range: TimestampRange { start: 3, end: 4 }, + range: TimestampRange::new(3, 4), exprs: vec![], }, Some(NonEmptyString::new("some_foo").unwrap()), @@ -630,7 +630,7 @@ mod tests { let meta = DmlMeta::unsequenced(None); let delete = DmlDelete::new( DeletePredicate { - range: TimestampRange { start: 5, end: 6 }, + range: TimestampRange::new(5, 6), exprs: vec![], }, Some(NonEmptyString::new("bar").unwrap()), diff --git a/generated_types/src/delete_predicate.rs b/generated_types/src/delete_predicate.rs index 09e5e04329..2fa7b2126d 100644 --- a/generated_types/src/delete_predicate.rs +++ b/generated_types/src/delete_predicate.rs @@ -20,8 +20,8 @@ impl From for proto::Predicate { fn from(predicate: DeletePredicate) -> Self { proto::Predicate { range: Some(proto::TimestampRange { - start: predicate.range.start, - end: predicate.range.end, + start: predicate.range.start(), + end: predicate.range.end(), }), exprs: predicate.exprs.into_iter().map(Into::into).collect(), } @@ -35,10 +35,7 @@ impl TryFrom for DeletePredicate { let range = value.range.unwrap_field("range")?; Ok(Self { - range: TimestampRange { - start: range.start, - end: range.end, - }, + range: TimestampRange::new(range.start, range.end), exprs: value.exprs.repeated("exprs")?, }) } diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/router/http.rs b/influxdb_iox/src/influxdb_ioxd/server_type/router/http.rs index b9a2716f5a..d27afba29a 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/router/http.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/router/http.rs @@ -177,7 +177,7 @@ mod tests { check_response("delete", response, StatusCode::NO_CONTENT, Some("")).await; let predicate = DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + range: TimestampRange::new(1, 2), exprs: vec![DeleteExpr { column: String::from("foo"), op: data_types::delete_predicate::Op::Eq, diff --git a/influxdb_iox/tests/end_to_end_cases/delete_api.rs b/influxdb_iox/tests/end_to_end_cases/delete_api.rs index 765216a063..a0348c3ea0 100644 --- a/influxdb_iox/tests/end_to_end_cases/delete_api.rs +++ b/influxdb_iox/tests/end_to_end_cases/delete_api.rs @@ -66,10 +66,7 @@ async fn test_delete_on_database() { // Delete some data let table = "cpu"; let pred = DeletePredicate { - range: TimestampRange { - start: 100, - end: 120, - }, + range: TimestampRange::new(100, 120), exprs: vec![DeleteExpr { column: String::from("region"), op: data_types::delete_predicate::Op::Eq, @@ -169,10 +166,7 @@ pub async fn test_delete_on_router() { let table = "cpu"; let pred = DeletePredicate { - range: TimestampRange { - start: 100, - end: 120, - }, + range: TimestampRange::new(100, 120), exprs: vec![DeleteExpr { column: String::from("region"), op: data_types::delete_predicate::Op::Eq, diff --git a/parquet_catalog/src/test_helpers.rs b/parquet_catalog/src/test_helpers.rs index 6e1ae4ab73..2c49105e7d 100644 --- a/parquet_catalog/src/test_helpers.rs +++ b/parquet_catalog/src/test_helpers.rs @@ -545,7 +545,7 @@ fn get_sorted_keys<'a>( /// Helper to create a simple delete predicate. pub fn create_delete_predicate(value: i64) -> Arc { Arc::new(DeletePredicate { - range: TimestampRange { start: 11, end: 22 }, + range: TimestampRange::new(11, 22), exprs: vec![DeleteExpr::new( "foo".to_string(), Op::Eq, diff --git a/predicate/src/delete_predicate.rs b/predicate/src/delete_predicate.rs index 30dcf905da..d142a54e90 100644 --- a/predicate/src/delete_predicate.rs +++ b/predicate/src/delete_predicate.rs @@ -94,10 +94,7 @@ pub fn parse_delete_predicate( let delete_exprs = parse_predicate(predicate)?; Ok(DeletePredicate { - range: TimestampRange { - start: start_time, - end: stop_time, - }, + range: TimestampRange::new(start_time, stop_time), exprs: delete_exprs, }) } @@ -551,8 +548,8 @@ mod tests { let pred = r#"cost != 100"#; let result = parse_delete_predicate(start, stop, pred).unwrap(); - assert_eq!(result.range.start, 0); - assert_eq!(result.range.end, 200); + assert_eq!(result.range.start(), 0); + assert_eq!(result.range.end(), 200); let expected = vec![DeleteExpr::new( "cost".to_string(), diff --git a/predicate/src/predicate.rs b/predicate/src/predicate.rs index a31b57f3bb..6e4a7d98c4 100644 --- a/predicate/src/predicate.rs +++ b/predicate/src/predicate.rs @@ -124,7 +124,7 @@ impl Predicate { /// `range.start <= time and time < range.end` fn make_timestamp_predicate_expr(&self) -> Option { self.range - .map(|range| make_range_expr(range.start, range.end, TIME_COLUMN_NAME)) + .map(|range| make_range_expr(range.start(), range.end(), TIME_COLUMN_NAME)) } /// Returns true if ths predicate evaluates to true for all rows @@ -184,8 +184,8 @@ impl Predicate { // time_expr = NOT(start <= time_range <= end) // Equivalent to: (time < start OR time > end) let time_expr = col(TIME_COLUMN_NAME) - .lt(lit_timestamp_nano(range.start)) - .or(col(TIME_COLUMN_NAME).gt(lit_timestamp_nano(range.end))); + .lt(lit_timestamp_nano(range.start())) + .or(col(TIME_COLUMN_NAME).gt(lit_timestamp_nano(range.end()))); match expr { None => expr = Some(time_expr), @@ -215,7 +215,7 @@ impl Predicate { /// existing storage engine pub fn clear_timestamp_if_max_range(mut self) -> Self { self.range = self.range.take().and_then(|range| { - if range.start <= MIN_NANO_TIME && range.end >= MAX_NANO_TIME { + if range.start() <= MIN_NANO_TIME && range.end() >= MAX_NANO_TIME { None } else { Some(range) @@ -254,7 +254,7 @@ impl fmt::Display for Predicate { if let Some(range) = &self.range { // TODO: could be nice to show this as actual timestamps (not just numbers)? - write!(f, " range: [{} - {}]", range.start, range.end)?; + write!(f, " range: [{} - {}]", range.start(), range.end())?; } if !self.exprs.is_empty() { @@ -313,7 +313,7 @@ impl PredicateBuilder { "Unexpected re-definition of timestamp range" ); - self.inner.range = Some(TimestampRange { start, end }); + self.inner.range = Some(TimestampRange::new(start, end)); self } diff --git a/query_tests/src/influxrpc/read_filter.rs b/query_tests/src/influxrpc/read_filter.rs index 740cda4ebb..0761c42441 100644 --- a/query_tests/src/influxrpc/read_filter.rs +++ b/query_tests/src/influxrpc/read_filter.rs @@ -62,10 +62,7 @@ impl DbSetup for TwoMeasurementsMultiSeriesWithDelete { // 2 rows of h2o with timestamp 200 and 350 will be deleted let delete_table_name = "h2o"; let pred = DeletePredicate { - range: TimestampRange { - start: 120, - end: 250, - }, + range: TimestampRange::new(120, 250), exprs: vec![], }; @@ -104,10 +101,7 @@ impl DbSetup for TwoMeasurementsMultiSeriesWithDeleteAll { // pred: delete from h20 where 100 <= time <= 360 let delete_table_name = "h2o"; let pred = DeletePredicate { - range: TimestampRange { - start: 100, - end: 360, - }, + range: TimestampRange::new(100, 360), exprs: vec![], }; @@ -649,10 +643,7 @@ impl DbSetup for MeasurementsSortableTagsWithDelete { // 1 rows of h2o with timestamp 250 will be deleted let delete_table_name = "h2o"; let pred = DeletePredicate { - range: TimestampRange { - start: 120, - end: 350, - }, + range: TimestampRange::new(120, 350), exprs: vec![DeleteExpr::new( "state".to_string(), data_types::delete_predicate::Op::Eq, diff --git a/query_tests/src/influxrpc/read_group.rs b/query_tests/src/influxrpc/read_group.rs index 6d245512b6..bade4fcbca 100644 --- a/query_tests/src/influxrpc/read_group.rs +++ b/query_tests/src/influxrpc/read_group.rs @@ -86,7 +86,7 @@ impl DbSetup for OneMeasurementNoTagsWithDelete { // 1 row of m0 with timestamp 1 let delete_table_name = "m0"; let pred = DeletePredicate { - range: TimestampRange { start: 1, end: 1 }, + range: TimestampRange::new(1, 1), exprs: vec![DeleteExpr::new( "foo".to_string(), data_types::delete_predicate::Op::Eq, @@ -118,7 +118,7 @@ impl DbSetup for OneMeasurementNoTagsWithDeleteAllWithAndWithoutChunk { // pred: delete from m0 where 1 <= time <= 2 let delete_table_name = "m0"; let pred = DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + range: TimestampRange::new(1, 2), exprs: vec![], }; diff --git a/query_tests/src/influxrpc/read_window_aggregate.rs b/query_tests/src/influxrpc/read_window_aggregate.rs index 24f8115c4f..150d58a70b 100644 --- a/query_tests/src/influxrpc/read_window_aggregate.rs +++ b/query_tests/src/influxrpc/read_window_aggregate.rs @@ -315,10 +315,7 @@ impl DbSetup for MeasurementForDefect2697WithDelete { // 1 row of m0 with timestamp 1609459201000000022 (section=2b bar=1.2) let delete_table_name = "mm"; let pred = DeletePredicate { - range: TimestampRange { - start: 1609459201000000022, - end: 1609459201000000022, - }, + range: TimestampRange::new(1609459201000000022, 1609459201000000022), exprs: vec![], }; @@ -347,10 +344,7 @@ impl DbSetup for MeasurementForDefect2697WithDeleteAll { // pred: delete from mm where 1 <= time <= 1609459201000000031 let delete_table_name = "mm"; let pred = DeletePredicate { - range: TimestampRange { - start: 1, - end: 1609459201000000031, - }, + range: TimestampRange::new(1, 1609459201000000031), exprs: vec![], }; diff --git a/query_tests/src/scenarios.rs b/query_tests/src/scenarios.rs index 1c356cdc6b..4875fcde07 100644 --- a/query_tests/src/scenarios.rs +++ b/query_tests/src/scenarios.rs @@ -279,10 +279,7 @@ impl DbSetup for OneMeasurementManyNullTagsWithDelete { // 3 rows of h2o & NY state will be deleted let delete_table_name = "h2o"; let pred = DeletePredicate { - range: TimestampRange { - start: 400, - end: 602, - }, + range: TimestampRange::new(400, 602), exprs: vec![DeleteExpr::new( "state".to_string(), data_types::delete_predicate::Op::Eq, @@ -322,10 +319,7 @@ impl DbSetup for OneMeasurementManyNullTagsWithDeleteAll { // all rows of h2o will be deleted let delete_table_name = "h2o"; let pred = DeletePredicate { - range: TimestampRange { - start: 100, - end: 602, - }, + range: TimestampRange::new(100, 602), exprs: vec![], }; @@ -412,10 +406,7 @@ impl DbSetup for TwoMeasurementsWithDelete { // delete 1 row from cpu with timestamp 150 let table_name = "cpu"; let pred = DeletePredicate { - range: TimestampRange { - start: 120, - end: 160, - }, + range: TimestampRange::new(120, 160), exprs: vec![DeleteExpr::new( "region".to_string(), data_types::delete_predicate::Op::Eq, @@ -447,10 +438,7 @@ impl DbSetup for TwoMeasurementsWithDeleteAll { // which will delete second row of the cpu let table_name = "cpu"; let pred1 = DeletePredicate { - range: TimestampRange { - start: 120, - end: 160, - }, + range: TimestampRange::new(120, 160), exprs: vec![DeleteExpr::new( "region".to_string(), data_types::delete_predicate::Op::Eq, @@ -460,7 +448,7 @@ impl DbSetup for TwoMeasurementsWithDeleteAll { // delete the first row of the cpu let pred2 = DeletePredicate { - range: TimestampRange { start: 0, end: 110 }, + range: TimestampRange::new(0, 110), exprs: vec![], }; @@ -885,10 +873,7 @@ impl DbSetup for OneMeasurementManyFieldsWithDelete { // field4 no longer available let delete_table_name = "h2o"; let pred = DeletePredicate { - range: TimestampRange { - start: 1000, - end: 1100, - }, + range: TimestampRange::new(1000, 1100), exprs: vec![], }; @@ -950,10 +935,7 @@ impl DbSetup for EndToEndTestWithDelete { // 1 rows of h2o with timestamp 250 will be deleted let delete_table_name = "swap"; let pred = DeletePredicate { - range: TimestampRange { - start: 6000, - end: 6000, - }, + range: TimestampRange::new(6000, 6000), exprs: vec![DeleteExpr::new( "name".to_string(), data_types::delete_predicate::Op::Eq, diff --git a/query_tests/src/scenarios/delete.rs b/query_tests/src/scenarios/delete.rs index e321ce11bb..cdd8f85849 100644 --- a/query_tests/src/scenarios/delete.rs +++ b/query_tests/src/scenarios/delete.rs @@ -31,7 +31,7 @@ impl DbSetup for OneDeleteSimpleExprOneChunkDeleteAll { // delete predicate let pred = DeletePredicate { - range: TimestampRange { start: 10, end: 20 }, + range: TimestampRange::new(10, 20), exprs: vec![], }; @@ -54,7 +54,7 @@ impl DbSetup for OneDeleteSimpleExprOneChunk { // delete predicate let pred = DeletePredicate { - range: TimestampRange { start: 0, end: 15 }, + range: TimestampRange::new(0, 15), exprs: vec![DeleteExpr::new( "bar".to_string(), data_types::delete_predicate::Op::Eq, @@ -106,7 +106,7 @@ impl DbSetup for OneDeleteMultiExprsOneChunk { ]; // delete predicate let pred = DeletePredicate { - range: TimestampRange { start: 0, end: 30 }, + range: TimestampRange::new(0, 30), exprs: vec![ DeleteExpr::new( "bar".to_string(), @@ -149,7 +149,7 @@ impl DbSetup for TwoDeletesMultiExprsOneChunk { // delete predicate // pred1: delete from cpu where 0 <= time <= 32 and bar = 1 and foo = 'me' let pred1 = DeletePredicate { - range: TimestampRange { start: 0, end: 32 }, + range: TimestampRange::new(0, 32), exprs: vec![ DeleteExpr::new( "bar".to_string(), @@ -166,7 +166,7 @@ impl DbSetup for TwoDeletesMultiExprsOneChunk { // pred2: delete from cpu where 10 <= time <= 40 and bar != 1 let pred2 = DeletePredicate { - range: TimestampRange { start: 10, end: 40 }, + range: TimestampRange::new(10, 40), exprs: vec![DeleteExpr::new( "bar".to_string(), data_types::delete_predicate::Op::Ne, @@ -206,7 +206,7 @@ impl DbSetup for ThreeDeleteThreeChunks { ]; // delete predicate on chunk 1 let pred1 = DeletePredicate { - range: TimestampRange { start: 0, end: 30 }, + range: TimestampRange::new(0, 30), exprs: vec![ DeleteExpr::new( "bar".to_string(), @@ -230,7 +230,7 @@ impl DbSetup for ThreeDeleteThreeChunks { ]; // delete predicate on chunk 1 & chunk 2 let pred2 = DeletePredicate { - range: TimestampRange { start: 20, end: 45 }, + range: TimestampRange::new(20, 45), exprs: vec![DeleteExpr::new( "foo".to_string(), data_types::delete_predicate::Op::Eq, @@ -247,7 +247,7 @@ impl DbSetup for ThreeDeleteThreeChunks { ]; // delete predicate on chunk 3 let pred3 = DeletePredicate { - range: TimestampRange { start: 75, end: 95 }, + range: TimestampRange::new(75, 95), exprs: vec![DeleteExpr::new( "bar".to_string(), data_types::delete_predicate::Op::Ne, diff --git a/router/src/router.rs b/router/src/router.rs index 5ed0f9083c..fe02e90559 100644 --- a/router/src/router.rs +++ b/router/src/router.rs @@ -410,7 +410,7 @@ mod tests { ); let delete = DmlOperation::Delete(DmlDelete::new( DeletePredicate { - range: TimestampRange { start: 1, end: 2 }, + range: TimestampRange::new(1, 2), exprs: vec![], }, Some(NonEmptyString::new("foo_foo").unwrap()), diff --git a/server/tests/delete.rs b/server/tests/delete.rs index 7e2e9cde6b..e5e788a0a9 100644 --- a/server/tests/delete.rs +++ b/server/tests/delete.rs @@ -101,10 +101,7 @@ async fn delete_predicate_preservation() { // ==================== do: delete ==================== let pred = Arc::new(DeletePredicate { - range: TimestampRange { - start: 0, - end: 1_000, - }, + range: TimestampRange::new(0, 1_000), exprs: vec![DeleteExpr::new( "selector".to_string(), data_types::delete_predicate::Op::Eq, diff --git a/server/tests/write_buffer_delete.rs b/server/tests/write_buffer_delete.rs index 380b9cc395..4680c581db 100644 --- a/server/tests/write_buffer_delete.rs +++ b/server/tests/write_buffer_delete.rs @@ -179,7 +179,7 @@ async fn write_buffer_deletes() { fixture .delete(DmlDelete::new( DeletePredicate { - range: TimestampRange { start: 0, end: 20 }, + range: TimestampRange::new(0, 20), exprs: vec![DeleteExpr { column: "x".to_string(), op: Op::Eq, From 2f0e1b9d67f9c3094c675c7c02f8aae1bee329ff Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Fri, 21 Jan 2022 09:54:11 +0100 Subject: [PATCH 3/7] refactor: remove unused lifetime from `SpanRecorder` impls --- trace/src/span.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/trace/src/span.rs b/trace/src/span.rs index 249af64ee9..24c369f404 100644 --- a/trace/src/span.rs +++ b/trace/src/span.rs @@ -126,7 +126,7 @@ pub struct SpanRecorder { span: Option, } -impl<'a> SpanRecorder { +impl SpanRecorder { pub fn new(mut span: Option) -> Self { if let Some(span) = span.as_mut() { span.start = Some(Utc::now()); @@ -179,7 +179,7 @@ impl<'a> SpanRecorder { } } -impl<'a> Drop for SpanRecorder { +impl Drop for SpanRecorder { fn drop(&mut self) { if let Some(mut span) = self.span.take() { let now = Utc::now(); From 548cfabcda6f208e695599f97c2aaab6679899a1 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Fri, 21 Jan 2022 09:54:49 +0100 Subject: [PATCH 4/7] feat: simplify linking of spans Linking of span contexts was introduced in #2803 but the high-level interface was never used. This adds the missing bits to allow links to be used with `Span` and `SpanRecorder`. --- trace/src/span.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/trace/src/span.rs b/trace/src/span.rs index 24c369f404..9282c4040c 100644 --- a/trace/src/span.rs +++ b/trace/src/span.rs @@ -72,6 +72,11 @@ impl Span { pub fn child(&self, name: impl Into>) -> Self { self.ctx.child(name) } + + /// Link this span to another context. + pub fn link(&mut self, other: &SpanContext) { + self.ctx.links.push((other.trace_id, other.span_id)); + } } #[derive(Debug, Clone)] @@ -177,6 +182,13 @@ impl SpanRecorder { pub fn span(&self) -> Option<&Span> { self.span.as_ref() } + + /// Link this span to another context. + pub fn link(&mut self, other: &SpanContext) { + if let Some(span) = self.span.as_mut() { + span.link(other); + } + } } impl Drop for SpanRecorder { From 9615feacb351a11cafb8b699436a64f5dda5cff8 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 21 Jan 2022 09:40:10 -0500 Subject: [PATCH 5/7] fix(InfluxQL): Support RegEx with escape sequences not supported by Rust regex (#3502) * fix(InfluxQL): Translate unsupported meta characters * fix: remove debugging * fix: clippy sacrifice * docs: Add additional background and rationale for rewriting * fix: doc link --- Cargo.lock | 1 + predicate/Cargo.toml | 1 + predicate/src/regex.rs | 113 +++++++++++++++++++++++ query_tests/src/influxrpc/read_filter.rs | 55 ++++++++++- 4 files changed, 165 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 57fc9e4ef6..dcdfed40ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3209,6 +3209,7 @@ dependencies = [ "observability_deps", "ordered-float 2.10.0", "regex", + "regex-syntax", "schema", "serde_json", "snafu", diff --git a/predicate/Cargo.toml b/predicate/Cargo.toml index c1c638b1f5..52c2cc093e 100644 --- a/predicate/Cargo.toml +++ b/predicate/Cargo.toml @@ -13,6 +13,7 @@ schema = { path = "../schema" } observability_deps = { path = "../observability_deps" } ordered-float = "2" regex = "1" +regex-syntax = "0.6.25" serde_json = "1.0.72" snafu = "0.7" sqlparser = "0.13.0" diff --git a/predicate/src/regex.rs b/predicate/src/regex.rs index dae111bfcb..37e0c6a327 100644 --- a/predicate/src/regex.rs +++ b/predicate/src/regex.rs @@ -30,6 +30,11 @@ pub fn regex_match_expr(input: Expr, pattern: String, matches: bool) -> Expr { // N.B., this function does not utilise the Arrow regexp compute kernel because // in order to act as a filter it needs to return a boolean array of comparison // results, not an array of strings as the regex compute kernel does. + + // Attempt to make the pattern compatible with what is accepted by + // the golang regexp library which is different than Rust's regexp + let pattern = clean_non_meta_escapes(pattern); + let func = move |args: &[ArrayRef]| { assert_eq!(args.len(), 1); // only works over a single column at a time. @@ -72,6 +77,79 @@ pub fn regex_match_expr(input: Expr, pattern: String, matches: bool) -> Expr { udf.call(vec![input]) } +fn is_valid_character_after_escape(c: char) -> bool { + // same list as https://docs.rs/regex-syntax/0.6.25/src/regex_syntax/ast/parse.rs.html#1445-1538 + match c { + '0'..='7' => true, + '8'..='9' => true, + 'x' | 'u' | 'U' => true, + 'p' | 'P' => true, + 'd' | 's' | 'w' | 'D' | 'S' | 'W' => true, + _ => regex_syntax::is_meta_character(c), + } +} + +/// Removes all `/` patterns that the rust regex library would reject +/// and rewrites them to their unescaped form. +/// +/// For example, `\:` is rewritten to `:` as `\:` is not a valid +/// escape sequence in the `regexp` crate but is valid in golang's +/// regexp implementation. +/// +/// This is done for compatibility purposes so that the regular +/// expression matching in Rust more closely follows the matching in +/// golang, used by the influx storage rpc. +/// +/// See for more details +fn clean_non_meta_escapes(pattern: String) -> String { + if pattern.is_empty() { + return pattern; + } + + #[derive(Debug, Copy, Clone)] + enum SlashState { + No, + Single, + Double, + } + + let mut next_state = SlashState::No; + + let next_chars = pattern + .chars() + .map(Some) + .skip(1) + .chain(std::iter::once(None)); + + // emit char based on previous + let new_pattern: String = pattern + .chars() + .zip(next_chars) + .filter_map(|(c, next_char)| { + let cur_state = next_state; + next_state = match (c, cur_state) { + ('\\', SlashState::No) => SlashState::Single, + ('\\', SlashState::Single) => SlashState::Double, + ('\\', SlashState::Double) => SlashState::Single, + _ => SlashState::No, + }; + + // Decide to emit `c` or not + match (cur_state, c, next_char) { + (SlashState::No, '\\', Some(next_char)) + | (SlashState::Double, '\\', Some(next_char)) + if !is_valid_character_after_escape(next_char) => + { + None + } + _ => Some(c), + } + }) + .collect(); + + new_pattern +} + #[cfg(test)] mod test { use arrow::{ @@ -88,6 +166,8 @@ mod test { }; use std::sync::Arc; + use super::clean_non_meta_escapes; + #[tokio::test] async fn regex_match_expr() { let cases = vec![ @@ -220,4 +300,37 @@ mod test { .map(|s| s.to_owned()) .collect()) } + + #[test] + fn test_clean_non_meta_escapes() { + let cases = vec![ + ("", ""), + (r#"\"#, r#"\"#), + (r#"\\"#, r#"\\"#), + // : is not a special meta character + (r#"\:"#, r#":"#), + // . is a special meta character + (r#"\."#, r#"\."#), + (r#"foo\"#, r#"foo\"#), + (r#"foo\\"#, r#"foo\\"#), + (r#"foo\:"#, r#"foo:"#), + (r#"foo\xff"#, r#"foo\xff"#), + (r#"fo\\o"#, r#"fo\\o"#), + (r#"fo\:o"#, r#"fo:o"#), + (r#"fo\:o\x123"#, r#"fo:o\x123"#), + (r#"fo\:o\x123\:"#, r#"fo:o\x123:"#), + (r#"foo\\\:bar"#, r#"foo\\:bar"#), + (r#"foo\\\:bar\\\:"#, r#"foo\\:bar\\:"#), + ("foo", "foo"), + ]; + + for (pattern, expected) in cases { + let cleaned_pattern = clean_non_meta_escapes(pattern.to_string()); + assert_eq!( + cleaned_pattern, expected, + "Expected '{}' to be cleaned to '{}', got '{}'", + pattern, expected, cleaned_pattern + ); + } + } } diff --git a/query_tests/src/influxrpc/read_filter.rs b/query_tests/src/influxrpc/read_filter.rs index 0761c42441..ea0a8aecb0 100644 --- a/query_tests/src/influxrpc/read_filter.rs +++ b/query_tests/src/influxrpc/read_filter.rs @@ -477,7 +477,7 @@ async fn test_read_filter_data_pred_using_regex_match() { .build(); let expected_results = vec![ - "Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [200], values: [90.0]", + "Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [200], values: [90.0]", ]; run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await; @@ -508,7 +508,7 @@ async fn test_read_filter_data_pred_using_regex_match_with_delete() { .build(); let expected_results = vec![ - "Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [350], values: [90.0]", + "Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [350], values: [90.0]", ]; run_read_filter_test_case( TwoMeasurementsMultiSeriesWithDelete {}, @@ -536,14 +536,59 @@ async fn test_read_filter_data_pred_using_regex_not_match() { .build(); let expected_results = vec![ - "Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [250], values: [72.4]", - "Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n FloatPoints timestamps: [250], values: [51.0]", - "Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [250], values: [53.4]", + "Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [250], values: [72.4]", + "Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n FloatPoints timestamps: [250], values: [51.0]", + "Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [250], values: [53.4]", ]; run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await; } +#[tokio::test] +async fn test_read_filter_data_pred_regex_escape() { + let predicate = PredicateBuilder::default() + // Came from InfluxQL like `SELECT value FROM db0.rp0.status_code WHERE url =~ /https\:\/\/influxdb\.com/`, + .build_regex_match_expr("url", r#"https\://influxdb\.com"#) + .build(); + + // expect one series with influxdb.com + let expected_results = vec![ + "Series tags={_measurement=status_code, url=https://influxdb.com, _field=value}\n FloatPoints timestamps: [1527018816000000000], values: [418.0]", + ]; + + run_read_filter_test_case(MeasurementStatusCode {}, predicate, expected_results).await; +} + +pub struct MeasurementStatusCode {} +#[async_trait] +impl DbSetup for MeasurementStatusCode { + async fn make(&self) -> Vec { + let partition_key = "2018-05-22T19"; + + let lp = vec![ + "status_code,url=http://www.example.com value=404 1527018806000000000", + "status_code,url=https://influxdb.com value=418 1527018816000000000", + ]; + + all_scenarios_for_one_chunk(vec![], vec![], lp, "status_code", partition_key).await + } +} + +#[tokio::test] +async fn test_read_filter_data_pred_not_match_regex_escape() { + let predicate = PredicateBuilder::default() + // Came from InfluxQL like `SELECT value FROM db0.rp0.status_code WHERE url !~ /https\:\/\/influxdb\.com/`, + .build_regex_not_match_expr("url", r#"https\://influxdb\.com"#) + .build(); + + // expect one series with influxdb.com + let expected_results = vec![ + "Series tags={_measurement=status_code, url=http://www.example.com, _field=value}\n FloatPoints timestamps: [1527018806000000000], values: [404.0]", + ]; + + run_read_filter_test_case(MeasurementStatusCode {}, predicate, expected_results).await; +} + #[tokio::test] async fn test_read_filter_data_pred_unsupported_in_scan() { test_helpers::maybe_start_logging(); From bfa54033bdc2a1d147f5d4b236867d5c0998518f Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Fri, 21 Jan 2022 16:01:13 -0500 Subject: [PATCH 6/7] refactor: Clean up the Catalog API This updates the catalog API to make it easier to work with for consumers. I also found a bug in the MemCatalog implementation while refactoring the tests to work with the new API definition. Consumers will now be able to Arc wrap the catalog and use it across awaits. --- ingester/src/data.rs | 10 +- ingester/src/server.rs | 6 +- iox_catalog/src/interface.rs | 295 +++++++++++++++++++---------------- iox_catalog/src/lib.rs | 61 +++++--- iox_catalog/src/mem.rs | 81 +++++----- iox_catalog/src/postgres.rs | 63 ++++---- 6 files changed, 279 insertions(+), 237 deletions(-) diff --git a/ingester/src/data.rs b/ingester/src/data.rs index b838faca54..35eba45818 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -7,8 +7,8 @@ use uuid::Uuid; use crate::server::IngesterServer; use iox_catalog::interface::{ - KafkaPartition, KafkaTopicId, NamespaceId, PartitionId, RepoCollection, SequenceNumber, - SequencerId, TableId, Tombstone, + Catalog, KafkaPartition, KafkaTopicId, NamespaceId, PartitionId, SequenceNumber, SequencerId, + TableId, Tombstone, }; use mutable_batch::MutableBatch; use parking_lot::RwLock; @@ -54,11 +54,9 @@ pub struct Sequencers { impl Sequencers { /// One time initialize Sequencers of this Ingester - pub async fn initialize( - ingester: &IngesterServer<'_, T>, - ) -> Result { + pub async fn initialize(ingester: &IngesterServer<'_, T>) -> Result { // Get sequencer ids from the catalog - let sequencer_repro = ingester.iox_catalog.sequencer(); + let sequencer_repro = ingester.iox_catalog.sequencers(); let mut sequencers = BTreeMap::default(); let topic = ingester.get_topic(); for shard in ingester.get_kafka_partitions() { diff --git a/ingester/src/server.rs b/ingester/src/server.rs index 11ce6dc553..7c48be5389 100644 --- a/ingester/src/server.rs +++ b/ingester/src/server.rs @@ -3,13 +3,13 @@ use std::sync::Arc; -use iox_catalog::interface::{KafkaPartition, KafkaTopic, KafkaTopicId, RepoCollection}; +use iox_catalog::interface::{Catalog, KafkaPartition, KafkaTopic, KafkaTopicId}; /// The [`IngesterServer`] manages the lifecycle and contains all state for /// an `ingester` server instance. pub struct IngesterServer<'a, T> where - T: RepoCollection + Send + Sync, + T: Catalog, { /// Kafka Topic assigned to this ingester kafka_topic: KafkaTopic, @@ -21,7 +21,7 @@ where impl<'a, T> IngesterServer<'a, T> where - T: RepoCollection + Send + Sync, + T: Catalog, { /// Initialize the Ingester pub fn new(topic: KafkaTopic, shard_ids: Vec, catalog: &'a Arc) -> Self { diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index d72e91a4ee..01431ae969 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -6,7 +6,6 @@ use snafu::{OptionExt, Snafu}; use std::collections::BTreeMap; use std::convert::TryFrom; use std::fmt::Formatter; -use std::sync::Arc; use uuid::Uuid; #[derive(Debug, Snafu)] @@ -247,32 +246,32 @@ impl std::fmt::Display for ParquetFileId { } } -/// Container that can return repos for each of the catalog data types. +/// Trait that contains methods for working with the catalog #[async_trait] -pub trait RepoCollection { +pub trait Catalog: Send + Sync { /// repo for kafka topics - fn kafka_topic(&self) -> Arc; + fn kafka_topics(&self) -> &dyn KafkaTopicRepo; /// repo fo rquery pools - fn query_pool(&self) -> Arc; + fn query_pools(&self) -> &dyn QueryPoolRepo; /// repo for namespaces - fn namespace(&self) -> Arc; + fn namespaces(&self) -> &dyn NamespaceRepo; /// repo for tables - fn table(&self) -> Arc; + fn tables(&self) -> &dyn TableRepo; /// repo for columns - fn column(&self) -> Arc; + fn columns(&self) -> &dyn ColumnRepo; /// repo for sequencers - fn sequencer(&self) -> Arc; + fn sequencers(&self) -> &dyn SequencerRepo; /// repo for partitions - fn partition(&self) -> Arc; + fn partitions(&self) -> &dyn PartitionRepo; /// repo for tombstones - fn tombstone(&self) -> Arc; + fn tombstones(&self) -> &dyn TombstoneRepo; /// repo for parquet_files - fn parquet_file(&self) -> Arc; + fn parquet_files(&self) -> &dyn ParquetFileRepo; } /// Functions for working with Kafka topics in the catalog. #[async_trait] -pub trait KafkaTopicRepo { +pub trait KafkaTopicRepo: Send + Sync { /// Creates the kafka topic in the catalog or gets the existing record by name. async fn create_or_get(&self, name: &str) -> Result; @@ -282,14 +281,14 @@ pub trait KafkaTopicRepo { /// Functions for working with query pools in the catalog. #[async_trait] -pub trait QueryPoolRepo { +pub trait QueryPoolRepo: Send + Sync { /// Creates the query pool in the catalog or gets the existing record by name. async fn create_or_get(&self, name: &str) -> Result; } /// Functions for working with namespaces in the catalog #[async_trait] -pub trait NamespaceRepo { +pub trait NamespaceRepo: Send + Sync { /// Creates the namespace in the catalog. If one by the same name already exists, an /// error is returned. async fn create( @@ -306,7 +305,7 @@ pub trait NamespaceRepo { /// Functions for working with tables in the catalog #[async_trait] -pub trait TableRepo { +pub trait TableRepo: Send + Sync { /// Creates the table in the catalog or get the existing record by name. async fn create_or_get(&self, name: &str, namespace_id: NamespaceId) -> Result; @@ -316,7 +315,7 @@ pub trait TableRepo { /// Functions for working with columns in the catalog #[async_trait] -pub trait ColumnRepo { +pub trait ColumnRepo: Send + Sync { /// Creates the column in the catalog or returns the existing column. Will return a /// `Error::ColumnTypeMismatch` if the existing column type doesn't match the type /// the caller is attempting to create. @@ -333,7 +332,7 @@ pub trait ColumnRepo { /// Functions for working with sequencers in the catalog #[async_trait] -pub trait SequencerRepo { +pub trait SequencerRepo: Send + Sync { /// create a sequencer record for the kafka topic and partition or return the existing record async fn create_or_get( &self, @@ -358,7 +357,7 @@ pub trait SequencerRepo { /// Functions for working with IOx partitions in the catalog. Note that these are how /// IOx splits up data within a database, which is differenet than Kafka partitions. #[async_trait] -pub trait PartitionRepo { +pub trait PartitionRepo: Send + Sync { /// create or get a partition record for the given partition key, sequencer and table async fn create_or_get( &self, @@ -373,7 +372,7 @@ pub trait PartitionRepo { /// Functions for working with tombstones in the catalog #[async_trait] -pub trait TombstoneRepo { +pub trait TombstoneRepo: Send + Sync { /// create or get a tombstone async fn create_or_get( &self, @@ -397,7 +396,7 @@ pub trait TombstoneRepo { /// Functions for working with parquet file pointers in the catalog #[async_trait] -pub trait ParquetFileRepo { +pub trait ParquetFileRepo: Send + Sync { /// create the parquet file #[allow(clippy::too_many_arguments)] async fn create( @@ -519,22 +518,19 @@ impl NamespaceSchema { } /// Gets the namespace schema including all tables and columns. -pub async fn get_schema_by_name( +pub async fn get_schema_by_name( name: &str, - repo: &T, + catalog: &dyn Catalog, ) -> Result> { - let namespace_repo = repo.namespace(); - let table_repo = repo.table(); - let column_repo = repo.column(); - - let namespace = namespace_repo + let namespace = catalog + .namespaces() .get_by_name(name) .await? .context(NamespaceNotFoundSnafu { name })?; // get the columns first just in case someone else is creating schema while we're doing this. - let columns = column_repo.list_by_namespace_id(namespace.id).await?; - let tables = table_repo.list_by_namespace_id(namespace.id).await?; + let columns = catalog.columns().list_by_namespace_id(namespace.id).await?; + let tables = catalog.tables().list_by_namespace_id(namespace.id).await?; let mut namespace = NamespaceSchema::new( namespace.id, @@ -813,25 +809,22 @@ pub struct ParquetFile { pub(crate) mod test_helpers { use super::*; use futures::{stream::FuturesOrdered, StreamExt}; + use std::sync::Arc; - pub(crate) async fn test_repo(new_repo: F) - where - T: RepoCollection + Send + Sync, - F: Fn() -> T + Send + Sync, - { - test_kafka_topic(&new_repo()).await; - test_query_pool(&new_repo()).await; - test_namespace(&new_repo()).await; - test_table(&new_repo()).await; - test_column(&new_repo()).await; - test_sequencer(&new_repo()).await; - test_partition(&new_repo()).await; - test_tombstone(&new_repo()).await; - test_parquet_file(&new_repo()).await; + pub(crate) async fn test_catalog(catalog: Arc) { + test_kafka_topic(Arc::clone(&catalog)).await; + test_query_pool(Arc::clone(&catalog)).await; + test_namespace(Arc::clone(&catalog)).await; + test_table(Arc::clone(&catalog)).await; + test_column(Arc::clone(&catalog)).await; + test_sequencer(Arc::clone(&catalog)).await; + test_partition(Arc::clone(&catalog)).await; + test_tombstone(Arc::clone(&catalog)).await; + test_parquet_file(Arc::clone(&catalog)).await; } - async fn test_kafka_topic(repo: &T) { - let kafka_repo = repo.kafka_topic(); + async fn test_kafka_topic(catalog: Arc) { + let kafka_repo = catalog.kafka_topics(); let k = kafka_repo.create_or_get("foo").await.unwrap(); assert!(k.id > KafkaTopicId::new(0)); assert_eq!(k.name, "foo"); @@ -843,8 +836,8 @@ pub(crate) mod test_helpers { assert!(k3.is_none()); } - async fn test_query_pool(repo: &T) { - let query_repo = repo.query_pool(); + async fn test_query_pool(catalog: Arc) { + let query_repo = catalog.query_pools(); let q = query_repo.create_or_get("foo").await.unwrap(); assert!(q.id > QueryPoolId::new(0)); assert_eq!(q.name, "foo"); @@ -852,10 +845,10 @@ pub(crate) mod test_helpers { assert_eq!(q, q2); } - async fn test_namespace(repo: &T) { - let namespace_repo = repo.namespace(); - let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); - let pool = repo.query_pool().create_or_get("foo").await.unwrap(); + async fn test_namespace(catalog: Arc) { + let namespace_repo = catalog.namespaces(); + let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = catalog.query_pools().create_or_get("foo").await.unwrap(); let namespace_name = "test_namespace"; let namespace = namespace_repo @@ -881,53 +874,75 @@ pub(crate) mod test_helpers { assert_eq!(namespace, found); } - async fn test_table(repo: &T) { - let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); - let pool = repo.query_pool().create_or_get("foo").await.unwrap(); - let namespace = repo - .namespace() + async fn test_table(catalog: Arc) { + let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = catalog.query_pools().create_or_get("foo").await.unwrap(); + let namespace = catalog + .namespaces() .create("namespace_table_test", "inf", kafka.id, pool.id) .await .unwrap(); // test we can create or get a table - let table_repo = repo.table(); - let t = table_repo + let t = catalog + .tables() .create_or_get("test_table", namespace.id) .await .unwrap(); - let tt = table_repo + let tt = catalog + .tables() .create_or_get("test_table", namespace.id) .await .unwrap(); assert!(t.id > TableId::new(0)); assert_eq!(t, tt); - let tables = table_repo.list_by_namespace_id(namespace.id).await.unwrap(); + let tables = catalog + .tables() + .list_by_namespace_id(namespace.id) + .await + .unwrap(); assert_eq!(vec![t], tables); + + // test we can create a table of the same name in a different namespace + let namespace2 = catalog + .namespaces() + .create("two", "inf", kafka.id, pool.id) + .await + .unwrap(); + assert_ne!(namespace, namespace2); + let test_table = catalog + .tables() + .create_or_get("test_table", namespace2.id) + .await + .unwrap(); + assert_ne!(tt, test_table); + assert_eq!(test_table.namespace_id, namespace2.id) } - async fn test_column(repo: &T) { - let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); - let pool = repo.query_pool().create_or_get("foo").await.unwrap(); - let namespace = repo - .namespace() + async fn test_column(catalog: Arc) { + let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = catalog.query_pools().create_or_get("foo").await.unwrap(); + let namespace = catalog + .namespaces() .create("namespace_column_test", "inf", kafka.id, pool.id) .await .unwrap(); - let table = repo - .table() + let table = catalog + .tables() .create_or_get("test_table", namespace.id) .await .unwrap(); + assert_eq!(table.namespace_id, namespace.id); // test we can create or get a column - let column_repo = repo.column(); - let c = column_repo + let c = catalog + .columns() .create_or_get("column_test", table.id, ColumnType::Tag) .await .unwrap(); - let cc = column_repo + let cc = catalog + .columns() .create_or_get("column_test", table.id, ColumnType::Tag) .await .unwrap(); @@ -935,7 +950,8 @@ pub(crate) mod test_helpers { assert_eq!(c, cc); // test that attempting to create an already defined column of a different type returns error - let err = column_repo + let err = catalog + .columns() .create_or_get("column_test", table.id, ColumnType::U64) .await .expect_err("should error with wrong column type"); @@ -949,35 +965,40 @@ pub(crate) mod test_helpers { )); // test that we can create a column of the same name under a different table - let table2 = repo - .table() + let table2 = catalog + .tables() .create_or_get("test_table_2", namespace.id) .await .unwrap(); - let ccc = column_repo + let ccc = catalog + .columns() .create_or_get("column_test", table2.id, ColumnType::U64) .await .unwrap(); assert_ne!(c, ccc); - let columns = column_repo + let columns = catalog + .columns() .list_by_namespace_id(namespace.id) .await .unwrap(); assert_eq!(vec![c, ccc], columns); } - async fn test_sequencer(repo: &T) { - let kafka = repo - .kafka_topic() + async fn test_sequencer(catalog: Arc) { + let kafka = catalog + .kafka_topics() .create_or_get("sequencer_test") .await .unwrap(); - let sequencer_repo = repo.sequencer(); // Create 10 sequencers let created = (1..=10) - .map(|partition| sequencer_repo.create_or_get(&kafka, KafkaPartition::new(partition))) + .map(|partition| { + catalog + .sequencers() + .create_or_get(&kafka, KafkaPartition::new(partition)) + }) .collect::>() .map(|v| { let v = v.expect("failed to create sequencer"); @@ -987,7 +1008,8 @@ pub(crate) mod test_helpers { .await; // List them and assert they match - let listed = sequencer_repo + let listed = catalog + .sequencers() .list_by_kafka_topic(&kafka) .await .expect("failed to list sequencers") @@ -999,7 +1021,8 @@ pub(crate) mod test_helpers { // get by the sequencer id and partition let kafka_partition = KafkaPartition::new(1); - let sequencer = sequencer_repo + let sequencer = catalog + .sequencers() .get_by_topic_id_and_partition(kafka.id, kafka_partition) .await .unwrap() @@ -1007,42 +1030,45 @@ pub(crate) mod test_helpers { assert_eq!(kafka.id, sequencer.kafka_topic_id); assert_eq!(kafka_partition, sequencer.kafka_partition); - let sequencer = sequencer_repo + let sequencer = catalog + .sequencers() .get_by_topic_id_and_partition(kafka.id, KafkaPartition::new(523)) .await .unwrap(); assert!(sequencer.is_none()); } - async fn test_partition(repo: &T) { - let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); - let pool = repo.query_pool().create_or_get("foo").await.unwrap(); - let namespace = repo - .namespace() + async fn test_partition(catalog: Arc) { + let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = catalog.query_pools().create_or_get("foo").await.unwrap(); + let namespace = catalog + .namespaces() .create("namespace_partition_test", "inf", kafka.id, pool.id) .await .unwrap(); - let table = repo - .table() + let table = catalog + .tables() .create_or_get("test_table", namespace.id) .await .unwrap(); - let sequencer = repo - .sequencer() + let sequencer = catalog + .sequencers() .create_or_get(&kafka, KafkaPartition::new(1)) .await .unwrap(); - let other_sequencer = repo - .sequencer() + let other_sequencer = catalog + .sequencers() .create_or_get(&kafka, KafkaPartition::new(2)) .await .unwrap(); - let partition_repo = repo.partition(); - let created = ["foo", "bar"] .iter() - .map(|key| partition_repo.create_or_get(key, sequencer.id, table.id)) + .map(|key| { + catalog + .partitions() + .create_or_get(key, sequencer.id, table.id) + }) .collect::>() .map(|v| { let v = v.expect("failed to create partition"); @@ -1050,13 +1076,15 @@ pub(crate) mod test_helpers { }) .collect::>() .await; - let _ = partition_repo + let _ = catalog + .partitions() .create_or_get("asdf", other_sequencer.id, table.id) .await .unwrap(); // List them and assert they match - let listed = partition_repo + let listed = catalog + .partitions() .list_by_sequencer(sequencer.id) .await .expect("failed to list partitions") @@ -1067,34 +1095,34 @@ pub(crate) mod test_helpers { assert_eq!(created, listed); } - async fn test_tombstone(repo: &T) { - let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); - let pool = repo.query_pool().create_or_get("foo").await.unwrap(); - let namespace = repo - .namespace() + async fn test_tombstone(catalog: Arc) { + let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = catalog.query_pools().create_or_get("foo").await.unwrap(); + let namespace = catalog + .namespaces() .create("namespace_tombstone_test", "inf", kafka.id, pool.id) .await .unwrap(); - let table = repo - .table() + let table = catalog + .tables() .create_or_get("test_table", namespace.id) .await .unwrap(); - let other_table = repo - .table() + let other_table = catalog + .tables() .create_or_get("other", namespace.id) .await .unwrap(); - let sequencer = repo - .sequencer() + let sequencer = catalog + .sequencers() .create_or_get(&kafka, KafkaPartition::new(1)) .await .unwrap(); - let tombstone_repo = repo.tombstone(); let min_time = Timestamp::new(1); let max_time = Timestamp::new(10); - let t1 = tombstone_repo + let t1 = catalog + .tombstones() .create_or_get( table.id, sequencer.id, @@ -1111,7 +1139,8 @@ pub(crate) mod test_helpers { assert_eq!(t1.min_time, min_time); assert_eq!(t1.max_time, max_time); assert_eq!(t1.serialized_predicate, "whatevs"); - let t2 = tombstone_repo + let t2 = catalog + .tombstones() .create_or_get( other_table.id, sequencer.id, @@ -1122,7 +1151,8 @@ pub(crate) mod test_helpers { ) .await .unwrap(); - let t3 = tombstone_repo + let t3 = catalog + .tombstones() .create_or_get( table.id, sequencer.id, @@ -1134,43 +1164,44 @@ pub(crate) mod test_helpers { .await .unwrap(); - let listed = tombstone_repo + let listed = catalog + .tombstones() .list_tombstones_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1)) .await .unwrap(); assert_eq!(vec![t2, t3], listed); } - async fn test_parquet_file(repo: &T) { - let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); - let pool = repo.query_pool().create_or_get("foo").await.unwrap(); - let namespace = repo - .namespace() + async fn test_parquet_file(catalog: Arc) { + let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = catalog.query_pools().create_or_get("foo").await.unwrap(); + let namespace = catalog + .namespaces() .create("namespace_parquet_file_test", "inf", kafka.id, pool.id) .await .unwrap(); - let table = repo - .table() + let table = catalog + .tables() .create_or_get("test_table", namespace.id) .await .unwrap(); - let other_table = repo - .table() + let other_table = catalog + .tables() .create_or_get("other", namespace.id) .await .unwrap(); - let sequencer = repo - .sequencer() + let sequencer = catalog + .sequencers() .create_or_get(&kafka, KafkaPartition::new(1)) .await .unwrap(); - let partition = repo - .partition() + let partition = catalog + .partitions() .create_or_get("one", sequencer.id, table.id) .await .unwrap(); - let other_partition = repo - .partition() + let other_partition = catalog + .partitions() .create_or_get("one", sequencer.id, other_table.id) .await .unwrap(); @@ -1178,7 +1209,7 @@ pub(crate) mod test_helpers { let min_time = Timestamp::new(1); let max_time = Timestamp::new(10); - let parquet_repo = repo.parquet_file(); + let parquet_repo = catalog.parquet_files(); let parquet_file = parquet_repo .create( sequencer.id, diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index a23de38af4..3c5c5f2356 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -12,8 +12,8 @@ )] use crate::interface::{ - column_type_from_field, ColumnSchema, ColumnType, Error, KafkaPartition, KafkaTopic, - NamespaceSchema, QueryPool, RepoCollection, Result, Sequencer, SequencerId, TableId, + column_type_from_field, Catalog, ColumnSchema, ColumnType, Error, KafkaPartition, KafkaTopic, + NamespaceSchema, QueryPool, Result, Sequencer, SequencerId, TableId, }; use futures::{stream::FuturesOrdered, StreamExt}; use influxdb_line_protocol::ParsedLine; @@ -36,10 +36,10 @@ pub mod postgres; /// If another writer attempts to create a column of the same name with a different /// type at the same time and beats this caller to it, an error will be returned. If another /// writer adds the same schema before this one, then this will load that schema here. -pub async fn validate_or_insert_schema( +pub async fn validate_or_insert_schema( lines: Vec>, schema: &NamespaceSchema, - repo: &T, + catalog: &dyn Catalog, ) -> Result> { // table name to table_id let mut new_tables: BTreeMap = BTreeMap::new(); @@ -66,8 +66,8 @@ pub async fn validate_or_insert_schema( None => { let entry = new_columns.entry(table.id).or_default(); if entry.get(key.as_str()).is_none() { - let column_repo = repo.column(); - let column = column_repo + let column = catalog + .columns() .create_or_get(key.as_str(), table.id, ColumnType::Tag) .await?; entry.insert( @@ -97,8 +97,8 @@ pub async fn validate_or_insert_schema( let entry = new_columns.entry(table.id).or_default(); if entry.get(key.as_str()).is_none() { let data_type = column_type_from_field(value); - let column_repo = repo.column(); - let column = column_repo + let column = catalog + .columns() .create_or_get(key.as_str(), table.id, data_type) .await?; entry.insert( @@ -113,15 +113,16 @@ pub async fn validate_or_insert_schema( } } None => { - let table_repo = repo.table(); - let new_table = table_repo.create_or_get(table_name, schema.id).await?; + let new_table = catalog + .tables() + .create_or_get(table_name, schema.id) + .await?; let new_table_columns = new_columns.entry(new_table.id).or_default(); - let column_repo = repo.column(); - if let Some(tagset) = &line.series.tag_set { for (key, _) in tagset { - let new_column = column_repo + let new_column = catalog + .columns() .create_or_get(key.as_str(), new_table.id, ColumnType::Tag) .await?; new_table_columns.insert( @@ -135,7 +136,8 @@ pub async fn validate_or_insert_schema( } for (key, value) in &line.field_set { let data_type = column_type_from_field(value); - let new_column = column_repo + let new_column = catalog + .columns() .create_or_get(key.as_str(), new_table.id, data_type) .await?; new_table_columns.insert( @@ -146,7 +148,8 @@ pub async fn validate_or_insert_schema( }, ); } - let time_column = column_repo + let time_column = catalog + .columns() .create_or_get(TIME_COLUMN, new_table.id, ColumnType::Time) .await?; new_table_columns.insert( @@ -173,19 +176,25 @@ pub async fn validate_or_insert_schema( /// Creates or gets records in the catalog for the shared kafka topic, query pool, and sequencers for /// each of the partitions. -pub async fn create_or_get_default_records( +pub async fn create_or_get_default_records( kafka_partition_count: i32, - repo: &T, + catalog: &dyn Catalog, ) -> Result<(KafkaTopic, QueryPool, BTreeMap)> { - let kafka_repo = repo.kafka_topic(); - let query_repo = repo.query_pool(); - let sequencer_repo = repo.sequencer(); - - let kafka_topic = kafka_repo.create_or_get(SHARED_KAFKA_TOPIC).await?; - let query_pool = query_repo.create_or_get(SHARED_QUERY_POOL).await?; + let kafka_topic = catalog + .kafka_topics() + .create_or_get(SHARED_KAFKA_TOPIC) + .await?; + let query_pool = catalog + .query_pools() + .create_or_get(SHARED_QUERY_POOL) + .await?; let sequencers = (1..=kafka_partition_count) - .map(|partition| sequencer_repo.create_or_get(&kafka_topic, KafkaPartition::new(partition))) + .map(|partition| { + catalog + .sequencers() + .create_or_get(&kafka_topic, KafkaPartition::new(partition)) + }) .collect::>() .map(|v| { let v = v.expect("failed to create sequencer"); @@ -207,13 +216,13 @@ mod tests { #[tokio::test] async fn test_validate_or_insert_schema() { - let repo = Arc::new(MemCatalog::new()); + let repo = MemCatalog::new(); let (kafka_topic, query_pool, _) = create_or_get_default_records(2, &repo).await.unwrap(); let namespace_name = "validate_schema"; // now test with a new namespace let namespace = repo - .namespace() + .namespaces() .create(namespace_name, "inf", kafka_topic.id, query_pool.id) .await .unwrap(); diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index c4cf0333b1..b5e634ea81 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -2,16 +2,16 @@ //! used for testing or for an IOx designed to run without catalog persistence. use crate::interface::{ - Column, ColumnId, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId, - KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile, ParquetFileId, - ParquetFileRepo, Partition, PartitionId, PartitionRepo, QueryPool, QueryPoolId, QueryPoolRepo, - RepoCollection, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId, + Catalog, Column, ColumnId, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, + KafkaTopicId, KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile, + ParquetFileId, ParquetFileRepo, Partition, PartitionId, PartitionRepo, QueryPool, QueryPoolId, + QueryPoolRepo, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId, TableRepo, Timestamp, Tombstone, TombstoneId, TombstoneRepo, }; use async_trait::async_trait; use std::convert::TryFrom; use std::fmt::Formatter; -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; use uuid::Uuid; /// In-memory catalog that implements the `RepoCollection` and individual repo traits from @@ -48,41 +48,41 @@ struct MemCollections { parquet_files: Vec, } -impl RepoCollection for Arc { - fn kafka_topic(&self) -> Arc { - Self::clone(self) as Arc +impl Catalog for MemCatalog { + fn kafka_topics(&self) -> &dyn KafkaTopicRepo { + self } - fn query_pool(&self) -> Arc { - Self::clone(self) as Arc + fn query_pools(&self) -> &dyn QueryPoolRepo { + self } - fn namespace(&self) -> Arc { - Self::clone(self) as Arc + fn namespaces(&self) -> &dyn NamespaceRepo { + self } - fn table(&self) -> Arc { - Self::clone(self) as Arc + fn tables(&self) -> &dyn TableRepo { + self } - fn column(&self) -> Arc { - Self::clone(self) as Arc + fn columns(&self) -> &dyn ColumnRepo { + self } - fn sequencer(&self) -> Arc { - Self::clone(self) as Arc + fn sequencers(&self) -> &dyn SequencerRepo { + self } - fn partition(&self) -> Arc { - Self::clone(self) as Arc + fn partitions(&self) -> &dyn PartitionRepo { + self } - fn tombstone(&self) -> Arc { - Self::clone(self) as Arc + fn tombstones(&self) -> &dyn TombstoneRepo { + self } - fn parquet_file(&self) -> Arc { - Self::clone(self) as Arc + fn parquet_files(&self) -> &dyn ParquetFileRepo { + self } } @@ -180,7 +180,11 @@ impl TableRepo for MemCatalog { async fn create_or_get(&self, name: &str, namespace_id: NamespaceId) -> Result
{ let mut collections = self.collections.lock().expect("mutex poisoned"); - let table = match collections.tables.iter().find(|t| t.name == name) { + let table = match collections + .tables + .iter() + .find(|t| t.name == name && t.namespace_id == namespace_id) + { Some(t) => t, None => { let table = Table { @@ -250,18 +254,22 @@ impl ColumnRepo for MemCatalog { } async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result> { - let mut columns = vec![]; - let collections = self.collections.lock().expect("mutex poisoned"); - for t in collections + + let table_ids: Vec<_> = collections .tables .iter() .filter(|t| t.namespace_id == namespace_id) - { - for c in collections.columns.iter().filter(|c| c.table_id == t.id) { - columns.push(c.clone()); - } - } + .map(|t| t.id) + .collect(); + println!("tables: {:?}", collections.tables); + println!("table_ids: {:?}", table_ids); + let columns: Vec<_> = collections + .columns + .iter() + .filter(|c| table_ids.contains(&c.table_id)) + .cloned() + .collect(); Ok(columns) } @@ -488,11 +496,10 @@ impl ParquetFileRepo for MemCatalog { #[cfg(test)] mod tests { use super::*; + use std::sync::Arc; #[tokio::test] - async fn test_mem_repo() { - let f = || Arc::new(MemCatalog::new()); - - crate::interface::test_helpers::test_repo(f).await; + async fn test_catalog() { + crate::interface::test_helpers::test_catalog(Arc::new(MemCatalog::new())).await; } } diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 2b052a9738..4fc600ea56 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -1,16 +1,15 @@ //! A Postgres backed implementation of the Catalog use crate::interface::{ - Column, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId, + Catalog, Column, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId, KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile, ParquetFileId, ParquetFileRepo, Partition, PartitionId, PartitionRepo, QueryPool, QueryPoolId, QueryPoolRepo, - RepoCollection, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId, - TableRepo, Timestamp, Tombstone, TombstoneRepo, + Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId, TableRepo, + Timestamp, Tombstone, TombstoneRepo, }; use async_trait::async_trait; use observability_deps::tracing::info; use sqlx::{postgres::PgPoolOptions, Executor, Pool, Postgres}; -use std::sync::Arc; use std::time::Duration; use uuid::Uuid; @@ -62,41 +61,41 @@ impl PostgresCatalog { } } -impl RepoCollection for Arc { - fn kafka_topic(&self) -> Arc { - Self::clone(self) as Arc +impl Catalog for PostgresCatalog { + fn kafka_topics(&self) -> &dyn KafkaTopicRepo { + self } - fn query_pool(&self) -> Arc { - Self::clone(self) as Arc + fn query_pools(&self) -> &dyn QueryPoolRepo { + self } - fn namespace(&self) -> Arc { - Self::clone(self) as Arc + fn namespaces(&self) -> &dyn NamespaceRepo { + self } - fn table(&self) -> Arc { - Self::clone(self) as Arc + fn tables(&self) -> &dyn TableRepo { + self } - fn column(&self) -> Arc { - Self::clone(self) as Arc + fn columns(&self) -> &dyn ColumnRepo { + self } - fn sequencer(&self) -> Arc { - Self::clone(self) as Arc + fn sequencers(&self) -> &dyn SequencerRepo { + self } - fn partition(&self) -> Arc { - Self::clone(self) as Arc + fn partitions(&self) -> &dyn PartitionRepo { + self } - fn tombstone(&self) -> Arc { - Self::clone(self) as Arc + fn tombstones(&self) -> &dyn TombstoneRepo { + self } - fn parquet_file(&self) -> Arc { - Self::clone(self) as Arc + fn parquet_files(&self) -> &dyn ParquetFileRepo { + self } } @@ -586,6 +585,7 @@ fn is_fk_violation(e: &sqlx::Error) -> bool { mod tests { use super::*; use std::env; + use std::sync::Arc; // Helper macro to skip tests if TEST_INTEGRATION and the AWS environment variables are not set. macro_rules! maybe_skip_integration { @@ -624,17 +624,15 @@ mod tests { }}; } - async fn setup_db() -> Arc { + async fn setup_db() -> PostgresCatalog { let dsn = std::env::var("DATABASE_URL").unwrap(); - Arc::new( - PostgresCatalog::connect("test", SCHEMA_NAME, &dsn) - .await - .unwrap(), - ) + PostgresCatalog::connect("test", SCHEMA_NAME, &dsn) + .await + .unwrap() } #[tokio::test] - async fn test_repo() { + async fn test_catalog() { // If running an integration test on your laptop, this requires that you have Postgres // running and that you've done the sqlx migrations. See the README in this crate for // info to set it up. @@ -642,10 +640,9 @@ mod tests { let postgres = setup_db().await; clear_schema(&postgres.pool).await; + let postgres: Arc = Arc::new(postgres); - let f = || Arc::clone(&postgres); - - crate::interface::test_helpers::test_repo(f).await; + crate::interface::test_helpers::test_catalog(postgres).await; } async fn clear_schema(pool: &Pool) { From bb893510a0501abbfa5a81dd5a3b8b8a26dbaf29 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Fri, 21 Jan 2022 18:02:19 -0500 Subject: [PATCH 7/7] feat: Add scaffolding for ingester server * Adds a new ingester command to start an ingester server * Moves previous ingester server over to handler * Skeleton for gRPC and HTTP handlers --- Cargo.lock | 5 + influxdb_iox/Cargo.toml | 2 + influxdb_iox/src/commands/run/ingester.rs | 121 ++++++++++++++++++ influxdb_iox/src/commands/run/mod.rs | 9 ++ .../src/influxdb_ioxd/server_type/ingester.rs | 100 +++++++++++++++ .../src/influxdb_ioxd/server_type/mod.rs | 1 + ingester/Cargo.toml | 3 + ingester/src/data.rs | 8 +- ingester/src/handler.rs | 62 +++++++++ ingester/src/lib.rs | 1 + ingester/src/server.rs | 79 ++++++------ ingester/src/server/grpc.rs | 20 +++ ingester/src/server/http.rs | 50 ++++++++ iox_catalog/src/postgres.rs | 4 +- 14 files changed, 417 insertions(+), 48 deletions(-) create mode 100644 influxdb_iox/src/commands/run/ingester.rs create mode 100644 influxdb_iox/src/influxdb_ioxd/server_type/ingester.rs create mode 100644 ingester/src/handler.rs create mode 100644 ingester/src/server/grpc.rs create mode 100644 ingester/src/server/http.rs diff --git a/Cargo.lock b/Cargo.lock index dcdfed40ef..fe5cf88981 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1726,7 +1726,9 @@ dependencies = [ "influxdb_iox_client", "influxdb_line_protocol", "influxdb_storage_client", + "ingester", "internal_types", + "iox_catalog", "iox_object_store", "itertools", "job_registry", @@ -1858,10 +1860,13 @@ name = "ingester" version = "0.1.0" dependencies = [ "arrow", + "hyper", "iox_catalog", + "metric", "mutable_batch", "parking_lot", "snafu", + "thiserror", "uuid", "workspace-hack", ] diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index 252ec87a0d..9cca8f74c8 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -14,7 +14,9 @@ dml = { path = "../dml" } generated_types = { path = "../generated_types" } influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format", "write_lp"] } influxdb_line_protocol = { path = "../influxdb_line_protocol" } +ingester = { path = "../ingester" } internal_types = { path = "../internal_types" } +iox_catalog = { path = "../iox_catalog" } iox_object_store = { path = "../iox_object_store" } job_registry = { path = "../job_registry" } logfmt = { path = "../logfmt" } diff --git a/influxdb_iox/src/commands/run/ingester.rs b/influxdb_iox/src/commands/run/ingester.rs new file mode 100644 index 0000000000..57ff80b6d6 --- /dev/null +++ b/influxdb_iox/src/commands/run/ingester.rs @@ -0,0 +1,121 @@ +//! Implementation of command line option for running ingester + +use std::sync::Arc; + +use crate::{ + clap_blocks::run_config::RunConfig, + influxdb_ioxd::{ + self, + server_type::{ + common_state::{CommonServerState, CommonServerStateError}, + ingester::IngesterServerType, + }, + }, +}; +use ingester::handler::IngestHandlerImpl; +use ingester::server::grpc::GrpcDelegate; +use ingester::server::{http::HttpDelegate, IngesterServer}; +use iox_catalog::interface::{Catalog, KafkaPartition}; +use iox_catalog::postgres::PostgresCatalog; +use observability_deps::tracing::*; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + #[error("Run: {0}")] + Run(#[from] influxdb_ioxd::Error), + + #[error("Cannot setup server: {0}")] + Setup(#[from] crate::influxdb_ioxd::server_type::database::setup::Error), + + #[error("Invalid config: {0}")] + InvalidConfig(#[from] CommonServerStateError), + + #[error("Catalog error: {0}")] + Catalog(#[from] iox_catalog::interface::Error), + + #[error("Kafka topic {0} not found in the catalog")] + KafkaTopicNotFound(String), +} + +pub type Result = std::result::Result; + +#[derive(Debug, clap::Parser)] +#[clap( + name = "run", + about = "Runs in ingester mode", + long_about = "Run the IOx ingester server.\n\nThe configuration options below can be \ + set either with the command line flags or with the specified environment \ + variable. If there is a file named '.env' in the current working directory, \ + it is sourced before loading the configuration. +Configuration is loaded from the following sources (highest precedence first): + - command line arguments + - user set environment variables + - .env file contents + - pre-configured default values" +)] +pub struct Config { + #[clap(flatten)] + pub(crate) run_config: RunConfig, + + /// Postgres connection string + #[clap(env = "INFLUXDB_IOX_CATALOG_DNS")] + pub catalog_dsn: String, + + /// Kafka connection string + #[clap(env = "INFLUXDB_IOX_KAFKA_CONNECTION")] + pub kafka_connection: String, + + /// Kafka topic name + #[clap(env = "INFLUXDB_IOX_KAFKA_TOPIC")] + pub kafka_topic: String, + + /// Kafka partition number to start (inclusive) range with + #[clap(env = "INFLUXDB_IOX_KAFKA_PARTITION_RANGE_START")] + pub kafka_partition_range_start: i32, + + /// Kafka partition number to end (inclusive) range with + #[clap(env = "INFLUXDB_IOX_KAFKA_PARTITION_RANGE_END")] + pub kafka_partition_range_end: i32, +} + +pub async fn command(config: Config) -> Result<()> { + let common_state = CommonServerState::from_config(config.run_config.clone())?; + + let catalog: Arc = Arc::new( + PostgresCatalog::connect( + "ingester", + iox_catalog::postgres::SCHEMA_NAME, + &config.catalog_dsn, + ) + .await?, + ); + + let kafka_topic = match catalog + .kafka_topics() + .get_by_name(&config.kafka_topic) + .await? + { + Some(k) => k, + None => return Err(Error::KafkaTopicNotFound(config.kafka_topic)), + }; + let kafka_partitions: Vec<_> = (config.kafka_partition_range_start + ..config.kafka_partition_range_end) + .map(KafkaPartition::new) + .collect(); + + let ingest_handler = Arc::new(IngestHandlerImpl::new( + kafka_topic, + kafka_partitions, + catalog, + )); + let http = HttpDelegate::new(Arc::clone(&ingest_handler)); + let grpc = GrpcDelegate::new(ingest_handler); + + let ingester = IngesterServer::new(http, grpc); + let server_type = Arc::new(IngesterServerType::new(ingester, &common_state)); + + info!("starting ingester"); + + Ok(influxdb_ioxd::main(common_state, server_type).await?) +} diff --git a/influxdb_iox/src/commands/run/mod.rs b/influxdb_iox/src/commands/run/mod.rs index 6533663f0f..68882ce6d9 100644 --- a/influxdb_iox/src/commands/run/mod.rs +++ b/influxdb_iox/src/commands/run/mod.rs @@ -3,6 +3,7 @@ use snafu::{ResultExt, Snafu}; use crate::clap_blocks::run_config::RunConfig; pub mod database; +pub mod ingester; pub mod router; pub mod router2; pub mod test; @@ -19,6 +20,9 @@ pub enum Error { #[snafu(display("Error in router2 subcommand: {}", source))] Router2Error { source: router2::Error }, + #[snafu(display("Error in ingester subcommand: {}", source))] + IngesterError { source: ingester::Error }, + #[snafu(display("Error in test subcommand: {}", source))] TestError { source: test::Error }, } @@ -43,6 +47,7 @@ impl Config { Some(Command::Database(config)) => &config.run_config, Some(Command::Router(config)) => &config.run_config, Some(Command::Router2(config)) => &config.run_config, + Some(Command::Ingester(config)) => &config.run_config, Some(Command::Test(config)) => &config.run_config, } } @@ -59,6 +64,9 @@ enum Command { /// Run the server in router2 mode Router2(router2::Config), + /// Run the server in ingester mode + Ingester(ingester::Config), + /// Run the server in test mode Test(test::Config), } @@ -76,6 +84,7 @@ pub async fn command(config: Config) -> Result<()> { Some(Command::Database(config)) => database::command(config).await.context(DatabaseSnafu), Some(Command::Router(config)) => router::command(config).await.context(RouterSnafu), Some(Command::Router2(config)) => router2::command(config).await.context(Router2Snafu), + Some(Command::Ingester(config)) => ingester::command(config).await.context(IngesterSnafu), Some(Command::Test(config)) => test::command(config).await.context(TestSnafu), } } diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/ingester.rs b/influxdb_iox/src/influxdb_ioxd/server_type/ingester.rs new file mode 100644 index 0000000000..ecb9bd7ad7 --- /dev/null +++ b/influxdb_iox/src/influxdb_ioxd/server_type/ingester.rs @@ -0,0 +1,100 @@ +use std::{ + fmt::{Debug, Display}, + sync::Arc, +}; + +use async_trait::async_trait; +use hyper::{Body, Request, Response}; +use ingester::server::IngesterServer; +use metric::Registry; +use tokio_util::sync::CancellationToken; +use trace::TraceCollector; + +use crate::influxdb_ioxd::{ + http::error::{HttpApiError, HttpApiErrorSource}, + rpc::RpcBuilderInput, + server_type::{common_state::CommonServerState, RpcError, ServerType}, +}; +use ingester::handler::IngestHandler; + +#[derive(Debug)] +pub struct IngesterServerType { + server: IngesterServer, + shutdown: CancellationToken, + trace_collector: Option>, +} + +impl IngesterServerType { + pub fn new(server: IngesterServer, common_state: &CommonServerState) -> Self { + Self { + server, + shutdown: CancellationToken::new(), + trace_collector: common_state.trace_collector(), + } + } +} + +#[async_trait] +impl ServerType for IngesterServerType { + type RouteError = IoxHttpErrorAdaptor; + + /// Return the [`metric::Registry`] used by the router. + fn metric_registry(&self) -> Arc { + self.server.metric_registry() + } + + /// Returns the trace collector for router traces. + fn trace_collector(&self) -> Option> { + self.trace_collector.as_ref().map(Arc::clone) + } + + /// Dispatches `req` to the router [`HttpDelegate`] delegate. + /// + /// [`HttpDelegate`]: router2::server::http::HttpDelegate + async fn route_http_request( + &self, + _req: Request, + ) -> Result, Self::RouteError> { + unimplemented!(); + } + + /// Registers the services exposed by the router [`GrpcDelegate`] delegate. + /// + /// [`GrpcDelegate`]: router2::server::grpc::GrpcDelegate + async fn server_grpc(self: Arc, _builder_input: RpcBuilderInput) -> Result<(), RpcError> { + unimplemented!() + // let builder = setup_builder!(builder_input, self); + // add_service!(builder, self.server.grpc().write_service()); + // serve_builder!(builder); + // + // Ok(()) + } + + async fn join(self: Arc) { + self.shutdown.cancelled().await; + } + + fn shutdown(&self) { + self.shutdown.cancel(); + } +} + +/// This adaptor converts the `ingester` http error type into a type that +/// satisfies the requirements of influxdb_ioxd's runner framework, keeping the +/// two decoupled. +#[derive(Debug)] +pub struct IoxHttpErrorAdaptor(router2::server::http::Error); + +impl Display for IoxHttpErrorAdaptor { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Display::fmt(&self.0, f) + } +} + +impl std::error::Error for IoxHttpErrorAdaptor {} + +impl HttpApiErrorSource for IoxHttpErrorAdaptor { + fn to_http_api_error(&self) -> HttpApiError { + HttpApiError::new(self.0.as_status_code(), self.to_string()) + } +} diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/mod.rs b/influxdb_iox/src/influxdb_ioxd/server_type/mod.rs index ade35bd93c..59da6bf504 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/mod.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/mod.rs @@ -10,6 +10,7 @@ use crate::influxdb_ioxd::{http::error::HttpApiErrorSource, rpc::RpcBuilderInput pub mod common_state; pub mod database; +pub mod ingester; pub mod router; pub mod router2; pub mod test; diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml index 7e1d8f1719..8df4f7bf46 100644 --- a/ingester/Cargo.toml +++ b/ingester/Cargo.toml @@ -6,9 +6,12 @@ edition = "2021" [dependencies] arrow = { version = "7.0", features = ["prettyprint"] } +hyper = "0.14" iox_catalog = { path = "../iox_catalog" } +metric = { path = "../metric" } mutable_batch = { path = "../mutable_batch"} parking_lot = "0.11.2" snafu = "0.7" +thiserror = "1.0" uuid = { version = "0.8", features = ["v4"] } workspace-hack = { path = "../workspace-hack"} diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 35eba45818..2e1767eb21 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -5,10 +5,10 @@ use arrow::record_batch::RecordBatch; use std::{collections::BTreeMap, sync::Arc}; use uuid::Uuid; -use crate::server::IngesterServer; +use crate::handler::IngestHandlerImpl; use iox_catalog::interface::{ - Catalog, KafkaPartition, KafkaTopicId, NamespaceId, PartitionId, SequenceNumber, SequencerId, - TableId, Tombstone, + KafkaPartition, KafkaTopicId, NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId, + Tombstone, }; use mutable_batch::MutableBatch; use parking_lot::RwLock; @@ -54,7 +54,7 @@ pub struct Sequencers { impl Sequencers { /// One time initialize Sequencers of this Ingester - pub async fn initialize(ingester: &IngesterServer<'_, T>) -> Result { + pub async fn initialize(ingester: &IngestHandlerImpl) -> Result { // Get sequencer ids from the catalog let sequencer_repro = ingester.iox_catalog.sequencers(); let mut sequencers = BTreeMap::default(); diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs new file mode 100644 index 0000000000..58d9c23188 --- /dev/null +++ b/ingester/src/handler.rs @@ -0,0 +1,62 @@ +//! Ingest handler + +use std::sync::Arc; + +use iox_catalog::interface::{Catalog, KafkaPartition, KafkaTopic, KafkaTopicId}; +use std::fmt::Formatter; + +/// The [`IngestHandler`] handles all ingest from kafka, persistence and queries +pub trait IngestHandler {} + +/// Implementation of the `IngestHandler` trait to ingest from kafka and manage persistence and answer queries +pub struct IngestHandlerImpl { + /// Kafka Topic assigned to this ingester + kafka_topic: KafkaTopic, + /// Kafka Partitions (Shards) assigned to this INgester + kafka_partitions: Vec, + /// Catalog of this ingester + pub iox_catalog: Arc, +} + +impl std::fmt::Debug for IngestHandlerImpl { + fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result { + todo!() + } +} + +impl IngestHandlerImpl { + /// Initialize the Ingester + pub fn new( + topic: KafkaTopic, + shard_ids: Vec, + catalog: Arc, + ) -> Self { + Self { + kafka_topic: topic, + kafka_partitions: shard_ids, + iox_catalog: catalog, + } + } + + /// Return a kafka topic + pub fn get_topic(&self) -> KafkaTopic { + self.kafka_topic.clone() + } + + /// Return a kafka topic id + pub fn get_topic_id(&self) -> KafkaTopicId { + self.kafka_topic.id + } + + /// Return a kafka topic name + pub fn get_topic_name(&self) -> String { + self.kafka_topic.name.clone() + } + + /// Return Kafka Partitions + pub fn get_kafka_partitions(&self) -> Vec { + self.kafka_partitions.clone() + } +} + +impl IngestHandler for IngestHandlerImpl {} diff --git a/ingester/src/lib.rs b/ingester/src/lib.rs index 31bc719a49..57b5a1a450 100644 --- a/ingester/src/lib.rs +++ b/ingester/src/lib.rs @@ -14,4 +14,5 @@ #[allow(dead_code)] pub mod data; +pub mod handler; pub mod server; diff --git a/ingester/src/server.rs b/ingester/src/server.rs index 7c48be5389..324b8cbf5b 100644 --- a/ingester/src/server.rs +++ b/ingester/src/server.rs @@ -1,54 +1,49 @@ -//! Ingester Server -//! +//! Ingester server entrypoint. use std::sync::Arc; -use iox_catalog::interface::{Catalog, KafkaPartition, KafkaTopic, KafkaTopicId}; +use self::{grpc::GrpcDelegate, http::HttpDelegate}; +use crate::handler::IngestHandler; +use std::fmt::Debug; -/// The [`IngesterServer`] manages the lifecycle and contains all state for -/// an `ingester` server instance. -pub struct IngesterServer<'a, T> -where - T: Catalog, -{ - /// Kafka Topic assigned to this ingester - kafka_topic: KafkaTopic, - /// Kafka Partitions (Shards) assigned to this INgester - kafka_partitions: Vec, - /// Catalog of this ingester - pub iox_catalog: &'a Arc, +pub mod grpc; +pub mod http; + +/// The [`IngesterServer`] manages the lifecycle and contains all state for a +/// `ingester` server instance. +#[derive(Debug, Default)] +pub struct IngesterServer { + metrics: Arc, + + http: HttpDelegate, + grpc: GrpcDelegate, } -impl<'a, T> IngesterServer<'a, T> -where - T: Catalog, -{ - /// Initialize the Ingester - pub fn new(topic: KafkaTopic, shard_ids: Vec, catalog: &'a Arc) -> Self { +impl IngesterServer { + /// Initialise a new [`IngesterServer`] using the provided HTTP and gRPC + /// handlers. + pub fn new(http: HttpDelegate, grpc: GrpcDelegate) -> Self { Self { - kafka_topic: topic, - kafka_partitions: shard_ids, - iox_catalog: catalog, + metrics: Default::default(), + http, + grpc, } } - /// Return a kafka topic - pub fn get_topic(&self) -> KafkaTopic { - self.kafka_topic.clone() - } - - /// Return a kafka topic id - pub fn get_topic_id(&self) -> KafkaTopicId { - self.kafka_topic.id - } - - /// Return a kafka topic name - pub fn get_topic_name(&self) -> String { - self.kafka_topic.name.clone() - } - - /// Return Kafka Partitions - pub fn get_kafka_partitions(&self) -> Vec { - self.kafka_partitions.clone() + /// Return the [`metric::Registry`] used by the router. + pub fn metric_registry(&self) -> Arc { + Arc::clone(&self.metrics) + } +} + +impl IngesterServer { + /// Get a reference to the router http delegate. + pub fn http(&self) -> &HttpDelegate { + &self.http + } + + /// Get a reference to the router grpc delegate. + pub fn grpc(&self) -> &GrpcDelegate { + &self.grpc } } diff --git a/ingester/src/server/grpc.rs b/ingester/src/server/grpc.rs new file mode 100644 index 0000000000..09f460ed8e --- /dev/null +++ b/ingester/src/server/grpc.rs @@ -0,0 +1,20 @@ +//! gRPC service implementations for `ingester`. + +use crate::handler::IngestHandler; +use std::sync::Arc; + +/// This type is responsible for managing all gRPC services exposed by +/// `ingester`. +#[derive(Debug, Default)] +pub struct GrpcDelegate { + #[allow(dead_code)] + ingest_handler: Arc, +} + +impl GrpcDelegate { + /// Initialise a new [`GrpcDelegate`] passing valid requests to the + /// specified `ingest_handler`. + pub fn new(ingest_handler: Arc) -> Self { + Self { ingest_handler } + } +} diff --git a/ingester/src/server/http.rs b/ingester/src/server/http.rs new file mode 100644 index 0000000000..09a4d26fa1 --- /dev/null +++ b/ingester/src/server/http.rs @@ -0,0 +1,50 @@ +//! HTTP service implementations for `ingester`. + +use crate::handler::IngestHandler; +use hyper::{Body, Request, Response, StatusCode}; +use std::sync::Arc; +use thiserror::Error; + +/// Errors returned by the `router2` HTTP request handler. +#[derive(Debug, Error, Copy, Clone)] +pub enum Error { + /// The requested path has no registered handler. + #[error("not found")] + NotFound, +} + +impl Error { + /// Convert the error into an appropriate [`StatusCode`] to be returned to + /// the end user. + pub fn as_status_code(&self) -> StatusCode { + match self { + Error::NotFound => StatusCode::NOT_FOUND, + } + } +} + +/// This type is responsible for servicing requests to the `ingester` HTTP +/// endpoint. +/// +/// Requests to some paths may be handled externally by the caller - the IOx +/// server runner framework takes care of implementing the heath endpoint, +/// metrics, pprof, etc. +#[derive(Debug, Default)] +pub struct HttpDelegate { + #[allow(dead_code)] + ingest_handler: Arc, +} + +impl HttpDelegate { + /// Initialise a new [`HttpDelegate`] passing valid requests to the + /// specified `ingest_handler`. + pub fn new(ingest_handler: Arc) -> Self { + Self { ingest_handler } + } + + /// Routes `req` to the appropriate handler, if any, returning the handler + /// response. + pub fn route(&self, _req: Request) -> Result, Error> { + unimplemented!() + } +} diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 4fc600ea56..d790500427 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -16,8 +16,8 @@ use uuid::Uuid; const MAX_CONNECTIONS: u32 = 5; const CONNECT_TIMEOUT: Duration = Duration::from_secs(2); const IDLE_TIMEOUT: Duration = Duration::from_secs(500); -#[allow(dead_code)] -const SCHEMA_NAME: &str = "iox_catalog"; +/// the default schema name to use in Postgres +pub const SCHEMA_NAME: &str = "iox_catalog"; /// In-memory catalog that implements the `RepoCollection` and individual repo traits. #[derive(Debug)]