use std::{collections::HashMap, num::NonZeroUsize, sync::Arc, time::Duration};

use arrow_array::RecordBatch;
use data_types::NamespaceName;
use datafusion::assert_batches_sorted_eq;
use futures::TryStreamExt;
use influxdb3_cache::{distinct_cache::DistinctCacheProvider, last_cache::LastCacheProvider};
use influxdb3_catalog::catalog::Catalog;
use influxdb3_internal_api::query_executor::QueryExecutor;
use influxdb3_server::query_executor::{CreateQueryExecutorArgs, QueryExecutorImpl};
use influxdb3_shutdown::ShutdownManager;
use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{
    Precision, WriteBuffer,
    persister::Persister,
    write_buffer::{WriteBufferImpl, WriteBufferImplArgs},
};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig, PerQueryMemoryPoolConfig};
use iox_time::{MockProvider, Time, TimeProvider};
use metric::Registry;
use object_store::{ObjectStore, memory::InMemory};
use parquet_file::storage::{ParquetStorage, StorageId};

/// Test that when duplicates are written into the write buffer, the query results do not contain
/// duplicate rows
#[test_log::test(tokio::test)]
async fn test_deduplicate_rows_in_write_buffer_memory() {
    let wal_config = WalConfig {
        gen1_duration: Gen1Duration::new_1m(),
        max_write_buffer_size: 100,
        flush_interval: Duration::from_millis(10),
        // use a large snapshot size so that queries are served only by in-memory buffer chunks:
        snapshot_size: 100,
    };
    let service = TestService::setup(wal_config).await;

    // do the same writes every time:
    let writes = [
        TestWrite::lp("bar val=1", 1),
        TestWrite::lp("bar val=2", 2),
        TestWrite::lp("bar val=3", 3),
    ];

    // query results should always be the same:
    let expected = [
        "+---------------------+-----+",
        "| time                | val |",
        "+---------------------+-----+",
        "| 1970-01-01T00:00:01 | 1.0 |",
        "| 1970-01-01T00:00:02 | 2.0 |",
        "| 1970-01-01T00:00:03 | 3.0 |",
        "+---------------------+-----+",
    ];

    // do the writes several times:
    for _ in 0..5 {
        service.do_writes("foo", writes.clone()).await;
        let batches = service.query_sql("foo", "select * from bar").await;
        assert_batches_sorted_eq!(expected, &batches);
    }

    // check the explain plan:
    let batches = service.query_sql("foo", "explain select * from bar").await;
    let plan = arrow::util::pretty::pretty_format_batches(&batches)
        .unwrap()
        .to_string();
    // checking that the DeduplicateExec is above the RecordBatchesExec in the query plan:
    insta::assert_snapshot!(plan);
}

/// Test that when duplicates are written into the write buffer across snapshot points, i.e., such
/// that duplicate rows end up in multiple parquet files, the query results do not contain
/// duplicate rows
#[test_log::test(tokio::test)]
async fn test_deduplicate_rows_in_write_buffer_parquet() {
    let wal_config = WalConfig {
        gen1_duration: Gen1Duration::new_1m(),
        max_write_buffer_size: 100,
        flush_interval: Duration::from_millis(10),
        // use a small snapshot size to get parquet persisted early, and therefore have queries
        // serve chunks from both the in-memory buffer and persisted parquet:
        snapshot_size: 1,
    };
    let service = TestService::setup(wal_config).await;

    // do the same 3 writes every time:
    let writes = [
        TestWrite::lp("bar val=1", 1),
        TestWrite::lp("bar val=2", 2),
        TestWrite::lp("bar val=3", 3),
    ];

    // query results should always be the same:
    let expected = [
        "+---------------------+-----+",
        "| time                | val |",
        "+---------------------+-----+",
        "| 1970-01-01T00:00:01 | 1.0 |",
        "| 1970-01-01T00:00:02 | 2.0 |",
        "| 1970-01-01T00:00:03 | 3.0 |",
        "+---------------------+-----+",
    ];

    // do the writes and query several times:
    for i in 1..5 {
        // write, then wait for a snapshot to ensure parquet is persisted:
        service.do_writes("foo", writes.clone()).await;
        service.wait_for_snapshot_sequence(i).await;

        let batches = service.query_sql("foo", "select * from bar").await;
        assert_batches_sorted_eq!(expected, &batches);
    }

    // check the query plan to ensure that deduplication is taking place:
    let batches = service.query_sql("foo", "explain select * from bar").await;
    let plan = arrow::util::pretty::pretty_format_batches(&batches)
        .unwrap()
        .to_string();
    // look for the `DeduplicateExec` above the `ParquetExec` to ensure that data from multiple
    // parquet files is deduplicated:
    insta::assert_snapshot!(plan);
}
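
/// Test that a duplicate row written after a snapshot, so that it sits in the in-memory buffer
/// while the original sits in persisted parquet, is deduplicated in query results, with the most
/// recently written field value winning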
#[test_log::test(tokio::test)]
async fn test_deduplicate_rows_in_write_buffer_both_memory_and_parquet() {
    let wal_config = WalConfig {
        gen1_duration: Gen1Duration::new_1m(),
        max_write_buffer_size: 100,
        flush_interval: Duration::from_millis(10),
        // use a small snapshot size to get parquet persisted early, and therefore have queries
        // serve chunks from both the in-memory buffer and persisted parquet:
        snapshot_size: 1,
    };
    let service = TestService::setup(wal_config).await;

    service
        .do_writes(
            "foo",
            [
                TestWrite::lp("bar value=false", 1),
                TestWrite::lp("bar value=false", 2),
                TestWrite::lp("bar value=false", 3),
            ],
        )
        .await;

    service.wait_for_snapshot_sequence(1).await;

    let batches = service.query_sql("foo", "select * from bar").await;
    assert_batches_sorted_eq!(
        [
            "+---------------------+-------+",
            "| time                | value |",
            "+---------------------+-------+",
            "| 1970-01-01T00:00:01 | false |", // note that this is `false`
            "| 1970-01-01T00:00:02 | false |",
            "| 1970-01-01T00:00:03 | false |",
            "+---------------------+-------+",
        ],
        &batches
    );

    // write a duplicate row, but don't trigger a snapshot, so the subsequent query will be served
    // by chunks in both the in-memory buffer and persisted parquet; this also flips the `value` to
    // `true` to show that the most recently written line appears in the query result:
    service
        .do_writes("foo", [TestWrite::lp("bar value=true", 1)])
        .await;

    let batches = service.query_sql("foo", "select * from bar").await;
    assert_batches_sorted_eq!(
        [
            "+---------------------+-------+",
            "| time                | value |",
            "+---------------------+-------+",
            "| 1970-01-01T00:00:01 | true  |", // note that this is now `true`
            "| 1970-01-01T00:00:02 | false |",
            "| 1970-01-01T00:00:03 | false |",
            "+---------------------+-------+",
        ],
        &batches
    );

    let batches = service.query_sql("foo", "explain select * from bar").await;
    // There should be a union of a ParquetExec and a RecordBatchesExec feeding into a
    // DeduplicateExec in the query plan, i.e., there are both in-memory buffer chunks and
    // persisted parquet chunks:
    let plan = arrow::util::pretty::pretty_format_batches(&batches)
        .unwrap()
        .to_string();
    insta::assert_snapshot!(plan);
}
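
/// Test fixture that wires a write buffer and query executor together over an in-memory object
/// store, holding on to the pieces the tests need to write line protocol and run SQL queries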
struct TestService {
    query_executor: Arc<dyn QueryExecutor>,
    write_buffer: Arc<dyn WriteBuffer>,
    _time_provider: Arc<dyn TimeProvider>,
    _metrics: Arc<Registry>,
    _object_store: Arc<dyn ObjectStore>,
}

impl TestService {
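    /// Set up the test fixture with the given `WalConfig`, using an in-memory object store and a
    /// mock time provider starting at the Unix epoch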
    async fn setup(wal_config: WalConfig) -> Self {
        let node_id = "test-node";
        let object_store = Arc::new(InMemory::new());
        let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
        let metrics = Arc::new(Registry::new());
        let catalog = Arc::new(
            Catalog::new(
                node_id,
                Arc::clone(&object_store) as _,
                Arc::clone(&time_provider) as _,
                Arc::clone(&metrics),
            )
            .await
            .unwrap(),
        );
        let parquet_store =
            ParquetStorage::new(Arc::clone(&object_store) as _, StorageId::from("influxdb3"));
        let exec = Arc::new(Executor::new_with_config_and_executor(
            ExecutorConfig {
                target_query_partitions: NonZeroUsize::new(1).unwrap(),
                object_stores: [&parquet_store]
                    .into_iter()
                    .map(|store| (store.id(), Arc::clone(store.object_store())))
                    .collect(),
                metric_registry: Arc::clone(&metrics),
                mem_pool_size: usize::MAX,
                per_query_mem_pool_config: PerQueryMemoryPoolConfig::Disabled,
                heap_memory_limit: None,
            },
            DedicatedExecutor::new_testing(),
        ));
        let persister = Arc::new(Persister::new(
            Arc::clone(&object_store) as _,
            node_id,
            Arc::clone(&time_provider) as _,
        ));
        let write_buffer: Arc<dyn WriteBuffer> = WriteBufferImpl::new(WriteBufferImplArgs {
            persister: Arc::clone(&persister),
            catalog: Arc::clone(&catalog),
            last_cache: LastCacheProvider::new_from_catalog(Arc::clone(&catalog))
                .await
                .unwrap(),
            distinct_cache: DistinctCacheProvider::new_from_catalog(
                Arc::clone(&time_provider) as _,
                Arc::clone(&catalog),
            )
            .await
            .unwrap(),
            time_provider: Arc::clone(&time_provider) as _,
            executor: Arc::clone(&exec),
            wal_config,
            parquet_cache: None,
            metric_registry: Arc::clone(&metrics),
            snapshotted_wal_files_to_keep: 100,
            query_file_limit: None,
            shutdown: ShutdownManager::new_testing().register(),
            wal_replay_concurrency_limit: Some(1),
        })
        .await
        .unwrap();
        let sys_events_store = Arc::new(SysEventStore::new(Arc::clone(&time_provider) as _));
        let telemetry_store =
            TelemetryStore::new_without_background_runners(None, Arc::clone(&catalog) as _);
        let mut datafusion_config = HashMap::new();
        datafusion_config.insert(
            "iox.hint_known_object_size_to_object_store".to_string(),
            false.to_string(),
        );
        let query_executor = Arc::new(QueryExecutorImpl::new(CreateQueryExecutorArgs {
            catalog: Arc::clone(&catalog),
            write_buffer: Arc::clone(&write_buffer) as _,
            exec: Arc::clone(&exec),
            metrics: Arc::clone(&metrics),
            datafusion_config: Arc::new(datafusion_config),
            query_log_size: 10,
            telemetry_store: Arc::clone(&telemetry_store),
            sys_events_store: Arc::clone(&sys_events_store),
            started_with_auth: false,
            time_provider: Arc::clone(&time_provider) as _,
        }));

        Self {
            query_executor,
            write_buffer,
            _time_provider: time_provider,
            _metrics: metrics,
            _object_store: object_store,
        }
    }
}
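
/// A single write of line protocol, and the time in seconds at which to write it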
#[derive(Clone)]
struct TestWrite<LP> {
    lp: LP,
    time_seconds: i64,
}

impl<LP> TestWrite<LP>
where
    LP: AsRef<str>,
{
    fn lp(lp: LP, time_seconds: i64) -> Self {
        Self { lp, time_seconds }
    }
}

impl TestService {
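    /// Write each entry in `writes` to the given `db` at that entry's specified time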
    async fn do_writes<LP: AsRef<str>>(
        &self,
        db: &'static str,
        writes: impl IntoIterator<Item = TestWrite<LP>>,
    ) {
        for w in writes {
            self.write_buffer
                .write_lp(
                    NamespaceName::new(db).unwrap(),
                    w.lp.as_ref(),
                    // convert the write's time from seconds to nanoseconds:
                    Time::from_timestamp_nanos(w.time_seconds * 1_000_000_000),
                    false,
                    Precision::Nanosecond,
                    false,
                )
                .await
                .unwrap();
        }
    }
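
    /// Run a SQL query against the given `db` and collect the resulting stream into record
    /// batches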
    async fn query_sql(&self, db: &str, query_str: &str) -> Vec<RecordBatch> {
        let stream = self
            .query_executor
            .query_sql(db, query_str, None, None, None)
            .await
            .unwrap();
        stream.try_collect().await.unwrap()
    }
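
    /// Wait for the WAL's last snapshot sequence number to reach `wait_for`, polling every 10ms
    /// and panicking if it has not been reached after 100 attempts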
    async fn wait_for_snapshot_sequence(&self, wait_for: u64) {
        let mut count = 0;
        loop {
            let last = self
                .write_buffer
                .wal()
                .last_snapshot_sequence_number()
                .await
                .as_u64();
            if last >= wait_for {
                break;
            }
            count += 1;
            tokio::time::sleep(Duration::from_millis(10)).await;
            if count > 100 {
                panic!("waited too long for a snapshot with sequence {wait_for}");
            }
        }
    }
}