feat: chunk sorting (#1077)
parent
7154dfd5f6
commit
13612047c0
|
@ -7,7 +7,7 @@ edition = "2018"
|
||||||
[dependencies] # In alphabetical order
|
[dependencies] # In alphabetical order
|
||||||
arrow_deps = { path = "../arrow_deps" }
|
arrow_deps = { path = "../arrow_deps" }
|
||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
bytes = "1.0"
|
bytes = { version = "1.0", features = ["serde"] }
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
crc32fast = "1.2.0"
|
crc32fast = "1.2.0"
|
||||||
data_types = { path = "../data_types" }
|
data_types = { path = "../data_types" }
|
||||||
|
|
|
@ -8,17 +8,13 @@ use std::sync::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use chrono::{DateTime, Utc};
|
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use snafu::{OptionExt, ResultExt, Snafu};
|
use snafu::{OptionExt, ResultExt, Snafu};
|
||||||
use tracing::{debug, info};
|
use tracing::{debug, info};
|
||||||
|
|
||||||
use arrow_deps::datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider};
|
use arrow_deps::datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider};
|
||||||
use catalog::{chunk::ChunkState, Catalog};
|
use catalog::{chunk::ChunkState, Catalog};
|
||||||
use data_types::{
|
use data_types::{chunk::ChunkSummary, database_rules::DatabaseRules};
|
||||||
chunk::ChunkSummary,
|
|
||||||
database_rules::{DatabaseRules, Order, Sort, SortOrder},
|
|
||||||
};
|
|
||||||
use internal_types::{data::ReplicatedWrite, selection::Selection};
|
use internal_types::{data::ReplicatedWrite, selection::Selection};
|
||||||
use query::{Database, DEFAULT_SCHEMA};
|
use query::{Database, DEFAULT_SCHEMA};
|
||||||
use read_buffer::Database as ReadBufferDb;
|
use read_buffer::Database as ReadBufferDb;
|
||||||
|
@ -149,7 +145,7 @@ const STARTING_SEQUENCE: u64 = 1;
|
||||||
/// Catalog Usage: the state of the catalog and the state of the `Db`
|
/// Catalog Usage: the state of the catalog and the state of the `Db`
|
||||||
/// must remain in sync. If they are ever out of sync, the IOx system
|
/// must remain in sync. If they are ever out of sync, the IOx system
|
||||||
/// should be shutdown and forced through a "recovery" to correctly
|
/// should be shutdown and forced through a "recovery" to correctly
|
||||||
/// recconcile the state.
|
/// reconcile the state.
|
||||||
///
|
///
|
||||||
/// Ensuring the Catalog and Db remain in sync is accomplished by
|
/// Ensuring the Catalog and Db remain in sync is accomplished by
|
||||||
/// manipulating the catalog state alongside the state in the `Db`
|
/// manipulating the catalog state alongside the state in the `Db`
|
||||||
|
@ -397,35 +393,6 @@ impl Db {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the partition_keys in the requested sort order
|
|
||||||
pub fn partition_keys_sorted_by(&self, sort_rules: &SortOrder) -> Vec<String> {
|
|
||||||
let mut partitions: Vec<(String, DateTime<Utc>, DateTime<Utc>)> = self
|
|
||||||
.catalog
|
|
||||||
.partitions()
|
|
||||||
.map(|p| {
|
|
||||||
let p = p.read();
|
|
||||||
(p.key().to_string(), p.created_at(), p.last_write_at())
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
match &sort_rules.sort {
|
|
||||||
Sort::CreatedAtTime => partitions.sort_by_key(|(_, created_at, _)| *created_at),
|
|
||||||
Sort::LastWriteTime => partitions.sort_by_key(|(_, _, last_write_at)| *last_write_at),
|
|
||||||
Sort::Column(_name, _data_type, _val) => {
|
|
||||||
unimplemented!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if sort_rules.order == Order::Desc {
|
|
||||||
partitions.reverse();
|
|
||||||
}
|
|
||||||
|
|
||||||
partitions
|
|
||||||
.into_iter()
|
|
||||||
.map(|(key, _, _)| key)
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the number of iterations of the background worker loop
|
/// Returns the number of iterations of the background worker loop
|
||||||
pub fn worker_iterations(&self) -> usize {
|
pub fn worker_iterations(&self) -> usize {
|
||||||
self.worker_iterations.load(Ordering::Relaxed)
|
self.worker_iterations.load(Ordering::Relaxed)
|
||||||
|
@ -574,7 +541,7 @@ mod tests {
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use data_types::{
|
use data_types::{
|
||||||
chunk::ChunkStorage,
|
chunk::ChunkStorage,
|
||||||
database_rules::{LifecycleRules, SortOrder},
|
database_rules::{LifecycleRules, Order, Sort, SortOrder},
|
||||||
};
|
};
|
||||||
use query::{
|
use query::{
|
||||||
exec::Executor, frontend::sql::SQLQueryPlanner, test::TestLPWriter, PartitionChunk,
|
exec::Executor, frontend::sql::SQLQueryPlanner, test::TestLPWriter, PartitionChunk,
|
||||||
|
@ -836,7 +803,7 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn partitions_sorted_by_times() {
|
async fn chunks_sorted_by_times() {
|
||||||
let db = make_db();
|
let db = make_db();
|
||||||
let mut writer = TestLPWriter::default();
|
let mut writer = TestLPWriter::default();
|
||||||
writer.write_lp_string(&db, "cpu val=1 1").unwrap();
|
writer.write_lp_string(&db, "cpu val=1 1").unwrap();
|
||||||
|
@ -852,14 +819,23 @@ mod tests {
|
||||||
order: Order::Desc,
|
order: Order::Desc,
|
||||||
sort: Sort::LastWriteTime,
|
sort: Sort::LastWriteTime,
|
||||||
};
|
};
|
||||||
let partitions = db.partition_keys_sorted_by(&sort_rules);
|
let chunks = db.catalog.chunks_sorted_by(&sort_rules);
|
||||||
|
let partitions: Vec<_> = chunks
|
||||||
|
.into_iter()
|
||||||
|
.map(|x| x.read().key().to_string())
|
||||||
|
.collect();
|
||||||
|
|
||||||
assert_eq!(partitions, vec!["1970-01-05T15", "1970-01-01T00"]);
|
assert_eq!(partitions, vec!["1970-01-05T15", "1970-01-01T00"]);
|
||||||
|
|
||||||
let sort_rules = SortOrder {
|
let sort_rules = SortOrder {
|
||||||
order: Order::Asc,
|
order: Order::Asc,
|
||||||
sort: Sort::CreatedAtTime,
|
sort: Sort::CreatedAtTime,
|
||||||
};
|
};
|
||||||
let partitions = db.partition_keys_sorted_by(&sort_rules);
|
let chunks = db.catalog.chunks_sorted_by(&sort_rules);
|
||||||
|
let partitions: Vec<_> = chunks
|
||||||
|
.into_iter()
|
||||||
|
.map(|x| x.read().key().to_string())
|
||||||
|
.collect();
|
||||||
assert_eq!(partitions, vec!["1970-01-01T00", "1970-01-05T15"]);
|
assert_eq!(partitions, vec!["1970-01-01T00", "1970-01-05T15"]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,6 +9,8 @@ use parking_lot::RwLock;
|
||||||
use snafu::{OptionExt, Snafu};
|
use snafu::{OptionExt, Snafu};
|
||||||
|
|
||||||
use arrow_deps::datafusion::{catalog::schema::SchemaProvider, datasource::TableProvider};
|
use arrow_deps::datafusion::{catalog::schema::SchemaProvider, datasource::TableProvider};
|
||||||
|
use chunk::Chunk;
|
||||||
|
use data_types::database_rules::{Order, Sort, SortOrder};
|
||||||
use data_types::error::ErrorLogger;
|
use data_types::error::ErrorLogger;
|
||||||
use internal_types::selection::Selection;
|
use internal_types::selection::Selection;
|
||||||
use partition::Partition;
|
use partition::Partition;
|
||||||
|
@ -119,6 +121,38 @@ impl Catalog {
|
||||||
.cloned()
|
.cloned()
|
||||||
.context(UnknownPartition { partition_key })
|
.context(UnknownPartition { partition_key })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns all chunks within the catalog in an arbitrary order
|
||||||
|
pub fn chunks(&self) -> Vec<Arc<RwLock<Chunk>>> {
|
||||||
|
let mut chunks = Vec::new();
|
||||||
|
let partitions = self.partitions.read();
|
||||||
|
|
||||||
|
for partition in partitions.values() {
|
||||||
|
let partition = partition.read();
|
||||||
|
chunks.extend(partition.chunks().cloned())
|
||||||
|
}
|
||||||
|
chunks
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the chunks in the requested sort order
|
||||||
|
pub fn chunks_sorted_by(&self, sort_rules: &SortOrder) -> Vec<Arc<RwLock<Chunk>>> {
|
||||||
|
let mut chunks = self.chunks();
|
||||||
|
|
||||||
|
match &sort_rules.sort {
|
||||||
|
// The first write is technically not the created time but is in practice close enough
|
||||||
|
Sort::CreatedAtTime => chunks.sort_by_cached_key(|x| x.read().time_of_first_write()),
|
||||||
|
Sort::LastWriteTime => chunks.sort_by_cached_key(|x| x.read().time_of_last_write()),
|
||||||
|
Sort::Column(_name, _data_type, _val) => {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if sort_rules.order == Order::Desc {
|
||||||
|
chunks.reverse();
|
||||||
|
}
|
||||||
|
|
||||||
|
chunks
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SchemaProvider for Catalog {
|
impl SchemaProvider for Catalog {
|
||||||
|
|
Loading…
Reference in New Issue