diff --git a/influxdb_iox/tests/end_to_end_cases/database_migration.rs b/influxdb_iox/tests/end_to_end_cases/database_migration.rs index 96ec916426..d3e16f7825 100644 --- a/influxdb_iox/tests/end_to_end_cases/database_migration.rs +++ b/influxdb_iox/tests/end_to_end_cases/database_migration.rs @@ -1,18 +1,47 @@ -use std::path::PathBuf; +//! Contains tests using the CLI and other tools to test scenarios for +//! moving data from one server/database to another + +use std::path::{Path, PathBuf}; use assert_cmd::Command; use predicates::prelude::*; use tempfile::TempDir; +use uuid::Uuid; use crate::{ common::server_fixture::{ServerFixture, ServerType}, - end_to_end_cases::scenario::rand_name, + end_to_end_cases::scenario::{data_dir, db_data_dir, Scenario}, }; -/// Contains tests using the CLI and other tools to test scenarios for -/// moving data from one server/database to another -#[tokio::test] -async fn migrate_database_files_from_one_server_to_another() { +/// Copy the `source_dir` directory into the `target_dir` directory using the `cp` command +fn cp_dir(source_dir: impl AsRef, target_dir: impl AsRef) { + let source_dir = source_dir.as_ref(); + let target_dir = target_dir.as_ref(); + + // needed so that if the target server has had no databases + // created yet, it will have no `data` directory. See #3292 + println!("Ensuring {:?} directory exists", target_dir); + Command::new("mkdir") + .arg("-p") + .arg(target_dir.to_string_lossy().to_string()) + .assert() + .success(); + + println!("Copying data from {:?} to {:?}", source_dir, target_dir); + + Command::new("cp") + .arg("-R") + .arg(source_dir.to_string_lossy().to_string()) + .arg(target_dir.to_string_lossy().to_string()) + .assert() + .success(); +} + +/// Creates a new database on a shared server, writes data to it, +/// shuts it down cleanly, and copies the files to Tempdir/uuid +/// +/// Returns (db_name, uuid, tmp_dir) +async fn create_copied_database() -> (String, Uuid, TempDir) { let server_fixture = ServerFixture::create_single_use(ServerType::Database).await; let addr = server_fixture.grpc_base(); @@ -37,76 +66,45 @@ async fn migrate_database_files_from_one_server_to_another() { .success() .stdout(predicate::str::contains("Server initialized.")); - let db_name = rand_name(); - let db = &db_name; + let mut management_client = server_fixture.management_client(); + let scenario = Scenario::new(); + let (db_name, db_uuid) = scenario.create_database(&mut management_client).await; - // Create a database on one server - let stdout = String::from_utf8( - Command::cargo_bin("influxdb_iox") - .unwrap() - .arg("database") - .arg("create") - .arg(db) - .arg("--host") - .arg(addr) - .assert() - .success() - .stdout(predicate::str::contains("Created")) - .get_output() - .stdout - .clone(), - ) - .unwrap(); - - let db_uuid = stdout.lines().last().unwrap().trim(); + // todo write data and force it to be written to disk // figure out where the database lives and copy its data to a temporary directory, // as you might copy data from remote object storage to local disk for debugging. - - // Assume data layout is /dbs/ - let mut source_dir: PathBuf = server_fixture.dir().into(); - source_dir.push("dbs"); - source_dir.push(db_uuid); - + let source_dir = db_data_dir(server_fixture.dir(), db_uuid); let tmp_dir = TempDir::new().expect("making tmp dir"); - let target_dir = tmp_dir.path(); - println!("Copying data from {:?} to {:?}", source_dir, target_dir); - - Command::new("cp") - .arg("-R") - .arg(source_dir.to_string_lossy().to_string()) - .arg(target_dir.to_string_lossy().to_string()) - .assert() - .success(); + cp_dir(source_dir, tmp_dir.path()); // stop the first server (note this call blocks until the process stops) std::mem::drop(server_fixture); + (db_name.to_string(), db_uuid, tmp_dir) +} + +#[tokio::test] +async fn migrate_database_files_from_one_server_to_another() { + let (db_name, db_uuid, tmp_dir) = create_copied_database().await; + // Now start another server that can claim the database let server_fixture = ServerFixture::create_shared(ServerType::Database).await; let addr = server_fixture.grpc_base(); // copy the data from tmp_dir/ to the new server's location let mut source_dir: PathBuf = tmp_dir.path().into(); - source_dir.push(db_uuid); + source_dir.push(db_uuid.to_string()); - let mut target_dir: PathBuf = server_fixture.dir().into(); - target_dir.push("dbs"); - - println!("Copying data from {:?} to {:?}", source_dir, target_dir); - Command::new("cp") - .arg("-R") - .arg(source_dir.to_string_lossy().to_string()) - .arg(target_dir.to_string_lossy().to_string()) - .assert() - .success(); + let target_dir = data_dir(server_fixture.dir()); + cp_dir(source_dir, &target_dir); // Claiming without --force doesn't work as owner.pb still record the other server owning it Command::cargo_bin("influxdb_iox") .unwrap() .arg("database") .arg("claim") - .arg(db_uuid) + .arg(db_uuid.to_string()) .arg("--host") .arg(addr) .assert() @@ -120,7 +118,7 @@ async fn migrate_database_files_from_one_server_to_another() { .unwrap() .arg("database") .arg("claim") - .arg(db_uuid) + .arg(db_uuid.to_string()) .arg("--host") .arg(addr) .arg("--force") // sudo make me a sandwich diff --git a/influxdb_iox/tests/end_to_end_cases/scenario.rs b/influxdb_iox/tests/end_to_end_cases/scenario.rs index fdbae36ff9..508c1d0221 100644 --- a/influxdb_iox/tests/end_to_end_cases/scenario.rs +++ b/influxdb_iox/tests/end_to_end_cases/scenario.rs @@ -16,7 +16,10 @@ use generated_types::{ use influxdb_iox_client::{ connection::Connection, flight::PerformQuery, - management::generated_types::{partition_template, WriteBufferConnection}, + management::{ + self, + generated_types::{partition_template, WriteBufferConnection}, + }, }; use prost::Message; use rand::{ @@ -24,12 +27,20 @@ use rand::{ thread_rng, Rng, }; use std::{ - collections::HashMap, convert::TryInto, num::NonZeroU32, path::Path, str, sync::Arc, - time::Duration, time::SystemTime, u32, + collections::HashMap, + convert::TryInto, + num::NonZeroU32, + path::{Path, PathBuf}, + str, + sync::Arc, + time::Duration, + time::SystemTime, + u32, }; use tempfile::TempDir; use test_helpers::assert_contains; use time::SystemProvider; +use uuid::Uuid; use write_buffer::{ core::{WriteBufferReading, WriteBufferWriting}, file::{FileBufferConsumer, FileBufferProducer}, @@ -116,16 +127,24 @@ impl Scenario { }) } - /// Creates the database on the server for this scenario - pub async fn create_database(&self, client: &mut influxdb_iox_client::management::Client) { - client + /// Creates the database on the server for this scenario, + /// returning (name, uuid) + pub async fn create_database( + &self, + client: &mut management::Client, + ) -> (DatabaseName<'_>, Uuid) { + let db_name = self.database_name(); + + let db_uuid = client .create_database(DatabaseRules { - name: self.database_name().to_string(), + name: db_name.to_string(), lifecycle_rules: Some(Default::default()), ..Default::default() }) .await .unwrap(); + + (db_name, db_uuid) } pub async fn load_data(&self, client: &mut influxdb_iox_client::write::Client) -> Vec { @@ -269,6 +288,24 @@ pub fn rand_id() -> String { .collect() } +/// Return the path that the database stores data for all databases: +/// `/dbs` +pub fn data_dir(server_path: impl AsRef) -> PathBuf { + // Assume data layout is /dbs/ + let mut data_dir: PathBuf = server_path.as_ref().into(); + data_dir.push("dbs"); + data_dir +} + +/// Return the path that the database with stores its data: +/// `/dbs/` +pub fn db_data_dir(server_path: impl AsRef, db_uuid: Uuid) -> PathBuf { + // Assume data layout is /dbs/ + let mut data_dir = data_dir(server_path); + data_dir.push(db_uuid.to_string()); + data_dir +} + pub struct DatabaseBuilder { name: String, partition_template: PartitionTemplate, @@ -345,7 +382,7 @@ impl DatabaseBuilder { self, channel: Connection, ) -> Result<(), influxdb_iox_client::error::Error> { - let mut management_client = influxdb_iox_client::management::Client::new(channel); + let mut management_client = management::Client::new(channel); management_client .create_database(DatabaseRules { @@ -377,7 +414,7 @@ pub async fn create_readable_database(db_name: impl Into, channel: Conne /// given a channel to talk with the management api, create a new /// database with no mutable buffer configured, no partitioning rules pub async fn create_unreadable_database(db_name: impl Into, channel: Connection) { - let mut management_client = influxdb_iox_client::management::Client::new(channel); + let mut management_client = management::Client::new(channel); let rules = DatabaseRules { name: db_name.into(),