feat: Last 2 bonus features of remote store get-table (#5991)

* feat: Only get files that aren't already on disk with the reported size

* feat: Stream Parquet file bytes to file on disk

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Carol (Nichols || Goulding) 2022-10-28 07:03:08 -04:00 committed by GitHub
parent 16473f66e7
commit 69a2e6b871
5 changed files with 76 additions and 16 deletions
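
In short: skip any file whose on-disk size already matches the size the catalog reports, and stream the remaining downloads chunk by chunk to disk instead of collecting the gRPC response manually with write_all. Below is a minimal sketch of that approach, assuming a generic stream of byte chunks rather than the IOx store client; the fetch_if_missing helper and its signature are illustrative only, since the real change inlines this logic in the get-table download loop (see the store.rs diff further down).

// Sketch of the two features above; `fetch_if_missing` is illustrative, not code from this commit.
use std::path::Path;

use bytes::Bytes;
use futures::{Stream, TryStreamExt};
use tokio::{fs, io};
use tokio_util::compat::FuturesAsyncReadCompatExt;

async fn fetch_if_missing<S, E>(path: &Path, expected_len: u64, chunks: S) -> io::Result<bool>
where
    S: Stream<Item = Result<Bytes, E>> + Unpin,
    E: std::error::Error + Send + Sync + 'static,
{
    // Feature 1: a file with the reported size is already on disk, so skip the download.
    if fs::metadata(path)
        .await
        .map_or(false, |m| m.len() == expected_len)
    {
        return Ok(false);
    }

    // Feature 2: adapt the chunk stream into an AsyncRead and copy it straight into the
    // file as the chunks arrive, instead of looping over the response and calling write_all.
    let mut reader = chunks
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
        .into_async_read() // futures-io AsyncRead
        .compat(); // tokio-util "compat": bridge futures-io to tokio's AsyncRead
    let mut file = fs::File::create(path).await?;
    io::copy(&mut reader, &mut file).await?;
    Ok(true)
}

The "compat" feature enabled on tokio-util in the Cargo.toml hunks below provides that compat() bridge; the new futures-io entries in Cargo.lock and the workspace-hack crate follow from it.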

Cargo.lock (generated)

@@ -5278,6 +5278,7 @@ checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740"
dependencies = [
"bytes",
"futures-core",
"futures-io",
"futures-sink",
"pin-project-lite",
"tokio",
@@ -6007,6 +6008,7 @@ dependencies = [
"flatbuffers",
"futures-channel",
"futures-core",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",

influxdb_iox/Cargo.toml

@@ -65,7 +65,7 @@ thiserror = "1.0.37"
tikv-jemalloc-ctl = { version = "0.5.0", optional = true }
tokio = { version = "1.21", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-stream = { version = "0.1", features = ["net"] }
-tokio-util = { version = "0.7.4" }
+tokio-util = { version = "0.7.4", features = ["compat"] }
tonic = "0.8"
uuid = { version = "1", features = ["v4"] }
# jemalloc-sys with unprefixed_malloc_on_supported_platforms feature and heappy are mutually exclusive

influxdb_iox/src/commands/remote/store.rs

@@ -1,13 +1,15 @@
//! This module implements the `remote store` CLI subcommand
use futures::StreamExt;
+use futures_util::TryStreamExt;
use influxdb_iox_client::{catalog, connection::Connection, store};
use std::path::PathBuf;
use thiserror::Error;
use tokio::{
fs::{self, File},
-io::AsyncWriteExt,
+io::{self, AsyncWriteExt},
};
+use tokio_util::compat::FuturesAsyncReadCompatExt;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
@@ -97,23 +99,36 @@ pub async fn command(connection: Connection, config: Config) -> Result<(), Error>
         .await?;

     let num_parquet_files = parquet_files.len();
     println!("found {num_parquet_files} Parquet files, downloading...");

-    let indexed_parquet_file_metadata = parquet_files
-        .into_iter()
-        .map(|pf| (pf.object_store_id, pf.partition_id))
-        .enumerate();
-    for (index, (uuid, partition_id)) in indexed_parquet_file_metadata {
+    let indexed_parquet_file_metadata = parquet_files.into_iter().enumerate();
+    for (index, parquet_file) in indexed_parquet_file_metadata {
+        let uuid = parquet_file.object_store_id;
+        let partition_id = parquet_file.partition_id;
+        let file_size_bytes = parquet_file.file_size_bytes as u64;
+
         let index = index + 1;
         let filename = format!("{uuid}.{partition_id}.parquet");
-        println!("downloading file {index} of {num_parquet_files} ({filename})...");
-        let mut response = store_client
-            .get_parquet_file_by_object_store_id(uuid.clone())
-            .await?;
-        let mut file = File::create(directory.join(&filename)).await?;
-        while let Some(res) = response.next().await {
-            let res = res.unwrap();
-            file.write_all(&res.data).await?;
+        let file_path = directory.join(&filename);
+        if fs::metadata(&file_path)
+            .await
+            .map_or(false, |metadata| metadata.len() == file_size_bytes)
+        {
+            println!(
+                "skipping file {index} of {num_parquet_files} ({filename} already exists)"
+            );
+        } else {
+            println!("downloading file {index} of {num_parquet_files} ({filename})...");
+            let mut response = store_client
+                .get_parquet_file_by_object_store_id(uuid.clone())
+                .await?
+                .map_ok(|res| res.data)
+                .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
+                .into_async_read()
+                .compat();
+
+            let mut file = File::create(file_path).await?;
+            io::copy(&mut response, &mut file).await?;
         }
     }
     println!("Done.");

influxdb_iox/tests/end_to_end_cases/remote.rs

@@ -6,6 +6,7 @@ use futures::FutureExt;
use predicates::prelude::*;
use tempfile::tempdir;
use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState};
+use tokio::fs;
/// Get all Parquet files for a table, using the command `remote store get-table`
#[tokio::test]
@@ -133,6 +134,46 @@ async fn remote_store_get_table() {
.stderr(predicate::str::contains(
"Namespace nacho-namespace not found",
));
+
+// Running the same command again shouldn't download any new files
+Command::cargo_bin("influxdb_iox")
+    .unwrap()
+    .arg("-h")
+    .arg(&router_addr)
+    .arg("remote")
+    .arg("store")
+    .arg("get-table")
+    .arg("-o")
+    .arg(&custom_output_dir)
+    .arg(&namespace)
+    .arg(&other_table_name)
+    .assert()
+    .success()
+    .stdout(predicate::str::contains(format!(
+        "skipping file 1 of 1 ({} already exists)",
+        entries[0].path().file_name().unwrap().to_str().unwrap()
+    )));
+
+// If the file sizes don't match, re-download that file
+fs::write(entries[0].path(), b"not parquet").await.unwrap();
+
+Command::cargo_bin("influxdb_iox")
+    .unwrap()
+    .arg("-h")
+    .arg(&router_addr)
+    .arg("remote")
+    .arg("store")
+    .arg("get-table")
+    .arg("-o")
+    .arg(&custom_output_dir)
+    .arg(&namespace)
+    .arg(&other_table_name)
+    .assert()
+    .success()
+    .stdout(predicate::str::contains(format!(
+        "downloading file 1 of 1 ({})...",
+        entries[0].path().file_name().unwrap().to_str().unwrap()
+    )));
}
.boxed()
})),

workspace-hack/Cargo.toml

@@ -32,6 +32,7 @@ fixedbitset = { version = "0.4", features = ["std"] }
flatbuffers = { version = "2", features = ["thiserror"] }
futures-channel = { version = "0.3", features = ["alloc", "futures-sink", "sink", "std"] }
futures-core = { version = "0.3", features = ["alloc", "std"] }
+futures-io = { version = "0.3", features = ["std"] }
futures-sink = { version = "0.3", features = ["alloc", "std"] }
futures-task = { version = "0.3", default-features = false, features = ["alloc", "std"] }
futures-util = { version = "0.3", features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "sink", "slab", "std"] }
@@ -68,7 +69,7 @@ sqlx-core = { version = "0.6", default-features = false, features = ["_rt-tokio"
thrift = { version = "0.16", features = ["log", "server", "threadpool"] }
tokio = { version = "1", features = ["bytes", "fs", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "parking_lot", "rt", "rt-multi-thread", "signal", "signal-hook-registry", "socket2", "sync", "time", "tokio-macros", "tracing"] }
tokio-stream = { version = "0.1", features = ["fs", "net", "time"] }
tokio-util = { version = "0.7", features = ["codec", "io", "tracing"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "futures-io", "io", "tracing"] }
tonic = { version = "0.8", features = ["async-trait", "axum", "channel", "codegen", "h2", "hyper", "hyper-timeout", "prost", "prost-derive", "prost1", "tokio", "tower", "tracing-futures", "transport"] }
tower = { version = "0.4", features = ["__common", "balance", "buffer", "discover", "futures-core", "futures-util", "indexmap", "limit", "load", "log", "make", "pin-project", "pin-project-lite", "rand", "ready-cache", "slab", "timeout", "tokio", "tokio-util", "tracing", "util"] }
tower-http = { version = "0.3", features = ["catch-panic", "map-response-body", "tower", "tracing", "util"] }
@@ -96,6 +97,7 @@ either = { version = "1", features = ["use_std"] }
fixedbitset = { version = "0.4", features = ["std"] }
futures-channel = { version = "0.3", features = ["alloc", "futures-sink", "sink", "std"] }
futures-core = { version = "0.3", features = ["alloc", "std"] }
+futures-io = { version = "0.3", features = ["std"] }
futures-sink = { version = "0.3", features = ["alloc", "std"] }
futures-task = { version = "0.3", default-features = false, features = ["alloc", "std"] }
futures-util = { version = "0.3", features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "sink", "slab", "std"] }
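
For reference, the end-to-end test above drives this through the CLI roughly as follows; the host, output directory, namespace, and table names here are placeholders, not values from the test:

influxdb_iox -h http://127.0.0.1:8082 remote store get-table -o ./table-files my_namespace my_table

Running the same command a second time prints "skipping file N of M (<name> already exists)" for every file whose on-disk size matches the catalog's reported size, and any file whose size no longer matches is downloaded again.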