chore: merge main to branch

pull/24376/head
Nga Tran 2021-05-19 15:11:15 -04:00
commit 11561111d5
26 changed files with 1739 additions and 373 deletions

Cargo.lock (generated)

@ -31,6 +31,17 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "ahash"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "796540673305a66d127804eef19ad696f1f204b8c1025aaca4958c17eab32877"
dependencies = [
"getrandom 0.2.2",
"once_cell",
"version_check",
]
[[package]]
name = "ahash"
version = "0.7.2"
@ -96,6 +107,15 @@ version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544"
[[package]]
name = "arrayvec"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd9fd44efafa8690358b7408d253adf110036b88f55672a933f01d616ad9b1b9"
dependencies = [
"nodrop",
]
[[package]]
name = "arrayvec"
version = "0.5.2"
@ -159,7 +179,7 @@ dependencies = [
name = "arrow_util"
version = "0.1.0"
dependencies = [
"ahash",
"ahash 0.7.2",
"arrow 0.1.0",
"futures",
"hashbrown 0.11.2",
@ -193,6 +213,15 @@ dependencies = [
"wait-timeout",
]
[[package]]
name = "async-mutex"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e"
dependencies = [
"event-listener",
]
[[package]]
name = "async-stream"
version = "0.3.1"
@ -372,7 +401,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afa748e348ad3be8263be728124b24a24f268266f6f5d58af9d75f6a40b5c587"
dependencies = [
"arrayref",
"arrayvec",
"arrayvec 0.5.2",
"constant_time_eq",
]
@ -424,6 +453,12 @@ version = "3.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63396b8a4b9de3f4fdfb320ab6080762242f66a8ef174c49d8e19b674db4cdbe"
[[package]]
name = "bytemuck"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bed57e2090563b83ba8f83366628ce535a7584c9afa4c9fc0612a03925c6df58"
[[package]]
name = "byteorder"
version = "1.4.3"
@ -439,6 +474,40 @@ dependencies = [
"serde",
]
[[package]]
name = "cached"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e2afe73808fbaac302e39c9754bfc3c4b4d0f99c9c240b9f4e4efc841ad1b74"
dependencies = [
"async-mutex",
"async-trait",
"cached_proc_macro",
"cached_proc_macro_types",
"futures",
"hashbrown 0.9.1",
"once_cell",
]
[[package]]
name = "cached_proc_macro"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf857ae42d910aede5c5186e62684b0d7a597ce2fe3bd14448ab8f7ef439848c"
dependencies = [
"async-mutex",
"cached_proc_macro_types",
"darling",
"quote",
"syn",
]
[[package]]
name = "cached_proc_macro_types"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a4f925191b4367301851c6d99b09890311d74b0d43f274c0b34c86d308a3663"
[[package]]
name = "cast"
version = "0.2.5"
@ -512,7 +581,7 @@ dependencies = [
"ansi_term 0.11.0",
"atty",
"bitflags",
"strsim",
"strsim 0.8.0",
"textwrap",
"unicode-width",
"vec_map",
@ -767,6 +836,41 @@ dependencies = [
"sct",
]
[[package]]
name = "darling"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d706e75d87e35569db781a9b5e2416cff1236a47ed380831f959382ccd5f858"
dependencies = [
"darling_core",
"darling_macro",
]
[[package]]
name = "darling_core"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0c960ae2da4de88a91b2d920c2a7233b400bc33cb28453a2987822d8392519b"
dependencies = [
"fnv",
"ident_case",
"proc-macro2",
"quote",
"strsim 0.9.3",
"syn",
]
[[package]]
name = "darling_macro"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b5a2f4ac4969822c62224815d069952656cadc7084fdca9751e6d959189b72"
dependencies = [
"darling_core",
"quote",
"syn",
]
[[package]]
name = "dashmap"
version = "4.0.2"
@ -804,7 +908,7 @@ name = "datafusion"
version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=9cf32cf2cda8472b87130142c4eee1126d4d9cbe#9cf32cf2cda8472b87130142c4eee1126d4d9cbe"
dependencies = [
"ahash",
"ahash 0.7.2",
"arrow 4.0.0-SNAPSHOT",
"async-trait",
"chrono",
@ -830,6 +934,15 @@ dependencies = [
"futures",
]
[[package]]
name = "debugid"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f91cf5a8c2f2097e2a32627123508635d47ce10563d999ec1a95addf08b502ba"
dependencies = [
"uuid",
]
[[package]]
name = "difference"
version = "2.0.0"
@ -975,6 +1088,12 @@ dependencies = [
"termcolor",
]
[[package]]
name = "event-listener"
version = "2.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59"
[[package]]
name = "extend"
version = "0.1.2"
@ -1319,7 +1438,7 @@ version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
dependencies = [
"ahash",
"ahash 0.7.2",
]
[[package]]
@ -1456,6 +1575,12 @@ dependencies = [
"tokio-native-tls",
]
[[package]]
name = "ident_case"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
[[package]]
name = "idna"
version = "0.2.3"
@ -1477,6 +1602,24 @@ dependencies = [
"hashbrown 0.9.1",
]
[[package]]
name = "inferno"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37fb405dbcc505ed20838c4f8dad7b901094de90add237755df54bd5dcda2fdd"
dependencies = [
"ahash 0.6.3",
"atty",
"indexmap",
"itoa",
"lazy_static",
"log",
"num-format",
"quick-xml",
"rgb",
"str_stack",
]
[[package]]
name = "influxdb2_client"
version = "0.1.0"
@ -1541,6 +1684,7 @@ dependencies = [
"panic_logging",
"parking_lot",
"parquet 0.1.0",
"pprof",
"predicates",
"prettytable-rs",
"prost",
@ -1738,7 +1882,7 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6607c62aa161d23d17a9072cc5da0be67cdfc89d3afb1e8d9c842bebc2525ffe"
dependencies = [
"arrayvec",
"arrayvec 0.5.2",
"bitflags",
"cfg-if",
"ryu",
@ -1859,6 +2003,16 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc"
[[package]]
name = "memmap"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "memoffset"
version = "0.6.3"
@ -2024,6 +2178,12 @@ dependencies = [
"libc",
]
[[package]]
name = "nodrop"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb"
[[package]]
name = "nom"
version = "5.1.2"
@ -2095,6 +2255,16 @@ dependencies = [
"num-traits",
]
[[package]]
name = "num-format"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bafe4179722c2894288ee77a9f044f02811c86af699344c498b0840c698a2465"
dependencies = [
"arrayvec 0.4.12",
"itoa",
]
[[package]]
name = "num-integer"
version = "0.1.44"
@ -2625,6 +2795,27 @@ dependencies = [
"plotters-backend",
]
[[package]]
name = "pprof"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "234cb1ca0d59aa771d9bc7e268739d7aef6ca7e9e8d3b78d92f266cd663fd0c1"
dependencies = [
"backtrace",
"inferno",
"lazy_static",
"libc",
"log",
"nix",
"parking_lot",
"prost",
"prost-build",
"prost-derive",
"symbolic-demangle",
"tempfile",
"thiserror",
]
[[package]]
name = "ppv-lite86"
version = "0.2.10"
@ -2824,6 +3015,15 @@ version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quick-xml"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26aab6b48e2590e4a64d1ed808749ba06257882b461d01ca71baeb747074a6dd"
dependencies = [
"memchr",
]
[[package]]
name = "quote"
version = "1.0.9"
@ -3095,6 +3295,15 @@ dependencies = [
"winreg",
]
[[package]]
name = "rgb"
version = "0.8.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fddb3b23626145d1776addfc307e1a1851f60ef6ca64f376bcb889697144cf0"
dependencies = [
"bytemuck",
]
[[package]]
name = "ring"
version = "0.16.20"
@ -3461,6 +3670,7 @@ dependencies = [
"arrow_util",
"async-trait",
"bytes",
"cached",
"chrono",
"crc32fast",
"criterion",
@ -3702,12 +3912,24 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0"
[[package]]
name = "str_stack"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb"
[[package]]
name = "strsim"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]]
name = "strsim"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6446ced80d6c486436db5c078dde11a9f73d42b57fb273121e160b84f63d894c"
[[package]]
name = "structopt"
version = "0.3.21"
@ -3738,6 +3960,28 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e81da0851ada1f3e9d4312c704aa4f8806f0f9d69faaf8df2f3464b4a9437c2"
[[package]]
name = "symbolic-common"
version = "8.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be7dfa630954f18297ceae1ff2890cb7f5008a0b2d2106b0468dafc45b0b6b12"
dependencies = [
"debugid",
"memmap",
"stable_deref_trait",
"uuid",
]
[[package]]
name = "symbolic-demangle"
version = "8.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b4ba42bd1221803e965054767b1899f2db9a12c89969965c6cb3a02af7014eb"
dependencies = [
"rustc-demangle",
"symbolic-common",
]
[[package]]
name = "syn"
version = "1.0.67"

Cargo.toml

@ -92,6 +92,7 @@ parking_lot = "0.11.1"
itertools = "0.9.0"
# used by arrow/datafusion anyway
prettytable-rs = "0.8"
pprof = { version = "^0.4", default-features = false, features = ["flamegraph", "protobuf"] }
prost = "0.7"
# Forked to upgrade hyper and tokio
routerify = { git = "https://github.com/influxdata/routerify", rev = "274e250" }


@ -4,8 +4,8 @@ use std::fmt::Debug;
use observability_deps::tracing::error;
/// Add ability for Results to log error messages via `error!` logs.
/// This is useful when using async tasks that may not have a natural
/// return error
/// This is useful when using async tasks that may not have any code
/// checking their return values.
pub trait ErrorLogger {
/// Log the contents of self with a string of context. The context
/// should appear in a message such as
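For context, a minimal sketch of the trait in use, mirroring the `ChunkSnapshot` hunk later in this diff:

```rust
// log_if_error emits an `error!` log with the given context string,
// then returns the Result unchanged so the caller can still unwrap
// or propagate it.
let schema = table
    .schema(&chunk.dictionary, Selection::All)
    .log_if_error("ChunkSnapshot getting table schema")
    .unwrap();
```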


@ -17,6 +17,7 @@ We hold monthly Tech Talks that explain the project's technical underpinnings. Y
* Rust style and Idiom guide: [style_guide.md](style_guide.md)
* Tracing and logging Guide: [tracing.md](tracing.md)
* Profiling Guide: [profiling.md](profiling.md)
* How InfluxDB IOx manages the lifecycle of time series data: [data_management.md](data_management.md)
* Thoughts on parquet encoding and compression for timeseries data: [encoding_thoughts.md](encoding_thoughts.md)
* Thoughts on using multiple cores: [multi_core_tasks.md](multi_core_tasks.md)

BIN docs/images/flame_graph.png (new file; binary not shown; 1.1 MiB)

docs/profiling.md (new file)

@ -0,0 +1,46 @@
# IOx — Profiling
IOx includes an embedded `pprof` exporter compatible with the [go pprof](https://golang.org/pkg/net/http/pprof/) tool.
To use it, point your favorite pprof-compatible tool at the HTTP `/debug/pprof/profile` endpoint on your IOx host.
# Use the Go `pprof` tool
Example:
```shell
go tool pprof 'http://localhost:8080/debug/pprof/profile?seconds=5'
```
You should get output like:
```
Fetching profile over HTTP from http://localhost:8080/debug/pprof/profile?seconds=5
Saved profile in /Users/mkm/pprof/pprof.cpu.006.pb.gz
Type: cpu
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 93, 100% of 93 total
Showing top 10 nodes out of 185
flat flat% sum% cum cum%
93 100% 100% 93 100% backtrace::backtrace::libunwind::trace
0 0% 100% 1 1.08% <&str as nom::traits::InputTakeAtPosition>::split_at_position1_complete
0 0% 100% 1 1.08% <(FnA,FnB) as nom::sequence::Tuple<Input,(A,B),Error>>::parse
0 0% 100% 1 1.08% <(FnA,FnB,FnC) as nom::sequence::Tuple<Input,(A,B,C),Error>>::parse
0 0% 100% 5 5.38% <F as futures_core::future::TryFuture>::try_poll
0 0% 100% 1 1.08% <T as alloc::slice::hack::ConvertVec>::to_vec
0 0% 100% 1 1.08% <alloc::alloc::Global as core::alloc::Allocator>::allocate
0 0% 100% 1 1.08% <alloc::borrow::Cow<B> as core::clone::Clone>::clone
0 0% 100% 3 3.23% <alloc::vec::Vec<T,A> as alloc::vec::spec_extend::SpecExtend<T,I>>::spec_extend
0 0% 100% 1 1.08% <alloc::vec::Vec<T,A> as core::iter::traits::collect::Extend<T>>::extend
```
# Use the built-in flame graph renderer
IOx can also render a flame graph SVG directly when the profile endpoint is opened in a browser.
For example, point your browser at an IOx server with a URL such as http://localhost:8080/debug/pprof/profile?seconds=5
and you will see a beautiful flame graph such as:
![Flame Graph](images/flame_graph.png)
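If you prefer the standard Go tooling for flame graphs as well, `go tool pprof` can serve an interactive local view (including a flame graph) of a profile fetched from the same endpoint:

```shell
go tool pprof -http=localhost:8081 'http://localhost:8080/debug/pprof/profile?seconds=5'
```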


@ -8,7 +8,7 @@ use internal_types::selection::Selection;
use snafu::{OptionExt, ResultExt, Snafu};
use super::Chunk;
use data_types::partition_metadata::Statistics;
use data_types::{error::ErrorLogger, partition_metadata::Statistics};
#[derive(Debug, Snafu)]
pub enum Error {
@ -57,8 +57,15 @@ impl ChunkSnapshot {
let mut records: HashMap<String, TableSnapshot> = Default::default();
let table = &chunk.table;
let schema = table.schema(&chunk.dictionary, Selection::All).unwrap();
let batch = table.to_arrow(&chunk.dictionary, Selection::All).unwrap();
let schema = table
.schema(&chunk.dictionary, Selection::All)
.log_if_error("ChunkSnapshot getting table schema")
.unwrap();
let batch = table
.to_arrow(&chunk.dictionary, Selection::All)
.log_if_error("ChunkSnapshot converting table to arrow")
.unwrap();
let name = chunk.table_name.as_ref();
let timestamp_range =
@ -87,10 +94,13 @@ impl ChunkSnapshot {
},
);
Self {
chunk_id: chunk.id.expect("cannot snapshot chunk without an ID"),
records,
}
let chunk_id = chunk
.id
.ok_or("cannot snapshot chunk without an ID")
.log_if_error("ChunkSnapshot determining chunk id")
.unwrap();
Self { chunk_id, records }
}
/// return the ID of the chunk this is a snapshot of


@ -251,24 +251,33 @@ where
S: CatalogState,
{
/// Create new catalog w/o any data.
pub fn new_empty(
///
/// An empty transaction will be used to mark the catalog start so that concurrent open but still-empty catalogs can
/// easily be detected.
pub async fn new_empty(
object_store: Arc<ObjectStore>,
server_id: ServerId,
db_name: impl Into<String>,
state_data: S::EmptyInput,
) -> Self {
) -> Result<Self> {
let inner = PreservedCatalogInner {
previous_tkey: None,
state: Arc::new(S::new_empty(state_data)),
};
Self {
let catalog = Self {
inner: RwLock::new(inner),
transaction_semaphore: Semaphore::new(1),
object_store,
server_id,
db_name: db_name.into(),
}
};
// add empty transaction
let transaction = catalog.open_transaction().await;
transaction.commit().await?;
Ok(catalog)
}
/// Load existing catalog from store, if it exists.
@ -383,14 +392,13 @@ where
}
/// Get latest revision counter.
///
/// This can be `None` for a newly created catalog.
pub fn revision_counter(&self) -> Option<u64> {
pub fn revision_counter(&self) -> u64 {
self.inner
.read()
.previous_tkey
.clone()
.map(|tkey| tkey.revision_counter)
.expect("catalog should have at least an empty transaction")
}
}
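Putting the two API changes together, a hedged sketch of the new calling convention (names borrowed from the tests below):

```rust
// new_empty is now async and fallible because it commits an initial
// empty transaction to mark the catalog start.
let catalog = PreservedCatalog::<TestCatalogState>::new_empty(
    Arc::clone(&object_store),
    server_id,
    "db1".to_string(),
    (),
)
.await?;

// That initial transaction is revision 0, so revision_counter() can
// return a plain u64 instead of Option<u64>.
assert_eq!(catalog.revision_counter(), 0);
```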
@ -960,6 +968,42 @@ pub mod tests {
use super::test_helpers::TestCatalogState;
use super::*;
#[tokio::test]
async fn test_create_empty() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = "db1";
assert!(PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
()
)
.await
.unwrap()
.is_none());
PreservedCatalog::<TestCatalogState>::new_empty(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
assert!(PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
()
)
.await
.unwrap()
.is_some());
}
#[tokio::test]
async fn test_inmem_commit_semantics() {
let object_store = make_object_store();
@ -1064,12 +1108,7 @@ pub mod tests {
(),
)
.await;
assert!(matches!(
res,
Err(Error::MissingTransaction {
revision_counter: 0
})
));
assert_eq!(res.unwrap_err().to_string(), "Missing transaction: 0",);
}
#[tokio::test]
@ -1327,14 +1366,16 @@ pub mod tests {
make_server_id(),
"db1".to_string(),
(),
);
)
.await
.unwrap();
let mut t = catalog.open_transaction().await;
// open transaction
t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().to_string();
assert_eq!(
format!("{:?}", t),
"TransactionHandle(open, 0.00000000-0000-0000-0000-000000000000)"
"TransactionHandle(open, 1.00000000-0000-0000-0000-000000000000)"
);
// "closed" transaction
@ -1521,7 +1562,9 @@ pub mod tests {
server_id,
db_name.to_string(),
(),
);
)
.await
.unwrap();
// get some test metadata
let metadata1 = make_metadata(object_store, "foo").await;
@ -1531,8 +1574,9 @@ pub mod tests {
let mut trace = TestTrace::new();
// empty catalog has no data
assert!(catalog.revision_counter().is_none());
assert_eq!(catalog.revision_counter(), 0);
assert_catalog_parquet_files(&catalog, &[]);
trace.record(&catalog);
// fill catalog with examples
{
@ -1548,7 +1592,7 @@ pub mod tests {
t.commit().await.unwrap();
}
assert_eq!(catalog.revision_counter().unwrap(), 0);
assert_eq!(catalog.revision_counter(), 1);
assert_catalog_parquet_files(
&catalog,
&[
@ -1578,7 +1622,7 @@ pub mod tests {
t.commit().await.unwrap();
}
assert_eq!(catalog.revision_counter().unwrap(), 1);
assert_eq!(catalog.revision_counter(), 2);
assert_catalog_parquet_files(
&catalog,
&[
@ -1599,7 +1643,7 @@ pub mod tests {
// NO commit here!
}
assert_eq!(catalog.revision_counter().unwrap(), 1);
assert_eq!(catalog.revision_counter(), 2);
assert_catalog_parquet_files(
&catalog,
&[
@ -1629,7 +1673,7 @@ pub mod tests {
.unwrap()
.unwrap();
assert_eq!(
catalog.revision_counter().unwrap(),
catalog.revision_counter(),
trace.tkeys.last().unwrap().revision_counter
);
assert_catalog_parquet_files(


@ -8,7 +8,7 @@ use datafusion::{
execution::context::{ExecutionContextState, QueryPlanner},
logical_plan::{LogicalPlan, UserDefinedLogicalNode},
physical_plan::{
collect,
collect, displayable,
merge::MergeExec,
planner::{DefaultPhysicalPlanner, ExtensionPlanner},
ExecutionPlan, PhysicalPlanner, SendableRecordBatchStream,
@ -151,21 +151,15 @@ impl IOxExecutionContext {
/// Prepare (optimize + plan) a pre-created logical plan for execution
pub fn prepare_plan(&self, plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
debug!(
"Creating plan: Initial plan\n----\n{}\n{}\n----",
plan.display_indent_schema(),
plan.display_graphviz(),
);
debug!(text=%plan.display_indent_schema(), "initial plan");
let plan = self.inner.optimize(&plan)?;
debug!(text=%plan.display_indent_schema(), graphviz=%plan.display_graphviz(), "optimized plan");
debug!(
"Creating plan: Optimized plan\n----\n{}\n{}\n----",
plan.display_indent_schema(),
plan.display_graphviz(),
);
let physical_plan = self.inner.create_physical_plan(&plan)?;
self.inner.create_physical_plan(&plan)
debug!(text=%displayable(physical_plan.as_ref()).indent(), "optimized physical plan");
Ok(physical_plan)
}
/// Executes the logical plan using DataFusion on a separate


@ -3,7 +3,7 @@
//! mode as well as for arbitrary other predicates that are expressed
//! by DataFusion's `Expr` type.
use std::collections::{BTreeSet, HashSet};
use std::{collections::{BTreeSet, HashSet}, fmt};
use data_types::timestamp::TimestampRange;
use datafusion::{
@ -27,9 +27,9 @@ pub const EMPTY_PREDICATE: Predicate = Predicate {
/// Represents a parsed predicate for evaluation by the
/// TSDatabase InfluxDB IOx query engine.
///
/// Note that the data model of TSDatabase (e.g. ParsedLine's)
/// Note that the InfluxDB data model (e.g. ParsedLine's)
/// distinguishes between some types of columns (tags and fields), and
/// likewise the semantics of this structure has some types of
/// likewise the semantics of this structure can express some types of
/// restrictions that only apply to certain types of columns.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Predicate {
@ -37,23 +37,23 @@ pub struct Predicate {
/// to only tables whose names are in `table_names`
pub table_names: Option<BTreeSet<String>>,
// Optional field restriction. If present, restricts the results to only
// tables which have *at least one* of the fields in field_columns.
/// Optional field restriction. If present, restricts the results to only
/// tables which have *at least one* of the fields in field_columns.
pub field_columns: Option<BTreeSet<String>>,
/// Optional partition key filter
pub partition_key: Option<String>,
/// Optional timestamp range: only rows within this range are included in
/// results. Other rows are excluded
pub range: Option<TimestampRange>,
/// Optional arbitrary predicates, represented as list of
/// DataFusion expressions applied a logical conjunction (aka they
/// are 'AND'ed together). Only rows that evaluate to TRUE for all
/// these expressions should be returned. Other rows are excluded
/// from the results.
pub exprs: Vec<Expr>,
/// Optional timestamp range: only rows within this range are included in
/// results. Other rows are excluded
pub range: Option<TimestampRange>,
/// Optional partition key filter
pub partition_key: Option<String>,
}
impl Predicate {
@ -108,8 +108,66 @@ impl Predicate {
}
}
impl fmt::Display for Predicate {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fn iter_to_str<S>(s: impl IntoIterator<Item = S>) -> String
where
S: ToString,
{
s.into_iter()
.map(|v| v.to_string())
.collect::<Vec<_>>()
.join(", ")
}
write!(f, "Predicate")?;
if let Some(table_names) = &self.table_names {
write!(f, " table_names: {{{}}}", iter_to_str(table_names))?;
}
if let Some(field_columns) = &self.field_columns {
write!(f, " field_columns: {{{}}}", iter_to_str(field_columns))?;
}
if let Some(partition_key) = &self.partition_key {
write!(f, " partition_key: '{}'", partition_key)?;
}
if let Some(range) = &self.range {
// TODO: could be nice to show this as actual timestamps (not just numbers)?
write!(f, " range: [{} - {}]", range.start, range.end)?;
}
if !self.exprs.is_empty() {
// Expr doesn't implement `Display` yet, so just the debug version
// See https://github.com/apache/arrow-datafusion/issues/347
let display_exprs = self.exprs.iter().map(|e| format!("{:?}", e));
write!(f, " exprs: [{}]", iter_to_str(display_exprs))?;
}
Ok(())
}
}
#[derive(Debug, Default)]
/// Structure for building `Predicate`s
/// Structure for building [`Predicate`]s
///
/// Example:
/// ```
/// use query::predicate::PredicateBuilder;
/// use datafusion::logical_plan::{col, lit};
///
/// let p = PredicateBuilder::new()
/// .timestamp_range(1, 100)
/// .add_expr(col("foo").eq(lit(42)))
/// .build();
///
/// assert_eq!(
/// p.to_string(),
/// "Predicate range: [1 - 100] exprs: [#foo Eq Int32(42)]"
/// );
/// ```
pub struct PredicateBuilder {
inner: Predicate,
}
@ -315,9 +373,8 @@ impl PredicateBuilder {
#[cfg(test)]
mod tests {
use datafusion::logical_plan::{col, lit};
use super::*;
use datafusion::logical_plan::{col, lit};
#[test]
fn test_default_predicate_is_empty() {
@ -392,4 +449,38 @@ mod tests {
assert_eq!(predicate.exprs[5], col("city").eq(lit("Boston")));
assert_eq!(predicate.exprs[6], col("city").not_eq(lit("Braintree")));
}
#[test]
fn predicate_display_ts() {
// TODO make this a doc example?
let p = PredicateBuilder::new().timestamp_range(1, 100).build();
assert_eq!(p.to_string(), "Predicate range: [1 - 100]");
}
#[test]
fn predicate_display_ts_and_expr() {
let p = PredicateBuilder::new()
.timestamp_range(1, 100)
.add_expr(col("foo").eq(lit(42)).and(col("bar").lt(lit(11))))
.build();
assert_eq!(
p.to_string(),
"Predicate range: [1 - 100] exprs: [#foo Eq Int32(42) And #bar Lt Int32(11)]"
);
}
#[test]
fn predicate_display_full() {
let p = PredicateBuilder::new()
.timestamp_range(1, 100)
.add_expr(col("foo").eq(lit(42)))
.table("my_table")
.field_columns(vec!["f1", "f2"])
.partition_key("the_key")
.build();
assert_eq!(p.to_string(), "Predicate table_names: {my_table} field_columns: {f1, f2} partition_key: 'the_key' range: [1 - 100] exprs: [#foo Eq Int32(42)]");
}
}


@ -1,11 +1,11 @@
//! Implementation of a DataFusion PhysicalPlan node across partition chunks
use std::sync::Arc;
use std::{fmt, sync::Arc};
use arrow::datatypes::SchemaRef;
use datafusion::{
error::DataFusionError,
physical_plan::{ExecutionPlan, Partitioning, SendableRecordBatchStream},
physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream},
};
use internal_types::{schema::Schema, selection::Selection};
@ -116,6 +116,21 @@ impl<C: PartitionChunk + 'static> ExecutionPlan for IOxReadFilterNode<C> {
Ok(Box::pin(adapter))
}
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match t {
DisplayFormatType::Default => {
// Note Predicate doesn't implement Display so punt on showing that now
write!(
f,
"IOxReadFilterNode: table_name={}, chunks={} predicate={}",
self.table_name,
self.chunk_and_infos.len(),
self.predicate,
)
}
}
}
}
/// Removes any columns that are not present in schema, returning a possibly
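With `fmt_as` in place, plan displays should include a line roughly like the following (hypothetical table name and chunk count; an empty predicate renders as just `Predicate` via its new `Display` impl):

```
IOxReadFilterNode: table_name=cpu, chunks=4 predicate=Predicate
```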


@ -116,7 +116,7 @@ fn benchmark_plain_sum(
|b, input| {
b.iter(|| {
// do work
let _ = encoding.sum(&input);
let _ = encoding.sum::<i64>(&input);
});
},
);
@ -142,7 +142,7 @@ fn benchmark_plain_sum(
|b, input| {
b.iter(|| {
// do work
let _ = encoding.sum::<i32>(&input);
let _ = encoding.sum::<i64>(&input);
});
},
);
@ -161,7 +161,7 @@ fn benchmark_plain_sum(
|b, input| {
b.iter(|| {
// do work
let _ = encoding.sum(&input);
let _ = encoding.sum::<i64>(&input);
});
},
);
@ -206,7 +206,7 @@ fn benchmark_plain_sum(
|b, input| {
b.iter(|| {
// do work
let _ = encoding.sum(&input);
let _ = encoding.sum::<i64>(&input);
});
},
);


@ -116,7 +116,7 @@ fn benchmark_none_sum(
|b, input| {
b.iter(|| {
// do work
let _ = encoding.sum(&input);
let _ = encoding.sum::<i64>(&input);
});
},
);
@ -142,7 +142,7 @@ fn benchmark_none_sum(
|b, input| {
b.iter(|| {
// do work
let _ = encoding.sum::<i32>(&input);
let _ = encoding.sum::<i64>(&input);
});
},
);
@ -161,7 +161,7 @@ fn benchmark_none_sum(
|b, input| {
b.iter(|| {
// do work
let _ = encoding.sum(&input);
let _ = encoding.sum::<i64>(&input);
});
},
);
@ -187,7 +187,7 @@ fn benchmark_none_sum(
|b, input| {
b.iter(|| {
// do work
let _ = encoding.sum::<i16>(&input);
let _ = encoding.sum::<i64>(&input);
});
},
);
@ -206,7 +206,7 @@ fn benchmark_none_sum(
|b, input| {
b.iter(|| {
// do work
let _ = encoding.sum(&input);
let _ = encoding.sum::<i64>(&input);
});
},
);


@ -21,7 +21,7 @@ use arrow::array::Array;
use crate::column::{cmp, RowIDs};
#[derive(Debug, Default)]
#[derive(Debug, Default, PartialEq, PartialOrd)]
/// A Fixed encoding is one in which every value has a fixed width, and is
/// stored contiguous in a backing vector. Fixed encodings do not support NULL
/// values, so are suitable for columns known to not have NULL values that we
@ -566,13 +566,6 @@ macro_rules! fixed_from_arrow_impls {
}
fixed_from_arrow_impls! {
(&arrow::array::Int64Array, i64),
(&arrow::array::Int64Array, i32),
(&arrow::array::Int64Array, i16),
(&arrow::array::Int64Array, i8),
(&arrow::array::Int64Array, u32),
(&arrow::array::Int64Array, u16),
(&arrow::array::Int64Array, u8),
(arrow::array::Int64Array, i64),
(arrow::array::Int64Array, i32),
(arrow::array::Int64Array, i16),
@ -581,10 +574,6 @@ fixed_from_arrow_impls! {
(arrow::array::Int64Array, u16),
(arrow::array::Int64Array, u8),
(&arrow::array::UInt64Array, u64),
(&arrow::array::UInt64Array, u32),
(&arrow::array::UInt64Array, u16),
(&arrow::array::UInt64Array, u8),
(arrow::array::UInt64Array, u64),
(arrow::array::UInt64Array, u32),
(arrow::array::UInt64Array, u16),


@ -14,6 +14,7 @@
//! consumer of these encodings.
use std::cmp::Ordering;
use std::fmt::Debug;
use std::iter::FromIterator;
use std::mem::size_of;
use arrow::{
@ -23,7 +24,7 @@ use arrow::{
use crate::column::{cmp, RowIDs};
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub struct FixedNull<T>
where
T: ArrowNumericType,
@ -116,26 +117,30 @@ where
//
//
/// Return the logical (decoded) value at the provided row ID. A NULL value
/// Return the logical (decoded) value at the provided row ID according to
/// the logical type of the column, which is specified by `U`. A NULL value
/// is represented by None.
pub fn value(&self, row_id: u32) -> Option<T::Native> {
pub fn value<U>(&self, row_id: u32) -> Option<U>
where
U: From<T::Native>,
{
if self.arr.is_null(row_id as usize) {
return None;
}
Some(self.arr.value(row_id as usize))
Some(U::from(self.arr.value(row_id as usize)))
}
/// Returns the logical (decoded) values for the provided row IDs.
/// Returns the logical (decoded) values for the provided row IDs according
/// to the logical type of the column, which is specified by `U`.
///
/// NULL values are represented by None.
///
/// TODO(edd): Perf - we should return a vector of values and a vector of
/// integers representing the null validity bitmap.
pub fn values(
&self,
row_ids: &[u32],
mut dst: Vec<Option<T::Native>>,
) -> Vec<Option<T::Native>> {
pub fn values<U>(&self, row_ids: &[u32], mut dst: Vec<Option<U>>) -> Vec<Option<U>>
where
U: From<T::Native>,
{
dst.clear();
dst.reserve(row_ids.len());
@ -143,20 +148,24 @@ where
if self.arr.is_null(row_id as usize) {
dst.push(None)
} else {
dst.push(Some(self.arr.value(row_id as usize)))
dst.push(Some(U::from(self.arr.value(row_id as usize))))
}
}
assert_eq!(dst.len(), row_ids.len());
dst
}
/// Returns the logical (decoded) values for all the rows in the column.
/// Returns the logical (decoded) values for all the rows in the column
/// according to the logical type of the column, which is specified by `U`.
///
/// NULL values are represented by None.
///
/// TODO(edd): Perf - we should return a vector of values and a vector of
/// integers representing the null validity bitmap.
pub fn all_values(&self, mut dst: Vec<Option<T::Native>>) -> Vec<Option<T::Native>> {
pub fn all_values<U>(&self, mut dst: Vec<Option<U>>) -> Vec<Option<U>>
where
U: From<T::Native>,
{
dst.clear();
dst.reserve(self.arr.len());
@ -164,7 +173,7 @@ where
if self.arr.is_null(i) {
dst.push(None)
} else {
dst.push(Some(self.arr.value(i)))
dst.push(Some(U::from(self.arr.value(i))))
}
}
assert_eq!(dst.len(), self.num_rows() as usize);
@ -202,23 +211,23 @@ where
/// this implementation (about 85% in the `sum` case). We will revisit
/// them in the future as they do would the implementation of these
/// aggregation functions.
pub fn sum(&self, row_ids: &[u32]) -> Option<T::Native>
pub fn sum<U>(&self, row_ids: &[u32]) -> Option<U>
where
T::Native: std::ops::Add<Output = T::Native>,
U: Default + From<T::Native> + std::ops::Add<Output = U>,
{
let mut result = T::Native::default();
let mut result = U::default();
if self.arr.null_count() == 0 {
for chunks in row_ids.chunks_exact(4) {
result = result + self.arr.value(chunks[3] as usize);
result = result + self.arr.value(chunks[2] as usize);
result = result + self.arr.value(chunks[1] as usize);
result = result + self.arr.value(chunks[0] as usize);
result = result + U::from(self.arr.value(chunks[3] as usize));
result = result + U::from(self.arr.value(chunks[2] as usize));
result = result + U::from(self.arr.value(chunks[1] as usize));
result = result + U::from(self.arr.value(chunks[0] as usize));
}
let rem = row_ids.len() % 4;
for &i in &row_ids[row_ids.len() - rem..row_ids.len()] {
result = result + self.arr.value(i as usize);
result = result + U::from(self.arr.value(i as usize));
}
return Some(result);
@ -230,7 +239,7 @@ where
continue;
}
is_none = false;
result = result + self.arr.value(i as usize);
result = result + U::from(self.arr.value(i as usize));
}
if is_none {
@ -241,20 +250,29 @@ where
/// Returns the first logical (decoded) value from the provided
/// row IDs.
pub fn first(&self, row_ids: &[u32]) -> Option<T::Native> {
pub fn first<U>(&self, row_ids: &[u32]) -> Option<U>
where
U: From<T::Native>,
{
self.value(row_ids[0])
}
/// Returns the last logical (decoded) value from the provided
/// row IDs.
pub fn last(&self, row_ids: &[u32]) -> Option<T::Native> {
pub fn last<U>(&self, row_ids: &[u32]) -> Option<U>
where
U: From<T::Native>,
{
self.value(row_ids[row_ids.len() - 1])
}
/// Returns the minimum logical (decoded) non-null value from the provided
/// row IDs.
pub fn min(&self, row_ids: &[u32]) -> Option<T::Native> {
let mut min: Option<T::Native> = self.value(row_ids[0]);
pub fn min<U>(&self, row_ids: &[u32]) -> Option<U>
where
U: From<T::Native> + PartialOrd,
{
let mut min: Option<U> = self.value(row_ids[0]);
for &v in row_ids.iter().skip(1) {
if self.arr.is_null(v as usize) {
continue;
@ -269,8 +287,11 @@ where
/// Returns the maximum logical (decoded) non-null value from the provided
/// row IDs.
pub fn max(&self, row_ids: &[u32]) -> Option<T::Native> {
let mut max: Option<T::Native> = self.value(row_ids[0]);
pub fn max<U>(&self, row_ids: &[u32]) -> Option<U>
where
U: From<T::Native> + PartialOrd,
{
let mut max: Option<U> = self.value(row_ids[0]);
for &v in row_ids.iter().skip(1) {
if self.arr.is_null(v as usize) {
continue;
@ -414,11 +435,11 @@ where
/// `x {>, >=, <, <=} value1 AND x {>, >=, <, <=} value2`.
pub fn row_ids_filter_range(
&self,
left: (T::Native, cmp::Operator),
right: (T::Native, cmp::Operator),
left: (T::Native, &cmp::Operator),
right: (T::Native, &cmp::Operator),
dst: RowIDs,
) -> RowIDs {
match (&left.1, &right.1) {
match (left.1, right.1) {
(cmp::Operator::GT, cmp::Operator::LT)
| (cmp::Operator::GT, cmp::Operator::LTE)
| (cmp::Operator::GTE, cmp::Operator::LT)
@ -509,81 +530,93 @@ where
// This macro implements the From trait for slices of various logical types.
//
// Here is an example implementation:
// Here are example implementations:
//
// impl From<&[i64]> for FixedNull<arrow::datatypes::Int64Type> {
// fn from(v: &[i64]) -> Self {
// impl From<Vec<i64>> for FixedNull<Int64Type> {
// fn from(v: Vec<i64>) -> Self {
// Self{
// arr: PrimitiveArray::from(v.to_vec()),
// arr: PrimitiveArray::from(v),
// }
// }
// }
//
// impl From<&[Option<i64>]> for
// FixedNull<arrow::datatypes::Int64Type> { fn from(v: &[i64])
// -> Self { Self{
// arr: PrimitiveArray::from(v.to_vec()),
// impl From<&[i64]> for FixedNull<Int64Type> {
// fn from(v: &[i64]) -> Self {
// Self::from(v.to_vec())
// }
// }
//
// impl From<Vec<Option<i64>>> for FixedNull<Int64Type> {
// fn from(v: Vec<Option<i64>>) -> Self {
// Self{
// arr: PrimitiveArray::from(v),
// }
// }
// }
//
macro_rules! fixed_from_slice_impls {
// impl From<&[Option<i64>]> for FixedNull<Int64Type> {
// fn from(v: &[i64]) -> Self {
// Self::from(v.to_vec())
// }
// }
//
macro_rules! fixed_null_from_native_types {
($(($type_from:ty, $type_to:ty),)*) => {
$(
impl From<Vec<$type_from>> for FixedNull<$type_to> {
fn from(v: Vec<$type_from>) -> Self {
Self{
arr: PrimitiveArray::from(v),
}
}
}
impl From<&[$type_from]> for FixedNull<$type_to> {
fn from(v: &[$type_from]) -> Self {
Self::from(v.to_vec())
}
}
impl From<Vec<Option<$type_from>>> for FixedNull<$type_to> {
fn from(v: Vec<Option<$type_from>>) -> Self {
Self{
arr: PrimitiveArray::from(v.to_vec()),
arr: PrimitiveArray::from(v),
}
}
}
impl From<&[Option<$type_from>]> for FixedNull<$type_to> {
fn from(v: &[Option<$type_from>]) -> Self {
Self{
arr: PrimitiveArray::from(v.to_vec()),
}
Self::from(v.to_vec())
}
}
)*
};
}
// Supported logical and physical datatypes for the FixedNull encoding.
//
// Need to look at possibility of initialising smaller datatypes...
fixed_from_slice_impls! {
fixed_null_from_native_types! {
(i64, arrow::datatypes::Int64Type),
// (i64, arrow::datatypes::Int32Type),
// (i64, arrow::datatypes::Int16Type),
// (i64, arrow::datatypes::Int8Type),
// (i64, arrow::datatypes::UInt32Type),
// (i64, arrow::datatypes::UInt16Type),
// (i64, arrow::datatypes::UInt8Type),
(i32, arrow::datatypes::Int32Type),
// (i32, arrow::datatypes::Int16Type),
// (i32, arrow::datatypes::Int8Type),
// (i32, arrow::datatypes::UInt16Type),
// (i32, arrow::datatypes::UInt8Type),
(i16, arrow::datatypes::Int16Type),
// (i16, arrow::datatypes::Int8Type),
// (i16, arrow::datatypes::UInt8Type),
(i8, arrow::datatypes::Int8Type),
(u64, arrow::datatypes::UInt64Type),
// (u64, arrow::datatypes::UInt32Type),
// (u64, arrow::datatypes::UInt16Type),
// (u64, arrow::datatypes::UInt8Type),
(u32, arrow::datatypes::UInt32Type),
// (u32, arrow::datatypes::UInt16Type),
// (u32, arrow::datatypes::UInt8Type),
(u16, arrow::datatypes::UInt16Type),
// (u16, arrow::datatypes::UInt8Type),
(u8, arrow::datatypes::UInt8Type),
(f64, arrow::datatypes::Float64Type),
(i32, arrow::datatypes::Int32Type),
(i16, arrow::datatypes::Int16Type),
(i8, arrow::datatypes::Int8Type),
(u64, arrow::datatypes::UInt64Type),
(u32, arrow::datatypes::UInt32Type),
(u16, arrow::datatypes::UInt16Type),
(u8, arrow::datatypes::UInt8Type),
(f64, arrow::datatypes::Float64Type),
}
macro_rules! fixed_from_arrow_impls {
// This macro implements the From trait for Arrow arrays
//
// Implementation:
//
// impl From<Int64Array> for FixedNull<Int64Type> {
// fn from(arr: Int64Array) -> Self {
// Self{arr}
// }
// }
//
macro_rules! fixed_null_from_arrow_types {
($(($type_from:ty, $type_to:ty),)*) => {
$(
impl From<$type_from> for FixedNull<$type_to> {
@ -595,27 +628,76 @@ macro_rules! fixed_from_arrow_impls {
};
}
// Supported logical and physical datatypes for the Plain encoding.
//
// Need to look at possibility of initialising smaller datatypes...
fixed_from_arrow_impls! {
fixed_null_from_arrow_types! {
(arrow::array::Int64Array, arrow::datatypes::Int64Type),
(arrow::array::UInt64Array, arrow::datatypes::UInt64Type),
(arrow::array::Float64Array, arrow::datatypes::Float64Type),
}
// TODO(edd): add more datatypes
// This macro implements the From trait for Arrow arrays where some down-casting
// to a smaller physical type happens. It is the caller's responsibility to
// ensure that this down-casting is safe.
//
// Example implementation:
//
// impl From<Int64Array> for FixedNull<Int32Type> {
// fn from(arr: Int64Array) -> Self {
// let arr: PrimitiveArray<Int32Type> =
// PrimitiveArray::from_iter(arr.iter().map(|v| v.map(|v| v as i32)));
// Self { arr }
// }
// }
//
macro_rules! fixed_null_from_arrow_types_down_cast {
($(($type_from:ty, $type_to:ty, $rust_type:ty),)*) => {
$(
impl From<$type_from> for FixedNull<$type_to> {
fn from(arr: $type_from) -> Self {
let arr: PrimitiveArray<$type_to> =
PrimitiveArray::from_iter(arr.iter().map(|v| v.map(|v| v as $rust_type)));
Self { arr }
}
}
)*
};
}
fixed_null_from_arrow_types_down_cast! {
(arrow::array::Int64Array, arrow::datatypes::Int32Type, i32),
(arrow::array::Int64Array, arrow::datatypes::UInt32Type, u32),
(arrow::array::Int64Array, arrow::datatypes::Int16Type, i16),
(arrow::array::Int64Array, arrow::datatypes::UInt16Type, u16),
(arrow::array::Int64Array, arrow::datatypes::Int8Type, i8),
(arrow::array::Int64Array, arrow::datatypes::UInt8Type, u8),
(arrow::array::UInt64Array, arrow::datatypes::UInt32Type, u32),
(arrow::array::UInt64Array, arrow::datatypes::UInt16Type, u16),
(arrow::array::UInt64Array, arrow::datatypes::UInt8Type, u8),
}
#[cfg(test)]
mod test {
use super::cmp::Operator;
use super::*;
use arrow::array::*;
use arrow::datatypes::*;
fn some_vec<T: Copy>(v: Vec<T>) -> Vec<Option<T>> {
v.iter().map(|x| Some(*x)).collect()
}
#[test]
fn from_arrow_downcast() {
let arr = Int64Array::from(vec![100, u8::MAX as i64]);
let exp_values = arr.iter().collect::<Vec<Option<i64>>>();
let enc: FixedNull<UInt8Type> = FixedNull::from(arr);
assert_eq!(enc.all_values(vec![]), exp_values);
let arr = Int64Array::from(vec![100, i32::MAX as i64]);
let exp_values = arr.iter().collect::<Vec<Option<i64>>>();
let enc: FixedNull<UInt32Type> = FixedNull::from(arr);
assert_eq!(enc.all_values(vec![]), exp_values);
}
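A further hedged sketch of the logical/physical split, using hypothetical values: the column physically stores `i32`, while callers always work in the `i64` logical type.

```rust
use arrow::datatypes::Int32Type;

// Physical storage is i32; sum::<i64> accumulates in the logical type
// and skips the NULL row, so the result is Some(30).
let enc = FixedNull::<Int32Type>::from(vec![Some(10_i32), None, Some(20)]);
assert_eq!(enc.sum::<i64>(&[0, 1, 2]), Some(30));
```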
#[test]
fn size() {
let v = FixedNull::<UInt64Type>::from(vec![None, None, Some(100), Some(2222)].as_slice());
@ -879,36 +961,36 @@ mod test {
);
let row_ids = v.row_ids_filter_range(
(100, Operator::GTE),
(240, Operator::LT),
(100, &Operator::GTE),
(240, &Operator::LT),
RowIDs::new_vector(),
);
assert_eq!(row_ids.to_vec(), vec![0, 1, 5, 6, 13, 17]);
let row_ids = v.row_ids_filter_range(
(100, Operator::GT),
(240, Operator::LT),
(100, &Operator::GT),
(240, &Operator::LT),
RowIDs::new_vector(),
);
assert_eq!(row_ids.to_vec(), vec![1, 6, 13]);
let row_ids = v.row_ids_filter_range(
(10, Operator::LT),
(-100, Operator::GT),
(10, &Operator::LT),
(-100, &Operator::GT),
RowIDs::new_vector(),
);
assert_eq!(row_ids.to_vec(), vec![11, 14, 15]);
let row_ids = v.row_ids_filter_range(
(21, Operator::GTE),
(21, Operator::LTE),
(21, &Operator::GTE),
(21, &Operator::LTE),
RowIDs::new_vector(),
);
assert_eq!(row_ids.to_vec(), vec![16]);
let row_ids = v.row_ids_filter_range(
(10000, Operator::LTE),
(3999, Operator::GT),
(10000, &Operator::LTE),
(3999, &Operator::GT),
RowIDs::new_vector(),
);
assert_eq!(row_ids.to_vec(), Vec::<u32>::new());
@ -926,8 +1008,8 @@ mod test {
.as_slice(),
);
let row_ids = v.row_ids_filter_range(
(200, Operator::GTE),
(300, Operator::LTE),
(200, &Operator::GTE),
(300, &Operator::LTE),
RowIDs::new_vector(),
);
assert_eq!(row_ids.to_vec(), vec![1, 2, 4]);


@ -1,12 +1,19 @@
use std::mem::size_of;
use arrow::{self, array::Array};
use arrow::{
self, array::Array, datatypes::Int16Type as ArrowInt16Type,
datatypes::Int32Type as ArrowInt32Type, datatypes::Int64Type as ArrowInt64Type,
datatypes::Int8Type as ArrowInt8Type, datatypes::UInt16Type as ArrowUInt16Type,
datatypes::UInt32Type as ArrowUInt32Type, datatypes::UInt64Type as ArrowUInt64Type,
datatypes::UInt8Type as ArrowUInt8Type,
};
use super::encoding::{scalar::Fixed, scalar::FixedNull};
use super::{cmp, Statistics};
use crate::column::{EncodedValues, RowIDs, Scalar, Value, Values};
pub enum IntegerEncoding {
// non-null encodings. These are backed by `Vec<T>`
I64I64(Fixed<i64>),
I64I32(Fixed<i32>),
I64U32(Fixed<u32>),
@ -14,15 +21,97 @@ pub enum IntegerEncoding {
I64U16(Fixed<u16>),
I64I8(Fixed<i8>),
I64U8(Fixed<u8>),
U64U64(Fixed<u64>),
U64U32(Fixed<u32>),
U64U16(Fixed<u16>),
U64U8(Fixed<u8>),
// Nullable encodings - TODO, add variants for smaller physical types.
I64I64N(FixedNull<arrow::datatypes::Int64Type>),
U64U64N(FixedNull<arrow::datatypes::UInt64Type>),
// Nullable encodings. These are backed by an Arrow array.
I64I64N(FixedNull<ArrowInt64Type>),
I64I32N(FixedNull<ArrowInt32Type>),
I64U32N(FixedNull<ArrowUInt32Type>),
I64I16N(FixedNull<ArrowInt16Type>),
I64U16N(FixedNull<ArrowUInt16Type>),
I64I8N(FixedNull<ArrowInt8Type>),
I64U8N(FixedNull<ArrowUInt8Type>),
U64U64N(FixedNull<ArrowUInt64Type>),
U64U32N(FixedNull<ArrowUInt32Type>),
U64U16N(FixedNull<ArrowUInt16Type>),
U64U8N(FixedNull<ArrowUInt8Type>),
}
impl PartialEq for IntegerEncoding {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::I64I64(a), Self::I64I64(b)) => a == b,
(Self::I64I32(a), Self::I64I32(b)) => a == b,
(Self::I64U32(a), Self::I64U32(b)) => a == b,
(Self::I64I16(a), Self::I64I16(b)) => a == b,
(Self::I64U16(a), Self::I64U16(b)) => a == b,
(Self::I64I8(a), Self::I64I8(b)) => a == b,
(Self::I64U8(a), Self::I64U8(b)) => a == b,
(Self::U64U64(a), Self::U64U64(b)) => a == b,
(Self::U64U32(a), Self::U64U32(b)) => a == b,
(Self::U64U16(a), Self::U64U16(b)) => a == b,
(Self::U64U8(a), Self::U64U8(b)) => a == b,
(Self::I64I64N(a), Self::I64I64N(b)) => {
let a = a.all_values::<i64>(vec![]);
let b = b.all_values(vec![]);
a == b
}
(Self::I64I32N(a), Self::I64I32N(b)) => {
let a = a.all_values::<i64>(vec![]);
let b = b.all_values(vec![]);
a == b
}
(Self::I64U32N(a), Self::I64U32N(b)) => {
let a = a.all_values::<i64>(vec![]);
let b = b.all_values(vec![]);
a == b
}
(Self::I64I16N(a), Self::I64I16N(b)) => {
let a = a.all_values::<i64>(vec![]);
let b = b.all_values(vec![]);
a == b
}
(Self::I64U16N(a), Self::I64U16N(b)) => {
let a = a.all_values::<i64>(vec![]);
let b = b.all_values(vec![]);
a == b
}
(Self::I64I8N(a), Self::I64I8N(b)) => {
let a = a.all_values::<i64>(vec![]);
let b = b.all_values(vec![]);
a == b
}
(Self::I64U8N(a), Self::I64U8N(b)) => {
let a = a.all_values::<i64>(vec![]);
let b = b.all_values(vec![]);
a == b
}
(Self::U64U64N(a), Self::U64U64N(b)) => {
let a = a.all_values::<u64>(vec![]);
let b = b.all_values(vec![]);
a == b
}
(Self::U64U32N(a), Self::U64U32N(b)) => {
let a = a.all_values::<u64>(vec![]);
let b = b.all_values(vec![]);
a == b
}
(Self::U64U16N(a), Self::U64U16N(b)) => {
let a = a.all_values::<u64>(vec![]);
let b = b.all_values(vec![]);
a == b
}
(Self::U64U8N(a), Self::U64U8N(b)) => {
let a = a.all_values::<u64>(vec![]);
let b = b.all_values(vec![]);
a == b
}
(_, _) => false,
}
}
}
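One consequence worth noting: equality between nullable variants is defined over the decoded logical values, so two columns of the same variant compare equal whenever they decode to the same values. A hypothetical illustration:

```rust
// Both sides are I64I32N; eq materializes all_values::<i64>() on each
// side and compares the logical values.
let a = IntegerEncoding::I64I32N(FixedNull::from(vec![Some(1_i32), None]));
let b = IntegerEncoding::I64I32N(FixedNull::from(vec![Some(1_i32), None]));
assert!(a == b);
```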
impl IntegerEncoding {
@ -41,7 +130,16 @@ impl IntegerEncoding {
Self::U64U16(enc) => enc.size(),
Self::U64U8(enc) => enc.size(),
Self::I64I64N(enc) => enc.size(),
Self::I64I32N(enc) => enc.size(),
Self::I64U32N(enc) => enc.size(),
Self::I64I16N(enc) => enc.size(),
Self::I64U16N(enc) => enc.size(),
Self::I64I8N(enc) => enc.size(),
Self::I64U8N(enc) => enc.size(),
Self::U64U64N(enc) => enc.size(),
Self::U64U32N(enc) => enc.size(),
Self::U64U16N(enc) => enc.size(),
Self::U64U8N(enc) => enc.size(),
}
}
@ -68,7 +166,16 @@ impl IntegerEncoding {
}
Self::I64I64N(enc) => enc.size_raw(include_nulls),
Self::I64I32N(enc) => enc.size_raw(include_nulls),
Self::I64U32N(enc) => enc.size_raw(include_nulls),
Self::I64I16N(enc) => enc.size_raw(include_nulls),
Self::I64U16N(enc) => enc.size_raw(include_nulls),
Self::I64I8N(enc) => enc.size_raw(include_nulls),
Self::I64U8N(enc) => enc.size_raw(include_nulls),
Self::U64U64N(enc) => enc.size_raw(include_nulls),
Self::U64U32N(enc) => enc.size_raw(include_nulls),
Self::U64U16N(enc) => enc.size_raw(include_nulls),
Self::U64U8N(enc) => enc.size_raw(include_nulls),
}
}
@ -87,7 +194,16 @@ impl IntegerEncoding {
Self::U64U16(enc) => enc.num_rows(),
Self::U64U8(enc) => enc.num_rows(),
Self::I64I64N(enc) => enc.num_rows(),
Self::I64I32N(enc) => enc.num_rows(),
Self::I64U32N(enc) => enc.num_rows(),
Self::I64I16N(enc) => enc.num_rows(),
Self::I64U16N(enc) => enc.num_rows(),
Self::I64I8N(enc) => enc.num_rows(),
Self::I64U8N(enc) => enc.num_rows(),
Self::U64U64N(enc) => enc.num_rows(),
Self::U64U32N(enc) => enc.num_rows(),
Self::U64U16N(enc) => enc.num_rows(),
Self::U64U8N(enc) => enc.num_rows(),
}
}
@ -108,7 +224,16 @@ impl IntegerEncoding {
pub fn contains_null(&self) -> bool {
match self {
Self::I64I64N(enc) => enc.contains_null(),
Self::I64I32N(enc) => enc.contains_null(),
Self::I64U32N(enc) => enc.contains_null(),
Self::I64I16N(enc) => enc.contains_null(),
Self::I64U16N(enc) => enc.contains_null(),
Self::I64I8N(enc) => enc.contains_null(),
Self::I64U8N(enc) => enc.contains_null(),
Self::U64U64N(enc) => enc.contains_null(),
Self::U64U32N(enc) => enc.contains_null(),
Self::U64U16N(enc) => enc.contains_null(),
Self::U64U8N(enc) => enc.contains_null(),
_ => false,
}
}
@ -128,7 +253,16 @@ impl IntegerEncoding {
Self::U64U16(_) => 0,
Self::U64U8(_) => 0,
Self::I64I64N(enc) => enc.null_count(),
Self::I64I32N(enc) => enc.null_count(),
Self::I64U32N(enc) => enc.null_count(),
Self::I64I16N(enc) => enc.null_count(),
Self::I64U16N(enc) => enc.null_count(),
Self::I64I8N(enc) => enc.null_count(),
Self::I64U8N(enc) => enc.null_count(),
Self::U64U64N(enc) => enc.null_count(),
Self::U64U32N(enc) => enc.null_count(),
Self::U64U16N(enc) => enc.null_count(),
Self::U64U8N(enc) => enc.null_count(),
}
}
@ -136,7 +270,16 @@ impl IntegerEncoding {
pub fn has_any_non_null_value(&self) -> bool {
match self {
Self::I64I64N(enc) => enc.has_any_non_null_value(),
Self::I64I32N(enc) => enc.has_any_non_null_value(),
Self::I64U32N(enc) => enc.has_any_non_null_value(),
Self::I64I16N(enc) => enc.has_any_non_null_value(),
Self::I64U16N(enc) => enc.has_any_non_null_value(),
Self::I64I8N(enc) => enc.has_any_non_null_value(),
Self::I64U8N(enc) => enc.has_any_non_null_value(),
Self::U64U64N(enc) => enc.has_any_non_null_value(),
Self::U64U32N(enc) => enc.has_any_non_null_value(),
Self::U64U16N(enc) => enc.has_any_non_null_value(),
Self::U64U8N(enc) => enc.has_any_non_null_value(),
_ => true,
}
}
@ -146,7 +289,16 @@ impl IntegerEncoding {
pub fn has_non_null_value(&self, row_ids: &[u32]) -> bool {
match self {
Self::I64I64N(enc) => enc.has_non_null_value(row_ids),
Self::I64I32N(enc) => enc.has_non_null_value(row_ids),
Self::I64U32N(enc) => enc.has_non_null_value(row_ids),
Self::I64I16N(enc) => enc.has_non_null_value(row_ids),
Self::I64U16N(enc) => enc.has_non_null_value(row_ids),
Self::I64I8N(enc) => enc.has_non_null_value(row_ids),
Self::I64U8N(enc) => enc.has_non_null_value(row_ids),
Self::U64U64N(enc) => enc.has_non_null_value(row_ids),
Self::U64U32N(enc) => enc.has_non_null_value(row_ids),
Self::U64U16N(enc) => enc.has_non_null_value(row_ids),
Self::U64U8N(enc) => enc.has_non_null_value(row_ids),
_ => !row_ids.is_empty(), // all rows will be non-null
}
}
@ -158,25 +310,64 @@ impl IntegerEncoding {
// `c.value` should return as the logical type
// signed 64-bit variants - logical type is i64 for all these
Self::I64I64(c) => Value::Scalar(Scalar::I64(c.value(row_id))),
Self::I64I32(c) => Value::Scalar(Scalar::I64(c.value(row_id))),
Self::I64U32(c) => Value::Scalar(Scalar::I64(c.value(row_id))),
Self::I64I16(c) => Value::Scalar(Scalar::I64(c.value(row_id))),
Self::I64U16(c) => Value::Scalar(Scalar::I64(c.value(row_id))),
Self::I64I8(c) => Value::Scalar(Scalar::I64(c.value(row_id))),
Self::I64U8(c) => Value::Scalar(Scalar::I64(c.value(row_id))),
Self::I64I64(enc) => Value::Scalar(Scalar::I64(enc.value(row_id))),
Self::I64I32(enc) => Value::Scalar(Scalar::I64(enc.value(row_id))),
Self::I64U32(enc) => Value::Scalar(Scalar::I64(enc.value(row_id))),
Self::I64I16(enc) => Value::Scalar(Scalar::I64(enc.value(row_id))),
Self::I64U16(enc) => Value::Scalar(Scalar::I64(enc.value(row_id))),
Self::I64I8(enc) => Value::Scalar(Scalar::I64(enc.value(row_id))),
Self::I64U8(enc) => Value::Scalar(Scalar::I64(enc.value(row_id))),
// unsigned 64-bit variants - logical type is u64 for all these
Self::U64U64(c) => Value::Scalar(Scalar::U64(c.value(row_id))),
Self::U64U32(c) => Value::Scalar(Scalar::U64(c.value(row_id))),
Self::U64U16(c) => Value::Scalar(Scalar::U64(c.value(row_id))),
Self::U64U8(c) => Value::Scalar(Scalar::U64(c.value(row_id))),
Self::U64U64(enc) => Value::Scalar(Scalar::U64(enc.value(row_id))),
Self::U64U32(enc) => Value::Scalar(Scalar::U64(enc.value(row_id))),
Self::U64U16(enc) => Value::Scalar(Scalar::U64(enc.value(row_id))),
Self::U64U8(enc) => Value::Scalar(Scalar::U64(enc.value(row_id))),
Self::I64I64N(c) => match c.value(row_id) {
// signed 64-bit variants
Self::I64I64N(enc) => match enc.value(row_id) {
Some(v) => Value::Scalar(Scalar::I64(v)),
None => Value::Null,
},
Self::U64U64N(c) => match c.value(row_id) {
Self::I64I32N(enc) => match enc.value(row_id) {
Some(v) => Value::Scalar(Scalar::I64(v)),
None => Value::Null,
},
Self::I64U32N(enc) => match enc.value(row_id) {
Some(v) => Value::Scalar(Scalar::I64(v)),
None => Value::Null,
},
Self::I64I16N(enc) => match enc.value(row_id) {
Some(v) => Value::Scalar(Scalar::I64(v)),
None => Value::Null,
},
Self::I64U16N(enc) => match enc.value(row_id) {
Some(v) => Value::Scalar(Scalar::I64(v)),
None => Value::Null,
},
Self::I64I8N(enc) => match enc.value(row_id) {
Some(v) => Value::Scalar(Scalar::I64(v)),
None => Value::Null,
},
Self::I64U8N(enc) => match enc.value(row_id) {
Some(v) => Value::Scalar(Scalar::I64(v)),
None => Value::Null,
},
// unsigned 64-bit variants
Self::U64U64N(enc) => match enc.value(row_id) {
Some(v) => Value::Scalar(Scalar::U64(v)),
None => Value::Null,
},
Self::U64U32N(enc) => match enc.value(row_id) {
Some(v) => Value::Scalar(Scalar::U64(v)),
None => Value::Null,
},
Self::U64U16N(enc) => match enc.value(row_id) {
Some(v) => Value::Scalar(Scalar::U64(v)),
None => Value::Null,
},
Self::U64U8N(enc) => match enc.value(row_id) {
Some(v) => Value::Scalar(Scalar::U64(v)),
None => Value::Null,
},
@ -190,22 +381,34 @@ impl IntegerEncoding {
pub fn values(&self, row_ids: &[u32]) -> Values<'_> {
match &self {
// signed 64-bit variants - logical type is i64 for all these
Self::I64I64(c) => Values::I64(c.values::<i64>(row_ids, vec![])),
Self::I64I32(c) => Values::I64(c.values::<i64>(row_ids, vec![])),
Self::I64U32(c) => Values::I64(c.values::<i64>(row_ids, vec![])),
Self::I64I16(c) => Values::I64(c.values::<i64>(row_ids, vec![])),
Self::I64U16(c) => Values::I64(c.values::<i64>(row_ids, vec![])),
Self::I64I8(c) => Values::I64(c.values::<i64>(row_ids, vec![])),
Self::I64U8(c) => Values::I64(c.values::<i64>(row_ids, vec![])),
Self::I64I64(enc) => Values::I64(enc.values::<i64>(row_ids, vec![])),
Self::I64I32(enc) => Values::I64(enc.values::<i64>(row_ids, vec![])),
Self::I64U32(enc) => Values::I64(enc.values::<i64>(row_ids, vec![])),
Self::I64I16(enc) => Values::I64(enc.values::<i64>(row_ids, vec![])),
Self::I64U16(enc) => Values::I64(enc.values::<i64>(row_ids, vec![])),
Self::I64I8(enc) => Values::I64(enc.values::<i64>(row_ids, vec![])),
Self::I64U8(enc) => Values::I64(enc.values::<i64>(row_ids, vec![])),
// unsigned 64-bit variants - logical type is u64 for all these
Self::U64U64(c) => Values::U64(c.values::<u64>(row_ids, vec![])),
Self::U64U32(c) => Values::U64(c.values::<u64>(row_ids, vec![])),
Self::U64U16(c) => Values::U64(c.values::<u64>(row_ids, vec![])),
Self::U64U8(c) => Values::U64(c.values::<u64>(row_ids, vec![])),
Self::U64U64(enc) => Values::U64(enc.values::<u64>(row_ids, vec![])),
Self::U64U32(enc) => Values::U64(enc.values::<u64>(row_ids, vec![])),
Self::U64U16(enc) => Values::U64(enc.values::<u64>(row_ids, vec![])),
Self::U64U8(enc) => Values::U64(enc.values::<u64>(row_ids, vec![])),
Self::I64I64N(c) => Values::I64N(c.values(row_ids, vec![])),
Self::U64U64N(c) => Values::U64N(c.values(row_ids, vec![])),
// signed 64-bit nullable variants - logical type is i64 for all these.
Self::I64I64N(enc) => Values::I64N(enc.values(row_ids, vec![])),
Self::I64I32N(enc) => Values::I64N(enc.values(row_ids, vec![])),
Self::I64U32N(enc) => Values::I64N(enc.values(row_ids, vec![])),
Self::I64I16N(enc) => Values::I64N(enc.values(row_ids, vec![])),
Self::I64U16N(enc) => Values::I64N(enc.values(row_ids, vec![])),
Self::I64I8N(enc) => Values::I64N(enc.values(row_ids, vec![])),
Self::I64U8N(enc) => Values::I64N(enc.values(row_ids, vec![])),
// unsigned 64-bit nullable variants - logical type is u64 for all these.
Self::U64U64N(enc) => Values::U64N(enc.values(row_ids, vec![])),
Self::U64U32N(enc) => Values::U64N(enc.values(row_ids, vec![])),
Self::U64U16N(enc) => Values::U64N(enc.values(row_ids, vec![])),
Self::U64U8N(enc) => Values::U64N(enc.values(row_ids, vec![])),
}
}
@ -230,8 +433,20 @@ impl IntegerEncoding {
Self::U64U16(c) => Values::U64(c.all_values::<u64>(vec![])),
Self::U64U8(c) => Values::U64(c.all_values::<u64>(vec![])),
Self::I64I64N(c) => Values::I64N(c.all_values(vec![])),
Self::U64U64N(c) => Values::U64N(c.all_values(vec![])),
// signed 64-bit nullable variants - logical type is i64 for all these.
Self::I64I64N(enc) => Values::I64N(enc.all_values(vec![])),
Self::I64I32N(enc) => Values::I64N(enc.all_values(vec![])),
Self::I64U32N(enc) => Values::I64N(enc.all_values(vec![])),
Self::I64I16N(enc) => Values::I64N(enc.all_values(vec![])),
Self::I64U16N(enc) => Values::I64N(enc.all_values(vec![])),
Self::I64I8N(enc) => Values::I64N(enc.all_values(vec![])),
Self::I64U8N(enc) => Values::I64N(enc.all_values(vec![])),
// unsigned 64-bit nullable variants - logical type is u64 for all these.
Self::U64U64N(enc) => Values::U64N(enc.all_values(vec![])),
Self::U64U32N(enc) => Values::U64N(enc.all_values(vec![])),
Self::U64U16N(enc) => Values::U64N(enc.all_values(vec![])),
Self::U64U8N(enc) => Values::U64N(enc.all_values(vec![])),
}
}
@ -297,8 +512,20 @@ impl IntegerEncoding {
Self::U64U16(c) => c.row_ids_filter(value.as_u16(), op, dst),
Self::U64U8(c) => c.row_ids_filter(value.as_u8(), op, dst),
Self::I64I64N(c) => c.row_ids_filter(value.as_i64(), op, dst),
Self::U64U64N(c) => c.row_ids_filter(value.as_u64(), op, dst),
// signed 64-bit nullable variants - logical type is i64 for all these.
Self::I64I64N(enc) => enc.row_ids_filter(value.as_i64(), op, dst),
Self::I64I32N(enc) => enc.row_ids_filter(value.as_i32(), op, dst),
Self::I64U32N(enc) => enc.row_ids_filter(value.as_u32(), op, dst),
Self::I64I16N(enc) => enc.row_ids_filter(value.as_i16(), op, dst),
Self::I64U16N(enc) => enc.row_ids_filter(value.as_u16(), op, dst),
Self::I64I8N(enc) => enc.row_ids_filter(value.as_i8(), op, dst),
Self::I64U8N(enc) => enc.row_ids_filter(value.as_u8(), op, dst),
// unsigned 64-bit nullable variants - logical type is u64 for all these.
Self::U64U64N(enc) => enc.row_ids_filter(value.as_u64(), op, dst),
Self::U64U32N(enc) => enc.row_ids_filter(value.as_u32(), op, dst),
Self::U64U16N(enc) => enc.row_ids_filter(value.as_u16(), op, dst),
Self::U64U8N(enc) => enc.row_ids_filter(value.as_u8(), op, dst),
}
}
@ -349,8 +576,41 @@ impl IntegerEncoding {
c.row_ids_filter_range((low.1.as_u8(), low.0), (high.1.as_u8(), high.0), dst)
}
Self::I64I64N(_) => todo!(),
Self::U64U64N(_) => todo!(),
Self::I64I64N(enc) => {
enc.row_ids_filter_range((low.1.as_i64(), low.0), (high.1.as_i64(), high.0), dst)
}
Self::I64I32N(enc) => {
enc.row_ids_filter_range((low.1.as_i32(), low.0), (high.1.as_i32(), high.0), dst)
}
Self::I64U32N(enc) => {
enc.row_ids_filter_range((low.1.as_u32(), low.0), (high.1.as_u32(), high.0), dst)
}
Self::I64I16N(enc) => {
enc.row_ids_filter_range((low.1.as_i16(), low.0), (high.1.as_i16(), high.0), dst)
}
Self::I64U16N(enc) => {
enc.row_ids_filter_range((low.1.as_u16(), low.0), (high.1.as_u16(), high.0), dst)
}
Self::I64I8N(enc) => {
enc.row_ids_filter_range((low.1.as_i8(), low.0), (high.1.as_i8(), high.0), dst)
}
Self::I64U8N(enc) => {
enc.row_ids_filter_range((low.1.as_u8(), low.0), (high.1.as_u8(), high.0), dst)
}
// unsigned 64-bit nullable variants - logical type is u64 for all these.
Self::U64U64N(enc) => {
enc.row_ids_filter_range((low.1.as_u64(), low.0), (high.1.as_u64(), high.0), dst)
}
Self::U64U32N(enc) => {
enc.row_ids_filter_range((low.1.as_u32(), low.0), (high.1.as_u32(), high.0), dst)
}
Self::U64U16N(enc) => {
enc.row_ids_filter_range((low.1.as_u16(), low.0), (high.1.as_u16(), high.0), dst)
}
Self::U64U8N(enc) => {
enc.row_ids_filter_range((low.1.as_u8(), low.0), (high.1.as_u8(), high.0), dst)
}
}
}
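All of the nullable range arms above share one shape: each bound arrives as an (operator, value) pair, the scalar is narrowed to the encoding's physical type via the matching `as_*` conversion, and NULL entries never match. A minimal, self-contained sketch of that behaviour, using hypothetical stand-in types rather than the crate's operator and scalar machinery:

// Hypothetical stand-in for the comparison operators carried by the bounds.
#[derive(Clone, Copy)]
enum Op {
    Gt,
    Gte,
    Lt,
    Lte,
}

fn cmp(v: i64, op: Op, bound: i64) -> bool {
    match op {
        Op::Gt => v > bound,
        Op::Gte => v >= bound,
        Op::Lt => v < bound,
        Op::Lte => v <= bound,
    }
}

// Collect the ordinal row ids whose value is non-NULL and satisfies both
// bounds; e.g. low = (10, Op::Gte), high = (20, Op::Lt) selects [10, 20).
fn row_ids_filter_range(values: &[Option<i64>], low: (i64, Op), high: (i64, Op)) -> Vec<u32> {
    values
        .iter()
        .enumerate()
        .filter_map(|(i, v)| match v {
            Some(v) if cmp(*v, low.1, low.0) && cmp(*v, high.1, high.0) => Some(i as u32),
            _ => None,
        })
        .collect()
}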
@ -367,11 +627,49 @@ impl IntegerEncoding {
Self::U64U32(c) => Value::Scalar(Scalar::U64(c.min(row_ids))),
Self::U64U16(c) => Value::Scalar(Scalar::U64(c.min(row_ids))),
Self::U64U8(c) => Value::Scalar(Scalar::U64(c.min(row_ids))),
Self::I64I64N(c) => match c.min(row_ids) {
Self::I64I64N(enc) => match enc.min(row_ids) {
Some(v) => Value::Scalar(Scalar::I64(v)),
None => Value::Null,
},
Self::U64U64N(c) => match c.min(row_ids) {
Self::I64I32N(enc) => match enc.min(row_ids) {
Some(v) => Value::Scalar(Scalar::I64(v)),
None => Value::Null,
},
Self::I64U32N(enc) => match enc.min(row_ids) {
Some(v) => Value::Scalar(Scalar::I64(v)),
None => Value::Null,
},
Self::I64I16N(enc) => match enc.min(row_ids) {
Some(v) => Value::Scalar(Scalar::I64(v)),
None => Value::Null,
},
Self::I64U16N(enc) => match enc.min(row_ids) {
Some(v) => Value::Scalar(Scalar::I64(v)),
None => Value::Null,
},
Self::I64I8N(enc) => match enc.min(row_ids) {
Some(v) => Value::Scalar(Scalar::I64(v)),
None => Value::Null,
},
Self::I64U8N(enc) => match enc.min(row_ids) {
Some(v) => Value::Scalar(Scalar::I64(v)),
None => Value::Null,
},
Self::U64U64N(enc) => match enc.min(row_ids) {
Some(v) => Value::Scalar(Scalar::U64(v)),
None => Value::Null,
},
Self::U64U32N(enc) => match enc.min(row_ids) {
Some(v) => Value::Scalar(Scalar::U64(v)),
None => Value::Null,
},
Self::U64U16N(enc) => match enc.min(row_ids) {
Some(v) => Value::Scalar(Scalar::U64(v)),
None => Value::Null,
},
Self::U64U8N(enc) => match enc.min(row_ids) {
Some(v) => Value::Scalar(Scalar::U64(v)),
None => Value::Null,
},
@ -391,11 +689,48 @@ impl IntegerEncoding {
Self::U64U32(c) => Value::Scalar(Scalar::U64(c.max(row_ids))),
Self::U64U16(c) => Value::Scalar(Scalar::U64(c.max(row_ids))),
Self::U64U8(c) => Value::Scalar(Scalar::U64(c.max(row_ids))),
Self::I64I64N(c) => match c.max(row_ids) {
Self::I64I64N(enc) => match enc.max(row_ids) {
Some(v) => Value::Scalar(Scalar::I64(v)),
None => Value::Null,
},
Self::U64U64N(c) => match c.max(row_ids) {
Self::I64I32N(enc) => match enc.max(row_ids) {
Some(v) => Value::Scalar(Scalar::I64(v)),
None => Value::Null,
},
Self::I64U32N(enc) => match enc.max(row_ids) {
Some(v) => Value::Scalar(Scalar::I64(v)),
None => Value::Null,
},
Self::I64I16N(enc) => match enc.max(row_ids) {
Some(v) => Value::Scalar(Scalar::I64(v)),
None => Value::Null,
},
Self::I64U16N(enc) => match enc.max(row_ids) {
Some(v) => Value::Scalar(Scalar::I64(v)),
None => Value::Null,
},
Self::I64I8N(enc) => match enc.max(row_ids) {
Some(v) => Value::Scalar(Scalar::I64(v)),
None => Value::Null,
},
Self::I64U8N(enc) => match enc.max(row_ids) {
Some(v) => Value::Scalar(Scalar::I64(v)),
None => Value::Null,
},
Self::U64U64N(enc) => match enc.max(row_ids) {
Some(v) => Value::Scalar(Scalar::U64(v)),
None => Value::Null,
},
Self::U64U32N(enc) => match enc.max(row_ids) {
Some(v) => Value::Scalar(Scalar::U64(v)),
None => Value::Null,
},
Self::U64U16N(enc) => match enc.max(row_ids) {
Some(v) => Value::Scalar(Scalar::U64(v)),
None => Value::Null,
},
Self::U64U8N(enc) => match enc.max(row_ids) {
Some(v) => Value::Scalar(Scalar::U64(v)),
None => Value::Null,
},
@ -415,11 +750,48 @@ impl IntegerEncoding {
Self::U64U32(c) => Scalar::U64(c.sum(row_ids)),
Self::U64U16(c) => Scalar::U64(c.sum(row_ids)),
Self::U64U8(c) => Scalar::U64(c.sum(row_ids)),
Self::I64I64N(c) => match c.sum(row_ids) {
Self::I64I64N(enc) => match enc.sum(row_ids) {
Some(v) => Scalar::I64(v),
None => Scalar::Null,
},
Self::U64U64N(c) => match c.sum(row_ids) {
Self::I64I32N(enc) => match enc.sum(row_ids) {
Some(v) => Scalar::I64(v),
None => Scalar::Null,
},
Self::I64U32N(enc) => match enc.sum(row_ids) {
Some(v) => Scalar::I64(v),
None => Scalar::Null,
},
Self::I64I16N(enc) => match enc.sum(row_ids) {
Some(v) => Scalar::I64(v),
None => Scalar::Null,
},
Self::I64U16N(enc) => match enc.sum(row_ids) {
Some(v) => Scalar::I64(v),
None => Scalar::Null,
},
Self::I64I8N(enc) => match enc.sum(row_ids) {
Some(v) => Scalar::I64(v),
None => Scalar::Null,
},
Self::I64U8N(enc) => match enc.sum(row_ids) {
Some(v) => Scalar::I64(v),
None => Scalar::Null,
},
Self::U64U64N(enc) => match enc.sum(row_ids) {
Some(v) => Scalar::U64(v),
None => Scalar::Null,
},
Self::U64U32N(enc) => match enc.sum(row_ids) {
Some(v) => Scalar::U64(v),
None => Scalar::Null,
},
Self::U64U16N(enc) => match enc.sum(row_ids) {
Some(v) => Scalar::U64(v),
None => Scalar::Null,
},
Self::U64U8N(enc) => match enc.sum(row_ids) {
Some(v) => Scalar::U64(v),
None => Scalar::Null,
},
@ -439,8 +811,17 @@ impl IntegerEncoding {
Self::U64U32(c) => c.count(row_ids),
Self::U64U16(c) => c.count(row_ids),
Self::U64U8(c) => c.count(row_ids),
Self::I64I64N(c) => c.count(row_ids),
Self::U64U64N(c) => c.count(row_ids),
Self::I64I64N(enc) => enc.count(row_ids),
Self::I64I32N(enc) => enc.count(row_ids),
Self::I64U32N(enc) => enc.count(row_ids),
Self::I64I16N(enc) => enc.count(row_ids),
Self::I64U16N(enc) => enc.count(row_ids),
Self::I64I8N(enc) => enc.count(row_ids),
Self::I64U8N(enc) => enc.count(row_ids),
Self::U64U64N(enc) => enc.count(row_ids),
Self::U64U32N(enc) => enc.count(row_ids),
Self::U64U16N(enc) => enc.count(row_ids),
Self::U64U8N(enc) => enc.count(row_ids),
}
}
@ -459,7 +840,16 @@ impl IntegerEncoding {
Self::U64U16(_) => "BT_U16",
Self::U64U8(_) => "BT_U8",
Self::I64I64N(_) => "None",
Self::I64I32N(_) => "BT_I32N",
Self::I64U32N(_) => "BT_U32N",
Self::I64I16N(_) => "BT_U16N",
Self::I64U16N(_) => "BT_U16N",
Self::I64I8N(_) => "BT_I8N",
Self::I64U8N(_) => "BT_U8N",
Self::U64U64N(_) => "None",
Self::U64U32N(_) => "BT_U32N",
Self::U64U16N(_) => "BT_U16N",
Self::U64U8N(_) => "BT_U8N",
}
}
@ -478,7 +868,16 @@ impl IntegerEncoding {
Self::U64U16(_) => "u64",
Self::U64U8(_) => "u64",
Self::I64I64N(_) => "i64",
Self::I64I32N(_) => "i64",
Self::I64U32N(_) => "i64",
Self::I64I16N(_) => "i64",
Self::I64U16N(_) => "i64",
Self::I64I8N(_) => "i64",
Self::I64U8N(_) => "i64",
Self::U64U64N(_) => "u64",
Self::U64U32N(_) => "u64",
Self::U64U16N(_) => "u64",
Self::U64U8N(_) => "u64",
}
}
}
@ -499,7 +898,46 @@ impl std::fmt::Display for IntegerEncoding {
Self::U64U16(enc) => write!(f, "[{}]: {}", name, enc),
Self::U64U8(enc) => write!(f, "[{}]: {}", name, enc),
Self::I64I64N(enc) => write!(f, "[{}]: {}", name, enc),
Self::I64I32N(enc) => write!(f, "[{}]: {}", name, enc),
Self::I64U32N(enc) => write!(f, "[{}]: {}", name, enc),
Self::I64I16N(enc) => write!(f, "[{}]: {}", name, enc),
Self::I64U16N(enc) => write!(f, "[{}]: {}", name, enc),
Self::I64I8N(enc) => write!(f, "[{}]: {}", name, enc),
Self::I64U8N(enc) => write!(f, "[{}]: {}", name, enc),
Self::U64U64N(enc) => write!(f, "[{}]: {}", name, enc),
Self::U64U32N(enc) => write!(f, "[{}]: {}", name, enc),
Self::U64U16N(enc) => write!(f, "[{}]: {}", name, enc),
Self::U64U8N(enc) => write!(f, "[{}]: {}", name, enc),
}
}
}
impl std::fmt::Debug for IntegerEncoding {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let name = self.name();
match self {
Self::I64I64(enc) => enc.fmt(f),
Self::I64I32(enc) => write!(f, "[{}]: {:?}", name, enc),
Self::I64U32(enc) => write!(f, "[{}]: {:?}", name, enc),
Self::I64I16(enc) => write!(f, "[{}]: {:?}", name, enc),
Self::I64U16(enc) => write!(f, "[{}]: {:?}", name, enc),
Self::I64I8(enc) => write!(f, "[{}]: {:?}", name, enc),
Self::I64U8(enc) => write!(f, "[{}]: {:?}", name, enc),
Self::U64U64(enc) => write!(f, "[{}]: {:?}", name, enc),
Self::U64U32(enc) => write!(f, "[{}]: {:?}", name, enc),
Self::U64U16(enc) => write!(f, "[{}]: {:?}", name, enc),
Self::U64U8(enc) => write!(f, "[{}]: {:?}", name, enc),
Self::I64I64N(enc) => write!(f, "[{}]: {:?}", name, enc),
Self::I64I32N(enc) => write!(f, "[{}]: {:?}", name, enc),
Self::I64U32N(enc) => write!(f, "[{}]: {:?}", name, enc),
Self::I64I16N(enc) => write!(f, "[{}]: {:?}", name, enc),
Self::I64U16N(enc) => write!(f, "[{}]: {:?}", name, enc),
Self::I64I8N(enc) => write!(f, "[{}]: {:?}", name, enc),
Self::I64U8N(enc) => write!(f, "[{}]: {:?}", name, enc),
Self::U64U64N(enc) => write!(f, "[{}]: {:?}", name, enc),
Self::U64U32N(enc) => write!(f, "[{}]: {:?}", name, enc),
Self::U64U16N(enc) => write!(f, "[{}]: {:?}", name, enc),
Self::U64U8N(enc) => write!(f, "[{}]: {:?}", name, enc),
}
}
}
@ -553,17 +991,48 @@ impl From<&[i64]> for IntegerEncoding {
/// Converts an Arrow array into an IntegerEncoding.
///
/// TODO(edd): convert underlying type of Arrow data to smallest physical
/// representation.
/// The most compact physical Arrow array type is used to store the column
/// within a `FixedNull` encoding.
impl From<arrow::array::Int64Array> for IntegerEncoding {
fn from(arr: arrow::array::Int64Array) -> Self {
if arr.null_count() == 0 {
return Self::from(arr.values());
}
// TODO(edd): currently fixed null only supports 64-bit logical/physical
// types. Need to add support for storing as smaller physical types.
Self::I64I64N(FixedNull::<arrow::datatypes::Int64Type>::from(arr))
// determine min and max values.
let min = arrow::compute::kernels::aggregate::min(&arr);
let max = arrow::compute::kernels::aggregate::max(&arr);
// This match is carefully ordered. It prioritises smaller physical
// datatypes that can safely represent the provided logical data
match (min, max) {
// encode as u8 values
(min, max) if min >= Some(0) && max <= Some(u8::MAX as i64) => {
Self::I64U8N(FixedNull::<ArrowUInt8Type>::from(arr))
}
// encode as i8 values
(min, max) if min >= Some(i8::MIN as i64) && max <= Some(i8::MAX as i64) => {
Self::I64I8N(FixedNull::<ArrowInt8Type>::from(arr))
}
// encode as u16 values
(min, max) if min >= Some(0) && max <= Some(u16::MAX as i64) => {
Self::I64U16N(FixedNull::<ArrowUInt16Type>::from(arr))
}
// encode as i16 values
(min, max) if min >= Some(i16::MIN as i64) && max <= Some(i16::MAX as i64) => {
Self::I64I16N(FixedNull::<ArrowInt16Type>::from(arr))
}
// encode as u32 values
(min, max) if min >= Some(0) && max <= Some(u32::MAX as i64) => {
Self::I64U32N(FixedNull::<ArrowUInt32Type>::from(arr))
}
// encode as i32 values
(min, max) if min >= Some(i32::MIN as i64) && max <= Some(i32::MAX as i64) => {
Self::I64I32N(FixedNull::<ArrowInt32Type>::from(arr))
}
// otherwise, encode with the same physical type (i64)
(_, _) => Self::I64I64N(FixedNull::<ArrowInt64Type>::from(arr)),
}
}
}
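The guards above are tried top to bottom, so an unsigned width wins over the signed width of the same size, and an all-NULL array (where both aggregates return `None`) falls through every guard to the plain i64 arm, because `None` compares less than any `Some`. A self-contained sketch of just the selection rule, returning a label instead of constructing an encoding (the `From<arrow::array::UInt64Array>` conversion below applies the same idea using only the maximum):

// Sketch of the width-selection rule used above: choose the narrowest
// physical type that can safely hold [min, max], preferring unsigned at
// each width. `None` aggregates (an all-NULL array) fail every guard and
// fall through to the 64-bit arm.
fn select_physical_width(min: Option<i64>, max: Option<i64>) -> &'static str {
    match (min, max) {
        (min, max) if min >= Some(0) && max <= Some(u8::MAX as i64) => "u8",
        (min, max) if min >= Some(i8::MIN as i64) && max <= Some(i8::MAX as i64) => "i8",
        (min, max) if min >= Some(0) && max <= Some(u16::MAX as i64) => "u16",
        (min, max) if min >= Some(i16::MIN as i64) && max <= Some(i16::MAX as i64) => "i16",
        (min, max) if min >= Some(0) && max <= Some(u32::MAX as i64) => "u32",
        (min, max) if min >= Some(i32::MIN as i64) && max <= Some(i32::MAX as i64) => "i32",
        (_, _) => "i64",
    }
}

// select_physical_width(Some(0), Some(200))  == "u8"
// select_physical_width(Some(-1), Some(200)) == "i16"
// select_physical_width(None, None)          == "i64" (all-NULL input)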
@ -600,23 +1069,42 @@ impl From<&[u64]> for IntegerEncoding {
/// Converts an Arrow array into an IntegerEncoding.
///
/// TODO(edd): convert underlying type of Arrow data to smallest physical
/// representation.
/// The most compact physical Arrow array type is used to store the column
/// within a `FixedNull` encoding.
impl From<arrow::array::UInt64Array> for IntegerEncoding {
fn from(arr: arrow::array::UInt64Array) -> Self {
if arr.null_count() == 0 {
return Self::from(arr.values());
}
// TODO(edd): currently fixed null only supports 64-bit logical/physical
// types. Need to add support for storing as smaller physical types.
Self::U64U64N(FixedNull::<arrow::datatypes::UInt64Type>::from(arr))
// determine max value.
let max = arrow::compute::kernels::aggregate::max(&arr);
// This match is carefully ordered. It prioritises smaller physical
// datatypes that can safely represent the provided logical data
match max {
// encode as u8 values
max if max <= Some(u8::MAX as u64) => {
Self::U64U8N(FixedNull::<ArrowUInt8Type>::from(arr))
}
// encode as u16 values
max if max <= Some(u16::MAX as u64) => {
Self::U64U16N(FixedNull::<ArrowUInt16Type>::from(arr))
}
// encode as u32 values
max if max <= Some(u32::MAX as u64) => {
Self::U64U32N(FixedNull::<ArrowUInt32Type>::from(arr))
}
// otherwise, encode with the same physical type (u64)
_ => Self::U64U64N(FixedNull::<ArrowUInt64Type>::from(arr)),
}
}
}
#[cfg(test)]
mod test {
use arrow::datatypes::Int64Type;
use arrow::array::{Int64Array, UInt64Array};
use std::iter;
use super::*;
@ -628,7 +1116,7 @@ mod test {
vec![399_i64, 2, 2452, 3],
vec![-399_i64, 2, 2452, 3],
vec![u32::MAX as i64, 2, 245, 3],
vec![i32::MAX as i64, 2, 245, 3],
vec![i32::MIN as i64, 2, 245, 3],
vec![0_i64, 2, 245, u32::MAX as i64 + 1],
];
@ -642,9 +1130,156 @@ mod test {
IntegerEncoding::I64I64(Fixed::<i64>::from(cases[6].as_slice())),
];
for (_case, _exp) in cases.iter().zip(exp.iter()) {
// TODO - add debug
//assert_eq!(IntegerEncoding::from(&case), exp);
for (case, exp) in cases.into_iter().zip(exp.into_iter()) {
assert_eq!(IntegerEncoding::from(case.as_slice()), exp);
}
}
#[test]
fn from_slice_u64() {
let cases = vec![
vec![0_u64, 2, 245, 3],
vec![399_u64, 2, 2452, 3],
vec![u32::MAX as u64, 2, 245, 3],
vec![0_u64, 2, 245, u32::MAX as u64 + 1],
];
let exp = vec![
IntegerEncoding::U64U8(Fixed::<u8>::from(cases[0].as_slice())),
IntegerEncoding::U64U16(Fixed::<u16>::from(cases[1].as_slice())),
IntegerEncoding::U64U32(Fixed::<u32>::from(cases[2].as_slice())),
IntegerEncoding::U64U64(Fixed::<u64>::from(cases[3].as_slice())),
];
for (case, exp) in cases.into_iter().zip(exp.into_iter()) {
assert_eq!(IntegerEncoding::from(case.as_slice()), exp);
}
}
#[test]
fn from_arrow_i64_array() {
let cases = vec![
vec![0_i64, 2, 245, 3],
vec![0_i64, -120, 127, 3],
vec![399_i64, 2, 2452, 3],
vec![-399_i64, 2, 2452, 3],
vec![u32::MAX as i64, 2, 245, 3],
vec![i32::MIN as i64, 2, 245, 3],
vec![0_i64, 2, 245, u32::MAX as i64 + 1],
];
let exp = vec![
IntegerEncoding::I64U8(Fixed::<u8>::from(cases[0].as_slice())),
IntegerEncoding::I64I8(Fixed::<i8>::from(cases[1].as_slice())),
IntegerEncoding::I64U16(Fixed::<u16>::from(cases[2].as_slice())),
IntegerEncoding::I64I16(Fixed::<i16>::from(cases[3].as_slice())),
IntegerEncoding::I64U32(Fixed::<u32>::from(cases[4].as_slice())),
IntegerEncoding::I64I32(Fixed::<i32>::from(cases[5].as_slice())),
IntegerEncoding::I64I64(Fixed::<i64>::from(cases[6].as_slice())),
];
// for Arrow arrays with no nulls we can store the column using a
// non-nullable fixed encoding
for (case, exp) in cases.iter().cloned().zip(exp.into_iter()) {
let arr = Int64Array::from(case);
assert_eq!(IntegerEncoding::from(arr), exp);
}
// Tack a NULL onto each of the input cases.
let cases = cases
.iter()
.map(|case| {
case.iter()
.map(|x| Some(*x))
.chain(iter::repeat(None).take(1))
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
// when a NULL value is present we need to use a nullable encoding.
let exp = vec![
IntegerEncoding::I64U8N(FixedNull::<ArrowUInt8Type>::from(Int64Array::from(
cases[0].clone(),
))),
IntegerEncoding::I64I8N(FixedNull::<ArrowInt8Type>::from(Int64Array::from(
cases[1].clone(),
))),
IntegerEncoding::I64U16N(FixedNull::<ArrowUInt16Type>::from(Int64Array::from(
cases[2].clone(),
))),
IntegerEncoding::I64I16N(FixedNull::<ArrowInt16Type>::from(Int64Array::from(
cases[3].clone(),
))),
IntegerEncoding::I64U32N(FixedNull::<ArrowUInt32Type>::from(Int64Array::from(
cases[4].clone(),
))),
IntegerEncoding::I64I32N(FixedNull::<ArrowInt32Type>::from(Int64Array::from(
cases[5].clone(),
))),
IntegerEncoding::I64I64N(FixedNull::<ArrowInt64Type>::from(Int64Array::from(
cases[6].clone(),
))),
];
for (case, exp) in cases.into_iter().zip(exp.into_iter()) {
let arr = Int64Array::from(case.clone());
assert_eq!(IntegerEncoding::from(arr), exp);
}
}
#[test]
fn from_arrow_u64_array() {
let cases = vec![
vec![0_u64, 2, 245, 3],
vec![399_u64, 2, 2452, 3],
vec![u32::MAX as u64, 2, 245, 3],
vec![0_u64, 2, 245, u32::MAX as u64 + 1],
];
let exp = vec![
IntegerEncoding::U64U8(Fixed::<u8>::from(cases[0].as_slice())),
IntegerEncoding::U64U16(Fixed::<u16>::from(cases[1].as_slice())),
IntegerEncoding::U64U32(Fixed::<u32>::from(cases[2].as_slice())),
IntegerEncoding::U64U64(Fixed::<u64>::from(cases[3].as_slice())),
];
// for Arrow arrays with no nulls we can store the column using a
// non-nullable fixed encoding
for (case, exp) in cases.iter().cloned().zip(exp.into_iter()) {
let arr = UInt64Array::from(case);
assert_eq!(IntegerEncoding::from(arr), exp);
}
// Tack a NULL onto each of the input cases.
let cases = cases
.iter()
.map(|case| {
case.iter()
.map(|x| Some(*x))
.chain(iter::repeat(None).take(1))
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
// when a NULL value is present we need to use a nullable encoding.
let exp = vec![
IntegerEncoding::U64U8N(FixedNull::<ArrowUInt8Type>::from(UInt64Array::from(
cases[0].clone(),
))),
IntegerEncoding::U64U16N(FixedNull::<ArrowUInt16Type>::from(UInt64Array::from(
cases[1].clone(),
))),
IntegerEncoding::U64U32N(FixedNull::<ArrowUInt32Type>::from(UInt64Array::from(
cases[2].clone(),
))),
IntegerEncoding::U64U64N(FixedNull::<ArrowUInt64Type>::from(UInt64Array::from(
cases[3].clone(),
))),
];
for (case, exp) in cases.into_iter().zip(exp.into_iter()) {
let arr = UInt64Array::from(case.clone());
assert_eq!(IntegerEncoding::from(arr), exp);
}
}
@ -660,12 +1295,7 @@ mod test {
assert_eq!(enc.size_raw(true), 56);
assert_eq!(enc.size_raw(false), 56);
let enc = IntegerEncoding::I64I64N(FixedNull::<Int64Type>::from(&[2, 22, 12, 31][..]));
// (4 * 8) + 24
assert_eq!(enc.size_raw(true), 56);
assert_eq!(enc.size_raw(false), 56);
let enc = IntegerEncoding::I64I64N(FixedNull::<Int64Type>::from(
let enc = IntegerEncoding::I64I64N(FixedNull::<ArrowInt64Type>::from(
&[Some(2), Some(22), Some(12), None, None, Some(31)][..],
));
// (6 * 8) + 24


@ -11,6 +11,7 @@ arrow_flight = { path = "../arrow_flight" }
async-trait = "0.1"
bytes = { version = "1.0" }
chrono = "0.4"
cached = "0.23.0"
crc32fast = "1.2.0"
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }


@ -6,10 +6,14 @@ use std::{
use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName};
use metrics::MetricRegistry;
use object_store::{path::ObjectStorePath, ObjectStore};
use parquet_file::catalog::PreservedCatalog;
use query::exec::Executor;
/// This module contains code for managing the configuration of the server.
use crate::{db::Db, Error, JobRegistry, Result};
use crate::{
db::{catalog::Catalog, Db},
Error, JobRegistry, Result,
};
use observability_deps::tracing::{self, error, info, warn, Instrument};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
@ -125,6 +129,7 @@ impl Config {
server_id: ServerId,
object_store: Arc<ObjectStore>,
exec: Arc<Executor>,
preserved_catalog: PreservedCatalog<Catalog>,
) {
let mut state = self.state.write().expect("mutex poisoned");
let name = state
@ -148,7 +153,7 @@ impl Config {
exec,
write_buffer,
Arc::clone(&self.jobs),
Arc::clone(&self.metric_registry),
preserved_catalog,
));
let shutdown = self.shutdown.child_token();
@ -203,6 +208,10 @@ impl Config {
info!("database background workers shutdown");
}
pub fn metrics_registry(&self) -> Arc<MetricRegistry> {
Arc::clone(&self.metric_registry)
}
}
pub fn object_store_path_for_database_config<P: ObjectStorePath>(
@ -274,9 +283,15 @@ impl<'a> CreateDatabaseHandle<'a> {
server_id: ServerId,
object_store: Arc<ObjectStore>,
exec: Arc<Executor>,
preserved_catalog: PreservedCatalog<Catalog>,
) {
self.config
.commit(self.rules.take().unwrap(), server_id, object_store, exec)
self.config.commit(
self.rules.take().unwrap(),
server_id,
object_store,
exec,
preserved_catalog,
)
}
pub(crate) fn rules(&self) -> &DatabaseRules {
@ -298,6 +313,8 @@ mod test {
use object_store::{memory::InMemory, ObjectStore, ObjectStoreApi};
use crate::db::load_preserved_catalog;
use super::*;
#[tokio::test]
@ -317,7 +334,15 @@ mod test {
let server_id = ServerId::try_from(1).unwrap();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let exec = Arc::new(Executor::new(1));
db_reservation.commit(server_id, store, exec);
let preserved_catalog = load_preserved_catalog(
&name,
Arc::clone(&store),
server_id,
config.metrics_registry(),
)
.await
.unwrap();
db_reservation.commit(server_id, store, exec, preserved_catalog);
assert!(config.db(&name).is_some());
assert_eq!(config.db_names_sorted(), vec![name.clone()]);
@ -345,7 +370,15 @@ mod test {
let server_id = ServerId::try_from(1).unwrap();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let exec = Arc::new(Executor::new(1));
db_reservation.commit(server_id, store, exec);
let preserved_catalog = load_preserved_catalog(
&name,
Arc::clone(&store),
server_id,
config.metrics_registry(),
)
.await
.unwrap();
db_reservation.commit(server_id, store, exec, preserved_catalog);
let token = config
.state


@ -310,6 +310,33 @@ pub struct Db {
metric_labels: Vec<KeyValue>,
}
/// Load preserved catalog state from store.
pub async fn load_preserved_catalog(
db_name: &str,
object_store: Arc<ObjectStore>,
server_id: ServerId,
metrics_registry: Arc<MetricRegistry>,
) -> std::result::Result<PreservedCatalog<Catalog>, parquet_file::catalog::Error> {
let metric_labels = vec![
KeyValue::new("db_name", db_name.to_string()),
KeyValue::new("svr_id", format!("{}", server_id)),
];
let metrics_domain =
metrics_registry.register_domain_with_labels("catalog", metric_labels.clone());
PreservedCatalog::new_empty(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
CatalogEmptyInput {
domain: metrics_domain,
metrics_registry: Arc::clone(&metrics_registry),
metric_labels: metric_labels.clone(),
},
)
.await
}
impl Db {
pub fn new(
rules: DatabaseRules,
@ -318,33 +345,19 @@ impl Db {
exec: Arc<Executor>,
write_buffer: Option<Buffer>,
jobs: Arc<JobRegistry>,
metrics_registry: Arc<MetricRegistry>,
preserved_catalog: PreservedCatalog<Catalog>,
) -> Self {
let db_name = rules.name.clone();
let metric_labels = vec![
KeyValue::new("db_name", db_name.to_string()),
KeyValue::new("svr_id", format!("{}", server_id)),
];
let metrics_domain =
metrics_registry.register_domain_with_labels("catalog", metric_labels.clone());
let rules = RwLock::new(rules);
let server_id = server_id;
let store = Arc::clone(&object_store);
let write_buffer = write_buffer.map(Mutex::new);
let catalog = PreservedCatalog::new_empty(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
CatalogEmptyInput {
domain: metrics_domain,
metrics_registry: Arc::clone(&metrics_registry),
metric_labels: metric_labels.clone(),
},
);
let system_tables = SystemSchemaProvider::new(&db_name, catalog.state(), Arc::clone(&jobs));
let system_tables =
SystemSchemaProvider::new(&db_name, preserved_catalog.state(), Arc::clone(&jobs));
let system_tables = Arc::new(system_tables);
let metrics_registry = Arc::clone(&preserved_catalog.state().metrics_registry);
let metric_labels = preserved_catalog.state().metric_labels.clone();
let process_clock = process_clock::ProcessClock::new();
@ -353,7 +366,7 @@ impl Db {
server_id,
store,
exec,
catalog,
catalog: preserved_catalog,
write_buffer,
jobs,
metrics_registry,
@ -1205,10 +1218,10 @@ mod tests {
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;
#[test]
fn write_no_mutable_buffer() {
#[tokio::test]
async fn write_no_mutable_buffer() {
// Validate that writes are rejected if there is no mutable buffer
let db = make_db().db;
let db = make_db().await.db;
db.rules.write().lifecycle_rules.immutable = true;
let entry = lp_to_entry("cpu bar=1 10");
let res = db.store_entry(entry);
@ -1220,7 +1233,7 @@ mod tests {
#[tokio::test]
async fn read_write() {
let db = Arc::new(make_db().db);
let db = Arc::new(make_db().await.db);
write_lp(&db, "cpu bar=1 10");
let batches = run_query(db, "select * from cpu").await;
@ -1252,7 +1265,7 @@ mod tests {
#[tokio::test]
async fn metrics_during_rollover() {
let test_db = make_db();
let test_db = make_db().await;
let db = Arc::new(test_db.db);
write_lp(db.as_ref(), "cpu bar=1 10");
@ -1372,7 +1385,7 @@ mod tests {
#[tokio::test]
async fn write_with_rollover() {
let db = Arc::new(make_db().db);
let db = Arc::new(make_db().await.db);
write_lp(db.as_ref(), "cpu bar=1 10");
assert_eq!(vec!["1970-01-01T00"], db.partition_keys().unwrap());
@ -1420,7 +1433,7 @@ mod tests {
#[tokio::test]
async fn write_with_missing_tags_are_null() {
let db = Arc::new(make_db().db);
let db = Arc::new(make_db().await.db);
// Note the `region` tag is introduced in the second line, so
// the values in prior rows for the region column are
// null. Likewise the `core` tag is introduced in the third
@ -1457,7 +1470,7 @@ mod tests {
#[tokio::test]
async fn read_from_read_buffer() {
// Test that data can be loaded into the ReadBuffer
let test_db = make_db();
let test_db = make_db().await;
let db = Arc::new(test_db.db);
write_lp(db.as_ref(), "cpu bar=1 10");
@ -1543,7 +1556,7 @@ mod tests {
#[tokio::test]
async fn load_to_read_buffer_sorted() {
let test_db = make_db();
let test_db = make_db().await;
let db = Arc::new(test_db.db);
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10");
@ -1655,7 +1668,8 @@ mod tests {
.server_id(server_id)
.object_store(Arc::clone(&object_store))
.db_name(db_name)
.build();
.build()
.await;
let db = Arc::new(test_db.db);
@ -1754,7 +1768,8 @@ mod tests {
.server_id(server_id)
.object_store(Arc::clone(&object_store))
.db_name(db_name)
.build();
.build()
.await;
let db = Arc::new(test_db.db);
@ -1866,9 +1881,9 @@ mod tests {
assert_batches_eq!(expected, &record_batches);
}
#[test]
fn write_updates_last_write_at() {
let db = Arc::new(make_db().db);
#[tokio::test]
async fn write_updates_last_write_at() {
let db = Arc::new(make_db().await.db);
let before_create = Utc::now();
let partition_key = "1970-01-01T00";
@ -1896,7 +1911,7 @@ mod tests {
#[tokio::test]
async fn test_chunk_timestamps() {
let start = Utc::now();
let db = Arc::new(make_db().db);
let db = Arc::new(make_db().await.db);
// Given data loaded into two chunks
write_lp(&db, "cpu bar=1 10");
@ -1931,9 +1946,9 @@ mod tests {
assert!(chunk.time_closed().unwrap() < after_rollover);
}
#[test]
fn test_chunk_closing() {
let db = Arc::new(make_db().db);
#[tokio::test]
async fn test_chunk_closing() {
let db = Arc::new(make_db().await.db);
db.rules.write().lifecycle_rules.mutable_size_threshold =
Some(NonZeroUsize::new(2).unwrap());
@ -1952,9 +1967,9 @@ mod tests {
assert!(matches!(chunks[1].read().state(), ChunkState::Closed(_)));
}
#[test]
fn chunks_sorted_by_times() {
let db = Arc::new(make_db().db);
#[tokio::test]
async fn chunks_sorted_by_times() {
let db = Arc::new(make_db().await.db);
write_lp(&db, "cpu val=1 1");
write_lp(&db, "mem val=2 400000000000001");
write_lp(&db, "cpu val=1 2");
@ -1987,7 +2002,7 @@ mod tests {
#[tokio::test]
async fn chunk_id_listing() {
// Test that chunk id listing is hooked up
let db = Arc::new(make_db().db);
let db = Arc::new(make_db().await.db);
let partition_key = "1970-01-01T00";
write_lp(&db, "cpu bar=1 10");
@ -2056,7 +2071,7 @@ mod tests {
#[tokio::test]
async fn partition_chunk_summaries() {
// Test that chunk id listing is hooked up
let db = Arc::new(make_db().db);
let db = Arc::new(make_db().await.db);
write_lp(&db, "cpu bar=1 1");
db.rollover_partition("1970-01-01T00", "cpu").await.unwrap();
@ -2104,7 +2119,7 @@ mod tests {
#[tokio::test]
async fn partition_chunk_summaries_timestamp() {
let db = Arc::new(make_db().db);
let db = Arc::new(make_db().await.db);
let start = Utc::now();
write_lp(&db, "cpu bar=1 1");
let after_first_write = Utc::now();
@ -2155,7 +2170,7 @@ mod tests {
#[tokio::test]
async fn chunk_summaries() {
// Test that chunk id listing is hooked up
let db = Arc::new(make_db().db);
let db = Arc::new(make_db().await.db);
// get three chunks: one open, one closed in mb and one close in rb
write_lp(&db, "cpu bar=1 1");
@ -2250,7 +2265,7 @@ mod tests {
#[tokio::test]
async fn partition_summaries() {
// Test that chunk id listing is hooked up
let db = Arc::new(make_db().db);
let db = Arc::new(make_db().await.db);
write_lp(&db, "cpu bar=1 1");
let chunk_id = db
@ -2460,7 +2475,7 @@ mod tests {
#[tokio::test]
async fn write_chunk_to_object_store_in_background() {
// Test that data can be written to object store using a background task
let db = Arc::new(make_db().db);
let db = Arc::new(make_db().await.db);
// create MB partition
write_lp(db.as_ref(), "cpu bar=1 10");
@ -2501,9 +2516,9 @@ mod tests {
assert_eq!(read_parquet_file_chunk_ids(&db, partition_key), vec![0]);
}
#[test]
fn write_hard_limit() {
let db = Arc::new(make_db().db);
#[tokio::test]
async fn write_hard_limit() {
let db = Arc::new(make_db().await.db);
db.rules.write().lifecycle_rules.buffer_size_hard = Some(NonZeroUsize::new(10).unwrap());
// inserting first line does not trigger hard buffer limit
@ -2516,9 +2531,9 @@ mod tests {
));
}
#[test]
fn write_goes_to_write_buffer_if_configured() {
let db = Arc::new(TestDb::builder().write_buffer(true).build().db);
#[tokio::test]
async fn write_goes_to_write_buffer_if_configured() {
let db = Arc::new(TestDb::builder().write_buffer(true).build().await.db);
assert_eq!(db.write_buffer.as_ref().unwrap().lock().size(), 0);
write_lp(db.as_ref(), "cpu bar=1 10");
@ -2536,7 +2551,8 @@ mod tests {
.server_id(server_id)
.object_store(Arc::clone(&object_store))
.db_name(db_name)
.build();
.build()
.await;
let db = Arc::new(test_db.db);
@ -2704,9 +2720,22 @@ mod tests {
.object_store(Arc::clone(&object_store))
.server_id(server_id)
.db_name(db_name)
.build();
.build()
.await;
let db = Arc::new(test_db.db);
// at this point, an empty preserved catalog exists
let maybe_preserved_catalog =
PreservedCatalog::<parquet_file::catalog::test_helpers::TestCatalogState>::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
assert!(maybe_preserved_catalog.is_some());
// Write some line protocols in Mutable buffer of the DB
write_lp(db.as_ref(), "cpu bar=1 10");
@ -2722,18 +2751,6 @@ mod tests {
.await
.unwrap();
// at this point, no preserved catalog exists
let maybe_preserved_catalog =
PreservedCatalog::<parquet_file::catalog::test_helpers::TestCatalogState>::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
assert!(maybe_preserved_catalog.is_none());
// Write the RB chunk to Object Store but keep it in RB
db.write_chunk_to_object_store(partition_key, "cpu", mb_chunk.id())
.await


@ -75,11 +75,11 @@ mod tests {
use entry::test_helpers::lp_to_entry;
use std::{sync::Arc, thread, time::Duration};
#[test]
fn process_clock_defaults_to_current_time_in_ns() {
#[tokio::test]
async fn process_clock_defaults_to_current_time_in_ns() {
let before = system_clock_now();
let db = Arc::new(TestDb::builder().build().db);
let db = Arc::new(TestDb::builder().build().await.db);
let db_process_clock = db.process_clock.inner.load(Ordering::SeqCst);
let after = system_clock_now();
@ -98,12 +98,12 @@ mod tests {
);
}
#[test]
fn process_clock_incremented_and_set_on_sequenced_entry() {
#[tokio::test]
async fn process_clock_incremented_and_set_on_sequenced_entry() {
let before = system_clock_now();
let before = ClockValue::try_from(before).unwrap();
let db = Arc::new(TestDb::builder().write_buffer(true).build().db);
let db = Arc::new(TestDb::builder().write_buffer(true).build().await.db);
let entry = lp_to_entry("cpu bar=1 10");
db.store_entry(entry).unwrap();
@ -147,10 +147,10 @@ mod tests {
);
}
#[test]
fn next_process_clock_always_increments() {
#[tokio::test]
async fn next_process_clock_always_increments() {
// Process clock defaults to the current time
let db = Arc::new(TestDb::builder().write_buffer(true).build().db);
let db = Arc::new(TestDb::builder().write_buffer(true).build().await.db);
// Set the process clock value to a time in the future, so that when compared to the
// current time, the process clock value will be greater


@ -72,6 +72,8 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::BytesMut;
use cached::proc_macro::cached;
use db::load_preserved_catalog;
use futures::stream::TryStreamExt;
use observability_deps::tracing::{error, info, warn};
use parking_lot::Mutex;
@ -175,6 +177,8 @@ pub enum Error {
},
#[snafu(display("remote error: {}", source))]
RemoteError { source: ConnectionManagerError },
#[snafu(display("cannot load catalog: {}", source))]
CatalogLoadError { source: DatabaseError },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -418,12 +422,26 @@ impl<M: ConnectionManager> Server<M> {
pub async fn create_database(&self, rules: DatabaseRules, server_id: ServerId) -> Result<()> {
// Return an error if this server hasn't yet been setup with an id
self.require_id()?;
let db_reservation = self.config.create_db(rules)?;
let preserved_catalog = load_preserved_catalog(
rules.db_name(),
Arc::clone(&self.store),
server_id,
self.config.metrics_registry(),
)
.await
.map_err(|e| Box::new(e) as _)
.context(CatalogLoadError)?;
let db_reservation = self.config.create_db(rules)?;
self.persist_database_rules(db_reservation.rules().clone())
.await?;
db_reservation.commit(server_id, Arc::clone(&self.store), Arc::clone(&self.exec));
db_reservation.commit(
server_id,
Arc::clone(&self.store),
Arc::clone(&self.exec),
preserved_catalog,
);
Ok(())
}
@ -507,10 +525,24 @@ impl<M: ConnectionManager> Server<M> {
Err(e) => {
error!("error parsing database config {:?} from store: {}", path, e)
}
Ok(rules) => match config.create_db(rules) {
Err(e) => error!("error adding database to config: {}", e),
Ok(handle) => handle.commit(server_id, store, exec),
},
Ok(rules) => {
match load_preserved_catalog(
rules.db_name(),
Arc::clone(&store),
server_id,
config.metrics_registry(),
)
.await
{
Err(e) => error!("cannot load database: {}", e),
Ok(preserved_catalog) => match config.create_db(rules) {
Err(e) => error!("error adding database to config: {}", e),
Ok(handle) => {
handle.commit(server_id, store, exec, preserved_catalog)
}
},
}
}
};
})
})
@ -905,17 +937,25 @@ impl ConnectionManager for ConnectionManagerImpl {
&self,
connect: &str,
) -> Result<Arc<Self::RemoteServer>, ConnectionManagerError> {
// TODO(mkm): cache the connections
let connection = Builder::default()
.build(connect)
.await
.map_err(|e| Box::new(e) as _)
.context(RemoteServerConnectError)?;
let client = write::Client::new(connection);
Ok(Arc::new(RemoteServerImpl { client }))
cached_remote_server(connect.to_string()).await
}
}
// cannot be an associated function:
// arguments need to have a 'static lifetime because they become caching keys
#[cached(result = true)]
async fn cached_remote_server(
connect: String,
) -> Result<Arc<RemoteServerImpl>, ConnectionManagerError> {
let connection = Builder::default()
.build(&connect)
.await
.map_err(|e| Box::new(e) as _)
.context(RemoteServerConnectError)?;
let client = write::Client::new(connection);
Ok(Arc::new(RemoteServerImpl { client }))
}
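Because the macro is invoked with `result = true`, only `Ok` values are memoised: a failed connection attempt is not cached and is retried on the next call, while a successful client is reused for every later request with the same `connect` string. A minimal sketch of the same pattern with a hypothetical lookup function:

use cached::proc_macro::cached;

// Only `Ok` results are stored (keyed by the owned `key` argument); an
// `Err` leaves the cache untouched, so the call is retried next time.
#[cached(result = true)]
async fn expensive_lookup(key: String) -> Result<u64, String> {
    // imagine dialing a remote service here
    Ok(key.len() as u64)
}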
/// An implementation for communicating with other IOx servers. This should
/// be moved into and implemented in an influxdb_iox_client create at a later
/// date.


@ -162,7 +162,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
// partition keys are: ["2020-03-02T00", "2020-03-01T00", "2020-04-01T00",
// "2020-04-02T00"]
let db = make_db().db;
let db = make_db().await.db;
let data = lp_lines.join("\n");
write_lp(&db, &data);
let scenario1 = DbScenario {
@ -170,7 +170,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
db,
};
let db = make_db().db;
let db = make_db().await.db;
let data = lp_lines.join("\n");
write_lp(&db, &data);
db.rollover_partition("2020-03-01T00", "h2o").await.unwrap();
@ -182,7 +182,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
db,
};
let db = make_db().db;
let db = make_db().await.db;
let data = lp_lines.join("\n");
write_lp(&db, &data);
// roll over and load chunks into both RUB and OS


@ -36,7 +36,7 @@ impl DbSetup for NoData {
// Scenario 1: No data in the DB yet
//
let db = make_db().db;
let db = make_db().await.db;
let scenario1 = DbScenario {
scenario_name: "New, Empty Database".into(),
db,
@ -45,7 +45,7 @@ impl DbSetup for NoData {
// Scenario 2: listing partitions (which may create an entry in a map)
// in an empty database
//
let db = make_db().db;
let db = make_db().await.db;
assert_eq!(count_mutable_buffer_chunks(&db), 0);
assert_eq!(count_read_buffer_chunks(&db), 0);
assert_eq!(count_object_store_chunks(&db), 0);
@ -56,7 +56,7 @@ impl DbSetup for NoData {
// Scenario 3: the database has had data loaded into RB and then deleted
//
let db = make_db().db;
let db = make_db().await.db;
let data = "cpu,region=west user=23.2 100";
write_lp(&db, data);
// move data out of open chunk
@ -94,7 +94,7 @@ impl DbSetup for NoData {
// Scenario 4: the database has had data loaded into RB & Object Store and then deleted
//
let db = make_db().db;
let db = make_db().await.db;
let data = "cpu,region=west user=23.2 100";
write_lp(&db, data);
// move data out of open chunk
@ -253,7 +253,7 @@ pub struct TwoMeasurementsManyFieldsOneChunk {}
#[async_trait]
impl DbSetup for TwoMeasurementsManyFieldsOneChunk {
async fn make(&self) -> Vec<DbScenario> {
let db = make_db().db;
let db = make_db().await.db;
let lp_lines = vec![
"h2o,state=MA,city=Boston temp=70.4 50",
@ -277,7 +277,7 @@ pub struct TwoMeasurementsManyFieldsTwoChunks {}
#[async_trait]
impl DbSetup for TwoMeasurementsManyFieldsTwoChunks {
async fn make(&self) -> Vec<DbScenario> {
let db = make_db().db;
let db = make_db().await.db;
let partition_key = "1970-01-01T00";
@ -314,7 +314,7 @@ impl DbSetup for TwoMeasurementsManyFieldsLifecycle {
async fn make(&self) -> Vec<DbScenario> {
let partition_key = "1970-01-01T00";
let db = std::sync::Arc::new(make_db().db);
let db = std::sync::Arc::new(make_db().await.db);
write_lp(
&db,
@ -397,7 +397,7 @@ impl DbSetup for EndToEndTest {
let lp_data = lp_lines.join("\n");
let db = make_db().db;
let db = make_db().await.db;
write_lp(&db, &lp_data);
let scenario1 = DbScenario {
@ -413,7 +413,7 @@ impl DbSetup for EndToEndTest {
///
pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> Vec<DbScenario> {
// Scenario 1: One open chunk in MUB
let db = make_db().db;
let db = make_db().await.db;
write_lp(&db, data);
let scenario1 = DbScenario {
scenario_name: "Data in open chunk of mutable buffer".into(),
@ -421,7 +421,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
};
// Scenario 2: One closed chunk in MUB
let db = make_db().db;
let db = make_db().await.db;
let table_names = write_lp(&db, data);
for table_name in &table_names {
db.rollover_partition(partition_key, &table_name)
@ -434,7 +434,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
};
// Scenario 3: One closed chunk in RUB
let db = make_db().db;
let db = make_db().await.db;
let table_names = write_lp(&db, data);
for table_name in &table_names {
db.rollover_partition(partition_key, &table_name)
@ -450,7 +450,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
};
// Scenario 4: One closed chunk in both RUb and OS
let db = make_db().db;
let db = make_db().await.db;
let table_names = write_lp(&db, data);
for table_name in &table_names {
db.rollover_partition(partition_key, &table_name)
@ -469,7 +469,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
};
// Scenario 5: One closed chunk in OS only
let db = make_db().db;
let db = make_db().await.db;
let table_names = write_lp(&db, data);
for table_name in &table_names {
db.rollover_partition(partition_key, &table_name)
@ -504,7 +504,7 @@ pub async fn make_two_chunk_scenarios(
data1: &str,
data2: &str,
) -> Vec<DbScenario> {
let db = make_db().db;
let db = make_db().await.db;
write_lp(&db, data1);
write_lp(&db, data2);
let scenario1 = DbScenario {
@ -513,7 +513,7 @@ pub async fn make_two_chunk_scenarios(
};
// spread across 2 mutable buffer chunks
let db = make_db().db;
let db = make_db().await.db;
let table_names = write_lp(&db, data1);
for table_name in &table_names {
db.rollover_partition(partition_key, &table_name)
@ -527,7 +527,7 @@ pub async fn make_two_chunk_scenarios(
};
// spread across 1 mutable buffer, 1 read buffer chunks
let db = make_db().db;
let db = make_db().await.db;
let table_names = write_lp(&db, data1);
for table_name in &table_names {
db.rollover_partition(partition_key, &table_name)
@ -544,7 +544,7 @@ pub async fn make_two_chunk_scenarios(
};
// in 2 read buffer chunks
let db = make_db().db;
let db = make_db().await.db;
let table_names = write_lp(&db, data1);
for table_name in &table_names {
db.rollover_partition(partition_key, &table_name)
@ -571,7 +571,7 @@ pub async fn make_two_chunk_scenarios(
};
// in 2 read buffer chunks that also loaded into object store
let db = make_db().db;
let db = make_db().await.db;
let table_names = write_lp(&db, data1);
for table_name in &table_names {
db.rollover_partition(partition_key, &table_name)
@ -606,7 +606,7 @@ pub async fn make_two_chunk_scenarios(
};
// Scenario 6: Two closed chunk in OS only
let db = make_db().db;
let db = make_db().await.db;
let table_names = write_lp(&db, data1);
for table_name in &table_names {
db.rollover_partition(partition_key, &table_name)


@ -7,7 +7,11 @@ use data_types::{
use object_store::{memory::InMemory, ObjectStore};
use query::{exec::Executor, Database};
use crate::{buffer::Buffer, db::Db, JobRegistry};
use crate::{
buffer::Buffer,
db::{load_preserved_catalog, Db},
JobRegistry,
};
use std::{borrow::Cow, convert::TryFrom, sync::Arc};
// A wrapper around a Db and a metrics registry allowing for isolated testing
@ -37,7 +41,7 @@ impl TestDbBuilder {
Self::default()
}
pub fn build(self) -> TestDb {
pub async fn build(self) -> TestDb {
let server_id = self
.server_id
.unwrap_or_else(|| ServerId::try_from(1).unwrap());
@ -64,9 +68,17 @@ impl TestDbBuilder {
} else {
None
};
let preserved_catalog = load_preserved_catalog(
db_name.as_str(),
Arc::clone(&object_store),
server_id,
Arc::clone(&metrics_registry),
)
.await
.unwrap();
TestDb {
metric_registry: metrics::TestMetricRegistry::new(Arc::clone(&metrics_registry)),
metric_registry: metrics::TestMetricRegistry::new(metrics_registry),
db: Db::new(
DatabaseRules::new(db_name),
server_id,
@ -74,7 +86,7 @@ impl TestDbBuilder {
exec,
write_buffer,
Arc::new(JobRegistry::new()),
metrics_registry,
preserved_catalog,
),
}
}
@ -101,8 +113,8 @@ impl TestDbBuilder {
}
/// Used for testing: create a Database with a local store
pub fn make_db() -> TestDb {
TestDb::builder().build()
pub async fn make_db() -> TestDb {
TestDb::builder().build().await
}
fn chunk_summary_iter(db: &Db) -> impl Iterator<Item = ChunkSummary> + '_ {


@ -279,6 +279,7 @@ cpu,host=B,region=east user=10.0,system=74.1 1
let db = TestDb::builder()
.object_store(Arc::new(ObjectStore::new_in_memory(InMemory::new())))
.build()
.await
.db;
write_lp(&db, &lp);


@ -26,21 +26,24 @@ use server::{ConnectionManager, Server as AppServer};
use bytes::{Bytes, BytesMut};
use futures::{self, StreamExt};
use http::header::{CONTENT_ENCODING, CONTENT_TYPE};
use hyper::{Body, Method, Request, Response, StatusCode};
use hyper::{http::HeaderValue, Body, Method, Request, Response, StatusCode};
use observability_deps::{
opentelemetry::KeyValue,
tracing::{self, debug, error},
tracing::{self, debug, error, info},
};
use routerify::{prelude::*, Middleware, RequestInfo, Router, RouterError, RouterService};
use serde::Deserialize;
use snafu::{OptionExt, ResultExt, Snafu};
use hyper::server::conn::AddrIncoming;
use pprof::protos::Message;
use std::num::NonZeroI32;
use std::{
fmt::Debug,
str::{self, FromStr},
sync::Arc,
};
use tokio::time::Duration;
use tokio_util::sync::CancellationToken;
/// Constants used in API error codes.
@ -217,6 +220,15 @@ pub enum ApplicationError {
partition: String,
table_name: String,
},
#[snafu(display("PProf error: {}", source))]
PProf { source: pprof::Error },
#[snafu(display("Protobuf error: {}", source))]
Prost { source: prost::EncodeError },
#[snafu(display("Empty flamegraph"))]
EmptyFlamegraph,
}
impl ApplicationError {
@ -251,6 +263,9 @@ impl ApplicationError {
Self::ParsingFormat { .. } => self.bad_request(),
Self::Planning { .. } => self.bad_request(),
Self::NoSnapshot { .. } => self.not_modified(),
Self::PProf { .. } => self.internal_error(),
Self::Prost { .. } => self.internal_error(),
Self::EmptyFlamegraph => self.no_content(),
}
}
@ -282,6 +297,13 @@ impl ApplicationError {
.unwrap()
}
fn no_content(&self) -> Response<Body> {
Response::builder()
.status(StatusCode::NO_CONTENT)
.body(self.body())
.unwrap()
}
fn body(&self) -> Body {
let json =
serde_json::json!({"error": self.to_string(), "error_code": self.api_error_code()})
@ -340,6 +362,8 @@ where
.get("/iox/api/v1/databases/:name/query", query::<M>)
.get("/api/v1/partitions", list_partitions::<M>)
.post("/api/v1/snapshot", snapshot_partition::<M>)
.get("/debug/pprof", pprof_home::<M>)
.get("/debug/pprof/profile", pprof_profile::<M>)
// Specify the error handler to handle any errors caused by
// a route or any middleware.
.err_handler_with_info(error_handler)
@ -785,6 +809,97 @@ async fn snapshot_partition<M: ConnectionManager + Send + Sync + Debug + 'static
}
}
#[tracing::instrument(level = "debug")]
async fn pprof_home<M: ConnectionManager + Send + Sync + Debug + 'static>(
req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> {
let default_host = HeaderValue::from_static("localhost");
let host = req
.headers()
.get("host")
.unwrap_or(&default_host)
.to_str()
.unwrap_or_default();
let cmd = format!(
"/debug/pprof/profile?seconds={}",
PProfArgs::default_seconds()
);
Ok(Response::new(Body::from(format!(
r#"<a href="{}">http://{}{}</a>"#,
cmd, host, cmd
))))
}
async fn dump_rsprof(seconds: u64, frequency: i32) -> pprof::Result<pprof::Report> {
let guard = pprof::ProfilerGuard::new(frequency)?;
info!(
"start profiling {} seconds with frequency {} /s",
seconds, frequency
);
tokio::time::sleep(Duration::from_secs(seconds)).await;
info!(
"done profiling {} seconds with frequency {} /s",
seconds, frequency
);
guard.report().build()
}
#[derive(Debug, Deserialize)]
struct PProfArgs {
#[serde(default = "PProfArgs::default_seconds")]
seconds: u64,
#[serde(default = "PProfArgs::default_frequency")]
frequency: NonZeroI32,
}
impl PProfArgs {
fn default_seconds() -> u64 {
30
}
// sample at 99 Hz rather than a round number so the profiler does not run in lockstep with periodic activity
fn default_frequency() -> NonZeroI32 {
NonZeroI32::new(99).unwrap()
}
}
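Both fields fall back to these defaults when absent from the query string, so requesting the endpoint with no parameters samples at 99 Hz for 30 seconds. A small sketch of that behaviour, assuming the `PProfArgs` struct above and the same `serde_urlencoded` parsing the handler below performs:

// an empty query string deserialises entirely from the serde defaults
let args: PProfArgs = serde_urlencoded::from_str("").unwrap();
assert_eq!(args.seconds, 30);
assert_eq!(args.frequency.get(), 99);

// explicit parameters override them, e.g. a short five-second profile
let args: PProfArgs = serde_urlencoded::from_str("seconds=5").unwrap();
assert_eq!(args.seconds, 5);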
#[tracing::instrument(level = "debug")]
async fn pprof_profile<M: ConnectionManager + Send + Sync + Debug + 'static>(
req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> {
let query_string = req.uri().query().unwrap_or_default();
let query: PProfArgs =
serde_urlencoded::from_str(query_string).context(InvalidQueryString { query_string })?;
let report = dump_rsprof(query.seconds, query.frequency.get())
.await
.context(PProf)?;
let mut body: Vec<u8> = Vec::new();
// render flamegraph when opening in the browser
// otherwise render as protobuf; works great with: go tool pprof http://..../debug/pprof/profile
if req
.headers()
.get_all("Accept")
.iter()
.flat_map(|i| i.to_str().unwrap_or_default().split(','))
.any(|i| i == "text/html" || i == "image/svg+xml")
{
report.flamegraph(&mut body).context(PProf)?;
if body.is_empty() {
return EmptyFlamegraph.fail();
}
} else {
let profile = report.pprof().context(PProf)?;
profile.encode(&mut body).context(Prost)?;
}
Ok(Response::new(Body::from(body)))
}
pub async fn serve<M>(
addr: AddrIncoming,
server: Arc<AppServer<M>>,