Merge branch 'main' into cn/no-more-test-results

pull/24376/head
kodiakhq[bot] 2021-04-07 16:45:32 +00:00 committed by GitHub
commit 319e66706e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 358 additions and 159 deletions

2
Cargo.lock generated
View File

@ -1877,6 +1877,7 @@ dependencies = [
"string-interner",
"test_helpers",
"tokio",
"tracker",
]
[[package]]
@ -2321,6 +2322,7 @@ dependencies = [
"object_store",
"parking_lot",
"snafu",
"tracker",
]
[[package]]

View File

@ -21,12 +21,13 @@ data_types = { path = "../data_types" }
# version of the flatbuffers crate
flatbuffers = "0.8"
generated_types = { path = "../generated_types" }
internal_types = { path = "../internal_types" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
internal_types = { path = "../internal_types" }
observability_deps = { path = "../observability_deps" }
snafu = "0.6.2"
string-interner = "0.12.2"
tokio = { version = "1.0", features = ["macros"] }
observability_deps = { path = "../observability_deps" }
tracker = { path = "../tracker" }
[dev-dependencies] # In alphabetical order
test_helpers = { path = "../test_helpers" }

View File

@ -15,6 +15,7 @@ use crate::{
table::Table,
};
use snafu::{OptionExt, ResultExt, Snafu};
use tracker::{MemRegistry, MemTracker};
#[derive(Debug, Snafu)]
pub enum Error {
@ -105,7 +106,7 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct Chunk {
/// The id for this chunk
pub id: u32,
@ -118,15 +119,36 @@ pub struct Chunk {
/// map of the dictionary ID for the table name to the table
pub tables: HashMap<u32, Table>,
/// keep track of memory used by chunk
tracker: MemTracker,
}
impl Clone for Chunk {
fn clone(&self) -> Self {
// TODO: The performance of this is not great - (#635)
let mut ret = Self {
id: self.id,
dictionary: self.dictionary.clone(),
tables: self.tables.clone(),
tracker: self.tracker.clone_empty(),
};
ret.tracker.set_bytes(ret.size());
ret
}
}
impl Chunk {
pub fn new(id: u32) -> Self {
Self {
pub fn new(id: u32, memory_registry: &MemRegistry) -> Self {
let mut chunk = Self {
id,
dictionary: Dictionary::new(),
tables: HashMap::new(),
}
tracker: memory_registry.register(),
};
chunk.tracker.set_bytes(chunk.size());
chunk
}
pub fn write_entry(&mut self, entry: &wb::WriteBufferEntry<'_>) -> Result<()> {
@ -136,6 +158,8 @@ impl Chunk {
}
}
self.tracker.set_bytes(self.size());
Ok(())
}

View File

@ -596,10 +596,12 @@ mod tests {
use internal_types::data::split_lines_into_write_entry_partitions;
use super::*;
use tracker::MemRegistry;
#[test]
fn test_has_columns() {
let mut chunk = Chunk::new(42);
let registry = Arc::new(MemRegistry::new());
let mut chunk = Chunk::new(42, registry.as_ref());
let dictionary = &mut chunk.dictionary;
let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
@ -641,7 +643,8 @@ mod tests {
#[test]
fn table_size() {
let mut chunk = Chunk::new(42);
let registry = Arc::new(MemRegistry::new());
let mut chunk = Chunk::new(42, registry.as_ref());
let dictionary = &mut chunk.dictionary;
let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
@ -664,7 +667,8 @@ mod tests {
#[test]
fn test_matches_table_name_predicate() {
let mut chunk = Chunk::new(42);
let registry = Arc::new(MemRegistry::new());
let mut chunk = Chunk::new(42, registry.as_ref());
let dictionary = &mut chunk.dictionary;
let mut table = Table::new(dictionary.lookup_value_or_insert("h2o"));
@ -694,7 +698,8 @@ mod tests {
#[test]
fn test_matches_column_name_predicate() {
let mut chunk = Chunk::new(42);
let registry = Arc::new(MemRegistry::new());
let mut chunk = Chunk::new(42, registry.as_ref());
let dictionary = &mut chunk.dictionary;
let mut table = Table::new(dictionary.lookup_value_or_insert("h2o"));
@ -740,7 +745,8 @@ mod tests {
#[test]
fn test_to_arrow_schema_all() {
let mut chunk = Chunk::new(42);
let registry = Arc::new(MemRegistry::new());
let mut chunk = Chunk::new(42, registry.as_ref());
let dictionary = &mut chunk.dictionary;
let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
@ -773,7 +779,8 @@ mod tests {
#[test]
fn test_to_arrow_schema_subset() {
let mut chunk = Chunk::new(42);
let registry = Arc::new(MemRegistry::new());
let mut chunk = Chunk::new(42, registry.as_ref());
let dictionary = &mut chunk.dictionary;
let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));

View File

@ -12,3 +12,4 @@ futures = "0.3.7"
object_store = {path = "../object_store"}
parking_lot = "0.11.1"
snafu = "0.6"
tracker = { path = "../tracker" }

View File

@ -3,8 +3,9 @@ use std::collections::BTreeSet;
use crate::table::Table;
use data_types::partition_metadata::TableSummary;
use object_store::path::Path;
use tracker::{MemRegistry, MemTracker};
#[derive(Default, Debug, Clone)]
#[derive(Debug)]
pub struct Chunk {
/// Partition this chunk belongs to
pub partition_key: String,
@ -14,16 +15,21 @@ pub struct Chunk {
/// Tables of this chunk
pub tables: Vec<Table>,
/// Track memory used by this chunk
memory_tracker: MemTracker,
}
impl Chunk {
pub fn new(part_key: String, chunk_id: u32) -> Self {
Self {
pub fn new(part_key: String, chunk_id: u32, memory_registry: &MemRegistry) -> Self {
let mut chunk = Self {
partition_key: part_key,
id: chunk_id,
tables: vec![],
tables: Default::default(),
memory_tracker: memory_registry.register(),
};
Self::default()
chunk.memory_tracker.set_bytes(chunk.size());
chunk
}
pub fn add_table(&mut self, table_summary: TableSummary, file_location: Path) {

View File

@ -23,7 +23,7 @@ use data_types::wal::{SegmentPersistence, SegmentSummary, WriterSummary};
use observability_deps::tracing::{error, info, warn};
use parking_lot::Mutex;
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use tracker::task::{TrackedFutureExt, TrackerRegistration};
use tracker::{TaskRegistration, TrackedFutureExt};
#[derive(Debug, Snafu)]
pub enum Error {
@ -378,7 +378,7 @@ impl Segment {
/// the given object store location.
pub fn persist_bytes_in_background(
&self,
tracker: TrackerRegistration,
tracker: TaskRegistration,
writer_id: u32,
db_name: &DatabaseName<'_>,
store: Arc<ObjectStore>,

View File

@ -27,7 +27,7 @@ use object_store::{memory::InMemory, ObjectStore};
use parquet_file::{chunk::Chunk, storage::Storage};
use query::{Database, DEFAULT_SCHEMA};
use read_buffer::Database as ReadBufferDb;
use tracker::task::{TrackedFutureExt, Tracker};
use tracker::{MemRegistry, TaskTracker, TrackedFutureExt};
use super::{buffer::Buffer, JobRegistry};
use data_types::job::Job;
@ -220,14 +220,29 @@ pub struct Db {
/// A handle to the global jobs registry for long running tasks
jobs: Arc<JobRegistry>,
/// Memory registries used for tracking memory usage by this Db
memory_registries: MemoryRegistries,
/// The system schema provider
system_tables: Arc<SystemSchemaProvider>,
/// Used to allocated sequence numbers for writes
sequence: AtomicU64,
/// Number of iterations of the worker loop for this Db
worker_iterations: AtomicUsize,
}
#[derive(Debug, Default)]
struct MemoryRegistries {
mutable_buffer: Arc<MemRegistry>,
// TODO: Wire into read buffer
read_buffer: Arc<MemRegistry>,
parquet: Arc<MemRegistry>,
}
impl Db {
pub fn new(
rules: DatabaseRules,
@ -247,6 +262,7 @@ impl Db {
wal_buffer,
jobs,
system_tables,
memory_registries: Default::default(),
sequence: AtomicU64::new(STARTING_SEQUENCE),
worker_iterations: AtomicUsize::new(0),
}
@ -271,7 +287,7 @@ impl Db {
.context(RollingOverPartition { partition_key })?;
// make a new chunk to track the newly created chunk in this partition
partition.create_open_chunk();
partition.create_open_chunk(self.memory_registries.mutable_buffer.as_ref());
return Ok(DBChunk::snapshot(&chunk));
}
@ -466,7 +482,11 @@ impl Db {
let table_stats = read_buffer.table_summaries(partition_key, &[chunk_id]);
// Create a parquet chunk for this chunk
let mut parquet_chunk = Chunk::new(partition_key.to_string(), chunk_id);
let mut parquet_chunk = Chunk::new(
partition_key.to_string(),
chunk_id,
self.memory_registries.parquet.as_ref(),
);
// Create a storage to save data of this chunk
// Todo: this must be gotten from server or somewhere
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
@ -533,7 +553,7 @@ impl Db {
self: &Arc<Self>,
partition_key: String,
chunk_id: u32,
) -> Tracker<Job> {
) -> TaskTracker<Job> {
let name = self.rules.read().name.clone();
let (tracker, registration) = self.jobs.register(Job::CloseChunk {
db_name: name.to_string(),
@ -664,9 +684,9 @@ impl Database for Db {
let mut partition = partition.write();
partition.update_last_write_at();
let chunk = partition
.open_chunk()
.unwrap_or_else(|| partition.create_open_chunk());
let chunk = partition.open_chunk().unwrap_or_else(|| {
partition.create_open_chunk(self.memory_registries.mutable_buffer.as_ref())
});
let mut chunk = chunk.write();
chunk.record_write();
@ -1104,6 +1124,15 @@ mod tests {
107,
)];
let size: usize = db
.chunk_summaries()
.unwrap()
.into_iter()
.map(|x| x.estimated_bytes)
.sum();
assert_eq!(db.memory_registries.mutable_buffer.bytes(), size);
assert_eq!(
expected, chunk_summaries,
"expected:\n{:#?}\n\nactual:{:#?}\n\n",
@ -1226,6 +1255,10 @@ mod tests {
),
];
assert_eq!(db.memory_registries.mutable_buffer.bytes(), 101 + 133 + 135);
// TODO: Instrument read buffer
//assert_eq!(db.memory_registries.read_buffer.bytes(), 1269);
assert_eq!(
expected, chunk_summaries,
"expected:\n{:#?}\n\nactual:{:#?}\n\n",

View File

@ -231,6 +231,7 @@ impl SchemaProvider for Catalog {
#[cfg(test)]
mod tests {
use super::*;
use tracker::MemRegistry;
#[test]
fn partition_get() {
@ -269,12 +270,13 @@ mod tests {
#[test]
fn chunk_create() {
let registry = MemRegistry::new();
let catalog = Catalog::new();
let p1 = catalog.get_or_create_partition("p1");
let mut p1 = p1.write();
p1.create_open_chunk();
p1.create_open_chunk();
p1.create_open_chunk(&registry);
p1.create_open_chunk(&registry);
let c1_0 = p1.chunk(0).unwrap();
assert_eq!(c1_0.read().key(), "p1");
@ -290,20 +292,21 @@ mod tests {
#[test]
fn chunk_list() {
let registry = MemRegistry::new();
let catalog = Catalog::new();
let p1 = catalog.get_or_create_partition("p1");
{
let mut p1 = p1.write();
p1.create_open_chunk();
p1.create_open_chunk();
p1.create_open_chunk(&registry);
p1.create_open_chunk(&registry);
}
let p2 = catalog.get_or_create_partition("p2");
{
let mut p2 = p2.write();
p2.create_open_chunk();
p2.create_open_chunk(&registry);
}
assert_eq!(
@ -355,19 +358,20 @@ mod tests {
#[test]
fn chunk_drop() {
let registry = MemRegistry::new();
let catalog = Catalog::new();
let p1 = catalog.get_or_create_partition("p1");
{
let mut p1 = p1.write();
p1.create_open_chunk();
p1.create_open_chunk();
p1.create_open_chunk(&registry);
p1.create_open_chunk(&registry);
}
let p2 = catalog.get_or_create_partition("p2");
{
let mut p2 = p2.write();
p2.create_open_chunk();
p2.create_open_chunk(&registry);
}
assert_eq!(chunk_strings(&catalog).len(), 3);
@ -399,14 +403,15 @@ mod tests {
#[test]
fn chunk_recreate_dropped() {
let registry = MemRegistry::new();
let catalog = Catalog::new();
let p1 = catalog.get_or_create_partition("p1");
{
let mut p1 = p1.write();
p1.create_open_chunk();
p1.create_open_chunk();
p1.create_open_chunk(&registry);
p1.create_open_chunk(&registry);
}
assert_eq!(chunk_strings(&catalog).len(), 2);
@ -419,7 +424,7 @@ mod tests {
// should be ok to recreate (thought maybe not a great idea)
{
let mut p1 = p1.write();
p1.create_open_chunk();
p1.create_open_chunk(&registry);
}
assert_eq!(chunk_strings(&catalog).len(), 2);
}

View File

@ -11,6 +11,7 @@ use read_buffer::Database as ReadBufferDb;
use crate::db::DBChunk;
use super::{InternalChunkState, Result};
use tracker::MemRegistry;
/// The state a Chunk is in and what its underlying backing storage is
#[derive(Debug)]
@ -109,8 +110,12 @@ impl Chunk {
}
/// Creates a new open chunk
pub(crate) fn new_open(partition_key: impl Into<String>, id: u32) -> Self {
let state = ChunkState::Open(mutable_buffer::chunk::Chunk::new(id));
pub(crate) fn new_open(
partition_key: impl Into<String>,
id: u32,
memory_registry: &MemRegistry,
) -> Self {
let state = ChunkState::Open(mutable_buffer::chunk::Chunk::new(id, memory_registry));
Self::new(partition_key, id, state)
}

View File

@ -11,6 +11,7 @@ use data_types::chunk::ChunkSummary;
use data_types::partition_metadata::PartitionSummary;
use parking_lot::RwLock;
use snafu::OptionExt;
use tracker::MemRegistry;
/// IOx Catalog Partition
///
@ -76,11 +77,15 @@ impl Partition {
}
/// Create a new Chunk in the open state
pub fn create_open_chunk(&mut self) -> Arc<RwLock<Chunk>> {
pub fn create_open_chunk(&mut self, memory_registry: &MemRegistry) -> Arc<RwLock<Chunk>> {
let chunk_id = self.next_chunk_id;
self.next_chunk_id += 1;
let chunk = Arc::new(RwLock::new(Chunk::new_open(&self.key, chunk_id)));
let chunk = Arc::new(RwLock::new(Chunk::new_open(
&self.key,
chunk_id,
memory_registry,
)));
if self.chunks.insert(chunk_id, Arc::clone(&chunk)).is_some() {
// A fundamental invariant has been violated - abort

View File

@ -7,7 +7,7 @@ use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use data_types::{database_rules::LifecycleRules, error::ErrorLogger, job::Job};
use tracker::task::Tracker;
use tracker::TaskTracker;
use super::{
catalog::chunk::{Chunk, ChunkState},
@ -19,7 +19,7 @@ use data_types::database_rules::SortOrder;
pub struct LifecycleManager {
db: Arc<Db>,
db_name: String,
move_task: Option<Tracker<Job>>,
move_task: Option<TaskTracker<Job>>,
}
impl LifecycleManager {
@ -232,6 +232,7 @@ fn can_move(rules: &LifecycleRules, chunk: &Chunk, now: DateTime<Utc>) -> bool {
mod tests {
use super::*;
use std::num::{NonZeroU32, NonZeroUsize};
use tracker::MemRegistry;
fn from_secs(secs: i64) -> DateTime<Utc> {
DateTime::from_utc(chrono::NaiveDateTime::from_timestamp(secs, 0), Utc)
@ -242,7 +243,7 @@ mod tests {
time_of_first_write: Option<i64>,
time_of_last_write: Option<i64>,
) -> Chunk {
let mut chunk = Chunk::new_open("", id);
let mut chunk = Chunk::new_open("", id, &MemRegistry::new());
chunk.set_timestamps(
time_of_first_write.map(from_secs),
time_of_last_write.map(from_secs),

View File

@ -88,9 +88,7 @@ use internal_types::{
};
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
use query::{exec::Executor, Database, DatabaseStore};
use tracker::task::{
TrackedFutureExt, Tracker, TrackerId, TrackerRegistration, TrackerRegistryWithHistory,
};
use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt};
use futures::{pin_mut, FutureExt};
@ -158,13 +156,13 @@ const JOB_HISTORY_SIZE: usize = 1000;
/// The global job registry
#[derive(Debug)]
pub struct JobRegistry {
inner: Mutex<TrackerRegistryWithHistory<Job>>,
inner: Mutex<TaskRegistryWithHistory<Job>>,
}
impl Default for JobRegistry {
fn default() -> Self {
Self {
inner: Mutex::new(TrackerRegistryWithHistory::new(JOB_HISTORY_SIZE)),
inner: Mutex::new(TaskRegistryWithHistory::new(JOB_HISTORY_SIZE)),
}
}
}
@ -174,7 +172,7 @@ impl JobRegistry {
Default::default()
}
pub fn register(&self, job: Job) -> (Tracker<Job>, TrackerRegistration) {
pub fn register(&self, job: Job) -> (TaskTracker<Job>, TaskRegistration) {
self.inner.lock().register(job)
}
}
@ -413,7 +411,7 @@ impl<M: ConnectionManager> Server<M> {
self.config.delete_remote(id)
}
pub fn spawn_dummy_job(&self, nanos: Vec<u64>) -> Tracker<Job> {
pub fn spawn_dummy_job(&self, nanos: Vec<u64>) -> TaskTracker<Job> {
let (tracker, registration) = self.jobs.register(Job::Dummy {
nanos: nanos.clone(),
});
@ -435,7 +433,7 @@ impl<M: ConnectionManager> Server<M> {
db_name: DatabaseName<'_>,
partition_key: impl Into<String>,
chunk_id: u32,
) -> Result<Tracker<Job>> {
) -> Result<TaskTracker<Job>> {
let db_name = db_name.to_string();
let name = DatabaseName::new(&db_name).context(InvalidDatabaseName)?;
@ -450,12 +448,12 @@ impl<M: ConnectionManager> Server<M> {
}
/// Returns a list of all jobs tracked by this server
pub fn tracked_jobs(&self) -> Vec<Tracker<Job>> {
pub fn tracked_jobs(&self) -> Vec<TaskTracker<Job>> {
self.jobs.inner.lock().tracked()
}
/// Returns a specific job tracked by this server
pub fn get_job(&self, id: TrackerId) -> Option<Tracker<Job>> {
pub fn get_job(&self, id: TaskId) -> Option<TaskTracker<Job>> {
self.jobs.inner.lock().get(id)
}

View File

@ -281,6 +281,7 @@ mod tests {
use mutable_buffer::chunk::Chunk as ChunkWB;
use object_store::memory::InMemory;
use query::{test::TestLPWriter, Database};
use tracker::MemRegistry;
#[tokio::test]
async fn snapshot() {
@ -350,9 +351,10 @@ mem,host=A,region=west used=45 1
},
];
let registry = MemRegistry::new();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let chunk = Arc::new(DBChunk::MutableBuffer {
chunk: Arc::new(ChunkWB::new(11)),
chunk: Arc::new(ChunkWB::new(11, &registry)),
partition_key: Arc::new("key".to_string()),
open: false,
});

View File

@ -196,7 +196,7 @@ pub async fn command(url: String, config: Config) -> Result<()> {
Command::List(_) => {
let mut client = management::Client::new(connection);
let databases = client.list_databases().await?;
println!("{}", databases.join(", "))
println!("{}", databases.join("\n"))
}
Command::Get(get) => {
let mut client = management::Client::new(connection);

View File

@ -18,7 +18,7 @@ use generated_types::{
influxdata::iox::management::v1 as management,
protobuf_type_url,
};
use tracker::task::{Tracker, TrackerId, TrackerStatus};
use tracker::{TaskId, TaskStatus, TaskTracker};
use server::{ConnectionManager, Server};
use std::convert::TryInto;
@ -28,13 +28,13 @@ struct OperationsService<M: ConnectionManager> {
server: Arc<Server<M>>,
}
pub fn encode_tracker(tracker: Tracker<Job>) -> Result<Operation, tonic::Status> {
pub fn encode_tracker(tracker: TaskTracker<Job>) -> Result<Operation, tonic::Status> {
let id = tracker.id();
let is_cancelled = tracker.is_cancelled();
let status = tracker.get_status();
let (operation_metadata, is_complete) = match status {
TrackerStatus::Creating => {
TaskStatus::Creating => {
let metadata = management::OperationMetadata {
job: Some(tracker.metadata().clone().into()),
..Default::default()
@ -42,7 +42,7 @@ pub fn encode_tracker(tracker: Tracker<Job>) -> Result<Operation, tonic::Status>
(metadata, false)
}
TrackerStatus::Running {
TaskStatus::Running {
total_count,
pending_count,
cpu_nanos,
@ -57,7 +57,7 @@ pub fn encode_tracker(tracker: Tracker<Job>) -> Result<Operation, tonic::Status>
(metadata, false)
}
TrackerStatus::Complete {
TaskStatus::Complete {
total_count,
cpu_nanos,
wall_nanos,
@ -108,11 +108,11 @@ pub fn encode_tracker(tracker: Tracker<Job>) -> Result<Operation, tonic::Status>
})
}
fn get_tracker<M>(server: &Server<M>, tracker: String) -> Result<Tracker<Job>, tonic::Status>
fn get_tracker<M>(server: &Server<M>, tracker: String) -> Result<TaskTracker<Job>, tonic::Status>
where
M: ConnectionManager,
{
let tracker_id = tracker.parse::<TrackerId>().map_err(|e| FieldViolation {
let tracker_id = tracker.parse::<TaskId>().map_err(|e| FieldViolation {
field: "name".to_string(),
description: e.to_string(),
})?;

View File

@ -6,4 +6,8 @@
clippy::clone_on_ref_ptr
)]
pub mod task;
mod mem;
mod task;
pub use mem::*;
pub use task::*;

117
tracker/src/mem.rs Normal file
View File

@ -0,0 +1,117 @@
//! This module contains a basic memory tracking system
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
/// A simple memory registry that tracks the total memory consumption
/// across a set of MemTrackers
#[derive(Debug, Default)]
pub struct MemRegistry {
inner: Arc<MemTrackerShared>,
}
#[derive(Debug, Default)]
struct MemTrackerShared {
/// The total bytes across all registered trackers
bytes: AtomicUsize,
}
impl MemRegistry {
pub fn new() -> Self {
Self::default()
}
pub fn bytes(&self) -> usize {
self.inner.bytes.load(Ordering::Relaxed)
}
pub fn register(&self) -> MemTracker {
MemTracker {
shared: Arc::clone(&self.inner),
bytes: 0,
}
}
}
/// A MemTracker is created with a reference to a MemRegistry
/// The memory "allocation" associated with a specific MemTracker
/// can be increased or decreased and this will update the totals
/// on the MemRegistry
///
/// On Drop the "allocated" bytes associated with the MemTracker
/// will be decremented from the MemRegistry's total
///
/// Note: this purposefully does not implement Clone as the semantics
/// of such a construct are unclear
///
/// Note: this purposefully does not implement Default to avoid
/// accidentally creating untracked objects
#[derive(Debug)]
pub struct MemTracker {
shared: Arc<MemTrackerShared>,
bytes: usize,
}
impl MemTracker {
/// Creates a new empty tracker registered to
/// the same registry as this tracker
pub fn clone_empty(&self) -> Self {
Self {
shared: Arc::clone(&self.shared),
bytes: 0,
}
}
/// Set the number of bytes associated with this tracked instance
pub fn set_bytes(&mut self, new: usize) {
if new > self.bytes {
self.shared
.bytes
.fetch_add(new - self.bytes, Ordering::Relaxed);
} else {
self.shared
.bytes
.fetch_sub(self.bytes - new, Ordering::Relaxed);
}
self.bytes = new;
}
}
impl Drop for MemTracker {
fn drop(&mut self) {
self.shared.bytes.fetch_sub(self.bytes, Ordering::Relaxed);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_tracker() {
let registry = MemRegistry::new();
let mut t1 = registry.register();
let mut t2 = registry.register();
t1.set_bytes(200);
assert_eq!(registry.bytes(), 200);
t1.set_bytes(100);
assert_eq!(registry.bytes(), 100);
t2.set_bytes(300);
assert_eq!(registry.bytes(), 400);
t2.set_bytes(400);
assert_eq!(registry.bytes(), 500);
std::mem::drop(t2);
assert_eq!(registry.bytes(), 100);
std::mem::drop(t1);
assert_eq!(registry.bytes(), 0);
}
}

View File

@ -88,8 +88,8 @@ use observability_deps::tracing::warn;
use tokio_util::sync::CancellationToken;
pub use future::{TrackedFuture, TrackedFutureExt};
pub use history::TrackerRegistryWithHistory;
pub use registry::{TrackerId, TrackerRegistry};
pub use history::TaskRegistryWithHistory;
pub use registry::{TaskId, TaskRegistry};
mod future;
mod history;
@ -110,9 +110,9 @@ struct TrackerState {
watch: tokio::sync::watch::Receiver<bool>,
}
/// The status of the tracker
/// The status of the tracked task
#[derive(Debug, Clone)]
pub enum TrackerStatus {
pub enum TaskStatus {
/// More futures can be registered
Creating,
@ -147,13 +147,13 @@ pub enum TrackerStatus {
/// A Tracker can be used to monitor/cancel/wait for a set of associated futures
#[derive(Debug)]
pub struct Tracker<T> {
id: TrackerId,
pub struct TaskTracker<T> {
id: TaskId,
state: Arc<TrackerState>,
metadata: Arc<T>,
}
impl<T> Clone for Tracker<T> {
impl<T> Clone for TaskTracker<T> {
fn clone(&self) -> Self {
Self {
id: self.id,
@ -163,9 +163,9 @@ impl<T> Clone for Tracker<T> {
}
}
impl<T> Tracker<T> {
impl<T> TaskTracker<T> {
/// Returns the ID of the Tracker - these are unique per TrackerRegistry
pub fn id(&self) -> TrackerId {
pub fn id(&self) -> TaskId {
self.id
}
@ -187,11 +187,11 @@ impl<T> Tracker<T> {
/// Returns true if all futures associated with this tracker have
/// been dropped and no more can be created
pub fn is_complete(&self) -> bool {
matches!(self.get_status(), TrackerStatus::Complete{..})
matches!(self.get_status(), TaskStatus::Complete{..})
}
/// Gets the status of the tracker
pub fn get_status(&self) -> TrackerStatus {
pub fn get_status(&self) -> TaskStatus {
// The atomic decrement in TrackerRegistration::drop has release semantics
// acquire here ensures that if a thread observes the tracker to have
// no pending_registrations it cannot subsequently observe pending_futures
@ -205,13 +205,13 @@ impl<T> Tracker<T> {
let pending_futures = self.state.pending_futures.load(Ordering::Acquire);
match (pending_registrations == 0, pending_futures == 0) {
(false, _) => TrackerStatus::Creating,
(true, false) => TrackerStatus::Running {
(false, _) => TaskStatus::Creating,
(true, false) => TaskStatus::Running {
total_count: self.state.created_futures.load(Ordering::Relaxed),
pending_count: self.state.pending_futures.load(Ordering::Relaxed),
cpu_nanos: self.state.cpu_nanos.load(Ordering::Relaxed),
},
(true, true) => TrackerStatus::Complete {
(true, true) => TaskStatus::Complete {
total_count: self.state.created_futures.load(Ordering::Relaxed),
cpu_nanos: self.state.cpu_nanos.load(Ordering::Relaxed),
wall_nanos: self.state.wall_nanos.load(Ordering::Relaxed),
@ -248,11 +248,11 @@ impl<T> Tracker<T> {
/// TrackedFutures are registered with a Tracker that has already signalled
/// completion
#[derive(Debug)]
pub struct TrackerRegistration {
pub struct TaskRegistration {
state: Arc<TrackerState>,
}
impl Clone for TrackerRegistration {
impl Clone for TaskRegistration {
fn clone(&self) -> Self {
self.state
.pending_registrations
@ -264,7 +264,7 @@ impl Clone for TrackerRegistration {
}
}
impl TrackerRegistration {
impl TaskRegistration {
fn new(watch: tokio::sync::watch::Receiver<bool>) -> Self {
let state = Arc::new(TrackerState {
start_instant: Instant::now(),
@ -281,7 +281,7 @@ impl TrackerRegistration {
}
}
impl Drop for TrackerRegistration {
impl Drop for TaskRegistration {
fn drop(&mut self) {
// This synchronizes with the Acquire load in Tracker::get_status
let previous = self
@ -305,7 +305,7 @@ mod tests {
#[tokio::test]
async fn test_lifecycle() {
let (sender, receive) = oneshot::channel();
let mut registry = TrackerRegistry::new();
let mut registry = TaskRegistry::new();
let (_, registration) = registry.register(());
let task = tokio::spawn(receive.track(registration));
@ -322,7 +322,7 @@ mod tests {
async fn test_interleaved() {
let (sender1, receive1) = oneshot::channel();
let (sender2, receive2) = oneshot::channel();
let mut registry = TrackerRegistry::new();
let mut registry = TaskRegistry::new();
let (_, registration1) = registry.register(1);
let (_, registration2) = registry.register(2);
@ -347,7 +347,7 @@ mod tests {
#[tokio::test]
async fn test_drop() {
let mut registry = TrackerRegistry::new();
let mut registry = TaskRegistry::new();
let (_, registration) = registry.register(());
{
@ -363,7 +363,7 @@ mod tests {
#[tokio::test]
async fn test_drop_multiple() {
let mut registry = TrackerRegistry::new();
let mut registry = TaskRegistry::new();
let (_, registration) = registry.register(());
{
@ -382,7 +382,7 @@ mod tests {
#[tokio::test]
async fn test_terminate() {
let mut registry = TrackerRegistry::new();
let mut registry = TaskRegistry::new();
let (_, registration) = registry.register(());
let task = tokio::spawn(futures::future::pending::<()>().track(registration));
@ -399,7 +399,7 @@ mod tests {
#[tokio::test]
async fn test_terminate_early() {
let mut registry = TrackerRegistry::new();
let mut registry = TaskRegistry::new();
let (tracker, registration) = registry.register(());
tracker.cancel();
@ -412,7 +412,7 @@ mod tests {
#[tokio::test]
async fn test_terminate_multiple() {
let mut registry = TrackerRegistry::new();
let mut registry = TaskRegistry::new();
let (_, registration) = registry.register(());
let task1 = tokio::spawn(futures::future::pending::<()>().track(registration.clone()));
@ -433,7 +433,7 @@ mod tests {
#[tokio::test]
async fn test_reclaim() {
let mut registry = TrackerRegistry::new();
let mut registry = TaskRegistry::new();
let (_, registration1) = registry.register(1);
let (_, registration2) = registry.register(2);
@ -489,7 +489,7 @@ mod tests {
assert!(result3.is_ok());
assert!(
matches!(tracked[0].get_status(), TrackerStatus::Running { pending_count: 1, total_count: 2, ..})
matches!(tracked[0].get_status(), TaskStatus::Running { pending_count: 1, total_count: 2, ..})
);
// Trigger termination of task5
@ -511,7 +511,7 @@ mod tests {
let result4 = task4.await.unwrap();
assert!(result4.is_err());
assert!(matches!(running[0].get_status(), TrackerStatus::Complete { total_count: 2, ..}));
assert!(matches!(running[0].get_status(), TaskStatus::Complete { total_count: 2, ..}));
let reclaimed = sorted(registry.reclaim().collect());
@ -524,7 +524,7 @@ mod tests {
// to prevent stalling the tokio executor
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_timing() {
let mut registry = TrackerRegistry::new();
let mut registry = TaskRegistry::new();
let (tracker1, registration1) = registry.register(1);
let (tracker2, registration2) = registry.register(2);
let (tracker3, registration3) = registry.register(3);
@ -564,11 +564,11 @@ mod tests {
);
};
let assert_complete = |status: TrackerStatus,
let assert_complete = |status: TaskStatus,
expected_cpu: std::time::Duration,
expected_wal: std::time::Duration| {
match status {
TrackerStatus::Complete {
TaskStatus::Complete {
cpu_nanos,
wall_nanos,
..
@ -599,7 +599,7 @@ mod tests {
#[tokio::test]
async fn test_register_race() {
let mut registry = TrackerRegistry::new();
let mut registry = TaskRegistry::new();
let (_, registration) = registry.register(());
let task1 = tokio::spawn(futures::future::ready(()).track(registration.clone()));
@ -607,7 +607,7 @@ mod tests {
let tracked = registry.tracked();
assert_eq!(tracked.len(), 1);
assert!(matches!(&tracked[0].get_status(), TrackerStatus::Creating));
assert!(matches!(&tracked[0].get_status(), TaskStatus::Creating));
// Should only consider tasks complete once cannot register more Futures
let reclaimed: Vec<_> = registry.reclaim().collect();
@ -620,12 +620,12 @@ mod tests {
assert_eq!(reclaimed.len(), 1);
}
fn sorted(mut input: Vec<Tracker<i32>>) -> Vec<Tracker<i32>> {
fn sorted(mut input: Vec<TaskTracker<i32>>) -> Vec<TaskTracker<i32>> {
input.sort_unstable_by_key(|x| *x.metadata());
input
}
fn get_metadata(input: &[Tracker<i32>]) -> Vec<i32> {
fn get_metadata(input: &[TaskTracker<i32>]) -> Vec<i32> {
let mut ret: Vec<_> = input.iter().map(|x| *x.metadata()).collect();
ret.sort_unstable();
ret

View File

@ -6,13 +6,13 @@ use std::time::Instant;
use futures::{future::BoxFuture, prelude::*};
use pin_project::{pin_project, pinned_drop};
use super::{TrackerRegistration, TrackerState};
use super::{TaskRegistration, TrackerState};
use std::sync::Arc;
/// An extension trait that provides `self.track(registration)` allowing
/// associating this future with a `TrackerRegistration`
pub trait TrackedFutureExt: Future {
fn track(self, registration: TrackerRegistration) -> TrackedFuture<Self>
fn track(self, registration: TaskRegistration) -> TrackedFuture<Self>
where
Self: Sized,
{

View File

@ -1,31 +1,31 @@
use super::{Tracker, TrackerId, TrackerRegistration, TrackerRegistry};
use super::{TaskId, TaskRegistration, TaskRegistry, TaskTracker};
use hashbrown::hash_map::Entry;
use hashbrown::HashMap;
use observability_deps::tracing::info;
use std::hash::Hash;
/// A wrapper around a TrackerRegistry that automatically retains a history
/// A wrapper around a TaskRegistry that automatically retains a history
#[derive(Debug)]
pub struct TrackerRegistryWithHistory<T> {
registry: TrackerRegistry<T>,
history: SizeLimitedHashMap<TrackerId, Tracker<T>>,
pub struct TaskRegistryWithHistory<T> {
registry: TaskRegistry<T>,
history: SizeLimitedHashMap<TaskId, TaskTracker<T>>,
}
impl<T: std::fmt::Debug> TrackerRegistryWithHistory<T> {
impl<T: std::fmt::Debug> TaskRegistryWithHistory<T> {
pub fn new(capacity: usize) -> Self {
Self {
history: SizeLimitedHashMap::new(capacity),
registry: TrackerRegistry::new(),
registry: TaskRegistry::new(),
}
}
/// Register a new tracker in the registry
pub fn register(&mut self, metadata: T) -> (Tracker<T>, TrackerRegistration) {
pub fn register(&mut self, metadata: T) -> (TaskTracker<T>, TaskRegistration) {
self.registry.register(metadata)
}
/// Get the tracker associated with a given id
pub fn get(&self, id: TrackerId) -> Option<Tracker<T>> {
pub fn get(&self, id: TaskId) -> Option<TaskTracker<T>> {
match self.history.get(&id) {
Some(x) => Some(x.clone()),
None => self.registry.get(id),
@ -37,14 +37,14 @@ impl<T: std::fmt::Debug> TrackerRegistryWithHistory<T> {
}
/// Returns a list of trackers, including those that are no longer running
pub fn tracked(&self) -> Vec<Tracker<T>> {
pub fn tracked(&self) -> Vec<TaskTracker<T>> {
let mut tracked = self.registry.tracked();
tracked.extend(self.history.values().cloned());
tracked
}
/// Returns a list of active trackers
pub fn running(&self) -> Vec<Tracker<T>> {
pub fn running(&self) -> Vec<TaskTracker<T>> {
self.registry.running()
}
@ -165,14 +165,14 @@ mod tests {
}
#[test]
fn test_tracker_archive() {
let compare = |expected_ids: &[TrackerId], archive: &TrackerRegistryWithHistory<i32>| {
fn test_registry_archive() {
let compare = |expected_ids: &[TaskId], archive: &TaskRegistryWithHistory<i32>| {
let mut collected: Vec<_> = archive.history.values().map(|x| x.id()).collect();
collected.sort();
assert_eq!(&collected, expected_ids);
};
let mut archive = TrackerRegistryWithHistory::new(4);
let mut archive = TaskRegistryWithHistory::new(4);
for i in 0..=3 {
archive.register(i);
@ -180,25 +180,16 @@ mod tests {
archive.reclaim();
compare(
&[TrackerId(0), TrackerId(1), TrackerId(2), TrackerId(3)],
&archive,
);
compare(&[TaskId(0), TaskId(1), TaskId(2), TaskId(3)], &archive);
for i in 4..=7 {
archive.register(i);
}
compare(
&[TrackerId(0), TrackerId(1), TrackerId(2), TrackerId(3)],
&archive,
);
compare(&[TaskId(0), TaskId(1), TaskId(2), TaskId(3)], &archive);
archive.reclaim();
compare(
&[TrackerId(4), TrackerId(5), TrackerId(6), TrackerId(7)],
&archive,
);
compare(&[TaskId(4), TaskId(5), TaskId(6), TaskId(7)], &archive);
}
}

View File

@ -1,15 +1,15 @@
use super::{Tracker, TrackerRegistration};
use super::{TaskRegistration, TaskTracker};
use hashbrown::HashMap;
use observability_deps::tracing::debug;
use std::str::FromStr;
use std::sync::Arc;
/// Every future registered with a `TrackerRegistry` is assigned a unique
/// `TrackerId`
/// Every future registered with a `TaskRegistry` is assigned a unique
/// `TaskId`
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct TrackerId(pub(super) usize);
pub struct TaskId(pub(super) usize);
impl FromStr for TrackerId {
impl FromStr for TaskId {
type Err = std::num::ParseIntError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
@ -17,7 +17,7 @@ impl FromStr for TrackerId {
}
}
impl ToString for TrackerId {
impl ToString for TaskId {
fn to_string(&self) -> String {
self.0.to_string()
}
@ -25,8 +25,8 @@ impl ToString for TrackerId {
/// Internal data stored by TrackerRegistry
#[derive(Debug)]
struct TrackerSlot<T> {
tracker: Tracker<T>,
struct TaskSlot<T> {
tracker: TaskTracker<T>,
watch: tokio::sync::watch::Sender<bool>,
}
@ -35,42 +35,42 @@ struct TrackerSlot<T> {
///
/// Additionally can trigger graceful cancellation of registered futures
#[derive(Debug)]
pub struct TrackerRegistry<T> {
pub struct TaskRegistry<T> {
next_id: usize,
trackers: HashMap<TrackerId, TrackerSlot<T>>,
tasks: HashMap<TaskId, TaskSlot<T>>,
}
impl<T> Default for TrackerRegistry<T> {
impl<T> Default for TaskRegistry<T> {
fn default() -> Self {
Self {
next_id: 0,
trackers: Default::default(),
tasks: Default::default(),
}
}
}
impl<T> TrackerRegistry<T> {
impl<T> TaskRegistry<T> {
pub fn new() -> Self {
Default::default()
}
/// Register a new tracker in the registry
pub fn register(&mut self, metadata: T) -> (Tracker<T>, TrackerRegistration) {
let id = TrackerId(self.next_id);
pub fn register(&mut self, metadata: T) -> (TaskTracker<T>, TaskRegistration) {
let id = TaskId(self.next_id);
self.next_id += 1;
let (sender, receiver) = tokio::sync::watch::channel(false);
let registration = TrackerRegistration::new(receiver);
let registration = TaskRegistration::new(receiver);
let tracker = Tracker {
let tracker = TaskTracker {
id,
metadata: Arc::new(metadata),
state: Arc::clone(&registration.state),
};
self.trackers.insert(
self.tasks.insert(
id,
TrackerSlot {
TaskSlot {
tracker: tracker.clone(),
watch: sender,
},
@ -81,8 +81,8 @@ impl<T> TrackerRegistry<T> {
/// Removes completed tasks from the registry and returns an iterator of
/// those removed
pub fn reclaim(&mut self) -> impl Iterator<Item = Tracker<T>> + '_ {
self.trackers
pub fn reclaim(&mut self) -> impl Iterator<Item = TaskTracker<T>> + '_ {
self.tasks
.drain_filter(|_, v| v.tracker.is_complete())
.map(|(_, v)| {
if let Err(error) = v.watch.send(true) {
@ -93,26 +93,23 @@ impl<T> TrackerRegistry<T> {
})
}
pub fn get(&self, id: TrackerId) -> Option<Tracker<T>> {
self.trackers.get(&id).map(|x| x.tracker.clone())
pub fn get(&self, id: TaskId) -> Option<TaskTracker<T>> {
self.tasks.get(&id).map(|x| x.tracker.clone())
}
/// Returns the number of tracked tasks
pub fn tracked_len(&self) -> usize {
self.trackers.len()
self.tasks.len()
}
/// Returns a list of trackers, including those that are no longer running
pub fn tracked(&self) -> Vec<Tracker<T>> {
self.trackers
.iter()
.map(|(_, v)| v.tracker.clone())
.collect()
pub fn tracked(&self) -> Vec<TaskTracker<T>> {
self.tasks.iter().map(|(_, v)| v.tracker.clone()).collect()
}
/// Returns a list of active trackers
pub fn running(&self) -> Vec<Tracker<T>> {
self.trackers
pub fn running(&self) -> Vec<TaskTracker<T>> {
self.tasks
.iter()
.filter_map(|(_, v)| {
if !v.tracker.is_complete() {