feat: MVP of remote store get-table command

pull/24376/head
Carol (Nichols || Goulding) 2022-10-24 17:44:00 -04:00
parent 8697ef4967
commit de2ae6f557
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
5 changed files with 140 additions and 5 deletions

View File

@ -10,6 +10,9 @@ service CatalogService {
// Get the partition catalog records by the table id
rpc GetPartitionsByTableId(GetPartitionsByTableIdRequest) returns (GetPartitionsByTableIdResponse);
// Get the parquet_file catalog records in the given database and table name
rpc GetParquetFilesByDatabaseTable(GetParquetFilesByDatabaseTableRequest) returns (GetParquetFilesByDatabaseTableResponse);
}
message GetParquetFilesByPartitionIdRequest {
@ -46,4 +49,17 @@ message GetPartitionsByTableIdRequest {
// Response of the GetPartitionsByTableId RPC
message GetPartitionsByTableIdResponse {
// the partition catalog records returned for the requested table id
repeated Partition partitions = 1;
}
}
// Request of the GetParquetFilesByDatabaseTable RPC: selects one table by
// database name and table name
message GetParquetFilesByDatabaseTableRequest {
// the name of the database that contains the table
string database_name = 1;
// the name of the table, within that database, whose parquet_file records are requested
string table_name = 2;
}
// Response of the GetParquetFilesByDatabaseTable RPC
message GetParquetFilesByDatabaseTableResponse {
// the parquet_file catalog records of the requested table in the requested database
repeated ParquetFile parquet_files = 1;
}

View File

@ -1,7 +1,7 @@
//! This module implements the `remote store` CLI subcommand
use futures::StreamExt;
use influxdb_iox_client::{connection::Connection, store};
use influxdb_iox_client::{catalog, connection::Connection, store};
use thiserror::Error;
use tokio::{
fs::{self, File},
@ -76,7 +76,40 @@ pub async fn command(connection: Connection, config: Config) -> Result<(), Error
Ok(())
}
Command::GetTable(get_table) => {
fs::create_dir_all(&get_table.table).await?;
let directory = std::path::Path::new(&get_table.table);
fs::create_dir_all(&directory).await?;
let mut catalog_client = catalog::Client::new(connection.clone());
let mut store_client = store::Client::new(connection);
let parquet_files = catalog_client
.get_parquet_files_by_database_table(
get_table.database.clone(),
get_table.table.clone(),
)
.await?;
let num_parquet_files = parquet_files.len();
println!("found {num_parquet_files} Parquet files, downloading...");
let indexed_object_store_ids = parquet_files
.into_iter()
.map(|pf| pf.object_store_id)
.enumerate();
for (index, uuid) in indexed_object_store_ids {
let index = index + 1;
let filename = format!("{uuid}.parquet");
println!("downloading file {index} of {num_parquet_files} ({filename})...");
let mut response = store_client
.get_parquet_file_by_object_store_id(uuid.clone())
.await?;
let mut file = File::create(directory.join(&filename)).await?;
while let Some(res) = response.next().await {
let res = res.unwrap();
file.write_all(&res.data).await?;
}
}
println!("Done.");
Ok(())
}
}

View File

@ -19,7 +19,15 @@ async fn remote_store_get_table() {
StepTest::new(
&mut cluster,
vec![
// Persist some data
Step::WriteLineProtocol(format!("{table_name},tag1=A,tag2=B val=42i 123456")),
Step::WaitForPersisted,
// Persist some more data for the same table in a 2nd Parquet file
Step::WriteLineProtocol(format!("{table_name},tag1=C,tag2=B val=9000i 789000")),
Step::WaitForPersisted,
// Persist some more data for a different table
Step::WriteLineProtocol(format!("{table_name}_2,tag1=A,tag2=B val=42i 123456")),
Step::WaitForPersisted,
Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
let router_addr = state.cluster().router().router_grpc_base().to_string();
@ -41,7 +49,22 @@ async fn remote_store_get_table() {
.assert()
.success();
assert!(dir.as_ref().join(&table_name).is_dir());
let table_dir = dir.as_ref().join(&table_name);
assert!(table_dir.is_dir());
let entries: Vec<_> = table_dir.read_dir().unwrap().flatten().collect();
assert_eq!(
entries.len(),
2,
"Expected 2 files in the directory, got: {entries:?}"
);
let path = entries[0].path();
let extension = path.extension().unwrap();
assert_eq!(
"parquet",
extension,
"Expected filename to have extension 'parquet', got: {}",
extension.to_str().unwrap()
);
}
.boxed()
})),

View File

@ -24,7 +24,7 @@ impl Client {
}
}
/// Get the parquet file records by their partition id
/// Get the Parquet file records by their partition id
pub async fn get_parquet_files_by_partition_id(
&mut self,
partition_id: i64,
@ -49,4 +49,21 @@ impl Client {
Ok(response.into_inner().partitions)
}
/// Fetch the Parquet file records of the table `table_name` in the database
/// `database_name`.
///
/// Sends a `GetParquetFilesByDatabaseTableRequest` through the wrapped gRPC
/// client and returns the `parquet_files` carried by the response.
pub async fn get_parquet_files_by_database_table(
    &mut self,
    database_name: String,
    table_name: String,
) -> Result<Vec<ParquetFile>, Error> {
    let request = GetParquetFilesByDatabaseTableRequest {
        database_name,
        table_name,
    };

    // `?` converts a failed call into this client's `Error` type.
    let reply = self
        .inner
        .get_parquet_files_by_database_table(request)
        .await?;

    let records = reply.into_inner().parquet_files;
    Ok(records)
}
}

View File

@ -80,6 +80,52 @@ impl catalog_service_server::CatalogService for CatalogService {
Ok(Response::new(response))
}
/// Get the parquet_file catalog records of the table named in the request.
///
/// The request's `database_name` is resolved to a catalog namespace, the
/// table is looked up by name inside that namespace, and the table's files
/// are listed with `list_by_table_not_to_delete` and converted to their
/// protobuf form with `to_parquet_file`.
///
/// # Errors
///
/// - `NOT_FOUND` if no namespace with the given database name exists, or no
///   table with the given name exists in that namespace.
/// - `UNKNOWN` if any of the catalog queries fails.
async fn get_parquet_files_by_database_table(
    &self,
    request: Request<GetParquetFilesByDatabaseTableRequest>,
) -> Result<Response<GetParquetFilesByDatabaseTableResponse>, Status> {
    let mut repos = self.catalog.repositories().await;
    let req = request.into_inner();

    // Resolve the database name to its catalog namespace.
    let namespace = repos
        .namespaces()
        .get_by_name(&req.database_name)
        .await
        .map_err(|e| Status::unknown(e.to_string()))?
        .ok_or_else(|| {
            Status::not_found(format!("Database {} not found", req.database_name))
        })?;

    // Resolve the table name within that namespace.
    let table = repos
        .tables()
        .get_by_namespace_and_name(namespace.id, &req.table_name)
        .await
        .map_err(|e| Status::unknown(e.to_string()))?
        .ok_or_else(|| Status::not_found(format!("Table {} not found", req.table_name)))?;
    let table_id = table.id;

    let parquet_files = repos
        .parquet_files()
        .list_by_table_not_to_delete(table_id)
        .await
        .map_err(|e| {
            warn!(
                error=%e,
                %req.database_name,
                %req.table_name,
                "failed to get parquet_files for table"
            );
            // A failing catalog query is a backend error, not a missing
            // entity: report it like the other query failures above instead
            // of as NOT_FOUND, which would tell clients the table is absent.
            Status::unknown(e.to_string())
        })?;

    let parquet_files: Vec<_> = parquet_files.into_iter().map(to_parquet_file).collect();
    let response = GetParquetFilesByDatabaseTableResponse { parquet_files };
    Ok(Response::new(response))
}
}
// converts the catalog ParquetFile to protobuf