Merge branch 'main' into dom/req-mode-parsing

pull/24376/head
Dom 2023-04-11 11:00:22 +01:00 committed by GitHub
commit 78f2b3a36a
27 changed files with 715 additions and 57 deletions


@ -5,10 +5,13 @@ protocol = "sparse"
[target.x86_64-unknown-linux-gnu]
rustflags = [
# Faster linker.
"-C", "link-arg=-fuse-ld=lld",
# Fix `perf` as suggested by https://github.com/flamegraph-rs/flamegraph/blob/2d19a162df4066f37d58d5471634f0bd9f0f4a62/README.md?plain=1#L18
# Also see https://bugs.chromium.org/p/chromium/issues/detail?id=919499#c16
"-C", "link-arg=-Wl,--no-rosegment",
# Enable all features supported by CPUs more recent than haswell (2013)
"-C", "target-cpu=haswell"
"-C", "target-cpu=haswell",
# Enable frame pointers: profiling and debugging are a nightmare without them, and omitting them is generally not considered a performance advantage on modern x86_64 CPUs.
"-C", "force-frame-pointers=yes",
]

Cargo.lock generated

@ -1036,7 +1036,6 @@ dependencies = [
"rand",
"schema",
"sharder",
"snafu",
"test_helpers",
"tokio",
"tokio-util",
@ -1498,7 +1497,7 @@ dependencies = [
"pin-project-lite",
"rand",
"smallvec",
"sqlparser",
"sqlparser 0.32.0",
"tempfile",
"tokio",
"tokio-stream",
@ -1520,7 +1519,7 @@ dependencies = [
"num_cpus",
"object_store",
"parquet",
"sqlparser",
"sqlparser 0.32.0",
]
[[package]]
@ -1548,7 +1547,7 @@ dependencies = [
"ahash 0.8.3",
"arrow",
"datafusion-common",
"sqlparser",
"sqlparser 0.32.0",
]
[[package]]
@ -1633,7 +1632,7 @@ dependencies = [
"datafusion-common",
"datafusion-expr",
"log",
"sqlparser",
"sqlparser 0.32.0",
]
[[package]]
@ -2489,7 +2488,6 @@ version = "0.1.0"
dependencies = [
"assert_matches",
"chrono",
"clap_blocks",
"client_util",
"data_types",
"futures",
@ -2498,7 +2496,6 @@ dependencies = [
"iox_catalog",
"metric",
"object_store",
"observability_deps",
"parking_lot 0.12.1",
"schema",
"serde",
@ -2507,7 +2504,6 @@ dependencies = [
"tokio",
"tokio-stream",
"tonic",
"trogging",
"workspace-hack",
]
@ -2594,6 +2590,7 @@ dependencies = [
"arrow-flight",
"arrow_util",
"assert_cmd",
"assert_matches",
"async-trait",
"authz",
"backtrace",
@ -2726,7 +2723,7 @@ version = "0.1.0"
dependencies = [
"generated_types",
"snafu",
"sqlparser",
"sqlparser 0.33.0",
"workspace-hack",
]
@ -3064,10 +3061,8 @@ version = "0.1.0"
dependencies = [
"async-trait",
"bytes",
"chrono",
"clap 4.2.1",
"clap_blocks",
"data_types",
"flate2",
"futures",
"generated_types",
@ -3203,12 +3198,9 @@ dependencies = [
"ioxd_common",
"metric",
"object_store",
"parquet_file",
"querier",
"router",
"service_grpc_flight",
"service_grpc_influxrpc",
"sharder",
"thiserror",
"tokio",
"tokio-util",
@ -3267,14 +3259,14 @@ checksum = "12b6ee2129af8d4fb011108c73d99a1b83a85977f23b82460c0ae2e25bb4b57f"
[[package]]
name = "is-terminal"
version = "0.4.6"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "256017f749ab3117e93acb91063009e1f1bb56d03965b14c2c8df4eb02c524d8"
checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f"
dependencies = [
"hermit-abi 0.3.1",
"io-lifetimes",
"rustix 0.37.11",
"windows-sys 0.45.0",
"windows-sys 0.48.0",
]
[[package]]
@ -4336,7 +4328,7 @@ dependencies = [
"query_functions",
"schema",
"snafu",
"sqlparser",
"sqlparser 0.33.0",
"test_helpers",
"workspace-hack",
]
@ -5432,6 +5424,15 @@ dependencies = [
"sqlparser_derive",
]
[[package]]
name = "sqlparser"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "355dc4d4b6207ca8a3434fc587db0a8016130a574dbcdbfb93d7f7b5bc5b211a"
dependencies = [
"log",
]
[[package]]
name = "sqlparser_derive"
version = "0.1.1"


@ -24,7 +24,6 @@ predicate = { path = "../predicate" }
rand = "0.8.3"
schema = { path = "../schema" }
sharder = { path = "../sharder" }
snafu = "0.7"
tokio = { version = "1", features = ["macros", "rt", "sync"] }
tokio-util = { version = "0.7.7" }
tracker = { path = "../tracker" }


@ -355,6 +355,9 @@ async fn execute_plan(
job_semaphore: Arc<InstrumentedAsyncSemaphore>,
) -> Result<Vec<ParquetFileParams>, DynError> {
let create = {
// Adjust concurrency based on the column count in the partition.
let permits = compute_permits(job_semaphore.total_permits(), partition_info.column_count());
// draw semaphore BEFORE creating the DataFusion plan and drop it directly AFTER finishing the
// DataFusion computation (but BEFORE doing any additional external IO).
//
@ -362,12 +365,12 @@ async fn execute_plan(
// DataFusion ever starts to pre-allocate buffers during the physical planning. To the best of our
// knowledge, this is currently (2023-01-25) not the case but if this ever changes, then we are prepared.
let permit = job_semaphore
.acquire(None)
.acquire_many(permits, None)
.await
.expect("semaphore not closed");
info!(
partition_id = partition_info.partition_id.get(),
"job semaphore acquired",
permits, "job semaphore acquired",
);
let plan = components
@ -473,3 +476,40 @@ async fn update_catalog(
(created_file_params, upgraded_files)
}
// SINGLE_THREADED_COLUMN_COUNT is the column count at or above which a partition is compacted single-threaded.
const SINGLE_THREADED_COLUMN_COUNT: usize = 80;
// Determine how many permits must be acquired from the concurrency limiter semaphore
// based on the column count of this job and the total permits (concurrency).
fn compute_permits(
total_permits: usize, // total number of permits (max concurrency)
columns: usize, // column count for this job
) -> u32 {
if columns >= SINGLE_THREADED_COLUMN_COUNT {
// this job requires all permits, forcing it to run by itself.
return total_permits as u32;
}
// compute the share (linearly scaled) of total permits this job requires
let share = columns as f64 / SINGLE_THREADED_COLUMN_COUNT as f64;
let permits = total_permits as f64 * share;
if permits < 1.0 {
return 1;
}
permits as u32
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn concurrency_limits() {
assert_eq!(compute_permits(10, 10000), 10); // huge column count takes exactly all permits (not more than the total)
assert_eq!(compute_permits(10, 1), 1); // 1 column still takes 1 permit
assert_eq!(compute_permits(10, SINGLE_THREADED_COLUMN_COUNT / 2), 5); // 1/2 the max column count takes half the total permits
}
}
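A few further values implied by the same scaling logic, sketched here as hedged examples (not part of the committed tests):

assert_eq!(compute_permits(10, 0), 1); // the share falls below 1.0, so the result is floored at one permit
assert_eq!(compute_permits(10, 79), 9); // just under the threshold: 10 * 79/80 = 9.875 truncates to 9
assert_eq!(compute_permits(10, 80), 10); // at SINGLE_THREADED_COLUMN_COUNT the job takes every permit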


@ -6,7 +6,6 @@ edition.workspace = true
license.workspace = true
[dependencies]
clap_blocks = { path = "../clap_blocks" }
chrono = { version = "0.4", default-features = false }
data_types = { path = "../data_types" }
futures = "0.3"
@ -14,14 +13,12 @@ generated_types = { path = "../generated_types" }
influxdb_iox_client = { path = "../influxdb_iox_client" }
iox_catalog = { path = "../iox_catalog" }
object_store = { version = "0.5.6", features = ["aws"] }
observability_deps = { path = "../observability_deps" }
schema = { path = "../schema" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.95"
thiserror = "1.0.40"
tokio = { version = "1.27" }
tonic = { version = "0.8" }
trogging = { path = "../trogging", features = ["clap"] }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]


@ -82,6 +82,7 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" }
# In alphabetical order
arrow_util = { path = "../arrow_util" }
assert_cmd = "2.0.10"
assert_matches = "1.5"
async-trait = "0.1"
predicate = { path = "../predicate" }
predicates = "3.0.2"


@ -24,7 +24,7 @@ async fn smoke() {
// Write some data into the v2 HTTP API ==============
let lp = format!("{table_name},tag1=A,tag2=B val=42i 123456");
let response = write_to_router(lp, org, bucket, all_in_one.router_http_base()).await;
let response = write_to_router(lp, org, bucket, all_in_one.router_http_base(), None).await;
assert_eq!(
response.status(),
StatusCode::NO_CONTENT,
@ -33,7 +33,7 @@ async fn smoke() {
// run query
let sql = format!("select * from {table_name}");
let batches = run_sql(sql, namespace, all_in_one.querier_grpc_connection()).await;
let batches = run_sql(sql, namespace, all_in_one.querier_grpc_connection(), None).await;
let expected = [
"+------+------+--------------------------------+-----+",
@ -69,7 +69,7 @@ async fn ephemeral_mode() {
.to_string();
let lp = format!("{table_name},tag1=A,tag2=B val=42i {now}");
let response = write_to_router(lp, org, bucket, all_in_one.router_http_base()).await;
let response = write_to_router(lp, org, bucket, all_in_one.router_http_base(), None).await;
assert_eq!(
response.status(),
StatusCode::NO_CONTENT,
@ -79,7 +79,7 @@ async fn ephemeral_mode() {
// run query
// do not select time because it changes every time
let sql = format!("select tag1, tag2, val from {table_name}");
let batches = run_sql(sql, namespace, all_in_one.querier_grpc_connection()).await;
let batches = run_sql(sql, namespace, all_in_one.querier_grpc_connection(), None).await;
let expected = [
"+------+------+-----+",


@ -7,6 +7,7 @@ use arrow::{
};
use arrow_flight::{
decode::FlightRecordBatchStream,
error::FlightError,
sql::{
Any, CommandGetCatalogs, CommandGetDbSchemas, CommandGetSqlInfo, CommandGetTableTypes,
CommandGetTables, CommandStatementQuery, ProstMessageExt, SqlInfo,
@ -15,13 +16,16 @@ use arrow_flight::{
};
use arrow_util::test_util::batches_to_sorted_lines;
use assert_cmd::Command;
use assert_matches::assert_matches;
use bytes::Bytes;
use datafusion::common::assert_contains;
use futures::{FutureExt, TryStreamExt};
use influxdb_iox_client::flightsql::FlightSqlClient;
use predicates::prelude::*;
use prost::Message;
use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState};
use test_helpers_end_to_end::{
maybe_skip_integration, Authorizer, MiniCluster, Step, StepTest, StepTestState,
};
#[tokio::test]
async fn flightsql_adhoc_query() {
@ -1213,6 +1217,100 @@ fn strip_metadata(schema: &Schema) -> SchemaRef {
Arc::new(Schema::new(stripped_fields))
}
#[tokio::test]
async fn authz() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let table_name = "the_table";
// Set up the authorizer =================================
let mut authz = Authorizer::create().await;
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_non_shared2_with_authz(database_url, authz.addr()).await;
let write_token = authz.create_token_for(cluster.namespace(), &["ACTION_WRITE"]);
let read_token = authz.create_token_for(cluster.namespace(), &["ACTION_READ_SCHEMA"]);
StepTest::new(
&mut cluster,
vec![
Step::WriteLineProtocolWithAuthorization {
line_protocol: format!(
"{table_name},tag1=A,tag2=B val=42i 123456\n\
{table_name},tag1=A,tag2=C val=43i 123457"
),
authorization: format!("Token {}", write_token.clone()),
},
Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
let mut client = flightsql_client(state.cluster());
let err = client.get_table_types().await.unwrap_err();
assert_matches!(err, FlightError::Tonic(status) => {
assert_eq!(tonic::Code::Unauthenticated, status.code());
assert_eq!("Unauthenticated", status.message());
});
}
.boxed()
})),
Step::Custom(Box::new(move |state: &mut StepTestState| {
let token = write_token.clone();
async move {
let mut client = flightsql_client(state.cluster());
client
.add_header(
"authorization",
format!("Bearer {}", token.clone()).as_str(),
)
.unwrap();
let err = client.get_table_types().await.unwrap_err();
assert_matches!(err, FlightError::Tonic(status) => {
assert_eq!(tonic::Code::PermissionDenied, status.code());
assert_eq!("Permission denied", status.message());
});
}
.boxed()
})),
Step::Custom(Box::new(move |state: &mut StepTestState| {
let token = read_token.clone();
async move {
let mut client = flightsql_client(state.cluster());
client
.add_header(
"authorization",
format!("Bearer {}", token.clone()).as_str(),
)
.unwrap();
let stream = client.get_table_types().await.unwrap();
let batches = collect_stream(stream).await;
insta::assert_yaml_snapshot!(
batches_to_sorted_lines(&batches),
@r###"
---
- +------------+
- "| table_type |"
- +------------+
- "| BASE TABLE |"
- "| VIEW |"
- +------------+
"###
);
}
.boxed()
})),
],
)
.run()
.await;
authz.close().await;
}
/// Return a [`FlightSqlClient`] configured for use
fn flightsql_client(cluster: &MiniCluster) -> FlightSqlClient {
let connection = cluster.querier().querier_grpc_connection();


@ -1,4 +1,8 @@
use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTest};
use futures::FutureExt;
use test_helpers_end_to_end::{
check_flight_error, maybe_skip_integration, try_run_influxql, Authorizer, MiniCluster, Step,
StepTest, StepTestState,
};
#[tokio::test]
async fn influxql_returns_error() {
@ -63,3 +67,74 @@ async fn influxql_select_returns_results() {
.run()
.await
}
#[tokio::test]
async fn authz() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let table_name = "the_table";
// Set up the authorizer =================================
let mut authz = Authorizer::create().await;
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_non_shared2_with_authz(database_url, authz.addr()).await;
let write_token = authz.create_token_for(cluster.namespace(), &["ACTION_WRITE"]);
let read_token = authz.create_token_for(cluster.namespace(), &["ACTION_READ"]);
StepTest::new(
&mut cluster,
vec![
Step::WriteLineProtocolWithAuthorization {
line_protocol: format!(
"{table_name},tag1=A,tag2=B val=42i 123456\n\
{table_name},tag1=A,tag2=C val=43i 123457"
),
authorization: format!("Token {}", write_token.clone()),
},
Step::InfluxQLExpectingError {
query: format!("select tag1, val from {table_name}"),
expected_error_code: tonic::Code::Unauthenticated,
expected_message: "Unauthenticated".to_string(),
},
Step::Custom(Box::new(move |state: &mut StepTestState| {
let token = write_token.clone();
async move {
let cluster = state.cluster();
let err = try_run_influxql(
format!("select tag1, val from {}", table_name),
cluster.namespace(),
cluster.querier().querier_grpc_connection(),
Some(format!("Bearer {}", token.clone()).as_str()),
)
.await
.unwrap_err();
check_flight_error(
err,
tonic::Code::PermissionDenied,
Some("Permission denied"),
);
}
.boxed()
})),
Step::InfluxQLQueryWithAuthorization {
query: format!("select tag1, val from {table_name}"),
authorization: format!("Bearer {read_token}"),
expected: vec![
"+------------------+--------------------------------+------+-----+",
"| iox::measurement | time | tag1 | val |",
"+------------------+--------------------------------+------+-----+",
"| the_table | 1970-01-01T00:00:00.000123456Z | A | 42 |",
"| the_table | 1970-01-01T00:00:00.000123457Z | A | 43 |",
"+------------------+--------------------------------+------+-----+",
],
},
],
)
.run()
.await;
authz.close().await;
}


@ -102,7 +102,7 @@ async fn ingester_flight_api() {
// Write some data into the v2 HTTP API to set up the namespace and schema ==============
let lp = format!("{table_name},tag1=A,tag2=B val=42i 123456");
let response = cluster.write_to_router(lp).await;
let response = cluster.write_to_router(lp, None).await;
assert_eq!(response.status(), StatusCode::NO_CONTENT);
// Write some data directly into the ingester through its gRPC API
@ -208,7 +208,7 @@ async fn ingester_flight_api_table_not_found() {
// Write some data into the v2 HTTP API ==============
let lp = String::from("my_table,tag1=A,tag2=B val=42i 123456");
let response = cluster.write_to_router(lp).await;
let response = cluster.write_to_router(lp, None).await;
assert_eq!(response.status(), StatusCode::NO_CONTENT);
let mut querier_flight =


@ -27,7 +27,7 @@ async fn querier_namespace_client() {
// Write some data into the v2 HTTP API ==============
let lp = format!("{table_name},tag1=A,tag2=B val=42i 123456");
let response = cluster.write_to_router(lp).await;
let response = cluster.write_to_router(lp, None).await;
assert_eq!(response.status(), StatusCode::NO_CONTENT);
let mut client =


@ -13,7 +13,7 @@ use generated_types::{
use influxdb_iox_client::flight::IOxRecordBatchStream;
use test_helpers_end_to_end::{
check_flight_error, check_tonic_status, maybe_skip_integration, run_sql, try_run_sql,
GrpcRequestBuilder, MiniCluster, Step, StepTest, StepTestState, TestConfig,
Authorizer, GrpcRequestBuilder, MiniCluster, Step, StepTest, StepTestState, TestConfig,
};
#[tokio::test]
@ -566,6 +566,7 @@ async fn table_or_namespace_not_found() {
"select * from this_table_does_exist;",
format!("{}_suffix", state.cluster().namespace()),
state.cluster().querier().querier_grpc_connection(),
None,
)
.await
.unwrap_err();
@ -654,6 +655,7 @@ async fn oom_protection() {
&sql,
state.cluster().namespace(),
state.cluster().querier().querier_grpc_connection(),
None,
)
.await
.unwrap_err();
@ -664,6 +666,7 @@ async fn oom_protection() {
format!("EXPLAIN {sql}"),
state.cluster().namespace(),
state.cluster().querier().querier_grpc_connection(),
None,
)
.await;
}
@ -695,6 +698,76 @@ async fn oom_protection() {
.await
}
#[tokio::test]
async fn authz() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let table_name = "the_table";
// Set up the authorizer =================================
let mut authz = Authorizer::create().await;
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_non_shared2_with_authz(database_url, authz.addr()).await;
let write_token = authz.create_token_for(cluster.namespace(), &["ACTION_WRITE"]);
let read_token = authz.create_token_for(cluster.namespace(), &["ACTION_READ"]);
StepTest::new(
&mut cluster,
vec![
Step::WriteLineProtocolWithAuthorization {
line_protocol: format!(
"{table_name},tag1=A,tag2=B val=42i 123456\n\
{table_name},tag1=A,tag2=C val=43i 123457"
),
authorization: format!("Token {}", write_token.clone()),
},
Step::QueryExpectingError {
sql: "SELECT 1".to_string(),
expected_error_code: tonic::Code::Unauthenticated,
expected_message: "Unauthenticated".to_string(),
},
Step::Custom(Box::new(move |state: &mut StepTestState| {
let token = write_token.clone();
async move {
let cluster = state.cluster();
let err = try_run_sql(
"SELECT 1",
cluster.namespace(),
cluster.querier().querier_grpc_connection(),
Some(format!("Bearer {}", token.clone()).as_str()),
)
.await
.unwrap_err();
check_flight_error(
err,
tonic::Code::PermissionDenied,
Some("Permission denied"),
);
}
.boxed()
})),
Step::QueryWithAuthorization {
sql: "SELECT 1".to_string(),
authorization: format!("Bearer {read_token}").to_string(),
expected: vec![
"+----------+",
"| Int64(1) |",
"+----------+",
"| 1 |",
"+----------+",
],
},
],
)
.run()
.await;
authz.close().await;
}
/// Some clients, such as the golang ones, cannot decode dictionary encoded Flight data. This
/// function asserts that all schemas received in the stream are unpacked.
pub(crate) async fn verify_schema(stream: IOxRecordBatchStream) {


@ -2,7 +2,7 @@ use bytes::Buf;
use futures::FutureExt;
use http::{HeaderValue, StatusCode};
use test_helpers_end_to_end::{
maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState, TestConfig,
maybe_skip_integration, Authorizer, MiniCluster, Step, StepTest, StepTestState, TestConfig,
};
use tonic::codegen::Body;
@ -25,7 +25,7 @@ pub async fn test_json_errors() {
&mut cluster,
vec![Step::Custom(Box::new(|state: &mut StepTestState| {
async {
let response = state.cluster().write_to_router("bananas").await;
let response = state.cluster().write_to_router("bananas", None).await;
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
assert_eq!(
response
@ -62,3 +62,61 @@ where
bufs
}
#[tokio::test]
async fn authz() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let table_name = "the_table";
// Set up the authorizer =================================
let mut authz = Authorizer::create().await;
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_non_shared2_with_authz(database_url, authz.addr()).await;
let write_token = authz.create_token_for(cluster.namespace(), &["ACTION_WRITE"]);
let read_token = authz.create_token_for(cluster.namespace(), &["ACTION_READ"]);
let line_protocol = format!(
"{table_name},tag1=A,tag2=B val=42i 123456\n\
{table_name},tag1=A,tag2=C val=43i 123457"
);
StepTest::new(
&mut cluster,
vec![
Step::WriteLineProtocolExpectingError {
line_protocol: line_protocol.clone(),
expected_error_code: http::StatusCode::UNAUTHORIZED,
},
Step::Custom(Box::new(move |state: &mut StepTestState| {
let token = read_token.clone();
async move {
let cluster = state.cluster();
let authorization = format!("Token {}", token.clone());
let response = cluster
.write_to_router(
format!(
"{table_name},tag1=A,tag2=B val=42i 123456\n\
{table_name},tag1=A,tag2=C val=43i 123457"
),
Some(authorization.as_str()),
)
.await;
assert_eq!(response.status(), http::StatusCode::FORBIDDEN);
}
.boxed()
})),
Step::WriteLineProtocolWithAuthorization {
line_protocol: line_protocol.clone(),
authorization: format!("Token {write_token}"),
},
],
)
.run()
.await;
authz.close().await;
}


@ -6,6 +6,7 @@ use ::generated_types::influxdata::iox::querier::v1::{read_info::QueryType, Read
use futures_util::{Stream, StreamExt};
use prost::Message;
use thiserror::Error;
use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
use arrow::{
ipc::{self},
@ -153,10 +154,10 @@ impl Client {
// Copy any headers from IOx Connection
for (name, value) in headers.iter() {
let name = tonic::metadata::MetadataKey::<_>::from_bytes(name.as_str().as_bytes())
let name = MetadataKey::<_>::from_bytes(name.as_str().as_bytes())
.expect("Invalid metadata name");
let value: tonic::metadata::MetadataValue<_> =
let value: MetadataValue<_> =
value.as_bytes().try_into().expect("Invalid metadata value");
inner.metadata_mut().insert(name, value);
}
@ -169,6 +170,24 @@ impl Client {
self.inner
}
/// Return a reference to gRPC metadata included with each request
pub fn metadata(&self) -> &MetadataMap {
self.inner.metadata()
}
/// Return a mutable reference to gRPC metadata included with each request
///
/// This can be used, for example, to include authorization or
/// other headers with each request
pub fn metadata_mut(&mut self) -> &mut MetadataMap {
self.inner.metadata_mut()
}
/// Add the specified header with value to all subsequent requests
pub fn add_header(&mut self, key: &str, value: &str) -> Result<(), Error> {
Ok(self.inner.add_header(key, value)?)
}
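Callers attach per-request headers through this API; a minimal sketch, assuming an already-built `connection` and a placeholder token value:

use influxdb_iox_client::flight::Client;

let mut client = Client::new(connection);
// Every subsequent request through this client carries the bearer token.
client.add_header("authorization", "Bearer some-token").unwrap();
assert!(client.metadata().contains_key("authorization"));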
/// Query the given namespace with the given SQL query, returning
/// a struct that can stream Arrow [`RecordBatch`] results.
pub async fn sql(


@ -6,7 +6,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
sqlparser = "0.32.0"
sqlparser = "0.33.0"
snafu = "0.7.4"
generated_types = { path = "../generated_types" }


@ -10,7 +10,6 @@ license.workspace = true
[dependencies]
# Workspace dependencies, in alphabetical order
clap_blocks = { path = "../clap_blocks" }
data_types = { path = "../data_types" }
generated_types = { path = "../generated_types" }
heappy = { git = "https://github.com/mkmik/heappy", rev = "1d6ac77a4026fffce8680a7b31a9f6e9859b5e73", features = ["enable_heap_profiler", "jemalloc_shim", "measure_free"], optional = true }
metric = { path = "../metric" }
@ -30,7 +29,6 @@ trace_http = { path = "../trace_http" }
async-trait = "0.1"
bytes = "1.4"
clap = { version = "4", features = ["derive", "env"] }
chrono = { version = "0.4", default-features = false }
flate2 = "1.0"
futures = "0.3"
hashbrown = { workspace = true }


@ -18,10 +18,8 @@ metric = { path = "../metric" }
object_store = "0.5.6"
querier = { path = "../querier" }
iox_query = { path = "../iox_query" }
router = { path = "../router" }
service_grpc_flight = { path = "../service_grpc_flight" }
service_grpc_influxrpc = { path = "../service_grpc_influxrpc" }
sharder = { path = "../sharder" }
iox_time = { path = "../iox_time" }
trace = { path = "../trace" }
@ -33,7 +31,6 @@ thiserror = "1.0.40"
tokio = { version = "1.27", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tonic = "0.8"
workspace-hack = { version = "0.1", path = "../workspace-hack" }
parquet_file = { version = "0.1.0", path = "../parquet_file" }
tokio-util = "0.7.7"
[dev-dependencies]


@ -16,7 +16,7 @@ observability_deps = { path = "../observability_deps" }
query_functions = { path = "../query_functions"}
schema = { path = "../schema" }
snafu = "0.7"
sqlparser = "0.32.0"
sqlparser = "0.33.0"
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]


@ -0,0 +1,151 @@
use futures::FutureExt;
use generated_types::influxdata::iox::authz::v1::{
iox_authorizer_service_server::{IoxAuthorizerService, IoxAuthorizerServiceServer},
permission::PermissionOneOf,
resource_action_permission::{Action, ResourceType},
AuthorizeRequest, AuthorizeResponse, Permission, ResourceActionPermission,
};
use observability_deps::tracing::{error, info};
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use std::{
collections::HashMap,
net::SocketAddr,
sync::{Arc, Mutex},
};
use tokio::{
net::TcpListener,
sync::oneshot,
task::{spawn, JoinHandle},
};
use tonic::transport::{server::TcpIncoming, Server};
#[derive(Debug)]
pub struct Authorizer {
tokens: Arc<Mutex<HashMap<Vec<u8>, Vec<Permission>>>>,
addr: SocketAddr,
stop: Option<oneshot::Sender<()>>,
handle: JoinHandle<Result<(), tonic::transport::Error>>,
}
impl Authorizer {
pub async fn create() -> Self {
let listener = TcpListener::bind("localhost:0").await.unwrap();
let addr = listener.local_addr().unwrap();
info!("****************");
info!(local_addr = %addr, "Authorizer started");
info!("****************");
let incoming = TcpIncoming::from_listener(listener, false, None).unwrap();
let (stop, stop_rx) = oneshot::channel();
let tokens = Arc::new(Mutex::new(HashMap::new()));
let router = Server::builder().add_service(IoxAuthorizerServiceServer::new(
AuthorizerService::new(Arc::clone(&tokens)),
));
let handle = spawn(router.serve_with_incoming_shutdown(incoming, stop_rx.map(drop)));
Self {
tokens,
addr,
stop: Some(stop),
handle,
}
}
pub async fn close(mut self) {
info!("****************");
info!(local_addr = %self.addr, "Stopping authorizer");
info!("****************");
if let Some(stop) = self.stop.take() {
stop.send(()).expect("Error stopping authorizer");
};
match self.handle.await {
Ok(Ok(())) => {}
Ok(Err(e)) => error!(error = %e, "Error stopping authorizer"),
Err(e) => error!(error = %e, "Error stopping authorizer"),
}
}
/// Create a new token with the requested permissions.
///
/// Actions are specified by name; the currently supported actions are:
/// - `"ACTION_READ_SCHEMA"`
/// - `"ACTION_READ"`
/// - `"ACTION_WRITE"`
/// - `"ACTION_CREATE"`
/// - `"ACTION_DELETE"`
pub fn create_token_for(&mut self, namespace_name: &str, actions: &[&str]) -> String {
let perms = actions
.iter()
.filter_map(|a| Action::from_str_name(a))
.map(|a| Permission {
permission_one_of: Some(PermissionOneOf::ResourceAction(
ResourceActionPermission {
resource_type: ResourceType::Namespace.into(),
resource_id: Some(namespace_name.to_string()),
action: a.into(),
},
)),
})
.collect();
let token = format!(
"{namespace_name}_{}",
thread_rng()
.sample_iter(&Alphanumeric)
.take(5)
.map(char::from)
.collect::<String>()
);
self.tokens
.lock()
.unwrap()
.insert(token.clone().into_bytes(), perms);
token
}
/// Get the address the server is listening at.
pub fn addr(&self) -> String {
format!("http://{}", self.addr)
}
}
/// Test implementation of the `IoxAuthorizerService`.
#[derive(Debug)]
struct AuthorizerService {
tokens: Arc<Mutex<HashMap<Vec<u8>, Vec<Permission>>>>,
}
impl AuthorizerService {
/// Create a new `AuthorizerService`.
fn new(tokens: Arc<Mutex<HashMap<Vec<u8>, Vec<Permission>>>>) -> Self {
Self { tokens }
}
}
#[tonic::async_trait]
impl IoxAuthorizerService for AuthorizerService {
async fn authorize(
&self,
request: tonic::Request<AuthorizeRequest>,
) -> Result<tonic::Response<AuthorizeResponse>, tonic::Status> {
let request = request.into_inner();
let perms = self
.tokens
.lock()
.map_err(|e| tonic::Status::internal(e.to_string()))?
.get(&request.token)
.cloned()
.unwrap_or_default();
Ok(tonic::Response::new(AuthorizeResponse {
valid: true,
subject: None,
permissions: request
.permissions
.iter()
.filter(|p| perms.contains(p))
.cloned()
.collect(),
}))
}
}
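Putting the pieces together, the tests above drive this helper roughly as follows; a condensed sketch with an illustrative namespace name:

let mut authz = Authorizer::create().await;
// Mint a token that only permits reads on the namespace.
let token = authz.create_token_for("my_namespace", &["ACTION_READ"]);
// authz.addr() is what the router and querier receive via
// INFLUXDB_IOX_AUTHZ_ADDR (see TestConfig::with_authz_addr below).
let mut cluster = MiniCluster::create_non_shared2_with_authz(database_url, authz.addr()).await;
// ... issue requests authorized with format!("Bearer {token}") ...
authz.close().await;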


@ -20,6 +20,7 @@ pub async fn write_to_router(
org: impl AsRef<str>,
bucket: impl AsRef<str>,
write_base: impl AsRef<str>,
authorization: Option<&str>,
) -> Response<Body> {
let client = Client::new();
let url = format!(
@ -29,9 +30,11 @@ pub async fn write_to_router(
bucket.as_ref()
);
let request = Request::builder()
.uri(url)
.method("POST")
let mut builder = Request::builder().uri(url).method("POST");
if let Some(authorization) = authorization {
builder = builder.header(hyper::header::AUTHORIZATION, authorization);
};
let request = builder
.body(Body::from(line_protocol.into()))
.expect("failed to construct HTTP request");
@ -80,8 +83,12 @@ pub async fn try_run_sql(
sql_query: impl Into<String>,
namespace: impl Into<String>,
querier_connection: Connection,
authorization: Option<&str>,
) -> Result<Vec<RecordBatch>, influxdb_iox_client::flight::Error> {
let mut client = influxdb_iox_client::flight::Client::new(querier_connection);
if let Some(authorization) = authorization {
client.add_header("authorization", authorization).unwrap();
}
// Test the client handshake implementation
// Normally this would be done once per connection, not per query
@ -99,8 +106,12 @@ pub async fn try_run_influxql(
influxql_query: impl Into<String>,
namespace: impl Into<String>,
querier_connection: Connection,
authorization: Option<&str>,
) -> Result<Vec<RecordBatch>, influxdb_iox_client::flight::Error> {
let mut client = influxdb_iox_client::flight::Client::new(querier_connection);
if let Some(authorization) = authorization {
client.add_header("authorization", authorization).unwrap();
}
// Test the client handshake implementation
// Normally this would be done once per connection, not per query
@ -120,8 +131,9 @@ pub async fn run_sql(
sql: impl Into<String>,
namespace: impl Into<String>,
querier_connection: Connection,
authorization: Option<&str>,
) -> Vec<RecordBatch> {
try_run_sql(sql, namespace, querier_connection)
try_run_sql(sql, namespace, querier_connection, authorization)
.await
.expect("Error executing sql query")
}
@ -133,8 +145,14 @@ pub async fn run_influxql(
influxql: impl Into<String> + Clone + Display,
namespace: impl Into<String>,
querier_connection: Connection,
authorization: Option<&str>,
) -> Vec<RecordBatch> {
try_run_influxql(influxql.clone(), namespace, querier_connection)
.await
.unwrap_or_else(|_| panic!("Error executing InfluxQL query: {influxql}"))
try_run_influxql(
influxql.clone(),
namespace,
querier_connection,
authorization,
)
.await
.unwrap_or_else(|_| panic!("Error executing InfluxQL query: {influxql}"))
}


@ -180,6 +180,11 @@ impl TestConfig {
)
}
/// Configure the authorization server.
pub fn with_authz_addr(self, addr: impl Into<String>) -> Self {
self.with_env("INFLUXDB_IOX_AUTHZ_ADDR", addr)
}
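For reference, the MiniCluster constructor further below applies this to both services; a condensed sketch:

let router_config = TestConfig::new_router2(&ingester_config).with_authz_addr(authz.addr());
let querier_config = TestConfig::new_querier2(&ingester_config).with_authz_addr(authz.addr());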
// Get the catalog DSN URL if set.
pub fn dsn(&self) -> &Option<String> {
&self.dsn


@ -4,6 +4,7 @@ use rand::{
};
mod addrs;
mod authz;
mod client;
mod config;
mod data_generator;
@ -18,6 +19,7 @@ mod steps;
mod udp_listener;
pub use addrs::BindAddresses;
pub use authz::Authorizer;
pub use client::*;
pub use config::TestConfig;
pub use data_generator::DataGenerator;


@ -217,6 +217,32 @@ impl MiniCluster {
.await
.with_compactor_config(compactor_config)
}
/// Create a non-shared "version 2" MiniCluster that has a router,
/// ingester, and querier. The router and querier will be configured
/// to use the authorization service and will require all requests to
/// be authorized. Config for a compactor is saved, but the compactor
/// service should be run on demand in tests using `compactor run-once`
/// rather than `run compactor`.
pub async fn create_non_shared2_with_authz(
database_url: String,
authz_addr: impl Into<String> + Clone,
) -> Self {
let ingester_config = TestConfig::new_ingester2(&database_url);
let router_config =
TestConfig::new_router2(&ingester_config).with_authz_addr(authz_addr.clone());
let querier_config = TestConfig::new_querier2(&ingester_config).with_authz_addr(authz_addr);
let compactor_config = TestConfig::new_compactor2(&ingester_config);
// Set up the cluster ====================================
Self::new()
.with_ingester(ingester_config)
.await
.with_router(router_config)
.await
.with_querier(querier_config)
.await
.with_compactor_config(compactor_config)
}
/// Create an all-(minus compactor)-in-one server with the specified configuration
pub async fn create_all_in_one(test_config: TestConfig) -> Self {
@ -380,12 +406,17 @@ impl MiniCluster {
/// Writes the line protocol to the write_base/api/v2/write endpoint on the router into the
/// org/bucket
pub async fn write_to_router(&self, line_protocol: impl Into<String>) -> Response<Body> {
pub async fn write_to_router(
&self,
line_protocol: impl Into<String>,
authorization: Option<&str>,
) -> Response<Body> {
write_to_router(
line_protocol,
&self.org_id,
&self.bucket_id,
self.router().router_http_base(),
authorization,
)
.await
}


@ -225,6 +225,7 @@ async fn run_query(
query_text,
cluster.namespace(),
cluster.querier().querier_grpc_connection(),
None,
)
.await
}
@ -233,6 +234,7 @@ async fn run_query(
query_text,
cluster.namespace(),
cluster.querier().querier_grpc_connection(),
None,
)
.await
{


@ -143,6 +143,14 @@ pub enum Step {
expected_error_code: StatusCode,
},
/// Writes the specified line protocol to the `/api/v2/write` endpoint
/// using the specified authorization header, and asserts the data was
/// written successfully.
WriteLineProtocolWithAuthorization {
line_protocol: String,
authorization: String,
},
/// Ask the catalog service how many Parquet files it has for this cluster's namespace. Do this
/// before a write where you're interested in when the write has been persisted to Parquet;
/// then after the write use `WaitForPersisted2` to observe the change in the number of Parquet
@ -193,6 +201,16 @@ pub enum Step {
expected_message: String,
},
/// Run a SQL query using the FlightSQL interface with the given
/// authorization header. Verify that the results match the expected
/// results using the `assert_batches_eq!` macro.
QueryWithAuthorization {
sql: String,
authorization: String,
expected: Vec<&'static str>,
},
/// Run a SQL query using the FlightSQL interface, then verify the
/// results using the provided validation function.
@ -228,6 +246,15 @@ pub enum Step {
expected_message: String,
},
/// Run an InfluxQL query using the FlightSQL interface including an
/// authorization header. Verify that the results match the expected
/// results using the `assert_batches_eq!` macro.
InfluxQLQueryWithAuthorization {
query: String,
authorization: String,
expected: Vec<&'static str>,
},
/// Retrieve the metrics and verify the results using the provided
/// validation function.
///
@ -280,7 +307,7 @@ where
"====Begin writing line protocol to v2 HTTP API:\n{}",
line_protocol
);
let response = state.cluster.write_to_router(line_protocol).await;
let response = state.cluster.write_to_router(line_protocol, None).await;
assert_eq!(response.status(), StatusCode::NO_CONTENT);
info!("====Done writing line protocol");
}
@ -292,10 +319,25 @@ where
"====Begin writing line protocol expecting error to v2 HTTP API:\n{}",
line_protocol
);
let response = state.cluster.write_to_router(line_protocol).await;
let response = state.cluster.write_to_router(line_protocol, None).await;
assert_eq!(response.status(), *expected_error_code);
info!("====Done writing line protocol expecting error");
}
Step::WriteLineProtocolWithAuthorization {
line_protocol,
authorization,
} => {
info!(
"====Begin writing line protocol (authenticated) to v2 HTTP API:\n{}",
line_protocol
);
let response = state
.cluster
.write_to_router(line_protocol, Some(authorization))
.await;
assert_eq!(response.status(), StatusCode::NO_CONTENT);
info!("====Done writing line protocol");
}
// Get the current number of Parquet files in the cluster's namespace before
// starting a new write so we can observe a change when waiting for persistence.
Step::RecordNumParquetFiles => {
@ -344,6 +386,7 @@ where
sql,
state.cluster.namespace(),
state.cluster.querier().querier_grpc_connection(),
None,
)
.await;
assert_batches_sorted_eq!(expected, &batches);
@ -380,6 +423,7 @@ where
sql,
state.cluster().namespace(),
state.cluster().querier().querier_grpc_connection(),
None,
)
.await
.unwrap_err();
@ -388,6 +432,23 @@ where
info!("====Done running");
}
Step::QueryWithAuthorization {
sql,
authorization,
expected,
} => {
info!("====Begin running SQL query (authenticated): {}", sql);
// run query
let batches = run_sql(
sql,
state.cluster.namespace(),
state.cluster().querier().querier_grpc_connection(),
Some(authorization.as_str()),
)
.await;
assert_batches_sorted_eq!(expected, &batches);
info!("====Done running");
}
Step::VerifiedQuery { sql, verify } => {
info!("====Begin running SQL verified query: {}", sql);
// run query
@ -395,6 +456,7 @@ where
sql,
state.cluster.namespace(),
state.cluster.querier().querier_grpc_connection(),
None,
)
.await;
verify(batches);
@ -407,6 +469,7 @@ where
query,
state.cluster.namespace(),
state.cluster.querier().querier_grpc_connection(),
None,
)
.await;
assert_batches_sorted_eq!(expected, &batches);
@ -446,6 +509,7 @@ where
query,
state.cluster().namespace(),
state.cluster().querier().querier_grpc_connection(),
None,
)
.await
.unwrap_err();
@ -454,6 +518,23 @@ where
info!("====Done running");
}
Step::InfluxQLQueryWithAuthorization {
query,
authorization,
expected,
} => {
info!("====Begin running InfluxQL query: {}", query);
// run query
let batches = run_influxql(
query,
state.cluster.namespace(),
state.cluster.querier().querier_grpc_connection(),
Some(authorization),
)
.await;
assert_batches_sorted_eq!(expected, &batches);
info!("====Done running");
}
Step::VerifiedMetrics(verify) => {
info!("====Begin validating metrics");


@ -197,6 +197,11 @@ impl InstrumentedAsyncSemaphore {
span_recorder: Some(SpanRecorder::new(span)),
}
}
/// Return the total number of permits (available + already acquired).
pub fn total_permits(self: &Arc<Self>) -> usize {
self.permits
}
}
impl Drop for InstrumentedAsyncSemaphore {
@ -420,6 +425,8 @@ mod tests {
let metrics = Arc::new(AsyncSemaphoreMetrics::new_unregistered());
let semaphore = Arc::new(metrics.new_semaphore(10));
assert_eq!(10, semaphore.total_permits());
let acquire_fut = semaphore.acquire_owned(None);
let acquire_many_fut = semaphore.acquire_many_owned(1, None);
assert_send(&acquire_fut);
@ -427,7 +434,9 @@ mod tests {
// futures itself are NOT Sync
let permit_acquire = acquire_fut.await.unwrap();
assert_eq!(10, semaphore.total_permits());
let permit_acquire_many = acquire_many_fut.await.unwrap();
assert_eq!(10, semaphore.total_permits());
assert_send(&permit_acquire);
assert_send(&permit_acquire_many);
assert_sync(&permit_acquire);
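As these assertions show, total_permits() reports configured capacity rather than current availability; a minimal sketch making that explicit:

let metrics = Arc::new(AsyncSemaphoreMetrics::new_unregistered());
let semaphore = Arc::new(metrics.new_semaphore(4));
// Even while every permit is held, the total stays at the configured capacity.
let _all = semaphore.acquire_many(4, None).await.unwrap();
assert_eq!(4, semaphore.total_permits());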


@ -8,7 +8,7 @@ license.workspace = true
[dependencies]
clap = { version = "4", features = ["derive", "env"], optional = true }
is-terminal = "0.4.6"
is-terminal = "0.4.7"
logfmt = { path = "../logfmt" }
observability_deps = { path = "../observability_deps" }
thiserror = "1.0.40"