feat: HTTP write interface for router

pull/24376/head
Marco Neumann 2021-11-09 16:41:50 +01:00
parent 09239c4943
commit c80088bf12
7 changed files with 543 additions and 334 deletions

2
Cargo.lock generated
View File

@ -1622,9 +1622,11 @@ dependencies = [
"query",
"rand",
"read_buffer",
"regex",
"reqwest",
"router",
"rustyline",
"schema",
"serde",
"serde_json",
"serde_urlencoded",

View File

@ -98,6 +98,8 @@ influxdb2_client = { path = "../influxdb2_client" }
influxdb_storage_client = { path = "../influxdb_storage_client" }
test_helpers = { path = "../test_helpers" }
parking_lot = "0.11.2"
regex = "1.4"
schema = { path = "../schema" }
write_buffer = { path = "../write_buffer" }
# Crates.io dependencies, in alphabetical order

View File

@ -10,6 +10,7 @@ use reqwest::Client;
use serde::de::DeserializeOwned;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use trace::RingBufferTraceCollector;
use crate::influxdb_ioxd::{http::serve, server_type::ServerType};
@ -208,3 +209,34 @@ where
assert!(!data.contains(&"nonexistent"));
assert!(data.contains(&"\nhttp_requests_total{status=\"client_error\"} 1\n"));
}
/// Assert that tracing works.
///
/// For this to work the used trace collector must be a [`RingBufferTraceCollector`].
pub async fn assert_tracing<T>(test_server: TestServer<T>)
where
T: ServerType,
{
let trace_collector = test_server.server_type().trace_collector().unwrap();
let trace_collector = trace_collector
.as_any()
.downcast_ref::<RingBufferTraceCollector>()
.unwrap();
let client = Client::new();
let response = client
.get(&format!("{}/health", test_server.url()))
.header("uber-trace-id", "34f3495:36e34:0:1")
.send()
.await;
// Print the response so if the test fails, we have a log of what went wrong
check_response("health", response, StatusCode::OK, Some("OK")).await;
let mut spans = trace_collector.spans();
assert_eq!(spans.len(), 1);
let span = spans.pop().unwrap();
assert_eq!(span.ctx.trace_id.get(), 0x34f3495);
assert_eq!(span.ctx.parent_span_id.unwrap().get(), 0x36e34);
}

View File

@ -235,3 +235,300 @@ pub struct WriteInfo {
pub org: String,
pub bucket: String,
}
#[cfg(test)]
pub mod test_utils {
use http::{header::CONTENT_ENCODING, StatusCode};
use metric::{Attributes, DurationHistogram, Metric, U64Counter, U64Histogram};
use mutable_batch::DbWrite;
use mutable_batch_lp::lines_to_batches;
use reqwest::Client;
use crate::influxdb_ioxd::{
http::test_utils::{check_response, TestServer},
server_type::ServerType,
};
/// Assert that writes work.
///
/// The database `bucket_name="MyBucket", org_name="MyOrg"` must exist for this test to work.
///
/// Returns write that was generated. The caller MUST check that the write is actually present.
pub async fn assert_write<T>(test_server: &TestServer<T>) -> DbWrite
where
T: ServerType,
{
let client = Client::new();
let lp_data = "h2o_temperature,location=santa_monica,state=CA surface_degrees=65.2,bottom_degrees=50.4 1617286224000000000";
// send write data
let bucket_name = "MyBucket";
let org_name = "MyOrg";
let response = client
.post(&format!(
"{}/api/v2/write?bucket={}&org={}",
test_server.url(),
bucket_name,
org_name
))
.body(lp_data)
.send()
.await;
check_response("write", response, StatusCode::NO_CONTENT, Some("")).await;
DbWrite::new(lines_to_batches(lp_data, 0).unwrap(), Default::default())
}
/// Assert that GZIP-compressed writes work.
///
/// The database `bucket_name="MyBucket", org_name="MyOrg"` must exist for this test to work.
///
/// Returns write that was generated. The caller MUST check that the write is actually present.
pub async fn assert_gzip_write<T>(test_server: &TestServer<T>) -> DbWrite
where
T: ServerType,
{
let client = Client::new();
let lp_data = "h2o_temperature,location=santa_monica,state=CA surface_degrees=65.2,bottom_degrees=50.4 1617286224000000000";
// send write data encoded with gzip
let bucket_name = "MyBucket";
let org_name = "MyOrg";
let response = client
.post(&format!(
"{}/api/v2/write?bucket={}&org={}",
test_server.url(),
bucket_name,
org_name
))
.header(CONTENT_ENCODING, "gzip")
.body(gzip_str(lp_data))
.send()
.await;
check_response("gzip_write", response, StatusCode::NO_CONTENT, Some("")).await;
DbWrite::new(lines_to_batches(lp_data, 0).unwrap(), Default::default())
}
/// Assert that write to an invalid database behave as expected.
pub async fn assert_write_to_invalid_database<T>(test_server: TestServer<T>)
where
T: ServerType,
{
let client = Client::new();
let bucket_name = "NotMyBucket";
let org_name = "MyOrg";
let response = client
.post(&format!(
"{}/api/v2/write?bucket={}&org={}",
test_server.url(),
bucket_name,
org_name
))
.body("cpu bar=1 10")
.send()
.await;
check_response(
"write_to_invalid_databases",
response,
StatusCode::NOT_FOUND,
Some(""),
)
.await;
}
/// Assert that write metrics work.
///
/// The database `bucket_name="MyBucket", org_name="MyOrg"` must exist for this test to work.
///
/// If `test_incompatible` is set this will test the ingestion of schema-incompatible data.
pub async fn assert_write_metrics<T>(test_server: TestServer<T>, test_incompatible: bool)
where
T: ServerType,
{
let metric_registry = test_server.server_type().metric_registry();
let client = Client::new();
let lp_data = "h2o_temperature,location=santa_monica,state=CA surface_degrees=65.2,bottom_degrees=50.4 1568756160";
let incompatible_lp_data = "h2o_temperature,location=santa_monica,state=CA surface_degrees=\"incompatible\" 1568756170";
// send good data
let org_name = "MyOrg";
let bucket_name = "MyBucket";
let post_url = format!(
"{}/api/v2/write?bucket={}&org={}",
test_server.url(),
bucket_name,
org_name
);
client
.post(&post_url)
.body(lp_data)
.send()
.await
.expect("sent data");
// The request completed successfully
let request_count = metric_registry
.get_instrument::<Metric<U64Counter>>("http_requests")
.unwrap();
let request_count_ok = request_count
.get_observer(&Attributes::from(&[
("path", "/api/v2/write"),
("status", "ok"),
]))
.unwrap()
.clone();
let request_count_client_error = request_count
.get_observer(&Attributes::from(&[
("path", "/api/v2/write"),
("status", "client_error"),
]))
.unwrap()
.clone();
let request_count_server_error = request_count
.get_observer(&Attributes::from(&[
("path", "/api/v2/write"),
("status", "server_error"),
]))
.unwrap()
.clone();
let request_duration_ok = metric_registry
.get_instrument::<Metric<DurationHistogram>>("http_request_duration")
.unwrap()
.get_observer(&Attributes::from(&[
("path", "/api/v2/write"),
("status", "ok"),
]))
.unwrap()
.clone();
assert_eq!(request_duration_ok.fetch().sample_count(), 1);
assert_eq!(request_count_ok.fetch(), 1);
assert_eq!(request_count_client_error.fetch(), 0);
assert_eq!(request_count_server_error.fetch(), 0);
// A single successful point landed
let ingest_lines = metric_registry
.get_instrument::<Metric<U64Counter>>("ingest_lines")
.unwrap();
let ingest_lines_ok = ingest_lines
.get_observer(&Attributes::from(&[
("db_name", "MyOrg_MyBucket"),
("status", "ok"),
]))
.unwrap()
.clone();
let ingest_lines_error = ingest_lines
.get_observer(&Attributes::from(&[
("db_name", "MyOrg_MyBucket"),
("status", "error"),
]))
.unwrap()
.clone();
assert_eq!(ingest_lines_ok.fetch(), 1);
assert_eq!(ingest_lines_error.fetch(), 0);
// Which consists of two fields
let observation = metric_registry
.get_instrument::<Metric<U64Counter>>("ingest_fields")
.unwrap()
.get_observer(&Attributes::from(&[
("db_name", "MyOrg_MyBucket"),
("status", "ok"),
]))
.unwrap()
.fetch();
assert_eq!(observation, 2);
// Bytes of data were written
let observation = metric_registry
.get_instrument::<Metric<U64Counter>>("ingest_bytes")
.unwrap()
.get_observer(&Attributes::from(&[
("db_name", "MyOrg_MyBucket"),
("status", "ok"),
]))
.unwrap()
.fetch();
assert_eq!(observation, 98);
// Batch size distribution is measured
let observation = metric_registry
.get_instrument::<Metric<U64Histogram>>("ingest_batch_size_bytes")
.unwrap()
.get_observer(&Attributes::from(&[
("db_name", "MyOrg_MyBucket"),
("status", "ok"),
]))
.unwrap()
.fetch();
assert_eq!(observation.total, 98);
assert_eq!(observation.buckets[0].count, 1);
assert_eq!(observation.buckets[1].count, 0);
// Write to a non-existent database
client
.post(&format!(
"{}/api/v2/write?bucket=NotMyBucket&org=NotMyOrg",
test_server.url(),
))
.body(lp_data)
.send()
.await
.unwrap();
// An invalid database should not be reported as a new metric
assert!(metric_registry
.get_instrument::<Metric<U64Counter>>("ingest_lines")
.unwrap()
.get_observer(&Attributes::from(&[
("db_name", "NotMyOrg_NotMyBucket"),
("status", "error"),
]))
.is_none());
assert_eq!(ingest_lines_ok.fetch(), 1);
assert_eq!(ingest_lines_error.fetch(), 0);
// Perform an invalid write
if test_incompatible {
client
.post(&post_url)
.body(incompatible_lp_data)
.send()
.await
.unwrap();
// This currently results in an InternalServerError which is correctly recorded
// as a server error, but this should probably be a BadRequest client error (#2538)
assert_eq!(ingest_lines_ok.fetch(), 1);
assert_eq!(ingest_lines_error.fetch(), 1);
assert_eq!(request_duration_ok.fetch().sample_count(), 1);
assert_eq!(request_count_ok.fetch(), 1);
assert_eq!(request_count_client_error.fetch(), 0);
assert_eq!(request_count_server_error.fetch(), 1);
}
}
fn gzip_str(s: &str) -> Vec<u8> {
use flate2::{write::GzEncoder, Compression};
use std::io::Write;
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
write!(encoder, "{}", s).expect("writing into encoder");
encoder.finish().expect("successfully encoding gzip data")
}
}

View File

@ -402,10 +402,17 @@ async fn query<M: ConnectionManager + Send + Sync + Debug + 'static>(
#[cfg(test)]
mod tests {
use crate::influxdb_ioxd::{
http::test_utils::{
assert_health, assert_metrics, check_response, get_content_type, TestServer,
http::{
test_utils::{
assert_health, assert_metrics, assert_tracing, check_response, get_content_type,
TestServer,
},
write::test_utils::{
assert_gzip_write, assert_write, assert_write_metrics,
assert_write_to_invalid_database,
},
},
server_type::{common_state::CommonServerState, ServerType},
server_type::common_state::CommonServerState,
};
use super::*;
@ -413,12 +420,11 @@ mod tests {
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_eq;
use http::header::CONTENT_ENCODING;
use reqwest::Client;
use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName};
use metric::{Attributes, DurationHistogram, Metric, U64Counter, U64Histogram};
use object_store::ObjectStore;
use schema::selection::Selection;
use server::{
connection::ConnectionManagerImpl, db::Db, rules::ProvidedDatabaseRules, ApplicationState,
Server,
@ -429,7 +435,7 @@ mod tests {
Arc::new(ApplicationState::new(
Arc::new(ObjectStore::new_in_memory()),
None,
None,
Some(Arc::new(RingBufferTraceCollector::new(5))),
))
}
@ -453,77 +459,57 @@ mod tests {
#[tokio::test]
async fn test_tracing() {
let trace_collector = Arc::new(RingBufferTraceCollector::new(5));
let application = Arc::new(ApplicationState::new(
Arc::new(ObjectStore::new_in_memory()),
None,
Some(Arc::<RingBufferTraceCollector>::clone(&trace_collector)),
));
let app_server = make_server(Arc::clone(&application));
let server_type =
DatabaseServerType::new(application, app_server, &CommonServerState::for_testing());
let test_server = TestServer::new(Arc::new(server_type));
let client = Client::new();
let response = client
.get(&format!("{}/health", test_server.url()))
.header("uber-trace-id", "34f3495:36e34:0:1")
.send()
.await;
// Print the response so if the test fails, we have a log of what went wrong
check_response("health", response, StatusCode::OK, Some("OK")).await;
let mut spans = trace_collector.spans();
assert_eq!(spans.len(), 1);
let span = spans.pop().unwrap();
assert_eq!(span.ctx.trace_id.get(), 0x34f3495);
assert_eq!(span.ctx.parent_span_id.unwrap().get(), 0x36e34);
assert_tracing(setup_server().await).await;
}
#[tokio::test]
async fn test_write() {
let test_server = setup_server().await;
async fn assert_dbwrite(
test_server: TestServer<DatabaseServerType<ConnectionManagerImpl>>,
write: DbWrite,
) {
let (table_name, mutable_batch) = write.tables().next().unwrap();
let client = Client::new();
let lp_data = "h2o_temperature,location=santa_monica,state=CA surface_degrees=65.2,bottom_degrees=50.4 1617286224000000000";
// send write data
let bucket_name = "MyBucket";
let org_name = "MyOrg";
let response = client
.post(&format!(
"{}/api/v2/write?bucket={}&org={}",
test_server.url(),
bucket_name,
org_name
))
.body(lp_data)
.send()
.await;
check_response("write", response, StatusCode::NO_CONTENT, Some("")).await;
// Check that the data got into the right bucket
let test_db = test_server
.server_type()
.server
.db(&DatabaseName::new("MyOrg_MyBucket").unwrap())
.expect("Database exists");
let batches = run_query(test_db, &format!("select * from {}", table_name)).await;
let expected = arrow_util::display::pretty_format_batches(&[mutable_batch
.to_arrow(Selection::All)
.unwrap()])
.unwrap();
let expected = expected.split('\n');
let batches = run_query(test_db, "select * from h2o_temperature").await;
let expected = vec![
"+----------------+--------------+-------+-----------------+----------------------+",
"| bottom_degrees | location | state | surface_degrees | time |",
"+----------------+--------------+-------+-----------------+----------------------+",
"| 50.4 | santa_monica | CA | 65.2 | 2021-04-01T14:10:24Z |",
"+----------------+--------------+-------+-----------------+----------------------+",
];
assert_batches_eq!(expected, &batches);
}
#[tokio::test]
async fn test_write() {
let test_server = setup_server().await;
let write = assert_write(&test_server).await;
assert_dbwrite(test_server, write).await;
}
#[tokio::test]
async fn test_write_metrics() {
assert_write_metrics(setup_server().await, true).await;
}
#[tokio::test]
async fn test_gzip_write() {
let test_server = setup_server().await;
let write = assert_gzip_write(&test_server).await;
assert_dbwrite(test_server, write).await;
}
#[tokio::test]
async fn write_to_invalid_database() {
assert_write_to_invalid_database(setup_server().await).await;
}
#[tokio::test]
async fn test_delete() {
// Set up server
@ -652,179 +638,6 @@ mod tests {
.await;
}
#[tokio::test]
async fn test_write_metrics() {
let test_server = setup_server().await;
let metric_registry = test_server.server_type().metric_registry();
let client = Client::new();
let lp_data = "h2o_temperature,location=santa_monica,state=CA surface_degrees=65.2,bottom_degrees=50.4 1568756160";
let incompatible_lp_data = "h2o_temperature,location=santa_monica,state=CA surface_degrees=\"incompatible\" 1568756170";
// send good data
let org_name = "MyOrg";
let bucket_name = "MyBucket";
let post_url = format!(
"{}/api/v2/write?bucket={}&org={}",
test_server.url(),
bucket_name,
org_name
);
client
.post(&post_url)
.body(lp_data)
.send()
.await
.expect("sent data");
// The request completed successfully
let request_count = metric_registry
.get_instrument::<Metric<U64Counter>>("http_requests")
.unwrap();
let request_count_ok = request_count
.get_observer(&Attributes::from(&[
("path", "/api/v2/write"),
("status", "ok"),
]))
.unwrap()
.clone();
let request_count_client_error = request_count
.get_observer(&Attributes::from(&[
("path", "/api/v2/write"),
("status", "client_error"),
]))
.unwrap()
.clone();
let request_count_server_error = request_count
.get_observer(&Attributes::from(&[
("path", "/api/v2/write"),
("status", "server_error"),
]))
.unwrap()
.clone();
let request_duration_ok = metric_registry
.get_instrument::<Metric<DurationHistogram>>("http_request_duration")
.unwrap()
.get_observer(&Attributes::from(&[
("path", "/api/v2/write"),
("status", "ok"),
]))
.unwrap()
.clone();
assert_eq!(request_duration_ok.fetch().sample_count(), 1);
assert_eq!(request_count_ok.fetch(), 1);
assert_eq!(request_count_client_error.fetch(), 0);
assert_eq!(request_count_server_error.fetch(), 0);
// A single successful point landed
let ingest_lines = metric_registry
.get_instrument::<Metric<U64Counter>>("ingest_lines")
.unwrap();
let ingest_lines_ok = ingest_lines
.get_observer(&Attributes::from(&[
("db_name", "MyOrg_MyBucket"),
("status", "ok"),
]))
.unwrap()
.clone();
let ingest_lines_error = ingest_lines
.get_observer(&Attributes::from(&[
("db_name", "MyOrg_MyBucket"),
("status", "error"),
]))
.unwrap()
.clone();
assert_eq!(ingest_lines_ok.fetch(), 1);
assert_eq!(ingest_lines_error.fetch(), 0);
// Which consists of two fields
let observation = metric_registry
.get_instrument::<Metric<U64Counter>>("ingest_fields")
.unwrap()
.get_observer(&Attributes::from(&[
("db_name", "MyOrg_MyBucket"),
("status", "ok"),
]))
.unwrap()
.fetch();
assert_eq!(observation, 2);
// Bytes of data were written
let observation = metric_registry
.get_instrument::<Metric<U64Counter>>("ingest_bytes")
.unwrap()
.get_observer(&Attributes::from(&[
("db_name", "MyOrg_MyBucket"),
("status", "ok"),
]))
.unwrap()
.fetch();
assert_eq!(observation, 98);
// Batch size distribution is measured
let observation = metric_registry
.get_instrument::<Metric<U64Histogram>>("ingest_batch_size_bytes")
.unwrap()
.get_observer(&Attributes::from(&[
("db_name", "MyOrg_MyBucket"),
("status", "ok"),
]))
.unwrap()
.fetch();
assert_eq!(observation.total, 98);
assert_eq!(observation.buckets[0].count, 1);
assert_eq!(observation.buckets[1].count, 0);
// Write to a non-existent database
client
.post(&format!(
"{}/api/v2/write?bucket=NotMyBucket&org=NotMyOrg",
test_server.url(),
))
.body(lp_data)
.send()
.await
.unwrap();
// An invalid database should not be reported as a new metric
assert!(metric_registry
.get_instrument::<Metric<U64Counter>>("ingest_lines")
.unwrap()
.get_observer(&Attributes::from(&[
("db_name", "NotMyOrg_NotMyBucket"),
("status", "error"),
]))
.is_none());
assert_eq!(ingest_lines_ok.fetch(), 1);
assert_eq!(ingest_lines_error.fetch(), 0);
// Perform an invalid write
client
.post(&post_url)
.body(incompatible_lp_data)
.send()
.await
.unwrap();
// This currently results in an InternalServerError which is correctly recorded
// as a server error, but this should probably be a BadRequest client error (#2538)
assert_eq!(ingest_lines_ok.fetch(), 1);
assert_eq!(ingest_lines_error.fetch(), 1);
assert_eq!(request_duration_ok.fetch().sample_count(), 1);
assert_eq!(request_count_ok.fetch(), 1);
assert_eq!(request_count_client_error.fetch(), 0);
assert_eq!(request_count_server_error.fetch(), 1);
}
/// Sets up a test database with some data for testing the query endpoint
/// returns a client for communicating with the server, and the server
/// endpoint
@ -956,85 +769,6 @@ mod tests {
check_response("query", response, StatusCode::OK, Some(res)).await;
}
fn gzip_str(s: &str) -> Vec<u8> {
use flate2::{write::GzEncoder, Compression};
use std::io::Write;
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
write!(encoder, "{}", s).expect("writing into encoder");
encoder.finish().expect("successfully encoding gzip data")
}
#[tokio::test]
async fn test_gzip_write() {
let test_server = setup_server().await;
let client = Client::new();
let lp_data = "h2o_temperature,location=santa_monica,state=CA surface_degrees=65.2,bottom_degrees=50.4 1617286224000000000";
// send write data encoded with gzip
let bucket_name = "MyBucket";
let org_name = "MyOrg";
let response = client
.post(&format!(
"{}/api/v2/write?bucket={}&org={}",
test_server.url(),
bucket_name,
org_name
))
.header(CONTENT_ENCODING, "gzip")
.body(gzip_str(lp_data))
.send()
.await;
check_response("gzip_write", response, StatusCode::NO_CONTENT, Some("")).await;
// Check that the data got into the right bucket
let test_db = test_server
.server_type()
.server
.db(&DatabaseName::new("MyOrg_MyBucket").unwrap())
.expect("Database exists");
let batches = run_query(test_db, "select * from h2o_temperature").await;
let expected = vec![
"+----------------+--------------+-------+-----------------+----------------------+",
"| bottom_degrees | location | state | surface_degrees | time |",
"+----------------+--------------+-------+-----------------+----------------------+",
"| 50.4 | santa_monica | CA | 65.2 | 2021-04-01T14:10:24Z |",
"+----------------+--------------+-------+-----------------+----------------------+",
];
assert_batches_eq!(expected, &batches);
}
#[tokio::test]
async fn write_to_invalid_database() {
let test_server = setup_server().await;
let client = Client::new();
let bucket_name = "NotMyBucket";
let org_name = "MyOrg";
let response = client
.post(&format!(
"{}/api/v2/write?bucket={}&org={}",
test_server.url(),
bucket_name,
org_name
))
.body("cpu bar=1 10")
.send()
.await;
check_response(
"write_to_invalid_databases",
response,
StatusCode::NOT_FOUND,
Some(""),
)
.await;
}
/// Run the specified SQL query and return formatted results as a string
async fn run_query(db: Arc<Db>, query: &str) -> Vec<RecordBatch> {
let ctx = db.new_query_context(None);

View File

@ -1,7 +1,18 @@
use hyper::{Body, Method, Request, Response};
use snafu::Snafu;
use std::sync::Arc;
use crate::influxdb_ioxd::server_type::RouteError;
use async_trait::async_trait;
use data_types::DatabaseName;
use hyper::{Body, Method, Request, Response};
use mutable_batch::DbWrite;
use snafu::{ResultExt, Snafu};
use crate::influxdb_ioxd::{
http::{
metrics::LineProtocolMetrics,
write::{HttpDrivenWrite, InnerWriteError, RequestOrResponse},
},
server_type::RouteError,
};
use super::RouterServerType;
@ -9,41 +20,86 @@ use super::RouterServerType;
pub enum ApplicationError {
#[snafu(display("No handler for {:?} {}", method, path))]
RouteNotFound { method: Method, path: String },
#[snafu(display("Cannot write data: {}", source))]
WriteError {
source: crate::influxdb_ioxd::http::write::HttpWriteError,
},
}
impl RouteError for ApplicationError {
fn response(&self) -> http::Response<hyper::Body> {
match self {
ApplicationError::RouteNotFound { .. } => self.not_found(),
Self::RouteNotFound { .. } => self.not_found(),
Self::WriteError { source } => source.response(),
}
}
}
#[async_trait]
impl HttpDrivenWrite for RouterServerType {
fn max_request_size(&self) -> usize {
self.max_request_size
}
fn lp_metrics(&self) -> Arc<LineProtocolMetrics> {
Arc::clone(&self.lp_metrics)
}
async fn write(
&self,
db_name: &DatabaseName<'_>,
write: DbWrite,
) -> Result<(), InnerWriteError> {
match self.server.router(db_name) {
Some(router) => router
.write(write)
.await
.map_err(|e| InnerWriteError::OtherError {
source: Box::new(e),
}),
None => Err(InnerWriteError::NotFound {
db_name: db_name.to_string(),
}),
}
}
}
#[allow(clippy::match_single_binding)]
pub async fn route_request(
_server_type: &RouterServerType,
server_type: &RouterServerType,
req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> {
let method = req.method().clone();
let uri = req.uri().clone();
match (method, uri.path()) {
(method, path) => Err(ApplicationError::RouteNotFound {
method,
path: path.to_string(),
match server_type
.route_write_http_request(req)
.await
.context(WriteError)?
{
RequestOrResponse::Response(resp) => Ok(resp),
RequestOrResponse::Request(req) => Err(ApplicationError::RouteNotFound {
method: req.method().clone(),
path: req.uri().path().to_string(),
}),
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::{collections::BTreeMap, sync::Arc};
use router::server::RouterServer;
use data_types::server_id::ServerId;
use router::{grpc_client::MockClient, resolver::RemoteTemplate, server::RouterServer};
use time::SystemProvider;
use trace::RingBufferTraceCollector;
use crate::influxdb_ioxd::{
http::test_utils::{assert_health, assert_metrics, TestServer},
http::{
test_utils::{assert_health, assert_metrics, assert_tracing, TestServer},
write::test_utils::{
assert_gzip_write, assert_write, assert_write_metrics,
assert_write_to_invalid_database,
},
},
server_type::common_state::CommonServerState,
};
@ -59,12 +115,91 @@ mod tests {
assert_metrics(test_server().await).await;
}
#[tokio::test]
async fn test_tracing() {
assert_tracing(test_server().await).await;
}
#[tokio::test]
async fn test_write() {
let test_server = test_server().await;
let write = assert_write(&test_server).await;
assert_dbwrite(test_server, write).await;
}
#[tokio::test]
async fn test_gzip_write() {
let test_server = test_server().await;
let write = assert_gzip_write(&test_server).await;
assert_dbwrite(test_server, write).await;
}
#[tokio::test]
async fn test_write_metrics() {
assert_write_metrics(test_server().await, false).await;
}
#[tokio::test]
async fn test_write_to_invalid_database() {
assert_write_to_invalid_database(test_server().await).await;
}
async fn test_server() -> TestServer<RouterServerType> {
use data_types::router::{
Matcher, MatcherToShard, Router, ShardConfig, ShardId, WriteSink, WriteSinkSet,
WriteSinkVariant,
};
use regex::Regex;
let common_state = CommonServerState::for_testing();
let time_provider = Arc::new(SystemProvider::new());
let server =
Arc::new(RouterServer::new(None, common_state.trace_collector(), time_provider).await);
let server_id_1 = ServerId::try_from(1).unwrap();
let remote_template = RemoteTemplate::new("{id}");
let server = Arc::new(
RouterServer::for_testing(
Some(remote_template),
Some(Arc::new(RingBufferTraceCollector::new(1))),
time_provider,
)
.await,
);
server.update_router(Router {
name: String::from("MyOrg_MyBucket"),
write_sharder: ShardConfig {
specific_targets: vec![MatcherToShard {
matcher: Matcher {
table_name_regex: Some(Regex::new(".*").unwrap()),
},
shard: ShardId::new(1),
}],
hash_ring: None,
},
write_sinks: BTreeMap::from([(
ShardId::new(1),
WriteSinkSet {
sinks: vec![WriteSink {
ignore_errors: false,
sink: WriteSinkVariant::GrpcRemote(server_id_1),
}],
},
)]),
query_sinks: Default::default(),
});
let server_type = Arc::new(RouterServerType::new(server, &common_state));
TestServer::new(server_type)
}
async fn assert_dbwrite(test_server: TestServer<RouterServerType>, write: DbWrite) {
let grpc_client = test_server
.server_type()
.server
.connection_pool()
.grpc_client("1")
.await
.unwrap();
let grpc_client = grpc_client.as_any().downcast_ref::<MockClient>().unwrap();
grpc_client.assert_writes(&[(String::from("MyOrg_MyBucket"), write)]);
}
}

View File

@ -8,6 +8,7 @@ use tokio_util::sync::CancellationToken;
use trace::TraceCollector;
use crate::influxdb_ioxd::{
http::metrics::LineProtocolMetrics,
rpc::RpcBuilderInput,
server_type::{common_state::CommonServerState, RpcError, ServerType},
serving_readiness::ServingReadiness,
@ -23,14 +24,20 @@ pub struct RouterServerType {
server: Arc<RouterServer>,
serving_readiness: ServingReadiness,
shutdown: CancellationToken,
max_request_size: usize,
lp_metrics: Arc<LineProtocolMetrics>,
}
impl RouterServerType {
pub fn new(server: Arc<RouterServer>, common_state: &CommonServerState) -> Self {
let lp_metrics = Arc::new(LineProtocolMetrics::new(server.metric_registry().as_ref()));
Self {
server,
serving_readiness: common_state.serving_readiness().clone(),
shutdown: CancellationToken::new(),
max_request_size: common_state.run_config().max_http_request_size,
lp_metrics,
}
}
}