parent b2caf54b3a
commit ed44817ed1
@@ -19,7 +19,7 @@ use futures::{Stream, StreamExt};
use iox_catalog::interface::{get_table_schema_by_id, Catalog};
use iox_query::exec::Executor;
use iox_time::SystemProvider;
use metric::U64Counter;
use metric::{Attributes, Metric, U64Counter, U64Histogram, U64HistogramOptions};
use mutable_batch::MutableBatch;
use object_store::DynObjectStore;
use observability_deps::tracing::{debug, warn};
@@ -94,6 +94,9 @@ pub struct IngesterData {

    /// Backoff config
    backoff_config: BackoffConfig,

    /// Metrics for file size of persisted Parquet files
    persisted_file_size_bytes: Metric<U64Histogram>,
}

impl IngesterData {
@@ -104,13 +107,30 @@ impl IngesterData {
        sequencers: BTreeMap<SequencerId, SequencerData>,
        exec: Arc<Executor>,
        backoff_config: BackoffConfig,
        metrics: Arc<metric::Registry>,
    ) -> Self {
        let persisted_file_size_bytes = metrics.register_metric_with_options(
            "ingester_persisted_file_size_bytes",
            "Size of files persisted by the ingester",
            || {
                U64HistogramOptions::new([
                    500 * 1024,       // 500 KB
                    1024 * 1024,      // 1 MB
                    3 * 1024 * 1024,  // 3 MB
                    10 * 1024 * 1024, // 10 MB
                    30 * 1024 * 1024, // 30 MB
                    u64::MAX,         // Inf
                ])
            },
        );

        Self {
            store: ParquetStorage::new(object_store),
            catalog,
            sequencers,
            exec,
            backoff_config,
            persisted_file_size_bytes,
        }
    }
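Aside for reviewers unfamiliar with the `metric` crate: the constructor now takes the registry, registers the histogram once, and stores the resulting `Metric<U64Histogram>` on `IngesterData` for use at persist time. A minimal sketch of that registration pattern, using only the calls that appear in this diff (the free-standing helper function is illustrative and not part of the change):

```rust
// Sketch only: the types and methods are the ones used in the hunk above.
use metric::{Metric, U64Histogram, U64HistogramOptions};

fn register_persisted_file_size(metrics: &metric::Registry) -> Metric<U64Histogram> {
    // The options closure supplies the histogram bucket bounds. Each bound is an
    // upper limit in bytes; `u64::MAX` stands in for the "+Inf" bucket.
    metrics.register_metric_with_options(
        "ingester_persisted_file_size_bytes",
        "Size of files persisted by the ingester",
        || {
            U64HistogramOptions::new([
                500 * 1024,       // 500 KB
                1024 * 1024,      // 1 MB
                3 * 1024 * 1024,  // 3 MB
                10 * 1024 * 1024, // 10 MB
                30 * 1024 * 1024, // 30 MB
                u64::MAX,         // Inf
            ])
        },
    )
}
```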
@@ -344,6 +364,15 @@ impl Persister for IngesterData {
            .await
            .expect("retry forever");

        // Record metrics
        let attributes = Attributes::from([(
            "sequencer_id",
            format!("{}", partition_info.partition.sequencer_id).into(),
        )]);
        self.persisted_file_size_bytes
            .recorder(attributes)
            .record(file_size as u64);

        // and remove the persisted data from memory
        debug!(
            ?partition_id,
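One note on the recording above: each distinct `Attributes` set is tracked as its own series, which is what the test further down relies on when it fetches the observer for a specific `sequencer_id`. A hedged sketch of that behaviour (the helper name, the plain integer id, and the example values are hypothetical, not part of the change):

```rust
// Hypothetical helper mirroring the recording pattern above; `sequencer_id` is a
// plain integer here purely for illustration.
use metric::{Attributes, Metric, U64Histogram};

fn record_file_size(histogram: &Metric<U64Histogram>, sequencer_id: i64, file_size: usize) {
    let attributes = Attributes::from([(
        "sequencer_id",
        format!("{}", sequencer_id).into(),
    )]);
    // Each attribute set gets its own recorder, so observations made under
    // different sequencer ids land in separate histograms.
    histogram.recorder(attributes).record(file_size as u64);
}
```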
@@ -1566,8 +1595,8 @@ pub(crate) type IngesterQueryPartitionStream =

/// Response streams for querier<>ingester requests.
///
/// The data structure is constructed to allow lazy/streaming data generation. For easier consumption according to the
/// wire protocol, use the [`flatten`](Self::flatten) method.
/// The data structure is constructed to allow lazy/streaming data generation. For easier
/// consumption according to the wire protocol, use the [`flatten`](Self::flatten) method.
pub struct IngesterQueryResponse {
    /// Stream of partitions.
    partitions: IngesterQueryPartitionStream,
@@ -1649,13 +1678,15 @@ pub enum FlatIngesterQueryResponse {

    /// Start a new snapshot.
    ///
    /// The snapshot belongs to the partition of the last [`StartPartition`](Self::StartPartition) message.
    /// The snapshot belongs to the partition of the last [`StartPartition`](Self::StartPartition)
    /// message.
    StartSnapshot {
        /// Snapshot schema.
        schema: Arc<arrow::datatypes::Schema>,
    },

    /// Add a record batch to the snapshot that was announced by the last [`StartSnapshot`](Self::StartSnapshot) message.
    /// Add a record batch to the snapshot that was announced by the last
    /// [`StartSnapshot`](Self::StartSnapshot) message.
    RecordBatch {
        /// Record batch.
        batch: RecordBatch,
@@ -1796,6 +1827,7 @@ mod tests {
            sequencers,
            Arc::new(Executor::new(1)),
            BackoffConfig::default(),
            Arc::clone(&metrics),
        ));

        let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id);
@@ -1889,6 +1921,7 @@ mod tests {
            sequencers,
            Arc::new(Executor::new(1)),
            BackoffConfig::default(),
            Arc::clone(&metrics),
        ));

        let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id);
@@ -1943,7 +1976,7 @@ mod tests {

        let manager = LifecycleManager::new(
            LifecycleConfig::new(1, 0, 0, Duration::from_secs(1), Duration::from_secs(1)),
            metrics,
            Arc::clone(&metrics),
            Arc::new(SystemProvider::new()),
        );
@@ -2016,6 +2049,30 @@ mod tests {
        assert_eq!(pf.sequencer_id, sequencer1.id);
        assert!(pf.to_delete.is_none());

        // This value should be recorded in the metrics asserted next; it is less than 500 KB
        assert_eq!(pf.file_size_bytes, 1252);

        // verify metrics
        let persisted_file_size_bytes: Metric<U64Histogram> = metrics
            .get_instrument("ingester_persisted_file_size_bytes")
            .unwrap();

        let observation = persisted_file_size_bytes
            .get_observer(&Attributes::from([(
                "sequencer_id",
                format!("{}", sequencer1.id).into(),
            )]))
            .unwrap()
            .fetch();
        assert_eq!(observation.sample_count(), 1);
        let buckets_with_counts: Vec<_> = observation
            .buckets
            .iter()
            .filter_map(|o| if o.count == 0 { None } else { Some(o.le) })
            .collect();
        // Only the < 500 KB bucket has a count
        assert_eq!(buckets_with_counts, &[500 * 1024]);

        // verify it set a sort key on the partition in the catalog
        let partition_info = repos
            .partitions()
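The verification reads the histogram back through the registry: `get_instrument` looks the metric up by name, `get_observer` selects the series for one attribute set, and `fetch()` returns a snapshot whose buckets carry per-bucket counts rather than cumulative ones, which is why the single 1252-byte file is expected to show up only in the 500 KB bucket. A hypothetical helper wrapping that readback (the function name and signature are mine, not part of the change):

```rust
// Hypothetical test helper built from the readback calls used above.
use metric::{Attributes, Metric, U64Histogram};

fn assert_single_observation_in_bucket(
    metrics: &metric::Registry,
    name: &'static str,
    attributes: &Attributes,
    expected_le: u64,
) {
    // Look the instrument up by name, then the observer for this attribute set.
    let histogram: Metric<U64Histogram> = metrics.get_instrument(name).unwrap();
    let observation = histogram.get_observer(attributes).unwrap().fetch();

    // Exactly one value was recorded, and only the expected bucket has a count.
    assert_eq!(observation.sample_count(), 1);
    let buckets_with_counts: Vec<_> = observation
        .buckets
        .iter()
        .filter_map(|o| if o.count == 0 { None } else { Some(o.le) })
        .collect();
    assert_eq!(buckets_with_counts, &[expected_le]);
}
```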
@@ -2083,6 +2140,7 @@ mod tests {
            sequencers,
            Arc::new(Executor::new(1)),
            BackoffConfig::default(),
            Arc::clone(&metrics),
        ));

        let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id);
@@ -2284,7 +2342,8 @@ mod tests {
            50,            // max time of data to get deleted
            "city=Boston", // delete predicate
        );
        // two rows will get deleted, one from existing snapshot, one from the buffer being moved to snapshot
        // two rows will get deleted, one from existing snapshot, one from the buffer being moved
        // to snapshot
        p.buffer_tombstone(&exec, "restaurant", ts).await;

        // verify data
@@ -2341,9 +2400,11 @@ mod tests {

        // verify data
        assert!(p.data.buffer.is_none()); // always empty after delete
        // no snapshots because buffer has no data yet and the snapshot was empty too
        // no snapshots because buffer has no data yet and the
        // snapshot was empty too
        assert_eq!(p.data.snapshots.len(), 0);
        assert_eq!(p.data.deletes_during_persisting.len(), 1); // tombstone added since data is persisting
        assert_eq!(p.data.deletes_during_persisting.len(), 1); // tombstone added since data is
                                                               // persisting
        assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch)));

        // ------------------------------------------
@@ -2362,7 +2423,7 @@ mod tests {
        assert_eq!(
            p.data.buffer.as_ref().unwrap().min_sequence_number,
            SequenceNumber::new(8)
        ); // 1 newlly added mutable batch of 3 rows of data
        ); // 1 newly added mutable batch of 3 rows of data
        assert_eq!(p.data.snapshots.len(), 0); // still empty
        assert_eq!(p.data.deletes_during_persisting.len(), 1);
        assert_eq!(p.data.persisting, Some(Arc::clone(&p_batch)));
@@ -2622,6 +2683,7 @@ mod tests {
            sequencers,
            Arc::new(Executor::new(1)),
            BackoffConfig::default(),
            Arc::clone(&metrics),
        ));

        let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id);
@@ -157,6 +157,7 @@ impl IngestHandlerImpl {
            sequencers,
            exec,
            BackoffConfig::default(),
            Arc::clone(&metric_registry),
        ));

        let ingester_data = Arc::clone(&data);
@@ -701,13 +701,14 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterDa
        sequencers,
        exec,
        backoff::BackoffConfig::default(),
        metrics,
    )
}

pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterData {
    // Whatever data because they won't be used in the tests
    let metrics: Arc<metric::Registry> = Default::default();
    let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
    let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
    let object_store = Arc::new(InMemory::new());
    let exec = Arc::new(iox_query::exec::Executor::new(1));
@@ -745,6 +746,7 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa
        sequencers,
        exec,
        backoff::BackoffConfig::default(),
        metrics,
    )
}
@@ -704,6 +704,7 @@ impl MockIngester {
            sequencers,
            catalog.exec(),
            BackoffConfig::default(),
            catalog.metric_registry(),
        ));

        Self {