Merge branch 'main' into cn/cleanups

pull/24376/head
kodiakhq[bot] 2023-07-21 13:22:50 +00:00 committed by GitHub
commit 76ecfcc815
24 changed files with 223 additions and 243 deletions

Cargo.lock

@@ -1367,7 +1367,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "27.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=46182c894e5106adba7fb53e9848ce666fb6129b#46182c894e5106adba7fb53e9848ce666fb6129b"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=44008d71180f2d03e9d21944788e61cb8845abc7#44008d71180f2d03e9d21944788e61cb8845abc7"
dependencies = [
"ahash",
"arrow",
@@ -1415,7 +1415,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "27.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=46182c894e5106adba7fb53e9848ce666fb6129b#46182c894e5106adba7fb53e9848ce666fb6129b"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=44008d71180f2d03e9d21944788e61cb8845abc7#44008d71180f2d03e9d21944788e61cb8845abc7"
dependencies = [
"arrow",
"arrow-array",
@@ -1429,7 +1429,7 @@ dependencies = [
[[package]]
name = "datafusion-execution"
version = "27.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=46182c894e5106adba7fb53e9848ce666fb6129b#46182c894e5106adba7fb53e9848ce666fb6129b"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=44008d71180f2d03e9d21944788e61cb8845abc7#44008d71180f2d03e9d21944788e61cb8845abc7"
dependencies = [
"dashmap",
"datafusion-common",
@@ -1446,7 +1446,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "27.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=46182c894e5106adba7fb53e9848ce666fb6129b#46182c894e5106adba7fb53e9848ce666fb6129b"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=44008d71180f2d03e9d21944788e61cb8845abc7#44008d71180f2d03e9d21944788e61cb8845abc7"
dependencies = [
"ahash",
"arrow",
@@ -1460,7 +1460,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "27.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=46182c894e5106adba7fb53e9848ce666fb6129b#46182c894e5106adba7fb53e9848ce666fb6129b"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=44008d71180f2d03e9d21944788e61cb8845abc7#44008d71180f2d03e9d21944788e61cb8845abc7"
dependencies = [
"arrow",
"async-trait",
@@ -1477,7 +1477,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "27.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=46182c894e5106adba7fb53e9848ce666fb6129b#46182c894e5106adba7fb53e9848ce666fb6129b"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=44008d71180f2d03e9d21944788e61cb8845abc7#44008d71180f2d03e9d21944788e61cb8845abc7"
dependencies = [
"ahash",
"arrow",
@@ -1498,6 +1498,7 @@ dependencies = [
"itertools 0.11.0",
"lazy_static",
"libc",
"log",
"md-5",
"paste",
"petgraph",
@@ -1511,7 +1512,7 @@ dependencies = [
[[package]]
name = "datafusion-proto"
version = "27.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=46182c894e5106adba7fb53e9848ce666fb6129b#46182c894e5106adba7fb53e9848ce666fb6129b"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=44008d71180f2d03e9d21944788e61cb8845abc7#44008d71180f2d03e9d21944788e61cb8845abc7"
dependencies = [
"arrow",
"chrono",
@@ -1525,7 +1526,7 @@ dependencies = [
[[package]]
name = "datafusion-row"
version = "27.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=46182c894e5106adba7fb53e9848ce666fb6129b#46182c894e5106adba7fb53e9848ce666fb6129b"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=44008d71180f2d03e9d21944788e61cb8845abc7#44008d71180f2d03e9d21944788e61cb8845abc7"
dependencies = [
"arrow",
"datafusion-common",
@@ -1536,7 +1537,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "27.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=46182c894e5106adba7fb53e9848ce666fb6129b#46182c894e5106adba7fb53e9848ce666fb6129b"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=44008d71180f2d03e9d21944788e61cb8845abc7#44008d71180f2d03e9d21944788e61cb8845abc7"
dependencies = [
"arrow",
"arrow-schema",
@@ -1731,12 +1732,9 @@ dependencies = [
[[package]]
name = "fastrand"
version = "1.9.0"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be"
dependencies = [
"instant",
]
checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764"
[[package]]
name = "fd-lock"
@@ -1745,7 +1743,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef033ed5e9bad94e55838ca0ca906db0e043f517adda0c8b79c7a8c66c93c1b5"
dependencies = [
"cfg-if",
"rustix 0.38.4",
"rustix",
"windows-sys 0.48.0",
]
@@ -2763,15 +2761,6 @@ dependencies = [
"yaml-rust",
]
[[package]]
name = "instant"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
dependencies = [
"cfg-if",
]
[[package]]
name = "integer-encoding"
version = "3.0.4"
@@ -2784,17 +2773,6 @@ version = "4.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "924df4f0e24e2e7f9cdd90babb0b96f93b20f3ecfa949ea9e6613756b8c8e1bf"
[[package]]
name = "io-lifetimes"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
dependencies = [
"hermit-abi",
"libc",
"windows-sys 0.48.0",
]
[[package]]
name = "iox_catalog"
version = "0.1.0"
@@ -3164,7 +3142,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b"
dependencies = [
"hermit-abi",
"rustix 0.38.4",
"rustix",
"windows-sys 0.48.0",
]
@@ -3312,12 +3290,6 @@ version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
[[package]]
name = "linux-raw-sys"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
[[package]]
name = "linux-raw-sys"
version = "0.4.3"
@@ -3741,9 +3713,9 @@ dependencies = [
[[package]]
name = "num-traits"
version = "0.2.15"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2"
dependencies = [
"autocfg",
"libm",
@@ -4788,20 +4760,6 @@ dependencies = [
"semver",
]
[[package]]
name = "rustix"
version = "0.37.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8818fa822adcc98b18fedbb3632a6a33213c070556b5aa7c4c8cc21cff565c4c"
dependencies = [
"bitflags 1.3.2",
"errno",
"io-lifetimes",
"libc",
"linux-raw-sys 0.3.8",
"windows-sys 0.48.0",
]
[[package]]
name = "rustix"
version = "0.38.4"
@@ -4811,7 +4769,7 @@ dependencies = [
"bitflags 2.3.3",
"errno",
"libc",
"linux-raw-sys 0.4.3",
"linux-raw-sys",
"windows-sys 0.48.0",
]
@@ -5740,15 +5698,14 @@ dependencies = [
[[package]]
name = "tempfile"
version = "3.6.0"
version = "3.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31c0432476357e58790aaa47a8efb0c5138f137343f3b5f23bd36a27e3b0a6d6"
checksum = "5486094ee78b2e5038a6382ed7645bc084dc2ec433426ca4c3cb61e2007b8998"
dependencies = [
"autocfg",
"cfg-if",
"fastrand",
"redox_syscall 0.3.5",
"rustix 0.37.22",
"rustix",
"windows-sys 0.48.0",
]
@@ -6918,7 +6875,7 @@ dependencies = [
"regex-syntax 0.7.4",
"reqwest",
"ring",
"rustix 0.38.4",
"rustix",
"rustls",
"scopeguard",
"serde",


@@ -121,8 +121,8 @@ license = "MIT OR Apache-2.0"
[workspace.dependencies]
arrow = { version = "43.0.0" }
arrow-flight = { version = "43.0.0" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "46182c894e5106adba7fb53e9848ce666fb6129b", default-features = false }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "46182c894e5106adba7fb53e9848ce666fb6129b" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "44008d71180f2d03e9d21944788e61cb8845abc7", default-features = false }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "44008d71180f2d03e9d21944788e61cb8845abc7" }
hashbrown = { version = "0.14.0" }
object_store = { version = "0.6.0" }


@@ -21,7 +21,7 @@ uuid = { version = "1", features = ["v4"] }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]
tempfile = "3.6.0"
tempfile = "3.7.0"
test_helpers = { path = "../test_helpers" }
[features]


@@ -51,3 +51,4 @@ We hold monthly Tech Talks that explain the project's technical underpinnings. Y
* [Querier <> Ingester Query Protocol](ingester_querier_protocol.md)
* [Underground Guide to Running IOx Locally](underground_guide.md)
* [Query Processing](query_processing.md)
* [How to Reproduce and Debug Production Data Locally](debug.md)

docs/debug.md (new file)

@@ -0,0 +1,105 @@
# How to Reproduce and Debug Production Data Locally
Here is a way to reproduce issues locally with all-in-one mode, using production data.
## Summary of steps
Reproduce the error locally by building a local catalog from the output of `influxdb_iox remote store get-table`:
1. Download the contents of `<table_name>` into a directory named `table_name`:
```
influxdb_iox remote store get-table <namespace> <table_name>
```
1. Create a catalog and object_store in `/tmp/data_dir`:
```
influxdb_iox debug build-catalog <table_dir> /tmp/data_dir
```
1. Start IOx using this data directory (you can now query `table_name` locally):
```
influxdb_iox --data-dir /tmp/data_dir
```
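Once the server is up, you can check the table from a second shell with the same `query` subcommand used in the demonstration below:
```shell
influxdb_iox query <namespace> 'select * from <table_name> limit 10'
```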
## Demonstration
## Setup
Running `influxdb_iox` and collecting local telegraf data:
```shell
$ influxdb_iox namespace list
[
{
"id": "1",
"name": "26f7e5a4b7be365b_917b97a92e883afc",
"maxTables": 500,
"maxColumnsPerTable": 200
}
]
```
## Export `cpu` table:
```shell
$ influxdb_iox remote store get-table 26f7e5a4b7be365b_917b97a92e883afc cpu
found 11 Parquet files, exporting...
downloading file 1 of 11 (1b8eb36a-7a34-4635-9156-251efcb1c024.4.parquet)...
downloading file 2 of 11 (1819137f-7cb5-4dc8-8051-6fa0b42990cb.4.parquet)...
downloading file 3 of 11 (4931cad7-7aaf-4b41-8f46-2d3be85c492b.4.parquet)...
downloading file 4 of 11 (be75f5fb-a8bc-4646-893a-70d496b13f3d.4.parquet)...
downloading file 5 of 11 (5235b87d-19ee-48ae-830f-b19d81bfe915.4.parquet)...
downloading file 6 of 11 (a8f7be33-42b6-4353-8735-51b245196d39.4.parquet)...
downloading file 7 of 11 (3b43c4ee-7500-47f9-9c0f-76d4f80b480e.4.parquet)...
downloading file 8 of 11 (081da5be-e0f9-4b42-8cd2-45bfebbd934c.4.parquet)...
downloading file 9 of 11 (f29ba3b4-53b1-4c68-9287-4bcea7c4e86b.4.parquet)...
downloading file 10 of 11 (1ce94ce2-1200-4516-950a-64828a7cebba.4.parquet)...
downloading file 11 of 11 (3a2b5525-3be5-41ef-b082-b279edc32acb.4.parquet)...
Done.
$ ls cpu/
081da5be-e0f9-4b42-8cd2-45bfebbd934c.4.parquet 1ce94ce2-1200-4516-950a-64828a7cebba.4.parquet 4931cad7-7aaf-4b41-8f46-2d3be85c492b.4.parquet be75f5fb-a8bc-4646-893a-70d496b13f3d.4.parquet
081da5be-e0f9-4b42-8cd2-45bfebbd934c.4.parquet.json 1ce94ce2-1200-4516-950a-64828a7cebba.4.parquet.json 4931cad7-7aaf-4b41-8f46-2d3be85c492b.4.parquet.json be75f5fb-a8bc-4646-893a-70d496b13f3d.4.parquet.json
1819137f-7cb5-4dc8-8051-6fa0b42990cb.4.parquet 3a2b5525-3be5-41ef-b082-b279edc32acb.4.parquet 5235b87d-19ee-48ae-830f-b19d81bfe915.4.parquet f29ba3b4-53b1-4c68-9287-4bcea7c4e86b.4.parquet
1819137f-7cb5-4dc8-8051-6fa0b42990cb.4.parquet.json 3a2b5525-3be5-41ef-b082-b279edc32acb.4.parquet.json 5235b87d-19ee-48ae-830f-b19d81bfe915.4.parquet.json f29ba3b4-53b1-4c68-9287-4bcea7c4e86b.4.parquet.json
1b8eb36a-7a34-4635-9156-251efcb1c024.4.parquet 3b43c4ee-7500-47f9-9c0f-76d4f80b480e.4.parquet a8f7be33-42b6-4353-8735-51b245196d39.4.parquet partition.4.json
1b8eb36a-7a34-4635-9156-251efcb1c024.4.parquet.json 3b43c4ee-7500-47f9-9c0f-76d4f80b480e.4.parquet.json a8f7be33-42b6-4353-8735-51b245196d39.4.parquet.json table.1.json
```
## Build a new `new_data_dir` from export:
```shell
$ influxdb_iox debug build-catalog cpu new_data_dir
Beginning catalog / object_store build from "cpu" in "new_data_dir"....
Done
$ ls new_data_dir/
catalog.sqlite object_store/
```
## Run `influxdb_iox` with `new_data_dir`:
```shell
$ influxdb_iox --data-dir new_data_dir/
```
And in a separate shell, you can query the data and see it is present:
```shell
$ influxdb_iox query 26f7e5a4b7be365b_917b97a92e883afc 'select * from cpu limit 10';
+-----------+---------------------+----------------------+-------------+------------------+-------------------+--------------+-----------+------------+---------------+-------------+--------------------+--------------------+
| cpu | host | time | usage_guest | usage_guest_nice | usage_idle | usage_iowait | usage_irq | usage_nice | usage_softirq | usage_steal | usage_system | usage_user |
+-----------+---------------------+----------------------+-------------+------------------+-------------------+--------------+-----------+------------+---------------+-------------+--------------------+--------------------+
| cpu-total | MacBook-Pro-8.local | 2023-07-06T17:13:40Z | 0.0 | 0.0 | 95.6668753914105 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.4902943018170824 | 2.8428303068453085 |
| cpu-total | MacBook-Pro-8.local | 2023-07-06T17:13:50Z | 0.0 | 0.0 | 95.9551687433697 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.4213261536472683 | 2.6235051029648098 |
| cpu-total | MacBook-Pro-8.local | 2023-07-06T17:14:00Z | 0.0 | 0.0 | 96.52108622167991 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.37029157802418 | 2.108622199968126 |
| cpu-total | MacBook-Pro-8.local | 2023-07-06T17:14:10Z | 0.0 | 0.0 | 95.26819803491809 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.752519246414341 | 2.979282718922596 |
| cpu-total | MacBook-Pro-8.local | 2023-07-06T17:14:20Z | 0.0 | 0.0 | 95.28402329791422 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.6408843239063593 | 3.0750923780335997 |
| cpu-total | MacBook-Pro-8.local | 2023-07-06T17:14:30Z | 0.0 | 0.0 | 93.97484827633119 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 2.0271538509716924 | 3.9979978727699588 |
| cpu-total | MacBook-Pro-8.local | 2023-07-06T17:14:40Z | 0.0 | 0.0 | 95.69219209824692 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.458894245831095 | 2.848913656031324 |
| cpu-total | MacBook-Pro-8.local | 2023-07-06T17:14:50Z | 0.0 | 0.0 | 94.78402607970591 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.9685286188771443 | 3.2474453011797517 |
| cpu-total | MacBook-Pro-8.local | 2023-07-06T17:15:00Z | 0.0 | 0.0 | 95.85132344665212 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.5706151054475623 | 2.5780614479731607 |
| cpu0 | MacBook-Pro-8.local | 2023-07-06T17:13:40Z | 0.0 | 0.0 | 78.65055387717186 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 7.452165156077374 | 13.897280966824042 |
+-----------+---------------------+----------------------+-------------+------------------+-------------------+--------------+-----------+------------+---------------+-------------+--------------------+--------------------+
```


@@ -69,7 +69,7 @@ once_cell = { version = "1.18", features = ["parking_lot"] }
rustyline = { version = "12.0", default-features = false, features = ["with-file-history"]}
serde_json = "1.0.103"
snafu = "0.7"
tempfile = "3.6.0"
tempfile = "3.7.0"
thiserror = "1.0.43"
tikv-jemalloc-ctl = { version = "0.5.0", optional = true }
tokio = { version = "1.29", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time", "io-std"] }


@@ -1,10 +1,5 @@
//! Tests the `influxdb_iox debug` commands
use std::{
collections::VecDeque,
io::Write,
path::{Path, PathBuf},
time::Duration,
};
use std::path::Path;
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_sorted_eq;
@@ -12,7 +7,6 @@ use assert_cmd::Command;
use futures::FutureExt;
use predicates::prelude::*;
use tempfile::TempDir;
use test_helpers::timeout::FutureTimeout;
use test_helpers_end_to_end::{
maybe_skip_integration, run_sql, MiniCluster, ServerFixture, Step, StepTest, StepTestState,
TestConfig,
@@ -52,8 +46,6 @@ async fn test_print_cpu() {
/// 3. Start an all-in-one instance from that rebuilt catalog
/// 4. Can run a query successfully
#[tokio::test]
// Ignore due to https://github.com/influxdata/influxdb_iox/issues/8203
#[ignore]
async fn build_catalog() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
@@ -111,20 +103,11 @@ async fn build_catalog() {
let table_dir = export_dir.path().join(table_name);
// We can build a catalog and start up the server and run a query
let restarted = RestartedServer::build_catalog_and_start(&table_dir).await;
let batches = restarted
.run_sql_until_non_empty(sql, namespace.as_str())
.await;
assert_batches_sorted_eq!(&expected, &batches);
rebuild_and_query(&table_dir, &namespace, sql, &expected).await;
// We can also rebuild a catalog from just the parquet files
let only_parquet_dir = copy_only_parquet_files(&table_dir);
let restarted =
RestartedServer::build_catalog_and_start(only_parquet_dir.path()).await;
let batches = restarted
.run_sql_until_non_empty(sql, namespace.as_str())
.await;
assert_batches_sorted_eq!(&expected, &batches);
rebuild_and_query(only_parquet_dir.path(), &namespace, sql, &expected).await;
}
.boxed()
})),
@@ -134,6 +117,30 @@
.await
}
/// Rebuilds a catalog from an export directory, starts up a server
/// and verifies that running `sql` in `namespace` produces `expected`.
async fn rebuild_and_query(table_dir: &Path, namespace: &str, sql: &str, expected: &[&str]) {
// Very occasionally, something goes wrong with the sqlite-based
// catalog and it doesn't get the new files. Thus, try a few times.
//
// See https://github.com/influxdata/influxdb_iox/issues/8287
let mut retries = 5;
while retries > 0 {
println!("** Retries remaining: {retries}");
let restarted = RestartedServer::build_catalog_and_start(table_dir).await;
let batches = restarted.run_sql(sql, namespace).await;
// if we got results, great, otherwise try again
if !batches.is_empty() {
assert_batches_sorted_eq!(expected, &batches);
return;
}
retries -= 1;
}
// All retries exhausted without any results: fail loudly rather than
// letting the test pass vacuously.
panic!("rebuild_and_query: no results for {sql:?} after 5 attempts");
}
/// An all in one instance, with data directory of `data_dir`
struct RestartedServer {
all_in_one: ServerFixture,
@@ -171,7 +178,7 @@ impl RestartedServer {
println!("target_directory: {data_dir:?}");
// call `influxdb_iox debug build-catalog <table_dir> <new_data_dir>`
let cmd = Command::cargo_bin("influxdb_iox")
Command::cargo_bin("influxdb_iox")
.unwrap()
// use -v to enable logging so we can check the status messages
.arg("-vv")
@@ -180,31 +187,18 @@
.arg(exported_table_dir.as_os_str().to_str().unwrap())
.arg(data_dir.path().as_os_str().to_str().unwrap())
.assert()
.success();
// debug information to track down https://github.com/influxdata/influxdb_iox/issues/8203
println!("***** Begin build-catalog STDOUT ****");
std::io::stdout()
.write_all(&cmd.get_output().stdout)
.unwrap();
println!("***** Begin build-catalog STDERR ****");
std::io::stdout()
.write_all(&cmd.get_output().stderr)
.unwrap();
println!("***** DONE ****");
cmd.stdout(
predicate::str::contains("Beginning catalog / object_store build")
.and(predicate::str::contains(
"Begin importing files total_files=1",
))
.and(predicate::str::contains(
"Completed importing files total_files=1",
)),
);
.success()
.stdout(
predicate::str::contains("Beginning catalog / object_store build")
.and(predicate::str::contains(
"Begin importing files total_files=1",
))
.and(predicate::str::contains(
"Completed importing files total_files=1",
)),
);
println!("Completed rebuild in {data_dir:?}");
RecursiveDirPrinter::new().print(data_dir.path());
// now, start up a new server in all-in-one mode
// using the newly built data directory
@@ -216,27 +210,6 @@
data_dir,
}
}
/// Runs the SQL query against this server, in a loop until
/// results are returned. Panics if the results are not produced
/// within a 5 seconds
async fn run_sql_until_non_empty(&self, sql: &str, namespace: &str) -> Vec<RecordBatch> {
let timeout = Duration::from_secs(5);
let loop_sleep = Duration::from_millis(500);
let fut = async {
loop {
let batches = self.run_sql(sql, namespace).await;
if !batches.is_empty() {
return batches;
}
tokio::time::sleep(loop_sleep).await;
}
};
fut.with_timeout(timeout)
.await
.expect("timed out waiting for non-empty batches in result")
}
}
/// Copies only parquet files from the source directory to a new
@@ -262,43 +235,3 @@ fn copy_only_parquet_files(src: &Path) -> TempDir {
}
target_dir
}
/// Prints out the contents of the directory recursively
/// for debugging.
///
/// ```text
/// RecursiveDirPrinter All files rooted at "/tmp/.tmpvf16r0"
/// "/tmp/.tmpvf16r0"
/// "/tmp/.tmpvf16r0/catalog.sqlite"
/// "/tmp/.tmpvf16r0/object_store"
/// "/tmp/.tmpvf16r0/object_store/1"
/// "/tmp/.tmpvf16r0/object_store/1/1"
/// "/tmp/.tmpvf16r0/object_store/1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237"
/// "/tmp/.tmpvf16r0/object_store/1/1/b862a7e9b329ee6a418cde191198eaeb1512753f19b87a81def2ae6c3d0ed237/d78abef6-6859-48eb-aa62-3518097fbb9b.parquet"
///
struct RecursiveDirPrinter {
paths: VecDeque<PathBuf>,
}
impl RecursiveDirPrinter {
fn new() -> Self {
Self {
paths: VecDeque::new(),
}
}
// print root and all directories
fn print(mut self, root: &Path) {
println!("RecursiveDirPrinter All files rooted at {root:?}");
self.paths.push_back(PathBuf::from(root));
while let Some(path) = self.paths.pop_front() {
println!("{path:?}");
if path.is_dir() {
for entry in std::fs::read_dir(path).unwrap() {
self.paths.push_front(entry.unwrap().path());
}
}
}
}
}


@@ -1,8 +1,8 @@
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use std::path::PathBuf;
use arrow::{
array::as_generic_binary_array,
datatypes::{DataType, Fields, Schema, SchemaRef, TimeUnit},
datatypes::{DataType, Schema, TimeUnit},
record_batch::RecordBatch,
};
use arrow_flight::{
@@ -1592,10 +1592,7 @@ async fn assert_schema(client: &mut FlightClient, cmd: Any) {
let mut saw_data = false;
while let Some(batch) = result_stream.try_next().await.unwrap() {
saw_data = true;
// strip metadata (GetFlightInfo doesn't include metadata for
// some reason) before comparison
// https://github.com/influxdata/influxdb_iox/issues/7282
let batch_schema = strip_metadata(&batch.schema());
let batch_schema = batch.schema();
assert_eq!(
batch_schema.as_ref(),
&flight_info_schema,
@@ -1603,10 +1600,6 @@ async fn assert_schema(client: &mut FlightClient, cmd: Any) {
);
// The stream itself also may report a schema
if let Some(stream_schema) = result_stream.schema() {
// strip metadata (GetFlightInfo doesn't include metadata for
// some reason) before comparison
// https://github.com/influxdata/influxdb_iox/issues/7282
let stream_schema = strip_metadata(stream_schema);
assert_eq!(stream_schema.as_ref(), &flight_info_schema);
}
}
@@ -1615,16 +1608,6 @@ async fn assert_schema(client: &mut FlightClient, cmd: Any) {
assert!(saw_data);
}
fn strip_metadata(schema: &Schema) -> SchemaRef {
let stripped_fields: Fields = schema
.fields()
.iter()
.map(|f| f.as_ref().clone().with_metadata(HashMap::new()))
.collect();
Arc::new(Schema::new(stripped_fields))
}
#[tokio::test]
async fn authz() {
test_helpers::maybe_start_logging();


@@ -60,7 +60,7 @@ lazy_static = "1.4.0"
mutable_batch_lp = { path = "../mutable_batch_lp" }
object_store = { workspace = true }
paste = "1.0.14"
tempfile = "3.6.0"
tempfile = "3.7.0"
test_helpers = { path = "../test_helpers", features = ["future_timeout"] }
tokio = { version = "1.29", features = ["macros", "time", "test-util"] }


@@ -24,11 +24,10 @@ use super::PartitionProvider;
type BoxedResolveFuture =
Pin<Box<dyn std::future::Future<Output = Arc<Mutex<PartitionData>>> + Send>>;
/// A compound key of `(namespace, table, partition_key)` which uniquely
/// A compound key of `(table, partition_key)` which uniquely
/// identifies a single partition.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct Key {
namespace_id: NamespaceId,
table_id: TableId,
partition_key: PartitionKey,
}
@@ -148,7 +147,6 @@ where
table: Arc<DeferredLoad<TableMetadata>>,
) -> Arc<Mutex<PartitionData>> {
let key = Key {
namespace_id,
table_id,
partition_key: partition_key.clone(), // Ref-counted anyway!
};
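An aside on why dropping `namespace_id` is sound: catalog-assigned table IDs in IOx are unique across namespaces, so `(table_id, partition_key)` already identifies a partition and the extra field only added hashing and comparison work. A minimal self-contained sketch of the slimmed-down key (the ID and key types here are stand-ins, not the crate's real `TableId`/`PartitionKey`):

```rust
use std::collections::HashMap;

// Stand-in for the catalog-assigned, globally unique table ID.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct TableId(i64);

// Compound key: table ID plus partition key is already unique.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct Key {
    table_id: TableId,
    partition_key: String,
}

fn main() {
    let mut resolving: HashMap<Key, &str> = HashMap::new();
    let key = Key {
        table_id: TableId(42),
        partition_key: "2023-07-21".to_string(),
    };
    resolving.insert(key.clone(), "partition being resolved");
    // No namespace ID is needed to find the entry again.
    assert_eq!(resolving.get(&key), Some(&"partition being resolved"));
}
```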


@@ -27,7 +27,7 @@ object_store = { workspace = true }
observability_deps = { version = "0.1.0", path = "../observability_deps" }
parquet_file = { version = "0.1.0", path = "../parquet_file" }
prost = { version = "0.11.9", default-features = false, features = ["std"] }
tempfile = { version = "3.6.0" }
tempfile = { version = "3.7.0" }
test_helpers = { path = "../test_helpers", features = ["future_timeout"] }
tokio = { version = "1.29", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
tokio-util = "0.7.8"


@@ -0,0 +1,13 @@
-- FUNCTION that updates the new_file_at field in the partition table when the update_partition trigger is fired
-- The field new_file_at signals when the last file was added to the partition for compaction.
CREATE OR REPLACE FUNCTION update_partition_on_new_file_at()
RETURNS TRIGGER
LANGUAGE PLPGSQL
AS $$
BEGIN
UPDATE partition SET new_file_at = NEW.created_at WHERE id = NEW.partition_id;
RETURN NEW;
END;
$$;
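The function above only takes effect once a trigger on `parquet_file` invokes it; that trigger lives in a separate migration not shown in this diff. As a hedged sketch (names and exact form assumed, not copied from the repo), the wiring would look something like:

```sql
-- Hypothetical companion trigger: fire the function on every new parquet file.
DROP TRIGGER IF EXISTS update_partition ON parquet_file;
CREATE TRIGGER update_partition
    AFTER INSERT ON parquet_file
    FOR EACH ROW
    EXECUTE FUNCTION update_partition_on_new_file_at();
```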


@@ -0,0 +1,9 @@
-- update new_file_at for all compactions, not just L0 & L1
drop trigger update_partition;
create trigger if not exists update_partition
after insert
on parquet_file
for each row
begin
UPDATE partition set new_file_at = NEW.created_at WHERE id = NEW.partition_id;
end;


@@ -2575,7 +2575,6 @@ pub(crate) mod test_helpers {
assert!(partitions.is_empty());
// Add an L2 file created just now for partition three
// Since the file is L2, the partition won't get updated
let l2_file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
created_at: time_now,
@@ -2588,16 +2587,17 @@
.create(l2_file_params.clone())
.await
.unwrap();
// still should return partition one and two only
// now should return partitions one, two and three
let mut partitions = repos
.partitions()
.partitions_new_file_between(time_two_hour_ago, None)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
assert_eq!(partitions.len(), 3);
partitions.sort();
assert_eq!(partitions[0], partition1.id);
assert_eq!(partitions[1], partition2.id);
assert_eq!(partitions[2], partition3.id);
// Only return partition1: the creation time must be strictly less than the maximum time,
// not equal
let partitions = repos


@@ -995,23 +995,19 @@ async fn create_parquet_file(
parquet_file_params,
ParquetFileId::new(stage.parquet_files.len() as i64 + 1),
);
let compaction_level = parquet_file.compaction_level;
let created_at = parquet_file.created_at;
let partition_id = parquet_file.partition_id;
stage.parquet_files.push(parquet_file);
// Update the new_file_at field of its partition to the time of created_at
// Only update if the compaction level is not Final which signal more compaction needed
if compaction_level < CompactionLevel::Final {
let partition = stage
.partitions
.iter_mut()
.find(|p| p.id == partition_id)
.ok_or(Error::PartitionNotFound {
id: TransitionPartitionId::Deprecated(partition_id),
})?;
partition.new_file_at = Some(created_at);
}
let partition = stage
.partitions
.iter_mut()
.find(|p| p.id == partition_id)
.ok_or(Error::PartitionNotFound {
id: TransitionPartitionId::Deprecated(partition_id),
})?;
partition.new_file_at = Some(created_at);
Ok(stage.parquet_files.last().unwrap().clone())
}


@@ -39,7 +39,7 @@ pub(super) fn accumulator(dt: &DataType) -> Result<Box<dyn Accumulator>> {
/// Calculate the intermediate merge state for the aggregator.
pub(super) fn state_type(dt: &DataType) -> Result<Arc<Vec<DataType>>> {
Ok(Arc::new(vec![
DataType::List(Arc::new(Field::new("state", dt.clone(), false))),
DataType::List(Arc::new(Field::new("item", dt.clone(), true))),
DataType::Float64,
]))
}
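This one-line change tracks an Arrow convention at the new DataFusion rev: list arrays built by the engine use a nullable child field named `item`, so a declared state schema with a differently named, non-nullable child no longer matches. A minimal sketch of the corrected shape using the plain `arrow` API (outside the aggregator plumbing):

```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, Field};

/// Intermediate aggregation state: a list of `dt` values plus a running
/// f64, with the list's child field named "item" and marked nullable.
fn state_type(dt: &DataType) -> Vec<DataType> {
    vec![
        DataType::List(Arc::new(Field::new("item", dt.clone(), true))),
        DataType::Float64,
    ]
}

fn main() {
    for t in state_type(&DataType::Float64) {
        println!("{t:?}");
    }
}
```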


@@ -145,34 +145,27 @@ where
negated,
expr,
pattern,
case_insensitive,
escape_char,
}) => Ok(Expr::Like(Like::new(
*negated,
Box::new(clone_with_replacement(expr, replacement_fn)?),
Box::new(clone_with_replacement(pattern, replacement_fn)?),
*escape_char,
))),
Expr::ILike(Like {
negated,
expr,
pattern,
escape_char,
}) => Ok(Expr::ILike(Like::new(
*negated,
Box::new(clone_with_replacement(expr, replacement_fn)?),
Box::new(clone_with_replacement(pattern, replacement_fn)?),
*escape_char,
*case_insensitive,
))),
Expr::SimilarTo(Like {
negated,
expr,
pattern,
case_insensitive,
escape_char,
}) => Ok(Expr::SimilarTo(Like::new(
*negated,
Box::new(clone_with_replacement(expr, replacement_fn)?),
Box::new(clone_with_replacement(pattern, replacement_fn)?),
*escape_char,
*case_insensitive,
))),
Expr::Case(case) => Ok(Expr::Case(Case::new(
match &case.expr {
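For context on this hunk: at the new DataFusion rev, `Expr::ILike` is gone and `Expr::Like` carries a `case_insensitive` flag, so `Like::new` takes five arguments. A hedged sketch of expressing the old `ILike` through the merged variant (import paths assumed from `datafusion_expr` at that rev):

```rust
use datafusion_expr::{col, expr::Like, lit, Expr};

/// `host ILIKE 'macbook%'`, written with the merged Like variant.
fn ilike_host() -> Expr {
    Expr::Like(Like::new(
        false,                     // negated
        Box::new(col("host")),
        Box::new(lit("macbook%")),
        None,                      // escape_char
        true,                      // case_insensitive: the old ILike
    ))
}

fn main() {
    println!("{:?}", ilike_host());
}
```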


@@ -21,17 +21,6 @@ struct NonNegative {
}
impl PartitionEvaluator for NonNegative {
fn update_state(
&mut self,
state: &WindowAggState,
idx: usize,
range_columns: &[Arc<dyn Array>],
sort_partition_points: &[Range<usize>],
) -> Result<()> {
self.partition_evaluator
.update_state(state, idx, range_columns, sort_partition_points)
}
fn memoize(&mut self, state: &mut WindowAggState) -> Result<()> {
self.partition_evaluator.memoize(state)
}
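This deletion presumably follows the upstream trait change: `PartitionEvaluator` no longer exposes an `update_state` hook at the new rev, so a wrapper that merely forwarded it can drop the override and keep delegating the hooks that remain. A self-contained analogy (a toy trait, not DataFusion's):

```rust
// Toy trait standing in for the slimmed-down PartitionEvaluator.
trait Evaluator {
    fn memoize(&mut self) -> Vec<i64>;
}

struct Inner;
impl Evaluator for Inner {
    fn memoize(&mut self) -> Vec<i64> {
        vec![1, -2, 3]
    }
}

/// Pure pass-through wrapper, like NonNegative's remaining overrides.
struct Wrapper<E>(E);
impl<E: Evaluator> Evaluator for Wrapper<E> {
    fn memoize(&mut self) -> Vec<i64> {
        self.0.memoize()
    }
}

fn main() {
    assert_eq!(Wrapper(Inner).memoize(), vec![1, -2, 3]);
}
```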


@@ -496,7 +496,6 @@ impl TreeNodeVisitor for RowBasedVisitor {
| Expr::Column(_)
| Expr::Exists { .. }
| Expr::GetIndexedField { .. }
| Expr::ILike { .. }
| Expr::InList { .. }
| Expr::InSubquery { .. }
| Expr::IsFalse(_)


@@ -515,6 +515,7 @@ mod tests {
expr,
pattern,
escape_char: None,
case_insensitive: false,
})
}


@@ -8,7 +8,7 @@ license.workspace = true
[dependencies] # In alphabetical order
dotenvy = "0.15.7"
parking_lot = "0.12"
tempfile = "3.6.0"
tempfile = "3.7.0"
tracing-log = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
observability_deps = { path = "../observability_deps" }


@@ -32,7 +32,7 @@ regex = "1.9"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
snafu = "0.7"
sqlx = { version = "0.7.1", features = [ "runtime-tokio-rustls" , "postgres", "uuid" ] }
tempfile = "3.6.0"
tempfile = "3.7.0"
test_helpers = { path = "../test_helpers", features = ["future_timeout"] }
tokio = { version = "1.29", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-util = "0.7"


@@ -22,6 +22,6 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" }
sysinfo = "0.29.5"
[dev-dependencies]
tempfile = "3.6.0"
tempfile = "3.7.0"
# Need the multi-threaded executor for testing
tokio = { version = "1.29", features = ["macros", "parking_lot", "rt-multi-thread", "time"] }


@@ -28,9 +28,9 @@ bytes = { version = "1" }
chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "serde"] }
crossbeam-utils = { version = "0.8" }
crypto-common = { version = "0.1", default-features = false, features = ["std"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "46182c894e5106adba7fb53e9848ce666fb6129b" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "46182c894e5106adba7fb53e9848ce666fb6129b", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "46182c894e5106adba7fb53e9848ce666fb6129b", default-features = false, features = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "44008d71180f2d03e9d21944788e61cb8845abc7" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "44008d71180f2d03e9d21944788e61cb8845abc7", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "44008d71180f2d03e9d21944788e61cb8845abc7", default-features = false, features = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions"] }
digest = { version = "0.10", features = ["mac", "std"] }
either = { version = "1", features = ["serde"] }
fixedbitset = { version = "0.4" }
@@ -172,6 +172,7 @@ rustls = { version = "0.21" }
[target.x86_64-unknown-linux-gnu.build-dependencies]
bitflags = { version = "2", default-features = false, features = ["std"] }
once_cell = { version = "1", default-features = false, features = ["unstable"] }
rustix = { version = "0.38", features = ["fs", "termios"] }
rustls = { version = "0.21" }
[target.x86_64-apple-darwin.dependencies]
@ -184,6 +185,7 @@ rustls = { version = "0.21" }
[target.x86_64-apple-darwin.build-dependencies]
bitflags = { version = "2", default-features = false, features = ["std"] }
once_cell = { version = "1", default-features = false, features = ["unstable"] }
rustix = { version = "0.38", features = ["fs", "termios"] }
rustls = { version = "0.21" }
[target.aarch64-apple-darwin.dependencies]
@ -196,6 +198,7 @@ rustls = { version = "0.21" }
[target.aarch64-apple-darwin.build-dependencies]
bitflags = { version = "2", default-features = false, features = ["std"] }
once_cell = { version = "1", default-features = false, features = ["unstable"] }
rustix = { version = "0.38", features = ["fs", "termios"] }
rustls = { version = "0.21" }
[target.x86_64-pc-windows-msvc.dependencies]