diff --git a/Cargo.lock b/Cargo.lock index 5f8042202d..6d3e0f6810 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -127,13 +127,13 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" [[package]] name = "arrow" version = "4.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6f3334cea4f209440350d00ae1dab237ced49d80b664cc4b0e984893d583890" +source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/perf_integration#d416e9158275148e2be5e64a1c8a6689c7a83fac" dependencies = [ - "cfg_aliases", + "bitflags", "chrono", "csv", - "flatbuffers 0.8.4", + "flatbuffers", + "getrandom 0.2.3", "hex", "indexmap", "lazy_static", @@ -141,7 +141,7 @@ dependencies = [ "multiversion", "num", "prettytable-rs", - "rand 0.7.3", + "rand 0.8.4", "regex", "serde", "serde_derive", @@ -151,12 +151,11 @@ dependencies = [ [[package]] name = "arrow-flight" version = "4.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8148336a3dcb02497a7f851e247cfd53d5b669e439b9bcf7d7eb6d8f5c8103b" +source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/perf_integration#d416e9158275148e2be5e64a1c8a6689c7a83fac" dependencies = [ "arrow", + "base64 0.13.0", "bytes", - "futures", "proc-macro2", "prost", "prost-derive", @@ -509,12 +508,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "cfg_aliases" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" - [[package]] name = "chrono" version = "0.4.19" @@ -848,7 +841,7 @@ dependencies = [ [[package]] name = "datafusion" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=75a376f0ff2e8236c07a3b0a16374b7e3855c194#75a376f0ff2e8236c07a3b0a16374b7e3855c194" +source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/perf_integration_df#9ef4a257cf8b7717df60de56b9e17c6bd7286cd4" dependencies = [ "ahash 0.7.4", "arrow", @@ -1021,7 +1014,7 @@ version = "0.1.0" dependencies = [ "chrono", "data_types", - "flatbuffers 2.0.0", + "flatbuffers", "generated_types", "influxdb_line_protocol", "internal_types", @@ -1090,17 +1083,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d" -[[package]] -name = "flatbuffers" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3c502342b7d6d73beb1b8bab39dc01deba0c8ef66f4e6f1eba7c69ee6b38069" -dependencies = [ - "bitflags", - "smallvec", - "thiserror", -] - [[package]] name = "flatbuffers" version = "2.0.0" @@ -1296,6 +1278,7 @@ dependencies = [ "futures", "google_types", "observability_deps", + "proc-macro2", "prost", "prost-build", "prost-types", @@ -2610,8 +2593,7 @@ dependencies = [ [[package]] name = "parquet" version = "4.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "265044e41d674fad4c7860a3e245e53138e926fe83cad8d45193a7a354c56a54" +source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/perf_integration#d416e9158275148e2be5e64a1c8a6689c7a83fac" dependencies = [ "arrow", "base64 0.13.0", @@ -2622,6 +2604,7 @@ dependencies = [ "lz4", "num-bigint 0.4.0", "parquet-format", + "rand 0.8.4", "snap", "thrift", "zstd", @@ -2959,9 +2942,9 @@ checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086" [[package]] name = "proc-macro2" -version = "1.0.24" +version = "1.0.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71" +checksum = "f0d8caf72986c1a598726adc988bb5984792ef84f5ee5aa50209145ee8077038" dependencies = [ "unicode-xid", ] @@ -3782,7 +3765,7 @@ dependencies = [ "datafusion 0.1.0", "datafusion_util", "entry", - "flatbuffers 2.0.0", + "flatbuffers", "futures", "futures-util", "generated_types", @@ -4111,9 +4094,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.67" +version = "1.0.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6498a9efc342871f91cc2d0d694c674368b4ceb40f62b65a7a08c3792935e702" +checksum = "f71489ff30030d2ae598524f61326b902466f72a0fb1a8564c001cc63425bcc7" dependencies = [ "proc-macro2", "quote", @@ -4995,18 +4978,18 @@ checksum = "4756f7db3f7b5574938c3eb1c117038b8e07f95ee6718c0efad4ac21508f1efd" [[package]] name = "zstd" -version = "0.8.3+zstd.1.5.0" +version = "0.9.0+zstd.1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ea7094c7b4a58fbd738eb0d4a2fc7684a0e6949a31597e074ffe20a07cbc2bf" +checksum = "07749a5dc2cb6b36661290245e350f15ec3bbb304e493db54a1d354480522ccd" dependencies = [ "zstd-safe", ] [[package]] name = "zstd-safe" -version = "4.1.0+zstd.1.5.0" +version = "4.1.1+zstd.1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d30375f78e185ca4c91930f42ea2c0162f9aa29737032501f93b79266d985ae7" +checksum = "c91c90f2c593b003603e5e0493c837088df4469da25aafff8bce42ba48caf079" dependencies = [ "libc", "zstd-sys", @@ -5014,9 +4997,9 @@ dependencies = [ [[package]] name = "zstd-sys" -version = "1.6.0+zstd.1.5.0" +version = "1.6.1+zstd.1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2141bed8922b427761470e6bbfeff255da94fa20b0bbeab0d9297fcaf71e3aa7" +checksum = "615120c7a2431d16cf1cf979e7fc31ba7a5b5e5707b29c8a99e5dbf8a8392a33" dependencies = [ "cc", "libc", diff --git a/Cargo.toml b/Cargo.toml index 2f49612710..c8e0b7879b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -137,3 +137,9 @@ tempfile = "3.1.0" azure = ["object_store/azure"] gcp = ["object_store/gcp"] aws = ["object_store/aws"] + + +[patch.crates-io] +arrow = { git="https://github.com/alamb/arrow-rs.git", branch = "alamb/perf_integration" } +parquet = { git="https://github.com/alamb/arrow-rs.git", branch = "alamb/perf_integration" } +arrow-flight= { git="https://github.com/alamb/arrow-rs.git", branch = "alamb/perf_integration" } diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index fbbefa62ae..252893e0c4 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -9,4 +9,4 @@ description = "Re-exports datafusion at a specific version" # Rename to workaround doctest bug # Turn off optional datafusion features (function packages) -upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev = "75a376f0ff2e8236c07a3b0a16374b7e3855c194", default-features = false, package = "datafusion" } +upstream = { git = "https://github.com/alamb/arrow-datafusion.git", branch = "alamb/perf_integration_df", default-features = false, package = "datafusion" } diff --git a/generated_types/Cargo.toml b/generated_types/Cargo.toml index 0e21335ef3..386ee36888 100644 --- a/generated_types/Cargo.toml +++ b/generated_types/Cargo.toml @@ -22,5 +22,7 @@ thiserror = "1.0.23" tonic = "0.4" [build-dependencies] # In alphabetical order +# Pin specific version of the tonic-build dependencies to match arrow +proc-macro2 = "=1.0.27" tonic-build = "0.4" prost-build = "0.7" diff --git a/query/src/provider/deduplicate/algo.rs b/query/src/provider/deduplicate/algo.rs index 799f5e2250..6cc7ecb77e 100644 --- a/query/src/provider/deduplicate/algo.rs +++ b/query/src/provider/deduplicate/algo.rs @@ -232,7 +232,7 @@ impl RecordBatchDeduplicator { .collect(); // Compute partitions (aka breakpoints between the ranges) - let ranges = arrow::compute::lexicographical_partition_ranges(&columns)?; + let ranges = arrow::compute::lexicographical_partition_ranges(&columns)?.collect(); Ok(DuplicateRanges { is_sort_key, diff --git a/query_tests/src/sql.rs b/query_tests/src/sql.rs index 85edbddb62..6e533b34c0 100644 --- a/query_tests/src/sql.rs +++ b/query_tests/src/sql.rs @@ -320,9 +320,9 @@ async fn sql_select_from_system_chunk_columns() { "| partition_key | chunk_id | table_name | column_name | storage | row_count | min_value | max_value | estimated_bytes |", "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+-----------------+", "| 1970-01-01T00 | 0 | h2o | city | ReadBuffer | 2 | Boston | Boston | 252 |", - "| 1970-01-01T00 | 0 | h2o | other_temp | ReadBuffer | 1 | 70.4 | 70.4 | 369 |", + "| 1970-01-01T00 | 0 | h2o | other_temp | ReadBuffer | 1 | 70.4 | 70.4 | 425 |", "| 1970-01-01T00 | 0 | h2o | state | ReadBuffer | 2 | MA | MA | 240 |", - "| 1970-01-01T00 | 0 | h2o | temp | ReadBuffer | 1 | 70.4 | 70.4 | 369 |", + "| 1970-01-01T00 | 0 | h2o | temp | ReadBuffer | 1 | 70.4 | 70.4 | 425 |", "| 1970-01-01T00 | 0 | h2o | time | ReadBuffer | 2 | 50 | 250 | 51 |", "| 1970-01-01T00 | 0 | o2 | city | OpenMutableBuffer | 1 | Boston | Boston | 35 |", "| 1970-01-01T00 | 0 | o2 | reading | OpenMutableBuffer | 1 | 51 | 51 | 25 |", diff --git a/read_buffer/src/chunk.rs b/read_buffer/src/chunk.rs index e2ea5e51dc..fdff4a2e0c 100644 --- a/read_buffer/src/chunk.rs +++ b/read_buffer/src/chunk.rs @@ -659,9 +659,9 @@ mod test { "# HELP read_buffer_column_bytes The number of bytes used by all columns in the Read Buffer", "# TYPE read_buffer_column_bytes gauge", r#"read_buffer_column_bytes{db="mydb",encoding="BT_U32-FIXED",log_data_type="i64"} 72"#, - r#"read_buffer_column_bytes{db="mydb",encoding="FBT_U8-FIXEDN",log_data_type="f64"} 688"#, + r#"read_buffer_column_bytes{db="mydb",encoding="FBT_U8-FIXEDN",log_data_type="f64"} 800"#, r#"read_buffer_column_bytes{db="mydb",encoding="FIXED",log_data_type="f64"} 96"#, - r#"read_buffer_column_bytes{db="mydb",encoding="FIXEDN",log_data_type="bool"} 768"#, + r#"read_buffer_column_bytes{db="mydb",encoding="FIXEDN",log_data_type="bool"} 672"#, r#"read_buffer_column_bytes{db="mydb",encoding="RLE",log_data_type="string"} 500"#, "# HELP read_buffer_column_raw_bytes The number of bytes used by all columns if they were uncompressed in the Read Buffer", "# TYPE read_buffer_column_raw_bytes gauge", diff --git a/read_buffer/src/column/encoding/bool.rs b/read_buffer/src/column/encoding/bool.rs index 19b3ec0737..f32e21bb97 100644 --- a/read_buffer/src/column/encoding/bool.rs +++ b/read_buffer/src/column/encoding/bool.rs @@ -360,7 +360,7 @@ mod test { #[test] fn size() { let v = Bool::from(vec![None, None, Some(true), Some(false)].as_slice()); - assert_eq!(v.size(), 464); + assert_eq!(v.size(), 400); } #[test] diff --git a/read_buffer/src/column/encoding/scalar/fixed_null.rs b/read_buffer/src/column/encoding/scalar/fixed_null.rs index e6976fbf3a..0597438466 100644 --- a/read_buffer/src/column/encoding/scalar/fixed_null.rs +++ b/read_buffer/src/column/encoding/scalar/fixed_null.rs @@ -478,7 +478,7 @@ mod test { #[test] fn size() { let (v, _) = new_encoding(vec![None, None, Some(100), Some(2222)]); - assert_eq!(v.size(), 352); + assert_eq!(v.size(), 408); } #[test] diff --git a/read_buffer/src/column/integer.rs b/read_buffer/src/column/integer.rs index 921ba838e3..2118edf85a 100644 --- a/read_buffer/src/column/integer.rs +++ b/read_buffer/src/column/integer.rs @@ -971,13 +971,13 @@ mod test { // Input data containing NULL will be stored in an Arrow array encoding let cases = vec![ - (vec![None, Some(0_i64)], 344_usize), // u8 Arrow array - (vec![None, Some(-120_i64)], 344), // i8 - (vec![None, Some(399_i64)], 344), // u16 - (vec![None, Some(-399_i64)], 344), // i16 - (vec![None, Some(u32::MAX as i64)], 344), // u32 - (vec![None, Some(i32::MIN as i64)], 344), // i32 - (vec![None, Some(u32::MAX as i64 + 1)], 344), //u64 + (vec![None, Some(0_i64)], 400_usize), // u8 Arrow array + (vec![None, Some(-120_i64)], 400), // i8 + (vec![None, Some(399_i64)], 400), // u16 + (vec![None, Some(-399_i64)], 400), // i16 + (vec![None, Some(u32::MAX as i64)], 400), // u32 + (vec![None, Some(i32::MIN as i64)], 400), // i32 + (vec![None, Some(u32::MAX as i64 + 1)], 400), //u64 ]; for (case, name) in cases.iter().cloned() { @@ -1163,10 +1163,10 @@ mod test { // Input data containing NULL will be stored in an Arrow array encoding let cases = vec![ - (vec![None, Some(0_u64)], 344_usize), - (vec![None, Some(399_u64)], 344), - (vec![None, Some(u32::MAX as u64)], 344), - (vec![None, Some(u64::MAX)], 344), + (vec![None, Some(0_u64)], 400_usize), + (vec![None, Some(399_u64)], 400), + (vec![None, Some(u32::MAX as u64)], 400), + (vec![None, Some(u64::MAX)], 400), ]; for (case, size) in cases.iter().cloned() { diff --git a/server/src/db.rs b/server/src/db.rs index 4aea194e72..b12b1a03b8 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -1345,7 +1345,7 @@ mod tests { .eq(1.0) .unwrap(); - catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 1143) + catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 1255) .unwrap(); db.move_chunk_to_read_buffer("cpu", "1970-01-01T00", 0) @@ -2394,7 +2394,7 @@ mod tests { 0, ChunkStorage::ClosedMutableBuffer, lifecycle_action, - 2190, + 2414, 1, ), ChunkSummary::new_without_timestamps( @@ -2416,7 +2416,7 @@ mod tests { assert_eq!( db.catalog.metrics().memory().mutable_buffer().get_total(), - 64 + 2190 + 87 + 64 + 2414 + 87 ); assert_eq!( db.catalog.metrics().memory().read_buffer().get_total(), diff --git a/src/influxdb_ioxd/rpc/flight.rs b/src/influxdb_ioxd/rpc/flight.rs index d654c244e9..979892d147 100644 --- a/src/influxdb_ioxd/rpc/flight.rs +++ b/src/influxdb_ioxd/rpc/flight.rs @@ -17,7 +17,7 @@ use arrow::{ use arrow_flight::{ flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer}, Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, - HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket, + HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket, }; use data_types::{DatabaseName, DatabaseNameError}; use server::{ConnectionManager, Server}; @@ -191,8 +191,7 @@ where let options = arrow::ipc::writer::IpcWriteOptions::default(); let schema = Arc::new(optimize_schema(&physical_plan.schema())); - let schema_flight_data = - arrow_flight::utils::flight_data_from_arrow_schema(&schema, &options); + let schema_flight_data = SchemaAsIpc::new(&schema, &options).into(); let mut flights = vec![schema_flight_data];