From d28749bd9395141c70933deabbd48afbaeba92d3 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
Date: Thu, 4 Nov 2021 13:28:47 +0000
Subject: [PATCH] refactor: remove Entry from Db interfaces (#2724) (#3027)
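
Replaces the Entry-based write interfaces on `Database` and `Db` with
the `DbWrite` payload from `mutable_batch`: `Database::write_entry`
becomes `Database::route_write`, `Db::store_entry` becomes
`Db::route_write`, and the entry-to-batch conversion moves up into
`write_entry_local` in `server/src/lib.rs`. The per-database
`ingest_entries_bytes` metric, and the `server/src/database/metrics.rs`
module backing it, are removed.

A minimal sketch of the new write path, as exercised by the tests in
this patch (error handling elided):

    // Parse line protocol into per-table batches (the 0 is the default
    // timestamp), wrap them in a DbWrite with default metadata, and
    // route the write to the write buffer and/or local mutable buffer.
    let tables = mutable_batch_lp::lines_to_batches("cpu bar=1 10", 0)?;
    let write = DbWrite::new(tables, Default::default());
    db.route_write(&write).await?;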

---
 .../server_type/database/http.rs              |  24 ----
 server/src/database.rs                        |  24 +---
 server/src/database/metrics.rs                |  58 ---------
 server/src/db.rs                              | 117 +++++++-----------
 server/src/lib.rs                             |  28 ++---
 5 files changed, 61 insertions(+), 190 deletions(-)
 delete mode 100644 server/src/database/metrics.rs

diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs
index 0688ce6356..eafe65c65b 100644
--- a/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs
+++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs
@@ -923,32 +923,10 @@ mod tests {
             .unwrap()
             .clone();
 
-        let entry_ingest = metric_registry
-            .get_instrument::<Metric<U64Counter>>("ingest_entries_bytes")
-            .unwrap();
-
-        let entry_ingest_ok = entry_ingest
-            .get_observer(&Attributes::from(&[
-                ("db_name", "MyOrg_MyBucket"),
-                ("status", "ok"),
-            ]))
-            .unwrap()
-            .clone();
-
-        let entry_ingest_error = entry_ingest
-            .get_observer(&Attributes::from(&[
-                ("db_name", "MyOrg_MyBucket"),
-                ("status", "error"),
-            ]))
-            .unwrap()
-            .clone();
-
         assert_eq!(request_duration_ok.fetch().sample_count(), 1);
         assert_eq!(request_count_ok.fetch(), 1);
         assert_eq!(request_count_client_error.fetch(), 0);
         assert_eq!(request_count_server_error.fetch(), 0);
-        assert_ne!(entry_ingest_ok.fetch(), 0);
-        assert_eq!(entry_ingest_error.fetch(), 0);
 
         // A single successful point landed
         let ingest_lines = metric_registry
@@ -1051,8 +1029,6 @@ mod tests {
         assert_eq!(request_count_ok.fetch(), 1);
         assert_eq!(request_count_client_error.fetch(), 0);
         assert_eq!(request_count_server_error.fetch(), 1);
-        assert_ne!(entry_ingest_ok.fetch(), 0);
-        assert_ne!(entry_ingest_error.fetch(), 0);
     }
 
     /// Sets up a test database with some data for testing the query endpoint
diff --git a/server/src/database.rs b/server/src/database.rs
index 900954a779..16fac956bb 100644
--- a/server/src/database.rs
+++ b/server/src/database.rs
@@ -18,6 +18,7 @@ use generated_types::{
 };
 use internal_types::freezable::Freezable;
 use iox_object_store::IoxObjectStore;
+use mutable_batch::DbWrite;
 use observability_deps::tracing::{error, info, warn};
 use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
 use parquet_catalog::core::PreservedCatalog;
@@ -26,13 +27,10 @@ use snafu::{ensure, OptionExt, ResultExt, Snafu};
 use std::{future::Future, sync::Arc, time::Duration};
 use tokio::{sync::Notify, task::JoinError};
 use tokio_util::sync::CancellationToken;
-use trace::ctx::SpanContext;
 use uuid::Uuid;
 
 const INIT_BACKOFF: Duration = Duration::from_secs(1);
 
-mod metrics;
-
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display(
@@ -162,16 +160,12 @@ impl Database {
             "new database"
         );
 
-        let metrics =
-            metrics::Metrics::new(application.metric_registry().as_ref(), config.name.as_str());
-
         let shared = Arc::new(DatabaseShared {
             config,
             application,
             shutdown: Default::default(),
             state: RwLock::new(Freezable::new(DatabaseState::Known(DatabaseStateKnown {}))),
             state_notify: Default::default(),
-            metrics,
         });
 
         let handle = tokio::spawn(background_worker(Arc::clone(&shared)));
@@ -589,18 +583,12 @@ impl Database {
         Ok(())
     }
 
-    /// Writes an entry to this `Database` this will either:
+    /// Writes a [`DbWrite`] to this `Database`. This will either:
     ///
     /// - write it to a write buffer
     /// - write it to a local `Db`
     ///
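+    /// A minimal usage sketch, mirroring the tests added in this patch
+    /// (`database` is an initialized `Database`; error handling elided):
+    ///
+    /// ```ignore
+    /// let tables = mutable_batch_lp::lines_to_batches("cpu bar=1 10", 0)?;
+    /// let write = DbWrite::new(tables, Default::default());
+    /// database.route_write(&write).await?;
+    /// ```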
-    pub async fn write_entry(
-        &self,
-        entry: entry::Entry,
-        span_ctx: Option<SpanContext>,
-    ) -> Result<(), WriteError> {
-        let recorder = self.shared.metrics.entry_ingest(entry.data().len());
-
+    pub async fn route_write(&self, write: &DbWrite) -> Result<(), WriteError> {
         let db = {
             let state = self.shared.state.read();
             match &**state {
@@ -617,7 +605,7 @@ impl Database {
             }
         };
 
-        db.store_entry(entry, span_ctx).await.map_err(|e| {
+        db.route_write(write).await.map_err(|e| {
             use super::db::Error;
             match e {
                 // TODO: Pull write buffer producer out of Db
@@ -628,7 +616,6 @@ impl Database {
             }
         })?;
 
-        recorder.success();
         Ok(())
     }
 }
@@ -666,9 +653,6 @@ struct DatabaseShared {
 
     /// Notify that the database state has changed
     state_notify: Notify,
-
-    /// Metrics for this database
-    metrics: metrics::Metrics,
 }
 
 /// The background worker for `Database` - there should only ever be one
diff --git a/server/src/database/metrics.rs b/server/src/database/metrics.rs
deleted file mode 100644
index cbb0fde303..0000000000
--- a/server/src/database/metrics.rs
+++ /dev/null
@@ -1,58 +0,0 @@
-use metric::U64Counter;
-
-#[derive(Debug)]
-pub struct Metrics {
-    ingest_entries_bytes_ok: U64Counter,
-
-    ingest_entries_bytes_error: U64Counter,
-}
-
-impl Metrics {
-    pub fn new(registry: &metric::Registry, db_name: impl Into<String>) -> Self {
-        let db_name = db_name.into();
-        let metric = registry
-            .register_metric::<U64Counter>("ingest_entries_bytes", "total ingested entry bytes");
-
-        Self {
-            ingest_entries_bytes_ok: metric
-                .recorder([("db_name", db_name.clone().into()), ("status", "ok".into())]),
-            ingest_entries_bytes_error: metric
-                .recorder([("db_name", db_name.into()), ("status", "error".into())]),
-        }
-    }
-
-    /// Get a recorder for reporting entry ingest
-    pub fn entry_ingest(&self, bytes: usize) -> EntryIngestRecorder<'_> {
-        EntryIngestRecorder {
-            metrics: self,
-            recorded: false,
-            bytes,
-        }
-    }
-}
-
-/// An RAII handle that records metrics for ingest
-///
-/// Records an error on drop unless `EntryIngestRecorder::success` invoked
-pub struct EntryIngestRecorder<'a> {
-    metrics: &'a Metrics,
-    bytes: usize,
-    recorded: bool,
-}
-
-impl<'a> EntryIngestRecorder<'a> {
-    pub fn success(mut self) {
-        self.recorded = true;
-        self.metrics.ingest_entries_bytes_ok.inc(self.bytes as u64)
-    }
-}
-
-impl<'a> Drop for EntryIngestRecorder<'a> {
-    fn drop(&mut self) {
-        if !self.recorded {
-            self.metrics
-                .ingest_entries_bytes_error
-                .inc(self.bytes as u64);
-        }
-    }
-}
diff --git a/server/src/db.rs b/server/src/db.rs
index a6ffcae1c6..57675d8c03 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -26,11 +26,8 @@ use data_types::{
     server_id::ServerId,
 };
 use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider};
-use entry::Entry;
 use iox_object_store::IoxObjectStore;
 use mutable_batch::payload::{DbWrite, PartitionWrite};
-use mutable_batch::WriteMeta;
-use mutable_batch_entry::entry_to_batches;
 use mutable_buffer::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
 use observability_deps::tracing::{debug, error, info, warn};
 use parquet_catalog::{
@@ -105,9 +102,6 @@ pub enum Error {
     #[snafu(display("Hard buffer size limit reached"))]
     HardLimitReached {},
 
-    #[snafu(display("Cannot convert entry to db write: {}", source))]
-    EntryConversion { source: mutable_batch_entry::Error },
-
     #[snafu(display(
         "Cannot delete data from non-existing table, {}: {}",
         table_name,
@@ -915,19 +909,16 @@ impl Db {
         Ok(())
     }
 
-    /// Stores an entry based on the configuration.
+    /// Stores the write on this [`Db`] and/or routes it to the write buffer
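+    ///
+    /// Depending on configuration, the write goes to the write buffer, the
+    /// local mutable buffer, or both; an immutable database with no write
+    /// buffer producer rejects the write.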
     ///
     /// TODO: Remove this method (#2243)
-    pub async fn store_entry(&self, entry: Entry, span_ctx: Option<SpanContext>) -> Result<()> {
+    pub async fn route_write(&self, write: &DbWrite) -> Result<()> {
         let immutable = {
             let rules = self.rules.read();
             rules.lifecycle_rules.immutable
         };
         debug!(%immutable, has_write_buffer_producer=self.write_buffer_producer.is_some(), "storing entry");
 
-        let tables = entry_to_batches(&entry).context(EntryConversion)?;
-        let mut write = DbWrite::new(tables, WriteMeta::new(None, None, span_ctx, None));
-
         match (self.write_buffer_producer.as_ref(), immutable) {
             (Some(write_buffer), true) => {
                 // If only the write buffer is configured, this is passing the data through to
@@ -936,7 +927,7 @@ impl Db {
 
                 // TODO: be smarter than always using sequencer 0
                 let _ = write_buffer
-                    .store_write(0, &write)
+                    .store_write(0, write)
                     .await
                     .context(WriteBufferWritingError)?;
 
@@ -947,12 +938,10 @@ impl Db {
                 // buffer to return success before adding the entry to the mutable buffer.
 
                 // TODO: be smarter than always using sequencer 0
-                let meta = write_buffer
-                    .store_write(0, &write)
+                write_buffer
+                    .store_write(0, write)
                     .await
                     .context(WriteBufferWritingError)?;
-
-                write.set_meta(meta);
             }
             (_, true) => {
                 // If not configured to send entries to the write buffer and the database is
@@ -966,7 +955,7 @@ impl Db {
             }
         };
 
-        self.store_write(&write)
+        self.store_write(write)
     }
 
     /// Writes the provided [`DbWrite`] to this database
@@ -1206,37 +1195,25 @@ pub(crate) fn checkpoint_data_from_catalog(catalog: &Catalog) -> CheckpointData
 }
 
 pub mod test_helpers {
-    use std::collections::{BTreeSet, HashSet};
+    use std::collections::BTreeSet;
 
     use arrow::record_batch::RecordBatch;
     use data_types::chunk_metadata::ChunkStorage;
-    use entry::test_helpers::lp_to_entries;
+    use mutable_batch_lp::lines_to_batches;
     use query::frontend::sql::SqlQueryPlanner;
 
     use super::*;
 
     /// Try to write line protocol data and return all tables that were written.
     pub async fn try_write_lp(db: &Db, lp: &str) -> Result<Vec<String>> {
-        let entries = {
-            let partitioner = &db.rules.read().partition_template;
-            lp_to_entries(lp, partitioner)
-        };
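+        // Parse the line protocol into per-table batches; the second argument
+        // is the default timestamp applied to lines that don't provide one.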
+        let tables = lines_to_batches(lp, 0).unwrap();
+        let mut table_names: Vec<_> = tables.keys().cloned().collect();
 
-        let mut tables = HashSet::new();
-        for entry in entries {
-            if let Some(writes) = entry.partition_writes() {
-                for write in writes {
-                    for batch in write.table_batches() {
-                        tables.insert(batch.name().to_string());
-                    }
-                }
-                db.store_entry(entry, None).await?;
-            }
-        }
+        let write = DbWrite::new(tables, Default::default());
+        db.route_write(&write).await?;
 
-        let mut tables: Vec<_> = tables.into_iter().collect();
-        tables.sort();
-        Ok(tables)
+        table_names.sort_unstable();
+        Ok(table_names)
     }
 
     /// Same as [`try_write_lp`](try_write_lp) but will panic on failure.
@@ -1377,17 +1354,16 @@ mod tests {
         partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
         write_summary::TimestampSummary,
     };
-    use entry::test_helpers::lp_to_entry;
     use futures::{stream, StreamExt, TryStreamExt};
     use iox_object_store::ParquetFilePath;
     use metric::{Attributes, CumulativeGauge, Metric, Observation};
+    use mutable_batch_lp::lines_to_batches;
     use object_store::ObjectStore;
     use parquet_catalog::test_helpers::load_ok;
     use parquet_file::{
         metadata::IoxParquetMetaData,
         test_utils::{load_parquet_from_store_for_path, read_data_from_parquet_data},
     };
-    use persistence_windows::min_max_sequence::MinMaxSequence;
     use query::{QueryChunk, QueryDatabase};
     use schema::selection::Selection;
     use schema::Schema;
@@ -1424,8 +1400,9 @@ mod tests {
         // Validate that writes are rejected if there is no mutable buffer
         let db = immutable_db().await;
 
-        let entry = lp_to_entry("cpu bar=1 10");
-        let res = db.store_entry(entry, None).await;
+        let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
+        let write = DbWrite::new(tables, Default::default());
+        let res = db.route_write(&write).await;
         assert_contains!(
             res.unwrap_err().to_string(),
             "Cannot write to this database: no mutable buffer configured"
@@ -1452,8 +1429,9 @@ mod tests {
             .await
             .db;
 
-        let entry = lp_to_entry("cpu bar=1 10");
-        test_db.store_entry(entry, None).await.unwrap();
+        let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
+        let write = DbWrite::new(tables, Default::default());
+        test_db.route_write(&write).await.unwrap();
 
         assert_eq!(write_buffer_state.get_messages(0).len(), 1);
     }
@@ -1474,8 +1452,9 @@ mod tests {
             .await
             .db;
 
-        let entry = lp_to_entry("cpu bar=1 10");
-        db.store_entry(entry, None).await.unwrap();
+        let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
+        let write = DbWrite::new(tables, Default::default());
+        db.route_write(&write).await.unwrap();
 
         assert_eq!(write_buffer_state.get_messages(0).len(), 1);
 
@@ -1501,9 +1480,9 @@ mod tests {
             .await
             .db;
 
-        let entry = lp_to_entry("cpu bar=1 10");
-
-        let res = db.store_entry(entry, None).await;
+        let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
+        let write = DbWrite::new(tables, Default::default());
+        let res = db.route_write(&write).await;
 
         assert!(
             matches!(res, Err(Error::WriteBufferWritingError { .. })),
@@ -1516,8 +1495,9 @@ mod tests {
     async fn cant_write_when_reading_from_write_buffer() {
         // Validate that writes are rejected if this database is reading from the write buffer
         let db = immutable_db().await;
-        let entry = lp_to_entry("cpu bar=1 10");
-        let res = db.store_entry(entry, None).await;
+        let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
+        let write = DbWrite::new(tables, Default::default());
+        let res = db.route_write(&write).await;
         assert_contains!(
             res.unwrap_err().to_string(),
             "Cannot write to this database: no mutable buffer configured"
@@ -1552,10 +1532,11 @@ mod tests {
             bar,t1=alpha iv=1i 1
         "#;
 
-        let entry = lp_to_entry(lp);
+        let tables = lines_to_batches(lp, 0).unwrap();
+        let write = DbWrite::new(tables, Default::default());
 
         // This should succeed and start chunks in the MUB
-        db.store_entry(entry, None).await.unwrap();
+        db.route_write(&write).await.unwrap();
 
         // Line 1 has the same schema and should end up in the MUB.
         // Line 2 has a different schema than line 1 and should error
@@ -1565,12 +1546,13 @@ mod tests {
              foo,t1=important iv=1i 3"
             .to_string();
 
-        let entry = lp_to_entry(&lp);
+        let tables = lines_to_batches(&lp, 0).unwrap();
+        let write = DbWrite::new(tables, Default::default());
 
         // This should return an error because there was at least one error in the loop
-        let result = db.store_entry(entry, None).await;
+        let err = db.route_write(&write).await.unwrap_err();
         assert_contains!(
-            result.unwrap_err().to_string(),
+            err.to_string(),
             "Storing database write failed with the following error(s), and possibly more:"
         );
 
@@ -2366,9 +2348,9 @@ mod tests {
 
         time.inc(Duration::from_secs(1));
 
-        let entry = lp_to_entry("cpu bar=true 10");
-        let result = db.store_entry(entry, None).await;
-        assert!(result.is_err());
+        let tables = lines_to_batches("cpu bar=true 10", 0).unwrap();
+        let write = DbWrite::new(tables, Default::default());
+        db.route_write(&write).await.unwrap_err();
         {
             let partition = db.catalog.partition("cpu", partition_key).unwrap();
             let partition = partition.read();
@@ -2382,19 +2364,8 @@ mod tests {
 
     #[tokio::test]
     async fn write_updates_persistence_windows() {
-        // Writes should update the persistence windows when there
-        // is a write buffer configured.
-        let write_buffer_state =
-            MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
-        let time_provider = Arc::new(time::MockProvider::new(Time::from_timestamp_nanos(0)));
-        let write_buffer = Arc::new(
-            MockBufferForWriting::new(write_buffer_state.clone(), None, time_provider).unwrap(),
-        );
-        let db = TestDb::builder()
-            .write_buffer_producer(write_buffer)
-            .build()
-            .await
-            .db;
+        // Writes should update the persistence windows
+        let db = make_db().await.db;
 
         let partition_key = "1970-01-01T00";
         write_lp(&db, "cpu bar=1 10").await; // seq 0
@@ -2406,8 +2377,8 @@ mod tests {
         let windows = partition.persistence_windows().unwrap();
         let seq = windows.minimum_unpersisted_sequence().unwrap();
 
-        let seq = seq.get(&0).unwrap();
-        assert_eq!(seq, &MinMaxSequence::new(0, 2));
+        // No write buffer configured
+        assert!(seq.is_empty());
     }
 
     #[tokio::test]
diff --git a/server/src/lib.rs b/server/src/lib.rs
index 99afee620f..a57f50d7c4 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -105,6 +105,8 @@ use uuid::Uuid;
 pub use application::ApplicationState;
 pub use db::Db;
 pub use job::JobRegistry;
+use mutable_batch::{DbWrite, WriteMeta};
+use mutable_batch_entry::entry_to_batches;
 pub use resolver::{GrpcConnectionString, RemoteTemplate};
 
 mod application;
@@ -272,6 +274,9 @@ pub enum Error {
 
     #[snafu(display("error persisting server config to object storage: {}", source))]
     PersistServerConfig { source: object_store::Error },
+
+    #[snafu(display("Cannot convert entry to db write: {}", source))]
+    EntryConversion { source: mutable_batch_entry::Error },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1045,8 +1050,11 @@ where
     ) -> Result<()> {
         use database::WriteError;
 
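+        // Convert the legacy entry payload to table batches at the server
+        // boundary, so the Database/Db write path below only deals in DbWrite.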
+        let tables = entry_to_batches(&entry).context(EntryConversion)?;
+        let write = DbWrite::new(tables, WriteMeta::new(None, None, span_ctx, None));
+
         self.active_database(db_name)?
-            .write_entry(entry, span_ctx)
+            .route_write(&write)
             .await
             .map_err(|e| match e {
                 WriteError::NotInitialized { .. } => Error::DatabaseNotInitialized {
@@ -1498,7 +1506,6 @@ mod tests {
     use entry::test_helpers::lp_to_entry;
     use influxdb_line_protocol::parse_lines;
     use iox_object_store::IoxObjectStore;
-    use metric::{Attributes, Metric, U64Counter};
     use object_store::ObjectStore;
     use parquet_catalog::{
         core::{PreservedCatalog, PreservedCatalogConfig},
@@ -1856,9 +1863,7 @@ mod tests {
 
     #[tokio::test]
     async fn write_entry_local() {
-        let application = make_application();
-        let registry = Arc::clone(application.metric_registry());
-        let server = make_server(application);
+        let server = make_server(make_application());
         server.set_id(ServerId::try_from(1).unwrap()).unwrap();
         server.wait_for_init().await.unwrap();
 
@@ -1881,10 +1886,11 @@ mod tests {
             rules.as_ref(),
         )
         .expect("sharded entries");
+        assert_eq!(sharded_entries.len(), 1);
 
-        let entry = &sharded_entries[0].entry;
+        let entry = sharded_entries.into_iter().next().unwrap().entry;
         server
-            .write_entry_local(&name, entry.clone(), None)
+            .write_entry_local(&name, entry, None)
             .await
             .expect("write entry");
 
@@ -1897,14 +1903,6 @@ mod tests {
             "+-----+--------------------------------+",
         ];
         assert_batches_eq!(expected, &batches);
-
-        let bytes = registry
-            .get_instrument::<Metric<U64Counter>>("ingest_entries_bytes")
-            .unwrap()
-            .get_observer(&Attributes::from(&[("status", "ok"), ("db_name", "foo")]))
-            .unwrap()
-            .fetch();
-        assert_eq!(bytes, 240)
     }
 
     // This tests sets up a database with a sharding config which defines exactly one shard