chore: enforce `clippy::future_not_send` for `parquet_file`

pull/24376/head
Marco Neumann 2021-06-09 18:18:27 +02:00
parent 0e2a291683
commit 4fe2d7af9c
5 changed files with 79 additions and 57 deletions

View File

@ -192,7 +192,7 @@ pub trait CatalogState {
/// Input to create a new empty instance.
///
/// See [`new_empty`](Self::new_empty) for details.
type EmptyInput;
type EmptyInput: Send;
/// Create empty state w/o any known files.
fn new_empty(data: Self::EmptyInput) -> Self;
@ -289,7 +289,7 @@ where
impl<S> PreservedCatalog<S>
where
S: CatalogState,
S: CatalogState + Send + Sync,
{
/// Checks if a preserved catalog exists.
pub async fn exists(
@ -309,7 +309,7 @@ where
pub async fn new_empty(
object_store: Arc<ObjectStore>,
server_id: ServerId,
db_name: impl Into<String>,
db_name: impl Into<String> + Send,
state_data: S::EmptyInput,
) -> Result<Self> {
let db_name = db_name.into();
@ -709,7 +709,7 @@ impl Display for TransactionKey {
/// Tracker for an open, uncommitted transaction.
struct OpenTransaction<S>
where
S: CatalogState,
S: CatalogState + Send + Sync,
{
next_state: Arc<S>,
proto: proto::Transaction,
@ -717,7 +717,7 @@ where
impl<S> OpenTransaction<S>
where
S: CatalogState,
S: CatalogState + Send + Sync,
{
/// Private API to create new transaction, users should always use [`PreservedCatalog::open_transaction`].
fn new(catalog_inner: &PreservedCatalogInner<S>, uuid: Uuid) -> Self {
@ -889,7 +889,7 @@ where
/// Dropping this object w/o calling [`commit`](Self::commit) will issue a warning.
pub struct TransactionHandle<'c, S>
where
S: CatalogState,
S: CatalogState + Send + Sync,
{
catalog: &'c PreservedCatalog<S>,
@ -903,7 +903,7 @@ where
impl<'c, S> TransactionHandle<'c, S>
where
S: CatalogState,
S: CatalogState + Send + Sync,
{
async fn new(catalog: &'c PreservedCatalog<S>, uuid: Uuid) -> TransactionHandle<'c, S> {
// first acquire semaphore (which is only being used for transactions), then get state lock
@ -1020,7 +1020,7 @@ where
impl<'c, S> Debug for TransactionHandle<'c, S>
where
S: CatalogState,
S: CatalogState + Send + Sync,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.transaction {
@ -1032,7 +1032,7 @@ where
impl<'c, S> Drop for TransactionHandle<'c, S>
where
S: CatalogState,
S: CatalogState + Send + Sync,
{
fn drop(&mut self) {
if self.transaction.is_some() {
@ -1042,8 +1042,10 @@ where
}
pub mod test_helpers {
use parking_lot::Mutex;
use super::*;
use std::{cell::RefCell, ops::Deref};
use std::ops::Deref;
/// Part that actually holds the data of [`TestCatalogState`].
#[derive(Clone, Debug)]
@ -1053,10 +1055,10 @@ pub mod test_helpers {
}
/// In-memory catalog state, for testing.
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct TestCatalogState {
/// Inner mutable state.
pub inner: RefCell<TestCatalogStateInner>,
pub inner: Mutex<TestCatalogStateInner>,
}
impl CatalogState for TestCatalogState {
@ -1064,7 +1066,7 @@ pub mod test_helpers {
fn new_empty(_data: Self::EmptyInput) -> Self {
Self {
inner: RefCell::new(TestCatalogStateInner {
inner: Mutex::new(TestCatalogStateInner {
parquet_files: HashMap::new(),
}),
}
@ -1081,7 +1083,9 @@ pub mod test_helpers {
_db_name: &str,
info: CatalogParquetInfo,
) -> Result<()> {
match self.inner.borrow_mut().parquet_files.entry(info.path) {
let mut guard = self.inner.lock();
match guard.parquet_files.entry(info.path) {
Occupied(o) => {
return Err(Error::ParquetFileAlreadyExists {
path: o.key().clone(),
@ -1096,7 +1100,9 @@ pub mod test_helpers {
}
fn remove(&self, path: DirsAndFileName) -> Result<()> {
match self.inner.borrow_mut().parquet_files.entry(path) {
let mut guard = self.inner.lock();
match guard.parquet_files.entry(path) {
Occupied(o) => {
o.remove();
}
@ -1109,21 +1115,25 @@ pub mod test_helpers {
}
}
impl Clone for TestCatalogState {
fn clone(&self) -> Self {
let guard = self.inner.lock();
let inner = Mutex::new(guard.clone());
Self { inner }
}
}
/// Break preserved catalog by moving one of the transaction files into a weird unknown version.
pub async fn break_catalog_with_weird_version<S>(catalog: &PreservedCatalog<S>)
where
S: CatalogState,
S: CatalogState + Send + Sync,
{
let guard = catalog.inner.read();
let tkey = guard
.previous_tkey
.as_ref()
.expect("should have at least a single transaction");
let tkey = get_tkey(catalog);
let path = transaction_path(
&catalog.object_store,
catalog.server_id,
&catalog.db_name,
tkey,
&tkey,
);
let mut proto = load_transaction_proto(&catalog.object_store, &path)
.await
@ -1133,6 +1143,19 @@ pub mod test_helpers {
.await
.unwrap();
}
/// Helper function to ensure that guards don't leak into the future state machine.
fn get_tkey<S>(catalog: &PreservedCatalog<S>) -> TransactionKey
where
S: CatalogState + Send + Sync,
{
let guard = catalog.inner.read();
guard
.previous_tkey
.as_ref()
.expect("should have at least a single transaction")
.clone()
}
}
#[cfg(test)]
@ -1753,9 +1776,9 @@ mod tests {
/// Get sorted list of catalog files from state
fn get_catalog_parquet_files(state: &TestCatalogState) -> Vec<(String, ParquetMetaData)> {
let mut files: Vec<(String, ParquetMetaData)> = state
.inner
.borrow()
let guard = state.inner.lock();
let mut files: Vec<(String, ParquetMetaData)> = guard
.parquet_files
.iter()
.map(|(path, md)| (path.display(), md.clone()))

View File

@ -45,7 +45,7 @@ pub async fn cleanup_unreferenced_parquet_files<S>(
max_files: usize,
) -> Result<()>
where
S: CatalogState,
S: CatalogState + Send + Sync,
{
// Create a transaction to prevent parallel modifications of the catalog. This avoids that we delete files there
// that are about to get added to the catalog.

View File

@ -3,6 +3,7 @@
missing_copy_implementations,
missing_debug_implementations,
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::use_self,
clippy::clone_on_ref_ptr
)]

View File

@ -99,8 +99,8 @@ pub async fn rebuild_catalog<S, N>(
ignore_metadata_read_failure: bool,
) -> Result<PreservedCatalog<S>>
where
S: CatalogState,
N: Into<String>,
S: CatalogState + Send + Sync,
N: Into<String> + Send,
{
// collect all revisions from parquet files
let revisions =
@ -324,15 +324,13 @@ mod tests {
}
// store catalog state
let mut paths_expected: Vec<_> = catalog
.state()
.inner
.borrow()
.parquet_files
.keys()
.cloned()
.collect();
paths_expected.sort();
let paths_expected = {
let state = catalog.state();
let guard = state.inner.lock();
let mut tmp: Vec<_> = guard.parquet_files.keys().cloned().collect();
tmp.sort();
tmp
};
// wipe catalog
drop(catalog);
@ -354,15 +352,13 @@ mod tests {
.unwrap();
// check match
let mut paths_actual: Vec<_> = catalog
.state()
.inner
.borrow()
.parquet_files
.keys()
.cloned()
.collect();
paths_actual.sort();
let paths_actual = {
let state = catalog.state();
let guard = state.inner.lock();
let mut tmp: Vec<_> = guard.parquet_files.keys().cloned().collect();
tmp.sort();
tmp
};
assert_eq!(paths_actual, paths_expected);
assert_eq!(catalog.revision_counter(), 3);
}
@ -403,7 +399,9 @@ mod tests {
.unwrap();
// check match
assert!(catalog.state().inner.borrow().parquet_files.is_empty());
let state = catalog.state();
let guard = state.inner.lock();
assert!(guard.parquet_files.is_empty());
assert_eq!(catalog.revision_counter(), 0);
}
@ -562,7 +560,9 @@ mod tests {
)
.await
.unwrap();
assert!(catalog.state().inner.borrow().parquet_files.is_empty());
let state = catalog.state();
let guard = state.inner.lock();
assert!(guard.parquet_files.is_empty());
assert_eq!(catalog.revision_counter(), 0);
}

View File

@ -3021,15 +3021,13 @@ mod tests {
.await
.unwrap()
.unwrap();
let mut paths_actual: Vec<String> = preserved_catalog
.state()
.inner
.borrow()
.parquet_files
.keys()
.map(|p| p.display())
.collect();
paths_actual.sort();
let paths_actual = {
let state = preserved_catalog.state();
let guard = state.inner.lock();
let mut tmp: Vec<String> = guard.parquet_files.keys().map(|p| p.display()).collect();
tmp.sort();
tmp
};
assert_eq!(paths_actual, paths_expected);
// ==================== do: re-load DB ====================