diff --git a/client_util/src/connection.rs b/client_util/src/connection.rs index 15a91741f9..fde94490db 100644 --- a/client_util/src/connection.rs +++ b/client_util/src/connection.rs @@ -60,7 +60,7 @@ pub type Result = std::result::Result; /// use client_util::connection::Builder; /// use std::time::Duration; /// -/// let connection = Builder::default() +/// let connection = Builder::new() /// .timeout(Duration::from_secs(42)) /// .user_agent("my_awesome_client") /// .build("http://127.0.0.1:8082/") @@ -88,6 +88,11 @@ impl std::default::Default for Builder { } impl Builder { + /// Create a new default builder + pub fn new() -> Self { + Default::default() + } + /// Construct the [`Connection`] instance using the specified base URL. pub async fn build(self, dst: D) -> Result where diff --git a/data_types2/src/lib.rs b/data_types2/src/lib.rs index 5635d11c19..7088c413a7 100644 --- a/data_types2/src/lib.rs +++ b/data_types2/src/lib.rs @@ -131,7 +131,7 @@ impl ColumnId { /// "sequencer_number" in the `write_buffer` which currently means /// "kafka partition". /// -/// https://github.com/influxdata/influxdb_iox/issues/4237 +/// #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] #[sqlx(transparent)] pub struct SequencerId(i16); @@ -798,7 +798,7 @@ pub struct ProcessedTombstone { pub parquet_file_id: ParquetFileId, } -/// Request received from the query service for data the ingester has +/// Request from the querier service to the ingester service #[derive(Debug, PartialEq)] pub struct IngesterQueryRequest { /// namespace to search diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index 121e62af7b..38bdfb4671 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -365,7 +365,8 @@ pub async fn command(config: Config) -> Result<()> { ) .await?; - info!("starting querier"); + let ingester_address = format!("http://{}", ingester_run_config.grpc_bind_address); + info!(%ingester_address, "starting querier"); let querier = create_querier_server_type( &common_state, metrics, @@ -373,6 +374,7 @@ pub async fn command(config: Config) -> Result<()> { object_store, time_provider, exec, + ingester_address, ) .await; diff --git a/influxdb_iox/src/commands/run/querier.rs b/influxdb_iox/src/commands/run/querier.rs index efbd383dd5..fe22a3c0f5 100644 --- a/influxdb_iox/src/commands/run/querier.rs +++ b/influxdb_iox/src/commands/run/querier.rs @@ -59,6 +59,18 @@ pub struct Config { /// If not specified, defaults to the number of cores on the system #[clap(long = "--num-query-threads", env = "INFLUXDB_IOX_NUM_QUERY_THREADS")] pub num_query_threads: Option, + + /// gRPC address for the querier to talk with the + /// ingester. For example "http://127.0.0.1:8083" + /// + /// Note this is a workround for + /// + #[clap( + long = "--ingester-address", + env = "INFLUXDB_IOX_INGESTER_ADDRESS", + default_value = "http://127.0.0.1:8083" + )] + pub ingester_address: String, } pub async fn command(config: Config) -> Result<(), Error> { @@ -80,6 +92,7 @@ pub async fn command(config: Config) -> Result<(), Error> { let num_threads = config.num_query_threads.unwrap_or_else(num_cpus::get); info!(%num_threads, "using specified number of threads per thread pool"); + info!(ingester_address=%config.ingester_address, "using ingester address"); let exec = Arc::new(Executor::new(num_threads)); let server_type = create_querier_server_type( @@ -89,6 +102,7 @@ pub async fn command(config: Config) -> Result<(), Error> { object_store, time_provider, exec, + config.ingester_address, ) .await; diff --git a/influxdb_iox/tests/end_to_end_ng_cases/all_in_one.rs b/influxdb_iox/tests/end_to_end_ng_cases/all_in_one.rs index b10e9ba032..5b57bd0bef 100644 --- a/influxdb_iox/tests/end_to_end_ng_cases/all_in_one.rs +++ b/influxdb_iox/tests/end_to_end_ng_cases/all_in_one.rs @@ -1,8 +1,8 @@ use arrow_util::assert_batches_sorted_eq; use http::StatusCode; use test_helpers_end_to_end_ng::{ - get_write_token, maybe_skip_integration, query_when_readable, rand_name, write_to_router, - ServerFixture, TestConfig, + get_write_token, maybe_skip_integration, rand_name, run_query, wait_for_persisted, + write_to_router, ServerFixture, TestConfig, }; #[tokio::test] @@ -25,16 +25,16 @@ async fn smoke() { let response = write_to_router(lp, org, bucket, all_in_one.server().router_http_base()).await; + // wait for data to be persisted to parquet assert_eq!(response.status(), StatusCode::NO_CONTENT); let write_token = get_write_token(&response); + wait_for_persisted(write_token, all_in_one.server().ingester_grpc_connection()).await; - // run query in a loop until the data becomes available + // run query let sql = format!("select * from {}", table_name); - let batches = query_when_readable( + let batches = run_query( sql, namespace, - write_token, - all_in_one.server().ingester_grpc_connection(), all_in_one.server().querier_grpc_connection(), ) .await; diff --git a/influxdb_iox/tests/end_to_end_ng_cases/namespace.rs b/influxdb_iox/tests/end_to_end_ng_cases/namespace.rs index 93194f3465..9f307183fe 100644 --- a/influxdb_iox/tests/end_to_end_ng_cases/namespace.rs +++ b/influxdb_iox/tests/end_to_end_ng_cases/namespace.rs @@ -10,7 +10,7 @@ async fn querier_namespace_client() { let router2_config = TestConfig::new_router2(&database_url); let ingester_config = TestConfig::new_ingester(&router2_config).with_fast_parquet_generation(); - let querier_config = TestConfig::new_querier(&router2_config); + let querier_config = TestConfig::new_querier(&ingester_config); // Set up the cluster ==================================== let cluster = MiniCluster::new() diff --git a/influxdb_iox/tests/end_to_end_ng_cases/querier.rs b/influxdb_iox/tests/end_to_end_ng_cases/querier.rs index d74b081862..67b9353829 100644 --- a/influxdb_iox/tests/end_to_end_ng_cases/querier.rs +++ b/influxdb_iox/tests/end_to_end_ng_cases/querier.rs @@ -1,9 +1,67 @@ use arrow_util::assert_batches_sorted_eq; use http::StatusCode; use test_helpers_end_to_end_ng::{ - get_write_token, maybe_skip_integration, query_when_readable, MiniCluster, TestConfig, + get_write_token, maybe_skip_integration, run_query, wait_for_persisted, wait_for_readable, + MiniCluster, TestConfig, }; +#[tokio::test] +async fn basic_ingester() { + let database_url = maybe_skip_integration!(); + + let table_name = "the_table"; + + let router2_config = TestConfig::new_router2(&database_url); + //let ingester_config = TestConfig::new_ingester(&router2_config); + // TEMP: use fast parquet generation until we have completed + // https://github.com/influxdata/influxdb_iox/pull/4255 + let ingester_config = TestConfig::new_ingester(&router2_config).with_fast_parquet_generation(); + let querier_config = TestConfig::new_querier(&ingester_config); + + // Set up the cluster ==================================== + let cluster = MiniCluster::new() + .with_router2(router2_config) + .await + .with_ingester(ingester_config) + .await + .with_querier(querier_config) + .await; + + // Write some data into the v2 HTTP API ============== + let lp = format!( + "{},tag1=A,tag2=B val=42i 123456\n\ + {},tag1=A,tag2=C val=43i 123457", + table_name, table_name + ); + let response = cluster.write_to_router(lp).await; + assert_eq!(response.status(), StatusCode::NO_CONTENT); + + // Wait for data to be readable + let write_token = get_write_token(&response); + wait_for_readable(&write_token, cluster.ingester().ingester_grpc_connection()).await; + // TODO remove this as part of https://github.com/influxdata/influxdb_iox/pull/4255 + wait_for_persisted(write_token, cluster.ingester().ingester_grpc_connection()).await; + + // run query + let sql = format!("select * from {}", table_name); + let batches = run_query( + sql, + cluster.namespace(), + cluster.querier().querier_grpc_connection(), + ) + .await; + + let expected = [ + "+------+------+--------------------------------+-----+", + "| tag1 | tag2 | time | val |", + "+------+------+--------------------------------+-----+", + "| A | B | 1970-01-01T00:00:00.000123456Z | 42 |", + "| A | C | 1970-01-01T00:00:00.000123457Z | 43 |", + "+------+------+--------------------------------+-----+", + ]; + assert_batches_sorted_eq!(&expected, &batches); +} + #[tokio::test] async fn basic_on_parquet() { let database_url = maybe_skip_integration!(); @@ -11,8 +69,9 @@ async fn basic_on_parquet() { let table_name = "the_table"; let router2_config = TestConfig::new_router2(&database_url); + // fast parquet let ingester_config = TestConfig::new_ingester(&router2_config).with_fast_parquet_generation(); - let querier_config = TestConfig::new_querier(&router2_config); + let querier_config = TestConfig::new_querier(&ingester_config); // Set up the cluster ==================================== let cluster = MiniCluster::new() @@ -28,17 +87,15 @@ async fn basic_on_parquet() { let response = cluster.write_to_router(lp).await; assert_eq!(response.status(), StatusCode::NO_CONTENT); - // assert that the response contains a write token + // Wait for data to be persisted to parquet let write_token = get_write_token(&response); - assert!(!write_token.is_empty()); + wait_for_persisted(write_token, cluster.ingester().ingester_grpc_connection()).await; - // run query in a loop until the data becomes available + // run query let sql = format!("select * from {}", table_name); - let batches = query_when_readable( + let batches = run_query( sql, cluster.namespace(), - write_token, - cluster.ingester().ingester_grpc_connection(), cluster.querier().querier_grpc_connection(), ) .await; diff --git a/ioxd_querier/src/lib.rs b/ioxd_querier/src/lib.rs index 370f9737c9..4475e220ef 100644 --- a/ioxd_querier/src/lib.rs +++ b/ioxd_querier/src/lib.rs @@ -8,7 +8,9 @@ use hyper::{Body, Request, Response}; use iox_catalog::interface::Catalog; use metric::Registry; use object_store::DynObjectStore; -use querier::{QuerierDatabase, QuerierHandler, QuerierHandlerImpl, QuerierServer}; +use querier::{ + create_ingester_connection, QuerierDatabase, QuerierHandler, QuerierHandlerImpl, QuerierServer, +}; use query::exec::Executor; use time::TimeProvider; use trace::TraceCollector; @@ -130,6 +132,7 @@ pub async fn create_querier_server_type( object_store: Arc, time_provider: Arc, exec: Arc, + ingester_address: String, ) -> Arc { let database = Arc::new(QuerierDatabase::new( catalog, @@ -137,6 +140,7 @@ pub async fn create_querier_server_type( object_store, time_provider, exec, + create_ingester_connection(ingester_address), )); let querier_handler = Arc::new(QuerierHandlerImpl::new(Arc::clone(&database))); diff --git a/ioxd_querier/src/rpc/namespace.rs b/ioxd_querier/src/rpc/namespace.rs index da684eda9e..16fbeff0d7 100644 --- a/ioxd_querier/src/rpc/namespace.rs +++ b/ioxd_querier/src/rpc/namespace.rs @@ -55,6 +55,7 @@ impl proto::namespace_service_server::NamespaceService for NamespaceServiceImpl mod tests { use generated_types::influxdata::iox::namespace::v1::namespace_service_server::NamespaceService; use iox_tests::util::TestCatalog; + use querier::create_ingester_connection_for_testing; use super::*; @@ -68,6 +69,7 @@ mod tests { catalog.object_store(), catalog.time_provider(), catalog.exec(), + create_ingester_connection_for_testing(), )); let service = NamespaceServiceImpl::new(db); @@ -89,6 +91,7 @@ mod tests { catalog.object_store(), catalog.time_provider(), catalog.exec(), + create_ingester_connection_for_testing(), )); let service = NamespaceServiceImpl::new(db); diff --git a/querier/src/database.rs b/querier/src/database.rs index 520fc4b0fe..f24d99c97c 100644 --- a/querier/src/database.rs +++ b/querier/src/database.rs @@ -1,8 +1,8 @@ //! Database for the querier that contains all namespaces. use crate::{ - cache::CatalogCache, chunk::ParquetChunkAdapter, namespace::QuerierNamespace, - query_log::QueryLog, + cache::CatalogCache, chunk::ParquetChunkAdapter, ingester::IngesterConnection, + namespace::QuerierNamespace, query_log::QueryLog, }; use async_trait::async_trait; use backoff::{Backoff, BackoffConfig}; @@ -52,6 +52,9 @@ pub struct QuerierDatabase { /// Executor for queries. exec: Arc, + /// Connection to ingester + ingester_connection: Arc, + /// Query log. query_log: Arc, } @@ -73,6 +76,7 @@ impl QuerierDatabase { object_store: Arc, time_provider: Arc, exec: Arc, + ingester_connection: Arc, ) -> Self { let catalog_cache = Arc::new(CatalogCache::new( Arc::clone(&catalog), @@ -96,6 +100,7 @@ impl QuerierDatabase { object_store, time_provider, exec, + ingester_connection, query_log, } } @@ -114,6 +119,7 @@ impl QuerierDatabase { schema, name, Arc::clone(&self.exec), + Arc::clone(&self.ingester_connection), Arc::clone(&self.query_log), ))) } @@ -134,6 +140,8 @@ impl QuerierDatabase { mod tests { use iox_tests::util::TestCatalog; + use crate::create_ingester_connection_for_testing; + use super::*; #[tokio::test] @@ -146,6 +154,7 @@ mod tests { catalog.object_store(), catalog.time_provider(), catalog.exec(), + create_ingester_connection_for_testing(), ); catalog.create_namespace("ns1").await; diff --git a/querier/src/handler.rs b/querier/src/handler.rs index 2937e7d029..45dc838ca8 100644 --- a/querier/src/handler.rs +++ b/querier/src/handler.rs @@ -115,6 +115,8 @@ mod tests { use query::exec::Executor; use time::{MockProvider, Time}; + use crate::create_ingester_connection_for_testing; + use super::*; #[tokio::test] @@ -151,6 +153,7 @@ mod tests { object_store, time_provider, exec, + create_ingester_connection_for_testing(), )); let querier = QuerierHandlerImpl::new(database); diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs new file mode 100644 index 0000000000..edf88839d9 --- /dev/null +++ b/querier/src/ingester/mod.rs @@ -0,0 +1,206 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use client_util::connection; +use observability_deps::tracing::debug; +use predicate::Predicate; +use query::{QueryChunk, QueryChunkMeta}; +use schema::Schema; +use snafu::{ResultExt, Snafu}; + +use crate::{QuerierFlightClient, QuerierFlightError}; + +use self::test_util::MockIngesterConnection; + +mod test_util; + +#[derive(Debug, Snafu)] +#[allow(missing_copy_implementations, missing_docs)] +pub enum Error { + #[snafu(display("Failed to select columns: {}", source))] + SelectColumns { source: schema::Error }, + + #[snafu(display("Failed to connect to ingester '{}': {}", ingester_address, source))] + Connecting { + ingester_address: String, + source: connection::Error, + }, + + #[snafu(display("Failed ingester handshake '{}': {}", ingester_address, source))] + Handshake { + ingester_address: String, + source: QuerierFlightError, + }, +} + +pub type Result = std::result::Result; + +/// Create a new connection given an `ingester_address` such as +/// "http://127.0.0.1:8083" +pub fn create_ingester_connection(ingester_address: String) -> Arc { + Arc::new(IngesterConnectionImpl::new(ingester_address)) +} + +/// Create a new ingester suitable for testing +pub fn create_ingester_connection_for_testing() -> Arc { + Arc::new(MockIngesterConnection::new()) +} + +/// Handles communicating with the ingester(s) to retrieve +/// data that is not yet persisted +#[async_trait] +pub trait IngesterConnection: std::fmt::Debug + Send + Sync + 'static { + /// Returns all partitions ingester(s) know about for the specified table. + async fn partitions( + &self, + namespace_name: Arc, + table_name: Arc, + columns: Vec, + predicate: &Predicate, + expected_schema: &Schema, + ) -> Result>>; +} + +// IngesterConnection that communicates with an ingester. +#[allow(missing_copy_implementations)] +#[derive(Debug)] +pub(crate) struct IngesterConnectionImpl { + ingester_address: String, +} + +impl IngesterConnectionImpl { + /// Create a new connection given an `ingester_address` such as + /// "http://127.0.0.1:8083" + pub fn new(ingester_address: String) -> Self { + Self { ingester_address } + } +} + +#[async_trait] +impl IngesterConnection for IngesterConnectionImpl { + /// Retrieve chunks from the ingester for the particular table and + /// predicate + async fn partitions( + &self, + _namespace_name: Arc, + table_name: Arc, + _columns: Vec, + _predicate: &Predicate, + _expected_schema: &Schema, + ) -> Result>> { + let ingester_address = &self.ingester_address; + debug!(%ingester_address, %table_name, "Connecting to ingester"); + + // TODO maybe cache this connection + let connection = connection::Builder::new() + .build(&self.ingester_address) + .await + .context(ConnectingSnafu { ingester_address })?; + + let mut client = QuerierFlightClient::new(connection); + + // make contact with the ingester + client + .handshake() + .await + .context(HandshakeSnafu { ingester_address })?; + + // TODO Coming Soon: create the actual IngesterPartition and return them here. + + Ok(vec![]) + } +} + +/// A wrapper around the unpersisted data in a partition returned by +/// the ingester that (will) implement the `QueryChunk` interface +/// +/// Given the catalog heirarchy: +/// +/// ```text +/// (Catalog) Sequencer -> (Catalog) Table --> (Catalog) Partition +/// ``` +/// +/// An IngesterPartition contains the unpersisted data for a catalog +/// partition from a sequencer. Thus, there can be more than one +/// IngesterPartition for each table the ingester knows about. +#[allow(missing_copy_implementations)] +#[derive(Debug, Clone)] +pub struct IngesterPartition {} + +impl QueryChunkMeta for IngesterPartition { + fn summary(&self) -> Option<&data_types2::TableSummary> { + todo!() + } + + fn schema(&self) -> Arc { + todo!() + } + + fn sort_key(&self) -> Option<&schema::sort::SortKey> { + todo!() + } + + fn delete_predicates(&self) -> &[Arc] { + todo!() + } +} + +impl QueryChunk for IngesterPartition { + fn id(&self) -> data_types2::ChunkId { + todo!() + } + + fn addr(&self) -> data_types2::ChunkAddr { + todo!() + } + + fn table_name(&self) -> &str { + todo!() + } + + fn may_contain_pk_duplicates(&self) -> bool { + todo!() + } + + fn apply_predicate_to_metadata( + &self, + _predicate: &Predicate, + ) -> Result { + todo!() + } + + fn column_names( + &self, + _ctx: query::exec::IOxSessionContext, + _predicate: &Predicate, + _columns: schema::selection::Selection<'_>, + ) -> Result, query::QueryChunkError> { + todo!() + } + + fn column_values( + &self, + _ctx: query::exec::IOxSessionContext, + _column_name: &str, + _predicate: &Predicate, + ) -> Result, query::QueryChunkError> { + todo!() + } + + fn read_filter( + &self, + _ctx: query::exec::IOxSessionContext, + _predicate: &Predicate, + _selection: schema::selection::Selection<'_>, + ) -> Result { + todo!() + } + + fn chunk_type(&self) -> &str { + todo!() + } + + fn order(&self) -> data_types2::ChunkOrder { + todo!() + } +} diff --git a/querier/src/ingester/test_util.rs b/querier/src/ingester/test_util.rs new file mode 100644 index 0000000000..d3456477d9 --- /dev/null +++ b/querier/src/ingester/test_util.rs @@ -0,0 +1,29 @@ +use std::sync::Arc; + +use async_trait::async_trait; + +use super::IngesterConnection; + +/// IngesterConnection for testing +#[derive(Debug)] +pub(crate) struct MockIngesterConnection {} + +impl MockIngesterConnection { + pub fn new() -> Self { + Self {} + } +} + +#[async_trait] +impl IngesterConnection for MockIngesterConnection { + async fn partitions( + &self, + _namespace_name: Arc, + _table_name: Arc, + _columns: Vec, + _predicate: &predicate::Predicate, + _expected_schema: &schema::Schema, + ) -> super::Result>> { + Ok(vec![]) + } +} diff --git a/querier/src/lib.rs b/querier/src/lib.rs index 9bdab1a3ea..e7843d56aa 100644 --- a/querier/src/lib.rs +++ b/querier/src/lib.rs @@ -17,6 +17,7 @@ mod database; /// Flight client to the ingester to request in-memory data. mod flight; mod handler; +mod ingester; mod namespace; mod poison; mod query_log; @@ -26,7 +27,10 @@ mod table; mod tombstone; pub use database::QuerierDatabase; -pub use flight::Client as QuerierFlightClient; +pub use flight::{Client as QuerierFlightClient, Error as QuerierFlightError}; pub use handler::{QuerierHandler, QuerierHandlerImpl}; +pub use ingester::{ + create_ingester_connection, create_ingester_connection_for_testing, IngesterConnection, +}; pub use namespace::QuerierNamespace; pub use server::QuerierServer; diff --git a/querier/src/namespace/mod.rs b/querier/src/namespace/mod.rs index d475bdfb8f..2c172794e2 100644 --- a/querier/src/namespace/mod.rs +++ b/querier/src/namespace/mod.rs @@ -1,6 +1,7 @@ //! Namespace within the whole database. use crate::{ - cache::CatalogCache, chunk::ParquetChunkAdapter, query_log::QueryLog, table::QuerierTable, + cache::CatalogCache, chunk::ParquetChunkAdapter, ingester::IngesterConnection, + query_log::QueryLog, table::QuerierTable, }; use backoff::BackoffConfig; use data_types2::{NamespaceId, NamespaceSchema}; @@ -37,6 +38,9 @@ pub struct QuerierNamespace { /// Executor for queries. exec: Arc, + /// Connection to ingester + ingester_connection: Arc, + /// Query log. query_log: Arc, } @@ -49,25 +53,28 @@ impl QuerierNamespace { schema: Arc, name: Arc, exec: Arc, + ingester_connection: Arc, query_log: Arc, ) -> Self { let tables: HashMap<_, _> = schema .tables .iter() - .map(|(name, table_schema)| { - let name = Arc::from(name.clone()); + .map(|(table_name, table_schema)| { + let table_name = Arc::from(table_name.clone()); let id = table_schema.id; let schema = Schema::try_from(table_schema.clone()).expect("cannot build schema"); let table = Arc::new(QuerierTable::new( + Arc::clone(&name), backoff_config.clone(), id, - Arc::clone(&name), + Arc::clone(&table_name), Arc::new(schema), + Arc::clone(&ingester_connection), Arc::clone(&chunk_adapter), )); - (name, table) + (table_name, table) }) .collect(); @@ -78,11 +85,13 @@ impl QuerierNamespace { name, tables: Arc::new(tables), exec, + ingester_connection, query_log, } } /// Create new namespace for given schema, for testing. + #[allow(clippy::too_many_arguments)] pub fn new_testing( catalog: Arc, object_store: Arc, @@ -91,6 +100,7 @@ impl QuerierNamespace { name: Arc, schema: Arc, exec: Arc, + ingester_connection: Arc, ) -> Self { let catalog_cache = Arc::new(CatalogCache::new(catalog, Arc::clone(&time_provider))); let chunk_adapter = Arc::new(ParquetChunkAdapter::new( @@ -107,6 +117,7 @@ impl QuerierNamespace { schema, name, exec, + ingester_connection, query_log, ) } diff --git a/querier/src/namespace/query_access.rs b/querier/src/namespace/query_access.rs index 5b163e133f..662dbe5007 100644 --- a/querier/src/namespace/query_access.rs +++ b/querier/src/namespace/query_access.rs @@ -1,3 +1,5 @@ +//! This module contains implementations of [query] interfaces for +//! [QuerierNamespace]. use std::{any::Any, collections::HashMap, sync::Arc}; use async_trait::async_trait; @@ -6,6 +8,7 @@ use datafusion::{ catalog::{catalog::CatalogProvider, schema::SchemaProvider}, datasource::TableProvider, }; +use observability_deps::tracing::trace; use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate}; use query::{ exec::{ExecutionContextProvider, ExecutorType, IOxSessionContext}, @@ -45,11 +48,16 @@ impl QueryDatabase for QuerierNamespace { Some(table) => table, None => { // table gone + trace!(%table_name, "No entry for table"); return Ok(vec![]); } }; - let mut chunks = table.chunks().await; + let mut chunks = table + .chunks(predicate) + .await + // TODO QuerierNamespace trait needs to be updated to return Result + .expect("Success getting chunks"); // if there is a field restriction on the predicate, only // chunks with that field should be returned. If the chunk has diff --git a/querier/src/namespace/test_util.rs b/querier/src/namespace/test_util.rs index 0a817c858c..7adf7bc09f 100644 --- a/querier/src/namespace/test_util.rs +++ b/querier/src/namespace/test_util.rs @@ -3,6 +3,8 @@ use std::sync::Arc; use iox_catalog::interface::get_schema_by_name; use iox_tests::util::TestNamespace; +use crate::create_ingester_connection_for_testing; + use super::QuerierNamespace; /// Create [`QuerierNamespace`] for testing. @@ -22,5 +24,6 @@ pub async fn querier_namespace(ns: &Arc) -> QuerierNamespace { ns.namespace.name.clone().into(), schema, ns.catalog.exec(), + create_ingester_connection_for_testing(), ) } diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs index 24cc0c4a38..d777317729 100644 --- a/querier/src/table/mod.rs +++ b/querier/src/table/mod.rs @@ -1,11 +1,17 @@ use std::{collections::HashMap, sync::Arc}; +use crate::{ + chunk::ParquetChunkAdapter, + ingester::{self, IngesterPartition}, + tombstone::QuerierTombstone, + IngesterConnection, +}; use backoff::{Backoff, BackoffConfig}; use data_types2::TableId; +use predicate::Predicate; use query::{provider::ChunkPruner, QueryChunk}; use schema::Schema; - -use crate::{chunk::ParquetChunkAdapter, tombstone::QuerierTombstone}; +use snafu::{ResultExt, Snafu}; use self::query_access::QuerierTableChunkPruner; @@ -14,9 +20,21 @@ mod query_access; #[cfg(test)] mod test_util; +#[derive(Debug, Snafu)] +#[allow(missing_copy_implementations, missing_docs)] +pub enum Error { + #[snafu(display("Error getting partitions from ingester: {}", source))] + GettingIngesterPartitions { source: ingester::Error }, +} + +pub type Result = std::result::Result; + /// Table representation for the querier. #[derive(Debug)] pub struct QuerierTable { + /// Namespace the table is in + namespace_name: Arc, + /// Backoff config for IO operations. backoff_config: BackoffConfig, @@ -29,6 +47,9 @@ pub struct QuerierTable { /// Table schema. schema: Arc, + /// Connection to ingester + ingester_connection: Arc, + /// Interface to create chunks for this table. chunk_adapter: Arc, } @@ -36,17 +57,21 @@ pub struct QuerierTable { impl QuerierTable { /// Create new table. pub fn new( + namespace_name: Arc, backoff_config: BackoffConfig, id: TableId, name: Arc, schema: Arc, + ingester_connection: Arc, chunk_adapter: Arc, ) -> Self { Self { + namespace_name, backoff_config, name, id, schema, + ingester_connection, chunk_adapter, } } @@ -69,7 +94,7 @@ impl QuerierTable { /// Query all chunks within this table. /// /// This currently contains all parquet files linked to their unprocessed tombstones. - pub async fn chunks(&self) -> Vec> { + pub async fn chunks(&self, predicate: &Predicate) -> Result>> { // get parquet files and tombstones in a single catalog transaction // TODO: figure out some form of caching let (parquet_files, tombstones) = Backoff::new(&self.backoff_config) @@ -93,6 +118,12 @@ impl QuerierTable { .await .expect("retry forever"); + // TODO do in parallel with fetching parquet files + let ingester_partitions = self.ingester_partitions(predicate).await?; + + // TODO: Validate that the cache contents is consistent with the + // parquet files we know about + // convert parquet files and tombstones to nicer objects let mut chunks = Vec::with_capacity(parquet_files.len()); for parquet_file in parquet_files { @@ -169,24 +200,58 @@ impl QuerierTable { chunks2.push(Arc::new(chunk) as Arc); } - chunks2 + // Add ingester chunks to the overall chunk list. What about tombstones?? + chunks2.extend(ingester_partitions.into_iter().map(|c| c as _)); + + Ok(chunks2) } /// Get a chunk pruner that can be used to prune chunks retrieved via [`chunks`](Self::chunks) pub fn chunk_pruner(&self) -> Arc { Arc::new(QuerierTableChunkPruner {}) } + + async fn ingester_partitions( + &self, + predicate: &Predicate, + ) -> Result>> { + // For now, ask for *all* columns in the table from the ingester (need + // at least all pk (time, tag) columns for + // deduplication. + // + // As a future optimization, might be able to fetch only + // fields that are needed in query + let columns: Vec = self + .schema + .iter() + .map(|(_, f)| f.name().to_string()) + .collect(); + + // get any chunks from the ingster + self.ingester_connection + .partitions( + Arc::clone(&self.namespace_name), + Arc::clone(&self.name), + columns, + predicate, + self.schema.as_ref(), + ) + .await + .context(GettingIngesterPartitionsSnafu) + } } #[cfg(test)] mod tests { use data_types2::{ChunkId, ColumnType}; use iox_tests::util::{now, TestCatalog}; + use predicate::Predicate; use crate::table::test_util::querier_table; #[tokio::test] async fn test_chunks() { + let pred = Predicate::default(); let catalog = TestCatalog::new(); let ns = catalog.create_namespace("ns").await; @@ -216,7 +281,7 @@ mod tests { let querier_table = querier_table(&catalog, &table1).await; // no parquet files yet - assert!(querier_table.chunks().await.is_empty()); + assert!(querier_table.chunks(&pred).await.unwrap().is_empty()); let file111 = partition11 .create_parquet_file_with_min_max( @@ -299,7 +364,7 @@ mod tests { // this contains all files except for: // - file111: marked for delete // - file221: wrong table - let mut chunks = querier_table.chunks().await; + let mut chunks = querier_table.chunks(&pred).await.unwrap(); chunks.sort_by_key(|c| c.id()); assert_eq!(chunks.len(), 5); diff --git a/querier/src/table/query_access.rs b/querier/src/table/query_access.rs index 00d5744c2d..b3584589ed 100644 --- a/querier/src/table/query_access.rs +++ b/querier/src/table/query_access.rs @@ -8,7 +8,7 @@ use datafusion::{ logical_plan::Expr, physical_plan::ExecutionPlan, }; -use predicate::Predicate; +use predicate::{Predicate, PredicateBuilder}; use query::{ provider::{ChunkPruner, ProviderBuilder}, pruning::{prune_chunks, PruningObserver}, @@ -43,7 +43,17 @@ impl TableProvider for QuerierTable { let mut builder = ProviderBuilder::new(self.name(), Arc::clone(self.schema())); builder = builder.add_pruner(self.chunk_pruner()); - for chunk in self.chunks().await { + let predicate = filters + .iter() + .fold(PredicateBuilder::new(), |b, expr| b.add_expr(expr.clone())) + .build(); + + let chunks = self + .chunks(&predicate) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + for chunk in chunks { builder = builder.add_chunk(chunk); } diff --git a/querier/src/table/test_util.rs b/querier/src/table/test_util.rs index 07f863449a..a4b7292073 100644 --- a/querier/src/table/test_util.rs +++ b/querier/src/table/test_util.rs @@ -5,7 +5,9 @@ use iox_catalog::interface::get_schema_by_name; use iox_tests::util::{TestCatalog, TestTable}; use schema::Schema; -use crate::{cache::CatalogCache, chunk::ParquetChunkAdapter}; +use crate::{ + cache::CatalogCache, chunk::ParquetChunkAdapter, create_ingester_connection_for_testing, +}; use super::QuerierTable; @@ -28,11 +30,15 @@ pub async fn querier_table(catalog: &Arc, table: &Arc) - let schema = catalog_schema.tables.remove(&table.table.name).unwrap(); let schema = Arc::new(Schema::try_from(schema).unwrap()); + let namespace_name = Arc::from(table.namespace.namespace.name.as_str()); + QuerierTable::new( + namespace_name, BackoffConfig::default(), table.table.id, table.table.name.clone().into(), schema, + create_ingester_connection_for_testing(), chunk_adapter, ) } diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index 8008c996e4..e87dda308a 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -10,7 +10,7 @@ use db::{ use iox_catalog::interface::get_schema_by_name; use iox_tests::util::{TestCatalog, TestNamespace}; use itertools::Itertools; -use querier::QuerierNamespace; +use querier::{create_ingester_connection_for_testing, QuerierNamespace}; use query::QueryChunk; use schema::merge::SchemaMerger; use schema::selection::Selection; @@ -1256,5 +1256,6 @@ async fn make_querier_namespace(ns: Arc) -> Arc ns.namespace.name.clone().into(), schema, ns.catalog.exec(), + create_ingester_connection_for_testing(), )) } diff --git a/test_helpers_end_to_end_ng/src/client.rs b/test_helpers_end_to_end_ng/src/client.rs index 6721575d4f..9ee9e1d71c 100644 --- a/test_helpers_end_to_end_ng/src/client.rs +++ b/test_helpers_end_to_end_ng/src/client.rs @@ -58,8 +58,9 @@ pub async fn wait_for_token( F: Fn(&GetWriteInfoResponse) -> bool, { let write_token = write_token.into(); + assert!(!write_token.is_empty()); - println!("Waiting for Write Token {}", write_token); + println!(" write token: {}", write_token); let retry_duration = Duration::from_secs(MAX_QUERY_RETRY_TIME_SEC); let mut write_info_client = influxdb_iox_client::write_info::Client::new(ingester_connection); @@ -87,7 +88,7 @@ pub async fn wait_for_token( /// Waits for the specified write token to be readable pub async fn wait_for_readable(write_token: impl Into, ingester_connection: Connection) { - println!("Waiting for Write Token to be readable"); + println!("Waiting for write token to be readable"); wait_for_token(write_token, ingester_connection, |res| { if res.readable { @@ -102,7 +103,7 @@ pub async fn wait_for_readable(write_token: impl Into, ingester_connecti /// Waits for the write token to be persisted pub async fn wait_for_persisted(write_token: impl Into, ingester_connection: Connection) { - println!("Waiting for Write Token to be persisted"); + println!("Waiting for write token to be persisted"); wait_for_token(write_token, ingester_connection, |res| { if res.persisted { @@ -116,26 +117,14 @@ pub async fn wait_for_persisted(write_token: impl Into, ingester_connect } /// Runs a query using the flight API on the specified connection -/// until responses are produced. -/// -/// (Will) eventually wait until data from the specified write token -/// is readable, but currently waits for the data to be persisted (as -/// the querier doesn't know how to ask the ingester yet) -/// -/// The retry loop is used to wait for writes to become visible -pub async fn query_when_readable( +pub async fn run_query( sql: impl Into, namespace: impl Into, - write_token: impl Into, - ingester_connection: Connection, querier_connection: Connection, ) -> Vec { let namespace = namespace.into(); let sql = sql.into(); - // TODO: this should be "wait_for_readable" once the querier can talk to ingester - wait_for_persisted(write_token, ingester_connection).await; - let mut client = influxdb_iox_client::flight::Client::new(querier_connection); // This does nothing except test the client handshake implementation. diff --git a/test_helpers_end_to_end_ng/src/config.rs b/test_helpers_end_to_end_ng/src/config.rs index fdc3c0e82f..cbacb25cbb 100644 --- a/test_helpers_end_to_end_ng/src/config.rs +++ b/test_helpers_end_to_end_ng/src/config.rs @@ -3,6 +3,8 @@ use std::{collections::HashMap, sync::Arc}; use http::{header::HeaderName, HeaderValue}; use tempfile::TempDir; +use crate::addrs::BindAddresses; + use super::ServerType; /// Options for creating test servers (`influxdb_iox` processes) @@ -25,6 +27,9 @@ pub struct TestConfig { /// Object store directory, if needed. object_store_dir: Option>, + + /// Which ports this server should use + addrs: Arc, } impl TestConfig { @@ -38,6 +43,7 @@ impl TestConfig { dsn: dsn.into(), write_buffer_dir: None, object_store_dir: None, + addrs: Arc::new(BindAddresses::default()), } } @@ -57,10 +63,21 @@ impl TestConfig { .with_default_ingester_options() } - /// Create a minimal querier configuration, using the dsn and - /// object store from other - pub fn new_querier(other: &TestConfig) -> Self { - Self::new(ServerType::Querier, other.dsn()).with_existing_object_store(other) + /// Create a minimal querier configuration from the specified + /// ingester configuration, using the same dsn and object store, + /// and pointing at the specified ingester + pub fn new_querier(ingester_config: &TestConfig) -> Self { + assert_eq!(ingester_config.server_type(), ServerType::Ingester); + Self::new(ServerType::Querier, ingester_config.dsn()) + .with_existing_object_store(ingester_config) + // Configure to talk with the ingester + .with_ingester_address( + ingester_config + .addrs() + .ingester_grpc_api() + .client_base() + .as_ref(), + ) } /// Create a minimal all in one configuration @@ -91,6 +108,11 @@ impl TestConfig { .with_env("INFLUXDB_IOX_WRITE_BUFFER_PARTITION_RANGE_END", "0") } + /// Adds the ingester address + fn with_ingester_address(self, ingester_address: &str) -> Self { + self.with_env("INFLUXDB_IOX_INGESTER_ADDRESS", ingester_address) + } + /// add a name=value environment variable when starting the server /// /// Should not be called directly, but instead all mapping to @@ -184,4 +206,10 @@ impl TestConfig { pub fn client_headers(&self) -> &[(HeaderName, HeaderValue)] { self.client_headers.as_ref() } + + /// Get a reference to the test config's addrs. + #[must_use] + pub fn addrs(&self) -> &BindAddresses { + &self.addrs + } } diff --git a/test_helpers_end_to_end_ng/src/server_fixture.rs b/test_helpers_end_to_end_ng/src/server_fixture.rs index c30b6a02c5..43c6fc4e58 100644 --- a/test_helpers_end_to_end_ng/src/server_fixture.rs +++ b/test_helpers_end_to_end_ng/src/server_fixture.rs @@ -81,9 +81,6 @@ pub struct TestServer { /// Handle to the server process being controlled server_process: Arc>, - /// Which ports this server should use - addrs: BindAddresses, - /// Configuration values for starting the test server test_config: TestConfig, @@ -105,17 +102,13 @@ struct Process { impl TestServer { async fn new(test_config: TestConfig) -> Self { - let addrs = BindAddresses::default(); let ready = Mutex::new(ServerState::Started); - let server_process = Arc::new(Mutex::new( - Self::create_server_process(&addrs, &test_config).await, - )); + let server_process = Arc::new(Mutex::new(Self::create_server_process(&test_config).await)); Self { ready, server_process, - addrs, test_config, router_grpc_connection: None, ingester_grpc_connection: None, @@ -149,12 +142,12 @@ impl TestServer { /// Return the http base URL for the router HTTP API pub fn router_http_base(&self) -> Arc { - self.addrs.router_http_api().client_base() + self.addrs().router_http_api().client_base() } /// Return the http base URL for the router gRPC API pub fn router_grpc_base(&self) -> Arc { - self.addrs.router_grpc_api().client_base() + self.addrs().router_grpc_api().client_base() } /// Create a connection channel to the specified gRPC endpoint @@ -183,7 +176,7 @@ impl TestServer { self.router_grpc_connection = match server_type { ServerType::AllInOne | ServerType::Router2 => { - let client_base = self.addrs.router_grpc_api().client_base(); + let client_base = self.addrs().router_grpc_api().client_base(); Some(self.grpc_channel(client_base.as_ref()).await.map_err(|e| { format!("Can not connect to router at {}: {}", client_base, e) })?) @@ -193,7 +186,7 @@ impl TestServer { self.ingester_grpc_connection = match server_type { ServerType::AllInOne | ServerType::Ingester => { - let client_base = self.addrs.ingester_grpc_api().client_base(); + let client_base = self.addrs().ingester_grpc_api().client_base(); Some(self.grpc_channel(client_base.as_ref()).await.map_err(|e| { format!("Can not connect to ingester at {}: {}", client_base, e) })?) @@ -204,7 +197,7 @@ impl TestServer { self.querier_grpc_connection = match server_type { ServerType::AllInOne | ServerType::Querier => { - let client_base = self.addrs.querier_grpc_api().client_base(); + let client_base = self.addrs().querier_grpc_api().client_base(); Some(self.grpc_channel(client_base.as_ref()).await.map_err(|e| { format!("Can not connect to querier at {}: {}", client_base, e) })?) @@ -217,7 +210,7 @@ impl TestServer { /// Returns the addresses to which the server has been bound fn addrs(&self) -> &BindAddresses { - &self.addrs + self.test_config.addrs() } /// Restarts the tests server process, but does not reconnect clients @@ -226,11 +219,11 @@ impl TestServer { let mut server_process = self.server_process.lock().await; server_process.child.kill().unwrap(); server_process.child.wait().unwrap(); - *server_process = Self::create_server_process(&self.addrs, &self.test_config).await; + *server_process = Self::create_server_process(&self.test_config).await; *ready_guard = ServerState::Started; } - async fn create_server_process(addrs: &BindAddresses, test_config: &TestConfig) -> Process { + async fn create_server_process(test_config: &TestConfig) -> Process { // Create a new file each time and keep it around to aid debugging let (log_file, log_path) = NamedTempFile::new() .expect("opening log file") @@ -266,7 +259,7 @@ impl TestServer { .env("LOG_FILTER", log_filter) .env("INFLUXDB_IOX_CATALOG_DSN", &dsn) // add http/grpc address information - .add_addr_env(server_type, addrs) + .add_addr_env(server_type, test_config.addrs()) .envs(test_config.env()) // redirect output to log file .stdout(stdout_log_file) @@ -467,7 +460,7 @@ impl std::fmt::Display for TestServer { f, "TestServer {:?} ({})", self.test_config.server_type(), - self.addrs + self.addrs() ) } } diff --git a/write_summary/src/lib.rs b/write_summary/src/lib.rs index d458bd72a5..4cb77c6f66 100644 --- a/write_summary/src/lib.rs +++ b/write_summary/src/lib.rs @@ -85,7 +85,7 @@ impl WriteSummary { ) } - /// Return a WriteSummary from the "token" (created with [to_token]), or error if not possible + /// Return a WriteSummary from the "token" (created with [Self::to_token]), or error if not possible pub fn try_from_token(token: &str) -> Result { let data = base64::decode(token) .map_err(|e| format!("Invalid write token, invalid base64: {}", e))?;