Merge branch 'main' into ntran/compact_os_framework

pull/24376/head
kodiakhq[bot] 2021-11-22 15:17:13 +00:00 committed by GitHub
commit b5d6e201e6
17 changed files with 352 additions and 529 deletions

Cargo.lock (generated)

@@ -1321,9 +1321,9 @@ checksum = "ac5956d4e63858efaec57e0d6c1c2f6a41e1487f830314a324ccd7e2223a7ca0"

[[package]]
name = "handlebars"
- version = "4.1.4"
+ version = "4.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "e1874024f4a29f47d609014caec0b1c866f1c1eb0661a09c9733ecc4757f5f88"
+ checksum = "8ad84da8f63da982543fc85fcabaee2ad1fdd809d99d64a48887e2e942ddfe46"
dependencies = [
 "log",
 "pest",

@@ -1434,9 +1434,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"

[[package]]
name = "hyper"
- version = "0.14.14"
+ version = "0.14.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "2b91bb1f221b6ea1f1e4371216b70f40748774c2fb5971b450c07773fb92d26b"
+ checksum = "436ec0091e4f20e655156a30a0df3770fe2900aa301e548e08446ec794b6953c"
dependencies = [
 "bytes",
 "futures-channel",

@@ -1914,9 +1914,9 @@ dependencies = [

[[package]]
name = "libc"
- version = "0.2.106"
+ version = "0.2.108"
source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "a60553f9a9e039a333b4e9b20573b9e9b9c0bb3a11e201ccc48ef4283456d673"
+ checksum = "8521a1b57e76b1ec69af7599e75e38e7b7fad6610f037db8c79b127201b5d119"

[[package]]
name = "libloading"

@@ -3801,9 +3801,9 @@ dependencies = [

[[package]]
name = "serde_json"
- version = "1.0.70"
+ version = "1.0.71"
source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "e277c495ac6cd1a01a58d0a0c574568b4d1ddf14f59965c6a58b8d96400b54f3"
+ checksum = "063bf466a64011ac24040a49009724ee60a57da1b437617ceb32e53ad61bfb19"
dependencies = [
 "indexmap",
 "itoa",

@@ -4530,9 +4530,9 @@ dependencies = [

[[package]]
name = "tower"
- version = "0.4.10"
+ version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "c00e500fff5fa1131c866b246041a6bf96da9c965f8fe4128cb1421f23e93c00"
+ checksum = "5651b5f6860a99bd1adb59dbfe1db8beb433e73709d9032b413a77e2fb7c066a"
dependencies = [
 "futures-core",
 "futures-util",

@@ -4670,9 +4670,9 @@ dependencies = [

[[package]]
name = "tracing-subscriber"
- version = "0.3.1"
+ version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "80a4ddde70311d8da398062ecf6fc2c309337de6b0f77d6c27aff8d53f6fca52"
+ checksum = "7507ec620f809cdf07cccb5bc57b13069a88031b795efd4079b1c71b66c1613d"
dependencies = [
 "ansi_term 0.12.1",
 "lazy_static",


@@ -9,7 +9,7 @@ bytes = "1.0"
futures = { version = "0.3", default-features = false }
reqwest = { version = "0.11", features = ["stream", "json"] }
serde = { version = "1.0", features = ["derive"] }
- serde_json = "1.0.70"
+ serde_json = "1.0.71"
snafu = "0.6.6"
url = "2.1.1"


@@ -70,7 +70,7 @@ pprof = { version = "^0.5", default-features = false, features = ["flamegraph",
prost = "0.8"
rustyline = { version = "9.0", default-features = false }
serde = { version = "1.0", features = ["derive"] }
- serde_json = "1.0.70"
+ serde_json = "1.0.71"
serde_urlencoded = "0.7.0"
snafu = "0.6.9"
structopt = "0.3.25"


@@ -27,7 +27,7 @@ mutable_batch_pb = { path = "../mutable_batch_pb", optional = true }
prost = "0.8"
rand = "0.8.3"
serde = "1.0.128"
- serde_json = { version = "1.0.70", optional = true }
+ serde_json = { version = "1.0.71", optional = true }
thiserror = "1.0.30"
tonic = { version = "0.5.0" }
uuid = { version = "0.8", features = ["v4"] }


@@ -10,7 +10,7 @@ chrono = "0.4.13"
chrono-english = "0.1.4"
clap = "2.33.1"
futures = "0.3.5"
- handlebars = "4.1.4"
+ handlebars = "4.1.5"
humantime = "2.1.0"
data_types = { path = "../data_types" }
generated_types = { path = "../generated_types" }
@@ -19,12 +19,12 @@ influxdb_iox_client = { path = "../influxdb_iox_client" }
itertools = "0.10.0"
rand = { version = "0.8.3", features = ["small_rng"] }
serde = { version = "1.0", features = ["derive"] }
- serde_json = "1.0.70"
+ serde_json = "1.0.71"
snafu = "0.6.8"
tokio = { version = "1.13", features = ["macros", "rt-multi-thread"] }
toml = "0.5.6"
tracing = "0.1"
- tracing-subscriber = "0.3.1"
+ tracing-subscriber = "0.3.2"
uuid = { version = "0.8.1", default_features = false }

[dev-dependencies]


@@ -155,7 +155,7 @@ impl CatalogState for TracerCatalogState {
mod tests {
use super::*;
use crate::test_helpers::{make_config, new_empty};
- use parquet_file::test_utils::{chunk_addr, make_metadata, TestSize};
+ use parquet_file::test_utils::generator::ChunkGenerator;
use std::{collections::HashSet, sync::Arc};
use tokio::sync::RwLock;
@@ -176,6 +176,7 @@ mod tests {
async fn test_cleanup_rules() {
let config = make_config().await;
let iox_object_store = &config.iox_object_store;
+ let mut generator = ChunkGenerator::new_with_store(Arc::clone(iox_object_store));
let catalog = new_empty(config.clone()).await;
@@ -186,36 +187,20 @@ mod tests {
let mut transaction = catalog.open_transaction().await;
// an ordinary tracked parquet file => keep
- let (path, metadata) =
- make_metadata(iox_object_store, "foo", chunk_addr(1), TestSize::Full).await;
- let metadata = Arc::new(metadata);
- let info = CatalogParquetInfo {
- path,
- file_size_bytes: 33,
- metadata,
- };
- transaction.add_parquet(&info);
- paths_keep.push(info.path);
+ let (chunk, _) = generator.generate().await;
+ transaction.add_parquet(&CatalogParquetInfo::from_chunk(&chunk));
+ paths_keep.push(chunk.path().clone());
// another ordinary tracked parquet file that was added and removed => keep (for time
// travel)
- let (path, metadata) =
- make_metadata(iox_object_store, "foo", chunk_addr(2), TestSize::Full).await;
- let metadata = Arc::new(metadata);
- let info = CatalogParquetInfo {
- path,
- file_size_bytes: 33,
- metadata,
- };
- transaction.add_parquet(&info);
- transaction.remove_parquet(&info.path);
- paths_keep.push(info.path);
+ let (chunk, _) = generator.generate().await;
+ transaction.add_parquet(&CatalogParquetInfo::from_chunk(&chunk));
+ transaction.remove_parquet(chunk.path());
+ paths_keep.push(chunk.path().clone());
// an untracked parquet file => delete
- let (path, _md) =
- make_metadata(iox_object_store, "foo", chunk_addr(3), TestSize::Full).await;
- paths_delete.push(path);
+ let (chunk, _) = generator.generate().await;
+ paths_delete.push(chunk.path().clone());
transaction.commit().await.unwrap();
}
@@ -224,6 +209,7 @@ mod tests {
let files = get_unreferenced_parquet_files(&catalog, 1_000)
.await
.unwrap();
delete_files(&catalog, &files).await.unwrap();
// deleting a second time should just work
@@ -243,39 +229,33 @@ mod tests {
async fn test_cleanup_with_parallel_transaction() {
let config = make_config().await;
let iox_object_store = &config.iox_object_store;
+ let mut generator = ChunkGenerator::new_with_store(Arc::clone(iox_object_store));
let lock: RwLock<()> = Default::default();
let catalog = new_empty(config.clone()).await;
// try multiple times to provoke a conflict
- for i in 0..100 {
+ for i in 1..100 {
// Every so often try to create a file with the same ChunkAddr beforehand. This should
// not trick the cleanup logic to remove the actual file because file paths contains a
// UUIDv4 part.
if i % 2 == 0 {
- make_metadata(iox_object_store, "foo", chunk_addr(i), TestSize::Full).await;
+ generator.generate_id(i).await;
}
- let (path, _) = tokio::join!(
+ let (chunk, _) = tokio::join!(
async {
let guard = lock.read().await;
- let (path, md) =
- make_metadata(iox_object_store, "foo", chunk_addr(i), TestSize::Full).await;
- let metadata = Arc::new(md);
- let info = CatalogParquetInfo {
- path,
- file_size_bytes: 33,
- metadata,
- };
+ let (chunk, _) = generator.generate_id(i).await;
let mut transaction = catalog.open_transaction().await;
- transaction.add_parquet(&info);
+ transaction.add_parquet(&CatalogParquetInfo::from_chunk(&chunk));
transaction.commit().await.unwrap();
drop(guard);
- info.path
+ chunk
},
async {
let guard = lock.write().await;
@@ -289,7 +269,7 @@ mod tests {
);
let all_files = list_all_files(iox_object_store).await;
- assert!(dbg!(all_files).contains(dbg!(&path)));
+ assert!(dbg!(all_files).contains(dbg!(chunk.path())));
}
}
@@ -297,20 +277,15 @@ mod tests {
async fn test_cleanup_max_files() {
let config = make_config().await;
let iox_object_store = &config.iox_object_store;
+ let mut generator = ChunkGenerator::new_with_store(Arc::clone(iox_object_store));
let catalog = new_empty(config.clone()).await;
// create some files
let mut to_remove = HashSet::default();
- for chunk_id in 0..3 {
- let (path, _md) = make_metadata(
- iox_object_store,
- "foo",
- chunk_addr(chunk_id),
- TestSize::Full,
- )
- .await;
- to_remove.insert(path);
+ for _ in 0..3 {
+ let (chunk, _) = generator.generate().await;
+ to_remove.insert(chunk.path().clone());
}
// run clean-up


@@ -1064,7 +1064,10 @@ mod tests {
use std::vec;
use bytes::Bytes;
- use parquet_file::test_utils::{chunk_addr, make_iox_object_store, make_metadata, TestSize};
+ use data_types::chunk_metadata::ChunkAddr;
+ use parquet_file::chunk::ParquetChunk;
+ use parquet_file::test_utils::generator::ChunkGenerator;
+ use parquet_file::test_utils::make_iox_object_store;
use super::*;
use crate::test_helpers::{
@@ -1642,6 +1645,7 @@ mod tests {
async fn test_checkpoint() {
let config = make_config().await;
let mut trace = assert_single_catalog_inmem_works(config.clone()).await;
+ let mut generator = ChunkGenerator::new_with_store(Arc::clone(&config.iox_object_store));
// re-open catalog
let (catalog, mut state) = load_ok(config.clone()).await.unwrap();
@@ -1659,21 +1663,10 @@ mod tests {
// create another transaction on-top that adds a file (this transaction will be required to load the full state)
{
- let addr = chunk_addr(1337);
- let (path, metadata) = make_metadata(
- &config.iox_object_store,
- "foo",
- addr.clone(),
- TestSize::Full,
- )
- .await;
+ let (chunk, _) = generator.generate_id(1337).await;
let mut transaction = catalog.open_transaction().await;
- let info = CatalogParquetInfo {
- path,
- file_size_bytes: 33,
- metadata: Arc::new(metadata),
- };
+ let info = CatalogParquetInfo::from_chunk(&chunk);
state.insert(info.clone()).unwrap();
transaction.add_parquet(&info);
let ckpt_handle = transaction.commit().await.unwrap();
@@ -1713,6 +1706,7 @@ mod tests {
async fn test_delete_predicates() {
let config = make_config().await;
let iox_object_store = &config.iox_object_store;
+ let mut generator = ChunkGenerator::new_with_store(Arc::clone(iox_object_store));
let catalog = new_empty(config.clone()).await;
let mut state = TestCatalogState::default();
@@ -1722,16 +1716,11 @@ mod tests {
// create 3 chunks
let mut chunk_addrs = vec![];
- for id in 0..3 {
- let chunk_addr = chunk_addr(id);
- let (path, metadata) =
- make_metadata(iox_object_store, "foo", chunk_addr.clone(), TestSize::Full)
- .await;
- let info = CatalogParquetInfo {
- path,
- file_size_bytes: 33,
- metadata: Arc::new(metadata),
- };
+ for _ in 0..3 {
+ let (chunk, metadata) = generator.generate().await;
+ let chunk_addr = ChunkAddr::new(generator.partition(), metadata.chunk_id);
+ let info = CatalogParquetInfo::from_chunk(&chunk);
state.insert(info.clone()).unwrap();
t.add_parquet(&info);
@@ -1819,6 +1808,29 @@ mod tests {
}
}
+ /// Assert that set of parquet files tracked by a catalog are identical to the given sorted list.
+ fn assert_catalog_chunks(state: &TestCatalogState, expected: &[ParquetChunk]) {
+ let actual = get_catalog_parquet_files(state);
+ let mut expected: Vec<_> = expected.iter().collect();
+ expected.sort_by(|a, b| a.path().cmp(b.path()));
+ for ((actual_path, actual_md), chunk) in actual.iter().zip(expected.iter()) {
+ assert_eq!(actual_path, chunk.path());
+ let actual_md = actual_md.decode().unwrap();
+ let actual_schema = actual_md.read_schema().unwrap();
+ let expected_schema = chunk.schema();
+ assert_eq!(actual_schema, expected_schema);
+ // NOTE: the actual table name is not important here as long as it is the same for both calls, since it is
+ // only used to generate out statistics struct (not to read / dispatch anything).
+ let actual_stats = actual_md.read_statistics(&actual_schema).unwrap();
+ let expected_stats = &chunk.table_summary().columns;
+ assert_eq!(&actual_stats, expected_stats);
+ }
+ }
async fn checked_delete(iox_object_store: &IoxObjectStore, path: &TransactionFilePath) {
// issue full GET operation to check if object is preset
iox_object_store
@@ -1872,6 +1884,7 @@ mod tests {
async fn assert_single_catalog_inmem_works(config: PreservedCatalogConfig) -> TestTrace {
let iox_object_store = &config.iox_object_store;
+ let mut generator = ChunkGenerator::new_with_store(Arc::clone(iox_object_store));
let catalog = new_empty(config.clone()).await;
let mut state = TestCatalogState::default();
@@ -1889,102 +1902,56 @@ mod tests {
{
let mut t = catalog.open_transaction().await;
- let (path, metadata) =
- make_metadata(iox_object_store, "foo", chunk_addr(0), TestSize::Full).await;
- expected.push((path.clone(), metadata.clone()));
- let info = CatalogParquetInfo {
- path,
- file_size_bytes: 33,
- metadata: Arc::new(metadata),
- };
- state.insert(info.clone()).unwrap();
- t.add_parquet(&info);
- let (path, metadata) =
- make_metadata(iox_object_store, "bar", chunk_addr(1), TestSize::Full).await;
- expected.push((path.clone(), metadata.clone()));
- let info = CatalogParquetInfo {
- path,
- file_size_bytes: 33,
- metadata: Arc::new(metadata),
- };
- state.insert(info.clone()).unwrap();
- t.add_parquet(&info);
- let (path, metadata) =
- make_metadata(iox_object_store, "bar", chunk_addr(2), TestSize::Full).await;
- expected.push((path.clone(), metadata.clone()));
- let info = CatalogParquetInfo {
- path,
- file_size_bytes: 33,
- metadata: Arc::new(metadata),
- };
- state.insert(info.clone()).unwrap();
- t.add_parquet(&info);
- let (path, metadata) =
- make_metadata(iox_object_store, "foo", chunk_addr(3), TestSize::Full).await;
- expected.push((path.clone(), metadata.clone()));
- let info = CatalogParquetInfo {
- path,
- file_size_bytes: 33,
- metadata: Arc::new(metadata),
- };
+ for _ in 0..4 {
+ let (chunk, _) = generator.generate().await;
+ let info = CatalogParquetInfo::from_chunk(&chunk);
+ expected.push(chunk);
state.insert(info.clone()).unwrap();
t.add_parquet(&info);
+ }
t.commit().await.unwrap();
}
assert_eq!(catalog.revision_counter(), 1);
- assert_catalog_parquet_files(&state, &expected);
+ assert_catalog_chunks(&state, &expected);
trace.record(&catalog, &state, false);
// modify catalog with examples
{
- let (path, metadata) =
- make_metadata(iox_object_store, "foo", chunk_addr(4), TestSize::Full).await;
- expected.push((path.clone(), metadata.clone()));
+ let (chunk, _) = generator.generate().await;
+ let info = CatalogParquetInfo::from_chunk(&chunk);
+ expected.push(chunk);
let mut t = catalog.open_transaction().await;
// "real" modifications
- let info = CatalogParquetInfo {
- path,
- file_size_bytes: 33,
- metadata: Arc::new(metadata),
- };
state.insert(info.clone()).unwrap();
t.add_parquet(&info);
- let (path, _) = expected.remove(0);
- state.remove(&path).unwrap();
- t.remove_parquet(&path);
+ let chunk = expected.remove(0);
+ state.remove(chunk.path()).unwrap();
+ t.remove_parquet(chunk.path());
t.commit().await.unwrap();
}
assert_eq!(catalog.revision_counter(), 2);
- assert_catalog_parquet_files(&state, &expected);
+ assert_catalog_chunks(&state, &expected);
trace.record(&catalog, &state, false);
// uncommitted modifications have no effect
{
let mut t = catalog.open_transaction().await;
- let (path, metadata) =
- make_metadata(iox_object_store, "foo", chunk_addr(1), TestSize::Full).await;
- let info = CatalogParquetInfo {
- path,
- file_size_bytes: 33,
- metadata: Arc::new(metadata),
- };
+ let (chunk, _) = generator.generate().await;
+ let info = CatalogParquetInfo::from_chunk(&chunk);
t.add_parquet(&info);
- t.remove_parquet(&expected[0].0);
+ t.remove_parquet(expected[0].path());
// NO commit here!
}
assert_eq!(catalog.revision_counter(), 2);
- assert_catalog_parquet_files(&state, &expected);
+ assert_catalog_chunks(&state, &expected);
trace.record(&catalog, &state, true);
trace


@@ -222,7 +222,7 @@ impl Debug for Metadata {
mod tests {
use super::*;
use crate::{core::PreservedCatalog, interface::CatalogParquetInfo, test_helpers::make_config};
- use parquet_file::test_utils::{chunk_addr, make_metadata, TestSize};
+ use parquet_file::test_utils::generator::{ChunkGenerator, GeneratorConfig};
use time::Time;
use uuid::Uuid;
@@ -235,21 +235,15 @@ mod tests {
.with_time_provider(time_provider);
let iox_object_store = &config.iox_object_store;
+ let mut generator = ChunkGenerator::new_with_store(Arc::clone(iox_object_store));
+ generator.set_config(GeneratorConfig::Simple);
// build catalog with some data
let catalog = PreservedCatalog::new_empty(config.clone()).await.unwrap();
{
+ let (chunk, _) = generator.generate().await;
let mut transaction = catalog.open_transaction().await;
+ transaction.add_parquet(&CatalogParquetInfo::from_chunk(&chunk));
- let (path, metadata) =
- make_metadata(iox_object_store, "foo", chunk_addr(0), TestSize::Minimal).await;
- let info = CatalogParquetInfo {
- path,
- file_size_bytes: 33,
- metadata: Arc::new(metadata),
- };
- transaction.add_parquet(&info);
transaction.commit().await.unwrap();
}
@@ -304,11 +298,11 @@ File {
"table1",
"part1",
],
- file_name: "00000000-0000-0000-0000-000000000000.parquet",
+ file_name: "00000000-0000-0000-0000-000000000001.parquet",
},
),
- file_size_bytes: 33,
+ file_size_bytes: 3052,
- metadata: b"metadata omitted (937 bytes)",
+ metadata: b"metadata omitted (935 bytes)",
},
),
),
@@ -352,21 +346,15 @@ File {
.with_fixed_uuid(Uuid::nil())
.with_time_provider(time_provider);
let iox_object_store = &config.iox_object_store;
+ let mut generator = ChunkGenerator::new_with_store(Arc::clone(iox_object_store));
+ generator.set_config(GeneratorConfig::Simple);
// build catalog with some data
let catalog = PreservedCatalog::new_empty(config.clone()).await.unwrap();
{
+ let (chunk, _) = generator.generate().await;
let mut transaction = catalog.open_transaction().await;
+ transaction.add_parquet(&CatalogParquetInfo::from_chunk(&chunk));
- let (path, metadata) =
- make_metadata(iox_object_store, "foo", chunk_addr(0), TestSize::Minimal).await;
- let info = CatalogParquetInfo {
- path,
- file_size_bytes: 33,
- metadata: Arc::new(metadata),
- };
- transaction.add_parquet(&info);
transaction.commit().await.unwrap();
}
@@ -426,11 +414,11 @@ File {
"table1",
"part1",
],
- file_name: "00000000-0000-0000-0000-000000000000.parquet",
+ file_name: "00000000-0000-0000-0000-000000000001.parquet",
},
),
- file_size_bytes: 33,
+ file_size_bytes: 3052,
- metadata: b"metadata omitted (937 bytes)",
+ metadata: b"metadata omitted (935 bytes)",
},
),
),
@@ -460,7 +448,7 @@ File {
table_name: "table1",
partition_key: "part1",
chunk_id: ChunkId(
- 0,
+ 1,
),
partition_checkpoint: PartitionCheckpoint {
table_name: "table1",
@@ -500,7 +488,7 @@ File {
},
},
chunk_order: ChunkOrder(
- 5,
+ 1,
),
},
),


@@ -7,6 +7,7 @@ use std::{
use data_types::chunk_metadata::{ChunkAddr, ChunkId};
use data_types::delete_predicate::DeletePredicate;
use iox_object_store::{IoxObjectStore, ParquetFilePath};
+ use parquet_file::chunk::ParquetChunk;
use snafu::Snafu;
use parquet_file::metadata::IoxParquetMetaData;
@@ -24,6 +25,17 @@ pub struct CatalogParquetInfo {
pub metadata: Arc<IoxParquetMetaData>,
}
+ impl CatalogParquetInfo {
+ /// Creates a [`CatalogParquetInfo`] from a [`ParquetChunk`]
+ pub fn from_chunk(chunk: &ParquetChunk) -> Self {
+ Self {
+ path: chunk.path().clone(),
+ file_size_bytes: chunk.file_size_bytes(),
+ metadata: chunk.parquet_metadata(),
+ }
+ }
+ }
/// Same as [ChunkAddr] but w/o the database part.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ChunkAddrWithoutDatabase {
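The new `CatalogParquetInfo::from_chunk` constructor is what lets the tests above drop their hand-written struct literals. A minimal before/after sketch, assuming a `ParquetChunk` value named `chunk` such as the ones produced by the test `ChunkGenerator` later in this diff (the old `metadata`/`path` values came from the removed `make_metadata` helper):

```rust
// Before: fields filled in by hand, with a made-up file size of 33 bytes.
let info = CatalogParquetInfo {
    path: path.clone(),
    file_size_bytes: 33,
    metadata: Arc::new(metadata),
};

// After: the chunk itself is the single source of truth for path, size and metadata.
let info = CatalogParquetInfo::from_chunk(&chunk);
```

This is why the dump-test expectations change from `file_size_bytes: 33` to the real on-disk size (3052) further up in this diff.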


@@ -10,11 +10,14 @@ use crate::{
},
};
use data_types::delete_predicate::{DeleteExpr, DeletePredicate, Op, Scalar};
- use data_types::{chunk_metadata::ChunkId, timestamp::TimestampRange};
+ use data_types::{
+ chunk_metadata::{ChunkAddr, ChunkId},
+ timestamp::TimestampRange,
+ };
use iox_object_store::{IoxObjectStore, ParquetFilePath, TransactionFilePath};
use parquet_file::{
- metadata::IoxParquetMetaData,
- test_utils::{chunk_addr, make_iox_object_store, make_metadata, TestSize},
+ chunk::ParquetChunk,
+ test_utils::{generator::ChunkGenerator, make_iox_object_store},
};
use snafu::ResultExt;
use std::{
@@ -259,158 +262,107 @@ where
F: Fn(&S) -> CheckpointData + Send,
{
let config = make_config().await;
+ let iox_object_store = &config.iox_object_store;
+ let mut generator = ChunkGenerator::new_with_store(Arc::clone(iox_object_store));
// The expected state of the catalog
- let mut expected_files: HashMap<ChunkId, (ParquetFilePath, Arc<IoxParquetMetaData>)> =
- HashMap::new();
+ let mut expected_chunks: HashMap<u32, ParquetChunk> = HashMap::new();
let mut expected_predicates: HashMap<Arc<DeletePredicate>, HashSet<ChunkAddrWithoutDatabase>> =
HashMap::new();
- assert_checkpoint(&state, &f, &expected_files, &expected_predicates);
+ assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// add files
{
- for chunk_id in 0..5 {
- let (path, metadata) = make_metadata(
- &config.iox_object_store,
- "ok",
- chunk_addr(chunk_id),
- TestSize::Full,
- )
- .await;
+ for chunk_id in 1..5 {
+ let (chunk, _) = generator.generate_id(chunk_id).await;
state
.add(
- Arc::clone(&config.iox_object_store),
- CatalogParquetInfo {
- path: path.clone(),
- file_size_bytes: 33,
- metadata: Arc::new(metadata.clone()),
- },
+ Arc::clone(iox_object_store),
+ CatalogParquetInfo::from_chunk(&chunk),
)
.unwrap();
- expected_files.insert(ChunkId::new_test(chunk_id), (path, Arc::new(metadata)));
+ expected_chunks.insert(chunk_id, chunk);
}
}
- assert_checkpoint(&state, &f, &expected_files, &expected_predicates);
+ assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// remove files
{
- let (path, _) = expected_files.remove(&ChunkId::new_test(1)).unwrap();
- state.remove(&path).unwrap();
+ let chunk = expected_chunks.remove(&1).unwrap();
+ state.remove(chunk.path()).unwrap();
}
- assert_checkpoint(&state, &f, &expected_files, &expected_predicates);
+ assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// add and remove in the same transaction
{
- let (path, metadata) = make_metadata(
- &config.iox_object_store,
- "ok",
- chunk_addr(5),
- TestSize::Full,
- )
- .await;
+ let (chunk, _) = generator.generate_id(5).await;
state
.add(
- Arc::clone(&config.iox_object_store),
- CatalogParquetInfo {
- path: path.clone(),
- file_size_bytes: 33,
- metadata: Arc::new(metadata),
- },
+ Arc::clone(iox_object_store),
+ CatalogParquetInfo::from_chunk(&chunk),
)
.unwrap();
- state.remove(&path).unwrap();
+ state.remove(chunk.path()).unwrap();
}
- assert_checkpoint(&state, &f, &expected_files, &expected_predicates);
+ assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// remove and add in the same transaction
{
- let (path, metadata) = expected_files.get(&ChunkId::new_test(3)).unwrap();
- state.remove(path).unwrap();
+ let chunk = expected_chunks.get(&3).unwrap();
+ state.remove(chunk.path()).unwrap();
state
.add(
- Arc::clone(&config.iox_object_store),
- CatalogParquetInfo {
- path: path.clone(),
- file_size_bytes: 33,
- metadata: Arc::clone(metadata),
- },
+ Arc::clone(iox_object_store),
+ CatalogParquetInfo::from_chunk(chunk),
)
.unwrap();
}
- assert_checkpoint(&state, &f, &expected_files, &expected_predicates);
+ assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// add, remove, add in the same transaction
{
- let (path, metadata) = make_metadata(
- &config.iox_object_store,
- "ok",
- chunk_addr(6),
- TestSize::Full,
- )
- .await;
+ let (chunk, _) = generator.generate_id(6).await;
state
.add(
- Arc::clone(&config.iox_object_store),
- CatalogParquetInfo {
- path: path.clone(),
- file_size_bytes: 33,
- metadata: Arc::new(metadata.clone()),
- },
+ Arc::clone(iox_object_store),
+ CatalogParquetInfo::from_chunk(&chunk),
)
.unwrap();
- state.remove(&path).unwrap();
+ state.remove(chunk.path()).unwrap();
state
.add(
- Arc::clone(&config.iox_object_store),
- CatalogParquetInfo {
- path: path.clone(),
- file_size_bytes: 33,
- metadata: Arc::new(metadata.clone()),
- },
+ Arc::clone(iox_object_store),
+ CatalogParquetInfo::from_chunk(&chunk),
)
.unwrap();
- expected_files.insert(ChunkId::new_test(6), (path, Arc::new(metadata)));
+ expected_chunks.insert(6, chunk);
}
- assert_checkpoint(&state, &f, &expected_files, &expected_predicates);
+ assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// remove, add, remove in same transaction
{
- let (path, metadata) = expected_files.remove(&ChunkId::new_test(4)).unwrap();
- state.remove(&path).unwrap();
+ let chunk = expected_chunks.remove(&4).unwrap();
+ state.remove(chunk.path()).unwrap();
state
.add(
- Arc::clone(&config.iox_object_store),
- CatalogParquetInfo {
- path: path.clone(),
- file_size_bytes: 33,
- metadata: Arc::clone(&metadata),
- },
+ Arc::clone(iox_object_store),
+ CatalogParquetInfo::from_chunk(&chunk),
)
.unwrap();
- state.remove(&path).unwrap();
+ state.remove(chunk.path()).unwrap();
}
- assert_checkpoint(&state, &f, &expected_files, &expected_predicates);
+ assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// error handling, no real opt
{
// TODO: Error handling should disambiguate between chunk collision and filename collision
// chunk with same ID already exists (should also not change the metadata)
- let (path, metadata) = make_metadata(
- &config.iox_object_store,
- "fail",
- chunk_addr(0),
- TestSize::Full,
- )
- .await;
+ let (chunk, _) = generator.generate_id(2).await;
let err = state
.add(
- Arc::clone(&config.iox_object_store),
- CatalogParquetInfo {
- path,
- file_size_bytes: 33,
- metadata: Arc::new(metadata),
- },
+ Arc::clone(iox_object_store),
+ CatalogParquetInfo::from_chunk(&chunk),
)
.unwrap_err();
assert!(matches!(
@@ -418,21 +370,16 @@ where
CatalogStateAddError::ParquetFileAlreadyExists { .. }
));
}
- assert_checkpoint(&state, &f, &expected_files, &expected_predicates);
+ assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// error handling, still something works
{
// already exists (should also not change the metadata)
- let (_, metadata) = expected_files.get(&ChunkId::new_test(0)).unwrap();
+ let (chunk, _) = generator.generate_id(2).await;
let err = state
.add(
- Arc::clone(&config.iox_object_store),
- CatalogParquetInfo {
- // Intentionally "incorrect" path
- path: ParquetFilePath::new(&chunk_addr(10)),
- file_size_bytes: 33,
- metadata: Arc::clone(metadata),
- },
+ Arc::clone(iox_object_store),
+ CatalogParquetInfo::from_chunk(&chunk),
)
.unwrap_err();
assert!(matches!(
@@ -441,97 +388,57 @@ where
));
// this transaction will still work
- let (path, metadata) = make_metadata(
- &config.iox_object_store,
- "ok",
- chunk_addr(7),
- TestSize::Full,
- )
- .await;
- let metadata = Arc::new(metadata);
+ let (chunk, _) = generator.generate_id(7).await;
+ let info = CatalogParquetInfo::from_chunk(&chunk);
state
- .add(
- Arc::clone(&config.iox_object_store),
- CatalogParquetInfo {
- path: path.clone(),
- file_size_bytes: 33,
- metadata: Arc::clone(&metadata),
- },
- )
+ .add(Arc::clone(iox_object_store), info.clone())
.unwrap();
- expected_files.insert(ChunkId::new_test(7), (path.clone(), Arc::clone(&metadata)));
+ expected_chunks.insert(7, chunk);
// recently added
- let err = state
- .add(
- Arc::clone(&config.iox_object_store),
- CatalogParquetInfo {
- path,
- file_size_bytes: 33,
- metadata: Arc::clone(&metadata),
- },
- )
- .unwrap_err();
+ let err = state.add(Arc::clone(iox_object_store), info).unwrap_err();
assert!(matches!(
err,
CatalogStateAddError::ParquetFileAlreadyExists { .. }
));
// this still works
- let (path, _) = expected_files.remove(&ChunkId::new_test(7)).unwrap();
- state.remove(&path).unwrap();
+ let chunk = expected_chunks.remove(&7).unwrap();
+ state.remove(chunk.path()).unwrap();
// recently removed
- let err = state.remove(&path).unwrap_err();
+ let err = state.remove(chunk.path()).unwrap_err();
assert!(matches!(
err,
CatalogStateRemoveError::ParquetFileDoesNotExist { .. }
));
}
- assert_checkpoint(&state, &f, &expected_files, &expected_predicates);
+ assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// add predicates
{
// create two chunks that we can use for delete predicate
- let chunk_addr_1 = chunk_addr(8);
- let (path, metadata) = make_metadata(
- &config.iox_object_store,
- "ok",
- chunk_addr_1.clone(),
- TestSize::Full,
- )
- .await;
- state
- .add(
- Arc::clone(&config.iox_object_store),
- CatalogParquetInfo {
- path: path.clone(),
- file_size_bytes: 33,
- metadata: Arc::new(metadata.clone()),
- },
- )
- .unwrap();
- expected_files.insert(chunk_addr_1.chunk_id, (path, Arc::new(metadata)));
- let chunk_addr_2 = chunk_addr(9);
- let (path, metadata) = make_metadata(
- &config.iox_object_store,
- "ok",
- chunk_addr_2.clone(),
- TestSize::Full,
- )
- .await;
+ let (chunk, metadata) = generator.generate_id(8).await;
+ let chunk_addr_1 = ChunkAddr::new(generator.partition(), metadata.chunk_id);
state
.add(
- Arc::clone(&config.iox_object_store),
- CatalogParquetInfo {
- path: path.clone(),
- file_size_bytes: 33,
- metadata: Arc::new(metadata.clone()),
- },
+ Arc::clone(iox_object_store),
+ CatalogParquetInfo::from_chunk(&chunk),
)
.unwrap();
- expected_files.insert(chunk_addr_2.chunk_id, (path, Arc::new(metadata)));
+ expected_chunks.insert(8, chunk);
+ let (chunk, metadata) = generator.generate_id(9).await;
+ let chunk_addr_2 = ChunkAddr::new(generator.partition(), metadata.chunk_id);
+ state
+ .add(
+ Arc::clone(iox_object_store),
+ CatalogParquetInfo::from_chunk(&chunk),
+ )
+ .unwrap();
+ expected_chunks.insert(9, chunk);
// first predicate used only a single chunk
let predicate_1 = create_delete_predicate(1);
@@ -546,32 +453,21 @@ where
expected_predicates.insert(predicate_2, chunks_2.into_iter().collect());
// chunks created afterwards are unaffected
- let chunk_addr_3 = chunk_addr(10);
- let (path, metadata) = make_metadata(
- &config.iox_object_store,
- "ok",
- chunk_addr_3.clone(),
- TestSize::Full,
- )
- .await;
+ let (chunk, _) = generator.generate_id(10).await;
state
.add(
- Arc::clone(&config.iox_object_store),
- CatalogParquetInfo {
- path: path.clone(),
- file_size_bytes: 33,
- metadata: Arc::new(metadata.clone()),
- },
+ Arc::clone(iox_object_store),
+ CatalogParquetInfo::from_chunk(&chunk),
)
.unwrap();
- expected_files.insert(chunk_addr_3.chunk_id, (path, Arc::new(metadata)));
+ expected_chunks.insert(10, chunk);
}
- assert_checkpoint(&state, &f, &expected_files, &expected_predicates);
+ assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// removing a chunk will also remove its predicates
{
- let (path, _) = expected_files.remove(&ChunkId::new_test(8)).unwrap();
- state.remove(&path).unwrap();
+ let chunk = expected_chunks.remove(&8).unwrap();
+ state.remove(chunk.path()).unwrap();
expected_predicates = expected_predicates
.into_iter()
.filter_map(|(predicate, chunks)| {
@@ -583,7 +479,7 @@ where
})
.collect();
}
- assert_checkpoint(&state, &f, &expected_files, &expected_predicates);
+ assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
// Registering predicates for unknown chunks is just ignored because chunks might been in "persisting" intermediate
// state while the predicate was reported.
@@ -596,30 +492,30 @@ where
}];
state.delete_predicate(Arc::clone(&predicate), chunks);
}
- assert_checkpoint(&state, &f, &expected_files, &expected_predicates);
+ assert_checkpoint(&state, &f, &expected_chunks, &expected_predicates);
}
/// Assert that tracked files and their linked metadata are equal.
fn assert_checkpoint<S, F>(
state: &S,
f: &F,
- expected_files: &HashMap<ChunkId, (ParquetFilePath, Arc<IoxParquetMetaData>)>,
+ expected_chunks: &HashMap<u32, ParquetChunk>,
expected_predicates: &HashMap<Arc<DeletePredicate>, HashSet<ChunkAddrWithoutDatabase>>,
) where
F: Fn(&S) -> CheckpointData,
{
- let data = f(state);
+ let data: CheckpointData = f(state);
let actual_files = data.files;
let sorted_keys_actual = get_sorted_keys(actual_files.keys());
- let sorted_keys_expected = get_sorted_keys(expected_files.values().map(|(path, _)| path));
+ let sorted_keys_expected = get_sorted_keys(expected_chunks.values().map(|chunk| chunk.path()));
assert_eq!(sorted_keys_actual, sorted_keys_expected);
- for (path, md_expected) in expected_files.values() {
- let md_actual = &actual_files[path].metadata;
+ for chunk in expected_chunks.values() {
+ let md_actual = &actual_files[chunk.path()].metadata;
let md_actual = md_actual.decode().unwrap();
- let md_expected = md_expected.decode().unwrap();
+ let md_expected = chunk.parquet_metadata().decode().unwrap();
let iox_md_actual = md_actual.read_iox_metadata().unwrap();
let iox_md_expected = md_expected.read_iox_metadata().unwrap();


@@ -1,6 +1,5 @@
use crate::{
- chunk::{self, ChunkMetrics, ParquetChunk},
- metadata::{IoxMetadata, IoxParquetMetaData},
+ chunk::{self, ParquetChunk},
storage::Storage,
};
use arrow::{
@@ -12,12 +11,9 @@ use arrow::{
record_batch::RecordBatch,
};
use data_types::{
- chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder},
- partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
+ partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics},
server_id::ServerId,
};
- use datafusion::physical_plan::SendableRecordBatchStream;
- use datafusion_util::MemoryStream;
use futures::TryStreamExt;
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use object_store::ObjectStore;
@@ -100,102 +96,6 @@ pub async fn load_parquet_from_store_for_path(
Ok(parquet_data)
}
- /// The db name to use for testing
- pub fn db_name() -> &'static str {
- "db1"
- }
- /// Creates a test chunk address for a given chunk id
- pub fn chunk_addr(id: u128) -> ChunkAddr {
- ChunkAddr {
- db_name: Arc::from(db_name()),
- table_name: Arc::from("table1"),
- partition_key: Arc::from("part1"),
- chunk_id: ChunkId::new_test(id),
- }
- }
- /// Same as [`make_chunk`] but parquet file does not contain any row group.
- ///
- /// TODO(raphael): Replace with ChunkGenerator
- pub async fn make_chunk(
- iox_object_store: Arc<IoxObjectStore>,
- column_prefix: &str,
- addr: ChunkAddr,
- test_size: TestSize,
- ) -> ParquetChunk {
- let (record_batches, schema, column_summaries, _num_rows) =
- make_record_batch(column_prefix, test_size);
- make_chunk_given_record_batch(
- iox_object_store,
- record_batches,
- schema,
- addr,
- column_summaries,
- )
- .await
- }
- /// Create a test chunk by writing data to object store.
- ///
- /// TODO: This code creates a chunk that isn't hooked up with metrics
- ///
- /// TODO(raphael): Replace with ChunkGenerator
- async fn make_chunk_given_record_batch(
- iox_object_store: Arc<IoxObjectStore>,
- record_batches: Vec<RecordBatch>,
- schema: Schema,
- addr: ChunkAddr,
- column_summaries: Vec<ColumnSummary>,
- ) -> ParquetChunk {
- let storage = Storage::new(Arc::clone(&iox_object_store));
- let table_summary = TableSummary {
- name: addr.table_name.to_string(),
- columns: column_summaries,
- };
- let stream: SendableRecordBatchStream = if record_batches.is_empty() {
- Box::pin(MemoryStream::new_with_schema(
- record_batches,
- Arc::clone(schema.inner()),
- ))
- } else {
- Box::pin(MemoryStream::new(record_batches))
- };
- let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint(
- Arc::clone(&addr.table_name),
- Arc::clone(&addr.partition_key),
- );
- let metadata = IoxMetadata {
- creation_timestamp: Time::from_timestamp(10, 20),
- table_name: Arc::clone(&addr.table_name),
- partition_key: Arc::clone(&addr.partition_key),
- chunk_id: addr.chunk_id,
- partition_checkpoint,
- database_checkpoint,
- time_of_first_write: Time::from_timestamp(30, 40),
- time_of_last_write: Time::from_timestamp(50, 60),
- chunk_order: ChunkOrder::new(5).unwrap(),
- };
- let (path, file_size_bytes, parquet_metadata) = storage
- .write_to_object_store(addr.clone(), stream, metadata)
- .await
- .unwrap();
- let rows = parquet_metadata.decode().unwrap().row_count();
- ParquetChunk::new_from_parts(
- addr.partition_key,
- Arc::new(table_summary),
- Arc::new(schema),
- &path,
- Arc::clone(&iox_object_store),
- file_size_bytes,
- Arc::new(parquet_metadata),
- rows,
- ChunkMetrics::new_unregistered(),
- )
- }
fn create_column_tag(
name: &str,
data: Vec<Vec<Option<&str>>>,
@@ -888,25 +788,6 @@ pub fn read_data_from_parquet_data(schema: SchemaRef, parquet_data: Vec<u8>) ->
record_batches
}
- /// Create test metadata by creating a parquet file and reading it back into memory.
- ///
- /// TODO(raphael): Replace with ChunkGenerator
- pub async fn make_metadata(
- iox_object_store: &Arc<IoxObjectStore>,
- column_prefix: &str,
- addr: ChunkAddr,
- test_size: TestSize,
- ) -> (ParquetFilePath, IoxParquetMetaData) {
- let chunk = make_chunk(Arc::clone(iox_object_store), column_prefix, addr, test_size).await;
- let parquet_data = load_parquet_from_store(&chunk, Arc::clone(iox_object_store))
- .await
- .unwrap();
- (
- chunk.path().clone(),
- IoxParquetMetaData::from_file_bytes(parquet_data).unwrap(),
- )
- }
/// Create [`PartitionCheckpoint`] and [`DatabaseCheckpoint`] for testing.
pub fn create_partition_and_database_checkpoint(
table_name: Arc<str>,


@@ -62,19 +62,24 @@ impl ChunkGenerator {
self.config = config;
}
- fn next_chunk(&mut self) -> (ChunkId, ChunkOrder) {
- let t = self.next_chunk;
- self.next_chunk += 1;
- (ChunkId::new_test(t as _), ChunkOrder::new(t).unwrap())
+ pub fn partition(&self) -> &PartitionAddr {
+ &self.partition
}
pub async fn generate(&mut self) -> (ParquetChunk, IoxMetadata) {
+ let id = self.next_chunk;
+ self.next_chunk += 1;
+ self.generate_id(id).await
+ }
+ pub async fn generate_id(&mut self, id: u32) -> (ParquetChunk, IoxMetadata) {
let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint(
Arc::clone(&self.partition.table_name),
Arc::clone(&self.partition.partition_key),
);
- let (chunk_id, chunk_order) = self.next_chunk();
+ let chunk_id = ChunkId::new_test(id as _);
+ let chunk_order = ChunkOrder::new(id).unwrap();
let chunk_addr = ChunkAddr::new(&self.partition, chunk_id);
let metadata = IoxMetadata {
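For reference, a sketch of how the reworked generator is used by the catalog tests in this PR. It only uses items visible in this diff (`ChunkGenerator`, `GeneratorConfig`, `generate`, `generate_id`, `partition`, `ChunkAddr::new`); the surrounding async test scaffolding and the `iox_object_store` binding are assumed:

```rust
use std::sync::Arc;

use data_types::chunk_metadata::ChunkAddr;
use parquet_file::test_utils::generator::{ChunkGenerator, GeneratorConfig};

// Inside an async test with an `Arc<IoxObjectStore>` reachable as `iox_object_store`:
let mut generator = ChunkGenerator::new_with_store(Arc::clone(iox_object_store));
generator.set_config(GeneratorConfig::Simple);

// Sequential ids now start at 1, which is why the dump expectations above
// switch from chunk id 0 to chunk id 1.
let (chunk, metadata) = generator.generate().await;

// An explicit id can be requested, e.g. to provoke collisions in the cleanup tests.
let (chunk_1337, metadata_1337) = generator.generate_id(1337).await;

// The partition address is exposed so tests can rebuild a full ChunkAddr.
let addr = ChunkAddr::new(generator.partition(), metadata_1337.chunk_id);
```

Exposing `partition()` and `generate_id()` is what lets the tests drop the old `chunk_addr()`/`make_metadata()` helpers removed elsewhere in this diff.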


@@ -13,7 +13,7 @@ schema = { path = "../schema" }
observability_deps = { path = "../observability_deps" }
ordered-float = "2"
regex = "1"
- serde_json = "1.0.70"
+ serde_json = "1.0.71"
snafu = "0.6.9"
sqlparser = "0.12.0"


@@ -53,7 +53,7 @@ use write_buffer::core::{WriteBufferReading, WriteBufferWriting};
pub(crate) use crate::db::chunk::DbChunk;
pub(crate) use crate::db::lifecycle::ArcDb;
- use crate::db::write::{WriteFilter, WriteFilterNone};
+ use crate::db::write::{DeleteFilter, DeleteFilterNone, WriteFilter, WriteFilterNone};
use crate::{
db::{
access::QueryCatalogAccess,
@@ -522,18 +522,27 @@ impl Db {
/// Store a delete
pub fn store_delete(&self, delete: &DmlDelete) -> Result<()> {
+ self.store_filtered_delete(delete, DeleteFilterNone::default())
+ }
+ /// Store a delete with the provided [`DeleteFilter`]
+ pub fn store_filtered_delete(
+ &self,
+ delete: &DmlDelete,
+ filter: impl DeleteFilter,
+ ) -> Result<()> {
let predicate = Arc::new(delete.predicate().clone());
match delete.table_name() {
None => {
// Note: This assumes tables cannot be removed from the catalog and therefore
// this lock gap is not problematic
for table_name in self.catalog.table_names() {
- self.delete(&table_name, Arc::clone(&predicate))
+ self.delete_filtered(&table_name, Arc::clone(&predicate), filter)
.expect("table exists")
}
Ok(())
}
- Some(table_name) => self.delete(table_name, predicate),
+ Some(table_name) => self.delete_filtered(table_name, predicate, filter),
}
}
@@ -541,6 +550,15 @@ impl Db {
///
/// Returns an error if the table cannot be found in the catalog
pub fn delete(&self, table_name: &str, delete_predicate: Arc<DeletePredicate>) -> Result<()> {
+ self.delete_filtered(table_name, delete_predicate, DeleteFilterNone::default())
+ }
+ fn delete_filtered(
+ &self,
+ table_name: &str,
+ delete_predicate: Arc<DeletePredicate>,
+ filter: impl DeleteFilter,
+ ) -> Result<()> {
// collect delete predicates on preserved partitions for a catalog transaction
let mut affected_persisted_chunks = vec![];
@@ -558,6 +576,10 @@ impl Db {
for chunk in chunks {
// save the delete predicate in the chunk
let mut chunk = chunk.write();
+ if !filter.filter_chunk(&chunk) {
+ continue;
+ }
chunk.add_delete_predicate(Arc::clone(&delete_predicate));
// We should only report persisted chunks or chunks that are currently being persisted, because the


@@ -917,12 +917,7 @@ mod tests {
use data_types::{delete_predicate::DeleteExpr, timestamp::TimestampRange};
use mutable_buffer::test_helpers::write_lp_to_new_chunk;
- use parquet_file::{
- chunk::ParquetChunk,
- test_utils::{
- make_chunk as make_parquet_chunk_with_store, make_iox_object_store, TestSize,
- },
- };
+ use parquet_file::test_utils::generator::{ChunkGenerator, GeneratorConfig};
#[test]
fn test_new_open() {
@@ -946,7 +941,7 @@ mod tests {
let mut chunk = make_persisted_chunk().await;
assert_eq!(
chunk.freeze().unwrap_err().to_string(),
- "Internal Error: unexpected chunk state for Chunk('db':'table1':'part1':00000000-0000-0000-0000-000000000000) \
+ "Internal Error: unexpected chunk state for Chunk('db1':'table1':'part1':00000000-0000-0000-0000-000000000001) \
during setting closed. Expected Open or Frozen, got Persisted"
);
}
@@ -1132,11 +1127,6 @@ mod tests {
write_lp_to_new_chunk(&format!("{} bar=1 10", table_name))
}
- async fn make_parquet_chunk(addr: ChunkAddr) -> ParquetChunk {
- let iox_object_store = make_iox_object_store().await;
- make_parquet_chunk_with_store(iox_object_store, "foo", addr, TestSize::Full).await
- }
fn chunk_addr() -> ChunkAddr {
ChunkAddr {
db_name: Arc::from("db"),
@@ -1160,11 +1150,12 @@ mod tests {
}
async fn make_persisted_chunk() -> CatalogChunk {
- let addr = chunk_addr();
- let now = Time::from_timestamp_nanos(43564);
+ let mut generator = ChunkGenerator::new().await;
+ generator.set_config(GeneratorConfig::NoData);
+ let (parquet_chunk, metadata) = generator.generate().await;
+ let addr = ChunkAddr::new(generator.partition(), metadata.chunk_id);
+ let now = Time::from_timestamp_nanos(43564);
- // assemble ParquetChunk
- let parquet_chunk = make_parquet_chunk(addr.clone()).await;
CatalogChunk::new_object_store_only(
addr,


@@ -18,7 +18,8 @@ use snafu::{ResultExt, Snafu};
use time::Time;
use write_buffer::core::WriteBufferReading;
- use crate::db::write::WriteFilter;
+ use crate::db::catalog::chunk::{CatalogChunk, ChunkStage};
+ use crate::db::write::{DeleteFilter, WriteFilter};
use crate::Db;
#[allow(clippy::enum_variant_names)]
@@ -243,8 +244,7 @@ pub async fn perform_replay(
for n_try in 1..=n_tries {
let result = match &dml_operation {
DmlOperation::Write(write) => db.store_filtered_write(write, filter),
- // TODO: Only apply delete to unpersisted chunks (#3125)
- DmlOperation::Delete(delete) => db.store_delete(delete),
+ DmlOperation::Delete(delete) => db.store_filtered_delete(delete, filter),
};
match result {
@@ -370,6 +370,19 @@ impl<'a> WriteFilter for ReplayFilter<'a> {
}
}
+ impl<'a> DeleteFilter for ReplayFilter<'a> {
+ fn filter_chunk(&self, chunk: &CatalogChunk) -> bool {
+ // The persist lifecycle action MUST persist any outstanding delete predicates
+ //
+ // As such deletes should only be applied to unpersisted chunks - i.e.
+ // those containing data from the in-progress replay operation
+ //
+ // This avoids a situation where a delete could be applied to a chunk containing
+ // data from writes sequenced after the delete being replayed
+ !matches!(chunk.stage(), ChunkStage::Persisted { .. })
+ }
+ }
/// Where is a given sequence number and the entire data batch associated with it compared to the range of persisted and
/// partially persisted sequence numbers (extracted from partition checkpoint).
#[derive(Debug, PartialEq)]
@@ -2664,6 +2677,60 @@ mod tests {
.await;
}
+ #[tokio::test]
+ async fn replay_delete_persisted_chunks() {
+ ReplayTest {
+ steps: vec![
+ Step::Ingest(vec![TestSequencedEntry {
+ sequencer_id: 0,
+ sequence_number: 0,
+ lp: "table_1,tag_partition_by=a bar=10 10",
+ }]),
+ Step::Delete(vec![TestDelete {
+ sequencer_id: 0,
+ sequence_number: 1,
+ table_name: None,
+ predicate: DeletePredicate {
+ range: TimestampRange { start: 0, end: 11 },
+ exprs: vec![],
+ },
+ }]),
+ Step::Ingest(vec![TestSequencedEntry {
+ sequencer_id: 0,
+ sequence_number: 2,
+ lp: "table_1,tag_partition_by=b bar=20 10",
+ }]),
+ Step::Await(vec![Check::Query(
+ "select * from table_1 order by bar",
+ vec![
+ "+-----+------------------+--------------------------------+",
+ "| bar | tag_partition_by | time                           |",
+ "+-----+------------------+--------------------------------+",
+ "| 20  | b                | 1970-01-01T00:00:00.000000010Z |",
+ "+-----+------------------+--------------------------------+",
+ ],
+ )]),
+ Step::MakeWritesPersistable,
+ Step::Persist(vec![("table_1", "tag_partition_by_b")]),
+ Step::Restart,
+ Step::Replay,
+ Step::Assert(vec![Check::Query(
+ "select * from table_1 order by bar",
+ vec![
+ "+-----+------------------+--------------------------------+",
+ "| bar | tag_partition_by | time                           |",
+ "+-----+------------------+--------------------------------+",
+ "| 20  | b                | 1970-01-01T00:00:00.000000010Z |",
+ "+-----+------------------+--------------------------------+",
+ ],
+ )]),
+ ],
+ ..Default::default()
+ }
+ .run()
+ .await;
+ }
#[tokio::test]
async fn replay_fail_sequencers_change() {
// create write buffer w/ sequencer 0 and 1


@@ -1,3 +1,4 @@
+ use crate::db::catalog::chunk::CatalogChunk;
use mutable_batch::PartitionWrite;
/// A [`WriteFilter`] provides the ability to mask rows from a [`PartitionWrite`]
@@ -27,3 +28,21 @@ impl WriteFilter for WriteFilterNone {
Some(write)
}
}
+ /// A [`DeleteFilter`] provides the ability to exclude chunks from having a delete applied
+ ///
+ /// This is important for replay where it needs to prevent deletes from being applied to chunks
+ /// containing writes sequenced after the delete
+ pub trait DeleteFilter: Copy {
+ /// Returns true if the delete should be applied to this chunk
+ fn filter_chunk(&self, chunk: &CatalogChunk) -> bool;
+ }
+ #[derive(Debug, Default, Copy, Clone)]
+ pub struct DeleteFilterNone {}
+ impl DeleteFilter for DeleteFilterNone {
+ fn filter_chunk(&self, _chunk: &CatalogChunk) -> bool {
+ true
+ }
+ }
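To illustrate the new trait, here is a hedged sketch of a custom filter equivalent in behaviour to the `ReplayFilter` impl added in replay.rs above: it skips chunks that are already persisted, so a replayed delete cannot touch data persisted after the delete's sequence number. The type name `SkipPersistedChunks` is illustrative only; `DeleteFilter`, `CatalogChunk` and `ChunkStage` are the items shown in this diff.

```rust
use crate::db::catalog::chunk::{CatalogChunk, ChunkStage};
use crate::db::write::DeleteFilter;

/// Illustrative filter: only apply deletes to chunks that have not been persisted yet.
#[derive(Debug, Clone, Copy)]
struct SkipPersistedChunks;

impl DeleteFilter for SkipPersistedChunks {
    fn filter_chunk(&self, chunk: &CatalogChunk) -> bool {
        // Persist carries outstanding delete predicates with it, so persisted
        // chunks are excluded here, mirroring ReplayFilter::filter_chunk.
        !matches!(chunk.stage(), ChunkStage::Persisted { .. })
    }
}
```

A caller would pass such a filter to `db.store_filtered_delete(delete, SkipPersistedChunks)`, exactly as `perform_replay` now does with its `ReplayFilter`; `DeleteFilterNone` keeps the previous apply-to-everything behaviour for the public `store_delete` and `delete` entry points.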