feat: Add write_lp partial write, name check, and precision (#24677)

* feat: Add partial write and name check to write_lp

This commit adds new behavior to the v3 write_lp http endpoint by
implementing both partial writes and checking the db name for validity.
It also sets the partial write behavior as the default now, whereas
before we would reject the entire request if one line was incorrect.
Users who *do* actually want that behavior can now opt in by putting
'accept_partial=false' into the url of the request.

We also check that the db name used in the request contains only
numbers, letters, underscores and hyphens and that it must start with
either a number or letter.

We also introduce a more standardized way to return errors to the user
as JSON that we can expand over time to give actionable error messages
to the user that they can use to fix their requests.

Finally tests have been included to mock out and test the behavior for
all of the above so that changes to the error messages are reflected in
tests, that both partial and not partial writes work as expected, and
that invalid db names are rejected without writing.

* feat: Add precision to write_lp http endpoint

This commit adds the ability to control the precision of the time stamp
passed in to the endpoint. For example if a user chooses 'second' and
the timestamp 20 that will be 20 seconds past the Unix Epoch. If they
choose 'millisecond' instead it will be 20 milliseconds past the Epoch.

Up to this point we assumed that all data passed in was of nanosecond
precision. The data is still stored in the database as nanoseconds.
Instead upon receiving the data we convert it to nanoseconds. If the
precision URL parameter is not specified we default to auto and take a
best effort guess at what the user wanted based on the order of
magnitude of the data passed in.

This change will allow users finer grained control over what precision
they want to use for their data as well as trying our best to make a
good user experience and having things work as expected and not creating
a failure mode whereby a user wanted seconds and instead put in
nanoseconds by default.
pull/24701/head
Michael Gattozzi 2024-02-27 11:57:10 -05:00 committed by GitHub
parent 298055e9fb
commit 8fec1d636e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 822 additions and 197 deletions

294
Cargo.lock generated
View File

@ -90,9 +90,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
[[package]]
name = "anstream"
version = "0.6.11"
version = "0.6.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e2e1ebcb11de5c03c67de28a7df593d32191b44939c482e97702baaaa6ab6a5"
checksum = "96b09b5178381e0874812a9b157f7fe84982617e48f71f4e3235482775e5b540"
dependencies = [
"anstyle",
"anstyle-parse",
@ -138,9 +138,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.79"
version = "1.0.80"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca"
checksum = "5ad32ce52e4161730f7098c077cd2ed6229b5804ccf99e5366be1ab72a98b4e1"
[[package]]
name = "arc-swap"
@ -418,7 +418,7 @@ dependencies = [
"proptest",
"rand",
"regex",
"snafu 0.8.0",
"snafu 0.8.1",
"uuid",
"workspace-hack",
]
@ -461,7 +461,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f28243a43d821d11341ab73c80bed182dc015c514b951616cf79bd4af39af0c3"
dependencies = [
"concurrent-queue",
"event-listener 5.0.0",
"event-listener 5.1.0",
"event-listener-strategy",
"futures-core",
"pin-project-lite",
@ -513,7 +513,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]
@ -524,7 +524,7 @@ checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]
@ -561,7 +561,7 @@ dependencies = [
"observability_deps",
"parking_lot 0.12.1",
"paste",
"snafu 0.8.0",
"snafu 0.8.1",
"test_helpers_end_to_end",
"tokio",
"tonic 0.10.2",
@ -625,7 +625,7 @@ version = "0.1.0"
dependencies = [
"observability_deps",
"rand",
"snafu 0.8.0",
"snafu 0.8.1",
"tokio",
"workspace-hack",
]
@ -737,9 +737,9 @@ dependencies = [
[[package]]
name = "bstr"
version = "1.9.0"
version = "1.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c48f0051a4b4c5e0b6d365cd04af53aeaa209e3cc15ec2cdb69e73cc87fbd0dc"
checksum = "05efc5cfd9110c8416e471df0e96702d58690178e206e61b7173706673c93706"
dependencies = [
"memchr",
"regex-automata 0.4.5",
@ -748,9 +748,9 @@ dependencies = [
[[package]]
name = "bumpalo"
version = "3.14.0"
version = "3.15.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec"
checksum = "8ea184aa71bb362a1157c896979544cc23974e08fd265f29ea96b59f0b4a555b"
[[package]]
name = "bytecount"
@ -867,7 +867,7 @@ dependencies = [
"futures",
"hyper",
"reqwest",
"snafu 0.8.0",
"snafu 0.8.1",
"tokio",
"tokio-util",
"url",
@ -876,11 +876,10 @@ dependencies = [
[[package]]
name = "cc"
version = "1.0.83"
version = "1.0.88"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0"
checksum = "02f341c093d19155a6e41631ce5971aac4e9a868262212153124c15fa22d1cdc"
dependencies = [
"jobserver",
"libc",
]
@ -902,7 +901,7 @@ dependencies = [
"num-traits",
"serde",
"wasm-bindgen",
"windows-targets 0.52.0",
"windows-targets 0.52.3",
]
[[package]]
@ -981,7 +980,7 @@ dependencies = [
"object_store",
"observability_deps",
"parquet_cache",
"snafu 0.8.0",
"snafu 0.8.1",
"sysinfo",
"tempfile",
"test_helpers",
@ -1013,7 +1012,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]
@ -1362,14 +1361,14 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]
name = "darling"
version = "0.20.5"
version = "0.20.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc5d6b04b3fd0ba9926f945895de7d806260a2d7431ba82e7edaecb043c4c6b8"
checksum = "54e36fcd13ed84ffdfda6f5be89b31287cbb80c439841fe69e04841435464391"
dependencies = [
"darling_core",
"darling_macro",
@ -1377,27 +1376,27 @@ dependencies = [
[[package]]
name = "darling_core"
version = "0.20.5"
version = "0.20.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04e48a959bcd5c761246f5d090ebc2fbf7b9cd527a492b07a67510c108f1e7e3"
checksum = "9c2cf1c23a687a1feeb728783b993c4e1ad83d99f351801977dd809b48d0a70f"
dependencies = [
"fnv",
"ident_case",
"proc-macro2",
"quote",
"strsim 0.10.0",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]
name = "darling_macro"
version = "0.20.5"
version = "0.20.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d1545d67a2149e1d93b7e5c7752dce5a7426eb5d1357ddcfd89336b94444f77"
checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f"
dependencies = [
"darling_core",
"quote",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]
@ -1438,7 +1437,7 @@ dependencies = [
"serde_json",
"sha2",
"siphasher 1.0.0",
"snafu 0.8.0",
"snafu 0.8.1",
"sqlx",
"test_helpers",
"thiserror",
@ -1765,9 +1764,9 @@ checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b"
[[package]]
name = "dyn-clone"
version = "1.0.16"
version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "545b22097d44f8a9581187cdf93de7a71e4722bf51200cfaba810865b49a495d"
checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125"
[[package]]
name = "ed25519"
@ -1861,9 +1860,9 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "event-listener"
version = "5.0.0"
version = "5.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b72557800024fabbaa2449dd4bf24e37b93702d457a4d4f2b0dd1f0f039f20c1"
checksum = "b7ad6fd685ce13acd6d9541a30f6db6567a7a24c9ffd4ba2955d29e3f22c8b27"
dependencies = [
"concurrent-queue",
"parking",
@ -1876,7 +1875,7 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "feedafcaa9b749175d5ac357452a9d41ea2911da598fde46ce1fe02c37751291"
dependencies = [
"event-listener 5.0.0",
"event-listener 5.1.0",
"pin-project-lite",
]
@ -1891,7 +1890,7 @@ dependencies = [
"once_cell",
"parking_lot 0.12.1",
"pin-project",
"snafu 0.8.0",
"snafu 0.8.1",
"tokio",
"tokio-util",
"tokio_metrics_bridge",
@ -1980,7 +1979,7 @@ dependencies = [
"observability_deps",
"once_cell",
"prost 0.12.3",
"snafu 0.8.0",
"snafu 0.8.1",
"workspace-hack",
]
@ -2086,7 +2085,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]
@ -2239,9 +2238,9 @@ dependencies = [
[[package]]
name = "half"
version = "2.3.1"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc52e53916c08643f1b56ec082790d1e86a32e58dc5268f897f313fbae7b4872"
checksum = "b5eceaaeec696539ddaf7b333340f1af35a5aa87ae3e4f3ead0532f72affab2e"
dependencies = [
"cfg-if",
"crunchy",
@ -2326,9 +2325,9 @@ dependencies = [
[[package]]
name = "hermit-abi"
version = "0.3.6"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd5256b483761cd23699d0da46cc6fd2ee3be420bbe6d020ae4a091e70b7e9fd"
checksum = "379dada1584ad501b383485dd706b8afb7a70fcbc7f4da7d780638a5a6124a60"
[[package]]
name = "hex"
@ -2567,7 +2566,7 @@ dependencies = [
"log",
"nom",
"smallvec",
"snafu 0.8.0",
"snafu 0.8.1",
"test_helpers",
]
@ -2583,7 +2582,7 @@ dependencies = [
"reqwest",
"serde",
"serde_json",
"snafu 0.8.0",
"snafu 0.8.1",
"test_helpers",
"tokio",
"url",
@ -2691,6 +2690,7 @@ dependencies = [
"parquet",
"parquet_file",
"pin-project-lite",
"pretty_assertions",
"schema",
"serde",
"serde_json",
@ -2709,6 +2709,7 @@ dependencies = [
"trace_exporters",
"trace_http",
"tracker",
"unicode-segmentation",
"urlencoding 1.3.3",
"workspace-hack",
]
@ -2821,7 +2822,7 @@ name = "influxrpc_parser"
version = "0.1.0"
dependencies = [
"generated_types",
"snafu 0.8.0",
"snafu 0.8.1",
"sqlparser",
"workspace-hack",
]
@ -2844,7 +2845,7 @@ dependencies = [
"prost-build",
"query_functions",
"serde",
"snafu 0.8.0",
"snafu 0.8.1",
"tonic 0.10.2",
"tonic-build",
"workspace-hack",
@ -2931,7 +2932,7 @@ dependencies = [
"rand",
"serde",
"siphasher 1.0.0",
"snafu 0.8.0",
"snafu 0.8.1",
"sqlx",
"sqlx-hotswap-pool",
"tempfile",
@ -2966,7 +2967,7 @@ dependencies = [
"schema",
"serde",
"serde_json",
"snafu 0.8.0",
"snafu 0.8.1",
"test_helpers",
"tokio",
"toml",
@ -3004,7 +3005,7 @@ dependencies = [
"query_functions",
"schema",
"serde",
"snafu 0.8.0",
"snafu 0.8.1",
"test_helpers",
"tokio",
"tokio-stream",
@ -3058,7 +3059,7 @@ dependencies = [
"predicate",
"query_functions",
"schema",
"snafu 0.8.0",
"snafu 0.8.1",
"test_helpers",
"tokio",
"workspace-hack",
@ -3137,7 +3138,7 @@ dependencies = [
"serde_json",
"serde_urlencoded",
"service_grpc_testing",
"snafu 0.8.0",
"snafu 0.8.1",
"tokio",
"tokio-stream",
"tokio-util",
@ -3162,7 +3163,7 @@ dependencies = [
"hyper",
"ioxd_common",
"metric",
"snafu 0.8.0",
"snafu 0.8.1",
"tokio-util",
"trace",
"workspace-hack",
@ -3218,15 +3219,6 @@ version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c"
[[package]]
name = "jobserver"
version = "0.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab46a6e9526ddef3ae7f787c06f0f2600639ba80ea3eade3d8e670a2230f51d6"
dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.68"
@ -3373,7 +3365,7 @@ dependencies = [
"proc-macro2",
"quote",
"serde_json",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]
@ -3668,12 +3660,12 @@ dependencies = [
[[package]]
name = "mockito"
version = "1.2.0"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8d3038e23466858569c2d30a537f691fa0d53b51626630ae08262943e3bbb8b"
checksum = "031ec85a3f39370cc7663640077c38766fd32b03e6beb54e6e402d0454443f7f"
dependencies = [
"assert-json-diff",
"futures",
"futures-core",
"hyper",
"log",
"rand",
@ -3748,7 +3740,7 @@ dependencies = [
"proptest",
"rand",
"schema",
"snafu 0.8.0",
"snafu 0.8.1",
"workspace-hack",
]
@ -3764,7 +3756,7 @@ dependencies = [
"itertools 0.12.1",
"mutable_batch",
"schema",
"snafu 0.8.0",
"snafu 0.8.1",
"test_helpers",
"workspace-hack",
]
@ -3782,7 +3774,7 @@ dependencies = [
"mutable_batch_lp",
"partition",
"schema",
"snafu 0.8.0",
"snafu 0.8.1",
"workspace-hack",
]
@ -4044,7 +4036,7 @@ dependencies = [
"metric",
"object_store",
"pin-project",
"snafu 0.8.0",
"snafu 0.8.1",
"tokio",
"workspace-hack",
]
@ -4118,7 +4110,7 @@ dependencies = [
"proc-macro2",
"proc-macro2-diagnostics",
"quote",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]
@ -4294,7 +4286,7 @@ dependencies = [
"prost 0.12.3",
"rand",
"schema",
"snafu 0.8.0",
"snafu 0.8.1",
"test_helpers",
"thiserror",
"thrift",
@ -4317,7 +4309,7 @@ dependencies = [
"object_store",
"parquet_file",
"schema",
"snafu 0.8.0",
"snafu 0.8.1",
"tokio",
"workspace-hack",
]
@ -4463,7 +4455,7 @@ dependencies = [
"pest_meta",
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]
@ -4542,7 +4534,7 @@ checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]
@ -4580,9 +4572,9 @@ dependencies = [
[[package]]
name = "pkg-config"
version = "0.3.29"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2900ede94e305130c13ddd391e0ab7cbaeb783945ae07a279c268cb05109c6cb"
checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec"
[[package]]
name = "platforms"
@ -4635,7 +4627,7 @@ dependencies = [
"observability_deps",
"query_functions",
"schema",
"snafu 0.8.0",
"snafu 0.8.1",
"sqlparser",
"test_helpers",
"workspace-hack",
@ -4685,7 +4677,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a41cf62165e97c7f814d2221421dbb9afcbcdb0a88068e5ea206e19951c2cbb5"
dependencies = [
"proc-macro2",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]
@ -4705,7 +4697,7 @@ checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.51",
"version_check",
"yansi 1.0.0-rc.1",
]
@ -4777,7 +4769,7 @@ dependencies = [
"prost 0.12.3",
"prost-types 0.12.3",
"regex",
"syn 2.0.48",
"syn 2.0.51",
"tempfile",
"which",
]
@ -4805,7 +4797,7 @@ dependencies = [
"itertools 0.11.0",
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]
@ -4871,7 +4863,7 @@ dependencies = [
"regex",
"regex-syntax 0.8.2",
"schema",
"snafu 0.8.0",
"snafu 0.8.1",
"tokio",
"workspace-hack",
]
@ -5201,9 +5193,9 @@ checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4"
[[package]]
name = "ryu"
version = "1.0.16"
version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c"
checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1"
[[package]]
name = "same-file"
@ -5232,7 +5224,7 @@ dependencies = [
"indexmap 2.2.3",
"observability_deps",
"once_cell",
"snafu 0.8.0",
"snafu 0.8.1",
"workspace-hack",
]
@ -5311,9 +5303,9 @@ dependencies = [
[[package]]
name = "semver"
version = "1.0.21"
version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0"
checksum = "92d43fe69e652f3df9bdc2b85b2854a0825b86e4fb76bc44d945137d053639ca"
dependencies = [
"serde",
]
@ -5351,7 +5343,7 @@ checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]
@ -5399,9 +5391,9 @@ dependencies = [
[[package]]
name = "serde_yaml"
version = "0.9.31"
version = "0.9.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adf8a49373e98a4c5f0ceb5d05aa7c648d75f63774981ed95b7c7443bbd50c6e"
checksum = "8fd075d994154d4a774f95b51fb96bdc2832b0ea48425c92546073816cda1f2f"
dependencies = [
"indexmap 2.2.3",
"itoa",
@ -5445,7 +5437,7 @@ dependencies = [
"serde",
"serde_json",
"service_common",
"snafu 0.8.0",
"snafu 0.8.1",
"test_helpers",
"tokio",
"tonic 0.10.2",
@ -5591,11 +5583,11 @@ dependencies = [
[[package]]
name = "snafu"
version = "0.8.0"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d342c51730e54029130d7dc9fd735d28c4cd360f1368c01981d4f03ff207f096"
checksum = "5ed22871b3fe6eff9f1b48f6cbd54149ff8e9acd740dea9146092435f9c43bd3"
dependencies = [
"snafu-derive 0.8.0",
"snafu-derive 0.8.1",
]
[[package]]
@ -5612,14 +5604,14 @@ dependencies = [
[[package]]
name = "snafu-derive"
version = "0.8.0"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "080c44971436b1af15d6f61ddd8b543995cf63ab8e677d46b00cc06f4ef267a0"
checksum = "4651148226ec36010993fcba6c3381552e8463e9f3e337b75af202b0688b5274"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]
@ -5630,12 +5622,12 @@ checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b"
[[package]]
name = "socket2"
version = "0.5.5"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9"
checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871"
dependencies = [
"libc",
"windows-sys 0.48.0",
"windows-sys 0.52.0",
]
[[package]]
@ -5692,7 +5684,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]
@ -5972,7 +5964,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]
@ -6017,9 +6009,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.48"
version = "2.0.51"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f"
checksum = "6ab617d94515e94ae53b8406c628598680aa0c9587474ecbe58188f7b345d66c"
dependencies = [
"proc-macro2",
"quote",
@ -6145,7 +6137,7 @@ dependencies = [
"regex",
"reqwest",
"serde_json",
"snafu 0.8.0",
"snafu 0.8.1",
"sqlx",
"tempfile",
"test_helpers",
@ -6172,14 +6164,14 @@ checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]
name = "thread_local"
version = "1.1.7"
version = "1.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152"
checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c"
dependencies = [
"cfg-if",
"once_cell",
@ -6300,7 +6292,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]
@ -6384,9 +6376,9 @@ dependencies = [
[[package]]
name = "toml_edit"
version = "0.22.5"
version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99e68c159e8f5ba8a28c4eb7b0c0c190d77bb479047ca713270048145a9ad28a"
checksum = "2c1b5fd4128cc8d3e0cb74d4ed9a9cc7c7284becd4df68f5f940e1ad123606f6"
dependencies = [
"indexmap 2.2.3",
"serde",
@ -6464,7 +6456,7 @@ dependencies = [
"proc-macro2",
"prost-build",
"quote",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]
@ -6580,7 +6572,7 @@ dependencies = [
"futures",
"iox_time",
"observability_deps",
"snafu 0.8.0",
"snafu 0.8.1",
"thrift",
"tokio",
"trace",
@ -6601,7 +6593,7 @@ dependencies = [
"observability_deps",
"parking_lot 0.12.1",
"pin-project",
"snafu 0.8.0",
"snafu 0.8.1",
"tower",
"trace",
"workspace-hack",
@ -6627,7 +6619,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]
@ -6909,7 +6901,7 @@ dependencies = [
"observability_deps",
"parking_lot 0.12.1",
"prost 0.12.3",
"snafu 0.8.0",
"snafu 0.8.1",
"snap",
"test_helpers",
"tokio",
@ -6982,7 +6974,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.51",
"wasm-bindgen-shared",
]
@ -7016,7 +7008,7 @@ checksum = "642f325be6301eb8107a83d12a8ac6c1e1c54345a7ef1a9261962dfefda09e66"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.51",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@ -7112,7 +7104,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be"
dependencies = [
"windows-core",
"windows-targets 0.52.0",
"windows-targets 0.52.3",
]
[[package]]
@ -7121,7 +7113,7 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
dependencies = [
"windows-targets 0.52.0",
"windows-targets 0.52.3",
]
[[package]]
@ -7139,7 +7131,7 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [
"windows-targets 0.52.0",
"windows-targets 0.52.3",
]
[[package]]
@ -7159,17 +7151,17 @@ dependencies = [
[[package]]
name = "windows-targets"
version = "0.52.0"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd"
checksum = "d380ba1dc7187569a8a9e91ed34b8ccfc33123bbacb8c0aed2d1ad7f3ef2dc5f"
dependencies = [
"windows_aarch64_gnullvm 0.52.0",
"windows_aarch64_msvc 0.52.0",
"windows_i686_gnu 0.52.0",
"windows_i686_msvc 0.52.0",
"windows_x86_64_gnu 0.52.0",
"windows_x86_64_gnullvm 0.52.0",
"windows_x86_64_msvc 0.52.0",
"windows_aarch64_gnullvm 0.52.3",
"windows_aarch64_msvc 0.52.3",
"windows_i686_gnu 0.52.3",
"windows_i686_msvc 0.52.3",
"windows_x86_64_gnu 0.52.3",
"windows_x86_64_gnullvm 0.52.3",
"windows_x86_64_msvc 0.52.3",
]
[[package]]
@ -7180,9 +7172,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.0"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea"
checksum = "68e5dcfb9413f53afd9c8f86e56a7b4d86d9a2fa26090ea2dc9e40fba56c6ec6"
[[package]]
name = "windows_aarch64_msvc"
@ -7192,9 +7184,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.0"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef"
checksum = "8dab469ebbc45798319e69eebf92308e541ce46760b49b18c6b3fe5e8965b30f"
[[package]]
name = "windows_i686_gnu"
@ -7204,9 +7196,9 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e"
[[package]]
name = "windows_i686_gnu"
version = "0.52.0"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313"
checksum = "2a4e9b6a7cac734a8b4138a4e1044eac3404d8326b6c0f939276560687a033fb"
[[package]]
name = "windows_i686_msvc"
@ -7216,9 +7208,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406"
[[package]]
name = "windows_i686_msvc"
version = "0.52.0"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a"
checksum = "28b0ec9c422ca95ff34a78755cfa6ad4a51371da2a5ace67500cf7ca5f232c58"
[[package]]
name = "windows_x86_64_gnu"
@ -7228,9 +7220,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.0"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd"
checksum = "704131571ba93e89d7cd43482277d6632589b18ecf4468f591fbae0a8b101614"
[[package]]
name = "windows_x86_64_gnullvm"
@ -7240,9 +7232,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.0"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e"
checksum = "42079295511643151e98d61c38c0acc444e52dd42ab456f7ccfd5152e8ecf21c"
[[package]]
name = "windows_x86_64_msvc"
@ -7252,15 +7244,15 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.0"
version = "0.52.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04"
checksum = "0770833d60a970638e989b3fa9fd2bb1aaadcf88963d1659fd7d9990196ed2d6"
[[package]]
name = "winnow"
version = "0.6.0"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b1dbce9e90e5404c5a52ed82b1d13fc8cfbdad85033b6f57546ffd1265f8451"
checksum = "7a4191c47f15cc3ec71fcb4913cb83d58def65dd3787610213c649283b5ce178"
dependencies = [
"memchr",
]
@ -7354,7 +7346,7 @@ dependencies = [
"sqlx-sqlite",
"strum",
"syn 1.0.109",
"syn 2.0.48",
"syn 2.0.51",
"thrift",
"tokio",
"tokio-stream",
@ -7376,9 +7368,9 @@ dependencies = [
[[package]]
name = "xxhash-rust"
version = "0.8.8"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53be06678ed9e83edb1745eb72efc0bbcd7b5c3c35711a860906aed827a13d61"
checksum = "927da81e25be1e1a2901d59b81b37dd2efd1fc9c9345a55007f09bf5a2d3ee03"
[[package]]
name = "xz2"
@ -7427,7 +7419,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
"syn 2.0.51",
]
[[package]]

View File

@ -2,6 +2,7 @@ use arrow::record_batch::RecordBatch;
use arrow_flight::{decode::FlightRecordBatchStream, sql::SqlInfo};
use arrow_util::assert_batches_sorted_eq;
use futures::TryStreamExt;
use influxdb3_client::Precision;
use crate::common::TestServer;
@ -18,6 +19,7 @@ async fn flight() {
"cpu,host=s1,region=us-east usage=0.9 1\n\
cpu,host=s1,region=us-east usage=0.89 2\n\
cpu,host=s1,region=us-east usage=0.85 3",
Precision::Nanosecond,
)
.await;
@ -133,11 +135,17 @@ async fn flight() {
}
}
async fn write_lp_to_db(server: &TestServer, database: &str, lp: &'static str) {
async fn write_lp_to_db(
server: &TestServer,
database: &str,
lp: &'static str,
precision: Precision,
) {
let client = influxdb3_client::Client::new(server.client_addr()).unwrap();
client
.api_v3_write_lp(database)
.body(lp)
.precision(precision)
.send()
.await
.unwrap();

View File

@ -86,7 +86,7 @@ impl Client {
/// let client = Client::new("http://localhost:8181")?;
/// client
/// .api_v3_write_lp("db_name")
/// .precision(Precision::Milli)
/// .precision(Precision::Millisecond)
/// .accept_partial(true)
/// .body("cpu,host=s1 usage=0.5")
/// .send()
@ -162,9 +162,9 @@ impl<'a, B> From<&'a WriteRequestBuilder<'a, B>> for WriteParams<'a> {
#[serde(rename_all = "snake_case")]
pub enum Precision {
Second,
Milli,
Micro,
Nano,
Millisecond,
Microsecond,
Nanosecond,
}
/// Builder type for composing a request to `/api/v3/write_lp`
@ -332,7 +332,7 @@ mod tests {
.mock("POST", "/api/v3/write_lp")
.match_header("Authorization", format!("Bearer {token}").as_str())
.match_query(Matcher::AllOf(vec![
Matcher::UrlEncoded("precision".into(), "milli".into()),
Matcher::UrlEncoded("precision".into(), "millisecond".into()),
Matcher::UrlEncoded("db".into(), db.into()),
Matcher::UrlEncoded("accept_partial".into(), "true".into()),
]))
@ -346,7 +346,7 @@ mod tests {
client
.api_v3_write_lp(db)
.precision(Precision::Milli)
.precision(Precision::Millisecond)
.accept_partial(true)
.body(body)
.send()

View File

@ -30,10 +30,10 @@ trace_http = { path = "../trace_http" }
tracker = { path = "../tracker" }
arrow = { workspace = true, features = ["prettyprint"] }
arrow-csv = "49.0.0"
arrow-flight.workspace = true
arrow-json = "49.0.0"
arrow-schema = "49.0.0"
arrow-csv = "49.0.0"
async-trait = "0.1"
chrono = "0.4"
datafusion = { workspace = true }
@ -43,15 +43,16 @@ hex = "0.4.3"
hyper = "0.14"
parking_lot = "0.11.1"
pin-project-lite = "0.2"
thiserror = "1.0"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] }
tokio-util = { version = "0.7.9" }
tonic = { workspace = true }
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.114"
serde_urlencoded = "0.7.0"
sha2 = "0.10.8"
thiserror = "1.0"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] }
tokio-util = { version = "0.7.9" }
tonic = { workspace = true }
tower = "0.4.13"
unicode-segmentation = "1.11.0"
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]
@ -62,3 +63,4 @@ test_helpers_end_to_end = { path = "../test_helpers_end_to_end" }
http = "0.2.9"
hyper = "0.14"
urlencoding = "1.1"
pretty_assertions = "1.4.0"

View File

@ -14,10 +14,14 @@ use hyper::header::CONTENT_ENCODING;
use hyper::http::HeaderValue;
use hyper::{Body, Method, Request, Response, StatusCode};
use influxdb3_write::persister::TrackedMemoryArrowWriter;
use influxdb3_write::write_buffer::Error as WriteBufferError;
use influxdb3_write::BufferedWriteRequest;
use influxdb3_write::Precision;
use influxdb3_write::WriteBuffer;
use iox_time::{SystemProvider, TimeProvider};
use observability_deps::tracing::{debug, error, info};
use serde::Deserialize;
use serde::Serialize;
use sha2::Digest;
use sha2::Sha256;
use std::convert::Infallible;
@ -26,6 +30,7 @@ use std::num::NonZeroI32;
use std::str::Utf8Error;
use std::sync::Arc;
use thiserror::Error;
use unicode_segmentation::UnicodeSegmentation;
#[derive(Debug, Error)]
pub enum Error {
@ -129,6 +134,17 @@ pub enum Error {
// Influxdb3 Write
#[error("serde json error: {0}")]
Influxdb3Write(#[from] influxdb3_write::Error),
// Invalid Start Character for a Database Name
#[error("db name did not start with a number or letter")]
DbNameInvalidStartChar,
// Invalid Character for a Database Name
#[error("db name must use ASCII letters, numbers, underscores and hyphens only")]
DbNameInvalidChar,
#[error("partial write of line protocol ocurred")]
PartialLpWrite(BufferedWriteRequest),
}
#[derive(Debug, Error)]
@ -142,12 +158,57 @@ pub enum AuthorizationError {
}
impl Error {
fn response(&self) -> Response<Body> {
let body = Body::from(self.to_string());
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(body)
.unwrap()
fn response(self) -> Response<Body> {
#[derive(Debug, Serialize)]
struct ErrorMessage<T: Serialize> {
error: String,
data: Option<T>,
}
match self {
Self::WriteBuffer(WriteBufferError::ParseError(err)) => {
let err = ErrorMessage {
error: "parsing failed for write_lp endpoint".into(),
data: Some(err),
};
let serialized = serde_json::to_string(&err).unwrap();
let body = Body::from(serialized);
Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(body)
.unwrap()
}
Self::DbNameInvalidStartChar | Self::DbNameInvalidChar => {
let err: ErrorMessage<()> = ErrorMessage {
error: self.to_string(),
data: None,
};
let serialized = serde_json::to_string(&err).unwrap();
let body = Body::from(serialized);
Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(body)
.unwrap()
}
Self::PartialLpWrite(data) => {
let err = ErrorMessage {
error: "partial write of line protocol ocurred".into(),
data: Some(data.invalid_lines),
};
let serialized = serde_json::to_string(&err).unwrap();
let body = Body::from(serialized);
Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(body)
.unwrap()
}
_ => {
let body = Body::from(self.to_string());
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(body)
.unwrap()
}
}
}
}
@ -185,6 +246,7 @@ where
async fn write_lp(&self, req: Request<Body>) -> Result<Response<Body>> {
let query = req.uri().query().ok_or(Error::MissingWriteParams)?;
let params: WriteParams = serde_urlencoded::from_str(query)?;
validate_db_name(&params.db)?;
info!("write_lp to {}", params.db);
let body = self.read_body(req).await?;
@ -195,11 +257,22 @@ where
// TODO: use the time provider
let default_time = SystemProvider::new().now().timestamp_nanos();
self.write_buffer
.write_lp(database, body, default_time)
let result = self
.write_buffer
.write_lp(
database,
body,
default_time,
params.accept_partial,
params.precision,
)
.await?;
Ok(Response::new(Body::from("{}")))
if result.invalid_lines.is_empty() {
Ok(Response::new(Body::empty()))
} else {
Err(Error::PartialLpWrite(result))
}
}
async fn query_sql(&self, req: Request<Body>) -> Result<Response<Body>> {
@ -429,6 +502,33 @@ where
}
}
/// A valid name:
/// - Starts with a letter or a number
/// - Is ASCII not UTF-8
/// - Contains only letters, numbers, underscores or hyphens
fn validate_db_name(name: &str) -> Result<()> {
let mut is_first_char = true;
for grapheme in name.graphemes(true) {
if grapheme.as_bytes().len() > 1 {
// In the case of a unicode we need to handle multibyte chars
return Err(Error::DbNameInvalidChar);
}
let char = grapheme.as_bytes()[0] as char;
if !is_first_char {
if !(char.is_ascii_alphanumeric() || char == '_' || char == '-') {
return Err(Error::DbNameInvalidChar);
}
} else {
if !char.is_ascii_alphanumeric() {
return Err(Error::DbNameInvalidStartChar);
}
is_first_char = false;
}
}
Ok(())
}
#[derive(Debug, Deserialize)]
pub(crate) struct QuerySqlParams {
pub(crate) db: String,
@ -436,9 +536,17 @@ pub(crate) struct QuerySqlParams {
pub(crate) format: Option<String>,
}
// This is a hack around the fact that bool default is false not true
const fn true_fn() -> bool {
true
}
#[derive(Debug, Deserialize)]
pub(crate) struct WriteParams {
pub(crate) db: String,
#[serde(default = "true_fn")]
pub(crate) accept_partial: bool,
#[serde(default)]
pub(crate) precision: Precision,
}
pub(crate) async fn route_request<W: WriteBuffer, Q: QueryExecutor>(

View File

@ -225,12 +225,13 @@ pub async fn wait_for_signal() {
mod tests {
use crate::serve;
use datafusion::parquet::data_type::AsBytes;
use hyper::{body, Body, Client, Request, Response};
use hyper::{body, Body, Client, Request, Response, StatusCode};
use influxdb3_write::persister::PersisterImpl;
use influxdb3_write::SegmentId;
use iox_query::exec::{Executor, ExecutorConfig};
use object_store::DynObjectStore;
use parquet_file::storage::{ParquetStorage, StorageId};
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use std::net::{SocketAddr, SocketAddrV4};
use std::num::NonZeroUsize;
@ -300,7 +301,15 @@ mod tests {
tokio::spawn(async move { serve(server, frontend_shutdown).await });
let server = format!("http://{}", addr);
write_lp(&server, "foo", "cpu,host=a val=1i 123", None).await;
write_lp(
&server,
"foo",
"cpu,host=a val=1i 123",
None,
false,
"nanosecond",
)
.await;
// Test that we can query the output with a pretty output
let res = query(&server, "foo", "select * from cpu", "pretty", None).await;
@ -371,15 +380,363 @@ mod tests {
shutdown.cancel();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn write_lp_tests() {
let addr = get_free_port();
let trace_header_parser = trace_http::ctx::TraceHeaderParser::new();
let metrics = Arc::new(metric::Registry::new());
let common_state = crate::CommonServerState::new(
Arc::clone(&metrics),
None,
trace_header_parser,
addr,
None,
)
.unwrap();
let catalog = Arc::new(influxdb3_write::catalog::Catalog::new());
let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::new());
let parquet_store =
ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3"));
let num_threads = NonZeroUsize::new(2).unwrap();
let exec = Arc::new(Executor::new_with_config(ExecutorConfig {
num_threads,
target_query_partitions: NonZeroUsize::new(1).unwrap(),
object_stores: [&parquet_store]
.into_iter()
.map(|store| (store.id(), Arc::clone(store.object_store())))
.collect(),
metric_registry: Arc::clone(&metrics),
mem_pool_size: usize::MAX,
}));
let write_buffer = Arc::new(
influxdb3_write::write_buffer::WriteBufferImpl::new(
Arc::clone(&catalog),
None::<Arc<influxdb3_write::wal::WalImpl>>,
SegmentId::new(0),
)
.unwrap(),
);
let query_executor = crate::query_executor::QueryExecutorImpl::new(
catalog,
Arc::clone(&write_buffer),
Arc::clone(&exec),
Arc::clone(&metrics),
Arc::new(HashMap::new()),
10,
);
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let server = crate::Server::new(
common_state,
persister,
Arc::clone(&write_buffer),
Arc::new(query_executor),
usize::MAX,
);
let frontend_shutdown = CancellationToken::new();
let shutdown = frontend_shutdown.clone();
tokio::spawn(async move { serve(server, frontend_shutdown).await });
// Test that only one error comes back
let server = format!("http://{}", addr);
let resp = write_lp(
&server,
"foo",
"cpu,host=a val= 123\ncpu,host=b val=5 124\ncpu,host=b val= 124",
None,
false,
"nanosecond",
)
.await;
let status = resp.status();
let body =
String::from_utf8(body::to_bytes(resp.into_body()).await.unwrap().to_vec()).unwrap();
assert_eq!(status, StatusCode::BAD_REQUEST);
assert_eq!(
body,
"{\
\"error\":\"parsing failed for write_lp endpoint\",\
\"data\":{\
\"original_line\":\"cpu,host=a val= 123\",\
\"line_number\":1,\
\"error_message\":\"No fields were provided\"\
}\
}"
);
let resp = write_lp(
&server,
"foo",
"cpu,host=b val=2 155\ncpu,host=a val= 123\ncpu,host=b val=5 199",
None,
true,
"nanosecond",
)
.await;
let status = resp.status();
let body =
String::from_utf8(body::to_bytes(resp.into_body()).await.unwrap().to_vec()).unwrap();
assert_eq!(status, StatusCode::BAD_REQUEST);
assert_eq!(
body,
"{\
\"error\":\"partial write of line protocol ocurred\",\
\"data\":[{\
\"original_line\":\"cpu,host=a val= 123\",\
\"line_number\":2,\
\"error_message\":\"No fields were provided\"\
}]\
}"
);
// Check that the first write did not partially write any data. We
// should only see 2 values from the above write.
let res = query(&server, "foo", "select * from cpu", "csv", None).await;
let body = body::to_bytes(res.into_body()).await.unwrap();
let actual = std::str::from_utf8(body.as_bytes()).unwrap();
let expected = "host,time,val\n\
b,1970-01-01T00:00:00.000000155,2.0\n\
b,1970-01-01T00:00:00.000000199,5.0\n";
assert_eq!(actual, expected);
// Check that invalid database names are rejected
let resp = write_lp(
&server,
"this/_is_fine",
"cpu,host=b val=2 155\n",
None,
true,
"nanosecond",
)
.await;
let status = resp.status();
let body =
String::from_utf8(body::to_bytes(resp.into_body()).await.unwrap().to_vec()).unwrap();
assert_eq!(status, StatusCode::BAD_REQUEST);
assert_eq!(
body,
"{\
\"error\":\"db name must use ASCII letters, numbers, underscores and hyphens only\",\
\"data\":null\
}"
);
let resp = write_lp(
&server,
"?this_is_fine",
"cpu,host=b val=2 155\n",
None,
true,
"nanosecond",
)
.await;
let status = resp.status();
let body =
String::from_utf8(body::to_bytes(resp.into_body()).await.unwrap().to_vec()).unwrap();
assert_eq!(status, StatusCode::BAD_REQUEST);
assert_eq!(
body,
"{\
\"error\":\"db name did not start with a number or letter\",\
\"data\":null\
}"
);
shutdown.cancel();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn write_lp_precision_tests() {
let addr = get_free_port();
let trace_header_parser = trace_http::ctx::TraceHeaderParser::new();
let metrics = Arc::new(metric::Registry::new());
let common_state = crate::CommonServerState::new(
Arc::clone(&metrics),
None,
trace_header_parser,
addr,
None,
)
.unwrap();
let catalog = Arc::new(influxdb3_write::catalog::Catalog::new());
let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::new());
let parquet_store =
ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3"));
let num_threads = NonZeroUsize::new(2).unwrap();
let exec = Arc::new(Executor::new_with_config(ExecutorConfig {
num_threads,
target_query_partitions: NonZeroUsize::new(1).unwrap(),
object_stores: [&parquet_store]
.into_iter()
.map(|store| (store.id(), Arc::clone(store.object_store())))
.collect(),
metric_registry: Arc::clone(&metrics),
mem_pool_size: usize::MAX,
}));
let write_buffer = Arc::new(
influxdb3_write::write_buffer::WriteBufferImpl::new(
Arc::clone(&catalog),
None::<Arc<influxdb3_write::wal::WalImpl>>,
SegmentId::new(0),
)
.unwrap(),
);
let query_executor = crate::query_executor::QueryExecutorImpl::new(
catalog,
Arc::clone(&write_buffer),
Arc::clone(&exec),
Arc::clone(&metrics),
Arc::new(HashMap::new()),
10,
);
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let server = crate::Server::new(
common_state,
persister,
Arc::clone(&write_buffer),
Arc::new(query_executor),
usize::MAX,
);
let frontend_shutdown = CancellationToken::new();
let shutdown = frontend_shutdown.clone();
tokio::spawn(async move { serve(server, frontend_shutdown).await });
let server = format!("http://{}", addr);
let resp = write_lp(
&server,
"foo",
"cpu,host=b val=5 1708473600",
None,
false,
"auto",
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
let resp = write_lp(
&server,
"foo",
"cpu,host=b val=5 1708473601000",
None,
false,
"auto",
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
let resp = write_lp(
&server,
"foo",
"cpu,host=b val=5 1708473602000000",
None,
false,
"auto",
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
let resp = write_lp(
&server,
"foo",
"cpu,host=b val=5 1708473603000000000",
None,
false,
"auto",
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
let resp = write_lp(
&server,
"foo",
"cpu,host=b val=6 1708473604",
None,
false,
"second",
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
let resp = write_lp(
&server,
"foo",
"cpu,host=b val=6 1708473605000",
None,
false,
"millisecond",
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
let resp = write_lp(
&server,
"foo",
"cpu,host=b val=6 1708473606000000",
None,
false,
"microsecond",
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
let resp = write_lp(
&server,
"foo",
"cpu,host=b val=6 1708473607000000000",
None,
false,
"nanosecond",
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
let res = query(&server, "foo", "select * from cpu", "csv", None).await;
let body = body::to_bytes(res.into_body()).await.unwrap();
// Since a query can come back with data in any order we need to sort it
// here before we do any assertions
let mut unsorted = String::from_utf8(body.as_bytes().to_vec())
.unwrap()
.lines()
.skip(1)
.map(|s| s.to_string())
.collect::<Vec<String>>();
unsorted.sort();
let actual = unsorted.join("\n");
let expected = "b,2024-02-21T00:00:00,5.0\n\
b,2024-02-21T00:00:01,5.0\n\
b,2024-02-21T00:00:02,5.0\n\
b,2024-02-21T00:00:03,5.0\n\
b,2024-02-21T00:00:04,6.0\n\
b,2024-02-21T00:00:05,6.0\n\
b,2024-02-21T00:00:06,6.0\n\
b,2024-02-21T00:00:07,6.0";
assert_eq!(actual, expected);
shutdown.cancel();
}
pub(crate) async fn write_lp(
server: impl Into<String> + Send,
database: impl Into<String> + Send,
lp: impl Into<String> + Send,
authorization: Option<&str>,
accept_partial: bool,
precision: impl Into<String> + Send,
) -> Response<Body> {
let server = server.into();
let client = Client::new();
let url = format!("{}/api/v3/write_lp?db={}", server, database.into());
let url = format!(
"{}/api/v3/write_lp?db={}&accept_partial={accept_partial}&precision={}",
server,
database.into(),
precision.into(),
);
println!("{}", url);
let mut builder = Request::builder().uri(url).method("POST");

View File

@ -83,6 +83,8 @@ pub trait Bufferer: Debug + Send + Sync + 'static {
database: NamespaceName<'static>,
lp: &str,
default_time: i64,
accept_partial: bool,
precision: Precision,
) -> write_buffer::Result<BufferedWriteRequest>;
/// Closes the open segment and returns it so that it can be persisted or thrown away. A new segment will be opened
@ -342,3 +344,51 @@ pub struct ParquetFile {
pub min_time: i64,
pub max_time: i64,
}
/// The summary data for a persisted parquet file in a segment.
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Precision {
Auto,
Second,
Millisecond,
Microsecond,
Nanosecond,
}
impl Default for Precision {
fn default() -> Self {
Self::Auto
}
}
/// Guess precision based off of a given timestamp.
// Note that this will fail in June 2128, but that's not our problem
pub(crate) fn guess_precision(timestamp: i64) -> Precision {
const NANO_SECS_PER_SEC: i64 = 1_000_000_000;
// Get the absolute value of the timestamp so we can work with negative
// numbers
let val = timestamp.abs() / NANO_SECS_PER_SEC;
if val < 5 {
// If the time sent to us is in seconds then this will be a number less than
// 5 so for example if the time in seconds is 1_708_976_567 then it will be
// 1 (due to integer truncation) and be less than 5
Precision::Second
} else if val < 5_000 {
// If however the value is milliseconds and not seconds than the same number
// for time but now in milliseconds 1_708_976_567_000 when divided will now
// be 1708 which is bigger than the previous if statement but less than this
// one and so we return milliseconds
Precision::Millisecond
} else if val < 5_000_000 {
// If we do the same thing here by going up another order of magnitude then
// 1_708_976_567_000_000 when divided will be 1708976 which is large enough
// for this if statement
Precision::Microsecond
} else {
// Anything else we can assume is large enough of a number that it must
// be nanoseconds
Precision::Nanosecond
}
}

View File

@ -455,6 +455,7 @@ mod tests {
use crate::wal::WalSegmentWriterNoopImpl;
use crate::write_buffer::tests::lp_to_table_batches;
use crate::write_buffer::{parse_validate_and_update_schema, Partitioner};
use crate::Precision;
use crate::{LpWriteOp, PersistedCatalog};
use bytes::Bytes;
use datafusion::execution::SendableRecordBatchStream;
@ -644,7 +645,15 @@ mod tests {
let mut write_batch = WriteBatch::default();
let (seq, db) = catalog.db_or_create(db_name);
let partitioner = Partitioner::new_per_day_partitioner();
let result = parse_validate_and_update_schema(lp, &db, &partitioner, 0).unwrap();
let result = parse_validate_and_update_schema(
lp,
&db,
&partitioner,
0,
false,
Precision::Nanosecond,
)
.unwrap();
if let Some(db) = result.schema {
catalog.replace_database(seq, Arc::new(db)).unwrap();
}

View File

@ -8,8 +8,8 @@ use crate::wal::WalSegmentWriterNoopImpl;
use crate::write_buffer::buffer_segment::{ClosedBufferSegment, OpenBufferSegment, TableBuffer};
use crate::write_buffer::flusher::WriteBufferFlusher;
use crate::{
BufferSegment, BufferedWriteRequest, Bufferer, ChunkContainer, LpWriteOp, SegmentId, Wal,
WalOp, WriteBuffer,
BufferSegment, BufferedWriteRequest, Bufferer, ChunkContainer, LpWriteOp, Precision, SegmentId,
Wal, WalOp, WriteBuffer, WriteLineError,
};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
@ -36,8 +36,8 @@ use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
#[error("error parsing line {line_number}: {message}")]
ParseError { line_number: usize, message: String },
#[error("parsing for line protocol failed")]
ParseError(WriteLineError),
#[error("column type mismatch for column {name}: existing: {existing:?}, new: {new:?}")]
ColumnTypeMismatch {
@ -121,14 +121,22 @@ impl<W: Wal> WriteBufferImpl<W> {
db_name: NamespaceName<'static>,
lp: &str,
default_time: i64,
accept_partial: bool,
precision: Precision,
) -> Result<BufferedWriteRequest> {
debug!("write_lp to {} in writebuffer", db_name);
let result = self.parse_validate_and_update_schema(db_name.clone(), lp, default_time)?;
let result = self.parse_validate_and_update_schema(
db_name.clone(),
lp,
default_time,
accept_partial,
precision,
)?;
let wal_op = WalOp::LpWrite(LpWriteOp {
db_name: db_name.to_string(),
lp: lp.to_string(),
lp: result.lp_valid,
default_time,
});
@ -139,7 +147,7 @@ impl<W: Wal> WriteBufferImpl<W> {
Ok(BufferedWriteRequest {
db_name,
invalid_lines: vec![],
invalid_lines: result.errors,
line_count: result.line_count,
field_count: result.field_count,
tag_count: result.tag_count,
@ -154,6 +162,8 @@ impl<W: Wal> WriteBufferImpl<W> {
db_name: NamespaceName<'static>,
lp: &str,
default_time: i64,
accept_partial: bool,
precision: Precision,
) -> Result<ValidationResult> {
let (sequence, db) = self.catalog.db_or_create(db_name.as_str());
let mut result = parse_validate_and_update_schema(
@ -161,6 +171,8 @@ impl<W: Wal> WriteBufferImpl<W> {
&db,
&Partitioner::new_per_day_partitioner(),
default_time,
accept_partial,
precision,
)?;
if let Some(schema) = result.schema.take() {
@ -253,8 +265,11 @@ impl<W: Wal> Bufferer for WriteBufferImpl<W> {
database: NamespaceName<'static>,
lp: &str,
default_time: i64,
accept_partial: bool,
precision: Precision,
) -> Result<BufferedWriteRequest> {
self.write_lp(database, lp, default_time).await
self.write_lp(database, lp, default_time, accept_partial, precision)
.await
}
async fn close_open_segment(&self) -> crate::Result<Arc<dyn BufferSegment>> {
@ -358,17 +373,50 @@ pub(crate) fn parse_validate_and_update_schema(
schema: &DatabaseSchema,
partitioner: &Partitioner,
default_time: i64,
accept_partial: bool,
precision: Precision,
) -> Result<ValidationResult> {
let mut lines = vec![];
let mut errors = vec![];
let mut valid_lines = vec![];
let mut lp_lines = lp.lines();
for (line_idx, maybe_line) in parse_lines(lp).enumerate() {
let line = maybe_line.map_err(|e| Error::ParseError {
line_number: line_idx + 1,
message: e.to_string(),
})?;
let line = match maybe_line {
Ok(line) => line,
Err(e) => {
if !accept_partial {
return Err(Error::ParseError(WriteLineError {
// This unwrap is fine because we're moving line by line
// alongside the output from parse_lines
original_line: lp_lines.next().unwrap().to_string(),
line_number: line_idx + 1,
error_message: e.to_string(),
}));
} else {
errors.push(WriteLineError {
original_line: lp_lines.next().unwrap().to_string(),
// This unwrap is fine because we're moving line by line
// alongside the output from parse_lines
line_number: line_idx + 1,
error_message: e.to_string(),
});
}
continue;
}
};
// This unwrap is fine because we're moving line by line
// alongside the output from parse_lines
valid_lines.push(lp_lines.next().unwrap());
lines.push(line);
}
validate_or_insert_schema_and_partitions(lines, schema, partitioner, default_time)
validate_or_insert_schema_and_partitions(lines, schema, partitioner, default_time, precision)
.map(move |mut result| {
result.lp_valid = valid_lines.join("\n");
result.errors = errors;
result
})
}
/// Takes parsed lines, validates their schema. If new tables or columns are defined, they
@ -380,6 +428,7 @@ pub(crate) fn validate_or_insert_schema_and_partitions(
schema: &DatabaseSchema,
partitioner: &Partitioner,
default_time: i64,
precision: Precision,
) -> Result<ValidationResult> {
// The (potentially updated) DatabaseSchema to return to the caller.
let mut schema = Cow::Borrowed(schema);
@ -401,6 +450,7 @@ pub(crate) fn validate_or_insert_schema_and_partitions(
&mut schema,
partitioner,
default_time,
precision,
)?;
}
@ -415,6 +465,8 @@ pub(crate) fn validate_or_insert_schema_and_partitions(
line_count,
field_count,
tag_count,
errors: vec![],
lp_valid: String::new(),
})
}
@ -426,6 +478,7 @@ fn validate_and_convert_parsed_line(
schema: &mut Cow<'_, DatabaseSchema>,
partitioner: &Partitioner,
default_time: i64,
precision: Precision,
) -> Result<()> {
let table_name = line.series.measurement.as_str();
@ -512,7 +565,27 @@ fn validate_and_convert_parsed_line(
}
// set the time value
let time_value = line.timestamp.unwrap_or(default_time);
let time_value = line
.timestamp
.map(|ts| {
let multiplier = match precision {
Precision::Auto => match crate::guess_precision(ts) {
Precision::Second => 1_000_000_000,
Precision::Millisecond => 1_000_000,
Precision::Microsecond => 1_000,
Precision::Nanosecond => 1,
Precision::Auto => unreachable!(),
},
Precision::Second => 1_000_000_000,
Precision::Millisecond => 1_000_000,
Precision::Microsecond => 1_000,
Precision::Nanosecond => 1,
};
ts * multiplier
})
.unwrap_or(default_time);
values.push(Field {
name: TIME_COLUMN_NAME.to_string(),
value: FieldData::Timestamp(time_value),
@ -585,6 +658,10 @@ pub(crate) struct ValidationResult {
pub(crate) field_count: usize,
/// Number of tags passed in
pub(crate) tag_count: usize,
/// Any errors that ocurred while parsing the lines
pub(crate) errors: Vec<crate::WriteLineError>,
/// Only valid lines from what was passed in to validate
pub(crate) lp_valid: String,
}
/// Generates the partition key for a given line or row
@ -628,7 +705,15 @@ mod tests {
let db = Arc::new(DatabaseSchema::new("foo"));
let partitioner = Partitioner::new_per_day_partitioner();
let lp = "cpu,region=west user=23.2 100\nfoo f1=1i";
let result = parse_validate_and_update_schema(lp, &db, &partitioner, 0).unwrap();
let result = parse_validate_and_update_schema(
lp,
&db,
&partitioner,
0,
false,
Precision::Nanosecond,
)
.unwrap();
println!("result: {:#?}", result);
let db = result.schema.unwrap();
@ -647,7 +732,13 @@ mod tests {
WriteBufferImpl::new(catalog, Some(Arc::new(wal)), SegmentId::new(0)).unwrap();
let summary = write_buffer
.write_lp(NamespaceName::new("foo").unwrap(), "cpu bar=1 10", 123)
.write_lp(
NamespaceName::new("foo").unwrap(),
"cpu bar=1 10",
123,
false,
Precision::Nanosecond,
)
.await
.unwrap();
assert_eq!(summary.line_count, 1);
@ -686,7 +777,15 @@ mod tests {
pub(crate) fn lp_to_table_batches(lp: &str, default_time: i64) -> HashMap<String, TableBatch> {
let db = Arc::new(DatabaseSchema::new("foo"));
let partitioner = Partitioner::new_per_day_partitioner();
let result = parse_validate_and_update_schema(lp, &db, &partitioner, default_time).unwrap();
let result = parse_validate_and_update_schema(
lp,
&db,
&partitioner,
default_time,
false,
Precision::Nanosecond,
)
.unwrap();
result.table_batches
}

View File

@ -83,7 +83,7 @@ sqlx-postgres = { version = "0.7", default-features = false, features = ["any",
sqlx-sqlite = { version = "0.7", default-features = false, features = ["any", "json", "migrate", "offline", "uuid"] }
strum = { version = "0.25", features = ["derive"] }
thrift = { version = "0.17" }
tokio = { version = "1", features = ["full", "tracing"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time", "tracing"] }
tokio-stream = { version = "0.1", features = ["fs", "net"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "time"] }
tower = { version = "0.4", features = ["balance", "buffer", "filter", "limit", "timeout", "util"] }
@ -151,7 +151,7 @@ sqlx-postgres = { version = "0.7", default-features = false, features = ["any",
sqlx-sqlite = { version = "0.7", default-features = false, features = ["any", "json", "migrate", "offline", "uuid"] }
syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] }
syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "full", "visit-mut"] }
tokio = { version = "1", features = ["full", "tracing"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time", "tracing"] }
tokio-stream = { version = "0.1", features = ["fs", "net"] }
tracing = { version = "0.1", features = ["log", "max_level_trace", "release_max_level_trace"] }
tracing-core = { version = "0.1" }
@ -196,12 +196,12 @@ spin = { version = "0.9" }
hyper-rustls = { version = "0.24" }
spin = { version = "0.9" }
winapi = { version = "0.3", default-features = false, features = ["cfg", "consoleapi", "errhandlingapi", "evntrace", "fileapi", "handleapi", "in6addr", "inaddr", "minwinbase", "minwindef", "ntsecapi", "ntstatus", "processenv", "profileapi", "std", "sysinfoapi", "winbase", "wincon", "windef", "winerror", "winioctl", "winnt"] }
windows-sys-b21d60becc0929df = { package = "windows-sys", version = "0.52", features = ["Win32_Foundation", "Win32_NetworkManagement_IpHelper", "Win32_Networking_WinSock", "Win32_Security_Authentication_Identity", "Win32_Security_Credentials", "Win32_Security_Cryptography", "Win32_Storage_FileSystem", "Win32_System_Com", "Win32_System_Console", "Win32_System_Diagnostics_Debug", "Win32_System_Memory", "Win32_System_Threading", "Win32_UI_Input_KeyboardAndMouse", "Win32_UI_Shell"] }
windows-sys-b21d60becc0929df = { package = "windows-sys", version = "0.52", features = ["Win32_Foundation", "Win32_NetworkManagement_IpHelper", "Win32_Networking_WinSock", "Win32_Security_Authentication_Identity", "Win32_Security_Credentials", "Win32_Security_Cryptography", "Win32_Storage_FileSystem", "Win32_System_Com", "Win32_System_Console", "Win32_System_Diagnostics_Debug", "Win32_System_IO", "Win32_System_Memory", "Win32_System_Threading", "Win32_System_WindowsProgramming", "Win32_UI_Input_KeyboardAndMouse", "Win32_UI_Shell"] }
windows-sys-c8eced492e86ede7 = { package = "windows-sys", version = "0.48", features = ["Win32_Foundation", "Win32_Networking_WinSock", "Win32_Security", "Win32_Storage_FileSystem", "Win32_System_Console", "Win32_System_Diagnostics_Debug", "Win32_System_IO", "Win32_System_Pipes", "Win32_System_Registry", "Win32_System_SystemServices", "Win32_System_Threading", "Win32_System_Time", "Win32_System_WindowsProgramming", "Win32_UI_Shell"] }
[target.x86_64-pc-windows-msvc.build-dependencies]
spin = { version = "0.9" }
windows-sys-b21d60becc0929df = { package = "windows-sys", version = "0.52", features = ["Win32_Foundation", "Win32_NetworkManagement_IpHelper", "Win32_Networking_WinSock", "Win32_Security_Authentication_Identity", "Win32_Security_Credentials", "Win32_Security_Cryptography", "Win32_Storage_FileSystem", "Win32_System_Com", "Win32_System_Console", "Win32_System_Diagnostics_Debug", "Win32_System_Memory", "Win32_System_Threading", "Win32_UI_Input_KeyboardAndMouse", "Win32_UI_Shell"] }
windows-sys-b21d60becc0929df = { package = "windows-sys", version = "0.52", features = ["Win32_Foundation", "Win32_NetworkManagement_IpHelper", "Win32_Networking_WinSock", "Win32_Security_Authentication_Identity", "Win32_Security_Credentials", "Win32_Security_Cryptography", "Win32_Storage_FileSystem", "Win32_System_Com", "Win32_System_Console", "Win32_System_Diagnostics_Debug", "Win32_System_IO", "Win32_System_Memory", "Win32_System_Threading", "Win32_System_WindowsProgramming", "Win32_UI_Input_KeyboardAndMouse", "Win32_UI_Shell"] }
windows-sys-c8eced492e86ede7 = { package = "windows-sys", version = "0.48", features = ["Win32_Foundation", "Win32_Networking_WinSock", "Win32_Security", "Win32_Storage_FileSystem", "Win32_System_Console", "Win32_System_Diagnostics_Debug", "Win32_System_IO", "Win32_System_Pipes", "Win32_System_Registry", "Win32_System_SystemServices", "Win32_System_Threading", "Win32_System_Time", "Win32_System_WindowsProgramming", "Win32_UI_Shell"] }
### END HAKARI SECTION