feat: Set up CLI args for new get-table command
parent
71770486af
commit
8697ef4967
|
@ -29,12 +29,12 @@ pub struct Config {
|
||||||
command: Command,
|
command: Command,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// All possible subcommands for catalog
|
/// All possible subcommands for remote
|
||||||
#[derive(Debug, clap::Parser)]
|
#[derive(Debug, clap::Parser)]
|
||||||
enum Command {
|
enum Command {
|
||||||
/// Get partition data
|
/// Get partition data
|
||||||
Partition(partition::Config),
|
Partition(partition::Config),
|
||||||
/// Get parquet files from the object store
|
/// Get Parquet files from the object store
|
||||||
Store(store::Config),
|
Store(store::Config),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,8 +3,10 @@
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use influxdb_iox_client::{connection::Connection, store};
|
use influxdb_iox_client::{connection::Connection, store};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use tokio::fs::File;
|
use tokio::{
|
||||||
use tokio::io::AsyncWriteExt;
|
fs::{self, File},
|
||||||
|
io::AsyncWriteExt,
|
||||||
|
};
|
||||||
|
|
||||||
#[allow(clippy::enum_variant_names)]
|
#[allow(clippy::enum_variant_names)]
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
|
@ -26,10 +28,10 @@ pub struct Config {
|
||||||
command: Command,
|
command: Command,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get a parquet file by its object store uuid
|
/// Get a Parquet file by its object store uuid
|
||||||
#[derive(Debug, clap::Parser)]
|
#[derive(Debug, clap::Parser)]
|
||||||
struct Get {
|
struct Get {
|
||||||
/// The object store uuid of the parquet file
|
/// The object store uuid of the Parquet file
|
||||||
#[clap(action)]
|
#[clap(action)]
|
||||||
uuid: String,
|
uuid: String,
|
||||||
|
|
||||||
|
@ -38,10 +40,24 @@ struct Get {
|
||||||
file_name: String,
|
file_name: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// All possible subcommands for partition
|
/// Get all the Parquet files for a particular database's table
|
||||||
|
#[derive(Debug, clap::Parser)]
|
||||||
|
struct GetTable {
|
||||||
|
/// The database (namespace) to get the Parquet files for
|
||||||
|
#[clap(action)]
|
||||||
|
database: String,
|
||||||
|
|
||||||
|
/// The table to get the Parquet files for
|
||||||
|
#[clap(action)]
|
||||||
|
table: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// All possible subcommands for store
|
||||||
#[derive(Debug, clap::Parser)]
|
#[derive(Debug, clap::Parser)]
|
||||||
enum Command {
|
enum Command {
|
||||||
Get(Get),
|
Get(Get),
|
||||||
|
|
||||||
|
GetTable(GetTable),
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn command(connection: Connection, config: Config) -> Result<(), Error> {
|
pub async fn command(connection: Connection, config: Config) -> Result<(), Error> {
|
||||||
|
@ -59,5 +75,9 @@ pub async fn command(connection: Connection, config: Config) -> Result<(), Error
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
Command::GetTable(get_table) => {
|
||||||
|
fs::create_dir_all(&get_table.table).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,50 @@ use predicates::prelude::*;
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState};
|
use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState};
|
||||||
|
|
||||||
|
/// Get all Parquet files for a table, using the command `remote store get-table`
|
||||||
|
#[tokio::test]
|
||||||
|
async fn remote_store_get_table() {
|
||||||
|
test_helpers::maybe_start_logging();
|
||||||
|
let database_url = maybe_skip_integration!();
|
||||||
|
let table_name = "my_awesome_table";
|
||||||
|
|
||||||
|
let mut cluster = MiniCluster::create_shared(database_url).await;
|
||||||
|
|
||||||
|
StepTest::new(
|
||||||
|
&mut cluster,
|
||||||
|
vec![
|
||||||
|
Step::WriteLineProtocol(format!("{table_name},tag1=A,tag2=B val=42i 123456")),
|
||||||
|
Step::Custom(Box::new(move |state: &mut StepTestState| {
|
||||||
|
async move {
|
||||||
|
let router_addr = state.cluster().router().router_grpc_base().to_string();
|
||||||
|
let namespace = state.cluster().namespace().to_string();
|
||||||
|
|
||||||
|
// Ensure files are actually written to the filesystem
|
||||||
|
let dir = tempfile::tempdir().expect("could not get temporary directory");
|
||||||
|
|
||||||
|
Command::cargo_bin("influxdb_iox")
|
||||||
|
.unwrap()
|
||||||
|
.current_dir(&dir)
|
||||||
|
.arg("-h")
|
||||||
|
.arg(&router_addr)
|
||||||
|
.arg("remote")
|
||||||
|
.arg("store")
|
||||||
|
.arg("get-table")
|
||||||
|
.arg(&namespace)
|
||||||
|
.arg("my_awesome_table")
|
||||||
|
.assert()
|
||||||
|
.success();
|
||||||
|
|
||||||
|
assert!(dir.as_ref().join(&table_name).is_dir());
|
||||||
|
}
|
||||||
|
.boxed()
|
||||||
|
})),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
.run()
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
/// remote partition command and getting a parquet file from the object store and pulling the
|
/// remote partition command and getting a parquet file from the object store and pulling the
|
||||||
/// files, using these commands:
|
/// files, using these commands:
|
||||||
///
|
///
|
||||||
|
|
Loading…
Reference in New Issue