Merge branch 'main' into crepererum/remove_routing_from_database_mode_1

commit 6e1f86ca48
@@ -5130,6 +5130,7 @@ dependencies = [
  "tempfile",
  "time 0.1.0",
  "tokio",
+ "tokio-util",
  "trace",
  "trace_http",
  "uuid",
@@ -4,11 +4,13 @@ use async_trait::async_trait;
 use chrono::Utc;
 use data_types::{
     names::{org_and_bucket_to_database, OrgBucketMappingError},
+    non_empty::NonEmptyString,
     DatabaseName,
 };
-use dml::{DmlMeta, DmlWrite};
+use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
 use hyper::{Body, Method, Request, Response, StatusCode};
 use observability_deps::tracing::debug;
+use predicate::delete_predicate::{parse_delete_predicate, parse_http_delete_request};
 use serde::Deserialize;
 use snafu::{OptionExt, ResultExt, Snafu};
@@ -21,17 +23,41 @@ use super::{
 #[allow(clippy::large_enum_variant)]
 #[derive(Debug, Snafu)]
-pub enum HttpWriteError {
+pub enum HttpDmlError {
     #[snafu(display("Internal error mapping org & bucket: {}", source))]
     BucketMappingError { source: OrgBucketMappingError },
 
+    #[snafu(display("User error writing points into {}: {}", db_name, source))]
+    WritingPointsUser {
+        db_name: String,
+        source: Box<dyn std::error::Error + Send + Sync>,
+    },
+
+    #[snafu(display("Internal error writing points into {}: {}", db_name, source))]
+    WritingPointsInternal {
+        db_name: String,
+        source: Box<dyn std::error::Error + Send + Sync>,
+    },
+
     #[snafu(display(
-        "Internal error writing points into org {}, bucket {}: {}",
+        "User error writing deleting into org {}, bucket {}: {}",
         org,
         bucket_name,
         source
     ))]
-    WritingPoints {
+    DeletingPointsUser {
         org: String,
         bucket_name: String,
         source: Box<dyn std::error::Error + Send + Sync>,
     },
 
+    #[snafu(display(
+        "Internal error deleting points into org {}, bucket {}: {}",
+        org,
+        bucket_name,
+        source
+    ))]
+    DeletingPointsInternal {
+        org: String,
+        bucket_name: String,
+        source: Box<dyn std::error::Error + Send + Sync>,
@@ -55,39 +81,99 @@ pub enum HttpWriteError {
     ParsingLineProtocol { source: mutable_batch_lp::Error },
 
     #[snafu(display("Database {} not found", db_name))]
-    DatabaseNotFound { db_name: String },
+    NotFoundDatabase { db_name: String },
+
+    #[snafu(display("Table {}:{} not found", db_name, table_name))]
+    NotFoundTable { db_name: String, table_name: String },
+
+    #[snafu(display("Cannot parse body: {}", source))]
+    ParseBody {
+        source: crate::influxdb_ioxd::http::utils::ParseBodyError,
+    },
+
+    #[snafu(display("Error parsing delete {}: {}", input, source))]
+    ParsingDelete {
+        source: predicate::delete_predicate::Error,
+        input: String,
+    },
+
+    #[snafu(display("Error building delete predicate {}: {}", input, source))]
+    BuildingDeletePredicate {
+        source: predicate::delete_predicate::Error,
+        input: String,
+    },
 }
 
-impl HttpApiErrorSource for HttpWriteError {
+impl HttpApiErrorSource for HttpDmlError {
     fn to_http_api_error(&self) -> HttpApiError {
         match self {
             e @ Self::BucketMappingError { .. } => e.internal_error(),
-            e @ Self::WritingPoints { .. } => e.internal_error(),
+            e @ Self::WritingPointsInternal { .. } => e.internal_error(),
+            e @ Self::WritingPointsUser { .. } => e.invalid(),
+            e @ Self::DeletingPointsInternal { .. } => e.internal_error(),
+            e @ Self::DeletingPointsUser { .. } => e.invalid(),
             e @ Self::ExpectedQueryString { .. } => e.invalid(),
             e @ Self::InvalidQueryString { .. } => e.invalid(),
             e @ Self::ReadingBodyAsUtf8 { .. } => e.invalid(),
             e @ Self::ParsingLineProtocol { .. } => e.invalid(),
-            e @ Self::DatabaseNotFound { .. } => e.not_found(),
+            e @ Self::NotFoundDatabase { .. } => e.not_found(),
+            e @ Self::NotFoundTable { .. } => e.not_found(),
+            Self::ParseBody { source } => source.to_http_api_error(),
+            e @ Self::ParsingDelete { .. } => e.invalid(),
+            e @ Self::BuildingDeletePredicate { .. } => e.invalid(),
         }
     }
 }
 
 /// Write error when calling the underlying server type.
 #[derive(Debug, Snafu)]
-pub enum InnerWriteError {
+pub enum InnerDmlError {
     #[snafu(display("Database {} not found", db_name))]
-    NotFound { db_name: String },
+    DatabaseNotFound { db_name: String },
 
-    #[snafu(display("Error while writing: {}", source))]
-    OtherError {
-        source: Box<dyn std::error::Error + Send + Sync>,
-    },
+    #[snafu(display("Table {}:{} not found", db_name, table_name))]
+    TableNotFound { db_name: String, table_name: String },
+
+    #[snafu(display("User-provoked error while processing DML request: {}", source))]
+    UserError {
+        db_name: String,
+        source: Box<dyn std::error::Error + Send + Sync>,
+    },
+
+    #[snafu(display("Internal error while processing DML request: {}", source))]
+    InternalError {
+        db_name: String,
+        source: Box<dyn std::error::Error + Send + Sync>,
+    },
 }
 
+impl From<InnerDmlError> for HttpDmlError {
+    fn from(e: InnerDmlError) -> Self {
+        match e {
+            InnerDmlError::DatabaseNotFound { db_name } => {
+                debug!(%db_name, "database not found");
+                Self::NotFoundDatabase { db_name }
+            }
+            InnerDmlError::TableNotFound {
+                db_name,
+                table_name,
+            } => {
+                debug!(%db_name, %table_name, "table not found");
+                Self::NotFoundTable {
+                    db_name,
+                    table_name,
+                }
+            }
+            InnerDmlError::UserError { db_name, source } => {
+                debug!(e=%source, %db_name, "error writing lines");
+                Self::WritingPointsUser { db_name, source }
+            }
+            InnerDmlError::InternalError { db_name, source } => {
+                debug!(e=%source, %db_name, "error writing lines");
+                Self::WritingPointsInternal { db_name, source }
+            }
+        }
+    }
+}
+
 /// Contains a request or a response.
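The new `From<InnerDmlError> for HttpDmlError` impl is what lets the route handlers further down collapse their error arms into a single `Err(e.into())`. A minimal, self-contained sketch of that pattern — the `InnerError`/`OuterError` names are illustrative stand-ins, not types from this diff:

// Sketch: a From impl lets `?` and `.into()` lift low-level errors
// into HTTP-facing errors at the routing boundary.
#[derive(Debug)]
enum InnerError {
    NotFound { name: String },
    Internal { msg: String },
}

#[derive(Debug)]
enum OuterError {
    NotFound { name: String }, // would map to HTTP 404
    Internal { msg: String },  // would map to HTTP 500
}

impl From<InnerError> for OuterError {
    fn from(e: InnerError) -> Self {
        match e {
            InnerError::NotFound { name } => Self::NotFound { name },
            InnerError::Internal { msg } => Self::Internal { msg },
        }
    }
}

fn handler(inner: Result<(), InnerError>) -> Result<(), OuterError> {
    inner?; // `?` applies the From conversion automatically
    Ok(())
}

fn main() {
    println!("{:?}", handler(Err(InnerError::NotFound { name: "db".into() })));
}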
@@ -102,7 +188,7 @@ pub enum RequestOrResponse {
 }
 
 #[async_trait]
-pub trait HttpDrivenWrite: ServerType {
+pub trait HttpDrivenDml: ServerType {
     /// Routes HTTP write requests.
     ///
     /// Returns `RequestOrResponse::Response` if the request was routed,
@@ -110,7 +196,7 @@ pub trait HttpDrivenWrite: ServerType {
     async fn route_write_http_request(
         &self,
         req: Request<Body>,
-    ) -> Result<RequestOrResponse, HttpWriteError> {
+    ) -> Result<RequestOrResponse, HttpDmlError> {
         if (req.method() != Method::POST) || (req.uri().path() != "/api/v2/write") {
             return Ok(RequestOrResponse::Request(req));
         }
@@ -149,7 +235,7 @@ pub trait HttpDrivenWrite: ServerType {
                     .unwrap(),
                 ));
             }
-            Err(source) => return Err(HttpWriteError::ParsingLineProtocol { source }),
+            Err(source) => return Err(HttpDmlError::ParsingLineProtocol { source }),
         };
 
         debug!(
@@ -164,7 +250,7 @@ pub trait HttpDrivenWrite: ServerType {
 
         let write = DmlWrite::new(tables, DmlMeta::unsequenced(span_ctx));
 
-        match self.write(&db_name, write).await {
+        match self.write(&db_name, DmlOperation::Write(write)).await {
             Ok(_) => {
                 lp_metrics.record_write(
                     &db_name,
@@ -180,15 +266,13 @@ pub trait HttpDrivenWrite: ServerType {
                         .unwrap(),
                 ))
             }
-            Err(InnerWriteError::NotFound { .. }) => {
-                debug!(%db_name, ?stats, "database not found");
+            Err(
+                e @ (InnerDmlError::DatabaseNotFound { .. } | InnerDmlError::TableNotFound { .. }),
+            ) => {
                 // Purposefully do not record ingest metrics
-                Err(HttpWriteError::DatabaseNotFound {
-                    db_name: db_name.to_string(),
-                })
+                Err(e.into())
             }
-            Err(InnerWriteError::OtherError { source }) => {
-                debug!(e=%source, %db_name, ?stats, "error writing lines");
+            Err(e @ (InnerDmlError::UserError { .. } | InnerDmlError::InternalError { .. })) => {
                 lp_metrics.record_write(
                     &db_name,
                     stats.num_lines,
@@ -196,31 +280,106 @@ pub trait HttpDrivenWrite: ServerType {
                     body.len(),
                     false,
                 );
-                Err(HttpWriteError::WritingPoints {
-                    org: write_info.org.clone(),
-                    bucket_name: write_info.bucket.clone(),
-                    source,
-                })
+                Err(e.into())
             }
         }
     }
 
+    /// Routes HTTP delete requests.
+    ///
+    /// Returns `RequestOrResponse::Response` if the request was routed,
+    /// Returns `RequestOrResponse::Response` if the request did not match (and needs to be handled some other way)
+    async fn route_delete_http_request(
+        &self,
+        req: Request<Body>,
+    ) -> Result<RequestOrResponse, HttpDmlError> {
+        if (req.method() != Method::POST) || (req.uri().path() != "/api/v2/delete") {
+            return Ok(RequestOrResponse::Request(req));
+        }
+
+        let span_ctx = req.extensions().get().cloned();
+
+        let max_request_size = self.max_request_size();
+
+        // Extract the DB name from the request
+        // db_name = orrID_bucketID
+        let query = req.uri().query().context(ExpectedQueryString)?;
+        let delete_info: WriteInfo =
+            serde_urlencoded::from_str(query).context(InvalidQueryString {
+                query_string: String::from(query),
+            })?;
+        let db_name = org_and_bucket_to_database(&delete_info.org, &delete_info.bucket)
+            .context(BucketMappingError)?;
+
+        // Parse body
+        let body = parse_body(req, max_request_size).await.context(ParseBody)?;
+        let body = std::str::from_utf8(&body).context(ReadingBodyAsUtf8)?;
+
+        // Parse and extract table name (which can be empty), start, stop, and predicate
+        let parsed_delete =
+            parse_http_delete_request(body).context(ParsingDelete { input: body })?;
+
+        let table_name = parsed_delete.table_name;
+        let predicate = parsed_delete.predicate;
+        let start = parsed_delete.start_time;
+        let stop = parsed_delete.stop_time;
+        debug!(%table_name, %predicate, %start, %stop, body_size=body.len(), %db_name, org=%delete_info.org, bucket=%delete_info.bucket, "delete data from database");
+
+        // Build delete predicate
+        let predicate = parse_delete_predicate(&start, &stop, &predicate)
+            .context(BuildingDeletePredicate { input: body })?;
+
+        let delete = DmlDelete::new(
+            predicate,
+            NonEmptyString::new(table_name),
+            DmlMeta::unsequenced(span_ctx),
+        );
+
+        match self.write(&db_name, DmlOperation::Delete(delete)).await {
+            Ok(_) => Ok(RequestOrResponse::Response(
+                Response::builder()
+                    .status(StatusCode::NO_CONTENT)
+                    .body(Body::empty())
+                    .unwrap(),
+            )),
+            Err(e) => Err(e.into()),
+        }
+    }
+
+    /// Routes HTTP DML requests.
+    ///
+    /// Combines:
+    /// - [`route_delete_http_request`](Self::route_delete_http_request)
+    /// - [`route_write_http_request`](Self::route_write_http_request)
+    ///
+    /// Returns `RequestOrResponse::Response` if the request was routed,
+    /// Returns `RequestOrResponse::Response` if the request did not match (and needs to be handled some other way)
+    async fn route_dml_http_request(
+        &self,
+        req: Request<Body>,
+    ) -> Result<RequestOrResponse, HttpDmlError> {
+        match self.route_delete_http_request(req).await? {
+            RequestOrResponse::Response(resp) => Ok(RequestOrResponse::Response(resp)),
+            RequestOrResponse::Request(req) => self.route_write_http_request(req).await,
+        }
+    }
+
     /// Max request size.
     fn max_request_size(&self) -> usize;
 
     /// Line protocol metrics.
     fn lp_metrics(&self) -> Arc<LineProtocolMetrics>;
 
-    /// Perform write.
+    /// Perform DML operation.
     async fn write(
         &self,
         db_name: &DatabaseName<'_>,
-        write: DmlWrite,
-    ) -> Result<(), InnerWriteError>;
+        op: DmlOperation,
+    ) -> Result<(), InnerDmlError>;
 }
 
 #[derive(Debug, Deserialize)]
-/// Body of the request to the /write endpoint
+/// Body of the request to the dml endpoints
 pub struct WriteInfo {
     pub org: String,
     pub bucket: String,
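The new `route_dml_http_request` threads the request through each candidate route, short-circuiting on the first one that produces a response. A standalone sketch of this request-or-response chaining, with plain `String`s standing in for `hyper::Request`/`hyper::Response`:

// Sketch: chain routers that either consume a request (returning a response)
// or hand it back untouched for the next router to try.
enum RequestOrResponse {
    Request(String),  // stand-in for hyper::Request<Body>
    Response(String), // stand-in for hyper::Response<Body>
}

fn route_delete(req: String) -> RequestOrResponse {
    if req.starts_with("POST /api/v2/delete") {
        RequestOrResponse::Response("204 No Content".to_string())
    } else {
        RequestOrResponse::Request(req) // no match: give the request back
    }
}

fn route_write(req: String) -> RequestOrResponse {
    if req.starts_with("POST /api/v2/write") {
        RequestOrResponse::Response("204 No Content".to_string())
    } else {
        RequestOrResponse::Request(req)
    }
}

fn route_dml(req: String) -> RequestOrResponse {
    match route_delete(req) {
        RequestOrResponse::Response(resp) => RequestOrResponse::Response(resp),
        RequestOrResponse::Request(req) => route_write(req),
    }
}

fn main() {
    match route_dml("POST /api/v2/write?org=a&bucket=b".to_string()) {
        RequestOrResponse::Response(resp) => println!("handled: {}", resp),
        RequestOrResponse::Request(req) => println!("unhandled: {}", req),
    }
}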
@@ -513,6 +672,94 @@ pub mod test_utils {
         }
     }
 
+    /// Assert that deleting from an unknown database/router returns the expected message and error code.
+    pub async fn assert_delete_unknown_database<T>(test_server: TestServer<T>)
+    where
+        T: ServerType,
+    {
+        // delete from non-existing table
+        let client = Client::new();
+        let delete_line = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement=not_a_table and location=Boston"}"#;
+        let bucket_name = "MyBucket";
+        let org_name = "NotMyOrg";
+        let response = client
+            .post(&format!(
+                "{}/api/v2/delete?bucket={}&org={}",
+                test_server.url(),
+                bucket_name,
+                org_name
+            ))
+            .body(delete_line)
+            .send()
+            .await;
+        check_response(
+            "delete",
+            response,
+            StatusCode::NOT_FOUND,
+            Some("Database NotMyOrg_MyBucket not found"),
+        )
+        .await;
+    }
+
+    /// Assert that deleting from an unknown table returns the expected message and error code.
+    pub async fn assert_delete_unknown_table<T>(test_server: TestServer<T>)
+    where
+        T: ServerType,
+    {
+        // delete from non-existing table
+        let client = Client::new();
+        let delete_line = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement=not_a_table and location=Boston"}"#;
+        let bucket_name = "MyBucket";
+        let org_name = "MyOrg";
+        let response = client
+            .post(&format!(
+                "{}/api/v2/delete?bucket={}&org={}",
+                test_server.url(),
+                bucket_name,
+                org_name
+            ))
+            .body(delete_line)
+            .send()
+            .await;
+        check_response(
+            "delete",
+            response,
+            StatusCode::NOT_FOUND,
+            Some("Table MyOrg_MyBucket:not_a_table not found"),
+        )
+        .await;
+    }
+
+    /// Assert that deleting with a malformed body returns the expected message and error code.
+    pub async fn assert_delete_bad_request<T>(test_server: TestServer<T>)
+    where
+        T: ServerType,
+    {
+        // Not able to parse _measurement="not_a_table" (it must be _measurement=\"not_a_table\" to work)
+        let client = Client::new();
+        let delete_line = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement="not_a_table" and location=Boston"}"#;
+        let bucket_name = "MyBucket";
+        let org_name = "MyOrg";
+        let response = client
+            .post(&format!(
+                "{}/api/v2/delete?bucket={}&org={}",
+                test_server.url(),
+                bucket_name,
+                org_name
+            ))
+            .body(delete_line)
+            .send()
+            .await;
+        check_response(
+            "delete",
+            response,
+            StatusCode::BAD_REQUEST,
+            Some("Unable to parse delete string"),
+        )
+        .await;
+    }
+
     /// GZIP the given string.
     fn gzip_str(s: &str) -> Vec<u8> {
         use flate2::{write::GzEncoder, Compression};
         use std::io::Write;
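For reference, the JSON body these helpers POST to /api/v2/delete has the start/stop/predicate shape seen in `delete_line` above. A rough sketch of deserializing that shape with serde — the `DeleteBody` struct name is hypothetical, and this assumes the serde (with derive) and serde_json crates:

use serde::Deserialize;

// Illustrative shape of the /api/v2/delete body exercised by the tests above.
#[derive(Debug, Deserialize)]
struct DeleteBody {
    start: String,     // RFC 3339 timestamp, e.g. "2021-04-01T14:00:00Z"
    stop: String,      // RFC 3339 timestamp
    predicate: String, // e.g. "_measurement=not_a_table and location=Boston"
}

fn main() -> Result<(), serde_json::Error> {
    let body = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement=not_a_table and location=Boston"}"#;
    let parsed: DeleteBody = serde_json::from_str(body)?;
    println!("{:?}", parsed);
    Ok(())
}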
@@ -23,10 +23,10 @@ mod heappy;
 #[cfg(feature = "pprof")]
 mod pprof;
 
+pub mod dml;
 pub mod error;
 pub mod metrics;
 pub mod utils;
-pub mod write;
 
 #[cfg(test)]
 pub mod test_utils;
@@ -11,33 +11,28 @@
 //! database names and may remove this quasi /v2 API.
 
 // Influx crates
-use data_types::{
-    names::{org_and_bucket_to_database, OrgBucketMappingError},
-    DatabaseName,
-};
+use data_types::{names::OrgBucketMappingError, DatabaseName};
 use influxdb_iox_client::format::QueryOutputFormat;
-use predicate::delete_predicate::{parse_delete_predicate, parse_http_delete_request};
 use query::exec::ExecutionContextProvider;
 use server::{connection::ConnectionManager, Error};
 
 // External crates
 use async_trait::async_trait;
 use http::header::CONTENT_TYPE;
-use hyper::{Body, Method, Request, Response, StatusCode};
+use hyper::{Body, Method, Request, Response};
 use observability_deps::tracing::{debug, error};
 use serde::Deserialize;
 use snafu::{OptionExt, ResultExt, Snafu};
 
 use crate::influxdb_ioxd::{
     http::{
+        dml::{HttpDrivenDml, InnerDmlError, RequestOrResponse},
         error::{HttpApiError, HttpApiErrorExt, HttpApiErrorSource},
         metrics::LineProtocolMetrics,
-        utils::parse_body,
-        write::{HttpDrivenWrite, InnerWriteError, RequestOrResponse, WriteInfo},
     },
     planner::Planner,
 };
-use dml::DmlWrite;
+use dml::DmlOperation;
 use std::{
     fmt::Debug,
     str::{self, FromStr},
@@ -69,27 +64,6 @@ pub enum ApplicationError {
         source: serde_urlencoded::de::Error,
     },
 
-    #[snafu(display("Error reading request body as utf8: {}", source))]
-    ReadingBodyAsUtf8 { source: std::str::Utf8Error },
-
-    #[snafu(display("Error parsing delete {}: {}", input, source))]
-    ParsingDelete {
-        source: predicate::delete_predicate::Error,
-        input: String,
-    },
-
-    #[snafu(display("Error building delete predicate {}: {}", input, source))]
-    BuildingDeletePredicate {
-        source: predicate::delete_predicate::Error,
-        input: String,
-    },
-
-    #[snafu(display("Error executing delete {}: {}", input, source))]
-    ExecutingDelete {
-        source: server::db::Error,
-        input: String,
-    },
-
     #[snafu(display("No handler for {:?} {}", method, path))]
     RouteNotFound { method: Method, path: String },
@@ -139,14 +113,9 @@ pub enum ApplicationError {
     #[snafu(display("Internal server error"))]
     InternalServerError,
 
-    #[snafu(display("Cannot parse body: {}", source))]
-    ParseBody {
-        source: crate::influxdb_ioxd::http::utils::ParseBodyError,
-    },
-
-    #[snafu(display("Cannot write data: {}", source))]
-    WriteError {
-        source: crate::influxdb_ioxd::http::write::HttpWriteError,
+    #[snafu(display("Cannot perform DML operation: {}", source))]
+    DmlError {
+        source: crate::influxdb_ioxd::http::dml::HttpDmlError,
     },
 }
@@ -159,10 +128,6 @@ impl HttpApiErrorSource for ApplicationError {
             e @ Self::Query { .. } => e.internal_error(),
             e @ Self::ExpectedQueryString { .. } => e.invalid(),
             e @ Self::InvalidQueryString { .. } => e.invalid(),
-            e @ Self::ReadingBodyAsUtf8 { .. } => e.invalid(),
-            e @ Self::ParsingDelete { .. } => e.invalid(),
-            e @ Self::BuildingDeletePredicate { .. } => e.invalid(),
-            e @ Self::ExecutingDelete { .. } => e.invalid(),
             e @ Self::RouteNotFound { .. } => e.not_found(),
             e @ Self::DatabaseNameError { .. } => e.invalid(),
             e @ Self::DatabaseNotFound { .. } => e.not_found(),
@@ -174,8 +139,7 @@ impl HttpApiErrorSource for ApplicationError {
             e @ Self::ServerNotInitialized => e.invalid(),
             e @ Self::DatabaseNotInitialized { .. } => e.invalid(),
             e @ Self::InternalServerError => e.internal_error(),
-            Self::ParseBody { source } => source.to_http_api_error(),
-            Self::WriteError { source } => source.to_http_api_error(),
+            Self::DmlError { source } => source.to_http_api_error(),
         }
     }
 }
@@ -198,7 +162,7 @@ impl From<server::Error> for ApplicationError {
 }
 
 #[async_trait]
-impl<M> HttpDrivenWrite for DatabaseServerType<M>
+impl<M> HttpDrivenDml for DatabaseServerType<M>
 where
     M: ConnectionManager + Send + Sync + Debug + 'static,
 {
@@ -213,19 +177,47 @@ where
     async fn write(
         &self,
         db_name: &DatabaseName<'_>,
-        write: DmlWrite,
-    ) -> Result<(), InnerWriteError> {
-        self.server
-            .write(db_name, write)
-            .await
-            .map_err(|e| match e {
-                server::Error::DatabaseNotFound { .. } => InnerWriteError::NotFound {
-                    db_name: db_name.to_string(),
-                },
-                e => InnerWriteError::OtherError {
-                    source: Box::new(e),
-                },
-            })
+        op: DmlOperation,
+    ) -> Result<(), InnerDmlError> {
+        match op {
+            DmlOperation::Write(write) => {
+                self.server
+                    .write(db_name, write)
+                    .await
+                    .map_err(|e| match e {
+                        server::Error::DatabaseNotFound { .. } => InnerDmlError::DatabaseNotFound {
+                            db_name: db_name.to_string(),
+                        },
+                        e => InnerDmlError::InternalError {
+                            db_name: db_name.to_string(),
+                            source: Box::new(e),
+                        },
+                    })
+            }
+            DmlOperation::Delete(delete) => {
+                let db = self
+                    .server
+                    .db(db_name)
+                    .map_err(|_| InnerDmlError::DatabaseNotFound {
+                        db_name: db_name.to_string(),
+                    })?;
+
+                db.store_delete(&delete).map_err(|e| match e {
+                    server::db::Error::DeleteFromTable { table_name, .. } => {
+                        InnerDmlError::TableNotFound {
+                            db_name: db_name.to_string(),
+                            table_name,
+                        }
+                    }
+                    e => InnerDmlError::InternalError {
+                        db_name: db_name.to_string(),
+                        source: Box::new(e),
+                    },
+                })?;
+
+                Ok(())
+            }
+        }
     }
 }
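After this change both server types accept a single `DmlOperation`, so writes and deletes flow through one entry point. A reduced sketch of that dispatch shape — these stand-in structs are far simpler than the real `dml` crate types, which carry tables, predicates, and `DmlMeta`:

// Simplified stand-ins for dml::DmlWrite / dml::DmlDelete.
struct DmlWrite { line_protocol: String }
struct DmlDelete { predicate: String, table: Option<String> }

// One entry point for all DML, mirroring the PR's `write(db, op)` signature.
enum DmlOperation {
    Write(DmlWrite),
    Delete(DmlDelete),
}

fn apply(op: DmlOperation) {
    match op {
        DmlOperation::Write(w) => println!("write {} bytes", w.line_protocol.len()),
        DmlOperation::Delete(d) => {
            println!("delete where {} (table: {:?})", d.predicate, d.table)
        }
    }
}

fn main() {
    apply(DmlOperation::Write(DmlWrite { line_protocol: "cpu val=1".into() }));
    apply(DmlOperation::Delete(DmlDelete {
        predicate: "foo=1".into(),
        table: None, // None = all tables
    }));
}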
@@ -237,9 +229,9 @@ where
     M: ConnectionManager + Send + Sync + Debug + 'static,
 {
     match server_type
-        .route_write_http_request(req)
+        .route_dml_http_request(req)
         .await
-        .context(WriteError)?
+        .context(DmlError)?
     {
         RequestOrResponse::Response(resp) => Ok(resp),
         RequestOrResponse::Request(req) => {
@@ -247,7 +239,6 @@ where
             let uri = req.uri().clone();
 
             match (method.clone(), uri.path()) {
-                (Method::POST, "/api/v2/delete") => delete(req, server_type).await,
                 (Method::GET, "/api/v3/query") => query(req, server_type).await,
 
                 (method, path) => Err(ApplicationError::RouteNotFound {
@@ -259,73 +250,6 @@ where
         }
     }
 }
 
-async fn delete<M>(
-    req: Request<Body>,
-    server_type: &DatabaseServerType<M>,
-) -> Result<Response<Body>, ApplicationError>
-where
-    M: ConnectionManager + Send + Sync + Debug + 'static,
-{
-    let DatabaseServerType {
-        server,
-        max_request_size,
-        ..
-    } = server_type;
-    let max_request_size = *max_request_size;
-    let server = Arc::clone(server);
-
-    // Extract the DB name from the request
-    // db_name = orrID_bucketID
-    let query = req.uri().query().context(ExpectedQueryString)?;
-    let delete_info: WriteInfo = serde_urlencoded::from_str(query).context(InvalidQueryString {
-        query_string: String::from(query),
-    })?;
-    let db_name = org_and_bucket_to_database(&delete_info.org, &delete_info.bucket)
-        .context(BucketMappingError)?;
-
-    // Parse body
-    let body = parse_body(req, max_request_size).await.context(ParseBody)?;
-    let body = str::from_utf8(&body).context(ReadingBodyAsUtf8)?;
-
-    // Parse and extract table name (which can be empty), start, stop, and predicate
-    let parsed_delete = parse_http_delete_request(body).context(ParsingDelete { input: body })?;
-
-    let table_name = parsed_delete.table_name;
-    let predicate = parsed_delete.predicate;
-    let start = parsed_delete.start_time;
-    let stop = parsed_delete.stop_time;
-    debug!(%table_name, %predicate, %start, %stop, body_size=body.len(), %db_name, org=%delete_info.org, bucket=%delete_info.bucket, "delete data from database");
-
-    // Validate that the database name is legit
-    let db = server.db(&db_name)?;
-
-    // Build delete predicate
-    let del_predicate = parse_delete_predicate(&start, &stop, &predicate)
-        .context(BuildingDeletePredicate { input: body })?;
-
-    // Tables data will be deleted from
-    // Note for developer: this the only place we support INFLUX DELETE that deletes
-    // data from many tables in one command. If you want to use general delete API to
-    // delete data from a specified table, use the one in the management API (src/influxdb_ioxd/rpc/management.rs) instead
-    let mut tables = vec![];
-    if table_name.is_empty() {
-        tables = db.table_names();
-    } else {
-        tables.push(table_name);
-    }
-
-    // Execute delete
-    for table in tables {
-        db.delete(&table, Arc::new(del_predicate.clone()))
-            .context(ExecutingDelete { input: body })?;
-    }
-
-    Ok(Response::builder()
-        .status(StatusCode::NO_CONTENT)
-        .body(Body::empty())
-        .unwrap())
-}
-
 #[derive(Deserialize, Debug, PartialEq)]
 /// Parsed URI Parameters of the request to the .../query endpoint
 struct QueryParams {
@@ -390,14 +314,15 @@ async fn query<M: ConnectionManager + Send + Sync + Debug + 'static>(
 mod tests {
     use crate::influxdb_ioxd::{
         http::{
+            dml::test_utils::{
+                assert_delete_bad_request, assert_delete_unknown_database,
+                assert_delete_unknown_table, assert_gzip_write, assert_write, assert_write_metrics,
+                assert_write_to_invalid_database,
+            },
             test_utils::{
                 assert_health, assert_metrics, assert_tracing, check_response, get_content_type,
                 TestServer,
             },
-            write::test_utils::{
-                assert_gzip_write, assert_write, assert_write_metrics,
-                assert_write_to_invalid_database,
-            },
         },
         server_type::common_state::CommonServerState,
     };
@@ -407,6 +332,8 @@ mod tests {
 
     use arrow::record_batch::RecordBatch;
     use arrow_util::assert_batches_eq;
+    use dml::DmlWrite;
+    use http::StatusCode;
     use reqwest::Client;
 
     use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName};
@@ -581,48 +508,21 @@ mod tests {
             "+----------------+--------------+-------+-----------------+----------------------+",
         ];
         assert_batches_eq!(expected, &batches);
     }
 
-        // -------------------
-        // negative tests
-        // Not able to parse _measurement="not_a_table" (it must be _measurement=\"not_a_table\" to work)
-        let delete_line = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement="not_a_table" and location=Boston"}"#;
-        let response = client
-            .post(&format!(
-                "{}/api/v2/delete?bucket={}&org={}",
-                test_server.url(),
-                bucket_name,
-                org_name
-            ))
-            .body(delete_line)
-            .send()
-            .await;
-        check_response(
-            "delete",
-            response,
-            StatusCode::BAD_REQUEST,
-            Some("Unable to parse delete string"),
-        )
-        .await;
+    #[tokio::test]
+    async fn test_delete_unknown_database() {
+        assert_delete_unknown_database(setup_server().await).await;
+    }
 
-        // delete from non-existing table
-        let delete_line = r#"{"start":"2021-04-01T14:00:00Z","stop":"2021-04-02T14:00:00Z", "predicate":"_measurement=not_a_table and location=Boston"}"#;
-        let response = client
-            .post(&format!(
-                "{}/api/v2/delete?bucket={}&org={}",
-                test_server.url(),
-                bucket_name,
-                org_name
-            ))
-            .body(delete_line)
-            .send()
-            .await;
-        check_response(
-            "delete",
-            response,
-            StatusCode::BAD_REQUEST,
-            Some("Cannot delete data from non-existing table"),
-        )
-        .await;
+    #[tokio::test]
+    async fn test_delete_unknown_table() {
+        assert_delete_unknown_table(setup_server().await).await;
+    }
+
+    #[tokio::test]
+    async fn test_delete_bad_request() {
+        assert_delete_bad_request(setup_server().await).await;
+    }
 
     /// Sets up a test database with some data for testing the query endpoint
@@ -18,7 +18,7 @@ use generated_types::{
     ReadWindowAggregateRequest, StringValuesResponse, TagKeyMetaNames, TagKeysRequest,
     TagValuesRequest, TimestampRange,
 };
-use observability_deps::tracing::{error, info};
+use observability_deps::tracing::{error, info, trace};
 use predicate::predicate::PredicateBuilder;
 use query::exec::{
     fieldlist::FieldList, seriesset::converter::Error as SeriesSetError, ExecutionContextProvider,
@@ -735,6 +735,7 @@ where
         .map(|name| name.bytes().collect())
         .collect();
 
+    trace!(measurement_names=?values.iter().map(|k| String::from_utf8_lossy(k)).collect::<Vec<_>>(), "Measurement names response");
     Ok(StringValuesResponse { values })
 }
@@ -786,10 +787,8 @@ where
 
     // Map the resulting collection of Strings into a Vec<Vec<u8>>for return
     let values = tag_keys_to_byte_vecs(tag_keys);
-    // Debugging help: uncomment this out to see what is coming back
-    // info!("Returning tag keys");
-    // values.iter().for_each(|k| info!("  {}", String::from_utf8_lossy(k)));
 
+    trace!(tag_keys=?values.iter().map(|k| String::from_utf8_lossy(k)).collect::<Vec<_>>(), "Tag keys response");
     Ok(StringValuesResponse { values })
 }
@@ -842,10 +841,7 @@ where
         .map(|name| name.bytes().collect())
         .collect();
 
-    // Debugging help: uncomment to see raw values coming back
-    //info!("Returning tag values");
-    //values.iter().for_each(|k| info!("  {}", String::from_utf8_lossy(k)));
-
+    trace!(tag_values=?values.iter().map(|k| String::from_utf8_lossy(k)).collect::<Vec<_>>(), "Tag values response");
     Ok(StringValuesResponse { values })
 }
@@ -1004,6 +1000,7 @@ where
         .map_err(|e| Box::new(e) as _)
         .context(ListingFields { db_name })?;
 
+    trace!(field_names=?field_list, "Field names response");
    Ok(field_list)
 }
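These hunks replace permanently commented-out `info!` debugging with structured `trace!` events whose field expressions are evaluated only when trace-level logging is enabled. A minimal sketch of the same pattern using the `tracing` crate directly (observability_deps re-exports it; the `tracing_subscriber` setup here is an assumption added so the example runs standalone):

use tracing::trace;

fn main() {
    // Assumed setup so the trace event is visible when run standalone.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .init();

    let values: Vec<Vec<u8>> = vec![b"cpu".to_vec(), b"mem".to_vec()];

    // Render the byte vectors lossily for humans; the `?field` expression is
    // only evaluated when the TRACE level is enabled at this callsite.
    trace!(tag_keys=?values.iter().map(|k| String::from_utf8_lossy(k)).collect::<Vec<_>>(), "Tag keys response");
}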
@@ -2,14 +2,14 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use data_types::DatabaseName;
-use dml::{DmlOperation, DmlWrite};
+use dml::DmlOperation;
 use hyper::{Body, Method, Request, Response};
 use snafu::{ResultExt, Snafu};
 
 use crate::influxdb_ioxd::http::{
+    dml::{HttpDrivenDml, InnerDmlError, RequestOrResponse},
     error::{HttpApiError, HttpApiErrorExt, HttpApiErrorSource},
     metrics::LineProtocolMetrics,
-    write::{HttpDrivenWrite, InnerWriteError, RequestOrResponse},
 };
 
 use super::RouterServerType;
@@ -21,7 +21,7 @@ pub enum ApplicationError {
 
     #[snafu(display("Cannot write data: {}", source))]
     WriteError {
-        source: crate::influxdb_ioxd::http::write::HttpWriteError,
+        source: crate::influxdb_ioxd::http::dml::HttpDmlError,
     },
 }
@@ -35,7 +35,7 @@ impl HttpApiErrorSource for ApplicationError {
 }
 
 #[async_trait]
-impl HttpDrivenWrite for RouterServerType {
+impl HttpDrivenDml for RouterServerType {
     fn max_request_size(&self) -> usize {
         self.max_request_size
     }
@@ -47,15 +47,17 @@ impl HttpDrivenWrite for RouterServerType {
     async fn write(
         &self,
         db_name: &DatabaseName<'_>,
-        write: DmlWrite,
-    ) -> Result<(), InnerWriteError> {
+        op: DmlOperation,
+    ) -> Result<(), InnerDmlError> {
         match self.server.router(db_name) {
-            Some(router) => router.write(DmlOperation::Write(write)).await.map_err(|e| {
-                InnerWriteError::OtherError {
-                    source: Box::new(e),
-                }
-            }),
-            None => Err(InnerWriteError::NotFound {
+            Some(router) => router
+                .write(op)
+                .await
+                .map_err(|e| InnerDmlError::InternalError {
+                    db_name: db_name.to_string(),
+                    source: Box::new(e),
+                }),
+            None => Err(InnerDmlError::DatabaseNotFound {
                 db_name: db_name.to_string(),
             }),
         }
@@ -68,7 +70,7 @@ pub async fn route_request(
     req: Request<Body>,
 ) -> Result<Response<Body>, ApplicationError> {
     match server_type
-        .route_write_http_request(req)
+        .route_dml_http_request(req)
         .await
         .context(WriteError)?
     {
@@ -84,18 +86,26 @@ mod tests {
 mod tests {
     use std::{collections::BTreeMap, sync::Arc};
 
-    use data_types::server_id::ServerId;
-    use dml::DmlOperation;
+    use data_types::{
+        delete_predicate::{DeleteExpr, DeletePredicate},
+        server_id::ServerId,
+        timestamp::TimestampRange,
+    };
+    use dml::{DmlDelete, DmlMeta, DmlOperation};
+    use http::StatusCode;
+    use reqwest::Client;
     use router::{grpc_client::MockClient, resolver::RemoteTemplate, server::RouterServer};
     use time::SystemProvider;
     use trace::RingBufferTraceCollector;
 
     use crate::influxdb_ioxd::{
         http::{
-            test_utils::{assert_health, assert_metrics, assert_tracing, TestServer},
-            write::test_utils::{
-                assert_gzip_write, assert_write, assert_write_metrics,
-                assert_write_to_invalid_database,
+            dml::test_utils::{
+                assert_delete_bad_request, assert_delete_unknown_database, assert_gzip_write,
+                assert_write, assert_write_metrics, assert_write_to_invalid_database,
             },
+            test_utils::{
+                assert_health, assert_metrics, assert_tracing, check_response, TestServer,
+            },
         },
         server_type::common_state::CommonServerState,
@@ -142,6 +152,52 @@ mod tests {
         assert_write_to_invalid_database(test_server().await).await;
     }
 
+    #[tokio::test]
+    async fn test_delete() {
+        // Set up server
+        let test_server = test_server().await;
+
+        // Set up client
+        let client = Client::new();
+        let bucket_name = "MyBucket";
+        let org_name = "MyOrg";
+
+        // Client requests to delete data
+        let delete_line = r#"{"start":"1","stop":"2", "predicate":"foo=1"}"#;
+        let response = client
+            .post(&format!(
+                "{}/api/v2/delete?bucket={}&org={}",
+                test_server.url(),
+                bucket_name,
+                org_name
+            ))
+            .body(delete_line)
+            .send()
+            .await;
+        check_response("delete", response, StatusCode::NO_CONTENT, Some("")).await;
+
+        let predicate = DeletePredicate {
+            range: TimestampRange { start: 1, end: 2 },
+            exprs: vec![DeleteExpr {
+                column: String::from("foo"),
+                op: data_types::delete_predicate::Op::Eq,
+                scalar: data_types::delete_predicate::Scalar::I64(1),
+            }],
+        };
+        let delete = DmlDelete::new(predicate, None, DmlMeta::unsequenced(None));
+        assert_dbwrite(test_server, DmlOperation::Delete(delete)).await;
+    }
+
+    #[tokio::test]
+    async fn test_delete_unknown_database() {
+        assert_delete_unknown_database(test_server().await).await;
+    }
+
+    #[tokio::test]
+    async fn test_delete_bad_request() {
+        assert_delete_bad_request(test_server().await).await;
+    }
+
     async fn test_server() -> TestServer<RouterServerType> {
         use data_types::router::{
             Matcher, MatcherToShard, Router, ShardConfig, ShardId, WriteSink, WriteSinkSet,
@@ -22,6 +22,7 @@ prost = "0.8"
 rdkafka = "0.27.0"
 time = { path = "../time" }
 tokio = { version = "1.13", features = ["macros", "fs"] }
+tokio-util = "0.6.9"
 trace = { path = "../trace" }
 trace_http = { path = "../trace_http" }
 uuid = { version = "0.8", features = ["serde", "v4"] }
@@ -285,7 +285,9 @@ pub mod test_utils {
         let (_sequencer_id, mut stream) = map_pop_first(&mut streams).unwrap();
         assert_write_op_eq(&stream.stream.next().await.unwrap().unwrap(), &w1);
 
-        // re-creating stream after reading remembers offset
+        // re-creating stream after reading remembers offset, but wait a bit to provoke the stream to buffer some
+        // entries
+        tokio::time::sleep(Duration::from_millis(10)).await;
         drop(stream);
         drop(streams);
         let mut streams = reader.streams();
@@ -123,10 +123,10 @@ use crate::codec::{ContentType, IoxHeaders};
 use async_trait::async_trait;
 use data_types::{sequence::Sequence, write_buffer::WriteBufferCreationConfig};
 use dml::{DmlMeta, DmlOperation};
-use futures::{channel::mpsc::Receiver, FutureExt, SinkExt, Stream, StreamExt};
-use pin_project::{pin_project, pinned_drop};
+use futures::{FutureExt, Stream, StreamExt};
+use pin_project::pin_project;
 use time::{Time, TimeProvider};
-use tokio::task::JoinHandle;
+use tokio_util::sync::ReusableBoxFuture;
 use trace::TraceCollector;
 use uuid::Uuid;
@@ -343,11 +343,13 @@ impl WriteBufferReading for FileBufferConsumer {
     }
 }
 
-#[pin_project(PinnedDrop)]
+#[pin_project]
 struct ConsumerStream {
-    join_handle: JoinHandle<()>,
-    #[pin]
-    rx: Receiver<Result<DmlOperation, WriteBufferError>>,
+    fut: ReusableBoxFuture<Result<DmlOperation, WriteBufferError>>,
+    sequencer_id: u32,
+    path: PathBuf,
+    next_sequence_number: Arc<AtomicU64>,
+    trace_collector: Option<Arc<dyn TraceCollector>>,
 }
 
 impl ConsumerStream {
@@ -357,49 +359,71 @@ impl ConsumerStream {
         next_sequence_number: Arc<AtomicU64>,
         trace_collector: Option<Arc<dyn TraceCollector>>,
     ) -> Self {
-        let (mut tx, rx) = futures::channel::mpsc::channel(1);
+        Self {
+            fut: ReusableBoxFuture::new(Self::poll_next_inner(
+                sequencer_id,
+                path.clone(),
+                Arc::clone(&next_sequence_number),
+                trace_collector.clone(),
+            )),
+            sequencer_id,
+            path,
+            next_sequence_number,
+            trace_collector,
+        }
+    }
 
-        let join_handle = tokio::spawn(async move {
-            loop {
-                let sequence_number = next_sequence_number.load(Ordering::SeqCst);
+    async fn poll_next_inner(
+        sequencer_id: u32,
+        path: PathBuf,
+        next_sequence_number: Arc<AtomicU64>,
+        trace_collector: Option<Arc<dyn TraceCollector>>,
+    ) -> Result<DmlOperation, WriteBufferError> {
+        loop {
+            let sequence_number = next_sequence_number.load(Ordering::SeqCst);
 
-                // read file
-                let file_path = path.join(sequence_number.to_string());
-                let msg = match tokio::fs::read(&file_path).await {
-                    Ok(data) => {
-                        // decode file
-                        let sequence = Sequence {
-                            id: sequencer_id,
-                            number: sequence_number,
-                        };
-                        match Self::decode_file(data, sequence, trace_collector.clone()) {
-                            Ok(write) => {
-                                match next_sequence_number.compare_exchange(
-                                    sequence_number,
-                                    sequence_number + 1,
-                                    Ordering::SeqCst,
-                                    Ordering::SeqCst,
-                                ) {
-                                    Ok(_) => {
-                                        // can send to output
-                                        Ok(write)
-                                    }
-                                    Err(_) => {
-                                        // interleaving change, retry
-                                        continue;
-                                    }
-                                }
-                            }
-                            Err(e) => Err(e),
-                        }
-                    }
-                    Err(error) => {
-                        match error.kind() {
-                            std::io::ErrorKind::NotFound => {
-                                // figure out watermark and see if there's a gap in the stream
-                                if let Ok(watermark) = watermark(&path).await {
-                                    // watermark is "last sequence number + 1", so substract 1 before comparing
-                                    if watermark.saturating_sub(1) > sequence_number {
+            // read file
+            let file_path = path.join(sequence_number.to_string());
+            let msg = match tokio::fs::read(&file_path).await {
+                Ok(data) => {
+                    // decode file
+                    let sequence = Sequence {
+                        id: sequencer_id,
+                        number: sequence_number,
+                    };
+                    match Self::decode_file(data, sequence, trace_collector.clone()) {
+                        Ok(write) => {
+                            match next_sequence_number.compare_exchange(
+                                sequence_number,
+                                sequence_number + 1,
+                                Ordering::SeqCst,
+                                Ordering::SeqCst,
+                            ) {
+                                Ok(_) => {
+                                    // can send to output
+                                    Ok(write)
+                                }
+                                Err(_) => {
+                                    // interleaving change, retry
+                                    continue;
+                                }
+                            }
+                        }
+                        Err(e) => Err(e),
+                    }
+                }
+                Err(error) => {
+                    match error.kind() {
+                        std::io::ErrorKind::NotFound => {
+                            // figure out watermark and see if there's a gap in the stream
+                            if let Ok(watermark) = watermark(&path).await {
+                                // watermark is "last sequence number + 1", so substract 1 before comparing
+                                if watermark.saturating_sub(1) > sequence_number {
+                                    // while generating the watermark, a writer might have created the file that we've
+                                    // tried to read, so we need to double-check
+                                    if let Err(std::io::ErrorKind::NotFound) =
+                                        tokio::fs::metadata(&file_path).await.map_err(|e| e.kind())
+                                    {
+                                        // update position
+                                        // failures are OK here since we'll re-read this value next round
+                                        next_sequence_number
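The claim step above relies on `compare_exchange` so that concurrently polled streams never emit the same file twice: a stream only advances the shared counter if nobody else got there first, and retries otherwise. The bare pattern:

use std::sync::atomic::{AtomicU64, Ordering};

// Claim-and-advance: succeed only if nobody else bumped the counter first.
fn try_claim(next: &AtomicU64) -> Option<u64> {
    let current = next.load(Ordering::SeqCst);
    next.compare_exchange(current, current + 1, Ordering::SeqCst, Ordering::SeqCst)
        .ok()
        .map(|_| current) // on Err: an interleaving claim happened; caller retries
}

fn main() {
    let next = AtomicU64::new(0);
    assert_eq!(try_claim(&next), Some(0));
    assert_eq!(try_claim(&next), Some(1));
    assert_eq!(next.load(Ordering::SeqCst), 2);
}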
@@ -412,28 +436,23 @@ impl ConsumerStream {
                                             .unwrap(),
                                         continue;
                                     }
                                 };
                             }
                         };
 
-                        // no gap detected, just wait a bit for new data
-                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-                        continue;
-                    }
-                    _ => {
-                        // cannot read file => communicate to user
-                        Err(Box::new(error) as WriteBufferError)
-                    }
+                    // no gap detected, just wait a bit for new data
+                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                    continue;
+                }
+                _ => {
+                    // cannot read file => communicate to user
+                    Err(Box::new(error) as WriteBufferError)
+                }
                 }
             };
 
-                if tx.send(msg).await.is_err() {
-                    // Receiver is gone
-                    return;
-                }
-            }
-        });
-
-        Self { join_handle, rx }
+            return msg;
+        }
     }
 
     fn decode_file(
@@ -476,13 +495,6 @@ impl ConsumerStream {
     }
 }
 
-#[pinned_drop]
-impl PinnedDrop for ConsumerStream {
-    fn drop(self: Pin<&mut Self>) {
-        self.join_handle.abort();
-    }
-}
-
 impl Stream for ConsumerStream {
     type Item = Result<DmlOperation, WriteBufferError>;
@@ -491,7 +503,19 @@ impl Stream for ConsumerStream {
         cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Option<Self::Item>> {
         let this = self.project();
-        this.rx.poll_next(cx)
+
+        match this.fut.poll(cx) {
+            std::task::Poll::Ready(res) => {
+                this.fut.set(Self::poll_next_inner(
+                    *this.sequencer_id,
+                    this.path.clone(),
+                    Arc::clone(this.next_sequence_number),
+                    this.trace_collector.clone(),
+                ));
+                std::task::Poll::Ready(Some(res))
+            }
+            std::task::Poll::Pending => std::task::Poll::Pending,
+        }
     }
 }
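This refactor replaces the spawned-task-plus-channel consumer with a stream that owns a `ReusableBoxFuture` and re-arms it after every item, so dropping the stream simply drops the in-flight future and the `PinnedDrop`/`abort` cleanup above becomes unnecessary. A minimal sketch of a stream built the same way (assumes the tokio, tokio-util 0.6, and futures crates; `Counter`/`produce` are illustrative names):

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{Stream, StreamExt};
use tokio_util::sync::ReusableBoxFuture;

// Stand-in for "read file n, decode it, return the operation".
async fn produce(n: u64) -> u64 {
    tokio::task::yield_now().await;
    n
}

struct Counter {
    fut: ReusableBoxFuture<u64>, // tokio-util 0.6 signature (no lifetime param)
    next: u64,
}

impl Counter {
    fn new() -> Self {
        Self { fut: ReusableBoxFuture::new(produce(0)), next: 0 }
    }
}

impl Stream for Counter {
    type Item = u64;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut(); // Counter is Unpin
        match this.fut.poll(cx) {
            Poll::Ready(v) => {
                this.next = v + 1;
                this.fut.set(produce(this.next)); // re-arm for the next element
                Poll::Ready(Some(v))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

#[tokio::main]
async fn main() {
    let mut s = Counter::new();
    assert_eq!(s.next().await, Some(0));
    assert_eq!(s.next().await, Some(1));
}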