feat: Make CatalogChunk first/last write times required

Connects to #1927.
pull/24376/head
Carol (Nichols || Goulding) 2021-07-26 14:27:47 -04:00
parent 09e48018a0
commit 8add00e761
12 changed files with 282 additions and 333 deletions

View File

@ -140,12 +140,12 @@ pub struct ChunkSummary {
/// The earliest time at which data contained within this chunk was written /// The earliest time at which data contained within this chunk was written
/// into IOx. Note due to the compaction, etc... this may not be the chunk /// into IOx. Note due to the compaction, etc... this may not be the chunk
/// that data was originally written into /// that data was originally written into
pub time_of_first_write: Option<DateTime<Utc>>, pub time_of_first_write: DateTime<Utc>,
/// The latest time at which data contained within this chunk was written /// The latest time at which data contained within this chunk was written
/// into IOx. Note due to the compaction, etc... this may not be the chunk /// into IOx. Note due to the compaction, etc... this may not be the chunk
/// that data was originally written into /// that data was originally written into
pub time_of_last_write: Option<DateTime<Utc>>, pub time_of_last_write: DateTime<Utc>,
/// Time at which this chunk was marked as closed. Note this is /// Time at which this chunk was marked as closed. Note this is
/// not the same as the timestamps on the data itself /// not the same as the timestamps on the data itself
@ -173,66 +173,14 @@ pub struct DetailedChunkSummary {
} }
impl ChunkSummary { impl ChunkSummary {
/// Construct a ChunkSummary that has None for all timestamps pub fn equal_without_timestamps(&self, other: &Self) -> bool {
#[allow(clippy::too_many_arguments)] self.partition_key == other.partition_key
pub fn new_without_timestamps( && self.table_name == other.table_name
partition_key: Arc<str>, && self.id == other.id
table_name: Arc<str>, && self.storage == other.storage
id: u32, && self.lifecycle_action == other.lifecycle_action
storage: ChunkStorage, && self.memory_bytes == other.memory_bytes
lifecycle_action: Option<ChunkLifecycleAction>, && self.object_store_bytes == other.object_store_bytes
memory_bytes: usize, && self.row_count == other.row_count
object_store_bytes: usize,
row_count: usize,
) -> Self {
Self {
partition_key,
table_name,
id,
storage,
lifecycle_action,
memory_bytes,
object_store_bytes,
row_count,
time_of_last_access: None,
time_of_first_write: None,
time_of_last_write: None,
time_closed: None,
}
}
/// Return a new ChunkSummary with None for all timestamps
pub fn normalize(self) -> Self {
let ChunkSummary {
partition_key,
table_name,
id,
storage,
lifecycle_action,
memory_bytes,
object_store_bytes,
row_count,
..
} = self;
Self::new_without_timestamps(
partition_key,
table_name,
id,
storage,
lifecycle_action,
memory_bytes,
object_store_bytes,
row_count,
)
}
/// Normalizes a set of ChunkSummaries for comparison by removing timestamps
pub fn normalize_summaries(summaries: Vec<Self>) -> Vec<Self> {
let mut summaries = summaries
.into_iter()
.map(|summary| summary.normalize())
.collect::<Vec<_>>();
summaries.sort_unstable();
summaries
} }
} }

View File

@ -36,8 +36,8 @@ impl From<ChunkSummary> for management::Chunk {
object_store_bytes: object_store_bytes as u64, object_store_bytes: object_store_bytes as u64,
row_count: row_count as u64, row_count: row_count as u64,
time_of_last_access: time_of_last_access.map(Into::into), time_of_last_access: time_of_last_access.map(Into::into),
time_of_first_write: time_of_first_write.map(Into::into), time_of_first_write: Some(time_of_first_write.into()),
time_of_last_write: time_of_last_write.map(Into::into), time_of_last_write: Some(time_of_last_write.into()),
time_closed: time_closed.map(Into::into), time_closed: time_closed.map(Into::into),
} }
} }
@ -83,6 +83,15 @@ impl TryFrom<management::Chunk> for ChunkSummary {
t.map(|t| convert_timestamp(t, field)).transpose() t.map(|t| convert_timestamp(t, field)).transpose()
}; };
let required_timestamp = |t: Option<google_types::protobuf::Timestamp>,
field: &'static str| {
t.ok_or_else(|| FieldViolation {
field: field.to_string(),
description: "Timestamp is required".to_string(),
})
.and_then(|t| convert_timestamp(t, field))
};
let management::Chunk { let management::Chunk {
partition_key, partition_key,
table_name, table_name,
@ -109,8 +118,8 @@ impl TryFrom<management::Chunk> for ChunkSummary {
object_store_bytes: object_store_bytes as usize, object_store_bytes: object_store_bytes as usize,
row_count: row_count as usize, row_count: row_count as usize,
time_of_last_access: timestamp(time_of_last_access, "time_of_last_access")?, time_of_last_access: timestamp(time_of_last_access, "time_of_last_access")?,
time_of_first_write: timestamp(time_of_first_write, "time_of_first_write")?, time_of_first_write: required_timestamp(time_of_first_write, "time_of_first_write")?,
time_of_last_write: timestamp(time_of_last_write, "time_of_last_write")?, time_of_last_write: required_timestamp(time_of_last_write, "time_of_last_write")?,
time_closed: timestamp(time_closed, "time_closed")?, time_closed: timestamp(time_closed, "time_closed")?,
}) })
} }
@ -158,6 +167,7 @@ mod test {
#[test] #[test]
fn valid_proto_to_summary() { fn valid_proto_to_summary() {
let now = Utc::now();
let proto = management::Chunk { let proto = management::Chunk {
partition_key: "foo".to_string(), partition_key: "foo".to_string(),
table_name: "bar".to_string(), table_name: "bar".to_string(),
@ -168,8 +178,8 @@ mod test {
storage: management::ChunkStorage::ObjectStoreOnly.into(), storage: management::ChunkStorage::ObjectStoreOnly.into(),
lifecycle_action: management::ChunkLifecycleAction::Moving.into(), lifecycle_action: management::ChunkLifecycleAction::Moving.into(),
time_of_first_write: None, time_of_first_write: Some(now.into()),
time_of_last_write: None, time_of_last_write: Some(now.into()),
time_closed: None, time_closed: None,
time_of_last_access: Some(google_types::protobuf::Timestamp { time_of_last_access: Some(google_types::protobuf::Timestamp {
seconds: 50, seconds: 50,
@ -187,8 +197,8 @@ mod test {
row_count: 321, row_count: 321,
storage: ChunkStorage::ObjectStoreOnly, storage: ChunkStorage::ObjectStoreOnly,
lifecycle_action: Some(ChunkLifecycleAction::Moving), lifecycle_action: Some(ChunkLifecycleAction::Moving),
time_of_first_write: None, time_of_first_write: now,
time_of_last_write: None, time_of_last_write: now,
time_closed: None, time_closed: None,
time_of_last_access: Some(Utc.timestamp_nanos(50_000_000_007)), time_of_last_access: Some(Utc.timestamp_nanos(50_000_000_007)),
}; };
@ -202,6 +212,7 @@ mod test {
#[test] #[test]
fn valid_summary_to_proto() { fn valid_summary_to_proto() {
let now = Utc::now();
let summary = ChunkSummary { let summary = ChunkSummary {
partition_key: Arc::from("foo"), partition_key: Arc::from("foo"),
table_name: Arc::from("bar"), table_name: Arc::from("bar"),
@ -211,8 +222,8 @@ mod test {
row_count: 321, row_count: 321,
storage: ChunkStorage::ObjectStoreOnly, storage: ChunkStorage::ObjectStoreOnly,
lifecycle_action: Some(ChunkLifecycleAction::Persisting), lifecycle_action: Some(ChunkLifecycleAction::Persisting),
time_of_first_write: None, time_of_first_write: now,
time_of_last_write: None, time_of_last_write: now,
time_closed: None, time_closed: None,
time_of_last_access: Some(Utc.timestamp_nanos(12_000_100_007)), time_of_last_access: Some(Utc.timestamp_nanos(12_000_100_007)),
}; };
@ -228,8 +239,8 @@ mod test {
row_count: 321, row_count: 321,
storage: management::ChunkStorage::ObjectStoreOnly.into(), storage: management::ChunkStorage::ObjectStoreOnly.into(),
lifecycle_action: management::ChunkLifecycleAction::Persisting.into(), lifecycle_action: management::ChunkLifecycleAction::Persisting.into(),
time_of_first_write: None, time_of_first_write: Some(now.into()),
time_of_last_write: None, time_of_last_write: Some(now.into()),
time_closed: None, time_closed: None,
time_of_last_access: Some(google_types::protobuf::Timestamp { time_of_last_access: Some(google_types::protobuf::Timestamp {
seconds: 12, seconds: 12,

View File

@ -178,7 +178,7 @@ pub trait LifecycleChunk {
/// Returns the access metrics for this chunk /// Returns the access metrics for this chunk
fn access_metrics(&self) -> AccessMetrics; fn access_metrics(&self) -> AccessMetrics;
fn time_of_last_write(&self) -> Option<DateTime<Utc>>; fn time_of_last_write(&self) -> DateTime<Utc>;
fn addr(&self) -> &ChunkAddr; fn addr(&self) -> &ChunkAddr;

View File

@ -614,16 +614,7 @@ fn can_move<C: LifecycleChunk>(rules: &LifecycleRules, chunk: &C, now: DateTime<
return true; return true;
} }
match chunk.time_of_last_write() { elapsed_seconds(now, chunk.time_of_last_write()) >= rules.late_arrive_window_seconds.get()
Some(last_write)
if elapsed_seconds(now, last_write) >= rules.late_arrive_window_seconds.get() =>
{
true
}
// Disable movement the chunk is empty, or the linger hasn't expired
_ => false,
}
} }
/// An action to free up memory /// An action to free up memory
@ -710,13 +701,13 @@ mod tests {
row_count: usize, row_count: usize,
min_timestamp: Option<DateTime<Utc>>, min_timestamp: Option<DateTime<Utc>>,
access_metrics: AccessMetrics, access_metrics: AccessMetrics,
time_of_last_write: Option<DateTime<Utc>>, time_of_last_write: DateTime<Utc>,
lifecycle_action: Option<TaskTracker<ChunkLifecycleAction>>, lifecycle_action: Option<TaskTracker<ChunkLifecycleAction>>,
storage: ChunkStorage, storage: ChunkStorage,
} }
impl TestChunk { impl TestChunk {
fn new(id: u32, time_of_last_write: Option<i64>, storage: ChunkStorage) -> Self { fn new(id: u32, time_of_last_write: i64, storage: ChunkStorage) -> Self {
let addr = ChunkAddr { let addr = ChunkAddr {
db_name: Arc::from(""), db_name: Arc::from(""),
table_name: Arc::from(""), table_name: Arc::from(""),
@ -732,7 +723,7 @@ mod tests {
count: 0, count: 0,
last_instant: Instant::now(), last_instant: Instant::now(),
}, },
time_of_last_write: time_of_last_write.map(from_secs), time_of_last_write: from_secs(time_of_last_write),
lifecycle_action: None, lifecycle_action: None,
storage, storage,
} }
@ -836,7 +827,7 @@ mod tests {
let id = partition.next_id; let id = partition.next_id;
partition.next_id += 1; partition.next_id += 1;
let mut new_chunk = TestChunk::new(id, None, ChunkStorage::ReadBuffer); let mut new_chunk = TestChunk::new(id, 0, ChunkStorage::ReadBuffer);
new_chunk.row_count = 0; new_chunk.row_count = 0;
for chunk in &chunks { for chunk in &chunks {
@ -876,7 +867,7 @@ mod tests {
partition.next_id += 1; partition.next_id += 1;
// The remainder left behind after the split // The remainder left behind after the split
let new_chunk = TestChunk::new(id, None, ChunkStorage::ReadBuffer) let new_chunk = TestChunk::new(id, 0, ChunkStorage::ReadBuffer)
.with_min_timestamp(handle.timestamp + chrono::Duration::nanoseconds(1)); .with_min_timestamp(handle.timestamp + chrono::Duration::nanoseconds(1));
partition partition
@ -973,7 +964,7 @@ mod tests {
self.access_metrics.clone() self.access_metrics.clone()
} }
fn time_of_last_write(&self) -> Option<DateTime<Utc>> { fn time_of_last_write(&self) -> DateTime<Utc> {
self.time_of_last_write self.time_of_last_write
} }
@ -1094,20 +1085,20 @@ mod tests {
mub_row_threshold: NonZeroUsize::new(74).unwrap(), mub_row_threshold: NonZeroUsize::new(74).unwrap(),
..Default::default() ..Default::default()
}; };
let chunk = TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer); let chunk = TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer);
assert!(!can_move(&rules, &chunk, from_secs(9))); assert!(!can_move(&rules, &chunk, from_secs(9)));
assert!(can_move(&rules, &chunk, from_secs(11))); assert!(can_move(&rules, &chunk, from_secs(11)));
// can move even if the chunk is small // can move even if the chunk is small
let chunk = TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer).with_row_count(73); let chunk = TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer).with_row_count(73);
assert!(can_move(&rules, &chunk, from_secs(11))); assert!(can_move(&rules, &chunk, from_secs(11)));
// If over the row count threshold, we should be able to move // If over the row count threshold, we should be able to move
let chunk = TestChunk::new(0, None, ChunkStorage::OpenMutableBuffer).with_row_count(74); let chunk = TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer).with_row_count(74);
assert!(can_move(&rules, &chunk, from_secs(0))); assert!(can_move(&rules, &chunk, from_secs(0)));
// If below the default row count threshold, it shouldn't move // If below the default row count threshold, it shouldn't move
let chunk = TestChunk::new(0, None, ChunkStorage::OpenMutableBuffer).with_row_count(73); let chunk = TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer).with_row_count(73);
assert!(!can_move(&rules, &chunk, from_secs(0))); assert!(!can_move(&rules, &chunk, from_secs(0)));
} }
@ -1167,9 +1158,9 @@ mod tests {
// The default rules shouldn't do anything // The default rules shouldn't do anything
let rules = LifecycleRules::default(); let rules = LifecycleRules::default();
let chunks = vec![ let chunks = vec![
TestChunk::new(0, Some(1), ChunkStorage::OpenMutableBuffer), TestChunk::new(0, 1, ChunkStorage::OpenMutableBuffer),
TestChunk::new(1, Some(1), ChunkStorage::OpenMutableBuffer), TestChunk::new(1, 1, ChunkStorage::OpenMutableBuffer),
TestChunk::new(2, Some(1), ChunkStorage::OpenMutableBuffer), TestChunk::new(2, 1, ChunkStorage::OpenMutableBuffer),
]; ];
let db = TestDb::new(rules, chunks); let db = TestDb::new(rules, chunks);
@ -1185,9 +1176,9 @@ mod tests {
..Default::default() ..Default::default()
}; };
let chunks = vec![ let chunks = vec![
TestChunk::new(0, Some(8), ChunkStorage::OpenMutableBuffer), TestChunk::new(0, 8, ChunkStorage::OpenMutableBuffer),
TestChunk::new(1, Some(5), ChunkStorage::OpenMutableBuffer), TestChunk::new(1, 5, ChunkStorage::OpenMutableBuffer),
TestChunk::new(2, Some(0), ChunkStorage::OpenMutableBuffer), TestChunk::new(2, 0, ChunkStorage::OpenMutableBuffer),
]; ];
let db = TestDb::new(rules, chunks); let db = TestDb::new(rules, chunks);
@ -1262,7 +1253,7 @@ mod tests {
..Default::default() ..Default::default()
}; };
let chunks = vec![TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer)]; let chunks = vec![TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer)];
let db = TestDb::new(rules.clone(), chunks); let db = TestDb::new(rules.clone(), chunks);
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
@ -1272,29 +1263,30 @@ mod tests {
let instant = Instant::now(); let instant = Instant::now();
let chunks = let chunks = vec![
vec![ // two "open" chunks => they must not be dropped (yet)
// two "open" chunks => they must not be dropped (yet) TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer),
TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer), TestChunk::new(1, 0, ChunkStorage::OpenMutableBuffer),
TestChunk::new(1, Some(0), ChunkStorage::OpenMutableBuffer), // "moved" chunk => can be dropped because `drop_non_persistent=true`
// "moved" chunk => can be dropped because `drop_non_persistent=true` TestChunk::new(2, 0, ChunkStorage::ReadBuffer),
TestChunk::new(2, Some(0), ChunkStorage::ReadBuffer), // "writing" chunk => cannot be unloaded while write is in-progress
// "writing" chunk => cannot be unloaded while write is in-progress TestChunk::new(3, 0, ChunkStorage::ReadBuffer)
TestChunk::new(3, Some(0), ChunkStorage::ReadBuffer) .with_action(ChunkLifecycleAction::Persisting),
.with_action(ChunkLifecycleAction::Persisting), // "written" chunk => can be unloaded
// "written" chunk => can be unloaded TestChunk::new(4, 0, ChunkStorage::ReadBufferAndObjectStore).with_access_metrics(
TestChunk::new(4, Some(0), ChunkStorage::ReadBufferAndObjectStore) AccessMetrics {
.with_access_metrics(AccessMetrics { count: 1,
count: 1, last_instant: instant,
last_instant: instant, },
}), ),
// "written" chunk => can be unloaded // "written" chunk => can be unloaded
TestChunk::new(5, Some(0), ChunkStorage::ReadBufferAndObjectStore) TestChunk::new(5, 0, ChunkStorage::ReadBufferAndObjectStore).with_access_metrics(
.with_access_metrics(AccessMetrics { AccessMetrics {
count: 12, count: 12,
last_instant: instant - Duration::from_secs(1), last_instant: instant - Duration::from_secs(1),
}), },
]; ),
];
let db = TestDb::new(rules, chunks); let db = TestDb::new(rules, chunks);
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
@ -1324,7 +1316,7 @@ mod tests {
}; };
assert!(!rules.drop_non_persisted); assert!(!rules.drop_non_persisted);
let chunks = vec![TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer)]; let chunks = vec![TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer)];
let db = TestDb::new(rules.clone(), chunks); let db = TestDb::new(rules.clone(), chunks);
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
@ -1334,15 +1326,15 @@ mod tests {
let chunks = vec![ let chunks = vec![
// two "open" chunks => they must not be dropped (yet) // two "open" chunks => they must not be dropped (yet)
TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer), TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer),
TestChunk::new(1, Some(0), ChunkStorage::OpenMutableBuffer), TestChunk::new(1, 0, ChunkStorage::OpenMutableBuffer),
// "moved" chunk => cannot be dropped because `drop_non_persistent=false` // "moved" chunk => cannot be dropped because `drop_non_persistent=false`
TestChunk::new(2, Some(0), ChunkStorage::ReadBuffer), TestChunk::new(2, 0, ChunkStorage::ReadBuffer),
// "writing" chunk => cannot be drop while write is in-progess // "writing" chunk => cannot be drop while write is in-progess
TestChunk::new(3, Some(0), ChunkStorage::ReadBuffer) TestChunk::new(3, 0, ChunkStorage::ReadBuffer)
.with_action(ChunkLifecycleAction::Persisting), .with_action(ChunkLifecycleAction::Persisting),
// "written" chunk => can be unloaded // "written" chunk => can be unloaded
TestChunk::new(4, Some(0), ChunkStorage::ReadBufferAndObjectStore), TestChunk::new(4, 0, ChunkStorage::ReadBufferAndObjectStore),
]; ];
let db = TestDb::new(rules, chunks); let db = TestDb::new(rules, chunks);
@ -1360,7 +1352,7 @@ mod tests {
..Default::default() ..Default::default()
}; };
let chunks = vec![TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer)]; let chunks = vec![TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer)];
let db = TestDb::new(rules, chunks); let db = TestDb::new(rules, chunks);
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
@ -1383,55 +1375,55 @@ mod tests {
let partitions = vec![ let partitions = vec![
TestPartition::new(vec![ TestPartition::new(vec![
// still receiving writes => cannot compact // still receiving writes => cannot compact
TestChunk::new(0, Some(20), ChunkStorage::OpenMutableBuffer), TestChunk::new(0, 20, ChunkStorage::OpenMutableBuffer),
]), ]),
TestPartition::new(vec![ TestPartition::new(vec![
// still receiving writes => cannot compact // still receiving writes => cannot compact
TestChunk::new(1, Some(20), ChunkStorage::OpenMutableBuffer), TestChunk::new(1, 20, ChunkStorage::OpenMutableBuffer),
// closed => can compact // closed => can compact
TestChunk::new(2, Some(20), ChunkStorage::ClosedMutableBuffer), TestChunk::new(2, 20, ChunkStorage::ClosedMutableBuffer),
]), ]),
TestPartition::new(vec![ TestPartition::new(vec![
// open but cold => can compact // open but cold => can compact
TestChunk::new(3, Some(5), ChunkStorage::OpenMutableBuffer), TestChunk::new(3, 5, ChunkStorage::OpenMutableBuffer),
// closed => can compact // closed => can compact
TestChunk::new(4, Some(20), ChunkStorage::ClosedMutableBuffer), TestChunk::new(4, 20, ChunkStorage::ClosedMutableBuffer),
// closed => can compact // closed => can compact
TestChunk::new(5, Some(20), ChunkStorage::ReadBuffer), TestChunk::new(5, 20, ChunkStorage::ReadBuffer),
// persisted => cannot compact // persisted => cannot compact
TestChunk::new(6, Some(20), ChunkStorage::ReadBufferAndObjectStore), TestChunk::new(6, 20, ChunkStorage::ReadBufferAndObjectStore),
// persisted => cannot compact // persisted => cannot compact
TestChunk::new(7, Some(20), ChunkStorage::ObjectStoreOnly), TestChunk::new(7, 20, ChunkStorage::ObjectStoreOnly),
]), ]),
TestPartition::new(vec![ TestPartition::new(vec![
// closed => can compact // closed => can compact
TestChunk::new(8, Some(20), ChunkStorage::ReadBuffer), TestChunk::new(8, 20, ChunkStorage::ReadBuffer),
// closed => can compact // closed => can compact
TestChunk::new(9, Some(20), ChunkStorage::ReadBuffer), TestChunk::new(9, 20, ChunkStorage::ReadBuffer),
// persisted => cannot compact // persisted => cannot compact
TestChunk::new(10, Some(20), ChunkStorage::ReadBufferAndObjectStore), TestChunk::new(10, 20, ChunkStorage::ReadBufferAndObjectStore),
// persisted => cannot compact // persisted => cannot compact
TestChunk::new(11, Some(20), ChunkStorage::ObjectStoreOnly), TestChunk::new(11, 20, ChunkStorage::ObjectStoreOnly),
]), ]),
TestPartition::new(vec![ TestPartition::new(vec![
// open but cold => can compact // open but cold => can compact
TestChunk::new(12, Some(5), ChunkStorage::OpenMutableBuffer), TestChunk::new(12, 5, ChunkStorage::OpenMutableBuffer),
]), ]),
TestPartition::new(vec![ TestPartition::new(vec![
// already compacted => should not compact // already compacted => should not compact
TestChunk::new(13, Some(5), ChunkStorage::ReadBuffer), TestChunk::new(13, 5, ChunkStorage::ReadBuffer),
]), ]),
TestPartition::new(vec![ TestPartition::new(vec![
// closed => can compact // closed => can compact
TestChunk::new(14, Some(20), ChunkStorage::ReadBuffer).with_row_count(400), TestChunk::new(14, 20, ChunkStorage::ReadBuffer).with_row_count(400),
// too many individual rows => ignore // too many individual rows => ignore
TestChunk::new(15, Some(20), ChunkStorage::ReadBuffer).with_row_count(1_000), TestChunk::new(15, 20, ChunkStorage::ReadBuffer).with_row_count(1_000),
// closed => can compact // closed => can compact
TestChunk::new(16, Some(20), ChunkStorage::ReadBuffer).with_row_count(400), TestChunk::new(16, 20, ChunkStorage::ReadBuffer).with_row_count(400),
// too many total rows => next compaction job // too many total rows => next compaction job
TestChunk::new(17, Some(20), ChunkStorage::ReadBuffer).with_row_count(400), TestChunk::new(17, 20, ChunkStorage::ReadBuffer).with_row_count(400),
// too many total rows => next compaction job // too many total rows => next compaction job
TestChunk::new(18, Some(20), ChunkStorage::ReadBuffer).with_row_count(400), TestChunk::new(18, 20, ChunkStorage::ReadBuffer).with_row_count(400),
]), ]),
]; ];
@ -1466,19 +1458,19 @@ mod tests {
let partitions = vec![ let partitions = vec![
TestPartition::new(vec![ TestPartition::new(vec![
// closed => can compact // closed => can compact
TestChunk::new(0, Some(20), ChunkStorage::ClosedMutableBuffer), TestChunk::new(0, 20, ChunkStorage::ClosedMutableBuffer),
// closed => can compact // closed => can compact
TestChunk::new(10, Some(30), ChunkStorage::ClosedMutableBuffer), TestChunk::new(10, 30, ChunkStorage::ClosedMutableBuffer),
// closed => can compact // closed => can compact
TestChunk::new(12, Some(40), ChunkStorage::ClosedMutableBuffer), TestChunk::new(12, 40, ChunkStorage::ClosedMutableBuffer),
]), ]),
TestPartition::new(vec![ TestPartition::new(vec![
// closed => can compact // closed => can compact
TestChunk::new(1, Some(20), ChunkStorage::ClosedMutableBuffer), TestChunk::new(1, 20, ChunkStorage::ClosedMutableBuffer),
]), ]),
TestPartition::new(vec![ TestPartition::new(vec![
// closed => can compact // closed => can compact
TestChunk::new(200, Some(10), ChunkStorage::ClosedMutableBuffer), TestChunk::new(200, 10, ChunkStorage::ClosedMutableBuffer),
]), ]),
]; ];
@ -1516,65 +1508,59 @@ mod tests {
let partitions = vec![ let partitions = vec![
// Insufficient rows and not old enough => don't persist but can compact // Insufficient rows and not old enough => don't persist but can compact
TestPartition::new(vec![ TestPartition::new(vec![
TestChunk::new(0, Some(0), ChunkStorage::ClosedMutableBuffer) TestChunk::new(0, 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)), .with_min_timestamp(from_secs(10)),
TestChunk::new(1, Some(0), ChunkStorage::ReadBuffer) TestChunk::new(1, 0, ChunkStorage::ReadBuffer).with_min_timestamp(from_secs(5)),
.with_min_timestamp(from_secs(5)),
]) ])
.with_persistence(10, now, from_secs(20)), .with_persistence(10, now, from_secs(20)),
// Sufficient rows => persist // Sufficient rows => persist
TestPartition::new(vec![ TestPartition::new(vec![
TestChunk::new(2, Some(0), ChunkStorage::ClosedMutableBuffer) TestChunk::new(2, 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)), .with_min_timestamp(from_secs(10)),
TestChunk::new(3, Some(0), ChunkStorage::ReadBuffer) TestChunk::new(3, 0, ChunkStorage::ReadBuffer).with_min_timestamp(from_secs(5)),
.with_min_timestamp(from_secs(5)),
]) ])
.with_persistence(1_000, now, from_secs(20)), .with_persistence(1_000, now, from_secs(20)),
// Writes too old => persist // Writes too old => persist
TestPartition::new(vec![ TestPartition::new(vec![
// Should split open chunks // Should split open chunks
TestChunk::new(4, Some(20), ChunkStorage::OpenMutableBuffer) TestChunk::new(4, 20, ChunkStorage::OpenMutableBuffer)
.with_min_timestamp(from_secs(10)), .with_min_timestamp(from_secs(10)),
TestChunk::new(5, Some(0), ChunkStorage::ReadBuffer) TestChunk::new(5, 0, ChunkStorage::ReadBuffer).with_min_timestamp(from_secs(5)),
.with_min_timestamp(from_secs(5)), TestChunk::new(6, 0, ChunkStorage::ObjectStoreOnly)
TestChunk::new(6, Some(0), ChunkStorage::ObjectStoreOnly)
.with_min_timestamp(from_secs(5)), .with_min_timestamp(from_secs(5)),
]) ])
.with_persistence(10, now - Duration::from_secs(10), from_secs(20)), .with_persistence(10, now - Duration::from_secs(10), from_secs(20)),
// Sufficient rows but conflicting compaction => prevent compaction // Sufficient rows but conflicting compaction => prevent compaction
TestPartition::new(vec![ TestPartition::new(vec![
TestChunk::new(7, Some(0), ChunkStorage::ClosedMutableBuffer) TestChunk::new(7, 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)) .with_min_timestamp(from_secs(10))
.with_action(ChunkLifecycleAction::Compacting), .with_action(ChunkLifecycleAction::Compacting),
// This chunk would be a compaction candidate, but we want to persist it // This chunk would be a compaction candidate, but we want to persist it
TestChunk::new(8, Some(0), ChunkStorage::ClosedMutableBuffer) TestChunk::new(8, 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)), .with_min_timestamp(from_secs(10)),
TestChunk::new(9, Some(0), ChunkStorage::ReadBuffer) TestChunk::new(9, 0, ChunkStorage::ReadBuffer).with_min_timestamp(from_secs(5)),
.with_min_timestamp(from_secs(5)),
]) ])
.with_persistence(1_000, now, from_secs(20)), .with_persistence(1_000, now, from_secs(20)),
// Sufficient rows and non-conflicting compaction => persist // Sufficient rows and non-conflicting compaction => persist
TestPartition::new(vec![ TestPartition::new(vec![
TestChunk::new(10, Some(0), ChunkStorage::ClosedMutableBuffer) TestChunk::new(10, 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(21)) .with_min_timestamp(from_secs(21))
.with_action(ChunkLifecycleAction::Compacting), .with_action(ChunkLifecycleAction::Compacting),
TestChunk::new(11, Some(0), ChunkStorage::ClosedMutableBuffer) TestChunk::new(11, 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)), .with_min_timestamp(from_secs(10)),
TestChunk::new(12, Some(0), ChunkStorage::ReadBuffer) TestChunk::new(12, 0, ChunkStorage::ReadBuffer).with_min_timestamp(from_secs(5)),
.with_min_timestamp(from_secs(5)),
]) ])
.with_persistence(1_000, now, from_secs(20)), .with_persistence(1_000, now, from_secs(20)),
// Sufficient rows, non-conflicting compaction and compact-able chunk => persist + compact // Sufficient rows, non-conflicting compaction and compact-able chunk => persist + compact
TestPartition::new(vec![ TestPartition::new(vec![
TestChunk::new(13, Some(0), ChunkStorage::ClosedMutableBuffer) TestChunk::new(13, 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(21)) .with_min_timestamp(from_secs(21))
.with_action(ChunkLifecycleAction::Compacting), .with_action(ChunkLifecycleAction::Compacting),
TestChunk::new(14, Some(0), ChunkStorage::ClosedMutableBuffer) TestChunk::new(14, 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(21)), .with_min_timestamp(from_secs(21)),
TestChunk::new(15, Some(0), ChunkStorage::ClosedMutableBuffer) TestChunk::new(15, 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)), .with_min_timestamp(from_secs(10)),
TestChunk::new(16, Some(0), ChunkStorage::ReadBuffer) TestChunk::new(16, 0, ChunkStorage::ReadBuffer).with_min_timestamp(from_secs(5)),
.with_min_timestamp(from_secs(5)),
]) ])
.with_persistence(1_000, now, from_secs(20)), .with_persistence(1_000, now, from_secs(20)),
]; ];
@ -1604,7 +1590,7 @@ mod tests {
late_arrive_window_seconds: NonZeroU32::new(10).unwrap(), late_arrive_window_seconds: NonZeroU32::new(10).unwrap(),
..Default::default() ..Default::default()
}; };
let chunks = vec![TestChunk::new(0, Some(40), ChunkStorage::OpenMutableBuffer)]; let chunks = vec![TestChunk::new(0, 40, ChunkStorage::OpenMutableBuffer)];
let db = TestDb::new(rules, chunks); let db = TestDb::new(rules, chunks);
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
@ -1619,11 +1605,7 @@ mod tests {
late_arrive_window_seconds: NonZeroU32::new(10).unwrap(), late_arrive_window_seconds: NonZeroU32::new(10).unwrap(),
..Default::default() ..Default::default()
}; };
let chunks = vec![TestChunk::new( let chunks = vec![TestChunk::new(0, 40, ChunkStorage::ClosedMutableBuffer)];
0,
Some(40),
ChunkStorage::ClosedMutableBuffer,
)];
let db = TestDb::new(rules, chunks); let db = TestDb::new(rules, chunks);
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);
@ -1635,7 +1617,7 @@ mod tests {
#[test] #[test]
fn test_recovers_lifecycle_action() { fn test_recovers_lifecycle_action() {
let rules = LifecycleRules::default(); let rules = LifecycleRules::default();
let chunks = vec![TestChunk::new(0, None, ChunkStorage::ClosedMutableBuffer)]; let chunks = vec![TestChunk::new(0, 0, ChunkStorage::ClosedMutableBuffer)];
let db = TestDb::new(rules, chunks); let db = TestDb::new(rules, chunks);
let mut lifecycle = LifecyclePolicy::new(&db); let mut lifecycle = LifecyclePolicy::new(&db);

View File

@ -1332,7 +1332,7 @@ mod tests {
use arrow::record_batch::RecordBatch; use arrow::record_batch::RecordBatch;
use arrow_util::{assert_batches_eq, assert_batches_sorted_eq}; use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
use bytes::Bytes; use bytes::Bytes;
use chrono::DateTime; use chrono::{DateTime, TimeZone};
use data_types::{ use data_types::{
chunk_metadata::ChunkStorage, chunk_metadata::ChunkStorage,
database_rules::{LifecycleRules, PartitionTemplate, TemplatePart}, database_rules::{LifecycleRules, PartitionTemplate, TemplatePart},
@ -2629,10 +2629,7 @@ mod tests {
let chunk = partition.open_chunk().unwrap(); let chunk = partition.open_chunk().unwrap();
let chunk = chunk.read(); let chunk = chunk.read();
( (partition.last_write_at(), chunk.time_of_last_write())
partition.last_write_at(),
chunk.time_of_last_write().unwrap(),
)
}; };
let entry = lp_to_entry("cpu bar=true 10"); let entry = lp_to_entry("cpu bar=true 10");
@ -2644,7 +2641,7 @@ mod tests {
assert_eq!(last_write_prev, partition.last_write_at()); assert_eq!(last_write_prev, partition.last_write_at());
let chunk = partition.open_chunk().unwrap(); let chunk = partition.open_chunk().unwrap();
let chunk = chunk.read(); let chunk = chunk.read();
assert_eq!(chunk_last_write_prev, chunk.time_of_last_write().unwrap()); assert_eq!(chunk_last_write_prev, chunk.time_of_last_write());
} }
} }
@ -2788,9 +2785,9 @@ mod tests {
println!("Chunk: {:#?}", chunk); println!("Chunk: {:#?}", chunk);
// then the chunk creation and rollover times are as expected // then the chunk creation and rollover times are as expected
assert!(start < chunk.time_of_first_write().unwrap()); assert!(start < chunk.time_of_first_write());
assert!(chunk.time_of_first_write().unwrap() < after_data_load); assert!(chunk.time_of_first_write() < after_data_load);
assert!(chunk.time_of_first_write().unwrap() == chunk.time_of_last_write().unwrap()); assert!(chunk.time_of_first_write() == chunk.time_of_last_write());
assert!(after_data_load < chunk.time_closed().unwrap()); assert!(after_data_load < chunk.time_closed().unwrap());
assert!(chunk.time_closed().unwrap() < after_rollover); assert!(chunk.time_closed().unwrap() < after_rollover);
} }
@ -2850,18 +2847,21 @@ mod tests {
print!("Partitions: {:?}", db.partition_keys().unwrap()); print!("Partitions: {:?}", db.partition_keys().unwrap());
let chunk_summaries = db.partition_chunk_summaries("1970-01-05T15"); let chunk_summaries = db.partition_chunk_summaries("1970-01-05T15");
let chunk_summaries = ChunkSummary::normalize_summaries(chunk_summaries);
let expected = vec![ChunkSummary::new_without_timestamps( let expected = vec![ChunkSummary {
Arc::from("1970-01-05T15"), partition_key: Arc::from("1970-01-05T15"),
Arc::from("cpu"), table_name: Arc::from("cpu"),
0, id: 0,
ChunkStorage::OpenMutableBuffer, storage: ChunkStorage::OpenMutableBuffer,
None, lifecycle_action: None,
70, // memory_size memory_bytes: 70, // memory_size
0, // os_size object_store_bytes: 0, // os_size
1, row_count: 1,
)]; time_of_last_access: None,
time_of_first_write: Utc.timestamp_nanos(1),
time_of_last_write: Utc.timestamp_nanos(1),
time_closed: None,
}];
let size: usize = db let size: usize = db
.chunk_summaries() .chunk_summaries()
@ -2872,11 +2872,14 @@ mod tests {
assert_eq!(db.catalog.metrics().memory().mutable_buffer(), size); assert_eq!(db.catalog.metrics().memory().mutable_buffer(), size);
assert_eq!( for (expected_summary, actual_summary) in expected.iter().zip(chunk_summaries.iter()) {
expected, chunk_summaries, assert!(
"expected:\n{:#?}\n\nactual:{:#?}\n\n", expected_summary.equal_without_timestamps(&actual_summary),
expected, chunk_summaries "expected:\n{:#?}\n\nactual:{:#?}\n\n",
); expected_summary,
actual_summary
);
}
} }
#[tokio::test] #[tokio::test]
@ -2896,23 +2899,23 @@ mod tests {
let summary = &chunk_summaries[0]; let summary = &chunk_summaries[0];
assert_eq!(summary.id, 0, "summary; {:#?}", summary); assert_eq!(summary.id, 0, "summary; {:#?}", summary);
assert!( assert!(
summary.time_of_first_write.unwrap() > start, summary.time_of_first_write > start,
"summary; {:#?}", "summary; {:#?}",
summary summary
); );
assert!( assert!(
summary.time_of_first_write.unwrap() < after_close, summary.time_of_first_write < after_close,
"summary; {:#?}", "summary; {:#?}",
summary summary
); );
assert!( assert!(
summary.time_of_last_write.unwrap() > after_first_write, summary.time_of_last_write > after_first_write,
"summary; {:#?}", "summary; {:#?}",
summary summary
); );
assert!( assert!(
summary.time_of_last_write.unwrap() < after_close, summary.time_of_last_write < after_close,
"summary; {:#?}", "summary; {:#?}",
summary summary
); );
@ -2930,8 +2933,8 @@ mod tests {
} }
fn assert_first_last_times_eq(chunk_summary: &ChunkSummary) { fn assert_first_last_times_eq(chunk_summary: &ChunkSummary) {
let first_write = chunk_summary.time_of_first_write.unwrap(); let first_write = chunk_summary.time_of_first_write;
let last_write = chunk_summary.time_of_last_write.unwrap(); let last_write = chunk_summary.time_of_last_write;
assert_eq!(first_write, last_write); assert_eq!(first_write, last_write);
} }
@ -2941,8 +2944,8 @@ mod tests {
before: DateTime<Utc>, before: DateTime<Utc>,
after: DateTime<Utc>, after: DateTime<Utc>,
) { ) {
let first_write = chunk_summary.time_of_first_write.unwrap(); let first_write = chunk_summary.time_of_first_write;
let last_write = chunk_summary.time_of_last_write.unwrap(); let last_write = chunk_summary.time_of_last_write;
assert!(before < first_write); assert!(before < first_write);
assert!(before < last_write); assert!(before < last_write);
@ -2951,8 +2954,8 @@ mod tests {
} }
fn assert_chunks_times_ordered(before: &ChunkSummary, after: &ChunkSummary) { fn assert_chunks_times_ordered(before: &ChunkSummary, after: &ChunkSummary) {
let before_last_write = before.time_of_last_write.unwrap(); let before_last_write = before.time_of_last_write;
let after_first_write = after.time_of_first_write.unwrap(); let after_first_write = after.time_of_first_write;
assert!(before_last_write < after_first_write); assert!(before_last_write < after_first_write);
} }
@ -2963,14 +2966,14 @@ mod tests {
} }
fn assert_chunks_first_times_eq(a: &ChunkSummary, b: &ChunkSummary) { fn assert_chunks_first_times_eq(a: &ChunkSummary, b: &ChunkSummary) {
let a_first_write = a.time_of_first_write.unwrap(); let a_first_write = a.time_of_first_write;
let b_first_write = b.time_of_first_write.unwrap(); let b_first_write = b.time_of_first_write;
assert_eq!(a_first_write, b_first_write); assert_eq!(a_first_write, b_first_write);
} }
fn assert_chunks_last_times_eq(a: &ChunkSummary, b: &ChunkSummary) { fn assert_chunks_last_times_eq(a: &ChunkSummary, b: &ChunkSummary) {
let a_last_write = a.time_of_last_write.unwrap(); let a_last_write = a.time_of_last_write;
let b_last_write = b.time_of_last_write.unwrap(); let b_last_write = b.time_of_last_write;
assert_eq!(a_last_write, b_last_write); assert_eq!(a_last_write, b_last_write);
} }
@ -3132,49 +3135,62 @@ mod tests {
assert_first_last_times_eq(&open_mb_t8); assert_first_last_times_eq(&open_mb_t8);
assert_first_last_times_between(&open_mb_t8, time7, time8); assert_first_last_times_between(&open_mb_t8, time7, time8);
let chunk_summaries = ChunkSummary::normalize_summaries(chunk_summaries);
let lifecycle_action = None; let lifecycle_action = None;
let expected = vec![ let expected = vec![
ChunkSummary::new_without_timestamps( ChunkSummary {
Arc::from("1970-01-01T00"), partition_key: Arc::from("1970-01-01T00"),
Arc::from("cpu"), table_name: Arc::from("cpu"),
2, id: 2,
ChunkStorage::ReadBufferAndObjectStore, storage: ChunkStorage::ReadBufferAndObjectStore,
lifecycle_action, lifecycle_action,
3332, // size of RB and OS chunks memory_bytes: 3332, // size of RB and OS chunks
1523, // size of parquet file object_store_bytes: 1523, // size of parquet file
2, row_count: 2,
), time_of_last_access: None,
ChunkSummary::new_without_timestamps( time_of_first_write: Utc.timestamp_nanos(1),
Arc::from("1970-01-05T15"), time_of_last_write: Utc.timestamp_nanos(1),
Arc::from("cpu"), time_closed: None,
0, },
ChunkStorage::ClosedMutableBuffer, ChunkSummary {
partition_key: Arc::from("1970-01-05T15"),
table_name: Arc::from("cpu"),
id: 0,
storage: ChunkStorage::ClosedMutableBuffer,
lifecycle_action, lifecycle_action,
2510, memory_bytes: 2510,
0, // no OS chunks object_store_bytes: 0, // no OS chunks
1, row_count: 1,
), time_of_last_access: None,
ChunkSummary::new_without_timestamps( time_of_first_write: Utc.timestamp_nanos(1),
Arc::from("1970-01-05T15"), time_of_last_write: Utc.timestamp_nanos(1),
Arc::from("cpu"), time_closed: None,
1, },
ChunkStorage::OpenMutableBuffer, ChunkSummary {
partition_key: Arc::from("1970-01-05T15"),
table_name: Arc::from("cpu"),
id: 1,
storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action, lifecycle_action,
87, memory_bytes: 87,
0, // no OS chunks object_store_bytes: 0, // no OS chunks
1, row_count: 1,
), time_of_last_access: None,
time_of_first_write: Utc.timestamp_nanos(1),
time_of_last_write: Utc.timestamp_nanos(1),
time_closed: None,
},
]; ];
for (expected_summary, actual_summary) in expected.iter().zip(chunk_summaries.iter()) { for (expected_summary, actual_summary) in expected.iter().zip(chunk_summaries.iter()) {
assert_eq!( assert!(
expected_summary, actual_summary, expected_summary.equal_without_timestamps(&actual_summary),
"\n\nexpected item:\n{:#?}\n\nactual item:\n{:#?}\n\n\ "\n\nexpected item:\n{:#?}\n\nactual item:\n{:#?}\n\n\
all expected:\n{:#?}\n\nall actual:\n{:#?}", all expected:\n{:#?}\n\nall actual:\n{:#?}",
expected_summary, actual_summary, expected, chunk_summaries expected_summary,
actual_summary,
expected,
chunk_summaries
); );
} }

View File

@ -200,12 +200,12 @@ pub struct CatalogChunk {
/// The earliest time at which data contained within this chunk was written /// The earliest time at which data contained within this chunk was written
/// into IOx. Note due to the compaction, etc... this may not be the chunk /// into IOx. Note due to the compaction, etc... this may not be the chunk
/// that data was originally written into /// that data was originally written into
time_of_first_write: Option<DateTime<Utc>>, time_of_first_write: DateTime<Utc>,
/// The latest time at which data contained within this chunk was written /// The latest time at which data contained within this chunk was written
/// into IOx. Note due to the compaction, etc... this may not be the chunk /// into IOx. Note due to the compaction, etc... this may not be the chunk
/// that data was originally written into /// that data was originally written into
time_of_last_write: Option<DateTime<Utc>>, time_of_last_write: DateTime<Utc>,
/// Time at which this chunk was marked as closed. Note this is /// Time at which this chunk was marked as closed. Note this is
/// not the same as the timestamps on the data itself /// not the same as the timestamps on the data itself
@ -269,8 +269,9 @@ impl CatalogChunk {
) -> Self { ) -> Self {
assert_eq!(chunk.table_name(), &addr.table_name); assert_eq!(chunk.table_name(), &addr.table_name);
let first_write = chunk.table_summary().time_of_first_write; let summary = chunk.table_summary();
let last_write = chunk.table_summary().time_of_last_write; let time_of_first_write = summary.time_of_first_write;
let time_of_last_write = summary.time_of_last_write;
let stage = ChunkStage::Open { mb_chunk: chunk }; let stage = ChunkStage::Open { mb_chunk: chunk };
@ -284,8 +285,8 @@ impl CatalogChunk {
lifecycle_action: None, lifecycle_action: None,
metrics, metrics,
access_recorder: Default::default(), access_recorder: Default::default(),
time_of_first_write: Some(first_write), time_of_first_write,
time_of_last_write: Some(last_write), time_of_last_write,
time_closed: None, time_closed: None,
}; };
chunk.update_metrics(); chunk.update_metrics();
@ -302,8 +303,8 @@ impl CatalogChunk {
metrics: ChunkMetrics, metrics: ChunkMetrics,
) -> Self { ) -> Self {
let summary = chunk.table_summary(); let summary = chunk.table_summary();
let first_write = summary.time_of_first_write; let time_of_first_write = summary.time_of_first_write;
let last_write = summary.time_of_last_write; let time_of_last_write = summary.time_of_last_write;
let stage = ChunkStage::Frozen { let stage = ChunkStage::Frozen {
meta: Arc::new(ChunkMetadata { meta: Arc::new(ChunkMetadata {
@ -323,8 +324,8 @@ impl CatalogChunk {
lifecycle_action: None, lifecycle_action: None,
metrics, metrics,
access_recorder: Default::default(), access_recorder: Default::default(),
time_of_first_write: Some(first_write), time_of_first_write,
time_of_last_write: Some(last_write), time_of_last_write,
time_closed: None, time_closed: None,
}; };
chunk.update_metrics(); chunk.update_metrics();
@ -341,8 +342,8 @@ impl CatalogChunk {
assert_eq!(chunk.table_name(), addr.table_name.as_ref()); assert_eq!(chunk.table_name(), addr.table_name.as_ref());
let summary = chunk.table_summary(); let summary = chunk.table_summary();
let first_write = summary.time_of_first_write; let time_of_first_write = summary.time_of_first_write;
let last_write = summary.time_of_last_write; let time_of_last_write = summary.time_of_last_write;
// this is temporary // this is temporary
let table_summary = TableSummary { let table_summary = TableSummary {
@ -368,8 +369,8 @@ impl CatalogChunk {
lifecycle_action: None, lifecycle_action: None,
metrics, metrics,
access_recorder: Default::default(), access_recorder: Default::default(),
time_of_first_write: Some(first_write), time_of_first_write,
time_of_last_write: Some(last_write), time_of_last_write,
time_closed: None, time_closed: None,
}; };
chunk.update_metrics(); chunk.update_metrics();
@ -411,11 +412,11 @@ impl CatalogChunk {
.map_or(false, |action| action.metadata() == &lifecycle_action) .map_or(false, |action| action.metadata() == &lifecycle_action)
} }
pub fn time_of_first_write(&self) -> Option<DateTime<Utc>> { pub fn time_of_first_write(&self) -> DateTime<Utc> {
self.time_of_first_write self.time_of_first_write
} }
pub fn time_of_last_write(&self) -> Option<DateTime<Utc>> { pub fn time_of_last_write(&self) -> DateTime<Utc> {
self.time_of_last_write self.time_of_last_write
} }
@ -478,18 +479,10 @@ impl CatalogChunk {
self.metrics.timestamp_histogram.add(timestamps); self.metrics.timestamp_histogram.add(timestamps);
self.access_recorder.record_access_now(); self.access_recorder.record_access_now();
if let Some(t) = self.time_of_first_write { self.time_of_first_write = self.time_of_first_write.min(time_of_write);
self.time_of_first_write = Some(t.min(time_of_write))
} else {
self.time_of_first_write = Some(time_of_write)
}
// DateTime<Utc> isn't necessarily monotonic // DateTime<Utc> isn't necessarily monotonic
if let Some(t) = self.time_of_last_write { self.time_of_last_write = self.time_of_last_write.max(time_of_write);
self.time_of_last_write = Some(t.max(time_of_write))
} else {
self.time_of_last_write = Some(time_of_write)
}
self.update_metrics(); self.update_metrics();
} }
@ -984,8 +977,6 @@ mod tests {
let mb_chunk = make_mb_chunk(&addr.table_name); let mb_chunk = make_mb_chunk(&addr.table_name);
let chunk = CatalogChunk::new_open(addr, mb_chunk, ChunkMetrics::new_unregistered()); let chunk = CatalogChunk::new_open(addr, mb_chunk, ChunkMetrics::new_unregistered());
assert!(matches!(chunk.stage(), &ChunkStage::Open { .. })); assert!(matches!(chunk.stage(), &ChunkStage::Open { .. }));
assert!(chunk.time_of_first_write.is_some());
assert!(chunk.time_of_last_write.is_some());
} }
#[tokio::test] #[tokio::test]

View File

@ -612,8 +612,8 @@ mod tests {
let chunk = chunks.into_iter().next().unwrap(); let chunk = chunks.into_iter().next().unwrap();
let chunk = chunk.read(); let chunk = chunk.read();
assert_eq!(chunk.storage().1, ChunkStorage::ObjectStoreOnly); assert_eq!(chunk.storage().1, ChunkStorage::ObjectStoreOnly);
let first_write = chunk.time_of_first_write().unwrap(); let first_write = chunk.time_of_first_write();
let last_write = chunk.time_of_last_write().unwrap(); let last_write = chunk.time_of_last_write();
assert_eq!(first_write, last_write); assert_eq!(first_write, last_write);
assert!(before_creation < first_write); assert!(before_creation < first_write);
assert!(last_write < after_creation); assert!(last_write < after_creation);

View File

@ -310,7 +310,7 @@ impl LifecycleChunk for CatalogChunk {
self.access_recorder().get_metrics() self.access_recorder().get_metrics()
} }
fn time_of_last_write(&self) -> Option<DateTime<Utc>> { fn time_of_last_write(&self) -> DateTime<Utc> {
self.time_of_last_write() self.time_of_last_write()
} }

View File

@ -48,17 +48,15 @@ pub(crate) fn compact_chunks(
input_rows += chunk.table_summary().total_count(); input_rows += chunk.table_summary().total_count();
time_of_first_write = match (time_of_first_write, chunk.time_of_first_write()) { let candidate_first = chunk.time_of_first_write();
(Some(prev_first), Some(candidate_first)) => Some(prev_first.min(candidate_first)), time_of_first_write = time_of_first_write
(Some(only), None) | (None, Some(only)) => Some(only), .map(|prev_first| prev_first.min(candidate_first))
(None, None) => None, .or(Some(candidate_first));
};
time_of_last_write = match (time_of_last_write, chunk.time_of_last_write()) { let candidate_last = chunk.time_of_last_write();
(Some(prev_last), Some(candidate_last)) => Some(prev_last.max(candidate_last)), time_of_last_write = time_of_last_write
(Some(only), None) | (None, Some(only)) => Some(only), .map(|prev_last| prev_last.max(candidate_last))
(None, None) => None, .or(Some(candidate_last));
};
chunk.set_compacting(&registration)?; chunk.set_compacting(&registration)?;
Ok(DbChunk::snapshot(&*chunk)) Ok(DbChunk::snapshot(&*chunk))
@ -179,15 +177,15 @@ mod tests {
chunk_summaries.sort_unstable(); chunk_summaries.sort_unstable();
let mub_summary = &chunk_summaries[0]; let mub_summary = &chunk_summaries[0];
let first_mub_write = mub_summary.time_of_first_write.unwrap(); let first_mub_write = mub_summary.time_of_first_write;
let last_mub_write = mub_summary.time_of_last_write.unwrap(); let last_mub_write = mub_summary.time_of_last_write;
assert!(time2 < first_mub_write); assert!(time2 < first_mub_write);
assert_eq!(first_mub_write, last_mub_write); assert_eq!(first_mub_write, last_mub_write);
assert!(first_mub_write < time3); assert!(first_mub_write < time3);
let rub_summary = &chunk_summaries[1]; let rub_summary = &chunk_summaries[1];
let first_rub_write = rub_summary.time_of_first_write.unwrap(); let first_rub_write = rub_summary.time_of_first_write;
let last_rub_write = rub_summary.time_of_last_write.unwrap(); let last_rub_write = rub_summary.time_of_last_write;
assert!(time0 < first_rub_write); assert!(time0 < first_rub_write);
assert!(first_rub_write < last_rub_write); assert!(first_rub_write < last_rub_write);
assert!(last_rub_write < time1); assert!(last_rub_write < time1);

View File

@ -54,17 +54,15 @@ pub fn persist_chunks(
input_rows += chunk.table_summary().total_count(); input_rows += chunk.table_summary().total_count();
time_of_first_write = match (time_of_first_write, chunk.time_of_first_write()) { let candidate_first = chunk.time_of_first_write();
(Some(prev_first), Some(candidate_first)) => Some(prev_first.min(candidate_first)), time_of_first_write = time_of_first_write
(Some(only), None) | (None, Some(only)) => Some(only), .map(|prev_first| prev_first.min(candidate_first))
(None, None) => None, .or(Some(candidate_first));
};
time_of_last_write = match (time_of_last_write, chunk.time_of_last_write()) { let candidate_last = chunk.time_of_last_write();
(Some(prev_last), Some(candidate_last)) => Some(prev_last.max(candidate_last)), time_of_last_write = time_of_last_write
(Some(only), None) | (None, Some(only)) => Some(only), .map(|prev_last| prev_last.max(candidate_last))
(None, None) => None, .or(Some(candidate_last));
};
chunk.set_writing_to_object_store(&registration)?; chunk.set_writing_to_object_store(&registration)?;
query_chunks.push(DbChunk::snapshot(&*chunk)); query_chunks.push(DbChunk::snapshot(&*chunk));

View File

@ -48,15 +48,19 @@ fn chunk_summaries_schema() -> SchemaRef {
Field::new("object_store_bytes", DataType::UInt64, false), Field::new("object_store_bytes", DataType::UInt64, false),
Field::new("row_count", DataType::UInt64, false), Field::new("row_count", DataType::UInt64, false),
Field::new("time_of_last_access", ts.clone(), true), Field::new("time_of_last_access", ts.clone(), true),
Field::new("time_of_first_write", ts.clone(), true), Field::new("time_of_first_write", ts.clone(), false),
Field::new("time_of_last_write", ts.clone(), true), Field::new("time_of_last_write", ts.clone(), false),
Field::new("time_closed", ts, true), Field::new("time_closed", ts, true),
])) ]))
} }
// TODO: Use a custom proc macro or serde to reduce the boilerplate // TODO: Use a custom proc macro or serde to reduce the boilerplate
fn time_to_ts(time: Option<DateTime<Utc>>) -> Option<i64> { fn optional_time_to_ts(time: Option<DateTime<Utc>>) -> Option<i64> {
time.map(|ts| ts.timestamp_nanos()) time.and_then(time_to_ts)
}
fn time_to_ts(ts: DateTime<Utc>) -> Option<i64> {
Some(ts.timestamp_nanos())
} }
fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<RecordBatch> { fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<RecordBatch> {
@ -92,7 +96,7 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<
let time_of_last_access = chunks let time_of_last_access = chunks
.iter() .iter()
.map(|c| c.time_of_last_access) .map(|c| c.time_of_last_access)
.map(time_to_ts) .map(optional_time_to_ts)
.collect::<TimestampNanosecondArray>(); .collect::<TimestampNanosecondArray>();
let time_of_first_write = chunks let time_of_first_write = chunks
.iter() .iter()
@ -107,7 +111,7 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<
let time_closed = chunks let time_closed = chunks
.iter() .iter()
.map(|c| c.time_closed) .map(|c| c.time_closed)
.map(time_to_ts) .map(optional_time_to_ts)
.collect::<TimestampNanosecondArray>(); .collect::<TimestampNanosecondArray>();
RecordBatch::try_new( RecordBatch::try_new(
@ -149,8 +153,8 @@ mod tests {
object_store_bytes: 0, object_store_bytes: 0,
row_count: 11, row_count: 11,
time_of_last_access: None, time_of_last_access: None,
time_of_first_write: Some(Utc.timestamp_nanos(10_000_000_000)), time_of_first_write: Utc.timestamp_nanos(10_000_000_000),
time_of_last_write: None, time_of_last_write: Utc.timestamp_nanos(10_000_000_000),
time_closed: None, time_closed: None,
}, },
ChunkSummary { ChunkSummary {
@ -163,8 +167,8 @@ mod tests {
object_store_bytes: 0, object_store_bytes: 0,
row_count: 22, row_count: 22,
time_of_last_access: Some(Utc.timestamp_nanos(754_000_000_000)), time_of_last_access: Some(Utc.timestamp_nanos(754_000_000_000)),
time_of_first_write: None, time_of_first_write: Utc.timestamp_nanos(80_000_000_000),
time_of_last_write: Some(Utc.timestamp_nanos(80_000_000_000)), time_of_last_write: Utc.timestamp_nanos(80_000_000_000),
time_closed: None, time_closed: None,
}, },
ChunkSummary { ChunkSummary {
@ -177,8 +181,8 @@ mod tests {
object_store_bytes: 5678, object_store_bytes: 5678,
row_count: 33, row_count: 33,
time_of_last_access: Some(Utc.timestamp_nanos(5_000_000_000)), time_of_last_access: Some(Utc.timestamp_nanos(5_000_000_000)),
time_of_first_write: Some(Utc.timestamp_nanos(100_000_000_000)), time_of_first_write: Utc.timestamp_nanos(100_000_000_000),
time_of_last_write: Some(Utc.timestamp_nanos(200_000_000_000)), time_of_last_write: Utc.timestamp_nanos(200_000_000_000),
time_closed: None, time_closed: None,
}, },
]; ];
@ -187,8 +191,8 @@ mod tests {
"+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------------+", "+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------------+",
"| id | partition_key | table_name | storage | lifecycle_action | memory_bytes | object_store_bytes | row_count | time_of_last_access | time_of_first_write | time_of_last_write | time_closed |", "| id | partition_key | table_name | storage | lifecycle_action | memory_bytes | object_store_bytes | row_count | time_of_last_access | time_of_first_write | time_of_last_write | time_closed |",
"+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------------+", "+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------------+",
"| 0 | p1 | table1 | OpenMutableBuffer | | 23754 | | 11 | | 1970-01-01T00:00:10Z | | |", "| 0 | p1 | table1 | OpenMutableBuffer | | 23754 | | 11 | | 1970-01-01T00:00:10Z | 1970-01-01T00:00:10Z | |",
"| 1 | p1 | table1 | OpenMutableBuffer | Persisting to Object Storage | 23455 | | 22 | 1970-01-01T00:12:34Z | | 1970-01-01T00:01:20Z | |", "| 1 | p1 | table1 | OpenMutableBuffer | Persisting to Object Storage | 23455 | | 22 | 1970-01-01T00:12:34Z | 1970-01-01T00:01:20Z | 1970-01-01T00:01:20Z | |",
"| 2 | p1 | table1 | ObjectStoreOnly | | 1234 | 5678 | 33 | 1970-01-01T00:00:05Z | 1970-01-01T00:01:40Z | 1970-01-01T00:03:20Z | |", "| 2 | p1 | table1 | ObjectStoreOnly | | 1234 | 5678 | 33 | 1970-01-01T00:00:05Z | 1970-01-01T00:01:40Z | 1970-01-01T00:03:20Z | |",
"+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------------+", "+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------------+",
]; ];

View File

@ -218,6 +218,7 @@ fn assemble_chunk_columns(
mod tests { mod tests {
use super::*; use super::*;
use arrow_util::assert_batches_eq; use arrow_util::assert_batches_eq;
use chrono::{TimeZone, Utc};
use data_types::{ use data_types::{
chunk_metadata::{ChunkColumnSummary, ChunkStorage, ChunkSummary}, chunk_metadata::{ChunkColumnSummary, ChunkStorage, ChunkSummary},
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics}, partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics},
@ -317,8 +318,8 @@ mod tests {
object_store_bytes: 0, object_store_bytes: 0,
row_count: 11, row_count: 11,
time_of_last_access: None, time_of_last_access: None,
time_of_first_write: None, time_of_first_write: Utc.timestamp_nanos(1),
time_of_last_write: None, time_of_last_write: Utc.timestamp_nanos(2),
time_closed: None, time_closed: None,
}, },
columns: vec![ columns: vec![
@ -353,8 +354,8 @@ mod tests {
object_store_bytes: 0, object_store_bytes: 0,
row_count: 11, row_count: 11,
time_of_last_access: None, time_of_last_access: None,
time_of_first_write: None, time_of_first_write: Utc.timestamp_nanos(1),
time_of_last_write: None, time_of_last_write: Utc.timestamp_nanos(2),
time_closed: None, time_closed: None,
}, },
columns: vec![ChunkColumnSummary { columns: vec![ChunkColumnSummary {
@ -383,8 +384,8 @@ mod tests {
object_store_bytes: 0, object_store_bytes: 0,
row_count: 11, row_count: 11,
time_of_last_access: None, time_of_last_access: None,
time_of_first_write: None, time_of_first_write: Utc.timestamp_nanos(1),
time_of_last_write: None, time_of_last_write: Utc.timestamp_nanos(2),
time_closed: None, time_closed: None,
}, },
columns: vec![ChunkColumnSummary { columns: vec![ChunkColumnSummary {