Merge branch 'main' into savage/wal-inspect-library-refactor

pull/24376/head
kodiakhq[bot] 2023-05-16 11:07:47 +00:00 committed by GitHub
commit 6bfa269896
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
500 changed files with 7420 additions and 7104 deletions

View File

@ -67,24 +67,6 @@ commands:
cargo clippy --version
cargo install cargo-hakari && cargo hakari --version
cache_restore:
description: Restore Cargo Cache
steps:
- restore_cache:
name: Restoring Cargo Cache
keys:
- cargo-cache-{{ arch }}-{{ .Branch }}-{{ checksum "Cargo.lock" }}
- cargo-cache-{{ arch }}-{{ .Branch }}
- cargo-cache
cache_save:
description: Save Cargo Cache
steps:
- save_cache:
name: Save Cargo Cache
paths:
- /usr/local/cargo/registry
key: cargo-cache-{{ arch }}-{{ .Branch }}-{{ checksum "Cargo.lock" }}
login_to_gcloud:
steps:
- run:
@ -110,11 +92,9 @@ jobs:
steps:
- checkout
- rust_components
- cache_restore
- run:
name: Rust fmt
command: cargo fmt --all -- --check
- cache_save
lint:
docker:
- image: quay.io/influxdb/rust:ci
@ -129,7 +109,6 @@ jobs:
steps:
- checkout
- rust_components
- cache_restore
- run:
name: Clippy
command: cargo clippy --all-targets --all-features --workspace -- -D warnings
@ -139,7 +118,6 @@ jobs:
- run:
name: Yamllint
command: yamllint --config-file .circleci/yamllint.yml --strict .
- cache_save
cargo_audit:
docker:
- image: quay.io/influxdb/rust:ci
@ -154,14 +132,12 @@ jobs:
steps:
- checkout
- rust_components
- cache_restore
- run:
name: Install cargo-deny
command: cargo install cargo-deny
- run:
name: cargo-deny Checks
command: cargo deny check -s
- cache_save
doc:
docker:
- image: quay.io/influxdb/rust:ci
@ -179,11 +155,9 @@ jobs:
steps:
- checkout
- rust_components
- cache_restore
- run:
name: Cargo doc
command: cargo doc --document-private-items --no-deps --workspace
- cache_save
- run:
name: Compress Docs
command: tar -cvzf rustdoc.tar.gz target/doc/
@ -201,7 +175,6 @@ jobs:
- "77:99:88:4a:ac:1f:55:9e:39:c7:1f:e4:7f:1e:60:4b"
- checkout
- rust_components
- cache_restore
- run:
name: Configure git
command: |
@ -252,14 +225,6 @@ jobs:
# When removing this, also remove the ignore on the test in trogging/src/cli.rs
RUST_LOG: debug,,hyper::proto::h1=info,h2=info
LOG_FILTER: debug,,hyper::proto::h1=info,h2=info
# TEMPORARY: Can be removed when the ingester that uses the write buffer is removed. Tests
# need to spin up separate servers because I've only been able to implement a "persist
# everything" API, and if tests run in parallel using a shared server, they interfere with
# each other. Starting separate servers with the maximum number of Rust test threads uses up
# all the Postgres connections in CI, so limit the parallelization until we switch completely
# to ingester2, which does have a "persist-per-namespace" API that means tests can run on
# shared MiniClusters.
RUST_TEST_THREADS: 8
# Run the JDBC tests
TEST_INFLUXDB_JDBC: "true"
steps:
@ -270,7 +235,6 @@ jobs:
sudo apt-get update
sudo apt-get install openjdk-11-jdk -y
- rust_components
- cache_restore
- run:
name: Download flight-sql-jdbc-driver-10.0.0.jar
command: make -C influxdb_iox/tests/jdbc_client flight-sql-jdbc-driver-10.0.0.jar
@ -283,7 +247,6 @@ jobs:
- run:
name: cargo test --test end_to_end
command: cargo test --test end_to_end
- cache_save
# Run all tests (without external dependencies, like a developer would)
test:
@ -302,11 +265,9 @@ jobs:
steps:
- checkout
- rust_components
- cache_restore
- run:
name: cargo test --workspace
command: cargo test --workspace
- cache_save
# end to end tests with Heappy (heap profiling enabled)
@ -326,11 +287,9 @@ jobs:
steps:
- checkout
- rust_components
- cache_restore
- run:
name: cargo test --no-default-features --features=heappy end_to_end
command: cargo test --no-default-features --features=heappy end_to_end
- cache_save
# Build a dev binary.
#
@ -355,7 +314,6 @@ jobs:
steps:
- checkout
- rust_components
- cache_restore
- run:
name: Cargo build
command: cargo build --workspace
@ -369,7 +327,6 @@ jobs:
# Validate that the data generator compiles (in the same way as it does in release builds)
name: Check iox_data_generator compiles
command: cargo check --package="iox_data_generator" --no-default-features
- cache_save
# Lint protobufs.
protobuf-lint:
@ -470,7 +427,6 @@ jobs:
root: /tmp/images
paths:
- "*.tar.gz"
- cache_save
deploy_release:
docker:

269
Cargo.lock generated
View File

@ -276,7 +276,7 @@ dependencies = [
"paste",
"prost",
"tokio",
"tonic 0.9.2",
"tonic",
]
[[package]]
@ -514,13 +514,17 @@ dependencies = [
name = "authz"
version = "0.1.0"
dependencies = [
"assert_matches",
"async-trait",
"base64 0.21.0",
"generated_types",
"http",
"observability_deps",
"paste",
"snafu",
"tonic 0.9.2",
"test_helpers_end_to_end",
"tokio",
"tonic",
"workspace-hack",
]
@ -947,7 +951,7 @@ dependencies = [
"reqwest",
"thiserror",
"tokio",
"tonic 0.9.2",
"tonic",
"tower",
"workspace-hack",
]
@ -1002,7 +1006,7 @@ dependencies = [
]
[[package]]
name = "compactor2"
name = "compactor"
version = "0.1.0"
dependencies = [
"arrow_util",
@ -1010,7 +1014,7 @@ dependencies = [
"async-trait",
"backoff",
"bytes",
"compactor2_test_utils",
"compactor_test_utils",
"data_types",
"datafusion",
"futures",
@ -1037,12 +1041,12 @@ dependencies = [
]
[[package]]
name = "compactor2_test_utils"
name = "compactor_test_utils"
version = "0.1.0"
dependencies = [
"async-trait",
"backoff",
"compactor2",
"compactor",
"data_types",
"datafusion",
"datafusion_util",
@ -1085,21 +1089,21 @@ dependencies = [
[[package]]
name = "console-api"
version = "0.4.0"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e57ff02e8ad8e06ab9731d5dc72dc23bef9200778eae1a89d555d8c42e5d4a86"
checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e"
dependencies = [
"prost",
"prost-types",
"tonic 0.8.3",
"tonic",
"tracing-core",
]
[[package]]
name = "console-subscriber"
version = "0.1.8"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22a3a81dfaf6b66bce5d159eddae701e3a002f194d378cbf7be5f053c281d9be"
checksum = "57ab2224a0311582eb03adba4caaf18644f7b1f10a760803a803b9b605187fc7"
dependencies = [
"console-api",
"crossbeam-channel",
@ -1107,14 +1111,14 @@ dependencies = [
"futures",
"hdrhistogram",
"humantime",
"parking_lot 0.11.2",
"parking_lot 0.12.1",
"prost-types",
"serde",
"serde_json",
"thread_local",
"tokio",
"tokio-stream",
"tonic 0.8.3",
"tonic",
"tracing",
"tracing-core",
"tracing-subscriber",
@ -1414,9 +1418,11 @@ name = "data_types"
version = "0.1.0"
dependencies = [
"croaring",
"generated_types",
"influxdb-line-protocol",
"iox_time",
"observability_deps",
"once_cell",
"ordered-float 3.7.0",
"percent-encoding",
"proptest",
@ -1431,8 +1437,8 @@ dependencies = [
[[package]]
name = "datafusion"
version = "23.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=beee1d91303d5eff220fadd08b2c28404c2b3e5a#beee1d91303d5eff220fadd08b2c28404c2b3e5a"
version = "24.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=496fc399de700ae14fab436fdff8711cd3132436#496fc399de700ae14fab436fdff8711cd3132436"
dependencies = [
"ahash 0.8.3",
"arrow",
@ -1480,8 +1486,8 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "23.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=beee1d91303d5eff220fadd08b2c28404c2b3e5a#beee1d91303d5eff220fadd08b2c28404c2b3e5a"
version = "24.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=496fc399de700ae14fab436fdff8711cd3132436#496fc399de700ae14fab436fdff8711cd3132436"
dependencies = [
"arrow",
"arrow-array",
@ -1494,8 +1500,8 @@ dependencies = [
[[package]]
name = "datafusion-execution"
version = "23.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=beee1d91303d5eff220fadd08b2c28404c2b3e5a#beee1d91303d5eff220fadd08b2c28404c2b3e5a"
version = "24.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=496fc399de700ae14fab436fdff8711cd3132436#496fc399de700ae14fab436fdff8711cd3132436"
dependencies = [
"dashmap",
"datafusion-common",
@ -1511,8 +1517,8 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "23.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=beee1d91303d5eff220fadd08b2c28404c2b3e5a#beee1d91303d5eff220fadd08b2c28404c2b3e5a"
version = "24.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=496fc399de700ae14fab436fdff8711cd3132436#496fc399de700ae14fab436fdff8711cd3132436"
dependencies = [
"ahash 0.8.3",
"arrow",
@ -1522,8 +1528,8 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "23.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=beee1d91303d5eff220fadd08b2c28404c2b3e5a#beee1d91303d5eff220fadd08b2c28404c2b3e5a"
version = "24.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=496fc399de700ae14fab436fdff8711cd3132436#496fc399de700ae14fab436fdff8711cd3132436"
dependencies = [
"arrow",
"async-trait",
@ -1539,8 +1545,8 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "23.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=beee1d91303d5eff220fadd08b2c28404c2b3e5a#beee1d91303d5eff220fadd08b2c28404c2b3e5a"
version = "24.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=496fc399de700ae14fab436fdff8711cd3132436#496fc399de700ae14fab436fdff8711cd3132436"
dependencies = [
"ahash 0.8.3",
"arrow",
@ -1571,8 +1577,8 @@ dependencies = [
[[package]]
name = "datafusion-proto"
version = "23.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=beee1d91303d5eff220fadd08b2c28404c2b3e5a#beee1d91303d5eff220fadd08b2c28404c2b3e5a"
version = "24.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=496fc399de700ae14fab436fdff8711cd3132436#496fc399de700ae14fab436fdff8711cd3132436"
dependencies = [
"arrow",
"chrono",
@ -1585,8 +1591,8 @@ dependencies = [
[[package]]
name = "datafusion-row"
version = "23.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=beee1d91303d5eff220fadd08b2c28404c2b3e5a#beee1d91303d5eff220fadd08b2c28404c2b3e5a"
version = "24.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=496fc399de700ae14fab436fdff8711cd3132436#496fc399de700ae14fab436fdff8711cd3132436"
dependencies = [
"arrow",
"datafusion-common",
@ -1596,8 +1602,8 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "23.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=beee1d91303d5eff220fadd08b2c28404c2b3e5a#beee1d91303d5eff220fadd08b2c28404c2b3e5a"
version = "24.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=496fc399de700ae14fab436fdff8711cd3132436#496fc399de700ae14fab436fdff8711cd3132436"
dependencies = [
"arrow",
"arrow-schema",
@ -1861,7 +1867,7 @@ dependencies = [
"prost",
"snafu",
"tokio",
"tonic 0.9.2",
"tonic",
"workspace-hack",
]
@ -2035,20 +2041,15 @@ version = "0.1.0"
dependencies = [
"base64 0.21.0",
"bytes",
"data_types",
"datafusion",
"datafusion-proto",
"observability_deps",
"pbjson",
"pbjson-build",
"pbjson-types",
"predicate",
"prost",
"prost-build",
"query_functions",
"serde",
"snafu",
"tonic 0.9.2",
"tonic",
"tonic-build",
"workspace-hack",
]
@ -2105,7 +2106,7 @@ dependencies = [
"prost-build",
"tokio",
"tokio-stream",
"tonic 0.9.2",
"tonic",
"tonic-build",
"tower",
"workspace-hack",
@ -2118,7 +2119,7 @@ dependencies = [
"prost",
"prost-build",
"prost-types",
"tonic 0.9.2",
"tonic",
"tonic-build",
"workspace-hack",
]
@ -2130,7 +2131,7 @@ dependencies = [
"prost",
"prost-build",
"prost-types",
"tonic 0.9.2",
"tonic",
"tonic-build",
"workspace-hack",
]
@ -2172,9 +2173,9 @@ dependencies = [
[[package]]
name = "handlebars"
version = "4.3.6"
version = "4.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "035ef95d03713f2c347a72547b7cd38cbc9af7cd51e6099fb62d586d4a6dee3a"
checksum = "83c3372087601b532857d332f5957cbae686da52bb7810bf038c3e3c3cc2fa0d"
dependencies = [
"log",
"pest",
@ -2443,8 +2444,6 @@ dependencies = [
"client_util",
"data_types",
"futures",
"generated_types",
"influxdb_iox_client",
"iox_catalog",
"metric",
"object_store",
@ -2455,7 +2454,6 @@ dependencies = [
"thiserror",
"tokio",
"tokio-stream",
"tonic 0.9.2",
"workspace-hack",
]
@ -2550,7 +2548,7 @@ dependencies = [
"clap 4.2.7",
"clap_blocks",
"comfy-table",
"compactor2",
"compactor",
"console-subscriber",
"data_types",
"datafusion",
@ -2567,14 +2565,15 @@ dependencies = [
"influxdb_iox_client",
"influxdb_storage_client",
"influxrpc_parser",
"ingester_query_grpc",
"insta",
"iox_catalog",
"iox_query",
"iox_time",
"ioxd_common",
"ioxd_compactor2",
"ioxd_compactor",
"ioxd_garbage_collector",
"ioxd_ingester2",
"ioxd_ingester",
"ioxd_querier",
"ioxd_router",
"ioxd_test",
@ -2588,6 +2587,7 @@ dependencies = [
"observability_deps",
"once_cell",
"panic_logging",
"parking_lot 0.12.1",
"parquet_file",
"parquet_to_line_protocol",
"predicate",
@ -2597,7 +2597,6 @@ dependencies = [
"schema",
"serde",
"serde_json",
"sharder",
"snafu",
"tempfile",
"test_helpers",
@ -2608,7 +2607,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util",
"tonic 0.9.2",
"tonic",
"trace_exporters",
"trogging",
"uuid",
@ -2637,7 +2636,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-stream",
"tonic 0.9.2",
"tonic",
]
[[package]]
@ -2649,7 +2648,7 @@ dependencies = [
"generated_types",
"observability_deps",
"prost",
"tonic 0.9.2",
"tonic",
"workspace-hack",
]
@ -2679,7 +2678,7 @@ dependencies = [
]
[[package]]
name = "ingester2"
name = "ingester"
version = "0.1.0"
dependencies = [
"arrow",
@ -2701,7 +2700,8 @@ dependencies = [
"generated_types",
"hashbrown 0.13.2",
"influxdb_iox_client",
"ingester2_test_ctx",
"ingester_query_grpc",
"ingester_test_ctx",
"iox_catalog",
"iox_query",
"iox_time",
@ -2728,7 +2728,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-util",
"tonic 0.9.2",
"tonic",
"trace",
"uuid",
"wal",
@ -2736,7 +2736,28 @@ dependencies = [
]
[[package]]
name = "ingester2_test_ctx"
name = "ingester_query_grpc"
version = "0.1.0"
dependencies = [
"base64 0.21.0",
"data_types",
"datafusion",
"datafusion-proto",
"pbjson",
"pbjson-build",
"pbjson-types",
"predicate",
"prost",
"prost-build",
"query_functions",
"serde",
"snafu",
"tonic-build",
"workspace-hack",
]
[[package]]
name = "ingester_test_ctx"
version = "0.1.0"
dependencies = [
"arrow",
@ -2748,7 +2769,8 @@ dependencies = [
"generated_types",
"hashbrown 0.13.2",
"influxdb_iox_client",
"ingester2",
"ingester",
"ingester_query_grpc",
"iox_catalog",
"iox_query",
"iox_time",
@ -2763,7 +2785,7 @@ dependencies = [
"test_helpers",
"tokio",
"tokio-util",
"tonic 0.9.2",
"tonic",
"wal",
"workspace-hack",
]
@ -2975,7 +2997,6 @@ dependencies = [
"parquet_file",
"predicate",
"schema",
"sharder",
"uuid",
"workspace-hack",
]
@ -3021,7 +3042,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util",
"tonic 0.9.2",
"tonic",
"tonic-health",
"tonic-reflection",
"tower",
@ -3033,13 +3054,13 @@ dependencies = [
]
[[package]]
name = "ioxd_compactor2"
name = "ioxd_compactor"
version = "0.1.0"
dependencies = [
"async-trait",
"backoff",
"clap_blocks",
"compactor2",
"compactor",
"data_types",
"hyper",
"iox_catalog",
@ -3072,7 +3093,7 @@ dependencies = [
]
[[package]]
name = "ioxd_ingester2"
name = "ioxd_ingester"
version = "0.1.0"
dependencies = [
"arrow-flight",
@ -3081,7 +3102,7 @@ dependencies = [
"futures",
"generated_types",
"hyper",
"ingester2",
"ingester",
"iox_catalog",
"iox_query",
"ioxd_common",
@ -3119,7 +3140,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-util",
"tonic 0.9.2",
"tonic",
"trace",
"workspace-hack",
]
@ -3286,9 +3307,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.142"
version = "0.2.144"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a987beff54b60ffa6d51982e1aa1146bc42f19bd26be28b0586f252fccf5317"
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
[[package]]
name = "libm"
@ -4163,22 +4184,22 @@ dependencies = [
[[package]]
name = "pin-project"
version = "1.0.12"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc"
checksum = "c95a7476719eab1e366eaf73d0260af3021184f18177925b07f54b30089ceead"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.12"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55"
checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
"syn 2.0.15",
]
[[package]]
@ -4430,8 +4451,8 @@ dependencies = [
"datafusion",
"datafusion_util",
"futures",
"generated_types",
"influxdb_iox_client",
"ingester_query_grpc",
"insta",
"iox_catalog",
"iox_query",
@ -4458,7 +4479,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-util",
"tonic 0.9.2",
"tonic",
"trace",
"trace_exporters",
"trace_http",
@ -4761,7 +4782,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-stream",
"tonic 0.9.2",
"tonic",
"trace",
"workspace-hack",
]
@ -4783,9 +4804,9 @@ dependencies = [
[[package]]
name = "rustix"
version = "0.37.18"
version = "0.37.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bbfc1d1c7c40c01715f47d71444744a81669ca84e8b63e25a55e169b1f86433"
checksum = "acf8729d8542766f1b2cf77eb034d52f40d375bb8b615d0b147089946e16613d"
dependencies = [
"bitflags",
"errno",
@ -4940,18 +4961,18 @@ checksum = "e6b44e8fc93a14e66336d230954dda83d18b4605ccace8fe09bc7514a71ad0bc"
[[package]]
name = "serde"
version = "1.0.160"
version = "1.0.163"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb2f3770c8bce3bcda7e149193a069a0f4365bda1fa5cd88e03bca26afc1216c"
checksum = "2113ab51b87a539ae008b5c6c02dc020ffa39afd2d83cffcb3f4eb2722cebec2"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.160"
version = "1.0.163"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "291a097c63d8497e00160b166a967a4a79c64f3facdd01cbd7502231688d77df"
checksum = "8c805777e3930c8883389c602315a24224bcc738b63905ef87cd1420353ea93e"
dependencies = [
"proc-macro2",
"quote",
@ -5004,7 +5025,7 @@ dependencies = [
"metric",
"parking_lot 0.12.1",
"predicate",
"tonic 0.9.2",
"tonic",
"trace",
"tracker",
"workspace-hack",
@ -5020,7 +5041,7 @@ dependencies = [
"metric",
"observability_deps",
"tokio",
"tonic 0.9.2",
"tonic",
"uuid",
"workspace-hack",
]
@ -5050,7 +5071,7 @@ dependencies = [
"service_common",
"snafu",
"tokio",
"tonic 0.9.2",
"tonic",
"trace",
"trace_http",
"tracker",
@ -5089,7 +5110,7 @@ dependencies = [
"test_helpers",
"tokio",
"tokio-stream",
"tonic 0.9.2",
"tonic",
"trace",
"trace_http",
"tracker",
@ -5109,7 +5130,7 @@ dependencies = [
"observability_deps",
"paste",
"tokio",
"tonic 0.9.2",
"tonic",
"workspace-hack",
]
@ -5127,7 +5148,7 @@ dependencies = [
"observability_deps",
"parquet_file",
"tokio",
"tonic 0.9.2",
"tonic",
"uuid",
"workspace-hack",
]
@ -5142,7 +5163,7 @@ dependencies = [
"metric",
"observability_deps",
"tokio",
"tonic 0.9.2",
"tonic",
"workspace-hack",
]
@ -5152,7 +5173,7 @@ version = "0.1.0"
dependencies = [
"generated_types",
"observability_deps",
"tonic 0.9.2",
"tonic",
"workspace-hack",
]
@ -5616,6 +5637,8 @@ dependencies = [
"http",
"hyper",
"influxdb_iox_client",
"ingester_query_grpc",
"iox_catalog",
"mutable_batch_lp",
"mutable_batch_pb",
"nix",
@ -5632,7 +5655,7 @@ dependencies = [
"test_helpers",
"tokio",
"tokio-util",
"tonic 0.9.2",
"tonic",
"workspace-hack",
]
@ -5751,9 +5774,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.28.0"
version = "1.28.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3c786bf8134e5a3a166db9b29ab8f48134739014a3eca7bc6bfa95d673b136f"
checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105"
dependencies = [
"autocfg",
"bytes",
@ -5871,38 +5894,6 @@ dependencies = [
"winnow",
]
[[package]]
name = "tonic"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64 0.13.1",
"bytes",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"prost-derive",
"tokio",
"tokio-stream",
"tokio-util",
"tower",
"tower-layer",
"tower-service",
"tracing",
"tracing-futures",
]
[[package]]
name = "tonic"
version = "0.9.2"
@ -5958,7 +5949,7 @@ dependencies = [
"prost",
"tokio",
"tokio-stream",
"tonic 0.9.2",
"tonic",
]
[[package]]
@ -5971,7 +5962,7 @@ dependencies = [
"prost-types",
"tokio",
"tokio-stream",
"tonic 0.9.2",
"tonic",
]
[[package]]
@ -6098,24 +6089,14 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.30"
version = "0.1.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a"
checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-futures"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
dependencies = [
"pin-project",
"tracing",
]
[[package]]
name = "tracing-log"
version = "0.1.3"
@ -6290,9 +6271,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "uuid"
version = "1.3.2"
version = "1.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dad5567ad0cf5b760e5665964bec1b47dfd077ba8a2544b513f3556d3d239a2"
checksum = "345444e32442451b267fc254ae85a209c64be56d2890e601a0c37ff0c3c5ecd2"
dependencies = [
"getrandom",
]
@ -6783,7 +6764,6 @@ dependencies = [
"hashbrown 0.12.3",
"hashbrown 0.13.2",
"heck",
"hyper",
"indexmap",
"io-lifetimes",
"itertools",
@ -6820,6 +6800,7 @@ dependencies = [
"sha2",
"similar",
"smallvec",
"sqlparser",
"sqlx",
"sqlx-core",
"sqlx-macros",
@ -6829,7 +6810,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util",
"tonic 0.9.2",
"tonic",
"tower",
"tracing",
"tracing-core",

View File

@ -7,8 +7,8 @@ members = [
"cache_system",
"clap_blocks",
"client_util",
"compactor2_test_utils",
"compactor2",
"compactor_test_utils",
"compactor",
"data_types",
"datafusion_util",
"dml",
@ -28,8 +28,9 @@ members = [
"influxdb_tsm",
"influxdb2_client",
"influxrpc_parser",
"ingester2_test_ctx",
"ingester2",
"ingester_test_ctx",
"ingester",
"ingester_query_grpc",
"iox_catalog",
"iox_data_generator",
"iox_query_influxql",
@ -38,9 +39,9 @@ members = [
"iox_tests",
"iox_time",
"ioxd_common",
"ioxd_compactor2",
"ioxd_compactor",
"ioxd_garbage_collector",
"ioxd_ingester2",
"ioxd_ingester",
"ioxd_querier",
"ioxd_router",
"ioxd_test",
@ -116,8 +117,8 @@ license = "MIT OR Apache-2.0"
[workspace.dependencies]
arrow = { version = "38.0.0" }
arrow-flight = { version = "38.0.0" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="beee1d91303d5eff220fadd08b2c28404c2b3e5a", default-features = false }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="beee1d91303d5eff220fadd08b2c28404c2b3e5a" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="496fc399de700ae14fab436fdff8711cd3132436", default-features = false }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="496fc399de700ae14fab436fdff8711cd3132436" }
hashbrown = { version = "0.13.2" }
parquet = { version = "38.0.0" }
tonic = { version = "0.9.2", features = ["tls", "tls-webpki-roots"] }

View File

@ -20,5 +20,11 @@ base64 = "0.21.0"
snafu = "0.7"
tonic = { workspace = true }
[dev-dependencies]
assert_matches = "1.5.0"
paste = "1.0.12"
test_helpers_end_to_end = { path = "../test_helpers_end_to_end" }
tokio = "1.28.0"
[features]
http = ["dep:http"]

73
authz/src/authorizer.rs Normal file
View File

@ -0,0 +1,73 @@
use async_trait::async_trait;
use super::{Error, Permission};
/// An authorizer is used to validate a request
/// (+ associated permissions needed to fulfill the request)
/// with an authorization token that has been extracted from the request.
#[async_trait]
pub trait Authorizer: std::fmt::Debug + Send + Sync {
/// Determine the permissions associated with a request token.
///
/// The returned list of permissions is the intersection of the permissions
/// requested and the permissions associated with the token.
///
/// Implementations of this trait should return the specified errors under
/// the following conditions:
///
/// * [`Error::InvalidToken`]: the token is invalid / in an incorrect
/// format / otherwise corrupt and a permission check cannot be
/// performed
///
/// * [`Error::NoToken`]: the token was not provided
///
/// * [`Error::Forbidden`]: the token was well formed, but lacks
/// authorisation to perform the requested action
///
/// * [`Error::Verification`]: the token permissions were not possible
/// to validate - an internal error has occurred
async fn permissions(
&self,
token: Option<Vec<u8>>,
perms: &[Permission],
) -> Result<Vec<Permission>, Error>;
/// Make a test request that determines if end-to-end communication
/// with the service is working.
async fn probe(&self) -> Result<(), Error> {
match self.permissions(Some(b"".to_vec()), &[]).await {
// got response from authorizer server
Ok(_) | Err(Error::Forbidden) | Err(Error::InvalidToken) => Ok(()),
// other errors, including Error::Verification
Err(e) => Err(e),
}
}
}
/// Wrapped `Option<dyn Authorizer>`
/// Provides response to inner `IoxAuthorizer::permissions()`
#[async_trait]
impl<T: Authorizer> Authorizer for Option<T> {
async fn permissions(
&self,
token: Option<Vec<u8>>,
perms: &[Permission],
) -> Result<Vec<Permission>, Error> {
match self {
Some(authz) => authz.permissions(token, perms).await,
// no authz rpc service => return same perms requested. Used for testing.
None => Ok(perms.to_vec()),
}
}
}
#[async_trait]
impl<T: AsRef<dyn Authorizer> + std::fmt::Debug + Send + Sync> Authorizer for T {
async fn permissions(
&self,
token: Option<Vec<u8>>,
perms: &[Permission],
) -> Result<Vec<Permission>, Error> {
self.as_ref().permissions(token, perms).await
}
}

217
authz/src/iox_authorizer.rs Normal file
View File

@ -0,0 +1,217 @@
use async_trait::async_trait;
use generated_types::influxdata::iox::authz::v1::{self as proto, AuthorizeResponse};
use observability_deps::tracing::warn;
use snafu::Snafu;
use tonic::Response;
use super::{Authorizer, Permission};
/// Authorizer implementation using influxdata.iox.authz.v1 protocol.
#[derive(Clone, Debug)]
pub struct IoxAuthorizer {
client:
proto::iox_authorizer_service_client::IoxAuthorizerServiceClient<tonic::transport::Channel>,
}
impl IoxAuthorizer {
/// Attempt to create a new client by connecting to a given endpoint.
pub fn connect_lazy<D>(dst: D) -> Result<Self, Box<dyn std::error::Error>>
where
D: TryInto<tonic::transport::Endpoint> + Send,
D::Error: Into<tonic::codegen::StdError>,
{
let ep = tonic::transport::Endpoint::new(dst)?;
let client = proto::iox_authorizer_service_client::IoxAuthorizerServiceClient::new(
ep.connect_lazy(),
);
Ok(Self { client })
}
async fn request(
&self,
token: Vec<u8>,
requested_perms: &[Permission],
) -> Result<Response<AuthorizeResponse>, tonic::Status> {
let req = proto::AuthorizeRequest {
token,
permissions: requested_perms
.iter()
.filter_map(|p| p.clone().try_into().ok())
.collect(),
};
let mut client = self.client.clone();
client.authorize(req).await
}
}
#[async_trait]
impl Authorizer for IoxAuthorizer {
async fn permissions(
&self,
token: Option<Vec<u8>>,
requested_perms: &[Permission],
) -> Result<Vec<Permission>, Error> {
let authz_rpc_result = self
.request(token.ok_or(Error::NoToken)?, requested_perms)
.await
.map_err(|status| Error::Verification {
msg: status.message().to_string(),
source: Box::new(status),
})?
.into_inner();
if !authz_rpc_result.valid {
return Err(Error::InvalidToken);
}
let intersected_perms: Vec<Permission> = authz_rpc_result
.permissions
.into_iter()
.filter_map(|p| match p.try_into() {
Ok(p) => Some(p),
Err(e) => {
warn!(error=%e, "authz service returned incompatible permission");
None
}
})
.collect();
if intersected_perms.is_empty() {
return Err(Error::Forbidden);
}
Ok(intersected_perms)
}
}
/// Authorization related error.
#[derive(Debug, Snafu)]
pub enum Error {
/// Communication error when verifying a token.
#[snafu(display("token verification not possible: {msg}"))]
Verification {
/// Message describing the error.
msg: String,
/// Source of the error.
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
/// Token is invalid.
#[snafu(display("invalid token"))]
InvalidToken,
/// The token's permissions do not allow the operation.
#[snafu(display("forbidden"))]
Forbidden,
/// No token has been supplied, but is required.
#[snafu(display("no token"))]
NoToken,
}
impl Error {
/// Create new Error::Verification.
pub fn verification(
msg: impl Into<String>,
source: impl Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
) -> Self {
Self::Verification {
msg: msg.into(),
source: source.into(),
}
}
}
impl From<tonic::Status> for Error {
fn from(value: tonic::Status) -> Self {
Self::verification(value.message(), value.clone())
}
}
#[cfg(test)]
mod test {
use assert_matches::assert_matches;
use test_helpers_end_to_end::Authorizer as AuthorizerServer;
use super::*;
use crate::{Action, Authorizer, Permission, Resource};
const NAMESPACE: &str = "bananas";
macro_rules! test_iox_authorizer {
(
$name:ident,
token_permissions = $token_permissions:expr,
permissions_required = $permissions_required:expr,
want = $want:pat
) => {
paste::paste! {
#[tokio::test]
async fn [<test_iox_authorizer_ $name>]() {
let mut authz_server = AuthorizerServer::create().await;
let authz = IoxAuthorizer::connect_lazy(authz_server.addr())
.expect("Failed to create IoxAuthorizer client.");
let token = authz_server.create_token_for(NAMESPACE, $token_permissions);
let got = authz.permissions(
Some(token.as_bytes().to_vec()),
$permissions_required
).await;
assert_matches!(got, $want);
}
}
};
}
test_iox_authorizer!(
ok,
token_permissions = &["ACTION_WRITE"],
permissions_required = &[Permission::ResourceAction(
Resource::Database(NAMESPACE.to_string()),
Action::Write,
)],
want = Ok(_)
);
test_iox_authorizer!(
insufficient_perms,
token_permissions = &["ACTION_READ"],
permissions_required = &[Permission::ResourceAction(
Resource::Database(NAMESPACE.to_string()),
Action::Write,
)],
want = Err(Error::Forbidden)
);
test_iox_authorizer!(
any_of_required_perms,
token_permissions = &["ACTION_WRITE"],
permissions_required = &[
Permission::ResourceAction(Resource::Database(NAMESPACE.to_string()), Action::Write,),
Permission::ResourceAction(Resource::Database(NAMESPACE.to_string()), Action::Create,)
],
want = Ok(_)
);
#[tokio::test]
async fn test_invalid_token() {
let authz_server = AuthorizerServer::create().await;
let authz = IoxAuthorizer::connect_lazy(authz_server.addr())
.expect("Failed to create IoxAuthorizer client.");
let invalid_token = b"UGLY";
let got = authz
.permissions(
Some(invalid_token.to_vec()),
&[Permission::ResourceAction(
Resource::Database(NAMESPACE.to_string()),
Action::Read,
)],
)
.await;
assert_matches!(got, Err(Error::InvalidToken));
}
}

View File

@ -16,12 +16,14 @@
)]
#![allow(rustdoc::private_intra_doc_links)]
use async_trait::async_trait;
use base64::{prelude::BASE64_STANDARD, Engine};
use generated_types::influxdata::iox::authz::v1 as proto;
use generated_types::influxdata::iox::authz::v1::{self as proto};
use observability_deps::tracing::warn;
use snafu::Snafu;
mod authorizer;
pub use authorizer::Authorizer;
mod iox_authorizer;
pub use iox_authorizer::{Error, IoxAuthorizer};
mod permission;
pub use permission::{Action, Permission, Resource};
@ -48,166 +50,6 @@ pub fn extract_token<T: AsRef<[u8]> + ?Sized>(value: Option<&T>) -> Option<Vec<u
}
}
/// An authorizer is used to validate the associated with
/// an authorization token that has been extracted from a request.
#[async_trait]
pub trait Authorizer: std::fmt::Debug + Send + Sync {
/// Determine the permissions associated with a request token.
///
/// The returned list of permissions is the intersection of the permissions
/// requested and the permissions associated with the token. An error
/// will only be returned if there is a failure processing the token.
/// An invalid token is taken to have no permissions, so these along
/// with tokens that match none of the requested permissions will return
/// empty permission sets.
async fn permissions(
&self,
token: Option<Vec<u8>>,
perms: &[Permission],
) -> Result<Vec<Permission>, Error>;
/// Make a test request that determines if end-to-end communication
/// with the service is working.
async fn probe(&self) -> Result<(), Error> {
self.permissions(Some(b"".to_vec()), &[]).await?;
Ok(())
}
/// Determine if a token has any of the requested permissions.
///
/// If the token has none of the permissions requested then a Forbidden
/// error is returned.
async fn require_any_permission(
&self,
token: Option<Vec<u8>>,
perms: &[Permission],
) -> Result<(), Error> {
if self.permissions(token, perms).await?.is_empty() {
Err(Error::Forbidden)
} else {
Ok(())
}
}
}
#[async_trait]
impl<T: Authorizer> Authorizer for Option<T> {
async fn permissions(
&self,
token: Option<Vec<u8>>,
perms: &[Permission],
) -> Result<Vec<Permission>, Error> {
match self {
Some(authz) => authz.permissions(token, perms).await,
None => Ok(perms.to_vec()),
}
}
}
#[async_trait]
impl<T: AsRef<dyn Authorizer> + std::fmt::Debug + Send + Sync> Authorizer for T {
async fn permissions(
&self,
token: Option<Vec<u8>>,
perms: &[Permission],
) -> Result<Vec<Permission>, Error> {
self.as_ref().permissions(token, perms).await
}
}
/// Authorizer implementation using influxdata.iox.authz.v1 protocol.
#[derive(Clone, Debug)]
pub struct IoxAuthorizer {
client:
proto::iox_authorizer_service_client::IoxAuthorizerServiceClient<tonic::transport::Channel>,
}
impl IoxAuthorizer {
/// Attempt to create a new client by connecting to a given endpoint.
pub fn connect_lazy<D>(dst: D) -> Result<Self, Box<dyn std::error::Error>>
where
D: TryInto<tonic::transport::Endpoint> + Send,
D::Error: Into<tonic::codegen::StdError>,
{
let ep = tonic::transport::Endpoint::new(dst)?;
let client = proto::iox_authorizer_service_client::IoxAuthorizerServiceClient::new(
ep.connect_lazy(),
);
Ok(Self { client })
}
}
#[async_trait]
impl Authorizer for IoxAuthorizer {
async fn permissions(
&self,
token: Option<Vec<u8>>,
perms: &[Permission],
) -> Result<Vec<Permission>, Error> {
let req = proto::AuthorizeRequest {
token: token.ok_or(Error::NoToken)?,
permissions: perms
.iter()
.filter_map(|p| p.clone().try_into().ok())
.collect(),
};
let mut client = self.client.clone();
let resp = client.authorize(req).await?;
Ok(resp
.into_inner()
.permissions
.into_iter()
.filter_map(|p| match p.try_into() {
Ok(p) => Some(p),
Err(e) => {
warn!(error=%e, "authz service returned incompatible permission");
None
}
})
.collect())
}
}
/// Authorization related error.
#[derive(Debug, Snafu)]
pub enum Error {
/// Communication error when verifying a token.
#[snafu(display("token verification not possible: {msg}"))]
Verification {
/// Message describing the error.
msg: String,
/// Source of the error.
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
/// The token's permissions do not allow the operation.
#[snafu(display("forbidden"))]
Forbidden,
/// No token has been supplied, but is required.
#[snafu(display("no token"))]
NoToken,
}
impl Error {
/// Create new Error::Verification.
pub fn verification(
msg: impl Into<String>,
source: impl Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
) -> Self {
Self::Verification {
msg: msg.into(),
source: source.into(),
}
}
}
impl From<tonic::Status> for Error {
fn from(value: tonic::Status) -> Self {
Self::verification(value.message(), value.clone())
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -19,7 +19,6 @@ observability_deps = { path = "../observability_deps" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.96"
snafu = "0.7"
tempfile = "3.5.0"
trace = { path = "../trace" }
trace_exporters = { path = "../trace_exporters" }
trogging = { path = "../trogging", default-features = false, features = ["clap"] }
@ -27,6 +26,7 @@ uuid = { version = "1", features = ["v4"] }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]
tempfile = "3.5.0"
test_helpers = { path = "../test_helpers" }
[features]

View File

@ -1,14 +1,13 @@
//! Catalog-DSN-related configs.
use iox_catalog::sqlite::{SqliteCatalog, SqliteConnectionOptions};
use iox_catalog::{
create_or_get_default_records,
interface::Catalog,
mem::MemCatalog,
postgres::{PostgresCatalog, PostgresConnectionOptions},
};
use observability_deps::tracing::*;
use snafu::{OptionExt, ResultExt, Snafu};
use std::{ops::DerefMut, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
@ -211,13 +210,6 @@ impl CatalogDsnConfig {
}
CatalogType::Memory => {
let mem = MemCatalog::new(metrics);
let mut txn = mem.start_transaction().await.context(CatalogSnafu)?;
create_or_get_default_records(txn.deref_mut())
.await
.context(CatalogSnafu)?;
txn.commit().await.context(CatalogSnafu)?;
Arc::new(mem) as Arc<dyn Catalog>
}
CatalogType::Sqlite => {

View File

@ -1,4 +1,4 @@
//! CLI config for compactor2-related commands
//! CLI config for compactor-related commands
use std::num::NonZeroUsize;
@ -13,9 +13,9 @@ pub enum CompactionType {
Cold,
}
/// CLI config for compactor2
/// CLI config for compactor
#[derive(Debug, Clone, clap::Parser)]
pub struct Compactor2Config {
pub struct CompactorConfig {
/// Type of compaction to perform.
#[clap(
value_enum,
@ -235,6 +235,7 @@ pub struct Compactor2Config {
/// Number of shards.
///
/// If this is set then the shard ID MUST also be set. If both are not provided, sharding is disabled.
/// (shard ID can be provided by the host name)
#[clap(
long = "compaction-shard-count",
env = "INFLUXDB_IOX_COMPACTION_SHARD_COUNT",
@ -254,6 +255,13 @@ pub struct Compactor2Config {
)]
pub shard_id: Option<usize>,
/// Host Name
///
/// comprised of leading text (e.g. 'iox-shared-compactor-'), ending with shard_id (e.g. '0').
/// When shard_count is specified, but shard_id is not specified, the id is extracted from hostname.
#[clap(long = "hostname", env = "HOSTNAME", action)]
pub hostname: Option<String>,
/// Minimum number of L1 files to compact to L2.
///
/// If there are more than this many L1 (by definition non
@ -314,27 +322,27 @@ mod tests {
#[test]
fn default_compaction_type_is_hot() {
let config = Compactor2Config::try_parse_from(["my_binary"]).unwrap();
let config = CompactorConfig::try_parse_from(["my_binary"]).unwrap();
assert_eq!(config.compaction_type, CompactionType::Hot);
}
#[test]
fn can_specify_hot() {
let config =
Compactor2Config::try_parse_from(["my_binary", "--compaction-type", "hot"]).unwrap();
CompactorConfig::try_parse_from(["my_binary", "--compaction-type", "hot"]).unwrap();
assert_eq!(config.compaction_type, CompactionType::Hot);
}
#[test]
fn can_specify_cold() {
let config =
Compactor2Config::try_parse_from(["my_binary", "--compaction-type", "cold"]).unwrap();
CompactorConfig::try_parse_from(["my_binary", "--compaction-type", "cold"]).unwrap();
assert_eq!(config.compaction_type, CompactionType::Cold);
}
#[test]
fn any_other_compaction_type_string_is_invalid() {
let error = Compactor2Config::try_parse_from(["my_binary", "--compaction-type", "hello"])
let error = CompactorConfig::try_parse_from(["my_binary", "--compaction-type", "hello"])
.unwrap_err()
.to_string();
assert_contains!(

View File

@ -32,7 +32,8 @@ pub struct GarbageCollectorConfig {
)]
pub objectstore_concurrent_deletes: usize,
/// Number of minutes to sleep between iterations of the objectstore deletion loop.
/// Number of minutes to sleep between iterations of the objectstore list loop.
/// This is the sleep between entirely fresh list operations.
/// Defaults to 30 minutes.
#[clap(
long,
@ -41,6 +42,16 @@ pub struct GarbageCollectorConfig {
)]
pub objectstore_sleep_interval_minutes: u64,
/// Number of milliseconds to sleep between listing consecutive chunks of objecstore files.
/// Object store listing is processed in batches; this is the sleep between batches.
/// Defaults to 1000 milliseconds.
#[clap(
long,
default_value_t = 1000,
env = "INFLUXDB_IOX_GC_OBJECTSTORE_SLEEP_INTERVAL_BATCH_MILLISECONDS"
)]
pub objectstore_sleep_interval_batch_milliseconds: u64,
/// Parquet file rows in the catalog flagged for deletion before this duration will be deleted.
/// Parsed with <https://docs.rs/humantime/latest/humantime/fn.parse_duration.html>
///

View File

@ -5,7 +5,7 @@ use std::path::PathBuf;
/// CLI config for the ingester using the RPC write path
#[derive(Debug, Clone, clap::Parser)]
#[allow(missing_copy_implementations)]
pub struct Ingester2Config {
pub struct IngesterConfig {
/// Where this ingester instance should store its write-ahead log files. Each ingester instance
/// must have its own directory.
#[clap(long = "wal-directory", env = "INFLUXDB_IOX_WAL_DIRECTORY", action)]

View File

@ -13,13 +13,13 @@
clippy::dbg_macro
)]
pub mod catalog_dsn;
pub mod compactor2;
pub mod compactor;
pub mod garbage_collector;
pub mod ingester2;
pub mod ingester;
pub mod ingester_address;
pub mod object_store;
pub mod querier;
pub mod router2;
pub mod router;
pub mod run_config;
pub mod single_tenant;
pub mod socket_addr;

View File

@ -14,7 +14,7 @@ use std::{
/// CLI config for the router using the RPC write path
#[derive(Debug, Clone, clap::Parser)]
#[allow(missing_copy_implementations)]
pub struct Router2Config {
pub struct RouterConfig {
/// Addr for connection to authz
#[clap(
long = CONFIG_AUTHZ_FLAG,
@ -71,26 +71,6 @@ pub struct Router2Config {
)]
pub ingester_addresses: Vec<IngesterAddress>,
/// Write buffer topic/database that should be used.
// This isn't really relevant to the RPC write path and will be removed eventually.
#[clap(
long = "write-buffer-topic",
env = "INFLUXDB_IOX_WRITE_BUFFER_TOPIC",
default_value = "iox-shared",
action
)]
pub topic: String,
/// Query pool name to dispatch writes to.
// This isn't really relevant to the RPC write path and will be removed eventually.
#[clap(
long = "query-pool",
env = "INFLUXDB_IOX_QUERY_POOL_NAME",
default_value = "iox-shared",
action
)]
pub query_pool_name: String,
/// Retention period to use when auto-creating namespaces.
/// For infinite retention, leave this unset and it will default to `None`.
/// Setting it to zero will not make it infinite.
@ -112,18 +92,6 @@ pub struct Router2Config {
)]
pub namespace_autocreation_enabled: bool,
/// A "strftime" format string used to derive the partition key from the row
/// timestamps.
///
/// Changing this from the default value is experimental.
#[clap(
long = "partition-key-pattern",
env = "INFLUXDB_IOX_PARTITION_KEY_PATTERN",
default_value = "%Y-%m-%d",
action
)]
pub partition_key_pattern: String,
/// Specify the timeout in seconds for a single RPC write request to an
/// ingester.
#[clap(

View File

@ -1,5 +1,5 @@
[package]
name = "compactor2"
name = "compactor"
version.workspace = true
authors.workspace = true
edition.workspace = true
@ -33,7 +33,7 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]
arrow_util = { path = "../arrow_util" }
assert_matches = "1"
compactor2_test_utils = { path = "../compactor2_test_utils" }
compactor_test_utils = { path = "../compactor_test_utils" }
iox_tests = { path = "../iox_tests" }
test_helpers = { path = "../test_helpers"}
insta = { version = "1.29.0", features = ["yaml"] }

View File

Before

Width:  |  Height:  |  Size: 210 KiB

After

Width:  |  Height:  |  Size: 210 KiB

View File

@ -29,12 +29,12 @@ fn shared_handle(handle: JoinHandle<()>) -> SharedJoinHandle {
/// Main compactor driver.
#[derive(Debug)]
pub struct Compactor2 {
pub struct Compactor {
shutdown: CancellationToken,
worker: SharedJoinHandle,
}
impl Compactor2 {
impl Compactor {
/// Start compactor.
pub fn start(config: Config) -> Self {
info!("compactor starting");
@ -84,7 +84,7 @@ impl Compactor2 {
}
}
impl Drop for Compactor2 {
impl Drop for Compactor {
fn drop(&mut self) {
if self.worker.clone().now_or_never().is_none() {
warn!("Compactor was not shut down properly");

View File

@ -184,6 +184,7 @@ fn to_queryable_parquet_chunk(
let table_schema: Schema = partition_info
.table_schema
.as_ref()
.columns
.clone()
.try_into()
.expect("table schema is broken");

View File

@ -128,7 +128,7 @@ impl FilesSplit for NonOverlapSplit {
#[cfg(test)]
mod tests {
use compactor2_test_utils::{
use compactor_test_utils::{
create_l1_files, create_overlapped_files, create_overlapped_files_2,
create_overlapped_l0_l1_files, create_overlapped_l1_l2_files, format_files,
format_files_split,

View File

@ -35,7 +35,7 @@ impl FilesSplit for TargetLevelSplit {
#[cfg(test)]
mod tests {
use compactor2_test_utils::{
use compactor_test_utils::{
create_l0_files, create_l1_files, create_l2_files, create_overlapped_files, format_files,
format_files_split,
};

View File

@ -140,7 +140,7 @@ impl FilesSplit for UpgradeSplit {
#[cfg(test)]
mod tests {
use compactor2_test_utils::{
use compactor_test_utils::{
create_l0_files, create_l1_files, create_l1_files_mix_size, create_overlapped_files,
create_overlapped_files_2, create_overlapped_files_3, create_overlapped_files_3_mix_size,
create_overlapped_l0_l1_files, create_overlapped_l1_l2_files,

View File

@ -6,6 +6,7 @@ use std::{sync::Arc, time::Duration};
use data_types::CompactionLevel;
use object_store::memory::InMemory;
use observability_deps::tracing::info;
use crate::{
config::{CompactionType, Config, PartitionsSourceConfig},
@ -156,6 +157,10 @@ fn make_partitions_source_commit_partition_sink(
let mut id_only_partition_filters: Vec<Arc<dyn IdOnlyPartitionFilter>> = vec![];
if let Some(shard_config) = &config.shard_config {
// add shard filter before performing any catalog IO
info!(
"starting compactor {} of {}",
shard_config.shard_id, shard_config.n_shards
);
id_only_partition_filters.push(Arc::new(ShardPartitionFilter::new(
shard_config.n_shards,
shard_config.shard_id,

View File

@ -49,9 +49,7 @@ impl NamespacesSource for MockNamespacesSource {
mod tests {
use std::collections::BTreeMap;
use data_types::{
ColumnId, ColumnSchema, ColumnType, QueryPoolId, TableId, TableSchema, TopicId,
};
use data_types::{Column, ColumnId, ColumnType, ColumnsByName, TableId, TableSchema};
use super::*;
@ -132,21 +130,20 @@ mod tests {
"table1".to_string(),
TableSchema {
id: TableId::new(1),
columns: BTreeMap::from([
(
"col1".to_string(),
ColumnSchema {
id: ColumnId::new(1),
column_type: ColumnType::I64,
},
),
(
"col2".to_string(),
ColumnSchema {
id: ColumnId::new(2),
column_type: ColumnType::String,
},
),
partition_template: None,
columns: ColumnsByName::new([
Column {
name: "col1".to_string(),
id: ColumnId::new(1),
column_type: ColumnType::I64,
table_id: TableId::new(1),
},
Column {
name: "col2".to_string(),
id: ColumnId::new(2),
column_type: ColumnType::String,
table_id: TableId::new(1),
},
]),
},
),
@ -154,43 +151,37 @@ mod tests {
"table2".to_string(),
TableSchema {
id: TableId::new(2),
columns: BTreeMap::from([
(
"col1".to_string(),
ColumnSchema {
id: ColumnId::new(3),
column_type: ColumnType::I64,
},
),
(
"col2".to_string(),
ColumnSchema {
id: ColumnId::new(4),
column_type: ColumnType::String,
},
),
(
"col3".to_string(),
ColumnSchema {
id: ColumnId::new(5),
column_type: ColumnType::F64,
},
),
partition_template: None,
columns: ColumnsByName::new([
Column {
name: "col1".to_string(),
id: ColumnId::new(3),
column_type: ColumnType::I64,
table_id: TableId::new(2),
},
Column {
name: "col2".to_string(),
id: ColumnId::new(4),
column_type: ColumnType::String,
table_id: TableId::new(2),
},
Column {
name: "col3".to_string(),
id: ColumnId::new(5),
column_type: ColumnType::F64,
table_id: TableId::new(2),
},
]),
},
),
]);
let id = NamespaceId::new(id);
let topic_id = TopicId::new(0);
let query_pool_id = QueryPoolId::new(0);
Self {
namespace: NamespaceWrapper {
ns: Namespace {
id,
name: "ns".to_string(),
topic_id,
query_pool_id,
max_tables: 10,
max_columns_per_table: 10,
retention_period_ns: None,
@ -198,12 +189,11 @@ mod tests {
},
schema: NamespaceSchema {
id,
topic_id,
query_pool_id,
tables,
max_columns_per_table: 10,
max_tables: 42,
retention_period_ns: None,
partition_template: None,
},
},
}

View File

@ -4,7 +4,7 @@ use std::{
};
use async_trait::async_trait;
use data_types::{ColumnSet, CompactionLevel, ParquetFileParams, SequenceNumber, Timestamp};
use data_types::{ColumnSet, CompactionLevel, ParquetFileParams, Timestamp};
use datafusion::{
arrow::{datatypes::SchemaRef, record_batch::RecordBatch},
error::DataFusionError,
@ -72,7 +72,6 @@ impl ParquetFileSink for MockParquetFileSink {
table_id: partition.table.id,
partition_id: partition.partition_id,
object_store_id: Uuid::from_u128(guard.len() as u128),
max_sequence_number: SequenceNumber::new(0),
min_time: Timestamp::new(0),
max_time: Timestamp::new(0),
file_size_bytes: 1,
@ -168,7 +167,6 @@ mod tests {
table_id: TableId::new(3),
partition_id: PartitionId::new(1),
object_store_id: Uuid::from_u128(2),
max_sequence_number: SequenceNumber::new(0),
min_time: Timestamp::new(0),
max_time: Timestamp::new(0),
file_size_bytes: 1,
@ -231,7 +229,6 @@ mod tests {
table_id: TableId::new(3),
partition_id: PartitionId::new(1),
object_store_id: Uuid::from_u128(0),
max_sequence_number: SequenceNumber::new(0),
min_time: Timestamp::new(0),
max_time: Timestamp::new(0),
file_size_bytes: 1,

View File

@ -1,7 +1,7 @@
use std::{fmt::Display, sync::Arc};
use async_trait::async_trait;
use data_types::{CompactionLevel, ParquetFileParams, SequenceNumber};
use data_types::{CompactionLevel, ParquetFileParams};
use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
use iox_time::{Time, TimeProvider};
use parquet_file::{
@ -15,9 +15,6 @@ use crate::partition_info::PartitionInfo;
use super::ParquetFileSink;
// fields no longer used but still exists in the catalog
const MAX_SEQUENCE_NUMBER: i64 = 0;
#[derive(Debug)]
pub struct ObjectStoreParquetFileSink {
store: ParquetStorage,
@ -57,7 +54,6 @@ impl ParquetFileSink for ObjectStoreParquetFileSink {
table_name: partition.table.name.clone().into(),
partition_id: partition.partition_id,
partition_key: partition.partition_key.clone(),
max_sequence_number: SequenceNumber::new(MAX_SEQUENCE_NUMBER),
compaction_level: level,
sort_key: partition.sort_key.clone(),
max_l0_created_at,

Some files were not shown because too many files have changed in this diff Show More