fix: `GroupGenerator`/`Converter` panic (#6351)

Do not poll a ready future.
pull/24376/head
Marco Neumann 2022-12-08 11:08:21 +00:00 committed by GitHub
parent 080aff8f71
commit c25afda6cc
2 changed files with 212 additions and 14 deletions
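The fix clears the stored allocation future (`mem_proxy_alloc_fut`) as soon as it resolves, so a later poll of the `Collector` can no longer reach the already-completed `BoxFuture`; polling a finished future violates the `Future` contract and, as #6351 shows, can panic. Below is a minimal sketch of that pattern, using illustrative names rather than the actual IOx types:

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{future::BoxFuture, ready, FutureExt, Stream};

/// Illustrative stand-in for a stream that drives a one-shot async step.
struct OneShotStep {
    /// `Some` while the step is in flight. It must be reset to `None`
    /// once the future resolves; a completed future must not be polled again.
    step: Option<BoxFuture<'static, u32>>,
}

impl Stream for OneShotStep {
    type Item = u32;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `Option<BoxFuture>` is `Unpin`, so the fields are freely accessible.
        let this = self.get_mut();

        if let Some(fut) = this.step.as_mut() {
            let value = ready!(fut.poll_unpin(cx));
            // Drop the completed future before returning, so a subsequent
            // poll cannot poll it a second time.
            this.step = None;
            return Poll::Ready(Some(value));
        }

        // Nothing in flight (or the step already finished): the stream is done.
        Poll::Ready(None)
    }
}

In the diff below, the equivalent one-liner is the added `this.mem_proxy_alloc_fut = None;` in `Collector`'s poll loop.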


@@ -503,13 +503,27 @@ impl Stream for SeriesSetConverterStream {
pub struct GroupGenerator {
group_columns: Vec<Arc<str>>,
memory_manager: Arc<MemoryManager>,
collector_buffered_size_max: usize,
}
impl GroupGenerator {
pub fn new(group_columns: Vec<Arc<str>>, memory_manager: Arc<MemoryManager>) -> Self {
Self::new_with_buffered_size_max(
group_columns,
memory_manager,
Collector::<()>::DEFAULT_ALLOCATION_BUFFER_SIZE,
)
}
fn new_with_buffered_size_max(
group_columns: Vec<Arc<str>>,
memory_manager: Arc<MemoryManager>,
collector_buffered_size_max: usize,
) -> Self {
Self {
group_columns,
memory_manager,
collector_buffered_size_max,
}
}
@@ -524,7 +538,13 @@ impl GroupGenerator {
S: Stream<Item = Result<Series, DataFusionError>> + Send,
{
let series = Box::pin(series);
let mut series = Collector::new(series, self.group_columns, self.memory_manager).await?;
let mut series = Collector::new(
series,
self.group_columns,
self.memory_manager,
self.collector_buffered_size_max,
)
.await?;
// Potential optimization is to skip this sort if we are
// grouping by a prefix of the tags for a single measurement
@@ -688,10 +708,7 @@ impl SortableSeries {
/// allocations with a [`MemoryManager`].
///
/// This avoids unbounded memory growth when merging multiple `Series` in memory
struct Collector<S>
where
S: Stream<Item = Result<Series, DataFusionError>> + Send + Unpin,
{
struct Collector<S> {
/// The inner stream was fully drained.
inner_done: bool,
@@ -715,6 +732,9 @@ where
/// NOT batched and we want to avoid costly memory allocations checks with the [`MemoryManager`] for every single element.
buffered_size: usize,
/// Maximum [buffered size](Self::buffered_size).
buffered_size_max: usize,
/// Our memory consumer.
///
/// This is optional because for [`MemoryConsumerProxy::alloc`], we need to move this into
@@ -728,14 +748,21 @@ where
Option<BoxFuture<'static, (MemoryConsumerProxy, Result<(), DataFusionError>)>>,
}
impl<S> Collector<S> {
/// Maximum [buffered size](Self::buffered_size).
const DEFAULT_ALLOCATION_BUFFER_SIZE: usize = 1024 * 1024;
}
impl<S> Collector<S>
where
S: Stream<Item = Result<Series, DataFusionError>> + Send + Unpin,
{
/// Maximum [buffered size](Self::buffered_size).
const ALLOCATION_BUFFER_SIZE: usize = 1024 * 1024;
fn new(inner: S, group_columns: Vec<Arc<str>>, memory_manager: Arc<MemoryManager>) -> Self {
fn new(
inner: S,
group_columns: Vec<Arc<str>>,
memory_manager: Arc<MemoryManager>,
buffered_size_max: usize,
) -> Self {
let mem_proxy =
MemoryConsumerProxy::new("Collector stream", MemoryConsumerId::new(0), memory_manager);
Self {
@@ -745,6 +772,7 @@ where
group_columns,
collected: Vec::with_capacity(0),
buffered_size: 0,
buffered_size_max,
mem_proxy: Some(mem_proxy),
mem_proxy_alloc_fut: None,
}
@@ -786,6 +814,7 @@ where
let (mem_proxy, res) = ready!(fut.poll_unpin(cx));
assert!(this.mem_proxy.is_none());
this.mem_proxy = Some(mem_proxy);
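// The actual fix: clear the completed allocation future so it is never polled again.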
this.mem_proxy_alloc_fut = None;
if let Err(e) = res {
// poison this future
this.outer_done = true;
@@ -808,7 +837,7 @@ where
.push_accounted(series, &mut this.buffered_size);
// should we clear our allocation buffer?
if this.buffered_size > Self::ALLOCATION_BUFFER_SIZE {
if this.buffered_size > this.buffered_size_max {
this.alloc();
continue;
}
@@ -1629,6 +1658,115 @@ mod tests {
assert_matches!(err, DataFusionError::ResourcesExhausted(_));
}
#[tokio::test]
async fn test_group_generator_no_mem_limit() {
let memory_manager =
MemoryManager::new(MemoryManagerConfig::try_new_limit(usize::MAX, 1.0).unwrap());
// use a generator w/ a low buffered allocation to force multiple `alloc` calls
let ggen =
GroupGenerator::new_with_buffered_size_max(vec![Arc::from("g")], memory_manager, 1);
let input = futures::stream::iter([
Ok(Series {
tags: vec![Tag {
key: Arc::from("g"),
value: Arc::from("x"),
}],
data: Data::IntegerPoints {
timestamps: vec![1],
values: vec![1],
},
}),
Ok(Series {
tags: vec![Tag {
key: Arc::from("g"),
value: Arc::from("y"),
}],
data: Data::IntegerPoints {
timestamps: vec![2],
values: vec![2],
},
}),
Ok(Series {
tags: vec![Tag {
key: Arc::from("g"),
value: Arc::from("x"),
}],
data: Data::IntegerPoints {
timestamps: vec![3],
values: vec![3],
},
}),
Ok(Series {
tags: vec![Tag {
key: Arc::from("g"),
value: Arc::from("x"),
}],
data: Data::IntegerPoints {
timestamps: vec![4],
values: vec![4],
},
}),
]);
let actual = ggen
.group(input)
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let expected = vec![
Either::Group(Group {
tag_keys: vec![Arc::from("g")],
partition_key_vals: vec![Arc::from("x")],
}),
Either::Series(Series {
tags: vec![Tag {
key: Arc::from("g"),
value: Arc::from("x"),
}],
data: Data::IntegerPoints {
timestamps: vec![1],
values: vec![1],
},
}),
Either::Series(Series {
tags: vec![Tag {
key: Arc::from("g"),
value: Arc::from("x"),
}],
data: Data::IntegerPoints {
timestamps: vec![3],
values: vec![3],
},
}),
Either::Series(Series {
tags: vec![Tag {
key: Arc::from("g"),
value: Arc::from("x"),
}],
data: Data::IntegerPoints {
timestamps: vec![4],
values: vec![4],
},
}),
Either::Group(Group {
tag_keys: vec![Arc::from("g")],
partition_key_vals: vec![Arc::from("y")],
}),
Either::Series(Series {
tags: vec![Tag {
key: Arc::from("g"),
value: Arc::from("y"),
}],
data: Data::IntegerPoints {
timestamps: vec![2],
values: vec![2],
},
}),
];
assert_eq!(actual, expected);
}
fn assert_series_set<const N: usize, const M: usize>(
set: &SeriesSet,
table_name: &'static str,


@@ -28,7 +28,7 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// A name=value pair used to represent a series's tag
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Tag {
pub key: Arc<str>,
pub value: Arc<str>,
@@ -48,7 +48,7 @@ impl fmt::Display for Tag {
}
/// Represents a single logical TimeSeries
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Series {
/// key = value pairs that define this series
/// (including the _measurement and _field that correspond to table name and column name)
@@ -143,6 +143,66 @@ impl Data {
}
}
impl PartialEq for Data {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(
Self::FloatPoints {
timestamps: l_timestamps,
values: l_values,
},
Self::FloatPoints {
timestamps: r_timestamps,
values: r_values,
},
) => l_timestamps == r_timestamps && l_values == r_values,
(
Self::IntegerPoints {
timestamps: l_timestamps,
values: l_values,
},
Self::IntegerPoints {
timestamps: r_timestamps,
values: r_values,
},
) => l_timestamps == r_timestamps && l_values == r_values,
(
Self::UnsignedPoints {
timestamps: l_timestamps,
values: l_values,
},
Self::UnsignedPoints {
timestamps: r_timestamps,
values: r_values,
},
) => l_timestamps == r_timestamps && l_values == r_values,
(
Self::BooleanPoints {
timestamps: l_timestamps,
values: l_values,
},
Self::BooleanPoints {
timestamps: r_timestamps,
values: r_values,
},
) => l_timestamps == r_timestamps && l_values == r_values,
(
Self::StringPoints {
timestamps: l_timestamps,
values: l_values,
},
Self::StringPoints {
timestamps: r_timestamps,
values: r_values,
},
) => l_timestamps == r_timestamps && l_values == r_values,
_ => false,
}
}
}
impl Eq for Data {}
/// Returns size of given vector of primitive types in bytes, EXCLUDING `vec` itself.
fn primitive_vec_size<T>(vec: &Vec<T>) -> usize {
std::mem::size_of::<T>() * vec.capacity()
@@ -331,7 +391,7 @@ impl SeriesSet {
}
/// Represents a group of `Series`
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Group {
/// Contains *ALL* tag keys (not just those used for grouping)
pub tag_keys: Vec<Arc<str>>,
@@ -358,7 +418,7 @@ impl fmt::Display for Group {
}
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Either {
Series(Series),
Group(Group),