Merge branch 'main' into dependabot/cargo/hashbrown-0.14.0
commit be75ba23e0
Cargo.lock

@@ -155,8 +155,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"

 [[package]]
 name = "arrow"
-version = "40.0.0"
-source = "git+https://github.com/alamb/arrow-rs?rev=4a59200#4a59200c81e90b838f36f002fb1f3a947665bc8d"
+version = "41.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4a46441ae78c0c5915f62aa32cad9910647c19241456dd24039646dd96d494a5"
 dependencies = [
  "ahash 0.8.3",
  "arrow-arith",

@@ -176,8 +177,9 @@ dependencies = [

 [[package]]
 name = "arrow-arith"
-version = "40.0.0"
-source = "git+https://github.com/alamb/arrow-rs?rev=4a59200#4a59200c81e90b838f36f002fb1f3a947665bc8d"
+version = "41.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "350c5067470aeeb38dcfcc1f7e9c397098116409c9087e43ca99c231020635d9"
 dependencies = [
  "arrow-array",
  "arrow-buffer",

@@ -190,8 +192,9 @@ dependencies = [

 [[package]]
 name = "arrow-array"
-version = "40.0.0"
-source = "git+https://github.com/alamb/arrow-rs?rev=4a59200#4a59200c81e90b838f36f002fb1f3a947665bc8d"
+version = "41.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6049e031521c4e7789b7530ea5991112c0a375430094191f3b74bdf37517c9a9"
 dependencies = [
  "ahash 0.8.3",
  "arrow-buffer",

@@ -206,8 +209,9 @@ dependencies = [

 [[package]]
 name = "arrow-buffer"
-version = "40.0.0"
-source = "git+https://github.com/alamb/arrow-rs?rev=4a59200#4a59200c81e90b838f36f002fb1f3a947665bc8d"
+version = "41.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a83450b94b9fe018b65ba268415aaab78757636f68b7f37b6bc1f2a3888af0a0"
 dependencies = [
  "half 2.2.1",
  "num",
@@ -215,8 +219,9 @@ dependencies = [

 [[package]]
 name = "arrow-cast"
-version = "40.0.0"
-source = "git+https://github.com/alamb/arrow-rs?rev=4a59200#4a59200c81e90b838f36f002fb1f3a947665bc8d"
+version = "41.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "249198411254530414805f77e88e1587b0914735ea180f906506905721f7a44a"
 dependencies = [
  "arrow-array",
  "arrow-buffer",

@@ -231,8 +236,9 @@ dependencies = [

 [[package]]
 name = "arrow-csv"
-version = "40.0.0"
-source = "git+https://github.com/alamb/arrow-rs?rev=4a59200#4a59200c81e90b838f36f002fb1f3a947665bc8d"
+version = "41.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ec9ee134298aa895ef9d791dc9cc557cecd839108843830bd35824fcd8d7f721"
 dependencies = [
  "arrow-array",
  "arrow-buffer",

@@ -249,8 +255,9 @@ dependencies = [

 [[package]]
 name = "arrow-data"
-version = "40.0.0"
-source = "git+https://github.com/alamb/arrow-rs?rev=4a59200#4a59200c81e90b838f36f002fb1f3a947665bc8d"
+version = "41.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4d48dcbed83d741d4af712af17f6d952972b8f6491b24ee2415243a7e37c6438"
 dependencies = [
  "arrow-buffer",
  "arrow-schema",
@@ -260,18 +267,25 @@ dependencies = [

 [[package]]
 name = "arrow-flight"
-version = "40.0.0"
+version = "41.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6dff9cf247f68541be625fa32a52f9307beae701d1b3a126d1761c605adcd80c"
+checksum = "462e7938174bfdf83311dcbcfea634660a6600986c3ad9d0421bbc6cc797648b"
 dependencies = [
  "arrow-arith",
  "arrow-array",
  "arrow-buffer",
  "arrow-cast",
  "arrow-data",
  "arrow-ipc",
  "arrow-ord",
  "arrow-row",
  "arrow-schema",
  "arrow-select",
  "arrow-string",
  "base64 0.21.2",
  "bytes",
  "futures",
  "once_cell",
  "paste",
  "prost",
  "tokio",
@@ -280,8 +294,9 @@ dependencies = [

 [[package]]
 name = "arrow-ipc"
-version = "40.0.0"
-source = "git+https://github.com/alamb/arrow-rs?rev=4a59200#4a59200c81e90b838f36f002fb1f3a947665bc8d"
+version = "41.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ea8d7b138c5414aeef5dd08abacf362f87ed9b1168ea38d60a6f67590c3f7d99"
 dependencies = [
  "arrow-array",
  "arrow-buffer",

@@ -293,8 +308,9 @@ dependencies = [

 [[package]]
 name = "arrow-json"
-version = "40.0.0"
-source = "git+https://github.com/alamb/arrow-rs?rev=4a59200#4a59200c81e90b838f36f002fb1f3a947665bc8d"
+version = "41.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a3a597fdca885a81f2e7ab0bacaa0bd2dfefb4cd6a2e5a3d1677396a68673101"
 dependencies = [
  "arrow-array",
  "arrow-buffer",

@@ -312,8 +328,9 @@ dependencies = [

 [[package]]
 name = "arrow-ord"
-version = "40.0.0"
-source = "git+https://github.com/alamb/arrow-rs?rev=4a59200#4a59200c81e90b838f36f002fb1f3a947665bc8d"
+version = "41.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "29be2d5fadaab29e4fa6a7e527ceaa1c2cddc57dc6d86c062f7a05adcd8df71e"
 dependencies = [
  "arrow-array",
  "arrow-buffer",

@@ -326,8 +343,9 @@ dependencies = [

 [[package]]
 name = "arrow-row"
-version = "40.0.0"
-source = "git+https://github.com/alamb/arrow-rs?rev=4a59200#4a59200c81e90b838f36f002fb1f3a947665bc8d"
+version = "41.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b6e0bd6ad24d56679b3317b499b0de61bca16d3142896908cce1aa943e56e981"
 dependencies = [
  "ahash 0.8.3",
  "arrow-array",
@@ -340,13 +358,15 @@ dependencies = [

 [[package]]
 name = "arrow-schema"
-version = "40.0.0"
-source = "git+https://github.com/alamb/arrow-rs?rev=4a59200#4a59200c81e90b838f36f002fb1f3a947665bc8d"
+version = "41.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2b71d8d68d0bc2e648e4e395896dc518be8b90c5f0f763c59083187c3d46184b"

 [[package]]
 name = "arrow-select"
-version = "40.0.0"
-source = "git+https://github.com/alamb/arrow-rs?rev=4a59200#4a59200c81e90b838f36f002fb1f3a947665bc8d"
+version = "41.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "470cb8610bdfda56554a436febd4e457e506f3c42e01e545a1ea7ecf2a4c8823"
 dependencies = [
  "arrow-array",
  "arrow-buffer",

@@ -357,8 +377,9 @@ dependencies = [

 [[package]]
 name = "arrow-string"
-version = "40.0.0"
-source = "git+https://github.com/alamb/arrow-rs?rev=4a59200#4a59200c81e90b838f36f002fb1f3a947665bc8d"
+version = "41.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "70f8a2e4ff9dbbd51adbabf92098b71e3eb2ef0cfcb75236ca7c3ce087cce038"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -1400,8 +1421,8 @@ dependencies = [

 [[package]]
 name = "datafusion"
-version = "25.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=859251b4a20e00c3dfe73eee6b605fcf722687e7#859251b4a20e00c3dfe73eee6b605fcf722687e7"
+version = "26.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=91e75d7e6303c1a7331e8e90eaad9b095ace929b#91e75d7e6303c1a7331e8e90eaad9b095ace929b"
 dependencies = [
  "ahash 0.8.3",
  "arrow",

@@ -1449,8 +1470,8 @@ dependencies = [

 [[package]]
 name = "datafusion-common"
-version = "25.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=859251b4a20e00c3dfe73eee6b605fcf722687e7#859251b4a20e00c3dfe73eee6b605fcf722687e7"
+version = "26.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=91e75d7e6303c1a7331e8e90eaad9b095ace929b#91e75d7e6303c1a7331e8e90eaad9b095ace929b"
 dependencies = [
  "arrow",
  "arrow-array",

@@ -1463,8 +1484,8 @@ dependencies = [

 [[package]]
 name = "datafusion-execution"
-version = "25.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=859251b4a20e00c3dfe73eee6b605fcf722687e7#859251b4a20e00c3dfe73eee6b605fcf722687e7"
+version = "26.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=91e75d7e6303c1a7331e8e90eaad9b095ace929b#91e75d7e6303c1a7331e8e90eaad9b095ace929b"
 dependencies = [
  "dashmap",
  "datafusion-common",

@@ -1480,8 +1501,8 @@ dependencies = [

 [[package]]
 name = "datafusion-expr"
-version = "25.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=859251b4a20e00c3dfe73eee6b605fcf722687e7#859251b4a20e00c3dfe73eee6b605fcf722687e7"
+version = "26.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=91e75d7e6303c1a7331e8e90eaad9b095ace929b#91e75d7e6303c1a7331e8e90eaad9b095ace929b"
 dependencies = [
  "ahash 0.8.3",
  "arrow",

@@ -1494,8 +1515,8 @@ dependencies = [

 [[package]]
 name = "datafusion-optimizer"
-version = "25.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=859251b4a20e00c3dfe73eee6b605fcf722687e7#859251b4a20e00c3dfe73eee6b605fcf722687e7"
+version = "26.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=91e75d7e6303c1a7331e8e90eaad9b095ace929b#91e75d7e6303c1a7331e8e90eaad9b095ace929b"
 dependencies = [
  "arrow",
  "async-trait",

@@ -1511,8 +1532,8 @@ dependencies = [

 [[package]]
 name = "datafusion-physical-expr"
-version = "25.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=859251b4a20e00c3dfe73eee6b605fcf722687e7#859251b4a20e00c3dfe73eee6b605fcf722687e7"
+version = "26.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=91e75d7e6303c1a7331e8e90eaad9b095ace929b#91e75d7e6303c1a7331e8e90eaad9b095ace929b"
 dependencies = [
  "ahash 0.8.3",
  "arrow",

@@ -1543,8 +1564,8 @@ dependencies = [

 [[package]]
 name = "datafusion-proto"
-version = "25.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=859251b4a20e00c3dfe73eee6b605fcf722687e7#859251b4a20e00c3dfe73eee6b605fcf722687e7"
+version = "26.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=91e75d7e6303c1a7331e8e90eaad9b095ace929b#91e75d7e6303c1a7331e8e90eaad9b095ace929b"
 dependencies = [
  "arrow",
  "chrono",

@@ -1557,8 +1578,8 @@ dependencies = [

 [[package]]
 name = "datafusion-row"
-version = "25.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=859251b4a20e00c3dfe73eee6b605fcf722687e7#859251b4a20e00c3dfe73eee6b605fcf722687e7"
+version = "26.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=91e75d7e6303c1a7331e8e90eaad9b095ace929b#91e75d7e6303c1a7331e8e90eaad9b095ace929b"
 dependencies = [
  "arrow",
  "datafusion-common",

@@ -1568,8 +1589,8 @@ dependencies = [

 [[package]]
 name = "datafusion-sql"
-version = "25.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=859251b4a20e00c3dfe73eee6b605fcf722687e7#859251b4a20e00c3dfe73eee6b605fcf722687e7"
+version = "26.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=91e75d7e6303c1a7331e8e90eaad9b095ace929b#91e75d7e6303c1a7331e8e90eaad9b095ace929b"
 dependencies = [
  "arrow",
  "arrow-schema",
@@ -3840,15 +3861,17 @@ dependencies = [

 [[package]]
 name = "object_store"
-version = "0.5.6"
+version = "0.6.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec9cd6ca25e796a49fa242876d1c4de36a24a6da5258e9f0bc062dbf5e81c53b"
+checksum = "27c776db4f332b571958444982ff641d2531417a326ca368995073b639205d58"
 dependencies = [
  "async-trait",
  "base64 0.21.2",
  "bytes",
  "chrono",
  "futures",
  "humantime",
  "hyper",
  "itertools",
  "parking_lot 0.12.1",
  "percent-encoding",
@@ -3998,8 +4021,9 @@ dependencies = [

 [[package]]
 name = "parquet"
-version = "40.0.0"
-source = "git+https://github.com/alamb/arrow-rs?rev=4a59200#4a59200c81e90b838f36f002fb1f3a947665bc8d"
+version = "41.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6880c32d81884ac4441d9f4b027df8561be23b54f3ac1e62086fa42753dd3faa"
 dependencies = [
  "ahash 0.8.3",
  "arrow-array",
Cargo.toml

@@ -116,34 +116,18 @@ edition = "2021"
 license = "MIT OR Apache-2.0"

 [workspace.dependencies]
-arrow = { version = "40.0.0" }
-arrow-flight = { version = "40.0.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "859251b4a20e00c3dfe73eee6b605fcf722687e7", default-features = false }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "859251b4a20e00c3dfe73eee6b605fcf722687e7" }
+arrow = { version = "41.0.0" }
+arrow-flight = { version = "41.0.0" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "91e75d7e6303c1a7331e8e90eaad9b095ace929b", default-features = false }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "91e75d7e6303c1a7331e8e90eaad9b095ace929b" }
 hashbrown = { version = "0.14.0" }
-object_store = { version = "0.5.6" }
-parquet = { version = "40.0.0" }
+object_store = { version = "0.6.0" }
+parquet = { version = "41.0.0" }
 tonic = { version = "0.9.2", features = ["tls", "tls-webpki-roots"] }
 tonic-build = { version = "0.9.2" }
 tonic-health = { version = "0.9.2" }
 tonic-reflection = { version = "0.9.2" }

-[patch.crates-io]
-# Temporary until arrow/parquet 41 is released.
-# Needed to get https://github.com/apache/arrow-rs/pull/4280
-# The branch used is: https://github.com/alamb/arrow-rs/pull/37
-parquet = { git = "https://github.com/alamb/arrow-rs", rev = "4a59200" }
-arrow = { git = "https://github.com/alamb/arrow-rs", rev = "4a59200" }
-arrow-array = { git = "https://github.com/alamb/arrow-rs", rev = "4a59200" }
-arrow-buffer = { git = "https://github.com/alamb/arrow-rs", rev = "4a59200" }
-arrow-cast = { git = "https://github.com/alamb/arrow-rs", rev = "4a59200" }
-arrow-data = { git = "https://github.com/alamb/arrow-rs", rev = "4a59200" }
-arrow-ord = { git = "https://github.com/alamb/arrow-rs", rev = "4a59200" }
-arrow-ipc = { git = "https://github.com/alamb/arrow-rs", rev = "4a59200" }
-arrow-schema = { git = "https://github.com/alamb/arrow-rs", rev = "4a59200" }
-arrow-select = { git = "https://github.com/alamb/arrow-rs", rev = "4a59200" }
-arrow-string = { git = "https://github.com/alamb/arrow-rs", rev = "4a59200" }
-
 # This profile optimizes for runtime performance and small binary size at the expense of longer
 # build times. It's most suitable for final release builds.
 [profile.release]
@@ -5,7 +5,8 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use futures::stream::BoxStream;
 use object_store::{
-    path::Path, DynObjectStore, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
+    path::Path, DynObjectStore, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
+    ObjectStore, Result,
 };
 use tokio::io::{sink, AsyncWrite};
 use uuid::Uuid;

@@ -46,8 +47,8 @@ impl ObjectStore for IgnoreWrites {
         Ok(())
     }

-    async fn get(&self, location: &Path) -> Result<GetResult> {
-        self.inner.get(location).await
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+        self.inner.get_opts(location, options).await
     }

     async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
@@ -11,8 +11,8 @@ use bytes::Bytes;
 use futures::stream::BoxStream;
 use metric::{Registry, U64Gauge};
 use object_store::{
-    path::Path, DynObjectStore, Error, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
-    Result,
+    path::Path, DynObjectStore, Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
+    ObjectStore, Result,
 };
 use tokio::io::AsyncWrite;

@@ -77,8 +77,8 @@ impl ObjectStore for MetricsStore {
         Err(Error::NotImplemented)
     }

-    async fn get(&self, location: &Path) -> Result<GetResult> {
-        self.inner.get(location).await
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+        self.inner.get_opts(location, options).await
     }

     async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
@@ -649,6 +649,14 @@ impl From<SkippedCompaction> for compactor_proto::SkippedCompaction {
     }
 }

+/// Struct to contain row result from a SELECT 1 as exists FROM ... query to support
+/// testing the existence of a parquet file.
+#[derive(Debug, Copy, Clone, PartialEq, Eq, sqlx::FromRow)]
+pub struct ParquetFileExists {
+    /// column for result of SELECT 1 (integer)
+    pub exists: i32,
+}
+
 /// Data for a parquet file reference that has been inserted in the catalog.
 #[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)]
 pub struct ParquetFile {
@@ -108,7 +108,7 @@ pub enum FlightSQLCommand {
 impl Display for FlightSQLCommand {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
-            Self::CommandStatementQuery(CommandStatementQuery { query }) => {
+            Self::CommandStatementQuery(CommandStatementQuery { query, .. }) => {
                 write!(f, "CommandStatementQuery{query}")
             }
             Self::CommandPreparedStatementQuery(h) => write!(f, "CommandPreparedStatementQuery{h}"),

@@ -236,6 +236,7 @@ impl Display for FlightSQLCommand {
             }
             Self::ActionCreatePreparedStatementRequest(ActionCreatePreparedStatementRequest {
                 query,
+                ..
             }) => {
                 write!(f, "ActionCreatePreparedStatementRequest{query}")
             }
@@ -54,7 +54,7 @@ impl FlightSQLPlanner {
         debug!(%namespace_name, %cmd, "Handling flightsql get_flight_info");

         match cmd {
-            FlightSQLCommand::CommandStatementQuery(CommandStatementQuery { query }) => {
+            FlightSQLCommand::CommandStatementQuery(CommandStatementQuery { query, .. }) => {
                 get_schema_for_query(&query, ctx).await
             }
             FlightSQLCommand::CommandPreparedStatementQuery(handle) => {

@@ -110,7 +110,7 @@ impl FlightSQLPlanner {
         debug!(%namespace_name, %cmd, "Handling flightsql do_get");

         match cmd {
-            FlightSQLCommand::CommandStatementQuery(CommandStatementQuery { query }) => {
+            FlightSQLCommand::CommandStatementQuery(CommandStatementQuery { query, .. }) => {
                 debug!(%query, "Planning FlightSQL query");
                 Ok(ctx.sql_to_physical_plan(&query).await?)
             }

@@ -271,7 +271,7 @@ impl FlightSQLPlanner {

         match cmd {
             FlightSQLCommand::ActionCreatePreparedStatementRequest(
-                ActionCreatePreparedStatementRequest { query },
+                ActionCreatePreparedStatementRequest { query, .. },
             ) => {
                 debug!(%query, "Creating prepared statement");

@@ -88,12 +88,12 @@ async fn should_delete(

     if let Some(uuid) = file_name.as_ref().strip_suffix(".parquet") {
         if let Ok(object_store_id) = uuid.parse() {
-            let parquet_file = parquet_files
-                .get_by_object_store_id(object_store_id)
+            let parquet_file_exists = parquet_files
+                .exists_by_object_store_id(object_store_id)
                 .await
                 .context(GetFileSnafu { object_store_id })?;

-            if parquet_file.is_some() {
+            if parquet_file_exists {
                 // We have a reference to this file; do not delete
                 debug!(
                     location = %item.location,
@@ -212,6 +212,7 @@ mod tests {
             location,
             last_modified,
             size: 0,
+            e_tag: None,
         };

         assert!(!should_delete(&item, cutoff, parquet_files).await.unwrap());

@@ -239,6 +240,7 @@
             location,
             last_modified,
             size: 0,
+            e_tag: None,
         };

         assert!(!should_delete(&item, cutoff, parquet_files).await.unwrap());

@@ -258,6 +260,7 @@
             location: Path::from("not-a-uuid.parquet"),
             last_modified,
             size: 0,
+            e_tag: None,
         };

         assert!(!should_delete(&item, cutoff, parquet_files).await.unwrap());

@@ -284,6 +287,7 @@
             location,
             last_modified,
             size: 0,
+            e_tag: None,
         };

         assert!(!should_delete(&item, cutoff, parquet_files).await.unwrap());

@@ -311,6 +315,7 @@
             location,
             last_modified,
             size: 0,
+            e_tag: None,
         };

         assert!(should_delete(&item, cutoff, parquet_files).await.unwrap());

@@ -330,6 +335,7 @@
             location: Path::from("not-a-uuid.parquet"),
             last_modified,
             size: 0,
+            e_tag: None,
         };

         assert!(should_delete(&item, cutoff, parquet_files).await.unwrap());
@@ -133,6 +133,7 @@ mod tests {
             location: new_object_meta_location(),
             last_modified: Utc::now(),
             size: 0,
+            e_tag: None,
         };
         os.put(&object_meta.location, Bytes::from(i.to_string()))
             .await
@@ -19,7 +19,7 @@ pub(crate) async fn perform(
         .repositories()
         .await
         .parquet_files()
-        .delete_old_ids_only(older_than)
+        .delete_old_ids_only(older_than) // read/write
         .await
         .context(DeletingSnafu)?;
     info!(delete_count = %deleted.len(), "iox_catalog::delete_old()");

@@ -17,7 +17,7 @@ pub(crate) async fn perform(
         .repositories()
         .await
         .parquet_files()
-        .flag_for_delete_by_retention()
+        .flag_for_delete_by_retention() //read/write
         .await
         .context(FlaggingSnafu)?;
     info!(flagged_count = %flagged.len(), "iox_catalog::flag_for_delete_by_retention()");
@@ -1526,6 +1526,7 @@ async fn flightsql_schema_matches() {
     let cases = vec![
         CommandStatementQuery {
             query: format!("select * from {table_name}"),
+            transaction_id: None,
         }
         .as_any(),
         CommandGetSqlInfo { info: vec![] }.as_any(),
@@ -123,14 +123,13 @@ Error during planning: gap-filling query is missing lower time bound
 | | GapFillExec: group_expr=[region@0, date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)@1], aggr_expr=[LOCF(AVG(cpu.user)@2)], stride=600000000000, time_range=Included("957528000000000000")..Included("957531540000000000") |
 | | SortPreservingMergeExec: [region@0 ASC,date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)@1 ASC] |
 | | SortExec: expr=[region@0 ASC,date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)@1 ASC] |
-| | AggregateExec: mode=FinalPartitioned, gby=[region@0 as region, date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)@1 as date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)], aggr=[AVG(cpu.user)] |
+| | AggregateExec: mode=FinalPartitioned, gby=[region@0 as region, date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)@1 as date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)], aggr=[AVG(cpu.user)], ordering_mode=PartiallyOrdered |
 | | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "region", index: 0 }, Column { name: "date_bin_gapfill(IntervalMonthDayNano(\"600000000000\"),cpu.time)", index: 1 }], 4), input_partitions=4 |
+| | RepartitionExec: partitioning=Hash([Column { name: "region", index: 0 }, Column { name: "date_bin_gapfill(IntervalMonthDayNano(\"600000000000\"),cpu.time)", index: 1 }], 4), input_partitions=1 |
 | | AggregateExec: mode=Partial, gby=[region@0 as region, date_bin(600000000000, time@1) as date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)], aggr=[AVG(cpu.user)], ordering_mode=PartiallyOrdered |
-| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
-| | CoalesceBatchesExec: target_batch_size=8192 |
-| | FilterExec: time@1 >= 957528000000000000 AND time@1 <= 957531540000000000 |
-| | ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[region, time, user], output_ordering=[region@0 ASC, time@1 ASC], predicate=time@2 >= 957528000000000000 AND time@2 <= 957531540000000000, pruning_predicate=time_max@0 >= 957528000000000000 AND time_min@1 <= 957531540000000000 |
+| | CoalesceBatchesExec: target_batch_size=8192 |
+| | FilterExec: time@1 >= 957528000000000000 AND time@1 <= 957531540000000000 |
+| | ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[region, time, user], output_ordering=[region@0 ASC, time@1 ASC], predicate=time@2 >= 957528000000000000 AND time@2 <= 957531540000000000, pruning_predicate=time_max@0 >= 957528000000000000 AND time_min@1 <= 957531540000000000 |
 | | |
 ----------
 -- SQL: SELECT region, date_bin_gapfill(interval '5 minute', time) as minute, locf(min(cpu.user)) from cpu where time between timestamp '2000-05-05T12:15:00Z' and timestamp '2000-05-05T12:59:00Z' group by region, minute;
@@ -928,8 +928,8 @@ name: logical_plan
name: physical_plan
plan
SortPreservingMergeExec: [iox::measurement@0 ASC NULLS LAST,tag0@2 ASC NULLS LAST,time@1 ASC NULLS LAST]
SortExec: expr=[iox::measurement@0 ASC NULLS LAST,tag0@2 ASC NULLS LAST,time@1 ASC NULLS LAST]
InterleaveExec
UnionExec
SortExec: expr=[iox::measurement@0 ASC NULLS LAST,tag0@2 ASC NULLS LAST,time@1 ASC NULLS LAST]
ProjectionExec: expr=[m0 as iox::measurement, 0 as time, tag0@0 as tag0, COUNT(m0.f64)@1 as count, SUM(m0.f64)@2 as sum, STDDEV(m0.f64)@3 as stddev]
AggregateExec: mode=FinalPartitioned, gby=[tag0@0 as tag0], aggr=[COUNT(m0.f64), SUM(m0.f64), STDDEV(m0.f64)]
CoalesceBatchesExec: target_batch_size=8192

@@ -937,11 +937,12 @@ name: physical_plan
RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
AggregateExec: mode=Partial, gby=[tag0@1 as tag0], aggr=[COUNT(m0.f64), SUM(m0.f64), STDDEV(m0.f64)]
ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[f64, tag0]
SortExec: expr=[iox::measurement@0 ASC NULLS LAST,tag0@2 ASC NULLS LAST,time@1 ASC NULLS LAST]
ProjectionExec: expr=[m1 as iox::measurement, 0 as time, tag0@0 as tag0, COUNT(m1.f64)@1 as count, SUM(m1.f64)@2 as sum, STDDEV(m1.f64)@3 as stddev]
AggregateExec: mode=FinalPartitioned, gby=[tag0@0 as tag0], aggr=[COUNT(m1.f64), SUM(m1.f64), STDDEV(m1.f64)]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([Column { name: "tag0", index: 0 }], 4), input_partitions=4
RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=4
AggregateExec: mode=FinalPartitioned, gby=[tag0@0 as tag0], aggr=[COUNT(m1.f64), SUM(m1.f64), STDDEV(m1.f64)], ordering_mode=FullyOrdered
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([Column { name: "tag0", index: 0 }], 4), input_partitions=1
AggregateExec: mode=Partial, gby=[tag0@1 as tag0], aggr=[COUNT(m1.f64), SUM(m1.f64), STDDEV(m1.f64)], ordering_mode=FullyOrdered
ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, projection=[f64, tag0], output_ordering=[tag0@1 ASC]
-- InfluxQL: SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0, m1 GROUP BY tag0;
@@ -128,6 +128,7 @@ impl FlightSqlClient {
     ) -> Result<FlightRecordBatchStream> {
         let msg = CommandStatementQuery {
             query: query.into(),
+            transaction_id: None,
         };
         self.do_get_with_cmd(msg.as_any()).await
     }

@@ -400,6 +401,7 @@
             mut endpoint,
             total_records: _,
             total_bytes: _,
+            ordered: _,
         } = self.get_flight_info_for_command(cmd).await?;

         let flight_endpoint = endpoint.pop().ok_or_else(|| {
@@ -446,7 +448,10 @@
     ///
     /// See [`Self::execute`] to run a previously prepared statement
     pub async fn prepare(&mut self, query: String) -> Result<PreparedStatement> {
-        let cmd = ActionCreatePreparedStatementRequest { query };
+        let cmd = ActionCreatePreparedStatementRequest {
+            query,
+            transaction_id: None,
+        };

         let request = Action {
             r#type: "CreatePreparedStatement".into(),
@@ -3,8 +3,8 @@ use std::pin::Pin;
 use arrow_flight::{
     encode::FlightDataEncoderBuilder, error::FlightError,
     flight_service_server::FlightService as Flight, Action, ActionType, Criteria, Empty,
-    FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, IpcMessage,
-    PutResult, SchemaResult, Ticket,
+    FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult,
+    SchemaResult, Ticket,
 };
 use data_types::{NamespaceId, PartitionId, TableId};
 use flatbuffers::FlatBufferBuilder;

@@ -282,12 +282,9 @@ fn encode_partition(
     prost::Message::encode(&app_metadata, &mut bytes)
         .map_err(|e| FlightError::from_external_error(Box::new(e)))?;

-    Ok(FlightData::new(
-        None,
-        IpcMessage(build_none_flight_msg().into()),
-        bytes.to_vec(),
-        vec![],
-    ))
+    Ok(FlightData::new()
+        .with_app_metadata(bytes)
+        .with_data_header(build_none_flight_msg()))
 }

 fn build_none_flight_msg() -> Vec<u8> {
@@ -483,6 +483,9 @@ pub trait ParquetFileRepo: Send + Sync {
         object_store_id: Uuid,
     ) -> Result<Option<ParquetFile>>;

+    /// Test parquet file exists by object store id
+    async fn exists_by_object_store_id(&mut self, object_store_id: Uuid) -> Result<bool>;
+
     /// Commit deletions, upgrades and creations in a single transaction.
     ///
     /// Returns IDs of created files.
@@ -846,6 +846,15 @@ impl ParquetFileRepo for MemTxn {
             .cloned())
     }

+    async fn exists_by_object_store_id(&mut self, object_store_id: Uuid) -> Result<bool> {
+        let stage = self.stage();
+
+        Ok(stage
+            .parquet_files
+            .iter()
+            .any(|f| f.object_store_id.eq(&object_store_id)))
+    }
+
     async fn create_upgrade_delete(
         &mut self,
         delete: &[ParquetFileId],
@@ -193,6 +193,7 @@ decorate!(
         "parquet_delete_old_ids_only" = delete_old_ids_only(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFileId>>;
         "parquet_list_by_partition_not_to_delete" = list_by_partition_not_to_delete(&mut self, partition_id: PartitionId) -> Result<Vec<ParquetFile>>;
         "parquet_get_by_object_store_id" = get_by_object_store_id(&mut self, object_store_id: Uuid) -> Result<Option<ParquetFile>>;
+        "exists_by_object_store_id" = exists_by_object_store_id(&mut self, object_store_id: Uuid) -> Result<bool>;
         "parquet_create_upgrade_delete" = create_upgrade_delete(&mut self, delete: &[ParquetFileId], upgrade: &[ParquetFileId], create: &[ParquetFileParams], target_level: CompactionLevel) -> Result<Vec<ParquetFileId>>;
     ]
 );
@@ -20,8 +20,8 @@ use data_types::{
         NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, TemplatePart,
     },
     Column, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName, ParquetFile,
-    ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction,
-    Table, TableId, Timestamp,
+    ParquetFileExists, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey,
+    SkippedCompaction, Table, TableId, Timestamp,
 };
 use iox_time::{SystemProvider, TimeProvider};
 use observability_deps::tracing::{debug, info, warn};

@@ -1467,6 +1467,28 @@ WHERE object_store_id = $1;
         Ok(Some(parquet_file))
     }

+    async fn exists_by_object_store_id(&mut self, object_store_id: Uuid) -> Result<bool> {
+        let rec = sqlx::query_as::<_, ParquetFileExists>(
+            r#"
+SELECT 1 as exists
+FROM parquet_file
+WHERE object_store_id = $1;
+            "#,
+        )
+        .bind(object_store_id) // $1
+        .fetch_one(&mut self.inner)
+        .await;
+
+        if let Err(sqlx::Error::RowNotFound) = rec {
+            return Ok(false);
+        }
+
+        rec.map_err(|e| Error::SqlxError { source: e })?;
+
+        Ok(true)
+    }
+
     async fn create_upgrade_delete(
         &mut self,
         delete: &[ParquetFileId],
@@ -19,8 +19,8 @@ use data_types::{
         NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, TemplatePart,
     },
     Column, ColumnId, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId,
-    NamespaceName, ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId,
-    PartitionKey, SkippedCompaction, Table, TableId, Timestamp,
+    NamespaceName, ParquetFile, ParquetFileExists, ParquetFileId, ParquetFileParams, Partition,
+    PartitionId, PartitionKey, SkippedCompaction, Table, TableId, Timestamp,
 };
 use serde::{Deserialize, Serialize};
 use std::collections::HashSet;

@@ -1335,6 +1335,27 @@ WHERE object_store_id = $1;
         Ok(Some(parquet_file.into()))
     }

+    async fn exists_by_object_store_id(&mut self, object_store_id: Uuid) -> Result<bool> {
+        let rec = sqlx::query_as::<_, ParquetFileExists>(
+            r#"
+SELECT 1 as exists
+FROM parquet_file
+WHERE object_store_id = $1;
+            "#,
+        )
+        .bind(object_store_id) // $1
+        .fetch_one(self.inner.get_mut())
+        .await;
+
+        if let Err(sqlx::Error::RowNotFound) = rec {
+            return Ok(false);
+        }
+
+        rec.map_err(|e| Error::SqlxError { source: e })?;
+
+        Ok(true)
+    }
+
     async fn create_upgrade_delete(
         &mut self,
         delete: &[ParquetFileId],
@@ -650,6 +650,7 @@ mod tests {
                 location: Path::parse(format!("{n}.parquet")).unwrap(),
                 last_modified: Default::default(),
                 size: 0,
+                e_tag: None,
             },
             partition_values: vec![],
             range: None,

@@ -455,6 +455,7 @@ impl TestChunk {
                 location: Self::parquet_location(self.id),
                 last_modified: Default::default(),
                 size: 1,
+                e_tag: None,
             },
         }),
         ..self
@@ -7,7 +7,7 @@ use snafu::Snafu;
 use std::ops::Range;

 use object_store::{
-    path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
+    path::Path, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
 };
 use tokio::io::AsyncWrite;

@@ -70,6 +70,10 @@ impl ObjectStore for DummyObjectStore {
         Ok(NotSupportedSnafu { name: self.name }.fail()?)
     }

+    async fn get_opts(&self, _location: &Path, _options: GetOptions) -> Result<GetResult> {
+        Ok(NotSupportedSnafu { name: self.name }.fail()?)
+    }
+
     async fn get(&self, _location: &Path) -> Result<GetResult> {
         Ok(NotSupportedSnafu { name: self.name }.fail()?)
     }
@@ -14,6 +14,7 @@
     unused_crate_dependencies
 )]

+use object_store::GetOptions;
 // Workaround for "unused crate" lint false positives.
 use workspace_hack as _;

@@ -221,10 +222,10 @@ impl ObjectStore for ObjectStoreMetrics {
         unimplemented!()
     }

-    async fn get(&self, location: &Path) -> Result<GetResult> {
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
         let started_at = self.time_provider.now();

-        let res = self.inner.get(location).await;
+        let res = self.inner.get_opts(location, options).await;

         match res {
             Ok(GetResult::File(file, path)) => {
@@ -296,6 +296,7 @@ impl ParquetStorage {
                 // we don't care about the "last modified" field
                 last_modified: Default::default(),
                 size: file_size,
+                e_tag: None,
             },
         }
     }
@@ -324,7 +325,7 @@ pub enum ProjectionError {
 mod tests {
     use super::*;
     use arrow::{
-        array::{ArrayRef, Int64Array, StringArray},
+        array::{ArrayRef, BinaryArray, Int64Array, StringArray},
         record_batch::RecordBatch,
     };
     use data_types::{CompactionLevel, NamespaceId, PartitionId, TableId};

@@ -440,13 +441,13 @@ mod tests {

     #[tokio::test]
     async fn test_schema_check_fail_different_types() {
-        let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap();
+        let batch = RecordBatch::try_from_iter([("a", to_binary_array(&["value"]))]).unwrap();
         let other_batch = RecordBatch::try_from_iter([("a", to_int_array(&[1]))]).unwrap();
         let schema = batch.schema();
         assert_schema_check_fail(
             other_batch,
             schema,
-            "Execution error: Failed to map column projection for field a. Incompatible data types Int64 and Utf8",
+            "Error during planning: Cannot cast file schema field a of type Int64 to table schema field of type Binary",
         ).await;
     }

@@ -568,6 +569,11 @@ mod tests {
         Arc::new(array)
     }

+    fn to_binary_array(strs: &[&str]) -> ArrayRef {
+        let array: BinaryArray = strs.iter().map(|s| Some(*s)).collect();
+        Arc::new(array)
+    }
+
     fn to_int_array(vals: &[i64]) -> ArrayRef {
         let array: Int64Array = vals.iter().map(|v| Some(*v)).collect();
         Arc::new(array)
@@ -115,7 +115,7 @@ mod test {
         test_helpers::maybe_start_logging();
         let props = WriterProperties::builder()
             .set_max_row_group_size(TEST_MAX_ROW_GROUP_SIZE)
-            .set_data_pagesize_limit(10) // ensure each batch is written as a page
+            .set_data_page_size_limit(10) // ensure each batch is written as a page
             .build();

         let mut data_gen = DataGenerator::new();

@@ -163,7 +163,7 @@ mod test {
         test_helpers::maybe_start_logging();
         let props = WriterProperties::builder()
             .set_max_row_group_size(TEST_MAX_ROW_GROUP_SIZE)
-            .set_data_pagesize_limit(10) // ensure each batch is written as a page
+            .set_data_page_size_limit(10) // ensure each batch is written as a page
             .build();

         let mut data_gen = DataGenerator::new();

@@ -199,7 +199,7 @@ mod test {
         test_helpers::maybe_start_logging();
         let props = WriterProperties::builder()
             .set_max_row_group_size(TEST_MAX_ROW_GROUP_SIZE)
-            .set_data_pagesize_limit(10) // ensure each batch is written as a page
+            .set_data_page_size_limit(10) // ensure each batch is written as a page
             .build();

         let mut data_gen = DataGenerator::new();
@@ -16,9 +16,10 @@ use cache_system::{
 use futures::{stream::BoxStream, StreamExt};
 use iox_time::TimeProvider;
 use object_store::{
-    path::Path, Error as ObjectStoreError, GetResult, ListResult, MultipartId, ObjectMeta,
-    ObjectStore,
+    path::Path, Error as ObjectStoreError, GetOptions, GetResult, ListResult, MultipartId,
+    ObjectMeta, ObjectStore,
 };
+use observability_deps::tracing::warn;
 use tokio::io::AsyncWrite;
 use trace::span::Span;

@@ -187,7 +188,54 @@ impl ObjectStore for CachedObjectStore {
         Err(ObjectStoreError::NotImplemented)
     }

-    async fn get(&self, location: &Path) -> Result<GetResult, ObjectStoreError> {
+    async fn get_opts(
+        &self,
+        location: &Path,
+        options: GetOptions,
+    ) -> Result<GetResult, ObjectStoreError> {
+        let GetOptions {
+            if_match,
+            if_none_match,
+            if_modified_since,
+            if_unmodified_since,
+            range,
+        } = options;
+
+        // since the options are not cached, error if we see any of them
+        if if_match.is_some() {
+            warn!(?location, "if_match not supported by CachedObjectStore");
+            return Err(ObjectStoreError::NotImplemented);
+        }
+
+        if if_none_match.is_some() {
+            warn!(
+                ?location,
+                "if_none_match not supported by CachedObjectStore"
+            );
+            return Err(ObjectStoreError::NotImplemented);
+        }
+
+        if if_modified_since.is_some() {
+            warn!(
+                ?location,
+                "is_modified_since not supported by CachedObjectStore"
+            );
+            return Err(ObjectStoreError::NotImplemented);
+        }
+
+        if if_unmodified_since.is_some() {
+            warn!(
+                ?location,
+                "if_unmodified_since not supported by CachedObjectStore"
+            );
+            return Err(ObjectStoreError::NotImplemented);
+        }
+
+        if range.is_some() {
+            warn!(?location, "range not supported by CachedObjectStore");
+            return Err(ObjectStoreError::NotImplemented);
+        }
+
         let data = self.get_data(location).await?;

         Ok(GetResult::Stream(
@@ -227,6 +275,7 @@ impl ObjectStore for CachedObjectStore {
             // retrieve it.
             last_modified: Default::default(),
             size: data.len(),
+            e_tag: None,
         })
     }

@@ -654,30 +654,14 @@ where
             .try_encode()
             .context(InternalCreatingTicketSnafu)?;

-        // Flight says "Set these to -1 if unknown."
-        //
-        // https://github.com/apache/arrow-rs/blob/a0a5880665b1836890f6843b6b8772d81c463351/format/Flight.proto#L274-L276
-        let total_records = -1;
-        let total_bytes = -1;
-
-        let endpoint = vec![FlightEndpoint {
-            ticket: Some(ticket),
-            // "If the list is empty, the expectation is that the
-            // ticket can only be redeemed on the current service
-            // where the ticket was generated."
-            //
-            // https://github.com/apache/arrow-rs/blob/a0a5880665b1836890f6843b6b8772d81c463351/format/Flight.proto#L292-L294
-            location: vec![],
-        }];
-
-        let flight_info = FlightInfo {
-            schema,
+        let endpoint = FlightEndpoint::new().with_ticket(ticket);
+
+        let mut flight_info = FlightInfo::new()
+            .with_endpoint(endpoint)
             // return descriptor we were passed
-            flight_descriptor: Some(flight_descriptor),
-            endpoint,
-            total_records,
-            total_bytes,
-        };
+            .with_descriptor(flight_descriptor);
+
+        flight_info.schema = schema;

         Ok(tonic::Response::new(flight_info))
     }
@@ -677,6 +677,7 @@ mod tests {
     fn round_trip_flightsql() {
         let cmd = FlightSQLCommand::CommandStatementQuery(CommandStatementQuery {
             query: "select * from foo".into(),
+            transaction_id: None,
         });

         let request = IoxGetRequest {
@@ -17,11 +17,11 @@ license.workspace = true
 ### BEGIN HAKARI SECTION
 [dependencies]
 ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
-arrow = { git = "https://github.com/alamb/arrow-rs", rev = "4a59200", features = ["dyn_cmp_dict", "prettyprint"] }
-arrow-array = { git = "https://github.com/alamb/arrow-rs", rev = "4a59200", default-features = false, features = ["chrono-tz"] }
-arrow-flight = { version = "40", features = ["flight-sql-experimental"] }
-arrow-ord = { git = "https://github.com/alamb/arrow-rs", rev = "4a59200", default-features = false, features = ["dyn_cmp_dict"] }
-arrow-string = { git = "https://github.com/alamb/arrow-rs", rev = "4a59200", default-features = false, features = ["dyn_cmp_dict"] }
+arrow = { version = "41", features = ["dyn_cmp_dict", "prettyprint"] }
+arrow-array = { version = "41", default-features = false, features = ["chrono-tz"] }
+arrow-flight = { version = "41", features = ["flight-sql-experimental"] }
+arrow-ord = { version = "41", default-features = false, features = ["dyn_cmp_dict"] }
+arrow-string = { version = "41", default-features = false, features = ["dyn_cmp_dict"] }
 base64-594e8ee84c453af0 = { package = "base64", version = "0.13" }
 base64-647d43efb71741da = { package = "base64", version = "0.21" }
 bitflags = { version = "1" }

@@ -30,9 +30,9 @@ bytes = { version = "1" }
 chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "serde"] }
 crossbeam-utils = { version = "0.8" }
 crypto-common = { version = "0.1", default-features = false, features = ["std"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "859251b4a20e00c3dfe73eee6b605fcf722687e7" }
-datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "859251b4a20e00c3dfe73eee6b605fcf722687e7", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
-datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "859251b4a20e00c3dfe73eee6b605fcf722687e7", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "91e75d7e6303c1a7331e8e90eaad9b095ace929b" }
+datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "91e75d7e6303c1a7331e8e90eaad9b095ace929b", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
+datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "91e75d7e6303c1a7331e8e90eaad9b095ace929b", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
 digest = { version = "0.10", features = ["mac", "std"] }
 either = { version = "1" }
 fixedbitset = { version = "0.4" }

@@ -57,10 +57,10 @@ memchr = { version = "2" }
 nom = { version = "7" }
 num-integer = { version = "0.1", default-features = false, features = ["i128", "std"] }
 num-traits = { version = "0.2", features = ["i128", "libm"] }
-object_store = { version = "0.5", default-features = false, features = ["aws", "azure", "gcp"] }
+object_store = { version = "0.6", default-features = false, features = ["aws", "azure", "gcp"] }
 once_cell = { version = "1", features = ["parking_lot"] }
 parking_lot = { version = "0.12", features = ["arc_lock"] }
-parquet = { git = "https://github.com/alamb/arrow-rs", rev = "4a59200", features = ["experimental", "object_store"] }
+parquet = { version = "41", features = ["experimental", "object_store"] }
 petgraph = { version = "0.6" }
 phf_shared = { version = "0.11" }
 predicates = { version = "3" }