Merge branch 'main' into dom/dml-delete-namespace-id

pull/24376/head
kodiakhq[bot] 2022-11-07 09:07:38 +00:00 committed by GitHub
commit df5ec013d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 318 additions and 5 deletions

View File

@ -442,6 +442,8 @@ pub struct Namespace {
/// never drop data).
#[sqlx(default)]
pub retention_duration: Option<String>,
/// The retention period in ns. None represents infinite duration (i.e. never drop data).
pub retention_period_ns: Option<i64>,
/// The topic that writes to this namespace will land in
pub topic_id: TopicId,
/// The query pool assigned to answer queries for this namespace

View File

@ -5,6 +5,9 @@ option go_package = "github.com/influxdata/iox/schema/v1";
service SchemaService {
// Get the schema for a namespace
rpc GetSchema(GetSchemaRequest) returns (GetSchemaResponse);
// update retention period
rpc UpdateNamespaceRetention(UpdateNamespaceRetentionRequest) returns (UpdateNamespaceRetentionResponse);
}
message GetSchemaRequest {
@ -61,3 +64,25 @@ message ColumnSchema {
}
}
message UpdateNamespaceRetentionRequest {
// Name of the namespace to be set
string name = 1;
// Number of hours of the retention period
int64 retention_hours = 2;
}
message UpdateNamespaceRetentionResponse {
Namespace namespace = 1;
}
message Namespace {
// Namespace ID
int64 id = 1;
// Name of the Namespace
string name = 2;
// Retention period ns
optional int64 retention_period_ns = 3;
}

View File

@ -3,6 +3,8 @@
use influxdb_iox_client::{connection::Connection, namespace};
use thiserror::Error;
mod retention;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
@ -25,17 +27,22 @@ pub struct Config {
enum Command {
/// Fetch namespaces
List,
/// Update retention of an existing namespace
Retention(retention::Config),
}
pub async fn command(connection: Connection, config: Config) -> Result<(), Error> {
let mut client = namespace::Client::new(connection);
match config.command {
Command::List => {
let mut client = namespace::Client::new(connection);
let namespaces = client.get_namespaces().await?;
println!("{}", serde_json::to_string_pretty(&namespaces)?);
}
Command::Retention(config) => {
retention::command(connection, config).await?;
} // Deliberately not adding _ => so the compiler will direct people here to impl new
// commands
}
Ok(())
}

View File

@ -0,0 +1,42 @@
use influxdb_iox_client::connection::Connection;
use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("JSON Serialization error: {0}")]
Serde(#[from] serde_json::Error),
#[error("Client error: {0}")]
ClientError(#[from] influxdb_iox_client::error::Error),
}
/// Write data into the specified database
#[derive(Debug, clap::Parser)]
pub struct Config {
/// The namespace to update the retention period for
#[clap(action)]
namespace: String,
/// Num of hours of the retention period of this namespace. Default is 0 representing infinite retention
#[clap(action, long, short = 'c', default_value = "0")]
retention_hours: u32,
}
pub async fn command(
connection: Connection,
config: Config,
) -> Result<(), crate::commands::namespace::Error> {
let Config {
namespace,
retention_hours,
} = config;
let mut client = influxdb_iox_client::schema::Client::new(connection);
let namespace = client
.update_namespace_retention(&namespace, retention_hours.try_into().unwrap())
.await?;
println!("{}", serde_json::to_string_pretty(&namespace)?);
Ok(())
}

View File

@ -570,6 +570,91 @@ async fn namespaces_cli() {
.await
}
/// Test the namespace retention command
#[tokio::test]
async fn namespace_retention() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
vec![
Step::WriteLineProtocol(String::from(
"my_awesome_table2,tag1=A,tag2=B val=42i 123456",
)),
// Set the retention period to 2 hours
Step::Custom(Box::new(|state: &mut StepTestState| {
async {
let addr = state.cluster().router().router_grpc_base().to_string();
let namespace = state.cluster().namespace();
let retention_period_hours = 2;
let retention_period_ns =
retention_period_hours as i64 * 60 * 60 * 1_000_000_000;
// Validate the output of the namespace retention command
//
// {
// "id": "1",
// "name": "0911430016317810_8303971312605107",
// "retentionPeriodNs": "7200000000000"
// }
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-h")
.arg(&addr)
.arg("namespace")
.arg("retention")
.arg("--retention-hours")
.arg(retention_period_hours.to_string())
.arg(&namespace)
.assert()
.success()
.stdout(
predicate::str::contains(namespace)
.and(predicate::str::contains(&retention_period_ns.to_string())),
);
}
.boxed()
})),
// set the retention period to null
Step::Custom(Box::new(|state: &mut StepTestState| {
async {
let addr = state.cluster().router().router_grpc_base().to_string();
let namespace = state.cluster().namespace();
let retention_period_hours = 0; // will be updated to null
// Validate the output of the namespace retention command
//
// {
// "id": "1",
// "name": "6699752880299094_1206270074309156"
// }
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-h")
.arg(&addr)
.arg("namespace")
.arg("retention")
.arg("--retention-hours")
.arg(retention_period_hours.to_string())
.arg(&namespace)
.assert()
.success()
.stdout(
predicate::str::contains(namespace)
.and(predicate::str::contains("retentionPeriodNs".to_string()))
.not(),
);
}
.boxed()
})),
],
)
.run()
.await
}
/// Test the query_ingester CLI command
#[tokio::test]
async fn query_ingester() {

View File

@ -35,4 +35,21 @@ impl Client {
Ok(response.into_inner().schema.unwrap_field("schema")?)
}
/// Update retention for a namespace
pub async fn update_namespace_retention(
&mut self,
namespace: &str,
retention_hours: i64,
) -> Result<Namespace, Error> {
let response = self
.inner
.update_namespace_retention(UpdateNamespaceRetentionRequest {
name: namespace.to_string(),
retention_hours,
})
.await?;
Ok(response.into_inner().namespace.unwrap_field("namespace")?)
}
}

View File

@ -295,6 +295,13 @@ pub trait NamespaceRepo: Send + Sync {
query_pool_id: QueryPoolId,
) -> Result<Namespace>;
/// Update retention period for a namespace
async fn update_retention_period(
&mut self,
name: &str,
retention_hours: i64,
) -> Result<Namespace>;
/// List all namespaces.
async fn list(&mut self) -> Result<Vec<Namespace>>;
@ -718,6 +725,23 @@ where
get_schema_internal(namespace, repos).await
}
/// Update retention for a namespace
pub async fn update_namespace_retention<R>(
name: &str,
retention_hours: i64,
repos: &mut R,
) -> Result<Namespace>
where
R: RepoCollection + ?Sized,
{
let namespace = repos
.namespaces()
.update_retention_period(name, retention_hours)
.await?;
Ok(namespace)
}
async fn get_schema_internal<R>(namespace: Namespace, repos: &mut R) -> Result<NamespaceSchema>
where
R: RepoCollection + ?Sized,
@ -1036,6 +1060,25 @@ pub(crate) mod test_helpers {
.await
.expect("namespace should be updateable");
assert_eq!(NEW_COLUMN_LIMIT, modified.max_columns_per_table);
const NEW_RETENTION_PERIOD: i64 = 5;
let modified = repos
.namespaces()
.update_retention_period(namespace_name, NEW_RETENTION_PERIOD)
.await
.expect("namespace should be updateable");
assert_eq!(
NEW_RETENTION_PERIOD as i64 * 60 * 60 * 1_000_000_000,
modified.retention_period_ns.unwrap()
);
const NEW_RETENTION_PERIOD_NULL: i64 = 0;
let modified = repos
.namespaces()
.update_retention_period(namespace_name, NEW_RETENTION_PERIOD_NULL)
.await
.expect("namespace should be updateable");
assert!(modified.retention_period_ns.is_none());
}
async fn test_table(catalog: Arc<dyn Catalog>) {

View File

@ -31,6 +31,8 @@ const TIME_COLUMN: &str = "time";
pub const DEFAULT_MAX_TABLES: i32 = 10_000;
/// Default per-table column count service protection limit.
pub const DEFAULT_MAX_COLUMNS_PER_TABLE: i32 = 200;
/// Default retention period for data in the catalog.
pub const DEFAULT_RETENTION_PERIOD: Option<i64> = None;
/// A string value representing an infinite retention policy.
pub const INFINITE_RETENTION_POLICY: &str = "inf";

View File

@ -9,7 +9,7 @@ use crate::{
TombstoneRepo, TopicMetadataRepo, Transaction,
},
metrics::MetricDecorator,
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES,
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, DEFAULT_RETENTION_PERIOD,
};
use async_trait::async_trait;
use data_types::{
@ -305,6 +305,7 @@ impl NamespaceRepo for MemTxn {
retention_duration: Some(retention_duration.to_string()),
max_tables: DEFAULT_MAX_TABLES,
max_columns_per_table: DEFAULT_MAX_COLUMNS_PER_TABLE,
retention_period_ns: DEFAULT_RETENTION_PERIOD,
};
stage.namespaces.push(namespace);
Ok(stage.namespaces.last().unwrap().clone())
@ -353,6 +354,30 @@ impl NamespaceRepo for MemTxn {
}),
}
}
async fn update_retention_period(
&mut self,
name: &str,
retention_hours: i64,
) -> Result<Namespace> {
let rentenion_period_ns = retention_hours * 60 * 60 * 1_000_000_000;
let retention = if rentenion_period_ns == 0 {
None
} else {
Some(rentenion_period_ns)
};
let stage = self.stage();
match stage.namespaces.iter_mut().find(|n| n.name == name) {
Some(n) => {
n.retention_period_ns = retention;
Ok(n.clone())
}
None => Err(Error::NamespaceNotFoundByName {
name: name.to_string(),
}),
}
}
}
#[async_trait]

View File

@ -194,6 +194,7 @@ decorate!(
impl_trait = NamespaceRepo,
methods = [
"namespace_create" = create(&mut self, name: &str, retention_duration: &str, topic_id: TopicId, query_pool_id: QueryPoolId) -> Result<Namespace>;
"namespace_update_retention_period" = update_retention_period(&mut self, name: &str, retention_hours: i64) -> Result<Namespace>;
"namespace_list" = list(&mut self) -> Result<Vec<Namespace>>;
"namespace_get_by_id" = get_by_id(&mut self, id: NamespaceId) -> Result<Option<Namespace>>;
"namespace_get_by_name" = get_by_name(&mut self, name: &str) -> Result<Option<Namespace>>;

View File

@ -8,7 +8,7 @@ use crate::{
TombstoneRepo, TopicMetadataRepo, Transaction,
},
metrics::MetricDecorator,
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES,
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, DEFAULT_RETENTION_PERIOD,
};
use async_trait::async_trait;
use data_types::{
@ -622,6 +622,7 @@ RETURNING *;
// Ensure the column default values match the code values.
debug_assert_eq!(rec.max_tables, DEFAULT_MAX_TABLES);
debug_assert_eq!(rec.max_columns_per_table, DEFAULT_MAX_COLUMNS_PER_TABLE);
debug_assert_eq!(rec.retention_period_ns, DEFAULT_RETENTION_PERIOD);
Ok(rec)
}
@ -729,6 +730,40 @@ RETURNING *;
Ok(namespace)
}
async fn update_retention_period(
&mut self,
name: &str,
retention_hours: i64,
) -> Result<Namespace> {
let rentenion_period_ns = retention_hours * 60 * 60 * 1_000_000_000;
let rec = if rentenion_period_ns == 0 {
sqlx::query_as::<_, Namespace>(
r#"UPDATE namespace SET retention_period_ns = NULL WHERE name = $1 RETURNING *;"#,
)
.bind(&name) // $1
.fetch_one(&mut self.inner)
.await
} else {
sqlx::query_as::<_, Namespace>(
r#"UPDATE namespace SET retention_period_ns = $1 WHERE name = $2 RETURNING *;"#,
)
.bind(&rentenion_period_ns) // $1
.bind(&name) // $2
.fetch_one(&mut self.inner)
.await
};
let namespace = rec.map_err(|e| match e {
sqlx::Error::RowNotFound => Error::NamespaceNotFoundByName {
name: name.to_string(),
},
_ => Error::SqlxError { source: e },
})?;
Ok(namespace)
}
}
#[async_trait]

View File

@ -215,6 +215,7 @@ mod tests {
query_pool_id: QueryPoolId::new(42),
max_tables: iox_catalog::DEFAULT_MAX_TABLES,
max_columns_per_table: iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE,
retention_period_ns: iox_catalog::DEFAULT_RETENTION_PERIOD,
}
);
}

View File

@ -3,7 +3,7 @@
use std::{ops::DerefMut, sync::Arc};
use generated_types::influxdata::iox::schema::v1::*;
use iox_catalog::interface::{get_schema_by_name, Catalog};
use iox_catalog::interface::{get_schema_by_name, update_namespace_retention, Catalog};
use observability_deps::tracing::warn;
use tonic::{Request, Response, Status};
@ -38,6 +38,24 @@ impl schema_service_server::SchemaService for SchemaService {
.map(Arc::new)?;
Ok(Response::new(schema_to_proto(schema)))
}
async fn update_namespace_retention(
&self,
request: Request<UpdateNamespaceRetentionRequest>,
) -> Result<Response<UpdateNamespaceRetentionResponse>, Status> {
let mut repos = self.catalog.repositories().await;
let req = request.into_inner();
let namespace =
update_namespace_retention(&req.name, req.retention_hours, repos.deref_mut())
.await
.map_err(|e| {
warn!(error=%e, %req.name, "failed to update namespace retention");
Status::not_found(e.to_string())
})
.map(Arc::new)?;
Ok(Response::new(namespace_to_proto(namespace)))
}
}
fn schema_to_proto(schema: Arc<data_types::NamespaceSchema>) -> GetSchemaResponse {
@ -76,6 +94,16 @@ fn schema_to_proto(schema: Arc<data_types::NamespaceSchema>) -> GetSchemaRespons
response
}
fn namespace_to_proto(namespace: Arc<data_types::Namespace>) -> UpdateNamespaceRetentionResponse {
UpdateNamespaceRetentionResponse {
namespace: Some(Namespace {
id: namespace.id.get(),
name: namespace.name.clone(),
retention_period_ns: namespace.retention_period_ns,
}),
}
}
#[cfg(test)]
mod tests {
use super::*;