diff --git a/Cargo.lock b/Cargo.lock index a3b589d504..e3abffd53a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1877,6 +1877,7 @@ dependencies = [ "string-interner", "test_helpers", "tokio", + "tracker", ] [[package]] @@ -2321,6 +2322,7 @@ dependencies = [ "object_store", "parking_lot", "snafu", + "tracker", ] [[package]] diff --git a/mutable_buffer/Cargo.toml b/mutable_buffer/Cargo.toml index 7f5a3faafe..8f1a476c80 100644 --- a/mutable_buffer/Cargo.toml +++ b/mutable_buffer/Cargo.toml @@ -21,12 +21,13 @@ data_types = { path = "../data_types" } # version of the flatbuffers crate flatbuffers = "0.8" generated_types = { path = "../generated_types" } -internal_types = { path = "../internal_types" } influxdb_line_protocol = { path = "../influxdb_line_protocol" } +internal_types = { path = "../internal_types" } +observability_deps = { path = "../observability_deps" } snafu = "0.6.2" string-interner = "0.12.2" tokio = { version = "1.0", features = ["macros"] } -observability_deps = { path = "../observability_deps" } +tracker = { path = "../tracker" } [dev-dependencies] # In alphabetical order test_helpers = { path = "../test_helpers" } diff --git a/mutable_buffer/src/chunk.rs b/mutable_buffer/src/chunk.rs index c0c84ac216..14bacacbfe 100644 --- a/mutable_buffer/src/chunk.rs +++ b/mutable_buffer/src/chunk.rs @@ -15,6 +15,7 @@ use crate::{ table::Table, }; use snafu::{OptionExt, ResultExt, Snafu}; +use tracker::{MemRegistry, MemTracker}; #[derive(Debug, Snafu)] pub enum Error { @@ -105,7 +106,7 @@ pub enum Error { pub type Result = std::result::Result; -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Chunk { /// The id for this chunk pub id: u32, @@ -118,15 +119,36 @@ pub struct Chunk { /// map of the dictionary ID for the table name to the table pub tables: HashMap, + + /// keep track of memory used by chunk + tracker: MemTracker, +} + +impl Clone for Chunk { + fn clone(&self) -> Self { + // TODO: The performance of this is not great - (#635) + let mut ret = Self { + id: self.id, + dictionary: self.dictionary.clone(), + tables: self.tables.clone(), + tracker: self.tracker.clone_empty(), + }; + + ret.tracker.set_bytes(ret.size()); + ret + } } impl Chunk { - pub fn new(id: u32) -> Self { - Self { + pub fn new(id: u32, memory_registry: &MemRegistry) -> Self { + let mut chunk = Self { id, dictionary: Dictionary::new(), tables: HashMap::new(), - } + tracker: memory_registry.register(), + }; + chunk.tracker.set_bytes(chunk.size()); + chunk } pub fn write_entry(&mut self, entry: &wb::WriteBufferEntry<'_>) -> Result<()> { @@ -136,6 +158,8 @@ impl Chunk { } } + self.tracker.set_bytes(self.size()); + Ok(()) } diff --git a/mutable_buffer/src/table.rs b/mutable_buffer/src/table.rs index 54fbeebdb6..fe35b2f1f8 100644 --- a/mutable_buffer/src/table.rs +++ b/mutable_buffer/src/table.rs @@ -596,10 +596,12 @@ mod tests { use internal_types::data::split_lines_into_write_entry_partitions; use super::*; + use tracker::MemRegistry; #[test] fn test_has_columns() { - let mut chunk = Chunk::new(42); + let registry = Arc::new(MemRegistry::new()); + let mut chunk = Chunk::new(42, registry.as_ref()); let dictionary = &mut chunk.dictionary; let mut table = Table::new(dictionary.lookup_value_or_insert("table_name")); @@ -641,7 +643,8 @@ mod tests { #[test] fn table_size() { - let mut chunk = Chunk::new(42); + let registry = Arc::new(MemRegistry::new()); + let mut chunk = Chunk::new(42, registry.as_ref()); let dictionary = &mut chunk.dictionary; let mut table = Table::new(dictionary.lookup_value_or_insert("table_name")); @@ -664,7 +667,8 @@ mod tests { #[test] fn test_matches_table_name_predicate() { - let mut chunk = Chunk::new(42); + let registry = Arc::new(MemRegistry::new()); + let mut chunk = Chunk::new(42, registry.as_ref()); let dictionary = &mut chunk.dictionary; let mut table = Table::new(dictionary.lookup_value_or_insert("h2o")); @@ -694,7 +698,8 @@ mod tests { #[test] fn test_matches_column_name_predicate() { - let mut chunk = Chunk::new(42); + let registry = Arc::new(MemRegistry::new()); + let mut chunk = Chunk::new(42, registry.as_ref()); let dictionary = &mut chunk.dictionary; let mut table = Table::new(dictionary.lookup_value_or_insert("h2o")); @@ -740,7 +745,8 @@ mod tests { #[test] fn test_to_arrow_schema_all() { - let mut chunk = Chunk::new(42); + let registry = Arc::new(MemRegistry::new()); + let mut chunk = Chunk::new(42, registry.as_ref()); let dictionary = &mut chunk.dictionary; let mut table = Table::new(dictionary.lookup_value_or_insert("table_name")); @@ -773,7 +779,8 @@ mod tests { #[test] fn test_to_arrow_schema_subset() { - let mut chunk = Chunk::new(42); + let registry = Arc::new(MemRegistry::new()); + let mut chunk = Chunk::new(42, registry.as_ref()); let dictionary = &mut chunk.dictionary; let mut table = Table::new(dictionary.lookup_value_or_insert("table_name")); diff --git a/parquet_file/Cargo.toml b/parquet_file/Cargo.toml index 3c7503af3b..015913d1d1 100644 --- a/parquet_file/Cargo.toml +++ b/parquet_file/Cargo.toml @@ -11,4 +11,5 @@ data_types = { path = "../data_types" } futures = "0.3.7" object_store = {path = "../object_store"} parking_lot = "0.11.1" -snafu = "0.6" \ No newline at end of file +snafu = "0.6" +tracker = { path = "../tracker" } \ No newline at end of file diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs index 3f80e7996f..d6adc2dda3 100644 --- a/parquet_file/src/chunk.rs +++ b/parquet_file/src/chunk.rs @@ -3,8 +3,9 @@ use std::collections::BTreeSet; use crate::table::Table; use data_types::partition_metadata::TableSummary; use object_store::path::Path; +use tracker::{MemRegistry, MemTracker}; -#[derive(Default, Debug, Clone)] +#[derive(Debug)] pub struct Chunk { /// Partition this chunk belongs to pub partition_key: String, @@ -14,16 +15,21 @@ pub struct Chunk { /// Tables of this chunk pub tables: Vec, + + /// Track memory used by this chunk + memory_tracker: MemTracker, } impl Chunk { - pub fn new(part_key: String, chunk_id: u32) -> Self { - Self { + pub fn new(part_key: String, chunk_id: u32, memory_registry: &MemRegistry) -> Self { + let mut chunk = Self { partition_key: part_key, id: chunk_id, - tables: vec![], + tables: Default::default(), + memory_tracker: memory_registry.register(), }; - Self::default() + chunk.memory_tracker.set_bytes(chunk.size()); + chunk } pub fn add_table(&mut self, table_summary: TableSummary, file_location: Path) { diff --git a/server/src/buffer.rs b/server/src/buffer.rs index 8d2afdd0e3..c9b5342610 100644 --- a/server/src/buffer.rs +++ b/server/src/buffer.rs @@ -23,7 +23,7 @@ use data_types::wal::{SegmentPersistence, SegmentSummary, WriterSummary}; use observability_deps::tracing::{error, info, warn}; use parking_lot::Mutex; use snafu::{ensure, OptionExt, ResultExt, Snafu}; -use tracker::task::{TrackedFutureExt, TrackerRegistration}; +use tracker::{TaskRegistration, TrackedFutureExt}; #[derive(Debug, Snafu)] pub enum Error { @@ -378,7 +378,7 @@ impl Segment { /// the given object store location. pub fn persist_bytes_in_background( &self, - tracker: TrackerRegistration, + tracker: TaskRegistration, writer_id: u32, db_name: &DatabaseName<'_>, store: Arc, diff --git a/server/src/db.rs b/server/src/db.rs index c08400e9df..7fea941ba4 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -27,7 +27,7 @@ use object_store::{memory::InMemory, ObjectStore}; use parquet_file::{chunk::Chunk, storage::Storage}; use query::{Database, DEFAULT_SCHEMA}; use read_buffer::Database as ReadBufferDb; -use tracker::task::{TrackedFutureExt, Tracker}; +use tracker::{MemRegistry, TaskTracker, TrackedFutureExt}; use super::{buffer::Buffer, JobRegistry}; use data_types::job::Job; @@ -220,14 +220,29 @@ pub struct Db { /// A handle to the global jobs registry for long running tasks jobs: Arc, + /// Memory registries used for tracking memory usage by this Db + memory_registries: MemoryRegistries, + /// The system schema provider system_tables: Arc, + /// Used to allocated sequence numbers for writes sequence: AtomicU64, + /// Number of iterations of the worker loop for this Db worker_iterations: AtomicUsize, } +#[derive(Debug, Default)] +struct MemoryRegistries { + mutable_buffer: Arc, + + // TODO: Wire into read buffer + read_buffer: Arc, + + parquet: Arc, +} + impl Db { pub fn new( rules: DatabaseRules, @@ -247,6 +262,7 @@ impl Db { wal_buffer, jobs, system_tables, + memory_registries: Default::default(), sequence: AtomicU64::new(STARTING_SEQUENCE), worker_iterations: AtomicUsize::new(0), } @@ -271,7 +287,7 @@ impl Db { .context(RollingOverPartition { partition_key })?; // make a new chunk to track the newly created chunk in this partition - partition.create_open_chunk(); + partition.create_open_chunk(self.memory_registries.mutable_buffer.as_ref()); return Ok(DBChunk::snapshot(&chunk)); } @@ -466,7 +482,11 @@ impl Db { let table_stats = read_buffer.table_summaries(partition_key, &[chunk_id]); // Create a parquet chunk for this chunk - let mut parquet_chunk = Chunk::new(partition_key.to_string(), chunk_id); + let mut parquet_chunk = Chunk::new( + partition_key.to_string(), + chunk_id, + self.memory_registries.parquet.as_ref(), + ); // Create a storage to save data of this chunk // Todo: this must be gotten from server or somewhere let store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); @@ -533,7 +553,7 @@ impl Db { self: &Arc, partition_key: String, chunk_id: u32, - ) -> Tracker { + ) -> TaskTracker { let name = self.rules.read().name.clone(); let (tracker, registration) = self.jobs.register(Job::CloseChunk { db_name: name.to_string(), @@ -664,9 +684,9 @@ impl Database for Db { let mut partition = partition.write(); partition.update_last_write_at(); - let chunk = partition - .open_chunk() - .unwrap_or_else(|| partition.create_open_chunk()); + let chunk = partition.open_chunk().unwrap_or_else(|| { + partition.create_open_chunk(self.memory_registries.mutable_buffer.as_ref()) + }); let mut chunk = chunk.write(); chunk.record_write(); @@ -1104,6 +1124,15 @@ mod tests { 107, )]; + let size: usize = db + .chunk_summaries() + .unwrap() + .into_iter() + .map(|x| x.estimated_bytes) + .sum(); + + assert_eq!(db.memory_registries.mutable_buffer.bytes(), size); + assert_eq!( expected, chunk_summaries, "expected:\n{:#?}\n\nactual:{:#?}\n\n", @@ -1226,6 +1255,10 @@ mod tests { ), ]; + assert_eq!(db.memory_registries.mutable_buffer.bytes(), 101 + 133 + 135); + // TODO: Instrument read buffer + //assert_eq!(db.memory_registries.read_buffer.bytes(), 1269); + assert_eq!( expected, chunk_summaries, "expected:\n{:#?}\n\nactual:{:#?}\n\n", diff --git a/server/src/db/catalog.rs b/server/src/db/catalog.rs index 72db4f3696..e8100cf9e5 100644 --- a/server/src/db/catalog.rs +++ b/server/src/db/catalog.rs @@ -231,6 +231,7 @@ impl SchemaProvider for Catalog { #[cfg(test)] mod tests { use super::*; + use tracker::MemRegistry; #[test] fn partition_get() { @@ -269,12 +270,13 @@ mod tests { #[test] fn chunk_create() { + let registry = MemRegistry::new(); let catalog = Catalog::new(); let p1 = catalog.get_or_create_partition("p1"); let mut p1 = p1.write(); - p1.create_open_chunk(); - p1.create_open_chunk(); + p1.create_open_chunk(®istry); + p1.create_open_chunk(®istry); let c1_0 = p1.chunk(0).unwrap(); assert_eq!(c1_0.read().key(), "p1"); @@ -290,20 +292,21 @@ mod tests { #[test] fn chunk_list() { + let registry = MemRegistry::new(); let catalog = Catalog::new(); let p1 = catalog.get_or_create_partition("p1"); { let mut p1 = p1.write(); - p1.create_open_chunk(); - p1.create_open_chunk(); + p1.create_open_chunk(®istry); + p1.create_open_chunk(®istry); } let p2 = catalog.get_or_create_partition("p2"); { let mut p2 = p2.write(); - p2.create_open_chunk(); + p2.create_open_chunk(®istry); } assert_eq!( @@ -355,19 +358,20 @@ mod tests { #[test] fn chunk_drop() { + let registry = MemRegistry::new(); let catalog = Catalog::new(); let p1 = catalog.get_or_create_partition("p1"); { let mut p1 = p1.write(); - p1.create_open_chunk(); - p1.create_open_chunk(); + p1.create_open_chunk(®istry); + p1.create_open_chunk(®istry); } let p2 = catalog.get_or_create_partition("p2"); { let mut p2 = p2.write(); - p2.create_open_chunk(); + p2.create_open_chunk(®istry); } assert_eq!(chunk_strings(&catalog).len(), 3); @@ -399,14 +403,15 @@ mod tests { #[test] fn chunk_recreate_dropped() { + let registry = MemRegistry::new(); let catalog = Catalog::new(); let p1 = catalog.get_or_create_partition("p1"); { let mut p1 = p1.write(); - p1.create_open_chunk(); - p1.create_open_chunk(); + p1.create_open_chunk(®istry); + p1.create_open_chunk(®istry); } assert_eq!(chunk_strings(&catalog).len(), 2); @@ -419,7 +424,7 @@ mod tests { // should be ok to recreate (thought maybe not a great idea) { let mut p1 = p1.write(); - p1.create_open_chunk(); + p1.create_open_chunk(®istry); } assert_eq!(chunk_strings(&catalog).len(), 2); } diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index a20b72d463..4b18b2277a 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -11,6 +11,7 @@ use read_buffer::Database as ReadBufferDb; use crate::db::DBChunk; use super::{InternalChunkState, Result}; +use tracker::MemRegistry; /// The state a Chunk is in and what its underlying backing storage is #[derive(Debug)] @@ -109,8 +110,12 @@ impl Chunk { } /// Creates a new open chunk - pub(crate) fn new_open(partition_key: impl Into, id: u32) -> Self { - let state = ChunkState::Open(mutable_buffer::chunk::Chunk::new(id)); + pub(crate) fn new_open( + partition_key: impl Into, + id: u32, + memory_registry: &MemRegistry, + ) -> Self { + let state = ChunkState::Open(mutable_buffer::chunk::Chunk::new(id, memory_registry)); Self::new(partition_key, id, state) } diff --git a/server/src/db/catalog/partition.rs b/server/src/db/catalog/partition.rs index 41d3550e8f..5e44cf6b2d 100644 --- a/server/src/db/catalog/partition.rs +++ b/server/src/db/catalog/partition.rs @@ -11,6 +11,7 @@ use data_types::chunk::ChunkSummary; use data_types::partition_metadata::PartitionSummary; use parking_lot::RwLock; use snafu::OptionExt; +use tracker::MemRegistry; /// IOx Catalog Partition /// @@ -76,11 +77,15 @@ impl Partition { } /// Create a new Chunk in the open state - pub fn create_open_chunk(&mut self) -> Arc> { + pub fn create_open_chunk(&mut self, memory_registry: &MemRegistry) -> Arc> { let chunk_id = self.next_chunk_id; self.next_chunk_id += 1; - let chunk = Arc::new(RwLock::new(Chunk::new_open(&self.key, chunk_id))); + let chunk = Arc::new(RwLock::new(Chunk::new_open( + &self.key, + chunk_id, + memory_registry, + ))); if self.chunks.insert(chunk_id, Arc::clone(&chunk)).is_some() { // A fundamental invariant has been violated - abort diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs index 5600cb34a8..8bb59b6fa3 100644 --- a/server/src/db/lifecycle.rs +++ b/server/src/db/lifecycle.rs @@ -7,7 +7,7 @@ use parking_lot::{RwLock, RwLockUpgradableReadGuard}; use data_types::{database_rules::LifecycleRules, error::ErrorLogger, job::Job}; -use tracker::task::Tracker; +use tracker::TaskTracker; use super::{ catalog::chunk::{Chunk, ChunkState}, @@ -19,7 +19,7 @@ use data_types::database_rules::SortOrder; pub struct LifecycleManager { db: Arc, db_name: String, - move_task: Option>, + move_task: Option>, } impl LifecycleManager { @@ -232,6 +232,7 @@ fn can_move(rules: &LifecycleRules, chunk: &Chunk, now: DateTime) -> bool { mod tests { use super::*; use std::num::{NonZeroU32, NonZeroUsize}; + use tracker::MemRegistry; fn from_secs(secs: i64) -> DateTime { DateTime::from_utc(chrono::NaiveDateTime::from_timestamp(secs, 0), Utc) @@ -242,7 +243,7 @@ mod tests { time_of_first_write: Option, time_of_last_write: Option, ) -> Chunk { - let mut chunk = Chunk::new_open("", id); + let mut chunk = Chunk::new_open("", id, &MemRegistry::new()); chunk.set_timestamps( time_of_first_write.map(from_secs), time_of_last_write.map(from_secs), diff --git a/server/src/lib.rs b/server/src/lib.rs index 18896a650a..5dcb0181f3 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -88,9 +88,7 @@ use internal_types::{ }; use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi}; use query::{exec::Executor, Database, DatabaseStore}; -use tracker::task::{ - TrackedFutureExt, Tracker, TrackerId, TrackerRegistration, TrackerRegistryWithHistory, -}; +use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt}; use futures::{pin_mut, FutureExt}; @@ -158,13 +156,13 @@ const JOB_HISTORY_SIZE: usize = 1000; /// The global job registry #[derive(Debug)] pub struct JobRegistry { - inner: Mutex>, + inner: Mutex>, } impl Default for JobRegistry { fn default() -> Self { Self { - inner: Mutex::new(TrackerRegistryWithHistory::new(JOB_HISTORY_SIZE)), + inner: Mutex::new(TaskRegistryWithHistory::new(JOB_HISTORY_SIZE)), } } } @@ -174,7 +172,7 @@ impl JobRegistry { Default::default() } - pub fn register(&self, job: Job) -> (Tracker, TrackerRegistration) { + pub fn register(&self, job: Job) -> (TaskTracker, TaskRegistration) { self.inner.lock().register(job) } } @@ -413,7 +411,7 @@ impl Server { self.config.delete_remote(id) } - pub fn spawn_dummy_job(&self, nanos: Vec) -> Tracker { + pub fn spawn_dummy_job(&self, nanos: Vec) -> TaskTracker { let (tracker, registration) = self.jobs.register(Job::Dummy { nanos: nanos.clone(), }); @@ -435,7 +433,7 @@ impl Server { db_name: DatabaseName<'_>, partition_key: impl Into, chunk_id: u32, - ) -> Result> { + ) -> Result> { let db_name = db_name.to_string(); let name = DatabaseName::new(&db_name).context(InvalidDatabaseName)?; @@ -450,12 +448,12 @@ impl Server { } /// Returns a list of all jobs tracked by this server - pub fn tracked_jobs(&self) -> Vec> { + pub fn tracked_jobs(&self) -> Vec> { self.jobs.inner.lock().tracked() } /// Returns a specific job tracked by this server - pub fn get_job(&self, id: TrackerId) -> Option> { + pub fn get_job(&self, id: TaskId) -> Option> { self.jobs.inner.lock().get(id) } diff --git a/server/src/snapshot.rs b/server/src/snapshot.rs index 33e78db20d..423e3e9527 100644 --- a/server/src/snapshot.rs +++ b/server/src/snapshot.rs @@ -281,6 +281,7 @@ mod tests { use mutable_buffer::chunk::Chunk as ChunkWB; use object_store::memory::InMemory; use query::{test::TestLPWriter, Database}; + use tracker::MemRegistry; #[tokio::test] async fn snapshot() { @@ -350,9 +351,10 @@ mem,host=A,region=west used=45 1 }, ]; + let registry = MemRegistry::new(); let store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); let chunk = Arc::new(DBChunk::MutableBuffer { - chunk: Arc::new(ChunkWB::new(11)), + chunk: Arc::new(ChunkWB::new(11, ®istry)), partition_key: Arc::new("key".to_string()), open: false, }); diff --git a/src/commands/database.rs b/src/commands/database.rs index e33d723ffe..c35580315c 100644 --- a/src/commands/database.rs +++ b/src/commands/database.rs @@ -196,7 +196,7 @@ pub async fn command(url: String, config: Config) -> Result<()> { Command::List(_) => { let mut client = management::Client::new(connection); let databases = client.list_databases().await?; - println!("{}", databases.join(", ")) + println!("{}", databases.join("\n")) } Command::Get(get) => { let mut client = management::Client::new(connection); diff --git a/src/influxdb_ioxd/rpc/operations.rs b/src/influxdb_ioxd/rpc/operations.rs index 90a18c95cd..40c706e5dc 100644 --- a/src/influxdb_ioxd/rpc/operations.rs +++ b/src/influxdb_ioxd/rpc/operations.rs @@ -18,7 +18,7 @@ use generated_types::{ influxdata::iox::management::v1 as management, protobuf_type_url, }; -use tracker::task::{Tracker, TrackerId, TrackerStatus}; +use tracker::{TaskId, TaskStatus, TaskTracker}; use server::{ConnectionManager, Server}; use std::convert::TryInto; @@ -28,13 +28,13 @@ struct OperationsService { server: Arc>, } -pub fn encode_tracker(tracker: Tracker) -> Result { +pub fn encode_tracker(tracker: TaskTracker) -> Result { let id = tracker.id(); let is_cancelled = tracker.is_cancelled(); let status = tracker.get_status(); let (operation_metadata, is_complete) = match status { - TrackerStatus::Creating => { + TaskStatus::Creating => { let metadata = management::OperationMetadata { job: Some(tracker.metadata().clone().into()), ..Default::default() @@ -42,7 +42,7 @@ pub fn encode_tracker(tracker: Tracker) -> Result (metadata, false) } - TrackerStatus::Running { + TaskStatus::Running { total_count, pending_count, cpu_nanos, @@ -57,7 +57,7 @@ pub fn encode_tracker(tracker: Tracker) -> Result (metadata, false) } - TrackerStatus::Complete { + TaskStatus::Complete { total_count, cpu_nanos, wall_nanos, @@ -108,11 +108,11 @@ pub fn encode_tracker(tracker: Tracker) -> Result }) } -fn get_tracker(server: &Server, tracker: String) -> Result, tonic::Status> +fn get_tracker(server: &Server, tracker: String) -> Result, tonic::Status> where M: ConnectionManager, { - let tracker_id = tracker.parse::().map_err(|e| FieldViolation { + let tracker_id = tracker.parse::().map_err(|e| FieldViolation { field: "name".to_string(), description: e.to_string(), })?; diff --git a/tracker/src/lib.rs b/tracker/src/lib.rs index 98f06295fd..75f24ea558 100644 --- a/tracker/src/lib.rs +++ b/tracker/src/lib.rs @@ -6,4 +6,8 @@ clippy::clone_on_ref_ptr )] -pub mod task; +mod mem; +mod task; + +pub use mem::*; +pub use task::*; diff --git a/tracker/src/mem.rs b/tracker/src/mem.rs new file mode 100644 index 0000000000..a337a59cf6 --- /dev/null +++ b/tracker/src/mem.rs @@ -0,0 +1,117 @@ +//! This module contains a basic memory tracking system + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +/// A simple memory registry that tracks the total memory consumption +/// across a set of MemTrackers +#[derive(Debug, Default)] +pub struct MemRegistry { + inner: Arc, +} + +#[derive(Debug, Default)] +struct MemTrackerShared { + /// The total bytes across all registered trackers + bytes: AtomicUsize, +} + +impl MemRegistry { + pub fn new() -> Self { + Self::default() + } + + pub fn bytes(&self) -> usize { + self.inner.bytes.load(Ordering::Relaxed) + } + + pub fn register(&self) -> MemTracker { + MemTracker { + shared: Arc::clone(&self.inner), + bytes: 0, + } + } +} + +/// A MemTracker is created with a reference to a MemRegistry +/// The memory "allocation" associated with a specific MemTracker +/// can be increased or decreased and this will update the totals +/// on the MemRegistry +/// +/// On Drop the "allocated" bytes associated with the MemTracker +/// will be decremented from the MemRegistry's total +/// +/// Note: this purposefully does not implement Clone as the semantics +/// of such a construct are unclear +/// +/// Note: this purposefully does not implement Default to avoid +/// accidentally creating untracked objects +#[derive(Debug)] +pub struct MemTracker { + shared: Arc, + bytes: usize, +} + +impl MemTracker { + /// Creates a new empty tracker registered to + /// the same registry as this tracker + pub fn clone_empty(&self) -> Self { + Self { + shared: Arc::clone(&self.shared), + bytes: 0, + } + } + + /// Set the number of bytes associated with this tracked instance + pub fn set_bytes(&mut self, new: usize) { + if new > self.bytes { + self.shared + .bytes + .fetch_add(new - self.bytes, Ordering::Relaxed); + } else { + self.shared + .bytes + .fetch_sub(self.bytes - new, Ordering::Relaxed); + } + self.bytes = new; + } +} + +impl Drop for MemTracker { + fn drop(&mut self) { + self.shared.bytes.fetch_sub(self.bytes, Ordering::Relaxed); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_tracker() { + let registry = MemRegistry::new(); + let mut t1 = registry.register(); + let mut t2 = registry.register(); + + t1.set_bytes(200); + + assert_eq!(registry.bytes(), 200); + + t1.set_bytes(100); + + assert_eq!(registry.bytes(), 100); + + t2.set_bytes(300); + + assert_eq!(registry.bytes(), 400); + + t2.set_bytes(400); + assert_eq!(registry.bytes(), 500); + + std::mem::drop(t2); + assert_eq!(registry.bytes(), 100); + + std::mem::drop(t1); + assert_eq!(registry.bytes(), 0); + } +} diff --git a/tracker/src/task.rs b/tracker/src/task.rs index 1afb5db0d2..ac725d0592 100644 --- a/tracker/src/task.rs +++ b/tracker/src/task.rs @@ -88,8 +88,8 @@ use observability_deps::tracing::warn; use tokio_util::sync::CancellationToken; pub use future::{TrackedFuture, TrackedFutureExt}; -pub use history::TrackerRegistryWithHistory; -pub use registry::{TrackerId, TrackerRegistry}; +pub use history::TaskRegistryWithHistory; +pub use registry::{TaskId, TaskRegistry}; mod future; mod history; @@ -110,9 +110,9 @@ struct TrackerState { watch: tokio::sync::watch::Receiver, } -/// The status of the tracker +/// The status of the tracked task #[derive(Debug, Clone)] -pub enum TrackerStatus { +pub enum TaskStatus { /// More futures can be registered Creating, @@ -147,13 +147,13 @@ pub enum TrackerStatus { /// A Tracker can be used to monitor/cancel/wait for a set of associated futures #[derive(Debug)] -pub struct Tracker { - id: TrackerId, +pub struct TaskTracker { + id: TaskId, state: Arc, metadata: Arc, } -impl Clone for Tracker { +impl Clone for TaskTracker { fn clone(&self) -> Self { Self { id: self.id, @@ -163,9 +163,9 @@ impl Clone for Tracker { } } -impl Tracker { +impl TaskTracker { /// Returns the ID of the Tracker - these are unique per TrackerRegistry - pub fn id(&self) -> TrackerId { + pub fn id(&self) -> TaskId { self.id } @@ -187,11 +187,11 @@ impl Tracker { /// Returns true if all futures associated with this tracker have /// been dropped and no more can be created pub fn is_complete(&self) -> bool { - matches!(self.get_status(), TrackerStatus::Complete{..}) + matches!(self.get_status(), TaskStatus::Complete{..}) } /// Gets the status of the tracker - pub fn get_status(&self) -> TrackerStatus { + pub fn get_status(&self) -> TaskStatus { // The atomic decrement in TrackerRegistration::drop has release semantics // acquire here ensures that if a thread observes the tracker to have // no pending_registrations it cannot subsequently observe pending_futures @@ -205,13 +205,13 @@ impl Tracker { let pending_futures = self.state.pending_futures.load(Ordering::Acquire); match (pending_registrations == 0, pending_futures == 0) { - (false, _) => TrackerStatus::Creating, - (true, false) => TrackerStatus::Running { + (false, _) => TaskStatus::Creating, + (true, false) => TaskStatus::Running { total_count: self.state.created_futures.load(Ordering::Relaxed), pending_count: self.state.pending_futures.load(Ordering::Relaxed), cpu_nanos: self.state.cpu_nanos.load(Ordering::Relaxed), }, - (true, true) => TrackerStatus::Complete { + (true, true) => TaskStatus::Complete { total_count: self.state.created_futures.load(Ordering::Relaxed), cpu_nanos: self.state.cpu_nanos.load(Ordering::Relaxed), wall_nanos: self.state.wall_nanos.load(Ordering::Relaxed), @@ -248,11 +248,11 @@ impl Tracker { /// TrackedFutures are registered with a Tracker that has already signalled /// completion #[derive(Debug)] -pub struct TrackerRegistration { +pub struct TaskRegistration { state: Arc, } -impl Clone for TrackerRegistration { +impl Clone for TaskRegistration { fn clone(&self) -> Self { self.state .pending_registrations @@ -264,7 +264,7 @@ impl Clone for TrackerRegistration { } } -impl TrackerRegistration { +impl TaskRegistration { fn new(watch: tokio::sync::watch::Receiver) -> Self { let state = Arc::new(TrackerState { start_instant: Instant::now(), @@ -281,7 +281,7 @@ impl TrackerRegistration { } } -impl Drop for TrackerRegistration { +impl Drop for TaskRegistration { fn drop(&mut self) { // This synchronizes with the Acquire load in Tracker::get_status let previous = self @@ -305,7 +305,7 @@ mod tests { #[tokio::test] async fn test_lifecycle() { let (sender, receive) = oneshot::channel(); - let mut registry = TrackerRegistry::new(); + let mut registry = TaskRegistry::new(); let (_, registration) = registry.register(()); let task = tokio::spawn(receive.track(registration)); @@ -322,7 +322,7 @@ mod tests { async fn test_interleaved() { let (sender1, receive1) = oneshot::channel(); let (sender2, receive2) = oneshot::channel(); - let mut registry = TrackerRegistry::new(); + let mut registry = TaskRegistry::new(); let (_, registration1) = registry.register(1); let (_, registration2) = registry.register(2); @@ -347,7 +347,7 @@ mod tests { #[tokio::test] async fn test_drop() { - let mut registry = TrackerRegistry::new(); + let mut registry = TaskRegistry::new(); let (_, registration) = registry.register(()); { @@ -363,7 +363,7 @@ mod tests { #[tokio::test] async fn test_drop_multiple() { - let mut registry = TrackerRegistry::new(); + let mut registry = TaskRegistry::new(); let (_, registration) = registry.register(()); { @@ -382,7 +382,7 @@ mod tests { #[tokio::test] async fn test_terminate() { - let mut registry = TrackerRegistry::new(); + let mut registry = TaskRegistry::new(); let (_, registration) = registry.register(()); let task = tokio::spawn(futures::future::pending::<()>().track(registration)); @@ -399,7 +399,7 @@ mod tests { #[tokio::test] async fn test_terminate_early() { - let mut registry = TrackerRegistry::new(); + let mut registry = TaskRegistry::new(); let (tracker, registration) = registry.register(()); tracker.cancel(); @@ -412,7 +412,7 @@ mod tests { #[tokio::test] async fn test_terminate_multiple() { - let mut registry = TrackerRegistry::new(); + let mut registry = TaskRegistry::new(); let (_, registration) = registry.register(()); let task1 = tokio::spawn(futures::future::pending::<()>().track(registration.clone())); @@ -433,7 +433,7 @@ mod tests { #[tokio::test] async fn test_reclaim() { - let mut registry = TrackerRegistry::new(); + let mut registry = TaskRegistry::new(); let (_, registration1) = registry.register(1); let (_, registration2) = registry.register(2); @@ -489,7 +489,7 @@ mod tests { assert!(result3.is_ok()); assert!( - matches!(tracked[0].get_status(), TrackerStatus::Running { pending_count: 1, total_count: 2, ..}) + matches!(tracked[0].get_status(), TaskStatus::Running { pending_count: 1, total_count: 2, ..}) ); // Trigger termination of task5 @@ -511,7 +511,7 @@ mod tests { let result4 = task4.await.unwrap(); assert!(result4.is_err()); - assert!(matches!(running[0].get_status(), TrackerStatus::Complete { total_count: 2, ..})); + assert!(matches!(running[0].get_status(), TaskStatus::Complete { total_count: 2, ..})); let reclaimed = sorted(registry.reclaim().collect()); @@ -524,7 +524,7 @@ mod tests { // to prevent stalling the tokio executor #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_timing() { - let mut registry = TrackerRegistry::new(); + let mut registry = TaskRegistry::new(); let (tracker1, registration1) = registry.register(1); let (tracker2, registration2) = registry.register(2); let (tracker3, registration3) = registry.register(3); @@ -564,11 +564,11 @@ mod tests { ); }; - let assert_complete = |status: TrackerStatus, + let assert_complete = |status: TaskStatus, expected_cpu: std::time::Duration, expected_wal: std::time::Duration| { match status { - TrackerStatus::Complete { + TaskStatus::Complete { cpu_nanos, wall_nanos, .. @@ -599,7 +599,7 @@ mod tests { #[tokio::test] async fn test_register_race() { - let mut registry = TrackerRegistry::new(); + let mut registry = TaskRegistry::new(); let (_, registration) = registry.register(()); let task1 = tokio::spawn(futures::future::ready(()).track(registration.clone())); @@ -607,7 +607,7 @@ mod tests { let tracked = registry.tracked(); assert_eq!(tracked.len(), 1); - assert!(matches!(&tracked[0].get_status(), TrackerStatus::Creating)); + assert!(matches!(&tracked[0].get_status(), TaskStatus::Creating)); // Should only consider tasks complete once cannot register more Futures let reclaimed: Vec<_> = registry.reclaim().collect(); @@ -620,12 +620,12 @@ mod tests { assert_eq!(reclaimed.len(), 1); } - fn sorted(mut input: Vec>) -> Vec> { + fn sorted(mut input: Vec>) -> Vec> { input.sort_unstable_by_key(|x| *x.metadata()); input } - fn get_metadata(input: &[Tracker]) -> Vec { + fn get_metadata(input: &[TaskTracker]) -> Vec { let mut ret: Vec<_> = input.iter().map(|x| *x.metadata()).collect(); ret.sort_unstable(); ret diff --git a/tracker/src/task/future.rs b/tracker/src/task/future.rs index 1a28734bf7..68de0e8570 100644 --- a/tracker/src/task/future.rs +++ b/tracker/src/task/future.rs @@ -6,13 +6,13 @@ use std::time::Instant; use futures::{future::BoxFuture, prelude::*}; use pin_project::{pin_project, pinned_drop}; -use super::{TrackerRegistration, TrackerState}; +use super::{TaskRegistration, TrackerState}; use std::sync::Arc; /// An extension trait that provides `self.track(registration)` allowing /// associating this future with a `TrackerRegistration` pub trait TrackedFutureExt: Future { - fn track(self, registration: TrackerRegistration) -> TrackedFuture + fn track(self, registration: TaskRegistration) -> TrackedFuture where Self: Sized, { diff --git a/tracker/src/task/history.rs b/tracker/src/task/history.rs index 643647e578..9577b289fb 100644 --- a/tracker/src/task/history.rs +++ b/tracker/src/task/history.rs @@ -1,31 +1,31 @@ -use super::{Tracker, TrackerId, TrackerRegistration, TrackerRegistry}; +use super::{TaskId, TaskRegistration, TaskRegistry, TaskTracker}; use hashbrown::hash_map::Entry; use hashbrown::HashMap; use observability_deps::tracing::info; use std::hash::Hash; -/// A wrapper around a TrackerRegistry that automatically retains a history +/// A wrapper around a TaskRegistry that automatically retains a history #[derive(Debug)] -pub struct TrackerRegistryWithHistory { - registry: TrackerRegistry, - history: SizeLimitedHashMap>, +pub struct TaskRegistryWithHistory { + registry: TaskRegistry, + history: SizeLimitedHashMap>, } -impl TrackerRegistryWithHistory { +impl TaskRegistryWithHistory { pub fn new(capacity: usize) -> Self { Self { history: SizeLimitedHashMap::new(capacity), - registry: TrackerRegistry::new(), + registry: TaskRegistry::new(), } } /// Register a new tracker in the registry - pub fn register(&mut self, metadata: T) -> (Tracker, TrackerRegistration) { + pub fn register(&mut self, metadata: T) -> (TaskTracker, TaskRegistration) { self.registry.register(metadata) } /// Get the tracker associated with a given id - pub fn get(&self, id: TrackerId) -> Option> { + pub fn get(&self, id: TaskId) -> Option> { match self.history.get(&id) { Some(x) => Some(x.clone()), None => self.registry.get(id), @@ -37,14 +37,14 @@ impl TrackerRegistryWithHistory { } /// Returns a list of trackers, including those that are no longer running - pub fn tracked(&self) -> Vec> { + pub fn tracked(&self) -> Vec> { let mut tracked = self.registry.tracked(); tracked.extend(self.history.values().cloned()); tracked } /// Returns a list of active trackers - pub fn running(&self) -> Vec> { + pub fn running(&self) -> Vec> { self.registry.running() } @@ -165,14 +165,14 @@ mod tests { } #[test] - fn test_tracker_archive() { - let compare = |expected_ids: &[TrackerId], archive: &TrackerRegistryWithHistory| { + fn test_registry_archive() { + let compare = |expected_ids: &[TaskId], archive: &TaskRegistryWithHistory| { let mut collected: Vec<_> = archive.history.values().map(|x| x.id()).collect(); collected.sort(); assert_eq!(&collected, expected_ids); }; - let mut archive = TrackerRegistryWithHistory::new(4); + let mut archive = TaskRegistryWithHistory::new(4); for i in 0..=3 { archive.register(i); @@ -180,25 +180,16 @@ mod tests { archive.reclaim(); - compare( - &[TrackerId(0), TrackerId(1), TrackerId(2), TrackerId(3)], - &archive, - ); + compare(&[TaskId(0), TaskId(1), TaskId(2), TaskId(3)], &archive); for i in 4..=7 { archive.register(i); } - compare( - &[TrackerId(0), TrackerId(1), TrackerId(2), TrackerId(3)], - &archive, - ); + compare(&[TaskId(0), TaskId(1), TaskId(2), TaskId(3)], &archive); archive.reclaim(); - compare( - &[TrackerId(4), TrackerId(5), TrackerId(6), TrackerId(7)], - &archive, - ); + compare(&[TaskId(4), TaskId(5), TaskId(6), TaskId(7)], &archive); } } diff --git a/tracker/src/task/registry.rs b/tracker/src/task/registry.rs index fd5c4a4130..c616dd4ecc 100644 --- a/tracker/src/task/registry.rs +++ b/tracker/src/task/registry.rs @@ -1,15 +1,15 @@ -use super::{Tracker, TrackerRegistration}; +use super::{TaskRegistration, TaskTracker}; use hashbrown::HashMap; use observability_deps::tracing::debug; use std::str::FromStr; use std::sync::Arc; -/// Every future registered with a `TrackerRegistry` is assigned a unique -/// `TrackerId` +/// Every future registered with a `TaskRegistry` is assigned a unique +/// `TaskId` #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct TrackerId(pub(super) usize); +pub struct TaskId(pub(super) usize); -impl FromStr for TrackerId { +impl FromStr for TaskId { type Err = std::num::ParseIntError; fn from_str(s: &str) -> Result { @@ -17,7 +17,7 @@ impl FromStr for TrackerId { } } -impl ToString for TrackerId { +impl ToString for TaskId { fn to_string(&self) -> String { self.0.to_string() } @@ -25,8 +25,8 @@ impl ToString for TrackerId { /// Internal data stored by TrackerRegistry #[derive(Debug)] -struct TrackerSlot { - tracker: Tracker, +struct TaskSlot { + tracker: TaskTracker, watch: tokio::sync::watch::Sender, } @@ -35,42 +35,42 @@ struct TrackerSlot { /// /// Additionally can trigger graceful cancellation of registered futures #[derive(Debug)] -pub struct TrackerRegistry { +pub struct TaskRegistry { next_id: usize, - trackers: HashMap>, + tasks: HashMap>, } -impl Default for TrackerRegistry { +impl Default for TaskRegistry { fn default() -> Self { Self { next_id: 0, - trackers: Default::default(), + tasks: Default::default(), } } } -impl TrackerRegistry { +impl TaskRegistry { pub fn new() -> Self { Default::default() } /// Register a new tracker in the registry - pub fn register(&mut self, metadata: T) -> (Tracker, TrackerRegistration) { - let id = TrackerId(self.next_id); + pub fn register(&mut self, metadata: T) -> (TaskTracker, TaskRegistration) { + let id = TaskId(self.next_id); self.next_id += 1; let (sender, receiver) = tokio::sync::watch::channel(false); - let registration = TrackerRegistration::new(receiver); + let registration = TaskRegistration::new(receiver); - let tracker = Tracker { + let tracker = TaskTracker { id, metadata: Arc::new(metadata), state: Arc::clone(®istration.state), }; - self.trackers.insert( + self.tasks.insert( id, - TrackerSlot { + TaskSlot { tracker: tracker.clone(), watch: sender, }, @@ -81,8 +81,8 @@ impl TrackerRegistry { /// Removes completed tasks from the registry and returns an iterator of /// those removed - pub fn reclaim(&mut self) -> impl Iterator> + '_ { - self.trackers + pub fn reclaim(&mut self) -> impl Iterator> + '_ { + self.tasks .drain_filter(|_, v| v.tracker.is_complete()) .map(|(_, v)| { if let Err(error) = v.watch.send(true) { @@ -93,26 +93,23 @@ impl TrackerRegistry { }) } - pub fn get(&self, id: TrackerId) -> Option> { - self.trackers.get(&id).map(|x| x.tracker.clone()) + pub fn get(&self, id: TaskId) -> Option> { + self.tasks.get(&id).map(|x| x.tracker.clone()) } /// Returns the number of tracked tasks pub fn tracked_len(&self) -> usize { - self.trackers.len() + self.tasks.len() } /// Returns a list of trackers, including those that are no longer running - pub fn tracked(&self) -> Vec> { - self.trackers - .iter() - .map(|(_, v)| v.tracker.clone()) - .collect() + pub fn tracked(&self) -> Vec> { + self.tasks.iter().map(|(_, v)| v.tracker.clone()).collect() } /// Returns a list of active trackers - pub fn running(&self) -> Vec> { - self.trackers + pub fn running(&self) -> Vec> { + self.tasks .iter() .filter_map(|(_, v)| { if !v.tracker.is_complete() {