feat: rework delete predicate preservation as integration test (#2820)
* feat: rework delete predicate preservation as integration test
* chore: review feedback
* chore: fix lint

pull/24376/head
parent
f7f6965b65
commit
d390dfa280
|
@ -120,8 +120,7 @@ impl RemoteServer for RemoteServerImpl {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) mod test_helpers {
|
||||
pub mod test_helpers {
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
use super::*;
|
||||
|
@ -132,6 +131,12 @@ pub(crate) mod test_helpers {
|
|||
pub remotes: BTreeMap<String, Arc<TestRemoteServer>>,
|
||||
}
|
||||
|
||||
// `Default` simply delegates to `TestConnectionManager::new()`, so `Default::default()` and
// `new()` produce the same value.
impl Default for TestConnectionManager {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl TestConnectionManager {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
|
|
231
server/src/db.rs
231
server/src/db.rs
|
@ -1398,16 +1398,14 @@ mod tests {
|
|||
use arrow::record_batch::RecordBatch;
|
||||
use bytes::Bytes;
|
||||
use futures::{stream, StreamExt, TryStreamExt};
|
||||
use predicate::delete_expr::DeleteExpr;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use ::test_helpers::{assert_contains, maybe_start_logging};
|
||||
use ::test_helpers::assert_contains;
|
||||
use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
|
||||
use data_types::{
|
||||
chunk_metadata::{ChunkAddr, ChunkStorage},
|
||||
database_rules::{LifecycleRules, PartitionTemplate, TemplatePart},
|
||||
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
|
||||
timestamp::TimestampRange,
|
||||
write_summary::TimestampSummary,
|
||||
};
|
||||
use entry::test_helpers::lp_to_entry;
|
||||
|
@ -3563,233 +3561,6 @@ mod tests {
|
|||
join_handle.await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn delete_predicate_preservation() {
|
||||
// Test that delete predicates are stored within the preserved catalog
|
||||
maybe_start_logging();
|
||||
|
||||
// ==================== setup ====================
|
||||
let object_store = Arc::new(ObjectStore::new_in_memory());
|
||||
let server_id = ServerId::try_from(1).unwrap();
|
||||
let db_name = "delete_predicate_preservation_test";
|
||||
|
||||
// ==================== do: create DB ====================
|
||||
// Create a DB given a server id, an object store and a db name
|
||||
let test_db = TestDb::builder()
|
||||
.object_store(Arc::clone(&object_store))
|
||||
.server_id(server_id)
|
||||
.db_name(db_name)
|
||||
.lifecycle_rules(LifecycleRules {
|
||||
catalog_transactions_until_checkpoint: NonZeroU64::try_from(1).unwrap(),
|
||||
// do not prune transactions files because this tests relies on them
|
||||
catalog_transaction_prune_age: Duration::from_secs(1_000),
|
||||
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
|
||||
..Default::default()
|
||||
})
|
||||
.partition_template(PartitionTemplate {
|
||||
parts: vec![TemplatePart::Column("part".to_string())],
|
||||
})
|
||||
.build()
|
||||
.await;
|
||||
let db = Arc::new(test_db.db);
|
||||
|
||||
// ==================== do: create chunks ====================
|
||||
let table_name = "cpu";
|
||||
|
||||
// 1: preserved
|
||||
let partition_key = "part_a";
|
||||
write_lp(&db, "cpu,part=a row=10,selector=0i 10").await;
|
||||
write_lp(&db, "cpu,part=a row=11,selector=1i 11").await;
|
||||
db.persist_partition(table_name, partition_key, true)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// 2: RUB
|
||||
let partition_key = "part_b";
|
||||
write_lp(&db, "cpu,part=b row=20,selector=0i 20").await;
|
||||
write_lp(&db, "cpu,part=b row=21,selector=1i 21").await;
|
||||
db.compact_partition(table_name, partition_key)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// 3: MUB
|
||||
let _partition_key = "part_c";
|
||||
write_lp(&db, "cpu,part=c row=30,selector=0i 30").await;
|
||||
write_lp(&db, "cpu,part=c row=31,selector=1i 31").await;
|
||||
|
||||
// 4: preserved and unloaded
|
||||
let partition_key = "part_d";
|
||||
write_lp(&db, "cpu,part=d row=40,selector=0i 40").await;
|
||||
write_lp(&db, "cpu,part=d row=41,selector=1i 41").await;
|
||||
|
||||
let chunk_id = db
|
||||
.persist_partition(table_name, partition_key, true)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.id();
|
||||
|
||||
db.unload_read_buffer(table_name, partition_key, chunk_id)
|
||||
.unwrap();
|
||||
|
||||
// ==================== do: delete ====================
|
||||
let pred = Arc::new(DeletePredicate {
|
||||
range: TimestampRange {
|
||||
start: 0,
|
||||
end: 1_000,
|
||||
},
|
||||
exprs: vec![DeleteExpr::new(
|
||||
"selector".to_string(),
|
||||
predicate::delete_expr::Op::Eq,
|
||||
predicate::delete_expr::Scalar::I64(1),
|
||||
)],
|
||||
});
|
||||
db.delete("cpu", Arc::clone(&pred)).await.unwrap();
|
||||
|
||||
// ==================== do: preserve another partition ====================
|
||||
let partition_key = "part_b";
|
||||
db.persist_partition(table_name, partition_key, true)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// ==================== do: use background worker for a short while ====================
|
||||
let iters_start = db.worker_iterations_delete_predicate_preservation();
|
||||
let shutdown: CancellationToken = Default::default();
|
||||
let shutdown_captured = shutdown.clone();
|
||||
let db_captured = Arc::clone(&db);
|
||||
let join_handle =
|
||||
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
|
||||
|
||||
let t_0 = Instant::now();
|
||||
loop {
|
||||
let did_delete_predicate_preservation =
|
||||
db.worker_iterations_delete_predicate_preservation() > iters_start;
|
||||
let did_compaction = db.chunk_summaries().unwrap().into_iter().any(|summary| {
|
||||
(summary.partition_key.as_ref() == "part_c")
|
||||
&& (summary.storage == ChunkStorage::ReadBuffer)
|
||||
});
|
||||
if did_delete_predicate_preservation && did_compaction {
|
||||
break;
|
||||
}
|
||||
assert!(t_0.elapsed() < Duration::from_secs(10));
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
|
||||
shutdown.cancel();
|
||||
join_handle.await.unwrap();
|
||||
|
||||
// ==================== check: delete predicates ====================
|
||||
let closure_check_delete_predicates = |db: &Db| {
|
||||
for chunk in db.catalog.chunks() {
|
||||
let chunk = chunk.read();
|
||||
if chunk.addr().partition_key.as_ref() == "part_b" {
|
||||
// Strictly speaking not required because the chunk was persisted AFTER the delete predicate was
|
||||
// registered so we can get away with materializing it during persistence.
|
||||
continue;
|
||||
}
|
||||
if chunk.addr().partition_key.as_ref() == "part_c" {
|
||||
// This partition was compacted, so the delete predicates were materialized.
|
||||
continue;
|
||||
}
|
||||
let predicates = chunk.delete_predicates();
|
||||
assert_eq!(predicates.len(), 1);
|
||||
assert_eq!(predicates[0].as_ref(), pred.as_ref());
|
||||
}
|
||||
};
|
||||
closure_check_delete_predicates(&db);
|
||||
|
||||
// ==================== check: query ====================
|
||||
let expected = vec![
|
||||
"+------+-----+----------+--------------------------------+",
|
||||
"| part | row | selector | time |",
|
||||
"+------+-----+----------+--------------------------------+",
|
||||
"| a | 10 | 0 | 1970-01-01T00:00:00.000000010Z |",
|
||||
"| b | 20 | 0 | 1970-01-01T00:00:00.000000020Z |",
|
||||
"| c | 30 | 0 | 1970-01-01T00:00:00.000000030Z |",
|
||||
"| d | 40 | 0 | 1970-01-01T00:00:00.000000040Z |",
|
||||
"+------+-----+----------+--------------------------------+",
|
||||
];
|
||||
let batches = run_query(Arc::clone(&db), "select * from cpu order by time").await;
|
||||
assert_batches_sorted_eq!(&expected, &batches);
|
||||
|
||||
// ==================== do: re-load DB ====================
|
||||
// Re-create database with same store, serverID, and DB name
|
||||
drop(db);
|
||||
let test_db = TestDb::builder()
|
||||
.object_store(Arc::clone(&object_store))
|
||||
.server_id(server_id)
|
||||
.db_name(db_name)
|
||||
.build()
|
||||
.await;
|
||||
let db = Arc::new(test_db.db);
|
||||
|
||||
// ==================== check: delete predicates ====================
|
||||
closure_check_delete_predicates(&db);
|
||||
|
||||
// ==================== check: query ====================
|
||||
// NOTE: partition "c" is gone here because it was not written to object store
|
||||
let expected = vec![
|
||||
"+------+-----+----------+--------------------------------+",
|
||||
"| part | row | selector | time |",
|
||||
"+------+-----+----------+--------------------------------+",
|
||||
"| a | 10 | 0 | 1970-01-01T00:00:00.000000010Z |",
|
||||
"| b | 20 | 0 | 1970-01-01T00:00:00.000000020Z |",
|
||||
"| d | 40 | 0 | 1970-01-01T00:00:00.000000040Z |",
|
||||
"+------+-----+----------+--------------------------------+",
|
||||
];
|
||||
let batches = run_query(Arc::clone(&db), "select * from cpu order by time").await;
|
||||
assert_batches_sorted_eq!(&expected, &batches);
|
||||
|
||||
// ==================== do: remove checkpoint files ====================
|
||||
let files = db
|
||||
.iox_object_store
|
||||
.catalog_transaction_files()
|
||||
.await
|
||||
.unwrap()
|
||||
.try_concat()
|
||||
.await
|
||||
.unwrap();
|
||||
let mut deleted_one = false;
|
||||
for file in files {
|
||||
if file.is_checkpoint() {
|
||||
db.iox_object_store
|
||||
.delete_catalog_transaction_file(&file)
|
||||
.await
|
||||
.unwrap();
|
||||
deleted_one = true;
|
||||
}
|
||||
}
|
||||
assert!(deleted_one);
|
||||
|
||||
// ==================== do: re-load DB ====================
|
||||
// Re-create database with same store, serverID, and DB name
|
||||
drop(db);
|
||||
let test_db = TestDb::builder()
|
||||
.object_store(Arc::clone(&object_store))
|
||||
.server_id(server_id)
|
||||
.db_name(db_name)
|
||||
.build()
|
||||
.await;
|
||||
let db = Arc::new(test_db.db);
|
||||
|
||||
// ==================== check: delete predicates ====================
|
||||
closure_check_delete_predicates(&db);
|
||||
|
||||
// ==================== check: query ====================
|
||||
// NOTE: partition "c" is gone here because it was not written to object store
|
||||
let _expected = vec![
|
||||
"+------+-----+----------+--------------------------------+",
|
||||
"| part | row | selector | time |",
|
||||
"+------+-----+----------+--------------------------------+",
|
||||
"| a | 10 | 0 | 1970-01-01T00:00:00.000000010Z |",
|
||||
"| b | 20 | 0 | 1970-01-01T00:00:00.000000020Z |",
|
||||
"| d | 40 | 0 | 1970-01-01T00:00:00.000000040Z |",
|
||||
"+------+-----+----------+--------------------------------+",
|
||||
];
|
||||
let batches = run_query(Arc::clone(&db), "select * from cpu order by time").await;
|
||||
assert_batches_sorted_eq!(&expected, &batches);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn table_wide_schema_enforcement() {
|
||||
// need a table with a partition template that uses a tag column, so that we can easily write to different partitions
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
use super::{
|
||||
catalog::chunk::ChunkMetadata, pred::to_read_buffer_predicate, streams::ReadFilterResultsStream,
|
||||
};
|
||||
use data_types::chunk_metadata::ChunkAddr;
|
||||
use data_types::{
|
||||
chunk_metadata::{ChunkId, ChunkOrder},
|
||||
partition_metadata,
|
||||
|
@ -76,8 +77,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
|
|||
/// MutableBuffer, ReadBuffer, or a ParquetFile
|
||||
#[derive(Debug)]
|
||||
pub struct DbChunk {
|
||||
id: ChunkId,
|
||||
table_name: Arc<str>,
|
||||
addr: ChunkAddr,
|
||||
access_recorder: AccessRecorder,
|
||||
state: State,
|
||||
meta: Arc<ChunkMetadata>,
|
||||
|
@ -88,22 +88,15 @@ pub struct DbChunk {
|
|||
|
||||
#[derive(Debug)]
|
||||
enum State {
|
||||
MutableBuffer {
|
||||
chunk: Arc<ChunkSnapshot>,
|
||||
},
|
||||
ReadBuffer {
|
||||
chunk: Arc<RBChunk>,
|
||||
partition_key: Arc<str>,
|
||||
},
|
||||
ParquetFile {
|
||||
chunk: Arc<ParquetChunk>,
|
||||
},
|
||||
MutableBuffer { chunk: Arc<ChunkSnapshot> },
|
||||
ReadBuffer { chunk: Arc<RBChunk> },
|
||||
ParquetFile { chunk: Arc<ParquetChunk> },
|
||||
}
|
||||
|
||||
impl DbChunk {
|
||||
/// Create a DBChunk snapshot of the catalog chunk
|
||||
pub fn snapshot(chunk: &super::catalog::chunk::CatalogChunk) -> Arc<Self> {
|
||||
let partition_key = Arc::from(chunk.key());
|
||||
let addr = chunk.addr().clone();
|
||||
|
||||
use super::catalog::chunk::{ChunkStage, ChunkStageFrozenRepr};
|
||||
|
||||
|
@ -137,7 +130,6 @@ impl DbChunk {
|
|||
},
|
||||
ChunkStageFrozenRepr::ReadBuffer(repr) => State::ReadBuffer {
|
||||
chunk: Arc::clone(repr),
|
||||
partition_key,
|
||||
},
|
||||
};
|
||||
(state, Arc::clone(meta))
|
||||
|
@ -151,7 +143,6 @@ impl DbChunk {
|
|||
let state = if let Some(read_buffer) = &read_buffer {
|
||||
State::ReadBuffer {
|
||||
chunk: Arc::clone(read_buffer),
|
||||
partition_key,
|
||||
}
|
||||
} else {
|
||||
State::ParquetFile {
|
||||
|
@ -163,8 +154,7 @@ impl DbChunk {
|
|||
};
|
||||
|
||||
Arc::new(Self {
|
||||
id: chunk.id(),
|
||||
table_name: chunk.table_name(),
|
||||
addr,
|
||||
access_recorder: chunk.access_recorder().clone(),
|
||||
state,
|
||||
meta,
|
||||
|
@ -193,8 +183,7 @@ impl DbChunk {
|
|||
}
|
||||
};
|
||||
Arc::new(Self {
|
||||
id: chunk.id(),
|
||||
table_name: chunk.table_name(),
|
||||
addr: chunk.addr().clone(),
|
||||
meta,
|
||||
state,
|
||||
access_recorder: chunk.access_recorder().clone(),
|
||||
|
@ -213,9 +202,14 @@ impl DbChunk {
|
|||
}
|
||||
}
|
||||
|
||||
/// Return the address of this chunk.
///
/// This borrows the [`ChunkAddr`] held in the chunk's `addr` field; nothing is cloned.
|
||||
pub fn addr(&self) -> &ChunkAddr {
|
||||
&self.addr
|
||||
}
|
||||
|
||||
/// Return the name of the table in this chunk
|
||||
pub fn table_name(&self) -> &Arc<str> {
|
||||
&self.table_name
|
||||
&self.addr.table_name
|
||||
}
|
||||
|
||||
pub fn time_of_first_write(&self) -> Time {
|
||||
|
@ -249,11 +243,11 @@ impl QueryChunk for DbChunk {
|
|||
type Error = Error;
|
||||
|
||||
fn id(&self) -> ChunkId {
|
||||
self.id
|
||||
self.addr.chunk_id
|
||||
}
|
||||
|
||||
fn table_name(&self) -> &str {
|
||||
self.table_name.as_ref()
|
||||
self.addr.table_name.as_ref()
|
||||
}
|
||||
|
||||
fn may_contain_pk_duplicates(&self) -> bool {
|
||||
|
@ -663,7 +657,7 @@ mod tests {
|
|||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.id;
|
||||
.id();
|
||||
|
||||
db.unload_read_buffer("cpu", "1970-01-01T00", id).unwrap();
|
||||
|
||||
|
|
|
@ -68,7 +68,9 @@
|
|||
clippy::future_not_send
|
||||
)]
|
||||
|
||||
use ::lifecycle::{LockableChunk, LockablePartition};
|
||||
use async_trait::async_trait;
|
||||
use connection::{ConnectionManager, RemoteServer};
|
||||
use data_types::{
|
||||
chunk_metadata::ChunkId,
|
||||
database_rules::{NodeGroup, RoutingRules, ShardId, Sink},
|
||||
|
@ -86,7 +88,6 @@ use hashbrown::HashMap;
|
|||
use influxdb_line_protocol::ParsedLine;
|
||||
use internal_types::freezable::Freezable;
|
||||
use iox_object_store::IoxObjectStore;
|
||||
use lifecycle::{LockableChunk, LockablePartition};
|
||||
use observability_deps::tracing::{error, info, warn};
|
||||
use parking_lot::RwLock;
|
||||
use rand::seq::SliceRandom;
|
||||
|
@ -100,13 +101,12 @@ use tracker::{TaskTracker, TrackedFutureExt};
|
|||
use uuid::Uuid;
|
||||
|
||||
pub use application::ApplicationState;
|
||||
pub use connection::{ConnectionManager, ConnectionManagerImpl, RemoteServer};
|
||||
pub use db::Db;
|
||||
pub use job::JobRegistry;
|
||||
pub use resolver::{GrpcConnectionString, RemoteTemplate};
|
||||
|
||||
mod application;
|
||||
mod connection;
|
||||
pub mod connection;
|
||||
pub mod database;
|
||||
pub mod db;
|
||||
mod job;
|
||||
|
@ -1233,9 +1233,8 @@ mod tests {
|
|||
use arrow_util::assert_batches_eq;
|
||||
use bytes::Bytes;
|
||||
use connection::test_helpers::{TestConnectionManager, TestRemoteServer};
|
||||
use data_types::chunk_metadata::ChunkStorage;
|
||||
use data_types::{
|
||||
chunk_metadata::ChunkAddr,
|
||||
chunk_metadata::{ChunkAddr, ChunkStorage},
|
||||
database_rules::{
|
||||
DatabaseRules, HashRing, LifecycleRules, PartitionTemplate, ShardConfig, TemplatePart,
|
||||
WriteBufferConnection, WriteBufferDirection, NO_SHARD_CONFIG,
|
||||
|
|
|
@ -0,0 +1,274 @@
|
|||
use futures::TryStreamExt;
|
||||
use std::{
|
||||
num::{NonZeroU32, NonZeroU64},
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use arrow_util::assert_batches_sorted_eq;
|
||||
use data_types::{
|
||||
chunk_metadata::ChunkStorage,
|
||||
database_rules::{DatabaseRules, LifecycleRules, PartitionTemplate, TemplatePart},
|
||||
server_id::ServerId,
|
||||
timestamp::TimestampRange,
|
||||
DatabaseName,
|
||||
};
|
||||
use iox_object_store::IoxObjectStore;
|
||||
use object_store::ObjectStore;
|
||||
use predicate::{delete_expr::DeleteExpr, delete_predicate::DeletePredicate};
|
||||
use query::{QueryChunk, QueryChunkMeta, QueryDatabase};
|
||||
use server::{
|
||||
connection::test_helpers::TestConnectionManager,
|
||||
db::test_helpers::{run_query, write_lp},
|
||||
rules::ProvidedDatabaseRules,
|
||||
ApplicationState, Db, Server,
|
||||
};
|
||||
use test_helpers::maybe_start_logging;
|
||||
|
||||
async fn start_server(
|
||||
server_id: ServerId,
|
||||
application: Arc<ApplicationState>,
|
||||
) -> Arc<Server<TestConnectionManager>> {
|
||||
let server = Arc::new(Server::new(
|
||||
TestConnectionManager::new(),
|
||||
application,
|
||||
Default::default(),
|
||||
));
|
||||
server.set_id(server_id).unwrap();
|
||||
server.wait_for_init().await.unwrap();
|
||||
server
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn delete_predicate_preservation() {
|
||||
maybe_start_logging();
|
||||
|
||||
// ==================== setup ====================
|
||||
let object_store = Arc::new(ObjectStore::new_in_memory());
|
||||
let server_id = ServerId::new(NonZeroU32::new(1).unwrap());
|
||||
let db_name = DatabaseName::new("delete_predicate_preservation_test").unwrap();
|
||||
|
||||
let application = Arc::new(ApplicationState::new(Arc::clone(&object_store), None));
|
||||
let server = start_server(server_id, Arc::clone(&application)).await;
|
||||
|
||||
// Test that delete predicates are stored within the preserved catalog
|
||||
|
||||
// ==================== do: create DB ====================
|
||||
// Create a DB given a server id, an object store and a db name
|
||||
|
||||
let rules = DatabaseRules {
|
||||
partition_template: PartitionTemplate {
|
||||
parts: vec![TemplatePart::Column("part".to_string())],
|
||||
},
|
||||
lifecycle_rules: LifecycleRules {
|
||||
catalog_transactions_until_checkpoint: NonZeroU64::new(1).unwrap(),
|
||||
// do not prune transactions files because this tests relies on them
|
||||
catalog_transaction_prune_age: Duration::from_secs(1_000),
|
||||
late_arrive_window_seconds: NonZeroU32::new(1).unwrap(),
|
||||
..Default::default()
|
||||
},
|
||||
..DatabaseRules::new(db_name.clone())
|
||||
};
|
||||
|
||||
let database = server
|
||||
.create_database(ProvidedDatabaseRules::new_rules(rules.clone().into()).unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
let db = database.initialized_db().unwrap();
|
||||
|
||||
// ==================== do: create chunks ====================
|
||||
let table_name = "cpu";
|
||||
|
||||
// 1: preserved
|
||||
let partition_key = "part_a";
|
||||
write_lp(&db, "cpu,part=a row=10,selector=0i 10").await;
|
||||
write_lp(&db, "cpu,part=a row=11,selector=1i 11").await;
|
||||
db.persist_partition(table_name, partition_key, true)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// 2: RUB
|
||||
let partition_key = "part_b";
|
||||
write_lp(&db, "cpu,part=b row=20,selector=0i 20").await;
|
||||
write_lp(&db, "cpu,part=b row=21,selector=1i 21").await;
|
||||
db.compact_partition(table_name, partition_key)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// 3: MUB
|
||||
let _partition_key = "part_c";
|
||||
write_lp(&db, "cpu,part=c row=30,selector=0i 30").await;
|
||||
write_lp(&db, "cpu,part=c row=31,selector=1i 31").await;
|
||||
|
||||
// 4: preserved and unloaded
|
||||
let partition_key = "part_d";
|
||||
write_lp(&db, "cpu,part=d row=40,selector=0i 40").await;
|
||||
write_lp(&db, "cpu,part=d row=41,selector=1i 41").await;
|
||||
|
||||
let chunk_id = db
|
||||
.persist_partition(table_name, partition_key, true)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.id();
|
||||
|
||||
db.unload_read_buffer(table_name, partition_key, chunk_id)
|
||||
.unwrap();
|
||||
|
||||
// ==================== do: delete ====================
|
||||
let pred = Arc::new(DeletePredicate {
|
||||
range: TimestampRange {
|
||||
start: 0,
|
||||
end: 1_000,
|
||||
},
|
||||
exprs: vec![DeleteExpr::new(
|
||||
"selector".to_string(),
|
||||
predicate::delete_expr::Op::Eq,
|
||||
predicate::delete_expr::Scalar::I64(1),
|
||||
)],
|
||||
});
|
||||
db.delete("cpu", Arc::clone(&pred)).await.unwrap();
|
||||
|
||||
// ==================== do: preserve another partition ====================
|
||||
let partition_key = "part_b";
|
||||
db.persist_partition(table_name, partition_key, true)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// ==================== do: use background worker for a short while ====================
|
||||
let iters_start = db.worker_iterations_delete_predicate_preservation();
|
||||
// time_provider.inc(rules.lifecycle_rules.late_arrive_window());
|
||||
|
||||
let t_0 = Instant::now();
|
||||
loop {
|
||||
let did_delete_predicate_preservation =
|
||||
db.worker_iterations_delete_predicate_preservation() > iters_start;
|
||||
let did_compaction = db.chunk_summaries().unwrap().into_iter().any(|summary| {
|
||||
(summary.partition_key.as_ref() == "part_c")
|
||||
&& (summary.storage == ChunkStorage::ReadBuffer)
|
||||
});
|
||||
if did_delete_predicate_preservation && did_compaction {
|
||||
break;
|
||||
}
|
||||
assert!(t_0.elapsed() < Duration::from_secs(10));
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
|
||||
// ==================== check: delete predicates ====================
|
||||
|
||||
let closure_check_delete_predicates = |db: &Db| {
|
||||
for chunk in db.chunks(&Default::default()) {
|
||||
let partition_key = chunk.addr().partition_key.as_ref();
|
||||
if partition_key == "part_b" {
|
||||
// Strictly speaking not required because the chunk was persisted AFTER the delete predicate was
|
||||
// registered so we can get away with materializing it during persistence.
|
||||
continue;
|
||||
}
|
||||
if partition_key == "part_c" {
|
||||
// This partition was compacted, so the delete predicates were materialized.
|
||||
continue;
|
||||
}
|
||||
let predicates = chunk.delete_predicates();
|
||||
assert_eq!(predicates.len(), 1);
|
||||
assert_eq!(predicates[0].as_ref(), pred.as_ref());
|
||||
}
|
||||
};
|
||||
closure_check_delete_predicates(&db);
|
||||
|
||||
// ==================== check: query ====================
|
||||
let expected = vec![
|
||||
"+------+-----+----------+--------------------------------+",
|
||||
"| part | row | selector | time |",
|
||||
"+------+-----+----------+--------------------------------+",
|
||||
"| a | 10 | 0 | 1970-01-01T00:00:00.000000010Z |",
|
||||
"| b | 20 | 0 | 1970-01-01T00:00:00.000000020Z |",
|
||||
"| c | 30 | 0 | 1970-01-01T00:00:00.000000030Z |",
|
||||
"| d | 40 | 0 | 1970-01-01T00:00:00.000000040Z |",
|
||||
"+------+-----+----------+--------------------------------+",
|
||||
];
|
||||
let batches = run_query(Arc::clone(&db), "select * from cpu order by time").await;
|
||||
assert_batches_sorted_eq!(&expected, &batches);
|
||||
|
||||
// ==================== do: re-load DB ====================
|
||||
// Re-create database with same store, serverID, and DB name
|
||||
server.shutdown();
|
||||
server.join().await.unwrap();
|
||||
|
||||
drop(db);
|
||||
drop(server);
|
||||
let server = start_server(server_id, Arc::clone(&application)).await;
|
||||
let db = server.db(&db_name).unwrap();
|
||||
|
||||
// ==================== check: delete predicates ====================
|
||||
closure_check_delete_predicates(&db);
|
||||
|
||||
// ==================== check: query ====================
|
||||
// NOTE: partition "c" is gone here because it was not written to object store
|
||||
let expected = vec![
|
||||
"+------+-----+----------+--------------------------------+",
|
||||
"| part | row | selector | time |",
|
||||
"+------+-----+----------+--------------------------------+",
|
||||
"| a | 10 | 0 | 1970-01-01T00:00:00.000000010Z |",
|
||||
"| b | 20 | 0 | 1970-01-01T00:00:00.000000020Z |",
|
||||
"| d | 40 | 0 | 1970-01-01T00:00:00.000000040Z |",
|
||||
"+------+-----+----------+--------------------------------+",
|
||||
];
|
||||
|
||||
let batches = run_query(Arc::clone(&db), "select * from cpu order by time").await;
|
||||
assert_batches_sorted_eq!(&expected, &batches);
|
||||
|
||||
server.shutdown();
|
||||
server.join().await.unwrap();
|
||||
|
||||
drop(db);
|
||||
drop(server);
|
||||
|
||||
// ==================== do: remove checkpoint files ====================
|
||||
let iox_object_store =
|
||||
IoxObjectStore::find_existing(Arc::clone(application.object_store()), server_id, &db_name)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
let files = iox_object_store
|
||||
.catalog_transaction_files()
|
||||
.await
|
||||
.unwrap()
|
||||
.try_concat()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut deleted_one = false;
|
||||
for file in files {
|
||||
if file.is_checkpoint() {
|
||||
iox_object_store
|
||||
.delete_catalog_transaction_file(&file)
|
||||
.await
|
||||
.unwrap();
|
||||
deleted_one = true;
|
||||
}
|
||||
}
|
||||
assert!(deleted_one);
|
||||
|
||||
// ==================== do: re-load DB ====================
|
||||
// Re-create database with same store, serverID, and DB name
|
||||
let server = start_server(server_id, Arc::clone(&application)).await;
|
||||
let db = server.db(&db_name).unwrap();
|
||||
|
||||
// ==================== check: delete predicates ====================
|
||||
closure_check_delete_predicates(&db);
|
||||
|
||||
// ==================== check: query ====================
|
||||
// NOTE: partition "c" is gone here because it was not written to object store
|
||||
let _expected = vec![
|
||||
"+------+-----+----------+--------------------------------+",
|
||||
"| part | row | selector | time |",
|
||||
"+------+-----+----------+--------------------------------+",
|
||||
"| a | 10 | 0 | 1970-01-01T00:00:00.000000010Z |",
|
||||
"| b | 20 | 0 | 1970-01-01T00:00:00.000000020Z |",
|
||||
"| d | 40 | 0 | 1970-01-01T00:00:00.000000040Z |",
|
||||
"+------+-----+----------+--------------------------------+",
|
||||
];
|
||||
let batches = run_query(Arc::clone(&db), "select * from cpu order by time").await;
|
||||
assert_batches_sorted_eq!(&expected, &batches);
|
||||
}
|
|
@ -8,7 +8,7 @@ use object_store::{self, ObjectStore};
|
|||
use observability_deps::tracing::{error, info, warn};
|
||||
use panic_logging::SendPanicsToTracing;
|
||||
use server::{
|
||||
ApplicationState, ConnectionManagerImpl as ConnectionManager, RemoteTemplate,
|
||||
connection::ConnectionManagerImpl as ConnectionManager, ApplicationState, RemoteTemplate,
|
||||
Server as AppServer, ServerConfig,
|
||||
};
|
||||
use snafu::{ResultExt, Snafu};
|
||||
|
|
|
@ -28,7 +28,7 @@ use influxdb_iox_client::format::QueryOutputFormat;
|
|||
use influxdb_line_protocol::parse_lines;
|
||||
use predicate::delete_predicate::{parse_delete, DeletePredicate};
|
||||
use query::exec::ExecutionContextProvider;
|
||||
use server::{ApplicationState, ConnectionManager, Error, Server as AppServer};
|
||||
use server::{connection::ConnectionManager, ApplicationState, Error, Server as AppServer};
|
||||
|
||||
// External crates
|
||||
use bytes::{Bytes, BytesMut};
|
||||
|
@ -910,7 +910,9 @@ mod tests {
|
|||
use metric::{Attributes, DurationHistogram, Metric, U64Counter};
|
||||
use object_store::ObjectStore;
|
||||
use serde::de::DeserializeOwned;
|
||||
use server::{db::Db, rules::ProvidedDatabaseRules, ApplicationState, ConnectionManagerImpl};
|
||||
use server::{
|
||||
connection::ConnectionManagerImpl, db::Db, rules::ProvidedDatabaseRules, ApplicationState,
|
||||
};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use trace::RingBufferTraceCollector;
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@ use tonic::transport::NamedService;
|
|||
use trace_http::ctx::TraceHeaderParser;
|
||||
|
||||
use crate::influxdb_ioxd::serving_readiness::ServingReadiness;
|
||||
use server::{ApplicationState, ConnectionManager, Server};
|
||||
use server::{connection::ConnectionManager, ApplicationState, Server};
|
||||
use trace::TraceCollector;
|
||||
|
||||
pub mod error;
|
||||
|
|
|
@ -21,7 +21,7 @@ use tonic::{Request, Response, Streaming};
|
|||
use data_types::{DatabaseName, DatabaseNameError};
|
||||
use observability_deps::tracing::{info, warn};
|
||||
use query::exec::ExecutionContextProvider;
|
||||
use server::{ConnectionManager, Server};
|
||||
use server::{connection::ConnectionManager, Server};
|
||||
|
||||
use crate::influxdb_ioxd::rpc::error::default_server_error_handler;
|
||||
|
||||
|
|
|
@ -9,7 +9,7 @@ use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *
|
|||
use predicate::delete_predicate::DeletePredicate;
|
||||
use query::QueryDatabase;
|
||||
use server::rules::ProvidedDatabaseRules;
|
||||
use server::{ApplicationState, ConnectionManager, Error, Server};
|
||||
use server::{connection::ConnectionManager, ApplicationState, Error, Server};
|
||||
use tonic::{Request, Response, Status};
|
||||
|
||||
struct ManagementService<M: ConnectionManager> {
|
||||
|
|
|
@ -12,7 +12,7 @@ use generated_types::{
|
|||
};
|
||||
use influxdb_line_protocol::parse_lines;
|
||||
use observability_deps::tracing::debug;
|
||||
use server::{ConnectionManager, Server};
|
||||
use server::{connection::ConnectionManager, Server};
|
||||
|
||||
use super::error::default_server_error_handler;
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use super::error::default_server_error_handler;
|
||||
use generated_types::google::FieldViolation;
|
||||
use generated_types::influxdata::pbdata::v1::*;
|
||||
use server::{ConnectionManager, Server};
|
||||
use server::{connection::ConnectionManager, Server};
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
|
||||
|
|
Loading…
Reference in New Issue