fix: fix flaky migrate_database_files_from_one_server_to_another and prepare for reuse (#3314)

* fix: fix flaky migrate_database_files_from_one_server_to_another and prepare for reuse
* docs: clarify comments
parent 468af9b8a9
commit 5316037ffe
@@ -1,18 +1,47 @@
-use std::path::PathBuf;
+//! Contains tests using the CLI and other tools to test scenarios for
+//! moving data from one server/database to another
+
+use std::path::{Path, PathBuf};
 
 use assert_cmd::Command;
 use predicates::prelude::*;
 use tempfile::TempDir;
+use uuid::Uuid;
 
 use crate::{
     common::server_fixture::{ServerFixture, ServerType},
-    end_to_end_cases::scenario::rand_name,
+    end_to_end_cases::scenario::{data_dir, db_data_dir, Scenario},
 };
 
-/// Contains tests using the CLI and other tools to test scenarios for
-/// moving data from one server/database to another
-#[tokio::test]
-async fn migrate_database_files_from_one_server_to_another() {
+/// Copy the `source_dir` directory into the `target_dir` directory using the `cp` command
+fn cp_dir(source_dir: impl AsRef<Path>, target_dir: impl AsRef<Path>) {
+    let source_dir = source_dir.as_ref();
+    let target_dir = target_dir.as_ref();
+
+    // needed so that if the target server has had no databases
+    // created yet, it will have no `data` directory. See #3292
+    println!("Ensuring {:?} directory exists", target_dir);
+    Command::new("mkdir")
+        .arg("-p")
+        .arg(target_dir.to_string_lossy().to_string())
+        .assert()
+        .success();
+
+    println!("Copying data from {:?} to {:?}", source_dir, target_dir);
+
+    Command::new("cp")
+        .arg("-R")
+        .arg(source_dir.to_string_lossy().to_string())
+        .arg(target_dir.to_string_lossy().to_string())
+        .assert()
+        .success();
+}
+
+/// Creates a new database on a shared server, writes data to it,
+/// shuts it down cleanly, and copies the files to Tempdir/uuid
+///
+/// Returns (db_name, uuid, tmp_dir)
+async fn create_copied_database() -> (String, Uuid, TempDir) {
     let server_fixture = ServerFixture::create_single_use(ServerType::Database).await;
     let addr = server_fixture.grpc_base();
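Note: a minimal sketch of how the new `cp_dir` helper is intended to be called; the path below is hypothetical and only illustrates the signature (any `impl AsRef<Path>` works for both arguments):

    // Hypothetical usage: copy a database directory into a scratch TempDir.
    // `cp_dir` shells out to `mkdir -p` and `cp -R`, so it assumes a Unix-like host.
    let scratch = tempfile::TempDir::new().expect("making tmp dir");
    cp_dir("/path/to/server/dbs/<uuid>", scratch.path());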
@@ -37,76 +66,45 @@ async fn migrate_database_files_from_one_server_to_another() {
         .success()
         .stdout(predicate::str::contains("Server initialized."));
 
-    let db_name = rand_name();
-    let db = &db_name;
+    let mut management_client = server_fixture.management_client();
+    let scenario = Scenario::new();
+    let (db_name, db_uuid) = scenario.create_database(&mut management_client).await;
 
-    // Create a database on one server
-    let stdout = String::from_utf8(
-        Command::cargo_bin("influxdb_iox")
-            .unwrap()
-            .arg("database")
-            .arg("create")
-            .arg(db)
-            .arg("--host")
-            .arg(addr)
-            .assert()
-            .success()
-            .stdout(predicate::str::contains("Created"))
-            .get_output()
-            .stdout
-            .clone(),
-    )
-    .unwrap();
-
-    let db_uuid = stdout.lines().last().unwrap().trim();
+    // todo write data and force it to be written to disk
 
     // figure out where the database lives and copy its data to a temporary directory,
     // as you might copy data from remote object storage to local disk for debugging.
-    // Assume data layout is <dir>/dbs/<uuid>
-    let mut source_dir: PathBuf = server_fixture.dir().into();
-    source_dir.push("dbs");
-    source_dir.push(db_uuid);
+    let source_dir = db_data_dir(server_fixture.dir(), db_uuid);
 
     let tmp_dir = TempDir::new().expect("making tmp dir");
-    let target_dir = tmp_dir.path();
-    println!("Copying data from {:?} to {:?}", source_dir, target_dir);
-
-    Command::new("cp")
-        .arg("-R")
-        .arg(source_dir.to_string_lossy().to_string())
-        .arg(target_dir.to_string_lossy().to_string())
-        .assert()
-        .success();
+    cp_dir(source_dir, tmp_dir.path());
 
     // stop the first server (note this call blocks until the process stops)
     std::mem::drop(server_fixture);
 
+    (db_name.to_string(), db_uuid, tmp_dir)
+}
+
+#[tokio::test]
+async fn migrate_database_files_from_one_server_to_another() {
+    let (db_name, db_uuid, tmp_dir) = create_copied_database().await;
+
     // Now start another server that can claim the database
     let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
     let addr = server_fixture.grpc_base();
 
     // copy the data from tmp_dir/<uuid> to the new server's location
     let mut source_dir: PathBuf = tmp_dir.path().into();
-    source_dir.push(db_uuid);
+    source_dir.push(db_uuid.to_string());
 
-    let mut target_dir: PathBuf = server_fixture.dir().into();
-    target_dir.push("dbs");
-
-    println!("Copying data from {:?} to {:?}", source_dir, target_dir);
-
-    Command::new("cp")
-        .arg("-R")
-        .arg(source_dir.to_string_lossy().to_string())
-        .arg(target_dir.to_string_lossy().to_string())
-        .assert()
-        .success();
+    let target_dir = data_dir(server_fixture.dir());
+    cp_dir(source_dir, &target_dir);
 
     // Claiming without --force doesn't work as owner.pb still record the other server owning it
     Command::cargo_bin("influxdb_iox")
         .unwrap()
         .arg("database")
         .arg("claim")
-        .arg(db_uuid)
+        .arg(db_uuid.to_string())
         .arg("--host")
         .arg(addr)
         .assert()
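Note: splitting the setup into `create_copied_database` is the "prepare for reuse" part of this commit: other end-to-end tests can now obtain a cleanly shut-down copy of a database. A rough sketch of such reuse, with a hypothetical test name:

    #[tokio::test]
    async fn some_future_migration_test() {
        // Reuse the helper: get the database name, its UUID, and the TempDir
        // holding the files copied from the cleanly stopped single-use server.
        let (db_name, db_uuid, tmp_dir) = create_copied_database().await;
        println!("copied {} ({}) into {:?}", db_name, db_uuid, tmp_dir.path());
    }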
@@ -120,7 +118,7 @@ async fn migrate_database_files_from_one_server_to_another() {
         .unwrap()
         .arg("database")
         .arg("claim")
-        .arg(db_uuid)
+        .arg(db_uuid.to_string())
         .arg("--host")
         .arg(addr)
         .arg("--force") // sudo make me a sandwich
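Note: the claim step exercised above corresponds roughly to this CLI invocation (UUID and address are placeholders); without `--force` it fails because `owner.pb` still records the original server as owner:

    influxdb_iox database claim <DB_UUID> --host <GRPC_ADDR> --force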
(The remaining hunks are from a second file in this commit: the shared end-to-end test scenario helpers that define `Scenario`, `data_dir`, and `db_data_dir`.)

@@ -16,7 +16,10 @@ use generated_types::{
 use influxdb_iox_client::{
     connection::Connection,
     flight::PerformQuery,
-    management::generated_types::{partition_template, WriteBufferConnection},
+    management::{
+        self,
+        generated_types::{partition_template, WriteBufferConnection},
+    },
 };
 use prost::Message;
 use rand::{
@@ -24,12 +27,20 @@ use rand::{
     thread_rng, Rng,
 };
 use std::{
-    collections::HashMap, convert::TryInto, num::NonZeroU32, path::Path, str, sync::Arc,
-    time::Duration, time::SystemTime, u32,
+    collections::HashMap,
+    convert::TryInto,
+    num::NonZeroU32,
+    path::{Path, PathBuf},
+    str,
+    sync::Arc,
+    time::Duration,
+    time::SystemTime,
+    u32,
 };
 use tempfile::TempDir;
 use test_helpers::assert_contains;
 use time::SystemProvider;
+use uuid::Uuid;
 use write_buffer::{
     core::{WriteBufferReading, WriteBufferWriting},
     file::{FileBufferConsumer, FileBufferProducer},
@@ -116,16 +127,24 @@ impl Scenario {
         })
     }
 
-    /// Creates the database on the server for this scenario
-    pub async fn create_database(&self, client: &mut influxdb_iox_client::management::Client) {
-        client
+    /// Creates the database on the server for this scenario,
+    /// returning (name, uuid)
+    pub async fn create_database(
+        &self,
+        client: &mut management::Client,
+    ) -> (DatabaseName<'_>, Uuid) {
+        let db_name = self.database_name();
+
+        let db_uuid = client
             .create_database(DatabaseRules {
-                name: self.database_name().to_string(),
+                name: db_name.to_string(),
                 lifecycle_rules: Some(Default::default()),
                 ..Default::default()
             })
             .await
             .unwrap();
+
+        (db_name, db_uuid)
     }
 
     pub async fn load_data(&self, client: &mut influxdb_iox_client::write::Client) -> Vec<String> {
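Note: a minimal sketch of consuming the changed `create_database` signature; this mirrors the call added to the migration test above:

    let mut management_client = server_fixture.management_client();
    let scenario = Scenario::new();
    // `create_database` now returns the generated database name and UUID.
    let (db_name, db_uuid) = scenario.create_database(&mut management_client).await;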
@@ -269,6 +288,24 @@ pub fn rand_id() -> String {
         .collect()
 }
 
+/// Return the path that the database stores data for all databases:
+/// `<server_path>/dbs`
+pub fn data_dir(server_path: impl AsRef<Path>) -> PathBuf {
+    // Assume data layout is <dir>/dbs/<uuid>
+    let mut data_dir: PathBuf = server_path.as_ref().into();
+    data_dir.push("dbs");
+    data_dir
+}
+
+/// Return the path that the database with <uuid> stores its data:
+/// `<server_path>/dbs/<uuid>`
+pub fn db_data_dir(server_path: impl AsRef<Path>, db_uuid: Uuid) -> PathBuf {
+    // Assume data layout is <dir>/dbs/<uuid>
+    let mut data_dir = data_dir(server_path);
+    data_dir.push(db_uuid.to_string());
+    data_dir
+}
+
 pub struct DatabaseBuilder {
     name: String,
     partition_template: PartitionTemplate,
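Note: a small sketch of the path convention these helpers encode, assuming the `<server_path>/dbs/<uuid>` layout stated in their comments (`Uuid::nil()` and the path are stand-in values):

    let server_path = std::path::Path::new("/path/to/server"); // hypothetical path
    let uuid = Uuid::nil();
    assert_eq!(data_dir(server_path), server_path.join("dbs"));
    assert_eq!(db_data_dir(server_path, uuid), server_path.join("dbs").join(uuid.to_string()));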
@@ -345,7 +382,7 @@ impl DatabaseBuilder {
         self,
         channel: Connection,
     ) -> Result<(), influxdb_iox_client::error::Error> {
-        let mut management_client = influxdb_iox_client::management::Client::new(channel);
+        let mut management_client = management::Client::new(channel);
 
         management_client
             .create_database(DatabaseRules {
@@ -377,7 +414,7 @@ pub async fn create_readable_database(db_name: impl Into<String>, channel: Conne
 /// given a channel to talk with the management api, create a new
 /// database with no mutable buffer configured, no partitioning rules
 pub async fn create_unreadable_database(db_name: impl Into<String>, channel: Connection) {
-    let mut management_client = influxdb_iox_client::management::Client::new(channel);
+    let mut management_client = management::Client::new(channel);
 
     let rules = DatabaseRules {
         name: db_name.into(),