refactor: Remove `Result` in QueryDatabase trait (none of the functions can fail) (#3422)
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
parent: 8d732ad78c
commit: 336ffd1966
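The signature change itself is small; most of the diff is call sites dropping error handling that could never trigger. A minimal, self-contained sketch of the before/after shape (the structs below are simplified stand-ins for illustration, not the real influxdb_iox definitions; only the method signatures mirror the hunks that follow, and the helper function is hypothetical):

// Simplified stand-ins: the real PartitionAddr and ChunkSummary carry more fields.
#[derive(Debug)]
pub struct PartitionAddr {
    pub partition_key: String,
}

#[derive(Debug)]
pub struct ChunkSummary {
    pub memory_bytes: usize,
}

pub trait QueryDatabase {
    // Before this commit the trait also declared
    //     type Error: std::error::Error + Send + Sync + 'static;
    // and both methods returned Result<_, Self::Error>. Since no implementation
    // could actually fail, the methods are now infallible:
    fn partition_addrs(&self) -> Vec<PartitionAddr>;
    fn chunk_summaries(&self) -> Vec<ChunkSummary>;
}

// Illustrative caller: the `.unwrap()` / `.expect()` / `match` plumbing disappears.
pub fn total_memory_bytes(db: &impl QueryDatabase) -> usize {
    db.chunk_summaries().into_iter().map(|s| s.memory_bytes).sum()
}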
@@ -5,7 +5,6 @@ use super::{
     catalog::{Catalog, TableNameFilter},
     chunk::DbChunk,
     query_log::QueryLog,
-    Error, Result,
 };
 use crate::system_tables;
 use async_trait::async_trait;
@@ -213,7 +212,6 @@ impl PruningObserver for ChunkAccess {
 
 #[async_trait]
 impl QueryDatabase for QueryCatalogAccess {
-    type Error = Error;
     type Chunk = DbChunk;
 
     /// Return a covering set of chunks for a particular partition
@@ -221,12 +219,12 @@ impl QueryDatabase for QueryCatalogAccess {
         self.chunk_access.candidate_chunks(predicate)
     }
 
-    fn partition_addrs(&self) -> Result<Vec<PartitionAddr>, Self::Error> {
-        Ok(self.catalog.partition_addrs())
+    fn partition_addrs(&self) -> Vec<PartitionAddr> {
+        self.catalog.partition_addrs()
     }
 
-    fn chunk_summaries(&self) -> Result<Vec<ChunkSummary>> {
-        Ok(self.catalog.chunk_summaries())
+    fn chunk_summaries(&self) -> Vec<ChunkSummary> {
+        self.catalog.chunk_summaries()
     }
 
     fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
@@ -1195,18 +1195,17 @@ impl Db {
 /// can just use Db as a `Database` even though the implementation
 /// lives in `catalog_access`
 impl QueryDatabase for Db {
-    type Error = Error;
     type Chunk = DbChunk;
 
     fn chunks(&self, predicate: &Predicate) -> Vec<Arc<Self::Chunk>> {
         self.catalog_access.chunks(predicate)
     }
 
-    fn partition_addrs(&self) -> Result<Vec<PartitionAddr>, Self::Error> {
+    fn partition_addrs(&self) -> Vec<PartitionAddr> {
         self.catalog_access.partition_addrs()
     }
 
-    fn chunk_summaries(&self) -> Result<Vec<ChunkSummary>> {
+    fn chunk_summaries(&self) -> Vec<ChunkSummary> {
         self.catalog_access.chunk_summaries()
     }
 
@@ -2486,7 +2485,7 @@ mod tests {
         // write into a separate partitiion
         write_lp(&db, "cpu bar=1,baz2,frob=3 400000000000000");
 
-        print!("Partitions: {:?}", db.partition_addrs().unwrap());
+        print!("Partitions: {:?}", db.partition_addrs());
 
         let chunk_summaries = db.partition_chunk_summaries("1970-01-05T15");
 
@@ -2507,7 +2506,6 @@ mod tests {
 
         let size: usize = db
             .chunk_summaries()
-            .unwrap()
             .into_iter()
             .map(|x| x.memory_bytes)
             .sum();
@@ -2534,7 +2532,7 @@ mod tests {
         let t_second_write = time.inc(Duration::from_secs(2));
         write_lp(&db, "cpu bar=2 2");
 
-        let mut chunk_summaries = db.chunk_summaries().unwrap();
+        let mut chunk_summaries = db.chunk_summaries();
 
         chunk_summaries.sort_by_key(|s| s.id);
 
@@ -2595,7 +2593,7 @@ mod tests {
         write_lp(&db, "cpu bar=1,baz=2 2");
 
         // Check first/last write times on the chunks at this point
-        let mut chunk_summaries = db.chunk_summaries().expect("expected summary to return");
+        let mut chunk_summaries = db.chunk_summaries();
         chunk_summaries.sort_unstable();
         assert_eq!(chunk_summaries.len(), 2);
         // Each chunk has one write, so both chunks should have first write == last write
@@ -2612,7 +2610,7 @@ mod tests {
         write_lp(&db, "cpu bar=1,baz=2,frob=3 400000000000000");
 
         // Check first/last write times on the chunks at this point
-        let mut chunk_summaries = db.chunk_summaries().expect("expected summary to return");
+        let mut chunk_summaries = db.chunk_summaries();
         chunk_summaries.sort_unstable();
         assert_eq!(chunk_summaries.len(), 3);
         // The closed chunk's times should be the same
@@ -2636,7 +2634,7 @@ mod tests {
             .unwrap();
 
         // Check first/last write times on the chunks at this point
-        let mut chunk_summaries = db.chunk_summaries().expect("expected summary to return");
+        let mut chunk_summaries = db.chunk_summaries();
         chunk_summaries.sort_unstable();
         assert_eq!(chunk_summaries.len(), 3);
         // The rb chunk's times should be the same as they were when this was the closed mb chunk
@@ -2661,7 +2659,7 @@ mod tests {
             .unwrap();
 
         // Check first/last write times on the chunks at this point
-        let mut chunk_summaries = db.chunk_summaries().expect("expected summary to return");
+        let mut chunk_summaries = db.chunk_summaries();
         chunk_summaries.sort_unstable();
         // Persisting compacts chunks, so now there's only 2
         assert_eq!(chunk_summaries.len(), 2);
@@ -2682,7 +2680,7 @@ mod tests {
         db.rollover_partition("cpu", "1970-01-05T15").await.unwrap();
 
         // Check first/last write times on the chunks at this point
-        let mut chunk_summaries = db.chunk_summaries().expect("expected summary to return");
+        let mut chunk_summaries = db.chunk_summaries();
         chunk_summaries.sort_unstable();
         assert_eq!(chunk_summaries.len(), 2);
         // The rb chunk's times should still be the same
@@ -2700,7 +2698,7 @@ mod tests {
         write_lp(&db, "cpu bar=1,baz=3,blargh=3 400000000000000");
 
         // Check first/last write times on the chunks at this point
-        let mut chunk_summaries = db.chunk_summaries().expect("expected summary to return");
+        let mut chunk_summaries = db.chunk_summaries();
         chunk_summaries.sort_unstable();
         assert_eq!(chunk_summaries.len(), 3);
         // The rb chunk's times should still be the same
@@ -2806,7 +2804,7 @@ mod tests {
         write_lp(&db, "cpu bar=1 400000000000000");
         write_lp(&db, "mem frob=3 400000000000001");
 
-        print!("Partitions: {:?}", db.partition_addrs().unwrap());
+        print!("Partitions: {:?}", db.partition_addrs());
 
         let partition_summaries = vec![
             db.partition_summary("cpu", "1970-01-01T00").unwrap(),
@@ -3630,7 +3628,7 @@ mod tests {
 
         write_lp(db.as_ref(), "cpu foo=1 10");
 
-        let chunks = db.chunk_summaries().unwrap();
+        let chunks = db.chunk_summaries();
         assert_eq!(chunks.len(), 1);
         assert_eq!(chunks[0].time_of_first_write, t0);
         assert_eq!(chunks[0].time_of_last_write, t0);
@@ -3640,7 +3638,7 @@ mod tests {
 
         run_query(Arc::clone(&db), "select * from cpu").await;
 
-        let chunks = db.chunk_summaries().unwrap();
+        let chunks = db.chunk_summaries();
         assert_eq!(chunks.len(), 1);
         assert_eq!(chunks[0].time_of_first_write, t0);
         assert_eq!(chunks[0].time_of_last_write, t0);
@@ -3650,7 +3648,7 @@ mod tests {
 
         write_lp(db.as_ref(), "cpu foo=1 20");
 
-        let chunks = db.chunk_summaries().unwrap();
+        let chunks = db.chunk_summaries();
         assert_eq!(chunks.len(), 1);
         assert_eq!(chunks[0].time_of_first_write, t0);
         assert_eq!(chunks[0].time_of_last_write, t2);
@@ -3661,7 +3659,7 @@ mod tests {
         // This chunk should be pruned out and therefore not accessed by the query
         run_query(Arc::clone(&db), "select * from cpu where foo = 2;").await;
 
-        let chunks = db.chunk_summaries().unwrap();
+        let chunks = db.chunk_summaries();
         assert_eq!(chunks.len(), 1);
         assert_eq!(chunks[0].time_of_first_write, t0);
         assert_eq!(chunks[0].time_of_last_write, t2);
@@ -3700,7 +3698,6 @@ mod tests {
 
     fn partition_keys(db: &Db) -> Vec<String> {
         db.partition_addrs()
-            .unwrap()
             .into_iter()
             .map(|addr| addr.partition_key.to_string())
             .collect()
@@ -192,7 +192,7 @@ mod tests {
         let t_last_write = time.inc(Duration::from_secs(1));
         write_lp(db.as_ref(), "cpu,tag1=bongo,tag2=a bar=2 10");
 
-        let partition_addrs = db.partition_addrs().unwrap();
+        let partition_addrs = db.partition_addrs();
         assert_eq!(partition_addrs.len(), 1);
 
         let partition = db
@@ -250,7 +250,7 @@ mod tests {
         write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=3 23");
         write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=2 26");
 
-        let partition_addrs = db.partition_addrs().unwrap();
+        let partition_addrs = db.partition_addrs();
         assert_eq!(partition_addrs.len(), 1);
 
         // Cannot simply use empty predicate (#2687)
@@ -308,7 +308,7 @@ mod tests {
         db.delete("cpu", Arc::clone(&pred2)).unwrap();
 
         // start compaction job (but don't poll the future yet)
-        let partition_keys = db.partition_addrs().unwrap();
+        let partition_keys = db.partition_addrs();
         assert_eq!(partition_keys.len(), 1);
         let partition_key: &str = &partition_keys[0].partition_key;
 
@@ -89,21 +89,21 @@ mod tests {
             .unwrap();
         let chunk_id = chunk.id();
 
-        let chunks = db.chunk_summaries().unwrap();
+        let chunks = db.chunk_summaries();
         assert_eq!(chunks.len(), 1);
         assert_eq!(chunks[0].storage, ChunkStorage::ReadBufferAndObjectStore);
 
         db.unload_read_buffer("cpu", partition_key, chunk_id)
             .unwrap();
 
-        let chunks = db.chunk_summaries().unwrap();
+        let chunks = db.chunk_summaries();
         assert_eq!(chunks.len(), 1);
         assert_eq!(chunks[0].storage, ChunkStorage::ObjectStoreOnly);
 
         let chunk = db.lockable_chunk("cpu", partition_key, chunk_id).unwrap();
         load_chunk(chunk.write()).unwrap().1.await.unwrap().unwrap();
 
-        let chunks = db.chunk_summaries().unwrap();
+        let chunks = db.chunk_summaries();
         assert_eq!(chunks.len(), 1);
         assert_eq!(chunks[0].storage, ChunkStorage::ReadBufferAndObjectStore);
 
@@ -571,7 +571,6 @@ mod tests {
 
     fn partition_keys(db: &Db) -> Vec<String> {
        db.partition_addrs()
-            .unwrap()
            .into_iter()
            .map(|addr| addr.partition_key.to_string())
            .collect()
@@ -192,7 +192,6 @@ pub async fn make_db_time() -> (Arc<Db>, Arc<time::MockProvider>) {
 
 fn chunk_summary_iter(db: &Db) -> impl Iterator<Item = ChunkSummary> + '_ {
     db.partition_addrs()
-        .unwrap()
         .into_iter()
         .flat_map(move |addr| db.partition_chunk_summaries(addr.partition_key.as_ref()))
 }
@@ -169,12 +169,8 @@ impl management_service_server::ManagementService for ManagementService {
            .db(&db_name)
            .map_err(default_server_error_handler)?;
 
-        let chunk_summaries = match db.chunk_summaries() {
-            Ok(chunk_summaries) => chunk_summaries,
-            Err(e) => return Err(default_db_error_handler(e)),
-        };
-
-        let chunks: Vec<Chunk> = chunk_summaries
+        let chunks: Vec<Chunk> = db
+            .chunk_summaries()
            .into_iter()
            .map(|summary| summary.into())
            .collect();
@@ -207,8 +203,8 @@ impl management_service_server::ManagementService for ManagementService {
            .db(&db_name)
            .map_err(default_server_error_handler)?;
 
-        let partition_addrs = db.partition_addrs().map_err(default_db_error_handler)?;
-        let partitions = partition_addrs
+        let partitions = db
+            .partition_addrs()
            .into_iter()
            .map(|addr| Partition {
                table_name: addr.table_name.to_string(),
@@ -234,9 +230,9 @@ impl management_service_server::ManagementService for ManagementService {
            .map_err(default_server_error_handler)?;
 
        // TODO: get more actual partition details
-        let partition_addrs = db.partition_addrs().map_err(default_db_error_handler)?;
 
-        let partition = partition_addrs
+        let partition = db
+            .partition_addrs()
            .iter()
            .find(|addr| addr.partition_key.as_ref() == partition_key)
            .map(|addr| Partition {
@@ -83,11 +83,10 @@ pub trait QueryChunkMeta: Sized {
 /// Databases store data organized by partitions and each partition stores
 /// data in Chunks.
 pub trait QueryDatabase: Debug + Send + Sync {
-    type Error: std::error::Error + Send + Sync + 'static;
     type Chunk: QueryChunk;
 
     /// Return the partition keys for data in this DB
-    fn partition_addrs(&self) -> Result<Vec<PartitionAddr>, Self::Error>;
+    fn partition_addrs(&self) -> Vec<PartitionAddr>;
 
     /// Schema for a specific table if the table exists.
     fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>>;
@@ -98,7 +97,7 @@ pub trait QueryDatabase: Debug + Send + Sync {
     fn chunks(&self, predicate: &Predicate) -> Vec<Arc<Self::Chunk>>;
 
     /// Return a summary of all chunks in this database, in all partitions
-    fn chunk_summaries(&self) -> Result<Vec<ChunkSummary>, Self::Error>;
+    fn chunk_summaries(&self) -> Vec<ChunkSummary>;
 
     /// Record that particular type of query was run / planned
     fn record_query(&self, query_type: impl Into<String>, query_text: impl Into<String>);
@@ -96,13 +96,12 @@ impl TestDatabase {
 }
 
 impl QueryDatabase for TestDatabase {
-    type Error = TestError;
     type Chunk = TestChunk;
 
     /// Return the partition keys for data in this DB
-    fn partition_addrs(&self) -> Result<Vec<PartitionAddr>, Self::Error> {
+    fn partition_addrs(&self) -> Vec<PartitionAddr> {
         let partitions = self.partitions.lock();
-        let addrs = partitions
+        partitions
            .values()
            .filter_map(|chunks| {
                // each partition has some number of chunks which
@@ -113,9 +112,7 @@ impl QueryDatabase for TestDatabase {
                    .next()
                    .map(|chunk| chunk.addr().into_partition())
            })
-            .collect();
-
-        Ok(addrs)
+            .collect()
     }
 
     fn chunks(&self, _predicate: &Predicate) -> Vec<Arc<Self::Chunk>> {
@@ -127,7 +124,7 @@ impl QueryDatabase for TestDatabase {
            .collect()
     }
 
-    fn chunk_summaries(&self) -> Result<Vec<ChunkSummary>, Self::Error> {
+    fn chunk_summaries(&self) -> Vec<ChunkSummary> {
         unimplemented!("summaries not implemented TestDatabase")
     }
 
@@ -434,7 +434,7 @@ pub async fn make_different_stage_chunks_with_deletes_scenario(
        // Make an open MUB
        write_lp(&db, &chunk_data.lp_lines.join("\n"));
        // 0 does not represent the real chunk id. It is here just to initialize the chunk_id variable for later assignment
-        let mut chunk_id = db.chunk_summaries().unwrap()[0].id;
+        let mut chunk_id = db.chunk_summaries()[0].id;
 
        // ----------
        // freeze MUB
@@ -1727,7 +1727,7 @@ mod tests {
         db.store_write(&write).unwrap();
 
         // get chunk ID
-        let chunks = db.chunk_summaries().unwrap();
+        let chunks = db.chunk_summaries();
         assert_eq!(chunks.len(), 1);
         let chunk_id = chunks[0].id;
 
@@ -1761,7 +1761,7 @@ mod tests {
         let db_name = DatabaseName::new("foo").unwrap();
         let db = server.db(&db_name).unwrap();
 
-        let chunk_summaries = db.chunk_summaries().unwrap();
+        let chunk_summaries = db.chunk_summaries();
         assert_eq!(chunk_summaries.len(), 1);
         assert_eq!(chunk_summaries[0].storage, ChunkStorage::ReadBuffer);
     }
@@ -127,7 +127,7 @@ async fn delete_predicate_preservation() {
        loop {
            let did_delete_predicate_preservation =
                db.worker_iterations_delete_predicate_preservation() > iters_start;
-            let did_compaction = db.chunk_summaries().unwrap().into_iter().any(|summary| {
+            let did_compaction = db.chunk_summaries().into_iter().any(|summary| {
                (summary.partition_key.as_ref() == "part_c")
                    && (summary.storage == ChunkStorage::ReadBuffer)
            });
@@ -220,11 +220,7 @@ async fn write_buffer_lifecycle() {
                // As soon as replay finishes the lifecycle should have persisted everything in
                // table_1 into a single chunk. We should therefore have two chunks, one for
                // each of table_1 and table_2
-                assert_eq!(
-                    db.chunk_summaries().unwrap().len(),
-                    2,
-                    "persisted during replay!"
-                );
+                assert_eq!(db.chunk_summaries().len(), 2, "persisted during replay!");
                break;
            }
 
@@ -268,7 +264,6 @@ fn write_group2(write_buffer_state: &MockBufferSharedState) {
 
 fn count_persisted_chunks(db: &Db) -> usize {
     db.chunk_summaries()
-        .unwrap()
        .into_iter()
        .filter(|x| {
            matches!(