Merge branch 'main' into cn/alias-db-commands

pull/24376/head
kodiakhq[bot] 2021-11-18 15:13:43 +00:00 committed by GitHub
commit c9f02f83e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 140 additions and 27 deletions

View File

@ -371,7 +371,13 @@ jobs:
#
# To modify the contents of the CI image, update docker/Dockerfile.ci
ci_image:
machine: true
# Use a `machine` executor instead of a docker-based because:
# - we bootstrap our CI images somehow (we could use a docker-hub image as well)
# - the machine executor also supports multi-arch builds
# - we only run this job once a day, so the additional startup delay of 30-60s doesn't really matter
machine:
# https://circleci.com/docs/2.0/configuration-reference/#available-machine-images
image: ubuntu-2004:202107-02
resource_class: xlarge
steps:
- checkout
@ -380,11 +386,9 @@ jobs:
- run: |
COMMIT_SHA=$(git rev-parse --short HEAD)
RUST_VERSION=$(sed -E -ne 's/channel = "(.*)"/\1/p' rust-toolchain.toml)
docker build -t quay.io/influxdb/rust:$COMMIT_SHA -f docker/Dockerfile.ci --build-arg RUST_VERSION=$RUST_VERSION .
docker tag quay.io/influxdb/rust:$COMMIT_SHA quay.io/influxdb/rust:ci
docker push quay.io/influxdb/rust:$COMMIT_SHA
docker push quay.io/influxdb/rust:ci
docker --version
docker build -t quay.io/influxdb/rust:$COMMIT_SHA -t quay.io/influxdb/rust:ci -f docker/Dockerfile.ci --build-arg RUST_VERSION=$RUST_VERSION .
docker push --all-tags quay.io/influxdb/rust
parameters:

View File

@ -8,3 +8,4 @@
pub mod access;
pub mod freezable;
pub mod mailbox;

View File

@ -0,0 +1,115 @@
use std::sync::Arc;
/// A mailbox consists of two arrays, an inbox and an outbox
///
/// * [`Mailbox::push`] can be used to add entries to the inbox
/// * [`Mailbox::consume`] rotates data into the outbox and allows it to be consumed
///
/// This achieves the following goals:
///
/// * Allows data to continue to arrive whilst data is being consumed
/// * Allows multiple consumers to coordinate preventing concurrent consumption
///
/// This is different from a tokio channel in the following ways:
///
/// 1. The contents can be inspected prior to removal from the mailbox
/// (important as catalog transactions are fallible)
///
/// 2. Potentially multiple consumers can acquire exclusive access for
/// the duration of a catalog transaction
///
/// 3. Users can ensure that everything in the mailbox is consumed
///
#[derive(Debug)]
pub struct Mailbox<T> {
    /// Staged entries awaiting consumption; an async, owned-guard mutex so a
    /// `MailboxHandle` can hold exclusive access across `.await` points
    outbox: Arc<tokio::sync::Mutex<Vec<T>>>,
    /// Incoming entries; a cheap synchronous lock so `push` never blocks on a
    /// consumer holding the outbox
    inbox: parking_lot::Mutex<Vec<T>>,
}
impl<T> Default for Mailbox<T> {
fn default() -> Self {
Self {
outbox: Default::default(),
inbox: Default::default(),
}
}
}
impl<T> Mailbox<T> {
    /// Add an item to the inbox
    pub fn push(&self, item: T) {
        let mut inbox = self.inbox.lock();
        inbox.push(item);
    }

    /// Get a handle to consume some data from this [`Mailbox`]
    ///
    /// Rotates the inbox into the outbox and returns a [`MailboxHandle`]
    pub async fn consume(&self) -> MailboxHandle<T> {
        // Acquire the outbox first: this serializes consumers, and only then
        // is the inbox briefly locked to rotate its contents across.
        let mut outbox = Arc::clone(&self.outbox).lock_owned().await;
        {
            let mut inbox = self.inbox.lock();
            // Drains the inbox, moving everything to the back of the outbox
            outbox.append(&mut inbox);
        }
        MailboxHandle { outbox }
    }
}
/// A [`MailboxHandle`] grants the ability to consume elements from the outbox of a [`Mailbox`]
///
/// Whilst a [`MailboxHandle`] is held for a [`Mailbox`]:
///
/// * Another [`MailboxHandle`] cannot be obtained
/// * Entries can be added to the [`Mailbox`]'s inbox
/// * Entries cannot be added to the [`Mailbox`]'s outbox
///
#[derive(Debug)]
pub struct MailboxHandle<T> {
    /// Owned guard over the shared outbox; holding it excludes other
    /// consumers for the lifetime of this handle
    outbox: tokio::sync::OwnedMutexGuard<Vec<T>>,
}
impl<T> MailboxHandle<T> {
    /// Returns the outbox of the associated [`Mailbox`]
    pub fn outbox(&self) -> &[T] {
        self.outbox.as_slice()
    }

    /// Flush the outbox of the [`Mailbox`]
    ///
    /// Consumes the handle; dropping without calling this leaves the
    /// entries in the outbox for the next consumer.
    pub fn flush(mut self) {
        self.outbox.clear();
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_mailbox() {
        let mailbox: Mailbox<i32> = Default::default();

        for item in [1, 2] {
            mailbox.push(item);
        }

        let first = mailbox.consume().await;

        // Pushes are not blocked while a handle is outstanding
        for item in [3, 4] {
            mailbox.push(item);
        }

        assert_eq!(first.outbox(), &[1, 2]);

        // Dropping without flush must leave the outbox intact
        std::mem::drop(first);

        mailbox.push(5);

        let second = mailbox.consume().await;
        mailbox.push(6);

        // Un-flushed entries reappear, with newly rotated ones appended
        assert_eq!(second.outbox(), &[1, 2, 3, 4, 5]);
        second.flush();

        let third = mailbox.consume().await;
        assert_eq!(third.outbox(), &[6]);
    }
}

View File

@ -28,6 +28,7 @@ use data_types::{
};
use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider};
use dml::{DmlDelete, DmlOperation, DmlWrite};
use internal_types::mailbox::Mailbox;
use iox_object_store::IoxObjectStore;
use mutable_batch::payload::PartitionWrite;
use mutable_buffer::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
@ -279,7 +280,7 @@ pub struct Db {
time_provider: Arc<dyn TimeProvider>,
/// To-be-written delete predicates.
delete_predicates_mailbox: Mutex<Vec<(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)>>,
delete_predicates_mailbox: Mailbox<(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)>,
/// TESTING ONLY: Override of IDs for persisted chunks.
persisted_chunk_id_override: Mutex<Option<ChunkId>>,
@ -575,8 +576,8 @@ impl Db {
}
if !affected_persisted_chunks.is_empty() {
let mut guard = self.delete_predicates_mailbox.lock();
guard.push((delete_predicate, affected_persisted_chunks));
self.delete_predicates_mailbox
.push((delete_predicate, affected_persisted_chunks));
}
Ok(())
@ -839,27 +840,15 @@ impl Db {
// worker loop to persist delete predicates
let delete_predicate_persistence_loop = async {
loop {
let todo: Vec<_> = {
let guard = self.delete_predicates_mailbox.lock();
guard.clone()
};
if !todo.is_empty() {
match self.preserve_delete_predicates(&todo).await {
Ok(()) => {
let mut guard = self.delete_predicates_mailbox.lock();
// TODO: we could also run a de-duplication here once
// https://github.com/influxdata/influxdb_iox/issues/2626 is implemented
guard.drain(0..todo.len());
}
Err(e) => {
error!(%e, "cannot preserve delete predicates");
}
}
let handle = self.delete_predicates_mailbox.consume().await;
match self.preserve_delete_predicates(handle.outbox()).await {
Ok(()) => handle.flush(),
Err(e) => error!(%e, "cannot preserve delete predicates"),
}
self.worker_iterations_delete_predicate_preservation
.fetch_add(1, Ordering::Relaxed);
tokio::time::sleep(Duration::from_secs(1)).await;
}
};
@ -886,9 +875,13 @@ impl Db {
}
async fn preserve_delete_predicates(
self: &Arc<Self>,
&self,
predicates: &[(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)],
) -> Result<(), parquet_catalog::core::Error> {
if predicates.is_empty() {
return Ok(());
}
let mut transaction = self.preserved_catalog.open_transaction().await;
for (predicate, chunks) in predicates {
transaction.delete_predicate(predicate, chunks);