Merge branch 'main' into ntran/grpc_compact_os_chunks

pull/24376/head
kodiakhq[bot] 2021-12-07 03:26:06 +00:00 committed by GitHub
commit 75cd1d24f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
80 changed files with 724 additions and 908 deletions

View File

@ -323,8 +323,10 @@ jobs:
# To change the contents of the build container, modify docker/Dockerfile.ci
# To change the final release container, modify docker/Dockerfile.iox
perf_image:
docker:
- image: quay.io/influxdb/rust:ci
# need a machine executor to have a full-powered docker daemon (the `setup_remote_docker` system just provides a
# kinda small node)
machine:
image: ubuntu-2004:202111-01
resource_class: xlarge # use of a smaller executor tends crashes on link
environment:
# Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
@ -337,47 +339,27 @@ jobs:
RUSTFLAGS: "-C target-feature=+avx2"
steps:
- checkout
- rust_components
- cache_restore
- run:
name: Cargo release build with target arch set for CRoaring
command: cargo build --release --no-default-features --features="aws,gcp,azure,jemalloc_replacing_malloc"
- run:
name: Compress debug symbols
command: objcopy --compress-debug-sections target/release/influxdb_iox
- run: |
echo "binary sha256sum after build is (not to be confused w/ the GIT SHA and the resulting image SHA!)"
sha256sum target/release/influxdb_iox
- run:
name: Print rustc target CPU options
command: target/release/influxdb_iox debug print-cpu
- setup_remote_docker:
# There seems to be a cache invalidation bug in docker
# or in the way that circleci implements layer caching.
# Disabling for now, and tracked further investigations
# in https://github.com/influxdata/k8s-idpe/issues/3038
docker_layer_caching: false
version: 20.10.7
- run: |
sudo apt-get update
sudo apt-get install -y docker.io
docker --version
- run: |
echo "$QUAY_INFLUXDB_IOX_PASS" | docker login quay.io --username $QUAY_INFLUXDB_IOX_USER --password-stdin
- run:
# Docker has functionality to support per-Dockerfile .dockerignore
# This was added in https://github.com/moby/buildkit/pull/901
# And included in 19.03 - https://docs.docker.com/engine/release-notes/19.03/#19030
# Unfortunately CircleCI only seems to understand a root-level .dockerignore
# So we need to move it into position for it to not send ~10GB of build context
name: Fudge CircleCI Docker Context
command: mv docker/Dockerfile.iox.dockerignore .dockerignore
- run: |
BRANCH=$(git rev-parse --abbrev-ref HEAD | tr '/' '.')
COMMIT_SHA=$(git rev-parse --short HEAD)
docker build -t quay.io/influxdb/iox:$COMMIT_SHA -t quay.io/influxdb/iox:main -f docker/Dockerfile.iox .
docker push --all-tags quay.io/influxdb/iox
echo "export COMMIT_SHA=${COMMIT_SHA}" >> $BASH_ENV
name: Cargo release build with target arch set for CRoaring
command: |
COMMIT_SHA=$(git rev-parse --short HEAD)
docker buildx build \
--build-arg RUSTFLAGS="-C target-feature=+avx2" \
--progress plain \
--tag quay.io/influxdb/iox:"$COMMIT_SHA" \
--tag quay.io/influxdb/iox:"$(echo "$CIRCLE_BRANCH" | tr '[:upper:]' '[:lower:]' | sed 's/[^a-z0-9]/_/g')" \
.
docker run -it --rm quay.io/influxdb/iox:$COMMIT_SHA debug print-cpu
docker push --all-tags quay.io/influxdb/iox
echo "export COMMIT_SHA=${COMMIT_SHA}" >> $BASH_ENV
# linking might take a while and doesn't produce CLI output
no_output_timeout: 20m
- run:
name: Deploy tags
command: |
@ -413,8 +395,12 @@ jobs:
parameters:
# Trigger build of CI image
ci_image:
description: "Trigger build of CI image"
type: boolean
default: false
perf_image:
description: "Trigger build of perf image"
type: boolean
default: false
@ -457,6 +443,12 @@ workflows:
jobs:
- ci_image
# Manual build of perf image
perf_image:
when: << pipeline.parameters.perf_image >>
jobs:
- perf_image
# Nightly rebuild of the build container
ci_image_nightly:
triggers:

View File

@ -23,18 +23,28 @@ platforms = [
# Don't search in these crates for dependencies, and don't have these crates depend on the
# workspace-hack crate.
#
# Includes most bench- or test-only crates except for query_tests, as that crate is built often
# and should share as many dependencies as possible.
# Lists most bench- or test-only crates except for query_tests, as that crate is built often
# and should share as many dependencies as possible. Also lists optional object_store dependencies
# as those are usually off in development.
[traversal-excludes]
workspace-members = [
"grpc-router",
"grpc-router-test-gen",
"influxdb_iox_client",
"influxdb2_client",
"iox_data_generator",
"mutable_batch_tests",
"server_benchmarks",
"trogging",
]
third-party = [
{ name = "azure_core", git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "cb5aba657fe378009c38e979434b2bc93ac0a3d5" },
{ name = "azure_storage", git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "cb5aba657fe378009c38e979434b2bc93ac0a3d5" },
{ name = "cloud-storage" },
{ name = "criterion" },
{ name = "pprof" },
{ name = "rusoto_core" },
{ name = "rusoto_credential" },
{ name = "rusoto_s3" },
{ name = "tikv-jemalloc-sys" },
]

216
Cargo.lock generated
View File

@ -58,15 +58,6 @@ dependencies = [
"alloc-no-stdlib",
]
[[package]]
name = "ansi_term"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b"
dependencies = [
"winapi",
]
[[package]]
name = "ansi_term"
version = "0.12.1"
@ -299,9 +290,9 @@ checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "bindgen"
version = "0.59.1"
version = "0.59.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "453c49e5950bb0eb63bb3df640e31618846c89d5b7faa54040d76e98e0134375"
checksum = "2bd2a9a458e8f4304c52c43ebb0cfbd520289f8379a52e329a38afda99bf8eb8"
dependencies = [
"bitflags",
"cexpr",
@ -317,7 +308,7 @@ dependencies = [
"regex",
"rustc-hash",
"shlex",
"which 3.1.1",
"which",
]
[[package]]
@ -326,18 +317,6 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "bitvec"
version = "0.19.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8942c8d352ae1838c9dda0b0ca2ab657696ef2232a20147cf1b30ae1a9cb4321"
dependencies = [
"funty",
"radium",
"tap",
"wyz",
]
[[package]]
name = "block-buffer"
version = "0.7.3"
@ -462,11 +441,11 @@ dependencies = [
[[package]]
name = "cexpr"
version = "0.5.0"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db507a7679252d2276ed0dd8113c6875ec56d3089f9225b2b42c30cc1f8e5c89"
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
dependencies = [
"nom 6.1.2",
"nom",
]
[[package]]
@ -513,11 +492,11 @@ dependencies = [
[[package]]
name = "clap"
version = "2.33.3"
version = "2.34.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002"
checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c"
dependencies = [
"ansi_term 0.11.0",
"ansi_term",
"atty",
"bitflags",
"strsim",
@ -630,9 +609,9 @@ dependencies = [
[[package]]
name = "crc32fast"
version = "1.2.2"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3825b1e8580894917dc4468cb634a1b4e9745fddc854edad72d9c04644c0319f"
checksum = "738c290dfaea84fc1ca15ad9c168d083b05a714e1efddd8edaab678dc28d2836"
dependencies = [
"cfg-if",
]
@ -964,9 +943,9 @@ checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d"
[[package]]
name = "env_logger"
version = "0.8.4"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3"
checksum = "0b2cf0344971ee6c64c31be0d530793fba457d322dfec2810c453d0ef228f9c3"
dependencies = [
"atty",
"humantime",
@ -1077,17 +1056,11 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394"
[[package]]
name = "funty"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7"
[[package]]
name = "futures"
version = "0.3.17"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a12aa0eb539080d55c3f2d45a67c3b58b6b0773c1a3ca2dfec66d58c97fd66ca"
checksum = "8cd0210d8c325c245ff06fd95a3b13689a1a276ac8cfa8e8720cb840bfb84b9e"
dependencies = [
"futures-channel",
"futures-core",
@ -1116,9 +1089,9 @@ checksum = "629316e42fe7c2a0b9a65b47d159ceaa5453ab14e8f0a3c5eedbb8cd55b4a445"
[[package]]
name = "futures-executor"
version = "0.3.17"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45025be030969d763025784f7f355043dc6bc74093e4ecc5000ca4dc50d8745c"
checksum = "7b808bf53348a36cab739d7e04755909b9fcaaa69b7d7e588b37b6ec62704c97"
dependencies = [
"futures-core",
"futures-task",
@ -1133,12 +1106,10 @@ checksum = "e481354db6b5c353246ccf6a728b0c5511d752c08da7260546fc0933869daa11"
[[package]]
name = "futures-macro"
version = "0.3.17"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18e4a4b95cea4b4ccbcf1c5675ca7c4ee4e9e75eb79944d07defde18068f79bb"
checksum = "a89f17b21645bc4ed773c69af9c9a0effd4a3f1a3876eadd453469f8854e7fdd"
dependencies = [
"autocfg",
"proc-macro-hack",
"proc-macro2",
"quote",
"syn",
@ -1158,9 +1129,9 @@ checksum = "dabf1872aaab32c886832f2276d2f5399887e2bd613698a02359e4ea83f8de12"
[[package]]
name = "futures-test"
version = "0.3.17"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46b9f6d284a9595a613f881a7b08d46abaab1005995e6fe3ccfe2398f6aea492"
checksum = "10cdcfa59cedc30521f70d43bd021845ff2c89457d24486c5c3c8041cef8cccb"
dependencies = [
"futures-core",
"futures-executor",
@ -1175,11 +1146,10 @@ dependencies = [
[[package]]
name = "futures-util"
version = "0.3.17"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36568465210a3a6ee45e1f165136d68671471a501e632e9a98d96872222b5481"
checksum = "41d22213122356472061ac0f1ab2cee28d2bac8491410fd68c2af53d1cedb83e"
dependencies = [
"autocfg",
"futures-channel",
"futures-core",
"futures-io",
@ -1189,8 +1159,6 @@ dependencies = [
"memchr",
"pin-project-lite",
"pin-utils",
"proc-macro-hack",
"proc-macro-nested",
"slab",
]
@ -1319,9 +1287,9 @@ checksum = "ac5956d4e63858efaec57e0d6c1c2f6a41e1487f830314a324ccd7e2223a7ca0"
[[package]]
name = "handlebars"
version = "4.1.5"
version = "4.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ad84da8f63da982543fc85fcabaee2ad1fdd809d99d64a48887e2e942ddfe46"
checksum = "167fa173496c9eadd8749cca6f8339ac88e248f3ad2442791d0b743318a94fc0"
dependencies = [
"log",
"pest",
@ -1456,17 +1424,15 @@ dependencies = [
[[package]]
name = "hyper-rustls"
version = "0.22.1"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f9f7a97316d44c0af9b0301e65010573a853a9fc97046d7331d7f6bc0fd5a64"
checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac"
dependencies = [
"futures-util",
"http",
"hyper",
"log",
"rustls",
"tokio",
"tokio-rustls",
"webpki",
]
[[package]]
@ -1550,7 +1516,6 @@ dependencies = [
"tokio",
"url",
"uuid",
"workspace-hack",
]
[[package]]
@ -1582,7 +1547,6 @@ dependencies = [
"http",
"humantime",
"hyper",
"influxdb2_client",
"influxdb_iox_client",
"influxdb_line_protocol",
"influxdb_storage_client",
@ -1677,7 +1641,7 @@ dependencies = [
name = "influxdb_line_protocol"
version = "0.1.0"
dependencies = [
"nom 7.1.0",
"nom",
"observability_deps",
"smallvec",
"snafu",
@ -1754,7 +1718,6 @@ dependencies = [
"criterion",
"data_types",
"futures",
"generated_types",
"handlebars",
"humantime",
"influxdb2_client",
@ -1920,15 +1883,15 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.108"
version = "0.2.109"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8521a1b57e76b1ec69af7599e75e38e7b7fad6610f037db8c79b127201b5d119"
checksum = "f98a04dce437184842841303488f70d0188c5f51437d2a834dc097eafa909a01"
[[package]]
name = "libloading"
version = "0.7.1"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0cf036d15402bea3c5d4de17b3fce76b3e4a56ebc1f577be0e7a72f7c607cf0"
checksum = "afe203d669ec979b7128619bae5a63b7b42e9203c1b29146079ee05e2f604b52"
dependencies = [
"cfg-if",
"winapi",
@ -2262,7 +2225,6 @@ dependencies = [
"schema",
"snafu",
"test_helpers",
"tokio",
"workspace-hack",
]
@ -2325,18 +2287,6 @@ version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb"
[[package]]
name = "nom"
version = "6.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7413f999671bd4745a7b624bd370a569fb6bc574b23c83a3c5ed2e453f3d5e2"
dependencies = [
"bitvec",
"funty",
"memchr",
"version_check",
]
[[package]]
name = "nom"
version = "7.1.0"
@ -3105,18 +3055,6 @@ dependencies = [
"version_check",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]]
name = "proc-macro-nested"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086"
[[package]]
name = "proc-macro2"
version = "1.0.27"
@ -3165,7 +3103,7 @@ dependencies = [
"prost",
"prost-types",
"tempfile",
"which 4.2.2",
"which",
]
[[package]]
@ -3269,12 +3207,6 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "radium"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8"
[[package]]
name = "radix_trie"
version = "0.2.1"
@ -3471,9 +3403,9 @@ dependencies = [
[[package]]
name = "reqwest"
version = "0.11.6"
version = "0.11.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66d2927ca2f685faf0fc620ac4834690d29e7abb153add10f5812eef20b5e280"
checksum = "07bea77bc708afa10e59905c3d4af7c8fd43c9214251673095ff8b14345fcbc5"
dependencies = [
"base64 0.13.0",
"bytes",
@ -3494,12 +3426,14 @@ dependencies = [
"percent-encoding",
"pin-project-lite",
"rustls",
"rustls-pemfile",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-native-tls",
"tokio-rustls",
"tokio-util",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
@ -3662,15 +3596,23 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.19.1"
version = "0.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7"
checksum = "d37e5e2290f3e040b594b1a9e04377c2c671f1a1cfd9bfdef82106ac1c113f84"
dependencies = [
"base64 0.13.0",
"log",
"ring",
"sct",
"webpki",
"webpki 0.22.0",
]
[[package]]
name = "rustls-pemfile"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9"
dependencies = [
"base64 0.13.0",
]
[[package]]
@ -3747,9 +3689,9 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "sct"
version = "0.6.1"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce"
checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
dependencies = [
"ring",
"untrusted",
@ -4200,12 +4142,6 @@ version = "1.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f886607578031fffb0996a24a5e5b73313f36dca63416b9d1c1004f7cb6084d"
[[package]]
name = "tap"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]]
name = "tempfile"
version = "3.2.0"
@ -4426,13 +4362,13 @@ dependencies = [
[[package]]
name = "tokio-rustls"
version = "0.22.0"
version = "0.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6"
checksum = "4baa378e417d780beff82bf54ceb0d195193ea6a00c14e22359e7f39456b5689"
dependencies = [
"rustls",
"tokio",
"webpki",
"webpki 0.22.0",
]
[[package]]
@ -4687,11 +4623,11 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.3.2"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7507ec620f809cdf07cccb5bc57b13069a88031b795efd4079b1c71b66c1613d"
checksum = "245da694cc7fc4729f3f418b304cb57789f1bed2a78c575407ab8a23f53cb4d3"
dependencies = [
"ansi_term 0.12.1",
"ansi_term",
"lazy_static",
"matchers",
"regex",
@ -4820,7 +4756,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
dependencies = [
"getrandom",
"serde",
]
[[package]]
@ -4963,22 +4898,23 @@ dependencies = [
"untrusted",
]
[[package]]
name = "webpki"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "webpki-roots"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940"
dependencies = [
"webpki",
]
[[package]]
name = "which"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d011071ae14a2f6671d0b74080ae0cd8ebf3a6f8c9589a2cd45f23126fe29724"
dependencies = [
"libc",
"webpki 0.21.4",
]
[[package]]
@ -5040,21 +4976,15 @@ dependencies = [
"bytes",
"cc",
"chrono",
"clap",
"either",
"futures",
"futures-channel",
"futures-core",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
"getrandom",
"hashbrown",
"hyper",
"indexmap",
"itoa",
"libc",
"log",
"memchr",
"num-bigint 0.4.3",
@ -5077,8 +5007,6 @@ dependencies = [
"tracing",
"tracing-core",
"tracing-subscriber",
"url",
"uuid",
]
[[package]]
@ -5111,12 +5039,6 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "wyz"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214"
[[package]]
name = "xml-rs"
version = "0.8.4"

View File

@ -10,12 +10,12 @@ members = [
"generated_types",
"grpc-router",
"grpc-router-test-gen",
"influxdb2_client",
"influxdb_iox",
"influxdb_iox_client",
"influxdb_line_protocol",
"influxdb_storage_client",
"influxdb_tsm",
"influxdb2_client",
"internal_types",
"iox_data_generator",
"iox_object_store",

View File

@ -1,34 +1,51 @@
#syntax=docker/dockerfile:1.2
FROM rust:1.56-slim-bullseye as build
ARG RUST_VERSION=1.57
FROM rust:${RUST_VERSION}-slim-bullseye as build
# cache mounts below may already exist and owned by root
USER root
RUN apt update \
&& apt install --yes build-essential pkg-config libssl-dev clang \
&& apt install --yes binutils build-essential pkg-config libssl-dev clang \
&& rm -rf /var/lib/{apt,dpkg,cache,log}
# Build influxdb_iox
COPY . /influxdb_iox
WORKDIR /influxdb_iox
ARG CARGO_INCREMENTAL=yes
ARG CARGO_PROFILE_RELEASE_CODEGEN_UNITS=1
ARG CARGO_PROFILE_RELEASE_LTO=thin
ARG FEATURES=aws,gcp,azure,jemalloc_replacing_malloc
ARG ROARING_ARCH="haswell"
ARG RUSTFLAGS=""
ENV CARGO_INCREMENTAL=$CARGO_INCREMENTAL \
CARGO_PROFILE_RELEASE_CODEGEN_UNITS=$CARGO_PROFILE_RELEASE_CODEGEN_UNITS \
CARGO_PROFILE_RELEASE_LTO=$CARGO_PROFILE_RELEASE_LTO \
FEATURES=$FEATURES \
ROARING_ARCH=$ROARING_ARCH \
RUSTFLAGS=$RUSTFLAGS
RUN \
--mount=type=cache,id=influxdb_iox_registry,sharing=locked,target=/usr/local/cargo/registry \
--mount=type=cache,id=influxdb_iox_git,sharing=locked,target=/usr/local/cargo/git \
--mount=type=cache,id=influxdb_iox_target,sharing=locked,target=/influxdb_iox/target \
du -cshx /usr/local/cargo/registry /usr/local/cargo/git /influxdb_iox/target && \
cargo build --target-dir /influxdb_iox/target --release --no-default-features --features=aws,gcp,azure && \
cp /influxdb_iox/target/release/influxdb_iox /root/influxdb_iox && \
du -cshx /usr/local/cargo/registry /usr/local/cargo/git /influxdb_iox/target
du -cshx /usr/local/cargo/registry /usr/local/cargo/git /influxdb_iox/target && \
cargo build --target-dir /influxdb_iox/target --release --no-default-features --features="$FEATURES" && \
objcopy --compress-debug-sections target/release/influxdb_iox && \
cp /influxdb_iox/target/release/influxdb_iox /root/influxdb_iox && \
du -cshx /usr/local/cargo/registry /usr/local/cargo/git /influxdb_iox/target
FROM debian:bullseye-slim
RUN apt update \
&& apt install --yes libssl1.1 ca-certificates --no-install-recommends \
&& rm -rf /var/lib/{apt,dpkg,cache,log}
&& apt install --yes ca-certificates gettext-base libssl1.1 --no-install-recommends \
&& rm -rf /var/lib/{apt,dpkg,cache,log}
RUN groupadd --gid 1500 iox \
&& useradd --uid 1500 --gid iox --shell /bin/bash --create-home iox
&& useradd --uid 1500 --gid iox --shell /bin/bash --create-home iox
USER iox
@ -36,9 +53,12 @@ RUN mkdir ~/.influxdb_iox
RUN ls -la ~/.influxdb_iox
COPY --from=build /root/influxdb_iox /usr/bin/influxdb_iox
COPY docker/entrypoint.sh /usr/bin/entrypoint.sh
ENV INFLUXDB_IOX_SERVER_MODE=database
EXPOSE 8080 8082
ENTRYPOINT ["/usr/bin/influxdb_iox"]
ENTRYPOINT ["/usr/bin/entrypoint.sh"]
CMD ["run", "database"]
CMD ["run", "$INFLUXDB_IOX_SERVER_MODE"]

View File

@ -2,3 +2,4 @@
target/
tests/
docker/
!docker/entrypoint.sh

View File

@ -6,10 +6,10 @@ edition = "2021"
description = "Apache Arrow utilities"
[dependencies]
ahash = "0.7.5"
ahash = { version = "0.7.5", default-features = false }
arrow = { version = "6.0", features = ["prettyprint"] }
# used by arrow anyway (needed for printing workaround)
chrono = "0.4"
chrono = { version = "0.4", default-features = false }
comfy-table = { version = "5.0", default-features = false }
hashbrown = "0.11"
num-traits = "0.2"

View File

@ -14,4 +14,4 @@ tower = "0.4"
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
tokio = { version = "1.13", features = ["macros", "rt-multi-thread"] }
tokio = { version = "1.13", features = ["macros", "parking_lot", "rt-multi-thread"] }

View File

@ -12,7 +12,7 @@ num_cpus = "1.13.0"
observability_deps = { path = "../observability_deps" }
ordered-float = "2"
percent-encoding = "2.1.0"
regex = "1.4"
regex = "1"
siphasher = "0.3"
snafu = "0.6"
time = { path = "../time" }

View File

@ -8,6 +8,6 @@ description = "Datafusion utilities"
[dependencies]
datafusion = { path = "../datafusion" }
futures = "0.3"
tokio = { version = "1.13", features = ["macros"] }
tokio-stream = "0.1.8"
tokio = { version = "1.13", features = ["parking_lot", "sync"] }
tokio-stream = "0.1"
workspace-hack = { path = "../workspace-hack"}

View File

@ -17,4 +17,4 @@ workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
mutable_batch_lp = { path = "../mutable_batch_lp" }
regex = "1.4"
regex = "1"

View File

@ -1,27 +0,0 @@
###
# Dockerfile used for deploying IOx
##
FROM debian:bullseye-slim
RUN apt-get update \
&& apt-get install -y libssl1.1 libgcc1 libc6 ca-certificates gettext-base --no-install-recommends \
&& rm -rf /var/lib/{apt,dpkg,cache,log}
RUN groupadd -g 1500 rust \
&& useradd -u 1500 -g rust -s /bin/bash -m rust
USER rust
RUN mkdir ~/.influxdb_iox
RUN ls -la ~/.influxdb_iox
ENV INFLUXDB_IOX_SERVER_MODE=database
COPY docker/entrypoint.sh /usr/bin/entrypoint.sh
COPY target/release/influxdb_iox /usr/bin/influxdb_iox
EXPOSE 8080 8082
ENTRYPOINT ["/usr/bin/entrypoint.sh"]
CMD ["run", "$INFLUXDB_IOX_SERVER_MODE"]

View File

@ -1,5 +0,0 @@
# Ignore everything
**
# Except
!docker/entrypoint.sh
!target/release/influxdb_iox

View File

@ -11,7 +11,7 @@ observability_deps = { path = "../observability_deps" }
pbjson = "0.1"
pbjson-types = "0.1"
prost = "0.8"
regex = "1.4"
regex = "1"
serde = { version = "1.0", features = ["derive"] }
tonic = "0.5"
time = { path = "../time" }

View File

@ -13,8 +13,8 @@ paste = "1.0.6"
prost = "0.8"
prost-types = "0.8"
thiserror = "1.0.30"
tokio = { version = "1.13", features = ["macros", "rt-multi-thread", "parking_lot", "signal"] }
tokio-stream = { version = "0.1.8", features = ["net"] }
tokio = { version = "1.13", features = ["macros", "parking_lot", "rt-multi-thread"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.6.9" }
tonic = "0.5"
tonic-reflection = "0.2.0"

View File

@ -13,11 +13,10 @@ serde_json = "1.0.72"
snafu = "0.6.6"
url = "2.1.1"
uuid = { version = "0.8", features = ["v4"] }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies] # In alphabetical order
mockito = "0.30"
once_cell = { version = "1.4.0", features = ["parking_lot"] }
parking_lot = "0.11.2"
tokio = { version = "1.13", features = ["macros", "rt-multi-thread"] }
tokio = { version = "1.13", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
test_helpers = { path = "../test_helpers" }

View File

@ -7,11 +7,10 @@ default-run = "influxdb_iox"
[dependencies]
# Workspace dependencies, in alphabetical order
datafusion = { path = "../datafusion" }
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
dml = { path = "../dml" }
generated_types = { path = "../generated_types" }
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format", "write_lp"] }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
internal_types = { path = "../internal_types" }
@ -22,7 +21,6 @@ metric_exporters = { path = "../metric_exporters" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
mutable_batch_pb = { path = "../mutable_batch_pb" }
mutable_buffer = { path = "../mutable_buffer" }
num_cpus = "1.13.0"
object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" }
panic_logging = { path = "../panic_logging" }
@ -33,12 +31,12 @@ query = { path = "../query" }
read_buffer = { path = "../read_buffer" }
router = { path = "../router" }
server = { path = "../server" }
time = { path = "../time" }
trace = { path = "../trace" }
trace_exporters = { path = "../trace_exporters" }
trace_http = { path = "../trace_http" }
tracker = { path = "../tracker" }
trogging = { path = "../trogging", default-features = false, features = ["structopt"] }
time = { path = "../time" }
# Crates.io dependencies, in alphabetical order
arrow = { version = "6.0", features = ["prettyprint"] }
@ -47,8 +45,10 @@ async-trait = "0.1"
backtrace = "0.3"
byteorder = "1.3.4"
bytes = "1.0"
chrono = "0.4"
clap = "2.33.1"
chrono = { version = "0.4", default-features = false }
clap = "2.34.0"
# used by arrow/datafusion anyway
comfy-table = { version = "5.0", default-features = false }
csv = "1.1"
dotenv = "0.15.0"
flate2 = "1.0"
@ -57,15 +57,14 @@ hashbrown = "0.11"
http = "0.2.0"
humantime = "2.1.0"
hyper = "0.14"
itertools = "0.10.1"
libc = { version = "0.2" }
log = "0.4"
num_cpus = "1.13.0"
once_cell = { version = "1.4.0", features = ["parking_lot"] }
parking_lot = "0.11.2"
itertools = "0.10.1"
parquet = "6.0"
pin-project = "1.0"
# used by arrow/datafusion anyway
comfy-table = { version = "5.0", default-features = false }
pprof = { version = "^0.5", default-features = false, features = ["flamegraph", "protobuf"], optional = true }
prost = "0.8"
rustyline = { version = "9.0", default-features = false }
@ -76,8 +75,8 @@ snafu = "0.6.9"
structopt = "0.3.25"
thiserror = "1.0.30"
tikv-jemalloc-ctl = { version = "0.4.0" }
tokio = { version = "1.13", features = ["macros", "rt-multi-thread", "parking_lot", "signal"] }
tokio-stream = { version = "0.1.8", features = ["net"] }
tokio = { version = "1.13", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.6.9" }
tonic = "0.5.0"
tonic-health = "0.4.0"
@ -90,15 +89,11 @@ tikv-jemalloc-sys = { version = "0.4.0", optional = true, features = ["unprefixe
heappy = { git = "https://github.com/mkmik/heappy", rev = "20aa466524ac9ce34a4bae29f27ec11869b50e21", features = ["enable_heap_profiler", "jemalloc_shim", "measure_free"], optional = true }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
# Workspace dependencies, in alphabetical order
arrow_util = { path = "../arrow_util" }
influxdb2_client = { path = "../influxdb2_client" }
influxdb_storage_client = { path = "../influxdb_storage_client" }
test_helpers = { path = "../test_helpers" }
parking_lot = "0.11.2"
regex = "1.4"
schema = { path = "../schema" }
write_buffer = { path = "../write_buffer" }
@ -107,7 +102,8 @@ assert_cmd = "2.0.2"
hex = "0.4.2"
predicates = "2.1.0"
rand = "0.8.3"
reqwest = "0.11"
regex = "1"
reqwest = { version = "0.11", features = ["json"] }
tempfile = "3.1.0"
[features]

View File

@ -1,26 +1,24 @@
use std::collections::HashMap;
use std::net::SocketAddrV4;
use std::num::NonZeroU32;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::{
path::Path,
process::{Child, Command},
str,
sync::Weak,
time::Instant,
};
use assert_cmd::prelude::*;
use futures::prelude::*;
use generated_types::influxdata::iox::management::v1::{
database_status::DatabaseState, ServerStatus,
};
use http::header::HeaderName;
use http::HeaderValue;
use http::{header::HeaderName, HeaderValue};
use influxdb_iox_client::connection::Connection;
use once_cell::sync::OnceCell;
use std::{
collections::HashMap,
net::SocketAddrV4,
num::NonZeroU32,
path::Path,
process::{Child, Command},
str,
sync::{
atomic::{AtomicU16, Ordering},
Arc, Weak,
},
time::{Duration, Instant},
};
use tempfile::{NamedTempFile, TempDir};
use tokio::sync::Mutex;
use uuid::Uuid;
@ -81,8 +79,6 @@ impl Default for BindAddresses {
}
}
const TOKEN: &str = "InfluxDB IOx doesn't have authentication yet";
/// Represents a server that has been started and is available for
/// testing.
pub struct ServerFixture {
@ -213,12 +209,6 @@ impl ServerFixture {
influxdb_iox_client::deployment::Client::new(self.grpc_channel())
}
/// Return an a http client suitable suitable for communicating with this
/// server
pub fn influxdb2_client(&self) -> influxdb2_client::Client {
influxdb2_client::Client::new(self.http_base(), TOKEN)
}
/// Return a management client suitable for communicating with this
/// server
pub fn management_client(&self) -> influxdb_iox_client::management::Client {

View File

@ -0,0 +1,131 @@
//! Contains tests using the CLI and other tools to test scenarios for
//! moving data from one server/database to another
use std::path::{Path, PathBuf};
use assert_cmd::Command;
use predicates::prelude::*;
use tempfile::TempDir;
use uuid::Uuid;
use crate::{
common::server_fixture::{ServerFixture, ServerType},
end_to_end_cases::scenario::{data_dir, db_data_dir, Scenario},
};
/// Copy the `source_dir` directory into the `target_dir` directory using the `cp` command
fn cp_dir(source_dir: impl AsRef<Path>, target_dir: impl AsRef<Path>) {
let source_dir = source_dir.as_ref();
let target_dir = target_dir.as_ref();
// needed so that if the target server has had no databases
// created yet, it will have no `data` directory. See #3292
println!("Ensuring {:?} directory exists", target_dir);
Command::new("mkdir")
.arg("-p")
.arg(target_dir.to_string_lossy().to_string())
.assert()
.success();
println!("Copying data from {:?} to {:?}", source_dir, target_dir);
Command::new("cp")
.arg("-R")
.arg(source_dir.to_string_lossy().to_string())
.arg(target_dir.to_string_lossy().to_string())
.assert()
.success();
}
/// Creates a new database on a shared server, writes data to it,
/// shuts it down cleanly, and copies the files to Tempdir/uuid
///
/// Returns (db_name, uuid, tmp_dir)
async fn create_copied_database() -> (String, Uuid, TempDir) {
let server_fixture = ServerFixture::create_single_use(ServerType::Database).await;
let addr = server_fixture.grpc_base();
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("server")
.arg("set")
.arg("3113")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains("Ok"));
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("server")
.arg("wait-server-initialized")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains("Server initialized."));
let mut management_client = server_fixture.management_client();
let scenario = Scenario::new();
let (db_name, db_uuid) = scenario.create_database(&mut management_client).await;
// todo write data and force it to be written to disk
// figure out where the database lives and copy its data to a temporary directory,
// as you might copy data from remote object storage to local disk for debugging.
let source_dir = db_data_dir(server_fixture.dir(), db_uuid);
let tmp_dir = TempDir::new().expect("making tmp dir");
cp_dir(source_dir, tmp_dir.path());
// stop the first server (note this call blocks until the process stops)
std::mem::drop(server_fixture);
(db_name.to_string(), db_uuid, tmp_dir)
}
#[tokio::test]
async fn migrate_database_files_from_one_server_to_another() {
let (db_name, db_uuid, tmp_dir) = create_copied_database().await;
// Now start another server that can claim the database
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
let addr = server_fixture.grpc_base();
// copy the data from tmp_dir/<uuid> to the new server's location
let mut source_dir: PathBuf = tmp_dir.path().into();
source_dir.push(db_uuid.to_string());
let target_dir = data_dir(server_fixture.dir());
cp_dir(source_dir, &target_dir);
// Claiming without --force doesn't work as owner.pb still record the other server owning it
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("claim")
.arg(db_uuid.to_string())
.arg("--host")
.arg(addr)
.assert()
.failure()
.stderr(predicate::str::contains(
"is already owned by the server with ID 3113",
));
// however with --force the owner.pb file is updated forcibly
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("claim")
.arg(db_uuid.to_string())
.arg("--host")
.arg(addr)
.arg("--force") // sudo make me a sandwich
.assert()
.success()
.stdout(predicate::str::contains(format!(
"Claimed database {}",
db_name
)));
}

View File

@ -6,13 +6,13 @@ use arrow_util::assert_batches_sorted_eq;
pub async fn test() {
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
let influxdb2 = server_fixture.influxdb2_client();
let mut write_client = server_fixture.write_client();
let mut management_client = server_fixture.management_client();
let scenario = Scenario::new();
scenario.create_database(&mut management_client).await;
let expected_read_data = scenario.load_data(&influxdb2).await;
let expected_read_data = scenario.load_data(&mut write_client).await;
let sql_query = "select * from cpu_load_short";
let mut client = server_fixture.flight_client();

View File

@ -3,14 +3,14 @@ use crate::common::server_fixture::{ServerFixture, ServerType};
#[tokio::test]
async fn test_http_error_messages() {
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
let client = server_fixture.influxdb2_client();
let mut client = server_fixture.write_client();
// send malformed request (bucket id is invalid)
let result = client
.write_line_protocol("Bar", "Foo", "arbitrary")
.write_lp("Bar", "Foo", 0)
.await
.expect_err("Should have errored");
let expected_error = r#"HTTP request returned an error: 400 Bad Request, `{"code":"invalid","message":"Error parsing line protocol: error parsing line 1: A generic parsing error occurred: TakeWhile1"}`"#;
let expected_error = r#"An unexpected error occurred in the client library: error parsing line 1: A generic parsing error occurred: TakeWhile1"#;
assert_eq!(result.to_string(), expected_error);
}

View File

@ -751,127 +751,6 @@ async fn force_claim_database() {
)));
}
#[tokio::test]
async fn migrate_database_files_from_one_server_to_another() {
let server_fixture = ServerFixture::create_single_use(ServerType::Database).await;
let addr = server_fixture.grpc_base();
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("server")
.arg("set")
.arg("3113")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains("Ok"));
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("server")
.arg("wait-server-initialized")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains("Server initialized."));
let db_name = rand_name();
let db = &db_name;
// Create a database on one server
let stdout = String::from_utf8(
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("create")
.arg(db)
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains("Created"))
.get_output()
.stdout
.clone(),
)
.unwrap();
let db_uuid = stdout.lines().last().unwrap().trim();
// figure out where the database lives and copy its data to a temporary directory,
// as you might copy data from remote object storage to local disk for debugging.
// Assume data layout is <dir>/dbs/<uuid>
let mut source_dir: PathBuf = server_fixture.dir().into();
source_dir.push("dbs");
source_dir.push(db_uuid);
let tmp_dir = TempDir::new().expect("making tmp dir");
let target_dir = tmp_dir.path();
println!("Copying data from {:?} to {:?}", source_dir, target_dir);
Command::new("cp")
.arg("-R")
.arg(source_dir.to_string_lossy().to_string())
.arg(target_dir.to_string_lossy().to_string())
.assert()
.success();
// stop the first server (note this call blocks until the process stops)
std::mem::drop(server_fixture);
// Now start another server that can claim the database
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
let addr = server_fixture.grpc_base();
// copy the data from tmp_dir/<uuid> to the new server's location
let mut source_dir: PathBuf = tmp_dir.path().into();
source_dir.push(db_uuid);
let mut target_dir: PathBuf = server_fixture.dir().into();
target_dir.push("dbs");
println!("Copying data from {:?} to {:?}", source_dir, target_dir);
Command::new("cp")
.arg("-R")
.arg(source_dir.to_string_lossy().to_string())
.arg(target_dir.to_string_lossy().to_string())
.assert()
.success();
// Claiming without --force doesn't work as owner.pb still record the other server owning it
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("claim")
.arg(db_uuid)
.arg("--host")
.arg(addr)
.assert()
.failure()
.stderr(predicate::str::contains(
"is already owned by the server with ID 3113",
));
// however with --force the owner.pb file is updated forcibly
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("claim")
.arg(db_uuid)
.arg("--host")
.arg(addr)
.arg("--force") // sudo make me a sandwich
.assert()
.success()
.stdout(predicate::str::contains(format!(
"Claimed database {}",
db_name
)));
}
#[tokio::test]
async fn test_get_chunks() {
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;

View File

@ -11,6 +11,7 @@ pub async fn test_row_timestamp() {
let server_fixture = ServerFixture::create_single_use_with_config(test_config).await;
let mut deployment_client = server_fixture.deployment_client();
let mut management_client = server_fixture.management_client();
let mut write_client = server_fixture.write_client();
deployment_client
.update_server_id(NonZeroU32::new(1).unwrap())
@ -20,7 +21,7 @@ pub async fn test_row_timestamp() {
let scenario = Scenario::new();
scenario.create_database(&mut management_client).await;
scenario.load_data(&server_fixture.influxdb2_client()).await;
scenario.load_data(&mut write_client).await;
let client = reqwest::Client::new();
let url = format!("{}/metrics", server_fixture.http_base());

View File

@ -1,3 +1,4 @@
mod database_migration;
mod debug_cli;
mod delete_api;
mod deletes;

View File

@ -5,12 +5,12 @@ use crate::common::server_fixture::{ServerFixture, ServerType};
pub async fn test() {
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
let mut management_client = server_fixture.management_client();
let influxdb2 = server_fixture.influxdb2_client();
let mut write_client = server_fixture.write_client();
let scenario = Scenario::new();
scenario.create_database(&mut management_client).await;
let mut expected_read_data = scenario.load_data(&influxdb2).await;
let mut expected_read_data = scenario.load_data(&mut write_client).await;
let sql_query = "select * from cpu_load_short";
let client = reqwest::Client::new();

View File

@ -1,38 +1,50 @@
use std::collections::HashMap;
use std::num::NonZeroU32;
use std::path::Path;
use std::time::Duration;
use std::{convert::TryInto, str, u32};
use std::{sync::Arc, time::SystemTime};
use crate::common::server_fixture::{ServerFixture, ServerType, TestConfig, DEFAULT_SERVER_ID};
use arrow::{
array::{ArrayRef, Float64Array, StringArray, TimestampNanosecondArray},
record_batch::RecordBatch,
};
use data_types::chunk_metadata::{ChunkStorage, ChunkSummary};
use futures::prelude::*;
use influxdb_iox_client::management::generated_types::partition_template;
use influxdb_iox_client::management::generated_types::WriteBufferConnection;
use data_types::{
chunk_metadata::{ChunkStorage, ChunkSummary},
names::org_and_bucket_to_database,
DatabaseName,
};
use generated_types::{
google::protobuf::Empty,
influxdata::iox::{management::v1::*, write_buffer::v1::WriteBufferCreationConfig},
ReadSource, TimestampRange,
};
use influxdb_iox_client::{
connection::Connection,
flight::PerformQuery,
management::{
self,
generated_types::{partition_template, WriteBufferConnection},
},
};
use prost::Message;
use rand::{
distributions::{Alphanumeric, Standard},
thread_rng, Rng,
};
use std::{
collections::HashMap,
convert::TryInto,
num::NonZeroU32,
path::{Path, PathBuf},
str,
sync::Arc,
time::Duration,
time::SystemTime,
u32,
};
use tempfile::TempDir;
use test_helpers::assert_contains;
use data_types::{names::org_and_bucket_to_database, DatabaseName};
use generated_types::google::protobuf::Empty;
use generated_types::{
influxdata::iox::{management::v1::*, write_buffer::v1::WriteBufferCreationConfig},
ReadSource, TimestampRange,
};
use influxdb_iox_client::{connection::Connection, flight::PerformQuery};
use time::SystemProvider;
use write_buffer::core::{WriteBufferReading, WriteBufferWriting};
use write_buffer::file::{FileBufferConsumer, FileBufferProducer};
use crate::common::server_fixture::{ServerFixture, ServerType, TestConfig, DEFAULT_SERVER_ID};
use uuid::Uuid;
use write_buffer::{
core::{WriteBufferReading, WriteBufferWriting},
file::{FileBufferConsumer, FileBufferProducer},
};
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;
@ -115,82 +127,62 @@ impl Scenario {
})
}
/// Creates the database on the server for this scenario
pub async fn create_database(&self, client: &mut influxdb_iox_client::management::Client) {
client
/// Creates the database on the server for this scenario,
/// returning (name, uuid)
pub async fn create_database(
&self,
client: &mut management::Client,
) -> (DatabaseName<'_>, Uuid) {
let db_name = self.database_name();
let db_uuid = client
.create_database(DatabaseRules {
name: self.database_name().to_string(),
name: db_name.to_string(),
lifecycle_rules: Some(Default::default()),
..Default::default()
})
.await
.unwrap();
(db_name, db_uuid)
}
pub async fn load_data(&self, influxdb2: &influxdb2_client::Client) -> Vec<String> {
pub async fn load_data(&self, client: &mut influxdb_iox_client::write::Client) -> Vec<String> {
// TODO: make a more extensible way to manage data for tests, such as in
// external fixture files or with factories.
let points = vec![
influxdb2_client::models::DataPoint::builder("cpu_load_short")
.tag("host", "server01")
.tag("region", "us-west")
.field("value", 0.64)
.timestamp(self.ns_since_epoch())
.build()
.unwrap(),
influxdb2_client::models::DataPoint::builder("cpu_load_short")
.tag("host", "server01")
.field("value", 27.99)
.timestamp(self.ns_since_epoch() + 1)
.build()
.unwrap(),
influxdb2_client::models::DataPoint::builder("cpu_load_short")
.tag("host", "server02")
.tag("region", "us-west")
.field("value", 3.89)
.timestamp(self.ns_since_epoch() + 2)
.build()
.unwrap(),
influxdb2_client::models::DataPoint::builder("cpu_load_short")
.tag("host", "server01")
.tag("region", "us-east")
.field("value", 1234567.891011)
.timestamp(self.ns_since_epoch() + 3)
.build()
.unwrap(),
influxdb2_client::models::DataPoint::builder("cpu_load_short")
.tag("host", "server01")
.tag("region", "us-west")
.field("value", 0.000003)
.timestamp(self.ns_since_epoch() + 4)
.build()
.unwrap(),
influxdb2_client::models::DataPoint::builder("system")
.tag("host", "server03")
.field("uptime", 1303385)
.timestamp(self.ns_since_epoch() + 5)
.build()
.unwrap(),
influxdb2_client::models::DataPoint::builder("swap")
.tag("host", "server01")
.tag("name", "disk0")
.field("in", 3)
.field("out", 4)
.timestamp(self.ns_since_epoch() + 6)
.build()
.unwrap(),
influxdb2_client::models::DataPoint::builder("status")
.field("active", true)
.timestamp(self.ns_since_epoch() + 7)
.build()
.unwrap(),
influxdb2_client::models::DataPoint::builder("attributes")
.field("color", "blue")
.timestamp(self.ns_since_epoch() + 8)
.build()
.unwrap(),
format!(
"cpu_load_short,host=server01,region=us-west value=0.64 {}",
self.ns_since_epoch()
),
format!(
"cpu_load_short,host=server01 value=27.99 {}",
self.ns_since_epoch() + 1
),
format!(
"cpu_load_short,host=server02,region=us-west value=3.89 {}",
self.ns_since_epoch() + 2
),
format!(
"cpu_load_short,host=server01,region=us-east value=1234567.891011 {}",
self.ns_since_epoch() + 3
),
format!(
"cpu_load_short,host=server01,region=us-west value=0.000003 {}",
self.ns_since_epoch() + 4
),
format!(
"system,host=server03 uptime=1303385i {}",
self.ns_since_epoch() + 5
),
format!(
"swap,host=server01,name=disk0 in=3i,out=4i {}",
self.ns_since_epoch() + 6
),
format!("status active=true {}", self.ns_since_epoch() + 7),
format!("attributes color=\"blue\" {}", self.ns_since_epoch() + 8),
];
self.write_data(influxdb2, points).await.unwrap();
self.write_data(client, points.join("\n")).await.unwrap();
let host_array = StringArray::from(vec![
Some("server01"),
@ -234,17 +226,13 @@ impl Scenario {
.collect()
}
async fn write_data(
pub async fn write_data(
&self,
client: &influxdb2_client::Client,
points: Vec<influxdb2_client::models::DataPoint>,
client: &mut influxdb_iox_client::write::Client,
lp_data: impl AsRef<str> + Send,
) -> Result<()> {
client
.write(
self.org_id_str(),
self.bucket_id_str(),
stream::iter(points),
)
.write_lp(&*self.database_name(), lp_data, self.ns_since_epoch())
.await?;
Ok(())
}
@ -300,6 +288,24 @@ pub fn rand_id() -> String {
.collect()
}
/// Return the path that the database stores data for all databases:
/// `<server_path>/dbs`
pub fn data_dir(server_path: impl AsRef<Path>) -> PathBuf {
// Assume data layout is <dir>/dbs/<uuid>
let mut data_dir: PathBuf = server_path.as_ref().into();
data_dir.push("dbs");
data_dir
}
/// Return the path that the database with <uuid> stores its data:
/// `<server_path>/dbs/<uuid>`
pub fn db_data_dir(server_path: impl AsRef<Path>, db_uuid: Uuid) -> PathBuf {
// Assume data layout is <dir>/dbs/<uuid>
let mut data_dir = data_dir(server_path);
data_dir.push(db_uuid.to_string());
data_dir
}
pub struct DatabaseBuilder {
name: String,
partition_template: PartitionTemplate,
@ -376,7 +382,7 @@ impl DatabaseBuilder {
self,
channel: Connection,
) -> Result<(), influxdb_iox_client::error::Error> {
let mut management_client = influxdb_iox_client::management::Client::new(channel);
let mut management_client = management::Client::new(channel);
management_client
.create_database(DatabaseRules {
@ -408,7 +414,7 @@ pub async fn create_readable_database(db_name: impl Into<String>, channel: Conne
/// given a channel to talk with the management api, create a new
/// database with no mutable buffer configured, no partitioning rules
pub async fn create_unreadable_database(db_name: impl Into<String>, channel: Connection) {
let mut management_client = influxdb_iox_client::management::Client::new(channel);
let mut management_client = management::Client::new(channel);
let rules = DatabaseRules {
name: db_name.into(),

View File

@ -24,13 +24,13 @@ use std::str;
pub async fn test() {
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
let influxdb2 = server_fixture.influxdb2_client();
let mut write_client = server_fixture.write_client();
let mut storage_client = StorageClient::new(server_fixture.grpc_channel());
let mut management_client = server_fixture.management_client();
let scenario = Scenario::new();
scenario.create_database(&mut management_client).await;
scenario.load_data(&influxdb2).await;
scenario.load_data(&mut write_client).await;
read_filter_endpoint(&mut storage_client, &scenario).await;
tag_keys_endpoint(&mut storage_client, &scenario).await;
@ -321,12 +321,12 @@ pub async fn regex_operator_test() {
let fixture = ServerFixture::create_shared(ServerType::Database).await;
let mut management = fixture.management_client();
let mut storage_client = StorageClient::new(fixture.grpc_channel());
let influxdb2 = fixture.influxdb2_client();
let mut write = fixture.write_client();
let scenario = Scenario::new();
scenario.create_database(&mut management).await;
load_read_group_data(&influxdb2, &scenario).await;
load_read_group_data(&mut write, &scenario).await;
let read_source = scenario.read_source();
@ -362,16 +362,19 @@ pub async fn regex_operator_test() {
async fn read_group_setup() -> (ServerFixture, Scenario) {
let fixture = ServerFixture::create_shared(ServerType::Database).await;
let mut management = fixture.management_client();
let influxdb2 = fixture.influxdb2_client();
let mut write = fixture.write_client();
let scenario = Scenario::new();
scenario.create_database(&mut management).await;
load_read_group_data(&influxdb2, &scenario).await;
load_read_group_data(&mut write, &scenario).await;
(fixture, scenario)
}
async fn load_read_group_data(client: &influxdb2_client::Client, scenario: &Scenario) {
async fn load_read_group_data(
client: &mut influxdb_iox_client::write::Client,
scenario: &Scenario,
) {
let line_protocol = vec![
"cpu,cpu=cpu1,host=foo usage_user=71.0,usage_system=10.0 1000",
"cpu,cpu=cpu1,host=foo usage_user=72.0,usage_system=11.0 2000",
@ -384,12 +387,8 @@ async fn load_read_group_data(client: &influxdb2_client::Client, scenario: &Scen
]
.join("\n");
client
.write_line_protocol(
scenario.org_id_str(),
scenario.bucket_id_str(),
line_protocol,
)
scenario
.write_data(client, line_protocol)
.await
.expect("Wrote cpu line protocol data");
}
@ -652,7 +651,7 @@ pub async fn read_window_aggregate_test() {
let fixture = ServerFixture::create_shared(ServerType::Database).await;
let mut management = fixture.management_client();
let mut storage_client = StorageClient::new(fixture.grpc_channel());
let influxdb2 = fixture.influxdb2_client();
let mut write = fixture.write_client();
let scenario = Scenario::new();
let read_source = scenario.read_source();
@ -678,12 +677,8 @@ pub async fn read_window_aggregate_test() {
]
.join("\n");
influxdb2
.write_line_protocol(
scenario.org_id_str(),
scenario.bucket_id_str(),
line_protocol,
)
write
.write_lp(&*scenario.database_name(), line_protocol, 0)
.await
.expect("Wrote h20 line protocol");

View File

@ -1,5 +1,3 @@
use std::num::NonZeroU32;
use super::scenario::{collect_query, Scenario};
use crate::common::{
server_fixture::{ServerFixture, ServerType, TestConfig},
@ -7,6 +5,7 @@ use crate::common::{
};
use futures::TryStreamExt;
use generated_types::{storage_client::StorageClient, ReadFilterRequest};
use std::num::NonZeroU32;
async fn setup() -> (UdpCapture, ServerFixture) {
let udp_capture = UdpCapture::new().await;
@ -39,7 +38,7 @@ async fn run_sql_query(server_fixture: &ServerFixture) {
scenario
.create_database(&mut server_fixture.management_client())
.await;
scenario.load_data(&server_fixture.influxdb2_client()).await;
scenario.load_data(&mut server_fixture.write_client()).await;
// run a query, ensure we get traces
let sql_query = "select * from cpu_load_short";
@ -80,7 +79,7 @@ pub async fn test_tracing_storage_api() {
scenario
.create_database(&mut server_fixture.management_client())
.await;
scenario.load_data(&server_fixture.influxdb2_client()).await;
scenario.load_data(&mut server_fixture.write_client()).await;
// run a query via gRPC, ensure we get traces
let read_source = scenario.read_source();

View File

@ -19,7 +19,7 @@ generated_types = { path = "../generated_types" }
arrow = { version = "6.0", optional = true }
arrow-flight = { version = "6.0", optional = true }
bytes = "1.0"
futures-util = { version = "0.3.1", optional = true }
futures-util = { version = "0.3", optional = true }
dml = { path = "../dml", optional = true }
mutable_batch = { path = "../mutable_batch", optional = true }
mutable_batch_lp = { path = "../mutable_batch_lp", optional = true }
@ -34,4 +34,4 @@ uuid = { version = "0.8", features = ["v4"] }
[dev-dependencies] # In alphabetical order
serde_json = "1.0"
tokio = { version = "1.13", features = ["macros", "rt-multi-thread"] }
tokio = { version = "1.13", features = ["macros", "parking_lot", "rt-multi-thread"] }

View File

@ -5,8 +5,8 @@ authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2021"
[dependencies] # In alphabetical order
nom = "7"
smallvec = "1.7.0"
nom = { version = "7", default-features = false, features = ["std"] }
smallvec = { version = "1.7.0", features = ["union"] }
snafu = "0.6.2"
observability_deps = { path = "../observability_deps" }
workspace-hack = { path = "../workspace-hack"}

View File

@ -9,7 +9,7 @@ client_util = { path = "../client_util" }
generated_types = { path = "../generated_types" }
prost = "0.8"
tonic = { version = "0.5.0" }
futures-util = { version = "0.3.1" }
futures-util = { version = "0.3" }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]

View File

@ -9,9 +9,9 @@ readme = "README.md"
[dependencies]
parking_lot = "0.11"
time = { path = "../time" }
tokio = { version = "1.13", features = ["sync"] }
tokio = { version = "1.13", features = ["parking_lot", "sync"] }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
futures = "0.3"
tokio = { version = "1.13", features = ["macros", "rt", "rt-multi-thread", "sync", "time"] }
tokio = { version = "1.13", features = ["macros", "parking_lot", "rt", "rt-multi-thread", "sync", "time"] }

View File

@ -6,14 +6,13 @@ edition = "2021"
default-run = "iox_data_generator"
[dependencies]
chrono = "0.4.13"
chrono = { version = "0.4", default-features = false }
chrono-english = "0.1.4"
clap = "2.33.1"
futures = "0.3.5"
handlebars = "4.1.5"
clap = "2.34.0"
futures = "0.3"
handlebars = "4.1.6"
humantime = "2.1.0"
data_types = { path = "../data_types" }
generated_types = { path = "../generated_types" }
influxdb2_client = { path = "../influxdb2_client" }
influxdb_iox_client = { path = "../influxdb_iox_client" }
itertools = "0.10.0"
@ -21,10 +20,10 @@ rand = { version = "0.8.3", features = ["small_rng"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.72"
snafu = "0.6.8"
tokio = { version = "1.13", features = ["macros", "rt-multi-thread"] }
tokio = { version = "1.13", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
toml = "0.5.6"
tracing = "0.1"
tracing-subscriber = "0.3.2"
tracing-subscriber = "0.3"
uuid = { version = "0.8.1", default_features = false }
[dev-dependencies]

View File

@ -12,6 +12,7 @@ pub fn single_agent(c: &mut Criterion) {
values: vec![],
tag_sets: vec![],
agents: vec![AgentSpec {
name: "foo".to_string(),
count: None,
sampling_interval: Some("1s".to_string()),
measurements: vec![MeasurementSpec {
@ -123,6 +124,7 @@ for_each = [
]
[[agents]]
name = "foo"
# create this many agents
count = 3
@ -170,7 +172,7 @@ i64_range = [1, 8147240]
group.bench_function("single agent with basic configuration", |b| {
b.iter(|| {
agent.reset_current_date_time(0);
let points_writer = points_writer.build_for_agent(1).unwrap();
let points_writer = points_writer.build_for_agent("foo").unwrap();
let r = block_on(agent.generate_all(points_writer, 1));
let n_points = r.expect("Could not generate data");
assert_eq!(n_points, expected_points as usize);

View File

@ -3,6 +3,7 @@
name = "cap_write"
[[agents]]
name = "telegraf"
count = 3
sampling_interval = "10s"
tag_pairs = [

View File

@ -104,6 +104,7 @@ for_each = [
]
[[agents]]
name = "first_agent"
# Create this many agents. Agents are single threaded so the way to get more paralellism is to
# increase the number of agents.
count = 2
@ -160,7 +161,8 @@ name = "uptime_format"
uptime = "telegraf"
[[agents]]
tag_pairs = [{key = "agent-name", template = "another_example"}]
name = "another_example"
tag_pairs = [{key = "agent_name", template = "agent.name"}]
[[agents.measurements]]
name = "new_agent_measurement-{{measurement.id}}"

View File

@ -59,6 +59,7 @@ for_each = [
]
[[agents]]
name = "sender"
# create this many agents
count = 1
sampling_interval = "10s"

View File

@ -16,6 +16,7 @@ name = "host_services"
for_each = ["host", "host.service"]
[[agents]]
name = "tracing_agent"
count = 1
sampling_interval = "1s"

View File

@ -55,6 +55,8 @@ pub enum Error {
pub struct Agent {
/// identifier for the agent. This can be used in generated tags and fields
pub id: usize,
/// name for the agent. This can be used in generated tags and fields
pub name: String,
measurement_generators: Vec<MeasurementGenerator>,
sampling_interval: Option<Duration>,
/// nanoseconds since the epoch, used as the timestamp for the next
@ -88,7 +90,7 @@ impl Agent {
let agents: Vec<_> = (1..agent_count + 1)
.into_iter()
.map(|agent_id| {
let data = json!({"agent": {"id": agent_id}});
let data = json!({"agent": {"id": agent_id, "name": agent_spec.name}});
let agent_tag_pairs = TagPair::pairs_from_specs(&agent_spec.tag_pairs, data)
.context(CouldNotCreateAgentTagPairs)?;
@ -120,6 +122,7 @@ impl Agent {
Ok(Self {
id: agent_id,
name: agent_spec.name.to_string(),
measurement_generators,
sampling_interval,
current_datetime,
@ -289,6 +292,7 @@ mod test {
Self {
id: 0,
name: "foo".to_string(),
finished: false,
interval: None,

View File

@ -137,7 +137,7 @@ pub async fn generate(
for mut agent in agents {
let agent_points_writer = points_writer_builder
.build_for_agent(agent.id)
.build_for_agent(&agent.name)
.context(CouldNotCreateAgentWriter)?;
let lock_ref = Arc::clone(&lock);
@ -200,6 +200,7 @@ mod test {
name = "demo_schema"
[[agents]]
name = "foo"
sampling_interval = "10s" # seconds
[[agents.measurements]]

View File

@ -536,6 +536,7 @@ mod test {
for_each = ["foo"]
[[agents]]
name = "foo"
[[agents.measurements]]
name = "m1"
@ -594,6 +595,7 @@ mod test {
for_each = ["foo"]
[[agents]]
name = "foo"
[[agents.measurements]]
name = "m1"

View File

@ -116,6 +116,8 @@ pub struct TagSetsSpec {
#[cfg_attr(test, derive(Default))]
#[serde(deny_unknown_fields)]
pub struct AgentSpec {
/// The name of the agent, which can be referenced in templates with `agent.name`.
pub name: String,
/// Specifies the number of agents that should be created with this spec.
/// Default value is 1.
pub count: Option<usize>,
@ -483,6 +485,7 @@ mod test {
name = "demo_schema"
[[agents]]
name = "foo"
[[agents.measurements]]
name = "cpu"

View File

@ -484,6 +484,7 @@ name = "testage"
for_each = ["foo"]
[[agents]]
name = "foo"
[[agents.measurements]]
name = "cpu"
@ -528,6 +529,7 @@ for_each = [
]
[[agents]]
name = "foo"
[[agents.measurements]]
name = "cpu"
@ -593,6 +595,7 @@ for_each = [
]
[[agents]]
name = "foo"
[[agents.measurements]]
name = "cpu"

View File

@ -104,7 +104,7 @@ enum PointsWriterConfig {
perform_write: bool,
},
#[cfg(test)]
Vector(BTreeMap<usize, Arc<Mutex<Vec<u8>>>>),
Vector(BTreeMap<String, Arc<Mutex<Vec<u8>>>>),
Stdout,
}
@ -172,7 +172,7 @@ impl PointsWriterBuilder {
/// Create a writer out of this writer's configuration for a particular
/// agent that runs in a separate thread/task.
pub fn build_for_agent(&mut self, id: usize) -> Result<PointsWriter> {
pub fn build_for_agent(&mut self, name: impl Into<String>) -> Result<PointsWriter> {
let inner_writer = match &mut self.config {
PointsWriterConfig::Api {
client,
@ -185,7 +185,7 @@ impl PointsWriterBuilder {
},
PointsWriterConfig::Directory(dir_path) => {
let mut filename = dir_path.clone();
filename.push(format!("agent_{}", id));
filename.push(name.into());
filename.set_extension("txt");
let file = OpenOptions::new()
@ -204,7 +204,7 @@ impl PointsWriterBuilder {
#[cfg(test)]
PointsWriterConfig::Vector(ref mut agents_by_name) => {
let v = agents_by_name
.entry(id)
.entry(name.into())
.or_insert_with(|| Arc::new(Mutex::new(Vec::new())));
InnerPointsWriter::Vec(Arc::clone(v))
}
@ -321,11 +321,11 @@ mod test {
}
}
fn written_data(self, agent_id: usize) -> String {
fn written_data(self, agent_name: &str) -> String {
match self.config {
PointsWriterConfig::Vector(agents_by_name) => {
let bytes_ref =
Arc::clone(agents_by_name.get(&agent_id).expect(
Arc::clone(agents_by_name.get(agent_name).expect(
"Should have written some data, did not find any for this agent",
));
let bytes = bytes_ref
@ -344,6 +344,7 @@ mod test {
name = "demo_schema"
[[agents]]
name = "foo"
[[agents.measurements]]
name = "cpu"
@ -369,7 +370,7 @@ i64_range = [3,3]"#;
)
.await?;
let line_protocol = points_writer_builder.written_data(1);
let line_protocol = points_writer_builder.written_data("foo");
let expected_line_protocol = format!(
r#"cpu val=3i {}
@ -387,6 +388,7 @@ i64_range = [3,3]"#;
name = "demo_schema"
[[agents]]
name = "foo"
sampling_interval = "1s" # seconds
[[agents.measurements]]
@ -413,7 +415,7 @@ i64_range = [2, 2]"#;
)
.await?;
let line_protocol = points_writer_builder.written_data(1);
let line_protocol = points_writer_builder.written_data("foo");
let expected_line_protocol = format!(
r#"cpu val=2i {}

View File

@ -11,9 +11,9 @@ futures = "0.3"
object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" }
snafu = "0.6"
tokio = { version = "1.13", features = ["macros", "time"] }
tokio = { version = "1.13", features = ["macros", "parking_lot", "sync", "rt"] }
tokio-stream = "0.1"
uuid = { version = "0.8", features = ["serde", "v4"] }
uuid = { version = "0.8", features = ["v4"] }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies] # In alphabetical order

View File

@ -14,10 +14,10 @@
clippy::clone_on_ref_ptr
)]
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use data_types::server_id::ServerId;
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use object_store::{path::Path, ObjectStore, ObjectStoreApi, Result};
use object_store::{path::Path, GetResult, ObjectStore, ObjectStoreApi, Result};
use observability_deps::tracing::warn;
use snafu::{ensure, ResultExt, Snafu};
use std::sync::Arc;
@ -65,7 +65,7 @@ impl IoxObjectStore {
/// config is not found.
pub async fn get_server_config_file(inner: &ObjectStore, server_id: ServerId) -> Result<Bytes> {
let path = paths::server_config_path(inner, server_id);
let mut stream = match inner.get(&path).await {
let result = match inner.get(&path).await {
Err(object_store::Error::NotFound { .. }) => {
use object_store::path::ObjectStorePath;
let mut legacy_path = inner.new_path();
@ -76,13 +76,8 @@ impl IoxObjectStore {
}
other => other,
}?;
let mut bytes = BytesMut::new();
while let Some(buf) = stream.next().await {
bytes.extend(buf?);
}
Ok(bytes.freeze())
Ok(result.bytes().await?.into())
}
/// Store the data for the server config with the names and locations of the databases
@ -195,14 +190,7 @@ impl IoxObjectStore {
pub async fn get_owner_file(&self) -> Result<Bytes> {
let owner_path = self.root_path.owner_path();
let mut stream = self.inner.get(&owner_path).await?;
let mut bytes = BytesMut::new();
while let Some(buf) = stream.next().await {
bytes.extend(buf?);
}
Ok(bytes.freeze())
Ok(self.inner.get(&owner_path).await?.bytes().await?.into())
}
/// The location in object storage for all files for this database, suitable for logging or
@ -241,7 +229,7 @@ impl IoxObjectStore {
pub async fn get_catalog_transaction_file(
&self,
location: &TransactionFilePath,
) -> Result<BoxStream<'static, Result<Bytes>>> {
) -> Result<GetResult<object_store::Error>> {
let full_path = self.transactions_path.join(location);
self.inner.get(&full_path).await
@ -303,7 +291,7 @@ impl IoxObjectStore {
pub async fn get_parquet_file(
&self,
location: &ParquetFilePath,
) -> Result<BoxStream<'static, Result<Bytes>>> {
) -> Result<GetResult<object_store::Error>> {
let full_path = self.data_path.join(location);
self.inner.get(&full_path).await
@ -334,14 +322,8 @@ impl IoxObjectStore {
/// Get the data for the database rules
pub async fn get_database_rules_file(&self) -> Result<Bytes> {
let mut stream = self.inner.get(&self.db_rules_path()).await?;
let mut bytes = BytesMut::new();
while let Some(buf) = stream.next().await {
bytes.extend(buf?);
}
Ok(bytes.freeze())
let path = &self.db_rules_path();
Ok(self.inner.get(path).await?.bytes().await?.into())
}
/// Return the database rules file content without creating an IoxObjectStore instance. Useful
@ -352,14 +334,7 @@ impl IoxObjectStore {
let root_path = Self::root_path_for(&inner, uuid);
let db_rules_path = root_path.rules_path().inner;
let mut stream = inner.get(&db_rules_path).await?;
let mut bytes = BytesMut::new();
while let Some(buf) = stream.next().await {
bytes.extend(buf?);
}
Ok(bytes.freeze())
Ok(inner.get(&db_rules_path).await?.bytes().await?.into())
}
/// Store the data for the database rules
@ -626,9 +601,8 @@ mod tests {
.get(&rules_path)
.await
.unwrap()
.next()
.bytes()
.await
.unwrap()
.unwrap();
assert_eq!(original_file_content, actual_content);
@ -687,9 +661,8 @@ mod tests {
.get(&owner_path)
.await
.unwrap()
.next()
.bytes()
.await
.unwrap()
.unwrap();
assert_eq!(original_file_content, actual_content);

View File

@ -13,9 +13,9 @@ internal_types = { path = "../internal_types" }
observability_deps = { path = "../observability_deps" }
parking_lot = "0.11"
time = { path = "../time" }
tokio = { version = "1.13", features = ["macros", "time"] }
tokio = { version = "1.13", features = ["macros", "parking_lot", "time"] }
tracker = { path = "../tracker" }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
tokio = { version = "1.13", features = ["macros", "time", "rt"] }
tokio = { version = "1.13", features = ["macros", "parking_lot", "rt", "time"] }

View File

@ -13,5 +13,5 @@ workspace-hack = { path = "../workspace-hack"}
[dev-dependencies] # In alphabetical order
once_cell = { version = "1.4.0", features = ["parking_lot"] }
parking_lot = "0.11.2"
regex = "1.4.3"
regex = "1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View File

@ -7,7 +7,7 @@ description = "A mutable arrow RecordBatch"
[dependencies]
arrow = { version = "6.0", features = ["prettyprint"] }
arrow_util = { path = "../arrow_util" }
chrono = "0.4"
chrono = { version = "0.4", default-features = false }
data_types = { path = "../data_types" }
schema = { path = "../schema" }
snafu = "0.6"

View File

@ -17,7 +17,6 @@ snafu = "0.6.2"
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies] # In alphabetical order
tokio = { version = "1.13", features = ["macros"] }
arrow_util = { path = "../arrow_util" }
test_helpers = { path = "../test_helpers" }

View File

@ -11,11 +11,11 @@ async-trait = "0.1.42"
azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "cb5aba657fe378009c38e979434b2bc93ac0a3d5", optional = true }
azure_storage = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "cb5aba657fe378009c38e979434b2bc93ac0a3d5", optional = true, default-features = false, features = ["table", "blob", "queue"] }
bytes = "1.0"
chrono = "0.4"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
# Google Cloud Storage integration
cloud-storage = {version = "0.10.3", optional = true}
futures = "0.3"
indexmap = { version = "1.7", optional = true }
indexmap = { version = "1.7", optional = true, features = ["std"] }
itertools = "0.10.1"
observability_deps = { path = "../observability_deps" }
percent-encoding = "2.1"
@ -24,7 +24,7 @@ rusoto_core = { version = "0.47.0", optional = true}
rusoto_credential = { version = "0.47.0", optional = true}
rusoto_s3 = { version = "0.47.0", optional = true}
snafu = "0.6.10"
tokio = { version = "1.13", features = ["macros", "fs", "io-util", "rt-multi-thread", "time"] }
tokio = { version = "1.13", features = ["fs", "io-util", "macros", "parking_lot", "rt-multi-thread", "time"] }
# Filesystem integration
tokio-util = { version = "0.6.9", features = [ "codec", "io" ] }
reqwest = { version = "0.11", optional = true }
@ -41,4 +41,4 @@ aws = ["rusoto_core", "rusoto_credential", "rusoto_s3"]
[dev-dependencies] # In alphabetical order
dotenv = "0.15.0"
tempfile = "3.1.0"
futures-test = "0.3.12"
futures-test = "0.3"

View File

@ -2,7 +2,7 @@
//! store.
use crate::{
path::{cloud::CloudPath, DELIMITER},
ListResult, ObjectMeta, ObjectStoreApi, ObjectStorePath,
GetResult, ListResult, ObjectMeta, ObjectStoreApi, ObjectStorePath,
};
use async_trait::async_trait;
use bytes::Bytes;
@ -201,7 +201,7 @@ impl ObjectStoreApi for AmazonS3 {
Ok(())
}
async fn get(&self, location: &Self::Path) -> Result<BoxStream<'static, Result<Bytes>>> {
async fn get(&self, location: &Self::Path) -> Result<GetResult<Error>> {
let key = location.to_raw();
let get_request = rusoto_s3::GetObjectRequest {
bucket: self.bucket_name.clone(),
@ -209,7 +209,7 @@ impl ObjectStoreApi for AmazonS3 {
..Default::default()
};
let bucket_name = self.bucket_name.clone();
Ok(self
let s = self
.client
.get_object(get_request)
.await
@ -237,7 +237,9 @@ impl ObjectStoreApi for AmazonS3 {
location: key.clone(),
})
.err_into()
.boxed())
.boxed();
Ok(GetResult::Stream(s))
}
async fn delete(&self, location: &Self::Path) -> Result<()> {

View File

@ -2,7 +2,7 @@
//! the object store.
use crate::{
path::{cloud::CloudPath, DELIMITER},
ListResult, ObjectMeta, ObjectStoreApi, ObjectStorePath,
GetResult, ListResult, ObjectMeta, ObjectStoreApi, ObjectStorePath,
};
use async_trait::async_trait;
use azure_core::prelude::*;
@ -88,10 +88,10 @@ impl ObjectStoreApi for MicrosoftAzure {
Ok(())
}
async fn get(&self, location: &Self::Path) -> Result<BoxStream<'static, Result<Bytes>>> {
async fn get(&self, location: &Self::Path) -> Result<GetResult<Error>> {
let container_client = Arc::clone(&self.container_client);
let location = location.to_raw();
Ok(async move {
let s = async move {
container_client
.as_blob_client(&location)
.get()
@ -103,7 +103,9 @@ impl ObjectStoreApi for MicrosoftAzure {
})
}
.into_stream()
.boxed())
.boxed();
Ok(GetResult::Stream(s))
}
async fn delete(&self, location: &Self::Path) -> Result<()> {

View File

@ -2,18 +2,17 @@
//! object store.
use crate::cache::Cache;
use crate::path::Path;
use crate::{path::file::FilePath, ListResult, ObjectMeta, ObjectStore, ObjectStoreApi};
use crate::{path::file::FilePath, GetResult, ListResult, ObjectMeta, ObjectStore, ObjectStoreApi};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{
stream::{self, BoxStream},
StreamExt, TryStreamExt,
StreamExt,
};
use snafu::{OptionExt, ResultExt, Snafu};
use std::sync::Arc;
use std::{collections::BTreeSet, convert::TryFrom, io, path::PathBuf};
use tokio::fs;
use tokio_util::codec::{BytesCodec, FramedRead};
use walkdir::WalkDir;
/// A specialized `Result` for filesystem object store-related errors
@ -131,7 +130,7 @@ impl ObjectStoreApi for File {
Ok(())
}
async fn get(&self, location: &Self::Path) -> Result<BoxStream<'static, Result<Bytes>>> {
async fn get(&self, location: &Self::Path) -> Result<GetResult<Error>> {
let path = self.path(location);
let file = fs::File::open(&path).await.map_err(|e| {
@ -148,13 +147,7 @@ impl ObjectStoreApi for File {
}
})?;
let s = FramedRead::new(file, BytesCodec::new())
.map_ok(|b| b.freeze())
.map_err(move |source| Error::UnableToReadBytes {
source,
path: path.clone(),
});
Ok(s.boxed())
Ok(GetResult::File(file, path))
}
async fn delete(&self, location: &Self::Path) -> Result<()> {
@ -365,8 +358,7 @@ mod tests {
.get(&location)
.await
.unwrap()
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.bytes()
.await
.unwrap();
assert_eq!(&*read_data, expected_data);
@ -389,8 +381,7 @@ mod tests {
.get(&location)
.await
.unwrap()
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.bytes()
.await
.unwrap();
assert_eq!(&*read_data, expected_data);

View File

@ -4,7 +4,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use snafu::Snafu;
use crate::{path::cloud::CloudPath, ObjectStoreApi};
use crate::{path::cloud::CloudPath, GetResult, ObjectStoreApi};
/// A specialized `Error` for Azure object store-related errors
#[derive(Debug, Snafu, Clone)]
@ -54,10 +54,7 @@ impl ObjectStoreApi for DummyObjectStore {
async fn get(
&self,
_location: &Self::Path,
) -> crate::Result<
futures::stream::BoxStream<'static, crate::Result<bytes::Bytes, Self::Error>>,
Self::Error,
> {
) -> crate::Result<GetResult<Self::Error>, Self::Error> {
NotSupported { name: &self.name }.fail()
}

View File

@ -2,7 +2,7 @@
//! as the object store.
use crate::{
path::{cloud::CloudPath, DELIMITER},
ListResult, ObjectMeta, ObjectStoreApi, ObjectStorePath,
GetResult, ListResult, ObjectMeta, ObjectStoreApi, ObjectStorePath,
};
use async_trait::async_trait;
use bytes::Bytes;
@ -117,7 +117,7 @@ impl ObjectStoreApi for GoogleCloudStorage {
Ok(())
}
async fn get(&self, location: &Self::Path) -> Result<BoxStream<'static, Result<Bytes>>> {
async fn get(&self, location: &Self::Path) -> Result<GetResult<Error>> {
let location = location.to_raw();
let location_copy = location.clone();
let bucket_name = self.bucket_name.clone();
@ -141,7 +141,8 @@ impl ObjectStoreApi for GoogleCloudStorage {
},
})?;
Ok(futures::stream::once(async move { Ok(bytes.into()) }).boxed())
let s = futures::stream::once(async move { Ok(bytes.into()) }).boxed();
Ok(GetResult::Stream(s))
}
async fn delete(&self, location: &Self::Path) -> Result<()> {

View File

@ -59,6 +59,7 @@ use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt, TryFutureExt, TryStreamExt};
use snafu::{ResultExt, Snafu};
use std::fmt::Formatter;
use std::{path::PathBuf, sync::Arc};
/// Universal API to multiple object store services.
@ -80,10 +81,7 @@ pub trait ObjectStoreApi: Send + Sync + 'static {
async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<(), Self::Error>;
/// Return the bytes that are stored at the specified location.
async fn get(
&self,
location: &Self::Path,
) -> Result<BoxStream<'static, Result<Bytes, Self::Error>>, Self::Error>;
async fn get(&self, location: &Self::Path) -> Result<GetResult<Self::Error>, Self::Error>;
/// Delete the object at the specified location.
async fn delete(&self, location: &Self::Path) -> Result<(), Self::Error>;
@ -277,26 +275,22 @@ impl ObjectStoreApi for ObjectStore {
Ok(())
}
async fn get(&self, location: &Self::Path) -> Result<BoxStream<'static, Result<Bytes>>> {
async fn get(&self, location: &Self::Path) -> Result<GetResult<Error>> {
use ObjectStoreIntegration::*;
Ok(match (&self.integration, location) {
(AmazonS3(s3), path::Path::AmazonS3(location)) => {
s3.get(location).await?.err_into().boxed()
}
(AmazonS3(s3), path::Path::AmazonS3(location)) => s3.get(location).await?.err_into(),
(GoogleCloudStorage(gcs), path::Path::GoogleCloudStorage(location)) => {
gcs.get(location).await?.err_into().boxed()
gcs.get(location).await?.err_into()
}
(InMemory(in_mem), path::Path::InMemory(location)) => {
in_mem.get(location).await?.err_into().boxed()
in_mem.get(location).await?.err_into()
}
(InMemoryThrottled(in_mem_throttled), path::Path::InMemory(location)) => {
in_mem_throttled.get(location).await?.err_into().boxed()
}
(File(file), path::Path::File(location)) => {
file.get(location).await?.err_into().boxed()
in_mem_throttled.get(location).await?.err_into()
}
(File(file), path::Path::File(location)) => file.get(location).await?.err_into(),
(MicrosoftAzure(azure), path::Path::MicrosoftAzure(location)) => {
azure.get(location).await?.err_into().boxed()
azure.get(location).await?.err_into()
}
_ => unreachable!(),
})
@ -582,6 +576,65 @@ impl<P: ObjectStorePath> ObjectMeta<P> {
}
}
/// Result for a get request
pub enum GetResult<E> {
/// A file
File(tokio::fs::File, std::path::PathBuf),
/// An asynchronous stream
Stream(BoxStream<'static, Result<Bytes, E>>),
}
impl<E> std::fmt::Debug for GetResult<E> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
GetResult::File(_, _) => write!(f, "GetResult(File)"),
GetResult::Stream(_) => write!(f, "GetResult(Stream)"),
}
}
}
impl GetResult<Error> {
/// Collects the data into a [`Vec<u8>`]
pub async fn bytes(self) -> Result<Vec<u8>, Error> {
let mut stream = self.into_stream();
let mut bytes = Vec::new();
while let Some(next) = stream.next().await {
bytes.extend_from_slice(next?.as_ref())
}
Ok(bytes)
}
/// Converts this into a byte stream
pub fn into_stream(self) -> BoxStream<'static, Result<Bytes, Error>> {
match self {
Self::File(file, path) => {
tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
.map_ok(|b| b.freeze())
.map_err(move |source| Error::FileObjectStoreError {
source: disk::Error::UnableToReadBytes {
source,
path: path.clone(),
},
})
.boxed()
}
Self::Stream(s) => s,
}
}
}
impl<E: 'static> GetResult<E> {
/// Maps the error
fn err_into<T: From<E> + 'static>(self) -> GetResult<T> {
match self {
Self::File(f, p) => GetResult::File(f, p),
Self::Stream(s) => GetResult::Stream(s.err_into().boxed()),
}
}
}
/// A specialized `Result` for object store-related errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -734,12 +787,7 @@ mod tests {
let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
assert!(content_list.is_empty());
let read_data = storage
.get(&location)
.await?
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await?;
let read_data = storage.get(&location).await?.bytes().await?;
assert_eq!(&*read_data, expected_data);
storage.delete(&location).await?;
@ -841,7 +889,7 @@ mod tests {
pub(crate) async fn get_nonexistent_object(
storage: &ObjectStore,
location: Option<<ObjectStore as ObjectStoreApi>::Path>,
) -> Result<Bytes> {
) -> Result<Vec<u8>> {
let location = location.unwrap_or_else(|| {
let mut loc = storage.new_path();
loc.set_file_name("this_file_should_not_exist");
@ -851,13 +899,7 @@ mod tests {
let content_list = flatten_list_stream(storage, Some(&location)).await?;
assert!(content_list.is_empty());
Ok(storage
.get(&location)
.await?
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await?
.freeze())
Ok(storage.get(&location).await?.bytes().await?)
}
/// Parse a str as a `CloudPath` into a `DirAndFileName`, even though the

View File

@ -2,7 +2,7 @@
//! store.
use crate::{
path::{cloud::CloudPath, parsed::DirsAndFileName},
ListResult, ObjectMeta, ObjectStoreApi,
GetResult, ListResult, ObjectMeta, ObjectStoreApi,
};
use async_trait::async_trait;
use bytes::Bytes;
@ -55,7 +55,7 @@ impl ObjectStoreApi for InMemory {
Ok(())
}
async fn get(&self, location: &Self::Path) -> Result<BoxStream<'static, Result<Bytes>>> {
async fn get(&self, location: &Self::Path) -> Result<GetResult<Self::Error>> {
let data = self
.storage
.read()
@ -66,7 +66,9 @@ impl ObjectStoreApi for InMemory {
location: location.to_string(),
})?;
Ok(futures::stream::once(async move { Ok(data) }).boxed())
Ok(GetResult::Stream(
futures::stream::once(async move { Ok(data) }).boxed(),
))
}
async fn delete(&self, location: &Self::Path) -> Result<()> {
@ -162,7 +164,6 @@ mod tests {
tests::{get_nonexistent_object, list_with_delimiter, put_get_delete_list},
Error as ObjectStoreError, ObjectStore, ObjectStoreApi, ObjectStorePath,
};
use futures::TryStreamExt;
#[tokio::test]
async fn in_memory_test() {
@ -188,8 +189,7 @@ mod tests {
.get(&location)
.await
.unwrap()
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.bytes()
.await
.unwrap();
assert_eq!(&*read_data, expected_data);

View File

@ -1,7 +1,7 @@
//! This module contains the IOx implementation for wrapping existing object store types into an artificial "sleep" wrapper.
use std::{convert::TryInto, sync::Mutex};
use crate::{ListResult, ObjectStoreApi, Result};
use crate::{GetResult, ListResult, ObjectStoreApi, Result};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{stream::BoxStream, StreamExt};
@ -129,18 +129,20 @@ impl<T: ObjectStoreApi> ObjectStoreApi for ThrottledStore<T> {
self.inner.put(location, bytes).await
}
async fn get(
&self,
location: &Self::Path,
) -> Result<BoxStream<'static, Result<Bytes, Self::Error>>, Self::Error> {
async fn get(&self, location: &Self::Path) -> Result<GetResult<Self::Error>, Self::Error> {
sleep(self.config().wait_get_per_call).await;
// need to copy to avoid moving / referencing `self`
let wait_get_per_byte = self.config().wait_get_per_byte;
self.inner.get(location).await.map(|stream| {
stream
.then(move |bytes_result| async move {
self.inner.get(location).await.map(|result| {
let s = match result {
GetResult::Stream(s) => s,
GetResult::File(_, _) => unimplemented!(),
};
GetResult::Stream(
s.then(move |bytes_result| async move {
match bytes_result {
Ok(bytes) => {
let bytes_len: u32 = usize_to_u32_saturate(bytes.len());
@ -150,7 +152,8 @@ impl<T: ObjectStoreApi> ObjectStoreApi for ThrottledStore<T> {
Err(err) => Err(err),
}
})
.boxed()
.boxed(),
)
})
}
@ -420,8 +423,12 @@ mod tests {
let res = store.get(&path).await;
if n_bytes.is_some() {
// need to consume bytes to provoke sleep times
res.unwrap()
.map_ok(|b| bytes::BytesMut::from(&b[..]))
let s = match res.unwrap() {
GetResult::Stream(s) => s,
GetResult::File(_, _) => unimplemented!(),
};
s.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await
.unwrap();

View File

@ -10,7 +10,7 @@ bytes = "1.0"
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
datafusion_util = { path = "../datafusion_util" }
futures = "0.3.7"
futures = "0.3"
generated_types = { path = "../generated_types", features = ["data_types_conversions"] }
iox_object_store = { path = "../iox_object_store" }
metric = { path = "../metric" }
@ -29,8 +29,8 @@ schema = { path = "../schema" }
tempfile = "3.1.0"
thrift = "0.13"
time = { path = "../time" }
tokio = { version = "1.13", features = ["macros", "rt", "rt-multi-thread", "sync"] }
tokio = { version = "1.13", features = ["macros", "parking_lot", "rt", "rt-multi-thread", "sync"] }
tokio-stream = "0.1"
uuid = { version = "0.8", features = ["serde", "v4"] }
uuid = { version = "0.8", features = ["v4"] }
zstd = "0.9"
workspace-hack = { path = "../workspace-hack"}

View File

@ -1837,8 +1837,7 @@ mod tests {
.get_catalog_transaction_file(path)
.await
.unwrap()
.map_ok(|bytes| bytes.to_vec())
.try_concat()
.bytes()
.await
.unwrap();

View File

@ -1,5 +1,4 @@
use bytes::Bytes;
use futures::TryStreamExt;
use generated_types::influxdata::iox::preserved_catalog::v1 as proto;
use iox_object_store::{IoxObjectStore, TransactionFilePath};
use object_store::{ObjectStore, ObjectStoreApi};
@ -54,8 +53,7 @@ pub async fn load_transaction_proto(
.get_catalog_transaction_file(path)
.await
.context(Read {})?
.map_ok(|bytes| bytes.to_vec())
.try_concat()
.bytes()
.await
.context(Read {})?;
let proto = proto::Transaction::decode(&data[..]).context(Deserialization {})?;

View File

@ -136,8 +136,7 @@ async fn read_parquet(
.get_parquet_file(path)
.await
.context(ReadFailure)?
.map_ok(|bytes| bytes.to_vec())
.try_concat()
.bytes()
.await
.context(ReadFailure)?;

View File

@ -11,7 +11,7 @@ bytes = "1.0"
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
datafusion_util = { path = "../datafusion_util" }
futures = "0.3.7"
futures = "0.3"
generated_types = { path = "../generated_types" }
iox_object_store = { path = "../iox_object_store" }
metric = { path = "../metric" }
@ -29,9 +29,9 @@ schema = { path = "../schema" }
tempfile = "3.1.0"
thrift = "0.13"
time = { path = "../time" }
tokio = { version = "1.13", features = ["macros", "rt", "rt-multi-thread", "sync"] }
tokio = { version = "1.13", features = ["macros", "parking_lot", "rt", "rt-multi-thread", "sync"] }
tokio-stream = "0.1"
uuid = { version = "0.8", features = ["serde", "v4"] }
uuid = { version = "0.8", features = ["v4"] }
zstd = "0.9"
workspace-hack = { path = "../workspace-hack"}

View File

@ -7,19 +7,15 @@ use arrow::{
};
use bytes::Bytes;
use data_types::chunk_metadata::ChunkAddr;
use datafusion::{
datasource::{object_store::local::LocalFileSystem, PartitionedFile},
logical_plan::Expr,
physical_plan::{
file_format::{ParquetExec, PhysicalPlanConfig},
ExecutionPlan, Partitioning, SendableRecordBatchStream,
},
};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_util::AdapterStream;
use futures::StreamExt;
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use object_store::GetResult;
use observability_deps::tracing::debug;
use parking_lot::Mutex;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
use parquet::file::reader::SerializedFileReader;
use parquet::{
self,
arrow::ArrowWriter,
@ -28,7 +24,7 @@ use parquet::{
};
use predicate::predicate::Predicate;
use schema::selection::Selection;
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
io::{Cursor, Seek, SeekFrom, Write},
sync::Arc,
@ -62,66 +58,25 @@ pub enum Error {
#[snafu(display("Error converting to vec[u8]: Nothing else should have a reference here"))]
WritingToMemWriter {},
#[snafu(display("Non local file not supported"))]
NonLocalFile {},
#[snafu(display("Error opening file: {}", source))]
OpenFile { source: std::io::Error },
#[snafu(display("Error opening temp file: {}", source))]
OpenTempFile { source: std::io::Error },
#[snafu(display("Error writing to temp file: {}", source))]
WriteTempFile { source: std::io::Error },
#[snafu(display("Error getting metadata from temp file: {}", source))]
MetaTempFile { source: std::io::Error },
#[snafu(display("Internal error: can not get temp file as str: {}", path))]
TempFilePathAsStr { path: String },
#[snafu(display(
"Internal error: unexpected partitioning in parquet reader: {:?}",
partitioning
))]
UnexpectedPartitioning { partitioning: Partitioning },
#[snafu(display("Error creating pruning predicate: {}", source))]
CreatingPredicate {
source: datafusion::error::DataFusionError,
},
#[snafu(display("Error reading from parquet stream: {}", source))]
ReadingParquet {
source: datafusion::error::DataFusionError,
},
#[snafu(display("Error at serialized file reader: {}", source))]
SerializedFileReaderError {
source: parquet::errors::ParquetError,
},
#[snafu(display("Error at parquet arrow reader: {}", source))]
ParquetArrowReaderError {
source: parquet::errors::ParquetError,
},
#[snafu(display("Error reading data from parquet file: {}", source))]
ReadingFile { source: ArrowError },
#[snafu(display("Error reading data from object store: {}", source))]
ReadingObjectStore { source: object_store::Error },
#[snafu(display("Error sending results: {}", source))]
SendResult {
source: datafusion::error::DataFusionError,
},
#[snafu(display("Cannot extract Parquet metadata from byte array: {}", source))]
ExtractingMetadataFailure { source: crate::metadata::Error },
#[snafu(display("Cannot encode metadata: {}", source))]
MetadataEncodeFailure { source: prost::EncodeError },
#[snafu(display("Error reading parquet: {}", source))]
ParquetReader {
source: parquet::errors::ParquetError,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -231,13 +186,10 @@ impl Storage {
}
/// Downloads the specified parquet file to a local temporary file
/// and uses the `[ParquetExec`] from DataFusion to read that
/// parquet file (including predicate and projection pushdown).
/// and uses the `[ParquetExec`]
///
/// The resulting record batches from Parquet are sent back to `tx`
async fn download_and_scan_parquet(
schema: SchemaRef,
predicate: Option<Expr>,
fn download_and_scan_parquet(
projection: Vec<usize>,
path: ParquetFilePath,
store: Arc<IoxObjectStore>,
@ -246,106 +198,56 @@ impl Storage {
// Size of each batch
let batch_size = 1024; // Todo: make a constant or policy for this
// Limit of total rows to read
let limit: Option<usize> = None; // Todo: this should be a parameter of the function
// todo(paul): Here is where I'd get the cache from object store. If it has
// one, I'd do the `fs_path_or_cache`. Otherwise, do the temp file like below.
// TODO use DataFusion ObjectStore implementation rather than
// download the file directly
// read parquet file to local file
let mut temp_file = tempfile::Builder::new()
.prefix("iox-parquet-cache")
.suffix(".parquet")
.tempfile()
.context(OpenTempFile)?;
debug!(?path, ?temp_file, "Beginning to read parquet to temp file");
let mut read_stream = store
.get_parquet_file(&path)
.await
let read_stream = futures::executor::block_on(store.get_parquet_file(&path))
.context(ReadingObjectStore)?;
while let Some(bytes) = read_stream.next().await {
let bytes = bytes.context(ReadingObjectStore)?;
debug!(len = bytes.len(), "read bytes from object store");
temp_file.write_all(&bytes).context(WriteTempFile)?;
}
let file = match read_stream {
GetResult::File(f, _) => {
debug!(?path, "Using file directly");
futures::executor::block_on(f.into_std())
}
GetResult::Stream(read_stream) => {
// read parquet file to local file
let mut file = tempfile::tempfile().context(OpenTempFile)?;
let file_size = temp_file.as_file().metadata().context(MetaTempFile)?.len();
debug!(?path, ?file, "Beginning to read parquet to temp file");
// now, create the appropriate parquet exec from datafusion and make it
let temp_path = temp_file.into_temp_path();
debug!(?temp_path, "Completed read parquet to tempfile");
for bytes in futures::executor::block_on_stream(read_stream) {
let bytes = bytes.context(ReadingObjectStore)?;
debug!(len = bytes.len(), "read bytes from object store");
file.write_all(&bytes).context(WriteTempFile)?;
}
let temp_path = temp_path.to_str().with_context(|| TempFilePathAsStr {
path: temp_path.to_string_lossy(),
})?;
file.rewind().context(WriteTempFile)?;
// TODO: renenable when bug in parquet statistics generation
// is fixed: https://github.com/apache/arrow-rs/issues/641
// https://github.com/influxdata/influxdb_iox/issues/2163
if predicate.is_some() {
debug!(?predicate, "Skipping predicate pushdown due to XXX");
}
let predicate = None;
let object_store = Arc::new(LocalFileSystem {});
// TODO real statistics so we can use parquet row group
// pruning (needs both a real predicate and the formats to be
// exposed)
let statistics = datafusion::physical_plan::Statistics::default();
let part_file = PartitionedFile::new(temp_path.to_string(), file_size);
let file_groups = vec![vec![part_file]];
let base_config = PhysicalPlanConfig {
object_store,
file_schema: schema,
file_groups,
statistics,
projection: Some(projection),
batch_size,
limit,
table_partition_cols: vec![],
debug!(?path, "Completed read parquet to tempfile");
file
}
};
let parquet_exec = ParquetExec::new(base_config, predicate);
let file_reader = SerializedFileReader::new(file).context(ParquetReader)?;
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
let record_batch_reader = arrow_reader
.get_record_reader_by_columns(projection, batch_size)
.context(ParquetReader)?;
// We are assuming there is only a single stream in the
// call to execute(0) below
let partitioning = parquet_exec.output_partitioning();
ensure!(
matches!(partitioning, Partitioning::UnknownPartitioning(1)),
UnexpectedPartitioning { partitioning }
);
let mut parquet_stream = parquet_exec.execute(0).await.context(ReadingParquet)?;
while let Some(batch) = parquet_stream.next().await {
if let Err(e) = tx.send(batch).await {
debug!(%e, "Stopping parquet exec early, receiver hung up");
return Ok(());
for batch in record_batch_reader {
if tx.blocking_send(batch).is_err() {
debug!(?path, "Receiver hung up - exiting");
break;
}
}
Ok(())
}
pub fn read_filter(
predicate: &Predicate,
_predicate: &Predicate,
selection: Selection<'_>,
schema: SchemaRef,
path: ParquetFilePath,
store: Arc<IoxObjectStore>,
) -> Result<SendableRecordBatchStream> {
// fire up a async task that will fetch the parquet file
// locally, start it executing and send results
let parquet_schema = Arc::clone(&schema);
// Indices of columns in the schema needed to read
let projection: Vec<usize> = Self::column_indices(selection, Arc::clone(&schema));
@ -357,29 +259,19 @@ impl Storage {
.collect(),
));
// pushdown predicate, if any
let predicate = predicate.filter_expr();
let (tx, rx) = tokio::sync::mpsc::channel(2);
// Run async dance here to make sure any error returned
// `download_and_scan_parquet` is sent back to the reader and
// not silently ignored
tokio::task::spawn(async move {
let download_result = Self::download_and_scan_parquet(
parquet_schema,
predicate,
projection,
path,
store,
tx.clone(),
)
.await;
tokio::task::spawn_blocking(move || {
let download_result =
Self::download_and_scan_parquet(projection, path, store, tx.clone());
// If there was an error returned from download_and_scan_parquet send it back to the receiver.
if let Err(e) = download_result {
let e = ArrowError::ExternalError(Box::new(e));
if let Err(e) = tx.send(ArrowResult::Err(e)).await {
if let Err(e) = tx.blocking_send(ArrowResult::Err(e)) {
// if no one is listening, there is no one else to hear our screams
debug!(%e, "Error sending result of download function. Receiver is closed.");
}

View File

@ -14,7 +14,6 @@ use data_types::{
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics},
server_id::ServerId,
};
use futures::TryStreamExt;
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use object_store::ObjectStore;
use parquet::{
@ -88,8 +87,7 @@ pub async fn load_parquet_from_store_for_path(
.get_parquet_file(path)
.await
.context(GettingDataFromObjectStore)?
.map_ok(|bytes| bytes.to_vec())
.try_concat()
.bytes()
.await
.context(GettingDataFromObjectStore)?;

View File

@ -1,6 +1,7 @@
name = "example"
[[agents]]
name = "foo"
count = 3
sampling_interval = "10s"

View File

@ -5,7 +5,7 @@ edition = "2021"
[dependencies]
arrow = { version = "6.0", features = ["prettyprint"] }
chrono = "0.4"
chrono = { version = "0.4", default-features = false }
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
datafusion_util = { path = "../datafusion_util" }
@ -20,4 +20,4 @@ workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
test_helpers = { path = "../test_helpers" }
tokio = { version = "1.13", features = ["macros"] }
tokio = { version = "1.13", features = ["macros", "parking_lot"] }

View File

@ -17,7 +17,7 @@ description = "IOx Query Interface and Executor"
arrow = { version = "6.0", features = ["prettyprint"] }
arrow_util = { path = "../arrow_util" }
async-trait = "0.1"
chrono = "0.4"
chrono = { version = "0.4", default-features = false }
croaring = "0.5"
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
@ -30,14 +30,13 @@ pin-project = "1.0"
regex = "1"
schema = { path = "../schema" }
snafu = "0.6.9"
tokio = { version = "1.13", features = ["macros"] }
tokio-stream = "0.1.8"
tokio = { version = "1.13", features = ["macros", "parking_lot"] }
tokio-stream = "0.1"
tokio-util = { version = "0.6.9" }
trace = { path = "../trace" }
predicate = { path = "../predicate" }
workspace-hack = { path = "../workspace-hack"}
# use libc on unix like platforms to set worker priority in DedicatedExecutor
[target."cfg(unix)".dependencies.libc]
version = "0.2"

View File

@ -26,4 +26,4 @@ schema = { path = "../schema" }
snafu = "0.6.3"
tempfile = "3.1.0"
test_helpers = { path = "../test_helpers" }
tokio = { version = "1.13", features = ["macros", "time"] }
tokio = { version = "1.13", features = ["macros", "parking_lot", "rt-multi-thread", "time"] }

View File

@ -23,5 +23,5 @@ workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
mutable_batch_lp = { path = "../mutable_batch_lp" }
regex = "1.4"
tokio = { version = "1.13", features = ["macros", "time"] }
regex = "1"
tokio = { version = "1.13", features = ["macros", "parking_lot"] }

View File

@ -8,7 +8,7 @@ description = "IOx Schema definition"
[dependencies]
arrow = { version = "6.0", features = ["prettyprint"] }
hashbrown = "0.11"
indexmap = "1.7"
indexmap = { version = "1.7", features = ["std"] }
itertools = "0.10.1"
snafu = "0.6"
workspace-hack = { path = "../workspace-hack"}

View File

@ -9,15 +9,15 @@ arrow = { version = "6.0", features = ["prettyprint"] }
arrow_util = { path = "../arrow_util" }
async-trait = "0.1"
bytes = "1.0"
chrono = "0.4"
chrono = { version = "0.4", default-features = false }
cache_loader_async = { version = "0.1.2", features = ["ttl-cache"] }
crc32fast = "1.2.2"
crc32fast = "1.3.0"
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
datafusion_util = { path = "../datafusion_util" }
dml = { path = "../dml" }
futures = "0.3"
futures-util = { version = "0.3.1" }
futures-util = { version = "0.3" }
generated_types = { path = "../generated_types", features = ["data_types_conversions"] }
hashbrown = "0.11"
influxdb_iox_client = { path = "../influxdb_iox_client" }
@ -34,7 +34,7 @@ mutable_buffer = { path = "../mutable_buffer" }
num_cpus = "1.13.0"
object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" }
once_cell = { version = "1.4.0", features = ["race"] }
once_cell = { version = "1.4.0", features = ["parking_lot"] }
parking_lot = "0.11.2"
parquet_catalog = { path = "../parquet_catalog" }
parquet_file = { path = "../parquet_file" }
@ -51,15 +51,15 @@ snafu = "0.6"
snap = "1.0.0"
time = { path = "../time" }
trace = { path = "../trace" }
tokio = { version = "1.13", features = ["macros", "time"] }
tokio = { version = "1.13", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
tokio-util = { version = "0.6.9" }
tracker = { path = "../tracker" }
uuid = { version = "0.8", features = ["serde", "v4"] }
uuid = { version = "0.8", features = ["v4"] }
write_buffer = { path = "../write_buffer" }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies] # In alphabetical order
regex = "1.4"
regex = "1"
router = { path = "../router" }
test_helpers = { path = "../test_helpers" }

View File

@ -5,15 +5,12 @@ authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2021"
description = "Server related bechmarks, grouped into their own crate to minimize build dev build times"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
[dev-dependencies] # In alphabetical order
arrow_util = { path = "../arrow_util" }
criterion = { version = "0.3.4", features = ["async_tokio"] }
chrono = { version = "0.4", features = ["serde"] }
chrono = { version = "0.4", default-features = false }
datafusion = { path = "../datafusion" }
data_types = { path = "../data_types" }
flate2 = "1.0.20"
@ -27,7 +24,7 @@ query_tests = { path = "../query_tests" }
rand = "0.8.3"
server = { path = "../server" }
test_helpers = { path = "../test_helpers" }
tokio = { version = "1.13", features = ["macros", "time"] }
tokio = { version = "1.13", features = ["macros", "parking_lot"] }
[[bench]]
name = "influxrpc"

View File

@ -5,8 +5,7 @@ edition = "2021"
description = "Time functionality for IOx"
[dependencies]
chrono = "0.4"
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
parking_lot = "0.11"
workspace-hack = { path = "../workspace-hack"}

View File

@ -6,8 +6,7 @@ edition = "2021"
description = "Distributed tracing support within IOx"
[dependencies]
chrono = "0.4"
chrono = { version = "0.4", default-features = false }
observability_deps = { path = "../observability_deps" }
parking_lot = "0.11"
rand = "0.8"

View File

@ -8,13 +8,13 @@ description = "Additional tracing exporters for IOx"
[dependencies]
async-trait = "0.1"
chrono = { version = "0.4" }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
futures = "0.3"
observability_deps = { path = "../observability_deps" }
snafu = "0.6"
structopt = { version = "0.3.25" }
thrift = { version = "0.13.0" }
tokio = { version = "1.13", features = ["macros", "time", "sync", "rt"] }
tokio = { version = "1.13", features = ["macros", "parking_lot", "rt", "sync"] }
trace = { path = "../trace" }
workspace-hack = { path = "../workspace-hack"}

View File

@ -15,10 +15,10 @@ observability_deps = { path = "../observability_deps" }
parking_lot = "0.11.2"
pin-project = "1.0"
time = { path = "../time" }
tokio = { version = "1.13", features = ["macros", "time"] }
tokio = { version = "1.13", features = ["macros", "parking_lot", "sync", "time"] }
tokio-util = { version = "0.6.9" }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
# Need the multi-threaded executor for testing
tokio = { version = "1.13", features = ["macros", "time", "rt-multi-thread"] }
tokio = { version = "1.13", features = ["macros", "parking_lot", "rt-multi-thread", "time"] }

View File

@ -15,24 +15,18 @@ publish = false
[dependencies]
ahash = { version = "0.7", features = ["std"] }
bytes = { version = "1", features = ["std"] }
chrono = { version = "0.4", features = ["clock", "libc", "oldtime", "serde", "std", "time", "winapi"] }
clap = { version = "2", features = ["ansi_term", "atty", "color", "strsim", "suggestions", "vec_map"] }
chrono = { version = "0.4", features = ["clock", "libc", "oldtime", "std", "time", "winapi"] }
either = { version = "1", features = ["use_std"] }
futures = { version = "0.3", features = ["alloc", "async-await", "executor", "futures-executor", "std"] }
futures-channel = { version = "0.3", features = ["alloc", "futures-sink", "sink", "std"] }
futures-core = { version = "0.3", features = ["alloc", "std"] }
futures-io = { version = "0.3", default-features = false, features = ["std"] }
futures-sink = { version = "0.3", features = ["alloc", "std"] }
futures-task = { version = "0.3", default-features = false, features = ["alloc", "std"] }
futures-util = { version = "0.3", features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "proc-macro-hack", "proc-macro-nested", "sink", "slab", "std"] }
getrandom = { version = "0.2", default-features = false, features = ["js", "js-sys", "std", "wasm-bindgen"] }
futures-util = { version = "0.3", features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "sink", "slab", "std"] }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
hashbrown = { version = "0.11", features = ["ahash", "inline-more", "raw"] }
hyper = { version = "0.14", features = ["client", "full", "h2", "http1", "http2", "runtime", "server", "socket2", "stream", "tcp"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itoa = { version = "0.4", features = ["i128", "std"] }
libc = { version = "0.2", features = ["extra_traits", "std"] }
log = { version = "0.4", default-features = false, features = ["std"] }
memchr = { version = "2", features = ["std", "use_std"] }
memchr = { version = "2", features = ["std"] }
num-bigint = { version = "0.4", features = ["std"] }
num-integer = { version = "0.1", default-features = false, features = ["i128", "std"] }
num-traits = { version = "0.2", features = ["i128", "libm", "std"] }
@ -41,7 +35,7 @@ rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
regex-automata = { version = "0.1", features = ["regex-syntax", "std"] }
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
reqwest = { version = "0.11", features = ["__rustls", "__tls", "blocking", "default-tls", "hyper-rustls", "hyper-tls", "json", "native-tls-crate", "rustls", "rustls-tls", "rustls-tls-webpki-roots", "serde_json", "stream", "tokio-native-tls", "tokio-rustls", "webpki-roots"] }
reqwest = { version = "0.11", features = ["__tls", "default-tls", "hyper-tls", "json", "native-tls-crate", "serde_json", "tokio-native-tls"] }
serde = { version = "1", features = ["derive", "rc", "serde_derive", "std"] }
serde_json = { version = "1", features = ["indexmap", "preserve_order", "std"] }
smallvec = { version = "1", default-features = false, features = ["union"] }
@ -52,21 +46,17 @@ tower = { version = "0.4", features = ["balance", "buffer", "discover", "futures
tracing = { version = "0.1", features = ["attributes", "log", "max_level_trace", "release_max_level_debug", "std", "tracing-attributes"] }
tracing-core = { version = "0.1", features = ["lazy_static", "std"] }
tracing-subscriber = { version = "0.3", features = ["alloc", "ansi", "ansi_term", "env-filter", "fmt", "lazy_static", "matchers", "regex", "registry", "sharded-slab", "smallvec", "std", "thread_local", "tracing", "tracing-log"] }
url = { version = "2", default-features = false, features = ["serde"] }
uuid = { version = "0.8", features = ["getrandom", "serde", "std", "v4"] }
[build-dependencies]
ahash = { version = "0.7", features = ["std"] }
bytes = { version = "1", features = ["std"] }
cc = { version = "1", default-features = false, features = ["jobserver", "parallel"] }
clap = { version = "2", features = ["ansi_term", "atty", "color", "strsim", "suggestions", "vec_map"] }
either = { version = "1", features = ["use_std"] }
getrandom = { version = "0.2", default-features = false, features = ["js", "js-sys", "std", "wasm-bindgen"] }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
hashbrown = { version = "0.11", features = ["ahash", "inline-more", "raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
libc = { version = "0.2", features = ["extra_traits", "std"] }
log = { version = "0.4", default-features = false, features = ["std"] }
memchr = { version = "2", features = ["std", "use_std"] }
memchr = { version = "2", features = ["std"] }
rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] }
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }

View File

@ -21,11 +21,11 @@ pin-project = "1.0"
prost = "0.8"
rdkafka = "0.28.0"
time = { path = "../time" }
tokio = { version = "1.13", features = ["macros", "fs"] }
tokio = { version = "1.13", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] }
tokio-util = "0.6.9"
trace = { path = "../trace" }
trace_http = { path = "../trace_http" }
uuid = { version = "0.8", features = ["serde", "v4"] }
uuid = { version = "0.8", features = ["v4"] }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]