feat: porting token work from enterprise (#26239)

* feat: generate persistable admin token

- this commit allows admin token creation using `influxdb3 create token
  --admin` and regeneration of the admin token using `influxdb3 create token
  --admin --regenerate` (see the sketch below)
- the `influxdb3_authz` crate hosts all low-level token types and behaviour
- catalog log and snapshot types have been updated to use the token repo
- tests that relied on auth have been updated to use the new token
  generation mechanism, and new admin token generation/regeneration tests
  have been added
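
As a rough, hypothetical sketch (not part of this commit), the same admin
token creation can be driven through `influxdb3_client` directly. The method
and field names below come from the CLI code in this change; the `Option`
return value and the tokio setup are assumptions:

```rust
use influxdb3_client::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumed default host; the CLI defaults to http://127.0.0.1:8181.
    let client = Client::new("http://127.0.0.1:8181")?;

    // First-time creation; calling this again fails with
    // "token name already exists, _admin".
    if let Some(resp) = client.api_v3_configure_create_admin_token().await? {
        println!("Token: {}", resp.token);
        println!("Hashed Token: {}", resp.hash);
    }

    // Rotating the token later goes through the regenerate call instead:
    // client.api_v3_configure_regenerate_admin_token().await?;
    Ok(())
}
```

Only the SHA-512 hash is stored in the catalog, so the plain token has to be
captured from this response; it cannot be recovered later.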

* feat: list admin tokens

- allows listing admin tokens (see the query sketch below)
- uses the `_internal` db to host the token system table
- remaining changes are mostly test fixes caused by the new `_internal` db
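
A hypothetical usage sketch (not part of this commit): the new `influxdb3 show
tokens` subcommand just runs a SQL query against the `_internal` database, so
the same listing can be done through the client. The query string and method
come from the `show` command changes below; relying on the client's default
output format instead of the explicit `.format(...)` the CLI sets is an
assumption:

```rust
use influxdb3_client::Client;

async fn list_tokens(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
    // Same query the `influxdb3 show tokens` subcommand issues against the
    // token system table in the _internal database.
    let resp_bytes = client
        .api_v3_query_sql("_internal", "select * from system.tokens")
        .send()
        .await?;
    println!("{}", std::str::from_utf8(&resp_bytes)?);
    Ok(())
}
```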
praveen-influx 2025-04-09 16:31:59 +01:00 committed by GitHub
parent 11e4433852
commit 1983818e36
45 changed files with 2105 additions and 339 deletions

Cargo.lock generated
View File

@ -2759,6 +2759,7 @@ dependencies = [
"hex",
"humantime",
"hyper 0.14.32",
"influxdb3_authz",
"influxdb3_cache",
"influxdb3_catalog",
"influxdb3_clap_blocks",
@ -2809,6 +2810,23 @@ dependencies = [
"uuid",
]
[[package]]
name = "influxdb3_authz"
version = "3.0.0-beta.4"
dependencies = [
"async-trait",
"authz",
"hashbrown 0.15.2",
"influxdb3_id",
"iox_time",
"observability_deps",
"serde",
"sha2",
"test-log",
"thiserror 1.0.69",
"tokio",
]
[[package]]
name = "influxdb3_cache"
version = "3.0.0-beta.4"
@ -2850,6 +2868,7 @@ version = "3.0.0-beta.4"
dependencies = [
"anyhow",
"arrow",
"base64 0.21.7",
"bimap",
"bitcode",
"byteorder",
@ -2860,9 +2879,11 @@ dependencies = [
"cron",
"futures",
"hashbrown 0.15.2",
"hex",
"humantime",
"indexmap 2.7.0",
"influxdb-line-protocol",
"influxdb3_authz",
"influxdb3_id",
"influxdb3_shutdown",
"influxdb3_test_helpers",
@ -2873,10 +2894,12 @@ dependencies = [
"observability_deps",
"parking_lot",
"pretty_assertions",
"rand",
"schema",
"serde",
"serde_json",
"serde_with",
"sha2",
"test-log",
"test_helpers",
"thiserror 1.0.69",
@ -3099,6 +3122,7 @@ dependencies = [
"humantime",
"hyper 0.14.32",
"influxdb-line-protocol",
"influxdb3_authz",
"influxdb3_cache",
"influxdb3_catalog",
"influxdb3_client",
@ -3228,8 +3252,11 @@ name = "influxdb3_types"
version = "3.0.0-beta.4"
dependencies = [
"anyhow",
"chrono",
"hashbrown 0.15.2",
"hex",
"hyper 0.14.32",
"influxdb3_authz",
"influxdb3_cache",
"influxdb3_catalog",
"iox_http",

View File

@ -2,6 +2,7 @@
# In alphabetical order
members = [
"influxdb3",
"influxdb3_authz",
"influxdb3_cache",
"influxdb3_catalog",
"influxdb3_clap_blocks",

View File

@ -23,6 +23,7 @@ trace_exporters.workspace = true
trogging.workspace = true
# Local Crates
influxdb3_authz = { path = "../influxdb3_authz" }
influxdb3_cache = { path = "../influxdb3_cache" }
influxdb3_catalog = { path = "../influxdb3_catalog" }
influxdb3_client = { path = "../influxdb3_client" }

View File

@ -1,6 +1,6 @@
pub mod token;
use crate::commands::common::{DataType, InfluxDb3Config, SeparatedKeyValue, parse_key_val};
use base64::Engine as _;
use base64::engine::general_purpose::URL_SAFE_NO_PAD as B64;
use hashbrown::HashMap;
use humantime::Duration;
use influxdb3_catalog::log::ErrorBehavior;
@ -9,15 +9,14 @@ use influxdb3_catalog::log::TriggerSpecificationDefinition;
use influxdb3_client::Client;
use influxdb3_types::http::LastCacheSize;
use influxdb3_types::http::LastCacheTtl;
use rand::RngCore;
use rand::rngs::OsRng;
use secrecy::ExposeSecret;
use secrecy::Secret;
use sha2::Digest;
use sha2::Sha512;
use std::error::Error;
use std::num::NonZeroUsize;
use std::str;
use token::AdminTokenConfig;
use token::TokenCommands;
use token::handle_token_creation;
use url::Url;
#[derive(Debug, clap::Parser)]
@ -28,7 +27,7 @@ pub struct Config {
impl Config {
fn get_client(&self) -> Result<Client, Box<dyn Error>> {
match &self.cmd {
let (host_url, auth_token) = match &self.cmd {
SubCommand::Database(DatabaseConfig {
host_url,
auth_token,
@ -69,17 +68,24 @@ impl Config {
..
},
..
}) => {
let mut client = Client::new(host_url.clone())?;
if let Some(token) = &auth_token {
client = client.with_auth_token(token.expose_secret());
}
Ok(client)
}) => (host_url, auth_token),
SubCommand::Token(token_commands) => {
let (host_url, auth_token) = match &token_commands.commands {
token::TokenSubCommand::Admin(AdminTokenConfig {
host_url,
auth_token,
..
}) => (host_url, auth_token),
};
(host_url, auth_token)
}
// We don't need a client for this, so we're just creating a
// placeholder client with an unusable URL
SubCommand::Token => Ok(Client::new("http://recall.invalid")?),
};
let mut client = Client::new(host_url.clone())?;
if let Some(token) = &auth_token {
client = client.with_auth_token(token.expose_secret());
}
Ok(client)
}
}
@ -96,7 +102,7 @@ pub enum SubCommand {
/// Create a new table in a database
Table(TableConfig),
/// Create a new auth token
Token,
Token(TokenCommands),
/// Create a new trigger for the processing engine that executes a plugin on either WAL rows, scheduled tasks, or requests to the server at `/api/v3/engine/<path>`
Trigger(TriggerConfig),
}
@ -334,25 +340,27 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
&database_name, &table_name
);
}
SubCommand::Token => {
let token = {
let mut token = String::from("apiv3_");
let mut key = [0u8; 64];
OsRng.fill_bytes(&mut key);
token.push_str(&B64.encode(key));
token
};
println!(
"\
Token: {token}\n\
Hashed Token: {hashed}\n\n\
Start the server with the Hashed Token provided as the `--bearer-token` argument:\n\n
`influxdb3 serve --bearer-token {hashed} --node-id <NODE_ID> [OPTIONS]`\n\n\
HTTP requests require the following header: \"Authorization: Bearer {token}\"\n\
This will grant you access to every HTTP endpoint or deny it otherwise.
",
hashed = hex::encode(&Sha512::digest(&token)[..])
);
SubCommand::Token(token_commands) => {
match handle_token_creation(client, token_commands).await {
Ok(response) => {
println!(
"\
Token: {token}\n\
Hashed Token: {hashed}\n\n\
Start the server with the Hashed Token provided as the `--bearer-token` argument:\n\n
`influxdb3 serve --bearer-token {hashed} --node-id <NODE_ID> [OPTIONS]`\n\n\
HTTP requests require the following header: \"Authorization: Bearer {token}\"\n\
This will grant you access to every HTTP endpoint or deny it otherwise.
",
token = response.token,
hashed = response.hash,
);
}
Err(err) => {
println!("Failed to create token, error: {:?}", err);
}
}
}
SubCommand::Trigger(TriggerConfig {
influxdb3_config: InfluxDb3Config { database_name, .. },

View File

@ -0,0 +1,68 @@
use std::error::Error;
use clap::Parser;
use influxdb3_client::Client;
use influxdb3_types::http::CreateTokenWithPermissionsResponse;
use secrecy::Secret;
use url::Url;
#[derive(Debug, clap::Parser)]
pub struct TokenCommands {
#[clap(subcommand)]
pub commands: TokenSubCommand,
}
#[derive(Debug, clap::Subcommand)]
pub enum TokenSubCommand {
#[clap(name = "--admin")]
Admin(AdminTokenConfig),
}
#[derive(Debug, Parser)]
pub struct AdminTokenConfig {
/// The host URL of the running InfluxDB 3 Enterprise server
#[clap(
short = 'H',
long = "host",
env = "INFLUXDB3_HOST_URL",
default_value = "http://127.0.0.1:8181"
)]
pub host_url: Url,
/// Admin token will be regenerated when this is set
#[clap(long, default_value = "false")]
pub regenerate: bool,
/// The token for authentication with the InfluxDB 3 Enterprise server
#[clap(long = "token", env = "INFLUXDB3_AUTH_TOKEN")]
pub auth_token: Option<Secret<String>>,
}
pub(crate) async fn handle_token_creation(
client: Client,
config: TokenCommands,
) -> Result<CreateTokenWithPermissionsResponse, Box<dyn Error>> {
match config.commands {
TokenSubCommand::Admin(admin_token_config) => {
handle_admin_token_creation(client, admin_token_config).await
}
}
}
pub(crate) async fn handle_admin_token_creation(
client: Client,
config: AdminTokenConfig,
) -> Result<CreateTokenWithPermissionsResponse, Box<dyn Error>> {
let json_body = if config.regenerate {
client
.api_v3_configure_regenerate_admin_token()
.await?
.expect("token creation to return full token info")
} else {
client
.api_v3_configure_create_admin_token()
.await?
.expect("token creation to return full token info")
};
Ok(json_body)
}

View File

@ -3,6 +3,7 @@
use anyhow::{Context, bail};
use datafusion_util::config::register_iox_object_store;
use futures::{FutureExt, future::FusedFuture, pin_mut};
use influxdb3_authz::TokenAuthenticator;
use influxdb3_cache::{
distinct_cache::DistinctCacheProvider,
last_cache::{self, LastCacheProvider},
@ -28,7 +29,6 @@ use influxdb3_processing_engine::plugins::ProcessingEngineEnvironmentManager;
use influxdb3_processing_engine::virtualenv::find_python;
use influxdb3_server::{
CommonServerState,
auth::AllOrNothingAuthorizer,
builder::ServerBuilder,
query_executor::{CreateQueryExecutorArgs, QueryExecutorImpl},
serve,
@ -655,14 +655,19 @@ pub async fn command(config: Config) -> Result<()> {
.max_request_size(config.max_http_request_size)
.write_buffer(write_buffer)
.query_executor(query_executor)
.time_provider(time_provider)
.time_provider(Arc::clone(&time_provider) as _)
.persister(persister)
.tcp_listener(listener)
.processing_engine(processing_engine);
let server = if let Some(token) = config.bearer_token.map(hex::decode).transpose()? {
// We can ignore the token passed in for now, as the token is supposed to be in catalog
let server = if let Some(_token) = config.bearer_token {
let authentication_provider = Arc::new(TokenAuthenticator::new(
Arc::clone(&catalog) as _,
Arc::clone(&time_provider) as _,
));
builder
.authorizer(Arc::new(AllOrNothingAuthorizer::new(token)))
.authorizer(authentication_provider as _)
.build()
.await
} else {

View File

@ -19,10 +19,33 @@ pub enum SubCommand {
/// List databases
Databases(DatabaseConfig),
/// List tokens
Tokens(ShowTokensConfig),
/// Display system table data.
System(SystemConfig),
}
#[derive(Debug, Parser)]
pub struct ShowTokensConfig {
/// The host URL of the running InfluxDB 3 Enterprise server
#[clap(
short = 'H',
long = "host",
env = "INFLUXDB3_HOST_URL",
default_value = "http://127.0.0.1:8181"
)]
host_url: Url,
/// The token for authentication with the InfluxDB 3 Enterprise server
#[clap(long = "token", env = "INFLUXDB3_AUTH_TOKEN")]
auth_token: Option<Secret<String>>,
/// The format in which to output the list of tokens
#[clap(value_enum, long = "format", default_value = "pretty")]
output_format: Format,
}
#[derive(Debug, Parser)]
pub struct DatabaseConfig {
/// The host URL of the running InfluxDB 3 Core server
@ -71,6 +94,19 @@ pub(crate) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
println!("{}", std::str::from_utf8(&resp_bytes)?);
}
SubCommand::System(cfg) => system::command(cfg).await?,
SubCommand::Tokens(show_tokens_config) => {
let mut client = influxdb3_client::Client::new(show_tokens_config.host_url.clone())?;
if let Some(t) = show_tokens_config.auth_token {
client = client.with_auth_token(t.expose_secret());
}
let resp_bytes = client
.api_v3_query_sql("_internal", "select * from system.tokens")
.format(show_tokens_config.output_format.into())
.send()
.await?;
println!("{}", std::str::from_utf8(&resp_bytes)?);
}
}
Ok(())

View File

@ -1,19 +1,14 @@
mod api;
use crate::server::{ConfigProvider, TestServer};
use crate::server::{ConfigProvider, TestServer, parse_token};
use assert_cmd::Command as AssertCmd;
use assert_cmd::cargo::CommandCargoExt;
use observability_deps::tracing::debug;
use pretty_assertions::assert_eq;
use serde_json::{Value, json};
use std::fs::File;
use std::path::PathBuf;
use std::time::Duration;
use std::{
fs,
io::Write,
process::{Command, Stdio},
};
use std::{fs, io::Write};
use test_helpers::tempfile::NamedTempFile;
use test_helpers::tempfile::TempDir;
use test_helpers::{assert_contains, assert_not_contains};
@ -186,6 +181,7 @@ async fn test_show_databases() {
+---------------+\n\
| iox::database |\n\
+---------------+\n\
| _internal |\n\
| bar |\n\
| foo |\n\
+---------------+\
@ -196,7 +192,7 @@ async fn test_show_databases() {
// Show databases with JSON format
let output = server.show_databases().with_format("json").run().unwrap();
assert_eq!(
r#"[{"iox::database":"bar"},{"iox::database":"foo"}]"#,
r#"[{"iox::database":"_internal"},{"iox::database":"bar"},{"iox::database":"foo"}]"#,
output
);
@ -205,6 +201,7 @@ async fn test_show_databases() {
assert_eq!(
"\
iox::database\n\
_internal\n\
bar\n\
foo\
",
@ -215,6 +212,7 @@ async fn test_show_databases() {
let output = server.show_databases().with_format("jsonl").run().unwrap();
assert_eq!(
"\
{\"iox::database\":\"_internal\"}\n\
{\"iox::database\":\"bar\"}\n\
{\"iox::database\":\"foo\"}\
",
@ -231,6 +229,7 @@ async fn test_show_databases() {
+---------------+\n\
| iox::database |\n\
+---------------+\n\
| _internal |\n\
| bar |\n\
+---------------+",
output
@ -242,30 +241,6 @@ async fn test_show_databases() {
assert_contains!(output, "foo-");
}
#[test_log::test(tokio::test)]
async fn test_show_empty_database() {
let server = TestServer::spawn().await;
// Show empty database list with default format (pretty)
let output = server.show_databases().run().unwrap();
assert_eq!(
"\
+---------------+\n\
| iox::database |\n\
+---------------+\n\
+---------------+",
output
);
// Show empty database list with JSON format
let output = server.show_databases().with_format("json").run().unwrap();
assert_eq!(output, "[]");
// Show empty database list with JSONL format
let output = server.show_databases().with_format("jsonl").run().unwrap();
assert_eq!(output, "");
}
#[test_log::test(tokio::test)]
async fn test_create_database() {
let server = TestServer::spawn().await;
@ -740,6 +715,7 @@ async fn test_database_create_persists() {
r#"+---------------+
| iox::database |
+---------------+
| _internal |
| foo |
+---------------+"#,
result
@ -875,20 +851,6 @@ def process_writes(influxdb3_local, table_batches, args=None):
assert_contains!(&result, "Trigger test_trigger created successfully");
}
#[test]
fn test_create_token() {
let process = Command::cargo_bin("influxdb3")
.unwrap()
.args(["create", "token"])
.stdout(Stdio::piped())
.output()
.unwrap();
let result: String = String::from_utf8_lossy(&process.stdout).trim().into();
assert_contains!(
&result,
"This will grant you access to every HTTP endpoint or deny it otherwise"
);
}
#[test_log::test(tokio::test)]
async fn test_show_system() {
let server = TestServer::configure().spawn().await;
@ -1971,7 +1933,7 @@ def process_request(influxdb3_local, query_parameters, request_headers, request_
yield "Line 1\n"
yield "Line 2\n"
yield "Line 3\n"
return generate_content()
"#;
let (temp_dir, plugin_path) = create_plugin_in_temp_dir(plugin_code);
@ -2020,17 +1982,17 @@ def process_request(influxdb3_local, query_parameters, request_headers, request_
self.response = response
self.status_code = status
self.headers = headers or {}
def get_data(self):
return self.response
def __flask_response__(self):
return True
# Return a Flask Response object
response = FlaskResponse(
"Custom Flask Response",
status=202,
"Custom Flask Response",
status=202,
headers={"Content-Type": "text/custom", "X-Generated-By": "FlaskResponse"}
)
return response
@ -2902,3 +2864,79 @@ async fn test_wal_overwritten() {
);
assert!(!p2.is_stopped(), "p2 should not be stopped");
}
#[test_log::test(tokio::test)]
async fn test_create_admin_token() {
let server = TestServer::spawn().await;
let args = &[];
let result = server
.run(vec!["create", "token", "--admin"], args)
.unwrap();
println!("{:?}", result);
assert_contains!(
&result,
"This will grant you access to every HTTP endpoint or deny it otherwise"
);
}
#[test_log::test(tokio::test)]
async fn test_create_admin_token_allowed_once() {
let server = TestServer::spawn().await;
let args = &[];
let result = server
.run(vec!["create", "token", "--admin"], args)
.unwrap();
assert_contains!(
&result,
"This will grant you access to every HTTP endpoint or deny it otherwise"
);
let result = server
.run(vec!["create", "token", "--admin"], args)
.unwrap();
assert_contains!(
&result,
"Failed to create token, error: ApiError { code: 500, message: \"token name already exists, _admin\" }"
);
}
#[test_log::test(tokio::test)]
async fn test_regenerate_admin_token() {
// when created with_auth, TestServer spins up server and generates admin token.
let mut server = TestServer::configure().with_auth().spawn().await;
let args = &[];
let result = server
.run(vec!["create", "token", "--admin"], args)
.unwrap();
// already has admin token, so it cannot be created again
assert_contains!(
&result,
"Failed to create token, error: ApiError { code: 500, message: \"token name already exists, _admin\" }"
);
// regenerating token is allowed
let result = server
.run(vec!["create", "token", "--admin"], &["--regenerate"])
.unwrap();
assert_contains!(
&result,
"This will grant you access to every HTTP endpoint or deny it otherwise"
);
let old_token = server.token().expect("admin token to be present");
let new_token = parse_token(result);
assert!(old_token != &new_token);
// old token cannot access
let res = server
.create_database("sample_db")
.run()
.err()
.unwrap()
.to_string();
assert_contains!(&res, "401 Unauthorized");
// new token should allow
server.set_token(Some(new_token));
let res = server.create_database("sample_db").run().unwrap();
assert_contains!(&res, "Database \"sample_db\" created successfully");
}

View File

@ -6,14 +6,12 @@ use reqwest::StatusCode;
use crate::server::{ConfigProvider, TestServer, collect_stream};
#[tokio::test]
async fn auth() {
const HASHED_TOKEN: &str = "5315f0c4714537843face80cca8c18e27ce88e31e9be7a5232dc4dc8444f27c0227a9bd64831d3ab58f652bd0262dd8558dd08870ac9e5c650972ce9e4259439";
const TOKEN: &str = "apiv3_mp75KQAhbqv0GeQXk8MPuZ3ztaLEaR5JzS8iifk1FwuroSVyXXyrJK1c4gEr1kHkmbgzDV-j3MvQpaIMVJBAiA";
let server = TestServer::configure()
.with_auth_token(HASHED_TOKEN, TOKEN)
.spawn()
.await;
async fn auth_http() {
let server = TestServer::configure().with_auth().spawn().await;
let token = server
.auth_token
.clone()
.expect("admin token to have been present");
let client = reqwest::Client::new();
let base = server.client_addr();
@ -48,7 +46,7 @@ async fn auth() {
.post(&write_lp_url)
.query(&write_lp_params)
.body("cpu,host=a val=1i 2998574937")
.bearer_auth(TOKEN)
.bearer_auth(token.clone())
.send()
.await
.unwrap()
@ -61,7 +59,7 @@ async fn auth() {
.query(&write_lp_params)
.body("cpu,host=a val=1i 2998574937")
// support both Bearer and Token auth schemes
.header("Authorization", format!("Token {TOKEN}"))
.header("Authorization", format!("Token {token}"))
.send()
.await
.unwrap()
@ -72,7 +70,7 @@ async fn auth() {
client
.get(&query_sql_url)
.query(&query_sql_params)
.bearer_auth(TOKEN)
.bearer_auth(&token)
.send()
.await
.unwrap()
@ -85,7 +83,7 @@ async fn auth() {
client
.get(&query_sql_url)
.query(&query_sql_params)
.header("Authorization", format!("Bearer {TOKEN} whee"))
.header("Authorization", format!("Bearer {token} whee"))
.send()
.await
.unwrap()
@ -96,7 +94,7 @@ async fn auth() {
client
.get(&query_sql_url)
.query(&query_sql_params)
.header("Authorization", format!("bearer {TOKEN}"))
.header("Authorization", format!("bearer {token}"))
.send()
.await
.unwrap()
@ -118,7 +116,7 @@ async fn auth() {
client
.get(&query_sql_url)
.query(&query_sql_params)
.header("auth", format!("Bearer {TOKEN}"))
.header("auth", format!("Bearer {token}"))
.send()
.await
.unwrap()
@ -129,14 +127,11 @@ async fn auth() {
#[test_log::test(tokio::test)]
async fn auth_grpc() {
const HASHED_TOKEN: &str = "5315f0c4714537843face80cca8c18e27ce88e31e9be7a5232dc4dc8444f27c0227a9bd64831d3ab58f652bd0262dd8558dd08870ac9e5c650972ce9e4259439";
const TOKEN: &str = "apiv3_mp75KQAhbqv0GeQXk8MPuZ3ztaLEaR5JzS8iifk1FwuroSVyXXyrJK1c4gEr1kHkmbgzDV-j3MvQpaIMVJBAiA";
let server = TestServer::configure()
.with_auth_token(HASHED_TOKEN, TOKEN)
.spawn()
.await;
let server = TestServer::configure().with_auth().spawn().await;
let token = server
.auth_token
.clone()
.expect("admin token to have been present");
// Write some data to the server, this will be authorized through the HTTP API
server
.write_lp_to_db(
@ -156,7 +151,7 @@ async fn auth_grpc() {
// Set the authorization header on the client:
client
.add_header(header, &format!("Bearer {TOKEN}"))
.add_header(header, &format!("Bearer {token}"))
.unwrap();
// Make the query again, this time it should work:
@ -194,7 +189,7 @@ async fn auth_grpc() {
{
let mut client = server.flight_sql_client("foo").await;
client
.add_header("authorization", &format!("bearer {TOKEN}"))
.add_header("authorization", &format!("bearer {token}"))
.unwrap();
let error = client.query("SELECT * FROM cpu").await.unwrap_err();
assert!(matches!(error, FlightError::Tonic(s) if s.code() == tonic::Code::Unauthenticated));
@ -207,16 +202,14 @@ async fn auth_grpc() {
.add_header("authorization", "Bearer invalid-token")
.unwrap();
let error = client.query("SELECT * FROM cpu").await.unwrap_err();
assert!(
matches!(error, FlightError::Tonic(s) if s.code() == tonic::Code::PermissionDenied)
);
assert!(matches!(error, FlightError::Tonic(s) if s.code() == tonic::Code::Unauthenticated));
}
// Misspelled header key
{
let mut client = server.flight_sql_client("foo").await;
client
.add_header("auth", &format!("Bearer {TOKEN}"))
.add_header("auth", &format!("Bearer {token}"))
.unwrap();
let error = client.query("SELECT * FROM cpu").await.unwrap_err();
assert!(matches!(error, FlightError::Tonic(s) if s.code() == tonic::Code::Unauthenticated));
@ -225,13 +218,11 @@ async fn auth_grpc() {
#[tokio::test]
async fn v1_password_parameter() {
const HASHED_TOKEN: &str = "5315f0c4714537843face80cca8c18e27ce88e31e9be7a5232dc4dc8444f27c0227a9bd64831d3ab58f652bd0262dd8558dd08870ac9e5c650972ce9e4259439";
const TOKEN: &str = "apiv3_mp75KQAhbqv0GeQXk8MPuZ3ztaLEaR5JzS8iifk1FwuroSVyXXyrJK1c4gEr1kHkmbgzDV-j3MvQpaIMVJBAiA";
let server = TestServer::configure()
.with_auth_token(HASHED_TOKEN, TOKEN)
.spawn()
.await;
let server = TestServer::configure().with_auth().spawn().await;
let token = server
.auth_token
.clone()
.expect("admin token to have been present");
let client = reqwest::Client::new();
let query_url = format!("{base}/query", base = server.client_addr());
@ -288,7 +279,11 @@ async fn v1_password_parameter() {
assert_eq!(
client
.get(&query_url)
.query(&[("p", TOKEN), ("q", "SELECT * FROM cpu"), ("db", "foo")])
.query(&[
("p", token.as_str()),
("q", "SELECT * FROM cpu"),
("db", "foo")
])
.send()
.await
.expect("send request")
@ -300,7 +295,7 @@ async fn v1_password_parameter() {
client
.get(&query_url)
.query(&[("q", "SELECT * FROM cpu"), ("db", "foo")])
.bearer_auth(TOKEN)
.bearer_auth(&token)
.send()
.await
.expect("send request")
@ -314,7 +309,7 @@ async fn v1_password_parameter() {
assert_eq!(
client
.post(&write_url)
.query(&[("p", TOKEN), ("db", "foo")])
.query(&[("p", token.as_str()), ("db", "foo")])
.body(valid_write_body)
.send()
.await
@ -326,7 +321,7 @@ async fn v1_password_parameter() {
assert_eq!(
client
.post(&write_url)
.bearer_auth(TOKEN)
.bearer_auth(&token)
.query(&[("db", "foo")])
.body(valid_write_body)
.send()

View File

@ -782,9 +782,8 @@ async fn api_v3_configure_db_delete() {
.json::<Value>()
.await
.unwrap();
debug!(result = ?result, ">> RESULT");
assert_eq!(
json!([{ "deleted": false, "iox::database": "foo" } ]),
json!([{ "deleted": false, "iox::database": "_internal" }, { "deleted": false, "iox::database": "foo" } ]),
result
);
@ -802,9 +801,8 @@ async fn api_v3_configure_db_delete() {
.json::<Value>()
.await
.unwrap();
debug!(result = ?result, ">> RESULT");
let array_result = result.as_array().unwrap();
assert_eq!(1, array_result.len());
assert_eq!(2, array_result.len());
let first_db = array_result.first().unwrap();
assert_contains!(
first_db
@ -814,6 +812,17 @@ async fn api_v3_configure_db_delete() {
.unwrap()
.as_str()
.unwrap(),
"_internal"
);
let second_db = array_result.get(1).unwrap();
assert_contains!(
second_db
.as_object()
.unwrap()
.get("iox::database")
.unwrap()
.as_str()
.unwrap(),
"foo-"
);
@ -832,15 +841,25 @@ async fn api_v3_configure_db_delete() {
.json::<Value>()
.await
.unwrap();
debug!(result = ?result, ">> RESULT");
let array_result = result.as_array().unwrap();
// check there are 2 dbs now, foo and foo-*
assert_eq!(2, array_result.len());
assert_eq!(3, array_result.len());
let first_db = array_result.first().unwrap();
let second_db = array_result.get(1).unwrap();
let third_db = array_result.get(2).unwrap();
assert_contains!(
first_db
.as_object()
.unwrap()
.get("iox::database")
.unwrap()
.as_str()
.unwrap(),
"_internal"
);
assert_eq!(
"foo",
first_db
second_db
.as_object()
.unwrap()
.get("iox::database")
@ -849,7 +868,7 @@ async fn api_v3_configure_db_delete() {
.unwrap(),
);
assert_contains!(
second_db
third_db
.as_object()
.unwrap()
.get("iox::database")

View File

@ -33,6 +33,9 @@ pub trait ConfigProvider {
/// Get the auth token from this config if it was set
fn auth_token(&self) -> Option<&str>;
/// Get if auth is enabled
fn auth_enabled(&self) -> bool;
/// Spawn a new [`TestServer`] with this configuration
///
/// This will run the `influxdb3 serve` command and bind its HTTP address to a random port
@ -49,6 +52,7 @@ pub trait ConfigProvider {
#[derive(Debug, Default)]
pub struct TestConfig {
auth_token: Option<(String, String)>,
auth: bool,
node_id: Option<String>,
plugin_dir: Option<String>,
virtual_env_dir: Option<String>,
@ -68,6 +72,12 @@ impl TestConfig {
self
}
/// Enable auth on the spawned [`TestServer`]
pub fn with_auth(mut self) -> Self {
self.auth = true;
self
}
/// Set a host identifier prefix on the spawned [`TestServer`]
pub fn with_node_id<S: Into<String>>(mut self, node_id: S) -> Self {
self.node_id = Some(node_id.into());
@ -102,8 +112,13 @@ impl ConfigProvider for TestConfig {
fn as_args(&self) -> Vec<String> {
let mut args = vec![];
if let Some((token, _)) = &self.auth_token {
// TODO: --bearer-token will be deprecated soon
args.append(&mut vec!["--bearer-token".to_string(), token.to_owned()]);
}
if self.auth {
// TODO: --bearer-token will be deprecated soon
args.append(&mut vec!["--bearer-token".to_string(), "foo".to_string()]);
}
if let Some(plugin_dir) = &self.plugin_dir {
args.append(&mut vec!["--plugin-dir".to_string(), plugin_dir.to_owned()]);
}
@ -144,6 +159,10 @@ impl ConfigProvider for TestConfig {
fn auth_token(&self) -> Option<&str> {
self.auth_token.as_ref().map(|(_, t)| t.as_str())
}
fn auth_enabled(&self) -> bool {
self.auth
}
}
/// A running instance of the `influxdb3 serve` process
@ -239,6 +258,17 @@ impl TestServer {
};
server.wait_until_ready().await;
let (mut server, token) = if config.auth_enabled() {
let result = server.run(vec!["create", "token", "--admin"], &[]).unwrap();
let token = parse_token(result);
(server, Some(token))
} else {
(server, None)
};
server.auth_token = token;
server
}
@ -252,6 +282,11 @@ impl TestServer {
self.auth_token.as_ref()
}
/// Set the token for the server
pub fn set_token(&mut self, token: Option<String>) {
self.auth_token = token;
}
/// Get a [`FlightSqlClient`] for making requests to the running service over gRPC
pub async fn flight_sql_client(&self, database: &str) -> FlightSqlClient {
let channel = tonic::transport::Channel::from_shared(self.client_addr())
@ -486,6 +521,15 @@ pub async fn write_lp_to_db(
.await
}
pub fn parse_token(result: String) -> String {
let all_lines: Vec<&str> = result.split('\n').collect();
let token = all_lines
.first()
.expect("token line to be present")
.replace("Token: ", "");
token
}
#[allow(dead_code)]
pub async fn collect_stream(stream: FlightRecordBatchStream) -> Vec<RecordBatch> {
stream

View File

@ -356,6 +356,7 @@ async fn api_v3_query_influxql() {
+---------------+---------+\n\
| iox::database | deleted |\n\
+---------------+---------+\n\
| _internal | false |\n\
| bar | false |\n\
| foo | false |\n\
+---------------+---------+",
@ -366,6 +367,7 @@ async fn api_v3_query_influxql() {
expected: "+---------------+---------+----------+\n\
| iox::database | name | duration |\n\
+---------------+---------+----------+\n\
| _internal | autogen | |\n\
| bar | autogen | |\n\
| foo | autogen | |\n\
+---------------+---------+----------+",
@ -665,6 +667,10 @@ async fn api_v3_query_json_format() {
database: None,
query: "SHOW DATABASES",
expected: json!([
{
"deleted": false,
"iox::database": "_internal",
},
{
"deleted": false,
"iox::database": "foo",
@ -675,6 +681,10 @@ async fn api_v3_query_json_format() {
database: None,
query: "SHOW RETENTION POLICIES",
expected: json!([
{
"iox::database": "_internal",
"name": "autogen",
},
{
"iox::database": "foo",
"name": "autogen",
@ -771,12 +781,16 @@ async fn api_v3_query_jsonl_format() {
TestCase {
database: None,
query: "SHOW DATABASES",
expected: "{\"iox::database\":\"foo\",\"deleted\":false}\n".into(),
expected:
"{\"iox::database\":\"_internal\",\"deleted\":false}\n\
{\"iox::database\":\"foo\",\"deleted\":false}\n".into(),
},
TestCase {
database: None,
query: "SHOW RETENTION POLICIES",
expected: "{\"iox::database\":\"foo\",\"name\":\"autogen\"}\n".into(),
expected:
"{\"iox::database\":\"_internal\",\"name\":\"autogen\"}\n\
{\"iox::database\":\"foo\",\"name\":\"autogen\"}\n".into(),
},
];
for t in test_cases {

View File

@ -0,0 +1,26 @@
[package]
name = "influxdb3_authz"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
# core crates
authz.workspace = true
observability_deps.workspace = true
iox_time.workspace = true
# local deps
influxdb3_id = { path = "../influxdb3_id" }
# crates.io deps
async-trait.workspace = true
hashbrown.workspace = true
serde.workspace = true
sha2.workspace = true
thiserror.workspace = true
tokio.workspace = true
[dev-dependencies]
test-log.workspace = true

influxdb3_authz/src/lib.rs
View File

@ -0,0 +1,429 @@
use std::sync::Arc;
use async_trait::async_trait;
use authz::{Authorizer as IoxAuthorizer, Error as IoxError, Permission as IoxPermission};
use influxdb3_id::{DbId, TokenId};
use iox_time::{Time, TimeProvider};
use observability_deps::tracing::{debug, trace};
use serde::Serialize;
use sha2::{Digest, Sha512};
use std::fmt::Debug;
use thiserror::Error;
#[derive(Debug, Copy, Clone, Serialize)]
pub struct DatabaseActions(u16);
impl From<u16> for DatabaseActions {
fn from(value: u16) -> Self {
DatabaseActions(value)
}
}
#[derive(Debug, Copy, Clone, Serialize)]
pub struct CrudActions(u16);
impl From<u16> for CrudActions {
fn from(value: u16) -> Self {
CrudActions(value)
}
}
pub enum AccessRequest {
Database(DbId, DatabaseActions),
Token(TokenId, CrudActions),
Admin,
}
#[derive(Debug, Error)]
pub enum ResourceAuthorizationError {
#[error("unauthorized to perform requested action with the token")]
Unauthorized,
}
#[derive(Debug, Error)]
pub enum AuthenticatorError {
/// Error for token that is present in the request but missing in the catalog
#[error("token provided is not present in catalog")]
InvalidToken,
/// Error for token that has expired
#[error("token has expired {0}")]
ExpiredToken(String),
/// Error for missing token (this should really be handled at the HTTP/Grpc API layer itself)
#[error("missing token to authenticate")]
MissingToken,
}
impl From<AuthenticatorError> for IoxError {
fn from(err: AuthenticatorError) -> Self {
match err {
AuthenticatorError::InvalidToken => IoxError::NoToken,
AuthenticatorError::ExpiredToken(token_expiry_time) => {
// there is no mapping to let the caller know about expired token in iox so
// we just log it for now (only useful in debugging)
debug!(?token_expiry_time, "supplied token has expired");
IoxError::InvalidToken
}
AuthenticatorError::MissingToken => IoxError::NoToken,
}
}
}
#[async_trait]
pub trait AuthProvider: Debug + Send + Sync + 'static {
/// Authenticate the request; at this point the token may be missing
async fn authenticate(
&self,
unhashed_token: Option<Vec<u8>>,
) -> Result<TokenId, AuthenticatorError>;
/// Authorize an action for an already authenticated request
async fn authorize_action(
&self,
// you must have a token if you're trying to authorize request
token_id: &TokenId,
access_request: AccessRequest,
) -> Result<(), ResourceAuthorizationError>;
// When authenticating we fetch the token_id, add it to the http request extensions and
// look it up later. This extra indicator allows checking whether a missing token in the
// extensions is an authentication issue or not; otherwise we would need to pass a flag
// indicating whether the server was started with the auth flag or not
fn should_check_token(&self) -> bool;
/// this is only needed so that the grpc service can get hold of an `Arc<dyn IoxAuthorizer>`
fn upcast(&self) -> Arc<dyn IoxAuthorizer>;
}
pub trait TokenProvider: Send + Debug + Sync + 'static {
fn get_token(&self, token_hash: Vec<u8>) -> Option<Arc<TokenInfo>>;
}
#[derive(Clone, Debug)]
pub struct TokenAuthenticator {
token_provider: Arc<dyn TokenProvider>,
time_provider: Arc<dyn TimeProvider>,
}
impl TokenAuthenticator {
pub fn new(
token_provider: Arc<dyn TokenProvider>,
time_provider: Arc<dyn TimeProvider>,
) -> Self {
Self {
token_provider,
time_provider,
}
}
}
#[async_trait]
impl AuthProvider for TokenAuthenticator {
async fn authenticate(
&self,
unhashed_token: Option<Vec<u8>>,
) -> Result<TokenId, AuthenticatorError> {
let provided = unhashed_token
.as_deref()
.ok_or(AuthenticatorError::MissingToken)?;
let hashed_token = Sha512::digest(provided);
if let Some(token) = self.token_provider.get_token(hashed_token.to_vec()) {
let expiry_ms = token.expiry_millis();
let current_timestamp_ms = self.time_provider.now().timestamp_millis();
debug!(?expiry_ms, ?current_timestamp_ms, "time comparison");
let is_active = expiry_ms > current_timestamp_ms;
if is_active {
return Ok(token.id);
} else {
trace!(token_expiry = ?expiry_ms, "token has expired");
let formatted = Time::from_timestamp_millis(expiry_ms)
.map(|time| time.to_rfc3339())
// this should be an error but we're within an error already,
// also it's only useful for debugging this error so a string
// default is ok here.
.unwrap_or("CannotBuildTime".to_owned());
return Err(AuthenticatorError::ExpiredToken(formatted));
}
};
Err(AuthenticatorError::InvalidToken)
}
async fn authorize_action(
&self,
_token_id: &TokenId,
_access_request: AccessRequest,
) -> Result<(), ResourceAuthorizationError> {
// this is a no-op in authenticator
Ok(())
}
fn should_check_token(&self) -> bool {
true
}
fn upcast(&self) -> Arc<dyn IoxAuthorizer> {
let cloned_self = (*self).clone();
Arc::new(cloned_self) as _
}
}
#[async_trait]
impl IoxAuthorizer for TokenAuthenticator {
async fn permissions(
&self,
token: Option<Vec<u8>>,
perms: &[IoxPermission],
) -> Result<Vec<IoxPermission>, IoxError> {
match self.authenticate(token).await {
Ok(_) => {
return Ok(perms.to_vec());
}
Err(err) => {
let iox_err = err.into();
return Err(iox_err);
}
}
}
}
#[derive(Clone, Copy, Debug)]
pub struct NoAuthAuthenticator;
#[async_trait]
impl IoxAuthorizer for NoAuthAuthenticator {
async fn permissions(
&self,
_token: Option<Vec<u8>>,
perms: &[IoxPermission],
) -> Result<Vec<IoxPermission>, IoxError> {
Ok(perms.to_vec())
}
async fn probe(&self) -> Result<(), IoxError> {
Ok(())
}
}
#[async_trait]
impl AuthProvider for NoAuthAuthenticator {
async fn authenticate(
&self,
_unhashed_token: Option<Vec<u8>>,
) -> Result<TokenId, AuthenticatorError> {
Ok(TokenId::from(0))
}
async fn authorize_action(
&self,
_token_id: &TokenId,
_access_request: AccessRequest,
) -> Result<(), ResourceAuthorizationError> {
Ok(())
}
fn should_check_token(&self) -> bool {
false
}
fn upcast(&self) -> Arc<dyn IoxAuthorizer> {
Arc::new(*self) as _
}
}
#[derive(Debug, Clone)]
pub struct TokenInfo {
pub id: TokenId,
pub name: Arc<str>,
pub hash: Vec<u8>,
pub description: Option<String>,
pub created_by: Option<TokenId>,
pub created_at: i64,
pub updated_at: Option<i64>,
pub updated_by: Option<TokenId>,
pub expiry_millis: i64,
// should be used only in enterprise
pub permissions: Vec<Permission>,
}
impl TokenInfo {
pub fn new(
id: TokenId,
name: Arc<str>,
hash: Vec<u8>,
created_at: i64,
expiry_millis: Option<i64>,
) -> Self {
Self {
id,
name,
hash,
created_at,
expiry_millis: expiry_millis.unwrap_or(i64::MAX),
description: None,
created_by: None,
updated_at: None,
updated_by: None,
permissions: Default::default(),
}
}
pub fn expiry_millis(&self) -> i64 {
self.expiry_millis
}
pub fn maybe_expiry_millis(&self) -> Option<i64> {
if self.expiry_millis == i64::MAX {
return None;
}
Some(self.expiry_millis)
}
// enterprise only
pub fn set_permissions(&mut self, all_permissions: Vec<Permission>) {
self.permissions = all_permissions;
}
}
// common types
/// This permission should map exactly to any of these variations
/// --permission "db:db1,db2:read"
/// --permission "db:*:read"
/// --permission "db:db1:read,write"
/// --permission "db:*:read,write"
/// --permission "*:*:read,write" (not sure if this is useful?)
/// --permission "*:*:*" (this will be admin)
///
/// These permissions are loosely translated to ResourceType, ResourceIdentifier and Actions. There
/// are more restrictive ways to handle this that disallow certain combinations using the type
/// system itself, but the wildcards at each level make it harder to model everything within a
/// single enum, for example. So, once we add a few more resources this should be fairly
/// straightforward to refine so that certain combinations become impossible to set
#[derive(Debug, Clone, Serialize)]
pub struct Permission {
pub resource_type: ResourceType,
pub resource_identifier: ResourceIdentifier,
pub actions: Actions,
}
#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq, Serialize)]
pub enum ResourceType {
Database,
Token,
Wildcard,
}
#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize)]
pub enum ResourceIdentifier {
Database(Vec<DbId>),
Token(Vec<TokenId>),
Wildcard,
}
#[derive(Debug, Copy, Clone, Serialize)]
pub enum Actions {
Database(DatabaseActions),
Token(CrudActions),
Wildcard,
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use influxdb3_id::TokenId;
use iox_time::{MockProvider, Time};
use sha2::Digest;
use crate::{AuthProvider, AuthenticatorError, TokenAuthenticator, TokenInfo, TokenProvider};
#[derive(Debug)]
struct MockTokenProvider {
hashed_token: Vec<u8>,
expired: bool,
}
impl MockTokenProvider {
pub fn new(token: &str, expired: bool) -> Self {
let hash = sha2::Sha512::digest(token);
Self {
hashed_token: hash.to_vec(),
expired,
}
}
}
impl TokenProvider for MockTokenProvider {
fn get_token(&self, token_hash: Vec<u8>) -> Option<std::sync::Arc<crate::TokenInfo>> {
if token_hash == self.hashed_token {
if self.expired {
Some(Arc::new(TokenInfo::new(
TokenId::from(0),
"admin-token".into(),
self.hashed_token.clone(),
1000,
Some(1743320379000),
)))
} else {
Some(Arc::new(TokenInfo::new(
TokenId::from(0),
"admin-token".into(),
self.hashed_token.clone(),
1000,
Some(i64::MAX),
)))
}
} else {
None
}
}
}
#[test_log::test(tokio::test)]
async fn test_authenticator_success() {
let time_provider = MockProvider::new(Time::from_timestamp_nanos(0));
let token = "sample-token";
let token_provider = MockTokenProvider::new(token, false);
let authenticator =
TokenAuthenticator::new(Arc::new(token_provider), Arc::new(time_provider));
let token_id = authenticator
.authenticate(Some(token.as_bytes().to_vec()))
.await
.expect("to get token id after successful auth");
assert_eq!(TokenId::from(0), token_id);
}
#[test_log::test(tokio::test)]
async fn test_authenticator_missing_token() {
let time_provider = MockProvider::new(Time::from_timestamp_nanos(0));
let token = "sample-token";
let token_provider = MockTokenProvider::new(token, false);
let authenticator =
TokenAuthenticator::new(Arc::new(token_provider), Arc::new(time_provider));
let result = authenticator
.authenticate(Some("not-matching-token".as_bytes().to_vec()))
.await;
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
AuthenticatorError::InvalidToken
));
}
#[test_log::test(tokio::test)]
async fn test_authenticator_expired_token() {
let time_provider = MockProvider::new(Time::from_timestamp_millis(1743420379000).unwrap());
let token = "sample-token";
let token_provider = MockTokenProvider::new(token, true);
let authenticator =
TokenAuthenticator::new(Arc::new(token_provider), Arc::new(time_provider));
let result = authenticator
.authenticate(Some("sample-token".as_bytes().to_vec()))
.await;
assert!(result.is_err());
if let AuthenticatorError::ExpiredToken(expiry_time_str) = result.unwrap_err() {
assert_eq!("2025-03-30T07:39:39+00:00", expiry_time_str);
} else {
panic!("not the right type of authentication error");
}
}
}

View File

@ -13,6 +13,22 @@ expression: catalog.snapshot()
0,
{
"id": 0,
"name": "_internal",
"tables": {
"repo": [],
"next_id": 0
},
"processing_engine_triggers": {
"repo": [],
"next_id": 0
},
"deleted": false
}
],
[
1,
{
"id": 1,
"name": "test_db",
"tables": {
"repo": [
@ -258,9 +274,13 @@ expression: catalog.snapshot()
}
]
],
"next_id": 1
"next_id": 2
},
"sequence": 7,
"tokens": {
"repo": [],
"next_id": 0
},
"sequence": 6,
"catalog_id": "sample-host-id",
"catalog_uuid": "[uuid]"
}

View File

@ -13,6 +13,7 @@ schema = { workspace = true }
iox_time.workspace = true
# Local deps
influxdb3_authz = { path = "../influxdb3_authz/" }
influxdb3_id = { path = "../influxdb3_id" }
influxdb3_shutdown = { path = "../influxdb3_shutdown" }
influxdb3_wal = { path = "../influxdb3_wal" }
@ -20,6 +21,7 @@ influxdb3_wal = { path = "../influxdb3_wal" }
# crates.io dependencies
anyhow.workspace = true
arrow.workspace = true
base64.workspace = true
bimap.workspace = true
bitcode.workspace = true
byteorder.workspace = true
@ -30,13 +32,16 @@ crc32fast.workspace = true
cron.workspace = true
futures.workspace = true
hashbrown.workspace = true
hex.workspace = true
humantime.workspace = true
indexmap.workspace = true
object_store.workspace = true
parking_lot.workspace = true
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
sha2.workspace = true
thiserror.workspace = true
tokio.workspace = true
uuid.workspace = true

View File

@ -1,8 +1,16 @@
//! Implementation of the Catalog that sits entirely in memory.
use base64::Engine as _;
use base64::engine::general_purpose::URL_SAFE_NO_PAD as B64;
use bimap::BiHashMap;
use influxdb3_authz::Actions;
use influxdb3_authz::Permission;
use influxdb3_authz::ResourceIdentifier;
use influxdb3_authz::ResourceType;
use influxdb3_authz::TokenInfo;
use influxdb3_authz::TokenProvider;
use influxdb3_id::{
CatalogId, ColumnId, DbId, DistinctCacheId, LastCacheId, NodeId, SerdeVecMap, TableId,
CatalogId, ColumnId, DbId, DistinctCacheId, LastCacheId, NodeId, SerdeVecMap, TableId, TokenId,
TriggerId,
};
use influxdb3_shutdown::ShutdownToken;
@ -10,8 +18,12 @@ use iox_time::{Time, TimeProvider};
use object_store::ObjectStore;
use observability_deps::tracing::{debug, error, info, trace, warn};
use parking_lot::RwLock;
use rand::RngCore;
use rand::rngs::OsRng;
use schema::{Schema, SchemaBuilder};
use serde::{Deserialize, Serialize};
use sha2::Digest;
use sha2::Sha512;
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::BTreeMap;
@ -25,9 +37,10 @@ pub use schema::{InfluxColumnType, InfluxFieldType};
pub use update::{CatalogUpdate, DatabaseCatalogTransaction, Prompt};
use crate::channel::{CatalogSubscriptions, CatalogUpdateReceiver};
use crate::log::CreateAdminTokenDetails;
use crate::log::{
CreateDatabaseLog, DatabaseBatch, DatabaseCatalogOp, NodeBatch, NodeCatalogOp, NodeMode,
RegisterNodeLog, StopNodeLog,
RegenerateAdminTokenDetails, RegisterNodeLog, StopNodeLog, TokenBatch, TokenCatalogOp,
};
use crate::object_store::ObjectStoreCatalog;
use crate::resource::CatalogResource;
@ -46,6 +59,10 @@ const SOFT_DELETION_TIME_FORMAT: &str = "%Y%m%dT%H%M%S";
pub const TIME_COLUMN_NAME: &str = "time";
pub const INTERNAL_DB_NAME: &str = "_internal";
const DEFAULT_ADMIN_TOKEN_NAME: &str = "_admin";
/// The sequence number of a batch of WAL operations.
#[derive(
Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
@ -118,7 +135,7 @@ impl Catalog {
let store =
ObjectStoreCatalog::new(Arc::clone(&node_id), CATALOG_CHECKPOINT_INTERVAL, store);
let subscriptions = Default::default();
store
let mut catalog = store
.load_or_create_catalog()
.await
.map_err(Into::into)
@ -128,7 +145,10 @@ impl Catalog {
time_provider,
store,
inner,
})
});
create_internal_db(&mut catalog).await;
catalog
}
pub async fn new_with_shutdown(
@ -351,6 +371,119 @@ impl Catalog {
.collect();
result
}
pub fn get_tokens(&self) -> Vec<Arc<TokenInfo>> {
self.inner
.read()
.tokens
.repo()
.iter()
.map(|(_, token_info)| Arc::clone(token_info))
.collect()
}
pub async fn create_admin_token(&self, regenerate: bool) -> Result<(Arc<TokenInfo>, String)> {
// if regenerating and the token is already present, create a new token and hash and
// update the existing entry; otherwise insert into the catalog (essentially an upsert)
let (token, hash) = create_token_and_hash();
self.catalog_update_with_retry(|| {
if regenerate {
let default_admin_token = self
.inner
.read()
.tokens
.repo()
.get_by_name(DEFAULT_ADMIN_TOKEN_NAME);
if default_admin_token.is_none() {
return Err(CatalogError::MissingAdminTokenToUpdate);
}
// now just update the hash and updated at
Ok(CatalogBatch::Token(TokenBatch {
time_ns: self.time_provider.now().timestamp_nanos(),
ops: vec![TokenCatalogOp::RegenerateAdminToken(
RegenerateAdminTokenDetails {
token_id: default_admin_token.unwrap().as_ref().id,
hash: hash.clone(),
updated_at: self.time_provider.now().timestamp_millis(),
},
)],
}))
} else {
// validate name
if self
.inner
.read()
.tokens
.repo()
.contains_name(DEFAULT_ADMIN_TOKEN_NAME)
{
return Err(CatalogError::TokenNameAlreadyExists(
DEFAULT_ADMIN_TOKEN_NAME.to_owned(),
));
}
let (token_id, created_at, expiry) = {
let mut inner = self.inner.write();
let token_id = inner.tokens.get_and_increment_next_id();
let created_at = self.time_provider.now();
let expiry = None;
(token_id, created_at.timestamp_millis(), expiry)
};
Ok(CatalogBatch::Token(TokenBatch {
time_ns: created_at,
ops: vec![TokenCatalogOp::CreateAdminToken(CreateAdminTokenDetails {
token_id,
name: Arc::from(DEFAULT_ADMIN_TOKEN_NAME),
hash: hash.clone(),
created_at,
updated_at: None,
expiry,
})],
}))
}
})
.await?;
let token_info = {
self.inner
.read()
.tokens
.repo()
.get_by_name(DEFAULT_ADMIN_TOKEN_NAME)
.expect("token info must be present after token creation by name")
};
// we need to pass these details back, especially this token as this is what user should
// send in subsequent requests
Ok((token_info, token))
}
}
async fn create_internal_db(catalog: &mut std::result::Result<Catalog, CatalogError>) {
// if catalog is initialised, create internal db
if let Ok(catalog) = catalog.as_mut() {
let result = catalog.create_database(INTERNAL_DB_NAME).await;
// what is the best outcome if "_internal" cannot be created?
match result {
Ok(_) => info!("created internal database"),
Err(err) => {
match err {
CatalogError::AlreadyExists => {
// this is probably ok
debug!("not creating internal db as it exists already");
}
_ => {
// all other errors are unexpected state
error!(?err, "unexpected error when creating internal db");
panic!("cannot create internal db");
}
}
}
};
}
}
impl Catalog {
@ -392,6 +525,12 @@ impl Catalog {
}
}
impl TokenProvider for Catalog {
fn get_token(&self, token_hash: Vec<u8>) -> Option<Arc<TokenInfo>> {
self.inner.read().tokens.hash_to_info(token_hash)
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Repository<I: CatalogId, R: CatalogResource> {
pub(crate) repo: SerdeVecMap<I, Arc<R>>,
@ -545,6 +684,9 @@ pub struct InnerCatalog {
pub(crate) nodes: Repository<NodeId, NodeDefinition>,
/// The catalog is a map of databases with their table schemas
pub(crate) databases: Repository<DbId, DatabaseSchema>,
/// This holds all the tokens created; it is persisted in the catalog snapshot
pub(crate) tokens: TokenRepository,
}
impl InnerCatalog {
@ -555,6 +697,7 @@ impl InnerCatalog {
catalog_uuid,
nodes: Repository::default(),
databases: Repository::default(),
tokens: TokenRepository::default(),
}
}
@ -563,7 +706,11 @@ impl InnerCatalog {
}
pub fn database_count(&self) -> usize {
self.databases.iter().filter(|db| !db.1.deleted).count()
self.databases
.iter()
// count if not db deleted _and_ not internal
.filter(|db| !db.1.deleted && db.1.name().as_ref() != INTERNAL_DB_NAME)
.count()
}
pub fn table_count(&self) -> usize {
@ -588,6 +735,7 @@ impl InnerCatalog {
let updated = match catalog_batch {
CatalogBatch::Node(root_batch) => self.apply_node_batch(root_batch)?,
CatalogBatch::Database(database_batch) => self.apply_database_batch(database_batch)?,
CatalogBatch::Token(token_batch) => self.apply_token_batch(token_batch)?,
};
Ok(updated.then(|| {
@ -654,6 +802,43 @@ impl InnerCatalog {
Ok(updated)
}
fn apply_token_batch(&mut self, token_batch: &TokenBatch) -> Result<bool> {
let mut is_updated = false;
for op in &token_batch.ops {
is_updated |= match op {
TokenCatalogOp::CreateAdminToken(create_admin_token_details) => {
let mut token_info = TokenInfo::new(
create_admin_token_details.token_id,
Arc::clone(&create_admin_token_details.name),
create_admin_token_details.hash.clone(),
create_admin_token_details.created_at,
create_admin_token_details.expiry,
);
token_info.set_permissions(vec![Permission {
resource_type: ResourceType::Wildcard,
resource_identifier: ResourceIdentifier::Wildcard,
actions: Actions::Wildcard,
}]);
// add the admin token itself
self.tokens
.add_token(create_admin_token_details.token_id, token_info)?;
true
}
TokenCatalogOp::RegenerateAdminToken(regenerate_admin_token_details) => {
self.tokens.update_admin_token_hash(
regenerate_admin_token_details.token_id,
regenerate_admin_token_details.hash.clone(),
regenerate_admin_token_details.updated_at,
)?;
true
}
};
}
Ok(is_updated)
}
fn apply_database_batch(&mut self, database_batch: &DatabaseBatch) -> Result<bool> {
let table_count = self.table_count();
if let Some(db) = self.databases.get_by_id(&database_batch.database_id) {
@ -1562,6 +1747,89 @@ impl ColumnDefinition {
}
}
#[derive(Debug, Clone, Default)]
pub(crate) struct TokenRepository {
repo: Repository<TokenId, TokenInfo>,
hash_lookup_map: BiHashMap<TokenId, Vec<u8>>,
}
impl TokenRepository {
pub(crate) fn new(
repo: Repository<TokenId, TokenInfo>,
hash_lookup_map: BiHashMap<TokenId, Vec<u8>>,
) -> Self {
Self {
repo,
hash_lookup_map,
}
}
pub(crate) fn repo(&self) -> &Repository<TokenId, TokenInfo> {
&self.repo
}
pub(crate) fn get_and_increment_next_id(&mut self) -> TokenId {
self.repo.get_and_increment_next_id()
}
pub(crate) fn hash_to_info(&self, hash: Vec<u8>) -> Option<Arc<TokenInfo>> {
let id = self
.hash_lookup_map
.get_by_right(&hash)
.map(|id| id.to_owned())?;
self.repo.get_by_id(&id)
}
pub(crate) fn add_token(&mut self, token_id: TokenId, token_info: TokenInfo) -> Result<()> {
self.hash_lookup_map
.insert(token_id, token_info.hash.clone());
self.repo.insert(token_id, token_info)?;
Ok(())
}
pub(crate) fn update_admin_token_hash(
&mut self,
token_id: TokenId,
hash: Vec<u8>,
updated_at: i64,
) -> Result<()> {
let mut token_info = self
.repo
.get_by_id(&token_id)
.ok_or_else(|| CatalogError::MissingAdminTokenToUpdate)?;
let updatable = Arc::make_mut(&mut token_info);
updatable.hash = hash.clone();
updatable.updated_at = Some(updated_at);
updatable.updated_by = Some(token_id);
self.repo.update(token_id, token_info)?;
self.hash_lookup_map.insert(token_id, hash);
Ok(())
}
}
impl CatalogResource for TokenInfo {
type Identifier = TokenId;
fn id(&self) -> Self::Identifier {
self.id
}
fn name(&self) -> Arc<str> {
Arc::clone(&self.name)
}
}
fn create_token_and_hash() -> (String, Vec<u8>) {
let token = {
let mut token = String::from("apiv3_");
let mut key = [0u8; 64];
OsRng.fill_bytes(&mut key);
token.push_str(&B64.encode(key));
token
};
(token.clone(), Sha512::digest(&token).to_vec())
}
#[cfg(test)]
mod tests {
@ -1629,7 +1897,7 @@ mod tests {
".catalog_uuid" => "[uuid]"
});
catalog.update_from_snapshot(snapshot);
assert_eq!(catalog.db_name_to_id("test_db"), Some(DbId::from(0)));
assert_eq!(catalog.db_name_to_id("test_db"), Some(DbId::from(1)));
});
}
}
@ -1717,7 +1985,7 @@ mod tests {
".catalog_uuid" => "[uuid]"
});
catalog.update_from_snapshot(snapshot);
assert_eq!(catalog.db_name_to_id("test_db"), Some(DbId::from(0)));
assert_eq!(catalog.db_name_to_id("test_db"), Some(DbId::from(1)));
});
}
}
@ -1764,7 +2032,7 @@ mod tests {
".catalog_uuid" => "[uuid]"
});
catalog.update_from_snapshot(snapshot);
assert_eq!(catalog.db_name_to_id("test_db"), Some(DbId::from(0)));
assert_eq!(catalog.db_name_to_id("test_db"), Some(DbId::from(1)));
});
}
}
@ -1810,7 +2078,7 @@ mod tests {
".catalog_uuid" => "[uuid]"
});
catalog.update_from_snapshot(snapshot);
assert_eq!(catalog.db_name_to_id("test_db"), Some(DbId::from(0)));
assert_eq!(catalog.db_name_to_id("test_db"), Some(DbId::from(1)));
});
}
}

View File

@ -672,7 +672,7 @@ impl Catalog {
.await
}
async fn catalog_update_with_retry<F>(
pub(crate) async fn catalog_update_with_retry<F>(
&self,
batch_creator_fn: F,
) -> Result<Option<OrderedCatalogBatch>>

View File

@ -139,6 +139,7 @@ mod tests {
match b {
CatalogBatch::Node(_) => (),
CatalogBatch::Database(_) => (),
CatalogBatch::Token(_) => (),
}
}
n_updates += 1;

View File

@ -163,6 +163,15 @@ pub enum CatalogError {
#[error("invalid error behavior {0}")]
InvalidErrorBehavior(String),
#[error("token name already exists, {0}")]
TokenNameAlreadyExists(String),
#[error("missing admin token, cannot update")]
MissingAdminTokenToUpdate,
#[error("cannot delete internal db")]
CannotDeleteInternalDatabase,
}
impl CatalogError {

View File

@ -12,7 +12,9 @@ use cron::Schedule;
use hashbrown::HashMap;
use humantime::{format_duration, parse_duration};
use influxdb_line_protocol::FieldValue;
use influxdb3_id::{ColumnId, DbId, DistinctCacheId, LastCacheId, NodeId, TableId, TriggerId};
use influxdb3_id::{
ColumnId, DbId, DistinctCacheId, LastCacheId, NodeId, TableId, TokenId, TriggerId,
};
use schema::{InfluxColumnType, InfluxFieldType};
use serde::{Deserialize, Serialize};
@ -22,6 +24,7 @@ use crate::{CatalogError, Result, catalog::CatalogSequenceNumber};
pub enum CatalogBatch {
Node(NodeBatch),
Database(DatabaseBatch),
Token(TokenBatch),
}
impl CatalogBatch {
@ -57,6 +60,7 @@ impl CatalogBatch {
match self {
CatalogBatch::Node(node_batch) => node_batch.ops.len(),
CatalogBatch::Database(database_batch) => database_batch.ops.len(),
CatalogBatch::Token(token_batch) => token_batch.ops.len(),
}
}
@ -64,6 +68,7 @@ impl CatalogBatch {
match self {
CatalogBatch::Node(_) => None,
CatalogBatch::Database(database_batch) => Some(database_batch),
CatalogBatch::Token(_) => None,
}
}
@ -71,6 +76,7 @@ impl CatalogBatch {
match self {
CatalogBatch::Node(_) => None,
CatalogBatch::Database(database_batch) => Some(database_batch),
CatalogBatch::Token(_) => None,
}
}
}
@ -792,3 +798,36 @@ impl TriggerSpecificationDefinition {
}
}
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Default)]
pub struct TokenBatch {
pub time_ns: i64,
pub ops: Vec<TokenCatalogOp>,
}
// PK: I cannot come up with better names for variants, I _think_
// it is ok to ignore. Maybe I can break them into separate
// enum for each type
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum TokenCatalogOp {
CreateAdminToken(CreateAdminTokenDetails),
RegenerateAdminToken(RegenerateAdminTokenDetails),
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct CreateAdminTokenDetails {
pub token_id: TokenId,
pub name: Arc<str>,
pub hash: Vec<u8>,
pub created_at: i64,
pub updated_at: Option<i64>,
pub expiry: Option<i64>,
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct RegenerateAdminTokenDetails {
pub token_id: TokenId,
pub hash: Vec<u8>,
pub updated_at: i64,
}

View File

@ -10,6 +10,7 @@ impl From<super::CatalogSnapshot> for v2::CatalogSnapshot {
sequence: value.sequence,
catalog_id: value.catalog_id,
catalog_uuid: value.catalog_uuid,
tokens: v2::RepositorySnapshot::default(),
}
}
}

View File

@ -1,6 +1,6 @@
use crate::catalog::{
CatalogSequenceNumber, ColumnDefinition, DatabaseSchema, InnerCatalog, NodeDefinition,
NodeState, Repository, TableDefinition,
NodeState, Repository, TableDefinition, TokenRepository,
};
use crate::log::{
DistinctCacheDefinition, LastCacheDefinition, LastCacheTtl, LastCacheValueColumnsDef, MaxAge,
@ -8,9 +8,13 @@ use crate::log::{
};
use crate::resource::CatalogResource;
use arrow::datatypes::DataType as ArrowDataType;
use bimap::BiHashMap;
use hashbrown::HashMap;
use influxdb3_authz::{
Actions, CrudActions, DatabaseActions, Permission, ResourceIdentifier, ResourceType, TokenInfo,
};
use influxdb3_id::{
CatalogId, ColumnId, DbId, DistinctCacheId, LastCacheId, NodeId, SerdeVecMap, TableId,
CatalogId, ColumnId, DbId, DistinctCacheId, LastCacheId, NodeId, SerdeVecMap, TableId, TokenId,
TriggerId,
};
use schema::{InfluxColumnType, InfluxFieldType, TIME_DATA_TIMEZONE};
@ -30,6 +34,8 @@ pub struct CatalogSnapshot {
pub(crate) nodes: RepositorySnapshot<NodeId, NodeSnapshot>,
pub(crate) databases: RepositorySnapshot<DbId, DatabaseSnapshot>,
pub(crate) sequence: CatalogSequenceNumber,
#[serde(default)]
pub(crate) tokens: RepositorySnapshot<TokenId, TokenInfoSnapshot>,
pub(crate) catalog_id: Arc<str>,
pub(crate) catalog_uuid: Uuid,
}
@ -49,21 +55,235 @@ impl Snapshot for InnerCatalog {
databases: self.databases.snapshot(),
sequence: self.sequence,
catalog_id: Arc::clone(&self.catalog_id),
tokens: self.tokens.repo().snapshot(),
catalog_uuid: self.catalog_uuid,
}
}
fn from_snapshot(snap: Self::Serialized) -> Self {
let repository: Repository<TokenId, TokenInfo> = Repository::from_snapshot(snap.tokens);
let mut hash_lookup_map = BiHashMap::new();
repository.repo.iter().for_each(|(id, info)| {
// TODO: consider storing the hash behind an Arc to avoid this clone
hash_lookup_map.insert(*id, info.hash.clone());
});
let token_info_repo = TokenRepository::new(repository, hash_lookup_map);
Self {
sequence: snap.sequence,
catalog_id: snap.catalog_id,
catalog_uuid: snap.catalog_uuid,
nodes: Repository::from_snapshot(snap.nodes),
databases: Repository::from_snapshot(snap.databases),
tokens: token_info_repo,
}
}
}
#[derive(Debug, Serialize, Deserialize, Default)]
pub(crate) struct TokenInfoSnapshot {
id: TokenId,
name: Arc<str>,
hash: Vec<u8>,
created_at: i64,
description: Option<String>,
created_by: Option<TokenId>,
expiry: i64,
updated_by: Option<TokenId>,
updated_at: Option<i64>,
permissions: Vec<PermissionSnapshot>,
}
impl Snapshot for TokenInfo {
type Serialized = TokenInfoSnapshot;
fn snapshot(&self) -> Self::Serialized {
Self::Serialized {
id: self.id,
name: Arc::clone(&self.name),
hash: self.hash.clone(),
created_at: self.created_at,
expiry: self.expiry_millis,
created_by: self.created_by,
updated_at: self.updated_at,
updated_by: self.updated_by,
description: self.description.clone(),
permissions: self
.permissions
.iter()
.map(|perm| perm.snapshot())
.collect(),
}
}
fn from_snapshot(snap: Self::Serialized) -> Self {
Self {
id: snap.id,
name: snap.name,
hash: snap.hash,
created_at: snap.created_at,
expiry_millis: snap.expiry,
created_by: snap.created_by,
updated_by: snap.updated_by,
updated_at: snap.updated_at,
permissions: snap
.permissions
.into_iter()
.map(Permission::from_snapshot)
.collect(),
description: snap.description,
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct PermissionSnapshot {
resource_type: ResourceTypeSnapshot,
resource_identifier: ResourceIdentifierSnapshot,
actions: ActionsSnapshot,
}
impl Snapshot for Permission {
type Serialized = PermissionSnapshot;
fn snapshot(&self) -> Self::Serialized {
PermissionSnapshot {
resource_type: self.resource_type.snapshot(),
resource_identifier: self.resource_identifier.snapshot(),
actions: self.actions.snapshot(),
}
}
fn from_snapshot(snap: Self::Serialized) -> Self {
Self {
resource_type: ResourceType::from_snapshot(snap.resource_type),
resource_identifier: ResourceIdentifier::from_snapshot(snap.resource_identifier),
actions: Actions::from_snapshot(snap.actions),
}
}
}
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub(crate) enum ResourceTypeSnapshot {
Database,
Token,
Wildcard,
}
impl Snapshot for ResourceType {
type Serialized = ResourceTypeSnapshot;
fn snapshot(&self) -> Self::Serialized {
match self {
ResourceType::Database => ResourceTypeSnapshot::Database,
ResourceType::Token => ResourceTypeSnapshot::Token,
ResourceType::Wildcard => ResourceTypeSnapshot::Wildcard,
}
}
fn from_snapshot(snap: Self::Serialized) -> Self {
match snap {
ResourceTypeSnapshot::Database => ResourceType::Database,
ResourceTypeSnapshot::Token => ResourceType::Token,
ResourceTypeSnapshot::Wildcard => ResourceType::Wildcard,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum ResourceIdentifierSnapshot {
Database(Vec<DbId>),
Token(Vec<TokenId>),
Wildcard,
}
impl Snapshot for ResourceIdentifier {
type Serialized = ResourceIdentifierSnapshot;
fn snapshot(&self) -> Self::Serialized {
match self {
ResourceIdentifier::Database(db_id) => {
ResourceIdentifierSnapshot::Database(db_id.clone())
}
ResourceIdentifier::Token(token_id) => {
ResourceIdentifierSnapshot::Token(token_id.clone())
}
ResourceIdentifier::Wildcard => ResourceIdentifierSnapshot::Wildcard,
}
}
fn from_snapshot(snap: Self::Serialized) -> Self {
match snap {
ResourceIdentifierSnapshot::Database(db_id) => ResourceIdentifier::Database(db_id),
ResourceIdentifierSnapshot::Token(token_id) => ResourceIdentifier::Token(token_id),
ResourceIdentifierSnapshot::Wildcard => ResourceIdentifier::Wildcard,
}
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub(crate) enum ActionsSnapshot {
Database(DatabaseActionsSnapshot),
Token(CrudActionsSnapshot),
Wildcard,
}
impl Snapshot for Actions {
type Serialized = ActionsSnapshot;
fn snapshot(&self) -> Self::Serialized {
match self {
Actions::Database(database_actions) => {
ActionsSnapshot::Database(database_actions.snapshot())
}
Actions::Token(crud_actions) => ActionsSnapshot::Token(crud_actions.snapshot()),
Actions::Wildcard => ActionsSnapshot::Wildcard,
}
}
fn from_snapshot(snap: Self::Serialized) -> Self {
match snap {
ActionsSnapshot::Database(db_actions) => {
Actions::Database(DatabaseActions::from_snapshot(db_actions))
}
ActionsSnapshot::Token(crud_actions) => {
Actions::Token(CrudActions::from_snapshot(crud_actions))
}
ActionsSnapshot::Wildcard => Actions::Wildcard,
}
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub(crate) struct DatabaseActionsSnapshot(u16);
impl Snapshot for DatabaseActions {
type Serialized = DatabaseActionsSnapshot;
fn snapshot(&self) -> Self::Serialized {
DatabaseActionsSnapshot(u16::MAX)
}
fn from_snapshot(snap: Self::Serialized) -> Self {
snap.0.into()
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub(crate) struct CrudActionsSnapshot(u16);
impl Snapshot for CrudActions {
type Serialized = CrudActionsSnapshot;
fn snapshot(&self) -> Self::Serialized {
CrudActionsSnapshot(u16::MAX)
}
fn from_snapshot(snap: Self::Serialized) -> Self {
snap.0.into()
}
}
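
A small round-trip sketch for the snapshot conversions above; the helper is hypothetical, and the wildcard permission mirrors what the generated admin token carries:

// Hypothetical helper: snapshot and restore the admin token's wildcard permission.
fn roundtrip_wildcard_permission() -> Permission {
    let perm = Permission {
        resource_type: ResourceType::Wildcard,
        resource_identifier: ResourceIdentifier::Wildcard,
        actions: Actions::Wildcard,
    };
    Permission::from_snapshot(perm.snapshot())
}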
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct NodeSnapshot {
pub(crate) node_id: Arc<str>,
@ -385,7 +605,7 @@ impl Snapshot for DistinctCacheDefinition {
}
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Default)]
pub(crate) struct RepositorySnapshot<I, R>
where
I: CatalogId,

View File

@ -13,6 +13,22 @@ expression: catalog.snapshot()
0,
{
"id": 0,
"name": "_internal",
"tables": {
"repo": [],
"next_id": 0
},
"processing_engine_triggers": {
"repo": [],
"next_id": 0
},
"deleted": false
}
],
[
1,
{
"id": 1,
"name": "test_db",
"tables": {
"repo": [
@ -101,9 +117,13 @@ expression: catalog.snapshot()
}
]
],
"next_id": 1
"next_id": 2
},
"sequence": 4,
"tokens": {
"repo": [],
"next_id": 0
},
"sequence": 3,
"catalog_id": "test",
"catalog_uuid": "[uuid]"
}

View File

@ -14,6 +14,22 @@ expression: snapshot
0,
{
"id": 0,
"name": "_internal",
"tables": {
"repo": [],
"next_id": 0
},
"processing_engine_triggers": {
"repo": [],
"next_id": 0
},
"deleted": false
}
],
[
1,
{
"id": 1,
"name": "test_db",
"tables": {
"repo": [
@ -300,9 +316,13 @@ expression: snapshot
}
]
],
"next_id": 1
"next_id": 2
},
"sequence": 4,
"tokens": {
"repo": [],
"next_id": 0
},
"sequence": 3,
"catalog_id": "sample-host-id",
"catalog_uuid": "[uuid]"
}

View File

@ -14,6 +14,22 @@ expression: snapshot
0,
{
"id": 0,
"name": "_internal",
"tables": {
"repo": [],
"next_id": 0
},
"processing_engine_triggers": {
"repo": [],
"next_id": 0
},
"deleted": false
}
],
[
1,
{
"id": 1,
"name": "test_db",
"tables": {
"repo": [
@ -140,9 +156,13 @@ expression: snapshot
}
]
],
"next_id": 1
"next_id": 2
},
"sequence": 4,
"tokens": {
"repo": [],
"next_id": 0
},
"sequence": 3,
"catalog_id": "sample-host-id",
"catalog_uuid": "[uuid]"
}

View File

@ -14,6 +14,22 @@ expression: snapshot
0,
{
"id": 0,
"name": "_internal",
"tables": {
"repo": [],
"next_id": 0
},
"processing_engine_triggers": {
"repo": [],
"next_id": 0
},
"deleted": false
}
],
[
1,
{
"id": 1,
"name": "test_db",
"tables": {
"repo": [
@ -143,9 +159,13 @@ expression: snapshot
}
]
],
"next_id": 1
"next_id": 2
},
"sequence": 4,
"tokens": {
"repo": [],
"next_id": 0
},
"sequence": 3,
"catalog_id": "sample-host-id",
"catalog_uuid": "[uuid]"
}

View File

@ -14,6 +14,22 @@ expression: snapshot
0,
{
"id": 0,
"name": "_internal",
"tables": {
"repo": [],
"next_id": 0
},
"processing_engine_triggers": {
"repo": [],
"next_id": 0
},
"deleted": false
}
],
[
1,
{
"id": 1,
"name": "test_db",
"tables": {
"repo": [
@ -124,9 +140,13 @@ expression: snapshot
}
]
],
"next_id": 1
"next_id": 2
},
"sequence": 3,
"tokens": {
"repo": [],
"next_id": 0
},
"sequence": 2,
"catalog_id": "sample-host-id",
"catalog_uuid": "[uuid]"
}

View File

@ -639,6 +639,36 @@ impl Client {
}
}
/// Create an admin token
pub async fn api_v3_configure_create_admin_token(
&self,
) -> Result<Option<CreateTokenWithPermissionsResponse>> {
let response_json: Result<Option<CreateTokenWithPermissionsResponse>> = self
.send_create(
Method::POST,
"/api/v3/configure/token/admin",
None::<()>,
None::<()>,
)
.await;
response_json
}
/// Regenerate the admin token
pub async fn api_v3_configure_regenerate_admin_token(
&self,
) -> Result<Option<CreateTokenWithPermissionsResponse>> {
let response_json: Result<Option<CreateTokenWithPermissionsResponse>> = self
.send_create(
Method::POST,
"/api/v3/configure/token/admin/regenerate",
None::<()>,
None::<()>,
)
.await;
response_json
}
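
A usage sketch for the two client methods added above; how the `Client` is constructed and the exact error type are assumptions outside this diff:

use influxdb3_client::Client;

// Hedged example: bootstrap the first admin token. The raw token is only
// returned once, so the caller should persist it securely.
async fn bootstrap_admin_token(client: &Client) {
    match client.api_v3_configure_create_admin_token().await {
        Ok(Some(resp)) => println!("admin token (shown only once): {}", resp.token),
        Ok(None) => eprintln!("server returned an empty response body"),
        Err(e) => eprintln!("admin token creation failed: {e}"),
    }
}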
/// Serialize the given `B` to json then send the request and return the resulting bytes.
async fn send_json_get_bytes<B, Q>(
&self,

View File

@ -66,6 +66,7 @@ catalog_identifier_type!(TriggerId, u32);
catalog_identifier_type!(ColumnId, u16);
catalog_identifier_type!(LastCacheId, u16);
catalog_identifier_type!(DistinctCacheId, u16);
catalog_identifier_type!(TokenId, u64);
/// The next file id to be used when persisting `ParquetFile`s
pub static NEXT_FILE_ID: AtomicU64 = AtomicU64::new(0);

View File

@ -34,6 +34,7 @@ trace_http.workspace = true
tracker.workspace = true
# Local Deps
influxdb3_authz = { path = "../influxdb3_authz" }
influxdb3_cache = { path = "../influxdb3_cache" }
influxdb3_catalog = { path = "../influxdb3_catalog" }
influxdb3_client = { path = "../influxdb3_client" }

View File

@ -0,0 +1,30 @@
pub(crate) const API_LEGACY_WRITE: &str = "/write";
pub(crate) const API_V2_WRITE: &str = "/api/v2/write";
pub(crate) const API_V3_WRITE: &str = "/api/v3/write_lp";
pub(crate) const API_V3_QUERY_SQL: &str = "/api/v3/query_sql";
pub(crate) const API_V3_QUERY_INFLUXQL: &str = "/api/v3/query_influxql";
pub(crate) const API_V1_QUERY: &str = "/query";
pub(crate) const API_V3_HEALTH: &str = "/health";
pub(crate) const API_V1_HEALTH: &str = "/api/v1/health";
pub(crate) const API_V3_ENGINE: &str = "/api/v3/engine/";
pub(crate) const API_V3_CONFIGURE_DISTINCT_CACHE: &str = "/api/v3/configure/distinct_cache";
pub(crate) const API_V3_CONFIGURE_LAST_CACHE: &str = "/api/v3/configure/last_cache";
pub(crate) const API_V3_CONFIGURE_PROCESSING_ENGINE_DISABLE: &str =
"/api/v3/configure/processing_engine_trigger/disable";
pub(crate) const API_V3_CONFIGURE_PROCESSING_ENGINE_ENABLE: &str =
"/api/v3/configure/processing_engine_trigger/enable";
pub(crate) const API_V3_CONFIGURE_PROCESSING_ENGINE_TRIGGER: &str =
"/api/v3/configure/processing_engine_trigger";
pub(crate) const API_V3_CONFIGURE_PLUGIN_INSTALL_PACKAGES: &str =
"/api/v3/configure/plugin_environment/install_packages";
pub(crate) const API_V3_CONFIGURE_PLUGIN_INSTALL_REQUIREMENTS: &str =
"/api/v3/configure/plugin_environment/install_requirements";
pub(crate) const API_V3_CONFIGURE_DATABASE: &str = "/api/v3/configure/database";
pub(crate) const API_V3_CONFIGURE_TABLE: &str = "/api/v3/configure/table";
pub(crate) const API_METRICS: &str = "/metrics";
pub(crate) const API_PING: &str = "/ping";
pub(crate) const API_V3_CONFIGURE_ADMIN_TOKEN: &str = "/api/v3/configure/token/admin";
pub(crate) const API_V3_CONFIGURE_ADMIN_TOKEN_REGENERATE: &str =
"/api/v3/configure/token/admin/regenerate";
pub(crate) const API_V3_TEST_WAL_ROUTE: &str = "/api/v3/plugin_test/wal";
pub(crate) const API_V3_TEST_PLUGIN_ROUTE: &str = "/api/v3/plugin_test/schedule";
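
A minimal sketch of how these constants are meant to be consumed; the helper itself is hypothetical, and the real dispatch lives in route_request further down this diff:

// Hypothetical helper built on the constants above. In route_request only the
// create route is exempt from authentication, so the first admin token can be
// bootstrapped without an existing token.
fn is_admin_token_route(path: &str) -> bool {
    path == all_paths::API_V3_CONFIGURE_ADMIN_TOKEN
        || path == all_paths::API_V3_CONFIGURE_ADMIN_TOKEN_REGENERATE
}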

View File

@ -1,58 +0,0 @@
use async_trait::async_trait;
use authz::{Authorizer, Error, Permission};
use observability_deps::tracing::{debug, warn};
use sha2::{Digest, Sha512};
/// An [`Authorizer`] implementation that will grant access to all
/// requests that provide `token`
#[derive(Debug)]
pub struct AllOrNothingAuthorizer {
token: Vec<u8>,
}
impl AllOrNothingAuthorizer {
pub fn new(token: Vec<u8>) -> Self {
Self { token }
}
}
#[async_trait]
impl Authorizer for AllOrNothingAuthorizer {
async fn permissions(
&self,
token: Option<Vec<u8>>,
perms: &[Permission],
) -> Result<Vec<Permission>, Error> {
debug!(?perms, "requesting permissions");
let provided = token.as_deref().ok_or(Error::NoToken)?;
if Sha512::digest(provided)[..] == self.token {
Ok(perms.to_vec())
} else {
warn!("invalid token provided");
Err(Error::InvalidToken)
}
}
async fn probe(&self) -> Result<(), Error> {
Ok(())
}
}
/// The default [`Authorizer`] implementation that will authorize all requests
#[derive(Clone, Copy, Debug)]
pub struct DefaultAuthorizer;
#[async_trait]
impl Authorizer for DefaultAuthorizer {
async fn permissions(
&self,
_token: Option<Vec<u8>>,
perms: &[Permission],
) -> Result<Vec<Permission>, Error> {
Ok(perms.to_vec())
}
async fn probe(&self) -> Result<(), Error> {
Ok(())
}
}

View File

@ -1,7 +1,7 @@
use std::sync::Arc;
use crate::{CommonServerState, Server, auth::DefaultAuthorizer, http::HttpApi};
use authz::Authorizer;
use crate::{CommonServerState, Server, http::HttpApi};
use influxdb3_authz::{AuthProvider, NoAuthAuthenticator};
use influxdb3_internal_api::query_executor::QueryExecutor;
use influxdb3_processing_engine::ProcessingEngineManagerImpl;
use influxdb3_write::{WriteBuffer, persister::Persister};
@ -18,7 +18,7 @@ pub struct ServerBuilder<W, Q, P, T, L, E> {
persister: P,
listener: L,
processing_engine: E,
authorizer: Arc<dyn Authorizer>,
authorizer: Arc<dyn AuthProvider>,
}
impl
@ -40,7 +40,7 @@ impl
query_executor: NoQueryExec,
persister: NoPersister,
listener: NoListener,
authorizer: Arc::new(DefaultAuthorizer),
authorizer: Arc::new(NoAuthAuthenticator),
processing_engine: NoProcessingEngine,
}
}
@ -52,7 +52,7 @@ impl<W, Q, P, T, L, E> ServerBuilder<W, Q, P, T, L, E> {
self
}
pub fn authorizer(mut self, a: Arc<dyn Authorizer>) -> Self {
pub fn authorizer(mut self, a: Arc<dyn AuthProvider>) -> Self {
self.authorizer = a;
self
}

View File

@ -1,9 +1,8 @@
//! HTTP API service implementations for `server`
use crate::CommonServerState;
use crate::{CommonServerState, all_paths};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty;
use authz::Authorizer;
use authz::http::AuthorizationHeaderExtension;
use bytes::{Bytes, BytesMut};
use data_types::NamespaceName;
@ -21,6 +20,7 @@ use hyper::http::HeaderValue;
use hyper::{Body, Method, Request, Response, StatusCode};
use influxdb_influxql_parser::select::GroupByClause;
use influxdb_influxql_parser::statement::Statement;
use influxdb3_authz::{AuthProvider, NoAuthAuthenticator};
use influxdb3_cache::distinct_cache;
use influxdb3_cache::last_cache;
use influxdb3_catalog::CatalogError;
@ -41,7 +41,7 @@ use iox_http::write::{WriteParseError, WriteRequestUnifier};
use iox_query_influxql_rewrite as rewrite;
use iox_query_params::StatementParams;
use iox_time::TimeProvider;
use observability_deps::tracing::{debug, error, info};
use observability_deps::tracing::{debug, error, info, trace};
use serde::Deserialize;
use serde::Serialize;
use serde::de::DeserializeOwned;
@ -235,9 +235,9 @@ pub enum Error {
}
#[derive(Debug, Error)]
pub(crate) enum AuthorizationError {
#[error("the request was not authorized")]
Unauthorized,
pub(crate) enum AuthenticationError {
#[error("the request was not authenticated")]
Unauthenticated,
#[error("the request was not in the form of 'Authorization: Bearer <token>'")]
MalformedRequest,
#[error("requestor is forbidden from requested resource")]
@ -500,7 +500,7 @@ pub(crate) struct HttpApi {
time_provider: Arc<dyn TimeProvider>,
pub(crate) query_executor: Arc<dyn QueryExecutor>,
max_request_bytes: usize,
authorizer: Arc<dyn Authorizer>,
authorizer: Arc<dyn AuthProvider>,
legacy_write_param_unifier: SingleTenantRequestUnifier,
}
@ -512,9 +512,12 @@ impl HttpApi {
query_executor: Arc<dyn QueryExecutor>,
processing_engine: Arc<ProcessingEngineManagerImpl>,
max_request_bytes: usize,
authorizer: Arc<dyn Authorizer>,
authorizer: Arc<dyn AuthProvider>,
) -> Self {
let legacy_write_param_unifier = SingleTenantRequestUnifier::new(Arc::clone(&authorizer));
// authentication already happens globally for every request, so passing the real
// auth provider here would run the same check twice; use a NoAuthAuthenticator instead.
let legacy_write_param_unifier =
SingleTenantRequestUnifier::new(Arc::clone(&NoAuthAuthenticator.upcast()));
Self {
common_state,
time_provider,
@ -577,6 +580,42 @@ impl HttpApi {
}
}
pub(crate) async fn create_admin_token(
&self,
_req: Request<Body>,
) -> Result<Response<Body>, Error> {
let catalog = self.write_buffer.catalog();
let (token_info, token) = catalog.create_admin_token(false).await?;
let response = CreateTokenWithPermissionsResponse::from_token_info(token_info, token);
let body = serde_json::to_vec(&response)?;
let body = Response::builder()
.status(StatusCode::CREATED)
.header(CONTENT_TYPE, "application/json")
.body(Body::from(body));
Ok(body?)
}
pub(crate) async fn regenerate_admin_token(
&self,
_req: Request<Body>,
) -> Result<Response<Body>, Error> {
let catalog = self.write_buffer.catalog();
let (token_info, token) = catalog.create_admin_token(true).await?;
let response = CreateTokenWithPermissionsResponse::from_token_info(token_info, token);
let body = serde_json::to_vec(&response)?;
let body = Response::builder()
.status(StatusCode::CREATED)
.header(CONTENT_TYPE, "application/json")
.body(Body::from(body));
Ok(body?)
}
async fn query_sql(&self, req: Request<Body>) -> Result<Response<Body>> {
let QueryRequest {
database,
@ -703,14 +742,17 @@ impl HttpApi {
Ok(decoded_data.into())
}
async fn authorize_request(&self, req: &mut Request<Body>) -> Result<(), AuthorizationError> {
async fn authenticate_request(
&self,
req: &mut Request<Body>,
) -> Result<(), AuthenticationError> {
// Extend the request with the authorization token; this is used downstream in some
// APIs, such as write, that need the full header value to authorize a request.
let auth_header = req.headers().get(AUTHORIZATION).cloned();
req.extensions_mut()
.insert(AuthorizationHeaderExtension::new(auth_header));
.insert(AuthorizationHeaderExtension::new(auth_header.clone()));
let auth = if let Some(p) = extract_v1_auth_token(req) {
let auth_token = if let Some(p) = extract_v1_auth_token(req) {
Some(p)
} else {
// We won't need the authorization header anymore and we don't want to accidentally log it.
@ -723,10 +765,17 @@ impl HttpApi {
// Currently we pass an empty permissions list, but in future we may be able to derive
// the permissions based on the incoming request
let permissions = self.authorizer.permissions(auth, &[]).await?;
let token_id = self
.authorizer
.authenticate(auth_token.clone())
.await
.map_err(|e| {
error!(?e, "cannot authenticate token");
AuthenticationError::Unauthenticated
})?;
// Extend the request with the permissions, which may be useful in future
req.extensions_mut().insert(permissions);
// Extend the request with the token, which can be looked up later in authorization
req.extensions_mut().insert(token_id);
Ok(())
}
@ -1306,33 +1355,33 @@ fn extract_v1_auth_token(req: &mut Request<Body>) -> Option<Vec<u8>> {
.map(String::into_bytes)
}
fn validate_auth_header(header: HeaderValue) -> Result<Vec<u8>, AuthorizationError> {
fn validate_auth_header(header: HeaderValue) -> Result<Vec<u8>, AuthenticationError> {
// Split the header value into two parts
let mut header = header.to_str()?.split(' ');
// Check that the header is the 'Bearer' or 'Token' auth scheme
let auth_scheme = header.next().ok_or(AuthorizationError::MalformedRequest)?;
let auth_scheme = header.next().ok_or(AuthenticationError::MalformedRequest)?;
if auth_scheme != "Bearer" && auth_scheme != "Token" {
return Err(AuthorizationError::MalformedRequest);
return Err(AuthenticationError::MalformedRequest);
}
// Get the token that we want to hash to check the request is valid
let token = header.next().ok_or(AuthorizationError::MalformedRequest)?;
let token = header.next().ok_or(AuthenticationError::MalformedRequest)?;
// There should only be two parts: the 'Bearer' scheme and the actual
// token; error otherwise
if header.next().is_some() {
return Err(AuthorizationError::MalformedRequest);
return Err(AuthenticationError::MalformedRequest);
}
Ok(token.as_bytes().to_vec())
}
impl From<authz::Error> for AuthorizationError {
impl From<authz::Error> for AuthenticationError {
fn from(auth_error: authz::Error) -> Self {
match auth_error {
authz::Error::Forbidden => Self::Forbidden,
_ => Self::Unauthorized,
_ => Self::Unauthenticated,
}
}
}
@ -1611,46 +1660,28 @@ pub(crate) async fn route_request(
http_server: Arc<HttpApi>,
mut req: Request<Body>,
) -> Result<Response<Body>, Infallible> {
if let Err(e) = http_server.authorize_request(&mut req).await {
match e {
AuthorizationError::Unauthorized => {
return Ok(Response::builder()
.status(StatusCode::UNAUTHORIZED)
.body(Body::empty())
.unwrap());
}
AuthorizationError::MalformedRequest => {
return Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from("{\"error\":\
\"Authorization header was malformed and should be in the form 'Authorization: Bearer <token>'\"\
}"))
.unwrap());
}
AuthorizationError::Forbidden => {
return Ok(Response::builder()
.status(StatusCode::FORBIDDEN)
.body(Body::empty())
.unwrap());
}
// We don't expect this to happen, but if the header is messed up
// it's better to handle it than not at all
AuthorizationError::ToStr(_) => {
return Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::empty())
.unwrap());
}
}
}
debug!(request = ?req,"Processing request");
let method = req.method().clone();
let uri = req.uri().clone();
if uri.path() != all_paths::API_V3_CONFIGURE_ADMIN_TOKEN {
trace!(?uri, "authenticating request");
if let Some(authentication_error) = authenticate(&http_server, &mut req).await {
return authentication_error;
}
}
trace!(request = ?req,"Processing request");
let content_length = req.headers().get("content-length").cloned();
let response = match (method.clone(), uri.path()) {
(Method::POST, "/write") => {
let path = uri.path();
let response = match (method.clone(), path) {
(Method::POST, all_paths::API_V3_CONFIGURE_ADMIN_TOKEN) => {
http_server.create_admin_token(req).await
}
(Method::POST, all_paths::API_V3_CONFIGURE_ADMIN_TOKEN_REGENERATE) => {
http_server.regenerate_admin_token(req).await
}
(Method::POST, all_paths::API_LEGACY_WRITE) => {
let params = match http_server.legacy_write_param_unifier.parse_v1(&req).await {
Ok(p) => p.into(),
Err(e) => return Ok(legacy_write_error_to_response(e)),
@ -1658,71 +1689,77 @@ pub(crate) async fn route_request(
http_server.write_lp_inner(params, req, true).await
}
(Method::POST, "/api/v2/write") => {
(Method::POST, all_paths::API_V2_WRITE) => {
let params = match http_server.legacy_write_param_unifier.parse_v2(&req).await {
Ok(p) => p.into(),
Err(e) => return Ok(legacy_write_error_to_response(e)),
};
http_server.write_lp_inner(params, req, false).await
}
(Method::POST, "/api/v3/write_lp") => http_server.write_lp(req).await,
(Method::GET | Method::POST, "/api/v3/query_sql") => http_server.query_sql(req).await,
(Method::GET | Method::POST, "/api/v3/query_influxql") => {
(Method::POST, all_paths::API_V3_WRITE) => http_server.write_lp(req).await,
(Method::GET | Method::POST, all_paths::API_V3_QUERY_SQL) => {
http_server.query_sql(req).await
}
(Method::GET | Method::POST, all_paths::API_V3_QUERY_INFLUXQL) => {
http_server.query_influxql(req).await
}
(Method::GET | Method::POST, "/query") => http_server.v1_query(req).await,
(Method::GET, "/health" | "/api/v1/health") => http_server.health(),
(Method::GET | Method::POST, "/ping") => http_server.ping(),
(Method::GET, "/metrics") => http_server.handle_metrics(),
(Method::GET | Method::POST, path) if path.starts_with("/api/v3/engine/") => {
let path = path.strip_prefix("/api/v3/engine/").unwrap();
(Method::GET | Method::POST, all_paths::API_V1_QUERY) => http_server.v1_query(req).await,
(Method::GET, all_paths::API_V3_HEALTH | all_paths::API_V1_HEALTH) => http_server.health(),
(Method::GET | Method::POST, all_paths::API_PING) => http_server.ping(),
(Method::GET, all_paths::API_METRICS) => http_server.handle_metrics(),
(Method::GET | Method::POST, path) if path.starts_with(all_paths::API_V3_ENGINE) => {
let path = path.strip_prefix(all_paths::API_V3_ENGINE).unwrap();
http_server
.processing_engine_request_plugin(path, req)
.await
}
(Method::POST, "/api/v3/configure/distinct_cache") => {
(Method::POST, all_paths::API_V3_CONFIGURE_DISTINCT_CACHE) => {
http_server.configure_distinct_cache_create(req).await
}
(Method::DELETE, "/api/v3/configure/distinct_cache") => {
(Method::DELETE, all_paths::API_V3_CONFIGURE_DISTINCT_CACHE) => {
http_server.configure_distinct_cache_delete(req).await
}
(Method::POST, "/api/v3/configure/last_cache") => {
(Method::POST, all_paths::API_V3_CONFIGURE_LAST_CACHE) => {
http_server.configure_last_cache_create(req).await
}
(Method::DELETE, "/api/v3/configure/last_cache") => {
(Method::DELETE, all_paths::API_V3_CONFIGURE_LAST_CACHE) => {
http_server.configure_last_cache_delete(req).await
}
(Method::POST, "/api/v3/configure/processing_engine_trigger/disable") => {
(Method::POST, all_paths::API_V3_CONFIGURE_PROCESSING_ENGINE_DISABLE) => {
http_server.disable_processing_engine_trigger(req).await
}
(Method::POST, "/api/v3/configure/processing_engine_trigger/enable") => {
(Method::POST, all_paths::API_V3_CONFIGURE_PROCESSING_ENGINE_ENABLE) => {
http_server.enable_processing_engine_trigger(req).await
}
(Method::POST, "/api/v3/configure/processing_engine_trigger") => {
(Method::POST, all_paths::API_V3_CONFIGURE_PROCESSING_ENGINE_TRIGGER) => {
http_server.configure_processing_engine_trigger(req).await
}
(Method::DELETE, "/api/v3/configure/processing_engine_trigger") => {
(Method::DELETE, all_paths::API_V3_CONFIGURE_PROCESSING_ENGINE_TRIGGER) => {
http_server.delete_processing_engine_trigger(req).await
}
(Method::POST, "/api/v3/configure/plugin_environment/install_packages") => {
(Method::POST, all_paths::API_V3_CONFIGURE_PLUGIN_INSTALL_PACKAGES) => {
http_server.install_plugin_environment_packages(req).await
}
(Method::POST, "/api/v3/configure/plugin_environment/install_requirements") => {
(Method::POST, all_paths::API_V3_CONFIGURE_PLUGIN_INSTALL_REQUIREMENTS) => {
http_server
.install_plugin_environment_requirements(req)
.await
}
(Method::GET, "/api/v3/configure/database") => http_server.show_databases(req).await,
(Method::POST, "/api/v3/configure/database") => http_server.create_database(req).await,
(Method::DELETE, "/api/v3/configure/database") => http_server.delete_database(req).await,
(Method::POST, "/api/v3/configure/table") => http_server.create_table(req).await,
// TODO: make table delete use a path param (DELETE db/foodb/table/bar)
(Method::DELETE, "/api/v3/configure/table") => http_server.delete_table(req).await,
(Method::POST, "/api/v3/plugin_test/wal") => {
(Method::GET, all_paths::API_V3_CONFIGURE_DATABASE) => {
http_server.show_databases(req).await
}
(Method::POST, all_paths::API_V3_CONFIGURE_DATABASE) => {
http_server.create_database(req).await
}
(Method::DELETE, all_paths::API_V3_CONFIGURE_DATABASE) => {
http_server.delete_database(req).await
}
(Method::POST, all_paths::API_V3_CONFIGURE_TABLE) => http_server.create_table(req).await,
(Method::DELETE, all_paths::API_V3_CONFIGURE_TABLE) => http_server.delete_table(req).await,
(Method::POST, all_paths::API_V3_TEST_WAL_ROUTE) => {
http_server.test_processing_engine_wal_plugin(req).await
}
(Method::POST, "/api/v3/plugin_test/schedule") => {
(Method::POST, all_paths::API_V3_TEST_PLUGIN_ROUTE) => {
http_server
.test_processing_engine_schedule_plugin(req)
.await
@ -1749,6 +1786,45 @@ pub(crate) async fn route_request(
}
}
async fn authenticate(
http_server: &Arc<HttpApi>,
req: &mut Request<Body>,
) -> Option<std::result::Result<Response<Body>, Infallible>> {
if let Err(e) = http_server.authenticate_request(req).await {
match e {
AuthenticationError::Unauthenticated => {
return Some(Ok(Response::builder()
.status(StatusCode::UNAUTHORIZED)
.body(Body::empty())
.unwrap()));
}
AuthenticationError::MalformedRequest => {
return Some(Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from("{\"error\":\
\"Authorization header was malformed and should be in the form 'Authorization: Bearer <token>'\"\
}"))
.unwrap()));
}
AuthenticationError::Forbidden => {
return Some(Ok(Response::builder()
.status(StatusCode::FORBIDDEN)
.body(Body::empty())
.unwrap()));
}
// We don't expect this to happen, but if the header is messed up
// it's better to handle it than not at all
AuthenticationError::ToStr(_) => {
return Some(Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::empty())
.unwrap()));
}
}
}
None
}
fn legacy_write_error_to_response(e: WriteParseError) -> Response<Body> {
let err: ErrorMessage<()> = ErrorMessage {
error: e.to_string(),

View File

@ -11,7 +11,7 @@ clippy::clone_on_ref_ptr,
clippy::future_not_send
)]
pub mod auth;
pub mod all_paths;
pub mod builder;
mod grpc;
mod http;
@ -27,6 +27,7 @@ use authz::Authorizer;
use hyper::server::conn::AddrIncoming;
use hyper::server::conn::Http;
use hyper::service::service_fn;
use influxdb3_authz::AuthProvider;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_write::persister::Persister;
use observability_deps::tracing::error;
@ -119,13 +120,13 @@ pub struct Server {
common_state: CommonServerState,
http: Arc<HttpApi>,
persister: Arc<Persister>,
authorizer: Arc<dyn Authorizer>,
authorizer: Arc<dyn AuthProvider>,
listener: TcpListener,
}
impl Server {
pub fn authorizer(&self) -> Arc<dyn Authorizer> {
Arc::clone(&self.authorizer)
Arc::clone(&self.authorizer.upcast())
}
}
@ -180,12 +181,12 @@ pub async fn serve(
#[cfg(test)]
mod tests {
use crate::auth::DefaultAuthorizer;
use crate::builder::ServerBuilder;
use crate::query_executor::{CreateQueryExecutorArgs, QueryExecutorImpl};
use crate::serve;
use datafusion::parquet::data_type::AsBytes;
use hyper::{Body, Client, Request, Response, StatusCode, body};
use influxdb3_authz::NoAuthAuthenticator;
use influxdb3_cache::distinct_cache::DistinctCacheProvider;
use influxdb3_cache::last_cache::LastCacheProvider;
use influxdb3_cache::parquet_cache::test_cached_obj_store_and_oracle;
@ -835,7 +836,7 @@ mod tests {
.write_buffer(Arc::clone(&write_buffer))
.query_executor(query_executor)
.persister(persister)
.authorizer(Arc::new(DefaultAuthorizer))
.authorizer(Arc::new(NoAuthAuthenticator))
.time_provider(Arc::clone(&time_provider) as _)
.tcp_listener(listener)
.processing_engine(processing_engine)

View File

@ -441,6 +441,7 @@ impl QueryDatabase for QueryExecutorImpl {
Arc::clone(&self.query_log),
Arc::clone(&self.write_buffer),
Arc::clone(&self.sys_events_store),
Arc::clone(&self.write_buffer.catalog()),
),
));
Ok(Some(Arc::new(Database::new(CreateDatabaseArgs {
@ -1223,4 +1224,68 @@ mod tests {
.await
.unwrap();
}
#[test_log::test(tokio::test)]
async fn test_token_permissions_sys_table_query_wrong_db_name() {
let (write_buffer, query_exec, _, _) = setup(None).await;
write_buffer
.write_lp(
NamespaceName::new("foo").unwrap(),
"\
cpu,host=a,region=us-east usage=250\n\
mem,host=a,region=us-east usage=150000\n\
",
Time::from_timestamp_nanos(100),
false,
influxdb3_write::Precision::Nanosecond,
false,
)
.await
.unwrap();
// create an admin token
write_buffer
.catalog()
.create_admin_token(false)
.await
.unwrap();
let query = "select token_id, name, created_at, expiry, permissions, description, created_by_token_id, updated_at, updated_by_token_id FROM system.tokens";
let stream = query_exec
// `foo` is present but `system.tokens` is only available in `_internal` db
.query_sql("foo", query, None, None, None)
.await;
assert!(stream.is_err());
}
#[test_log::test(tokio::test)]
async fn test_token_permissions_sys_table_query_with_admin_token() {
let (write_buffer, query_exec, _, _) = setup(None).await;
// create an admin token
write_buffer
.catalog()
.create_admin_token(false)
.await
.unwrap();
let query = "select token_id, name, created_at, expiry, permissions, description, created_by_token_id, updated_at, updated_by_token_id FROM system.tokens";
let stream = query_exec
.query_sql("_internal", query, None, None, None)
.await
.unwrap();
let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
assert_batches_sorted_eq!(
[
"+----------+--------+---------------------+--------+-------------+-------------+---------------------+------------+---------------------+",
"| token_id | name | created_at | expiry | permissions | description | created_by_token_id | updated_at | updated_by_token_id |",
"+----------+--------+---------------------+--------+-------------+-------------+---------------------+------------+---------------------+",
"| 0 | _admin | 1970-01-01T00:00:00 | | *:*:* | | | | |",
"+----------+--------+---------------------+--------+-------------+-------------+---------------------+------------+---------------------+",
],
&batches
);
}
}

View File

@ -8,12 +8,13 @@ use datafusion::{
scalar::ScalarValue,
};
use distinct_caches::DistinctCachesTable;
use influxdb3_catalog::catalog::DatabaseSchema;
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema, INTERNAL_DB_NAME};
use influxdb3_sys_events::SysEventStore;
use influxdb3_write::WriteBuffer;
use iox_query::query_log::QueryLog;
use iox_system_tables::SystemTableProvider;
use parquet_files::ParquetFilesTable;
use tokens::TokenSystemTable;
use tonic::async_trait;
use self::{last_caches::LastCachesTable, queries::QueriesTable};
@ -25,6 +26,7 @@ use crate::system_tables::python_call::{ProcessingEngineLogsTable, ProcessingEng
mod python_call;
mod queries;
mod tokens;
pub(crate) const SYSTEM_SCHEMA_NAME: &str = "system";
pub(crate) const TABLE_NAME_PREDICATE: &str = "table_name";
@ -33,6 +35,7 @@ pub(crate) const QUERIES_TABLE_NAME: &str = "queries";
pub(crate) const LAST_CACHES_TABLE_NAME: &str = "last_caches";
pub(crate) const DISTINCT_CACHES_TABLE_NAME: &str = "distinct_caches";
pub(crate) const PARQUET_FILES_TABLE_NAME: &str = "parquet_files";
pub(crate) const TOKENS_TABLE_NAME: &str = "tokens";
const PROCESSING_ENGINE_TRIGGERS_TABLE_NAME: &str = "processing_engine_triggers";
@ -89,6 +92,7 @@ impl AllSystemSchemaTablesProvider {
query_log: Arc<QueryLog>,
buffer: Arc<dyn WriteBuffer>,
sys_events_store: Arc<SysEventStore>,
catalog: Arc<Catalog>,
) -> Self {
let mut tables = HashMap::<&'static str, Arc<dyn TableProvider>>::new();
let queries = Arc::new(SystemTableProvider::new(Arc::new(QueriesTable::new(
@ -124,6 +128,14 @@ impl AllSystemSchemaTablesProvider {
ProcessingEngineLogsTable::new(sys_events_store),
)));
tables.insert(PROCESSING_ENGINE_LOGS_TABLE_NAME, logs_table);
if db_schema.name.as_ref() == INTERNAL_DB_NAME {
tables.insert(
TOKENS_TABLE_NAME,
Arc::new(SystemTableProvider::new(Arc::new(TokenSystemTable::new(
Arc::clone(&catalog),
)))),
);
}
Self { tables }
}
}

View File

@ -0,0 +1,141 @@
use std::sync::Arc;
use arrow::array::{StringViewBuilder, TimestampMillisecondBuilder, UInt64Builder};
use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
use datafusion::{error::DataFusionError, prelude::Expr};
use influxdb3_authz::TokenInfo;
use influxdb3_catalog::catalog::Catalog;
use iox_system_tables::IoxSystemTable;
use tonic::async_trait;
#[derive(Debug)]
pub(crate) struct TokenSystemTable {
catalog: Arc<Catalog>,
schema: SchemaRef,
}
impl TokenSystemTable {
pub(crate) fn new(catalog: Arc<Catalog>) -> Self {
Self {
catalog,
schema: table_schema(),
}
}
}
#[async_trait]
impl IoxSystemTable for TokenSystemTable {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
async fn scan(
&self,
_filters: Option<Vec<Expr>>,
_limit: Option<usize>,
) -> Result<RecordBatch, DataFusionError> {
let results = self.catalog.get_tokens();
to_record_batch(&self.schema, results)
}
}
fn table_schema() -> SchemaRef {
let columns = vec![
Field::new("token_id", DataType::UInt64, false),
Field::new("name", DataType::Utf8View, false),
Field::new("hash", DataType::Utf8View, false),
Field::new(
"created_at",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new("description", DataType::Utf8View, true),
Field::new("created_by_token_id", DataType::UInt64, true),
Field::new(
"updated_at",
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
),
Field::new("updated_by_token_id", DataType::UInt64, true),
Field::new(
"expiry",
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
),
Field::new("permissions", DataType::Utf8View, false),
];
Arc::new(Schema::new(columns))
}
fn to_record_batch(
schema: &SchemaRef,
tokens: Vec<Arc<TokenInfo>>,
) -> Result<RecordBatch, DataFusionError> {
let mut id_arr = UInt64Builder::with_capacity(tokens.len());
let mut name_arr = StringViewBuilder::with_capacity(tokens.len());
let mut hash_arr = StringViewBuilder::with_capacity(tokens.len());
let mut created_at_arr = TimestampMillisecondBuilder::with_capacity(tokens.len());
let mut description_arr = StringViewBuilder::with_capacity(tokens.len());
let mut created_by_arr = UInt64Builder::with_capacity(tokens.len());
let mut updated_at_arr = TimestampMillisecondBuilder::with_capacity(tokens.len());
let mut updated_by_arr = UInt64Builder::with_capacity(tokens.len());
let mut expiry_arr = TimestampMillisecondBuilder::with_capacity(tokens.len());
let mut permissions_arr = StringViewBuilder::with_capacity(tokens.len());
for token in &tokens {
id_arr.append_value(token.id.get());
name_arr.append_value(&token.name);
hash_arr.append_value(hex::encode(&token.hash));
created_at_arr.append_value(token.created_at);
if token.description.is_some() {
description_arr.append_value(token.description.clone().unwrap());
} else {
description_arr.append_null();
}
if token.created_by.is_some() {
created_by_arr.append_value(token.created_by.unwrap().get());
} else {
created_by_arr.append_null();
}
if token.updated_at.is_some() {
updated_at_arr.append_value(token.updated_at.unwrap());
} else {
updated_at_arr.append_null();
}
if token.updated_by.is_some() {
updated_by_arr.append_value(token.updated_by.unwrap().get());
} else {
updated_by_arr.append_null();
}
// when expiry is not passed in, it defaults to i64::MAX, which we surface as null here
if token.expiry_millis == i64::MAX {
expiry_arr.append_null();
} else {
expiry_arr.append_value(token.expiry_millis);
}
// Core edition: tokens always carry the full wildcard permission set
let permissions_str = "*:*:*".to_string();
permissions_arr.append_value(permissions_str);
}
let columns: Vec<ArrayRef> = vec![
Arc::new(id_arr.finish()),
Arc::new(name_arr.finish()),
Arc::new(hash_arr.finish()),
Arc::new(created_at_arr.finish()),
Arc::new(description_arr.finish()),
Arc::new(created_by_arr.finish()),
Arc::new(updated_at_arr.finish()),
Arc::new(updated_by_arr.finish()),
Arc::new(expiry_arr.finish()),
Arc::new(permissions_arr.finish()),
];
Ok(RecordBatch::try_new(Arc::clone(schema), columns)?)
}

View File

@ -11,13 +11,16 @@ iox_http.workspace = true
iox_query_params.workspace = true
# Local deps
influxdb3_authz = { path = "../influxdb3_authz" }
influxdb3_cache = { path = "../influxdb3_cache" }
influxdb3_catalog = { path = "../influxdb3_catalog" }
# crates.io dependencies
anyhow.workspace = true
chrono.workspace = true
serde.workspace = true
hashbrown.workspace = true
hex.workspace = true
hyper.workspace = true
thiserror.workspace = true

View File

@ -1,3 +1,7 @@
use std::sync::Arc;
use chrono::{DateTime, Utc};
use influxdb3_authz::TokenInfo;
use influxdb3_catalog::log::TriggerSettings;
use crate::write::Precision;
@ -350,3 +354,29 @@ impl From<iox_http::write::WriteParams> for WriteParams {
}
}
}
#[derive(Debug, Deserialize, Serialize)]
pub struct CreateTokenWithPermissionsResponse {
id: u64,
name: Arc<str>,
pub token: Arc<str>,
pub hash: Arc<str>,
created_at: chrono::DateTime<Utc>,
expiry: Option<chrono::DateTime<Utc>>,
}
impl CreateTokenWithPermissionsResponse {
pub fn from_token_info(token_info: Arc<TokenInfo>, token: String) -> Option<Self> {
let expiry = token_info
.maybe_expiry_millis()
.and_then(DateTime::from_timestamp_millis);
Some(Self {
id: token_info.id.get(),
name: Arc::clone(&token_info.name),
token: Arc::from(token.as_str()),
hash: hex::encode(&token_info.hash).into(),
created_at: DateTime::from_timestamp_millis(token_info.created_at)?,
expiry,
})
}
}

View File

@ -714,7 +714,7 @@ mod tests {
.unwrap_success()
.convert_lines_to_buffer(Gen1Duration::new_5m());
let db = catalog.db_schema_by_id(&DbId::from(0)).unwrap();
let db = catalog.db_schema_by_id(&DbId::from(1)).unwrap();
assert_eq!(db.tables.len(), 2);
// cpu table
@ -1487,7 +1487,7 @@ mod tests {
let persisted_snapshot =
serde_json::from_slice::<PersistedSnapshot>(&persisted_snapshot_bytes).unwrap();
assert_eq!(
CatalogSequenceNumber::new(1),
CatalogSequenceNumber::new(2),
persisted_snapshot.catalog_sequence_number
);
}
@ -1910,7 +1910,7 @@ mod tests {
// this persists the catalog immediately, so we don't wait for anything, just assert that
// the next db id is 1, since the above would have used 0
assert_eq!(wbuf.catalog().next_db_id(), DbId::new(1));
assert_eq!(wbuf.catalog().next_db_id(), DbId::new(2));
// drop the write buffer, and create a new one that replays and re-loads the catalog:
drop(wbuf);
@ -1930,7 +1930,7 @@ mod tests {
.await;
// check that the next db id is still 1
assert_eq!(wbuf.catalog().next_db_id(), DbId::new(1));
assert_eq!(wbuf.catalog().next_db_id(), DbId::new(2));
}
#[test_log::test(tokio::test)]
@ -1952,7 +1952,7 @@ mod tests {
)
.await;
let db_name = "my_corp";
let db_id = DbId::from(0);
let db_id = DbId::from(1);
let tbl_name = "temp";
let tbl_id = TableId::from(0);
@ -2060,7 +2060,7 @@ mod tests {
)
.await;
let db_name = "my_corp";
let db_id = DbId::from(0);
let db_id = DbId::from(1);
let tbl_name = "temp";
let tbl_id = TableId::from(0);
@ -2779,7 +2779,7 @@ mod tests {
// get the path for the created parquet file
let persisted_files = write_buffer
.persisted_files()
.get_files(DbId::from(0), TableId::from(0));
.get_files(DbId::from(1), TableId::from(0));
assert_eq!(1, persisted_files.len());
let path = ObjPath::from(persisted_files[0].path.as_str());
@ -2808,7 +2808,7 @@ mod tests {
// at this point everything should've been snapshotted
drop(write_buffer);
debug!(">>> test: stopped");
debug!("test: stopped");
// nothing in the cache at this point and not in buffer
let (write_buffer, ctx, _) = setup_cache_optional(
// move the time
@ -2909,7 +2909,7 @@ mod tests {
.unwrap_success()
.convert_lines_to_buffer(Gen1Duration::new_5m());
let db = catalog.db_schema_by_id(&DbId::from(0)).unwrap();
let db = catalog.db_schema_by_id(&DbId::from(1)).unwrap();
assert_eq!(db.tables.len(), 1);
assert_eq!(
@ -2937,7 +2937,7 @@ mod tests {
.convert_lines_to_buffer(Gen1Duration::new_5m());
assert_eq!(db.tables.len(), 1);
let db = catalog.db_schema_by_id(&DbId::from(0)).unwrap();
let db = catalog.db_schema_by_id(&DbId::from(1)).unwrap();
let table = db.tables.get_by_id(&TableId::from(0)).unwrap();
assert_eq!(table.num_columns(), 4);
assert_eq!(table.series_key.len(), 2);

View File

@ -13,6 +13,22 @@ expression: catalog_json
0,
{
"id": 0,
"name": "_internal",
"tables": {
"repo": [],
"next_id": 0
},
"processing_engine_triggers": {
"repo": [],
"next_id": 0
},
"deleted": false
}
],
[
1,
{
"id": 1,
"name": "db",
"tables": {
"repo": [
@ -117,9 +133,13 @@ expression: catalog_json
}
]
],
"next_id": 1
"next_id": 2
},
"sequence": 4,
"tokens": {
"repo": [],
"next_id": 0
},
"sequence": 3,
"catalog_id": "test_host",
"catalog_uuid": "[uuid]"
}

View File

@ -13,6 +13,22 @@ expression: catalog_json
0,
{
"id": 0,
"name": "_internal",
"tables": {
"repo": [],
"next_id": 0
},
"processing_engine_triggers": {
"repo": [],
"next_id": 0
},
"deleted": false
}
],
[
1,
{
"id": 1,
"name": "db",
"tables": {
"repo": [
@ -107,9 +123,13 @@ expression: catalog_json
}
]
],
"next_id": 1
"next_id": 2
},
"sequence": 3,
"tokens": {
"repo": [],
"next_id": 0
},
"sequence": 2,
"catalog_id": "test_host",
"catalog_uuid": "[uuid]"
}

View File

@ -13,6 +13,22 @@ expression: catalog_json
0,
{
"id": 0,
"name": "_internal",
"tables": {
"repo": [],
"next_id": 0
},
"processing_engine_triggers": {
"repo": [],
"next_id": 0
},
"deleted": false
}
],
[
1,
{
"id": 1,
"name": "db",
"tables": {
"repo": [
@ -101,9 +117,13 @@ expression: catalog_json
}
]
],
"next_id": 1
"next_id": 2
},
"sequence": 5,
"tokens": {
"repo": [],
"next_id": 0
},
"sequence": 4,
"catalog_id": "test_host",
"catalog_uuid": "[uuid]"
}