diff --git a/server/Cargo.toml b/server/Cargo.toml index 7c020bc999..5673fe0baa 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" [dependencies] # In alphabetical order arrow_deps = { path = "../arrow_deps" } async-trait = "0.1" -bytes = "1.0" +bytes = { version = "1.0", features = ["serde"] } chrono = "0.4" crc32fast = "1.2.0" data_types = { path = "../data_types" } diff --git a/server/src/db.rs b/server/src/db.rs index f37e9eb632..9d6c647aa4 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -8,17 +8,13 @@ use std::sync::{ }; use async_trait::async_trait; -use chrono::{DateTime, Utc}; use parking_lot::Mutex; use snafu::{OptionExt, ResultExt, Snafu}; use tracing::{debug, info}; use arrow_deps::datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider}; use catalog::{chunk::ChunkState, Catalog}; -use data_types::{ - chunk::ChunkSummary, - database_rules::{DatabaseRules, Order, Sort, SortOrder}, -}; +use data_types::{chunk::ChunkSummary, database_rules::DatabaseRules}; use internal_types::{data::ReplicatedWrite, selection::Selection}; use query::{Database, DEFAULT_SCHEMA}; use read_buffer::Database as ReadBufferDb; @@ -149,7 +145,7 @@ const STARTING_SEQUENCE: u64 = 1; /// Catalog Usage: the state of the catalog and the state of the `Db` /// must remain in sync. If they are ever out of sync, the IOx system /// should be shutdown and forced through a "recovery" to correctly -/// recconcile the state. +/// reconcile the state. /// /// Ensuring the Catalog and Db remain in sync is accomplished by /// manipulating the catalog state alongside the state in the `Db` @@ -397,35 +393,6 @@ impl Db { }) } - /// Returns the partition_keys in the requested sort order - pub fn partition_keys_sorted_by(&self, sort_rules: &SortOrder) -> Vec { - let mut partitions: Vec<(String, DateTime, DateTime)> = self - .catalog - .partitions() - .map(|p| { - let p = p.read(); - (p.key().to_string(), p.created_at(), p.last_write_at()) - }) - .collect(); - - match &sort_rules.sort { - Sort::CreatedAtTime => partitions.sort_by_key(|(_, created_at, _)| *created_at), - Sort::LastWriteTime => partitions.sort_by_key(|(_, _, last_write_at)| *last_write_at), - Sort::Column(_name, _data_type, _val) => { - unimplemented!() - } - } - - if sort_rules.order == Order::Desc { - partitions.reverse(); - } - - partitions - .into_iter() - .map(|(key, _, _)| key) - .collect::>() - } - /// Returns the number of iterations of the background worker loop pub fn worker_iterations(&self) -> usize { self.worker_iterations.load(Ordering::Relaxed) @@ -574,7 +541,7 @@ mod tests { use chrono::Utc; use data_types::{ chunk::ChunkStorage, - database_rules::{LifecycleRules, SortOrder}, + database_rules::{LifecycleRules, Order, Sort, SortOrder}, }; use query::{ exec::Executor, frontend::sql::SQLQueryPlanner, test::TestLPWriter, PartitionChunk, @@ -836,7 +803,7 @@ mod tests { } #[tokio::test] - async fn partitions_sorted_by_times() { + async fn chunks_sorted_by_times() { let db = make_db(); let mut writer = TestLPWriter::default(); writer.write_lp_string(&db, "cpu val=1 1").unwrap(); @@ -852,14 +819,23 @@ mod tests { order: Order::Desc, sort: Sort::LastWriteTime, }; - let partitions = db.partition_keys_sorted_by(&sort_rules); + let chunks = db.catalog.chunks_sorted_by(&sort_rules); + let partitions: Vec<_> = chunks + .into_iter() + .map(|x| x.read().key().to_string()) + .collect(); + assert_eq!(partitions, vec!["1970-01-05T15", "1970-01-01T00"]); let sort_rules = SortOrder { order: Order::Asc, sort: Sort::CreatedAtTime, }; - let partitions = db.partition_keys_sorted_by(&sort_rules); + let chunks = db.catalog.chunks_sorted_by(&sort_rules); + let partitions: Vec<_> = chunks + .into_iter() + .map(|x| x.read().key().to_string()) + .collect(); assert_eq!(partitions, vec!["1970-01-01T00", "1970-01-05T15"]); } diff --git a/server/src/db/catalog.rs b/server/src/db/catalog.rs index c34b29cdee..62ff593adc 100644 --- a/server/src/db/catalog.rs +++ b/server/src/db/catalog.rs @@ -9,6 +9,8 @@ use parking_lot::RwLock; use snafu::{OptionExt, Snafu}; use arrow_deps::datafusion::{catalog::schema::SchemaProvider, datasource::TableProvider}; +use chunk::Chunk; +use data_types::database_rules::{Order, Sort, SortOrder}; use data_types::error::ErrorLogger; use internal_types::selection::Selection; use partition::Partition; @@ -119,6 +121,38 @@ impl Catalog { .cloned() .context(UnknownPartition { partition_key }) } + + /// Returns all chunks within the catalog in an arbitrary order + pub fn chunks(&self) -> Vec>> { + let mut chunks = Vec::new(); + let partitions = self.partitions.read(); + + for partition in partitions.values() { + let partition = partition.read(); + chunks.extend(partition.chunks().cloned()) + } + chunks + } + + /// Returns the chunks in the requested sort order + pub fn chunks_sorted_by(&self, sort_rules: &SortOrder) -> Vec>> { + let mut chunks = self.chunks(); + + match &sort_rules.sort { + // The first write is technically not the created time but is in practice close enough + Sort::CreatedAtTime => chunks.sort_by_cached_key(|x| x.read().time_of_first_write()), + Sort::LastWriteTime => chunks.sort_by_cached_key(|x| x.read().time_of_last_write()), + Sort::Column(_name, _data_type, _val) => { + unimplemented!() + } + } + + if sort_rules.order == Order::Desc { + chunks.reverse(); + } + + chunks + } } impl SchemaProvider for Catalog {