Merge branch 'main' into crepererum/in_mem_expr_part3

pull/24376/head
kodiakhq[bot] 2021-10-04 16:31:49 +00:00 committed by GitHub
commit 018c5bd1e3
17 changed files with 67 additions and 64 deletions

View File

@@ -2,7 +2,7 @@
 use crate::partition_metadata::PartitionAddr;
 use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
-use std::sync::Arc;
+use std::{num::NonZeroU32, sync::Arc};
 
 /// Address of the chunk within the catalog
 #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
@@ -233,17 +233,19 @@ impl std::fmt::Display for ChunkId {
 /// 1. **upsert order:** chunks with higher order overwrite data in chunks with lower order
 /// 2. **locking order:** chunks must be locked in consistent (ascending) order
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
-pub struct ChunkOrder(u32);
+pub struct ChunkOrder(NonZeroU32);
 
 impl ChunkOrder {
-    pub const MAX: Self = Self(u32::MAX);
+    // TODO: remove `unsafe` once https://github.com/rust-lang/rust/issues/51999 is fixed
+    pub const MIN: Self = Self(unsafe { NonZeroU32::new_unchecked(1) });
+    pub const MAX: Self = Self(unsafe { NonZeroU32::new_unchecked(u32::MAX) });
 
-    pub fn new(order: u32) -> Self {
-        Self(order)
+    pub fn new(order: u32) -> Option<Self> {
+        NonZeroU32::new(order).map(Self)
     }
 
     pub fn get(&self) -> u32 {
-        self.0
+        self.0.get()
     }
 
     /// Get next chunk order.
@@ -251,18 +253,15 @@ impl ChunkOrder {
     /// # Panic
     /// Panics if `self` is already [max](Self::MAX).
     pub fn next(&self) -> Self {
-        Self(self.0.checked_add(1).expect("chunk order overflow"))
+        Self(
+            NonZeroU32::new(self.0.get().checked_add(1).expect("chunk order overflow"))
+                .expect("did not overflow, so cannot be zero"),
+        )
     }
 }
 
 impl std::fmt::Display for ChunkOrder {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_tuple("ChunkOrder").field(&self.0).finish()
-    }
-}
-
-impl From<u32> for ChunkOrder {
-    fn from(order: u32) -> Self {
-        Self(order)
+        f.debug_tuple("ChunkOrder").field(&self.0.get()).finish()
     }
 }

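The core change of this diff: `ChunkOrder` now wraps `NonZeroU32` instead of `u32`. Beyond ruling out the proto3 default value `0`, the zero niche lets the compiler store `Option<ChunkOrder>` in the same four bytes as a bare `u32`; the `unsafe { NonZeroU32::new_unchecked(...) }` in `MIN`/`MAX` exists only because `Option::unwrap` could not yet be called in `const` context (rust-lang/rust#51999), and both values are trivially non-zero. A minimal standalone sketch of the idea (not the crate's actual code):

```rust
use std::num::NonZeroU32;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct ChunkOrder(NonZeroU32);

fn main() {
    // The zero niche lets the compiler pack `Option<ChunkOrder>`
    // into the same four bytes as a plain `u32`.
    assert_eq!(
        std::mem::size_of::<Option<ChunkOrder>>(),
        std::mem::size_of::<u32>()
    );

    // Zero is rejected at construction instead of silently accepted.
    assert!(NonZeroU32::new(0).is_none());
    assert_eq!(NonZeroU32::new(1).map(ChunkOrder).unwrap().0.get(), 1);
}
```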
View File

@@ -2,7 +2,9 @@ use crate::{
     google::{FieldViolation, FromFieldOpt},
     influxdata::iox::management::v1 as management,
 };
-use data_types::chunk_metadata::{ChunkId, ChunkLifecycleAction, ChunkStorage, ChunkSummary};
+use data_types::chunk_metadata::{
+    ChunkId, ChunkLifecycleAction, ChunkOrder, ChunkStorage, ChunkSummary,
+};
 use std::{
     convert::{TryFrom, TryInto},
     sync::Arc,
@@ -122,7 +124,10 @@ impl TryFrom<management::Chunk> for ChunkSummary {
             time_of_first_write: required_timestamp(time_of_first_write, "time_of_first_write")?,
             time_of_last_write: required_timestamp(time_of_last_write, "time_of_last_write")?,
             time_closed: timestamp(time_closed, "time_closed")?,
-            order: order.into(),
+            order: ChunkOrder::new(order).ok_or_else(|| FieldViolation {
+                field: "order".to_string(),
+                description: "Order must be non-zero".to_string(),
+            })?,
         })
     }
 }
@@ -204,7 +209,7 @@ mod test {
             time_of_last_write: now,
             time_closed: None,
             time_of_last_access: Some(Utc.timestamp_nanos(50_000_000_007)),
-            order: ChunkOrder::new(5),
+            order: ChunkOrder::new(5).unwrap(),
         };
 
         assert_eq!(
@@ -230,7 +235,7 @@ mod test {
             time_of_last_write: now,
             time_closed: None,
             time_of_last_access: Some(Utc.timestamp_nanos(12_000_100_007)),
-            order: ChunkOrder::new(5),
+            order: ChunkOrder::new(5).unwrap(),
         };
 
         let proto = management::Chunk::try_from(summary).expect("conversion successful");

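With `ChunkOrder::new` now returning `Option`, the gRPC conversion above turns a zero `order` into a structured `FieldViolation` instead of silently accepting it. A self-contained sketch of the same `ok_or_else` pattern, with `FieldViolation` reduced to a simplified stand-in for the type in `generated_types`:

```rust
use std::num::NonZeroU32;

// Simplified stand-in for the crate's `FieldViolation` error type.
#[derive(Debug)]
struct FieldViolation {
    field: String,
    description: String,
}

fn parse_order(raw: u32) -> Result<NonZeroU32, FieldViolation> {
    // Map the "absent" proto3 default (0) to a descriptive error
    // rather than accepting it as a real order.
    NonZeroU32::new(raw).ok_or_else(|| FieldViolation {
        field: "order".to_string(),
        description: "Order must be non-zero".to_string(),
    })
}

fn main() {
    assert!(parse_order(0).is_err());
    assert_eq!(parse_order(5).unwrap().get(), 5);
}
```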
View File

@@ -754,7 +754,7 @@ mod tests {
             time_of_last_write: from_secs(time_of_last_write),
             lifecycle_action: None,
             storage,
-            order: ChunkOrder::new(0),
+            order: ChunkOrder::MIN,
         }
     }
@@ -1517,23 +1517,23 @@ mod tests {
                 // blocked by action below
                 TestChunk::new(ChunkId::new(19), 20, ChunkStorage::ReadBuffer)
                     .with_row_count(400)
-                    .with_order(ChunkOrder::new(4)),
+                    .with_order(ChunkOrder::new(5).unwrap()),
                 // has an action
                 TestChunk::new(ChunkId::new(20), 20, ChunkStorage::ReadBuffer)
                     .with_row_count(400)
-                    .with_order(ChunkOrder::new(3))
+                    .with_order(ChunkOrder::new(4).unwrap())
                     .with_action(ChunkLifecycleAction::Compacting),
                 // closed => can compact
                 TestChunk::new(ChunkId::new(21), 20, ChunkStorage::ReadBuffer)
                     .with_row_count(400)
-                    .with_order(ChunkOrder::new(2)),
+                    .with_order(ChunkOrder::new(3).unwrap()),
                 TestChunk::new(ChunkId::new(22), 20, ChunkStorage::ReadBuffer)
                     .with_row_count(400)
-                    .with_order(ChunkOrder::new(1)),
+                    .with_order(ChunkOrder::new(2).unwrap()),
                 // has an action, but doesn't block because it's first
                 TestChunk::new(ChunkId::new(23), 20, ChunkStorage::ReadBuffer)
                     .with_row_count(400)
-                    .with_order(ChunkOrder::new(0))
+                    .with_order(ChunkOrder::new(1).unwrap())
                     .with_action(ChunkLifecycleAction::Compacting),
             ]),
         ];

View File

@@ -39,7 +39,7 @@ pub use crate::catalog::internals::proto_parse::Error as ProtoParseError;
 /// Current version for serialized transactions.
 ///
 /// For breaking changes, this will change.
-pub const TRANSACTION_VERSION: u32 = 15;
+pub const TRANSACTION_VERSION: u32 = 16;
 
 #[derive(Debug, Snafu)]
 pub enum Error {

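Bumping `TRANSACTION_VERSION` declares previously serialized transactions incompatible, since files written before this change may carry a `chunk_order` of zero that the new code rejects. As a rough illustration of what such a version gate looks like on replay (a hypothetical `check_version` helper, not the catalog's actual decoding code):

```rust
const TRANSACTION_VERSION: u32 = 16;

#[derive(Debug)]
enum DecodeError {
    // Transaction was written by an incompatible catalog version.
    TransactionVersionMismatch { actual: u32, expected: u32 },
}

// Hypothetical version gate: reject serialized transactions whose
// version does not match the current, breaking-change-bumped constant.
fn check_version(actual: u32) -> Result<(), DecodeError> {
    if actual != TRANSACTION_VERSION {
        return Err(DecodeError::TransactionVersionMismatch {
            actual,
            expected: TRANSACTION_VERSION,
        });
    }
    Ok(())
}

fn main() {
    assert!(check_version(15).is_err()); // pre-bump files are rejected
    assert!(check_version(16).is_ok());
}
```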
View File

@@ -272,7 +272,7 @@ File {
     is_checkpoint: false,
     proto: Ok(
         Transaction {
-            version: 15,
+            version: 16,
             actions: [],
             revision_counter: 0,
             uuid: "00000000-0000-0000-0000-000000000000",
@@ -297,7 +297,7 @@ File {
     is_checkpoint: false,
     proto: Ok(
         Transaction {
-            version: 15,
+            version: 16,
             actions: [
                 Action {
                     action: Some(
@@ -396,7 +396,7 @@ File {
     is_checkpoint: false,
     proto: Ok(
         Transaction {
-            version: 15,
+            version: 16,
             actions: [],
             revision_counter: 0,
             uuid: "00000000-0000-0000-0000-000000000000",
@@ -421,7 +421,7 @@ File {
     is_checkpoint: false,
     proto: Ok(
         Transaction {
-            version: 15,
+            version: 16,
             actions: [
                 Action {
                     action: Some(

View File

@@ -379,7 +379,7 @@ mod tests {
             database_checkpoint,
             time_of_first_write: Utc::now(),
             time_of_last_write: Utc::now(),
-            chunk_order: ChunkOrder::new(5),
+            chunk_order: ChunkOrder::new(5).unwrap(),
         };
 
         let stream: SendableRecordBatchStream = Box::pin(MemoryStream::new(record_batches));
         let (path, file_size_bytes, metadata) = storage

View File

@@ -121,7 +121,7 @@ use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol, TOutputPro
 ///
 /// **Important: When changing this structure, consider bumping the
 /// [catalog transaction version](crate::catalog::core::TRANSACTION_VERSION)!**
-pub const METADATA_VERSION: u32 = 7;
+pub const METADATA_VERSION: u32 = 8;
 
 /// File-level metadata key to store the IOx-specific data.
 ///
@@ -375,7 +375,11 @@ impl IoxMetadata {
             chunk_id: ChunkId::new(proto_msg.chunk_id),
             partition_checkpoint,
             database_checkpoint,
-            chunk_order: proto_msg.chunk_order.into(),
+            chunk_order: ChunkOrder::new(proto_msg.chunk_order).ok_or_else(|| {
+                Error::IoxMetadataFieldMissing {
+                    field: "chunk_order".to_string(),
+                }
+            })?,
         })
     }
@@ -1074,7 +1078,7 @@ mod tests {
             database_checkpoint,
             time_of_first_write: Utc::now(),
             time_of_last_write: Utc::now(),
-            chunk_order: ChunkOrder::new(5),
+            chunk_order: ChunkOrder::new(5).unwrap(),
         };
 
         let proto_bytes = metadata.to_protobuf().unwrap();

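The `IoxMetadata` decoder relies on proto3 presence semantics: a scalar `uint32` field has no "unset" marker, so a missing `chunk_order` arrives as `0`. Because valid orders now start at `ChunkOrder::MIN` (1), the decoder can safely map `0` to a missing-field error. A small sketch of that mapping, with the error enum reduced to a stand-in for the crate's `IoxMetadataFieldMissing` variant:

```rust
use std::num::NonZeroU32;

#[derive(Debug)]
enum Error {
    // Simplified stand-in for the crate's `IoxMetadataFieldMissing` variant.
    IoxMetadataFieldMissing { field: String },
}

// In proto3, an unset `uint32` is indistinguishable from an explicit 0,
// so 0 is mapped to "field missing" instead of a (now invalid) order.
fn decode_chunk_order(raw: u32) -> Result<NonZeroU32, Error> {
    NonZeroU32::new(raw).ok_or_else(|| Error::IoxMetadataFieldMissing {
        field: "chunk_order".to_string(),
    })
}

fn main() {
    assert!(decode_chunk_order(0).is_err());
    assert_eq!(decode_chunk_order(8).unwrap().get(), 8);
}
```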
View File

@@ -450,7 +450,7 @@ mod tests {
             database_checkpoint,
             time_of_first_write: Utc::now(),
             time_of_last_write: Utc::now(),
-            chunk_order: ChunkOrder::new(5),
+            chunk_order: ChunkOrder::new(5).unwrap(),
         };
 
         // create parquet file
@@ -525,7 +525,7 @@ mod tests {
             database_checkpoint,
             time_of_first_write: Utc::now(),
             time_of_last_write: Utc::now(),
-            chunk_order: ChunkOrder::new(5),
+            chunk_order: ChunkOrder::new(5).unwrap(),
         };
 
         let (path, _file_size_bytes, _metadata) = storage

View File

@@ -189,7 +189,7 @@ pub async fn make_chunk_given_record_batch(
         database_checkpoint,
         time_of_first_write: Utc.timestamp(30, 40),
         time_of_last_write: Utc.timestamp(50, 60),
-        chunk_order: ChunkOrder::new(5),
+        chunk_order: ChunkOrder::new(5).unwrap(),
     };
     let (path, file_size_bytes, parquet_metadata) = storage
         .write_to_object_store(addr.clone(), stream, metadata)

View File

@@ -254,7 +254,7 @@ impl TestChunk {
             saved_error: Default::default(),
             predicate_match: Default::default(),
             delete_predicates: Default::default(),
-            order: ChunkOrder::new(0),
+            order: ChunkOrder::MIN,
         }
     }

View File

@@ -911,14 +911,9 @@ impl Db {
     /// `Instant::now()` that is used by the background worker. Can be mocked for testing.
     fn background_worker_now(&self) -> Instant {
-        let mut guard = self.background_worker_now_override.lock();
-        match *guard {
-            Some(now) => {
-                *guard = Some(now + Duration::from_nanos(1));
-                now
-            }
-            None => Instant::now(),
-        }
+        self.background_worker_now_override
+            .lock()
+            .unwrap_or_else(Instant::now)
     }
 
     async fn cleanup_unreferenced_parquet_files(
@@ -1773,7 +1768,7 @@ mod tests {
             .id();
 
         // A chunk is now in the object store and still in read buffer
-        let expected_parquet_size = 1243;
+        let expected_parquet_size = 1245;
         catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", expected_read_buffer_size);
         // now also in OS
         catalog_chunk_size_bytes_metric_eq(registry, "object_store", expected_parquet_size);
@@ -2212,7 +2207,7 @@ mod tests {
         // Read buffer + Parquet chunk size
         catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0);
         catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", 1700);
-        catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1241);
+        catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1243);
 
         // All the chunks should have different IDs
         assert_ne!(mb_chunk.id(), rb_chunk.id());
@@ -2327,7 +2322,7 @@ mod tests {
         // Read buffer + Parquet chunk size
         catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0);
         catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", 1700);
-        catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1241);
+        catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1243);
 
         // Unload RB chunk but keep it in OS
         let pq_chunk = db
@@ -2348,7 +2343,7 @@ mod tests {
         // Parquet chunk size only
         catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0);
         catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", 0);
-        catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1241);
+        catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1243);
 
         // Verify data written to the parquet file in object store
         //
@@ -2594,7 +2589,7 @@ mod tests {
             time_of_first_write: Utc.timestamp_nanos(1),
             time_of_last_write: Utc.timestamp_nanos(1),
             time_closed: None,
-            order: ChunkOrder::new(5),
+            order: ChunkOrder::new(5).unwrap(),
         }];
 
         let size: usize = db
@@ -2829,7 +2824,7 @@ mod tests {
             storage: ChunkStorage::ReadBufferAndObjectStore,
             lifecycle_action,
             memory_bytes: 4085, // size of RB and OS chunks
-            object_store_bytes: 1533, // size of parquet file
+            object_store_bytes: 1537, // size of parquet file
             row_count: 2,
             time_of_last_access: None,
             time_of_first_write: Utc.timestamp_nanos(1),

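Two things change in this file. The `background_worker_now` refactor drops the old auto-advancing mock (which nudged the override forward by one nanosecond per call) and now simply returns the override when set; moving the `Option<Instant>` out of the lock guard works because `Instant` is `Copy`. The adjusted byte counts (1241 → 1243 and so on) presumably reflect the slightly larger parquet metadata produced by the bumped `METADATA_VERSION`. A minimal sketch of the mockable-clock pattern, using `std::sync::Mutex` where the crate uses a `parking_lot`-style lock:

```rust
use std::sync::Mutex;
use std::time::Instant;

/// Minimal stand-in for the `Db` field: a mockable "now" source.
struct Clock {
    now_override: Mutex<Option<Instant>>,
}

impl Clock {
    /// Return the mocked instant if one is set, otherwise the real time.
    /// `Option<Instant>` is `Copy`, so reading it out of the guard and
    /// calling `unwrap_or_else` by value only copies it.
    fn now(&self) -> Instant {
        self.now_override
            .lock()
            .expect("lock poisoned")
            .unwrap_or_else(Instant::now)
    }
}

fn main() {
    let clock = Clock {
        now_override: Mutex::new(None),
    };
    let _ = clock.now(); // no override set: falls through to `Instant::now()`

    let fixed = Instant::now();
    *clock.now_override.lock().unwrap() = Some(fixed);
    assert_eq!(clock.now(), fixed); // the mocked value is returned verbatim
}
```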
View File

@@ -1162,7 +1162,7 @@ mod tests {
             mb_chunk,
             time_of_write,
             ChunkMetrics::new_unregistered(),
-            ChunkOrder::new(5),
+            ChunkOrder::new(5).unwrap(),
         )
     }
@@ -1180,7 +1180,7 @@ mod tests {
             now,
             ChunkMetrics::new_unregistered(),
             vec![],
-            ChunkOrder::new(6),
+            ChunkOrder::new(6).unwrap(),
         )
     }
 }

View File

@@ -154,7 +154,7 @@ impl Partition {
             next_chunk_id: ChunkId::new(0),
             metrics: Arc::new(metrics),
             persistence_windows: None,
-            next_chunk_order: ChunkOrder::new(0),
+            next_chunk_order: ChunkOrder::MIN,
         }
     }

View File

@@ -165,7 +165,7 @@ mod tests {
                 time_of_first_write: Utc.timestamp_nanos(10_000_000_000),
                 time_of_last_write: Utc.timestamp_nanos(10_000_000_000),
                 time_closed: None,
-                order: ChunkOrder::new(5),
+                order: ChunkOrder::new(5).unwrap(),
             },
             ChunkSummary {
                 partition_key: Arc::from("p1"),
@@ -180,7 +180,7 @@
                 time_of_first_write: Utc.timestamp_nanos(80_000_000_000),
                 time_of_last_write: Utc.timestamp_nanos(80_000_000_000),
                 time_closed: None,
-                order: ChunkOrder::new(6),
+                order: ChunkOrder::new(6).unwrap(),
             },
             ChunkSummary {
                 partition_key: Arc::from("p1"),
@@ -195,7 +195,7 @@
                 time_of_first_write: Utc.timestamp_nanos(100_000_000_000),
                 time_of_last_write: Utc.timestamp_nanos(200_000_000_000),
                 time_closed: None,
-                order: ChunkOrder::new(7),
+                order: ChunkOrder::new(7).unwrap(),
             },
         ];

View File

@@ -321,7 +321,7 @@ mod tests {
                 time_of_first_write: Utc.timestamp_nanos(1),
                 time_of_last_write: Utc.timestamp_nanos(2),
                 time_closed: None,
-                order: ChunkOrder::new(5),
+                order: ChunkOrder::new(5).unwrap(),
             },
             columns: vec![
                 ChunkColumnSummary {
@@ -358,7 +358,7 @@ mod tests {
                 time_of_first_write: Utc.timestamp_nanos(1),
                 time_of_last_write: Utc.timestamp_nanos(2),
                 time_closed: None,
-                order: ChunkOrder::new(6),
+                order: ChunkOrder::new(6).unwrap(),
             },
             columns: vec![ChunkColumnSummary {
                 name: "c1".into(),
@@ -389,7 +389,7 @@ mod tests {
                 time_of_first_write: Utc.timestamp_nanos(1),
                 time_of_last_write: Utc.timestamp_nanos(2),
                 time_closed: None,
-                order: ChunkOrder::new(5),
+                order: ChunkOrder::new(5).unwrap(),
             },
             columns: vec![ChunkColumnSummary {
                 name: "c3".into(),

View File

@@ -535,7 +535,7 @@ async fn test_chunk_get() {
             time_of_first_write: None,
             time_of_last_write: None,
             time_closed: None,
-            order: 0,
+            order: 1,
         },
         Chunk {
             partition_key: "disk".into(),
@@ -550,7 +550,7 @@ async fn test_chunk_get() {
             time_of_first_write: None,
             time_of_last_write: None,
             time_closed: None,
-            order: 0,
+            order: 1,
         },
     ];
 
     assert_eq!(
@@ -722,7 +722,7 @@ async fn test_list_partition_chunks() {
         time_of_first_write: None,
         time_of_last_write: None,
        time_closed: None,
-        order: 0,
+        order: 1,
    }];

    assert_eq!(

View File

@@ -709,7 +709,7 @@ async fn test_list_partition_chunks() {
     let expected = r#"
     "partition_key": "cpu",
     "table_name": "cpu",
-    "order": 0,
+    "order": 1,
     "id": 0,
     "storage": "OpenMutableBuffer",
 "#;