2023-01-26 14:38:05 +00:00
|
|
|
use std::sync::Arc;
|
2022-10-26 14:24:49 +00:00
|
|
|
|
2022-02-18 14:04:39 +00:00
|
|
|
use assert_matches::assert_matches;
|
2023-01-26 14:38:05 +00:00
|
|
|
use data_types::{ColumnType, QueryPoolId, ShardIndex, TopicId};
|
2022-02-18 14:04:39 +00:00
|
|
|
use dml::DmlOperation;
|
2022-10-28 10:37:05 +00:00
|
|
|
use futures::{stream::FuturesUnordered, StreamExt};
|
2022-02-18 14:04:39 +00:00
|
|
|
use hashbrown::HashMap;
|
|
|
|
use hyper::{Body, Request, StatusCode};
|
2022-11-17 20:55:58 +00:00
|
|
|
use iox_time::{SystemProvider, TimeProvider};
|
2023-01-26 14:38:05 +00:00
|
|
|
use metric::{Attributes, DurationHistogram, Metric, U64Counter};
|
|
|
|
use router::dml_handlers::{DmlError, RetentionError, SchemaError};
|
2022-02-18 14:04:39 +00:00
|
|
|
|
2023-01-26 14:38:05 +00:00
|
|
|
use crate::common::{TestContext, TEST_QUERY_POOL_ID, TEST_RETENTION_PERIOD_NS, TEST_TOPIC_ID};
|
2022-02-18 14:04:39 +00:00
|
|
|
|
2023-01-26 14:38:05 +00:00
|
|
|
pub mod common;
|
2022-02-18 14:04:39 +00:00
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
async fn test_write_ok() {
|
2023-01-26 14:41:14 +00:00
|
|
|
let ctx = TestContext::new(true, None).await;
|
2022-02-18 14:04:39 +00:00
|
|
|
|
2022-11-17 20:55:58 +00:00
|
|
|
// Write data inside retention period
|
|
|
|
let now = SystemProvider::default()
|
|
|
|
.now()
|
|
|
|
.timestamp_nanos()
|
|
|
|
.to_string();
|
|
|
|
let lp = "platanos,tag1=A,tag2=B val=42i ".to_string() + &now;
|
2022-02-18 14:04:39 +00:00
|
|
|
let response = ctx
|
2023-01-26 15:00:40 +00:00
|
|
|
.write_lp("bananas", "test", lp)
|
2022-02-18 14:04:39 +00:00
|
|
|
.await
|
2023-01-26 15:00:40 +00:00
|
|
|
.expect("write failed");
|
2022-02-18 14:04:39 +00:00
|
|
|
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
|
|
|
|
|
|
|
// Check the write buffer observed the correct write.
|
2022-08-19 21:19:28 +00:00
|
|
|
let writes = ctx.write_buffer_state().get_messages(ShardIndex::new(0));
|
2022-02-18 14:04:39 +00:00
|
|
|
assert_eq!(writes.len(), 1);
|
|
|
|
assert_matches!(writes.as_slice(), [Ok(DmlOperation::Write(w))] => {
|
2022-11-16 16:43:23 +00:00
|
|
|
let table_id = ctx.table_id("bananas_test", "platanos").await;
|
|
|
|
assert!(w.table(&table_id).is_some());
|
2022-02-18 14:04:39 +00:00
|
|
|
});
|
|
|
|
|
|
|
|
// Ensure the catalog saw the namespace creation
|
|
|
|
let ns = ctx
|
|
|
|
.catalog()
|
|
|
|
.repositories()
|
|
|
|
.await
|
|
|
|
.namespaces()
|
|
|
|
.get_by_name("bananas_test")
|
|
|
|
.await
|
|
|
|
.expect("query should succeed")
|
|
|
|
.expect("namespace not found");
|
|
|
|
assert_eq!(ns.name, "bananas_test");
|
2022-08-25 20:17:15 +00:00
|
|
|
assert_eq!(ns.topic_id, TopicId::new(TEST_TOPIC_ID));
|
2022-02-18 14:04:39 +00:00
|
|
|
assert_eq!(ns.query_pool_id, QueryPoolId::new(TEST_QUERY_POOL_ID));
|
2022-11-18 13:02:12 +00:00
|
|
|
assert_eq!(ns.retention_period_ns, None);
|
2022-02-18 14:04:39 +00:00
|
|
|
|
|
|
|
// Ensure the metric instrumentation was hit
|
|
|
|
let histogram = ctx
|
|
|
|
.metrics()
|
2022-06-09 17:07:45 +00:00
|
|
|
.get_instrument::<Metric<DurationHistogram>>("dml_handler_write_duration")
|
2022-02-18 14:04:39 +00:00
|
|
|
.expect("failed to read metric")
|
|
|
|
.get_observer(&Attributes::from(&[
|
|
|
|
("handler", "request"),
|
|
|
|
("result", "success"),
|
|
|
|
]))
|
|
|
|
.expect("failed to get observer")
|
|
|
|
.fetch();
|
2022-06-14 17:58:19 +00:00
|
|
|
let hit_count = histogram.sample_count();
|
2022-02-18 14:04:39 +00:00
|
|
|
assert_eq!(hit_count, 1);
|
2022-03-02 12:42:31 +00:00
|
|
|
|
|
|
|
assert_eq!(
|
|
|
|
ctx.metrics()
|
2022-11-07 10:04:31 +00:00
|
|
|
.get_instrument::<Metric<U64Counter>>("http_write_lines")
|
2022-03-02 12:42:31 +00:00
|
|
|
.expect("failed to read metric")
|
|
|
|
.get_observer(&Attributes::from(&[]))
|
|
|
|
.expect("failed to get observer")
|
|
|
|
.fetch(),
|
|
|
|
1
|
|
|
|
);
|
2022-03-03 16:24:51 +00:00
|
|
|
|
|
|
|
let histogram = ctx
|
|
|
|
.metrics()
|
2022-08-19 21:19:28 +00:00
|
|
|
.get_instrument::<Metric<DurationHistogram>>("shard_enqueue_duration")
|
2022-03-03 16:24:51 +00:00
|
|
|
.expect("failed to read metric")
|
|
|
|
.get_observer(&Attributes::from(&[
|
2022-05-23 19:11:58 +00:00
|
|
|
("kafka_partition", "0"),
|
2022-03-03 16:24:51 +00:00
|
|
|
("result", "success"),
|
|
|
|
]))
|
|
|
|
.expect("failed to get observer")
|
|
|
|
.fetch();
|
2022-06-14 17:58:19 +00:00
|
|
|
let hit_count = histogram.sample_count();
|
2022-03-03 16:24:51 +00:00
|
|
|
assert_eq!(hit_count, 1);
|
2022-02-18 14:04:39 +00:00
|
|
|
}
|
2022-03-25 10:39:30 +00:00
|
|
|
|
2022-11-17 20:55:58 +00:00
|
|
|
#[tokio::test]
|
|
|
|
async fn test_write_outside_retention_period() {
|
2023-01-26 14:41:14 +00:00
|
|
|
let ctx = TestContext::new(true, TEST_RETENTION_PERIOD_NS).await;
|
2022-11-17 20:55:58 +00:00
|
|
|
|
|
|
|
// Write data outside retention period into a new table
|
|
|
|
let two_hours_ago =
|
|
|
|
(SystemProvider::default().now().timestamp_nanos() - 2 * 3_600 * 1_000_000_000).to_string();
|
|
|
|
let lp = "apple,tag1=AAA,tag2=BBB val=422i ".to_string() + &two_hours_ago;
|
|
|
|
|
2023-01-26 15:00:40 +00:00
|
|
|
let response = ctx
|
|
|
|
.write_lp("bananas", "test", lp)
|
2022-11-17 20:55:58 +00:00
|
|
|
.await
|
2023-01-26 15:00:40 +00:00
|
|
|
.expect_err("write should fail");
|
2022-11-17 20:55:58 +00:00
|
|
|
|
|
|
|
assert_matches!(
|
2023-01-26 15:00:40 +00:00
|
|
|
&response,
|
2022-11-17 20:55:58 +00:00
|
|
|
router::server::http::Error::DmlHandler(
|
|
|
|
DmlError::Retention(
|
|
|
|
RetentionError::OutsideRetention(e))
|
|
|
|
) => {
|
|
|
|
assert_eq!(e, "apple");
|
|
|
|
}
|
|
|
|
);
|
2023-01-26 15:00:40 +00:00
|
|
|
assert_eq!(response.as_status_code(), StatusCode::FORBIDDEN);
|
2022-11-17 20:55:58 +00:00
|
|
|
}
|
|
|
|
|
2022-03-25 10:39:30 +00:00
|
|
|
#[tokio::test]
|
|
|
|
async fn test_schema_conflict() {
|
2023-01-26 14:41:14 +00:00
|
|
|
let ctx = TestContext::new(true, None).await;
|
2022-03-25 10:39:30 +00:00
|
|
|
|
2022-11-17 20:55:58 +00:00
|
|
|
// data inside the retention period
|
|
|
|
let now = SystemProvider::default()
|
|
|
|
.now()
|
|
|
|
.timestamp_nanos()
|
|
|
|
.to_string();
|
|
|
|
let lp = "platanos,tag1=A,tag2=B val=42i ".to_string() + &now;
|
|
|
|
|
2022-03-25 10:39:30 +00:00
|
|
|
let response = ctx
|
2023-01-26 15:00:40 +00:00
|
|
|
.write_lp("bananas", "test", lp)
|
2022-03-25 10:39:30 +00:00
|
|
|
.await
|
2023-01-26 15:00:40 +00:00
|
|
|
.expect("write should succeed");
|
2022-03-25 10:39:30 +00:00
|
|
|
|
|
|
|
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
|
|
|
|
2022-11-17 20:55:58 +00:00
|
|
|
let now = SystemProvider::default()
|
|
|
|
.now()
|
|
|
|
.timestamp_nanos()
|
|
|
|
.to_string();
|
|
|
|
let lp = "platanos,tag1=A,tag2=B val=42.0 ".to_string() + &now;
|
|
|
|
|
2022-03-25 10:39:30 +00:00
|
|
|
let err = ctx
|
2023-01-26 15:00:40 +00:00
|
|
|
.write_lp("bananas", "test", lp)
|
2022-03-25 10:39:30 +00:00
|
|
|
.await
|
2023-01-26 15:00:40 +00:00
|
|
|
.expect_err("write should fail");
|
2022-03-25 10:39:30 +00:00
|
|
|
|
|
|
|
assert_matches!(
|
|
|
|
&err,
|
2022-05-06 18:51:52 +00:00
|
|
|
router::server::http::Error::DmlHandler(
|
2022-03-25 10:39:30 +00:00
|
|
|
DmlError::Schema(
|
2022-03-25 11:04:35 +00:00
|
|
|
SchemaError::Conflict(
|
2022-08-16 16:48:15 +00:00
|
|
|
e
|
2022-03-25 10:39:30 +00:00
|
|
|
)
|
|
|
|
)
|
|
|
|
) => {
|
2022-08-16 16:48:15 +00:00
|
|
|
assert_matches!(e.err(), iox_catalog::interface::Error::ColumnTypeMismatch {
|
|
|
|
name,
|
|
|
|
existing,
|
|
|
|
new,
|
|
|
|
} => {
|
|
|
|
assert_eq!(name, "val");
|
2022-09-12 21:24:30 +00:00
|
|
|
assert_eq!(*existing, ColumnType::I64);
|
|
|
|
assert_eq!(*new, ColumnType::F64);
|
2022-08-16 16:48:15 +00:00
|
|
|
});
|
2022-03-25 10:39:30 +00:00
|
|
|
}
|
|
|
|
);
|
|
|
|
assert_eq!(err.as_status_code(), StatusCode::BAD_REQUEST);
|
|
|
|
}
|
|
|
|
|
2022-12-07 16:12:00 +00:00
|
|
|
#[tokio::test]
|
|
|
|
async fn test_rejected_ns() {
|
2023-01-26 14:41:14 +00:00
|
|
|
let ctx = TestContext::new(false, None).await;
|
2022-12-07 16:12:00 +00:00
|
|
|
|
|
|
|
let now = SystemProvider::default()
|
|
|
|
.now()
|
|
|
|
.timestamp_nanos()
|
|
|
|
.to_string();
|
|
|
|
let lp = "platanos,tag1=A,tag2=B val=42i ".to_string() + &now;
|
|
|
|
|
|
|
|
let err = ctx
|
2023-01-26 15:00:40 +00:00
|
|
|
.write_lp("bananas", "test", lp)
|
2022-12-07 16:12:00 +00:00
|
|
|
.await
|
2023-01-26 15:00:40 +00:00
|
|
|
.expect_err("write should fail");
|
|
|
|
|
2022-12-07 16:12:00 +00:00
|
|
|
assert_matches!(
|
|
|
|
err,
|
|
|
|
router::server::http::Error::NamespaceResolver(
|
|
|
|
// can't check the type of the create error without making ns_autocreation public, but
|
|
|
|
// not worth it just for this test, as the correct error is asserted in unit tests in
|
|
|
|
// that module. here it's just important that the write fails.
|
|
|
|
router::namespace_resolver::Error::Create(_)
|
|
|
|
)
|
|
|
|
);
|
|
|
|
assert_eq!(err.as_status_code(), StatusCode::BAD_REQUEST);
|
|
|
|
}
|
|
|
|
|
2022-03-25 10:39:30 +00:00
|
|
|
#[tokio::test]
|
|
|
|
async fn test_schema_limit() {
|
2023-01-26 14:41:14 +00:00
|
|
|
let ctx = TestContext::new(true, None).await;
|
2022-03-25 10:39:30 +00:00
|
|
|
|
2022-11-17 20:55:58 +00:00
|
|
|
let now = SystemProvider::default()
|
|
|
|
.now()
|
|
|
|
.timestamp_nanos()
|
|
|
|
.to_string();
|
|
|
|
let lp = "platanos,tag1=A,tag2=B val=42i ".to_string() + &now;
|
|
|
|
|
2022-03-25 10:39:30 +00:00
|
|
|
// Drive the creation of the namespace
|
|
|
|
let response = ctx
|
2023-01-26 15:00:40 +00:00
|
|
|
.write_lp("bananas", "test", lp)
|
2022-03-25 10:39:30 +00:00
|
|
|
.await
|
2023-01-26 15:00:40 +00:00
|
|
|
.expect("write should succeed");
|
2022-03-25 10:39:30 +00:00
|
|
|
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
|
|
|
|
|
|
|
// Update the table limit
|
|
|
|
ctx.catalog()
|
|
|
|
.repositories()
|
|
|
|
.await
|
|
|
|
.namespaces()
|
|
|
|
.update_table_limit("bananas_test", 1)
|
|
|
|
.await
|
|
|
|
.expect("failed to update table limit");
|
|
|
|
|
|
|
|
// Attempt to create another table
|
2022-11-17 20:55:58 +00:00
|
|
|
let now = SystemProvider::default()
|
|
|
|
.now()
|
|
|
|
.timestamp_nanos()
|
|
|
|
.to_string();
|
|
|
|
let lp = "platanos2,tag1=A,tag2=B val=42i ".to_string() + &now;
|
|
|
|
|
2022-03-25 10:39:30 +00:00
|
|
|
let err = ctx
|
2023-01-26 15:00:40 +00:00
|
|
|
.write_lp("bananas", "test", lp)
|
2022-03-25 10:39:30 +00:00
|
|
|
.await
|
2023-01-26 15:00:40 +00:00
|
|
|
.expect_err("write should fail");
|
2022-03-25 10:39:30 +00:00
|
|
|
|
|
|
|
assert_matches!(
|
|
|
|
&err,
|
2022-05-06 18:51:52 +00:00
|
|
|
router::server::http::Error::DmlHandler(
|
2022-03-25 10:39:30 +00:00
|
|
|
DmlError::Schema(
|
2022-10-14 11:34:17 +00:00
|
|
|
SchemaError::ServiceLimit(e)
|
2022-03-25 10:39:30 +00:00
|
|
|
)
|
|
|
|
) => {
|
2022-10-14 11:34:17 +00:00
|
|
|
assert_eq!(
|
|
|
|
e.to_string(),
|
|
|
|
"couldn't create table platanos2; limit reached on namespace 1"
|
|
|
|
);
|
2022-03-25 10:39:30 +00:00
|
|
|
}
|
|
|
|
);
|
2022-10-14 14:12:56 +00:00
|
|
|
assert_eq!(err.as_status_code(), StatusCode::BAD_REQUEST);
|
2022-03-25 10:39:30 +00:00
|
|
|
}
|
2022-10-28 10:37:05 +00:00
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
async fn test_write_propagate_ids() {
|
2023-01-26 14:41:14 +00:00
|
|
|
let ctx = TestContext::new(true, None).await;
|
2022-10-28 10:37:05 +00:00
|
|
|
|
|
|
|
// Create the namespace and a set of tables.
|
|
|
|
let ns = ctx
|
|
|
|
.catalog()
|
|
|
|
.repositories()
|
|
|
|
.await
|
|
|
|
.namespaces()
|
|
|
|
.create(
|
|
|
|
"bananas_test",
|
2022-11-18 13:02:12 +00:00
|
|
|
None,
|
2022-10-28 10:37:05 +00:00
|
|
|
TopicId::new(TEST_TOPIC_ID),
|
|
|
|
QueryPoolId::new(TEST_QUERY_POOL_ID),
|
|
|
|
)
|
|
|
|
.await
|
|
|
|
.expect("failed to update table limit");
|
|
|
|
|
|
|
|
let catalog = ctx.catalog();
|
|
|
|
let ids = ["another", "test", "table", "platanos"]
|
|
|
|
.iter()
|
|
|
|
.map(|t| {
|
|
|
|
let catalog = Arc::clone(&catalog);
|
|
|
|
async move {
|
|
|
|
let table = catalog
|
|
|
|
.repositories()
|
|
|
|
.await
|
|
|
|
.tables()
|
|
|
|
.create_or_get(t, ns.id)
|
|
|
|
.await
|
|
|
|
.unwrap();
|
|
|
|
(*t, table.id)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.collect::<FuturesUnordered<_>>()
|
|
|
|
.collect::<HashMap<_, _>>()
|
|
|
|
.await;
|
|
|
|
|
2022-11-17 20:55:58 +00:00
|
|
|
// data inside the retention period
|
|
|
|
let now = SystemProvider::default()
|
|
|
|
.now()
|
|
|
|
.timestamp_nanos()
|
|
|
|
.to_string();
|
|
|
|
let lp = format! {
|
|
|
|
"
|
|
|
|
platanos,tag1=A,tag2=B val=42i {}\n\
|
|
|
|
another,tag1=A,tag2=B val=42i {}\n\
|
|
|
|
test,tag1=A,tag2=B val=42i {}\n\
|
|
|
|
platanos,tag1=A,tag2=B val=42i {}\n\
|
|
|
|
table,tag1=A,tag2=B val=42i {}\n\
|
|
|
|
", now, now, now, now, now
|
|
|
|
|
|
|
|
};
|
|
|
|
|
2022-10-28 10:37:05 +00:00
|
|
|
let response = ctx
|
2023-01-26 15:00:40 +00:00
|
|
|
.write_lp("bananas", "test", lp)
|
2022-10-28 10:37:05 +00:00
|
|
|
.await
|
2023-01-26 15:00:40 +00:00
|
|
|
.expect("write should succeed");
|
2022-10-28 10:37:05 +00:00
|
|
|
|
|
|
|
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
|
|
|
|
|
|
|
// Check the write buffer observed the correct write.
|
|
|
|
let writes = ctx.write_buffer_state().get_messages(ShardIndex::new(0));
|
|
|
|
assert_eq!(writes.len(), 1);
|
|
|
|
assert_matches!(writes.as_slice(), [Ok(DmlOperation::Write(w))] => {
|
2022-11-03 10:05:51 +00:00
|
|
|
assert_eq!(w.namespace_id(), ns.id);
|
2022-10-28 10:37:05 +00:00
|
|
|
|
2022-11-16 16:43:23 +00:00
|
|
|
for id in ids.values() {
|
|
|
|
assert!(w.table(id).is_some());
|
2022-10-28 10:37:05 +00:00
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
async fn test_delete_propagate_ids() {
|
2023-01-26 14:41:14 +00:00
|
|
|
let ctx = TestContext::new(true, None).await;
|
2022-10-28 10:37:05 +00:00
|
|
|
|
|
|
|
// Create the namespace and a set of tables.
|
|
|
|
let ns = ctx
|
|
|
|
.catalog()
|
|
|
|
.repositories()
|
|
|
|
.await
|
|
|
|
.namespaces()
|
|
|
|
.create(
|
|
|
|
"bananas_test",
|
2022-11-18 13:02:12 +00:00
|
|
|
None,
|
2022-10-28 10:37:05 +00:00
|
|
|
TopicId::new(TEST_TOPIC_ID),
|
|
|
|
QueryPoolId::new(TEST_QUERY_POOL_ID),
|
|
|
|
)
|
|
|
|
.await
|
|
|
|
.expect("failed to update table limit");
|
|
|
|
|
|
|
|
let request = Request::builder()
|
|
|
|
.uri("https://bananas.example/api/v2/delete?org=bananas&bucket=test")
|
|
|
|
.method("POST")
|
|
|
|
.body(Body::from(
|
|
|
|
r#"{
|
|
|
|
"predicate": "_measurement=bananas",
|
|
|
|
"start": "1970-01-01T00:00:00Z",
|
|
|
|
"stop": "2070-01-02T00:00:00Z"
|
|
|
|
}"#,
|
|
|
|
))
|
|
|
|
.expect("failed to construct HTTP request");
|
|
|
|
|
|
|
|
let response = ctx
|
2023-01-26 14:41:14 +00:00
|
|
|
.http_delegate()
|
2022-10-28 10:37:05 +00:00
|
|
|
.route(request)
|
|
|
|
.await
|
|
|
|
.expect("delete request failed");
|
|
|
|
|
|
|
|
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
|
|
|
|
|
|
|
// Check the write buffer observed the correct write.
|
|
|
|
let writes = ctx.write_buffer_state().get_messages(ShardIndex::new(0));
|
|
|
|
assert_eq!(writes.len(), 1);
|
|
|
|
assert_matches!(writes.as_slice(), [Ok(DmlOperation::Delete(w))] => {
|
2022-11-03 10:05:51 +00:00
|
|
|
assert_eq!(w.namespace_id(), ns.id);
|
2022-10-28 10:37:05 +00:00
|
|
|
});
|
|
|
|
}
|