feat: drop/delete database (#25549)

* feat: drop/delete database

This commit allows soft deletion of a database using the `influxdb3
database delete <db_name>` command. The write buffer and last-value cache
for the deleted database are cleared as well.

closes: https://github.com/influxdata/influxdb/issues/25523
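
For illustration, a typical invocation looks like the following (the host URL
and database name here are hypothetical; the confirmation prompt and success
message are taken from the new CLI code in this diff):

    $ influxdb3 database delete --dbname foo --host http://localhost:8181
    Are you sure you want to delete "foo"? Enter 'yes' to confirm
    yes
    Database "foo" deleted successfully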

* feat: reuse same code path when deleting database

- In the previous commit, deleting a database immediately cleared the
  last cache and the query buffer, so the same logic had to be repeated
  on startup to handle databases that had been deleted in an earlier run.
  This commit removes the explicit, immediate clearing and moves the logic
  into `apply_catalog_batch`, which already applies each `CatalogOp`; the
  cache and buffer are now cleared in the `buffer_ops` method, which
  already has hooks into those components. A simplified sketch of the
  resulting flow is included below.

closes: https://github.com/influxdata/influxdb/issues/25523
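
As a rough sketch of that flow (simplified from the `WriteBufferImpl` and
`QueryableBuffer` changes further down in this diff, not the exact code), the
delete path now only records a `DeleteDatabase` catalog op; clearing the query
buffer and last cache happens wherever that op is applied:

    // Simplified sketch; the names match the code in the diff below.
    async fn delete_database(&self, name: String) -> Result<(), Error> {
        let (db_id, db_schema) = self
            .catalog
            .db_id_and_schema(&name)
            .ok_or_else(|| Error::DatabaseNotFound(name))?;
        let deletion_time = self.time_provider.now();
        let catalog_batch = CatalogBatch {
            time_ns: deletion_time.timestamp_nanos(),
            database_id: db_id,
            database_name: Arc::clone(&db_schema.name),
            ops: vec![CatalogOp::DeleteDatabase(DeleteDatabaseDefinition {
                database_id: db_id,
                database_name: Arc::clone(&db_schema.name),
                deletion_time: deletion_time.timestamp_nanos(),
            })],
        };
        // `apply_catalog_batch` marks the schema as soft-deleted (renaming it),
        // and replaying the same op through `buffer_ops` clears the query
        // buffer and the last cache, both immediately and again on restart.
        self.catalog.apply_catalog_batch(&catalog_batch)?;
        self.wal.write_ops(vec![WalOp::Catalog(catalog_batch)]).await?;
        Ok(())
    }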

* feat: use reqwest query api for query param

Co-authored-by: Trevor Hilton <thilton@influxdata.com>

* feat: include deleted flag in DatabaseSnapshot

- `DatabaseSchema` serialization/deserialization is delegated to
  `DatabaseSnapshot`, so the `deleted` flag should be included in
  `DatabaseSnapshot` as well.
- insta test snapshots fixed

closes: https://github.com/influxdata/influxdb/issues/25523

* feat: address PR comments + tidy ups

---------

Co-authored-by: Trevor Hilton <thilton@influxdata.com>
praveen/drop-table
praveen-influx 2024-11-19 16:08:14 +00:00 committed by GitHub
parent 53f54a6845
commit 33c2d47ba9
22 changed files with 573 additions and 72 deletions

Cargo.lock

@ -2770,12 +2770,14 @@ version = "0.1.0"
dependencies = [
"arrow",
"bimap",
"chrono",
"hashbrown 0.15.1",
"indexmap 2.6.0",
"influxdb-line-protocol",
"influxdb3_id",
"influxdb3_wal",
"insta",
"iox_time",
"observability_deps",
"parking_lot",
"pretty_assertions",


@ -0,0 +1,53 @@
use std::{error::Error, io};
use secrecy::ExposeSecret;
use crate::commands::common::InfluxDb3Config;
#[derive(Debug, clap::Parser)]
enum Command {
Delete(Config),
}
#[derive(Debug, clap::Parser)]
pub(crate) struct ManageDatabaseConfig {
#[clap(subcommand)]
command: Command,
}
#[derive(Debug, clap::Parser)]
pub struct Config {
#[clap(flatten)]
influxdb3_config: InfluxDb3Config,
}
pub async fn delete_database(manage_db_config: ManageDatabaseConfig) -> Result<(), Box<dyn Error>> {
match manage_db_config.command {
Command::Delete(config) => {
let InfluxDb3Config {
host_url,
database_name,
auth_token,
} = config.influxdb3_config;
println!(
"Are you sure you want to delete {:?}? Enter 'yes' to confirm",
database_name
);
let mut confirmation = String::new();
let _ = io::stdin().read_line(&mut confirmation);
if confirmation.trim() != "yes" {
println!("Cannot delete database without confirmation");
} else {
let mut client = influxdb3_client::Client::new(host_url)?;
if let Some(t) = auth_token {
client = client.with_auth_token(t.expose_secret());
}
client.api_v3_configure_db_delete(&database_name).await?;
println!("Database {:?} deleted successfully", &database_name);
}
}
}
Ok(())
}


@ -26,6 +26,7 @@ use trogging::{
mod commands {
pub(crate) mod common;
pub mod database;
pub mod last_cache;
pub mod query;
pub mod serve;
@ -91,6 +92,9 @@ enum Command {
/// Manage last-n-value caches
LastCache(commands::last_cache::Config),
/// Manage database (delete only for the moment)
Database(commands::database::ManageDatabaseConfig),
}
fn main() -> Result<(), std::io::Error> {
@ -148,6 +152,12 @@ fn main() -> Result<(), std::io::Error> {
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::Database(db_config)) => {
if let Err(e) = commands::database::delete_database(db_config).await {
eprintln!("Database delete command failed: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
}
});


@ -0,0 +1,98 @@
use std::{
io::Write,
process::{Command, Stdio},
thread,
};
use assert_cmd::cargo::CommandCargoExt;
use observability_deps::tracing::debug;
use test_helpers::assert_contains;
use crate::TestServer;
pub fn run_with_confirmation(args: &[&str]) -> String {
let mut child_process = Command::cargo_bin("influxdb3")
.unwrap()
.args(args)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.unwrap();
let mut stdin = child_process.stdin.take().expect("failed to open stdin");
thread::spawn(move || {
stdin
.write_all(b"yes\n")
.expect("cannot write confirmation msg to stdin");
});
String::from_utf8(child_process.wait_with_output().unwrap().stdout)
.unwrap()
.trim()
.into()
}
pub fn run_with_confirmation_and_err(args: &[&str]) -> String {
let mut child_process = Command::cargo_bin("influxdb3")
.unwrap()
.args(args)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.unwrap();
let mut stdin = child_process.stdin.take().expect("failed to open stdin");
thread::spawn(move || {
stdin
.write_all(b"yes\n")
.expect("cannot write confirmation msg to stdin");
});
String::from_utf8(child_process.wait_with_output().unwrap().stderr)
.unwrap()
.trim()
.into()
}
#[test_log::test(tokio::test)]
async fn test_delete_database() {
let server = TestServer::spawn().await;
let server_addr = server.client_addr();
let db_name = "foo";
server
.write_lp_to_db(
db_name,
"cpu,t1=a,t2=b,t3=c f1=true,f2=\"hello\",f3=4i,f4=4u,f5=5 1000",
influxdb3_client::Precision::Second,
)
.await
.expect("write to db");
let result = run_with_confirmation(&[
"database",
"delete",
"--dbname",
db_name,
"--host",
&server_addr,
]);
debug!(result = ?result, "delete database");
assert_contains!(&result, "Database \"foo\" deleted successfully");
}
#[test_log::test(tokio::test)]
async fn test_delete_missing_database() {
let server = TestServer::spawn().await;
let server_addr = server.client_addr();
let db_name = "foo";
let result = run_with_confirmation_and_err(&[
"database",
"delete",
"--dbname",
db_name,
"--host",
&server_addr,
]);
debug!(result = ?result, "delete missing database");
assert_contains!(&result, "404");
}


@ -1,4 +1,8 @@
use hyper::StatusCode;
use observability_deps::tracing::debug;
use pretty_assertions::assert_eq;
use serde_json::{json, Value};
use test_helpers::assert_contains;
use crate::TestServer;
@ -367,3 +371,140 @@ async fn api_v3_configure_last_cache_delete() {
}
}
}
#[test_log::test(tokio::test)]
async fn api_v3_configure_db_delete() {
let db_name = "foo";
let tbl_name = "tbl";
let server = TestServer::spawn().await;
let client = reqwest::Client::new();
let url = format!(
"{base}/api/v3/configure/database?db={db_name}",
base = server.client_addr()
);
server
.write_lp_to_db(
db_name,
format!("{tbl_name},t1=a,t2=b,t3=c f1=true,f2=\"hello\",f3=4i,f4=4u,f5=5 1000"),
influxdb3_client::Precision::Second,
)
.await
.expect("write to db");
// check foo db is present
let result = server
.api_v3_query_influxql(&[("q", "SHOW DATABASES"), ("format", "json")])
.await
.json::<Value>()
.await
.unwrap();
debug!(result = ?result, ">> RESULT");
assert_eq!(json!([{ "iox::database": "foo" } ]), result);
let resp = client
.delete(&url)
.send()
.await
.expect("delete database call succeed");
assert_eq!(200, resp.status());
// check foo db is now foo-YYYYMMDD..
let result = server
.api_v3_query_influxql(&[("q", "SHOW DATABASES"), ("format", "json")])
.await
.json::<Value>()
.await
.unwrap();
debug!(result = ?result, ">> RESULT");
let array_result = result.as_array().unwrap();
assert_eq!(1, array_result.len());
let first_db = array_result.first().unwrap();
assert_contains!(
first_db
.as_object()
.unwrap()
.get("iox::database")
.unwrap()
.as_str()
.unwrap(),
"foo-"
);
server
.write_lp_to_db(
db_name,
format!("{tbl_name},t1=a,t2=b,t3=c f1=true,f2=\"hello\",f3=4i,f4=4u,f5=5 1000"),
influxdb3_client::Precision::Second,
)
.await
.expect("write to db");
let result = server
.api_v3_query_influxql(&[("q", "SHOW DATABASES"), ("format", "json")])
.await
.json::<Value>()
.await
.unwrap();
debug!(result = ?result, ">> RESULT");
let array_result = result.as_array().unwrap();
// check there are 2 dbs now, foo and foo-*
assert_eq!(2, array_result.len());
let first_db = array_result.first().unwrap();
let second_db = array_result.get(1).unwrap();
assert_eq!(
"foo",
first_db
.as_object()
.unwrap()
.get("iox::database")
.unwrap()
.as_str()
.unwrap(),
);
assert_contains!(
second_db
.as_object()
.unwrap()
.get("iox::database")
.unwrap()
.as_str()
.unwrap(),
"foo-"
);
}
#[tokio::test]
async fn api_v3_configure_db_delete_no_db() {
let db_name = "db";
let server = TestServer::spawn().await;
let client = reqwest::Client::new();
let url = format!(
"{base}/api/v3/configure/database?db={db_name}",
base = server.client_addr()
);
let resp = client
.delete(&url)
.send()
.await
.expect("delete database call succeed");
assert_eq!(StatusCode::NOT_FOUND, resp.status());
}
#[tokio::test]
async fn api_v3_configure_db_delete_missing_query_param() {
let server = TestServer::spawn().await;
let client = reqwest::Client::new();
let url = format!(
"{base}/api/v3/configure/database",
base = server.client_addr()
);
let resp = client
.delete(&url)
.send()
.await
.expect("delete database call succeed");
assert_eq!(StatusCode::BAD_REQUEST, resp.status());
}


@ -14,6 +14,7 @@ use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use reqwest::Response;
mod auth;
mod cli;
mod client;
mod configure;
mod flight;


@ -10,6 +10,7 @@ license.workspace = true
influxdb-line-protocol.workspace = true
observability_deps.workspace = true
schema = { workspace = true }
iox_time.workspace = true
# Local deps
influxdb3_id = { path = "../influxdb3_id" }
@ -18,6 +19,7 @@ influxdb3_wal = { path = "../influxdb3_wal" }
# crates.io dependencies
arrow.workspace = true
bimap.workspace = true
chrono.workspace = true
hashbrown.workspace = true
indexmap.workspace = true
parking_lot.workspace = true


@ -9,7 +9,8 @@ use influxdb3_wal::{
CatalogBatch, CatalogOp, FieldAdditions, LastCacheDefinition, LastCacheDelete,
};
use influxdb_line_protocol::FieldValue;
use observability_deps::tracing::info;
use iox_time::Time;
use observability_deps::tracing::{debug, info};
use parking_lot::RwLock;
use schema::{InfluxColumnType, InfluxFieldType, Schema, SchemaBuilder};
use serde::{Deserialize, Serialize, Serializer};
@ -17,6 +18,8 @@ use std::collections::BTreeMap;
use std::sync::Arc;
use thiserror::Error;
const SOFT_DB_DELETION_TIME_FORMAT: &str = "%Y%m%dT%H%M%S";
#[derive(Debug, Error, Clone)]
pub enum Error {
#[error("catalog updated elsewhere")]
@ -173,14 +176,14 @@ impl Catalog {
}
pub fn db_schema(&self, db_name: &str) -> Option<Arc<DatabaseSchema>> {
self.db_schema_and_id(db_name).map(|(_, schema)| schema)
self.db_id_and_schema(db_name).map(|(_, schema)| schema)
}
pub fn db_schema_by_id(&self, db_id: &DbId) -> Option<Arc<DatabaseSchema>> {
self.inner.read().databases.get(db_id).cloned()
}
pub fn db_schema_and_id(&self, db_name: &str) -> Option<(DbId, Arc<DatabaseSchema>)> {
pub fn db_id_and_schema(&self, db_name: &str) -> Option<(DbId, Arc<DatabaseSchema>)> {
let inner = self.inner.read();
let db_id = inner.db_map.get_by_right(db_name)?;
inner
@ -227,8 +230,7 @@ impl Catalog {
table.add_last_cache(last_cache);
db.tables.insert(table_id, Arc::new(table));
inner.databases.insert(db_id, Arc::new(db));
inner.sequence = inner.sequence.next();
inner.updated = true;
inner.set_db_updated();
}
pub fn delete_last_cache(&self, db_id: DbId, table_id: TableId, name: &str) {
@ -248,8 +250,7 @@ impl Catalog {
table.remove_last_cache(name);
db.tables.insert(table_id, Arc::new(table));
inner.databases.insert(db_id, Arc::new(db));
inner.sequence = inner.sequence.next();
inner.updated = true;
inner.set_db_updated();
}
pub fn instance_id(&self) -> Arc<str> {
@ -267,10 +268,7 @@ impl Catalog {
pub fn insert_database(&self, db: DatabaseSchema) {
let mut inner = self.inner.write();
inner.db_map.insert(db.id, Arc::clone(&db.name));
inner.databases.insert(db.id, Arc::new(db));
inner.sequence = inner.sequence.next();
inner.updated = true;
inner.upsert_db(db);
}
pub fn is_updated(&self) -> bool {
@ -388,34 +386,17 @@ impl InnerCatalog {
let table_count = self.table_count();
if let Some(db) = self.databases.get(&catalog_batch.database_id) {
let existing_table_count = db.tables.len();
if let Some(new_db) = db.new_if_updated_from_batch(catalog_batch)? {
let new_table_count = new_db.tables.len() - existing_table_count;
if table_count + new_table_count > Catalog::NUM_TABLES_LIMIT {
return Err(Error::TooManyTables);
}
let new_db = Arc::new(new_db);
self.databases.insert(new_db.id, Arc::clone(&new_db));
self.sequence = self.sequence.next();
self.updated = true;
self.db_map.insert(new_db.id, Arc::clone(&new_db.name));
if let Some(new_db) = DatabaseSchema::new_if_updated_from_batch(db, catalog_batch)? {
check_overall_table_count(Some(db), &new_db, table_count)?;
self.upsert_db(new_db);
}
} else {
if self.databases.len() >= Catalog::NUM_DBS_LIMIT {
return Err(Error::TooManyDbs);
}
let new_db = DatabaseSchema::new_from_batch(catalog_batch)?;
if table_count + new_db.tables.len() > Catalog::NUM_TABLES_LIMIT {
return Err(Error::TooManyTables);
}
let new_db = Arc::new(new_db);
self.databases.insert(new_db.id, Arc::clone(&new_db));
self.sequence = self.sequence.next();
self.updated = true;
self.db_map.insert(new_db.id, Arc::clone(&new_db.name));
check_overall_table_count(None, &new_db, table_count)?;
self.upsert_db(new_db);
}
Ok(())
@ -424,6 +405,36 @@ impl InnerCatalog {
pub fn db_exists(&self, db_id: DbId) -> bool {
self.databases.contains_key(&db_id)
}
pub fn upsert_db(&mut self, db: DatabaseSchema) {
let name = Arc::clone(&db.name);
let id = db.id;
self.databases.insert(id, Arc::new(db));
self.set_db_updated();
self.db_map.insert(id, name);
}
fn set_db_updated(&mut self) {
self.sequence = self.sequence.next();
self.updated = true;
}
}
fn check_overall_table_count(
existing_db: Option<&Arc<DatabaseSchema>>,
new_db: &DatabaseSchema,
current_table_count: usize,
) -> Result<()> {
let existing_table_count = if let Some(existing_db) = existing_db {
existing_db.tables.len()
} else {
0
};
let newly_added_table_count = new_db.tables.len() - existing_table_count;
if current_table_count + newly_added_table_count > Catalog::NUM_TABLES_LIMIT {
return Err(Error::TooManyTables);
}
Ok(())
}
#[derive(Debug, Eq, PartialEq, Clone)]
@ -433,6 +444,7 @@ pub struct DatabaseSchema {
/// The database is a map of tables
pub tables: SerdeVecMap<TableId, Arc<TableDefinition>>,
pub table_map: BiHashMap<TableId, Arc<str>>,
pub deleted: bool,
}
impl DatabaseSchema {
@ -442,14 +454,21 @@ impl DatabaseSchema {
name,
tables: Default::default(),
table_map: BiHashMap::new(),
deleted: false,
}
}
/// Validates the updates in the `CatalogBatch` are compatible with this schema. If
/// everything is compatible and there are no updates to the existing schema, None will be
/// returned, otherwise a new `DatabaseSchema` will be returned with the updates applied.
pub fn new_if_updated_from_batch(&self, catalog_batch: &CatalogBatch) -> Result<Option<Self>> {
pub fn new_if_updated_from_batch(
db_schema: &DatabaseSchema,
catalog_batch: &CatalogBatch,
) -> Result<Option<Self>> {
debug!(name = ?db_schema.name, deleted = ?db_schema.deleted, full_batch = ?catalog_batch, "Updating / adding to catalog");
let mut updated_or_new_tables = SerdeVecMap::new();
let mut schema_deleted = false;
let mut schema_name = Arc::clone(&db_schema.name);
for catalog_op in &catalog_batch.ops {
match catalog_op {
@ -457,7 +476,7 @@ impl DatabaseSchema {
CatalogOp::CreateTable(table_definition) => {
let new_or_existing_table = updated_or_new_tables
.get(&table_definition.table_id)
.or_else(|| self.tables.get(&table_definition.table_id));
.or_else(|| db_schema.tables.get(&table_definition.table_id));
if let Some(existing_table) = new_or_existing_table {
if let Some(new_table) =
existing_table.new_if_definition_adds_new_fields(table_definition)?
@ -472,7 +491,7 @@ impl DatabaseSchema {
CatalogOp::AddFields(field_additions) => {
let Some(new_or_existing_table) = updated_or_new_tables
.get(&field_additions.table_id)
.or_else(|| self.tables.get(&field_additions.table_id))
.or_else(|| db_schema.tables.get(&field_additions.table_id))
else {
return Err(Error::TableNotFound {
db_name: Arc::clone(&field_additions.database_name),
@ -488,10 +507,10 @@ impl DatabaseSchema {
CatalogOp::CreateLastCache(last_cache_definition) => {
let new_or_existing_table = updated_or_new_tables
.get(&last_cache_definition.table_id)
.or_else(|| self.tables.get(&last_cache_definition.table_id));
.or_else(|| db_schema.tables.get(&last_cache_definition.table_id));
let table = new_or_existing_table.ok_or(TableNotFound {
db_name: Arc::clone(&self.name),
db_name: Arc::clone(&db_schema.name),
table_name: Arc::clone(&last_cache_definition.table),
})?;
@ -504,10 +523,10 @@ impl DatabaseSchema {
CatalogOp::DeleteLastCache(last_cache_deletion) => {
let new_or_existing_table = updated_or_new_tables
.get(&last_cache_deletion.table_id)
.or_else(|| self.tables.get(&last_cache_deletion.table_id));
.or_else(|| db_schema.tables.get(&last_cache_deletion.table_id));
let table = new_or_existing_table.ok_or(TableNotFound {
db_name: Arc::clone(&self.name),
db_name: Arc::clone(&db_schema.name),
table_name: Arc::clone(&last_cache_deletion.table_name),
})?;
@ -517,13 +536,19 @@ impl DatabaseSchema {
updated_or_new_tables.insert(new_table.table_id, Arc::new(new_table));
}
}
CatalogOp::DeleteDatabase(params) => {
schema_deleted = true;
let deletion_time = Time::from_timestamp_nanos(params.deletion_time);
schema_name =
DatabaseSchema::make_new_schema_name(&params.database_name, deletion_time);
}
}
}
if updated_or_new_tables.is_empty() {
if updated_or_new_tables.is_empty() && !schema_deleted {
Ok(None)
} else {
for (table_id, table_def) in &self.tables {
for (table_id, table_def) in &db_schema.tables {
if !updated_or_new_tables.contains_key(table_id) {
updated_or_new_tables.insert(*table_id, Arc::clone(table_def));
}
@ -536,10 +561,11 @@ impl DatabaseSchema {
.collect();
Ok(Some(Self {
id: self.id,
name: Arc::clone(&self.name),
id: db_schema.id,
name: schema_name,
tables: updated_or_new_tables,
table_map: new_table_maps,
deleted: schema_deleted,
}))
}
}
@ -549,8 +575,7 @@ impl DatabaseSchema {
catalog_batch.database_id,
Arc::clone(&catalog_batch.database_name),
);
let new_db = db_schema
.new_if_updated_from_batch(catalog_batch)?
let new_db = DatabaseSchema::new_if_updated_from_batch(&db_schema, catalog_batch)?
.expect("database must be new");
Ok(new_db)
}
@ -640,6 +665,21 @@ impl DatabaseSchema {
pub fn table_id_to_name(&self, table_id: &TableId) -> Option<Arc<str>> {
self.table_map.get_by_left(table_id).map(Arc::clone)
}
pub fn soft_delete_db(&mut self, deletion_time: Time) {
self.deleted = true;
self.name = DatabaseSchema::make_new_schema_name(&self.name, deletion_time);
}
pub fn make_new_schema_name(name: &str, deletion_time: Time) -> Arc<str> {
Arc::from(format!(
"{}-{}",
name,
deletion_time
.date_time()
.format(SOFT_DB_DELETION_TIME_FORMAT)
))
}
}
#[derive(Debug, Eq, PartialEq, Clone)]
@ -1026,6 +1066,7 @@ mod tests {
map.insert(TableId::from(2), "test_table_2".into());
map
},
deleted: false,
};
use InfluxColumnType::*;
use InfluxFieldType::*;
@ -1106,7 +1147,8 @@ mod tests {
"id": 0,
"name": "db1",
"tables": [],
"table_map": []
"table_map": [],
"deleted": false
}
],
[
@ -1115,7 +1157,8 @@ mod tests {
"id": 0,
"name": "db1",
"tables": [],
"table_map": []
"table_map": [],
"deleted": false
}
]
],
@ -1155,7 +1198,8 @@ mod tests {
"next_column_id": 0
}
]
]
],
"deleted": false
}
]
],
@ -1206,7 +1250,8 @@ mod tests {
]
}
]
]
],
"deleted": false
}
]
]
@ -1223,6 +1268,7 @@ mod tests {
name: "test".into(),
tables: SerdeVecMap::new(),
table_map: BiHashMap::new(),
deleted: false,
};
database.tables.insert(
TableId::from(0),
@ -1279,6 +1325,7 @@ mod tests {
map.insert(TableId::from(1), "test_table_1".into());
map
},
deleted: false,
};
use InfluxColumnType::*;
use InfluxFieldType::*;
@ -1337,6 +1384,7 @@ mod tests {
map.insert(TableId::from(0), "test".into());
map
},
deleted: false,
};
use InfluxColumnType::*;
use InfluxFieldType::*;


@ -38,6 +38,7 @@ struct DatabaseSnapshot {
id: DbId,
name: Arc<str>,
tables: SerdeVecMap<TableId, TableSnapshot>,
deleted: bool,
}
impl From<&DatabaseSchema> for DatabaseSnapshot {
@ -50,6 +51,7 @@ impl From<&DatabaseSchema> for DatabaseSnapshot {
.iter()
.map(|(table_id, table_def)| (*table_id, table_def.as_ref().into()))
.collect(),
deleted: db.deleted,
}
}
}
@ -70,6 +72,7 @@ impl From<DatabaseSnapshot> for DatabaseSchema {
name: snap.name,
tables,
table_map,
deleted: snap.deleted,
}
}
}


@ -249,7 +249,8 @@ expression: catalog
]
}
]
]
],
"deleted": false
}
]
],


@ -106,7 +106,8 @@ expression: catalog
]
}
]
]
],
"deleted": false
}
]
],


@ -95,7 +95,8 @@ expression: catalog
]
}
]
]
],
"deleted": false
}
]
],


@ -259,6 +259,30 @@ impl Client {
}
}
/// Make a request to the `DELETE /api/v3/configure/database?db=foo` API
pub async fn api_v3_configure_db_delete(&self, db: impl AsRef<str> + Send) -> Result<()> {
let api_path = "/api/v3/configure/database";
let url = self.base_url.join(api_path)?;
let mut req = self.http_client.delete(url).query(&[("db", db.as_ref())]);
if let Some(token) = &self.auth_token {
req = req.bearer_auth(token.expose_secret());
}
let resp = req
.send()
.await
.map_err(|src| Error::request_send(Method::DELETE, api_path, src))?;
let status = resp.status();
match status {
StatusCode::OK => Ok(()),
code => Err(Error::ApiError {
code,
message: resp.text().await.map_err(Error::Text)?,
}),
}
}
/// Send a `/ping` request to the target `influxdb3` server to check its
/// status and gather `version` and `revision` information
pub async fn ping(&self) -> Result<PingResponse> {


@ -121,6 +121,10 @@ pub enum Error {
#[error("error serving http: {0}")]
ServingHttp(#[from] hyper::Error),
/// Missing the 'db' query parameter for the delete database API
#[error("missing query parameter 'db'")]
MissingDeleteDatabaseParams,
/// Missing parameters for query
#[error("missing query parameters 'db' and 'q'")]
MissingQueryParams,
@ -223,6 +227,10 @@ impl Error {
fn into_response(self) -> Response<Body> {
debug!(error = ?self, "API error");
match self {
Self::WriteBuffer(err @ WriteBufferError::DatabaseNotFound(_)) => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from(err.to_string()))
.unwrap(),
Self::WriteBuffer(WriteBufferError::CatalogUpdateError(
err @ (CatalogError::TooManyDbs
| CatalogError::TooManyColumns
@ -711,7 +719,7 @@ where
let (db_id, db_schema) = self
.write_buffer
.catalog()
.db_schema_and_id(&db)
.db_id_and_schema(&db)
.ok_or_else(|| WriteBufferError::DbDoesNotExist)?;
let (table_id, table_def) = db_schema
.table_definition_and_id(table.as_str())
@ -784,7 +792,7 @@ where
let (db_id, db_schema) = self
.write_buffer
.catalog()
.db_schema_and_id(&db)
.db_id_and_schema(&db)
.ok_or_else(|| WriteBufferError::DbDoesNotExist)?;
let table_id = db_schema
.table_name_to_id(table)
@ -799,6 +807,16 @@ where
.unwrap())
}
async fn delete_database(&self, req: Request<Body>) -> Result<Response<Body>> {
let query = req.uri().query().unwrap_or("");
let delete_req = serde_urlencoded::from_str::<DatabaseDeleteRequest>(query)?;
self.write_buffer.delete_database(delete_req.db).await?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap())
}
async fn read_body_json<ReqBody: DeserializeOwned>(
&self,
req: hyper::Request<Body>,
@ -1112,6 +1130,11 @@ struct LastCacheDeleteRequest {
name: String,
}
#[derive(Debug, Deserialize)]
struct DatabaseDeleteRequest {
db: String,
}
pub(crate) async fn route_request<Q: QueryExecutor, T: TimeProvider>(
http_server: Arc<HttpApi<Q, T>>,
mut req: Request<Body>,
@ -1190,6 +1213,7 @@ where
(Method::DELETE, "/api/v3/configure/last_cache") => {
http_server.configure_last_cache_delete(req).await
}
(Method::DELETE, "/api/v3/configure/database") => http_server.delete_database(req).await,
_ => {
let body = Body::from("not found");
Ok(Response::builder()


@ -244,6 +244,7 @@ pub enum CatalogOp {
AddFields(FieldAdditions),
CreateLastCache(LastCacheDefinition),
DeleteLastCache(LastCacheDelete),
DeleteDatabase(DeleteDatabaseDefinition),
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
@ -252,6 +253,13 @@ pub struct DatabaseDefinition {
pub database_name: Arc<str>,
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct DeleteDatabaseDefinition {
pub database_id: DbId,
pub database_name: Arc<str>,
pub deletion_time: i64,
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct TableDefinition {
pub database_id: DbId,


@ -492,6 +492,12 @@ impl LastCacheProvider {
Ok(())
}
/// Delete all caches for the given database from the provider
pub fn delete_caches_for_db(&self, db_id: &DbId) {
let mut lock = self.cache_map.write();
lock.remove(db_id);
}
/// Write the contents from a wal file into the cache by iterating over its database and table batches
/// to find entries that belong in the cache.
///
@ -1799,7 +1805,7 @@ mod tests {
.await
.unwrap();
let (db_id, db_schema) = wbuf.catalog().db_schema_and_id("foo").unwrap();
let (db_id, db_schema) = wbuf.catalog().db_id_and_schema("foo").unwrap();
let (tbl_id, table_def) = db_schema.table_definition_and_id("cpu").unwrap();
let col_id = table_def.column_name_to_id("host").unwrap();
@ -1908,7 +1914,7 @@ mod tests {
.await
.unwrap();
let (db_id, db_schema) = wbuf.catalog().db_schema_and_id("foo").unwrap();
let (db_id, db_schema) = wbuf.catalog().db_id_and_schema("foo").unwrap();
let (tbl_id, table_def) = db_schema.table_definition_and_id("cpu").unwrap();
let host_col_id = table_def.column_name_to_id("host").unwrap();
let region_col_id = table_def.column_name_to_id("region").unwrap();
@ -2145,7 +2151,7 @@ mod tests {
.await
.unwrap();
let (db_id, db_schema) = wbuf.catalog().db_schema_and_id("foo").unwrap();
let (db_id, db_schema) = wbuf.catalog().db_id_and_schema("foo").unwrap();
let (tbl_id, table_def) = db_schema.table_definition_and_id("cpu").unwrap();
let host_col_id = table_def.column_name_to_id("host").unwrap();
let region_col_id = table_def.column_name_to_id("region").unwrap();
@ -2327,7 +2333,7 @@ mod tests {
.await
.unwrap();
let (db_id, db_schema) = wbuf.catalog().db_schema_and_id("foo").unwrap();
let (db_id, db_schema) = wbuf.catalog().db_id_and_schema("foo").unwrap();
let (tbl_id, table_def) = db_schema.table_definition_and_id("cpu").unwrap();
let host_col_id = table_def.column_name_to_id("host").unwrap();
let region_col_id = table_def.column_name_to_id("region").unwrap();
@ -2473,7 +2479,7 @@ mod tests {
.await
.unwrap();
let (db_id, db_schema) = wbuf.catalog().db_schema_and_id("cassini_mission").unwrap();
let (db_id, db_schema) = wbuf.catalog().db_id_and_schema("cassini_mission").unwrap();
let (tbl_id, table_def) = db_schema.table_definition_and_id("temp").unwrap();
let component_id_col_id = table_def.column_name_to_id("component_id").unwrap();
let active_col_id = table_def.column_name_to_id("active").unwrap();
@ -2607,7 +2613,7 @@ mod tests {
.await
.unwrap();
let (db_id, db_schema) = wbuf.catalog().db_schema_and_id(db_name).unwrap();
let (db_id, db_schema) = wbuf.catalog().db_id_and_schema(db_name).unwrap();
let (tbl_id, table_def) = db_schema.table_definition_and_id(tbl_name).unwrap();
let state_col_id = table_def.column_name_to_id("state").unwrap();
let county_col_id = table_def.column_name_to_id("county").unwrap();
@ -2745,7 +2751,7 @@ mod tests {
.await
.unwrap();
let (db_id, db_schema) = wbuf.catalog().db_schema_and_id(db_name).unwrap();
let (db_id, db_schema) = wbuf.catalog().db_id_and_schema(db_name).unwrap();
let (tbl_id, table_def) = db_schema.table_definition_and_id(tbl_name).unwrap();
let state_col_id = table_def.column_name_to_id("state").unwrap();
let county_col_id = table_def.column_name_to_id("county").unwrap();
@ -2884,7 +2890,7 @@ mod tests {
.await
.unwrap();
let (db_id, db_schema) = wbuf.catalog().db_schema_and_id(db_name).unwrap();
let (db_id, db_schema) = wbuf.catalog().db_id_and_schema(db_name).unwrap();
let tbl_id = db_schema.table_name_to_id(tbl_name).unwrap();
// Create the last cache using default tags as keys
@ -2953,7 +2959,7 @@ mod tests {
.await
.unwrap();
let (db_id, db_schema) = wbuf.catalog().db_schema_and_id(db_name).unwrap();
let (db_id, db_schema) = wbuf.catalog().db_id_and_schema(db_name).unwrap();
let (tbl_id, table_def) = db_schema.table_definition_and_id(tbl_name).unwrap();
let game_id_col_id = table_def.column_name_to_id("game_id").unwrap();
@ -3054,7 +3060,7 @@ mod tests {
.await
.unwrap();
let (db_id, db_schema) = wbuf.catalog().db_schema_and_id(db_name).unwrap();
let (db_id, db_schema) = wbuf.catalog().db_id_and_schema(db_name).unwrap();
let (tbl_id, table_def) = db_schema.table_definition_and_id(tbl_name).unwrap();
let t1_col_id = table_def.column_name_to_id("t1").unwrap();
@ -3191,7 +3197,7 @@ mod tests {
.await
.unwrap();
let (db_id, db_schema) = wbuf.catalog().db_schema_and_id(db_name).unwrap();
let (db_id, db_schema) = wbuf.catalog().db_id_and_schema(db_name).unwrap();
let (tbl_id, table_def) = db_schema.table_definition_and_id(tbl_name).unwrap();
let t1_col_id = table_def.column_name_to_id("t1").unwrap();
let t2_col_id = table_def.column_name_to_id("t2").unwrap();
@ -3310,6 +3316,7 @@ mod tests {
map.insert(TableId::from(1), "test_table_2".into());
map
},
deleted: false,
};
let table_id = TableId::from(0);
use schema::InfluxColumnType::*;


@ -34,9 +34,6 @@ use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
#[error("database not found {db_name}")]
DatabaseNotFound { db_name: String },
#[error("object store path error: {0}")]
ObjStorePath(#[from] object_store::path::Error),
@ -49,7 +46,13 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
pub trait WriteBuffer: Bufferer + ChunkContainer + LastCacheManager {}
pub trait WriteBuffer: Bufferer + ChunkContainer + LastCacheManager + DatabaseManager {}
/// Database manager - currently supports only the delete operation
#[async_trait::async_trait]
pub trait DatabaseManager: Debug + Send + Sync + 'static {
async fn delete_database(&self, name: String) -> Result<(), write_buffer::Error>;
}
/// The buffer is for buffering data in memory and in the wal before it is persisted as parquet files in storage.
#[async_trait]


@ -5,13 +5,13 @@ pub mod queryable_buffer;
mod table_buffer;
pub mod validator;
use crate::chunk::ParquetChunk;
use crate::last_cache::{self, CreateCacheArguments, LastCacheProvider};
use crate::parquet_cache::ParquetCacheOracle;
use crate::persister::Persister;
use crate::write_buffer::persisted_files::PersistedFiles;
use crate::write_buffer::queryable_buffer::QueryableBuffer;
use crate::write_buffer::validator::WriteValidator;
use crate::{chunk::ParquetChunk, DatabaseManager};
use crate::{
BufferedWriteRequest, Bufferer, ChunkContainer, LastCacheManager, ParquetFile,
PersistedSnapshot, Precision, WriteBuffer, WriteLineError,
@ -27,8 +27,8 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::logical_expr::Expr;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_id::{ColumnId, DbId, TableId};
use influxdb3_wal::object_store::WalObjectStore;
use influxdb3_wal::CatalogOp::CreateLastCache;
use influxdb3_wal::{object_store::WalObjectStore, DeleteDatabaseDefinition};
use influxdb3_wal::{
CatalogBatch, CatalogOp, LastCacheDefinition, LastCacheDelete, Wal, WalConfig, WalFileNotifier,
WalOp,
@ -76,6 +76,9 @@ pub enum Error {
#[error("error in last cache: {0}")]
LastCacheError(#[from] last_cache::Error),
#[error("database not found {0}")]
DatabaseNotFound(String),
#[error("tried accessing database and table that do not exist")]
DbDoesNotExist,
@ -532,6 +535,33 @@ impl LastCacheManager for WriteBufferImpl {
}
}
#[async_trait::async_trait]
impl DatabaseManager for WriteBufferImpl {
async fn delete_database(&self, name: String) -> crate::Result<(), self::Error> {
let (db_id, db_schema) = self
.catalog
.db_id_and_schema(&name)
.ok_or_else(|| self::Error::DatabaseNotFound(name))?;
let deletion_time = self.time_provider.now();
let catalog_batch = CatalogBatch {
time_ns: deletion_time.timestamp_nanos(),
database_id: db_id,
database_name: Arc::clone(&db_schema.name),
ops: vec![CatalogOp::DeleteDatabase(DeleteDatabaseDefinition {
database_id: db_id,
database_name: Arc::clone(&db_schema.name),
deletion_time: deletion_time.timestamp_nanos(),
})],
};
self.catalog.apply_catalog_batch(&catalog_batch)?;
let wal_op = WalOp::Catalog(catalog_batch);
self.wal.write_ops(vec![wal_op]).await?;
debug!(db_id = ?db_id, name = ?&db_schema.name, "successfully deleted database");
Ok(())
}
}
impl WriteBuffer for WriteBufferImpl {}
#[cfg(test)]
@ -1884,6 +1914,34 @@ mod tests {
assert_eq!(0, test_store.head_request_count(&path));
}
#[tokio::test]
async fn test_delete_database() {
let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap();
let test_store = Arc::new(InMemory::new());
let wal_config = WalConfig {
gen1_duration: Gen1Duration::new_1m(),
max_write_buffer_size: 100,
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
};
let (write_buffer, _) =
setup_cache_optional(start_time, test_store, wal_config, false).await;
let _ = write_buffer
.write_lp(
NamespaceName::new("foo").unwrap(),
"cpu,warehouse=us-east,room=01a,device=10001 reading=37\n",
start_time,
false,
Precision::Nanosecond,
)
.await
.unwrap();
let result = write_buffer.delete_database("foo".to_string()).await;
assert!(result.is_ok());
}
struct TestWrite<LP> {
lp: LP,
time_seconds: i64,


@ -329,6 +329,11 @@ impl QueryableBuffer {
) -> tokio::sync::watch::Receiver<Option<PersistedSnapshot>> {
self.persisted_snapshot_notify_rx.clone()
}
pub fn clear_buffer_for_db(&self, db_id: &DbId) {
let mut buffer = self.buffer.write();
buffer.db_to_table.remove(db_id);
}
}
#[async_trait]
@ -373,6 +378,7 @@ impl BufferState {
WalOp::Write(write_batch) => self.add_write_batch(write_batch),
WalOp::Catalog(catalog_batch) => {
self.catalog
// apply catalog-level changes only
.apply_catalog_batch(&catalog_batch)
.expect("catalog batch should apply");
@ -381,6 +387,8 @@ impl BufferState {
.db_schema_by_id(&catalog_batch.database_id)
.expect("database should exist");
// catalog changes that have external side effects are applied here,
// e.g. creating or deleting the last cache itself
for op in catalog_batch.ops {
match op {
CatalogOp::CreateLastCache(definition) => {
@ -404,6 +412,11 @@ impl BufferState {
CatalogOp::AddFields(_) => (),
CatalogOp::CreateTable(_) => (),
CatalogOp::CreateDatabase(_) => (),
CatalogOp::DeleteDatabase(db_definition) => {
self.db_to_table.remove(&db_definition.database_id);
last_cache_provider
.delete_caches_for_db(&db_definition.database_id);
}
}
}
}


@ -7,6 +7,7 @@ expression: catalog_json
[
0,
{
"deleted": false,
"id": 0,
"name": "db",
"tables": [


@ -7,6 +7,7 @@ expression: catalog_json
[
0,
{
"deleted": false,
"id": 0,
"name": "db",
"tables": [


@ -7,6 +7,7 @@ expression: catalog_json
[
0,
{
"deleted": false,
"id": 0,
"name": "db",
"tables": [