diff --git a/Cargo.lock b/Cargo.lock index 660aeea1c6..c583ce406f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -31,6 +31,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "796540673305a66d127804eef19ad696f1f204b8c1025aaca4958c17eab32877" +dependencies = [ + "getrandom 0.2.2", + "once_cell", + "version_check", +] + [[package]] name = "ahash" version = "0.7.2" @@ -96,6 +107,15 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544" +[[package]] +name = "arrayvec" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd9fd44efafa8690358b7408d253adf110036b88f55672a933f01d616ad9b1b9" +dependencies = [ + "nodrop", +] + [[package]] name = "arrayvec" version = "0.5.2" @@ -159,7 +179,7 @@ dependencies = [ name = "arrow_util" version = "0.1.0" dependencies = [ - "ahash", + "ahash 0.7.2", "arrow 0.1.0", "futures", "hashbrown 0.11.2", @@ -193,6 +213,15 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "async-mutex" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" +dependencies = [ + "event-listener", +] + [[package]] name = "async-stream" version = "0.3.1" @@ -372,7 +401,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "afa748e348ad3be8263be728124b24a24f268266f6f5d58af9d75f6a40b5c587" dependencies = [ "arrayref", - "arrayvec", + "arrayvec 0.5.2", "constant_time_eq", ] @@ -424,6 +453,12 @@ version = "3.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63396b8a4b9de3f4fdfb320ab6080762242f66a8ef174c49d8e19b674db4cdbe" +[[package]] +name = "bytemuck" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bed57e2090563b83ba8f83366628ce535a7584c9afa4c9fc0612a03925c6df58" + [[package]] name = "byteorder" version = "1.4.3" @@ -439,6 +474,40 @@ dependencies = [ "serde", ] +[[package]] +name = "cached" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e2afe73808fbaac302e39c9754bfc3c4b4d0f99c9c240b9f4e4efc841ad1b74" +dependencies = [ + "async-mutex", + "async-trait", + "cached_proc_macro", + "cached_proc_macro_types", + "futures", + "hashbrown 0.9.1", + "once_cell", +] + +[[package]] +name = "cached_proc_macro" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf857ae42d910aede5c5186e62684b0d7a597ce2fe3bd14448ab8f7ef439848c" +dependencies = [ + "async-mutex", + "cached_proc_macro_types", + "darling", + "quote", + "syn", +] + +[[package]] +name = "cached_proc_macro_types" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a4f925191b4367301851c6d99b09890311d74b0d43f274c0b34c86d308a3663" + [[package]] name = "cast" version = "0.2.5" @@ -512,7 +581,7 @@ dependencies = [ "ansi_term 0.11.0", "atty", "bitflags", - "strsim", + "strsim 0.8.0", "textwrap", "unicode-width", "vec_map", @@ -767,6 +836,41 @@ dependencies = [ "sct", ] +[[package]] +name = "darling" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"0d706e75d87e35569db781a9b5e2416cff1236a47ed380831f959382ccd5f858" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0c960ae2da4de88a91b2d920c2a7233b400bc33cb28453a2987822d8392519b" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.9.3", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b5a2f4ac4969822c62224815d069952656cadc7084fdca9751e6d959189b72" +dependencies = [ + "darling_core", + "quote", + "syn", +] + [[package]] name = "dashmap" version = "4.0.2" @@ -804,7 +908,7 @@ name = "datafusion" version = "4.0.0-SNAPSHOT" source = "git+https://github.com/apache/arrow-datafusion.git?rev=9cf32cf2cda8472b87130142c4eee1126d4d9cbe#9cf32cf2cda8472b87130142c4eee1126d4d9cbe" dependencies = [ - "ahash", + "ahash 0.7.2", "arrow 4.0.0-SNAPSHOT", "async-trait", "chrono", @@ -830,6 +934,15 @@ dependencies = [ "futures", ] +[[package]] +name = "debugid" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f91cf5a8c2f2097e2a32627123508635d47ce10563d999ec1a95addf08b502ba" +dependencies = [ + "uuid", +] + [[package]] name = "difference" version = "2.0.0" @@ -975,6 +1088,12 @@ dependencies = [ "termcolor", ] +[[package]] +name = "event-listener" +version = "2.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" + [[package]] name = "extend" version = "0.1.2" @@ -1319,7 +1438,7 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" dependencies = [ - "ahash", + "ahash 0.7.2", ] [[package]] @@ -1456,6 +1575,12 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.2.3" @@ -1477,6 +1602,24 @@ dependencies = [ "hashbrown 0.9.1", ] +[[package]] +name = "inferno" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37fb405dbcc505ed20838c4f8dad7b901094de90add237755df54bd5dcda2fdd" +dependencies = [ + "ahash 0.6.3", + "atty", + "indexmap", + "itoa", + "lazy_static", + "log", + "num-format", + "quick-xml", + "rgb", + "str_stack", +] + [[package]] name = "influxdb2_client" version = "0.1.0" @@ -1541,6 +1684,7 @@ dependencies = [ "panic_logging", "parking_lot", "parquet 0.1.0", + "pprof", "predicates", "prettytable-rs", "prost", @@ -1738,7 +1882,7 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6607c62aa161d23d17a9072cc5da0be67cdfc89d3afb1e8d9c842bebc2525ffe" dependencies = [ - "arrayvec", + "arrayvec 0.5.2", "bitflags", "cfg-if", "ryu", @@ -1859,6 +2003,16 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc" +[[package]] +name = "memmap" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = 
"memoffset" version = "0.6.3" @@ -2024,6 +2178,12 @@ dependencies = [ "libc", ] +[[package]] +name = "nodrop" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb" + [[package]] name = "nom" version = "5.1.2" @@ -2095,6 +2255,16 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-format" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bafe4179722c2894288ee77a9f044f02811c86af699344c498b0840c698a2465" +dependencies = [ + "arrayvec 0.4.12", + "itoa", +] + [[package]] name = "num-integer" version = "0.1.44" @@ -2625,6 +2795,27 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "pprof" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cb1ca0d59aa771d9bc7e268739d7aef6ca7e9e8d3b78d92f266cd663fd0c1" +dependencies = [ + "backtrace", + "inferno", + "lazy_static", + "libc", + "log", + "nix", + "parking_lot", + "prost", + "prost-build", + "prost-derive", + "symbolic-demangle", + "tempfile", + "thiserror", +] + [[package]] name = "ppv-lite86" version = "0.2.10" @@ -2824,6 +3015,15 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" +[[package]] +name = "quick-xml" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26aab6b48e2590e4a64d1ed808749ba06257882b461d01ca71baeb747074a6dd" +dependencies = [ + "memchr", +] + [[package]] name = "quote" version = "1.0.9" @@ -3095,6 +3295,15 @@ dependencies = [ "winreg", ] +[[package]] +name = "rgb" +version = "0.8.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fddb3b23626145d1776addfc307e1a1851f60ef6ca64f376bcb889697144cf0" +dependencies = [ + "bytemuck", +] + [[package]] name = "ring" version = "0.16.20" @@ -3461,6 +3670,7 @@ dependencies = [ "arrow_util", "async-trait", "bytes", + "cached", "chrono", "crc32fast", "criterion", @@ -3702,12 +3912,24 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0" +[[package]] +name = "str_stack" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb" + [[package]] name = "strsim" version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" +[[package]] +name = "strsim" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6446ced80d6c486436db5c078dde11a9f73d42b57fb273121e160b84f63d894c" + [[package]] name = "structopt" version = "0.3.21" @@ -3738,6 +3960,28 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e81da0851ada1f3e9d4312c704aa4f8806f0f9d69faaf8df2f3464b4a9437c2" +[[package]] +name = "symbolic-common" +version = "8.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be7dfa630954f18297ceae1ff2890cb7f5008a0b2d2106b0468dafc45b0b6b12" +dependencies = [ + "debugid", + "memmap", + "stable_deref_trait", + "uuid", +] + +[[package]] +name = "symbolic-demangle" +version = "8.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"0b4ba42bd1221803e965054767b1899f2db9a12c89969965c6cb3a02af7014eb" +dependencies = [ + "rustc-demangle", + "symbolic-common", +] + [[package]] name = "syn" version = "1.0.67" diff --git a/Cargo.toml b/Cargo.toml index 904749f4e9..a987028199 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -92,6 +92,7 @@ parking_lot = "0.11.1" itertools = "0.9.0" # used by arrow/datafusion anyway prettytable-rs = "0.8" +pprof = { version = "^0.4", default-features = false, features = ["flamegraph", "protobuf"] } prost = "0.7" # Forked to upgrade hyper and tokio routerify = { git = "https://github.com/influxdata/routerify", rev = "274e250" } diff --git a/data_types/src/error.rs b/data_types/src/error.rs index 2b51153cbe..ef7131298b 100644 --- a/data_types/src/error.rs +++ b/data_types/src/error.rs @@ -4,8 +4,8 @@ use std::fmt::Debug; use observability_deps::tracing::error; /// Add ability for Results to log error messages via `error!` logs. -/// This is useful when using async tasks that may not have a natural -/// return error +/// This is useful when using async tasks that may not have any code +/// checking their return values. pub trait ErrorLogger { /// Log the contents of self with a string of context. The context /// should appear in a message such as diff --git a/docs/README.md b/docs/README.md index 2cb4e2d1fc..142adaaf01 100644 --- a/docs/README.md +++ b/docs/README.md @@ -17,6 +17,7 @@ We hold monthly Tech Talks that explain the project's technical underpinnings. Y * Rust style and Idiom guide: [style_guide.md](style_guide.md) * Tracing and logging Guide: [tracing.md](tracing.md) +* Profiling Guide: [profiling.md](profiling.md) * How InfluxDB IOx manages the lifecycle of time series data: [data_management.md](data_management.md) * Thoughts on parquet encoding and compression for timeseries data: [encoding_thoughts.md](encoding_thoughts.md) * Thoughts on using multiple cores: [multi_core_tasks.md](multi_core_tasks.md) diff --git a/docs/images/flame_graph.png b/docs/images/flame_graph.png new file mode 100644 index 0000000000..a56def9c99 Binary files /dev/null and b/docs/images/flame_graph.png differ diff --git a/docs/profiling.md b/docs/profiling.md new file mode 100644 index 0000000000..db34eb70d2 --- /dev/null +++ b/docs/profiling.md @@ -0,0 +1,46 @@ +# IOx — Profiling + +IOx includes an embedded `pprof` exporter compatible with the [go pprof](https://golang.org/pkg/net/http/pprof/) tool. + +To use it, aim your favorite tool at your IOx host at the HTTP `/debug/pprof/profile` endpoint. 
+
+## Use the Go `pprof` tool
+
+For example:
+
+```shell
+go tool pprof 'http://localhost:8080/debug/pprof/profile?seconds=5'
+```
+
+You should see output such as:
+
+```
+Fetching profile over HTTP from http://localhost:8080/debug/pprof/profile?seconds=5
+Saved profile in /Users/mkm/pprof/pprof.cpu.006.pb.gz
+Type: cpu
+Entering interactive mode (type "help" for commands, "o" for options)
+(pprof) top
+Showing nodes accounting for 93, 100% of 93 total
+Showing top 10 nodes out of 185
+      flat  flat%   sum%        cum   cum%
+        93   100%   100%         93   100%  backtrace::backtrace::libunwind::trace
+         0     0%   100%          1  1.08%  <&str as nom::traits::InputTakeAtPosition>::split_at_position1_complete
+         0     0%   100%          1  1.08%  <(FnA,FnB) as nom::sequence::Tuple>::parse
+         0     0%   100%          1  1.08%  <(FnA,FnB,FnC) as nom::sequence::Tuple>::parse
+         0     0%   100%          5  5.38%  ::try_poll
+         0     0%   100%          1  1.08%  ::to_vec
+         0     0%   100%          1  1.08%  ::allocate
+         0     0%   100%          1  1.08%  as core::clone::Clone>::clone
+         0     0%   100%          3  3.23%  as alloc::vec::spec_extend::SpecExtend>::spec_extend
+         0     0%   100%          1  1.08%  as core::iter::traits::collect::Extend>::extend
+```
+
+## Use the built-in flame graph renderer
+
+IOx can also render the profile as a flame graph SVG when the endpoint is opened directly in a browser.
+
+For example, if you point your browser at an IOx server with a URL such as http://localhost:8080/debug/pprof/profile?seconds=5
+
+you will see a flame graph such as the one below:
+
+![Flame Graph](images/flame_graph.png)
diff --git a/mutable_buffer/src/chunk/snapshot.rs b/mutable_buffer/src/chunk/snapshot.rs
index 04718fc689..efda563c0e 100644
--- a/mutable_buffer/src/chunk/snapshot.rs
+++ b/mutable_buffer/src/chunk/snapshot.rs
@@ -8,7 +8,7 @@ use internal_types::selection::Selection;
 use snafu::{OptionExt, ResultExt, Snafu};
 
 use super::Chunk;
-use data_types::partition_metadata::Statistics;
+use data_types::{error::ErrorLogger, partition_metadata::Statistics};
 
 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -57,8 +57,15 @@ impl ChunkSnapshot {
         let mut records: HashMap = Default::default();
         let table = &chunk.table;
 
-        let schema = table.schema(&chunk.dictionary, Selection::All).unwrap();
-        let batch = table.to_arrow(&chunk.dictionary, Selection::All).unwrap();
+        let schema = table
+            .schema(&chunk.dictionary, Selection::All)
+            .log_if_error("ChunkSnapshot getting table schema")
+            .unwrap();
+        let batch = table
+            .to_arrow(&chunk.dictionary, Selection::All)
+            .log_if_error("ChunkSnapshot converting table to arrow")
+            .unwrap();
+
         let name = chunk.table_name.as_ref();
 
         let timestamp_range =
@@ -87,10 +94,13 @@ impl ChunkSnapshot {
             },
         );
 
-        Self {
-            chunk_id: chunk.id.expect("cannot snapshot chunk without an ID"),
-            records,
-        }
+        let chunk_id = chunk
+            .id
+            .ok_or("cannot snapshot chunk without an ID")
+            .log_if_error("ChunkSnapshot determining chunk id")
+            .unwrap();
+
+        Self { chunk_id, records }
     }
 
     /// return the ID of the chunk this is a snapshot of
diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs
index 5c2b731c26..d3b7583d05 100644
--- a/parquet_file/src/catalog.rs
+++ b/parquet_file/src/catalog.rs
@@ -251,24 +251,33 @@ where
     S: CatalogState,
 {
     /// Create new catalog w/o any data.
-    pub fn new_empty(
+    ///
+    /// An empty transaction is used to mark the start of the catalog so that concurrently
+    /// opened but still-empty catalogs can easily be detected.
+    pub async fn new_empty(
         object_store: Arc<ObjectStore>,
         server_id: ServerId,
         db_name: impl Into<String>,
         state_data: S::EmptyInput,
-    ) -> Self {
+    ) -> Result<Self> {
         let inner = PreservedCatalogInner {
             previous_tkey: None,
             state: Arc::new(S::new_empty(state_data)),
         };
 
-        Self {
+        let catalog = Self {
             inner: RwLock::new(inner),
             transaction_semaphore: Semaphore::new(1),
             object_store,
             server_id,
             db_name: db_name.into(),
-        }
+        };
+
+        // add empty transaction
+        let transaction = catalog.open_transaction().await;
+        transaction.commit().await?;
+
+        Ok(catalog)
     }
 
     /// Load existing catalog from store, if it exists.
@@ -383,14 +392,13 @@ where
     }
 
     /// Get latest revision counter.
-    ///
-    /// This can be `None` for a newly created catalog.
-    pub fn revision_counter(&self) -> Option<u64> {
+    pub fn revision_counter(&self) -> u64 {
         self.inner
             .read()
             .previous_tkey
             .clone()
             .map(|tkey| tkey.revision_counter)
+            .expect("catalog should have at least an empty transaction")
     }
 }
 
@@ -960,6 +968,42 @@ pub mod tests {
     use super::test_helpers::TestCatalogState;
     use super::*;
 
+    #[tokio::test]
+    async fn test_create_empty() {
+        let object_store = make_object_store();
+        let server_id = make_server_id();
+        let db_name = "db1";
+
+        assert!(PreservedCatalog::<TestCatalogState>::load(
+            Arc::clone(&object_store),
+            server_id,
+            db_name.to_string(),
+            ()
+        )
+        .await
+        .unwrap()
+        .is_none());
+
+        PreservedCatalog::<TestCatalogState>::new_empty(
+            Arc::clone(&object_store),
+            server_id,
+            db_name.to_string(),
+            (),
+        )
+        .await
+        .unwrap();
+
+        assert!(PreservedCatalog::<TestCatalogState>::load(
+            Arc::clone(&object_store),
+            server_id,
+            db_name.to_string(),
+            ()
+        )
+        .await
+        .unwrap()
+        .is_some());
+    }
+
     #[tokio::test]
     async fn test_inmem_commit_semantics() {
         let object_store = make_object_store();
@@ -1064,12 +1108,7 @@ pub mod tests {
             (),
         )
         .await;
-        assert!(matches!(
-            res,
-            Err(Error::MissingTransaction {
-                revision_counter: 0
-            })
-        ));
+        assert_eq!(res.unwrap_err().to_string(), "Missing transaction: 0",);
     }
 
     #[tokio::test]
@@ -1327,14 +1366,16 @@ pub mod tests {
             make_server_id(),
             "db1".to_string(),
             (),
-        );
+        )
+        .await
+        .unwrap();
 
         let mut t = catalog.open_transaction().await;
 
         // open transaction
         t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().to_string();
         assert_eq!(
             format!("{:?}", t),
-            "TransactionHandle(open, 0.00000000-0000-0000-0000-000000000000)"
+            "TransactionHandle(open, 1.00000000-0000-0000-0000-000000000000)"
         );
 
         // "closed" transaction
@@ -1521,7 +1562,9 @@ pub mod tests {
             server_id,
             db_name.to_string(),
             (),
-        );
+        )
+        .await
+        .unwrap();
 
         // get some test metadata
         let metadata1 = make_metadata(object_store, "foo").await;
@@ -1531,8 +1574,9 @@ pub mod tests {
         let mut trace = TestTrace::new();
 
         // empty catalog has no data
-        assert!(catalog.revision_counter().is_none());
+        assert_eq!(catalog.revision_counter(), 0);
         assert_catalog_parquet_files(&catalog, &[]);
+        trace.record(&catalog);
 
         // fill catalog with examples
         {
@@ -1548,7 +1592,7 @@ pub mod tests {
             t.commit().await.unwrap();
         }
 
-        assert_eq!(catalog.revision_counter().unwrap(), 0);
+        assert_eq!(catalog.revision_counter(), 1);
         assert_catalog_parquet_files(
             &catalog,
             &[
@@ -1578,7 +1622,7 @@ pub mod tests {
             t.commit().await.unwrap();
         }
 
-        assert_eq!(catalog.revision_counter().unwrap(), 1);
+        assert_eq!(catalog.revision_counter(), 2);
        assert_catalog_parquet_files(
             &catalog,
             &[
@@ -1599,7 +1643,7 @@ pub mod tests {
 
             // NO commit here!
         }
 
-        assert_eq!(catalog.revision_counter().unwrap(), 1);
+        assert_eq!(catalog.revision_counter(), 2);
         assert_catalog_parquet_files(
             &catalog,
             &[
@@ -1629,7 +1673,7 @@ pub mod tests {
             .unwrap()
             .unwrap();
         assert_eq!(
-            catalog.revision_counter().unwrap(),
+            catalog.revision_counter(),
             trace.tkeys.last().unwrap().revision_counter
         );
         assert_catalog_parquet_files(
diff --git a/query/src/exec/context.rs b/query/src/exec/context.rs
index c8a07ac0db..984fffca9e 100644
--- a/query/src/exec/context.rs
+++ b/query/src/exec/context.rs
@@ -8,7 +8,7 @@ use datafusion::{
     execution::context::{ExecutionContextState, QueryPlanner},
     logical_plan::{LogicalPlan, UserDefinedLogicalNode},
     physical_plan::{
-        collect,
+        collect, displayable,
         merge::MergeExec,
         planner::{DefaultPhysicalPlanner, ExtensionPlanner},
         ExecutionPlan, PhysicalPlanner, SendableRecordBatchStream,
@@ -151,21 +151,15 @@ impl IOxExecutionContext {
 
     /// Prepare (optimize + plan) a pre-created logical plan for execution
     pub fn prepare_plan(&self, plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
-        debug!(
-            "Creating plan: Initial plan\n----\n{}\n{}\n----",
-            plan.display_indent_schema(),
-            plan.display_graphviz(),
-        );
+        debug!(text=%plan.display_indent_schema(), "initial plan");
 
         let plan = self.inner.optimize(&plan)?;
+        debug!(text=%plan.display_indent_schema(), graphviz=%plan.display_graphviz(), "optimized plan");
 
-        debug!(
-            "Creating plan: Optimized plan\n----\n{}\n{}\n----",
-            plan.display_indent_schema(),
-            plan.display_graphviz(),
-        );
+        let physical_plan = self.inner.create_physical_plan(&plan)?;
 
-        self.inner.create_physical_plan(&plan)
+        debug!(text=%displayable(physical_plan.as_ref()).indent(), "optimized physical plan");
+        Ok(physical_plan)
     }
 
     /// Executes the logical plan using DataFusion on a separate
diff --git a/query/src/predicate.rs b/query/src/predicate.rs
index f02fdc6958..5b6c69fb90 100644
--- a/query/src/predicate.rs
+++ b/query/src/predicate.rs
@@ -3,7 +3,7 @@
 //! mode as well as for arbitrary other predicates that are expressed
 //! by DataFusion's `Expr` type.
 
-use std::collections::{BTreeSet, HashSet};
+use std::{collections::{BTreeSet, HashSet}, fmt};
 
 use data_types::timestamp::TimestampRange;
 use datafusion::{
@@ -27,9 +27,9 @@ pub const EMPTY_PREDICATE: Predicate = Predicate {
 
 /// Represents a parsed predicate for evaluation by the
 /// TSDatabase InfluxDB IOx query engine.
 ///
-/// Note that the data model of TSDatabase (e.g. ParsedLine's)
+/// Note that the InfluxDB data model (e.g. ParsedLine's)
 /// distinguishes between some types of columns (tags and fields), and
-/// likewise the semantics of this structure has some types of
+/// likewise the semantics of this structure can express some types of
 /// restrictions that only apply to certain types of columns.
 #[derive(Clone, Debug, Default, PartialEq)]
 pub struct Predicate {
@@ -37,23 +37,23 @@ pub struct Predicate {
     /// to only tables whose names are in `table_names`
     pub table_names: Option<BTreeSet<String>>,
 
-    // Optional field restriction. If present, restricts the results to only
-    // tables which have *at least one* of the fields in field_columns.
+    /// Optional field restriction. If present, restricts the results to only
+    /// tables which have *at least one* of the fields in field_columns.
     pub field_columns: Option<BTreeSet<String>>,
 
+    /// Optional partition key filter
+    pub partition_key: Option<String>,
+
+    /// Optional timestamp range: only rows within this range are included in
+    /// results. Other rows are excluded
+    pub range: Option<TimestampRange>,
+
     /// Optional arbitrary predicates, represented as list of
     /// DataFusion expressions applied a logical conjunction (aka they
     /// are 'AND'ed together). Only rows that evaluate to TRUE for all
     /// these expressions should be returned. Other rows are excluded
     /// from the results.
     pub exprs: Vec<Expr>,
-
-    /// Optional timestamp range: only rows within this range are included in
-    /// results. Other rows are excluded
-    pub range: Option<TimestampRange>,
-
-    /// Optional partition key filter
-    pub partition_key: Option<String>,
 }
 
 impl Predicate {
@@ -108,8 +108,66 @@ impl Predicate {
     }
 }
 
+impl fmt::Display for Predicate {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        fn iter_to_str<S>(s: impl IntoIterator<Item = S>) -> String
+        where
+            S: ToString,
+        {
+            s.into_iter()
+                .map(|v| v.to_string())
+                .collect::<Vec<String>>()
+                .join(", ")
+        }
+
+        write!(f, "Predicate")?;
+
+        if let Some(table_names) = &self.table_names {
+            write!(f, " table_names: {{{}}}", iter_to_str(table_names))?;
+        }
+
+        if let Some(field_columns) = &self.field_columns {
+            write!(f, " field_columns: {{{}}}", iter_to_str(field_columns))?;
+        }
+
+        if let Some(partition_key) = &self.partition_key {
+            write!(f, " partition_key: '{}'", partition_key)?;
+        }
+
+        if let Some(range) = &self.range {
+            // TODO: could be nice to show this as actual timestamps (not just numbers)?
+            write!(f, " range: [{} - {}]", range.start, range.end)?;
+        }
+
+        if !self.exprs.is_empty() {
+            // Expr doesn't implement `Display` yet, so just use the debug version
+            // See https://github.com/apache/arrow-datafusion/issues/347
+            let display_exprs = self.exprs.iter().map(|e| format!("{:?}", e));
+            write!(f, " exprs: [{}]", iter_to_str(display_exprs))?;
+        }
+
+        Ok(())
+    }
+}
+
 #[derive(Debug, Default)]
-/// Structure for building `Predicate`s
+/// Structure for building [`Predicate`]s
+///
+/// Example:
+/// ```
+/// use query::predicate::PredicateBuilder;
+/// use datafusion::logical_plan::{col, lit};
+///
+/// let p = PredicateBuilder::new()
+///     .timestamp_range(1, 100)
+///     .add_expr(col("foo").eq(lit(42)))
+///     .build();
+///
+/// assert_eq!(
+///     p.to_string(),
+///     "Predicate range: [1 - 100] exprs: [#foo Eq Int32(42)]"
+/// );
+/// ```
 pub struct PredicateBuilder {
     inner: Predicate,
 }
@@ -315,9 +373,8 @@ impl PredicateBuilder {
 
 #[cfg(test)]
 mod tests {
-    use datafusion::logical_plan::{col, lit};
-
-    use super::*;
+    use super::*;
+    use datafusion::logical_plan::{col, lit};
 
     #[test]
     fn test_default_predicate_is_empty() {
@@ -392,4 +449,38 @@ mod tests {
         assert_eq!(predicate.exprs[5], col("city").eq(lit("Boston")));
         assert_eq!(predicate.exprs[6], col("city").not_eq(lit("Braintree")));
     }
+
+    #[test]
+    fn predicate_display_ts() {
+        // TODO make this a doc example?
+ let p = PredicateBuilder::new().timestamp_range(1, 100).build(); + + assert_eq!(p.to_string(), "Predicate range: [1 - 100]"); + } + + #[test] + fn predicate_display_ts_and_expr() { + let p = PredicateBuilder::new() + .timestamp_range(1, 100) + .add_expr(col("foo").eq(lit(42)).and(col("bar").lt(lit(11)))) + .build(); + + assert_eq!( + p.to_string(), + "Predicate range: [1 - 100] exprs: [#foo Eq Int32(42) And #bar Lt Int32(11)]" + ); + } + + #[test] + fn predicate_display_full() { + let p = PredicateBuilder::new() + .timestamp_range(1, 100) + .add_expr(col("foo").eq(lit(42))) + .table("my_table") + .field_columns(vec!["f1", "f2"]) + .partition_key("the_key") + .build(); + + assert_eq!(p.to_string(), "Predicate table_names: {my_table} field_columns: {f1, f2} partition_key: 'the_key' range: [1 - 100] exprs: [#foo Eq Int32(42)]"); + } } diff --git a/query/src/provider/physical.rs b/query/src/provider/physical.rs index 7ea4023258..b2186ecde0 100644 --- a/query/src/provider/physical.rs +++ b/query/src/provider/physical.rs @@ -1,11 +1,11 @@ //! Implementation of a DataFusion PhysicalPlan node across partition chunks -use std::sync::Arc; +use std::{fmt, sync::Arc}; use arrow::datatypes::SchemaRef; use datafusion::{ error::DataFusionError, - physical_plan::{ExecutionPlan, Partitioning, SendableRecordBatchStream}, + physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream}, }; use internal_types::{schema::Schema, selection::Selection}; @@ -116,6 +116,21 @@ impl ExecutionPlan for IOxReadFilterNode { Ok(Box::pin(adapter)) } + + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match t { + DisplayFormatType::Default => { + // Note Predicate doesn't implement Display so punt on showing that now + write!( + f, + "IOxReadFilterNode: table_name={}, chunks={} predicate={}", + self.table_name, + self.chunk_and_infos.len(), + self.predicate, + ) + } + } + } } /// Removes any columns that are not present in schema, returning a possibly diff --git a/read_buffer/benches/plain.rs b/read_buffer/benches/plain.rs index da796c5620..f6c3a03f5a 100644 --- a/read_buffer/benches/plain.rs +++ b/read_buffer/benches/plain.rs @@ -116,7 +116,7 @@ fn benchmark_plain_sum( |b, input| { b.iter(|| { // do work - let _ = encoding.sum(&input); + let _ = encoding.sum::(&input); }); }, ); @@ -142,7 +142,7 @@ fn benchmark_plain_sum( |b, input| { b.iter(|| { // do work - let _ = encoding.sum::(&input); + let _ = encoding.sum::(&input); }); }, ); @@ -161,7 +161,7 @@ fn benchmark_plain_sum( |b, input| { b.iter(|| { // do work - let _ = encoding.sum(&input); + let _ = encoding.sum::(&input); }); }, ); @@ -206,7 +206,7 @@ fn benchmark_plain_sum( |b, input| { b.iter(|| { // do work - let _ = encoding.sum(&input); + let _ = encoding.sum::(&input); }); }, ); diff --git a/read_buffer/benches/sum_fixed.rs b/read_buffer/benches/sum_fixed.rs index b34426766e..7c38172b5f 100644 --- a/read_buffer/benches/sum_fixed.rs +++ b/read_buffer/benches/sum_fixed.rs @@ -116,7 +116,7 @@ fn benchmark_none_sum( |b, input| { b.iter(|| { // do work - let _ = encoding.sum(&input); + let _ = encoding.sum::(&input); }); }, ); @@ -142,7 +142,7 @@ fn benchmark_none_sum( |b, input| { b.iter(|| { // do work - let _ = encoding.sum::(&input); + let _ = encoding.sum::(&input); }); }, ); @@ -161,7 +161,7 @@ fn benchmark_none_sum( |b, input| { b.iter(|| { // do work - let _ = encoding.sum(&input); + let _ = encoding.sum::(&input); }); }, ); @@ -187,7 +187,7 @@ fn benchmark_none_sum( |b, input| { b.iter(|| { 
// do work - let _ = encoding.sum::(&input); + let _ = encoding.sum::(&input); }); }, ); @@ -206,7 +206,7 @@ fn benchmark_none_sum( |b, input| { b.iter(|| { // do work - let _ = encoding.sum(&input); + let _ = encoding.sum::(&input); }); }, ); diff --git a/read_buffer/src/column/encoding/scalar/fixed.rs b/read_buffer/src/column/encoding/scalar/fixed.rs index 6186f456a9..64b65169e9 100644 --- a/read_buffer/src/column/encoding/scalar/fixed.rs +++ b/read_buffer/src/column/encoding/scalar/fixed.rs @@ -21,7 +21,7 @@ use arrow::array::Array; use crate::column::{cmp, RowIDs}; -#[derive(Debug, Default)] +#[derive(Debug, Default, PartialEq, PartialOrd)] /// A Fixed encoding is one in which every value has a fixed width, and is /// stored contiguous in a backing vector. Fixed encodings do not support NULL /// values, so are suitable for columns known to not have NULL values that we @@ -566,13 +566,6 @@ macro_rules! fixed_from_arrow_impls { } fixed_from_arrow_impls! { - (&arrow::array::Int64Array, i64), - (&arrow::array::Int64Array, i32), - (&arrow::array::Int64Array, i16), - (&arrow::array::Int64Array, i8), - (&arrow::array::Int64Array, u32), - (&arrow::array::Int64Array, u16), - (&arrow::array::Int64Array, u8), (arrow::array::Int64Array, i64), (arrow::array::Int64Array, i32), (arrow::array::Int64Array, i16), @@ -581,10 +574,6 @@ fixed_from_arrow_impls! { (arrow::array::Int64Array, u16), (arrow::array::Int64Array, u8), - (&arrow::array::UInt64Array, u64), - (&arrow::array::UInt64Array, u32), - (&arrow::array::UInt64Array, u16), - (&arrow::array::UInt64Array, u8), (arrow::array::UInt64Array, u64), (arrow::array::UInt64Array, u32), (arrow::array::UInt64Array, u16), diff --git a/read_buffer/src/column/encoding/scalar/fixed_null.rs b/read_buffer/src/column/encoding/scalar/fixed_null.rs index c0c64c610b..4e3865622d 100644 --- a/read_buffer/src/column/encoding/scalar/fixed_null.rs +++ b/read_buffer/src/column/encoding/scalar/fixed_null.rs @@ -14,6 +14,7 @@ //! consumer of these encodings. use std::cmp::Ordering; use std::fmt::Debug; +use std::iter::FromIterator; use std::mem::size_of; use arrow::{ @@ -23,7 +24,7 @@ use arrow::{ use crate::column::{cmp, RowIDs}; -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub struct FixedNull where T: ArrowNumericType, @@ -116,26 +117,30 @@ where // // - /// Return the logical (decoded) value at the provided row ID. A NULL value + /// Return the logical (decoded) value at the provided row ID according to + /// the logical type of the column, which is specified by `U`. A NULL value /// is represented by None. - pub fn value(&self, row_id: u32) -> Option { + pub fn value(&self, row_id: u32) -> Option + where + U: From, + { if self.arr.is_null(row_id as usize) { return None; } - Some(self.arr.value(row_id as usize)) + Some(U::from(self.arr.value(row_id as usize))) } - /// Returns the logical (decoded) values for the provided row IDs. + /// Returns the logical (decoded) values for the provided row IDs according + /// to the logical type of the column, which is specified by `U`. /// /// NULL values are represented by None. /// /// TODO(edd): Perf - we should return a vector of values and a vector of /// integers representing the null validity bitmap. 
- pub fn values( - &self, - row_ids: &[u32], - mut dst: Vec>, - ) -> Vec> { + pub fn values(&self, row_ids: &[u32], mut dst: Vec>) -> Vec> + where + U: From, + { dst.clear(); dst.reserve(row_ids.len()); @@ -143,20 +148,24 @@ where if self.arr.is_null(row_id as usize) { dst.push(None) } else { - dst.push(Some(self.arr.value(row_id as usize))) + dst.push(Some(U::from(self.arr.value(row_id as usize)))) } } assert_eq!(dst.len(), row_ids.len()); dst } - /// Returns the logical (decoded) values for all the rows in the column. + /// Returns the logical (decoded) values for all the rows in the column + /// according to the logical type of the column, which is specified by `U`. /// /// NULL values are represented by None. /// /// TODO(edd): Perf - we should return a vector of values and a vector of /// integers representing the null validity bitmap. - pub fn all_values(&self, mut dst: Vec>) -> Vec> { + pub fn all_values(&self, mut dst: Vec>) -> Vec> + where + U: From, + { dst.clear(); dst.reserve(self.arr.len()); @@ -164,7 +173,7 @@ where if self.arr.is_null(i) { dst.push(None) } else { - dst.push(Some(self.arr.value(i))) + dst.push(Some(U::from(self.arr.value(i)))) } } assert_eq!(dst.len(), self.num_rows() as usize); @@ -202,23 +211,23 @@ where /// this implementation (about 85% in the `sum` case). We will revisit /// them in the future as they do would the implementation of these /// aggregation functions. - pub fn sum(&self, row_ids: &[u32]) -> Option + pub fn sum(&self, row_ids: &[u32]) -> Option where - T::Native: std::ops::Add, + U: Default + From + std::ops::Add, { - let mut result = T::Native::default(); + let mut result = U::default(); if self.arr.null_count() == 0 { for chunks in row_ids.chunks_exact(4) { - result = result + self.arr.value(chunks[3] as usize); - result = result + self.arr.value(chunks[2] as usize); - result = result + self.arr.value(chunks[1] as usize); - result = result + self.arr.value(chunks[0] as usize); + result = result + U::from(self.arr.value(chunks[3] as usize)); + result = result + U::from(self.arr.value(chunks[2] as usize)); + result = result + U::from(self.arr.value(chunks[1] as usize)); + result = result + U::from(self.arr.value(chunks[0] as usize)); } let rem = row_ids.len() % 4; for &i in &row_ids[row_ids.len() - rem..row_ids.len()] { - result = result + self.arr.value(i as usize); + result = result + U::from(self.arr.value(i as usize)); } return Some(result); @@ -230,7 +239,7 @@ where continue; } is_none = false; - result = result + self.arr.value(i as usize); + result = result + U::from(self.arr.value(i as usize)); } if is_none { @@ -241,20 +250,29 @@ where /// Returns the first logical (decoded) value from the provided /// row IDs. - pub fn first(&self, row_ids: &[u32]) -> Option { + pub fn first(&self, row_ids: &[u32]) -> Option + where + U: From, + { self.value(row_ids[0]) } /// Returns the last logical (decoded) value from the provided /// row IDs. - pub fn last(&self, row_ids: &[u32]) -> Option { + pub fn last(&self, row_ids: &[u32]) -> Option + where + U: From, + { self.value(row_ids[row_ids.len() - 1]) } /// Returns the minimum logical (decoded) non-null value from the provided /// row IDs. 
- pub fn min(&self, row_ids: &[u32]) -> Option { - let mut min: Option = self.value(row_ids[0]); + pub fn min(&self, row_ids: &[u32]) -> Option + where + U: From + PartialOrd, + { + let mut min: Option = self.value(row_ids[0]); for &v in row_ids.iter().skip(1) { if self.arr.is_null(v as usize) { continue; @@ -269,8 +287,11 @@ where /// Returns the maximum logical (decoded) non-null value from the provided /// row IDs. - pub fn max(&self, row_ids: &[u32]) -> Option { - let mut max: Option = self.value(row_ids[0]); + pub fn max(&self, row_ids: &[u32]) -> Option + where + U: From + PartialOrd, + { + let mut max: Option = self.value(row_ids[0]); for &v in row_ids.iter().skip(1) { if self.arr.is_null(v as usize) { continue; @@ -414,11 +435,11 @@ where /// `x {>, >=, <, <=} value1 AND x {>, >=, <, <=} value2`. pub fn row_ids_filter_range( &self, - left: (T::Native, cmp::Operator), - right: (T::Native, cmp::Operator), + left: (T::Native, &cmp::Operator), + right: (T::Native, &cmp::Operator), dst: RowIDs, ) -> RowIDs { - match (&left.1, &right.1) { + match (left.1, right.1) { (cmp::Operator::GT, cmp::Operator::LT) | (cmp::Operator::GT, cmp::Operator::LTE) | (cmp::Operator::GTE, cmp::Operator::LT) @@ -509,81 +530,93 @@ where // This macro implements the From trait for slices of various logical types. // -// Here is an example implementation: +// Here are example implementations: // -// impl From<&[i64]> for FixedNull { -// fn from(v: &[i64]) -> Self { +// impl From> for FixedNull { +// fn from(v: Vec) -> Self { // Self{ -// arr: PrimitiveArray::from(v.to_vec()), +// arr: PrimitiveArray::from(v), // } // } // } // -// impl From<&[Option]> for -// FixedNull { fn from(v: &[i64]) -// -> Self { Self{ -// arr: PrimitiveArray::from(v.to_vec()), +// impl From<&[i64]> for FixedNull { +// fn from(v: &[i64]) -> Self { +// Self::from(v.to_vec()) +// } +// } +// +// impl From>> for FixedNull { +// fn from(v: Vec>) -> Self { +// Self{ +// arr: PrimitiveArray::from(v), // } // } // } // - -macro_rules! fixed_from_slice_impls { +// impl From<&[Option]> for FixedNull { +// fn from(v: &[i64]) -> Self { +// Self::from(v.to_vec()) +// } +// } +// +macro_rules! fixed_null_from_native_types { ($(($type_from:ty, $type_to:ty),)*) => { $( + impl From> for FixedNull<$type_to> { + fn from(v: Vec<$type_from>) -> Self { + Self{ + arr: PrimitiveArray::from(v), + } + } + } + impl From<&[$type_from]> for FixedNull<$type_to> { fn from(v: &[$type_from]) -> Self { + Self::from(v.to_vec()) + } + } + + impl From>> for FixedNull<$type_to> { + fn from(v: Vec>) -> Self { Self{ - arr: PrimitiveArray::from(v.to_vec()), + arr: PrimitiveArray::from(v), } } } impl From<&[Option<$type_from>]> for FixedNull<$type_to> { fn from(v: &[Option<$type_from>]) -> Self { - Self{ - arr: PrimitiveArray::from(v.to_vec()), - } + Self::from(v.to_vec()) } } )* }; } -// Supported logical and physical datatypes for the FixedNull encoding. -// -// Need to look at possibility of initialising smaller datatypes... -fixed_from_slice_impls! { +fixed_null_from_native_types! 
{ (i64, arrow::datatypes::Int64Type), - // (i64, arrow::datatypes::Int32Type), - // (i64, arrow::datatypes::Int16Type), - // (i64, arrow::datatypes::Int8Type), - // (i64, arrow::datatypes::UInt32Type), - // (i64, arrow::datatypes::UInt16Type), - // (i64, arrow::datatypes::UInt8Type), - (i32, arrow::datatypes::Int32Type), - // (i32, arrow::datatypes::Int16Type), - // (i32, arrow::datatypes::Int8Type), - // (i32, arrow::datatypes::UInt16Type), - // (i32, arrow::datatypes::UInt8Type), - (i16, arrow::datatypes::Int16Type), - // (i16, arrow::datatypes::Int8Type), - // (i16, arrow::datatypes::UInt8Type), - (i8, arrow::datatypes::Int8Type), - (u64, arrow::datatypes::UInt64Type), - // (u64, arrow::datatypes::UInt32Type), - // (u64, arrow::datatypes::UInt16Type), - // (u64, arrow::datatypes::UInt8Type), - (u32, arrow::datatypes::UInt32Type), - // (u32, arrow::datatypes::UInt16Type), - // (u32, arrow::datatypes::UInt8Type), - (u16, arrow::datatypes::UInt16Type), - // (u16, arrow::datatypes::UInt8Type), - (u8, arrow::datatypes::UInt8Type), - (f64, arrow::datatypes::Float64Type), + (i32, arrow::datatypes::Int32Type), + (i16, arrow::datatypes::Int16Type), + (i8, arrow::datatypes::Int8Type), + (u64, arrow::datatypes::UInt64Type), + (u32, arrow::datatypes::UInt32Type), + (u16, arrow::datatypes::UInt16Type), + (u8, arrow::datatypes::UInt8Type), + (f64, arrow::datatypes::Float64Type), } -macro_rules! fixed_from_arrow_impls { +// This macro implements the From trait for Arrow arrays +// +// Implementation: +// +// impl From for FixedNull { +// fn from(arr: Int64Array) -> Self { +// Self{arr} +// } +// } +// +macro_rules! fixed_null_from_arrow_types { ($(($type_from:ty, $type_to:ty),)*) => { $( impl From<$type_from> for FixedNull<$type_to> { @@ -595,27 +628,76 @@ macro_rules! fixed_from_arrow_impls { }; } -// Supported logical and physical datatypes for the Plain encoding. -// -// Need to look at possibility of initialising smaller datatypes... -fixed_from_arrow_impls! { +fixed_null_from_arrow_types! { (arrow::array::Int64Array, arrow::datatypes::Int64Type), (arrow::array::UInt64Array, arrow::datatypes::UInt64Type), (arrow::array::Float64Array, arrow::datatypes::Float64Type), +} - // TODO(edd): add more datatypes +// This macro implements the From trait for Arrow arrays where some down-casting +// to a smaller physical type happens. It is the caller's responsibility to +// ensure that this down-casting is safe. +// +// Example implementation: +// +// impl From for FixedNull { +// fn from(arr: Int64Array) -> Self { +// let arr: PrimitiveArray = +// PrimitiveArray::from_iter(arr.iter().map(|v| v.map(|v| v as i32))); +// Self { arr } +// } +// } +// +macro_rules! fixed_null_from_arrow_types_down_cast { + ($(($type_from:ty, $type_to:ty, $rust_type:ty),)*) => { + $( + impl From<$type_from> for FixedNull<$type_to> { + fn from(arr: $type_from) -> Self { + let arr: PrimitiveArray<$type_to> = + PrimitiveArray::from_iter(arr.iter().map(|v| v.map(|v| v as $rust_type))); + Self { arr } + } + } + )* + }; +} + +fixed_null_from_arrow_types_down_cast! 
{ + (arrow::array::Int64Array, arrow::datatypes::Int32Type, i32), + (arrow::array::Int64Array, arrow::datatypes::UInt32Type, u32), + (arrow::array::Int64Array, arrow::datatypes::Int16Type, i16), + (arrow::array::Int64Array, arrow::datatypes::UInt16Type, u16), + (arrow::array::Int64Array, arrow::datatypes::Int8Type, i8), + (arrow::array::Int64Array, arrow::datatypes::UInt8Type, u8), + (arrow::array::UInt64Array, arrow::datatypes::UInt32Type, u32), + (arrow::array::UInt64Array, arrow::datatypes::UInt16Type, u16), + (arrow::array::UInt64Array, arrow::datatypes::UInt8Type, u8), } #[cfg(test)] mod test { use super::cmp::Operator; use super::*; + use arrow::array::*; use arrow::datatypes::*; fn some_vec(v: Vec) -> Vec> { v.iter().map(|x| Some(*x)).collect() } + #[test] + fn from_arrow_downcast() { + let arr = Int64Array::from(vec![100, u8::MAX as i64]); + let exp_values = arr.iter().collect::>>(); + let enc: FixedNull = FixedNull::from(arr); + assert_eq!(enc.all_values(vec![]), exp_values); + + let arr = Int64Array::from(vec![100, i32::MAX as i64]); + let exp_values = arr.iter().collect::>>(); + let enc: FixedNull = FixedNull::from(arr); + assert_eq!(enc.all_values(vec![]), exp_values); + } + #[test] fn size() { let v = FixedNull::::from(vec![None, None, Some(100), Some(2222)].as_slice()); @@ -879,36 +961,36 @@ mod test { ); let row_ids = v.row_ids_filter_range( - (100, Operator::GTE), - (240, Operator::LT), + (100, &Operator::GTE), + (240, &Operator::LT), RowIDs::new_vector(), ); assert_eq!(row_ids.to_vec(), vec![0, 1, 5, 6, 13, 17]); let row_ids = v.row_ids_filter_range( - (100, Operator::GT), - (240, Operator::LT), + (100, &Operator::GT), + (240, &Operator::LT), RowIDs::new_vector(), ); assert_eq!(row_ids.to_vec(), vec![1, 6, 13]); let row_ids = v.row_ids_filter_range( - (10, Operator::LT), - (-100, Operator::GT), + (10, &Operator::LT), + (-100, &Operator::GT), RowIDs::new_vector(), ); assert_eq!(row_ids.to_vec(), vec![11, 14, 15]); let row_ids = v.row_ids_filter_range( - (21, Operator::GTE), - (21, Operator::LTE), + (21, &Operator::GTE), + (21, &Operator::LTE), RowIDs::new_vector(), ); assert_eq!(row_ids.to_vec(), vec![16]); let row_ids = v.row_ids_filter_range( - (10000, Operator::LTE), - (3999, Operator::GT), + (10000, &Operator::LTE), + (3999, &Operator::GT), RowIDs::new_vector(), ); assert_eq!(row_ids.to_vec(), Vec::::new()); @@ -926,8 +1008,8 @@ mod test { .as_slice(), ); let row_ids = v.row_ids_filter_range( - (200, Operator::GTE), - (300, Operator::LTE), + (200, &Operator::GTE), + (300, &Operator::LTE), RowIDs::new_vector(), ); assert_eq!(row_ids.to_vec(), vec![1, 2, 4]); diff --git a/read_buffer/src/column/integer.rs b/read_buffer/src/column/integer.rs index aaf03dbc85..f80774574a 100644 --- a/read_buffer/src/column/integer.rs +++ b/read_buffer/src/column/integer.rs @@ -1,12 +1,19 @@ use std::mem::size_of; -use arrow::{self, array::Array}; +use arrow::{ + self, array::Array, datatypes::Int16Type as ArrowInt16Type, + datatypes::Int32Type as ArrowInt32Type, datatypes::Int64Type as ArrowInt64Type, + datatypes::Int8Type as ArrowInt8Type, datatypes::UInt16Type as ArrowUInt16Type, + datatypes::UInt32Type as ArrowUInt32Type, datatypes::UInt64Type as ArrowUInt64Type, + datatypes::UInt8Type as ArrowUInt8Type, +}; use super::encoding::{scalar::Fixed, scalar::FixedNull}; use super::{cmp, Statistics}; use crate::column::{EncodedValues, RowIDs, Scalar, Value, Values}; pub enum IntegerEncoding { + // non-null encodings. 
These are backed by `Vec` I64I64(Fixed), I64I32(Fixed), I64U32(Fixed), @@ -14,15 +21,97 @@ pub enum IntegerEncoding { I64U16(Fixed), I64I8(Fixed), I64U8(Fixed), - U64U64(Fixed), U64U32(Fixed), U64U16(Fixed), U64U8(Fixed), - // Nullable encodings - TODO, add variants for smaller physical types. - I64I64N(FixedNull), - U64U64N(FixedNull), + // Nullable encodings. These are backed by an Arrow array. + I64I64N(FixedNull), + I64I32N(FixedNull), + I64U32N(FixedNull), + I64I16N(FixedNull), + I64U16N(FixedNull), + I64I8N(FixedNull), + I64U8N(FixedNull), + U64U64N(FixedNull), + U64U32N(FixedNull), + U64U16N(FixedNull), + U64U8N(FixedNull), +} + +impl PartialEq for IntegerEncoding { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::I64I64(a), Self::I64I64(b)) => a == b, + (Self::I64I32(a), Self::I64I32(b)) => a == b, + (Self::I64U32(a), Self::I64U32(b)) => a == b, + (Self::I64I16(a), Self::I64I16(b)) => a == b, + (Self::I64U16(a), Self::I64U16(b)) => a == b, + (Self::I64I8(a), Self::I64I8(b)) => a == b, + (Self::I64U8(a), Self::I64U8(b)) => a == b, + (Self::U64U64(a), Self::U64U64(b)) => a == b, + (Self::U64U32(a), Self::U64U32(b)) => a == b, + (Self::U64U16(a), Self::U64U16(b)) => a == b, + (Self::U64U8(a), Self::U64U8(b)) => a == b, + (Self::I64I64N(a), Self::I64I64N(b)) => { + let a = a.all_values::(vec![]); + let b = b.all_values(vec![]); + a == b + } + (Self::I64I32N(a), Self::I64I32N(b)) => { + let a = a.all_values::(vec![]); + let b = b.all_values(vec![]); + a == b + } + (Self::I64U32N(a), Self::I64U32N(b)) => { + let a = a.all_values::(vec![]); + let b = b.all_values(vec![]); + a == b + } + (Self::I64I16N(a), Self::I64I16N(b)) => { + let a = a.all_values::(vec![]); + let b = b.all_values(vec![]); + a == b + } + (Self::I64U16N(a), Self::I64U16N(b)) => { + let a = a.all_values::(vec![]); + let b = b.all_values(vec![]); + a == b + } + (Self::I64I8N(a), Self::I64I8N(b)) => { + let a = a.all_values::(vec![]); + let b = b.all_values(vec![]); + a == b + } + (Self::I64U8N(a), Self::I64U8N(b)) => { + let a = a.all_values::(vec![]); + let b = b.all_values(vec![]); + a == b + } + (Self::U64U64N(a), Self::U64U64N(b)) => { + let a = a.all_values::(vec![]); + let b = b.all_values(vec![]); + a == b + } + (Self::U64U32N(a), Self::U64U32N(b)) => { + let a = a.all_values::(vec![]); + let b = b.all_values(vec![]); + a == b + } + (Self::U64U16N(a), Self::U64U16N(b)) => { + let a = a.all_values::(vec![]); + let b = b.all_values(vec![]); + a == b + } + (Self::U64U8N(a), Self::U64U8N(b)) => { + let a = a.all_values::(vec![]); + let b = b.all_values(vec![]); + a == b + } + (_, _) => false, + } + } } impl IntegerEncoding { @@ -41,7 +130,16 @@ impl IntegerEncoding { Self::U64U16(enc) => enc.size(), Self::U64U8(enc) => enc.size(), Self::I64I64N(enc) => enc.size(), + Self::I64I32N(enc) => enc.size(), + Self::I64U32N(enc) => enc.size(), + Self::I64I16N(enc) => enc.size(), + Self::I64U16N(enc) => enc.size(), + Self::I64I8N(enc) => enc.size(), + Self::I64U8N(enc) => enc.size(), Self::U64U64N(enc) => enc.size(), + Self::U64U32N(enc) => enc.size(), + Self::U64U16N(enc) => enc.size(), + Self::U64U8N(enc) => enc.size(), } } @@ -68,7 +166,16 @@ impl IntegerEncoding { } Self::I64I64N(enc) => enc.size_raw(include_nulls), + Self::I64I32N(enc) => enc.size_raw(include_nulls), + Self::I64U32N(enc) => enc.size_raw(include_nulls), + Self::I64I16N(enc) => enc.size_raw(include_nulls), + Self::I64U16N(enc) => enc.size_raw(include_nulls), + Self::I64I8N(enc) => enc.size_raw(include_nulls), + Self::I64U8N(enc) => 
enc.size_raw(include_nulls), Self::U64U64N(enc) => enc.size_raw(include_nulls), + Self::U64U32N(enc) => enc.size_raw(include_nulls), + Self::U64U16N(enc) => enc.size_raw(include_nulls), + Self::U64U8N(enc) => enc.size_raw(include_nulls), } } @@ -87,7 +194,16 @@ impl IntegerEncoding { Self::U64U16(enc) => enc.num_rows(), Self::U64U8(enc) => enc.num_rows(), Self::I64I64N(enc) => enc.num_rows(), + Self::I64I32N(enc) => enc.num_rows(), + Self::I64U32N(enc) => enc.num_rows(), + Self::I64I16N(enc) => enc.num_rows(), + Self::I64U16N(enc) => enc.num_rows(), + Self::I64I8N(enc) => enc.num_rows(), + Self::I64U8N(enc) => enc.num_rows(), Self::U64U64N(enc) => enc.num_rows(), + Self::U64U32N(enc) => enc.num_rows(), + Self::U64U16N(enc) => enc.num_rows(), + Self::U64U8N(enc) => enc.num_rows(), } } @@ -108,7 +224,16 @@ impl IntegerEncoding { pub fn contains_null(&self) -> bool { match self { Self::I64I64N(enc) => enc.contains_null(), + Self::I64I32N(enc) => enc.contains_null(), + Self::I64U32N(enc) => enc.contains_null(), + Self::I64I16N(enc) => enc.contains_null(), + Self::I64U16N(enc) => enc.contains_null(), + Self::I64I8N(enc) => enc.contains_null(), + Self::I64U8N(enc) => enc.contains_null(), Self::U64U64N(enc) => enc.contains_null(), + Self::U64U32N(enc) => enc.contains_null(), + Self::U64U16N(enc) => enc.contains_null(), + Self::U64U8N(enc) => enc.contains_null(), _ => false, } } @@ -128,7 +253,16 @@ impl IntegerEncoding { Self::U64U16(_) => 0, Self::U64U8(_) => 0, Self::I64I64N(enc) => enc.null_count(), + Self::I64I32N(enc) => enc.null_count(), + Self::I64U32N(enc) => enc.null_count(), + Self::I64I16N(enc) => enc.null_count(), + Self::I64U16N(enc) => enc.null_count(), + Self::I64I8N(enc) => enc.null_count(), + Self::I64U8N(enc) => enc.null_count(), Self::U64U64N(enc) => enc.null_count(), + Self::U64U32N(enc) => enc.null_count(), + Self::U64U16N(enc) => enc.null_count(), + Self::U64U8N(enc) => enc.null_count(), } } @@ -136,7 +270,16 @@ impl IntegerEncoding { pub fn has_any_non_null_value(&self) -> bool { match self { Self::I64I64N(enc) => enc.has_any_non_null_value(), + Self::I64I32N(enc) => enc.has_any_non_null_value(), + Self::I64U32N(enc) => enc.has_any_non_null_value(), + Self::I64I16N(enc) => enc.has_any_non_null_value(), + Self::I64U16N(enc) => enc.has_any_non_null_value(), + Self::I64I8N(enc) => enc.has_any_non_null_value(), + Self::I64U8N(enc) => enc.has_any_non_null_value(), Self::U64U64N(enc) => enc.has_any_non_null_value(), + Self::U64U32N(enc) => enc.has_any_non_null_value(), + Self::U64U16N(enc) => enc.has_any_non_null_value(), + Self::U64U8N(enc) => enc.has_any_non_null_value(), _ => true, } } @@ -146,7 +289,16 @@ impl IntegerEncoding { pub fn has_non_null_value(&self, row_ids: &[u32]) -> bool { match self { Self::I64I64N(enc) => enc.has_non_null_value(row_ids), + Self::I64I32N(enc) => enc.has_non_null_value(row_ids), + Self::I64U32N(enc) => enc.has_non_null_value(row_ids), + Self::I64I16N(enc) => enc.has_non_null_value(row_ids), + Self::I64U16N(enc) => enc.has_non_null_value(row_ids), + Self::I64I8N(enc) => enc.has_non_null_value(row_ids), + Self::I64U8N(enc) => enc.has_non_null_value(row_ids), Self::U64U64N(enc) => enc.has_non_null_value(row_ids), + Self::U64U32N(enc) => enc.has_non_null_value(row_ids), + Self::U64U16N(enc) => enc.has_non_null_value(row_ids), + Self::U64U8N(enc) => enc.has_non_null_value(row_ids), _ => !row_ids.is_empty(), // all rows will be non-null } } @@ -158,25 +310,64 @@ impl IntegerEncoding { // `c.value` should return as the logical type // signed 64-bit 
variants - logical type is i64 for all these - Self::I64I64(c) => Value::Scalar(Scalar::I64(c.value(row_id))), - Self::I64I32(c) => Value::Scalar(Scalar::I64(c.value(row_id))), - Self::I64U32(c) => Value::Scalar(Scalar::I64(c.value(row_id))), - Self::I64I16(c) => Value::Scalar(Scalar::I64(c.value(row_id))), - Self::I64U16(c) => Value::Scalar(Scalar::I64(c.value(row_id))), - Self::I64I8(c) => Value::Scalar(Scalar::I64(c.value(row_id))), - Self::I64U8(c) => Value::Scalar(Scalar::I64(c.value(row_id))), + Self::I64I64(enc) => Value::Scalar(Scalar::I64(enc.value(row_id))), + Self::I64I32(enc) => Value::Scalar(Scalar::I64(enc.value(row_id))), + Self::I64U32(enc) => Value::Scalar(Scalar::I64(enc.value(row_id))), + Self::I64I16(enc) => Value::Scalar(Scalar::I64(enc.value(row_id))), + Self::I64U16(enc) => Value::Scalar(Scalar::I64(enc.value(row_id))), + Self::I64I8(enc) => Value::Scalar(Scalar::I64(enc.value(row_id))), + Self::I64U8(enc) => Value::Scalar(Scalar::I64(enc.value(row_id))), // unsigned 64-bit variants - logical type is u64 for all these - Self::U64U64(c) => Value::Scalar(Scalar::U64(c.value(row_id))), - Self::U64U32(c) => Value::Scalar(Scalar::U64(c.value(row_id))), - Self::U64U16(c) => Value::Scalar(Scalar::U64(c.value(row_id))), - Self::U64U8(c) => Value::Scalar(Scalar::U64(c.value(row_id))), + Self::U64U64(enc) => Value::Scalar(Scalar::U64(enc.value(row_id))), + Self::U64U32(enc) => Value::Scalar(Scalar::U64(enc.value(row_id))), + Self::U64U16(enc) => Value::Scalar(Scalar::U64(enc.value(row_id))), + Self::U64U8(enc) => Value::Scalar(Scalar::U64(enc.value(row_id))), - Self::I64I64N(c) => match c.value(row_id) { + // signed 64-bit variants + Self::I64I64N(enc) => match enc.value(row_id) { Some(v) => Value::Scalar(Scalar::I64(v)), None => Value::Null, }, - Self::U64U64N(c) => match c.value(row_id) { + Self::I64I32N(enc) => match enc.value(row_id) { + Some(v) => Value::Scalar(Scalar::I64(v)), + None => Value::Null, + }, + Self::I64U32N(enc) => match enc.value(row_id) { + Some(v) => Value::Scalar(Scalar::I64(v)), + None => Value::Null, + }, + Self::I64I16N(enc) => match enc.value(row_id) { + Some(v) => Value::Scalar(Scalar::I64(v)), + None => Value::Null, + }, + Self::I64U16N(enc) => match enc.value(row_id) { + Some(v) => Value::Scalar(Scalar::I64(v)), + None => Value::Null, + }, + Self::I64I8N(enc) => match enc.value(row_id) { + Some(v) => Value::Scalar(Scalar::I64(v)), + None => Value::Null, + }, + Self::I64U8N(enc) => match enc.value(row_id) { + Some(v) => Value::Scalar(Scalar::I64(v)), + None => Value::Null, + }, + + // unsigned 64-bit variants + Self::U64U64N(enc) => match enc.value(row_id) { + Some(v) => Value::Scalar(Scalar::U64(v)), + None => Value::Null, + }, + Self::U64U32N(enc) => match enc.value(row_id) { + Some(v) => Value::Scalar(Scalar::U64(v)), + None => Value::Null, + }, + Self::U64U16N(enc) => match enc.value(row_id) { + Some(v) => Value::Scalar(Scalar::U64(v)), + None => Value::Null, + }, + Self::U64U8N(enc) => match enc.value(row_id) { Some(v) => Value::Scalar(Scalar::U64(v)), None => Value::Null, }, @@ -190,22 +381,34 @@ impl IntegerEncoding { pub fn values(&self, row_ids: &[u32]) -> Values<'_> { match &self { // signed 64-bit variants - logical type is i64 for all these - Self::I64I64(c) => Values::I64(c.values::(row_ids, vec![])), - Self::I64I32(c) => Values::I64(c.values::(row_ids, vec![])), - Self::I64U32(c) => Values::I64(c.values::(row_ids, vec![])), - Self::I64I16(c) => Values::I64(c.values::(row_ids, vec![])), - Self::I64U16(c) => 
Values::I64(c.values::(row_ids, vec![])), - Self::I64I8(c) => Values::I64(c.values::(row_ids, vec![])), - Self::I64U8(c) => Values::I64(c.values::(row_ids, vec![])), + Self::I64I64(enc) => Values::I64(enc.values::(row_ids, vec![])), + Self::I64I32(enc) => Values::I64(enc.values::(row_ids, vec![])), + Self::I64U32(enc) => Values::I64(enc.values::(row_ids, vec![])), + Self::I64I16(enc) => Values::I64(enc.values::(row_ids, vec![])), + Self::I64U16(enc) => Values::I64(enc.values::(row_ids, vec![])), + Self::I64I8(enc) => Values::I64(enc.values::(row_ids, vec![])), + Self::I64U8(enc) => Values::I64(enc.values::(row_ids, vec![])), // unsigned 64-bit variants - logical type is u64 for all these - Self::U64U64(c) => Values::U64(c.values::(row_ids, vec![])), - Self::U64U32(c) => Values::U64(c.values::(row_ids, vec![])), - Self::U64U16(c) => Values::U64(c.values::(row_ids, vec![])), - Self::U64U8(c) => Values::U64(c.values::(row_ids, vec![])), + Self::U64U64(enc) => Values::U64(enc.values::(row_ids, vec![])), + Self::U64U32(enc) => Values::U64(enc.values::(row_ids, vec![])), + Self::U64U16(enc) => Values::U64(enc.values::(row_ids, vec![])), + Self::U64U8(enc) => Values::U64(enc.values::(row_ids, vec![])), - Self::I64I64N(c) => Values::I64N(c.values(row_ids, vec![])), - Self::U64U64N(c) => Values::U64N(c.values(row_ids, vec![])), + // signed 64-bit nullable variants - logical type is i64 for all these. + Self::I64I64N(enc) => Values::I64N(enc.values(row_ids, vec![])), + Self::I64I32N(enc) => Values::I64N(enc.values(row_ids, vec![])), + Self::I64U32N(enc) => Values::I64N(enc.values(row_ids, vec![])), + Self::I64I16N(enc) => Values::I64N(enc.values(row_ids, vec![])), + Self::I64U16N(enc) => Values::I64N(enc.values(row_ids, vec![])), + Self::I64I8N(enc) => Values::I64N(enc.values(row_ids, vec![])), + Self::I64U8N(enc) => Values::I64N(enc.values(row_ids, vec![])), + + // unsigned 64-bit nullable variants - logical type is u64 for all these. + Self::U64U64N(enc) => Values::U64N(enc.values(row_ids, vec![])), + Self::U64U32N(enc) => Values::U64N(enc.values(row_ids, vec![])), + Self::U64U16N(enc) => Values::U64N(enc.values(row_ids, vec![])), + Self::U64U8N(enc) => Values::U64N(enc.values(row_ids, vec![])), } } @@ -230,8 +433,20 @@ impl IntegerEncoding { Self::U64U16(c) => Values::U64(c.all_values::(vec![])), Self::U64U8(c) => Values::U64(c.all_values::(vec![])), - Self::I64I64N(c) => Values::I64N(c.all_values(vec![])), - Self::U64U64N(c) => Values::U64N(c.all_values(vec![])), + // signed 64-bit nullable variants - logical type is i64 for all these. + Self::I64I64N(enc) => Values::I64N(enc.all_values(vec![])), + Self::I64I32N(enc) => Values::I64N(enc.all_values(vec![])), + Self::I64U32N(enc) => Values::I64N(enc.all_values(vec![])), + Self::I64I16N(enc) => Values::I64N(enc.all_values(vec![])), + Self::I64U16N(enc) => Values::I64N(enc.all_values(vec![])), + Self::I64I8N(enc) => Values::I64N(enc.all_values(vec![])), + Self::I64U8N(enc) => Values::I64N(enc.all_values(vec![])), + + // unsigned 64-bit nullable variants - logical type is u64 for all these. 
+            Self::U64U64N(enc) => Values::U64N(enc.all_values(vec![])),
+            Self::U64U32N(enc) => Values::U64N(enc.all_values(vec![])),
+            Self::U64U16N(enc) => Values::U64N(enc.all_values(vec![])),
+            Self::U64U8N(enc) => Values::U64N(enc.all_values(vec![])),
         }
     }

@@ -297,8 +512,20 @@ impl IntegerEncoding {
             Self::U64U16(c) => c.row_ids_filter(value.as_u16(), op, dst),
             Self::U64U8(c) => c.row_ids_filter(value.as_u8(), op, dst),

-            Self::I64I64N(c) => c.row_ids_filter(value.as_i64(), op, dst),
-            Self::U64U64N(c) => c.row_ids_filter(value.as_u64(), op, dst),
+            // signed 64-bit nullable variants - logical type is i64 for all these.
+            Self::I64I64N(enc) => enc.row_ids_filter(value.as_i64(), op, dst),
+            Self::I64I32N(enc) => enc.row_ids_filter(value.as_i32(), op, dst),
+            Self::I64U32N(enc) => enc.row_ids_filter(value.as_u32(), op, dst),
+            Self::I64I16N(enc) => enc.row_ids_filter(value.as_i16(), op, dst),
+            Self::I64U16N(enc) => enc.row_ids_filter(value.as_u16(), op, dst),
+            Self::I64I8N(enc) => enc.row_ids_filter(value.as_i8(), op, dst),
+            Self::I64U8N(enc) => enc.row_ids_filter(value.as_u8(), op, dst),
+
+            // unsigned 64-bit nullable variants - logical type is u64 for all these.
+            Self::U64U64N(enc) => enc.row_ids_filter(value.as_u64(), op, dst),
+            Self::U64U32N(enc) => enc.row_ids_filter(value.as_u32(), op, dst),
+            Self::U64U16N(enc) => enc.row_ids_filter(value.as_u16(), op, dst),
+            Self::U64U8N(enc) => enc.row_ids_filter(value.as_u8(), op, dst),
         }
     }

@@ -349,8 +576,41 @@ impl IntegerEncoding {
                 c.row_ids_filter_range((low.1.as_u8(), low.0), (high.1.as_u8(), high.0), dst)
             }

-            Self::I64I64N(_) => todo!(),
-            Self::U64U64N(_) => todo!(),
+            Self::I64I64N(enc) => {
+                enc.row_ids_filter_range((low.1.as_i64(), low.0), (high.1.as_i64(), high.0), dst)
+            }
+            Self::I64I32N(enc) => {
+                enc.row_ids_filter_range((low.1.as_i32(), low.0), (high.1.as_i32(), high.0), dst)
+            }
+            Self::I64U32N(enc) => {
+                enc.row_ids_filter_range((low.1.as_u32(), low.0), (high.1.as_u32(), high.0), dst)
+            }
+            Self::I64I16N(enc) => {
+                enc.row_ids_filter_range((low.1.as_i16(), low.0), (high.1.as_i16(), high.0), dst)
+            }
+            Self::I64U16N(enc) => {
+                enc.row_ids_filter_range((low.1.as_u16(), low.0), (high.1.as_u16(), high.0), dst)
+            }
+            Self::I64I8N(enc) => {
+                enc.row_ids_filter_range((low.1.as_i8(), low.0), (high.1.as_i8(), high.0), dst)
+            }
+            Self::I64U8N(enc) => {
+                enc.row_ids_filter_range((low.1.as_u8(), low.0), (high.1.as_u8(), high.0), dst)
+            }
+
+            // unsigned 64-bit nullable variants - logical type is u64 for all these.
+            Self::U64U64N(enc) => {
+                enc.row_ids_filter_range((low.1.as_u64(), low.0), (high.1.as_u64(), high.0), dst)
+            }
+            Self::U64U32N(enc) => {
+                enc.row_ids_filter_range((low.1.as_u32(), low.0), (high.1.as_u32(), high.0), dst)
+            }
+            Self::U64U16N(enc) => {
+                enc.row_ids_filter_range((low.1.as_u16(), low.0), (high.1.as_u16(), high.0), dst)
+            }
+            Self::U64U8N(enc) => {
+                enc.row_ids_filter_range((low.1.as_u8(), low.0), (high.1.as_u8(), high.0), dst)
+            }
         }
     }

@@ -367,11 +627,49 @@ impl IntegerEncoding {
             Self::U64U32(c) => Value::Scalar(Scalar::U64(c.min(row_ids))),
             Self::U64U16(c) => Value::Scalar(Scalar::U64(c.min(row_ids))),
             Self::U64U8(c) => Value::Scalar(Scalar::U64(c.min(row_ids))),
-            Self::I64I64N(c) => match c.min(row_ids) {
+
+            Self::I64I64N(enc) => match enc.min(row_ids) {
                 Some(v) => Value::Scalar(Scalar::I64(v)),
                 None => Value::Null,
             },
-            Self::U64U64N(c) => match c.min(row_ids) {
+            Self::I64I32N(enc) => match enc.min(row_ids) {
+                Some(v) => Value::Scalar(Scalar::I64(v)),
+                None => Value::Null,
+            },
+            Self::I64U32N(enc) => match enc.min(row_ids) {
+                Some(v) => Value::Scalar(Scalar::I64(v)),
+                None => Value::Null,
+            },
+            Self::I64I16N(enc) => match enc.min(row_ids) {
+                Some(v) => Value::Scalar(Scalar::I64(v)),
+                None => Value::Null,
+            },
+            Self::I64U16N(enc) => match enc.min(row_ids) {
+                Some(v) => Value::Scalar(Scalar::I64(v)),
+                None => Value::Null,
+            },
+            Self::I64I8N(enc) => match enc.min(row_ids) {
+                Some(v) => Value::Scalar(Scalar::I64(v)),
+                None => Value::Null,
+            },
+            Self::I64U8N(enc) => match enc.min(row_ids) {
+                Some(v) => Value::Scalar(Scalar::I64(v)),
+                None => Value::Null,
+            },
+
+            Self::U64U64N(enc) => match enc.min(row_ids) {
+                Some(v) => Value::Scalar(Scalar::U64(v)),
+                None => Value::Null,
+            },
+            Self::U64U32N(enc) => match enc.min(row_ids) {
+                Some(v) => Value::Scalar(Scalar::U64(v)),
+                None => Value::Null,
+            },
+            Self::U64U16N(enc) => match enc.min(row_ids) {
+                Some(v) => Value::Scalar(Scalar::U64(v)),
+                None => Value::Null,
+            },
+            Self::U64U8N(enc) => match enc.min(row_ids) {
                 Some(v) => Value::Scalar(Scalar::U64(v)),
                 None => Value::Null,
             },
@@ -391,11 +689,48 @@ impl IntegerEncoding {
             Self::U64U32(c) => Value::Scalar(Scalar::U64(c.max(row_ids))),
             Self::U64U16(c) => Value::Scalar(Scalar::U64(c.max(row_ids))),
             Self::U64U8(c) => Value::Scalar(Scalar::U64(c.max(row_ids))),
-            Self::I64I64N(c) => match c.max(row_ids) {
+            Self::I64I64N(enc) => match enc.max(row_ids) {
                 Some(v) => Value::Scalar(Scalar::I64(v)),
                 None => Value::Null,
             },
-            Self::U64U64N(c) => match c.max(row_ids) {
+            Self::I64I32N(enc) => match enc.max(row_ids) {
+                Some(v) => Value::Scalar(Scalar::I64(v)),
+                None => Value::Null,
+            },
+            Self::I64U32N(enc) => match enc.max(row_ids) {
+                Some(v) => Value::Scalar(Scalar::I64(v)),
+                None => Value::Null,
+            },
+            Self::I64I16N(enc) => match enc.max(row_ids) {
+                Some(v) => Value::Scalar(Scalar::I64(v)),
+                None => Value::Null,
+            },
+            Self::I64U16N(enc) => match enc.max(row_ids) {
+                Some(v) => Value::Scalar(Scalar::I64(v)),
+                None => Value::Null,
+            },
+            Self::I64I8N(enc) => match enc.max(row_ids) {
+                Some(v) => Value::Scalar(Scalar::I64(v)),
+                None => Value::Null,
+            },
+            Self::I64U8N(enc) => match enc.max(row_ids) {
+                Some(v) => Value::Scalar(Scalar::I64(v)),
+                None => Value::Null,
+            },
+
+            Self::U64U64N(enc) => match enc.max(row_ids) {
+                Some(v) => Value::Scalar(Scalar::U64(v)),
+                None => Value::Null,
+            },
+            Self::U64U32N(enc) => match enc.max(row_ids) {
+                Some(v) => Value::Scalar(Scalar::U64(v)),
+                None => Value::Null,
+            },
+            Self::U64U16N(enc) => match enc.max(row_ids) {
+                Some(v) => Value::Scalar(Scalar::U64(v)),
+                None => Value::Null,
+            },
+            Self::U64U8N(enc) => match enc.max(row_ids) {
                 Some(v) => Value::Scalar(Scalar::U64(v)),
                 None => Value::Null,
             },
@@ -415,11 +750,48 @@ impl IntegerEncoding {
             Self::U64U32(c) => Scalar::U64(c.sum(row_ids)),
             Self::U64U16(c) => Scalar::U64(c.sum(row_ids)),
             Self::U64U8(c) => Scalar::U64(c.sum(row_ids)),
-            Self::I64I64N(c) => match c.sum(row_ids) {
+            Self::I64I64N(enc) => match enc.sum(row_ids) {
                 Some(v) => Scalar::I64(v),
                 None => Scalar::Null,
             },
-            Self::U64U64N(c) => match c.sum(row_ids) {
+            Self::I64I32N(enc) => match enc.sum(row_ids) {
+                Some(v) => Scalar::I64(v),
+                None => Scalar::Null,
+            },
+            Self::I64U32N(enc) => match enc.sum(row_ids) {
+                Some(v) => Scalar::I64(v),
+                None => Scalar::Null,
+            },
+            Self::I64I16N(enc) => match enc.sum(row_ids) {
+                Some(v) => Scalar::I64(v),
+                None => Scalar::Null,
+            },
+            Self::I64U16N(enc) => match enc.sum(row_ids) {
+                Some(v) => Scalar::I64(v),
+                None => Scalar::Null,
+            },
+            Self::I64I8N(enc) => match enc.sum(row_ids) {
+                Some(v) => Scalar::I64(v),
+                None => Scalar::Null,
+            },
+            Self::I64U8N(enc) => match enc.sum(row_ids) {
+                Some(v) => Scalar::I64(v),
+                None => Scalar::Null,
+            },
+
+            Self::U64U64N(enc) => match enc.sum(row_ids) {
+                Some(v) => Scalar::U64(v),
+                None => Scalar::Null,
+            },
+            Self::U64U32N(enc) => match enc.sum(row_ids) {
+                Some(v) => Scalar::U64(v),
+                None => Scalar::Null,
+            },
+            Self::U64U16N(enc) => match enc.sum(row_ids) {
+                Some(v) => Scalar::U64(v),
+                None => Scalar::Null,
+            },
+            Self::U64U8N(enc) => match enc.sum(row_ids) {
                 Some(v) => Scalar::U64(v),
                 None => Scalar::Null,
             },
@@ -439,8 +811,17 @@ impl IntegerEncoding {
             Self::U64U32(c) => c.count(row_ids),
             Self::U64U16(c) => c.count(row_ids),
             Self::U64U8(c) => c.count(row_ids),
-            Self::I64I64N(c) => c.count(row_ids),
-            Self::U64U64N(c) => c.count(row_ids),
+            Self::I64I64N(enc) => enc.count(row_ids),
+            Self::I64I32N(enc) => enc.count(row_ids),
+            Self::I64U32N(enc) => enc.count(row_ids),
+            Self::I64I16N(enc) => enc.count(row_ids),
+            Self::I64U16N(enc) => enc.count(row_ids),
+            Self::I64I8N(enc) => enc.count(row_ids),
+            Self::I64U8N(enc) => enc.count(row_ids),
+            Self::U64U64N(enc) => enc.count(row_ids),
+            Self::U64U32N(enc) => enc.count(row_ids),
+            Self::U64U16N(enc) => enc.count(row_ids),
+            Self::U64U8N(enc) => enc.count(row_ids),
         }
     }

@@ -459,7 +840,16 @@ impl IntegerEncoding {
             Self::U64U16(_) => "BT_U16",
             Self::U64U8(_) => "BT_U8",
             Self::I64I64N(_) => "None",
+            Self::I64I32N(_) => "BT_I32N",
+            Self::I64U32N(_) => "BT_U32N",
+            Self::I64I16N(_) => "BT_I16N",
+            Self::I64U16N(_) => "BT_U16N",
+            Self::I64I8N(_) => "BT_I8N",
+            Self::I64U8N(_) => "BT_U8N",
             Self::U64U64N(_) => "None",
+            Self::U64U32N(_) => "BT_U32N",
+            Self::U64U16N(_) => "BT_U16N",
+            Self::U64U8N(_) => "BT_U8N",
         }
     }

@@ -478,7 +868,16 @@ impl IntegerEncoding {
             Self::U64U16(_) => "u64",
             Self::U64U8(_) => "u64",
             Self::I64I64N(_) => "i64",
+            Self::I64I32N(_) => "i64",
+            Self::I64U32N(_) => "i64",
+            Self::I64I16N(_) => "i64",
+            Self::I64U16N(_) => "i64",
+            Self::I64I8N(_) => "i64",
+            Self::I64U8N(_) => "i64",
             Self::U64U64N(_) => "u64",
+            Self::U64U32N(_) => "u64",
+            Self::U64U16N(_) => "u64",
+            Self::U64U8N(_) => "u64",
         }
     }
 }
@@ -499,7 +898,46 @@ impl std::fmt::Display for IntegerEncoding {
             Self::U64U16(enc) => write!(f, "[{}]: {}", name, enc),
             Self::U64U8(enc) => write!(f, "[{}]: {}", name, enc),
             Self::I64I64N(enc) => write!(f, "[{}]: {}", name, enc),
+            Self::I64I32N(enc) => write!(f, "[{}]: {}", name, enc),
+            Self::I64U32N(enc) => write!(f, "[{}]: {}", name, enc),
+            Self::I64I16N(enc) => write!(f, "[{}]: {}", name, enc),
+            Self::I64U16N(enc) => write!(f, "[{}]: {}", name, enc),
+            Self::I64I8N(enc) => write!(f, "[{}]: {}", name, enc),
+            Self::I64U8N(enc) => write!(f, "[{}]: {}", name, enc),
             Self::U64U64N(enc) => write!(f, "[{}]: {}", name, enc),
+            Self::U64U32N(enc) => write!(f, "[{}]: {}", name, enc),
+            Self::U64U16N(enc) => write!(f, "[{}]: {}", name, enc),
+            Self::U64U8N(enc) => write!(f, "[{}]: {}", name, enc),
+        }
+    }
+}
+
+impl std::fmt::Debug for IntegerEncoding {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let name = self.name();
+        match self {
+            Self::I64I64(enc) => enc.fmt(f),
+            Self::I64I32(enc) => write!(f, "[{}]: {:?}", name, enc),
+            Self::I64U32(enc) => write!(f, "[{}]: {:?}", name, enc),
+            Self::I64I16(enc) => write!(f, "[{}]: {:?}", name, enc),
+            Self::I64U16(enc) => write!(f, "[{}]: {:?}", name, enc),
+            Self::I64I8(enc) => write!(f, "[{}]: {:?}", name, enc),
+            Self::I64U8(enc) => write!(f, "[{}]: {:?}", name, enc),
+            Self::U64U64(enc) => write!(f, "[{}]: {:?}", name, enc),
+            Self::U64U32(enc) => write!(f, "[{}]: {:?}", name, enc),
+            Self::U64U16(enc) => write!(f, "[{}]: {:?}", name, enc),
+            Self::U64U8(enc) => write!(f, "[{}]: {:?}", name, enc),
+            Self::I64I64N(enc) => write!(f, "[{}]: {:?}", name, enc),
+            Self::I64I32N(enc) => write!(f, "[{}]: {:?}", name, enc),
+            Self::I64U32N(enc) => write!(f, "[{}]: {:?}", name, enc),
+            Self::I64I16N(enc) => write!(f, "[{}]: {:?}", name, enc),
+            Self::I64U16N(enc) => write!(f, "[{}]: {:?}", name, enc),
+            Self::I64I8N(enc) => write!(f, "[{}]: {:?}", name, enc),
+            Self::I64U8N(enc) => write!(f, "[{}]: {:?}", name, enc),
+            Self::U64U64N(enc) => write!(f, "[{}]: {:?}", name, enc),
+            Self::U64U32N(enc) => write!(f, "[{}]: {:?}", name, enc),
+            Self::U64U16N(enc) => write!(f, "[{}]: {:?}", name, enc),
+            Self::U64U8N(enc) => write!(f, "[{}]: {:?}", name, enc),
         }
     }
 }
@@ -553,17 +991,48 @@ impl From<&[i64]> for IntegerEncoding {

 /// Converts an Arrow array into an IntegerEncoding.
 ///
-/// TODO(edd): convert underlying type of Arrow data to smallest physical
-/// representation.
+/// The most compact physical Arrow array type is used to store the column
+/// within a `FixedNull` encoding.
 impl From<arrow::array::Int64Array> for IntegerEncoding {
     fn from(arr: arrow::array::Int64Array) -> Self {
         if arr.null_count() == 0 {
             return Self::from(arr.values());
         }

-        // TODO(edd): currently fixed null only supports 64-bit logical/physical
-        // types. Need to add support for storing as smaller physical types.
-        Self::I64I64N(FixedNull::<Int64Type>::from(arr))
+        // determine min and max values.
+        let min = arrow::compute::kernels::aggregate::min(&arr);
+        let max = arrow::compute::kernels::aggregate::max(&arr);
+
+        // This match is carefully ordered. It prioritises smaller physical
+        // datatypes that can safely represent the provided logical data
+        match (min, max) {
+            // encode as u8 values
+            (min, max) if min >= Some(0) && max <= Some(u8::MAX as i64) => {
+                Self::I64U8N(FixedNull::<UInt8Type>::from(arr))
+            }
+            // encode as i8 values
+            (min, max) if min >= Some(i8::MIN as i64) && max <= Some(i8::MAX as i64) => {
+                Self::I64I8N(FixedNull::<Int8Type>::from(arr))
+            }
+            // encode as u16 values
+            (min, max) if min >= Some(0) && max <= Some(u16::MAX as i64) => {
+                Self::I64U16N(FixedNull::<UInt16Type>::from(arr))
+            }
+            // encode as i16 values
+            (min, max) if min >= Some(i16::MIN as i64) && max <= Some(i16::MAX as i64) => {
+                Self::I64I16N(FixedNull::<Int16Type>::from(arr))
+            }
+            // encode as u32 values
+            (min, max) if min >= Some(0) && max <= Some(u32::MAX as i64) => {
+                Self::I64U32N(FixedNull::<UInt32Type>::from(arr))
+            }
+            // encode as i32 values
+            (min, max) if min >= Some(i32::MIN as i64) && max <= Some(i32::MAX as i64) => {
+                Self::I64I32N(FixedNull::<Int32Type>::from(arr))
+            }
+            // otherwise, encode with the same physical type (i64)
+            (_, _) => Self::I64I64N(FixedNull::<Int64Type>::from(arr)),
+        }
     }
 }
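A note on the guards above: `arrow::compute::kernels::aggregate::min`/`max` return `Option` values, and these are compared directly against `Some(...)` bounds. Because Rust orders `None` below any `Some`, an all-NULL signed column fails every `min >= Some(...)` guard and keeps the i64 physical type, while a pure upper-bound guard (as in the unsigned path below) passes. A minimal sketch of that ordering behaviour, outside this patch:

```rust
// Sketch: Option ordering as it drives the encoding guards above.
fn main() {
    // aggregate::min/max of an all-NULL array yield None.
    let (min, max): (Option<i64>, Option<i64>) = (None, None);

    assert!(!(min >= Some(0))); // signed u8 guard fails for all-NULL data...
    assert!(max <= Some(u8::MAX as i64)); // ...but a pure upper-bound guard passes

    // With values present the guards behave as expected:
    let (min, max) = (Some(-3_i64), Some(100_i64));
    assert!(min >= Some(i8::MIN as i64) && max <= Some(i8::MAX as i64)); // fits i8
}
```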
@@ -600,23 +1069,42 @@ impl From<&[u64]> for IntegerEncoding {

 /// Converts an Arrow array into an IntegerEncoding.
 ///
-/// TODO(edd): convert underlying type of Arrow data to smallest physical
-/// representation.
+/// The most compact physical Arrow array type is used to store the column
+/// within a `FixedNull` encoding.
 impl From<arrow::array::UInt64Array> for IntegerEncoding {
     fn from(arr: arrow::array::UInt64Array) -> Self {
         if arr.null_count() == 0 {
             return Self::from(arr.values());
         }

-        // TODO(edd): currently fixed null only supports 64-bit logical/physical
-        // types. Need to add support for storing as smaller physical types.
-        Self::U64U64N(FixedNull::<UInt64Type>::from(arr))
+        // determine max value.
+        let max = arrow::compute::kernels::aggregate::max(&arr);
+
+        // This match is carefully ordered. It prioritises smaller physical
+        // datatypes that can safely represent the provided logical data
+        match max {
+            // encode as u8 values
+            max if max <= Some(u8::MAX as u64) => {
+                Self::U64U8N(FixedNull::<UInt8Type>::from(arr))
+            }
+            // encode as u16 values
+            max if max <= Some(u16::MAX as u64) => {
+                Self::U64U16N(FixedNull::<UInt16Type>::from(arr))
+            }
+            // encode as u32 values
+            max if max <= Some(u32::MAX as u64) => {
+                Self::U64U32N(FixedNull::<UInt32Type>::from(arr))
+            }
+            // otherwise, encode with the same physical type (u64)
+            _ => Self::U64U64N(FixedNull::<UInt64Type>::from(arr)),
+        }
     }
 }

 #[cfg(test)]
 mod test {
-    use arrow::datatypes::Int64Type;
+    use arrow::array::{Int64Array, UInt64Array};
+    use std::iter;

     use super::*;

@@ -628,7 +1116,7 @@ mod test {
             vec![399_i64, 2, 2452, 3],
             vec![-399_i64, 2, 2452, 3],
             vec![u32::MAX as i64, 2, 245, 3],
-            vec![i32::MAX as i64, 2, 245, 3],
+            vec![i32::MIN as i64, 2, 245, 3],
            vec![0_i64, 2, 245, u32::MAX as i64 + 1],
         ];

@@ -642,9 +1130,156 @@ mod test {
             IntegerEncoding::I64I64(Fixed::<i64>::from(cases[6].as_slice())),
         ];

-        for (_case, _exp) in cases.iter().zip(exp.iter()) {
-            // TODO - add debug
-            //assert_eq!(IntegerEncoding::from(&case), exp);
+        for (case, exp) in cases.into_iter().zip(exp.into_iter()) {
+            assert_eq!(IntegerEncoding::from(case.as_slice()), exp);
+        }
+    }
+
+    #[test]
+    fn from_slice_u64() {
+        let cases = vec![
+            vec![0_u64, 2, 245, 3],
+            vec![399_u64, 2, 2452, 3],
+            vec![u32::MAX as u64, 2, 245, 3],
+            vec![0_u64, 2, 245, u32::MAX as u64 + 1],
+        ];
+
+        let exp = vec![
+            IntegerEncoding::U64U8(Fixed::<u8>::from(cases[0].as_slice())),
+            IntegerEncoding::U64U16(Fixed::<u16>::from(cases[1].as_slice())),
+            IntegerEncoding::U64U32(Fixed::<u32>::from(cases[2].as_slice())),
+            IntegerEncoding::U64U64(Fixed::<u64>::from(cases[3].as_slice())),
+        ];
+
+        for (case, exp) in cases.into_iter().zip(exp.into_iter()) {
+            assert_eq!(IntegerEncoding::from(case.as_slice()), exp);
+        }
+    }
+
+    #[test]
+    fn from_arrow_i64_array() {
+        let cases = vec![
+            vec![0_i64, 2, 245, 3],
+            vec![0_i64, -120, 127, 3],
+            vec![399_i64, 2, 2452, 3],
+            vec![-399_i64, 2, 2452, 3],
+            vec![u32::MAX as i64, 2, 245, 3],
+            vec![i32::MIN as i64, 2, 245, 3],
+            vec![0_i64, 2, 245, u32::MAX as i64 + 1],
+        ];
+
+        let exp = vec![
+            IntegerEncoding::I64U8(Fixed::<u8>::from(cases[0].as_slice())),
+            IntegerEncoding::I64I8(Fixed::<i8>::from(cases[1].as_slice())),
+            IntegerEncoding::I64U16(Fixed::<u16>::from(cases[2].as_slice())),
+            IntegerEncoding::I64I16(Fixed::<i16>::from(cases[3].as_slice())),
+            IntegerEncoding::I64U32(Fixed::<u32>::from(cases[4].as_slice())),
+            IntegerEncoding::I64I32(Fixed::<i32>::from(cases[5].as_slice())),
+            IntegerEncoding::I64I64(Fixed::<i64>::from(cases[6].as_slice())),
+        ];
+
+        // for Arrow arrays with no nulls we can store the column using a
+        // non-nullable fixed encoding
+        for (case, exp) in cases.iter().cloned().zip(exp.into_iter()) {
+            let arr = Int64Array::from(case);
+            assert_eq!(IntegerEncoding::from(arr), exp);
+        }
+
+        // Tack a NULL onto each of the input cases.
+        let cases = cases
+            .iter()
+            .map(|case| {
+                case.iter()
+                    .map(|x| Some(*x))
+                    .chain(iter::repeat(None).take(1))
+                    .collect::<Vec<_>>()
+            })
+            .collect::<Vec<_>>();
+
+        // when a NULL value is present then we need to use a nullable encoding.
+        let exp = vec![
+            IntegerEncoding::I64U8N(FixedNull::<UInt8Type>::from(Int64Array::from(
+                cases[0].clone(),
+            ))),
+            IntegerEncoding::I64I8N(FixedNull::<Int8Type>::from(Int64Array::from(
+                cases[1].clone(),
+            ))),
+            IntegerEncoding::I64U16N(FixedNull::<UInt16Type>::from(Int64Array::from(
+                cases[2].clone(),
+            ))),
+            IntegerEncoding::I64I16N(FixedNull::<Int16Type>::from(Int64Array::from(
+                cases[3].clone(),
+            ))),
+            IntegerEncoding::I64U32N(FixedNull::<UInt32Type>::from(Int64Array::from(
+                cases[4].clone(),
+            ))),
+            IntegerEncoding::I64I32N(FixedNull::<Int32Type>::from(Int64Array::from(
+                cases[5].clone(),
+            ))),
+            IntegerEncoding::I64I64N(FixedNull::<Int64Type>::from(Int64Array::from(
+                cases[6].clone(),
+            ))),
+        ];
+
+        for (case, exp) in cases.into_iter().zip(exp.into_iter()) {
+            let arr = Int64Array::from(case.clone());
+            assert_eq!(IntegerEncoding::from(arr), exp);
+        }
+    }
+
+    #[test]
+    fn from_arrow_u64_array() {
+        let cases = vec![
+            vec![0_u64, 2, 245, 3],
+            vec![399_u64, 2, 2452, 3],
+            vec![u32::MAX as u64, 2, 245, 3],
+            vec![0_u64, 2, 245, u32::MAX as u64 + 1],
+        ];
+
+        let exp = vec![
+            IntegerEncoding::U64U8(Fixed::<u8>::from(cases[0].as_slice())),
+            IntegerEncoding::U64U16(Fixed::<u16>::from(cases[1].as_slice())),
+            IntegerEncoding::U64U32(Fixed::<u32>::from(cases[2].as_slice())),
+            IntegerEncoding::U64U64(Fixed::<u64>::from(cases[3].as_slice())),
+        ];
+
+        // for Arrow arrays with no nulls we can store the column using a
+        // non-nullable fixed encoding
+        for (case, exp) in cases.iter().cloned().zip(exp.into_iter()) {
+            let arr = UInt64Array::from(case);
+            assert_eq!(IntegerEncoding::from(arr), exp);
+        }
+
+        // Tack a NULL onto each of the input cases.
+        let cases = cases
+            .iter()
+            .map(|case| {
+                case.iter()
+                    .map(|x| Some(*x))
+                    .chain(iter::repeat(None).take(1))
+                    .collect::<Vec<_>>()
+            })
+            .collect::<Vec<_>>();
+
+        // when a NULL value is present then we need to use a nullable encoding.
+        let exp = vec![
+            IntegerEncoding::U64U8N(FixedNull::<UInt8Type>::from(UInt64Array::from(
+                cases[0].clone(),
+            ))),
+            IntegerEncoding::U64U16N(FixedNull::<UInt16Type>::from(UInt64Array::from(
+                cases[1].clone(),
+            ))),
+            IntegerEncoding::U64U32N(FixedNull::<UInt32Type>::from(UInt64Array::from(
+                cases[2].clone(),
+            ))),
+            IntegerEncoding::U64U64N(FixedNull::<UInt64Type>::from(UInt64Array::from(
+                cases[3].clone(),
+            ))),
+        ];
+
+        for (case, exp) in cases.into_iter().zip(exp.into_iter()) {
+            let arr = UInt64Array::from(case.clone());
+            assert_eq!(IntegerEncoding::from(arr), exp);
         }
     }

@@ -660,12 +1295,7 @@ mod test {
         assert_eq!(enc.size_raw(true), 56);
         assert_eq!(enc.size_raw(false), 56);

-        let enc = IntegerEncoding::I64I64N(FixedNull::<i64>::from(&[2, 22, 12, 31][..]));
-        // (4 * 8) + 24
-        assert_eq!(enc.size_raw(true), 56);
-        assert_eq!(enc.size_raw(false), 56);
-
-        let enc = IntegerEncoding::I64I64N(FixedNull::<i64>::from(
+        let enc = IntegerEncoding::I64I64N(FixedNull::<Int64Type>::from(
             &[Some(2), Some(22), Some(12), None, None, Some(31)][..],
         ));
         // (6 * 8) + 24
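Taken together, the `read_buffer` change makes the physical representation shrink with the observed value range while the logical type stays fixed. A minimal sketch of what a caller would observe, assuming the column module's types (`IntegerEncoding`, its `name()` method, and arrow's `Int64Array`) are in scope:

```rust
use arrow::array::Int64Array;

#[test]
fn small_values_with_null_use_u8_physical_type() {
    // Values fit in u8, but the trailing NULL forces a nullable
    // (FixedNull) encoding rather than a plain Fixed one.
    let enc = IntegerEncoding::from(Int64Array::from(vec![Some(2), Some(200), None]));

    // u8 physical representation; the logical type remains i64.
    assert_eq!(enc.name(), "BT_U8N");
}
```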
diff --git a/server/Cargo.toml b/server/Cargo.toml
index da9e614a57..d1ebe30405 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -11,6 +11,7 @@ arrow_flight = { path = "../arrow_flight" }
 async-trait = "0.1"
 bytes = { version = "1.0" }
 chrono = "0.4"
+cached = "0.23.0"
 crc32fast = "1.2.0"
 data_types = { path = "../data_types" }
 datafusion = { path = "../datafusion" }
diff --git a/server/src/config.rs b/server/src/config.rs
index 5a4dbc12f6..fc4e2d6f8e 100644
--- a/server/src/config.rs
+++ b/server/src/config.rs
@@ -6,10 +6,14 @@ use std::{
 use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName};
 use metrics::MetricRegistry;
 use object_store::{path::ObjectStorePath, ObjectStore};
+use parquet_file::catalog::PreservedCatalog;
 use query::exec::Executor;

 /// This module contains code for managing the configuration of the server.
-use crate::{db::Db, Error, JobRegistry, Result};
+use crate::{
+    db::{catalog::Catalog, Db},
+    Error, JobRegistry, Result,
+};
 use observability_deps::tracing::{self, error, info, warn, Instrument};
 use tokio::task::JoinHandle;
 use tokio_util::sync::CancellationToken;
@@ -125,6 +129,7 @@ impl Config {
         server_id: ServerId,
         object_store: Arc<ObjectStore>,
         exec: Arc<Executor>,
+        preserved_catalog: PreservedCatalog<Catalog>,
     ) {
         let mut state = self.state.write().expect("mutex poisoned");
         let name = state
@@ -148,7 +153,7 @@ impl Config {
             exec,
             write_buffer,
             Arc::clone(&self.jobs),
-            Arc::clone(&self.metric_registry),
+            preserved_catalog,
         ));

         let shutdown = self.shutdown.child_token();
@@ -203,6 +208,10 @@ impl Config {

         info!("database background workers shutdown");
     }
+
+    pub fn metrics_registry(&self) -> Arc<MetricRegistry> {
+        Arc::clone(&self.metric_registry)
+    }
 }

 pub fn object_store_path_for_database_config(
@@ -274,9 +283,15 @@ impl<'a> CreateDatabaseHandle<'a> {
         server_id: ServerId,
         object_store: Arc<ObjectStore>,
         exec: Arc<Executor>,
+        preserved_catalog: PreservedCatalog<Catalog>,
     ) {
-        self.config
-            .commit(self.rules.take().unwrap(), server_id, object_store, exec)
+        self.config.commit(
+            self.rules.take().unwrap(),
+            server_id,
+            object_store,
+            exec,
+            preserved_catalog,
+        )
     }

     pub(crate) fn rules(&self) -> &DatabaseRules {
@@ -298,6 +313,8 @@ mod test {

     use object_store::{memory::InMemory, ObjectStore, ObjectStoreApi};

+    use crate::db::load_preserved_catalog;
+
     use super::*;

     #[tokio::test]
@@ -317,7 +334,15 @@ mod test {
         let server_id = ServerId::try_from(1).unwrap();
         let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
         let exec = Arc::new(Executor::new(1));
-        db_reservation.commit(server_id, store, exec);
+        let preserved_catalog = load_preserved_catalog(
+            &name,
+            Arc::clone(&store),
+            server_id,
+            config.metrics_registry(),
+        )
+        .await
+        .unwrap();
+        db_reservation.commit(server_id, store, exec, preserved_catalog);

         assert!(config.db(&name).is_some());
         assert_eq!(config.db_names_sorted(), vec![name.clone()]);
@@ -345,7 +370,15 @@ mod test {
         let server_id = ServerId::try_from(1).unwrap();
         let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
         let exec = Arc::new(Executor::new(1));
-        db_reservation.commit(server_id, store, exec);
+        let preserved_catalog = load_preserved_catalog(
+            &name,
+            Arc::clone(&store),
+            server_id,
+            config.metrics_registry(),
+        )
+        .await
+        .unwrap();
+        db_reservation.commit(server_id, store, exec, preserved_catalog);

         let token = config
             .state
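The config change splits database creation into two phases: reserve a name, then commit with a catalog that was loaded outside the config lock. A rough sketch of the resulting call sequence, mirroring the tests above (names and unwraps are illustrative; `load_preserved_catalog` is defined in `server/src/db.rs`, the next file in this diff):

```rust
use std::sync::Arc;

// Sketch only: assumes Config, DatabaseRules, ServerId, ObjectStore, Executor
// and load_preserved_catalog are in scope as in the config.rs tests above.
async fn create_and_commit(config: &Config) {
    let name = DatabaseName::new("foo").unwrap();
    let mut db_reservation = config.create_db(DatabaseRules::new(name.clone())).unwrap();

    let server_id = ServerId::try_from(1).unwrap();
    let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
    let exec = Arc::new(Executor::new(1));

    // Load (or initialise) the preserved catalog before committing, so a
    // failed load never leaves a half-registered database behind.
    let preserved_catalog =
        load_preserved_catalog(&name, Arc::clone(&store), server_id, config.metrics_registry())
            .await
            .unwrap();

    db_reservation.commit(server_id, store, exec, preserved_catalog);
}
```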
diff --git a/server/src/db.rs b/server/src/db.rs
index 62f971ba14..2a1c5276fc 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -310,6 +310,33 @@ pub struct Db {
     metric_labels: Vec<KeyValue>,
 }

+/// Load preserved catalog state from store.
+pub async fn load_preserved_catalog(
+    db_name: &str,
+    object_store: Arc<ObjectStore>,
+    server_id: ServerId,
+    metrics_registry: Arc<MetricRegistry>,
+) -> std::result::Result<PreservedCatalog<Catalog>, parquet_file::catalog::Error> {
+    let metric_labels = vec![
+        KeyValue::new("db_name", db_name.to_string()),
+        KeyValue::new("svr_id", format!("{}", server_id)),
+    ];
+    let metrics_domain =
+        metrics_registry.register_domain_with_labels("catalog", metric_labels.clone());
+
+    PreservedCatalog::new_empty(
+        Arc::clone(&object_store),
+        server_id,
+        db_name.to_string(),
+        CatalogEmptyInput {
+            domain: metrics_domain,
+            metrics_registry: Arc::clone(&metrics_registry),
+            metric_labels: metric_labels.clone(),
+        },
+    )
+    .await
+}
+
 impl Db {
     pub fn new(
         rules: DatabaseRules,
@@ -318,33 +345,19 @@ impl Db {
         exec: Arc<Executor>,
         write_buffer: Option<Buffer>,
         jobs: Arc<JobRegistry>,
-        metrics_registry: Arc<MetricRegistry>,
+        preserved_catalog: PreservedCatalog<Catalog>,
     ) -> Self {
         let db_name = rules.name.clone();

-        let metric_labels = vec![
-            KeyValue::new("db_name", db_name.to_string()),
-            KeyValue::new("svr_id", format!("{}", server_id)),
-        ];
-
-        let metrics_domain =
-            metrics_registry.register_domain_with_labels("catalog", metric_labels.clone());

         let rules = RwLock::new(rules);
         let server_id = server_id;
         let store = Arc::clone(&object_store);
         let write_buffer = write_buffer.map(Mutex::new);
-        let catalog = PreservedCatalog::new_empty(
-            Arc::clone(&object_store),
-            server_id,
-            db_name.to_string(),
-            CatalogEmptyInput {
-                domain: metrics_domain,
-                metrics_registry: Arc::clone(&metrics_registry),
-                metric_labels: metric_labels.clone(),
-            },
-        );

-        let system_tables = SystemSchemaProvider::new(&db_name, catalog.state(), Arc::clone(&jobs));
+        let system_tables =
+            SystemSchemaProvider::new(&db_name, preserved_catalog.state(), Arc::clone(&jobs));
         let system_tables = Arc::new(system_tables);
+
+        let metrics_registry = Arc::clone(&preserved_catalog.state().metrics_registry);
+        let metric_labels = preserved_catalog.state().metric_labels.clone();

         let process_clock = process_clock::ProcessClock::new();

@@ -353,7 +366,7 @@ impl Db {
             server_id,
             store,
             exec,
-            catalog,
+            catalog: preserved_catalog,
             write_buffer,
             jobs,
             metrics_registry,
@@ -1205,10 +1218,10 @@ mod tests {
     type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
     type Result<T, E = Error> = std::result::Result<T, E>;

-    #[test]
-    fn write_no_mutable_buffer() {
+    #[tokio::test]
+    async fn write_no_mutable_buffer() {
         // Validate that writes are rejected if there is no mutable buffer
-        let db = make_db().db;
+        let db = make_db().await.db;
         db.rules.write().lifecycle_rules.immutable = true;
         let entry = lp_to_entry("cpu bar=1 10");
         let res = db.store_entry(entry);
@@ -1220,7 +1233,7 @@ mod tests {

     #[tokio::test]
     async fn read_write() {
-        let db = Arc::new(make_db().db);
+        let db = Arc::new(make_db().await.db);
         write_lp(&db, "cpu bar=1 10");

         let batches = run_query(db, "select * from cpu").await;
@@ -1252,7 +1265,7 @@ mod tests {

     #[tokio::test]
     async fn metrics_during_rollover() {
-        let test_db = make_db();
+        let test_db = make_db().await;
         let db = Arc::new(test_db.db);

         write_lp(db.as_ref(), "cpu bar=1 10");
@@ -1372,7 +1385,7 @@ mod tests {

     #[tokio::test]
     async fn write_with_rollover() {
-        let db = Arc::new(make_db().db);
+        let db = Arc::new(make_db().await.db);
         write_lp(db.as_ref(), "cpu bar=1 10");
         assert_eq!(vec!["1970-01-01T00"], db.partition_keys().unwrap());

@@ -1420,7 +1433,7 @@ mod tests {

     #[tokio::test]
     async fn write_with_missing_tags_are_null() {
-        let db = Arc::new(make_db().db);
+        let db = Arc::new(make_db().await.db);
         // Note the `region` tag is introduced in the second line, so
         // the values in prior rows for the region column are
        // null. Likewise the `core` tag is introduced in the third
@@ -1457,7 +1470,7 @@ mod tests {
     #[tokio::test]
     async fn read_from_read_buffer() {
         // Test that data can be loaded into the ReadBuffer
-        let test_db = make_db();
+        let test_db = make_db().await;
         let db = Arc::new(test_db.db);

         write_lp(db.as_ref(), "cpu bar=1 10");
@@ -1543,7 +1556,7 @@ mod tests {

     #[tokio::test]
     async fn load_to_read_buffer_sorted() {
-        let test_db = make_db();
+        let test_db = make_db().await;
         let db = Arc::new(test_db.db);

         write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10");
@@ -1655,7 +1668,8 @@ mod tests {
             .server_id(server_id)
             .object_store(Arc::clone(&object_store))
             .db_name(db_name)
-            .build();
+            .build()
+            .await;

         let db = Arc::new(test_db.db);

@@ -1754,7 +1768,8 @@ mod tests {
             .server_id(server_id)
             .object_store(Arc::clone(&object_store))
             .db_name(db_name)
-            .build();
+            .build()
+            .await;

         let db = Arc::new(test_db.db);

@@ -1866,9 +1881,9 @@ mod tests {
         assert_batches_eq!(expected, &record_batches);
     }

-    #[test]
-    fn write_updates_last_write_at() {
-        let db = Arc::new(make_db().db);
+    #[tokio::test]
+    async fn write_updates_last_write_at() {
+        let db = Arc::new(make_db().await.db);
         let before_create = Utc::now();

         let partition_key = "1970-01-01T00";
@@ -1896,7 +1911,7 @@ mod tests {
     #[tokio::test]
     async fn test_chunk_timestamps() {
         let start = Utc::now();
-        let db = Arc::new(make_db().db);
+        let db = Arc::new(make_db().await.db);

         // Given data loaded into two chunks
         write_lp(&db, "cpu bar=1 10");
@@ -1931,9 +1946,9 @@ mod tests {
         assert!(chunk.time_closed().unwrap() < after_rollover);
     }

-    #[test]
-    fn test_chunk_closing() {
-        let db = Arc::new(make_db().db);
+    #[tokio::test]
+    async fn test_chunk_closing() {
+        let db = Arc::new(make_db().await.db);
         db.rules.write().lifecycle_rules.mutable_size_threshold =
             Some(NonZeroUsize::new(2).unwrap());

@@ -1952,9 +1967,9 @@ mod tests {
         assert!(matches!(chunks[1].read().state(), ChunkState::Closed(_)));
     }

-    #[test]
-    fn chunks_sorted_by_times() {
-        let db = Arc::new(make_db().db);
+    #[tokio::test]
+    async fn chunks_sorted_by_times() {
+        let db = Arc::new(make_db().await.db);
         write_lp(&db, "cpu val=1 1");
         write_lp(&db, "mem val=2 400000000000001");
         write_lp(&db, "cpu val=1 2");
@@ -1987,7 +2002,7 @@ mod tests {
     #[tokio::test]
     async fn chunk_id_listing() {
         // Test that chunk id listing is hooked up
-        let db = Arc::new(make_db().db);
+        let db = Arc::new(make_db().await.db);
         let partition_key = "1970-01-01T00";

         write_lp(&db, "cpu bar=1 10");
@@ -2056,7 +2071,7 @@ mod tests {
     #[tokio::test]
     async fn partition_chunk_summaries() {
         // Test that chunk id listing is hooked up
-        let db = Arc::new(make_db().db);
+        let db = Arc::new(make_db().await.db);

         write_lp(&db, "cpu bar=1 1");
         db.rollover_partition("1970-01-01T00", "cpu").await.unwrap();
@@ -2104,7 +2119,7 @@ mod tests {

     #[tokio::test]
     async fn partition_chunk_summaries_timestamp() {
-        let db = Arc::new(make_db().db);
+        let db = Arc::new(make_db().await.db);
         let start = Utc::now();
         write_lp(&db, "cpu bar=1 1");
         let after_first_write = Utc::now();
@@ -2155,7 +2170,7 @@ mod tests {
     #[tokio::test]
     async fn chunk_summaries() {
         // Test that chunk id listing is hooked up
-        let db = Arc::new(make_db().db);
+        let db = Arc::new(make_db().await.db);

         // get three chunks: one open, one closed in mb and one close in rb
         write_lp(&db, "cpu bar=1 1");
@@ -2250,7 +2265,7 @@ mod tests {
     #[tokio::test]
     async fn partition_summaries() {
         // Test that chunk id listing is hooked up
-        let db = Arc::new(make_db().db);
+        let db = Arc::new(make_db().await.db);

         write_lp(&db, "cpu bar=1 1");
         let chunk_id = db
@@ -2460,7 +2475,7 @@ mod tests {
     #[tokio::test]
     async fn write_chunk_to_object_store_in_background() {
         // Test that data can be written to object store using a background task
-        let db = Arc::new(make_db().db);
+        let db = Arc::new(make_db().await.db);

         // create MB partition
         write_lp(db.as_ref(), "cpu bar=1 10");
@@ -2501,9 +2516,9 @@ mod tests {
         assert_eq!(read_parquet_file_chunk_ids(&db, partition_key), vec![0]);
     }

-    #[test]
-    fn write_hard_limit() {
-        let db = Arc::new(make_db().db);
+    #[tokio::test]
+    async fn write_hard_limit() {
+        let db = Arc::new(make_db().await.db);
         db.rules.write().lifecycle_rules.buffer_size_hard = Some(NonZeroUsize::new(10).unwrap());

         // inserting first line does not trigger hard buffer limit
@@ -2516,9 +2531,9 @@ mod tests {
         ));
     }

-    #[test]
-    fn write_goes_to_write_buffer_if_configured() {
-        let db = Arc::new(TestDb::builder().write_buffer(true).build().db);
+    #[tokio::test]
+    async fn write_goes_to_write_buffer_if_configured() {
+        let db = Arc::new(TestDb::builder().write_buffer(true).build().await.db);

         assert_eq!(db.write_buffer.as_ref().unwrap().lock().size(), 0);
         write_lp(db.as_ref(), "cpu bar=1 10");
@@ -2536,7 +2551,8 @@ mod tests {
             .server_id(server_id)
             .object_store(Arc::clone(&object_store))
             .db_name(db_name)
-            .build();
+            .build()
+            .await;

         let db = Arc::new(test_db.db);

@@ -2704,9 +2720,22 @@ mod tests {
             .object_store(Arc::clone(&object_store))
             .server_id(server_id)
             .db_name(db_name)
-            .build();
+            .build()
+            .await;
         let db = Arc::new(test_db.db);

+        // at this point, an empty preserved catalog exists
+        let maybe_preserved_catalog =
+            PreservedCatalog::<TestCatalogState>::load(
+                Arc::clone(&object_store),
+                server_id,
+                db_name.to_string(),
+                (),
+            )
+            .await
+            .unwrap();
+        assert!(maybe_preserved_catalog.is_some());
+
         // Write some line protocols in Mutable buffer of the DB
         write_lp(db.as_ref(), "cpu bar=1 10");

@@ -2722,18 +2751,6 @@ mod tests {
             .await
             .unwrap();

-        // at this point, no preserved catalog exists
-        let maybe_preserved_catalog =
-            PreservedCatalog::<TestCatalogState>::load(
-                Arc::clone(&object_store),
-                server_id,
-                db_name.to_string(),
-                (),
-            )
-            .await
-            .unwrap();
-        assert!(maybe_preserved_catalog.is_none());
-
         // Write the RB chunk to Object Store but keep it in RB
         db.write_chunk_to_object_store(partition_key, "cpu", mb_chunk.id())
             .await
diff --git a/server/src/db/process_clock.rs b/server/src/db/process_clock.rs
index 3e173cfbb1..2c05514133 100644
--- a/server/src/db/process_clock.rs
+++ b/server/src/db/process_clock.rs
@@ -75,11 +75,11 @@ mod tests {
     use entry::test_helpers::lp_to_entry;
     use std::{sync::Arc, thread, time::Duration};

-    #[test]
-    fn process_clock_defaults_to_current_time_in_ns() {
+    #[tokio::test]
+    async fn process_clock_defaults_to_current_time_in_ns() {
         let before = system_clock_now();

-        let db = Arc::new(TestDb::builder().build().db);
+        let db = Arc::new(TestDb::builder().build().await.db);
         let db_process_clock = db.process_clock.inner.load(Ordering::SeqCst);

         let after = system_clock_now();
@@ -98,12 +98,12 @@ mod tests {
         );
     }

-    #[test]
-    fn process_clock_incremented_and_set_on_sequenced_entry() {
+    #[tokio::test]
+    async fn process_clock_incremented_and_set_on_sequenced_entry() {
         let before = system_clock_now();
         let before = ClockValue::try_from(before).unwrap();

-        let db = Arc::new(TestDb::builder().write_buffer(true).build().db);
+        let db = Arc::new(TestDb::builder().write_buffer(true).build().await.db);

         let entry = lp_to_entry("cpu bar=1 10");
lp_to_entry("cpu bar=1 10"); db.store_entry(entry).unwrap(); @@ -147,10 +147,10 @@ mod tests { ); } - #[test] - fn next_process_clock_always_increments() { + #[tokio::test] + async fn next_process_clock_always_increments() { // Process clock defaults to the current time - let db = Arc::new(TestDb::builder().write_buffer(true).build().db); + let db = Arc::new(TestDb::builder().write_buffer(true).build().await.db); // Set the process clock value to a time in the future, so that when compared to the // current time, the process clock value will be greater diff --git a/server/src/lib.rs b/server/src/lib.rs index bfeb2b5406..40d67d9383 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -72,6 +72,8 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::BytesMut; +use cached::proc_macro::cached; +use db::load_preserved_catalog; use futures::stream::TryStreamExt; use observability_deps::tracing::{error, info, warn}; use parking_lot::Mutex; @@ -175,6 +177,8 @@ pub enum Error { }, #[snafu(display("remote error: {}", source))] RemoteError { source: ConnectionManagerError }, + #[snafu(display("cannot load catalog: {}", source))] + CatalogLoadError { source: DatabaseError }, } pub type Result = std::result::Result; @@ -418,12 +422,26 @@ impl Server { pub async fn create_database(&self, rules: DatabaseRules, server_id: ServerId) -> Result<()> { // Return an error if this server hasn't yet been setup with an id self.require_id()?; - let db_reservation = self.config.create_db(rules)?; + let preserved_catalog = load_preserved_catalog( + rules.db_name(), + Arc::clone(&self.store), + server_id, + self.config.metrics_registry(), + ) + .await + .map_err(|e| Box::new(e) as _) + .context(CatalogLoadError)?; + + let db_reservation = self.config.create_db(rules)?; self.persist_database_rules(db_reservation.rules().clone()) .await?; - - db_reservation.commit(server_id, Arc::clone(&self.store), Arc::clone(&self.exec)); + db_reservation.commit( + server_id, + Arc::clone(&self.store), + Arc::clone(&self.exec), + preserved_catalog, + ); Ok(()) } @@ -507,10 +525,24 @@ impl Server { Err(e) => { error!("error parsing database config {:?} from store: {}", path, e) } - Ok(rules) => match config.create_db(rules) { - Err(e) => error!("error adding database to config: {}", e), - Ok(handle) => handle.commit(server_id, store, exec), - }, + Ok(rules) => { + match load_preserved_catalog( + rules.db_name(), + Arc::clone(&store), + server_id, + config.metrics_registry(), + ) + .await + { + Err(e) => error!("cannot load database: {}", e), + Ok(preserved_catalog) => match config.create_db(rules) { + Err(e) => error!("error adding database to config: {}", e), + Ok(handle) => { + handle.commit(server_id, store, exec, preserved_catalog) + } + }, + } + } }; }) }) @@ -905,17 +937,25 @@ impl ConnectionManager for ConnectionManagerImpl { &self, connect: &str, ) -> Result, ConnectionManagerError> { - // TODO(mkm): cache the connections - let connection = Builder::default() - .build(connect) - .await - .map_err(|e| Box::new(e) as _) - .context(RemoteServerConnectError)?; - let client = write::Client::new(connection); - Ok(Arc::new(RemoteServerImpl { client })) + cached_remote_server(connect.to_string()).await } } +// cannot be an associated function +// argument need to have static lifetime because they become caching keys +#[cached(result = true)] +async fn cached_remote_server( + connect: String, +) -> Result, ConnectionManagerError> { + let connection = Builder::default() + .build(&connect) + .await + .map_err(|e| 
+        .context(RemoteServerConnectError)?;
+    let client = write::Client::new(connection);
+    Ok(Arc::new(RemoteServerImpl { client }))
+}
+
 /// An implementation for communicating with other IOx servers. This should
 /// be moved into and implemented in an influxdb_iox_client create at a later
 /// date.
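The connection cache above leans on the `cached` crate's proc macro: the function's arguments become the cache key (hence the owned `String` rather than `&str`), and with `result = true` only `Ok` values are memoized, so a failed dial is retried on the next call. A minimal, standalone sketch of the same pattern (names here are illustrative, not from the codebase; assumes `cached = "0.23"` as added to Cargo.toml above):

```rust
use cached::proc_macro::cached;

// With `result = true`, only Ok values are stored; an Err falls through
// and the body runs again on the next call with the same key.
#[cached(result = true)]
async fn lookup(key: String) -> Result<u64, String> {
    // Imagine an expensive or fallible operation here (e.g. dialing a
    // remote server). Subsequent calls with the same `key` return the
    // cached Ok value without re-running this body.
    Ok(key.len() as u64)
}
```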
diff --git a/server/src/query_tests/influxrpc/read_window_aggregate.rs b/server/src/query_tests/influxrpc/read_window_aggregate.rs
index c97fd53eb6..9fee6da726 100644
--- a/server/src/query_tests/influxrpc/read_window_aggregate.rs
+++ b/server/src/query_tests/influxrpc/read_window_aggregate.rs
@@ -162,7 +162,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
         // partition keys are: ["2020-03-02T00", "2020-03-01T00", "2020-04-01T00",
         // "2020-04-02T00"]

-        let db = make_db().db;
+        let db = make_db().await.db;
         let data = lp_lines.join("\n");
         write_lp(&db, &data);
         let scenario1 = DbScenario {
@@ -170,7 +170,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
             db,
         };

-        let db = make_db().db;
+        let db = make_db().await.db;
         let data = lp_lines.join("\n");
         write_lp(&db, &data);
         db.rollover_partition("2020-03-01T00", "h2o").await.unwrap();
@@ -182,7 +182,7 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
             db,
         };

-        let db = make_db().db;
+        let db = make_db().await.db;
         let data = lp_lines.join("\n");
         write_lp(&db, &data);
         // roll over and load chunks into both RUB and OS
diff --git a/server/src/query_tests/scenarios.rs b/server/src/query_tests/scenarios.rs
index 73d90b6fdc..f6c74e3a94 100644
--- a/server/src/query_tests/scenarios.rs
+++ b/server/src/query_tests/scenarios.rs
@@ -36,7 +36,7 @@ impl DbSetup for NoData {
         // Scenario 1: No data in the DB yet
         //
-        let db = make_db().db;
+        let db = make_db().await.db;
         let scenario1 = DbScenario {
             scenario_name: "New, Empty Database".into(),
             db,
@@ -45,7 +45,7 @@ impl DbSetup for NoData {
         // Scenario 2: listing partitions (which may create an entry in a map)
         // in an empty database
         //
-        let db = make_db().db;
+        let db = make_db().await.db;
         assert_eq!(count_mutable_buffer_chunks(&db), 0);
         assert_eq!(count_read_buffer_chunks(&db), 0);
         assert_eq!(count_object_store_chunks(&db), 0);
@@ -56,7 +56,7 @@ impl DbSetup for NoData {

         // Scenario 3: the database has had data loaded into RB and then deleted
         //
-        let db = make_db().db;
+        let db = make_db().await.db;
         let data = "cpu,region=west user=23.2 100";
         write_lp(&db, data);
         // move data out of open chunk
@@ -94,7 +94,7 @@ impl DbSetup for NoData {

         // Scenario 4: the database has had data loaded into RB & Object Store and then deleted
         //
-        let db = make_db().db;
+        let db = make_db().await.db;
         let data = "cpu,region=west user=23.2 100";
         write_lp(&db, data);
         // move data out of open chunk
@@ -253,7 +253,7 @@ pub struct TwoMeasurementsManyFieldsOneChunk {}
 #[async_trait]
 impl DbSetup for TwoMeasurementsManyFieldsOneChunk {
     async fn make(&self) -> Vec<DbScenario> {
-        let db = make_db().db;
+        let db = make_db().await.db;

         let lp_lines = vec![
             "h2o,state=MA,city=Boston temp=70.4 50",
@@ -277,7 +277,7 @@ pub struct TwoMeasurementsManyFieldsTwoChunks {}
 #[async_trait]
 impl DbSetup for TwoMeasurementsManyFieldsTwoChunks {
     async fn make(&self) -> Vec<DbScenario> {
-        let db = make_db().db;
+        let db = make_db().await.db;

         let partition_key = "1970-01-01T00";

@@ -314,7 +314,7 @@ impl DbSetup for TwoMeasurementsManyFieldsLifecycle {
     async fn make(&self) -> Vec<DbScenario> {
         let partition_key = "1970-01-01T00";

-        let db = std::sync::Arc::new(make_db().db);
+        let db = std::sync::Arc::new(make_db().await.db);

         write_lp(
             &db,
@@ -397,7 +397,7 @@ impl DbSetup for EndToEndTest {
         let lp_data = lp_lines.join("\n");

-        let db = make_db().db;
+        let db = make_db().await.db;
         write_lp(&db, &lp_data);

         let scenario1 = DbScenario {
@@ -413,7 +413,7 @@ impl DbSetup for EndToEndTest {
 ///
 pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> Vec<DbScenario> {
     // Scenario 1: One open chunk in MUB
-    let db = make_db().db;
+    let db = make_db().await.db;
     write_lp(&db, data);
     let scenario1 = DbScenario {
         scenario_name: "Data in open chunk of mutable buffer".into(),
@@ -421,7 +421,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> Vec<DbScenario> {
     };

     // Scenario 2: One closed chunk in MUB
-    let db = make_db().db;
+    let db = make_db().await.db;
     let table_names = write_lp(&db, data);
     for table_name in &table_names {
         db.rollover_partition(partition_key, &table_name)
@@ -434,7 +434,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> Vec<DbScenario> {
     };

     // Scenario 3: One closed chunk in RUB
-    let db = make_db().db;
+    let db = make_db().await.db;
     let table_names = write_lp(&db, data);
     for table_name in &table_names {
         db.rollover_partition(partition_key, &table_name)
@@ -450,7 +450,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> Vec<DbScenario> {
     };

     // Scenario 4: One closed chunk in both RUb and OS
-    let db = make_db().db;
+    let db = make_db().await.db;
     let table_names = write_lp(&db, data);
     for table_name in &table_names {
         db.rollover_partition(partition_key, &table_name)
@@ -469,7 +469,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> Vec<DbScenario> {
     };

     // Scenario 5: One closed chunk in OS only
-    let db = make_db().db;
+    let db = make_db().await.db;
     let table_names = write_lp(&db, data);
     for table_name in &table_names {
         db.rollover_partition(partition_key, &table_name)
@@ -504,7 +504,7 @@ pub async fn make_two_chunk_scenarios(
     data1: &str,
     data2: &str,
 ) -> Vec<DbScenario> {
-    let db = make_db().db;
+    let db = make_db().await.db;
     write_lp(&db, data1);
     write_lp(&db, data2);
     let scenario1 = DbScenario {
@@ -513,7 +513,7 @@ pub async fn make_two_chunk_scenarios(
     };

     // spread across 2 mutable buffer chunks
-    let db = make_db().db;
+    let db = make_db().await.db;
     let table_names = write_lp(&db, data1);
     for table_name in &table_names {
         db.rollover_partition(partition_key, &table_name)
@@ -527,7 +527,7 @@ pub async fn make_two_chunk_scenarios(
     };

     // spread across 1 mutable buffer, 1 read buffer chunks
-    let db = make_db().db;
+    let db = make_db().await.db;
     let table_names = write_lp(&db, data1);
     for table_name in &table_names {
         db.rollover_partition(partition_key, &table_name)
@@ -544,7 +544,7 @@ pub async fn make_two_chunk_scenarios(
     };

     // in 2 read buffer chunks
-    let db = make_db().db;
+    let db = make_db().await.db;
     let table_names = write_lp(&db, data1);
     for table_name in &table_names {
         db.rollover_partition(partition_key, &table_name)
@@ -571,7 +571,7 @@ pub async fn make_two_chunk_scenarios(
     };

     // in 2 read buffer chunks that also loaded into object store
-    let db = make_db().db;
+    let db = make_db().await.db;
     let table_names = write_lp(&db, data1);
     for table_name in &table_names {
         db.rollover_partition(partition_key, &table_name)
@@ -606,7 +606,7 @@ pub async fn make_two_chunk_scenarios(
     };

     // Scenario 6: Two closed chunk in OS only
-    let db = make_db().db;
+    let db = make_db().await.db;
     let table_names = write_lp(&db, data1);
     for table_name in &table_names {
         db.rollover_partition(partition_key, &table_name)
diff --git a/server/src/query_tests/utils.rs b/server/src/query_tests/utils.rs
index ab569204d6..e2dda6e798 100644
--- a/server/src/query_tests/utils.rs
+++ b/server/src/query_tests/utils.rs
@@ -7,7 +7,11 @@ use data_types::{
 use object_store::{memory::InMemory, ObjectStore};
 use query::{exec::Executor, Database};

-use crate::{buffer::Buffer, db::Db, JobRegistry};
+use crate::{
+    buffer::Buffer,
+    db::{load_preserved_catalog, Db},
+    JobRegistry,
+};
 use std::{borrow::Cow, convert::TryFrom, sync::Arc};

 // A wrapper around a Db and a metrics registry allowing for isolated testing
@@ -37,7 +41,7 @@ impl TestDbBuilder {
         Self::default()
     }

-    pub fn build(self) -> TestDb {
+    pub async fn build(self) -> TestDb {
         let server_id = self
             .server_id
             .unwrap_or_else(|| ServerId::try_from(1).unwrap());
@@ -64,9 +68,17 @@ impl TestDbBuilder {
         } else {
             None
         };
+        let preserved_catalog = load_preserved_catalog(
+            db_name.as_str(),
+            Arc::clone(&object_store),
+            server_id,
+            Arc::clone(&metrics_registry),
+        )
+        .await
+        .unwrap();

         TestDb {
-            metric_registry: metrics::TestMetricRegistry::new(Arc::clone(&metrics_registry)),
+            metric_registry: metrics::TestMetricRegistry::new(metrics_registry),
             db: Db::new(
                 DatabaseRules::new(db_name),
                 server_id,
@@ -74,7 +86,7 @@ impl TestDbBuilder {
                 exec,
                 write_buffer,
                 Arc::new(JobRegistry::new()),
-                metrics_registry,
+                preserved_catalog,
             ),
         }
     }
@@ -101,8 +113,8 @@ impl TestDbBuilder {
 }

 /// Used for testing: create a Database with a local store
-pub fn make_db() -> TestDb {
-    TestDb::builder().build()
+pub async fn make_db() -> TestDb {
+    TestDb::builder().build().await
 }

 fn chunk_summary_iter(db: &Db) -> impl Iterator<Item = ChunkSummary> + '_ {
diff --git a/server/src/snapshot.rs b/server/src/snapshot.rs
index 28eef0076c..98ef630aea 100644
--- a/server/src/snapshot.rs
+++ b/server/src/snapshot.rs
@@ -279,6 +279,7 @@ cpu,host=B,region=east user=10.0,system=74.1 1
         let db = TestDb::builder()
             .object_store(Arc::new(ObjectStore::new_in_memory(InMemory::new())))
             .build()
+            .await
             .db;

         write_lp(&db, &lp);
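This `TestDbBuilder` change is the root cause of the mechanical churn in the test files above: because `Db::new` now requires a preloaded `PreservedCatalog`, building a test database involves an await point, and every `#[test]` touching a `Db` becomes a `#[tokio::test]`. A minimal sketch of the updated calling convention, assuming the helpers above are in scope:

```rust
// Sketch only: mirrors the pattern the diff applies across the test suite.
#[tokio::test]
async fn uses_async_builder() {
    let test_db = TestDb::builder()
        .write_buffer(true) // optional knobs, as in the builder above
        .build()
        .await; // the catalog load happens here

    let _db = test_db.db;
    // ... exercise `_db` as in the scenarios above ...
}
```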
diff --git a/src/influxdb_ioxd/http.rs b/src/influxdb_ioxd/http.rs
index 08d2fd1ddb..9bebdfe420 100644
--- a/src/influxdb_ioxd/http.rs
+++ b/src/influxdb_ioxd/http.rs
@@ -26,21 +26,24 @@ use server::{ConnectionManager, Server as AppServer};
 use bytes::{Bytes, BytesMut};
 use futures::{self, StreamExt};
 use http::header::{CONTENT_ENCODING, CONTENT_TYPE};
-use hyper::{Body, Method, Request, Response, StatusCode};
+use hyper::{http::HeaderValue, Body, Method, Request, Response, StatusCode};
 use observability_deps::{
     opentelemetry::KeyValue,
-    tracing::{self, debug, error},
+    tracing::{self, debug, error, info},
 };
 use routerify::{prelude::*, Middleware, RequestInfo, Router, RouterError, RouterService};
 use serde::Deserialize;
 use snafu::{OptionExt, ResultExt, Snafu};

 use hyper::server::conn::AddrIncoming;
+use pprof::protos::Message;
+use std::num::NonZeroI32;
 use std::{
     fmt::Debug,
     str::{self, FromStr},
     sync::Arc,
 };
+use tokio::time::Duration;
 use tokio_util::sync::CancellationToken;

 /// Constants used in API error codes.
@@ -217,6 +220,15 @@ pub enum ApplicationError {
         partition: String,
         table_name: String,
     },
+
+    #[snafu(display("PProf error: {}", source))]
+    PProf { source: pprof::Error },
+
+    #[snafu(display("Protobuf error: {}", source))]
+    Prost { source: prost::EncodeError },
+
+    #[snafu(display("Empty flamegraph"))]
+    EmptyFlamegraph,
 }

 impl ApplicationError {
@@ -251,6 +263,9 @@ impl ApplicationError {
             Self::ParsingFormat { .. } => self.bad_request(),
             Self::Planning { .. } => self.bad_request(),
             Self::NoSnapshot { .. } => self.not_modified(),
+            Self::PProf { .. } => self.internal_error(),
+            Self::Prost { .. } => self.internal_error(),
+            Self::EmptyFlamegraph => self.no_content(),
         }
     }

@@ -282,6 +297,13 @@ impl ApplicationError {
             .unwrap()
     }

+    fn no_content(&self) -> Response<Body> {
+        Response::builder()
+            .status(StatusCode::NO_CONTENT)
+            .body(self.body())
+            .unwrap()
+    }
+
     fn body(&self) -> Body {
         let json =
             serde_json::json!({"error": self.to_string(), "error_code": self.api_error_code()})
@@ -340,6 +362,8 @@ where
         .get("/iox/api/v1/databases/:name/query", query::<M>)
         .get("/api/v1/partitions", list_partitions::<M>)
         .post("/api/v1/snapshot", snapshot_partition::<M>)
+        .get("/debug/pprof", pprof_home::<M>)
+        .get("/debug/pprof/profile", pprof_profile::<M>)
         // Specify the error handler to handle any errors caused by
         // a route or any middleware.
         .err_handler_with_info(error_handler)
@@ -785,6 +809,97 @@ async fn snapshot_partition<M: ConnectionManager + Send + Sync + Debug + 'static>(
+#[tracing::instrument(level = "debug")]
+async fn pprof_home<M: ConnectionManager + Send + Sync + Debug + 'static>(
+    req: Request<Body>,
+) -> Result<Response<Body>, ApplicationError> {
+    let default_host = HeaderValue::from_static("localhost");
+    let host = req
+        .headers()
+        .get("host")
+        .unwrap_or(&default_host)
+        .to_str()
+        .unwrap_or_default();
+    let cmd = format!(
+        "/debug/pprof/profile?seconds={}",
+        PProfArgs::default_seconds()
+    );
+    Ok(Response::new(Body::from(format!(
+        r#"<a href="{}">http://{}{}</a>"#,
+        cmd, host, cmd
+    ))))
+}
+
+async fn dump_rsprof(seconds: u64, frequency: i32) -> pprof::Result<pprof::Report> {
+    let guard = pprof::ProfilerGuard::new(frequency)?;
+    info!(
+        "start profiling {} seconds with frequency {} /s",
+        seconds, frequency
+    );
+
+    tokio::time::sleep(Duration::from_secs(seconds)).await;
+
+    info!(
+        "done profiling {} seconds with frequency {} /s",
+        seconds, frequency
+    );
+    guard.report().build()
+}
+
+#[derive(Debug, Deserialize)]
+struct PProfArgs {
+    #[serde(default = "PProfArgs::default_seconds")]
+    seconds: u64,
+    #[serde(default = "PProfArgs::default_frequency")]
+    frequency: NonZeroI32,
+}
+
+impl PProfArgs {
+    fn default_seconds() -> u64 {
+        30
+    }
+
+    // 99Hz to avoid coinciding with special periods
+    fn default_frequency() -> NonZeroI32 {
+        NonZeroI32::new(99).unwrap()
+    }
+}
+
+#[tracing::instrument(level = "debug")]
+async fn pprof_profile<M: ConnectionManager + Send + Sync + Debug + 'static>(
+    req: Request<Body>,
+) -> Result<Response<Body>, ApplicationError> {
+    let query_string = req.uri().query().unwrap_or_default();
+    let query: PProfArgs =
+        serde_urlencoded::from_str(query_string).context(InvalidQueryString { query_string })?;
+
+    let report = dump_rsprof(query.seconds, query.frequency.get())
+        .await
+        .context(PProf)?;
+
+    let mut body: Vec<u8> = Vec::new();
+
+    // render flamegraph when opening in the browser
+    // otherwise render as protobuf; works great with: go tool pprof http://..../debug/pprof/profile
+    if req
+        .headers()
+        .get_all("Accept")
+        .iter()
+        .flat_map(|i| i.to_str().unwrap_or_default().split(','))
+        .any(|i| i == "text/html" || i == "image/svg+xml")
+    {
+        report.flamegraph(&mut body).context(PProf)?;
+        if body.is_empty() {
+            return EmptyFlamegraph.fail();
+        }
+    } else {
+        let profile = report.pprof().context(PProf)?;
+        profile.encode(&mut body).context(Prost)?;
+    }
+
+    Ok(Response::new(Body::from(body)))
+}
+
 pub async fn serve<M>(
     addr: AddrIncoming,
     server: Arc<AppServer<M>>,
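For reference, the query-string handling in `pprof_profile` relies entirely on the serde defaults declared on `PProfArgs`. A small sketch of how the parameters map onto the struct, using the same `serde_urlencoded` call as the handler (assumes `PProfArgs` is in scope; illustrative only):

```rust
// Sketch: /debug/pprof/profile?seconds=5&frequency=250 vs. an empty query.
fn main() {
    let args: PProfArgs = serde_urlencoded::from_str("seconds=5&frequency=250").unwrap();
    assert_eq!(args.seconds, 5);
    assert_eq!(args.frequency.get(), 250);

    // With no parameters the defaults apply: a 30 second profile at 99Hz
    // (99 rather than 100 to avoid sampling in lockstep with periodic work).
    let args: PProfArgs = serde_urlencoded::from_str("").unwrap();
    assert_eq!(args.seconds, 30);
    assert_eq!(args.frequency.get(), 99);
}
```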