feat: list admin tokens

- allows listing admin tokens
- uses _internal db for token system table
- mostly test fixes due to _internal db
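Once this lands, admin tokens can be listed either through the CLI's `show` family (something like `influxdb3 show tokens --format json`; the exact binary and parent subcommand spelling is an assumption based on the `show databases` tests below) or by running `select * from system.tokens` against the `_internal` database.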
praveen/token-support
Praveen Kumar 2025-04-09 13:30:05 +01:00
parent d69fbfc746
commit 695631566f
18 changed files with 526 additions and 71 deletions

View File

@@ -19,10 +19,33 @@ pub enum SubCommand {
/// List databases
Databases(DatabaseConfig),
/// List tokens
Tokens(ShowTokensConfig),
/// Display system table data.
System(SystemConfig),
}
#[derive(Debug, Parser)]
pub struct ShowTokensConfig {
/// The host URL of the running InfluxDB 3 Enterprise server
#[clap(
short = 'H',
long = "host",
env = "INFLUXDB3_HOST_URL",
default_value = "http://127.0.0.1:8181"
)]
host_url: Url,
/// The token for authentication with the InfluxDB 3 Enterprise server
#[clap(long = "token", env = "INFLUXDB3_AUTH_TOKEN")]
auth_token: Option<Secret<String>>,
/// The format in which to output the list of tokens
#[clap(value_enum, long = "format", default_value = "pretty")]
output_format: Format,
}
#[derive(Debug, Parser)]
pub struct DatabaseConfig {
/// The host URL of the running InfluxDB 3 Core server
@@ -71,6 +94,19 @@ pub(crate) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
println!("{}", std::str::from_utf8(&resp_bytes)?);
}
SubCommand::System(cfg) => system::command(cfg).await?,
SubCommand::Tokens(show_tokens_config) => {
let mut client = influxdb3_client::Client::new(show_tokens_config.host_url.clone())?;
if let Some(t) = show_tokens_config.auth_token {
client = client.with_auth_token(t.expose_secret());
}
let resp_bytes = client
.api_v3_query_sql("_internal", "select * from system.tokens")
.format(show_tokens_config.output_format.into())
.send()
.await?;
println!("{}", std::str::from_utf8(&resp_bytes)?);
}
}
Ok(())
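For readers outside the diff, here is a minimal standalone sketch of the same call path. It mirrors the `SubCommand::Tokens` handler above; the `Format::Json` value, the function signature, and the error handling are illustrative assumptions rather than the crate's confirmed API.

```rust
// Sketch only, not part of this commit. Mirrors the `SubCommand::Tokens`
// handler above; `Format::Json` and the async wrapper are assumptions.
use influxdb3_client::{Client, Format};
use url::Url;

async fn list_tokens(host: &str, token: Option<&str>) -> Result<(), Box<dyn std::error::Error>> {
    let mut client = Client::new(Url::parse(host)?)?;
    if let Some(t) = token {
        // Same as the handler: attach the auth token when one is supplied.
        client = client.with_auth_token(t);
    }
    // The tokens system table is only registered for the `_internal` database.
    let resp = client
        .api_v3_query_sql("_internal", "select * from system.tokens")
        .format(Format::Json)
        .send()
        .await?;
    println!("{}", std::str::from_utf8(&resp)?);
    Ok(())
}
```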

View File

@@ -181,6 +181,7 @@ async fn test_show_databases() {
+---------------+\n\
| iox::database |\n\
+---------------+\n\
| _internal |\n\
| bar |\n\
| foo |\n\
+---------------+\
@@ -191,7 +192,7 @@ async fn test_show_databases() {
// Show databases with JSON format
let output = server.show_databases().with_format("json").run().unwrap();
assert_eq!(
- r#"[{"iox::database":"bar"},{"iox::database":"foo"}]"#,
r#"[{"iox::database":"_internal"},{"iox::database":"bar"},{"iox::database":"foo"}]"#,
output
);
@@ -200,6 +201,7 @@ async fn test_show_databases() {
assert_eq!(
"\
iox::database\n\
_internal\n\
bar\n\
foo\
",
@@ -210,6 +212,7 @@ async fn test_show_databases() {
let output = server.show_databases().with_format("jsonl").run().unwrap();
assert_eq!(
"\
{\"iox::database\":\"_internal\"}\n\
{\"iox::database\":\"bar\"}\n\ {\"iox::database\":\"bar\"}\n\
{\"iox::database\":\"foo\"}\ {\"iox::database\":\"foo\"}\
", ",
@ -226,6 +229,7 @@ async fn test_show_databases() {
+---------------+\n\ +---------------+\n\
| iox::database |\n\ | iox::database |\n\
+---------------+\n\ +---------------+\n\
| _internal |\n\
| bar |\n\
+---------------+",
output
@@ -237,30 +241,6 @@ async fn test_show_databases() {
assert_contains!(output, "foo-");
}
- #[test_log::test(tokio::test)]
- async fn test_show_empty_database() {
- let server = TestServer::spawn().await;
- // Show empty database list with default format (pretty)
- let output = server.show_databases().run().unwrap();
- assert_eq!(
- "\
- +---------------+\n\
- | iox::database |\n\
- +---------------+\n\
- +---------------+",
- output
- );
- // Show empty database list with JSON format
- let output = server.show_databases().with_format("json").run().unwrap();
- assert_eq!(output, "[]");
- // Show empty database list with JSONL format
- let output = server.show_databases().with_format("jsonl").run().unwrap();
- assert_eq!(output, "");
- }
#[test_log::test(tokio::test)]
async fn test_create_database() {
let server = TestServer::spawn().await;
@@ -735,6 +715,7 @@ async fn test_database_create_persists() {
r#"+---------------+
| iox::database |
+---------------+
| _internal |
| foo |
+---------------+"#,
result

View File

@@ -782,9 +782,8 @@ async fn api_v3_configure_db_delete() {
.json::<Value>()
.await
.unwrap();
- debug!(result = ?result, ">> RESULT");
assert_eq!(
- json!([{ "deleted": false, "iox::database": "foo" } ]),
json!([{ "deleted": false, "iox::database": "_internal" }, { "deleted": false, "iox::database": "foo" } ]),
result
);
@@ -802,9 +801,8 @@ async fn api_v3_configure_db_delete() {
.json::<Value>()
.await
.unwrap();
- debug!(result = ?result, ">> RESULT");
let array_result = result.as_array().unwrap();
- assert_eq!(1, array_result.len());
assert_eq!(2, array_result.len());
let first_db = array_result.first().unwrap();
assert_contains!(
first_db
@@ -814,6 +812,17 @@ async fn api_v3_configure_db_delete() {
.unwrap()
.as_str()
.unwrap(),
"_internal"
);
let second_db = array_result.get(1).unwrap();
assert_contains!(
second_db
.as_object()
.unwrap()
.get("iox::database")
.unwrap()
.as_str()
.unwrap(),
"foo-" "foo-"
); );
@ -832,15 +841,25 @@ async fn api_v3_configure_db_delete() {
.json::<Value>() .json::<Value>()
.await .await
.unwrap(); .unwrap();
debug!(result = ?result, ">> RESULT");
let array_result = result.as_array().unwrap(); let array_result = result.as_array().unwrap();
// check there are 2 dbs now, foo and foo-* // check there are 2 dbs now, foo and foo-*
assert_eq!(2, array_result.len()); assert_eq!(3, array_result.len());
let first_db = array_result.first().unwrap(); let first_db = array_result.first().unwrap();
let second_db = array_result.get(1).unwrap(); let second_db = array_result.get(1).unwrap();
let third_db = array_result.get(2).unwrap();
assert_contains!(
first_db
.as_object()
.unwrap()
.get("iox::database")
.unwrap()
.as_str()
.unwrap(),
"_internal"
);
assert_eq!(
"foo",
- first_db
second_db
.as_object()
.unwrap()
.get("iox::database")
@@ -849,7 +868,7 @@ async fn api_v3_configure_db_delete() {
.unwrap(),
);
assert_contains!(
- second_db
third_db
.as_object()
.unwrap()
.get("iox::database")

View File

@@ -356,6 +356,7 @@ async fn api_v3_query_influxql() {
+---------------+---------+\n\
| iox::database | deleted |\n\
+---------------+---------+\n\
| _internal | false |\n\
| bar | false |\n\
| foo | false |\n\
+---------------+---------+",
@@ -366,6 +367,7 @@ async fn api_v3_query_influxql() {
expected: "+---------------+---------+----------+\n\
| iox::database | name | duration |\n\
+---------------+---------+----------+\n\
| _internal | autogen | |\n\
| bar | autogen | |\n\
| foo | autogen | |\n\
+---------------+---------+----------+",
@@ -665,6 +667,10 @@ async fn api_v3_query_json_format() {
database: None,
query: "SHOW DATABASES",
expected: json!([
{
"deleted": false,
"iox::database": "_internal",
},
{
"deleted": false,
"iox::database": "foo",
@@ -675,6 +681,10 @@ async fn api_v3_query_json_format() {
database: None,
query: "SHOW RETENTION POLICIES",
expected: json!([
{
"iox::database": "_internal",
"name": "autogen",
},
{
"iox::database": "foo",
"name": "autogen",
@@ -771,12 +781,16 @@ async fn api_v3_query_jsonl_format() {
TestCase {
database: None,
query: "SHOW DATABASES",
- expected: "{\"iox::database\":\"foo\",\"deleted\":false}\n".into(),
expected:
"{\"iox::database\":\"_internal\",\"deleted\":false}\n\
{\"iox::database\":\"foo\",\"deleted\":false}\n".into(),
},
TestCase {
database: None,
query: "SHOW RETENTION POLICIES",
- expected: "{\"iox::database\":\"foo\",\"name\":\"autogen\"}\n".into(),
expected:
"{\"iox::database\":\"_internal\",\"name\":\"autogen\"}\n\
{\"iox::database\":\"foo\",\"name\":\"autogen\"}\n".into(),
},
];
for t in test_cases {

View File

@@ -13,6 +13,22 @@ expression: catalog.snapshot()
0,
{
"id": 0,
"name": "_internal",
"tables": {
"repo": [],
"next_id": 0
},
"processing_engine_triggers": {
"repo": [],
"next_id": 0
},
"deleted": false
}
],
[
1,
{
"id": 1,
"name": "test_db", "name": "test_db",
"tables": { "tables": {
"repo": [ "repo": [
@ -258,9 +274,9 @@ expression: catalog.snapshot()
} }
] ]
], ],
"next_id": 1 "next_id": 2
}, },
"sequence": 6, "sequence": 7,
"tokens": { "tokens": {
"repo": [], "repo": [],
"next_id": 0 "next_id": 0

View File

@@ -59,6 +59,8 @@ const SOFT_DELETION_TIME_FORMAT: &str = "%Y%m%dT%H%M%S";
pub const TIME_COLUMN_NAME: &str = "time";
pub const INTERNAL_DB_NAME: &str = "_internal";
const DEFAULT_ADMIN_TOKEN_NAME: &str = "_admin";
/// The sequence number of a batch of WAL operations.
@@ -133,7 +135,7 @@ impl Catalog {
let store =
ObjectStoreCatalog::new(Arc::clone(&node_id), CATALOG_CHECKPOINT_INTERVAL, store);
let subscriptions = Default::default();
- store
let mut catalog = store
.load_or_create_catalog()
.await
.map_err(Into::into)
@@ -143,7 +145,10 @@ impl Catalog {
time_provider,
store,
inner,
- })
});
create_internal_db(&mut catalog).await;
catalog
}
pub async fn new_with_shutdown(
@@ -367,6 +372,16 @@ impl Catalog {
result
}
pub fn get_tokens(&self) -> Vec<Arc<TokenInfo>> {
self.inner
.read()
.tokens
.repo()
.iter()
.map(|(_, token_info)| Arc::clone(token_info))
.collect()
}
pub async fn create_admin_token(&self, regenerate: bool) -> Result<(Arc<TokenInfo>, String)> {
// if regen, if token is present already create a new token and hash and update the
// existing token otherwise we should insert to catalog (essentially an upsert)
@@ -447,6 +462,30 @@ impl Catalog {
}
}
async fn create_internal_db(catalog: &mut std::result::Result<Catalog, CatalogError>) {
// if catalog is initialised, create internal db
if let Ok(catalog) = catalog.as_mut() {
let result = catalog.create_database(INTERNAL_DB_NAME).await;
// what is the best outcome if "_internal" cannot be created?
match result {
Ok(_) => info!("created internal database"),
Err(err) => {
match err {
CatalogError::AlreadyExists => {
// this is probably ok
debug!("not creating internal db as it exists already");
}
_ => {
// all other errors are unexpected state
error!(?err, "unexpected error when creating internal db");
panic!("cannot create internal db");
}
}
}
};
}
}
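As a rough illustration of the intended behaviour (not code from this commit), a test along these lines could assert that the internal database always exists after the catalog is built; the in-memory constructor name is an assumption, while `db_name_to_id` and `INTERNAL_DB_NAME` come from this diff.

```rust
// Sketch only: `Catalog::new_in_memory` stands in for the in-memory
// constructor referenced below (actual name/signature may differ).
#[tokio::test]
async fn internal_db_exists_after_startup() {
    let catalog = Catalog::new_in_memory("test-node").await.unwrap();
    // `_internal` is created eagerly on catalog creation; note that
    // `database_count()` deliberately excludes it from user-facing counts.
    assert!(catalog.db_name_to_id(INTERNAL_DB_NAME).is_some());
}
```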
impl Catalog {
/// Create new `Catalog` that uses an in-memory object store.
///
@@ -667,7 +706,11 @@ impl InnerCatalog {
}
pub fn database_count(&self) -> usize {
- self.databases.iter().filter(|db| !db.1.deleted).count()
self.databases
.iter()
// count if not db deleted _and_ not internal
.filter(|db| !db.1.deleted && db.1.name().as_ref() != INTERNAL_DB_NAME)
.count()
}
pub fn table_count(&self) -> usize {
@@ -1854,7 +1897,7 @@ mod tests {
".catalog_uuid" => "[uuid]"
});
catalog.update_from_snapshot(snapshot);
- assert_eq!(catalog.db_name_to_id("test_db"), Some(DbId::from(0)));
assert_eq!(catalog.db_name_to_id("test_db"), Some(DbId::from(1)));
});
}
}
@@ -1942,7 +1985,7 @@ mod tests {
".catalog_uuid" => "[uuid]"
});
catalog.update_from_snapshot(snapshot);
- assert_eq!(catalog.db_name_to_id("test_db"), Some(DbId::from(0)));
assert_eq!(catalog.db_name_to_id("test_db"), Some(DbId::from(1)));
});
}
}
@@ -1989,7 +2032,7 @@ mod tests {
".catalog_uuid" => "[uuid]"
});
catalog.update_from_snapshot(snapshot);
- assert_eq!(catalog.db_name_to_id("test_db"), Some(DbId::from(0)));
assert_eq!(catalog.db_name_to_id("test_db"), Some(DbId::from(1)));
});
}
}
@@ -2035,7 +2078,7 @@ mod tests {
".catalog_uuid" => "[uuid]"
});
catalog.update_from_snapshot(snapshot);
- assert_eq!(catalog.db_name_to_id("test_db"), Some(DbId::from(0)));
assert_eq!(catalog.db_name_to_id("test_db"), Some(DbId::from(1)));
});
}
}

View File

@@ -13,6 +13,22 @@ expression: catalog.snapshot()
0,
{
"id": 0,
"name": "_internal",
"tables": {
"repo": [],
"next_id": 0
},
"processing_engine_triggers": {
"repo": [],
"next_id": 0
},
"deleted": false
}
],
[
1,
{
"id": 1,
"name": "test_db", "name": "test_db",
"tables": { "tables": {
"repo": [ "repo": [
@ -101,9 +117,9 @@ expression: catalog.snapshot()
} }
] ]
], ],
"next_id": 1 "next_id": 2
}, },
"sequence": 3, "sequence": 4,
"tokens": { "tokens": {
"repo": [], "repo": [],
"next_id": 0 "next_id": 0

View File

@@ -14,6 +14,22 @@ expression: snapshot
0,
{
"id": 0,
"name": "_internal",
"tables": {
"repo": [],
"next_id": 0
},
"processing_engine_triggers": {
"repo": [],
"next_id": 0
},
"deleted": false
}
],
[
1,
{
"id": 1,
"name": "test_db", "name": "test_db",
"tables": { "tables": {
"repo": [ "repo": [
@ -300,9 +316,9 @@ expression: snapshot
} }
] ]
], ],
"next_id": 1 "next_id": 2
}, },
"sequence": 3, "sequence": 4,
"tokens": { "tokens": {
"repo": [], "repo": [],
"next_id": 0 "next_id": 0

View File

@@ -14,6 +14,22 @@ expression: snapshot
0,
{
"id": 0,
"name": "_internal",
"tables": {
"repo": [],
"next_id": 0
},
"processing_engine_triggers": {
"repo": [],
"next_id": 0
},
"deleted": false
}
],
[
1,
{
"id": 1,
"name": "test_db", "name": "test_db",
"tables": { "tables": {
"repo": [ "repo": [
@ -140,9 +156,9 @@ expression: snapshot
} }
] ]
], ],
"next_id": 1 "next_id": 2
}, },
"sequence": 3, "sequence": 4,
"tokens": { "tokens": {
"repo": [], "repo": [],
"next_id": 0 "next_id": 0

View File

@@ -14,6 +14,22 @@ expression: snapshot
0,
{
"id": 0,
"name": "_internal",
"tables": {
"repo": [],
"next_id": 0
},
"processing_engine_triggers": {
"repo": [],
"next_id": 0
},
"deleted": false
}
],
[
1,
{
"id": 1,
"name": "test_db", "name": "test_db",
"tables": { "tables": {
"repo": [ "repo": [
@ -143,9 +159,9 @@ expression: snapshot
} }
] ]
], ],
"next_id": 1 "next_id": 2
}, },
"sequence": 3, "sequence": 4,
"tokens": { "tokens": {
"repo": [], "repo": [],
"next_id": 0 "next_id": 0

View File

@@ -14,6 +14,22 @@ expression: snapshot
0,
{
"id": 0,
"name": "_internal",
"tables": {
"repo": [],
"next_id": 0
},
"processing_engine_triggers": {
"repo": [],
"next_id": 0
},
"deleted": false
}
],
[
1,
{
"id": 1,
"name": "test_db", "name": "test_db",
"tables": { "tables": {
"repo": [ "repo": [
@ -124,9 +140,9 @@ expression: snapshot
} }
] ]
], ],
"next_id": 1 "next_id": 2
}, },
"sequence": 2, "sequence": 3,
"tokens": { "tokens": {
"repo": [], "repo": [],
"next_id": 0 "next_id": 0

View File

@@ -441,6 +441,7 @@ impl QueryDatabase for QueryExecutorImpl {
Arc::clone(&self.query_log),
Arc::clone(&self.write_buffer),
Arc::clone(&self.sys_events_store),
Arc::clone(&self.write_buffer.catalog()),
),
));
Ok(Some(Arc::new(Database::new(CreateDatabaseArgs {
@@ -1223,4 +1224,68 @@ mod tests {
.await
.unwrap();
}
#[test_log::test(tokio::test)]
async fn test_token_permissions_sys_table_query_wrong_db_name() {
let (write_buffer, query_exec, _, _) = setup(None).await;
write_buffer
.write_lp(
NamespaceName::new("foo").unwrap(),
"\
cpu,host=a,region=us-east usage=250\n\
mem,host=a,region=us-east usage=150000\n\
",
Time::from_timestamp_nanos(100),
false,
influxdb3_write::Precision::Nanosecond,
false,
)
.await
.unwrap();
// create an admin token
write_buffer
.catalog()
.create_admin_token(false)
.await
.unwrap();
let query = "select token_id, name, created_at, expiry, permissions, description, created_by_token_id, updated_at, updated_by_token_id FROM system.tokens";
let stream = query_exec
// `foo` is present but `system.tokens` is only available in `_internal` db
.query_sql("foo", query, None, None, None)
.await;
assert!(stream.is_err());
}
#[test_log::test(tokio::test)]
async fn test_token_permissions_sys_table_query_with_admin_token() {
let (write_buffer, query_exec, _, _) = setup(None).await;
// create an admin token
write_buffer
.catalog()
.create_admin_token(false)
.await
.unwrap();
let query = "select token_id, name, created_at, expiry, permissions, description, created_by_token_id, updated_at, updated_by_token_id FROM system.tokens";
let stream = query_exec
.query_sql("_internal", query, None, None, None)
.await
.unwrap();
let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
assert_batches_sorted_eq!(
[
"+----------+--------+---------------------+--------+-------------+-------------+---------------------+------------+---------------------+",
"| token_id | name | created_at | expiry | permissions | description | created_by_token_id | updated_at | updated_by_token_id |",
"+----------+--------+---------------------+--------+-------------+-------------+---------------------+------------+---------------------+",
"| 0 | _admin | 1970-01-01T00:00:00 | | *:*:* | | | | |",
"+----------+--------+---------------------+--------+-------------+-------------+---------------------+------------+---------------------+",
],
&batches
);
}
}

View File

@@ -8,12 +8,13 @@ use datafusion::{
scalar::ScalarValue,
};
use distinct_caches::DistinctCachesTable;
- use influxdb3_catalog::catalog::DatabaseSchema;
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema, INTERNAL_DB_NAME};
use influxdb3_sys_events::SysEventStore;
use influxdb3_write::WriteBuffer;
use iox_query::query_log::QueryLog;
use iox_system_tables::SystemTableProvider;
use parquet_files::ParquetFilesTable;
use tokens::TokenSystemTable;
use tonic::async_trait;
use self::{last_caches::LastCachesTable, queries::QueriesTable};
@@ -25,6 +26,7 @@ use crate::system_tables::python_call::{ProcessingEngineLogsTable, ProcessingEng
mod python_call;
mod queries;
mod tokens;
pub(crate) const SYSTEM_SCHEMA_NAME: &str = "system";
pub(crate) const TABLE_NAME_PREDICATE: &str = "table_name";
@@ -33,6 +35,7 @@ pub(crate) const QUERIES_TABLE_NAME: &str = "queries";
pub(crate) const LAST_CACHES_TABLE_NAME: &str = "last_caches";
pub(crate) const DISTINCT_CACHES_TABLE_NAME: &str = "distinct_caches";
pub(crate) const PARQUET_FILES_TABLE_NAME: &str = "parquet_files";
pub(crate) const TOKENS_TABLE_NAME: &str = "tokens";
const PROCESSING_ENGINE_TRIGGERS_TABLE_NAME: &str = "processing_engine_triggers";
@@ -89,6 +92,7 @@ impl AllSystemSchemaTablesProvider {
query_log: Arc<QueryLog>,
buffer: Arc<dyn WriteBuffer>,
sys_events_store: Arc<SysEventStore>,
catalog: Arc<Catalog>,
) -> Self {
let mut tables = HashMap::<&'static str, Arc<dyn TableProvider>>::new();
let queries = Arc::new(SystemTableProvider::new(Arc::new(QueriesTable::new(
@@ -124,6 +128,14 @@ impl AllSystemSchemaTablesProvider {
ProcessingEngineLogsTable::new(sys_events_store),
)));
tables.insert(PROCESSING_ENGINE_LOGS_TABLE_NAME, logs_table);
if db_schema.name.as_ref() == INTERNAL_DB_NAME {
tables.insert(
TOKENS_TABLE_NAME,
Arc::new(SystemTableProvider::new(Arc::new(TokenSystemTable::new(
Arc::clone(&catalog),
)))),
);
}
Self { tables }
}
}
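Design note: because the tokens provider is only registered when the database schema name matches `INTERNAL_DB_NAME`, `system.tokens` is visible solely through the `_internal` database; querying it against a user database fails, which the new query-executor test later in this commit exercises.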

View File

@@ -0,0 +1,141 @@
use std::sync::Arc;
use arrow::array::{StringViewBuilder, TimestampMillisecondBuilder, UInt64Builder};
use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
use datafusion::{error::DataFusionError, prelude::Expr};
use influxdb3_authz::TokenInfo;
use influxdb3_catalog::catalog::Catalog;
use iox_system_tables::IoxSystemTable;
use tonic::async_trait;
#[derive(Debug)]
pub(crate) struct TokenSystemTable {
catalog: Arc<Catalog>,
schema: SchemaRef,
}
impl TokenSystemTable {
pub(crate) fn new(catalog: Arc<Catalog>) -> Self {
Self {
catalog,
schema: table_schema(),
}
}
}
#[async_trait]
impl IoxSystemTable for TokenSystemTable {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
async fn scan(
&self,
_filters: Option<Vec<Expr>>,
_limit: Option<usize>,
) -> Result<RecordBatch, DataFusionError> {
let results = self.catalog.get_tokens();
to_record_batch(&self.schema, results)
}
}
fn table_schema() -> SchemaRef {
let columns = vec![
Field::new("token_id", DataType::UInt64, false),
Field::new("name", DataType::Utf8View, false),
Field::new("hash", DataType::Utf8View, false),
Field::new(
"created_at",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new("description", DataType::Utf8View, true),
Field::new("created_by_token_id", DataType::UInt64, true),
Field::new(
"updated_at",
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
),
Field::new("updated_by_token_id", DataType::UInt64, true),
Field::new(
"expiry",
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
),
Field::new("permissions", DataType::Utf8View, false),
];
Arc::new(Schema::new(columns))
}
fn to_record_batch(
schema: &SchemaRef,
tokens: Vec<Arc<TokenInfo>>,
) -> Result<RecordBatch, DataFusionError> {
let mut id_arr = UInt64Builder::with_capacity(tokens.len());
let mut name_arr = StringViewBuilder::with_capacity(tokens.len());
let mut hash_arr = StringViewBuilder::with_capacity(tokens.len());
let mut created_at_arr = TimestampMillisecondBuilder::with_capacity(tokens.len());
let mut description_arr = StringViewBuilder::with_capacity(tokens.len());
let mut created_by_arr = UInt64Builder::with_capacity(tokens.len());
let mut updated_at_arr = TimestampMillisecondBuilder::with_capacity(tokens.len());
let mut updated_by_arr = UInt64Builder::with_capacity(tokens.len());
let mut expiry_arr = TimestampMillisecondBuilder::with_capacity(tokens.len());
let mut permissions_arr = StringViewBuilder::with_capacity(tokens.len());
for token in &tokens {
id_arr.append_value(token.id.get());
name_arr.append_value(&token.name);
hash_arr.append_value(hex::encode(&token.hash));
created_at_arr.append_value(token.created_at);
if token.description.is_some() {
description_arr.append_value(token.description.clone().unwrap());
} else {
description_arr.append_null();
}
if token.created_by.is_some() {
created_by_arr.append_value(token.created_by.unwrap().get());
} else {
created_by_arr.append_null();
}
if token.updated_at.is_some() {
updated_at_arr.append_value(token.updated_at.unwrap());
} else {
updated_at_arr.append_null();
}
if token.updated_by.is_some() {
updated_by_arr.append_value(token.updated_by.unwrap().get());
} else {
updated_by_arr.append_null();
}
// when expiry is not passed in, we default it to i64::MAX (which is same as null)
if token.expiry_millis == i64::MAX {
expiry_arr.append_null();
} else {
expiry_arr.append_value(token.expiry_millis);
}
// core only
let permissions_str = "*:*:*".to_string();
permissions_arr.append_value(permissions_str);
}
let columns: Vec<ArrayRef> = vec![
Arc::new(id_arr.finish()),
Arc::new(name_arr.finish()),
Arc::new(hash_arr.finish()),
Arc::new(created_at_arr.finish()),
Arc::new(description_arr.finish()),
Arc::new(created_by_arr.finish()),
Arc::new(updated_at_arr.finish()),
Arc::new(updated_by_arr.finish()),
Arc::new(expiry_arr.finish()),
Arc::new(permissions_arr.finish()),
];
Ok(RecordBatch::try_new(Arc::clone(schema), columns)?)
}

View File

@@ -714,7 +714,7 @@ mod tests {
.unwrap_success()
.convert_lines_to_buffer(Gen1Duration::new_5m());
- let db = catalog.db_schema_by_id(&DbId::from(0)).unwrap();
let db = catalog.db_schema_by_id(&DbId::from(1)).unwrap();
assert_eq!(db.tables.len(), 2);
// cpu table
@@ -1487,7 +1487,7 @@ mod tests {
let persisted_snapshot =
serde_json::from_slice::<PersistedSnapshot>(&persisted_snapshot_bytes).unwrap();
assert_eq!(
- CatalogSequenceNumber::new(1),
CatalogSequenceNumber::new(2),
persisted_snapshot.catalog_sequence_number
);
}
@@ -1910,7 +1910,7 @@ mod tests {
// this persists the catalog immediately, so we don't wait for anything, just assert that
// the next db id is 1, since the above would have used 0
- assert_eq!(wbuf.catalog().next_db_id(), DbId::new(1));
assert_eq!(wbuf.catalog().next_db_id(), DbId::new(2));
// drop the write buffer, and create a new one that replays and re-loads the catalog:
drop(wbuf);
@@ -1930,7 +1930,7 @@ mod tests {
.await;
// check that the next db id is still 1
- assert_eq!(wbuf.catalog().next_db_id(), DbId::new(1));
assert_eq!(wbuf.catalog().next_db_id(), DbId::new(2));
}
#[test_log::test(tokio::test)]
@@ -1952,7 +1952,7 @@ mod tests {
)
.await;
let db_name = "my_corp";
- let db_id = DbId::from(0);
let db_id = DbId::from(1);
let tbl_name = "temp";
let tbl_id = TableId::from(0);
@@ -2060,7 +2060,7 @@ mod tests {
)
.await;
let db_name = "my_corp";
- let db_id = DbId::from(0);
let db_id = DbId::from(1);
let tbl_name = "temp";
let tbl_id = TableId::from(0);
@@ -2779,7 +2779,7 @@ mod tests {
// get the path for the created parquet file
let persisted_files = write_buffer
.persisted_files()
- .get_files(DbId::from(0), TableId::from(0));
.get_files(DbId::from(1), TableId::from(0));
assert_eq!(1, persisted_files.len());
let path = ObjPath::from(persisted_files[0].path.as_str());
@@ -2808,7 +2808,7 @@ mod tests {
// at this point everything should've been snapshotted
drop(write_buffer);
- debug!(">>> test: stopped");
debug!("test: stopped");
// nothing in the cache at this point and not in buffer
let (write_buffer, ctx, _) = setup_cache_optional(
// move the time
@@ -2909,7 +2909,7 @@ mod tests {
.unwrap_success()
.convert_lines_to_buffer(Gen1Duration::new_5m());
- let db = catalog.db_schema_by_id(&DbId::from(0)).unwrap();
let db = catalog.db_schema_by_id(&DbId::from(1)).unwrap();
assert_eq!(db.tables.len(), 1);
assert_eq!(
@@ -2937,7 +2937,7 @@ mod tests {
.convert_lines_to_buffer(Gen1Duration::new_5m());
assert_eq!(db.tables.len(), 1);
- let db = catalog.db_schema_by_id(&DbId::from(0)).unwrap();
let db = catalog.db_schema_by_id(&DbId::from(1)).unwrap();
let table = db.tables.get_by_id(&TableId::from(0)).unwrap();
assert_eq!(table.num_columns(), 4);
assert_eq!(table.series_key.len(), 2);

View File

@@ -13,6 +13,22 @@ expression: catalog_json
0,
{
"id": 0,
"name": "_internal",
"tables": {
"repo": [],
"next_id": 0
},
"processing_engine_triggers": {
"repo": [],
"next_id": 0
},
"deleted": false
}
],
[
1,
{
"id": 1,
"name": "db", "name": "db",
"tables": { "tables": {
"repo": [ "repo": [
@ -117,9 +133,9 @@ expression: catalog_json
} }
] ]
], ],
"next_id": 1 "next_id": 2
}, },
"sequence": 3, "sequence": 4,
"tokens": { "tokens": {
"repo": [], "repo": [],
"next_id": 0 "next_id": 0

View File

@@ -13,6 +13,22 @@ expression: catalog_json
0,
{
"id": 0,
"name": "_internal",
"tables": {
"repo": [],
"next_id": 0
},
"processing_engine_triggers": {
"repo": [],
"next_id": 0
},
"deleted": false
}
],
[
1,
{
"id": 1,
"name": "db", "name": "db",
"tables": { "tables": {
"repo": [ "repo": [
@ -107,9 +123,9 @@ expression: catalog_json
} }
] ]
], ],
"next_id": 1 "next_id": 2
}, },
"sequence": 2, "sequence": 3,
"tokens": { "tokens": {
"repo": [], "repo": [],
"next_id": 0 "next_id": 0

View File

@@ -13,6 +13,22 @@ expression: catalog_json
0,
{
"id": 0,
"name": "_internal",
"tables": {
"repo": [],
"next_id": 0
},
"processing_engine_triggers": {
"repo": [],
"next_id": 0
},
"deleted": false
}
],
[
1,
{
"id": 1,
"name": "db", "name": "db",
"tables": { "tables": {
"repo": [ "repo": [
@ -101,9 +117,9 @@ expression: catalog_json
} }
] ]
], ],
"next_id": 1 "next_id": 2
}, },
"sequence": 4, "sequence": 5,
"tokens": { "tokens": {
"repo": [], "repo": [],
"next_id": 0 "next_id": 0