From c9168a2c132a97071d16a015d9a836861f35888b Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 17 Nov 2021 14:17:27 +0100 Subject: [PATCH 1/5] ci: update CI image builder to use newer docker This is a precondition to build ARM64 CI images. --- .circleci/config.yml | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index c378c7e5cd..cc83121397 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -371,7 +371,8 @@ jobs: # # To modify the contents of the CI image, update docker/Dockerfile.ci ci_image: - machine: true + machine: + image: ubuntu-2004:202107-02 resource_class: xlarge steps: - checkout @@ -380,11 +381,9 @@ jobs: - run: | COMMIT_SHA=$(git rev-parse --short HEAD) RUST_VERSION=$(sed -E -ne 's/channel = "(.*)"/\1/p' rust-toolchain.toml) - docker build -t quay.io/influxdb/rust:$COMMIT_SHA -f docker/Dockerfile.ci --build-arg RUST_VERSION=$RUST_VERSION . - docker tag quay.io/influxdb/rust:$COMMIT_SHA quay.io/influxdb/rust:ci - docker push quay.io/influxdb/rust:$COMMIT_SHA - docker push quay.io/influxdb/rust:ci - + docker --version + docker build -t quay.io/influxdb/rust:$COMMIT_SHA -t quay.io/influxdb/rust:ci -f docker/Dockerfile.ci --build-arg RUST_VERSION=$RUST_VERSION . 
+ docker push --all-tags quay.io/influxdb/rust parameters: From 714fc85c8dd4900b04af9f82827a52914f482979 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Thu, 18 Nov 2021 09:34:06 +0000 Subject: [PATCH 2/5] refactor: extract Mailbox type (#3126) (#3142) * refactor: extract Mailbox type (#3126) * fix: doc * chore: review feedback Co-authored-by: Andrew Lamb Co-authored-by: Andrew Lamb --- internal_types/src/lib.rs | 1 + internal_types/src/mailbox.rs | 105 ++++++++++++++++++++++++++++++++++ server/src/db.rs | 35 +++++------- 3 files changed, 120 insertions(+), 21 deletions(-) create mode 100644 internal_types/src/mailbox.rs diff --git a/internal_types/src/lib.rs b/internal_types/src/lib.rs index f3c0c95782..411541ca3f 100644 --- a/internal_types/src/lib.rs +++ b/internal_types/src/lib.rs @@ -8,3 +8,4 @@ pub mod access; pub mod freezable; +pub mod mailbox; diff --git a/internal_types/src/mailbox.rs b/internal_types/src/mailbox.rs new file mode 100644 index 0000000000..b0436219fb --- /dev/null +++ b/internal_types/src/mailbox.rs @@ -0,0 +1,105 @@ +use std::sync::Arc; + +/// A mailbox consists of two arrays, an inbox and an outbox +/// +/// * [`Mailbox::push`] can be used to add entries to the inbox +/// * [`Mailbox::consume`] rotates data into the outbox and allows it to be consumed +/// +/// This achieves the following goals: +/// +/// * Allows data to continue to arrive whilst data is being consumed +/// * Allows multiple consumers to coordinate preventing concurrent consumption +/// +#[derive(Debug)] +pub struct Mailbox<T> { + outbox: Arc<tokio::sync::Mutex<Vec<T>>>, + inbox: parking_lot::Mutex<Vec<T>>, +} + +impl<T> Default for Mailbox<T> { + fn default() -> Self { + Self { + outbox: Default::default(), + inbox: Default::default(), + } + } +} + +impl<T> Mailbox<T> { + /// Add an item to the inbox + pub fn push(&self, item: T) { + self.inbox.lock().push(item) + } + + /// Get a handle to consume some data from this [`Mailbox`] + /// + /// Rotates the inbox into 
the outbox and returns a [`MailboxHandle`] + pub async fn consume(&self) -> MailboxHandle<T> { + let mut outbox = Arc::clone(&self.outbox).lock_owned().await; + outbox.append(&mut self.inbox.lock()); + MailboxHandle { outbox } + } +} + +/// A [`MailboxHandle`] grants the ability to consume elements from the outbox of a [`Mailbox`] +/// +/// Whilst a [`MailboxHandle`] is held for a [`Mailbox`]: +/// +/// * Another [`MailboxHandle`] cannot be obtained +/// * Entries can be added to the [`Mailbox`]'s inbox +/// * Entries cannot be added to the [`Mailbox`]'s outbox +/// +#[derive(Debug)] +pub struct MailboxHandle<T> { + outbox: tokio::sync::OwnedMutexGuard<Vec<T>>, +} + +impl<T> MailboxHandle<T> { + /// Returns the outbox of the associated [`Mailbox`] + pub fn outbox(&self) -> &[T] { + &self.outbox + } + + /// Flush the outbox of the [`Mailbox`] + pub fn flush(mut self) { + self.outbox.clear() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_mailbox() { + let mailbox: Mailbox<i32> = Default::default(); + + mailbox.push(1); + mailbox.push(2); + + let handle = mailbox.consume().await; + + // Can continue to push items + mailbox.push(3); + mailbox.push(4); + + assert_eq!(handle.outbox(), &[1, 2]); + + // Drop handle without flushing + std::mem::drop(handle); + + mailbox.push(5); + + let handle = mailbox.consume().await; + + mailbox.push(6); + + assert_eq!(handle.outbox(), &[1, 2, 3, 4, 5]); + + handle.flush(); + + let handle = mailbox.consume().await; + + assert_eq!(handle.outbox(), &[6]); + } +} diff --git a/server/src/db.rs b/server/src/db.rs index f263829f0f..437d830e7a 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -28,6 +28,7 @@ use data_types::{ }; use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider}; use dml::{DmlDelete, DmlOperation, DmlWrite}; +use internal_types::mailbox::Mailbox; use iox_object_store::IoxObjectStore; use mutable_batch::payload::PartitionWrite; use mutable_buffer::{ChunkMetrics as 
MutableBufferChunkMetrics, MBChunk}; @@ -279,7 +280,7 @@ pub struct Db { time_provider: Arc, /// To-be-written delete predicates. - delete_predicates_mailbox: Mutex, Vec)>>, + delete_predicates_mailbox: Mailbox<(Arc, Vec)>, /// TESTING ONLY: Override of IDs for persisted chunks. persisted_chunk_id_override: Mutex>, @@ -575,8 +576,8 @@ impl Db { } if !affected_persisted_chunks.is_empty() { - let mut guard = self.delete_predicates_mailbox.lock(); - guard.push((delete_predicate, affected_persisted_chunks)); + self.delete_predicates_mailbox + .push((delete_predicate, affected_persisted_chunks)); } Ok(()) @@ -839,27 +840,15 @@ impl Db { // worker loop to persist delete predicates let delete_predicate_persistence_loop = async { loop { - let todo: Vec<_> = { - let guard = self.delete_predicates_mailbox.lock(); - guard.clone() - }; - - if !todo.is_empty() { - match self.preserve_delete_predicates(&todo).await { - Ok(()) => { - let mut guard = self.delete_predicates_mailbox.lock(); - // TODO: we could also run a de-duplication here once - // https://github.com/influxdata/influxdb_iox/issues/2626 is implemented - guard.drain(0..todo.len()); - } - Err(e) => { - error!(%e, "cannot preserve delete predicates"); - } - } + let handle = self.delete_predicates_mailbox.consume().await; + match self.preserve_delete_predicates(handle.outbox()).await { + Ok(()) => handle.flush(), + Err(e) => error!(%e, "cannot preserve delete predicates"), } self.worker_iterations_delete_predicate_preservation .fetch_add(1, Ordering::Relaxed); + tokio::time::sleep(Duration::from_secs(1)).await; } }; @@ -886,9 +875,13 @@ impl Db { } async fn preserve_delete_predicates( - self: &Arc, + &self, predicates: &[(Arc, Vec)], ) -> Result<(), parquet_catalog::core::Error> { + if predicates.is_empty() { + return Ok(()); + } + let mut transaction = self.preserved_catalog.open_transaction().await; for (predicate, chunks) in predicates { transaction.delete_predicate(predicate, chunks); From 
fef6cafa24b45c2e79b8ba3c931beb027b078737 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 18 Nov 2021 10:55:36 +0100 Subject: [PATCH 3/5] ci: explain some circle decisions --- .circleci/config.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index cc83121397..04f9ff3202 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -371,7 +371,12 @@ jobs: # # To modify the contents of the CI image, update docker/Dockerfile.ci ci_image: + # Use a `machine` executor instead of a docker-based because: + # - we bootstrap our CI images somehow (we could use a docker-hub image as well) + # - the machine executor also supports multi-arch builds + # - we only run this job once a day, so the additional startup delay of 30-60s doesn't really matter machine: + # https://circleci.com/docs/2.0/configuration-reference/#available-machine-images image: ubuntu-2004:202107-02 resource_class: xlarge steps: From 1fae3559cf39fe6a3412546f58841e2437c55e8c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 18 Nov 2021 08:08:03 -0500 Subject: [PATCH 4/5] docs: document differences between Mailbox and channel (#3148) --- internal_types/src/mailbox.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/internal_types/src/mailbox.rs b/internal_types/src/mailbox.rs index b0436219fb..afd885c2db 100644 --- a/internal_types/src/mailbox.rs +++ b/internal_types/src/mailbox.rs @@ -10,6 +10,17 @@ use std::sync::Arc; /// * Allows data to continue to arrive whilst data is being consumed /// * Allows multiple consumers to coordinate preventing concurrent consumption /// +/// This is different than a tokio channel in the following ways: +/// +/// 1. The contents can be inspected prior to remval from the mailbox +/// (important as catalog transactions are fallible) +/// +/// 2. Potentially multiple consumers can acquire exclusive access for +/// the duration of a catalog transaction +/// +/// 3. 
Users can ensure that everything in the mailbox is +/// consumed +/// #[derive(Debug)] pub struct Mailbox<T> { outbox: Arc<tokio::sync::Mutex<Vec<T>>>, From 5e7336b4758a2d4cdf735341bb5abe2a126c3c21 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 18 Nov 2021 09:19:19 -0500 Subject: [PATCH 5/5] docs: Tweak comments on Mailbox (#3152) --- internal_types/src/mailbox.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/internal_types/src/mailbox.rs b/internal_types/src/mailbox.rs index afd885c2db..ff3790615a 100644 --- a/internal_types/src/mailbox.rs +++ b/internal_types/src/mailbox.rs @@ -10,16 +10,15 @@ use std::sync::Arc; /// * Allows data to continue to arrive whilst data is being consumed /// * Allows multiple consumers to coordinate preventing concurrent consumption /// -/// This is different than a tokio channel in the following ways: +/// This is different from a tokio channel in the following ways: /// -/// 1. The contents can be inspected prior to remval from the mailbox +/// 1. The contents can be inspected prior to removal from the mailbox /// (important as catalog transactions are fallible) /// /// 2. Potentially multiple consumers can acquire exclusive access for /// the duration of a catalog transaction /// -/// 3. Users can ensure that everything in the mailbox is -/// consumed +/// 3. Users can ensure that everything in the mailbox is consumed /// #[derive(Debug)] pub struct Mailbox<T> {