From 7ccbab8c90b90f5d1638561c522a0402d512c8ad Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)" <carol.nichols@integer32.com>
Date: Mon, 12 Jul 2021 22:19:28 -0400
Subject: [PATCH 1/2] feat: Add TableSummaryAndTimes to incrementally
 replace TableSummary

Use TableSummaryAndTimes when turning mutable buffer chunks into
catalog chunks.

Switching everything that uses TableSummary over at once is proving
too big a change, so this lets us migrate more incrementally.
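
Call sites that still need the old type can convert through the new
From impl. A sketch (mb_chunk is any MBChunk; the local names are
illustrative):

    // New transition type, carrying the write times:
    let with_times: TableSummaryAndTimes = mb_chunk.table_summary();
    // Old type, for code that has not been switched over yet:
    let legacy: TableSummary = mb_chunk.table_summary().into();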
---
 data_types/src/partition_metadata.rs |  30 ++++++++
 mutable_buffer/src/chunk.rs          | 101 +++++++++++++--------------
 server/src/db/catalog/chunk.rs       |  14 ++--
 server/src/db/chunk.rs               |   2 +-
 4 files changed, 90 insertions(+), 57 deletions(-)

diff --git a/data_types/src/partition_metadata.rs b/data_types/src/partition_metadata.rs
index 62f8ed2979..42f46e94bb 100644
--- a/data_types/src/partition_metadata.rs
+++ b/data_types/src/partition_metadata.rs
@@ -1,6 +1,7 @@
 //! This module contains structs that describe the metadata for a partition
 //! including schema, summary statistics, and file locations in storage.
 
+use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 use std::{
     borrow::{Borrow, Cow},
@@ -54,6 +55,35 @@ impl FromIterator<Self> for TableSummary {
     }
 }
 
+/// Temporary transition struct that also carries the times of first/last write. It will
+/// eventually replace TableSummary entirely.
+#[derive(Debug)]
+pub struct TableSummaryAndTimes {
+    /// Table name
+    pub name: String,
+
+    /// Per column statistics
+    pub columns: Vec<ColumnSummary>,
+
+    /// Time at which the first data was written into this table. Note
+    /// this is not the same as the timestamps on the data itself
+    pub time_of_first_write: DateTime<Utc>,
+
+    /// Most recent time at which data was written into this
+    /// table. Note this is not the same as the timestamps on the data
+    /// itself
+    pub time_of_last_write: DateTime<Utc>,
+}
+
+impl From<TableSummaryAndTimes> for TableSummary {
+    fn from(other: TableSummaryAndTimes) -> Self {
+        Self {
+            name: other.name,
+            columns: other.columns,
+        }
+    }
+}
+
 /// Metadata and statistics information for a table. This can be
 /// either for the portion of a Table stored within a single chunk or
 /// aggregated across chunks.
diff --git a/mutable_buffer/src/chunk.rs b/mutable_buffer/src/chunk.rs
index 243a642913..160923029d 100644
--- a/mutable_buffer/src/chunk.rs
+++ b/mutable_buffer/src/chunk.rs
@@ -4,7 +4,7 @@ use crate::{
 };
 use arrow::record_batch::RecordBatch;
 use chrono::{DateTime, Utc};
-use data_types::partition_metadata::{ColumnSummary, InfluxDbType, TableSummary};
+use data_types::partition_metadata::{ColumnSummary, InfluxDbType, TableSummaryAndTimes};
 use entry::{Sequence, TableBatch};
 use hashbrown::HashMap;
 use internal_types::{
@@ -237,7 +237,7 @@ impl MBChunk {
     }
 
     /// Returns a vec of the summary statistics of the tables in this chunk
-    pub fn table_summary(&self) -> TableSummary {
+    pub fn table_summary(&self) -> TableSummaryAndTimes {
         let mut columns: Vec<_> = self
             .columns
             .iter()
@@ -255,9 +255,11 @@ impl MBChunk {
 
         columns.sort_by(|a, b| a.name.cmp(&b.name));
 
-        TableSummary {
+        TableSummaryAndTimes {
             name: self.table_name.to_string(),
             columns,
+            time_of_first_write: self.time_of_first_write,
+            time_of_last_write: self.time_of_last_write,
         }
     }
 
@@ -521,55 +523,50 @@ mod tests {
         assert!(chunk.time_of_first_write < after_write);
 
         let summary = chunk.table_summary();
-
-        assert_eq!(
-            summary,
-            TableSummary {
-                name: "cpu".to_string(),
-                columns: vec![
-                    ColumnSummary {
-                        name: "env".to_string(),
-                        influxdb_type: Some(InfluxDbType::Tag),
-                        stats: Statistics::String(StatValues {
-                            min: Some("prod".to_string()),
-                            max: Some("stage".to_string()),
-                            count: 3,
-                            distinct_count: Some(NonZeroU64::new(3).unwrap())
-                        })
-                    },
-                    ColumnSummary {
-                        name: "host".to_string(),
-                        influxdb_type: Some(InfluxDbType::Tag),
-                        stats: Statistics::String(StatValues {
-                            min: Some("a".to_string()),
-                            max: Some("c".to_string()),
-                            count: 4,
-                            distinct_count: Some(NonZeroU64::new(3).unwrap())
-                        })
-                    },
-                    ColumnSummary {
-                        name: "time".to_string(),
-                        influxdb_type: Some(InfluxDbType::Timestamp),
-                        stats: Statistics::I64(StatValues {
-                            min: Some(1),
-                            max: Some(2),
-                            count: 4,
-                            distinct_count: None
-                        })
-                    },
-                    ColumnSummary {
-                        name: "val".to_string(),
-                        influxdb_type: Some(InfluxDbType::Field),
-                        stats: Statistics::F64(StatValues {
-                            min: Some(2.),
-                            max: Some(23.),
-                            count: 4,
-                            distinct_count: None
-                        })
-                    },
-                ]
-            }
-        )
+        assert_eq!(summary.name, "cpu");
+        let expected_column_summaries = vec![
+            ColumnSummary {
+                name: "env".to_string(),
+                influxdb_type: Some(InfluxDbType::Tag),
+                stats: Statistics::String(StatValues {
+                    min: Some("prod".to_string()),
+                    max: Some("stage".to_string()),
+                    count: 3,
+                    distinct_count: Some(NonZeroU64::new(3).unwrap()),
+                }),
+            },
+            ColumnSummary {
+                name: "host".to_string(),
+                influxdb_type: Some(InfluxDbType::Tag),
+                stats: Statistics::String(StatValues {
+                    min: Some("a".to_string()),
+                    max: Some("c".to_string()),
+                    count: 4,
+                    distinct_count: Some(NonZeroU64::new(3).unwrap()),
+                }),
+            },
+            ColumnSummary {
+                name: "time".to_string(),
+                influxdb_type: Some(InfluxDbType::Timestamp),
+                stats: Statistics::I64(StatValues {
+                    min: Some(1),
+                    max: Some(2),
+                    count: 4,
+                    distinct_count: None,
+                }),
+            },
+            ColumnSummary {
+                name: "val".to_string(),
+                influxdb_type: Some(InfluxDbType::Field),
+                stats: Statistics::F64(StatValues {
+                    min: Some(2.),
+                    max: Some(23.),
+                    count: 4,
+                    distinct_count: None,
+                }),
+            },
+        ];
+        assert_eq!(summary.columns, expected_column_summaries);
     }
 
     #[test]
diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs
index 8b544b3e80..17b1d51625 100644
--- a/server/src/db/catalog/chunk.rs
+++ b/server/src/db/catalog/chunk.rs
@@ -246,6 +246,10 @@ impl CatalogChunk {
         metrics: ChunkMetrics,
     ) -> Self {
         assert_eq!(chunk.table_name(), &addr.table_name);
+
+        let first_write = chunk.table_summary().time_of_first_write;
+        let last_write = chunk.table_summary().time_of_last_write;
+
         let stage = ChunkStage::Open { mb_chunk: chunk };
 
         metrics
@@ -257,8 +261,8 @@ impl CatalogChunk {
             stage,
             lifecycle_action: None,
             metrics,
-            time_of_first_write: None,
-            time_of_last_write: None,
+            time_of_first_write: Some(first_write),
+            time_of_last_write: Some(last_write),
             time_closed: None,
         };
         chunk.record_write();
@@ -475,7 +479,7 @@ impl CatalogChunk {
         match &self.stage {
             ChunkStage::Open { mb_chunk, .. } => {
                 // The stats for open chunks change so can't be cached
-                Arc::new(mb_chunk.table_summary())
+                Arc::new(mb_chunk.table_summary().into())
             }
             ChunkStage::Frozen { meta, .. } => Arc::clone(&meta.table_summary),
             ChunkStage::Persisted { meta, .. } => Arc::clone(&meta.table_summary),
@@ -533,7 +537,7 @@ impl CatalogChunk {
 
                 // Cache table summary + schema
                 let metadata = ChunkMetadata {
-                    table_summary: Arc::new(mb_chunk.table_summary()),
+                    table_summary: Arc::new(mb_chunk.table_summary().into()),
                     schema: s.full_schema(),
                 };
 
@@ -836,6 +840,8 @@ mod tests {
         let mb_chunk = make_mb_chunk(&addr.table_name, sequencer_id);
         let chunk = CatalogChunk::new_open(addr, mb_chunk, ChunkMetrics::new_unregistered());
         assert!(matches!(chunk.stage(), &ChunkStage::Open { .. }));
+        assert!(chunk.time_of_first_write.is_some());
+        assert!(chunk.time_of_last_write.is_some());
     }
 
     #[tokio::test]
diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs
index 88ebdf0376..729ca1282b 100644
--- a/server/src/db/chunk.rs
+++ b/server/src/db/chunk.rs
@@ -109,7 +109,7 @@ impl DbChunk {
                     chunk: Arc::clone(&snapshot),
                 };
                 let meta = ChunkMetadata {
-                    table_summary: Arc::new(mb_chunk.table_summary()),
+                    table_summary: Arc::new(mb_chunk.table_summary().into()),
                     schema: snapshot.full_schema(),
                 };
                 (state, Arc::new(meta))

From 649b467adbc2cd30baba818bd025a07c3bd61f3c Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)" <carol.nichols@integer32.com>
Date: Wed, 14 Jul 2021 09:53:42 -0400
Subject: [PATCH 2/2] fix: CatalogChunk no longer needs to record a write when
 created from a MUB chunk

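new_open already pulls time_of_first_write and time_of_last_write out
of the MUB chunk's TableSummaryAndTimes, so the record_write() call on
construction is redundant. The existing test still covers the behavior
we care about, roughly:

    let chunk = CatalogChunk::new_open(addr, mb_chunk, ChunkMetrics::new_unregistered());
    assert!(chunk.time_of_first_write.is_some());
    assert!(chunk.time_of_last_write.is_some());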
---
 server/src/db/catalog/chunk.rs | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs
index 17b1d51625..38fdeb62cb 100644
--- a/server/src/db/catalog/chunk.rs
+++ b/server/src/db/catalog/chunk.rs
@@ -238,8 +238,7 @@ impl ChunkMetrics {
 impl CatalogChunk {
     /// Creates a new open chunk from the provided MUB chunk.
     ///
-    /// Panics if the provided chunk is empty, otherwise creates a new open chunk and records a
-    /// write at the current time.
+    /// Panics if the provided chunk is empty, otherwise creates a new open chunk.
     pub(super) fn new_open(
         addr: ChunkAddr,
         chunk: mutable_buffer::chunk::MBChunk,
@@ -256,7 +255,7 @@ impl CatalogChunk {
             .state
             .inc_with_labels(&[KeyValue::new("state", "open")]);
 
-        let mut chunk = Self {
+        Self {
             addr,
             stage,
             lifecycle_action: None,
@@ -264,9 +263,7 @@ impl CatalogChunk {
             time_of_first_write: Some(first_write),
             time_of_last_write: Some(last_write),
             time_closed: None,
-        };
-        chunk.record_write();
-        chunk
+        }
     }
 
     /// Creates a new RUB chunk from the provided RUB chunk and metadata