diff --git a/generated_types/protos/influxdata/iox/management/v1/service.proto b/generated_types/protos/influxdata/iox/management/v1/service.proto index 0a5077bdfe..2925122603 100644 --- a/generated_types/protos/influxdata/iox/management/v1/service.proto +++ b/generated_types/protos/influxdata/iox/management/v1/service.proto @@ -155,6 +155,15 @@ message ReleaseDatabaseResponse { message ClaimDatabaseRequest { bytes uuid = 1; + + // Force this server to claim this database, even if it is + // ostensibly owned by another server. + // + // WARNING: ONLY do this if no other servers are writing to this + // database (for example, the data files have been copied + // somewhere). If another server is currently writing to this + // database, corruption will very likely occur + bool force = 2; } message ClaimDatabaseResponse { diff --git a/influxdb_iox/src/commands/database.rs b/influxdb_iox/src/commands/database.rs index 7aead8b0e1..296defd037 100644 --- a/influxdb_iox/src/commands/database.rs +++ b/influxdb_iox/src/commands/database.rs @@ -179,6 +179,16 @@ struct Release { struct Claim { /// The UUID of the database to claim uuid: Uuid, + + /// Force this server to claim this database, even if it is + /// ostensibly owned by another server. + /// + /// WARNING: ONLY do this if you are sure that no other servers + /// are writing to this database (for example, the data files have + /// been copied somewhere). If another server is currently writing + /// to this database, corruption will very likely occur + #[structopt(long)] + force: bool, } /// All possible subcommands for database @@ -337,7 +347,7 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { } Command::Claim(command) => { let mut client = management::Client::new(connection); - let db_name = client.claim_database(command.uuid).await?; + let db_name = client.claim_database(command.uuid, command.force).await?; println!("Claimed database {}", db_name); } } diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs index 2b8c4724ed..96b47e49da 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs @@ -144,13 +144,13 @@ impl management_service_server::ManagementService for ManagementService { &self, request: Request, ) -> Result, Status> { - let ClaimDatabaseRequest { uuid } = request.into_inner(); + let ClaimDatabaseRequest { uuid, force } = request.into_inner(); let uuid = Uuid::from_slice(&uuid).scope("uuid")?; let db_name = self .server - .claim_database(uuid) + .claim_database(uuid, force) .await .map_err(default_server_error_handler)?; diff --git a/influxdb_iox/tests/end_to_end_cases/management_api.rs b/influxdb_iox/tests/end_to_end_cases/management_api.rs index b5ea65f087..f38e33c009 100644 --- a/influxdb_iox/tests/end_to_end_cases/management_api.rs +++ b/influxdb_iox/tests/end_to_end_cases/management_api.rs @@ -327,11 +327,14 @@ async fn test_create_get_update_release_claim_database() { format!("Resource database/{} not found", db_name) ); - client.claim_database(released_uuid).await.unwrap(); + client.claim_database(released_uuid, false).await.unwrap(); client.get_database(&db_name, false).await.unwrap(); - let err = client.claim_database(released_uuid).await.unwrap_err(); + let err = client + .claim_database(released_uuid, false) + .await + .unwrap_err(); assert_contains!( err.to_string(), format!("Resource database_uuid/{} already exists", released_uuid) @@ -347,7 +350,10 @@ async fn test_create_get_update_release_claim_database() { } let unknown_uuid = Uuid::new_v4(); - let err = client.claim_database(unknown_uuid).await.unwrap_err(); + let err = client + .claim_database(unknown_uuid, false) + .await + .unwrap_err(); assert_contains!( err.to_string(), format!("Resource database_uuid/{} not found", unknown_uuid) @@ -362,7 +368,10 @@ async fn test_create_get_update_release_claim_database() { assert_ne!(released_uuid, newly_created_uuid); - let err = client.claim_database(released_uuid).await.unwrap_err(); + let err = client + .claim_database(released_uuid, false) + .await + .unwrap_err(); assert_contains!( err.to_string(), format!("Resource database/{} already exists", db_name) @@ -449,7 +458,7 @@ async fn claim_database() { let deleted_uuid = client.release_database(&db_name, None).await.unwrap(); assert_eq!(created_uuid, deleted_uuid); - client.claim_database(deleted_uuid).await.unwrap(); + client.claim_database(deleted_uuid, false).await.unwrap(); // Claimed database is back in this server's database list assert_eq!( @@ -466,7 +475,10 @@ async fn claim_database() { ); // Claiming the same database again is an error - let err = client.claim_database(deleted_uuid).await.unwrap_err(); + let err = client + .claim_database(deleted_uuid, false) + .await + .unwrap_err(); assert_contains!( err.to_string(), format!("Resource database_uuid/{} already exists", deleted_uuid) diff --git a/influxdb_iox/tests/end_to_end_cases/management_cli.rs b/influxdb_iox/tests/end_to_end_cases/management_cli.rs index c7de9fb72b..6360c8f2d7 100644 --- a/influxdb_iox/tests/end_to_end_cases/management_cli.rs +++ b/influxdb_iox/tests/end_to_end_cases/management_cli.rs @@ -13,7 +13,7 @@ use generated_types::{ influxdata::iox::management::v1::{operation_metadata::Job, WipePreservedCatalog}, }; use predicates::prelude::*; -use std::{sync::Arc, time::Duration}; +use std::{path::PathBuf, sync::Arc, time::Duration}; use tempfile::TempDir; use test_helpers::make_temp_file; use uuid::Uuid; @@ -669,6 +669,209 @@ async fn claim_database() { ))); } +#[tokio::test] +async fn force_claim_database() { + let server_fixture = ServerFixture::create_shared(ServerType::Database).await; + let addr = server_fixture.grpc_base(); + let db_name = rand_name(); + let db = &db_name; + + // Create a database on the server + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("database") + .arg("create") + .arg(db) + .arg("--host") + .arg(addr) + .assert() + .success() + .stdout(predicate::str::contains("Created")); + + // Release database returns the UUID + let stdout = String::from_utf8( + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("database") + .arg("release") + .arg(db) + .arg("--host") + .arg(addr) + .assert() + .success() + .stdout(predicate::str::contains(format!( + "Released database {}", + db + ))) + .get_output() + .stdout + .clone(), + ) + .unwrap(); + let db_uuid = stdout.lines().last().unwrap().trim(); + + // delete the owner file /dbs//owner.pb + let mut owner_file: PathBuf = server_fixture.dir().into(); + owner_file.push("dbs"); + owner_file.push(db_uuid); + owner_file.push("owner.pb"); + + println!("Deleting {:?}", owner_file); + Command::new("rm") + .arg(owner_file.to_string_lossy().to_string()) + .assert() + .success(); + + // Claiming db will now not work (no owner file) + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("database") + .arg("claim") + .arg(db_uuid) + .arg("--host") + .arg(addr) + .assert() + .failure() + .stderr(predicate::str::contains("owner.pb not found")); + + // But does work when --force is supplied + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("database") + .arg("claim") + .arg(db_uuid) + .arg("--host") + .arg(addr) + .arg("--force") + .assert() + .success() + .stdout(predicate::str::contains(format!( + "Claimed database {}", + db_name + ))); +} + +#[tokio::test] +async fn migrate_database_files_from_one_server_to_another() { + let server_fixture = ServerFixture::create_single_use(ServerType::Database).await; + let addr = server_fixture.grpc_base(); + + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("server") + .arg("set") + .arg("3113") + .arg("--host") + .arg(addr) + .assert() + .success() + .stdout(predicate::str::contains("Ok")); + + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("server") + .arg("wait-server-initialized") + .arg("--host") + .arg(addr) + .assert() + .success() + .stdout(predicate::str::contains("Server initialized.")); + + let db_name = rand_name(); + let db = &db_name; + + // Create a database on one server + let stdout = String::from_utf8( + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("database") + .arg("create") + .arg(db) + .arg("--host") + .arg(addr) + .assert() + .success() + .stdout(predicate::str::contains("Created")) + .get_output() + .stdout + .clone(), + ) + .unwrap(); + + let db_uuid = stdout.lines().last().unwrap().trim(); + + // figure out where the database lives and copy its data to a temporary directory, + // as you might copy data from remote object storage to local disk for debugging. + + // Assume data layout is /dbs/ + let mut source_dir: PathBuf = server_fixture.dir().into(); + source_dir.push("dbs"); + source_dir.push(db_uuid); + + let tmp_dir = TempDir::new().expect("making tmp dir"); + let target_dir = tmp_dir.path(); + println!("Copying data from {:?} to {:?}", source_dir, target_dir); + + Command::new("cp") + .arg("-R") + .arg(source_dir.to_string_lossy().to_string()) + .arg(target_dir.to_string_lossy().to_string()) + .assert() + .success(); + + // stop the first server (note this call blocks until the process stops) + std::mem::drop(server_fixture); + + // Now start another server that can claim the database + let server_fixture = ServerFixture::create_shared(ServerType::Database).await; + let addr = server_fixture.grpc_base(); + + // copy the data from tmp_dir/ to the new server's location + let mut source_dir: PathBuf = tmp_dir.path().into(); + source_dir.push(db_uuid); + + let mut target_dir: PathBuf = server_fixture.dir().into(); + target_dir.push("dbs"); + + println!("Copying data from {:?} to {:?}", source_dir, target_dir); + Command::new("cp") + .arg("-R") + .arg(source_dir.to_string_lossy().to_string()) + .arg(target_dir.to_string_lossy().to_string()) + .assert() + .success(); + + // Claiming without --force doesn't work as owner.pb still record the other server owning it + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("database") + .arg("claim") + .arg(db_uuid) + .arg("--host") + .arg(addr) + .assert() + .failure() + .stderr(predicate::str::contains( + "is already owned by the server with ID 3113", + )); + + // however with --force the owner.pb file is updated forcibly + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("database") + .arg("claim") + .arg(db_uuid) + .arg("--host") + .arg(addr) + .arg("--force") // sudo make me a sandwich + .assert() + .success() + .stdout(predicate::str::contains(format!( + "Claimed database {}", + db_name + ))); +} + #[tokio::test] async fn test_get_chunks() { let server_fixture = ServerFixture::create_shared(ServerType::Database).await; diff --git a/influxdb_iox_client/src/client/management.rs b/influxdb_iox_client/src/client/management.rs index 5bcb91cd41..ed3efd287c 100644 --- a/influxdb_iox_client/src/client/management.rs +++ b/influxdb_iox_client/src/client/management.rs @@ -194,12 +194,21 @@ impl Client { } /// Claim database - pub async fn claim_database(&mut self, uuid: Uuid) -> Result { + /// + /// if `force` is true, forces the server to claim this database, even if it is + /// ostensibly owned by another server. + /// + /// WARNING: If another server is currently writing to this + /// database, corruption will very likely occur. + pub async fn claim_database(&mut self, uuid: Uuid, force: bool) -> Result { let uuid_bytes = uuid.as_bytes().to_vec(); let response = self .inner - .claim_database(ClaimDatabaseRequest { uuid: uuid_bytes }) + .claim_database(ClaimDatabaseRequest { + uuid: uuid_bytes, + force, + }) .await?; Ok(response.into_inner().db_name) diff --git a/server/src/database.rs b/server/src/database.rs index 6130b97771..12b82cceac 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -252,15 +252,20 @@ impl Database { Ok(uuid) } - /// Create an claimed database without any state. Returns its location in object storage - /// for saving in the server config file. + /// Create an claimed database without any state. Returns its + /// location in object storage for saving in the server config + /// file. + /// + /// if `force` is true, a missing owner info or owner info that is + /// for the wrong server id are ignored (do not cause errors) pub async fn claim( application: Arc, db_name: &DatabaseName<'static>, uuid: Uuid, server_id: ServerId, + force: bool, ) -> Result { - info!(%db_name, %uuid, "claiming database"); + info!(%db_name, %uuid, %force, "claiming database"); let iox_object_store = IoxObjectStore::load(Arc::clone(application.object_store()), uuid) .await @@ -268,15 +273,42 @@ impl Database { let owner_info = fetch_owner_info(&iox_object_store) .await - .context(FetchingOwnerInfo)?; + .context(FetchingOwnerInfo); - ensure!( - owner_info.id == 0, - CantClaimDatabaseCurrentlyOwned { - uuid, - server_id: owner_info.id + // try to recreate owner_info if force is specified + let owner_info = match owner_info { + Err(_) if force => { + warn!("Attempting to recreate missing owner info due to force"); + + let server_location = + IoxObjectStore::server_config_path(application.object_store(), server_id) + .to_string(); + + create_owner_info(server_id, server_location, &iox_object_store) + .await + .context(CreatingOwnerInfo)?; + + fetch_owner_info(&iox_object_store) + .await + .context(FetchingOwnerInfo) } - ); + t => t, + }?; + + if owner_info.id != 0 { + if !force { + return CantClaimDatabaseCurrentlyOwned { + uuid, + server_id: owner_info.id, + } + .fail(); + } else { + warn!( + owner_id = owner_info.id, + "Ignoring owner info mismatch due to force" + ); + } + } let database_location = iox_object_store.root_path(); let server_location = @@ -1608,7 +1640,7 @@ mod tests { .to_string(); let uuid = database.release().await.unwrap(); - Database::claim(application, db_name, uuid, new_server_id) + Database::claim(application, db_name, uuid, new_server_id, false) .await .unwrap(); diff --git a/server/src/lib.rs b/server/src/lib.rs index 2edbeddb8f..95c4ce8c51 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -699,8 +699,8 @@ impl Server { /// * No database with this UUID can be found /// * There's already an active database with this name /// * This database is already owned by this server - /// * This database is already owned by a different server - pub async fn claim_database(&self, uuid: Uuid) -> Result> { + /// * This database is already owned by a different server (unless force is true) + pub async fn claim_database(&self, uuid: Uuid, force: bool) -> Result> { // Wait for exclusive access to mutate server state let handle_fut = self.shared.state.read().freeze(); let handle = handle_fut.await; @@ -742,6 +742,7 @@ impl Server { &db_name, uuid, server_id, + force, ) .await .context(CannotClaimDatabase)?; @@ -2079,7 +2080,7 @@ mod tests { let released_uuid = server.release_database(&foo_db_name, None).await.unwrap(); // claim database by UUID - server.claim_database(released_uuid).await.unwrap(); + server.claim_database(released_uuid, false).await.unwrap(); let claimed = server.database(&foo_db_name).unwrap(); claimed.wait_for_init().await.unwrap(); @@ -2104,13 +2105,13 @@ mod tests { server.wait_for_init().await.unwrap(); assert_error!( - server.claim_database(invalid_uuid).await, + server.claim_database(invalid_uuid, false).await, Error::DatabaseUuidNotFound { .. }, ); } - #[tokio::test] - async fn cant_claim_database_owned_by_another_server() { + /// create servers (1 and 2) with a database on server 1 + async fn make_2_servers() -> (Arc, Arc, DatabaseName<'static>, Uuid) { let application = make_application(); let server_id1 = ServerId::try_from(1).unwrap(); let server_id2 = ServerId::try_from(2).unwrap(); @@ -2132,19 +2133,49 @@ mod tests { server2.set_id(server_id2).unwrap(); server2.wait_for_init().await.unwrap(); + (server1, server2, foo_db_name, uuid) + } + + #[tokio::test] + async fn cant_claim_database_owned_by_another_server() { + let (server1, server2, db_name, db_uuid) = make_2_servers().await; + // Attempting to claim on server 2 will fail assert_error!( - server2.claim_database(uuid).await, + server2.claim_database(db_uuid, false).await, Error::CannotClaimDatabase { source: database::InitError::CantClaimDatabaseCurrentlyOwned { server_id, .. } - } if server_id == server_id1.get_u32() + } if server_id == server1.server_id().unwrap().get_u32() ); // Have to release from server 1 first - server1.release_database(&foo_db_name, None).await.unwrap(); + server1.release_database(&db_name, None).await.unwrap(); // Then claiming on server 2 will work - server2.claim_database(uuid).await.unwrap(); + server2.claim_database(db_uuid, false).await.unwrap(); + } + + #[tokio::test] + async fn can_force_claim_database_owned_by_another_server() { + let (server1, server2, _db_name, db_uuid) = make_2_servers().await; + + // shutdown server 1 + server1.shutdown(); + server1 + .join() + .await + .expect("Server successfully terminated"); + + // Attempting to claim on server 2 will fail + assert_error!( + server2.claim_database(db_uuid, false).await, + Error::CannotClaimDatabase { + source: database::InitError::CantClaimDatabaseCurrentlyOwned { server_id, .. } + } if server_id == server1.server_id().unwrap().get_u32() + ); + + // Then claiming on server 2 with `force=true` will work + server2.claim_database(db_uuid, true).await.unwrap(); } #[tokio::test]