feat: partition buffer chunks from the table buffer (#25304)

Trevor Hilton 2024-09-10 11:22:59 -07:00 committed by GitHub
parent ad2ca83d72
commit 68ea7fc428
3 changed files with 161 additions and 45 deletions
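At a glance: TableBuffer now returns its buffered record batches grouped by generation (chunk) time, each group carrying its own timestamp min/max, and QueryableBuffer builds one buffer chunk per group instead of a single "buffer_partition" chunk. Below is a minimal, self-contained sketch of that grouping idea only, using hypothetical stand-in types (MinMax, Batch) in place of the real TimestampMinMax and Arrow RecordBatch; it is not the crate's actual code.

use std::collections::HashMap;

// Hypothetical stand-ins for the real TimestampMinMax and Arrow RecordBatch types.
#[derive(Debug, Clone, Copy, PartialEq)]
struct MinMax {
    min: i64,
    max: i64,
}

impl MinMax {
    // Widen this range so it also covers `other`.
    fn union(&self, other: &Self) -> Self {
        Self {
            min: self.min.min(other.min),
            max: self.max.max(other.max),
        }
    }
}

#[derive(Debug)]
struct Batch {
    rows: usize,
    time_range: MinMax,
}

// Group batches by the generation (chunk) time they belong to, merging the
// per-generation time range as batches are added. This mirrors the shape of
// TableBuffer::partitioned_record_batches in the diff below.
fn partition_by_gen_time(batches: Vec<(i64, Batch)>) -> HashMap<i64, (MinMax, Vec<Batch>)> {
    let mut partitions: HashMap<i64, (MinMax, Vec<Batch>)> = HashMap::new();
    for (gen_time, batch) in batches {
        let entry = partitions
            .entry(gen_time)
            .or_insert_with(|| (batch.time_range, Vec::new()));
        entry.0 = entry.0.union(&batch.time_range);
        entry.1.push(batch);
    }
    partitions
}

fn main() {
    let partitions = partition_by_gen_time(vec![
        (0, Batch { rows: 2, time_range: MinMax { min: 1, max: 2 } }),
        (10, Batch { rows: 2, time_range: MinMax { min: 11, max: 12 } }),
        (10, Batch { rows: 1, time_range: MinMax { min: 15, max: 15 } }),
    ]);
    // Two generation times -> two partitions; the second one's range was widened.
    assert_eq!(partitions.len(), 2);
    assert_eq!(partitions[&10].0, MinMax { min: 11, max: 15 });
    assert_eq!(partitions[&10].1.iter().map(|b| b.rows).sum::<usize>(), 3);
}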

@@ -909,7 +909,7 @@ mod tests {
             "+-----+----------------------+",
         ];
         let actual = get_table_batches(&write_buffer, "foo", "cpu", &session_context).await;
-        assert_batches_eq!(&expected, &actual);
+        assert_batches_sorted_eq!(&expected, &actual);
         let _ = write_buffer
             .write_lp(
@@ -931,7 +931,7 @@
             "+-----+----------------------+",
         ];
         let actual = get_table_batches(&write_buffer, "foo", "cpu", &session_context).await;
-        assert_batches_eq!(&expected, &actual);
+        assert_batches_sorted_eq!(&expected, &actual);
         // trigger snapshot with a third write, creating parquet files
         let _ = write_buffer
@@ -971,7 +971,7 @@
             "+-----+----------------------+",
         ];
         let actual = get_table_batches(&write_buffer, "foo", "cpu", &session_context).await;
-        assert_batches_eq!(&expected, &actual);
+        assert_batches_sorted_eq!(&expected, &actual);
         // now validate that buffered data and parquet data are all returned
         let _ = write_buffer
@@ -996,10 +996,10 @@
             "+-----+----------------------+",
         ];
         let actual = get_table_batches(&write_buffer, "foo", "cpu", &session_context).await;
-        assert_batches_eq!(&expected, &actual);
+        assert_batches_sorted_eq!(&expected, &actual);
         let actual = get_table_batches(&write_buffer, "foo", "cpu", &session_context).await;
-        assert_batches_eq!(&expected, &actual);
+        assert_batches_sorted_eq!(&expected, &actual);
         // and now replay in a new write buffer and attempt to write
         let (last_cache, catalog) = write_buffer
@@ -1032,7 +1032,7 @@
         // verify the data is still there
         let actual = get_table_batches(&write_buffer, "foo", "cpu", &ctx).await;
-        assert_batches_eq!(&expected, &actual);
+        assert_batches_sorted_eq!(&expected, &actual);
         // now write some new data
         let _ = write_buffer

@@ -77,51 +77,41 @@ impl QueryableBuffer {
         let schema = table.schema.clone();
         let arrow_schema = schema.as_arrow();
-        let mut chunks: Vec<Arc<dyn QueryChunk>> = vec![];
         let buffer = self.buffer.read();
         let Some(db_buffer) = buffer.db_to_table.get(db_schema.name.as_ref()) else {
-            return Ok(chunks);
+            return Ok(vec![]);
         };
         let Some(table_buffer) = db_buffer.get(table_name) else {
-            return Ok(chunks);
+            return Ok(vec![]);
         };
-        let batches = table_buffer
-            .record_batches(Arc::clone(&arrow_schema), filters)
-            .map_err(|e| DataFusionError::Execution(format!("error getting batches {}", e)))?;
-        let timestamp_min_max = table_buffer.timestamp_min_max();
-        let row_count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
-        let chunk_stats = create_chunk_statistics(
-            Some(row_count),
-            &schema,
-            Some(timestamp_min_max),
-            &NoColumnRanges,
-        );
-        chunks.push(Arc::new(BufferChunk {
-            batches,
-            schema: schema.clone(),
-            stats: Arc::new(chunk_stats),
-            partition_id: TransitionPartitionId::new(
-                TableId::new(0),
-                &PartitionKey::from("buffer_partition"),
-            ),
-            sort_key: None,
-            id: ChunkId::new(),
-            chunk_order: ChunkOrder::new(
-                chunks
-                    .len()
-                    .try_into()
-                    .expect("should never have this many chunks"),
-            ),
-        }));
-        Ok(chunks)
+        Ok(table_buffer
+            .partitioned_record_batches(Arc::clone(&arrow_schema), filters)
+            .map_err(|e| DataFusionError::Execution(format!("error getting batches {}", e)))?
+            .into_iter()
+            .map(|(gen_time, (ts_min_max, batches))| {
+                let row_count = batches.iter().map(|b| b.num_rows()).sum::<usize>();
+                let chunk_stats = create_chunk_statistics(
+                    Some(row_count),
+                    &schema,
+                    Some(ts_min_max),
+                    &NoColumnRanges,
+                );
+                Arc::new(BufferChunk {
+                    batches,
+                    schema: schema.clone(),
+                    stats: Arc::new(chunk_stats),
+                    partition_id: TransitionPartitionId::new(
+                        TableId::new(0),
+                        &PartitionKey::from(gen_time.to_string()),
+                    ),
+                    sort_key: None,
+                    id: ChunkId::new(),
+                    chunk_order: ChunkOrder::new(i64::MAX),
+                }) as Arc<dyn QueryChunk>
+            })
+            .collect())
     }

     /// Called when the wal has persisted a new file. Buffer the contents in memory and update the last cache so the data is queryable.
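For orientation between the two files changed above and below: on the query side, each entry of the map returned by partitioned_record_batches becomes its own BufferChunk, with a partition key derived from the generation time and statistics computed only from that partition's batches. The following is a rough sketch of that mapping using hypothetical placeholder types (ChunkSketch standing in for BufferChunk plus its chunk statistics), not the crate's actual API.

use std::collections::HashMap;

// Hypothetical, simplified stand-in for a BufferChunk and its statistics.
#[derive(Debug)]
struct ChunkSketch {
    partition_key: String,
    row_count: usize,
    time_min: i64,
    time_max: i64,
}

// One chunk per generation time: the key becomes the partition key and the
// statistics are computed from that partition's batches alone (here reduced
// to a list of per-batch row counts and a (min, max) time range).
fn chunks_from_partitions(
    partitions: HashMap<i64, ((i64, i64), Vec<usize>)>,
) -> Vec<ChunkSketch> {
    partitions
        .into_iter()
        .map(|(gen_time, ((time_min, time_max), batch_row_counts))| ChunkSketch {
            partition_key: gen_time.to_string(),
            row_count: batch_row_counts.iter().sum(),
            time_min,
            time_max,
        })
        .collect()
}

fn main() {
    let mut partitions = HashMap::new();
    partitions.insert(0, ((1, 2), vec![2]));
    partitions.insert(10, ((11, 15), vec![2, 1]));

    let chunks = chunks_from_partitions(partitions);
    assert_eq!(chunks.len(), 2);
    assert!(chunks
        .iter()
        .any(|c| c.partition_key == "10" && c.row_count == 3 && c.time_min == 11 && c.time_max == 15));
}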

@@ -62,6 +62,46 @@ impl TableBuffer {
         buffer_chunk.add_rows(rows);
     }
+    /// Produce a partitioned set of record batches along with their min/max timestamp
+    ///
+    /// The partitions are stored and returned in a `HashMap`, keyed on the generation time.
+    pub fn partitioned_record_batches(
+        &self,
+        schema: SchemaRef,
+        filter: &[Expr],
+    ) -> Result<HashMap<i64, (TimestampMinMax, Vec<RecordBatch>)>> {
+        let mut batches = HashMap::new();
+        for sc in &self.snapshotting_chunks {
+            let cols: std::result::Result<Vec<_>, _> = schema
+                .fields()
+                .iter()
+                .map(|f| {
+                    let col = sc
+                        .record_batch
+                        .column_by_name(f.name())
+                        .ok_or(Error::FieldNotFound(f.name().to_string()));
+                    col.cloned()
+                })
+                .collect();
+            let cols = cols?;
+            let rb = RecordBatch::try_new(schema.clone(), cols)?;
+            let (ts, v) = batches
+                .entry(sc.chunk_time)
+                .or_insert_with(|| (sc.timestamp_min_max, Vec::new()));
+            *ts = ts.union(&sc.timestamp_min_max);
+            v.push(rb);
+        }
+        for (t, c) in &self.chunk_time_to_chunks {
+            let ts_min_max = TimestampMinMax::new(c.timestamp_min, c.timestamp_max);
+            let (ts, v) = batches
+                .entry(*t)
+                .or_insert_with(|| (ts_min_max, Vec::new()));
+            *ts = ts.union(&ts_min_max);
+            v.push(c.record_batch(schema.clone(), filter)?);
+        }
+        Ok(batches)
+    }
+
     pub fn record_batches(&self, schema: SchemaRef, filter: &[Expr]) -> Result<Vec<RecordBatch>> {
         let mut batches =
             Vec::with_capacity(self.snapshotting_chunks.len() + self.chunk_time_to_chunks.len());
@@ -648,11 +688,97 @@ impl Builder {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use arrow_util::assert_batches_eq;
+    use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
     use datafusion::common::Column;
     use influxdb3_wal::Field;
     use schema::{InfluxFieldType, SchemaBuilder};
+
+    #[test]
+    fn partitioned_table_buffer_batches() {
+        let mut table_buffer = TableBuffer::new(&["tag"], SortKey::empty());
+        let schema = SchemaBuilder::with_capacity(3)
+            .tag("tag")
+            .influx_field("val", InfluxFieldType::String)
+            .timestamp()
+            .build()
+            .unwrap();
+
+        for t in 0..10 {
+            let offset = t * 10;
+            let rows = vec![
+                Row {
+                    time: offset + 1,
+                    fields: vec![
+                        Field {
+                            name: "tag".into(),
+                            value: FieldData::Tag("a".to_string()),
+                        },
+                        Field {
+                            name: "val".into(),
+                            value: FieldData::String(format!("thing {t}-1")),
+                        },
+                        Field {
+                            name: "time".into(),
+                            value: FieldData::Timestamp(offset + 1),
+                        },
+                    ],
+                },
+                Row {
+                    time: offset + 2,
+                    fields: vec![
+                        Field {
+                            name: "tag".into(),
+                            value: FieldData::Tag("b".to_string()),
+                        },
+                        Field {
+                            name: "val".into(),
+                            value: FieldData::String(format!("thing {t}-2")),
+                        },
+                        Field {
+                            name: "time".into(),
+                            value: FieldData::Timestamp(offset + 2),
+                        },
+                    ],
+                },
+            ];
+
+            table_buffer.buffer_chunk(offset, rows);
+        }
+
+        let partitioned_batches = table_buffer
+            .partitioned_record_batches(schema.as_arrow(), &[])
+            .unwrap();
+
+        println!("{partitioned_batches:#?}");
+
+        assert_eq!(10, partitioned_batches.len());
+
+        for t in 0..10 {
+            let offset = t * 10;
+            let (ts_min_max, batches) = partitioned_batches.get(&offset).unwrap();
+            assert_eq!(TimestampMinMax::new(offset + 1, offset + 2), *ts_min_max);
+            assert_batches_sorted_eq!(
+                [
+                    "+-----+-----------+--------------------------------+",
+                    "| tag | val       | time                           |",
+                    "+-----+-----------+--------------------------------+",
+                    format!(
+                        "| a   | thing {t}-1 | 1970-01-01T00:00:00.{:0>9}Z |",
+                        offset + 1
+                    )
+                    .as_str(),
+                    format!(
+                        "| b   | thing {t}-2 | 1970-01-01T00:00:00.{:0>9}Z |",
+                        offset + 2
+                    )
+                    .as_str(),
+                    "+-----+-----------+--------------------------------+",
+                ],
+                batches
+            );
+        }
+    }
+
     #[test]
     fn tag_row_index() {
         let mut table_buffer = TableBuffer::new(&["tag"], SortKey::empty());