feat: wire up query from parquet files (#24749)

* feat: wire up query from parquet files

This adds the ability to query Parquet files that have been persisted in object storage. Any segments that are loaded on boot will be included (limited to 1k segments at the time of this PR). In a follow-on PR we should add a solid end-to-end test that exercises persistence and query through the main API (it might be tricky).

* Move BufferChunk and ParquetChunk into chunk module
* Add object_store_url to Persister
* Register object_store on server startup
* Add loaded persisted_segments to SegmentState

* refactor: PR feedback
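
The change boils down to registering the persister's object store with the DataFusion runtime under the same id that Persister::object_store_url() points at, so ParquetExec can resolve iox://influxdb3/... paths for persisted files. Below is a minimal sketch of that startup wiring, reusing only calls that appear in this PR; the helper name is illustrative, and parquet_store is assumed to be the parquet_file::storage::ParquetStorage configured at startup (its construction is not shown in this diff):

use std::sync::Arc;

use datafusion_util::config::register_iox_object_store;
use iox_query::exec::{Executor, ExecutorType};
use object_store::DynObjectStore;
use parquet_file::storage::ParquetStorage;

// Sketch only: make persisted parquet files readable by the query engine.
fn wire_object_store_for_queries(
    exec: &Executor,
    parquet_store: &ParquetStorage,
    object_store: Arc<DynObjectStore>,
) {
    // The runtime environment is shared by every query context created from `exec`.
    let runtime_env = exec.new_context(ExecutorType::Query).inner().runtime_env();
    // `parquet_store.id()` is "influxdb3" here, matching DEFAULT_OBJECT_STORE_URL
    // ("iox://influxdb3/") returned by Persister::object_store_url().
    register_iox_object_store(runtime_env, parquet_store.id(), object_store);
}
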
Paul Dix 2024-03-12 09:47:32 -04:00 committed by GitHub
parent db77ed0a19
commit 01d33f69b5
11 changed files with 497 additions and 175 deletions

Cargo.lock (generated)

@ -2475,6 +2475,7 @@ dependencies = [
"clap",
"clap_blocks",
"console-subscriber",
"datafusion_util",
"dotenvy",
"futures",
"hex",
@ -2616,6 +2617,7 @@ dependencies = [
"observability_deps",
"parking_lot",
"parquet",
"parquet_file",
"pretty_assertions",
"schema",
"serde",
@ -2624,6 +2626,7 @@ dependencies = [
"test_helpers",
"thiserror",
"tokio",
"url",
]
[[package]]


@ -9,6 +9,7 @@ license.workspace = true
# Core Crates
authz.workspace = true
clap_blocks.workspace = true
datafusion_util.workspace = true
iox_query.workspace = true
iox_time.workspace = true
ioxd_common.workspace = true


@ -7,6 +7,7 @@ use clap_blocks::{
object_store::{make_object_store, ObjectStoreConfig},
socket_addr::SocketAddr,
};
use datafusion_util::config::register_iox_object_store;
use influxdb3_server::{
auth::AllOrNothingAuthorizer, builder::ServerBuilder, query_executor::QueryExecutorImpl, serve,
CommonServerState,
@ -15,7 +16,7 @@ use influxdb3_write::persister::PersisterImpl;
use influxdb3_write::wal::WalImpl;
use influxdb3_write::write_buffer::WriteBufferImpl;
use influxdb3_write::SegmentDuration;
use iox_query::exec::{Executor, ExecutorConfig};
use iox_query::exec::{Executor, ExecutorConfig, ExecutorType};
use iox_time::SystemProvider;
use ioxd_common::reexport::trace_http::ctx::TraceHeaderParser;
use object_store::DynObjectStore;
@ -242,6 +243,8 @@ pub async fn command(config: Config) -> Result<()> {
metric_registry: Arc::clone(&metrics),
mem_pool_size: config.exec_mem_pool_bytes.bytes(),
}));
let runtime_env = exec.new_context(ExecutorType::Query).inner().runtime_env();
register_iox_object_store(runtime_env, parquet_store.id(), Arc::clone(&object_store));
let trace_header_parser = TraceHeaderParser::new()
.with_jaeger_trace_context_header_name(


@ -5,12 +5,10 @@ use arrow::record_batch::RecordBatch;
use arrow_schema::ArrowError;
use async_trait::async_trait;
use data_types::NamespaceId;
use data_types::{ChunkId, ChunkOrder, TransitionPartitionId};
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::CatalogProvider;
use datafusion::common::arrow::array::StringArray;
use datafusion::common::arrow::datatypes::{DataType, Field, Schema as DatafusionSchema};
use datafusion::common::Statistics;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
@ -31,12 +29,11 @@ use iox_query::query_log::QueryLog;
use iox_query::query_log::QueryText;
use iox_query::query_log::StateReceived;
use iox_query::QueryNamespaceProvider;
use iox_query::{QueryChunk, QueryChunkData, QueryNamespace};
use iox_query::{QueryChunk, QueryNamespace};
use iox_query_influxql::frontend::planner::InfluxQLQueryPlanner;
use iox_query_params::StatementParams;
use metric::Registry;
use observability_deps::tracing::{debug, info, trace};
use schema::sort::SortKey;
use schema::Schema;
use serde::{Deserialize, Serialize};
use serde_arrow::schema::SchemaLike;
@ -495,48 +492,3 @@ impl<B: WriteBuffer> TableProvider for QueryTable<B> {
provider.scan(ctx, projection, &filters, limit).await
}
}
#[derive(Debug)]
pub struct ParquetChunk {}
impl QueryChunk for ParquetChunk {
fn stats(&self) -> Arc<Statistics> {
todo!()
}
fn schema(&self) -> &Schema {
todo!()
}
fn partition_id(&self) -> &TransitionPartitionId {
todo!()
}
fn sort_key(&self) -> Option<&SortKey> {
todo!()
}
fn id(&self) -> ChunkId {
todo!()
}
fn may_contain_pk_duplicates(&self) -> bool {
todo!()
}
fn data(&self) -> QueryChunkData {
todo!()
}
fn chunk_type(&self) -> &str {
todo!()
}
fn order(&self) -> ChunkOrder {
todo!()
}
fn as_any(&self) -> &dyn Any {
todo!()
}
}


@ -13,6 +13,7 @@ influxdb-line-protocol.workspace = true
iox_catalog.workspace = true
iox_query.workspace = true
iox_time.workspace = true
parquet_file.workspace = true
observability_deps.workspace = true
schema.workspace = true
@ -34,6 +35,7 @@ serde_json.workspace = true
snap.workspace = true
bytes.workspace = true
futures-util.workspace = true
url.workspace = true
[dev-dependencies]
# Core Crates


@ -0,0 +1,115 @@
use arrow::array::RecordBatch;
use data_types::{ChunkId, ChunkOrder, TransitionPartitionId};
use datafusion::common::Statistics;
use iox_query::{QueryChunk, QueryChunkData};
use parquet_file::storage::ParquetExecInput;
use schema::sort::SortKey;
use schema::Schema;
use std::any::Any;
use std::sync::Arc;
#[derive(Debug)]
pub struct BufferChunk {
pub(crate) batches: Vec<RecordBatch>,
pub(crate) schema: Schema,
pub(crate) stats: Arc<Statistics>,
pub(crate) partition_id: data_types::partition::TransitionPartitionId,
pub(crate) sort_key: Option<SortKey>,
pub(crate) id: data_types::ChunkId,
pub(crate) chunk_order: data_types::ChunkOrder,
}
impl QueryChunk for BufferChunk {
fn stats(&self) -> Arc<Statistics> {
Arc::clone(&self.stats)
}
fn schema(&self) -> &Schema {
&self.schema
}
fn partition_id(&self) -> &data_types::partition::TransitionPartitionId {
&self.partition_id
}
fn sort_key(&self) -> Option<&SortKey> {
self.sort_key.as_ref()
}
fn id(&self) -> data_types::ChunkId {
self.id
}
fn may_contain_pk_duplicates(&self) -> bool {
false
}
fn data(&self) -> QueryChunkData {
QueryChunkData::in_mem(self.batches.clone(), Arc::clone(self.schema.inner()))
}
fn chunk_type(&self) -> &str {
"BufferChunk"
}
fn order(&self) -> data_types::ChunkOrder {
self.chunk_order
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[derive(Debug)]
pub struct ParquetChunk {
pub(crate) schema: Schema,
pub(crate) stats: Arc<Statistics>,
pub(crate) partition_id: TransitionPartitionId,
pub(crate) sort_key: Option<SortKey>,
pub(crate) id: ChunkId,
pub(crate) chunk_order: ChunkOrder,
pub(crate) parquet_exec: ParquetExecInput,
}
impl QueryChunk for ParquetChunk {
fn stats(&self) -> Arc<Statistics> {
Arc::clone(&self.stats)
}
fn schema(&self) -> &Schema {
&self.schema
}
fn partition_id(&self) -> &TransitionPartitionId {
&self.partition_id
}
fn sort_key(&self) -> Option<&SortKey> {
self.sort_key.as_ref()
}
fn id(&self) -> ChunkId {
self.id
}
fn may_contain_pk_duplicates(&self) -> bool {
false
}
fn data(&self) -> QueryChunkData {
QueryChunkData::Parquet(self.parquet_exec.clone())
}
fn chunk_type(&self) -> &str {
"Parquet"
}
fn order(&self) -> ChunkOrder {
self.chunk_order
}
fn as_any(&self) -> &dyn Any {
self
}
}
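
Both chunk types feed the same iox_query read path, so the query executor does not care whether a chunk's data is still in the write buffer or already persisted as parquet. A minimal sketch of that consumption, modeled on the test helper added later in this commit; the function name chunk_to_batches is illustrative:

use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use iox_query::exec::IOxSessionContext;
use iox_query::QueryChunk;

// Sketch: turn any QueryChunk (BufferChunk or ParquetChunk) into record batches.
async fn chunk_to_batches(
    chunk: Arc<dyn QueryChunk>,
    ctx: &IOxSessionContext,
) -> Vec<RecordBatch> {
    // For a ParquetChunk this reads from the object store registered on the
    // context's runtime (see register_iox_object_store); for a BufferChunk it
    // simply yields the buffered in-memory batches.
    chunk
        .data()
        .read_to_batches(chunk.schema(), ctx.inner())
        .await
}
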


@ -7,6 +7,7 @@
//! to be persisted. A new open segment will be created and new writes will be written to that segment.
pub mod catalog;
mod chunk;
pub mod paths;
pub mod persister;
pub mod wal;
@ -16,7 +17,8 @@ use crate::catalog::Catalog;
use crate::paths::{ParquetFilePath, SegmentWalFilePath};
use async_trait::async_trait;
use bytes::Bytes;
use data_types::NamespaceName;
use data_types::{NamespaceName, TimestampMinMax};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::physical_plan::SendableRecordBatchStream;
@ -306,6 +308,8 @@ impl SequenceNumber {
}
}
pub const DEFAULT_OBJECT_STORE_URL: &str = "iox://influxdb3/";
#[async_trait]
pub trait Persister: Debug + Send + Sync + 'static {
type Error;
@ -348,6 +352,14 @@ pub trait Persister: Debug + Send + Sync + 'static {
/// Returns the configured `ObjectStore` that data is loaded from and persisted to.
fn object_store(&self) -> Arc<dyn object_store::ObjectStore>;
// This is used by the query engine to know where to read parquet files from. This assumes
// that there is a `ParquetStorage` with an id of `influxdb3` and that this url has been
// registered with the query execution context. Kind of ugly here, but not sure where else
// to keep this.
fn object_store_url(&self) -> ObjectStoreUrl {
ObjectStoreUrl::parse(DEFAULT_OBJECT_STORE_URL).unwrap()
}
fn as_any(&self) -> &dyn Any;
}
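
For illustration, the query path later pairs this URL with each persisted file: a ParquetExecInput carries the object store URL plus the file's ObjectMeta, which is exactly what ParquetChunk::data() hands to DataFusion (see get_table_chunks in the write buffer). A minimal sketch with placeholder path and size arguments and a hypothetical helper name, reusing DEFAULT_OBJECT_STORE_URL defined above:

use datafusion::datasource::object_store::ObjectStoreUrl;
use object_store::path::Path as ObjPath;
use object_store::ObjectMeta;
use parquet_file::storage::ParquetExecInput;

// Sketch: describe one persisted parquet file to DataFusion.
fn parquet_exec_input_for(path: &str, size_bytes: usize) -> ParquetExecInput {
    ParquetExecInput {
        // Must match the id registered via register_iox_object_store at startup.
        object_store_url: ObjectStoreUrl::parse(DEFAULT_OBJECT_STORE_URL).unwrap(),
        object_meta: ObjectMeta {
            location: ObjPath::from(path),
            last_modified: Default::default(),
            size: size_bytes,
            e_tag: None,
            version: None,
        },
    }
}
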
@ -503,6 +515,15 @@ pub struct ParquetFile {
pub max_time: i64,
}
impl ParquetFile {
pub fn timestamp_min_max(&self) -> TimestampMinMax {
TimestampMinMax {
min: self.min_time,
max: self.max_time,
}
}
}
/// The summary data for a persisted parquet file in a segment.
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]


@ -190,7 +190,6 @@ pub(crate) fn load_buffer_from_segment(
segment_reader.path().to_string(),
));
}
let segment_data = validated_write.valid_segmented_data.pop().unwrap();
for (table_name, table_batch) in segment_data.table_batches {
@ -523,6 +522,13 @@ impl ClosedBufferSegment {
Ok(persisted_segment)
}
pub fn table_buffer(&self, db_name: &str, table_name: &str) -> Option<TableBuffer> {
self.buffered_data
.database_buffers
.get(db_name)
.and_then(|db_buffer| db_buffer.table_buffers.get(table_name).cloned())
}
}
#[cfg(test)]


@ -246,6 +246,7 @@ mod tests {
Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))),
vec![open_segment, next_segment],
vec![],
vec![],
None,
)));
let flusher = WriteBufferFlusher::new(Arc::clone(&segment_state));


@ -6,7 +6,7 @@ mod loader;
mod segment_state;
use crate::catalog::{Catalog, DatabaseSchema, TableDefinition, TIME_COLUMN_NAME};
use crate::write_buffer::buffer_segment::TableBuffer;
use crate::chunk::ParquetChunk;
use crate::write_buffer::flusher::WriteBufferFlusher;
use crate::write_buffer::loader::load_starting_state;
use crate::write_buffer::segment_state::{run_buffer_segment_persist_and_cleanup, SegmentState};
@ -14,24 +14,22 @@ use crate::{
persister, BufferSegment, BufferedWriteRequest, Bufferer, ChunkContainer, LpWriteOp, Persister,
Precision, SegmentDuration, SegmentId, Wal, WalOp, WriteBuffer, WriteLineError,
};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use data_types::{
column_type_from_field, ChunkId, ChunkOrder, ColumnType, NamespaceName, NamespaceNameError,
TableId, TransitionPartitionId,
};
use datafusion::common::{DataFusionError, Statistics};
use datafusion::common::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::Expr;
use influxdb_line_protocol::{parse_lines, FieldValue, ParsedLine};
use iox_query::chunk_statistics::create_chunk_statistics;
use iox_query::{QueryChunk, QueryChunkData};
use iox_query::QueryChunk;
use iox_time::{Time, TimeProvider};
use observability_deps::tracing::{debug, error, info};
use object_store::path::Path as ObjPath;
use object_store::ObjectMeta;
use observability_deps::tracing::{debug, error};
use parking_lot::{Mutex, RwLock};
use schema::sort::SortKey;
use schema::Schema;
use std::any::Any;
use parquet_file::storage::ParquetExecInput;
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
@ -82,9 +80,10 @@ pub struct WriteRequest<'a> {
}
#[derive(Debug)]
pub struct WriteBufferImpl<W, T> {
pub struct WriteBufferImpl<W, T, P> {
catalog: Arc<Catalog>,
segment_state: Arc<RwLock<SegmentState<T, W>>>,
persister: Arc<P>,
#[allow(dead_code)]
wal: Option<Arc<W>>,
write_buffer_flusher: WriteBufferFlusher,
@ -97,8 +96,8 @@ pub struct WriteBufferImpl<W, T> {
shutdown_segment_persist_tx: watch::Sender<()>,
}
impl<W: Wal, T: TimeProvider> WriteBufferImpl<W, T> {
pub async fn new<P>(
impl<W: Wal, T: TimeProvider, P: Persister> WriteBufferImpl<W, T, P> {
pub async fn new(
persister: Arc<P>,
wal: Option<Arc<W>>,
time_provider: Arc<T>,
@ -112,6 +111,7 @@ impl<W: Wal, T: TimeProvider> WriteBufferImpl<W, T> {
let now = time_provider.now();
let loaded_state =
load_starting_state(Arc::clone(&persister), wal.clone(), now, segment_duration).await?;
let segment_state = Arc::new(RwLock::new(SegmentState::new(
segment_duration,
loaded_state.last_segment_id,
@ -119,6 +119,7 @@ impl<W: Wal, T: TimeProvider> WriteBufferImpl<W, T> {
Arc::clone(&time_provider),
loaded_state.open_segments,
loaded_state.persisting_buffer_segments,
loaded_state.persisted_segments,
wal.clone(),
)));
@ -127,11 +128,12 @@ impl<W: Wal, T: TimeProvider> WriteBufferImpl<W, T> {
let segment_state_persister = Arc::clone(&segment_state);
let time_provider_persister = Arc::clone(&time_provider);
let wal_perister = wal.clone();
let cloned_persister = Arc::clone(&persister);
let (shutdown_segment_persist_tx, shutdown_rx) = watch::channel(());
let segment_persist_handle = tokio::task::spawn(async move {
run_buffer_segment_persist_and_cleanup(
persister,
cloned_persister,
segment_state_persister,
shutdown_rx,
time_provider_persister,
@ -143,6 +145,7 @@ impl<W: Wal, T: TimeProvider> WriteBufferImpl<W, T> {
Ok(Self {
catalog: loaded_state.catalog,
segment_state,
persister,
wal,
write_buffer_flusher,
time_provider,
@ -193,66 +196,95 @@ impl<W: Wal, T: TimeProvider> WriteBufferImpl<W, T> {
&self,
database_name: &str,
table_name: &str,
_filters: &[Expr],
_projection: Option<&Vec<usize>>,
_ctx: &SessionState,
filters: &[Expr],
projection: Option<&Vec<usize>>,
ctx: &SessionState,
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
let db_schema = self
.catalog
.db_schema(database_name)
.ok_or_else(|| DataFusionError::Execution(format!("db {} not found", database_name)))?;
let table = db_schema
.tables
.get(table_name)
.ok_or_else(|| DataFusionError::Execution(format!("table {} not found", table_name)))?;
let schema = table.schema.clone();
let table_buffers = self.clone_table_buffers(database_name, table_name);
let chunks = table_buffers
.into_iter()
.map(|table_buffer| {
let batch = table_buffer.rows_to_record_batch(&schema, table.columns());
let batch_stats = create_chunk_statistics(
Some(table_buffer.row_count()),
&schema,
Some(table_buffer.timestamp_min_max()),
None,
);
let table_schema = {
let table = db_schema.tables.get(table_name).ok_or_else(|| {
DataFusionError::Execution(format!(
"table {} not found in db {}",
table_name, database_name
))
})?;
let chunk: Arc<dyn QueryChunk> = Arc::new(BufferChunk {
batches: vec![batch],
schema: schema.clone(),
stats: Arc::new(batch_stats),
partition_id: TransitionPartitionId::new(
TableId::new(0),
&table_buffer.segment_key,
),
sort_key: None,
id: ChunkId::new(),
chunk_order: ChunkOrder::new(0),
});
table.schema.clone()
};
chunk
})
.collect();
let segment_state = self.segment_state.read();
let mut chunks =
segment_state.get_table_chunks(db_schema, table_name, filters, projection, ctx)?;
let parquet_files = segment_state.get_parquet_files(database_name, table_name);
let mut chunk_order = chunks.len() as i64;
let object_store_url = self.persister.object_store_url();
for parquet_file in parquet_files {
// TODO: update persisted segments to serialize their key to use here
let partition_key = data_types::PartitionKey::from(parquet_file.path.clone());
let partition_id = data_types::partition::TransitionPartitionId::new(
data_types::TableId::new(0),
&partition_key,
);
let chunk_stats = create_chunk_statistics(
Some(parquet_file.row_count as usize),
&table_schema,
Some(parquet_file.timestamp_min_max()),
None,
);
let location = ObjPath::from(parquet_file.path.clone());
let parquet_exec = ParquetExecInput {
object_store_url: object_store_url.clone(),
object_meta: ObjectMeta {
location,
last_modified: Default::default(),
size: parquet_file.size_bytes as usize,
e_tag: None,
version: None,
},
};
let parquet_chunk = ParquetChunk {
schema: table_schema.clone(),
stats: Arc::new(chunk_stats),
partition_id,
sort_key: None,
id: ChunkId::new(),
chunk_order: ChunkOrder::new(chunk_order),
parquet_exec,
};
chunk_order += 1;
chunks.push(Arc::new(parquet_chunk));
}
Ok(chunks)
}
fn clone_table_buffers(&self, database_name: &str, table_name: &str) -> Vec<TableBuffer> {
let state = self.segment_state.read();
state.clone_table_buffers(database_name, table_name)
}
#[cfg(test)]
fn get_table_record_batches(&self, datbase_name: &str, table_name: &str) -> Vec<RecordBatch> {
fn get_table_record_batches(
&self,
datbase_name: &str,
table_name: &str,
) -> Vec<arrow::record_batch::RecordBatch> {
let db_schema = self.catalog.db_schema(datbase_name).unwrap();
let table = db_schema.tables.get(table_name).unwrap();
let schema = table.schema.clone();
let table_buffer = self.clone_table_buffers(datbase_name, table_name);
table_buffer
let table_buffers = self
.segment_state
.read()
.clone_table_buffers(datbase_name, table_name);
table_buffers
.into_iter()
.map(|table_buffer| table_buffer.rows_to_record_batch(&schema, table.columns()))
.collect()
@ -260,7 +292,7 @@ impl<W: Wal, T: TimeProvider> WriteBufferImpl<W, T> {
}
#[async_trait]
impl<W: Wal, T: TimeProvider> Bufferer for WriteBufferImpl<W, T> {
impl<W: Wal, T: TimeProvider, P: Persister> Bufferer for WriteBufferImpl<W, T, P> {
async fn write_lp(
&self,
database: NamespaceName<'static>,
@ -294,7 +326,7 @@ impl<W: Wal, T: TimeProvider> Bufferer for WriteBufferImpl<W, T> {
}
}
impl<W: Wal, T: TimeProvider> ChunkContainer for WriteBufferImpl<W, T> {
impl<W: Wal, T: TimeProvider, P: Persister> ChunkContainer for WriteBufferImpl<W, T, P> {
fn get_table_chunks(
&self,
database_name: &str,
@ -307,67 +339,7 @@ impl<W: Wal, T: TimeProvider> ChunkContainer for WriteBufferImpl<W, T> {
}
}
impl<W: Wal, T: TimeProvider> WriteBuffer for WriteBufferImpl<W, T> {}
#[derive(Debug)]
pub struct BufferChunk {
batches: Vec<RecordBatch>,
schema: Schema,
stats: Arc<Statistics>,
partition_id: data_types::partition::TransitionPartitionId,
sort_key: Option<SortKey>,
id: data_types::ChunkId,
chunk_order: data_types::ChunkOrder,
}
impl QueryChunk for BufferChunk {
fn stats(&self) -> Arc<Statistics> {
info!("BufferChunk stats {}", self.id);
Arc::clone(&self.stats)
}
fn schema(&self) -> &Schema {
info!("BufferChunk schema {}", self.id);
&self.schema
}
fn partition_id(&self) -> &data_types::partition::TransitionPartitionId {
info!("BufferChunk partition_id {}", self.id);
&self.partition_id
}
fn sort_key(&self) -> Option<&SortKey> {
info!("BufferChunk sort_key {}", self.id);
self.sort_key.as_ref()
}
fn id(&self) -> data_types::ChunkId {
info!("BufferChunk id {}", self.id);
self.id
}
fn may_contain_pk_duplicates(&self) -> bool {
false
}
fn data(&self) -> QueryChunkData {
info!("BufferChunk data {}", self.id);
QueryChunkData::in_mem(self.batches.clone(), Arc::clone(self.schema.inner()))
}
fn chunk_type(&self) -> &str {
"BufferChunk"
}
fn order(&self) -> data_types::ChunkOrder {
info!("BufferChunk order {}", self.id);
self.chunk_order
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl<W: Wal, T: TimeProvider, P: Persister> WriteBuffer for WriteBufferImpl<W, T, P> {}
pub(crate) fn parse_validate_and_update_catalog(
db_name: NamespaceName<'static>,
@ -750,7 +722,10 @@ mod tests {
use crate::persister::PersisterImpl;
use crate::wal::WalImpl;
use crate::{SequenceNumber, WalOpBatch};
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_eq;
use datafusion_util::config::register_iox_object_store;
use iox_query::exec::IOxSessionContext;
use iox_time::{MockProvider, Time};
use object_store::memory::InMemory;
use object_store::ObjectStore;
@ -846,4 +821,143 @@ mod tests {
let actual = write_buffer.get_table_record_batches("foo", "cpu");
assert_batches_eq!(&expected, &actual);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn returns_chunks_across_buffered_persisted_and_persisting_data() {
let dir = test_helpers::tmp_dir().unwrap().into_path();
let wal = Some(Arc::new(WalImpl::new(dir.clone()).unwrap()));
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let segment_duration = SegmentDuration::new_5m();
let write_buffer = WriteBufferImpl::new(
Arc::clone(&persister),
wal.clone(),
Arc::clone(&time_provider),
segment_duration,
)
.await
.unwrap();
let session_context = IOxSessionContext::with_testing();
let runtime_env = session_context.inner().runtime_env();
register_iox_object_store(runtime_env, "influxdb3", Arc::clone(&object_store));
let _ = write_buffer
.write_lp(
NamespaceName::new("foo").unwrap(),
"cpu bar=1 10",
Time::from_timestamp_nanos(123),
false,
Precision::Nanosecond,
)
.await
.unwrap();
let expected = vec![
"+-----+--------------------------------+",
"| bar | time |",
"+-----+--------------------------------+",
"| 1.0 | 1970-01-01T00:00:00.000000010Z |",
"+-----+--------------------------------+",
];
let actual = get_table_batches(&write_buffer, "foo", "cpu", &session_context).await;
assert_batches_eq!(&expected, &actual);
// advance the time and wait for it to persist
time_provider.set(Time::from_timestamp(800, 0).unwrap());
loop {
let segment_state = write_buffer.segment_state.read();
if !segment_state.persisted_segments().is_empty() {
break;
}
}
// nothing should be open at this point
assert!(write_buffer
.segment_state
.read()
.open_segment_times()
.is_empty());
// verify we get the persisted data
let actual = get_table_batches(&write_buffer, "foo", "cpu", &session_context).await;
assert_batches_eq!(&expected, &actual);
// now write some into the next segment we're in and verify we get both buffer and persisted
let _ = write_buffer
.write_lp(
NamespaceName::new("foo").unwrap(),
"cpu bar=2",
Time::from_timestamp(900, 0).unwrap(),
false,
Precision::Nanosecond,
)
.await
.unwrap();
let expected = vec![
"+-----+--------------------------------+",
"| bar | time |",
"+-----+--------------------------------+",
"| 2.0 | 1970-01-01T00:15:00Z |",
"| 1.0 | 1970-01-01T00:00:00.000000010Z |",
"+-----+--------------------------------+",
];
let actual = get_table_batches(&write_buffer, "foo", "cpu", &session_context).await;
assert_batches_eq!(&expected, &actual);
// and now reload the buffer and verify that we get persisted and the buffer again
let write_buffer = WriteBufferImpl::new(
Arc::clone(&persister),
wal,
Arc::clone(&time_provider),
segment_duration,
)
.await
.unwrap();
let actual = get_table_batches(&write_buffer, "foo", "cpu", &session_context).await;
assert_batches_eq!(&expected, &actual);
// and now add to the buffer and verify that we still only get two chunks
let _ = write_buffer
.write_lp(
NamespaceName::new("foo").unwrap(),
"cpu bar=3",
Time::from_timestamp(950, 0).unwrap(),
false,
Precision::Nanosecond,
)
.await
.unwrap();
let expected = vec![
"+-----+--------------------------------+",
"| bar | time |",
"+-----+--------------------------------+",
"| 2.0 | 1970-01-01T00:15:00Z |",
"| 3.0 | 1970-01-01T00:15:50Z |",
"| 1.0 | 1970-01-01T00:00:00.000000010Z |",
"+-----+--------------------------------+",
];
let actual = get_table_batches(&write_buffer, "foo", "cpu", &session_context).await;
assert_batches_eq!(&expected, &actual);
}
async fn get_table_batches(
write_buffer: &WriteBufferImpl<WalImpl, MockProvider, PersisterImpl>,
database_name: &str,
table_name: &str,
ctx: &IOxSessionContext,
) -> Vec<RecordBatch> {
let chunks = write_buffer
.get_table_chunks(database_name, table_name, &[], None, &ctx.inner().state())
.unwrap();
let mut batches = vec![];
for chunk in chunks {
let chunk = chunk
.data()
.read_to_batches(chunk.schema(), ctx.inner())
.await;
batches.extend(chunk);
}
batches
}
}


@ -1,14 +1,21 @@
//! State for the write buffer segments.
use crate::catalog::Catalog;
use crate::catalog::{Catalog, DatabaseSchema};
use crate::chunk::BufferChunk;
use crate::wal::WalSegmentWriterNoopImpl;
use crate::write_buffer::buffer_segment::{
ClosedBufferSegment, OpenBufferSegment, TableBuffer, WriteBatch,
};
use crate::{
persister, wal, write_buffer, PersistedSegment, Persister, SegmentDuration, SegmentId,
SegmentRange, Wal, WalOp,
persister, wal, write_buffer, ParquetFile, PersistedSegment, Persister, SegmentDuration,
SegmentId, SegmentRange, Wal, WalOp,
};
use data_types::{ChunkId, ChunkOrder, TableId, TransitionPartitionId};
use datafusion::common::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::Expr;
use iox_query::chunk_statistics::create_chunk_statistics;
use iox_query::QueryChunk;
use iox_time::{Time, TimeProvider};
use observability_deps::tracing::error;
use parking_lot::RwLock;
@ -36,6 +43,7 @@ pub(crate) struct SegmentState<T, W> {
}
impl<T: TimeProvider, W: Wal> SegmentState<T, W> {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
segment_duration: SegmentDuration,
last_segment_id: SegmentId,
@ -43,6 +51,7 @@ impl<T: TimeProvider, W: Wal> SegmentState<T, W> {
time_provider: Arc<T>,
open_segments: Vec<OpenBufferSegment>,
persisting_segments: Vec<ClosedBufferSegment>,
persisted_segments: Vec<PersistedSegment>,
wal: Option<Arc<W>>,
) -> Self {
let mut segments = BTreeMap::new();
@ -55,6 +64,14 @@ impl<T: TimeProvider, W: Wal> SegmentState<T, W> {
persisting_segments_map.insert(segment.segment_range.start_time, Arc::new(segment));
}
let mut persisted_segments_map = BTreeMap::new();
for segment in persisted_segments {
persisted_segments_map.insert(
Time::from_timestamp_nanos(segment.segment_min_time),
Arc::new(segment),
);
}
Self {
segment_duration,
last_segment_id,
@ -63,7 +80,7 @@ impl<T: TimeProvider, W: Wal> SegmentState<T, W> {
wal,
segments,
persisting_segments: persisting_segments_map,
persisted_segments: BTreeMap::new(),
persisted_segments: persisted_segments_map,
}
}
@ -85,6 +102,81 @@ impl<T: TimeProvider, W: Wal> SegmentState<T, W> {
segment.buffer_writes(write_batch)
}
pub(crate) fn get_table_chunks(
&self,
db_schema: Arc<DatabaseSchema>,
table_name: &str,
_filters: &[Expr],
_projection: Option<&Vec<usize>>,
_ctx: &SessionState,
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
let table = db_schema
.tables
.get(table_name)
.ok_or_else(|| DataFusionError::Execution(format!("table {} not found", table_name)))?;
let schema = table.schema.clone();
let mut table_buffers = self.clone_table_buffers(&db_schema.name, table_name);
table_buffers.extend(
self.persisting_segments
.values()
.filter_map(|segment| segment.table_buffer(&db_schema.name, table_name))
.collect::<Vec<_>>(),
);
let mut chunk_order = 0;
let chunks = table_buffers
.into_iter()
.map(|table_buffer| {
let batch = table_buffer.rows_to_record_batch(&schema, table.columns());
let batch_stats = create_chunk_statistics(
Some(table_buffer.row_count()),
&schema,
Some(table_buffer.timestamp_min_max()),
None,
);
let chunk: Arc<dyn QueryChunk> = Arc::new(BufferChunk {
batches: vec![batch],
schema: schema.clone(),
stats: Arc::new(batch_stats),
partition_id: TransitionPartitionId::new(
TableId::new(0),
&table_buffer.segment_key,
),
sort_key: None,
id: ChunkId::new(),
chunk_order: ChunkOrder::new(chunk_order),
});
chunk_order += 1;
chunk
})
.collect();
Ok(chunks)
}
pub(crate) fn get_parquet_files(
&self,
database_name: &str,
table_name: &str,
) -> Vec<ParquetFile> {
let mut parquet_files = vec![];
for segment in self.persisted_segments.values() {
segment.databases.get(database_name).map(|db| {
db.tables.get(table_name).map(|table| {
parquet_files.extend(table.parquet_files.clone());
})
});
}
parquet_files
}
pub(crate) fn clone_table_buffers(
&self,
database_name: &str,
@ -96,6 +188,16 @@ impl<T: TimeProvider, W: Wal> SegmentState<T, W> {
.collect::<Vec<_>>()
}
#[cfg(test)]
pub(crate) fn persisted_segments(&self) -> Vec<Arc<PersistedSegment>> {
self.persisted_segments.values().cloned().collect()
}
#[cfg(test)]
pub(crate) fn open_segment_times(&self) -> Vec<Time> {
self.segments.keys().cloned().collect()
}
#[allow(dead_code)]
pub(crate) fn segment_for_time(&self, time: Time) -> Option<&OpenBufferSegment> {
self.segments.get(&time)
@ -364,6 +466,7 @@ mod tests {
Arc::clone(&time_provider),
vec![open_segment1, open_segment2, open_segment3],
vec![],
vec![],
None,
);
@ -443,6 +546,7 @@ mod tests {
Arc::clone(&time_provider),
vec![open_segment2, open_segment3],
vec![open_segment1.into_closed_segment(Arc::clone(&catalog))],
vec![],
Some(Arc::clone(&wal)),
);
let segment_state = Arc::new(RwLock::new(segment_state));