diff --git a/generated_types/protos/influxdata/iox/catalog/v1/service.proto b/generated_types/protos/influxdata/iox/catalog/v1/service.proto index 7adfa33c7d..0957aaa35e 100644 --- a/generated_types/protos/influxdata/iox/catalog/v1/service.proto +++ b/generated_types/protos/influxdata/iox/catalog/v1/service.proto @@ -10,6 +10,9 @@ service CatalogService { // Get the partition catalog records by the table id rpc GetPartitionsByTableId(GetPartitionsByTableIdRequest) returns (GetPartitionsByTableIdResponse); + + // Get the parquet_file catalog records in the given database and table name + rpc GetParquetFilesByDatabaseTable(GetParquetFilesByDatabaseTableRequest) returns (GetParquetFilesByDatabaseTableResponse); } message GetParquetFilesByPartitionIdRequest { @@ -46,4 +49,17 @@ message GetPartitionsByTableIdRequest { message GetPartitionsByTableIdResponse { repeated Partition partitions = 1; -} \ No newline at end of file +} + +message GetParquetFilesByDatabaseTableRequest { + // the database name + string database_name = 1; + + // the table name in the database + string table_name = 2; +} + +message GetParquetFilesByDatabaseTableResponse { + // the parquet_file records in the table in the database + repeated ParquetFile parquet_files = 1; +} diff --git a/influxdb_iox/src/commands/remote/store.rs b/influxdb_iox/src/commands/remote/store.rs index cc79cb3ba2..82253afdc1 100644 --- a/influxdb_iox/src/commands/remote/store.rs +++ b/influxdb_iox/src/commands/remote/store.rs @@ -1,7 +1,7 @@ //! 
This module implements the `remote store` CLI subcommand use futures::StreamExt; -use influxdb_iox_client::{connection::Connection, store}; +use influxdb_iox_client::{catalog, connection::Connection, store}; use thiserror::Error; use tokio::{ fs::{self, File}, @@ -76,7 +76,40 @@ pub async fn command(connection: Connection, config: Config) -> Result<(), Error Ok(()) } Command::GetTable(get_table) => { - fs::create_dir_all(&get_table.table).await?; + let directory = std::path::Path::new(&get_table.table); + fs::create_dir_all(&directory).await?; + let mut catalog_client = catalog::Client::new(connection.clone()); + let mut store_client = store::Client::new(connection); + + let parquet_files = catalog_client + .get_parquet_files_by_database_table( + get_table.database.clone(), + get_table.table.clone(), + ) + .await?; + let num_parquet_files = parquet_files.len(); + println!("found {num_parquet_files} Parquet files, downloading..."); + let indexed_object_store_ids = parquet_files + .into_iter() + .map(|pf| pf.object_store_id) + .enumerate(); + + for (index, uuid) in indexed_object_store_ids { + let index = index + 1; + let filename = format!("{uuid}.parquet"); + println!("downloading file {index} of {num_parquet_files} ({filename})..."); + let mut response = store_client + .get_parquet_file_by_object_store_id(uuid.clone()) + .await?; + let mut file = File::create(directory.join(&filename)).await?; + while let Some(res) = response.next().await { + let res = res.unwrap(); + + file.write_all(&res.data).await?; + } + } + println!("Done."); + Ok(()) } } diff --git a/influxdb_iox/tests/end_to_end_cases/remote.rs b/influxdb_iox/tests/end_to_end_cases/remote.rs index 2f4a32ca01..c3d329954b 100644 --- a/influxdb_iox/tests/end_to_end_cases/remote.rs +++ b/influxdb_iox/tests/end_to_end_cases/remote.rs @@ -19,7 +19,15 @@ async fn remote_store_get_table() { StepTest::new( &mut cluster, vec![ + // Persist some data Step::WriteLineProtocol(format!("{table_name},tag1=A,tag2=B val=42i 
123456")), + Step::WaitForPersisted, + // Persist some more data for the same table in a 2nd Parquet file + Step::WriteLineProtocol(format!("{table_name},tag1=C,tag2=B val=9000i 789000")), + Step::WaitForPersisted, + // Persist some more data for a different table + Step::WriteLineProtocol(format!("{table_name}_2,tag1=A,tag2=B val=42i 123456")), + Step::WaitForPersisted, Step::Custom(Box::new(move |state: &mut StepTestState| { async move { let router_addr = state.cluster().router().router_grpc_base().to_string(); @@ -41,7 +49,22 @@ async fn remote_store_get_table() { .assert() .success(); - assert!(dir.as_ref().join(&table_name).is_dir()); + let table_dir = dir.as_ref().join(&table_name); + assert!(table_dir.is_dir()); + let entries: Vec<_> = table_dir.read_dir().unwrap().flatten().collect(); + assert_eq!( + entries.len(), + 2, + "Expected 2 files in the directory, got: {entries:?}" + ); + let path = entries[0].path(); + let extension = path.extension().unwrap(); + assert_eq!( + "parquet", + extension, + "Expected filename to have extension 'parquet', got: {}", + extension.to_str().unwrap() + ); } .boxed() })), diff --git a/influxdb_iox_client/src/client/catalog.rs b/influxdb_iox_client/src/client/catalog.rs index 39c331d6ec..ddb556cf8f 100644 --- a/influxdb_iox_client/src/client/catalog.rs +++ b/influxdb_iox_client/src/client/catalog.rs @@ -24,7 +24,7 @@ impl Client { } } - /// Get the parquet file records by their partition id + /// Get the Parquet file records by their partition id pub async fn get_parquet_files_by_partition_id( &mut self, partition_id: i64, @@ -49,4 +49,21 @@ impl Client { Ok(response.into_inner().partitions) } + + /// Get the Parquet file records by their database and table names + pub async fn get_parquet_files_by_database_table( + &mut self, + database_name: String, + table_name: String, + ) -> Result<Vec<ParquetFile>, Error> { + let response = self + .inner + .get_parquet_files_by_database_table(GetParquetFilesByDatabaseTableRequest { + database_name, + 
table_name, + }) + .await?; + + Ok(response.into_inner().parquet_files) + } } diff --git a/service_grpc_catalog/src/lib.rs b/service_grpc_catalog/src/lib.rs index 66bedf8892..f329eeda46 100644 --- a/service_grpc_catalog/src/lib.rs +++ b/service_grpc_catalog/src/lib.rs @@ -80,6 +80,52 @@ impl catalog_service_server::CatalogService for CatalogService { Ok(Response::new(response)) } + + async fn get_parquet_files_by_database_table( + &self, + request: Request<GetParquetFilesByDatabaseTableRequest>, + ) -> Result<Response<GetParquetFilesByDatabaseTableResponse>, Status> { + let mut repos = self.catalog.repositories().await; + let req = request.into_inner(); + + let namespace = repos + .namespaces() + .get_by_name(&req.database_name) + .await + .map_err(|e| Status::unknown(e.to_string()))? + .ok_or_else(|| { + Status::not_found(format!("Database {} not found", req.database_name)) + })?; + + let table = repos + .tables() + .get_by_namespace_and_name(namespace.id, &req.table_name) + .await + .map_err(|e| Status::unknown(e.to_string()))? + .ok_or_else(|| Status::not_found(format!("Table {} not found", req.table_name)))?; + + let table_id = table.id; + + let parquet_files = repos + .parquet_files() + .list_by_table_not_to_delete(table_id) + .await + .map_err(|e| { + warn!( + error=%e, + %req.database_name, + %req.table_name, + "failed to get parquet_files for table" + ); + Status::not_found(e.to_string()) + })?; + + let parquet_files: Vec<_> = parquet_files.into_iter().map(to_parquet_file).collect(); + + let response = GetParquetFilesByDatabaseTableResponse { parquet_files }; + + Ok(Response::new(response)) + } } // converts the catalog ParquetFile to protobuf