Merge branch 'main' into dom/consistent-sort-key

pull/24376/head
Authored by kodiakhq[bot] on 2022-05-31 16:15:08 +00:00; committed by GitHub
commit 2d08478e1b
4 changed files with 99 additions and 190 deletions

Cargo.lock (generated)

@@ -102,7 +102,7 @@ checksum = "0612b6a634de6c3f5e63fdaa6932f7bc598f92de0462ac6e69b0aebd77e093aa"
 dependencies = [
  "bitflags",
  "chrono",
- "comfy-table",
+ "comfy-table 5.0.1",
  "csv",
  "flatbuffers",
  "half",
@@ -145,7 +145,7 @@ dependencies = [
  "arrow",
  "arrow-flight",
  "chrono",
- "comfy-table",
+ "comfy-table 6.0.0",
  "datafusion 0.1.0",
  "hashbrown 0.12.1",
  "num-traits",
@@ -599,7 +599,7 @@ dependencies = [
  "iox_time",
  "metric",
  "observability_deps",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "proptest",
  "rand",
  "tokio",
@@ -819,8 +819,19 @@ version = "5.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b103d85ca6e209388771bfb7aa6b68a7aeec4afbf6f0a0264bfbf50360e5212e"
 dependencies = [
- "strum",
- "strum_macros",
+ "strum 0.23.0",
+ "strum_macros 0.23.1",
  "unicode-width",
 ]
 
+[[package]]
+name = "comfy-table"
+version = "6.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "121d8a5b0346092c18a4b2fd6f620d7a06f0eb7ac0a45860939a0884bc579c56"
+dependencies = [
+ "strum 0.24.0",
+ "strum_macros 0.24.0",
+ "unicode-width",
+]
+
@@ -1179,7 +1190,7 @@ dependencies = [
  "log",
  "num_cpus",
  "ordered-float 3.0.0",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "parquet",
  "paste",
  "pin-project-lite",
@@ -1212,7 +1223,7 @@ dependencies = [
  "chrono",
  "futures",
  "glob",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "tempfile",
  "tokio",
 ]
@@ -1491,7 +1502,7 @@ dependencies = [
  "futures",
  "libc",
  "observability_deps",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "pin-project",
  "tokio",
  "tokio-util 0.7.2",
@@ -2103,7 +2114,7 @@ dependencies = [
  "futures",
  "mockito",
  "once_cell",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "reqwest",
  "serde",
  "serde_json",
@@ -2279,7 +2290,7 @@ dependencies = [
  "mutable_batch_lp",
  "object_store",
  "observability_deps",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "parquet_file",
  "paste",
  "pin-project",
@@ -2398,7 +2409,7 @@ dependencies = [
  "hashbrown 0.12.1",
  "itertools",
  "observability_deps",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "predicate",
  "query_functions",
  "schema",
@@ -2436,7 +2447,7 @@ name = "iox_time"
 version = "0.1.0"
 dependencies = [
  "chrono",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "workspace-hack",
 ]
 
@@ -2462,7 +2473,7 @@ dependencies = [
  "metric_exporters",
  "mutable_batch_lp",
  "observability_deps",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "pprof",
  "predicate",
  "reqwest",
@@ -2780,7 +2791,7 @@ version = "0.1.0"
 dependencies = [
  "observability_deps",
  "once_cell",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "regex",
  "tracing-subscriber",
  "workspace-hack",
@@ -2887,7 +2898,7 @@ dependencies = [
 name = "metric"
 version = "0.1.0"
 dependencies = [
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "workspace-hack",
 ]
 
@@ -3403,9 +3414,9 @@ dependencies = [
 
 [[package]]
 name = "parking_lot"
-version = "0.12.0"
+version = "0.12.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58"
+checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
 dependencies = [
  "lock_api",
  "parking_lot_core 0.9.3",
@@ -3486,17 +3497,14 @@ dependencies = [
  "metric",
  "object_store",
  "observability_deps",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "parquet",
  "parquet-format",
  "pbjson-types",
- "pin-project",
  "predicate",
  "prost",
- "rayon",
  "schema",
  "snafu",
- "tempfile",
  "thiserror",
  "thrift",
  "tokio",
@@ -3704,7 +3712,7 @@ dependencies = [
  "log",
  "nix 0.24.1",
  "once_cell",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "prost",
  "prost-build",
  "prost-derive",
@@ -3836,7 +3844,7 @@ dependencies = [
  "fnv",
  "lazy_static",
  "memchr",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "thiserror",
 ]
 
@@ -3946,7 +3954,7 @@ dependencies = [
  "mutable_batch_lp",
  "object_store",
  "observability_deps",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "parquet_file",
  "pin-project",
  "predicate",
@@ -4120,9 +4128,9 @@ dependencies = [
 
 [[package]]
 name = "rayon"
-version = "1.5.2"
+version = "1.5.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fd249e82c21598a9a426a4e00dd7adc1d640b22445ec8545feef801d1a74c221"
+checksum = "bd99e5772ead8baa5215278c9b15bf92087709e9c1b2d1f97cdb5a183c933a7d"
 dependencies = [
  "autocfg",
  "crossbeam-deque",
@@ -4158,7 +4166,7 @@ dependencies = [
  "metric",
  "observability_deps",
  "packers",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "permutation",
  "proptest",
  "rand",
@@ -4310,7 +4318,7 @@ dependencies = [
  "mutable_batch_pb",
  "object_store",
  "observability_deps",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "paste",
  "predicate",
  "pretty_assertions",
@@ -4344,7 +4352,7 @@ dependencies = [
  "crc32c",
  "futures",
  "integer-encoding 3.0.3",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "pin-project-lite",
  "rand",
  "snap",
@@ -4806,7 +4814,7 @@ dependencies = [
  "metric",
  "observability_deps",
  "panic_logging",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "predicate",
  "prost",
  "query_functions",
@@ -5201,6 +5209,12 @@ version = "0.23.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "cae14b91c7d11c9a851d3fbc80a963198998c2a64eec840477fa92d8ce9b70bb"
 
+[[package]]
+name = "strum"
+version = "0.24.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e96acfc1b70604b8b2f1ffa4c57e59176c7dbb05d556c71ecd2f5498a1dee7f8"
+
 [[package]]
 name = "strum_macros"
 version = "0.23.1"
@@ -5214,6 +5228,19 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "strum_macros"
+version = "0.24.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6878079b17446e4d3eba6192bb0a2950d5b14f0ed8424b852310e5a94345d0ef"
+dependencies = [
+ "heck 0.4.0",
+ "proc-macro2",
+ "quote",
+ "rustversion",
+ "syn",
+]
+
 [[package]]
 name = "subtle"
 version = "2.4.1"
@@ -5302,7 +5329,7 @@ dependencies = [
  "async-trait",
  "dotenv",
  "observability_deps",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "tempfile",
  "tokio",
  "tracing-log",
@@ -5327,7 +5354,7 @@ dependencies = [
  "nix 0.24.1",
  "observability_deps",
  "once_cell",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "prost",
  "rand",
  "reqwest",
@@ -5485,7 +5512,7 @@ dependencies = [
  "mio",
  "num_cpus",
  "once_cell",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "pin-project-lite",
  "signal-hook-registry",
  "socket2",
@@ -5718,7 +5745,7 @@ version = "0.1.0"
 dependencies = [
  "chrono",
  "observability_deps",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "rand",
  "workspace-hack",
 ]
@@ -5750,7 +5777,7 @@ dependencies = [
  "itertools",
  "metric",
  "observability_deps",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "pin-project",
  "snafu",
  "tower",
@@ -5832,7 +5859,7 @@ dependencies = [
  "ansi_term",
  "lazy_static",
  "matchers",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "regex",
  "serde",
  "serde_json",
@@ -5855,7 +5882,7 @@ dependencies = [
  "lock_api",
  "metric",
  "observability_deps",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "pin-project",
  "tokio",
  "tokio-util 0.7.2",
@@ -6383,7 +6410,7 @@ dependencies = [
  "mutable_batch_lp",
  "mutable_batch_pb",
  "observability_deps",
- "parking_lot 0.12.0",
+ "parking_lot 0.12.1",
  "pin-project",
  "prost",
  "rskafka",


@@ -10,7 +10,7 @@ ahash = { version = "0.7.5", default-features = false }
 arrow = { version = "14.0.0", features = ["prettyprint"] }
 # used by arrow anyway (needed for printing workaround)
 chrono = { version = "0.4", default-features = false }
-comfy-table = { version = "5.0", default-features = false }
+comfy-table = { version = "6.0", default-features = false }
 datafusion = { path = "../datafusion" }
 hashbrown = "0.12"
 num-traits = "0.2"


@@ -21,13 +21,10 @@ parking_lot = "0.12"
 parquet = {version = "14.0.0", features = ["experimental"]}
 parquet-format = "4.0"
 pbjson-types = "0.3"
-pin-project = "1.0"
 predicate = { path = "../predicate" }
 prost = "0.10"
-rayon = "1.5"
 schema = { path = "../schema" }
 snafu = "0.7"
-tempfile = "3.1.0"
 thrift = "0.13"
 tokio = { version = "1.18", features = ["macros", "parking_lot", "rt", "rt-multi-thread", "sync"] }
 uuid = { version = "0.8", features = ["v4"] }


@@ -19,26 +19,13 @@ use object_store::{DynObjectStore, GetResult};
 use observability_deps::tracing::*;
 use parquet::{
     arrow::{ArrowReader, ParquetFileArrowReader},
-    file::reader::SerializedFileReader,
+    file::{reader::SerializedFileReader, serialized_reader::SliceableCursor},
 };
-use pin_project::{pin_project, pinned_drop};
 use predicate::Predicate;
-use rayon::{ThreadPool, ThreadPoolBuilder};
 use schema::selection::Selection;
-use std::{
-    future::Future,
-    io::SeekFrom,
-    sync::{
-        atomic::{AtomicBool, Ordering},
-        Arc,
-    },
-    time::Duration,
-};
+use std::{sync::Arc, time::Duration};
 use thiserror::Error;
-use tokio::{
-    fs::File,
-    io::{AsyncSeekExt, AsyncWriteExt},
-};
+use tokio::io::AsyncReadExt;
 
 /// Errors returned during a Parquet "put" operation, covering [`RecordBatch`]
 /// pull from the provided stream, encoding, and finally uploading the bytes to
@@ -62,11 +49,6 @@ pub enum UploadError {
 /// Errors during Parquet file download & scan.
 #[derive(Debug, Error)]
 pub enum ReadError {
-    /// Failed to create the temporary Parquet file on disk to which the
-    /// downloaded parquet bytes will be spilled.
-    #[error("failed to create temporary file: {0}")]
-    TempFile(std::io::Error),
-
     /// Error writing the bytes fetched from object store to the temporary
     /// parquet file on disk.
     #[error("i/o error writing downloaded parquet: {0}")]
@@ -79,10 +61,6 @@ pub enum ReadError {
     /// An error reading the downloaded Parquet file.
     #[error("invalid parquet file: {0}")]
     Parquet(#[from] parquet::errors::ParquetError),
-
-    /// Cannot poll arrow blocking wrapper
-    #[error("cannot poll arrow blocking wrapper: {0}")]
-    Poll(#[from] tokio::sync::oneshot::error::RecvError),
 }
 
 /// The [`ParquetStorage`] type encapsulates [`RecordBatch`] persistence to an
@@ -97,25 +75,15 @@ pub enum ReadError {
 /// [`ObjectStore`]: object_store::ObjectStore
 #[derive(Debug, Clone)]
 pub struct ParquetStorage {
     /// Underlying object store.
     object_store: Arc<DynObjectStore>,
-    parquet_io_threadpool: Arc<ThreadPool>,
 }
 
 impl ParquetStorage {
     /// Initialise a new [`ParquetStorage`] using `object_store` as the
     /// persistence layer.
     pub fn new(object_store: Arc<DynObjectStore>) -> Self {
-        let parquet_io_threadpool = Arc::new(
-            ThreadPoolBuilder::new()
-                .num_threads(8)
-                .thread_name(|i| format!("parquet IO {i}"))
-                .build()
-                .expect("cannot build parquet IO threadpool"),
-        );
-
-        Self {
-            object_store,
-            parquet_io_threadpool,
-        }
+        Self { object_store }
     }
 
     /// Push `batches`, a stream of [`RecordBatch`] instances, to object
@@ -217,11 +185,9 @@ impl ParquetStorage {
         // `download_and_scan_parquet` is sent back to the reader and
         // not silently ignored
         let object_store = Arc::clone(&self.object_store);
-        let thread_pool = Arc::clone(&self.parquet_io_threadpool);
         let handle = tokio::task::spawn(async move {
             let download_result =
-                download_and_scan_parquet(projection, path, object_store, tx.clone(), thread_pool)
-                    .await;
+                download_and_scan_parquet(projection, path, object_store, tx.clone()).await;
 
             // If there was an error returned from download_and_scan_parquet send it back to the receiver.
             if let Err(e) = download_result {
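The hunk above keeps the existing error-forwarding pattern intact: a spawned task streams results into a bounded mpsc channel, and a failure in the download itself is pushed into that same channel so the reader observes it in-band rather than as a silently dropped task error. A minimal, self-contained sketch of that pattern follows (the `produce` function and `ReadError` type here are toy stand-ins, not the project's code):

use tokio::sync::mpsc;

// Toy stand-in for the real ReadError enum.
#[derive(Debug)]
struct ReadError(String);

// Toy stand-in for download_and_scan_parquet: sends items until done or
// the receiver hangs up, then reports a simulated failure.
async fn produce(tx: mpsc::Sender<Result<u64, ReadError>>) -> Result<(), ReadError> {
    for i in 0..3 {
        // Receiver hung up: stop producing rather than treating it as an error.
        if tx.send(Ok(i)).await.is_err() {
            return Ok(());
        }
    }
    // Simulate a failure after some batches were already delivered.
    Err(ReadError("simulated download failure".to_string()))
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(2);

    // The producer runs in its own task; its error is forwarded into the
    // same channel so the consumer sees it in-band, as the diff does.
    tokio::task::spawn(async move {
        if let Err(e) = produce(tx.clone()).await {
            let _ = tx.send(Err(e)).await;
        }
    });

    while let Some(item) = rx.recv().await {
        match item {
            Ok(v) => println!("got {v}"),
            Err(e) => eprintln!("scan failed: {e:?}"),
        }
    }
}

Bounding the channel provides backpressure: once the consumer falls behind, the producer blocks in `send` instead of buffering unboundedly.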
@@ -273,130 +239,49 @@ async fn download_and_scan_parquet(
     path: object_store::path::Path,
     object_store: Arc<DynObjectStore>,
     tx: tokio::sync::mpsc::Sender<ArrowResult<RecordBatch>>,
-    thread_pool: Arc<ThreadPool>,
 ) -> Result<(), ReadError> {
     let read_stream = object_store.get(&path).await?;
 
-    let file = match read_stream {
-        GetResult::File(f, _) => {
+    let data = match read_stream {
+        GetResult::File(mut f, _) => {
             trace!(?path, "Using file directly");
-            f.into_std().await
+            let l = f.metadata().await?.len();
+            let mut buf = Vec::with_capacity(l as usize);
+            f.read_to_end(&mut buf).await?;
+            buf
         }
-        GetResult::Stream(mut read_stream) => {
-            // read parquet file to local file
-            let mut file = File::from_std(tempfile::tempfile().map_err(ReadError::TempFile)?);
+        GetResult::Stream(read_stream) => {
+            let chunks: Vec<_> = read_stream.try_collect().await?;
 
-            trace!(?path, ?file, "Beginning to read parquet to temp file");
-
-            while let Some(bytes) = read_stream.try_next().await? {
-                trace!(len = bytes.len(), "read bytes from object store");
-                file.write_all(&bytes).await?;
+            let mut buf = Vec::with_capacity(chunks.iter().map(|c| c.len()).sum::<usize>());
+            for c in chunks {
+                buf.extend(c);
             }
 
-            file.seek(SeekFrom::Start(0)).await?;
-
-            let file = file.into_std().await;
-
-            trace!(?path, "Completed read parquet to tempfile");
-
-            file
+            buf
         }
     };
 
-    ParquetBlockingReader::new(file, tx, projection, thread_pool)?.await?;
+    // Size of each batch
+    let batch_size = 1024; // Todo: make a constant or policy for this
+
+    let cursor = SliceableCursor::new(data);
+    let file_reader = SerializedFileReader::new(cursor)?;
+    let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
+    let record_batch_reader = arrow_reader.get_record_reader_by_columns(projection, batch_size)?;
+
+    for batch in record_batch_reader {
+        if tx.send(batch).await.is_err() {
+            debug!("Receiver hung up - exiting");
+            break;
+        }
+    }
 
     debug!(?path, "Completed parquet download & scan");
     Ok(())
 }
 
-/// Helper to ensure that arrows file->RecordBatch parquet IO is done in a dedicated IO thread.
-///
-/// This deliberately does NOT use tokio's `spawn_blocking` because this might steal too many threads from our query executor.
-#[pin_project(PinnedDrop)]
-struct ParquetBlockingReader {
-    cancelled: Arc<AtomicBool>,
-
-    #[pin]
-    finished: tokio::sync::oneshot::Receiver<Result<(), ReadError>>,
-}
-
-impl ParquetBlockingReader {
-    fn new(
-        file: std::fs::File,
-        tx: tokio::sync::mpsc::Sender<ArrowResult<RecordBatch>>,
-        projection: Vec<usize>,
-        thread_pool: Arc<ThreadPool>,
-    ) -> Result<Self, ReadError> {
-        let (tx_finished, finished) = tokio::sync::oneshot::channel();
-        let cancelled = Arc::new(AtomicBool::new(false));
-        let cancelled_captured = Arc::clone(&cancelled);
-
-        thread_pool.spawn(move || {
-            // early cancelleation check since this task may have been within the queue for a while
-            if cancelled_captured.load(Ordering::SeqCst) {
-                return;
-            }
-
-            let res = Self::inner(file, tx, projection, cancelled_captured);
-            tx_finished.send(res).ok();
-        });
-
-        Ok(Self {
-            finished,
-            cancelled,
-        })
-    }
-
-    fn inner(
-        file: std::fs::File,
-        tx: tokio::sync::mpsc::Sender<ArrowResult<RecordBatch>>,
-        projection: Vec<usize>,
-        cancelled: Arc<AtomicBool>,
-    ) -> Result<(), ReadError> {
-        // Size of each batch
-        let batch_size = 1024; // Todo: make a constant or policy for this
-
-        let file_reader = SerializedFileReader::new(file)?;
-        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
-        let record_batch_reader =
-            arrow_reader.get_record_reader_by_columns(projection, batch_size)?;
-
-        if cancelled.load(Ordering::SeqCst) {
-            return Ok(());
-        }
-
-        for batch in record_batch_reader {
-            if tx.blocking_send(batch).is_err() {
-                debug!("Receiver hung up - exiting");
-                break;
-            }
-
-            if cancelled.load(Ordering::SeqCst) {
-                break;
-            }
-        }
-
-        Ok(())
-    }
-}
-
-#[pinned_drop]
-impl PinnedDrop for ParquetBlockingReader {
-    fn drop(self: std::pin::Pin<&mut Self>) {
-        self.cancelled.store(true, Ordering::SeqCst);
-    }
-}
-
-impl Future for ParquetBlockingReader {
-    type Output = Result<(), ReadError>;
-
-    fn poll(
-        self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Self::Output> {
-        let this = self.project();
-        this.finished.poll(cx)?
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
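Taken together, the post-merge read path is: fetch the entire Parquet object into memory, then run the synchronous Parquet-to-Arrow decode directly over that buffer. A minimal sketch of the decode half, assuming the parquet 14.x APIs imported in the diff above (`scan_buffer` is a hypothetical helper for illustration, not part of this commit):

use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use parquet::{
    arrow::{ArrowReader, ParquetFileArrowReader},
    file::{reader::SerializedFileReader, serialized_reader::SliceableCursor},
};

/// Decode an already-downloaded Parquet file into RecordBatches.
fn scan_buffer(
    data: Vec<u8>,          // the entire Parquet file, as fetched above
    projection: Vec<usize>, // column indices to materialise
) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
    // SliceableCursor adapts owned bytes to the synchronous reader API,
    // which is what lets this merge drop the tempfile spill entirely.
    let cursor = SliceableCursor::new(data);
    let file_reader = SerializedFileReader::new(cursor)?;
    let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));

    let batch_size = 1024; // same hard-coded value as the new code above
    let batches = arrow_reader
        .get_record_reader_by_columns(projection, batch_size)?
        .collect::<Result<Vec<_>, _>>()?;
    Ok(batches)
}

The trade-off versus the removed design is memory for simplicity: the whole file is buffered in RAM, but there is no disk spill, no dedicated IO thread pool, and no hand-rolled cancellable Future to maintain.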