refactor: LifecycleStats tracks Namespace/TableId

Changes the lifecycle handle to also track the namespace + table ID in
addition to the existing shard ID.

Adds asserts to ensure the values never vary for a given partition.
pull/24376/head
Dom Dwyer 2022-09-29 16:55:09 +02:00
parent c213feb60c
commit e84186763f
3 changed files with 274 additions and 33 deletions

View File

@ -124,6 +124,8 @@ impl TableData {
let should_pause = lifecycle_handle.log_write(
partition_data.id(),
self.shard_id,
self.namespace_id,
self.table_id,
sequence_number,
batch.size(),
batch.rows(),

View File

@ -7,7 +7,7 @@
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use data_types::{PartitionId, SequenceNumber, ShardId};
use data_types::{NamespaceId, PartitionId, SequenceNumber, ShardId, TableId};
use iox_time::{Time, TimeProvider};
use metric::{Metric, U64Counter};
use observability_deps::tracing::{error, info, warn};
@ -26,10 +26,13 @@ pub trait LifecycleHandle: Send + Sync + 'static {
/// Logs bytes written into a partition so that it can be tracked for the manager to
/// trigger persistence. Returns true if the ingester should pause consuming from the
/// write buffer so that persistence can catch up and free up memory.
#[allow(clippy::too_many_arguments)]
fn log_write(
&self,
partition_id: PartitionId,
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
sequence_number: SequenceNumber,
bytes_written: usize,
rows_written: usize,
@ -60,6 +63,8 @@ impl LifecycleHandle for LifecycleHandleImpl {
&self,
partition_id: PartitionId,
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
sequence_number: SequenceNumber,
bytes_written: usize,
rows_written: usize,
@ -73,6 +78,8 @@ impl LifecycleHandle for LifecycleHandleImpl {
.or_insert_with(|| PartitionLifecycleStats {
shard_id,
partition_id,
namespace_id,
table_id,
first_write: now,
last_write: now,
bytes_written: 0,
@ -80,6 +87,10 @@ impl LifecycleHandle for LifecycleHandleImpl {
first_sequence_number: sequence_number,
});
assert_eq!(stats.shard_id, shard_id);
assert_eq!(stats.namespace_id, namespace_id);
assert_eq!(stats.table_id, table_id);
stats.bytes_written += bytes_written;
stats.last_write = now;
stats.rows_written += rows_written;
@ -225,6 +236,10 @@ struct LifecycleStats {
struct PartitionLifecycleStats {
/// The shard this partition is under
shard_id: ShardId,
/// The namespace identifier
namespace_id: NamespaceId,
/// The table identifier
table_id: TableId,
/// The partition identifier
partition_id: PartitionId,
/// Time that the partition received its first write. This is reset anytime
@ -714,14 +729,36 @@ mod tests {
let h = m.handle();
// log first two writes at different times
assert!(!h.log_write(PartitionId::new(1), shard_id, SequenceNumber::new(1), 1, 1));
assert!(!h.log_write(
PartitionId::new(1),
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(1),
1,
1
));
time_provider.inc(Duration::from_nanos(10));
assert!(!h.log_write(PartitionId::new(1), shard_id, SequenceNumber::new(2), 1, 1));
assert!(!h.log_write(
PartitionId::new(1),
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(2),
1,
1
));
// log another write for different partition using a different handle
assert!(!m
.handle()
.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(3), 3, 3));
assert!(!m.handle().log_write(
PartitionId::new(2),
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(3),
3,
3
));
let stats = m.stats();
assert_eq!(stats.total_bytes, 5);
@ -755,11 +792,27 @@ mod tests {
let h = m.handle();
// write more than the limit (10)
assert!(h.log_write(partition_id, shard_id, SequenceNumber::new(1), 15, 1));
assert!(h.log_write(
partition_id,
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(1),
15,
1
));
assert!(!h.can_resume_ingest());
// all subsequent writes should also indicate a pause
assert!(h.log_write(partition_id, shard_id, SequenceNumber::new(2), 1, 1));
assert!(h.log_write(
partition_id,
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(2),
1,
1
));
assert!(!h.can_resume_ingest());
// persist the partition
@ -768,7 +821,15 @@ mod tests {
// ingest can resume
assert!(h.can_resume_ingest());
assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(3), 3, 1));
assert!(!h.log_write(
partition_id,
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(3),
3,
1
));
}
#[tokio::test]
@ -794,7 +855,15 @@ mod tests {
// write more than the limit (10) and don't get stopped, because the
// per-partition limit does not pause the server from ingesting.
assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(1), 1, 50));
assert!(!h.log_write(
partition_id,
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(1),
1,
50
));
assert!(h.can_resume_ingest());
// Rows were counted
@ -808,7 +877,15 @@ mod tests {
}
// all subsequent writes should also be allowed
assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(2), 1, 1));
assert!(!h.log_write(
partition_id,
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(2),
1,
1
));
assert!(h.can_resume_ingest());
// persist the partition
@ -823,7 +900,15 @@ mod tests {
// ingest can continue
assert!(h.can_resume_ingest());
assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(3), 1, 1));
assert!(!h.log_write(
partition_id,
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(3),
1,
1
));
}
#[tokio::test]
@ -842,7 +927,15 @@ mod tests {
let h = m.handle();
// write more than the limit (20)
h.log_write(partition_id, shard_id, SequenceNumber::new(1), 25, 1);
h.log_write(
partition_id,
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(1),
25,
1,
);
// cannot resume ingest as we are over the pause ingest limit
assert!(!h.can_resume_ingest());
@ -870,7 +963,15 @@ mod tests {
// ingest can resume
assert!(h.can_resume_ingest());
assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(2), 3, 1));
assert!(!h.log_write(
partition_id,
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(2),
3,
1
));
}
#[tokio::test]
@ -893,7 +994,15 @@ mod tests {
let shard_id = ShardId::new(1);
let h = m.handle();
h.log_write(partition_id, shard_id, SequenceNumber::new(1), 10, 1);
h.log_write(
partition_id,
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(1),
10,
1,
);
m.maybe_persist(&persister).await;
let stats = m.stats();
@ -908,7 +1017,15 @@ mod tests {
// write in data for a new partition so we can be sure it isn't persisted, but the older
// one is
h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6, 1);
h.log_write(
PartitionId::new(2),
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(2),
6,
1,
);
m.maybe_persist(&persister).await;
@ -948,7 +1065,15 @@ mod tests {
let shard_id = ShardId::new(1);
let h = m.handle();
h.log_write(partition_id, shard_id, SequenceNumber::new(1), 10, 1);
h.log_write(
partition_id,
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(1),
10,
1,
);
m.maybe_persist(&persister).await;
let stats = m.stats();
@ -963,8 +1088,24 @@ mod tests {
// write in data for a new partition so we can be sure it isn't persisted, but the older
// one is
h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6, 1);
h.log_write(PartitionId::new(3), shard_id, SequenceNumber::new(3), 7, 1);
h.log_write(
PartitionId::new(2),
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(2),
6,
1,
);
h.log_write(
PartitionId::new(3),
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(3),
7,
1,
);
m.maybe_persist(&persister).await;
@ -1005,7 +1146,15 @@ mod tests {
let partition_id = PartitionId::new(1);
let persister = Arc::new(TestPersister::default());
h.log_write(partition_id, shard_id, SequenceNumber::new(1), 4, 1);
h.log_write(
partition_id,
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(1),
4,
1,
);
m.maybe_persist(&persister).await;
@ -1015,8 +1164,24 @@ mod tests {
assert!(!persister.persist_called_for(partition_id));
// introduce a new partition under the limit to verify it doesn't get taken with the other
h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 3, 1);
h.log_write(partition_id, shard_id, SequenceNumber::new(3), 5, 1);
h.log_write(
PartitionId::new(2),
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(2),
3,
1,
);
h.log_write(
partition_id,
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(3),
5,
1,
);
m.maybe_persist(&persister).await;
@ -1055,8 +1220,24 @@ mod tests {
let h = m.handle();
let partition_id = PartitionId::new(1);
let persister = Arc::new(TestPersister::default());
h.log_write(partition_id, shard_id, SequenceNumber::new(1), 8, 1);
h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 13, 1);
h.log_write(
partition_id,
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(1),
8,
1,
);
h.log_write(
PartitionId::new(2),
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(2),
13,
1,
);
m.maybe_persist(&persister).await;
@ -1072,8 +1253,24 @@ mod tests {
);
// add that partition back in over size
h.log_write(partition_id, shard_id, SequenceNumber::new(3), 20, 1);
h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(4), 21, 1);
h.log_write(
partition_id,
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(3),
20,
1,
);
h.log_write(
PartitionId::new(2),
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(4),
21,
1,
);
// both partitions should now need to be persisted to bring us below the mem threshold of
// 20.
@ -1118,11 +1315,35 @@ mod tests {
} = TestLifecycleManger::new(config);
let h = m.handle();
let persister = Arc::new(TestPersister::default());
h.log_write(PartitionId::new(1), shard_id, SequenceNumber::new(1), 4, 1);
h.log_write(
PartitionId::new(1),
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(1),
4,
1,
);
time_provider.inc(Duration::from_nanos(1));
h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6, 1);
h.log_write(
PartitionId::new(2),
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(2),
6,
1,
);
time_provider.inc(Duration::from_nanos(1));
h.log_write(PartitionId::new(3), shard_id, SequenceNumber::new(3), 3, 1);
h.log_write(
PartitionId::new(3),
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(3),
3,
1,
);
m.maybe_persist(&persister).await;
@ -1162,7 +1383,15 @@ mod tests {
let persister = Arc::new(TestPersister::default());
let shard_id = ShardId::new(1);
h.log_write(partition_id, shard_id, SequenceNumber::new(1), 10, 1);
h.log_write(
partition_id,
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(1),
10,
1,
);
m.maybe_persist(&persister).await;
let stats = m.stats();
@ -1176,7 +1405,15 @@ mod tests {
assert!(!persister.persist_called_for(partition_id));
// write in data for a new partition so we can be sure it isn't persisted, but the older one is
h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6, 1);
h.log_write(
PartitionId::new(2),
shard_id,
NamespaceId::new(91),
TableId::new(92),
SequenceNumber::new(2),
6,
1,
);
m.maybe_persist(&persister).await;

View File

@ -3,8 +3,8 @@ use super::DbScenario;
use async_trait::async_trait;
use backoff::BackoffConfig;
use data_types::{
DeletePredicate, IngesterMapping, NonEmptyString, ParquetFileId, PartitionId, PartitionKey,
Sequence, SequenceNumber, ShardId, ShardIndex, TombstoneId,
DeletePredicate, IngesterMapping, NamespaceId, NonEmptyString, ParquetFileId, PartitionId,
PartitionKey, Sequence, SequenceNumber, ShardId, ShardIndex, TableId, TombstoneId,
};
use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
use futures::StreamExt;
@ -974,6 +974,8 @@ impl LifecycleHandle for NoopLifecycleHandle {
&self,
_partition_id: PartitionId,
_shard_id: ShardId,
_namespace_id: NamespaceId,
_table_id: TableId,
_sequence_number: SequenceNumber,
_bytes_written: usize,
_rows_written: usize,