Merge branch 'main' into ntran/heappy

pull/24376/head
kodiakhq[bot] 2021-08-12 17:57:16 +00:00 committed by GitHub
commit e0448be345
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
58 changed files with 835 additions and 319 deletions

1
Cargo.lock generated
View File

@ -3898,7 +3898,6 @@ dependencies = [
"serde_json",
"snafu",
"snap",
"tempfile",
"test_helpers",
"tikv-jemalloc-ctl",
"tokio",

View File

@ -1,4 +1,4 @@
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![allow(clippy::clone_on_ref_ptr)]
pub mod bitset;

View File

@ -29,6 +29,14 @@ pub fn to_approximate_datetime(instant: Instant) -> DateTime<Utc> {
}
}
// *NOTE*: these tests currently fail on (at least) aarch64 architectures
// such as an Apple M1 machine.
//
// Possibly related to https://github.com/rust-lang/rust/issues/87906 but
// not clear at this point.
//
// Ignoring the tests here to get the suite green on aarch64.
#[cfg(not(target_arch = "aarch64"))]
#[cfg(test)]
mod tests {
use super::*;
@ -50,4 +58,12 @@ mod tests {
ref_date - chrono::Duration::nanoseconds(23)
);
}
#[test]
fn test_to_datetime_simple() {
let d = std::time::Duration::from_nanos(78);
let a = Instant::now();
let b = a + d;
assert_eq!(b.duration_since(a), d);
}
}

View File

@ -2,7 +2,7 @@
//! servers including replicated data, rules for how data is split up and
//! queried, and what gets stored in the write buffer database.
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_debug_implementations,
clippy::explicit_iter_loop,

View File

@ -1,4 +1,4 @@
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![allow(clippy::clone_on_ref_ptr)]
use std::sync::Arc;

View File

@ -1,7 +1,7 @@
// This crate deliberately does not use the same linting rules as the other
// crates because of all the generated code it contains that we don't have much
// control over.
#![deny(broken_intra_doc_links, rustdoc::bare_urls)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls)]
/// This module imports the generated protobuf code into a Rust module
/// hierarchy that matches the namespace hierarchy of the protobuf

View File

@ -1,7 +1,7 @@
// This crate deliberately does not use the same linting rules as the other
// crates because of all the generated code it contains that we don't have much
// control over.
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![allow(
unused_imports,
clippy::redundant_static_lifetimes,

View File

@ -166,7 +166,7 @@
//! # }
//! ```
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,

View File

@ -1,4 +1,4 @@
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,

View File

@ -1,6 +1,6 @@
//! An InfluxDB IOx API client.
#![deny(
broken_intra_doc_links,
rustdoc::broken_intra_doc_links,
rustdoc::bare_urls,
rust_2018_idioms,
missing_debug_implementations,

View File

@ -7,7 +7,7 @@
//! However, this implementation uses a nom combinator based parser
//! rather than attempting to port the imperative Go logic.
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,

View File

@ -1,4 +1,4 @@
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,

View File

@ -1,4 +1,4 @@
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_debug_implementations,
clippy::explicit_iter_loop,

View File

@ -1,4 +1,4 @@
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,

View File

@ -1,4 +1,4 @@
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
use observability_deps::{
tracing::{

View File

@ -1,4 +1,4 @@
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![allow(clippy::type_complexity)]
pub mod adapter;
pub mod column;

View File

@ -1,4 +1,4 @@
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_debug_implementations,
clippy::explicit_iter_loop,

View File

@ -142,10 +142,7 @@ impl MBChunk {
/// Returns a queryable snapshot of this chunk
#[cfg(feature = "nocache")]
pub fn snapshot(&self) -> Arc<ChunkSnapshot> {
Arc::new(ChunkSnapshot::new(
self,
self.metrics.memory_bytes.clone_empty(),
))
Arc::new(ChunkSnapshot::new(self))
}
/// Return the name of the table in this chunk

View File

@ -50,7 +50,7 @@
//! is done on a per-Chunk basis, so that as soon as the chunk is
//! closed the corresponding dictionary also becomes immutable
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,

View File

@ -1,4 +1,4 @@
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,

View File

@ -1,4 +1,4 @@
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,

View File

@ -1,7 +1,7 @@
//! Custom panic hook that sends the panic information to a tracing
//! span
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_debug_implementations,
clippy::explicit_iter_loop,

View File

@ -284,9 +284,8 @@ impl PreservedCatalog {
db_name: &str,
) -> Result<Option<DateTime<Utc>>> {
let mut res = None;
for (path, _file_type, _revision_counter, _uuid) in
list_files(object_store, server_id, db_name).await?
{
for transaction in list_files(object_store, server_id, db_name).await? {
let path = transaction.file_path();
match load_transaction_proto(object_store, &path).await {
Ok(proto) => match parse_timestamp(&proto.start_timestamp) {
Ok(ts) => {
@ -316,9 +315,8 @@ impl PreservedCatalog {
server_id: ServerId,
db_name: &str,
) -> Result<()> {
for (path, _file_type, _revision_counter, _uuid) in
list_files(object_store, server_id, db_name).await?
{
for transaction in list_files(object_store, server_id, db_name).await? {
let path = transaction.file_path();
object_store.delete(&path).await.context(Write)?;
}
@ -379,38 +377,36 @@ impl PreservedCatalog {
let mut transactions: HashMap<u64, Uuid> = HashMap::new();
let mut max_revision = None;
let mut last_checkpoint = None;
for (_path, file_type, revision_counter, uuid) in
list_files(&object_store, server_id, &db_name).await?
{
for transaction in list_files(&object_store, server_id, &db_name).await? {
// keep track of the max
max_revision = Some(
max_revision
.map(|m: u64| m.max(revision_counter))
.unwrap_or(revision_counter),
.map(|m: u64| m.max(transaction.tkey.revision_counter))
.unwrap_or(transaction.tkey.revision_counter),
);
// keep track of latest checkpoint
if matches!(file_type, FileType::Checkpoint) {
if matches!(transaction.file_type, FileType::Checkpoint) {
last_checkpoint = Some(
last_checkpoint
.map(|m: u64| m.max(revision_counter))
.unwrap_or(revision_counter),
.map(|m: u64| m.max(transaction.tkey.revision_counter))
.unwrap_or(transaction.tkey.revision_counter),
);
}
// insert but check for duplicates
match transactions.entry(revision_counter) {
match transactions.entry(transaction.tkey.revision_counter) {
Occupied(o) => {
// sort for determinism
let (uuid1, uuid2) = if *o.get() < uuid {
(*o.get(), uuid)
let (uuid1, uuid2) = if *o.get() < transaction.tkey.uuid {
(*o.get(), transaction.tkey.uuid)
} else {
(uuid, *o.get())
(transaction.tkey.uuid, *o.get())
};
if uuid1 != uuid2 {
Fork {
revision_counter,
revision_counter: transaction.tkey.revision_counter,
uuid1,
uuid2,
}
@ -418,7 +414,7 @@ impl PreservedCatalog {
}
}
Vacant(v) => {
v.insert(uuid);
v.insert(transaction.tkey.uuid);
}
}
}
@ -456,7 +452,7 @@ impl PreservedCatalog {
&object_store,
server_id,
&db_name,
&tkey,
tkey,
&mut state,
&last_tkey,
file_type,
@ -497,7 +493,6 @@ impl PreservedCatalog {
pub fn revision_counter(&self) -> u64 {
self.previous_tkey
.read()
.clone()
.map(|tkey| tkey.revision_counter)
.expect("catalog should have at least an empty transaction")
}
@ -566,6 +561,14 @@ impl FileType {
Self::Checkpoint => proto::transaction::Encoding::Full,
}
}
fn parse_str(suffix: &str) -> Option<Self> {
match suffix {
TRANSACTION_FILE_SUFFIX => Some(Self::Transaction),
CHECKPOINT_FILE_SUFFIX => Some(Self::Checkpoint),
_ => None,
}
}
}
/// Creates object store path for given transaction or checkpoint.
@ -579,28 +582,41 @@ fn file_path(
object_store: &ObjectStore,
server_id: ServerId,
db_name: &str,
tkey: &TransactionKey,
tkey: TransactionKey,
file_type: FileType,
) -> Path {
let mut path = catalog_path(object_store, server_id, db_name);
let path = catalog_path(object_store, server_id, db_name);
// pad number: `u64::MAX.to_string().len()` is 20
path.push_dir(format!("{:0>20}", tkey.revision_counter));
let transaction = TransactionFile {
catalog_root: Some(path),
file_path: None,
tkey,
file_type,
};
let file_name = format!("{}.{}", tkey.uuid, file_type.suffix());
path.set_file_name(file_name);
path
transaction.file_path()
}
/// Extracts revision counter, UUID, and file type from transaction or checkpoint path.
fn parse_file_path(path: Path) -> Option<(u64, Uuid, FileType)> {
struct TransactionFile {
/// catalog_root will be stored elsewhere when all parquet file paths are relative
catalog_root: Option<Path>,
/// full path including catalog root; this will become a relative path only
file_path: Option<Path>,
tkey: TransactionKey,
file_type: FileType,
}
impl TransactionFile {
fn parse(path: Path) -> Option<Self> {
let file_path = path.clone();
let parsed: DirsAndFileName = path.into();
if parsed.directories.len() != 4 {
return None;
};
let revision_counter = parsed.directories[3].encoded().parse();
let revision_counter = parsed.directories[3].encoded().parse().ok()?;
let name_parts: Vec<_> = parsed
.file_name
@ -612,20 +628,41 @@ fn parse_file_path(path: Path) -> Option<(u64, Uuid, FileType)> {
if name_parts.len() != 2 {
return None;
}
let uuid = Uuid::parse_str(name_parts[0]);
let uuid = Uuid::parse_str(name_parts[0]).ok()?;
match (revision_counter, uuid) {
(Ok(revision_counter), Ok(uuid)) => {
for file_type in
std::array::IntoIter::new([FileType::Checkpoint, FileType::Transaction])
{
if name_parts[1] == file_type.suffix() {
return Some((revision_counter, uuid, file_type));
let file_type = FileType::parse_str(name_parts[1])?;
Some(Self {
catalog_root: None,
file_path: Some(file_path),
tkey: TransactionKey {
revision_counter,
uuid,
},
file_type,
})
}
fn file_path(&self) -> Path {
if let Some(file_path) = &self.file_path {
file_path.clone()
} else {
// If we don't have the full path, we must have the catalog path. This will be
// simplified soon
let mut path = self
.catalog_root
.as_ref()
.expect("must have catalog_root when there is no file_path")
.clone();
// pad number: `u64::MAX.to_string().len()` is 20
path.push_dir(format!("{:0>20}", self.tkey.revision_counter));
let file_name = format!("{}.{}", self.tkey.uuid, self.file_type.suffix());
path.set_file_name(file_name);
path
}
None
}
_ => None,
}
}
@ -636,7 +673,7 @@ async fn list_files(
object_store: &ObjectStore,
server_id: ServerId,
db_name: &str,
) -> Result<Vec<(Path, FileType, u64, Uuid)>> {
) -> Result<Vec<TransactionFile>> {
let list_path = catalog_path(object_store, server_id, db_name);
let paths = object_store
.list(Some(&list_path))
@ -645,11 +682,7 @@ async fn list_files(
.map_ok(|paths| {
paths
.into_iter()
.filter_map(|path| {
parse_file_path(path.clone()).map(|(revision_counter, uuid, file_type)| {
(path.clone(), file_type, revision_counter, uuid)
})
})
.filter_map(TransactionFile::parse)
.collect()
})
.try_concat()
@ -765,7 +798,7 @@ fn parse_encoding(encoding: i32) -> Result<proto::transaction::Encoding> {
}
/// Key to address transactions.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Copy)]
struct TransactionKey {
revision_counter: u64,
uuid: Uuid,
@ -884,7 +917,7 @@ impl OpenTransaction {
object_store,
server_id,
db_name,
&self.tkey(),
self.tkey(),
FileType::Transaction,
);
store_transaction_proto(object_store, &path, &self.proto).await?;
@ -895,7 +928,7 @@ impl OpenTransaction {
object_store: &Arc<ObjectStore>,
server_id: ServerId,
db_name: &str,
tkey: &TransactionKey,
tkey: TransactionKey,
state: &mut S,
last_tkey: &Option<TransactionKey>,
file_type: FileType,
@ -1232,7 +1265,7 @@ impl<'c> CheckpointHandle<'c> {
&object_store,
server_id,
db_name,
&self.tkey,
self.tkey,
FileType::Checkpoint,
);
store_transaction_proto(&object_store, &path, &proto).await?;
@ -1328,7 +1361,7 @@ pub mod test_helpers {
&catalog.object_store,
catalog.server_id,
&catalog.db_name,
&tkey,
tkey,
FileType::Transaction,
);
let mut proto = load_transaction_proto(&catalog.object_store, &path)
@ -1343,10 +1376,7 @@ pub mod test_helpers {
/// Helper function to ensure that guards don't leak into the future state machine.
fn get_tkey(catalog: &PreservedCatalog) -> TransactionKey {
let guard = catalog.previous_tkey.read();
guard
.as_ref()
.expect("should have at least a single transaction")
.clone()
guard.expect("should have at least a single transaction")
}
/// Torture-test implementations for [`CatalogState`].
@ -1772,7 +1802,7 @@ mod tests {
// remove transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let tkey = trace.tkeys[0];
let path = file_path(
&object_store,
server_id,
@ -1835,7 +1865,7 @@ mod tests {
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let tkey = trace.tkeys[0];
let path = file_path(
&object_store,
server_id,
@ -1872,7 +1902,7 @@ mod tests {
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let tkey = trace.tkeys[0];
let path = file_path(
&object_store,
server_id,
@ -1914,7 +1944,7 @@ mod tests {
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let tkey = trace.tkeys[0];
let path = file_path(
&object_store,
server_id,
@ -1951,7 +1981,7 @@ mod tests {
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let tkey = trace.tkeys[0];
let path = file_path(
&object_store,
server_id,
@ -1988,7 +2018,7 @@ mod tests {
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let tkey = trace.tkeys[0];
let path = file_path(
&object_store,
server_id,
@ -2022,7 +2052,7 @@ mod tests {
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[1];
let tkey = trace.tkeys[1];
let path = file_path(
&object_store,
server_id,
@ -2056,7 +2086,7 @@ mod tests {
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let tkey = trace.tkeys[0];
let path = file_path(
&object_store,
server_id,
@ -2093,7 +2123,7 @@ mod tests {
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let tkey = trace.tkeys[0];
let path = file_path(
&object_store,
server_id,
@ -2160,12 +2190,12 @@ mod tests {
// re-create transaction file with different UUID
assert!(trace.tkeys.len() >= 2);
let mut tkey = trace.tkeys[1].clone();
let mut tkey = trace.tkeys[1];
let path = file_path(
&object_store,
server_id,
db_name,
&tkey,
tkey,
FileType::Transaction,
);
let mut proto = load_transaction_proto(&object_store, &path).await.unwrap();
@ -2177,7 +2207,7 @@ mod tests {
&object_store,
server_id,
db_name,
&tkey,
tkey,
FileType::Transaction,
);
proto.uuid = new_uuid.to_string();
@ -2210,12 +2240,12 @@ mod tests {
// create checkpoint file with different UUID
assert!(trace.tkeys.len() >= 2);
let mut tkey = trace.tkeys[1].clone();
let mut tkey = trace.tkeys[1];
let path = file_path(
&object_store,
server_id,
db_name,
&tkey,
tkey,
FileType::Transaction,
);
let mut proto = load_transaction_proto(&object_store, &path).await.unwrap();
@ -2227,7 +2257,7 @@ mod tests {
&object_store,
server_id,
db_name,
&tkey,
tkey,
FileType::Checkpoint,
);
proto.uuid = new_uuid.to_string();
@ -2261,7 +2291,7 @@ mod tests {
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let tkey = trace.tkeys[0];
let path = file_path(
&object_store,
server_id,
@ -2304,7 +2334,7 @@ mod tests {
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let tkey = trace.tkeys[0];
let path = file_path(
&object_store,
server_id,
@ -2341,7 +2371,7 @@ mod tests {
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let tkey = trace.tkeys[0];
let path = file_path(
&object_store,
server_id,
@ -2381,7 +2411,7 @@ mod tests {
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let tkey = trace.tkeys[0];
let path = file_path(
&object_store,
server_id,
@ -2418,7 +2448,7 @@ mod tests {
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let tkey = trace.tkeys[0];
let path = file_path(
&object_store,
server_id,
@ -2455,7 +2485,7 @@ mod tests {
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let tkey = trace.tkeys[0];
let path = file_path(
&object_store,
server_id,
@ -2492,7 +2522,7 @@ mod tests {
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let tkey = trace.tkeys[0];
let path = file_path(
&object_store,
server_id,
@ -2587,7 +2617,7 @@ mod tests {
if trace.aborted[i] {
continue;
}
let tkey = &trace.tkeys[i];
let tkey = trace.tkeys[i];
let path = file_path(
&object_store,
server_id,
@ -2712,8 +2742,7 @@ mod tests {
}
fn record(&mut self, catalog: &PreservedCatalog, state: &TestCatalogState, aborted: bool) {
self.tkeys
.push(catalog.previous_tkey.read().clone().unwrap());
self.tkeys.push(catalog.previous_tkey.read().unwrap());
self.states.push(state.clone());
self.post_timestamps.push(Utc::now());
self.aborted.push(aborted);
@ -3118,7 +3147,7 @@ mod tests {
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let tkey = trace.tkeys[0];
let path = file_path(
&object_store,
server_id,
@ -3166,7 +3195,7 @@ mod tests {
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let tkey = trace.tkeys[0];
let path = file_path(
&object_store,
server_id,
@ -3239,7 +3268,7 @@ mod tests {
trace.record(&catalog, &state, false);
// delete transaction files
for (aborted, tkey) in trace.aborted.iter().zip(trace.tkeys.iter()) {
for (aborted, tkey) in trace.aborted.iter().zip(trace.tkeys.into_iter()) {
if *aborted {
continue;
}
@ -3325,12 +3354,12 @@ mod tests {
.unwrap();
// delete transaction file
let tkey = catalog.previous_tkey.read().clone().unwrap();
let tkey = catalog.previous_tkey.read().unwrap();
let path = file_path(
&object_store,
server_id,
db_name,
&tkey,
tkey,
FileType::Transaction,
);
checked_delete(&object_store, &path).await;
@ -3346,12 +3375,12 @@ mod tests {
}
// delete transaction file
let tkey = catalog.previous_tkey.read().clone().unwrap();
let tkey = catalog.previous_tkey.read().unwrap();
let path = file_path(
&object_store,
server_id,
db_name,
&tkey,
tkey,
FileType::Transaction,
);
checked_delete(&object_store, &path).await;

View File

@ -1,4 +1,4 @@
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,

View File

@ -1,4 +1,4 @@
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,

View File

@ -567,6 +567,14 @@ mod tests {
)
}
// *NOTE*: this test currently fails on (at least) aarch64 architectures
// such as an Apple M1 machine.
//
// Possibly related to https://github.com/rust-lang/rust/issues/87906 but
// not clear at this point.
//
// Ignoring the tests here to get the suite green on aarch64.
#[cfg(not(target_arch = "aarch64"))]
#[test]
#[should_panic(expected = "PersistenceWindows::add_range called out of order")]
fn panics_when_time_goes_backwards() {
@ -1389,6 +1397,14 @@ mod tests {
assert_eq!(w.closed[1].row_count.get(), 11);
}
// *NOTE*: this test currently fails on (at least) aarch64 architectures
// such as an Apple M1 machine.
//
// Possibly related to https://github.com/rust-lang/rust/issues/87906 but
// not clear at this point.
//
// Ignoring the tests here to get the suite green on aarch64.
#[cfg(not(target_arch = "aarch64"))]
#[test]
fn test_summaries() {
let late_arrival_period = Duration::from_secs(100);

View File

@ -1,5 +1,5 @@
//! Contains the IOx query engine
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_debug_implementations,
clippy::explicit_iter_loop,

View File

@ -369,10 +369,10 @@ async fn sql_select_from_system_chunk_columns() {
"+---------------+----------+------------+-------------+-------------------+-----------+------------+-----------+-----------+--------------+",
"| partition_key | chunk_id | table_name | column_name | storage | row_count | null_count | min_value | max_value | memory_bytes |",
"+---------------+----------+------------+-------------+-------------------+-----------+------------+-----------+-----------+--------------+",
"| 1970-01-01T00 | 0 | h2o | city | ReadBuffer | 2 | 0 | Boston | Boston | 252 |",
"| 1970-01-01T00 | 0 | h2o | other_temp | ReadBuffer | 2 | 1 | 70.4 | 70.4 | 425 |",
"| 1970-01-01T00 | 0 | h2o | state | ReadBuffer | 2 | 0 | MA | MA | 240 |",
"| 1970-01-01T00 | 0 | h2o | temp | ReadBuffer | 2 | 1 | 70.4 | 70.4 | 425 |",
"| 1970-01-01T00 | 0 | h2o | city | ReadBuffer | 2 | 0 | Boston | Boston | 255 |",
"| 1970-01-01T00 | 0 | h2o | other_temp | ReadBuffer | 2 | 1 | 70.4 | 70.4 | 281 |",
"| 1970-01-01T00 | 0 | h2o | state | ReadBuffer | 2 | 0 | MA | MA | 243 |",
"| 1970-01-01T00 | 0 | h2o | temp | ReadBuffer | 2 | 1 | 70.4 | 70.4 | 281 |",
"| 1970-01-01T00 | 0 | h2o | time | ReadBuffer | 2 | 0 | 50 | 250 | 51 |",
"| 1970-01-01T00 | 0 | o2 | city | OpenMutableBuffer | 2 | 1 | Boston | Boston | 35 |",
"| 1970-01-01T00 | 0 | o2 | reading | OpenMutableBuffer | 2 | 1 | 51 | 51 | 25 |",

View File

@ -659,10 +659,10 @@ mod test {
"# HELP read_buffer_column_bytes The number of bytes used by all columns in the Read Buffer",
"# TYPE read_buffer_column_bytes gauge",
r#"read_buffer_column_bytes{db="mydb",encoding="BT_U32-FIXED",log_data_type="i64"} 72"#,
r#"read_buffer_column_bytes{db="mydb",encoding="FBT_U8-FIXEDN",log_data_type="f64"} 800"#,
r#"read_buffer_column_bytes{db="mydb",encoding="FBT_U8-FIXEDN",log_data_type="f64"} 512"#,
r#"read_buffer_column_bytes{db="mydb",encoding="FIXED",log_data_type="f64"} 96"#,
r#"read_buffer_column_bytes{db="mydb",encoding="FIXEDN",log_data_type="bool"} 672"#,
r#"read_buffer_column_bytes{db="mydb",encoding="RLE",log_data_type="string"} 500"#,
r#"read_buffer_column_bytes{db="mydb",encoding="FIXEDN",log_data_type="bool"} 384"#,
r#"read_buffer_column_bytes{db="mydb",encoding="RLE",log_data_type="string"} 506"#,
"# HELP read_buffer_column_raw_bytes The number of bytes used by all columns if they were uncompressed in the Read Buffer",
"# TYPE read_buffer_column_raw_bytes gauge",
r#"read_buffer_column_raw_bytes{db="mydb",encoding="BT_U32-FIXED",log_data_type="i64",null="false"} 96"#,

View File

@ -11,7 +11,7 @@ impl BooleanEncoding {
/// The total size in bytes of the store columnar data.
pub fn size(&self) -> usize {
match self {
Self::BooleanNull(enc) => enc.size(),
Self::BooleanNull(enc) => enc.size(false),
}
}

View File

@ -1,6 +1,7 @@
//! An encoding nullable bool, by an Arrow array.
use std::cmp::Ordering;
use std::fmt::Debug;
use std::mem::size_of;
use arrow::array::{Array, BooleanArray};
use cmp::Operator;
@ -19,7 +20,7 @@ impl std::fmt::Display for Bool {
"[Bool] rows: {:?}, nulls: {:?}, size: {}",
self.arr.len(),
self.arr.null_count(),
self.size()
self.size(false)
)
}
}
@ -42,8 +43,12 @@ impl Bool {
/// Returns an estimation of the total size in bytes used by this column
/// encoding.
pub fn size(&self) -> usize {
std::mem::size_of::<BooleanArray>() + self.arr.get_array_memory_size()
pub fn size(&self, buffers: bool) -> usize {
size_of::<Self>()
+ match buffers {
true => self.arr.get_array_memory_size(), // includes buffer capacities
false => self.arr.get_buffer_memory_size(),
}
}
/// The estimated total size in bytes of the underlying bool values in the
@ -360,7 +365,8 @@ mod test {
#[test]
fn size() {
let v = Bool::from(vec![None, None, Some(true), Some(false)].as_slice());
assert_eq!(v.size(), 400);
assert_eq!(v.size(false), 256);
assert_eq!(v.size(true), 400); // includes allocated buffers
}
#[test]

View File

@ -18,8 +18,10 @@ pub trait ScalarEncoding<L>: Debug + Display + Send + Sync {
/// A useful name for the encoding, likely used in instrumentation.
fn name(&self) -> &'static str;
/// The total size in bytes to store encoded data in memory.
fn size(&self) -> usize;
/// The total size in bytes to store encoded data in memory. If `buffers`
/// is true then the returned size should account for any allocated buffers
/// within the contained encoding structures.
fn size(&self, buffers: bool) -> usize;
/// The estimated total size in bytes of the underlying encoded values if
/// they were stored contiguously as a vector of `L`. `include_null` should

View File

@ -53,7 +53,7 @@ where
"[{}] rows: {:?}, size: {}",
self.name(),
self.num_rows(),
self.size()
self.size(false)
)
}
}
@ -252,9 +252,13 @@ where
self.values.len() as u32
}
/// Encoded data size including `Self` - an "accurate" estimation.
fn size(&self) -> usize {
size_of::<Self>() + (size_of::<P>() * self.values.len())
fn size(&self, buffers: bool) -> usize {
let values = size_of::<P>()
* match buffers {
true => self.values.capacity(),
false => self.values.len(),
};
size_of::<Self>() + values
}
fn size_raw(&self, _: bool) -> usize {
@ -425,6 +429,19 @@ mod test {
(Fixed::new(values, Arc::clone(&mock)), mock)
}
#[test]
fn size() {
let (v, _) = new_encoding(vec![22_i64, 1, 18]);
// Self is 32 bytes and there are 3 * 8b values
assert_eq!(v.size(false), 56);
// check pre-allocated sizing
let (mut v, _) = new_encoding(vec![]);
v.values.reserve_exact(40);
// Self if 32 bytes and there are 40 * 8b values allocated
assert_eq!(v.size(true), 352);
}
#[test]
fn value() {
let (v, transcoder) = new_encoding(vec![22, 1, 18]);

View File

@ -52,7 +52,7 @@ where
self.name(),
self.arr.len(),
self.arr.null_count(),
self.size()
self.size(false)
)
}
}
@ -260,8 +260,12 @@ where
self.arr.null_count() as u32
}
fn size(&self) -> usize {
size_of::<Self>() + self.arr.get_array_memory_size()
fn size(&self, buffers: bool) -> usize {
size_of::<Self>()
+ match buffers {
true => self.arr.get_array_memory_size(),
false => self.arr.get_buffer_memory_size(),
}
}
/// The estimated total size in bytes of the underlying values in the
@ -478,7 +482,8 @@ mod test {
#[test]
fn size() {
let (v, _) = new_encoding(vec![None, None, Some(100), Some(2222)]);
assert_eq!(v.size(), 408);
assert_eq!(v.size(false), 264);
assert_eq!(v.size(true), 408); // includes allocated buffers
}
#[test]

View File

@ -70,7 +70,7 @@ where
f,
"[{}] size: {:?} rows: {:?} nulls: {} runs: {} ",
self.name(),
self.size(),
self.size(false),
self.num_rows(),
self.null_count(),
self.run_lengths.len()
@ -343,8 +343,13 @@ where
ENCODING_NAME
}
fn size(&self) -> usize {
std::mem::size_of::<Self>() + (self.run_lengths.len() * size_of::<(u32, Option<P>)>())
fn size(&self, buffers: bool) -> usize {
let values = size_of::<(u32, Option<P>)>()
* match buffers {
true => self.run_lengths.capacity(),
false => self.run_lengths.len(),
};
std::mem::size_of::<Self>() + values
}
fn size_raw(&self, include_nulls: bool) -> usize {
@ -713,16 +718,26 @@ mod test {
fn size() {
let (mut enc, _) = new_encoding(vec![]);
// 40b Self + (0 rl * 24) = 32
assert_eq!(enc.size(), 40);
// 40b Self + (0 rl * 24) = 40
assert_eq!(enc.size(false), 40);
enc.push_none();
// 40b Self + (1 rl * 24) = 56
assert_eq!(enc.size(), 64);
// 40b Self + (1 rl * 24) = 64
assert_eq!(enc.size(false), 64);
enc.push_additional_some(1, 10);
// 40b Self + (2 rl * 24) = 80
assert_eq!(enc.size(), 88);
// 40b Self + (2 rl * 24) = 88
assert_eq!(enc.size(false), 88);
// check allocated buffer size
let (mut enc, _) = new_encoding(vec![]);
enc.run_lengths.reserve_exact(40);
// 40b Self + (40 rl * 24) = 1000b
assert_eq!(enc.size(true), 1000);
// 40b Self + (40 rl * 24) = 1000b - no new allocations
enc.push_additional_some(1, 10);
assert_eq!(enc.size(true), 1000);
}
#[test]

View File

@ -30,8 +30,8 @@ impl Encoding {
pub fn size(&self) -> usize {
match &self {
Self::RLE(enc) => enc.size(),
Self::Plain(enc) => enc.size(),
Self::RLE(enc) => enc.size(false),
Self::Plain(enc) => enc.size(false),
}
}

View File

@ -47,7 +47,7 @@ impl Default for Dictionary {
}
impl Dictionary {
/// Initialises an Dictionar encoding with a set of logical values.
/// Initialises a Dictionary encoding with a set of logical values.
/// Creating an encoding using `with_dictionary` ensures that the dictionary
/// is in the correct order, and will allow values to be inserted with any
/// value in the dictionary.
@ -61,9 +61,13 @@ impl Dictionary {
}
/// A reasonable estimation of the on-heap size this encoding takes up.
pub fn size(&self) -> usize {
// the total size of all decoded values in the column.
let decoded_keys_size = self
/// If `buffers` is true then all allocated buffers in the encoding are
/// accounted for.
pub fn size(&self, buffers: bool) -> usize {
let base_size = size_of::<Self>();
// Total size of all decoded values in the column.
let mut decoded_keys_size = self
.entries
.iter()
.map(|k| match k {
@ -72,11 +76,18 @@ impl Dictionary {
} + size_of::<Option<String>>())
.sum::<usize>();
let entries_size = size_of::<Vec<Option<String>>>() + decoded_keys_size;
let encoded_ids_size = size_of::<Vec<u32>>() + (size_of::<u32>() * self.encoded_data.len());
if buffers {
decoded_keys_size +=
(self.entries.capacity() - self.entries.len()) * size_of::<Option<String>>();
}
// + 1 for contains_null field
entries_size + encoded_ids_size + 1
let encoded_ids_size = size_of::<u32>()
* match buffers {
true => self.encoded_data.capacity(),
false => self.encoded_data.len(),
};
base_size + decoded_keys_size + encoded_ids_size
}
/// A reasonable estimation of the on-heap size of the underlying string
@ -837,7 +848,7 @@ impl std::fmt::Display for Dictionary {
f,
"[{}] size: {:?} rows: {:?} cardinality: {}",
ENCODING_NAME,
self.size(),
self.size(false),
self.num_rows(),
self.cardinality(),
)
@ -873,17 +884,13 @@ mod test {
enc.push_none();
enc.push_none();
// keys - 14 bytes.
// 3 string entries in dictionary
// entries is 24 + (24*4) + 14 == 134
// Self - 24+24+8 = 56 bytes (two vectors, a bool and padding)
// 4 string entries (inc NULL) in vec = 4 * 24 = 96
// 3 string entries with length 4+5+5 = 14
// 15 rows.
// encoded ids is 24 + (4 * 15) == 84
// 134 + 84 + 1 == 219
assert_eq!(enc.size(), 219);
// encoded ids is (4 * 15) == 60
// 56 + 96 + 14 + 60 = 226
assert_eq!(enc.size(false), 226);
// check dictionary
assert_eq!(
@ -899,6 +906,24 @@ mod test {
enc.encoded_data,
vec![1, 1, 1, 2, 1, 1, 1, 1, 1, 3, 3, NULL_ID, NULL_ID, NULL_ID, NULL_ID]
);
// check for allocated size
let mut enc = Dictionary::default();
enc.encoded_data.reserve_exact(40);
enc.entries.reserve_exact(39); // account for already-allocated NULL element
enc.push_additional(Some("east".to_string()), 3);
enc.push_additional(Some("north".to_string()), 1);
enc.push_additional(Some("east".to_string()), 5);
enc.push_additional(Some("south".to_string()), 2);
enc.push_additional(None, 4);
// Self - 24+24+8 = 56 bytes (two vectors, a bool and padding)
// 40 string entries (inc NULL) in vec = 40 * 24 = 960
// 3 string entries with lengths 4+5+5 = 14
// 15 rows but 40 elements allocated
// encoded ids is (40 * 4) == 160
// 56 + 960 + 14 + 160 = 1190
assert_eq!(enc.size(true), 1190);
}
#[test]

View File

@ -3,8 +3,6 @@ use std::convert::From;
use std::iter;
use std::mem::size_of;
use croaring::Bitmap;
use arrow::array::{Array, StringArray};
use super::NULL_ID;
@ -75,13 +73,18 @@ impl RLE {
}
/// A reasonable estimation of the on-heap size this encoding takes up.
pub fn size(&self) -> usize {
// the total size of all decoded values in the column.
let decoded_keys_size = self.index_entries.iter().map(|k| k.len()).sum::<usize>();
/// If `buffers` is true then the size of all allocated buffers in the
/// encoding are accounted for.
pub fn size(&self, buffers: bool) -> usize {
let base_size = size_of::<Self>();
let index_entry_size = size_of::<Vec<String>>() // container size
+ (size_of::<String>() * self.index_entries.len()) // elements size
+ decoded_keys_size; // heap allocated strings size
let mut index_entries_size = size_of::<String>()
* match buffers {
true => self.index_entries.capacity(),
false => self.index_entries.len(),
};
// the total size of all decoded values in the column.
index_entries_size += self.index_entries.iter().map(|k| k.len()).sum::<usize>();
// The total size (an upper bound estimate) of all the bitmaps
// in the column.
@ -91,14 +94,16 @@ impl RLE {
.map(|row_ids| row_ids.size())
.sum::<usize>();
let index_row_ids_size = size_of::<BTreeMap<u32, Bitmap>>()
+ (size_of::<u32>() * self.index_row_ids.len())
+ row_ids_bitmaps_size;
let index_row_ids_size =
(size_of::<u32>() * self.index_row_ids.len()) + row_ids_bitmaps_size;
let run_lengths_size = size_of::<Vec<(u32, u32)>>() + // container size
(size_of::<(u32, u32)>() * self.run_lengths.len()); // each run-length size
let run_lengths_size = size_of::<(u32, u32)>()
* match buffers {
true => self.run_lengths.capacity(),
false => self.run_lengths.len(),
};
index_entry_size + index_row_ids_size + run_lengths_size + 1 + 4
base_size + index_entries_size + index_row_ids_size + run_lengths_size
}
/// A reasonable estimation of the on-heap size of the underlying string
@ -958,7 +963,7 @@ impl std::fmt::Display for RLE {
f,
"[{}] size: {:?} rows: {:?} cardinality: {}, nulls: {} runs: {} ",
ENCODING_NAME,
self.size(),
self.size(false),
self.num_rows,
self.cardinality(),
self.null_count(),
@ -1000,22 +1005,34 @@ mod test {
enc.push_none();
enc.push_none();
// Note: there are 4 index entries to account for NULL entry.
// `index_entry` is 24 + (24*4) + 14 == 134
// * Self: 24 + 24 + 24 + 1 + (padding 3b) + 4 = 80b
// * index entries: (4) are is (24*4) + 14 == 110
// * index row ids: (bitmaps) is (4 * 4) + (108b for bitmaps) == 124
// * run lengths: (8*5) == 40
//
// bitmaps for east, north, south and NULL entries.
// `index_row_ids` is 24 + (4 * 4) + (108b for bitmaps) == 148
//
// `run lengths` is 24 + (8*5) == 64
//
// `contains_null` - 1 byte
// `num_rows` - 4 bytes
//
// 351
// 354
// assert_eq!(enc.size(false), 354);
// TODO(edd): there some mystery bytes in the bitmap implementation.
// need to figure out how to measure these
assert_eq!(enc.size(), 351);
// check allocated size
let mut enc = RLE::default();
enc.index_entries.reserve_exact(39); // account for already-allocated NULL element
enc.run_lengths.reserve_exact(40);
enc.push_additional(Some("east".to_string()), 3);
enc.push_additional(Some("north".to_string()), 1);
enc.push_additional(Some("east".to_string()), 5);
enc.push_additional(Some("south".to_string()), 2);
enc.push_none();
enc.push_none();
enc.push_none();
enc.push_none();
// * Self: 24 + 24 + 24 + 1 + (padding 3b) + 4 = 80b
// * index entries: (40 * 24) + 14 == 974
// * index row ids: (bitmaps) is (4 * 4) + (108b for bitmaps) == 124
// * run lengths: (40 * 8) == 320
//
assert_eq!(enc.size(true), 1498);
}
#[test]

View File

@ -32,7 +32,7 @@ impl FloatEncoding {
/// The total size in bytes of to store columnar data in memory.
pub fn size(&self) -> usize {
match self {
Self::F64(enc, _) => enc.size(),
Self::F64(enc, _) => enc.size(false),
}
}

View File

@ -27,8 +27,8 @@ impl IntegerEncoding {
/// The total size in bytes of the store columnar data.
pub fn size(&self) -> usize {
match self {
Self::I64(enc, _) => enc.size(),
Self::U64(enc, _) => enc.size(),
Self::I64(enc, _) => enc.size(false),
Self::U64(enc, _) => enc.size(false),
}
}
@ -971,13 +971,13 @@ mod test {
// Input data containing NULL will be stored in an Arrow array encoding
let cases = vec![
(vec![None, Some(0_i64)], 400_usize), // u8 Arrow array
(vec![None, Some(-120_i64)], 400), // i8
(vec![None, Some(399_i64)], 400), // u16
(vec![None, Some(-399_i64)], 400), // i16
(vec![None, Some(u32::MAX as i64)], 400), // u32
(vec![None, Some(i32::MIN as i64)], 400), // i32
(vec![None, Some(u32::MAX as i64 + 1)], 400), //u64
(vec![None, Some(0_i64)], 256_usize), // u8 Arrow array
(vec![None, Some(-120_i64)], 256), // i8
(vec![None, Some(399_i64)], 256), // u16
(vec![None, Some(-399_i64)], 256), // i16
(vec![None, Some(u32::MAX as i64)], 256), // u32
(vec![None, Some(i32::MIN as i64)], 256), // i32
(vec![None, Some(u32::MAX as i64 + 1)], 256), //u64
];
for (case, name) in cases.iter().cloned() {
@ -1163,10 +1163,10 @@ mod test {
// Input data containing NULL will be stored in an Arrow array encoding
let cases = vec![
(vec![None, Some(0_u64)], 400_usize),
(vec![None, Some(399_u64)], 400),
(vec![None, Some(u32::MAX as u64)], 400),
(vec![None, Some(u64::MAX)], 400),
(vec![None, Some(0_u64)], 256_usize),
(vec![None, Some(399_u64)], 256),
(vec![None, Some(u32::MAX as u64)], 256),
(vec![None, Some(u64::MAX)], 256),
];
for (case, size) in cases.iter().cloned() {

View File

@ -30,8 +30,8 @@ impl StringEncoding {
/// The estimated total size in bytes of the in-memory columnar data.
pub fn size(&self) -> usize {
match self {
Self::RleDictionary(enc) => enc.size(),
Self::Dictionary(enc) => enc.size(),
Self::RleDictionary(enc) => enc.size(false),
Self::Dictionary(enc) => enc.size(false),
}
}

View File

@ -1,4 +1,4 @@
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(clippy::clone_on_ref_ptr, clippy::use_self)]
#![allow(dead_code, clippy::too_many_arguments)]
mod chunk;

View File

@ -45,7 +45,6 @@ serde = "1.0"
serde_json = "1.0"
snafu = "0.6"
snap = "1.0.0"
tempfile = "3.1.0"
tikv-jemalloc-ctl = "0.4.0"
tokio = { version = "1.0", features = ["macros", "time"] }
tokio-util = { version = "0.6.3" }

View File

@ -4,6 +4,7 @@ use metrics::MetricRegistry;
use object_store::ObjectStore;
use observability_deps::tracing::info;
use query::exec::Executor;
use write_buffer::config::WriteBufferConfigFactory;
use crate::JobRegistry;
@ -12,6 +13,7 @@ use crate::JobRegistry;
#[derive(Debug, Clone)]
pub struct ApplicationState {
object_store: Arc<ObjectStore>,
write_buffer_factory: Arc<WriteBufferConfigFactory>,
executor: Arc<Executor>,
job_registry: Arc<JobRegistry>,
metric_registry: Arc<MetricRegistry>,
@ -27,6 +29,28 @@ impl ApplicationState {
Self {
object_store,
write_buffer_factory: Arc::new(Default::default()),
executor: Arc::new(Executor::new(num_threads)),
job_registry: Arc::new(JobRegistry::new()),
metric_registry: Arc::new(metrics::MetricRegistry::new()),
}
}
/// Same as [`new`](Self::new) but also specifies the write buffer factory.
///
/// This is mostly useful for testing.
#[cfg(test)]
pub fn with_write_buffer_factory(
object_store: Arc<ObjectStore>,
write_buffer_factory: Arc<WriteBufferConfigFactory>,
num_worker_threads: Option<usize>,
) -> Self {
let num_threads = num_worker_threads.unwrap_or_else(num_cpus::get);
info!(%num_threads, "using specified number of threads per thread pool");
Self {
object_store,
write_buffer_factory,
executor: Arc::new(Executor::new(num_threads)),
job_registry: Arc::new(JobRegistry::new()),
metric_registry: Arc::new(metrics::MetricRegistry::new()),
@ -37,6 +61,10 @@ impl ApplicationState {
&self.object_store
}
pub fn write_buffer_factory(&self) -> &Arc<WriteBufferConfigFactory> {
&self.write_buffer_factory
}
pub fn job_registry(&self) -> &Arc<JobRegistry> {
&self.job_registry
}

View File

@ -7,9 +7,10 @@ use data_types::server_id::ServerId;
use data_types::{database_rules::DatabaseRules, DatabaseName};
use futures::future::{BoxFuture, Shared};
use futures::{FutureExt, TryFutureExt};
use generated_types::database_rules::encode_database_rules;
use internal_types::freezable::Freezable;
use object_store::path::{ObjectStorePath, Path};
use observability_deps::tracing::{error, info};
use observability_deps::tracing::{error, info, warn};
use parking_lot::RwLock;
use persistence_windows::checkpoint::ReplayPlan;
use snafu::{ResultExt, Snafu};
@ -17,13 +18,12 @@ use tokio::sync::Notify;
use tokio::task::JoinError;
use tokio_util::sync::CancellationToken;
use crate::db::load::load_or_create_preserved_catalog;
use crate::db::load::{create_preserved_catalog, load_or_create_preserved_catalog};
use crate::db::DatabaseToCommit;
use crate::{ApplicationState, Db, DB_RULES_FILE_NAME};
use bytes::BytesMut;
use object_store::{ObjectStore, ObjectStoreApi};
use parquet_file::catalog::PreservedCatalog;
use write_buffer::config::WriteBufferConfig;
const INIT_BACKOFF: Duration = Duration::from_secs(1);
@ -89,6 +89,9 @@ pub struct DatabaseConfig {
}
impl Database {
/// Create in-mem database object.
///
/// This is backed by an existing database, which was [created](Self::create) some time in the past.
pub fn new(application: Arc<ApplicationState>, config: DatabaseConfig) -> Self {
info!(db_name=%config.name, store_prefix=%config.store_prefix.display(), "new database");
@ -106,6 +109,30 @@ impl Database {
Self { join, shared }
}
/// Create fresh database w/o any state.
pub async fn create(
application: Arc<ApplicationState>,
store_prefix: &Path,
rules: DatabaseRules,
server_id: ServerId,
) -> Result<(), InitError> {
let db_name = rules.name.clone();
persist_database_rules(application.object_store(), store_prefix, rules).await?;
create_preserved_catalog(
db_name.as_str(),
Arc::clone(application.object_store()),
server_id,
Arc::clone(application.metric_registry()),
true,
)
.await
.context(CannotCreatePreservedCatalog)?;
Ok(())
}
/// Triggers shutdown of this `Database`
pub fn shutdown(&self) {
info!(db_name=%self.shared.config.name, "database shutting down");
@ -223,6 +250,20 @@ impl Database {
}
}
impl Drop for Database {
fn drop(&mut self) {
let db_name = &self.shared.config.name;
if !self.shared.shutdown.is_cancelled() {
warn!(%db_name, "database dropped without calling shutdown()");
self.shared.shutdown.cancel();
}
if self.join.clone().now_or_never().is_none() {
warn!(%db_name, "database dropped without waiting for worker termination");
}
}
}
/// State shared with the `Database` background worker
#[derive(Debug)]
struct DatabaseShared {
@ -407,6 +448,17 @@ pub enum InitError {
#[snafu(display("error during replay: {}", source))]
Replay { source: crate::db::Error },
#[snafu(display("store error: {}", source))]
StoreError { source: object_store::Error },
#[snafu(display("error serializing database rules to protobuf: {}", source))]
ErrorSerializingRulesProtobuf {
source: generated_types::database_rules::EncodeError,
},
#[snafu(display("cannot create preserved catalog: {}", source))]
CannotCreatePreservedCatalog { source: crate::db::load::Error },
}
/// The Database startup state machine
@ -534,7 +586,10 @@ impl DatabaseStateRulesLoaded {
.await
.context(CatalogLoad)?;
let write_buffer = WriteBufferConfig::new(shared.config.server_id, self.rules.as_ref())
let write_buffer = shared
.application
.write_buffer_factory()
.new_config(shared.config.server_id, self.rules.as_ref())
.await
.context(CreateWriteBuffer)?;
@ -608,6 +663,32 @@ async fn get_store_bytes(
Ok(bytes.freeze())
}
/// Persist the the `DatabaseRules` given the `Database` store prefix
pub(super) async fn persist_database_rules(
object_store: &ObjectStore,
store_prefix: &Path,
rules: DatabaseRules,
) -> Result<(), InitError> {
let mut data = BytesMut::new();
encode_database_rules(rules, &mut data).context(ErrorSerializingRulesProtobuf)?;
let mut location = store_prefix.clone();
location.set_file_name(DB_RULES_FILE_NAME);
let len = data.len();
let stream_data = std::io::Result::Ok(data.freeze());
object_store
.put(
&location,
futures::stream::once(async move { stream_data }),
Some(len),
)
.await
.context(StoreError)?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -2531,7 +2531,7 @@ mod tests {
("svr_id", "1"),
])
.histogram()
.sample_sum_eq(3191.0)
.sample_sum_eq(3197.0)
.unwrap();
let rb = collect_read_filter(&rb_chunk).await;
@ -3400,7 +3400,7 @@ mod tests {
id: 2,
storage: ChunkStorage::ReadBufferAndObjectStore,
lifecycle_action,
memory_bytes: 3284, // size of RB and OS chunks
memory_bytes: 3140, // size of RB and OS chunks
object_store_bytes: 1577, // size of parquet file
row_count: 2,
time_of_last_access: None,
@ -3451,7 +3451,7 @@ mod tests {
}
assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2486 + 87);
assert_eq!(db.catalog.metrics().memory().read_buffer(), 2410);
assert_eq!(db.catalog.metrics().memory().read_buffer(), 2266);
assert_eq!(db.catalog.metrics().memory().object_store(), 874);
}

View File

@ -153,7 +153,7 @@ pub async fn create_preserved_catalog(
/// All input required to create an empty [`Loader`]
#[derive(Debug)]
pub struct LoaderEmptyInput {
struct LoaderEmptyInput {
domain: ::metrics::Domain,
metrics_registry: Arc<::metrics::MetricRegistry>,
metric_labels: Vec<KeyValue>,

View File

@ -59,7 +59,7 @@
//! └────────────┘ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
//! ```
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_debug_implementations,
clippy::explicit_iter_loop,
@ -71,7 +71,6 @@
use std::sync::Arc;
use async_trait::async_trait;
use bytes::BytesMut;
use data_types::database_rules::ShardConfig;
use data_types::error::ErrorLogger;
use data_types::{
@ -80,11 +79,9 @@ use data_types::{
server_id::ServerId,
{DatabaseName, DatabaseNameError},
};
use database::{Database, DatabaseConfig};
use db::load::create_preserved_catalog;
use database::{persist_database_rules, Database, DatabaseConfig};
use entry::{lines_to_sharded_entries, pb_to_entry, Entry, ShardedEntry};
use futures::future::{BoxFuture, Future, FutureExt, Shared, TryFutureExt};
use generated_types::database_rules::encode_database_rules;
use generated_types::influxdata::pbdata::v1 as pb;
use hashbrown::HashMap;
use influxdb_line_protocol::ParsedLine;
@ -140,6 +137,12 @@ pub enum Error {
#[snafu(display("database not initialized"))]
DatabaseNotInitialized { db_name: String },
#[snafu(display("cannot persisted updated rules: {}", source))]
CannotPersistUpdatedRules { source: crate::database::InitError },
#[snafu(display("cannot create database: {}", source))]
CannotCreateDatabase { source: crate::database::InitError },
#[snafu(display("database not found"))]
DatabaseNotFound { db_name: String },
@ -176,14 +179,6 @@ pub enum Error {
#[snafu(display("error replicating to remote: {}", source))]
ErrorReplicating { source: DatabaseError },
#[snafu(display("error serializing database rules to protobuf: {}", source))]
ErrorSerializingRulesProtobuf {
source: generated_types::database_rules::EncodeError,
},
#[snafu(display("store error: {}", source))]
StoreError { source: object_store::Error },
#[snafu(display("error converting line protocol to flatbuffers: {}", source))]
LineConversion { source: entry::Error },
@ -218,9 +213,6 @@ pub enum Error {
source: connection::ConnectionManagerError,
},
#[snafu(display("cannot create preserved catalog: {}", source))]
CannotCreatePreservedCatalog { source: DatabaseError },
#[snafu(display("database failed to initialize: {}", source))]
DatabaseInit { source: Arc<database::InitError> },
}
@ -374,6 +366,19 @@ pub struct Server<M: ConnectionManager> {
shared: Arc<ServerShared>,
}
impl<M: ConnectionManager> Drop for Server<M> {
fn drop(&mut self) {
if !self.shared.shutdown.is_cancelled() {
warn!("server dropped without calling shutdown()");
self.shared.shutdown.cancel();
}
if self.join.clone().now_or_never().is_none() {
warn!("server dropped without waiting for worker termination");
}
}
}
#[derive(Debug)]
struct ServerShared {
/// A token that is used to trigger shutdown of the background worker
@ -687,18 +692,14 @@ where
};
let store_prefix = database_store_prefix(object_store, server_id, &db_name);
persist_database_rules(object_store, &store_prefix, rules).await?;
create_preserved_catalog(
db_name.as_str(),
Arc::clone(self.shared.application.object_store()),
Database::create(
Arc::clone(&self.shared.application),
&store_prefix,
rules,
server_id,
Arc::clone(self.shared.application.metric_registry()),
true,
)
.await
.map_err(|e| Box::new(e) as _)
.context(CannotCreatePreservedCatalog)?;
.map_err(|e| Error::CannotCreateDatabase { source: e })?;
let database = {
let mut state = self.shared.state.write();
@ -971,7 +972,8 @@ where
&database.config().store_prefix,
rules.as_ref().clone(),
)
.await?;
.await
.map_err(|e| Error::CannotPersistUpdatedRules { source: e })?;
Ok(rules)
}
@ -1270,32 +1272,6 @@ fn database_store_prefix(
path
}
/// Persist the the `DatabaseRules` given the `Database` store prefix
async fn persist_database_rules(
object_store: &ObjectStore,
store_prefix: &Path,
rules: DatabaseRules,
) -> Result<()> {
let mut data = BytesMut::new();
encode_database_rules(rules, &mut data).context(ErrorSerializingRulesProtobuf)?;
let mut location = store_prefix.clone();
location.set_file_name(DB_RULES_FILE_NAME);
let len = data.len();
let stream_data = std::io::Result::Ok(data.freeze());
object_store
.put(
&location,
futures::stream::once(async move { stream_data }),
Some(len),
)
.await
.context(StoreError)?;
Ok(())
}
#[cfg(test)]
mod tests {
use std::{
@ -2268,7 +2244,7 @@ mod tests {
// creating database will now result in an error
let err = create_simple_database(&server, db_name).await.unwrap_err();
assert!(matches!(err, Error::CannotCreatePreservedCatalog { .. }));
assert!(matches!(err, Error::CannotCreateDatabase { .. }));
}
// run a sql query against the database, returning the results as record batches

View File

@ -1,5 +1,5 @@
//! Entrypoint of InfluxDB IOx binary
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_debug_implementations,
clippy::explicit_iter_loop,

View File

@ -1,8 +1,9 @@
#![recursion_limit = "512"]
/// Prints what CPU features are used by the compiler by default
/// Script from
/// https://stackoverflow.com/questions/65156743/what-target-features-uses-rustc-by-default
/// https://gist.github.com/AngelicosPhosphoros/4f8c9f08656e0812f4ed3560e53bd600
/// Prints what CPU features are used by the compiler by default.
///
/// Script from:
/// - <https://stackoverflow.com/questions/65156743/what-target-features-uses-rustc-by-default>
/// - <https://gist.github.com/AngelicosPhosphoros/4f8c9f08656e0812f4ed3560e53bd600>
// This script prints all cpu features which active in this build.
// There are 3 steps in usage of script:

View File

@ -1,4 +1,4 @@
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,

View File

@ -1,4 +1,4 @@
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_debug_implementations,
clippy::explicit_iter_loop,

View File

@ -1,6 +1,6 @@
//! Log and trace initialization and setup
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,

View File

@ -1,4 +1,7 @@
use std::sync::Arc;
use std::{
collections::{btree_map::Entry, BTreeMap},
sync::Arc,
};
use data_types::{
database_rules::{DatabaseRules, WriteBufferConnection},
@ -8,39 +11,250 @@ use data_types::{
use crate::{
core::{WriteBufferError, WriteBufferReading, WriteBufferWriting},
kafka::{KafkaBufferConsumer, KafkaBufferProducer},
mock::{MockBufferForReading, MockBufferForWriting, MockBufferSharedState},
};
/// Prefix for mocked connections.
pub const PREFIX_MOCK: &str = "mock://";
#[derive(Debug)]
pub enum WriteBufferConfig {
Writing(Arc<dyn WriteBufferWriting>),
Reading(Arc<tokio::sync::Mutex<Box<dyn WriteBufferReading>>>),
}
impl WriteBufferConfig {
pub async fn new(
/// Factory that creates [`WriteBufferConfig`] from [`DatabaseRules`].
#[derive(Debug)]
pub struct WriteBufferConfigFactory {
mocks: BTreeMap<String, MockBufferSharedState>,
}
impl WriteBufferConfigFactory {
/// Create new factory w/o any mocks.
pub fn new() -> Self {
Self {
mocks: Default::default(),
}
}
/// Registers new mock.
///
/// # Panics
/// When mock with identical name is already registered.
pub fn register_mock(&mut self, name: String, state: MockBufferSharedState) {
match self.mocks.entry(name) {
Entry::Vacant(v) => {
v.insert(state);
}
Entry::Occupied(o) => {
panic!("Mock with the name '{}' already registered", o.key());
}
}
}
fn get_mock(&self, name: &str) -> Result<MockBufferSharedState, WriteBufferError> {
self.mocks
.get(name)
.cloned()
.ok_or_else::<WriteBufferError, _>(|| format!("Unknown mock ID: {}", name).into())
}
/// Create new config.
pub async fn new_config(
&self,
server_id: ServerId,
rules: &DatabaseRules,
) -> Result<Option<Self>, WriteBufferError> {
) -> Result<Option<WriteBufferConfig>, WriteBufferError> {
let name = rules.db_name();
// Right now, the Kafka producer and consumers ar the only production implementations of the
// `WriteBufferWriting` and `WriteBufferReading` traits. If/when there are other kinds of
// write buffers, additional configuration will be needed to determine what kind of write
// buffer to use here.
match rules.write_buffer_connection.as_ref() {
Some(WriteBufferConnection::Writing(conn)) => {
let writer: Arc<dyn WriteBufferWriting> =
if let Some(conn) = conn.strip_prefix(PREFIX_MOCK) {
let state = self.get_mock(conn)?;
let mock_buffer = MockBufferForWriting::new(state);
Arc::new(mock_buffer) as _
} else {
let kafka_buffer = KafkaBufferProducer::new(conn, name)?;
Arc::new(kafka_buffer) as _
};
Ok(Some(Self::Writing(Arc::new(kafka_buffer) as _)))
Ok(Some(WriteBufferConfig::Writing(writer)))
}
Some(WriteBufferConnection::Reading(conn)) => {
let reader: Box<dyn WriteBufferReading> =
if let Some(conn) = conn.strip_prefix(PREFIX_MOCK) {
let state = self.get_mock(conn)?;
let mock_buffer = MockBufferForReading::new(state);
Box::new(mock_buffer) as _
} else {
let kafka_buffer = KafkaBufferConsumer::new(conn, server_id, name).await?;
Box::new(kafka_buffer) as _
};
Ok(Some(Self::Reading(Arc::new(tokio::sync::Mutex::new(
Box::new(kafka_buffer) as _,
)))))
Ok(Some(WriteBufferConfig::Reading(Arc::new(
tokio::sync::Mutex::new(reader),
))))
}
None => Ok(None),
}
}
}
impl Default for WriteBufferConfigFactory {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use std::convert::TryFrom;
use data_types::DatabaseName;
use crate::mock::MockBufferSharedState;
use super::*;
#[tokio::test]
async fn test_none() {
let factory = WriteBufferConfigFactory::new();
let server_id = ServerId::try_from(1).unwrap();
let mut rules = DatabaseRules::new(DatabaseName::new("foo").unwrap());
rules.write_buffer_connection = None;
assert!(factory
.new_config(server_id, &rules)
.await
.unwrap()
.is_none());
}
#[tokio::test]
async fn test_writing_kafka() {
let factory = WriteBufferConfigFactory::new();
let server_id = ServerId::try_from(1).unwrap();
let mut rules = DatabaseRules::new(DatabaseName::new("foo").unwrap());
rules.write_buffer_connection =
Some(WriteBufferConnection::Writing("127.0.0.1:2".to_string()));
if let WriteBufferConfig::Writing(conn) = factory
.new_config(server_id, &rules)
.await
.unwrap()
.unwrap()
{
assert_eq!(conn.type_name(), "kafka");
} else {
panic!("not a writing connection");
}
}
#[tokio::test]
#[ignore = "waits forever to connect until https://github.com/influxdata/influxdb_iox/issues/2189 is solved"]
async fn test_reading_kafka() {
let factory = WriteBufferConfigFactory::new();
let server_id = ServerId::try_from(1).unwrap();
let mut rules = DatabaseRules::new(DatabaseName::new("foo").unwrap());
rules.write_buffer_connection = Some(WriteBufferConnection::Reading("test".to_string()));
if let WriteBufferConfig::Reading(conn) = factory
.new_config(server_id, &rules)
.await
.unwrap()
.unwrap()
{
let conn = conn.lock().await;
assert_eq!(conn.type_name(), "kafka");
} else {
panic!("not a reading connection");
}
}
#[tokio::test]
async fn test_writing_mock() {
let mut factory = WriteBufferConfigFactory::new();
let state = MockBufferSharedState::empty_with_n_sequencers(1);
let mock_name = "some_mock";
factory.register_mock(mock_name.to_string(), state);
let server_id = ServerId::try_from(1).unwrap();
let mut rules = DatabaseRules::new(DatabaseName::new("foo").unwrap());
rules.write_buffer_connection = Some(WriteBufferConnection::Writing(format!(
"mock://{}",
mock_name,
)));
if let WriteBufferConfig::Writing(conn) = factory
.new_config(server_id, &rules)
.await
.unwrap()
.unwrap()
{
assert_eq!(conn.type_name(), "mock");
} else {
panic!("not a writing connection");
}
// will error when state is unknown
rules.write_buffer_connection =
Some(WriteBufferConnection::Writing("mock://bar".to_string()));
let err = factory.new_config(server_id, &rules).await.unwrap_err();
assert!(err.to_string().starts_with("Unknown mock ID:"));
}
#[tokio::test]
async fn test_reading_mock() {
let mut factory = WriteBufferConfigFactory::new();
let state = MockBufferSharedState::empty_with_n_sequencers(1);
let mock_name = "some_mock";
factory.register_mock(mock_name.to_string(), state);
let server_id = ServerId::try_from(1).unwrap();
let mut rules = DatabaseRules::new(DatabaseName::new("foo").unwrap());
rules.write_buffer_connection = Some(WriteBufferConnection::Reading(format!(
"mock://{}",
mock_name,
)));
if let WriteBufferConfig::Reading(conn) = factory
.new_config(server_id, &rules)
.await
.unwrap()
.unwrap()
{
let conn = conn.lock().await;
assert_eq!(conn.type_name(), "mock");
} else {
panic!("not a reading connection");
}
// will error when state is unknown
rules.write_buffer_connection =
Some(WriteBufferConnection::Reading("mock://bar".to_string()));
let err = factory.new_config(server_id, &rules).await.unwrap_err();
assert!(err.to_string().starts_with("Unknown mock ID:"));
}
#[test]
#[should_panic(expected = "Mock with the name 'some_mock' already registered")]
fn test_register_mock_twice_panics() {
let mut factory = WriteBufferConfigFactory::new();
let state = MockBufferSharedState::empty_with_n_sequencers(1);
let mock_name = "some_mock";
factory.register_mock(mock_name.to_string(), state.clone());
factory.register_mock(mock_name.to_string(), state);
}
}

View File

@ -22,6 +22,9 @@ pub trait WriteBufferWriting: Sync + Send + Debug + 'static {
entry: &Entry,
sequencer_id: u32,
) -> Result<(Sequence, DateTime<Utc>), WriteBufferError>;
/// Return type (like `"mock"` or `"kafka"`) of this writer.
fn type_name(&self) -> &'static str;
}
pub type FetchHighWatermarkFut<'a> = BoxFuture<'a, Result<u64, WriteBufferError>>;
@ -65,6 +68,9 @@ pub trait WriteBufferReading: Sync + Send + Debug + 'static {
sequencer_id: u32,
sequence_number: u64,
) -> Result<(), WriteBufferError>;
/// Return type (like `"mock"` or `"kafka"`) of this reader.
fn type_name(&self) -> &'static str;
}
pub mod test_utils {

View File

@ -80,6 +80,10 @@ impl WriteBufferWriting for KafkaBufferProducer {
timestamp,
))
}
fn type_name(&self) -> &'static str {
"kafka"
}
}
impl KafkaBufferProducer {
@ -230,6 +234,10 @@ impl WriteBufferReading for KafkaBufferConsumer {
Ok(())
}
fn type_name(&self) -> &'static str {
"kafka"
}
}
impl KafkaBufferConsumer {

View File

@ -1,4 +1,4 @@
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,

View File

@ -91,6 +91,19 @@ impl MockBufferSharedState {
})
.collect()
}
/// Provides a way to wipe messages (e.g. to simulate retention periods in Kafka)
///
/// # Panics
/// - when sequencer does not exist
pub fn clear_messages(&self, sequencer_id: u32) {
let mut entries = self.entries.lock();
let entry_vec = entries
.get_mut(&sequencer_id)
.expect("invalid sequencer ID");
entry_vec.clear();
}
}
#[derive(Debug)]
@ -139,6 +152,10 @@ impl WriteBufferWriting for MockBufferForWriting {
Ok((sequence, timestamp))
}
fn type_name(&self) -> &'static str {
"mock"
}
}
#[derive(Debug, Default, Clone, Copy)]
@ -156,6 +173,10 @@ impl WriteBufferWriting for MockBufferForWritingThatAlwaysErrors {
)
.into())
}
fn type_name(&self) -> &'static str {
"mock"
}
}
/// Sequencer-specific playback state
@ -304,6 +325,10 @@ impl WriteBufferReading for MockBufferForReading {
Ok(())
}
fn type_name(&self) -> &'static str {
"mock"
}
}
#[cfg(test)]
@ -427,4 +452,38 @@ mod tests {
let state = MockBufferSharedState::empty_with_n_sequencers(2);
state.get_messages(2);
}
#[test]
#[should_panic(expected = "invalid sequencer ID")]
fn test_state_clear_messages_panic_wrong_sequencer() {
let state = MockBufferSharedState::empty_with_n_sequencers(2);
state.clear_messages(2);
}
#[test]
fn test_clear_messages() {
let state = MockBufferSharedState::empty_with_n_sequencers(2);
let entry = lp_to_entry("upc,region=east user=1 100");
let sequence_1 = Sequence::new(0, 11);
let sequence_2 = Sequence::new(1, 12);
state.push_entry(SequencedEntry::new_from_sequence(
sequence_1,
Utc::now(),
entry.clone(),
));
state.push_entry(SequencedEntry::new_from_sequence(
sequence_2,
Utc::now(),
entry,
));
assert_eq!(state.get_messages(0).len(), 1);
assert_eq!(state.get_messages(1).len(), 1);
state.clear_messages(0);
assert_eq!(state.get_messages(0).len(), 0);
assert_eq!(state.get_messages(1).len(), 1);
}
}