fix: query bugs with buffer (#25213)

* fix: query bugs with buffer

This fixes three different bugs with the buffer. The first was that aggregations would fail because the projection was pushed down to the in-buffer data before de-duplication had been run on it. The test in influxdb3/tests/server/query.rs catches that.
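To illustrate why the projection must be ignored at the buffer layer, here is a toy sketch (hypothetical types, std-only, not the buffer's actual code): de-duplication is keyed on the series tags and timestamp, so those columns must survive until after dedupe runs, even when the query only projects a single field such as `count(usage)`.

    use std::collections::BTreeMap;

    #[derive(Clone)]
    struct Row {
        host: &'static str, // tag
        time: i64,
        usage: f64,
    }

    // last-write-wins de-duplication keyed on (tag, time)
    fn dedupe(rows: &[Row]) -> Vec<Row> {
        let mut latest: BTreeMap<(&'static str, i64), Row> = BTreeMap::new();
        for r in rows {
            latest.insert((r.host, r.time), r.clone());
        }
        latest.into_values().collect()
    }

    fn main() {
        // the same series + timestamp written twice: dedupe must collapse to one row
        let rows = vec![
            Row { host: "s1", time: 1, usage: 0.9 },
            Row { host: "s1", time: 1, usage: 0.95 },
        ];
        // correct: dedupe on the full rows, then project/aggregate
        assert_eq!(dedupe(&rows).len(), 1);
        // broken: projecting down to just `usage` first leaves two
        // indistinguishable values and no key columns to dedupe on,
        // so an aggregation like count() would see 2 rows instead of 1
        let projected: Vec<f64> = rows.iter().map(|r| r.usage).collect();
        assert_eq!(projected.len(), 2);
    }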

I also added a test in write_buffer/mod.rs to ensure that data is correctly queryable when combining three different states: only data in the buffer, only data in parquet files, and data across both. This surfaced two bugs: one where the parquet data was being doubled up (parquet chunks were being created both in write_buffer/mod.rs and in the queryable buffer), and another where the timestamp min/max on the table buffer would panic if the buffer was empty.
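The empty-buffer panic is the classic fold-over-nothing pitfall; a minimal std-only reproduction (the Vec stands in for the chunk map, and the fix in table_buffer.rs below special-cases the empty buffer to (0, 0)):

    fn main() {
        let chunks: Vec<(i64, i64)> = vec![]; // (timestamp_min, timestamp_max) per chunk
        let (min, max) = chunks
            .iter()
            .fold((i64::MAX, i64::MIN), |(acc_min, acc_max), &(c_min, c_max)| {
                (acc_min.min(c_min), acc_max.max(c_max))
            });
        // with zero chunks the initial accumulator is returned untouched,
        // an inverted range, so constructing a TimestampMinMax from it panics
        assert!(min > max);
    }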

* refactor: PR feedback

* fix: fix wal replay and buffer snapshot

Fixes two problems uncovered by adding to the write_buffer/mod.rs test: it ensures we can replay WAL data and that snapshots work properly with replayed data.
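For context, a sketch of the sequence-number bookkeeping the replay fix relies on (toy types mirroring WalFileSequenceNumber; next() is assumed to increment by one, which is what the diff below implies):

    #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
    struct WalFileSequenceNumber(u64);

    impl WalFileSequenceNumber {
        fn next(&self) -> Self {
            Self(self.0 + 1)
        }
    }

    fn main() {
        // startup: seed from the last persisted snapshot, or Default (0) on a
        // fresh server, so the first WAL file written is number 1 either way
        let last_snapshot: Option<WalFileSequenceNumber> = Some(WalFileSequenceNumber(4));
        let mut next = last_snapshot.unwrap_or_default().next();
        assert_eq!(next, WalFileSequenceNumber(5));

        // replay: every replayed WAL file pushes the next number past itself,
        // so new writes never reuse a replayed file's number
        for replayed in [WalFileSequenceNumber(5), WalFileSequenceNumber(6)] {
            next = replayed.next();
        }
        assert_eq!(next, WalFileSequenceNumber(7));
    }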

* fix: run cargo update to fix audit
Paul Dix 2024-08-07 16:00:17 -04:00 committed by GitHub
parent 29d3a28a9c
commit 43877beb15
7 changed files with 331 additions and 108 deletions

Cargo.lock
@@ -765,9 +765,9 @@ dependencies = [

 [[package]]
 name = "cc"
-version = "1.1.7"
+version = "1.1.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "26a5c3fd7bfa1ce3897a3a3501d362b2d87b7f2583ebcb4a949ec25911025cbc"
+checksum = "504bdec147f2cc13c8b57ed9401fd8a147cc66b67ad5cb241394244f2c947549"
 dependencies = [
  "jobserver",
  "libc",
@@ -3446,9 +3446,9 @@ dependencies = [

 [[package]]
 name = "object"
-version = "0.36.2"
+version = "0.36.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3f203fa8daa7bb185f760ae12bd8e097f63d17041dcdcaf675ac54cdf863170e"
+checksum = "27b64972346851a39438c60b341ebc01bba47464ae329e55cf343eb93964efd9"
 dependencies = [
  "memchr",
 ]
@@ -5286,15 +5286,15 @@ dependencies = [

 [[package]]
 name = "tempfile"
-version = "3.11.0"
+version = "3.12.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b8fcd239983515c23a32fb82099f97d0b11b8c72f654ed659363a95c3dad7a53"
+checksum = "04cbcdd0c794ebb0d4cf35e88edd2f7d2c4c3e9a5a6dab322839b321c6a87a64"
 dependencies = [
  "cfg-if",
  "fastrand",
  "once_cell",
  "rustix",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]

 [[package]]


@@ -20,35 +20,39 @@ async fn api_v3_query_sql() {
         .await
         .unwrap();
     let client = reqwest::Client::new();

+    let test_cases = [
+        TestCase {
+            database: Some("foo"),
+            query: "SELECT host, region, time, usage FROM cpu",
+            expected: "+------+---------+-------------------------------+-------+\n\
+            | host | region  | time                          | usage |\n\
+            +------+---------+-------------------------------+-------+\n\
+            | s1   | us-east | 1970-01-01T00:00:00.000000001 | 0.9   |\n\
+            | s1   | us-east | 1970-01-01T00:00:00.000000002 | 0.89  |\n\
+            | s1   | us-east | 1970-01-01T00:00:00.000000003 | 0.85  |\n\
+            +------+---------+-------------------------------+-------+",
+        },
+        TestCase {
+            database: Some("foo"),
+            query: "SELECT count(usage) FROM cpu",
+            expected: "+------------------+\n\
+            | count(cpu.usage) |\n\
+            +------------------+\n\
+            | 3                |\n\
+            +------------------+",
+        },
+    ];
-    let resp = client
-        .get(format!(
-            "{base}/api/v3/query_sql",
-            base = server.client_addr()
-        ))
-        .query(&[
-            ("db", "foo"),
-            ("q", "SELECT host, region, time, usage FROM cpu"),
-            ("format", "pretty"),
-        ])
-        .send()
-        .await
-        .unwrap()
-        .text()
-        .await
-        .unwrap();
-    assert_eq!(
-        "+------+---------+-------------------------------+-------+\n\
-        | host | region  | time                          | usage |\n\
-        +------+---------+-------------------------------+-------+\n\
-        | s1   | us-east | 1970-01-01T00:00:00.000000001 | 0.9   |\n\
-        | s1   | us-east | 1970-01-01T00:00:00.000000002 | 0.89  |\n\
-        | s1   | us-east | 1970-01-01T00:00:00.000000003 | 0.85  |\n\
-        +------+---------+-------------------------------+-------+",
-        resp,
-    );
+    for t in test_cases {
+        let mut params = vec![("q", t.query), ("format", "pretty")];
+        if let Some(db) = t.database {
+            params.push(("db", db))
+        }
+        let resp = server.api_v3_query_sql(&params).await.text().await.unwrap();
+        println!("\n{q}", q = t.query);
+        println!("{resp}");
+        assert_eq!(t.expected, resp, "query failed: {q}", q = t.query);
+    }
 }

 #[tokio::test]
@@ -203,12 +207,6 @@ async fn api_v3_query_influxql() {
         .await
         .unwrap();

-    struct TestCase<'a> {
-        database: Option<&'a str>,
-        query: &'a str,
-        expected: &'a str,
-    }
-
     let test_cases = [
         TestCase {
             database: Some("foo"),
@@ -390,6 +388,13 @@ async fn api_v3_query_influxql() {
     }
 }

+#[cfg(test)]
+struct TestCase<'a> {
+    database: Option<&'a str>,
+    query: &'a str,
+    expected: &'a str,
+}
+
 #[tokio::test]
 async fn api_v3_query_influxql_params() {
     let server = TestServer::spawn().await;

@@ -388,7 +388,9 @@ impl WalContents {
     }
 }

-#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
+#[derive(
+    Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
+)]
 pub struct WalFileSequenceNumber(u64);

 impl WalFileSequenceNumber {
@@ -405,12 +407,6 @@ impl WalFileSequenceNumber {
     }
 }

-impl Default for WalFileSequenceNumber {
-    fn default() -> Self {
-        Self(1)
-    }
-}
-
 /// Details about a snapshot of the WAL
 #[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
 pub struct SnapshotDetails {


@@ -31,9 +31,15 @@ impl WalObjectStore {
         object_store: Arc<dyn ObjectStore>,
         file_notifier: Arc<dyn WalFileNotifier>,
         config: WalConfig,
+        last_snapshot_wal_sequence: Option<WalFileSequenceNumber>,
     ) -> Result<Arc<Self>, crate::Error> {
         let flush_interval = config.flush_interval;
-        let wal = Self::new_without_replay(object_store, file_notifier, config);
+        let wal = Self::new_without_replay(
+            object_store,
+            file_notifier,
+            config,
+            last_snapshot_wal_sequence,
+        );

         wal.replay().await?;
         let wal = Arc::new(wal);
@@ -46,14 +52,16 @@ impl WalObjectStore {
         object_store: Arc<dyn ObjectStore>,
         file_notifier: Arc<dyn WalFileNotifier>,
         config: WalConfig,
+        last_snapshot_wal_sequence: Option<WalFileSequenceNumber>,
     ) -> Self {
+        let wal_file_sequence_number = last_snapshot_wal_sequence.unwrap_or_default().next();
         Self {
             object_store,
             file_notifier,
             flush_buffer: Mutex::new(FlushBuffer::new(
                 WalBuffer {
                     is_shutdown: false,
-                    wal_file_sequence_number: Default::default(),
+                    wal_file_sequence_number,
                     op_limit: config.max_write_buffer_size,
                     op_count: 0,
                     database_to_write_batch: Default::default(),
@@ -79,8 +87,7 @@ impl WalObjectStore {
         self.flush_buffer
             .lock()
             .await
-            .snapshot_tracker
-            .add_wal_period(WalPeriod::new(
+            .replay_wal_period(WalPeriod::new(
                 wal_contents.wal_file_number,
                 Timestamp::new(wal_contents.min_timestamp_ns),
                 Timestamp::new(wal_contents.max_timestamp_ns),
@@ -368,6 +375,11 @@ impl FlushBuffer {
         }
     }

+    fn replay_wal_period(&mut self, wal_period: WalPeriod) {
+        self.wal_buffer.wal_file_sequence_number = wal_period.wal_file_number.next();
+        self.snapshot_tracker.add_wal_period(wal_period);
+    }
+
     /// Converts the wal_buffer into contents and resets it. Returns the channels waiting for
     /// responses. If a snapshot should occur with this flush, a semaphore permit is also returned.
     async fn flush_buffer_into_contents_and_responses(
@@ -564,6 +576,7 @@ mod tests {
             Arc::clone(&object_store),
             Arc::clone(&notifier),
             wal_config,
+            None,
         );

         let db_name: Arc<str> = "db1".into();
@@ -768,6 +781,7 @@ mod tests {
                 flush_interval: Duration::from_millis(10),
                 snapshot_size: 2,
             },
+            None,
         );
         assert_eq!(
             replay_wal.load_existing_wal_file_paths().await.unwrap(),
@@ -905,6 +919,7 @@ mod tests {
             object_store,
             Arc::clone(&replay_notifier),
             wal_config,
+            None,
         );
         assert_eq!(
             replay_wal.load_existing_wal_file_paths().await.unwrap(),
@@ -935,6 +950,7 @@ mod tests {
             Arc::clone(&object_store),
             Arc::clone(&notifier),
             wal_config,
+            None,
         );

         assert!(wal.flush_buffer().await.is_none());


@@ -125,6 +125,9 @@ impl<T: TimeProvider> WriteBufferImpl<T> {
                 .unwrap_or_else(Catalog::new),
         );
         let persisted_snapshots = persister.load_snapshots(1000).await?;
+        let last_snapshot_wal_sequence = persisted_snapshots
+            .first()
+            .map(|s| s.wal_file_sequence_number);
         let persisted_files = Arc::new(PersistedFiles::new_from_persisted_snapshots(
             persisted_snapshots,
         ));
@@ -142,6 +145,7 @@ impl<T: TimeProvider> WriteBufferImpl<T> {
             persister.object_store(),
             Arc::clone(&queryable_buffer) as Arc<dyn WalFileNotifier>,
             wal_config,
+            last_snapshot_wal_sequence,
         )
         .await?;
@@ -684,7 +688,16 @@ mod tests {

     #[tokio::test]
     async fn persists_catalog_on_last_cache_create_and_delete() {
-        let (wbuf, _ctx) = setup(Time::from_timestamp_nanos(0)).await;
+        let (wbuf, _ctx) = setup(
+            Time::from_timestamp_nanos(0),
+            WalConfig {
+                level_0_duration: Duration::from_secs(300),
+                max_write_buffer_size: 100,
+                flush_interval: Duration::from_millis(10),
+                snapshot_size: 1,
+            },
+        )
+        .await;
         let db_name = "db";
         let tbl_name = "table";
         let cache_name = "cache";
@@ -793,6 +806,197 @@ mod tests {
         );
     }

+    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+    async fn returns_chunks_across_parquet_and_buffered_data() {
+        let (write_buffer, session_context) = setup(
+            Time::from_timestamp_nanos(0),
+            WalConfig {
+                level_0_duration: Duration::from_secs(60),
+                max_write_buffer_size: 100,
+                flush_interval: Duration::from_millis(10),
+                snapshot_size: 2,
+            },
+        )
+        .await;
+
+        let _ = write_buffer
+            .write_lp(
+                NamespaceName::new("foo").unwrap(),
+                "cpu bar=1",
+                Time::from_timestamp(10, 0).unwrap(),
+                false,
+                Precision::Nanosecond,
+            )
+            .await
+            .unwrap();
+
+        let expected = [
+            "+-----+----------------------+",
+            "| bar | time                 |",
+            "+-----+----------------------+",
+            "| 1.0 | 1970-01-01T00:00:10Z |",
+            "+-----+----------------------+",
+        ];
+        let actual = get_table_batches(&write_buffer, "foo", "cpu", &session_context).await;
+        assert_batches_eq!(&expected, &actual);
+
+        let _ = write_buffer
+            .write_lp(
+                NamespaceName::new("foo").unwrap(),
+                "cpu bar=2",
+                Time::from_timestamp(65, 0).unwrap(),
+                false,
+                Precision::Nanosecond,
+            )
+            .await
+            .unwrap();
+
+        let expected = [
+            "+-----+----------------------+",
+            "| bar | time                 |",
+            "+-----+----------------------+",
+            "| 1.0 | 1970-01-01T00:00:10Z |",
+            "| 2.0 | 1970-01-01T00:01:05Z |",
+            "+-----+----------------------+",
+        ];
+        let actual = get_table_batches(&write_buffer, "foo", "cpu", &session_context).await;
+        assert_batches_eq!(&expected, &actual);
+
+        // trigger snapshot with a third write, creating parquet files
+        let _ = write_buffer
+            .write_lp(
+                NamespaceName::new("foo").unwrap(),
+                "cpu bar=3 147000000000",
+                Time::from_timestamp(147, 0).unwrap(),
+                false,
+                Precision::Nanosecond,
+            )
+            .await
+            .unwrap();
+
+        // give the snapshot some time to persist in the background
+        let mut ticks = 0;
+        loop {
+            ticks += 1;
+            let persisted = write_buffer.persister.load_snapshots(1000).await.unwrap();
+            if !persisted.is_empty() {
+                assert_eq!(persisted.len(), 1);
+                assert_eq!(persisted[0].min_time, 10000000000);
+                assert_eq!(persisted[0].row_count, 2);
+                break;
+            } else if ticks > 10 {
+                panic!("not persisting");
+            }
+            tokio::time::sleep(Duration::from_millis(10)).await;
+        }
+
+        let expected = [
+            "+-----+----------------------+",
+            "| bar | time                 |",
+            "+-----+----------------------+",
+            "| 3.0 | 1970-01-01T00:02:27Z |",
+            "| 1.0 | 1970-01-01T00:00:10Z |",
+            "| 2.0 | 1970-01-01T00:01:05Z |",
+            "+-----+----------------------+",
+        ];
+        let actual = get_table_batches(&write_buffer, "foo", "cpu", &session_context).await;
+        assert_batches_eq!(&expected, &actual);
+
+        // now validate that buffered data and parquet data are all returned
+        let _ = write_buffer
+            .write_lp(
+                NamespaceName::new("foo").unwrap(),
+                "cpu bar=4",
+                Time::from_timestamp(250, 0).unwrap(),
+                false,
+                Precision::Nanosecond,
+            )
+            .await
+            .unwrap();
+
+        let expected = [
+            "+-----+----------------------+",
+            "| bar | time                 |",
+            "+-----+----------------------+",
+            "| 3.0 | 1970-01-01T00:02:27Z |",
+            "| 4.0 | 1970-01-01T00:04:10Z |",
+            "| 1.0 | 1970-01-01T00:00:10Z |",
+            "| 2.0 | 1970-01-01T00:01:05Z |",
+            "+-----+----------------------+",
+        ];
+        let actual = get_table_batches(&write_buffer, "foo", "cpu", &session_context).await;
+        assert_batches_eq!(&expected, &actual);
+        let actual = get_table_batches(&write_buffer, "foo", "cpu", &session_context).await;
+        assert_batches_eq!(&expected, &actual);
+
+        // and now replay in a new write buffer and attempt to write
+        let write_buffer = WriteBufferImpl::new(
+            Arc::clone(&write_buffer.persister),
+            Arc::clone(&write_buffer.time_provider),
+            write_buffer.level_0_duration,
+            Arc::clone(&write_buffer.buffer.executor),
+            WalConfig {
+                level_0_duration: Duration::from_secs(60),
+                max_write_buffer_size: 100,
+                flush_interval: Duration::from_millis(10),
+                snapshot_size: 2,
+            },
+        )
+        .await
+        .unwrap();
+        let ctx = IOxSessionContext::with_testing();
+        let runtime_env = ctx.inner().runtime_env();
+        register_iox_object_store(
+            runtime_env,
+            "influxdb3",
+            write_buffer.persister.object_store(),
+        );
+
+        // verify the data is still there
+        let actual = get_table_batches(&write_buffer, "foo", "cpu", &ctx).await;
+        assert_batches_eq!(&expected, &actual);
+
+        // now write some new data
+        let _ = write_buffer
+            .write_lp(
+                NamespaceName::new("foo").unwrap(),
+                "cpu bar=5",
+                Time::from_timestamp(300, 0).unwrap(),
+                false,
+                Precision::Nanosecond,
+            )
+            .await
+            .unwrap();
+
+        // and write more to force another snapshot
+        let _ = write_buffer
+            .write_lp(
+                NamespaceName::new("foo").unwrap(),
+                "cpu bar=6",
+                Time::from_timestamp(330, 0).unwrap(),
+                false,
+                Precision::Nanosecond,
+            )
+            .await
+            .unwrap();
+
+        let expected = [
+            "+-----+----------------------+",
+            "| bar | time                 |",
+            "+-----+----------------------+",
+            "| 5.0 | 1970-01-01T00:05:00Z |",
+            "| 6.0 | 1970-01-01T00:05:30Z |",
+            "| 1.0 | 1970-01-01T00:00:10Z |",
+            "| 2.0 | 1970-01-01T00:01:05Z |",
+            "| 3.0 | 1970-01-01T00:02:27Z |",
+            "| 4.0 | 1970-01-01T00:04:10Z |",
+            "+-----+----------------------+",
+        ];
+        let actual = get_table_batches(&write_buffer, "foo", "cpu", &ctx).await;
+        assert_batches_eq!(&expected, &actual);
+    }
+
     async fn fetch_catalog_as_json(object_store: Arc<dyn ObjectStore>) -> serde_json::Value {
         let mut list = object_store.list(Some(&CatalogFilePath::dir()));
         let Some(item) = list.next().await else {
@@ -809,7 +1013,10 @@ mod tests {
             .expect("parse bytes as JSON")
     }

-    async fn setup(start: Time) -> (WriteBufferImpl<MockProvider>, IOxSessionContext) {
+    async fn setup(
+        start: Time,
+        wal_config: WalConfig,
+    ) -> (WriteBufferImpl<MockProvider>, IOxSessionContext) {
         let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
         let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
         let time_provider = Arc::new(MockProvider::new(start));
@@ -818,12 +1025,7 @@ mod tests {
             Arc::clone(&time_provider),
             Level0Duration::new_5m(),
             crate::test_help::make_exec(),
-            WalConfig {
-                level_0_duration: Duration::from_secs(300),
-                max_write_buffer_size: 100,
-                flush_interval: Duration::from_millis(10),
-                snapshot_size: 1,
-            },
+            wal_config,
         )
         .await
         .unwrap();
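One pattern worth noting in the new test above: it waits for the background snapshot with a bounded poll rather than a fixed sleep, since persistence happens on a background task. A generic sketch of that pattern (assumes a tokio runtime; `wait_until` is a hypothetical helper, not part of this codebase):

    use std::time::Duration;

    // hypothetical helper: retry a condition a bounded number of times,
    // panicking if it never becomes true
    async fn wait_until(mut condition: impl FnMut() -> bool, max_attempts: u32) {
        for _ in 0..max_attempts {
            if condition() {
                return;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        panic!("condition was not met in time");
    }

    #[tokio::main]
    async fn main() {
        let mut polls = 0;
        // stand-in for "persister.load_snapshots(..) returned something"
        wait_until(
            || {
                polls += 1;
                polls > 3
            },
            10,
        )
        .await;
    }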


@@ -2,11 +2,9 @@ use crate::chunk::BufferChunk;
 use crate::last_cache::LastCacheProvider;
 use crate::paths::ParquetFilePath;
 use crate::persister::PersisterImpl;
-use crate::write_buffer::parquet_chunk_from_file;
 use crate::write_buffer::persisted_files::PersistedFiles;
 use crate::write_buffer::table_buffer::TableBuffer;
 use crate::{persister, write_buffer, ParquetFile, PersistedSnapshot, Persister};
-use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
 use data_types::{
@@ -36,7 +34,7 @@ use tokio::sync::oneshot::Receiver;

 #[derive(Debug)]
 pub(crate) struct QueryableBuffer {
-    executor: Arc<Executor>,
+    pub(crate) executor: Arc<Executor>,
     catalog: Arc<Catalog>,
     last_cache_provider: Arc<LastCacheProvider>,
     persister: Arc<PersisterImpl>,
@@ -68,7 +66,7 @@ impl QueryableBuffer {
         db_schema: Arc<DatabaseSchema>,
         table_name: &str,
         filters: &[Expr],
-        projection: Option<&Vec<usize>>,
+        _projection: Option<&Vec<usize>>,
         _ctx: &SessionState,
     ) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
         let table = db_schema
@@ -76,31 +74,11 @@ impl QueryableBuffer {
             .get(table_name)
             .ok_or_else(|| DataFusionError::Execution(format!("table {} not found", table_name)))?;

-        let arrow_schema: SchemaRef = match projection {
-            Some(projection) => Arc::new(table.schema.as_arrow().project(projection).unwrap()),
-            None => table.schema.as_arrow(),
-        };
-        let schema = schema::Schema::try_from(Arc::clone(&arrow_schema))
-            .map_err(|e| DataFusionError::Execution(format!("schema error {}", e)))?;
+        let schema = table.schema.clone();
+        let arrow_schema = schema.as_arrow();

         let mut chunks: Vec<Arc<dyn QueryChunk>> = vec![];
-        for parquet_file in self.persisted_files.get_files(&db_schema.name, table_name) {
-            let parquet_chunk = parquet_chunk_from_file(
-                &parquet_file,
-                &schema,
-                self.persister.object_store_url(),
-                self.persister.object_store(),
-                chunks
-                    .len()
-                    .try_into()
-                    .expect("should never have this many chunks"),
-            );
-            chunks.push(Arc::new(parquet_chunk));
-        }

         let buffer = self.buffer.read();
         let table_buffer = buffer
@@ -172,19 +150,13 @@ impl QueryableBuffer {
         write: WalContents,
         snapshot_details: SnapshotDetails,
     ) -> Receiver<SnapshotDetails> {
+        info!(
+            ?snapshot_details,
+            "Buffering contents and persisting snapshotted data"
+        );
         let persist_jobs = {
             let mut buffer = self.buffer.write();
-            for op in write.ops {
-                match op {
-                    WalOp::Write(write_batch) => buffer.add_write_batch(write_batch),
-                    WalOp::Catalog(catalog_batch) => buffer
-                        .catalog
-                        .apply_catalog_batch(&catalog_batch)
-                        .expect("catalog batch should apply"),
-                }
-            }
             let mut persisting_chunks = vec![];
             for (database_name, table_map) in buffer.db_to_table.iter_mut() {
                 for (table_name, table_buffer) in table_map.iter_mut() {
@@ -212,6 +184,18 @@ impl QueryableBuffer {
                 }
             }

+            // we must buffer the ops after the snapshotting as this data should not be persisted
+            // with this set of wal files
+            for op in write.ops {
+                match op {
+                    WalOp::Write(write_batch) => buffer.add_write_batch(write_batch),
+                    WalOp::Catalog(catalog_batch) => buffer
+                        .catalog
+                        .apply_catalog_batch(&catalog_batch)
+                        .expect("catalog batch should apply"),
+                }
+            }
+
             persisting_chunks
         };
@@ -241,6 +225,10 @@ impl QueryableBuffer {
                 }
             }

+            info!(
+                "catalog persisted, persisting {} chunks",
+                persist_jobs.len()
+            );
             // persist the individual files, building the snapshot as we go
             let mut persisted_snapshot = PersistedSnapshot::new(wal_file_number);
             for persist_job in persist_jobs {
@@ -392,6 +380,14 @@ where
     // Dedupe and sort using the COMPACT query built into
     // iox_query
     let row_count = persist_job.batch.num_rows();
+    info!(
+        "Persisting {} rows for db {} and table {} and chunk {} to file {}",
+        row_count,
+        persist_job.database_name,
+        persist_job.table_name,
+        persist_job.chunk_time,
+        persist_job.path.to_string()
+    );
     let chunk_stats = create_chunk_statistics(
         Some(row_count),

@@ -11,7 +11,7 @@ use data_types::TimestampMinMax;
 use datafusion::logical_expr::{BinaryExpr, Expr};
 use hashbrown::HashMap;
 use influxdb3_wal::{FieldData, Row};
-use observability_deps::tracing::{debug, error};
+use observability_deps::tracing::{debug, error, info};
 use schema::sort::SortKey;
 use schema::{InfluxColumnType, InfluxFieldType, Schema, SchemaBuilder};
 use std::collections::{BTreeMap, HashSet};
@@ -63,10 +63,6 @@ impl TableBuffer {
     }

     pub fn record_batches(&self, schema: SchemaRef, filter: &[Expr]) -> Result<Vec<RecordBatch>> {
-        println!(
-            "chunk time to chunks: {:?}",
-            self.chunk_time_to_chunks.keys().collect::<Vec<_>>()
-        );
         let mut batches =
             Vec::with_capacity(self.snapshotting_chunks.len() + self.chunk_time_to_chunks.len());
@@ -96,13 +92,16 @@ impl TableBuffer {
     }

     pub fn timestamp_min_max(&self) -> TimestampMinMax {
-        let (min, max) = self
-            .chunk_time_to_chunks
-            .values()
-            .map(|c| (c.timestamp_min, c.timestamp_max))
-            .fold((i64::MAX, i64::MIN), |(a_min, b_min), (a_max, b_max)| {
-                (a_min.min(b_min), a_max.max(b_max))
-            });
+        let (min, max) = if self.chunk_time_to_chunks.is_empty() {
+            (0, 0)
+        } else {
+            self.chunk_time_to_chunks
+                .values()
+                .map(|c| (c.timestamp_min, c.timestamp_max))
+                .fold((i64::MAX, i64::MIN), |(a_min, b_min), (a_max, b_max)| {
+                    (a_min.min(b_min), a_max.max(b_max))
+                })
+        };
         let mut timestamp_min_max = TimestampMinMax::new(min, max);

         for sc in &self.snapshotting_chunks {
@@ -129,6 +128,7 @@ impl TableBuffer {
     }

     pub fn snapshot(&mut self, older_than_chunk_time: i64) -> Vec<SnapshotChunk> {
+        info!(%older_than_chunk_time, "Snapshotting table buffer");
         let keys_to_remove = self
             .chunk_time_to_chunks
             .keys()
@@ -847,4 +847,12 @@ mod tests {
         let size = table_buffer.computed_size();
         assert_eq!(size, 18094);
     }
+
+    #[test]
+    fn timestamp_min_max_works_when_empty() {
+        let table_buffer = TableBuffer::new(&["tag"], SortKey::empty());
+        let timestamp_min_max = table_buffer.timestamp_min_max();
+        assert_eq!(timestamp_min_max.min, 0);
+        assert_eq!(timestamp_min_max.max, 0);
+    }
 }