feat: Add basic Querier <--> Ingester "Service Configuration" (#4259)

* feat: Add basic Querier <--> Ingester "Service Configuration"

* docs: update comments in test

* refactor: cleanup tests a little

* refactor: make trait more consistent

* docs: improve comments in IngesterPartition
pull/24376/head
Andrew Lamb 2022-04-11 07:50:22 -04:00 committed by GitHub
parent b097b394b9
commit f6e6821276
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 528 additions and 78 deletions

View File

@ -60,7 +60,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// use client_util::connection::Builder;
/// use std::time::Duration;
///
/// let connection = Builder::default()
/// let connection = Builder::new()
/// .timeout(Duration::from_secs(42))
/// .user_agent("my_awesome_client")
/// .build("http://127.0.0.1:8082/")
@ -88,6 +88,11 @@ impl std::default::Default for Builder {
}
impl Builder {
/// Create a new default builder
pub fn new() -> Self {
Default::default()
}
/// Construct the [`Connection`] instance using the specified base URL.
pub async fn build<D>(self, dst: D) -> Result<Connection>
where

View File

@ -131,7 +131,7 @@ impl ColumnId {
/// "sequencer_number" in the `write_buffer` which currently means
/// "kafka partition".
///
/// https://github.com/influxdata/influxdb_iox/issues/4237
/// <https://github.com/influxdata/influxdb_iox/issues/4237>
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
pub struct SequencerId(i16);
@ -798,7 +798,7 @@ pub struct ProcessedTombstone {
pub parquet_file_id: ParquetFileId,
}
/// Request received from the query service for data the ingester has
/// Request from the querier service to the ingester service
#[derive(Debug, PartialEq)]
pub struct IngesterQueryRequest {
/// namespace to search

View File

@ -365,7 +365,8 @@ pub async fn command(config: Config) -> Result<()> {
)
.await?;
info!("starting querier");
let ingester_address = format!("http://{}", ingester_run_config.grpc_bind_address);
info!(%ingester_address, "starting querier");
let querier = create_querier_server_type(
&common_state,
metrics,
@ -373,6 +374,7 @@ pub async fn command(config: Config) -> Result<()> {
object_store,
time_provider,
exec,
ingester_address,
)
.await;

View File

@ -59,6 +59,18 @@ pub struct Config {
/// If not specified, defaults to the number of cores on the system
#[clap(long = "--num-query-threads", env = "INFLUXDB_IOX_NUM_QUERY_THREADS")]
pub num_query_threads: Option<usize>,
/// gRPC address for the querier to talk with the
/// ingester. For example "http://127.0.0.1:8083"
///
/// Note this is a workround for
/// <https://github.com/influxdata/influxdb_iox/issues/3996>
#[clap(
long = "--ingester-address",
env = "INFLUXDB_IOX_INGESTER_ADDRESS",
default_value = "http://127.0.0.1:8083"
)]
pub ingester_address: String,
}
pub async fn command(config: Config) -> Result<(), Error> {
@ -80,6 +92,7 @@ pub async fn command(config: Config) -> Result<(), Error> {
let num_threads = config.num_query_threads.unwrap_or_else(num_cpus::get);
info!(%num_threads, "using specified number of threads per thread pool");
info!(ingester_address=%config.ingester_address, "using ingester address");
let exec = Arc::new(Executor::new(num_threads));
let server_type = create_querier_server_type(
@ -89,6 +102,7 @@ pub async fn command(config: Config) -> Result<(), Error> {
object_store,
time_provider,
exec,
config.ingester_address,
)
.await;

View File

@ -1,8 +1,8 @@
use arrow_util::assert_batches_sorted_eq;
use http::StatusCode;
use test_helpers_end_to_end_ng::{
get_write_token, maybe_skip_integration, query_when_readable, rand_name, write_to_router,
ServerFixture, TestConfig,
get_write_token, maybe_skip_integration, rand_name, run_query, wait_for_persisted,
write_to_router, ServerFixture, TestConfig,
};
#[tokio::test]
@ -25,16 +25,16 @@ async fn smoke() {
let response = write_to_router(lp, org, bucket, all_in_one.server().router_http_base()).await;
// wait for data to be persisted to parquet
assert_eq!(response.status(), StatusCode::NO_CONTENT);
let write_token = get_write_token(&response);
wait_for_persisted(write_token, all_in_one.server().ingester_grpc_connection()).await;
// run query in a loop until the data becomes available
// run query
let sql = format!("select * from {}", table_name);
let batches = query_when_readable(
let batches = run_query(
sql,
namespace,
write_token,
all_in_one.server().ingester_grpc_connection(),
all_in_one.server().querier_grpc_connection(),
)
.await;

View File

@ -10,7 +10,7 @@ async fn querier_namespace_client() {
let router2_config = TestConfig::new_router2(&database_url);
let ingester_config = TestConfig::new_ingester(&router2_config).with_fast_parquet_generation();
let querier_config = TestConfig::new_querier(&router2_config);
let querier_config = TestConfig::new_querier(&ingester_config);
// Set up the cluster ====================================
let cluster = MiniCluster::new()

View File

@ -1,9 +1,67 @@
use arrow_util::assert_batches_sorted_eq;
use http::StatusCode;
use test_helpers_end_to_end_ng::{
get_write_token, maybe_skip_integration, query_when_readable, MiniCluster, TestConfig,
get_write_token, maybe_skip_integration, run_query, wait_for_persisted, wait_for_readable,
MiniCluster, TestConfig,
};
#[tokio::test]
async fn basic_ingester() {
let database_url = maybe_skip_integration!();
let table_name = "the_table";
let router2_config = TestConfig::new_router2(&database_url);
//let ingester_config = TestConfig::new_ingester(&router2_config);
// TEMP: use fast parquet generation until we have completed
// https://github.com/influxdata/influxdb_iox/pull/4255
let ingester_config = TestConfig::new_ingester(&router2_config).with_fast_parquet_generation();
let querier_config = TestConfig::new_querier(&ingester_config);
// Set up the cluster ====================================
let cluster = MiniCluster::new()
.with_router2(router2_config)
.await
.with_ingester(ingester_config)
.await
.with_querier(querier_config)
.await;
// Write some data into the v2 HTTP API ==============
let lp = format!(
"{},tag1=A,tag2=B val=42i 123456\n\
{},tag1=A,tag2=C val=43i 123457",
table_name, table_name
);
let response = cluster.write_to_router(lp).await;
assert_eq!(response.status(), StatusCode::NO_CONTENT);
// Wait for data to be readable
let write_token = get_write_token(&response);
wait_for_readable(&write_token, cluster.ingester().ingester_grpc_connection()).await;
// TODO remove this as part of https://github.com/influxdata/influxdb_iox/pull/4255
wait_for_persisted(write_token, cluster.ingester().ingester_grpc_connection()).await;
// run query
let sql = format!("select * from {}", table_name);
let batches = run_query(
sql,
cluster.namespace(),
cluster.querier().querier_grpc_connection(),
)
.await;
let expected = [
"+------+------+--------------------------------+-----+",
"| tag1 | tag2 | time | val |",
"+------+------+--------------------------------+-----+",
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
"| A | C | 1970-01-01T00:00:00.000123457Z | 43 |",
"+------+------+--------------------------------+-----+",
];
assert_batches_sorted_eq!(&expected, &batches);
}
#[tokio::test]
async fn basic_on_parquet() {
let database_url = maybe_skip_integration!();
@ -11,8 +69,9 @@ async fn basic_on_parquet() {
let table_name = "the_table";
let router2_config = TestConfig::new_router2(&database_url);
// fast parquet
let ingester_config = TestConfig::new_ingester(&router2_config).with_fast_parquet_generation();
let querier_config = TestConfig::new_querier(&router2_config);
let querier_config = TestConfig::new_querier(&ingester_config);
// Set up the cluster ====================================
let cluster = MiniCluster::new()
@ -28,17 +87,15 @@ async fn basic_on_parquet() {
let response = cluster.write_to_router(lp).await;
assert_eq!(response.status(), StatusCode::NO_CONTENT);
// assert that the response contains a write token
// Wait for data to be persisted to parquet
let write_token = get_write_token(&response);
assert!(!write_token.is_empty());
wait_for_persisted(write_token, cluster.ingester().ingester_grpc_connection()).await;
// run query in a loop until the data becomes available
// run query
let sql = format!("select * from {}", table_name);
let batches = query_when_readable(
let batches = run_query(
sql,
cluster.namespace(),
write_token,
cluster.ingester().ingester_grpc_connection(),
cluster.querier().querier_grpc_connection(),
)
.await;

View File

@ -8,7 +8,9 @@ use hyper::{Body, Request, Response};
use iox_catalog::interface::Catalog;
use metric::Registry;
use object_store::DynObjectStore;
use querier::{QuerierDatabase, QuerierHandler, QuerierHandlerImpl, QuerierServer};
use querier::{
create_ingester_connection, QuerierDatabase, QuerierHandler, QuerierHandlerImpl, QuerierServer,
};
use query::exec::Executor;
use time::TimeProvider;
use trace::TraceCollector;
@ -130,6 +132,7 @@ pub async fn create_querier_server_type(
object_store: Arc<DynObjectStore>,
time_provider: Arc<dyn TimeProvider>,
exec: Arc<Executor>,
ingester_address: String,
) -> Arc<dyn ServerType> {
let database = Arc::new(QuerierDatabase::new(
catalog,
@ -137,6 +140,7 @@ pub async fn create_querier_server_type(
object_store,
time_provider,
exec,
create_ingester_connection(ingester_address),
));
let querier_handler = Arc::new(QuerierHandlerImpl::new(Arc::clone(&database)));

View File

@ -55,6 +55,7 @@ impl proto::namespace_service_server::NamespaceService for NamespaceServiceImpl
mod tests {
use generated_types::influxdata::iox::namespace::v1::namespace_service_server::NamespaceService;
use iox_tests::util::TestCatalog;
use querier::create_ingester_connection_for_testing;
use super::*;
@ -68,6 +69,7 @@ mod tests {
catalog.object_store(),
catalog.time_provider(),
catalog.exec(),
create_ingester_connection_for_testing(),
));
let service = NamespaceServiceImpl::new(db);
@ -89,6 +91,7 @@ mod tests {
catalog.object_store(),
catalog.time_provider(),
catalog.exec(),
create_ingester_connection_for_testing(),
));
let service = NamespaceServiceImpl::new(db);

View File

@ -1,8 +1,8 @@
//! Database for the querier that contains all namespaces.
use crate::{
cache::CatalogCache, chunk::ParquetChunkAdapter, namespace::QuerierNamespace,
query_log::QueryLog,
cache::CatalogCache, chunk::ParquetChunkAdapter, ingester::IngesterConnection,
namespace::QuerierNamespace, query_log::QueryLog,
};
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};
@ -52,6 +52,9 @@ pub struct QuerierDatabase {
/// Executor for queries.
exec: Arc<Executor>,
/// Connection to ingester
ingester_connection: Arc<dyn IngesterConnection>,
/// Query log.
query_log: Arc<QueryLog>,
}
@ -73,6 +76,7 @@ impl QuerierDatabase {
object_store: Arc<DynObjectStore>,
time_provider: Arc<dyn TimeProvider>,
exec: Arc<Executor>,
ingester_connection: Arc<dyn IngesterConnection>,
) -> Self {
let catalog_cache = Arc::new(CatalogCache::new(
Arc::clone(&catalog),
@ -96,6 +100,7 @@ impl QuerierDatabase {
object_store,
time_provider,
exec,
ingester_connection,
query_log,
}
}
@ -114,6 +119,7 @@ impl QuerierDatabase {
schema,
name,
Arc::clone(&self.exec),
Arc::clone(&self.ingester_connection),
Arc::clone(&self.query_log),
)))
}
@ -134,6 +140,8 @@ impl QuerierDatabase {
mod tests {
use iox_tests::util::TestCatalog;
use crate::create_ingester_connection_for_testing;
use super::*;
#[tokio::test]
@ -146,6 +154,7 @@ mod tests {
catalog.object_store(),
catalog.time_provider(),
catalog.exec(),
create_ingester_connection_for_testing(),
);
catalog.create_namespace("ns1").await;

View File

@ -115,6 +115,8 @@ mod tests {
use query::exec::Executor;
use time::{MockProvider, Time};
use crate::create_ingester_connection_for_testing;
use super::*;
#[tokio::test]
@ -151,6 +153,7 @@ mod tests {
object_store,
time_provider,
exec,
create_ingester_connection_for_testing(),
));
let querier = QuerierHandlerImpl::new(database);

206
querier/src/ingester/mod.rs Normal file
View File

@ -0,0 +1,206 @@
use std::sync::Arc;
use async_trait::async_trait;
use client_util::connection;
use observability_deps::tracing::debug;
use predicate::Predicate;
use query::{QueryChunk, QueryChunkMeta};
use schema::Schema;
use snafu::{ResultExt, Snafu};
use crate::{QuerierFlightClient, QuerierFlightError};
use self::test_util::MockIngesterConnection;
mod test_util;
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
pub enum Error {
#[snafu(display("Failed to select columns: {}", source))]
SelectColumns { source: schema::Error },
#[snafu(display("Failed to connect to ingester '{}': {}", ingester_address, source))]
Connecting {
ingester_address: String,
source: connection::Error,
},
#[snafu(display("Failed ingester handshake '{}': {}", ingester_address, source))]
Handshake {
ingester_address: String,
source: QuerierFlightError,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Create a new connection given an `ingester_address` such as
/// "http://127.0.0.1:8083"
pub fn create_ingester_connection(ingester_address: String) -> Arc<dyn IngesterConnection> {
Arc::new(IngesterConnectionImpl::new(ingester_address))
}
/// Create a new ingester suitable for testing
pub fn create_ingester_connection_for_testing() -> Arc<dyn IngesterConnection> {
Arc::new(MockIngesterConnection::new())
}
/// Handles communicating with the ingester(s) to retrieve
/// data that is not yet persisted
#[async_trait]
pub trait IngesterConnection: std::fmt::Debug + Send + Sync + 'static {
/// Returns all partitions ingester(s) know about for the specified table.
async fn partitions(
&self,
namespace_name: Arc<str>,
table_name: Arc<str>,
columns: Vec<String>,
predicate: &Predicate,
expected_schema: &Schema,
) -> Result<Vec<Arc<IngesterPartition>>>;
}
// IngesterConnection that communicates with an ingester.
#[allow(missing_copy_implementations)]
#[derive(Debug)]
pub(crate) struct IngesterConnectionImpl {
ingester_address: String,
}
impl IngesterConnectionImpl {
/// Create a new connection given an `ingester_address` such as
/// "http://127.0.0.1:8083"
pub fn new(ingester_address: String) -> Self {
Self { ingester_address }
}
}
#[async_trait]
impl IngesterConnection for IngesterConnectionImpl {
/// Retrieve chunks from the ingester for the particular table and
/// predicate
async fn partitions(
&self,
_namespace_name: Arc<str>,
table_name: Arc<str>,
_columns: Vec<String>,
_predicate: &Predicate,
_expected_schema: &Schema,
) -> Result<Vec<Arc<IngesterPartition>>> {
let ingester_address = &self.ingester_address;
debug!(%ingester_address, %table_name, "Connecting to ingester");
// TODO maybe cache this connection
let connection = connection::Builder::new()
.build(&self.ingester_address)
.await
.context(ConnectingSnafu { ingester_address })?;
let mut client = QuerierFlightClient::new(connection);
// make contact with the ingester
client
.handshake()
.await
.context(HandshakeSnafu { ingester_address })?;
// TODO Coming Soon: create the actual IngesterPartition and return them here.
Ok(vec![])
}
}
/// A wrapper around the unpersisted data in a partition returned by
/// the ingester that (will) implement the `QueryChunk` interface
///
/// Given the catalog heirarchy:
///
/// ```text
/// (Catalog) Sequencer -> (Catalog) Table --> (Catalog) Partition
/// ```
///
/// An IngesterPartition contains the unpersisted data for a catalog
/// partition from a sequencer. Thus, there can be more than one
/// IngesterPartition for each table the ingester knows about.
#[allow(missing_copy_implementations)]
#[derive(Debug, Clone)]
pub struct IngesterPartition {}
impl QueryChunkMeta for IngesterPartition {
fn summary(&self) -> Option<&data_types2::TableSummary> {
todo!()
}
fn schema(&self) -> Arc<Schema> {
todo!()
}
fn sort_key(&self) -> Option<&schema::sort::SortKey> {
todo!()
}
fn delete_predicates(&self) -> &[Arc<data_types2::DeletePredicate>] {
todo!()
}
}
impl QueryChunk for IngesterPartition {
fn id(&self) -> data_types2::ChunkId {
todo!()
}
fn addr(&self) -> data_types2::ChunkAddr {
todo!()
}
fn table_name(&self) -> &str {
todo!()
}
fn may_contain_pk_duplicates(&self) -> bool {
todo!()
}
fn apply_predicate_to_metadata(
&self,
_predicate: &Predicate,
) -> Result<predicate::PredicateMatch, query::QueryChunkError> {
todo!()
}
fn column_names(
&self,
_ctx: query::exec::IOxSessionContext,
_predicate: &Predicate,
_columns: schema::selection::Selection<'_>,
) -> Result<Option<query::exec::stringset::StringSet>, query::QueryChunkError> {
todo!()
}
fn column_values(
&self,
_ctx: query::exec::IOxSessionContext,
_column_name: &str,
_predicate: &Predicate,
) -> Result<Option<query::exec::stringset::StringSet>, query::QueryChunkError> {
todo!()
}
fn read_filter(
&self,
_ctx: query::exec::IOxSessionContext,
_predicate: &Predicate,
_selection: schema::selection::Selection<'_>,
) -> Result<datafusion::physical_plan::SendableRecordBatchStream, query::QueryChunkError> {
todo!()
}
fn chunk_type(&self) -> &str {
todo!()
}
fn order(&self) -> data_types2::ChunkOrder {
todo!()
}
}

View File

@ -0,0 +1,29 @@
use std::sync::Arc;
use async_trait::async_trait;
use super::IngesterConnection;
/// IngesterConnection for testing
#[derive(Debug)]
pub(crate) struct MockIngesterConnection {}
impl MockIngesterConnection {
pub fn new() -> Self {
Self {}
}
}
#[async_trait]
impl IngesterConnection for MockIngesterConnection {
async fn partitions(
&self,
_namespace_name: Arc<str>,
_table_name: Arc<str>,
_columns: Vec<String>,
_predicate: &predicate::Predicate,
_expected_schema: &schema::Schema,
) -> super::Result<Vec<Arc<super::IngesterPartition>>> {
Ok(vec![])
}
}

View File

@ -17,6 +17,7 @@ mod database;
/// Flight client to the ingester to request in-memory data.
mod flight;
mod handler;
mod ingester;
mod namespace;
mod poison;
mod query_log;
@ -26,7 +27,10 @@ mod table;
mod tombstone;
pub use database::QuerierDatabase;
pub use flight::Client as QuerierFlightClient;
pub use flight::{Client as QuerierFlightClient, Error as QuerierFlightError};
pub use handler::{QuerierHandler, QuerierHandlerImpl};
pub use ingester::{
create_ingester_connection, create_ingester_connection_for_testing, IngesterConnection,
};
pub use namespace::QuerierNamespace;
pub use server::QuerierServer;

View File

@ -1,6 +1,7 @@
//! Namespace within the whole database.
use crate::{
cache::CatalogCache, chunk::ParquetChunkAdapter, query_log::QueryLog, table::QuerierTable,
cache::CatalogCache, chunk::ParquetChunkAdapter, ingester::IngesterConnection,
query_log::QueryLog, table::QuerierTable,
};
use backoff::BackoffConfig;
use data_types2::{NamespaceId, NamespaceSchema};
@ -37,6 +38,9 @@ pub struct QuerierNamespace {
/// Executor for queries.
exec: Arc<Executor>,
/// Connection to ingester
ingester_connection: Arc<dyn IngesterConnection>,
/// Query log.
query_log: Arc<QueryLog>,
}
@ -49,25 +53,28 @@ impl QuerierNamespace {
schema: Arc<NamespaceSchema>,
name: Arc<str>,
exec: Arc<Executor>,
ingester_connection: Arc<dyn IngesterConnection>,
query_log: Arc<QueryLog>,
) -> Self {
let tables: HashMap<_, _> = schema
.tables
.iter()
.map(|(name, table_schema)| {
let name = Arc::from(name.clone());
.map(|(table_name, table_schema)| {
let table_name = Arc::from(table_name.clone());
let id = table_schema.id;
let schema = Schema::try_from(table_schema.clone()).expect("cannot build schema");
let table = Arc::new(QuerierTable::new(
Arc::clone(&name),
backoff_config.clone(),
id,
Arc::clone(&name),
Arc::clone(&table_name),
Arc::new(schema),
Arc::clone(&ingester_connection),
Arc::clone(&chunk_adapter),
));
(name, table)
(table_name, table)
})
.collect();
@ -78,11 +85,13 @@ impl QuerierNamespace {
name,
tables: Arc::new(tables),
exec,
ingester_connection,
query_log,
}
}
/// Create new namespace for given schema, for testing.
#[allow(clippy::too_many_arguments)]
pub fn new_testing(
catalog: Arc<dyn Catalog>,
object_store: Arc<DynObjectStore>,
@ -91,6 +100,7 @@ impl QuerierNamespace {
name: Arc<str>,
schema: Arc<NamespaceSchema>,
exec: Arc<Executor>,
ingester_connection: Arc<dyn IngesterConnection>,
) -> Self {
let catalog_cache = Arc::new(CatalogCache::new(catalog, Arc::clone(&time_provider)));
let chunk_adapter = Arc::new(ParquetChunkAdapter::new(
@ -107,6 +117,7 @@ impl QuerierNamespace {
schema,
name,
exec,
ingester_connection,
query_log,
)
}

View File

@ -1,3 +1,5 @@
//! This module contains implementations of [query] interfaces for
//! [QuerierNamespace].
use std::{any::Any, collections::HashMap, sync::Arc};
use async_trait::async_trait;
@ -6,6 +8,7 @@ use datafusion::{
catalog::{catalog::CatalogProvider, schema::SchemaProvider},
datasource::TableProvider,
};
use observability_deps::tracing::trace;
use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate};
use query::{
exec::{ExecutionContextProvider, ExecutorType, IOxSessionContext},
@ -45,11 +48,16 @@ impl QueryDatabase for QuerierNamespace {
Some(table) => table,
None => {
// table gone
trace!(%table_name, "No entry for table");
return Ok(vec![]);
}
};
let mut chunks = table.chunks().await;
let mut chunks = table
.chunks(predicate)
.await
// TODO QuerierNamespace trait needs to be updated to return Result
.expect("Success getting chunks");
// if there is a field restriction on the predicate, only
// chunks with that field should be returned. If the chunk has

View File

@ -3,6 +3,8 @@ use std::sync::Arc;
use iox_catalog::interface::get_schema_by_name;
use iox_tests::util::TestNamespace;
use crate::create_ingester_connection_for_testing;
use super::QuerierNamespace;
/// Create [`QuerierNamespace`] for testing.
@ -22,5 +24,6 @@ pub async fn querier_namespace(ns: &Arc<TestNamespace>) -> QuerierNamespace {
ns.namespace.name.clone().into(),
schema,
ns.catalog.exec(),
create_ingester_connection_for_testing(),
)
}

View File

@ -1,11 +1,17 @@
use std::{collections::HashMap, sync::Arc};
use crate::{
chunk::ParquetChunkAdapter,
ingester::{self, IngesterPartition},
tombstone::QuerierTombstone,
IngesterConnection,
};
use backoff::{Backoff, BackoffConfig};
use data_types2::TableId;
use predicate::Predicate;
use query::{provider::ChunkPruner, QueryChunk};
use schema::Schema;
use crate::{chunk::ParquetChunkAdapter, tombstone::QuerierTombstone};
use snafu::{ResultExt, Snafu};
use self::query_access::QuerierTableChunkPruner;
@ -14,9 +20,21 @@ mod query_access;
#[cfg(test)]
mod test_util;
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
pub enum Error {
#[snafu(display("Error getting partitions from ingester: {}", source))]
GettingIngesterPartitions { source: ingester::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Table representation for the querier.
#[derive(Debug)]
pub struct QuerierTable {
/// Namespace the table is in
namespace_name: Arc<str>,
/// Backoff config for IO operations.
backoff_config: BackoffConfig,
@ -29,6 +47,9 @@ pub struct QuerierTable {
/// Table schema.
schema: Arc<Schema>,
/// Connection to ingester
ingester_connection: Arc<dyn IngesterConnection>,
/// Interface to create chunks for this table.
chunk_adapter: Arc<ParquetChunkAdapter>,
}
@ -36,17 +57,21 @@ pub struct QuerierTable {
impl QuerierTable {
/// Create new table.
pub fn new(
namespace_name: Arc<str>,
backoff_config: BackoffConfig,
id: TableId,
name: Arc<str>,
schema: Arc<Schema>,
ingester_connection: Arc<dyn IngesterConnection>,
chunk_adapter: Arc<ParquetChunkAdapter>,
) -> Self {
Self {
namespace_name,
backoff_config,
name,
id,
schema,
ingester_connection,
chunk_adapter,
}
}
@ -69,7 +94,7 @@ impl QuerierTable {
/// Query all chunks within this table.
///
/// This currently contains all parquet files linked to their unprocessed tombstones.
pub async fn chunks(&self) -> Vec<Arc<dyn QueryChunk>> {
pub async fn chunks(&self, predicate: &Predicate) -> Result<Vec<Arc<dyn QueryChunk>>> {
// get parquet files and tombstones in a single catalog transaction
// TODO: figure out some form of caching
let (parquet_files, tombstones) = Backoff::new(&self.backoff_config)
@ -93,6 +118,12 @@ impl QuerierTable {
.await
.expect("retry forever");
// TODO do in parallel with fetching parquet files
let ingester_partitions = self.ingester_partitions(predicate).await?;
// TODO: Validate that the cache contents is consistent with the
// parquet files we know about
// convert parquet files and tombstones to nicer objects
let mut chunks = Vec::with_capacity(parquet_files.len());
for parquet_file in parquet_files {
@ -169,24 +200,58 @@ impl QuerierTable {
chunks2.push(Arc::new(chunk) as Arc<dyn QueryChunk>);
}
chunks2
// Add ingester chunks to the overall chunk list. What about tombstones??
chunks2.extend(ingester_partitions.into_iter().map(|c| c as _));
Ok(chunks2)
}
/// Get a chunk pruner that can be used to prune chunks retrieved via [`chunks`](Self::chunks)
pub fn chunk_pruner(&self) -> Arc<dyn ChunkPruner> {
Arc::new(QuerierTableChunkPruner {})
}
async fn ingester_partitions(
&self,
predicate: &Predicate,
) -> Result<Vec<Arc<IngesterPartition>>> {
// For now, ask for *all* columns in the table from the ingester (need
// at least all pk (time, tag) columns for
// deduplication.
//
// As a future optimization, might be able to fetch only
// fields that are needed in query
let columns: Vec<String> = self
.schema
.iter()
.map(|(_, f)| f.name().to_string())
.collect();
// get any chunks from the ingster
self.ingester_connection
.partitions(
Arc::clone(&self.namespace_name),
Arc::clone(&self.name),
columns,
predicate,
self.schema.as_ref(),
)
.await
.context(GettingIngesterPartitionsSnafu)
}
}
#[cfg(test)]
mod tests {
use data_types2::{ChunkId, ColumnType};
use iox_tests::util::{now, TestCatalog};
use predicate::Predicate;
use crate::table::test_util::querier_table;
#[tokio::test]
async fn test_chunks() {
let pred = Predicate::default();
let catalog = TestCatalog::new();
let ns = catalog.create_namespace("ns").await;
@ -216,7 +281,7 @@ mod tests {
let querier_table = querier_table(&catalog, &table1).await;
// no parquet files yet
assert!(querier_table.chunks().await.is_empty());
assert!(querier_table.chunks(&pred).await.unwrap().is_empty());
let file111 = partition11
.create_parquet_file_with_min_max(
@ -299,7 +364,7 @@ mod tests {
// this contains all files except for:
// - file111: marked for delete
// - file221: wrong table
let mut chunks = querier_table.chunks().await;
let mut chunks = querier_table.chunks(&pred).await.unwrap();
chunks.sort_by_key(|c| c.id());
assert_eq!(chunks.len(), 5);

View File

@ -8,7 +8,7 @@ use datafusion::{
logical_plan::Expr,
physical_plan::ExecutionPlan,
};
use predicate::Predicate;
use predicate::{Predicate, PredicateBuilder};
use query::{
provider::{ChunkPruner, ProviderBuilder},
pruning::{prune_chunks, PruningObserver},
@ -43,7 +43,17 @@ impl TableProvider for QuerierTable {
let mut builder = ProviderBuilder::new(self.name(), Arc::clone(self.schema()));
builder = builder.add_pruner(self.chunk_pruner());
for chunk in self.chunks().await {
let predicate = filters
.iter()
.fold(PredicateBuilder::new(), |b, expr| b.add_expr(expr.clone()))
.build();
let chunks = self
.chunks(&predicate)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
for chunk in chunks {
builder = builder.add_chunk(chunk);
}

View File

@ -5,7 +5,9 @@ use iox_catalog::interface::get_schema_by_name;
use iox_tests::util::{TestCatalog, TestTable};
use schema::Schema;
use crate::{cache::CatalogCache, chunk::ParquetChunkAdapter};
use crate::{
cache::CatalogCache, chunk::ParquetChunkAdapter, create_ingester_connection_for_testing,
};
use super::QuerierTable;
@ -28,11 +30,15 @@ pub async fn querier_table(catalog: &Arc<TestCatalog>, table: &Arc<TestTable>) -
let schema = catalog_schema.tables.remove(&table.table.name).unwrap();
let schema = Arc::new(Schema::try_from(schema).unwrap());
let namespace_name = Arc::from(table.namespace.namespace.name.as_str());
QuerierTable::new(
namespace_name,
BackoffConfig::default(),
table.table.id,
table.table.name.clone().into(),
schema,
create_ingester_connection_for_testing(),
chunk_adapter,
)
}

View File

@ -10,7 +10,7 @@ use db::{
use iox_catalog::interface::get_schema_by_name;
use iox_tests::util::{TestCatalog, TestNamespace};
use itertools::Itertools;
use querier::QuerierNamespace;
use querier::{create_ingester_connection_for_testing, QuerierNamespace};
use query::QueryChunk;
use schema::merge::SchemaMerger;
use schema::selection::Selection;
@ -1256,5 +1256,6 @@ async fn make_querier_namespace(ns: Arc<TestNamespace>) -> Arc<QuerierNamespace>
ns.namespace.name.clone().into(),
schema,
ns.catalog.exec(),
create_ingester_connection_for_testing(),
))
}

View File

@ -58,8 +58,9 @@ pub async fn wait_for_token<F>(
F: Fn(&GetWriteInfoResponse) -> bool,
{
let write_token = write_token.into();
assert!(!write_token.is_empty());
println!("Waiting for Write Token {}", write_token);
println!(" write token: {}", write_token);
let retry_duration = Duration::from_secs(MAX_QUERY_RETRY_TIME_SEC);
let mut write_info_client = influxdb_iox_client::write_info::Client::new(ingester_connection);
@ -87,7 +88,7 @@ pub async fn wait_for_token<F>(
/// Waits for the specified write token to be readable
pub async fn wait_for_readable(write_token: impl Into<String>, ingester_connection: Connection) {
println!("Waiting for Write Token to be readable");
println!("Waiting for write token to be readable");
wait_for_token(write_token, ingester_connection, |res| {
if res.readable {
@ -102,7 +103,7 @@ pub async fn wait_for_readable(write_token: impl Into<String>, ingester_connecti
/// Waits for the write token to be persisted
pub async fn wait_for_persisted(write_token: impl Into<String>, ingester_connection: Connection) {
println!("Waiting for Write Token to be persisted");
println!("Waiting for write token to be persisted");
wait_for_token(write_token, ingester_connection, |res| {
if res.persisted {
@ -116,26 +117,14 @@ pub async fn wait_for_persisted(write_token: impl Into<String>, ingester_connect
}
/// Runs a query using the flight API on the specified connection
/// until responses are produced.
///
/// (Will) eventually wait until data from the specified write token
/// is readable, but currently waits for the data to be persisted (as
/// the querier doesn't know how to ask the ingester yet)
///
/// The retry loop is used to wait for writes to become visible
pub async fn query_when_readable(
pub async fn run_query(
sql: impl Into<String>,
namespace: impl Into<String>,
write_token: impl Into<String>,
ingester_connection: Connection,
querier_connection: Connection,
) -> Vec<RecordBatch> {
let namespace = namespace.into();
let sql = sql.into();
// TODO: this should be "wait_for_readable" once the querier can talk to ingester
wait_for_persisted(write_token, ingester_connection).await;
let mut client = influxdb_iox_client::flight::Client::new(querier_connection);
// This does nothing except test the client handshake implementation.

View File

@ -3,6 +3,8 @@ use std::{collections::HashMap, sync::Arc};
use http::{header::HeaderName, HeaderValue};
use tempfile::TempDir;
use crate::addrs::BindAddresses;
use super::ServerType;
/// Options for creating test servers (`influxdb_iox` processes)
@ -25,6 +27,9 @@ pub struct TestConfig {
/// Object store directory, if needed.
object_store_dir: Option<Arc<TempDir>>,
/// Which ports this server should use
addrs: Arc<BindAddresses>,
}
impl TestConfig {
@ -38,6 +43,7 @@ impl TestConfig {
dsn: dsn.into(),
write_buffer_dir: None,
object_store_dir: None,
addrs: Arc::new(BindAddresses::default()),
}
}
@ -57,10 +63,21 @@ impl TestConfig {
.with_default_ingester_options()
}
/// Create a minimal querier configuration, using the dsn and
/// object store from other
pub fn new_querier(other: &TestConfig) -> Self {
Self::new(ServerType::Querier, other.dsn()).with_existing_object_store(other)
/// Create a minimal querier configuration from the specified
/// ingester configuration, using the same dsn and object store,
/// and pointing at the specified ingester
pub fn new_querier(ingester_config: &TestConfig) -> Self {
assert_eq!(ingester_config.server_type(), ServerType::Ingester);
Self::new(ServerType::Querier, ingester_config.dsn())
.with_existing_object_store(ingester_config)
// Configure to talk with the ingester
.with_ingester_address(
ingester_config
.addrs()
.ingester_grpc_api()
.client_base()
.as_ref(),
)
}
/// Create a minimal all in one configuration
@ -91,6 +108,11 @@ impl TestConfig {
.with_env("INFLUXDB_IOX_WRITE_BUFFER_PARTITION_RANGE_END", "0")
}
/// Adds the ingester address
fn with_ingester_address(self, ingester_address: &str) -> Self {
self.with_env("INFLUXDB_IOX_INGESTER_ADDRESS", ingester_address)
}
/// add a name=value environment variable when starting the server
///
/// Should not be called directly, but instead all mapping to
@ -184,4 +206,10 @@ impl TestConfig {
pub fn client_headers(&self) -> &[(HeaderName, HeaderValue)] {
self.client_headers.as_ref()
}
/// Get a reference to the test config's addrs.
#[must_use]
pub fn addrs(&self) -> &BindAddresses {
&self.addrs
}
}

View File

@ -81,9 +81,6 @@ pub struct TestServer {
/// Handle to the server process being controlled
server_process: Arc<Mutex<Process>>,
/// Which ports this server should use
addrs: BindAddresses,
/// Configuration values for starting the test server
test_config: TestConfig,
@ -105,17 +102,13 @@ struct Process {
impl TestServer {
async fn new(test_config: TestConfig) -> Self {
let addrs = BindAddresses::default();
let ready = Mutex::new(ServerState::Started);
let server_process = Arc::new(Mutex::new(
Self::create_server_process(&addrs, &test_config).await,
));
let server_process = Arc::new(Mutex::new(Self::create_server_process(&test_config).await));
Self {
ready,
server_process,
addrs,
test_config,
router_grpc_connection: None,
ingester_grpc_connection: None,
@ -149,12 +142,12 @@ impl TestServer {
/// Return the http base URL for the router HTTP API
pub fn router_http_base(&self) -> Arc<str> {
self.addrs.router_http_api().client_base()
self.addrs().router_http_api().client_base()
}
/// Return the http base URL for the router gRPC API
pub fn router_grpc_base(&self) -> Arc<str> {
self.addrs.router_grpc_api().client_base()
self.addrs().router_grpc_api().client_base()
}
/// Create a connection channel to the specified gRPC endpoint
@ -183,7 +176,7 @@ impl TestServer {
self.router_grpc_connection =
match server_type {
ServerType::AllInOne | ServerType::Router2 => {
let client_base = self.addrs.router_grpc_api().client_base();
let client_base = self.addrs().router_grpc_api().client_base();
Some(self.grpc_channel(client_base.as_ref()).await.map_err(|e| {
format!("Can not connect to router at {}: {}", client_base, e)
})?)
@ -193,7 +186,7 @@ impl TestServer {
self.ingester_grpc_connection = match server_type {
ServerType::AllInOne | ServerType::Ingester => {
let client_base = self.addrs.ingester_grpc_api().client_base();
let client_base = self.addrs().ingester_grpc_api().client_base();
Some(self.grpc_channel(client_base.as_ref()).await.map_err(|e| {
format!("Can not connect to ingester at {}: {}", client_base, e)
})?)
@ -204,7 +197,7 @@ impl TestServer {
self.querier_grpc_connection =
match server_type {
ServerType::AllInOne | ServerType::Querier => {
let client_base = self.addrs.querier_grpc_api().client_base();
let client_base = self.addrs().querier_grpc_api().client_base();
Some(self.grpc_channel(client_base.as_ref()).await.map_err(|e| {
format!("Can not connect to querier at {}: {}", client_base, e)
})?)
@ -217,7 +210,7 @@ impl TestServer {
/// Returns the addresses to which the server has been bound
fn addrs(&self) -> &BindAddresses {
&self.addrs
self.test_config.addrs()
}
/// Restarts the tests server process, but does not reconnect clients
@ -226,11 +219,11 @@ impl TestServer {
let mut server_process = self.server_process.lock().await;
server_process.child.kill().unwrap();
server_process.child.wait().unwrap();
*server_process = Self::create_server_process(&self.addrs, &self.test_config).await;
*server_process = Self::create_server_process(&self.test_config).await;
*ready_guard = ServerState::Started;
}
async fn create_server_process(addrs: &BindAddresses, test_config: &TestConfig) -> Process {
async fn create_server_process(test_config: &TestConfig) -> Process {
// Create a new file each time and keep it around to aid debugging
let (log_file, log_path) = NamedTempFile::new()
.expect("opening log file")
@ -266,7 +259,7 @@ impl TestServer {
.env("LOG_FILTER", log_filter)
.env("INFLUXDB_IOX_CATALOG_DSN", &dsn)
// add http/grpc address information
.add_addr_env(server_type, addrs)
.add_addr_env(server_type, test_config.addrs())
.envs(test_config.env())
// redirect output to log file
.stdout(stdout_log_file)
@ -467,7 +460,7 @@ impl std::fmt::Display for TestServer {
f,
"TestServer {:?} ({})",
self.test_config.server_type(),
self.addrs
self.addrs()
)
}
}

View File

@ -85,7 +85,7 @@ impl WriteSummary {
)
}
/// Return a WriteSummary from the "token" (created with [to_token]), or error if not possible
/// Return a WriteSummary from the "token" (created with [Self::to_token]), or error if not possible
pub fn try_from_token(token: &str) -> Result<Self, String> {
let data = base64::decode(token)
.map_err(|e| format!("Invalid write token, invalid base64: {}", e))?;