fix: Remove lifecycle crate
parent
e7de16732d
commit
e63f006398
|
@ -3086,21 +3086,6 @@ version = "0.2.2"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "33a33a362ce288760ec6a508b94caaec573ae7d3bbbd91b87aa0bad4456839db"
|
||||
|
||||
[[package]]
|
||||
name = "lifecycle"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"data_types",
|
||||
"futures",
|
||||
"hashbrown 0.12.0",
|
||||
"iox_time",
|
||||
"observability_deps",
|
||||
"parking_lot 0.12.0",
|
||||
"tokio",
|
||||
"tracker",
|
||||
"workspace-hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "linked-hash-map"
|
||||
version = "0.5.4"
|
||||
|
@ -5342,7 +5327,6 @@ dependencies = [
|
|||
"iox_object_store",
|
||||
"iox_time",
|
||||
"job_registry",
|
||||
"lifecycle",
|
||||
"metric",
|
||||
"mutable_batch_lp",
|
||||
"num_cpus",
|
||||
|
|
|
@ -38,7 +38,6 @@ members = [
|
|||
"ioxd_router2",
|
||||
"ioxd_test",
|
||||
"job_registry",
|
||||
"lifecycle",
|
||||
"logfmt",
|
||||
"metric",
|
||||
"metric_exporters",
|
||||
|
|
|
@ -1,20 +0,0 @@
|
|||
[package]
|
||||
name = "lifecycle"
|
||||
version = "0.1.0"
|
||||
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
|
||||
edition = "2021"
|
||||
description = "Implements the IOx data lifecycle"
|
||||
|
||||
[dependencies]
|
||||
data_types = { path = "../data_types" }
|
||||
futures = "0.3"
|
||||
hashbrown = "0.12"
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
parking_lot = "0.12"
|
||||
iox_time = { path = "../iox_time" }
|
||||
tokio = { version = "1.18", features = ["macros", "parking_lot", "time"] }
|
||||
tracker = { path = "../tracker" }
|
||||
workspace-hack = { path = "../workspace-hack"}
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "1.18", features = ["macros", "parking_lot", "rt", "time"] }
|
|
@ -1,197 +0,0 @@
|
|||
//! This module contains lock guards for use in the lifecycle traits
|
||||
//!
|
||||
//! Specifically they exist to work around a lack of support for generic associated
|
||||
//! types within traits. <https://github.com/rust-lang/rust/issues/44265>
|
||||
//!
|
||||
//! ```ignore
|
||||
//! trait MyTrait {
|
||||
//! type Guard;
|
||||
//!
|
||||
//! fn read(&self) -> Self::Guard<'_> <-- this is not valid rust
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
//! The structs in this module therefore provide concrete types that can be
|
||||
//! used in their place
|
||||
//!
|
||||
//! ```
|
||||
//! use lifecycle::LifecycleReadGuard;
|
||||
//! trait MyTrait {
|
||||
//! type AdditionalData;
|
||||
//! type LockedType;
|
||||
//!
|
||||
//! fn read(&self) -> LifecycleReadGuard<'_, Self::LockedType, Self::AdditionalData>;
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
//! One drawback of this approach is that the type returned from the read() method can't
|
||||
//! be a user-provided type that implements a trait
|
||||
//!
|
||||
//! ```ignore
|
||||
//! trait MyTrait {
|
||||
//! type Guard: GuardTrait; <-- this makes for a nice API
|
||||
//!
|
||||
//! fn read(&self) -> Self::Guard<'_> <-- this is not valid rust
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
//! Instead we have to use associated functions, which are a bit more cumbersome
|
||||
//!
|
||||
//! ```
|
||||
//! use lifecycle::LifecycleReadGuard;
|
||||
//! use tracker::RwLock;
|
||||
//! use std::sync::Arc;
|
||||
//!
|
||||
//! trait Lockable {
|
||||
//! type AdditionalData;
|
||||
//! type LockedType;
|
||||
//!
|
||||
//! fn read(&self) -> LifecycleReadGuard<'_, Self::LockedType, Self::AdditionalData>;
|
||||
//!
|
||||
//! fn guard_func(s: LifecycleReadGuard<'_, Self::LockedType, Self::AdditionalData>) -> u32;
|
||||
//! }
|
||||
//!
|
||||
//! struct Locked {
|
||||
//! num: u32,
|
||||
//! }
|
||||
//!
|
||||
//! #[derive(Clone)]
|
||||
//! struct MyLockable {
|
||||
//! offset: u32,
|
||||
//! data: Arc<RwLock<Locked>>
|
||||
//! }
|
||||
//!
|
||||
//! impl Lockable for MyLockable {
|
||||
//! type AdditionalData = Self;
|
||||
//! type LockedType = Locked;
|
||||
//!
|
||||
//! fn read(&self) -> LifecycleReadGuard<'_, Self::LockedType, Self::AdditionalData> {
|
||||
//! LifecycleReadGuard::new(self.clone(), &self.data)
|
||||
//! }
|
||||
//!
|
||||
//! fn guard_func(s: LifecycleReadGuard<'_, Self::LockedType, Self::AdditionalData>) -> u32 {
|
||||
//! s.num + s.data().offset
|
||||
//! }
|
||||
//! }
|
||||
//!
|
||||
//! let lockable = MyLockable{ offset: 32, data: Arc::new(RwLock::new(Locked{ num: 1 }))};
|
||||
//! assert_eq!(MyLockable::guard_func(lockable.read()), 33);
|
||||
//! lockable.data.write().num = 21;
|
||||
//! assert_eq!(MyLockable::guard_func(lockable.read()), 53);
|
||||
//!
|
||||
//! ```
|
||||
//!
|
||||
|
||||
use std::{
|
||||
fmt::{Debug, Display},
|
||||
ops::{Deref, DerefMut},
|
||||
};
|
||||
|
||||
use tracker::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard};
|
||||
|
||||
/// The `LifecycleReadGuard` combines a `RwLockUpgradableReadGuard` with an arbitrary
|
||||
/// data payload of type `D`.
|
||||
///
|
||||
/// The data of `P` can be immutably accessed through `std::ops::Deref` akin
|
||||
/// to a normal read guard or smart pointer
|
||||
///
|
||||
/// Note: The `LifecycleReadGuard` will not block other readers to `RwLock<P>` but
|
||||
/// they will block other upgradeable readers, e.g. other `LifecycleReadGuard`
|
||||
#[derive(Debug)]
|
||||
pub struct LifecycleReadGuard<'a, P, D> {
|
||||
data: D,
|
||||
guard: RwLockUpgradableReadGuard<'a, P>,
|
||||
}
|
||||
|
||||
impl<'a, P, D> LifecycleReadGuard<'a, P, D> {
|
||||
/// Create a new `LifecycleReadGuard` from the provided lock
|
||||
pub fn new(data: D, lock: &'a RwLock<P>) -> Self {
|
||||
let guard = lock.upgradable_read();
|
||||
Self { data, guard }
|
||||
}
|
||||
|
||||
/// Upgrades this to a `LifecycleWriteGuard`
|
||||
pub fn upgrade(self) -> LifecycleWriteGuard<'a, P, D> {
|
||||
let guard = RwLockUpgradableReadGuard::upgrade(self.guard);
|
||||
LifecycleWriteGuard {
|
||||
data: self.data,
|
||||
guard,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a reference to the contained data payload
|
||||
pub fn data(&self) -> &D {
|
||||
&self.data
|
||||
}
|
||||
|
||||
/// Drops the locks held by this guard and returns the data payload
|
||||
pub fn into_data(self) -> D {
|
||||
self.data
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, P: Display, D> Display for LifecycleReadGuard<'a, P, D> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.guard.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, P, D> Deref for LifecycleReadGuard<'a, P, D> {
|
||||
type Target = P;
|
||||
#[inline]
|
||||
fn deref(&self) -> &P {
|
||||
&self.guard
|
||||
}
|
||||
}
|
||||
|
||||
/// A `LifecycleWriteGuard` combines a `RwLockWriteGuard` with an arbitrary
|
||||
/// data payload of type `D`.
|
||||
///
|
||||
/// The data of `P` can be immutably accessed through `std::ops::Deref` akin to
|
||||
/// a normal read guard or smart pointer, and also mutably through
|
||||
/// `std::ops::DerefMut` akin to a normal write guard
|
||||
///
|
||||
#[derive(Debug)]
|
||||
pub struct LifecycleWriteGuard<'a, P, D> {
|
||||
data: D,
|
||||
guard: RwLockWriteGuard<'a, P>,
|
||||
}
|
||||
|
||||
impl<'a, P, D> LifecycleWriteGuard<'a, P, D> {
|
||||
/// Create a new `LifecycleWriteGuard` from the provided lock
|
||||
pub fn new(data: D, lock: &'a RwLock<P>) -> Self {
|
||||
let guard = lock.write();
|
||||
Self { data, guard }
|
||||
}
|
||||
|
||||
/// Returns a reference to the contained data payload
|
||||
pub fn data(&self) -> &D {
|
||||
&self.data
|
||||
}
|
||||
|
||||
/// Drops the locks held by this guard and returns the data payload
|
||||
pub fn into_data(self) -> D {
|
||||
self.data
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, P: Display, D> Display for LifecycleWriteGuard<'a, P, D> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.guard.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, P, D> Deref for LifecycleWriteGuard<'a, P, D> {
|
||||
type Target = P;
|
||||
#[inline]
|
||||
fn deref(&self) -> &P {
|
||||
&self.guard
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, P, D> DerefMut for LifecycleWriteGuard<'a, P, D> {
|
||||
#[inline]
|
||||
fn deref_mut(&mut self) -> &mut P {
|
||||
&mut self.guard
|
||||
}
|
||||
}
|
|
@ -1,208 +0,0 @@
|
|||
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
|
||||
#![warn(
|
||||
missing_copy_implementations,
|
||||
missing_debug_implementations,
|
||||
clippy::explicit_iter_loop,
|
||||
clippy::future_not_send,
|
||||
clippy::use_self,
|
||||
clippy::clone_on_ref_ptr
|
||||
)]
|
||||
|
||||
use data_types::{
|
||||
chunk_metadata::{ChunkAddr, ChunkId, ChunkLifecycleAction, ChunkOrder, ChunkStorage},
|
||||
database_rules::LifecycleRules,
|
||||
DatabaseName,
|
||||
};
|
||||
use internal_types::access::AccessMetrics;
|
||||
use std::sync::Arc;
|
||||
use tracker::TaskTracker;
|
||||
|
||||
mod guard;
|
||||
pub use guard::*;
|
||||
mod policy;
|
||||
use iox_time::{Time, TimeProvider};
|
||||
pub use policy::*;
|
||||
|
||||
/// A trait that encapsulates the database logic that is automated by `LifecyclePolicy`
|
||||
pub trait LifecycleDb {
|
||||
type Chunk: LockableChunk;
|
||||
type Partition: LockablePartition;
|
||||
|
||||
/// Return the in-memory size of the database. We expect this
|
||||
/// to change from call to call as chunks are dropped
|
||||
fn buffer_size(&self) -> usize;
|
||||
|
||||
/// Returns the lifecycle policy
|
||||
fn rules(&self) -> LifecycleRules;
|
||||
|
||||
/// Returns a list of lockable partitions in the database
|
||||
fn partitions(&self) -> Vec<Self::Partition>;
|
||||
|
||||
/// Return the database name.
|
||||
fn name(&self) -> DatabaseName<'static>;
|
||||
|
||||
/// Return the time provider for this database
|
||||
fn time_provider(&self) -> &Arc<dyn TimeProvider>;
|
||||
}
|
||||
|
||||
/// A `LockablePartition` is a wrapper around a `LifecyclePartition` that allows
|
||||
/// for planning and executing lifecycle actions on the partition
|
||||
pub trait LockablePartition: Sized {
|
||||
type Partition: LifecyclePartition;
|
||||
type Chunk: LockableChunk;
|
||||
type PersistHandle: PersistHandle + Send + Sync + 'static;
|
||||
|
||||
type Error: std::error::Error + Send + Sync;
|
||||
|
||||
/// Acquire a shared read lock on the chunk
|
||||
fn read(&self) -> LifecycleReadGuard<'_, Self::Partition, Self>;
|
||||
|
||||
/// Acquire an exclusive write lock on the chunk
|
||||
fn write(&self) -> LifecycleWriteGuard<'_, Self::Partition, Self>;
|
||||
|
||||
/// Returns a specific chunk
|
||||
fn chunk(
|
||||
s: &LifecycleReadGuard<'_, Self::Partition, Self>,
|
||||
chunk_id: ChunkId,
|
||||
) -> Option<Self::Chunk>;
|
||||
|
||||
/// Return a list of lockable chunks in this partition.
|
||||
///
|
||||
/// This must be ordered by `(order, id)`.
|
||||
fn chunks(s: &LifecycleReadGuard<'_, Self::Partition, Self>) -> Vec<Self::Chunk>;
|
||||
|
||||
/// Compact chunks into a single read buffer chunk
|
||||
///
|
||||
/// TODO: Encapsulate these locks into a CatalogTransaction object
|
||||
fn compact_chunks(
|
||||
partition: LifecycleWriteGuard<'_, Self::Partition, Self>,
|
||||
chunks: Vec<LifecycleWriteGuard<'_, <Self::Chunk as LockableChunk>::Chunk, Self::Chunk>>,
|
||||
) -> Result<TaskTracker<<Self::Chunk as LockableChunk>::Job>, Self::Error>;
|
||||
|
||||
/// Compact object store chunks into a single object store chunk
|
||||
fn compact_object_store_chunks(
|
||||
partition: LifecycleWriteGuard<'_, Self::Partition, Self>,
|
||||
chunks: Vec<LifecycleWriteGuard<'_, <Self::Chunk as LockableChunk>::Chunk, Self::Chunk>>,
|
||||
) -> Result<TaskTracker<<Self::Chunk as LockableChunk>::Job>, Self::Error>;
|
||||
|
||||
/// Returns a PersistHandle for the provided partition, and the
|
||||
/// timestamp up to which to to flush
|
||||
///
|
||||
/// Returns None if there is a persistence operation in flight, or
|
||||
/// if there are no persistable windows.
|
||||
///
|
||||
/// If `force` is `true` will persist all unpersisted data regardless of arrival time
|
||||
fn prepare_persist(
|
||||
partition: &mut LifecycleWriteGuard<'_, Self::Partition, Self>,
|
||||
force: bool,
|
||||
) -> Option<Self::PersistHandle>;
|
||||
|
||||
/// Split and persist chunks.
|
||||
///
|
||||
/// Combines and deduplicates the data in `chunks` into two new chunks:
|
||||
///
|
||||
/// 1. A read buffer chunk that contains any rows with timestamps
|
||||
/// prior to `flush_timestamp`
|
||||
///
|
||||
/// 2. A read buffer chunk (also written to the object store) with
|
||||
/// all other rows
|
||||
///
|
||||
/// TODO: Encapsulate these locks into a CatalogTransaction object
|
||||
fn persist_chunks(
|
||||
partition: LifecycleWriteGuard<'_, Self::Partition, Self>,
|
||||
chunks: Vec<LifecycleWriteGuard<'_, <Self::Chunk as LockableChunk>::Chunk, Self::Chunk>>,
|
||||
handle: Self::PersistHandle,
|
||||
) -> Result<TaskTracker<<Self::Chunk as LockableChunk>::Job>, Self::Error>;
|
||||
|
||||
/// Drops a chunk from the partition
|
||||
fn drop_chunk(
|
||||
partition: LifecycleWriteGuard<'_, Self::Partition, Self>,
|
||||
chunk: LifecycleWriteGuard<'_, <Self::Chunk as LockableChunk>::Chunk, Self::Chunk>,
|
||||
) -> Result<TaskTracker<<Self::Chunk as LockableChunk>::Job>, Self::Error>;
|
||||
}
|
||||
|
||||
/// A `LockableChunk` is a wrapper around a `LifecycleChunk` that allows for
|
||||
/// planning and executing lifecycle actions on the chunk
|
||||
///
|
||||
/// Specifically a read lock can be obtained, a decision made based on the chunk's
|
||||
/// data, and then a lifecycle action optionally triggered, all without allowing
|
||||
/// concurrent modification
|
||||
///
|
||||
/// See the module level documentation for the guard module for more information
|
||||
/// on why this trait is the way it is
|
||||
///
|
||||
pub trait LockableChunk: Sized {
|
||||
type Chunk: LifecycleChunk;
|
||||
type Job: Sized + Send + Sync + 'static;
|
||||
type Error: std::error::Error + Send + Sync;
|
||||
|
||||
/// Acquire a shared read lock on the chunk
|
||||
fn read(&self) -> LifecycleReadGuard<'_, Self::Chunk, Self>;
|
||||
|
||||
/// Acquire an exclusive write lock on the chunk
|
||||
fn write(&self) -> LifecycleWriteGuard<'_, Self::Chunk, Self>;
|
||||
|
||||
/// Remove the copy of the Chunk's data from the read buffer.
|
||||
///
|
||||
/// Note that this can only be called for persisted chunks
|
||||
/// (otherwise the read buffer may contain the *only* copy of this
|
||||
/// chunk's data). In order to drop un-persisted chunks,
|
||||
/// [`drop_chunk`](LockablePartition::drop_chunk) must be used.
|
||||
fn unload_read_buffer(s: LifecycleWriteGuard<'_, Self::Chunk, Self>)
|
||||
-> Result<(), Self::Error>;
|
||||
|
||||
/// Load data back into the read buffer from object storage
|
||||
///
|
||||
/// Note that this can only be called for persisted chunks
|
||||
fn load_read_buffer(
|
||||
s: LifecycleWriteGuard<'_, Self::Chunk, Self>,
|
||||
) -> Result<TaskTracker<Self::Job>, Self::Error>;
|
||||
|
||||
fn id(&self) -> ChunkId;
|
||||
|
||||
fn order(&self) -> ChunkOrder;
|
||||
}
|
||||
|
||||
pub trait LifecyclePartition: std::fmt::Display {
|
||||
fn partition_key(&self) -> &str;
|
||||
|
||||
/// Returns true if all chunks in the partition are persisted.
|
||||
fn is_persisted(&self) -> bool;
|
||||
|
||||
/// Returns an approximation of the number of rows that can be persisted
|
||||
///
|
||||
/// `now` is the wall clock time that should be used to compute how long a given
|
||||
/// write has been present in memory
|
||||
fn persistable_row_count(&self) -> usize;
|
||||
|
||||
/// Returns the age of the oldest unpersisted write
|
||||
fn minimum_unpersisted_age(&self) -> Option<Time>;
|
||||
}
|
||||
|
||||
/// The lifecycle operates on chunks implementing this trait
|
||||
pub trait LifecycleChunk {
|
||||
fn lifecycle_action(&self) -> Option<&TaskTracker<ChunkLifecycleAction>>;
|
||||
|
||||
fn clear_lifecycle_action(&mut self);
|
||||
|
||||
/// Returns the min timestamp contained within this chunk
|
||||
fn min_timestamp(&self) -> Time;
|
||||
|
||||
/// Returns the access metrics for this chunk
|
||||
fn access_metrics(&self) -> AccessMetrics;
|
||||
|
||||
fn time_of_last_write(&self) -> Time;
|
||||
|
||||
fn addr(&self) -> &ChunkAddr;
|
||||
|
||||
fn storage(&self) -> ChunkStorage;
|
||||
|
||||
fn row_count(&self) -> usize;
|
||||
}
|
||||
|
||||
/// The trait for a persist handle
|
||||
pub trait PersistHandle {
|
||||
/// Any unpersisted chunks containing rows with timestamps less than or equal to this
|
||||
/// must be included in the corresponding `LockablePartition::persist_chunks` call
|
||||
fn timestamp(&self) -> Time;
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -17,7 +17,6 @@ hashbrown = "0.12"
|
|||
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
|
||||
iox_object_store = { path = "../iox_object_store" }
|
||||
job_registry = { path = "../job_registry" }
|
||||
lifecycle = { path = "../lifecycle" }
|
||||
metric = { path = "../metric" }
|
||||
mutable_batch_lp = { path = "../mutable_batch_lp" }
|
||||
num_cpus = "1.13.0"
|
||||
|
|
Loading…
Reference in New Issue