feat: Set 72 hour query/write limit for Core (#25810)
This commit sets InfluxDB 3 Core to have a 72 hour limit for queries and writes. What this means is that writes that contain historical data older than 72 hours will be rejected, and queries will filter out data older than 72 hours. Core is intended to be a recent-data timeseries database, and performance over data older than 72 hours will degrade without a garbage collector, a core feature of InfluxDB 3 Enterprise. InfluxDB 3 Enterprise does not have this write or query limit in place. Note that this does *not* mean older data is deleted. Older data is still accessible in object storage as Parquet files that can still be used in other services and analyzed with dataframe libraries like pandas and polars. This commit does a few things:

- Uses timestamps in the year 2065 for tests, as these should not break for longer than many of us will be working in our lifetimes. This is only needed for the integration tests, as other tests use the MockProvider for time.
- Filters the buffer and persisted files to only show data newer than 3 days ago.
- Fixes the integration tests to work with the fact that writes older than 3 days are rejected.
parent
4bfd95d068
commit
aa8a8c560d
|
@ -26,7 +26,7 @@ async fn auth() {
|
|||
client
|
||||
.post(&write_lp_url)
|
||||
.query(&write_lp_params)
|
||||
.body("cpu,host=a val=1i 123")
|
||||
.body("cpu,host=a val=1i 2998574937")
|
||||
.send()
|
||||
.await
|
||||
.unwrap()
|
||||
|
@ -47,7 +47,7 @@ async fn auth() {
|
|||
client
|
||||
.post(&write_lp_url)
|
||||
.query(&write_lp_params)
|
||||
.body("cpu,host=a val=1i 123")
|
||||
.body("cpu,host=a val=1i 2998574937")
|
||||
.bearer_auth(TOKEN)
|
||||
.send()
|
||||
.await
|
||||
|
@ -59,7 +59,7 @@ async fn auth() {
|
|||
client
|
||||
.post(&write_lp_url)
|
||||
.query(&write_lp_params)
|
||||
.body("cpu,host=a val=1i 123")
|
||||
.body("cpu,host=a val=1i 2998574937")
|
||||
// support both Bearer and Token auth schemes
|
||||
.header("Authorization", format!("Token {TOKEN}"))
|
||||
.send()
|
||||
|
@ -141,10 +141,10 @@ async fn auth_grpc() {
|
|||
server
|
||||
.write_lp_to_db(
|
||||
"foo",
|
||||
"cpu,host=s1,region=us-east usage=0.9 1\n\
|
||||
cpu,host=s1,region=us-east usage=0.89 2\n\
|
||||
cpu,host=s1,region=us-east usage=0.85 3",
|
||||
Precision::Nanosecond,
|
||||
"cpu,host=s1,region=us-east usage=0.9 2998574937\n\
|
||||
cpu,host=s1,region=us-east usage=0.89 2998574938\n\
|
||||
cpu,host=s1,region=us-east usage=0.85 2998574939",
|
||||
Precision::Second,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -167,13 +167,13 @@ async fn auth_grpc() {
|
|||
let batches = collect_stream(response).await;
|
||||
assert_batches_sorted_eq!(
|
||||
[
|
||||
"+------+---------+--------------------------------+-------+",
|
||||
"| host | region | time | usage |",
|
||||
"+------+---------+--------------------------------+-------+",
|
||||
"| s1 | us-east | 1970-01-01T00:00:00.000000001Z | 0.9 |",
|
||||
"| s1 | us-east | 1970-01-01T00:00:00.000000002Z | 0.89 |",
|
||||
"| s1 | us-east | 1970-01-01T00:00:00.000000003Z | 0.85 |",
|
||||
"+------+---------+--------------------------------+-------+",
|
||||
"+------+---------+----------------------+-------+",
|
||||
"| host | region | time | usage |",
|
||||
"+------+---------+----------------------+-------+",
|
||||
"| s1 | us-east | 2065-01-07T17:28:57Z | 0.9 |",
|
||||
"| s1 | us-east | 2065-01-07T17:28:58Z | 0.89 |",
|
||||
"| s1 | us-east | 2065-01-07T17:28:59Z | 0.85 |",
|
||||
"+------+---------+----------------------+-------+",
|
||||
],
|
||||
&batches
|
||||
);
|
||||
|
|
|
@ -276,7 +276,7 @@ async fn test_create_table() {
|
|||
server
|
||||
.write_lp_to_db(
|
||||
db_name,
|
||||
format!("{table_name},one=1,two=2,three=3 four=\"4\",five=5u,six=6,seven=7i,eight=true 1000"),
|
||||
format!("{table_name},one=1,two=2,three=3 four=\"4\",five=5u,six=6,seven=7i,eight=true 2998574937"),
|
||||
influxdb3_client::Precision::Second,
|
||||
)
|
||||
.await
|
||||
|
@ -303,7 +303,7 @@ async fn test_create_table() {
|
|||
"six": 6.0,
|
||||
"seven": 7,
|
||||
"eight": true,
|
||||
"time": "1970-01-01T00:16:40"
|
||||
"time": "2065-01-07T17:28:57"
|
||||
}])
|
||||
);
|
||||
}
|
||||
|
|
|
@ -1055,7 +1055,7 @@ async fn api_v3_configure_table_create_then_write() {
|
|||
server
|
||||
.write_lp_to_db(
|
||||
"foo",
|
||||
"bar,tag1=1,tag2=2 field1=1u,field2=2i,field3=3,field4=\"4\",field5=true 1000",
|
||||
"bar,tag1=1,tag2=2 field1=1u,field2=2i,field3=3,field4=\"4\",field5=true 2998574938",
|
||||
influxdb3_client::Precision::Second,
|
||||
)
|
||||
.await
|
||||
|
@ -1080,7 +1080,7 @@ async fn api_v3_configure_table_create_then_write() {
|
|||
"field3": 3.0,
|
||||
"field4": "4",
|
||||
"field5": true,
|
||||
"time": "1970-01-01T00:16:40"
|
||||
"time": "2065-01-07T17:28:58"
|
||||
}])
|
||||
);
|
||||
}
|
||||
|
@ -1129,7 +1129,7 @@ async fn api_v3_configure_table_create_no_fields() {
|
|||
server
|
||||
.write_lp_to_db(
|
||||
"foo",
|
||||
"bar,one=1,two=2 new_field=0 1000",
|
||||
"bar,one=1,two=2 new_field=0 2998574938",
|
||||
influxdb3_client::Precision::Second,
|
||||
)
|
||||
.await
|
||||
|
@ -1150,7 +1150,7 @@ async fn api_v3_configure_table_create_no_fields() {
|
|||
"one": "1",
|
||||
"two": "2",
|
||||
"new_field": 0.0,
|
||||
"time": "1970-01-01T00:16:40"
|
||||
"time": "2065-01-07T17:28:58"
|
||||
}])
|
||||
);
|
||||
}
|
||||
|
|
|
@ -14,10 +14,10 @@ async fn flight() -> Result<(), influxdb3_client::Error> {
|
|||
server
|
||||
.write_lp_to_db(
|
||||
"foo",
|
||||
"cpu,host=s1,region=us-east usage=0.9 1\n\
|
||||
cpu,host=s1,region=us-east usage=0.89 2\n\
|
||||
cpu,host=s1,region=us-east usage=0.85 3",
|
||||
Precision::Nanosecond,
|
||||
"cpu,host=s1,region=us-east usage=0.9 2998574936\n\
|
||||
cpu,host=s1,region=us-east usage=0.89 2998574937\n\
|
||||
cpu,host=s1,region=us-east usage=0.85 2998574938",
|
||||
Precision::Second,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
@ -33,13 +33,13 @@ async fn flight() -> Result<(), influxdb3_client::Error> {
|
|||
let batches = collect_stream(response).await;
|
||||
assert_batches_sorted_eq!(
|
||||
[
|
||||
"+------+---------+--------------------------------+-------+",
|
||||
"| host | region | time | usage |",
|
||||
"+------+---------+--------------------------------+-------+",
|
||||
"| s1 | us-east | 1970-01-01T00:00:00.000000001Z | 0.9 |",
|
||||
"| s1 | us-east | 1970-01-01T00:00:00.000000002Z | 0.89 |",
|
||||
"| s1 | us-east | 1970-01-01T00:00:00.000000003Z | 0.85 |",
|
||||
"+------+---------+--------------------------------+-------+",
|
||||
"+------+---------+----------------------+-------+",
|
||||
"| host | region | time | usage |",
|
||||
"+------+---------+----------------------+-------+",
|
||||
"| s1 | us-east | 2065-01-07T17:28:56Z | 0.9 |",
|
||||
"| s1 | us-east | 2065-01-07T17:28:57Z | 0.89 |",
|
||||
"| s1 | us-east | 2065-01-07T17:28:58Z | 0.85 |",
|
||||
"+------+---------+----------------------+-------+",
|
||||
],
|
||||
&batches
|
||||
);
|
||||
|
@ -68,13 +68,13 @@ async fn flight() -> Result<(), influxdb3_client::Error> {
|
|||
let batches = collect_stream(stream).await;
|
||||
assert_batches_sorted_eq!(
|
||||
[
|
||||
"+------+---------+--------------------------------+-------+",
|
||||
"| host | region | time | usage |",
|
||||
"+------+---------+--------------------------------+-------+",
|
||||
"| s1 | us-east | 1970-01-01T00:00:00.000000001Z | 0.9 |",
|
||||
"| s1 | us-east | 1970-01-01T00:00:00.000000002Z | 0.89 |",
|
||||
"| s1 | us-east | 1970-01-01T00:00:00.000000003Z | 0.85 |",
|
||||
"+------+---------+--------------------------------+-------+",
|
||||
"+------+---------+----------------------+-------+",
|
||||
"| host | region | time | usage |",
|
||||
"+------+---------+----------------------+-------+",
|
||||
"| s1 | us-east | 2065-01-07T17:28:56Z | 0.9 |",
|
||||
"| s1 | us-east | 2065-01-07T17:28:57Z | 0.89 |",
|
||||
"| s1 | us-east | 2065-01-07T17:28:58Z | 0.85 |",
|
||||
"+------+---------+----------------------+-------+",
|
||||
],
|
||||
&batches
|
||||
);
|
||||
|
@ -155,10 +155,10 @@ async fn flight_influxql() {
|
|||
server
|
||||
.write_lp_to_db(
|
||||
"foo",
|
||||
"cpu,host=s1,region=us-east usage=0.9 1\n\
|
||||
cpu,host=s1,region=us-east usage=0.89 2\n\
|
||||
cpu,host=s1,region=us-east usage=0.85 3",
|
||||
Precision::Nanosecond,
|
||||
"cpu,host=s1,region=us-east usage=0.9 2998574936\n\
|
||||
cpu,host=s1,region=us-east usage=0.89 2998574937\n\
|
||||
cpu,host=s1,region=us-east usage=0.85 2998574938",
|
||||
Precision::Second,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
|
|
@ -12,7 +12,7 @@ async fn limits() -> Result<(), Error> {
|
|||
server
|
||||
.write_lp_to_db(
|
||||
db,
|
||||
"cpu,host=s1,region=us-east usage=0.9 1\n",
|
||||
"cpu,host=s1,region=us-east usage=0.9 2998574938\n",
|
||||
Precision::Nanosecond,
|
||||
)
|
||||
.await?;
|
||||
|
@ -21,8 +21,8 @@ async fn limits() -> Result<(), Error> {
|
|||
let Err(Error::ApiError { code, .. }) = server
|
||||
.write_lp_to_db(
|
||||
"six",
|
||||
"cpu,host=s1,region=us-east usage=0.9 1\n",
|
||||
Precision::Nanosecond,
|
||||
"cpu,host=s1,region=us-east usage=0.9 2998574938\n",
|
||||
Precision::Second,
|
||||
)
|
||||
.await
|
||||
else {
|
||||
|
@ -35,19 +35,19 @@ async fn limits() -> Result<(), Error> {
|
|||
let table_lp = (0..1995).fold(String::new(), |mut acc, i| {
|
||||
acc.push_str("cpu");
|
||||
acc.push_str(&i.to_string());
|
||||
acc.push_str(",host=s1,region=us-east usage=0.9 1\n");
|
||||
acc.push_str(",host=s1,region=us-east usage=0.9 2998574938\n");
|
||||
acc
|
||||
});
|
||||
|
||||
server
|
||||
.write_lp_to_db("one", &table_lp, Precision::Nanosecond)
|
||||
.write_lp_to_db("one", &table_lp, Precision::Second)
|
||||
.await?;
|
||||
|
||||
let Err(Error::ApiError { code, .. }) = server
|
||||
.write_lp_to_db(
|
||||
"six",
|
||||
"cpu2000,host=s1,region=us-east usage=0.9 1\n",
|
||||
Precision::Nanosecond,
|
||||
"cpu2000,host=s1,region=us-east usage=0.9 2998574938\n",
|
||||
Precision::Second,
|
||||
)
|
||||
.await
|
||||
else {
|
||||
|
@ -63,15 +63,15 @@ async fn limits() -> Result<(), Error> {
|
|||
lp_500.push_str(&column);
|
||||
lp_501.push_str(&column);
|
||||
}
|
||||
lp_500.push_str(" 0\n");
|
||||
lp_501.push_str(",column501=1 0\n");
|
||||
lp_500.push_str(" 2998574938\n");
|
||||
lp_501.push_str(",column501=1 2998574938\n");
|
||||
|
||||
server
|
||||
.write_lp_to_db("one", &lp_500, Precision::Nanosecond)
|
||||
.write_lp_to_db("one", &lp_500, Precision::Second)
|
||||
.await?;
|
||||
|
||||
let Err(Error::ApiError { code, .. }) = server
|
||||
.write_lp_to_db("one", &lp_501, Precision::Nanosecond)
|
||||
.write_lp_to_db("one", &lp_501, Precision::Second)
|
||||
.await
|
||||
else {
|
||||
panic!("Did not error when adding 501st column");
|
||||
|
|
|
@ -14,10 +14,10 @@ async fn api_v3_query_sql() {
|
|||
server
|
||||
.write_lp_to_db(
|
||||
"foo",
|
||||
"cpu,host=s1,region=us-east usage=0.9 1\n\
|
||||
cpu,host=s1,region=us-east usage=0.89 2\n\
|
||||
cpu,host=s1,region=us-east usage=0.85 3",
|
||||
Precision::Nanosecond,
|
||||
"cpu,host=s1,region=us-east usage=0.9 2998574936\n\
|
||||
cpu,host=s1,region=us-east usage=0.89 2998574937\n\
|
||||
cpu,host=s1,region=us-east usage=0.85 2998574938",
|
||||
Precision::Second,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -26,13 +26,13 @@ async fn api_v3_query_sql() {
|
|||
TestCase {
|
||||
database: Some("foo"),
|
||||
query: "SELECT host, region, time, usage FROM cpu",
|
||||
expected: "+------+---------+-------------------------------+-------+\n\
|
||||
| host | region | time | usage |\n\
|
||||
+------+---------+-------------------------------+-------+\n\
|
||||
| s1 | us-east | 1970-01-01T00:00:00.000000001 | 0.9 |\n\
|
||||
| s1 | us-east | 1970-01-01T00:00:00.000000002 | 0.89 |\n\
|
||||
| s1 | us-east | 1970-01-01T00:00:00.000000003 | 0.85 |\n\
|
||||
+------+---------+-------------------------------+-------+",
|
||||
expected: "+------+---------+---------------------+-------+\n\
|
||||
| host | region | time | usage |\n\
|
||||
+------+---------+---------------------+-------+\n\
|
||||
| s1 | us-east | 2065-01-07T17:28:56 | 0.9 |\n\
|
||||
| s1 | us-east | 2065-01-07T17:28:57 | 0.89 |\n\
|
||||
| s1 | us-east | 2065-01-07T17:28:58 | 0.85 |\n\
|
||||
+------+---------+---------------------+-------+",
|
||||
},
|
||||
TestCase {
|
||||
database: Some("foo"),
|
||||
|
@ -76,14 +76,14 @@ async fn api_v3_query_sql_params() {
|
|||
server
|
||||
.write_lp_to_db(
|
||||
"foo",
|
||||
"cpu,host=a,region=us-east usage=0.9 1
|
||||
cpu,host=b,region=us-east usage=0.50 1
|
||||
cpu,host=a,region=us-east usage=0.80 2
|
||||
cpu,host=b,region=us-east usage=0.60 2
|
||||
cpu,host=a,region=us-east usage=0.70 3
|
||||
cpu,host=b,region=us-east usage=0.70 3
|
||||
cpu,host=a,region=us-east usage=0.50 4
|
||||
cpu,host=b,region=us-east usage=0.80 4",
|
||||
"cpu,host=a,region=us-east usage=0.9 2998574936
|
||||
cpu,host=b,region=us-east usage=0.50 2998574936
|
||||
cpu,host=a,region=us-east usage=0.80 2998574937
|
||||
cpu,host=b,region=us-east usage=0.60 2998574937
|
||||
cpu,host=a,region=us-east usage=0.70 2998574938
|
||||
cpu,host=b,region=us-east usage=0.70 2998574938
|
||||
cpu,host=a,region=us-east usage=0.50 2998574939
|
||||
cpu,host=b,region=us-east usage=0.80 2998574939",
|
||||
Precision::Second,
|
||||
)
|
||||
.await
|
||||
|
@ -116,8 +116,8 @@ async fn api_v3_query_sql_params() {
|
|||
"+------+---------+---------------------+-------+\n\
|
||||
| host | region | time | usage |\n\
|
||||
+------+---------+---------------------+-------+\n\
|
||||
| b | us-east | 1970-01-01T00:00:03 | 0.7 |\n\
|
||||
| b | us-east | 1970-01-01T00:00:04 | 0.8 |\n\
|
||||
| b | us-east | 2065-01-07T17:28:58 | 0.7 |\n\
|
||||
| b | us-east | 2065-01-07T17:28:59 | 0.8 |\n\
|
||||
+------+---------+---------------------+-------+",
|
||||
resp
|
||||
);
|
||||
|
@ -152,8 +152,8 @@ async fn api_v3_query_sql_params() {
|
|||
"+------+---------+---------------------+-------+\n\
|
||||
| host | region | time | usage |\n\
|
||||
+------+---------+---------------------+-------+\n\
|
||||
| b | us-east | 1970-01-01T00:00:03 | 0.7 |\n\
|
||||
| b | us-east | 1970-01-01T00:00:04 | 0.8 |\n\
|
||||
| b | us-east | 2065-01-07T17:28:58 | 0.7 |\n\
|
||||
| b | us-east | 2065-01-07T17:28:59 | 0.8 |\n\
|
||||
+------+---------+---------------------+-------+",
|
||||
resp
|
||||
);
|
||||
|
@ -195,13 +195,13 @@ async fn api_v3_query_influxql() {
|
|||
server
|
||||
.write_lp_to_db(
|
||||
"foo",
|
||||
"cpu,host=s1,region=us-east usage=0.9 1\n\
|
||||
cpu,host=s1,region=us-east usage=0.89 2\n\
|
||||
cpu,host=s1,region=us-east usage=0.85 3\n\
|
||||
mem,host=s1,region=us-east usage=0.5 4\n\
|
||||
mem,host=s1,region=us-east usage=0.6 5\n\
|
||||
mem,host=s1,region=us-east usage=0.7 6",
|
||||
Precision::Nanosecond,
|
||||
"cpu,host=s1,region=us-east usage=0.9 2998574930\n\
|
||||
cpu,host=s1,region=us-east usage=0.89 2998574931\n\
|
||||
cpu,host=s1,region=us-east usage=0.85 2998574932\n
|
||||
mem,host=s1,region=us-east usage=0.5 2998574933\n\
|
||||
mem,host=s1,region=us-east usage=0.6 2998574934\n\
|
||||
mem,host=s1,region=us-east usage=0.7 2998574935",
|
||||
Precision::Second,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -210,13 +210,13 @@ async fn api_v3_query_influxql() {
|
|||
server
|
||||
.write_lp_to_db(
|
||||
"bar",
|
||||
"cpu,host=s1,region=us-east usage=0.9 1\n\
|
||||
cpu,host=s1,region=us-east usage=0.89 2\n\
|
||||
cpu,host=s1,region=us-east usage=0.85 3\n\
|
||||
mem,host=s1,region=us-east usage=0.5 4\n\
|
||||
mem,host=s1,region=us-east usage=0.6 5\n\
|
||||
mem,host=s1,region=us-east usage=0.7 6",
|
||||
Precision::Nanosecond,
|
||||
"cpu,host=s1,region=us-east usage=0.9 2998574930\n\
|
||||
cpu,host=s1,region=us-east usage=0.89 2998574931\n\
|
||||
cpu,host=s1,region=us-east usage=0.85 2998574932\n\
|
||||
mem,host=s1,region=us-east usage=0.5 2998574933\n\
|
||||
mem,host=s1,region=us-east usage=0.6 2998574934\n\
|
||||
mem,host=s1,region=us-east usage=0.7 2998574935",
|
||||
Precision::Second,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -225,41 +225,38 @@ async fn api_v3_query_influxql() {
|
|||
TestCase {
|
||||
database: Some("foo"),
|
||||
query: "SELECT time, host, region, usage FROM cpu",
|
||||
expected:
|
||||
"+------------------+-------------------------------+------+---------+-------+\n\
|
||||
| iox::measurement | time | host | region | usage |\n\
|
||||
+------------------+-------------------------------+------+---------+-------+\n\
|
||||
| cpu | 1970-01-01T00:00:00.000000001 | s1 | us-east | 0.9 |\n\
|
||||
| cpu | 1970-01-01T00:00:00.000000002 | s1 | us-east | 0.89 |\n\
|
||||
| cpu | 1970-01-01T00:00:00.000000003 | s1 | us-east | 0.85 |\n\
|
||||
+------------------+-------------------------------+------+---------+-------+",
|
||||
expected: "+------------------+---------------------+------+---------+-------+\n\
|
||||
| iox::measurement | time | host | region | usage |\n\
|
||||
+------------------+---------------------+------+---------+-------+\n\
|
||||
| cpu | 2065-01-07T17:28:50 | s1 | us-east | 0.9 |\n\
|
||||
| cpu | 2065-01-07T17:28:51 | s1 | us-east | 0.89 |\n\
|
||||
| cpu | 2065-01-07T17:28:52 | s1 | us-east | 0.85 |\n\
|
||||
+------------------+---------------------+------+---------+-------+",
|
||||
},
|
||||
TestCase {
|
||||
database: None,
|
||||
query: "SELECT time, host, region, usage FROM foo.autogen.cpu",
|
||||
expected:
|
||||
"+------------------+-------------------------------+------+---------+-------+\n\
|
||||
| iox::measurement | time | host | region | usage |\n\
|
||||
+------------------+-------------------------------+------+---------+-------+\n\
|
||||
| cpu | 1970-01-01T00:00:00.000000001 | s1 | us-east | 0.9 |\n\
|
||||
| cpu | 1970-01-01T00:00:00.000000002 | s1 | us-east | 0.89 |\n\
|
||||
| cpu | 1970-01-01T00:00:00.000000003 | s1 | us-east | 0.85 |\n\
|
||||
+------------------+-------------------------------+------+---------+-------+",
|
||||
expected: "+------------------+---------------------+------+---------+-------+\n\
|
||||
| iox::measurement | time | host | region | usage |\n\
|
||||
+------------------+---------------------+------+---------+-------+\n\
|
||||
| cpu | 2065-01-07T17:28:50 | s1 | us-east | 0.9 |\n\
|
||||
| cpu | 2065-01-07T17:28:51 | s1 | us-east | 0.89 |\n\
|
||||
| cpu | 2065-01-07T17:28:52 | s1 | us-east | 0.85 |\n\
|
||||
+------------------+---------------------+------+---------+-------+",
|
||||
},
|
||||
TestCase {
|
||||
database: Some("foo"),
|
||||
query: "SELECT host, region, usage FROM cpu, mem",
|
||||
expected:
|
||||
"+------------------+-------------------------------+------+---------+-------+\n\
|
||||
| iox::measurement | time | host | region | usage |\n\
|
||||
+------------------+-------------------------------+------+---------+-------+\n\
|
||||
| cpu | 1970-01-01T00:00:00.000000001 | s1 | us-east | 0.9 |\n\
|
||||
| cpu | 1970-01-01T00:00:00.000000002 | s1 | us-east | 0.89 |\n\
|
||||
| cpu | 1970-01-01T00:00:00.000000003 | s1 | us-east | 0.85 |\n\
|
||||
| mem | 1970-01-01T00:00:00.000000004 | s1 | us-east | 0.5 |\n\
|
||||
| mem | 1970-01-01T00:00:00.000000005 | s1 | us-east | 0.6 |\n\
|
||||
| mem | 1970-01-01T00:00:00.000000006 | s1 | us-east | 0.7 |\n\
|
||||
+------------------+-------------------------------+------+---------+-------+",
|
||||
expected: "+------------------+---------------------+------+---------+-------+\n\
|
||||
| iox::measurement | time | host | region | usage |\n\
|
||||
+------------------+---------------------+------+---------+-------+\n\
|
||||
| cpu | 2065-01-07T17:28:50 | s1 | us-east | 0.9 |\n\
|
||||
| cpu | 2065-01-07T17:28:51 | s1 | us-east | 0.89 |\n\
|
||||
| cpu | 2065-01-07T17:28:52 | s1 | us-east | 0.85 |\n\
|
||||
| mem | 2065-01-07T17:28:53 | s1 | us-east | 0.5 |\n\
|
||||
| mem | 2065-01-07T17:28:54 | s1 | us-east | 0.6 |\n\
|
||||
| mem | 2065-01-07T17:28:55 | s1 | us-east | 0.7 |\n\
|
||||
+------------------+---------------------+------+---------+-------+",
|
||||
},
|
||||
TestCase {
|
||||
database: Some("foo"),
|
||||
|
@ -327,7 +324,8 @@ async fn api_v3_query_influxql() {
|
|||
},
|
||||
TestCase {
|
||||
database: Some("foo"),
|
||||
query: "SHOW TAG VALUES WITH KEY = \"host\" WHERE time < 1970-01-02",
|
||||
// TODO: WHERE time < 2065-01-08 does not work for some reason
|
||||
query: "SHOW TAG VALUES WITH KEY = \"host\" WHERE time > 2065-01-07",
|
||||
expected: "+------------------+------+-------+\n\
|
||||
| iox::measurement | key | value |\n\
|
||||
+------------------+------+-------+\n\
|
||||
|
@ -337,7 +335,8 @@ async fn api_v3_query_influxql() {
|
|||
},
|
||||
TestCase {
|
||||
database: None,
|
||||
query: "SHOW TAG VALUES ON foo WITH KEY = \"host\" WHERE time < 1970-01-02",
|
||||
// TODO: WHERE time < 2065-01-08 does not work for some reason
|
||||
query: "SHOW TAG VALUES ON foo WITH KEY = \"host\" WHERE time > 2065-01-07",
|
||||
expected: "+------------------+------+-------+\n\
|
||||
| iox::measurement | key | value |\n\
|
||||
+------------------+------+-------+\n\
|
||||
|
@ -417,14 +416,14 @@ async fn api_v3_query_influxql_params() {
|
|||
server
|
||||
.write_lp_to_db(
|
||||
"foo",
|
||||
"cpu,host=a,region=us-east usage=0.9 1
|
||||
cpu,host=b,region=us-east usage=0.50 1
|
||||
cpu,host=a,region=us-east usage=0.80 2
|
||||
cpu,host=b,region=us-east usage=0.60 2
|
||||
cpu,host=a,region=us-east usage=0.70 3
|
||||
cpu,host=b,region=us-east usage=0.70 3
|
||||
cpu,host=a,region=us-east usage=0.50 4
|
||||
cpu,host=b,region=us-east usage=0.80 4",
|
||||
"cpu,host=a,region=us-east usage=0.9 2998574931
|
||||
cpu,host=b,region=us-east usage=0.50 2998574931
|
||||
cpu,host=a,region=us-east usage=0.80 2998574932
|
||||
cpu,host=b,region=us-east usage=0.60 2998574932
|
||||
cpu,host=a,region=us-east usage=0.70 2998574933
|
||||
cpu,host=b,region=us-east usage=0.70 2998574933
|
||||
cpu,host=a,region=us-east usage=0.50 2998574934
|
||||
cpu,host=b,region=us-east usage=0.80 2998574934",
|
||||
Precision::Second,
|
||||
)
|
||||
.await
|
||||
|
@ -457,8 +456,8 @@ async fn api_v3_query_influxql_params() {
|
|||
"+------------------+---------------------+------+---------+-------+\n\
|
||||
| iox::measurement | time | host | region | usage |\n\
|
||||
+------------------+---------------------+------+---------+-------+\n\
|
||||
| cpu | 1970-01-01T00:00:03 | b | us-east | 0.7 |\n\
|
||||
| cpu | 1970-01-01T00:00:04 | b | us-east | 0.8 |\n\
|
||||
| cpu | 2065-01-07T17:28:53 | b | us-east | 0.7 |\n\
|
||||
| cpu | 2065-01-07T17:28:54 | b | us-east | 0.8 |\n\
|
||||
+------------------+---------------------+------+---------+-------+",
|
||||
resp
|
||||
);
|
||||
|
@ -493,8 +492,8 @@ async fn api_v3_query_influxql_params() {
|
|||
"+------------------+---------------------+------+---------+-------+\n\
|
||||
| iox::measurement | time | host | region | usage |\n\
|
||||
+------------------+---------------------+------+---------+-------+\n\
|
||||
| cpu | 1970-01-01T00:00:03 | b | us-east | 0.7 |\n\
|
||||
| cpu | 1970-01-01T00:00:04 | b | us-east | 0.8 |\n\
|
||||
| cpu | 2065-01-07T17:28:53 | b | us-east | 0.7 |\n\
|
||||
| cpu | 2065-01-07T17:28:54 | b | us-east | 0.8 |\n\
|
||||
+------------------+---------------------+------+---------+-------+",
|
||||
resp
|
||||
);
|
||||
|
@ -540,14 +539,14 @@ async fn api_v3_query_json_format() {
|
|||
server
|
||||
.write_lp_to_db(
|
||||
"foo",
|
||||
"cpu,host=a,region=us-east usage=0.9 1
|
||||
cpu,host=b,region=us-east usage=0.50 1
|
||||
cpu,host=a,region=us-east usage=0.80 2
|
||||
cpu,host=b,region=us-east usage=0.60 2
|
||||
cpu,host=a,region=us-east usage=0.70 3
|
||||
cpu,host=b,region=us-east usage=0.70 3
|
||||
cpu,host=a,region=us-east usage=0.50 4
|
||||
cpu,host=b,region=us-east usage=0.80 4",
|
||||
"cpu,host=a,region=us-east usage=0.9 2998574931
|
||||
cpu,host=b,region=us-east usage=0.50 2998574931
|
||||
cpu,host=a,region=us-east usage=0.80 2998574932
|
||||
cpu,host=b,region=us-east usage=0.60 2998574932
|
||||
cpu,host=a,region=us-east usage=0.70 2998574933
|
||||
cpu,host=b,region=us-east usage=0.70 2998574933
|
||||
cpu,host=a,region=us-east usage=0.50 2998574934
|
||||
cpu,host=b,region=us-east usage=0.80 2998574934",
|
||||
Precision::Second,
|
||||
)
|
||||
.await
|
||||
|
@ -568,56 +567,56 @@ async fn api_v3_query_json_format() {
|
|||
"host": "a",
|
||||
"iox::measurement": "cpu",
|
||||
"region": "us-east",
|
||||
"time": "1970-01-01T00:00:01",
|
||||
"time": "2065-01-07T17:28:51",
|
||||
"usage": 0.9
|
||||
},
|
||||
{
|
||||
"host": "b",
|
||||
"iox::measurement": "cpu",
|
||||
"region": "us-east",
|
||||
"time": "1970-01-01T00:00:01",
|
||||
"time": "2065-01-07T17:28:51",
|
||||
"usage": 0.5
|
||||
},
|
||||
{
|
||||
"host": "a",
|
||||
"iox::measurement": "cpu",
|
||||
"region": "us-east",
|
||||
"time": "1970-01-01T00:00:02",
|
||||
"time": "2065-01-07T17:28:52",
|
||||
"usage": 0.8
|
||||
},
|
||||
{
|
||||
"host": "b",
|
||||
"iox::measurement": "cpu",
|
||||
"region": "us-east",
|
||||
"time": "1970-01-01T00:00:02",
|
||||
"time": "2065-01-07T17:28:52",
|
||||
"usage": 0.6
|
||||
},
|
||||
{
|
||||
"host": "a",
|
||||
"iox::measurement": "cpu",
|
||||
"region": "us-east",
|
||||
"time": "1970-01-01T00:00:03",
|
||||
"time": "2065-01-07T17:28:53",
|
||||
"usage": 0.7
|
||||
},
|
||||
{
|
||||
"host": "b",
|
||||
"iox::measurement": "cpu",
|
||||
"region": "us-east",
|
||||
"time": "1970-01-01T00:00:03",
|
||||
"time": "2065-01-07T17:28:53",
|
||||
"usage": 0.7
|
||||
},
|
||||
{
|
||||
"host": "a",
|
||||
"iox::measurement": "cpu",
|
||||
"region": "us-east",
|
||||
"time": "1970-01-01T00:00:04",
|
||||
"time": "2065-01-07T17:28:54",
|
||||
"usage": 0.5
|
||||
},
|
||||
{
|
||||
"host": "b",
|
||||
"iox::measurement": "cpu",
|
||||
"region": "us-east",
|
||||
"time": "1970-01-01T00:00:04",
|
||||
"time": "2065-01-07T17:28:54",
|
||||
"usage": 0.8
|
||||
}
|
||||
]),
|
||||
|
@ -714,14 +713,14 @@ async fn api_v3_query_jsonl_format() {
|
|||
server
|
||||
.write_lp_to_db(
|
||||
"foo",
|
||||
"cpu,host=a,region=us-east usage=0.9 1
|
||||
cpu,host=b,region=us-east usage=0.50 1
|
||||
cpu,host=a,region=us-east usage=0.80 2
|
||||
cpu,host=b,region=us-east usage=0.60 2
|
||||
cpu,host=a,region=us-east usage=0.70 3
|
||||
cpu,host=b,region=us-east usage=0.70 3
|
||||
cpu,host=a,region=us-east usage=0.50 4
|
||||
cpu,host=b,region=us-east usage=0.80 4",
|
||||
"cpu,host=a,region=us-east usage=0.9 2998574931
|
||||
cpu,host=b,region=us-east usage=0.50 2998574931
|
||||
cpu,host=a,region=us-east usage=0.80 2998574932
|
||||
cpu,host=b,region=us-east usage=0.60 2998574932
|
||||
cpu,host=a,region=us-east usage=0.70 2998574933
|
||||
cpu,host=b,region=us-east usage=0.70 2998574933
|
||||
cpu,host=a,region=us-east usage=0.50 2998574934
|
||||
cpu,host=b,region=us-east usage=0.80 2998574934",
|
||||
Precision::Second,
|
||||
)
|
||||
.await
|
||||
|
@ -737,14 +736,14 @@ async fn api_v3_query_jsonl_format() {
|
|||
TestCase {
|
||||
database: Some("foo"),
|
||||
query: "SELECT time, host, region, usage FROM cpu",
|
||||
expected: "{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:01\",\"host\":\"a\",\"region\":\"us-east\",\"usage\":0.9}\n\
|
||||
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:01\",\"host\":\"b\",\"region\":\"us-east\",\"usage\":0.5}\n\
|
||||
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:02\",\"host\":\"a\",\"region\":\"us-east\",\"usage\":0.8}\n\
|
||||
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:02\",\"host\":\"b\",\"region\":\"us-east\",\"usage\":0.6}\n\
|
||||
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:03\",\"host\":\"a\",\"region\":\"us-east\",\"usage\":0.7}\n\
|
||||
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:03\",\"host\":\"b\",\"region\":\"us-east\",\"usage\":0.7}\n\
|
||||
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:04\",\"host\":\"a\",\"region\":\"us-east\",\"usage\":0.5}\n\
|
||||
{\"iox::measurement\":\"cpu\",\"time\":\"1970-01-01T00:00:04\",\"host\":\"b\",\"region\":\"us-east\",\"usage\":0.8}\n"
|
||||
expected: "{\"iox::measurement\":\"cpu\",\"time\":\"2065-01-07T17:28:51\",\"host\":\"a\",\"region\":\"us-east\",\"usage\":0.9}\n\
|
||||
{\"iox::measurement\":\"cpu\",\"time\":\"2065-01-07T17:28:51\",\"host\":\"b\",\"region\":\"us-east\",\"usage\":0.5}\n\
|
||||
{\"iox::measurement\":\"cpu\",\"time\":\"2065-01-07T17:28:52\",\"host\":\"a\",\"region\":\"us-east\",\"usage\":0.8}\n\
|
||||
{\"iox::measurement\":\"cpu\",\"time\":\"2065-01-07T17:28:52\",\"host\":\"b\",\"region\":\"us-east\",\"usage\":0.6}\n\
|
||||
{\"iox::measurement\":\"cpu\",\"time\":\"2065-01-07T17:28:53\",\"host\":\"a\",\"region\":\"us-east\",\"usage\":0.7}\n\
|
||||
{\"iox::measurement\":\"cpu\",\"time\":\"2065-01-07T17:28:53\",\"host\":\"b\",\"region\":\"us-east\",\"usage\":0.7}\n\
|
||||
{\"iox::measurement\":\"cpu\",\"time\":\"2065-01-07T17:28:54\",\"host\":\"a\",\"region\":\"us-east\",\"usage\":0.5}\n\
|
||||
{\"iox::measurement\":\"cpu\",\"time\":\"2065-01-07T17:28:54\",\"host\":\"b\",\"region\":\"us-east\",\"usage\":0.8}\n"
|
||||
.into(),
|
||||
},
|
||||
TestCase {
|
||||
|
@ -799,12 +798,12 @@ async fn api_v1_query_json_format() {
|
|||
server
|
||||
.write_lp_to_db(
|
||||
"foo",
|
||||
"cpu,host=a usage=0.9 1\n\
|
||||
cpu,host=a usage=0.89 2\n\
|
||||
cpu,host=a usage=0.85 3\n\
|
||||
mem,host=a usage=0.5 4\n\
|
||||
mem,host=a usage=0.6 5\n\
|
||||
mem,host=a usage=0.7 6",
|
||||
"cpu,host=a usage=0.9 2998574931\n\
|
||||
cpu,host=a usage=0.89 2998574932\n\
|
||||
cpu,host=a usage=0.85 2998574933\n\
|
||||
mem,host=a usage=0.5 2998574934\n\
|
||||
mem,host=a usage=0.6 2998574935\n\
|
||||
mem,host=a usage=0.7 2998574936",
|
||||
Precision::Second,
|
||||
)
|
||||
.await
|
||||
|
@ -842,9 +841,9 @@ async fn api_v1_query_json_format() {
|
|||
],
|
||||
"name": "cpu",
|
||||
"values": [
|
||||
["1970-01-01T00:00:01Z", "a", 0.9],
|
||||
["1970-01-01T00:00:02Z", "a", 0.89],
|
||||
["1970-01-01T00:00:03Z", "a", 0.85]
|
||||
["2065-01-07T17:28:51Z", "a", 0.9],
|
||||
["2065-01-07T17:28:52Z", "a", 0.89],
|
||||
["2065-01-07T17:28:53Z", "a", 0.85]
|
||||
]
|
||||
}
|
||||
],
|
||||
|
@ -870,9 +869,9 @@ async fn api_v1_query_json_format() {
|
|||
],
|
||||
"name": "mem",
|
||||
"values": [
|
||||
["1970-01-01T00:00:04Z", "a", 0.5],
|
||||
["1970-01-01T00:00:05Z", "a", 0.6],
|
||||
["1970-01-01T00:00:06Z", "a", 0.7]
|
||||
["2065-01-07T17:28:54Z", "a", 0.5],
|
||||
["2065-01-07T17:28:55Z", "a", 0.6],
|
||||
["2065-01-07T17:28:56Z", "a", 0.7]
|
||||
]
|
||||
},
|
||||
{
|
||||
|
@ -883,9 +882,9 @@ async fn api_v1_query_json_format() {
|
|||
],
|
||||
"name": "cpu",
|
||||
"values": [
|
||||
["1970-01-01T00:00:01Z", "a", 0.9],
|
||||
["1970-01-01T00:00:02Z", "a", 0.89],
|
||||
["1970-01-01T00:00:03Z", "a", 0.85]
|
||||
["2065-01-07T17:28:51Z", "a", 0.9],
|
||||
["2065-01-07T17:28:52Z", "a", 0.89],
|
||||
["2065-01-07T17:28:53Z", "a", 0.85]
|
||||
]
|
||||
}
|
||||
],
|
||||
|
@ -911,9 +910,9 @@ async fn api_v1_query_json_format() {
|
|||
],
|
||||
"name": "cpu",
|
||||
"values": [
|
||||
["1970-01-01T00:00:01Z", "a", 0.9],
|
||||
["1970-01-01T00:00:02Z", "a", 0.89],
|
||||
["1970-01-01T00:00:03Z", "a", 0.85]
|
||||
["2065-01-07T17:28:51Z", "a", 0.9],
|
||||
["2065-01-07T17:28:52Z", "a", 0.89],
|
||||
["2065-01-07T17:28:53Z", "a", 0.85]
|
||||
]
|
||||
}
|
||||
],
|
||||
|
@ -939,9 +938,9 @@ async fn api_v1_query_json_format() {
|
|||
],
|
||||
"name": "cpu",
|
||||
"values": [
|
||||
[1, "a", 0.9],
|
||||
[2, "a", 0.89],
|
||||
[3, "a", 0.85]
|
||||
[2998574931u32, "a", 0.9],
|
||||
[2998574932u32, "a", 0.89],
|
||||
[2998574933u32, "a", 0.85]
|
||||
]
|
||||
}
|
||||
],
|
||||
|
@ -979,12 +978,12 @@ async fn api_v1_query_csv_format() {
|
|||
server
|
||||
.write_lp_to_db(
|
||||
"foo",
|
||||
"cpu,host=a usage=0.9 1\n\
|
||||
cpu,host=a usage=0.89 2\n\
|
||||
cpu,host=a usage=0.85 3\n\
|
||||
mem,host=a usage=0.5 4\n\
|
||||
mem,host=a usage=0.6 5\n\
|
||||
mem,host=a usage=0.7 6",
|
||||
"cpu,host=a usage=0.9 2998574931\n\
|
||||
cpu,host=a usage=0.89 2998574932\n\
|
||||
cpu,host=a usage=0.85 2998574933\n\
|
||||
mem,host=a usage=0.5 2998574934\n\
|
||||
mem,host=a usage=0.6 2998574935\n\
|
||||
mem,host=a usage=0.7 2998574936",
|
||||
Precision::Second,
|
||||
)
|
||||
.await
|
||||
|
@ -1004,9 +1003,9 @@ async fn api_v1_query_csv_format() {
|
|||
epoch: None,
|
||||
query: "SELECT time, host, usage FROM cpu",
|
||||
expected: "name,tags,time,host,usage\n\
|
||||
cpu,,1970-01-01T00:00:01Z,a,0.9\n\
|
||||
cpu,,1970-01-01T00:00:02Z,a,0.89\n\
|
||||
cpu,,1970-01-01T00:00:03Z,a,0.85\n\r\n",
|
||||
cpu,,2065-01-07T17:28:51Z,a,0.9\n\
|
||||
cpu,,2065-01-07T17:28:52Z,a,0.89\n\
|
||||
cpu,,2065-01-07T17:28:53Z,a,0.85\n\r\n",
|
||||
},
|
||||
// Basic Query with multiple measurements:
|
||||
TestCase {
|
||||
|
@ -1014,12 +1013,12 @@ async fn api_v1_query_csv_format() {
|
|||
epoch: None,
|
||||
query: "SELECT time, host, usage FROM cpu, mem",
|
||||
expected: "name,tags,time,host,usage\n\
|
||||
mem,,1970-01-01T00:00:04Z,a,0.5\n\
|
||||
mem,,1970-01-01T00:00:05Z,a,0.6\n\
|
||||
mem,,1970-01-01T00:00:06Z,a,0.7\n\
|
||||
cpu,,1970-01-01T00:00:01Z,a,0.9\n\
|
||||
cpu,,1970-01-01T00:00:02Z,a,0.89\n\
|
||||
cpu,,1970-01-01T00:00:03Z,a,0.85\n\r\n",
|
||||
mem,,2065-01-07T17:28:54Z,a,0.5\n\
|
||||
mem,,2065-01-07T17:28:55Z,a,0.6\n\
|
||||
mem,,2065-01-07T17:28:56Z,a,0.7\n\
|
||||
cpu,,2065-01-07T17:28:51Z,a,0.9\n\
|
||||
cpu,,2065-01-07T17:28:52Z,a,0.89\n\
|
||||
cpu,,2065-01-07T17:28:53Z,a,0.85\n\r\n",
|
||||
},
|
||||
// Basic Query with db in query string:
|
||||
TestCase {
|
||||
|
@ -1027,9 +1026,9 @@ async fn api_v1_query_csv_format() {
|
|||
epoch: None,
|
||||
query: "SELECT time, host, usage FROM foo.autogen.cpu",
|
||||
expected: "name,tags,time,host,usage\n\
|
||||
cpu,,1970-01-01T00:00:01Z,a,0.9\n\
|
||||
cpu,,1970-01-01T00:00:02Z,a,0.89\n\
|
||||
cpu,,1970-01-01T00:00:03Z,a,0.85\n\r\n",
|
||||
cpu,,2065-01-07T17:28:51Z,a,0.9\n\
|
||||
cpu,,2065-01-07T17:28:52Z,a,0.89\n\
|
||||
cpu,,2065-01-07T17:28:53Z,a,0.85\n\r\n",
|
||||
},
|
||||
// Basic Query epoch parameter set:
|
||||
TestCase {
|
||||
|
@ -1037,9 +1036,9 @@ async fn api_v1_query_csv_format() {
|
|||
epoch: Some("s"),
|
||||
query: "SELECT time, host, usage FROM cpu",
|
||||
expected: "name,tags,time,host,usage\n\
|
||||
cpu,,1,a,0.9\n\
|
||||
cpu,,2,a,0.89\n\
|
||||
cpu,,3,a,0.85\n\r\n",
|
||||
cpu,,2998574931,a,0.9\n\
|
||||
cpu,,2998574932,a,0.89\n\
|
||||
cpu,,2998574933,a,0.85\n\r\n",
|
||||
},
|
||||
];
|
||||
|
||||
|
@ -1072,12 +1071,12 @@ async fn api_v1_query_chunked() {
|
|||
server
|
||||
.write_lp_to_db(
|
||||
"foo",
|
||||
"cpu,host=a usage=0.9 1\n\
|
||||
cpu,host=a usage=0.89 2\n\
|
||||
cpu,host=a usage=0.85 3\n\
|
||||
mem,host=a usage=0.5 4\n\
|
||||
mem,host=a usage=0.6 5\n\
|
||||
mem,host=a usage=0.7 6",
|
||||
"cpu,host=a usage=0.9 2998574931\n\
|
||||
cpu,host=a usage=0.89 2998574932\n\
|
||||
cpu,host=a usage=0.85 2998574933\n\
|
||||
mem,host=a usage=0.5 2998574934\n\
|
||||
mem,host=a usage=0.6 2998574935\n\
|
||||
mem,host=a usage=0.7 2998574936",
|
||||
Precision::Second,
|
||||
)
|
||||
.await
|
||||
|
@ -1102,9 +1101,9 @@ async fn api_v1_query_chunked() {
|
|||
"name": "cpu",
|
||||
"columns": ["time","host","usage"],
|
||||
"values": [
|
||||
[1, "a", 0.9],
|
||||
[2, "a", 0.89],
|
||||
[3, "a", 0.85]
|
||||
[2998574931u32, "a", 0.9],
|
||||
[2998574932u32, "a", 0.89],
|
||||
[2998574933u32, "a", 0.85]
|
||||
]
|
||||
}
|
||||
],
|
||||
|
@ -1126,8 +1125,8 @@ async fn api_v1_query_chunked() {
|
|||
"name": "cpu",
|
||||
"columns": ["time","host","usage"],
|
||||
"values": [
|
||||
[1, "a", 0.9],
|
||||
[2, "a", 0.89],
|
||||
[2998574931u32, "a", 0.9],
|
||||
[2998574932u32, "a", 0.89],
|
||||
]
|
||||
}
|
||||
],
|
||||
|
@ -1143,7 +1142,7 @@ async fn api_v1_query_chunked() {
|
|||
"name": "cpu",
|
||||
"columns": ["time","host","usage"],
|
||||
"values": [
|
||||
[3, "a", 0.85]
|
||||
[2998574933u32, "a", 0.85]
|
||||
]
|
||||
}
|
||||
],
|
||||
|
@ -1166,9 +1165,9 @@ async fn api_v1_query_chunked() {
|
|||
"name": "cpu",
|
||||
"columns": ["time","host","usage"],
|
||||
"values": [
|
||||
[1, "a", 0.9],
|
||||
[2, "a", 0.89],
|
||||
[3, "a", 0.85]
|
||||
[2998574931u32, "a", 0.9],
|
||||
[2998574932u32, "a", 0.89],
|
||||
[2998574933u32, "a", 0.85]
|
||||
]
|
||||
}
|
||||
],
|
||||
|
@ -1184,9 +1183,9 @@ async fn api_v1_query_chunked() {
|
|||
"name": "mem",
|
||||
"columns": ["time","host","usage"],
|
||||
"values": [
|
||||
[4, "a", 0.5],
|
||||
[5, "a", 0.6],
|
||||
[6, "a", 0.7]
|
||||
[2998574934u32, "a", 0.5],
|
||||
[2998574935u32, "a", 0.6],
|
||||
[2998574936u32, "a", 0.7]
|
||||
]
|
||||
}
|
||||
],
|
||||
|
@ -1209,8 +1208,8 @@ async fn api_v1_query_chunked() {
|
|||
"name": "cpu",
|
||||
"columns": ["time","host","usage"],
|
||||
"values": [
|
||||
[1, "a", 0.9],
|
||||
[2, "a", 0.89],
|
||||
[2998574931u32, "a", 0.9],
|
||||
[2998574932u32, "a", 0.89],
|
||||
]
|
||||
}
|
||||
],
|
||||
|
@ -1226,7 +1225,7 @@ async fn api_v1_query_chunked() {
|
|||
"name": "cpu",
|
||||
"columns": ["time","host","usage"],
|
||||
"values": [
|
||||
[3, "a", 0.85]
|
||||
[2998574933u32, "a", 0.85]
|
||||
]
|
||||
}
|
||||
],
|
||||
|
@ -1242,8 +1241,8 @@ async fn api_v1_query_chunked() {
|
|||
"name": "mem",
|
||||
"columns": ["time","host","usage"],
|
||||
"values": [
|
||||
[4, "a", 0.5],
|
||||
[5, "a", 0.6],
|
||||
[2998574934u32, "a", 0.5],
|
||||
[2998574935u32, "a", 0.6],
|
||||
]
|
||||
}
|
||||
],
|
||||
|
@ -1259,7 +1258,7 @@ async fn api_v1_query_chunked() {
|
|||
"name": "mem",
|
||||
"columns": ["time","host","usage"],
|
||||
"values": [
|
||||
[6, "a", 0.7]
|
||||
[2998574936u32, "a", 0.7]
|
||||
]
|
||||
}
|
||||
],
|
||||
|
@ -1301,11 +1300,11 @@ async fn api_v1_query_data_conversion() {
|
|||
server
|
||||
.write_lp_to_db(
|
||||
"foo",
|
||||
"weather,location=us-midwest temperature_integer=82i 1465839830100400200\n\
|
||||
weather,location=us-midwest temperature_float=82 1465839830100400200\n\
|
||||
weather,location=us-midwest temperature_str=\"too warm\" 1465839830100400200\n\
|
||||
weather,location=us-midwest too_hot=true 1465839830100400200",
|
||||
Precision::Nanosecond,
|
||||
"weather,location=us-midwest temperature_integer=82i 2998574930\n\
|
||||
weather,location=us-midwest temperature_float=82 2998574930\n\
|
||||
weather,location=us-midwest temperature_str=\"too warm\" 2998574930\n\
|
||||
weather,location=us-midwest too_hot=true 2998574930",
|
||||
Precision::Second,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -1338,7 +1337,7 @@ async fn api_v1_query_data_conversion() {
|
|||
],
|
||||
"name": "weather",
|
||||
"values": [
|
||||
["2016-06-13T17:43:50.100400200Z", "us-midwest", 82, 82.0, "too warm", true],
|
||||
["2065-01-07T17:28:50Z", "us-midwest", 82, 82.0, "too warm", true],
|
||||
]
|
||||
}
|
||||
],
|
||||
|
@ -1378,12 +1377,12 @@ async fn api_v1_query_uri_and_body() {
|
|||
.write_lp_to_db(
|
||||
"foo",
|
||||
"\
|
||||
cpu,host=a usage=0.9 1\n\
|
||||
cpu,host=b usage=0.89 1\n\
|
||||
cpu,host=c usage=0.85 1\n\
|
||||
mem,host=a usage=0.5 2\n\
|
||||
mem,host=b usage=0.6 2\n\
|
||||
mem,host=c usage=0.7 2\
|
||||
cpu,host=a usage=0.9 2998674931\n\
|
||||
cpu,host=b usage=0.89 2998674931\n\
|
||||
cpu,host=c usage=0.85 2998674931\n\
|
||||
mem,host=a usage=0.5 2998674932\n\
|
||||
mem,host=b usage=0.6 2998674932\n\
|
||||
mem,host=c usage=0.7 2998674932\
|
||||
",
|
||||
Precision::Second,
|
||||
)
|
||||
|
@ -1427,17 +1426,17 @@ async fn api_v1_query_uri_and_body() {
|
|||
"name": "cpu",
|
||||
"values": [
|
||||
[
|
||||
"1970-01-01T00:00:01Z",
|
||||
"2065-01-08T21:15:31Z",
|
||||
"a",
|
||||
0.9
|
||||
],
|
||||
[
|
||||
"1970-01-01T00:00:01Z",
|
||||
"2065-01-08T21:15:31Z",
|
||||
"b",
|
||||
0.89
|
||||
],
|
||||
[
|
||||
"1970-01-01T00:00:01Z",
|
||||
"2065-01-08T21:15:31Z",
|
||||
"c",
|
||||
0.85
|
||||
]
|
||||
|
@ -1514,17 +1513,17 @@ async fn api_v1_query_uri_and_body() {
|
|||
"name": "mem",
|
||||
"values": [
|
||||
[
|
||||
"1970-01-01T00:00:02Z",
|
||||
"2065-01-08T21:15:32Z",
|
||||
"a",
|
||||
0.5
|
||||
],
|
||||
[
|
||||
"1970-01-01T00:00:02Z",
|
||||
"2065-01-08T21:15:32Z",
|
||||
"b",
|
||||
0.6
|
||||
],
|
||||
[
|
||||
"1970-01-01T00:00:02Z",
|
||||
"2065-01-08T21:15:32Z",
|
||||
"c",
|
||||
0.7
|
||||
]
|
||||
|
@ -1673,8 +1672,8 @@ async fn api_v3_query_null_tag_values_null_fields() {
|
|||
server
|
||||
.write_lp_to_db(
|
||||
"foo",
|
||||
"cpu,host=a,region=us-east usage=0.9,system=0.1 1
|
||||
cpu,host=b usage=0.80,system=0.1 4",
|
||||
"cpu,host=a,region=us-east usage=0.9,system=0.1 2998674931
|
||||
cpu,host=b usage=0.80,system=0.1 2998674934",
|
||||
Precision::Second,
|
||||
)
|
||||
.await
|
||||
|
@ -1704,8 +1703,8 @@ async fn api_v3_query_null_tag_values_null_fields() {
|
|||
"+------+---------+---------------------+-------+\n\
|
||||
| host | region | time | usage |\n\
|
||||
+------+---------+---------------------+-------+\n\
|
||||
| a | us-east | 1970-01-01T00:00:01 | 0.9 |\n\
|
||||
| b | | 1970-01-01T00:00:04 | 0.8 |\n\
|
||||
| a | us-east | 2065-01-08T21:15:31 | 0.9 |\n\
|
||||
| b | | 2065-01-08T21:15:34 | 0.8 |\n\
|
||||
+------+---------+---------------------+-------+",
|
||||
resp
|
||||
);
|
||||
|
@ -1713,7 +1712,7 @@ async fn api_v3_query_null_tag_values_null_fields() {
|
|||
server
|
||||
.write_lp_to_db(
|
||||
"foo",
|
||||
"cpu,host=a,region=us-east usage=0.9 10000000",
|
||||
"cpu,host=a,region=us-east usage=0.9 2998674935",
|
||||
Precision::Second,
|
||||
)
|
||||
.await
|
||||
|
@ -1737,9 +1736,9 @@ async fn api_v3_query_null_tag_values_null_fields() {
|
|||
"+------+---------+--------+---------------------+-------+\n\
|
||||
| host | region | system | time | usage |\n\
|
||||
+------+---------+--------+---------------------+-------+\n\
|
||||
| a | us-east | 0.1 | 1970-01-01T00:00:01 | 0.9 |\n\
|
||||
| b | | 0.1 | 1970-01-01T00:00:04 | 0.8 |\n\
|
||||
| a | us-east | | 1970-04-26T17:46:40 | 0.9 |\n\
|
||||
| a | us-east | 0.1 | 2065-01-08T21:15:31 | 0.9 |\n\
|
||||
| b | | 0.1 | 2065-01-08T21:15:34 | 0.8 |\n\
|
||||
| a | us-east | | 2065-01-08T21:15:35 | 0.9 |\n\
|
||||
+------+---------+--------+---------------------+-------+",
|
||||
resp
|
||||
);
|
||||
|
|
|
@ -11,10 +11,10 @@ async fn queries_table() {
|
|||
server
|
||||
.write_lp_to_db(
|
||||
"foo",
|
||||
"cpu,host=s1,region=us-east usage=0.9 1\n\
|
||||
cpu,host=s1,region=us-east usage=0.89 2\n\
|
||||
cpu,host=s1,region=us-east usage=0.85 3",
|
||||
Precision::Nanosecond,
|
||||
"cpu,host=s1,region=us-east usage=0.9 2998574931\n\
|
||||
cpu,host=s1,region=us-east usage=0.89 2998574932\n\
|
||||
cpu,host=s1,region=us-east usage=0.85 2998574933",
|
||||
Precision::Second,
|
||||
)
|
||||
.await
|
||||
.expect("write some lp");
|
||||
|
|
|
@ -128,11 +128,11 @@ async fn api_v1_write_round_trip() {
|
|||
|
||||
client
|
||||
.post(write_url)
|
||||
.query(&[("db", "foo")])
|
||||
.query(&[("db", "foo"), ("precision", "s")])
|
||||
.body(
|
||||
"cpu,host=a usage=0.5 1
|
||||
cpu,host=a usage=0.6 2
|
||||
cpu,host=a usage=0.7 3",
|
||||
"cpu,host=a usage=0.5 2998574931
|
||||
cpu,host=a usage=0.6 2998574932
|
||||
cpu,host=a usage=0.7 2998574933",
|
||||
)
|
||||
.send()
|
||||
.await
|
||||
|
@ -150,13 +150,13 @@ async fn api_v1_write_round_trip() {
|
|||
|
||||
assert_eq!(
|
||||
resp,
|
||||
"+------------------+-------------------------------+------+-------+\n\
|
||||
| iox::measurement | time | host | usage |\n\
|
||||
+------------------+-------------------------------+------+-------+\n\
|
||||
| cpu | 1970-01-01T00:00:00.000000001 | a | 0.5 |\n\
|
||||
| cpu | 1970-01-01T00:00:00.000000002 | a | 0.6 |\n\
|
||||
| cpu | 1970-01-01T00:00:00.000000003 | a | 0.7 |\n\
|
||||
+------------------+-------------------------------+------+-------+"
|
||||
"+------------------+---------------------+------+-------+\n\
|
||||
| iox::measurement | time | host | usage |\n\
|
||||
+------------------+---------------------+------+-------+\n\
|
||||
| cpu | 2065-01-07T17:28:51 | a | 0.5 |\n\
|
||||
| cpu | 2065-01-07T17:28:52 | a | 0.6 |\n\
|
||||
| cpu | 2065-01-07T17:28:53 | a | 0.7 |\n\
|
||||
+------------------+---------------------+------+-------+"
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -266,11 +266,11 @@ async fn api_v2_write_round_trip() {
|
|||
|
||||
client
|
||||
.post(write_url)
|
||||
.query(&[("bucket", "foo")])
|
||||
.query(&[("bucket", "foo"), ("precision", "s")])
|
||||
.body(
|
||||
"cpu,host=a usage=0.5 1
|
||||
cpu,host=a usage=0.6 2
|
||||
cpu,host=a usage=0.7 3",
|
||||
"cpu,host=a usage=0.5 2998574931
|
||||
cpu,host=a usage=0.6 2998574932
|
||||
cpu,host=a usage=0.7 2998574933",
|
||||
)
|
||||
.send()
|
||||
.await
|
||||
|
@ -288,13 +288,13 @@ async fn api_v2_write_round_trip() {
|
|||
|
||||
assert_eq!(
|
||||
resp,
|
||||
"+------------------+-------------------------------+------+-------+\n\
|
||||
| iox::measurement | time | host | usage |\n\
|
||||
+------------------+-------------------------------+------+-------+\n\
|
||||
| cpu | 1970-01-01T00:00:00.000000001 | a | 0.5 |\n\
|
||||
| cpu | 1970-01-01T00:00:00.000000002 | a | 0.6 |\n\
|
||||
| cpu | 1970-01-01T00:00:00.000000003 | a | 0.7 |\n\
|
||||
+------------------+-------------------------------+------+-------+"
|
||||
"+------------------+---------------------+------+-------+\n\
|
||||
| iox::measurement | time | host | usage |\n\
|
||||
+------------------+---------------------+------+-------+\n\
|
||||
| cpu | 2065-01-07T17:28:51 | a | 0.5 |\n\
|
||||
| cpu | 2065-01-07T17:28:52 | a | 0.6 |\n\
|
||||
| cpu | 2065-01-07T17:28:53 | a | 0.7 |\n\
|
||||
+------------------+---------------------+------+-------+"
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -309,8 +309,8 @@ async fn writes_with_different_schema_should_fail() {
|
|||
.write_lp_to_db(
|
||||
"foo",
|
||||
"\
|
||||
t0,t0_tag0=initTag t0_f0=0i 1715694000\n\
|
||||
t0,t0_tag0=initTag t0_f0=1i 1715694001",
|
||||
t0,t0_tag0=initTag t0_f0=0i\n\
|
||||
t0,t0_tag0=initTag t0_f0=1i 2998574931",
|
||||
Precision::Second,
|
||||
)
|
||||
.await
|
||||
|
@ -321,8 +321,8 @@ async fn writes_with_different_schema_should_fail() {
|
|||
.write_lp_to_db(
|
||||
"foo",
|
||||
"\
|
||||
t0,t0_tag0=initTag t0_f0=0u 1715694000\n\
|
||||
t0,t0_tag0=initTag t0_f0=1u 1715694001",
|
||||
t0,t0_tag0=initTag t0_f0=0u 2998574930\n\
|
||||
t0,t0_tag0=initTag t0_f0=1u 2998574931",
|
||||
Precision::Second,
|
||||
)
|
||||
.await
|
||||
|
|
|
@ -28,6 +28,9 @@ use serde::{Deserialize, Serialize};
|
|||
use std::{fmt::Debug, sync::Arc, time::Duration};
|
||||
use thiserror::Error;
|
||||
|
||||
/// Used to determine if writes are older than what we can accept or query
|
||||
pub const THREE_DAYS: Duration = Duration::from_secs(60 * 60 * 24 * 3);
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
#[error("object store path error: {0}")]
|
||||
|
|
|
@ -199,6 +199,7 @@ impl WriteBufferImpl {
|
|||
}
|
||||
|
||||
let persisted_files = Arc::new(PersistedFiles::new_from_persisted_snapshots(
|
||||
Arc::clone(&time_provider),
|
||||
persisted_snapshots,
|
||||
));
|
||||
let queryable_buffer = Arc::new(QueryableBuffer::new(QueryableBufferArgs {
|
||||
|
@ -208,6 +209,7 @@ impl WriteBufferImpl {
|
|||
last_cache_provider: Arc::clone(&last_cache),
|
||||
distinct_cache_provider: Arc::clone(&distinct_cache),
|
||||
persisted_files: Arc::clone(&persisted_files),
|
||||
time_provider: Arc::clone(&time_provider),
|
||||
parquet_cache: parquet_cache.clone(),
|
||||
}));
|
||||
|
||||
|
|
|
@ -7,21 +7,35 @@ use hashbrown::HashMap;
|
|||
use influxdb3_id::DbId;
|
||||
use influxdb3_id::TableId;
|
||||
use influxdb3_telemetry::ParquetMetrics;
|
||||
use iox_time::TimeProvider;
|
||||
use parking_lot::RwLock;
|
||||
use std::sync::Arc;
|
||||
|
||||
type DatabaseToTables = HashMap<DbId, TableToFiles>;
|
||||
type TableToFiles = HashMap<TableId, Vec<ParquetFile>>;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
#[derive(Debug)]
|
||||
pub struct PersistedFiles {
|
||||
/// The time provider to check if something is older than 3 days
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
inner: RwLock<Inner>,
|
||||
}
|
||||
|
||||
impl PersistedFiles {
|
||||
pub fn new(time_provider: Arc<dyn TimeProvider>) -> Self {
|
||||
Self {
|
||||
time_provider,
|
||||
inner: Default::default(),
|
||||
}
|
||||
}
|
||||
/// Create a new `PersistedFiles` from a list of persisted snapshots
|
||||
pub fn new_from_persisted_snapshots(persisted_snapshots: Vec<PersistedSnapshot>) -> Self {
|
||||
pub fn new_from_persisted_snapshots(
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
persisted_snapshots: Vec<PersistedSnapshot>,
|
||||
) -> Self {
|
||||
let inner = Inner::new_from_persisted_snapshots(persisted_snapshots);
|
||||
Self {
|
||||
time_provider,
|
||||
inner: RwLock::new(inner),
|
||||
}
|
||||
}
|
||||
|
@ -34,6 +48,7 @@ impl PersistedFiles {
|
|||
|
||||
/// Get the list of files for a given database and table, always return in descending order of min_time
|
||||
pub fn get_files(&self, db_id: DbId, table_id: TableId) -> Vec<ParquetFile> {
|
||||
let three_days_ago = (self.time_provider.now() - crate::THREE_DAYS).timestamp_nanos();
|
||||
let mut files = {
|
||||
let inner = self.inner.read();
|
||||
inner
|
||||
|
@ -42,6 +57,9 @@ impl PersistedFiles {
|
|||
.and_then(|tables| tables.get(&table_id))
|
||||
.cloned()
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.filter(|file| dbg!(file.min_time) > dbg!(three_days_ago))
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
files.sort_by(|a, b| b.min_time.cmp(&a.min_time));
|
||||
|
@ -153,6 +171,8 @@ mod tests {
|
|||
|
||||
use influxdb3_catalog::catalog::CatalogSequenceNumber;
|
||||
use influxdb3_wal::{SnapshotSequenceNumber, WalFileSequenceNumber};
|
||||
use iox_time::MockProvider;
|
||||
use iox_time::Time;
|
||||
use observability_deps::tracing::info;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
|
@ -163,8 +183,12 @@ mod tests {
|
|||
#[test_log::test(test)]
|
||||
fn test_get_metrics_after_initial_load() {
|
||||
let all_persisted_snapshot_files = build_persisted_snapshots();
|
||||
let persisted_file =
|
||||
PersistedFiles::new_from_persisted_snapshots(all_persisted_snapshot_files);
|
||||
let time_provider: Arc<dyn TimeProvider> =
|
||||
Arc::new(MockProvider::new(Time::from_timestamp(0, 0).unwrap()));
|
||||
let persisted_file = PersistedFiles::new_from_persisted_snapshots(
|
||||
time_provider,
|
||||
all_persisted_snapshot_files,
|
||||
);
|
||||
|
||||
let (file_count, size_in_mb, row_count) = persisted_file.get_metrics();
|
||||
|
||||
|
@ -177,8 +201,12 @@ mod tests {
|
|||
#[test_log::test(test)]
|
||||
fn test_get_metrics_after_update() {
|
||||
let all_persisted_snapshot_files = build_persisted_snapshots();
|
||||
let persisted_file =
|
||||
PersistedFiles::new_from_persisted_snapshots(all_persisted_snapshot_files);
|
||||
let time_provider: Arc<dyn TimeProvider> =
|
||||
Arc::new(MockProvider::new(Time::from_timestamp(0, 0).unwrap()));
|
||||
let persisted_file = PersistedFiles::new_from_persisted_snapshots(
|
||||
time_provider,
|
||||
all_persisted_snapshot_files,
|
||||
);
|
||||
let parquet_files = build_parquet_files(5);
|
||||
let new_snapshot = build_snapshot(parquet_files, 1, 1, 1);
|
||||
persisted_file.add_persisted_snapshot_files(new_snapshot);
|
||||
|
@ -207,8 +235,12 @@ mod tests {
|
|||
.cloned()
|
||||
.unwrap();
|
||||
|
||||
let persisted_file =
|
||||
PersistedFiles::new_from_persisted_snapshots(all_persisted_snapshot_files);
|
||||
let time_provider: Arc<dyn TimeProvider> =
|
||||
Arc::new(MockProvider::new(Time::from_timestamp(0, 0).unwrap()));
|
||||
let persisted_file = PersistedFiles::new_from_persisted_snapshots(
|
||||
time_provider,
|
||||
all_persisted_snapshot_files,
|
||||
);
|
||||
let mut parquet_files = build_parquet_files(4);
|
||||
info!(all_persisted_files = ?persisted_file, "Full persisted file");
|
||||
info!(already_existing_file = ?already_existing_file, "Existing file");
|
||||
|
|
|
@ -26,6 +26,7 @@ use iox_query::chunk_statistics::{create_chunk_statistics, NoColumnRanges};
|
|||
use iox_query::exec::Executor;
|
||||
use iox_query::frontend::reorg::ReorgPlanner;
|
||||
use iox_query::QueryChunk;
|
||||
use iox_time::TimeProvider;
|
||||
use object_store::path::Path;
|
||||
use observability_deps::tracing::{debug, error, info};
|
||||
use parking_lot::RwLock;
|
||||
|
@ -47,6 +48,7 @@ pub struct QueryableBuffer {
|
|||
persisted_files: Arc<PersistedFiles>,
|
||||
buffer: Arc<RwLock<BufferState>>,
|
||||
parquet_cache: Option<Arc<dyn ParquetCacheOracle>>,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
/// Sends a notification to this watch channel whenever a snapshot info is persisted
|
||||
persisted_snapshot_notify_rx: tokio::sync::watch::Receiver<Option<PersistedSnapshot>>,
|
||||
persisted_snapshot_notify_tx: tokio::sync::watch::Sender<Option<PersistedSnapshot>>,
|
||||
|
@ -60,6 +62,7 @@ pub struct QueryableBufferArgs {
|
|||
pub distinct_cache_provider: Arc<DistinctCacheProvider>,
|
||||
pub persisted_files: Arc<PersistedFiles>,
|
||||
pub parquet_cache: Option<Arc<dyn ParquetCacheOracle>>,
|
||||
pub time_provider: Arc<dyn TimeProvider>,
|
||||
}
|
||||
|
||||
impl QueryableBuffer {
|
||||
|
@ -72,6 +75,7 @@ impl QueryableBuffer {
|
|||
distinct_cache_provider,
|
||||
persisted_files,
|
||||
parquet_cache,
|
||||
time_provider,
|
||||
}: QueryableBufferArgs,
|
||||
) -> Self {
|
||||
let buffer = Arc::new(RwLock::new(BufferState::new(Arc::clone(&catalog))));
|
||||
|
@ -86,6 +90,7 @@ impl QueryableBuffer {
|
|||
persisted_files,
|
||||
buffer,
|
||||
parquet_cache,
|
||||
time_provider,
|
||||
persisted_snapshot_notify_rx,
|
||||
persisted_snapshot_notify_tx,
|
||||
}
|
||||
|
@ -118,6 +123,9 @@ impl QueryableBuffer {
|
|||
.partitioned_record_batches(Arc::clone(&table_def), filters)
|
||||
.map_err(|e| DataFusionError::Execution(format!("error getting batches {}", e)))?
|
||||
.into_iter()
|
||||
.filter(|(_, (ts_min_max, _))| {
|
||||
ts_min_max.min > (self.time_provider.now() - crate::THREE_DAYS).timestamp_nanos()
|
||||
})
|
||||
.map(|(gen_time, (ts_min_max, batches))| {
|
||||
let row_count = batches.iter().map(|b| b.num_rows()).sum::<usize>();
|
||||
let chunk_stats = create_chunk_statistics(
|
||||
|
@ -755,7 +763,8 @@ mod tests {
|
|||
Arc::clone(&catalog),
|
||||
)
|
||||
.unwrap(),
|
||||
persisted_files: Arc::new(Default::default()),
|
||||
time_provider: Arc::clone(&time_provider),
|
||||
persisted_files: Arc::new(PersistedFiles::new(Arc::clone(&time_provider))),
|
||||
parquet_cache: None,
|
||||
};
|
||||
let queryable_buffer = QueryableBuffer::new(queryable_buffer_args);
|
||||
|
@ -764,10 +773,18 @@ mod tests {
|
|||
|
||||
// create the initial write with two tags
|
||||
let val = WriteValidator::initialize(db.clone(), Arc::clone(&catalog), 0).unwrap();
|
||||
let lp = "foo,t1=a,t2=b f1=1i 1000000000";
|
||||
let lp = format!(
|
||||
"foo,t1=a,t2=b f1=1i {}",
|
||||
time_provider.now().timestamp_nanos()
|
||||
);
|
||||
|
||||
let lines = val
|
||||
.v1_parse_lines_and_update_schema(lp, false, time_provider.now(), Precision::Nanosecond)
|
||||
.v1_parse_lines_and_update_schema(
|
||||
&lp,
|
||||
false,
|
||||
time_provider.now(),
|
||||
Precision::Nanosecond,
|
||||
)
|
||||
.unwrap()
|
||||
.convert_lines_to_buffer(Gen1Duration::new_1m());
|
||||
let batch: WriteBatch = lines.into();
|
||||
|
|
|
@ -241,6 +241,15 @@ fn validate_and_qualify_v1_line(
|
|||
.timestamp
|
||||
.map(|ts| apply_precision_to_timestamp(precision, ts))
|
||||
.unwrap_or(ingest_time.timestamp_nanos());
|
||||
|
||||
if timestamp_ns < (ingest_time - crate::THREE_DAYS).timestamp_nanos() {
|
||||
return Err(WriteLineError {
|
||||
original_line: line.to_string(),
|
||||
line_number: line_number + 1,
|
||||
error_message: "line contained a date that was more than 3 days ago".into(),
|
||||
});
|
||||
}
|
||||
|
||||
fields.push(Field::new(time_col_id, FieldData::Timestamp(timestamp_ns)));
|
||||
|
||||
// if we have new columns defined, add them to the db_schema table so that subsequent lines
|
||||
|
|
Loading…
Reference in New Issue