Merge branch 'main' into ntran/refactor_use_sort_key

pull/24376/head
kodiakhq[bot] 2021-07-15 21:17:26 +00:00 committed by GitHub
commit 50aa1f857d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 905 additions and 272 deletions

8
Cargo.lock generated
View File

@ -841,7 +841,7 @@ dependencies = [
[[package]] [[package]]
name = "datafusion" name = "datafusion"
version = "4.0.0-SNAPSHOT" version = "4.0.0-SNAPSHOT"
source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/perf_integration_df#9ef4a257cf8b7717df60de56b9e17c6bd7286cd4" source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/perf_integration_df_2#d201ebf323a532ac858fe33083639df4a8d321ee"
dependencies = [ dependencies = [
"ahash 0.7.4", "ahash 0.7.4",
"arrow", "arrow",
@ -1677,6 +1677,7 @@ dependencies = [
"tracker", "tracker",
"trogging", "trogging",
"uuid", "uuid",
"write_buffer",
] ]
[[package]] [[package]]
@ -4955,9 +4956,14 @@ version = "0.1.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"data_types", "data_types",
"dotenv",
"entry", "entry",
"futures", "futures",
"observability_deps",
"parking_lot",
"rdkafka", "rdkafka",
"tokio",
"uuid",
] ]
[[package]] [[package]]

View File

@ -122,6 +122,7 @@ influxdb_iox_client = { path = "influxdb_iox_client", features = ["flight"] }
test_helpers = { path = "test_helpers" } test_helpers = { path = "test_helpers" }
synchronized-writer = "1" synchronized-writer = "1"
parking_lot = "0.11.1" parking_lot = "0.11.1"
write_buffer = { path = "write_buffer" }
# Crates.io dependencies, in alphabetical order # Crates.io dependencies, in alphabetical order
assert_cmd = "1.0.0" assert_cmd = "1.0.0"

View File

@ -126,8 +126,11 @@ pub struct ChunkSummary {
/// Is there any outstanding lifecycle action for this chunk? /// Is there any outstanding lifecycle action for this chunk?
pub lifecycle_action: Option<ChunkLifecycleAction>, pub lifecycle_action: Option<ChunkLifecycleAction>,
/// The total estimated size of this chunk, in bytes /// The number of bytes used to store this chunk in memory
pub estimated_bytes: usize, pub memory_bytes: usize,
/// The number of bytes used to store this chunk in object storage
pub object_store_bytes: usize,
/// The total number of rows in this chunk /// The total number of rows in this chunk
pub row_count: usize, pub row_count: usize,
@ -153,7 +156,7 @@ pub struct ChunkColumnSummary {
pub name: Arc<str>, pub name: Arc<str>,
/// Estimated size, in bytes, consumed by this column. /// Estimated size, in bytes, consumed by this column.
pub estimated_bytes: usize, pub memory_bytes: usize,
} }
/// Contains additional per-column details about physical storage of a chunk /// Contains additional per-column details about physical storage of a chunk
@ -168,13 +171,15 @@ pub struct DetailedChunkSummary {
impl ChunkSummary { impl ChunkSummary {
/// Construct a ChunkSummary that has None for all timestamps /// Construct a ChunkSummary that has None for all timestamps
#[allow(clippy::too_many_arguments)]
pub fn new_without_timestamps( pub fn new_without_timestamps(
partition_key: Arc<str>, partition_key: Arc<str>,
table_name: Arc<str>, table_name: Arc<str>,
id: u32, id: u32,
storage: ChunkStorage, storage: ChunkStorage,
lifecycle_action: Option<ChunkLifecycleAction>, lifecycle_action: Option<ChunkLifecycleAction>,
estimated_bytes: usize, memory_bytes: usize,
object_store_bytes: usize,
row_count: usize, row_count: usize,
) -> Self { ) -> Self {
Self { Self {
@ -183,11 +188,47 @@ impl ChunkSummary {
id, id,
storage, storage,
lifecycle_action, lifecycle_action,
estimated_bytes, memory_bytes,
object_store_bytes,
row_count, row_count,
time_of_first_write: None, time_of_first_write: None,
time_of_last_write: None, time_of_last_write: None,
time_closed: None, time_closed: None,
} }
} }
/// Return a new ChunkSummary with None for all timestamps
pub fn normalize(self) -> Self {
let ChunkSummary {
partition_key,
table_name,
id,
storage,
lifecycle_action,
memory_bytes,
object_store_bytes,
row_count,
..
} = self;
Self::new_without_timestamps(
partition_key,
table_name,
id,
storage,
lifecycle_action,
memory_bytes,
object_store_bytes,
row_count,
)
}
/// Normalizes a set of ChunkSummaries for comparison by removing timestamps
pub fn normalize_summaries(summaries: Vec<Self>) -> Vec<Self> {
let mut summaries = summaries
.into_iter()
.map(|summary| summary.normalize())
.collect::<Vec<_>>();
summaries.sort_unstable();
summaries
}
} }

View File

@ -9,4 +9,4 @@ description = "Re-exports datafusion at a specific version"
# Rename to workaround doctest bug # Rename to workaround doctest bug
# Turn off optional datafusion features (function packages) # Turn off optional datafusion features (function packages)
upstream = { git = "https://github.com/alamb/arrow-datafusion.git", branch = "alamb/perf_integration_df", default-features = false, package = "datafusion" } upstream = { git = "https://github.com/alamb/arrow-datafusion.git", branch = "alamb/perf_integration_df_2", default-features = false, package = "datafusion" }

View File

@ -21,4 +21,4 @@ ENV TEST_INTEGRATION=1
ENV KAFKA_CONNECT=kafka:9092 ENV KAFKA_CONNECT=kafka:9092
# Run the integration tests that connect to Kafka that will be running in another container # Run the integration tests that connect to Kafka that will be running in another container
CMD ["sh", "-c", "cargo test -p influxdb_iox --test end_to_end write_buffer -- --nocapture"] CMD ["sh", "-c", "./docker/integration_test.sh"]

6
docker/integration_test.sh Executable file
View File

@ -0,0 +1,6 @@
#!/bin/bash
set -euxo pipefail
cargo test -p write_buffer kafka -- --nocapture
cargo test -p influxdb_iox --test end_to_end write_buffer -- --nocapture

View File

@ -86,7 +86,7 @@ OBSERVER;
Example SQL to show the total estimated storage size by database: Example SQL to show the total estimated storage size by database:
SELECT database_name, storage, count(*) as num_chunks, SELECT database_name, storage, count(*) as num_chunks,
sum(estimated_bytes)/1024/1024 as estimated_mb sum(memory_bytes)/1024/1024 as estimated_mb
FROM chunks FROM chunks
GROUP BY database_name, storage GROUP BY database_name, storage
ORDER BY estimated_mb desc; ORDER BY estimated_mb desc;
@ -164,7 +164,7 @@ Here are some interesting reports you can run when in `OBSERVER` mode:
```sql ```sql
SELECT SELECT
database_name, count(*) as num_chunks, database_name, count(*) as num_chunks,
sum(estimated_bytes)/1024/1024 as estimated_mb sum(memory_bytes)/1024/1024 as estimated_mb
FROM chunks FROM chunks
GROUP BY database_name GROUP BY database_name
ORDER BY estimated_mb desc ORDER BY estimated_mb desc
@ -175,7 +175,7 @@ LIMIT 20;
```sql ```sql
SELECT SELECT
database_name, storage, count(*) as num_chunks, database_name, storage, count(*) as num_chunks,
sum(estimated_bytes)/1024/1024 as estimated_mb sum(memory_bytes)/1024/1024 as estimated_mb
FROM chunks FROM chunks
GROUP BY database_name, storage GROUP BY database_name, storage
ORDER BY estimated_mb desc ORDER BY estimated_mb desc
@ -187,7 +187,7 @@ LIMIT 20;
```sql ```sql
SELECT SELECT
database_name, table_name, storage, count(*) as num_chunks, database_name, table_name, storage, count(*) as num_chunks,
sum(estimated_bytes)/1024/1024 as estimated_mb sum(memory_bytes)/1024/1024 as estimated_mb
FROM chunks FROM chunks
GROUP BY database_name, table_name, storage GROUP BY database_name, table_name, storage
ORDER BY estimated_mb desc ORDER BY estimated_mb desc

View File

@ -1716,7 +1716,7 @@ pub enum SequencedEntryError {
}, },
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct SequencedEntry { pub struct SequencedEntry {
entry: Entry, entry: Entry,
/// The (optional) sequence for this entry. At the time of /// The (optional) sequence for this entry. At the time of
@ -1775,6 +1775,10 @@ impl SequencedEntry {
pub fn sequence(&self) -> Option<&Sequence> { pub fn sequence(&self) -> Option<&Sequence> {
self.sequence.as_ref() self.sequence.as_ref()
} }
pub fn entry(&self) -> &Entry {
&self.entry
}
} }
pub mod test_helpers { pub mod test_helpers {

View File

@ -62,8 +62,11 @@ message Chunk {
// Is there any outstanding lifecycle action for this chunk? // Is there any outstanding lifecycle action for this chunk?
ChunkLifecycleAction lifecycle_action = 10; ChunkLifecycleAction lifecycle_action = 10;
// The total estimated size of this chunk, in bytes // The number of bytes used to store this chunk in memory
uint64 estimated_bytes = 4; uint64 memory_bytes = 4;
// The number of bytes used to store this chunk in object storage
uint64 object_store_bytes = 11;
// The number of rows in this chunk // The number of rows in this chunk
uint64 row_count = 9; uint64 row_count = 9;

View File

@ -13,7 +13,8 @@ impl From<ChunkSummary> for management::Chunk {
id, id,
storage, storage,
lifecycle_action, lifecycle_action,
estimated_bytes, memory_bytes,
object_store_bytes,
row_count, row_count,
time_of_first_write, time_of_first_write,
time_of_last_write, time_of_last_write,
@ -25,7 +26,8 @@ impl From<ChunkSummary> for management::Chunk {
let lifecycle_action: management::ChunkLifecycleAction = lifecycle_action.into(); let lifecycle_action: management::ChunkLifecycleAction = lifecycle_action.into();
let lifecycle_action = lifecycle_action.into(); // convert to i32 let lifecycle_action = lifecycle_action.into(); // convert to i32
let estimated_bytes = estimated_bytes as u64; let memory_bytes = memory_bytes as u64;
let object_store_bytes = object_store_bytes as u64;
let row_count = row_count as u64; let row_count = row_count as u64;
let partition_key = partition_key.to_string(); let partition_key = partition_key.to_string();
@ -41,7 +43,8 @@ impl From<ChunkSummary> for management::Chunk {
id, id,
storage, storage,
lifecycle_action, lifecycle_action,
estimated_bytes, memory_bytes,
object_store_bytes,
row_count, row_count,
time_of_first_write, time_of_first_write,
time_of_last_write, time_of_last_write,
@ -114,12 +117,14 @@ impl TryFrom<management::Chunk> for ChunkSummary {
partition_key, partition_key,
table_name, table_name,
id, id,
estimated_bytes, memory_bytes,
object_store_bytes,
row_count, row_count,
.. ..
} = proto; } = proto;
let estimated_bytes = estimated_bytes as usize; let memory_bytes = memory_bytes as usize;
let object_store_bytes = object_store_bytes as usize;
let row_count = row_count as usize; let row_count = row_count as usize;
let partition_key = Arc::from(partition_key.as_str()); let partition_key = Arc::from(partition_key.as_str());
let table_name = Arc::from(table_name.as_str()); let table_name = Arc::from(table_name.as_str());
@ -130,7 +135,8 @@ impl TryFrom<management::Chunk> for ChunkSummary {
id, id,
storage, storage,
lifecycle_action, lifecycle_action,
estimated_bytes, memory_bytes,
object_store_bytes,
row_count, row_count,
time_of_first_write, time_of_first_write,
time_of_last_write, time_of_last_write,
@ -184,7 +190,8 @@ mod test {
partition_key: "foo".to_string(), partition_key: "foo".to_string(),
table_name: "bar".to_string(), table_name: "bar".to_string(),
id: 42, id: 42,
estimated_bytes: 1234, memory_bytes: 1234,
object_store_bytes: 567,
row_count: 321, row_count: 321,
storage: management::ChunkStorage::ObjectStoreOnly.into(), storage: management::ChunkStorage::ObjectStoreOnly.into(),
lifecycle_action: management::ChunkLifecycleAction::Moving.into(), lifecycle_action: management::ChunkLifecycleAction::Moving.into(),
@ -198,7 +205,8 @@ mod test {
partition_key: Arc::from("foo"), partition_key: Arc::from("foo"),
table_name: Arc::from("bar"), table_name: Arc::from("bar"),
id: 42, id: 42,
estimated_bytes: 1234, memory_bytes: 1234,
object_store_bytes: 567,
row_count: 321, row_count: 321,
storage: ChunkStorage::ObjectStoreOnly, storage: ChunkStorage::ObjectStoreOnly,
lifecycle_action: Some(ChunkLifecycleAction::Moving), lifecycle_action: Some(ChunkLifecycleAction::Moving),
@ -220,7 +228,8 @@ mod test {
partition_key: Arc::from("foo"), partition_key: Arc::from("foo"),
table_name: Arc::from("bar"), table_name: Arc::from("bar"),
id: 42, id: 42,
estimated_bytes: 1234, memory_bytes: 1234,
object_store_bytes: 567,
row_count: 321, row_count: 321,
storage: ChunkStorage::ObjectStoreOnly, storage: ChunkStorage::ObjectStoreOnly,
lifecycle_action: Some(ChunkLifecycleAction::Persisting), lifecycle_action: Some(ChunkLifecycleAction::Persisting),
@ -235,7 +244,8 @@ mod test {
partition_key: "foo".to_string(), partition_key: "foo".to_string(),
table_name: "bar".to_string(), table_name: "bar".to_string(),
id: 42, id: 42,
estimated_bytes: 1234, memory_bytes: 1234,
object_store_bytes: 567,
row_count: 321, row_count: 321,
storage: management::ChunkStorage::ObjectStoreOnly.into(), storage: management::ChunkStorage::ObjectStoreOnly.into(),
lifecycle_action: management::ChunkLifecycleAction::Persisting.into(), lifecycle_action: management::ChunkLifecycleAction::Persisting.into(),

View File

@ -265,16 +265,16 @@ async fn sql_select_from_system_chunks() {
// test timestamps, etc) // test timestamps, etc)
let expected = vec![ let expected = vec![
"+----+---------------+------------+-------------------+-----------------+-----------+", "+----+---------------+------------+-------------------+--------------+-----------+",
"| id | partition_key | table_name | storage | estimated_bytes | row_count |", "| id | partition_key | table_name | storage | memory_bytes | row_count |",
"+----+---------------+------------+-------------------+-----------------+-----------+", "+----+---------------+------------+-------------------+--------------+-----------+",
"| 0 | 1970-01-01T00 | h2o | OpenMutableBuffer | 213 | 3 |", "| 0 | 1970-01-01T00 | h2o | OpenMutableBuffer | 213 | 3 |",
"| 0 | 1970-01-01T00 | o2 | OpenMutableBuffer | 177 | 2 |", "| 0 | 1970-01-01T00 | o2 | OpenMutableBuffer | 177 | 2 |",
"+----+---------------+------------+-------------------+-----------------+-----------+", "+----+---------------+------------+-------------------+--------------+-----------+",
]; ];
run_sql_test_case!( run_sql_test_case!(
TwoMeasurementsManyFieldsOneChunk {}, TwoMeasurementsManyFieldsOneChunk {},
"SELECT id, partition_key, table_name, storage, estimated_bytes, row_count from system.chunks", "SELECT id, partition_key, table_name, storage, memory_bytes, row_count from system.chunks",
&expected &expected
); );
} }
@ -316,9 +316,9 @@ async fn sql_select_from_system_chunk_columns() {
// with different chunk configurations. // with different chunk configurations.
let expected = vec![ let expected = vec![
"+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+-----------------+", "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+",
"| partition_key | chunk_id | table_name | column_name | storage | row_count | min_value | max_value | estimated_bytes |", "| partition_key | chunk_id | table_name | column_name | storage | row_count | min_value | max_value | memory_bytes |",
"+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+-----------------+", "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+",
"| 1970-01-01T00 | 0 | h2o | city | ReadBuffer | 2 | Boston | Boston | 252 |", "| 1970-01-01T00 | 0 | h2o | city | ReadBuffer | 2 | Boston | Boston | 252 |",
"| 1970-01-01T00 | 0 | h2o | other_temp | ReadBuffer | 1 | 70.4 | 70.4 | 425 |", "| 1970-01-01T00 | 0 | h2o | other_temp | ReadBuffer | 1 | 70.4 | 70.4 | 425 |",
"| 1970-01-01T00 | 0 | h2o | state | ReadBuffer | 2 | MA | MA | 240 |", "| 1970-01-01T00 | 0 | h2o | state | ReadBuffer | 2 | MA | MA | 240 |",
@ -333,7 +333,7 @@ async fn sql_select_from_system_chunk_columns() {
"| 1970-01-01T00 | 1 | h2o | other_temp | OpenMutableBuffer | 1 | 72.4 | 72.4 | 17 |", "| 1970-01-01T00 | 1 | h2o | other_temp | OpenMutableBuffer | 1 | 72.4 | 72.4 | 17 |",
"| 1970-01-01T00 | 1 | h2o | state | OpenMutableBuffer | 1 | CA | CA | 27 |", "| 1970-01-01T00 | 1 | h2o | state | OpenMutableBuffer | 1 | CA | CA | 27 |",
"| 1970-01-01T00 | 1 | h2o | time | OpenMutableBuffer | 1 | 350 | 350 | 17 |", "| 1970-01-01T00 | 1 | h2o | time | OpenMutableBuffer | 1 | 350 | 350 | 17 |",
"+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+-----------------+", "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+",
]; ];
run_sql_test_case!( run_sql_test_case!(
TwoMeasurementsManyFieldsTwoChunks {}, TwoMeasurementsManyFieldsTwoChunks {},

View File

@ -161,16 +161,16 @@ impl Table {
.iter() .iter()
.flat_map(|rg| rg.column_sizes()) .flat_map(|rg| rg.column_sizes())
// combine statistics for columns across row groups // combine statistics for columns across row groups
.fold(BTreeMap::new(), |mut map, (name, estimated_bytes)| { .fold(BTreeMap::new(), |mut map, (name, memory_bytes)| {
let entry = map.entry(name).or_insert(0); let entry = map.entry(name).or_insert(0);
*entry += estimated_bytes; *entry += memory_bytes;
map map
}) })
// Now turn into Vec<ChunkColumnSummary> // Now turn into Vec<ChunkColumnSummary>
.into_iter() .into_iter()
.map(|(name, estimated_bytes)| ChunkColumnSummary { .map(|(name, memory_bytes)| ChunkColumnSummary {
name: name.into(), name: name.into(),
estimated_bytes, memory_bytes,
}) })
.collect() .collect()
} }
@ -1116,11 +1116,11 @@ mod test {
let expected = vec![ let expected = vec![
ChunkColumnSummary { ChunkColumnSummary {
name: "count".into(), name: "count".into(),
estimated_bytes: 110, memory_bytes: 110,
}, },
ChunkColumnSummary { ChunkColumnSummary {
name: "time".into(), name: "time".into(),
estimated_bytes: 107, memory_bytes: 107,
}, },
]; ];
assert_eq!(table.column_sizes(), expected); assert_eq!(table.column_sizes(), expected);

View File

@ -651,14 +651,17 @@ impl Db {
let rules = self.rules.read(); let rules = self.rules.read();
rules.lifecycle_rules.immutable rules.lifecycle_rules.immutable
}; };
debug!(%immutable, has_write_buffer=self.write_buffer.is_some(), "storing entry");
match (self.write_buffer.as_ref(), immutable) { match (self.write_buffer.as_ref(), immutable) {
(Some(WriteBufferConfig::Writing(write_buffer)), true) => { (Some(WriteBufferConfig::Writing(write_buffer)), true) => {
// If only the write buffer is configured, this is passing the data through to // If only the write buffer is configured, this is passing the data through to
// the write buffer, and it's not an error. We ignore the returned metadata; it // the write buffer, and it's not an error. We ignore the returned metadata; it
// will get picked up when data is read from the write buffer. // will get picked up when data is read from the write buffer.
// TODO: be smarter than always using sequencer 0
let _ = write_buffer let _ = write_buffer
.store_entry(&entry) .store_entry(&entry, 0)
.await .await
.context(WriteBufferWritingError)?; .context(WriteBufferWritingError)?;
Ok(()) Ok(())
@ -666,8 +669,10 @@ impl Db {
(Some(WriteBufferConfig::Writing(write_buffer)), false) => { (Some(WriteBufferConfig::Writing(write_buffer)), false) => {
// If using both write buffer and mutable buffer, we want to wait for the write // If using both write buffer and mutable buffer, we want to wait for the write
// buffer to return success before adding the entry to the mutable buffer. // buffer to return success before adding the entry to the mutable buffer.
// TODO: be smarter than always using sequencer 0
let sequence = write_buffer let sequence = write_buffer
.store_entry(&entry) .store_entry(&entry, 0)
.await .await
.context(WriteBufferWritingError)?; .context(WriteBufferWritingError)?;
let sequenced_entry = Arc::new( let sequenced_entry = Arc::new(
@ -1020,6 +1025,7 @@ mod tests {
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use write_buffer::mock::{ use write_buffer::mock::{
MockBufferForReading, MockBufferForWriting, MockBufferForWritingThatAlwaysErrors, MockBufferForReading, MockBufferForWriting, MockBufferForWritingThatAlwaysErrors,
MockBufferSharedState,
}; };
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>; type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
@ -1042,7 +1048,8 @@ mod tests {
async fn write_with_write_buffer_no_mutable_buffer() { async fn write_with_write_buffer_no_mutable_buffer() {
// Writes should be forwarded to the write buffer and *not* rejected if the write buffer is // Writes should be forwarded to the write buffer and *not* rejected if the write buffer is
// configured and the mutable buffer isn't // configured and the mutable buffer isn't
let write_buffer = Arc::new(MockBufferForWriting::default()); let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1);
let write_buffer = Arc::new(MockBufferForWriting::new(write_buffer_state.clone()));
let test_db = TestDb::builder() let test_db = TestDb::builder()
.write_buffer(WriteBufferConfig::Writing(Arc::clone(&write_buffer) as _)) .write_buffer(WriteBufferConfig::Writing(Arc::clone(&write_buffer) as _))
.build() .build()
@ -1054,14 +1061,15 @@ mod tests {
let entry = lp_to_entry("cpu bar=1 10"); let entry = lp_to_entry("cpu bar=1 10");
test_db.store_entry(entry).await.unwrap(); test_db.store_entry(entry).await.unwrap();
assert_eq!(write_buffer.entries.lock().unwrap().len(), 1); assert_eq!(write_buffer_state.get_messages(0).len(), 1);
} }
#[tokio::test] #[tokio::test]
async fn write_to_write_buffer_and_mutable_buffer() { async fn write_to_write_buffer_and_mutable_buffer() {
// Writes should be forwarded to the write buffer *and* the mutable buffer if both are // Writes should be forwarded to the write buffer *and* the mutable buffer if both are
// configured. // configured.
let write_buffer = Arc::new(MockBufferForWriting::default()); let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1);
let write_buffer = Arc::new(MockBufferForWriting::new(write_buffer_state.clone()));
let db = TestDb::builder() let db = TestDb::builder()
.write_buffer(WriteBufferConfig::Writing(Arc::clone(&write_buffer) as _)) .write_buffer(WriteBufferConfig::Writing(Arc::clone(&write_buffer) as _))
.build() .build()
@ -1071,7 +1079,7 @@ mod tests {
let entry = lp_to_entry("cpu bar=1 10"); let entry = lp_to_entry("cpu bar=1 10");
db.store_entry(entry).await.unwrap(); db.store_entry(entry).await.unwrap();
assert_eq!(write_buffer.entries.lock().unwrap().len(), 1); assert_eq!(write_buffer_state.get_messages(0).len(), 1);
let batches = run_query(db, "select * from cpu").await; let batches = run_query(db, "select * from cpu").await;
@ -1109,9 +1117,10 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn read_from_write_buffer_write_to_mutable_buffer() { async fn read_from_write_buffer_write_to_mutable_buffer() {
let entry = lp_to_entry("cpu bar=1 10"); let entry = lp_to_entry("cpu bar=1 10");
let write_buffer = Arc::new(MockBufferForReading::new(vec![Ok( let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1);
SequencedEntry::new_unsequenced(entry), write_buffer_state
)])); .push_entry(SequencedEntry::new_from_sequence(Sequence::new(0, 0), entry).unwrap());
let write_buffer = Arc::new(MockBufferForReading::new(write_buffer_state));
let db = TestDb::builder() let db = TestDb::builder()
.write_buffer(WriteBufferConfig::Reading(Arc::clone(&write_buffer) as _)) .write_buffer(WriteBufferConfig::Reading(Arc::clone(&write_buffer) as _))
@ -1165,10 +1174,12 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn error_converting_data_from_write_buffer_to_sequenced_entry_is_reported() { async fn error_converting_data_from_write_buffer_to_sequenced_entry_is_reported() {
let write_buffer = Arc::new(MockBufferForReading::new(vec![Err(String::from( let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1);
"Something bad happened on the way to creating a SequencedEntry", write_buffer_state.push_error(
) String::from("Something bad happened on the way to creating a SequencedEntry").into(),
.into())])); 0,
);
let write_buffer = Arc::new(MockBufferForReading::new(write_buffer_state));
let test_db = TestDb::builder() let test_db = TestDb::builder()
.write_buffer(WriteBufferConfig::Reading(Arc::clone(&write_buffer) as _)) .write_buffer(WriteBufferConfig::Reading(Arc::clone(&write_buffer) as _))
@ -2070,7 +2081,8 @@ mod tests {
async fn write_updates_persistence_windows() { async fn write_updates_persistence_windows() {
// Writes should update the persistence windows when there // Writes should update the persistence windows when there
// is a write buffer configured. // is a write buffer configured.
let write_buffer = Arc::new(MockBufferForWriting::default()); let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1);
let write_buffer = Arc::new(MockBufferForWriting::new(write_buffer_state));
let db = TestDb::builder() let db = TestDb::builder()
.write_buffer(WriteBufferConfig::Writing(Arc::clone(&write_buffer) as _)) .write_buffer(WriteBufferConfig::Writing(Arc::clone(&write_buffer) as _))
.build() .build()
@ -2117,12 +2129,19 @@ mod tests {
let entry = lp_to_entry("cpu bar=1 10"); let entry = lp_to_entry("cpu bar=1 10");
let partition_key = "1970-01-01T00"; let partition_key = "1970-01-01T00";
let write_buffer = Arc::new(MockBufferForReading::new(vec![ let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(2);
Ok(SequencedEntry::new_from_sequence(Sequence::new(0, 0), entry.clone()).unwrap()), write_buffer_state.push_entry(
Ok(SequencedEntry::new_from_sequence(Sequence::new(1, 0), entry.clone()).unwrap()), SequencedEntry::new_from_sequence(Sequence::new(0, 0), entry.clone()).unwrap(),
Ok(SequencedEntry::new_from_sequence(Sequence::new(1, 2), entry.clone()).unwrap()), );
Ok(SequencedEntry::new_from_sequence(Sequence::new(0, 1), entry).unwrap()), write_buffer_state.push_entry(
])); SequencedEntry::new_from_sequence(Sequence::new(1, 0), entry.clone()).unwrap(),
);
write_buffer_state.push_entry(
SequencedEntry::new_from_sequence(Sequence::new(1, 2), entry.clone()).unwrap(),
);
write_buffer_state
.push_entry(SequencedEntry::new_from_sequence(Sequence::new(0, 1), entry).unwrap());
let write_buffer = Arc::new(MockBufferForReading::new(write_buffer_state));
let db = TestDb::builder() let db = TestDb::builder()
.write_buffer(WriteBufferConfig::Reading(Arc::clone(&write_buffer) as _)) .write_buffer(WriteBufferConfig::Reading(Arc::clone(&write_buffer) as _))
@ -2244,36 +2263,6 @@ mod tests {
assert_eq!(read_buffer_chunk_ids(&db, partition_key), vec![1]); assert_eq!(read_buffer_chunk_ids(&db, partition_key), vec![1]);
} }
/// Normalizes a set of ChunkSummaries for comparison by removing timestamps
fn normalize_summaries(summaries: Vec<ChunkSummary>) -> Vec<ChunkSummary> {
let mut summaries = summaries
.into_iter()
.map(|summary| {
let ChunkSummary {
partition_key,
table_name,
id,
storage,
lifecycle_action,
estimated_bytes,
row_count,
..
} = summary;
ChunkSummary::new_without_timestamps(
partition_key,
table_name,
id,
storage,
lifecycle_action,
estimated_bytes,
row_count,
)
})
.collect::<Vec<_>>();
summaries.sort_unstable();
summaries
}
#[tokio::test] #[tokio::test]
async fn partition_chunk_summaries() { async fn partition_chunk_summaries() {
// Test that chunk id listing is hooked up // Test that chunk id listing is hooked up
@ -2288,7 +2277,7 @@ mod tests {
print!("Partitions: {:?}", db.partition_keys().unwrap()); print!("Partitions: {:?}", db.partition_keys().unwrap());
let chunk_summaries = db.partition_chunk_summaries("1970-01-05T15"); let chunk_summaries = db.partition_chunk_summaries("1970-01-05T15");
let chunk_summaries = normalize_summaries(chunk_summaries); let chunk_summaries = ChunkSummary::normalize_summaries(chunk_summaries);
let expected = vec![ChunkSummary::new_without_timestamps( let expected = vec![ChunkSummary::new_without_timestamps(
Arc::from("1970-01-05T15"), Arc::from("1970-01-05T15"),
@ -2296,7 +2285,8 @@ mod tests {
0, 0,
ChunkStorage::OpenMutableBuffer, ChunkStorage::OpenMutableBuffer,
None, None,
70, 70, // memory_size
0, // os_size
1, 1,
)]; )];
@ -2304,7 +2294,7 @@ mod tests {
.chunk_summaries() .chunk_summaries()
.unwrap() .unwrap()
.into_iter() .into_iter()
.map(|x| x.estimated_bytes) .map(|x| x.memory_bytes)
.sum(); .sum();
assert_eq!(db.catalog.metrics().memory().mutable_buffer(), size); assert_eq!(db.catalog.metrics().memory().mutable_buffer(), size);
@ -2394,7 +2384,7 @@ mod tests {
write_lp(&db, "cpu bar=1,baz=3,blargh=3 400000000000000").await; write_lp(&db, "cpu bar=1,baz=3,blargh=3 400000000000000").await;
let chunk_summaries = db.chunk_summaries().expect("expected summary to return"); let chunk_summaries = db.chunk_summaries().expect("expected summary to return");
let chunk_summaries = normalize_summaries(chunk_summaries); let chunk_summaries = ChunkSummary::normalize_summaries(chunk_summaries);
let lifecycle_action = None; let lifecycle_action = None;
@ -2406,6 +2396,7 @@ mod tests {
ChunkStorage::ReadBufferAndObjectStore, ChunkStorage::ReadBufferAndObjectStore,
lifecycle_action, lifecycle_action,
2115, // size of RB and OS chunks 2115, // size of RB and OS chunks
1132, // size of parquet file
1, 1,
), ),
ChunkSummary::new_without_timestamps( ChunkSummary::new_without_timestamps(
@ -2415,6 +2406,7 @@ mod tests {
ChunkStorage::OpenMutableBuffer, ChunkStorage::OpenMutableBuffer,
lifecycle_action, lifecycle_action,
64, 64,
0, // no OS chunks
1, 1,
), ),
ChunkSummary::new_without_timestamps( ChunkSummary::new_without_timestamps(
@ -2424,6 +2416,7 @@ mod tests {
ChunkStorage::ClosedMutableBuffer, ChunkStorage::ClosedMutableBuffer,
lifecycle_action, lifecycle_action,
2398, 2398,
0, // no OS chunks
1, 1,
), ),
ChunkSummary::new_without_timestamps( ChunkSummary::new_without_timestamps(
@ -2433,13 +2426,14 @@ mod tests {
ChunkStorage::OpenMutableBuffer, ChunkStorage::OpenMutableBuffer,
lifecycle_action, lifecycle_action,
87, 87,
0, // no OS chunks
1, 1,
), ),
]; ];
assert_eq!( assert_eq!(
expected, chunk_summaries, expected, chunk_summaries,
"expected:\n{:#?}\n\nactual:{:#?}\n\n", "\n\nexpected:\n{:#?}\n\nactual:{:#?}\n\n",
expected, chunk_summaries expected, chunk_summaries
); );

View File

@ -476,7 +476,8 @@ impl CatalogChunk {
id: self.addr.chunk_id, id: self.addr.chunk_id,
storage, storage,
lifecycle_action, lifecycle_action,
estimated_bytes: self.size(), memory_bytes: self.memory_bytes(),
object_store_bytes: self.object_store_bytes(),
row_count, row_count,
time_of_first_write: self.time_of_first_write, time_of_first_write: self.time_of_first_write,
time_of_last_write: self.time_of_last_write, time_of_last_write: self.time_of_last_write,
@ -491,7 +492,7 @@ impl CatalogChunk {
fn to_summary(v: (&str, usize)) -> ChunkColumnSummary { fn to_summary(v: (&str, usize)) -> ChunkColumnSummary {
ChunkColumnSummary { ChunkColumnSummary {
name: v.0.into(), name: v.0.into(),
estimated_bytes: v.1, memory_bytes: v.1,
} }
} }
@ -529,7 +530,7 @@ impl CatalogChunk {
} }
/// Returns an approximation of the amount of process memory consumed by the chunk /// Returns an approximation of the amount of process memory consumed by the chunk
pub fn size(&self) -> usize { pub fn memory_bytes(&self) -> usize {
match &self.stage { match &self.stage {
ChunkStage::Open { mb_chunk, .. } => mb_chunk.size(), ChunkStage::Open { mb_chunk, .. } => mb_chunk.size(),
ChunkStage::Frozen { representation, .. } => match &representation { ChunkStage::Frozen { representation, .. } => match &representation {
@ -550,6 +551,15 @@ impl CatalogChunk {
} }
} }
/// Returns the number of bytes of object storage consumed by this chunk
pub fn object_store_bytes(&self) -> usize {
match &self.stage {
ChunkStage::Open { .. } => 0,
ChunkStage::Frozen { .. } => 0,
ChunkStage::Persisted { parquet, .. } => parquet.file_size_bytes(),
}
}
/// Returns a mutable reference to the mutable buffer storage for chunks in the Open state /// Returns a mutable reference to the mutable buffer storage for chunks in the Open state
pub fn mutable_buffer(&mut self) -> Result<&mut MBChunk> { pub fn mutable_buffer(&mut self) -> Result<&mut MBChunk> {
match &mut self.stage { match &mut self.stage {

View File

@ -71,6 +71,7 @@ pub(crate) fn compact_chunks(
let ctx = db.exec.new_context(ExecutorType::Reorg); let ctx = db.exec.new_context(ExecutorType::Reorg);
let fut = async move { let fut = async move {
let fut_now = std::time::Instant::now();
let key = compute_sort_key(query_chunks.iter().map(|x| x.summary())); let key = compute_sort_key(query_chunks.iter().map(|x| x.summary()));
let key_str = format!("\"{}\"", key); // for logging let key_str = format!("\"{}\"", key); // for logging
@ -108,7 +109,8 @@ pub(crate) fn compact_chunks(
info!(input_chunks=query_chunks.len(), rub_row_groups=rb_row_groups, info!(input_chunks=query_chunks.len(), rub_row_groups=rb_row_groups,
input_rows=input_rows, output_rows=guard.table_summary().count(), input_rows=input_rows, output_rows=guard.table_summary().count(),
sort_key=%key_str, compaction_took = ?elapsed, rows_per_sec=?throughput, "chunk(s) compacted"); sort_key=%key_str, compaction_took = ?elapsed, fut_execution_duration= ?fut_now.elapsed(),
rows_per_sec=?throughput, "chunk(s) compacted");
Ok(DbChunk::snapshot(&guard)) Ok(DbChunk::snapshot(&guard))
}; };

View File

@ -197,7 +197,8 @@ fn chunk_summaries_schema() -> SchemaRef {
Field::new("table_name", DataType::Utf8, false), Field::new("table_name", DataType::Utf8, false),
Field::new("storage", DataType::Utf8, false), Field::new("storage", DataType::Utf8, false),
Field::new("lifecycle_action", DataType::Utf8, true), Field::new("lifecycle_action", DataType::Utf8, true),
Field::new("estimated_bytes", DataType::UInt64, false), Field::new("memory_bytes", DataType::UInt64, false),
Field::new("object_store_bytes", DataType::UInt64, false),
Field::new("row_count", DataType::UInt64, false), Field::new("row_count", DataType::UInt64, false),
Field::new("time_of_first_write", ts.clone(), true), Field::new("time_of_first_write", ts.clone(), true),
Field::new("time_of_last_write", ts.clone(), true), Field::new("time_of_last_write", ts.clone(), true),
@ -223,9 +224,13 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<
.iter() .iter()
.map(|c| c.lifecycle_action.map(|a| a.name())) .map(|c| c.lifecycle_action.map(|a| a.name()))
.collect::<StringArray>(); .collect::<StringArray>();
let estimated_bytes = chunks let memory_bytes = chunks
.iter() .iter()
.map(|c| Some(c.estimated_bytes as u64)) .map(|c| Some(c.memory_bytes as u64))
.collect::<UInt64Array>();
let object_store_bytes = chunks
.iter()
.map(|c| Some(c.object_store_bytes as u64).filter(|&v| v > 0))
.collect::<UInt64Array>(); .collect::<UInt64Array>();
let row_counts = chunks let row_counts = chunks
.iter() .iter()
@ -255,7 +260,8 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<
Arc::new(table_name), Arc::new(table_name),
Arc::new(storage), Arc::new(storage),
Arc::new(lifecycle_action), Arc::new(lifecycle_action),
Arc::new(estimated_bytes), Arc::new(memory_bytes),
Arc::new(object_store_bytes),
Arc::new(row_counts), Arc::new(row_counts),
Arc::new(time_of_first_write), Arc::new(time_of_first_write),
Arc::new(time_of_last_write), Arc::new(time_of_last_write),
@ -380,7 +386,7 @@ fn chunk_columns_schema() -> SchemaRef {
Field::new("row_count", DataType::UInt64, true), Field::new("row_count", DataType::UInt64, true),
Field::new("min_value", DataType::Utf8, true), Field::new("min_value", DataType::Utf8, true),
Field::new("max_value", DataType::Utf8, true), Field::new("max_value", DataType::Utf8, true),
Field::new("estimated_bytes", DataType::UInt64, true), Field::new("memory_bytes", DataType::UInt64, true),
])) ]))
} }
@ -396,7 +402,7 @@ fn assemble_chunk_columns(
.map(|column_summary| { .map(|column_summary| {
( (
column_summary.name.as_ref(), column_summary.name.as_ref(),
column_summary.estimated_bytes as u64, column_summary.memory_bytes as u64,
) )
}) })
.collect() .collect()
@ -413,7 +419,7 @@ fn assemble_chunk_columns(
let mut row_count = UInt64Builder::new(row_estimate); let mut row_count = UInt64Builder::new(row_estimate);
let mut min_values = StringBuilder::new(row_estimate); let mut min_values = StringBuilder::new(row_estimate);
let mut max_values = StringBuilder::new(row_estimate); let mut max_values = StringBuilder::new(row_estimate);
let mut estimated_bytes = UInt64Builder::new(row_estimate); let mut memory_bytes = UInt64Builder::new(row_estimate);
// Note no rows are produced for partitions with no chunks, or // Note no rows are produced for partitions with no chunks, or
// tables with no partitions: There are other tables to list tables // tables with no partitions: There are other tables to list tables
@ -442,7 +448,7 @@ fn assemble_chunk_columns(
let size = column_index.remove(column.name.as_str()); let size = column_index.remove(column.name.as_str());
estimated_bytes.append_option(size)?; memory_bytes.append_option(size)?;
} }
} }
@ -457,7 +463,7 @@ fn assemble_chunk_columns(
Arc::new(row_count.finish()), Arc::new(row_count.finish()),
Arc::new(min_values.finish()), Arc::new(min_values.finish()),
Arc::new(max_values.finish()), Arc::new(max_values.finish()),
Arc::new(estimated_bytes.finish()), Arc::new(memory_bytes.finish()),
], ],
) )
} }
@ -616,7 +622,8 @@ mod tests {
id: 0, id: 0,
storage: ChunkStorage::OpenMutableBuffer, storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action: None, lifecycle_action: None,
estimated_bytes: 23754, memory_bytes: 23754,
object_store_bytes: 0,
row_count: 11, row_count: 11,
time_of_first_write: Some(DateTime::from_utc( time_of_first_write: Some(DateTime::from_utc(
NaiveDateTime::from_timestamp(10, 0), NaiveDateTime::from_timestamp(10, 0),
@ -628,10 +635,11 @@ mod tests {
ChunkSummary { ChunkSummary {
partition_key: Arc::from("p1"), partition_key: Arc::from("p1"),
table_name: Arc::from("table1"), table_name: Arc::from("table1"),
id: 0, id: 1,
storage: ChunkStorage::OpenMutableBuffer, storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action: Some(ChunkLifecycleAction::Persisting), lifecycle_action: Some(ChunkLifecycleAction::Persisting),
estimated_bytes: 23454, memory_bytes: 23455,
object_store_bytes: 0,
row_count: 22, row_count: 22,
time_of_first_write: None, time_of_first_write: None,
time_of_last_write: Some(DateTime::from_utc( time_of_last_write: Some(DateTime::from_utc(
@ -640,15 +648,35 @@ mod tests {
)), )),
time_closed: None, time_closed: None,
}, },
ChunkSummary {
partition_key: Arc::from("p1"),
table_name: Arc::from("table1"),
id: 2,
storage: ChunkStorage::ObjectStoreOnly,
lifecycle_action: None,
memory_bytes: 1234,
object_store_bytes: 5678,
row_count: 33,
time_of_first_write: Some(DateTime::from_utc(
NaiveDateTime::from_timestamp(100, 0),
Utc,
)),
time_of_last_write: Some(DateTime::from_utc(
NaiveDateTime::from_timestamp(200, 0),
Utc,
)),
time_closed: None,
},
]; ];
let expected = vec![ let expected = vec![
"+----+---------------+------------+-------------------+------------------------------+-----------------+-----------+---------------------+---------------------+-------------+", "+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+---------------------+---------------------+-------------+",
"| id | partition_key | table_name | storage | lifecycle_action | estimated_bytes | row_count | time_of_first_write | time_of_last_write | time_closed |", "| id | partition_key | table_name | storage | lifecycle_action | memory_bytes | object_store_bytes | row_count | time_of_first_write | time_of_last_write | time_closed |",
"+----+---------------+------------+-------------------+------------------------------+-----------------+-----------+---------------------+---------------------+-------------+", "+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+---------------------+---------------------+-------------+",
"| 0 | p1 | table1 | OpenMutableBuffer | | 23754 | 11 | 1970-01-01 00:00:10 | | |", "| 0 | p1 | table1 | OpenMutableBuffer | | 23754 | | 11 | 1970-01-01 00:00:10 | | |",
"| 0 | p1 | table1 | OpenMutableBuffer | Persisting to Object Storage | 23454 | 22 | | 1970-01-01 00:01:20 | |", "| 1 | p1 | table1 | OpenMutableBuffer | Persisting to Object Storage | 23455 | | 22 | | 1970-01-01 00:01:20 | |",
"+----+---------------+------------+-------------------+------------------------------+-----------------+-----------+---------------------+---------------------+-------------+", "| 2 | p1 | table1 | ObjectStoreOnly | | 1234 | 5678 | 33 | 1970-01-01 00:01:40 | 1970-01-01 00:03:20 | |",
"+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+---------------------+---------------------+-------------+",
]; ];
let schema = chunk_summaries_schema(); let schema = chunk_summaries_schema();
@ -825,7 +853,8 @@ mod tests {
id: 42, id: 42,
storage: ChunkStorage::ReadBuffer, storage: ChunkStorage::ReadBuffer,
lifecycle_action, lifecycle_action,
estimated_bytes: 23754, memory_bytes: 23754,
object_store_bytes: 0,
row_count: 11, row_count: 11,
time_of_first_write: None, time_of_first_write: None,
time_of_last_write: None, time_of_last_write: None,
@ -834,11 +863,11 @@ mod tests {
columns: vec![ columns: vec![
ChunkColumnSummary { ChunkColumnSummary {
name: "c1".into(), name: "c1".into(),
estimated_bytes: 11, memory_bytes: 11,
}, },
ChunkColumnSummary { ChunkColumnSummary {
name: "c2".into(), name: "c2".into(),
estimated_bytes: 12, memory_bytes: 12,
}, },
], ],
}, },
@ -859,7 +888,8 @@ mod tests {
id: 43, id: 43,
storage: ChunkStorage::OpenMutableBuffer, storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action, lifecycle_action,
estimated_bytes: 23754, memory_bytes: 23754,
object_store_bytes: 0,
row_count: 11, row_count: 11,
time_of_first_write: None, time_of_first_write: None,
time_of_last_write: None, time_of_last_write: None,
@ -867,7 +897,7 @@ mod tests {
}, },
columns: vec![ChunkColumnSummary { columns: vec![ChunkColumnSummary {
name: "c1".into(), name: "c1".into(),
estimated_bytes: 100, memory_bytes: 100,
}], }],
}, },
), ),
@ -887,7 +917,8 @@ mod tests {
id: 44, id: 44,
storage: ChunkStorage::OpenMutableBuffer, storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action, lifecycle_action,
estimated_bytes: 23754, memory_bytes: 23754,
object_store_bytes: 0,
row_count: 11, row_count: 11,
time_of_first_write: None, time_of_first_write: None,
time_of_last_write: None, time_of_last_write: None,
@ -895,21 +926,21 @@ mod tests {
}, },
columns: vec![ChunkColumnSummary { columns: vec![ChunkColumnSummary {
name: "c3".into(), name: "c3".into(),
estimated_bytes: 200, memory_bytes: 200,
}], }],
}, },
), ),
]; ];
let expected = vec![ let expected = vec![
"+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+-----------------+", "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+",
"| partition_key | chunk_id | table_name | column_name | storage | row_count | min_value | max_value | estimated_bytes |", "| partition_key | chunk_id | table_name | column_name | storage | row_count | min_value | max_value | memory_bytes |",
"+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+-----------------+", "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+",
"| p1 | 42 | t1 | c1 | ReadBuffer | 55 | bar | foo | 11 |", "| p1 | 42 | t1 | c1 | ReadBuffer | 55 | bar | foo | 11 |",
"| p1 | 42 | t1 | c2 | ReadBuffer | 66 | 11 | 43 | 12 |", "| p1 | 42 | t1 | c2 | ReadBuffer | 66 | 11 | 43 | 12 |",
"| p2 | 43 | t1 | c1 | OpenMutableBuffer | 667 | 110 | 430 | 100 |", "| p2 | 43 | t1 | c1 | OpenMutableBuffer | 667 | 110 | 430 | 100 |",
"| p2 | 44 | t2 | c3 | OpenMutableBuffer | 4 | -1 | 2 | 200 |", "| p2 | 44 | t2 | c3 | OpenMutableBuffer | 4 | -1 | 2 | 200 |",
"+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+-----------------+", "+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+",
]; ];
let batch = assemble_chunk_columns(chunk_columns_schema(), summaries).unwrap(); let batch = assemble_chunk_columns(chunk_columns_schema(), summaries).unwrap();

View File

@ -83,7 +83,7 @@ OBSERVER;
Example SQL to show the total estimated storage size by database: Example SQL to show the total estimated storage size by database:
SELECT database_name, storage, count(*) as num_chunks, SELECT database_name, storage, count(*) as num_chunks,
sum(estimated_bytes)/1024/1024 as estimated_mb sum(memory_bytes)/1024/1024 as estimated_mb
FROM chunks FROM chunks
GROUP BY database_name, storage GROUP BY database_name, storage
ORDER BY estimated_mb desc; ORDER BY estimated_mb desc;

View File

@ -116,7 +116,7 @@ SHOW COLUMNS FROM my_table; ;; Show columns in the table
SELECT SELECT
partition_key, table_name, storage, partition_key, table_name, storage,
count(*) as chunk_count, count(*) as chunk_count,
sum(estimated_bytes)/(1024*1024) as size_mb sum(memory_bytes)/(1024*1024) as size_mb
FROM FROM
system.chunks system.chunks
GROUP BY GROUP BY

View File

@ -317,7 +317,8 @@ async fn test_chunk_get() {
id: 0, id: 0,
storage: ChunkStorage::OpenMutableBuffer.into(), storage: ChunkStorage::OpenMutableBuffer.into(),
lifecycle_action, lifecycle_action,
estimated_bytes: 100, memory_bytes: 100,
object_store_bytes: 0,
row_count: 2, row_count: 2,
time_of_first_write: None, time_of_first_write: None,
time_of_last_write: None, time_of_last_write: None,
@ -329,7 +330,8 @@ async fn test_chunk_get() {
id: 0, id: 0,
storage: ChunkStorage::OpenMutableBuffer.into(), storage: ChunkStorage::OpenMutableBuffer.into(),
lifecycle_action, lifecycle_action,
estimated_bytes: 82, memory_bytes: 82,
object_store_bytes: 0,
row_count: 1, row_count: 1,
time_of_first_write: None, time_of_first_write: None,
time_of_last_write: None, time_of_last_write: None,
@ -498,7 +500,8 @@ async fn test_list_partition_chunks() {
id: 0, id: 0,
storage: ChunkStorage::OpenMutableBuffer.into(), storage: ChunkStorage::OpenMutableBuffer.into(),
lifecycle_action: ChunkLifecycleAction::Unspecified.into(), lifecycle_action: ChunkLifecycleAction::Unspecified.into(),
estimated_bytes: 100, memory_bytes: 100,
object_store_bytes: 0,
row_count: 2, row_count: 2,
time_of_first_write: None, time_of_first_write: None,
time_of_last_write: None, time_of_last_write: None,
@ -819,7 +822,8 @@ fn normalize_chunks(chunks: Vec<Chunk>) -> Vec<Chunk> {
id, id,
storage, storage,
lifecycle_action, lifecycle_action,
estimated_bytes, memory_bytes,
object_store_bytes,
row_count, row_count,
.. ..
} = summary; } = summary;
@ -829,11 +833,12 @@ fn normalize_chunks(chunks: Vec<Chunk>) -> Vec<Chunk> {
id, id,
storage, storage,
lifecycle_action, lifecycle_action,
estimated_bytes,
row_count, row_count,
time_of_first_write: None, time_of_first_write: None,
time_of_last_write: None, time_of_last_write: None,
time_closed: None, time_closed: None,
memory_bytes,
object_store_bytes,
} }
}) })
.collect::<Vec<_>>() .collect::<Vec<_>>()

View File

@ -197,7 +197,7 @@ async fn test_get_chunks() {
.and(predicate::str::contains( .and(predicate::str::contains(
r#""storage": "OpenMutableBuffer","#, r#""storage": "OpenMutableBuffer","#,
)) ))
.and(predicate::str::contains(r#""estimated_bytes": 100"#)) .and(predicate::str::contains(r#""memory_bytes": 100"#))
// Check for a non empty timestamp such as // Check for a non empty timestamp such as
// "time_of_first_write": "2021-03-30T17:11:10.723866Z", // "time_of_first_write": "2021-03-30T17:11:10.723866Z",
.and(predicate::str::contains(r#""time_of_first_write": "20"#)); .and(predicate::str::contains(r#""time_of_first_write": "20"#));

View File

@ -10,6 +10,7 @@ use crate::{
}; };
use super::scenario::{collect_query, create_readable_database, rand_name, DatabaseBuilder}; use super::scenario::{collect_query, create_readable_database, rand_name, DatabaseBuilder};
use generated_types::influxdata::iox::management::v1::{operation_metadata::Job, CompactChunks};
#[tokio::test] #[tokio::test]
async fn test_chunk_is_persisted_automatically() { async fn test_chunk_is_persisted_automatically() {
@ -53,21 +54,25 @@ async fn test_full_lifecycle() {
let fixture = ServerFixture::create_shared().await; let fixture = ServerFixture::create_shared().await;
let mut write_client = fixture.write_client(); let mut write_client = fixture.write_client();
let db_name = rand_name();
DatabaseBuilder::new(db_name.clone())
.persist(true)
// wait 2 seconds for the data to arrive (to ensure we compact a single chunk)
.persist_age_threshold_seconds(2)
.late_arrive_window_seconds(1)
.build(fixture.grpc_channel())
.await;
// write in enough data to exceed the soft limit (512K) and
// expect that it compacts, persists and then unloads the data from memory
let num_payloads = 10; let num_payloads = 10;
let num_duplicates = 2; let num_duplicates = 2;
let payload_size = 1_000; let payload_size = 1_000;
let total_rows = num_payloads * num_duplicates * payload_size;
let db_name = rand_name();
DatabaseBuilder::new(db_name.clone())
.persist(true)
// Each write should go into a separate chunk to test compaction
.mub_row_threshold(payload_size)
// Only trigger persistence once we've finished writing
.persist_row_threshold(total_rows)
.persist_age_threshold_seconds(1000)
// A low late arrival time to speed up the test
.late_arrive_window_seconds(1)
.build(fixture.grpc_channel())
.await;
let payloads: Vec<_> = (0..num_payloads) let payloads: Vec<_> = (0..num_payloads)
.map(|x| { .map(|x| {
(0..payload_size) (0..payload_size)
@ -76,24 +81,17 @@ async fn test_full_lifecycle() {
}) })
.collect(); .collect();
for payload in payloads.iter().take(num_payloads - 1) { for payload in &payloads {
// Writing the same data multiple times should be compacted away // Writing the same data multiple times should be compacted away
for _ in 0..num_duplicates { for _ in 0..num_duplicates {
let num_lines_written = write_client let num_lines_written = write_client
.write(&db_name, payload) .write(&db_name, payload)
.await .await
.expect("successful write"); .expect("successful write");
assert_eq!(num_lines_written, payload_size); assert_eq!(num_lines_written, payload_size as usize);
} }
} }
// Don't duplicate last write as it is what crosses the persist row threshold
let num_lines_written = write_client
.write(&db_name, payloads.last().unwrap())
.await
.expect("successful write");
assert_eq!(num_lines_written, payload_size);
wait_for_exact_chunk_states( wait_for_exact_chunk_states(
&fixture, &fixture,
&db_name, &db_name,
@ -102,11 +100,27 @@ async fn test_full_lifecycle() {
) )
.await; .await;
// Expect compaction to have occurred
let performed_compaction = fixture
.operations_client()
.list_operations()
.await
.unwrap()
.iter()
.any(|operation| match operation.metadata().job {
Some(Job::CompactChunks(CompactChunks {
db_name: operation_db_name,
..
})) => operation_db_name == db_name,
_ => false,
});
assert!(performed_compaction);
// Expect them to have been compacted into a single read buffer // Expect them to have been compacted into a single read buffer
// with the duplicates eliminated // with the duplicates eliminated
let chunks = list_chunks(&fixture, &db_name).await; let chunks = list_chunks(&fixture, &db_name).await;
assert_eq!(chunks.len(), 1); assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].row_count, num_payloads * payload_size) assert_eq!(chunks[0].row_count, (num_payloads * payload_size) as usize)
} }
#[tokio::test] #[tokio::test]
@ -163,8 +177,6 @@ async fn test_query_chunk_after_restart() {
/// Create a closed read buffer chunk and return its id /// Create a closed read buffer chunk and return its id
async fn create_readbuffer_chunk(fixture: &ServerFixture, db_name: &str) -> u32 { async fn create_readbuffer_chunk(fixture: &ServerFixture, db_name: &str) -> u32 {
use influxdb_iox_client::management::generated_types::operation_metadata::Job;
let mut management_client = fixture.management_client(); let mut management_client = fixture.management_client();
let mut write_client = fixture.write_client(); let mut write_client = fixture.write_client();
let mut operations_client = fixture.operations_client(); let mut operations_client = fixture.operations_client();

View File

@ -317,11 +317,21 @@ impl DatabaseBuilder {
self self
} }
pub fn mub_row_threshold(mut self, threshold: u64) -> Self {
self.lifecycle_rules.mub_row_threshold = threshold;
self
}
pub fn persist_age_threshold_seconds(mut self, threshold: u32) -> Self { pub fn persist_age_threshold_seconds(mut self, threshold: u32) -> Self {
self.lifecycle_rules.persist_age_threshold_seconds = threshold; self.lifecycle_rules.persist_age_threshold_seconds = threshold;
self self
} }
pub fn persist_row_threshold(mut self, threshold: u64) -> Self {
self.lifecycle_rules.persist_row_threshold = threshold;
self
}
pub fn late_arrive_window_seconds(mut self, late_arrive_window_seconds: u32) -> Self { pub fn late_arrive_window_seconds(mut self, late_arrive_window_seconds: u32) -> Self {
self.lifecycle_rules.late_arrive_window_seconds = late_arrive_window_seconds; self.lifecycle_rules.late_arrive_window_seconds = late_arrive_window_seconds;
self self

View File

@ -15,54 +15,11 @@ use rdkafka::{
}; };
use std::convert::TryFrom; use std::convert::TryFrom;
use test_helpers::assert_contains; use test_helpers::assert_contains;
use write_buffer::maybe_skip_kafka_integration;
/// If `TEST_INTEGRATION` and `KAFKA_CONNECT` are set, return the Kafka connection URL to the
/// caller.
///
/// If `TEST_INTEGRATION` is set but `KAFKA_CONNECT` is not set, fail the tests and provide
/// guidance for setting `KAFKA_CONNECTION`.
///
/// If `TEST_INTEGRATION` is not set, skip the calling test by returning early.
macro_rules! maybe_skip_integration {
() => {{
use std::env;
dotenv::dotenv().ok();
match (
env::var("TEST_INTEGRATION").is_ok(),
env::var("KAFKA_CONNECT").ok(),
) {
(true, Some(kafka_connection)) => kafka_connection,
(true, None) => {
panic!(
"TEST_INTEGRATION is set which requires running integration tests, but \
KAFKA_CONNECT is not set. Please run Kafka, perhaps by using the command \
`docker-compose -f docker/ci-kafka-docker-compose.yml up kafka`, then \
set KAFKA_CONNECT to the host and port where Kafka is accessible. If \
running the `docker-compose` command and the Rust tests on the host, the \
value for `KAFKA_CONNECT` should be `localhost:9093`. If running the Rust \
tests in another container in the `docker-compose` network as on CI, \
`KAFKA_CONNECT` should be `kafka:9092`."
)
}
(false, Some(_)) => {
eprintln!("skipping Kafka integration tests - set TEST_INTEGRATION to run");
return;
}
(false, None) => {
eprintln!(
"skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT to \
run"
);
return;
}
}
}};
}
#[tokio::test] #[tokio::test]
async fn writes_go_to_kafka() { async fn writes_go_to_kafka() {
let kafka_connection = maybe_skip_integration!(); let kafka_connection = maybe_skip_kafka_integration!();
// set up a database with a write buffer pointing at kafka // set up a database with a write buffer pointing at kafka
let server = ServerFixture::create_shared().await; let server = ServerFixture::create_shared().await;
@ -135,7 +92,7 @@ async fn produce_to_kafka_directly(
#[tokio::test] #[tokio::test]
async fn reads_come_from_kafka() { async fn reads_come_from_kafka() {
let kafka_connection = maybe_skip_integration!(); let kafka_connection = maybe_skip_kafka_integration!();
// set up a database to read from Kafka // set up a database to read from Kafka
let server = ServerFixture::create_shared().await; let server = ServerFixture::create_shared().await;
@ -220,7 +177,7 @@ async fn reads_come_from_kafka() {
#[tokio::test] #[tokio::test]
async fn cant_write_to_db_reading_from_kafka() { async fn cant_write_to_db_reading_from_kafka() {
let kafka_connection = maybe_skip_integration!(); let kafka_connection = maybe_skip_kafka_integration!();
// set up a database to read from Kafka // set up a database to read from Kafka
let server = ServerFixture::create_shared().await; let server = ServerFixture::create_shared().await;

View File

@ -8,4 +8,11 @@ async-trait = "0.1"
data_types = { path = "../data_types" } data_types = { path = "../data_types" }
entry = { path = "../entry" } entry = { path = "../entry" }
futures = "0.3" futures = "0.3"
parking_lot = "0.11.1"
rdkafka = "0.26.0" rdkafka = "0.26.0"
observability_deps = { path = "../observability_deps" }
[dev-dependencies]
dotenv = "0.15.0"
tokio = { version = "1.0", features = ["macros", "fs"] }
uuid = { version = "0.8", features = ["serde", "v4"] }

View File

@ -11,9 +11,14 @@ pub type WriteBufferError = Box<dyn std::error::Error + Sync + Send>;
/// entries from the Write Buffer at a later time. /// entries from the Write Buffer at a later time.
#[async_trait] #[async_trait]
pub trait WriteBufferWriting: Sync + Send + std::fmt::Debug + 'static { pub trait WriteBufferWriting: Sync + Send + std::fmt::Debug + 'static {
/// Send an `Entry` to the write buffer and return information that can be used to restore /// Send an `Entry` to the write buffer using the specified sequencer ID.
/// entries at a later time. ///
async fn store_entry(&self, entry: &Entry) -> Result<Sequence, WriteBufferError>; /// Returns information that can be used to restore entries at a later time.
async fn store_entry(
&self,
entry: &Entry,
sequencer_id: u32,
) -> Result<Sequence, WriteBufferError>;
} }
/// Produce a stream of `SequencedEntry` that a `Db` can add to the mutable buffer by using /// Produce a stream of `SequencedEntry` that a `Db` can add to the mutable buffer by using
@ -26,3 +31,151 @@ pub trait WriteBufferReading: Sync + Send + std::fmt::Debug + 'static {
'life0: 'async_trait, 'life0: 'async_trait,
Self: 'async_trait; Self: 'async_trait;
} }
pub mod test_utils {
use async_trait::async_trait;
use entry::{test_helpers::lp_to_entry, Entry};
use futures::{StreamExt, TryStreamExt};
use super::{WriteBufferReading, WriteBufferWriting};
#[async_trait]
pub trait TestAdapter: Send + Sync {
type Context: TestContext;
async fn new_context(&self, n_sequencers: u32) -> Self::Context;
}
pub trait TestContext: Send + Sync {
type Writing: WriteBufferWriting;
type Reading: WriteBufferReading;
fn writing(&self) -> Self::Writing;
fn reading(&self) -> Self::Reading;
}
pub async fn perform_generic_tests<T>(adapter: T)
where
T: TestAdapter,
{
test_single_stream_io(&adapter).await;
test_multi_stream_io(&adapter).await;
test_multi_writer_multi_reader(&adapter).await;
}
async fn test_single_stream_io<T>(adapter: &T)
where
T: TestAdapter,
{
let context = adapter.new_context(1).await;
let entry_1 = lp_to_entry("upc user=1 100");
let entry_2 = lp_to_entry("upc user=2 200");
let entry_3 = lp_to_entry("upc user=3 300");
let writer = context.writing();
let reader = context.reading();
let mut stream = reader.stream();
let waker = futures::task::noop_waker();
let mut cx = futures::task::Context::from_waker(&waker);
// empty stream is pending
assert!(stream.poll_next_unpin(&mut cx).is_pending());
// adding content allows us to get results
writer.store_entry(&entry_1, 0).await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap().entry(), &entry_1);
// stream is pending again
assert!(stream.poll_next_unpin(&mut cx).is_pending());
// adding more data unblocks the stream
writer.store_entry(&entry_2, 0).await.unwrap();
writer.store_entry(&entry_3, 0).await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap().entry(), &entry_2);
assert_eq!(stream.next().await.unwrap().unwrap().entry(), &entry_3);
// stream is pending again
assert!(stream.poll_next_unpin(&mut cx).is_pending());
}
async fn test_multi_stream_io<T>(adapter: &T)
where
T: TestAdapter,
{
let context = adapter.new_context(1).await;
let entry_1 = lp_to_entry("upc user=1 100");
let entry_2 = lp_to_entry("upc user=2 200");
let entry_3 = lp_to_entry("upc user=3 300");
let writer = context.writing();
let reader = context.reading();
let mut stream_1 = reader.stream();
let mut stream_2 = reader.stream();
let waker = futures::task::noop_waker();
let mut cx = futures::task::Context::from_waker(&waker);
// empty streams is pending
assert!(stream_1.poll_next_unpin(&mut cx).is_pending());
assert!(stream_2.poll_next_unpin(&mut cx).is_pending());
// streams poll from same source
writer.store_entry(&entry_1, 0).await.unwrap();
writer.store_entry(&entry_2, 0).await.unwrap();
writer.store_entry(&entry_3, 0).await.unwrap();
assert_eq!(stream_1.next().await.unwrap().unwrap().entry(), &entry_1);
assert_eq!(stream_2.next().await.unwrap().unwrap().entry(), &entry_2);
assert_eq!(stream_1.next().await.unwrap().unwrap().entry(), &entry_3);
// both streams are pending again
assert!(stream_1.poll_next_unpin(&mut cx).is_pending());
assert!(stream_2.poll_next_unpin(&mut cx).is_pending());
}
async fn test_multi_writer_multi_reader<T>(adapter: &T)
where
T: TestAdapter,
{
let context = adapter.new_context(2).await;
let entry_east_1 = lp_to_entry("upc,region=east user=1 100");
let entry_east_2 = lp_to_entry("upc,region=east user=2 200");
let entry_west_1 = lp_to_entry("upc,region=west user=1 200");
let writer_1 = context.writing();
let writer_2 = context.writing();
let reader_1 = context.reading();
let reader_2 = context.reading();
// TODO: do not hard-code sequencer IDs here but provide a proper interface
writer_1.store_entry(&entry_east_1, 0).await.unwrap();
writer_1.store_entry(&entry_west_1, 1).await.unwrap();
writer_2.store_entry(&entry_east_2, 0).await.unwrap();
assert_reader_content(reader_1, &[&entry_east_1, &entry_east_2, &entry_west_1]).await;
assert_reader_content(reader_2, &[&entry_east_1, &entry_east_2, &entry_west_1]).await;
}
async fn assert_reader_content<R>(reader: R, expected: &[&Entry])
where
R: WriteBufferReading,
{
// we need to limit the stream to `expected.len()` elements, otherwise it might be pending forever
let mut results: Vec<_> = reader
.stream()
.take(expected.len())
.try_collect()
.await
.unwrap();
results.sort_by_key(|entry| {
let sequence = entry.sequence().unwrap();
(sequence.id, sequence.number)
});
let actual: Vec<_> = results.iter().map(|entry| entry.entry()).collect();
assert_eq!(&actual[..], expected);
}
}

View File

@ -4,10 +4,12 @@ use async_trait::async_trait;
use data_types::server_id::ServerId; use data_types::server_id::ServerId;
use entry::{Entry, Sequence, SequencedEntry}; use entry::{Entry, Sequence, SequencedEntry};
use futures::{stream::BoxStream, StreamExt}; use futures::{stream::BoxStream, StreamExt};
use observability_deps::tracing::debug;
use rdkafka::{ use rdkafka::{
consumer::{Consumer, StreamConsumer}, consumer::{Consumer, StreamConsumer},
error::KafkaError, error::KafkaError,
producer::{FutureProducer, FutureRecord}, producer::{FutureProducer, FutureRecord},
util::Timeout,
ClientConfig, Message, ClientConfig, Message,
}; };
@ -31,24 +33,28 @@ impl std::fmt::Debug for KafkaBufferProducer {
#[async_trait] #[async_trait]
impl WriteBufferWriting for KafkaBufferProducer { impl WriteBufferWriting for KafkaBufferProducer {
/// Send an `Entry` to Kafka and return the partition ID as the sequencer ID and the offset /// Send an `Entry` to Kafka using the sequencer ID as a partition.
/// as the sequence number. async fn store_entry(
async fn store_entry(&self, entry: &Entry) -> Result<Sequence, WriteBufferError> { &self,
entry: &Entry,
sequencer_id: u32,
) -> Result<Sequence, WriteBufferError> {
let partition = i32::try_from(sequencer_id)?;
// This type annotation is necessary because `FutureRecord` is generic over key type, but // This type annotation is necessary because `FutureRecord` is generic over key type, but
// key is optional and we're not setting a key. `String` is arbitrary. // key is optional and we're not setting a key. `String` is arbitrary.
let record: FutureRecord<'_, String, _> = let record: FutureRecord<'_, String, _> = FutureRecord::to(&self.database_name)
FutureRecord::to(&self.database_name).payload(entry.data()); .payload(entry.data())
.partition(partition);
// Can't use `?` here because `send_result` returns `Err((E: Error, original_msg))` so we debug!(db_name=%self.database_name, partition, size=entry.data().len(), "writing to kafka");
// have to extract the actual error out with a `match`.
let (partition, offset) = match self.producer.send_result(record) { let (partition, offset) = self
// Same error structure on the result of the future, need to `match` .producer
Ok(delivery_future) => match delivery_future.await? { .send(record, Timeout::Never)
Ok((partition, offset)) => (partition, offset), .await
Err((e, _returned_record)) => return Err(Box::new(e)), .map_err(|(e, _owned_message)| Box::new(e))?;
},
Err((e, _returned_record)) => return Err(Box::new(e)), debug!(db_name=%self.database_name, %offset, %partition, size=entry.data().len(), "wrote to kafka");
};
Ok(Sequence { Ok(Sequence {
id: partition.try_into()?, id: partition.try_into()?,
@ -69,6 +75,7 @@ impl KafkaBufferProducer {
cfg.set("bootstrap.servers", &conn); cfg.set("bootstrap.servers", &conn);
cfg.set("message.timeout.ms", "5000"); cfg.set("message.timeout.ms", "5000");
cfg.set("message.max.bytes", "10000000"); cfg.set("message.max.bytes", "10000000");
cfg.set("queue.buffering.max.kbytes", "10485760");
let producer: FutureProducer = cfg.create()?; let producer: FutureProducer = cfg.create()?;
@ -153,3 +160,138 @@ impl KafkaBufferConsumer {
}) })
} }
} }
pub mod test_utils {
/// Get the testing Kafka connection string or return current scope.
///
/// If `TEST_INTEGRATION` and `KAFKA_CONNECT` are set, return the Kafka connection URL to the
/// caller.
///
/// If `TEST_INTEGRATION` is set but `KAFKA_CONNECT` is not set, fail the tests and provide
/// guidance for setting `KAFKA_CONNECTION`.
///
/// If `TEST_INTEGRATION` is not set, skip the calling test by returning early.
#[macro_export]
macro_rules! maybe_skip_kafka_integration {
() => {{
use std::env;
dotenv::dotenv().ok();
match (
env::var("TEST_INTEGRATION").is_ok(),
env::var("KAFKA_CONNECT").ok(),
) {
(true, Some(kafka_connection)) => kafka_connection,
(true, None) => {
panic!(
"TEST_INTEGRATION is set which requires running integration tests, but \
KAFKA_CONNECT is not set. Please run Kafka, perhaps by using the command \
`docker-compose -f docker/ci-kafka-docker-compose.yml up kafka`, then \
set KAFKA_CONNECT to the host and port where Kafka is accessible. If \
running the `docker-compose` command and the Rust tests on the host, the \
value for `KAFKA_CONNECT` should be `localhost:9093`. If running the Rust \
tests in another container in the `docker-compose` network as on CI, \
`KAFKA_CONNECT` should be `kafka:9092`."
)
}
(false, Some(_)) => {
eprintln!("skipping Kafka integration tests - set TEST_INTEGRATION to run");
return;
}
(false, None) => {
eprintln!(
"skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT to \
run"
);
return;
}
}
}};
}
}
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicU32, Ordering};
use rdkafka::{
admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
client::DefaultClientContext,
};
use uuid::Uuid;
use crate::{
core::test_utils::{perform_generic_tests, TestAdapter, TestContext},
maybe_skip_kafka_integration,
};
use super::*;
struct KafkaTestAdapter {
conn: String,
}
impl KafkaTestAdapter {
fn new(conn: String) -> Self {
Self { conn }
}
}
#[async_trait]
impl TestAdapter for KafkaTestAdapter {
type Context = KafkaTestContext;
async fn new_context(&self, n_sequencers: u32) -> Self::Context {
// Common Kafka config
let mut cfg = ClientConfig::new();
cfg.set("bootstrap.servers", &self.conn);
cfg.set("message.timeout.ms", "5000");
// Create a topic with `n_partitions` partitions in Kafka
let database_name = format!("test_topic_{}", Uuid::new_v4());
let admin: AdminClient<DefaultClientContext> = cfg.clone().create().unwrap();
let topic = NewTopic::new(
&database_name,
n_sequencers as i32,
TopicReplication::Fixed(1),
);
let opts = AdminOptions::default();
admin.create_topics(&[topic], &opts).await.unwrap();
KafkaTestContext {
conn: self.conn.clone(),
database_name,
server_id_counter: AtomicU32::new(1),
}
}
}
struct KafkaTestContext {
conn: String,
database_name: String,
server_id_counter: AtomicU32,
}
impl TestContext for KafkaTestContext {
type Writing = KafkaBufferProducer;
type Reading = KafkaBufferConsumer;
fn writing(&self) -> Self::Writing {
KafkaBufferProducer::new(&self.conn, &self.database_name).unwrap()
}
fn reading(&self) -> Self::Reading {
let server_id = self.server_id_counter.fetch_add(1, Ordering::SeqCst);
let server_id = ServerId::try_from(server_id).unwrap();
KafkaBufferConsumer::new(&self.conn, server_id, &self.database_name).unwrap()
}
}
#[tokio::test]
async fn test_generic() {
let conn = maybe_skip_kafka_integration!();
perform_generic_tests(KafkaTestAdapter::new(conn)).await;
}
}

View File

@ -1,4 +1,4 @@
use std::sync::{Arc, Mutex}; use std::{collections::BTreeMap, sync::Arc, task::Poll};
use async_trait::async_trait; use async_trait::async_trait;
use entry::{Entry, Sequence, SequencedEntry}; use entry::{Entry, Sequence, SequencedEntry};
@ -6,25 +6,136 @@ use futures::{
stream::{self, BoxStream}, stream::{self, BoxStream},
StreamExt, StreamExt,
}; };
use parking_lot::Mutex;
use crate::core::{WriteBufferError, WriteBufferReading, WriteBufferWriting}; use crate::core::{WriteBufferError, WriteBufferReading, WriteBufferWriting};
#[derive(Debug, Default)] type EntryResVec = Vec<Result<SequencedEntry, WriteBufferError>>;
/// Mocked entries for [`MockBufferForWriting`] and [`MockBufferForReading`].
#[derive(Debug, Clone)]
pub struct MockBufferSharedState {
entries: Arc<Mutex<BTreeMap<u32, EntryResVec>>>,
}
impl MockBufferSharedState {
/// Create new shared state w/ N sequencers.
pub fn empty_with_n_sequencers(n_sequencers: u32) -> Self {
let entries: BTreeMap<_, _> = (0..n_sequencers)
.map(|sequencer_id| (sequencer_id, vec![]))
.collect();
Self {
entries: Arc::new(Mutex::new(entries)),
}
}
/// Push a new entry to the specified sequencer.
///
/// # Panics
/// - when given entry is not sequenced
/// - when specified sequencer does not exist
/// - when sequence number in entry is not larger the current maximum
pub fn push_entry(&self, entry: SequencedEntry) {
let sequence = entry.sequence().expect("entry must be sequenced");
let mut entries = self.entries.lock();
let entry_vec = entries.get_mut(&sequence.id).expect("invalid sequencer ID");
let max_sequence_number = entry_vec
.iter()
.filter_map(|entry_res| {
entry_res
.as_ref()
.ok()
.map(|entry| entry.sequence().unwrap().number)
})
.max();
if let Some(max_sequence_number) = max_sequence_number {
assert!(
max_sequence_number < sequence.number,
"sequence number {} is less/equal than current max sequencer number {}",
sequence.number,
max_sequence_number
);
}
entry_vec.push(Ok(entry));
}
/// Push error to specified sequencer.
///
/// # Panics
/// - when sequencer does not exist
pub fn push_error(&self, error: WriteBufferError, sequencer_id: u32) {
let mut entries = self.entries.lock();
let entry_vec = entries
.get_mut(&sequencer_id)
.expect("invalid sequencer ID");
entry_vec.push(Err(error));
}
/// Get messages (entries and errors) for specified sequencer.
///
/// # Panics
/// - when sequencer does not exist
pub fn get_messages(&self, sequencer_id: u32) -> Vec<Result<SequencedEntry, WriteBufferError>> {
let mut entries = self.entries.lock();
let entry_vec = entries
.get_mut(&sequencer_id)
.expect("invalid sequencer ID");
entry_vec
.iter()
.map(|entry_res| match entry_res {
Ok(entry) => Ok(entry.clone()),
Err(e) => Err(e.to_string().into()),
})
.collect()
}
}
#[derive(Debug)]
pub struct MockBufferForWriting { pub struct MockBufferForWriting {
pub entries: Arc<Mutex<Vec<Entry>>>, state: MockBufferSharedState,
}
impl MockBufferForWriting {
pub fn new(state: MockBufferSharedState) -> Self {
Self { state }
}
} }
#[async_trait] #[async_trait]
impl WriteBufferWriting for MockBufferForWriting { impl WriteBufferWriting for MockBufferForWriting {
async fn store_entry(&self, entry: &Entry) -> Result<Sequence, WriteBufferError> { async fn store_entry(
let mut entries = self.entries.lock().unwrap(); &self,
let offset = entries.len() as u64; entry: &Entry,
entries.push(entry.clone()); sequencer_id: u32,
) -> Result<Sequence, WriteBufferError> {
let mut entries = self.state.entries.lock();
let sequencer_entries = entries.get_mut(&sequencer_id).unwrap();
Ok(Sequence { let sequence_number = sequencer_entries
id: 0, .iter()
number: offset, .filter_map(|entry_res| {
entry_res
.as_ref()
.ok()
.map(|entry| entry.sequence().unwrap().number)
}) })
.max()
.map(|n| n + 1)
.unwrap_or(0);
let sequence = Sequence {
id: sequencer_id,
number: sequence_number,
};
sequencer_entries.push(Ok(SequencedEntry::new_from_sequence(
sequence,
entry.clone(),
)
.unwrap()));
Ok(sequence)
} }
} }
@ -33,7 +144,11 @@ pub struct MockBufferForWritingThatAlwaysErrors;
#[async_trait] #[async_trait]
impl WriteBufferWriting for MockBufferForWritingThatAlwaysErrors { impl WriteBufferWriting for MockBufferForWritingThatAlwaysErrors {
async fn store_entry(&self, _entry: &Entry) -> Result<Sequence, WriteBufferError> { async fn store_entry(
&self,
_entry: &Entry,
_sequencer_id: u32,
) -> Result<Sequence, WriteBufferError> {
Err(String::from( Err(String::from(
"Something bad happened on the way to writing an entry in the write buffer", "Something bad happened on the way to writing an entry in the write buffer",
) )
@ -41,9 +156,23 @@ impl WriteBufferWriting for MockBufferForWritingThatAlwaysErrors {
} }
} }
type MoveableEntries = Arc<Mutex<Vec<Result<SequencedEntry, WriteBufferError>>>>;
pub struct MockBufferForReading { pub struct MockBufferForReading {
entries: MoveableEntries, state: MockBufferSharedState,
positions: Arc<Mutex<BTreeMap<u32, usize>>>,
}
impl MockBufferForReading {
pub fn new(state: MockBufferSharedState) -> Self {
let n_sequencers = state.entries.lock().len() as u32;
let positions: BTreeMap<_, _> = (0..n_sequencers)
.map(|sequencer_id| (sequencer_id, 0))
.collect();
Self {
state,
positions: Arc::new(Mutex::new(positions)),
}
}
} }
impl std::fmt::Debug for MockBufferForReading { impl std::fmt::Debug for MockBufferForReading {
@ -52,14 +181,6 @@ impl std::fmt::Debug for MockBufferForReading {
} }
} }
impl MockBufferForReading {
pub fn new(entries: Vec<Result<SequencedEntry, WriteBufferError>>) -> Self {
Self {
entries: Arc::new(Mutex::new(entries)),
}
}
}
impl WriteBufferReading for MockBufferForReading { impl WriteBufferReading for MockBufferForReading {
fn stream<'life0, 'async_trait>( fn stream<'life0, 'async_trait>(
&'life0 self, &'life0 self,
@ -68,11 +189,129 @@ impl WriteBufferReading for MockBufferForReading {
'life0: 'async_trait, 'life0: 'async_trait,
Self: 'async_trait, Self: 'async_trait,
{ {
// move the entries out of `self` to move them into the stream let state = self.state.clone();
let entries: Vec<_> = self.entries.lock().unwrap().drain(..).collect(); let positions = Arc::clone(&self.positions);
stream::iter(entries.into_iter()) stream::poll_fn(move |_ctx| {
.chain(stream::pending()) let entries = state.entries.lock();
let mut positions = positions.lock();
for (sequencer_id, position) in positions.iter_mut() {
let entry_vec = entries.get(sequencer_id).unwrap();
if entry_vec.len() > *position {
let entry = match &entry_vec[*position] {
Ok(entry) => Ok(entry.clone()),
Err(e) => Err(e.to_string().into()),
};
*position += 1;
return Poll::Ready(Some(entry));
}
}
Poll::Pending
})
.boxed() .boxed()
} }
} }
#[cfg(test)]
mod tests {
use entry::test_helpers::lp_to_entry;
use crate::core::test_utils::{perform_generic_tests, TestAdapter, TestContext};
use super::*;
struct MockTestAdapter {}
#[async_trait]
impl TestAdapter for MockTestAdapter {
type Context = MockTestContext;
async fn new_context(&self, n_sequencers: u32) -> Self::Context {
MockTestContext {
state: MockBufferSharedState::empty_with_n_sequencers(n_sequencers),
}
}
}
struct MockTestContext {
state: MockBufferSharedState,
}
impl TestContext for MockTestContext {
type Writing = MockBufferForWriting;
type Reading = MockBufferForReading;
fn writing(&self) -> Self::Writing {
MockBufferForWriting::new(self.state.clone())
}
fn reading(&self) -> Self::Reading {
MockBufferForReading::new(self.state.clone())
}
}
#[tokio::test]
async fn test_generic() {
perform_generic_tests(MockTestAdapter {}).await;
}
#[test]
#[should_panic(expected = "entry must be sequenced")]
fn test_state_push_entry_panic_unsequenced() {
let state = MockBufferSharedState::empty_with_n_sequencers(2);
let entry = lp_to_entry("upc,region=east user=1 100");
state.push_entry(SequencedEntry::new_unsequenced(entry));
}
#[test]
#[should_panic(expected = "invalid sequencer ID")]
fn test_state_push_entry_panic_wrong_sequencer() {
let state = MockBufferSharedState::empty_with_n_sequencers(2);
let entry = lp_to_entry("upc,region=east user=1 100");
let sequence = Sequence::new(2, 0);
state.push_entry(SequencedEntry::new_from_sequence(sequence, entry).unwrap());
}
#[test]
#[should_panic(
expected = "sequence number 13 is less/equal than current max sequencer number 13"
)]
fn test_state_push_entry_panic_wrong_sequence_number_equal() {
let state = MockBufferSharedState::empty_with_n_sequencers(2);
let entry = lp_to_entry("upc,region=east user=1 100");
let sequence = Sequence::new(1, 13);
state.push_entry(SequencedEntry::new_from_sequence(sequence, entry.clone()).unwrap());
state.push_entry(SequencedEntry::new_from_sequence(sequence, entry).unwrap());
}
#[test]
#[should_panic(
expected = "sequence number 12 is less/equal than current max sequencer number 13"
)]
fn test_state_push_entry_panic_wrong_sequence_number_less() {
let state = MockBufferSharedState::empty_with_n_sequencers(2);
let entry = lp_to_entry("upc,region=east user=1 100");
let sequence_1 = Sequence::new(1, 13);
let sequence_2 = Sequence::new(1, 12);
state.push_entry(SequencedEntry::new_from_sequence(sequence_1, entry.clone()).unwrap());
state.push_entry(SequencedEntry::new_from_sequence(sequence_2, entry).unwrap());
}
#[test]
#[should_panic(expected = "invalid sequencer ID")]
fn test_state_push_error_panic_wrong_sequencer() {
let state = MockBufferSharedState::empty_with_n_sequencers(2);
let error = "foo".to_string().into();
state.push_error(error, 2);
}
#[test]
#[should_panic(expected = "invalid sequencer ID")]
fn test_state_get_messages_panic_wrong_sequencer() {
let state = MockBufferSharedState::empty_with_n_sequencers(2);
state.get_messages(2);
}
}