Merge pull request #7690 from influxdata/chore/router-metrics-for-auth-v2
chore: authz metric to measure duration of ioxauth rpc callpull/24376/head
commit
007460c0a6
|
@ -519,7 +519,10 @@ dependencies = [
|
|||
"base64 0.21.0",
|
||||
"generated_types",
|
||||
"http",
|
||||
"iox_time",
|
||||
"metric",
|
||||
"observability_deps",
|
||||
"parking_lot 0.12.1",
|
||||
"paste",
|
||||
"snafu",
|
||||
"test_helpers_end_to_end",
|
||||
|
|
|
@ -10,7 +10,9 @@ license.workspace = true
|
|||
|
||||
[dependencies]
|
||||
http = {version = "0.2.9", optional = true }
|
||||
iox_time = { version = "0.1.0", path = "../iox_time" }
|
||||
generated_types = { path = "../generated_types" }
|
||||
metric = { version = "0.1.0", path = "../metric" }
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
workspace-hack = { version = "0.1", path = "../workspace-hack" }
|
||||
|
||||
|
@ -22,6 +24,7 @@ tonic = { workspace = true }
|
|||
|
||||
[dev-dependencies]
|
||||
assert_matches = "1.5.0"
|
||||
parking_lot = "0.12.1"
|
||||
paste = "1.0.12"
|
||||
test_helpers_end_to_end = { path = "../test_helpers_end_to_end" }
|
||||
tokio = "1.28.0"
|
||||
|
|
|
@ -0,0 +1,248 @@
|
|||
use async_trait::async_trait;
|
||||
use iox_time::{SystemProvider, TimeProvider};
|
||||
use metric::{DurationHistogram, Metric, Registry};
|
||||
|
||||
use super::{Authorizer, Error, Permission};
|
||||
|
||||
const AUTHZ_DURATION_METRIC: &str = "authz_permission_check_duration";
|
||||
|
||||
/// An instrumentation decorator over a [`Authorizer`] implementation.
|
||||
///
|
||||
/// This wrapper captures the latency distribution of the decorated
|
||||
/// [`Authorizer::permissions()`] call, faceted by success/error result.
|
||||
#[derive(Debug)]
|
||||
pub struct AuthorizerInstrumentation<T, P = SystemProvider> {
|
||||
inner: T,
|
||||
time_provider: P,
|
||||
|
||||
/// Permissions-check duration distribution for successesful rpc, but not authorized.
|
||||
ioxauth_rpc_duration_success_unauth: DurationHistogram,
|
||||
|
||||
/// Permissions-check duration distribution for successesful rpc + authorized.
|
||||
ioxauth_rpc_duration_success_auth: DurationHistogram,
|
||||
|
||||
/// Permissions-check duration distribution for errors.
|
||||
ioxauth_rpc_duration_error: DurationHistogram,
|
||||
}
|
||||
|
||||
impl<T> AuthorizerInstrumentation<T> {
|
||||
/// Record permissions-check duration metrics, broken down by result.
|
||||
pub fn new(registry: &Registry, inner: T) -> Self {
|
||||
let metric: Metric<DurationHistogram> =
|
||||
registry.register_metric(AUTHZ_DURATION_METRIC, "duration of authz permissions check");
|
||||
|
||||
let ioxauth_rpc_duration_success_unauth =
|
||||
metric.recorder(&[("result", "success"), ("auth_state", "unauthorised")]);
|
||||
let ioxauth_rpc_duration_success_auth =
|
||||
metric.recorder(&[("result", "success"), ("auth_state", "authorised")]);
|
||||
let ioxauth_rpc_duration_error =
|
||||
metric.recorder(&[("result", "error"), ("auth_state", "unauthorised")]);
|
||||
|
||||
Self {
|
||||
inner,
|
||||
time_provider: Default::default(),
|
||||
ioxauth_rpc_duration_success_unauth,
|
||||
ioxauth_rpc_duration_success_auth,
|
||||
ioxauth_rpc_duration_error,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T> Authorizer for AuthorizerInstrumentation<T>
|
||||
where
|
||||
T: Authorizer,
|
||||
{
|
||||
async fn permissions(
|
||||
&self,
|
||||
token: Option<Vec<u8>>,
|
||||
perms: &[Permission],
|
||||
) -> Result<Vec<Permission>, Error> {
|
||||
let t = self.time_provider.now();
|
||||
let res = self.inner.permissions(token, perms).await;
|
||||
|
||||
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
|
||||
match &res {
|
||||
Ok(_) => self.ioxauth_rpc_duration_success_auth.record(delta),
|
||||
Err(Error::Forbidden) | Err(Error::InvalidToken) => {
|
||||
self.ioxauth_rpc_duration_success_unauth.record(delta)
|
||||
}
|
||||
Err(Error::Verification { .. }) => self.ioxauth_rpc_duration_error.record(delta),
|
||||
Err(Error::NoToken) => {} // rpc was not made
|
||||
};
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::collections::VecDeque;
|
||||
|
||||
use metric::{assert_histogram, Attributes, Registry};
|
||||
use parking_lot::Mutex;
|
||||
|
||||
use super::*;
|
||||
use crate::{Action, Resource};
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct MockAuthorizerState {
|
||||
ret: VecDeque<Result<Vec<Permission>, Error>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct MockAuthorizer {
|
||||
state: Mutex<MockAuthorizerState>,
|
||||
}
|
||||
|
||||
impl MockAuthorizer {
|
||||
pub(crate) fn with_permissions_return(
|
||||
self,
|
||||
ret: impl Into<VecDeque<Result<Vec<Permission>, Error>>>,
|
||||
) -> Self {
|
||||
self.state.lock().ret = ret.into();
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Authorizer for MockAuthorizer {
|
||||
async fn permissions(
|
||||
&self,
|
||||
_token: Option<Vec<u8>>,
|
||||
_perms: &[Permission],
|
||||
) -> Result<Vec<Permission>, Error> {
|
||||
self.state
|
||||
.lock()
|
||||
.ret
|
||||
.pop_front()
|
||||
.expect("no mock sink value to return")
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! assert_metric_counts {
|
||||
(
|
||||
$metrics:ident,
|
||||
expected_success = $expected_success:expr,
|
||||
expected_rpc_success_unauth = $expected_rpc_success_unauth:expr,
|
||||
expected_rpc_failures = $expected_rpc_failures:expr,
|
||||
) => {
|
||||
let histogram = &$metrics
|
||||
.get_instrument::<Metric<DurationHistogram>>(AUTHZ_DURATION_METRIC)
|
||||
.expect("failed to read metric");
|
||||
|
||||
let success_labels =
|
||||
Attributes::from(&[("result", "success"), ("auth_state", "authorised")]);
|
||||
let histogram_success = &histogram
|
||||
.get_observer(&success_labels)
|
||||
.expect("failed to find metric with provided attributes")
|
||||
.fetch();
|
||||
|
||||
assert_histogram!(
|
||||
$metrics,
|
||||
DurationHistogram,
|
||||
AUTHZ_DURATION_METRIC,
|
||||
labels = success_labels,
|
||||
samples = $expected_success,
|
||||
sum = histogram_success.total,
|
||||
);
|
||||
|
||||
let success_unauth_labels =
|
||||
Attributes::from(&[("result", "success"), ("auth_state", "unauthorised")]);
|
||||
let histogram_success_unauth = &histogram
|
||||
.get_observer(&success_unauth_labels)
|
||||
.expect("failed to find metric with provided attributes")
|
||||
.fetch();
|
||||
|
||||
assert_histogram!(
|
||||
$metrics,
|
||||
DurationHistogram,
|
||||
AUTHZ_DURATION_METRIC,
|
||||
labels = success_unauth_labels,
|
||||
samples = $expected_rpc_success_unauth,
|
||||
sum = histogram_success_unauth.total,
|
||||
);
|
||||
|
||||
let rpc_error_labels =
|
||||
Attributes::from(&[("result", "error"), ("auth_state", "unauthorised")]);
|
||||
let histogram_rpc_error = &histogram
|
||||
.get_observer(&rpc_error_labels)
|
||||
.expect("failed to find metric with provided attributes")
|
||||
.fetch();
|
||||
|
||||
assert_histogram!(
|
||||
$metrics,
|
||||
DurationHistogram,
|
||||
AUTHZ_DURATION_METRIC,
|
||||
labels = rpc_error_labels,
|
||||
samples = $expected_rpc_failures,
|
||||
sum = histogram_rpc_error.total,
|
||||
);
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! test_authorizer_metric {
|
||||
(
|
||||
$name:ident,
|
||||
rpc_response = $rpc_response:expr,
|
||||
will_pass_auth = $will_pass_auth:expr,
|
||||
expected_success_cnt = $expected_success_cnt:expr,
|
||||
expected_success_unauth_cnt = $expected_success_unauth_cnt:expr,
|
||||
expected_error_cnt = $expected_error_cnt:expr,
|
||||
) => {
|
||||
paste::paste! {
|
||||
#[tokio::test]
|
||||
async fn [<test_authorizer_metric_ $name>]() {
|
||||
let metrics = Registry::default();
|
||||
let decorated_authz = AuthorizerInstrumentation::new(
|
||||
&metrics,
|
||||
MockAuthorizer::default().with_permissions_return([$rpc_response])
|
||||
);
|
||||
|
||||
let token = "any".as_bytes().to_vec();
|
||||
let got = decorated_authz
|
||||
.permissions(Some(token), &[])
|
||||
.await;
|
||||
assert_eq!(got.is_ok(), $will_pass_auth);
|
||||
assert_metric_counts!(
|
||||
metrics,
|
||||
expected_success = $expected_success_cnt,
|
||||
expected_rpc_success_unauth = $expected_success_unauth_cnt,
|
||||
expected_rpc_failures = $expected_error_cnt,
|
||||
);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
test_authorizer_metric!(
|
||||
ok,
|
||||
rpc_response = Ok(vec![Permission::ResourceAction(
|
||||
Resource::Database("foo".to_string()),
|
||||
Action::Write,
|
||||
)]),
|
||||
will_pass_auth = true,
|
||||
expected_success_cnt = 1,
|
||||
expected_success_unauth_cnt = 0,
|
||||
expected_error_cnt = 0,
|
||||
);
|
||||
|
||||
test_authorizer_metric!(
|
||||
will_record_failure_if_rpc_fails,
|
||||
rpc_response = Err(Error::verification("test", "test error")),
|
||||
will_pass_auth = false,
|
||||
expected_success_cnt = 0,
|
||||
expected_success_unauth_cnt = 0,
|
||||
expected_error_cnt = 1,
|
||||
);
|
||||
|
||||
test_authorizer_metric!(
|
||||
will_record_success_if_rpc_pass_but_auth_fails,
|
||||
rpc_response = Err(Error::InvalidToken),
|
||||
will_pass_auth = false,
|
||||
expected_success_cnt = 0,
|
||||
expected_success_unauth_cnt = 1,
|
||||
expected_error_cnt = 0,
|
||||
);
|
||||
}
|
|
@ -24,6 +24,8 @@ mod authorizer;
|
|||
pub use authorizer::Authorizer;
|
||||
mod iox_authorizer;
|
||||
pub use iox_authorizer::{Error, IoxAuthorizer};
|
||||
mod instrumentation;
|
||||
pub use instrumentation::AuthorizerInstrumentation;
|
||||
mod permission;
|
||||
pub use permission::{Action, Permission, Resource};
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ use std::{
|
|||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use authz::{Authorizer, IoxAuthorizer};
|
||||
use authz::{Authorizer, AuthorizerInstrumentation, IoxAuthorizer};
|
||||
use clap_blocks::router::RouterConfig;
|
||||
use data_types::{DefaultPartitionTemplate, NamespaceName};
|
||||
use hashbrown::HashMap;
|
||||
|
@ -316,7 +316,9 @@ pub async fn create_router_server_type(
|
|||
) {
|
||||
(true, Some(addr)) => {
|
||||
let authz = IoxAuthorizer::connect_lazy(addr.clone())
|
||||
.map(|c| Arc::new(c) as Arc<dyn Authorizer>)
|
||||
.map(|c| {
|
||||
Arc::new(AuthorizerInstrumentation::new(&metrics, c)) as Arc<dyn Authorizer>
|
||||
})
|
||||
.map_err(|source| Error::AuthzConfig {
|
||||
source,
|
||||
addr: addr.clone(),
|
||||
|
|
|
@ -67,9 +67,11 @@ pub mod mock {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use assert_matches::assert_matches;
|
||||
use authz::AuthorizerInstrumentation;
|
||||
use base64::{prelude::BASE64_STANDARD, Engine};
|
||||
use data_types::NamespaceId;
|
||||
use hyper::header::HeaderValue;
|
||||
use metric::{Attributes, DurationHistogram, Metric};
|
||||
|
||||
use super::{mock::*, *};
|
||||
use crate::{
|
||||
|
@ -169,6 +171,78 @@ mod tests {
|
|||
})
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_authz_metric() {
|
||||
static NAMESPACE_NAME: &str = "test";
|
||||
let mock_namespace_resolver =
|
||||
MockNamespaceResolver::default().with_mapping(NAMESPACE_NAME, NamespaceId::new(42));
|
||||
let dml_handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
|
||||
|
||||
let metrics = Arc::new(metric::Registry::default());
|
||||
let decorator = Arc::new(AuthorizerInstrumentation::new(
|
||||
&metrics,
|
||||
MockAuthorizer::default(),
|
||||
));
|
||||
|
||||
let delegate = HttpDelegate::new(
|
||||
MAX_BYTES,
|
||||
1,
|
||||
mock_namespace_resolver,
|
||||
Arc::clone(&dml_handler),
|
||||
&metrics,
|
||||
Box::new(SingleTenantRequestUnifier::new(decorator)),
|
||||
);
|
||||
|
||||
let request = Request::builder()
|
||||
.uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
|
||||
.method("POST")
|
||||
.extension(AuthorizationHeaderExtension::new(Some(
|
||||
HeaderValue::from_str(format!("Token {MOCK_AUTH_VALID_TOKEN}").as_str()).unwrap(),
|
||||
)))
|
||||
.body(Body::from("platanos,tag1=A,tag2=B val=42i 123456"))
|
||||
.unwrap();
|
||||
let got = delegate.route(request).await;
|
||||
assert!(got.is_ok());
|
||||
|
||||
let request = Request::builder()
|
||||
.uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
|
||||
.method("POST")
|
||||
.extension(AuthorizationHeaderExtension::new(Some(
|
||||
HeaderValue::from_str(format!("Token {MOCK_AUTH_INVALID_TOKEN}").as_str()).unwrap(),
|
||||
)))
|
||||
.body(Body::from("platanos,tag1=A,tag2=B val=42i 123456"))
|
||||
.unwrap();
|
||||
let got = delegate.route(request).await;
|
||||
assert!(got.is_err());
|
||||
|
||||
let histogram = &metrics
|
||||
.get_instrument::<Metric<DurationHistogram>>("authz_permission_check_duration")
|
||||
.expect("failed to read metric");
|
||||
|
||||
assert_eq!(
|
||||
histogram
|
||||
.get_observer(&Attributes::from(&[
|
||||
("result", "success"),
|
||||
("auth_state", "authorised")
|
||||
]))
|
||||
.expect("failed to get observer")
|
||||
.fetch()
|
||||
.sample_count(),
|
||||
1
|
||||
);
|
||||
assert_eq!(
|
||||
histogram
|
||||
.get_observer(&Attributes::from(&[
|
||||
("result", "error"),
|
||||
("auth_state", "unauthorised")
|
||||
]))
|
||||
.expect("failed to get observer")
|
||||
.fetch()
|
||||
.sample_count(),
|
||||
1
|
||||
);
|
||||
}
|
||||
|
||||
macro_rules! test_authorize {
|
||||
(
|
||||
$name:ident,
|
||||
|
|
Loading…
Reference in New Issue