feat(idpe-17265): cst write authorization (#7527)

* feat(idpe-17265): authorization should occur as part of the single_tenant-specific mod
* the authz service is accessed only through the single_tenant mod handler
* the authz service is wrapped in the auth mod
* move the auth integration test into the auth mod
* push the authorize() call down into the query-param parsing, so that extract_token can access the query params
* raise a configuration error when the authz service and single_tenant mode are not configured together (see the sketch after this list)
* update the authz e2e fixtures
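
A minimal sketch of that pairing rule, as a stand-alone helper for illustration only (check_tenancy_pairing is not the production code, which matches on the Option<Arc<dyn Authorizer>> handle inside create_router2_server_type, as shown in the diff below):

fn check_tenancy_pairing(single_tenant: bool, authz_configured: bool) -> &'static str {
    // The two valid pairings select a write-request unifier; the invalid
    // pairings are treated as unreachable configuration and panic while the
    // server type is being built.
    match (single_tenant, authz_configured) {
        (true, true) => "SingleTenantRequestUnifier (writes are authorized)",
        (false, false) => "MultiTenantRequestUnifier (no authz)",
        (true, false) => panic!(
            "INFLUXDB_IOX_SINGLE_TENANCY is set, but no authz service could be created; check INFLUXDB_IOX_AUTHZ_ADDR"
        ),
        (false, true) => panic!(
            "INFLUXDB_IOX_AUTHZ_ADDR is set, but authz only exists for single tenancy; check INFLUXDB_IOX_SINGLE_TENANCY"
        ),
    }
}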

* feat(idpe-17265): extract tokens according to the preferred ordering in the spec, and add tests verifying the behavior (token extraction is sketched below)
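
A minimal sketch of that ordering, assuming the base64 0.21 API already used by this PR; the signatures here are simplified (the real extract_header_token takes hyper's &HeaderValue, and the query-param fallback is wired through authorize() in the new auth mod shown further down):

use base64::{prelude::BASE64_STANDARD, Engine};

// Prefer the Authorization header; the "Basic" scheme carries
// base64("user:password"), and the password part is treated as the token.
fn extract_header_token(header_value: &[u8]) -> Option<Vec<u8>> {
    let mut parts = header_value.splitn(2, |&b| b == b' ');
    let token = match parts.next()? {
        b"Token" | b"Bearer" => parts.next()?.to_vec(),
        b"Basic" => parts
            .next()
            .and_then(|v| BASE64_STANDARD.decode(v).ok())?
            .splitn(2, |&b| b == b':')
            .nth(1)?
            .to_vec(),
        _ => return None,
    };
    if token.is_empty() {
        return None;
    }
    Some(token)
}

// Fall back to the v1 `p` query parameter only when the header yields no token.
fn resolve_token(header: Option<&[u8]>, query_param_token: Option<String>) -> Option<Vec<u8>> {
    header
        .and_then(extract_header_token)
        .or_else(|| query_param_token.map(String::into_bytes))
}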

* chore(idpe-17265): update naming conventions for a unifying parser

* test: give MockAuthorizer a Default impl, and add a test_delegate_to_authz for CST

* chore: record authz duration metric, and include in delegation test.

* chore: use authz terminology instead of auth_service

* chore: more explicit naming

* Revert "chore: record authz duration metric, and include in delegation test."

This reverts commit 05c36888ca7247b6953343d759a5185098fae679.

* refactor: extract_header_token versus the else condition

* refactor: create the single_tenant mod and move auth inside it

* chore: make the unreachable configuration combinations panic explicitly during server build

* test: make the token values consts, to be consumed wherever MockAuthorizer is used

* test: use locking for calls_counter in test

* fix: base64-encode credentials as expected for the Basic authorization header (see the sketch below)
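
For illustration, the expected encoding looks like this (a sketch assuming the base64 0.21 API; the &str parameter is a simplification of the test helper added in this PR):

use base64::{prelude::BASE64_STANDARD, Engine};

// The Basic scheme expects base64("user:password"), so the raw pair must be
// encoded before being placed in the Authorization header.
fn encode_basic_header(user_and_token: &str) -> String {
    format!("Basic {}", BASE64_STANDARD.encode(user_and_token))
}

// e.g. encode_basic_header("ignore:GOOD") == "Basic aWdub3JlOkdPT0Q="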

* fix: resolve merge conflict. AuthorizationHeaderExtension now lives under the authz::http mod, which the router package enables as a required feature.

* chore: run rustfmt nightly with preferred import handling, on files with modified imports

* chore: code cleanup, keeping only the minimal code needed
pull/24376/head
wiedld 2023-04-19 08:28:10 -07:00 committed by GitHub
parent 7a6862ee3a
commit 1d2003d385
12 changed files with 593 additions and 284 deletions

Cargo.lock generated
View File

@ -4721,6 +4721,7 @@ dependencies = [
"assert_matches",
"async-trait",
"authz",
"base64 0.21.0",
"bytes",
"criterion",
"crossbeam-utils",

View File

@ -1,3 +1,8 @@
use std::{
fmt::{Debug, Display},
sync::Arc,
};
use async_trait::async_trait;
use authz::Authorizer;
use clap_blocks::router2::Router2Config;
@ -39,18 +44,14 @@ use router::{
grpc::RpcWriteGrpcDelegate,
http::{
write::{
multi_tenant::MultiTenantRequestParser, single_tenant::SingleTenantRequestParser,
WriteParamExtractor,
multi_tenant::MultiTenantRequestUnifier, single_tenant::SingleTenantRequestUnifier,
WriteRequestUnifier,
},
HttpDelegate,
},
RpcWriteRouterServer,
},
};
use std::{
fmt::{Debug, Display},
sync::Arc,
};
use thiserror::Error;
use tokio_util::sync::CancellationToken;
use trace::TraceCollector;
@ -348,19 +349,20 @@ pub async fn create_router2_server_type(
let handler_stack = InstrumentationDecorator::new("request", &metrics, handler_stack);
// Initialize the HTTP API delegate
let write_param_extractor: Box<dyn WriteParamExtractor> =
match router_config.single_tenant_deployment {
true => Box::<SingleTenantRequestParser>::default(),
false => Box::<MultiTenantRequestParser>::default(),
let write_request_unifier: Result<Box<dyn WriteRequestUnifier>> =
match (router_config.single_tenant_deployment, authz) {
(true, Some(auth)) => Ok(Box::new(SingleTenantRequestUnifier::new(auth))),
(true, None) => unreachable!("INFLUXDB_IOX_SINGLE_TENANCY is set, but could not create an authz service. Check the INFLUXDB_IOX_AUTHZ_ADDR."),
(false, None) => Ok(Box::<MultiTenantRequestUnifier>::default()),
(false, Some(_)) => unreachable!("INFLUXDB_IOX_AUTHZ_ADDR is set, but authz only exists for single_tenancy. Check the INFLUXDB_IOX_SINGLE_TENANCY."),
};
let http = HttpDelegate::new(
common_state.run_config().max_http_request_size,
router_config.http_request_limit,
namespace_resolver,
handler_stack,
authz,
&metrics,
write_param_extractor,
write_request_unifier?,
);
// Initialize the gRPC API delegate that creates the services relevant to the RPC

View File

@ -8,6 +8,7 @@ license.workspace = true
[dependencies]
async-trait = "0.1"
authz = { path = "../authz", features = ["http"] }
base64 = "0.21.0"
bytes = "1.4"
crossbeam-utils = "0.8.15"
data_types = { path = "../data_types" }

View File

@ -2,7 +2,8 @@
pub mod write;
use authz::{http::AuthorizationHeaderExtension, Action, Authorizer, Permission, Resource};
use std::{str::Utf8Error, time::Instant};
use bytes::{Bytes, BytesMut};
use futures::StreamExt;
use hashbrown::HashMap;
@ -12,14 +13,13 @@ use metric::{DurationHistogram, U64Counter};
use mutable_batch::MutableBatch;
use mutable_batch_lp::LinesConverter;
use observability_deps::tracing::*;
use std::{str::Utf8Error, sync::Arc, time::Instant};
use thiserror::Error;
use tokio::sync::{Semaphore, TryAcquireError};
use trace::ctx::SpanContext;
use self::write::{
multi_tenant::MultiTenantExtractError, single_tenant::SingleTenantExtractError,
WriteParamExtractor, WriteParams,
multi_tenant::MultiTenantExtractError, single_tenant::SingleTenantExtractError, WriteParams,
WriteRequestUnifier,
};
use crate::{
dml_handlers::{
@ -98,10 +98,6 @@ pub enum Error {
/// The provided authorization is not sufficient to perform the request.
#[error("access denied")]
Forbidden,
/// An error occurred verifying the authorization token.
#[error(transparent)]
Authorizer(authz::Error),
}
impl Error {
@ -130,23 +126,12 @@ impl Error {
Error::RequestLimit => StatusCode::SERVICE_UNAVAILABLE,
Error::Unauthenticated => StatusCode::UNAUTHORIZED,
Error::Forbidden => StatusCode::FORBIDDEN,
Error::Authorizer(_) => StatusCode::INTERNAL_SERVER_ERROR,
Error::SingleTenantError(e) => StatusCode::from(e),
Error::MultiTenantError(e) => StatusCode::from(e),
}
}
}
impl From<authz::Error> for Error {
fn from(value: authz::Error) -> Self {
match value {
authz::Error::Forbidden => Self::Forbidden,
authz::Error::NoToken => Self::Unauthenticated,
e => Self::Authorizer(e),
}
}
}
impl From<&DmlError> for StatusCode {
fn from(e: &DmlError) -> Self {
match e {
@ -197,8 +182,7 @@ pub struct HttpDelegate<D, N, T = SystemProvider> {
time_provider: T,
namespace_resolver: N,
dml_handler: D,
authz: Option<Arc<dyn Authorizer>>,
write_param_extractor: Box<dyn WriteParamExtractor>,
write_request_mode_handler: Box<dyn WriteRequestUnifier>,
// A request limiter to restrict the number of simultaneous requests this
// router services.
@ -228,9 +212,8 @@ impl<D, N> HttpDelegate<D, N, SystemProvider> {
max_requests: usize,
namespace_resolver: N,
dml_handler: D,
authz: Option<Arc<dyn Authorizer>>,
metrics: &metric::Registry,
write_param_extractor: Box<dyn WriteParamExtractor>,
write_request_mode_handler: Box<dyn WriteRequestUnifier>,
) -> Self {
let write_metric_lines = metrics
.register_metric::<U64Counter>(
@ -273,9 +256,8 @@ impl<D, N> HttpDelegate<D, N, SystemProvider> {
max_request_bytes,
time_provider: SystemProvider::default(),
namespace_resolver,
write_param_extractor,
write_request_mode_handler,
dml_handler,
authz,
request_sem: Semaphore::new(max_requests),
write_metric_lines,
http_line_protocol_parse_duration,
@ -316,11 +298,11 @@ where
// Route the request to a handler.
match (req.method(), req.uri().path()) {
(&Method::POST, "/write") => {
let dml_info = self.write_param_extractor.parse_v1(&req)?;
let dml_info = self.write_request_mode_handler.parse_v1(&req).await?;
self.write_handler(req, dml_info).await
}
(&Method::POST, "/api/v2/write") => {
let dml_info = self.write_param_extractor.parse_v2(&req)?;
let dml_info = self.write_request_mode_handler.parse_v2(&req).await?;
self.write_handler(req, dml_info).await
}
(&Method::POST, "/api/v2/delete") => return Err(Error::DeletesUnsupported),
@ -341,27 +323,6 @@ where
) -> Result<(), Error> {
let span_ctx: Option<SpanContext> = req.extensions().get().cloned();
let token = req
.extensions()
.get::<AuthorizationHeaderExtension>()
.and_then(|v| v.as_ref())
.and_then(|v| {
let s = v.as_ref();
if s.len() < b"Token ".len() {
None
} else {
match s.split_at(b"Token ".len()) {
(b"Token ", token) => Some(token),
_ => None,
}
}
});
let perms = [Permission::ResourceAction(
Resource::Database(write_info.namespace.to_string()),
Action::Write,
)];
self.authz.require_any_permission(token, &perms).await?;
trace!(
namespace=%write_info.namespace,
"processing write request"
@ -480,23 +441,9 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::{
dml_handlers::{
mock::{MockDmlHandler, MockDmlHandlerCall},
CachedServiceProtectionLimit,
},
namespace_resolver::{mock::MockNamespaceResolver, NamespaceCreationError},
server::http::write::{
mock::{MockExtractorCall, MockWriteParamsExtractor},
multi_tenant::MultiTenantRequestParser,
v1::V1WriteParseError,
v2::V2WriteParseError,
Precision,
},
};
use std::{io::Write, iter, sync::Arc, time::Duration};
use assert_matches::assert_matches;
use async_trait::async_trait;
use data_types::{
NamespaceId, NamespaceName, NamespaceNameError, OrgBucketMappingError, TableId,
};
@ -505,10 +452,25 @@ mod tests {
use metric::{Attributes, Metric};
use mutable_batch::column::ColumnData;
use mutable_batch_lp::LineWriteError;
use std::{io::Write, iter, sync::Arc, time::Duration};
use test_helpers::timeout::FutureTimeout;
use tokio_stream::wrappers::ReceiverStream;
use super::*;
use crate::{
dml_handlers::{
mock::{MockDmlHandler, MockDmlHandlerCall},
CachedServiceProtectionLimit,
},
namespace_resolver::{mock::MockNamespaceResolver, NamespaceCreationError},
server::http::write::{
mock::{MockUnifyingParseCall, MockWriteRequestUnifier},
multi_tenant::MultiTenantRequestUnifier,
v1::V1WriteParseError,
v2::V2WriteParseError,
Precision,
},
};
const MAX_BYTES: usize = 1024;
const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
static NAMESPACE_NAME: &str = "bananas_test";
@ -625,9 +587,8 @@ mod tests {
100,
mock_namespace_resolver,
Arc::clone(&dml_handler),
None,
&metrics,
Box::<crate::server::http::write::multi_tenant::MultiTenantRequestParser>::default(),
Box::<crate::server::http::write::multi_tenant::MultiTenantRequestUnifier>::default(),
);
let got = delegate.route(request).await;
@ -1080,10 +1041,9 @@ mod tests {
1,
mock_namespace_resolver,
Arc::clone(&dml_handler),
None,
&metrics,
Box::new(
MockWriteParamsExtractor::default().with_ret(iter::repeat_with(|| {
MockWriteRequestUnifier::default().with_ret(iter::repeat_with(|| {
Ok(WriteParams {
namespace: NamespaceName::new(NAMESPACE_NAME).unwrap(),
precision: Precision::default(),
@ -1199,101 +1159,6 @@ mod tests {
assert_metric_hit(&metrics, "http_request_limit_rejected", Some(1));
}
#[derive(Debug)]
struct MockAuthorizer {}
#[async_trait]
impl Authorizer for MockAuthorizer {
async fn permissions(
&self,
token: Option<&[u8]>,
perms: &[Permission],
) -> Result<Vec<Permission>, authz::Error> {
match token {
Some(b"GOOD") => Ok(perms.to_vec()),
Some(b"UGLY") => Err(authz::Error::verification("test", "test error")),
Some(_) => Ok(vec![]),
None => Err(authz::Error::NoToken),
}
}
}
#[tokio::test]
async fn test_authz() {
let mock_namespace_resolver =
MockNamespaceResolver::default().with_mapping(NAMESPACE_NAME, NamespaceId::new(42));
let dml_handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
let metrics = Arc::new(metric::Registry::default());
let authz = Arc::new(MockAuthorizer {});
let delegate = HttpDelegate::new(
MAX_BYTES,
1,
mock_namespace_resolver,
Arc::clone(&dml_handler),
Some(authz),
&metrics,
Box::new(
MockWriteParamsExtractor::default().with_ret(iter::repeat_with(|| {
Ok(WriteParams {
namespace: NamespaceName::new(NAMESPACE_NAME).unwrap(),
precision: Precision::default(),
})
})),
),
);
let request = Request::builder()
.uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
.method("POST")
.extension(AuthorizationHeaderExtension::new(Some(
HeaderValue::from_str("Token GOOD").unwrap(),
)))
.body(Body::from("platanos,tag1=A,tag2=B val=42i 123456"))
.unwrap();
let got = delegate.route(request).await;
assert_matches!(got, Ok(_));
let request = Request::builder()
.uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
.method("POST")
.extension(AuthorizationHeaderExtension::new(Some(
HeaderValue::from_str("Token BAD").unwrap(),
)))
.body(Body::from(""))
.unwrap();
let got = delegate.route(request).await;
assert_matches!(got, Err(Error::Forbidden));
let request = Request::builder()
.uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
.method("POST")
.body(Body::from(""))
.unwrap();
let got = delegate.route(request).await;
assert_matches!(got, Err(Error::Unauthenticated));
let request = Request::builder()
.uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
.method("POST")
.extension(AuthorizationHeaderExtension::new(Some(
HeaderValue::from_str("Token UGLY").unwrap(),
)))
.body(Body::from(""))
.unwrap();
let got = delegate.route(request).await;
assert_matches!(got, Err(Error::Authorizer(_)));
let calls = dml_handler.calls();
assert_matches!(calls.as_slice(), [MockDmlHandlerCall::Write{namespace, ..}] => {
assert_eq!(namespace, NAMESPACE_NAME);
})
}
/// Assert the router rejects writes to the V1 endpoint when in
/// "multi-tenant" mode.
#[tokio::test]
@ -1308,17 +1173,13 @@ mod tests {
1,
mock_namespace_resolver,
Arc::clone(&dml_handler),
None,
&metrics,
Box::<MultiTenantRequestParser>::default(),
Box::<MultiTenantRequestUnifier>::default(),
);
let request = Request::builder()
.uri("https://bananas.example/write")
.method("POST")
.extension(AuthorizationHeaderExtension::new(Some(
HeaderValue::from_str("Token GOOD").unwrap(),
)))
.body(Body::from("platanos,tag1=A,tag2=B val=42i 123456"))
.unwrap();
@ -1327,17 +1188,17 @@ mod tests {
}
/// Assert the router delegates request parsing to the
/// [`WriteParamExtractor`] implementation.
/// [`WriteRequestUnifier`] implementation.
///
/// By validating request parsing is delegated, behavioural tests for each
/// implementation can be implemented directly against those implementations
/// (instead of putting them all here).
#[tokio::test]
async fn test_delegate_to_write_param_extractor_ok() {
async fn test_delegate_to_write_request_unifier_ok() {
let mock_namespace_resolver =
MockNamespaceResolver::default().with_mapping(NAMESPACE_NAME, NamespaceId::new(42));
let request_parser = Arc::new(MockWriteParamsExtractor::default().with_ret(
let request_unifier = Arc::new(MockWriteRequestUnifier::default().with_ret(
iter::repeat_with(|| {
Ok(WriteParams {
namespace: NamespaceName::new(NAMESPACE_NAME).unwrap(),
@ -1354,9 +1215,8 @@ mod tests {
1,
mock_namespace_resolver,
Arc::clone(&dml_handler),
None,
&metrics,
Box::new(Arc::clone(&request_parser)),
Box::new(Arc::clone(&request_unifier)),
);
// A route miss does not invoke the parser
@ -1378,7 +1238,10 @@ mod tests {
assert_matches!(got, Ok(_));
// Only one call was received, and it should be v1.
assert_matches!(request_parser.calls().as_slice(), [MockExtractorCall::V1]);
assert_matches!(
request_unifier.calls().as_slice(),
[MockUnifyingParseCall::V1]
);
// V2 write parsing is delegated
let request = Request::builder()
@ -1391,8 +1254,8 @@ mod tests {
// Both calls were received.
assert_matches!(
request_parser.calls().as_slice(),
[MockExtractorCall::V1, MockExtractorCall::V2]
request_unifier.calls().as_slice(),
[MockUnifyingParseCall::V1, MockUnifyingParseCall::V2]
);
}
@ -1566,13 +1429,6 @@ mod tests {
"access denied",
),
(
Authorizer(
authz::Error::verification("bananas", NamespaceCreationError::Reject("bananas".to_string()))
),
"token verification not possible: bananas",
),
(
DmlHandler(DmlError::Schema(SchemaError::ServiceLimit(Box::new(CachedServiceProtectionLimit::Column {
table_name: "bananas".to_string(),

View File

@ -3,12 +3,13 @@
//! [V2 Write API]:
//! https://docs.influxdata.com/influxdb/v2.6/api/#operation/PostWrite
use async_trait::async_trait;
use data_types::{NamespaceName, OrgBucketMappingError};
use hyper::{Body, Request};
use super::{
v2::{V2WriteParseError, WriteParamsV2},
WriteParamExtractor, WriteParams,
WriteParams, WriteRequestUnifier,
};
use crate::server::http::Error;
@ -49,14 +50,15 @@ impl From<&MultiTenantExtractError> for hyper::StatusCode {
/// [V2 Write API]:
/// https://docs.influxdata.com/influxdb/v2.6/api/#operation/PostWrite
#[derive(Debug, Default)]
pub struct MultiTenantRequestParser;
pub struct MultiTenantRequestUnifier;
impl WriteParamExtractor for MultiTenantRequestParser {
fn parse_v1(&self, _req: &Request<Body>) -> Result<WriteParams, Error> {
#[async_trait]
impl WriteRequestUnifier for MultiTenantRequestUnifier {
async fn parse_v1(&self, _req: &Request<Body>) -> Result<WriteParams, Error> {
Err(Error::NoHandler)
}
fn parse_v2(&self, req: &Request<Body>) -> Result<WriteParams, Error> {
async fn parse_v2(&self, req: &Request<Body>) -> Result<WriteParams, Error> {
Ok(parse_v2(req)?)
}
}
@ -77,15 +79,14 @@ fn parse_v2(req: &Request<Body>) -> Result<WriteParams, MultiTenantExtractError>
mod tests {
use assert_matches::assert_matches;
use super::*;
use crate::server::http::write::Precision;
use super::*;
#[tokio::test]
async fn test_parse_v1_always_errors() {
let unifier = MultiTenantRequestUnifier::default();
#[test]
fn test_parse_v1_always_errors() {
let parser = MultiTenantRequestParser::default();
let got = parser.parse_v1(&Request::default());
let got = unifier.parse_v1(&Request::default()).await;
assert_matches!(got, Err(Error::NoHandler));
}
@ -96,9 +97,9 @@ mod tests {
want = $($want:tt)+ // A pattern match for assert_matches!
) => {
paste::paste! {
#[test]
fn [<test_parse_v2_ $name>]() {
let parser = MultiTenantRequestParser::default();
#[tokio::test]
async fn [<test_parse_v2_ $name>]() {
let unifier = MultiTenantRequestUnifier::default();
let query = $query_string;
let request = Request::builder()
@ -107,7 +108,7 @@ mod tests {
.body(Body::from(""))
.unwrap();
let got = parser.parse_v2(&request);
let got = unifier.parse_v2(&request).await;
assert_matches!(got, $($want)+);
}
}

View File

@ -1,5 +1,6 @@
use std::sync::Arc;
use async_trait::async_trait;
use data_types::NamespaceName;
use hyper::{Body, Request};
use serde::Deserialize;
@ -43,36 +44,38 @@ pub struct WriteParams {
pub(crate) precision: Precision,
}
/// A [`WriteParamExtractor`] abstraction returns [`WriteParams`] from
/// [`Request`] that conform to the [V1 Write API] or [V2 Write API].
/// A [`WriteRequestUnifier`] abstraction returns a unified [`WriteParams`]
/// from [`Request`] that conform to the [V1 Write API] or [V2 Write API].
///
/// Differing request parsing semantics are abstracted through this trait
/// (single tenant, vs multi tenant).
/// Differing request parsing semantics and authorization are abstracted
/// through this trait (single tenant, vs multi tenant).
///
/// [V1 Write API]:
/// https://docs.influxdata.com/influxdb/v1.8/tools/api/#write-http-endpoint
/// [V2 Write API]:
/// https://docs.influxdata.com/influxdb/v2.6/api/#operation/PostWrite
pub trait WriteParamExtractor: std::fmt::Debug + Send + Sync {
/// Parse a [`WriteParams`] from a HTTP [`Request]`, according to the V1
/// Write API.
fn parse_v1(&self, req: &Request<Body>) -> Result<WriteParams, Error>;
#[async_trait]
pub trait WriteRequestUnifier: std::fmt::Debug + Send + Sync {
/// Perform a unifying parse to produce a [`WriteParams`] from a HTTP [`Request]`,
/// according to the V1 Write API.
async fn parse_v1(&self, req: &Request<Body>) -> Result<WriteParams, Error>;
/// Parse a [`WriteParams`] from a HTTP [`Request]`, according to the V2
/// Write API.
fn parse_v2(&self, req: &Request<Body>) -> Result<WriteParams, Error>;
/// Perform a unifying parse to produce a [`WriteParams`] from a HTTP [`Request]`,
/// according to the V2 Write API.
async fn parse_v2(&self, req: &Request<Body>) -> Result<WriteParams, Error>;
}
impl<T> WriteParamExtractor for Arc<T>
#[async_trait]
impl<T> WriteRequestUnifier for Arc<T>
where
T: WriteParamExtractor,
T: WriteRequestUnifier,
{
fn parse_v1(&self, req: &Request<Body>) -> Result<WriteParams, Error> {
(**self).parse_v1(req)
async fn parse_v1(&self, req: &Request<Body>) -> Result<WriteParams, Error> {
(**self).parse_v1(req).await
}
fn parse_v2(&self, req: &Request<Body>) -> Result<WriteParams, Error> {
(**self).parse_v2(req)
async fn parse_v2(&self, req: &Request<Body>) -> Result<WriteParams, Error> {
(**self).parse_v2(req).await
}
}
@ -83,13 +86,13 @@ pub mod mock {
use super::*;
#[derive(Debug, Clone, Copy)]
pub enum MockExtractorCall {
pub enum MockUnifyingParseCall {
V1,
V2,
}
struct State {
calls: Vec<MockExtractorCall>,
calls: Vec<MockUnifyingParseCall>,
ret: Box<dyn Iterator<Item = Result<WriteParams, Error>> + Send + Sync>,
}
@ -109,11 +112,11 @@ pub mod mock {
}
#[derive(Debug, Default)]
pub struct MockWriteParamsExtractor {
pub struct MockWriteRequestUnifier {
state: Mutex<State>,
}
impl MockWriteParamsExtractor {
impl MockWriteRequestUnifier {
/// Read values off of the provided iterator and return them for calls
/// to [`Self::write()`].
pub(crate) fn with_ret<T, U>(self, ret: T) -> Self
@ -125,21 +128,22 @@ pub mod mock {
self
}
pub(crate) fn calls(&self) -> Vec<MockExtractorCall> {
pub(crate) fn calls(&self) -> Vec<MockUnifyingParseCall> {
self.state.lock().calls.clone()
}
}
impl WriteParamExtractor for MockWriteParamsExtractor {
fn parse_v1(&self, _req: &Request<Body>) -> Result<WriteParams, Error> {
#[async_trait]
impl WriteRequestUnifier for MockWriteRequestUnifier {
async fn parse_v1(&self, _req: &Request<Body>) -> Result<WriteParams, Error> {
let mut guard = self.state.lock();
guard.calls.push(MockExtractorCall::V1);
guard.calls.push(MockUnifyingParseCall::V1);
guard.ret.next().unwrap()
}
fn parse_v2(&self, _req: &Request<Body>) -> Result<WriteParams, Error> {
async fn parse_v2(&self, _req: &Request<Body>) -> Result<WriteParams, Error> {
let mut guard = self.state.lock();
guard.calls.push(MockExtractorCall::V2);
guard.calls.push(MockUnifyingParseCall::V2);
guard.ret.next().unwrap()
}
}

View File

@ -0,0 +1,340 @@
//! Authorization of HTTP requests using the authz service client.
use std::sync::Arc;
use authz::{
self, http::AuthorizationHeaderExtension, Action, Authorizer, Error, Permission, Resource,
};
use base64::{prelude::BASE64_STANDARD, Engine};
use data_types::NamespaceName;
use hyper::{header::HeaderValue, Body, Request};
fn extract_header_token(header_value: &'_ HeaderValue) -> Option<Vec<u8>> {
let mut parts = header_value.as_bytes().splitn(2, |&v| v == b' ');
let token = match parts.next()? {
b"Token" | b"Bearer" => parts.next()?.to_vec(),
b"Basic" => parts
.next()
.and_then(|v| BASE64_STANDARD.decode(v).ok())?
.splitn(2, |&v| v == b':')
.nth(1)?
.to_vec(),
_ => return None,
};
if token.is_empty() {
return None;
}
Some(token)
}
pub(crate) async fn authorize(
authz: &Arc<dyn Authorizer>,
req: &Request<Body>,
namespace: &NamespaceName<'_>,
query_param_token: Option<String>,
) -> Result<(), Error> {
let token = req
.extensions()
.get::<AuthorizationHeaderExtension>()
.and_then(|v| v.as_ref())
.and_then(|v| {
extract_header_token(v).or_else(|| query_param_token.map(|t| t.into_bytes()))
});
let perms = [Permission::ResourceAction(
Resource::Database(namespace.to_string()),
Action::Write,
)];
authz
.require_any_permission(token.as_deref(), &perms)
.await?;
Ok(())
}
#[cfg(test)]
pub mod mock {
use async_trait::async_trait;
use super::*;
pub const MOCK_AUTH_VALID_TOKEN: &str = "GOOD";
pub const MOCK_AUTH_INVALID_TOKEN: &str = "UGLY";
pub const MOCK_AUTH_NO_PERMS_TOKEN: &str = "BAD";
#[derive(Debug, Default)]
pub struct MockAuthorizer {}
#[async_trait]
impl Authorizer for MockAuthorizer {
async fn permissions(
&self,
token: Option<&[u8]>,
perms: &[Permission],
) -> Result<Vec<Permission>, authz::Error> {
match token {
Some(b"GOOD") => Ok(perms.to_vec()),
Some(b"UGLY") => Err(authz::Error::verification("test", "test error")),
Some(_) => Ok(vec![]),
None => Err(authz::Error::NoToken),
}
}
}
}
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use data_types::NamespaceId;
use hyper::header::HeaderValue;
use super::{mock::*, *};
use crate::{
dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall},
namespace_resolver::mock::MockNamespaceResolver,
server::http::{
self,
write::single_tenant::{SingleTenantExtractError, SingleTenantRequestUnifier},
HttpDelegate,
},
};
const MAX_BYTES: usize = 1024;
#[tokio::test]
async fn test_authz_service_integration() {
static NAMESPACE_NAME: &str = "test";
let mock_namespace_resolver =
MockNamespaceResolver::default().with_mapping(NAMESPACE_NAME, NamespaceId::new(42));
let dml_handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
let metrics = Arc::new(metric::Registry::default());
let authz = Arc::new(MockAuthorizer::default());
let delegate = HttpDelegate::new(
MAX_BYTES,
1,
mock_namespace_resolver,
Arc::clone(&dml_handler),
&metrics,
Box::new(SingleTenantRequestUnifier::new(authz)),
);
let request = Request::builder()
.uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
.method("POST")
.extension(AuthorizationHeaderExtension::new(Some(
HeaderValue::from_str(format!("Token {MOCK_AUTH_VALID_TOKEN}").as_str()).unwrap(),
)))
.body(Body::from("platanos,tag1=A,tag2=B val=42i 123456"))
.unwrap();
let got = delegate.route(request).await;
assert_matches!(got, Ok(_));
let request = Request::builder()
.uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
.method("POST")
.extension(AuthorizationHeaderExtension::new(Some(
HeaderValue::from_str(format!("Token {MOCK_AUTH_NO_PERMS_TOKEN}").as_str())
.unwrap(),
)))
.body(Body::from(""))
.unwrap();
let got = delegate.route(request).await;
assert_matches!(
got,
Err(http::Error::SingleTenantError(
SingleTenantExtractError::Authorizer(authz::Error::Forbidden)
))
);
let request = Request::builder()
.uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
.method("POST")
.body(Body::from(""))
.unwrap();
let got = delegate.route(request).await;
assert_matches!(
got,
Err(http::Error::SingleTenantError(
SingleTenantExtractError::Authorizer(authz::Error::NoToken)
))
);
let request = Request::builder()
.uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
.method("POST")
.extension(AuthorizationHeaderExtension::new(Some(
HeaderValue::from_str(format!("Token {MOCK_AUTH_INVALID_TOKEN}").as_str()).unwrap(),
)))
.body(Body::from(""))
.unwrap();
let got = delegate.route(request).await;
assert_matches!(
got,
Err(http::Error::SingleTenantError(
SingleTenantExtractError::Authorizer(authz::Error::Verification { .. })
))
);
let calls = dml_handler.calls();
assert_matches!(calls.as_slice(), [MockDmlHandlerCall::Write{namespace, ..}] => {
assert_eq!(namespace, NAMESPACE_NAME);
})
}
macro_rules! test_authorize {
(
$name:ident,
header_value = $header_value:expr, // If present, set as header
query_param_token = $query_token:expr, // Optional token provided as ?q=<token>
want = $($want:tt)+ // A pattern match for assert_matches!
) => {
paste::paste! {
#[tokio::test]
async fn [<test_authorize_ $name>]() {
let authz: Arc<dyn Authorizer> = Arc::new(MockAuthorizer::default());
let namespace = NamespaceName::new("test").unwrap();
let request = Request::builder()
.uri(format!("https://any.com/ignored"))
.method("POST")
.extension(AuthorizationHeaderExtension::new(Some(
HeaderValue::from_str($header_value).unwrap(),
)))
.body(Body::from(""))
.unwrap();
let got = authorize(&authz, &request, &namespace, $query_token).await;
assert_matches!(got, $($want)+);
}
}
};
}
fn encode_basic_header(token: String) -> String {
format!("Basic {}", BASE64_STANDARD.encode(token))
}
test_authorize!(
token_header_ok,
header_value = format!("Token {MOCK_AUTH_VALID_TOKEN}").as_str(),
query_param_token = Some("ignore".to_string()),
want = Ok(())
);
test_authorize!(
token_header_rejected,
header_value = format!("Token {MOCK_AUTH_INVALID_TOKEN}").as_str(),
query_param_token = Some("ignore".to_string()),
want = Err(authz::Error::Verification { .. })
);
test_authorize!(
token_header_forbidden,
header_value = format!("Token {MOCK_AUTH_NO_PERMS_TOKEN}").as_str(),
query_param_token = Some("ignore".to_string()),
want = Err(authz::Error::Forbidden)
);
test_authorize!(
token_header_missing,
header_value = "Token ",
query_param_token = None,
want = Err(authz::Error::NoToken)
);
test_authorize!(
token_header_missing_whitespace,
header_value = "Token",
query_param_token = None,
want = Err(authz::Error::NoToken)
);
test_authorize!(
token_header_missing_whitespace_match_next,
header_value = "Token",
query_param_token = Some(MOCK_AUTH_VALID_TOKEN.to_string()),
want = Ok(())
);
test_authorize!(
bearer_header_ok,
header_value = format!("Bearer {MOCK_AUTH_VALID_TOKEN}").as_str(),
query_param_token = Some("ignore".to_string()),
want = Ok(())
);
test_authorize!(
bearer_header_missing,
header_value = "Bearer ",
query_param_token = None,
want = Err(authz::Error::NoToken)
);
test_authorize!(
basic_header_ok,
header_value = encode_basic_header(format!("ignore:{MOCK_AUTH_VALID_TOKEN}")).as_str(),
query_param_token = Some("ignore".to_string()),
want = Ok(())
);
test_authorize!(
basic_header_missing,
header_value = encode_basic_header("".to_string()).as_str(),
query_param_token = None,
want = Err(authz::Error::NoToken)
);
test_authorize!(
basic_header_missing_part,
header_value = encode_basic_header("ignore:".to_string()).as_str(),
query_param_token = None,
want = Err(authz::Error::NoToken)
);
test_authorize!(
basic_header_rejected,
header_value = encode_basic_header(format!("ignore:{MOCK_AUTH_INVALID_TOKEN}")).as_str(),
query_param_token = Some("ignore".to_string()),
want = Err(authz::Error::Verification { .. })
);
test_authorize!(
basic_header_forbidden,
header_value = encode_basic_header(format!("ignore:{MOCK_AUTH_NO_PERMS_TOKEN}")).as_str(),
query_param_token = Some("ignore".to_string()),
want = Err(authz::Error::Forbidden)
);
test_authorize!(
query_param_token_ok,
header_value = "",
query_param_token = Some(MOCK_AUTH_VALID_TOKEN.to_string()),
want = Ok(())
);
test_authorize!(
query_param_token_rejected,
header_value = "",
query_param_token = Some(MOCK_AUTH_INVALID_TOKEN.to_string()),
want = Err(authz::Error::Verification { .. })
);
test_authorize!(
query_param_token_forbidden,
header_value = "",
query_param_token = Some(MOCK_AUTH_NO_PERMS_TOKEN.to_string()),
want = Err(authz::Error::Forbidden)
);
test_authorize!(
everything_missing,
header_value = "",
query_param_token = None,
want = Err(authz::Error::NoToken)
);
}

View File

@ -7,19 +7,26 @@
//! [V1 Write API]:
//! https://docs.influxdata.com/influxdb/v1.8/tools/api/#write-http-endpoint
pub mod auth;
use std::sync::Arc;
use async_trait::async_trait;
use auth::authorize;
use authz::{self, Authorizer};
use data_types::{NamespaceName, NamespaceNameError};
use hyper::{Body, Request};
use thiserror::Error;
use super::{
v1::{RetentionPolicy, V1WriteParseError, WriteParamsV1},
v2::{V2WriteParseError, WriteParamsV2},
WriteParamExtractor, WriteParams,
WriteParams, WriteRequestUnifier,
};
use crate::server::http::{
write::v1::V1_NAMESPACE_RP_SEPARATOR,
Error::{self},
};
use data_types::{NamespaceName, NamespaceNameError};
/// Request parsing errors when operating in "single tenant" mode.
#[derive(Debug, Error)]
@ -39,6 +46,10 @@ pub enum SingleTenantExtractError {
/// A [`WriteParamsV2`] failed to be parsed from the HTTP request.
#[error(transparent)]
ParseV2Request(#[from] V2WriteParseError),
/// An error occurred verifying the authorization token.
#[error(transparent)]
Authorizer(authz::Error),
}
/// Implement a by-ref conversion to avoid "moving" the inner errors when only
@ -60,6 +71,11 @@ impl From<&SingleTenantExtractError> for hyper::StatusCode {
SingleTenantExtractError::ParseV2Request(
V2WriteParseError::NoQueryParams | V2WriteParseError::DecodeFail(_),
) => Self::BAD_REQUEST,
SingleTenantExtractError::Authorizer(e) => match e {
authz::Error::Forbidden => Self::FORBIDDEN,
authz::Error::NoToken => Self::UNAUTHORIZED,
_ => Self::FORBIDDEN,
},
}
}
}
@ -68,7 +84,8 @@ impl From<&SingleTenantExtractError> for hyper::StatusCode {
///
/// This handler respects the [V2 Write API] with the following modifications:
///
/// * The namespace is derived from ONLY the bucket name (org is discarded)
/// * The namespace is derived from ONLY the bucket name (org is discarded).
/// * Authorization token is required.
///
/// This handler respects a limited subset of the [V1 Write API] defined in
/// <https://github.com/influxdata/idpe/issues/17265>.
@ -77,21 +94,34 @@ impl From<&SingleTenantExtractError> for hyper::StatusCode {
/// https://docs.influxdata.com/influxdb/v2.6/api/#operation/PostWrite
/// [V1 Write API]:
/// https://docs.influxdata.com/influxdb/v1.8/tools/api/#write-http-endpoint
#[derive(Debug, Default)]
pub struct SingleTenantRequestParser;
#[derive(Debug)]
pub struct SingleTenantRequestUnifier {
authz: Arc<dyn Authorizer>,
}
impl WriteParamExtractor for SingleTenantRequestParser {
fn parse_v1(&self, req: &Request<Body>) -> Result<WriteParams, Error> {
Ok(parse_v1(req)?)
impl SingleTenantRequestUnifier {
/// Creates a new [`SingleTenantRequestUnifier`].
pub fn new(authz: Arc<dyn Authorizer>) -> Self {
Self { authz }
}
}
#[async_trait]
impl WriteRequestUnifier for SingleTenantRequestUnifier {
async fn parse_v1(&self, req: &Request<Body>) -> Result<WriteParams, Error> {
Ok(parse_v1(req, &self.authz).await?)
}
fn parse_v2(&self, req: &Request<Body>) -> Result<WriteParams, Error> {
Ok(parse_v2(req)?)
async fn parse_v2(&self, req: &Request<Body>) -> Result<WriteParams, Error> {
Ok(parse_v2(req, &self.authz).await?)
}
}
// Parse a V1 write request for single tenant mode.
fn parse_v1(req: &Request<Body>) -> Result<WriteParams, SingleTenantExtractError> {
async fn parse_v1(
req: &Request<Body>,
authz: &Arc<dyn Authorizer>,
) -> Result<WriteParams, SingleTenantExtractError> {
// Extract the write parameters.
let write_params = WriteParamsV1::try_from(req)?;
@ -100,7 +130,7 @@ fn parse_v1(req: &Request<Body>) -> Result<WriteParams, SingleTenantExtractError
debug_assert!(!write_params.db.contains(V1_NAMESPACE_RP_SEPARATOR));
// Extract or construct the namespace name string from the write parameters
let namespace = match write_params.rp {
let namespace = NamespaceName::new(match write_params.rp {
RetentionPolicy::Unspecified | RetentionPolicy::Autogen => write_params.db,
RetentionPolicy::Named(rp) => {
format!(
@ -109,16 +139,22 @@ fn parse_v1(req: &Request<Body>) -> Result<WriteParams, SingleTenantExtractError
sep = V1_NAMESPACE_RP_SEPARATOR
)
}
};
})?;
authorize(authz, req, &namespace, write_params.password)
.await
.map_err(SingleTenantExtractError::Authorizer)?;
Ok(WriteParams {
namespace: NamespaceName::new(namespace)?,
namespace,
precision: write_params.precision,
})
}
// Parse a V2 write request for single tenant mode.
fn parse_v2(req: &Request<Body>) -> Result<WriteParams, SingleTenantExtractError> {
async fn parse_v2(
req: &Request<Body>,
authz: &Arc<dyn Authorizer>,
) -> Result<WriteParams, SingleTenantExtractError> {
let write_params = WriteParamsV2::try_from(req)?;
// For V2 requests in "single tenant" mode, only the bucket parameter is
@ -130,20 +166,66 @@ fn parse_v2(req: &Request<Body>) -> Result<WriteParams, SingleTenantExtractError
if write_params.bucket.is_empty() {
return Err(SingleTenantExtractError::NoBucketSpecified);
}
let namespace = NamespaceName::new(write_params.bucket)?;
authorize(authz, req, &namespace, None)
.await
.map_err(SingleTenantExtractError::Authorizer)?;
Ok(WriteParams {
namespace: NamespaceName::new(write_params.bucket)?,
namespace,
precision: write_params.precision,
})
}
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use authz::{http::AuthorizationHeaderExtension, Permission};
use hyper::header::HeaderValue;
use parking_lot::Mutex;
use super::{
auth::mock::{MockAuthorizer, *},
*,
};
use crate::server::http::write::Precision;
use super::*;
#[tokio::test]
async fn test_delegate_to_authz() {
#[derive(Debug)]
struct MockCountingAuthorizer {
calls_counter: Arc<Mutex<usize>>,
}
use assert_matches::assert_matches;
#[async_trait]
impl Authorizer for MockCountingAuthorizer {
async fn permissions(
&self,
_token: Option<&[u8]>,
perms: &[Permission],
) -> Result<Vec<Permission>, authz::Error> {
*self.calls_counter.lock() += 1;
Ok(perms.to_vec())
}
}
let counter = Arc::new(Mutex::new(0));
let authz: Arc<MockCountingAuthorizer> = Arc::new(MockCountingAuthorizer {
calls_counter: Arc::clone(&counter),
});
let unifier = SingleTenantRequestUnifier::new(authz);
let request = Request::builder()
.uri("https://foo?db=bananas")
.method("POST")
.extension(AuthorizationHeaderExtension::new(Some(
HeaderValue::from_str(format!("Token {MOCK_AUTH_VALID_TOKEN}").as_str()).unwrap(),
)))
.body(Body::from(""))
.unwrap();
assert!(unifier.parse_v1(&request).await.is_ok());
assert_eq!(*counter.lock(), 1);
}
macro_rules! test_parse_v1 {
(
@ -152,18 +234,22 @@ mod tests {
want = $($want:tt)+ // A pattern match for assert_matches!
) => {
paste::paste! {
#[test]
fn [<test_parse_v1_ $name>]() {
let parser = SingleTenantRequestParser::default();
#[tokio::test]
async fn [<test_parse_v1_ $name>]() {
let authz = Arc::new(MockAuthorizer::default());
let unifier = SingleTenantRequestUnifier::new(authz);
let query = $query_string;
let request = Request::builder()
.uri(format!("https://itsallbroken.com/ignored{query}"))
.method("POST")
.extension(AuthorizationHeaderExtension::new(Some(
HeaderValue::from_str(format!("Token {MOCK_AUTH_VALID_TOKEN}").as_str()).unwrap(),
)))
.body(Body::from(""))
.unwrap();
let got = parser.parse_v1(&request);
let got = unifier.parse_v1(&request).await;
assert_matches!(got, $($want)+);
}
}
@ -317,18 +403,22 @@ mod tests {
want = $($want:tt)+ // A pattern match for assert_matches!
) => {
paste::paste! {
#[test]
fn [<test_parse_v2_ $name>]() {
let parser = SingleTenantRequestParser::default();
#[tokio::test]
async fn [<test_parse_v2_ $name>]() {
let authz = Arc::new(MockAuthorizer::default());
let unifier = SingleTenantRequestUnifier::new(authz);
let query = $query_string;
let request = Request::builder()
.uri(format!("https://itsallbroken.com/ignored{query}"))
.method("POST")
.extension(AuthorizationHeaderExtension::new(Some(
HeaderValue::from_str(format!("Token {MOCK_AUTH_VALID_TOKEN}").as_str()).unwrap(),
)))
.body(Body::from(""))
.unwrap();
let got = parser.parse_v2(&request);
let got = unifier.parse_v2(&request).await;
assert_matches!(got, $($want)+);
}
}

View File

@ -67,7 +67,7 @@ impl<'de> Deserialize<'de> for RetentionPolicy {
}
}
/// Query parameters for v2 write requests.
/// Query parameters for v1 write requests.
#[derive(Debug, Deserialize)]
pub(crate) struct WriteParamsV1 {
pub(crate) db: String,
@ -76,6 +76,11 @@ pub(crate) struct WriteParamsV1 {
pub(crate) precision: Precision,
#[serde(default)]
pub(crate) rp: RetentionPolicy,
// `username` is an optional v1 query parameter, but is ignored in the CST spec;
// the `p` parameter is treated as the token
#[serde(rename(deserialize = "p"))]
pub(crate) password: Option<String>,
}
impl<T> TryFrom<&Request<T>> for WriteParamsV1 {

View File

@ -1,3 +1,5 @@
use std::{iter, string::String, sync::Arc, time::Duration};
use data_types::{PartitionTemplate, QueryPoolId, TableId, TemplatePart, TopicId};
use generated_types::influxdata::iox::ingester::v1::WriteRequest;
use hashbrown::HashMap;
@ -18,16 +20,9 @@ use router::{
namespace_resolver::{MissingNamespaceAction, NamespaceAutocreation, NamespaceSchemaResolver},
server::{
grpc::RpcWriteGrpcDelegate,
http::{
write::{
multi_tenant::MultiTenantRequestParser, single_tenant::SingleTenantRequestParser,
WriteParamExtractor,
},
HttpDelegate,
},
http::{write::multi_tenant::MultiTenantRequestUnifier, HttpDelegate},
},
};
use std::{iter, string::String, sync::Arc, time::Duration};
/// The topic catalog ID assigned by the namespace auto-creator in the
/// handler stack for namespaces it has not yet observed.
@ -187,19 +182,15 @@ impl TestContext {
let handler_stack = InstrumentationDecorator::new("request", &metrics, handler_stack);
let write_param_extractor: Box<dyn WriteParamExtractor> = match single_tenancy {
true => Box::<SingleTenantRequestParser>::default(),
false => Box::<MultiTenantRequestParser>::default(),
};
let write_request_unifier = Box::<MultiTenantRequestUnifier>::default();
let http_delegate = HttpDelegate::new(
1024,
100,
namespace_resolver,
handler_stack,
None,
&metrics,
write_param_extractor,
write_request_unifier,
);
let grpc_delegate = RpcWriteGrpcDelegate::new(

View File

@ -184,9 +184,10 @@ impl TestConfig {
)
}
/// Configure the authorization server.
pub fn with_authz_addr(self, addr: impl Into<String>) -> Self {
/// Configure the single tenancy mode, including the authorization server.
pub fn with_single_tenancy(self, addr: impl Into<String>) -> Self {
self.with_env("INFLUXDB_IOX_AUTHZ_ADDR", addr)
.with_env("INFLUXDB_IOX_SINGLE_TENANCY", "true")
}
// Get the catalog DSN URL if set.

View File

@ -66,6 +66,22 @@ impl MiniCluster {
}
}
pub fn new_based_on_tenancy(is_single_tenant: bool) -> Self {
let org_id = rand_id();
let bucket_id = rand_id();
let namespace = match is_single_tenant {
true => bucket_id.clone(),
false => format!("{org_id}_{bucket_id}"),
};
Self {
org_id,
bucket_id,
namespace,
..Self::default()
}
}
/// Create a new MiniCluster that shares the same underlying servers but has a new unique
/// namespace and set of connections
///
@ -229,12 +245,13 @@ impl MiniCluster {
) -> Self {
let ingester_config = TestConfig::new_ingester2(&database_url);
let router_config =
TestConfig::new_router2(&ingester_config).with_authz_addr(authz_addr.clone());
let querier_config = TestConfig::new_querier2(&ingester_config).with_authz_addr(authz_addr);
TestConfig::new_router2(&ingester_config).with_single_tenancy(authz_addr.clone());
let querier_config =
TestConfig::new_querier2(&ingester_config).with_single_tenancy(authz_addr);
let compactor_config = TestConfig::new_compactor2(&ingester_config);
// Set up the cluster ====================================
Self::new()
Self::new_based_on_tenancy(true)
.with_ingester(ingester_config)
.await
.with_router(router_config)