Merge branch 'main' into crepererum/issue2575

pull/24376/head
kodiakhq[bot] 2021-09-21 15:25:40 +00:00 committed by GitHub
commit 0a52822eab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 69 additions and 6 deletions

View File

@ -13,6 +13,10 @@ pub enum WriteError {
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
/// Server returned an invalid argument error
#[error("Invalid argument: {}: {}", .0.code(), .0.message())]
InvalidArgument(tonic::Status),
}
/// An IOx Write API client.
@ -70,7 +74,7 @@ impl Client {
.inner
.write(write::WriteRequest { db_name, lp_data })
.await
.map_err(WriteError::ServerError)?;
.map_err(Self::map_err)?;
Ok(response.into_inner().lines_written as usize)
}
@ -91,7 +95,7 @@ impl Client {
self.inner
.write_entry(write::WriteEntryRequest { db_name, entry })
.await
.map_err(WriteError::ServerError)?;
.map_err(Self::map_err)?;
Ok(())
}
@ -104,8 +108,15 @@ impl Client {
self.inner_pb
.write(write_request)
.await
.map_err(WriteError::ServerError)?;
.map_err(Self::map_err)?;
Ok(())
}
fn map_err(status: tonic::Status) -> WriteError {
match status.code() {
tonic::Code::InvalidArgument => WriteError::InvalidArgument(status),
_ => WriteError::ServerError(status),
}
}
}

View File

@ -117,6 +117,12 @@ pub enum WriteError {
#[snafu(display("Hard buffer size limit reached"))]
HardLimitReached {},
#[snafu(display(
"Storing sequenced entry failed with the following error(s), and possibly more: {}",
errors.iter().map(ToString::to_string).collect::<Vec<_>>().join(", ")
))]
StoreSequencedEntryFailures { errors: Vec<super::db::Error> },
}
type BackgroundWorkerFuture = Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>;
@ -576,6 +582,9 @@ impl Database {
// TODO: Pull write buffer producer out of Db
Error::WriteBufferWritingError { source } => WriteError::WriteBuffer { source },
Error::HardLimitReached {} => WriteError::HardLimitReached {},
Error::StoreSequencedEntryFailures { errors } => {
WriteError::StoreSequencedEntryFailures { errors }
}
e => e.into(),
}
})?;

View File

@ -201,6 +201,12 @@ pub enum Error {
#[snafu(display("hard buffer limit reached"))]
HardLimitReached {},
#[snafu(display(
"Storing sequenced entry failed with the following error(s), and possibly more: {}",
errors.iter().map(ToString::to_string).collect::<Vec<_>>().join(", ")
))]
StoreSequencedEntryFailures { errors: Vec<DatabaseError> },
#[snafu(display(
"Cannot write to database {}, it's configured to only read from the write buffer",
db_name
@ -915,6 +921,11 @@ where
source: Box::new(source),
},
WriteError::HardLimitReached { .. } => Error::HardLimitReached {},
WriteError::StoreSequencedEntryFailures { errors } => {
Error::StoreSequencedEntryFailures {
errors: errors.into_iter().map(|e| Box::new(e) as _).collect(),
}
}
})
}

View File

@ -67,6 +67,9 @@ pub fn default_server_error_handler(error: server::Error) -> tonic::Status {
Error::DatabaseInit { source } => {
tonic::Status::invalid_argument(format!("Cannot initialize database: {}", source))
}
e @ Error::StoreSequencedEntryFailures { .. } => {
tonic::Status::invalid_argument(e.to_string())
}
error => {
error!(?error, "Unexpected error");
InternalError {}.into()

View File

@ -59,7 +59,7 @@ async fn test_write() {
err.to_string(),
r#"Client specified an invalid argument: Violation for field "lp_data": Invalid Line Protocol: A generic parsing error occurred"#
);
assert!(matches!(dbg!(err), WriteError::ServerError(_)));
assert!(matches!(dbg!(err), WriteError::InvalidArgument(_)));
// ---- test non existent database ----
let err = write_client
@ -87,8 +87,11 @@ async fn test_write() {
}
assert!(maybe_err.is_some());
let err = maybe_err.unwrap();
let WriteError::ServerError(status) = dbg!(err);
assert_eq!(status.code(), tonic::Code::ResourceExhausted);
if let WriteError::ServerError(status) = dbg!(&err) {
assert_eq!(status.code(), tonic::Code::ResourceExhausted);
} else {
panic!("Expected ServerError, got {}", err);
}
// IMPORTANT: At this point, the database is flooded and pretty much
// useless. Don't append any tests after the "hard limit" test!
@ -699,3 +702,29 @@ async fn test_write_routed_no_shard() {
.to_string()
.contains("Table or CTE with name 'disk' not found\""));
}
#[tokio::test]
async fn test_write_schema_mismatch() {
// regression test for https://github.com/influxdata/influxdb_iox/issues/2538
let fixture = ServerFixture::create_shared().await;
let mut write_client = fixture.write_client();
let db_name = rand_name();
create_readable_database(&db_name, fixture.grpc_channel()).await;
write_client
.write(&db_name, "table field=1i 10")
.await
.expect("cannot write");
let err = write_client
.write(&db_name, "table field=1.1 10")
.await
.unwrap_err();
assert_contains!(err.to_string(), "Table batch has mismatching schema");
if let WriteError::InvalidArgument(status) = &err {
assert_eq!(status.code(), tonic::Code::InvalidArgument);
} else {
panic!("Expected InvalidArgument, got {}", err);
}
}