From fb7299a169c092ffcd6e4fd24a67d13d9c6e2037 Mon Sep 17 00:00:00 2001
From: Marco Neumann <marco@crepererum.net>
Date: Tue, 21 Sep 2021 15:08:37 +0000
Subject: [PATCH] fix: bubble up write errors (#2598)

Fixes #2538.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
---
 influxdb_iox_client/src/client/write.rs | 17 +++++++++---
 server/src/database.rs                  |  9 +++++++
 server/src/lib.rs                       | 11 ++++++++
 src/influxdb_ioxd/rpc/error.rs          |  3 +++
 tests/end_to_end_cases/write_api.rs     | 35 ++++++++++++++++++++++---
 5 files changed, 69 insertions(+), 6 deletions(-)

diff --git a/influxdb_iox_client/src/client/write.rs b/influxdb_iox_client/src/client/write.rs
index a912896df2..660fbc76be 100644
--- a/influxdb_iox_client/src/client/write.rs
+++ b/influxdb_iox_client/src/client/write.rs
@@ -13,6 +13,10 @@ pub enum WriteError {
     /// Client received an unexpected error from the server
     #[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
     ServerError(tonic::Status),
+
+    /// Server returned an invalid argument error
+    #[error("Invalid argument: {}: {}", .0.code(), .0.message())]
+    InvalidArgument(tonic::Status),
 }
 
 /// An IOx Write API client.
@@ -70,7 +74,7 @@ impl Client {
             .inner
             .write(write::WriteRequest { db_name, lp_data })
             .await
-            .map_err(WriteError::ServerError)?;
+            .map_err(Self::map_err)?;
 
         Ok(response.into_inner().lines_written as usize)
     }
@@ -91,7 +95,7 @@ impl Client {
         self.inner
             .write_entry(write::WriteEntryRequest { db_name, entry })
             .await
-            .map_err(WriteError::ServerError)?;
+            .map_err(Self::map_err)?;
 
         Ok(())
     }
@@ -104,8 +108,15 @@ impl Client {
         self.inner_pb
             .write(write_request)
             .await
-            .map_err(WriteError::ServerError)?;
+            .map_err(Self::map_err)?;
 
         Ok(())
     }
+
+    fn map_err(status: tonic::Status) -> WriteError {
+        match status.code() {
+            tonic::Code::InvalidArgument => WriteError::InvalidArgument(status),
+            _ => WriteError::ServerError(status),
+        }
+    }
 }
diff --git a/server/src/database.rs b/server/src/database.rs
index 9dc1891a25..b8603fec08 100644
--- a/server/src/database.rs
+++ b/server/src/database.rs
@@ -117,6 +117,12 @@ pub enum WriteError {
 
     #[snafu(display("Hard buffer size limit reached"))]
     HardLimitReached {},
+
+    #[snafu(display(
+        "Storing sequenced entry failed with the following error(s), and possibly more: {}",
+        errors.iter().map(ToString::to_string).collect::<Vec<_>>().join(", ")
+    ))]
+    StoreSequencedEntryFailures { errors: Vec<super::db::Error> },
 }
 
 type BackgroundWorkerFuture = Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>;
@@ -576,6 +582,9 @@ impl Database {
                 // TODO: Pull write buffer producer out of Db
                 Error::WriteBufferWritingError { source } => WriteError::WriteBuffer { source },
                 Error::HardLimitReached {} => WriteError::HardLimitReached {},
+                Error::StoreSequencedEntryFailures { errors } => {
+                    WriteError::StoreSequencedEntryFailures { errors }
+                }
                 e => e.into(),
             }
         })?;
diff --git a/server/src/lib.rs b/server/src/lib.rs
index 18e9ea19d1..7cc6c8e045 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -201,6 +201,12 @@ pub enum Error {
     #[snafu(display("hard buffer limit reached"))]
     HardLimitReached {},
 
+    #[snafu(display(
+        "Storing sequenced entry failed with the following error(s), and possibly more: {}",
+        errors.iter().map(ToString::to_string).collect::<Vec<_>>().join(", ")
+    ))]
+    StoreSequencedEntryFailures { errors: Vec<DatabaseError> },
+
     #[snafu(display(
         "Cannot write to database {}, it's configured to only read from the write buffer",
         db_name
@@ -915,6 +921,11 @@ where
                     source: Box::new(source),
                 },
                 WriteError::HardLimitReached { .. } => Error::HardLimitReached {},
+                WriteError::StoreSequencedEntryFailures { errors } => {
+                    Error::StoreSequencedEntryFailures {
+                        errors: errors.into_iter().map(|e| Box::new(e) as _).collect(),
+                    }
+                }
             })
     }
 
diff --git a/src/influxdb_ioxd/rpc/error.rs b/src/influxdb_ioxd/rpc/error.rs
index f21c41e3e6..442c2885fb 100644
--- a/src/influxdb_ioxd/rpc/error.rs
+++ b/src/influxdb_ioxd/rpc/error.rs
@@ -67,6 +67,9 @@ pub fn default_server_error_handler(error: server::Error) -> tonic::Status {
         Error::DatabaseInit { source } => {
             tonic::Status::invalid_argument(format!("Cannot initialize database: {}", source))
         }
+        e @ Error::StoreSequencedEntryFailures { .. } => {
+            tonic::Status::invalid_argument(e.to_string())
+        }
         error => {
             error!(?error, "Unexpected error");
             InternalError {}.into()
diff --git a/tests/end_to_end_cases/write_api.rs b/tests/end_to_end_cases/write_api.rs
index 1ea55b39bc..d2db41306f 100644
--- a/tests/end_to_end_cases/write_api.rs
+++ b/tests/end_to_end_cases/write_api.rs
@@ -59,7 +59,7 @@ async fn test_write() {
         err.to_string(),
         r#"Client specified an invalid argument: Violation for field "lp_data": Invalid Line Protocol: A generic parsing error occurred"#
     );
-    assert!(matches!(dbg!(err), WriteError::ServerError(_)));
+    assert!(matches!(dbg!(err), WriteError::InvalidArgument(_)));
 
     // ---- test non existent database ----
     let err = write_client
@@ -87,8 +87,11 @@ async fn test_write() {
     }
     assert!(maybe_err.is_some());
     let err = maybe_err.unwrap();
-    let WriteError::ServerError(status) = dbg!(err);
-    assert_eq!(status.code(), tonic::Code::ResourceExhausted);
+    if let WriteError::ServerError(status) = dbg!(&err) {
+        assert_eq!(status.code(), tonic::Code::ResourceExhausted);
+    } else {
+        panic!("Expected ServerError, got {}", err);
+    }
 
     // IMPORTANT: At this point, the database is flooded and pretty much
     // useless. Don't append any tests after the "hard limit" test!
@@ -699,3 +702,29 @@ async fn test_write_routed_no_shard() {
         .to_string()
         .contains("Table or CTE with name 'disk' not found\""));
 }
+
+#[tokio::test]
+async fn test_write_schema_mismatch() {
+    // regression test for https://github.com/influxdata/influxdb_iox/issues/2538
+    let fixture = ServerFixture::create_shared().await;
+    let mut write_client = fixture.write_client();
+
+    let db_name = rand_name();
+    create_readable_database(&db_name, fixture.grpc_channel()).await;
+
+    write_client
+        .write(&db_name, "table field=1i 10")
+        .await
+        .expect("cannot write");
+
+    let err = write_client
+        .write(&db_name, "table field=1.1 10")
+        .await
+        .unwrap_err();
+    assert_contains!(err.to_string(), "Table batch has mismatching schema");
+    if let WriteError::InvalidArgument(status) = &err {
+        assert_eq!(status.code(), tonic::Code::InvalidArgument);
+    } else {
+        panic!("Expected InvalidArgument, got {}", err);
+    }
+}