feat: v3 write API with series key (#25066)
Introduce the experimental series key feature to the monolith, along with the new `/api/v3/write` API, which accepts the new line protocol for writing to tables that contain a series key.

Series key

* The series key is supported in the `schema::Schema` type by the addition of a metadata entry that stores the series key members in their correct order. Writes received for `v3` tables must carry the same series key on every write.

Series key columns are `NOT NULL`

* Nullability of columns is enforced in the core `schema` crate based on a column's membership in the series key. When building a `schema::Schema` using `schema::SchemaBuilder`, the arrow `Field`s injected into the schema have `nullable` set to `false` for columns that are part of the series key, as well as for the `time` column.
* The `NOT NULL` _constraint_, if you can call it that, is enforced in the buffer (see [here](https://github.com/influxdata/influxdb/pull/25066/files#diff-d70ef3dece149f3742ff6e164af17f6601c5a7818e31b0e3b27c3f83dcd7f199R102-R119)) by ensuring there are no gaps in the data buffered for series key columns.

Series key columns are still tags

* Columns in the series key are annotated as tags in the arrow schema, which for now means they are stored as Dictionaries. This was done to avoid having to support a new column type for series key columns.

New write API

* This PR introduces the new write API, `/api/v3/write`, which accepts the new `v3` line protocol. Currently, the only part of the new line protocol proposed in https://github.com/influxdata/influxdb/issues/24979 that is supported is the series key; new data types are not yet supported for fields. A hedged client-side sketch of the new endpoint follows this description.

Split write paths

* To support the existing write path alongside the new one, a new module (`write_buffer/validator.rs`) was set up in the `influxdb3_write` crate to perform validation. It reuses the existing write validation logic, replicating it with the changes needed for the new API. The validation code was also refactored from a series of nested function calls into a state machine, to help distinguish the fallible validation/update steps from the infallible conversion steps.
* The code in that module could potentially be refactored further to reduce code duplication.
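As a rough illustration of the new endpoint (not part of this commit), the sketch below writes the same `v3` line protocol that the end-to-end test further down uses. The host, port, and database name are placeholders; only the `/api/v3/write` path, the `db` query parameter, and the `member/value` series key syntax come from this change.

```rust
// Hypothetical client-side write; assumes an influxdb3 server is reachable at
// the placeholder address below and that the "foo" database and "cpu" table
// are created on first write, as in the end-to-end test in this PR.
use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = Client::new();

    // v3 line protocol: measurement, then the series key as `member/value`
    // pairs in a fixed order (here: region, then host), then fields and a
    // timestamp.
    let body = "\
cpu region/us-east/host/a1 usage=42.0,temp=10 1234\n\
cpu region/us-west/host/b2 usage=92.2,temp=14 1234\n";

    let resp = client
        .post("http://localhost:8181/api/v3/write") // placeholder host/port
        .query(&[("db", "foo")])
        .body(body)
        .send()
        .await?;

    // Writes that omit the series key, reorder it, or add extra members are
    // rejected; every line must carry [region, host] in that order.
    assert!(resp.status().is_success());
    Ok(())
}
```

Subsequent writes to `cpu` must use the same series key members in the same order, or the server rejects the request (see the failure cases exercised in the new test below).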
parent 5c146317aa
commit 5cb7874b2c
Cargo.toml
@@ -49,21 +49,21 @@ chrono = "0.4"
clap = { version = "4", features = ["derive", "env", "string"] }
crc32fast = "1.2.0"
crossbeam-channel = "0.5.11"
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "5fac581efbaffd0e6a9edf931182517524526afd" }
datafusion-proto = { git = "https://github.com/apache/datafusion.git", rev = "5fac581efbaffd0e6a9edf931182517524526afd" }
datafusion = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "e495f1f6bfa8e65f4058829912283ee98ab46318" }
datafusion-proto = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "e495f1f6bfa8e65f4058829912283ee98ab46318" }
csv = "1.3.0"
dotenvy = "0.15.7"
flate2 = "1.0.27"
futures = "0.3.28"
futures-util = "0.3.30"
hashbrown = "0.14.3"
hashbrown = "0.14.5"
hex = "0.4.3"
http = "0.2.9"
humantime = "2.1.0"
hyper = "0.14"
insta = { version = "1.39", features = ["json"] }
libc = { version = "0.2" }
mockito = { version = "1.2.0", default-features = false }
mockito = { version = "1.4.0", default-features = false }
num_cpus = "1.16.0"
object_store = "0.9.1"
once_cell = { version = "1.18", features = ["parking_lot"] }

@@ -74,9 +74,9 @@ pbjson-build = "0.6.2"
pbjson-types = "0.6.0"
pin-project-lite = "0.2"
pretty_assertions = "1.4.0"
prost = "0.12.3"
prost-build = "0.12.2"
prost-types = "0.12.3"
prost = "0.12.6"
prost-build = "0.12.6"
prost-types = "0.12.6"
rand = "0.8.5"
reqwest = { version = "0.11.24", default-features = false, features = ["rustls-tls", "stream"] }
secrecy = "0.8.0"

@@ -86,7 +86,7 @@ serde_urlencoded = "0.7.0"
serde_with = "3.8.1"
sha2 = "0.10.8"
snap = "1.0.0"
sqlparser = "0.41.0"
sqlparser = "0.47.0"
sysinfo = "0.30.8"
thiserror = "1.0"
tokio = { version = "1.35", features = ["full"] }

@@ -102,38 +102,41 @@ urlencoding = "1.1"
uuid = { version = "1", features = ["v4"] }

# Core.git crates we depend on
arrow_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94"}
authz = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94", features = ["http"] }
clap_blocks = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
data_types = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
datafusion_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
influxdb-line-protocol = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
influxdb_influxql_parser = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
influxdb_iox_client = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
iox_catalog = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
ioxd_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
iox_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
iox_query = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
iox_query_params = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
iox_query_influxql = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
iox_system_tables = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
iox_time = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
metric = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
metric_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
observability_deps = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
panic_logging = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
parquet_file = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
schema = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
service_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
service_grpc_flight = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
test_helpers = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
test_helpers_end_to_end = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
tokio_metrics_bridge = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
trace = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
trace_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
trace_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
tracker = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94" }
trogging = { git = "https://github.com/influxdata/influxdb3_core", rev = "0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94", default-features = true, features = ["clap"] }
# Currently influxdb is pointed at a revision from the experimental branch
# in influxdb3_core, hiltontj/17-june-2024-iox-sync-exp, instead of main.
# See https://github.com/influxdata/influxdb3_core/pull/23
arrow_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c"}
authz = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c", features = ["http"] }
clap_blocks = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
data_types = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
datafusion_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
influxdb-line-protocol = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
influxdb_influxql_parser = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
influxdb_iox_client = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
iox_catalog = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
ioxd_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
iox_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
iox_query = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
iox_query_params = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
iox_query_influxql = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
iox_system_tables = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
iox_time = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
metric = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
metric_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
observability_deps = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
panic_logging = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
parquet_file = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
schema = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
service_common = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
service_grpc_flight = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
test_helpers = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
test_helpers_end_to_end = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
tokio_metrics_bridge = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
trace = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
trace_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
trace_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
tracker = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c" }
trogging = { git = "https://github.com/influxdata/influxdb3_core", rev = "ca040f2a6e5b6470e469825a8d35060d0199297c", default-features = true, features = ["clap"] }

[workspace.lints.rust]
rust_2018_idioms = "deny"

@@ -160,7 +160,9 @@ async fn flight_influxql() {

    let mut client = server.flight_client().await;

    // Ad-hoc query, using qualified measurement name:
    // Ad-hoc query, using qualified measurement name
    // This is no longer supported in 3.0, see
    // https://github.com/influxdata/influxdb_iox/pull/11254
    {
        let ticket = Ticket::new(
            r#"{

@@ -169,21 +171,9 @@
                "query_type": "influxql"
            }"#,
        );
        let response = client.do_get(ticket).await.unwrap();
        let response = client.do_get(ticket).await.unwrap_err().to_string();

        let batches = collect_stream(response).await;
        assert_batches_sorted_eq!(
            [
                "+------------------+--------------------------------+------+---------+-------+",
                "| iox::measurement | time                           | host | region  | usage |",
                "+------------------+--------------------------------+------+---------+-------+",
                "| cpu              | 1970-01-01T00:00:00.000000001Z | s1   | us-east | 0.9   |",
                "| cpu              | 1970-01-01T00:00:00.000000002Z | s1   | us-east | 0.89  |",
                "| cpu              | 1970-01-01T00:00:00.000000003Z | s1   | us-east | 0.85  |",
                "+------------------+--------------------------------+------+---------+-------+",
            ],
            &batches
        );
        assert_contains!(response, "database prefix in qualified measurement syntax");
    }

    // InfluxQL-specific query to show measurements:

@@ -179,6 +179,18 @@ impl TestServer {
            .await
    }

    pub async fn api_v3_query_sql(&self, params: &[(&str, &str)]) -> Response {
        self.http_client
            .get(format!(
                "{base}/api/v3/query_sql",
                base = self.client_addr()
            ))
            .query(params)
            .send()
            .await
            .expect("send /api/v3/query_sql request to server")
    }

    pub async fn api_v3_query_influxql(&self, params: &[(&str, &str)]) -> Response {
        self.http_client
            .get(format!(

@@ -1,9 +1,118 @@
use hyper::StatusCode;
use influxdb3_client::Precision;
use pretty_assertions::assert_eq;
use test_helpers::assert_contains;

use crate::TestServer;

#[tokio::test]
async fn api_v3_write() {
    let server = TestServer::spawn().await;
    let client = reqwest::Client::new();

    let url = format!("{base}/api/v3/write", base = server.client_addr());
    let params = &[("db", "foo")];

    // Make a successful write:
    assert!(client
        .post(&url)
        .query(params)
        .body(
            "\
cpu region/us-east/host/a1 usage=42.0,temp=10 1234\n\
cpu region/us-east/host/b1 usage=10.5,temp=18 1234\n\
cpu region/us-west/host/a2 usage=88.0,temp=15 1234\n\
cpu region/us-west/host/b2 usage=92.2,temp=14 1234\n\
            ",
        )
        .send()
        .await
        .expect("send write request")
        .status()
        .is_success());

    // Query from the table written to:
    let resp = server
        .api_v3_query_sql(&[
            ("db", "foo"),
            ("q", "SELECT * FROM cpu"),
            ("format", "pretty"),
        ])
        .await
        .text()
        .await
        .expect("get body");

    assert_eq!(
        "\
+------+---------+------+---------------------+-------+\n\
| host | region  | temp | time                | usage |\n\
+------+---------+------+---------------------+-------+\n\
| a1   | us-east | 10.0 | 1970-01-01T00:20:34 | 42.0  |\n\
| a2   | us-west | 15.0 | 1970-01-01T00:20:34 | 88.0  |\n\
| b1   | us-east | 18.0 | 1970-01-01T00:20:34 | 10.5  |\n\
| b2   | us-west | 14.0 | 1970-01-01T00:20:34 | 92.2  |\n\
+------+---------+------+---------------------+-------+",
        resp
    );

    // Test several failure modes:
    struct TestCase {
        body: &'static str,
        response_contains: &'static str,
    }

    let test_cases = [
        // No series key:
        TestCase {
            body: "cpu usage=10.0,temp=5 1235",
            response_contains:
                "write to table cpu was missing a series key, the series key contains [region, host]",
        },
        // Series key out-of-order:
        TestCase {
            body: "cpu host/c1/region/ca-cent usage=22.0,temp=6 1236",
            response_contains: "write to table cpu had the incorrect series key, \
                expected: [region, host], received: [host, region]",
        },
        // Series key with invalid member at end:
        TestCase {
            body: "cpu region/ca-cent/host/c1/container/foo usage=22.0,temp=6 1236",
            response_contains: "write to table cpu had the incorrect series key, \
                expected: [region, host], received: [region, host, container]",
        },
        // Series key with invalid member in middle:
        TestCase {
            body: "cpu region/ca-cent/sub-region/toronto/host/c1 usage=22.0,temp=6 1236",
            response_contains: "write to table cpu had the incorrect series key, \
                expected: [region, host], received: [region, sub-region, host]",
        },
        // Series key with invalid member at start:
        TestCase {
            body: "cpu planet/earth/region/ca-cent/host/c1 usage=22.0,temp=6 1236",
            response_contains: "write to table cpu had the incorrect series key, \
                expected: [region, host], received: [planet, region, host]",
        },
    ];

    for t in test_cases {
        let resp = client
            .post(&url)
            .query(params)
            .body(t.body)
            .send()
            .await
            .expect("get response from server")
            .text()
            .await
            .expect("parse response");

        println!("RESPONSE:\n{resp}");

        assert_contains!(resp, t.response_contains);
    }
}

#[tokio::test]
async fn api_v1_write_request_parsing() {
    let server = TestServer::spawn().await;

@@ -328,7 +328,13 @@ where
    async fn write_lp(&self, req: Request<Body>) -> Result<Response<Body>> {
        let query = req.uri().query().ok_or(Error::MissingWriteParams)?;
        let params: WriteParams = serde_urlencoded::from_str(query)?;
        self.write_lp_inner(params, req, false).await
        self.write_lp_inner(params, req, false, false).await
    }

    async fn write_v3(&self, req: Request<Body>) -> Result<Response<Body>> {
        let query = req.uri().query().ok_or(Error::MissingWriteParams)?;
        let params: WriteParams = serde_urlencoded::from_str(query)?;
        self.write_lp_inner(params, req, false, true).await
    }

    async fn write_lp_inner(

@@ -336,6 +342,7 @@ where
        params: WriteParams,
        req: Request<Body>,
        accept_rp: bool,
        use_v3: bool,
    ) -> Result<Response<Body>> {
        validate_db_name(&params.db, accept_rp)?;
        info!("write_lp to {}", params.db);

@@ -347,16 +354,27 @@

        let default_time = self.time_provider.now();

        let result = self
            .write_buffer
            .write_lp(
                database,
                body,
                default_time,
                params.accept_partial,
                params.precision,
            )
            .await?;
        let result = if use_v3 {
            self.write_buffer
                .write_lp_v3(
                    database,
                    body,
                    default_time,
                    params.accept_partial,
                    params.precision,
                )
                .await?
        } else {
            self.write_buffer
                .write_lp(
                    database,
                    body,
                    default_time,
                    params.accept_partial,
                    params.precision,
                )
                .await?
        };

        if result.invalid_lines.is_empty() {
            Ok(Response::new(Body::empty()))

@@ -928,7 +946,7 @@ where
            Err(e) => return Ok(legacy_write_error_to_response(e)),
        };

        http_server.write_lp_inner(params, req, true).await
        http_server.write_lp_inner(params, req, true, false).await
    }
    (Method::POST, "/api/v2/write") => {
        let params = match http_server.legacy_write_param_unifier.parse_v2(&req).await {

@@ -936,8 +954,9 @@ where
            Err(e) => return Ok(legacy_write_error_to_response(e)),
        };

        http_server.write_lp_inner(params, req, false).await
        http_server.write_lp_inner(params, req, false, false).await
    }
    (Method::POST, "/api/v3/write") => http_server.write_v3(req).await,
    (Method::POST, "/api/v3/write_lp") => http_server.write_lp(req).await,
    (Method::GET | Method::POST, "/api/v3/query_sql") => http_server.query_sql(req).await,
    (Method::GET | Method::POST, "/api/v3/query_influxql") => {

@@ -251,6 +251,7 @@ impl TableDefinition {
    pub(crate) fn new<N: Into<String>, CN: AsRef<str>>(
        name: N,
        columns: impl AsRef<[(CN, InfluxColumnType)]>,
        series_key: Option<impl IntoIterator<Item: AsRef<str>>>,
    ) -> Self {
        // Use a BTree to ensure that the columns are ordered:
        let mut ordered_columns = BTreeMap::new();

@@ -259,9 +260,10 @@ impl TableDefinition {
        }
        let mut schema_builder = SchemaBuilder::with_capacity(columns.as_ref().len());
        let name = name.into();
        // TODO: may need to capture some schema-level metadata, currently, this causes trouble in
        // tests, so I am omitting this for now:
        // schema_builder.measurement(&name);
        schema_builder.measurement(&name);
        if let Some(sk) = series_key {
            schema_builder.with_series_key(sk);
        }
        for (name, column_type) in ordered_columns {
            schema_builder.influx_column(name, *column_type);
        }

@@ -310,7 +312,6 @@ impl TableDefinition {
            .collect()
    }

    #[allow(dead_code)]
    pub(crate) fn schema(&self) -> &Schema {
        &self.schema
    }

@@ -318,6 +319,14 @@
    pub(crate) fn num_columns(&self) -> usize {
        self.schema.len()
    }

    pub(crate) fn field_type_by_name(&self, name: &str) -> Option<InfluxColumnType> {
        self.schema.field_type_by_name(name)
    }

    pub(crate) fn is_v3(&self) -> bool {
        self.schema.series_key().is_some()
    }
}

pub fn influx_column_type_from_field_value(fv: &FieldValue<'_>) -> InfluxColumnType {

@@ -333,10 +342,13 @@ pub fn influx_column_type_from_field_value(fv: &FieldValue<'_>) -> InfluxColumnT
#[cfg(test)]
mod tests {
    use insta::assert_json_snapshot;
    use pretty_assertions::assert_eq;
    use test_helpers::assert_contains;

    use super::*;

    type SeriesKey = Option<Vec<String>>;

    #[test]
    fn catalog_serialization() {
        let catalog = Catalog::new();

@@ -361,6 +373,7 @@ mod tests {
                    ("u64_field", Field(UInteger)),
                    ("f64_field", Field(Float)),
                ],
                SeriesKey::None,
            ),
        );
        database.tables.insert(

@@ -378,6 +391,7 @@ mod tests {
                    ("u64_field", Field(UInteger)),
                    ("f64_field", Field(Float)),
                ],
                SeriesKey::None,
            ),
        );
        let database = Arc::new(database);

@@ -482,6 +496,7 @@ mod tests {
                    "test".to_string(),
                    InfluxColumnType::Field(InfluxFieldType::String),
                )],
                SeriesKey::None,
            ),
        );

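To make the nullability rule described in the commit message concrete, here is a hedged sketch of how a series key flows through `schema::SchemaBuilder`, mirroring the `TableDefinition::new` change above. The `build()` call and the exact column ordering requirements are assumptions about the core `schema` crate; only `measurement`, `with_series_key`, `influx_column`, and `series_key()` appear in this diff.

```rust
// Sketch only: mirrors TableDefinition::new above. Assumes SchemaBuilder::build()
// returns a Result<Schema, _>, which is not shown in this diff.
use schema::{InfluxColumnType, InfluxFieldType, SchemaBuilder};

fn v3_table_schema() -> schema::Schema {
    let mut builder = SchemaBuilder::with_capacity(4);
    builder.measurement("cpu");
    // Declaring the series key records its members, in order, as schema
    // metadata; those columns (and `time`) end up with `nullable = false`.
    builder.with_series_key(["region", "host"]);
    builder.influx_column("region", InfluxColumnType::Tag);
    builder.influx_column("host", InfluxColumnType::Tag);
    builder.influx_column("usage", InfluxColumnType::Field(InfluxFieldType::Float));
    builder.influx_column("time", InfluxColumnType::Timestamp);
    let schema = builder.build().expect("valid schema");

    // Presence of the series key metadata is exactly what TableDefinition::is_v3 checks.
    assert!(schema.series_key().is_some());
    schema
}
```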
@@ -201,9 +201,7 @@ impl<'a> From<TableSnapshot<'a>> for TableDefinition {
    fn from(snap: TableSnapshot<'a>) -> Self {
        let name = snap.name.to_owned();
        let mut b = SchemaBuilder::new();
        // TODO: may need to capture some schema-level metadata, currently, this causes trouble in
        // tests, so I am omitting this for now:
        // b.measurement(&name);
        b.measurement(&name);
        for (name, col) in snap.cols {
            match col.influx_type {
                InfluxType::Tag => {

@@ -84,6 +84,16 @@ pub trait Bufferer: Debug + Send + Sync + 'static {
        precision: Precision,
    ) -> write_buffer::Result<BufferedWriteRequest>;

    /// Write v3 line protocol
    async fn write_lp_v3(
        &self,
        database: NamespaceName<'static>,
        lp: &str,
        ingest_time: Time,
        accept_partial: bool,
        precision: Precision,
    ) -> write_buffer::Result<BufferedWriteRequest>;

    /// Returns the configured WAL, if there is one.
    fn wal(&self) -> Option<Arc<impl Wal>>;

@@ -453,7 +463,7 @@ pub struct BufferedWriteRequest {
    pub invalid_lines: Vec<WriteLineError>,
    pub line_count: usize,
    pub field_count: usize,
    pub tag_count: usize,
    pub index_count: usize,
}

/// A persisted Catalog that contains the database, table, and column schemas.

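For orientation, here is a rough sketch of driving both write paths through the `Bufferer` trait above. It assumes it runs inside `influxdb3_write` with `Bufferer`, `Precision`, `NamespaceName`, and `Time` in scope; the `buffer` value and the line-protocol literals are illustrative only, not taken from this diff.

```rust
// Sketch: both methods return the same BufferedWriteRequest, so callers such as
// the HTTP layer can treat v1 and v3 writes uniformly.
async fn demo_split_write_paths<B: Bufferer>(buffer: &B) {
    let db = NamespaceName::new("foo").unwrap();
    let now = Time::from_timestamp_nanos(0);

    // v1 line protocol: tags use `tag=value` syntax after the measurement.
    buffer
        .write_lp(db.clone(), "mem,host=a usage=1.0 10", now, false, Precision::Nanosecond)
        .await
        .expect("v1 write");

    // v3 line protocol: the series key is written as `member/value` pairs, and
    // every write to the table must use the same members in the same order.
    buffer
        .write_lp_v3(db, "cpu region/us-east/host/a1 usage=1.0 10", now, false, Precision::Nanosecond)
        .await
        .expect("v3 write");
}
```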
@ -583,35 +593,30 @@ pub(crate) fn guess_precision(timestamp: i64) -> Precision {
|
|||
mod test_helpers {
|
||||
use crate::catalog::Catalog;
|
||||
use crate::write_buffer::buffer_segment::WriteBatch;
|
||||
use crate::write_buffer::{parse_validate_and_update_schema, TableBatch};
|
||||
use crate::{Precision, SegmentDuration, SequenceNumber};
|
||||
use crate::write_buffer::validator::WriteValidator;
|
||||
use crate::write_buffer::TableBatch;
|
||||
use crate::{Precision, SegmentDuration};
|
||||
use data_types::NamespaceName;
|
||||
use iox_time::Time;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub(crate) fn lp_to_write_batch(
|
||||
catalog: &Catalog,
|
||||
catalog: Arc<Catalog>,
|
||||
db_name: &'static str,
|
||||
lp: &str,
|
||||
) -> WriteBatch {
|
||||
let db_name = NamespaceName::new(db_name).unwrap();
|
||||
let mut write_batch = WriteBatch::default();
|
||||
let (seq, db) = catalog.db_or_create(db_name.as_str()).unwrap();
|
||||
let mut result = parse_validate_and_update_schema(
|
||||
lp,
|
||||
&db,
|
||||
db_name.clone(),
|
||||
Time::from_timestamp_nanos(0),
|
||||
SegmentDuration::new_5m(),
|
||||
false,
|
||||
Precision::Nanosecond,
|
||||
seq,
|
||||
)
|
||||
.unwrap();
|
||||
if let Some(db) = result.schema {
|
||||
catalog.replace_database(seq, Arc::new(db)).unwrap();
|
||||
}
|
||||
let mut result = WriteValidator::initialize(db_name.clone(), catalog)
|
||||
.unwrap()
|
||||
.v1_parse_lines_and_update_schema(lp, false)
|
||||
.unwrap()
|
||||
.convert_lines_to_buffer(
|
||||
Time::from_timestamp_nanos(0),
|
||||
SegmentDuration::new_5m(),
|
||||
Precision::Nanosecond,
|
||||
);
|
||||
|
||||
write_batch.add_db_write(
|
||||
db_name,
|
||||
|
@ -621,28 +626,21 @@ mod test_helpers {
|
|||
}
|
||||
|
||||
pub(crate) fn lp_to_table_batches(
|
||||
catalog: &Catalog,
|
||||
catalog: Arc<Catalog>,
|
||||
db_name: &str,
|
||||
lp: &str,
|
||||
default_time: i64,
|
||||
) -> HashMap<String, TableBatch> {
|
||||
let (seq, db) = catalog.db_or_create(db_name).unwrap();
|
||||
let db_name = NamespaceName::new(db_name.to_string()).unwrap();
|
||||
let mut result = parse_validate_and_update_schema(
|
||||
lp,
|
||||
&db,
|
||||
db_name,
|
||||
Time::from_timestamp_nanos(default_time),
|
||||
SegmentDuration::new_5m(),
|
||||
false,
|
||||
Precision::Nanosecond,
|
||||
SequenceNumber::new(0),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
if let Some(db) = result.schema {
|
||||
catalog.replace_database(seq, Arc::new(db)).unwrap();
|
||||
}
|
||||
let mut result = WriteValidator::initialize(db_name, catalog)
|
||||
.unwrap()
|
||||
.v1_parse_lines_and_update_schema(lp, false)
|
||||
.unwrap()
|
||||
.convert_lines_to_buffer(
|
||||
Time::from_timestamp_nanos(default_time),
|
||||
SegmentDuration::new_5m(),
|
||||
Precision::Nanosecond,
|
||||
);
|
||||
|
||||
result.valid_segmented_data.pop().unwrap().table_batches
|
||||
}
|
||||
|
|
|
@ -6,11 +6,9 @@ use crate::catalog::Catalog;
|
|||
use crate::chunk::BufferChunk;
|
||||
use crate::paths::ParquetFilePath;
|
||||
use crate::write_buffer::flusher::BufferedWriteResult;
|
||||
use crate::write_buffer::table_buffer::{Builder, Result as TableBufferResult, TableBuffer};
|
||||
use crate::write_buffer::table_buffer::{Result as TableBufferResult, TableBuffer};
|
||||
use crate::write_buffer::DatabaseSchema;
|
||||
use crate::write_buffer::{
|
||||
parse_validate_and_update_catalog, Error, TableBatch, ValidSegmentedData,
|
||||
};
|
||||
use crate::write_buffer::{Error, TableBatch, ValidSegmentedData};
|
||||
use crate::{
|
||||
wal, write_buffer, write_buffer::Result, DatabaseTables, ParquetFile, PersistedSegment,
|
||||
Persister, SegmentDuration, SegmentId, SegmentRange, SequenceNumber, TableParquetFiles, WalOp,
|
||||
|
@ -25,7 +23,7 @@ use data_types::TransitionPartitionId;
|
|||
use data_types::{NamespaceName, PartitionKey};
|
||||
use datafusion::logical_expr::Expr;
|
||||
use datafusion_util::stream_from_batches;
|
||||
use iox_query::chunk_statistics::create_chunk_statistics;
|
||||
use iox_query::chunk_statistics::{create_chunk_statistics, NoColumnRanges};
|
||||
use iox_query::frontend::reorg::ReorgPlanner;
|
||||
use iox_query::QueryChunk;
|
||||
use iox_time::Time;
|
||||
|
@ -35,6 +33,8 @@ use std::sync::Arc;
|
|||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use super::validator::WriteValidator;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct OpenBufferSegment {
|
||||
segment_writer: Box<dyn WalSegmentWriter>,
|
||||
|
@ -191,15 +191,14 @@ pub(crate) fn load_buffer_from_segment(
|
|||
for wal_op in batch.ops {
|
||||
match wal_op {
|
||||
WalOp::LpWrite(write) => {
|
||||
let mut validated_write = parse_validate_and_update_catalog(
|
||||
NamespaceName::new(write.db_name.clone())?,
|
||||
&write.lp,
|
||||
catalog,
|
||||
Time::from_timestamp_nanos(write.default_time),
|
||||
segment_duration,
|
||||
false,
|
||||
write.precision,
|
||||
)?;
|
||||
let ns = NamespaceName::new(write.db_name.clone())?;
|
||||
let mut validated_write = WriteValidator::initialize(ns, Arc::clone(catalog))?
|
||||
.v1_parse_lines_and_update_schema(&write.lp, false)?
|
||||
.convert_lines_to_buffer(
|
||||
Time::from_timestamp_nanos(write.default_time),
|
||||
segment_duration,
|
||||
write.precision,
|
||||
);
|
||||
|
||||
let db_name = &write.db_name;
|
||||
if !buffered_data.database_buffers.contains_key(db_name) {
|
||||
|
@ -399,7 +398,7 @@ impl ClosedBufferSegment {
|
|||
&self,
|
||||
persister: Arc<P>,
|
||||
executor: Arc<iox_query::exec::Executor>,
|
||||
sort_key: Option<SortKey>,
|
||||
mut sort_key: Option<SortKey>,
|
||||
) -> Result<PersistedSegment>
|
||||
where
|
||||
P: Persister,
|
||||
|
@ -447,7 +446,7 @@ impl ClosedBufferSegment {
|
|||
Some(row_count),
|
||||
schema,
|
||||
Some(time_min_max),
|
||||
None,
|
||||
&NoColumnRanges,
|
||||
);
|
||||
|
||||
chunks.push(Arc::new(BufferChunk {
|
||||
|
@ -470,21 +469,16 @@ impl ClosedBufferSegment {
|
|||
|
||||
let ctx = executor.new_context();
|
||||
|
||||
let sort_key = match sort_key.as_ref() {
|
||||
Some(key) => key.clone(),
|
||||
// Default to using tags sorted in lexographical
|
||||
// order as the sort key
|
||||
None => {
|
||||
let mut tags = table_buffer
|
||||
.data
|
||||
.iter()
|
||||
.filter(|(_, v)| matches!(v, Builder::Tag(_)))
|
||||
.map(|(k, _)| k)
|
||||
.cloned()
|
||||
.collect::<Vec<String>>();
|
||||
tags.sort();
|
||||
SortKey::from(tags)
|
||||
}
|
||||
let sort_key = if let Some(key) = sort_key.take() {
|
||||
key
|
||||
} else {
|
||||
SortKey::from(
|
||||
schema
|
||||
.primary_key()
|
||||
.into_iter()
|
||||
.map(|s| s.to_string())
|
||||
.collect::<Vec<String>>(),
|
||||
)
|
||||
};
|
||||
|
||||
let logical_plan = ReorgPlanner::new()
|
||||
|
@ -595,7 +589,7 @@ pub(crate) mod tests {
|
|||
let db_name: NamespaceName<'static> = NamespaceName::new("db1").unwrap();
|
||||
|
||||
let batches = lp_to_table_batches(
|
||||
&catalog,
|
||||
Arc::clone(&catalog),
|
||||
"db1",
|
||||
"cpu,tag1=cupcakes bar=1 10\nmem,tag2=snakes bar=2 20",
|
||||
10,
|
||||
|
@ -604,7 +598,12 @@ pub(crate) mod tests {
|
|||
write_batch.add_db_write(db_name.clone(), batches);
|
||||
open_segment.buffer_writes(write_batch).unwrap();
|
||||
|
||||
let batches = lp_to_table_batches(&catalog, "db1", "cpu,tag1=cupcakes bar=2 30", 10);
|
||||
let batches = lp_to_table_batches(
|
||||
Arc::clone(&catalog),
|
||||
"db1",
|
||||
"cpu,tag1=cupcakes bar=2 30",
|
||||
10,
|
||||
);
|
||||
let mut write_batch = WriteBatch::default();
|
||||
write_batch.add_db_write(db_name.clone(), batches);
|
||||
open_segment.buffer_writes(write_batch).unwrap();
|
||||
|
@ -663,23 +662,33 @@ pub(crate) mod tests {
|
|||
|
||||
let db_name: NamespaceName<'static> = NamespaceName::new("db1").unwrap();
|
||||
|
||||
let batches = lp_to_table_batches(&catalog, "db1", "cpu,tag1=cupcakes bar=1 10", 10);
|
||||
let mut write_batch = WriteBatch::default();
|
||||
write_batch.add_db_write(db_name.clone(), batches);
|
||||
open_segment.buffer_writes(write_batch).unwrap();
|
||||
|
||||
let batches = lp_to_table_batches(&catalog, "db1", "cpu,tag2=asdf bar=2 30", 10);
|
||||
let mut write_batch = WriteBatch::default();
|
||||
write_batch.add_db_write(db_name.clone(), batches);
|
||||
open_segment.buffer_writes(write_batch).unwrap();
|
||||
|
||||
let batches = lp_to_table_batches(&catalog, "db1", "cpu bar=2,ival=7i 30", 10);
|
||||
let batches = lp_to_table_batches(
|
||||
Arc::clone(&catalog),
|
||||
"db1",
|
||||
"cpu,tag1=cupcakes bar=1 10",
|
||||
10,
|
||||
);
|
||||
let mut write_batch = WriteBatch::default();
|
||||
write_batch.add_db_write(db_name.clone(), batches);
|
||||
open_segment.buffer_writes(write_batch).unwrap();
|
||||
|
||||
let batches =
|
||||
lp_to_table_batches(&catalog, "db1", "cpu bar=2,ival=9i 40\ncpu fval=2.1 40", 10);
|
||||
lp_to_table_batches(Arc::clone(&catalog), "db1", "cpu,tag2=asdf bar=2 30", 10);
|
||||
let mut write_batch = WriteBatch::default();
|
||||
write_batch.add_db_write(db_name.clone(), batches);
|
||||
open_segment.buffer_writes(write_batch).unwrap();
|
||||
|
||||
let batches = lp_to_table_batches(Arc::clone(&catalog), "db1", "cpu bar=2,ival=7i 30", 10);
|
||||
let mut write_batch = WriteBatch::default();
|
||||
write_batch.add_db_write(db_name.clone(), batches);
|
||||
open_segment.buffer_writes(write_batch).unwrap();
|
||||
|
||||
let batches = lp_to_table_batches(
|
||||
Arc::clone(&catalog),
|
||||
"db1",
|
||||
"cpu bar=2,ival=9i 40\ncpu fval=2.1 40",
|
||||
10,
|
||||
);
|
||||
let mut write_batch = WriteBatch::default();
|
||||
write_batch.add_db_write(db_name.clone(), batches);
|
||||
open_segment.buffer_writes(write_batch).unwrap();
|
||||
|
@ -747,7 +756,7 @@ pub(crate) mod tests {
|
|||
precision: crate::Precision::Nanosecond,
|
||||
});
|
||||
|
||||
let write_batch = lp_to_write_batch(&catalog, "db1", lp);
|
||||
let write_batch = lp_to_write_batch(Arc::clone(&catalog), "db1", lp);
|
||||
|
||||
open_segment.write_wal_ops(vec![wal_op]).unwrap();
|
||||
open_segment.buffer_writes(write_batch).unwrap();
|
||||
|
@ -883,7 +892,12 @@ pub(crate) mod tests {
|
|||
|
||||
let db_name: NamespaceName<'static> = NamespaceName::new("db1").unwrap();
|
||||
|
||||
let batches = lp_to_table_batches(&catalog, "db1", "cpu,tag1=cupcakes bar=1 10", 10);
|
||||
let batches = lp_to_table_batches(
|
||||
Arc::clone(&catalog),
|
||||
"db1",
|
||||
"cpu,tag1=cupcakes bar=1 10",
|
||||
10,
|
||||
);
|
||||
let mut write_batch = WriteBatch::default();
|
||||
write_batch.add_db_write(db_name.clone(), batches);
|
||||
segment.buffer_writes(write_batch).unwrap();
|
||||
|
|
|
@ -221,7 +221,7 @@ mod tests {
|
|||
use crate::catalog::Catalog;
|
||||
use crate::wal::{WalImpl, WalSegmentWriterNoopImpl};
|
||||
use crate::write_buffer::buffer_segment::OpenBufferSegment;
|
||||
use crate::write_buffer::parse_validate_and_update_catalog;
|
||||
use crate::write_buffer::validator::WriteValidator;
|
||||
use crate::{Precision, SegmentDuration, SegmentId, SegmentRange, SequenceNumber};
|
||||
use data_types::NamespaceName;
|
||||
use iox_time::MockProvider;
|
||||
|
@ -265,32 +265,31 @@ mod tests {
|
|||
|
||||
let db_name = NamespaceName::new("db1").unwrap();
|
||||
let ingest_time = Time::from_timestamp_nanos(0);
|
||||
let res = parse_validate_and_update_catalog(
|
||||
db_name.clone(),
|
||||
"cpu bar=1 10",
|
||||
&catalog,
|
||||
ingest_time,
|
||||
SegmentDuration::new_5m(),
|
||||
false,
|
||||
Precision::Nanosecond,
|
||||
)
|
||||
.unwrap();
|
||||
let res = WriteValidator::initialize(db_name.clone(), Arc::clone(&catalog))
|
||||
.unwrap()
|
||||
.v1_parse_lines_and_update_schema("cpu bar=1 10", false)
|
||||
.unwrap()
|
||||
.convert_lines_to_buffer(
|
||||
ingest_time,
|
||||
SegmentDuration::new_5m(),
|
||||
Precision::Nanosecond,
|
||||
);
|
||||
|
||||
flusher
|
||||
.write_to_open_segment(res.valid_segmented_data)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let res = parse_validate_and_update_catalog(
|
||||
db_name.clone(),
|
||||
"cpu bar=1 20",
|
||||
&catalog,
|
||||
ingest_time,
|
||||
SegmentDuration::new_5m(),
|
||||
false,
|
||||
Precision::Nanosecond,
|
||||
)
|
||||
.unwrap();
|
||||
let res = WriteValidator::initialize(db_name.clone(), Arc::clone(&catalog))
|
||||
.unwrap()
|
||||
.v1_parse_lines_and_update_schema("cpu bar=1 20", false)
|
||||
.unwrap()
|
||||
.convert_lines_to_buffer(
|
||||
ingest_time,
|
||||
SegmentDuration::new_5m(),
|
||||
Precision::Nanosecond,
|
||||
);
|
||||
|
||||
flusher
|
||||
.write_to_open_segment(res.valid_segmented_data)
|
||||
.await
|
||||
|
|
|
@ -184,7 +184,7 @@ mod tests {
|
|||
precision: Precision::Nanosecond,
|
||||
});
|
||||
|
||||
let write_batch = lp_to_write_batch(&catalog, "db1", lp);
|
||||
let write_batch = lp_to_write_batch(Arc::clone(&catalog), "db1", lp);
|
||||
|
||||
open_segment.write_wal_ops(vec![wal_op]).unwrap();
|
||||
open_segment.buffer_writes(write_batch).unwrap();
|
||||
|
@ -272,7 +272,7 @@ mod tests {
|
|||
precision: Precision::Nanosecond,
|
||||
});
|
||||
|
||||
let write_batch = lp_to_write_batch(&catalog, db_name, lp);
|
||||
let write_batch = lp_to_write_batch(Arc::clone(&catalog), db_name, lp);
|
||||
|
||||
current_segment.write_wal_ops(vec![wal_op.clone()]).unwrap();
|
||||
current_segment.buffer_writes(write_batch).unwrap();
|
||||
|
@ -358,7 +358,7 @@ mod tests {
|
|||
precision: Precision::Nanosecond,
|
||||
});
|
||||
|
||||
let write_batch = lp_to_write_batch(&catalog, db_name, lp);
|
||||
let write_batch = lp_to_write_batch(Arc::clone(&catalog), db_name, lp);
|
||||
|
||||
current_segment.write_wal_ops(vec![wal_op]).unwrap();
|
||||
current_segment.buffer_writes(write_batch).unwrap();
|
||||
|
@ -386,7 +386,7 @@ mod tests {
|
|||
precision: Precision::Nanosecond,
|
||||
});
|
||||
|
||||
let write_batch = lp_to_write_batch(&catalog, db_name, lp);
|
||||
let write_batch = lp_to_write_batch(Arc::clone(&catalog), db_name, lp);
|
||||
|
||||
let segment_writer = wal
|
||||
.new_segment_writer(next_segment_id, next_segment_range)
|
||||
|
@ -423,7 +423,7 @@ mod tests {
|
|||
PersistedSegment {
|
||||
segment_id,
|
||||
segment_wal_size_bytes: 252,
|
||||
segment_parquet_size_bytes: 3458,
|
||||
segment_parquet_size_bytes: 3650,
|
||||
segment_row_count: 3,
|
||||
segment_min_time: 10,
|
||||
segment_max_time: 20,
|
||||
|
@ -438,7 +438,7 @@ mod tests {
|
|||
parquet_files: vec![ParquetFile {
|
||||
path: "dbs/db1/cpu/1970-01-01T00-00/4294967294.parquet"
|
||||
.to_string(),
|
||||
size_bytes: 1721,
|
||||
size_bytes: 1817,
|
||||
row_count: 1,
|
||||
min_time: 10,
|
||||
max_time: 10,
|
||||
|
@ -453,7 +453,7 @@ mod tests {
|
|||
parquet_files: vec![ParquetFile {
|
||||
path: "dbs/db1/mem/1970-01-01T00-00/4294967294.parquet"
|
||||
.to_string(),
|
||||
size_bytes: 1737,
|
||||
size_bytes: 1833,
|
||||
row_count: 2,
|
||||
min_time: 15,
|
||||
max_time: 20,
|
||||
|
@ -534,7 +534,7 @@ mod tests {
|
|||
precision: Precision::Nanosecond,
|
||||
});
|
||||
|
||||
let write_batch = lp_to_write_batch(&catalog, db_name, lp);
|
||||
let write_batch = lp_to_write_batch(Arc::clone(&catalog), db_name, lp);
|
||||
|
||||
current_segment.write_wal_ops(vec![wal_op]).unwrap();
|
||||
current_segment.buffer_writes(write_batch).unwrap();
|
||||
|
@ -555,7 +555,7 @@ mod tests {
|
|||
precision: Precision::Nanosecond,
|
||||
});
|
||||
|
||||
let write_batch = lp_to_write_batch(&catalog, db_name, lp);
|
||||
let write_batch = lp_to_write_batch(Arc::clone(&catalog), db_name, lp);
|
||||
|
||||
let segment_writer = wal
|
||||
.new_segment_writer(next_segment_id, next_segment_range)
|
||||
|
|
|
@ -5,30 +5,29 @@ mod flusher;
|
|||
mod loader;
|
||||
mod segment_state;
|
||||
mod table_buffer;
|
||||
pub(crate) mod validator;
|
||||
|
||||
use crate::cache::ParquetCache;
|
||||
use crate::catalog::{
|
||||
influx_column_type_from_field_value, Catalog, DatabaseSchema, TableDefinition, TIME_COLUMN_NAME,
|
||||
};
|
||||
use crate::catalog::{Catalog, DatabaseSchema};
|
||||
use crate::chunk::ParquetChunk;
|
||||
use crate::persister::PersisterImpl;
|
||||
use crate::write_buffer::flusher::WriteBufferFlusher;
|
||||
use crate::write_buffer::loader::load_starting_state;
|
||||
use crate::write_buffer::segment_state::{run_buffer_segment_persist_and_cleanup, SegmentState};
|
||||
use crate::write_buffer::validator::WriteValidator;
|
||||
use crate::{
|
||||
BufferedWriteRequest, Bufferer, ChunkContainer, LpWriteOp, Persister, Precision,
|
||||
SegmentDuration, SequenceNumber, Wal, WalOp, WriteBuffer, WriteLineError,
|
||||
BufferedWriteRequest, Bufferer, ChunkContainer, Persister, Precision, SegmentDuration,
|
||||
SequenceNumber, Wal, WalOp, WriteBuffer, WriteLineError,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use data_types::{
|
||||
column_type_from_field, ChunkId, ChunkOrder, ColumnType, NamespaceName, NamespaceNameError,
|
||||
};
|
||||
use data_types::{ChunkId, ChunkOrder, ColumnType, NamespaceName, NamespaceNameError};
|
||||
use datafusion::common::DataFusionError;
|
||||
use datafusion::execution::context::SessionState;
|
||||
use datafusion::logical_expr::Expr;
|
||||
use datafusion::physical_plan::SendableRecordBatchStream;
|
||||
use influxdb_line_protocol::{parse_lines, FieldValue, ParsedLine};
|
||||
use iox_query::chunk_statistics::create_chunk_statistics;
|
||||
use influxdb_line_protocol::v3::SeriesValue;
|
||||
use influxdb_line_protocol::FieldValue;
|
||||
use iox_query::chunk_statistics::{create_chunk_statistics, NoColumnRanges};
|
||||
use iox_query::QueryChunk;
|
||||
use iox_time::{Time, TimeProvider};
|
||||
use object_store::path::Path as ObjPath;
|
||||
|
@ -36,12 +35,8 @@ use object_store::ObjectMeta;
|
|||
use observability_deps::tracing::{debug, error};
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use parquet_file::storage::ParquetExecInput;
|
||||
use schema::InfluxColumnType;
|
||||
use sha2::Digest;
|
||||
use sha2::Sha256;
|
||||
use std::borrow::Cow;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use std::sync::Arc;
|
||||
use thiserror::Error;
|
||||
use tokio::sync::watch;
|
||||
|
||||
|
@ -179,15 +174,9 @@ impl<W: Wal, T: TimeProvider> WriteBufferImpl<W, T> {
|
|||
) -> Result<BufferedWriteRequest> {
|
||||
debug!("write_lp to {} in writebuffer", db_name);
|
||||
|
||||
let result = parse_validate_and_update_catalog(
|
||||
db_name.clone(),
|
||||
lp,
|
||||
&self.catalog,
|
||||
ingest_time,
|
||||
self.segment_duration,
|
||||
accept_partial,
|
||||
precision,
|
||||
)?;
|
||||
let result = WriteValidator::initialize(db_name.clone(), self.catalog())?
|
||||
.v1_parse_lines_and_update_schema(lp, accept_partial)?
|
||||
.convert_lines_to_buffer(ingest_time, self.segment_duration, precision);
|
||||
|
||||
self.write_buffer_flusher
|
||||
.write_to_open_segment(result.valid_segmented_data)
|
||||
|
@ -198,7 +187,32 @@ impl<W: Wal, T: TimeProvider> WriteBufferImpl<W, T> {
|
|||
invalid_lines: result.errors,
|
||||
line_count: result.line_count,
|
||||
field_count: result.field_count,
|
||||
tag_count: result.tag_count,
|
||||
index_count: result.index_count,
|
||||
})
|
||||
}
|
||||
|
||||
async fn write_lp_v3(
|
||||
&self,
|
||||
db_name: NamespaceName<'static>,
|
||||
lp: &str,
|
||||
ingest_time: Time,
|
||||
accept_partial: bool,
|
||||
precision: Precision,
|
||||
) -> Result<BufferedWriteRequest> {
|
||||
let result = WriteValidator::initialize(db_name.clone(), self.catalog())?
|
||||
.v3_parse_lines_and_update_schema(lp, accept_partial)?
|
||||
.convert_lines_to_buffer(ingest_time, self.segment_duration, precision);
|
||||
|
||||
self.write_buffer_flusher
|
||||
.write_to_open_segment(result.valid_segmented_data)
|
||||
.await?;
|
||||
|
||||
Ok(BufferedWriteRequest {
|
||||
db_name,
|
||||
invalid_lines: result.errors,
|
||||
line_count: result.line_count,
|
||||
field_count: result.field_count,
|
||||
index_count: result.index_count,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -246,7 +260,7 @@ impl<W: Wal, T: TimeProvider> WriteBufferImpl<W, T> {
|
|||
Some(parquet_file.row_count as usize),
|
||||
&table_schema,
|
||||
Some(parquet_file.timestamp_min_max()),
|
||||
None,
|
||||
&NoColumnRanges,
|
||||
);
|
||||
|
||||
let location = ObjPath::from(parquet_file.path.clone());
|
||||
|
@ -295,7 +309,7 @@ impl<W: Wal, T: TimeProvider> WriteBufferImpl<W, T> {
|
|||
Some(parquet_file.row_count as usize),
|
||||
&table_schema,
|
||||
Some(parquet_file.timestamp_min_max()),
|
||||
None,
|
||||
&NoColumnRanges,
|
||||
);
|
||||
|
||||
let location = ObjPath::from(parquet_file.path.clone());
|
||||
|
@ -396,6 +410,18 @@ impl<W: Wal, T: TimeProvider> Bufferer for WriteBufferImpl<W, T> {
|
|||
.await
|
||||
}
|
||||
|
||||
async fn write_lp_v3(
|
||||
&self,
|
||||
database: NamespaceName<'static>,
|
||||
lp: &str,
|
||||
ingest_time: Time,
|
||||
accept_partial: bool,
|
||||
precision: Precision,
|
||||
) -> Result<BufferedWriteRequest> {
|
||||
self.write_lp_v3(database, lp, ingest_time, accept_partial, precision)
|
||||
.await
|
||||
}
|
||||
|
||||
fn wal(&self) -> Option<Arc<impl Wal>> {
|
||||
self.wal.clone()
|
||||
}
|
||||
|
@ -420,345 +446,6 @@ impl<W: Wal, T: TimeProvider> ChunkContainer for WriteBufferImpl<W, T> {
|
|||
|
||||
impl<W: Wal, T: TimeProvider> WriteBuffer for WriteBufferImpl<W, T> {}
|
||||
|
||||
/// Returns a validated result and the sequence number of the catalog before any updates were
|
||||
/// applied.
|
||||
pub(crate) fn parse_validate_and_update_catalog(
|
||||
db_name: NamespaceName<'static>,
|
||||
lp: &str,
|
||||
catalog: &Catalog,
|
||||
ingest_time: Time,
|
||||
segment_duration: SegmentDuration,
|
||||
accept_partial: bool,
|
||||
precision: Precision,
|
||||
) -> Result<ValidationResult> {
|
||||
let (sequence, db) = catalog.db_or_create(db_name.as_str())?;
|
||||
let mut result = parse_validate_and_update_schema(
|
||||
lp,
|
||||
&db,
|
||||
db_name,
|
||||
ingest_time,
|
||||
segment_duration,
|
||||
accept_partial,
|
||||
precision,
|
||||
sequence,
|
||||
)?;
|
||||
|
||||
if let Some(schema) = result.schema.take() {
|
||||
debug!("replacing schema for {:?}", schema);
|
||||
|
||||
catalog.replace_database(sequence, Arc::new(schema))?;
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Takes &str of line protocol, parses lines, validates the schema, and inserts new columns
|
||||
/// if present. Assigns the default time to any lines that do not include a time
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn parse_validate_and_update_schema(
|
||||
lp: &str,
|
||||
schema: &DatabaseSchema,
|
||||
db_name: NamespaceName<'static>,
|
||||
ingest_time: Time,
|
||||
segment_duration: SegmentDuration,
|
||||
accept_partial: bool,
|
||||
precision: Precision,
|
||||
starting_catalog_sequence_number: SequenceNumber,
|
||||
) -> Result<ValidationResult> {
|
||||
let mut errors = vec![];
|
||||
let mut lp_lines = lp.lines();
|
||||
|
||||
let mut valid_parsed_and_raw_lines: Vec<(ParsedLine, &str)> = vec![];
|
||||
|
||||
for (line_idx, maybe_line) in parse_lines(lp).enumerate() {
|
||||
let line = match maybe_line
|
||||
.map_err(|e| WriteLineError {
|
||||
// This unwrap is fine because we're moving line by line
|
||||
// alongside the output from parse_lines
|
||||
original_line: lp_lines.next().unwrap().to_string(),
|
||||
line_number: line_idx + 1,
|
||||
error_message: e.to_string(),
|
||||
})
|
||||
.and_then(|l| validate_line_schema(line_idx, l, schema))
|
||||
{
|
||||
Ok(line) => line,
|
||||
Err(e) => {
|
||||
if !accept_partial {
|
||||
return Err(Error::ParseError(e));
|
||||
} else {
|
||||
errors.push(e);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
// This unwrap is fine because we're moving line by line
|
||||
// alongside the output from parse_lines
|
||||
valid_parsed_and_raw_lines.push((line, lp_lines.next().unwrap()));
|
||||
}
|
||||
|
||||
validate_or_insert_schema_and_partitions(
|
||||
valid_parsed_and_raw_lines,
|
||||
schema,
|
||||
db_name,
|
||||
ingest_time,
|
||||
segment_duration,
|
||||
precision,
|
||||
starting_catalog_sequence_number,
|
||||
)
|
||||
.map(move |mut result| {
|
||||
result.errors = errors;
|
||||
result
|
||||
})
|
||||
}
|
||||
|
||||
/// Validate a line of line protocol against the given schema definition
|
||||
///
|
||||
/// This is for scenarios where a write comes in for a table that exists, but may have invalid field
|
||||
/// types, based on the pre-existing schema.
|
||||
fn validate_line_schema<'a>(
|
||||
line_number: usize,
|
||||
line: ParsedLine<'a>,
|
||||
schema: &DatabaseSchema,
|
||||
) -> Result<ParsedLine<'a>, WriteLineError> {
|
||||
let table_name = line.series.measurement.as_str();
|
||||
if let Some(table_schema) = schema.get_table_schema(table_name) {
|
||||
for (field_name, field_val) in line.field_set.iter() {
|
||||
if let Some(schema_col_type) = table_schema.field_type_by_name(field_name) {
|
||||
let field_col_type = column_type_from_field(field_val);
|
||||
if field_col_type != schema_col_type {
|
||||
let field_name = field_name.to_string();
|
||||
return Err(WriteLineError {
|
||||
original_line: line.to_string(),
|
||||
line_number: line_number + 1,
|
||||
error_message: format!(
|
||||
"invalid field value in line protocol for field '{field_name}' on line \
|
||||
{line_number}: expected type {expected}, but got {got}",
|
||||
expected = ColumnType::from(schema_col_type),
|
||||
got = field_col_type,
|
||||
),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(line)
|
||||
}
|
||||
|
||||
/// Takes parsed lines, validates their schema. If new tables or columns are defined, they
|
||||
/// are passed back as a new DatabaseSchema as part of the ValidationResult. Lines are split
|
||||
/// into partitions and the validation result contains the data that can then be serialized
|
||||
/// into the WAL.
|
||||
pub(crate) fn validate_or_insert_schema_and_partitions(
|
||||
lines: Vec<(ParsedLine<'_>, &str)>,
|
||||
schema: &DatabaseSchema,
|
||||
db_name: NamespaceName<'static>,
|
||||
ingest_time: Time,
|
||||
segment_duration: SegmentDuration,
|
||||
precision: Precision,
|
||||
starting_catalog_sequence_number: SequenceNumber,
|
||||
) -> Result<ValidationResult> {
|
||||
// The (potentially updated) DatabaseSchema to return to the caller.
|
||||
let mut schema = Cow::Borrowed(schema);
|
||||
|
||||
// The parsed and validated table_batches
|
||||
let mut segment_table_batches: HashMap<Time, TableBatchMap> = HashMap::new();
|
||||
|
||||
let line_count = lines.len();
|
||||
let mut field_count = 0;
|
||||
let mut tag_count = 0;
|
||||
|
||||
for (line, raw_line) in lines.into_iter() {
|
||||
field_count += line.field_set.len();
|
||||
tag_count += line.series.tag_set.as_ref().map(|t| t.len()).unwrap_or(0);
|
||||
|
||||
validate_and_convert_parsed_line(
|
||||
line,
|
||||
raw_line,
|
||||
&mut segment_table_batches,
|
||||
&mut schema,
|
||||
ingest_time,
|
||||
segment_duration,
|
||||
precision,
|
||||
)?;
|
||||
}
|
||||
|
||||
let schema = match schema {
|
||||
Cow::Owned(s) => Some(s),
|
||||
        Cow::Borrowed(_) => None,
    };

    let valid_segmented_data = segment_table_batches
        .into_iter()
        .map(|(segment_start, table_batches)| ValidSegmentedData {
            database_name: db_name.clone(),
            segment_start,
            table_batches: table_batches.table_batches,
            wal_op: WalOp::LpWrite(LpWriteOp {
                db_name: db_name.to_string(),
                lp: table_batches.lines.join("\n"),
                default_time: ingest_time.timestamp_nanos(),
                precision,
            }),
            starting_catalog_sequence_number,
        })
        .collect();

    Ok(ValidationResult {
        schema,
        line_count,
        field_count,
        tag_count,
        errors: vec![],
        valid_segmented_data,
    })
}

/// Check if the table exists in the schema and update the schema if it does not
// Because the entry API requires &mut it is not used to avoid a premature
// clone of the Cow.
fn validate_and_update_schema(line: &ParsedLine<'_>, schema: &mut Cow<'_, DatabaseSchema>) {
    let table_name = line.series.measurement.as_str();
    match schema.tables.get(table_name) {
        Some(t) => {
            // Collect new column definitions
            let mut new_cols = Vec::with_capacity(line.column_count() + 1);
            if let Some(tagset) = &line.series.tag_set {
                for (tag_key, _) in tagset {
                    if !t.column_exists(tag_key.as_str()) {
                        new_cols.push((tag_key.to_string(), InfluxColumnType::Tag));
                    }
                }
            }
            for (field_name, value) in &line.field_set {
                if !t.column_exists(field_name.as_str()) {
                    new_cols.push((
                        field_name.to_string(),
                        influx_column_type_from_field_value(value),
                    ));
                }
            }

            if !new_cols.is_empty() {
                let t = schema.to_mut().tables.get_mut(table_name).unwrap();
                t.add_columns(new_cols);
            }
        }
        None => {
            let mut columns = Vec::new();
            if let Some(tag_set) = &line.series.tag_set {
                for (tag_key, _) in tag_set {
                    columns.push((tag_key.to_string(), InfluxColumnType::Tag));
                }
            }
            for (field_name, value) in &line.field_set {
                columns.push((
                    field_name.to_string(),
                    influx_column_type_from_field_value(value),
                ));
            }

            columns.push((TIME_COLUMN_NAME.to_string(), InfluxColumnType::Timestamp));

            let table = TableDefinition::new(table_name, columns);

            assert!(schema
                .to_mut()
                .tables
                .insert(table_name.to_string(), table)
                .is_none());
        }
    };
}

fn validate_and_convert_parsed_line<'a>(
    line: ParsedLine<'_>,
    raw_line: &'a str,
    segment_table_batches: &mut HashMap<Time, TableBatchMap<'a>>,
    schema: &mut Cow<'_, DatabaseSchema>,
    ingest_time: Time,
    segment_duration: SegmentDuration,
    precision: Precision,
) -> Result<()> {
    validate_and_update_schema(&line, schema);

    // now that we've ensured all columns exist in the schema, construct the actual row and values
    // while validating the column types match.
    let mut values = Vec::with_capacity(line.column_count() + 1);

    // validate tags, collecting any new ones that must be inserted, or adding the values
    if let Some(tag_set) = line.series.tag_set {
        for (tag_key, value) in tag_set {
            let value = Field {
                name: tag_key.to_string(),
                value: FieldData::Tag(value.to_string()),
            };
            values.push(value);
        }
    }

    // validate fields, collecting any new ones that must be inserted, or adding values
    for (field_name, value) in line.field_set {
        let field_data = match value {
            FieldValue::I64(v) => FieldData::Integer(v),
            FieldValue::F64(v) => FieldData::Float(v),
            FieldValue::U64(v) => FieldData::UInteger(v),
            FieldValue::Boolean(v) => FieldData::Boolean(v),
            FieldValue::String(v) => FieldData::String(v.to_string()),
        };
        let value = Field {
            name: field_name.to_string(),
            value: field_data,
        };
        values.push(value);
    }

    // set the time value
    let time_value_nanos = line
        .timestamp
        .map(|ts| {
            let multiplier = match precision {
                Precision::Auto => match crate::guess_precision(ts) {
                    Precision::Second => 1_000_000_000,
                    Precision::Millisecond => 1_000_000,
                    Precision::Microsecond => 1_000,
                    Precision::Nanosecond => 1,
                    Precision::Auto => unreachable!(),
                },
                Precision::Second => 1_000_000_000,
                Precision::Millisecond => 1_000_000,
                Precision::Microsecond => 1_000,
                Precision::Nanosecond => 1,
            };

            ts * multiplier
        })
        .unwrap_or(ingest_time.timestamp_nanos());

    let segment_start = segment_duration.start_time(time_value_nanos / 1_000_000_000);

    values.push(Field {
        name: TIME_COLUMN_NAME.to_string(),
        value: FieldData::Timestamp(time_value_nanos),
    });

    let table_batch_map = segment_table_batches.entry(segment_start).or_default();

    let table_batch = table_batch_map
        .table_batches
        .entry(line.series.measurement.to_string())
        .or_default();
    table_batch.rows.push(Row {
        time: time_value_nanos,
        fields: values,
    });

    table_batch_map.lines.push(raw_line);

    Ok(())
}

#[derive(Debug, Default)]
pub(crate) struct TableBatch {
    #[allow(dead_code)]

@@ -781,6 +468,7 @@ pub(crate) struct Field {
#[derive(Clone, Debug)]
pub(crate) enum FieldData {
    Timestamp(i64),
    Key(String),
    Tag(String),
    String(String),
    Integer(i64),

@@ -794,6 +482,7 @@ impl PartialEq for FieldData {
        match (self, other) {
            (FieldData::Timestamp(a), FieldData::Timestamp(b)) => a == b,
            (FieldData::Tag(a), FieldData::Tag(b)) => a == b,
            (FieldData::Key(a), FieldData::Key(b)) => a == b,
            (FieldData::String(a), FieldData::String(b)) => a == b,
            (FieldData::Integer(a), FieldData::Integer(b)) => a == b,
            (FieldData::UInteger(a), FieldData::UInteger(b)) => a == b,

@@ -806,25 +495,24 @@ impl PartialEq for FieldData {

impl Eq for FieldData {}

/// Result of the validation. If the NamespaceSchema or PartitionMap were updated, they will be
/// in the result.
#[derive(Debug, Default)]
#[allow(dead_code)]
pub(crate) struct ValidationResult {
    /// If the namespace schema is updated with new tables or columns it will be here, which
    /// can be used to update the cache.
    pub(crate) schema: Option<DatabaseSchema>,
    /// Number of lines passed in
    pub(crate) line_count: usize,
    /// Number of fields passed in
    pub(crate) field_count: usize,
    /// Number of tags passed in
    pub(crate) tag_count: usize,
    /// Any errors that occurred while parsing the lines
    pub(crate) errors: Vec<crate::WriteLineError>,
    /// Only valid lines from what was passed in to validate, segmented based on the
    /// timestamps of the data.
    pub(crate) valid_segmented_data: Vec<ValidSegmentedData>,
impl<'a> From<&SeriesValue<'a>> for FieldData {
    fn from(sk: &SeriesValue<'a>) -> Self {
        match sk {
            SeriesValue::String(s) => Self::Key(s.to_string()),
        }
    }
}

impl<'a> From<FieldValue<'a>> for FieldData {
    fn from(value: FieldValue<'a>) -> Self {
        match value {
            FieldValue::I64(v) => Self::Integer(v),
            FieldValue::U64(v) => Self::UInteger(v),
            FieldValue::F64(v) => Self::Float(v),
            FieldValue::String(v) => Self::String(v.to_string()),
            FieldValue::Boolean(v) => Self::Boolean(v),
        }
    }
}

#[derive(Debug)]

@@ -843,34 +531,12 @@ pub(crate) struct TableBatchMap<'a> {
    pub(crate) table_batches: HashMap<String, TableBatch>,
}

/// The 32 byte SHA256 digest of the full tag set for a line of measurement data
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct SeriesId([u8; 32]);

impl std::fmt::Display for SeriesId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", hex::encode(self.0))
    }
}

fn default_series_sha() -> &'static [u8; 32] {
    static DEFAULT_SERIES_ID_SHA: OnceLock<[u8; 32]> = OnceLock::new();
    // the unwrap is safe here because the Sha256 digest will always be 32 bytes:
    DEFAULT_SERIES_ID_SHA.get_or_init(|| Sha256::digest("")[..].try_into().unwrap())
}

impl Default for SeriesId {
    fn default() -> Self {
        Self(default_series_sha().to_owned())
    }
}
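
// A minimal sketch (not part of this diff) of how a `SeriesId` could be derived
// from a line's full tag set, per the doc comment above ("SHA256 digest of the
// full tag set"). The canonical encoding used here -- sorted `key=value` pairs
// joined by commas -- is an assumption for illustration only.
#[allow(dead_code)]
fn series_id_for_tag_set(tag_set: &[(&str, &str)]) -> SeriesId {
    let mut canonical: Vec<String> = tag_set.iter().map(|(k, v)| format!("{k}={v}")).collect();
    canonical.sort();
    let digest = Sha256::digest(canonical.join(",").as_bytes());
    // As with `default_series_sha`, the digest is always 32 bytes:
    SeriesId(digest[..].try_into().unwrap())
}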

#[cfg(test)]
mod tests {
    use super::*;
    use crate::persister::PersisterImpl;
    use crate::wal::WalImpl;
    use crate::{SegmentId, SequenceNumber, WalOpBatch};
    use crate::{LpWriteOp, SegmentId, SequenceNumber, WalOpBatch};
    use arrow::record_batch::RecordBatch;
    use arrow_util::assert_batches_eq;
    use datafusion_util::config::register_iox_object_store;

@@ -881,22 +547,20 @@ mod tests {

    #[test]
    fn parse_lp_into_buffer() {
        let db = Arc::new(DatabaseSchema::new("foo"));
        let catalog = Arc::new(Catalog::new());
        let db_name = NamespaceName::new("foo").unwrap();
        let lp = "cpu,region=west user=23.2 100\nfoo f1=1i";
        let result = parse_validate_and_update_schema(
            lp,
            &db,
            db_name,
            Time::from_timestamp_nanos(0),
            SegmentDuration::new_5m(),
            false,
            Precision::Nanosecond,
            SequenceNumber::new(0),
        )
        .unwrap();
        WriteValidator::initialize(db_name, Arc::clone(&catalog))
            .unwrap()
            .v1_parse_lines_and_update_schema(lp, false)
            .unwrap()
            .convert_lines_to_buffer(
                Time::from_timestamp_nanos(0),
                SegmentDuration::new_5m(),
                Precision::Nanosecond,
            );

        let db = result.schema.unwrap();
        let db = catalog.db_schema("foo").unwrap();

        assert_eq!(db.tables.len(), 2);
        assert_eq!(db.tables.get("cpu").unwrap().num_columns(), 3);

@@ -933,7 +597,7 @@ mod tests {
            .unwrap();
        assert_eq!(summary.line_count, 1);
        assert_eq!(summary.field_count, 1);
        assert_eq!(summary.tag_count, 0);
        assert_eq!(summary.index_count, 0);

        // ensure the data is in the buffer
        let actual = write_buffer.get_table_record_batches("foo", "cpu");

@@ -15,7 +15,7 @@ use data_types::{ChunkId, ChunkOrder, TableId, TransitionPartitionId};
use datafusion::common::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::Expr;
use iox_query::chunk_statistics::create_chunk_statistics;
use iox_query::chunk_statistics::{create_chunk_statistics, NoColumnRanges};
use iox_query::QueryChunk;
use iox_time::{Time, TimeProvider};
use observability_deps::tracing::error;

@@ -148,7 +148,7 @@ impl<T: TimeProvider, W: Wal> SegmentState<T, W> {
                Some(row_count),
                &schema,
                Some(segment.segment_range().timestamp_min_max()),
                None,
                &NoColumnRanges,
            );

            chunks.push(Arc::new(BufferChunk {

@@ -187,7 +187,7 @@ impl<T: TimeProvider, W: Wal> SegmentState<T, W> {
                Some(row_count),
                &schema,
                Some(persisting_segment.segment_range.timestamp_min_max()),
                None,
                &NoColumnRanges,
            );

            chunks.push(Arc::new(BufferChunk {

@@ -574,7 +574,11 @@ mod tests {
            None,
        );
        open_segment1
            .buffer_writes(lp_to_write_batch(&catalog, "foo", "cpu bar=1 10"))
            .buffer_writes(lp_to_write_batch(
                Arc::clone(&catalog),
                "foo",
                "cpu bar=1 10",
            ))
            .unwrap();

        let mut open_segment2 = OpenBufferSegment::new(

@@ -591,7 +595,11 @@ mod tests {
            None,
        );
        open_segment2
            .buffer_writes(lp_to_write_batch(&catalog, "foo", "cpu bar=2 300000000000"))
            .buffer_writes(lp_to_write_batch(
                Arc::clone(&catalog),
                "foo",
                "cpu bar=2 300000000000",
            ))
            .unwrap();

        let mut open_segment3 = OpenBufferSegment::new(

@@ -608,7 +616,11 @@ mod tests {
            None,
        );
        open_segment3
            .buffer_writes(lp_to_write_batch(&catalog, "foo", "cpu bar=3 700000000000"))
            .buffer_writes(lp_to_write_batch(
                Arc::clone(&catalog),
                "foo",
                "cpu bar=3 700000000000",
            ))
            .unwrap();

        let wal = Arc::new(TestWal::default());

@@ -99,6 +99,24 @@ impl TableBuffer {
                    panic!("unexpected field type");
                }
            }
            FieldData::Key(v) => {
                if !self.data.contains_key(&f.name) {
                    let key_builder = StringDictionaryBuilder::new();
                    if self.row_count > 0 {
                        panic!("series key columns must be passed in the very first write for a table");
                    }
                    self.data.insert(f.name.clone(), Builder::Key(key_builder));
                }
                let b = self
                    .data
                    .get_mut(&f.name)
                    .expect("key builder should exist");
                let Builder::Key(b) = b else {
                    panic!("unexpected field type");
                };
                self.index.add_row_if_indexed_column(b.len(), &f.name, &v);
                b.append_value(v);
            }
            FieldData::String(v) => {
                let b = self.data.entry(f.name).or_insert_with(|| {
                    let mut string_builder = StringBuilder::new();

@@ -188,6 +206,7 @@ impl TableBuffer {
                Builder::U64(b) => b.append_null(),
                Builder::String(b) => b.append_null(),
                Builder::Tag(b) => b.append_null(),
                Builder::Key(b) => b.append_null(),
                Builder::Time(b) => b.append_null(),
            }
        }

@@ -234,6 +253,7 @@ impl TableBuffer {
    }

    /// Returns an estimate of the size of this table buffer based on the data and index sizes.
    #[cfg(test)]
    pub fn _computed_size(&self) -> usize {
        let mut size = size_of::<Self>();
        for (k, v) in &self.data {

@@ -319,6 +339,9 @@ pub enum Builder {
    U64(UInt64Builder),
    String(StringBuilder),
    Tag(StringDictionaryBuilder<Int32Type>),
    // For now we use a string dict to be consistent with tags, but in future
    // keys, like fields may support different data types.
    Key(StringDictionaryBuilder<Int32Type>),
    Time(TimestampNanosecondBuilder),
}

@@ -331,6 +354,7 @@ impl Builder {
            Self::U64(b) => Arc::new(b.finish_cloned()),
            Self::String(b) => Arc::new(b.finish_cloned()),
            Self::Tag(b) => Arc::new(b.finish_cloned()),
            Self::Key(b) => Arc::new(b.finish_cloned()),
            Self::Time(b) => Arc::new(b.finish_cloned()),
        }
    }

@@ -377,7 +401,7 @@ impl Builder {
                }
                Arc::new(builder.finish())
            }
            Self::Tag(b) => {
            Self::Tag(b) | Self::Key(b) => {
                let b = b.finish_cloned();
                let bv = b.values();
                let bva: &StringArray = bv.as_any().downcast_ref::<StringArray>().unwrap();

@@ -422,7 +446,7 @@ impl Builder {
                    + b.offsets_slice().len()
                    + b.validity_slice().map(|s| s.len()).unwrap_or(0)
            }
            Self::Tag(b) => {
            Self::Tag(b) | Self::Key(b) => {
                let b = b.finish_cloned();
                b.keys().len() * size_of::<i32>() + b.values().get_array_memory_size()
            }

@@ -626,6 +650,6 @@ mod tests {
        table_buffer.add_rows(rows);

        let size = table_buffer._computed_size();
        assert_eq!(size, 18126);
        assert_eq!(size, 18150);
    }
}

@@ -0,0 +1,711 @@
use std::{borrow::Cow, collections::HashMap, sync::Arc};

use data_types::NamespaceName;
use influxdb_line_protocol::{parse_lines, v3, FieldValue, ParsedLine};
use iox_time::Time;
use schema::{InfluxColumnType, TIME_COLUMN_NAME};

use crate::{
    catalog::{influx_column_type_from_field_value, Catalog, DatabaseSchema, TableDefinition},
    write_buffer::Result,
    LpWriteOp, Precision, SegmentDuration, SequenceNumber, WalOp, WriteLineError,
};

use super::{Error, Field, FieldData, Row, TableBatchMap, ValidSegmentedData};

/// Type state for the [`WriteValidator`] after it has been initialized
/// with the catalog.
pub(crate) struct WithCatalog {
    db_name: NamespaceName<'static>,
    catalog: Arc<Catalog>,
    sequence: SequenceNumber,
    db_schema: Arc<DatabaseSchema>,
}

/// Type state for the [`WriteValidator`] after it has parsed v1 or v3
/// line protocol.
pub(crate) struct LinesParsed<'raw, PL> {
    catalog: WithCatalog,
    lines: Vec<(PL, &'raw str)>,
    errors: Vec<WriteLineError>,
}

/// A state machine for validating v1 or v3 line protocol and updating
/// the [`Catalog`] with new tables or schema changes.
pub(crate) struct WriteValidator<State> {
    state: State,
}
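
// A minimal sketch (not part of this diff) showing the intended call sequence
// through the type states, mirroring the test at the end of this file; the
// database name and line protocol below are illustrative only.
#[allow(dead_code)]
fn write_validator_flow_sketch() -> Result<ValidatedLines> {
    let validated = WriteValidator::initialize(
        NamespaceName::new("example_db").unwrap(), // -> WriteValidator<WithCatalog>
        Arc::new(Catalog::new()),
    )?
    // -> WriteValidator<LinesParsed<'_, ParsedLine<'_>>>, catalog updated if the schema changed
    .v1_parse_lines_and_update_schema("cpu,host=a usage=0.5 1234", false)?
    // -> ValidatedLines, ready to be buffered and written to the WAL
    .convert_lines_to_buffer(
        Time::from_timestamp_nanos(0),
        SegmentDuration::new_5m(),
        Precision::Nanosecond,
    );
    Ok(validated)
}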

impl WriteValidator<WithCatalog> {
    /// Initialize the [`WriteValidator`] by getting a handle to, or creating
    /// a handle to the [`DatabaseSchema`] for the given namespace name `db_name`.
    pub(crate) fn initialize(
        db_name: NamespaceName<'static>,
        catalog: Arc<Catalog>,
    ) -> Result<WriteValidator<WithCatalog>> {
        let (sequence, db_schema) = catalog.db_or_create(db_name.as_str())?;
        Ok(WriteValidator {
            state: WithCatalog {
                db_name,
                catalog,
                sequence,
                db_schema,
            },
        })
    }

    /// Parse the incoming lines of line protocol using the v3 parser and update
    /// the [`DatabaseSchema`] if:
    ///
    /// * A new table is being added
    /// * New fields or tags are being added to an existing table
    ///
    /// # Implementation Note
    ///
    /// If this function succeeds, then the catalog will receive an update, so
    /// steps following this should be infallible.
    pub(crate) fn v3_parse_lines_and_update_schema(
        self,
        lp: &str,
        accept_partial: bool,
    ) -> Result<WriteValidator<LinesParsed<'_, v3::ParsedLine<'_>>>> {
        let mut errors = vec![];
        let mut lp_lines = lp.lines().peekable();
        let mut lines = vec![];
        let mut schema = Cow::Borrowed(self.state.db_schema.as_ref());

        for (line_idx, maybe_line) in v3::parse_lines(lp).enumerate() {
            let line = match maybe_line
                .map_err(|e| WriteLineError {
                    original_line: lp_lines.next().unwrap().to_string(),
                    line_number: line_idx + 1,
                    error_message: e.to_string(),
                })
                .and_then(|l| validate_v3_line(&mut schema, line_idx, l, lp_lines.peek().unwrap()))
            {
                Ok(line) => line,
                Err(e) => {
                    if !accept_partial {
                        return Err(Error::ParseError(e));
                    } else {
                        errors.push(e);
                    }
                    continue;
                }
            };

            lines.push((line, lp_lines.next().unwrap()));
        }

        if let Cow::Owned(schema) = schema {
            self.state
                .catalog
                .replace_database(self.state.sequence, Arc::new(schema))?;
        }

        Ok(WriteValidator {
            state: LinesParsed {
                catalog: self.state,
                lines,
                errors,
            },
        })
    }

    /// Parse the incoming lines of line protocol using the v1 parser and update
    /// the [`DatabaseSchema`] if:
    ///
    /// * A new table is being added
    /// * New fields or tags are being added to an existing table
    ///
    /// # Implementation Note
    ///
    /// If this function succeeds, then the catalog will receive an update, so
    /// steps following this should be infallible.
    pub(crate) fn v1_parse_lines_and_update_schema(
        self,
        lp: &str,
        accept_partial: bool,
    ) -> Result<WriteValidator<LinesParsed<'_, ParsedLine<'_>>>> {
        let mut errors = vec![];
        let mut lp_lines = lp.lines();
        let mut lines = vec![];
        let mut schema = Cow::Borrowed(self.state.db_schema.as_ref());

        for (line_idx, maybe_line) in parse_lines(lp).enumerate() {
            let line = match maybe_line
                .map_err(|e| WriteLineError {
                    // This unwrap is fine because we're moving line by line
                    // alongside the output from parse_lines
                    original_line: lp_lines.next().unwrap().to_string(),
                    line_number: line_idx + 1,
                    error_message: e.to_string(),
                })
                .and_then(|l| validate_v1_line(&mut schema, line_idx, l))
            {
                Ok(line) => line,
                Err(e) => {
                    if !accept_partial {
                        return Err(Error::ParseError(e));
                    } else {
                        errors.push(e);
                    }
                    continue;
                }
            };
            // This unwrap is fine because we're moving line by line
            // alongside the output from parse_lines
            lines.push((line, lp_lines.next().unwrap()));
        }

        // All lines are parsed and validated, so all steps after this
        // are infallible, therefore, update the catalog if changes were
        // made to the schema:
        if let Cow::Owned(schema) = schema {
            self.state
                .catalog
                .replace_database(self.state.sequence, Arc::new(schema))?;
        }

        Ok(WriteValidator {
            state: LinesParsed {
                catalog: self.state,
                lines,
                errors,
            },
        })
    }
}

/// Validate an individual line of v3 line protocol and update the database
/// schema
///
/// The [`DatabaseSchema`] will be updated if the line is being written to a new table, or if
/// the line contains new fields. Note that for v3, the series key members must be consistent,
/// and therefore new tag columns will never be added after the first write.
///
/// This errors if the write is being performed against a v1 table, i.e., one that does not have
/// a series key.
fn validate_v3_line<'a>(
    db_schema: &mut Cow<'_, DatabaseSchema>,
    line_number: usize,
    line: v3::ParsedLine<'a>,
    raw_line: &str,
) -> Result<v3::ParsedLine<'a>, WriteLineError> {
    let table_name = line.series.measurement.as_str();
    if let Some(table_def) = db_schema.get_table(table_name) {
        if !table_def.is_v3() {
            return Err(WriteLineError {
                original_line: raw_line.to_string(),
                line_number,
                error_message: "received v3 write protocol for a table that uses the v1 data model"
                    .to_string(),
            });
        }
        let mut columns = Vec::with_capacity(line.column_count() + 1);
        match (table_def.schema().series_key(), &line.series.series_key) {
            (Some(s), Some(l)) => {
                let l = l.iter().map(|sk| sk.0.as_str()).collect::<Vec<&str>>();
                if s != l {
                    return Err(WriteLineError {
                        original_line: raw_line.to_string(),
                        line_number,
                        error_message: format!(
                            "write to table {table_name} had the incorrect series key, \
                            expected: [{expected}], received: [{received}]",
                            table_name = table_def.name,
                            expected = s.join(", "),
                            received = l.join(", "),
                        ),
                    });
                }
            }
            (Some(s), None) => {
                if !s.is_empty() {
                    return Err(WriteLineError {
                        original_line: raw_line.to_string(),
                        line_number,
                        error_message: format!(
                            "write to table {table_name} was missing a series key, the series key \
                            contains [{key_members}]",
                            table_name = table_def.name,
                            key_members = s.join(", "),
                        ),
                    });
                }
            }
            (None, _) => unreachable!(),
        }
        if let Some(series_key) = &line.series.series_key {
            for (sk, _) in series_key.iter() {
                if !table_def.column_exists(sk) {
                    columns.push((sk.to_string(), InfluxColumnType::Tag));
                }
            }
        }
        for (field_name, field_val) in line.field_set.iter() {
            if let Some(schema_col_type) = table_def.field_type_by_name(field_name) {
                let field_col_type = influx_column_type_from_field_value(field_val);
                if field_col_type != schema_col_type {
                    let field_name = field_name.to_string();
                    return Err(WriteLineError {
                        original_line: raw_line.to_string(),
                        line_number: line_number + 1,
                        error_message: format!(
                            "invalid field value in line protocol for field '{field_name}' on line \
                            {line_number}: expected type {expected}, but got {got}",
                            expected = schema_col_type,
                            got = field_col_type,
                        ),
                    });
                }
            } else {
                columns.push((
                    field_name.to_string(),
                    influx_column_type_from_field_value(field_val),
                ));
            }
        }
        if !columns.is_empty() {
            let t = db_schema.to_mut().tables.get_mut(table_name).unwrap();
            t.add_columns(columns);
        }
    } else {
        let mut columns = Vec::new();
        let mut key = Vec::new();
        if let Some(series_key) = &line.series.series_key {
            for (sk, _) in series_key.iter() {
                key.push(sk.to_string());
                columns.push((sk.to_string(), InfluxColumnType::Tag));
            }
        }
        for (field_name, field_val) in line.field_set.iter() {
            columns.push((
                field_name.to_string(),
                influx_column_type_from_field_value(field_val),
            ));
        }
        // Always add time last on new table:
        columns.push((TIME_COLUMN_NAME.to_string(), InfluxColumnType::Timestamp));
        let table = TableDefinition::new(table_name, columns, Some(key));

        assert!(
            db_schema
                .to_mut()
                .tables
                .insert(table_name.to_string(), table)
                .is_none(),
            "attempted to overwrite existing table"
        )
    }

    Ok(line)
}

/// Validate a line of line protocol against the given schema definition
///
/// This is for scenarios where a write comes in for a table that exists, but may have
/// invalid field types, based on the pre-existing schema.
///
/// An error will also be produced if the write, which is for the v1 data model, is targeting
/// a v3 table.
fn validate_v1_line<'a>(
    db_schema: &mut Cow<'_, DatabaseSchema>,
    line_number: usize,
    line: ParsedLine<'a>,
) -> Result<ParsedLine<'a>, WriteLineError> {
    let table_name = line.series.measurement.as_str();
    if let Some(table_def) = db_schema.get_table(table_name) {
        if table_def.is_v3() {
            return Err(WriteLineError {
                original_line: line.to_string(),
                line_number,
                error_message: "received v1 write protocol for a table that uses the v3 data model"
                    .to_string(),
            });
        }
        // This table already exists, so update with any new columns if present:
        let mut columns = Vec::with_capacity(line.column_count() + 1);
        if let Some(tag_set) = &line.series.tag_set {
            for (tag_key, _) in tag_set {
                if !table_def.column_exists(tag_key) {
                    columns.push((tag_key.to_string(), InfluxColumnType::Tag));
                }
            }
        }
        for (field_name, field_val) in line.field_set.iter() {
            // This field already exists, so check the incoming type matches existing type:
            if let Some(schema_col_type) = table_def.field_type_by_name(field_name) {
                let field_col_type = influx_column_type_from_field_value(field_val);
                if field_col_type != schema_col_type {
                    let field_name = field_name.to_string();
                    return Err(WriteLineError {
                        original_line: line.to_string(),
                        line_number: line_number + 1,
                        error_message: format!(
                            "invalid field value in line protocol for field '{field_name}' on line \
                            {line_number}: expected type {expected}, but got {got}",
                            expected = schema_col_type,
                            got = field_col_type,
                        ),
                    });
                }
            } else {
                columns.push((
                    field_name.to_string(),
                    influx_column_type_from_field_value(field_val),
                ));
            }
        }
        if !columns.is_empty() {
            // unwrap is safe due to the surrounding if let condition:
            let t = db_schema.to_mut().tables.get_mut(table_name).unwrap();
            t.add_columns(columns);
        }
    } else {
        // This is a new table, so build up its columns:
        let mut columns = Vec::new();
        if let Some(tag_set) = &line.series.tag_set {
            for (tag_key, _) in tag_set {
                columns.push((tag_key.to_string(), InfluxColumnType::Tag));
            }
        }
        for (field_name, field_val) in &line.field_set {
            columns.push((
                field_name.to_string(),
                influx_column_type_from_field_value(field_val),
            ));
        }
        // Always add time last on new table:
        columns.push((TIME_COLUMN_NAME.to_string(), InfluxColumnType::Timestamp));
        let table = TableDefinition::new(table_name, columns, Option::<Vec<String>>::None);

        assert!(
            db_schema
                .to_mut()
                .tables
                .insert(table_name.to_string(), table)
                .is_none(),
            "attempted to overwrite existing table"
        );
    }

    Ok(line)
}

/// Result of conversion from line protocol to valid segmented data
/// for the buffer.
#[derive(Debug, Default)]
pub(crate) struct ValidatedLines {
    /// Number of lines passed in
    pub(crate) line_count: usize,
    /// Number of fields passed in
    pub(crate) field_count: usize,
    /// Number of index columns passed in, whether tags (v1) or series keys (v3)
    pub(crate) index_count: usize,
    /// Any errors that occurred while parsing the lines
    pub(crate) errors: Vec<WriteLineError>,
    /// Only valid lines from what was passed in to validate, segmented based on the
    /// timestamps of the data.
    pub(crate) valid_segmented_data: Vec<ValidSegmentedData>,
}

impl<'lp> WriteValidator<LinesParsed<'lp, v3::ParsedLine<'lp>>> {
    /// Convert a set of valid parsed `v3` lines to a [`ValidatedLines`] which will
    /// be buffered and written to the WAL, if configured.
    ///
    /// This involves splitting out the writes into different batches for any
    /// segment affected by the write. This function should be infallible, because
    /// the schema for incoming writes has been fully validated.
    pub(crate) fn convert_lines_to_buffer(
        self,
        ingest_time: Time,
        segment_duration: SegmentDuration,
        precision: Precision,
    ) -> ValidatedLines {
        let mut segment_table_batches = HashMap::new();
        let line_count = self.state.lines.len();
        let mut field_count = 0;
        let mut series_key_count = 0;

        for (line, raw_line) in self.state.lines.into_iter() {
            field_count += line.field_set.len();
            series_key_count += line
                .series
                .series_key
                .as_ref()
                .map(|sk| sk.len())
                .unwrap_or(0);

            convert_v3_parsed_line(
                line,
                raw_line,
                &mut segment_table_batches,
                ingest_time,
                segment_duration,
                precision,
            );
        }

        let valid_segmented_data = segment_table_batches
            .into_iter()
            .map(|(segment_start, table_batch_map)| ValidSegmentedData {
                database_name: self.state.catalog.db_name.clone(),
                segment_start,
                table_batches: table_batch_map.table_batches,
                wal_op: WalOp::LpWrite(LpWriteOp {
                    db_name: self.state.catalog.db_name.to_string(),
                    lp: table_batch_map.lines.join("\n"),
                    default_time: ingest_time.timestamp_nanos(),
                    precision,
                }),
                starting_catalog_sequence_number: self.state.catalog.sequence,
            })
            .collect();

        ValidatedLines {
            line_count,
            field_count,
            index_count: series_key_count,
            errors: self.state.errors,
            valid_segmented_data,
        }
    }
}
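
// A minimal sketch (not part of this diff) of the segment bucketing performed
// by the conversion above: rows are grouped by the start time of the segment
// their timestamp falls into, so one write can fan out into several
// `ValidSegmentedData` entries. The two timestamps below are illustrative.
#[allow(dead_code)]
fn segment_starts_sketch() {
    let segment_duration = SegmentDuration::new_5m();
    // 10 seconds and 400 seconds after the epoch land in different 5-minute segments:
    let first = segment_duration.start_time(10);
    let second = segment_duration.start_time(400);
    assert_ne!(first, second);
}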

fn convert_v3_parsed_line<'a>(
    line: v3::ParsedLine<'_>,
    raw_line: &'a str,
    segment_table_batches: &mut HashMap<Time, TableBatchMap<'a>>,
    ingest_time: Time,
    segment_duration: SegmentDuration,
    precision: Precision,
) {
    // Set up row values:
    let mut values = Vec::with_capacity(line.column_count() + 1);

    // Add series key columns:
    if let Some(series_key) = line.series.series_key {
        for (sk, sv) in series_key.iter() {
            values.push(Field {
                name: sk.to_string(),
                value: sv.into(),
            });
        }
    }

    // Add fields columns:
    for (name, val) in line.field_set {
        values.push(Field {
            name: name.to_string(),
            value: val.into(),
        });
    }

    // Add time column:
    // TODO: change the default time resolution to microseconds in v3
    let time_value_nanos = line
        .timestamp
        .map(|ts| apply_precision_to_timestamp(precision, ts))
        .unwrap_or(ingest_time.timestamp_nanos());
    values.push(Field {
        name: TIME_COLUMN_NAME.to_string(),
        value: FieldData::Timestamp(time_value_nanos),
    });

    // Add the row to the table batch, creating a new entry for the segment if
    // it does not already exist:
    let segment_start = segment_duration.start_time(time_value_nanos / 1_000_000_000);
    let table_batch_map = segment_table_batches.entry(segment_start).or_default();
    let table_batch = table_batch_map
        .table_batches
        .entry(line.series.measurement.to_string())
        .or_default();
    table_batch.rows.push(Row {
        time: time_value_nanos,
        fields: values,
    });
    table_batch_map.lines.push(raw_line);
}

impl<'lp> WriteValidator<LinesParsed<'lp, ParsedLine<'lp>>> {
    /// Convert a set of valid parsed lines to a [`ValidatedLines`] which will
    /// be buffered and written to the WAL, if configured.
    ///
    /// This involves splitting out the writes into different batches for any
    /// segment affected by the write. This function should be infallible, because
    /// the schema for incoming writes has been fully validated.
    pub(crate) fn convert_lines_to_buffer(
        self,
        ingest_time: Time,
        segment_duration: SegmentDuration,
        precision: Precision,
    ) -> ValidatedLines {
        let mut segment_table_batches = HashMap::new();
        let line_count = self.state.lines.len();
        let mut field_count = 0;
        let mut tag_count = 0;

        for (line, raw_line) in self.state.lines.into_iter() {
            field_count += line.field_set.len();
            tag_count += line.series.tag_set.as_ref().map(|t| t.len()).unwrap_or(0);

            convert_v1_parsed_line(
                line,
                raw_line,
                &mut segment_table_batches,
                ingest_time,
                segment_duration,
                precision,
            );
        }

        let valid_segmented_data = segment_table_batches
            .into_iter()
            .map(|(segment_start, table_batches)| ValidSegmentedData {
                database_name: self.state.catalog.db_name.clone(),
                segment_start,
                table_batches: table_batches.table_batches,
                wal_op: WalOp::LpWrite(LpWriteOp {
                    db_name: self.state.catalog.db_name.to_string(),
                    lp: table_batches.lines.join("\n"),
                    default_time: ingest_time.timestamp_nanos(),
                    precision,
                }),
                starting_catalog_sequence_number: self.state.catalog.sequence,
            })
            .collect();

        ValidatedLines {
            line_count,
            field_count,
            index_count: tag_count,
            errors: self.state.errors,
            valid_segmented_data,
        }
    }
}

fn convert_v1_parsed_line<'a>(
    line: ParsedLine<'_>,
    raw_line: &'a str,
    segment_table_batches: &mut HashMap<Time, TableBatchMap<'a>>,
    ingest_time: Time,
    segment_duration: SegmentDuration,
    precision: Precision,
) {
    // now that we've ensured all columns exist in the schema, construct the actual row and values
    // while validating the column types match.
    let mut values = Vec::with_capacity(line.column_count() + 1);

    // validate tags, collecting any new ones that must be inserted, or adding the values
    if let Some(tag_set) = line.series.tag_set {
        for (tag_key, value) in tag_set {
            let value = Field {
                name: tag_key.to_string(),
                value: FieldData::Tag(value.to_string()),
            };
            values.push(value);
        }
    }

    // validate fields, collecting any new ones that must be inserted, or adding values
    for (field_name, value) in line.field_set {
        let field_data = match value {
            FieldValue::I64(v) => FieldData::Integer(v),
            FieldValue::F64(v) => FieldData::Float(v),
            FieldValue::U64(v) => FieldData::UInteger(v),
            FieldValue::Boolean(v) => FieldData::Boolean(v),
            FieldValue::String(v) => FieldData::String(v.to_string()),
        };
        let value = Field {
            name: field_name.to_string(),
            value: field_data,
        };
        values.push(value);
    }

    // set the time value
    let time_value_nanos = line
        .timestamp
        .map(|ts| apply_precision_to_timestamp(precision, ts))
        .unwrap_or(ingest_time.timestamp_nanos());

    let segment_start = segment_duration.start_time(time_value_nanos / 1_000_000_000);

    values.push(Field {
        name: TIME_COLUMN_NAME.to_string(),
        value: FieldData::Timestamp(time_value_nanos),
    });

    let table_batch_map = segment_table_batches.entry(segment_start).or_default();

    let table_batch = table_batch_map
        .table_batches
        .entry(line.series.measurement.to_string())
        .or_default();
    table_batch.rows.push(Row {
        time: time_value_nanos,
        fields: values,
    });

    table_batch_map.lines.push(raw_line);
}

fn apply_precision_to_timestamp(precision: Precision, ts: i64) -> i64 {
    let multiplier = match precision {
        Precision::Auto => match crate::guess_precision(ts) {
            Precision::Second => 1_000_000_000,
            Precision::Millisecond => 1_000_000,
            Precision::Microsecond => 1_000,
            Precision::Nanosecond => 1,
            Precision::Auto => unreachable!(),
        },
        Precision::Second => 1_000_000_000,
        Precision::Millisecond => 1_000_000,
        Precision::Microsecond => 1_000,
        Precision::Nanosecond => 1,
    };

    ts * multiplier
}
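
// A small sketch test (not part of this diff) pinning down the multiplier
// behaviour shown above for the explicit precisions; `Precision::Auto` is left
// out because it defers to `crate::guess_precision`.
#[cfg(test)]
mod precision_sketch_tests {
    use super::apply_precision_to_timestamp;
    use crate::Precision;

    #[test]
    fn explicit_precisions_scale_to_nanoseconds() {
        assert_eq!(apply_precision_to_timestamp(Precision::Second, 1), 1_000_000_000);
        assert_eq!(apply_precision_to_timestamp(Precision::Millisecond, 1), 1_000_000);
        assert_eq!(apply_precision_to_timestamp(Precision::Microsecond, 1), 1_000);
        assert_eq!(apply_precision_to_timestamp(Precision::Nanosecond, 1), 1);
    }
}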

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use data_types::NamespaceName;
    use iox_time::Time;

    use crate::{catalog::Catalog, write_buffer::Error, Precision, SegmentDuration};

    use super::WriteValidator;

    #[test]
    fn write_validator_v1() -> Result<(), Error> {
        let namespace = NamespaceName::new("test").unwrap();
        let catalog = Arc::new(Catalog::new());
        let result = WriteValidator::initialize(namespace.clone(), catalog)?
            .v1_parse_lines_and_update_schema("cpu,tag1=foo val1=\"bar\" 1234", false)?
            .convert_lines_to_buffer(
                Time::from_timestamp_nanos(0),
                SegmentDuration::new_5m(),
                Precision::Auto,
            );

        assert_eq!(result.line_count, 1);
        assert_eq!(result.field_count, 1);
        assert_eq!(result.index_count, 1);
        assert!(result.errors.is_empty());

        let data = &result.valid_segmented_data[0];
        assert_eq!(data.database_name, namespace);
        let batch = data.table_batches.get("cpu").unwrap();
        assert_eq!(batch.rows.len(), 1);

        println!("{result:#?}");

        Ok(())
    }
}